Permalink
Browse files

Make it possible to cache a configurable amount of rows in row cache.

Patch by pcmanus and marcuse, reviewed by pcmanus for CASSANDRA-5357
  • Loading branch information...
1 parent 98e6b08 commit ab6eaed8f6bdef21323be561a22e9fdb16bbd0fc @krummas krummas committed Feb 6, 2014
@@ -55,7 +55,7 @@ namespace rb CassandraThrift
# An effort should be made not to break forward-client-compatibility either
# (e.g. one should avoid removing obsolete fields from the IDL), but no
# guarantees in this respect are made by the Cassandra project.
-const string VERSION = "19.38.0"
+const string VERSION = "19.38.1"
#
@@ -473,6 +473,7 @@ struct CfDef {
41: optional i32 index_interval,
42: optional string speculative_retry="NONE",
43: optional list<TriggerDef> triggers,
+ 44: optional string cells_per_row_to_cache = "100",
/* All of the following are now ignored and unsupplied. */

Large diffs are not rendered by default.

Oops, something went wrong.

Some generated files are not rendered by default. Learn more.

Oops, something went wrong.
@@ -63,6 +63,7 @@ class Cql3ParsingRuleSet(CqlParsingRuleSet):
columnfamily_layout_options = (
('bloom_filter_fp_chance', None),
('caching', None),
+ ('rows_per_partition_to_cache', None),
('comment', None),
('dclocal_read_repair_chance', 'local_read_repair_chance'),
('gc_grace_seconds', None),
@@ -140,6 +140,7 @@ public static String getFunctionNames()
INDEX_INTERVAL,
MEMTABLE_FLUSH_PERIOD_IN_MS,
CACHING,
+ CELLS_PER_ROW_TO_CACHE,
DEFAULT_TIME_TO_LIVE,
SPECULATIVE_RETRY,
POPULATE_IO_CACHE_ON_FLUSH,
@@ -1348,6 +1349,9 @@ private CfDef updateCfDefAttributes(Tree statement, CfDef cfDefToUpdate)
case CACHING:
cfDef.setCaching(CliUtils.unescapeSQLString(mValue));
break;
+ case CELLS_PER_ROW_TO_CACHE:
+ cfDef.setCells_per_row_to_cache(mValue);
+ break;
case DEFAULT_TIME_TO_LIVE:
cfDef.setDefault_time_to_live(Integer.parseInt(mValue));
break;
@@ -1819,6 +1823,7 @@ private void showColumnFamily(PrintStream output, CfDef cfDef)
writeAttr(output, false, "replicate_on_write", cfDef.replicate_on_write);
writeAttr(output, false, "compaction_strategy", cfDef.compaction_strategy);
writeAttr(output, false, "caching", cfDef.caching);
+ writeAttr(output, false, "cells_per_row_to_cache", cfDef.cells_per_row_to_cache);
writeAttr(output, false, "default_time_to_live", cfDef.default_time_to_live);
writeAttr(output, false, "speculative_retry", cfDef.speculative_retry);
@@ -84,6 +84,7 @@
public final static SpeculativeRetry DEFAULT_SPECULATIVE_RETRY = new SpeculativeRetry(SpeculativeRetry.RetryType.PERCENTILE, 0.99);
public final static int DEFAULT_INDEX_INTERVAL = 128;
public final static boolean DEFAULT_POPULATE_IO_CACHE_ON_FLUSH = false;
+ public final static RowsPerPartitionToCache DEFAULT_ROWS_PER_PARTITION_TO_CACHE = new RowsPerPartitionToCache(100, RowsPerPartitionToCache.Type.HEAD);
// Note that this is the default only for user created tables
public final static String DEFAULT_COMPRESSOR = LZ4Compressor.class.getCanonicalName();
@@ -145,6 +146,7 @@ public int compare(ColumnDefinition def1, ColumnDefinition def2)
+ "populate_io_cache_on_flush boolean,"
+ "index_interval int,"
+ "dropped_columns map<text, bigint>,"
+ + "rows_per_partition_to_cache text,"
+ "PRIMARY KEY (keyspace_name, columnfamily_name)"
+ ") WITH COMMENT='ColumnFamily definitions' AND gc_grace_seconds=8640");
@@ -311,6 +313,49 @@ public static Caching fromString(String cache) throws ConfigurationException
}
}
+ public static class RowsPerPartitionToCache
+ {
+ public enum Type
+ {
+ ALL, HEAD
+ }
+ public final int rowsToCache;
+ public final Type type;
+
+ private RowsPerPartitionToCache(int rowsToCache, Type type)
+ {
+ this.rowsToCache = rowsToCache;
+ this.type = type;
+ }
+
+ public static RowsPerPartitionToCache fromString(String rpptc)
+ {
+ if (rpptc.equalsIgnoreCase("all"))
+ return new RowsPerPartitionToCache(Integer.MAX_VALUE, Type.ALL);
+ return new RowsPerPartitionToCache(Integer.parseInt(rpptc), Type.HEAD);
+ }
+
+ public boolean cacheFullPartitions()
+ {
+ return type == Type.ALL;
+ }
+
+ public String toString()
+ {
+ if (rowsToCache == Integer.MAX_VALUE)
+ return "ALL";
+ return String.valueOf(rowsToCache);
+ }
+
+ public boolean equals(Object rhs)
+ {
+ if (!(rhs instanceof RowsPerPartitionToCache))
+ return false;
+ RowsPerPartitionToCache rppc = (RowsPerPartitionToCache)rhs;
+ return rowsToCache == rppc.rowsToCache && type == rppc.type;
+ }
+ }
+
public static class SpeculativeRetry
{
public enum RetryType
@@ -407,7 +452,7 @@ public String toString()
private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
private volatile boolean isPurged = false;
-
+ private volatile RowsPerPartitionToCache rowsPerPartitionToCache = DEFAULT_ROWS_PER_PARTITION_TO_CACHE;
/*
* All CQL3 columns definition are stored in the columnMetadata map.
* On top of that, we keep separated collection of each kind of definition, to
@@ -450,6 +495,7 @@ public String toString()
public CFMetaData populateIoCacheOnFlush(boolean prop) {populateIoCacheOnFlush = prop; return this;}
public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;}
public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;}
+ public CFMetaData rowsPerPartitionToCache(RowsPerPartitionToCache prop) { rowsPerPartitionToCache = prop; return this; }
/**
* Create new ColumnFamily metadata with generated random ID.
@@ -636,6 +682,7 @@ static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
.populateIoCacheOnFlush(oldCFMD.populateIoCacheOnFlush)
.droppedColumns(new HashMap<>(oldCFMD.droppedColumns))
.triggers(new HashMap<>(oldCFMD.triggers))
+ .rowsPerPartitionToCache(oldCFMD.rowsPerPartitionToCache)
.rebuild();
}
@@ -813,6 +860,11 @@ public Caching getCaching()
return caching;
}
+ public RowsPerPartitionToCache getRowsPerPartitionToCache()
+ {
+ return rowsPerPartitionToCache;
+ }
+
public int getIndexInterval()
{
return indexInterval;
@@ -876,6 +928,7 @@ else if (obj == null || obj.getClass() != getClass())
if (populateIoCacheOnFlush != rhs.populateIoCacheOnFlush) return false;
if (Objects.equal(droppedColumns, rhs.droppedColumns)) return false;
if (Objects.equal(triggers, rhs.triggers)) return false;
+ if (!Objects.equal(rowsPerPartitionToCache, rhs.rowsPerPartitionToCache)) return false;
return true;
}
@@ -908,6 +961,7 @@ public int hashCode()
.append(populateIoCacheOnFlush)
.append(droppedColumns)
.append(triggers)
+ .append(rowsPerPartitionToCache)
.toHashCode();
}
@@ -964,6 +1018,8 @@ public static void applyImplicitDefaults(org.apache.cassandra.thrift.CfDef cf_de
cf_def.setDefault_time_to_live(CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE);
if (!cf_def.isSetDclocal_read_repair_chance())
cf_def.setDclocal_read_repair_chance(CFMetaData.DEFAULT_DCLOCAL_READ_REPAIR_CHANCE);
+ if (!cf_def.isSetCells_per_row_to_cache())
+ cf_def.setCells_per_row_to_cache(CFMetaData.DEFAULT_ROWS_PER_PARTITION_TO_CACHE.toString());
}
public static CFMetaData fromThrift(org.apache.cassandra.thrift.CfDef cf_def) throws InvalidRequestException, ConfigurationException
@@ -1022,6 +1078,8 @@ public static CFMetaData fromThrift(org.apache.cassandra.thrift.CfDef cf_def) th
newCFMD.populateIoCacheOnFlush(cf_def.populate_io_cache_on_flush);
if (cf_def.isSetTriggers())
newCFMD.triggers(TriggerDefinition.fromThrift(cf_def.triggers));
+ if (cf_def.isSetCells_per_row_to_cache())
+ newCFMD.rowsPerPartitionToCache(RowsPerPartitionToCache.fromString(cf_def.cells_per_row_to_cache));
CompressionParameters cp = CompressionParameters.create(cf_def.compression_options);
@@ -1117,6 +1175,7 @@ void apply(CFMetaData cfm) throws ConfigurationException
bloomFilterFpChance = cfm.bloomFilterFpChance;
memtableFlushPeriod = cfm.memtableFlushPeriod;
caching = cfm.caching;
+ rowsPerPartitionToCache = cfm.rowsPerPartitionToCache;
defaultTimeToLive = cfm.defaultTimeToLive;
speculativeRetry = cfm.speculativeRetry;
populateIoCacheOnFlush = cfm.populateIoCacheOnFlush;
@@ -1265,6 +1324,7 @@ public AbstractCompactionStrategy createCompactionStrategyInstance(ColumnFamilyS
def.setIndex_interval(indexInterval);
def.setMemtable_flush_period_in_ms(memtableFlushPeriod);
def.setCaching(caching.toString());
+ def.setCells_per_row_to_cache(rowsPerPartitionToCache.toString());
def.setDefault_time_to_live(defaultTimeToLive);
def.setSpeculative_retry(speculativeRetry.toString());
def.setTriggers(TriggerDefinition.toThrift(triggers));
@@ -1619,6 +1679,7 @@ private void toSchemaNoColumnsNoTriggers(Mutation mutation, long timestamp)
adder.add("memtable_flush_period_in_ms", memtableFlushPeriod);
adder.add("caching", caching.toString());
+ adder.add("rows_per_partition_to_cache", rowsPerPartitionToCache.toString());
adder.add("default_time_to_live", defaultTimeToLive);
adder.add("compaction_strategy_class", compactionStrategyClass.getName());
adder.add("compression_parameters", json(compressionParameters.asThriftOptions()));
@@ -1684,6 +1745,8 @@ static CFMetaData fromSchemaNoTriggers(UntypedResultSet.Row result, UntypedResul
if (result.has("memtable_flush_period_in_ms"))
cfm.memtableFlushPeriod(result.getInt("memtable_flush_period_in_ms"));
cfm.caching(Caching.valueOf(result.getString("caching")));
+ if (result.has("rows_per_partition_to_cache"))
+ cfm.rowsPerPartitionToCache(RowsPerPartitionToCache.fromString(result.getString("rows_per_partition_to_cache")));
if (result.has("default_time_to_live"))
cfm.defaultTimeToLive(result.getInt("default_time_to_live"));
if (result.has("speculative_retry"))
@@ -2163,6 +2226,7 @@ public String toString()
.append("bloomFilterFpChance", bloomFilterFpChance)
.append("memtable_flush_period_in_ms", memtableFlushPeriod)
.append("caching", caching)
+ .append("rowsPerPartitionToCache", rowsPerPartitionToCache)
.append("defaultTimeToLive", defaultTimeToLive)
.append("speculative_retry", speculativeRetry)
.append("indexInterval", indexInterval)
@@ -180,6 +180,7 @@ public static void applyPropertiesToCFMetadata(CFMetaData cfm, CFPropDefs cfProp
cfm.minCompactionThreshold(minCompactionThreshold);
cfm.maxCompactionThreshold(maxCompactionThreshold);
cfm.caching(CFMetaData.Caching.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString())));
+ cfm.rowsPerPartitionToCache(CFMetaData.RowsPerPartitionToCache.fromString(cfProps.getPropertyString(CFPropDefs.KW_ROWS_PER_PARTITION_TO_CACHE, cfm.getRowsPerPartitionToCache().toString())));
cfm.defaultTimeToLive(cfProps.getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(cfProps.getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
cfm.populateIoCacheOnFlush(cfProps.getPropertyBoolean(CFPropDefs.KW_POPULATE_IO_CACHE_ON_FLUSH, cfm.populateIoCacheOnFlush()));
@@ -49,6 +49,7 @@
public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_compaction_threshold";
public static final String KW_COMPACTION_STRATEGY_CLASS = "compaction_strategy_class";
public static final String KW_CACHING = "caching";
+ public static final String KW_ROWS_PER_PARTITION_TO_CACHE = "rows_per_partition_to_cache";
public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
public static final String KW_SPECULATIVE_RETRY = "speculative_retry";
public static final String KW_POPULATE_IO_CACHE_ON_FLUSH = "populate_io_cache_on_flush";
@@ -91,6 +92,7 @@
keywords.add(KW_MAXCOMPACTIONTHRESHOLD);
keywords.add(KW_COMPACTION_STRATEGY_CLASS);
keywords.add(KW_CACHING);
+ keywords.add(KW_ROWS_PER_PARTITION_TO_CACHE);
keywords.add(KW_DEFAULT_TIME_TO_LIVE);
keywords.add(KW_SPECULATIVE_RETRY);
keywords.add(KW_POPULATE_IO_CACHE_ON_FLUSH);
@@ -197,6 +197,7 @@ public CFMetaData getCFMetaData(String keyspace, List<ByteBuffer> variables) thr
.compactionStrategyOptions(cfProps.compactionStrategyOptions)
.compressionParameters(CompressionParameters.create(cfProps.compressionParameters))
.caching(CFMetaData.Caching.fromString(getPropertyString(CFPropDefs.KW_CACHING, CFMetaData.DEFAULT_CACHING_STRATEGY.toString())))
+ .rowsPerPartitionToCache(CFMetaData.RowsPerPartitionToCache.fromString(cfProps.getPropertyString(CFPropDefs.KW_ROWS_PER_PARTITION_TO_CACHE, CFMetaData.DEFAULT_ROWS_PER_PARTITION_TO_CACHE.toString())))
.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, CFMetaData.DEFAULT_SPECULATIVE_RETRY.toString())))
.bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null))
.memtableFlushPeriod(getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, 0))
@@ -25,6 +25,7 @@
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.io.compress.CompressionParameters;
+import org.apache.cassandra.service.CacheService;
public class CFPropDefs extends PropertyDefinitions
{
@@ -35,6 +36,7 @@
public static final String KW_MINCOMPACTIONTHRESHOLD = "min_threshold";
public static final String KW_MAXCOMPACTIONTHRESHOLD = "max_threshold";
public static final String KW_CACHING = "caching";
+ public static final String KW_ROWS_PER_PARTITION_TO_CACHE = "rows_per_partition_to_cache";
public static final String KW_DEFAULT_TIME_TO_LIVE = "default_time_to_live";
public static final String KW_INDEX_INTERVAL = "index_interval";
public static final String KW_SPECULATIVE_RETRY = "speculative_retry";
@@ -57,6 +59,7 @@
keywords.add(KW_DCLOCALREADREPAIRCHANCE);
keywords.add(KW_GCGRACESECONDS);
keywords.add(KW_CACHING);
+ keywords.add(KW_ROWS_PER_PARTITION_TO_CACHE);
keywords.add(KW_DEFAULT_TIME_TO_LIVE);
keywords.add(KW_INDEX_INTERVAL);
keywords.add(KW_SPECULATIVE_RETRY);
@@ -153,6 +156,11 @@ public void applyToCFMetadata(CFMetaData cfm) throws ConfigurationException, Syn
cfm.minCompactionThreshold(minCompactionThreshold);
cfm.maxCompactionThreshold(maxCompactionThreshold);
cfm.caching(CFMetaData.Caching.fromString(getString(KW_CACHING, cfm.getCaching().toString())));
+ CFMetaData.RowsPerPartitionToCache newRppc = CFMetaData.RowsPerPartitionToCache.fromString(getString(KW_ROWS_PER_PARTITION_TO_CACHE, cfm.getRowsPerPartitionToCache().toString()));
+ // we need to invalidate row cache if the amount of rows cached changes, otherwise we might serve out bad data.
+ if (!cfm.getRowsPerPartitionToCache().equals(newRppc))
+ CacheService.instance.invalidateRowCacheForCf(cfm.cfId);
+ cfm.rowsPerPartitionToCache(newRppc);
cfm.defaultTimeToLive(getInt(KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getString(KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
cfm.memtableFlushPeriod(getInt(KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));
@@ -42,6 +42,7 @@
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.composites.CellNames;
+import org.apache.cassandra.db.filter.ColumnCounter;
import org.apache.cassandra.db.filter.ColumnSlice;
import org.apache.cassandra.io.sstable.ColumnNameHelper;
import org.apache.cassandra.io.sstable.ColumnStats;
@@ -87,6 +88,14 @@ public ColumnFamilyType getType()
return metadata.cfType;
}
+ public int liveCQL3RowCount(long now)
+ {
+ ColumnCounter counter = getComparator().isDense()
+ ? new ColumnCounter(now)
+ : new ColumnCounter.GroupByPrefix(now, getComparator(), metadata.clusteringColumns().size());
+ return counter.countAll(this).live();
+ }
+
/**
* Clones the column map.
*/
Oops, something went wrong.

0 comments on commit ab6eaed

Please sign in to comment.