Skip to content

Commit

Permalink
feat: Add ComputeHeadCursor RPC for Pub/Sub Lite. (#429)
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 347681363

Source-Author: Google APIs <noreply@google.com>
Source-Date: Tue Dec 15 13:31:04 2020 -0800
Source-Repo: googleapis/googleapis
Source-Sha: f967ea0c0437a269515665ff9dbb69fcf134ddd9
Source-Link: googleapis/googleapis@f967ea0

Co-authored-by: dpcollins-google <40498610+dpcollins-google@users.noreply.github.com>
  • Loading branch information
yoshi-automation and dpcollins-google committed Dec 16, 2020
1 parent 86aecc9 commit 34d8d02
Show file tree
Hide file tree
Showing 28 changed files with 2,182 additions and 216 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import com.google.api.core.BetaApi;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStub;
Expand Down Expand Up @@ -159,6 +161,34 @@ public final ComputeMessageStatsResponse computeMessageStats(ComputeMessageStats
return stub.computeMessageStatsCallable();
}

// AUTO-GENERATED DOCUMENTATION AND METHOD.
/**
* Compute the head cursor for the partition. The head cursor?s offset is guaranteed to be before
* or equal to all messages which have not yet been acknowledged to be published, and greater than
* the offset of any message whose publish has already been acknowledged. It is 0 if there have
* never been messages on the partition.
*
* @param request The request object containing all of the parameters for the API call.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final ComputeHeadCursorResponse computeHeadCursor(ComputeHeadCursorRequest request) {
return computeHeadCursorCallable().call(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD.
/**
* Compute the head cursor for the partition. The head cursor?s offset is guaranteed to be before
* or equal to all messages which have not yet been acknowledged to be published, and greater than
* the offset of any message whose publish has already been acknowledged. It is 0 if there have
* never been messages on the partition.
*
* <p>Sample code:
*/
public final UnaryCallable<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorCallable() {
return stub.computeHeadCursorCallable();
}

@Override
public final void close() {
stub.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import com.google.api.gax.rpc.ClientSettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStubSettings;
Expand Down Expand Up @@ -74,6 +76,12 @@ public class TopicStatsServiceSettings extends ClientSettings<TopicStatsServiceS
return ((TopicStatsServiceStubSettings) getStubSettings()).computeMessageStatsSettings();
}

/** Returns the object with the settings used for calls to computeHeadCursor. */
public UnaryCallSettings<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings() {
return ((TopicStatsServiceStubSettings) getStubSettings()).computeHeadCursorSettings();
}

public static final TopicStatsServiceSettings create(TopicStatsServiceStubSettings stub)
throws IOException {
return new TopicStatsServiceSettings.Builder(stub.toBuilder()).build();
Expand Down Expand Up @@ -178,6 +186,12 @@ public Builder applyToAllUnaryMethods(
return getStubSettingsBuilder().computeMessageStatsSettings();
}

/** Returns the builder for the settings used for calls to computeHeadCursor. */
public UnaryCallSettings.Builder<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings() {
return getStubSettingsBuilder().computeHeadCursorSettings();
}

@Override
public TopicStatsServiceSettings build() throws IOException {
return new TopicStatsServiceSettings(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.api.gax.rpc.ClientContext;
import com.google.api.gax.rpc.RequestParamsExtractor;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -53,8 +55,21 @@ public class GrpcTopicStatsServiceStub extends TopicStatsServiceStub {
ProtoUtils.marshaller(ComputeMessageStatsResponse.getDefaultInstance()))
.build();

private static final MethodDescriptor<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorMethodDescriptor =
MethodDescriptor.<ComputeHeadCursorRequest, ComputeHeadCursorResponse>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.cloud.pubsublite.v1.TopicStatsService/ComputeHeadCursor")
.setRequestMarshaller(
ProtoUtils.marshaller(ComputeHeadCursorRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ComputeHeadCursorResponse.getDefaultInstance()))
.build();

private final UnaryCallable<ComputeMessageStatsRequest, ComputeMessageStatsResponse>
computeMessageStatsCallable;
private final UnaryCallable<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorCallable;

private final BackgroundResource backgroundResources;
private final GrpcOperationsStub operationsStub;
Expand Down Expand Up @@ -114,12 +129,31 @@ public Map<String, String> extract(ComputeMessageStatsRequest request) {
}
})
.build();
GrpcCallSettings<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorTransportSettings =
GrpcCallSettings.<ComputeHeadCursorRequest, ComputeHeadCursorResponse>newBuilder()
.setMethodDescriptor(computeHeadCursorMethodDescriptor)
.setParamsExtractor(
new RequestParamsExtractor<ComputeHeadCursorRequest>() {
@Override
public Map<String, String> extract(ComputeHeadCursorRequest request) {
ImmutableMap.Builder<String, String> params = ImmutableMap.builder();
params.put("topic", String.valueOf(request.getTopic()));
return params.build();
}
})
.build();

this.computeMessageStatsCallable =
callableFactory.createUnaryCallable(
computeMessageStatsTransportSettings,
settings.computeMessageStatsSettings(),
clientContext);
this.computeHeadCursorCallable =
callableFactory.createUnaryCallable(
computeHeadCursorTransportSettings,
settings.computeHeadCursorSettings(),
clientContext);

this.backgroundResources =
new BackgroundResourceAggregation(clientContext.getBackgroundResources());
Expand All @@ -134,6 +168,11 @@ public GrpcOperationsStub getOperationsStub() {
return computeMessageStatsCallable;
}

public UnaryCallable<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorCallable() {
return computeHeadCursorCallable;
}

@Override
public final void close() {
shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import javax.annotation.Generated;
Expand All @@ -36,6 +38,11 @@ public abstract class TopicStatsServiceStub implements BackgroundResource {
throw new UnsupportedOperationException("Not implemented: computeMessageStatsCallable()");
}

public UnaryCallable<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorCallable() {
throw new UnsupportedOperationException("Not implemented: computeHeadCursorCallable()");
}

@Override
public abstract void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.google.api.gax.rpc.StubSettings;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.api.gax.rpc.UnaryCallSettings;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -84,13 +86,21 @@ public class TopicStatsServiceStubSettings extends StubSettings<TopicStatsServic

private final UnaryCallSettings<ComputeMessageStatsRequest, ComputeMessageStatsResponse>
computeMessageStatsSettings;
private final UnaryCallSettings<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings;

/** Returns the object with the settings used for calls to computeMessageStats. */
public UnaryCallSettings<ComputeMessageStatsRequest, ComputeMessageStatsResponse>
computeMessageStatsSettings() {
return computeMessageStatsSettings;
}

/** Returns the object with the settings used for calls to computeHeadCursor. */
public UnaryCallSettings<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings() {
return computeHeadCursorSettings;
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
public TopicStatsServiceStub createStub() throws IOException {
if (getTransportChannelProvider()
Expand Down Expand Up @@ -161,13 +171,16 @@ protected TopicStatsServiceStubSettings(Builder settingsBuilder) throws IOExcept
super(settingsBuilder);

computeMessageStatsSettings = settingsBuilder.computeMessageStatsSettings().build();
computeHeadCursorSettings = settingsBuilder.computeHeadCursorSettings().build();
}

/** Builder for TopicStatsServiceStubSettings. */
public static class Builder extends StubSettings.Builder<TopicStatsServiceStubSettings, Builder> {
private final ImmutableList<UnaryCallSettings.Builder<?, ?>> unaryMethodSettingsBuilders;
private final UnaryCallSettings.Builder<ComputeMessageStatsRequest, ComputeMessageStatsResponse>
computeMessageStatsSettings;
private final UnaryCallSettings.Builder<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings;
private static final ImmutableMap<String, ImmutableSet<StatusCode.Code>>
RETRYABLE_CODE_DEFINITIONS;

Expand All @@ -183,6 +196,7 @@ public static class Builder extends StubSettings.Builder<TopicStatsServiceStubSe
StatusCode.Code.ABORTED,
StatusCode.Code.INTERNAL,
StatusCode.Code.UNKNOWN)));
definitions.put("no_retry_codes", ImmutableSet.copyOf(Lists.<StatusCode.Code>newArrayList()));
RETRYABLE_CODE_DEFINITIONS = definitions.build();
}

Expand All @@ -202,6 +216,8 @@ public static class Builder extends StubSettings.Builder<TopicStatsServiceStubSe
.setTotalTimeout(Duration.ofMillis(600000L))
.build();
definitions.put("retry_policy_0_params", settings);
settings = RetrySettings.newBuilder().setRpcTimeoutMultiplier(1.0).build();
definitions.put("no_retry_params", settings);
RETRY_PARAM_DEFINITIONS = definitions.build();
}

Expand All @@ -213,19 +229,23 @@ protected Builder(ClientContext clientContext) {
super(clientContext);

computeMessageStatsSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
computeHeadCursorSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(computeMessageStatsSettings);
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
computeMessageStatsSettings, computeHeadCursorSettings);
initDefaults(this);
}

