Skip to content

Commit

Permalink
Merge branch 'cassandra-4.0' into cassandra-4.1
Browse files Browse the repository at this point in the history
  • Loading branch information
jonmeredith committed Mar 7, 2023
2 parents 426829c + 40f9ca6 commit 6adcff8
Show file tree
Hide file tree
Showing 13 changed files with 525 additions and 35 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Expand Up @@ -6,6 +6,7 @@
* Streaming progress virtual table lock contention can trigger TCP_USER_TIMEOUT and fail streaming (CASSANDRA-18110)
* Fix perpetual load of denylist on read in cases where denylist can never be loaded (CASSANDRA-18116)
Merged from 4.0:
* Improve memtable allocator accounting when updating AtomicBTreePartition (CASSANDRA-18125)
* Update zstd-jni to version 1.5.4-1 (CASSANDRA-18259)
* Split and order IDEA workspace template VM_PARAMETERS (CASSANDRA-18242)
* Log warning message on aggregation queries without key or on multiple keys (CASSANDRA-18219)
Expand Down
Expand Up @@ -21,11 +21,13 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.cassandra.concurrent.ImmediateExecutor;
import org.apache.cassandra.concurrent.ScheduledExecutors;
import org.apache.cassandra.config.Config;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.db.ClusteringComparator;
import org.apache.cassandra.db.ColumnFamilyStore;
Expand Down Expand Up @@ -67,11 +69,20 @@ public abstract class AbstractAllocatorMemtable extends AbstractMemtableWithComm

private static MemtablePool createMemtableAllocatorPool()
{
Config.MemtableAllocationType allocationType = DatabaseDescriptor.getMemtableAllocationType();
long heapLimit = DatabaseDescriptor.getMemtableHeapSpaceInMiB() << 20;
long offHeapLimit = DatabaseDescriptor.getMemtableOffheapSpaceInMiB() << 20;
float memtableCleanupThreshold = DatabaseDescriptor.getMemtableCleanupThreshold();
MemtableCleaner cleaner = AbstractAllocatorMemtable::flushLargestMemtable;
switch (DatabaseDescriptor.getMemtableAllocationType())
return createMemtableAllocatorPoolInternal(allocationType, heapLimit, offHeapLimit, memtableCleanupThreshold, cleaner);
}

