Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP
Browse files

Rename RowMutation->Mutation in preparation for Row->Partition

  • Loading branch information...
commit 6bbb13b9b0fe62de2a2140055af2ea6968c73ccc 1 parent d753661
@iamaleksey iamaleksey authored
Showing with 740 additions and 815 deletions.
  1. +5 −5 examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.java
  2. +26 −26 src/java/org/apache/cassandra/config/CFMetaData.java
  3. +7 −7 src/java/org/apache/cassandra/config/ColumnDefinition.java
  4. +14 −13 src/java/org/apache/cassandra/config/KSMetaData.java
  5. +8 −8 src/java/org/apache/cassandra/config/TriggerDefinition.java
  6. +8 −8 src/java/org/apache/cassandra/config/UTMetaData.java
  7. +10 −12 src/java/org/apache/cassandra/cql/DeleteStatement.java
  8. +15 −20 src/java/org/apache/cassandra/cql/UpdateStatement.java
  9. +5 −5 src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
  10. +11 −11 src/java/org/apache/cassandra/db/BatchlogManager.java
  11. +2 −2 src/java/org/apache/cassandra/db/CollationController.java
  12. +3 −3 src/java/org/apache/cassandra/db/CounterCell.java
  13. +37 −42 src/java/org/apache/cassandra/db/CounterMutation.java
  14. +3 −3 src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
  15. +3 −5 src/java/org/apache/cassandra/db/DefsTables.java
  16. +18 −18 src/java/org/apache/cassandra/db/HintedHandOffManager.java
  17. +2 −2 src/java/org/apache/cassandra/db/Keyspace.java
  18. +4 −4 src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
  19. +37 −36 src/java/org/apache/cassandra/db/{RowMutation.java → Mutation.java}
  20. +10 −12 src/java/org/apache/cassandra/db/{RowMutationVerbHandler.java → MutationVerbHandler.java}
  21. +3 −4 src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
  22. +15 −18 src/java/org/apache/cassandra/db/SystemKeyspace.java
  23. +6 −7 src/java/org/apache/cassandra/db/commitlog/CommitLog.java
  24. +26 −26 src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
  25. +7 −7 src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
  26. +4 −5 src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
  27. +4 −4 src/java/org/apache/cassandra/net/MessagingService.java
  28. +16 −16 src/java/org/apache/cassandra/service/MigrationManager.java
  29. +3 −3 src/java/org/apache/cassandra/service/MigrationTask.java
  30. +4 −5 src/java/org/apache/cassandra/service/RowDataResolver.java
  31. +28 −30 src/java/org/apache/cassandra/service/StorageProxy.java
  32. +1 −1  src/java/org/apache/cassandra/service/StorageService.java
  33. +3 −8 src/java/org/apache/cassandra/service/paxos/Commit.java
  34. +3 −3 src/java/org/apache/cassandra/service/paxos/PaxosState.java
  35. +2 −2 src/java/org/apache/cassandra/sink/IRequestSink.java
  36. +57 −57 src/java/org/apache/cassandra/thrift/CassandraServer.java
  37. +2 −2 src/java/org/apache/cassandra/tracing/TraceState.java
  38. +3 −3 src/java/org/apache/cassandra/tracing/Tracing.java
  39. +4 −4 src/java/org/apache/cassandra/triggers/ITrigger.java
  40. +9 −12 src/java/org/apache/cassandra/triggers/TriggerExecutor.java
  41. +2 −2 src/resources/org/apache/cassandra/cli/CliHelp.yaml
  42. +1 −3 test/long/org/apache/cassandra/db/LongKeyspaceTest.java
  43. +1 −1  test/long/org/apache/cassandra/db/MeteredFlusherTest.java
  44. +2 −2 test/long/org/apache/cassandra/db/commitlog/ComitLogStress.java
  45. +1 −1  test/long/org/apache/cassandra/db/compaction/LongCompactionsTest.java
  46. +2 −6 test/long/org/apache/cassandra/db/compaction/LongLeveledCompactionStrategyTest.java
  47. +3 −4 test/unit/org/apache/cassandra/SchemaLoader.java
  48. +2 −2 test/unit/org/apache/cassandra/Util.java
  49. +1 −2  test/unit/org/apache/cassandra/config/CFMetaDataTest.java
  50. +9 −9 test/unit/org/apache/cassandra/config/DefsTest.java
  51. +2 −2 test/unit/org/apache/cassandra/db/CleanupTest.java
  52. +8 −11 test/unit/org/apache/cassandra/db/CollationControllerTest.java
  53. +54 −54 test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java
  54. +5 −5 test/unit/org/apache/cassandra/db/CommitLogTest.java
  55. +4 −4 test/unit/org/apache/cassandra/db/CounterMutationTest.java
  56. +1 −1  test/unit/org/apache/cassandra/db/HintedHandOffTest.java
  57. +3 −4 test/unit/org/apache/cassandra/db/KeyCacheTest.java
  58. +2 −2 test/unit/org/apache/cassandra/db/KeyCollisionTest.java
  59. +15 −15 test/unit/org/apache/cassandra/db/KeyspaceTest.java
  60. +3 −3 test/unit/org/apache/cassandra/db/MultitableTest.java
  61. +3 −3 test/unit/org/apache/cassandra/db/NameSortTest.java
  62. +20 −20 test/unit/org/apache/cassandra/db/RangeTombstoneTest.java
  63. +4 −4 test/unit/org/apache/cassandra/db/ReadMessageTest.java
  64. +1 −1  test/unit/org/apache/cassandra/db/RecoveryManager2Test.java
  65. +3 −3 test/unit/org/apache/cassandra/db/RecoveryManager3Test.java
  66. +6 −6 test/unit/org/apache/cassandra/db/RecoveryManagerTest.java
  67. +2 −2 test/unit/org/apache/cassandra/db/RecoveryManagerTruncateTest.java
  68. +3 −3 test/unit/org/apache/cassandra/db/RemoveCellTest.java
  69. +3 −3 test/unit/org/apache/cassandra/db/RemoveColumnFamilyTest.java
  70. +3 −3 test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush1Test.java
  71. +3 −3 test/unit/org/apache/cassandra/db/RemoveColumnFamilyWithFlush2Test.java
  72. +8 −8 test/unit/org/apache/cassandra/db/RemoveSubCellTest.java
  73. +4 −5 test/unit/org/apache/cassandra/db/RowIterationTest.java
  74. +2 −2 test/unit/org/apache/cassandra/db/ScrubTest.java
  75. +24 −24 test/unit/org/apache/cassandra/db/SerializationsTest.java
  76. +6 −6 test/unit/org/apache/cassandra/db/TimeSortTest.java
  77. +2 −5 test/unit/org/apache/cassandra/db/compaction/BlacklistingCompactionsTest.java
  78. +25 −29 test/unit/org/apache/cassandra/db/compaction/CompactionsPurgeTest.java
  79. +9 −11 test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
  80. +4 −10 test/unit/org/apache/cassandra/db/compaction/LeveledCompactionStrategyTest.java
  81. +3 −5 test/unit/org/apache/cassandra/db/compaction/OneCompactionTest.java
  82. +3 −7 test/unit/org/apache/cassandra/db/compaction/SizeTieredCompactionStrategyTest.java
  83. +9 −16 test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java
  84. +7 −7 test/unit/org/apache/cassandra/db/index/PerRowSecondaryIndexTest.java
  85. +2 −2 test/unit/org/apache/cassandra/db/marshal/CompositeTypeTest.java
  86. +2 −2 test/unit/org/apache/cassandra/db/marshal/DynamicCompositeTypeTest.java
  87. +3 −3 test/unit/org/apache/cassandra/io/sstable/IndexSummaryManagerTest.java
  88. +9 −12 test/unit/org/apache/cassandra/io/sstable/SSTableMetadataTest.java
  89. +10 −10 test/unit/org/apache/cassandra/io/sstable/SSTableReaderTest.java
  90. +1 −1  test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java
  91. +1 −1  test/unit/org/apache/cassandra/service/AntiEntropyServiceCounterTest.java
  92. +2 −2 test/unit/org/apache/cassandra/service/AntiEntropyServiceStandardTest.java
  93. +1 −1  test/unit/org/apache/cassandra/service/QueryPagerTest.java
  94. +3 −3 test/unit/org/apache/cassandra/streaming/StreamingTransferTest.java
