Skip to content

Commit

Permalink
Expose bootstrap and decommission state to nodetool info
Browse files Browse the repository at this point in the history
patch by Stefan Miklosovic; reviewed by Brandon Williams CASSANDRA-18555

Co-authored-by: Jaydeepkumar Chovatia <chovatia.jaydeep@gmail.com>
  • Loading branch information
smiklosovic and jaydeepkumar1984 committed Jun 20, 2023
1 parent f5a5917 commit e2a6c99
Show file tree
Hide file tree
Showing 7 changed files with 339 additions and 37 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
5.0
* Expose bootstrap and decommission state to nodetool info (CASSANDRA-18555)
* Fix SSTabledump errors when dumping data from index (CASSANDRA-17698)
* Avoid unnecessary deserialization of terminal arguments when executing CQL functions (CASSANDRA-18566)
* Remove dependency on pytz library for setting CQLSH timezones on Python version >= 3.9 (CASSANDRA-17433)
Expand Down
1 change: 1 addition & 0 deletions NEWS.txt
Expand Up @@ -164,6 +164,7 @@ New features
- Added `sstablepartitions` offline tool to find large partitions in sstables.
- `cassandra-stress` has a new option called '-jmx' which enables a user to pass username and password to JMX (CASSANDRA-18544)
- It is possible to read all credentials for `cassandra-stress` from a file via option `-credentials-file` (CASSANDRA-18544)
- `nodetool info` displays the bootstrap state a node is in, as well as whether it was decommissioned or failed to decommission (CASSANDRA-18555)

Upgrading
---------
Expand Down
121 changes: 84 additions & 37 deletions src/java/org/apache/cassandra/service/StorageService.java
Expand Up @@ -263,6 +263,8 @@
import static org.apache.cassandra.net.Verb.REPLICATION_DONE_REQ;
import static org.apache.cassandra.service.ActiveRepairService.ParentRepairStatus;
import static org.apache.cassandra.service.ActiveRepairService.repairCommandExecutor;
import static org.apache.cassandra.service.StorageService.Mode.DECOMMISSIONED;
import static org.apache.cassandra.service.StorageService.Mode.DECOMMISSION_FAILED;
import static org.apache.cassandra.utils.Clock.Global.currentTimeMillis;
import static org.apache.cassandra.utils.Clock.Global.nanoTime;
import static org.apache.cassandra.utils.FBUtilities.getBroadcastAddressAndPort;
Expand Down Expand Up @@ -414,7 +416,7 @@ public Collection<Range<Token>> getPrimaryRangesWithinDC(String keyspace)
/* the probability for tracing any particular request, 0 disables tracing and 1 enables for all */
private double traceProbability = 0.0;

public enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, MOVING, DRAINING, DRAINED }
public enum Mode { STARTING, NORMAL, JOINING, LEAVING, DECOMMISSIONED, DECOMMISSION_FAILED, MOVING, DRAINING, DRAINED }
private volatile Mode operationMode = Mode.STARTING;

