108 changes: 80 additions & 28 deletions sdk/src/main/java/com/google/cloud/dataflow/sdk/io/BigQueryIO.java
@@ -22,6 +22,7 @@
import com.google.api.services.bigquery.model.TableRow;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.dataflow.sdk.coders.Coder;
+ import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.coders.VoidCoder;
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
@@ -50,12 +51,12 @@
import com.google.cloud.dataflow.sdk.values.PCollection.IsBounded;
import com.google.cloud.dataflow.sdk.values.PDone;
import com.google.cloud.dataflow.sdk.values.PInput;

import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
+ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
@@ -217,102 +218,146 @@ public static String toTableSpec(TableReference ref) {
* </code></pre>
*/
public static class Read {
- public static Bound named(String name) {
- return new Bound().named(name);
+ public static Bound<TableRow> named(String name) {
+ return new Bound<>(TableRow.class).named(name);
}

/**
* Reads a BigQuery table specified as
* "[project_id]:[dataset_id].[table_id]" or "[dataset_id].[table_id]" for
* tables within the current project.
*/
- public static Bound from(String tableSpec) {
- return new Bound().from(tableSpec);
+ public static Bound<TableRow> from(String tableSpec) {
+ return new Bound<>(TableRow.class).from(tableSpec);
}

/**
* Reads a BigQuery table specified as a TableReference object.
*/
- public static Bound from(TableReference table) {
- return new Bound().from(table);
+ public static Bound<TableRow> from(TableReference table) {
+ return new Bound<>(TableRow.class).from(table);
}

/**
* Disables BigQuery table validation, which is enabled by default.
*/
- public static Bound withoutValidation() {
- return new Bound().withoutValidation();
+ public static Bound<TableRow> withoutValidation() {
+ return new Bound<>(TableRow.class).withoutValidation();
}

+ /**
+  * Reads a BigQuery table into objects of the given type instead of
+  * {@link TableRow TableRows}. The type must implement {@link Serializable}.
+  *
+  * @param type the class of the elements to read
+  * @param <T> the element type
+  * @return a typed read transform
+  */
+ public static <T> Bound<T> withType(Class<T> type) {
+ return new Bound<>(type);
+ }
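// A minimal usage sketch for the typed read; WeatherRecord is a hypothetical
// Serializable model class, not part of this SDK:
//   PCollection<WeatherRecord> records = pipeline.apply(
//       BigQueryIO.Read.withType(WeatherRecord.class)
//           .from("my-project:samples.weather_stations"));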

/**
 * A {@link PTransform} that reads from a BigQuery table and returns a bounded
 * {@link PCollection} of rows, either as {@link TableRow TableRows} or as
 * instances of a user-supplied {@link Serializable} type.
 */
- public static class Bound extends PTransform<PInput, PCollection<TableRow>> {
+ public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
private static final long serialVersionUID = 0;

TableReference table;
final boolean validate;
+ /** The class of the elements produced by this read. */
+ final Class<T> type;

- Bound() {
+ Bound(Class<T> type) {
+ this.type = type;
this.validate = true;
}

- Bound(String name, TableReference reference, boolean validate) {
+ Bound(String name, TableReference reference, Class<T> type, boolean validate) {
super(name);
this.table = reference;
+ this.type = type;
this.validate = validate;
}

/**
* Sets the name associated with this transformation.
*/
- public Bound named(String name) {
- return new Bound(name, table, validate);
+ public Bound<T> named(String name) {
+ return new Bound<>(name, table, type, validate);
}

/**
* Sets the table specification.
* <p>
* Refer to {@link #parseTableSpec(String)} for the specification format.
*/
- public Bound from(String tableSpec) {
+ public Bound<T> from(String tableSpec) {
return from(parseTableSpec(tableSpec));
}

/**
* Sets the table specification.
*/
- public Bound from(TableReference table) {
- return new Bound(name, table, validate);
+ public Bound<T> from(TableReference table) {
+ return new Bound<>(name, table, type, validate);
}

/**
* Disable table validation.
*/
- public Bound withoutValidation() {
- return new Bound(name, table, false);
+ public Bound<T> withoutValidation() {
+ return new Bound<>(name, table, type, false);
}

@Override
- public PCollection<TableRow> apply(PInput input) {
+ public PCollection<T> apply(PInput input) {
if (table == null) {
throw new IllegalStateException(
"must set the table reference of a BigQueryIO.Read transform");
}
- return PCollection.<TableRow>createPrimitiveOutputInternal(
+ if (TableRow.class.equals(type)) {
+ return (PCollection<T>) PCollection.<TableRow>createPrimitiveOutputInternal(
input.getPipeline(),
WindowingStrategy.globalDefault(),
IsBounded.BOUNDED)
// Force the output's Coder to be what the read is using, and
// unchangeable later, to ensure that we read the input in the
// format specified by the Read transform.
.setCoder(TableRowJsonCoder.of());
+ }
+ PCollection<T> pCollection = PCollection.<T>createPrimitiveOutputInternal(
+ input.getPipeline(),
+ WindowingStrategy.globalDefault(),
+ IsBounded.BOUNDED);

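// NOTE: T carries no Serializable bound, so the coder below is built reflectively
// from the class name; the isAssignableFrom check guards the unchecked cast.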
+ if (Serializable.class.isAssignableFrom(type)) {
+ try {
+ pCollection.setCoder((Coder<T>) SerializableCoder.of(type.getName()));
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(
+ "unable to load class " + type.getName() + " for typed BigQueryIO.Read",
+ e);
+ }
+ } else {
+ throw new IllegalStateException(
+ "type must be serializable");
+ }
+ return pCollection;
}

@Override
- protected Coder<TableRow> getDefaultOutputCoder() {
- return TableRowJsonCoder.of();
+ protected Coder<T> getDefaultOutputCoder() {
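// Mirrors the coder choice made in apply(): the JSON coder for plain TableRow
// reads, a SerializableCoder built from the class name for any other type.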
+ if (TableRow.class.equals(type)) {
+ return (Coder<T>) TableRowJsonCoder.of();
+ }
+ try {
+ return (Coder<T>) SerializableCoder.of(type.getName());
+ } catch (ClassNotFoundException e) {
+ throw new IllegalStateException(
+ "unable to load class " + type.getName() + " for typed BigQueryIO.Read",
+ e);
+ }
}

@Override
@@ -344,6 +389,13 @@ public TableReference getTable() {
public boolean getValidate() {
return validate;
}

+ /**
+  * Returns the type of the model objects produced by this read.
+  */
+ public Class<T> getType() {
+ return type;
+ }
}
}

@@ -862,8 +914,8 @@ public PDone apply(PCollection<TableRow> input) {
* <p>
* This loads the entire table into an in-memory PCollection.
*/
- private static void evaluateReadHelper(
- Read.Bound transform, DirectPipelineRunner.EvaluationContext context) {
+ private static <T> void evaluateReadHelper(
+ Read.Bound<T> transform, DirectPipelineRunner.EvaluationContext context) {
BigQueryOptions options = context.getPipelineOptions();
Bigquery client = Transport.newBigQueryClient(options).build();
TableReference ref = transform.table;
@@ -872,10 +924,10 @@ private static void evaluateReadHelper(
}

LOG.info("Reading from BigQuery table {}", toTableSpec(ref));
- List<WindowedValue<TableRow>> elems =
- ReaderUtils.readElemsFromReader(new BigQueryReader(client, ref));
+ List<WindowedValue<T>> elems = ReaderUtils.readElemsFromReader(
+ new BigQueryReader<>(client, ref, transform.getType()));
LOG.info("Number of records read from BigQuery: {}", elems.size());
- context.setPCollectionWindowedValue(context.getOutput(transform), elems);
+ context.setPCollectionWindowedValue((PCollection) context.getOutput(transform), elems);
}

/**
sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/dataflow/BigQueryIOTranslator.java
@@ -40,11 +40,11 @@ public class BigQueryIOTranslator {
/**
* Implements BigQueryIO Read translation for the Dataflow backend.
*/
- public static class ReadTranslator
- implements DataflowPipelineTranslator.TransformTranslator<BigQueryIO.Read.Bound> {
+ public static class ReadTranslator<T>
+ implements DataflowPipelineTranslator.TransformTranslator<BigQueryIO.Read.Bound<T>> {

@Override
- public void translate(BigQueryIO.Read.Bound transform,
+ public void translate(BigQueryIO.Read.Bound<T> transform,
DataflowPipelineTranslator.TranslationContext context) {
TableReference table = transform.getTable();
if (table.getProjectId() == null) {
sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/BigQueryReader.java
@@ -20,7 +20,9 @@
import com.google.api.services.bigquery.model.TableReference;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
+ import com.google.cloud.dataflow.sdk.util.AbstractBigQueryIterator;
import com.google.cloud.dataflow.sdk.util.BigQueryTableRowIterator;
+ import com.google.cloud.dataflow.sdk.util.BigQueryTypedIterator;
import com.google.cloud.dataflow.sdk.util.Transport;
import com.google.cloud.dataflow.sdk.util.WindowedValue;
import com.google.cloud.dataflow.sdk.util.common.worker.Reader;
@@ -35,44 +37,56 @@
* query for all rows of a table and then iterates over the result. There is no support for
* progress reporting because the source is used only in situations where the entire table must be
* read by each worker (i.e. the source is used as a side input).
+ *
+ * @param <T> the type of the elements read from the source
*/
- public class BigQueryReader extends Reader<WindowedValue<TableRow>> {
+ public class BigQueryReader<T> extends Reader<WindowedValue<T>> {
final TableReference tableRef;
final BigQueryOptions bigQueryOptions;
final Bigquery bigQueryClient;
+ final Class<T> type;

/** Builds a BigQuery source using pipeline options to instantiate a Bigquery client. */
- public BigQueryReader(BigQueryOptions bigQueryOptions, TableReference tableRef) {
+ public BigQueryReader(BigQueryOptions bigQueryOptions, TableReference tableRef, Class<T> type) {
// Save pipeline options so that we can construct the BigQuery client on-demand whenever an
// iterator gets created.
this.bigQueryOptions = bigQueryOptions;
this.tableRef = tableRef;
this.bigQueryClient = null;
+ this.type = type;
}

/** Builds a BigQueryReader directly using a BigQuery client. */
- public BigQueryReader(Bigquery bigQueryClient, TableReference tableRef) {
+ public BigQueryReader(Bigquery bigQueryClient, TableReference tableRef, Class<T> type) {
this.bigQueryOptions = null;
this.tableRef = tableRef;
this.bigQueryClient = bigQueryClient;
+ this.type = type;
}

@Override
- public ReaderIterator<WindowedValue<TableRow>> iterator() throws IOException {
+ public ReaderIterator<WindowedValue<T>> iterator() throws IOException {
return new BigQueryReaderIterator(
bigQueryClient != null
? bigQueryClient : Transport.newBigQueryClient(bigQueryOptions).build(),
- tableRef);
+ tableRef,
+ type);
}

/**
* A ReaderIterator that yields an element of type {@code T} for each row of a BigQuery table.
*/
- class BigQueryReaderIterator extends AbstractReaderIterator<WindowedValue<TableRow>> {
- private BigQueryTableRowIterator rowIterator;
+ class BigQueryReaderIterator extends AbstractReaderIterator<WindowedValue<T>> {
+ private AbstractBigQueryIterator<T> rowIterator;

- public BigQueryReaderIterator(Bigquery bigQueryClient, TableReference tableRef) {
- rowIterator = new BigQueryTableRowIterator(bigQueryClient, tableRef);
+ public BigQueryReaderIterator(Bigquery bigQueryClient, TableReference tableRef, Class<T> type) {
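// Dispatch on the element type: TableRow reads use the existing JSON-backed row
// iterator; any other type goes through the new typed iterator, which maps each
// row onto an instance of T.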
+ if (type == TableRow.class) {
+ rowIterator = (AbstractBigQueryIterator<T>) new BigQueryTableRowIterator(
+ bigQueryClient,
+ tableRef);
+ } else {
+ rowIterator = new BigQueryTypedIterator(bigQueryClient, tableRef, type);
+ }
}

@Override
@@ -81,7 +95,7 @@ public boolean hasNext() {
}

@Override
- public WindowedValue<TableRow> next() throws IOException {
+ public WindowedValue<T> next() throws IOException {
if (!hasNext()) {
throw new NoSuchElementException();
}
sdk/src/main/java/com/google/cloud/dataflow/sdk/runners/worker/BigQueryReaderFactory.java
@@ -19,7 +19,10 @@
import static com.google.cloud.dataflow.sdk.util.Structs.getString;

import com.google.api.services.bigquery.model.TableReference;
+ import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.dataflow.sdk.coders.Coder;
+ import com.google.cloud.dataflow.sdk.coders.SerializableCoder;
+ import com.google.cloud.dataflow.sdk.coders.TableRowJsonCoder;
import com.google.cloud.dataflow.sdk.options.BigQueryOptions;
import com.google.cloud.dataflow.sdk.options.PipelineOptions;
import com.google.cloud.dataflow.sdk.util.CloudObject;
@@ -35,11 +38,27 @@ private BigQueryReaderFactory() {}

public static BigQueryReader create(PipelineOptions options, CloudObject spec, Coder<?> coder,
ExecutionContext executionContext) throws Exception {
- return new BigQueryReader(

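// The coder recorded in the cloud spec determines the element type: TableRowJsonCoder
// selects a plain TableRow read, while a SerializableCoder carries the user's model class.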
+ if (coder instanceof TableRowJsonCoder) {
+ return new BigQueryReader(
options.as(BigQueryOptions.class),
new TableReference()
.setProjectId(getString(spec, PropertyNames.BIGQUERY_PROJECT))
.setDatasetId(getString(spec, PropertyNames.BIGQUERY_DATASET))
+ .setTableId(getString(spec, PropertyNames.BIGQUERY_TABLE)),
+ TableRow.class);
+ } else {
+ if (coder instanceof SerializableCoder) {
+ SerializableCoder serializableCoder = (SerializableCoder) coder;
+ return new BigQueryReader(
+ options.as(BigQueryOptions.class),
+ new TableReference()
+ .setProjectId(getString(spec, PropertyNames.BIGQUERY_PROJECT))
+ .setDatasetId(getString(spec, PropertyNames.BIGQUERY_DATASET))
- .setTableId(getString(spec, PropertyNames.BIGQUERY_TABLE)));
+ .setTableId(getString(spec, PropertyNames.BIGQUERY_TABLE)),
+ serializableCoder.getRecordType());
+ }
+ throw new IllegalStateException("Unsupported coder.");
+ }
}
}
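
Taken together, these changes let a pipeline read BigQuery rows directly into a
user-defined model class. A minimal end-to-end sketch; the WeatherRecord model and
the table name are hypothetical, not part of this diff:

// A serializable model class; the new BigQueryTypedIterator maps each BigQuery
// row onto an instance of this type.
public class WeatherRecord implements Serializable {
  public String stationId;
  public double meanTemp;
}

// Because WeatherRecord implements Serializable, BigQueryIO.Read.Bound.apply()
// attaches a SerializableCoder to the output PCollection automatically.
Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
PCollection<WeatherRecord> records = pipeline.apply(
    BigQueryIO.Read
        .withType(WeatherRecord.class)
        .from("my-project:samples.weather_stations"));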