Skip to content

Commit

Permalink
Backport CASSANDRA-16418 to 3.x
Browse files Browse the repository at this point in the history
When a node is decommissioned, it triggers data transfer to other nodes.
During this transfer process, receiving nodes temporarily hold token ranges in a pending state.
However, the current cleanup process doesn't account for these pending ranges when calculating token ownership,
leading to inadvertent cleanup of data already stored in SSTables.
To address this issue, this patch introduces two changes.
Firstly, it backports CASSANDRA-16418, introducing a preventive check in `StorageService#forceKeyspaceCleanup`.
This check disallows the initiation of cleanup when a node contains any pending ranges for the requested keyspace.
Secondly, it reintroduces a similar condition to test for the existence of pending ranges in `CompactionManager#performCleanup`.
This ensures the safety of this API as well.

patch by Szymon Miezal; reviewed by TBD for CASSANDRA-18824
  • Loading branch information
szymon-miezal committed Dec 12, 2023
1 parent 5bf1d2f commit 7629f8f
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 6 deletions.
14 changes: 11 additions & 3 deletions src/java/org/apache/cassandra/db/compaction/CompactionManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -452,12 +452,15 @@ public void execute(LifecycleTransaction txn) throws IOException
public AllSSTableOpStatus performCleanup(final ColumnFamilyStore cfStore, int jobs) throws InterruptedException, ExecutionException
{
assert !cfStore.isIndex();
Keyspace keyspace = cfStore.keyspace;
if (!StorageService.instance.isJoined())

if (nodeHasPendingRangesForKeyspace(cfStore))
{
logger.info("Cleanup cannot run before a node has joined the ring");
logger.info("Cleanup cannot run while node has pending ranges for keyspace {} table {}, wait for node addition/decommission to complete and try again", cfStore.keyspace.getName(), cfStore.getTableName());
return AllSSTableOpStatus.ABORTED;
}

Keyspace keyspace = cfStore.keyspace;

// if local ranges is empty, it means no data should remain
final Collection<Range<Token>> ranges = StorageService.instance.getLocalRanges(keyspace.getName());
final boolean hasIndexes = cfStore.indexManager.hasIndexes();
Expand Down Expand Up @@ -499,6 +502,11 @@ public void execute(LifecycleTransaction txn) throws IOException
}, jobs, OperationType.CLEANUP);
}

/**
 * Tells whether the token metadata currently lists any pending ranges for
 * this node's broadcast address in the keyspace that owns the given table.
 *
 * @param cfs the table whose keyspace name is used for the lookup
 * @return {@code true} if at least one pending range is reported, {@code false} otherwise
 */
private boolean nodeHasPendingRangesForKeyspace(ColumnFamilyStore cfs)
{
    final Collection<Range<Token>> pendingForThisNode =
        StorageService.instance.getTokenMetadata()
                               .getPendingRanges(cfs.keyspace.getName(), FBUtilities.getBroadcastAddress());
    return !pendingForThisNode.isEmpty();
}

/**
* Submit anti-compactions for a collection of SSTables over a set of repaired ranges and marks corresponding SSTables
* as repaired.
Expand Down
3 changes: 3 additions & 0 deletions src/java/org/apache/cassandra/service/StorageService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2996,6 +2996,9 @@ public int forceKeyspaceCleanup(int jobs, String keyspaceName, String... tables)
if (Schema.isLocalSystemKeyspace(keyspaceName))
throw new RuntimeException("Cleanup of the system keyspace is neither necessary nor wise");

if (tokenMetadata.getPendingRanges(keyspaceName, FBUtilities.getBroadcastAddress()).size() > 0)
throw new RuntimeException("Node is involved in cluster membership changes. Not safe to run cleanup.");

CompactionManager.AllSSTableOpStatus status = CompactionManager.AllSSTableOpStatus.SUCCESSFUL;
for (ColumnFamilyStore cfStore : getValidColumnFamilies(false, false, keyspaceName, tables))
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,22 @@ public static InstanceAction statusToBootstrap(IInvokableInstance newNode)
};
}

/**
 * Builds an action that, when applied to an instance, changes that instance's
 * gossip state for {@code newNode} to its tokens followed by a "leaving" status.
 *
 * @param newNode the node whose tokens and leaving status are applied
 *                (despite the name, this is the node being decommissioned)
 * @return an action to run against each instance that should observe the change
 */
