diff --git a/CHANGES.txt b/CHANGES.txt index 2720494bd740..ff410c497099 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.0.9 + * BTree.FastBuilder.reset() fails to clear savedBuffer and savedNextKey, causing ClassCastException and SSTable header corruption during schema disagreement (CASSANDRA-21216, CASSANDRA-21260) * Use estimated compressed size for tables to check if there is enough free space for a compaction (CASSANDRA-21245) * Fix failing select on system_views.settings for non-string keys (CASSANDRA-21348) * Ensure SAI sends range tombstones to the coordinator for queries on static columns (CASSANDRA-21332) diff --git a/src/java/org/apache/cassandra/utils/btree/BTree.java b/src/java/org/apache/cassandra/utils/btree/BTree.java index 8674d714daf8..6f81e2d7e795 100644 --- a/src/java/org/apache/cassandra/utils/btree/BTree.java +++ b/src/java/org/apache/cassandra/utils/btree/BTree.java @@ -3081,7 +3081,7 @@ Object[] drain() * was constructed from for the contents of {@code buffer}. *
* For {@link FastBuilder} these are mostly the same, so they are fetched from a global cache and - * resized accordingly, but for {@link AbstractUpdater} we maintain a buffer of sizes. + * resized accordingly, but for {@link Updater} we maintain a buffer of sizes. */ int setDrainSizeMap(Object[] original, int keysInOriginal, Object[] branch, int keysInBranch) { @@ -3110,7 +3110,7 @@ int setDrainSizeMap(Object[] original, int keysInOriginal, Object[] branch, int * was constructed from for the contents of {@code savedBuffer}. *
* For {@link FastBuilder} these are always the same size, so they are fetched from a global cache, - * but for {@link AbstractUpdater} we maintain a buffer of sizes. + * but for {@link Updater} we maintain a buffer of sizes. * * @return the size of {@code branch} */ @@ -3141,7 +3141,7 @@ int setOverflowSizeMap(Object[] branch, int keys) * was constructed from the contents of both {@code savedBuffer} and {@code buffer} *
* For {@link FastBuilder} these are mostly the same size, so they are fetched from a global cache - * and only the last items updated, but for {@link AbstractUpdater} we maintain a buffer of sizes. + * and only the last items updated, but for {@link Updater} we maintain a buffer of sizes. */ void setRedistributedSizeMap(Object[] branch, int steal) { @@ -3269,11 +3269,57 @@ final LeafBuilder leaf() /** * Clear any references we might still retain, to avoid holding onto memory. - *
- * While this method is not strictly necessary, it exists to - * ensure the implementing classes are aware they must handle it. */ - abstract void reset(); + void reset() + { + leaf().count = 0; + clearLeafBuffer(leaf().buffer); + if (leaf().savedBuffer != null) + clearLeafBuffer(leaf().savedBuffer); + leaf().savedNextKey = null; + BranchBuilder branch = leaf().parent; + while (branch != null && branch.inUse) + { + branch.count = 0; + clearBranchBuffer(branch.buffer); + if (branch.savedBuffer != null) + clearBranchBuffer(branch.savedBuffer); + branch.savedNextKey = null; + branch.inUse = false; + branch = branch.parent; + } + } + + /** + * Clear the contents of a leaf buffer, aborting once we encounter a null entry + * to save time on small trees + */ + private void clearLeafBuffer(Object[] array) + { + if (array[0] == null) + return; + // find first null entry; loop from beginning, to amortise cost over size of working set + int i = 1; + while (i < array.length && array[i] != null) + ++i; + Arrays.fill(array, 0, i, null); + } + + /** + * Clear the contents of a branch buffer, aborting once we encounter a null entry + * to save time on small trees + */ + private void clearBranchBuffer(Object[] array) + { + if (array[0] == null && array[MAX_KEYS] == null) + return; + // find first null entry; loop from beginning, to amortise cost over size of working set + int i = 1; + while (i < MAX_KEYS && array[i] != null) + ++i; + Arrays.fill(array, 0, i, null); + Arrays.fill(array, MAX_KEYS, MAX_KEYS + i + 1, null); + } } /** @@ -3325,21 +3371,6 @@ public void close() } } - @Override - void reset() - { - Arrays.fill(leaf().buffer, null); - leaf().count = 0; - BranchBuilder branch = leaf().parent; - while (branch != null && branch.inUse) - { - Arrays.fill(branch.buffer, null); - branch.count = 0; - branch.inUse = false; - branch = branch.parent; - } - } - public boolean validateEmpty() { LeafOrBranchBuilder cur = leaf(); @@ -3366,60 +3397,6 @@ private static boolean 
hasOnlyNulls(Object[] buffer) } } - private static abstract class AbstractUpdater extends AbstractFastBuilder implements AutoCloseable - { - void reset() - { - assert leaf().count == 0; - clearLeafBuffer(leaf().buffer); - if (leaf().savedBuffer != null) - Arrays.fill(leaf().savedBuffer, null); - - BranchBuilder branch = leaf().parent; - while (branch != null && branch.inUse) - { - assert branch.count == 0; - clearBranchBuffer(branch.buffer); - if (branch.savedBuffer != null && branch.savedBuffer[0] != null) - Arrays.fill(branch.savedBuffer, null); // by definition full, if non-empty - branch.inUse = false; - branch = branch.parent; - } - } - - /** - * Clear the contents of a branch buffer, aborting once we encounter a null entry - * to save time on small trees - */ - private void clearLeafBuffer(Object[] array) - { - if (array[0] == null) - return; - // find first null entry; loop from beginning, to amortise cost over size of working set - int i = 1; - while (i < array.length && array[i] != null) - ++i; - Arrays.fill(array, 0, i, null); - } - - /** - * Clear the contents of a branch buffer, aborting once we encounter a null entry - * to save time on small trees - */ - private void clearBranchBuffer(Object[] array) - { - if (array[0] == null) - return; - - // find first null entry; loop from beginning, to amortise cost over size of working set - int i = 1; - while (i < MAX_KEYS && array[i] != null) - ++i; - Arrays.fill(array, 0, i, null); - Arrays.fill(array, MAX_KEYS, MAX_KEYS + i + 1, null); - } - } - /** * A pooled object for modifying an existing tree with a new (typically smaller) tree. *
@@ -3434,7 +3411,7 @@ private void clearBranchBuffer(Object[] array)
* Searches within both trees to accelerate the process of modification, instead of performing a simple
* iteration over the new tree.
*/
- private static class Updater
* The approach taken here hopefully balances simplicity, garbage generation and execution time.
*/
- private static abstract class AbstractTransformer extends AbstractUpdater implements AutoCloseable
+ private static abstract class AbstractTransformer extends AbstractFastBuilder implements AutoCloseable
{
/**
* An iterator over the tree we are updating
diff --git a/test/distributed/org/apache/cassandra/distributed/test/BTreeFastBuilderContaminationTest.java b/test/distributed/org/apache/cassandra/distributed/test/BTreeFastBuilderContaminationTest.java
new file mode 100644
index 000000000000..6956f6b2fd8e
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/test/BTreeFastBuilderContaminationTest.java
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.List;
+
+import org.junit.Test;
+
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.shared.ShutdownException;
+import org.apache.cassandra.net.Verb;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.junit.Assert.fail;
+
+public class BTreeFastBuilderContaminationTest extends TestBaseImpl
+{
+ // 4200 columns * ~18 bytes/name > 64KB large-message threshold
+ // → READ_REQ deserialized on SEPWorker threads, not Netty event loop
+ private static final int NUM_WIDE_COLUMNS = 4200;
+
+ // Small-message scenario: both READ_REQ and MUTATION_REQ stay under 64KB
+ // → deserialized on Netty event loop threads
+ private static final int NUM_SMALL_SOURCE_COLUMNS = 150; // >31 to trigger FastBuilder overflow
+ private static final int NUM_SMALL_VICTIM_COLUMNS = 2000;
+
+ private static final int NUM_PARTITIONS = 200;
+ private static final int NUM_DELETE_PARTITIONS = 300;
+
+ // Verify CASSANDRA-21216/CASSANDRA-21260 fix: stale ColumnMetadata from a failed
+ // READ_REQ deserialization must not leak into a Row BTree during mutation, which can
+ // cause ClassCastException. Source table is wide (~4200 columns) so READ_REQ exceeds
+ // 64KB, meaning it is deserialized on SEPWorker. Victim table is narrow — without the
+ // fix, corruption can happen via BTree.updateLeaves() during mutation execution on
+ // the same SEPWorker thread (SharedExecutorPool threads hop between stages).
+ @Test
+ public void testSchemaDisagreementCorruptsPartitionViaFastBuilder() throws Throwable // fails only when a ClassCastException is the root cause of an error
+ {
+ try (Cluster cluster = init(builder().withNodes(2) // two nodes: node 1 coordinates, node 2 is seeded and flushed directly below
+ .withConfig(config -> {
+ config.with(NETWORK, GOSSIP);
+ config.set("concurrent_reads", 2); // NOTE(review): small pools presumably make thread reuse across stages more likely - confirm
+ config.set("concurrent_writes", 2);
+ config.set("read_request_timeout_in_ms", 5000L);
+ config.set("write_request_timeout_in_ms", 5000L);
+ })
+ .start()))
+ {
+ createWideSourceTable(cluster); // helper not visible in this excerpt
+
+ cluster.schemaChange(withKeyspace(
+ "CREATE TABLE %s.victim (pk int, ck int, v text, PRIMARY KEY (pk, ck))"));
+
+ cluster.coordinator(1).execute( // write a single source row through node 1 at consistency level ALL
+ withKeyspace("INSERT INTO %s.source (pk, src_wide_col_0000) VALUES (1, 42)"), ALL);
+
+ for (int pk = 0; pk < NUM_PARTITIONS; pk++) // seed one row (ck = 1) per victim partition directly on node 2 via executeInternal
+ cluster.get(2).executeInternal(withKeyspace(
+ "INSERT INTO %s.victim (pk, ck, v) VALUES (" + pk + ", 1, 'seed')"));
+
+ createSchemaDisagreement(cluster); // helper not visible in this excerpt
+ poisonFastBuilder(cluster); // helper not visible; NOTE(review): presumably triggers the failed READ_REQ deserialization described above the test - confirm
+
+ for (int pk = 0; pk < NUM_PARTITIONS; pk++) // probe 1: write a second row (ck = 2) into every seeded partition via node 1 at ALL
+ {
+ try
+ {
+ cluster.coordinator(1).execute(withKeyspace(
+ "INSERT INTO %s.victim (pk, ck, v) VALUES (" + pk + ", 2, 'probe')"), ALL);
+ }
+ catch (Exception e) // any exception whose root cause is not a ClassCastException is ignored here
+ {
+ if (rootCauseIs(e, ClassCastException.class)) // rootCauseIs is a helper not visible in this excerpt
+ fail("ClassCastException from corrupted partition BTree (CASSANDRA-21216): " + e.getMessage());
+ }
+ }
+
+ for (int pk = 0; pk < NUM_PARTITIONS; pk++) // probe 2: read every victim partition back via node 1 at ALL
+ {
+ try
+ {
+ cluster.coordinator(1).execute(withKeyspace(
+ "SELECT * FROM %s.victim WHERE pk = " + pk), ALL);
+ }
+ catch (Exception e) // same policy as the write probe: only a ClassCastException root cause fails the test
+ {
+ if (rootCauseIs(e, ClassCastException.class))
+ fail("ClassCastException from corrupted partition BTree (CASSANDRA-21216): " + e.getMessage());
+ }
+ }
+
+ try
+ {
+ cluster.get(2).flush(KEYSPACE); // probe 3: flush the keyspace on node 2
+ }
+ catch (Exception e) // same policy again: other flush failures are ignored
+ {
+ if (rootCauseIs(e, ClassCastException.class))
+ fail("ClassCastException from corrupted partition BTree (CASSANDRA-21216): " + e.getMessage());
+ }
+ }
+ catch (ShutdownException e) // NOTE(review): presumably raised when try-with-resources closes the cluster - confirm
+ {
+ if (rootCauseIs(e, ClassCastException.class))
+ fail("ClassCastException from corrupted partition BTree during shutdown (CASSANDRA-21216): " + e.getMessage());
+ throw e; // a ShutdownException with any other root cause is rethrown unchanged
+ }
+ }
+
+ // Verify CASSANDRA-21260 fix: SSTable header must not be contaminated via small messages
+ // on the Netty event loop.
+ // Source: 150 columns (>31 → FastBuilder overflow) but only ~3KB → small message.
+ // Victim: 2000 columns, but partition DELETE has empty updatedColumns → tiny message.
+ // Both deserialized on the same Netty event loop thread (channel-to-EventLoop binding).
+ // Without the fix, the poisoned FastBuilder is reused for the victim's SerializationHeader
+ // deserialization.
+ @Test
+ public void testSmallMessageContaminatesSSTableHeaderViaNettyEventLoop() throws Throwable
+ {
+ try (Cluster cluster = init(builder().withNodes(2)
+ .withConfig(config -> {
+ config.with(NETWORK, GOSSIP);
+ config.set("read_request_timeout_in_ms", 5000L);
+ config.set("write_request_timeout_in_ms", 5000L);
+ })
+ .start()))
+ {
+ createTable(cluster, "source", NUM_SMALL_SOURCE_COLUMNS, "src_col");
+ createTable(cluster, "victim", NUM_SMALL_VICTIM_COLUMNS, "vic_col");
+
+ createSchemaDisagreement(cluster);
+ poisonFastBuilder(cluster);
+
+ // Partition deletions to the victim table. Despite the victim having 2000 columns,
+ // a partition-level DELETE has empty updatedColumns (no column operations), so
+ // the MUTATION_REQ is tiny. It is deserialized on the same Netty event loop thread
+ // that handled the failed READ_REQ. The poisoned FastBuilder's stale savedBuffer
+ // is drained even though 0 new columns are added — build() calls propagateOverflow()
+ // when hasOverflow() is true from the previous use.
+ int batchSize = NUM_DELETE_PARTITIONS / 5;
+ for (int round = 0; round < 5; round++)
+ {
+ if (round > 0)
+ poisonFastBuilder(cluster);
+
+ for (int pk = round * batchSize; pk < (round + 1) * batchSize; pk++)
+ {
+ try
+ {
+ cluster.coordinator(1).execute(withKeyspace(
+ "DELETE FROM %s.victim WHERE pk = " + pk), ALL);
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+ }
+
+ cluster.get(2).flush(KEYSPACE);
+
+ ListUpdateFunction that count the number of call made to apply for each value.
*/