Skip to content

Commit

Permalink
fix: Change proxy service to call child start methods on executors (#…
Browse files Browse the repository at this point in the history
…1334)

* fix: Change proxy service to call child start methods on executors

Also change service clients to use an isolated executor
Also change channel pools to not use a fixed pool size, and use an auto-scaling channel pool.

* feat: Convert internal interfaces to use protos

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
dpcollins-google and gcf-owl-bot[bot] committed Feb 20, 2023
1 parent 0150fb7 commit e08ea2f
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 39 deletions.
4 changes: 2 additions & 2 deletions README.md
Expand Up @@ -32,13 +32,13 @@ If you are using Maven, add this to your pom.xml file:
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-pubsublite:1.9.4'
implementation 'com.google.cloud:google-cloud-pubsublite:1.10.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.9.4"
libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "1.10.0"
```

## Authentication
Expand Down
Expand Up @@ -40,7 +40,7 @@
public abstract class ProxyService extends AbstractApiService {
private static final GoogleLogger LOGGER = GoogleLogger.forEnclosingClass();
private final List<ApiService> services = new ArrayList<>();
private final AtomicBoolean stoppedOrFailed = new AtomicBoolean(false);
private final AtomicBoolean failed = new AtomicBoolean(false);

protected <T extends ApiService> ProxyService(Collection<T> services) {
addServices(services);
Expand All @@ -55,8 +55,16 @@ protected ProxyService(ApiService... services) throws ApiException {
protected final <T extends ApiService> void addServices(Collection<T> services)
throws ApiException {
checkState(state() == State.NEW);
Listener onServiceError =
new Listener() {
@Override
public void failed(State state, Throwable throwable) {
onPermanentError(toCanonical(throwable));
}
};
for (ApiService service : services) {
checkArgument(service.state() == State.NEW, "All services must not be started.");
service.addListener(onServiceError, SystemExecutors.getFuturesExecutor());
this.services.add(service);
}
}
Expand All @@ -76,7 +84,7 @@ protected void handlePermanentError(CheckedApiException error) {}

// Tries to stop all dependent services and sets this service into the FAILED state.
protected final void onPermanentError(CheckedApiException error) {
if (stoppedOrFailed.getAndSet(true)) return;
if (failed.getAndSet(true)) return;
try {
ApiServiceUtils.stopAsync(services);
} catch (Throwable t) {
Expand All @@ -87,13 +95,21 @@ protected final void onPermanentError(CheckedApiException error) {
} catch (Throwable t) {
LOGGER.atFine().withCause(t).log("Exception in handlePermanentError.");
}
// Failures are sent to the client and should always be ApiExceptions.
notifyFailed(error.underlying);
try {
// Failures are sent to the client and should always be ApiExceptions.
notifyFailed(error.underlying);
} catch (IllegalStateException e) {
LOGGER.atFine().withCause(e).log("Exception in notifyFailed.");
}
}

// AbstractApiService implementation.
@Override
protected final void doStart() {
SystemExecutors.getFuturesExecutor().execute(this::startImpl);
}

private void startImpl() {
Listener listener =
new Listener() {
private final AtomicInteger leftToStart = new AtomicInteger(services.size());
Expand All @@ -103,54 +119,53 @@ public void running() {
if (leftToStart.decrementAndGet() == 0) {
try {
start();
notifyStarted();
} catch (CheckedApiException e) {
onPermanentError(e);
return;
}
notifyStarted();
}
}

@Override
public void failed(State state, Throwable throwable) {
onPermanentError(toCanonical(throwable));
}
};
for (ApiService service : services) {
service.addListener(listener, SystemExecutors.getFuturesExecutor());
service.startAsync();
try {
for (ApiService service : services) {
service.addListener(listener, SystemExecutors.getFuturesExecutor());
service.startAsync();
}
} catch (Throwable t) {
onPermanentError(toCanonical(t));
}
}

@Override
protected final void doStop() {
SystemExecutors.getFuturesExecutor().execute(this::stopImpl);
}

private void stopImpl() {
Listener listener =
new Listener() {
private final AtomicInteger leftToStop = new AtomicInteger(services.size());

@Override
public void terminated(State state) {
if (leftToStop.decrementAndGet() == 0) {
if (!stoppedOrFailed.getAndSet(true)) {
notifyStopped();
}
notifyStopped();
}
}

@Override
public void failed(State state, Throwable throwable) {
onPermanentError(toCanonical(throwable));
}
};
try {
stop();
} catch (CheckedApiException e) {
onPermanentError(e);
return;
}
for (ApiService service : services) {
service.addListener(listener, SystemExecutors.getFuturesExecutor());
service.stopAsync();
try {
for (ApiService service : services) {
service.addListener(listener, SystemExecutors.getFuturesExecutor());
service.stopAsync();
}
} catch (Throwable t) {
onPermanentError(toCanonical(t));
}
}
}
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.grpc.ChannelPoolSettings;
import com.google.api.gax.grpc.GrpcCallContext;
import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider;
import com.google.api.gax.rpc.ApiCallContext;
Expand All @@ -27,15 +28,15 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.Endpoints;
import com.google.cloud.pubsublite.internal.Lazy;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Multimaps;
import java.util.concurrent.ScheduledExecutorService;
import org.threeten.bp.Duration;

public final class ServiceClients {
// Default to 10 channels per client to avoid server limitations on streams and requests
// per-channel.
private static final int CLIENT_POOL_SIZE =
Integer.parseInt(System.getProperty("PUBSUB_LITE_CHANNELS_PER_CLIENT", "10"));
private static final Lazy<ScheduledExecutorService> GRPC_EXECUTOR =
new Lazy<>(() -> SystemExecutors.newDaemonExecutor("pubsub-lite-grpc"));

private ServiceClients() {}

Expand All @@ -45,7 +46,11 @@ private static TransportChannelProvider getTransportChannelProvider() {
.setKeepAliveTime(Duration.ofMinutes(1))
.setKeepAliveWithoutCalls(true)
.setKeepAliveTimeout(Duration.ofMinutes(1))
.setPoolSize(CLIENT_POOL_SIZE)
.setChannelPoolSettings(
ChannelPoolSettings.builder()
.setInitialChannelCount(25)
.setMaxRpcsPerChannel(50)
.build())
.setExecutor(SystemExecutors.getFuturesExecutor())
.build();
}
Expand All @@ -57,8 +62,7 @@ Settings addDefaultSettings(CloudRegion target, Builder builder) throws ApiExcep
try {
return builder
.setEndpoint(Endpoints.regionalEndpoint(target))
.setBackgroundExecutorProvider(
FixedExecutorProvider.create(SystemExecutors.getAlarmExecutor()))
.setBackgroundExecutorProvider(FixedExecutorProvider.create(GRPC_EXECUTOR.get()))
.setTransportChannelProvider(getTransportChannelProvider())
.build();
} catch (Throwable t) {
Expand Down
Expand Up @@ -196,7 +196,7 @@ private Subscriber initSub1() throws CheckedApiException {
public void stopStopsSubs() throws CheckedApiException {
Subscriber sub1 = initSub1();

assigningSubscriber.stopAsync();
assigningSubscriber.stopAsync().awaitTerminated();
verify(sub1).stopAsync();
verify(sub1).awaitTerminated();
}
Expand Down
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.ApiExceptionMatcher.assertFutureThrowsCode;
import static com.google.cloud.pubsublite.internal.ApiExceptionMatcher.assertThrowableMatches;
import static com.google.cloud.pubsublite.internal.testing.RetryingConnectionHelpers.whenFailed;
import static com.google.common.truth.Truth.assertThat;
Expand Down Expand Up @@ -67,7 +68,9 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.InOrder;
Expand Down Expand Up @@ -99,6 +102,8 @@ private static MessagePublishResponse messageResponse(Offset startOffset, int me
.build();
}

@Rule public Timeout globalTimeout = Timeout.seconds(30);

@Mock private PublishStreamFactory unusedStreamFactory;
@Mock private BatchPublisher mockBatchPublisher;
@Mock private BatchPublisherFactory mockPublisherFactory;
Expand Down Expand Up @@ -203,11 +208,10 @@ public void construct_closeSendsBatched() throws Exception {
}

@Test
public void publishBeforeStart_isPermanentError() throws Exception {
public void publishBeforeStart_FailsFuture() {
Message message = Message.builder().build();
assertThrows(
IllegalStateException.class, () -> publisher.publish(message, PublishSequenceNumber.of(0)));
assertThrows(IllegalStateException.class, () -> publisher.startAsync().awaitRunning());
assertFutureThrowsCode(
publisher.publish(message, PublishSequenceNumber.of(0)), Code.FAILED_PRECONDITION);
verifyNoInteractions(mockPublisherFactory);
verifyNoInteractions(mockBatchPublisher);
}
Expand Down

0 comments on commit e08ea2f

Please sign in to comment.