feat: wire connection pool to stream writer without implementing updated schema (#1790)

* feat: Split writer into connection worker and wrapper; this is a prerequisite for the multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load API for connection worker for the multiplexing client

* feat: add multiplexing support to connection worker. We will treat every new stream name as a switch of destination

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests; also fixed a tiny bug inside the fake BigQuery write impl for getting the response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: wire multiplexing connection pool to stream writer

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
GaoleMeng and gcf-owl-bot[bot] committed Sep 21, 2022
1 parent 6b3a974 commit 3eb1475
Showing 4 changed files with 276 additions and 26 deletions.
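Taken together, the builder additions in this diff (enableConnectionPool() and setLocation()) let a caller opt a StreamWriter into the shared, location-keyed connection pool. The snippet below is a minimal usage sketch and not part of the commit: the project, dataset, and table names are placeholders, the rows and schema are assumed to be built elsewhere, and the pre-existing setWriterSchema setter is assumed.

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.ProtoRows;
import com.google.cloud.bigquery.storage.v1.ProtoSchema;
import com.google.cloud.bigquery.storage.v1.StreamWriter;
import java.io.IOException;

public class MultiplexingUsageSketch {
  // Placeholder default stream; multiplexing is aimed at the default stream case.
  private static final String DEFAULT_STREAM =
      "projects/my-project/datasets/my_dataset/tables/my_table/_default";

  static ApiFuture<AppendRowsResponse> appendViaSharedPool(ProtoSchema schema, ProtoRows rows)
      throws IOException {
    StreamWriter writer =
        StreamWriter.newBuilder(DEFAULT_STREAM)
            .setWriterSchema(schema) // pre-existing setter for the rows' proto schema
            .setLocation("us") // required for multiplexing; the pool is keyed by location
            .enableConnectionPool() // switches ConnectionMode to MULTIPLEXING
            .build();
    // Writers built later with the same location reuse the same static
    // ConnectionWorkerPool via connectionPoolMap.computeIfAbsent(...).
    return writer.append(rows);
  }
}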
@@ -167,6 +167,7 @@ public static Builder builder() {
/** Builder for the options to configure {@link ConnectionWorkerPool}. */
@AutoValue.Builder
public abstract static class Builder {
// TODO(gaole) rename to per location for easier understanding.
public abstract Builder setMinConnectionsPerPool(int value);

public abstract Builder setMaxConnectionsPerPool(int value);
@@ -387,4 +388,20 @@ int getCreateConnectionCount() {
int getTotalConnectionCount() {
return connectionWorkerPool.size();
}

String getTraceId() {
return traceId;
}

boolean ownsBigQueryWriteClient() {
return ownsBigQueryWriteClient;
}

FlowController.LimitExceededBehavior limitExceededBehavior() {
return limitExceededBehavior;
}

BigQueryWriteClient bigQueryWriteClient() {
return client;
}
}
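These package-private accessors exist so that the StreamWriter changes below can verify that a writer joining an existing pool was built with the same trace id, BigQuery write client, and limit-exceeded behavior as the writer that created it (see validateFetchedConnectionPool further down).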
@@ -20,12 +20,19 @@
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.rpc.FixedHeaderProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.auto.value.AutoOneOf;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.StreamWriter.Builder.ConnectionMode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusRuntimeException;
import java.io.IOException;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

/**
@@ -36,8 +43,6 @@
public class StreamWriter implements AutoCloseable {
private static final Logger log = Logger.getLogger(StreamWriter.class.getName());

private final ConnectionWorker connectionWorker;

/*
* The identifier of the stream to write to.
*/
@@ -51,11 +56,108 @@ public class StreamWriter implements AutoCloseable {
*/
private final String writerId = UUID.randomUUID().toString();

/**
* A stream can use either a single connection or a pool of connections, depending on whether
* multiplexing is enabled.
*/
private final SingleConnectionOrConnectionPool singleConnectionOrConnectionPool;

/**
* Static map from {@link ConnectionPoolKey} to connection pool. The map is static so that it is
* shared by every stream writer in the same process.
*/
private static final Map<ConnectionPoolKey, ConnectionWorkerPool> connectionPoolMap =
new ConcurrentHashMap<>();

/** The maximum size of one request. Defined by the API. */
public static long getApiMaxRequestBytes() {
return 10L * 1000L * 1000L; // 10 megabytes (https://en.wikipedia.org/wiki/Megabyte)
}

/**
* Key used to shard connection pools: pools with different keys are kept separate.
*
* <p>Currently sharded by location only.
*/
@AutoValue
abstract static class ConnectionPoolKey {
abstract String location();

public static ConnectionPoolKey create(String location) {
return new AutoValue_StreamWriter_ConnectionPoolKey(location);
}
}

/**
* In single-table mode, appends go directly to a dedicated connectionWorker; in multiplexing
* mode, appends go to the shared connection pool.
*/
@AutoOneOf(SingleConnectionOrConnectionPool.Kind.class)
public abstract static class SingleConnectionOrConnectionPool {
/** Kind of connection operation mode. */
public enum Kind {
CONNECTION_WORKER,
CONNECTION_WORKER_POOL
}

public abstract Kind getKind();

public abstract ConnectionWorker connectionWorker();

public abstract ConnectionWorkerPool connectionWorkerPool();

public ApiFuture<AppendRowsResponse> append(
StreamWriter streamWriter, ProtoRows protoRows, long offset) {
if (getKind() == Kind.CONNECTION_WORKER) {
return connectionWorker()
.append(streamWriter.getStreamName(), streamWriter.getProtoSchema(), protoRows, offset);
} else {
return connectionWorkerPool().append(streamWriter, protoRows, offset);
}
}

public void close(StreamWriter streamWriter) {
if (getKind() == Kind.CONNECTION_WORKER) {
connectionWorker().close();
} else {
connectionWorkerPool().close(streamWriter);
}
}

long getInflightWaitSeconds() {
if (getKind() == Kind.CONNECTION_WORKER_POOL) {
throw new IllegalStateException(
"getInflightWaitSeconds is not supported in multiplexing mode.");
}
return connectionWorker().getInflightWaitSeconds();
}

TableSchema getUpdatedSchema() {
if (getKind() == Kind.CONNECTION_WORKER_POOL) {
// TODO(gaole): implement updated schema support for multiplexing.
throw new IllegalStateException("getUpdatedSchema is not implemented for multiplexing.");
}
return connectionWorker().getUpdatedSchema();
}

String getWriterId(String streamWriterId) {
if (getKind() == Kind.CONNECTION_WORKER_POOL) {
return streamWriterId;
}
return connectionWorker().getWriterId();
}

public static SingleConnectionOrConnectionPool ofSingleConnection(ConnectionWorker connection) {
return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool.connectionWorker(connection);
}

public static SingleConnectionOrConnectionPool ofConnectionPool(
ConnectionWorkerPool connectionPool) {
return AutoOneOf_StreamWriter_SingleConnectionOrConnectionPool.connectionWorkerPool(
connectionPool);
}
}

private StreamWriter(Builder builder) throws IOException {
BigQueryWriteClient client;
this.streamName = builder.streamName;
@@ -78,16 +180,66 @@ private StreamWriter(Builder builder) throws IOException {
client = builder.client;
ownsBigQueryWriteClient = false;
}
connectionWorker =
new ConnectionWorker(
builder.streamName,
builder.writerSchema,
builder.maxInflightRequest,
builder.maxInflightBytes,
builder.limitExceededBehavior,
builder.traceId,
client,
ownsBigQueryWriteClient);
if (builder.connectionMode == ConnectionMode.SINGLE_TABLE) {
this.singleConnectionOrConnectionPool =
SingleConnectionOrConnectionPool.ofSingleConnection(
new ConnectionWorker(
builder.streamName,
builder.writerSchema,
builder.maxInflightRequest,
builder.maxInflightBytes,
builder.limitExceededBehavior,
builder.traceId,
client,
ownsBigQueryWriteClient));
} else {
if (builder.location == null || builder.location.isEmpty()) {
throw new IllegalArgumentException("Location must be specified for multiplexing client!");
}
// Assume connections in the same pool share the same client and trace id.
// The first StreamWriter created for a new stub creates the pool that the other
// streams in the same region reuse, so per-StreamWriter settings no longer take
// effect unless all streams share the same settings.
this.singleConnectionOrConnectionPool =
SingleConnectionOrConnectionPool.ofConnectionPool(
connectionPoolMap.computeIfAbsent(
ConnectionPoolKey.create(builder.location),
(key) ->
new ConnectionWorkerPool(
builder.maxInflightRequest,
builder.maxInflightBytes,
builder.limitExceededBehavior,
builder.traceId,
client,
ownsBigQueryWriteClient)));
validateFetchedConnectionPool(client, builder);
}
}

// Validate that the fetched connection pool matches the builder's settings.
private void validateFetchedConnectionPool(
BigQueryWriteClient client, StreamWriter.Builder builder) {
String paramsValidatedFailed = "";
if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
builder.traceId)) {
paramsValidatedFailed = "Trace id";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient(),
client)) {
paramsValidatedFailed = "Bigquery write client";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
builder.limitExceededBehavior)) {
paramsValidatedFailed = "Limit Exceeds Behavior";
}

if (!paramsValidatedFailed.isEmpty()) {
throw new IllegalArgumentException(
String.format(
"%s used for the same connection pool for the same location must be the same!",
paramsValidatedFailed));
}
}

/**
@@ -127,7 +279,7 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows) {
* @return the append response wrapped in a future.
*/
public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
return this.connectionWorker.append(streamName, writerSchema, rows, offset);
return this.singleConnectionOrConnectionPool.append(this, rows, offset);
}

/**
@@ -139,12 +291,12 @@ public ApiFuture<AppendRowsResponse> append(ProtoRows rows, long offset) {
* stream case.
*/
public long getInflightWaitSeconds() {
return connectionWorker.getInflightWaitSeconds();
return singleConnectionOrConnectionPool.getInflightWaitSeconds();
}

/** @return a unique Id for the writer. */
public String getWriterId() {
return connectionWorker.getWriterId();
return singleConnectionOrConnectionPool.getWriterId(writerId);
}

/** @return name of the Stream that this writer is working on. */
@@ -160,7 +312,7 @@ public ProtoSchema getProtoSchema() {
/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
this.connectionWorker.close();
singleConnectionOrConnectionPool.close(this);
}

/**
@@ -179,11 +331,28 @@ public static StreamWriter.Builder newBuilder(String streamName) {

/** Thread-safe getter of updated TableSchema */
public synchronized TableSchema getUpdatedSchema() {
return connectionWorker.getUpdatedSchema();
return singleConnectionOrConnectionPool.getUpdatedSchema();
}

@VisibleForTesting
SingleConnectionOrConnectionPool.Kind getConnectionOperationType() {
return singleConnectionOrConnectionPool.getKind();
}

/** A builder of {@link StreamWriter}s. */
public static final class Builder {
/** Operation mode for the internal connection pool. */
public enum ConnectionMode {
// Create a dedicated connection per write stream.
SINGLE_TABLE,
// Share connections across multiple tables. This mode is only effective for the default
// stream case. Some key characteristics:
// 1. Tables within the same pool have to be in the same location.
// 2. Close(streamReference) does not close the connection until all tables on the
// connection are closed.
// 3. Tries to use one stream per table at first and shares streams later.
MULTIPLEXING
}

private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;

@@ -210,10 +379,14 @@ public static final class Builder {
private FlowController.LimitExceededBehavior limitExceededBehavior =
FlowController.LimitExceededBehavior.Block;

private ConnectionMode connectionMode = ConnectionMode.SINGLE_TABLE;

private String traceId = null;

private TableSchema updatedTableSchema = null;

private String location;

private Builder(String streamName) {
this.streamName = Preconditions.checkNotNull(streamName);
this.client = null;
@@ -246,6 +419,11 @@ public Builder setEndpoint(String endpoint) {
return this;
}

public Builder enableConnectionPool() {
this.connectionMode = ConnectionMode.MULTIPLEXING;
return this;
}

/**
* {@code ChannelProvider} to use to create Channels, which must point at Cloud BigQuery Storage
* API endpoint.
@@ -280,6 +458,12 @@ public Builder setTraceId(String traceId) {
return this;
}

/** Location of the table this stream writer is targeting. */
public Builder setLocation(String location) {
this.location = location;
return this;
}

/**
* Sets the limit exceeded behavior.
*

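For reference, the pool sharing wired up above rests on a process-wide ConcurrentHashMap whose key currently carries only the location. The self-contained sketch below uses hypothetical stand-in classes rather than the library's own types, purely to illustrate why computeIfAbsent hands every writer in the same location the same pool instance while other locations get their own.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ConnectionWorkerPool, used only to illustrate the
// sharing pattern; it is not part of the commit.
final class PoolSharingSketch {
  static final class Pool {
    final String location;

    Pool(String location) {
      this.location = location;
    }
  }

  // Mirrors StreamWriter.connectionPoolMap: static, so every writer in the
  // process sees the same map. The real key wraps only the location for now.
  private static final Map<String, Pool> POOLS = new ConcurrentHashMap<>();

  // Mirrors connectionPoolMap.computeIfAbsent(ConnectionPoolKey.create(location), ...):
  // the first writer for a location creates the pool, later writers reuse it.
  static Pool poolFor(String location) {
    return POOLS.computeIfAbsent(location, Pool::new);
  }

  public static void main(String[] args) {
    Pool us1 = poolFor("us");
    Pool us2 = poolFor("us");
    Pool eu = poolFor("eu");
    System.out.println(us1 == us2); // true: same location shares one pool
    System.out.println(us1 == eu); // false: different locations get separate pools
  }
}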