Skip to content

Commit

Permalink
fix: Move settings for the topic backlog reader to TopicBacklogReader…
Browse files Browse the repository at this point in the history
…Settings (#254)

* Add an independent TopicBacklogReaderSettings

* Delete Example.java
  • Loading branch information
palmere-google committed Sep 24, 2020
1 parent 0e20d80 commit 0ee60eb
Show file tree
Hide file tree
Showing 4 changed files with 102 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public UnboundedReader<SequencedMessage> createReader(
return new PubsubLiteUnboundedReader(
this,
statesBuilder.build(),
subscriberOptions.topicBacklogReader(),
TopicBacklogReader.create(subscriberOptions.topicBacklogReaderSettings()),
Ticker.systemTicker());
} catch (StatusException e) {
throw new IOException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,10 @@
package com.google.cloud.pubsublite.beam;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
import com.google.cloud.pubsublite.internal.wire.Committer;
import com.google.cloud.pubsublite.internal.wire.CommitterBuilder;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
Expand All @@ -40,7 +34,6 @@
import com.google.common.collect.ImmutableSet;
import io.grpc.StatusException;
import java.io.Serializable;
import java.util.concurrent.ExecutionException;

@AutoValue
public abstract class SubscriberOptions implements Serializable {
Expand All @@ -57,8 +50,8 @@ public abstract class SubscriberOptions implements Serializable {
/** A set of partitions. If empty, retrieve the set of partitions using an admin client. */
public abstract ImmutableSet<Partition> partitions();

/** The class used to read backlog for the subscription described by subscriptionPath() */
public abstract TopicBacklogReader topicBacklogReader();
/** The class used to read backlog for the subscription described by subscriptionPath(). */
public abstract TopicBacklogReaderSettings topicBacklogReaderSettings();

/** A supplier for the subscriber stub to be used. */
public abstract Optional<SerializableSupplier<SubscriberServiceStub>> subscriberStubSupplier();
Expand Down Expand Up @@ -143,7 +136,8 @@ public abstract Builder setSubscriberStubSupplier(
public abstract Builder setCommitterStubSupplier(
SerializableSupplier<CursorServiceStub> stubSupplier);

public abstract Builder setTopicBacklogReader(TopicBacklogReader topicBacklogReader);
public abstract Builder setTopicBacklogReaderSettings(
TopicBacklogReaderSettings topicBacklogReaderSettings);

// Used in unit tests
abstract Builder setSubscriberFactory(SubscriberFactory subscriberFactory);
Expand All @@ -155,41 +149,29 @@ public abstract Builder setCommitterStubSupplier(

abstract ImmutableSet<Partition> partitions();

abstract Optional<TopicBacklogReader> topicBacklogReader();
abstract Optional<TopicBacklogReaderSettings> topicBacklogReaderSettings();

abstract SubscriberOptions autoBuild();

public SubscriberOptions build() throws StatusException {
if (!partitions().isEmpty() && topicBacklogReader().isPresent()) {
if (!partitions().isEmpty() && topicBacklogReaderSettings().isPresent()) {
return autoBuild();
}
TopicPath path;
try (AdminClient adminClient =
AdminClient.create(
AdminClientSettings.newBuilder()
.setRegion(subscriptionPath().location().region())
.build())) {
path = TopicPath.parse(adminClient.getSubscription(subscriptionPath()).get().getTopic());
} catch (ExecutionException e) {
throw ExtractStatus.toCanonical(e.getCause());
} catch (Throwable t) {
throw ExtractStatus.toCanonical(t);
}

if (partitions().isEmpty()) {
int partition_count = PartitionLookupUtils.numPartitions(path);
int partitionCount = PartitionLookupUtils.numPartitions(subscriptionPath());
ImmutableSet.Builder<Partition> partitions = ImmutableSet.builder();
for (int i = 0; i < partition_count; i++) {
for (int i = 0; i < partitionCount; i++) {
partitions.add(Partition.of(i));
}
setPartitions(partitions.build());
}
if (!topicBacklogReader().isPresent()) {
setTopicBacklogReader(
new TopicBacklogReaderImpl(
TopicStatsClient.create(TopicStatsClientSettings.newBuilder().build()), path));
if (!topicBacklogReaderSettings().isPresent()) {
setTopicBacklogReaderSettings(
TopicBacklogReaderSettings.newBuilder()
.setTopicPathFromSubscriptionPath(subscriptionPath())
.build());
}

return autoBuild();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import io.grpc.StatusException;
import java.util.Map;

/**
Expand All @@ -27,6 +28,10 @@
* partitions within a subscription.
*/
public interface TopicBacklogReader {
/** Create a TopicBacklogReader from settings. */
static TopicBacklogReader create(TopicBacklogReaderSettings settings) throws StatusException {
return settings.instantiate();
}
/**
* Compute and aggregate message statistics for message between the provided start offset and HEAD
* for each partition.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.beam;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.ExtractStatus;
import com.google.cloud.pubsublite.internal.TopicStatsClient;
import com.google.cloud.pubsublite.internal.TopicStatsClientSettings;
import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc.TopicStatsServiceBlockingStub;
import com.google.common.base.Optional;
import io.grpc.StatusException;
import java.io.Serializable;
import java.util.concurrent.ExecutionException;

@AutoValue
public abstract class TopicBacklogReaderSettings implements Serializable {
/**
* The topic path for this backlog reader. Either topicPath or subscriptionPath must be set. If
* both are set, subscriptionPath will be ignored.
*/
abstract TopicPath topicPath();

// Optional parameters
abstract Optional<SerializableSupplier<TopicStatsServiceBlockingStub>> stub();

public static Builder newBuilder() {
return new AutoValue_TopicBacklogReaderSettings.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
// Required parameters.
public abstract Builder setTopicPath(TopicPath topicPath);

public Builder setTopicPathFromSubscriptionPath(SubscriptionPath subscriptionPath)
throws StatusException {
try (AdminClient adminClient =
AdminClient.create(
AdminClientSettings.newBuilder()
.setRegion(subscriptionPath.location().region())
.build())) {
setTopicPath(
TopicPath.parse(adminClient.getSubscription(subscriptionPath).get().getTopic()));
return this;
} catch (ExecutionException e) {
throw ExtractStatus.toCanonical(e.getCause());
} catch (Throwable t) {
throw ExtractStatus.toCanonical(t);
}
}

public abstract Builder setStub(SerializableSupplier<TopicStatsServiceBlockingStub> stub);

public abstract TopicBacklogReaderSettings build();
}

TopicBacklogReader instantiate() throws StatusException {
TopicStatsClientSettings.Builder builder = TopicStatsClientSettings.newBuilder();
if (stub().isPresent()) {
builder.setStub(stub().get().get());
}
builder.setRegion(topicPath().location().region());
return new TopicBacklogReaderImpl(TopicStatsClient.create(builder.build()), topicPath());
}
}

0 comments on commit 0ee60eb

Please sign in to comment.