Skip to content

Commit

Permalink
feat: Adding ExecutorProvider support while creating BigQueryReadClie…
Browse files Browse the repository at this point in the history
…nt (#2072)

* Adding ExecutorProvider support while creating BigQueryReadClient

* adding integration test

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
suryasoma and gcf-owl-bot[bot] committed Apr 14, 2023
1 parent 3e3fdc9 commit 9221e18
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 4 deletions.
6 changes: 3 additions & 3 deletions README.md
Expand Up @@ -57,13 +57,13 @@ implementation 'com.google.cloud:google-cloud-bigquerystorage'
If you are using Gradle without BOM, add this to your dependencies:

```Groovy
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.34.2'
implementation 'com.google.cloud:google-cloud-bigquerystorage:2.35.0'
```

If you are using SBT, add this to your dependencies:

```Scala
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.34.2"
libraryDependencies += "com.google.cloud" % "google-cloud-bigquerystorage" % "2.35.0"
```
<!-- {x-version-update-end} -->

Expand Down Expand Up @@ -220,7 +220,7 @@ Java is a registered trademark of Oracle and/or its affiliates.
[kokoro-badge-link-5]: http://storage.googleapis.com/cloud-devrel-public/java/badges/java-bigquerystorage/java11.html
[stability-image]: https://img.shields.io/badge/stability-stable-green
[maven-version-image]: https://img.shields.io/maven-central/v/com.google.cloud/google-cloud-bigquerystorage.svg
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.34.2
[maven-version-link]: https://central.sonatype.com/artifact/com.google.cloud/google-cloud-bigquerystorage/2.35.0
[authentication]: https://github.com/googleapis/google-cloud-java#authentication
[auth-scopes]: https://developers.google.com/identity/protocols/oauth2/scopes
[predefined-iam-roles]: https://cloud.google.com/iam/docs/understanding-roles#predefined_roles
Expand Down
Expand Up @@ -75,7 +75,8 @@ public static EnhancedBigQueryReadStub create(
.setHeaderProvider(settings.getHeaderProvider())
.setCredentialsProvider(settings.getCredentialsProvider())
.setStreamWatchdogCheckInterval(settings.getStreamWatchdogCheckInterval())
.setStreamWatchdogProvider(settings.getStreamWatchdogProvider());
.setStreamWatchdogProvider(settings.getStreamWatchdogProvider())
.setBackgroundExecutorProvider(settings.getBackgroundExecutorProvider());

baseSettingsBuilder
.createReadSessionSettings()
Expand Down Expand Up @@ -195,4 +196,8 @@ public void shutdownNow() {
public boolean awaitTermination(long duration, TimeUnit unit) throws InterruptedException {
return stub.awaitTermination(duration, unit);
}

public BigQueryReadStubSettings getStubSettings() {
return stubSettings;
}
}
Expand Up @@ -21,7 +21,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;

import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.RetryOption;
import com.google.cloud.ServiceOptions;
Expand All @@ -39,6 +41,7 @@
import com.google.cloud.bigquery.TableInfo;
import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
Expand Down Expand Up @@ -806,6 +809,56 @@ public void testStructAndArraySqlTypes() throws InterruptedException, IOExceptio
assertEquals(rowAssertMessage, new Utf8("abc"), structRecord.get("str_field"));
}

@Test
public void testSimpleReadWithBackgroundExecutorProvider() throws IOException {
BigQueryReadSettings bigQueryReadSettings =
BigQueryReadSettings.newBuilder()
.setBackgroundExecutorProvider(
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(14).build())
.build();
// Overriding the default client
client = BigQueryReadClient.create(bigQueryReadSettings);
assertTrue(
client.getStub().getStubSettings().getBackgroundExecutorProvider()
instanceof InstantiatingExecutorProvider);
assertEquals(
14,
((InstantiatingExecutorProvider)
client.getStub().getStubSettings().getBackgroundExecutorProvider())
.getExecutorThreadCount());
String table =
BigQueryResource.FormatTableResource(
/* projectId = */ "bigquery-public-data",
/* datasetId = */ "samples",
/* tableId = */ "shakespeare");

ReadSession session =
client.createReadSession(
/* parent = */ parentProjectId,
/* readSession = */ ReadSession.newBuilder()
.setTable(table)
.setDataFormat(DataFormat.AVRO)
.build(),
/* maxStreamCount = */ 1);
assertEquals(
String.format(
"Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s",
table, session.toString()),
1,
session.getStreamsCount());

ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadStream(session.getStreams(0).getName()).build();

long rowCount = 0;
ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : stream) {
rowCount += response.getRowCount();
}

assertEquals(164_656, rowCount);
}

/**
* Reads to the specified row offset within the stream. If the stream does not have the desired
* rows to read, it will read all of them.
Expand Down

0 comments on commit 9221e18

Please sign in to comment.