Skip to content
This repository has been archived by the owner on Nov 11, 2022. It is now read-only.

Commit

Permalink
Allow unflattened results from a BigQuery query-based export
Browse files Browse the repository at this point in the history
----Release Notes----
Add an option to not flatten results from a BigQuery query-based export

[]
-------------
Created by MOE: https://github.com/google/moe
MOE_MIGRATED_REVID=113271738
  • Loading branch information
sammcveety authored and davorbonaci committed Feb 2, 2016
1 parent e006292 commit b08ec20
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 27 deletions.
48 changes: 42 additions & 6 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;
import com.google.cloud.hadoop.util.ApiErrorExtractor;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand All @@ -80,6 +81,7 @@
import java.util.concurrent.ThreadLocalRandom;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import javax.annotation.Nullable;

/**
* {@link PTransform}s for reading and writing
Expand Down Expand Up @@ -339,20 +341,24 @@ public static class Bound extends PTransform<PInput, PCollection<TableRow>> {
TableReference table;
final String query;
final boolean validate;
@Nullable
Boolean flattenResults;

private static final String QUERY_VALIDATION_FAILURE_ERROR =
"Validation of query \"%1$s\" failed. If the query depends on an earlier stage of the"
+ " pipeline, This validation can be disabled using #withoutValidation.";

private Bound() {
this(null, null, null, true);
this(null, null, null, true, null);
}

private Bound(String name, String query, TableReference reference, boolean validate) {
private Bound(String name, String query, TableReference reference, boolean validate,
Boolean flattenResults) {
super(name);
this.table = reference;
this.query = query;
this.validate = validate;
this.flattenResults = flattenResults;
}

/**
Expand All @@ -361,7 +367,7 @@ private Bound(String name, String query, TableReference reference, boolean valid
* <p>Does not modify this object.
*/
public Bound named(String name) {
return new Bound(name, query, table, validate);
return new Bound(name, query, table, validate, flattenResults);
}

/**
Expand All @@ -380,23 +386,40 @@ public Bound from(String tableSpec) {
* <p>Does not modify this object.
*/
public Bound from(TableReference table) {
return new Bound(name, query, table, validate);
return new Bound(name, query, table, validate, flattenResults);
}

/**
* Returns a copy of this transform that reads the results of the specified query.
*
* <p>Does not modify this object.
*
* <p>By default, the query results will be flattened -- see
* "flattenResults" in the <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
* Jobs documentation</a> for more information. To disable flattening, use
* {@link BigQueryIO.Read.Bound#withoutResultFlattening}.
*/
public Bound fromQuery(String query) {
return new Bound(name, query, table, validate);
return new Bound(name, query, table, validate,
MoreObjects.firstNonNull(flattenResults, Boolean.TRUE));
}

/**
* Disable table validation.
*/
public Bound withoutValidation() {
return new Bound(name, query, table, false);
return new Bound(name, query, table, false, flattenResults);
}

/**
* Disable <a href="https://cloud.google.com/bigquery/docs/reference/v2/jobs">
* flattening of query results</a>.
*
* <p>Only valid when a query is used ({@link #fromQuery}). Setting this option when reading
* from a table will cause an error during validation.
*/
public Bound withoutResultFlattening() {
return new Bound(name, query, table, validate, false);
}

/**
Expand All @@ -410,6 +433,12 @@ public void validate(PInput input) {
} else if (table != null && query != null) {
throw new IllegalStateException("Invalid BigQuery read operation. Specifies both a"
+ " query and a table, only one of these should be provided");
} else if (table != null && flattenResults != null) {
throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
+ " table with a result flattening preference, which is not configurable");
} else if (query != null && flattenResults == null) {
throw new IllegalStateException("Invalid BigQuery read operation. Specifies a"
+ " query without a result flattening preference");
}

BigQueryOptions bqOptions = input.getPipeline().getOptions().as(BigQueryOptions.class);
Expand Down Expand Up @@ -500,6 +529,13 @@ public String getQuery() {
public boolean getValidate() {
return validate;
}

/**
* Returns true/false if result flattening is enabled/disabled, or null if not applicable.
*/
public Boolean getFlattenResults() {
return flattenResults;
}
}

/** Disallow construction of utility class. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ public void translate(

if (transform.getQuery() != null) {
context.addInput(PropertyNames.BIGQUERY_QUERY, transform.getQuery());
context.addInput(PropertyNames.BIGQUERY_FLATTEN_RESULTS, transform.getFlattenResults());
} else {
TableReference table = transform.getTable();
if (table.getProjectId() == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,13 +44,15 @@ public class BigQueryReader extends NativeReader<WindowedValue<TableRow>> {
@Nullable private final TableReference tableRef;
@Nullable private final String query;
@Nullable private final String projectId;
@Nullable private final Boolean flattenResults;
private final Bigquery bigQueryClient;

private BigQueryReader(TableReference tableRef, String query, String projectId,
Bigquery bigQueryClient) {
Bigquery bigQueryClient, Boolean flattenResults) {
this.tableRef = tableRef;
this.query = query;
this.projectId = projectId;
this.flattenResults = flattenResults;
this.bigQueryClient = checkNotNull(bigQueryClient, "bigQueryClient");
}

Expand All @@ -59,7 +61,7 @@ private BigQueryReader(TableReference tableRef, String query, String projectId,
* table.
*/
public static BigQueryReader fromTable(TableReference tableRef, Bigquery bigQueryClient) {
return new BigQueryReader(tableRef, null, null, bigQueryClient);
return new BigQueryReader(tableRef, null, null, bigQueryClient, null);
}

