Skip to content

Commit

Permalink
feat: Add userClose flag back to StreamWriter (#1973)
Browse files Browse the repository at this point in the history
* feat: Add userClose flag back to StreamWriter

* .

* .

* .

* .

* .

* fix test failure

* .

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
yirutang and gcf-owl-bot[bot] committed Feb 6, 2023
1 parent 54e9bb9 commit 4b51acd
Show file tree
Hide file tree
Showing 6 changed files with 158 additions and 7 deletions.
Expand Up @@ -279,6 +279,15 @@ ApiFuture<AppendRowsResponse> append(
return appendInternal(requestBuilder.build());
}

Boolean isUserClosed() {
this.lock.lock();
try {
return userClosed;
} finally {
this.lock.unlock();
}
}

private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
Expand Down Expand Up @@ -384,8 +393,13 @@ public String getWriterId() {
}

boolean isConnectionInUnrecoverableState() {
// If final status is set, there's no
return connectionFinalStatus != null;
this.lock.lock();
try {
// If final status is set, there's no
return connectionFinalStatus != null;
} finally {
this.lock.unlock();
}
}

/** Close the stream writer. Shut down all resources. */
Expand Down Expand Up @@ -821,7 +835,7 @@ synchronized TableSchemaAndTimestamp getUpdatedSchema() {
}

// Class that wraps AppendRowsRequest and its corresponding Response future.
private static final class AppendRequestAndResponse {
static final class AppendRequestAndResponse {
final SettableApiFuture<AppendRowsResponse> appendResult;
final AppendRowsRequest message;
final long messageSize;
Expand Down
Expand Up @@ -379,7 +379,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
connectionWorkerPool.add(connectionWorker);
log.info(
String.format(
"Scaling up new connection for stream name: %s, pool size after scaling up %s",
"Scaling up new connection for stream name: %s, pool size after scaling up %d",
streamName, connectionWorkerPool.size()));
return connectionWorker;
}
Expand Down
Expand Up @@ -22,7 +22,9 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.AppendRequestAndResponse;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Status;
Expand All @@ -36,6 +38,8 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -70,6 +74,11 @@ public class StreamWriter implements AutoCloseable {
*/
private final String location;

/*
* If user has closed the StreamWriter.
*/
private AtomicBoolean userClosed = new AtomicBoolean(false);

/*
* A String that uniquely identifies this writer.
*/
Expand All @@ -94,6 +103,8 @@ public class StreamWriter implements AutoCloseable {
/** Creation timestamp of this streamwriter */
private final long creationTimestamp;

private Lock lock;

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
Expand Down Expand Up @@ -363,6 +374,17 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
* @return the append response wrapped in a future.
*/
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
if (userClosed.get()) {
AppendRequestAndResponse requestWrapper =
new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build());
requestWrapper.appendResult.setException(
new Exceptions.StreamWriterClosedException(
Status.fromCode(Status.Code.FAILED_PRECONDITION)
.withDescription("User closed StreamWriter"),
streamName,
getWriterId()));
return requestWrapper.appendResult;
}
return this.singleConnectionOrConnectionPool.append(this, rows, offset);
}

Expand Down Expand Up @@ -398,9 +420,25 @@ public String getLocation() {
return location;
}

/**
* @return if a stream writer can no longer be used for writing. It is due to either the
* StreamWriter is explicitly closed or the underlying connection is broken when connection
* pool is not used. Client should recreate StreamWriter in this case.
*/
public boolean isDone() {
if (singleConnectionOrConnectionPool.getKind() == Kind.CONNECTION_WORKER) {
return userClosed.get()
|| singleConnectionOrConnectionPool.connectionWorker().isConnectionInUnrecoverableState();
} else {
// With ConnectionPool, we will replace the bad connection automatically.
return userClosed.get();
}
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
userClosed.set(true);
singleConnectionOrConnectionPool.close(this);
}

Expand Down
Expand Up @@ -18,6 +18,7 @@
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.protobuf.AbstractMessage;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -102,4 +103,8 @@ public long getConnectionCount() {
public void setExecutor(ScheduledExecutorService executor) {
serviceImpl.setExecutor(executor);
}

public void setFailedStatus(Status failedStatus) {
serviceImpl.setFailedStatus(failedStatus);
}
}
Expand Up @@ -62,6 +62,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
// Record whether the first record has been seen on a connection.
private final Map<StreamObserver<AppendRowsResponse>, Boolean> connectionToFirstRequest =
new ConcurrentHashMap<>();
private Status failedStatus = Status.ABORTED;

/** Class used to save the state of a possible response. */
private static class Response {
Expand Down Expand Up @@ -138,6 +139,10 @@ public long getConnectionCount() {
return connectionCount;
}

public void setFailedStatus(Status failedStatus) {
this.failedStatus = failedStatus;
}

@Override
public StreamObserver<AppendRowsRequest> appendRows(
final StreamObserver<AppendRowsResponse> responseObserver) {
Expand Down Expand Up @@ -177,10 +182,10 @@ public void onNext(AppendRowsRequest value) {
&& recordCount % closeAfter == 0
&& (numberTimesToClose == 0 || connectionCount <= numberTimesToClose)) {
LOG.info("Shutting down connection from test...");
responseObserver.onError(Status.ABORTED.asException());
responseObserver.onError(failedStatus.asException());
} else if (closeForeverAfter > 0 && recordCount > closeForeverAfter) {
LOG.info("Shutting down connection from test...");
responseObserver.onError(Status.ABORTED.asException());
responseObserver.onError(failedStatus.asException());
} else {
final Response response = responses.get(offset);
sendResponse(response, responseObserver);
Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.AbortedException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnknownException;
import com.google.cloud.bigquery.storage.test.Test.FooType;
Expand Down Expand Up @@ -1037,7 +1038,7 @@ public void testWriterAlreadyClosedException() throws Exception {
// The basic StatusRuntimeException API is not changed.
assertTrue(actualError instanceof StatusRuntimeException);
assertEquals(Status.Code.FAILED_PRECONDITION, actualError.getStatus().getCode());
assertTrue(actualError.getStatus().getDescription().contains("Connection is already closed"));
assertTrue(actualError.getStatus().getDescription().contains("User closed StreamWriter"));
assertEquals(actualError.getWriterId(), writer.getWriterId());
assertEquals(actualError.getStreamName(), writer.getStreamName());
}
Expand Down Expand Up @@ -1225,4 +1226,92 @@ public void testCloseDisconnectedStream() throws Exception {
// Ensure closing the writer after disconnect succeeds.
writer.close();
}

@Test(timeout = 10000)
public void testStreamWriterUserCloseMultiplexing() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setEnableConnectionPool(true)
.setLocation("us")
.build();

writer.close();
assertTrue(writer.isDone());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> {
appendFuture1.get();
});
assertEquals(
Status.Code.FAILED_PRECONDITION,
((StatusRuntimeException) ex.getCause()).getStatus().getCode());
}

@Test(timeout = 10000)
public void testStreamWriterUserCloseNoMultiplexing() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(createProtoSchema()).build();

writer.close();
assertTrue(writer.isDone());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> {
appendFuture1.get();
});
assertEquals(
Status.Code.FAILED_PRECONDITION,
((StatusRuntimeException) ex.getCause()).getStatus().getCode());
}

@Test(timeout = 10000)
public void testStreamWriterPermanentErrorMultiplexing() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setEnableConnectionPool(true)
.setLocation("us")
.build();
testBigQueryWrite.setCloseForeverAfter(1);
// Permenant errror.
testBigQueryWrite.setFailedStatus(Status.INVALID_ARGUMENT);
testBigQueryWrite.addResponse(createAppendResponse(0));
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
appendFuture1.get();
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"A"});
ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> {
appendFuture2.get();
});
assertTrue(ex.getCause() instanceof InvalidArgumentException);
assertFalse(writer.isDone());
}

@Test(timeout = 10000)
public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(createProtoSchema()).build();
testBigQueryWrite.setCloseForeverAfter(1);
// Permenant errror.
testBigQueryWrite.setFailedStatus(Status.INVALID_ARGUMENT);
testBigQueryWrite.addResponse(createAppendResponse(0));
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
appendFuture1.get();
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"A"});
ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> {
appendFuture2.get();
});
assertTrue(writer.isDone());
assertTrue(ex.getCause() instanceof InvalidArgumentException);
}
}

0 comments on commit 4b51acd

Please sign in to comment.