Merge pull request #288 from dpcollins-google/cps-shared-threadpool
fix: Use a shared threadpool for all CPS activities
dpcollins-google committed Jul 21, 2021
2 parents: 821eccb + a76f51a · commit: 879a480
Showing 6 changed files with 38 additions and 41 deletions.
@@ -15,25 +15,16 @@
////////////////////////////////////////////////////////////////////////////////
package com.google.pubsub.kafka.common;

import com.google.api.gax.core.CredentialsProvider;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.google.protobuf.ByteString;
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.auth.ClientAuthInterceptor;
import io.grpc.netty.NegotiationType;
import io.grpc.netty.NettyChannelBuilder;
import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

/** Utility methods and constants that are repeated across one or more classes. */
public class ConnectorUtils {

private static final String ENDPOINT = "pubsub.googleapis.com";

public static final String SCHEMA_NAME = ByteString.class.getName();
public static final String CPS_SUBSCRIPTION_FORMAT = "projects/%s/subscriptions/%s";
public static final String CPS_TOPIC_FORMAT = "projects/%s/topics/%s";
public static final String CPS_PROJECT_CONFIG = "cps.project";
public static final String CPS_TOPIC_CONFIG = "cps.topic";
public static final String CPS_ENDPOINT = "cps.endpoint";
@@ -48,19 +39,18 @@ public class ConnectorUtils {
public static final String KAFKA_OFFSET_ATTRIBUTE = "kafka.offset";
public static final String KAFKA_TIMESTAMP_ATTRIBUTE = "kafka.timestamp";

/** Return {@link io.grpc.Channel} which is used by Cloud Pub/Sub gRPC API's. */
public static Channel getChannel(CredentialsProvider credentialsProvider) throws IOException {
ManagedChannel channelImpl =
NettyChannelBuilder.forAddress(ENDPOINT, 443)
.negotiationType(NegotiationType.TLS)
// Maximum Pub/Sub message size is 10MB.
.maxInboundMessageSize(10 * 1024 * 1024)
.build();
final ClientAuthInterceptor interceptor =
new ClientAuthInterceptor(
credentialsProvider.getCredentials(),
Executors.newCachedThreadPool());
return ClientInterceptors.intercept(channelImpl, interceptor);
private static ScheduledExecutorService newDaemonExecutor(String prefix) {
return Executors.newScheduledThreadPool(
Math.max(4, Runtime.getRuntime().availableProcessors() * 5),
new ThreadFactoryBuilder().setDaemon(true).setNameFormat(prefix + "-%d").build());
}

// A shared executor for Pub/Sub clients to use.
private static Optional<ScheduledExecutorService> SYSTEM_EXECUTOR = Optional.empty();
public static synchronized ScheduledExecutorService getSystemExecutor() {
if (!SYSTEM_EXECUTOR.isPresent()) {
SYSTEM_EXECUTOR = Optional.of(newDaemonExecutor("pubsub-connect-system"));
}
return SYSTEM_EXECUTOR.get();
}
}
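
The hunk above replaces the ad-hoc Netty channel helper with a single, lazily created, daemon-threaded scheduled pool shared by every Cloud Pub/Sub client in the connector. A minimal usage sketch follows; the caller class is hypothetical and not part of this commit:

import com.google.pubsub.kafka.common.ConnectorUtils;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class SharedExecutorExample { // hypothetical caller, for illustration only
  void schedulePeriodicWork() {
    // Every caller receives the same pool: getSystemExecutor() is
    // synchronized and assigns SYSTEM_EXECUTOR at most once.
    ScheduledExecutorService pool = ConnectorUtils.getSystemExecutor();
    // Daemon threads mean pending work never blocks JVM shutdown.
    pool.scheduleAtFixedRate(() -> System.out.println("tick"), 0, 1, TimeUnit.SECONDS);
  }
}

The Math.max(4, availableProcessors() * 5) sizing keeps a floor of four threads on small machines while scaling with cores, and the name format labels each thread "pubsub-connect-system-N" for debugging.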
@@ -15,10 +15,13 @@
////////////////////////////////////////////////////////////////////////////////
package com.google.pubsub.kafka.sink;

import static com.google.pubsub.kafka.common.ConnectorUtils.getSystemExecutor;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.retrying.RetrySettings;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.common.annotations.VisibleForTesting;
@@ -405,6 +408,7 @@ private void createPublisher() {
.setInitialRpcTimeout(Duration.ofSeconds(10))
.setRpcTimeoutMultiplier(2)
.build())
.setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()))
.setEndpoint(cpsEndpoint);
if (orderingKeySource != OrderingKeySource.NONE) {
builder.setEnableMessageOrdering(true);
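
The one added line routes the sink's Publisher onto that shared pool. FixedExecutorProvider wraps an existing ScheduledExecutorService so the client library does not create its own, and it does not shut the executor down when the Publisher is closed, which is the lifecycle a shared pool needs. A standalone sketch with placeholder project and topic names:

import com.google.api.gax.core.FixedExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.pubsub.kafka.common.ConnectorUtils;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;

class PublisherExample { // hypothetical, for illustration only
  static Publisher create() throws IOException {
    return Publisher.newBuilder(TopicName.of("my-project", "my-topic")) // placeholders
        // Reuse the connector-wide pool instead of a per-publisher one.
        .setExecutorProvider(FixedExecutorProvider.create(ConnectorUtils.getSystemExecutor()))
        .build();
  }
}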
@@ -14,16 +14,20 @@
import java.util.Deque;
import java.util.List;
import java.util.concurrent.Future;
import org.apache.commons.lang3.tuple.Pair;

public class AckBatchingSubscriber implements CloudPubSubSubscriber {
interface AlarmFactory {
Future<?> newAlarm(Runnable runnable);
}

private static class IdsAndFuture {
Collection<String> ids;
SettableApiFuture<Empty> future;
};

private final CloudPubSubSubscriber underlying;
@GuardedBy("this")
private final Deque<Pair<Collection<String>, SettableApiFuture<Empty>>> toSend = new ArrayDeque<>();
private final Deque<IdsAndFuture> toSend = new ArrayDeque<>();
private final Future<?> alarm;

public AckBatchingSubscriber(
@@ -40,9 +44,11 @@ public ApiFuture<List<ReceivedMessage>> pull() {

@Override
public synchronized ApiFuture<Empty> ackMessages(Collection<String> ackIds) {
SettableApiFuture<Empty> result = SettableApiFuture.create();
toSend.add(Pair.of(ackIds, result));
return result;
IdsAndFuture idsAndFuture = new IdsAndFuture();
idsAndFuture.ids = ackIds;
idsAndFuture.future = SettableApiFuture.create();
toSend.add(idsAndFuture);
return idsAndFuture.future;
}

private void flush() {
@@ -53,8 +59,8 @@ private void flush() {
return;
}
toSend.forEach(pair -> {
ackIds.addAll(pair.getLeft());
futures.add(pair.getRight());
ackIds.addAll(pair.ids);
futures.add(pair.future);
});
toSend.clear();
}
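
Replacing the commons-lang3 Pair with the small IdsAndFuture holder drops a dependency and makes the two fields self-describing. The rest of flush() sits outside this hunk; below is a sketch of how the drained futures would typically be completed from the single batched RPC, assuming underlying.ackMessages returns an ApiFuture<Empty> as the signature above shows:

import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import java.util.List;

// Fragment of a class like AckBatchingSubscriber; "underlying" is the
// wrapped CloudPubSubSubscriber. Fans one batched result out to every
// caller's pending future.
private void completeAll(List<String> ackIds, List<SettableApiFuture<Empty>> futures) {
  ApiFutures.addCallback(
      underlying.ackMessages(ackIds),
      new ApiFutureCallback<Empty>() {
        @Override public void onSuccess(Empty empty) {
          futures.forEach(f -> f.set(empty));
        }
        @Override public void onFailure(Throwable t) {
          futures.forEach(f -> f.setException(t));
        }
      },
      MoreExecutors.directExecutor());
}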
@@ -22,7 +22,6 @@
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.protobuf.Empty;
import com.google.pubsub.kafka.common.ConnectorUtils;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
@@ -31,7 +30,6 @@
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
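
The import swap in this file (java.util.Random out, java.util.concurrent.ThreadLocalRandom in) suggests per-call randomness now avoids contention on a shared Random instance. A one-line illustration of the pattern; the subscriber list here is hypothetical, not from this commit:

// "subscribers" is a hypothetical List<CloudPubSubSubscriber>.
CloudPubSubSubscriber next =
    subscribers.get(ThreadLocalRandom.current().nextInt(subscribers.size()));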
@@ -16,7 +16,6 @@
package com.google.pubsub.kafka.source;

import com.google.api.gax.core.CredentialsProvider;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.common.annotations.VisibleForTesting;
@@ -15,9 +15,12 @@
////////////////////////////////////////////////////////////////////////////////
package com.google.pubsub.kafka.source;

import static com.google.pubsub.kafka.common.ConnectorUtils.getSystemExecutor;

import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.api.gax.core.FixedExecutorProvider;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.common.annotations.VisibleForTesting;
@@ -39,8 +42,6 @@
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
@@ -62,8 +63,6 @@ public class CloudPubSubSourceTask extends SourceTask {

private static final Logger log = LoggerFactory.getLogger(CloudPubSubSourceTask.class);
private static final int NUM_CPS_SUBSCRIBERS = 10;
private static final ScheduledExecutorService ACK_EXECUTOR =
MoreExecutors.getExitingScheduledExecutorService(new ScheduledThreadPoolExecutor(4));

private String kafkaTopic;
private ProjectSubscriptionName cpsSubscription;
@@ -158,7 +157,8 @@ public void start(Map<String, String> props) {
.setMaxOutstandingElementCount(streamingPullMessages)
.setMaxOutstandingRequestBytes(streamingPullBytes).build())
.setParallelPullCount(streamingPullParallelStreams)
.setEndpoint(cpsEndpoint);
.setEndpoint(cpsEndpoint)
.setExecutorProvider(FixedExecutorProvider.create(getSystemExecutor()));
if (streamingPullMaxAckDeadlineMs > 0) {
builder.setMaxAckExtensionPeriod(Duration.ofMillis(streamingPullMaxAckDeadlineMs));
}
@@ -172,7 +172,7 @@
subscriber = new AckBatchingSubscriber(
new CloudPubSubRoundRobinSubscriber(NUM_CPS_SUBSCRIBERS,
gcpCredentialsProvider,
cpsEndpoint, cpsSubscription, cpsMaxBatchSize), runnable -> ACK_EXECUTOR
cpsEndpoint, cpsSubscription, cpsMaxBatchSize), runnable -> getSystemExecutor()
.scheduleAtFixedRate(runnable, 100, 100, TimeUnit.MILLISECONDS));
}
}
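
With these changes the source task's dedicated four-thread ACK_EXECUTOR is gone: both the streaming-pull Subscriber (via FixedExecutorProvider) and the ack-batching alarm now run on the shared pool. A sketch of the alarm wiring in isolation, assuming AlarmFactory is visible to the calling code as it is within this package:

import static com.google.pubsub.kafka.common.ConnectorUtils.getSystemExecutor;
import java.util.concurrent.TimeUnit;

// The factory hands the batching subscriber's flush runnable to the shared
// pool every 100 ms; cancelling the returned Future stops the alarm.
AckBatchingSubscriber.AlarmFactory alarmFactory =
    runnable -> getSystemExecutor().scheduleAtFixedRate(runnable, 100, 100, TimeUnit.MILLISECONDS);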
