Skip to content

Commit

Permalink
fix: Upgrade kafka shim pubsublite version (#405)
Browse files Browse the repository at this point in the history
  • Loading branch information
dpcollins-google committed Feb 23, 2023
1 parent 7d99aef commit 278205e
Show file tree
Hide file tree
Showing 9 changed files with 81 additions and 70 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<module>pubsublite-kafka-auth</module>
</modules>
<properties>
<psl.version>1.9.2</psl.version>
<psl.version>1.11.1</psl.version>
</properties>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,22 @@

package com.google.cloud.pubsublite.kafka;

import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Iterators;
import com.google.protobuf.ByteString;
import java.util.Iterator;
import java.util.Map;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.Headers;

class LiteHeaders implements Headers {
private ImmutableListMultimap<String, ByteString> attributes;
private Map<String, AttributeValues> attributes;

LiteHeaders(ImmutableListMultimap<String, ByteString> attributes) {
LiteHeaders(Map<String, AttributeValues> attributes) {
this.attributes = attributes;
}

Expand Down Expand Up @@ -68,23 +71,24 @@ public Header lastHeader(String s) {

@Override
public Iterable<Header> headers(String s) {
if (attributes.containsKey(s))
return Iterables.transform(attributes.get(s), value -> toHeader(s, value));
return ImmutableList.of();
@Nullable AttributeValues values = attributes.get(s);
if (values == null) {
return ImmutableList.of();
}
return values.getValuesList().stream().map(v -> toHeader(s, v)).collect(Collectors.toList());
}

@Override
public Header[] toArray() {
ImmutableList.Builder<Header> arrayBuilder = ImmutableList.builder();
attributes
.entries()
.forEach(entry -> arrayBuilder.add(toHeader(entry.getKey(), entry.getValue())));
return (Header[]) arrayBuilder.build().toArray();
return Iterators.toArray(iterator(), Header.class);
}

@Override
public Iterator<Header> iterator() {
return Iterators.transform(
attributes.entries().iterator(), entry -> toHeader(entry.getKey(), entry.getValue()));
return attributes.entrySet().stream()
.flatMap(
entry ->
entry.getValue().getValuesList().stream().map(v -> toHeader(entry.getKey(), v)))
.iterator();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@

package com.google.cloud.pubsublite.kafka;

import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.ImmutableListMultimap;
import com.google.protobuf.ByteString;
import com.google.protobuf.Timestamp;
Expand All @@ -32,44 +33,52 @@
class RecordTransforms {
private RecordTransforms() {}

static Message toMessage(ProducerRecord<byte[], byte[]> record) {
Message.Builder builder =
Message.builder()
static PubSubMessage toMessage(ProducerRecord<byte[], byte[]> record) {
PubSubMessage.Builder builder =
PubSubMessage.newBuilder()
.setKey(ByteString.copyFrom(record.key()))
.setData(ByteString.copyFrom(record.value()));
if (record.timestamp() != null) {
builder = builder.setEventTime(Timestamps.fromMillis(record.timestamp()));
builder.setEventTime(Timestamps.fromMillis(record.timestamp()));
}
ImmutableListMultimap.Builder<String, ByteString> attributes = ImmutableListMultimap.builder();
record
.headers()
.forEach(header -> attributes.put(header.key(), ByteString.copyFrom(header.value())));
return builder.setAttributes(attributes.build()).build();
attributes
.build()
.asMap()
.forEach(
(key, values) ->
builder.putAttributes(
key, AttributeValues.newBuilder().addAllValues(values).build()));
return builder.build();
}

static ConsumerRecord<byte[], byte[]> fromMessage(
SequencedMessage message, TopicPath topic, Partition partition) {
Headers headers = new LiteHeaders(message.message().attributes());
SequencedMessage sequenced, TopicPath topic, Partition partition) {
PubSubMessage message = sequenced.getMessage();
Headers headers = new LiteHeaders(message.getAttributesMap());
TimestampType type;
Timestamp timestamp;
if (message.message().eventTime().isPresent()) {
if (message.hasEventTime()) {
type = TimestampType.CREATE_TIME;
timestamp = message.message().eventTime().get();
timestamp = message.getEventTime();
} else {
type = TimestampType.LOG_APPEND_TIME;
timestamp = message.publishTime();
timestamp = sequenced.getPublishTime();
}
return new ConsumerRecord<>(
topic.toString(),
(int) partition.value(),
message.offset().value(),
sequenced.getCursor().getOffset(),
Timestamps.toMillis(timestamp),
type,
0L,
message.message().key().size(),
message.message().data().size(),
message.message().key().toByteArray(),
message.message().data().toByteArray(),
message.getKey().size(),
message.getData().size(),
message.getKey().toByteArray(),
message.getData().toByteArray(),
headers);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
Expand Down Expand Up @@ -115,7 +115,7 @@ ArrayDeque<SequencedMessage> getMessages() throws CheckedApiException {
try (CloseableMonitor.Hold h = monitor.enter()) {
ArrayDeque<SequencedMessage> messages = pullMessages();
if (!messages.isEmpty()) {
lastReceived = Optional.of(Iterables.getLast(messages).offset());
lastReceived = Optional.of(Offset.of(Iterables.getLast(messages).getCursor().getOffset()));
needsCommitting = true;
}
return messages;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.google.api.core.SettableApiFuture;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
Expand All @@ -36,6 +35,7 @@
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.collect.ImmutableMap;
import java.util.List;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -63,7 +63,7 @@ abstract static class FakePublisher extends FakeApiService
private static final ProducerRecord<byte[], byte[]> RECORD =
new ProducerRecord<>(
example(TopicPath.class).toString(), "abc".getBytes(), "defg".getBytes());
private static final Message MESSAGE = RecordTransforms.toMessage(RECORD);
private static final PubSubMessage MESSAGE = RecordTransforms.toMessage(RECORD);
private static final TopicPartition TOPIC_PARTITION =
new TopicPartition(
example(TopicPath.class).toString(), (int) example(Partition.class).value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,11 @@
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.AttributeValues;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.protobuf.ByteString;
Expand All @@ -37,21 +40,24 @@

@RunWith(JUnit4.class)
public class RecordTransformsTest {
private static final Message MESSAGE =
Message.builder()
private static final PubSubMessage MESSAGE =
PubSubMessage.newBuilder()
.setKey(ByteString.copyFromUtf8("abc"))
.setData(ByteString.copyFromUtf8("def"))
.setEventTime(Timestamp.newBuilder().setSeconds(1).setNanos(1000000).build())
.setAttributes(
ImmutableListMultimap.of(
"xxx",
ByteString.copyFromUtf8("yyy"),
"zzz",
ByteString.copyFromUtf8("zzz"),
"zzz",
ByteString.copyFromUtf8("zzz")))
.putAttributes("xxx", single("yyy"))
.putAttributes(
"zzz",
AttributeValues.newBuilder()
.addValues(ByteString.copyFromUtf8("zzz"))
.addValues(ByteString.copyFromUtf8("zzz"))
.build())
.build();

private static AttributeValues single(String v) {
return AttributeValues.newBuilder().addValues(ByteString.copyFromUtf8(v)).build();
}

@Test
public void publishTransform() {
ProducerRecord<byte[], byte[]> record =
Expand All @@ -65,15 +71,19 @@ public void publishTransform() {
LiteHeaders.toHeader("xxx", ByteString.copyFromUtf8("yyy")),
LiteHeaders.toHeader("zzz", ByteString.copyFromUtf8("zzz")),
LiteHeaders.toHeader("zzz", ByteString.copyFromUtf8("zzz"))));
Message message = RecordTransforms.toMessage(record);
PubSubMessage message = RecordTransforms.toMessage(record);
assertThat(message).isEqualTo(MESSAGE);
}

@Test
public void subscribeTransform() {
SequencedMessage sequencedMessage =
SequencedMessage.of(
MESSAGE, Timestamp.newBuilder().setNanos(12345).build(), example(Offset.class), 123L);
SequencedMessage.newBuilder()
.setMessage(MESSAGE)
.setPublishTime(Timestamp.newBuilder().setNanos(12345))
.setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value()))
.setSizeBytes(123)
.build();
ConsumerRecord<byte[], byte[]> record =
RecordTransforms.fromMessage(
sequencedMessage, example(TopicPath.class), example(Partition.class));
Expand All @@ -85,7 +95,7 @@ public void subscribeTransform() {
record
.headers()
.forEach(header -> headers.put(header.key(), ByteString.copyFrom(header.value())));
assertThat(headers.build()).isEqualTo(MESSAGE.attributes());
assertThat(headers.build()).isEqualTo(Message.fromProto(MESSAGE).attributes());
assertThat(record.offset()).isEqualTo(example(Offset.class).value());
assertThat(record.topic()).isEqualTo(example(TopicPath.class).toString());
assertThat(record.partition()).isEqualTo(example(Partition.class).value());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,22 @@
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.testing.FakeApiService;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.internal.wire.SubscriberResetHandler;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.protobuf.Timestamp;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import java.util.Optional;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
Expand Down Expand Up @@ -73,16 +70,8 @@ public void setUp() throws CheckedApiException {
.thenReturn(pullSubscriber);
}

@After
public void tearDown() throws Exception {
verifyNoMoreInteractions(subscriberFactory);
verifyNoMoreInteractions(pullSubscriber);
verifyNoMoreInteractions(committer);
}

private static SequencedMessage message(long offset) {
return SequencedMessage.of(
Message.builder().build(), Timestamp.getDefaultInstance(), Offset.of(offset), 0L);
return SequencedMessage.newBuilder().setCursor(Cursor.newBuilder().setOffset(offset)).build();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,8 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.BlockingPullSubscriber;
import com.google.cloud.pubsublite.internal.CheckedApiException;
Expand All @@ -44,12 +42,12 @@
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SeekRequest.NamedTarget;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ListMultimap;
import com.google.protobuf.Timestamp;
import java.time.Duration;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -102,8 +100,9 @@ public void setUp() throws CheckedApiException {
}

private static SequencedMessage message(Offset offset) {
return SequencedMessage.of(
Message.builder().build(), Timestamp.getDefaultInstance(), Offset.of(offset.value()), 0L);
return SequencedMessage.newBuilder()
.setCursor(Cursor.newBuilder().setOffset(offset.value()))
.build();
}

private static SequencedMessage message(long offset) {
Expand Down

0 comments on commit 278205e

Please sign in to comment.