Skip to content

Commit

Permalink
feat: add connection worker skeleton used for multiplexing client (#1778
Browse files Browse the repository at this point in the history
)

* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client
  • Loading branch information
GaoleMeng committed Sep 14, 2022
1 parent 7dd447d commit b26265e
Show file tree
Hide file tree
Showing 3 changed files with 196 additions and 2 deletions.
1 change: 1 addition & 0 deletions .github/.OwlBot.yaml
Expand Up @@ -78,6 +78,7 @@ deep-preserve-regex:
- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1.*/Waiter.java"
- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/Exceptions.java"
- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorker.java"
- "/google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1/ConnectionWorkerPool.java"

deep-copy-regex:
- source: "/google/cloud/bigquery/storage/(v.*)/.*-java/proto-google-.*/src"
Expand Down
21 changes: 19 additions & 2 deletions google-cloud-bigquerystorage/pom.xml
Expand Up @@ -44,6 +44,15 @@
<groupId>org.codehaus.mojo</groupId>
<artifactId>flatten-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<configuration>
<usedDependencies>
<dependency>com.google.auto.value:auto-value</dependency>
</usedDependencies>
</configuration>
</plugin>
</plugins>
</build>
<dependencies>
Expand All @@ -63,6 +72,16 @@
<groupId>com.google.api</groupId>
<artifactId>api-common</artifactId>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>com.google.auto.value</groupId>
<artifactId>auto-value-annotations</artifactId>
<version>1.9</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
Expand All @@ -71,7 +90,6 @@
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-common-protos</artifactId>
</dependency>

<dependency>
<groupId>com.google.api.grpc</groupId>
<artifactId>proto-google-cloud-bigquerystorage-v1beta1</artifactId>
Expand Down Expand Up @@ -134,7 +152,6 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.google.truth</groupId>
<artifactId>truth</artifactId>
Expand Down
@@ -0,0 +1,176 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.bigquery.storage.v1;

import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.auto.value.AutoValue;
import javax.annotation.concurrent.GuardedBy;

public class ConnectionWorkerPool {
/*
* Max allowed inflight requests in the stream. Method append is blocked at this.
*/
private final long maxInflightRequests;

/*
* Max allowed inflight bytes in the stream. Method append is blocked at this.
*/
private final long maxInflightBytes;

/*
* Behavior when inflight queue is exceeded. Only supports Block or Throw, default is Block.
*/
private final FlowController.LimitExceededBehavior limitExceededBehavior;

/*
* TraceId for debugging purpose.
*/
private final String traceId;

/*
* Tracks current inflight requests in the stream.
*/
@GuardedBy("lock")
private long inflightRequests = 0;

/*
* Tracks current inflight bytes in the stream.
*/
@GuardedBy("lock")
private long inflightBytes = 0;

/*
* Tracks how often the stream was closed due to a retriable error. Streaming will stop when the
* count hits a threshold. Streaming should only be halted, if it isn't possible to establish a
* connection. Keep track of the number of reconnections in succession. This will be reset if
* a row is successfully called back.
*/
@GuardedBy("lock")
private long conectionRetryCountWithoutCallback = 0;

/*
* If false, streamConnection needs to be reset.
*/
@GuardedBy("lock")
private boolean streamConnectionIsConnected = false;

/*
* A boolean to track if we cleaned up inflight queue.
*/
@GuardedBy("lock")
private boolean inflightCleanuped = false;

/*
* Indicates whether user has called Close() or not.
*/
@GuardedBy("lock")
private boolean userClosed = false;

/*
* The final status of connection. Set to nonnull when connection is permanently closed.
*/
@GuardedBy("lock")
private Throwable connectionFinalStatus = null;

/*
* Contains the updated TableSchema.
*/
@GuardedBy("lock")
private TableSchema updatedSchema;

/*
* A client used to interact with BigQuery.
*/
private BigQueryWriteClient client;

/*
* If true, the client above is created by this writer and should be closed.
*/
private boolean ownsBigQueryWriteClient = false;

/** Settings for connection pool. */
@AutoValue
public abstract static class Settings {
/**
* The minimum connections each pool created before trying to reuse the previously created
* connection in multiplexing mode.
*/
abstract int minConnectionsPerPool();

/** The maximum connections per connection pool. */
abstract int maxConnectionsPerPool();

public static Builder builder() {
return new AutoValue_ConnectionWorkerPool_Settings.Builder()
.setMinConnectionsPerPool(2)
.setMaxConnectionsPerPool(10);
}

/** Builder for the options to config {@link ConnectionWorkerPool}. */
@AutoValue.Builder
public abstract static class Builder {
public abstract Builder setMinConnectionsPerPool(int value);

public abstract Builder setMaxConnectionsPerPool(int value);

public abstract Settings build();
}
}

/** Static setting for connection pool. */
private static Settings settings = Settings.builder().build();

public ConnectionWorkerPool(
long maxInflightRequests,
long maxInflightBytes,
FlowController.LimitExceededBehavior limitExceededBehavior,
String traceId,
BigQueryWriteClient client,
boolean ownsBigQueryWriteClient) {
this.maxInflightRequests = maxInflightRequests;
this.maxInflightBytes = maxInflightBytes;
this.limitExceededBehavior = limitExceededBehavior;
this.traceId = traceId;
this.client = client;
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
}

/**
* Sets static connection pool options.
*
* <p>Note: this method should be triggered prior to the construction of connection pool.
*/
public static void setOptions(Settings settings) {
ConnectionWorkerPool.settings = settings;
}

/** Distributes the writing of a message to an underlying connection. */
public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows) {
throw new RuntimeException("Append is not implemented!");
}

/** Distributes the writing of a message to an underlying connection. */
public ApiFuture<AppendRowsResponse> append(
StreamWriter streamWriter, ProtoRows rows, long offset) {
throw new RuntimeException("append with offset is not implemented on connection pool!");
}

/** Close the stream writer. Shut down all resources. */
public void close(StreamWriter streamWriter) {
throw new RuntimeException("close is implemented on connection pool");
}
}

0 comments on commit b26265e

Please sign in to comment.