public static InstanceAction statusToDecommission(IInvokableInstance newNode)
{
    return instance -> changeGossipState(instance,
                                         newNode,
                                         Arrays.asList(tokens(newNode), statusLeaving(newNode)));
}

/**
 * Creates the versioned {@code STATUS} application state carrying the
 * "leaving" value built from the given instance's partitioner and tokens.
 *
 * @param instance the instance the state is built for
 * @return the versioned application state produced by {@code versionedToken}
 */
public static VersionedApplicationState statusLeaving(IInvokableInstance instance)
{
    return versionedToken(instance,
                          ApplicationState.STATUS,
                          (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).leaving(tokens));
}

public static void withProperty(String prop, String value, Runnable r)
{
String before = System.getProperty(prop);
Expand Down Expand Up @@ -139,4 +155,4 @@ public static VersionedApplicationState statusBootstrapping(IInvokableInstance i
{
return versionedToken(instance, ApplicationState.STATUS, (partitioner, tokens) -> new VersionedValue.VersionedValueFactory(partitioner).bootstrapping(tokens));
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.test;

import org.junit.Test;

import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.NodeToolResult;
import org.apache.cassandra.distributed.shared.NetworkTopology;

import static org.apache.cassandra.distributed.action.GossipHelper.statusToBootstrap;
import static org.apache.cassandra.distributed.action.GossipHelper.statusToDecommission;
import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
import static org.junit.Assert.assertEquals;

/**
 * Distributed tests checking that {@code nodetool cleanup} is refused, and that
 * locally readable rows are left untouched, while gossip reports a node as
 * leaving (decommission) or bootstrapping.
 */
public class CleanupFailureTest extends TestBaseImpl
{
    /** Error text expected on nodetool's stderr when cleanup is rejected. */
    private static final String CLEANUP_REJECTED_MESSAGE = "Node is involved in cluster membership changes. Not safe to run cleanup.";

    /**
     * Marks node 1 as leaving via gossip, writes rows, and verifies that cleanup
     * on the remaining node fails with the expected message and that the number
     * of rows it can read locally is unchanged.
     */
    @Test
    public void cleanupDuringDecommissionTest() throws Throwable
    {
        try (Cluster cluster = builder().withNodes(2)
                                        .withTokenSupplier(evenlyDistributedTokens(2))
                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(2, "dc0", "rack0"))
                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
                                        .start())
        {
            IInvokableInstance nodeToDecommission = cluster.get(1);
            IInvokableInstance nodeToRemainInCluster = cluster.get(2);

            // Start decommission on nodeToDecommission
            cluster.forEach(statusToDecommission(nodeToDecommission));

            // Add data to cluster while node is decommissioning
            int numRows = 100;
            createKeyspaceWithTable(cluster, 1);
            insertData(cluster, 1, numRows, ConsistencyLevel.ONE);

            // Check data before cleanup on nodeToRemainInCluster
            assertEquals(numRows, nodeToRemainInCluster.executeInternal("SELECT * FROM " + KEYSPACE + ".tbl").length);

            // Run cleanup on nodeToRemainInCluster
            NodeToolResult result = nodeToRemainInCluster.nodetoolResult("cleanup");
            result.asserts().failure();
            result.asserts().stderrContains(CLEANUP_REJECTED_MESSAGE);

            // Check data after cleanup on nodeToRemainInCluster
            assertEquals(numRows, nodeToRemainInCluster.executeInternal("SELECT * FROM " + KEYSPACE + ".tbl").length);
        }
    }

    /**
     * Starts an extra node with {@code cassandra.join_ring=false}, marks it as
     * bootstrapping via gossip, writes rows, and verifies that cleanup on the
     * bootstrapping node reports the expected message and that the number of
     * rows it can read locally is unchanged.
     */
    @Test
    public void cleanupDuringBootstrapTest() throws Throwable
    {
        int originalNodeCount = 1;
        int expandedNodeCount = originalNodeCount + 1;

        try (Cluster cluster = builder().withNodes(originalNodeCount)
                                        .withTokenSupplier(evenlyDistributedTokens(expandedNodeCount))
                                        .withNodeIdTopology(NetworkTopology.singleDcNetworkTopology(expandedNodeCount, "dc0", "rack0"))
                                        .withConfig(config -> config.with(NETWORK, GOSSIP))
                                        .start())
        {
            IInstanceConfig config = cluster.newInstanceConfig();
            IInvokableInstance bootstrappingNode = cluster.bootstrap(config);
            withProperty("cassandra.join_ring", "false",
                         () -> bootstrappingNode.startup(cluster));

            // Start bootstrap on bootstrappingNode
            cluster.forEach(statusToBootstrap(bootstrappingNode));

            // Add data to cluster while node is bootstrapping
            int numRows = 100;
            createKeyspaceWithTable(cluster, 2);
            insertData(cluster, 1, numRows, ConsistencyLevel.ONE);

            // Check data before cleanup on bootstrappingNode
            assertEquals(numRows, bootstrappingNode.executeInternal("SELECT * FROM " + KEYSPACE + ".tbl").length);

            // Run cleanup on bootstrappingNode
            NodeToolResult result = bootstrappingNode.nodetoolResult("cleanup");
            result.asserts().stderrContains(CLEANUP_REJECTED_MESSAGE);

            // Check data after cleanup on bootstrappingNode
            assertEquals(numRows, bootstrappingNode.executeInternal("SELECT * FROM " + KEYSPACE + ".tbl").length);
        }
    }

    /**
     * Creates the test keyspace (SimpleStrategy with the given replication
     * factor) and its {@code tbl} table if they do not exist yet.
     *
     * @param cluster cluster to apply the schema changes to
     * @param rf      replication factor for the keyspace
     */
    private void createKeyspaceWithTable(Cluster cluster, int rf)
    {
        cluster.schemaChange("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor': " + rf + "};");
        cluster.schemaChange("CREATE TABLE IF NOT EXISTS " + KEYSPACE + ".tbl (pk int, ck int, v int, PRIMARY KEY (pk, ck))");
    }

    /**
     * Inserts {@code numberOfRows} rows (pk = ck = v = row index) through the
     * given node's coordinator, then flushes the test keyspace on every instance.
     *
     * @param cluster      cluster to write to
     * @param node         number of the node whose coordinator executes the inserts
     * @param numberOfRows how many rows to insert
     * @param cl           consistency level used for each insert
     */
    private void insertData(Cluster cluster, int node, int numberOfRows, ConsistencyLevel cl)
    {
        for (int i = 0; i < numberOfRows; i++)
        {
            cluster.coordinator(node).execute("INSERT INTO " + KEYSPACE + ".tbl (pk, ck, v) VALUES (?, ?, ?)", cl, i, i, i);
        }
        cluster.forEach(c -> c.flush(KEYSPACE));
    }
}
51 changes: 49 additions & 2 deletions test/unit/org/apache/cassandra/db/CleanupTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.collect.ImmutableList;
import com.google.common.collect.Sets;
import org.junit.BeforeClass;
import org.junit.Test;
Expand All @@ -41,6 +42,11 @@
import org.apache.cassandra.Util;
import org.apache.cassandra.config.ColumnDefinition;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.AbstractReplicationStrategy;
import org.apache.cassandra.locator.IEndpointSnitch;
import org.apache.cassandra.locator.PendingRangeMaps;
import org.apache.cassandra.locator.PropertyFileSnitch;
import org.apache.cassandra.locator.SimpleStrategy;
import org.apache.cassandra.schema.KeyspaceMetadata;
import org.apache.cassandra.cql3.Operator;
import org.apache.cassandra.db.compaction.CompactionManager;
Expand All @@ -56,6 +62,8 @@
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.utils.ByteBufferUtil;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.ThrowableAssert.catchThrowable;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

Expand Down Expand Up @@ -116,9 +124,11 @@ public String getDatacenter(InetAddress endpoint)
}

@Test
public void testCleanup() throws ExecutionException, InterruptedException
public void testCleanup() throws ExecutionException, InterruptedException, UnknownHostException
{
StorageService.instance.getTokenMetadata().clearUnsafe();
TokenMetadata tmd = StorageService.instance.getTokenMetadata();
tmd.clearUnsafe();
tmd.updateNormalToken(token(new byte[]{ 50 }), InetAddress.getByName("127.0.0.1"));

Keyspace keyspace = Keyspace.open(KEYSPACE1);
ColumnFamilyStore cfs = keyspace.getColumnFamilyStore(CF_STANDARD1);
Expand Down Expand Up @@ -330,6 +340,43 @@ public void testNeedsCleanup() throws Exception
assertEquals(testCase.getKey(), CompactionManager.needsCleanup(ssTable, testCase.getValue()));
}
}

