Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -257,15 +257,16 @@ void reportMetadataInternal(ClusterMetadata metadata)

/**
 * Reports nodes that are in the topology of the preceding epoch but not in {@code topology},
 * by calling {@code onNodeRemoved} once per such node.
 *
 * Nodes contained in {@code stillLiveNodes} are excluded and are never reported.
 *
 * NOTE(review): the two early-return guards and the two nested epoch checks below look like the
 * pre-change and post-change lines of the same diff shown together; confirm against the real file
 * which of them are meant to be live.
 *
 * @param topology       the topology to compare against the one for {@code topology.epoch() - 1}
 * @param stillLiveNodes node ids that must not be reported as removed even if absent from {@code topology}
 */
private void checkIfNodesRemoved(Topology topology, Set<Node.Id> stillLiveNodes)
{
// Nothing to do when this topology is exactly the minimum epoch we track.
if (epochs.minEpoch() == topology.epoch()) return;
// Read the minimum epoch once so the checks below use a single consistent value.
long minEpoch = epochs.minEpoch();
// Return unless the topology's epoch is strictly after the minimum epoch.
// NOTE(review): minEpoch == 0 presumably means no epoch is known yet; verify against epochs.minEpoch().
if (minEpoch == 0 || topology.epoch() <= minEpoch) return;
Topology previous = getTopologyForEpoch(topology.epoch() - 1);
// for all nodes removed, or pending removal, mark them as removed so we don't wait on their replies
Set<Node.Id> removedNodes = Sets.difference(previous.nodes(), topology.nodes());
// Drop any node the caller still considers live.
removedNodes = Sets.filter(removedNodes, id -> !stillLiveNodes.contains(id));
// TODO (desired, efficiency): there should be no need to notify every epoch for every removed node
for (Node.Id removedNode : removedNodes)
{
// The first check re-reads epochs.minEpoch() on every iteration; the second uses the value captured above.
if (topology.epoch() >= epochs.minEpoch())
if (topology.epoch() >= minEpoch)
onNodeRemoved(topology.epoch(), previous, removedNode);
}
}
Expand Down
28 changes: 16 additions & 12 deletions src/java/org/apache/cassandra/service/accord/AccordService.java
Original file line number Diff line number Diff line change
Expand Up @@ -382,20 +382,15 @@ public synchronized void unsafeStartupWithOverrides(@Nullable Journal.TopologyUp

if (remote != null)
remote.forEach(configService::reportTopology, highestKnown + 1, Integer.MAX_VALUE);
else if (images.isEmpty()) // First boot, single-node cluster
configService.reportTopology(AccordTopology.createAccordTopology(metadata));

// Subscribe to TCM events
ClusterMetadataService.instance().log().addListener(configService.listener);
{
metadata = ClusterMetadata.current();
highestKnown = configService.currentEpoch();
if (metadata.epoch.getEpoch() > highestKnown)
{
remote = fetchTopologies(highestKnown + 1);
if (remote != null)
remote.forEach(configService::reportTopology, highestKnown + 1, Integer.MAX_VALUE);
}
}

// We report current topology _after_ subscribing to TCM events since in a single-node cluster there
// will be no notification about the current epoch, since it's already reported. And in a multi-node cluster
// we do not want a race between adding a listener and reporting an epoch.
if (remote == null && images.isEmpty())
configService.reportTopology(AccordTopology.createAccordTopology(metadata));

WatermarkCollector.fetchAndReportWatermarksAsync(configService());
configService.unsafeMarkTruncated();
Expand All @@ -414,6 +409,15 @@ else if (images.isEmpty()) // First boot, single-node cluster
catch (TimeoutException e)
{
logger.warn("Epoch {} is not ready after waiting for {} seconds", metadata.epoch, (++attempt) * waitSeconds);
// In case there are any gaps, fetch unknown topologies.
metadata = ClusterMetadata.current();
highestKnown = configService.currentEpoch();
if (metadata.epoch.getEpoch() > highestKnown)
{
remote = fetchTopologies(highestKnown + 1);
if (remote != null)
remote.forEach(configService::reportTopology, highestKnown + 1, Integer.MAX_VALUE);
}
}

if (Clock.Global.nanoTime() > deadine)
Expand Down