Skip to content

Commit

Permalink
feat: Change message id on PubsubMessages to be an encoded MessageMet…
Browse files Browse the repository at this point in the history
…adata and rename PublishMetadata to MessageMetadata (#482)

* feat: Change message id on PubsubMessages to be an encoded PublishMetadata

PublishMetadata is both the partition and offset, and is an ID suitable for deduplication.

FIXES: #471

* feat: Rename PublishMetadata to MessageMetadata

* fix: Run fmt:format on samples
  • Loading branch information
dpcollins-google committed Feb 4, 2021
1 parent fe29e70 commit a5cedf9
Show file tree
Hide file tree
Showing 26 changed files with 197 additions and 94 deletions.
4 changes: 2 additions & 2 deletions .readme-partials.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,11 @@ custom_content: |
futures.add(future);
}
} finally {
ArrayList<PublishMetadata> metadata = new ArrayList<>();
ArrayList<MessageMetadata> metadata = new ArrayList<>();
List<String> ackIds = ApiFutures.allAsList(futures).get();
for (String id : ackIds) {
// Decoded metadata contains partition and offset.
metadata.add(PublishMetadata.decode(id));
metadata.add(MessageMetadata.decode(id));
}
System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");
Expand Down
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,11 @@ try {
futures.add(future);
}
} finally {
ArrayList<PublishMetadata> metadata = new ArrayList<>();
ArrayList<MessageMetadata> metadata = new ArrayList<>();
List<String> ackIds = ApiFutures.allAsList(futures).get();
for (String id : ackIds) {
// Decoded metadata contains partition and offset.
metadata.add(PublishMetadata.decode(id));
metadata.add(MessageMetadata.decode(id));
}
System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

Expand Down
6 changes: 6 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- TODO: Remove on next release -->
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/PublishMetadata</className>
<method>*</method>
</difference>
<!-- Added abstract method to AutoValue.Builder class (Always okay) -->
<difference>
<differenceType>7013</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,32 +24,33 @@
import com.google.cloud.pubsublite.internal.CheckedApiException;

/**
* Information about a successful publish operation. Can be encoded in the string returned by the
* Cloud Pub/Sub {@link com.google.cloud.pubsub.v1.Publisher#publish} api.
* Information about a message in Pub/Sub Lite. Can be encoded in the string returned by the Cloud
* Pub/Sub {@link com.google.cloud.pubsub.v1.Publisher#publish} api or the {@link
* com.google.pubsub.v1.PubsubMessage#getMessageId} field on received messages.
*/
@AutoValue
public abstract class PublishMetadata {
public abstract class MessageMetadata {
/** The partition a message was published to. */
public abstract Partition partition();

/** The offset a message was assigned. */
public abstract Offset offset();

/** Construct a PublishMetadata from a Partition and Offset. */
public static PublishMetadata of(Partition partition, Offset offset) {
return new AutoValue_PublishMetadata(partition, offset);
/** Construct a MessageMetadata from a Partition and Offset. */
public static MessageMetadata of(Partition partition, Offset offset) {
return new AutoValue_MessageMetadata(partition, offset);
}

/** Decode a PublishMetadata from the Cloud Pub/Sub ack id. */
public static PublishMetadata decode(String encoded) throws ApiException {
/** Decode a MessageMetadata from the Cloud Pub/Sub ack id. */
public static MessageMetadata decode(String encoded) throws ApiException {
String[] split = encoded.split(":");
checkArgument(split.length == 2, "Invalid encoded PublishMetadata.");
checkArgument(split.length == 2, "Invalid encoded MessageMetadata.");
try {
Partition partition = Partition.of(Long.parseLong(split[0]));
Offset offset = Offset.of(Long.parseLong(split[1]));
return of(partition, offset);
} catch (NumberFormatException e) {
throw new CheckedApiException("Invalid encoded PublishMetadata.", e, Code.INVALID_ARGUMENT)
throw new CheckedApiException("Invalid encoded MessageMetadata.", e, Code.INVALID_ARGUMENT)
.underlying;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,9 @@
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SequencedMessage;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.common.collect.ImmutableListMultimap;
Expand Down Expand Up @@ -72,15 +74,30 @@ private static String parseAttributes(Collection<ByteString> values) throws ApiE
"Received an unparseable message with multiple values for an attribute.");
ByteString attribute = values.iterator().next();
checkArgument(
attribute.isValidUtf8(), "Received an unparseable message with a non-utf8 attribute.");
attribute.isValidUtf8(),
String.format(
"Received an unparseable message with a non-utf8 attribute value: %s",
Base64.getEncoder().encodeToString(attribute.toByteArray())));
return attribute.toStringUtf8();
}

static MessageTransformer<SequencedMessage, PubsubMessage> addIdCpsSubscribeTransformer(
Partition partition, MessageTransformer<SequencedMessage, PubsubMessage> toWrap) {
return message -> {
PubsubMessage out = toWrap.transform(message);
checkArgument(
out.getMessageId().isEmpty(),
String.format("Received non-empty message id for PubsubMessage: %s", out));
return out.toBuilder()
.setMessageId(MessageMetadata.of(partition, message.offset()).encode())
.build();
};
}

public static MessageTransformer<SequencedMessage, PubsubMessage> toCpsSubscribeTransformer() {
return message -> {
PubsubMessage.Builder outBuilder =
toCpsPublishTransformer().transform(message.message()).toBuilder();
outBuilder.setMessageId(Long.toString(message.offset().value()));
outBuilder.setPublishTime(message.publishTime());
return outBuilder.build();
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@ public abstract class SubscriberSettings {
*/
abstract List<Partition> partitions();

/** The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. */
/**
* The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. The messageId
* field must not be set on the returned message.
*/
abstract Optional<MessageTransformer<SequencedMessage, PubsubMessage>> transformer();

/** A provider for credentials. */
Expand Down Expand Up @@ -155,7 +158,10 @@ public abstract static class Builder {
*/
public abstract Builder setPartitions(List<Partition> partition);

/** The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. */
/**
* The MessageTransformer to get PubsubMessages from Pub/Sub Lite wire messages. The messageId
* field must not be set on the returned message.
*/
public abstract Builder setTransformer(
MessageTransformer<SequencedMessage, PubsubMessage> transformer);

Expand Down Expand Up @@ -243,7 +249,8 @@ Subscriber newPartitionSubscriber(Partition partition) throws CheckedApiExceptio

return new SinglePartitionSubscriber(
receiver(),
transformer().orElse(MessageTransforms.toCpsSubscribeTransformer()),
MessageTransforms.addIdCpsSubscribeTransformer(
partition, transformer().orElse(MessageTransforms.toCpsSubscribeTransformer())),
new AckSetTrackerImpl(wireCommitter),
nackHandler().orElse(new NackHandler() {}),
messageConsumer -> wireSubscriberBuilder.setMessageConsumer(messageConsumer).build(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,22 +22,22 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.pubsub.v1.PubsubMessage;

// A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant
// publisher. It encodes a PublishMetadata object in the response string.
// publisher. It encodes a MessageMetadata object in the response string.
public class WrappingPublisher extends TrivialProxyService implements Publisher {
private final com.google.cloud.pubsublite.internal.Publisher<PublishMetadata> wirePublisher;
private final com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> wirePublisher;
private final MessageTransformer<PubsubMessage, Message> transformer;

public WrappingPublisher(
com.google.cloud.pubsublite.internal.Publisher<PublishMetadata> wirePublisher,
com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> wirePublisher,
MessageTransformer<PubsubMessage, Message> transformer)
throws ApiException {
super(wirePublisher);
Expand All @@ -58,7 +58,7 @@ public ApiFuture<String> publish(PubsubMessage message) {
}
return ApiFutures.transform(
wirePublisher.publish(wireMessage),
PublishMetadata::encode,
MessageMetadata::encode,
MoreExecutors.directExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import com.google.api.core.ApiFutures;
import com.google.api.core.ApiService;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.ProxyService;
Expand All @@ -39,23 +39,23 @@
import java.util.stream.LongStream;

public class PartitionCountWatchingPublisher extends ProxyService
implements Publisher<PublishMetadata> {
implements Publisher<MessageMetadata> {
private static final GoogleLogger log = GoogleLogger.forEnclosingClass();
private final PartitionPublisherFactory publisherFactory;
private final RoutingPolicy.Factory policyFactory;

private static class PartitionsWithRouting {
public final ImmutableMap<Partition, Publisher<PublishMetadata>> publishers;
public final ImmutableMap<Partition, Publisher<MessageMetadata>> publishers;
private final RoutingPolicy routingPolicy;

private PartitionsWithRouting(
ImmutableMap<Partition, Publisher<PublishMetadata>> publishers,
ImmutableMap<Partition, Publisher<MessageMetadata>> publishers,
RoutingPolicy routingPolicy) {
this.publishers = publishers;
this.routingPolicy = routingPolicy;
}

public ApiFuture<PublishMetadata> publish(Message message) throws CheckedApiException {
public ApiFuture<MessageMetadata> publish(Message message) throws CheckedApiException {
try {
Partition routedPartition =
message.key().isEmpty()
Expand All @@ -73,13 +73,13 @@ public ApiFuture<PublishMetadata> publish(Message message) throws CheckedApiExce
}

public void cancelOutstandingPublishes() {
for (Publisher<PublishMetadata> publisher : publishers.values()) {
for (Publisher<MessageMetadata> publisher : publishers.values()) {
publisher.cancelOutstandingPublishes();
}
}

public void flush() throws IOException {
for (Publisher<PublishMetadata> publisher : publishers.values()) {
for (Publisher<MessageMetadata> publisher : publishers.values()) {
publisher.flush();
}
}
Expand Down Expand Up @@ -109,7 +109,7 @@ public void stop() {
}

@Override
public ApiFuture<PublishMetadata> publish(Message message) {
public ApiFuture<MessageMetadata> publish(Message message) {
Optional<PartitionsWithRouting> partitions;
try (CloseableMonitor.Hold h = monitor.enter()) {
partitions = partitionsWithRouting;
Expand Down Expand Up @@ -150,12 +150,12 @@ public void flush() throws IOException {
partitions.get().flush();
}

private ImmutableMap<Partition, Publisher<PublishMetadata>> getNewPartitionPublishers(
private ImmutableMap<Partition, Publisher<MessageMetadata>> getNewPartitionPublishers(
LongStream newPartitions) {
ImmutableMap.Builder<Partition, Publisher<PublishMetadata>> mapBuilder = ImmutableMap.builder();
ImmutableMap.Builder<Partition, Publisher<MessageMetadata>> mapBuilder = ImmutableMap.builder();
newPartitions.forEach(
i -> {
Publisher<PublishMetadata> p = publisherFactory.newPublisher(Partition.of(i));
Publisher<MessageMetadata> p = publisherFactory.newPublisher(Partition.of(i));
p.addListener(
new Listener() {
@Override
Expand All @@ -167,7 +167,7 @@ public void failed(State from, Throwable failure) {
mapBuilder.put(Partition.of(i), p);
p.startAsync();
});
ImmutableMap<Partition, Publisher<PublishMetadata>> partitions = mapBuilder.build();
ImmutableMap<Partition, Publisher<MessageMetadata>> partitions = mapBuilder.build();
partitions.values().forEach(ApiService::awaitRunning);
return partitions;
}
Expand All @@ -189,12 +189,12 @@ private void handleConfig(long partitionCount) {
partitionCount);
return;
}
ImmutableMap.Builder<Partition, Publisher<PublishMetadata>> mapBuilder =
ImmutableMap.Builder<Partition, Publisher<MessageMetadata>> mapBuilder =
ImmutableMap.builder();
current.ifPresent(p -> p.publishers.forEach(mapBuilder::put));
getNewPartitionPublishers(LongStream.range(currentSize, partitionCount))
.forEach(mapBuilder::put);
ImmutableMap<Partition, Publisher<PublishMetadata>> newMap = mapBuilder.build();
ImmutableMap<Partition, Publisher<MessageMetadata>> newMap = mapBuilder.build();

partitionsWithRouting =
Optional.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public abstract static class Builder {
public abstract PartitionCountWatchingPublisherSettings build();
}

public Publisher<PublishMetadata> instantiate() throws ApiException {
public Publisher<MessageMetadata> instantiate() throws ApiException {
return new PartitionCountWatchingPublisher(
publisherFactory(),
DefaultRoutingPolicy::new,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,10 @@
package com.google.cloud.pubsublite.internal.wire;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.Publisher;

public interface PartitionPublisherFactory {
Publisher<PublishMetadata> newPublisher(Partition partition) throws ApiException;
Publisher<MessageMetadata> newPublisher(Partition partition) throws ApiException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,21 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.RoutingPolicy;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import java.io.IOException;
import java.util.Map;

public class RoutingPublisher extends TrivialProxyService implements Publisher<PublishMetadata> {
private final Map<Partition, Publisher<PublishMetadata>> partitionPublishers;
public class RoutingPublisher extends TrivialProxyService implements Publisher<MessageMetadata> {
private final Map<Partition, Publisher<MessageMetadata>> partitionPublishers;
private final RoutingPolicy policy;

RoutingPublisher(
Map<Partition, Publisher<PublishMetadata>> partitionPublishers, RoutingPolicy policy)
Map<Partition, Publisher<MessageMetadata>> partitionPublishers, RoutingPolicy policy)
throws ApiException {
super(partitionPublishers.values());
this.partitionPublishers = partitionPublishers;
Expand All @@ -46,7 +46,7 @@ public class RoutingPublisher extends TrivialProxyService implements Publisher<P

// Publisher implementation.
@Override
public ApiFuture<PublishMetadata> publish(Message message) {
public ApiFuture<MessageMetadata> publish(Message message) {
try {
Partition routedPartition =
message.key().isEmpty() ? policy.routeWithoutKey() : policy.route(message.key());
Expand All @@ -65,14 +65,14 @@ public ApiFuture<PublishMetadata> publish(Message message) {

@Override
public void cancelOutstandingPublishes() {
for (Publisher<PublishMetadata> publisher : partitionPublishers.values()) {
for (Publisher<MessageMetadata> publisher : partitionPublishers.values()) {
publisher.cancelOutstandingPublishes();
}
}

@Override
public void flush() throws IOException {
for (Publisher<PublishMetadata> publisher : partitionPublishers.values()) {
for (Publisher<MessageMetadata> publisher : partitionPublishers.values()) {
publisher.flush();
}
}
Expand Down
Loading

0 comments on commit a5cedf9

Please sign in to comment.