Skip to content

Commit

Permalink
feat: ComputeTimeCursor RPC for Pub/Sub Lite (#615)
Browse files Browse the repository at this point in the history
PiperOrigin-RevId: 370536460

Source-Author: Google APIs <noreply@google.com>
Source-Date: Mon Apr 26 14:02:22 2021 -0700
Source-Repo: googleapis/googleapis
Source-Sha: ae5fb2884a296832c39867e8e8c81bbc72a32ce8
Source-Link: googleapis/googleapis@ae5fb28
  • Loading branch information
yoshi-automation committed Apr 26, 2021
1 parent f5e9cb9 commit f74b73c
Show file tree
Hide file tree
Showing 24 changed files with 3,973 additions and 203 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStub;
import com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStubSettings;
import java.io.IOException;
Expand Down Expand Up @@ -259,6 +261,57 @@ public final ComputeHeadCursorResponse computeHeadCursor(ComputeHeadCursorReques
return stub.computeHeadCursorCallable();
}

// AUTO-GENERATED DOCUMENTATION AND METHOD.
/**
* Compute the corresponding cursor for a publish or event time in a topic partition.
*
* <p>Sample code:
*
* <pre>{@code
* try (TopicStatsServiceClient topicStatsServiceClient = TopicStatsServiceClient.create()) {
* ComputeTimeCursorRequest request =
* ComputeTimeCursorRequest.newBuilder()
* .setTopic(TopicName.of("[PROJECT]", "[LOCATION]", "[TOPIC]").toString())
* .setPartition(-1799810326)
* .setTarget(TimeTarget.newBuilder().build())
* .build();
* ComputeTimeCursorResponse response = topicStatsServiceClient.computeTimeCursor(request);
* }
* }</pre>
*
* @param request The request object containing all of the parameters for the API call.
* @throws com.google.api.gax.rpc.ApiException if the remote call fails
*/
public final ComputeTimeCursorResponse computeTimeCursor(ComputeTimeCursorRequest request) {
return computeTimeCursorCallable().call(request);
}

// AUTO-GENERATED DOCUMENTATION AND METHOD.
/**
* Compute the corresponding cursor for a publish or event time in a topic partition.
*
* <p>Sample code:
*
* <pre>{@code
* try (TopicStatsServiceClient topicStatsServiceClient = TopicStatsServiceClient.create()) {
* ComputeTimeCursorRequest request =
* ComputeTimeCursorRequest.newBuilder()
* .setTopic(TopicName.of("[PROJECT]", "[LOCATION]", "[TOPIC]").toString())
* .setPartition(-1799810326)
* .setTarget(TimeTarget.newBuilder().build())
* .build();
* ApiFuture<ComputeTimeCursorResponse> future =
* topicStatsServiceClient.computeTimeCursorCallable().futureCall(request);
* // Do something.
* ComputeTimeCursorResponse response = future.get();
* }
* }</pre>
*/
public final UnaryCallable<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorCallable() {
return stub.computeTimeCursorCallable();
}

@Override
public final void close() {
stub.close();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import com.google.cloud.pubsublite.v1.stub.TopicStatsServiceStubSettings;
import java.io.IOException;
import java.util.List;
Expand Down Expand Up @@ -82,6 +84,12 @@ public class TopicStatsServiceSettings extends ClientSettings<TopicStatsServiceS
return ((TopicStatsServiceStubSettings) getStubSettings()).computeHeadCursorSettings();
}

/** Returns the object with the settings used for calls to computeTimeCursor. */
public UnaryCallSettings<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorSettings() {
return ((TopicStatsServiceStubSettings) getStubSettings()).computeTimeCursorSettings();
}

public static final TopicStatsServiceSettings create(TopicStatsServiceStubSettings stub)
throws IOException {
return new TopicStatsServiceSettings.Builder(stub.toBuilder()).build();
Expand Down Expand Up @@ -192,6 +200,12 @@ public Builder applyToAllUnaryMethods(
return getStubSettingsBuilder().computeHeadCursorSettings();
}

/** Returns the builder for the settings used for calls to computeTimeCursor. */
public UnaryCallSettings.Builder<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorSettings() {
return getStubSettingsBuilder().computeTimeCursorSettings();
}

@Override
public TopicStatsServiceSettings build() throws IOException {
return new TopicStatsServiceSettings(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,9 @@
},
"ComputeMessageStats": {
"methods": ["computeMessageStats", "computeMessageStatsCallable"]
},
"ComputeTimeCursor": {
"methods": ["computeTimeCursor", "computeTimeCursorCallable"]
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import com.google.common.collect.ImmutableMap;
import com.google.longrunning.stub.GrpcOperationsStub;
import io.grpc.MethodDescriptor;
Expand Down Expand Up @@ -66,10 +68,23 @@ public class GrpcTopicStatsServiceStub extends TopicStatsServiceStub {
ProtoUtils.marshaller(ComputeHeadCursorResponse.getDefaultInstance()))
.build();

private static final MethodDescriptor<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorMethodDescriptor =
MethodDescriptor.<ComputeTimeCursorRequest, ComputeTimeCursorResponse>newBuilder()
.setType(MethodDescriptor.MethodType.UNARY)
.setFullMethodName("google.cloud.pubsublite.v1.TopicStatsService/ComputeTimeCursor")
.setRequestMarshaller(
ProtoUtils.marshaller(ComputeTimeCursorRequest.getDefaultInstance()))
.setResponseMarshaller(
ProtoUtils.marshaller(ComputeTimeCursorResponse.getDefaultInstance()))
.build();

private final UnaryCallable<ComputeMessageStatsRequest, ComputeMessageStatsResponse>
computeMessageStatsCallable;
private final UnaryCallable<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorCallable;
private final UnaryCallable<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorCallable;

private final BackgroundResource backgroundResources;
private final GrpcOperationsStub operationsStub;
Expand Down Expand Up @@ -143,6 +158,20 @@ public Map<String, String> extract(ComputeHeadCursorRequest request) {
}
})
.build();
GrpcCallSettings<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorTransportSettings =
GrpcCallSettings.<ComputeTimeCursorRequest, ComputeTimeCursorResponse>newBuilder()
.setMethodDescriptor(computeTimeCursorMethodDescriptor)
.setParamsExtractor(
new RequestParamsExtractor<ComputeTimeCursorRequest>() {
@Override
public Map<String, String> extract(ComputeTimeCursorRequest request) {
ImmutableMap.Builder<String, String> params = ImmutableMap.builder();
params.put("topic", String.valueOf(request.getTopic()));
return params.build();
}
})
.build();

this.computeMessageStatsCallable =
callableFactory.createUnaryCallable(
Expand All @@ -154,6 +183,11 @@ public Map<String, String> extract(ComputeHeadCursorRequest request) {
computeHeadCursorTransportSettings,
settings.computeHeadCursorSettings(),
clientContext);
this.computeTimeCursorCallable =
callableFactory.createUnaryCallable(
computeTimeCursorTransportSettings,
settings.computeTimeCursorSettings(),
clientContext);

this.backgroundResources =
new BackgroundResourceAggregation(clientContext.getBackgroundResources());
Expand All @@ -175,6 +209,12 @@ public GrpcOperationsStub getOperationsStub() {
return computeHeadCursorCallable;
}

@Override
public UnaryCallable<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorCallable() {
return computeTimeCursorCallable;
}

@Override
public final void close() {
shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import javax.annotation.Generated;

// AUTO-GENERATED DOCUMENTATION AND CLASS.
Expand All @@ -43,6 +45,11 @@ public abstract class TopicStatsServiceStub implements BackgroundResource {
throw new UnsupportedOperationException("Not implemented: computeHeadCursorCallable()");
}

public UnaryCallable<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorCallable() {
throw new UnsupportedOperationException("Not implemented: computeTimeCursorCallable()");
}

@Override
public abstract void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -87,6 +89,8 @@ public class TopicStatsServiceStubSettings extends StubSettings<TopicStatsServic
computeMessageStatsSettings;
private final UnaryCallSettings<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings;
private final UnaryCallSettings<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorSettings;

/** Returns the object with the settings used for calls to computeMessageStats. */
public UnaryCallSettings<ComputeMessageStatsRequest, ComputeMessageStatsResponse>
Expand All @@ -100,6 +104,12 @@ public class TopicStatsServiceStubSettings extends StubSettings<TopicStatsServic
return computeHeadCursorSettings;
}

/** Returns the object with the settings used for calls to computeTimeCursor. */
public UnaryCallSettings<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorSettings() {
return computeTimeCursorSettings;
}

@BetaApi("A restructuring of stub classes is planned, so this may break in the future")
public TopicStatsServiceStub createStub() throws IOException {
if (getTransportChannelProvider()
Expand Down Expand Up @@ -171,6 +181,7 @@ protected TopicStatsServiceStubSettings(Builder settingsBuilder) throws IOExcept

computeMessageStatsSettings = settingsBuilder.computeMessageStatsSettings().build();
computeHeadCursorSettings = settingsBuilder.computeHeadCursorSettings().build();
computeTimeCursorSettings = settingsBuilder.computeTimeCursorSettings().build();
}

/** Builder for TopicStatsServiceStubSettings. */
Expand All @@ -180,6 +191,8 @@ public static class Builder extends StubSettings.Builder<TopicStatsServiceStubSe
computeMessageStatsSettings;
private final UnaryCallSettings.Builder<ComputeHeadCursorRequest, ComputeHeadCursorResponse>
computeHeadCursorSettings;
private final UnaryCallSettings.Builder<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorSettings;
private static final ImmutableMap<String, ImmutableSet<StatusCode.Code>>
RETRYABLE_CODE_DEFINITIONS;

Expand All @@ -195,7 +208,6 @@ public static class Builder extends StubSettings.Builder<TopicStatsServiceStubSe
StatusCode.Code.ABORTED,
StatusCode.Code.INTERNAL,
StatusCode.Code.UNKNOWN)));
definitions.put("no_retry_codes", ImmutableSet.copyOf(Lists.<StatusCode.Code>newArrayList()));
RETRYABLE_CODE_DEFINITIONS = definitions.build();
}

