From f17f000466ee60b2416159bb03f0784e6c80095f Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Tue, 26 May 2020 19:18:57 -0400 Subject: [PATCH 1/6] fix: keep track of internal seek --- .../internal/wire/SubscriberImpl.java | 55 ++++++++++++++----- .../internal/wire/SubscriberImplTest.java | 41 +++++++++++++- 2 files changed, 82 insertions(+), 14 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java index abcd59014..d8b003eb1 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java @@ -34,6 +34,7 @@ import com.google.cloud.pubsublite.proto.SubscriberServiceGrpc.SubscriberServiceStub; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.Monitor; import io.grpc.Status; import io.grpc.StatusException; import java.util.Optional; @@ -56,11 +57,24 @@ public class SubscriberImpl extends ProxyService private final TokenCounter tokenCounter = new TokenCounter(); @GuardedBy("monitor.monitor") - private Optional> inFlightSeek = Optional.empty(); + private Optional inFlightSeek = Optional.empty(); + + @GuardedBy("monitor.monitor") + private boolean internalSeekInFlight = false; @GuardedBy("monitor.monitor") private boolean shutdown = false; + private static class InFlightSeek { + final SeekRequest seekRequest; + final SettableApiFuture seekFuture; + + InFlightSeek(SeekRequest request, SettableApiFuture future) { + seekRequest = request; + seekFuture = future; + } + } + @VisibleForTesting SubscriberImpl( SubscriberServiceStub stub, @@ -91,7 +105,7 @@ public SubscriberImpl( protected void handlePermanentError(StatusException error) { try (CloseableMonitor.Hold h = monitor.enter()) { shutdown = true; - inFlightSeek.ifPresent(inFlight -> inFlight.setException(error)); + inFlightSeek.ifPresent(inFlight -> inFlight.seekFuture.setException(error)); inFlightSeek = Optional.empty(); onPermanentError(error); } @@ -106,7 +120,7 @@ protected void stop() { shutdown = true; inFlightSeek.ifPresent( inFlight -> - inFlight.setException( + inFlight.seekFuture.setException( Status.ABORTED .withDescription("Client stopped while seek in flight.") .asException())); @@ -115,13 +129,20 @@ protected void stop() { @Override public ApiFuture seek(SeekRequest request) { - try (CloseableMonitor.Hold h = monitor.enter()) { + try (CloseableMonitor.Hold h = + monitor.enterWhenUninterruptibly( + new Monitor.Guard(monitor.monitor) { + @Override + public boolean isSatisfied() { + return !internalSeekInFlight; + } + })) { checkArgument( Predicates.isValidSeekRequest(request), "Sent SeekRequest with no location set."); checkState(!shutdown, "Seeked after the stream shut down."); checkState(!inFlightSeek.isPresent(), "Seeked while seek is already in flight."); SettableApiFuture future = SettableApiFuture.create(); - inFlightSeek = Optional.of(future); + inFlightSeek = Optional.of(new InFlightSeek(request, future)); connection.modifyConnection( connectedSubscriber -> connectedSubscriber.ifPresent(subscriber -> subscriber.seek(request))); @@ -164,13 +185,17 @@ public void triggerReinitialize() { connectedSubscriber -> { checkArgument(monitor.monitor.isOccupiedByCurrentThread()); checkArgument(connectedSubscriber.isPresent()); - nextOffsetTracker - .requestForRestart() - .ifPresent( - request -> { - inFlightSeek = Optional.of(SettableApiFuture.create()); - connectedSubscriber.get().seek(request); - }); + if (inFlightSeek.isPresent()) { + connectedSubscriber.get().seek(inFlightSeek.get().seekRequest); + } else { + nextOffsetTracker + .requestForRestart() + .ifPresent( + request -> { + internalSeekInFlight = true; + connectedSubscriber.get().seek(request); + }); + } tokenCounter .requestForRestart() .ifPresent(request -> connectedSubscriber.get().allowFlow(request)); @@ -212,9 +237,13 @@ private Status onSeekResponse(Offset seekOffset) { if (shutdown) { return Status.OK; } + if (internalSeekInFlight) { + internalSeekInFlight = false; + return Status.OK; + } checkState(inFlightSeek.isPresent(), "No in flight seek, but received a seek response."); nextOffsetTracker.onClientSeek(seekOffset); - inFlightSeek.get().set(seekOffset); + inFlightSeek.get().seekFuture.set(seekOffset); inFlightSeek = Optional.empty(); return Status.OK; } catch (StatusException e) { diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java index 8f1c9b7e8..ff6f7f7f5 100755 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java @@ -19,6 +19,7 @@ import static com.google.cloud.pubsublite.internal.StatusExceptionMatcher.assertFutureThrowsCode; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; @@ -40,6 +41,7 @@ import com.google.cloud.pubsublite.SubscriptionPaths; import com.google.cloud.pubsublite.internal.StatusExceptionMatcher; import com.google.cloud.pubsublite.internal.wire.ConnectedSubscriber.Response; +import com.google.cloud.pubsublite.proto.Cursor; import com.google.cloud.pubsublite.proto.FlowControlRequest; import com.google.cloud.pubsublite.proto.InitialSubscribeRequest; import com.google.cloud.pubsublite.proto.SeekRequest; @@ -96,7 +98,7 @@ private static SubscribeRequest initialRequest() { private final Listener permanentErrorHandler = mock(Listener.class); - private Subscriber subscriber; + private SubscriberImpl subscriber; private StreamObserver leakedResponseObserver; @Before @@ -222,4 +224,41 @@ public void messageResponseSubtracts() { verify(permanentErrorHandler) .failed(any(), argThat(new StatusExceptionMatcher(Code.FAILED_PRECONDITION))); } + + @Test + public void reinitialize_resendsInFlightSeek() { + Offset offset = Offset.of(1); + SeekRequest seekRequest = + SeekRequest.newBuilder() + .setCursor(Cursor.newBuilder().setOffset(offset.value())) + .build(); + ApiFuture future = subscriber.seek(seekRequest); + assertThat(subscriber.seekInFlight()).isTrue(); + + subscriber.triggerReinitialize(); + verify(mockConnectedSubscriber, times(2)).seek(seekRequest); + + leakedResponseObserver.onNext(Response.ofSeekOffset(offset)); + assertTrue(future.isDone()); + assertThat(subscriber.seekInFlight()).isFalse(); + } + + @Test + public void reinitialize_sendsNextOffsetSeek() { + subscriber.allowFlow(bigFlowControlRequest()); + ImmutableList messages = + ImmutableList.of( + SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(0), 10), + SequencedMessage.of(Message.builder().build(), Timestamps.EPOCH, Offset.of(1), 10)); + leakedResponseObserver.onNext(Response.ofMessages(messages)); + verify(mockMessageConsumer).accept(messages); + + subscriber.triggerReinitialize(); + verify(mockConnectedSubscriber).seek( + SeekRequest.newBuilder() + .setCursor(Cursor.newBuilder().setOffset(2)) + .build()); + assertThat(subscriber.seekInFlight()).isFalse(); + leakedResponseObserver.onNext(Response.ofSeekOffset(Offset.of(2))); + } } From ae706b63d78398a311eb9f89a95e5bfad7b177bb Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Wed, 27 May 2020 10:27:50 -0400 Subject: [PATCH 2/6] fix: check shutdown on seek --- .../cloud/pubsublite/internal/wire/SubscriberImpl.java | 2 +- .../pubsublite/internal/wire/SubscriberImplTest.java | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java index d8b003eb1..d67fd55f5 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java @@ -134,7 +134,7 @@ public ApiFuture seek(SeekRequest request) { new Monitor.Guard(monitor.monitor) { @Override public boolean isSatisfied() { - return !internalSeekInFlight; + return !internalSeekInFlight || shutdown; } })) { checkArgument( diff --git a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java index ff6f7f7f5..6e08297fb 100755 --- a/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java +++ b/google-cloud-pubsublite/src/test/java/com/google/cloud/pubsublite/internal/wire/SubscriberImplTest.java @@ -229,9 +229,7 @@ public void messageResponseSubtracts() { public void reinitialize_resendsInFlightSeek() { Offset offset = Offset.of(1); SeekRequest seekRequest = - SeekRequest.newBuilder() - .setCursor(Cursor.newBuilder().setOffset(offset.value())) - .build(); + SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(offset.value())).build(); ApiFuture future = subscriber.seek(seekRequest); assertThat(subscriber.seekInFlight()).isTrue(); @@ -254,10 +252,8 @@ public void reinitialize_sendsNextOffsetSeek() { verify(mockMessageConsumer).accept(messages); subscriber.triggerReinitialize(); - verify(mockConnectedSubscriber).seek( - SeekRequest.newBuilder() - .setCursor(Cursor.newBuilder().setOffset(2)) - .build()); + verify(mockConnectedSubscriber) + .seek(SeekRequest.newBuilder().setCursor(Cursor.newBuilder().setOffset(2)).build()); assertThat(subscriber.seekInFlight()).isFalse(); leakedResponseObserver.onNext(Response.ofSeekOffset(Offset.of(2))); } From 2d5134d2ce512c5b14943eb703689afbf8681cb6 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Wed, 27 May 2020 11:07:34 -0400 Subject: [PATCH 3/6] fix: reset tokens on seek --- .../cloud/pubsublite/internal/wire/SubscriberImpl.java | 1 + .../google/cloud/pubsublite/internal/wire/TokenCounter.java | 5 +++++ 2 files changed, 6 insertions(+) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java index d67fd55f5..290266f35 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/SubscriberImpl.java @@ -143,6 +143,7 @@ public boolean isSatisfied() { checkState(!inFlightSeek.isPresent(), "Seeked while seek is already in flight."); SettableApiFuture future = SettableApiFuture.create(); inFlightSeek = Optional.of(new InFlightSeek(request, future)); + tokenCounter.onClientSeek(); connection.modifyConnection( connectedSubscriber -> connectedSubscriber.ifPresent(subscriber -> subscriber.seek(request))); diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/TokenCounter.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/TokenCounter.java index 51242ca2a..8ee3c44c9 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/TokenCounter.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/internal/wire/TokenCounter.java @@ -50,6 +50,11 @@ void onMessages(Collection received) throws StatusException { messages -= received.size(); } + void onClientSeek() { + bytes = 0; + messages = 0; + } + Optional requestForRestart() { if (bytes == 0 && messages == 0) return Optional.empty(); return Optional.of( From 9b19348053c37b4b8fb45b7ce778dd86d67387ad Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Fri, 31 Jul 2020 14:21:07 -0400 Subject: [PATCH 4/6] feat: add api client header to outbound requests --- .../com/google/cloud/pubsublite/Stubs.java | 49 ++++++++++++++++++- 1 file changed, 48 insertions(+), 1 deletion(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java index f3749671a..b9f9d36d3 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java @@ -16,13 +16,28 @@ package com.google.cloud.pubsublite; +import com.google.api.gax.core.GaxProperties; +import com.google.api.gax.grpc.GaxGrpcProperties; +import com.google.api.gax.rpc.ApiClientHeaderProvider; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.pubsublite.internal.ChannelCache; import com.google.common.collect.ImmutableList; +import io.grpc.CallOptions; import io.grpc.Channel; +import io.grpc.ClientCall; +import io.grpc.ClientInterceptor; +import io.grpc.ClientInterceptors; +import io.grpc.ForwardingClientCall.SimpleForwardingClientCall; +import io.grpc.Metadata; +import io.grpc.Metadata.Key; +import io.grpc.MethodDescriptor; import io.grpc.auth.MoreCallCredentials; import io.grpc.stub.AbstractStub; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.function.Function; public class Stubs { @@ -30,8 +45,9 @@ public class Stubs { public static > StubT defaultStub( String target, Function stubFactory) throws IOException { + return stubFactory - .apply(channels.get(target)) + .apply(ClientInterceptors.intercept(channels.get(target), getClientInterceptors())) .withCallCredentials( MoreCallCredentials.from( GoogleCredentials.getApplicationDefault() @@ -39,5 +55,36 @@ public static > StubT defaultStub( ImmutableList.of("https://www.googleapis.com/auth/cloud-platform")))); } + private static List getClientInterceptors() { + List clientInterceptors = new ArrayList<>(); + Map apiClientHeaders = + ApiClientHeaderProvider.newBuilder() + .setGeneratedLibToken("gapic", GaxProperties.getLibraryVersion(Stubs.class)) + .setTransportToken( + GaxGrpcProperties.getGrpcTokenName(), GaxGrpcProperties.getGrpcVersion()) + .build() + .getHeaders(); + clientInterceptors.add( + new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + ClientCall call = next.newCall(method, callOptions); + return new SimpleForwardingClientCall(call) { + @Override + public void start(ClientCall.Listener responseListener, Metadata headers) { + for (Entry apiClientHeader : apiClientHeaders.entrySet()) { + headers.put( + Key.of(apiClientHeader.getKey(), Metadata.ASCII_STRING_MARSHALLER), + apiClientHeader.getValue()); + } + super.start(responseListener, headers); + } + }; + } + }); + return clientInterceptors; + } + private Stubs() {} } From 42b443c3c7cf2ae419f760b544dc3ba2eb640ee5 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Fri, 31 Jul 2020 14:23:01 -0400 Subject: [PATCH 5/6] fix format --- .../src/main/java/com/google/cloud/pubsublite/Stubs.java | 1 - 1 file changed, 1 deletion(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java index b9f9d36d3..4c1fc7e9d 100755 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java @@ -45,7 +45,6 @@ public class Stubs { public static > StubT defaultStub( String target, Function stubFactory) throws IOException { - return stubFactory .apply(ClientInterceptors.intercept(channels.get(target), getClientInterceptors())) .withCallCredentials( From e5a38b344366f60a528aca2fce87be53da458435 Mon Sep 17 00:00:00 2001 From: hannahrogers-google Date: Tue, 4 Aug 2020 12:28:22 -0400 Subject: [PATCH 6/6] fix: use gccl instead of gapic --- .../src/main/java/com/google/cloud/pubsublite/Stubs.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java index 4c1fc7e9d..6afaffafe 100644 --- a/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java +++ b/google-cloud-pubsublite/src/main/java/com/google/cloud/pubsublite/Stubs.java @@ -58,7 +58,7 @@ private static List getClientInterceptors() { List clientInterceptors = new ArrayList<>(); Map apiClientHeaders = ApiClientHeaderProvider.newBuilder() - .setGeneratedLibToken("gapic", GaxProperties.getLibraryVersion(Stubs.class)) + .setClientLibToken("gccl", GaxProperties.getLibraryVersion(Stubs.class)) .setTransportToken( GaxGrpcProperties.getGrpcTokenName(), GaxGrpcProperties.getGrpcVersion()) .build()