From e2a6c99310aa93ba3506ca8f603ae1039372f533 Mon Sep 17 00:00:00 2001 From: Stefan Miklosovic Date: Wed, 14 Jun 2023 17:43:50 +0200 Subject: [PATCH] Expose bootstrap and decommission state to nodetool info patch by Stefan Miklosovic; reviewed by Brandon Williams CASSANDRA-18555 Co-authored-by: Jaydeepkumar Chovatia --- CHANGES.txt | 1 + NEWS.txt | 1 + .../cassandra/service/StorageService.java | 121 +++++++--- .../service/StorageServiceMBean.java | 19 ++ .../tools/nodetool/Decommission.java | 10 + .../apache/cassandra/tools/nodetool/Info.java | 4 + .../distributed/test/DecommissionTest.java | 220 ++++++++++++++++++ 7 files changed, 339 insertions(+), 37 deletions(-) create mode 100644 test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java diff --git a/CHANGES.txt b/CHANGES.txt index 6437d2796dec..334f829bfc45 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -1,4 +1,5 @@ 5.0 + * Expose bootstrap and decommission state to nodetool info (CASSANDRA-18555) * Fix SSTabledump errors when dumping data from index (CASSANDRA-17698) * Avoid unnecessary deserialization of terminal arguments when executing CQL functions (CASSANDRA-18566) * Remove dependency on pytz library for setting CQLSH timezones on Python version >= 3.9 (CASSANDRA-17433) diff --git a/NEWS.txt b/NEWS.txt index 43e980582e72..7027b6e48946 100644 --- a/NEWS.txt +++ b/NEWS.txt @@ -164,6 +164,7 @@ New features - Added `sstablepartitions` offline tool to find large partitions in sstables. 
- `cassandra-stress` has a new option called '-jmx' which enables a user to pass username and password to JMX (CASSANDRA-18544) - It is possible to read all credentials for `cassandra-stress` from a file via option `-credentials-file` (CASSANDRA-18544) + - nodetool info displays bootstrap state a node is in as well as if it was decommissioned or if it failed to decommission (CASSANDRA-18555) Upgrading --------- diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index 5f99fef4bcf1..1773673f6104 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -263,6 +263,8 @@ import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ; import static org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus; import static org.apache.cassandra.service.ActiveRepairService.repairCommandExecutor; +import static org.apache.cassandra.service.StorageService.Mode.DECOMMISSIONED; +import static org.apache.cassandra.service.StorageService.Mode.DECOMMISSION_FAILED; import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis; import static org.apache.cassandra.utils.Clock.Global.nanoTime; import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort; @@ -414,7 +416,7 @@ public Collection> getPrimaryRangesWithinDC(String keyspace) /* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */ private double traceProbability = 0.0; - public enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED } + public enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, DECOMMISSION_FAILED, MOVING, DRAINING, DRAINED } private volatile Mode operationMode = Mode.STARTING; /* Used for tracking drain progress */ @@ -626,7 +628,7 @@ public void stopTransports() * they get the Gossip shutdown message, so even if * we don't 
get time to broadcast this, it is not a problem. * - * See {@link Gossiper#markAsShutdown(InetAddressAndPort)} + * See Gossiper.markAsShutdown(InetAddressAndPort) */ private void shutdownClientServers() { @@ -2157,7 +2159,8 @@ private void invalidateLocalRanges() /** * All MVs have been created during bootstrap, so mark them as built */ - private void markViewsAsBuilt() { + private void markViewsAsBuilt() + { for (String keyspace : Schema.instance.getUserKeyspaces().names()) { for (ViewMetadata view: Schema.instance.getKeyspaceMetadata(keyspace).views) @@ -2168,11 +2171,18 @@ private void markViewsAsBuilt() { /** * Called when bootstrap did finish successfully */ - private void bootstrapFinished() { + private void bootstrapFinished() + { markViewsAsBuilt(); isBootstrapMode = false; } + @Override + public String getBootstrapState() + { + return SystemKeyspace.getBootstrapState().name(); + } + public boolean resumeBootstrap() { if (isBootstrapMode && SystemKeyspace.bootstrapInProgress()) @@ -5128,18 +5138,32 @@ private void startLeaving() public void decommission(boolean force) throws InterruptedException { + if (operationMode == DECOMMISSIONED) + { + logger.info("This node was already decommissioned. 
There is no point in decommissioning it again."); + return; + } + + if (isDecommissioning()) + { + logger.info("This node is still decommissioning."); + return; + } + TokenMetadata metadata = tokenMetadata.cloneAfterAllLeft(); + // there is no point to do this logic again once node was decommissioning but failed to do so if (operationMode != Mode.LEAVING) { if (!tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort())) throw new UnsupportedOperationException("local node is not a member of the token ring yet"); - if (metadata.getAllEndpoints().size() < 2) + if (metadata.getAllEndpoints().size() < 2 && metadata.getAllEndpoints().contains(FBUtilities.getBroadcastAddressAndPort())) throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless"); - if (operationMode != Mode.NORMAL) + if (operationMode != Mode.NORMAL && operationMode != DECOMMISSION_FAILED) throw new UnsupportedOperationException("Node in " + operationMode + " state; wait for status to become normal or restart"); } + if (!isDecommissioning.compareAndSet(false, true)) - throw new IllegalStateException("Node is still decommissioning. Check nodetool netstats."); + throw new IllegalStateException("Node is still decommissioning. 
Check nodetool netstats or nodetool info."); if (logger.isDebugEnabled()) logger.debug("DECOMMISSIONING"); @@ -5150,27 +5174,35 @@ public void decommission(boolean force) throws InterruptedException String dc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter(); - if (operationMode != Mode.LEAVING) // If we're already decommissioning there is no point checking RF/pending ranges + // If we're already decommissioning there is no point checking RF/pending ranges + if (operationMode != Mode.LEAVING) { int rf, numNodes; for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces().names()) { if (!force) { + boolean notEnoughLiveNodes = false; Keyspace keyspace = Keyspace.open(keyspaceName); if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy) { NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy(); rf = strategy.getReplicationFactor(dc).allReplicas; - numNodes = metadata.getTopology().getDatacenterEndpoints().get(dc).size(); + Collection datacenterEndpoints = metadata.getTopology().getDatacenterEndpoints().get(dc); + numNodes = datacenterEndpoints.size(); + if (numNodes <= rf && datacenterEndpoints.contains(FBUtilities.getBroadcastAddressAndPort())) + notEnoughLiveNodes = true; } else { - numNodes = metadata.getAllEndpoints().size(); + Set allEndpoints = metadata.getAllEndpoints(); + numNodes = allEndpoints.size(); rf = keyspace.getReplicationStrategy().getReplicationFactor().allReplicas; + if (numNodes <= rf && allEndpoints.contains(FBUtilities.getBroadcastAddressAndPort())) + notEnoughLiveNodes = true; } - if (numNodes <= rf) + if (notEnoughLiveNodes) throw new UnsupportedOperationException("Not enough live nodes to maintain replication factor in keyspace " + keyspaceName + " (RF = " + rf + ", N = " + numNodes + ")." 
+ " Perform a forceful decommission to ignore."); @@ -5182,42 +5214,48 @@ public void decommission(boolean force) throws InterruptedException } startLeaving(); - long timeout = Math.max(RING_DELAY_MILLIS, BatchlogManager.instance.getBatchlogTimeout()); + long timeout = Math.max(RING_DELAY_MILLIS, BatchlogManager.getBatchlogTimeout()); setMode(Mode.LEAVING, "sleeping " + timeout + " ms for batch processing and pending range setup", true); Thread.sleep(timeout); - Runnable finishLeaving = new Runnable() + unbootstrap(); + + // shutdown cql, gossip, messaging, Stage and set state to DECOMMISSIONED + + shutdownClientServers(); + Gossiper.instance.stop(); + try { - public void run() - { - shutdownClientServers(); - Gossiper.instance.stop(); - try - { - MessagingService.instance().shutdown(); - } - catch (IOError ioe) - { - logger.info("failed to shutdown message service: {}", ioe); - } + MessagingService.instance().shutdown(); + } + catch (IOError ioe) + { + logger.info("failed to shutdown message service", ioe); + } - Stage.shutdownNow(); - SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.DECOMMISSIONED); - setMode(Mode.DECOMMISSIONED, true); - // let op be responsible for killing the process - } - }; - unbootstrap(finishLeaving); + Stage.shutdownNow(); + SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.DECOMMISSIONED); + setMode(DECOMMISSIONED, true); + // let op be responsible for killing the process } catch (InterruptedException e) { - throw new UncheckedInterruptedException(e); + setMode(DECOMMISSION_FAILED, true); + logger.error("Node interrupted while decommissioning"); + throw new RuntimeException("Node interrupted while decommissioning"); } catch (ExecutionException e) { - logger.error("Error while decommissioning node ", e.getCause()); + setMode(DECOMMISSION_FAILED, true); + logger.error("Error while decommissioning node: {}", e.getCause().getMessage()); throw new RuntimeException("Error while decommissioning node: " + 
e.getCause().getMessage()); } + catch (Throwable t) + { + setMode(DECOMMISSION_FAILED, true); + logger.error("Error while decommissioning node: {}", t.getMessage()); + throw t; + } finally { isDecommissioning.set(false); @@ -5254,7 +5292,7 @@ public Supplier> prepareUnbootstrapStreaming() return () -> streamRanges(rangesToStream); } - private void unbootstrap(Runnable onFinish) throws ExecutionException, InterruptedException + private void unbootstrap() throws ExecutionException, InterruptedException { Supplier> startStreaming = prepareUnbootstrapStreaming(); @@ -5290,7 +5328,6 @@ private void unbootstrap(Runnable onFinish) throws ExecutionException, Interrupt hintsSuccess.get(); logger.debug("stream acks all received."); leaveRing(); - onFinish.run(); } private Future streamHints() @@ -5610,7 +5647,17 @@ public boolean isNormal() public boolean isDecommissioned() { - return operationMode == Mode.DECOMMISSIONED; + return operationMode == DECOMMISSIONED; + } + + public boolean isDecommissionFailed() + { + return operationMode == DECOMMISSION_FAILED; + } + + public boolean isDecommissioning() + { + return isDecommissioning.get(); } public String getDrainProgress() diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 154b0d86d303..8c3428e70344 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -489,6 +489,23 @@ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, */ public void decommission(boolean force) throws InterruptedException; + /** + * Returns whether a node has failed to decommission. + * + * The fact that this method returns false does not mean that there was an attempt to + * decommission this node which was successful. 
+ * + * @return true if decommission of this node has failed, false otherwise + */ + public boolean isDecommissionFailed(); + + /** + * Returns whether a node is being decommissioned or not. + * + * @return true if this node is decommissioning, false otherwise + */ + public boolean isDecommissioning(); + /** * @param newToken token to move this node to. * This node will unload its data onto its neighbors, and bootstrap to the new token. @@ -1006,6 +1023,8 @@ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion, */ public boolean resumeBootstrap(); + public String getBootstrapState(); + /** Gets the concurrency settings for processing stages*/ static class StageConcurrency implements Serializable { diff --git a/src/java/org/apache/cassandra/tools/nodetool/Decommission.java b/src/java/org/apache/cassandra/tools/nodetool/Decommission.java index 98b6d5846c5f..de70932c5316 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Decommission.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Decommission.java @@ -37,6 +37,16 @@ public void execute(NodeProbe probe) { try { + if (probe.getStorageService().isDecommissioning()) + { + probe.output().out.println("This node is still decommissioning."); + return; + } + if ("DECOMMISSIONED".equals(probe.getStorageService().getBootstrapState())) + { + probe.output().out.println("Node was already decommissioned."); + return; + } probe.decommission(force); } catch (InterruptedException e) { diff --git a/src/java/org/apache/cassandra/tools/nodetool/Info.java b/src/java/org/apache/cassandra/tools/nodetool/Info.java index 5e0d87c7674c..72086fa56835 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/Info.java +++ b/src/java/org/apache/cassandra/tools/nodetool/Info.java @@ -177,6 +177,10 @@ public void execute(NodeProbe probe) { out.printf("%-23s: (node is not joined to the cluster)%n", "Token"); } + + out.printf("%-23s: %s%n", "Bootstrap state", probe.getStorageService().getBootstrapState()); + 
out.printf("%-23s: %s%n", "Decommissioning", probe.getStorageService().isDecommissioning()); + out.printf("%-23s: %s%n", "Decommission failed", probe.getStorageService().isDecommissionFailed()); } /** diff --git a/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java b/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java new file mode 100644 index 000000000000..66091da3ef9e --- /dev/null +++ b/test/distributed/org/apache/cassandra/distributed/test/DecommissionTest.java @@ -0,0 +1,220 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.cassandra.distributed.test;
+
+import java.util.concurrent.Callable;
+import java.util.function.Supplier;
+
+import org.junit.Test;
+
+import net.bytebuddy.ByteBuddy;
+import net.bytebuddy.dynamic.loading.ClassLoadingStrategy;
+import net.bytebuddy.implementation.MethodDelegation;
+import net.bytebuddy.implementation.bind.annotation.SuperCall;
+import org.apache.cassandra.distributed.Cluster;
+import org.apache.cassandra.distributed.api.IInvokableInstance;
+import org.apache.cassandra.service.StorageService;
+import org.apache.cassandra.streaming.StreamState;
+import org.apache.cassandra.utils.concurrent.Future;
+
+import static net.bytebuddy.matcher.ElementMatchers.named;
+import static org.apache.cassandra.db.SystemKeyspace.BootstrapState.COMPLETED;
+import static org.apache.cassandra.db.SystemKeyspace.BootstrapState.DECOMMISSIONED;
+import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
+import static org.apache.cassandra.distributed.api.Feature.NETWORK;
+import static org.apache.cassandra.distributed.shared.ClusterUtils.stopUnchecked;
+import static org.apache.cassandra.service.StorageService.Mode.DECOMMISSION_FAILED;
+import static org.apache.cassandra.service.StorageService.Mode.NORMAL;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class DecommissionTest extends TestBaseImpl
+{
+    @Test
+    public void testDecommission() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(config -> config.with(GOSSIP)
+                                                                       .with(NETWORK))
+                                           .withInstanceInitializer(DecommissionTest.BB::install)
+                                           .start()))
+        {
+            IInvokableInstance instance = cluster.get(1);
+
+            instance.runOnInstance(() -> {
+
+                assertEquals(COMPLETED.name(), StorageService.instance.getBootstrapState());
+
+                // pretend that decommissioning has failed in the middle
+
+                try
+                {
+                    StorageService.instance.decommission(true);
+                    fail("the first attempt to decommission should fail");
+                }
+                catch (Throwable t)
+                {
+                    assertEquals("simulated error in prepareUnbootstrapStreaming", t.getMessage());
+                }
+
+                assertFalse(StorageService.instance.isDecommissioning());
+                assertTrue(StorageService.instance.isDecommissionFailed());
+
+                // still COMPLETED, nothing has changed
+                assertEquals(COMPLETED.name(), StorageService.instance.getBootstrapState());
+
+                String operationMode = StorageService.instance.getOperationMode();
+                assertEquals(DECOMMISSION_FAILED.name(), operationMode);
+
+                // try to decommission again, now successfully
+
+                try
+                {
+                    StorageService.instance.decommission(true);
+
+                    // decommission was successful, so we reset failed decommission mode
+                    assertFalse(StorageService.instance.isDecommissionFailed());
+
+                    assertEquals(DECOMMISSIONED.name(), StorageService.instance.getBootstrapState());
+                    assertFalse(StorageService.instance.isDecommissioning());
+                }
+                catch (Throwable t)
+                {
+                    fail("the second decommission attempt should pass but it failed on: " + t.getMessage());
+                }
+
+                // check that decommissioning of already decommissioned node has no effect
+
+                try
+                {
+                    assertEquals(DECOMMISSIONED.name(), StorageService.instance.getBootstrapState());
+                    assertFalse(StorageService.instance.isDecommissionFailed());
+
+                    StorageService.instance.decommission(true);
+
+                    assertEquals(DECOMMISSIONED.name(), StorageService.instance.getBootstrapState());
+                    assertFalse(StorageService.instance.isDecommissionFailed());
+                    assertFalse(StorageService.instance.isDecommissioning());
+                }
+                catch (Throwable t)
+                {
+                    fail("Decommissioning already decommissioned node should be no-op operation.");
+                }
+            });
+        }
+    }
+
+    @Test
+    public void testDecommissionAfterNodeRestart() throws Throwable
+    {
+        try (Cluster cluster = init(Cluster.build(2)
+                                           .withConfig(config -> config.with(GOSSIP)
+                                                                       .with(NETWORK))
+                                           .withInstanceInitializer((classLoader, threadGroup, num, generation) -> {
+                                               // install BB on node 1 only, and not when it is started again after the
+                                               // failed decommission; that restart is the second generation, which is "1"
+                                               // here as generations are counted from 0.
+                                               if (num == 1 && generation != 1)
+                                                   BB.install(classLoader, num);
+                                           })
+                                           .start()))
+        {
+            IInvokableInstance instance = cluster.get(1);
+
+            instance.runOnInstance(() -> {
+                assertEquals(COMPLETED.name(), StorageService.instance.getBootstrapState());
+
+                // pretend that decommissioning has failed in the middle
+
+                try
+                {
+                    StorageService.instance.decommission(true);
+                    fail("the first attempt to decommission should fail");
+                }
+                catch (Throwable t)
+                {
+                    assertEquals("simulated error in prepareUnbootstrapStreaming", t.getMessage());
+                }
+
+                // node is in DECOMMISSION_FAILED mode
+                String operationMode = StorageService.instance.getOperationMode();
+                assertEquals(DECOMMISSION_FAILED.name(), operationMode);
+            });
+
+            // restart the node which we failed to decommission
+            stopUnchecked(instance);
+            instance.startup();
+
+            // it is back to normal so let's decommission again
+
+            String operationMode = instance.callOnInstance(() -> StorageService.instance.getOperationMode());
+            assertEquals(NORMAL.name(), operationMode);
+
+            instance.runOnInstance(() -> {
+                try
+                {
+                    StorageService.instance.decommission(true);
+                }
+                catch (InterruptedException e)
+                {
+                    fail("Should decommission the node");
+                }
+
+                assertEquals(DECOMMISSIONED.name(), StorageService.instance.getBootstrapState());
+                assertFalse(StorageService.instance.isDecommissionFailed());
+                assertFalse(StorageService.instance.isDecommissioning());
+            });
+        }
+    }
+
+    /** ByteBuddy interceptor that makes the first call to StorageService#prepareUnbootstrapStreaming throw. */
+    public static class BB
+    {
+        public static void install(ClassLoader classLoader, Integer num)
+        {
+            new ByteBuddy().rebase(StorageService.class)
+                           .method(named("prepareUnbootstrapStreaming"))
+                           .intercept(MethodDelegation.to(DecommissionTest.BB.class))
+                           .make()
+                           .load(classLoader, ClassLoadingStrategy.Default.INJECTION);
+        }
+
+        private static int invocations = 0; // intercepted calls so far; only the first one fails
+
+        @SuppressWarnings("unused")
+        public static Supplier<Future<StreamState>> prepareUnbootstrapStreaming(@SuperCall Callable<Supplier<Future<StreamState>>> zuper)
+        {
+            ++invocations;
+
+            if (invocations == 1)
+                throw new RuntimeException("simulated error in prepareUnbootstrapStreaming");
+
+            try
+            {
+                return zuper.call();
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+}