feat: add multiplexing client core algorithm and basic testing, plus fix a tiny bug in fake server (#1787)

* feat: Split writer into connection worker and wrapper, this is a
prerequisite for multiplexing client

* feat: add connection worker pool skeleton, used for multiplexing client

* feat: add Load api for connection worker for multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destination (a simplified routing sketch of
this idea follows the change summary below)

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests;
also fixed a tiny bug inside the fake bigquery write impl for getting the
response from offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
GaoleMeng and gcf-owl-bot[bot] committed Sep 20, 2022
1 parent a869a1d commit 1bb8e26
Showing 5 changed files with 427 additions and 9 deletions.
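
The multiplexing design described in the commit message treats a new stream name as nothing more than a destination switch on a shared set of connections: appends for many write streams are routed over a small pool of workers, and a worker is only abandoned for a given stream when it is overwhelmed. The following is a minimal, self-contained sketch of that routing policy; it is not code from this commit, and every class, field, and threshold in it is illustrative only.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Illustrative-only model: route appends for many stream names over a small, shared set of workers. */
final class MultiplexRoutingSketch {
  /** Stand-in for a gRPC connection; tracks a fake in-flight count. */
  static final class Worker {
    int inflight;
    boolean overwhelmed() { return inflight > 2; }
    void append(String streamName, String row) {
      inflight++;
      System.out.println("worker@" + hashCode() + " <- " + streamName + ": " + row);
    }
  }

  private final List<Worker> workers = new ArrayList<>();
  private final Map<String, Worker> streamToWorker = new HashMap<>();
  private final int maxWorkers;

  MultiplexRoutingSketch(int maxWorkers) { this.maxWorkers = maxWorkers; }

  /** A new stream name does not open a new connection by itself; it only switches destination. */
  void append(String streamName, String row) {
    Worker w = streamToWorker.get(streamName);
    if (w == null || w.overwhelmed()) {
      w = pickOrCreateWorker();
      streamToWorker.put(streamName, w);
    }
    w.append(streamName, row);
  }

  private Worker pickOrCreateWorker() {
    Worker best = null;
    for (Worker w : workers) {
      if (best == null || w.inflight < best.inflight) best = w;
    }
    if (best != null && !best.overwhelmed()) return best;
    if (workers.size() < maxWorkers) {
      Worker created = new Worker();
      workers.add(created);
      return created;
    }
    return best; // all workers overwhelmed and at the cap: stick with the least-loaded one
  }

  public static void main(String[] args) {
    MultiplexRoutingSketch pool = new MultiplexRoutingSketch(2);
    pool.append("projects/p/datasets/d/tables/t1/streams/_default", "row-a");
    pool.append("projects/p/datasets/d/tables/t2/streams/_default", "row-b"); // destination switch, same pool
    pool.append("projects/p/datasets/d/tables/t1/streams/_default", "row-c");
  }
}
```

A caller only ever hands rows plus a stream name to the pool; whether an append rides an existing connection or a freshly created one is the pool's decision, which is what the ConnectionWorkerPool changes below implement in full.
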
README.md: 2 changes (1 addition, 1 deletion)
@@ -49,7 +49,7 @@ If you are using Maven without BOM, add this to your dependencies:
If you are using Gradle 5.x or later, add this to your dependencies:

```Groovy
-implementation platform('com.google.cloud:libraries-bom:26.1.1')
+implementation platform('com.google.cloud:libraries-bom:26.1.2')
implementation 'com.google.cloud:google-cloud-bigquerystorage'
```
ConnectionWorkerPool.java
@@ -18,9 +18,26 @@
import com.google.api.core.ApiFuture;
import com.google.api.gax.batching.FlowController;
import com.google.auto.value.AutoValue;
import com.google.cloud.bigquery.storage.v1.ConnectionWorker.Load;
import com.google.common.base.Stopwatch;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger;
import javax.annotation.concurrent.GuardedBy;

/** Pool of connections to accept appends and distribute them to different connections. */
public class ConnectionWorkerPool {
private static final Logger log = Logger.getLogger(ConnectionWorkerPool.class.getName());
/*
* Max allowed inflight requests in the stream. Method append is blocked at this.
*/
@@ -36,11 +53,29 @@ public class ConnectionWorkerPool
*/
private final FlowController.LimitExceededBehavior limitExceededBehavior;

/** Map from write stream to corresponding connection. */
private final Map<StreamWriter, ConnectionWorker> streamWriterToConnection =
new ConcurrentHashMap<>();

/** Map from a connection to the set of write streams that have sent requests over it. */
private final Map<ConnectionWorker, Set<StreamWriter>> connectionToWriteStream =
new ConcurrentHashMap<>();

/** Collection of all the created connections. */
private final Set<ConnectionWorker> connectionWorkerPool =
Collections.synchronizedSet(new HashSet<>());

/** Whether test-related logic is enabled. */
private static boolean enableTesting = false;

/*
 * TraceId for debugging purposes.
*/
private final String traceId;

/** Tracks, for tests, the number of times createConnectionWorker(...) is called. */
private final AtomicInteger testValueCreateConnectionCount = new AtomicInteger(0);

/*
* Tracks current inflight requests in the stream.
*/
@@ -102,6 +137,15 @@ public class ConnectionWorkerPool
*/
private boolean ownsBigQueryWriteClient = false;

/**
 * The current maximum connection count. This value is gradually increased until it reaches the
 * user-defined maximum connection count.
*/
private int currentMaxConnectionCount;

/** Lock for controlling concurrent operations when adding or deleting connections. */
private final Lock lock = new ReentrantLock();

/** Settings for connection pool. */
@AutoValue
public abstract static class Settings {
@@ -147,6 +191,7 @@ public ConnectionWorkerPool(
this.traceId = traceId;
this.client = client;
this.ownsBigQueryWriteClient = ownsBigQueryWriteClient;
this.currentMaxConnectionCount = settings.minConnectionsPerPool();
}

/**
@@ -160,13 +205,149 @@ public static void setOptions(Settings settings)

/** Distributes the writing of a message to an underlying connection. */
public ApiFuture<AppendRowsResponse> append(StreamWriter streamWriter, ProtoRows rows) {
throw new RuntimeException("Append is not implemented!");
return append(streamWriter, rows, -1);
}

/** Distributes the writing of a message to an underlying connection. */
public ApiFuture<AppendRowsResponse> append(
StreamWriter streamWriter, ProtoRows rows, long offset) {
throw new RuntimeException("append with offset is not implemented on connection pool!");
// We are in multiplexing mode after entering the following logic.
ConnectionWorker connectionWorker =
streamWriterToConnection.compute(
streamWriter,
(key, existingStream) -> {
// Though compute on concurrent map is atomic, we still do explicit locking as we
// may have concurrent close(...) triggered.
lock.lock();
try {
// Stick to the existing stream if it's not overwhelmed.
if (existingStream != null && !existingStream.getLoad().isOverwhelmed()) {
return existingStream;
}
// Try to create or find another existing stream to reuse.
ConnectionWorker createdOrExistingConnection = null;
try {
createdOrExistingConnection =
createOrReuseConnectionWorker(streamWriter, existingStream);
} catch (IOException e) {
throw new IllegalStateException(e);
}
// Update connection to write stream relationship.
connectionToWriteStream.computeIfAbsent(
createdOrExistingConnection, (ConnectionWorker k) -> new HashSet<>());
connectionToWriteStream.get(createdOrExistingConnection).add(streamWriter);
return createdOrExistingConnection;
} finally {
lock.unlock();
}
});
Stopwatch stopwatch = Stopwatch.createStarted();
ApiFuture<AppendRowsResponse> responseFuture =
connectionWorker.append(
streamWriter.getStreamName(), streamWriter.getProtoSchema(), rows, offset);
return responseFuture;
}
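
The compute(...) call above is atomic per key, but its callback also mutates state that is shared across keys (connectionToWriteStream, and the worker set during close), which is why it additionally takes the pool-wide lock. Below is a stripped-down illustration of that pattern using only standard Java; the class and map names here are illustrative, not the library's.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Illustrative-only: per-key atomicity from compute(...) plus a pool-wide lock for cross-key state. */
final class ComputeWithLockSketch {
  private final Map<String, String> keyToWorker = new ConcurrentHashMap<>();
  private final Map<String, Set<String>> workerToKeys = new ConcurrentHashMap<>();
  private final Lock lock = new ReentrantLock();

  String route(String key) {
    return keyToWorker.compute(
        key,
        (k, existing) -> {
          // compute(...) serializes updates for this key, but workerToKeys is shared across keys
          // and a concurrent close() could be mutating it, so the explicit lock covers both.
          lock.lock();
          try {
            String worker = (existing != null) ? existing : "worker-" + (workerToKeys.size() + 1);
            workerToKeys.computeIfAbsent(worker, w -> new HashSet<>()).add(k);
            return worker;
          } finally {
            lock.unlock();
          }
        });
  }

  void close() {
    lock.lock();
    try {
      workerToKeys.clear(); // safe: no compute(...) callback can be inside the critical section now
    } finally {
      lock.unlock();
    }
  }
}
```
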

/**
 * Creates a new connection if we haven't reached the current maximum, or reuses an existing
 * connection with the least load.
*/
private ConnectionWorker createOrReuseConnectionWorker(
StreamWriter streamWriter, ConnectionWorker existingConnectionWorker) throws IOException {
String streamReference = streamWriter.getStreamName();
if (connectionWorkerPool.size() < currentMaxConnectionCount) {
// Always create a new connection if we haven't reached current maximum.
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
} else {
ConnectionWorker existingBestConnection =
pickBestLoadConnection(
enableTesting ? Load.TEST_LOAD_COMPARATOR : Load.LOAD_COMPARATOR,
ImmutableList.copyOf(connectionWorkerPool));
if (!existingBestConnection.getLoad().isOverwhelmed()) {
return existingBestConnection;
} else if (currentMaxConnectionCount < settings.maxConnectionsPerPool()) {
        // At this point, we have reached the connection cap and the selected connection is
        // overwhelmed, so we try to scale up the connection pool.
        // The connection count will go up one by one until `maxConnectionsPerPool` is reached.
currentMaxConnectionCount += 1;
if (currentMaxConnectionCount > settings.maxConnectionsPerPool()) {
currentMaxConnectionCount = settings.maxConnectionsPerPool();
}
return createConnectionWorker(streamWriter.getStreamName(), streamWriter.getProtoSchema());
} else {
// Stick to the original connection if all the connections are overwhelmed.
if (existingConnectionWorker != null) {
return existingConnectionWorker;
}
// If we are at this branch, it means we reached the maximum connections.
return existingBestConnection;
}
}
}
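
The net effect of createOrReuseConnectionWorker is a soft cap that ramps up one connection at a time: the pool starts at minConnectionsPerPool and only moves toward maxConnectionsPerPool while the least-loaded existing connection remains overwhelmed. The tiny standalone loop below makes that growth visible; the numeric values are example assumptions, not library defaults.

```java
/** Illustrative-only: the soft connection cap grows one at a time while the pool stays overwhelmed. */
final class ScaleUpSketch {
  public static void main(String[] args) {
    int minConnectionsPerPool = 2; // assumed example value, not a documented default
    int maxConnectionsPerPool = 5; // assumed example value, not a documented default
    int currentMaxConnectionCount = minConnectionsPerPool;

    // Pretend every round finds the best existing connection overwhelmed.
    for (int round = 1; round <= 6; round++) {
      boolean bestConnectionOverwhelmed = true;
      if (bestConnectionOverwhelmed && currentMaxConnectionCount < maxConnectionsPerPool) {
        currentMaxConnectionCount += 1; // grow by exactly one
        currentMaxConnectionCount =
            Math.min(currentMaxConnectionCount, maxConnectionsPerPool); // never exceed the hard cap
      }
      System.out.println("round " + round + ": soft cap = " + currentMaxConnectionCount);
    }
    // Prints 3, 4, 5, 5, 5, 5: the cap saturates at maxConnectionsPerPool.
  }
}
```
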

  /** Selects the least-loaded connection worker among the given connection workers. */
static ConnectionWorker pickBestLoadConnection(
Comparator<Load> comparator, List<ConnectionWorker> connectionWorkerList) {
if (connectionWorkerList.isEmpty()) {
throw new IllegalStateException(
String.format(
"Bug in code! At least one connection worker should be passed in "
+ "pickSemiBestLoadConnection(...)"));
}
    // Compare all connection workers and pick the one with the smallest load.
int currentBestIndex = 0;
Load currentBestLoad = connectionWorkerList.get(currentBestIndex).getLoad();
for (int i = 1; i < connectionWorkerList.size(); i++) {
Load loadToCompare = connectionWorkerList.get(i).getLoad();
if (comparator.compare(loadToCompare, currentBestLoad) <= 0) {
currentBestIndex = i;
currentBestLoad = loadToCompare;
}
}
return connectionWorkerList.get(currentBestIndex);
}
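
pickBestLoadConnection relies only on a total order over Load; the fields that Load.LOAD_COMPARATOR actually compares live in ConnectionWorker and are not part of this hunk. The same scan is shown below with a plain integer "load" and a standard Comparator; all names and values are illustrative assumptions.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;

/** Illustrative-only: linear scan for the smallest element under a comparator, as in pickBestLoadConnection. */
final class LeastLoadPickSketch {
  public static void main(String[] args) {
    // Stand-in "load" values; the real Load type compares in-flight work, not a single int.
    List<Integer> loads = Arrays.asList(7, 3, 9, 3);
    Comparator<Integer> comparator = Comparator.naturalOrder();

    int bestIndex = 0;
    for (int i = 1; i < loads.size(); i++) {
      // "<= 0" means a tie moves the pick to the later element, matching the loop above.
      if (comparator.compare(loads.get(i), loads.get(bestIndex)) <= 0) {
        bestIndex = i;
      }
    }
    System.out.println("picked index " + bestIndex + " with load " + loads.get(bestIndex)); // index 3, load 3
  }
}
```
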

/**
* Creates a single connection worker.
*
   * <p>Note this function needs to be thread-safe across different stream references, but not for a
   * single stream reference, because createConnectionWorker(...) is called via computeIfAbsent(...),
   * which runs at most once per key.
*/
private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema writeSchema)
throws IOException {
if (enableTesting) {
      // Though the atomic integer is lightweight, keep the extra flag check in case more logic is added here later.
testValueCreateConnectionCount.getAndIncrement();
}
ConnectionWorker connectionWorker =
new ConnectionWorker(
streamName,
writeSchema,
maxInflightRequests,
maxInflightBytes,
limitExceededBehavior,
traceId,
client,
ownsBigQueryWriteClient);
connectionWorkerPool.add(connectionWorker);
log.info(
String.format(
"Scaling up new connection for stream name: %s, pool size after scaling up %s",
streamName, connectionWorkerPool.size()));
return connectionWorker;
}

  /** Enables test-related logic. */
public static void enableTestingLogic() {
enableTesting = true;
}

  /** Returns how many times createConnectionWorker(...) has been called. */
int getCreateConnectionCount() {
return testValueCreateConnectionCount.get();
}

/** Close the stream writer. Shut down all resources. */
StreamWriter.java
@@ -152,6 +152,11 @@ public String getStreamName() {
return streamName;
}

  /** @return the passed-in user schema. */
public ProtoSchema getProtoSchema() {
return writerSchema;
}

/** Close the stream writer. Shut down all resources. */
@Override
public void close() {
