Skip to content

Commit

Permalink
feat: Batch commit requests (#883)
Browse files Browse the repository at this point in the history
also merge TrivialProxyService into ProxyService
  • Loading branch information
dpcollins-google committed Sep 15, 2021
1 parent b24bcb4 commit 5abd97d
Show file tree
Hide file tree
Showing 18 changed files with 272 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.internal.ProxyService;
import java.util.List;

// A MultiPartitionSubscriber wraps multiple subscribers into a single ApiService that can be
// interacted with. If any single subscriber fails, all others are stopped.
public class MultiPartitionSubscriber extends TrivialProxyService implements Subscriber {
public class MultiPartitionSubscriber extends ProxyService implements Subscriber {
public static Subscriber of(List<Subscriber> subscribers) throws ApiException {
return new MultiPartitionSubscriber(subscribers);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,13 @@
import com.google.cloud.pubsublite.MessageTransformer;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;
import com.google.pubsub.v1.PubsubMessage;

// A WrappingPublisher wraps the wire protocol client with a Cloud Pub/Sub api compliant
// publisher. It encodes a MessageMetadata object in the response string.
public class WrappingPublisher extends TrivialProxyService implements Publisher {
public class WrappingPublisher extends ProxyService implements Publisher {
private final com.google.cloud.pubsublite.internal.Publisher<MessageMetadata> wirePublisher;
private final MessageTransformer<PubsubMessage, Message> transformer;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutureCallback;
import com.google.api.core.ApiFutures;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.internal.wire.SystemExecutors;

public final class MoreApiFutures {
private MoreApiFutures() {}

public static <T> void connectFutures(
ApiFuture<T> source, SettableApiFuture<? super T> toConnect) {
ApiFutures.addCallback(
source,
new ApiFutureCallback<T>() {
@Override
public void onFailure(Throwable throwable) {
toConnect.setException(throwable);
}

@Override
public void onSuccess(T t) {
toConnect.set(t);
}
},
SystemExecutors.getFuturesExecutor());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.flogger.GoogleLogger;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
Expand All @@ -41,7 +42,13 @@ public abstract class ProxyService extends AbstractApiService {
private final List<ApiService> services = new ArrayList<>();
private final AtomicBoolean stoppedOrFailed = new AtomicBoolean(false);

protected ProxyService() {}
protected <T extends ApiService> ProxyService(Collection<T> services) {
addServices(services);
}

protected ProxyService(ApiService... services) throws ApiException {
this(Arrays.asList(services));
}

// Add a new ApiServices to this. Requires that all of them are in state NEW and this is in state
// NEW.
Expand All @@ -59,13 +66,13 @@ protected final void addServices(ApiService... services) throws ApiException {
}

// Method to be called on service start after dependent services start.
protected abstract void start() throws CheckedApiException;
protected void start() throws CheckedApiException {}
// Method to be called on service stop before dependent services stop.
protected abstract void stop() throws CheckedApiException;
protected void stop() throws CheckedApiException {}

// Method to be called for class-specific permanent error handling after trying to stop all other
// services. May not throw.
protected abstract void handlePermanentError(CheckedApiException error);
protected void handlePermanentError(CheckedApiException error) {}

// Tries to stop all dependent services and sets this service into the FAILED state.
protected final void onPermanentError(CheckedApiException error) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.internal.ProxyService;

class ApiExceptionCommitter extends TrivialProxyService implements Committer {
class ApiExceptionCommitter extends ProxyService implements Committer {
private final Committer committer;

ApiExceptionCommitter(Committer committer) throws ApiException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import java.io.IOException;

public class ApiExceptionPublisher<T> extends TrivialProxyService implements Publisher<T> {
public class ApiExceptionPublisher<T> extends ProxyService implements Publisher<T> {
private final Publisher<T> publisher;

ApiExceptionPublisher(Publisher<T> publisher) throws ApiException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.google.api.core.AbstractApiService;
import com.google.api.core.ApiService;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.common.collect.ImmutableList;
Expand All @@ -31,7 +30,7 @@ public class ApiServiceUtils {

private ApiServiceUtils() {}

public static ApiService backgroundResourceAsApiService(BackgroundResource resource) {
public static ApiService autoCloseableAsApiService(AutoCloseable resource) {
return new AbstractApiService() {
@Override
protected void doStart() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,13 @@

package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService;
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService;

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.internal.TrivialProxyService;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.cloud.pubsublite.proto.InitialPartitionAssignmentRequest;
import com.google.cloud.pubsublite.proto.PartitionAssignment;
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;
Expand All @@ -33,7 +33,7 @@
import java.util.HashSet;
import java.util.Set;

public class AssignerImpl extends TrivialProxyService
public class AssignerImpl extends ProxyService
implements Assigner, RetryingConnectionObserver<PartitionAssignment> {
private static final GoogleLogger logger = GoogleLogger.forEnclosingClass();

Expand Down Expand Up @@ -72,7 +72,7 @@ public AssignerImpl(
new ConnectedAssignerImpl.Factory(),
initialRequest,
receiver);
addServices(backgroundResourceAsApiService(client));
addServices(autoCloseableAsApiService(client));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
/*
* Copyright 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal.wire;

import com.google.api.core.ApiFuture;
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.MoreApiFutures;
import com.google.cloud.pubsublite.internal.ProxyService;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.Optional;
import java.util.concurrent.Future;

public class BatchingCommitter extends ProxyService implements Committer {
private final Committer underlying;

@GuardedBy("this")
private SettableApiFuture<Void> currentFuture = SettableApiFuture.create();

@GuardedBy("this")
private Optional<Offset> currentOffset = Optional.empty();

BatchingCommitter(Committer underlying, AlarmFactory alarmFactory) {
super(underlying);
this.underlying = underlying;
Future<?> alarm = alarmFactory.newAlarm(this::flush);
addServices(ApiServiceUtils.autoCloseableAsApiService(() -> alarm.cancel(false)));
}

@Override
public synchronized ApiFuture<Void> commitOffset(Offset offset) {
currentOffset = Optional.of(offset);
return currentFuture;
}

@Override
public void waitUntilEmpty() throws CheckedApiException {
flush();
underlying.waitUntilEmpty();
}

@Override
protected void stop() {
flush();
}

private synchronized void flush() {
if (!currentOffset.isPresent()) {
return;
}
ApiFuture<Void> underlyingFuture = underlying.commitOffset(currentOffset.get());
MoreApiFutures.connectFutures(underlyingFuture, currentFuture);
currentOffset = Optional.empty();
currentFuture = SettableApiFuture.create();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService;
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
Expand Down Expand Up @@ -81,7 +81,7 @@ public CommitterImpl(CursorServiceClient client, InitialCommitCursorRequest requ
stream -> client.streamingCommitCursorCallable().splitCall(stream),
new ConnectedCommitterImpl.Factory(),
request);
addServices(backgroundResourceAsApiService(client));
addServices(autoCloseableAsApiService(client));
}

// ProxyService implementation.
Expand All @@ -94,9 +94,6 @@ protected void handlePermanentError(CheckedApiException error) {
}
}

@Override
protected void start() {}

@Override
protected void stop() {
try (CloseableMonitor.Hold h = monitor.enter()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@
import com.google.auto.value.AutoValue;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.internal.AlarmFactory;
import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest;
import com.google.cloud.pubsublite.v1.CursorServiceClient;
import java.time.Duration;

@AutoValue
public abstract class CommitterSettings {
Expand Down Expand Up @@ -54,6 +56,8 @@ public Committer instantiate() {
.setPartition(partition().value())
.build();
return new ApiExceptionCommitter(
new CommitterImpl(serviceClient(), initialCommitCursorRequest));
new BatchingCommitter(
new CommitterImpl(serviceClient(), initialCommitCursorRequest),
AlarmFactory.create(Duration.ofMillis(50))));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,6 @@ private void handleConfig(long partitionCount) {
}
}

@Override
protected void start() {}

@Override
protected void stop() {
Optional<PartitionsWithRouting> current;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.backgroundResourceAsApiService;
import static com.google.cloud.pubsublite.internal.wire.ApiServiceUtils.autoCloseableAsApiService;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
Expand Down Expand Up @@ -136,7 +136,7 @@ public PublisherImpl(
Objects.requireNonNull(batchingSettings.getDelayThreshold()).toNanos())),
initialRequest,
batchingSettings);
addServices(backgroundResourceAsApiService(client));
addServices(autoCloseableAsApiService(client));
}

@GuardedBy("monitor.monitor")
Expand Down
Loading

0 comments on commit 5abd97d

Please sign in to comment.