Skip to content

Commit

Permalink
Stop the context and update the state BEFORE throwing the exception.
Browse files Browse the repository at this point in the history
  • Loading branch information
Sela committed Mar 6, 2017
1 parent d71207a commit 4d1222f
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,13 @@ protected void stop() {
// been thrown during the "grace period".
try {
javaStreamingContext.awaitTermination(0);
SparkContextFactory.stopSparkContext(javaSparkContext);
if (Objects.equals(state, State.RUNNING)) {
state = State.STOPPED;
}
} catch (Exception e) {
throw beamExceptionFrom(e);
}
SparkContextFactory.stopSparkContext(javaSparkContext);
if (Objects.equals(state, State.RUNNING)) {
state = State.STOPPED;
}
}

@Override
Expand Down

0 comments on commit 4d1222f

Please sign in to comment.