Skip to content

Commit

Permalink
Extracts a basic common interface between memtables and sstables as u…
Browse files Browse the repository at this point in the history
…sed by read commands
  • Loading branch information
blambov committed Oct 27, 2021
1 parent 8b7742b commit f835fb5
Show file tree
Hide file tree
Showing 13 changed files with 132 additions and 106 deletions.
12 changes: 4 additions & 8 deletions src/java/org/apache/cassandra/db/PartitionRangeReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -279,23 +279,19 @@ public UnfilteredPartitionIterator queryStorage(final ColumnFamilyStore cfs, Rea
InputCollector<UnfilteredPartitionIterator> inputCollector = iteratorsForRange(view, controller);
try
{
// avoid iterating over the memtable if we purge all tombstones
boolean useMinLocalDeletionTime = cfs.getCompactionStrategyManager().onlyPurgeRepairedTombstones();

SSTableReadsListener readCountUpdater = newReadCountUpdater();
for (Memtable memtable : view.memtables)
{
@SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
Memtable.MemtableUnfilteredPartitionIterator iter = memtable.makePartitionIterator(columnFilter(), dataRange());
if (useMinLocalDeletionTime)
controller.updateMinOldestUnrepairedTombstone(iter.getMinLocalDeletionTime());
UnfilteredPartitionIterator iter = memtable.partitionIterator(columnFilter(), dataRange(), readCountUpdater);
controller.updateMinOldestUnrepairedTombstone(memtable.getMinLocalDeletionTime());
inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false));
}

