Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
Expand Down Expand Up @@ -67,12 +68,12 @@ public abstract static class Builder {
}

public Consumer<byte[], byte[]> instantiate() throws StatusException {
try {
CloudZone zone = subscriptionPath().location();
AdminClient adminClient =
AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build());
CloudZone zone = subscriptionPath().location();
try (AdminClient adminClient =
AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build())) {
Subscription subscription = adminClient.getSubscription(subscriptionPath()).get();
TopicPath topic = TopicPath.parse(subscription.getTopic());
long partitionCount = PartitionLookupUtils.numPartitions(topic);
AssignerFactory assignerFactory =
receiver -> {
AssignerBuilder.Builder builder = AssignerBuilder.newBuilder();
Expand Down Expand Up @@ -111,7 +112,12 @@ public Consumer<byte[], byte[]> instantiate() throws StatusException {
CursorClient.create(CursorClientSettings.newBuilder().setRegion(zone.region()).build());

return new PubsubLiteConsumer(
subscriptionPath(), topic, consumerFactory, assignerFactory, adminClient, cursorClient);
subscriptionPath(),
topic,
partitionCount,
consumerFactory,
assignerFactory,
cursorClient);
} catch (Exception e) {
throw ExtractStatus.toCanonical(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@
package com.google.cloud.pubsublite.kafka;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
Expand Down Expand Up @@ -54,9 +52,7 @@ public Producer<byte[], byte[]> instantiate() throws StatusException {
.setTopic(topicPath());
RoutingPublisherBuilder.Builder routingBuilder =
RoutingPublisherBuilder.newBuilder().setTopic(topicPath()).setPublisherBuilder(builder);
CloudZone zone = topicPath().location();
AdminClient adminClient =
AdminClient.create(AdminClientSettings.newBuilder().setRegion(zone.region()).build());
return new PubsubLiteProducer(routingBuilder.build(), adminClient, topicPath());
return new PubsubLiteProducer(
routingBuilder.build(), PartitionLookupUtils.numPartitions(topicPath()), topicPath());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
Expand Down Expand Up @@ -68,29 +67,29 @@
* <p>This also filters methods that Pub/Sub Lite will not implement.
*/
class PubsubLiteConsumer implements Consumer<byte[], byte[]> {
private static Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
private static GoogleLogger logger = GoogleLogger.forEnclosingClass();
private static final Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
private final SubscriptionPath subscriptionPath;
private final TopicPath topicPath;
private final long partitionCount;
private final ConsumerFactory consumerFactory;
private final AssignerFactory assignerFactory;
private final AdminClient adminClient;
private final CursorClient cursorClient;
private Optional<Assigner> assigner = Optional.empty();
private Optional<SingleSubscriptionConsumer> consumer = Optional.empty();

PubsubLiteConsumer(
SubscriptionPath subscriptionPath,
TopicPath topicPath,
long partitionCount,
ConsumerFactory consumerFactory,
AssignerFactory assignerFactory,
AdminClient adminClient,
CursorClient cursorClient) {
this.subscriptionPath = subscriptionPath;
this.topicPath = topicPath;
this.partitionCount = partitionCount;
this.consumerFactory = consumerFactory;
this.assignerFactory = assignerFactory;
this.adminClient = adminClient;
this.cursorClient = cursorClient;
}

Expand Down Expand Up @@ -443,7 +442,7 @@ public List<PartitionInfo> partitionsFor(String s) {
@Override
public List<PartitionInfo> partitionsFor(String topic, Duration timeout) {
checkTopic(topic);
return SharedBehavior.partitionsFor(adminClient, topicPath, timeout);
return SharedBehavior.partitionsFor(partitionCount, topicPath);
}

@Override
Expand Down Expand Up @@ -509,11 +508,6 @@ public void close(long l, TimeUnit timeUnit) {

@Override
public void close(Duration timeout) {
try {
adminClient.close();
} catch (Exception e) {
logger.atSevere().withCause(e).log("Error closing admin client during Consumer shutdown.");
}
try {
cursorClient.close();
} catch (Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService.Listener;
import com.google.api.core.ApiService.State;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.ExtractStatus;
Expand Down Expand Up @@ -52,21 +51,20 @@
import org.apache.kafka.common.errors.UnsupportedVersionException;

class PubsubLiteProducer implements Producer<byte[], byte[]> {
private static Duration INFINITE_DURATION = Duration.ofMillis(Long.MAX_VALUE);
private static final UnsupportedVersionException NO_TRANSACTIONS_EXCEPTION =
new UnsupportedVersionException(
"Pub/Sub Lite is a non-transactional system and does not support producer transactions.");
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

private final Publisher<PublishMetadata> publisher;
private final AdminClient adminClient;
private final TopicPath topicPath;
private final long partitionCount;

PubsubLiteProducer(
Publisher<PublishMetadata> publisher, AdminClient adminClient, TopicPath topicPath) {
Publisher<PublishMetadata> publisher, long partitionCount, TopicPath topicPath) {
this.publisher = publisher;
this.adminClient = adminClient;
this.topicPath = topicPath;
this.partitionCount = partitionCount;
this.publisher.addListener(
new Listener() {
@Override
Expand Down Expand Up @@ -177,7 +175,7 @@ public void flush() {
@Override
public List<PartitionInfo> partitionsFor(String s) {
checkTopic(s);
return SharedBehavior.partitionsFor(adminClient, topicPath, INFINITE_DURATION);
return SharedBehavior.partitionsFor(partitionCount, topicPath);
}

@Override
Expand All @@ -192,11 +190,6 @@ public void close() {

@Override
public void close(Duration duration) {
try {
adminClient.close();
} catch (Exception e) {
logger.atWarning().withCause(e).log("Failed to close admin client.");
}
try {
publisher.stopAsync().awaitTerminated(duration.toMillis(), MILLISECONDS);
} catch (TimeoutException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@

import static com.google.cloud.pubsublite.kafka.KafkaExceptionUtils.toKafka;

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.common.collect.ImmutableList;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.common.PartitionInfo;

/** Shared behavior for producer and consumer. */
Expand All @@ -40,13 +37,10 @@ static PartitionInfo toPartitionInfo(TopicPath topic, Partition partition) {
PubsubLiteNode.NODES);
}

static List<PartitionInfo> partitionsFor(
AdminClient adminClient, TopicPath topic, Duration timeout) {
static List<PartitionInfo> partitionsFor(long partitionCount, TopicPath topic) {
try {
long count =
adminClient.getTopicPartitionCount(topic).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
ImmutableList.Builder<PartitionInfo> result = ImmutableList.builder();
for (int i = 0; i < count; ++i) {
for (int i = 0; i < partitionCount; ++i) {
result.add(toPartitionInfo(topic, Partition.of(i)));
}
return result.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@

import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
Expand Down Expand Up @@ -98,7 +97,6 @@ private static <T> T example(Class<T> klass) {

@Mock ConsumerFactory consumerFactory;
@Mock AssignerFactory assignerFactory;
@Mock AdminClient adminClient;
@Mock CursorClient cursorClient;

@Mock Assigner assigner;
Expand All @@ -113,9 +111,9 @@ public void setUp() {
new PubsubLiteConsumer(
example(SubscriptionPath.class),
example(TopicPath.class),
3,
consumerFactory,
assignerFactory,
adminClient,
cursorClient);
when(consumerFactory.newConsumer()).thenReturn(underlying);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.mockito.Mockito.when;

import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
Expand All @@ -49,7 +48,6 @@
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.mockito.Spy;

Expand All @@ -68,14 +66,12 @@ abstract static class FakePublisher extends FakeApiService

@Spy FakePublisher underlying;

@Mock AdminClient adminClient;

Producer<byte[], byte[]> producer;

@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
producer = new PubsubLiteProducer(underlying, adminClient, example(TopicPath.class));
producer = new PubsubLiteProducer(underlying, 3, example(TopicPath.class));
verify(underlying).startAsync();
verify(underlying).awaitRunning();
}
Expand Down Expand Up @@ -212,7 +208,6 @@ public void flush() throws Exception {
@Test
public void close() throws Exception {
producer.close();
verify(adminClient).close();
verify(underlying).stopAsync();
verify(underlying).awaitTerminated(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,19 @@

import static com.google.cloud.pubsublite.internal.testing.UnitTestExamples.example;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.TopicPath;
import java.time.Duration;
import java.util.List;
import org.apache.kafka.common.PartitionInfo;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.Mock;

@RunWith(JUnit4.class)
public class SharedBehaviorTest {
@Mock AdminClient admin;

@Before
public void setUp() {
initMocks(this);
}

@Test
public void partitionsForSuccess() throws Exception {
ApiFuture<Long> future = ApiFutures.immediateFuture(2L);
when(admin.getTopicPartitionCount(example(TopicPath.class))).thenReturn(future);
List<PartitionInfo> result =
SharedBehavior.partitionsFor(admin, example(TopicPath.class), Duration.ofDays(1));
public void partitionsForSuccess() {
List<PartitionInfo> result = SharedBehavior.partitionsFor(2, example(TopicPath.class));
assertThat(result.size()).isEqualTo(2);
assertThat(result.get(0).topic()).isEqualTo(example(TopicPath.class).toString());
assertThat(result.get(0).partition()).isEqualTo(0);
Expand Down