Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static com.google.common.base.Preconditions.checkState;

import com.google.auto.value.AutoValue;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.Message;
import java.io.IOException;
Expand All @@ -31,6 +32,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;
import javax.naming.SizeLimitExceededException;
import org.apache.beam.sdk.PipelineRunner;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.coders.Coder;
Expand Down Expand Up @@ -732,9 +734,20 @@ private PubsubIO() {}
/** Implementation of {@link #write}. */
@AutoValue
public abstract static class Write<T> extends PTransform<PCollection<T>, PDone> {
private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = 10 * 1024 * 1024;
private static final int MAX_PUBLISH_BATCH_SIZE = 100;

@Nullable
abstract ValueProvider<PubsubTopic> getTopicProvider();

/** the batch size for bulk submissions to pubsub. */
@Nullable
abstract Integer getMaxBatchSize();

/** the maximum batch size, by bytes. */
@Nullable
abstract Integer getMaxBatchBytesSize();

/** The name of the message attribute to publish message timestamps in. */
@Nullable
abstract String getTimestampAttribute();
Expand All @@ -753,6 +766,10 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone>
abstract static class Builder<T> {
abstract Builder<T> setTopicProvider(ValueProvider<PubsubTopic> topicProvider);

abstract Builder<T> setMaxBatchSize(Integer batchSize);

abstract Builder<T> setMaxBatchBytesSize(Integer maxBatchBytesSize);

abstract Builder<T> setTimestampAttribute(String timestampAttribute);

abstract Builder<T> setIdAttribute(String idAttribute);
Expand All @@ -779,6 +796,29 @@ public Write<T> to(ValueProvider<String> topic) {
.build();
}

/**
 * Sets the maximum number of messages to buffer before issuing a bulk publish request to
 * Pub/Sub. For example, with a value of 1000 the write sink waits until 1000 messages have
 * been queued, or the bundle has finished, whichever comes first, before sending the batch.
 *
 * <p>Pub/Sub limits an individual publish request/batch to 10MB. Making the batch count
 * configurable lets pipelines that publish large messages choose a smaller count so the
 * byte limit is not hit mid-batch.
 */
public Write<T> withMaxBatchSize(int batchSize) {
return toBuilder().setMaxBatchSize(batchSize).build();
}

/**
 * Sets the maximum number of payload bytes to buffer before issuing a bulk publish request
 * to Pub/Sub. Pub/Sub rejects publish requests larger than 10MB, so this cap keeps a single
 * batched request under that limit.
 */
public Write<T> withMaxBatchBytesSize(int maxBatchBytesSize) {
return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build();
}

/**
* Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute
* with the specified name. The value of the attribute will be a number representing the number
Expand Down Expand Up @@ -819,9 +859,15 @@ public PDone expand(PCollection<T> input) {
if (getTopicProvider() == null) {
throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform");
}

switch (input.isBounded()) {
case BOUNDED:
input.apply(ParDo.of(new PubsubBoundedWriter()));
input.apply(
ParDo.of(
new PubsubBoundedWriter(
MoreObjects.firstNonNull(getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE),
MoreObjects.firstNonNull(
getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT))));
return PDone.in(input.getPipeline());
case UNBOUNDED:
return input
Expand All @@ -832,7 +878,12 @@ public PDone expand(PCollection<T> input) {
NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()),
getTimestampAttribute(),
getIdAttribute(),
100 /* numShards */));
100 /* numShards */,
MoreObjects.firstNonNull(
getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE),
MoreObjects.firstNonNull(
getMaxBatchBytesSize(),
PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES)));
}
throw new RuntimeException(); // cases are exhaustive.
}
Expand All @@ -850,32 +901,57 @@ public void populateDisplayData(DisplayData.Builder builder) {
* <p>Public so can be suppressed by runners.
*/
public class PubsubBoundedWriter extends DoFn<T, Void> {

// Default per-batch message-count cap used when no explicit maxPublishBatchSize is given.
private static final int MAX_PUBLISH_BATCH_SIZE = 100;
// Messages buffered for the current batch; recreated each bundle (transient: not serialized).
private transient List<OutgoingMessage> output;
private transient PubsubClient pubsubClient;
// Running total of payload bytes currently buffered in output.
private transient int currentOutputBytes;

// Per-batch caps; fixed at construction time and serialized with the DoFn.
private int maxPublishBatchByteSize;
private int maxPublishBatchSize;

/**
 * Creates a writer with explicit batching limits.
 *
 * @param maxPublishBatchSize maximum number of messages per publish request
 * @param maxPublishBatchByteSize maximum total payload bytes per publish request
 */
PubsubBoundedWriter(int maxPublishBatchSize, int maxPublishBatchByteSize) {
this.maxPublishBatchSize = maxPublishBatchSize;
this.maxPublishBatchByteSize = maxPublishBatchByteSize;
}

/** Creates a writer with the default batching limits. */
PubsubBoundedWriter() {
this(MAX_PUBLISH_BATCH_SIZE, MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT);
}

@StartBundle
public void startBundle(StartBundleContext c) throws IOException {
// Fresh buffer and byte counter for this bundle.
this.output = new ArrayList<>();
this.currentOutputBytes = 0;

// NOTE: idAttribute is ignored.
this.pubsubClient =
FACTORY.newClient(
getTimestampAttribute(), null, c.getPipelineOptions().as(PubsubOptions.class));
}

@ProcessElement
public void processElement(ProcessContext c) throws IOException {
public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException {
byte[] payload;
PubsubMessage message = getFormatFn().apply(c.element());
payload = message.getPayload();
Map<String, String> attributes = message.getAttributeMap();
// NOTE: The record id is always null.
output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null));

if (output.size() >= MAX_PUBLISH_BATCH_SIZE) {
if (payload.length > maxPublishBatchByteSize) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to default to success here and simply publish this message. Otherwise this could be considered a backwards-incompatible change, as it changes IO semantics.

String msg =
String.format(
"Pub/Sub message size (%d) exceeded maximum batch size (%d)",
payload.length, maxPublishBatchByteSize);
throw new SizeLimitExceededException(msg);
}

// Checking before adding the message stops us from violating the max bytes
if (((currentOutputBytes + payload.length) >= maxPublishBatchByteSize)
|| (output.size() >= maxPublishBatchSize)) {
publish();
}

// NOTE: The record id is always null.
output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null));
currentOutputBytes += payload.length;
}