@VisibleForTesting
public static MemtablePool createMemtableAllocatorPoolInternal(Config.MemtableAllocationType allocationType,
long heapLimit, long offHeapLimit,
float memtableCleanupThreshold, MemtableCleaner cleaner)
{
switch (allocationType)
{
case unslabbed_heap_buffers_logged:
return new HeapPool.Logged(heapLimit, memtableCleanupThreshold, cleaner);
Expand Down
Expand Up @@ -376,7 +376,7 @@ public Row insert(Row insert)
indexer.onInserted(insert);

this.dataSize += data.dataSize();
onAllocatedOnHeap(data.unsharedHeapSizeExcludingData());
this.heapSize += data.unsharedHeapSizeExcludingData();
if (inserted == null)
inserted = new ArrayList<>();
inserted.add(data);
Expand Down Expand Up @@ -410,12 +410,11 @@ protected void reset()

public Cell<?> merge(Cell<?> previous, Cell<?> insert)
{
if (insert != previous)
{
long timeDelta = Math.abs(insert.timestamp() - previous.timestamp());
if (timeDelta < colUpdateTimeDelta)
colUpdateTimeDelta = timeDelta;
}
if (insert == previous)
return insert;
long timeDelta = Math.abs(insert.timestamp() - previous.timestamp());
if (timeDelta < colUpdateTimeDelta)
colUpdateTimeDelta = timeDelta;
if (cloner != null)
insert = cloner.clone(insert);
dataSize += insert.dataSize() - previous.dataSize();
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/rows/ArrayCell.java
Expand Up @@ -105,7 +105,7 @@ public Cell<?> withSkippedValue()
@Override
public Cell<?> clone(ByteBufferCloner cloner)
{
if (value.length == 0)
if (value.length == 0 && path == null)
return this;

return super.clone(cloner);
Expand Down
7 changes: 6 additions & 1 deletion src/java/org/apache/cassandra/db/rows/BTreeRow.java
Expand Up @@ -280,7 +280,12 @@ public Cell<?> getCell(ColumnMetadata c, CellPath path)
public ComplexColumnData getComplexColumnData(ColumnMetadata c)
{
assert c.isComplex();
return (ComplexColumnData) BTree.<Object>find(btree, ColumnMetadata.asymmetricColumnDataComparator, c);
return (ComplexColumnData) getColumnData(c);
}

/**
 * Looks up the entry stored for column {@code c} in this row's btree, comparing with
 * {@code ColumnMetadata.asymmetricColumnDataComparator}.
 *
 * @param c the column to look up
 * @return whatever {@code BTree.find} yields for {@code c}, cast to {@link ColumnData}
 */
public ColumnData getColumnData(ColumnMetadata c)
{
    Object found = BTree.<Object>find(btree, ColumnMetadata.asymmetricColumnDataComparator, c);
    return (ColumnData) found;
}

@Override
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/rows/BufferCell.java
Expand Up @@ -143,7 +143,7 @@ public long unsharedHeapSize()
@Override
public Cell<?> clone(ByteBufferCloner cloner)
{
if (!value.hasRemaining())
if (!value.hasRemaining() && path == null)
return this;

return super.clone(cloner);
Expand Down
51 changes: 35 additions & 16 deletions src/java/org/apache/cassandra/db/rows/ColumnData.java
Expand Up @@ -99,25 +99,24 @@ public interface PostReconciliationFunction
public static class Reconciler implements UpdateFunction<ColumnData, ColumnData>, AutoCloseable
{
private static final TinyThreadLocalPool<Reconciler> POOL = new TinyThreadLocalPool<>();
private PostReconciliationFunction modifier;
private PostReconciliationFunction postReconcile;
private DeletionTime activeDeletion;
private TinyThreadLocalPool.TinyPool<Reconciler> pool;

private void init(PostReconciliationFunction modifier, DeletionTime activeDeletion)
private void init(PostReconciliationFunction postReconcile, DeletionTime activeDeletion)
{
this.modifier = modifier;
this.postReconcile = postReconcile;
this.activeDeletion = activeDeletion;
}

public ColumnData merge(ColumnData existing, ColumnData update)
{
if (!(existing instanceof ComplexColumnData))
{
Cell<?> existingCell = (Cell) existing, updateCell = (Cell) update;

Cell<?> existingCell = (Cell<?>) existing, updateCell = (Cell<?>) update;
Cell<?> result = Cells.reconcile(existingCell, updateCell);

return modifier.merge(existingCell, result);
return postReconcile.merge(existingCell, result);
}
else
{
Expand All @@ -133,17 +132,22 @@ public ColumnData merge(ColumnData existing, ColumnData update)

Object[] cells;

try (Reconciler reconciler = reconciler(modifier, maxComplexDeletion))
try (Reconciler reconciler = reconciler(postReconcile, maxComplexDeletion))
{
if (!maxComplexDeletion.isLive())
{
if (maxComplexDeletion == existingDeletion)
{
updateTree = BTree.transformAndFilter(updateTree, reconciler::retain);
updateTree = BTree.<ColumnData, ColumnData>transformAndFilter(updateTree, reconciler::removeShadowed);
}
else
{
existingTree = BTree.transformAndFilter(existingTree, reconciler::retain);
Object[] retained = BTree.transformAndFilter(existingTree, reconciler::retain);
if (existingTree != retained)
{
onAllocatedOnHeap(BTree.sizeOnHeapOf(retained) - BTree.sizeOnHeapOf(existingTree));
existingTree = retained;
}
}
}
cells = BTree.update(existingTree, updateTree, existingComplex.column.cellComparator(), (UpdateFunction) reconciler);
Expand All @@ -155,13 +159,13 @@ public ColumnData merge(ColumnData existing, ColumnData update)
@Override
public void onAllocatedOnHeap(long heapSize)
{
modifier.onAllocatedOnHeap(heapSize);
postReconcile.onAllocatedOnHeap(heapSize);
}

@Override
public ColumnData insert(ColumnData insert)
{
return modifier.insert(insert);
return postReconcile.insert(insert);
}

/**
Expand All @@ -171,22 +175,37 @@ public ColumnData insert(ColumnData insert)
* @return {@code null} if the value should be removed from the BTree or the existing value if it should not.
*/
public ColumnData retain(ColumnData existing)
{
// postReconcile is handed to the two-argument overload as its recordDeletion argument
return removeShadowed(existing, postReconcile);
}

/**
* Variant of {@link #retain(ColumnData)} that passes {@code ColumnData.noOp}, rather than
* {@code postReconcile}, as the {@code recordDeletion} argument of the two-argument overload.
* NOTE(review): presumably this means removals are not reported to {@code postReconcile} --
* confirm against the definition of {@code ColumnData.noOp}.
*
* @param existing the existing value to check
* @return the result of the two-argument {@code removeShadowed}
*/
private ColumnData removeShadowed(ColumnData existing)
{
return removeShadowed(existing, ColumnData.noOp);
}

/**
* Checks if the specified value should be deleted or not.
*
* @param existing the existing value to check
* @return {@code null} if the value should be removed from the BTree or the existing value if it should not.
*/
private ColumnData removeShadowed(ColumnData existing, PostReconciliationFunction recordDeletion)
{
if (!(existing instanceof ComplexColumnData))
{
if (activeDeletion.deletes((Cell) existing))
if (activeDeletion.deletes((Cell<?>) existing))
{
modifier.delete(existing);
recordDeletion.delete(existing);
return null;
}
}
else
{
ComplexColumnData existingComplex = (ComplexColumnData) existing;

if (activeDeletion.supersedes(existingComplex.complexDeletion()))
{
Object[] cells = BTree.transformAndFilter(existingComplex.tree(), this::retain);
Object[] cells = BTree.transformAndFilter(existingComplex.tree(), (ColumnData cd) -> removeShadowed(cd, recordDeletion));
return BTree.isEmpty(cells) ? null : new ComplexColumnData(existingComplex.column, cells, DeletionTime.LIVE);
}
}
Expand All @@ -197,7 +216,7 @@ public ColumnData retain(ColumnData existing)
public void close()
{
activeDeletion = null;
modifier = null;
postReconcile = null;

TinyThreadLocalPool.TinyPool<Reconciler> tmp = pool;
pool = null;
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
Expand Up @@ -135,14 +135,14 @@ public int dataSize()

public long unsharedHeapSize()
{
long heapSize = EMPTY_SIZE + ObjectSizes.sizeOfArray(cells) + complexDeletion.unsharedHeapSize();
long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells) + complexDeletion.unsharedHeapSize();
return BTree.<Cell>accumulate(cells, (cell, value) -> value + cell.unsharedHeapSize(), heapSize);
}

@Override
public long unsharedHeapSizeExcludingData()
{
long heapSize = EMPTY_SIZE + ObjectSizes.sizeOfArray(cells);
long heapSize = EMPTY_SIZE + BTree.sizeOnHeapOf(cells);
// TODO: this can be turned into a simple multiplication, at least while we have only one Cell implementation
for (Cell<?> cell : this)
heapSize += cell.unsharedHeapSizeExcludingData();
Expand Down
19 changes: 16 additions & 3 deletions src/java/org/apache/cassandra/db/rows/NativeCell.java
Expand Up @@ -72,7 +72,7 @@ public NativeCell(NativeAllocator allocator,
CellPath path)
{
super(column);
long size = simpleSize(value.remaining());
long size = offHeapSizeWithoutPath(value.remaining());

assert value.order() == ByteOrder.BIG_ENDIAN;
assert column.isComplex() == (path != null);
Expand Down Expand Up @@ -105,7 +105,7 @@ public NativeCell(NativeAllocator allocator,
}
}

private static long simpleSize(int length)
private static long offHeapSizeWithoutPath(int length)
{
return VALUE + length;
}
Expand Down Expand Up @@ -138,7 +138,7 @@ public ValueAccessor<ByteBuffer> accessor()

public CellPath path()
{
if (MemoryUtil.getByte(peer+ HAS_CELLPATH) == 0)
if (!hasPath())
return null;

long offset = peer + VALUE + MemoryUtil.getInt(peer + LENGTH);
Expand Down Expand Up @@ -177,4 +177,17 @@ public long unsharedHeapSizeExcludingData()
{
return EMPTY_SIZE;
}

/**
 * Computes a size for this cell from its native memory: the result of
 * {@code offHeapSizeWithoutPath} for the stored value length, plus, when {@link #hasPath()} is true,
 * 4 bytes and the int stored immediately after the value.
 *
 * @return the computed size
 */
public long offHeapSize()
{
    int valueLength = MemoryUtil.getInt(peer + LENGTH);
    long valueEnd = offHeapSizeWithoutPath(valueLength);
    if (!hasPath())
        return valueEnd;

    // the int located right after the value is presumably the path's length; the extra 4 would then
    // account for that length field itself -- TODO confirm against the constructor's layout
    int pathLength = MemoryUtil.getInt(peer + valueEnd);
    return valueEnd + (4 + pathLength);
}

/**
 * @return {@code true} if the byte stored at {@code peer + HAS_CELLPATH} is non-zero
 */
private boolean hasPath()
{
    byte flag = MemoryUtil.getByte(peer + HAS_CELLPATH);
    return flag != 0;
}
}
8 changes: 8 additions & 0 deletions src/java/org/apache/cassandra/db/rows/Row.java
Expand Up @@ -149,6 +149,14 @@ public interface Row extends Unfiltered, Iterable<ColumnData>, IMeasurableMemory
*/
public ComplexColumnData getComplexColumnData(ColumnMetadata c);

/**
* The data for a regular or complex column.
*
* @param c the column for which to return the data.
* @return the data for {@code c} or {@code null} if the row has no data for this column.
*/
public ColumnData getColumnData(ColumnMetadata c);

/**
* An iterable over the cells of this row.
* <p>
Expand Down
17 changes: 15 additions & 2 deletions src/java/org/apache/cassandra/utils/btree/BTree.java
Expand Up @@ -365,7 +365,9 @@ public static <Compare, Existing extends Compare, Insert extends Compare> Object
toUpdate = insert;
insert = tmp;
}
return updateLeaves(toUpdate, insert, comparator, updateF);
Object[] merged = updateLeaves(toUpdate, insert, comparator, updateF);
updateF.onAllocatedOnHeap(sizeOnHeapOf(merged) - sizeOnHeapOf(toUpdate));
return merged;
}

if (!isLeaf(insert) && isSimple(updateF))
Expand Down Expand Up @@ -2195,6 +2197,9 @@ static int[] sizeMap(Object[] branch)

public static long sizeOnHeapOf(Object[] tree)
{
if (isEmpty(tree))
return 0;

long size = ObjectSizes.sizeOfArray(tree);
if (isLeaf(tree))
return size;
Expand All @@ -2204,6 +2209,14 @@ public static long sizeOnHeapOf(Object[] tree)
return size;
}

/**
 * Heap size of a single leaf array: 0 when {@code isEmpty(tree)} holds, otherwise
 * {@code ObjectSizes.sizeOfArray(tree)}.
 */
private static long sizeOnHeapOfLeaf(Object[] tree)
{
    return isEmpty(tree) ? 0 : ObjectSizes.sizeOfArray(tree);
}

// Arbitrary boundaries
private static Object POSITIVE_INFINITY = new Object();
private static Object NEGATIVE_INFINITY = new Object();
Expand Down Expand Up @@ -2751,7 +2764,7 @@ else if (!hasOverflow() && unode != null && count == sizeOfLeaf(unode) && areIde
sizeOfLeaf = count;
leaf = drain();
if (allocated >= 0 && sizeOfLeaf > 0)
allocated += ObjectSizes.sizeOfReferenceArray(sizeOfLeaf | 1) - (unode == null ? 0 : ObjectSizes.sizeOfArray(unode));
allocated += ObjectSizes.sizeOfReferenceArray(sizeOfLeaf | 1) - (unode == null ? 0 : sizeOnHeapOfLeaf(unode));
}

count = 0;
Expand Down
Expand Up @@ -191,7 +191,7 @@ void acquired()

void released(long size)
{
assert size >= 0;
assert size >= 0 : "Negative released: " + size;
adjustAllocated(-size);
hasRoom.signalAll();
}
Expand Down

0 comments on commit 6adcff8

Please sign in to comment.