Skip to content

Commit

Permalink
Preemptive open of compaction results
Browse files Browse the repository at this point in the history
Patch by benedict; reviewed by marcuse for CASSANDRA-6916
  • Loading branch information
belliottsmith authored and krummas committed Apr 23, 2014
1 parent b3a225e commit 4e95953
Show file tree
Hide file tree
Showing 65 changed files with 1,345 additions and 682 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
* Require nodetool rebuild_index to specify index names (CASSANDRA-7038)
* fix cassandra stress errors on reads with native protocol (CASSANDRA-7033)
* Use OpOrder to guard sstable references for reads (CASSANDRA-6919)
* Preemptive opening of compaction result (CASSANDRA-6916)
Merged from 2.0:
* Use LOCAL_QUORUM for data reads at LOCAL_SERIAL (CASSANDRA-6939)
* Log a warning for large batches (CASSANDRA-6487)
Expand Down
15 changes: 5 additions & 10 deletions conf/cassandra.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -511,10 +511,11 @@ in_memory_compaction_limit_in_mb: 64
# of compaction, including validation compaction.
compaction_throughput_mb_per_sec: 16

# Track cached row keys during compaction, and re-cache their new
# positions in the compacted sstable. Disable if you use really large
# key caches.
compaction_preheat_key_cache: true
# When compacting, the replacement sstable(s) can be opened before they
# are completely written, and used in place of the prior sstables for
# any range that has been written. This helps to smoothly transfer reads
# between the sstables, reducing page cache churn and keeping hot rows hot
sstable_preemptive_open_interval_in_mb: 50

# Throttles all outbound streaming file transfers on this node to the
# given total throughput in Mbps. This is necessary because Cassandra does
Expand Down Expand Up @@ -730,9 +731,3 @@ internode_compression: all
# reducing overhead from the TCP protocol itself, at the cost of increasing
# latency if you block for cross-datacenter responses.
inter_dc_tcp_nodelay: false

# Enable or disable kernel page cache preheating from contents of the key cache after compaction.
# When enabled it would preheat only first "page" (4KB) of each row to optimize
# for sequential access. Note: This could be harmful for fat rows, see CASSANDRA-4937
# for further details on that topic.
preheat_kernel_page_cache: false
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/cache/AutoSavingCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ private SequentialWriter tempCacheFile(CacheKey.PathInfo pathInfo)
{
File path = getCachePath(pathInfo.keyspace, pathInfo.columnFamily, pathInfo.cfId, CURRENT_VERSION);
File tmpFile = FileUtils.createTempFile(path.getName(), null, path.getParentFile());
return SequentialWriter.open(tmpFile, true);
return SequentialWriter.open(tmpFile);
}

private void deleteOldCacheFiles()
Expand Down
15 changes: 14 additions & 1 deletion src/java/org/apache/cassandra/cache/RefCountedMemory.java
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,19 @@ public boolean reference()
public void unreference()
{
if (UPDATER.decrementAndGet(this) == 0)
free();
super.free();
}

public RefCountedMemory copy(long newSize)
{
RefCountedMemory copy = new RefCountedMemory(newSize);
copy.put(0, this, 0, Math.min(size(), newSize));
return copy;
}

public void free()
{
throw new AssertionError();
}

}
78 changes: 50 additions & 28 deletions src/java/org/apache/cassandra/config/CFMetaData.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,20 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.UUID;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Objects;
Expand All @@ -37,16 +50,44 @@
import org.slf4j.LoggerFactory;

import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.cql3.*;
import org.apache.cassandra.cql3.ColumnIdentifier;
import org.apache.cassandra.cql3.QueryProcessor;
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.cql3.statements.CFStatement;
import org.apache.cassandra.cql3.statements.CreateTableStatement;
import org.apache.cassandra.db.*;
import org.apache.cassandra.db.AtomDeserializer;
import org.apache.cassandra.db.CFRowAdder;
import org.apache.cassandra.db.Cell;
import org.apache.cassandra.db.ColumnFamily;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ColumnFamilyType;
import org.apache.cassandra.db.ColumnSerializer;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.Mutation;
import org.apache.cassandra.db.OnDiskAtom;
import org.apache.cassandra.db.RangeTombstone;
import org.apache.cassandra.db.Row;
import org.apache.cassandra.db.SuperColumns;
import org.apache.cassandra.db.SystemKeyspace;
import org.apache.cassandra.db.compaction.AbstractCompactionStrategy;
import org.apache.cassandra.db.compaction.LeveledCompactionStrategy;
import org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy;
import org.apache.cassandra.db.composites.*;
import org.apache.cassandra.db.composites.CType;
import org.apache.cassandra.db.composites.CellName;
import org.apache.cassandra.db.composites.CellNameType;
import org.apache.cassandra.db.composites.CellNames;
import org.apache.cassandra.db.composites.Composite;
import org.apache.cassandra.db.composites.CompoundCType;
import org.apache.cassandra.db.composites.SimpleCType;
import org.apache.cassandra.db.index.SecondaryIndex;
import org.apache.cassandra.db.marshal.*;
import org.apache.cassandra.db.marshal.AbstractType;
import org.apache.cassandra.db.marshal.BytesType;
import org.apache.cassandra.db.marshal.CompositeType;
import org.apache.cassandra.db.marshal.CounterColumnType;
import org.apache.cassandra.db.marshal.LongType;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.db.marshal.UTF8Type;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestValidationException;
Expand All @@ -55,14 +96,16 @@
import org.apache.cassandra.io.compress.LZ4Compressor;
import org.apache.cassandra.io.sstable.Descriptor;
import org.apache.cassandra.serializers.MarshalException;
import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.thrift.CqlResult;
import org.apache.cassandra.thrift.CqlRow;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.cassandra.utils.FBUtilities;
import org.apache.cassandra.utils.UUIDGen;

