Skip to content

Commit

Permalink
fix: make sure to fall back to old query path when query job is incom…
Browse files Browse the repository at this point in the history
…plete (#941)

* fix: make sure to fall back to old query path when query job is incomplete (takes more than 10s)

* nit

* address comments

* add comment

* nit update
  • Loading branch information
stephaniewang526 authored Nov 13, 2020
1 parent 7843755 commit bd7d85c
Show file tree
Hide file tree
Showing 3 changed files with 96 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1268,20 +1268,22 @@ public com.google.api.services.bigquery.model.QueryResponse call() {

long numRows;
Schema schema;
if (results.getSchema() == null && results.getJobComplete()) {
JobId jobId = JobId.fromPb(results.getJobReference());
Job job = getJob(jobId, options);
TableResult tableResult = job.getQueryResults();
return tableResult;
} else {
schema = results.getSchema() == null ? null : Schema.fromPb(results.getSchema());
if (results.getJobComplete() && results.getSchema() != null) {
schema = Schema.fromPb(results.getSchema());
if (results.getNumDmlAffectedRows() == null && results.getTotalRows() == null) {
numRows = 0L;
} else if (results.getNumDmlAffectedRows() != null) {
numRows = results.getNumDmlAffectedRows();
} else {
numRows = results.getTotalRows().longValue();
}
} else {
// Query is long running (> 10s) and hasn't completed yet, or query completed but didn't
// return the schema, fallback. Some operations don't return the schema and can be optimized
// here, but this is left as future work.
JobId jobId = JobId.fromPb(results.getJobReference());
Job job = getJob(jobId, options);
return job.getQueryResults();
}

if (results.getPageToken() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1955,6 +1955,63 @@ public void testFastQueryMultiplePages() throws InterruptedException {
verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb());
}

@Test
public void testFastQuerySlowDdl() throws InterruptedException {
// mock new fast query path response when running a query that takes more than 10s
JobId queryJob = JobId.of(PROJECT, JOB);
com.google.api.services.bigquery.model.QueryResponse queryResponsePb =
new com.google.api.services.bigquery.model.QueryResponse()
.setJobComplete(false) // false when query does not complete in 10s
.setJobReference(queryJob.toPb()) // backend sends back a jobReference
.setRows(ImmutableList.of(TABLE_ROW))
.setSchema(TABLE_SCHEMA.toPb());

// mock job response from backend
com.google.api.services.bigquery.model.Job responseJob =
new com.google.api.services.bigquery.model.Job()
.setConfiguration(QUERY_JOB_CONFIGURATION_FOR_QUERY.toPb())
.setJobReference(queryJob.toPb())
.setId(JOB)
.setStatus(new com.google.api.services.bigquery.model.JobStatus().setState("DONE"));

// mock old query path response when falling back
GetQueryResultsResponse queryResultsResponsePb =
new GetQueryResultsResponse()
.setJobReference(responseJob.getJobReference())
.setRows(ImmutableList.of(TABLE_ROW))
.setJobComplete(true)
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(TABLE_SCHEMA.toPb());

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.queryRpc(PROJECT, requestPb)).thenReturn(queryResponsePb);
responseJob.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb());
when(bigqueryRpcMock.getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)).thenReturn(responseJob);
when(bigqueryRpcMock.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS)))
.thenReturn(queryResultsResponsePb);
when(bigqueryRpcMock.listTableData(PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS))
.thenReturn(new TableDataList().setRows(ImmutableList.of(TABLE_ROW)).setTotalRows(1L));

bigquery = options.getService();
TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY);
assertThat(result.getSchema()).isEqualTo(TABLE_SCHEMA);
assertThat(result.getTotalRows()).isEqualTo(1);
for (FieldValueList row : result.getValues()) {
assertThat(row.get(0).getBooleanValue()).isFalse();
assertThat(row.get(1).getLongValue()).isEqualTo(1);
}

verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb());
verify(bigqueryRpcMock).getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS);
verify(bigqueryRpcMock)
.getQueryResults(
PROJECT, JOB, null, BigQueryImpl.optionMap(Job.DEFAULT_QUERY_WAIT_OPTIONS));
verify(bigqueryRpcMock).listTableData(PROJECT, DATASET, TABLE, EMPTY_RPC_OPTIONS);
}

@Test
public void testQueryRequestCompletedOptions() throws InterruptedException {
JobId queryJob = JobId.of(PROJECT, JOB);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1813,6 +1813,36 @@ public void testFastDDLQuery() throws InterruptedException {
}
}

@Test
public void testFastQuerySlowDDL() throws InterruptedException {
String tableName =
"test_table_fast_query_ddl_slow_" + UUID.randomUUID().toString().substring(0, 8);
// This query take more than 10s to run and should fall back on the old query path
String slowDdlQuery =
String.format(
"CREATE OR REPLACE TABLE %s AS SELECT unique_key, agency, complaint_type, descriptor, street_name, city, landmark FROM `bigquery-public-data.new_york.311_service_requests`",
tableName);
QueryJobConfiguration ddlConfig =
QueryJobConfiguration.newBuilder(slowDdlQuery)
.setDefaultDataset(DatasetId.of(DATASET))
.build();
TableResult result = bigquery.query(ddlConfig);
assertEquals(0, result.getTotalRows());
assertNotNull(result.getSchema());
// Verify correctness of table content
String sqlQuery = String.format("SELECT * FROM %s.%s", DATASET, tableName);
QueryJobConfiguration sqlConfig = QueryJobConfiguration.newBuilder(sqlQuery).build();
TableResult resultAfterDDL = bigquery.query(sqlConfig);
for (FieldValueList row : resultAfterDDL.getValues()) {
FieldValue unique_key = row.get(0);
assertEquals(unique_key, row.get("unique_key"));
FieldValue agency = row.get(1);
assertEquals(agency, row.get("agency"));
FieldValue complaint_type = row.get(2);
assertEquals(complaint_type, row.get("complaint_type"));
}
}

@Test
public void testFastQueryHTTPException() throws InterruptedException {
String queryInvalid =
Expand Down

0 comments on commit bd7d85c

Please sign in to comment.