Skip to content

Commit

Permalink
fix restarting of services on gossipping-only member
Browse files Browse the repository at this point in the history
patch by Stefan Miklosovic; reviewed by Brandon Williams (CASSANDRA-17752)
  • Loading branch information
smiklosovic committed Jul 27, 2022
1 parent 09692d5 commit a7b5321
Show file tree
Hide file tree
Showing 5 changed files with 125 additions and 3 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
@@ -1,4 +1,5 @@
3.0.28
* Fix restarting of services on gossipping-only member (CASSANDRA-17752)
* Fix writetime and ttl functions forbidden for collections instead of multicell columns (CASSANDRA-17628)
* Suppress CVE-2020-7238 (CASSANDRA-17697)
* Fix issue where frozen maps may not be serialized in the correct order (CASSANDRA-17623)
Expand Down
4 changes: 2 additions & 2 deletions src/java/org/apache/cassandra/service/StorageService.java
Expand Up @@ -346,7 +346,7 @@ public void stopGossiping()
{
if (initialized)
{
if (!isNormal())
if (!isNormal() && joinRing)
throw new IllegalStateException("Unable to stop gossip because the node is not in the normal state. Try to stop the node instead.");

logger.warn("Stopping gossip by operator request");
Expand Down Expand Up @@ -4489,7 +4489,7 @@ synchronized void checkServiceAllowedToStart(String service)
if (isShutdown()) // do not rely on operationMode in case it gets changed to decomissioned or other
throw new IllegalStateException(String.format("Unable to start %s because the node was drained.", service));

if (!isNormal())
if (!isNormal() && joinRing) // if the node is not joining the ring, it is gossipping-only member which is in STARTING state forever
throw new IllegalStateException(String.format("Unable to start %s because the node is not in the normal state.", service));
}

Expand Down
Expand Up @@ -50,9 +50,14 @@

import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;
import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
import static org.apache.cassandra.distributed.shared.NetworkTopology.singleDcNetworkTopology;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class GossipTest extends TestBaseImpl
{
Expand Down Expand Up @@ -266,6 +271,37 @@ public void gossipShutdownUpdatesTokenMetadata() throws Exception
}
}

/**
 * Checks that gossip can be stopped and then started again on an instance that is
 * started while the {@code cassandra.join_ring} property is set to {@code false}.
 */
@Test
public void restartGossipOnGossippingOnlyMember() throws Throwable
{
    final int initialSize = 1;
    final int finalSize = initialSize + 1;

    // Tokens and topology are sized for the final node count, one more than the nodes started up front.
    try (Cluster cluster = builder().withNodes(initialSize)
                                    .withTokenSupplier(evenlyDistributedTokens(finalSize, 1))
                                    .withNodeIdTopology(singleDcNetworkTopology(finalSize, "dc0", "rack0"))
                                    .withConfig(cfg -> cfg.with(NETWORK, GOSSIP))
                                    .start())
    {
        IInstanceConfig memberConfig = cluster.newInstanceConfig();
        IInvokableInstance member = cluster.bootstrap(memberConfig);
        withProperty("cassandra.join_ring", Boolean.toString(false), () -> member.startup(cluster));

        // Probe and actions are named once and reused for every step below.
        IIsolatedExecutor.SerializableCallable<Boolean> gossipRunning = () -> StorageService.instance.isGossipRunning();
        IIsolatedExecutor.SerializableRunnable stopGossip = () -> StorageService.instance.stopGossiping();
        IIsolatedExecutor.SerializableRunnable startGossip = () -> StorageService.instance.startGossiping();

        // Gossip is expected to be up right after startup.
        assertTrue(member.callOnInstance(gossipRunning));

        member.runOnInstance(stopGossip);
        assertFalse(member.callOnInstance(gossipRunning));

        member.runOnInstance(startGossip);
        assertTrue(member.callOnInstance(gossipRunning));
    }
}

void assertPendingRangesForPeer(final boolean expectPending, final InetAddress movingAddress, final Cluster cluster)
{
for (IInvokableInstance inst : new IInvokableInstance[]{ cluster.get(1), cluster.get(3)})
Expand Down
Expand Up @@ -26,14 +26,24 @@
import com.datastax.driver.core.Session;
import com.datastax.driver.core.SimpleStatement;
import com.datastax.driver.core.Statement;
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ICluster;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.impl.RowUtil;
import org.apache.cassandra.service.StorageService;

import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
import static org.apache.cassandra.distributed.shared.AssertUtils.assertRows;
import static org.apache.cassandra.distributed.shared.AssertUtils.row;
import static org.apache.cassandra.distributed.shared.NetworkTopology.singleDcNetworkTopology;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

// TODO: this test should be removed after running in-jvm dtests is set up via the shared API repository
public class NativeProtocolTest extends TestBaseImpl
Expand Down Expand Up @@ -78,4 +88,35 @@ public void withCounters() throws Throwable
cluster.close();
}
}

/**
 * Checks that the native transport can be stopped and then started again on an instance
 * that is started while the {@code cassandra.join_ring} property is set to {@code false}.
 */