SSTableReadsListener readCountUpdater = newReadCountUpdater();
for (SSTableReader sstable : view.sstables)
{
@SuppressWarnings("resource") // We close on exception and on closing the result returned by this method
UnfilteredPartitionIterator iter = sstable.getScanner(columnFilter(), dataRange(), readCountUpdater);
UnfilteredPartitionIterator iter = sstable.partitionIterator(columnFilter(), dataRange(), readCountUpdater);
inputCollector.addSSTableIterator(sstable, RTBoundValidator.validate(iter, RTBoundValidator.Stage.SSTABLE, false));

if (!sstable.isRepaired())
Expand Down
24 changes: 9 additions & 15 deletions src/java/org/apache/cassandra/db/SinglePartitionReadCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -615,19 +615,19 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
InputCollector<UnfilteredRowIterator> inputCollector = iteratorsForPartition(view, controller);
try
{
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();

for (Memtable memtable : view.memtables)
{
Partition partition = memtable.getPartition(partitionKey());
if (partition == null)
@SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
UnfilteredRowIterator iter = memtable.iterator(partitionKey(), filter.getSlices(metadata()), columnFilter(), filter.isReversed(), metricsCollector);
if (iter == null)
continue;

minTimestamp = Math.min(minTimestamp, memtable.getMinTimestamp());

@SuppressWarnings("resource") // 'iter' is added to iterators which is closed on exception, or through the closing of the final merged iterator
UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition);

// Memtable data is always considered unrepaired
controller.updateMinOldestUnrepairedTombstone(partition.stats().minLocalDeletionTime);
controller.updateMinOldestUnrepairedTombstone(memtable.getMinLocalDeletionTime());
inputCollector.addMemtableIterator(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false));

mostRecentPartitionTombstone = Math.max(mostRecentPartitionTombstone,
Expand All @@ -650,8 +650,6 @@ private UnfilteredRowIterator queryMemtableAndDiskInternal(ColumnFamilyStore cfs
int nonIntersectingSSTables = 0;
int includedDueToTombstones = 0;

SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();

if (controller.isTrackingRepairedStatus())
Tracing.trace("Collecting data from sstables and tracking repaired status");

Expand Down Expand Up @@ -814,17 +812,14 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
ColumnFamilyStore.ViewFragment view = cfs.select(View.select(SSTableSet.LIVE, partitionKey()));

ImmutableBTreePartition result = null;
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();

Tracing.trace("Merging memtable contents");
for (Memtable memtable : view.memtables)
{
Partition partition = memtable.getPartition(partitionKey());
if (partition == null)
continue;

try (UnfilteredRowIterator iter = filter.getUnfilteredRowIterator(columnFilter(), partition))
try (UnfilteredRowIterator iter = memtable.iterator(partitionKey, filter.getSlices(metadata()), columnFilter(), isReversed(), metricsCollector))
{
if (iter.isEmpty())
if (iter == null)
continue;

result = add(RTBoundValidator.validate(iter, RTBoundValidator.Stage.MEMTABLE, false),
Expand All @@ -838,7 +833,6 @@ private UnfilteredRowIterator queryMemtableAndSSTablesInTimestampOrder(ColumnFam
/* add the SSTables on disk */
view.sstables.sort(SSTableReader.maxTimestampDescending);
// read sorted sstables
SSTableReadMetricsCollector metricsCollector = new SSTableReadMetricsCollector();
for (SSTableReader sstable : view.sstables)
{
// if we've already seen a partition tombstone with a timestamp greater
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,10 +268,9 @@ public LongPredicate getPurgeEvaluator(DecoratedKey key)

for (Memtable memtable : memtables)
{
Partition partition = memtable.getPartition(key);
if (partition != null)
if (memtable.iterator(key) != null)
{
minTimestampSeen = Math.min(minTimestampSeen, partition.stats().minTimestamp);
minTimestampSeen = Math.min(minTimestampSeen, memtable.getMinTimestamp());
hasTimestamp = true;
}
}
Expand Down
14 changes: 3 additions & 11 deletions src/java/org/apache/cassandra/db/memtable/AbstractMemtable.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ public abstract class AbstractMemtable implements Memtable
protected final ColumnsCollector columnsCollector;
protected final StatsCollector statsCollector = new StatsCollector();
// The smallest timestamp for all partitions stored in this memtable
protected AtomicLong minTimestamp = new AtomicLong(Long.MAX_VALUE);
private final AtomicReference<LifecycleTransaction> flushTransaction = new AtomicReference<>(null);
protected TableMetadataRef metadata;

Expand All @@ -62,19 +61,12 @@ public long getOperations()

public long getMinTimestamp()
{
return minTimestamp.get();
return statsCollector.get().minTimestamp;
}

protected void updateMin(AtomicLong minTracker, long newValue)
public int getMinLocalDeletionTime()
{
while (true)
{
long memtableMinTimestamp = minTracker.get();
if (memtableMinTimestamp <= newValue)
break;
if (minTracker.compareAndSet(memtableMinTimestamp, newValue))
break;
}
return statsCollector.get().minLocalDeletionTime;
}

RegularAndStaticColumns columns()
Expand Down
34 changes: 3 additions & 31 deletions src/java/org/apache/cassandra/db/memtable/Memtable.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@
import com.google.common.util.concurrent.ListenableFuture;

import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.RegularAndStaticColumns;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.EncodingStats;
import org.apache.cassandra.db.rows.UnfilteredSource;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.format.SSTableWriter;
import org.apache.cassandra.metrics.TableMetrics;
Expand All @@ -53,7 +50,7 @@
* - lifecycle management, i.e. operations that prepare and execute switch to a different memtable, together
* with ways of tracking the affected commit log spans
*/
public interface Memtable extends Comparable<Memtable>
public interface Memtable extends Comparable<Memtable>, UnfilteredSource
{
// Construction

Expand Down Expand Up @@ -189,29 +186,7 @@ interface Owner
*/
long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group opGroup);

/**
* Get the partition for the specified key. Returns null if no such partition is present.
*/
Partition getPartition(DecoratedKey key);

/**
* Returns a partition iterator for the given data range.
*
* @param columnFilter filter to apply to all returned partitions
* @param dataRange the partition and clustering range queried
*/
MemtableUnfilteredPartitionIterator makePartitionIterator(ColumnFilter columnFilter,
DataRange dataRange);

interface MemtableUnfilteredPartitionIterator extends UnfilteredPartitionIterator
{
/**
* Returns the minimum local deletion time for all partitions in the range.
* Required for the efficiency of partition range read commands.
*/
int getMinLocalDeletionTime();
}

// Read operations are provided by the UnfilteredSource interface.

// Statistics

Expand All @@ -227,9 +202,6 @@ interface MemtableUnfilteredPartitionIterator extends UnfilteredPartitionIterato
*/
long getOperations();

/** Minimum timestamp of all stored data */
long getMinTimestamp();

/**
* The table's definition metadata.
*
Expand Down
36 changes: 23 additions & 13 deletions src/java/org/apache/cassandra/db/memtable/SkipListMemtable.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.PartitionPosition;
import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.commitlog.CommitLogPosition;
import org.apache.cassandra.db.filter.ClusteringIndexFilter;
import org.apache.cassandra.db.filter.ColumnFilter;
Expand All @@ -43,13 +44,15 @@
import org.apache.cassandra.db.partitions.AtomicBTreePartition;
import org.apache.cassandra.db.partitions.Partition;
import org.apache.cassandra.db.partitions.PartitionUpdate;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.db.rows.UnfilteredRowIterator;
import org.apache.cassandra.dht.AbstractBounds;
import org.apache.cassandra.dht.Bounds;
import org.apache.cassandra.dht.IncludingExcludingBounds;
import org.apache.cassandra.dht.Murmur3Partitioner.LongToken;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.index.transactions.UpdateTransaction;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.schema.TableMetadataRef;
import org.apache.cassandra.utils.ByteBufferUtil;
Expand Down Expand Up @@ -164,7 +167,6 @@ public long put(PartitionUpdate update, UpdateTransaction indexer, OpOrder.Group
}

long[] pair = previous.addAllWithSizeDelta(update, opGroup, indexer);
updateMin(minTimestamp, previous.stats().minTimestamp);
liveDataSize.addAndGet(initialSize + pair[0]);
columnsCollector.update(update.columns());
statsCollector.update(update.stats());
Expand All @@ -177,8 +179,9 @@ public long partitionCount()
return partitions.size();
}

public MemtableUnfilteredPartitionIterator makePartitionIterator(final ColumnFilter columnFilter,
final DataRange dataRange)
public MemtableUnfilteredPartitionIterator partitionIterator(final ColumnFilter columnFilter,
final DataRange dataRange,
SSTableReadsListener readsListener)
{
AbstractBounds<PartitionPosition> keyRange = dataRange.keyRange();

Expand All @@ -194,6 +197,7 @@ public MemtableUnfilteredPartitionIterator makePartitionIterator(final ColumnFil
includeRight);

return new MemtableUnfilteredPartitionIterator(metadata.get(), subMap, columnFilter, dataRange);
// readsListener is ignored as it only accepts sstable signals
}

private Map<PartitionPosition, AtomicBTreePartition> getPartitionsSubMap(PartitionPosition left,
Expand Down Expand Up @@ -227,6 +231,21 @@ public Partition getPartition(DecoratedKey key)
return partitions.get(key);
}

/**
 * Returns an unfiltered row iterator over the partition stored under the given key, restricted to
 * the requested slices and columns.
 *
 * @param key the partition key to look up via {@link #getPartition}
 * @param slices the row ranges to return
 * @param selectedColumns the columns to include
 * @param reversed true if the content should be returned in reverse order
 * @param listener not used by this implementation
 * @return the row iterator, or {@code null} if {@link #getPartition} finds no partition for the key
 */
public UnfilteredRowIterator iterator(DecoratedKey key, Slices slices, ColumnFilter selectedColumns, boolean reversed, SSTableReadsListener listener)
{
    Partition partition = getPartition(key);
    return partition == null
           ? null
           : partition.unfilteredIterator(selectedColumns, slices, reversed);
}

/**
 * Returns an unfiltered row iterator over the whole partition stored under the given key.
 *
 * @param key the partition key to look up via {@link #getPartition}
 * @return the row iterator, or {@code null} if {@link #getPartition} finds no partition for the key
 */
public UnfilteredRowIterator iterator(DecoratedKey key)
{
    Partition partition = getPartition(key);
    if (partition == null)
        return null;

    return partition.unfilteredIterator();
}

private static int estimateRowOverhead(final int count)
{
// calculate row overhead
Expand Down Expand Up @@ -315,7 +334,7 @@ public long partitionKeySize()
}


public static class MemtableUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator implements Memtable.MemtableUnfilteredPartitionIterator
public static class MemtableUnfilteredPartitionIterator extends AbstractUnfilteredPartitionIterator implements UnfilteredPartitionIterator
{
private final TableMetadata metadata;
private final Iterator<Map.Entry<PartitionPosition, AtomicBTreePartition>> iter;
Expand All @@ -332,15 +351,6 @@ public MemtableUnfilteredPartitionIterator(TableMetadata metadata, Map<Partition
this.dataRange = dataRange;
}

public int getMinLocalDeletionTime()
{
int minLocalDeletionTime = Integer.MAX_VALUE;
for (AtomicBTreePartition partition : source.values())
minLocalDeletionTime = Math.min(minLocalDeletionTime, partition.stats().minLocalDeletionTime);

return minLocalDeletionTime;
}

public TableMetadata metadata()
{
return metadata;
Expand Down
69 changes: 69 additions & 0 deletions src/java/org/apache/cassandra/db/rows/UnfilteredSource.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.db.rows;

import org.apache.cassandra.db.DataRange;
import org.apache.cassandra.db.DecoratedKey;
import org.apache.cassandra.db.Slices;
import org.apache.cassandra.db.filter.ColumnFilter;
import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator;
import org.apache.cassandra.io.sstable.format.SSTableReadsListener;

/**
* Common data access interface for sstables and memtables.
*
* It exposes the read entry points (single-partition row iterator and partition-range iterator)
* together with the minimum timestamp and minimum local deletion time of the stored data.
*/
public interface UnfilteredSource
{
/**
* Returns a row iterator for the given partition, applying the specified row and column filters.
*
* @param key the partition key
* @param slices the row ranges to return
* @param columnFilter filter to apply to the returned rows
* @param reversed true if the content should be returned in reverse order
* @param listener a listener used to handle internal read events
* @return a row iterator over the requested content of the partition.
*         NOTE(review): the memtable implementation returns null when it holds no partition
*         for the key and callers null-check the result; confirm whether every
*         implementation follows the same convention.
*/
UnfilteredRowIterator iterator(DecoratedKey key,
Slices slices,
ColumnFilter columnFilter,
boolean reversed,
SSTableReadsListener listener);

/**
* Convenience overload that delegates to the five-argument {@code iterator} with
* {@code Slices.ALL}, {@code ColumnFilter.NONE}, forward order and
* {@code SSTableReadsListener.NOOP_LISTENER}.
*
* NOTE(review): because {@code ColumnFilter.NONE} is passed, this presumably suits
* partition-presence checks rather than reading column data; confirm against callers.
*
* @param key the partition key
* @return whatever the five-argument {@code iterator} returns for these arguments
*/
default UnfilteredRowIterator iterator(DecoratedKey key)
{
return iterator(key, Slices.ALL, ColumnFilter.NONE, false, SSTableReadsListener.NOOP_LISTENER);
}

/**
* Returns a partition iterator for the given data range.
*
* @param columnFilter filter to apply to all returned partitions
* @param dataRange the partition and clustering range queried
* @param listener a listener used to handle internal read events
* @return a partition iterator over the content selected by {@code dataRange}
*/
UnfilteredPartitionIterator partitionIterator(ColumnFilter columnFilter,
DataRange dataRange,
SSTableReadsListener listener);

/** Minimum timestamp of all stored data */
long getMinTimestamp();

/** Minimum local deletion time of all stored data in this source (sstable or memtable) */
int getMinLocalDeletionTime();
}
Loading

0 comments on commit f835fb5

Please sign in to comment.