Skip to content

Commit

Permalink
Rely on snapshotted session infos on StreamResultFuture.maybeComplete…
Browse files Browse the repository at this point in the history
… to avoid races

patch by Massimiliano Tomassi; reviewed by Sergio Bossa and by ZhaoYang
for CASSANDRA-15667
  • Loading branch information
maxtomassi authored and blerer committed May 25, 2020
1 parent ea202ce commit a4b6deb
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
3.0.21
* Rely on snapshotted session infos on StreamResultFuture.maybeComplete to avoid race conditions (CASSANDRA-15667)
* EmptyType doesn't override writeValue so could attempt to write bytes when expected not to (CASSANDRA-15790)
* Fix index queries on partition key columns when some partitions contains only static data (CASSANDRA-13666)
* Avoid creating duplicate rows during major upgrades (CASSANDRA-15789)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public boolean hasActiveSessions()
for (StreamSession session : streamSessions.values())
{
StreamSession.State state = session.state();
if (state != StreamSession.State.COMPLETE && state != StreamSession.State.FAILED)
if (!state.isFinalState())
return true;
}
return false;
Expand All @@ -245,6 +245,7 @@ public StreamSession getOrCreateNextSession(InetAddress peer, InetAddress connec
{
StreamSession session = new StreamSession(peer, connecting, factory, streamSessions.size(), keepSSTableLevel, isIncremental);
streamSessions.put(++lastReturned, session);
sessionInfos.put(lastReturned, session.getSessionInfo());
return session;
}
// get
Expand Down Expand Up @@ -277,6 +278,7 @@ public StreamSession getOrCreateSessionById(InetAddress peer, int id, InetAddres
{
session = new StreamSession(peer, connecting, factory, id, keepSSTableLevel, isIncremental);
streamSessions.put(id, session);
sessionInfos.put(id, session.getSessionInfo());
}
return session;
}
Expand Down
12 changes: 11 additions & 1 deletion src/java/org/apache/cassandra/streaming/StreamResultFuture.java
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ synchronized void fireStreamEvent(StreamEvent event)

private synchronized void maybeComplete()
{
if (!coordinator.hasActiveSessions())
if (finishedAllSessions())
{
StreamState finalState = getCurrentState();
if (finalState.hasFailedSession())
Expand All @@ -217,4 +217,14 @@ private synchronized void maybeComplete()
}
}
}

/**
* We can't use {@link StreamCoordinator#hasActiveSessions()} directly because {@link this#maybeComplete()}
* relies on the snapshotted state from {@link StreamCoordinator} and not the {@link StreamSession} state
* directly (CASSANDRA-15667), otherwise inconsistent snapshotted states may lead to completion races.
*/
private boolean finishedAllSessions()
{
return coordinator.getAllSessionInfo().stream().allMatch(s -> s.state.isFinalState());
}
}
29 changes: 22 additions & 7 deletions src/java/org/apache/cassandra/streaming/StreamSession.java
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,27 @@ public class StreamSession implements IEndpointStateChangeSubscriber

public static enum State
{
INITIALIZED,
PREPARING,
STREAMING,
WAIT_COMPLETE,
COMPLETE,
FAILED,
INITIALIZED(false),
PREPARING(false),
STREAMING(false),
WAIT_COMPLETE(false),
COMPLETE(true),
FAILED(true);

private final boolean finalState;

State(boolean finalState)
{
this.finalState = finalState;
}

/**
* @return true if current state is final, either COMPLETE OR FAILED.
*/
public boolean isFinalState()
{
return finalState;
}
}

private volatile State state = State.INITIALIZED;
Expand Down Expand Up @@ -300,7 +315,7 @@ public synchronized void addTransferRanges(String keyspace, Collection<Range<Tok

private void failIfFinished()
{
if (state() == State.COMPLETE || state() == State.FAILED)
if (state().isFinalState())
throw new RuntimeException(String.format("Stream %s is finished with state %s", planId(), state().name()));
}

Expand Down

0 comments on commit a4b6deb

Please sign in to comment.