Skip to content

Commit

Permalink
Follow-up
Browse files Browse the repository at this point in the history
  • Loading branch information
iamaleksey committed Jul 31, 2015
1 parent 3ad8f6b commit 40b0d7b
Show file tree
Hide file tree
Showing 13 changed files with 61 additions and 52 deletions.
14 changes: 8 additions & 6 deletions src/java/org/apache/cassandra/config/CFMetaData.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.github.jamm.Unmetered;
import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.QueryProcessor;
Expand All @@ -47,7 +46,6 @@
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.dht.LocalPartitioner;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.io.compress.LZ4Compressor;
Expand All @@ -57,6 +55,7 @@
import org.apache.cassandra.schema.SchemaKeyspace;
import org.apache.cassandra.schema.Triggers;
import org.apache.cassandra.utils.*;
import org.github.jamm.Unmetered;

/**
* This class can be tricky to modify. Please read http://wiki.apache.org/cassandra/ConfigurationNotes for how to do so safely.
Expand Down Expand Up @@ -185,6 +184,8 @@ public String toString()
private final boolean isCounter;
private final boolean isMaterializedView;

private final boolean isIndex;

public volatile ClusteringComparator comparator; // bytes, long, timeuuid, utf8, etc. This is built directly from clusteringColumns
public final IPartitioner partitioner; // partitioner the table uses

Expand Down Expand Up @@ -288,6 +289,8 @@ private CFMetaData(String keyspace,
flags.add(Flag.MATERIALIZEDVIEW);
this.flags = Sets.immutableEnumSet(flags);

isIndex = cfName.contains(".");

assert partitioner != null;
this.partitioner = partitioner;

Expand Down Expand Up @@ -480,7 +483,7 @@ public CFMetaData copy(UUID newCfId)
this);
}

public CFMetaData copyWithPartitioner(IPartitioner partitioner)
public CFMetaData copy(IPartitioner partitioner)
{
return copyOpts(new CFMetaData(ksName,
cfName,
Expand Down Expand Up @@ -566,11 +569,10 @@ public boolean isSecondaryIndex()

/**
* true if this CFS contains secondary index data.
* FIXME: This should map to the above, partitioner is not a certain way to recognize indices.
*/
public boolean isIndex()
{
return partitioner instanceof LocalPartitioner;
return isIndex;
}

public DecoratedKey decorateKey(ByteBuffer key)
Expand All @@ -589,7 +591,7 @@ public Map<ByteBuffer, ColumnDefinition> getColumnMetadata()
*/
public String getParentColumnFamilyName()
{
return isSecondaryIndex() ? cfName.substring(0, cfName.indexOf('.')) : null;
return isIndex ? cfName.substring(0, cfName.indexOf('.')) : null;
}

public double getReadRepairChance()
Expand Down
15 changes: 7 additions & 8 deletions src/java/org/apache/cassandra/db/HintedHandOffManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,9 @@
import com.google.common.collect.Lists;
import com.google.common.util.concurrent.RateLimiter;
import com.google.common.util.concurrent.Uninterruptibles;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.JMXEnabledScheduledThreadPoolExecutor;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.config.ColumnDefinition;
Expand All @@ -46,7 +46,6 @@
import org.apache.cassandra.db.filter.*;
import org.apache.cassandra.db.marshal.Int32Type;
import org.apache.cassandra.db.marshal.UUIDType;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.exceptions.WriteTimeoutException;
import org.apache.cassandra.gms.ApplicationState;
import org.apache.cassandra.gms.FailureDetector;
Expand Down Expand Up @@ -543,12 +542,12 @@ public void pauseHintsDelivery(boolean b)

