Skip to content

Commit

Permalink
fix: Restructure java client stream creation to reuse clients for dif…
Browse files Browse the repository at this point in the history
…ferent partitions (#1002)

* fix: Restructure java client stream creation to reuse clients for different partitions

* fix: Restructure java client stream creation to reuse clients for different partitions
  • Loading branch information
dpcollins-google committed Dec 22, 2021
1 parent d1068b4 commit a45a179
Show file tree
Hide file tree
Showing 30 changed files with 273 additions and 211 deletions.
11 changes: 11 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/PublisherSettings$Builder</className>
<method>*</method>
</difference>
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings$Builder</className>
<method>*</method>
</difference>
<!-- TODO: Remove above after next release -->
<!-- Added method to AdminClient interface (Always okay) -->
<difference>
<differenceType>7012</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,24 +17,27 @@
package com.google.cloud.pubsublite.cloudpubsub;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings;
import static com.google.cloud.pubsublite.internal.wire.ServiceClients.getCallContext;

import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.ApiCallContext;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.Constants;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.internal.WrappingPublisher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
import com.google.cloud.pubsublite.internal.wire.PartitionPublisherFactory;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
Expand All @@ -46,7 +49,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.pubsub.v1.PubsubMessage;
import java.util.Optional;
import java.util.function.Supplier;
import org.threeten.bp.Duration;

/**
Expand Down Expand Up @@ -75,7 +77,7 @@ public abstract class PublisherSettings {
abstract Optional<MessageTransformer<PubsubMessage, Message>> messageTransformer();

/** Batching settings for this publisher to use. Apply per-partition. */
abstract Optional<BatchingSettings> batchingSettings();
abstract BatchingSettings batchingSettings();

/** A provider for credentials. */
abstract CredentialsProvider credentialsProvider();
Expand All @@ -89,7 +91,7 @@ public abstract class PublisherSettings {
* A supplier for new PublisherServiceClients. Should return a new client each time. If present,
* ignores CredentialsProvider.
*/
abstract Optional<Supplier<PublisherServiceClient>> serviceClientSupplier();
abstract Optional<PublisherServiceClient> serviceClient();

/** The AdminClient to use, if provided. */
abstract Optional<AdminClient> adminClient();
Expand All @@ -103,6 +105,7 @@ public static Builder newBuilder() {
.setFramework(Framework.of("CLOUD_PUBSUB_SHIM"))
.setCredentialsProvider(
PublisherServiceSettings.defaultCredentialsProviderBuilder().build())
.setBatchingSettings(DEFAULT_BATCHING_SETTINGS)
.setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder());
}

Expand Down Expand Up @@ -132,11 +135,8 @@ public abstract Builder setMessageTransformer(
*/
public abstract Builder setFramework(Framework framework);

/**
* A supplier for new PublisherServiceClients. Should return a new client each time. If present,
* ignores CredentialsProvider.
*/
public abstract Builder setServiceClientSupplier(Supplier<PublisherServiceClient> supplier);
/** The PublisherServiceClient to use, if provided. */
public abstract Builder setServiceClient(PublisherServiceClient client);

/** The AdminClient to use, if provided. */
public abstract Builder setAdminClient(AdminClient adminClient);
Expand All @@ -149,23 +149,47 @@ abstract Builder setUnderlyingBuilder(
public abstract PublisherSettings build();
}

private PublisherServiceClient newServiceClient(Partition partition) throws ApiException {
if (serviceClientSupplier().isPresent()) return serviceClientSupplier().get().get();
PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder();
settingsBuilder = settingsBuilder.setCredentialsProvider(credentialsProvider());
settingsBuilder =
addDefaultMetadata(
PubsubContext.of(framework()),
RoutingMetadata.of(topicPath(), partition),
settingsBuilder);
private PublisherServiceClient newServiceClient() throws ApiException {
if (serviceClient().isPresent()) return serviceClient().get();
try {
return PublisherServiceClient.create(
addDefaultSettings(topicPath().location().extractRegion(), settingsBuilder));
addDefaultSettings(
topicPath().location().extractRegion(),
PublisherServiceSettings.newBuilder().setCredentialsProvider(credentialsProvider())));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}

private PartitionPublisherFactory getPartitionPublisherFactory() {
PublisherServiceClient client = newServiceClient();
return new PartitionPublisherFactory() {
@Override
public com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> newPublisher(
Partition partition) throws ApiException {
SinglePartitionPublisherBuilder.Builder singlePartitionBuilder =
underlyingBuilder()
.setBatchingSettings(batchingSettings())
.setTopic(topicPath())
.setPartition(partition)
.setStreamFactory(
responseStream -> {
ApiCallContext context =
getCallContext(
PubsubContext.of(framework()),
RoutingMetadata.of(topicPath(), partition));
return client.publishCallable().splitCall(responseStream, context);
});
return singlePartitionBuilder.build();
}

@Override
public void close() {
client.close();
}
};
}

private AdminClient getAdminClient() throws ApiException {
if (adminClient().isPresent()) return adminClient().get();
try {
Expand All @@ -186,9 +210,8 @@ private AdminClient getAdminClient() throws ApiException {

@SuppressWarnings("CheckReturnValue")
Publisher instantiate() throws ApiException {
BatchingSettings batchingSettings = batchingSettings().orElse(DEFAULT_BATCHING_SETTINGS);
if (batchingSettings.getFlowControlSettings().getMaxOutstandingElementCount() != null
|| batchingSettings.getFlowControlSettings().getMaxOutstandingRequestBytes() != null) {
if (batchingSettings().getFlowControlSettings().getMaxOutstandingElementCount() != null
|| batchingSettings().getFlowControlSettings().getMaxOutstandingRequestBytes() != null) {
throw new CheckedApiException(
"Pub/Sub Lite does not support flow control settings for publishing.",
Code.INVALID_ARGUMENT)
Expand All @@ -202,16 +225,7 @@ Publisher instantiate() throws ApiException {
PartitionCountWatchingPublisherSettings.Builder publisherSettings =
PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(topicPath())
.setPublisherFactory(
partition -> {
SinglePartitionPublisherBuilder.Builder singlePartitionBuilder =
underlyingBuilder()
.setBatchingSettings(batchingSettings)
.setTopic(topicPath())
.setPartition(partition)
.setServiceClient(newServiceClient(partition));
return singlePartitionBuilder.build();
})
.setPublisherFactory(getPartitionPublisherFactory())
.setAdminClient(getAdminClient());
return new WrappingPublisher(publisherSettings.build().instantiate(), messageTransformer);
}
Expand Down
Loading

0 comments on commit a45a179

Please sign in to comment.