From fbc67d4e6ca45e52eade7a65c1d933cff5273241 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 17 Mar 2021 14:26:19 -0400 Subject: [PATCH 1/2] fix: Two QOL issues with PubsubLiteIO - UUIDs generated randomly are not valid UTF-8 strings so they cannot be read from the CPS shim by default - Restrictions on bundle closing make it hard to test PSL with windowing pipelines --- .../cloud/pubsublite/beam/SubscribeTransform.java | 8 ++++++++ .../cloud/pubsublite/beam/SubscriberOptions.java | 15 ++++++++++++++- .../com/google/cloud/pubsublite/beam/Uuid.java | 6 +++++- 3 files changed, 27 insertions(+), 2 deletions(-) diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java index b9f635f84..16559e6ea 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java @@ -86,6 +86,14 @@ private SubscriptionPartitionProcessor newPartitionProcessor( private RestrictionTracker newRestrictionTracker( SubscriptionPartition subscriptionPartition, OffsetRange initial) { checkSubscription(subscriptionPartition); + if (options.allowSmallBundlesForTesting()) { + return new OffsetByteRangeTracker( + initial, + options.getBacklogReader(subscriptionPartition.partition()), + Stopwatch.createUnstarted(), + Duration.ZERO, + 0); + } return new OffsetByteRangeTracker( initial, options.getBacklogReader(subscriptionPartition.partition()), diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java index 066c331b4..b6f305ed5 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java +++ 
b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java @@ -69,6 +69,14 @@ public abstract class SubscriberOptions implements Serializable { */ public abstract Set partitions(); + /** + * FOR TESTING ONLY. + * + *

<p>Allow smaller bundles to be generated. Note that this tears down the client each time and + * will lead to significantly lower overall throughput. + */ + public abstract boolean allowSmallBundlesForTesting(); + /** * A factory to override subscriber creation entirely and delegate to another method. Primarily * useful for testing. @@ -95,7 +103,10 @@ public abstract class SubscriberOptions implements Serializable { public static Builder newBuilder() { Builder builder = new AutoValue_SubscriberOptions.Builder(); - return builder.setPartitions(ImmutableSet.of()).setFlowControlSettings(DEFAULT_FLOW_CONTROL); + return builder + .setPartitions(ImmutableSet.of()) + .setFlowControlSettings(DEFAULT_FLOW_CONTROL) + .setAllowSmallBundlesForTesting(false); } public abstract Builder toBuilder(); @@ -188,6 +199,8 @@ public abstract static class Builder { public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings); + public abstract Builder setAllowSmallBundlesForTesting(boolean allowSmallBundlesForTesting); + // Used in unit tests abstract Builder setSubscriberFactory(SubscriberFactory subscriberFactory); diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Uuid.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Uuid.java index b3ff52f54..1ddf6bd37 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Uuid.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/Uuid.java @@ -20,6 +20,7 @@ import com.google.protobuf.ByteString; import java.io.DataOutputStream; import java.io.IOException; +import java.util.Base64; import java.util.UUID; import org.apache.beam.sdk.coders.DefaultCoder; @@ -48,6 +49,9 @@ public static Uuid random() { } catch (IOException e) { throw new RuntimeException("Should never have an IOException since there is no io.", e); } - return Uuid.of(output.toByteString()); + // Encode to Base64 so the random UUIDs are valid if consumed from the
Cloud Pub/Sub client. + return Uuid.of( + ByteString.copyFrom( + Base64.getEncoder().encode(output.toByteString().asReadOnlyByteBuffer()))); } } From bff816b9a2826311961228a324cb0fc6d2654484 Mon Sep 17 00:00:00 2001 From: Daniel Collins Date: Wed, 17 Mar 2021 15:22:33 -0400 Subject: [PATCH 2/2] fix: Two QOL issues with PubsubLiteIO - UUIDs generated randomly are not valid UTF-8 strings so they cannot be read from the CPS shim by default - Restrictions on bundle closing make it hard to test PSL with windowing pipelines --- .../cloud/pubsublite/beam/SubscribeTransform.java | 15 +++------------ .../cloud/pubsublite/beam/SubscriberOptions.java | 15 +++++++++------ 2 files changed, 12 insertions(+), 18 deletions(-) diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java index 16559e6ea..dd9675e0e 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscribeTransform.java @@ -43,8 +43,6 @@ import org.joda.time.Duration; class SubscribeTransform extends PTransform> { - private static final Duration MAX_SLEEP_TIME = Duration.standardMinutes(1); - private final SubscriberOptions options; SubscribeTransform(SubscriberOptions options) { @@ -86,19 +84,11 @@ private SubscriptionPartitionProcessor newPartitionProcessor( private RestrictionTracker newRestrictionTracker( SubscriptionPartition subscriptionPartition, OffsetRange initial) { checkSubscription(subscriptionPartition); - if (options.allowSmallBundlesForTesting()) { - return new OffsetByteRangeTracker( - initial, - options.getBacklogReader(subscriptionPartition.partition()), - Stopwatch.createUnstarted(), - Duration.ZERO, - 0); - } return new OffsetByteRangeTracker( initial, options.getBacklogReader(subscriptionPartition.partition()), Stopwatch.createUnstarted(), - 
MAX_SLEEP_TIME.multipliedBy(3).dividedBy(4), + options.minBundleTimeout(), LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10)); } @@ -144,7 +134,8 @@ public PCollection expand(PBegin input) { return subscriptionPartitions.apply( ParDo.of( new PerSubscriptionPartitionSdf( - MAX_SLEEP_TIME, + // Ensure we read for at least 5 seconds more than the bundle timeout. + options.minBundleTimeout().plus(Duration.standardSeconds(5)), this::newInitialOffsetReader, this::newRestrictionTracker, this::newPartitionProcessor, diff --git a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java index b6f305ed5..69757efe8 100644 --- a/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java +++ b/pubsublite-beam-io/src/main/java/com/google/cloud/pubsublite/beam/SubscriberOptions.java @@ -42,6 +42,7 @@ import java.io.Serializable; import java.util.Set; import org.checkerframework.checker.nullness.qual.Nullable; +import org.joda.time.Duration; @AutoValue public abstract class SubscriberOptions implements Serializable { @@ -51,6 +52,8 @@ public abstract class SubscriberOptions implements Serializable { private static final long MEBIBYTE = 1L << 20; + private static final Duration MIN_BUNDLE_TIMEOUT = Duration.standardMinutes(1); + public static final FlowControlSettings DEFAULT_FLOW_CONTROL = FlowControlSettings.builder() .setMessagesOutstanding(Long.MAX_VALUE) @@ -70,12 +73,12 @@ public abstract class SubscriberOptions implements Serializable { public abstract Set partitions(); /** - * FOR TESTING ONLY. + * The minimum wall time to pass before allowing bundle closure. * - *

<p>Allow smaller bundles to be generated. Note that this tears down the client each time and - * will lead to significantly lower overall throughput. + *

<p>Setting this to too small of a value will result in increased compute costs and lower + * throughput per byte. Immediate timeouts (Duration.ZERO) may be useful for testing. */ - public abstract boolean allowSmallBundlesForTesting(); + public abstract Duration minBundleTimeout(); /** * A factory to override subscriber creation entirely and delegate to another method. Primarily @@ -106,7 +109,7 @@ public static Builder newBuilder() { return builder .setPartitions(ImmutableSet.of()) .setFlowControlSettings(DEFAULT_FLOW_CONTROL) - .setAllowSmallBundlesForTesting(false); + .setMinBundleTimeout(MIN_BUNDLE_TIMEOUT); } public abstract Builder toBuilder(); @@ -199,7 +202,7 @@ public abstract static class Builder { public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings); - public abstract Builder setAllowSmallBundlesForTesting(boolean allowSmallBundlesForTesting); + public abstract Builder setMinBundleTimeout(Duration minBundleTimeout); // Used in unit tests abstract Builder setSubscriberFactory(SubscriberFactory subscriberFactory);