public List<String> listEndpointsPendingHints()
{
Token.TokenFactory tokenFactory = hintStore.getPartitioner().getTokenFactory();

// Extract the keys as strings to be reported.
LinkedList<String> result = new LinkedList<>();
List<String> result = new ArrayList<>();

ReadCommand cmd = PartitionRangeReadCommand.allDataRead(SystemKeyspace.Hints, FBUtilities.nowInSeconds());
try (ReadOrderGroup orderGroup = cmd.startOrderGroup(); UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
try (ReadOrderGroup orderGroup = cmd.startOrderGroup();
UnfilteredPartitionIterator iter = cmd.executeLocally(orderGroup))
{
while (iter.hasNext())
{
Expand All @@ -557,11 +556,11 @@ public List<String> listEndpointsPendingHints()
// We don't delete by range on the hints table, so we don't have to worry about the
// iterator returning only range tombstone marker
if (partition.hasNext())
// TODO: This is suspect. Token of the endpoint UUID as String?
result.addFirst(tokenFactory.toString(partition.partitionKey().getToken()));
result.add(UUIDType.instance.compose(partition.partitionKey().getKey()).toString());
}
}
}

return result;
}
}
8 changes: 2 additions & 6 deletions src/java/org/apache/cassandra/db/Mutation.java
Original file line number Diff line number Diff line change
Expand Up @@ -271,9 +271,8 @@ public void serialize(Mutation mutation, DataOutputPlus out, int version) throws

public Mutation deserialize(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
{
String keyspaceName = null; // will always be set from cf.metadata but javac isn't smart enough to see that
if (version < MessagingService.VERSION_20)
keyspaceName = in.readUTF();
in.readUTF(); // read pre-2.0 keyspace name

ByteBuffer key = null;
int size;
Expand Down Expand Up @@ -301,10 +300,7 @@ public Mutation deserialize(DataInputPlus in, int version, SerializationHelper.F
modifications.put(update.metadata().cfId, update);
}

if (keyspaceName == null)
keyspaceName = update.metadata().ksName;

return new Mutation(keyspaceName, dk, modifications);
return new Mutation(update.metadata().ksName, dk, modifications);
}

public Mutation deserialize(DataInputPlus in, int version) throws IOException
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -766,8 +766,7 @@ public PartitionUpdate deserialize(DataInputPlus in, int version, SerializationH
}
}

private PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag)
throws IOException
private static PartitionUpdate deserialize30(DataInputPlus in, int version, SerializationHelper.Flag flag) throws IOException
{
CFMetaData metadata = CFMetaData.serializer.deserialize(in, version);
UnfilteredRowIteratorSerializer.Header header = UnfilteredRowIteratorSerializer.serializer.deserializeHeader(in, version, metadata, flag);
Expand Down Expand Up @@ -803,7 +802,7 @@ private PartitionUpdate deserialize30(DataInputPlus in, int version, Serializati
false);
}

private CFMetaData deserializeMetadata(DataInputPlus in, int version) throws IOException
private static CFMetaData deserializeMetadata(DataInputPlus in, int version) throws IOException
{
// This is only used in mutation, and mutation have never allowed "null" column families
boolean present = in.readBoolean();
Expand All @@ -813,7 +812,7 @@ private CFMetaData deserializeMetadata(DataInputPlus in, int version) throws IOE
return metadata;
}

private PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, CFMetaData metadata, DecoratedKey dk) throws IOException
private static PartitionUpdate deserializePre30(DataInputPlus in, int version, SerializationHelper.Flag flag, CFMetaData metadata, DecoratedKey dk) throws IOException
{
LegacyLayout.LegacyDeletionInfo info = LegacyLayout.LegacyDeletionInfo.serializer.deserialize(metadata, in, version);
int size = in.readInt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,6 @@ public AbstractType<?> getTokenValidator()
return BytesType.instance;
}

