From 5b9ae1352832e8dfe4fc44ee924ac2654db9ac1a Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Sun, 13 Nov 2016 20:29:31 -0800 Subject: [PATCH 1/2] Add PubSub attributes support to PubsubIO. --- .../core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 806b7da73639..8c635a93a9e6 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -152,13 +152,12 @@ private static void populateCommonDisplayData(DisplayData.Builder builder, .withLabel("Pubsub Topic")); } } - + /** * Class representing a Pub/Sub message. Each message contains a single message payload and * a map of attached attributes. */ public static class PubsubMessage { - private byte[] message; private Map attributes; @@ -177,9 +176,7 @@ public byte[] getMessage() { /** * Returns the given attribute value. If not such attribute exists, returns null. */ - @Nullable public String getAttribute(String attribute) { - checkNotNull(attribute, "attribute"); return attributes.get(attribute); } @@ -1066,6 +1063,7 @@ public PDone expand(PCollection input) { idLabel, formatFn, 100 /* numShards */)); + } throw new RuntimeException(); // cases are exhaustive. } From 70f1321147c9755ef2c58b1c1eb293ec6ac38f55 Mon Sep 17 00:00:00 2001 From: Reuven Lax Date: Wed, 22 Feb 2017 00:05:34 -0800 Subject: [PATCH 2/2] Fix CheckStyle issues --- .../java/org/apache/beam/sdk/io/PubsubIO.java | 3 +- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 158 ++++++++++-------- .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 15 +- 3 files changed, 100 insertions(+), 76 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 8c635a93a9e6..26c4536833ef 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -17,7 +17,6 @@ */ package org.apache.beam.sdk.io; -import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; import com.google.common.base.Strings; @@ -152,7 +151,7 @@ private static void populateCommonDisplayData(DisplayData.Builder builder, .withLabel("Pubsub Topic")); } } - + /** * Class representing a Pub/Sub message. Each message contains a single message payload and * a map of attached attributes. diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index 5dbec5423ae6..94b011cdc3d3 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -209,8 +209,8 @@ *

Sharding BigQuery output tables

* *

