7013
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/AdminClient.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/AdminClient.java
index f44117cdb..c649c46e3 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/AdminClient.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/AdminClient.java
@@ -30,6 +30,17 @@ static AdminClient create(AdminClientSettings settings) throws ApiException {
return settings.instantiate();
}
+ /**
+ * BacklogLoction refers to a location with respect to the message backlog.
+ *
+ * BEGINNING refers to the location of the oldest retained message. END refers to the location
+ * past all currently published messages, skipping the entire message backlog.
+ */
+ public enum BacklogLocation {
+ BEGINNING,
+ END
+ }
+
/** The Google Cloud region this client operates on. */
CloudRegion region();
@@ -102,11 +113,27 @@ static AdminClient create(AdminClientSettings settings) throws ApiException {
/**
* Create the provided subscription if it does not yet exist.
*
+ *
By default, a new subscription will only receive messages published after the subscription
+ * was created.
+ *
+ * @param subscription The subscription to create.
+ * @return A future that will have either an error {@link com.google.api.gax.rpc.ApiException} or
+ * the subscription on success.
+ */
+ default ApiFuture createSubscription(Subscription subscription) {
+ return createSubscription(subscription, BacklogLocation.END);
+ }
+
+ /**
+ * Create the provided subscription at the given starting offset if it does not yet exist.
+ *
* @param subscription The subscription to create.
+ * @param startingOffset The offset at which the new subscription will start receiving messages.
* @return A future that will have either an error {@link com.google.api.gax.rpc.ApiException} or
* the subscription on success.
*/
- ApiFuture createSubscription(Subscription subscription);
+ ApiFuture createSubscription(
+ Subscription subscription, BacklogLocation startingOffset);
/**
* Get the subscription with id {@code id} if it exists.
diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/AdminClientImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/AdminClientImpl.java
index f541fdc9e..d6deafb34 100755
--- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/AdminClientImpl.java
+++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/AdminClientImpl.java
@@ -19,6 +19,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.AdminClient;
+import com.google.cloud.pubsublite.AdminClient.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.LocationPath;
import com.google.cloud.pubsublite.SubscriptionPath;
@@ -136,7 +137,8 @@ public ApiFuture> listTopicSubscriptions(TopicPath path)
}
@Override
- public ApiFuture createSubscription(Subscription subscription) {
+ public ApiFuture createSubscription(
+ Subscription subscription, BacklogLocation startingOffset) {
SubscriptionPath path = SubscriptionPath.parse(subscription.getName());
return serviceClient
.createSubscriptionCallable()
@@ -145,6 +147,7 @@ public ApiFuture createSubscription(Subscription subscription) {
.setParent(path.locationPath().toString())
.setSubscription(subscription)
.setSubscriptionId(path.name().toString())
+ .setSkipBacklog(startingOffset == BacklogLocation.END)
.build());
}
diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/AdminClientImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/AdminClientImplTest.java
index 1ddab9464..ef8721cbb 100755
--- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/AdminClientImplTest.java
+++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/AdminClientImplTest.java
@@ -29,6 +29,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnaryCallable;
+import com.google.cloud.pubsublite.AdminClient.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.LocationPath;
@@ -372,6 +373,7 @@ public void createSubscription_Ok() throws Exception {
.setParent(subscriptionPath().locationPath().toString())
.setSubscription(SUBSCRIPTION)
.setSubscriptionId(subscriptionName().value())
+ .setSkipBacklog(true)
.build();
when(createSubscriptionCallable.futureCall(request))
@@ -387,6 +389,7 @@ public void createSubscription_Error() {
.setParent(subscriptionPath().locationPath().toString())
.setSubscription(SUBSCRIPTION)
.setSubscriptionId(subscriptionName().value())
+ .setSkipBacklog(true)
.build();
when(createSubscriptionCallable.futureCall(request)).thenReturn(failedPreconditionFuture());
@@ -394,6 +397,40 @@ public void createSubscription_Error() {
assertFutureThrowsCode(client.createSubscription(SUBSCRIPTION), Code.FAILED_PRECONDITION);
}
+ @Test
+ public void createSubscriptionAtBeginning_Ok() throws Exception {
+ CreateSubscriptionRequest request =
+ CreateSubscriptionRequest.newBuilder()
+ .setParent(subscriptionPath().locationPath().toString())
+ .setSubscription(SUBSCRIPTION)
+ .setSubscriptionId(subscriptionName().value())
+ .setSkipBacklog(false)
+ .build();
+
+ when(createSubscriptionCallable.futureCall(request))
+ .thenReturn(immediateFuture(SUBSCRIPTION_2));
+
+ assertThat(client.createSubscription(SUBSCRIPTION, BacklogLocation.BEGINNING).get())
+ .isEqualTo(SUBSCRIPTION_2);
+ }
+
+ @Test
+ public void createSubscriptionAtBeginning_Error() throws Exception {
+ CreateSubscriptionRequest request =
+ CreateSubscriptionRequest.newBuilder()
+ .setParent(subscriptionPath().locationPath().toString())
+ .setSubscription(SUBSCRIPTION)
+ .setSubscriptionId(subscriptionName().value())
+ .setSkipBacklog(false)
+ .build();
+
+ when(createSubscriptionCallable.futureCall(request)).thenReturn(failedPreconditionFuture());
+
+ assertFutureThrowsCode(
+ client.createSubscription(SUBSCRIPTION, BacklogLocation.BEGINNING),
+ Code.FAILED_PRECONDITION);
+ }
+
@Test
public void updateSubscription_Ok() throws Exception {
UpdateSubscriptionRequest request =