diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java index b9f635f84..dd9675e0e 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java @@ -43,8 +43,6 @@ import org.joda.time.Duration; class SubscribeTransform extends PTransform> { - private static final Duration MAX_SLEEP_TIME = Duration.standardMinutes(1); - private final SubscriberOptions options; SubscribeTransform(SubscriberOptions options) { @@ -90,7 +88,7 @@ private RestrictionTracker newRestrictionTracke initial, options.getBacklogReader(subscriptionPartition.partition()), Stopwatch.createUnstarted(), - MAX_SLEEP_TIME.multipliedBy(3).dividedBy(4), + options.minBundleTimeout(), LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10)); } @@ -136,7 +134,8 @@ public PCollection expand(PBegin input) { return subscriptionPartitions.apply( ParDo.of( new PerSubscriptionPartitionSdf( - MAX_SLEEP_TIME, + // Ensure we read for at least 5 seconds more than the bundle timeout. 
+ options.minBundleTimeout().plus(Duration.standardSeconds(5)), this::newInitialOffsetReader, this::newRestrictionTracker, this::newPartitionProcessor, diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java index 066c331b4..69757efe8 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java @@ -42,6 +42,7 @@ import java.io.Serializable; import java.util.Set; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; @AutoValue public abstract class SubscriberOptions implements Serializable { @@ -51,6 +52,8 @@ public abstract class SubscriberOptions implements Serializable { private static final long MEBIBYTE = 1L << 20; + private static final Duration MIN_BUNDLE_TIMEOUT = Duration.standardMinutes(1); + public static final FlowControlSettings DEFAULT_FLOW_CONTROL = FlowControlSettings.builder() .setMessagesOutstanding(Long.MAX_VALUE) @@ -69,6 +72,14 @@ public abstract class SubscriberOptions implements Serializable { */ public abstract Set partitions(); + /** + * The minimum wall time to pass before allowing bundle closure. + * + *

Setting this to too small a value will result in increased compute costs and lower + * throughput per byte. Immediate timeouts (Duration.ZERO) may be useful for testing. + */ + public abstract Duration minBundleTimeout(); + /** * A factory to override subscriber creation entirely and delegate to another method. Primarily * useful for testing. @@ -95,7 +106,10 @@ public abstract class SubscriberOptions implements Serializable { public static Builder newBuilder() { Builder builder = new AutoValue_SubscriberOptions.Builder(); - return builder.setPartitions(ImmutableSet.of()).setFlowControlSettings(DEFAULT_FLOW_CONTROL); + return builder + .setPartitions(ImmutableSet.of()) + .setFlowControlSettings(DEFAULT_FLOW_CONTROL) + .setMinBundleTimeout(MIN_BUNDLE_TIMEOUT); } public abstract Builder toBuilder(); @@ -188,6 +202,8 @@ public abstract static class Builder { public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings); + public abstract Builder setMinBundleTimeout(Duration minBundleTimeout); + // Used in unit tests abstract Builder setSubscriberFactory(SubscriberFactory subscriberFactory); diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Uuid.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Uuid.java index b3ff52f54..1ddf6bd37 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Uuid.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Uuid.java @@ -20,6 +20,7 @@ import com.google.protobuf.ByteString; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Base64; import java.util.UUID; import org.apache.beam.sdk.coders.DefaultCoder; @@ -48,6 +49,9 @@ public static Uuid random() { } catch (IOException e) { throw new RuntimeException("Should never have an IOException since there is no io.", e); } - return Uuid.of(output.toByteString()); + // Encode to Base64 so the random UUIDs are valid if consumed from the Cloud 
Pub/Sub client. + return Uuid.of( + ByteString.copyFrom( + Base64.getEncoder().encode(output.toByteString().asReadOnlyByteBuffer()))); } }