feat: add close() to multiplexing client (#1788)
* feat: Split writer into connection worker and wrapper; this is a
prerequisite for the multiplexing client

* feat: add connection worker pool skeleton, used for the multiplexing client

* feat: add Load API for the connection worker for the multiplexing client

* feat: add multiplexing support to connection worker. We will treat every
new stream name as a switch of destination

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* feat: port the multiplexing client core algorithm and basic tests;
also fixed a tiny bug inside the fake BigQuery write impl for getting the
response from an offset

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
GaoleMeng and gcf-owl-bot[bot] committed Sep 20, 2022
1 parent bd24452 commit 6b3a974
Showing 2 changed files with 154 additions and 5 deletions.
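For orientation, a minimal sketch of the call flow this change enables. It uses only names that appear in the diff below; getTestStreamWriter and sendFooStringTestMessage are helpers from the test file, and the pool construction is elided, so treat this as an illustration rather than runnable production code.

    StreamWriter writeStream1 = getTestStreamWriter(TEST_STREAM_1);
    StreamWriter writeStream2 = getTestStreamWriter(TEST_STREAM_2);
    // Appends from both writers can be multiplexed over the shared worker pool.
    sendFooStringTestMessage(writeStream1, connectionWorkerPool, new String[] {"0"}, 0);
    sendFooStringTestMessage(writeStream2, connectionWorkerPool, new String[] {"1"}, 1);
    // close() only shuts down workers that no other stream still references.
    connectionWorkerPool.close(writeStream1);
    connectionWorkerPool.close(writeStream2);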
@@ -340,6 +340,40 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
    return connectionWorker;
  }

  /**
   * Reports the close of the given write stream.
   *
   * <p>The corresponding worker is not closed until no remaining stream reference
   * targets that worker.
   */
  public void close(StreamWriter streamWriter) {
    lock.lock();
    try {
      streamWriterToConnection.remove(streamWriter);
      // Since it's possible some other connections may have served this writeStream, we
      // iterate and see whether it's also fine to close other connections.
      Set<ConnectionWorker> connectionToRemove = new HashSet<>();
      for (ConnectionWorker connectionWorker : connectionToWriteStream.keySet()) {
        if (connectionToWriteStream.containsKey(connectionWorker)) {
          connectionToWriteStream.get(connectionWorker).remove(streamWriter);
          if (connectionToWriteStream.get(connectionWorker).isEmpty()) {
            connectionWorker.close();
            connectionWorkerPool.remove(connectionWorker);
            connectionToRemove.add(connectionWorker);
          }
        }
      }
      log.info(
          String.format(
              "During closing of writeStream for %s with writer id %s, we decided to close %s "
                  + "connections",
              streamWriter.getStreamName(), streamWriter.getWriterId(), connectionToRemove.size()));
      connectionToWriteStream.keySet().removeAll(connectionToRemove);
    } finally {
      lock.unlock();
    }
  }

  /** Enable test-related logic. */
  public static void enableTestingLogic() {
    enableTesting = true;
@@ -350,8 +384,7 @@ int getCreateConnectionCount() {
    return testValueCreateConnectionCount.get();
  }

-  /** Close the stream writer. Shut down all resources. */
-  public void close(StreamWriter streamWriter) {
-    throw new RuntimeException("close is implemented on connection pool");
+  int getTotalConnectionCount() {
+    return connectionWorkerPool.size();
  }
}
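The close() bookkeeping above is essentially reference counting: each worker tracks which writers it serves, and a worker shuts down once that set empties. A self-contained sketch of that pattern, with hypothetical Writer/Worker type parameters standing in for StreamWriter/ConnectionWorker (simplified, not the library's actual class):

    import java.util.HashMap;
    import java.util.HashSet;
    import java.util.Map;
    import java.util.Set;

    class RefCountedPool<Writer, Worker> {
      private final Map<Worker, Set<Writer>> workerToWriters = new HashMap<>();

      // Record that this worker currently serves this writer.
      void register(Worker worker, Writer writer) {
        workerToWriters.computeIfAbsent(worker, w -> new HashSet<>()).add(writer);
      }

      // Drop the writer from every worker; shut down workers left with no
      // writers, mirroring ConnectionWorkerPool.close(StreamWriter).
      void close(Writer writer) {
        workerToWriters.values().forEach(writers -> writers.remove(writer));
        workerToWriters
            .entrySet()
            .removeIf(
                entry -> {
                  if (entry.getValue().isEmpty()) {
                    shutDown(entry.getKey()); // close the underlying connection
                    return true;
                  }
                  return false;
                });
      }

      private void shutDown(Worker worker) {
        // Placeholder for ConnectionWorker.close().
      }
    }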
@@ -24,6 +24,7 @@
import com.google.api.gax.grpc.testing.MockServiceHelper;
import com.google.cloud.bigquery.storage.test.Test.FooType;
import com.google.cloud.bigquery.storage.v1.ConnectionWorkerPool.Settings;
import com.google.common.util.concurrent.Uninterruptibles;
import com.google.protobuf.DescriptorProtos;
import com.google.protobuf.Int64Value;
import java.io.IOException;
@@ -33,6 +34,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -112,10 +114,13 @@ public void testMultiTableConnections_overwhelmed() throws Exception {
  private void testSend100RequestsToMultiTable(
      int maxRequests, int maxConnections, int expectedConnectionCount, int tableCount)
      throws IOException, ExecutionException, InterruptedException {
+    ConnectionWorkerPool.setOptions(
+        Settings.builder()
+            .setMinConnectionsPerPool(2)
+            .setMaxConnectionsPerPool(maxConnections)
+            .build());
    ConnectionWorkerPool connectionWorkerPool =
        createConnectionWorkerPool(maxRequests, /*maxBytes=*/ 100000);
-    ConnectionWorkerPool.setOptions(
-        Settings.builder().setMaxConnectionsPerPool(maxConnections).build());

    // Sets the sleep time to simulate requests stuck in the connection.
    testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
@@ -163,6 +168,117 @@ private void testSend100RequestsToMultiTable(
    assertThat(offsets.size()).isEqualTo(appendCount);
  }

  @Test
  public void testMultiStreamClosed_multiplexingEnabled() throws Exception {
    ConnectionWorkerPool.setOptions(
        Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build());
    ConnectionWorkerPool connectionWorkerPool =
        createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 1000);

    // Sets the sleep time to simulate requests stuck in the connection.
    testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
    StreamWriter writeStream1 = getTestStreamWriter(TEST_STREAM_1);
    StreamWriter writeStream2 = getTestStreamWriter(TEST_STREAM_2);

    // Try appending 20 requests; at the end we should have 2 requests per connection.
    long appendCount = 20;
    for (long i = 0; i < appendCount; i++) {
      testBigQueryWrite.addResponse(createAppendResponse(i));
    }
    List<ApiFuture<?>> futures = new ArrayList<>();

    // Insert to the two tables in an interleaved fashion.
    // The final status of each connection queue will be
    // (s1 is a request coming from writeStream1, etc.):
    // c1: [s1, s1], c2: [s2, s2], c3: [s1, s1], c4: [s2, s2]
    // c5 - c10: [s1, s2]
    for (int i = 0; i < appendCount; i++) {
      StreamWriter writeStream = i % 2 == 0 ? writeStream1 : writeStream2;
      futures.add(
          sendFooStringTestMessage(
              writeStream, connectionWorkerPool, new String[] {String.valueOf(i)}, i));
    }

    for (ApiFuture<?> future : futures) {
      future.get();
    }
    // At the end we should have scaled up to 10 connections.
    assertThat(connectionWorkerPool.getCreateConnectionCount()).isEqualTo(10);
    assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(10);

    // Start testing calling close on each stream.
    // When we close the first stream, only the connections that serve stream 1
    // exclusively (c1 and c3) are closed.
    connectionWorkerPool.close(writeStream1);
    assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(8);

    // The next close call closes every remaining connection.
    connectionWorkerPool.close(writeStream2);
    assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(0);
  }

  @Test
  public void testMultiStreamAppend_appendWhileClosing() throws Exception {
    ConnectionWorkerPool.setOptions(
        Settings.builder().setMaxConnectionsPerPool(10).setMinConnectionsPerPool(5).build());
    ConnectionWorkerPool connectionWorkerPool =
        createConnectionWorkerPool(/*maxRequests=*/ 3, /*maxBytes=*/ 100000);

    // Sets the sleep time to simulate requests stuck in the connection.
    testBigQueryWrite.setResponseSleep(Duration.ofMillis(50L));
    StreamWriter writeStream1 = getTestStreamWriter(TEST_STREAM_1);
    StreamWriter writeStream2 = getTestStreamWriter(TEST_STREAM_2);

    // Try appending 10 requests; at the end we should have 2 requests per connection
    // and 5 connections created.
    long appendCount = 10;
    for (long i = 0; i < appendCount; i++) {
      testBigQueryWrite.addResponse(createAppendResponse(i));
    }
    List<ApiFuture<?>> futures = new ArrayList<>();

    // 1. Insert to the two tables in an interleaved fashion.
    // The final status of each connection queue will be
    // (s1 is a request coming from writeStream1, etc.):
    // c1: [s1, s1], c2: [s2, s2], c3: [s1, s1], c4: [s2, s2], c5: [s1, s2]
    for (int i = 0; i < appendCount; i++) {
      StreamWriter writeStream = i % 2 == 0 ? writeStream1 : writeStream2;
      futures.add(
          sendFooStringTestMessage(
              writeStream, connectionWorkerPool, new String[] {String.valueOf(i)}, i));
    }
    assertThat(connectionWorkerPool.getCreateConnectionCount()).isEqualTo(5);
    assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(5);

    // 2. Close one of the streams. close() waits for the pending requests in c1 and
    // c3 to drain before shutting them down; meanwhile the other queues drain too.
    connectionWorkerPool.close(writeStream1);
    assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(3);
    // Sleep 1 second to make sure every message is drained.
    Uninterruptibles.sleepUninterruptibly(1, TimeUnit.SECONDS);

    // 3. Insert another batch of messages. Since no connection has in-flight
    // messages, the pool should settle back at 5 connections.
    for (int i = 0; i < appendCount; i++) {
      StreamWriter writeStream = i % 2 == 0 ? writeStream1 : writeStream2;
      futures.add(
          sendFooStringTestMessage(
              writeStream, connectionWorkerPool, new String[] {String.valueOf(i)}, i));
    }
    assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(5);
    for (ApiFuture<?> future : futures) {
      future.get();
    }

    // 4. Close write stream 1. The two connections serving only it will be closed.
    connectionWorkerPool.close(writeStream1);
    assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(3);

    // 5. Close write stream 2; all connections should now be closed.
    connectionWorkerPool.close(writeStream2);
    assertThat(connectionWorkerPool.getTotalConnectionCount()).isEqualTo(0);
  }

  private AppendRowsResponse createAppendResponse(long offset) {
    return AppendRowsResponse.newBuilder()
        .setAppendResult(
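The diff view truncates this helper. For completeness, a plausible reconstruction consistent with the Int64Value import shown above; the exact body in the commit may differ slightly:

    private AppendRowsResponse createAppendResponse(long offset) {
      // Fake server response whose append result echoes the requested offset.
      return AppendRowsResponse.newBuilder()
          .setAppendResult(
              AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(offset)))
          .build();
    }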