Expand All @@ -215,8 +227,6 @@ public static class Builder extends StubSettings.Builder<TopicStatsServiceStubSe
.setTotalTimeout(Duration.ofMillis(600000L))
.build();
definitions.put("retry_policy_0_params", settings);
settings = RetrySettings.newBuilder().setRpcTimeoutMultiplier(1.0).build();
definitions.put("no_retry_params", settings);
RETRY_PARAM_DEFINITIONS = definitions.build();
}

Expand All @@ -229,10 +239,11 @@ protected Builder(ClientContext clientContext) {

computeMessageStatsSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
computeHeadCursorSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();
computeTimeCursorSettings = UnaryCallSettings.newUnaryCallSettingsBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
computeMessageStatsSettings, computeHeadCursorSettings);
computeMessageStatsSettings, computeHeadCursorSettings, computeTimeCursorSettings);
initDefaults(this);
}

Expand All @@ -241,10 +252,11 @@ protected Builder(TopicStatsServiceStubSettings settings) {

computeMessageStatsSettings = settings.computeMessageStatsSettings.toBuilder();
computeHeadCursorSettings = settings.computeHeadCursorSettings.toBuilder();
computeTimeCursorSettings = settings.computeTimeCursorSettings.toBuilder();

unaryMethodSettingsBuilders =
ImmutableList.<UnaryCallSettings.Builder<?, ?>>of(
computeMessageStatsSettings, computeHeadCursorSettings);
computeMessageStatsSettings, computeHeadCursorSettings, computeTimeCursorSettings);
}

