Skip to content

Commit

Permalink
feat: Adding ability to create a subscription at HEAD (#545)
Browse files Browse the repository at this point in the history
* feat: added ability to create subscriptions at head

* fix: change CursorLocation to StartingOffset

* fix: update docs

* fix: default is end

* fix: test name

* feat: update starting offset to offset location

* fix: update docs

* fix: requested changes

* fix: changing offset location to backlog location
  • Loading branch information
hannahrogers-google committed Mar 22, 2021
1 parent c2b5680 commit c526235
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 2 deletions.
5 changes: 5 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
<className>com/google/cloud/pubsublite/PublishMetadata</className>
<method>*</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/pubsublite/AdminClient</className>
<method>*</method>
</difference>
<!-- Added abstract method to AutoValue.Builder class (Always okay) -->
<difference>
<differenceType>7013</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ static AdminClient create(AdminClientSettings settings) throws ApiException {
return settings.instantiate();
}

/**
* BacklogLoction refers to a location with respect to the message backlog.
*
* <p>BEGINNING refers to the location of the oldest retained message. END refers to the location
* past all currently published messages, skipping the entire message backlog.
*/
public enum BacklogLocation {
BEGINNING,
END
}

/** The Google Cloud region this client operates on. */
CloudRegion region();

Expand Down Expand Up @@ -102,11 +113,27 @@ static AdminClient create(AdminClientSettings settings) throws ApiException {
/**
* Create the provided subscription if it does not yet exist.
*
* <p>By default, a new subscription will only receive messages published after the subscription
* was created.
*
* @param subscription The subscription to create.
* @return A future that will have either an error {@link com.google.api.gax.rpc.ApiException} or
* the subscription on success.
*/
default ApiFuture<Subscription> createSubscription(Subscription subscription) {
return createSubscription(subscription, BacklogLocation.END);
}

/**
* Create the provided subscription at the given starting offset if it does not yet exist.
*
* @param subscription The subscription to create.
* @param startingOffset The offset at which the new subscription will start receiving messages.
* @return A future that will have either an error {@link com.google.api.gax.rpc.ApiException} or
* the subscription on success.
*/
ApiFuture<Subscription> createSubscription(Subscription subscription);
ApiFuture<Subscription> createSubscription(
Subscription subscription, BacklogLocation startingOffset);

/**
* Get the subscription with id {@code id} if it exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClient.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.LocationPath;
import com.google.cloud.pubsublite.SubscriptionPath;
Expand Down Expand Up @@ -136,7 +137,8 @@ public ApiFuture<List<SubscriptionPath>> listTopicSubscriptions(TopicPath path)
}

@Override
public ApiFuture<Subscription> createSubscription(Subscription subscription) {
public ApiFuture<Subscription> createSubscription(
Subscription subscription, BacklogLocation startingOffset) {
SubscriptionPath path = SubscriptionPath.parse(subscription.getName());
return serviceClient
.createSubscriptionCallable()
Expand All @@ -145,6 +147,7 @@ public ApiFuture<Subscription> createSubscription(Subscription subscription) {
.setParent(path.locationPath().toString())
.setSubscription(subscription)
.setSubscriptionId(path.name().toString())
.setSkipBacklog(startingOffset == BacklogLocation.END)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.pubsublite.AdminClient.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.LocationPath;
Expand Down Expand Up @@ -372,6 +373,7 @@ public void createSubscription_Ok() throws Exception {
.setParent(subscriptionPath().locationPath().toString())
.setSubscription(SUBSCRIPTION)
.setSubscriptionId(subscriptionName().value())
.setSkipBacklog(true)
.build();

when(createSubscriptionCallable.futureCall(request))
Expand All @@ -387,13 +389,48 @@ public void createSubscription_Error() {
.setParent(subscriptionPath().locationPath().toString())
.setSubscription(SUBSCRIPTION)
.setSubscriptionId(subscriptionName().value())
.setSkipBacklog(true)
.build();

when(createSubscriptionCallable.futureCall(request)).thenReturn(failedPreconditionFuture());

assertFutureThrowsCode(client.createSubscription(SUBSCRIPTION), Code.FAILED_PRECONDITION);
}

@Test
public void createSubscriptionAtBeginning_Ok() throws Exception {
CreateSubscriptionRequest request =
CreateSubscriptionRequest.newBuilder()
.setParent(subscriptionPath().locationPath().toString())
.setSubscription(SUBSCRIPTION)
.setSubscriptionId(subscriptionName().value())
.setSkipBacklog(false)
.build();

when(createSubscriptionCallable.futureCall(request))
.thenReturn(immediateFuture(SUBSCRIPTION_2));

assertThat(client.createSubscription(SUBSCRIPTION, BacklogLocation.BEGINNING).get())
.isEqualTo(SUBSCRIPTION_2);
}

@Test
public void createSubscriptionAtBeginning_Error() throws Exception {
CreateSubscriptionRequest request =
CreateSubscriptionRequest.newBuilder()
.setParent(subscriptionPath().locationPath().toString())
.setSubscription(SUBSCRIPTION)
.setSubscriptionId(subscriptionName().value())
.setSkipBacklog(false)
.build();

when(createSubscriptionCallable.futureCall(request)).thenReturn(failedPreconditionFuture());

assertFutureThrowsCode(
client.createSubscription(SUBSCRIPTION, BacklogLocation.BEGINNING),
Code.FAILED_PRECONDITION);
}

@Test
public void updateSubscription_Ok() throws Exception {
UpdateSubscriptionRequest request =
Expand Down

0 comments on commit c526235

Please sign in to comment.