Skip to content

Commit

Permalink
fix: Change Kafka{Producer/Consumer} to fix the number of partitions …
Browse files Browse the repository at this point in the history
…instead of looking it up. (#322)

This prevents making too many queries to the number of partitions.
  • Loading branch information
dpcollins-google committed Oct 23, 2020
1 parent 21cf834 commit 3f27c86
Show file tree
Hide file tree
Showing 8 changed files with 30 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
Expand Down Expand Up @@ -67,12 +68,12 @@ public abstract static class Builder {
}

public Consumer<byte[], byte[]> instantiate() throws StatusException {
try {
CloudZone zone = subscriptionPath().location();
AdminClient adminClient =
AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build());
CloudZone zone = subscriptionPath().location();
try (AdminClient adminClient =
AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build())) {
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
TopicPath topic = TopicPath.parse(subscription.getTopic());
long partitionCount = PartitionLookupUtils.numPartitions(topic);
AssignerFactory assignerFactory =
receiver -> {
AssignerBuilder.Builder builder = AssignerBuilder.newBuilder();
Expand Down Expand Up @@ -111,7 +112,12 @@ public Consumer<byte[], byte[]> instantiate() throws StatusException {
CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build());

return new PubsubLiteConsumer(
subscriptionPath(), topic, consumerFactory, assignerFactory, adminClient, cursorClient);
subscriptionPath(),
topic,
partitionCount,
consumerFactory,
assignerFactory,
cursorClient);
} catch (Exception e) {
throw ExtractStatus.toCanonical(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package com.google.cloud.pubsublite.kafka;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
Expand Down Expand Up @@ -54,9 +52,7 @@ public Producer<byte[], byte[]> instantiate() throws StatusException {
.setTopic(topicPath());
RoutingPublisherBuilder.Builder routingBuilder =
RoutingPublisherBuilder.newBuilder().setTopic(topicPath()).setPublisherBuilder(builder);
CloudZone zone = topicPath().location();
AdminClient adminClient =
AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build());
return new PubsubLiteProducer(routingBuilder.build(), adminClient, topicPath());
return new PubsubLiteProducer(
routingBuilder.build(), PartitionLookupUtils.numPartitions(topicPath()), topicPath());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
Expand Down Expand Up @@ -68,29 +67,29 @@
* <p>This also filters methods that Pub/Sub Lite will not implement.
*/
class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
private static Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
private static GoogleLogger logger = GoogleLogger.forEnclosingClass();
private static final Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private final SubscriptionPath subscriptionPath;
private final TopicPath topicPath;
private final long partitionCount;
private final ConsumerFactory consumerFactory;
private final AssignerFactory assignerFactory;
private final AdminClient adminClient;
private final CursorClient cursorClient;
private Optional<Assigner> assigner = Optional.empty();
private Optional<SingleSubscriptionConsumer> consumer = Optional.empty();

PubsubLiteConsumer(
SubscriptionPath subscriptionPath,
TopicPath topicPath,
long partitionCount,
ConsumerFactory consumerFactory,
AssignerFactory assignerFactory,
AdminClient adminClient,
CursorClient cursorClient) {
this.subscriptionPath = subscriptionPath;
this.topicPath = topicPath;
this.partitionCount = partitionCount;
this.consumerFactory = consumerFactory;
this.assignerFactory = assignerFactory;
this.adminClient = adminClient;
this.cursorClient = cursorClient;
}

Expand Down Expand Up @@ -443,7 +442,7 @@ public List<PartitionInfo> partitionsFor(String s) {
@Override
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
checkTopic(topic);
return SharedBehavior.partitionsFor(adminClient, topicPath, timeout);
return SharedBehavior.partitionsFor(partitionCount, topicPath);
}

@Override
Expand Down Expand Up @@ -509,11 +508,6 @@ public void close(long l, TimeUnit timeUnit) {

@Override
public void close(Duration timeout) {
try {
adminClient.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing admin client during Consumer shutdown.");
}
try {
cursorClient.close();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.ExtractStatus;
Expand Down Expand Up @@ -52,21 +51,20 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;

class PubsubLiteProducer implements Producer<byte[], byte[]> {
private static Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
private static final UnsupportedVersionException NO_TRANSACTIONS_EXCEPTION =
new UnsupportedVersionException(
"Pub/Sub Lite is a non-transactional system and does not support producer transactions.");
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final Publisher<PublishMetadata> publisher;
private final AdminClient adminClient;
private final TopicPath topicPath;
private final long partitionCount;

PubsubLiteProducer(
Publisher<PublishMetadata> publisher, AdminClient adminClient, TopicPath topicPath) {
Publisher<PublishMetadata> publisher, long partitionCount, TopicPath topicPath) {
this.publisher = publisher;
this.adminClient = adminClient;
this.topicPath = topicPath;
this.partitionCount = partitionCount;
this.publisher.addListener(
new Listener() {
@Override
Expand Down Expand Up @@ -177,7 +175,7 @@ public void flush() {
@Override
public List<PartitionInfo> partitionsFor(String s) {
checkTopic(s);
return SharedBehavior.partitionsFor(adminClient, topicPath, INFINITE_DURATION);
return SharedBehavior.partitionsFor(partitionCount, topicPath);
}

@Override
Expand All @@ -192,11 +190,6 @@ public void close() {

@Override
public void close(Duration duration) {
try {
adminClient.close();
} catch (Exception e) {
logger.atWarning().withCause(e).log("Failed to close admin client.");
}
try {
publisher.stopAsync().awaitTerminated(duration.toMillis(), MILLISECONDS);
} catch (TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@

import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka;

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.PartitionInfo;

/** Shared behavior for producer and consumer. */
Expand All @@ -40,13 +37,10 @@ static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
PubsubLiteNode.NODES);
}

static List<PartitionInfo> partitionsFor(
AdminClient adminClient, TopicPath topic, Duration timeout) {
static List<PartitionInfo> partitionsFor(long partitionCount, TopicPath topic) {
try {
long count =
adminClient.getTopicPartitionCount(topic).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
ImmutableList.Builder<PartitionInfo> result = ImmutableList.builder();
for (int i = 0; i < count; ++i) {
for (int i = 0; i < partitionCount; ++i) {
result.add(toPartitionInfo(topic, Partition.of(i)));
}
return result.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
Expand Down Expand Up @@ -98,7 +97,6 @@ private static <T> T example(Class<T> klass) {

@Mock ConsumerFactory consumerFactory;
@Mock AssignerFactory assignerFactory;
@Mock AdminClient adminClient;
@Mock CursorClient cursorClient;

@Mock Assigner assigner;
Expand All @@ -113,9 +111,9 @@ public void setUp() {
new PubsubLiteConsumer(
example(SubscriptionPath.class),
example(TopicPath.class),
3,
consumerFactory,
assignerFactory,
adminClient,
cursorClient);
when(consumerFactory.newConsumer()).thenReturn(underlying);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.mockito.Mockito.when;

import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
Expand All @@ -49,7 +48,6 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

Expand All @@ -68,14 +66,12 @@ abstract static class FakePublisher extends FakeApiService

@Spy FakePublisher underlying;

@Mock AdminClient adminClient;

Producer<byte[], byte[]> producer;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
producer = new PubsubLiteProducer(underlying, adminClient, example(TopicPath.class));
producer = new PubsubLiteProducer(underlying, 3, example(TopicPath.class));
verify(underlying).startAsync();
verify(underlying).awaitRunning();
}
Expand Down Expand Up @@ -212,7 +208,6 @@ public void flush() throws Exception {
@Test
public void close() throws Exception {
producer.close();
verify(adminClient).close();
verify(underlying).stopAsync();
verify(underlying).awaitTerminated(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,19 @@

import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.TopicPath;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.common.PartitionInfo;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;

@RunWith(JUnit4.class)
public class SharedBehaviorTest {
@Mock AdminClient admin;

@Before
public void setUp() {
initMocks(this);
}

@Test
public void partitionsForSuccess() throws Exception {
ApiFuture<Long> future = ApiFutures.immediateFuture(2L);
when(admin.getTopicPartitionCount(example(TopicPath.class))).thenReturn(future);
List<PartitionInfo> result =
SharedBehavior.partitionsFor(admin, example(TopicPath.class), Duration.ofDays(1));
public void partitionsForSuccess() {
List<PartitionInfo> result = SharedBehavior.partitionsFor(2, example(TopicPath.class));
assertThat(result.size()).isEqualTo(2);
assertThat(result.get(0).topic()).isEqualTo(example(TopicPath.class).toString());
assertThat(result.get(0).partition()).isEqualTo(0);
Expand Down

0 comments on commit 3f27c86

Please sign in to comment.