Skip to content

Commit

Permalink
fix: Ensure notifyFailed is called and all services are stopped when …
Browse files Browse the repository at this point in the history
…a permanent error happens (#860)

* fix: Ensure notifyFailed is called and all services are stopped when a permanent error happens

* fix: pom issues

* fix: dependencies

* fix: catch handlePermanentError exceptions
  • Loading branch information
dpcollins-google committed Sep 8, 2021
1 parent 413e35e commit a046b53
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@
import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiService;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.internal.wire.ApiServiceUtils;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -35,6 +37,7 @@
//
// On any dependent service failure, fails all other services and calls handlePermanentError.
public abstract class ProxyService extends AbstractApiService {
private static final GoogleLogger LOGGER = GoogleLogger.forEnclosingClass();
private final List<ApiService> services = new ArrayList<>();
private final AtomicBoolean stoppedOrFailed = new AtomicBoolean(false);

Expand Down Expand Up @@ -67,10 +70,16 @@ protected final void addServices(ApiService... services) throws ApiException {
// Tries to stop all dependent services and sets this service into the FAILED state.
protected final void onPermanentError(CheckedApiException error) {
if (stoppedOrFailed.getAndSet(true)) return;
for (ApiService service : services) {
service.stopAsync();
try {
ApiServiceUtils.stopAsync(services);
} catch (Throwable t) {
LOGGER.atFine().withCause(t).log("Exception in underlying service shutdown.");
}
try {
handlePermanentError(error);
} catch (Throwable t) {
LOGGER.atFine().withCause(t).log("Exception in handlePermanentError.");
}
handlePermanentError(error);
// Failures are sent to the client and should always be ApiExceptions.
notifyFailed(error.underlying);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,21 @@ protected void doStop() {
};
}

public static void stopAsync(Iterable<? extends ApiService> services) throws ApiException {
CheckedApiException lastException = null;
for (ApiService service : services) {
try {
service.stopAsync();
} catch (Throwable t) {
LOGGER.atFine().withCause(t).log("Exception in service shutdown.");
lastException = toCanonical(t);
}
}
if (lastException != null) {
throw lastException.underlying;
}
}

public static void blockingShutdown(Iterable<? extends ApiService> services) throws ApiException {
CheckedApiException lastException = null;
for (ApiService service : services) {
Expand Down
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@
<artifactId>javax.annotation-api</artifactId>
<version>1.3.2</version>
</dependency>
<dependency>
<!-- Explicit override, remove when no longer needed.-->
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
<version>3.17.0</version>
</dependency>
<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
Expand Down

0 comments on commit a046b53

Please sign in to comment.