Skip to content

Commit

Permalink
Optimise BTree build, update and transform operations
Browse files Browse the repository at this point in the history
Patch by Benedict Elliott Smith; reviewed by Branimir Lambov and Benjamin Lerer for CASSANDRA-15510
  • Loading branch information
belliottsmith authored and blerer committed Apr 22, 2022
1 parent 2873c91 commit 018c8e0
Show file tree
Hide file tree
Showing 26 changed files with 4,770 additions and 1,463 deletions.
3 changes: 3 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
4.0.5
* Optimise BTree build, update and transform operations (CASSANDRA-15510)

4.0.4
* Clean up schema migration coordinator and tests (CASSANDRA-17533)
* Shut repair task executor down without interruption to avoid compromising shared channel proxies (CASSANDRA-17466)
Expand Down
4 changes: 3 additions & 1 deletion build.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1236,10 +1236,12 @@

<target name="build-jmh" depends="build-test, jar" description="Create JMH uber jar">
<jar jarfile="${build.test.dir}/deps.jar">
<zipgroupfileset dir="${build.dir.lib}/jars">
<zipgroupfileset dir="${test.lib}/jars">
<include name="*jmh*.jar"/>
<include name="jopt*.jar"/>
<include name="commons*.jar"/>
<include name="junit*.jar"/>
<include name="hamcrest*.jar"/>
</zipgroupfileset>
<zipgroupfileset dir="${build.lib}" includes="*.jar"/>
</jar>
Expand Down
4 changes: 1 addition & 3 deletions src/java/org/apache/cassandra/db/Columns.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.BTreeSearchIterator;
import org.apache.cassandra.utils.btree.BTreeRemoval;
import org.apache.cassandra.utils.btree.UpdateFunction;

