Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -146,4 +146,11 @@ static AdminClient create(AdminClientSettings settings) throws StatusException {
* status {@link io.grpc.Status.Code#NOT_FOUND}
*/
ApiFuture<Void> deleteSubscription(SubscriptionPath path);

/**
* Tear down this admin client.
*
* @throws StatusException on a failure to properly terminate.
*/
void close() throws StatusException;
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change that requires a major version update.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no major versions in a pre-v1 library. This change will not be submitted as it is superceded by #295. This is also only a breaking change for someone who overrides (not uses) AdminClient, which is not an intended workflow. The previous method signature was void close() throws Exception which this is a subset of.

}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import com.google.api.core.ApiFuture;
import com.google.api.core.NanoClock;
import com.google.api.gax.core.BackgroundResourceAggregation;
import com.google.api.gax.core.ExecutorAsBackgroundResource;
import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
import com.google.api.gax.retrying.ResultRetryAlgorithm;
Expand Down Expand Up @@ -54,13 +53,16 @@
import com.google.common.collect.ImmutableList;
import com.google.protobuf.FieldMask;
import io.grpc.Status;
import io.grpc.StatusException;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class AdminClientImpl extends BackgroundResourceAggregation implements AdminClient {
public class AdminClientImpl implements AdminClient {
private final ExecutorAsBackgroundResource executorResource;
private final CloudRegion region;
private final AdminServiceGrpc.AdminServiceBlockingStub stub;
private final RetryingExecutor<Void> voidRetryingExecutor;
Expand All @@ -75,20 +77,8 @@ public AdminClientImpl(
CloudRegion region,
AdminServiceGrpc.AdminServiceBlockingStub stub,
RetrySettings retrySettings) {
this(
region,
stub,
retrySettings,
// TODO: Consider allowing tuning in the future.
Executors.newScheduledThreadPool(6));
}

private AdminClientImpl(
CloudRegion region,
AdminServiceGrpc.AdminServiceBlockingStub stub,
RetrySettings retrySettings,
ScheduledExecutorService executor) {
super(ImmutableList.of(new ExecutorAsBackgroundResource(executor)));
ScheduledExecutorService executor = Executors.newScheduledThreadPool(6);
this.executorResource = new ExecutorAsBackgroundResource(executor);
this.region = region;
this.stub = stub;
this.voidRetryingExecutor = retryingExecutor(retrySettings, executor);
Expand All @@ -100,6 +90,42 @@ private AdminClientImpl(
this.listTopicSubscriptionsRetryingExecutor = retryingExecutor(retrySettings, executor);
}

// BackgroundResource implementation.
@Override
public void shutdown() {
executorResource.shutdown();
}

@Override
public boolean isShutdown() {
return executorResource.isShutdown();
}

@Override
public boolean isTerminated() {
return executorResource.isTerminated();
}

@Override
public void shutdownNow() {
executorResource.shutdownNow();
}

@Override
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return executorResource.awaitTermination(duration, unit);
}

@Override
public void close() throws StatusException {
try {
executorResource.close();
} catch (Exception e) {
throw ExtractStatus.toCanonical(e);
}
}

// AdminClient implementation.
private static <T> RetryingExecutor<T> retryingExecutor(
RetrySettings settings, ScheduledExecutorService executor) {
return new ScheduledRetryingExecutor<>(retryAlgorithm(settings), executor);
Expand Down