Skip to content

Commit

Permalink
[CASSANDRA-16932] CEP-10 Phase 2: Minor Gossip Fixes
Browse files Browse the repository at this point in the history
* Ensure we apply new states in correct order so as not to lose TOKEN message
* Permit replacement nodes to join the ring with lower heartbeat state than
  node being replaced
  • Loading branch information
belliottsmith committed Oct 7, 2021
1 parent 5b82447 commit ce2a0a2
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 2 deletions.
23 changes: 22 additions & 1 deletion src/java/org/apache/cassandra/gms/Gossiper.java
Expand Up @@ -1587,12 +1587,33 @@ private void applyNewStates(InetAddressAndPort addr, EndpointState localState, E
if (!hasMajorVersion3Nodes())
localState.removeMajorVersion3LegacyApplicationStates();

// need to run STATUS or STATUS_WITH_PORT first to handle BOOT_REPLACE correctly (else won't be a member, so TOKENS won't be processed)
for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates)
{
switch (updatedEntry.getKey())
{
default:
continue;
case STATUS:
if (localState.containsApplicationState(ApplicationState.STATUS_WITH_PORT))
continue;
case STATUS_WITH_PORT:
}
doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue());
}

for (Entry<ApplicationState, VersionedValue> updatedEntry : updatedStates)
{
switch (updatedEntry.getKey())
{
// We should have alredy handled these two states above:
case STATUS_WITH_PORT:
case STATUS:
continue;
}
// filters out legacy change notifications
// only if local state already indicates that the peer has the new fields
if ((ApplicationState.INTERNAL_IP == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.INTERNAL_ADDRESS_AND_PORT))
||(ApplicationState.STATUS == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.STATUS_WITH_PORT))
|| (ApplicationState.RPC_ADDRESS == updatedEntry.getKey() && localState.containsApplicationState(ApplicationState.NATIVE_ADDRESS_AND_PORT)))
continue;
doOnChangeNotifications(addr, updatedEntry.getKey(), updatedEntry.getValue());
Expand Down
4 changes: 3 additions & 1 deletion src/java/org/apache/cassandra/service/StorageService.java
Expand Up @@ -2728,7 +2728,9 @@ else if (endpoint.equals(currentOwner))
tokensToUpdateInMetadata.add(token);
tokensToUpdateInSystemKeyspace.add(token);
}
else if (Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
// Note: in test scenarios, there may not be any delta between the heartbeat generations of the old
// and new nodes, so we first check whether the new endpoint is marked as a replacement for the old.
else if (endpoint.equals(tokenMetadata.getReplacementNode(currentOwner).orElse(null)) || Gossiper.instance.compareEndpointStartup(endpoint, currentOwner) > 0)
{
tokensToUpdateInMetadata.add(token);
tokensToUpdateInSystemKeyspace.add(token);
Expand Down

0 comments on commit ce2a0a2

Please sign in to comment.