Skip to content

Commit

Permalink
fix: re-use the same subscriber service client for all BlockingPullSu…
Browse files Browse the repository at this point in the history
…bscribers created (#513)

* fix: respect max messages per batch option

* fix formatting

* fix: use single subscriber service client

* fix public method

* fix: mark the service client transient

* fix: guard by this
  • Loading branch information
hannahrogers-google authored Nov 17, 2022
1 parent f436045 commit 9c73b2b
Showing 1 changed file with 35 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,15 @@
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.annotation.concurrent.GuardedBy;

@AutoValue
public abstract class PslReadDataSourceOptions implements Serializable {
private static final long serialVersionUID = 2680059304693561607L;

@GuardedBy("this")
private transient SubscriberServiceClient subscriberServiceClient = null;

@Nullable
public abstract String credentialsKey();

Expand Down Expand Up @@ -127,36 +131,41 @@ MultiPartitionCommitter newMultiPartitionCommitter() {

@SuppressWarnings("CheckReturnValue")
PartitionSubscriberFactory getSubscriberFactory() {
SubscriberServiceClient serviceClient = getSubscriberServiceClient();
return (partition, offset, consumer) -> {
return SubscriberBuilder.newBuilder()
.setSubscriptionPath(this.subscriptionPath())
.setPartition(partition)
.setMessageConsumer(consumer)
.setStreamFactory(
responseStream -> {
ApiCallContext context =
getCallContext(
PubsubContext.of(Constants.FRAMEWORK),
RoutingMetadata.of(subscriptionPath(), partition));
return serviceClient.subscribeCallable().splitCall(responseStream, context);
})
.setInitialLocation(
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(offset.value()))
.build())
.build();
};
}

private synchronized SubscriberServiceClient getSubscriberServiceClient() {
if (subscriberServiceClient != null) return subscriberServiceClient;
try {
SubscriberServiceSettings.Builder settingsBuilder =
SubscriberServiceSettings.newBuilder()
.setCredentialsProvider(new PslCredentialsProvider(credentialsKey()));
try {
SubscriberServiceClient serviceClient =
SubscriberServiceClient.create(
addDefaultSettings(
this.subscriptionPath().location().extractRegion(), settingsBuilder));
return SubscriberBuilder.newBuilder()
.setSubscriptionPath(this.subscriptionPath())
.setPartition(partition)
.setMessageConsumer(consumer)
.setStreamFactory(
responseStream -> {
ApiCallContext context =
getCallContext(
PubsubContext.of(Constants.FRAMEWORK),
RoutingMetadata.of(subscriptionPath(), partition));
return serviceClient.subscribeCallable().splitCall(responseStream, context);
})
.setInitialLocation(
SeekRequest.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(offset.value()))
.build())
.build();
} catch (IOException e) {
throw new IllegalStateException("Failed to create subscriber service.", e);
}
};
subscriberServiceClient =
SubscriberServiceClient.create(
addDefaultSettings(subscriptionPath().location().extractRegion(), settingsBuilder));
return subscriberServiceClient;
} catch (IOException e) {
throw new IllegalStateException("Unable to create SubscriberServiceClient.");
}
}

// TODO(b/jiangmichael): Make XXXClientSettings accept creds so we could simplify below methods.
Expand Down

0 comments on commit 9c73b2b

Please sign in to comment.