Skip to content

Commit

Permalink
fix: Change SingleConnection to batch before initial response instead…
Browse files Browse the repository at this point in the history
… of blocking (#1462)

* fix: Change SingleConnection to batch before initial response instead of blocking

This requires minimal changes and prevents thread explosion issues

* fix: Change SingleConnection to batch before initial response instead of blocking

This requires minimal changes and prevents thread explosion issues

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
dpcollins-google and gcf-owl-bot[bot] committed Jul 25, 2023
1 parent fc5f5d9 commit 2ac44af
Show file tree
Hide file tree
Showing 8 changed files with 48 additions and 147 deletions.
Expand Up @@ -16,10 +16,7 @@

package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.CheckedApiPreconditions.checkState;

import com.google.api.gax.rpc.ResponseObserver;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.PublishSequenceNumber;
import com.google.cloud.pubsublite.internal.wire.StreamFactories.PublishStreamFactory;
import com.google.cloud.pubsublite.proto.MessagePublishRequest;
Expand Down Expand Up @@ -69,18 +66,10 @@ public void publish(
}

@Override
protected void handleInitialResponse(PublishResponse response) throws CheckedApiException {
checkState(
response.hasInitialResponse(),
"First stream response is not an initial response: " + response);
}

@Override
protected void handleStreamResponse(PublishResponse response) throws CheckedApiException {
checkState(!response.hasInitialResponse(), "Received duplicate initial response.");
checkState(
response.hasMessageResponse(),
"Received response on stream which was neither a message or initial response.");
protected void handleStreamResponse(PublishResponse response) {
if (!response.hasMessageResponse()) {
return;
}
sendToClient(response.getMessageResponse());
}
}
Expand Up @@ -41,7 +41,7 @@ private ConnectedAssignerImpl(
StreamFactory<PartitionAssignmentRequest, PartitionAssignment> streamFactory,
ResponseObserver<PartitionAssignment> clientStream,
PartitionAssignmentRequest initialRequest) {
super(streamFactory, clientStream, STREAM_IDLE_TIMEOUT, /*expectInitialResponse=*/ false);
super(streamFactory, clientStream, STREAM_IDLE_TIMEOUT);
initialize(initialRequest);
}

Expand All @@ -56,13 +56,6 @@ public ConnectedAssigner New(
}

// SingleConnection implementation.
@Override
protected void handleInitialResponse(PartitionAssignment response) throws CheckedApiException {
// The assignment stream is server-initiated by sending a PartitionAssignment. The
// initial response from the server is handled identically to other responses.
handleStreamResponse(response);
}

@Override
protected void handleStreamResponse(PartitionAssignment response) throws CheckedApiException {
try (CloseableMonitor.Hold h = monitor.enter()) {
Expand Down
Expand Up @@ -41,7 +41,7 @@ public class ConnectedCommitterImpl
ResponseObserver<SequencedCommitCursorResponse> clientStream,
StreamingCommitCursorRequest initialRequest,
Duration streamIdleTimeout) {
super(streamFactory, clientStream, streamIdleTimeout, /*expectInitialResponse=*/ true);
super(streamFactory, clientStream, streamIdleTimeout);
this.initialRequest = initialRequest;
initialize(initialRequest);
}
Expand All @@ -57,20 +57,12 @@ public ConnectedCommitter New(
}
}

// SingleConnection implementation.
@Override
protected void handleInitialResponse(StreamingCommitCursorResponse response)
throws CheckedApiException {
checkState(
response.hasInitial(),
String.format(
"Received non-initial first response %s on stream with initial request %s.",
response, initialRequest));
}

@Override
protected void handleStreamResponse(StreamingCommitCursorResponse response)
throws CheckedApiException {
if (response.hasInitial()) {
return;
}
checkState(
response.hasCommit(),
String.format(
Expand Down
Expand Up @@ -64,24 +64,11 @@ public void allowFlow(FlowControlRequest request) {
sendToStream(SubscribeRequest.newBuilder().setFlowControl(request).build());
}

@Override
protected void handleInitialResponse(SubscribeResponse response) throws CheckedApiException {
checkState(
response.hasInitial(),
String.format(
"Received non-initial first response %s on stream with initial request %s.",
response, initialRequest));
}

@Override
protected void handleStreamResponse(SubscribeResponse response) throws CheckedApiException {
switch (response.getResponseCase()) {
case INITIAL:
throw new CheckedApiException(
String.format(
"Received duplicate initial response on stream with initial request %s.",
initialRequest),
Code.FAILED_PRECONDITION);
return;
case MESSAGES:
onMessages(response.getMessages());
return;
Expand Down
Expand Up @@ -21,12 +21,13 @@
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.StreamController;
import com.google.cloud.pubsublite.internal.CheckedApiException;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.common.base.Preconditions;
import com.google.common.flogger.GoogleLogger;
import com.google.common.util.concurrent.Monitor.Guard;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.time.Duration;
import java.util.ArrayDeque;
import java.util.Queue;
import javax.annotation.Nullable;

/**
* A SingleConnection handles the state for a stream with an initial connection request that may
Expand All @@ -44,75 +45,57 @@ public abstract class SingleConnection<StreamRequestT, StreamResponseT, ClientRe

private final ClientStream<StreamRequestT> requestStream;
private final ResponseObserver<ClientResponseT> clientStream;
private final boolean expectInitial;
private final StreamIdleTimer streamIdleTimer;

private final CloseableMonitor connectionMonitor = new CloseableMonitor();

@GuardedBy("connectionMonitor.monitor")
@GuardedBy("this")
private boolean receivedInitial = false;

@GuardedBy("connectionMonitor.monitor")
private boolean completed = false;
@GuardedBy("this")
private final Queue<StreamRequestT> bufferedBeforeInitial = new ArrayDeque<>();

protected abstract void handleInitialResponse(StreamResponseT response)
throws CheckedApiException;
@GuardedBy("this")
private boolean completed = false;

protected abstract void handleStreamResponse(StreamResponseT response) throws CheckedApiException;

protected SingleConnection(
StreamFactory<StreamRequestT, StreamResponseT> streamFactory,
ResponseObserver<ClientResponseT> clientStream,
Duration streamIdleTimeout,
boolean expectInitialResponse) {
Duration streamIdleTimeout) {
this.clientStream = clientStream;
this.expectInitial = expectInitialResponse;
this.streamIdleTimer = new StreamIdleTimer(streamIdleTimeout, this::onStreamIdle);
this.requestStream = streamFactory.New(this);
}

protected SingleConnection(
StreamFactory<StreamRequestT, StreamResponseT> streamFactory,
ResponseObserver<ClientResponseT> clientStream) {
this(streamFactory, clientStream, DEFAULT_STREAM_IDLE_TIMEOUT, /*expectInitialResponse=*/ true);
this(streamFactory, clientStream, DEFAULT_STREAM_IDLE_TIMEOUT);
}

protected void initialize(StreamRequestT initialRequest) {
this.requestStream.send(initialRequest);
if (!expectInitial) {
return;
}
try (CloseableMonitor.Hold h =
connectionMonitor.enterWhenUninterruptibly(
new Guard(connectionMonitor.monitor) {
@Override
public boolean isSatisfied() {
return receivedInitial || completed;
}
})) {}
}

protected void sendToStream(StreamRequestT request) {
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
if (completed) {
log.atFine().log("Sent request after stream completion: %s", request);
return;
}
// This should be impossible to not have received the initial request, or be completed, and
// the caller has access to this object.
Preconditions.checkState(receivedInitial);
requestStream.send(request);
protected synchronized void sendToStream(StreamRequestT request) {
if (completed) {
log.atFine().log("Sent request after stream completion: %s", request);
return;
}
if (!receivedInitial) {
bufferedBeforeInitial.add(request);
return;
}
requestStream.send(request);
}

protected void sendToClient(ClientResponseT response) {
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
synchronized (this) {
if (completed) {
log.atFine().log("Sent response after stream completion: %s", response);
return;
}
// This should be impossible to not have received the initial request, or be completed, and
// the caller has access to this object.
// We should not send data to the client before receiving the initial value.
Preconditions.checkState(receivedInitial);
}
// The upcall may be reentrant, possibly on another thread while this thread is blocked.
Expand All @@ -123,16 +106,10 @@ protected void setError(CheckedApiException error) {
abort(error);
}

protected boolean isCompleted() {
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
return completed;
}
}

// Records the connection as completed and performs tear down, if not already completed. Returns
// whether the connection was already complete.
private boolean completeStream() {
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
private synchronized boolean completeStream() {
try {
if (completed) {
return true;
}
Expand Down Expand Up @@ -167,27 +144,33 @@ public void onStart(StreamController streamController) {}

@Override
public void onResponse(StreamResponseT response) {
boolean isFirst;
try (CloseableMonitor.Hold h = connectionMonitor.enter()) {
synchronized (this) {
streamIdleTimer.restart();
if (completed) {
log.atFine().log("Received response on stream after completion: %s", response);
return;
}
isFirst = !receivedInitial;
receivedInitial = true;
if (!receivedInitial) {
handleInitial();
}
}
try {
if (isFirst) {
handleInitialResponse(response);
} else {
handleStreamResponse(response);
}
handleStreamResponse(response);
} catch (CheckedApiException e) {
abort(e);
}
}

@GuardedBy("this")
private void handleInitial() {
for (@Nullable StreamRequestT req = bufferedBeforeInitial.poll();
req != null;
req = bufferedBeforeInitial.poll()) {
requestStream.send(req);
}
receivedInitial = true;
}

@Override
public void onError(Throwable t) {
if (completeStream()) {
Expand Down
Expand Up @@ -203,19 +203,6 @@ public void construct_SendsInitialThenError() {
}
}

@Test
public void construct_SendsMessagePublishResponseError() {
doAnswer(new MessageResponseAnswer(ByteString.EMPTY, messageResponse(Offset.of(10))))
.when(mockRequestStream)
.send(initialRequest(ByteString.EMPTY));
try (BatchPublisherImpl publisher =
FACTORY.New(streamFactory, mockOutputStream, initialRequest(ByteString.EMPTY))) {
verify(mockOutputStream).onError(argThat(new ApiExceptionMatcher(Code.FAILED_PRECONDITION)));
verifyNoMoreInteractions(mockOutputStream);
}
leakedResponseStream = Optional.empty();
}

private BatchPublisherImpl initialize(ByteString clientId) {
doAnswer(
(Answer<Void>)
Expand Down Expand Up @@ -244,16 +231,6 @@ public void responseAfterClose_Dropped() throws Exception {
verify(mockOutputStream, never()).onResponse(any());
}

@Test
public void duplicateInitial_Abort() {
BatchPublisher unusedPublisher = initialize(ByteString.EMPTY);
PublishResponse.Builder builder = PublishResponse.newBuilder();
builder.getInitialResponseBuilder();
leakedResponseStream.get().onResponse(builder.build());
verify(mockOutputStream).onError(argThat(new ApiExceptionMatcher(Code.FAILED_PRECONDITION)));
leakedResponseStream = Optional.empty();
}

@Test
public void setsSequenceNumbersWhenClientIdPresent() {
BatchPublisher publisher = initialize(CLIENT_ID);
Expand Down
Expand Up @@ -190,16 +190,6 @@ public void responseAfterClose_Dropped() throws Exception {
verify(mockOutputStream, never()).onResponse(any());
}

@Test
public void duplicateInitial_Abort() {
initialize();
StreamingCommitCursorResponse.Builder builder = StreamingCommitCursorResponse.newBuilder();
builder.getInitialBuilder();
leakedResponseStream.get().onResponse(builder.build());
verify(mockOutputStream).onError(argThat(new ApiExceptionMatcher(Code.FAILED_PRECONDITION)));
leakedResponseStream = Optional.empty();
}

@Test
public void commitRequestProxied() {
initialize();
Expand Down
Expand Up @@ -215,16 +215,6 @@ public void responseAfterClose_Dropped() {
verify(mockOutputStream, never()).onResponse(any());
}

@Test
public void duplicateInitial_Abort() {
initialize();
SubscribeResponse.Builder builder =
SubscribeResponse.newBuilder().setInitial(InitialSubscribeResponse.getDefaultInstance());
leakedResponseStream.get().onResponse(builder.build());
verify(mockOutputStream).onError(argThat(new ApiExceptionMatcher(Code.FAILED_PRECONDITION)));
leakedResponseStream = Optional.empty();
}

@Test
public void emptyMessagesResponse_Abort() {
initialize();
Expand Down

0 comments on commit 2ac44af

Please sign in to comment.