From b47133f512869c664c9c93074d0aef5b02662678 Mon Sep 17 00:00:00 2001 From: Carl McGraw Date: Sat, 22 Jul 2017 15:30:40 -0700 Subject: [PATCH 1/5] Added maxPublishBatchSize parameter to PubsubBoundedWriter class. --- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 29 +++++++++++++++++-- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 5f4027adce50..2015c43a7213 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -491,7 +491,9 @@ public static Read readAvros(Class clazz) { /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */ private static Write write() { - return new AutoValue_PubsubIO_Write.Builder().build(); + return new org.apache.beam.sdk.io.gcp.pubsub.AutoValue_PubsubIO_Write.Builder() + .setBatchSize(100) + .build(); } /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */ @@ -735,6 +737,9 @@ public abstract static class Write extends PTransform, PDone> @Nullable abstract ValueProvider getTopicProvider(); + /** the batch size for bulk submissions to pubsub. */ + abstract int getBatchSize(); + /** The name of the message attribute to publish message timestamps in. */ @Nullable abstract String getTimestampAttribute(); @@ -753,6 +758,8 @@ public abstract static class Write extends PTransform, PDone> abstract static class Builder { abstract Builder setTopicProvider(ValueProvider topicProvider); + abstract Builder setBatchSize(int batchSize); + abstract Builder setTimestampAttribute(String timestampAttribute); abstract Builder setIdAttribute(String idAttribute); @@ -779,6 +786,21 @@ public Write to(ValueProvider topic) { .build(); } + /** + * Writes to Pub/Sub are batched to efficiently send data. The value of the attribute will + * be a number representing the number of Pub/Sub messages to queue before sending off the + * bulk request. For example, if given 1000 the write sink will wait until 1000 messages have + * been received, or the pipeline has finished, whichever is first. + * + *

Pub/Sub has a limitation of 10mb per individual request/batch. This attribute was + * requested dynamic to allow larger Pub/Sub messages to be sent using this source. Thus + * allowing customizable batches and control of number of events before the 10mb size limit + * is hit. + */ + public Write withBatchSize(int batchSize) { + return toBuilder().setBatchSize(batchSize).build(); + } + /** * Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute * with the specified name. The value of the attribute will be a number representing the number @@ -851,12 +873,13 @@ public void populateDisplayData(DisplayData.Builder builder) { */ public class PubsubBoundedWriter extends DoFn { - private static final int MAX_PUBLISH_BATCH_SIZE = 100; private transient List output; private transient PubsubClient pubsubClient; + private int maxPublishBatchSize; @StartBundle public void startBundle(StartBundleContext c) throws IOException { + this.maxPublishBatchSize = getBatchSize(); this.output = new ArrayList<>(); // NOTE: idAttribute is ignored. this.pubsubClient = @@ -873,7 +896,7 @@ public void processElement(ProcessContext c) throws IOException { // NOTE: The record id is always null. output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null)); - if (output.size() >= MAX_PUBLISH_BATCH_SIZE) { + if (output.size() >= maxPublishBatchSize) { publish(); } } From fa2d11099a318cc352b6c1fb082a682a7b90a965 Mon Sep 17 00:00:00 2001 From: Carl McGraw Date: Sat, 22 Jul 2017 17:30:18 -0700 Subject: [PATCH 2/5] updated BoundedPubsubWriter to dynamically flush if queued messages exceed a pre-defined maximum batch byte size --- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 44 ++++++++++++++++--- 1 file changed, 39 insertions(+), 5 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index 2015c43a7213..acd749e10c4c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -31,6 +31,8 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import javax.annotation.Nullable; +import javax.naming.SizeLimitExceededException; + import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; @@ -492,7 +494,8 @@ public static Read readAvros(Class clazz) { /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */ private static Write write() { return new org.apache.beam.sdk.io.gcp.pubsub.AutoValue_PubsubIO_Write.Builder() - .setBatchSize(100) + .setBatchSize(1000) + .setMaxBatchBytesSize(1000000) .build(); } @@ -740,6 +743,9 @@ public abstract static class Write extends PTransform, PDone> /** the batch size for bulk submissions to pubsub. */ abstract int getBatchSize(); + /** the maximum batch size, by bytes. */ + abstract int getMaxBatchBytesSize(); + /** The name of the message attribute to publish message timestamps in. */ @Nullable abstract String getTimestampAttribute(); @@ -760,6 +766,8 @@ abstract static class Builder { abstract Builder setBatchSize(int batchSize); + abstract Builder setMaxBatchBytesSize(int maxBatchBytesSize); + abstract Builder setTimestampAttribute(String timestampAttribute); abstract Builder setIdAttribute(String idAttribute); @@ -801,6 +809,14 @@ public Write withBatchSize(int batchSize) { return toBuilder().setBatchSize(batchSize).build(); } + /** + * Writes to Pub/Sub are limited by 10mb in general. This attribute controls the maximum allowed + * bytes to be sent to Pub/Sub in a single batched message. + */ + public Write withMaxBatchBytesSize(int maxBatchBytesSize) { + return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build(); + } + /** * Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute * with the specified name. The value of the attribute will be a number representing the number @@ -875,12 +891,18 @@ public class PubsubBoundedWriter extends DoFn { private transient List output; private transient PubsubClient pubsubClient; + private transient int currentOutputBytes; + + private int maxPublishBatchByteSize; private int maxPublishBatchSize; @StartBundle public void startBundle(StartBundleContext c) throws IOException { + this.maxPublishBatchByteSize = getMaxBatchBytesSize(); this.maxPublishBatchSize = getBatchSize(); this.output = new ArrayList<>(); + this.currentOutputBytes = 0; + // NOTE: idAttribute is ignored. this.pubsubClient = FACTORY.newClient( @@ -888,17 +910,27 @@ public void startBundle(StartBundleContext c) throws IOException { } @ProcessElement - public void processElement(ProcessContext c) throws IOException { + public void processElement(ProcessContext c) throws IOException, SizeLimitExceededException { byte[] payload; PubsubMessage message = getFormatFn().apply(c.element()); payload = message.getPayload(); Map attributes = message.getAttributeMap(); - // NOTE: The record id is always null. - output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null)); - if (output.size() >= maxPublishBatchSize) { + if (payload.length > maxPublishBatchByteSize) { + String msg = String.format("Pub/Sub message size (%d) exceeded maximum batch size (%d)", + payload.length, maxPublishBatchByteSize); + throw new SizeLimitExceededException(msg); + } + + // Checking before adding the message stops us from violating the max bytes + if (((currentOutputBytes + payload.length) >= maxPublishBatchByteSize) + || (output.size() >= maxPublishBatchSize)) { publish(); } + + // NOTE: The record id is always null. + output.add(new OutgoingMessage(payload, attributes, c.timestamp().getMillis(), null)); + currentOutputBytes += payload.length; } @FinishBundle @@ -907,6 +939,7 @@ public void finishBundle() throws IOException { publish(); } output = null; + currentOutputBytes = 0; pubsubClient.close(); pubsubClient = null; } @@ -918,6 +951,7 @@ private void publish() throws IOException { PubsubClient.topicPathFromName(topic.project, topic.topic), output); checkState(n == output.size()); output.clear(); + currentOutputBytes = 0; } @Override From 9899d8af3864545c8fad971e1787157b45cb8477 Mon Sep 17 00:00:00 2001 From: Carl McGraw Date: Sat, 22 Jul 2017 18:17:03 -0700 Subject: [PATCH 3/5] updated UnboundedPubsubSink to accept new parameters. --- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 50 +++++++++++++------ .../io/gcp/pubsub/PubsubUnboundedSink.java | 12 +++++ 2 files changed, 47 insertions(+), 15 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index acd749e10c4c..e3cae3425b83 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -20,6 +20,7 @@ import static com.google.common.base.Preconditions.checkState; import com.google.auto.value.AutoValue; +import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableMap; import com.google.protobuf.Message; import java.io.IOException; @@ -493,10 +494,7 @@ public static Read readAvros(Class clazz) { /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */ private static Write write() { - return new org.apache.beam.sdk.io.gcp.pubsub.AutoValue_PubsubIO_Write.Builder() - .setBatchSize(1000) - .setMaxBatchBytesSize(1000000) - .build(); + return new AutoValue_PubsubIO_Write.Builder().build(); } /** Returns A {@link PTransform} that writes to a Google Cloud Pub/Sub stream. */ @@ -737,14 +735,19 @@ private PubsubIO() {} /** Implementation of {@link #write}. */ @AutoValue public abstract static class Write extends PTransform, PDone> { + private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = 1000000; + private static final int MAX_PUBLISH_BATCH_SIZE = 100; + @Nullable abstract ValueProvider getTopicProvider(); /** the batch size for bulk submissions to pubsub. */ - abstract int getBatchSize(); + @Nullable + abstract Integer getMaxBatchSize(); /** the maximum batch size, by bytes. */ - abstract int getMaxBatchBytesSize(); + @Nullable + abstract Integer getMaxBatchBytesSize(); /** The name of the message attribute to publish message timestamps in. */ @Nullable @@ -764,9 +767,9 @@ public abstract static class Write extends PTransform, PDone> abstract static class Builder { abstract Builder setTopicProvider(ValueProvider topicProvider); - abstract Builder setBatchSize(int batchSize); + abstract Builder setMaxBatchSize(Integer batchSize); - abstract Builder setMaxBatchBytesSize(int maxBatchBytesSize); + abstract Builder setMaxBatchBytesSize(Integer maxBatchBytesSize); abstract Builder setTimestampAttribute(String timestampAttribute); @@ -805,8 +808,8 @@ public Write to(ValueProvider topic) { * allowing customizable batches and control of number of events before the 10mb size limit * is hit. */ - public Write withBatchSize(int batchSize) { - return toBuilder().setBatchSize(batchSize).build(); + public Write withMaxBatchSize(int batchSize) { + return toBuilder().setMaxBatchSize(batchSize).build(); } /** @@ -857,9 +860,15 @@ public PDone expand(PCollection input) { if (getTopicProvider() == null) { throw new IllegalStateException("need to set the topic of a PubsubIO.Write transform"); } + switch (input.isBounded()) { case BOUNDED: - input.apply(ParDo.of(new PubsubBoundedWriter())); + input.apply(ParDo.of(new PubsubBoundedWriter( + MoreObjects.firstNonNull(getMaxBatchSize(), + MAX_PUBLISH_BATCH_SIZE), + MoreObjects.firstNonNull(getMaxBatchBytesSize(), + MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT) + ))); return PDone.in(input.getPipeline()); case UNBOUNDED: return input @@ -870,7 +879,12 @@ public PDone expand(PCollection input) { NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()), getTimestampAttribute(), getIdAttribute(), - 100 /* numShards */)); + 100 /* numShards */)), + MoreObjects.firstNonNull(getMaxBatchSize(), + PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE), + MoreObjects.firstNonNull(getMaxBatchBytesSize(), + PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES) + )); } throw new RuntimeException(); // cases are exhaustive. } @@ -888,7 +902,6 @@ public void populateDisplayData(DisplayData.Builder builder) { *

Public so can be suppressed by runners. */ public class PubsubBoundedWriter extends DoFn { - private transient List output; private transient PubsubClient pubsubClient; private transient int currentOutputBytes; @@ -896,10 +909,17 @@ public class PubsubBoundedWriter extends DoFn { private int maxPublishBatchByteSize; private int maxPublishBatchSize; + PubsubBoundedWriter(int maxPublishBatchSize, int maxPublishBatchByteSize) { + this.maxPublishBatchSize = maxPublishBatchSize; + this.maxPublishBatchByteSize = maxPublishBatchByteSize; + } + + PubsubBoundedWriter() { + this(MAX_PUBLISH_BATCH_SIZE, MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT); + } + @StartBundle public void startBundle(StartBundleContext c) throws IOException { - this.maxPublishBatchByteSize = getMaxBatchBytesSize(); - this.maxPublishBatchSize = getBatchSize(); this.output = new ArrayList<>(); this.currentOutputBytes = 0; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index e9c8e73b7f97..346085ca4129 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -367,6 +367,18 @@ public PubsubUnboundedSink( RecordIdMethod.RANDOM); } + public PubsubUnboundedSink( + PubsubClientFactory pubsubFactory, + ValueProvider topic, + String timestampAttribute, + String idAttribute, + int numShards, + int publishBatchSize, + int publishBatchBytes) { + this(pubsubFactory, topic, timestampAttribute, idAttribute, numShards, + publishBatchSize, publishBatchBytes, DEFAULT_MAX_LATENCY, + RecordIdMethod.RANDOM); + } /** Get the topic being written to. */ public TopicPath getTopic() { return topic.get(); From c83e853f8f0cd25c2c2d75fd94da42ced506ce96 Mon Sep 17 00:00:00 2001 From: Alexey Romanenko Date: Wed, 11 Jul 2018 11:34:28 +0200 Subject: [PATCH 4/5] Resolve merging conflicts --- .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 43 ++++++++++--------- .../io/gcp/pubsub/PubsubUnboundedSink.java | 31 +++++++------ 2 files changed, 41 insertions(+), 33 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index e3cae3425b83..dd561421b925 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -33,7 +33,6 @@ import java.util.regex.Pattern; import javax.annotation.Nullable; import javax.naming.SizeLimitExceededException; - import org.apache.beam.sdk.PipelineRunner; import org.apache.beam.sdk.coders.AvroCoder; import org.apache.beam.sdk.coders.Coder; @@ -798,15 +797,15 @@ public Write to(ValueProvider topic) { } /** - * Writes to Pub/Sub are batched to efficiently send data. The value of the attribute will - * be a number representing the number of Pub/Sub messages to queue before sending off the - * bulk request. For example, if given 1000 the write sink will wait until 1000 messages have - * been received, or the pipeline has finished, whichever is first. + * Writes to Pub/Sub are batched to efficiently send data. The value of the attribute will be a + * number representing the number of Pub/Sub messages to queue before sending off the bulk + * request. For example, if given 1000 the write sink will wait until 1000 messages have been + * received, or the pipeline has finished, whichever is first. * *

Pub/Sub has a limitation of 10mb per individual request/batch. This attribute was * requested dynamic to allow larger Pub/Sub messages to be sent using this source. Thus - * allowing customizable batches and control of number of events before the 10mb size limit - * is hit. + * allowing customizable batches and control of number of events before the 10mb size limit is + * hit. */ public Write withMaxBatchSize(int batchSize) { return toBuilder().setMaxBatchSize(batchSize).build(); @@ -863,12 +862,12 @@ public PDone expand(PCollection input) { switch (input.isBounded()) { case BOUNDED: - input.apply(ParDo.of(new PubsubBoundedWriter( - MoreObjects.firstNonNull(getMaxBatchSize(), - MAX_PUBLISH_BATCH_SIZE), - MoreObjects.firstNonNull(getMaxBatchBytesSize(), - MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT) - ))); + input.apply( + ParDo.of( + new PubsubBoundedWriter( + MoreObjects.firstNonNull(getMaxBatchSize(), MAX_PUBLISH_BATCH_SIZE), + MoreObjects.firstNonNull( + getMaxBatchBytesSize(), MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT)))); return PDone.in(input.getPipeline()); case UNBOUNDED: return input @@ -879,12 +878,12 @@ public PDone expand(PCollection input) { NestedValueProvider.of(getTopicProvider(), new TopicPathTranslator()), getTimestampAttribute(), getIdAttribute(), - 100 /* numShards */)), - MoreObjects.firstNonNull(getMaxBatchSize(), - PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE), - MoreObjects.firstNonNull(getMaxBatchBytesSize(), - PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES) - )); + 100 /* numShards */, + MoreObjects.firstNonNull( + getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE), + MoreObjects.firstNonNull( + getMaxBatchBytesSize(), + PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_BYTES))); } throw new RuntimeException(); // cases are exhaustive. } @@ -937,14 +936,16 @@ public void processElement(ProcessContext c) throws IOException, SizeLimitExceed Map attributes = message.getAttributeMap(); if (payload.length > maxPublishBatchByteSize) { - String msg = String.format("Pub/Sub message size (%d) exceeded maximum batch size (%d)", + String msg = + String.format( + "Pub/Sub message size (%d) exceeded maximum batch size (%d)", payload.length, maxPublishBatchByteSize); throw new SizeLimitExceededException(msg); } // Checking before adding the message stops us from violating the max bytes if (((currentOutputBytes + payload.length) >= maxPublishBatchByteSize) - || (output.size() >= maxPublishBatchSize)) { + || (output.size() >= maxPublishBatchSize)) { publish(); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index 346085ca4129..118b9317a2ce 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -84,10 +84,10 @@ */ public class PubsubUnboundedSink extends PTransform, PDone> { /** Default maximum number of messages per publish. */ - private static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000; + static final int DEFAULT_PUBLISH_BATCH_SIZE = 1000; /** Default maximum size of a publish batch, in bytes. */ - private static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000; + static final int DEFAULT_PUBLISH_BATCH_BYTES = 400000; /** Default longest delay between receiving a message and pushing it to Pubsub. */ private static final Duration DEFAULT_MAX_LATENCY = Duration.standardSeconds(2); @@ -368,16 +368,23 @@ public PubsubUnboundedSink( } public PubsubUnboundedSink( - PubsubClientFactory pubsubFactory, - ValueProvider topic, - String timestampAttribute, - String idAttribute, - int numShards, - int publishBatchSize, - int publishBatchBytes) { - this(pubsubFactory, topic, timestampAttribute, idAttribute, numShards, - publishBatchSize, publishBatchBytes, DEFAULT_MAX_LATENCY, - RecordIdMethod.RANDOM); + PubsubClientFactory pubsubFactory, + ValueProvider topic, + String timestampAttribute, + String idAttribute, + int numShards, + int publishBatchSize, + int publishBatchBytes) { + this( + pubsubFactory, + topic, + timestampAttribute, + idAttribute, + numShards, + publishBatchSize, + publishBatchBytes, + DEFAULT_MAX_LATENCY, + RecordIdMethod.RANDOM); } /** Get the topic being written to. */ public TopicPath getTopic() { From 806fa1919e62f54887cdae88cabf42f93d08bb9e Mon Sep 17 00:00:00 2001 From: Carl McGraw Date: Wed, 25 Jul 2018 17:06:18 -0700 Subject: [PATCH 5/5] set maximum batch size to 10mb (10 * 1024 * 1024) --- .../main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index dd561421b925..e39b44f95bd1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -734,7 +734,7 @@ private PubsubIO() {} /** Implementation of {@link #write}. */ @AutoValue public abstract static class Write extends PTransform, PDone> { - private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = 1000000; + private static final int MAX_PUBLISH_BATCH_BYTE_SIZE_DEFAULT = 10 * 1024 * 1024; private static final int MAX_PUBLISH_BATCH_SIZE = 100; @Nullable