@Test
public void restartTransportOnGossippingOnlyMember() throws Throwable
{
    final int startingNodes = 1;
    final int totalNodes = startingNodes + 1;

    // Tokens and topology cover the total node count so the additional instance has a slot.
    try (Cluster cluster = builder().withNodes(startingNodes)
                                    .withTokenSupplier(evenlyDistributedTokens(totalNodes, 1))
                                    .withNodeIdTopology(singleDcNetworkTopology(totalNodes, "dc0", "rack0"))
                                    .withConfig(c -> c.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
                                    .start())
    {
        IInstanceConfig extraConfig = cluster.newInstanceConfig();
        IInvokableInstance extraNode = cluster.bootstrap(extraConfig);
        withProperty("cassandra.join_ring", Boolean.toString(false), () -> extraNode.startup(cluster));

        // Single status probe shared by all three assertions.
        IIsolatedExecutor.SerializableCallable<Boolean> transportRunning =
            () -> StorageService.instance.isNativeTransportRunning();

        boolean runningAfterStartup = extraNode.callOnInstance(transportRunning);
        assertTrue(runningAfterStartup);

        IIsolatedExecutor.SerializableRunnable stopTransport = () -> StorageService.instance.stopNativeTransport();
        extraNode.runOnInstance(stopTransport);
        boolean runningAfterStop = extraNode.callOnInstance(transportRunning);
        assertFalse(runningAfterStop);

        IIsolatedExecutor.SerializableRunnable startTransport = () -> StorageService.instance.startNativeTransport();
        extraNode.runOnInstance(startTransport);
        boolean runningAfterRestart = extraNode.callOnInstance(transportRunning);
        assertTrue(runningAfterRestart);
    }
}
}
Expand Up @@ -28,16 +28,29 @@
import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.ConsistencyLevel;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.api.IInstanceConfig;
import org.apache.cassandra.distributed.api.IInvokableInstance;
import org.apache.cassandra.distributed.api.IIsolatedExecutor;
import org.apache.cassandra.distributed.api.QueryResults;
import org.apache.cassandra.distributed.api.SimpleQueryResult;
import org.apache.cassandra.distributed.shared.AssertUtils;
import org.apache.cassandra.service.StorageService;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.utils.ByteBufferUtil;
import org.apache.thrift.TException;

public class ThriftClientTest extends TestBaseImpl
import static org.apache.cassandra.distributed.action.GossipHelper.withProperty;
import static org.apache.cassandra.distributed.api.Feature.GOSSIP;
import static org.apache.cassandra.distributed.api.Feature.NATIVE_PROTOCOL;
import static org.apache.cassandra.distributed.api.Feature.NETWORK;
import static org.apache.cassandra.distributed.api.TokenSupplier.evenlyDistributedTokens;
import static org.apache.cassandra.distributed.shared.NetworkTopology.singleDcNetworkTopology;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class ThriftTest extends TestBaseImpl
{
@Test
public void writeThenReadCQL() throws IOException, TException
Expand Down Expand Up @@ -66,4 +79,35 @@ public void writeThenReadCQL() throws IOException, TException
AssertUtils.assertRows(qr, QueryResults.builder().row(0, 0).build());
}
}

/**
 * Checks that the RPC (Thrift) server can be stopped and then started again on an instance
 * that is started while the {@code cassandra.join_ring} property is set to {@code false}.
 */
@Test
public void restartThriftOnGossippingOnlyMember() throws Throwable
{
    final int seedCount = 1;
    final int clusterCapacity = seedCount + 1;

    // Token supplier and topology are built for the full capacity, not just the seed node.
    try (Cluster cluster = builder().withNodes(seedCount)
                                    .withTokenSupplier(evenlyDistributedTokens(clusterCapacity, 1))
                                    .withNodeIdTopology(singleDcNetworkTopology(clusterCapacity, "dc0", "rack0"))
                                    .withConfig(conf -> conf.with(NETWORK, GOSSIP, NATIVE_PROTOCOL))
                                    .start())
    {
        IInstanceConfig nodeConfig = cluster.newInstanceConfig();
        IInvokableInstance node = cluster.bootstrap(nodeConfig);
        withProperty("cassandra.join_ring", Boolean.toString(false), () -> node.startup(cluster));

        IIsolatedExecutor.SerializableCallable<Boolean> rpcRunning = () -> StorageService.instance.isRPCServerRunning();
        IIsolatedExecutor.SerializableRunnable stopRpc = () -> StorageService.instance.stopRPCServer();
        IIsolatedExecutor.SerializableRunnable startRpc = () -> StorageService.instance.startRPCServer();

        // Expected sequence: running -> stopped -> running again.
        assertTrue(node.callOnInstance(rpcRunning));

        node.runOnInstance(stopRpc);
        assertFalse(node.callOnInstance(rpcRunning));

        node.runOnInstance(startRpc);
        assertTrue(node.callOnInstance(rpcRunning));
    }
}
}

0 comments on commit a7b5321

Please sign in to comment.