/**
Expand All @@ -69,15 +71,15 @@ public static BigQueryReader fromTable(TableReference tableRef, Bigquery bigQuer
public static BigQueryReader fromTableWithOptions(
TableReference tableRef, BigQueryOptions bigQueryOptions) {
Bigquery client = Transport.newBigQueryClient(bigQueryOptions).build();
return new BigQueryReader(tableRef, null, null, client);
return new BigQueryReader(tableRef, null, null, client, null);
}

/**
* Returns a {@code BigQueryReader} that uses the specified client to read the results from
* executing the specified query in the specified project.
*/
public static BigQueryReader fromQuery(String query, String projectId, Bigquery bigQueryClient) {
return new BigQueryReader(null, query, projectId, bigQueryClient);
return new BigQueryReader(null, query, projectId, bigQueryClient, true);
}

/**
Expand All @@ -86,9 +88,10 @@ public static BigQueryReader fromQuery(String query, String projectId, Bigquery
* specified options.
*/
public static BigQueryReader fromQueryWithOptions(
String query, String projectId, BigQueryOptions bigQueryOptions) {
String query, String projectId, BigQueryOptions bigQueryOptions,
@Nullable Boolean flattenResults) {
Bigquery client = Transport.newBigQueryClient(bigQueryOptions).build();
return new BigQueryReader(null, query, projectId, client);
return new BigQueryReader(null, query, projectId, client, flattenResults);
}

public TableReference getTableRef() {
Expand All @@ -104,7 +107,7 @@ public BigQueryReaderIterator iterator() throws IOException {
if (tableRef != null) {
return new BigQueryReaderIterator(tableRef, bigQueryClient);
} else {
return new BigQueryReaderIterator(query, projectId, bigQueryClient);
return new BigQueryReaderIterator(query, projectId, bigQueryClient, flattenResults);
}
}

Expand All @@ -119,8 +122,10 @@ public BigQueryReaderIterator(TableReference tableRef, Bigquery bigQueryClient)
rowIterator = BigQueryTableRowIterator.fromTable(tableRef, bigQueryClient);
}

