Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 6 additions & 18 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -1,31 +1,19 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- Path construction has been modified: remove on next release. -->
<!-- ProjectLookupUtils functions renamed: remove on next release. -->
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsublite/*Path*</className>
<className>com/google/cloud/pubsublite/ProjectLookupUtils</className>
<method>*</method>
</difference>
<difference>
<differenceType>7004</differenceType>
<className>com/google/cloud/pubsublite/*Path*</className>
<method>*</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/pubsublite/*Path*</className>
<method>*</method>
<to>*</to>
<differenceType>3003</differenceType>
<className>com/google/cloud/pubsublite/ProjectLookupUtils</className>
</difference>
<difference>
<differenceType>7013</differenceType>
<className>com/google/cloud/pubsublite/*Path*</className>
<method>*</method>
</difference>
<difference>
<differenceType>8001</differenceType>
<className>com/google/cloud/pubsublite/*Paths*</className>
<differenceType>7009</differenceType>
<className>com/google/cloud/pubsublite/ProjectLookupUtils</className>
<method>*</method>
</difference>
<!-- Blanket ignored files -->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,37 +23,45 @@
import com.google.cloud.resourcemanager.ResourceManagerOptions;
import io.grpc.StatusException;

public class ProjectLookupUtils {
private static final ResourceManager resourceManager =
ResourceManagerOptions.getDefaultInstance().getService();
public final class ProjectLookupUtils {
private ProjectLookupUtils() {}

private static ResourceManager resourceManager = null;

private static synchronized ResourceManager getResourceManager() {
if (resourceManager == null) {
resourceManager = ResourceManagerOptions.getDefaultInstance().getService();
}
return resourceManager;
}

private static ProjectNumber getProjectNumber(ProjectId id) throws StatusException {
try {
Project project = resourceManager.get(id.toString());
Project project = getResourceManager().get(id.toString());
return ProjectNumber.of(project.getProjectNumber());
} catch (Throwable t) {
throw ExtractStatus.toCanonical(t);
}
}

static ProjectNumber toCannonical(ProjectIdOrNumber project) throws StatusException {
static ProjectNumber toCanonical(ProjectIdOrNumber project) throws StatusException {
if (project.getKind() == Kind.NUMBER) return project.number();
return getProjectNumber(project.name());
}

public static ProjectPath toCannonical(ProjectPath path) throws StatusException {
return path.toBuilder().setProject(toCannonical(path.project())).build();
public static ProjectPath toCanonical(ProjectPath path) throws StatusException {
return path.toBuilder().setProject(toCanonical(path.project())).build();
}

public static LocationPath toCannonical(LocationPath path) throws StatusException {
return path.toBuilder().setProject(toCannonical(path.project())).build();
public static LocationPath toCanonical(LocationPath path) throws StatusException {
return path.toBuilder().setProject(toCanonical(path.project())).build();
}

public static SubscriptionPath toCannonical(SubscriptionPath path) throws StatusException {
return path.toBuilder().setProject(toCannonical(path.project())).build();
public static SubscriptionPath toCanonical(SubscriptionPath path) throws StatusException {
return path.toBuilder().setProject(toCanonical(path.project())).build();
}

public static TopicPath toCannonical(TopicPath path) throws StatusException {
return path.toBuilder().setProject(toCannonical(path.project())).build();
public static TopicPath toCanonical(TopicPath path) throws StatusException {
return path.toBuilder().setProject(toCanonical(path.project())).build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public CloudRegion region() {
public ApiFuture<Topic> createTopic(Topic topic) {
return RetryingExecutorUtil.runWithRetries(
() -> {
TopicPath path = ProjectLookupUtils.toCannonical(TopicPath.parse(topic.getName()));
TopicPath path = ProjectLookupUtils.toCanonical(TopicPath.parse(topic.getName()));
return stub.createTopic(
CreateTopicRequest.newBuilder()
.setParent(path.locationPath().toString())
Expand All @@ -119,7 +119,7 @@ public ApiFuture<Topic> getTopic(TopicPath path) {
() ->
stub.getTopic(
GetTopicRequest.newBuilder()
.setName(ProjectLookupUtils.toCannonical(path).toString())
.setName(ProjectLookupUtils.toCanonical(path).toString())
.build()),
topicRetryingExecutor);
}
Expand All @@ -130,7 +130,7 @@ public ApiFuture<Long> getTopicPartitionCount(TopicPath path) {
() ->
stub.getTopicPartitions(
GetTopicPartitionsRequest.newBuilder()
.setName(ProjectLookupUtils.toCannonical(path).toString())
.setName(ProjectLookupUtils.toCanonical(path).toString())
.build())
.getPartitionCount(),
partitionCountRetryingExecutor);
Expand All @@ -142,7 +142,7 @@ public ApiFuture<List<Topic>> listTopics(LocationPath path) {
() ->
stub.listTopics(
ListTopicsRequest.newBuilder()
.setParent(ProjectLookupUtils.toCannonical(path).toString())
.setParent(ProjectLookupUtils.toCanonical(path).toString())
.build())
.getTopicsList(),
listTopicsRetryingExecutor);
Expand All @@ -156,7 +156,7 @@ public ApiFuture<Topic> updateTopic(Topic topic, FieldMask mask) {
topic
.toBuilder()
.setName(
ProjectLookupUtils.toCannonical(TopicPath.parse(topic.getName())).toString())
ProjectLookupUtils.toCanonical(TopicPath.parse(topic.getName())).toString())
.build();
return stub.updateTopic(
UpdateTopicRequest.newBuilder().setTopic(canonical).setUpdateMask(mask).build());
Expand All @@ -171,7 +171,7 @@ public ApiFuture<Void> deleteTopic(TopicPath path) {
() -> {
stub.deleteTopic(
DeleteTopicRequest.newBuilder()
.setName(ProjectLookupUtils.toCannonical(path).toString())
.setName(ProjectLookupUtils.toCanonical(path).toString())
.build());
return null;
},
Expand All @@ -186,7 +186,7 @@ public ApiFuture<List<SubscriptionPath>> listTopicSubscriptions(TopicPath path)
for (String subscription :
stub.listTopicSubscriptions(
ListTopicSubscriptionsRequest.newBuilder()
.setName(ProjectLookupUtils.toCannonical(path).toString())
.setName(ProjectLookupUtils.toCanonical(path).toString())
.build())
.getSubscriptionsList()) {
SubscriptionPath subscription_path = SubscriptionPath.parse(subscription);
Expand All @@ -202,7 +202,7 @@ public ApiFuture<Subscription> createSubscription(Subscription subscription) {
return RetryingExecutorUtil.runWithRetries(
() -> {
SubscriptionPath path =
ProjectLookupUtils.toCannonical(SubscriptionPath.parse(subscription.getName()));
ProjectLookupUtils.toCanonical(SubscriptionPath.parse(subscription.getName()));
return stub.createSubscription(
CreateSubscriptionRequest.newBuilder()
.setParent(path.locationPath().toString())
Expand All @@ -219,7 +219,7 @@ public ApiFuture<Subscription> getSubscription(SubscriptionPath path) {
() ->
stub.getSubscription(
GetSubscriptionRequest.newBuilder()
.setName(ProjectLookupUtils.toCannonical(path).toString())
.setName(ProjectLookupUtils.toCanonical(path).toString())
.build()),
subscriptionRetryingExecutor);
}
Expand All @@ -230,7 +230,7 @@ public ApiFuture<List<Subscription>> listSubscriptions(LocationPath path) {
() ->
stub.listSubscriptions(
ListSubscriptionsRequest.newBuilder()
.setParent(ProjectLookupUtils.toCannonical(path).toString())
.setParent(ProjectLookupUtils.toCanonical(path).toString())
.build())
.getSubscriptionsList(),
listSubscriptionsRetryingExecutor);
Expand All @@ -244,8 +244,7 @@ public ApiFuture<Subscription> updateSubscription(Subscription subscription, Fie
subscription
.toBuilder()
.setName(
ProjectLookupUtils.toCannonical(
SubscriptionPath.parse(subscription.getName()))
ProjectLookupUtils.toCanonical(SubscriptionPath.parse(subscription.getName()))
.toString())
.build();
return stub.updateSubscription(
Expand All @@ -264,7 +263,7 @@ public ApiFuture<Void> deleteSubscription(SubscriptionPath path) {
() -> {
stub.deleteSubscription(
DeleteSubscriptionRequest.newBuilder()
.setName(ProjectLookupUtils.toCannonical(path).toString())
.setName(ProjectLookupUtils.toCanonical(path).toString())
.build());
return null;
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ public ApiFuture<ComputeMessageStatsResponse> computeMessageStats(
() -> {
return stub.computeMessageStats(
ComputeMessageStatsRequest.newBuilder()
.setTopic(ProjectLookupUtils.toCannonical(path).toString())
.setTopic(ProjectLookupUtils.toCanonical(path).toString())
.setPartition(partition.value())
.setStartCursor(Cursor.newBuilder().setOffset(start.value()).build())
.setEndCursor(Cursor.newBuilder().setOffset(end.value()).build())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ public Assigner build() throws StatusException {
InitialPartitionAssignmentRequest initial =
InitialPartitionAssignmentRequest.newBuilder()
.setSubscription(
ProjectLookupUtils.toCannonical(builder.subscriptionPath()).toString())
ProjectLookupUtils.toCanonical(builder.subscriptionPath()).toString())
.setClientId(ByteString.copyFrom(uuidBuffer.array()))
.build();
return new AssignerImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public Committer build() throws StatusException {
InitialCommitCursorRequest initialCommitCursorRequest =
InitialCommitCursorRequest.newBuilder()
.setSubscription(
ProjectLookupUtils.toCannonical(builder.subscriptionPath()).toString())
ProjectLookupUtils.toCanonical(builder.subscriptionPath()).toString())
.setPartition(builder.partition().value())
.build();
return new CommitterImpl(cursorStub, initialCommitCursorRequest);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public Publisher<Offset> build() throws StatusException {
return new PublisherImpl(
actualStub,
InitialPublishRequest.newBuilder()
.setTopic(ProjectLookupUtils.toCannonical(autoBuilt.topic()).toString())
.setTopic(ProjectLookupUtils.toCanonical(autoBuilt.topic()).toString())
.setPartition(autoBuilt.partition().value())
.build(),
validateBatchingSettings(autoBuilt.batching()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ static Metadata of(TopicPath topic, Partition partition) throws StatusException
Metadata metadata = new Metadata();
String topic_value =
URLEncoder.encode(
ProjectLookupUtils.toCannonical(topic).toString(), StandardCharsets.UTF_8.toString());
ProjectLookupUtils.toCanonical(topic).toString(), StandardCharsets.UTF_8.toString());
String params = String.format("partition=%s&topic=%s", partition.value(), topic_value);
metadata.put(PARAMS_KEY, params);
return metadata;
Expand All @@ -53,7 +53,7 @@ static Metadata of(SubscriptionPath subscription, Partition partition) throws St
Metadata metadata = new Metadata();
String subscription_value =
URLEncoder.encode(
ProjectLookupUtils.toCannonical(subscription).toString(),
ProjectLookupUtils.toCanonical(subscription).toString(),
StandardCharsets.UTF_8.toString());
String params =
String.format("partition=%s&subscription=%s", partition.value(), subscription_value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public Subscriber build() throws StatusException {
InitialSubscribeRequest initialSubscribeRequest =
InitialSubscribeRequest.newBuilder()
.setSubscription(
ProjectLookupUtils.toCannonical(builder.subscriptionPath()).toString())
ProjectLookupUtils.toCanonical(builder.subscriptionPath()).toString())
.setPartition(builder.partition().value())
.build();
return new SubscriberImpl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,11 @@
public class TopicStatsClientSettingsTest {
@Test
public void testSettings() throws StatusException {
TopicStatsClientSettings.newBuilder()
.setRegion(CloudRegion.of("us-central1"))
.setStub(TopicStatsServiceGrpc.newBlockingStub(mock(Channel.class)))
.build()
.instantiate();
TopicStatsClient unusedClient =
TopicStatsClientSettings.newBuilder()
.setRegion(CloudRegion.of("us-central1"))
.setStub(TopicStatsServiceGrpc.newBlockingStub(mock(Channel.class)))
.build()
.instantiate();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,16 @@
public class CommitterBuilderTest {
@Test
public void testBuilder() throws Exception {
CommitterBuilder.newBuilder()
.setSubscriptionPath(
SubscriptionPath.newBuilder()
.setLocation(CloudZone.of(CloudRegion.of("us-central1"), 'a'))
.setProject(ProjectNumber.of(3))
.setName(SubscriptionName.of("abc"))
.build())
.setPartition(Partition.of(987))
.setCursorStub(CursorServiceGrpc.newStub(mock(Channel.class)))
.build();
Committer unusedCommitter =
CommitterBuilder.newBuilder()
.setSubscriptionPath(
SubscriptionPath.newBuilder()
.setLocation(CloudZone.of(CloudRegion.of("us-central1"), 'a'))
.setProject(ProjectNumber.of(3))
.setName(SubscriptionName.of("abc"))
.build())
.setPartition(Partition.of(987))
.setCursorStub(CursorServiceGrpc.newStub(mock(Channel.class)))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,12 @@

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.internal.Publisher;
import com.google.cloud.pubsublite.proto.PublisherServiceGrpc;
import io.grpc.Channel;
import org.junit.Test;
Expand All @@ -34,16 +36,17 @@
public class PublisherBuilderTest {
@Test
public void testBuilder() throws Exception {
PublisherBuilder.builder()
.setBatching(PublisherBuilder.DEFAULT_BATCHING_SETTINGS)
.setTopic(
TopicPath.newBuilder()
.setLocation(CloudZone.of(CloudRegion.of("us-central1"), 'a'))
.setProject(ProjectNumber.of(3))
.setName(TopicName.of("abc"))
.build())
.setPartition(Partition.of(85))
.setStub(PublisherServiceGrpc.newStub(mock(Channel.class)))
.build();
Publisher<Offset> unusedPublisher =
PublisherBuilder.builder()
.setBatching(PublisherBuilder.DEFAULT_BATCHING_SETTINGS)
.setTopic(
TopicPath.newBuilder()
.setLocation(CloudZone.of(CloudRegion.of("us-central1"), 'a'))
.setProject(ProjectNumber.of(3))
.setName(TopicName.of("abc"))
.build())
.setPartition(Partition.of(85))
.setStub(PublisherServiceGrpc.newStub(mock(Channel.class)))
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,16 @@

package com.google.cloud.pubsublite.internal.wire;

import static com.google.common.truth.Truth.assertThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.mockito.MockitoAnnotations.initMocks;

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.Message;
import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.Partition;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.internal.FakeApiService;
Expand Down Expand Up @@ -73,8 +76,11 @@ public void flushFlushesAll() throws Exception {
public void publishValidRoute() throws Exception {
Message message = Message.builder().setKey(ByteString.copyFromUtf8("abc")).build();
when(routingPolicy.route(message.key())).thenReturn(Partition.of(1));
routing.publish(message);
PublishMetadata meta = PublishMetadata.of(Partition.of(1), Offset.of(3));
when(publisher1.publish(message)).thenReturn(ApiFutures.immediateFuture(meta));
ApiFuture<PublishMetadata> fut = routing.publish(message);
verify(publisher1, times(1)).publish(message);
assertThat(fut.get()).isEqualTo(meta);
this.routing.stopAsync().awaitTerminated();
}

Expand Down
Loading