/* Used for tracking drain progress */
Expand Down Expand Up @@ -626,7 +628,7 @@ public void stopTransports()
* they get the Gossip shutdown message, so even if
* we don't get time to broadcast this, it is not a problem.
*
* See {@link Gossiper#markAsShutdown(InetAddressAndPort)}
* See Gossiper.markAsShutdown(InetAddressAndPort)
*/
private void shutdownClientServers()
{
Expand Down Expand Up @@ -2157,7 +2159,8 @@ private void invalidateLocalRanges()
/**
* All MVs have been created during bootstrap, so mark them as built
*/
private void markViewsAsBuilt() {
private void markViewsAsBuilt()
{
for (String keyspace : Schema.instance.getUserKeyspaces().names())
{
for (ViewMetadata view: Schema.instance.getKeyspaceMetadata(keyspace).views)
Expand All @@ -2168,11 +2171,18 @@ private void markViewsAsBuilt() {
/**
* Called when bootstrap did finish successfully
*/
private void bootstrapFinished() {
private void bootstrapFinished()
{
markViewsAsBuilt();
isBootstrapMode = false;
}

/**
 * Reports the bootstrap state of this node as a string.
 *
 * @return the name of the state returned by SystemKeyspace.getBootstrapState()
 */
@Override
public String getBootstrapState()
{
    final SystemKeyspace.BootstrapState current = SystemKeyspace.getBootstrapState();
    return current.name();
}

public boolean resumeBootstrap()
{
if (isBootstrapMode && SystemKeyspace.bootstrapInProgress())
Expand Down Expand Up @@ -5128,18 +5138,32 @@ private void startLeaving()

public void decommission(boolean force) throws InterruptedException
{
if (operationMode == DECOMMISSIONED)
{
logger.info("This node was already decommissioned. There is no point in decommissioning it again.");
return;
}

if (isDecommissioning())
{
logger.info("This node is still decommissioning.");
return;
}

TokenMetadata metadata = tokenMetadata.cloneAfterAllLeft();
// there is no point in repeating these checks if the node was already decommissioning but failed to finish
if (operationMode != Mode.LEAVING)
{
if (!tokenMetadata.isMember(FBUtilities.getBroadcastAddressAndPort()))
throw new UnsupportedOperationException("local node is not a member of the token ring yet");
if (metadata.getAllEndpoints().size() < 2)
if (metadata.getAllEndpoints().size() < 2 && metadata.getAllEndpoints().contains(FBUtilities.getBroadcastAddressAndPort()))
throw new UnsupportedOperationException("no other normal nodes in the ring; decommission would be pointless");
if (operationMode != Mode.NORMAL)
if (operationMode != Mode.NORMAL && operationMode != DECOMMISSION_FAILED)
throw new UnsupportedOperationException("Node in " + operationMode + " state; wait for status to become normal or restart");
}

if (!isDecommissioning.compareAndSet(false, true))
throw new IllegalStateException("Node is still decommissioning. Check nodetool netstats.");
throw new IllegalStateException("Node is still decommissioning. Check nodetool netstats or nodetool info.");

if (logger.isDebugEnabled())
logger.debug("DECOMMISSIONING");
Expand All @@ -5150,27 +5174,35 @@ public void decommission(boolean force) throws InterruptedException

String dc = DatabaseDescriptor.getEndpointSnitch().getLocalDatacenter();

if (operationMode != Mode.LEAVING) // If we're already decommissioning there is no point checking RF/pending ranges
// If we're already decommissioning there is no point checking RF/pending ranges
if (operationMode != Mode.LEAVING)
{
int rf, numNodes;
for (String keyspaceName : Schema.instance.getNonLocalStrategyKeyspaces().names())
{
if (!force)
{
boolean notEnoughLiveNodes = false;
Keyspace keyspace = Keyspace.open(keyspaceName);
if (keyspace.getReplicationStrategy() instanceof NetworkTopologyStrategy)
{
NetworkTopologyStrategy strategy = (NetworkTopologyStrategy) keyspace.getReplicationStrategy();
rf = strategy.getReplicationFactor(dc).allReplicas;
numNodes = metadata.getTopology().getDatacenterEndpoints().get(dc).size();
Collection<InetAddressAndPort> datacenterEndpoints = metadata.getTopology().getDatacenterEndpoints().get(dc);
numNodes = datacenterEndpoints.size();
if (numNodes <= rf && datacenterEndpoints.contains(FBUtilities.getBroadcastAddressAndPort()))
notEnoughLiveNodes = true;
}
else
{
numNodes = metadata.getAllEndpoints().size();
Set<InetAddressAndPort> allEndpoints = metadata.getAllEndpoints();
numNodes = allEndpoints.size();
rf = keyspace.getReplicationStrategy().getReplicationFactor().allReplicas;
if (numNodes <= rf && allEndpoints.contains(FBUtilities.getBroadcastAddressAndPort()))
notEnoughLiveNodes = true;
}

if (numNodes <= rf)
if (notEnoughLiveNodes)
throw new UnsupportedOperationException("Not enough live nodes to maintain replication factor in keyspace "
+ keyspaceName + " (RF = " + rf + ", N = " + numNodes + ")."
+ " Perform a forceful decommission to ignore.");
Expand All @@ -5182,42 +5214,48 @@ public void decommission(boolean force) throws InterruptedException
}

startLeaving();
long timeout = Math.max(RING_DELAY_MILLIS, BatchlogManager.instance.getBatchlogTimeout());
long timeout = Math.max(RING_DELAY_MILLIS, BatchlogManager.getBatchlogTimeout());
setMode(Mode.LEAVING, "sleeping " + timeout + " ms for batch processing and pending range setup", true);
Thread.sleep(timeout);

Runnable finishLeaving = new Runnable()
unbootstrap();

// shutdown cql, gossip, messaging, Stage and set state to DECOMMISSIONED

shutdownClientServers();
Gossiper.instance.stop();
try
{
public void run()
{
shutdownClientServers();
Gossiper.instance.stop();
try
{
MessagingService.instance().shutdown();
}
catch (IOError ioe)
{
logger.info("failed to shutdown message service: {}", ioe);
}
MessagingService.instance().shutdown();
}
catch (IOError ioe)
{
logger.info("failed to shutdown message service", ioe);
}

Stage.shutdownNow();
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.DECOMMISSIONED);
setMode(Mode.DECOMMISSIONED, true);
// let op be responsible for killing the process
}
};
unbootstrap(finishLeaving);
Stage.shutdownNow();
SystemKeyspace.setBootstrapState(SystemKeyspace.BootstrapState.DECOMMISSIONED);
setMode(DECOMMISSIONED, true);
// let op be responsible for killing the process
}
catch (InterruptedException e)
{
throw new UncheckedInterruptedException(e);
setMode(DECOMMISSION_FAILED, true);
logger.error("Node interrupted while decommissioning");
throw new RuntimeException("Node interrupted while decommissioning");
}
catch (ExecutionException e)
{
logger.error("Error while decommissioning node ", e.getCause());
setMode(DECOMMISSION_FAILED, true);
logger.error("Error while decommissioning node: {}", e.getCause().getMessage());
throw new RuntimeException("Error while decommissioning node: " + e.getCause().getMessage());
}
catch (Throwable t)
{
setMode(DECOMMISSION_FAILED, true);
logger.error("Error while decommissioning node: {}", t.getMessage());
throw t;
}
finally
{
isDecommissioning.set(false);
Expand Down Expand Up @@ -5254,7 +5292,7 @@ public Supplier<Future<StreamState>> prepareUnbootstrapStreaming()
return () -> streamRanges(rangesToStream);
}

private void unbootstrap(Runnable onFinish) throws ExecutionException, InterruptedException
private void unbootstrap() throws ExecutionException, InterruptedException
{
Supplier<Future<StreamState>> startStreaming = prepareUnbootstrapStreaming();

Expand Down Expand Up @@ -5290,7 +5328,6 @@ private void unbootstrap(Runnable onFinish) throws ExecutionException, Interrupt
hintsSuccess.get();
logger.debug("stream acks all received.");
leaveRing();
onFinish.run();
}

private Future streamHints()
Expand Down Expand Up @@ -5610,7 +5647,17 @@ public boolean isNormal()

public boolean isDecommissioned()
{
return operationMode == Mode.DECOMMISSIONED;
return operationMode == DECOMMISSIONED;
}

/**
 * Tells whether this node is currently in the DECOMMISSION_FAILED operation mode.
 *
 * @return true if the operation mode is DECOMMISSION_FAILED, false for any other mode
 */
public boolean isDecommissionFailed()
{
    // read the volatile field once and compare the snapshot
    final Mode current = operationMode;
    return DECOMMISSION_FAILED == current;
}

/**
 * Tells whether a decommission of this node is currently in progress.
 *
 * @return the current value of the isDecommissioning flag
 */
public boolean isDecommissioning()
{
    final boolean inProgress = isDecommissioning.get();
    return inProgress;
}

public String getDrainProgress()
Expand Down
19 changes: 19 additions & 0 deletions src/java/org/apache/cassandra/service/StorageServiceMBean.java
Expand Up @@ -489,6 +489,23 @@ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion,
*/
public void decommission(boolean force) throws InterruptedException;

/**
 * Returns whether a node has failed to decommission.
 *
 * A return value of false does not, by itself, mean that a decommission of this node
 * was attempted and completed successfully.
 *
 * @return true if decommission of this node has failed, false otherwise
 */
public boolean isDecommissionFailed();

/**
 * Returns whether this node is currently being decommissioned.
 *
 * @return true if this node is decommissioning, false otherwise
 */
public boolean isDecommissioning();

/**
* @param newToken token to move this node to.
* This node will unload its data onto its neighbors, and bootstrap to the new token.
Expand Down Expand Up @@ -1006,6 +1023,8 @@ default int upgradeSSTables(String keyspaceName, boolean excludeCurrentVersion,
*/
public boolean resumeBootstrap();

/**
 * Returns the bootstrap state of this node.
 *
 * @return the name of the bootstrap state, for example "DECOMMISSIONED"
 */
public String getBootstrapState();

/** Gets the concurrency settings for processing stages*/
static class StageConcurrency implements Serializable
{
Expand Down
10 changes: 10 additions & 0 deletions src/java/org/apache/cassandra/tools/nodetool/Decommission.java
Expand Up @@ -37,6 +37,16 @@ public void execute(NodeProbe probe)
{
try
{
if (probe.getStorageService().isDecommissioning())
{
probe.output().out.println("This node is still decommissioning.");
return;
}
if ("DECOMMISSIONED".equals(probe.getStorageService().getBootstrapState()))
{
probe.output().out.println("Node was already decommissioned.");
return;
}
probe.decommission(force);
} catch (InterruptedException e)
{
Expand Down
4 changes: 4 additions & 0 deletions src/java/org/apache/cassandra/tools/nodetool/Info.java
Expand Up @@ -177,6 +177,10 @@ public void execute(NodeProbe probe)
{
out.printf("%-23s: (node is not joined to the cluster)%n", "Token");
}

out.printf("%-23s: %s%n", "Bootstrap state", probe.getStorageService().getBootstrapState());
out.printf("%-23s: %s%n", "Decommissioning", probe.getStorageService().isDecommissioning());
out.printf("%-23s: %s%n", "Decommission failed", probe.getStorageService().isDecommissionFailed());
}

/**
Expand Down

0 comments on commit e2a6c99

Please sign in to comment.