Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: fix some todos and reject stream writer if it's created with mixed behavior of passed in client or not #1803

Merged
merged 27 commits into from
Sep 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
5a63d95
feat: Split writer into connection worker and wrapper, this is a
GaoleMeng Sep 9, 2022
5a13302
feat: add connection worker pool skeleton, used for multiplexing client
GaoleMeng Sep 13, 2022
0297204
Merge branch 'main' into main
GaoleMeng Sep 14, 2022
8a81ad3
feat: add Load api for connection worker for multiplexing client
GaoleMeng Sep 14, 2022
68fd040
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 14, 2022
3106dae
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 15, 2022
5bf04e5
Merge branch 'googleapis:main' into main
GaoleMeng Sep 15, 2022
2fc7551
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 15, 2022
7a6d919
feat: add multiplexing support to connection worker. We will treat every
GaoleMeng Sep 15, 2022
3ba7659
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
f379a78
Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
9307776
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 16, 2022
de73013
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 16, 2022
19005a1
feat: port the multiplexing client core algorithm and basic tests
GaoleMeng Sep 19, 2022
c5d14ba
Merge branch 'googleapis:main' into main
GaoleMeng Sep 19, 2022
644360a
🦉 Updates from OwlBot post-processor
gcf-owl-bot[bot] Sep 20, 2022
3099d82
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
e707dd6
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
9e7a8fa
Merge branch 'main' of https://github.com/GaoleMeng/java-bigquerystorage
GaoleMeng Sep 20, 2022
31f1755
Merge branch 'googleapis:main' into main
GaoleMeng Sep 20, 2022
44c36fc
feat: wire multiplexing connection pool to stream writer
GaoleMeng Sep 20, 2022
87a4036
feat: some fixes for multiplexing client
GaoleMeng Sep 23, 2022
c92ea1b
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 23, 2022
019520c
Merge branch 'googleapis:main' into main
GaoleMeng Sep 26, 2022
47893df
feat: fix some todos, and reject the mixed behavior of passed in clie…
GaoleMeng Sep 27, 2022
8bd4e6a
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
83409b0
Merge remote-tracking branch 'upstream/main'
GaoleMeng Sep 27, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.io.IOException;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -324,18 +325,19 @@ private ConnectionWorker createConnectionWorker(String streamName, ProtoSchema w
// Though atomic integer is super lightweight, add extra if check in case adding future logic.
testValueCreateConnectionCount.getAndIncrement();
}
// TODO(gaole): figure out a better way to handle header / request body mismatch
// currently we use different header for the client in each connection worker to be different
// currently we use different header for the client in each connection worker to be different
// as the backend require the header to have the same write_stream field as request body.
BigQueryWriteClient clientAfterModification = client;
if (ownsBigQueryWriteClient) {
BigQueryWriteSettings settings = client.getSettings();

// Every header to write api is required to set write_stream in the header to help routing
// the request to correct region.
HashMap<String, String> newHeaders = new HashMap<>();
newHeaders.putAll(settings.toBuilder().getHeaderProvider().getHeaders());
newHeaders.put("x-goog-request-params", "write_stream=" + streamName);
BigQueryWriteSettings stubSettings =
settings
.toBuilder()
.setHeaderProvider(
FixedHeaderProvider.create("x-goog-request-params", "write_stream=" + streamName))
.build();
settings.toBuilder().setHeaderProvider(FixedHeaderProvider.create(newHeaders)).build();
clientAfterModification = BigQueryWriteClient.create(stubSettings);
}
ConnectionWorker connectionWorker =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

/**
Expand Down Expand Up @@ -68,6 +67,9 @@ public class StreamWriter implements AutoCloseable {
*/
private final SingleConnectionOrConnectionPool singleConnectionOrConnectionPool;

/** Test only param to tell how many times a client is created. */
private static int testOnlyClientCreatedTimes = 0;

