diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java index 383b0995e..ec6123f32 100644 --- a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/ConsumerSettings.java @@ -20,6 +20,7 @@ import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.AdminClientSettings; import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.PartitionLookupUtils; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; @@ -67,12 +68,12 @@ public abstract static class Builder { } public Consumer instantiate() throws StatusException { - try { - CloudZone zone = subscriptionPath().location(); - AdminClient adminClient = - AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build()); + CloudZone zone = subscriptionPath().location(); + try (AdminClient adminClient = + AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build())) { Subscription subscription = adminClient.getSubscription(subscriptionPath()).get(); TopicPath topic = TopicPath.parse(subscription.getTopic()); + long partitionCount = PartitionLookupUtils.numPartitions(topic); AssignerFactory assignerFactory = receiver -> { AssignerBuilder.Builder builder = AssignerBuilder.newBuilder(); @@ -111,7 +112,12 @@ public Consumer instantiate() throws StatusException { CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build()); return new PubsubLiteConsumer( - subscriptionPath(), topic, consumerFactory, assignerFactory, adminClient, cursorClient); + subscriptionPath(), + topic, + partitionCount, + consumerFactory, + assignerFactory, + cursorClient); } catch (Exception e) { throw ExtractStatus.toCanonical(e); } diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java index e0e3774bb..49a55b39a 100644 --- a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/ProducerSettings.java @@ -17,9 +17,7 @@ package com.google.cloud.pubsublite.kafka; import com.google.auto.value.AutoValue; -import com.google.cloud.pubsublite.AdminClient; -import com.google.cloud.pubsublite.AdminClientSettings; -import com.google.cloud.pubsublite.CloudZone; +import com.google.cloud.pubsublite.PartitionLookupUtils; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.wire.PubsubContext; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; @@ -54,9 +52,7 @@ public Producer instantiate() throws StatusException { .setTopic(topicPath()); RoutingPublisherBuilder.Builder routingBuilder = RoutingPublisherBuilder.newBuilder().setTopic(topicPath()).setPublisherBuilder(builder); - CloudZone zone = topicPath().location(); - AdminClient adminClient = - AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build()); - return new PubsubLiteProducer(routingBuilder.build(), adminClient, topicPath()); + return new PubsubLiteProducer( + routingBuilder.build(), PartitionLookupUtils.numPartitions(topicPath()), topicPath()); } } diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java index 0a13fa49b..19fd03b93 100644 --- a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumer.java @@ -20,7 +20,6 @@ import com.google.api.core.ApiFutureCallback; import com.google.api.core.ApiFutures; -import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SubscriptionPath; @@ -68,13 +67,13 @@ *

