Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

BEAM-13592 Add getOrderingKey in o.a.b.sdk.io.gcp.pubsub.PubsubMessage #22216

Merged
merged 15 commits into from Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Expand Up @@ -120,6 +120,7 @@
## New Features / Improvements

* Previously available in Java sdk, Python sdk now also supports logging level overrides per module. ([#18222](https://github.com/apache/beam/issues/18222)).
* Added support for accessing GCP PubSub Message ordering keys (Java) ([BEAM-13592](https://issues.apache.org/jira/browse/BEAM-13592)).

## Breaking Changes

Expand Down
Expand Up @@ -49,6 +49,13 @@ public interface PubsubClientFactory extends Serializable {
* {@code timestampAttribute} and {@code idAttribute} to store custom timestamps/ids within
* message metadata.
*/
PubsubClient newClient(
@Nullable String timestampAttribute,
@Nullable String idAttribute,
PubsubOptions options,
@Nullable String rootUrlOverride)
throws IOException;

/**
* Overload of {@code newClient} that takes no {@code rootUrlOverride}.
*
* <p>NOTE(review): the implementations visible in this change delegate to the four-argument
* overload with a {@code null} override, in which case the root URL is taken from {@code
* options.getPubsubRootUrl()} - confirm that any other implementation does the same.
*/
PubsubClient newClient(
@Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
throws IOException;
Expand Down Expand Up @@ -318,6 +325,9 @@ public static OutgoingMessage of(
if (message.getAttributeMap() != null) {
builder.putAllAttributes(message.getAttributeMap());
}
if (message.getOrderingKey() != null) {
builder.setOrderingKey(message.getOrderingKey());
}
return of(builder.build(), timestampMsSinceEpoch, recordId);
}
}
Expand Down
Expand Up @@ -37,6 +37,9 @@ public List<CoderProvider> getCoderProviders() {
TypeDescriptor.of(PubsubMessage.class), PubsubMessageWithMessageIdCoder.of()),
CoderProviders.forCoder(
TypeDescriptor.of(PubsubMessage.class),
PubsubMessageWithAttributesAndMessageIdCoder.of()));
PubsubMessageWithAttributesAndMessageIdCoder.of()),
CoderProviders.forCoder(
TypeDescriptor.of(PubsubMessage.class),
PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of()));
}
}
Expand Up @@ -57,6 +57,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand Down Expand Up @@ -84,11 +85,22 @@ private static class PubsubGrpcClientFactory implements PubsubClientFactory {
public PubsubClient newClient(
@Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
throws IOException {

// No root URL override: the four-argument overload then falls back to
// options.getPubsubRootUrl() when it opens the channel.
return newClient(timestampAttribute, idAttribute, options, null);
}

@Override
public PubsubClient newClient(
@Nullable String timestampAttribute,
@Nullable String idAttribute,
PubsubOptions options,
String rootUrlOverride)
throws IOException {
return new PubsubGrpcClient(
timestampAttribute,
idAttribute,
DEFAULT_TIMEOUT_S,
channelForRootUrl(options.getPubsubRootUrl()),
channelForRootUrl(MoreObjects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl())),
options.getGcpCredential());
}

Expand Down Expand Up @@ -190,7 +202,8 @@ private SubscriberBlockingStub subscriberStub() throws IOException {
public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) throws IOException {
PublishRequest.Builder request = PublishRequest.newBuilder().setTopic(topic.getPath());
for (OutgoingMessage outgoingMessage : outgoingMessages) {
PubsubMessage.Builder message = outgoingMessage.message().toBuilder();
PubsubMessage.Builder message =
outgoingMessage.message().toBuilder().clearMessageId().clearPublishTime();

if (timestampAttribute != null) {
message.putAttributes(
Expand Down
Expand Up @@ -556,6 +556,19 @@ public static Read<PubsubMessage> readMessagesWithAttributesAndMessageId() {
.build();
}

/**
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I believe that, to support bounded writes, you'll also need to explicitly copy the orderingKey on line 1329 of this file (in the `output.add(OutgoingMessage.of(...` section of `processElement`).

* Returns A {@link PTransform} that continuously reads from a Google Cloud Pub/Sub stream. The
* messages will contain a {@link PubsubMessage#getPayload() payload}, {@link
* PubsubMessage#getAttributeMap() attributes}, along with the {@link PubsubMessage#getMessageId()
* messageId} and {@link PubsubMessage#getOrderingKey() orderingKey} from PubSub.
*/
public static Read<PubsubMessage> readMessagesWithAttributesAndMessageIdAndOrderingKey() {
  return Read.newBuilder()
      .setCoder(PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder.of())
      // The builder defaults every needs* flag to false. This read advertises attributes and
      // the message id in addition to the ordering key, and its coder writes all of them, so
      // each one has to be requested explicitly rather than only the ordering key.
      .setNeedsAttributes(true)
      .setNeedsMessageId(true)
      .setNeedsOrderingKey(true)
      .build();
}

/**
* Returns A {@link PTransform} that continuously reads UTF-8 encoded strings from a Google Cloud
* Pub/Sub stream.
Expand Down Expand Up @@ -767,6 +780,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>

abstract boolean getNeedsMessageId();

abstract boolean getNeedsOrderingKey();

abstract Builder<T> toBuilder();

static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn) {
Expand All @@ -775,6 +790,7 @@ static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn)
builder.setPubsubClientFactory(FACTORY);
builder.setNeedsAttributes(false);
builder.setNeedsMessageId(false);
builder.setNeedsOrderingKey(false);
return builder;
}

Expand Down Expand Up @@ -814,6 +830,8 @@ abstract static class Builder<T> {

abstract Builder<T> setNeedsMessageId(boolean needsMessageId);

abstract Builder<T> setNeedsOrderingKey(boolean needsOrderingKey);

abstract Builder<T> setClock(Clock clock);

abstract Read<T> build();
Expand Down Expand Up @@ -1021,7 +1039,8 @@ public PCollection<T> expand(PBegin input) {
getTimestampAttribute(),
getIdAttribute(),
getNeedsAttributes(),
getNeedsMessageId());
getNeedsMessageId(),
getNeedsOrderingKey());

PCollection<T> read;
PCollection<PubsubMessage> preParse = input.apply(source);
Expand Down Expand Up @@ -1126,6 +1145,8 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone>
/** The format function for input PubsubMessage objects. */
abstract SerializableFunction<T, PubsubMessage> getFormatFn();

abstract @Nullable String getPubsubRootUrl();

abstract Builder<T> toBuilder();

static <T> Builder<T> newBuilder(SerializableFunction<T, PubsubMessage> formatFn) {
Expand Down Expand Up @@ -1155,6 +1176,8 @@ abstract static class Builder<T> {

abstract Builder<T> setFormatFn(SerializableFunction<T, PubsubMessage> formatFn);

abstract Builder<T> setPubsubRootUrl(String pubsubRootUrl);

abstract Write<T> build();
}

Expand Down Expand Up @@ -1234,6 +1257,10 @@ public Write<T> withIdAttribute(String idAttribute) {
return toBuilder().setIdAttribute(idAttribute).build();
}

/**
* Returns a {@code Write} built from this one with its Pub/Sub root URL set to {@code
* pubsubRootUrl}.
*
* <p>NOTE(review): presumably this value takes precedence over the root URL configured in
* {@code PubsubOptions} when the write creates its client - confirm against the sink.
*/
public Write<T> withPubsubRootUrl(String pubsubRootUrl) {
return toBuilder().setPubsubRootUrl(pubsubRootUrl).build();
}

@Override
public PDone expand(PCollection<T> input) {
if (getTopicProvider() == null) {
Expand Down Expand Up @@ -1273,8 +1300,8 @@ public PDone expand(PCollection<T> input) {
MoreObjects.firstNonNull(
getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE),
MoreObjects.firstNonNull(
getMaxBatchBytesSize(),
PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES)));
getMaxBatchBytesSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES),
getPubsubRootUrl()));
}
throw new RuntimeException(); // cases are exhaustive.
}
Expand Down Expand Up @@ -1341,16 +1368,19 @@ public void processElement(ProcessContext c) throws IOException, SizeLimitExceed

byte[] payload = message.getPayload();
Map<String, String> attributes = message.getAttributeMap();
String orderingKey = message.getOrderingKey();

com.google.pubsub.v1.PubsubMessage.Builder msgBuilder =
com.google.pubsub.v1.PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(payload))
.putAllAttributes(attributes);

if (orderingKey != null) {
msgBuilder.setOrderingKey(orderingKey);
}

// NOTE: The record id is always null.
output.add(
OutgoingMessage.of(
com.google.pubsub.v1.PubsubMessage.newBuilder()
.setData(ByteString.copyFrom(payload))
.putAllAttributes(attributes)
.build(),
c.timestamp().getMillis(),
null));
output.add(OutgoingMessage.of(msgBuilder.build(), c.timestamp().getMillis(), null));
currentOutputBytes += messageSize;
}