@FinishBundle
Expand All @@ -884,6 +960,7 @@ public void finishBundle() throws IOException {
publish();
}
output = null;
currentOutputBytes = 0;
pubsubClient.close();
pubsubClient = null;
}
Expand All @@ -895,6 +972,7 @@ private void publish() throws IOException {
PubsubClient.topicPathFromName(topic.project, topic.topic), output);
checkState(n == output.size());
output.clear();
currentOutputBytes = 0;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,10 @@
*/
public class PubsubUnboundedSink extends PTransform<PCollection<PubsubMessage>, PDone> {
/** Default maximum number of messages per publish. */
private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;
static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000;

/** Default maximum size of a publish batch, in bytes. */
private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;
static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000;

/** Default longest delay between receiving a message and pushing it to Pubsub. */
private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2);
Expand Down Expand Up @@ -367,6 +367,25 @@ public PubsubUnboundedSink(
RecordIdMethod.RANDOM);
}

/**
 * Creates a sink with configurable publish batching limits, using the default maximum
 * latency and a random record-id method.
 *
 * @param publishBatchSize maximum number of messages per publish request
 * @param publishBatchBytes maximum total bytes per publish request
 */
public PubsubUnboundedSink(
PubsubClientFactory pubsubFactory,
ValueProvider<TopicPath> topic,
String timestampAttribute,
String idAttribute,
int numShards,
int publishBatchSize,
int publishBatchBytes) {
this(
pubsubFactory,
topic,
timestampAttribute,
idAttribute,
numShards,
publishBatchSize,
publishBatchBytes,
DEFAULT_MAX_LATENCY,
RecordIdMethod.RANDOM);
}
/** Get the topic being written to. */
public TopicPath getTopic() {
return topic.get();
Expand Down