Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions google-cloud-pubsublite/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,21 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
</plugin>
<plugin>
<!--TODO: Remove before GA. -->
<groupId>org.codehaus.mojo</groupId>
<artifactId>clirr-maven-plugin</artifactId>
<configuration>
<failOnError>false</failOnError>
</configuration>
<executions>
<execution>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,20 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.SubscriptionPaths;
import com.google.cloud.pubsublite.cloudpubsub.internal.AckSetTrackerImpl;
import com.google.cloud.pubsublite.cloudpubsub.internal.AssigningSubscriber;
import com.google.cloud.pubsublite.cloudpubsub.internal.MultiPartitionSubscriber;
import com.google.cloud.pubsublite.cloudpubsub.internal.PartitionSubscriberFactory;
import com.google.cloud.pubsublite.cloudpubsub.internal.SinglePartitionSubscriber;
import com.google.cloud.pubsublite.internal.Preconditions;
import com.google.cloud.pubsublite.internal.wire.AssignerBuilder;
import com.google.cloud.pubsublite.internal.wire.AssignerFactory;
import com.google.cloud.pubsublite.internal.wire.CommitterBuilder;
import com.google.cloud.pubsublite.internal.wire.PubsubContext;
import com.google.cloud.pubsublite.internal.wire.PubsubContext.Framework;
import com.google.cloud.pubsublite.internal.wire.SubscriberBuilder;
import com.google.cloud.pubsublite.proto.CursorServiceGrpc;
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub;
import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc;
import com.google.common.collect.ImmutableList;
import com.google.pubsub.v1.PubsubMessage;
import io.grpc.StatusException;
import java.util.ArrayList;
Expand All @@ -51,17 +55,21 @@ public abstract class SubscriberSettings {

abstract SubscriptionPath subscriptionPath();

abstract ImmutableList<Partition> partitions();

abstract FlowControlSettings perPartitionFlowControlSettings();

// Optional parameters.

// If set, disables auto-assignment.
abstract Optional<List<Partition>> partitions();

abstract Optional<MessageTransformer<SequencedMessage, PubsubMessage>> transformer();

abstract Optional<SubscriberServiceGrpc.SubscriberServiceStub> subscriberServiceStub();

abstract Optional<CursorServiceGrpc.CursorServiceStub> cursorServiceStub();

abstract Optional<PartitionAssignmentServiceStub> assignmentServiceStub();

abstract Optional<NackHandler> nackHandler();

public static Builder newBuilder() {
Expand All @@ -76,11 +84,12 @@ public abstract static class Builder {

public abstract Builder setSubscriptionPath(SubscriptionPath path);

public abstract Builder setPartitions(List<Partition> partition);

public abstract Builder setPerPartitionFlowControlSettings(FlowControlSettings settings);

// Optional parameters.
/** If set, disables auto-assignment. */
public abstract Builder setPartitions(List<Partition> partition);

public abstract Builder setTransformer(
MessageTransformer<SequencedMessage, PubsubMessage> transformer);

Expand All @@ -89,14 +98,17 @@ public abstract Builder setSubscriberServiceStub(

public abstract Builder setCursorServiceStub(CursorServiceGrpc.CursorServiceStub stub);

public abstract Builder setAssignmentServiceStub(PartitionAssignmentServiceStub stub);

public abstract Builder setNackHandler(NackHandler nackHandler);

abstract SubscriberSettings autoBuild();

public SubscriberSettings build() throws StatusException {
SubscriberSettings settings = autoBuild();
Preconditions.checkArgument(
!settings.partitions().isEmpty(), "Must provide at least one partition.");
!settings.partitions().isPresent() || !settings.partitions().get().isEmpty(),
"Must provide at least one partition if setting partitions explicitly.");
SubscriptionPaths.check(settings.subscriptionPath());
return settings;
}
Expand All @@ -113,18 +125,36 @@ Subscriber instantiate() throws StatusException {
wireCommitterBuilder.setSubscriptionPath(subscriptionPath());
cursorServiceStub().ifPresent(wireCommitterBuilder::setCursorStub);

List<Subscriber> perPartitionSubscribers = new ArrayList<>();
for (Partition partition : partitions()) {
wireSubscriberBuilder.setPartition(partition);
wireCommitterBuilder.setPartition(partition);
perPartitionSubscribers.add(
new SinglePartitionSubscriber(
PartitionSubscriberFactory partitionSubscriberFactory =
partition -> {
wireSubscriberBuilder.setPartition(partition);
wireCommitterBuilder.setPartition(partition);
return new SinglePartitionSubscriber(
receiver(),
transformer().orElse(MessageTransforms.toCpsSubscribeTransformer()),
new AckSetTrackerImpl(wireCommitterBuilder.build()),
nackHandler().orElse(new NackHandler() {}),
messageConsumer -> wireSubscriberBuilder.setMessageConsumer(messageConsumer).build(),
perPartitionFlowControlSettings()));
perPartitionFlowControlSettings());
};

if (!partitions().isPresent()) {
AssignerBuilder.Builder assignerBuilder = AssignerBuilder.newBuilder();
assignerBuilder.setSubscriptionPath(subscriptionPath());
assignmentServiceStub().ifPresent(assignerBuilder::setAssignmentStub);
AssignerFactory assignerFactory =
receiver -> {
assignerBuilder.setReceiver(receiver);
return assignerBuilder.build();
};
return new AssigningSubscriber(partitionSubscriberFactory, assignerFactory);
}

List<Subscriber> perPartitionSubscribers = new ArrayList<>();
for (Partition partition : partitions().get()) {
wireSubscriberBuilder.setPartition(partition);
wireCommitterBuilder.setPartition(partition);
perPartitionSubscribers.add(partitionSubscriberFactory.New(partition));
}
return MultiPartitionSubscriber.of(perPartitionSubscribers);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Endpoints;
import com.google.cloud.pubsublite.Stubs;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.SubscriptionPaths;
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc;
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub;
import com.google.common.flogger.GoogleLogger;
import com.google.protobuf.ByteString;
import io.grpc.Status;
import io.grpc.StatusException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Optional;
import java.util.UUID;

@AutoValue
public abstract class AssignerBuilder {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();
// Required parameters.
abstract SubscriptionPath subscriptionPath();

abstract PartitionAssignmentReceiver receiver();

// Optional parameters.
abstract Optional<PartitionAssignmentServiceStub> assignmentStub();

public static Builder newBuilder() {
return new AutoValue_AssignerBuilder.Builder();
}

@AutoValue.Builder
public abstract static class Builder {
// Required parameters.
public abstract Builder setSubscriptionPath(SubscriptionPath path);

public abstract Builder setReceiver(PartitionAssignmentReceiver receiver);

// Optional parameters.
public abstract Builder setAssignmentStub(PartitionAssignmentServiceStub stub);

abstract AssignerBuilder autoBuild();

@SuppressWarnings("CheckReturnValue")
public Assigner build() throws StatusException {
AssignerBuilder builder = autoBuild();
SubscriptionPaths.check(builder.subscriptionPath());

PartitionAssignmentServiceStub stub;
if (builder.assignmentStub().isPresent()) {
stub = builder.assignmentStub().get();
} else {
try {
stub =
Stubs.defaultStub(
Endpoints.regionalEndpoint(
SubscriptionPaths.getZone(builder.subscriptionPath()).region()),
PartitionAssignmentServiceGrpc::newStub);
} catch (IOException e) {
throw Status.INTERNAL
.withCause(e)
.withDescription("Creating assigner stub failed.")
.asException();
}
}

UUID uuid = UUID.randomUUID();
ByteBuffer uuidBuffer = ByteBuffer.allocate(16);
uuidBuffer.putLong(uuid.getMostSignificantBits());
uuidBuffer.putLong(uuid.getLeastSignificantBits());
logger.atInfo().log(
"Subscription %s using UUID %s for assignment.",
builder.subscriptionPath().value(), uuid);

InitialPartitionAssignmentRequest initial =
InitialPartitionAssignmentRequest.newBuilder()
.setSubscription(builder.subscriptionPath().value())
.setClientId(ByteString.copyFrom(uuidBuffer.array()))
.build();
return new AssignerImpl(
stub, new ConnectedAssignerImpl.Factory(), initial, builder.receiver());
}
}
}