diff --git a/CHANGES.txt b/CHANGES.txt
index 5c43c68bf373..3ea5a79229c8 100644
--- a/CHANGES.txt
+++ b/CHANGES.txt
@@ -1,4 +1,5 @@
5.0-beta2
+ * Fix the correspondingMessagingVersion of SSTable format and improve TTL overflow tests coverage (CASSANDRA-19197)
* Fix resource cleanup after SAI query timeouts (CASSANDRA-19177)
* Suppress CVE-2023-6481 (CASSANDRA-19184)
Merged from 4.1:
@@ -7,6 +8,7 @@ Merged from 4.0:
Merged from 3.11:
Merged from 3.0:
+
5.0-beta1
* Fix SAI intersection queries (CASSANDRA-19011)
* Clone EndpointState before sending GossipShutdown message (CASSANDRA-19115)
diff --git a/build.xml b/build.xml
index e3f5a517a55f..a701786f84e7 100644
--- a/build.xml
+++ b/build.xml
@@ -1237,12 +1237,16 @@
+
+
+
+
@@ -1764,6 +1768,7 @@
+
@@ -1800,6 +1805,7 @@
+
@@ -1815,6 +1821,7 @@
+
diff --git a/conf/cassandra.yaml b/conf/cassandra.yaml
index ba0eb864a686..3f9dfc577e1f 100644
--- a/conf/cassandra.yaml
+++ b/conf/cassandra.yaml
@@ -2113,27 +2113,27 @@ drop_compact_storage_enabled: false
# This property indicates with what Cassandra major version the storage format will be compatible with.
#
-# The chosen storage compatiblity mode will determine the versions of the written sstables, commitlogs, hints,
-# etc. Those storage elements will use the higher minor versions of the major version that corresponds to the
-# Cassandra version we want to stay compatible with. For example, if we want to stay compatible with Cassandra 4.0
-# or 4.1, the value of this property should be 4, and that will make us use 'oa' sstables.
+# The chosen storage compatibility mode will determine the versions of the written sstables, commitlogs, hints, etc.
+# For example, if we're going to remain compatible with Cassandra 4.x, the value of this property should be 4, which
+# will make us use sstables in the latest N version of the BIG format.
#
-# This will also determine if certain features depending on newer formats are available. For example, extended TTLs
-# up to 2106 depend on the sstable, commitlog, hints and messaging versions that were introduced by Cassandra 5.0,
-# so that feature won't be available if this property is set to CASSANDRA_4. See upgrade guides for details. Currently
-# the only supported major is CASSANDRA_4.
+# This will also determine if certain features that depend on newer formats are available. For example, extended TTL
+# (up to 2106) depends on the sstable, commit-log, hints, and messaging versions introduced by Cassandra 5.0, so that
+# feature won't be available if this property is set to CASSANDRA_4. See the upgrade guide for more details.
#
-# Possible values are in the StorageCompatibilityMode.java file accessible online. At the time of writing these are:
+# Possible values are:
# - CASSANDRA_4: Stays compatible with the 4.x line in features, formats and component versions.
-# - UPGRADING: The cluster monitors nodes versions during this interim stage. _This has a cost_ but ensures any new features,
-# formats, versions, etc are enabled safely.
+# - UPGRADING: The cluster monitors the version of each node during this interim stage. This has a cost but ensures
+# all new features, formats, versions, etc. are enabled safely.
# - NONE: Start with all the new features and formats enabled.
#
# A typical upgrade would be:
-# - Do a rolling upgrade starting all nodes in CASSANDRA_Y compatibility mode.
-# - Once the new binary is rendered stable do a rolling restart with UPGRADING. The cluster will enable new features in a safe way
-# until all nodes are started in UPGRADING, then all new features are enabled.
-# - Do a rolling restart with all nodes starting with NONE. This sheds the extra cost of checking nodes versions and ensures
-# a stable cluster. If a node from a previous version was started by accident we won't any longer toggle behaviors as when UPGRADING.
+# - Do a rolling upgrade, starting all nodes in CASSANDRA_X compatibility mode.
+# - Once the new binary is rendered stable, do a rolling restart with the UPGRADING mode. The cluster will keep new
+# features disabled until all nodes are started in the UPGRADING mode; when that happens, new features controlled by
+# the storage compatibility mode are enabled.
+# - Do a rolling restart with all nodes starting with the NONE mode. This eliminates the cost of checking node versions
+# and ensures stability. If Cassandra was started at the previous version by accident, a node with disabled
+# compatibility mode would no longer toggle behaviors as when it was running in the UPGRADING mode.
#
storage_compatibility_mode: CASSANDRA_4
diff --git a/ide/idea/workspace.xml b/ide/idea/workspace.xml
index ffd7530ad3eb..89528b240854 100644
--- a/ide/idea/workspace.xml
+++ b/ide/idea/workspace.xml
@@ -200,8 +200,9 @@
-Dmigration-sstable-root=$PROJECT_DIR$/test/data/migration-sstables
-XX:ActiveProcessorCount=2
-XX:HeapDumpPath=build/test
- -XX:MaxMetaspaceSize=1G
+ -XX:MaxMetaspaceSize=2G
-XX:SoftRefLRUPolicyMSPerMB=0
+ -Xmx4G
-ea" />
diff --git a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
index 8e9de04b2aa8..4abfc88b5a4f 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/big/BigFormat.java
@@ -385,7 +385,9 @@ static class BigVersion extends Version
super(format, version);
isLatestVersion = version.compareTo(current_version) == 0;
- correspondingMessagingVersion = MessagingService.VERSION_30;
+
+ // Note: we probably forgot to change this to 40 for the 'n' versions, and therefore we cannot change it now.
+ correspondingMessagingVersion = version.compareTo("oa") >= 0 ? MessagingService.VERSION_50 : MessagingService.VERSION_30;
hasCommitLogLowerBound = version.compareTo("mb") >= 0;
hasCommitLogIntervals = version.compareTo("mc") >= 0;
diff --git a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java
index 65d97d3d40d5..e7703c6a0612 100644
--- a/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java
+++ b/src/java/org/apache/cassandra/io/sstable/format/bti/BtiFormat.java
@@ -302,7 +302,7 @@ static class BtiVersion extends Version
super(format, version);
isLatestVersion = version.compareTo(current_version) == 0;
- correspondingMessagingVersion = MessagingService.VERSION_40;
+ correspondingMessagingVersion = MessagingService.VERSION_50;
}
@Override
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeTTLOverflowAfterUpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeTTLOverflowAfterUpgradeTest.java
new file mode 100644
index 000000000000..6e6de0bff0fd
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeTTLOverflowAfterUpgradeTest.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import static org.apache.cassandra.utils.StorageCompatibilityMode.NONE;
+import static org.apache.cassandra.utils.StorageCompatibilityMode.UPGRADING;
+
+public class MixedModeTTLOverflowAfterUpgradeTest extends MixedModeTTLOverflowUpgradeTestBase
+{
+ @Test
+ public void testTTLOverflowAfterUpgrade() throws Throwable
+ {
+ testTTLOverflow((cluster, node) -> {
+ cluster.disableAutoCompaction(KEYSPACE);
+ if (node == 1) // only node1 is upgraded, and the cluster is in mixed versions mode
+ {
+ verify(Step.NODE1_40_NODE2_PREV, cluster, true);
+ }
+ else // both nodes have been upgraded, and the cluster isn't in mixed version mode anymore
+ {
+ verify(Step.NODE1_40_NODE2_40, cluster, true);
+
+ // We restart node1 with compatibility mode UPGRADING
+ restartNodeWithCompatibilityMode(cluster, 1, UPGRADING);
+ // since node2 is still in 4.0 compatibility mode, the limit should remain 2038
+ verify(Step.NODE1_UPGRADING_NODE2_40, cluster, true);
+
+ // We restart node2 in UPGRADING compatibility mode
+ restartNodeWithCompatibilityMode(cluster, 2, UPGRADING);
+ // Both nodes are in UPGRADING compatibility mode, so the limit should be 2106
+ verify(Step.NODE1_UPGRADING_NODE2_UPGRADING, cluster, false);
+
+ // We restart the cluster out of compatibility mode, so the limit should be 2106
+ restartNodeWithCompatibilityMode(cluster, 1, NONE);
+ verify(Step.NODE1_NONE_NODE2_UPGRADING, cluster, false);
+
+ restartNodeWithCompatibilityMode(cluster, 2, NONE);
+ verify(Step.NODE1_NONE_NODE2_NONE, cluster, false);
+ }
+ });
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeTTLOverflowDuringUpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeTTLOverflowDuringUpgradeTest.java
new file mode 100644
index 000000000000..c78a4ae252f8
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeTTLOverflowDuringUpgradeTest.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import org.junit.Test;
+
+import static org.apache.cassandra.utils.StorageCompatibilityMode.NONE;
+import static org.apache.cassandra.utils.StorageCompatibilityMode.UPGRADING;
+
+public class MixedModeTTLOverflowDuringUpgradeTest extends MixedModeTTLOverflowUpgradeTestBase
+{
+ @Test
+ public void testTTLOverflowDuringUpgrade() throws Throwable
+ {
+ testTTLOverflow((cluster, node) -> {
+ cluster.disableAutoCompaction(KEYSPACE);
+ if (node == 1) // only node1 is upgraded, and the cluster is in mixed versions mode
+ {
+ verify(Step.NODE1_40_NODE2_PREV, cluster, true);
+
+ // We restart the upgraded node 1 with compatibility mode = UPGRADING
+ restartNodeWithCompatibilityMode(cluster, 1, UPGRADING);
+ // 2038 should still be the limit, because node2 is not upgraded yet
+ verify(Step.NODE1_UPGRADING_NODE2_PREV, cluster, true);
+ }
+ else // both nodes have been upgraded, and the cluster isn't in mixed version mode anymore
+ {
+ // Once we have completed the upgrade, 2038 should still be the limit because
+ // node2 is still in 4.x compatibility mode
+ verify(Step.NODE1_UPGRADING_NODE2_40, cluster, true);
+
+ // We restart the last upgraded node in UPGRADING compatibility mode
+ restartNodeWithCompatibilityMode(cluster, 2, UPGRADING);
+ // Both nodes are in UPGRADING compatibility mode, so the limit should be 2106
+ verify(Step.NODE1_UPGRADING_NODE2_UPGRADING, cluster, false);
+
+ // We restart both nodes out of compatibility mode, so the limit should be 2106.
+ restartNodeWithCompatibilityMode(cluster, 1, NONE);
+ verify(Step.NODE1_NONE_NODE2_UPGRADING, cluster, false);
+ restartNodeWithCompatibilityMode(cluster, 2, NONE);
+ verify(Step.NODE1_NONE_NODE2_NONE, cluster, false);
+ }
+ });
+ }
+}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeTTLOverflowUpgradeTest.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeTTLOverflowUpgradeTest.java
deleted file mode 100644
index 3f51fd8776c9..000000000000
--- a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeTTLOverflowUpgradeTest.java
+++ /dev/null
@@ -1,161 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.cassandra.distributed.upgrade;
-
-import org.junit.Test;
-
-import org.apache.cassandra.cql3.Attributes;
-import org.apache.cassandra.db.rows.Cell;
-import org.apache.cassandra.distributed.UpgradeableCluster;
-import org.apache.cassandra.distributed.api.ICoordinator;
-import org.apache.cassandra.utils.Clock;
-import org.apache.cassandra.utils.StorageCompatibilityMode;
-import org.assertj.core.api.Assertions;
-
-import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
-import static org.apache.cassandra.utils.StorageCompatibilityMode.NONE;
-import static org.apache.cassandra.utils.StorageCompatibilityMode.UPGRADING;
-
-/**
- * Tests TTL the overflow policy triggers at the correct limit: year 2038 <=nb or 2186 >=oa
- *
- * <=oa overflow policy triggers at year 2038. That could be <=4.1 or 5.0 with 4.x storage compatibility
- * >oa overflow policy triggers at year 2106. That is >=5.0 using >=5.x storage compatibility
- *
- * @see StorageCompatibilityMode
- */
-public class MixedModeTTLOverflowUpgradeTest extends UpgradeTestBase
-{
- @Test
- public void testTTLOverflowDuringUpgrade() throws Throwable
- {
- testTTLOverflow((cluster, node) -> {
- if (node == 1) // only node1 is upgraded, and the cluster is in mixed versions mode
- {
- assertPolicyTriggersAt2038(cluster.coordinator(1));
- assertPolicyTriggersAt2038(cluster.coordinator(2));
-
- // We restart the upgraded node out of 4.0 storage compatibility,
- // and we set it to be compatible with 5.0.
- // 2038 should still be the limit because there is still a not upgraded node.
- restartNodeWithCompatibilityMode(cluster, 1, UPGRADING);
- assertPolicyTriggersAt2038(cluster.coordinator(1));
- assertPolicyTriggersAt2038(cluster.coordinator(2));
- }
- else // both nodes have been upgraded, and the cluster isn't in mixed version mode anymore
- {
- // Once we have completed the upgrade, 2038 should still be the limit because there is still one node
- // in 5.0 storage compatibility mode.
- assertPolicyTriggersAt2038(cluster.coordinator(1));
- assertPolicyTriggersAt2038(cluster.coordinator(2));
-
- // We restart the last upgraded node in 5.0 compatibility mode, so both nodes are now in 5.0
- // compatibility mode, and the limit should be 2106.
- restartNodeWithCompatibilityMode(cluster, 2, UPGRADING);
- assertPolicyTriggersAt2106(cluster.coordinator(1));
- assertPolicyTriggersAt2106(cluster.coordinator(2));
-
- // We restart get both nodes out of compatibility mode, so the limit should be 2106.
- restartNodeWithCompatibilityMode(cluster, 1, NONE);
- restartNodeWithCompatibilityMode(cluster, 2, NONE);
- assertPolicyTriggersAt2106(cluster.coordinator(1));
- assertPolicyTriggersAt2106(cluster.coordinator(2));
- }
- });
- }
-
- @Test
- public void testTTLOverflowAfterUpgrade() throws Throwable
- {
- testTTLOverflow((cluster, node) -> {
- if (node == 1) // only node1 is upgraded, and the cluster is in mixed versions mode
- {
- assertPolicyTriggersAt2038(cluster.coordinator(1));
- assertPolicyTriggersAt2038(cluster.coordinator(2));
- }
- else // both nodes have been upgraded, and the cluster isn't in mixed version mode anymore
- {
- assertPolicyTriggersAt2038(cluster.coordinator(1));
- assertPolicyTriggersAt2038(cluster.coordinator(2));
-
- // We restart one node on 5.0 >oa hence 2038 should still be the limit as the other node is 5.0 <=oa
- // We're on compatibility mode where oa and oa nodes are a possibility
- restartNodeWithCompatibilityMode(cluster, 1, UPGRADING);
- assertPolicyTriggersAt2038(cluster.coordinator(1));
- assertPolicyTriggersAt2038(cluster.coordinator(2));
-
- // We restart the other node so they're all on 5.0 >oa hence 2106 should be the limit
- restartNodeWithCompatibilityMode(cluster, 2, UPGRADING);
- assertPolicyTriggersAt2106(cluster.coordinator(1));
- assertPolicyTriggersAt2106(cluster.coordinator(2));
-
- // We restart the cluster out of compatibility mode once everything is 5.0oa TTL 2106
- restartNodeWithCompatibilityMode(cluster, 1, NONE);
- restartNodeWithCompatibilityMode(cluster, 2, NONE);
- assertPolicyTriggersAt2106(cluster.coordinator(1));
- assertPolicyTriggersAt2106(cluster.coordinator(2));
- }
- });
- }
-
- private static void testTTLOverflow(RunOnClusterAndNode runAfterNodeUpgrade) throws Throwable
- {
- new TestCase()
- .nodes(2)
- .nodesToUpgradeOrdered(1, 2)
- .upgradesToCurrentFrom(v40)
- .setup(cluster -> {
- cluster.schemaChange(withKeyspace("CREATE TABLE %s.t (k int PRIMARY KEY, v int)"));
-
- assertPolicyTriggersAt2038(cluster.coordinator(1));
- assertPolicyTriggersAt2038(cluster.coordinator(2));
- })
- .runAfterNodeUpgrade(runAfterNodeUpgrade)
- .run();
- }
-
- private static void restartNodeWithCompatibilityMode(UpgradeableCluster cluster, int node, StorageCompatibilityMode mode) throws Throwable
- {
- cluster.get(node).shutdown().get();
- cluster.get(node).config().set("storage_compatibility_mode", mode.toString());
- cluster.get(node).startup();
- }
-
- private static void assertPolicyTriggersAt2038(ICoordinator coordinator)
- {
- Assertions.assertThatThrownBy(() -> coordinator.execute(withKeyspace("INSERT INTO %s.t (k, v) VALUES (0, 0) USING TTL " + Attributes.MAX_TTL), ALL))
- .hasMessageContaining("exceeds maximum supported expiration date")
- .hasMessageContaining("2038");
- }
-
- private static void assertPolicyTriggersAt2106(ICoordinator coordinator)
- {
- boolean overflowPoliciesApply = (Clock.Global.currentTimeMillis() / 1000) > (Cell.MAX_DELETION_TIME - Attributes.MAX_TTL);
-
- if (overflowPoliciesApply)
- {
- // This code won't run until 2086
- Assertions.assertThatThrownBy(() -> coordinator.execute(withKeyspace("INSERT INTO %s.t (k, v) VALUES (0, 0) USING TTL " + Attributes.MAX_TTL), ALL))
- .hasMessageContaining("exceeds maximum supported expiration date")
- .hasMessageContaining("2106");
- }
- else
- coordinator.execute(withKeyspace("INSERT INTO %s.t (k, v) VALUES (0, 0) USING TTL " + Attributes.MAX_TTL), ALL);
- }
-}
diff --git a/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeTTLOverflowUpgradeTestBase.java b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeTTLOverflowUpgradeTestBase.java
new file mode 100644
index 000000000000..e64dca295d3f
--- /dev/null
+++ b/test/distributed/org/apache/cassandra/distributed/upgrade/MixedModeTTLOverflowUpgradeTestBase.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.cassandra.distributed.upgrade;
+
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.stream.Stream;
+
+import org.apache.cassandra.cql3.Attributes;
+import org.apache.cassandra.distributed.UpgradeableCluster;
+import org.apache.cassandra.distributed.api.ICoordinator;
+import org.apache.cassandra.utils.Clock;
+import org.apache.cassandra.utils.StorageCompatibilityMode;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.data.Offset;
+
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.ALL;
+import static org.apache.cassandra.distributed.api.ConsistencyLevel.LOCAL_ONE;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests that the TTL overflow policy triggers at the correct limit
+ *
+ * sstable version < BIG:oa, overflow policy triggers at year 2038. That could be <=4.1 or 5.0 with 4.x storage compatibility
+ * sstable version >= BIG:oa or BTI - overflow policy triggers at year 2106. That is >=5.0 using no storage compatibility
+ *
+ * @see StorageCompatibilityMode
+ *
+ * This test has been split by subclassing in order to avoid OOMs.
+ */
+public abstract class MixedModeTTLOverflowUpgradeTestBase extends UpgradeTestBase
+{
+ static final int SMALL_TTL = 3600;
+
+ static final String T_REGULAR = "table_regular";
+ static final String T_CLUST = "table_clust";
+ static final String T_STATIC = "table_static";
+ static final String T_COMPLEX = "table_complex";
+ static final String T_FROZEN = "table_frozen";
+ static final String T_INDEX = "table_indexed";
+ static final String INDEX = "idx";
+ static final String TYPE = "complex_type";
+
+ static final int NODE_1_MAX_TTL_KEY_OFFSET = 1000;
+ static final int NODE_2_MAX_TTL_KEY_OFFSET = 2000;
+ static final int NODE_1_MIXED_TTL_KEY_OFFSET = 3000;
+ static final int NODE_2_MIXED_TTL_KEY_OFFSET = 4000;
+
+ enum Step
+ {
+ NODE1_PREV_NODE2_PREV,
+ NODE1_40_NODE2_PREV,
+ NODE1_UPGRADING_NODE2_PREV,
+ NODE1_40_NODE2_40,
+ NODE1_UPGRADING_NODE2_40,
+ NODE1_UPGRADING_NODE2_UPGRADING,
+ NODE1_NONE_NODE2_UPGRADING,
+ NODE1_NONE_NODE2_NONE,
+ }
+
+ static volatile long clusterStatupTime = 0;
+
+ static void testTTLOverflow(RunOnClusterAndNode runAfterNodeUpgrade) throws Throwable
+ {
+ new TestCase()
+ .nodes(2)
+ .nodesToUpgradeOrdered(1, 2)
+ .upgradesToCurrentFrom(v40)
+ .setup(cluster -> {
+ cluster.schemaChange(String.format("CREATE TABLE %s.%s (k int PRIMARY KEY, v1 int, v2 int)", KEYSPACE, T_REGULAR));
+ cluster.schemaChange(String.format("CREATE TABLE %s.%s (k int, c int, v1 int, v2 int, PRIMARY KEY (k, c))", KEYSPACE, T_CLUST));
+ cluster.schemaChange(String.format("CREATE TABLE %s.%s (k int, c int, v1 int static, v2 int, PRIMARY KEY (k, c))", KEYSPACE, T_STATIC));
+ cluster.schemaChange(String.format("CREATE TYPE %s.%s (a int, b int)", KEYSPACE, TYPE));
+ cluster.schemaChange(String.format("CREATE TABLE %s.%s (k int PRIMARY KEY, v1 %s, v2 int)", KEYSPACE, T_COMPLEX, TYPE));
+ cluster.schemaChange(String.format("CREATE TABLE %s.%s (k int PRIMARY KEY, v1 frozen<%s>, v2 int)", KEYSPACE, T_FROZEN, TYPE));
+ cluster.schemaChange(String.format("CREATE TABLE %s.%s (k int PRIMARY KEY, v1 int, v2 int)", KEYSPACE, T_INDEX));
+ cluster.schemaChange(String.format("CREATE INDEX %s ON %s.%s (v1)", INDEX, KEYSPACE, T_INDEX));
+
+ cluster.disableAutoCompaction(KEYSPACE);
+ clusterStatupTime = Clock.Global.currentTimeMillis();
+ verify(Step.NODE1_PREV_NODE2_PREV, cluster, true);
+ })
+ .runAfterNodeUpgrade(runAfterNodeUpgrade)
+ .run();
+ }
+
+ /**
+ * Verifies that the TTL overflow policy triggers at the correct limit for a variety of types
+ * @param step the step in the upgrade process (mainly used as a unique primary key offset for each verification)
+ * @param cluster the cluster
+ * @param expectPolicyTriggerAt2038 when true, we expect the overflow policy to trigger at 2038 and attempts to set
+ * a TTL which would result in expiration date after 2038 to fail. Otherwise, the
+ * allowed expiration date is 2106, and we cannot test that for now because of
+ * {@link Attributes#MAX_TTL} limit of 20 years.
+ */
+ static void verify(Step step, UpgradeableCluster cluster, boolean expectPolicyTriggerAt2038)
+ {
+ insert(cluster, step.ordinal(), expectPolicyTriggerAt2038);
+ query(cluster, step.ordinal(), expectPolicyTriggerAt2038);
+ cluster.coordinator(1).instance().flush(KEYSPACE);
+ query(cluster, step.ordinal(), expectPolicyTriggerAt2038);
+ cluster.coordinator(2).instance().flush(KEYSPACE);
+ query(cluster, step.ordinal(), expectPolicyTriggerAt2038);
+ }
+
+ private static void insert(UpgradeableCluster cluster, int step, boolean expectPolicyTriggerAt2038)
+ {
+ BiConsumer<ICoordinator, String> execute = (c, q) -> {
+ if (expectPolicyTriggerAt2038)
+ assertPolicyTriggersAt2038(c, q);
+ else
+ c.execute(q, ALL);
+ };
+
+ inserts(step + NODE_1_MAX_TTL_KEY_OFFSET, Attributes.MAX_TTL).forEach(q -> execute.accept(cluster.coordinator(1), q));
+ inserts(step + NODE_2_MAX_TTL_KEY_OFFSET, Attributes.MAX_TTL).forEach(q -> execute.accept(cluster.coordinator(1), q));
+ inserts(step + NODE_1_MIXED_TTL_KEY_OFFSET, SMALL_TTL).forEach(q -> cluster.coordinator(1).execute(q, ALL));
+ inserts(step + NODE_2_MIXED_TTL_KEY_OFFSET, SMALL_TTL).forEach(q -> cluster.coordinator(2).execute(q, ALL));
+ v1Updates(step + NODE_1_MIXED_TTL_KEY_OFFSET, Attributes.MAX_TTL).forEach(q -> execute.accept(cluster.coordinator(1), q));
+ v1Updates(step + NODE_2_MIXED_TTL_KEY_OFFSET, Attributes.MAX_TTL).forEach(q -> execute.accept(cluster.coordinator(2), q));
+ }
+
+ private static int getTTL(Object[][] result)
+ {
+ Object r = result[0][0];
+ if (r instanceof Number)
+ return ((Number) r).intValue();
+ else
+ return ((List<? extends Number>) r).get(0).intValue();
+ }
+
+ private static void query(UpgradeableCluster cluster, int step, boolean expectPolicyTriggerAt2038)
+ {
+ BiConsumer<String, Integer> verifyQuery = (q, expectedTTL) -> {
+ int ttlLocal1 = getTTL(cluster.coordinator(1).execute(q, LOCAL_ONE));
+ int ttlLocal2 = getTTL(cluster.coordinator(2).execute(q, LOCAL_ONE));
+ int ttlAll1 = getTTL(cluster.coordinator(1).execute(q, ALL));
+ int ttlAll2 = getTTL(cluster.coordinator(2).execute(q, ALL));
+ long t1 = Clock.Global.currentTimeMillis();
+ int delta = (int) Math.max(1 + (t1 - clusterStatupTime) / 1000, 1);
+ assertThat(ttlLocal1).describedAs("TTL from query %s", q).isCloseTo(expectedTTL, Offset.offset(delta));
+ assertThat(ttlLocal2).describedAs("TTL from query %s", q).isCloseTo(expectedTTL, Offset.offset(delta));
+ assertThat(ttlAll1).describedAs("TTL from query %s", q).isCloseTo(expectedTTL, Offset.offset(delta));
+ assertThat(ttlAll2).describedAs("TTL from query %s", q).isCloseTo(expectedTTL, Offset.offset(delta));
+ };
+
+ if (!expectPolicyTriggerAt2038)
+ {
+ queries(step + NODE_1_MAX_TTL_KEY_OFFSET, "v1").forEach(q -> verifyQuery.accept(q, Attributes.MAX_TTL));
+ queries(step + NODE_2_MAX_TTL_KEY_OFFSET, "v1").forEach(q -> verifyQuery.accept(q, Attributes.MAX_TTL));
+ queries(step + NODE_1_MAX_TTL_KEY_OFFSET, "v2").forEach(q -> verifyQuery.accept(q, Attributes.MAX_TTL));
+ queries(step + NODE_2_MAX_TTL_KEY_OFFSET, "v2").forEach(q -> verifyQuery.accept(q, Attributes.MAX_TTL));
+ queries(step + NODE_1_MIXED_TTL_KEY_OFFSET, "v1").forEach(q -> verifyQuery.accept(q, Attributes.MAX_TTL));
+ queries(step + NODE_2_MIXED_TTL_KEY_OFFSET, "v1").forEach(q -> verifyQuery.accept(q, Attributes.MAX_TTL));
+ }
+ queries(step + NODE_1_MIXED_TTL_KEY_OFFSET, "v2").forEach(q -> verifyQuery.accept(q, SMALL_TTL));
+ queries(step + NODE_2_MIXED_TTL_KEY_OFFSET, "v2").forEach(q -> verifyQuery.accept(q, SMALL_TTL));
+ }
+
+ private static Stream<String> inserts(int key, int ttl)
+ {
+ return Stream.of(String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, %d, %d) USING TTL %d", KEYSPACE, T_REGULAR, key, key, key, ttl),
+ String.format("INSERT INTO %s.%s (k, c, v1, v2) VALUES (%d, %d, %d, %d) USING TTL %d", KEYSPACE, T_CLUST, key, key, key, key, ttl),
+ String.format("INSERT INTO %s.%s (k, c, v1, v2) VALUES (%d, %d, %d, %d) USING TTL %d", KEYSPACE, T_STATIC, key, key, key, key, ttl),
+ String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, {a: %d, b: %d}, %d) USING TTL %d", KEYSPACE, T_COMPLEX, key, key, key, key, ttl),
+ String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, {a: %d, b: %d}, %d) USING TTL %d", KEYSPACE, T_FROZEN, key, key, key, key, ttl),
+ String.format("INSERT INTO %s.%s (k, v1, v2) VALUES (%d, %d, %d) USING TTL %d", KEYSPACE, T_INDEX, key, key, key, ttl));
+ }
+
+ private static Stream<String> v1Updates(int key, int ttl)
+ {
+ return Stream.of(String.format("UPDATE %s.%s USING TTL %d SET v1 = %d WHERE k = %d", KEYSPACE, T_REGULAR, ttl, key * 100, key),
+ String.format("UPDATE %s.%s USING TTL %d SET v1 = %d WHERE k = %d AND c = %d", KEYSPACE, T_CLUST, ttl, key * 100, key, key),
+ String.format("UPDATE %s.%s USING TTL %d SET v1 = %d WHERE k = %d", KEYSPACE, T_STATIC, ttl, key * 100, key),
+ String.format("UPDATE %s.%s USING TTL %d SET v1 = {a: %d, b: %d} WHERE k = %d", KEYSPACE, T_COMPLEX, ttl, key * 100, key * 100, key),
+ String.format("UPDATE %s.%s USING TTL %d SET v1 = {a: %d, b: %d} WHERE k = %d", KEYSPACE, T_FROZEN, ttl, key * 100, key * 100, key),
+ String.format("UPDATE %s.%s USING TTL %d SET v1 = %d WHERE k = %d", KEYSPACE, T_INDEX, ttl, key * 100, key));
+ }
+
+ private static Stream<String> queries(int key, String col)
+ {
+ return Stream.of(String.format("SELECT ttl(%s) FROM %s.%s WHERE k = %d", col, KEYSPACE, T_REGULAR, key),
+ String.format("SELECT ttl(%s) FROM %s.%s WHERE k = %d AND c = %d", col, KEYSPACE, T_CLUST, key, key),
+ String.format("SELECT ttl(%s) FROM %s.%s WHERE k = %d", col, KEYSPACE, T_STATIC, key),
+ String.format("SELECT ttl(%s) FROM %s.%s WHERE k = %d", col, KEYSPACE, T_COMPLEX, key),
+ String.format("SELECT ttl(%s) FROM %s.%s WHERE k = %d", col, KEYSPACE, T_FROZEN, key),
+ String.format("SELECT ttl(%s) FROM %s.%s WHERE k = %d", col, KEYSPACE, T_INDEX, key));
+ }
+
+ static void restartNodeWithCompatibilityMode(UpgradeableCluster cluster, int node, StorageCompatibilityMode mode) throws Throwable
+ {
+ cluster.get(node).shutdown().get();
+ cluster.get(node).config().set("storage_compatibility_mode", mode.toString());
+ cluster.get(node).startup();
+ }
+
+ private static void assertPolicyTriggersAt2038(ICoordinator coordinator, String query)
+ {
+ Assertions.assertThatThrownBy(() -> coordinator.execute(query, ALL))
+ .hasMessageContaining("exceeds maximum supported expiration date")
+ .hasMessageContaining("2038");
+ }
+}
diff --git a/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
index 8a6417d37d29..609f17a5ee5d 100644
--- a/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
+++ b/test/unit/org/apache/cassandra/cql3/validation/operations/TTLTest.java
@@ -22,6 +22,7 @@
import java.io.IOException;
import org.junit.After;
+import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
@@ -33,11 +34,11 @@
import org.apache.cassandra.cql3.UntypedResultSet;
import org.apache.cassandra.db.ColumnFamilyStore;
import org.apache.cassandra.db.ExpirationDateOverflowHandling;
-import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.ExpirationDateOverflowHandling.ExpirationDateOverflowPolicy;
+import org.apache.cassandra.db.Keyspace;
import org.apache.cassandra.db.rows.AbstractCell;
-import org.apache.cassandra.distributed.shared.WithProperties;
import org.apache.cassandra.db.rows.Cell;
+import org.apache.cassandra.distributed.shared.WithProperties;
import org.apache.cassandra.exceptions.InvalidRequestException;
import org.apache.cassandra.io.sstable.IScrubber;
import org.apache.cassandra.io.util.File;
@@ -47,9 +48,11 @@
import org.apache.cassandra.tools.ToolRunner;
import org.apache.cassandra.tools.ToolRunner.ToolResult;
import org.apache.cassandra.utils.Clock;
-import org.assertj.core.api.Assertions;
+import org.apache.cassandra.utils.StorageCompatibilityMode;
+import org.assertj.core.data.Offset;
import static org.apache.cassandra.config.CassandraRelevantProperties.TEST_UTIL_ALLOW_TOOL_REINIT_FOR_TEST;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@@ -169,63 +172,76 @@ public void testTTLDefaultLimit() throws Throwable
@Test
public void testCapWarnExpirationOverflowPolicy() throws Throwable
{
- if (overflowPoliciesApply)
- // We don't test that the actual warn is logged here, only on dtest
- testCapExpirationDateOverflowPolicy(ExpirationDateOverflowHandling.ExpirationDateOverflowPolicy.CAP);
+ Assume.assumeTrue(overflowPoliciesApply);
+ // We don't test that the actual warn is logged here, only on dtest
+ testCapExpirationDateOverflowPolicy(ExpirationDateOverflowHandling.ExpirationDateOverflowPolicy.CAP);
}
@Test
public void testCapNoWarnExpirationOverflowPolicy() throws Throwable
{
- if (overflowPoliciesApply)
- testCapExpirationDateOverflowPolicy(ExpirationDateOverflowHandling.ExpirationDateOverflowPolicy.CAP_NOWARN);
+ Assume.assumeTrue(overflowPoliciesApply);
+ testCapExpirationDateOverflowPolicy(ExpirationDateOverflowHandling.ExpirationDateOverflowPolicy.CAP_NOWARN);
}
@Test
public void testCapNoWarnExpirationOverflowPolicyDefaultTTL() throws Throwable
{
- if (overflowPoliciesApply)
- {
- ExpirationDateOverflowPolicy origPolicy = ExpirationDateOverflowHandling.policy;
- ExpirationDateOverflowHandling.policy = ExpirationDateOverflowPolicy.CAP_NOWARN;
- createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=" + MAX_TTL);
- execute("INSERT INTO %s (k, i) VALUES (1, 1)");
- checkTTLIsCapped("i");
- ExpirationDateOverflowHandling.policy = origPolicy;
- }
+ Assume.assumeTrue(overflowPoliciesApply);
+ ExpirationDateOverflowPolicy origPolicy = ExpirationDateOverflowHandling.policy;
+ ExpirationDateOverflowHandling.policy = ExpirationDateOverflowPolicy.CAP_NOWARN;
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=" + MAX_TTL);
+ execute("INSERT INTO %s (k, i) VALUES (1, 1)");
+ checkTTLIsCapped("i");
+ ExpirationDateOverflowHandling.policy = origPolicy;
}
@Test
public void testRejectExpirationOverflowPolicy() throws Throwable
{
- if (overflowPoliciesApply)
+ Assume.assumeTrue(overflowPoliciesApply);
+ ExpirationDateOverflowPolicy origPolicy = ExpirationDateOverflowHandling.policy;
+ ExpirationDateOverflowHandling.policy = ExpirationDateOverflowPolicy.REJECT;
+
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int)");
+ try
+ {
+ execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL " + MAX_TTL);
+ fail();
+ }
+ catch (InvalidRequestException e)
+ {
+ assertTrue(e.getMessage().contains("exceeds maximum supported expiration date"));
+ }
+ try
{
- ExpirationDateOverflowPolicy origPolicy = ExpirationDateOverflowHandling.policy;
- ExpirationDateOverflowHandling.policy = ExpirationDateOverflowPolicy.REJECT;
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=" + MAX_TTL);
+ execute("INSERT INTO %s (k, i) VALUES (1, 1)");
+ fail();
+ }
+ catch (InvalidRequestException e)
+ {
+ assertTrue(e.getMessage().contains("exceeds maximum supported expiration date"));
+ }
- createTable("CREATE TABLE %s (k int PRIMARY KEY, i int)");
- try
- {
- execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL " + MAX_TTL);
- fail();
- }
- catch (InvalidRequestException e)
- {
- assertTrue(e.getMessage().contains("exceeds maximum supported expiration date"));
- }
- try
- {
- createTable("CREATE TABLE %s (k int PRIMARY KEY, i int) WITH default_time_to_live=" + MAX_TTL);
- execute("INSERT INTO %s (k, i) VALUES (1, 1)");
- fail();
- }
- catch (InvalidRequestException e)
- {
- assertTrue(e.getMessage().contains("exceeds maximum supported expiration date"));
- }
+ ExpirationDateOverflowHandling.policy = origPolicy;
+ }
- ExpirationDateOverflowHandling.policy = origPolicy;
- }
+ @Test
+ public void testImprovedMaxTTL()
+ {
+ Assume.assumeTrue(DatabaseDescriptor.getStorageCompatibilityMode() != StorageCompatibilityMode.CASSANDRA_4);
+ createTable("CREATE TABLE %s (k int PRIMARY KEY, i int)");
+ disableCompaction();
+ long t0 = Clock.Global.currentTimeMillis();
+ execute("INSERT INTO %s (k, i) VALUES (1, 1) USING TTL " + MAX_TTL);
+ int ttlMemtable = execute("SELECT TTL(i) FROM %s WHERE k = 1").one().getInt("ttl(i)");
+ flush(true);
+ int ttlSSTable = execute("SELECT TTL(i) FROM %s WHERE k = 1").one().getInt("ttl(i)");
+ long t1 = Clock.Global.currentTimeMillis();
+ int delta = (int) Math.max(1, (t1 - t0) / 1000);
+ assertThat(ttlMemtable).isCloseTo(MAX_TTL, Offset.offset(delta));
+ assertThat(ttlSSTable).isCloseTo(MAX_TTL, Offset.offset(delta));
}
@Test
@@ -436,11 +452,11 @@ public void testRecoverOverflowedExpirationWithScrub(boolean simple, boolean clu
tool = ToolRunner.invokeClass(StandaloneScrubber.class, KEYSPACE, cfs.name);
tool.assertOnCleanExit();
- Assertions.assertThat(tool.getStdout()).contains("Pre-scrub sstables snapshotted into");
+ assertThat(tool.getStdout()).contains("Pre-scrub sstables snapshotted into");
if (reinsertOverflowedTTL)
- Assertions.assertThat(tool.getStdout()).contains("Fixed 2 rows with overflowed local deletion time.");
+ assertThat(tool.getStdout()).contains("Fixed 2 rows with overflowed local deletion time.");
else
- Assertions.assertThat(tool.getStdout()).contains("No valid partitions found while scrubbing");
+ assertThat(tool.getStdout()).contains("No valid partitions found while scrubbing");
}
}