private static Builder createDefault() {
Expand All @@ -266,8 +278,13 @@ private static Builder initDefaults(Builder builder) {

builder
.computeHeadCursorSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("no_retry_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("no_retry_params"));
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("retry_policy_0_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("retry_policy_0_params"));

builder
.computeTimeCursorSettings()
.setRetryableCodes(RETRYABLE_CODE_DEFINITIONS.get("retry_policy_0_codes"))
.setRetrySettings(RETRY_PARAM_DEFINITIONS.get("retry_policy_0_params"));

return builder;
}
Expand Down Expand Up @@ -300,6 +317,12 @@ public Builder applyToAllUnaryMethods(
return computeHeadCursorSettings;
}

/** Returns the builder for the settings used for calls to computeTimeCursor. */
public UnaryCallSettings.Builder<ComputeTimeCursorRequest, ComputeTimeCursorResponse>
computeTimeCursorSettings() {
return computeTimeCursorSettings;
}

@Override
public TopicStatsServiceStubSettings build() throws IOException {
return new TopicStatsServiceStubSettings(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.cloud.pubsublite.proto.ComputeHeadCursorResponse;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsRequest;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorRequest;
import com.google.cloud.pubsublite.proto.ComputeTimeCursorResponse;
import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc.TopicStatsServiceImplBase;
import com.google.protobuf.AbstractMessage;
import io.grpc.stub.StreamObserver;
Expand Down Expand Up @@ -105,4 +107,26 @@ public void computeHeadCursor(
Exception.class.getName())));
}
}

@Override
public void computeTimeCursor(
ComputeTimeCursorRequest request,
StreamObserver<ComputeTimeCursorResponse> responseObserver) {
Object response = responses.poll();
if (response instanceof ComputeTimeCursorResponse) {
requests.add(request);
responseObserver.onNext(((ComputeTimeCursorResponse) response));
responseObserver.onCompleted();
} else if (response instanceof Exception) {
responseObserver.onError(((Exception) response));
} else {
responseObserver.onError(
new IllegalArgumentException(
String.format(
"Unrecognized response type %s for method ComputeTimeCursor, expected %s or %s",
response == null ? "null" : response.getClass().getName(),
ComputeTimeCursorResponse.class.getName(),
Exception.class.getName())));
}
}
}
Loading

0 comments on commit f74b73c

Please sign in to comment.