From 8f884d31d949d8f62e787c3d6f67cd2c1b8bceab Mon Sep 17 00:00:00 2001 From: Neville Li Date: Tue, 18 Jul 2017 09:07:21 -0400 Subject: [PATCH 1/2] [BEAM-2532] add a Serializable TableSchema Supplier in BigQuerySourceBase --- .../io/gcp/bigquery/BigQuerySourceBase.java | 48 ++++++++++++++++++- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index 2de60a26bb81..f96ebf0cb84c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -29,9 +29,11 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.base.Supplier; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import java.io.IOException; +import java.io.Serializable; import java.util.List; import java.util.NoSuchElementException; import org.apache.avro.generic.GenericRecord; @@ -168,10 +170,12 @@ List> createSources(List files, TableSchema SerializableFunction function = new SerializableFunction() { + private Supplier schema = new SerializableTableSchemaSupplier( + BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class)); + @Override public TableRow apply(GenericRecord input) { - return BigQueryAvroUtils.convertGenericRecordToTableRow( - input, BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class)); + return BigQueryAvroUtils.convertGenericRecordToTableRow(input, schema.get()); }}; List> avroSources = Lists.newArrayList(); @@ -182,6 +186,46 @@ public TableRow apply(GenericRecord input) { return ImmutableList.copyOf(avroSources); } + /** + * A {@link Serializable} object that holds the {@link String} version of a {@link TableSchema}. + * This is paired with the {@link SerializableTableSchemaSupplier} via {@link Serializable}'s + * usage of the {@link #readResolve} method. + */ + private static class SerializableTableSchemaString implements Serializable { + private final String jsonSchema; + private SerializableTableSchemaString(String jsonSchema) { + this.jsonSchema = jsonSchema; + } + + private Object readResolve() throws IOException, ClassCastException { + return new SerializableTableSchemaSupplier( + BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class)); + } + } + + /** + * A {@link Serializable} object that delegates to the {@link SerializableTableSchemaString} via + * {@link Serializable}'s usage of the {@link #writeReplace} method. Kryo doesn't utilize + * Java's serialization and hence is able to encode the {@link TableSchema} object directly. + */ + private static class SerializableTableSchemaSupplier + implements Serializable, Supplier { + private final TableSchema tableSchema; + private SerializableTableSchemaSupplier(TableSchema tableSchema) { + this.tableSchema = tableSchema; + } + + private Object writeReplace() { + return new SerializableTableSchemaString(BigQueryHelpers.toJsonString(tableSchema)); + } + + @Override + public TableSchema get() { + return tableSchema; + } + } + + protected static class BigQueryReader extends BoundedReader { private final BigQuerySourceBase source; private final BigQueryServices.BigQueryJsonReader reader; From c254df9645491bee096765826b3d5fad57e8468d Mon Sep 17 00:00:00 2001 From: Neville Li Date: Tue, 18 Jul 2017 19:53:28 -0400 Subject: [PATCH 2/2] use Suppliers.{memorize,compuse,ofInstance} instead --- .../io/gcp/bigquery/BigQuerySourceBase.java | 48 ++++--------------- 1 file changed, 10 insertions(+), 38 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java index f96ebf0cb84c..79c0ec8f0fd1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySourceBase.java @@ -29,7 +29,9 @@ import com.google.api.services.bigquery.model.TableReference; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; +import com.google.common.base.Function; import com.google.common.base.Supplier; +import com.google.common.base.Suppliers; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; import java.io.IOException; @@ -48,6 +50,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.annotation.Nullable; + /** * An abstract {@link BoundedSource} to read a table from BigQuery. * @@ -170,8 +174,8 @@ List> createSources(List files, TableSchema SerializableFunction function = new SerializableFunction() { - private Supplier schema = new SerializableTableSchemaSupplier( - BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class)); + private Supplier schema = Suppliers.memoize( + Suppliers.compose(new TableSchemaFunction(), Suppliers.ofInstance(jsonSchema))); @Override public TableRow apply(GenericRecord input) { @@ -186,46 +190,14 @@ public TableRow apply(GenericRecord input) { return ImmutableList.copyOf(avroSources); } - /** - * A {@link Serializable} object that holds the {@link String} version of a {@link TableSchema}. - * This is paired with the {@link SerializableTableSchemaSupplier} via {@link Serializable}'s - * usage of the {@link #readResolve} method. - */ - private static class SerializableTableSchemaString implements Serializable { - private final String jsonSchema; - private SerializableTableSchemaString(String jsonSchema) { - this.jsonSchema = jsonSchema; - } - - private Object readResolve() throws IOException, ClassCastException { - return new SerializableTableSchemaSupplier( - BigQueryHelpers.fromJsonString(jsonSchema, TableSchema.class)); - } - } - - /** - * A {@link Serializable} object that delegates to the {@link SerializableTableSchemaString} via - * {@link Serializable}'s usage of the {@link #writeReplace} method. Kryo doesn't utilize - * Java's serialization and hence is able to encode the {@link TableSchema} object directly. - */ - private static class SerializableTableSchemaSupplier - implements Serializable, Supplier { - private final TableSchema tableSchema; - private SerializableTableSchemaSupplier(TableSchema tableSchema) { - this.tableSchema = tableSchema; - } - - private Object writeReplace() { - return new SerializableTableSchemaString(BigQueryHelpers.toJsonString(tableSchema)); - } - + private static class TableSchemaFunction implements Serializable, Function { + @Nullable @Override - public TableSchema get() { - return tableSchema; + public TableSchema apply(@Nullable String input) { + return BigQueryHelpers.fromJsonString(input, TableSchema.class); } } - protected static class BigQueryReader extends BoundedReader { private final BigQuerySourceBase source; private final BigQueryServices.BigQueryJsonReader reader;