/**
* An immutable and sorted list of (non-PK) columns for a given table.
Expand Down Expand Up @@ -264,8 +263,7 @@ public Columns mergeTo(Columns other)
if (this == NONE)
return other;

Object[] tree = BTree.<ColumnMetadata>merge(this.columns, other.columns, Comparator.naturalOrder(),
UpdateFunction.noOp());
Object[] tree = BTree.update(this.columns, other.columns, Comparator.naturalOrder());
if (tree == this.columns)
return this;
if (tree == other.columns)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,22 +127,22 @@ private long[] addAllWithSizeDeltaInternal(RowUpdater updater, PartitionUpdate u
updater.inputDeletionInfoCopy = update.deletionInfo().copy(HeapAllocator.instance);

deletionInfo = current.deletionInfo.mutableCopy().add(updater.inputDeletionInfoCopy);
updater.allocated(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
updater.onAllocatedOnHeap(deletionInfo.unsharedHeapSize() - current.deletionInfo.unsharedHeapSize());
}
else
{
deletionInfo = current.deletionInfo;
}

RegularAndStaticColumns columns = update.columns().mergeTo(current.columns);
updater.allocated(columns.unsharedHeapSize() - current.columns.unsharedHeapSize());
updater.onAllocatedOnHeap(columns.unsharedHeapSize() - current.columns.unsharedHeapSize());
Row newStatic = update.staticRow();
Row staticRow = newStatic.isEmpty()
? current.staticRow
: (current.staticRow.isEmpty() ? updater.apply(newStatic) : updater.apply(current.staticRow, newStatic));
Object[] tree = BTree.update(current.tree, update.metadata().comparator, update, update.rowCount(), updater);
Object[] tree = BTree.update(current.tree, update.holder().tree, update.metadata().comparator, updater);
EncodingStats newStats = current.stats.mergeWith(update.stats());
updater.allocated(newStats.unsharedHeapSize() - current.stats.unsharedHeapSize());
updater.onAllocatedOnHeap(newStats.unsharedHeapSize() - current.stats.unsharedHeapSize());

if (tree != null && refUpdater.compareAndSet(this, current, new Holder(columns, tree, deletionInfo, staticRow, newStats)))
{
Expand Down Expand Up @@ -371,7 +371,7 @@ public Row apply(Row insert)
indexer.onInserted(insert);

this.dataSize += data.dataSize();
allocated(data.unsharedHeapSizeExcludingData());
onAllocatedOnHeap(data.unsharedHeapSizeExcludingData());
if (inserted == null)
inserted = new ArrayList<>();
inserted.add(data);
Expand All @@ -388,7 +388,7 @@ public Row apply(Row existing, Row update)
indexer.onUpdated(existing, reconciled);

dataSize += reconciled.dataSize() - existing.dataSize();
allocated(reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData());
onAllocatedOnHeap(reconciled.unsharedHeapSizeExcludingData() - existing.unsharedHeapSizeExcludingData());
if (inserted == null)
inserted = new ArrayList<>();
inserted.add(reconciled);
Expand All @@ -408,7 +408,7 @@ public boolean abortEarly()
return updating.ref != ref;
}

public void allocated(long heapSize)
public void onAllocatedOnHeap(long heapSize)
{
this.heapSize += heapSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -858,8 +858,8 @@ public PartitionUpdate build()
// assert that we are not calling build() several times
assert !isBuilt : "A PartitionUpdate.Builder should only get built once";
Object[] add = rowBuilder.build();
Object[] merged = BTree.<Row>merge(tree, add, metadata.comparator,
UpdateFunction.Simple.of(Rows::merge));
Object[] merged = BTree.<Row, Row, Row>update(tree, add, metadata.comparator,
UpdateFunction.Simple.of(Rows::merge));

EncodingStats newStats = EncodingStats.Collector.collect(staticRow, BTree.iterator(merged), deletionInfo);

Expand Down Expand Up @@ -907,7 +907,7 @@ private BTree.Builder<Row> rowBuilder(int initialCapacity)
public Builder updateAllTimestamp(long newTimestamp)
{
deletionInfo.updateAllTimestamp(newTimestamp - 1);
tree = BTree.<Row>transformAndFilter(tree, (x) -> x.updateAllTimestamp(newTimestamp));
tree = BTree.<Row, Row>transformAndFilter(tree, (x) -> x.updateAllTimestamp(newTimestamp));
staticRow = this.staticRow.updateAllTimestamp(newTimestamp);
return this;
}
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/rows/BTreeRow.java
Original file line number Diff line number Diff line change
Expand Up @@ -720,7 +720,7 @@ public ColumnData resolve(Object[] cells, int lb, int ub)
}
}

Object[] btree = BTree.build(buildFrom, UpdateFunction.noOp());
Object[] btree = BTree.build(buildFrom);
return new ComplexColumnData(column, btree, deletion);
}
}
Expand Down
27 changes: 21 additions & 6 deletions src/java/org/apache/cassandra/db/rows/ComplexColumnData.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Objects;
import java.util.function.BiFunction;

import com.google.common.base.Function;

Expand All @@ -35,6 +36,7 @@
import org.apache.cassandra.utils.BiLongAccumulator;
import org.apache.cassandra.utils.LongAccumulator;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.btree.BTree;

/**
Expand Down Expand Up @@ -98,6 +100,11 @@ public Iterator<Cell<?>> iterator()
return BTree.iterator(cells);
}

/**
 * Returns a {@link SearchIterator} over this column's cells, looked up by {@link CellPath}.
 * The iterator is obtained by slicing the cell BTree in ascending direction using the
 * column's asymmetric cell-path comparator.
 */
public SearchIterator<CellPath, Cell> searchIterator()
{
return BTree.slice(cells, column().asymmetricCellPathComparator(), BTree.Dir.ASC);
}

