Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add userClose flag back to StreamWriter #1973

Merged
merged 9 commits into from
Feb 6, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,15 @@ ApiFuture<AppendRowsResponse> append(
return appendInternal(requestBuilder.build());
}

Boolean isUserClosed() {
this.lock.lock();
try {
return userClosed;
} finally {
this.lock.unlock();
}
}

private ApiFuture<AppendRowsResponse> appendInternal(AppendRowsRequest message) {
AppendRequestAndResponse requestWrapper = new AppendRequestAndResponse(message);
if (requestWrapper.messageSize > getApiMaxRequestBytes()) {
Expand Down Expand Up @@ -378,8 +387,13 @@ public String getWriterId() {
}

boolean isConnectionInUnrecoverableState() {
// If final status is set, there's no
return connectionFinalStatus != null;
this.lock.lock();
try {
// If final status is set, there's no
return connectionFinalStatus != null;
} finally {
this.lock.unlock();
}
}

/** Close the stream writer. Shut down all resources. */
Expand Down Expand Up @@ -793,7 +807,7 @@ synchronized TableSchemaAndTimestamp getUpdatedSchema() {
}

// Class that wraps AppendRowsRequest and its corresponding Response future.
private static final class AppendRequestAndResponse {
static final class AppendRequestAndResponse {
final SettableApiFuture<AppendRowsResponse> appendResult;
final AppendRowsRequest message;
final long messageSize;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
connectionWorkerPool.add(connectionWorker);
log.info(
String.format(
"Scaling up new connection for stream name: %s, pool size after scaling up %s",
"Scaling up new connection for stream name: %s, pool size after scaling up %d",
streamName, connectionWorkerPool.size()));
return connectionWorker;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.AppendRequestAndResponse;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.TableSchemaAndTimestamp;
import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Status;
Expand All @@ -36,6 +38,8 @@
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.logging.Logger;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
Expand Down Expand Up @@ -70,6 +74,11 @@ public class StreamWriter implements AutoCloseable {
*/
private final String location;

/*
* If user has closed the StreamWriter.
*/
private AtomicBoolean userClosed = new AtomicBoolean(false);

/*
* A String that uniquely identifies this writer.
*/
Expand All @@ -94,6 +103,8 @@ public class StreamWriter implements AutoCloseable {
/** Creation timestamp of this streamwriter */
private final long creationTimestamp;

private Lock lock;

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
Expand Down Expand Up @@ -363,6 +374,15 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
* @return the append response wrapped in a future.
*/
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
if (userClosed.get()) {
AppendRequestAndResponse requestWrapper =
new AppendRequestAndResponse(AppendRowsRequest.newBuilder().build());
requestWrapper.appendResult.setException(
new StatusRuntimeException(
Status.fromCode(Code.FAILED_PRECONDITION)
.withDescription("User slosed StreamWriter")));
return requestWrapper.appendResult;
}
return this.singleConnectionOrConnectionPool.append(this, rows, offset);
}

Expand Down Expand Up @@ -398,9 +418,25 @@ public String getLocation() {
return location;
}

/**
* @return if a stream writer can no longer be used for writing. It is due to either the
* StreamWriter is explicitly closed or the underlying connection is broken when connection
* pool is not used. Client should recreate StreamWriter in this case.
*/
public boolean isDone() {
if (singleConnectionOrConnectionPool.getKind() == Kind.CONNECTION_WORKER) {
return userClosed.get()
|| singleConnectionOrConnectionPool.connectionWorker().isConnectionInUnrecoverableState();
} else {
// With ConnectionPool, we will replace the bad connection automatically.
return userClosed.get();
}
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
userClosed.set(true);
singleConnectionOrConnectionPool.close(this);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.google.api.gax.grpc.testing.MockGrpcService;
import com.google.protobuf.AbstractMessage;
import io.grpc.ServerServiceDefinition;
import io.grpc.Status;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
Expand Down Expand Up @@ -102,4 +103,8 @@ public long getConnectionCount() {
public void setExecutor(ScheduledExecutorService executor) {
serviceImpl.setExecutor(executor);
}

public void setFailedStatus(Status failedStatus) {
serviceImpl.setFailedStatus(failedStatus);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class FakeBigQueryWriteImpl extends BigQueryWriteGrpc.BigQueryWriteImplBase {
// Record whether the first record has been seen on a connection.
private final Map<StreamObserver<AppendRowsResponse>, Boolean> connectionToFirstRequest =
new ConcurrentHashMap<>();
private Status failedStatus = Status.ABORTED;

/** Class used to save the state of a possible response. */
private static class Response {
Expand Down Expand Up @@ -138,6 +139,10 @@ public long getConnectionCount() {
return connectionCount;
}

public void setFailedStatus(Status failedStatus) {
this.failedStatus = failedStatus;
}

@Override
public StreamObserver<AppendRowsRequest> appendRows(
final StreamObserver<AppendRowsResponse> responseObserver) {
Expand Down Expand Up @@ -177,10 +182,10 @@ public void onNext(AppendRowsRequest value) {
&& recordCount % closeAfter == 0
&& (numberTimesToClose == 0 || connectionCount <= numberTimesToClose)) {
LOG.info("Shutting down connection from test...");
responseObserver.onError(Status.ABORTED.asException());
responseObserver.onError(failedStatus.asException());
} else if (closeForeverAfter > 0 && recordCount > closeForeverAfter) {
LOG.info("Shutting down connection from test...");
responseObserver.onError(Status.ABORTED.asException());
responseObserver.onError(failedStatus.asException());
} else {
final Response response = responses.get(offset);
sendResponse(response, responseObserver);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.api.gax.rpc.AbortedException;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.gax.rpc.StatusCode.Code;
import com.google.api.gax.rpc.UnknownException;
import com.google.cloud.bigquery.storage.test.Test.FooType;
Expand Down Expand Up @@ -1225,4 +1226,90 @@ public void testCloseDisconnectedStream() throws Exception {
// Ensure closing the writer after disconnect succeeds.
writer.close();
}

@Test(timeout = 10000)
public void testStreamWriterUserCloseMultiplexing() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1)
.setWriterSchema(createProtoSchema())
.setEnableConnectionPool(true)
.setLocation("us")
.build();

writer.close();
assertTrue(writer.isDone());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> {
appendFuture1.get();
});
assertEquals(
Status.Code.FAILED_PRECONDITION,
((StatusRuntimeException) ex.getCause()).getStatus().getCode());
}

@Test(timeout = 10000)
public void testStreamWriterUserCloseNoMultiplexing() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1).setWriterSchema(createProtoSchema()).build();

writer.close();
assertTrue(writer.isDone());
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
ExecutionException ex =
assertThrows(
ExecutionException.class,
() -> {
appendFuture1.get();
});
assertEquals(
Status.Code.FAILED_PRECONDITION,
((StatusRuntimeException) ex.getCause()).getStatus().getCode());
}

@Test(timeout = 10000)
public void testStreamWriterPermanentErrorMultiplexing() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1, client)
.setWriterSchema(createProtoSchema())
.setEnableConnectionPool(true)
.setLocation("us")
.build();
testBigQueryWrite.setCloseForeverAfter(1);
// Permenant errror.
testBigQueryWrite.setFailedStatus(Status.INVALID_ARGUMENT);
testBigQueryWrite.addResponse(createAppendResponse(0));
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
appendFuture1.get();
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"A"});
ExecutionException ex = assertThrows(
ExecutionException.class,
() -> {
appendFuture2.get();
});
assertTrue(ex.getCause() instanceof InvalidArgumentException);
assertFalse(writer.isDone());
}

@Test(timeout = 10000)
public void testStreamWriterPermanentErrorNoMultiplexing() throws Exception {
StreamWriter writer =
StreamWriter.newBuilder(TEST_STREAM_1, client).setWriterSchema(createProtoSchema()).build();
testBigQueryWrite.setCloseForeverAfter(1);
// Permenant errror.
testBigQueryWrite.setFailedStatus(Status.INVALID_ARGUMENT);
testBigQueryWrite.addResponse(createAppendResponse(0));
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
appendFuture1.get();
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"A"});
ExecutionException ex = assertThrows(
ExecutionException.class,
() -> {
appendFuture2.get();
});
assertTrue(writer.isDone());
assertTrue(ex.getCause() instanceof InvalidArgumentException);
}
}