/**
 * Verifies that {@code CompactionManager#performCleanup} returns
 * {@code ABORTED} when a pending range has been registered for the table's keyspace.
 */
@Test
public void testCleanupIsAbortedWhenNodeHasPendingRanges() throws ExecutionException, InterruptedException, UnknownHostException
{
    // given: cleared token metadata, a populated table and one pending range
    StorageService.instance.getTokenMetadata().clearUnsafe();

    ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(CF_STANDARD1);

    fillCF(cfs, "val", LOOPS);
    assertEquals(LOOPS, Util.getAll(Util.cmd(cfs).build()).size());

    BytesToken left = new BytesToken(new byte[]{0});
    BytesToken right = new BytesToken(new byte[]{1});
    Range<Token> pendingRange = range(left, right);
    givenPendingRange(cfs, pendingRange);

    // when: cleanup is requested with two jobs
    CompactionManager.AllSSTableOpStatus outcome = CompactionManager.instance.performCleanup(cfs, 2);

    // then: the operation must report that it was aborted
    assertEquals("cleanup should be aborted", CompactionManager.AllSSTableOpStatus.ABORTED, outcome);
}

/**
 * Recalculates pending ranges for the table's keyspace and then adds the
 * given range as pending for 127.0.0.1 to the resulting pending-range map.
 *
 * @param cfs   table whose keyspace receives the pending range
 * @param range range to register as pending
 * @throws UnknownHostException if the loopback address cannot be resolved
 */