public Iterator<Cell<?>> reverseIterator()
{
return BTree.iterator(cells, BTree.Dir.DESC);
Expand Down Expand Up @@ -195,17 +202,25 @@ public ComplexColumnData withOnlyQueriedData(ColumnFilter filter)
return transformAndFilter(complexDeletion, (cell) -> filter.fetchedCellIsQueried(column, cell.path()) ? null : cell);
}

private ComplexColumnData transformAndFilter(DeletionTime newDeletion, Function<? super Cell<?>, ? extends Cell<?>> function)
private ComplexColumnData update(DeletionTime newDeletion, Object[] newCells)
{
Object[] transformed = BTree.transformAndFilter(cells, function);

if (cells == transformed && newDeletion == complexDeletion)
if (cells == newCells && newDeletion == complexDeletion)
return this;

if (newDeletion == DeletionTime.LIVE && BTree.isEmpty(transformed))
if (newDeletion == DeletionTime.LIVE && BTree.isEmpty(newCells))
return null;

return new ComplexColumnData(column, transformed, newDeletion);
return new ComplexColumnData(column, newCells, newDeletion);
}

/**
 * Runs {@code function} over the cells via {@code BTree.transformAndFilter} and passes the
 * resulting tree, together with {@code newDeletion}, to {@code update}, returning its result.
 * NOTE(review): a null result from {@code function} presumably removes the cell - confirm
 * against BTree.transformAndFilter.
 *
 * @param newDeletion the complex deletion to use for the result
 * @param function    the per-cell transformation
 */
public ComplexColumnData transformAndFilter(DeletionTime newDeletion, Function<? super Cell, ? extends Cell> function)
{
return update(newDeletion, BTree.transformAndFilter(cells, function));
}

/**
 * Variant of transformAndFilter taking a two-argument function plus an extra {@code param}
 * that is forwarded to {@code BTree.transformAndFilter}; the current complex deletion is
 * kept and passed to {@code update} with the resulting tree.
 * NOTE(review): presumably {@code param} is supplied as the second argument on every call to
 * {@code function}, and a null result removes the cell - confirm against BTree.transformAndFilter.
 *
 * @param function the per-cell transformation
 * @param param    the extra argument forwarded alongside the cells
 */
public <V> ComplexColumnData transformAndFilter(BiFunction<? super Cell, ? super V, ? extends Cell> function, V param)
{
return update(complexDeletion, BTree.transformAndFilter(cells, function, param));
}

public ComplexColumnData updateAllTimestamp(long newTimestamp)
Expand Down
2 changes: 1 addition & 1 deletion src/java/org/apache/cassandra/db/rows/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -732,7 +732,7 @@ public Row merge(DeletionTime activeDeletion)
// Because some data might have been shadowed by the 'activeDeletion', we could have an empty row
return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty()
? null
: BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer, UpdateFunction.noOp()));
: BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer));
}

public Clustering<?> mergedClustering()
Expand Down
112 changes: 112 additions & 0 deletions src/java/org/apache/cassandra/utils/BulkIterator.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.utils;

import java.util.Iterator;

import org.apache.cassandra.utils.caching.TinyThreadLocalPool;

/**
 * A source of values that can be consumed one at a time via {@link #next()} or copied in
 * bulk into a caller-supplied array via {@link #fetch(Object[], int, int)}.
 *
 * The interface itself declares no {@code hasNext()}, so callers are expected to track how
 * many values remain by other means.
 */
public interface BulkIterator<V> extends AutoCloseable
{
    /**
     * Copies the next {@code count} values into {@code into}, starting at index {@code offset},
     * and advances past them.
     *
     * @param into   destination array
     * @param offset first index in {@code into} to write
     * @param count  number of values to copy
     */
    void fetch(Object[] into, int offset, int count);

    /**
     * @return the next value, advancing past it
     */
    V next();

    /**
     * Releases any resources held by this iterator; does nothing by default.
     */
    @Override
    default void close() {}

