Skip to content

Commit

Permalink
feat: Use the partition watching publisher in the cps client (#409)
Browse files Browse the repository at this point in the history
* feat: Use the partition watching publisher in the cps client

* replace wildcard imports with single class imports
  • Loading branch information
palmere-google committed Dec 10, 2020
1 parent 63c34f0 commit b85ceb4
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.internal.WrappingPublisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatcher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisher;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.RoutingPublisherBuilder;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.pubsub.v1.PubsubMessage;
Expand Down Expand Up @@ -69,8 +71,7 @@ public abstract class PublisherSettings {
// For testing.
abstract SinglePartitionPublisherBuilder.Builder underlyingBuilder();

// For testing.
abstract Optional<Integer> numPartitions();
abstract Optional<PartitionCountWatcher.Factory> partitionCountWatcherFactory();

/** Get a new builder for a PublisherSettings. */
public static Builder newBuilder() {
Expand Down Expand Up @@ -103,8 +104,7 @@ public abstract Builder setMessageTransformer(
abstract Builder setUnderlyingBuilder(
SinglePartitionPublisherBuilder.Builder underlyingBuilder);

// For testing.
abstract Builder setNumPartitions(int numPartitions);
abstract Builder setPartitionCountWatcherFactory(PartitionCountWatcher.Factory factory);

public abstract PublisherSettings build();
}
Expand All @@ -117,8 +117,8 @@ Publisher instantiate() throws ApiException {
messageTransformer()
.orElseGet(() -> MessageTransforms.fromCpsPublishTransformer(keyExtractor));

RoutingPublisherBuilder.Builder wireBuilder =
RoutingPublisherBuilder.newBuilder()
PartitionCountWatchingPublisherSettings.Builder publisherSettings =
PartitionCountWatchingPublisherSettings.newBuilder()
.setTopic(topicPath())
.setPublisherFactory(
partition -> {
Expand All @@ -133,9 +133,8 @@ Publisher instantiate() throws ApiException {
supplier -> singlePartitionBuilder.setServiceClient(supplier.get()));
return singlePartitionBuilder.build();
});

numPartitions().ifPresent(wireBuilder::setNumPartitions);

return new WrappingPublisher(wireBuilder.build(), messageTransformer);
partitionCountWatcherFactory().ifPresent(publisherSettings::setConfigWatcherFactory);
return new WrappingPublisher(
new PartitionCountWatchingPublisher(publisherSettings.build()), messageTransformer);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.wire.PartitionCountWatcher;
import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import org.junit.Test;
Expand All @@ -49,7 +50,10 @@ TopicPath getPath() throws CheckedApiException {
abstract static class FakePublisher extends FakeApiService
implements Publisher<PublishMetadata> {}

abstract static class FakeConfigWatcher extends FakeApiService implements PartitionCountWatcher {}

@Spy private FakePublisher underlying;
@Spy private FakeConfigWatcher fakeWatcher;

@Test
public void testSettings() throws CheckedApiException {
Expand All @@ -61,7 +65,7 @@ public void testSettings() throws CheckedApiException {
.setTopicPath(getPath())
.setServiceClientSupplier(() -> mock(PublisherServiceClient.class))
.setUnderlyingBuilder(mockBuilder)
.setNumPartitions(77)
.setPartitionCountWatcherFactory((c) -> fakeWatcher)
.build()
.instantiate();
}
Expand Down

0 comments on commit b85ceb4

Please sign in to comment.