Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import org.joda.time.Duration;

class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
private static final Duration MAX_SLEEP_TIME = Duration.standardMinutes(1);

private final SubscriberOptions options;

SubscribeTransform(SubscriberOptions options) {
Expand Down Expand Up @@ -90,7 +88,7 @@ private RestrictionTracker<OffsetRange, OffsetByteProgress> newRestrictionTracke
initial,
options.getBacklogReader(subscriptionPartition.partition()),
Stopwatch.createUnstarted(),
MAX_SLEEP_TIME.multipliedBy(3).dividedBy(4),
options.minBundleTimeout(),
LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10));
}

Expand Down Expand Up @@ -136,7 +134,8 @@ public PCollection<SequencedMessage> expand(PBegin input) {
return subscriptionPartitions.apply(
ParDo.of(
new PerSubscriptionPartitionSdf(
MAX_SLEEP_TIME,
// Ensure we read for at least 5 seconds more than the bundle timeout.
options.minBundleTimeout().plus(Duration.standardSeconds(5)),
this::newInitialOffsetReader,
this::newRestrictionTracker,
this::newPartitionProcessor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import java.io.Serializable;
import java.util.Set;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Duration;

@AutoValue
public abstract class SubscriberOptions implements Serializable {
Expand All @@ -51,6 +52,8 @@ public abstract class SubscriberOptions implements Serializable {

private static final long MEBIBYTE = 1L << 20;

private static final Duration MIN_BUNDLE_TIMEOUT = Duration.standardMinutes(1);

public static final FlowControlSettings DEFAULT_FLOW_CONTROL =
FlowControlSettings.builder()
.setMessagesOutstanding(Long.MAX_VALUE)
Expand All @@ -69,6 +72,14 @@ public abstract class SubscriberOptions implements Serializable {
*/
public abstract Set<Partition> partitions();

/**
* The minimum wall time to pass before allowing bundle closure.
*
* <p>Setting this to too small of a value will result in increased compute costs and lower
* throughput per byte. Immediate timeouts (Duration.ZERO) may be useful for testing.
*/
public abstract Duration minBundleTimeout();

/**
* A factory to override subscriber creation entirely and delegate to another method. Primarily
* useful for testing.
Expand All @@ -95,7 +106,10 @@ public abstract class SubscriberOptions implements Serializable {

public static Builder newBuilder() {
Builder builder = new AutoValue_SubscriberOptions.Builder();
return builder.setPartitions(ImmutableSet.of()).setFlowControlSettings(DEFAULT_FLOW_CONTROL);
return builder
.setPartitions(ImmutableSet.of())
.setFlowControlSettings(DEFAULT_FLOW_CONTROL)
.setMinBundleTimeout(MIN_BUNDLE_TIMEOUT);
}

public abstract Builder toBuilder();
Expand Down Expand Up @@ -188,6 +202,8 @@ public abstract static class Builder {

public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings);

public abstract Builder setMinBundleTimeout(Duration minBundleTimeout);

// Used in unit tests
abstract Builder setSubscriberFactory(SubscriberFactory subscriberFactory);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.protobuf.ByteString;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Base64;
import java.util.UUID;
import org.apache.beam.sdk.coders.DefaultCoder;

Expand Down Expand Up @@ -48,6 +49,9 @@ public static Uuid random() {
} catch (IOException e) {
throw new RuntimeException("Should never have an IOException since there is no io.", e);
}
return Uuid.of(output.toByteString());
// Encode to Base64 so the random UUIDs are valid if consumed from the Cloud Pub/Sub client.
return Uuid.of(
ByteString.copyFrom(
Base64.getEncoder().encode(output.toByteString().asReadOnlyByteBuffer())));
}
}