Skip to content

Commit

Permalink
feat: add integration tests for v1beta2 BigQuery Storage API (#50)
Browse files Browse the repository at this point in the history
The tests are the same as in v1beta1, with some minor differences due to protocol buffer changes.
  • Loading branch information
mmladenovski committed Jan 31, 2020
1 parent 9496158 commit bd37cf3
Show file tree
Hide file tree
Showing 4 changed files with 1,223 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery.storage.v1beta2.it;

/** Test helper class to generate BigQuery resource paths. */
public class BigQueryResource {

/**
* Returns a BigQuery table resource path from the provided parameters into the following format:
* projects/{projectId}/datasets/{datasetId}/tables/{tableId}
*
* @param projectId
* @param datasetId
* @param tableId
* @return a path to a table resource.
*/
public static String FormatTableResource(String projectId, String datasetId, String tableId) {
return String.format("projects/%s/datasets/%s/tables/%s", projectId, datasetId, tableId);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.google.cloud.bigquery.storage.v1beta2.it;

import static org.junit.Assert.assertEquals;

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.storage.v1beta2.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1beta2.DataFormat;
import com.google.cloud.bigquery.storage.v1beta2.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1beta2.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1beta2.ReadSession;
import com.google.cloud.bigquery.storage.v1beta2.ReadStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.logging.Logger;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;

/**
* Integration tests for BigQuery Storage API which target long running sessions. These tests can be
* enabled by setting the system property 'bigquery.storage.enable_long_running_tests' to true.
*/
public class ITBigQueryStorageLongRunningTest {

private static final Logger LOG =
Logger.getLogger(ITBigQueryStorageLongRunningTest.class.getName());

private static final String LONG_TESTS_ENABLED_PROPERTY =
"bigquery.storage.enable_long_running_tests";

private static final String LONG_TESTS_DISABLED_MESSAGE =
String.format(
"BigQuery Storage long running tests are not enabled and will be skipped. "
+ "To enable them, set system property '%s' to true.",
LONG_TESTS_ENABLED_PROPERTY);

private static BigQueryReadClient client;
private static String parentProjectId;

@BeforeClass
public static void beforeClass() throws IOException {
Assume.assumeTrue(LONG_TESTS_DISABLED_MESSAGE, Boolean.getBoolean(LONG_TESTS_ENABLED_PROPERTY));
client = BigQueryReadClient.create();
parentProjectId = String.format("projects/%s", ServiceOptions.getDefaultProjectId());

LOG.info(
String.format(
"%s tests running with parent project: %s",
ITBigQueryStorageLongRunningTest.class.getSimpleName(), parentProjectId));
}

@AfterClass
public static void afterClass() {
if (client != null) {
client.close();
}
}

@Test
public void testLongRunningReadSession() throws InterruptedException, ExecutionException {
// This test reads a larger table with the goal of doing a simple validation of timeout settings
// for a longer running session.

String table =
BigQueryResource.FormatTableResource(
/* projectId = */ "bigquery-public-data",
/* datasetId = */ "samples",
/* tableId = */ "wikipedia");

ReadSession session =
client.createReadSession(
/* parent = */ parentProjectId,
/* readSession = */ ReadSession.newBuilder()
.setTable(table)
.setDataFormat(DataFormat.AVRO)
.build(),
/* maxStreamCount = */ 5);

assertEquals(
String.format(
"Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s",
table, session.toString()),
5,
session.getStreamsCount());

List<Callable<Long>> tasks = new ArrayList<>(session.getStreamsCount());
for (final ReadStream stream : session.getStreamsList()) {
tasks.add(
new Callable<Long>() {
@Override
public Long call() throws Exception {
return readAllRowsFromStream(stream);
}
});
}

ExecutorService executor = Executors.newFixedThreadPool(tasks.size());
List<Future<Long>> results = executor.invokeAll(tasks);

long rowCount = 0;
for (Future<Long> result : results) {
rowCount += result.get();
}

assertEquals(313_797_035, rowCount);
}

private long readAllRowsFromStream(ReadStream readStream) {

ReadRowsRequest readRowsRequest =
ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build();

long rowCount = 0;
ServerStream<ReadRowsResponse> serverStream = client.readRowsCallable().call(readRowsRequest);
for (ReadRowsResponse response : serverStream) {
rowCount += response.getRowCount();
}

LOG.info(
String.format("Read total of %d rows from stream '%s'.", rowCount, readStream.getName()));
return rowCount;
}
}
Loading

0 comments on commit bd37cf3

Please sign in to comment.