View
10 examples/triggers/src/org/apache/cassandra/triggers/InvertedIndex.java
@@ -24,12 +24,12 @@
import java.util.List;
import java.util.Properties;
-import org.apache.cassandra.db.Cell;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.io.util.FileUtils;
public class InvertedIndex implements ITrigger
@@ -37,12 +37,12 @@
private static final Logger logger = LoggerFactory.getLogger(InvertedIndex.class);
private Properties properties = loadProperties();
- public Collection<RowMutation> augment(ByteBuffer key, ColumnFamily update)
+ public Collection<Mutation> augment(ByteBuffer key, ColumnFamily update)
{
- List<RowMutation> mutations = new ArrayList<>();
+ List<Mutation> mutations = new ArrayList<>();
for (Cell cell : update)
{
- RowMutation mutation = new RowMutation(properties.getProperty("keyspace"), cell.value());
+ Mutation mutation = new Mutation(properties.getProperty("keyspace"), cell.value());
mutation.add(properties.getProperty("columnfamily"), cell.name(), key, System.currentTimeMillis());
mutations.add(mutation);
}
View
52 src/java/org/apache/cassandra/config/CFMetaData.java
@@ -1022,7 +1022,7 @@ public static CFMetaData fromThrift(org.apache.cassandra.thrift.CfDef cf_def) th
/**
* Create CFMetaData from thrift {@link CqlRow} that contains columns from schema_columnfamilies.
*
- * @param row CqlRow containing columns from schema_columnfamilies.
+ * @param cf CqlRow containing columns from schema_columnfamilies.
* @return CFMetaData derived from CqlRow
*/
public static CFMetaData fromThriftCqlRow(CqlRow cf, CqlResult columnsRes)
@@ -1476,11 +1476,11 @@ private void validateCompactionThresholds() throws ConfigurationException
*
* @return Difference between attributes in form of schema mutation
*/
- public RowMutation toSchemaUpdate(CFMetaData newState, long modificationTimestamp, boolean fromThrift)
+ public Mutation toSchemaUpdate(CFMetaData newState, long modificationTimestamp, boolean fromThrift)
{
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
- newState.toSchemaNoColumnsNoTriggers(rm, modificationTimestamp);
+ newState.toSchemaNoColumnsNoTriggers(mutation, modificationTimestamp);
MapDifference<ByteBuffer, ColumnDefinition> columnDiff = Maps.difference(columnMetadata, newState.columnMetadata);
@@ -1492,31 +1492,31 @@ public RowMutation toSchemaUpdate(CFMetaData newState, long modificationTimestam
if (fromThrift && cd.kind != ColumnDefinition.Kind.REGULAR)
continue;
- cd.deleteFromSchema(rm, modificationTimestamp);
+ cd.deleteFromSchema(mutation, modificationTimestamp);
}
// newly added columns
for (ColumnDefinition cd : columnDiff.entriesOnlyOnRight().values())
- cd.toSchema(rm, modificationTimestamp);
+ cd.toSchema(mutation, modificationTimestamp);
// old columns with updated attributes
for (ByteBuffer name : columnDiff.entriesDiffering().keySet())
{
ColumnDefinition cd = newState.columnMetadata.get(name);
- cd.toSchema(rm, modificationTimestamp);
+ cd.toSchema(mutation, modificationTimestamp);
}
MapDifference<String, TriggerDefinition> triggerDiff = Maps.difference(triggers, newState.triggers);
// dropped triggers
for (TriggerDefinition td : triggerDiff.entriesOnlyOnLeft().values())
- td.deleteFromSchema(rm, cfName, modificationTimestamp);
+ td.deleteFromSchema(mutation, cfName, modificationTimestamp);
// newly created triggers
for (TriggerDefinition td : triggerDiff.entriesOnlyOnRight().values())
- td.toSchema(rm, cfName, modificationTimestamp);
+ td.toSchema(mutation, cfName, modificationTimestamp);
- return rm;
+ return mutation;
}
/**
@@ -1524,24 +1524,24 @@ public RowMutation toSchemaUpdate(CFMetaData newState, long modificationTimestam
*
* @param timestamp Timestamp to use
*
- * @return RowMutation to use to completely remove cf from schema
+ * @return Mutation to use to completely remove cf from schema
*/
- public RowMutation dropFromSchema(long timestamp)
+ public Mutation dropFromSchema(long timestamp)
{
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
- ColumnFamily cf = rm.addOrGet(SchemaColumnFamiliesCf);
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
+ ColumnFamily cf = mutation.addOrGet(SchemaColumnFamiliesCf);
int ldt = (int) (System.currentTimeMillis() / 1000);
Composite prefix = SchemaColumnFamiliesCf.comparator.make(cfName);
cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
for (ColumnDefinition cd : allColumns())
- cd.deleteFromSchema(rm, timestamp);
+ cd.deleteFromSchema(mutation, timestamp);
for (TriggerDefinition td : triggers.values())
- td.deleteFromSchema(rm, cfName, timestamp);
+ td.deleteFromSchema(mutation, cfName, timestamp);
- return rm;
+ return mutation;
}
public boolean isPurged()
@@ -1554,19 +1554,19 @@ void markPurged()
isPurged = true;
}
- public void toSchema(RowMutation rm, long timestamp)
+ public void toSchema(Mutation mutation, long timestamp)
{
- toSchemaNoColumnsNoTriggers(rm, timestamp);
+ toSchemaNoColumnsNoTriggers(mutation, timestamp);
for (ColumnDefinition cd : allColumns())
- cd.toSchema(rm, timestamp);
+ cd.toSchema(mutation, timestamp);
}
- private void toSchemaNoColumnsNoTriggers(RowMutation rm, long timestamp)
+ private void toSchemaNoColumnsNoTriggers(Mutation mutation, long timestamp)
{
// For property that can be null (and can be changed), we insert tombstones, to make sure
// we don't keep a property the user has removed
- ColumnFamily cf = rm.addOrGet(SchemaColumnFamiliesCf);
+ ColumnFamily cf = mutation.addOrGet(SchemaColumnFamiliesCf);
Composite prefix = SchemaColumnFamiliesCf.comparator.make(cfName);
CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
@@ -1790,11 +1790,11 @@ private String aliasesToJson(List<ColumnDefinition> rawAliases)
*
* @throws ConfigurationException if any of the attributes didn't pass validation
*/
- public RowMutation toSchema(long timestamp) throws ConfigurationException
+ public Mutation toSchema(long timestamp) throws ConfigurationException
{
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
- toSchema(rm, timestamp);
- return rm;
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(ksName));
+ toSchema(mutation, timestamp);
+ return mutation;
}
// The comparator to validate the definition name.
View
14 src/java/org/apache/cassandra/config/ColumnDefinition.java
@@ -279,14 +279,14 @@ public static ColumnDefinition fromThrift(CFMetaData cfm, ColumnDef thriftColumn
}
/**
- * Drop specified column from the schema using given row.
+ * Drop specified column from the schema using given mutation.
*
- * @param rm The schema row mutation
- * @param timestamp The timestamp to use for column modification
+ * @param mutation The schema mutation
+ * @param timestamp The timestamp to use for column modification
*/
- public void deleteFromSchema(RowMutation rm, long timestamp)
+ public void deleteFromSchema(Mutation mutation, long timestamp)
{
- ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaColumnsCf);
+ ColumnFamily cf = mutation.addOrGet(CFMetaData.SchemaColumnsCf);
int ldt = (int) (System.currentTimeMillis() / 1000);
// Note: we do want to use name.toString(), not name.bytes directly for backward compatibility (For CQL3, this won't make a difference).
@@ -294,9 +294,9 @@ public void deleteFromSchema(RowMutation rm, long timestamp)
cf.addAtom(new RangeTombstone(prefix, prefix.end(), timestamp, ldt));
}
- public void toSchema(RowMutation rm, long timestamp)
+ public void toSchema(Mutation mutation, long timestamp)
{
- ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaColumnsCf);
+ ColumnFamily cf = mutation.addOrGet(CFMetaData.SchemaColumnsCf);
Composite prefix = CFMetaData.SchemaColumnsCf.comparator.make(cfName, name.toString());
CFRowAdder adder = new CFRowAdder(cf, prefix, timestamp);
View
27 src/java/org/apache/cassandra/config/KSMetaData.java
@@ -194,7 +194,7 @@ public KsDef toThrift()
return ksdef;
}
- public RowMutation toSchemaUpdate(KSMetaData newState, long modificationTimestamp)
+ public Mutation toSchemaUpdate(KSMetaData newState, long modificationTimestamp)
{
return newState.toSchema(modificationTimestamp);
}
@@ -226,21 +226,22 @@ public KSMetaData reloadAttributes()
return fromSchema(ksDefRow, Collections.<CFMetaData>emptyList());
}
- public RowMutation dropFromSchema(long timestamp)
+ public Mutation dropFromSchema(long timestamp)
{
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(name));
- rm.delete(SystemKeyspace.SCHEMA_KEYSPACES_CF, timestamp);
- rm.delete(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, timestamp);
- rm.delete(SystemKeyspace.SCHEMA_COLUMNS_CF, timestamp);
- rm.delete(SystemKeyspace.SCHEMA_TRIGGERS_CF, timestamp);
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(name));
- return rm;
+ mutation.delete(SystemKeyspace.SCHEMA_KEYSPACES_CF, timestamp);
+ mutation.delete(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF, timestamp);
+ mutation.delete(SystemKeyspace.SCHEMA_COLUMNS_CF, timestamp);
+ mutation.delete(SystemKeyspace.SCHEMA_TRIGGERS_CF, timestamp);
+
+ return mutation;
}
- public RowMutation toSchema(long timestamp)
+ public Mutation toSchema(long timestamp)
{
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(name));
- ColumnFamily cf = rm.addOrGet(CFMetaData.SchemaKeyspacesCf);
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, SystemKeyspace.getSchemaKSKey(name));
+ ColumnFamily cf = mutation.addOrGet(CFMetaData.SchemaKeyspacesCf);
CFRowAdder adder = new CFRowAdder(cf, CFMetaData.SchemaKeyspacesCf.comparator.builder().build(), timestamp);
adder.add("durable_writes", durableWrites);
@@ -248,9 +249,9 @@ public RowMutation toSchema(long timestamp)
adder.add("strategy_options", json(strategyOptions));
for (CFMetaData cfm : cfMetaData.values())
- cfm.toSchema(rm, timestamp);
+ cfm.toSchema(mutation, timestamp);
- return rm;
+ return mutation;
}
/**
View
16 src/java/org/apache/cassandra/config/TriggerDefinition.java
@@ -72,15 +72,15 @@ public static TriggerDefinition create(String name, String classOption)
}
/**
- * Add specified trigger to the schema using given row.
+ * Add specified trigger to the schema using given mutation.
*
- * @param rm The schema row mutation
+ * @param mutation The schema mutation
* @param cfName The name of the parent ColumnFamily
* @param timestamp The timestamp to use for the columns
*/
- public void toSchema(RowMutation rm, String cfName, long timestamp)
+ public void toSchema(Mutation mutation, String cfName, long timestamp)
{
- ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
+ ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
CFMetaData cfm = CFMetaData.SchemaTriggersCf;
Composite prefix = cfm.comparator.make(cfName, name);
@@ -90,15 +90,15 @@ public void toSchema(RowMutation rm, String cfName, long timestamp)
}
/**
- * Drop specified trigger from the schema using given row.
+ * Drop specified trigger from the schema using given mutation.
*
- * @param rm The schema row mutation
+ * @param mutation The schema mutation
* @param cfName The name of the parent ColumnFamily
* @param timestamp The timestamp to use for the tombstone
*/
- public void deleteFromSchema(RowMutation rm, String cfName, long timestamp)
+ public void deleteFromSchema(Mutation mutation, String cfName, long timestamp)
{
- ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
+ ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_TRIGGERS_CF);
int ldt = (int) (System.currentTimeMillis() / 1000);
Composite prefix = CFMetaData.SchemaTriggersCf.comparator.make(cfName, name);
View
16 src/java/org/apache/cassandra/config/UTMetaData.java
@@ -83,10 +83,10 @@ public static UTMetaData fromSchema(List<Row> rows)
return fromSchema(result);
}
- public static RowMutation toSchema(UserType newType, long timestamp)
+ public static Mutation toSchema(UserType newType, long timestamp)
{
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, newType.name);
- ColumnFamily cf = rm.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_CF);
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, newType.name);
+ ColumnFamily cf = mutation.addOrGet(SystemKeyspace.SCHEMA_USER_TYPES_CF);
CFMetaData cfm = CFMetaData.SchemaUserTypesCf;
UpdateParameters params = new UpdateParameters(cfm, Collections.<ByteBuffer>emptyList(), timestamp, 0, null);
@@ -106,14 +106,14 @@ public static RowMutation toSchema(UserType newType, long timestamp)
throw new AssertionError();
}
- return rm;
+ return mutation;
}
- public static RowMutation dropFromSchema(UserType droppedType, long timestamp)
+ public static Mutation dropFromSchema(UserType droppedType, long timestamp)
{
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, droppedType.name);
- rm.delete(SystemKeyspace.SCHEMA_USER_TYPES_CF, timestamp);
- return rm;
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, droppedType.name);
+ mutation.delete(SystemKeyspace.SCHEMA_USER_TYPES_CF, timestamp);
+ return mutation;
}
public void addAll(UTMetaData types)
View
22 src/java/org/apache/cassandra/cql/DeleteStatement.java
@@ -24,9 +24,9 @@
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.UnauthorizedException;
@@ -76,27 +76,25 @@ public DeleteStatement(List<Term> columns, String keyspace, String columnFamily,
clientState.hasColumnFamilyAccess(keyspace, columnFamily, Permission.MODIFY);
AbstractType<?> keyType = Schema.instance.getCFMetaData(keyspace, columnFamily).getKeyValidator();
- List<IMutation> rowMutations = new ArrayList<IMutation>(keys.size());
+ List<IMutation> mutations = new ArrayList<IMutation>(keys.size());
for (Term key : keys)
- {
- rowMutations.add(mutationForKey(key.getByteBuffer(keyType, variables), keyspace, timestamp, clientState, variables, metadata));
- }
+ mutations.add(mutationForKey(key.getByteBuffer(keyType, variables), keyspace, timestamp, clientState, variables, metadata));
- return rowMutations;
+ return mutations;
}
- public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp, ThriftClientState clientState, List<ByteBuffer> variables, CFMetaData metadata)
+ public Mutation mutationForKey(ByteBuffer key, String keyspace, Long timestamp, ThriftClientState clientState, List<ByteBuffer> variables, CFMetaData metadata)
throws InvalidRequestException
{
- RowMutation rm = new RowMutation(keyspace, key);
+ Mutation mutation = new Mutation(keyspace, key);
QueryProcessor.validateKeyAlias(metadata, keyName);
if (columns.size() < 1)
{
- // No columns, delete the row
- rm.delete(columnFamily, (timestamp == null) ? getTimestamp(clientState) : timestamp);
+ // No columns, delete the partition
+ mutation.delete(columnFamily, (timestamp == null) ? getTimestamp(clientState) : timestamp);
}
else
{
@@ -106,11 +104,11 @@ public RowMutation mutationForKey(ByteBuffer key, String keyspace, Long timestam
{
CellName columnName = metadata.comparator.cellFromByteBuffer(column.getByteBuffer(at, variables));
validateColumnName(columnName);
- rm.delete(columnFamily, columnName, (timestamp == null) ? getTimestamp(clientState) : timestamp);
+ mutation.delete(columnFamily, columnName, (timestamp == null) ? getTimestamp(clientState) : timestamp);
}
}
- return rm;
+ return mutation;
}
public String toString()
View
35 src/java/org/apache/cassandra/cql/UpdateStatement.java
@@ -23,12 +23,9 @@
import org.apache.cassandra.auth.Permission;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.Schema;
+import org.apache.cassandra.db.*;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNameType;
-import org.apache.cassandra.db.CounterMutation;
-import org.apache.cassandra.db.ConsistencyLevel;
-import org.apache.cassandra.db.IMutation;
-import org.apache.cassandra.db.RowMutation;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.UnauthorizedException;
@@ -151,18 +148,16 @@ public boolean isSetConsistencyLevel()
clientState.hasColumnFamilyAccess(keyspace, columnFamily, Permission.MODIFY);
- List<IMutation> rowMutations = new LinkedList<IMutation>();
+ List<IMutation> mutations = new LinkedList<>();
for (Term key: keys)
- {
- rowMutations.add(mutationForKey(keyspace, key.getByteBuffer(getKeyType(keyspace),variables), metadata, timestamp, clientState, variables));
- }
+ mutations.add(mutationForKey(keyspace, key.getByteBuffer(getKeyType(keyspace),variables), metadata, timestamp, clientState, variables));
- return rowMutations;
+ return mutations;
}
/**
- * Compute a row mutation for a single key
+ * Compute a mutation for a single key
*
*
* @param keyspace working keyspace
@@ -171,7 +166,7 @@ public boolean isSetConsistencyLevel()
* @param timestamp global timestamp to use for every key mutation
*
* @param clientState
- * @return row mutation
+ * @return mutation
*
* @throws InvalidRequestException on the wrong request
*/
@@ -182,9 +177,9 @@ private IMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData met
CellNameType comparator = metadata.comparator;
AbstractType<?> at = comparator.asAbstractType();
- // if true we need to wrap RowMutation into CounterMutation
+ // if true we need to wrap Mutation into CounterMutation
boolean hasCounterColumn = false;
- RowMutation rm = new RowMutation(keyspace, key);
+ Mutation mutation = new Mutation(keyspace, key);
for (Map.Entry<Term, Operation> column : getColumns().entrySet())
{
@@ -199,11 +194,11 @@ private IMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData met
ByteBuffer colValue = op.a.getByteBuffer(metadata.getValueValidator(colName),variables);
validateColumn(metadata, colName, colValue);
- rm.add(columnFamily,
- colName,
- colValue,
- (timestamp == null) ? getTimestamp(clientState) : timestamp,
- getTimeToLive());
+ mutation.add(columnFamily,
+ colName,
+ colValue,
+ (timestamp == null) ? getTimestamp(clientState) : timestamp,
+ getTimeToLive());
}
else
{
@@ -224,11 +219,11 @@ private IMutation mutationForKey(String keyspace, ByteBuffer key, CFMetaData met
op.b.getText()));
}
- rm.addCounter(columnFamily, colName, value);
+ mutation.addCounter(columnFamily, colName, value);
}
}
- return (hasCounterColumn) ? new CounterMutation(rm, getConsistencyLevel()) : rm;
+ return (hasCounterColumn) ? new CounterMutation(mutation, getConsistencyLevel()) : mutation;
}
public String getColumnFamily()
View
10 src/java/org/apache/cassandra/cql3/statements/ModificationStatement.java
@@ -516,18 +516,18 @@ public ResultMessage executeInternal(QueryState queryState) throws RequestValida
private IMutation makeMutation(ByteBuffer key, ColumnFamily cf, ConsistencyLevel cl, boolean isBatch)
{
- RowMutation rm;
+ Mutation mutation;
if (isBatch)
{
// we might group other mutations together with this one later, so make it mutable
- rm = new RowMutation(cfm.ksName, key);
- rm.add(cf);
+ mutation = new Mutation(cfm.ksName, key);
+ mutation.add(cf);
}
else
{
- rm = new RowMutation(cfm.ksName, key, cf);
+ mutation = new Mutation(cfm.ksName, key, cf);
}
- return isCounter() ? new CounterMutation(rm, cl) : rm;
+ return isCounter() ? new CounterMutation(mutation, cl) : mutation;
}
private ColumnFamily buildConditions(ByteBuffer key, Composite clusteringPrefix, UpdateParameters params)
View
22 src/java/org/apache/cassandra/db/BatchlogManager.java
@@ -35,7 +35,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
-import org.apache.cassandra.db.composites.CellName;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,6 +43,7 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
+import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.compaction.CompactionManager;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.UUIDType;
@@ -121,21 +121,21 @@ public void runMayThrow() throws ExecutionException, InterruptedException
batchlogTasks.execute(runnable);
}
- public static RowMutation getBatchlogMutationFor(Collection<RowMutation> mutations, UUID uuid)
+ public static Mutation getBatchlogMutationFor(Collection<Mutation> mutations, UUID uuid)
{
long timestamp = FBUtilities.timestampMicros();
ByteBuffer writtenAt = LongType.instance.decompose(timestamp / 1000);
- ByteBuffer data = serializeRowMutations(mutations);
+ ByteBuffer data = serializeMutations(mutations);
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(CFMetaData.BatchlogCf);
cf.addColumn(new Cell(cellName(""), ByteBufferUtil.EMPTY_BYTE_BUFFER, timestamp));
cf.addColumn(new Cell(cellName("data"), data, timestamp));
cf.addColumn(new Cell(cellName("written_at"), writtenAt, timestamp));
- return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
+ return new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(uuid), cf);
}
- private static ByteBuffer serializeRowMutations(Collection<RowMutation> mutations)
+ private static ByteBuffer serializeMutations(Collection<Mutation> mutations)
{
FastByteArrayOutputStream bos = new FastByteArrayOutputStream();
DataOutputStream out = new DataOutputStream(bos);
@@ -143,8 +143,8 @@ private static ByteBuffer serializeRowMutations(Collection<RowMutation> mutation
try
{
out.writeInt(mutations.size());
- for (RowMutation rm : mutations)
- RowMutation.serializer.serialize(rm, out, VERSION);
+ for (Mutation mutation : mutations)
+ Mutation.serializer.serialize(mutation, out, VERSION);
}
catch (IOException e)
{
@@ -204,14 +204,14 @@ private void replaySerializedMutations(ByteBuffer data, long writtenAt) throws I
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(data));
int size = in.readInt();
for (int i = 0; i < size; i++)
- replaySerializedMutation(RowMutation.serializer.deserialize(in, VERSION), writtenAt);
+ replaySerializedMutation(Mutation.serializer.deserialize(in, VERSION), writtenAt);
}
/*
* We try to deliver the mutations to the replicas ourselves if they are alive and only resort to writing hints
* when a replica is down or a write request times out.
*/
- private void replaySerializedMutation(RowMutation mutation, long writtenAt)
+ private void replaySerializedMutation(Mutation mutation, long writtenAt)
{
int ttl = calculateHintTTL(mutation, writtenAt);
if (ttl <= 0)
@@ -235,7 +235,7 @@ else if (FailureDetector.instance.isAlive(endpoint))
attemptDirectDelivery(mutation, writtenAt, liveEndpoints);
}
- private void attemptDirectDelivery(RowMutation mutation, long writtenAt, Set<InetAddress> endpoints)
+ private void attemptDirectDelivery(Mutation mutation, long writtenAt, Set<InetAddress> endpoints)
{
List<WriteResponseHandler> handlers = Lists.newArrayList();
final CopyOnWriteArraySet<InetAddress> undelivered = new CopyOnWriteArraySet<InetAddress>(endpoints);
@@ -277,7 +277,7 @@ public void run()
// calculate ttl for the mutation's hint (and reduce ttl by the time the mutation spent in the batchlog).
// this ensures that deletes aren't "undone" by an old batch replay.
- private int calculateHintTTL(RowMutation mutation, long writtenAt)
+ private int calculateHintTTL(Mutation mutation, long writtenAt)
{
return (int) ((HintedHandOffManager.calculateHintTTL(mutation) * 1000 - (System.currentTimeMillis() - writtenAt)) / 1000);
}
View
4 src/java/org/apache/cassandra/db/CollationController.java
@@ -149,9 +149,9 @@ private ColumnFamily collectTimeOrderedData()
&& cfs.getCompactionStrategy() instanceof SizeTieredCompactionStrategy)
{
Tracing.trace("Defragmenting requested data");
- RowMutation rm = new RowMutation(cfs.keyspace.getName(), filter.key.key, returnCF.cloneMe());
+ Mutation mutation = new Mutation(cfs.keyspace.getName(), filter.key.key, returnCF.cloneMe());
// skipping commitlog and index updates is fine since we're just de-fragmenting existing data
- Keyspace.open(rm.getKeyspaceName()).apply(rm, false, false);
+ Keyspace.open(mutation.getKeyspaceName()).apply(mutation, false, false);
}
// Caller is responsible for final removeDeletedCF. This is important for cacheRow to work correctly:
View
6 src/java/org/apache/cassandra/db/CounterCell.java
@@ -349,12 +349,12 @@ public Cell markDeltaToBeCleared()
private static void sendToOtherReplica(DecoratedKey key, ColumnFamily cf) throws RequestExecutionException
{
- RowMutation rm = new RowMutation(cf.metadata().ksName, key.key, cf);
+ Mutation mutation = new Mutation(cf.metadata().ksName, key.key, cf);
final InetAddress local = FBUtilities.getBroadcastAddress();
String localDataCenter = DatabaseDescriptor.getEndpointSnitch().getDatacenter(local);
- StorageProxy.performWrite(rm, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
+ StorageProxy.performWrite(mutation, ConsistencyLevel.ANY, localDataCenter, new StorageProxy.WritePerformer()
{
public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWriteResponseHandler responseHandler, String localDataCenter, ConsistencyLevel consistency_level)
throws OverloadedException
@@ -363,7 +363,7 @@ public void apply(IMutation mutation, Iterable<InetAddress> targets, AbstractWri
Set<InetAddress> remotes = Sets.difference(ImmutableSet.copyOf(targets), ImmutableSet.of(local));
// Fake local response to be a good lad but we won't wait on the responseHandler
responseHandler.response(null);
- StorageProxy.sendToHintedEndpoints((RowMutation) mutation, remotes, responseHandler, localDataCenter);
+ StorageProxy.sendToHintedEndpoints((Mutation) mutation, remotes, responseHandler, localDataCenter);
}
}, null, WriteType.SIMPLE);
View
79 src/java/org/apache/cassandra/db/CounterMutation.java
@@ -42,38 +42,33 @@
{
public static final CounterMutationSerializer serializer = new CounterMutationSerializer();
- private final RowMutation rowMutation;
+ private final Mutation mutation;
private final ConsistencyLevel consistency;
- public CounterMutation(RowMutation rowMutation, ConsistencyLevel consistency)
+ public CounterMutation(Mutation mutation, ConsistencyLevel consistency)
{
- this.rowMutation = rowMutation;
+ this.mutation = mutation;
this.consistency = consistency;
}
public String getKeyspaceName()
{
- return rowMutation.getKeyspaceName();
+ return mutation.getKeyspaceName();
}
public Collection<UUID> getColumnFamilyIds()
{
- return rowMutation.getColumnFamilyIds();
+ return mutation.getColumnFamilyIds();
}
public Collection<ColumnFamily> getColumnFamilies()
{
- return rowMutation.getColumnFamilies();
+ return mutation.getColumnFamilies();
}
public ByteBuffer key()
{
- return rowMutation.key();
- }
-
- public RowMutation rowMutation()
- {
- return rowMutation;
+ return mutation.key();
}
public ConsistencyLevel consistency()
@@ -81,19 +76,19 @@ public ConsistencyLevel consistency()
return consistency;
}
- public RowMutation makeReplicationMutation()
+ public Mutation makeReplicationMutation()
{
List<ReadCommand> readCommands = new LinkedList<ReadCommand>();
long timestamp = System.currentTimeMillis();
- for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+ for (ColumnFamily columnFamily : mutation.getColumnFamilies())
{
if (!columnFamily.metadata().getReplicateOnWrite())
continue;
- addReadCommandFromColumnFamily(rowMutation.getKeyspaceName(), rowMutation.key(), columnFamily, timestamp, readCommands);
+ addReadCommandFromColumnFamily(mutation.getKeyspaceName(), mutation.key(), columnFamily, timestamp, readCommands);
}
- // create a replication RowMutation
- RowMutation replicationMutation = new RowMutation(rowMutation.getKeyspaceName(), rowMutation.key());
+ // create a replication Mutation
+ Mutation replicationMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
for (ReadCommand readCommand : readCommands)
{
Keyspace keyspace = Keyspace.open(readCommand.ksName);
@@ -121,7 +116,7 @@ private void addReadCommandFromColumnFamily(String keyspaceName, ByteBuffer key,
public boolean shouldReplicateOnWrite()
{
- for (ColumnFamily cf : rowMutation.getColumnFamilies())
+ for (ColumnFamily cf : mutation.getColumnFamilies())
if (cf.metadata().getReplicateOnWrite())
return true;
return false;
@@ -130,10 +125,10 @@ public boolean shouldReplicateOnWrite()
public void apply()
{
// transform all CounterUpdateCell to CounterCell: accomplished by localCopy
- RowMutation rm = new RowMutation(rowMutation.getKeyspaceName(), ByteBufferUtil.clone(rowMutation.key()));
- Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
+ Mutation m = new Mutation(mutation.getKeyspaceName(), ByteBufferUtil.clone(mutation.key()));
+ Keyspace keyspace = Keyspace.open(m.getKeyspaceName());
- for (ColumnFamily cf_ : rowMutation.getColumnFamilies())
+ for (ColumnFamily cf_ : mutation.getColumnFamilies())
{
ColumnFamily cf = cf_.cloneMeShallow();
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(cf.id());
@@ -141,9 +136,9 @@ public void apply()
{
cf.addColumn(cell.localCopy(cfs), HeapAllocator.instance);
}
- rm.add(cf);
+ m.add(cf);
}
- rm.apply();
+ m.apply();
}
public void addAll(IMutation m)
@@ -152,7 +147,7 @@ public void addAll(IMutation m)
throw new IllegalArgumentException();
CounterMutation cm = (CounterMutation)m;
- rowMutation.addAll(cm.rowMutation);
+ mutation.addAll(cm.mutation);
}
@Override
@@ -164,30 +159,30 @@ public String toString()
public String toString(boolean shallow)
{
StringBuilder buff = new StringBuilder("CounterMutation(");
- buff.append(rowMutation.toString(shallow));
+ buff.append(mutation.toString(shallow));
buff.append(", ").append(consistency.toString());
return buff.append(")").toString();
}
-}
-class CounterMutationSerializer implements IVersionedSerializer<CounterMutation>
-{
- public void serialize(CounterMutation cm, DataOutput out, int version) throws IOException
+ public static class CounterMutationSerializer implements IVersionedSerializer<CounterMutation>
{
- RowMutation.serializer.serialize(cm.rowMutation(), out, version);
- out.writeUTF(cm.consistency().name());
- }
+ public void serialize(CounterMutation cm, DataOutput out, int version) throws IOException
+ {
+ Mutation.serializer.serialize(cm.mutation, out, version);
+ out.writeUTF(cm.consistency.name());
+ }
- public CounterMutation deserialize(DataInput in, int version) throws IOException
- {
- RowMutation rm = RowMutation.serializer.deserialize(in, version);
- ConsistencyLevel consistency = Enum.valueOf(ConsistencyLevel.class, in.readUTF());
- return new CounterMutation(rm, consistency);
- }
+ public CounterMutation deserialize(DataInput in, int version) throws IOException
+ {
+ Mutation m = Mutation.serializer.deserialize(in, version);
+ ConsistencyLevel consistency = Enum.valueOf(ConsistencyLevel.class, in.readUTF());
+ return new CounterMutation(m, consistency);
+ }
- public long serializedSize(CounterMutation cm, int version)
- {
- return RowMutation.serializer.serializedSize(cm.rowMutation(), version)
- + TypeSizes.NATIVE.sizeof(cm.consistency().name());
+ public long serializedSize(CounterMutation cm, int version)
+ {
+ return Mutation.serializer.serializedSize(cm.mutation, version)
+ + TypeSizes.NATIVE.sizeof(cm.consistency.name());
+ }
}
}
View
6 src/java/org/apache/cassandra/db/DefinitionsUpdateVerbHandler.java
@@ -32,13 +32,13 @@
* Called when node receives updated schema state from the schema migration coordinator node.
* Such happens when user makes local schema migration on one of the nodes in the ring
* (which is going to act as coordinator) and that node sends (pushes) it's updated schema state
- * (in form of row mutations) to all the alive nodes in the cluster.
+ * (in form of mutations) to all the alive nodes in the cluster.
*/
-public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<RowMutation>>
+public class DefinitionsUpdateVerbHandler implements IVerbHandler<Collection<Mutation>>
{
private static final Logger logger = LoggerFactory.getLogger(DefinitionsUpdateVerbHandler.class);
- public void doVerb(final MessageIn<Collection<RowMutation>> message, int id)
+ public void doVerb(final MessageIn<Collection<Mutation>> message, int id)
{
logger.debug("Received schema mutation push from {}", message.from);
View
8 src/java/org/apache/cassandra/db/DefsTables.java
@@ -37,11 +37,9 @@
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.AsciiType;
import org.apache.cassandra.db.marshal.UserType;
-import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.service.MigrationManager;
import org.apache.cassandra.service.StorageService;
-import org.apache.cassandra.utils.ByteBufferUtil;
/**
* SCHEMA_{KEYSPACES, COLUMNFAMILIES, COLUMNS}_CF are used to store Keyspace/ColumnFamily attributes to make schema
@@ -145,7 +143,7 @@ private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
}
/**
- * Merge remote schema in form of row mutations with local and mutate ks/cf metadata objects
+ * Merge remote schema in form of mutations with local and mutate ks/cf metadata objects
* (which also involves fs operations on add/drop ks/cf)
*
* @param mutations the schema changes to apply
@@ -153,14 +151,14 @@ private static Row serializedColumnFamilies(DecoratedKey ksNameKey)
* @throws ConfigurationException If one of metadata attributes has invalid value
* @throws IOException If data was corrupted during transportation or failed to apply fs operations
*/
- public static synchronized void mergeSchema(Collection<RowMutation> mutations) throws ConfigurationException, IOException
+ public static synchronized void mergeSchema(Collection<Mutation> mutations) throws ConfigurationException, IOException
{
// current state of the schema
Map<DecoratedKey, ColumnFamily> oldKeyspaces = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_KEYSPACES_CF);
Map<DecoratedKey, ColumnFamily> oldColumnFamilies = SystemKeyspace.getSchema(SystemKeyspace.SCHEMA_COLUMNFAMILIES_CF);
List<Row> oldTypes = SystemKeyspace.serializedSchema(SystemKeyspace.SCHEMA_USER_TYPES_CF);
- for (RowMutation mutation : mutations)
+ for (Mutation mutation : mutations)
mutation.apply();
if (!StorageService.instance.isClientMode())
View
36 src/java/org/apache/cassandra/db/HintedHandOffManager.java
@@ -121,7 +121,7 @@
* Returns a mutation representing a Hint to be sent to <code>targetId</code>
* as soon as it becomes available again.
*/
- public RowMutation hintFor(RowMutation mutation, int ttl, UUID targetId)
+ public Mutation hintFor(Mutation mutation, int ttl, UUID targetId)
{
assert ttl > 0;
@@ -135,18 +135,18 @@ public RowMutation hintFor(RowMutation mutation, int ttl, UUID targetId)
UUID hintId = UUIDGen.getTimeUUID();
// serialize the hint with id and version as a composite column name
CellName name = CFMetaData.HintsCf.comparator.makeCellName(hintId, MessagingService.current_version);
- ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, RowMutation.serializer, MessagingService.current_version));
+ ByteBuffer value = ByteBuffer.wrap(FBUtilities.serialize(mutation, Mutation.serializer, MessagingService.current_version));
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Schema.instance.getCFMetaData(Keyspace.SYSTEM_KS, SystemKeyspace.HINTS_CF));
cf.addColumn(name, value, System.currentTimeMillis(), ttl);
- return new RowMutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf);
+ return new Mutation(Keyspace.SYSTEM_KS, UUIDType.instance.decompose(targetId), cf);
}
/*
- * determine the TTL for the hint RowMutation
+ * determine the TTL for the hint Mutation
* this is set at the smallest GCGraceSeconds for any of the CFs in the RM
* this ensures that deletes aren't "undone" by delivery of an old hint
*/
- public static int calculateHintTTL(RowMutation mutation)
+ public static int calculateHintTTL(Mutation mutation)
{
int ttl = maxHintTTL;
for (ColumnFamily cf : mutation.getColumnFamilies())
@@ -181,9 +181,9 @@ public void run()
private static void deleteHint(ByteBuffer tokenBytes, CellName columnName, long timestamp)
{
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, tokenBytes);
- rm.delete(SystemKeyspace.HINTS_CF, columnName, timestamp);
- rm.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, tokenBytes);
+ mutation.delete(SystemKeyspace.HINTS_CF, columnName, timestamp);
+ mutation.applyUnsafe(); // don't bother with commitlog since we're going to flush as soon as we're done with delivery
}
public void deleteHintsForEndpoint(final String ipOrHostname)
@@ -206,8 +206,8 @@ public void deleteHintsForEndpoint(final InetAddress endpoint)
return;
UUID hostId = StorageService.instance.getTokenMetadata().getHostId(endpoint);
ByteBuffer hostIdBytes = ByteBuffer.wrap(UUIDGen.decompose(hostId));
- final RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, hostIdBytes);
- rm.delete(SystemKeyspace.HINTS_CF, System.currentTimeMillis());
+ final Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, hostIdBytes);
+ mutation.delete(SystemKeyspace.HINTS_CF, System.currentTimeMillis());
// execute asynchronously to avoid blocking caller (which may be processing gossip)
Runnable runnable = new Runnable()
@@ -217,7 +217,7 @@ public void run()
try
{
logger.info("Deleting any stored hints for {}", endpoint);
- rm.apply();
+ mutation.apply();
compact();
}
catch (Exception e)
@@ -384,10 +384,10 @@ private void doDeliverHintsToEndpoint(InetAddress endpoint)
int version = Int32Type.instance.compose(hint.name().get(1));
DataInputStream in = new DataInputStream(ByteBufferUtil.inputStream(hint.value()));
- RowMutation rm;
+ Mutation mutation;
try
{
- rm = RowMutation.serializer.deserialize(in, version);
+ mutation = Mutation.serializer.deserialize(in, version);
}
catch (UnknownColumnFamilyException e)
{
@@ -401,12 +401,12 @@ private void doDeliverHintsToEndpoint(InetAddress endpoint)
}
truncationTimesCache.clear();
- for (UUID cfId : ImmutableSet.copyOf((rm.getColumnFamilyIds())))
+ for (UUID cfId : ImmutableSet.copyOf((mutation.getColumnFamilyIds())))
{
Long truncatedAt = truncationTimesCache.get(cfId);
if (truncatedAt == null)
{
- ColumnFamilyStore cfs = Keyspace.open(rm.getKeyspaceName()).getColumnFamilyStore(cfId);
+ ColumnFamilyStore cfs = Keyspace.open(mutation.getKeyspaceName()).getColumnFamilyStore(cfId);
truncatedAt = cfs.getTruncationTime();
truncationTimesCache.put(cfId, truncatedAt);
}
@@ -414,17 +414,17 @@ private void doDeliverHintsToEndpoint(InetAddress endpoint)
if (hint.maxTimestamp() < truncatedAt)
{
logger.debug("Skipping delivery of hint for truncated columnfamily {}", cfId);
- rm = rm.without(cfId);
+ mutation = mutation.without(cfId);
}
}
- if (rm.isEmpty())
+ if (mutation.isEmpty())
{
deleteHint(hostIdBytes, hint.name(), hint.maxTimestamp());
continue;
}
- MessageOut<RowMutation> message = rm.createMessage();
+ MessageOut<Mutation> message = mutation.createMessage();
rateLimiter.acquire(message.serializedSize(MessagingService.current_version));
Runnable callback = new Runnable()
{
View
4 src/java/org/apache/cassandra/db/Keyspace.java
@@ -328,7 +328,7 @@ public Row getRow(QueryFilter filter)
return new Row(filter.key, columnFamily);
}
- public void apply(RowMutation mutation, boolean writeCommitLog)
+ public void apply(Mutation mutation, boolean writeCommitLog)
{
apply(mutation, writeCommitLog, true);
}
@@ -341,7 +341,7 @@ public void apply(RowMutation mutation, boolean writeCommitLog)
* @param writeCommitLog false to disable commitlog append entirely
* @param updateIndexes false to disable index updates (used by CollationController "defragmenting")
*/
- public void apply(RowMutation mutation, boolean writeCommitLog, boolean updateIndexes)
+ public void apply(Mutation mutation, boolean writeCommitLog, boolean updateIndexes)
{
// write the mutation to the commitlog and memtables
Tracing.trace("Acquiring switchLock read lock");
View
8 src/java/org/apache/cassandra/db/MigrationRequestVerbHandler.java
@@ -29,7 +29,7 @@
import org.apache.cassandra.service.MigrationManager;
/**
- * Sends it's current schema state in form of row mutations in reply to the remote node's request.
+ * Sends it's current schema state in form of mutations in reply to the remote node's request.
* Such a request is made when one of the nodes, by means of Gossip, detects schema disagreement in the ring.
*/
public class MigrationRequestVerbHandler implements IVerbHandler
@@ -39,9 +39,9 @@
public void doVerb(MessageIn message, int id)
{
logger.debug("Received migration request from {}.", message.from);
- MessageOut<Collection<RowMutation>> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
- SystemKeyspace.serializeSchema(),
- MigrationManager.MigrationsSerializer.instance);
+ MessageOut<Collection<Mutation>> response = new MessageOut<>(MessagingService.Verb.INTERNAL_RESPONSE,
+ SystemKeyspace.serializeSchema(),
+ MigrationManager.MigrationsSerializer.instance);
MessagingService.instance().sendReply(response, id, message.from);
}
}
View
73 .../org/apache/cassandra/db/RowMutation.java → ...ava/org/apache/cassandra/db/Mutation.java
@@ -34,45 +34,46 @@
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.utils.ByteBufferUtil;
-// TODO convert this to a Builder pattern instead of encouraging RM.add directly,
+// TODO convert this to a Builder pattern instead of encouraging M.add directly,
// which is less-efficient since we have to keep a mutable HashMap around
-public class RowMutation implements IMutation
+public class Mutation implements IMutation
{
- public static final RowMutationSerializer serializer = new RowMutationSerializer();
+ public static final MutationSerializer serializer = new MutationSerializer();
+
public static final String FORWARD_TO = "FWD_TO";
public static final String FORWARD_FROM = "FWD_FRM";
// todo this is redundant
- // when we remove it, also restore SerializationsTest.testRowMutationRead to not regenerate new RowMutations each test
+ // when we remove it, also restore SerializationsTest.testMutationRead to not regenerate new Mutations each test
private final String keyspaceName;
private final ByteBuffer key;
// map of column family id to mutations for that column family.
private final Map<UUID, ColumnFamily> modifications;
- public RowMutation(String keyspaceName, ByteBuffer key)
+ public Mutation(String keyspaceName, ByteBuffer key)
{
this(keyspaceName, key, new HashMap<UUID, ColumnFamily>());
}
- public RowMutation(String keyspaceName, ByteBuffer key, ColumnFamily cf)
+ public Mutation(String keyspaceName, ByteBuffer key, ColumnFamily cf)
{
this(keyspaceName, key, Collections.singletonMap(cf.id(), cf));
}
- public RowMutation(String keyspaceName, Row row)
+ public Mutation(String keyspaceName, Row row)
{
this(keyspaceName, row.key.key, row.cf);
}
- protected RowMutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications)
+ protected Mutation(String keyspaceName, ByteBuffer key, Map<UUID, ColumnFamily> modifications)
{
this.keyspaceName = keyspaceName;
this.key = key;
this.modifications = modifications;
}
- public RowMutation(ByteBuffer key, ColumnFamily cf)
+ public Mutation(ByteBuffer key, ColumnFamily cf)
{
this(cf.metadata().ksName, key, cf);
}
@@ -118,7 +119,7 @@ public void add(ColumnFamily columnFamily)
}
/**
- * @return the ColumnFamily in this RowMutation corresponding to @param cfName, creating an empty one if necessary.
+ * @return the ColumnFamily in this Mutation corresponding to @param cfName, creating an empty one if necessary.
*/
public ColumnFamily addOrGet(String cfName)
{
@@ -176,14 +177,14 @@ public void deleteRange(String cfName, Composite start, Composite end, long time
public void addAll(IMutation m)
{
- if (!(m instanceof RowMutation))
+ if (!(m instanceof Mutation))
throw new IllegalArgumentException();
- RowMutation rm = (RowMutation)m;
- if (!keyspaceName.equals(rm.keyspaceName) || !key.equals(rm.key))
+ Mutation mutation = (Mutation)m;
+ if (!keyspaceName.equals(mutation.keyspaceName) || !key.equals(mutation.key))
throw new IllegalArgumentException();
- for (Map.Entry<UUID, ColumnFamily> entry : rm.modifications.entrySet())
+ for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet())
{
// It's slighty faster to assume the key wasn't present and fix if
// not in the case where it wasn't there indeed.
@@ -208,12 +209,12 @@ public void applyUnsafe()
Keyspace.open(keyspaceName).apply(this, false);
}
- public MessageOut<RowMutation> createMessage()
+ public MessageOut<Mutation> createMessage()
{
return createMessage(MessagingService.Verb.MUTATION);
}
- public MessageOut<RowMutation> createMessage(MessagingService.Verb verb)
+ public MessageOut<Mutation> createMessage(MessagingService.Verb verb)
{
return new MessageOut<>(verb, this, serializer);
}
@@ -225,7 +226,7 @@ public String toString()
public String toString(boolean shallow)
{
- StringBuilder buff = new StringBuilder("RowMutation(");
+ StringBuilder buff = new StringBuilder("Mutation(");
buff.append("keyspace='").append(keyspaceName).append('\'');
buff.append(", key='").append(ByteBufferUtil.bytesToHex(key)).append('\'');
buff.append(", modifications=[");
@@ -244,33 +245,33 @@ public String toString(boolean shallow)
return buff.append("])").toString();
}
- public RowMutation without(UUID cfId)
+ public Mutation without(UUID cfId)
{
- RowMutation rm = new RowMutation(keyspaceName, key);
+ Mutation mutation = new Mutation(keyspaceName, key);
for (Map.Entry<UUID, ColumnFamily> entry : modifications.entrySet())
if (!entry.getKey().equals(cfId))
- rm.add(entry.getValue());
- return rm;
+ mutation.add(entry.getValue());
+ return mutation;
}
- public static class RowMutationSerializer implements IVersionedSerializer<RowMutation>
+ public static class MutationSerializer implements IVersionedSerializer<Mutation>
{
- public void serialize(RowMutation rm, DataOutput out, int version) throws IOException
+ public void serialize(Mutation mutation, DataOutput out, int version) throws IOException
{
if (version < MessagingService.VERSION_20)
- out.writeUTF(rm.getKeyspaceName());
+ out.writeUTF(mutation.getKeyspaceName());
- ByteBufferUtil.writeWithShortLength(rm.key(), out);
+ ByteBufferUtil.writeWithShortLength(mutation.key(), out);
/* serialize the modifications in the mutation */
- int size = rm.modifications.size();
+ int size = mutation.modifications.size();
out.writeInt(size);
assert size > 0;
- for (Map.Entry<UUID, ColumnFamily> entry : rm.modifications.entrySet())
+ for (Map.Entry<UUID, ColumnFamily> entry : mutation.modifications.entrySet())
ColumnFamily.serializer.serialize(entry.getValue(), out, version);
}
- public RowMutation deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
+ public Mutation deserialize(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
{
String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that
if (version < MessagingService.VERSION_20)
@@ -298,35 +299,35 @@ public RowMutation deserialize(DataInput in, int version, ColumnSerializer.Flag
}
}
- return new RowMutation(keyspaceName, key, modifications);
+ return new Mutation(keyspaceName, key, modifications);
}
private ColumnFamily deserializeOneCf(DataInput in, int version, ColumnSerializer.Flag flag) throws IOException
{
ColumnFamily cf = ColumnFamily.serializer.deserialize(in, UnsortedColumns.factory, flag, version);
- // We don't allow RowMutation with null column family, so we should never get null back.
+ // We don't allow Mutation with null column family, so we should never get null back.
assert cf != null;
return cf;
}
- public RowMutation deserialize(DataInput in, int version) throws IOException
+ public Mutation deserialize(DataInput in, int version) throws IOException
{
return deserialize(in, version, ColumnSerializer.Flag.FROM_REMOTE);
}
- public long serializedSize(RowMutation rm, int version)
+ public long serializedSize(Mutation mutation, int version)
{
TypeSizes sizes = TypeSizes.NATIVE;
int size = 0;
if (version < MessagingService.VERSION_20)
- size += sizes.sizeof(rm.getKeyspaceName());
+ size += sizes.sizeof(mutation.getKeyspaceName());
- int keySize = rm.key().remaining();
+ int keySize = mutation.key().remaining();
size += sizes.sizeof((short) keySize) + keySize;
- size += sizes.sizeof(rm.modifications.size());
- for (Map.Entry<UUID,ColumnFamily> entry : rm.modifications.entrySet())
+ size += sizes.sizeof(mutation.modifications.size());
+ for (Map.Entry<UUID,ColumnFamily> entry : mutation.modifications.entrySet())
size += ColumnFamily.serializer.serializedSize(entry.getValue(), TypeSizes.NATIVE, version);
return size;
View
22 .../cassandra/db/RowMutationVerbHandler.java → ...che/cassandra/db/MutationVerbHandler.java
@@ -28,39 +28,37 @@
import org.apache.cassandra.net.*;
import org.apache.cassandra.tracing.Tracing;
-public class RowMutationVerbHandler implements IVerbHandler<RowMutation>
+public class MutationVerbHandler implements IVerbHandler<Mutation>
{
- private static final Logger logger = LoggerFactory.getLogger(RowMutationVerbHandler.class);
+ private static final Logger logger = LoggerFactory.getLogger(MutationVerbHandler.class);
- public void doVerb(MessageIn<RowMutation> message, int id)
+ public void doVerb(MessageIn<Mutation> message, int id)
{
try
{
- RowMutation rm = message.payload;
-
// Check if there were any forwarding headers in this message
- byte[] from = message.parameters.get(RowMutation.FORWARD_FROM);
+ byte[] from = message.parameters.get(Mutation.FORWARD_FROM);
InetAddress replyTo;
if (from == null)
{
replyTo = message.from;
- byte[] forwardBytes = message.parameters.get(RowMutation.FORWARD_TO);
+ byte[] forwardBytes = message.parameters.get(Mutation.FORWARD_TO);
if (forwardBytes != null)
- forwardToLocalNodes(rm, message.verb, forwardBytes, message.from);
+ forwardToLocalNodes(message.payload, message.verb, forwardBytes, message.from);
}
else
{
replyTo = InetAddress.getByAddress(from);
}
- rm.apply();
+ message.payload.apply();
WriteResponse response = new WriteResponse();
Tracing.trace("Enqueuing response to {}", replyTo);
MessagingService.instance().sendReply(response.createMessage(), id, replyTo);
}
catch (IOException e)
{
- logger.error("Error in row mutation", e);
+ logger.error("Error in mutation", e);
}
}
@@ -68,13 +66,13 @@ public void doVerb(MessageIn<RowMutation> message, int id)
* Older version (< 1.0) will not send this message at all, hence we don't
* need to check the version of the data.
*/
- private void forwardToLocalNodes(RowMutation rm, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
+ private void forwardToLocalNodes(Mutation mutation, MessagingService.Verb verb, byte[] forwardBytes, InetAddress from) throws IOException
{
DataInputStream in = new DataInputStream(new FastByteArrayInputStream(forwardBytes));
int size = in.readInt();
// tell the recipients who to send their ack to
- MessageOut<RowMutation> message = new MessageOut<RowMutation>(verb, rm, RowMutation.serializer).withParameter(RowMutation.FORWARD_FROM, from.getAddress());
+ MessageOut<Mutation> message = new MessageOut<>(verb, mutation, Mutation.serializer).withParameter(Mutation.FORWARD_FROM, from.getAddress());
// Send a message to each of the addresses on our Forward List
for (int i = 0; i < size; i++)
{
View
7 src/java/org/apache/cassandra/db/ReadRepairVerbHandler.java
@@ -21,12 +21,11 @@
import org.apache.cassandra.net.MessageIn;
import org.apache.cassandra.net.MessagingService;
-public class ReadRepairVerbHandler implements IVerbHandler<RowMutation>
+public class ReadRepairVerbHandler implements IVerbHandler<Mutation>
{
- public void doVerb(MessageIn<RowMutation> message, int id)
+ public void doVerb(MessageIn<Mutation> message, int id)
{
- RowMutation rm = message.payload;
- rm.apply();
+ message.payload.apply();
WriteResponse response = new WriteResponse();
MessagingService.instance().sendReply(response.createMessage(), id, message.from);
}
View
33 src/java/org/apache/cassandra/db/SystemKeyspace.java
@@ -29,12 +29,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
-
-import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
-import org.apache.cassandra.db.composites.Composite;
-import org.apache.cassandra.db.composites.Composites;
-import org.apache.cassandra.metrics.RestorableMeter;
-import org.apache.cassandra.transport.Server;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -47,6 +41,9 @@
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.columniterator.IdentityQueryFilter;
import org.apache.cassandra.db.commitlog.ReplayPosition;
+import org.apache.cassandra.db.compaction.CompactionHistoryTabularData;
+import org.apache.cassandra.db.composites.Composite;
+import org.apache.cassandra.db.composites.Composites;
import org.apache.cassandra.db.filter.QueryFilter;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.Range;
@@ -55,10 +52,12 @@
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.io.util.DataOutputBuffer;
import org.apache.cassandra.locator.IEndpointSnitch;
+import org.apache.cassandra.metrics.RestorableMeter;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.service.paxos.Commit;
import org.apache.cassandra.service.paxos.PaxosState;
import org.apache.cassandra.thrift.cassandraConstants;
+import org.apache.cassandra.transport.Server;
import org.apache.cassandra.utils.*;
import static org.apache.cassandra.cql3.QueryProcessor.processInternal;
@@ -597,15 +596,14 @@ public static void setIndexBuilt(String keyspaceName, String indexName)
{
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, INDEX_CF);
cf.addColumn(new Cell(cf.getComparator().makeCellName(indexName), ByteBufferUtil.EMPTY_BYTE_BUFFER, FBUtilities.timestampMicros()));
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName), cf);
- rm.apply();
+ new Mutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName), cf).apply();
}
public static void setIndexRemoved(String keyspaceName, String indexName)
{
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName));
- rm.delete(INDEX_CF, CFMetaData.IndexCf.comparator.makeCellName(indexName), FBUtilities.timestampMicros());
- rm.apply();
+ Mutation mutation = new Mutation(Keyspace.SYSTEM_KS, ByteBufferUtil.bytes(keyspaceName));
+ mutation.delete(INDEX_CF, CFMetaData.IndexCf.comparator.makeCellName(indexName), FBUtilities.timestampMicros());
+ mutation.apply();
}
/**
@@ -676,8 +674,7 @@ public static void writeCurrentLocalCounterId(CounterId newCounterId, long now)
ColumnFamily cf = ArrayBackedSortedColumns.factory.create(Keyspace.SYSTEM_KS, COUNTER_ID_CF);
cf.addColumn(new Cell(cf.getComparator().makeCellName(newCounterId.bytes()), ip, now));
- RowMutation rm = new RowMutation(Keyspace.SYSTEM_KS, ALL_LOCAL_NODE_ID_KEY, cf);
- rm.apply();
+ new Mutation(Keyspace.SYSTEM_KS, ALL_LOCAL_NODE_ID_KEY, cf).apply();
forceBlockingFlush(COUNTER_ID_CF);
}
@@ -736,9 +733,9 @@ public static ColumnFamilyStore schemaCFS(String cfName)
System.currentTimeMillis());
}
- public static Collection<RowMutation> serializeSchema()
+ public static Collection<Mutation> serializeSchema()
{
- Map<DecoratedKey, RowMutation> mutationMap = new HashMap<>();
+ Map<DecoratedKey, Mutation> mutationMap = new HashMap<>();
for (String cf : allSchemaCfs)
serializeSchema(mutationMap, cf);
@@ -746,17 +743,17 @@ public static ColumnFamilyStore schemaCFS(String cfName)
return mutationMap.values();
}
- private static void serializeSchema(Map<DecoratedKey, RowMutation> mutationMap, String schemaCfName)
+ private static void serializeSchema(Map<DecoratedKey, Mutation> mutationMap, String schemaCfName)
{
for (Row schemaRow : serializedSchema(schemaCfName))
{
if (Schema.ignoredSchemaRow(schemaRow))
continue;
- RowMutation mutation = mutationMap.get(schemaRow.key);
+ Mutation mutation = mutationMap.get(schemaRow.key);
if (mutation == null)
{
- mutation = new RowMutation(Keyspace.SYSTEM_KS, schemaRow.key.key);
+ mutation = new Mutation(Keyspace.SYSTEM_KS, schemaRow.key.key);
mutationMap.put(schemaRow.key, mutation);
}
View
13 src/java/org/apache/cassandra/db/commitlog/CommitLog.java
@@ -21,7 +21,6 @@
import java.lang.management.ManagementFactory;
import java.nio.ByteBuffer;
import java.util.*;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import javax.management.MBeanServer;
@@ -188,13 +187,13 @@ public void requestExtraSync()
}
/**
- * Add a RowMutation to the commit log.
+ * Add a Mutation to the commit log.
*
- * @param rowMutation the RowMutation to add to the log
+ * @param mutation the Mutation to add to the log
*/
- public void add(RowMutation rowMutation)
+ public void add(Mutation mutation)
{
- long size = RowMutation.serializer.serializedSize(rowMutation, MessagingService.current_version);
+ long size = Mutation.serializer.serializedSize(mutation, MessagingService.current_version);
long totalSize = size + ENTRY_OVERHEAD_SIZE;
if (totalSize > MAX_MUTATION_SIZE)
@@ -203,7 +202,7 @@ public void add(RowMutation rowMutation)
return;
}
- Allocation alloc = allocator.allocate(rowMutation, (int) totalSize, new Allocation());
+ Allocation alloc = allocator.allocate(mutation, (int) totalSize, new Allocation());
try
{
PureJavaCrc32 checksum = new PureJavaCrc32();
@@ -215,7 +214,7 @@ public void add(RowMutation rowMutation)
buffer.putLong(checksum.getValue());
// checksummed mutation
- RowMutation.serializer.serialize(rowMutation, dos, MessagingService.current_version);
+ Mutation.serializer.serialize(mutation, dos, MessagingService.current_version);
buffer.putLong(checksum.getValue());
}
catch (IOException e)
View
52 src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
@@ -164,7 +164,7 @@ else if (globalPosition.segment == segmentId)
private abstract static class ReplayFilter
{
- public abstract Iterable<ColumnFamily> filter(RowMutation rm);
+ public abstract Iterable<ColumnFamily> filter(Mutation mutation);
public static ReplayFilter create()
{
@@ -193,9 +193,9 @@ public static ReplayFilter create()
private static class AlwaysReplayFilter extends ReplayFilter
{
- public Iterable<ColumnFamily> filter(RowMutation rm)
+ public Iterable<ColumnFamily> filter(Mutation mutation)
{
- return rm.getColumnFamilies();
+ return mutation.getColumnFamilies();
}
}
@@ -208,13 +208,13 @@ public CustomReplayFilter(Multimap<String, String> toReplay)
this.toReplay = toReplay;
}
- public Iterable<ColumnFamily> filter(RowMutation rm)
+ public Iterable<ColumnFamily> filter(Mutation mutation)
{
- final Collection<String> cfNames = toReplay.get(rm.getKeyspaceName());
+ final Collection<String> cfNames = toReplay.get(mutation.getKeyspaceName());
if (cfNames == null)
return Collections.emptySet();
- return Iterables.filter(rm.getColumnFamilies(), new Predicate<ColumnFamily>()
+ return Iterables.filter(mutation.getColumnFamilies(), new Predicate<ColumnFamily>()
{
public boolean apply(ColumnFamily cf)
{
@@ -264,7 +264,7 @@ public void recover(File file) throws IOException
reader.seek(offset);
- /* read the logs populate RowMutation and apply */
+ /* read the logs populate Mutation and apply */
while (reader.getPosition() < end && !reader.isEOF())
{
if (logger.isDebugEnabled())
@@ -282,7 +282,7 @@ public void recover(File file) throws IOException
break main;
}
- // RowMutation must be at LEAST 10 bytes:
+ // Mutation must be at LEAST 10 bytes:
// 3 each for a non-empty Keyspace and Key (including the
// 2-byte length from writeUTF/writeWithShortLength) and 4 bytes for column count.
// This prevents CRC by being fooled by special-case garbage in the file; see CASSANDRA-2128
@@ -320,14 +320,14 @@ public void recover(File file) throws IOException
/* deserialize the commit log entry */
FastByteArrayInputStream bufIn = new FastByteArrayInputStream(buffer, 0, serializedSize);
- final RowMutation rm;
+ final Mutation mutation;
try
{
// assuming version here. We've gone to lengths to make sure what gets written to the CL is in
// the current version. so do make sure the CL is drained prior to upgrading a node.
- rm = RowMutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
+ mutation = Mutation.serializer.deserialize(new DataInputStream(bufIn), version, ColumnSerializer.Flag.LOCAL);
// doublecheck that what we read is [still] valid for the current schema
- for (ColumnFamily cf : rm.getColumnFamilies())
+ for (ColumnFamily cf : mutation.getColumnFamilies())
for (Cell cell : cf)
cf.getComparator().validate(cell.name());
}
@@ -364,27 +364,27 @@ public void recover(File file) throws IOException
}
if (logger.isDebugEnabled())
- logger.debug("replaying mutation for {}.{}: {}", rm.getKeyspaceName(), ByteBufferUtil.bytesToHex(rm.key()), "{" + StringUtils.join(rm.getColumnFamilies().iterator(), ", ") + "}");
+ logger.debug("replaying mutation for {}.{}: {}", mutation.getKeyspaceName(), ByteBufferUtil.bytesToHex(mutation.key()), "{" + StringUtils.join(mutation.getColumnFamilies().iterator(), ", ") + "}");
final long entryLocation = reader.getFilePointer();
Runnable runnable = new WrappedRunnable()
{
public void runMayThrow() throws IOException
{
- if (Schema.instance.getKSMetaData(rm.getKeyspaceName()) == null)
+ if (Schema.instance.getKSMetaData(mutation.getKeyspaceName()) == null)
return;
- if (pointInTimeExceeded(rm))
+ if (pointInTimeExceeded(mutation))
return;
- final Keyspace keyspace = Keyspace.open(rm.getKeyspaceName());
+ final Keyspace keyspace = Keyspace.open(mutation.getKeyspaceName());
- // Rebuild the row mutation, omitting column families that
+ // Rebuild the mutation, omitting column families that
// a) the user has requested that we ignore,
// b) have already been flushed,
// or c) are part of a cf that was dropped.
// Keep in mind that the cf.name() is suspect. do every thing based on the cfid instead.
- RowMutation newRm = null;
- for (ColumnFamily columnFamily : replayFilter.filter(rm))
+ Mutation newMutation = null;
+ for (ColumnFamily columnFamily : replayFilter.filter(mutation))
{
if (Schema.instance.getCF(columnFamily.id()) == null)
continue; // dropped
@@ -395,16 +395,16 @@ public void runMayThrow() throws IOException
// if it is the last known segment, if we are after the replay position
if (segmentId > rp.segment || (segmentId == rp.segment && entryLocation > rp.position))
{
- if (newRm == null)
- newRm = new RowMutation(rm.getKeyspaceName(), rm.key());
- newRm.add(columnFamily);
+ if (newMutation == null)
+ newMutation = new Mutation(mutation.getKeyspaceName(), mutation.key());
+ newMutation.add(columnFamily);
replayedCount.incrementAndGet();
}
}
- if (newRm != null)
+ if (newMutation != null)
{
- assert !newRm.isEmpty();
- Keyspace.open(newRm.getKeyspaceName()).apply(newRm, false);
+ assert !newMutation.isEmpty();
+ Keyspace.open(newMutation.getKeyspaceName()).apply(newMutation, false);
keyspacesRecovered.add(keyspace);
}
}
@@ -431,11 +431,11 @@ public void runMayThrow() throws IOException
}
}
- protected boolean pointInTimeExceeded(RowMutation frm)
+ protected boolean pointInTimeExceeded(Mutation fm)
{
long restoreTarget = CommitLog.instance.archiver.restorePointInTime;
- for (ColumnFamily families : frm.getColumnFamilies())
+ for (ColumnFamily families : fm.getColumnFamilies())
{
if (CommitLog.instance.archiver.precision.toMillis(families.maxTimestamp()) > restoreTarget)
return true;
View
14 src/java/org/apache/cassandra/db/commitlog/CommitLogSegment.java
@@ -46,14 +46,14 @@
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamily;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.io.FSWriteError;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.utils.PureJavaCrc32;
import org.apache.cassandra.utils.WaitQueue;
/*
- * A single commit log file on disk. Manages creation of the file and writing row mutations to disk,
+ * A single commit log file on disk. Manages creation of the file and writing mutations to disk,
* as well as tracking the last mutation position of any "dirty" CFs covered by the segment file. Segment
* files are initially allocated to a fixed size and can grow to accomidate a larger value if necessary.
*/
@@ -166,11 +166,11 @@ static long getNextId()
}
/**
- * allocate space in this buffer for the provided row mutation, and populate the provided
+ * allocate space in this buffer for the provided mutation, and populate the provided
* Allocation object, returning true on success. False indicates there is not enough room in
* this segment, and a new segment is needed
*/
- boolean allocate(RowMutation rowMutation, int size, Allocation alloc)
+ boolean allocate(Mutation mutation, int size, Allocation alloc)
{
final AppendLock appendLock = lockForAppend();
try
@@ -185,7 +185,7 @@ boolean allocate(RowMutation rowMutation, int size, Allocation alloc)
alloc.position = position;
alloc.segment = this;
alloc.appendLock = appendLock;
- markDirty(rowMutation, position);
+ markDirty(mutation, position);
return true;
}
catch (Throwable t)
@@ -386,9 +386,9 @@ void close()
}
}
- void markDirty(RowMutation rowMutation, int allocatedPosition)
+ void markDirty(Mutation mutation, int allocatedPosition)
{
- for (ColumnFamily columnFamily : rowMutation.getColumnFamilies())
+ for (ColumnFamily columnFamily : mutation.getColumnFamilies())
{
// check for deleted CFS
CFMetaData cfm = columnFamily.metadata();
View
9 src/java/org/apache/cassandra/db/commitlog/CommitLogSegmentManager.java
@@ -29,7 +29,6 @@
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingQueue;
@@ -44,7 +43,7 @@
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.Keyspace;
-import org.apache.cassandra.db.RowMutation;
+import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.io.util.FileUtils;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.StorageService;
@@ -168,16 +167,16 @@ public void runMayThrow() throws Exception
}
/**
- * Reserve space in the current segment for the provided row mutation or, if there isn't space available,
+ * Reserve space in the current segment for the provided mutation or, if there isn't space available,
* create a new segment.
*
* @return the provided Allocation object
*/
- public Allocation allocate(RowMutation rowMutation, int size, Allocation alloc)
+ public Allocation allocate(Mutation mutation, int size, Allocation alloc)
{
CommitLogSegment segment = allocatingFrom();
- while (!segment.allocate(rowMutation, size, alloc))
+ while (!segment.allocate(mutation, size, alloc))
{
// failed to allocate, so move to a new segment with enough room
advanceAllocatingFrom(segment);
View
8 src/java/org/apache/cassandra/net/MessagingService.java
@@ -189,8 +189,8 @@
put(Verb.REQUEST_RESPONSE, CallbackDeterminedSerializer.instance);
put(Verb.INTERNAL_RESPONSE, CallbackDeterminedSerializer.instance);
- put(Verb.MUTATION, RowMutation.serializer);
- put(Verb.READ_REPAIR, RowMutation.serializer);
+ put(Verb.MUTATION, Mutation.serializer);
+ put(Verb.READ_REPAIR, Mutation.serializer);
put(Verb.READ, ReadCommand.serializer);
put(Verb.RANGE_SLICE, RangeSliceCommand.serializer);
put(Verb.PAGED_RANGE, PagedRangeCommand.serializer);
@@ -334,8 +334,8 @@ public Object apply(Pair<Integer, ExpiringMap.CacheableObject<CallbackInfo>> pai
if (expiredCallbackInfo.shouldHint())
{
- RowMutation rm = (