@Override
public AbstractType<?> partitionOrdering()
{
return BytesType.instance;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,7 @@ private static void addTableToKeyspace(KeyspaceMetadata keyspace, CFMetaData tab
*/
public Builder withPartitioner(IPartitioner partitioner)
{
this.schema = schema.copyWithPartitioner(partitioner);
this.schema = schema.copy(partitioner);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,24 @@
import com.google.common.collect.Ordering;
import com.google.common.primitives.Longs;
import com.google.common.util.concurrent.RateLimiter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.clearspring.analytics.stream.cardinality.CardinalityMergeException;
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus;
import com.clearspring.analytics.stream.cardinality.ICardinality;

import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.cache.InstrumentingCache;
import org.apache.cassandra.cache.KeyCacheKey;
import org.apache.cassandra.concurrent.DebuggableThreadPoolExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.db.commitlog.ReplayPosition;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.lifecycle.TransactionLogs;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.dht.*;
import org.apache.cassandra.io.FSError;
import org.apache.cassandra.io.compress.CompressionMetadata;
Expand All @@ -59,10 +60,6 @@
import org.apache.cassandra.service.CacheService;
import org.apache.cassandra.utils.*;
import org.apache.cassandra.utils.concurrent.OpOrder;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.utils.concurrent.Ref;
import org.apache.cassandra.utils.concurrent.SelfRefCounted;

Expand Down Expand Up @@ -1292,8 +1289,8 @@ public CompressionMetadata getCompressionMetadata()

CompressionMetadata cmd = ((ICompressedFile) dfile).getMetadata();

//We need the parent cf metadata
String cfName = metadata.isSecondaryIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
// We need the parent cf metadata
String cfName = metadata.isIndex() ? metadata.getParentColumnFamilyName() : metadata.cfName;
cmd.parameters.setLiveMetadata(Schema.instance.getCFMetaData(metadata.ksName, cfName));

return cmd;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void doVerb(final MessageIn<RepairMessage> message, final int id)
public boolean apply(SSTableReader sstable)
{
return sstable != null &&
!(sstable.metadata.isIndex()) && // exclude SSTables from 2i
!sstable.metadata.isIndex() && // exclude SSTables from 2i
new Bounds<>(sstable.first.getToken(), sstable.last.getToken()).intersects(Collections.singleton(repairingRange));
}
}, true); //ephemeral snapshot, if repair fails, it will be cleaned next startup
Expand Down
15 changes: 10 additions & 5 deletions src/java/org/apache/cassandra/schema/LegacySchemaMigrator.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.rows.RowIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterators;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.compress.CompressionParameters;
import org.apache.cassandra.utils.ByteBufferUtil;
Expand Down Expand Up @@ -307,10 +306,16 @@ private static CFMetaData decodeTableMetadata(UntypedResultSet.Row tableRow,
defaultValidator);
}

// The legacy schema did not have views, so we know that we are not loading a materialized view
boolean isMaterializedView = false;
IPartitioner partitioner = tableRow.has("partitioner") ? FBUtilities.newPartitioner(tableRow.getString("partitioner")) : DatabaseDescriptor.getPartitioner();
CFMetaData cfm = CFMetaData.create(ksName, cfName, cfId, isDense, isCompound, isSuper, isCounter, isMaterializedView, columnDefs, partitioner);
CFMetaData cfm = CFMetaData.create(ksName,
cfName,
cfId,
isDense,
isCompound,
isSuper,
isCounter,
false, // legacy schema did not contain views
columnDefs,
DatabaseDescriptor.getPartitioner());

cfm.readRepairChance(tableRow.getDouble("read_repair_chance"));
cfm.dcLocalReadRepairChance(tableRow.getDouble("local_read_repair_chance"));
Expand Down
17 changes: 11 additions & 6 deletions src/java/org/apache/cassandra/schema/SchemaKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.partitions.*;
import org.apache.cassandra.db.rows.*;
import org.apache.cassandra.dht.IPartitioner;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.compress.CompressionParameters;
Expand Down Expand Up @@ -113,7 +112,6 @@ private SchemaKeyspace()
+ "min_index_interval int,"
+ "read_repair_chance double,"
+ "speculative_retry text,"
+ "partitioner text,"
+ "PRIMARY KEY ((keyspace_name), table_name))");

