Skip to content
Permalink
Browse files
Merge branch 'cassandra-4.0' into cassandra-4.1
  • Loading branch information
Jon Meredith committed Jun 30, 2022
2 parents a9725b6 + 008bf86 commit 5e4eeb2d0c69b920eadd83559f6ab771c7b6a8dc
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 5 deletions.
@@ -95,7 +95,7 @@ public static InstanceAction statusToNormal(IInvokableInstance peer)
*/
public static void unsafeStatusToNormal(IInvokableInstance target, IInstance peer)
{
int messagingVersion = peer.getMessagingVersion();
final int messagingVersion = getOrDefaultMessagingVersion(target, peer);
changeGossipState(target,
peer,
Arrays.asList(unsafeVersionedValue(target,
@@ -422,7 +422,7 @@ public static void changeGossipState(IInvokableInstance target, IInstance peer,
{
InetSocketAddress addr = peer.broadcastAddress();
UUID hostId = peer.config().hostId();
int netVersion = peer.getMessagingVersion();
final int netVersion = getOrDefaultMessagingVersion(target, peer);
target.runOnInstance(() -> {
InetAddressAndPort endpoint = toCassandraInetAddressAndPort(addr);
StorageService storageService = StorageService.instance;
@@ -448,6 +448,14 @@ public static void changeGossipState(IInvokableInstance target, IInstance peer,
});
}

private static int getOrDefaultMessagingVersion(IInvokableInstance target, IInstance peer)
{
int peerVersion = peer.getMessagingVersion();
final int netVersion = peerVersion == 0 ? target.getMessagingVersion() : peerVersion;
assert netVersion != 0 : "Unable to determine messaging version for peer {}" + peer.config().num();
return netVersion;
}

public static void withProperty(String prop, boolean value, Runnable r)
{
withProperty(prop, Boolean.toString(value), r);
@@ -395,6 +395,8 @@ public synchronized void startup(ICluster cluster)
}
throw t;
}
// This duplicates work done in Instance startup, but keeping as other Instance implementations
// do not, so to permit older releases to be tested, repeat the setup
updateMessagingVersions();

if (instanceInitializer != null)
@@ -46,6 +46,9 @@

import com.google.common.annotations.VisibleForTesting;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.util.concurrent.GlobalEventExecutor;
import org.apache.cassandra.Util;
import org.apache.cassandra.auth.AuthCache;
@@ -163,8 +166,10 @@
*/
public class Instance extends IsolatedExecutor implements IInvokableInstance
{
private Logger inInstancelogger; // Defer creation until running in the instance context
public final IInstanceConfig config;
private volatile boolean initialized = false;
private volatile boolean internodeMessagingStarted = false;
private final AtomicLong startedAt = new AtomicLong();

@Deprecated
@@ -321,6 +326,12 @@ public void schemaChangeInternal(String query)
private void registerMockMessaging(ICluster<?> cluster)
{
MessagingService.instance().outboundSink.add((message, to) -> {
if (!internodeMessagingStarted)
{
inInstancelogger.debug("Dropping outbound message {} to {} as internode messaging has not been started yet",
message, to);
return false;
}
cluster.deliverMessage(to, serializeMessage(message.from(), to, message));
return false;
});
@@ -469,6 +480,12 @@ public void receiveMessageWithInvokingThread(IMessage message)
private SerializableConsumer<Boolean> receiveMessageRunnable(IMessage message)
{
return runOnCaller -> {
if (!internodeMessagingStarted)
{
inInstancelogger.debug("Dropping inbound message {} to {} as internode messaging has not been started yet",
message, config().broadcastAddress());
return;
}
if (message.version() > MessagingService.current_version)
{
throw new IllegalStateException(String.format("Node%d received message version %d but current version is %d",
@@ -499,13 +516,20 @@ private SerializableConsumer<Boolean> receiveMessageRunnable(IMessage message)

public int getMessagingVersion()
{
return MessagingService.current_version;
if (DatabaseDescriptor.isDaemonInitialized())
return MessagingService.current_version;
else
return 0;
}

@Override
public void setMessagingVersion(InetSocketAddress endpoint, int version)
{
MessagingService.instance().versions.set(toCassandraInetAddressAndPort(endpoint), version);
if (DatabaseDescriptor.isDaemonInitialized())
MessagingService.instance().versions.set(toCassandraInetAddressAndPort(endpoint), version);
else
inInstancelogger.warn("Skipped setting messaging version for {} to {} as daemon not initialized yet. Stacktrace attached for debugging.",
endpoint, version, new RuntimeException());
}

@Override
@@ -546,6 +570,7 @@ public void startup(ICluster cluster)
assert startedAt.compareAndSet(0L, System.nanoTime()) : "startedAt uninitialized";

sync(() -> {
inInstancelogger = LoggerFactory.getLogger(Instance.class);
try
{
// org.apache.cassandra.distributed.impl.AbstractCluster.startup sets the exception handler for the thread
@@ -634,7 +659,11 @@ public void startup(ICluster cluster)
}
registerInboundFilter(cluster);
registerOutboundFilter(cluster);

if (!config.has(NETWORK))
{
propagateMessagingVersions(cluster); // fake messaging needs to know messaging version for filters
}
internodeMessagingStarted = true;
JVMStabilityInspector.replaceKiller(new InstanceKiller(Instance.this::shutdown));

// TODO: this is more than just gossip
@@ -706,6 +735,33 @@ else if (cluster instanceof Cluster)
initialized = true;
}

// Update the messaging versions for all instances
// that have initialized their configurations.
private static void propagateMessagingVersions(ICluster cluster)
{
cluster.stream().forEach(reportToObj -> {
IInstance reportTo = (IInstance) reportToObj;
if (reportTo.isShutdown())
return;

int reportToVersion = reportTo.getMessagingVersion();
if (reportToVersion == 0)
return;

cluster.stream().forEach(reportFromObj -> {
IInstance reportFrom = (IInstance) reportFromObj;
if (reportFrom == reportTo || reportFrom.isShutdown())
return;

int reportFromVersion = reportFrom.getMessagingVersion();
if (reportFromVersion == 0) // has not read configuration yet, no accessing messaging version
return;
// TODO: decide if we need to take care of the minversion
reportTo.setMessagingVersion(reportFrom.broadcastAddress(), reportFromVersion);
});
});
}

@Override
public void postStartup()
{
@@ -796,6 +852,7 @@ public Future<Void> shutdown(boolean graceful)

error = parallelRun(error, executor, () -> ScheduledExecutors.shutdownNowAndWait(1L, MINUTES));

internodeMessagingStarted = false;
error = parallelRun(error, executor,
CommitLog.instance::shutdownBlocking,
// can only shutdown message once, so if the test shutsdown an instance, then ignore the failure

0 comments on commit 5e4eeb2

Please sign in to comment.