    /**
     * A {@link BulkIterator} reading sequentially from an {@code Object[]}.
     *
     * Instances are recycled through a {@link TinyThreadLocalPool}: {@link BulkIterator#of(Object[], int)}
     * polls the pool before allocating a new instance, and {@link #close()} offers the instance back.
     * After {@link #close()} the backing array reference is cleared, so the instance must not be used
     * again until it is handed out by {@code of(...)}.
     */
    public static class FromArray<V> implements BulkIterator<V>
    {
        private static final TinyThreadLocalPool<FromArray> POOL = new TinyThreadLocalPool<>();

        private Object[] from;
        private int i;
        private TinyThreadLocalPool.TinyPool<FromArray> pool;

        /** Points this instance at {@code from}, with the read position set to {@code offset}. */
        private void init(Object[] from, int offset)
        {
            this.from = from;
            this.i = offset;
        }

        /**
         * Offers this instance back to the pool it was taken from and drops its references.
         * Calling it again before the instance is re-issued is a no-op; previously a second call
         * dereferenced the already-cleared pool and threw a NullPointerException.
         */
        @Override
        public void close()
        {
            if (pool == null)
                return; // already closed (or never issued by of(...)): nothing to release

            pool.offer(this);
            from = null;
            pool = null;
        }

        /**
         * Copies {@code count} elements from the current position of the backing array into
         * {@code into} at {@code offset} with {@link System#arraycopy}, then advances by {@code count}.
         */
        @Override
        public void fetch(Object[] into, int offset, int count)
        {
            System.arraycopy(from, i, into, offset, count);
            i += count;
        }

        /**
         * @return the element at the current position of the backing array, advancing by one
         */
        @Override
        @SuppressWarnings("unchecked") // the backing array is untyped; the caller chooses V
        public V next()
        {
            return (V) from[i++];
        }
    }

    /**
     * A {@link BulkIterator} view over a plain {@link Iterator}.
     */
    public static class Adapter<V> implements BulkIterator<V>
    {
        final Iterator<V> adapt;

        private Adapter(Iterator<V> adapt)
        {
            this.adapt = adapt;
        }

        /**
         * Writes up to {@code count} values from the wrapped iterator into {@code into} starting at
         * {@code offset}. If the iterator runs out first, copying stops and the remaining slots are
         * left untouched.
         */
        @Override
        public void fetch(Object[] into, int offset, int count)
        {
            count += offset; // from here on 'count' is the exclusive end index
            while (offset < count && adapt.hasNext())
                into[offset++] = adapt.next();
        }

        /**
         * @return whether the wrapped iterator has more values
         */
        public boolean hasNext()
        {
            return adapt.hasNext();
        }

        /**
         * @return the next value of the wrapped iterator
         */
        @Override
        public V next()
        {
            return adapt.next();
        }
    }

    /**
     * @return a {@link FromArray} reading {@code from} starting at index 0
     */
    public static <V> FromArray<V> of(Object[] from)
    {
        return of(from, 0);
    }

    /**
     * Returns a {@link FromArray} reading {@code from} starting at index {@code offset}.
     * An instance is taken from the pool returned by {@code FromArray.POOL.get()} when one is
     * available, otherwise a new one is created; the pool is remembered on the instance so that
     * {@link FromArray#close()} can offer it back.
     */
    @SuppressWarnings("unchecked") // the pool stores raw FromArray instances
    public static <V> FromArray<V> of(Object[] from, int offset)
    {
        TinyThreadLocalPool.TinyPool<FromArray> pool = FromArray.POOL.get();
        FromArray<V> result = pool.poll();
        if (result == null)
            result = new FromArray<>();
        result.init(from, offset);
        result.pool = pool;
        return result;
    }

    /**
     * @return an {@link Adapter} wrapping {@code from}
     */
    public static <V> Adapter<V> of(Iterator<V> from)
    {
        return new Adapter<>(from);
    }
}

0 comments on commit 018c8e0

Please sign in to comment.