Skip to content
Permalink
Browse files
Optimise BTree build, update and transform operations
Patch by Benedict Elliott Smith; reviewed by Branimir Lambov and Benjamin Lerer for CASSANDRA-15510
  • Loading branch information
belliottsmith authored and blerer committed May 6, 2022
1 parent 030831c commit 596daeb7f08e14d69af90fd4f07b9e87f2816681
Showing 24 changed files with 4,764 additions and 1,460 deletions.
@@ -40,7 +40,6 @@
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.BTreeSearchIterator;
import org.apache.cassandra.utils.btree.BTreeRemoval;
import org.apache.cassandra.utils.btree.UpdateFunction;

/**
* An immutable and sorted list of (non-PK) columns for a given table.
@@ -264,8 +263,7 @@ public Columns mergeTo(Columns other)
if (this == NONE)
return other;

Object[] tree = BTree.<ColumnMetadata>merge(this.columns, other.columns, Comparator.naturalOrder(),
UpdateFunction.noOp());
Object[] tree = BTree.update(this.columns, other.columns, Comparator.naturalOrder());
if (tree == this.columns)
return this;
if (tree == other.columns)
@@ -129,22 +129,22 @@ private long[] addAllWithSizeDeltaInternal(RowUpdater updater, PartitionUpdate u
updater.inputDeletionInfoCopy = update.deletionInfo().copy(HeapAllocator.instance);

deletionInfo = current.deletionInfo.mutableCopy().add(updater.inputDeletionInfoCopy);
updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
updater.onAllocatedOnHeap(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
}
else
{
deletionInfo = current.deletionInfo;
}

RegularAndStaticColumns columns = update.columns().mergeTo(current.columns);
updater.allocated(columns.unsharedHeapSize() - current.columns.unsharedHeapSize());
updater.onAllocatedOnHeap(columns.unsharedHeapSize() - current.columns.unsharedHeapSize());
Row newStatic = update.staticRow();
Row staticRow = newStatic.isEmpty()
? current.staticRow
: (current.staticRow.isEmpty() ? updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
Object[] tree = BTree.update(current.tree, update.metadata().comparator, update, update.rowCount(), updater);
Object[] tree = BTree.update(current.tree, update.holder().tree, update.metadata().comparator, updater);
EncodingStats newStats = current.stats.mergeWith(update.stats());
updater.allocated(newStats.unsharedHeapSize() - current.stats.unsharedHeapSize());
updater.onAllocatedOnHeap(newStats.unsharedHeapSize() - current.stats.unsharedHeapSize());

if (tree != null && refUpdater.compareAndSet(this, current, new Holder(columns, tree, deletionInfo, staticRow, newStats)))
{
@@ -373,7 +373,7 @@ public Row apply(Row insert)
indexer.onInserted(insert);

this.dataSize += data.dataSize();
allocated(data.unsharedHeapSizeExcludingData());
onAllocatedOnHeap(data.unsharedHeapSizeExcludingData());
if (inserted == null)
inserted = new ArrayList<>();
inserted.add(data);
@@ -390,7 +390,7 @@ public Row apply(Row existing, Row update)
indexer.onUpdated(existing, reconciled);

dataSize += reconciled.dataSize() - existing.dataSize();
allocated(reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData());
onAllocatedOnHeap(reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData());
if (inserted == null)
inserted = new ArrayList<>();
inserted.add(reconciled);
@@ -410,7 +410,7 @@ public boolean abortEarly()
return updating.ref != ref;
}

public void allocated(long heapSize)
public void onAllocatedOnHeap(long heapSize)
{
this.heapSize += heapSize;
}
@@ -890,8 +890,8 @@ public PartitionUpdate build()
// assert that we are not calling build() several times
assert !isBuilt : "A PartitionUpdate.Builder should only get built once";
Object[] add = rowBuilder.build();
Object[] merged = BTree.<Row>merge(tree, add, metadata.comparator,
UpdateFunction.Simple.of(Rows::merge));
Object[] merged = BTree.<Row, Row, Row>update(tree, add, metadata.comparator,
UpdateFunction.Simple.of(Rows::merge));

EncodingStats newStats = EncodingStats.Collector.collect(staticRow, BTree.iterator(merged), deletionInfo);

@@ -939,7 +939,7 @@ private BTree.Builder<Row> rowBuilder(int initialCapacity)
public Builder updateAllTimestamp(long newTimestamp)
{
deletionInfo.updateAllTimestamp(newTimestamp - 1);
tree = BTree.<Row>transformAndFilter(tree, (x) -> x.updateAllTimestamp(newTimestamp));
tree = BTree.<Row, Row>transformAndFilter(tree, (x) -> x.updateAllTimestamp(newTimestamp));
staticRow = this.staticRow.updateAllTimestamp(newTimestamp);
return this;
}
@@ -745,7 +745,7 @@ public ColumnData resolve(Object[] cells, int lb, int ub)
}
}

Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp());
Object[] btree = BTree.build(buildFrom);
return new ComplexColumnData(column, btree, deletion);
}
}
@@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.BiFunction;

import com.google.common.base.Function;

@@ -35,6 +36,7 @@
import org.apache.cassandra.utils.BiLongAccumulator;
import org.apache.cassandra.utils.LongAccumulator;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTree;

/**
@@ -98,6 +100,11 @@ public Iterator<Cell<?>> iterator()
return BTree.iterator(cells);
}

/**
 * Exposes the cells of this complex column through a search iterator keyed by cell path.
 *
 * @return the iterator produced by slicing the backing cell tree in ascending direction,
 *         using the column's asymmetric cell-path comparator
 */
