Skip to content

Commit

Permalink
fix: properly shutdown subscriber stub on permanent streaming pull fa…
Browse files Browse the repository at this point in the history
…ilure (#539)

* fix: stop the subscriber stub on streaming pull failure
  • Loading branch information
hannahrogers-google committed Mar 3, 2021
1 parent 373aee2 commit adbcc0c
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -149,8 +149,7 @@ protected void doStart() {

@Override
protected void doStop() {
messageDispatcher.stop();
ackOperationsWaiter.waitComplete();
runShutdown();

lock.lock();
try {
Expand All @@ -161,6 +160,11 @@ protected void doStop() {
}
}

private void runShutdown() {
messageDispatcher.stop();
ackOperationsWaiter.waitComplete();
}

private class StreamingPullResponseObserver implements ResponseObserver<StreamingPullResponse> {

final SettableApiFuture<Void> errorFuture;
Expand Down Expand Up @@ -282,6 +286,7 @@ public void onFailure(Throwable cause) {
ApiExceptionFactory.createException(
cause, GrpcStatusCode.of(Status.fromThrowable(cause).getCode()), false);
logger.log(Level.SEVERE, "terminated streaming with exception", gaxException);
runShutdown();
notifyFailed(gaxException);
return;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,9 +308,7 @@ protected void doStop() {
public void run() {
try {
// stop connection is no-op if connections haven't been started.
stopAllStreamingConnections();
shutdownBackgroundResources();
subStub.shutdownNow();
runShutdown();
notifyStopped();
} catch (Exception e) {
notifyFailed(e);
Expand All @@ -320,6 +318,12 @@ public void run() {
.start();
}

private void runShutdown() {
stopAllStreamingConnections();
shutdownBackgroundResources();
subStub.shutdownNow();
}

private void startStreamingConnections() {
synchronized (streamingSubscriberConnections) {
for (int i = 0; i < numPullers; i++) {
Expand Down Expand Up @@ -352,8 +356,7 @@ private void startStreamingConnections() {
public void failed(State from, Throwable failure) {
// If a connection failed is because of a fatal error, we should fail the
// whole subscriber.
stopAllStreamingConnections();
shutdownBackgroundResources();
runShutdown();
try {
notifyFailed(failure);
} catch (IllegalStateException e) {
Expand Down

0 comments on commit adbcc0c

Please sign in to comment.