This also filters methods that Pub/Sub Lite will not implement. */ class PubsubLiteConsumer implements Consumer { - private static Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE); - private static GoogleLogger logger = GoogleLogger.forEnclosingClass(); + private static final Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE); + private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); private final SubscriptionPath subscriptionPath; private final TopicPath topicPath; + private final long partitionCount; private final ConsumerFactory consumerFactory; private final AssignerFactory assignerFactory; - private final AdminClient adminClient; private final CursorClient cursorClient; private Optional assigner = Optional.empty(); private Optional consumer = Optional.empty(); @@ -82,15 +81,15 @@ class PubsubLiteConsumer implements Consumer { PubsubLiteConsumer( SubscriptionPath subscriptionPath, TopicPath topicPath, + long partitionCount, ConsumerFactory consumerFactory, AssignerFactory assignerFactory, - AdminClient adminClient, CursorClient cursorClient) { this.subscriptionPath = subscriptionPath; this.topicPath = topicPath; + this.partitionCount = partitionCount; this.consumerFactory = consumerFactory; this.assignerFactory = assignerFactory; - this.adminClient = adminClient; this.cursorClient = cursorClient; } @@ -443,7 +442,7 @@ public List partitionsFor(String s) { @Override public List partitionsFor(String topic, Duration timeout) { checkTopic(topic); - return SharedBehavior.partitionsFor(adminClient, topicPath, timeout); + return SharedBehavior.partitionsFor(partitionCount, topicPath); } @Override @@ -509,11 +508,6 @@ public void close(long l, TimeUnit timeUnit) { @Override public void close(Duration timeout) { - try { - adminClient.close(); - } catch (Exception e) { - logger.atSevere().withCause(e).log("Error closing admin client during Consumer shutdown."); - } try { cursorClient.close(); } catch (Exception e) { diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java index be0749421..dbb7090bd 100644 --- a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducer.java @@ -24,7 +24,6 @@ import com.google.api.core.ApiFutures; import com.google.api.core.ApiService.Listener; import com.google.api.core.ApiService.State; -import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.PublishMetadata; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.ExtractStatus; @@ -52,21 +51,20 @@ import org.apache.kafka.common.errors.UnsupportedVersionException; class PubsubLiteProducer implements Producer { - private static Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE); private static final UnsupportedVersionException NO_TRANSACTIONS_EXCEPTION = new UnsupportedVersionException( "Pub/Sub Lite is a non-transactional system and does not support producer transactions."); private static final GoogleLogger logger = GoogleLogger.forEnclosingClass(); private final Publisher publisher; - private final AdminClient adminClient; private final TopicPath topicPath; + private final long partitionCount; PubsubLiteProducer( - Publisher publisher, AdminClient adminClient, TopicPath topicPath) { + Publisher publisher, long partitionCount, TopicPath topicPath) { this.publisher = publisher; - this.adminClient = adminClient; this.topicPath = topicPath; + this.partitionCount = partitionCount; this.publisher.addListener( new Listener() { @Override @@ -177,7 +175,7 @@ public void flush() { @Override public List partitionsFor(String s) { checkTopic(s); - return SharedBehavior.partitionsFor(adminClient, topicPath, INFINITE_DURATION); + return SharedBehavior.partitionsFor(partitionCount, topicPath); } @Override @@ -192,11 +190,6 @@ public void close() { @Override public void close(Duration duration) { - try { - adminClient.close(); - } catch (Exception e) { - logger.atWarning().withCause(e).log("Failed to close admin client."); - } try { publisher.stopAsync().awaitTerminated(duration.toMillis(), MILLISECONDS); } catch (TimeoutException e) { diff --git a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java index 545bf0776..80d83fab0 100644 --- a/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java +++ b/pubsublite-kafka-shim/src/main/java/com/google/cloud/pubsublite/kafka/SharedBehavior.java @@ -18,13 +18,10 @@ import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka; -import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.TopicPath; import com.google.common.collect.ImmutableList; -import java.time.Duration; import java.util.List; -import java.util.concurrent.TimeUnit; import org.apache.kafka.common.PartitionInfo; /** Shared behavior for producer and consumer. */ @@ -40,13 +37,10 @@ static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) { PubsubLiteNode.NODES); } - static List partitionsFor( - AdminClient adminClient, TopicPath topic, Duration timeout) { + static List partitionsFor(long partitionCount, TopicPath topic) { try { - long count = - adminClient.getTopicPartitionCount(topic).get(timeout.toMillis(), TimeUnit.MILLISECONDS); ImmutableList.Builder result = ImmutableList.builder(); - for (int i = 0; i < count; ++i) { + for (int i = 0; i < partitionCount; ++i) { result.add(toPartitionInfo(topic, Partition.of(i))); } return result.build(); diff --git a/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java b/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java index 76ebbb3c6..f152a62ea 100644 --- a/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java +++ b/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteConsumerTest.java @@ -30,7 +30,6 @@ import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; -import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.CloudZone; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; @@ -98,7 +97,6 @@ private static T example(Class klass) { @Mock ConsumerFactory consumerFactory; @Mock AssignerFactory assignerFactory; - @Mock AdminClient adminClient; @Mock CursorClient cursorClient; @Mock Assigner assigner; @@ -113,9 +111,9 @@ public void setUp() { new PubsubLiteConsumer( example(SubscriptionPath.class), example(TopicPath.class), + 3, consumerFactory, assignerFactory, - adminClient, cursorClient); when(consumerFactory.newConsumer()).thenReturn(underlying); } diff --git a/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java b/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java index d28934749..7a05fceae 100644 --- a/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java +++ b/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java @@ -24,7 +24,6 @@ import static org.mockito.Mockito.when; import com.google.api.core.SettableApiFuture; -import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; @@ -49,7 +48,6 @@ import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mock; import org.mockito.MockitoAnnotations; import org.mockito.Spy; @@ -68,14 +66,12 @@ abstract static class FakePublisher extends FakeApiService @Spy FakePublisher underlying; - @Mock AdminClient adminClient; - Producer producer; @Before public void setUp() { MockitoAnnotations.initMocks(this); - producer = new PubsubLiteProducer(underlying, adminClient, example(TopicPath.class)); + producer = new PubsubLiteProducer(underlying, 3, example(TopicPath.class)); verify(underlying).startAsync(); verify(underlying).awaitRunning(); } @@ -212,7 +208,6 @@ public void flush() throws Exception { @Test public void close() throws Exception { producer.close(); - verify(adminClient).close(); verify(underlying).stopAsync(); verify(underlying).awaitTerminated(Long.MAX_VALUE, TimeUnit.MILLISECONDS); } diff --git a/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java b/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java index bab860eb0..abcb416ee 100644 --- a/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java +++ b/pubsublite-kafka-shim/src/test/java/com/google/cloud/pubsublite/kafka/SharedBehaviorTest.java @@ -18,37 +18,19 @@ import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example; import static com.google.common.truth.Truth.assertThat; -import static org.mockito.Mockito.when; -import static org.mockito.MockitoAnnotations.initMocks; -import com.google.api.core.ApiFuture; -import com.google.api.core.ApiFutures; -import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.TopicPath; -import java.time.Duration; import java.util.List; import org.apache.kafka.common.PartitionInfo; -import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -import org.mockito.Mock; @RunWith(JUnit4.class) public class SharedBehaviorTest { - @Mock AdminClient admin; - - @Before - public void setUp() { - initMocks(this); - } - @Test - public void partitionsForSuccess() throws Exception { - ApiFuture future = ApiFutures.immediateFuture(2L); - when(admin.getTopicPartitionCount(example(TopicPath.class))).thenReturn(future); - List result = - SharedBehavior.partitionsFor(admin, example(TopicPath.class), Duration.ofDays(1)); + public void partitionsForSuccess() { + List result = SharedBehavior.partitionsFor(2, example(TopicPath.class)); assertThat(result.size()).isEqualTo(2); assertThat(result.get(0).topic()).isEqualTo(example(TopicPath.class).toString()); assertThat(result.get(0).partition()).isEqualTo(0);