From 278205e5bed4bab993630c5c73d441894d28ddee Mon Sep 17 00:00:00 2001 From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com> Date: Wed, 22 Feb 2023 20:03:55 -0500 Subject: [PATCH] fix: Upgrade kafka shim pubsublite version (#405) --- pom.xml | 2 +- .../cloud/pubsublite/kafka/LiteHeaders.java | 30 +++++++------ .../pubsublite/kafka/RecordTransforms.java | 43 +++++++++++-------- .../kafka/SinglePartitionSubscriber.java | 4 +- .../kafka/SingleSubscriptionConsumerImpl.java | 2 +- .../kafka/PubsubLiteProducerTest.java | 4 +- .../kafka/RecordTransformsTest.java | 40 ++++++++++------- .../kafka/SinglePartitionSubscriberTest.java | 17 ++------ .../SingleSubscriptionConsumerImplTest.java | 9 ++-- 9 files changed, 81 insertions(+), 70 deletions(-) diff --git a/pom.xml b/pom.xml index 0b588f45..6c766244 100644 --- a/pom.xml +++ b/pom.xml @@ -18,7 +18,7 @@ pubsublite-kafka-auth - 1.9.2 + 1.11.1 diff --git a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java index cd3cd6da..8fb63de8 100644 --- a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java +++ b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/LiteHeaders.java @@ -16,19 +16,22 @@ package com.google.cloud.pubsublite.kafka; +import com.google.cloud.pubsublite.proto.AttributeValues; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.Iterables; import com.google.common.collect.Iterators; import com.google.protobuf.ByteString; import java.util.Iterator; +import java.util.Map; +import java.util.stream.Collectors; +import javax.annotation.Nullable; import org.apache.kafka.common.header.Header; import org.apache.kafka.common.header.Headers; class LiteHeaders implements Headers { - private ImmutableListMultimap attributes; + private Map attributes; - LiteHeaders(ImmutableListMultimap attributes) { + LiteHeaders(Map attributes) { this.attributes = attributes; } @@ -68,23 +71,24 @@ public Header lastHeader(String s) { @Override public Iterable
headers(String s) { - if (attributes.containsKey(s)) - return Iterables.transform(attributes.get(s), value -> toHeader(s, value)); - return ImmutableList.of(); + @Nullable AttributeValues values = attributes.get(s); + if (values == null) { + return ImmutableList.of(); + } + return values.getValuesList().stream().map(v -> toHeader(s, v)).collect(Collectors.toList()); } @Override public Header[] toArray() { - ImmutableList.Builder
arrayBuilder = ImmutableList.builder(); - attributes - .entries() - .forEach(entry -> arrayBuilder.add(toHeader(entry.getKey(), entry.getValue()))); - return (Header[]) arrayBuilder.build().toArray(); + return Iterators.toArray(iterator(), Header.class); } @Override public Iterator
iterator() { - return Iterators.transform( - attributes.entries().iterator(), entry -> toHeader(entry.getKey(), entry.getValue())); + return attributes.entrySet().stream() + .flatMap( + entry -> + entry.getValue().getValuesList().stream().map(v -> toHeader(entry.getKey(), v))) + .iterator(); } } diff --git a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java index 61839d3c..e59982d5 100644 --- a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java +++ b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/RecordTransforms.java @@ -16,10 +16,11 @@ package com.google.cloud.pubsublite.kafka; -import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.proto.AttributeValues; +import com.google.cloud.pubsublite.proto.PubSubMessage; +import com.google.cloud.pubsublite.proto.SequencedMessage; import com.google.common.collect.ImmutableListMultimap; import com.google.protobuf.ByteString; import com.google.protobuf.Timestamp; @@ -32,44 +33,52 @@ class RecordTransforms { private RecordTransforms() {} - static Message toMessage(ProducerRecord record) { - Message.Builder builder = - Message.builder() + static PubSubMessage toMessage(ProducerRecord record) { + PubSubMessage.Builder builder = + PubSubMessage.newBuilder() .setKey(ByteString.copyFrom(record.key())) .setData(ByteString.copyFrom(record.value())); if (record.timestamp() != null) { - builder = builder.setEventTime(Timestamps.fromMillis(record.timestamp())); + builder.setEventTime(Timestamps.fromMillis(record.timestamp())); } ImmutableListMultimap.Builder attributes = ImmutableListMultimap.builder(); record .headers() .forEach(header -> attributes.put(header.key(), ByteString.copyFrom(header.value()))); - return builder.setAttributes(attributes.build()).build(); + attributes + .build() + .asMap() + .forEach( + (key, values) -> + builder.putAttributes( + key, AttributeValues.newBuilder().addAllValues(values).build())); + return builder.build(); } static ConsumerRecord fromMessage( - SequencedMessage message, TopicPath topic, Partition partition) { - Headers headers = new LiteHeaders(message.message().attributes()); + SequencedMessage sequenced, TopicPath topic, Partition partition) { + PubSubMessage message = sequenced.getMessage(); + Headers headers = new LiteHeaders(message.getAttributesMap()); TimestampType type; Timestamp timestamp; - if (message.message().eventTime().isPresent()) { + if (message.hasEventTime()) { type = TimestampType.CREATE_TIME; - timestamp = message.message().eventTime().get(); + timestamp = message.getEventTime(); } else { type = TimestampType.LOG_APPEND_TIME; - timestamp = message.publishTime(); + timestamp = sequenced.getPublishTime(); } return new ConsumerRecord<>( topic.toString(), (int) partition.value(), - message.offset().value(), + sequenced.getCursor().getOffset(), Timestamps.toMillis(timestamp), type, 0L, - message.message().key().size(), - message.message().data().size(), - message.message().key().toByteArray(), - message.message().data().toByteArray(), + message.getKey().size(), + message.getData().size(), + message.getKey().toByteArray(), + message.getData().toByteArray(), headers); } } diff --git a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java index 28806255..4c34be0e 100644 --- a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java +++ b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriber.java @@ -22,13 +22,13 @@ import com.google.api.core.ApiFutures; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ProxyService; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.proto.SeekRequest; +import com.google.cloud.pubsublite.proto.SequencedMessage; import com.google.common.collect.Iterables; import com.google.common.util.concurrent.MoreExecutors; import com.google.errorprone.annotations.concurrent.GuardedBy; @@ -115,7 +115,7 @@ ArrayDeque getMessages() throws CheckedApiException { try (CloseableMonitor.Hold h = monitor.enter()) { ArrayDeque messages = pullMessages(); if (!messages.isEmpty()) { - lastReceived = Optional.of(Iterables.getLast(messages).offset()); + lastReceived = Optional.of(Offset.of(Iterables.getLast(messages).getCursor().getOffset())); needsCommitting = true; } return messages; diff --git a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java index f80b18cd..323b2c0e 100644 --- a/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java +++ b/pubsublite-kafka/src/main/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImpl.java @@ -26,12 +26,12 @@ import com.google.api.core.SettableApiFuture; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget; +import com.google.cloud.pubsublite.proto.SequencedMessage; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; diff --git a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java index 85a92f8e..12e63921 100644 --- a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java +++ b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/PubsubLiteProducerTest.java @@ -27,7 +27,6 @@ import com.google.api.core.SettableApiFuture; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.pubsublite.AdminClient; -import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.MessageMetadata; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; @@ -36,6 +35,7 @@ import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.testing.FakeApiService; +import com.google.cloud.pubsublite.proto.PubSubMessage; import com.google.common.collect.ImmutableMap; import java.util.List; import java.util.concurrent.Future; @@ -63,7 +63,7 @@ abstract static class FakePublisher extends FakeApiService private static final ProducerRecord RECORD = new ProducerRecord<>( example(TopicPath.class).toString(), "abc".getBytes(), "defg".getBytes()); - private static final Message MESSAGE = RecordTransforms.toMessage(RECORD); + private static final PubSubMessage MESSAGE = RecordTransforms.toMessage(RECORD); private static final TopicPartition TOPIC_PARTITION = new TopicPartition( example(TopicPath.class).toString(), (int) example(Partition.class).value()); diff --git a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java index 20c2a8e9..1347960a 100644 --- a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java +++ b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/RecordTransformsTest.java @@ -22,8 +22,11 @@ import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.proto.AttributeValues; +import com.google.cloud.pubsublite.proto.Cursor; +import com.google.cloud.pubsublite.proto.PubSubMessage; +import com.google.cloud.pubsublite.proto.SequencedMessage; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; import com.google.protobuf.ByteString; @@ -37,21 +40,24 @@ @RunWith(JUnit4.class) public class RecordTransformsTest { - private static final Message MESSAGE = - Message.builder() + private static final PubSubMessage MESSAGE = + PubSubMessage.newBuilder() .setKey(ByteString.copyFromUtf8("abc")) .setData(ByteString.copyFromUtf8("def")) .setEventTime(Timestamp.newBuilder().setSeconds(1).setNanos(1000000).build()) - .setAttributes( - ImmutableListMultimap.of( - "xxx", - ByteString.copyFromUtf8("yyy"), - "zzz", - ByteString.copyFromUtf8("zzz"), - "zzz", - ByteString.copyFromUtf8("zzz"))) + .putAttributes("xxx", single("yyy")) + .putAttributes( + "zzz", + AttributeValues.newBuilder() + .addValues(ByteString.copyFromUtf8("zzz")) + .addValues(ByteString.copyFromUtf8("zzz")) + .build()) .build(); + private static AttributeValues single(String v) { + return AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8(v)).build(); + } + @Test public void publishTransform() { ProducerRecord record = @@ -65,15 +71,19 @@ public void publishTransform() { LiteHeaders.toHeader("xxx", ByteString.copyFromUtf8("yyy")), LiteHeaders.toHeader("zzz", ByteString.copyFromUtf8("zzz")), LiteHeaders.toHeader("zzz", ByteString.copyFromUtf8("zzz")))); - Message message = RecordTransforms.toMessage(record); + PubSubMessage message = RecordTransforms.toMessage(record); assertThat(message).isEqualTo(MESSAGE); } @Test public void subscribeTransform() { SequencedMessage sequencedMessage = - SequencedMessage.of( - MESSAGE, Timestamp.newBuilder().setNanos(12345).build(), example(Offset.class), 123L); + SequencedMessage.newBuilder() + .setMessage(MESSAGE) + .setPublishTime(Timestamp.newBuilder().setNanos(12345)) + .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value())) + .setSizeBytes(123) + .build(); ConsumerRecord record = RecordTransforms.fromMessage( sequencedMessage, example(TopicPath.class), example(Partition.class)); @@ -85,7 +95,7 @@ public void subscribeTransform() { record .headers() .forEach(header -> headers.put(header.key(), ByteString.copyFrom(header.value()))); - assertThat(headers.build()).isEqualTo(MESSAGE.attributes()); + assertThat(headers.build()).isEqualTo(Message.fromProto(MESSAGE).attributes()); assertThat(record.offset()).isEqualTo(example(Offset.class).value()); assertThat(record.topic()).isEqualTo(example(TopicPath.class).toString()); assertThat(record.partition()).isEqualTo(example(Partition.class).value()); diff --git a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java index 4b3e2482..e35278bf 100644 --- a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java +++ b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SinglePartitionSubscriberTest.java @@ -22,25 +22,22 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; import com.google.api.core.ApiFutures; -import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.testing.FakeApiService; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler; +import com.google.cloud.pubsublite.proto.Cursor; import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget; -import com.google.protobuf.Timestamp; +import com.google.cloud.pubsublite.proto.SequencedMessage; import java.util.Optional; -import org.junit.After; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -73,16 +70,8 @@ public void setUp() throws CheckedApiException { .thenReturn(pullSubscriber); } - @After - public void tearDown() throws Exception { - verifyNoMoreInteractions(subscriberFactory); - verifyNoMoreInteractions(pullSubscriber); - verifyNoMoreInteractions(committer); - } - private static SequencedMessage message(long offset) { - return SequencedMessage.of( - Message.builder().build(), Timestamp.getDefaultInstance(), Offset.of(offset), 0L); + return SequencedMessage.newBuilder().setCursor(Cursor.newBuilder().setOffset(offset)).build(); } @Test diff --git a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java index 48855977..fd00d0d4 100644 --- a/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java +++ b/pubsublite-kafka/src/test/java/com/google/cloud/pubsublite/kafka/SingleSubscriptionConsumerImplTest.java @@ -31,10 +31,8 @@ import com.google.api.core.ApiFuture; import com.google.api.core.ApiFutures; import com.google.api.core.SettableApiFuture; -import com.google.cloud.pubsublite.Message; import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; -import com.google.cloud.pubsublite.SequencedMessage; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.BlockingPullSubscriber; import com.google.cloud.pubsublite.internal.CheckedApiException; @@ -44,12 +42,12 @@ import com.google.cloud.pubsublite.proto.Cursor; import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget; +import com.google.cloud.pubsublite.proto.SequencedMessage; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableListMultimap; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.ListMultimap; -import com.google.protobuf.Timestamp; import java.time.Duration; import java.util.Map; import java.util.Optional; @@ -102,8 +100,9 @@ public void setUp() throws CheckedApiException { } private static SequencedMessage message(Offset offset) { - return SequencedMessage.of( - Message.builder().build(), Timestamp.getDefaultInstance(), Offset.of(offset.value()), 0L); + return SequencedMessage.newBuilder() + .setCursor(Cursor.newBuilder().setOffset(offset.value())) + .build(); } private static SequencedMessage message(long offset) {