public BigQueryReaderIterator(String query, String projectId, Bigquery bigQueryClient) {
rowIterator = BigQueryTableRowIterator.fromQuery(query, projectId, bigQueryClient);
public BigQueryReaderIterator(String query, String projectId, Bigquery bigQueryClient,
@Nullable Boolean flattenResults) {
rowIterator = BigQueryTableRowIterator.fromQuery(query, projectId, bigQueryClient,
flattenResults);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.dataflow.sdk.runners.worker;

import static com.google.cloud.dataflow.sdk.util.Structs.getBoolean;
import static com.google.cloud.dataflow.sdk.util.Structs.getString;

import com.google.api.services.bigquery.model.TableReference;
Expand Down Expand Up @@ -54,10 +55,11 @@ public BigQueryReader createTyped(
PipelineOptions options,
ExecutionContext executionContext) throws Exception {
String query = getString(spec, PropertyNames.BIGQUERY_QUERY, null);
Boolean flatten = getBoolean(spec, PropertyNames.BIGQUERY_FLATTEN_RESULTS, true);
if (query != null) {
GcpOptions gcpOptions = options.as(GcpOptions.class);
return BigQueryReader.fromQueryWithOptions(
query, gcpOptions.getProject(), options.as(BigQueryOptions.class));
query, gcpOptions.getProject(), options.as(BigQueryOptions.class), flatten);
}

String tableId = getString(spec, PropertyNames.BIGQUERY_TABLE, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;

import org.joda.time.Duration;
Expand Down Expand Up @@ -89,18 +90,21 @@ public class BigQueryTableRowIterator implements Iterator<TableRow>, Closeable {
private static final Duration QUERY_COMPLETION_POLL_TIME = Duration.standardSeconds(1);

private final String query;
// Whether to flatten query results.
private final boolean flattenResults;
// Temporary dataset used to store query results.
private String temporaryDatasetId = null;
// Temporary table used to store query results.
private String temporaryTableId = null;

private BigQueryTableRowIterator(
@Nullable TableReference ref, @Nullable String query, @Nullable String projectId,
Bigquery client) {
Bigquery client, boolean flattenResults) {
this.ref = ref;
this.query = query;
this.projectId = projectId;
this.client = checkNotNull(client, "client");
this.flattenResults = flattenResults;
}

/**
Expand All @@ -109,19 +113,20 @@ private BigQueryTableRowIterator(
public static BigQueryTableRowIterator fromTable(TableReference ref, Bigquery client) {
checkNotNull(ref, "ref");
checkNotNull(client, "client");
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client);
return new BigQueryTableRowIterator(ref, null, ref.getProjectId(), client, true);
}

/**
* Constructs a {@code BigQueryTableRowIterator} that reads from the results of executing the
* specified query in the specified project.
*/
public static BigQueryTableRowIterator fromQuery(
String query, String projectId, Bigquery client) {
String query, String projectId, Bigquery client, @Nullable Boolean flattenResults) {
checkNotNull(query, "query");
checkNotNull(projectId, "projectId");
checkNotNull(client, "client");
return new BigQueryTableRowIterator(null, query, projectId, client);
return new BigQueryTableRowIterator(null, query, projectId, client,
MoreObjects.firstNonNull(flattenResults, Boolean.TRUE));
}

@Override
Expand Down Expand Up @@ -376,6 +381,7 @@ private TableReference executeQueryAndWaitForCompletion()
job.setConfiguration(config);
queryConfig.setQuery(query);
queryConfig.setAllowLargeResults(true);
queryConfig.setFlattenResults(flattenResults);

TableReference destinationTable = new TableReference();
destinationTable.setProjectId(projectId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class PropertyNames {
public static final String BIGQUERY_SCHEMA = "schema";
public static final String BIGQUERY_TABLE = "table";
public static final String BIGQUERY_QUERY = "bigquery_query";
public static final String BIGQUERY_FLATTEN_RESULTS = "bigquery_flatten_results";
public static final String BIGQUERY_WRITE_DISPOSITION = "write_disposition";
public static final String CO_GBK_RESULT_SCHEMA = "co_gbk_result_schema";
public static final String COMBINE_FN = "combine_fn";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,8 @@ public void testValidateReadSetsDefaultProject() {
public void testBuildSourceWithoutTableOrQuery() {
Pipeline p = TestPipeline.create();
thrown.expect(IllegalStateException.class);
thrown.expectMessage(Matchers.containsString(
"Invalid BigQuery read operation, either table reference or query has to be set"));
thrown.expectMessage(
"Invalid BigQuery read operation, either table reference or query has to be set");
p.apply(BigQueryIO.Read.named("ReadMyTable"));
p.run();
}
Expand All @@ -196,16 +196,31 @@ public void testBuildSourceWithoutTableOrQuery() {
public void testBuildSourceWithTableAndQuery() {
Pipeline p = TestPipeline.create();
thrown.expect(IllegalStateException.class);
thrown.expectMessage(Matchers.containsString(
thrown.expectMessage(
"Invalid BigQuery read operation. Specifies both a query and a table, only one of these"
+ " should be provided"));
+ " should be provided");
p.apply(
BigQueryIO.Read.named("ReadMyTable")
.from("foo.com:project:somedataset.sometable")
.fromQuery("query"));
p.run();
}

@Test
@Category(RunnableOnService.class)
public void testBuildSourceWithTableAndFlatten() {
Pipeline p = TestPipeline.create();
thrown.expect(IllegalStateException.class);
thrown.expectMessage(
"Invalid BigQuery read operation. Specifies a"
+ " table with a result flattening preference, which is not configurable");
p.apply(
BigQueryIO.Read.named("ReadMyTable")
.from("foo.com:project:somedataset.sometable")
.withoutResultFlattening());
p.run();
}

@Test
public void testBuildSink() {
BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
Expand Down Expand Up @@ -253,7 +268,7 @@ public void testBuildSinkWithTableReference() {
public void testBuildSinkWithoutTable() {
Pipeline p = TestPipeline.create();
thrown.expect(IllegalStateException.class);
thrown.expectMessage(Matchers.containsString("must set the table reference"));
thrown.expectMessage("must set the table reference");
p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
.apply(BigQueryIO.Write.named("WriteMyTable"));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ public void testReadFromQuery() throws IOException {
// Run query and verify
String query = "SELECT name, count from table";
try (BigQueryTableRowIterator iterator =
BigQueryTableRowIterator.fromQuery(query, "project", mockClient)) {
BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) {
assertTrue(iterator.hasNext());
TableRow row = iterator.next();

Expand Down Expand Up @@ -226,7 +226,7 @@ public void testQueryFailed() throws IOException {

String query = "NOT A QUERY";
try (BigQueryTableRowIterator iterator =
BigQueryTableRowIterator.fromQuery(query, "project", mockClient)) {
BigQueryTableRowIterator.fromQuery(query, "project", mockClient, null)) {
try {
iterator.hasNext();
fail();
Expand Down

0 comments on commit b08ec20

Please sign in to comment.