A common use case is to dynamically generate BigQuery table names based on - * the current window. To support this, - * {@link BigQueryIO.Write#to(SerializableFunction)} + * the current window or element. To support this, + * {@link BigQueryIO.Write#to(BigQueryIO.TableWritePolicy)} * accepts a function mapping the current window to a tablespec. For example, * here's code that outputs daily tables to BigQuery: *

{@code
@@ -218,12 +218,12 @@
  * quotes.apply(Window.into(CalendarWindows.days(1)))
  *       .apply(BigQueryIO.Write
  *         .withSchema(schema)
- *         .to(new SerializableFunction() {
- *           public String apply(BoundedWindow window) {
+ *         .to(new TableWritePolicy() {
+ *           public String apply(Context window) {
  *             // The cast below is safe because CalendarWindows.days(1) produces IntervalWindows.
  *             String dayString = DateTimeFormat.forPattern("yyyy_MM_dd")
  *                  .withZone(DateTimeZone.UTC)
- *                  .print(((IntervalWindow) window).start());
+ *                  .print(((IntervalWindow) context.getWindow()).start());
  *             return "my-project:output.output_table_" + dayString;
  *           }
  *         }));
@@ -433,6 +433,33 @@ private static ValueProvider displayTable(
     return NestedValueProvider.of(table, new TableRefToTableSpec());
   }
 
+  /**
+   * A policy for determining output table names. Currently supports examining the elemented or
+   * the window.
+   */
+   public abstract static class TableWritePolicy implements SerializableFunction<
+      TableWritePolicy.Context, T> {
+    /**
+     * Context object used by the policy to determine the destination table.
+     */
+    public static class Context {
+      private TableRow element;
+      private BoundedWindow window;
+
+      Context(TableRow element, BoundedWindow window) {
+        this.element = element;
+        this.window = window;
+      }
+
+      public TableRow getElement() {
+        return element;
+      }
+
+      public BoundedWindow getWindow() {
+        return window;
+      }
+    }
+  }
   /**
    * A {@link PTransform} that reads from a BigQuery table and returns a
    * {@link PCollection} of {@link TableRow TableRows} containing each of the rows of the table.
@@ -1588,20 +1615,19 @@ public static Bound to(TableReference table) {
      * 

{@code tableSpecFunction} should be deterministic. When given the same window, it should * always return the same table specification. */ - public static Bound to(SerializableFunction tableSpecFunction) { - return new Bound().to(tableSpecFunction); + public static Bound to(TableWritePolicy tableRefPolicy) { + return new Bound().to(tableRefPolicy); } /** * Creates a write transformation from a function that maps windows to {@link TableReference} * objects. * - *

{@code tableRefFunction} should be deterministic. When given the same window, it should + *

{@code tableRefPolicy} should be deterministic. When given the same window, it should * always return the same table reference. */ - public static Bound toTableReference( - SerializableFunction tableRefFunction) { - return new Bound().toTableReference(tableRefFunction); + public static Bound toTableReference(TableWritePolicy tableRefPolicy) { + return new Bound().toTableReference(tableRefPolicy); } /** @@ -1664,7 +1690,7 @@ public static class Bound extends PTransform, PDone> { @Nullable final ValueProvider jsonTableRef; - @Nullable final SerializableFunction tableRefFunction; + @Nullable final TableWritePolicy tableRefPolicy; // Table schema. The schema is required only if the table does not exist. @Nullable final ValueProvider jsonSchema; @@ -1687,16 +1713,15 @@ public static class Bound extends PTransform, PDone> { @VisibleForTesting @Nullable String stepUuid; @VisibleForTesting @Nullable ValueProvider jobUuid; - private static class TranslateTableSpecFunction implements - SerializableFunction { - private SerializableFunction tableSpecFunction; + private static class TranslateTableSpecFunction extends TableWritePolicy { + private TableWritePolicy tableSpecFunction; - TranslateTableSpecFunction(SerializableFunction tableSpecFunction) { + TranslateTableSpecFunction(TableWritePolicy tableSpecFunction) { this.tableSpecFunction = tableSpecFunction; } @Override - public TableReference apply(BoundedWindow value) { + public TableReference apply(TableWritePolicy.Context value) { return parseTableSpec(tableSpecFunction.apply(value)); } } @@ -1711,7 +1736,7 @@ public Bound() { this( null /* name */, null /* jsonTableRef */, - null /* tableRefFunction */, + null /* tableRefPolicy */, null /* jsonSchema */, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, @@ -1721,7 +1746,7 @@ public Bound() { } private Bound(String name, @Nullable ValueProvider jsonTableRef, - @Nullable SerializableFunction tableRefFunction, + @Nullable TableWritePolicy tableRefPolicy, @Nullable ValueProvider jsonSchema, CreateDisposition createDisposition, WriteDisposition writeDisposition, @@ -1730,7 +1755,7 @@ private Bound(String name, @Nullable ValueProvider jsonTableRef, @Nullable BigQueryServices bigQueryServices) { super(name); this.jsonTableRef = jsonTableRef; - this.tableRefFunction = tableRefFunction; + this.tableRefPolicy = tableRefPolicy; this.jsonSchema = jsonSchema; this.createDisposition = checkNotNull(createDisposition, "createDisposition"); this.writeDisposition = checkNotNull(writeDisposition, "writeDisposition"); @@ -1777,7 +1802,7 @@ public Bound to(ValueProvider tableSpec) { private Bound toTableRef(ValueProvider table) { return new Bound(name, NestedValueProvider.of(table, new TableRefToJson()), - tableRefFunction, jsonSchema, createDisposition, + tableRefPolicy, jsonSchema, createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); } @@ -1787,12 +1812,11 @@ private Bound toTableRef(ValueProvider table) { * *

Does not modify this object. * - *

{@code tableSpecFunction} should be deterministic. When given the same window, it + *

{@code tableRefPolicy} should be deterministic. When given the same window, it * should always return the same table specification. */ - public Bound to( - SerializableFunction tableSpecFunction) { - return toTableReference(new TranslateTableSpecFunction(tableSpecFunction)); + public Bound to(TableWritePolicy tableRefPolicy) { + return toTableReference(new TranslateTableSpecFunction(tableRefPolicy)); } /** @@ -1801,12 +1825,11 @@ public Bound to( * *

Does not modify this object. * - *

{@code tableRefFunction} should be deterministic. When given the same window, it should + *

{@code tableRefPolicy} should be deterministic. When given the same window, it should * always return the same table reference. */ - public Bound toTableReference( - SerializableFunction tableRefFunction) { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + public Bound toTableReference(TableWritePolicy tableRefPolicy) { + return new Bound(name, jsonTableRef, tableRefPolicy, jsonSchema, createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); } @@ -1817,7 +1840,7 @@ public Bound toTableReference( *

Does not modify this object. */ public Bound withSchema(TableSchema schema) { - return new Bound(name, jsonTableRef, tableRefFunction, + return new Bound(name, jsonTableRef, tableRefPolicy, StaticValueProvider.of(toJsonString(schema)), createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); } @@ -1826,7 +1849,7 @@ public Bound withSchema(TableSchema schema) { * Like {@link #withSchema(TableSchema)}, but with a {@link ValueProvider}. */ public Bound withSchema(ValueProvider schema) { - return new Bound(name, jsonTableRef, tableRefFunction, + return new Bound(name, jsonTableRef, tableRefPolicy, NestedValueProvider.of(schema, new TableSchemaToJsonSchema()), createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); } @@ -1837,7 +1860,7 @@ public Bound withSchema(ValueProvider schema) { *

Does not modify this object. */ public Bound withCreateDisposition(CreateDisposition createDisposition) { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, + return new Bound(name, jsonTableRef, tableRefPolicy, jsonSchema, createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); } @@ -1847,7 +1870,7 @@ public Bound withCreateDisposition(CreateDisposition createDisposition) { *

Does not modify this object. */ public Bound withWriteDisposition(WriteDisposition writeDisposition) { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, + return new Bound(name, jsonTableRef, tableRefPolicy, jsonSchema, createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); } @@ -1857,7 +1880,7 @@ public Bound withWriteDisposition(WriteDisposition writeDisposition) { *

Does not modify this object. */ public Bound withTableDescription(@Nullable String tableDescription) { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, + return new Bound(name, jsonTableRef, tableRefPolicy, jsonSchema, createDisposition, writeDisposition, tableDescription, validate, bigQueryServices); } @@ -1867,13 +1890,13 @@ public Bound withTableDescription(@Nullable String tableDescription) { *

Does not modify this object. */ public Bound withoutValidation() { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + return new Bound(name, jsonTableRef, tableRefPolicy, jsonSchema, createDisposition, writeDisposition, tableDescription, false, bigQueryServices); } @VisibleForTesting Bound withTestServices(BigQueryServices testServices) { - return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition, + return new Bound(name, jsonTableRef, tableRefPolicy, jsonSchema, createDisposition, writeDisposition, tableDescription, validate, testServices); } @@ -1903,11 +1926,11 @@ public void validate(PCollection input) { // Exactly one of the table and table reference can be configured. checkState( - jsonTableRef != null || tableRefFunction != null, + jsonTableRef != null || tableRefPolicy != null, "must set the table reference of a BigQueryIO.Write transform"); checkState( - jsonTableRef == null || tableRefFunction == null, - "Cannot set both a table reference and a table function for a BigQueryIO.Write" + jsonTableRef == null || tableRefPolicy == null, + "Cannot set both a table reference and a table policy for a BigQueryIO.Write" + " transform"); // Require a schema if creating one or more tables. @@ -1933,13 +1956,13 @@ public void validate(PCollection input) { } } - if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefFunction != null) { + if (input.isBounded() == PCollection.IsBounded.UNBOUNDED || tableRefPolicy != null) { // We will use BigQuery's streaming write API -- validate supported dispositions. - if (tableRefFunction != null) { + if (tableRefPolicy != null) { checkArgument( createDisposition != CreateDisposition.CREATE_NEVER, "CreateDisposition.CREATE_NEVER is not supported when using a tablespec" - + " function."); + + " policy."); } if (jsonSchema == null) { checkArgument( @@ -1950,7 +1973,7 @@ public void validate(PCollection input) { checkArgument( writeDisposition != WriteDisposition.WRITE_TRUNCATE, "WriteDisposition.WRITE_TRUNCATE is not supported for an unbounded PCollection or" - + " when using a tablespec function."); + + " when using a tablespec policy."); } else { // We will use a BigQuery load job -- validate the temp location. String tempLocation = options.getTempLocation(); @@ -1977,11 +2000,11 @@ public PDone expand(PCollection input) { BigQueryOptions options = p.getOptions().as(BigQueryOptions.class); BigQueryServices bqServices = getBigQueryServices(); - // When writing an Unbounded PCollection, or when a tablespec function is defined, we use + // When writing an Unbounded PCollection, or when a tablespec policy is defined, we use // StreamWithDeDup and BigQuery's streaming import API. - if (input.isBounded() == IsBounded.UNBOUNDED || tableRefFunction != null) { + if (input.isBounded() == IsBounded.UNBOUNDED || tableRefPolicy != null) { return input.apply( - new StreamWithDeDup(getTable(), tableRefFunction, + new StreamWithDeDup(getTable(), tableRefPolicy, jsonSchema == null ? null : NestedValueProvider.of( jsonSchema, new JsonSchemaToTableSchema()), createDisposition, @@ -2144,9 +2167,9 @@ public void populateDisplayData(DisplayData.Builder builder) { .addIfNotNull(DisplayData.item("schema", jsonSchema) .withLabel("Table Schema")); - if (tableRefFunction != null) { - builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()) - .withLabel("Table Reference Function")); + if (tableRefPolicy != null) { + builder.add(DisplayData.item("tableFn", tableRefPolicy.getClass()) + .withLabel("Table Reference Policy")); } builder @@ -2177,7 +2200,7 @@ public TableSchema getSchema() { } /** - * Returns the table to write, or {@code null} if writing with {@code tableRefFunction}. + * Returns the table to write, or {@code null} if writing with {@code tableRefPolicy}. * *

If the table's project is not specified, use the executing project. */ @@ -2941,17 +2964,17 @@ static class TagWithUniqueIdsAndTable /** TableSpec to write to. */ private final ValueProvider tableSpec; - /** User function mapping windows to {@link TableReference} in JSON. */ - private final SerializableFunction tableRefFunction; + /** User policy mapping windows to {@link TableReference} in JSON. */ + private final TableWritePolicy tableRefPolicy; private transient String randomUUID; private transient long sequenceNo = 0L; TagWithUniqueIdsAndTable(BigQueryOptions options, ValueProvider table, - SerializableFunction tableRefFunction) { - checkArgument(table == null ^ tableRefFunction == null, - "Exactly one of table or tableRefFunction should be set"); + TableWritePolicy tableRefPolicy) { + checkArgument(table == null ^ tableRefPolicy == null, + "Exactly one of table or tableRefPolicy should be set"); if (table != null) { if (table.isAccessible() && Strings.isNullOrEmpty(table.get().getProjectId())) { TableReference tableRef = table.get() @@ -2964,7 +2987,7 @@ static class TagWithUniqueIdsAndTable } else { tableSpec = null; } - this.tableRefFunction = tableRefFunction; + this.tableRefPolicy = tableRefPolicy; } @@ -2978,8 +3001,8 @@ public void startBundle(Context context) { public void processElement(ProcessContext context, BoundedWindow window) throws IOException { String uniqueId = randomUUID + sequenceNo++; ThreadLocalRandom randomGenerator = ThreadLocalRandom.current(); - String tableSpec = tableSpecFromWindow( - context.getPipelineOptions().as(BigQueryOptions.class), window); + String tableSpec = tableSpecFromElementAndWindow( + context.getPipelineOptions().as(BigQueryOptions.class), context.element(), window); // We output on keys 0-50 to ensure that there's enough batching for // BigQuery. context.output(KV.of(ShardedKey.of(tableSpec, randomGenerator.nextInt(0, 50)), @@ -2991,9 +3014,9 @@ public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); builder.addIfNotNull(DisplayData.item("table", tableSpec)); - if (tableRefFunction != null) { - builder.add(DisplayData.item("tableFn", tableRefFunction.getClass()) - .withLabel("Table Reference Function")); + if (tableRefPolicy != null) { + builder.add(DisplayData.item("tableFn", tableRefPolicy.getClass()) + .withLabel("Table Reference Policy")); } } @@ -3002,11 +3025,12 @@ ValueProvider getTableSpec() { return tableSpec; } - private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window) { + private String tableSpecFromElementAndWindow( + BigQueryOptions options, TableRow element, BoundedWindow window) { if (tableSpec != null) { return tableSpec.get(); } else { - TableReference table = tableRefFunction.apply(window); + TableReference table = tableRefPolicy.apply(new TableWritePolicy.Context(element, window)); if (table.getProjectId() == null) { table.setProjectId(options.getProject()); } @@ -3023,7 +3047,7 @@ private String tableSpecFromWindow(BigQueryOptions options, BoundedWindow window */ private static class StreamWithDeDup extends PTransform, PDone> { @Nullable private final transient ValueProvider tableReference; - @Nullable private final SerializableFunction tableRefFunction; + @Nullable private final TableWritePolicy tableRefPolicy; @Nullable private final transient ValueProvider tableSchema; private final Write.CreateDisposition createDisposition; private final BigQueryServices bqServices; @@ -3031,12 +3055,12 @@ private static class StreamWithDeDup extends PTransform, P /** Constructor. */ StreamWithDeDup(ValueProvider tableReference, - @Nullable SerializableFunction tableRefFunction, + @Nullable TableWritePolicy tableRefPolicy, @Nullable ValueProvider tableSchema, Write.CreateDisposition createDisposition, @Nullable String tableDescription, BigQueryServices bqServices) { this.tableReference = tableReference; - this.tableRefFunction = tableRefFunction; + this.tableRefPolicy = tableRefPolicy; this.tableSchema = tableSchema; this.createDisposition = createDisposition; this.bqServices = checkNotNull(bqServices, "bqServices"); @@ -3062,7 +3086,7 @@ public PDone expand(PCollection input) { PCollection, TableRowInfo>> tagged = input.apply(ParDo.of( new TagWithUniqueIdsAndTable(input.getPipeline().getOptions().as(BigQueryOptions.class), - tableReference, tableRefFunction))); + tableReference, tableRefPolicy))); // To prevent having the same TableRow processed more than once with regenerated // different unique ids, this implementation relies on "checkpointing", which is diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 3aa90cfedfb1..c73d4627c721 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -109,6 +109,7 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.PassThroughThenCleanup.CleanupOperation; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Status; +import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableWritePolicy; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TransformingSource; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.TableRowWriter; @@ -1172,11 +1173,11 @@ public String apply(TableRow value) { } ); - SerializableFunction tableFunction = - new SerializableFunction() { + TableWritePolicy tableFunction = new TableWritePolicy() { @Override - public String apply(BoundedWindow input) { - return "project-id:dataset-id.table-id-" + ((PartitionedGlobalWindow) input).value; + public String apply(Context input) { + return "project-id:dataset-id.table-id-" + + ((PartitionedGlobalWindow) input.getWindow()).value; } }; @@ -1722,14 +1723,14 @@ public void testWriteValidateFailsTableAndTableSpec() { p.enableAbandonedNodeEnforcement(false); thrown.expect(IllegalStateException.class); - thrown.expectMessage("Cannot set both a table reference and a table function"); + thrown.expectMessage("Cannot set both a table reference and a table policy"); p .apply(Create.empty(TableRowJsonCoder.of())) .apply(BigQueryIO.Write .to("dataset.table") - .to(new SerializableFunction() { + .to(new BigQueryIO.TableWritePolicy() { @Override - public String apply(BoundedWindow input) { + public String apply(Context input) { return null; } }));