Skip to content

Commit

Permalink
fix: Centralize alarm executor creation for Pub/Sub Lite (#565)
Browse files Browse the repository at this point in the history
* fix: Set batching settings on SinglePartitionPublisher

* fix: Centralize alarm executor creation for Pub/Sub Lite

Also prevent creation of per-object thread pools which are largely unused and may prevent shutdown if not using daemon threads.
  • Loading branch information
dpcollins-google committed Mar 30, 2021
1 parent 5371c6c commit 5f73967
Show file tree
Hide file tree
Showing 9 changed files with 109 additions and 87 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ public class PartitionCountWatcherImpl extends AbstractApiService implements Par
private final Duration period;
private final TopicPath topicPath;
private final AdminClient adminClient;
private final ScheduledExecutorService executorService;
private final Consumer<Long> partitionCountReceiver;

private ScheduledFuture<?> partitionCountPoll;
Expand Down Expand Up @@ -59,7 +58,6 @@ private PartitionCountWatcherImpl(
this.topicPath = topicPath;
this.adminClient = adminClient;
this.partitionCountReceiver = receiver;
this.executorService = Executors.newSingleThreadScheduledExecutor();
}

private void pollTopicConfig() {
Expand Down Expand Up @@ -96,8 +94,9 @@ private void stop() {
@Override
protected void doStart() {
partitionCountPoll =
executorService.scheduleAtFixedRate(
this::pollTopicConfig, 0, period.toMillis(), TimeUnit.MILLISECONDS);
SystemExecutors.getAlarmExecutor()
.scheduleAtFixedRate(
this::pollTopicConfig, 0, period.toMillis(), TimeUnit.MILLISECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@
import java.util.List;
import java.util.Optional;
import java.util.Queue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import javax.annotation.concurrent.GuardedBy;
Expand All @@ -59,7 +57,6 @@ public final class PublisherImpl extends ProxyService
@GuardedBy("monitor.monitor")
private final RetryingConnection<BatchPublisher> connection;

private final ScheduledExecutorService executorService;
private final BatchingSettings batchingSettings;
private Future<?> alarmFuture;

Expand Down Expand Up @@ -115,7 +112,6 @@ private static class InFlightBatch {
new SerialBatcher(
batchingSettings.getRequestByteThreshold(),
batchingSettings.getElementCountThreshold());
this.executorService = Executors.newSingleThreadScheduledExecutor();
this.batchingSettings = batchingSettings;
addServices(connection);
}
Expand Down Expand Up @@ -195,18 +191,18 @@ protected void start() {
// After initialize, the stream can have an error and try to cancel the future, but the
// future can call into the stream, so this needs to be created under lock.
this.alarmFuture =
this.executorService.scheduleWithFixedDelay(
this::flushToStream,
batchingSettings.getDelayThreshold().toNanos(),
batchingSettings.getDelayThreshold().toNanos(),
TimeUnit.NANOSECONDS);
SystemExecutors.getAlarmExecutor()
.scheduleWithFixedDelay(
this::flushToStream,
batchingSettings.getDelayThreshold().toNanos(),
batchingSettings.getDelayThreshold().toNanos(),
TimeUnit.NANOSECONDS);
}
}

@Override
protected void stop() {
alarmFuture.cancel(false /* mayInterruptIfRunning */);
executorService.shutdown();
flush(); // Flush any outstanding messages that were batched.
try (CloseableMonitor.Hold h = monitor.enter()) {
shutdown = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,6 @@
import com.google.common.flogger.GoogleLogger;
import java.time.Duration;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import javax.annotation.concurrent.GuardedBy;

Expand All @@ -57,7 +55,6 @@ class RetryingConnectionImpl<
connectionFactory;
private final StreamRequestT initialRequest;
private final RetryingConnectionObserver<ClientResponseT> observer;
private final ScheduledExecutorService systemExecutor;

// connectionMonitor will not be held in any upcalls.
private final CloseableMonitor connectionMonitor = new CloseableMonitor();
Expand All @@ -81,16 +78,16 @@ class RetryingConnectionImpl<
this.connectionFactory = connectionFactory;
this.initialRequest = initialRequest;
this.observer = observer;
this.systemExecutor = Executors.newSingleThreadScheduledExecutor();
}

@Override
protected void doStart() {
this.systemExecutor.execute(
() -> {
reinitialize();
notifyStarted();
});
SystemExecutors.getAlarmExecutor()
.execute(
() -> {
reinitialize();
notifyStarted();
});
}

// Reinitialize the stream. Must be called in a downcall to prevent deadlock.
Expand Down Expand Up @@ -119,7 +116,6 @@ protected void doStop() {
}
logger.atFine().log(
String.format("Terminated connection with initial request %s.", initialRequest));
systemExecutor.shutdownNow();
notifyStopped();
}

Expand Down Expand Up @@ -195,17 +191,18 @@ public final void onError(Throwable t) {
logger.atFine().withCause(t).log(
"Stream disconnected attempting retry, after %s milliseconds", backoffTime);
ScheduledFuture<?> retry =
systemExecutor.schedule(
() -> {
try {
observer.triggerReinitialize();
} catch (Throwable t2) {
logger.atWarning().withCause(t2).log("Error occurred in triggerReinitialize.");
onError(t2);
}
},
backoffTime,
MILLISECONDS);
SystemExecutors.getAlarmExecutor()
.schedule(
() -> {
try {
observer.triggerReinitialize();
} catch (Throwable t2) {
logger.atWarning().withCause(t2).log("Error occurred in triggerReinitialize.");
onError(t2);
}
},
backoffTime,
MILLISECONDS);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@
import com.google.cloud.pubsublite.Endpoints;
import com.google.cloud.pubsublite.internal.Lazy;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.MoreExecutors;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import org.threeten.bp.Duration;

public final class ServiceClients {
Expand All @@ -38,9 +36,7 @@ private ServiceClients() {}
new Lazy<>(
() ->
FixedExecutorProvider.create(
MoreExecutors.getExitingScheduledExecutorService(
new ScheduledThreadPoolExecutor(
Math.max(4, Runtime.getRuntime().availableProcessors())))));
SystemExecutors.newDaemonExecutor("pubsub-lite-service-clients")));

public static <
Settings extends ClientSettings<Settings>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.Monitor;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.annotation.concurrent.GuardedBy;
Expand All @@ -56,7 +54,6 @@ public class SubscriberImpl extends ProxyService

private final CloseableMonitor monitor = new CloseableMonitor();

private final ScheduledExecutorService executorService;
private Future<?> alarmFuture;

@GuardedBy("monitor.monitor")
Expand Down Expand Up @@ -101,7 +98,6 @@ private static class InFlightSeek {
factory,
SubscribeRequest.newBuilder().setInitial(initialRequest).build(),
this);
this.executorService = Executors.newSingleThreadScheduledExecutor();
addServices(this.connection);
}

Expand Down Expand Up @@ -133,18 +129,18 @@ protected void handlePermanentError(CheckedApiException error) {
protected void start() {
try (CloseableMonitor.Hold h = monitor.enter()) {
alarmFuture =
executorService.scheduleWithFixedDelay(
this::processBatchFlowRequest,
FLOW_REQUESTS_FLUSH_INTERVAL_MS,
FLOW_REQUESTS_FLUSH_INTERVAL_MS,
TimeUnit.MILLISECONDS);
SystemExecutors.getAlarmExecutor()
.scheduleWithFixedDelay(
this::processBatchFlowRequest,
FLOW_REQUESTS_FLUSH_INTERVAL_MS,
FLOW_REQUESTS_FLUSH_INTERVAL_MS,
TimeUnit.MILLISECONDS);
}
}

@Override
protected void stop() {
alarmFuture.cancel(false /* mayInterruptIfRunning */);
executorService.shutdown();
try (CloseableMonitor.Hold h = monitor.enter()) {
shutdown = true;
inFlightSeek.ifPresent(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import com.google.cloud.pubsublite.internal.Lazy;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class SystemExecutors {
private SystemExecutors() {}

public static ScheduledExecutorService newDaemonExecutor(String prefix) {
return Executors.newScheduledThreadPool(
Math.max(4, Runtime.getRuntime().availableProcessors()),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build());
}

private static final Lazy<ScheduledExecutorService> ALARM_EXECUTOR =
new Lazy<>(() -> newDaemonExecutor("pubsub-lite-alarms"));
// An executor for alarms.
public static ScheduledExecutorService getAlarmExecutor() {
return ALARM_EXECUTOR.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,8 @@ final class PerServerPublisherCache {
private PerServerPublisherCache() {}

static final PublisherCache PUBLISHER_CACHE = new PublisherCache();

static {
Runtime.getRuntime().addShutdownHook(new Thread(PUBLISHER_CACHE::close));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,52 +22,50 @@
import com.google.api.core.ApiService.State;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;

/** A map of working publishers by PublisherOptions. */
class PublisherCache {
private final CloseableMonitor monitor = new CloseableMonitor();

private final Executor listenerExecutor = Executors.newSingleThreadExecutor();

@GuardedBy("monitor.monitor")
class PublisherCache implements AutoCloseable {
@GuardedBy("this")
private final HashMap<PublisherOptions, Publisher<MessageMetadata>> livePublishers =
new HashMap<>();

Publisher<MessageMetadata> get(PublisherOptions options) throws ApiException {
private synchronized void evict(PublisherOptions options) {
livePublishers.remove(options);
}

synchronized Publisher<MessageMetadata> get(PublisherOptions options) throws ApiException {
checkArgument(options.usesCache());
try (CloseableMonitor.Hold h = monitor.enter()) {
Publisher<MessageMetadata> publisher = livePublishers.get(options);
if (publisher != null) {
return publisher;
}
publisher = Publishers.newPublisher(options);
livePublishers.put(options, publisher);
publisher.addListener(
new Listener() {
@Override
public void failed(State s, Throwable t) {
try (CloseableMonitor.Hold h = monitor.enter()) {
livePublishers.remove(options);
}
}
},
listenerExecutor);
publisher.startAsync().awaitRunning();
Publisher<MessageMetadata> publisher = livePublishers.get(options);
if (publisher != null) {
return publisher;
}
publisher = Publishers.newPublisher(options);
livePublishers.put(options, publisher);
publisher.addListener(
new Listener() {
@Override
public void failed(State s, Throwable t) {
evict(options);
}
},
SystemExecutors.getAlarmExecutor());
publisher.startAsync().awaitRunning();
return publisher;
}

@VisibleForTesting
void set(PublisherOptions options, Publisher<MessageMetadata> toCache) {
try (CloseableMonitor.Hold h = monitor.enter()) {
livePublishers.put(options, toCache);
}
synchronized void set(PublisherOptions options, Publisher<MessageMetadata> toCache) {
livePublishers.put(options, toCache);
}

@Override
public synchronized void close() {
livePublishers.forEach(((options, publisher) -> publisher.stopAsync()));
livePublishers.clear();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,12 @@
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.apache.beam.sdk.transforms.DoFn;

Expand All @@ -55,8 +54,6 @@ class PubsubLiteSink extends DoFn<PubSubMessage, Void> {
@GuardedBy("this")
private transient Deque<CheckedApiException> errorsSinceLastFinish;

private static final Executor executor = Executors.newCachedThreadPool();

PubsubLiteSink(PublisherOptions options) {
this.options = options;
}
Expand Down Expand Up @@ -129,7 +126,7 @@ public void onFailure(Throwable t) {
onFailure.accept(t);
}
},
executor);
SystemExecutors.getAlarmExecutor());
}

// Intentionally don't flush on bundle finish to allow multi-sink client reuse.
Expand Down

0 comments on commit 5f73967

Please sign in to comment.