Skip to content

Commit

Permalink
fix: Implement ApiResourceAggregation to deduplicate logic for Backgr…
Browse files Browse the repository at this point in the history
…oundResources. (#301)
  • Loading branch information
dpcollins-google committed Oct 19, 2020
1 parent 96ad02c commit 09578b5
Show file tree
Hide file tree
Showing 9 changed files with 120 additions and 117 deletions.
5 changes: 5 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- Blanket ignored files -->
<difference>
<differenceType>5001</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<to>**</to>
</difference>
<difference>
<differenceType>6000</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@
package com.google.cloud.pubsublite;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.BackgroundResource;
import com.google.cloud.pubsublite.internal.ApiBackgroundResource;
import com.google.cloud.pubsublite.proto.Subscription;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.protobuf.FieldMask;
import io.grpc.StatusException;
import java.util.List;

/** A client for performing Pub/Sub Lite admin operations. */
public interface AdminClient extends BackgroundResource {
public interface AdminClient extends ApiBackgroundResource {
static AdminClient create(AdminClientSettings settings) throws StatusException {
return settings.instantiate();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.BackgroundResourceAggregation;
import com.google.api.gax.core.ExecutorAsBackgroundResource;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingExecutor;
Expand Down Expand Up @@ -48,7 +47,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class AdminClientImpl extends BackgroundResourceAggregation implements AdminClient {
public class AdminClientImpl extends ApiResourceAggregation implements AdminClient {
private final CloudRegion region;
private final AdminServiceGrpc.AdminServiceBlockingStub stub;
private final RetryingExecutor<Void> voidRetryingExecutor;
Expand Down Expand Up @@ -76,7 +75,7 @@ private AdminClientImpl(
AdminServiceGrpc.AdminServiceBlockingStub stub,
RetrySettings retrySettings,
ScheduledExecutorService executor) {
super(ImmutableList.of(new ExecutorAsBackgroundResource(executor)));
super(new ExecutorAsBackgroundResource(executor));
this.region = region;
this.stub = stub;
this.voidRetryingExecutor = RetryingExecutorUtil.retryingExecutor(retrySettings, executor);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal;

import com.google.api.gax.core.BackgroundResource;
import io.grpc.StatusException;

public interface ApiBackgroundResource extends BackgroundResource {
/**
* Tear down this resource.
*
* @throws StatusException on a failure to properly terminate.
*/
@Override
void close() throws StatusException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.pubsublite.internal;

import static com.google.cloud.pubsublite.internal.ExtractStatus.toCanonical;

import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.BackgroundResourceAggregation;
import com.google.common.collect.ImmutableList;
import io.grpc.StatusException;
import java.util.concurrent.TimeUnit;

public class ApiResourceAggregation implements ApiBackgroundResource {
private final BackgroundResourceAggregation resources;

ApiResourceAggregation(BackgroundResource... resources) {
this.resources = new BackgroundResourceAggregation(ImmutableList.copyOf(resources));
}

@Override
public void close() throws StatusException {
try {
resources.close();
} catch (Throwable t) {
throw toCanonical(t);
}
}

@Override
public void shutdown() {
resources.shutdown();
}

@Override
public boolean isShutdown() {
return resources.isShutdown();
}

@Override
public boolean isTerminated() {
return resources.isTerminated();
}

@Override
public void shutdownNow() {
resources.shutdownNow();
}

@Override
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return resources.awaitTermination(duration, unit);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,14 @@
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.BackgroundResource;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.SubscriptionPath;
import io.grpc.StatusException;
import java.util.Map;

public interface CursorClient extends BackgroundResource {

public interface CursorClient extends ApiBackgroundResource {
static CursorClient create(CursorClientSettings settings) throws StatusException {
return settings.instantiate();
}
Expand All @@ -50,12 +48,4 @@ static CursorClient create(CursorClientSettings settings) throws StatusException
* @return A future for the operation's completion.
*/
ApiFuture<Void> commitCursor(SubscriptionPath path, Partition partition, Offset offset);

/**
* Tear down this admin client.
*
* @throws StatusException on a failure to properly terminate.
*/
@Override
void close() throws StatusException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.ExecutorAsBackgroundResource;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingExecutor;
Expand All @@ -32,14 +31,11 @@
import com.google.cloud.pubsublite.proto.ListPartitionCursorsResponse;
import com.google.cloud.pubsublite.proto.PartitionCursor;
import com.google.common.collect.ImmutableMap;
import io.grpc.StatusException;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class CursorClientImpl implements BackgroundResource, CursorClient {
private final ExecutorAsBackgroundResource executorResource;
public class CursorClientImpl extends ApiResourceAggregation implements CursorClient {
private final CloudRegion region;
private final CursorServiceBlockingStub stub;
private final RetryingExecutor<Map<Partition, Offset>> listRetryingExecutor;
Expand All @@ -60,7 +56,7 @@ private CursorClientImpl(
CursorServiceBlockingStub stub,
RetrySettings retrySettings,
ScheduledExecutorService executor) {
this.executorResource = new ExecutorAsBackgroundResource(executor);
super(new ExecutorAsBackgroundResource(executor));
this.region = region;
this.stub = stub;
this.listRetryingExecutor = RetryingExecutorUtil.retryingExecutor(retrySettings, executor);
Expand All @@ -72,41 +68,6 @@ public CloudRegion region() {
return region;
}

// BackgroundResource implementation.
@Override
public void shutdown() {
executorResource.shutdown();
}

@Override
public boolean isShutdown() {
return executorResource.isShutdown();
}

@Override
public boolean isTerminated() {
return executorResource.isTerminated();
}

@Override
public void shutdownNow() {
executorResource.shutdownNow();
}

@Override
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return executorResource.awaitTermination(duration, unit);
}

@Override
public void close() throws StatusException {
try {
executorResource.close();
} catch (Exception e) {
throw ExtractStatus.toCanonical(e);
}
}

// CursorClient Implementation
@Override
public ApiFuture<Map<Partition, Offset>> listPartitionCursors(SubscriptionPath path) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,14 @@
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.BackgroundResource;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import io.grpc.StatusException;

public interface TopicStatsClient extends BackgroundResource {
public interface TopicStatsClient extends ApiBackgroundResource {

static TopicStatsClient create(TopicStatsClientSettings settings) throws StatusException {
return settings.instantiate();
Expand All @@ -45,12 +44,4 @@ static TopicStatsClient create(TopicStatsClientSettings settings) throws StatusE
*/
ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
TopicPath path, Partition partition, Offset start, Offset end);

/**
* Tear down this admin client.
*
* @throws StatusException on a failure to properly terminate.
*/
@Override
void close() throws StatusException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
package com.google.cloud.pubsublite.internal;

import com.google.api.core.ApiFuture;
import com.google.api.gax.core.BackgroundResource;
import com.google.api.gax.core.ExecutorAsBackgroundResource;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.retrying.RetryingExecutor;
Expand All @@ -29,13 +28,10 @@
import com.google.cloud.pubsublite.proto.ComputeMessageStatsResponse;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc;
import io.grpc.StatusException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class TopicStatsClientImpl implements BackgroundResource, TopicStatsClient {
private final ExecutorAsBackgroundResource executorResource;
public class TopicStatsClientImpl extends ApiResourceAggregation implements TopicStatsClient {
private final CloudRegion region;
private final TopicStatsServiceGrpc.TopicStatsServiceBlockingStub stub;
private final RetryingExecutor<ComputeMessageStatsResponse> retryingExecutor;
Expand All @@ -57,7 +53,7 @@ private TopicStatsClientImpl(
TopicStatsServiceGrpc.TopicStatsServiceBlockingStub stub,
RetrySettings retrySettings,
ScheduledExecutorService executor) {
this.executorResource = new ExecutorAsBackgroundResource(executor);
super(new ExecutorAsBackgroundResource(executor));
this.region = region;
this.stub = stub;
this.retryingExecutor = RetryingExecutorUtil.retryingExecutor(retrySettings, executor);
Expand All @@ -68,55 +64,19 @@ public CloudRegion region() {
return region;
}

// BackgroundResource implementation.
@Override
public void shutdown() {
executorResource.shutdown();
}

@Override
public boolean isShutdown() {
return executorResource.isShutdown();
}

@Override
public boolean isTerminated() {
return executorResource.isTerminated();
}

@Override
public void shutdownNow() {
executorResource.shutdownNow();
}

@Override
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return executorResource.awaitTermination(duration, unit);
}

@Override
public void close() throws StatusException {
try {
executorResource.close();
} catch (Exception e) {
throw ExtractStatus.toCanonical(e);
}
}

// TopicStatsClient Implementation
@Override
public ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
TopicPath path, Partition partition, Offset start, Offset end) {
return RetryingExecutorUtil.runWithRetries(
() -> {
return stub.computeMessageStats(
ComputeMessageStatsRequest.newBuilder()
.setTopic(ProjectLookupUtils.toCanonical(path).toString())
.setPartition(partition.value())
.setStartCursor(Cursor.newBuilder().setOffset(start.value()).build())
.setEndCursor(Cursor.newBuilder().setOffset(end.value()).build())
.build());
},
() ->
stub.computeMessageStats(
ComputeMessageStatsRequest.newBuilder()
.setTopic(ProjectLookupUtils.toCanonical(path).toString())
.setPartition(partition.value())
.setStartCursor(Cursor.newBuilder().setOffset(start.value()).build())
.setEndCursor(Cursor.newBuilder().setOffset(end.value()).build())
.build()),
retryingExecutor);
}
}

0 comments on commit 09578b5

Please sign in to comment.