Permalink
Browse files

merge with 1.2

  • Loading branch information...
2 parents 207cdf3 + 27ed655 commit 480a1a8f5acfd6e93c789f43305d9506d41da3e3 @xedin xedin committed Mar 21, 2013
Showing with 207 additions and 97 deletions.
  1. +3 −0 CHANGES.txt
  2. +1 −0 conf/cassandra-env.sh
  3. +6 −0 conf/cassandra.yaml
  4. +3 −0 src/java/org/apache/cassandra/config/Config.java
  5. +5 −0 src/java/org/apache/cassandra/config/DatabaseDescriptor.java
  6. +1 −1 src/java/org/apache/cassandra/db/commitlog/CommitLogReplayer.java
  7. +1 −1 src/java/org/apache/cassandra/db/compaction/AbstractCompactionStrategy.java
  8. +1 −1 src/java/org/apache/cassandra/db/compaction/CompactionManager.java
  9. +1 −4 src/java/org/apache/cassandra/db/compaction/CompactionTask.java
  10. +3 −3 src/java/org/apache/cassandra/db/compaction/LeveledCompactionStrategy.java
  11. +2 −2 src/java/org/apache/cassandra/db/compaction/Scrubber.java
  12. +5 −5 src/java/org/apache/cassandra/io/compress/CompressedRandomAccessReader.java
  13. +1 −1 src/java/org/apache/cassandra/io/sstable/KeyIterator.java
  14. +2 −2 src/java/org/apache/cassandra/io/sstable/SSTableBoundedScanner.java
  15. +122 −12 src/java/org/apache/cassandra/io/sstable/SSTableReader.java
  16. +5 −5 src/java/org/apache/cassandra/io/sstable/SSTableScanner.java
  17. +9 −45 src/java/org/apache/cassandra/io/util/RandomAccessReader.java
  18. +1 −1 src/java/org/apache/cassandra/streaming/FileStreamTask.java
  19. +1 −1 src/java/org/apache/cassandra/streaming/compress/CompressedFileStreamTask.java
  20. +2 −2 src/java/org/apache/cassandra/tools/SSTableExport.java
  21. +21 −0 src/java/org/apache/cassandra/utils/CLibrary.java
  22. +1 −1 test/unit/org/apache/cassandra/db/KeyCacheTest.java
  23. +4 −4 test/unit/org/apache/cassandra/io/compress/CompressedRandomAccessReaderTest.java
  24. +2 −2 test/unit/org/apache/cassandra/io/sstable/SSTableTest.java
  25. +2 −2 test/unit/org/apache/cassandra/io/sstable/SSTableUtils.java
  26. +2 −2 test/unit/org/apache/cassandra/io/util/BufferedRandomAccessFileTest.java
