Skip to content
Permalink
Browse files
feat: set requestId for fast query path in QueryRequestInfo instead o…
…f QueryJobConfiguration (#987)

As requested @epavan
  • Loading branch information
stephaniewang526 committed Dec 5, 2020
1 parent a692fe1 commit e2cd4f63ccc543e144f90e06eaadb2e96ce94943
@@ -103,6 +103,11 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
@@ -35,7 +35,6 @@
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.UUID;

/**
* Google BigQuery Query Job configuration. A Query Job runs a query against BigQuery data. Query
@@ -72,7 +71,6 @@ public final class QueryJobConfiguration extends JobConfiguration {
private final List<ConnectionProperty> connectionProperties;
// maxResults is only used for fast query path
private final Long maxResults;
private final String requestId;

/**
* Priority levels for a query. If not specified the priority is assumed to be {@link
@@ -123,7 +121,6 @@ public enum Priority {
private RangePartitioning rangePartitioning;
private List<ConnectionProperty> connectionProperties;
private Long maxResults;
private String requestId;

private Builder() {
super(Type.QUERY);
@@ -157,7 +154,6 @@ private Builder(QueryJobConfiguration jobConfiguration) {
this.rangePartitioning = jobConfiguration.rangePartitioning;
this.connectionProperties = jobConfiguration.connectionProperties;
this.maxResults = jobConfiguration.maxResults;
this.requestId = jobConfiguration.requestId;
}

private Builder(com.google.api.services.bigquery.model.JobConfiguration configurationPb) {
@@ -625,11 +621,6 @@ public Builder setMaxResults(Long maxResults) {
return this;
}

Builder setRequestId(String requestId) {
this.requestId = requestId;
return this;
}

public QueryJobConfiguration build() {
return new QueryJobConfiguration(this);
}
@@ -672,7 +663,6 @@ private QueryJobConfiguration(Builder builder) {
this.rangePartitioning = builder.rangePartitioning;
this.connectionProperties = builder.connectionProperties;
this.maxResults = builder.maxResults;
this.requestId = builder.requestId;
}

/**
@@ -875,10 +865,6 @@ public Long getMaxResults() {
return maxResults;
}

String getRequestId() {
return requestId;
}

@Override
public Builder toBuilder() {
return new Builder(this);
@@ -1057,7 +1043,7 @@ com.google.api.services.bigquery.model.JobConfiguration toPb() {
/** Creates a builder for a BigQuery Query Job given the query to be run. */
public static Builder newBuilder(String query) {
checkArgument(!isNullOrEmpty(query), "Provided query is null or empty");
return new Builder().setQuery(query).setRequestId(UUID.randomUUID().toString());
return new Builder().setQuery(query);
}

/**
@@ -23,6 +23,7 @@
import com.google.common.collect.Lists;
import java.util.List;
import java.util.Map;
import java.util.UUID;

final class QueryRequestInfo {

@@ -35,9 +36,9 @@ final class QueryRequestInfo {
private final Long maxResults;
private final String query;
private final List<QueryParameter> queryParameters;
private final String requestId;
private final Boolean useQueryCache;
private final Boolean useLegacySql;
private final String requestId;

QueryRequestInfo(QueryJobConfiguration config) {
this.config = config;
@@ -49,9 +50,9 @@ final class QueryRequestInfo {
this.maxResults = config.getMaxResults();
this.query = config.getQuery();
this.queryParameters = config.toPb().getQuery().getQueryParameters();
this.requestId = UUID.randomUUID().toString();
this.useLegacySql = config.useLegacySql();
this.useQueryCache = config.useQueryCache();
this.requestId = config.getRequestId();
}

boolean isFastQuerySupported() {
@@ -1885,9 +1885,8 @@ public void testFastQueryRequestCompleted() throws InterruptedException {
.setTotalBytesProcessed(42L)
.setTotalRows(BigInteger.valueOf(1L));

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);

when(bigqueryRpcMock.queryRpc(PROJECT, requestInfo.toPb())).thenReturn(queryResponsePb);
when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture()))
.thenReturn(queryResponsePb);

bigquery = options.getService();
TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY);
@@ -1900,7 +1899,15 @@ public void testFastQueryRequestCompleted() throws InterruptedException {
assertThat(row.get(0).getBooleanValue()).isFalse();
assertThat(row.get(1).getLongValue()).isEqualTo(1);
}
verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb());

QueryRequest requestPb = requestPbCapture.getValue();
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.getQuery(), requestPb.getQuery());
assertEquals(
QUERY_JOB_CONFIGURATION_FOR_QUERY.getDefaultDataset().getDataset(),
requestPb.getDefaultDataset().getDatasetId());
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.useQueryCache(), requestPb.getUseQueryCache());

verify(bigqueryRpcMock).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

@Test
@@ -1937,22 +1944,30 @@ public void testFastQueryMultiplePages() throws InterruptedException {
.setTotalBytesProcessed(42L)
.setTotalRows(BigInteger.valueOf(1L));

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
when(bigqueryRpcMock.queryRpc(PROJECT, requestInfo.toPb())).thenReturn(queryResponsePb);
when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture()))
.thenReturn(queryResponsePb);

bigquery = options.getService();
TableResult result = bigquery.query(QUERY_JOB_CONFIGURATION_FOR_QUERY);
assertTrue(result.hasNextPage());
assertNotNull(result.getNextPageToken());
assertNotNull(result.getNextPage());

QueryRequest requestPb = requestPbCapture.getValue();
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.getQuery(), requestPb.getQuery());
assertEquals(
QUERY_JOB_CONFIGURATION_FOR_QUERY.getDefaultDataset().getDataset(),
requestPb.getDefaultDataset().getDatasetId());
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.useQueryCache(), requestPb.getUseQueryCache());

verify(bigqueryRpcMock).getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS);
verify(bigqueryRpcMock)
.listTableData(
PROJECT,
DATASET,
TABLE,
BigQueryImpl.optionMap(BigQuery.TableDataListOption.pageToken(CURSOR)));
verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb());
verify(bigqueryRpcMock).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

@Test
@@ -1983,10 +1998,8 @@ public void testFastQuerySlowDdl() throws InterruptedException {
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(TABLE_SCHEMA.toPb());

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.queryRpc(PROJECT, requestPb)).thenReturn(queryResponsePb);
when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture()))
.thenReturn(queryResponsePb);
responseJob.getConfiguration().getQuery().setDestinationTable(TABLE_ID.toPb());
when(bigqueryRpcMock.getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS)).thenReturn(responseJob);
when(bigqueryRpcMock.getQueryResults(
@@ -2004,7 +2017,14 @@ public void testFastQuerySlowDdl() throws InterruptedException {
assertThat(row.get(1).getLongValue()).isEqualTo(1);
}

verify(bigqueryRpcMock).queryRpc(PROJECT, requestInfo.toPb());
QueryRequest requestPb = requestPbCapture.getValue();
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.getQuery(), requestPb.getQuery());
assertEquals(
QUERY_JOB_CONFIGURATION_FOR_QUERY.getDefaultDataset().getDataset(),
requestPb.getDefaultDataset().getDatasetId());
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.useQueryCache(), requestPb.getUseQueryCache());

verify(bigqueryRpcMock).queryRpc(eq(PROJECT), requestPbCapture.capture());
verify(bigqueryRpcMock).getJob(PROJECT, JOB, null, EMPTY_RPC_OPTIONS);
verify(bigqueryRpcMock)
.getQueryResults(
@@ -2287,9 +2307,6 @@ public void testFastQuerySQLShouldRetry() throws Exception {
.setTotalRows(BigInteger.valueOf(1L))
.setSchema(TABLE_SCHEMA.toPb());

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture()))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
@@ -2310,13 +2327,13 @@ public void testFastQuerySQLShouldRetry() throws Exception {

List<QueryRequest> allRequests = requestPbCapture.getAllValues();
boolean idempotent = true;
String requestId = requestPb.getRequestId();
String firstRequestId = allRequests.get(0).getRequestId();
for (QueryRequest request : allRequests) {
idempotent = request.getRequestId().equals(requestId);
idempotent = request.getRequestId().equals(firstRequestId);
}
assertTrue(idempotent);

verify(bigqueryRpcMock, times(5)).queryRpc(PROJECT, requestPb);
verify(bigqueryRpcMock, times(5)).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

@Test
@@ -2331,9 +2348,6 @@ public void testFastQueryDMLShouldRetry() throws Exception {
.setNumDmlAffectedRows(1L)
.setSchema(TABLE_SCHEMA.toPb());

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_DMLQUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture()))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
@@ -2354,13 +2368,13 @@ public void testFastQueryDMLShouldRetry() throws Exception {

List<QueryRequest> allRequests = requestPbCapture.getAllValues();
boolean idempotent = true;
String requestId = requestPb.getRequestId();
String firstRequestId = allRequests.get(0).getRequestId();
for (QueryRequest request : allRequests) {
idempotent = request.getRequestId().equals(requestId);
idempotent = request.getRequestId().equals(firstRequestId);
}
assertTrue(idempotent);

verify(bigqueryRpcMock, times(5)).queryRpc(PROJECT, requestPb);
verify(bigqueryRpcMock, times(5)).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

@Test
@@ -2374,9 +2388,6 @@ public void testFastQueryDDLShouldRetry() throws Exception {
.setTotalBytesProcessed(42L)
.setSchema(TABLE_SCHEMA.toPb());

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_DDLQUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture()))
.thenThrow(new BigQueryException(500, "InternalError"))
.thenThrow(new BigQueryException(502, "Bad Gateway"))
@@ -2397,13 +2408,13 @@ public void testFastQueryDDLShouldRetry() throws Exception {

List<QueryRequest> allRequests = requestPbCapture.getAllValues();
boolean idempotent = true;
String requestId = requestPb.getRequestId();
String firstRequestId = allRequests.get(0).getRequestId();
for (QueryRequest request : allRequests) {
idempotent = request.getRequestId().equals(requestId);
idempotent = request.getRequestId().equals(firstRequestId);
}
assertTrue(idempotent);

verify(bigqueryRpcMock, times(5)).queryRpc(PROJECT, requestPb);
verify(bigqueryRpcMock, times(5)).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

@Test
@@ -2424,10 +2435,7 @@ public void testFastQueryBigQueryException() throws InterruptedException {
.setPageToken(null)
.setErrors(errorProtoList);

QueryRequestInfo requestInfo = new QueryRequestInfo(QUERY_JOB_CONFIGURATION_FOR_QUERY);
QueryRequest requestPb = requestInfo.toPb();

when(bigqueryRpcMock.queryRpc(PROJECT, requestPb)).thenReturn(responsePb);
when(bigqueryRpcMock.queryRpc(eq(PROJECT), requestPbCapture.capture())).thenReturn(responsePb);

bigquery = options.getService();
try {
@@ -2436,7 +2444,14 @@ public void testFastQueryBigQueryException() throws InterruptedException {
} catch (BigQueryException ex) {
assertEquals(Lists.transform(errorProtoList, BigQueryError.FROM_PB_FUNCTION), ex.getErrors());
}
verify(bigqueryRpcMock).queryRpc(PROJECT, requestPb);

QueryRequest requestPb = requestPbCapture.getValue();
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.getQuery(), requestPb.getQuery());
assertEquals(
QUERY_JOB_CONFIGURATION_FOR_QUERY.getDefaultDataset().getDataset(),
requestPb.getDefaultDataset().getDatasetId());
assertEquals(QUERY_JOB_CONFIGURATION_FOR_QUERY.useQueryCache(), requestPb.getUseQueryCache());
verify(bigqueryRpcMock).queryRpc(eq(PROJECT), requestPbCapture.capture());
}

@Test
@@ -16,6 +16,7 @@

package com.google.cloud.bigquery;

import static org.assertj.core.api.Assertions.*;
import static org.junit.Assert.assertEquals;

import com.google.api.services.bigquery.model.QueryRequest;
@@ -31,7 +32,6 @@

public class QueryRequestInfoTest {

private static final String TEST_PROJECT_ID = "test-project-id";
private static final String QUERY = "BigQuery SQL";
private static final DatasetId DATASET_ID = DatasetId.of("dataset");
private static final TableId TABLE_ID = TableId.of("dataset", "table");
@@ -166,8 +166,7 @@ public void equalTo() {
}

private void compareQueryRequestInfo(QueryRequestInfo expected, QueryRequestInfo actual) {
assertEquals(expected, actual);
assertEquals(expected.hashCode(), actual.hashCode());
assertEquals(expected.toString(), actual.toString());
// requestId are expected to be different
assertThat(actual).isEqualToIgnoringGivenFields(expected, "requestId");
}
}
@@ -1776,6 +1776,15 @@ public void testFastQueryMultipleRuns() throws InterruptedException {
assertNull(result.getNextPageToken());
assertFalse(result.hasNextPage());

// running the same QueryJobConfiguration with the same query again
TableResult result1Duplicate = bigquery.query(config);
assertEquals(QUERY_RESULT_SCHEMA, result1Duplicate.getSchema());
assertEquals(2, result.getTotalRows());
assertNull(result1Duplicate.getNextPage());
assertNull(result1Duplicate.getNextPageToken());
assertFalse(result1Duplicate.hasNextPage());

// running a new QueryJobConfiguration with the same query
QueryJobConfiguration config2 =
QueryJobConfiguration.newBuilder(query).setDefaultDataset(DatasetId.of(DATASET)).build();
TableResult result2 = bigquery.query(config2);
@@ -121,6 +121,13 @@
<version>1.113.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<!-- use 2.9.1 for Java 7 projects -->
<version>2.9.1</version>
<scope>test</scope>
</dependency>
</dependencies>
</dependencyManagement>

0 comments on commit e2cd4f6

Please sign in to comment.