Skip to content

Commit

Permalink
fix: Change internals to throw StatusException instead of return Stat…
Browse files Browse the repository at this point in the history
…us (#300)

* fix: Change internals to throw StatusException instead of return Status

This is an intermediate change needed to move to ApiException/CheckedApiException.

* fix: clirr
  • Loading branch information
dpcollins-google committed Oct 19, 2020
1 parent a9baa99 commit 96ad02c
Show file tree
Hide file tree
Showing 18 changed files with 214 additions and 243 deletions.
21 changes: 6 additions & 15 deletions google-cloud-pubsublite/clirr-ignored-differences.xml
Original file line number Diff line number Diff line change
@@ -1,21 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<!-- see http://www.mojohaus.org/clirr-maven-plugin/examples/ignored-differences.html -->
<differences>
<!-- ProjectLookupUtils functions renamed: remove on next release. -->
<difference>
<differenceType>7002</differenceType>
<className>com/google/cloud/pubsublite/ProjectLookupUtils</className>
<method>*</method>
</difference>
<difference>
<differenceType>3003</differenceType>
<className>com/google/cloud/pubsublite/ProjectLookupUtils</className>
</difference>
<difference>
<differenceType>7009</differenceType>
<className>com/google/cloud/pubsublite/ProjectLookupUtils</className>
<method>*</method>
</difference>
<!-- Blanket ignored files -->
<difference>
<differenceType>6000</differenceType>
Expand All @@ -37,6 +22,12 @@
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
</difference>
<difference>
<differenceType>7005</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
<method>*</method>
<to>*</to>
</difference>
<difference>
<differenceType>7006</differenceType>
<className>com/google/cloud/pubsublite/internal/**</className>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,15 @@ protected void handlePermanentError(StatusException error) {}

@Override
protected void start() {
wireSubscriber.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedMessages(flowControlSettings.messagesOutstanding())
.setAllowedBytes(flowControlSettings.bytesOutstanding())
.build());
try {
wireSubscriber.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedMessages(flowControlSettings.messagesOutstanding())
.setAllowedBytes(flowControlSettings.bytesOutstanding())
.build());
} catch (StatusException e) {
onPermanentError(e);
}
}

@Override
Expand All @@ -89,11 +93,15 @@ void onMessages(ImmutableList<SequencedMessage> sequencedMessages) {
@Override
public void ack() {
trackerConsumer.run();
wireSubscriber.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedMessages(1)
.setAllowedBytes(bytes)
.build());
try {
wireSubscriber.allowFlow(
FlowControlRequest.newBuilder()
.setAllowedMessages(1)
.setAllowedBytes(bytes)
.build());
} catch (StatusException e) {
onPermanentError(e);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import com.google.cloud.pubsublite.proto.PartitionAssignmentServiceGrpc.PartitionAssignmentServiceStub;
import com.google.common.annotations.VisibleForTesting;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Status;
import io.grpc.StatusException;
import java.util.HashSet;
import java.util.Set;
Expand Down Expand Up @@ -82,13 +81,10 @@ private static Set<Partition> toSet(PartitionAssignment assignment) throws Statu
}

@Override
public Status onClientResponse(PartitionAssignment value) {
public void onClientResponse(PartitionAssignment value) throws StatusException {
try (CloseableMonitor.Hold h = monitor.enter()) {
receiver.handleAssignment(toSet(value));
connection.modifyConnection(connectionOr -> connectionOr.ifPresent(ConnectedAssigner::ack));
} catch (StatusException e) {
return e.getStatus();
}
return Status.OK;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.Preconditions.checkState;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.internal.CloseableMonitor;
import com.google.cloud.pubsublite.proto.MessagePublishResponse;
Expand All @@ -24,6 +26,7 @@
import com.google.cloud.pubsublite.proto.PublishResponse;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;
import java.util.Collection;
import java.util.Optional;
Expand Down Expand Up @@ -61,36 +64,31 @@ public void publish(Collection<PubSubMessage> messages) {
}

@Override
protected Status handleInitialResponse(PublishResponse response) {
if (!response.hasInitialResponse()) {
return Status.FAILED_PRECONDITION.withDescription(
"First stream response is not an initial response: " + response);
}
return Status.OK;
protected void handleInitialResponse(PublishResponse response) throws StatusException {
checkState(
response.hasInitialResponse(),
"First stream response is not an initial response: " + response);
}

@Override
protected Status handleStreamResponse(PublishResponse response) {
if (response.hasInitialResponse()) {
return Status.FAILED_PRECONDITION.withDescription("Received duplicate initial response.");
} else if (response.hasMessageResponse()) {
return onMessageResponse(response.getMessageResponse());
} else {
return Status.FAILED_PRECONDITION.withDescription(
"Received response on stream which was neither a message or initial response.");
}
protected void handleStreamResponse(PublishResponse response) throws StatusException {
checkState(!response.hasInitialResponse(), "Received duplicate initial response.");
checkState(
response.hasMessageResponse(),
"Received response on stream which was neither a message or initial response.");
onMessageResponse(response.getMessageResponse());
}

private Status onMessageResponse(MessagePublishResponse response) {
private void onMessageResponse(MessagePublishResponse response) throws StatusException {
Offset offset = Offset.of(response.getStartCursor().getOffset());
try (CloseableMonitor.Hold h = monitor.enter()) {
if (lastOffset.isPresent() && offset.value() <= lastOffset.get().value()) {
return Status.FAILED_PRECONDITION.withDescription(
"Received out of order offsets on stream.");
throw Status.FAILED_PRECONDITION
.withDescription("Received out of order offsets on stream.")
.asException();
}
lastOffset = Optional.of(offset);
}
sendToClient(offset);
return Status.OK;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.google.api.core.SettableApiFuture;
import com.google.cloud.pubsublite.Offset;
import io.grpc.Status;
import io.grpc.StatusException;
import java.util.ArrayDeque;
import java.util.Optional;
import java.util.Queue;
Expand Down Expand Up @@ -56,16 +57,18 @@ ApiFuture<Void> addCommit(Offset offset) {
return futureWithOffset.future;
}

Status complete(long numComplete) {
void complete(long numComplete) throws StatusException {
if (numComplete > currentConnectionFutures.size()) {
Status error =
Status.FAILED_PRECONDITION.withDescription(
String.format(
"Received %s completions, which is more than the commits outstanding for this"
+ " stream.",
numComplete));
abort(error.asException());
return error;
StatusException error =
Status.FAILED_PRECONDITION
.withDescription(
String.format(
"Received %s completions, which is more than the commits outstanding for this"
+ " stream.",
numComplete))
.asException();
abort(error);
throw error;
}
while (!pastConnectionFutures.isEmpty()) {
// Past futures refer to commits sent chronologically before the current stream, and thus they
Expand All @@ -75,7 +78,6 @@ Status complete(long numComplete) {
for (int i = 0; i < numComplete; i++) {
currentConnectionFutures.remove().future.set(null);
}
return Status.OK;
}

void abort(Throwable error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Monitor.Guard;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Status;
import io.grpc.StatusException;
import java.util.Optional;

Expand Down Expand Up @@ -133,10 +132,10 @@ public ApiFuture<Void> commitOffset(Offset offset) {
}

@Override
public Status onClientResponse(SequencedCommitCursorResponse value) {
public void onClientResponse(SequencedCommitCursorResponse value) throws StatusException {
Preconditions.checkArgument(value.getAcknowledgedCommits() > 0);
try (CloseableMonitor.Hold h = monitor.enter()) {
return state.complete(value.getAcknowledgedCommits());
state.complete(value.getAcknowledgedCommits());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import com.google.cloud.pubsublite.proto.PartitionAssignmentAck;
import com.google.cloud.pubsublite.proto.PartitionAssignmentRequest;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;

Expand Down Expand Up @@ -55,24 +54,21 @@ public ConnectedAssigner New(

// SingleConnection implementation.
@Override
protected Status handleInitialResponse(PartitionAssignment response) {
protected void handleInitialResponse(PartitionAssignment response) throws StatusException {
// The assignment stream is server-initiated by sending a PartitionAssignment. The
// initial response from the server is handled identically to other responses.
return handleStreamResponse(response);
handleStreamResponse(response);
}

@Override
protected Status handleStreamResponse(PartitionAssignment response) {
protected void handleStreamResponse(PartitionAssignment response) throws StatusException {
try (CloseableMonitor.Hold h = monitor.enter()) {
checkState(
!outstanding,
"Received assignment from the server while there was an assignment outstanding.");
outstanding = true;
} catch (StatusException e) {
return e.getStatus();
}
sendToClient(response);
return Status.OK;
}

// ConnectedAssigner implementation.
Expand All @@ -82,7 +78,7 @@ public void ack() {
checkState(outstanding, "Client acknowledged when there was no request outstanding.");
outstanding = false;
} catch (StatusException e) {
setError(e.getStatus());
setError(e);
}
sendToStream(
PartitionAssignmentRequest.newBuilder()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,15 @@

package com.google.cloud.pubsublite.internal.wire;

import static com.google.cloud.pubsublite.internal.Preconditions.checkState;

import com.google.cloud.pubsublite.Offset;
import com.google.cloud.pubsublite.proto.Cursor;
import com.google.cloud.pubsublite.proto.SequencedCommitCursorRequest;
import com.google.cloud.pubsublite.proto.SequencedCommitCursorResponse;
import com.google.cloud.pubsublite.proto.StreamingCommitCursorRequest;
import com.google.cloud.pubsublite.proto.StreamingCommitCursorResponse;
import io.grpc.Status;
import io.grpc.StatusException;
import io.grpc.stub.StreamObserver;

public class ConnectedCommitterImpl
Expand Down Expand Up @@ -52,32 +54,29 @@ public ConnectedCommitter New(

// SingleConnection implementation.
@Override
protected Status handleInitialResponse(StreamingCommitCursorResponse response) {
if (!response.hasInitial()) {
return Status.FAILED_PRECONDITION.withDescription(
String.format(
"Received non-initial first response %s on stream with initial request %s.",
response, initialRequest));
}
return Status.OK;
protected void handleInitialResponse(StreamingCommitCursorResponse response)
throws StatusException {
checkState(
response.hasInitial(),
String.format(
"Received non-initial first response %s on stream with initial request %s.",
response, initialRequest));
}

@Override
protected Status handleStreamResponse(StreamingCommitCursorResponse response) {
if (!response.hasCommit()) {
return Status.FAILED_PRECONDITION.withDescription(
String.format(
"Received non-commit subsequent response %s on stream with initial request %s.",
response, initialRequest));
}
if (response.getCommit().getAcknowledgedCommits() <= 0) {
return Status.FAILED_PRECONDITION.withDescription(
String.format(
"Received non-positive commit count response %s on stream with initial request %s.",
response, initialRequest));
}
protected void handleStreamResponse(StreamingCommitCursorResponse response)
throws StatusException {
checkState(
response.hasCommit(),
String.format(
"Received non-commit subsequent response %s on stream with initial request %s.",
response, initialRequest));
checkState(
response.getCommit().getAcknowledgedCommits() > 0,
String.format(
"Received non-positive commit count response %s on stream with initial request %s.",
response, initialRequest));
sendToClient(response.getCommit());
return Status.OK;
}

// ConnectedCommitter implementation.
Expand Down
Loading

0 comments on commit 96ad02c

Please sign in to comment.