Skip to content

Commit f92f828

Browse files
fix: Add more coverage (#220)
* fix: Add a test for WrappingPublisher * fix: Restructure Stubs to have fewer untested lines in builder classes. * fix: Add license header.
1 parent 32fce6f commit f92f828

File tree

9 files changed

+182
-91
lines changed

9 files changed

+182
-91
lines changed

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/AdminClientSettings.java

Lines changed: 1 addition & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,7 @@
2121
import com.google.cloud.pubsublite.internal.AdminClientImpl;
2222
import com.google.cloud.pubsublite.proto.AdminServiceGrpc;
2323
import com.google.cloud.pubsublite.proto.AdminServiceGrpc.AdminServiceBlockingStub;
24-
import io.grpc.Status;
2524
import io.grpc.StatusException;
26-
import java.io.IOException;
2725
import java.util.Optional;
2826

2927
@AutoValue
@@ -60,16 +58,7 @@ AdminClient instantiate() throws StatusException {
6058
if (stub().isPresent()) {
6159
stub = stub().get();
6260
} else {
63-
try {
64-
stub =
65-
Stubs.defaultStub(
66-
Endpoints.regionalEndpoint(region()), AdminServiceGrpc::newBlockingStub);
67-
} catch (IOException e) {
68-
throw Status.INTERNAL
69-
.withCause(e)
70-
.withDescription("Creating admin stub failed.")
71-
.asException();
72-
}
61+
stub = Stubs.defaultStub(region(), AdminServiceGrpc::newBlockingStub);
7362
}
7463
return new AdminClientImpl(region(), stub, retrySettings());
7564
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.google.api.gax.rpc.ApiClientHeaderProvider;
2222
import com.google.auth.oauth2.GoogleCredentials;
2323
import com.google.cloud.pubsublite.internal.ChannelCache;
24+
import com.google.cloud.pubsublite.internal.ExtractStatus;
2425
import com.google.common.collect.ImmutableList;
2526
import io.grpc.CallOptions;
2627
import io.grpc.Channel;
@@ -31,6 +32,7 @@
3132
import io.grpc.Metadata;
3233
import io.grpc.Metadata.Key;
3334
import io.grpc.MethodDescriptor;
35+
import io.grpc.StatusException;
3436
import io.grpc.auth.MoreCallCredentials;
3537
import io.grpc.stub.AbstractStub;
3638
import java.io.IOException;
@@ -44,14 +46,23 @@ public class Stubs {
4446
private static final ChannelCache channels = new ChannelCache();
4547

4648
public static <StubT extends AbstractStub<StubT>> StubT defaultStub(
47-
String target, Function<Channel, StubT> stubFactory) throws IOException {
48-
return stubFactory
49-
.apply(ClientInterceptors.intercept(channels.get(target), getClientInterceptors()))
50-
.withCallCredentials(
51-
MoreCallCredentials.from(
52-
GoogleCredentials.getApplicationDefault()
53-
.createScoped(
54-
ImmutableList.of("https://www.googleapis.com/auth/cloud-platform"))));
49+
CloudRegion target, Function<Channel, StubT> stubFactory) throws StatusException {
50+
return defaultStub(Endpoints.regionalEndpoint(target), stubFactory);
51+
}
52+
53+
public static <StubT extends AbstractStub<StubT>> StubT defaultStub(
54+
String target, Function<Channel, StubT> stubFactory) throws StatusException {
55+
try {
56+
return stubFactory
57+
.apply(ClientInterceptors.intercept(channels.get(target), getClientInterceptors()))
58+
.withCallCredentials(
59+
MoreCallCredentials.from(
60+
GoogleCredentials.getApplicationDefault()
61+
.createScoped(
62+
ImmutableList.of("https://www.googleapis.com/auth/cloud-platform"))));
63+
} catch (IOException e) {
64+
throw ExtractStatus.toCanonical(e);
65+
}
5566
}
5667

5768
private static List<ClientInterceptor> getClientInterceptors() {

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/TopicStatsClientSettings.java

Lines changed: 1 addition & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,10 @@
1919
import com.google.auto.value.AutoValue;
2020
import com.google.cloud.pubsublite.CloudRegion;
2121
import com.google.cloud.pubsublite.Constants;
22-
import com.google.cloud.pubsublite.Endpoints;
2322
import com.google.cloud.pubsublite.Stubs;
2423
import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc;
2524
import com.google.cloud.pubsublite.proto.TopicStatsServiceGrpc.TopicStatsServiceBlockingStub;
26-
import io.grpc.Status;
2725
import io.grpc.StatusException;
28-
import java.io.IOException;
2926
import java.util.Optional;
3027

3128
@AutoValue
@@ -63,16 +60,7 @@ TopicStatsClient instantiate() throws StatusException {
6360
if (stub().isPresent()) {
6461
stub = stub().get();
6562
} else {
66-
try {
67-
stub =
68-
Stubs.defaultStub(
69-
Endpoints.regionalEndpoint(region()), TopicStatsServiceGrpc::newBlockingStub);
70-
} catch (IOException e) {
71-
throw Status.INTERNAL
72-
.withCause(e)
73-
.withDescription("Creating topic stats stub failed.")
74-
.asException();
75-
}
63+
stub = Stubs.defaultStub(region(), TopicStatsServiceGrpc::newBlockingStub);
7664
}
7765
return new TopicStatsClientImpl(region(), stub, retrySettings());
7866
}

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/AssignerBuilder.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.google.cloud.pubsublite.internal.wire;
1818

1919
import com.google.auto.value.AutoValue;
20-
import com.google.cloud.pubsublite.Endpoints;
2120
import com.google.cloud.pubsublite.Stubs;
2221
import com.google.cloud.pubsublite.SubscriptionPath;
2322
import com.google.cloud.pubsublite.SubscriptionPaths;
@@ -26,9 +25,7 @@
2625
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub;
2726
import com.google.common.flogger.GoogleLogger;
2827
import com.google.protobuf.ByteString;
29-
import io.grpc.Status;
3028
import io.grpc.StatusException;
31-
import java.io.IOException;
3229
import java.nio.ByteBuffer;
3330
import java.util.Optional;
3431
import java.util.UUID;
@@ -69,18 +66,10 @@ public Assigner build() throws StatusException {
6966
if (builder.assignmentStub().isPresent()) {
7067
stub = builder.assignmentStub().get();
7168
} else {
72-
try {
73-
stub =
74-
Stubs.defaultStub(
75-
Endpoints.regionalEndpoint(
76-
SubscriptionPaths.getZone(builder.subscriptionPath()).region()),
77-
PartitionAssignmentServiceGrpc::newStub);
78-
} catch (IOException e) {
79-
throw Status.INTERNAL
80-
.withCause(e)
81-
.withDescription("Creating assigner stub failed.")
82-
.asException();
83-
}
69+
stub =
70+
Stubs.defaultStub(
71+
SubscriptionPaths.getZone(builder.subscriptionPath()).region(),
72+
PartitionAssignmentServiceGrpc::newStub);
8473
}
8574

8675
UUID uuid = UUID.randomUUID();

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/CommitterBuilder.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,14 @@
1717
package com.google.cloud.pubsublite.internal.wire;
1818

1919
import com.google.auto.value.AutoValue;
20-
import com.google.cloud.pubsublite.Endpoints;
2120
import com.google.cloud.pubsublite.Partition;
2221
import com.google.cloud.pubsublite.Stubs;
2322
import com.google.cloud.pubsublite.SubscriptionPath;
2423
import com.google.cloud.pubsublite.SubscriptionPaths;
2524
import com.google.cloud.pubsublite.proto.CursorServiceGrpc;
2625
import com.google.cloud.pubsublite.proto.CursorServiceGrpc.CursorServiceStub;
2726
import com.google.cloud.pubsublite.proto.InitialCommitCursorRequest;
28-
import io.grpc.Status;
2927
import io.grpc.StatusException;
30-
import java.io.IOException;
3128
import java.util.Optional;
3229

3330
@AutoValue
@@ -65,18 +62,10 @@ public Committer build() throws StatusException {
6562
if (builder.cursorStub().isPresent()) {
6663
cursorStub = builder.cursorStub().get();
6764
} else {
68-
try {
69-
cursorStub =
70-
Stubs.defaultStub(
71-
Endpoints.regionalEndpoint(
72-
SubscriptionPaths.getZone(builder.subscriptionPath()).region()),
73-
CursorServiceGrpc::newStub);
74-
} catch (IOException e) {
75-
throw Status.INTERNAL
76-
.withCause(e)
77-
.withDescription("Creating cursor stub failed.")
78-
.asException();
79-
}
65+
cursorStub =
66+
Stubs.defaultStub(
67+
SubscriptionPaths.getZone(builder.subscriptionPath()).region(),
68+
CursorServiceGrpc::newStub);
8069
}
8170

8271
InitialCommitCursorRequest initialCommitCursorRequest =

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/PublisherBuilder.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,18 @@
2020
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
2121
import com.google.auto.value.AutoValue;
2222
import com.google.cloud.pubsublite.Constants;
23-
import com.google.cloud.pubsublite.Endpoints;
2423
import com.google.cloud.pubsublite.Offset;
2524
import com.google.cloud.pubsublite.Partition;
2625
import com.google.cloud.pubsublite.Stubs;
2726
import com.google.cloud.pubsublite.TopicPath;
2827
import com.google.cloud.pubsublite.TopicPaths;
29-
import com.google.cloud.pubsublite.internal.ExtractStatus;
3028
import com.google.cloud.pubsublite.internal.Publisher;
3129
import com.google.cloud.pubsublite.proto.InitialPublishRequest;
3230
import com.google.cloud.pubsublite.proto.PublisherServiceGrpc;
3331
import com.google.common.base.Preconditions;
3432
import io.grpc.Metadata;
3533
import io.grpc.StatusException;
3634
import io.grpc.stub.MetadataUtils;
37-
import java.io.IOException;
3835
import java.util.Optional;
3936
import org.threeten.bp.Duration;
4037

@@ -107,16 +104,11 @@ public abstract static class Builder {
107104
public Publisher<Offset> build() throws StatusException {
108105
PublisherBuilder autoBuilt = autoBuild();
109106
PublisherServiceGrpc.PublisherServiceStub actualStub;
110-
try {
111-
actualStub =
112-
autoBuilt.stub().isPresent()
113-
? autoBuilt.stub().get()
114-
: Stubs.defaultStub(
115-
Endpoints.regionalEndpoint(TopicPaths.getZone(autoBuilt.topic()).region()),
116-
PublisherServiceGrpc::newStub);
117-
} catch (IOException e) {
118-
throw ExtractStatus.toCanonical(e);
119-
}
107+
actualStub =
108+
autoBuilt.stub().isPresent()
109+
? autoBuilt.stub().get()
110+
: Stubs.defaultStub(
111+
TopicPaths.getZone(autoBuilt.topic()).region(), PublisherServiceGrpc::newStub);
120112
Metadata metadata = autoBuilt.context().getMetadata();
121113
metadata.merge(RoutingMetadata.of(autoBuilt.topic(), autoBuilt.partition()));
122114
actualStub = MetadataUtils.attachHeaders(actualStub, metadata);

google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberBuilder.java

Lines changed: 4 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package com.google.cloud.pubsublite.internal.wire;
1818

1919
import com.google.auto.value.AutoValue;
20-
import com.google.cloud.pubsublite.Endpoints;
2120
import com.google.cloud.pubsublite.Partition;
2221
import com.google.cloud.pubsublite.SequencedMessage;
2322
import com.google.cloud.pubsublite.Stubs;
@@ -28,10 +27,8 @@
2827
import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc.SubscriberServiceStub;
2928
import com.google.common.collect.ImmutableList;
3029
import io.grpc.Metadata;
31-
import io.grpc.Status;
3230
import io.grpc.StatusException;
3331
import io.grpc.stub.MetadataUtils;
34-
import java.io.IOException;
3532
import java.util.Optional;
3633
import java.util.function.Consumer;
3734

@@ -80,18 +77,10 @@ public Subscriber build() throws StatusException {
8077
if (builder.subscriberServiceStub().isPresent()) {
8178
subscriberServiceStub = builder.subscriberServiceStub().get();
8279
} else {
83-
try {
84-
subscriberServiceStub =
85-
Stubs.defaultStub(
86-
Endpoints.regionalEndpoint(
87-
SubscriptionPaths.getZone(builder.subscriptionPath()).region()),
88-
SubscriberServiceGrpc::newStub);
89-
} catch (IOException e) {
90-
throw Status.INTERNAL
91-
.withCause(e)
92-
.withDescription("Creating subscriber stub failed.")
93-
.asException();
94-
}
80+
subscriberServiceStub =
81+
Stubs.defaultStub(
82+
SubscriptionPaths.getZone(builder.subscriptionPath()).region(),
83+
SubscriberServiceGrpc::newStub);
9584
}
9685
Metadata metadata = builder.context().getMetadata();
9786
metadata.merge(RoutingMetadata.of(builder.subscriptionPath(), builder.partition()));
Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
/*
2+
* Copyright 2020 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.pubsublite.cloudpubsub.internal;
18+
19+
import static com.google.common.truth.Truth.assertThat;
20+
import static org.mockito.ArgumentMatchers.any;
21+
import static org.mockito.Mockito.times;
22+
import static org.mockito.Mockito.verify;
23+
import static org.mockito.Mockito.when;
24+
import static org.mockito.MockitoAnnotations.initMocks;
25+
26+
import com.google.api.core.ApiFuture;
27+
import com.google.api.core.SettableApiFuture;
28+
import com.google.cloud.pubsublite.Message;
29+
import com.google.cloud.pubsublite.Offset;
30+
import com.google.cloud.pubsublite.Partition;
31+
import com.google.cloud.pubsublite.PublishMetadata;
32+
import com.google.cloud.pubsublite.cloudpubsub.KeyExtractor;
33+
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
34+
import com.google.cloud.pubsublite.internal.FakeApiService;
35+
import com.google.cloud.pubsublite.internal.Publisher;
36+
import com.google.cloud.pubsublite.internal.StatusExceptionMatcher;
37+
import com.google.protobuf.ByteString;
38+
import com.google.pubsub.v1.PubsubMessage;
39+
import io.grpc.Status.Code;
40+
import io.grpc.StatusException;
41+
import org.junit.After;
42+
import org.junit.Before;
43+
import org.junit.Test;
44+
import org.junit.runner.RunWith;
45+
import org.junit.runners.JUnit4;
46+
import org.mockito.Spy;
47+
48+
@RunWith(JUnit4.class)
49+
public class WrappingPublisherTest {
50+
abstract static class FakePublisher extends FakeApiService
51+
implements Publisher<PublishMetadata> {}
52+
53+
@Spy private FakePublisher underlying;
54+
55+
private WrappingPublisher publisher;
56+
57+
@Before
58+
public void setUp() throws StatusException {
59+
initMocks(this);
60+
publisher =
61+
new WrappingPublisher(
62+
underlying, MessageTransforms.fromCpsPublishTransformer(KeyExtractor.DEFAULT));
63+
publisher.startAsync().awaitRunning();
64+
verify(underlying).startAsync();
65+
}
66+
67+
@After
68+
public void tearDown() {
69+
if (publisher.isRunning()) {
70+
publisher.stopAsync().awaitTerminated();
71+
verify(underlying).stopAsync();
72+
}
73+
}
74+
75+
@Test
76+
public void validPublish() throws Exception {
77+
PubsubMessage message = PubsubMessage.newBuilder().setOrderingKey("abc").build();
78+
Message wireMessage = Message.builder().setKey(ByteString.copyFromUtf8("abc")).build();
79+
SettableApiFuture<PublishMetadata> metadataFuture = SettableApiFuture.create();
80+
when(underlying.publish(wireMessage)).thenReturn(metadataFuture);
81+
ApiFuture<String> published = publisher.publish(message);
82+
verify(underlying).publish(wireMessage);
83+
assertThat(published.isDone()).isFalse();
84+
PublishMetadata metadata = PublishMetadata.of(Partition.of(3), Offset.of(88));
85+
metadataFuture.set(metadata);
86+
assertThat(published.isDone()).isTrue();
87+
assertThat(published.get()).isEqualTo(metadata.encode());
88+
}
89+
90+
@Test
91+
public void badTimestampCannotBeTransformed() {
92+
PubsubMessage message =
93+
PubsubMessage.newBuilder()
94+
.setOrderingKey("abc")
95+
.putAttributes(
96+
MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO,
97+
"Not a valid encoded timestamp")
98+
.build();
99+
100+
ApiFuture<String> published = publisher.publish(message);
101+
verify(underlying, times(0)).publish(any());
102+
StatusExceptionMatcher.assertFutureThrowsCode(published, Code.INVALID_ARGUMENT);
103+
assertThat(publisher.isRunning()).isFalse();
104+
}
105+
}

0 commit comments

Comments
 (0)