Skip to content

Commit

Permalink
fix: Update SingleConnection to not hold the monitor on upcalls (#151)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpcollins-google committed Jun 29, 2020
1 parent b93c9f1 commit 8274753
Showing 1 changed file with 6 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,6 @@ public abstract class SingleConnection<StreamRequestT, StreamResponseT, ClientRe
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();

private final StreamObserver<StreamRequestT> requestStream;
// onError and onCompleted may be called with connectionMonitor held. All messages will be sent
// without it held.
private final StreamObserver<ClientResponseT> clientStream;

private final CloseableMonitor connectionMonitor = new CloseableMonitor();
Expand Down Expand Up @@ -101,10 +99,7 @@ protected void sendToClient(ClientResponseT response) {

protected void setError(Status error) {
Preconditions.checkArgument(!error.isOk());
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
if (completed) return;
abort(error);
}
abort(error);
}

protected boolean isCompleted() {
Expand All @@ -123,10 +118,12 @@ public void close() {
clientStream.onCompleted();
}

@GuardedBy("connectionMonitor.monitor")
private void abort(Status error) {
Preconditions.checkArgument(!error.isOk());
completed = true;
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
if (completed) return;
completed = true;
}
requestStream.onError(error.asRuntimeException());
clientStream.onError(error.asRuntimeException());
}
Expand All @@ -150,9 +147,7 @@ public void onNext(StreamResponseT response) {
responseStatus = handleStreamResponse(response);
}
if (!responseStatus.isOk()) {
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
abort(responseStatus);
}
abort(responseStatus);
}
}

Expand Down

0 comments on commit 8274753

Please sign in to comment.