Expand Down
Expand Up @@ -48,6 +48,7 @@
import org.apache.beam.sdk.extensions.gcp.util.RetryHttpRequestInitializer;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.checkerframework.checker.nullness.qual.Nullable;
Expand All @@ -73,6 +74,17 @@ private static HttpRequestInitializer chainHttpRequestInitializer(
public PubsubClient newClient(
@Nullable String timestampAttribute, @Nullable String idAttribute, PubsubOptions options)
throws IOException {

// No root URL override: the four-argument overload then falls back to
// options.getPubsubRootUrl() when it configures the Pubsub builder.
return newClient(timestampAttribute, idAttribute, options, null);
}

@Override
public PubsubClient newClient(
@Nullable String timestampAttribute,
@Nullable String idAttribute,
PubsubOptions options,
String rootUrlOverride)
throws IOException {
Pubsub pubsub =
new Pubsub.Builder(
Transport.getTransport(),
Expand All @@ -82,7 +94,7 @@ public PubsubClient newClient(
// Do not log 404. It clutters the output and is possibly even required by the
// caller.
new RetryHttpRequestInitializer(ImmutableList.of(404))))
.setRootUrl(options.getPubsubRootUrl())
.setRootUrl(MoreObjects.firstNonNull(rootUrlOverride, options.getPubsubRootUrl()))
.setApplicationName(options.getAppName())
.setGoogleClientRequestInitializer(options.getGoogleApiTrace())
.build();
Expand Down Expand Up @@ -133,6 +145,8 @@ public int publish(TopicPath topic, List<OutgoingMessage> outgoingMessages) thro
if (!outgoingMessage.message().getOrderingKey().isEmpty()) {
pubsubMessage.setOrderingKey(outgoingMessage.message().getOrderingKey());
}

// N.B. publishTime and messageId are intentionally not set on the message that is published
pubsubMessages.add(pubsubMessage);
}
PublishRequest request = new PublishRequest().setMessages(pubsubMessages);
Expand Down
Expand Up @@ -25,7 +25,7 @@

/**
* Class representing a Pub/Sub message. Each message contains a single message payload, a map of
* attached attributes, and a message id.
* attached attributes, a message id and an ordering key.
*/
@SuppressWarnings({
"nullness" // TODO(https://github.com/apache/beam/issues/20497)
Expand All @@ -40,21 +40,34 @@ abstract static class Impl {

abstract @Nullable String getMessageId();

abstract @Nullable String getOrderingKey();

static Impl create(
byte[] payload, @Nullable Map<String, String> attributes, @Nullable String messageId) {
return new AutoValue_PubsubMessage_Impl(payload, attributes, messageId);
byte[] payload,
@Nullable Map<String, String> attributes,
@Nullable String messageId,
@Nullable String orderingKey) {
return new AutoValue_PubsubMessage_Impl(payload, attributes, messageId, orderingKey);
}
}

private Impl impl;

public PubsubMessage(byte[] payload, @Nullable Map<String, String> attributes) {
this(payload, attributes, null);
this(payload, attributes, null, null);
}

public PubsubMessage(
byte[] payload, @Nullable Map<String, String> attributes, @Nullable String messageId) {
impl = Impl.create(payload, attributes, messageId);
impl = Impl.create(payload, attributes, messageId, null);
}

/**
* Creates a message from a payload together with optional attributes, message id and ordering
* key.
*
* @param payload the message payload
* @param attributes the attribute map, or {@code null}
* @param messageId the message id, or {@code null}
* @param orderingKey the ordering key, or {@code null}
*/
public PubsubMessage(
byte[] payload,
@Nullable Map<String, String> attributes,
@Nullable String messageId,
@Nullable String orderingKey) {
impl = Impl.create(payload, attributes, messageId, orderingKey);
}

/** Returns the main PubSub message. */
Expand All @@ -78,6 +91,11 @@ public byte[] getPayload() {
return impl.getMessageId();
}

/**
* Returns the ordering key of the message, or {@code null} if none was supplied when the message
* was constructed.
*/
public @Nullable String getOrderingKey() {
return impl.getOrderingKey();
}

@Override
public String toString() {
return impl.toString();
Expand Down
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Map;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.coders.MapCoder;
import org.apache.beam.sdk.coders.NullableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.values.TypeDescriptor;

/**
 * A {@link Coder} for {@link PubsubMessage} that serializes the payload, attributes, message id
 * and ordering key, and keeps a slot in the encoding for the publish time.
 *
 * <p>Fields are written, and read back, in this order: payload, attributes, message id, publish
 * time, ordering key. The publish time is always written as the default {@link Timestamp} and is
 * discarded again on decode.
 */
@SuppressWarnings({
  "nullness" // TODO(https://github.com/apache/beam/issues/20497)
})
public class PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder
    extends CustomCoder<PubsubMessage> {
  // A message's payload cannot be null
  private static final Coder<byte[]> PAYLOAD_CODER = ByteArrayCoder.of();
  // A message's attributes can be null.
  private static final Coder<Map<String, String>> ATTRIBUTES_CODER =
      NullableCoder.of(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
  // A message's messageId cannot be null
  private static final Coder<String> MESSAGE_ID_CODER = StringUtf8Coder.of();
  // A message's publish time, populated by server
  private static final Coder<Timestamp> PUBLISH_TIME_CODER = ProtoCoder.of(Timestamp.class);
  // A message's ordering key can be null
  private static final Coder<String> ORDERING_KEY_CODER = NullableCoder.of(StringUtf8Coder.of());

  /** Returns a new instance of this coder. */
  public static PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder of() {
    return new PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder();
  }

  /** Returns a new instance of this coder; the type descriptor argument is not consulted. */
  public static Coder<PubsubMessage> of(TypeDescriptor<PubsubMessage> ignored) {
    return new PubsubMessageWithAttributesAndMessageIdAndOrderingKeyCoder();
  }

  /**
   * Writes {@code value} to {@code outStream} as payload, attributes, message id, a default
   * publish time and ordering key, in that order.
   */
  @Override
  public void encode(PubsubMessage value, OutputStream outStream) throws IOException {
    final byte[] body = value.getPayload();
    final Map<String, String> attrs = value.getAttributeMap();
    final String id = value.getMessageId();
    final String key = value.getOrderingKey();

    PAYLOAD_CODER.encode(body, outStream);
    ATTRIBUTES_CODER.encode(attrs, outStream);
    MESSAGE_ID_CODER.encode(id, outStream);
    // TODO(discuss what to do with publish_time field)
    PUBLISH_TIME_CODER.encode(Timestamp.getDefaultInstance(), outStream);
    ORDERING_KEY_CODER.encode(key, outStream);
  }

  /**
   * Reads a message back from {@code inStream} in the same field order that {@link #encode}
   * writes.
   */
  @Override
  public PubsubMessage decode(InputStream inStream) throws IOException {
    final byte[] body = PAYLOAD_CODER.decode(inStream);
    final Map<String, String> attrs = ATTRIBUTES_CODER.decode(inStream);
    final String id = MESSAGE_ID_CODER.decode(inStream);
    // The publish time has to be consumed to keep the stream aligned; its value is not used.
    PUBLISH_TIME_CODER.decode(inStream);
    final String key = ORDERING_KEY_CODER.decode(inStream);
    return new PubsubMessage(body, attrs, id, key);
  }
}