Skip to content
Permalink
Browse files
feat: sql fast path impl (#509)
* feat: sql fast path impl

add QueryJobConfig to QueryRequest logic

high level mode

reset private methods

refactor: modified code

update logic
add test

refactor: update code and test case

add integration tests

code format

add clir ignore and remove pom file

feat: add more assert

nit update

* add logic for DML and DDL queries
enable requestId
add integration tests for fast path multipages query, DML, and DDL queries

fix requestId logic

update QueryRequestInfo and add mock test

add mock test cases for SQL, DML, and DDL
clean up code

fix IT

add schema test

* update ITs to check table content correctness, update fastquery logic

nit

nit

* add test for bogus query

* add check for idempotent requestId

* update QueryRequestInfo and error handling logic

* add mock test for query JobException

* update mock test

* fix unit tests, nit update

* update exception handling from JobException to BigQueryException

* update based on comments

* nit

* update based on comments

* add maxResult support
optimization changes

* update code

* add test coverage

to address:
google-cloud-bigquery/src/main/java/com/google/cloud/bigquery/BigQueryException.java#L69-L71
Added lines #L69 - L71 were not covered by tests

* lint fix

* feat: add more code cov

* set method back

* feat: code cove

* add codecov

Co-authored-by: Praful Makani <praful@qlogic.io>
  • Loading branch information
stephaniewang526 and Praful Makani committed Sep 22, 2020
1 parent f2ecf15 commit 64a7d65ff97152c49194f507562266c1ba6f0f3b
@@ -32,4 +32,9 @@
<className>com/google/cloud/bigquery/spi/v2/BigQueryRpc</className>
<method>com.google.api.services.bigquery.model.TestIamPermissionsResponse testIamPermissions(java.lang.String, java.util.List, java.util.Map)</method>
</difference>
<difference>
<className>com/google/cloud/bigquery/spi/v2/BigQueryRpc</className>
<method>com.google.api.services.bigquery.model.QueryResponse queryRpc(java.lang.String, com.google.api.services.bigquery.model.QueryRequest)</method>
<differenceType>7012</differenceType>
</difference>
</differences>
@@ -84,7 +84,7 @@ public String getLocation() {
return location;
}

String getDebugInfo() {
public String getDebugInfo() {
return debugInfo;
}

@@ -21,6 +21,8 @@
import com.google.cloud.http.BaseHttpServiceException;
import com.google.common.collect.ImmutableSet;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ExecutionException;
@@ -39,37 +41,52 @@ public final class BigQueryException extends BaseHttpServiceException {
new Error(500, null), new Error(502, null), new Error(503, null), new Error(504, null));
private static final long serialVersionUID = -5006625989225438209L;

private final BigQueryError error;
private final List<BigQueryError> errors;

public BigQueryException(int code, String message) {
this(code, message, (Throwable) null);
}

public BigQueryException(int code, String message, Throwable cause) {
super(code, message, null, true, RETRYABLE_ERRORS, cause);
this.error = null;
this.errors = null;
}

public BigQueryException(int code, String message, BigQueryError error) {
super(code, message, error != null ? error.getReason() : null, true, RETRYABLE_ERRORS);
this.error = error;
this.errors = Arrays.asList(error);
}

public BigQueryException(List<BigQueryError> errors) {
super(0, null, null, false, RETRYABLE_ERRORS, null);
this.errors = errors;
}

public BigQueryException(IOException exception) {
super(exception, true, RETRYABLE_ERRORS);
BigQueryError error = null;
List<BigQueryError> errors = null;
if (getReason() != null) {
error = new BigQueryError(getReason(), getLocation(), getMessage(), getDebugInfo());
errors =
Arrays.asList(
new BigQueryError(getReason(), getLocation(), getMessage(), getDebugInfo()));
}
this.error = error;
this.errors = errors;
}

/**
* Returns the {@link BigQueryError} that caused this exception. Returns {@code null} if none
* exists.
*/
public BigQueryError getError() {
return error;
return errors == null || errors.isEmpty() || errors.size() == 0 ? null : errors.get(0);
}

/**
* Returns a list of {@link BigQueryError}s that caused this exception. Returns {@code null} if
* none exists.
*/
public List<BigQueryError> getErrors() {
return errors;
}

@Override
@@ -81,12 +98,12 @@ public boolean equals(Object obj) {
return false;
}
BigQueryException other = (BigQueryException) obj;
return super.equals(other) && Objects.equals(error, other.error);
return super.equals(other) && Objects.equals(errors, other.errors);
}

@Override
public int hashCode() {
return Objects.hash(super.hashCode(), error);
return Objects.hash(super.hashCode(), errors);
}

/**
@@ -26,6 +26,7 @@
import com.google.api.gax.paging.Page;
import com.google.api.services.bigquery.model.ErrorProto;
import com.google.api.services.bigquery.model.GetQueryResultsResponse;
import com.google.api.services.bigquery.model.QueryRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest;
import com.google.api.services.bigquery.model.TableDataInsertAllRequest.Rows;
import com.google.api.services.bigquery.model.TableDataInsertAllResponse;
@@ -48,6 +49,7 @@
import com.google.common.collect.FluentIterable;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import java.util.ArrayList;
import java.util.List;
@@ -198,6 +200,43 @@ public Page<FieldValueList> getNextPage() {
}
}

private class QueryPageFetcher extends Thread implements NextPageFetcher<FieldValueList> {

private static final long serialVersionUID = -8501991114794410114L;
private final Map<BigQueryRpc.Option, ?> requestOptions;
private final BigQueryOptions serviceOptions;
private Job job;
private final TableId table;
private final Schema schema;

QueryPageFetcher(
JobId jobId,
Schema schema,
BigQueryOptions serviceOptions,
String cursor,
Map<BigQueryRpc.Option, ?> optionMap) {
this.requestOptions =
PageImpl.nextRequestOptions(BigQueryRpc.Option.PAGE_TOKEN, cursor, optionMap);
this.serviceOptions = serviceOptions;
this.job = getJob(jobId);
this.table = ((QueryJobConfiguration) job.getConfiguration()).getDestinationTable();
this.schema = schema;
}

@Override
public Page<FieldValueList> getNextPage() {
while (!JobStatus.State.DONE.equals(job.getStatus().getState())) {
try {
sleep(5000);
} catch (InterruptedException ex) {
throw new RuntimeException(ex.getMessage());
}
job = job.reload();
}
return listTableData(table, schema, serviceOptions, requestOptions).x();
}
}

private final BigQueryRpc bigQueryRpc;

BigQueryImpl(BigQueryOptions options) {
@@ -1184,9 +1223,79 @@ public Boolean call() {
public TableResult query(QueryJobConfiguration configuration, JobOption... options)
throws InterruptedException, JobException {
Job.checkNotDryRun(configuration, "query");

// If all parameters passed in configuration are supported by the query() method on the backend,
// put on fast path
QueryRequestInfo requestInfo = new QueryRequestInfo(configuration);
if (requestInfo.isFastQuerySupported()) {
String projectId = getOptions().getProjectId();
QueryRequest content = requestInfo.toPb();
return queryRpc(projectId, content, options);
}
// Otherwise, fall back to the existing create query job logic
return create(JobInfo.of(configuration), options).getQueryResults();
}

private TableResult queryRpc(
final String projectId, final QueryRequest content, JobOption... options) {
com.google.api.services.bigquery.model.QueryResponse results;
try {
results =
runWithRetries(
new Callable<com.google.api.services.bigquery.model.QueryResponse>() {
@Override
public com.google.api.services.bigquery.model.QueryResponse call() {
return bigQueryRpc.queryRpc(projectId, content);
}
},
getOptions().getRetrySettings(),
EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
throw BigQueryException.translateAndThrow(e);
}

if (results.getErrors() != null) {
List<BigQueryError> bigQueryErrors =
Lists.transform(results.getErrors(), BigQueryError.FROM_PB_FUNCTION);
// Throwing BigQueryException since there may be no JobId and we want to stay consistent
// with the case where there there is a HTTP error
throw new BigQueryException(bigQueryErrors);
}

Schema schema = results.getSchema() == null ? null : Schema.fromPb(results.getSchema());
Long numRows;
if (results.getNumDmlAffectedRows() == null && results.getTotalRows() == null) {
numRows = 0L;
} else if (results.getNumDmlAffectedRows() != null) {
numRows = results.getNumDmlAffectedRows();
} else {
numRows = results.getTotalRows().longValue();
}

if (results.getPageToken() != null) {
JobId jobId = JobId.fromPb(results.getJobReference());
String cursor = results.getPageToken();
return new TableResult(
schema,
numRows,
new PageImpl<>(
// fetch next pages of results
new QueryPageFetcher(jobId, schema, getOptions(), cursor, optionMap(options)),
cursor,
// cache first page of result
transformTableData(results.getRows(), schema)));
}
// only 1 page of result
return new TableResult(
schema,
numRows,
new PageImpl<>(
new TableDataPageFetcher(null, schema, getOptions(), null, optionMap(options)),
null,
transformTableData(results.getRows(), schema)));
}

@Override
public TableResult query(QueryJobConfiguration configuration, JobId jobId, JobOption... options)
throws InterruptedException, JobException {
@@ -69,6 +69,8 @@ public final class QueryJobConfiguration extends JobConfiguration {
private final Map<String, String> labels;
private final RangePartitioning rangePartitioning;
private final List<ConnectionProperty> connectionProperties;
// maxResults is only used for fast query path
private final Long maxResults;

/**
* Priority levels for a query. If not specified the priority is assumed to be {@link
@@ -118,6 +120,7 @@ public enum Priority {
private Map<String, String> labels;
private RangePartitioning rangePartitioning;
private List<ConnectionProperty> connectionProperties;
private Long maxResults;

private Builder() {
super(Type.QUERY);
@@ -150,6 +153,7 @@ private Builder(QueryJobConfiguration jobConfiguration) {
this.labels = jobConfiguration.labels;
this.rangePartitioning = jobConfiguration.rangePartitioning;
this.connectionProperties = jobConfiguration.connectionProperties;
this.maxResults = jobConfiguration.maxResults;
}

private Builder(com.google.api.services.bigquery.model.JobConfiguration configurationPb) {
@@ -603,6 +607,20 @@ public Builder setConnectionProperties(List<ConnectionProperty> connectionProper
return this;
}

/**
* This is only supported in the fast query path [Optional] The maximum number of rows of data
* to return per page of results. Setting this flag to a small value such as 1000 and then
* paging through results might improve reliability when the query result set is large. In
* addition to this limit, responses are also limited to 10 MB. By default, there is no maximum
* row count, and only the byte limit applies.
*
* @param maxResults maxResults or {@code null} for none
*/
public Builder setMaxResults(Long maxResults) {
this.maxResults = maxResults;
return this;
}

public QueryJobConfiguration build() {
return new QueryJobConfiguration(this);
}
@@ -644,6 +662,7 @@ private QueryJobConfiguration(Builder builder) {
this.labels = builder.labels;
this.rangePartitioning = builder.rangePartitioning;
this.connectionProperties = builder.connectionProperties;
this.maxResults = builder.maxResults;
}

/**
@@ -833,6 +852,19 @@ public List<ConnectionProperty> getConnectionProperties() {
return connectionProperties;
}

/**
* This is only supported in the fast query path [Optional] The maximum number of rows of data to
* return per page of results. Setting this flag to a small value such as 1000 and then paging
* through results might improve reliability when the query result set is large. In addition to
* this limit, responses are also limited to 10 MB. By default, there is no maximum row count, and
* only the byte limit applies.
*
* @return value or {@code null} for none
*/
public Long getMaxResults() {
return maxResults;
}

@Override
public Builder toBuilder() {
return new Builder(this);
@@ -851,7 +883,7 @@ ToStringHelper toStringHelper() {
.add("flattenResults", flattenResults)
.add("priority", priority)
.add("tableDefinitions", tableDefinitions)
.add("userQueryCache", useQueryCache)
.add("useQueryCache", useQueryCache)
.add("userDefinedFunctions", userDefinedFunctions)
.add("createDisposition", createDisposition)
.add("writeDisposition", writeDisposition)

0 comments on commit 64a7d65

Please sign in to comment.