View
@@ -28,6 +28,8 @@
* Improve asynchronous hint delivery (CASSANDRA-5179)
* Fix Guava dependency version (12.0 -> 13.0.1) for Maven (CASSANDRA-5364)
* Validate that provided CQL3 collection value are < 64K (CASSANDRA-5355)
+ * Change Kernel Page Cache skipping into row preheating (disabled by default)
+ (CASSANDRA-4937)
@@ -63,6 +65,7 @@ Merged from 1.1:
* Fix possible assertion triggered in SliceFromReadCommand (CASSANDRA-5284)
* cqlsh: Add inet type support on Windows (ipv4-only) (CASSANDRA-4801)
* Fix race when initializing ColumnFamilyStore (CASSANDRA-5350)
+ * Add UseTLAB JVM flag (CASSANDRA-5361)
1.2.2
View
@@ -202,6 +202,7 @@ JVM_OPTS="$JVM_OPTS -XX:SurvivorRatio=8"
JVM_OPTS="$JVM_OPTS -XX:MaxTenuringThreshold=1"
JVM_OPTS="$JVM_OPTS -XX:CMSInitiatingOccupancyFraction=75"
JVM_OPTS="$JVM_OPTS -XX:+UseCMSInitiatingOccupancyOnly"
+JVM_OPTS="$JVM_OPTS -XX:+UseTLAB"
# note: bash evals '1.7.x' as > '1.7' so this is really a >= 1.7 jvm check
if [ "$JVM_VERSION" \> "1.7" ] ; then
JVM_OPTS="$JVM_OPTS -XX:+UseCondCardMark"
View
@@ -660,3 +660,9 @@ internode_compression: all
# reducing overhead from the TCP protocol itself, at the cost of increasing
# latency if you block for cross-datacenter responses.
inter_dc_tcp_nodelay: false
+
+# Enable or disable kernel page cache preheating using the contents of the key cache after compaction.
+# When enabled, it will preheat only the first "page" (4KB) of each row to optimize
+# for sequential access. Note: This could be harmful for fat rows, see CASSANDRA-4937
+# for further details on that topic.
+preheat_kernel_page_cache: false
@@ -166,6 +166,9 @@
public boolean inter_dc_tcp_nodelay = false;
private static boolean isClientMode = false;
+
+ public boolean preheat_kernel_page_cache = false;
+
private static boolean outboundBindAny = false;
public static boolean getOutboundBindAny()
@@ -1224,4 +1224,9 @@ public static boolean getInterDCTcpNoDelay()
{
return conf.inter_dc_tcp_nodelay;
}
+
+ public static boolean shouldPreheatPageCache()
+ {
+ return conf.preheat_kernel_page_cache;
+ }
}
@@ -120,7 +120,7 @@ public void recover(File file) throws IOException
CommitLogDescriptor desc = CommitLogDescriptor.fromFileName(file.getName());
final long segment = desc.id;
int version = desc.getMessagingVersion();
- RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()), true);
+ RandomAccessReader reader = RandomAccessReader.open(new File(file.getAbsolutePath()));
try
{
assert reader.length() <= Integer.MAX_VALUE;
@@ -184,7 +184,7 @@ public void shutdown()
{
ArrayList<ICompactionScanner> scanners = new ArrayList<ICompactionScanner>();
for (SSTableReader sstable : sstables)
- scanners.add(sstable.getDirectScanner(range));
+ scanners.add(sstable.getScanner(range));
return scanners;
}
@@ -466,7 +466,7 @@ private void doCleanupCompaction(ColumnFamilyStore cfs, Collection<SSTableReader
if (compactionFileLocation == null)
throw new IOException("disk full");
- SSTableScanner scanner = sstable.getDirectScanner();
+ SSTableScanner scanner = sstable.getScanner();
long rowsRead = 0;
List<Column> indexedColumnsInRow = null;
@@ -243,10 +243,7 @@ protected void runWith(File sstableDirectory) throws Exception
cfs.replaceCompactedSSTables(toCompact, sstables, compactionType);
// TODO: this doesn't belong here, it should be part of the reader to load when the tracker is wired up
for (SSTableReader sstable : sstables)
- {
- for (Map.Entry<DecoratedKey, RowIndexEntry> entry : cachedKeyMap.get(sstable.descriptor).entrySet())
- sstable.cacheKey(entry.getKey(), entry.getValue());
- }
+ sstable.preheat(cachedKeyMap.get(sstable.descriptor));
if (logger.isInfoEnabled())
{
@@ -166,7 +166,7 @@ public long getMaxSSTableSize()
{
// L0 makes no guarantees about overlapping-ness. Just create a direct scanner for each
for (SSTableReader sstable : byLevel.get(level))
- scanners.add(sstable.getDirectScanner(range));
+ scanners.add(sstable.getScanner(range));
}
else
{
@@ -215,7 +215,7 @@ public LeveledScanner(Collection<SSTableReader> sstables, Range<Token> range)
Collections.sort(this.sstables, SSTable.sstableComparator);
sstableIterator = this.sstables.iterator();
assert sstableIterator.hasNext(); // caller should check intersecting first
- currentScanner = sstableIterator.next().getDirectScanner(range);
+ currentScanner = sstableIterator.next().getScanner(range);
}
public static List<SSTableReader> intersecting(Collection<SSTableReader> sstables, Range<Token> range)
@@ -250,7 +250,7 @@ protected OnDiskAtomIterator computeNext()
currentScanner = null;
return endOfData();
}
- currentScanner = sstableIterator.next().getDirectScanner(range);
+ currentScanner = sstableIterator.next().getScanner(range);
}
}
catch (IOException e)
@@ -94,8 +94,8 @@ public Scrubber(ColumnFamilyStore cfs, SSTableReader sstable, OutputHandler outp
// we'll also loop through the index at the same time, using the position from the index to recover if the
// row header (key or data size) is corrupt. (This means our position in the index file will be one row
// "ahead" of the data file.)
- this.dataFile = sstable.openDataReader(true);
- this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+ this.dataFile = sstable.openDataReader();
+ this.indexFile = RandomAccessReader.open(new File(sstable.descriptor.filenameFor(Component.PRIMARY_INDEX)));
this.scrubInfo = new ScrubInfo(dataFile, sstable);
}
@@ -42,19 +42,19 @@ public static CompressedRandomAccessReader open(String path, CompressionMetadata
{
try
{
- return new CompressedRandomAccessReader(path, metadata, false, owner);
+ return new CompressedRandomAccessReader(path, metadata, owner);
}
catch (FileNotFoundException e)
{
throw new RuntimeException(e);
}
}
- public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache)
+ public static CompressedRandomAccessReader open(String dataFilePath, CompressionMetadata metadata)
{
try
{
- return new CompressedRandomAccessReader(dataFilePath, metadata, skipIOCache, null);
+ return new CompressedRandomAccessReader(dataFilePath, metadata, null);
}
catch (FileNotFoundException e)
{
@@ -73,9 +73,9 @@ public static CompressedRandomAccessReader open(String dataFilePath, Compression
// raw checksum bytes
private final ByteBuffer checksumBytes = ByteBuffer.wrap(new byte[4]);
- private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, boolean skipIOCache, PoolingSegmentedFile owner) throws FileNotFoundException
+ private CompressedRandomAccessReader(String dataFilePath, CompressionMetadata metadata, PoolingSegmentedFile owner) throws FileNotFoundException
{
- super(new File(dataFilePath), metadata.chunkLength(), skipIOCache, owner);
+ super(new File(dataFilePath), metadata.chunkLength(), owner);
this.metadata = metadata;
compressed = ByteBuffer.wrap(new byte[metadata.compressor().initialCompressedBufferLength(metadata.chunkLength())]);
}
@@ -38,7 +38,7 @@ public KeyIterator(Descriptor desc)
{
this.desc = desc;
File path = new File(desc.filenameFor(SSTable.COMPONENT_INDEX));
- in = RandomAccessReader.open(path, true);
+ in = RandomAccessReader.open(path);
}
protected DecoratedKey computeNext()
@@ -35,9 +35,9 @@
private final Iterator<Pair<Long, Long>> rangeIterator;
private Pair<Long, Long> currentRange;
- SSTableBoundedScanner(SSTableReader sstable, boolean skipCache, Range<Token> range)
+ SSTableBoundedScanner(SSTableReader sstable, Range<Token> range)
{
- super(sstable, skipCache);
+ super(sstable);
this.rangeIterator = sstable.getPositionsForRanges(Collections.singletonList(range)).iterator();
currentRange = rangeIterator.next();
dfile.seek(currentRange.left);
@@ -352,7 +352,7 @@ private void load(boolean recreatebloom) throws IOException
: SegmentedFile.getBuilder(DatabaseDescriptor.getDiskAccessMode());
// we read the positions in a BRAF so we don't have to worry about an entry spanning a mmap boundary.
- RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)), true);
+ RandomAccessReader primaryIndex = RandomAccessReader.open(new File(descriptor.filenameFor(Component.PRIMARY_INDEX)));
// try to load summaries from the disk and check if we need
// to read primary index because we should re-create a BloomFilter or pre-load KeyCache
@@ -704,6 +704,29 @@ public void cacheKey(DecoratedKey key, RowIndexEntry info)
keyCache.put(cacheKey, info);
}
+ public void preheat(Map<DecoratedKey, RowIndexEntry> cachedKeys) throws IOException
+ {
+ RandomAccessFile f = new RandomAccessFile(getFilename(), "r");
+
+ try
+ {
+ int fd = CLibrary.getfd(f.getFD());
+
+ for (Map.Entry<DecoratedKey, RowIndexEntry> entry : cachedKeys.entrySet())
+ {
+ cacheKey(entry.getKey(), entry.getValue());
+
+ // add to the cache but don't do actual preheating if we have it disabled in the config
+ if (DatabaseDescriptor.shouldPreheatPageCache() && fd > 0)
+ CLibrary.preheatPage(fd, entry.getValue().position);
+ }
+ }
+ finally
+ {
+ FileUtils.closeQuietly(f);
+ }
+ }
+
public RowIndexEntry getCachedPosition(DecoratedKey key, boolean updateStats)
{
return getCachedPosition(new KeyCacheKey(descriptor, key.key), updateStats);
@@ -903,6 +926,15 @@ public void releaseReference()
{
if (references.decrementAndGet() == 0 && isCompacted.get())
{
+ /**
+ * Do the OS a favour and suggest (using an fadvise call) that we
+ * don't want to see pages of this SSTable in memory anymore.
+ *
+ * NOTE: We can't use madvise in Java because it requires the address of
+ * the mapping, so instead we always open the file and run fadvise(fd, 0, 0) on it
+ */
+ dropPageCache();
+
// Force finalizing mmapping if necessary
ifile.cleanup();
dfile.cleanup();
@@ -954,12 +986,12 @@ public SSTableScanner getScanner(QueryFilter filter)
}
/**
- * Direct I/O SSTableScanner
+ * I/O SSTableScanner
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public SSTableScanner getDirectScanner()
+ public SSTableScanner getScanner()
{
- return new SSTableScanner(this, true);
+ return new SSTableScanner(this);
}
/**
@@ -968,12 +1000,12 @@ public SSTableScanner getDirectScanner()
* @param range the range of keys to cover
* @return A Scanner for seeking over the rows of the SSTable.
*/
- public ICompactionScanner getDirectScanner(Range<Token> range)
+ public ICompactionScanner getScanner(Range<Token> range)
{
if (range == null)
- return getDirectScanner();
+ return getScanner();
- return new SSTableBoundedScanner(this, true, range);
+ return new SSTableBoundedScanner(this, range);
}
public FileDataInput getFileDataInput(long position)
@@ -1151,16 +1183,16 @@ public SSTableMetadata getSSTableMetadata()
return sstableMetadata;
}
- public RandomAccessReader openDataReader(boolean skipIOCache)
+ public RandomAccessReader openDataReader()
{
return compression
- ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata(), skipIOCache)
- : RandomAccessReader.open(new File(getFilename()), skipIOCache);
+ ? CompressedRandomAccessReader.open(getFilename(), getCompressionMetadata())
+ : RandomAccessReader.open(new File(getFilename()));
}
- public RandomAccessReader openIndexReader(boolean skipIOCache)
+ public RandomAccessReader openIndexReader()
{
- return RandomAccessReader.open(new File(getIndexFilename()), skipIOCache);
+ return RandomAccessReader.open(new File(getIndexFilename()));
}
/**
@@ -1207,4 +1239,82 @@ public static void releaseReferences(Iterable<SSTableReader> sstables)
sstable.releaseReference();
}
}
+
+ private static class EmptyCompactionScanner implements ICompactionScanner
+ {
+ private final String filename;
+
+ private EmptyCompactionScanner(String filename)
+ {
+ this.filename = filename;
+ }
+
+ public long getLengthInBytes()
+ {
+ return 0;
+ }
+
+ public long getCurrentPosition()
+ {
+ return 0;
+ }
+
+ public String getBackingFiles()
+ {
+ return filename;
+ }
+
+ public void close()
+ {
+ }
+
+ public boolean hasNext()
+ {
+ return false;
+ }
+
+ public OnDiskAtomIterator next()
+ {
+ throw new IndexOutOfBoundsException();
+ }
+
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private void dropPageCache()
+ {
+ dropPageCache(dfile.path);
+ dropPageCache(ifile.path);
+ }
+
+ private void dropPageCache(String filePath)
+ {
+ RandomAccessFile file = null;
+
+ try
+ {
+ file = new RandomAccessFile(filePath, "r");
+
+ int fd = CLibrary.getfd(file.getFD());
+
+ if (fd > 0)
+ {
+ if (logger.isDebugEnabled())
+ logger.debug(String.format("Dropping page cache of file %s.", filePath));
+
+ CLibrary.trySkipCache(fd, 0, 0);
+ }
+ }
+ catch (IOException e)
+ {
+ // we don't care if cache cleanup fails
+ }
+ finally
+ {
+ FileUtils.closeQuietly(file);
+ }
+ }
}
Oops, something went wrong.

0 comments on commit 480a1a8

Please sign in to comment.