private static final CFMetaData Columns =
Expand Down Expand Up @@ -703,7 +701,7 @@ public static Mutation makeCreateKeyspaceMutation(KeyspaceMetadata keyspace, lon
public static Mutation makeDropKeyspaceMutation(KeyspaceMetadata keyspace, long timestamp)
{
int nowInSec = FBUtilities.nowInSeconds();
Mutation mutation = new Mutation(NAME, All.get(0).decorateKey(getSchemaKSKey(keyspace.name)));
Mutation mutation = new Mutation(NAME, Keyspaces.decorateKey(getSchemaKSKey(keyspace.name)));

for (CFMetaData schemaTable : All)
mutation.add(PartitionUpdate.fullPartitionDelete(schemaTable, mutation.key(), timestamp, nowInSec));
Expand Down Expand Up @@ -834,7 +832,6 @@ static void addTableToSchemaMutation(CFMetaData table, long timestamp, boolean w
.add("min_index_interval", table.getMinIndexInterval())
.add("read_repair_chance", table.getReadRepairChance())
.add("speculative_retry", table.getSpeculativeRetry().toString())
.add("partitioner", table.partitioner.getClass().getCanonicalName())
.map("caching", table.getCaching().asMap())
.map("compaction", buildCompactionMap(table))
.map("compression", table.compressionParameters().asMap())
Expand Down Expand Up @@ -1081,7 +1078,6 @@ public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row
String keyspace = row.getString("keyspace_name");
String table = row.getString("table_name");
UUID id = row.getUUID("id");
IPartitioner partitioner = row.has("partitioner") ? FBUtilities.newPartitioner(row.getString("partitioner")) : DatabaseDescriptor.getPartitioner();

Set<CFMetaData.Flag> flags = row.has("flags")
? flagsFromStrings(row.getSet("flags", UTF8Type.instance))
Expand All @@ -1093,7 +1089,16 @@ public static CFMetaData createTableFromTableRowAndColumns(UntypedResultSet.Row
boolean isCompound = flags.contains(CFMetaData.Flag.COMPOUND);
boolean isMaterializedView = flags.contains(CFMetaData.Flag.MATERIALIZEDVIEW);

CFMetaData cfm = CFMetaData.create(keyspace, table, id, isDense, isCompound, isSuper, isCounter, isMaterializedView, columns, partitioner);
CFMetaData cfm = CFMetaData.create(keyspace,
table,
id,
isDense,
isCompound,
isSuper,
isCounter,
isMaterializedView,
columns,
DatabaseDescriptor.getPartitioner());

Map<String, String> compaction = new HashMap<>(row.getTextMap("compaction"));
Class<? extends AbstractCompactionStrategy> compactionStrategyClass =
Expand Down
1 change: 0 additions & 1 deletion src/java/org/apache/cassandra/thrift/CassandraServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import com.google.common.base.Joiner;
import com.google.common.collect.*;
import com.google.common.primitives.Longs;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
12 changes: 10 additions & 2 deletions src/java/org/apache/cassandra/utils/NativeSSTableLoaderClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -128,8 +128,16 @@ private static Map<String, CFMetaData> fetchTablesMetadata(String keyspace, Sess
for (Row colRow : session.execute(columnsQuery, keyspace, name))
defs.add(createDefinitionFromRow(colRow, keyspace, name));

IPartitioner partitioner = !row.isNull("partitioner") ? FBUtilities.newPartitioner(row.getString("partitioner")) : DatabaseDescriptor.getPartitioner();
tables.put(name, CFMetaData.create(keyspace, name, id, isDense, isCompound, isSuper, isCounter, isMaterializedView, defs, partitioner));
tables.put(name, CFMetaData.create(keyspace,
name,
id,
isDense,
isCompound,
isSuper,
isCounter,
isMaterializedView,
defs,
DatabaseDescriptor.getPartitioner()));
}

return tables;
Expand Down
4 changes: 2 additions & 2 deletions test/unit/org/apache/cassandra/db/ScrubTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ public static void defineSchema() throws ConfigurationException
SchemaLoader.standardCFMD(KEYSPACE, CF_UUID, 0, UUIDType.instance),
SchemaLoader.keysIndexCFMD(KEYSPACE, CF_INDEX1, true),
SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2, true),
SchemaLoader.keysIndexCFMD(KEYSPACE, CF_INDEX1_BYTEORDERED, true).copyWithPartitioner(ByteOrderedPartitioner.instance),
SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2_BYTEORDERED, true).copyWithPartitioner(ByteOrderedPartitioner.instance));
SchemaLoader.keysIndexCFMD(KEYSPACE, CF_INDEX1_BYTEORDERED, true).copy(ByteOrderedPartitioner.instance),
SchemaLoader.compositeIndexCFMD(KEYSPACE, CF_INDEX2_BYTEORDERED, true).copy(ByteOrderedPartitioner.instance));
}

@Test
Expand Down

0 comments on commit 40b0d7b

Please sign in to comment.