private void givenPendingRange(ColumnFamilyStore cfs, Range<Token> range) throws UnknownHostException
{
    String keyspaceName = cfs.keyspace.getName();
    TokenMetadata tokenMetadata = StorageService.instance.getTokenMetadata();

    tokenMetadata.calculatePendingRanges(createStrategy(keyspaceName), keyspaceName);
    PendingRangeMaps pendingRanges = tokenMetadata.getPendingRanges(keyspaceName);
    pendingRanges.addPendingRange(range, InetAddress.getByName("127.0.0.1"));
}

/**
 * Installs a fresh {@link PropertyFileSnitch} as the endpoint snitch and
 * builds a {@link SimpleStrategy} for the keyspace, backed by new, empty
 * token metadata and no configuration options.
 *
 * @param keyspace name of the keyspace the strategy is created for
 * @return the newly created replication strategy
 */
private AbstractReplicationStrategy createStrategy(String keyspace)
{
    // Side effect: replaces the globally configured snitch.
    DatabaseDescriptor.setEndpointSnitch(new PropertyFileSnitch());
    IEndpointSnitch configuredSnitch = DatabaseDescriptor.getEndpointSnitch();
    return new SimpleStrategy(keyspace, new TokenMetadata(), configuredSnitch, Collections.emptyMap());
}

private static BytesToken token(byte ... value)
{
return new BytesToken(value);
Expand Down

0 comments on commit 7629f8f

Please sign in to comment.