Skip to content

Commit

Permalink
fix: Fix bundle finalizers (#514)
Browse files Browse the repository at this point in the history
* fix: Fix per-partition SDF to not use BundleFinalizer

* feat: reimport

* fix: bundle finalizers

* fix: SubscriberOptions

* fix: SubscriberOptions

* fix: pom

* fix: pom

* fix: pom

* fix: pom

* fix: pom

* fix: pom

* fix: pom

* fix: pom
  • Loading branch information
dpcollins-google committed Feb 24, 2021
1 parent 041b3a5 commit 9d71415
Show file tree
Hide file tree
Showing 18 changed files with 125 additions and 66 deletions.
68 changes: 66 additions & 2 deletions pubsublite-beam-io/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,58 @@
</dependency>
<dependency>
<groupId>com.google.auto.service</groupId>
<artifactId>auto-service</artifactId>
<version>1.0-rc7</version>
<artifactId>auto-service-annotations</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.findbugs</groupId>
<artifactId>jsr305</artifactId>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>gax</artifactId>
</dependency>
<dependency>
<groupId>com.google.errorprone</groupId>
<artifactId>error_prone_annotations</artifactId>
</dependency>
<dependency>
<groupId>com.google.api</groupId>
<artifactId>api-common</artifactId>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<version>2.10.5</version>
</dependency>
<dependency>
<groupId>org.checkerframework</groupId>
<artifactId>checker-qual</artifactId>
<version>3.7.0</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>javax.annotation</groupId>
<artifactId>javax.annotation-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
Expand All @@ -51,6 +101,12 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest-core</artifactId>
<version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
Expand All @@ -74,6 +130,14 @@
<skip>true</skip>
</configuration>
</plugin>
<plugin>
<!-- Unable to disable warnings for beam-runners-direct-java -->
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<skip>true</skip>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@
package com.google.cloud.pubsublite.beam;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.common.base.Ticker;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheBuilder;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.CacheLoader;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LoadingCache;

final class LimitingTopicBacklogReader implements TopicBacklogReader {
private final TopicBacklogReader underlying;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,19 +16,19 @@

package com.google.cloud.pubsublite.beam;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.common.base.Stopwatch;
import java.util.concurrent.TimeUnit;
import javax.annotation.Nullable;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.HasProgress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
import org.joda.time.Duration;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.google.cloud.pubsublite.beam;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.sdk.io.range.OffsetRange;
Expand Down Expand Up @@ -69,8 +70,7 @@ public MonotonicallyIncreasing newWatermarkEstimator(@WatermarkEstimatorState In
public ProcessContinuation processElement(
RestrictionTracker<OffsetRange, OffsetByteProgress> tracker,
@Element SubscriptionPartition subscriptionPartition,
OutputReceiver<SequencedMessage> receiver,
BundleFinalizer finalizer)
OutputReceiver<SequencedMessage> receiver)
throws Exception {
try (SubscriptionPartitionProcessor processor =
processorFactory.newProcessor(subscriptionPartition, tracker, receiver)) {
Expand All @@ -79,16 +79,17 @@ public ProcessContinuation processElement(
processor
.lastClaimed()
.ifPresent(
lastClaimedOffset ->
finalizer.afterBundleCommit(
Instant.ofEpochMilli(Long.MAX_VALUE),
() -> {
Committer committer = committerFactory.apply(subscriptionPartition);
committer.startAsync().awaitRunning();
// Commit the next-to-deliver offset.
committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
committer.stopAsync().awaitTerminated();
}));
lastClaimedOffset -> {
Committer committer = committerFactory.apply(subscriptionPartition);
committer.startAsync().awaitRunning();
// Commit the next-to-deliver offset.
try {
committer.commitOffset(Offset.of(lastClaimedOffset.value() + 1)).get();
} catch (Exception e) {
throw ExtractStatus.toCanonical(e).underlying;
}
committer.stopAsync().awaitTerminated();
});
return result;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.HashMap;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;

/** A map of working publishers by PublisherOptions. */
class PublisherCache {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import com.google.cloud.pubsublite.v1.AdminServiceSettings;
import com.google.cloud.pubsublite.v1.PublisherServiceClient;
import com.google.cloud.pubsublite.v1.PublisherServiceSettings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.reflect.TypeToken;
import com.google.common.reflect.TypeToken;

class Publishers {
private static final Framework FRAMEWORK = Framework.of("BEAM");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,14 @@
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.function.Consumer;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;

/** A sink which publishes messages to Pub/Sub Lite. */
@SuppressWarnings({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.internal.wire.Subscriber;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.base.Stopwatch;
import com.google.common.math.LongMath;
import java.util.List;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand All @@ -38,8 +40,6 @@
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.math.LongMath;
import org.joda.time.Duration;

class SubscribeTransform extends PTransform<PBegin, PCollection<SequencedMessage>> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@
import com.google.cloud.pubsublite.internal.wire.RoutingMetadata;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.internal.wire.SubscriberFactory;
import com.google.cloud.pubsublite.v1.CursorServiceClient;
import com.google.cloud.pubsublite.v1.CursorServiceSettings;
import com.google.cloud.pubsublite.v1.SubscriberServiceClient;
import com.google.cloud.pubsublite.v1.SubscriberServiceSettings;
import com.google.common.collect.ImmutableSet;
import java.io.Serializable;
import java.util.Set;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableSet;
import org.checkerframework.checker.nullness.qual.Nullable;

@AutoValue
Expand Down Expand Up @@ -127,6 +129,16 @@ SubscriberFactory getSubscriberFactory(Partition partition) {
.build();
}

private CursorServiceClient newCursorServiceClient() throws ApiException {
try {
return CursorServiceClient.create(
addDefaultSettings(
subscriptionPath().location().region(), CursorServiceSettings.newBuilder()));
} catch (Throwable t) {
throw toCanonical(t).underlying;
}
}

Committer getCommitter(Partition partition) {
SerializableSupplier<Committer> supplier = committerSupplier();
if (supplier != null) {
Expand All @@ -135,6 +147,7 @@ Committer getCommitter(Partition partition) {
return CommitterSettings.newBuilder()
.setSubscriptionPath(subscriptionPath())
.setPartition(partition)
.setServiceClient(newCursorServiceClient())
.build()
.instantiate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,14 @@

package com.google.cloud.pubsublite.beam;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static com.google.common.base.Preconditions.checkArgument;

import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand All @@ -36,8 +38,6 @@
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.joda.time.Duration;
import org.joda.time.Instant;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
import com.google.cloud.pubsublite.proto.FlowControlRequest;
import com.google.cloud.pubsublite.proto.SeekRequest;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import com.google.common.collect.Iterables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;
import com.google.protobuf.util.Timestamps;
import java.util.List;
import java.util.Optional;
Expand All @@ -39,9 +42,6 @@
import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.MoreExecutors;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.SettableFuture;
import org.joda.time.Duration;
import org.joda.time.Instant;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.beam;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

package com.google.cloud.pubsublite.beam;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkNotNull;

import com.google.api.gax.rpc.ApiException;
import com.google.auto.value.AutoValue;
Expand All @@ -28,10 +28,10 @@
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
import com.google.common.base.Ticker;
import java.io.Serializable;
import java.util.concurrent.ExecutionException;
import javax.annotation.Nonnull;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;

@AutoValue
abstract class TopicBacklogReaderSettings implements Serializable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.google.cloud.pubsublite.beam;

import com.google.cloud.pubsublite.proto.PubSubMessage;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import com.google.protobuf.ByteString;
import java.util.HashSet;
import java.util.Set;
Expand All @@ -26,8 +28,6 @@
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Sets;
import org.joda.time.Duration;
import org.junit.Rule;
import org.junit.Test;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,11 @@
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import org.apache.beam.sdk.io.range.OffsetRange;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker.Progress;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Stopwatch;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Ticker;
import org.joda.time.Duration;
import org.junit.Before;
import org.junit.Test;
Expand Down
Loading

0 comments on commit 9d71415

Please sign in to comment.