Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
f17f000
fix: keep track of internal seek
hannahrogers-google May 26, 2020
ae706b6
fix: check shutdown on seek
hannahrogers-google May 27, 2020
2d5134d
fix: reset tokens on seek
hannahrogers-google May 27, 2020
8b9c25f
merge of upstream
hannahrogers-google Jul 31, 2020
9b19348
feat: add api client header to outbound requests
hannahrogers-google Jul 31, 2020
42b443c
fix format
hannahrogers-google Jul 31, 2020
0a394db
Merge branch 'master' of github.com:googleapis/java-pubsublite into m…
hannahrogers-google Aug 4, 2020
e5a38b3
fix: use gccl instead of gapic
hannahrogers-google Aug 4, 2020
1eb4d7b
Merge branch 'master' of github.com:googleapis/java-pubsublite
hannahrogers-google Dec 21, 2020
641690a
feat: adding a method to the wire publisher to cancel outstanding pub…
hannahrogers-google Dec 21, 2020
0d54202
fix: fix typo
hannahrogers-google Dec 21, 2020
d2ae03d
fix: adding more tests to satisfy codecov report
hannahrogers-google Dec 22, 2020
2cc0495
Merge branch 'master' of github.com:googleapis/java-pubsublite
hannahrogers-google Feb 17, 2021
383d1cb
resolve conflicts
hannahrogers-google Feb 17, 2021
3723710
Merge branch 'master' of github.com:googleapis/java-pubsublite
hannahrogers-google Feb 19, 2021
1a4f7be
feat: added ability to create subscriptions at head
hannahrogers-google Feb 19, 2021
e84f780
Merge branch 'master' of github.com:googleapis/java-pubsublite
hannahrogers-google Mar 9, 2021
0020039
fix: change CursorLocation to StartingOffset
hannahrogers-google Mar 9, 2021
91bcbcd
fix: update docs
hannahrogers-google Mar 9, 2021
e975204
fix: default is end
hannahrogers-google Mar 10, 2021
bcc4f4c
fix: test name
hannahrogers-google Mar 10, 2021
6917df6
feat: update starting offset to offset location
hannahrogers-google Mar 16, 2021
2731111
Merge branch 'master' of github.com:googleapis/java-pubsublite
hannahrogers-google Mar 16, 2021
42d8e84
fix: update docs
hannahrogers-google Mar 16, 2021
0e1a54a
fix: requested changes
hannahrogers-google Mar 16, 2021
02cb149
fix: changing offset location to backlog location
hannahrogers-google Mar 17, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,11 @@
<className>com/google/cloud/pubsublite/PublishMetadata</className>
<method>*</method>
</difference>
<difference>
<differenceType>7012</differenceType>
<className>com/google/cloud/pubsublite/AdminClient</className>
<method>*</method>
</difference>
<!-- Added abstract method to AutoValue.Builder class (Always okay) -->
<difference>
<differenceType>7013</differenceType>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@ static AdminClient create(AdminClientSettings settings) throws ApiException {
return settings.instantiate();
}

/**
* BacklogLoction refers to a location with respect to the message backlog.
*
* <p>BEGINNING refers to the location of the oldest retained message. END refers to the location
* past all currently published messages, skipping the entire message backlog.
*/
public enum BacklogLocation {
BEGINNING,
END
}

/** The Google Cloud region this client operates on. */
CloudRegion region();

Expand Down Expand Up @@ -102,11 +113,27 @@ static AdminClient create(AdminClientSettings settings) throws ApiException {
/**
* Create the provided subscription if it does not yet exist.
*
* <p>By default, a new subscription will only receive messages published after the subscription
* was created.
*
* @param subscription The subscription to create.
* @return A future that will have either an error {@link com.google.api.gax.rpc.ApiException} or
* the subscription on success.
*/
default ApiFuture<Subscription> createSubscription(Subscription subscription) {
return createSubscription(subscription, BacklogLocation.END);
}

/**
* Create the provided subscription at the given starting offset if it does not yet exist.
*
* @param subscription The subscription to create.
* @param startingOffset The offset at which the new subscription will start receiving messages.
* @return A future that will have either an error {@link com.google.api.gax.rpc.ApiException} or
* the subscription on success.
*/
ApiFuture<Subscription> createSubscription(Subscription subscription);
ApiFuture<Subscription> createSubscription(
Subscription subscription, BacklogLocation startingOffset);

/**
* Get the subscription with id {@code id} if it exists.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClient.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.LocationPath;
import com.google.cloud.pubsublite.SubscriptionPath;
Expand Down Expand Up @@ -136,7 +137,8 @@ public ApiFuture<List<SubscriptionPath>> listTopicSubscriptions(TopicPath path)
}

@Override
public ApiFuture<Subscription> createSubscription(Subscription subscription) {
public ApiFuture<Subscription> createSubscription(
Subscription subscription, BacklogLocation startingOffset) {
SubscriptionPath path = SubscriptionPath.parse(subscription.getName());
return serviceClient
.createSubscriptionCallable()
Expand All @@ -145,6 +147,7 @@ public ApiFuture<Subscription> createSubscription(Subscription subscription) {
.setParent(path.locationPath().toString())
.setSubscription(subscription)
.setSubscriptionId(path.name().toString())
.setSkipBacklog(startingOffset == BacklogLocation.END)
.build());
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnaryCallable;
import com.google.cloud.pubsublite.AdminClient.BacklogLocation;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.LocationPath;
Expand Down Expand Up @@ -372,6 +373,7 @@ public void createSubscription_Ok() throws Exception {
.setParent(subscriptionPath().locationPath().toString())
.setSubscription(SUBSCRIPTION)
.setSubscriptionId(subscriptionName().value())
.setSkipBacklog(true)
.build();

when(createSubscriptionCallable.futureCall(request))
Expand All @@ -387,13 +389,48 @@ public void createSubscription_Error() {
.setParent(subscriptionPath().locationPath().toString())
.setSubscription(SUBSCRIPTION)
.setSubscriptionId(subscriptionName().value())
.setSkipBacklog(true)
.build();

when(createSubscriptionCallable.futureCall(request)).thenReturn(failedPreconditionFuture());

assertFutureThrowsCode(client.createSubscription(SUBSCRIPTION), Code.FAILED_PRECONDITION);
}

@Test
public void createSubscriptionAtBeginning_Ok() throws Exception {
CreateSubscriptionRequest request =
CreateSubscriptionRequest.newBuilder()
.setParent(subscriptionPath().locationPath().toString())
.setSubscription(SUBSCRIPTION)
.setSubscriptionId(subscriptionName().value())
.setSkipBacklog(false)
.build();

when(createSubscriptionCallable.futureCall(request))
.thenReturn(immediateFuture(SUBSCRIPTION_2));

assertThat(client.createSubscription(SUBSCRIPTION, BacklogLocation.BEGINNING).get())
.isEqualTo(SUBSCRIPTION_2);
}

@Test
public void createSubscriptionAtBeginning_Error() throws Exception {
CreateSubscriptionRequest request =
CreateSubscriptionRequest.newBuilder()
.setParent(subscriptionPath().locationPath().toString())
.setSubscription(SUBSCRIPTION)
.setSubscriptionId(subscriptionName().value())
.setSkipBacklog(false)
.build();

when(createSubscriptionCallable.futureCall(request)).thenReturn(failedPreconditionFuture());

assertFutureThrowsCode(
client.createSubscription(SUBSCRIPTION, BacklogLocation.BEGINNING),
Code.FAILED_PRECONDITION);
}

@Test
public void updateSubscription_Ok() throws Exception {
UpdateSubscriptionRequest request =
Expand Down