From 038cda4ec928989cee8e005d6751dc6ceb84de85 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Sun, 31 Jan 2021 22:45:39 -0500 Subject: [PATCH 01/10] feat: Add CredentialsProvider to Publisher and Subscriber settings These objects require more than one client and are annoying to construct correctly. Add a commonly needed setting to the top level here. Fixes #472 --- .../cloudpubsub/PublisherSettings.java | 85 ++++++-- .../cloudpubsub/SubscriberSettings.java | 203 +++++++++++++----- .../internal/wire/AssignerBuilder.java | 95 -------- .../internal/wire/AssignerSettings.java | 68 ++++++ .../internal/wire/CommitterBuilder.java | 85 -------- .../internal/wire/CommitterSettings.java | 59 +++++ .../wire/PartitionCountWatchingPublisher.java | 18 +- ...rtitionCountWatchingPublisherSettings.java | 48 ++--- .../internal/wire/PublisherBuilder.java | 45 +--- .../wire/SinglePartitionPublisherBuilder.java | 21 +- .../internal/wire/SubscriberBuilder.java | 39 +--- .../cloudpubsub/PublisherSettingsTest.java | 1 - ...derTest.java => AssignerSettingsTest.java} | 4 +- ...erTest.java => CommitterSettingsTest.java} | 7 +- .../PartitionCountWatchingPublisherTest.java | 17 +- .../internal/wire/PublisherBuilderTest.java | 3 +- .../wire/SinglePartitionPublisherTest.java | 4 + .../pubsublite/beam/PublisherOptions.java | 31 ++- .../pubsublite/beam/SubscriberOptions.java | 66 ++++-- 19 files changed, 477 insertions(+), 422 deletions(-) delete mode 100755 google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java create mode 100755 google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerSettings.java delete mode 100755 google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterBuilder.java create mode 100755 google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java rename google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/{AssignerBuilderTest.java => AssignerSettingsTest.java} (95%) rename google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/{CommitterBuilderTest.java => CommitterSettingsTest.java} (92%) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java index 14c5ca4d8..fe13b2c34 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java @@ -16,21 +16,33 @@ package com.google.cloud.pubsublite.cloudpubsub; +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; + import com.google.api.gax.batching.BatchingSettings; +import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.rpc.ApiException; +import com.google.auth.oauth2.GoogleCredentials; import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.AdminClient; +import com.google.cloud.pubsublite.AdminClientSettings; import com.google.cloud.pubsublite.Constants; import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.MessageTransformer; +import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.cloudpubsub.internal.WrappingPublisher; -import com.google.cloud.pubsublite.internal.wire.PartitionCountWatcher; -import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisher; import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings; import com.google.cloud.pubsublite.internal.wire.PubsubContext; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; +import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; +import com.google.cloud.pubsublite.v1.AdminServiceClient; +import com.google.cloud.pubsublite.v1.AdminServiceSettings; import com.google.cloud.pubsublite.v1.PublisherServiceClient; +import com.google.cloud.pubsublite.v1.PublisherServiceSettings; +import com.google.common.annotations.VisibleForTesting; import com.google.pubsub.v1.PubsubMessage; import java.util.Optional; import java.util.function.Supplier; @@ -65,17 +77,22 @@ public abstract class PublisherSettings { /** Batching settings for this publisher to use. Apply per-partition. */ abstract Optional batchingSettings(); - /** A supplier for new PublisherServiceClients. Should return a new client each time. */ + /** A provider for credentials. */ + abstract CredentialsProvider credentialsProvider(); + + /** + * A supplier for new PublisherServiceClients. Should return a new client each time. If present, + * ignores CredentialsProvider. + */ abstract Optional> serviceClientSupplier(); // For testing. abstract SinglePartitionPublisherBuilder.Builder underlyingBuilder(); - abstract Optional partitionCountWatcherFactory(); - /** Get a new builder for a PublisherSettings. */ public static Builder newBuilder() { return new AutoValue_PublisherSettings.Builder() + .setCredentialsProvider(() -> GoogleCredentials.getApplicationDefault()) .setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder()); } @@ -97,18 +114,56 @@ public abstract Builder setMessageTransformer( /** Batching settings for this publisher to use. Apply per-partition. */ public abstract Builder setBatchingSettings(BatchingSettings batchingSettings); - /** A supplier for new PublisherServiceClients. Should return a new client each time. */ + /** A provider for credentials. */ + public abstract Builder setCredentialsProvider(CredentialsProvider credentialsProvider); + + /** + * A supplier for new PublisherServiceClients. Should return a new client each time. If present, + * ignores CredentialsProvider. + */ public abstract Builder setServiceClientSupplier(Supplier supplier); // For testing. + @VisibleForTesting abstract Builder setUnderlyingBuilder( SinglePartitionPublisherBuilder.Builder underlyingBuilder); - abstract Builder setPartitionCountWatcherFactory(PartitionCountWatcher.Factory factory); - public abstract PublisherSettings build(); } + private PublisherServiceClient newServiceClient(Partition partition) throws ApiException { + if (serviceClientSupplier().isPresent()) return serviceClientSupplier().get().get(); + PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder(); + settingsBuilder = settingsBuilder.setCredentialsProvider(credentialsProvider()); + settingsBuilder = + addDefaultMetadata( + PubsubContext.of(FRAMEWORK), + RoutingMetadata.of(topicPath(), partition), + settingsBuilder); + try { + return PublisherServiceClient.create( + addDefaultSettings(topicPath().location().region(), settingsBuilder)); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + + private AdminClient newAdminClient() throws ApiException { + try { + return AdminClient.create( + AdminClientSettings.newBuilder() + .setServiceClient( + AdminServiceClient.create( + AdminServiceSettings.newBuilder() + .setCredentialsProvider(credentialsProvider()) + .build())) + .setRegion(topicPath().location().region()) + .build()); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + @SuppressWarnings("CheckReturnValue") Publisher instantiate() throws ApiException { BatchingSettings batchingSettings = batchingSettings().orElse(DEFAULT_BATCHING_SETTINGS); @@ -125,16 +180,12 @@ Publisher instantiate() throws ApiException { SinglePartitionPublisherBuilder.Builder singlePartitionBuilder = underlyingBuilder() .setBatchingSettings(batchingSettings) - .setContext(PubsubContext.of(FRAMEWORK)) .setTopic(topicPath()) - .setPartition(partition); - serviceClientSupplier() - .ifPresent( - supplier -> singlePartitionBuilder.setServiceClient(supplier.get())); + .setPartition(partition) + .setServiceClient(newServiceClient(partition)); return singlePartitionBuilder.build(); - }); - partitionCountWatcherFactory().ifPresent(publisherSettings::setConfigWatcherFactory); - return new WrappingPublisher( - new PartitionCountWatchingPublisher(publisherSettings.build()), messageTransformer); + }) + .setAdminClient(newAdminClient()); + return new WrappingPublisher(publisherSettings.build().instantiate(), messageTransformer); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java index 0a0f05b1b..b284c54e8 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java @@ -17,9 +17,12 @@ package com.google.cloud.pubsublite.cloudpubsub; import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; -import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; +import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.rpc.ApiException; +import com.google.auth.oauth2.GoogleCredentials; import com.google.auto.value.AutoValue; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsublite.MessageTransformer; @@ -32,15 +35,21 @@ import com.google.cloud.pubsublite.cloudpubsub.internal.PartitionSubscriberFactory; import com.google.cloud.pubsublite.cloudpubsub.internal.SinglePartitionSubscriber; import com.google.cloud.pubsublite.internal.CheckedApiException; -import com.google.cloud.pubsublite.internal.wire.AssignerBuilder; import com.google.cloud.pubsublite.internal.wire.AssignerFactory; -import com.google.cloud.pubsublite.internal.wire.CommitterBuilder; +import com.google.cloud.pubsublite.internal.wire.AssignerSettings; +import com.google.cloud.pubsublite.internal.wire.Committer; +import com.google.cloud.pubsublite.internal.wire.CommitterSettings; import com.google.cloud.pubsublite.internal.wire.PubsubContext; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; +import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder; import com.google.cloud.pubsublite.v1.CursorServiceClient; +import com.google.cloud.pubsublite.v1.CursorServiceSettings; import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient; +import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceSettings; import com.google.cloud.pubsublite.v1.SubscriberServiceClient; +import com.google.cloud.pubsublite.v1.SubscriberServiceSettings; +import com.google.common.collect.ImmutableList; import com.google.pubsub.v1.PubsubMessage; import java.util.ArrayList; import java.util.List; @@ -56,29 +65,65 @@ public abstract class SubscriberSettings { private static final Framework FRAMEWORK = Framework.of("CLOUD_PUBSUB_SHIM"); // Required parameters. + /** + * The receiver which handles new messages sent by the Pub/Sub Lite system. Only one downcall from + * any connected partition will be outstanding at a time, and blocking in this receiver callback + * will block forward progress. + */ abstract MessageReceiver receiver(); + /** The subscription to use to receive messages. */ abstract SubscriptionPath subscriptionPath(); + /** + * The per-partition flow control settings. Because these apply per-partition, if you are using + * them to bound memory usage, keep in mind the number of partitions in the associated topic. + */ abstract FlowControlSettings perPartitionFlowControlSettings(); // Optional parameters. - // If set, disables auto-assignment. - abstract Optional> partitions(); + /** + * The partitions this subscriber should connect to to receive messages. If not empty, disables + * auto-assignment. + */ + abstract List partitions(); + /** The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. */ abstract Optional> transformer(); + /** A provider for credentials. */ + abstract CredentialsProvider credentialsProvider(); + + /** + * A supplier for new SubscriberServiceClients. Should return a new client each time. If present, + * ignores CredentialsProvider. + */ abstract Optional> subscriberServiceClientSupplier(); + /** + * A supplier for new CursorServiceClients. Should return a new client each time. If present, + * ignores CredentialsProvider. + */ abstract Optional> cursorServiceClientSupplier(); + /** + * A client to connect to the Pub/Sub lite assignment service. If present, ignores + * CredentialsProvider. + */ abstract Optional assignmentServiceClient(); + /** + * A handler for the action to take when {@link com.google.cloud.pubsub.v1.AckReplyConsumer#nack} + * is called. In Pub/Sub Lite, only a single subscriber for a given subscription is connected to + * any partition at a time, and there is no other client that may be able to handle messages. + */ abstract Optional nackHandler(); public static Builder newBuilder() { - return new AutoValue_SubscriberSettings.Builder(); + return new AutoValue_SubscriberSettings.Builder() + .setPartitions(ImmutableList.of()) + .setCredentialsProvider(GoogleCredentials::getApplicationDefault); } @AutoValue.Builder @@ -114,14 +159,26 @@ public abstract static class Builder { public abstract Builder setTransformer( MessageTransformer transformer); - /** A supplier for new SubscriberServiceClients. Should return a new client each time. */ + /** A provider for credentials. */ + public abstract Builder setCredentialsProvider(CredentialsProvider provider); + + /** + * A supplier for new SubscriberServiceClients. Should return a new client each time. If + * present, ignores CredentialsProvider. + */ public abstract Builder setSubscriberServiceClientSupplier( Supplier supplier); - /** A supplier for new CursorServiceClients. Should return a new client each time. */ + /** + * A supplier for new CursorServiceClients. Should return a new client each time. If present, + * ignores CredentialsProvider. + */ public abstract Builder setCursorServiceClientSupplier(Supplier supplier); - /** A client to connect to the Pub/Sub lite assignment service. */ + /** + * A client to connect to the Pub/Sub lite assignment service. If present, ignores + * CredentialsProvider. + */ public abstract Builder setAssignmentServiceClient(PartitionAssignmentServiceClient client); /** @@ -132,67 +189,101 @@ public abstract Builder setSubscriberServiceClientSupplier( */ public abstract Builder setNackHandler(NackHandler nackHandler); - abstract SubscriberSettings autoBuild(); + public abstract SubscriberSettings build(); + } - /** Build the SubscriberSettings instance. */ - public SubscriberSettings build() throws ApiException { - SubscriberSettings settings = autoBuild(); - checkArgument( - !settings.partitions().isPresent() || !settings.partitions().get().isEmpty(), - "Must provide at least one partition if setting partitions explicitly."); - return settings; + private SubscriberServiceClient newSubscriberServiceClient(Partition partition) + throws ApiException { + if (subscriberServiceClientSupplier().isPresent()) { + return subscriberServiceClientSupplier().get().get(); + } + try { + SubscriberServiceSettings.Builder settingsBuilder = + SubscriberServiceSettings.newBuilder().setCredentialsProvider(credentialsProvider()); + addDefaultMetadata( + PubsubContext.of(FRAMEWORK), + RoutingMetadata.of(subscriptionPath(), partition), + settingsBuilder); + return SubscriberServiceClient.create( + addDefaultSettings(subscriptionPath().location().region(), settingsBuilder)); + } catch (Throwable t) { + throw toCanonical(t).underlying; } } - PartitionSubscriberFactory makePartitionSubscriberFactory(SubscriptionPath canonicalPath) - throws ApiException { - return partition -> { - try { - SubscriberBuilder.Builder wireSubscriberBuilder = SubscriberBuilder.newBuilder(); - wireSubscriberBuilder.setSubscriptionPath(canonicalPath); - subscriberServiceClientSupplier() - .ifPresent(supplier -> wireSubscriberBuilder.setServiceClient(supplier.get())); - wireSubscriberBuilder.setContext(PubsubContext.of(FRAMEWORK)); - wireSubscriberBuilder.setPartition(partition); - - CommitterBuilder.Builder wireCommitterBuilder = CommitterBuilder.newBuilder(); - wireCommitterBuilder.setSubscriptionPath(canonicalPath); - cursorServiceClientSupplier() - .ifPresent(supplier -> wireCommitterBuilder.setServiceClient(supplier.get())); - wireCommitterBuilder.setPartition(partition); - - return new SinglePartitionSubscriber( - receiver(), - transformer().orElse(MessageTransforms.toCpsSubscribeTransformer()), - new AckSetTrackerImpl(wireCommitterBuilder.build()), - nackHandler().orElse(new NackHandler() {}), - messageConsumer -> wireSubscriberBuilder.setMessageConsumer(messageConsumer).build(), - perPartitionFlowControlSettings()); - } catch (Throwable t) { - throw toCanonical(t); - } - }; + private CursorServiceClient newCursorServiceClient() throws ApiException { + if (cursorServiceClientSupplier().isPresent()) { + return cursorServiceClientSupplier().get().get(); + } + try { + return CursorServiceClient.create( + addDefaultSettings( + subscriptionPath().location().region(), + CursorServiceSettings.newBuilder().setCredentialsProvider(credentialsProvider()))); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + + Subscriber newPartitionSubscriber(Partition partition) throws CheckedApiException { + try { + SubscriberBuilder.Builder wireSubscriberBuilder = + SubscriberBuilder.newBuilder() + .setPartition(partition) + .setSubscriptionPath(subscriptionPath()) + .setServiceClient(newSubscriberServiceClient(partition)); + + Committer wireCommitter = + CommitterSettings.newBuilder() + .setSubscriptionPath(subscriptionPath()) + .setPartition(partition) + .setServiceClient(newCursorServiceClient()) + .build() + .instantiate(); + + return new SinglePartitionSubscriber( + receiver(), + transformer().orElse(MessageTransforms.toCpsSubscribeTransformer()), + new AckSetTrackerImpl(wireCommitter), + nackHandler().orElse(new NackHandler() {}), + messageConsumer -> wireSubscriberBuilder.setMessageConsumer(messageConsumer).build(), + perPartitionFlowControlSettings()); + } catch (Throwable t) { + throw toCanonical(t); + } + } + + private PartitionAssignmentServiceClient getAssignmentServiceClient() throws ApiException { + if (assignmentServiceClient().isPresent()) { + return assignmentServiceClient().get(); + } + try { + return PartitionAssignmentServiceClient.create( + addDefaultSettings( + subscriptionPath().location().region(), + PartitionAssignmentServiceSettings.newBuilder() + .setCredentialsProvider(credentialsProvider()))); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } } @SuppressWarnings("CheckReturnValue") Subscriber instantiate() throws ApiException { - PartitionSubscriberFactory partitionSubscriberFactory = - makePartitionSubscriberFactory(subscriptionPath()); + PartitionSubscriberFactory partitionSubscriberFactory = this::newPartitionSubscriber; - if (!partitions().isPresent()) { - AssignerBuilder.Builder assignerBuilder = AssignerBuilder.newBuilder(); - assignerBuilder.setSubscriptionPath(subscriptionPath()); - assignmentServiceClient().ifPresent(assignerBuilder::setServiceClient); + if (!partitions().isEmpty()) { + AssignerSettings.Builder assignerSettings = + AssignerSettings.newBuilder() + .setSubscriptionPath(subscriptionPath()) + .setServiceClient(getAssignmentServiceClient()); AssignerFactory assignerFactory = - receiver -> { - assignerBuilder.setReceiver(receiver); - return assignerBuilder.build(); - }; + receiver -> assignerSettings.setReceiver(receiver).build().instantiate(); return new AssigningSubscriber(partitionSubscriberFactory, assignerFactory); } List perPartitionSubscribers = new ArrayList<>(); - for (Partition partition : partitions().get()) { + for (Partition partition : partitions()) { try { perPartitionSubscribers.add(partitionSubscriberFactory.newSubscriber(partition)); } catch (CheckedApiException e) { diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java deleted file mode 100755 index 0418bf0f9..000000000 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.internal.wire; - -import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; -import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; - -import com.google.api.gax.rpc.ApiException; -import com.google.auto.value.AutoValue; -import com.google.cloud.pubsublite.SubscriptionPath; -import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest; -import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient; -import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceSettings; -import com.google.common.flogger.GoogleLogger; -import com.google.protobuf.ByteString; -import java.nio.ByteBuffer; -import java.util.Optional; -import java.util.UUID; - -@AutoValue -public abstract class AssignerBuilder { - private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); - // Required parameters. - abstract SubscriptionPath subscriptionPath(); - - abstract PartitionAssignmentReceiver receiver(); - - // Optional parameters. - abstract Optional serviceClient(); - - public static Builder newBuilder() { - return new AutoValue_AssignerBuilder.Builder(); - } - - @AutoValue.Builder - public abstract static class Builder { - // Required parameters. - public abstract Builder setSubscriptionPath(SubscriptionPath path); - - public abstract Builder setReceiver(PartitionAssignmentReceiver receiver); - - // Optional parameters. - public abstract Builder setServiceClient(PartitionAssignmentServiceClient serviceClient); - - abstract AssignerBuilder autoBuild(); - - @SuppressWarnings("CheckReturnValue") - public Assigner build() throws ApiException { - AssignerBuilder builder = autoBuild(); - - PartitionAssignmentServiceClient serviceClient; - if (builder.serviceClient().isPresent()) { - serviceClient = builder.serviceClient().get(); - } else { - try { - serviceClient = - PartitionAssignmentServiceClient.create( - addDefaultSettings( - builder.subscriptionPath().location().region(), - PartitionAssignmentServiceSettings.newBuilder())); - } catch (Throwable t) { - throw toCanonical(t).underlying; - } - } - - UUID uuid = UUID.randomUUID(); - ByteBuffer uuidBuffer = ByteBuffer.allocate(16); - uuidBuffer.putLong(uuid.getMostSignificantBits()); - uuidBuffer.putLong(uuid.getLeastSignificantBits()); - logger.atInfo().log( - "Subscription %s using UUID %s for assignment.", builder.subscriptionPath(), uuid); - - InitialPartitionAssignmentRequest initial = - InitialPartitionAssignmentRequest.newBuilder() - .setSubscription(builder.subscriptionPath().toString()) - .setClientId(ByteString.copyFrom(uuidBuffer.array())) - .build(); - return new AssignerImpl(serviceClient, initial, builder.receiver()); - } - } -} diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerSettings.java new file mode 100755 index 000000000..bf6e715cb --- /dev/null +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerSettings.java @@ -0,0 +1,68 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.internal.wire; + +import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest; +import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient; +import com.google.common.flogger.GoogleLogger; +import com.google.protobuf.ByteString; +import java.nio.ByteBuffer; +import java.util.UUID; + +@AutoValue +public abstract class AssignerSettings { + private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); + // Required parameters. + abstract SubscriptionPath subscriptionPath(); + + abstract PartitionAssignmentReceiver receiver(); + + abstract PartitionAssignmentServiceClient serviceClient(); + + public static Builder newBuilder() { + return new AutoValue_AssignerSettings.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + // Required parameters. + public abstract Builder setSubscriptionPath(SubscriptionPath path); + + public abstract Builder setReceiver(PartitionAssignmentReceiver receiver); + + public abstract Builder setServiceClient(PartitionAssignmentServiceClient serviceClient); + + public abstract AssignerSettings build(); + } + + public Assigner instantiate() { + UUID uuid = UUID.randomUUID(); + ByteBuffer uuidBuffer = ByteBuffer.allocate(16); + uuidBuffer.putLong(uuid.getMostSignificantBits()); + uuidBuffer.putLong(uuid.getLeastSignificantBits()); + logger.atInfo().log("Subscription %s using UUID %s for assignment.", subscriptionPath(), uuid); + + InitialPartitionAssignmentRequest initial = + InitialPartitionAssignmentRequest.newBuilder() + .setSubscription(subscriptionPath().toString()) + .setClientId(ByteString.copyFrom(uuidBuffer.array())) + .build(); + return new AssignerImpl(serviceClient(), initial, receiver()); + } +} diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterBuilder.java deleted file mode 100755 index 549dc9129..000000000 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterBuilder.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2020 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.google.cloud.pubsublite.internal.wire; - -import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; -import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; - -import com.google.api.gax.rpc.ApiException; -import com.google.auto.value.AutoValue; -import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.SubscriptionPath; -import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest; -import com.google.cloud.pubsublite.v1.CursorServiceClient; -import com.google.cloud.pubsublite.v1.CursorServiceSettings; -import java.util.Optional; - -@AutoValue -public abstract class CommitterBuilder { - // Required parameters. - abstract SubscriptionPath subscriptionPath(); - - abstract Partition partition(); - - // Optional parameters. - abstract Optional serviceClient(); - - public static Builder newBuilder() { - return new AutoValue_CommitterBuilder.Builder(); - } - - @AutoValue.Builder - public abstract static class Builder { - // Required parameters. - public abstract Builder setSubscriptionPath(SubscriptionPath path); - - public abstract Builder setPartition(Partition partition); - - // Optional parameters. - public abstract Builder setServiceClient(CursorServiceClient client); - - abstract CommitterBuilder autoBuild(); - - @SuppressWarnings("CheckReturnValue") - public Committer build() throws ApiException { - CommitterBuilder builder = autoBuild(); - - CursorServiceClient serviceClient; - if (builder.serviceClient().isPresent()) { - serviceClient = builder.serviceClient().get(); - } else { - try { - serviceClient = - CursorServiceClient.create( - addDefaultSettings( - builder.subscriptionPath().location().region(), - CursorServiceSettings.newBuilder())); - } catch (Throwable t) { - throw toCanonical(t).underlying; - } - } - - InitialCommitCursorRequest initialCommitCursorRequest = - InitialCommitCursorRequest.newBuilder() - .setSubscription(builder.subscriptionPath().toString()) - .setPartition(builder.partition().value()) - .build(); - return new ApiExceptionCommitter( - new CommitterImpl(serviceClient, initialCommitCursorRequest)); - } - } -} diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java new file mode 100755 index 000000000..45c663163 --- /dev/null +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterSettings.java @@ -0,0 +1,59 @@ +/* + * Copyright 2020 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.pubsublite.internal.wire; + +import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.SubscriptionPath; +import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest; +import com.google.cloud.pubsublite.v1.CursorServiceClient; + +@AutoValue +public abstract class CommitterSettings { + // Required parameters. + abstract SubscriptionPath subscriptionPath(); + + abstract Partition partition(); + + abstract CursorServiceClient serviceClient(); + + public static Builder newBuilder() { + return new AutoValue_CommitterSettings.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + // Required parameters. + public abstract Builder setSubscriptionPath(SubscriptionPath path); + + public abstract Builder setPartition(Partition partition); + + public abstract Builder setServiceClient(CursorServiceClient client); + + public abstract CommitterSettings build(); + } + + public Committer instantiate() { + InitialCommitCursorRequest initialCommitCursorRequest = + InitialCommitCursorRequest.newBuilder() + .setSubscription(subscriptionPath().toString()) + .setPartition(partition().value()) + .build(); + return new ApiExceptionCommitter( + new CommitterImpl(serviceClient(), initialCommitCursorRequest)); + } +} diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java index 33708580e..c4710de6d 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisher.java @@ -25,7 +25,11 @@ import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.PublishMetadata; -import com.google.cloud.pubsublite.internal.*; +import com.google.cloud.pubsublite.internal.CheckedApiException; +import com.google.cloud.pubsublite.internal.CloseableMonitor; +import com.google.cloud.pubsublite.internal.ProxyService; +import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.RoutingPolicy; import com.google.common.collect.ImmutableMap; import com.google.common.flogger.GoogleLogger; import com.google.common.util.concurrent.MoreExecutors; @@ -94,11 +98,13 @@ public void stop() { @GuardedBy("monitor.monitor") private Optional partitionsWithRouting = Optional.empty(); - public PartitionCountWatchingPublisher(PartitionCountWatchingPublisherSettings settings) { - this.publisherFactory = settings.publisherFactory(); - this.policyFactory = settings.routingPolicyFactory(); - PartitionCountWatcher configWatcher = - settings.configWatcherFactory().newWatcher(this::handleConfig); + PartitionCountWatchingPublisher( + PartitionPublisherFactory publisherFactory, + RoutingPolicy.Factory policyFactory, + PartitionCountWatcher.Factory configWatcherFactory) { + this.publisherFactory = publisherFactory; + this.policyFactory = policyFactory; + PartitionCountWatcher configWatcher = configWatcherFactory.newWatcher(this::handleConfig); addServices(configWatcher); } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherSettings.java index f57faf04f..10f5d5808 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherSettings.java @@ -15,12 +15,12 @@ */ package com.google.cloud.pubsublite.internal.wire; +import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; import com.google.cloud.pubsublite.*; import com.google.cloud.pubsublite.internal.DefaultRoutingPolicy; -import com.google.cloud.pubsublite.internal.RoutingPolicy; +import com.google.cloud.pubsublite.internal.Publisher; import java.time.Duration; -import java.util.Optional; @AutoValue public abstract class PartitionCountWatchingPublisherSettings { @@ -29,11 +29,9 @@ public abstract class PartitionCountWatchingPublisherSettings { abstract PartitionPublisherFactory publisherFactory(); - // Optional parameters - abstract PartitionCountWatcher.Factory configWatcherFactory(); - - abstract RoutingPolicy.Factory routingPolicyFactory(); + abstract AdminClient adminClient(); + // Optional parameters abstract Duration configPollPeriod(); public static Builder newBuilder() { @@ -48,38 +46,18 @@ public abstract static class Builder { public abstract Builder setPublisherFactory(PartitionPublisherFactory factory); - // Optional parameters. - public abstract Builder setConfigWatcherFactory(PartitionCountWatcher.Factory factory); - - public abstract Builder setRoutingPolicyFactory(RoutingPolicy.Factory factory); + public abstract Builder setAdminClient(AdminClient client); + // Optional parameters. public abstract Builder setConfigPollPeriod(Duration period); - abstract Optional configWatcherFactory(); - - abstract Optional routingPolicyFactory(); - - abstract Duration configPollPeriod(); - - abstract TopicPath topic(); - - abstract PartitionCountWatchingPublisherSettings autoBuild(); + public abstract PartitionCountWatchingPublisherSettings build(); + } - public PartitionCountWatchingPublisherSettings build() { - if (!configWatcherFactory().isPresent()) { - setConfigWatcherFactory( - new PartitionCountWatcherImpl.Factory( - topic(), - AdminClient.create( - AdminClientSettings.newBuilder() - .setRegion(topic().location().region()) - .build()), - configPollPeriod())); - } - if (!routingPolicyFactory().isPresent()) { - setRoutingPolicyFactory(DefaultRoutingPolicy::new); - } - return autoBuild(); - } + public Publisher instantiate() throws ApiException { + return new PartitionCountWatchingPublisher( + publisherFactory(), + DefaultRoutingPolicy::new, + new PartitionCountWatcherImpl.Factory(topic(), adminClient(), configPollPeriod())); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilder.java index 18c8ad727..0b687c2e9 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilder.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilder.java @@ -16,10 +16,6 @@ package com.google.cloud.pubsublite.internal.wire; -import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; -import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata; -import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; - import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.batching.FlowController.LimitExceededBehavior; import com.google.api.gax.rpc.ApiException; @@ -31,10 +27,7 @@ import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.proto.InitialPublishRequest; import com.google.cloud.pubsublite.v1.PublisherServiceClient; -import com.google.cloud.pubsublite.v1.PublisherServiceSettings; import com.google.common.base.Preconditions; -import java.util.Optional; -import org.threeten.bp.Duration; /** * A builder for a PubSub Lite Publisher. Basic usage: @@ -55,13 +48,6 @@ */ @AutoValue public abstract class PublisherBuilder { - public static final BatchingSettings DEFAULT_BATCHING_SETTINGS = - BatchingSettings.newBuilder() - .setDelayThreshold(Duration.ofMillis(50)) - .setElementCountThreshold(1000L) - .setRequestByteThreshold(Constants.MAX_PUBLISH_BATCH_BYTES) - .setIsEnabled(true) - .build(); public static final BatchingSettings DISABLED_BATCHING_SETTINGS = BatchingSettings.newBuilder() .setElementCountThreshold(1L) @@ -74,16 +60,13 @@ public abstract class PublisherBuilder { abstract Partition partition(); - // Optional parameters. abstract BatchingSettings batching(); - abstract Optional serviceClient(); - - abstract PubsubContext context(); + abstract PublisherServiceClient serviceClient(); + // Optional parameters. public static Builder builder() { - Builder impl = new AutoValue_PublisherBuilder.Builder(); - return impl.setBatching(DEFAULT_BATCHING_SETTINGS).setContext(PubsubContext.of()); + return new AutoValue_PublisherBuilder.Builder(); } @AutoValue.Builder @@ -93,36 +76,16 @@ public abstract static class Builder { public abstract Builder setPartition(Partition partition); - // Optional parameters. public abstract Builder setBatching(BatchingSettings batching); public abstract Builder setServiceClient(PublisherServiceClient client); - public abstract Builder setContext(PubsubContext context); - abstract PublisherBuilder autoBuild(); public Publisher build() throws ApiException { PublisherBuilder autoBuilt = autoBuild(); - PublisherServiceClient serviceClient; - if (autoBuilt.serviceClient().isPresent()) { - serviceClient = autoBuilt.serviceClient().get(); - } else { - try { - PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder(); - addDefaultMetadata( - autoBuilt.context(), - RoutingMetadata.of(autoBuilt.topic(), autoBuilt.partition()), - settingsBuilder); - serviceClient = - PublisherServiceClient.create( - addDefaultSettings(autoBuilt.topic().location().region(), settingsBuilder)); - } catch (Throwable t) { - throw toCanonical(t).underlying; - } - } return new PublisherImpl( - serviceClient, + autoBuilt.serviceClient(), InitialPublishRequest.newBuilder() .setTopic(autoBuilt.topic().toString()) .setPartition(autoBuilt.partition().value()) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java index ff998b8f8..35a5706b3 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherBuilder.java @@ -24,7 +24,7 @@ import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.v1.PublisherServiceClient; -import java.util.Optional; +import com.google.common.annotations.VisibleForTesting; @AutoValue public abstract class SinglePartitionPublisherBuilder { @@ -33,20 +33,15 @@ public abstract class SinglePartitionPublisherBuilder { abstract Partition partition(); - // Optional parameters. - abstract Optional serviceClient(); + abstract PublisherServiceClient serviceClient(); - abstract Optional batchingSettings(); - - // Rarely set parameters. - abstract PubsubContext context(); + abstract BatchingSettings batchingSettings(); // For testing. abstract PublisherBuilder.Builder underlyingBuilder(); public static Builder newBuilder() { return new AutoValue_SinglePartitionPublisherBuilder.Builder() - .setContext(PubsubContext.of()) .setUnderlyingBuilder(PublisherBuilder.builder()); } @@ -58,15 +53,12 @@ public abstract static class Builder { public abstract Builder setPartition(Partition partition); - // Optional parameters. public abstract Builder setServiceClient(PublisherServiceClient serviceClient); public abstract Builder setBatchingSettings(BatchingSettings batchingSettings); - // Rarely set parameters. - public abstract Builder setContext(PubsubContext context); - // For testing. + @VisibleForTesting abstract Builder setUnderlyingBuilder(PublisherBuilder.Builder underlyingBuilder); abstract SinglePartitionPublisherBuilder autoBuild(); @@ -78,9 +70,8 @@ public Publisher build() throws ApiException { .underlyingBuilder() .setTopic(builder.topic()) .setPartition(builder.partition()) - .setContext(builder.context()); - builder.serviceClient().ifPresent(publisherBuilder::setServiceClient); - builder.batchingSettings().ifPresent(publisherBuilder::setBatching); + .setServiceClient(builder.serviceClient()) + .setBatching(builder.batchingSettings()); return new SinglePartitionPublisher(publisherBuilder.build(), builder.partition()); } } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberBuilder.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberBuilder.java index 10aa4f581..a28ba9e64 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberBuilder.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberBuilder.java @@ -16,10 +16,6 @@ package com.google.cloud.pubsublite.internal.wire; -import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; -import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata; -import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; - import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; import com.google.cloud.pubsublite.Partition; @@ -27,9 +23,7 @@ import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.proto.InitialSubscribeRequest; import com.google.cloud.pubsublite.v1.SubscriberServiceClient; -import com.google.cloud.pubsublite.v1.SubscriberServiceSettings; import com.google.common.collect.ImmutableList; -import java.util.Optional; import java.util.function.Consumer; @AutoValue @@ -41,13 +35,10 @@ public abstract class SubscriberBuilder { abstract Partition partition(); - // Optional parameters. - abstract Optional serviceClient(); - - abstract PubsubContext context(); + abstract SubscriberServiceClient serviceClient(); public static Builder newBuilder() { - return new AutoValue_SubscriberBuilder.Builder().setContext(PubsubContext.of()); + return new AutoValue_SubscriberBuilder.Builder(); } @AutoValue.Builder @@ -60,44 +51,22 @@ public abstract Builder setMessageConsumer( public abstract Builder setPartition(Partition partition); - // Optional parameters. public abstract Builder setServiceClient(SubscriberServiceClient serviceClient); - public abstract Builder setContext(PubsubContext context); - abstract SubscriberBuilder autoBuild(); @SuppressWarnings("CheckReturnValue") public Subscriber build() throws ApiException { SubscriberBuilder autoBuilt = autoBuild(); - SubscriberServiceClient serviceClient; - if (autoBuilt.serviceClient().isPresent()) { - serviceClient = autoBuilt.serviceClient().get(); - } else { - try { - SubscriberServiceSettings.Builder settingsBuilder = - SubscriberServiceSettings.newBuilder(); - addDefaultMetadata( - autoBuilt.context(), - RoutingMetadata.of(autoBuilt.subscriptionPath(), autoBuilt.partition()), - settingsBuilder); - serviceClient = - SubscriberServiceClient.create( - addDefaultSettings( - autoBuilt.subscriptionPath().location().region(), settingsBuilder)); - } catch (Throwable t) { - throw toCanonical(t).underlying; - } - } - InitialSubscribeRequest initialSubscribeRequest = InitialSubscribeRequest.newBuilder() .setSubscription(autoBuilt.subscriptionPath().toString()) .setPartition(autoBuilt.partition().value()) .build(); return new ApiExceptionSubscriber( - new SubscriberImpl(serviceClient, initialSubscribeRequest, autoBuilt.messageConsumer())); + new SubscriberImpl( + autoBuilt.serviceClient(), initialSubscribeRequest, autoBuilt.messageConsumer())); } } } diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java index 6474a11e9..69aacbda3 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java @@ -65,7 +65,6 @@ public void testSettings() throws CheckedApiException { .setTopicPath(getPath()) .setServiceClientSupplier(() -> mock(PublisherServiceClient.class)) .setUnderlyingBuilder(mockBuilder) - .setPartitionCountWatcherFactory((c) -> fakeWatcher) .build() .instantiate(); } diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilderTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/AssignerSettingsTest.java similarity index 95% rename from google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilderTest.java rename to google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/AssignerSettingsTest.java index b2af497fe..531607406 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilderTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/AssignerSettingsTest.java @@ -29,10 +29,10 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class AssignerBuilderTest { +public class AssignerSettingsTest { @Test public void testBuilder() { - AssignerBuilder.newBuilder() + AssignerSettings.newBuilder() .setSubscriptionPath( SubscriptionPath.newBuilder() .setLocation(CloudZone.of(CloudRegion.of("us-central1"), 'a')) diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/CommitterBuilderTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/CommitterSettingsTest.java similarity index 92% rename from google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/CommitterBuilderTest.java rename to google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/CommitterSettingsTest.java index 156948463..4cee6256b 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/CommitterBuilderTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/CommitterSettingsTest.java @@ -30,11 +30,11 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) -public class CommitterBuilderTest { +public class CommitterSettingsTest { @Test public void testBuilder() { Committer unusedCommitter = - CommitterBuilder.newBuilder() + CommitterSettings.newBuilder() .setSubscriptionPath( SubscriptionPath.newBuilder() .setLocation(CloudZone.of(CloudRegion.of("us-central1"), 'a')) @@ -43,6 +43,7 @@ public void testBuilder() { .build()) .setPartition(Partition.of(987)) .setServiceClient(mock(CursorServiceClient.class)) - .build(); + .build() + .instantiate(); } } diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherTest.java index d43572df1..3ecd5ac28 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PartitionCountWatchingPublisherTest.java @@ -90,17 +90,12 @@ public void setUp() { .startAsync(); publisher = new PartitionCountWatchingPublisher( - PartitionCountWatchingPublisherSettings.newBuilder() - .setConfigWatcherFactory( - c -> { - leakedConsumer = c; - return fakeConfigWatcher; - }) - .setTopic(path()) - .setConfigPollPeriod(PERIOD) - .setPublisherFactory(mockPublisherFactory) - .setRoutingPolicyFactory(mockRoutingPolicyFactory) - .build()); + mockPublisherFactory, + mockRoutingPolicyFactory, + c -> { + leakedConsumer = c; + return fakeConfigWatcher; + }); publisher.startAsync(); publisher.awaitRunning(); diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilderTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilderTest.java index 444a96f30..f658aa556 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilderTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilderTest.java @@ -25,6 +25,7 @@ import com.google.cloud.pubsublite.ProjectNumber; import com.google.cloud.pubsublite.TopicName; import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.v1.PublisherServiceClient; import org.junit.Test; @@ -37,7 +38,7 @@ public class PublisherBuilderTest { public void testBuilder() { Publisher unusedPublisher = PublisherBuilder.builder() - .setBatching(PublisherBuilder.DEFAULT_BATCHING_SETTINGS) + .setBatching(PublisherSettings.DEFAULT_BATCHING_SETTINGS) .setTopic( TopicPath.newBuilder() .setLocation(CloudZone.of(CloudRegion.of("us-central1"), 'a')) diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java index 3ff6c72b8..7229e55ff 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SinglePartitionPublisherTest.java @@ -26,6 +26,7 @@ import com.google.api.core.ApiFuture; import com.google.api.core.SettableApiFuture; +import com.google.api.gax.batching.BatchingSettings; import com.google.cloud.pubsublite.CloudRegion; import com.google.cloud.pubsublite.CloudZone; import com.google.cloud.pubsublite.Message; @@ -37,6 +38,7 @@ import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.testing.FakeApiService; +import com.google.cloud.pubsublite.v1.PublisherServiceClient; import com.google.protobuf.ByteString; import org.junit.Before; import org.junit.Test; @@ -73,6 +75,8 @@ public void setUp() { .setTopic(topic) .setPartition(partition) .setUnderlyingBuilder(mockBuilder) + .setBatchingSettings(BatchingSettings.newBuilder().setIsEnabled(false).build()) + .setServiceClient(mock(PublisherServiceClient.class)) .build(); this.pub.startAsync().awaitRunning(); } diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherOptions.java index 635dc956c..50c582c33 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherOptions.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/PublisherOptions.java @@ -16,15 +16,24 @@ package com.google.cloud.pubsublite.beam; +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; + +import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.wire.PubsubContext; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; +import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.RoutingPublisherBuilder; import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; import com.google.cloud.pubsublite.v1.PublisherServiceClient; +import com.google.cloud.pubsublite.v1.PublisherServiceSettings; import java.io.Serializable; import javax.annotation.Nullable; @@ -53,10 +62,26 @@ public boolean usesCache() { return clientSupplier() == null; } + private PublisherServiceClient newServiceClient(Partition partition) throws ApiException { + if (clientSupplier() != null) return clientSupplier().get(); + PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder(); + settingsBuilder = + addDefaultMetadata( + PubsubContext.of(FRAMEWORK), + RoutingMetadata.of(topicPath(), partition), + settingsBuilder); + try { + return PublisherServiceClient.create( + addDefaultSettings(topicPath().location().region(), settingsBuilder)); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + @SuppressWarnings("CheckReturnValue") Publisher getPublisher() { SinglePartitionPublisherBuilder.Builder singlePartitionPublisherBuilder = - SinglePartitionPublisherBuilder.newBuilder().setContext(PubsubContext.of(FRAMEWORK)); + SinglePartitionPublisherBuilder.newBuilder(); if (clientSupplier() != null) { singlePartitionPublisherBuilder.setServiceClient(clientSupplier().get()); } @@ -64,9 +89,11 @@ Publisher getPublisher() { .setTopic(topicPath()) .setPublisherFactory( partition -> - singlePartitionPublisherBuilder + SinglePartitionPublisherBuilder.newBuilder() .setTopic(topicPath()) .setPartition(partition) + .setServiceClient(newServiceClient(partition)) + .setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS) .build()) .build(); } diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java index 09830a594..a689041f1 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java @@ -16,6 +16,10 @@ package com.google.cloud.pubsublite.beam; +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; + import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; import com.google.cloud.pubsublite.Partition; @@ -23,13 +27,16 @@ import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; import com.google.cloud.pubsublite.internal.wire.Committer; -import com.google.cloud.pubsublite.internal.wire.CommitterBuilder; +import com.google.cloud.pubsublite.internal.wire.CommitterSettings; import com.google.cloud.pubsublite.internal.wire.PubsubContext; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; +import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder; import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; import com.google.cloud.pubsublite.v1.CursorServiceClient; +import com.google.cloud.pubsublite.v1.CursorServiceSettings; import com.google.cloud.pubsublite.v1.SubscriberServiceClient; +import com.google.cloud.pubsublite.v1.SubscriberServiceSettings; import com.google.common.base.Optional; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -79,6 +86,24 @@ public static Builder newBuilder() { public abstract Builder toBuilder(); + private SubscriberServiceClient newSubscriberServiceClient(Partition partition) + throws ApiException { + if (subscriberClientSupplier().isPresent()) { + return subscriberClientSupplier().get().get(); + } + try { + SubscriberServiceSettings.Builder settingsBuilder = + addDefaultMetadata( + PubsubContext.of(FRAMEWORK), + RoutingMetadata.of(subscriptionPath(), partition), + SubscriberServiceSettings.newBuilder()); + return SubscriberServiceClient.create( + addDefaultSettings(subscriptionPath().location().region(), settingsBuilder)); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + @SuppressWarnings("CheckReturnValue") public ImmutableMap getSubscriberFactories() { ImmutableMap.Builder factories = ImmutableMap.builder(); @@ -87,21 +112,30 @@ public ImmutableMap getSubscriberFactories() { partition, subscriberFactory() .or( - consumer -> { - SubscriberBuilder.Builder builder = SubscriberBuilder.newBuilder(); - builder.setMessageConsumer(consumer); - builder.setSubscriptionPath(subscriptionPath()); - builder.setPartition(partition); - builder.setContext(PubsubContext.of(FRAMEWORK)); - if (subscriberClientSupplier().isPresent()) { - builder.setServiceClient(subscriberClientSupplier().get().get()); - } - return builder.build(); - })); + consumer -> + SubscriberBuilder.newBuilder() + .setMessageConsumer(consumer) + .setSubscriptionPath(subscriptionPath()) + .setPartition(partition) + .setServiceClient(newSubscriberServiceClient(partition)) + .build())); } return factories.build(); } + private CursorServiceClient newCursorServiceClient() throws ApiException { + if (committerClientSupplier().isPresent()) { + return committerClientSupplier().get().get(); + } + try { + return CursorServiceClient.create( + addDefaultSettings( + subscriptionPath().location().region(), CursorServiceSettings.newBuilder())); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + @SuppressWarnings("CheckReturnValue") public ImmutableMap getCommitters() throws ApiException { ImmutableMap.Builder committers = ImmutableMap.builder(); @@ -109,13 +143,11 @@ public ImmutableMap getCommitters() throws ApiException { if (committerSupplier().isPresent()) { committers.put(partition, committerSupplier().get().get()); } else { - CommitterBuilder.Builder builder = CommitterBuilder.newBuilder(); + CommitterSettings.Builder builder = CommitterSettings.newBuilder(); builder.setSubscriptionPath(subscriptionPath()); builder.setPartition(partition); - if (committerClientSupplier().isPresent()) { - builder.setServiceClient(committerClientSupplier().get().get()); - } - committers.put(partition, builder.build()); + builder.setServiceClient(newCursorServiceClient()); + committers.put(partition, builder.build().instantiate()); } } return committers.build(); From 6dd21e0d3f86ac8711078a59d2e632eba7b9137e Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Sun, 31 Jan 2021 22:57:05 -0500 Subject: [PATCH 02/10] feat: Add CredentialsProvider to Publisher and Subscriber settings These objects require more than one client and are annoying to construct correctly. Add a commonly needed setting to the top level here. Fixes #472 --- .../pubsublite/cloudpubsub/PublisherSettings.java | 11 +++++++++-- .../pubsublite/cloudpubsub/PublisherSettingsTest.java | 2 ++ .../cloudpubsub/SubscriberSettingsTest.java | 7 +++---- 3 files changed, 14 insertions(+), 6 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java index fe13b2c34..009f156c4 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java @@ -86,6 +86,9 @@ public abstract class PublisherSettings { */ abstract Optional> serviceClientSupplier(); + /** The AdminClient to use, if provided. */ + abstract Optional adminClient(); + // For testing. abstract SinglePartitionPublisherBuilder.Builder underlyingBuilder(); @@ -123,6 +126,9 @@ public abstract Builder setMessageTransformer( */ public abstract Builder setServiceClientSupplier(Supplier supplier); + /** The AdminClient to use, if provided. */ + public abstract Builder setAdminClient(AdminClient adminClient); + // For testing. @VisibleForTesting abstract Builder setUnderlyingBuilder( @@ -148,7 +154,8 @@ private PublisherServiceClient newServiceClient(Partition partition) throws ApiE } } - private AdminClient newAdminClient() throws ApiException { + private AdminClient getAdminClient() throws ApiException { + if (adminClient().isPresent()) return adminClient().get(); try { return AdminClient.create( AdminClientSettings.newBuilder() @@ -185,7 +192,7 @@ Publisher instantiate() throws ApiException { .setServiceClient(newServiceClient(partition)); return singlePartitionBuilder.build(); }) - .setAdminClient(newAdminClient()); + .setAdminClient(getAdminClient()); return new WrappingPublisher(publisherSettings.build().instantiate(), messageTransformer); } } diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java index 69aacbda3..acdc76d36 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettingsTest.java @@ -21,6 +21,7 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; +import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.CloudZone; import com.google.cloud.pubsublite.ProjectNumber; import com.google.cloud.pubsublite.PublishMetadata; @@ -64,6 +65,7 @@ public void testSettings() throws CheckedApiException { PublisherSettings.newBuilder() .setTopicPath(getPath()) .setServiceClientSupplier(() -> mock(PublisherServiceClient.class)) + .setAdminClient(mock(AdminClient.class)) .setUnderlyingBuilder(mockBuilder) .build() .instantiate(); diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettingsTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettingsTest.java index 495d3c0a0..5aacac7e8 100644 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettingsTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettingsTest.java @@ -24,7 +24,6 @@ import com.google.cloud.pubsublite.ProjectNumber; import com.google.cloud.pubsublite.SubscriptionName; import com.google.cloud.pubsublite.SubscriptionPath; -import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.v1.CursorServiceClient; import com.google.cloud.pubsublite.v1.PartitionAssignmentServiceClient; import com.google.cloud.pubsublite.v1.SubscriberServiceClient; @@ -35,7 +34,7 @@ @RunWith(JUnit4.class) public class SubscriberSettingsTest { - SubscriptionPath getPath() throws CheckedApiException { + SubscriptionPath getPath() { return SubscriptionPath.newBuilder() .setProject(ProjectNumber.of(56)) .setLocation(CloudZone.parse("us-central1-a")) @@ -44,7 +43,7 @@ SubscriptionPath getPath() throws CheckedApiException { } @Test - public void testSettingsWithPartitons() throws CheckedApiException { + public void testSettingsWithPartitons() { SubscriberSettings.newBuilder() .setReceiver(mock(MessageReceiver.class)) .setSubscriptionPath(getPath()) @@ -58,7 +57,7 @@ public void testSettingsWithPartitons() throws CheckedApiException { } @Test - public void testSettingsWithoutPartitons() throws CheckedApiException { + public void testSettingsWithoutPartitons() { SubscriberSettings.newBuilder() .setReceiver(mock(MessageReceiver.class)) .setSubscriptionPath(getPath()) From b6fda04b1860c96b08f91b636f2e0ec90a81fc3e Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Sun, 31 Jan 2021 23:00:28 -0500 Subject: [PATCH 03/10] feat: Add CredentialsProvider to Publisher and Subscriber settings These objects require more than one client and are annoying to construct correctly. Add a commonly needed setting to the top level here. Fixes #472 --- .../google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java index b284c54e8..d0211c134 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java @@ -272,7 +272,7 @@ private PartitionAssignmentServiceClient getAssignmentServiceClient() throws Api Subscriber instantiate() throws ApiException { PartitionSubscriberFactory partitionSubscriberFactory = this::newPartitionSubscriber; - if (!partitions().isEmpty()) { + if (partitions().isEmpty()) { AssignerSettings.Builder assignerSettings = AssignerSettings.newBuilder() .setSubscriptionPath(subscriptionPath()) From 056cb28035d0623cac474e878f84b8598972b6e5 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Sun, 31 Jan 2021 23:14:57 -0500 Subject: [PATCH 04/10] fix: clirr --- .../clirr-ignored-differences.xml | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/google-cloud-pubsublite/clirr-ignored-differences.xml b/google-cloud-pubsublite/clirr-ignored-differences.xml index a721437a8..f652e80d9 100644 --- a/google-cloud-pubsublite/clirr-ignored-differences.xml +++ b/google-cloud-pubsublite/clirr-ignored-differences.xml @@ -1,15 +1,11 @@ - + - 8001 - com/google/cloud/pubsublite/ProjectLookupUtils - - - 6011 - com/google/cloud/pubsublite/Constants - MAX_PUBLISH_MESSAGE_BYTES + 7013 + **/*$Builder + * @@ -30,6 +26,7 @@ 6001 com/google/cloud/pubsublite/internal/** + * * @@ -113,6 +110,7 @@ 6001 com/google/cloud/pubsublite/cloudpubsub/internal/** + * * @@ -173,6 +171,7 @@ 6001 com/google/cloud/pubsublite/v1/** + * * From cbd26737ad0222dd666fa39901900df7263cb2b0 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Sun, 31 Jan 2021 23:19:25 -0500 Subject: [PATCH 05/10] fix: flaky test --- .../cloud/pubsublite/internal/wire/SubscriberImplTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java index 5abd11750..df077b3b8 100755 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java @@ -183,9 +183,11 @@ public void batchesFlowControlRequests() throws Exception { @Test public void messagesEmpty_IsError() throws Exception { + Future failed = whenFailed(permanentErrorHandler); subscriber.allowFlow(bigFlowControlRequest()); leakedResponseObserver.onResponse(Response.ofMessages(ImmutableList.of())); assertThrows(IllegalStateException.class, subscriber::awaitTerminated); + failed.get(); verify(permanentErrorHandler) .failed(any(), argThat(new ApiExceptionMatcher(Code.INVALID_ARGUMENT))); } From 45ae1fba959faaaed305a20fc3f764cfd3567f3d Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 1 Feb 2021 08:56:08 -0500 Subject: [PATCH 06/10] fix: deps --- google-cloud-pubsublite/pom.xml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/google-cloud-pubsublite/pom.xml b/google-cloud-pubsublite/pom.xml index 04153d819..205984d98 100644 --- a/google-cloud-pubsublite/pom.xml +++ b/google-cloud-pubsublite/pom.xml @@ -93,6 +93,14 @@ io.grpc grpc-protobuf + + com.google.auth + google-auth-library-oauth2-http + + + com.google.auth + google-auth-library-credentials + From 0f7be6cf4aa75f04e091fbd4d660e93bb3816f37 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 1 Feb 2021 09:12:52 -0500 Subject: [PATCH 07/10] fix: scopes --- .../google/cloud/pubsublite/cloudpubsub/PublisherSettings.java | 2 +- .../google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java index 009f156c4..8996e41fa 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java @@ -95,7 +95,7 @@ public abstract class PublisherSettings { /** Get a new builder for a PublisherSettings. */ public static Builder newBuilder() { return new AutoValue_PublisherSettings.Builder() - .setCredentialsProvider(() -> GoogleCredentials.getApplicationDefault()) + .setCredentialsProvider(PublisherServiceSettings.defaultCredentialsProviderBuilder().build()) .setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder()); } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java index d0211c134..df99bdde5 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java @@ -123,7 +123,7 @@ public abstract class SubscriberSettings { public static Builder newBuilder() { return new AutoValue_SubscriberSettings.Builder() .setPartitions(ImmutableList.of()) - .setCredentialsProvider(GoogleCredentials::getApplicationDefault); + .setCredentialsProvider(SubscriberServiceSettings.defaultCredentialsProviderBuilder().build()); } @AutoValue.Builder From 8144b32c68c1f18d43a3734f19a8eaabc5a1b3ca Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 1 Feb 2021 09:14:32 -0500 Subject: [PATCH 08/10] fix: format --- .../cloud/pubsublite/cloudpubsub/PublisherSettings.java | 4 ++-- .../cloud/pubsublite/cloudpubsub/SubscriberSettings.java | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java index 8996e41fa..7031b322a 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java @@ -23,7 +23,6 @@ import com.google.api.gax.batching.BatchingSettings; import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.rpc.ApiException; -import com.google.auth.oauth2.GoogleCredentials; import com.google.auto.value.AutoValue; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.AdminClientSettings; @@ -95,7 +94,8 @@ public abstract class PublisherSettings { /** Get a new builder for a PublisherSettings. */ public static Builder newBuilder() { return new AutoValue_PublisherSettings.Builder() - .setCredentialsProvider(PublisherServiceSettings.defaultCredentialsProviderBuilder().build()) + .setCredentialsProvider( + PublisherServiceSettings.defaultCredentialsProviderBuilder().build()) .setUnderlyingBuilder(SinglePartitionPublisherBuilder.newBuilder()); } diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java index df99bdde5..7a22a3d20 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/SubscriberSettings.java @@ -22,7 +22,6 @@ import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.rpc.ApiException; -import com.google.auth.oauth2.GoogleCredentials; import com.google.auto.value.AutoValue; import com.google.cloud.pubsub.v1.MessageReceiver; import com.google.cloud.pubsublite.MessageTransformer; @@ -123,7 +122,8 @@ public abstract class SubscriberSettings { public static Builder newBuilder() { return new AutoValue_SubscriberSettings.Builder() .setPartitions(ImmutableList.of()) - .setCredentialsProvider(SubscriberServiceSettings.defaultCredentialsProviderBuilder().build()); + .setCredentialsProvider( + SubscriberServiceSettings.defaultCredentialsProviderBuilder().build()); } @AutoValue.Builder From 997a3526ad51af9d9fdb56761791fea0fd69f097 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 1 Feb 2021 09:19:03 -0500 Subject: [PATCH 09/10] fix: dependencies again --- google-cloud-pubsublite/pom.xml | 8 -------- 1 file changed, 8 deletions(-) diff --git a/google-cloud-pubsublite/pom.xml b/google-cloud-pubsublite/pom.xml index 205984d98..04153d819 100644 --- a/google-cloud-pubsublite/pom.xml +++ b/google-cloud-pubsublite/pom.xml @@ -93,14 +93,6 @@ io.grpc grpc-protobuf - - com.google.auth - google-auth-library-oauth2-http - - - com.google.auth - google-auth-library-credentials - From c1b304c080e4a68282582171bbf4ac3912b50637 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Mon, 1 Feb 2021 09:33:39 -0500 Subject: [PATCH 10/10] fix: admin endpoint --- .../cloud/pubsublite/cloudpubsub/PublisherSettings.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java index 7031b322a..5e1bb291f 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/cloudpubsub/PublisherSettings.java @@ -161,9 +161,10 @@ private AdminClient getAdminClient() throws ApiException { AdminClientSettings.newBuilder() .setServiceClient( AdminServiceClient.create( - AdminServiceSettings.newBuilder() - .setCredentialsProvider(credentialsProvider()) - .build())) + addDefaultSettings( + topicPath().location().region(), + AdminServiceSettings.newBuilder() + .setCredentialsProvider(credentialsProvider())))) .setRegion(topicPath().location().region()) .build()); } catch (Throwable t) {