import static org.apache.cassandra.utils.FBUtilities.*;
import static org.apache.cassandra.utils.FBUtilities.fromJsonList;
import static org.apache.cassandra.utils.FBUtilities.fromJsonMap;
import static org.apache.cassandra.utils.FBUtilities.json;

/**
* This class can be tricky to modify. Please read http://wiki.apache.org/cassandra/ConfigurationNotes for how to do so safely.
Expand All @@ -82,7 +125,6 @@ public final class CFMetaData
public final static SpeculativeRetry DEFAULT_SPECULATIVE_RETRY = new SpeculativeRetry(SpeculativeRetry.RetryType.PERCENTILE, 0.99);
public final static int DEFAULT_MIN_INDEX_INTERVAL = 128;
public final static int DEFAULT_MAX_INDEX_INTERVAL = 2048;
public final static boolean DEFAULT_POPULATE_IO_CACHE_ON_FLUSH = false;

// Note that this is the default only for user created tables
public final static String DEFAULT_COMPRESSOR = LZ4Compressor.class.getCanonicalName();
Expand Down Expand Up @@ -397,7 +439,6 @@ public String toString()
private volatile int memtableFlushPeriod = 0;
private volatile int defaultTimeToLive = DEFAULT_DEFAULT_TIME_TO_LIVE;
private volatile SpeculativeRetry speculativeRetry = DEFAULT_SPECULATIVE_RETRY;
private volatile boolean populateIoCacheOnFlush = DEFAULT_POPULATE_IO_CACHE_ON_FLUSH;
private volatile Map<ColumnIdentifier, Long> droppedColumns = new HashMap<>();
private volatile Map<String, TriggerDefinition> triggers = new HashMap<>();
private volatile boolean isPurged = false;
Expand Down Expand Up @@ -443,7 +484,6 @@ public String toString()
public CFMetaData memtableFlushPeriod(int prop) {memtableFlushPeriod = prop; return this;}
public CFMetaData defaultTimeToLive(int prop) {defaultTimeToLive = prop; return this;}
public CFMetaData speculativeRetry(SpeculativeRetry prop) {speculativeRetry = prop; return this;}
public CFMetaData populateIoCacheOnFlush(boolean prop) {populateIoCacheOnFlush = prop; return this;}
public CFMetaData droppedColumns(Map<ColumnIdentifier, Long> cols) {droppedColumns = cols; return this;}
public CFMetaData triggers(Map<String, TriggerDefinition> prop) {triggers = prop; return this;}

Expand Down Expand Up @@ -621,7 +661,6 @@ static CFMetaData copyOpts(CFMetaData newCFMD, CFMetaData oldCFMD)
.maxIndexInterval(oldCFMD.maxIndexInterval)
.speculativeRetry(oldCFMD.speculativeRetry)
.memtableFlushPeriod(oldCFMD.memtableFlushPeriod)
.populateIoCacheOnFlush(oldCFMD.populateIoCacheOnFlush)
.droppedColumns(new HashMap<>(oldCFMD.droppedColumns))
.triggers(new HashMap<>(oldCFMD.triggers))
.rebuild();
Expand Down Expand Up @@ -673,11 +712,6 @@ public ReadRepairDecision newReadRepairDecision()
return ReadRepairDecision.NONE;
}

public boolean populateIoCacheOnFlush()
{
return populateIoCacheOnFlush;
}

public int getGcGraceSeconds()
{
return gcGraceSeconds;
Expand Down Expand Up @@ -880,7 +914,6 @@ public boolean equals(Object o)
&& Objects.equal(minIndexInterval, other.minIndexInterval)
&& Objects.equal(maxIndexInterval, other.maxIndexInterval)
&& Objects.equal(speculativeRetry, other.speculativeRetry)
&& Objects.equal(populateIoCacheOnFlush, other.populateIoCacheOnFlush)
&& Objects.equal(droppedColumns, other.droppedColumns)
&& Objects.equal(triggers, other.triggers);
}
Expand Down Expand Up @@ -913,7 +946,6 @@ public int hashCode()
.append(minIndexInterval)
.append(maxIndexInterval)
.append(speculativeRetry)
.append(populateIoCacheOnFlush)
.append(droppedColumns)
.append(triggers)
.toHashCode();
Expand All @@ -930,8 +962,6 @@ public static void applyImplicitDefaults(org.apache.cassandra.thrift.CfDef cf_de
{
if (!cf_def.isSetComment())
cf_def.setComment("");
if (!cf_def.isSetPopulate_io_cache_on_flush())
cf_def.setPopulate_io_cache_on_flush(CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH);
if (!cf_def.isSetMin_compaction_threshold())
cf_def.setMin_compaction_threshold(CFMetaData.DEFAULT_MIN_COMPACTION_THRESHOLD);
if (!cf_def.isSetMax_compaction_threshold())
Expand Down Expand Up @@ -1023,7 +1053,6 @@ public static CFMetaData fromThrift(org.apache.cassandra.thrift.CfDef cf_def) th
if (cf_def.isSetSpeculative_retry())
newCFMD.speculativeRetry(SpeculativeRetry.fromString(cf_def.speculative_retry));
if (cf_def.isSetPopulate_io_cache_on_flush())
newCFMD.populateIoCacheOnFlush(cf_def.populate_io_cache_on_flush);
if (cf_def.isSetTriggers())
newCFMD.triggers(TriggerDefinition.fromThrift(cf_def.triggers));

Expand Down Expand Up @@ -1125,7 +1154,6 @@ void apply(CFMetaData cfm) throws ConfigurationException
memtableFlushPeriod = cfm.memtableFlushPeriod;
defaultTimeToLive = cfm.defaultTimeToLive;
speculativeRetry = cfm.speculativeRetry;
populateIoCacheOnFlush = cfm.populateIoCacheOnFlush;

if (!cfm.droppedColumns.isEmpty())
droppedColumns = cfm.droppedColumns;
Expand Down Expand Up @@ -1252,7 +1280,6 @@ public org.apache.cassandra.thrift.CfDef toThrift()
def.setComment(Strings.nullToEmpty(comment));
def.setRead_repair_chance(readRepairChance);
def.setDclocal_read_repair_chance(dcLocalReadRepairChance);
def.setPopulate_io_cache_on_flush(populateIoCacheOnFlush);
def.setGc_grace_seconds(gcGraceSeconds);
def.setDefault_validation_class(defaultValidator == null ? null : defaultValidator.toString());
def.setKey_validation_class(keyValidator.toString());
Expand Down Expand Up @@ -1642,7 +1669,6 @@ private void toSchemaNoColumnsNoTriggers(Mutation mutation, long timestamp)
adder.add("comment", comment);
adder.add("read_repair_chance", readRepairChance);
adder.add("local_read_repair_chance", dcLocalReadRepairChance);
adder.add("populate_io_cache_on_flush", populateIoCacheOnFlush);
adder.add("gc_grace_seconds", gcGraceSeconds);
adder.add("default_validator", defaultValidator.toString());
adder.add("key_validator", keyValidator.toString());
Expand Down Expand Up @@ -1730,9 +1756,6 @@ else if (result.has("index_interval"))
if (result.has("max_index_interval"))
cfm.maxIndexInterval(result.getInt("max_index_interval"));

if (result.has("populate_io_cache_on_flush"))
cfm.populateIoCacheOnFlush(result.getBoolean("populate_io_cache_on_flush"));

/*
* The info previously hold by key_aliases, column_aliases and value_alias is now stored in columnMetadata (because 1) this
* make more sense and 2) this allow to store indexing information).
Expand Down Expand Up @@ -2198,7 +2221,6 @@ public String toString()
.append("minIndexInterval", minIndexInterval)
.append("maxIndexInterval", maxIndexInterval)
.append("speculativeRetry", speculativeRetry)
.append("populateIoCacheOnFlush", populateIoCacheOnFlush)
.append("droppedColumns", droppedColumns)
.append("triggers", triggers)
.toString();
Expand Down
4 changes: 1 addition & 3 deletions src/java/org/apache/cassandra/config/Config.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ public class Config
public int hinted_handoff_throttle_in_kb = 1024;
public int batchlog_replay_throttle_in_kb = 1024;
public int max_hints_delivery_threads = 1;
public boolean compaction_preheat_key_cache = true;
public int sstable_preemptive_open_interval_in_mb = 50;

public volatile boolean incremental_backups = false;
public boolean trickle_fsync = false;
Expand All @@ -199,8 +199,6 @@ public class Config

private static boolean isClientMode = false;

public boolean preheat_kernel_page_cache = false;

public Integer file_cache_size_in_mb;

public boolean inter_dc_tcp_nodelay = true;
Expand Down
15 changes: 5 additions & 10 deletions src/java/org/apache/cassandra/config/DatabaseDescriptor.java
Original file line number Diff line number Diff line change
Expand Up @@ -1331,11 +1331,6 @@ public static int getMaxHintsThread()
return conf.max_hints_delivery_threads;
}

public static boolean getPreheatKeyCache()
{
return conf.compaction_preheat_key_cache;
}

public static boolean isIncrementalBackupsEnabled()
{
return conf.incremental_backups;
Expand All @@ -1356,6 +1351,11 @@ public static long getTotalCommitlogSpaceInMB()
return conf.commitlog_total_space_in_mb;
}

public static int getSSTablePreempiveOpenIntervalInMB()
{
return conf.sstable_preemptive_open_interval_in_mb;
}

public static boolean getTrickleFsync()
{
return conf.trickle_fsync;
Expand Down Expand Up @@ -1476,11 +1476,6 @@ public static boolean getInterDCTcpNoDelay()
return conf.inter_dc_tcp_nodelay;
}

public static boolean shouldPreheatPageCache()
{
return conf.preheat_kernel_page_cache;
}

public static Pool getMemtableAllocatorPool()
{
long heapLimit = ((long) conf.memtable_heap_space_in_mb) << 20;
Expand Down
18 changes: 11 additions & 7 deletions src/java/org/apache/cassandra/cql/AlterTableStatement.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,21 @@
*/
package org.apache.cassandra.cql;