public SearchIterator<CellPath, Cell> searchIterator()
{
    final SearchIterator<CellPath, Cell> ascending =
        BTree.slice(cells, column().asymmetricCellPathComparator(), BTree.Dir.ASC);
    return ascending;
}

public Iterator<Cell<?>> reverseIterator()
{
return BTree.iterator(cells, BTree.Dir.DESC);
@@ -208,17 +215,25 @@ public ComplexColumnData purgeDataOlderThan(long timestamp)
return transformAndFilter(newDeletion, (cell) -> cell.purgeDataOlderThan(timestamp));
}

private ComplexColumnData transformAndFilter(DeletionTime newDeletion, Function<? super Cell<?>, ? extends Cell<?>> function)
private ComplexColumnData update(DeletionTime newDeletion, Object[] newCells)
{
Object[] transformed = BTree.transformAndFilter(cells, function);

if (cells == transformed && newDeletion == complexDeletion)
if (cells == newCells && newDeletion == complexDeletion)
return this;

if (newDeletion == DeletionTime.LIVE && BTree.isEmpty(transformed))
if (newDeletion == DeletionTime.LIVE && BTree.isEmpty(newCells))
return null;

return new ComplexColumnData(column, transformed, newDeletion);
return new ComplexColumnData(column, newCells, newDeletion);
}

/**
 * Runs {@code function} over the cells via {@code BTree.transformAndFilter} and hands the
 * resulting tree, together with {@code newDeletion}, to {@code update}.
 *
 * @param newDeletion the complex deletion to pair with the transformed cells
 * @param function    the per-cell transformation (presumably a {@code null} result drops the
 *                    cell; confirm against {@code BTree.transformAndFilter})
 * @return whatever {@code update} yields for the new deletion and transformed cells
 */
public ComplexColumnData transformAndFilter(DeletionTime newDeletion, Function<? super Cell, ? extends Cell> function)
{
    Object[] transformedCells = BTree.transformAndFilter(cells, function);
    return update(newDeletion, transformedCells);
}

/**
 * Two-argument variant: runs {@code function} over the cells with {@code param} passed
 * alongside each cell, keeping the current complex deletion unchanged.
 *
 * @param function the transformation receiving a cell and {@code param} (presumably a
 *                 {@code null} result drops the cell; confirm against {@code BTree.transformAndFilter})
 * @param param    the extra argument forwarded to {@code function}
 * @param <V>      the type of {@code param}
 * @return whatever {@code update} yields for the existing deletion and transformed cells
 */
public <V> ComplexColumnData transformAndFilter(BiFunction<? super Cell, ? super V, ? extends Cell> function, V param)
{
    Object[] transformedCells = BTree.transformAndFilter(cells, function, param);
    return update(complexDeletion, transformedCells);
}

public ComplexColumnData updateAllTimestamp(long newTimestamp)
@@ -747,7 +747,7 @@ public Row merge(DeletionTime activeDeletion)
// Because some data might have been shadowed by the 'activeDeletion', we could have an empty row
return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty()
? null
: BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.noOp()));
: BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer));
}

public Clustering<?> mergedClustering()
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.utils;

import java.util.Iterator;

import org.apache.cassandra.utils.caching.TinyThreadLocalPool;

/**
 * A forward-only source of elements that can hand out items one at a time via {@link #next()}
 * or copy a run of them straight into a caller-supplied array via {@link #fetch}.
 *
 * The interface itself exposes no {@code hasNext()}; callers are expected to know how many
 * elements remain. Instances are {@link AutoCloseable} so that pooled implementations can be
 * recycled with try-with-resources.
 *
 * @param <V> the element type
 */