/**
* Static map from {@link ConnectionPoolKey} to connection pool. Note this map is static to be
* shared by every stream writer in the same process.
Expand Down Expand Up @@ -169,25 +171,7 @@ private StreamWriter(Builder builder) throws IOException {
this.streamName = builder.streamName;
this.writerSchema = builder.writerSchema;
this.location = builder.location;
boolean ownsBigQueryWriteClient;
if (builder.client == null) {
BigQueryWriteSettings stubSettings =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setBackgroundExecutorProvider(builder.executorProvider)
.setEndpoint(builder.endpoint)
// (b/185842996): Temporily fix this by explicitly providing the header.
.setHeaderProvider(
FixedHeaderProvider.create(
"x-goog-request-params", "write_stream=" + this.streamName))
.build();
client = BigQueryWriteClient.create(stubSettings);
ownsBigQueryWriteClient = true;
} else {
client = builder.client;
ownsBigQueryWriteClient = false;
}
boolean ownsBigQueryWriteClient = builder.client == null;
if (!builder.enableConnectionPool) {
this.singleConnectionOrConnectionPool =
SingleConnectionOrConnectionPool.ofSingleConnection(
Expand All @@ -198,7 +182,7 @@ private StreamWriter(Builder builder) throws IOException {
builder.maxInflightBytes,
builder.limitExceededBehavior,
builder.traceId,
client,
getBigQueryWriteClient(builder),
ownsBigQueryWriteClient));
} else {
if (builder.location == null || builder.location.isEmpty()) {
Expand All @@ -212,29 +196,39 @@ private StreamWriter(Builder builder) throws IOException {
SingleConnectionOrConnectionPool.ofConnectionPool(
connectionPoolMap.computeIfAbsent(
ConnectionPoolKey.create(builder.location),
(key) ->
new ConnectionWorkerPool(
(key) -> {
try {
return new ConnectionWorkerPool(
builder.maxInflightRequest,
builder.maxInflightBytes,
builder.limitExceededBehavior,
builder.traceId,
client,
ownsBigQueryWriteClient)));
getBigQueryWriteClient(builder),
ownsBigQueryWriteClient);
} catch (IOException e) {
throw new RuntimeException(e);
}
}));
validateFetchedConnectonPool(builder);
// Shut down the passed in client. Internally we will create another client inside connection
// pool for every new connection worker.
// TODO(gaole): instead of perform close outside of pool approach, change to always create
// new client in connection.
if (client != singleConnectionOrConnectionPool.connectionWorkerPool().bigQueryWriteClient()
&& ownsBigQueryWriteClient) {
client.shutdown();
try {
client.awaitTermination(150, TimeUnit.SECONDS);
} catch (InterruptedException unused) {
// Ignore interruption as this client is not used.
}
client.close();
}
}
}

private BigQueryWriteClient getBigQueryWriteClient(Builder builder) throws IOException {
if (builder.client == null) {
BigQueryWriteSettings stubSettings =
BigQueryWriteSettings.newBuilder()
.setCredentialsProvider(builder.credentialsProvider)
.setTransportChannelProvider(builder.channelProvider)
.setEndpoint(builder.endpoint)
// (b/185842996): Temporily fix this by explicitly providing the header.
.setHeaderProvider(
FixedHeaderProvider.create(
"x-goog-request-params", "write_stream=" + this.streamName))
.build();
testOnlyClientCreatedTimes++;
return BigQueryWriteClient.create(stubSettings);
} else {
return builder.client;
}
}

Expand All @@ -245,6 +239,10 @@ private void validateFetchedConnectonPool(StreamWriter.Builder builder) {
this.singleConnectionOrConnectionPool.connectionWorkerPool().getTraceId(),
builder.traceId)) {
paramsValidatedFailed = "Trace id";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().ownsBigQueryWriteClient(),
builder.client == null)) {
paramsValidatedFailed = "Whether using passed in clients";
} else if (!Objects.equals(
this.singleConnectionOrConnectionPool.connectionWorkerPool().limitExceededBehavior(),
builder.limitExceededBehavior)) {
Expand Down Expand Up @@ -361,6 +359,17 @@ SingleConnectionOrConnectionPool.Kind getConnectionOperationType() {
return singleConnectionOrConnectionPool.getKind();
}

@VisibleForTesting
static int getTestOnlyClientCreatedTimes() {
return testOnlyClientCreatedTimes;
}

@VisibleForTesting
static void cleanUp() {
testOnlyClientCreatedTimes = 0;
connectionPoolMap.clear();
}

/** A builder of {@link StreamWriter}s. */
public static final class Builder {
private static final long DEFAULT_MAX_INFLIGHT_REQUESTS = 1000L;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,13 +82,15 @@ public void setUp() throws Exception {
.setCredentialsProvider(NoCredentialsProvider.create())
.setTransportChannelProvider(serviceHelper.createChannelProvider())
.build());
StreamWriter.cleanUp();
}

@After
public void tearDown() throws Exception {
log.info("tearDown called");
client.close();
serviceHelper.stop();
StreamWriter.cleanUp();
}

private StreamWriter getMultiplexingTestStreamWriter() throws IOException {
Expand Down Expand Up @@ -722,6 +724,25 @@ public void testInitialization_operationKind() throws Exception {
}
}

@Test
public void createStreamWithDifferentWhetherOwnsClient() throws Exception {
StreamWriter streamWriter1 = getMultiplexingTestStreamWriter();

assertThrows(
IllegalArgumentException.class,
new ThrowingRunnable() {
@Override
public void run() throws Throwable {
StreamWriter.newBuilder(TEST_STREAM)
.setWriterSchema(createProtoSchema())
.setTraceId(TEST_TRACE_ID)
.setLocation("US")
.setEnableConnectionPool(true)
.build();
}
});
}

// Timeout to ensure close() doesn't wait for done callback timeout.
@Test(timeout = 10000)
public void testCloseDisconnectedStream() throws Exception {
Expand Down