From 65519e1b37cdae00410ff99272b7782656de0c11 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 23 May 2016 16:34:38 -0700 Subject: [PATCH 1/3] Updated all existing instances of PCollection's #setCoder to use Create.of use it's #withCoder --- .../java/org/apache/beam/sdk/io/PubsubIO.java | 2 +- .../apache/beam/sdk/io/BigQueryIOTest.java | 22 ++++++++----------- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java index 77c0b35ad81b..7e24253821b8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java @@ -645,7 +645,7 @@ public PCollection apply(PInput input) { if (boundedOutput) { return input.getPipeline().begin() - .apply(Create.of((Void) null)).setCoder(VoidCoder.of()) + .apply(Create.of((Void) null).withCoder(VoidCoder.of())) .apply(ParDo.of(new PubsubBoundedReader())) .setCoder(coder); } else { diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index 2d1b5505a8bd..c5016be9bf51 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -511,19 +511,15 @@ public void testCustomSink() throws Exception { .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED)); Pipeline p = TestPipeline.create(bqOptions); - p.apply(Create.of( - new TableRow().set("name", "a").set("number", 1), + p.apply(Create.of(new TableRow().set("name", "a").set("number", 1), new TableRow().set("name", "b").set("number", 2), - new TableRow().set("name", "c").set("number", 3))) - .setCoder(TableRowJsonCoder.of()) + new TableRow().set("name", "c").set("number", 3)) + .withCoder(TableRowJsonCoder.of())) .apply(BigQueryIO.Write.to("dataset-id.table-id") - .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) - .withSchema(new TableSchema().setFields( - ImmutableList.of( - new TableFieldSchema().setName("name").setType("STRING"), - new TableFieldSchema().setName("number").setType("INTEGER")))) - .withTestServices(fakeBqServices) - .withoutValidation()); + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED).withSchema(new TableSchema() + .setFields(ImmutableList.of(new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices).withoutValidation()); p.run(); logged.verifyInfo("Starting BigQuery load job"); @@ -547,8 +543,8 @@ public void testCustomSinkUnknown() throws Exception { p.apply(Create.of( new TableRow().set("name", "a").set("number", 1), new TableRow().set("name", "b").set("number", 2), - new TableRow().set("name", "c").set("number", 3))) - .setCoder(TableRowJsonCoder.of()) + new TableRow().set("name", "c").set("number", 3)) + .withCoder(TableRowJsonCoder.of())) .apply(BigQueryIO.Write.to("project-id:dataset-id.table-id") .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withTestServices(fakeBqServices) From 3a21f536cbcbe0bc6feef1a234e022d0bd29857b Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 23 May 2016 16:43:44 -0700 Subject: [PATCH 2/3] Fixed formatting changes. --- .../org/apache/beam/sdk/io/BigQueryIOTest.java | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index c5016be9bf51..9d786a672104 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -511,15 +511,20 @@ public void testCustomSink() throws Exception { .pollJobReturns(Status.FAILED, Status.FAILED, Status.SUCCEEDED)); Pipeline p = TestPipeline.create(bqOptions); - p.apply(Create.of(new TableRow().set("name", "a").set("number", 1), + p.apply(Create.of( + new TableRow().set("name", "a").set("number", 1), new TableRow().set("name", "b").set("number", 2), new TableRow().set("name", "c").set("number", 3)) .withCoder(TableRowJsonCoder.of())) + .setCoder(TableRowJsonCoder.of()) .apply(BigQueryIO.Write.to("dataset-id.table-id") - .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED).withSchema(new TableSchema() - .setFields(ImmutableList.of(new TableFieldSchema().setName("name").setType("STRING"), - new TableFieldSchema().setName("number").setType("INTEGER")))) - .withTestServices(fakeBqServices).withoutValidation()); + .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) + .withSchema(new TableSchema().setFields( + ImmutableList.of( + new TableFieldSchema().setName("name").setType("STRING"), + new TableFieldSchema().setName("number").setType("INTEGER")))) + .withTestServices(fakeBqServices) + .withoutValidation()); p.run(); logged.verifyInfo("Starting BigQuery load job"); From 3a967e3add790a6acd4064f1b8ea63834107e482 Mon Sep 17 00:00:00 2001 From: Ilya Ganelin Date: Mon, 23 May 2016 16:44:45 -0700 Subject: [PATCH 3/3] Deleted stale code. --- .../src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index 9d786a672104..7c360b940427 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -516,7 +516,6 @@ public void testCustomSink() throws Exception { new TableRow().set("name", "b").set("number", 2), new TableRow().set("name", "c").set("number", 3)) .withCoder(TableRowJsonCoder.of())) - .setCoder(TableRowJsonCoder.of()) .apply(BigQueryIO.Write.to("dataset-id.table-id") .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withSchema(new TableSchema().setFields(