public interface BulkIterator<V> extends AutoCloseable
{
    /**
     * Copies the next {@code count} elements into {@code into}, beginning at index {@code offset},
     * and advances this iterator past them.
     *
     * @param into   destination array
     * @param offset first index in {@code into} to write
     * @param count  number of elements to copy
     */
    void fetch(Object[] into, int offset, int count);

    /**
     * @return the next element, advancing this iterator by one
     */
    V next();

    /**
     * Releases this iterator. The default does nothing; note it narrows
     * {@link AutoCloseable#close()} so that no checked exception is declared.
     */
    @Override
    default void close() {}

    /**
     * A {@link BulkIterator} over an {@code Object[]}, starting from a given index.
     *
     * Instances are obtained from {@link BulkIterator#of(Object[], int)}, which takes them from a
     * {@link TinyThreadLocalPool}; {@link #close()} offers the instance back to that pool.
     * No bounds checking is performed beyond what array access itself provides.
     */
    public static class FromArray<V> implements BulkIterator<V>, AutoCloseable
    {
        private static final TinyThreadLocalPool<FromArray> POOL = new TinyThreadLocalPool<>();

        private Object[] from;
        private int i;
        // the pool this instance was taken from; null once closed (or before first use)
        private TinyThreadLocalPool.TinyPool<FromArray> pool;

        /** (Re)targets this instance at {@code from}, positioned at index {@code offset}. */
        private void init(Object[] from, int offset)
        {
            this.from = from;
            this.i = offset;
        }

        /**
         * Drops the reference to the source array and offers this instance back to the pool it
         * came from. Calling it again on an already-closed instance is a no-op, rather than
         * failing with a {@link NullPointerException} on the cleared pool reference.
         */
        @Override
        public void close()
        {
            TinyThreadLocalPool.TinyPool<FromArray> owner = pool;
            if (owner == null)
                return; // already closed: nothing left to release

            // clear our state before making the instance available for reuse
            from = null;
            pool = null;
            owner.offer(this);
        }

        /** {@inheritDoc} Implemented as a single {@link System#arraycopy} from the source array. */
        @Override
        public void fetch(Object[] into, int offset, int count)
        {
            System.arraycopy(from, i, into, offset, count);
            i += count;
        }

        /** {@inheritDoc} */
        @Override
        @SuppressWarnings("unchecked") // the caller vouches for the element type of the source array
        public V next()
        {
            return (V) from[i++];
        }
    }

    /**
     * Wraps a plain {@link Iterator} as a {@link BulkIterator}. Not pooled; {@link #close()} is
     * the inherited no-op.
     */
    public static class Adapter<V> implements BulkIterator<V>
    {
        final Iterator<V> adapt;

        private Adapter(Iterator<V> adapt)
        {
            this.adapt = adapt;
        }

        /**
         * {@inheritDoc}
         *
         * If the wrapped iterator runs out before {@code count} elements were copied, copying
         * simply stops; the remaining slots of {@code into} are left untouched.
         */
        @Override
        public void fetch(Object[] into, int offset, int count)
        {
            // turn count into the exclusive end index of the destination range
            count += offset;
            while (offset < count && adapt.hasNext())
                into[offset++] = adapt.next();
        }

        /** @return whether the wrapped iterator has further elements */
        public boolean hasNext()
        {
            return adapt.hasNext();
        }

        /** {@inheritDoc} */
        @Override
        public V next()
        {
            return adapt.next();
        }
    }

    /**
     * @return a {@link FromArray} over {@code from}, positioned at index 0
     */
    public static <V> FromArray<V> of(Object[] from)
    {
        return of(from, 0);
    }

    /**
     * Obtains a {@link FromArray} from the current thread's pool (allocating one if the pool has
     * none to give), positioned at {@code offset} in {@code from}. The result should be closed
     * after use so it can be offered back to the pool.
     *
     * @param from   the source array
     * @param offset index of the first element to return
     */
    @SuppressWarnings("unchecked") // pooled instances are raw; V is only a compile-time view
    public static <V> FromArray<V> of(Object[] from, int offset)
    {
        TinyThreadLocalPool.TinyPool<FromArray> pool = FromArray.POOL.get();
        FromArray<V> result = pool.poll();
        if (result == null)
            result = new FromArray<>();
        result.init(from, offset);
        result.pool = pool;
        return result;
    }

    /**
     * @return a new {@link Adapter} wrapping {@code from}
     */
    public static <V> Adapter<V> of(Iterator<V> from)
    {
        return new Adapter<>(from);
    }
}

0 comments on commit 596daeb

Please sign in to comment.