From 8a646aaa95e79a3f33dff204a659c8a221069ffe Mon Sep 17 00:00:00 2001 From: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com> Date: Fri, 10 Sep 2021 14:30:21 -0400 Subject: [PATCH] Port changes from Pub/Sub Lite to beam (#15418) * Port all changes from the Pub/Sub Lite repo back to beam. Beam will be the canonical source for this IO in the future per offline discussion. Also add the other direction of helper to CloudPubsubTransforms and add an integration test. * remove fixed TODO * Fixes to ReadWriteIT to work around Create or DirectRunner bug. * fix racy test --- .../beam/gradle/BeamModulePlugin.groovy | 2 +- .../sdk/io/gcp/pubsub/PubsubMessages.java | 42 ++- .../io/gcp/pubsublite/CloudPubsubChecks.java | 51 ---- .../gcp/pubsublite/CloudPubsubTransforms.java | 104 +++++++ .../ManagedBacklogReaderFactory.java | 33 +++ .../ManagedBacklogReaderFactoryImpl.java | 68 +++++ .../io/gcp/pubsublite/OffsetByteRange.java | 38 +++ .../gcp/pubsublite/OffsetByteRangeCoder.java | 63 ++++ .../pubsublite/OffsetByteRangeTracker.java | 66 ++--- .../pubsublite/PerServerPublisherCache.java | 4 + .../PerSubscriptionPartitionSdf.java | 57 ++-- .../sdk/io/gcp/pubsublite/PublisherCache.java | 62 ++-- .../sdk/io/gcp/pubsublite/Publishers.java | 72 ++++- .../sdk/io/gcp/pubsublite/PubsubLiteIO.java | 2 +- .../sdk/io/gcp/pubsublite/PubsubLiteSink.java | 10 +- .../io/gcp/pubsublite/SubscribeTransform.java | 44 ++- .../io/gcp/pubsublite/SubscriberOptions.java | 26 +- .../SubscriptionPartitionLoader.java | 4 +- ...SubscriptionPartitionProcessorFactory.java | 3 +- .../SubscriptionPartitionProcessorImpl.java | 23 +- .../TopicBacklogReaderSettings.java | 6 +- .../gcp/pubsublite/TrackerWithProgress.java | 24 ++ .../OffsetByteRangeTrackerTest.java | 34 ++- .../PerSubscriptionPartitionSdfTest.java | 70 ++++- .../sdk/io/gcp/pubsublite/ReadWriteIT.java | 280 ++++++++++++++++++ ...ubscriptionPartitionProcessorImplTest.java | 49 +-- 26 files changed, 939 insertions(+), 298 
deletions(-) delete mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java create mode 100644 sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java create mode 100644 sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java diff --git a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy index 4cdf4ae8f98fc..67fe8b894c681 100644 --- a/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy +++ b/buildSrc/src/main/groovy/org/apache/beam/gradle/BeamModulePlugin.groovy @@ -446,7 +446,7 @@ class BeamModulePlugin implements Plugin { def errorprone_version = "2.3.4" def google_clients_version = "1.32.1" def google_cloud_bigdataoss_version = "2.2.2" - def google_cloud_pubsublite_version = "0.13.2" + def google_cloud_pubsublite_version = "1.0.4" def google_code_gson_version = "2.8.6" def google_oauth_clients_version = "1.31.0" // Try to keep grpc_version consistent with gRPC version in google_cloud_platform_libraries_bom diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java index b0cc68193184f..bf6a28863b01c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessages.java @@ -24,25 +24,36 @@ import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap; /** Common util functions for converting between PubsubMessage proto and {@link PubsubMessage}. */ -public class PubsubMessages { +public final class PubsubMessages { + private PubsubMessages() {} + + public static com.google.pubsub.v1.PubsubMessage toProto(PubsubMessage input) { + Map attributes = input.getAttributeMap(); + com.google.pubsub.v1.PubsubMessage.Builder message = + com.google.pubsub.v1.PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(input.getPayload())); + // TODO(BEAM-8085) this should not be null + if (attributes != null) { + message.putAllAttributes(attributes); + } + String messageId = input.getMessageId(); + if (messageId != null) { + message.setMessageId(messageId); + } + return message.build(); + } + + public static PubsubMessage fromProto(com.google.pubsub.v1.PubsubMessage input) { + return new PubsubMessage( + input.getData().toByteArray(), input.getAttributesMap(), input.getMessageId()); + } + // Convert the PubsubMessage to a PubsubMessage proto, then return its serialized representation. 
public static class ParsePayloadAsPubsubMessageProto implements SerializableFunction { @Override public byte[] apply(PubsubMessage input) { - Map attributes = input.getAttributeMap(); - com.google.pubsub.v1.PubsubMessage.Builder message = - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(input.getPayload())); - // TODO(BEAM-8085) this should not be null - if (attributes != null) { - message.putAllAttributes(attributes); - } - String messageId = input.getMessageId(); - if (messageId != null) { - message.setMessageId(messageId); - } - return message.build().toByteArray(); + return toProto(input).toByteArray(); } } @@ -54,8 +65,7 @@ public PubsubMessage apply(byte[] input) { try { com.google.pubsub.v1.PubsubMessage message = com.google.pubsub.v1.PubsubMessage.parseFrom(input); - return new PubsubMessage( - message.getData().toByteArray(), message.getAttributesMap(), message.getMessageId()); + return fromProto(message); } catch (InvalidProtocolBufferException e) { throw new RuntimeException("Could not decode Pubsub message", e); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java deleted file mode 100644 index 6dc15166666ab..0000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubChecks.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.pubsublite; - -import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; - -import com.google.cloud.pubsublite.Message; -import com.google.cloud.pubsublite.proto.PubSubMessage; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TypeDescriptor; - -/** - * A class providing a conversion validity check between Cloud Pub/Sub and Pub/Sub Lite message - * types. - */ -public final class CloudPubsubChecks { - private CloudPubsubChecks() {} - - /** - * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the - * standard transformation methods in the client library. - * - *

Will fail the pipeline if a message has multiple attributes per key. - */ - public static PTransform, PCollection> - ensureUsableAsCloudPubsub() { - return MapElements.into(TypeDescriptor.of(PubSubMessage.class)) - .via( - message -> { - Object unused = toCpsPublishTransformer().transform(Message.fromProto(message)); - return message; - }); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java new file mode 100644 index 0000000000000..1140c11c2767f --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/CloudPubsubTransforms.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.fromCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsPublishTransformer; +import static com.google.cloud.pubsublite.cloudpubsub.MessageTransforms.toCpsSubscribeTransformer; + +import com.google.cloud.pubsublite.Message; +import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor; +import com.google.cloud.pubsublite.proto.PubSubMessage; +import com.google.cloud.pubsublite.proto.SequencedMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessages; +import org.apache.beam.sdk.transforms.MapElements; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TypeDescriptor; + +/** A class providing transforms between Cloud Pub/Sub and Pub/Sub Lite message types. */ +public final class CloudPubsubTransforms { + private CloudPubsubTransforms() {} + /** + * Ensure that all messages that pass through can be converted to Cloud Pub/Sub messages using the + * standard transformation methods in the client library. + * + *

Will fail the pipeline if a message has multiple attributes per key. + */ + public static PTransform, PCollection> + ensureUsableAsCloudPubsub() { + return new PTransform, PCollection>() { + @Override + public PCollection expand(PCollection input) { + return input.apply( + MapElements.into(TypeDescriptor.of(PubSubMessage.class)) + .via( + message -> { + Object unused = + toCpsPublishTransformer().transform(Message.fromProto(message)); + return message; + })); + } + }; + } + + /** + * Transform messages read from Pub/Sub Lite to their equivalent Cloud Pub/Sub Message that would + * have been read from PubsubIO. + * + *

Will fail the pipeline if a message has multiple attributes per map key. + */ + public static PTransform, PCollection> + toCloudPubsubMessages() { + return new PTransform, PCollection>() { + @Override + public PCollection expand(PCollection input) { + return input.apply( + MapElements.into(TypeDescriptor.of(PubsubMessage.class)) + .via( + message -> + PubsubMessages.fromProto( + toCpsSubscribeTransformer() + .transform( + com.google.cloud.pubsublite.SequencedMessage.fromProto( + message))))); + } + }; + } + + /** + * Transform messages publishable using PubsubIO to their equivalent Pub/Sub Lite publishable + * message. + */ + public static PTransform, PCollection> + fromCloudPubsubMessages() { + return new PTransform, PCollection>() { + @Override + public PCollection expand(PCollection input) { + return input.apply( + MapElements.into(TypeDescriptor.of(PubSubMessage.class)) + .via( + message -> + fromCpsPublishTransformer(KeyExtractor.DEFAULT) + .transform(PubsubMessages.toProto(message)) + .toProto())); + } + }; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java new file mode 100644 index 0000000000000..de0cf433ff33a --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactory.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import java.io.Serializable; + +/** + * A ManagedBacklogReaderFactory produces TopicBacklogReaders and tears down any produced readers + * when it is itself closed. + * + *

close() should never be called on produced readers. + */ +public interface ManagedBacklogReaderFactory extends AutoCloseable, Serializable { + TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition); + + @Override + void close(); +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java new file mode 100644 index 0000000000000..9a337bfdb7841 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/ManagedBacklogReaderFactoryImpl.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.api.gax.rpc.ApiException; +import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse; +import java.util.HashMap; +import java.util.Map; +import javax.annotation.concurrent.GuardedBy; +import org.apache.beam.sdk.transforms.SerializableFunction; + +public class ManagedBacklogReaderFactoryImpl implements ManagedBacklogReaderFactory { + private final SerializableFunction newReader; + + @GuardedBy("this") + private final Map readers = new HashMap<>(); + + ManagedBacklogReaderFactoryImpl( + SerializableFunction newReader) { + this.newReader = newReader; + } + + private static final class NonCloseableTopicBacklogReader implements TopicBacklogReader { + private final TopicBacklogReader underlying; + + NonCloseableTopicBacklogReader(TopicBacklogReader underlying) { + this.underlying = underlying; + } + + @Override + public ComputeMessageStatsResponse computeMessageStats(Offset offset) throws ApiException { + return underlying.computeMessageStats(offset); + } + + @Override + public void close() { + throw new IllegalArgumentException( + "Cannot call close() on a reader returned from ManagedBacklogReaderFactory."); + } + } + + @Override + public synchronized TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) { + return new NonCloseableTopicBacklogReader( + readers.computeIfAbsent(subscriptionPartition, newReader::apply)); + } + + @Override + public synchronized void close() { + readers.values().forEach(TopicBacklogReader::close); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java new file mode 100644 index 0000000000000..b39d87e6e1f00 --- /dev/null +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRange.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import com.google.auto.value.AutoValue; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.io.range.OffsetRange; + +@AutoValue +@DefaultCoder(OffsetByteRangeCoder.class) +abstract class OffsetByteRange { + abstract OffsetRange getRange(); + + abstract long getByteCount(); + + static OffsetByteRange of(OffsetRange range, long byteCount) { + return new AutoValue_OffsetByteRange(range, byteCount); + } + + static OffsetByteRange of(OffsetRange range) { + return of(range, 0); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java new file mode 100644 index 0000000000000..076cda13e1936 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeCoder.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import org.apache.beam.sdk.coders.AtomicCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderProvider; +import org.apache.beam.sdk.coders.CoderProviders; +import org.apache.beam.sdk.coders.DelegateCoder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.io.range.OffsetRange; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.TypeDescriptor; + +public class OffsetByteRangeCoder extends AtomicCoder { + private static final Coder CODER = + DelegateCoder.of( + KvCoder.of(OffsetRange.Coder.of(), VarLongCoder.of()), + OffsetByteRangeCoder::toKv, + OffsetByteRangeCoder::fromKv); + + private static KV toKv(OffsetByteRange value) { + return KV.of(value.getRange(), value.getByteCount()); + } + + private static OffsetByteRange fromKv(KV kv) { + return OffsetByteRange.of(kv.getKey(), kv.getValue()); + } + + @Override + public void encode(OffsetByteRange value, OutputStream outStream) throws IOException { + CODER.encode(value, outStream); + } + + 
@Override + public OffsetByteRange decode(InputStream inStream) throws IOException { + return CODER.decode(inStream); + } + + public static CoderProvider getCoderProvider() { + return CoderProviders.forCoder( + TypeDescriptor.of(OffsetByteRange.class), new OffsetByteRangeCoder()); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java index 608af8fea1896..da9aaaa03ac31 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTracker.java @@ -26,8 +26,6 @@ import java.util.concurrent.TimeUnit; import javax.annotation.Nullable; import org.apache.beam.sdk.io.range.OffsetRange; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; -import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; import org.apache.beam.sdk.transforms.splittabledofn.SplitResult; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch; import org.joda.time.Duration; @@ -44,35 +42,33 @@ * received. IMPORTANT: minTrackingTime must be strictly smaller than the SDF read timeout when it * would return ProcessContinuation.resume(). 
*/ -class OffsetByteRangeTracker extends RestrictionTracker - implements HasProgress { - private final TopicBacklogReader backlogReader; +class OffsetByteRangeTracker extends TrackerWithProgress { + private final TopicBacklogReader unownedBacklogReader; private final Duration minTrackingTime; private final long minBytesReceived; private final Stopwatch stopwatch; - private OffsetRange range; + private OffsetByteRange range; private @Nullable Long lastClaimed; - private long byteCount = 0; public OffsetByteRangeTracker( - OffsetRange range, - TopicBacklogReader backlogReader, + OffsetByteRange range, + TopicBacklogReader unownedBacklogReader, Stopwatch stopwatch, Duration minTrackingTime, long minBytesReceived) { - checkArgument(range.getTo() == Long.MAX_VALUE); - this.backlogReader = backlogReader; + checkArgument( + range.getRange().getTo() == Long.MAX_VALUE, + "May only construct OffsetByteRangeTracker with an unbounded range with no progress."); + checkArgument( + range.getByteCount() == 0L, + "May only construct OffsetByteRangeTracker with an unbounded range with no progress."); + this.unownedBacklogReader = unownedBacklogReader; this.minTrackingTime = minTrackingTime; this.minBytesReceived = minBytesReceived; this.stopwatch = stopwatch.reset().start(); this.range = range; } - @Override - public void finalize() { - this.backlogReader.close(); - } - @Override public IsBounded isBounded() { return IsBounded.UNBOUNDED; @@ -87,32 +83,32 @@ public boolean tryClaim(OffsetByteProgress position) { position.lastOffset().value(), lastClaimed); checkArgument( - toClaim >= range.getFrom(), + toClaim >= range.getRange().getFrom(), "Trying to claim offset %s before start of the range %s", toClaim, range); // split() has already been called, truncating this range. No more offsets may be claimed. 
- if (range.getTo() != Long.MAX_VALUE) { - boolean isRangeEmpty = range.getTo() == range.getFrom(); - boolean isValidClosedRange = nextOffset() == range.getTo(); + if (range.getRange().getTo() != Long.MAX_VALUE) { + boolean isRangeEmpty = range.getRange().getTo() == range.getRange().getFrom(); + boolean isValidClosedRange = nextOffset() == range.getRange().getTo(); checkState( isRangeEmpty || isValidClosedRange, "Violated class precondition: offset range improperly split. Please report a beam bug."); return false; } lastClaimed = toClaim; - byteCount += position.batchBytes(); + range = OffsetByteRange.of(range.getRange(), range.getByteCount() + position.batchBytes()); return true; } @Override - public OffsetRange currentRestriction() { + public OffsetByteRange currentRestriction() { return range; } private long nextOffset() { checkState(lastClaimed == null || lastClaimed < Long.MAX_VALUE); - return lastClaimed == null ? currentRestriction().getFrom() : lastClaimed + 1; + return lastClaimed == null ? currentRestriction().getRange().getFrom() : lastClaimed + 1; } /** @@ -124,29 +120,33 @@ private boolean receivedEnough() { if (duration.isLongerThan(minTrackingTime)) { return true; } - if (byteCount >= minBytesReceived) { + if (currentRestriction().getByteCount() >= minBytesReceived) { return true; } return false; } @Override - public @Nullable SplitResult trySplit(double fractionOfRemainder) { + public @Nullable SplitResult trySplit(double fractionOfRemainder) { // Cannot split a bounded range. This should already be completely claimed. 
- if (range.getTo() != Long.MAX_VALUE) { + if (range.getRange().getTo() != Long.MAX_VALUE) { return null; } if (!receivedEnough()) { return null; } - range = new OffsetRange(currentRestriction().getFrom(), nextOffset()); - return SplitResult.of(this.range, new OffsetRange(nextOffset(), Long.MAX_VALUE)); + range = + OffsetByteRange.of( + new OffsetRange(currentRestriction().getRange().getFrom(), nextOffset()), + range.getByteCount()); + return SplitResult.of( + this.range, OffsetByteRange.of(new OffsetRange(nextOffset(), Long.MAX_VALUE), 0)); } @Override @SuppressWarnings("unboxing.of.nullable") public void checkDone() throws IllegalStateException { - if (range.getFrom() == range.getTo()) { + if (range.getRange().getFrom() == range.getRange().getTo()) { return; } checkState( @@ -155,18 +155,18 @@ public void checkDone() throws IllegalStateException { range); long lastClaimedNotNull = checkNotNull(lastClaimed); checkState( - lastClaimedNotNull >= range.getTo() - 1, + lastClaimedNotNull >= range.getRange().getTo() - 1, "Last attempted offset was %s in range %s, claiming work in [%s, %s) was not attempted", lastClaimedNotNull, range, lastClaimedNotNull + 1, - range.getTo()); + range.getRange().getTo()); } @Override public Progress getProgress() { ComputeMessageStatsResponse stats = - this.backlogReader.computeMessageStats(Offset.of(nextOffset())); - return Progress.from(byteCount, stats.getMessageBytes()); + this.unownedBacklogReader.computeMessageStats(Offset.of(nextOffset())); + return Progress.from(range.getByteCount(), stats.getMessageBytes()); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java index 623e20c09b458..d7526d88e0891 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java +++ 
b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerServerPublisherCache.java @@ -27,4 +27,8 @@ final class PerServerPublisherCache { private PerServerPublisherCache() {} static final PublisherCache PUBLISHER_CACHE = new PublisherCache(); + + static { + Runtime.getRuntime().addShutdownHook(new Thread(PUBLISHER_CACHE::close)); + } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java index a9f7a439f0daa..fdf7920298632 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdf.java @@ -17,13 +17,12 @@ */ package org.apache.beam.sdk.io.gcp.pubsublite; -import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; -import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown; import com.google.cloud.pubsublite.Offset; +import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.wire.Committer; import com.google.cloud.pubsublite.proto.SequencedMessage; -import java.util.concurrent.ExecutionException; import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.SerializableBiFunction; @@ -35,31 +34,35 @@ class PerSubscriptionPartitionSdf extends DoFn { private final Duration maxSleepTime; + private final ManagedBacklogReaderFactory backlogReaderFactory; private final SubscriptionPartitionProcessorFactory processorFactory; private final SerializableFunction offsetReaderFactory; - private final 
SerializableBiFunction< - SubscriptionPartition, OffsetRange, RestrictionTracker> + private final SerializableBiFunction trackerFactory; private final SerializableFunction committerFactory; PerSubscriptionPartitionSdf( Duration maxSleepTime, + ManagedBacklogReaderFactory backlogReaderFactory, SerializableFunction offsetReaderFactory, - SerializableBiFunction< - SubscriptionPartition, - OffsetRange, - RestrictionTracker> + SerializableBiFunction trackerFactory, SubscriptionPartitionProcessorFactory processorFactory, SerializableFunction committerFactory) { this.maxSleepTime = maxSleepTime; + this.backlogReaderFactory = backlogReaderFactory; this.processorFactory = processorFactory; this.offsetReaderFactory = offsetReaderFactory; this.trackerFactory = trackerFactory; this.committerFactory = committerFactory; } + @Teardown + public void teardown() { + backlogReaderFactory.close(); + } + @GetInitialWatermarkEstimatorState public Instant getInitialWatermarkState() { return Instant.EPOCH; @@ -72,7 +75,7 @@ public MonotonicallyIncreasing newWatermarkEstimator(@WatermarkEstimatorState In @ProcessElement public ProcessContinuation processElement( - RestrictionTracker tracker, + RestrictionTracker tracker, @Element SubscriptionPartition subscriptionPartition, OutputReceiver receiver) throws Exception { @@ -83,38 +86,44 @@ public ProcessContinuation processElement( processor .lastClaimed() .ifPresent( - lastClaimedOffset -> - /* TODO(boyuanzz): When default dataflow can use finalizers, undo this. - finalizer.afterBundleCommit( - Instant.ofEpochMilli(Long.MAX_VALUE), - () -> */ { + lastClaimedOffset -> { Committer committer = committerFactory.apply(subscriptionPartition); committer.startAsync().awaitRunning(); // Commit the next-to-deliver offset. 
try { committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get(); - } catch (ExecutionException e) { - throw toCanonical(checkArgumentNotNull(e.getCause())).underlying; } catch (Exception e) { - throw toCanonical(e).underlying; + throw ExtractStatus.toCanonical(e).underlying; } - committer.stopAsync().awaitTerminated(); + blockingShutdown(committer); }); return result; } } @GetInitialRestriction - public OffsetRange getInitialRestriction(@Element SubscriptionPartition subscriptionPartition) { + public OffsetByteRange getInitialRestriction( + @Element SubscriptionPartition subscriptionPartition) { try (InitialOffsetReader reader = offsetReaderFactory.apply(subscriptionPartition)) { Offset offset = reader.read(); - return new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */); + return OffsetByteRange.of( + new OffsetRange(offset.value(), Long.MAX_VALUE /* open interval */)); } } @NewTracker - public RestrictionTracker newTracker( - @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetRange range) { - return trackerFactory.apply(subscriptionPartition, range); + public TrackerWithProgress newTracker( + @Element SubscriptionPartition subscriptionPartition, @Restriction OffsetByteRange range) { + return trackerFactory.apply(backlogReaderFactory.newReader(subscriptionPartition), range); + } + + @GetSize + public double getSize( + @Element SubscriptionPartition subscriptionPartition, + @Restriction OffsetByteRange restriction) { + if (restriction.getRange().getTo() != Long.MAX_VALUE) { + return restriction.getByteCount(); + } + return newTracker(subscriptionPartition, restriction).getProgress().getWorkRemaining(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java index f8dc24baa7d6d..3dbdec69db99c 100644 --- 
a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PublisherCache.java @@ -23,52 +23,50 @@ import com.google.api.core.ApiService.State; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.MessageMetadata; -import com.google.cloud.pubsublite.internal.CloseableMonitor; import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.wire.SystemExecutors; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.HashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting; /** A map of working publishers by PublisherOptions. */ -class PublisherCache { - private final CloseableMonitor monitor = new CloseableMonitor(); - - private final Executor listenerExecutor = Executors.newSingleThreadExecutor(); - - @GuardedBy("monitor.monitor") +class PublisherCache implements AutoCloseable { + @GuardedBy("this") private final HashMap> livePublishers = new HashMap<>(); - Publisher get(PublisherOptions options) throws ApiException { + private synchronized void evict(PublisherOptions options) { + livePublishers.remove(options); + } + + synchronized Publisher get(PublisherOptions options) throws ApiException { checkArgument(options.usesCache()); - try (CloseableMonitor.Hold h = monitor.enter()) { - Publisher publisher = livePublishers.get(options); - if (publisher != null) { - return publisher; - } - publisher = Publishers.newPublisher(options); - livePublishers.put(options, publisher); - publisher.addListener( - new Listener() { - @Override - public void failed(State s, Throwable t) { - try (CloseableMonitor.Hold h = monitor.enter()) { - livePublishers.remove(options); - } - } - }, - listenerExecutor); - 
publisher.startAsync().awaitRunning(); + Publisher publisher = livePublishers.get(options); + if (publisher != null) { return publisher; } + publisher = Publishers.newPublisher(options); + livePublishers.put(options, publisher); + publisher.addListener( + new Listener() { + @Override + public void failed(State s, Throwable t) { + evict(options); + } + }, + SystemExecutors.getFuturesExecutor()); + publisher.startAsync().awaitRunning(); + return publisher; } @VisibleForTesting - void set(PublisherOptions options, Publisher toCache) { - try (CloseableMonitor.Hold h = monitor.enter()) { - livePublishers.put(options, toCache); - } + synchronized void set(PublisherOptions options, Publisher toCache) { + livePublishers.put(options, toCache); + } + + @Override + public synchronized void close() { + livePublishers.forEach(((options, publisher) -> publisher.stopAsync())); + livePublishers.clear(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java index 34012f72db15b..67ea6cf6062d1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/Publishers.java @@ -17,17 +17,27 @@ */ package org.apache.beam.sdk.io.gcp.pubsublite; +import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical; import static com.google.cloud.pubsublite.internal.UncheckedApiPreconditions.checkArgument; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultMetadata; +import static com.google.cloud.pubsublite.internal.wire.ServiceClients.addDefaultSettings; import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.AdminClientSettings; import 
com.google.cloud.pubsublite.MessageMetadata; -import com.google.cloud.pubsublite.TopicPath; +import com.google.cloud.pubsublite.Partition; +import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings; import com.google.cloud.pubsublite.internal.Publisher; import com.google.cloud.pubsublite.internal.wire.PartitionCountWatchingPublisherSettings; +import com.google.cloud.pubsublite.internal.wire.PubsubContext; import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework; +import com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.SinglePartitionPublisherBuilder; +import com.google.cloud.pubsublite.v1.AdminServiceClient; +import com.google.cloud.pubsublite.v1.AdminServiceSettings; +import com.google.cloud.pubsublite.v1.PublisherServiceClient; +import com.google.cloud.pubsublite.v1.PublisherServiceSettings; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken; class Publishers { @@ -35,6 +45,38 @@ class Publishers { private Publishers() {} + private static AdminClient newAdminClient(PublisherOptions options) throws ApiException { + try { + return AdminClient.create( + AdminClientSettings.newBuilder() + .setServiceClient( + AdminServiceClient.create( + addDefaultSettings( + options.topicPath().location().extractRegion(), + AdminServiceSettings.newBuilder()))) + .setRegion(options.topicPath().location().extractRegion()) + .build()); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + + private static PublisherServiceClient newServiceClient( + PublisherOptions options, Partition partition) { + PublisherServiceSettings.Builder settingsBuilder = PublisherServiceSettings.newBuilder(); + settingsBuilder = + addDefaultMetadata( + PubsubContext.of(FRAMEWORK), + RoutingMetadata.of(options.topicPath(), partition), + settingsBuilder); + try { + return PublisherServiceClient.create( + addDefaultSettings(options.topicPath().location().extractRegion(), 
settingsBuilder)); + } catch (Throwable t) { + throw toCanonical(t).underlying; + } + } + @SuppressWarnings("unchecked") static Publisher newPublisher(PublisherOptions options) throws ApiException { SerializableSupplier supplier = options.publisherSupplier(); @@ -44,20 +86,18 @@ static Publisher newPublisher(PublisherOptions options) throws checkArgument(token.isSupertypeOf(supplied.getClass())); return (Publisher) supplied; } - - TopicPath topic = options.topicPath(); - PartitionCountWatchingPublisherSettings.Builder publisherSettings = - PartitionCountWatchingPublisherSettings.newBuilder() - .setTopic(topic) - .setPublisherFactory( - partition -> - SinglePartitionPublisherBuilder.newBuilder() - .setTopic(topic) - .setPartition(partition) - .build()) - .setAdminClient( - AdminClient.create( - AdminClientSettings.newBuilder().setRegion(topic.location().region()).build())); - return publisherSettings.build().instantiate(); + return PartitionCountWatchingPublisherSettings.newBuilder() + .setTopic(options.topicPath()) + .setPublisherFactory( + partition -> + SinglePartitionPublisherBuilder.newBuilder() + .setTopic(options.topicPath()) + .setPartition(partition) + .setServiceClient(newServiceClient(options, partition)) + .setBatchingSettings(PublisherSettings.DEFAULT_BATCHING_SETTINGS) + .build()) + .setAdminClient(newAdminClient(options)) + .build() + .instantiate(); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java index ca1f2be416997..b93ac61f33be9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteIO.java @@ -107,7 +107,7 @@ public static PTransform, PCollection> * } */ public static PTransform, PDone> 
write(PublisherOptions options) { - return new PTransform, PDone>("PubsubLiteIO") { + return new PTransform, PDone>() { @Override public PDone expand(PCollection input) { PubsubLiteSink sink = new PubsubLiteSink(options); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java index d3acdfa35e054..d0e3afa2ac07b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/PubsubLiteSink.java @@ -28,16 +28,14 @@ import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.Publisher; +import com.google.cloud.pubsublite.internal.wire.SystemExecutors; import com.google.cloud.pubsublite.proto.PubSubMessage; import com.google.errorprone.annotations.concurrent.GuardedBy; import java.util.ArrayDeque; import java.util.Deque; -import java.util.concurrent.Executor; -import java.util.concurrent.Executors; import java.util.function.Consumer; import org.apache.beam.sdk.io.gcp.pubsublite.PublisherOrError.Kind; import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; /** A sink which publishes messages to Pub/Sub Lite. 
*/ @SuppressWarnings({ @@ -56,8 +54,6 @@ class PubsubLiteSink extends DoFn { @GuardedBy("this") private transient Deque errorsSinceLastFinish; - private static final Executor executor = Executors.newCachedThreadPool(); - PubsubLiteSink(PublisherOptions options) { this.options = options; } @@ -89,7 +85,7 @@ public void failed(State s, Throwable t) { onFailure.accept(t); } }, - MoreExecutors.directExecutor()); + SystemExecutors.getFuturesExecutor()); if (!options.usesCache()) { publisher.startAsync(); } @@ -130,7 +126,7 @@ public void onFailure(Throwable t) { onFailure.accept(t); } }, - executor); + SystemExecutors.getFuturesExecutor()); } // Intentionally don't flush on bundle finish to allow multi-sink client reuse. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java index 9875880584cae..b6a9f5d590901 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscribeTransform.java @@ -23,6 +23,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.cloud.pubsublite.AdminClient; import com.google.cloud.pubsublite.AdminClientSettings; +import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.TopicPath; import com.google.cloud.pubsublite.internal.wire.Committer; @@ -31,8 +32,6 @@ import java.util.List; import java.util.function.Consumer; import java.util.stream.Collectors; -import org.apache.beam.sdk.io.range.OffsetRange; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; @@ -54,10 +53,11 @@ private void 
checkSubscription(SubscriptionPartition subscriptionPartition) thro checkArgument(subscriptionPartition.subscription().equals(options.subscriptionPath())); } - private Subscriber newSubscriber(Partition partition, Consumer> consumer) { + private Subscriber newSubscriber( + Partition partition, Offset initialOffset, Consumer> consumer) { try { return options - .getSubscriberFactory(partition) + .getSubscriberFactory(partition, initialOffset) .newSubscriber( messages -> consumer.accept( @@ -71,23 +71,31 @@ private Subscriber newSubscriber(Partition partition, Consumer tracker, + RestrictionTracker tracker, OutputReceiver receiver) throws ApiException { checkSubscription(subscriptionPartition); return new SubscriptionPartitionProcessorImpl( tracker, receiver, - consumer -> newSubscriber(subscriptionPartition.partition(), consumer), + consumer -> + newSubscriber( + subscriptionPartition.partition(), + Offset.of(tracker.currentRestriction().getRange().getFrom()), + consumer), options.flowControlSettings()); } - private RestrictionTracker newRestrictionTracker( - SubscriptionPartition subscriptionPartition, OffsetRange initial) { + private TopicBacklogReader newBacklogReader(SubscriptionPartition subscriptionPartition) { checkSubscription(subscriptionPartition); + return options.getBacklogReader(subscriptionPartition.partition()); + } + + private TrackerWithProgress newRestrictionTracker( + TopicBacklogReader backlogReader, OffsetByteRange initial) { return new OffsetByteRangeTracker( initial, - options.getBacklogReader(subscriptionPartition.partition()), + backlogReader, Stopwatch.createUnstarted(), options.minBundleTimeout(), LongMath.saturatedMultiply(options.flowControlSettings().bytesOutstanding(), 10)); @@ -107,7 +115,7 @@ private TopicPath getTopicPath() { try (AdminClient admin = AdminClient.create( AdminClientSettings.newBuilder() - .setRegion(options.subscriptionPath().location().region()) + .setRegion(options.subscriptionPath().location().extractRegion()) 
.build())) { return TopicPath.parse(admin.getSubscription(options.subscriptionPath()).get().getTopic()); } catch (Throwable t) { @@ -118,25 +126,15 @@ private TopicPath getTopicPath() { @Override public PCollection expand(PBegin input) { PCollection subscriptionPartitions; - if (options.partitions().isEmpty()) { - subscriptionPartitions = - input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath())); - } else { - subscriptionPartitions = - input.apply( - Create.of( - options.partitions().stream() - .map( - partition -> - SubscriptionPartition.of(options.subscriptionPath(), partition)) - .collect(Collectors.toList()))); - } + subscriptionPartitions = + input.apply(new SubscriptionPartitionLoader(getTopicPath(), options.subscriptionPath())); return subscriptionPartitions.apply( ParDo.of( new PerSubscriptionPartitionSdf( // Ensure we read for at least 5 seconds more than the bundle timeout. options.minBundleTimeout().plus(Duration.standardSeconds(5)), + new ManagedBacklogReaderFactoryImpl(this::newBacklogReader), this::newInitialOffsetReader, this::newRestrictionTracker, this::newPartitionProcessor, diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java index 0d3afe2c60da1..a9625be608fd3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriberOptions.java @@ -23,6 +23,7 @@ import com.google.api.gax.rpc.ApiException; import com.google.auto.value.AutoValue; +import com.google.cloud.pubsublite.Offset; import com.google.cloud.pubsublite.Partition; import com.google.cloud.pubsublite.SubscriptionPath; import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings; @@ -35,13 +36,13 @@ import 
com.google.cloud.pubsublite.internal.wire.RoutingMetadata; import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder; import com.google.cloud.pubsublite.internal.wire.SubscriberFactory; +import com.google.cloud.pubsublite.proto.Cursor; +import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.v1.CursorServiceClient; import com.google.cloud.pubsublite.v1.CursorServiceSettings; import com.google.cloud.pubsublite.v1.SubscriberServiceClient; import com.google.cloud.pubsublite.v1.SubscriberServiceSettings; import java.io.Serializable; -import java.util.Set; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; @@ -68,11 +69,6 @@ public abstract class SubscriberOptions implements Serializable { /** Per-partition flow control parameters for this subscription. */ public abstract FlowControlSettings flowControlSettings(); - /** - * A set of partitions. If empty, continuously poll the set of partitions using an admin client. - */ - public abstract Set partitions(); - /** * The minimum wall time to pass before allowing bundle closure. 
* @@ -108,7 +104,6 @@ public abstract class SubscriberOptions implements Serializable { public static Builder newBuilder() { Builder builder = new AutoValue_SubscriberOptions.Builder(); return builder - .setPartitions(ImmutableSet.of()) .setFlowControlSettings(DEFAULT_FLOW_CONTROL) .setMinBundleTimeout(MIN_BUNDLE_TIMEOUT); } @@ -119,20 +114,19 @@ private SubscriberServiceClient newSubscriberServiceClient(Partition partition) throws ApiException { try { SubscriberServiceSettings.Builder settingsBuilder = SubscriberServiceSettings.newBuilder(); - settingsBuilder = addDefaultMetadata( PubsubContext.of(FRAMEWORK), RoutingMetadata.of(subscriptionPath(), partition), settingsBuilder); return SubscriberServiceClient.create( - addDefaultSettings(subscriptionPath().location().region(), settingsBuilder)); + addDefaultSettings(subscriptionPath().location().extractRegion(), settingsBuilder)); } catch (Throwable t) { throw toCanonical(t).underlying; } } - SubscriberFactory getSubscriberFactory(Partition partition) { + SubscriberFactory getSubscriberFactory(Partition partition, Offset initialOffset) { SubscriberFactory factory = subscriberFactory(); if (factory != null) { return factory; @@ -143,6 +137,10 @@ SubscriberFactory getSubscriberFactory(Partition partition) { .setSubscriptionPath(subscriptionPath()) .setPartition(partition) .setServiceClient(newSubscriberServiceClient(partition)) + .setInitialLocation( + SeekRequest.newBuilder() + .setCursor(Cursor.newBuilder().setOffset(initialOffset.value())) + .build()) .build(); } @@ -150,7 +148,7 @@ private CursorServiceClient newCursorServiceClient() throws ApiException { try { return CursorServiceClient.create( addDefaultSettings( - subscriptionPath().location().region(), CursorServiceSettings.newBuilder())); + subscriptionPath().location().extractRegion(), CursorServiceSettings.newBuilder())); } catch (Throwable t) { throw toCanonical(t).underlying; } @@ -189,7 +187,7 @@ InitialOffsetReader getInitialOffsetReader(Partition 
partition) { return new InitialOffsetReaderImpl( CursorClient.create( CursorClientSettings.newBuilder() - .setRegion(subscriptionPath().location().region()) + .setRegion(subscriptionPath().location().extractRegion()) .build()), subscriptionPath(), partition); @@ -201,8 +199,6 @@ public abstract static class Builder { public abstract Builder setSubscriptionPath(SubscriptionPath path); // Optional parameters - public abstract Builder setPartitions(Set partitions); - public abstract Builder setFlowControlSettings(FlowControlSettings flowControlSettings); public abstract Builder setMinBundleTimeout(Duration minBundleTimeout); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java index 866e92229ba6c..e411d801a7f7e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionLoader.java @@ -92,7 +92,9 @@ public PollResult apply(TopicPath element, Context c) { }) .withPollInterval(pollDuration) .withTerminationPerInput( - terminate ? Watch.Growth.afterIterations(10) : Watch.Growth.never())); + terminate + ? 
Watch.Growth.afterTotalOf(pollDuration.multipliedBy(10)) + : Watch.Growth.never())); return partitions.apply( MapElements.into(TypeDescriptor.of(SubscriptionPartition.class)) .via(kv -> SubscriptionPartition.of(subscription, kv.getValue()))); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java index 6bf362380ffe0..530c180ebd88b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorFactory.java @@ -19,7 +19,6 @@ import com.google.cloud.pubsublite.proto.SequencedMessage; import java.io.Serializable; -import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; @@ -28,6 +27,6 @@ interface SubscriptionPartitionProcessorFactory extends Serializable { SubscriptionPartitionProcessor newProcessor( SubscriptionPartition subscriptionPartition, - RestrictionTracker tracker, + RestrictionTracker tracker, OutputReceiver receiver); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java index 8d2a137a27dcb..a086d18b2f654 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImpl.java @@ -17,6 +17,8 @@ */ package 
org.apache.beam.sdk.io.gcp.pubsublite; +import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.blockingShutdown; + import com.google.api.core.ApiService.Listener; import com.google.api.core.ApiService.State; import com.google.cloud.pubsublite.Offset; @@ -24,9 +26,8 @@ import com.google.cloud.pubsublite.internal.CheckedApiException; import com.google.cloud.pubsublite.internal.ExtractStatus; import com.google.cloud.pubsublite.internal.wire.Subscriber; -import com.google.cloud.pubsublite.proto.Cursor; +import com.google.cloud.pubsublite.internal.wire.SystemExecutors; import com.google.cloud.pubsublite.proto.FlowControlRequest; -import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SequencedMessage; import com.google.protobuf.util.Timestamps; import java.util.List; @@ -36,19 +37,17 @@ import java.util.concurrent.TimeoutException; import java.util.function.Consumer; import java.util.function.Function; -import org.apache.beam.sdk.io.range.OffsetRange; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables; -import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors; import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture; import org.joda.time.Duration; import org.joda.time.Instant; class SubscriptionPartitionProcessorImpl extends Listener implements SubscriptionPartitionProcessor { - private final RestrictionTracker tracker; + private final RestrictionTracker tracker; private final OutputReceiver receiver; private final Subscriber subscriber; private final SettableFuture completionFuture = SettableFuture.create(); @@ -57,7 +56,7 @@ class SubscriptionPartitionProcessorImpl extends Listener 
@SuppressWarnings("methodref.receiver.bound.invalid") SubscriptionPartitionProcessorImpl( - RestrictionTracker tracker, + RestrictionTracker tracker, OutputReceiver receiver, Function>, Subscriber> subscriberFactory, FlowControlSettings flowControlSettings) { @@ -70,23 +69,15 @@ class SubscriptionPartitionProcessorImpl extends Listener @Override @SuppressWarnings("argument.type.incompatible") public void start() throws CheckedApiException { - this.subscriber.addListener(this, MoreExecutors.directExecutor()); + this.subscriber.addListener(this, SystemExecutors.getFuturesExecutor()); this.subscriber.startAsync(); this.subscriber.awaitRunning(); try { - this.subscriber - .seek( - SeekRequest.newBuilder() - .setCursor(Cursor.newBuilder().setOffset(tracker.currentRestriction().getFrom())) - .build()) - .get(); this.subscriber.allowFlow( FlowControlRequest.newBuilder() .setAllowedBytes(flowControlSettings.bytesOutstanding()) .setAllowedMessages(flowControlSettings.messagesOutstanding()) .build()); - } catch (ExecutionException e) { - throw ExtractStatus.toCanonical(e.getCause()); } catch (Throwable t) { throw ExtractStatus.toCanonical(t); } @@ -125,7 +116,7 @@ public void failed(State from, Throwable failure) { @Override public void close() { - subscriber.stopAsync().awaitTerminated(); + blockingShutdown(subscriber); } @Override diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java index 8c1dd9439e51a..79db0f19f5dd9 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TopicBacklogReaderSettings.java @@ -62,7 +62,7 @@ Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath) try 
(AdminClient adminClient = AdminClient.create( AdminClientSettings.newBuilder() - .setRegion(subscriptionPath.location().region()) + .setRegion(subscriptionPath.location().extractRegion()) .build())) { return setTopicPath( TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic())); @@ -81,7 +81,9 @@ Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath) TopicBacklogReader instantiate() throws ApiException { TopicStatsClientSettings settings = - TopicStatsClientSettings.newBuilder().setRegion(topicPath().location().region()).build(); + TopicStatsClientSettings.newBuilder() + .setRegion(topicPath().location().extractRegion()) + .build(); TopicBacklogReader impl = new TopicBacklogReaderImpl(TopicStatsClient.create(settings), topicPath(), partition()); return new LimitingTopicBacklogReader(impl, Ticker.systemTicker()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java new file mode 100644 index 0000000000000..7f0d0309a5971 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsublite/TrackerWithProgress.java @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsublite; + +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress; + +public abstract class TrackerWithProgress + extends RestrictionTracker implements HasProgress {} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java index f34ebb6a745e3..5a31f4fc686da 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/OffsetByteRangeTrackerTest.java @@ -49,7 +49,7 @@ public class OffsetByteRangeTrackerTest { private static final double IGNORED_FRACTION = -10000000.0; private static final long MIN_BYTES = 1000; private static final OffsetRange RANGE = new OffsetRange(123L, Long.MAX_VALUE); - private final TopicBacklogReader reader = mock(TopicBacklogReader.class); + private final TopicBacklogReader unownedBacklogReader = mock(TopicBacklogReader.class); @Spy Ticker ticker; private OffsetByteRangeTracker tracker; @@ -60,14 +60,18 @@ public void setUp() { when(ticker.read()).thenReturn(0L); tracker = new OffsetByteRangeTracker( - RANGE, reader, Stopwatch.createUnstarted(ticker), Duration.millis(500), MIN_BYTES); + OffsetByteRange.of(RANGE, 0), + 
unownedBacklogReader, + Stopwatch.createUnstarted(ticker), + Duration.millis(500), + MIN_BYTES); } @Test public void progressTracked() { assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(123), 10))); assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(124), 11))); - when(reader.computeMessageStats(Offset.of(125))) + when(unownedBacklogReader.computeMessageStats(Offset.of(125))) .thenReturn(ComputeMessageStatsResponse.newBuilder().setMessageBytes(1000).build()); Progress progress = tracker.getProgress(); assertEquals(21, progress.getWorkCompleted(), .0001); @@ -76,7 +80,7 @@ public void progressTracked() { @Test public void getProgressStatsFailure() { - when(reader.computeMessageStats(Offset.of(123))) + when(unownedBacklogReader.computeMessageStats(Offset.of(123))) .thenThrow(new CheckedApiException(Code.INTERNAL).underlying); assertThrows(ApiException.class, tracker::getProgress); } @@ -86,11 +90,15 @@ public void getProgressStatsFailure() { public void claimSplitSuccess() { assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(1_000), MIN_BYTES))); assertTrue(tracker.tryClaim(OffsetByteProgress.of(Offset.of(10_000), MIN_BYTES))); - SplitResult splits = tracker.trySplit(IGNORED_FRACTION); - assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom()); - assertEquals(10_001, splits.getPrimary().getTo()); - assertEquals(10_001, splits.getResidual().getFrom()); - assertEquals(Long.MAX_VALUE, splits.getResidual().getTo()); + SplitResult splits = tracker.trySplit(IGNORED_FRACTION); + OffsetByteRange primary = splits.getPrimary(); + assertEquals(RANGE.getFrom(), primary.getRange().getFrom()); + assertEquals(10_001, primary.getRange().getTo()); + assertEquals(MIN_BYTES * 2, primary.getByteCount()); + OffsetByteRange residual = splits.getResidual(); + assertEquals(10_001, residual.getRange().getFrom()); + assertEquals(Long.MAX_VALUE, residual.getRange().getTo()); + assertEquals(0, residual.getByteCount()); assertEquals(splits.getPrimary(), 
tracker.currentRestriction()); tracker.checkDone(); assertNull(tracker.trySplit(IGNORED_FRACTION)); @@ -100,10 +108,10 @@ public void claimSplitSuccess() { @SuppressWarnings({"dereference.of.nullable", "argument.type.incompatible"}) public void splitWithoutClaimEmpty() { when(ticker.read()).thenReturn(100000000000000L); - SplitResult splits = tracker.trySplit(IGNORED_FRACTION); - assertEquals(RANGE.getFrom(), splits.getPrimary().getFrom()); - assertEquals(RANGE.getFrom(), splits.getPrimary().getTo()); - assertEquals(RANGE, splits.getResidual()); + SplitResult splits = tracker.trySplit(IGNORED_FRACTION); + assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getFrom()); + assertEquals(RANGE.getFrom(), splits.getPrimary().getRange().getTo()); + assertEquals(RANGE, splits.getResidual().getRange()); assertEquals(splits.getPrimary(), tracker.currentRestriction()); tracker.checkDone(); assertNull(tracker.trySplit(IGNORED_FRACTION)); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java index 598037eef5f33..0a4e3e7458f59 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/PerSubscriptionPartitionSdfTest.java @@ -28,6 +28,7 @@ import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -51,6 +52,8 @@ import org.apache.beam.sdk.transforms.SerializableBiFunction; import org.apache.beam.sdk.transforms.SerializableFunction; import 
org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress; +import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.DoubleMath; import org.joda.time.Duration; import org.junit.Before; import org.junit.Test; @@ -65,22 +68,24 @@ public class PerSubscriptionPartitionSdfTest { private static final Duration MAX_SLEEP_TIME = Duration.standardMinutes(10).plus(Duration.millis(10)); - private static final OffsetRange RESTRICTION = new OffsetRange(1, Long.MAX_VALUE); + private static final OffsetByteRange RESTRICTION = + OffsetByteRange.of(new OffsetRange(1, Long.MAX_VALUE), 0); private static final SubscriptionPartition PARTITION = SubscriptionPartition.of(example(SubscriptionPath.class), example(Partition.class)); @Mock SerializableFunction offsetReaderFactory; + @Mock ManagedBacklogReaderFactory backlogReaderFactory; + @Mock TopicBacklogReader backlogReader; + @Mock - SerializableBiFunction< - SubscriptionPartition, OffsetRange, RestrictionTracker> - trackerFactory; + SerializableBiFunction trackerFactory; @Mock SubscriptionPartitionProcessorFactory processorFactory; @Mock SerializableFunction committerFactory; @Mock InitialOffsetReader initialOffsetReader; - @Spy RestrictionTracker tracker; + @Spy TrackerWithProgress tracker; @Mock OutputReceiver output; @Mock SubscriptionPartitionProcessor processor; @@ -98,9 +103,11 @@ public void setUp() { when(trackerFactory.apply(any(), any())).thenReturn(tracker); when(committerFactory.apply(any())).thenReturn(committer); when(tracker.currentRestriction()).thenReturn(RESTRICTION); + when(backlogReaderFactory.newReader(any())).thenReturn(backlogReader); sdf = new PerSubscriptionPartitionSdf( MAX_SLEEP_TIME, + backlogReaderFactory, offsetReaderFactory, trackerFactory, processorFactory, @@ -110,9 +117,10 @@ public void setUp() { @Test public void getInitialRestrictionReadSuccess() { 
when(initialOffsetReader.read()).thenReturn(example(Offset.class)); - OffsetRange range = sdf.getInitialRestriction(PARTITION); - assertEquals(example(Offset.class).value(), range.getFrom()); - assertEquals(Long.MAX_VALUE, range.getTo()); + OffsetByteRange range = sdf.getInitialRestriction(PARTITION); + assertEquals(example(Offset.class).value(), range.getRange().getFrom()); + assertEquals(Long.MAX_VALUE, range.getRange().getTo()); + assertEquals(0, range.getByteCount()); verify(offsetReaderFactory).apply(PARTITION); } @@ -125,7 +133,13 @@ public void getInitialRestrictionReadFailure() { @Test public void newTrackerCallsFactory() { assertSame(tracker, sdf.newTracker(PARTITION, RESTRICTION)); - verify(trackerFactory).apply(PARTITION, RESTRICTION); + verify(trackerFactory).apply(backlogReader, RESTRICTION); + } + + @Test + public void tearDownClosesBacklogReaderFactory() { + sdf.teardown(); + verify(backlogReaderFactory).close(); } @Test @@ -159,12 +173,48 @@ public void process() throws Exception { order2.verify(committer).awaitTerminated(); } + private static final class NoopManagedBacklogReaderFactory + implements ManagedBacklogReaderFactory { + @Override + public TopicBacklogReader newReader(SubscriptionPartition subscriptionPartition) { + return null; + } + + @Override + public void close() {} + } + @Test @SuppressWarnings("return.type.incompatible") public void dofnIsSerializable() throws Exception { ObjectOutputStream output = new ObjectOutputStream(new ByteArrayOutputStream()); output.writeObject( new PerSubscriptionPartitionSdf( - MAX_SLEEP_TIME, x -> null, (x, y) -> null, (x, y, z) -> null, (x) -> null)); + MAX_SLEEP_TIME, + new NoopManagedBacklogReaderFactory(), + x -> null, + (x, y) -> null, + (x, y, z) -> null, + (x) -> null)); + } + + @Test + public void getProgressUnboundedRangeDelegates() { + Progress progress = Progress.from(0, 0.2); + when(tracker.getProgress()).thenReturn(progress); + assertTrue( + DoubleMath.fuzzyEquals( + 
progress.getWorkRemaining(), sdf.getSize(PARTITION, RESTRICTION), .0001)); + verify(tracker).getProgress(); + } + + @Test + public void getProgressBoundedReturnsBytes() { + assertTrue( + DoubleMath.fuzzyEquals( + 123.0, + sdf.getSize(PARTITION, OffsetByteRange.of(new OffsetRange(87, 8000), 123)), + .0001)); + verifyNoInteractions(tracker); } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java new file mode 100644 index 0000000000000..e2429423dd0c9 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/ReadWriteIT.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.beam.sdk.io.gcp.pubsublite;
+
+import static org.apache.beam.sdk.util.Preconditions.checkArgumentNotNull;
+import static org.junit.Assert.fail;
+
+import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.AdminClientSettings;
+import com.google.cloud.pubsublite.BacklogLocation;
+import com.google.cloud.pubsublite.CloudZone;
+import com.google.cloud.pubsublite.Message;
+import com.google.cloud.pubsublite.ProjectId;
+import com.google.cloud.pubsublite.SubscriptionName;
+import com.google.cloud.pubsublite.SubscriptionPath;
+import com.google.cloud.pubsublite.TopicName;
+import com.google.cloud.pubsublite.TopicPath;
+import com.google.cloud.pubsublite.proto.PubSubMessage;
+import com.google.cloud.pubsublite.proto.SequencedMessage;
+import com.google.cloud.pubsublite.proto.Subscription;
+import com.google.cloud.pubsublite.proto.Subscription.DeliveryConfig.DeliveryRequirement;
+import com.google.cloud.pubsublite.proto.Topic;
+import com.google.cloud.pubsublite.proto.Topic.PartitionConfig.Capacity;
+import com.google.errorprone.annotations.concurrent.GuardedBy;
+import com.google.protobuf.ByteString;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ThreadLocalRandom;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.FlatMapElements;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.SimpleFunction;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
+import org.junit.After;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Integration test that writes {@code MESSAGE_COUNT} messages to a freshly created Pub/Sub Lite
+ * topic and reads them back through a freshly created subscription.
+ *
+ * <p>The topic and subscription are created per test and deleted again in {@link #tearDown()}.
+ */
+@RunWith(JUnit4.class)
+public class ReadWriteIT {
+  private static final Logger LOG = LoggerFactory.getLogger(ReadWriteIT.class);
+  private static final CloudZone ZONE = CloudZone.parse("us-central1-b");
+  private static final int MESSAGE_COUNT = 90;
+
+  @Rule public transient TestPipeline pipeline = TestPipeline.create();
+
+  /** Returns the project configured in {@link GcpOptions}; fails if none is set. */
+  private static ProjectId getProject(PipelineOptions options) {
+    return ProjectId.of(checkArgumentNotNull(options.as(GcpOptions.class).getProject()));
+  }
+
+  /** Returns a resource name with a random numeric suffix. */
+  private static String randomName() {
+    return "beam_it_resource_" + ThreadLocalRandom.current().nextLong();
+  }
+
+  /** Creates an admin client for the region of {@code ZONE}; callers must close it. */
+  private static AdminClient newAdminClient() {
+    return AdminClient.create(AdminClientSettings.newBuilder().setRegion(ZONE.region()).build());
+  }
+
+  // Cleanup callbacks, run most-recently-registered first by tearDown().
+  private final Deque<Runnable> cleanupActions = new ArrayDeque<>();
+
+  /**
+   * Creates a randomly named two-partition topic in {@code ZONE} under the given project.
+   *
+   * <p>The deletion callback is registered before the create call is issued, so it also runs when
+   * creation throws.
+   *
+   * @param id the project that owns the topic
+   * @return the path of the created topic
+   * @throws Exception if the create call fails
+   */
+  private TopicPath createTopic(ProjectId id) throws Exception {
+    TopicPath toReturn =
+        TopicPath.newBuilder()
+            .setProject(id)
+            .setLocation(ZONE)
+            .setName(TopicName.of(randomName()))
+            .build();
+    Topic.Builder topic = Topic.newBuilder().setName(toReturn.toString());
+    topic
+        .getPartitionConfigBuilder()
+        .setCount(2)
+        .setCapacity(Capacity.newBuilder().setPublishMibPerSec(4).setSubscribeMibPerSec(4));
+    // 30 GiB per partition.
+    topic.getRetentionConfigBuilder().setPerPartitionBytes(30 * (1L << 30));
+    cleanupActions.addLast(
+        () -> {
+          try (AdminClient client = newAdminClient()) {
+            client.deleteTopic(toReturn).get();
+          } catch (Throwable t) {
+            // Best effort: a failed cleanup is logged and must not fail the test.
+            LOG.error("Failed to clean up topic.", t);
+          }
+        });
+    try (AdminClient client = newAdminClient()) {
+      client.createTopic(topic.build()).get();
+    }
+    return toReturn;
+  }
+
+  /**
+   * Creates a randomly named subscription on {@code topic}, positioned at the beginning of the
+   * backlog and delivering messages immediately.
+   *
+   * <p>The deletion callback is registered before the create call is issued, so it also runs when
+   * creation throws.
+   *
+   * @param topic the topic to subscribe to; its project is reused for the subscription
+   * @return the path of the created subscription
+   * @throws Exception if the create call fails
+   */
+  private SubscriptionPath createSubscription(TopicPath topic) throws Exception {
+    SubscriptionPath toReturn =
+        SubscriptionPath.newBuilder()
+            .setProject(topic.project())
+            .setLocation(ZONE)
+            .setName(SubscriptionName.of(randomName()))
+            .build();
+    Subscription.Builder subscription = Subscription.newBuilder().setName(toReturn.toString());
+    subscription
+        .getDeliveryConfigBuilder()
+        .setDeliveryRequirement(DeliveryRequirement.DELIVER_IMMEDIATELY);
+    subscription.setTopic(topic.toString());
+    cleanupActions.addLast(
+        () -> {
+          try (AdminClient client = newAdminClient()) {
+            client.deleteSubscription(toReturn).get();
+          } catch (Throwable t) {
+            // Best effort: a failed cleanup is logged and must not fail the test.
+            LOG.error("Failed to clean up subscription.", t);
+          }
+        });
+    try (AdminClient client = newAdminClient()) {
+      client.createSubscription(subscription.build(), BacklogLocation.BEGINNING).get();
+    }
+    return toReturn;
+  }
+
+  /** Runs every registered cleanup action, most recently registered first. */
+  @After
+  public void tearDown() {
+    while (!cleanupActions.isEmpty()) {
+      cleanupActions.removeLast().run();
+    }
+  }
+
+  // Workaround for BEAM-12867
+  // TODO(BEAM-12867): Remove this.
+  /**
+   * Expands every element of the input trigger collection into the integers {@code [0,
+   * MESSAGE_COUNT)}.
+   */
+  private static class CustomCreate extends PTransform<PCollection<Void>, PCollection<Integer>> {
+    @Override
+    public PCollection<Integer> expand(PCollection<Void> input) {
+      return input.apply(
+          "createIndexes",
+          FlatMapElements.via(
+              new SimpleFunction<Void, Iterable<Integer>>() {
+                @Override
+                public Iterable<Integer> apply(Void input) {
+                  return IntStream.range(0, MESSAGE_COUNT).boxed().collect(Collectors.toList());
+                }
+              }));
+    }
+  }
+
+  /**
+   * Adds steps to {@code pipeline} that publish {@code MESSAGE_COUNT} messages to {@code
+   * topicPath}.
+   *
+   * <p>Each message carries its index as UTF-8 decimal text and is given a UUID before being
+   * written, so that readers can deduplicate.
+   *
+   * @param topicPath the topic to publish to
+   * @param pipeline the pipeline to add the write steps to
+   */
+  public static void writeMessages(TopicPath topicPath, Pipeline pipeline) {
+    PCollection<Void> trigger = pipeline.apply(Create.of((Void) null));
+    PCollection<Integer> indexes = trigger.apply("createIndexes", new CustomCreate());
+    PCollection<PubSubMessage> messages =
+        indexes.apply(
+            "createMessages",
+            MapElements.via(
+                new SimpleFunction<Integer, PubSubMessage>(
+                    index ->
+                        Message.builder()
+                            .setData(ByteString.copyFromUtf8(index.toString()))
+                            .build()
+                            .toProto()) {}));
+    // Add UUIDs to messages for later deduplication.
+    messages = messages.apply("addUuids", PubsubLiteIO.addUuids());
+    messages.apply(
+        "writeMessages",
+        PubsubLiteIO.write(PublisherOptions.newBuilder().setTopicPath(topicPath).build()));
+  }
+
+  /**
+   * Adds steps to {@code pipeline} that read from {@code subscriptionPath} and deduplicate the
+   * result.
+   *
+   * @param subscriptionPath the subscription to read from
+   * @param pipeline the pipeline to add the read steps to
+   * @return the deduplicated messages
+   */
+  public static PCollection<SequencedMessage> readMessages(
+      SubscriptionPath subscriptionPath, Pipeline pipeline) {
+    PCollection<SequencedMessage> messages =
+        pipeline.apply(
+            "readMessages",
+            PubsubLiteIO.read(
+                SubscriberOptions.newBuilder()
+                    .setSubscriptionPath(subscriptionPath)
+                    // setMinBundleTimeout INTENDED FOR TESTING ONLY
+                    // This sacrifices efficiency to make tests run faster. Do not use this in a
+                    // real pipeline!
+                    .setMinBundleTimeout(Duration.standardSeconds(5))
+                    .build()));
+    // Deduplicate messages based on the uuids added in PubsubLiteIO.addUuids() when writing.
+    return messages.apply(
+        "dedupeMessages", PubsubLiteIO.deduplicate(UuidDeduplicationOptions.newBuilder().build()));
+  }
+
+  // This static out of band communication is needed to retain serializability.
+  @GuardedBy("ReadWriteIT.class")
+  private static final List<SequencedMessage> received = new ArrayList<>();
+
+  /** Records a message delivered by the pipeline. */
+  private static synchronized void addMessageReceived(SequencedMessage message) {
+    received.add(message);
+  }
+
+  /** Returns an immutable snapshot of every message recorded so far. */
+  private static synchronized List<SequencedMessage> getTestQuickstartReceived() {
+    return ImmutableList.copyOf(received);
+  }
+
+  /** Returns a transform that records each input message and emits {@code null} for it. */
+  private static PTransform<PCollection<? extends SequencedMessage>, PCollection<Void>>
+      collectTestQuickstart() {
+    return MapElements.via(
+        new SimpleFunction<SequencedMessage, Void>() {
+          @Override
+          public Void apply(SequencedMessage input) {
+            addMessageReceived(input);
+            return null;
+          }
+        });
+  }
+
+  /**
+   * Publishes {@code MESSAGE_COUNT} messages and polls once per second, for up to 120 rounds,
+   * until each index in {@code [0, MESSAGE_COUNT)} has been received exactly once.
+   *
+   * <p>Fails immediately if any index is seen more than once, and fails after the last round if
+   * some index is still missing.
+   */
+  @Test
+  public void testReadWrite() throws Exception {
+    pipeline.getOptions().as(StreamingOptions.class).setStreaming(true);
+    pipeline.getOptions().as(TestPipelineOptions.class).setBlockOnRun(false);
+
+    TopicPath topic = createTopic(getProject(pipeline.getOptions()));
+    SubscriptionPath subscription = createSubscription(topic);
+
+    // Publish some messages
+    writeMessages(topic, pipeline);
+
+    // Read some messages. They should be deduplicated by the time we see them, so there should be
+    // exactly MESSAGE_COUNT, one for every index in [0,MESSAGE_COUNT).
+    PCollection<SequencedMessage> messages = readMessages(subscription, pipeline);
+    messages.apply("messageReceiver", collectTestQuickstart());
+    pipeline.run();
+    LOG.info("Running!");
+    for (int round = 0; round < 120; ++round) {
+      Thread.sleep(1000);
+      Map<Integer, Integer> receivedCounts = new HashMap<>();
+      for (SequencedMessage message : getTestQuickstartReceived()) {
+        int id = Integer.parseInt(message.getMessage().getData().toStringUtf8());
+        receivedCounts.merge(id, 1, Integer::sum);
+      }
+      LOG.info("Performing comparison round {}.\n", round);
+      List<Integer> missing = new ArrayList<>();
+      for (int id = 0; id < MESSAGE_COUNT; id++) {
+        int idCount = receivedCounts.getOrDefault(id, 0);
+        if (idCount == 0) {
+          missing.add(id);
+        }
+        if (idCount > 1) {
+          fail(String.format("Failed to deduplicate message with id %s.", id));
+        }
+      }
+      // Every index was seen exactly once: the test passes.
+      if (missing.isEmpty()) {
+        return;
+      }
+      LOG.info("Still missing messages: {}.\n", missing);
+    }
+    fail(
+        String.format(
+            "Failed to receive all messages after 2 minutes. Received %s messages.",
+            getTestQuickstartReceived().size()));
+  }
+}
diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java index dbf3b939d0835..3d74375897a06 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsublite/SubscriptionPartitionProcessorImplTest.java @@ -31,7 +31,6 @@ import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; -import com.google.api.core.ApiFutures; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode.Code; import com.google.cloud.pubsublite.Offset; @@ -40,7 +39,6 @@ import com.google.cloud.pubsublite.internal.wire.Subscriber; import com.google.cloud.pubsublite.proto.Cursor; import com.google.cloud.pubsublite.proto.FlowControlRequest; -import com.google.cloud.pubsublite.proto.SeekRequest; import com.google.cloud.pubsublite.proto.SequencedMessage; import com.google.protobuf.util.Timestamps; import java.util.List; @@ -64,7 +62,7 @@ @RunWith(JUnit4.class) @SuppressWarnings("initialization.fields.uninitialized") public class SubscriptionPartitionProcessorImplTest { - @Spy RestrictionTracker tracker; + @Spy RestrictionTracker tracker; @Mock OutputReceiver receiver; @Mock Function>, Subscriber> subscriberFactory; @@ -83,6 +81,10 @@ private static SequencedMessage messageWithOffset(long offset) { .build(); } + private OffsetByteRange initialRange() { + return OffsetByteRange.of(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); + } + @Before public void setUp() { initMocks(this); @@ -100,17 +102,10 @@ public void setUp() { @Test public void lifecycle() throws Exception
{ - when(tracker.currentRestriction()) - .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class))); + when(tracker.currentRestriction()).thenReturn(initialRange()); processor.start(); verify(subscriber).startAsync(); verify(subscriber).awaitRunning(); - verify(subscriber) - .seek( - SeekRequest.newBuilder() - .setCursor(Cursor.newBuilder().setOffset(example(Offset.class).value())) - .build()); verify(subscriber) .allowFlow( FlowControlRequest.newBuilder() @@ -123,29 +118,15 @@ public void lifecycle() throws Exception { } @Test - public void lifecycleSeekThrows() throws Exception { - when(tracker.currentRestriction()) - .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - when(subscriber.seek(any())) - .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE))); + public void lifecycleFlowControlThrows() throws Exception { + when(tracker.currentRestriction()).thenReturn(initialRange()); doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any()); assertThrows(CheckedApiException.class, () -> processor.start()); } - @Test - public void lifecycleFlowControlThrows() { - when(tracker.currentRestriction()) - .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - when(subscriber.seek(any())) - .thenReturn(ApiFutures.immediateFailedFuture(new CheckedApiException(Code.OUT_OF_RANGE))); - assertThrows(CheckedApiException.class, () -> processor.start()); - } - @Test public void lifecycleSubscriberAwaitThrows() throws Exception { - when(tracker.currentRestriction()) - .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class))); + when(tracker.currentRestriction()).thenReturn(initialRange()); processor.start(); doThrow(new 
CheckedApiException(Code.INTERNAL).underlying).when(subscriber).awaitTerminated(); assertThrows(ApiException.class, () -> processor.close()); @@ -155,21 +136,19 @@ public void lifecycleSubscriberAwaitThrows() throws Exception { @Test public void subscriberFailureFails() throws Exception { - when(tracker.currentRestriction()) - .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class))); + when(tracker.currentRestriction()).thenReturn(initialRange()); processor.start(); subscriber.fail(new CheckedApiException(Code.OUT_OF_RANGE)); ApiException e = - assertThrows(ApiException.class, () -> processor.waitForCompletion(Duration.ZERO)); + assertThrows( + // Longer wait is needed due to listener asynchrony. + ApiException.class, () -> processor.waitForCompletion(Duration.standardSeconds(1))); assertEquals(Code.OUT_OF_RANGE, e.getStatusCode().getCode()); } @Test public void allowFlowFailureFails() throws Exception { - when(tracker.currentRestriction()) - .thenReturn(new OffsetRange(example(Offset.class).value(), Long.MAX_VALUE)); - when(subscriber.seek(any())).thenReturn(ApiFutures.immediateFuture(example(Offset.class))); + when(tracker.currentRestriction()).thenReturn(initialRange()); processor.start(); when(tracker.tryClaim(any())).thenReturn(true); doThrow(new CheckedApiException(Code.OUT_OF_RANGE)).when(subscriber).allowFlow(any());