Skip to content

Commit

Permalink
feat: StreamWriterV2 sets exception for response with error (#884)
Browse files Browse the repository at this point in the history
  • Loading branch information
yayi-google committed Feb 25, 2021
1 parent 8e2ab01 commit 4677d7b
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,15 @@ private void requestCallback(AppendRowsResponse response) {
} finally {
this.lock.unlock();
}
requestWrapper.appendResult.set(response);
if (response.hasError()) {
StatusRuntimeException exception =
new StatusRuntimeException(
Status.fromCodeValue(response.getError().getCode())
.withDescription(response.getError().getMessage()));
requestWrapper.appendResult.setException(exception);
} else {
requestWrapper.appendResult.set(response);
}
}

private void doneCallback(Throwable finalStatus) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,12 @@ private AppendRowsResponse createAppendResponse(long offset) {
.build();
}

private AppendRowsResponse createAppendResponseWithError(Status.Code code, String message) {
return AppendRowsResponse.newBuilder()
.setError(com.google.rpc.Status.newBuilder().setCode(code.value()).setMessage(message))
.build();
}

private ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriterV2 writer, String[] messages) {
return writer.append(createAppendRequest(messages, -1));
}
Expand Down Expand Up @@ -196,7 +202,7 @@ public void testAppendSuccess() throws Exception {
}

@Test
public void testAppendSuccessAndError() throws Exception {
public void testAppendSuccessAndConnectionError() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();
testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addException(Status.INTERNAL.asException());
Expand All @@ -211,6 +217,28 @@ public void testAppendSuccessAndError() throws Exception {
writer.close();
}

@Test
public void testAppendSuccessAndInStreamError() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();
testBigQueryWrite.addResponse(createAppendResponse(0));
testBigQueryWrite.addResponse(
createAppendResponseWithError(Status.INVALID_ARGUMENT.getCode(), "test message"));
testBigQueryWrite.addResponse(createAppendResponse(1));

ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});

assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
StatusRuntimeException actualError =
assertFutureException(StatusRuntimeException.class, appendFuture2);
assertEquals(Status.Code.INVALID_ARGUMENT, actualError.getStatus().getCode());
assertEquals("test message", actualError.getStatus().getDescription());
assertEquals(1, appendFuture3.get().getAppendResult().getOffset().getValue());

writer.close();
}

@Test
public void longIdleBetweenAppends() throws Exception {
StreamWriterV2 writer = getTestStreamWriterV2();
Expand Down

0 comments on commit 4677d7b

Please sign in to comment.