diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java index 8cfd30a800..7b0d3a2964 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java @@ -167,6 +167,7 @@ public static Builder builder() { /** Builder for the options to config {@link ConnectionWorkerPool}. */ @AutoValue.Builder public abstract static class Builder { + // TODO(gaole) rename to per location for easier understanding. public abstract Builder setMinConnectionsPerPool(int value); public abstract Builder setMaxConnectionsPerPool(int value); @@ -387,4 +388,20 @@ int getCreateConnectionCount() { int getTotalConnectionCount() { return connectionWorkerPool.size(); } + + String getTraceId() { + return traceId; + } + + boolean ownsBigQueryWriteClient() { + return ownsBigQueryWriteClient; + } + + FlowController.LimitExceededBehavior limitExceededBehavior() { + return limitExceededBehavior; + } + + BigQueryWriteClient bigQueryWriteClient() { + return client; + } } diff --git a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java index e869668818..180ee81d94 100644 --- a/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java +++ b/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/StreamWriter.java @@ -20,12 +20,19 @@ import com.google.api.gax.core.CredentialsProvider; import com.google.api.gax.rpc.FixedHeaderProvider; import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.auto.value.AutoOneOf; +import com.google.auto.value.AutoValue; +import com.google.cloud.bigquery.storage.v1.StreamWriter.Builder.ConnectionMode; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusRuntimeException; import java.io.IOException; +import java.util.Map; +import java.util.Objects; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.logging.Logger; /** @@ -36,8 +43,6 @@ public class StreamWriter implements AutoCloseable { private static final Logger log = Logger.getLogger(StreamWriter.class.getName()); - private final ConnectionWorker connectionWorker; - /* * The identifier of stream to write to. */ @@ -51,11 +56,108 @@ public class StreamWriter implements AutoCloseable { */ private final String writerId = UUID.randomUUID().toString(); + /** + * Stream can access a single connection or a pool of connection depending on whether multiplexing + * is enabled. + */ + private final SingleConnectionOrConnectionPool singleConnectionOrConnectionPool; + + /** + * Static map from {@link ConnectionPoolKey} to connection pool. Note this map is static to be + * shared by every stream writer in the same process. + */ + private static final Map connectionPoolMap = + new ConcurrentHashMap<>(); + /** The maximum size of one request. Defined by the API. */ public static long getApiMaxRequestBytes() { return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte) } + /** + * Connection pool with different key will be split. 
+ * + * <p>
Shard based only on location right now. + */ + @AutoValue + abstract static class ConnectionPoolKey { + abstract String location(); + + public static ConnectionPoolKey create(String location) { + return new AutoValue_StreamWriter_ConnectionPoolKey(location); + } + } + + /** + * When in single table mode, append directly to connectionWorker. Otherwise append to connection + * pool in multiplexing mode. + */ + @AutoOneOf(SingleConnectionOrConnectionPool.Kind.class) + public abstract static class SingleConnectionOrConnectionPool { + /** Kind of connection operation mode. */ + public enum Kind { + CONNECTION_WORKER, + CONNECTION_WORKER_POOL + } + + public abstract Kind getKind(); + + public abstract ConnectionWorker connectionWorker(); + + public abstract ConnectionWorkerPool connectionWorkerPool(); + + public ApiFuture append( + StreamWriter streamWriter, ProtoRows protoRows, long offset) { + if (getKind() == Kind.CONNECTION_WORKER) { + return connectionWorker() + .append(streamWriter.getStreamName(), streamWriter.getProtoSchema(), protoRows, offset); + } else { + return connectionWorkerPool().append(streamWriter, protoRows, offset); + } + } + + public void close(StreamWriter streamWriter) { + if (getKind() == Kind.CONNECTION_WORKER) { + connectionWorker().close(); + } else { + connectionWorkerPool().close(streamWriter); + } + } + + long getInflightWaitSeconds() { + if (getKind() == Kind.CONNECTION_WORKER_POOL) { + throw new IllegalStateException( + "getInflightWaitSeconds is not supported in multiplexing mode."); + } + return connectionWorker().getInflightWaitSeconds(); + } + + TableSchema getUpdatedSchema() { + if (getKind() == Kind.CONNECTION_WORKER_POOL) { + // TODO(gaole): implement updated schema support for multiplexing. + throw new IllegalStateException("getUpdatedSchema is not implemented for multiplexing."); + } + return connectionWorker().getUpdatedSchema(); + } + + String getWriterId(String streamWriterId) { + if (getKind() == Kind.CONNECTION_WORKER_POOL) { + return streamWriterId; + } + return connectionWorker().getWriterId(); + } + + public static SingleConnectionOrConnectionPool ofSingleConnection(ConnectionWorker connection) { + return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool.connectionWorker(connection); + } + + public static SingleConnectionOrConnectionPool ofConnectionPool( + ConnectionWorkerPool connectionPool) { + return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool.connectionWorkerPool( + connectionPool); + } + } + private StreamWriter(Builder builder) throws IOException { BigQueryWriteClient client; this.streamName = builder.streamName; @@ -78,16 +180,66 @@ private StreamWriter(Builder builder) throws IOException { client = builder.client; ownsBigQueryWriteClient = false; } - connectionWorker = - new ConnectionWorker( - builder.streamName, - builder.writerSchema, - builder.maxInflightRequest, - builder.maxInflightBytes, - builder.limitExceededBehavior, - builder.traceId, - client, - ownsBigQueryWriteClient); + if (builder.connectionMode == ConnectionMode.SINGLE_TABLE) { + this.singleConnectionOrConnectionPool = + SingleConnectionOrConnectionPool.ofSingleConnection( + new ConnectionWorker( + builder.streamName, + builder.writerSchema, + builder.maxInflightRequest, + builder.maxInflightBytes, + builder.limitExceededBehavior, + builder.traceId, + client, + ownsBigQueryWriteClient)); + } else { + if (builder.location == "") { + throw new IllegalArgumentException("Location must be specified for multiplexing client!"); + } + // Assume the connection 
in the same pool share the same client and trace id. + // The first StreamWriter for a new stub will create the pool for the other + // streams in the same region, meaning the per StreamWriter settings are no + // longer working unless all streams share the same set of settings + this.singleConnectionOrConnectionPool = + SingleConnectionOrConnectionPool.ofConnectionPool( + connectionPoolMap.computeIfAbsent( + ConnectionPoolKey.create(builder.location), + (key) -> + new ConnectionWorkerPool( + builder.maxInflightRequest, + builder.maxInflightBytes, + builder.limitExceededBehavior, + builder.traceId, + client, + ownsBigQueryWriteClient))); + validateFetchedConnectonPool(client, builder); + } + } + + // Validate whether the fetched connection pool matched certain properties. + private void validateFetchedConnectonPool( + BigQueryWriteClient client, StreamWriter.Builder builder) { + String paramsValidatedFailed = ""; + if (!Objects.equals( + this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(), + builder.traceId)) { + paramsValidatedFailed = "Trace id"; + } else if (!Objects.equals( + this.singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient(), + client)) { + paramsValidatedFailed = "Bigquery write client"; + } else if (!Objects.equals( + this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(), + builder.limitExceededBehavior)) { + paramsValidatedFailed = "Limit Exceeds Behavior"; + } + + if (!paramsValidatedFailed.isEmpty()) { + throw new IllegalArgumentException( + String.format( + "%s used for the same connection pool for the same location must be the same!", + paramsValidatedFailed)); + } } /** @@ -127,7 +279,7 @@ public ApiFuture append(ProtoRows rows) { * @return the append response wrapped in a future. */ public ApiFuture append(ProtoRows rows, long offset) { - return this.connectionWorker.append(streamName, writerSchema, rows, offset); + return this.singleConnectionOrConnectionPool.append(this, rows, offset); } /** @@ -139,12 +291,12 @@ public ApiFuture append(ProtoRows rows, long offset) { * stream case. */ public long getInflightWaitSeconds() { - return connectionWorker.getInflightWaitSeconds(); + return singleConnectionOrConnectionPool.getInflightWaitSeconds(); } /** @return a unique Id for the writer. */ public String getWriterId() { - return connectionWorker.getWriterId(); + return singleConnectionOrConnectionPool.getWriterId(writerId); } /** @return name of the Stream that this writer is working on. */ @@ -160,7 +312,7 @@ public ProtoSchema getProtoSchema() { /** Close the stream writer. Shut down all resources. */ @Override public void close() { - this.connectionWorker.close(); + singleConnectionOrConnectionPool.close(this); } /** @@ -179,11 +331,28 @@ public static StreamWriter.Builder newBuilder(String streamName) { /** Thread-safe getter of updated TableSchema */ public synchronized TableSchema getUpdatedSchema() { - return connectionWorker.getUpdatedSchema(); + return singleConnectionOrConnectionPool.getUpdatedSchema(); + } + + @VisibleForTesting + SingleConnectionOrConnectionPool.Kind getConnectionOperationType() { + return singleConnectionOrConnectionPool.getKind(); } /** A builder of {@link StreamWriter}s. */ public static final class Builder { + /** Operation mode for the internal connection pool. */ + public enum ConnectionMode { + // Create a connection per given write stream. + SINGLE_TABLE, + // Share a connection for multiple tables. This mode is only effective in default stream case. 
+ // Some key characteristics: + // 1. tables within the same pool has to be in the same location. + // 2. Close(streamReference) will not close connection immediately until all tables on + // this connection is closed. + // 3. Try to use one stream per table at first and share stream later. + MULTIPLEXING + } private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L; @@ -210,10 +379,14 @@ public static final class Builder { private FlowController.LimitExceededBehavior limitExceededBehavior = FlowController.LimitExceededBehavior.Block; + private ConnectionMode connectionMode = ConnectionMode.SINGLE_TABLE; + private String traceId = null; private TableSchema updatedTableSchema = null; + private String location; + private Builder(String streamName) { this.streamName = Preconditions.checkNotNull(streamName); this.client = null; @@ -246,6 +419,11 @@ public Builder setEndpoint(String endpoint) { return this; } + public Builder enableConnectionPool() { + this.connectionMode = ConnectionMode.MULTIPLEXING; + return this; + } + /** * {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage * API endpoint. @@ -280,6 +458,12 @@ public Builder setTraceId(String traceId) { return this; } + /** Location of the table this stream writer is targeting. */ + public Builder setLocation(String location) { + this.location = location; + return this; + } + /** * Sets the limit exceeded behavior. * diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java index 1bc180b814..8b865eb13a 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPoolTest.java @@ -73,7 +73,8 @@ public void setUp() throws Exception { @Test public void testSingleTableConnection_noOverwhelmedConnection() throws Exception { // Set the max requests count to a large value so we will not scaling up. - testSend100RequestsToMultiTable( + testSendRequestsToMultiTable( + /*requestToSend=*/ 100, /*maxRequests=*/ 100000, /*maxConnections=*/ 8, /*expectedConnectionCount=*/ 1, @@ -83,7 +84,8 @@ public void testSingleTableConnection_noOverwhelmedConnection() throws Exception @Test public void testSingleTableConnections_overwhelmed() throws Exception { // A connection will be considered overwhelmed when the requests count reach 5 (max 10). - testSend100RequestsToMultiTable( + testSendRequestsToMultiTable( + /*requestToSend=*/ 100, /*maxRequests=*/ 10, /*maxConnections=*/ 8, /*expectedConnectionCount=*/ 8, @@ -94,7 +96,8 @@ public void testSingleTableConnections_overwhelmed() throws Exception { public void testMultiTableConnection_noOverwhelmedConnection() throws Exception { // Set the max requests count to a large value so we will not scaling up. // All tables will share the two connections (2 becasue we set the min connections to be 2). 
- testSend100RequestsToMultiTable( + testSendRequestsToMultiTable( + /*requestToSend=*/ 100, /*maxRequests=*/ 100000, /*maxConnections=*/ 8, /*expectedConnectionCount=*/ 2, @@ -102,17 +105,44 @@ public void testMultiTableConnection_noOverwhelmedConnection() throws Exception } @Test - public void testMultiTableConnections_overwhelmed() throws Exception { + public void testMultiTableConnections_overwhelmed_reachingMaximum() throws Exception { // A connection will be considered overwhelmed when the requests count reach 5 (max 10). - testSend100RequestsToMultiTable( + testSendRequestsToMultiTable( + /*requestToSend=*/ 100, /*maxRequests=*/ 10, /*maxConnections=*/ 8, /*expectedConnectionCount=*/ 8, /*tableCount=*/ 4); } - private void testSend100RequestsToMultiTable( - int maxRequests, int maxConnections, int expectedConnectionCount, int tableCount) + @Test + public void testMultiTableConnections_overwhelmed_overTotalLimit() throws Exception { + // A connection will be considered overwhelmed when the requests count reach 5 (max 10). + testSendRequestsToMultiTable( + /*requestToSend=*/ 200, + /*maxRequests=*/ 10, + /*maxConnections=*/ 8, + /*expectedConnectionCount=*/ 8, + /*tableCount=*/ 10); + } + + @Test + public void testMultiTableConnections_overwhelmed_notReachingMaximum() throws Exception { + // A connection will be considered overwhelmed when the requests count reach 5 (max 10). + testSendRequestsToMultiTable( + /*requestToSend=*/ 20, + /*maxRequests=*/ 10, + /*maxConnections=*/ 8, + /*expectedConnectionCount=*/ 4, + /*tableCount=*/ 4); + } + + private void testSendRequestsToMultiTable( + int requestToSend, + int maxRequests, + int maxConnections, + int expectedConnectionCount, + int tableCount) throws IOException, ExecutionException, InterruptedException { ConnectionWorkerPool.setOptions( Settings.builder() @@ -126,7 +156,7 @@ private void testSend100RequestsToMultiTable( testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L)); // Try append 100 requests. 
- long appendCount = 100; + long appendCount = requestToSend; for (long i = 0; i < appendCount; i++) { testBigQueryWrite.addResponse(createAppendResponse(i)); } diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java index ef50c40977..04725ba97b 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/StreamWriterTest.java @@ -30,6 +30,7 @@ import com.google.api.gax.rpc.UnknownException; import com.google.cloud.bigquery.storage.test.Test.FooType; import com.google.cloud.bigquery.storage.v1.StorageError.StorageErrorCode; +import com.google.cloud.bigquery.storage.v1.StreamWriter.SingleConnectionOrConnectionPool.Kind; import com.google.common.base.Strings; import com.google.protobuf.Any; import com.google.protobuf.DescriptorProtos; @@ -90,6 +91,15 @@ public void tearDown() throws Exception { serviceHelper.stop(); } + private StreamWriter getMultiplexingTestStreamWriter() throws IOException { + return StreamWriter.newBuilder(TEST_STREAM, client) + .setWriterSchema(createProtoSchema()) + .setTraceId(TEST_TRACE_ID) + .setLocation("US") + .enableConnectionPool() + .build(); + } + private StreamWriter getTestStreamWriter() throws IOException { return StreamWriter.newBuilder(TEST_STREAM, client) .setWriterSchema(createProtoSchema()) @@ -196,7 +206,6 @@ private void verifyAppendRequests(long appendCount) { } } - @Test public void testBuildBigQueryWriteClientInWriter() throws Exception { StreamWriter writer = StreamWriter.newBuilder(TEST_STREAM) @@ -703,6 +712,16 @@ public void testWriterId() Assert.assertNotEquals(writer1.getWriterId(), writer2.getWriterId()); } + @Test + public void testInitialization_operationKind() throws Exception { + try (StreamWriter streamWriter = getMultiplexingTestStreamWriter()) { + Assert.assertEquals(streamWriter.getConnectionOperationType(), Kind.CONNECTION_WORKER_POOL); + } + try (StreamWriter streamWriter = getTestStreamWriter()) { + Assert.assertEquals(streamWriter.getConnectionOperationType(), Kind.CONNECTION_WORKER); + } + } + // Timeout to ensure close() doesn't wait for done callback timeout. @Test(timeout = 10000) public void testCloseDisconnectedStream() throws Exception {
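Reviewer note: below is a minimal usage sketch of the connection-pool mode introduced in this diff, mirroring getMultiplexingTestStreamWriter() in StreamWriterTest. The streamName, protoSchema, protoRows, and client variables are placeholders assumed to be created elsewhere; only setLocation(...) and enableConnectionPool() are new, and a location is required once the pool is enabled. Pools are keyed by location and shared by all StreamWriters in the same process.

    // Sketch only: streamName, protoSchema, protoRows and client are assumed to exist.
    StreamWriter writer =
        StreamWriter.newBuilder(streamName, client)
            .setWriterSchema(protoSchema)
            .setLocation("US")       // required for the multiplexing client; pools are shared per location
            .enableConnectionPool()  // switches the builder from SINGLE_TABLE to MULTIPLEXING mode
            .build();
    // Appends are routed through the shared per-location ConnectionWorkerPool.
    ApiFuture<AppendRowsResponse> response = writer.append(protoRows);
    writer.close();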