import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.config.*;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.exceptions.*;
import org.apache.cassandra.io.compress.CompressionParameters;

import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;

import org.apache.cassandra.cache.CachingOptions;
import org.apache.cassandra.config.CFMetaData;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.Schema;
import org.apache.cassandra.db.marshal.TypeParser;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.exceptions.RequestValidationException;
import org.apache.cassandra.exceptions.SyntaxException;
import org.apache.cassandra.io.compress.CompressionParameters;

public class AlterTableStatement
{
public static enum OperationType
Expand Down Expand Up @@ -183,7 +188,6 @@ public static void applyPropertiesToCFMetadata(CFMetaData cfm, CFPropDefs cfProp
cfm.caching(CachingOptions.fromString(cfProps.getPropertyString(CFPropDefs.KW_CACHING, cfm.getCaching().toString())));
cfm.defaultTimeToLive(cfProps.getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, cfm.getDefaultTimeToLive()));
cfm.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(cfProps.getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, cfm.getSpeculativeRetry().toString())));
cfm.populateIoCacheOnFlush(cfProps.getPropertyBoolean(CFPropDefs.KW_POPULATE_IO_CACHE_ON_FLUSH, cfm.populateIoCacheOnFlush()));
cfm.bloomFilterFpChance(cfProps.getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, cfm.getBloomFilterFpChance()));
cfm.memtableFlushPeriod(cfProps.getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, cfm.getMemtableFlushPeriod()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,8 +201,7 @@ public CFMetaData getCFMetaData(String keyspace, List<ByteBuffer> variables) thr
.speculativeRetry(CFMetaData.SpeculativeRetry.fromString(getPropertyString(CFPropDefs.KW_SPECULATIVE_RETRY, CFMetaData.DEFAULT_SPECULATIVE_RETRY.toString())))
.bloomFilterFpChance(getPropertyDouble(CFPropDefs.KW_BF_FP_CHANCE, null))
.memtableFlushPeriod(getPropertyInt(CFPropDefs.KW_MEMTABLE_FLUSH_PERIOD, 0))
.defaultTimeToLive(getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE))
.populateIoCacheOnFlush(getPropertyBoolean(CFPropDefs.KW_POPULATE_IO_CACHE_ON_FLUSH, CFMetaData.DEFAULT_POPULATE_IO_CACHE_ON_FLUSH));
.defaultTimeToLive(getPropertyInt(CFPropDefs.KW_DEFAULT_TIME_TO_LIVE, CFMetaData.DEFAULT_DEFAULT_TIME_TO_LIVE));

// CQL2 can have null keyAliases
if (keyAlias != null)
Expand Down
Loading

0 comments on commit 4e95953

Please sign in to comment.