From f835fb5c6f7037cb5b571f836a89d5f537210eec Mon Sep 17 00:00:00 2001 From: Branimir Lambov Date: Wed, 27 Oct 2021 17:24:07 +0300 Subject: [PATCH] Extracts a basic common interface between memtables and sstables as used by read commands --- .../db/PartitionRangeReadCommand.java | 12 ++-- .../db/SinglePartitionReadCommand.java | 24 +++---- .../db/compaction/CompactionController.java | 5 +- .../db/memtable/AbstractMemtable.java | 14 +--- .../cassandra/db/memtable/Memtable.java | 34 +-------- .../db/memtable/SkipListMemtable.java | 36 ++++++---- .../cassandra/db/rows/UnfilteredSource.java | 69 +++++++++++++++++++ .../io/sstable/format/SSTableReader.java | 18 +---- .../io/sstable/format/big/BigTableReader.java | 2 +- .../sstable/format/big/BigTableScanner.java | 5 ++ .../format/ForwardingSSTableReader.java | 5 +- .../db/compaction/TTLExpiryTest.java | 7 +- .../io/sstable/SSTableScannerTest.java | 7 +- 13 files changed, 132 insertions(+), 106 deletions(-) create mode 100644 src/java/org/apache/cassandra/db/rows/UnfilteredSource.java diff --git a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java index 7a67f9617e29..f15f58e29b93 100644 --- a/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java +++ b/src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java @@ -279,23 +279,19 @@ public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, Rea InputCollector inputCollector = iteratorsForRange(view, controller); try { - // avoid iterating over the memtable if we purge all tombstones - boolean useMinLocalDeletionTime = cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones(); - + SSTableReadsListener readCountUpdater = newReadCountUpdater(); for (Memtable memtable : view.memtables) { @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method - Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange()); - if (useMinLocalDeletionTime) - controller.updateMinOldestUnrepairedTombstone(iter.getMinLocalDeletionTime()); + UnfilteredPartitionIterator iter = memtable.partitionIterator(columnFilter(), dataRange(), readCountUpdater); + controller.updateMinOldestUnrepairedTombstone(memtable.getMinLocalDeletionTime()); inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false)); } - SSTableReadsListener readCountUpdater = newReadCountUpdater(); for (SSTableReader sstable : view.sstables) { @SuppressWarnings("resource") // We close on exception and on closing the result returned by this method - UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), readCountUpdater); + UnfilteredPartitionIterator iter = sstable.partitionIterator(columnFilter(), dataRange(), readCountUpdater); inputCollector.addSSTableIterator(sstable, RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false)); if (!sstable.isRepaired()) diff --git a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java index 2c02c52cf511..0b93b300b3c4 100644 --- a/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java +++ b/src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java @@ -615,19 +615,19 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs InputCollector inputCollector = iteratorsForPartition(view, controller); try { + SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector(); + for (Memtable memtable : view.memtables) { - Partition partition = memtable.getPartition(partitionKey()); - if (partition == null) + @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator + UnfilteredRowIterator iter = memtable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), metricsCollector); + if (iter == null) continue; minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp()); - @SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator - UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition); - // Memtable data is always considered unrepaired - controller.updateMinOldestUnrepairedTombstone(partition.stats().minLocalDeletionTime); + controller.updateMinOldestUnrepairedTombstone(memtable.getMinLocalDeletionTime()); inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false)); mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone, @@ -650,8 +650,6 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs int nonIntersectingSSTables = 0; int includedDueToTombstones = 0; - SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector(); - if (controller.isTrackingRepairedStatus()) Tracing.trace("Collecting data from sstables and tracking repaired status"); @@ -814,17 +812,14 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey())); ImmutableBTreePartition result = null; + SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector(); Tracing.trace("Merging memtable contents"); for (Memtable memtable : view.memtables) { - Partition partition = memtable.getPartition(partitionKey()); - if (partition == null) - continue; - - try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition)) + try (UnfilteredRowIterator iter = memtable.iterator(partitionKey, filter.getSlices(metadata()), columnFilter(), isReversed(), metricsCollector)) { - if (iter.isEmpty()) + if (iter == null) continue; result = add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false), @@ -838,7 +833,6 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam /* add the SSTables on disk */ view.sstables.sort(SSTableReader.maxTimestampDescending); // read sorted sstables - SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector(); for (SSTableReader sstable : view.sstables) { // if we've already seen a partition tombstone with a timestamp greater diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionController.java b/src/java/org/apache/cassandra/db/compaction/CompactionController.java index e4c947a508ba..5c272069f773 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionController.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionController.java @@ -268,10 +268,9 @@ public LongPredicate getPurgeEvaluator(DecoratedKey key) for (Memtable memtable : memtables) { - Partition partition = memtable.getPartition(key); - if (partition != null) + if (memtable.iterator(key) != null) { - minTimestampSeen = Math.min(minTimestampSeen, partition.stats().minTimestamp); + minTimestampSeen = Math.min(minTimestampSeen, memtable.getMinTimestamp()); hasTimestamp = true; } } diff --git a/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java b/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java index 5b5a1fd51112..15c2b8433b72 100644 --- a/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java @@ -40,7 +40,6 @@ public abstract class AbstractMemtable implements Memtable protected final ColumnsCollector columnsCollector; protected final StatsCollector statsCollector = new StatsCollector(); // The smallest timestamp for all partitions stored in this memtable - protected AtomicLong minTimestamp = new AtomicLong(Long.MAX_VALUE); private final AtomicReference flushTransaction = new AtomicReference<>(null); protected TableMetadataRef metadata; @@ -62,19 +61,12 @@ public long getOperations() public long getMinTimestamp() { - return minTimestamp.get(); + return statsCollector.get().minTimestamp; } - protected void updateMin(AtomicLong minTracker, long newValue) + public int getMinLocalDeletionTime() { - while (true) - { - long memtableMinTimestamp = minTracker.get(); - if (memtableMinTimestamp <= newValue) - break; - if (minTracker.compareAndSet(memtableMinTimestamp, newValue)) - break; - } + return statsCollector.get().minLocalDeletionTime; } RegularAndStaticColumns columns() diff --git a/src/java/org/apache/cassandra/db/memtable/Memtable.java b/src/java/org/apache/cassandra/db/memtable/Memtable.java index 31758d9ad6a7..17be49956e97 100644 --- a/src/java/org/apache/cassandra/db/memtable/Memtable.java +++ b/src/java/org/apache/cassandra/db/memtable/Memtable.java @@ -23,16 +23,13 @@ import com.google.common.util.concurrent.ListenableFuture; import org.apache.cassandra.db.ColumnFamilyStore; -import org.apache.cassandra.db.DataRange; -import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.PartitionPosition; import org.apache.cassandra.db.RegularAndStaticColumns; import org.apache.cassandra.db.commitlog.CommitLogPosition; -import org.apache.cassandra.db.filter.ColumnFilter; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.db.partitions.PartitionUpdate; -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.EncodingStats; +import org.apache.cassandra.db.rows.UnfilteredSource; import org.apache.cassandra.index.transactions.UpdateTransaction; import org.apache.cassandra.io.sstable.format.SSTableWriter; import org.apache.cassandra.metrics.TableMetrics; @@ -53,7 +50,7 @@ * - lifecycle management, i.e. operations that prepare and execute switch to a different memtable, together * with ways of tracking the affected commit log spans */ -public interface Memtable extends Comparable +public interface Memtable extends Comparable, UnfilteredSource { // Construction @@ -189,29 +186,7 @@ interface Owner */ long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup); - /** - * Get the partition for the specified key. Returns null if no such partition is present. - */ - Partition getPartition(DecoratedKey key); - - /** - * Returns a partition iterator for the given data range. - * - * @param columnFilter filter to apply to all returned partitions - * @param dataRange the partition and clustering range queried - */ - MemtableUnfilteredPartitionIterator makePartitionIterator(ColumnFilter columnFilter, - DataRange dataRange); - - interface MemtableUnfilteredPartitionIterator extends UnfilteredPartitionIterator - { - /** - * Returns the minimum local deletion time for all partitions in the range. - * Required for the efficiency of partition range read commands. - */ - int getMinLocalDeletionTime(); - } - + // Read operations are provided by the UnfilteredSource interface. // Statistics @@ -227,9 +202,6 @@ interface MemtableUnfilteredPartitionIterator extends UnfilteredPartitionIterato */ long getOperations(); - /** Minimum timestamp of all stored data */ - long getMinTimestamp(); - /** * The table's definition metadata. * diff --git a/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java b/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java index d063420b2ddf..b675e69e0f33 100644 --- a/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java +++ b/src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java @@ -35,6 +35,7 @@ import org.apache.cassandra.db.DataRange; import org.apache.cassandra.db.DecoratedKey; import org.apache.cassandra.db.PartitionPosition; +import org.apache.cassandra.db.Slices; import org.apache.cassandra.db.commitlog.CommitLogPosition; import org.apache.cassandra.db.filter.ClusteringIndexFilter; import org.apache.cassandra.db.filter.ColumnFilter; @@ -43,6 +44,7 @@ import org.apache.cassandra.db.partitions.AtomicBTreePartition; import org.apache.cassandra.db.partitions.Partition; import org.apache.cassandra.db.partitions.PartitionUpdate; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.dht.AbstractBounds; import org.apache.cassandra.dht.Bounds; @@ -50,6 +52,7 @@ import org.apache.cassandra.dht.Murmur3Partitioner.LongToken; import org.apache.cassandra.dht.Range; import org.apache.cassandra.index.transactions.UpdateTransaction; +import org.apache.cassandra.io.sstable.format.SSTableReadsListener; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.schema.TableMetadataRef; import org.apache.cassandra.utils.ByteBufferUtil; @@ -164,7 +167,6 @@ public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group } long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer); - updateMin(minTimestamp, previous.stats().minTimestamp); liveDataSize.addAndGet(initialSize + pair[0]); columnsCollector.update(update.columns()); statsCollector.update(update.stats()); @@ -177,8 +179,9 @@ public long partitionCount() return partitions.size(); } - public MemtableUnfilteredPartitionIterator makePartitionIterator(final ColumnFilter columnFilter, - final DataRange dataRange) + public MemtableUnfilteredPartitionIterator partitionIterator(final ColumnFilter columnFilter, + final DataRange dataRange, + SSTableReadsListener readsListener) { AbstractBounds keyRange = dataRange.keyRange(); @@ -194,6 +197,7 @@ public MemtableUnfilteredPartitionIterator makePartitionIterator(final ColumnFil includeRight); return new MemtableUnfilteredPartitionIterator(metadata.get(), subMap, columnFilter, dataRange); + // readsListener is ignored as it only accepts sstable signals } private Map getPartitionsSubMap(PartitionPosition left, @@ -227,6 +231,21 @@ public Partition getPartition(DecoratedKey key) return partitions.get(key); } + public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, SSTableReadsListener listener) + { + Partition p = getPartition(key); + if (p == null) + return null; + else + return p.unfilteredIterator(selectedColumns, slices, reversed); + } + + public UnfilteredRowIterator iterator(DecoratedKey key) + { + Partition p = getPartition(key); + return p != null ? p.unfilteredIterator() : null; + } + private static int estimateRowOverhead(final int count) { // calculate row overhead @@ -315,7 +334,7 @@ public long partitionKeySize() } - public static class MemtableUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator implements Memtable.MemtableUnfilteredPartitionIterator + public static class MemtableUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator implements UnfilteredPartitionIterator { private final TableMetadata metadata; private final Iterator> iter; @@ -332,15 +351,6 @@ public MemtableUnfilteredPartitionIterator(TableMetadata metadata, Map +public abstract class SSTableReader extends SSTable implements UnfilteredSource, SelfRefCounted { private static final Logger logger = LoggerFactory.getLogger(SSTableReader.class); @@ -1420,12 +1420,6 @@ protected abstract RowIndexEntry getPosition(PartitionPosition key, boolean permitMatchPastLast, SSTableReadsListener listener); - public abstract UnfilteredRowIterator iterator(DecoratedKey key, - Slices slices, - ColumnFilter selectedColumns, - boolean reversed, - SSTableReadsListener listener); - public abstract UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, Slices slices, ColumnFilter selectedColumns, boolean reversed); public abstract UnfilteredRowIterator simpleIterator(FileDataInput file, DecoratedKey key, RowIndexEntry indexEntry, boolean tombstoneOnly); @@ -1587,14 +1581,6 @@ public ISSTableScanner getScanner(Range range) */ public abstract ISSTableScanner getScanner(Iterator> rangeIterator); - /** - * @param columns the columns to return. - * @param dataRange filter to use when reading the columns - * @param listener a listener used to handle internal read events - * @return A Scanner for seeking over the rows of the SSTable. - */ - public abstract ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, SSTableReadsListener listener); - public FileDataInput getFileDataInput(long position) { return dfile.createReader(position); diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java index dc13031eb859..03cd49868975 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableReader.java @@ -78,7 +78,7 @@ public UnfilteredRowIterator iterator(FileDataInput file, DecoratedKey key, RowI } @Override - public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, SSTableReadsListener listener) + public ISSTableScanner partitionIterator(ColumnFilter columns, DataRange dataRange, SSTableReadsListener listener) { return BigTableScanner.getScanner(this, columns, dataRange, listener); } diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java index 20105cd1e14c..86816e5d4028 100644 --- a/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java +++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigTableScanner.java @@ -441,5 +441,10 @@ public UnfilteredRowIterator next() { return null; } + + public int getMinLocalDeletionTime() + { + return DeletionTime.LIVE.localDeletionTime(); + } } } diff --git a/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java b/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java index 9a76661d37dd..fa1308beda57 100644 --- a/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java +++ b/test/distributed/org/apache/cassandra/io/sstable/format/ForwardingSSTableReader.java @@ -36,6 +36,7 @@ import org.apache.cassandra.db.RowIndexEntry; import org.apache.cassandra.db.Slices; import org.apache.cassandra.db.filter.ColumnFilter; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.db.rows.EncodingStats; import org.apache.cassandra.db.rows.UnfilteredRowIterator; import org.apache.cassandra.dht.AbstractBounds; @@ -385,9 +386,9 @@ public ISSTableScanner getScanner(Iterator> ra } @Override - public ISSTableScanner getScanner(ColumnFilter columns, DataRange dataRange, SSTableReadsListener listener) + public UnfilteredPartitionIterator partitionIterator(ColumnFilter columns, DataRange dataRange, SSTableReadsListener listener) { - return delegate.getScanner(columns, dataRange, listener); + return delegate.partitionIterator(columns, dataRange, listener); } @Override diff --git a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java index 0088dc7881a8..66ea61e58201 100644 --- a/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/TTLExpiryTest.java @@ -29,6 +29,7 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.*; import org.apache.cassandra.db.filter.ColumnFilter; @@ -241,9 +242,9 @@ public void testNoExpire() throws InterruptedException, IOException cfs.enableAutoCompaction(true); assertEquals(1, cfs.getLiveSSTables().size()); SSTableReader sstable = cfs.getLiveSSTables().iterator().next(); - ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(cfs.metadata()), - DataRange.allData(cfs.getPartitioner()), - SSTableReadsListener.NOOP_LISTENER); + UnfilteredPartitionIterator scanner = sstable.partitionIterator(ColumnFilter.all(cfs.metadata()), + DataRange.allData(cfs.getPartitioner()), + SSTableReadsListener.NOOP_LISTENER); assertTrue(scanner.hasNext()); while(scanner.hasNext()) { diff --git a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java index 6d94f778f460..8a45dbb51210 100644 --- a/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java +++ b/test/unit/org/apache/cassandra/io/sstable/SSTableScannerTest.java @@ -30,6 +30,7 @@ import org.apache.cassandra.SchemaLoader; import org.apache.cassandra.Util; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.DataRange; @@ -182,9 +183,9 @@ private static void assertScanMatches(SSTableReader sstable, int scanStart, int assert boundaries.length % 2 == 0; for (DataRange range : dataRanges(sstable.metadata(), scanStart, scanEnd)) { - try(ISSTableScanner scanner = sstable.getScanner(ColumnFilter.all(sstable.metadata()), - range, - SSTableReadsListener.NOOP_LISTENER)) + try(UnfilteredPartitionIterator scanner = sstable.partitionIterator(ColumnFilter.all(sstable.metadata()), + range, + SSTableReadsListener.NOOP_LISTENER)) { for (int b = 0; b < boundaries.length; b += 2) for (int i = boundaries[b]; i <= boundaries[b + 1]; i++)