protected Builder(TopicStatsServiceStubSettings settings) {
super(settings);

computeMessageStatsSettings = settings.computeMessageStatsSettings.toBuilder();
computeHeadCursorSettings = settings.computeHeadCursorSettings.toBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(computeMessageStatsSettings);
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
computeMessageStatsSettings, computeHeadCursorSettings);
}

private static Builder createDefault() {
Expand All @@ -245,6 +265,11 @@ private static Builder initDefaults(Builder builder) {
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("retry_policy_0_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("retry_policy_0_params"));

builder
.computeHeadCursorSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("no_retry_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("no_retry_params"));

return builder;
}

Expand All @@ -270,6 +295,12 @@ public Builder applyToAllUnaryMethods(
return computeMessageStatsSettings;
}

/** Returns the builder for the settings used for calls to computeHeadCursor. */
public UnaryCallSettings.Builder<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings() {
return computeHeadCursorSettings;
}

@Override
public TopicStatsServiceStubSettings build() throws IOException {
return new TopicStatsServiceStubSettings(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package com.google.cloud.pubsublite.v1;

import com.google.api.core.BetaApi;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc.TopicStatsServiceImplBase;
Expand Down Expand Up @@ -75,4 +77,20 @@ public void computeMessageStats(
responseObserver.onError(new IllegalArgumentException("Unrecognized response type"));
}
}

@Override
public void computeHeadCursor(
ComputeHeadCursorRequest request,
StreamObserver<ComputeHeadCursorResponse> responseObserver) {
Object response = responses.remove();
if (response instanceof ComputeHeadCursorResponse) {
requests.add(request);
responseObserver.onNext(((ComputeHeadCursorResponse) response));
responseObserver.onCompleted();
} else if (response instanceof Exception) {
responseObserver.onError(((Exception) response));
} else {
responseObserver.onError(new IllegalArgumentException("Unrecognized response type"));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.ApiClientHeaderProvider;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.Cursor;
Expand Down Expand Up @@ -135,4 +137,49 @@ public void computeMessageStatsExceptionTest() throws Exception {
// Expected exception.
}
}

@Test
public void computeHeadCursorTest() throws Exception {
ComputeHeadCursorResponse expectedResponse =
ComputeHeadCursorResponse.newBuilder().setHeadCursor(Cursor.newBuilder().build()).build();
mockTopicStatsService.addResponse(expectedResponse);

ComputeHeadCursorRequest request =
ComputeHeadCursorRequest.newBuilder()
.setTopic(TopicName.of("[PROJECT]", "[LOCATION]", "[TOPIC]").toString())
.setPartition(-1799810326)
.build();

ComputeHeadCursorResponse actualResponse = client.computeHeadCursor(request);
Assert.assertEquals(expectedResponse, actualResponse);

List<AbstractMessage> actualRequests = mockTopicStatsService.getRequests();
Assert.assertEquals(1, actualRequests.size());
ComputeHeadCursorRequest actualRequest = ((ComputeHeadCursorRequest) actualRequests.get(0));

Assert.assertEquals(request.getTopic(), actualRequest.getTopic());
Assert.assertEquals(request.getPartition(), actualRequest.getPartition());
Assert.assertTrue(
channelProvider.isHeaderSent(
ApiClientHeaderProvider.getDefaultApiClientHeaderKey(),
GaxGrpcProperties.getDefaultApiClientHeaderPattern()));
}

@Test
public void computeHeadCursorExceptionTest() throws Exception {
StatusRuntimeException exception = new StatusRuntimeException(io.grpc.Status.INVALID_ARGUMENT);
mockTopicStatsService.addException(exception);

try {
ComputeHeadCursorRequest request =
ComputeHeadCursorRequest.newBuilder()
.setTopic(TopicName.of("[PROJECT]", "[LOCATION]", "[TOPIC]").toString())
.setPartition(-1799810326)
.build();
client.computeHeadCursor(request);
Assert.fail("No exception raised");
} catch (InvalidArgumentException e) {
// Expected exception.
}
}
}
Loading

0 comments on commit 34d8d02

Please sign in to comment.