From 796649d70a3a2772a6b4b010b44563ca61b7c2d1 Mon Sep 17 00:00:00 2001 From: Pei He Date: Mon, 23 Jan 2017 16:25:43 -0800 Subject: [PATCH 1/5] [BEAM-1235] BigQueryIO.Write: log failed load/copy jobs. --- .../beam/sdk/io/gcp/bigquery/BigQueryIO.java | 63 ++++++++++++------ .../io/gcp/bigquery/BigQueryServicesImpl.java | 1 + .../sdk/io/gcp/bigquery/BigQueryIOTest.java | 64 ++++++++++++------- .../bigquery/BigQueryServicesImplTest.java | 2 + 4 files changed, 87 insertions(+), 43 deletions(-) diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java index b6f9fb03234b..4ace9852fdaa 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java @@ -1155,7 +1155,8 @@ private void executeQuery( jobService.startQueryJob(jobRef, queryConfig); Job job = jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); if (parseStatus(job) != Status.SUCCEEDED) { - throw new IOException("Query job failed: " + jobId); + throw new IOException(String.format( + "Query job %s failed, status: %s.", jobId, statusToPrettyString(job.getStatus()))); } } @@ -1260,8 +1261,8 @@ private List executeExtract( jobService.pollJob(jobRef, JOB_POLL_MAX_RETRIES); if (parseStatus(extractJob) != Status.SUCCEEDED) { throw new IOException(String.format( - "Extract job %s failed, status: %s", - extractJob.getJobReference().getJobId(), extractJob.getStatus())); + "Extract job %s failed, status: %s.", + extractJob.getJobReference().getJobId(), statusToPrettyString(extractJob.getStatus()))); } List tempFiles = getExtractFilePaths(extractDestinationDir, extractJob); @@ -2361,30 +2362,36 @@ private void load( .setSourceFormat("NEWLINE_DELIMITED_JSON"); String projectId = ref.getProjectId(); + Job lastFailedLoadJob = null; for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; - LOG.info("Starting BigQuery load job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS); JobReference jobRef = new JobReference() .setProjectId(projectId) .setJobId(jobId); jobService.startLoadJob(jobRef, loadConfig); - Status jobStatus = - parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES)); + Job loadJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES); + Status jobStatus = parseStatus(loadJob); switch (jobStatus) { case SUCCEEDED: return; case UNKNOWN: - throw new RuntimeException("Failed to poll the load job status of job " + jobId); + throw new RuntimeException(String.format( + "UNKNOWN status of load job [%s]: %s.", jobId, jobToPrettyString(loadJob))); case FAILED: - LOG.info("BigQuery load job failed: {}", jobId); + lastFailedLoadJob = loadJob; continue; default: - throw new IllegalStateException(String.format("Unexpected job status: %s of job %s", - jobStatus, jobId)); + throw new IllegalStateException(String.format( + "Unexpected status [%s] of load job: %s.", + jobStatus, jobToPrettyString(loadJob))); } } - throw new RuntimeException(String.format("Failed to create the load job %s, reached max " - + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS)); + throw new RuntimeException(String.format( + "Failed to create load job with id prefix %s, " + + "reached max retries: %d, last failed load job: %s.", + jobIdPrefix, + Bound.MAX_RETRY_JOBS, + jobToPrettyString(lastFailedLoadJob))); } static void removeTemporaryFiles( @@ -2491,30 +2498,36 @@ private void copy( .setCreateDisposition(createDisposition.name()); String projectId = ref.getProjectId(); + Job lastFailedCopyJob = null; for (int i = 0; i < Bound.MAX_RETRY_JOBS; ++i) { String jobId = jobIdPrefix + "-" + i; - LOG.info("Starting BigQuery copy job {}: try {}/{}", jobId, i, Bound.MAX_RETRY_JOBS); JobReference jobRef = new JobReference() .setProjectId(projectId) .setJobId(jobId); jobService.startCopyJob(jobRef, copyConfig); - Status jobStatus = - parseStatus(jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES)); + Job copyJob = jobService.pollJob(jobRef, Bound.LOAD_JOB_POLL_MAX_RETRIES); + Status jobStatus = parseStatus(copyJob); switch (jobStatus) { case SUCCEEDED: return; case UNKNOWN: - throw new RuntimeException("Failed to poll the copy job status of job " + jobId); + throw new RuntimeException(String.format( + "UNKNOWN status of copy job [%s]: %s.", jobId, jobToPrettyString(copyJob))); case FAILED: - LOG.info("BigQuery copy job failed: {}", jobId); + lastFailedCopyJob = copyJob; continue; default: - throw new IllegalStateException(String.format("Unexpected job status: %s of job %s", - jobStatus, jobId)); + throw new IllegalStateException(String.format( + "Unexpected status [%s] of load job: %s.", + jobStatus, jobToPrettyString(copyJob))); } } - throw new RuntimeException(String.format("Failed to create the copy job %s, reached max " - + "retries: %d", jobIdPrefix, Bound.MAX_RETRY_JOBS)); + throw new RuntimeException(String.format( + "Failed to create copy job with id prefix %s, " + + "reached max retries: %d, last failed copy job: %s.", + jobIdPrefix, + Bound.MAX_RETRY_JOBS, + jobToPrettyString(lastFailedCopyJob))); } static void removeTemporaryTables(DatasetService tableService, @@ -2549,6 +2562,14 @@ public void populateDisplayData(DisplayData.Builder builder) { private Write() {} } + private static String jobToPrettyString(@Nullable Job job) throws IOException { + return job == null ? "null" : job.toPrettyString(); + } + + private static String statusToPrettyString(@Nullable JobStatus status) throws IOException { + return status == null ? "Unknown status: null." : status.toPrettyString(); + } + private static void verifyDatasetPresence(DatasetService datasetService, TableReference table) { try { datasetService.getDataset(table.getProjectId(), table.getDatasetId()); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java index 75796abce72d..7c3edbe20c7a 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImpl.java @@ -214,6 +214,7 @@ static void startJob( do { try { client.jobs().insert(jobRef.getProjectId(), job).execute(); + LOG.info("Started BigQuery job: {}.", jobRef); return; // SUCCEEDED } catch (GoogleJsonResponseException e) { if (errorExtractor.itemAlreadyExists(e)) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java index 0b8d60de86e1..bbfc2ce1ac86 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIOTest.java @@ -988,12 +988,6 @@ public void testWrite() throws Exception { .withoutValidation()); p.run(); - logged.verifyInfo("Starting BigQuery load job"); - logged.verifyInfo("BigQuery load job failed"); - logged.verifyInfo("try 0/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyInfo("try 1/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyInfo("try 2/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyNotLogged("try 3/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); File tempDir = new File(bqOptions.getTempLocation()); testNumFiles(tempDir, 0); } @@ -1232,11 +1226,49 @@ public void testWriteUnknown() throws Exception { .withoutValidation()); thrown.expect(RuntimeException.class); - thrown.expectMessage("Failed to poll the load job status"); - p.run(); + thrown.expectMessage("UNKNOWN status of load job"); + try { + p.run(); + } finally { + File tempDir = new File(bqOptions.getTempLocation()); + testNumFiles(tempDir, 0); + } + } - File tempDir = new File(bqOptions.getTempLocation()); - testNumFiles(tempDir, 0); + @Test + @Category(NeedsRunner.class) + public void testWriteFailedJobs() throws Exception { + BigQueryOptions bqOptions = TestPipeline.testingPipelineOptions().as(BigQueryOptions.class); + bqOptions.setProject("defaultProject"); + bqOptions.setTempLocation(testFolder.newFolder("BigQueryIOTest").getAbsolutePath()); + + FakeBigQueryServices fakeBqServices = new FakeBigQueryServices() + .withJobService(new FakeJobService() + .startJobReturns("done", "done", "done") + .pollJobReturns(Status.FAILED, Status.FAILED, Status.FAILED)); + + Pipeline p = TestPipeline.create(bqOptions); + p.apply(Create.of( + new TableRow().set("name", "a").set("number", 1), + new TableRow().set("name", "b").set("number", 2), + new TableRow().set("name", "c").set("number", 3)) + .withCoder(TableRowJsonCoder.of())) + .apply(BigQueryIO.Write.to("dataset-id.table-id") + .withCreateDisposition(CreateDisposition.CREATE_NEVER) + .withTestServices(fakeBqServices) + .withoutValidation()); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("Failed to create load job with id prefix"); + thrown.expectMessage("reached max retries"); + thrown.expectMessage("last failed load job"); + + try { + p.run(); + } finally { + File tempDir = new File(bqOptions.getTempLocation()); + testNumFiles(tempDir, 0); + } } @Test @@ -2164,12 +2196,6 @@ public void testWriteTables() throws Exception { List tempTables = tester.takeOutputElements(); - logged.verifyInfo("Starting BigQuery load job"); - logged.verifyInfo("BigQuery load job failed"); - logged.verifyInfo("try 0/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyInfo("try 1/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyNotLogged("try 2/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - assertEquals(expectedTempTables, tempTables); } @@ -2237,12 +2263,6 @@ public void testWriteRename() throws Exception { DoFnTester tester = DoFnTester.of(writeRename); tester.setSideInput(tempTablesView, GlobalWindow.INSTANCE, tempTables); tester.processElement(null); - - logged.verifyInfo("Starting BigQuery copy job"); - logged.verifyInfo("BigQuery copy job failed"); - logged.verifyInfo("try 0/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyInfo("try 1/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); - logged.verifyNotLogged("try 2/" + BigQueryIO.Write.Bound.MAX_RETRY_JOBS); } @Test diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java index 1ce10f1519c1..ef51650633c1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryServicesImplTest.java @@ -138,6 +138,7 @@ public void testStartLoadJobSucceeds() throws IOException, InterruptedException verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); verify(response, times(1)).getContentType(); + expectedLogs.verifyInfo(String.format("Started BigQuery job: %s", jobRef)); } /** @@ -161,6 +162,7 @@ public void testStartLoadJobSucceedsAlreadyExists() throws IOException, Interrup verify(response, times(1)).getStatusCode(); verify(response, times(1)).getContent(); verify(response, times(1)).getContentType(); + expectedLogs.verifyNotLogged("Started BigQuery job"); } /** From 8505e9312705ceb0bb5779580c930ab2113c0b3d Mon Sep 17 00:00:00 2001 From: Jesse Anderson Date: Tue, 24 Jan 2017 08:37:33 -0800 Subject: [PATCH 2/5] Refactored existing code. Added iterable and KV. Changed from element to of. --- .../apache/beam/sdk/transforms/ToString.java | 168 ++++++++++++++++-- .../org/apache/beam/sdk/io/WriteTest.java | 2 +- .../beam/sdk/transforms/ToStringTest.java | 86 +++++++-- 3 files changed, 226 insertions(+), 30 deletions(-) diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java index ef49267669eb..d5c9784d715c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ToString.java @@ -18,51 +18,181 @@ package org.apache.beam.sdk.transforms; +import java.util.Iterator; + +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; /** - * {@link PTransform PTransforms} for converting a {@link PCollection PCollection<T>} to a - * {@link PCollection PCollection<String>}. - * - *

Example of use: - *

 {@code
- * PCollection longs = ...;
- * PCollection strings = longs.apply(ToString.element());
- * } 
- * + * {@link PTransform PTransforms} for converting a {@link PCollection PCollection<?>}, + * {@link PCollection PCollection<KV<?,?>>}, or + * {@link PCollection PCollection<Iterable<?>>} + * to a {@link PCollection PCollection<String>}. * *

Note: For any custom string conversion and formatting, we recommend applying your own * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} */ public final class ToString { + private ToString() { + // do not instantiate + } /** * Returns a {@code PTransform>} which transforms each * element of the input {@link PCollection} to a {@link String} using the * {@link Object#toString} method. */ - public static PTransform, PCollection> element() { - return new Default(); + public static PTransform, PCollection> of() { + return new SimpleToString(); } - private ToString() { + /** + * Returns a {@code PTransform, PCollection>} which transforms each + * element of the input {@link PCollection} to a {@link String} by using the + * {@link Object#toString} on the key followed by a "," followed by the {@link Object#toString} + * of the value. + */ + public static PTransform>, PCollection> kv() { + return kv(","); + } + + /** + * Returns a {@code PTransform, PCollection>} which transforms each + * element of the input {@link PCollection} to a {@link String} by using the + * {@link Object#toString} on the key followed by the specified delimeter followed by the + * {@link Object#toString} of the value. + * @param delimiter The delimiter to put between the key and value + */ + public static PTransform>, + PCollection> kv(String delimiter) { + return new KVToString(delimiter); + } + + /** + * Returns a {@code PTransform, PCollection>} which + * transforms each item in the iterable of the input {@link PCollection} to a {@link String} + * using the {@link Object#toString} method followed by a "," until + * the last element in the iterable. There is no trailing delimiter. + */ + public static PTransform>, PCollection> iterable() { + return iterable(","); + } + + /** + * Returns a {@code PTransform, PCollection>} which + * transforms each item in the iterable of the input {@link PCollection} to a {@link String} + * using the {@link Object#toString} method followed by the specified delimiter until + * the last element in the iterable. There is no trailing delimiter. + * @param delimiter The delimiter to put between the items in the iterable. + */ + public static PTransform>, + PCollection> iterable(String delimiter) { + return new IterablesToString(delimiter); } /** * A {@link PTransform} that converts a {@code PCollection} to a {@code PCollection} * using the {@link Object#toString} method. + * + *

Example of use: + *

{@code
+   * PCollection longs = ...;
+   * PCollection strings = longs.apply(ToString.of());
+   * }
+ * + * + *

Note: For any custom string conversion and formatting, we recommend applying your own + * {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} */ - private static final class Default extends PTransform, PCollection> { + private static final class SimpleToString extends + PTransform, PCollection> { @Override public PCollection expand(PCollection input) { - return input.apply(MapElements.via(new ToStringFunction<>())); + return input.apply(MapElements.via(new SimpleFunction() { + @Override + public String apply(Object input) { + return input.toString(); + } + })); } + } + + /** + * A {@link PTransform} that converts a {@code PCollection} of {@code KV} to a + * {@code PCollection} using the {@link Object#toString} method for + * the key and value and an optional delimiter. + * + *

Example of use: + *

{@code
+   * PCollection> nameToLong = ...;
+   * PCollection strings = nameToLong.apply(ToString.kv());
+   * }
+ * + * + *

Note: For any custom string conversion and formatting, we recommend applying your + * own {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + */ + private static final class KVToString extends + PTransform>, PCollection> { + private final String delimiter; + + public KVToString(String delimiter) { + this.delimiter = delimiter; + } + + @Override + public PCollection expand(PCollection> input) { + return input.apply(MapElements.via(new SimpleFunction, String>() { + @Override + public String apply(KV input) { + return input.getKey().toString() + delimiter + input.getValue().toString(); + } + })); + } + } + + /** + * A {@link PTransform} that converts a {@code PCollection} of {@link Iterable} to a + * {@code PCollection} using the {@link Object#toString} method and + * an optional delimiter. + * + *

Example of use: + *

{@code
+   * PCollection> longs = ...;
+   * PCollection strings = nameToLong.apply(ToString.iterable());
+   * }
+ * + * + *

Note: For any custom string conversion and formatting, we recommend applying your + * own {@link SerializableFunction} using {@link MapElements#via(SerializableFunction)} + */ + private static final class IterablesToString extends + PTransform>, PCollection> { + private final String delimiter; + + public IterablesToString(String delimiter) { + this.delimiter = delimiter; + } + + @Override + public PCollection expand(PCollection> input) { + return input.apply(MapElements.via(new SimpleFunction, String>() { + @Override + public String apply(Iterable input) { + StringBuilder builder = new StringBuilder(); + Iterator iterator = input.iterator(); + + while (iterator.hasNext()) { + builder.append(iterator.next().toString()); + + if (iterator.hasNext()) { + builder.append(delimiter); + } + } - private static class ToStringFunction extends SimpleFunction { - @Override - public String apply(T input) { - return input.toString(); - } + return builder.toString(); + } + })); } } } diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java index 9772b9b945ff..f81cc0c984af 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java @@ -297,7 +297,7 @@ public void populateDisplayData(DisplayData.Builder builder) { @Test public void testWriteUnbounded() { PCollection unbounded = p.apply(CountingInput.unbounded()) - .apply(ToString.element()); + .apply(ToString.of()); TestSink sink = new TestSink(); thrown.expect(IllegalArgumentException.class); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java index e5c9f05b6bcb..ab984f10cdda 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ToStringTest.java @@ -20,10 +20,13 @@ import java.util.ArrayList; import java.util.Arrays; -import java.util.List; + +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.RunnableOnService; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.junit.Rule; import org.junit.Test; @@ -41,19 +44,82 @@ public class ToStringTest { @Test @Category(RunnableOnService.class) - public void testToStringElement() { + public void testToStringOf() { Integer[] ints = {1, 2, 3, 4, 5}; + String[] strings = {"1", "2", "3", "4", "5"}; PCollection input = p.apply(Create.of(Arrays.asList(ints))); - PCollection output = input.apply(ToString.element()); - PAssert.that(output).containsInAnyOrder(toStringList(ints)); + PCollection output = input.apply(ToString.of()); + PAssert.that(output).containsInAnyOrder(strings); + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testToStringKV() { + ArrayList> kvs = new ArrayList<>(); + kvs.add(KV.of("one", 1)); + kvs.add(KV.of("two", 2)); + + ArrayList expected = new ArrayList<>(); + expected.add("one,1"); + expected.add("two,2"); + + PCollection> input = p.apply(Create.of(kvs)); + PCollection output = input.apply(ToString.kv()); + PAssert.that(output).containsInAnyOrder(expected); p.run(); } - private List toStringList(Object[] ints) { - List ll = new ArrayList<>(ints.length); - for (Object i : ints) { - ll.add(i.toString()); - } - return ll; + @Test + @Category(RunnableOnService.class) + public void testToStringKVWithDelimiter() { + ArrayList> kvs = new ArrayList<>(); + kvs.add(KV.of("one", 1)); + kvs.add(KV.of("two", 2)); + + ArrayList expected = new ArrayList<>(); + expected.add("one\t1"); + expected.add("two\t2"); + + PCollection> input = p.apply(Create.of(kvs)); + PCollection output = input.apply(ToString.kv("\t")); + PAssert.that(output).containsInAnyOrder(expected); + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testToStringIterable() { + ArrayList> iterables = new ArrayList<>(); + iterables.add(Arrays.asList(new String[]{"one", "two", "three"})); + iterables.add(Arrays.asList(new String[]{"four", "five", "six"})); + + ArrayList expected = new ArrayList<>(); + expected.add("one,two,three"); + expected.add("four,five,six"); + + PCollection> input = p.apply(Create.of(iterables) + .withCoder(IterableCoder.of(StringUtf8Coder.of()))); + PCollection output = input.apply(ToString.iterable()); + PAssert.that(output).containsInAnyOrder(expected); + p.run(); + } + + @Test + @Category(RunnableOnService.class) + public void testToStringIterableWithDelimiter() { + ArrayList> iterables = new ArrayList<>(); + iterables.add(Arrays.asList(new String[]{"one", "two", "three"})); + iterables.add(Arrays.asList(new String[]{"four", "five", "six"})); + + ArrayList expected = new ArrayList<>(); + expected.add("one\ttwo\tthree"); + expected.add("four\tfive\tsix"); + + PCollection> input = p.apply(Create.of(iterables) + .withCoder(IterableCoder.of(StringUtf8Coder.of()))); + PCollection output = input.apply(ToString.iterable("\t")); + PAssert.that(output).containsInAnyOrder(expected); + p.run(); } } From fccfa8ae06f6a29e208557d3181d26f6e0eb4bd5 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Tue, 31 Jan 2017 21:27:17 +0100 Subject: [PATCH 3/5] Revert "Removes ReduceFnExecutor interface" This reverts commit 8989473b8e379a40b888565aadead001379c9398. --- .../apache/beam/runners/core/DoFnRunner.java | 20 +++++++++++++++++++ .../GroupAlsoByWindowViaWindowSetDoFn.java | 5 ++++- .../beam/runners/direct/ParDoEvaluator.java | 2 ++ .../spark/translation/DoFnFunction.java | 2 ++ .../spark/translation/MultiDoFnFunction.java | 2 ++ 5 files changed, 30 insertions(+), 1 deletion(-) diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java index b29adcc1c7da..66f95dbbea97 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -17,10 +17,12 @@ */ package org.apache.beam.runners.core; +import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.KV; import org.joda.time.Instant; /** @@ -49,4 +51,22 @@ public interface DoFnRunner { * additional tasks, such as flushing in-memory states. */ void finishBundle(); + + /** + * An internal interface for signaling that a {@link OldDoFn} requires late data dropping. + */ + public interface ReduceFnExecutor { + /** + * Gets this object as a {@link OldDoFn}. + * + *

Most implementors of this interface are expected to be {@link OldDoFn} instances, and will + * return themselves. + */ + OldDoFn, KV> asDoFn(); + + /** + * Returns an aggregator that tracks elements that are dropped due to being late. + */ + Aggregator getDroppedDueToLatenessAggregator(); + } } diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index d0387cf8a7ac..ecce4fc4e4a2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.core; +import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor; import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.Aggregator; @@ -36,7 +37,7 @@ @SystemDoFnInternal public class GroupAlsoByWindowViaWindowSetDoFn< K, InputT, OutputT, W extends BoundedWindow, RinT extends KeyedWorkItem> - extends OldDoFn> { + extends OldDoFn> implements ReduceFnExecutor { public static OldDoFn, KV> create( @@ -94,6 +95,7 @@ public void processElement(ProcessContext c) throws Exception { reduceFnRunner.persist(); } + @Override public OldDoFn, KV> asDoFn() { // Safe contravariant cast @SuppressWarnings("unchecked") @@ -102,6 +104,7 @@ public OldDoFn, KV> asDoFn() { return asFn; } + @Override public Aggregator getDroppedDueToLatenessAggregator() { return droppedDueToLateness; } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 48f0f8d6a415..97d53602ac39 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -73,6 +73,8 @@ public static ParDoEvaluator create( ReadyCheckingSideInputReader sideInputReader = evaluationContext.createSideInputReader(sideInputs); + // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, + // and window-exploded processing is achieved within the simple runner DoFnRunner underlying = DoFnRunners.simpleRunner( evaluationContext.getPipelineOptions(), diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java index 4fd5e510ca79..bd6cfbea051e 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java @@ -81,6 +81,8 @@ public Iterable> call( DoFnOutputManager outputManager = new DoFnOutputManager(); + // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, + // and window-exploded processing is achieved within the simple runner DoFnRunner doFnRunner = DoFnRunners.simpleRunner( runtimeContext.getPipelineOptions(), diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index 911e6c5e1fc7..cceffc8134a7 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -88,6 +88,8 @@ public Iterable, WindowedValue>> call( DoFnOutputManager outputManager = new DoFnOutputManager(); + // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, + // and window-exploded processing is achieved within the simple runner DoFnRunner doFnRunner = DoFnRunners.simpleRunner( runtimeContext.getPipelineOptions(), From d6ddce31aad92c4c61ff4614afd37cdb25b6e6aa Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Tue, 31 Jan 2017 21:27:27 +0100 Subject: [PATCH 4/5] Revert "Removes some OldDoFn code from DoFnRunners" This reverts commit 2b26ec8934725a600954ced9c4063766a582396a. --- .../operators/ApexParDoOperator.java | 2 +- .../apache/beam/runners/core/DoFnRunners.java | 137 +++++++++++++++++- .../beam/runners/direct/ParDoEvaluator.java | 9 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../beam/runners/dataflow/util/DoFnInfo.java | 62 ++++---- .../spark/translation/DoFnFunction.java | 11 +- .../spark/translation/MultiDoFnFunction.java | 9 +- .../sdk/transforms/reflect/DoFnInvokers.java | 17 ++- 8 files changed, 194 insertions(+), 55 deletions(-) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index 808001e5825c..f47667e9f6ce 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -305,7 +305,7 @@ public void setup(OperatorContext context) { sideOutputPortMapping.put(sideOutputTags.get(i), port); } - DoFnRunner doFnRunner = DoFnRunners.simpleRunner( + DoFnRunner doFnRunner = DoFnRunners.createDefault( pipelineOptions.get(), doFn, sideInputReader, diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index f3972aea545d..ae8467a26080 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -18,7 +18,9 @@ package org.apache.beam.runners.core; import java.util.List; +import org.apache.beam.runners.core.DoFnRunner.ReduceFnExecutor; import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; @@ -50,7 +52,7 @@ public interface OutputManager { * compressed {@link WindowedValue}. It is the responsibility of the runner to perform any key * partitioning needed, etc. */ - public static DoFnRunner simpleRunner( + static DoFnRunner simpleRunner( PipelineOptions options, DoFn fn, SideInputReader sideInputReader, @@ -116,4 +118,137 @@ DoFnRunner, KV> lateDataDroppingRunner( stepContext.timerInternals(), droppedDueToLatenessAggregator); } + + /** + * Creates a {@link DoFnRunner} for the provided {@link DoFn}. + */ + public static DoFnRunner createDefault( + PipelineOptions options, + DoFn doFn, + SideInputReader sideInputReader, + OutputManager outputManager, + TupleTag mainOutputTag, + List> sideOutputTags, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowingStrategy windowingStrategy) { + + // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, + // and window-exploded processing is achieved within the simple runner + return simpleRunner( + options, + doFn, + sideInputReader, + outputManager, + mainOutputTag, + sideOutputTags, + stepContext, + aggregatorFactory, + windowingStrategy); + } + + /** + * Creates a {@link DoFnRunner} for the provided {@link OldDoFn}. + * + *

In particular, if the {@link OldDoFn} is a {@link ReduceFnExecutor}, a specialized + * implementation detail of streaming {@link GroupAlsoByWindow}, then it will create a special + * runner that operates on {@link KeyedWorkItem KeyedWorkItems}, drops late data and counts + * dropped elements. + * + * @deprecated please port uses of {@link OldDoFn} to use {@link DoFn} + */ + @Deprecated + public static DoFnRunner createDefault( + PipelineOptions options, + OldDoFn doFn, + SideInputReader sideInputReader, + OutputManager outputManager, + TupleTag mainOutputTag, + List> sideOutputTags, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowingStrategy windowingStrategy) { + + DoFnRunner doFnRunner = simpleRunner( + options, + doFn, + sideInputReader, + outputManager, + mainOutputTag, + sideOutputTags, + stepContext, + aggregatorFactory, + windowingStrategy); + + if (!(doFn instanceof ReduceFnExecutor)) { + return doFnRunner; + } else { + // When a DoFn is a ReduceFnExecutor, we know it has to have an aggregator for dropped + // elements and we also learn that for some K and V, + // InputT = KeyedWorkItem + // OutputT = KV + + Aggregator droppedDueToLatenessAggregator = + ((ReduceFnExecutor) doFn).getDroppedDueToLatenessAggregator(); + + @SuppressWarnings({"unchecked", "cast", "rawtypes"}) + DoFnRunner runner = (DoFnRunner) lateDataDroppingRunner( + (DoFnRunner) doFnRunner, + stepContext, + (WindowingStrategy) windowingStrategy, + droppedDueToLatenessAggregator); + + return runner; + } + } + + /** + * Creates the right kind of {@link DoFnRunner} for an object that can be either a {@link DoFn} or + * {@link OldDoFn}. This can be used so that the client need not explicitly reference either such + * class, but merely deserialize a payload and pass it to this method. + * + * @deprecated for migration purposes only for services where users may still submit either {@link + * OldDoFn} or {@link DoFn}. If you know that you have a {@link DoFn} then you should use the + * variant for that instead. + */ + @Deprecated + public static DoFnRunner createDefault( + PipelineOptions options, + Object deserializedFn, + SideInputReader sideInputReader, + OutputManager outputManager, + TupleTag mainOutputTag, + List> sideOutputTags, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowingStrategy windowingStrategy) { + if (deserializedFn instanceof DoFn) { + return createDefault( + options, + (DoFn) deserializedFn, + sideInputReader, + outputManager, + mainOutputTag, + sideOutputTags, + stepContext, + aggregatorFactory, + windowingStrategy); + } else if (deserializedFn instanceof OldDoFn) { + return createDefault( + options, + (OldDoFn) deserializedFn, + sideInputReader, + outputManager, + mainOutputTag, + sideOutputTags, + stepContext, + aggregatorFactory, + windowingStrategy); + } else { + throw new IllegalArgumentException(String.format("Cannot create %s for %s of class %s", + DoFnRunner.class.getSimpleName(), + deserializedFn, + deserializedFn.getClass())); + } + } } diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index 97d53602ac39..e1464707a05f 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.direct; import com.google.common.collect.ImmutableList; +import java.io.Serializable; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -29,7 +30,6 @@ import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.TimerInternals.TimerData; @@ -47,7 +47,7 @@ public static ParDoEvaluator create( DirectStepContext stepContext, AppliedPTransform application, WindowingStrategy windowingStrategy, - DoFn fn, + Serializable fn, // may be OldDoFn or DoFn StructuralKey key, List> sideInputs, TupleTag mainOutputTag, @@ -72,11 +72,8 @@ public static ParDoEvaluator create( ReadyCheckingSideInputReader sideInputReader = evaluationContext.createSideInputReader(sideInputs); - - // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, - // and window-exploded processing is achieved within the simple runner DoFnRunner underlying = - DoFnRunners.simpleRunner( + DoFnRunners.createDefault( evaluationContext.getPipelineOptions(), fn, sideInputReader, diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index ac85b3c090da..2a8f5b7bdf6c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -244,7 +244,7 @@ public Aggregator createAggregatorFor sideInputReader = sideInputHandler; } - DoFnRunner doFnRunner = DoFnRunners.simpleRunner( + DoFnRunner doFnRunner = DoFnRunners.createDefault( serializedOptions.getPipelineOptions(), oldDoFn, sideInputReader, diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index 4d80a39cb3a0..1541d80fd062 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -17,6 +17,8 @@ */ package org.apache.beam.runners.dataflow.util; +import static com.google.common.base.Preconditions.checkState; + import java.io.Serializable; import java.util.Map; import org.apache.beam.sdk.coders.Coder; @@ -26,13 +28,14 @@ import org.apache.beam.sdk.values.TupleTag; /** - * Wrapper class holding the necessary information to serialize a {@link DoFn}. + * Wrapper class holding the necessary information to serialize a {@link OldDoFn} + * or {@link DoFn}. * - * @param the type of the (main) input elements of the {@link DoFn} - * @param the type of the (main) output elements of the {@link DoFn} + * @param the type of the (main) input elements of the {@link OldDoFn} + * @param the type of the (main) output elements of the {@link OldDoFn} */ public class DoFnInfo implements Serializable { - private final DoFn doFn; + private final Serializable doFn; private final WindowingStrategy windowingStrategy; private final Iterable> sideInputViews; private final Coder inputCoder; @@ -42,20 +45,6 @@ public class DoFnInfo implements Serializable { /** * Creates a {@link DoFnInfo} for the given {@link DoFn}. */ - public static DoFnInfo forFn( - DoFn doFn, - WindowingStrategy windowingStrategy, - Iterable> sideInputViews, - Coder inputCoder, - long mainOutput, - Map> outputMap) { - return new DoFnInfo<>( - doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); - } - - /** TODO: remove this when Dataflow worker uses the DoFn overload. */ - @Deprecated - @SuppressWarnings("unchecked") public static DoFnInfo forFn( Serializable doFn, WindowingStrategy windowingStrategy, @@ -63,17 +52,11 @@ public static DoFnInfo forFn( Coder inputCoder, long mainOutput, Map> outputMap) { - return forFn( - (DoFn) doFn, - windowingStrategy, - sideInputViews, - inputCoder, - mainOutput, - outputMap); + return new DoFnInfo(doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); } private DoFnInfo( - DoFn doFn, + Serializable doFn, WindowingStrategy windowingStrategy, Iterable> sideInputViews, Coder inputCoder, @@ -87,15 +70,34 @@ private DoFnInfo( this.outputMap = outputMap; } - /** TODO: remove this when Dataflow worker uses {@link #getDoFn}. */ + /** + * @deprecated use {@link #forFn}. + */ @Deprecated + public DoFnInfo( + OldDoFn doFn, + WindowingStrategy windowingStrategy, + Iterable> sideInputViews, + Coder inputCoder, + long mainOutput, + Map> outputMap) { + this((Serializable) doFn, windowingStrategy, sideInputViews, inputCoder, mainOutput, outputMap); + } + + /** Returns the embedded serialized function. It may be a {@code DoFn} or {@code OldDoFn}. */ public Serializable getFn() { return doFn; } - /** Returns the embedded function. */ - public DoFn getDoFn() { - return doFn; + /** @deprecated use {@link #getFn()} */ + @Deprecated + public OldDoFn getDoFn() { + checkState( + doFn instanceof OldDoFn, + "Deprecated %s.getDoFn() called when the payload was actually a new %s", + DoFnInfo.class.getSimpleName(), + DoFn.class.getSimpleName()); + return (OldDoFn) doFn; } public WindowingStrategy getWindowingStrategy() { diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java index bd6cfbea051e..af8e0897bd36 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/DoFnFunction.java @@ -80,21 +80,18 @@ public Iterable> call( Iterator> iter) throws Exception { DoFnOutputManager outputManager = new DoFnOutputManager(); - - // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, - // and window-exploded processing is achieved within the simple runner DoFnRunner doFnRunner = - DoFnRunners.simpleRunner( + DoFnRunners.createDefault( runtimeContext.getPipelineOptions(), doFn, new SparkSideInputReader(sideInputs), outputManager, - new TupleTag() { - }, + new TupleTag() {}, Collections.>emptyList(), new SparkProcessContext.NoOpStepContext(), new SparkAggregators.Factory(runtimeContext, accumulator), - windowingStrategy); + windowingStrategy + ); return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter); } diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java index cceffc8134a7..0f9417a1fadc 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/MultiDoFnFunction.java @@ -38,6 +38,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.spark.Accumulator; import org.apache.spark.api.java.function.PairFlatMapFunction; + import scala.Tuple2; @@ -87,11 +88,8 @@ public Iterable, WindowedValue>> call( Iterator> iter) throws Exception { DoFnOutputManager outputManager = new DoFnOutputManager(); - - // Unlike for OldDoFn, there is no ReduceFnExecutor that is a new DoFn, - // and window-exploded processing is achieved within the simple runner DoFnRunner doFnRunner = - DoFnRunners.simpleRunner( + DoFnRunners.createDefault( runtimeContext.getPipelineOptions(), doFn, new SparkSideInputReader(sideInputs), @@ -100,7 +98,8 @@ public Iterable, WindowedValue>> call( Collections.>emptyList(), new SparkProcessContext.NoOpStepContext(), new SparkAggregators.Factory(runtimeContext, accumulator), - windowingStrategy); + windowingStrategy + ); return new SparkProcessContext<>(doFn, doFnRunner, outputManager).processPartition(iter); } diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java index 33c5a6ab2d1f..b141d51150ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvokers.java @@ -36,12 +36,21 @@ public static DoFnInvoker invokerFor( return ByteBuddyDoFnInvokerFactory.only().newByteBuddyInvoker(fn); } - /** TODO: remove this when Dataflow worker uses the DoFn overload. */ + /** + * Temporarily retained for compatibility with Dataflow worker. + * TODO: delete this when Dataflow worker is fixed to call {@link #invokerFor(DoFn)}. + * + * @deprecated Use {@link #invokerFor(DoFn)}. + */ + @SuppressWarnings("unchecked") @Deprecated - @SuppressWarnings({"unchecked"}) public static DoFnInvoker invokerFor( - Serializable fn) { - return invokerFor((DoFn) fn); + Serializable deserializedFn) { + if (deserializedFn instanceof DoFn) { + return invokerFor((DoFn) deserializedFn); + } + throw new UnsupportedOperationException( + "Only DoFn supported, was: " + deserializedFn.getClass()); } private DoFnInvokers() {} From 27ad60323974eb4cd889e4586ab21697f6eff172 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Tue, 31 Jan 2017 21:34:07 +0100 Subject: [PATCH 5/5] Revert "Moves OldDoFn to runners-core" This reverts commit 5f8b8c5b06cfd49c4293a20dff2eea99f1076444. --- .../translation/WindowBoundTranslator.java | 2 +- .../operators/ApexGroupByKeyOperator.java | 2 +- .../operators/ApexParDoOperator.java | 2 +- .../beam/runners/core/AssignWindowsDoFn.java | 3 ++- .../beam/runners/core/DoFnAdapters.java | 1 + .../apache/beam/runners/core/DoFnRunner.java | 1 + .../apache/beam/runners/core/DoFnRunners.java | 1 + .../GroupAlsoByWindowViaWindowSetDoFn.java | 1 + .../runners/core/GroupAlsoByWindowsDoFn.java | 1 + .../core/LateDataDroppingDoFnRunner.java | 1 + .../runners/core/SimpleOldDoFnRunner.java | 3 ++- .../runners/core/SimpleOldDoFnRunnerTest.java | 2 +- .../flink/OldPerKeyCombineFnRunner.java | 2 +- .../flink/OldPerKeyCombineFnRunners.java | 2 +- .../functions/FlinkDoFnFunction.java | 2 +- .../FlinkMergingNonShuffleReduceFunction.java | 2 +- .../FlinkMultiOutputDoFnFunction.java | 2 +- .../FlinkMultiOutputProcessContext.java | 2 +- .../FlinkNoElementAssignContext.java | 2 +- .../functions/FlinkPartialReduceFunction.java | 2 +- .../functions/FlinkProcessContextBase.java | 2 +- .../functions/FlinkReduceFunction.java | 2 +- .../FlinkSingleOutputProcessContext.java | 2 +- .../wrappers/streaming/DoFnOperator.java | 2 +- .../streaming/WindowDoFnOperator.java | 2 +- .../beam/runners/dataflow/util/DoFnInfo.java | 4 +++- .../sdk/transforms/DelegatingAggregator.java | 2 +- .../apache/beam/sdk/transforms}/OldDoFn.java | 9 +-------- .../org/apache/beam/sdk/util/NameUtils.java | 2 +- .../DoFnDelegatingAggregatorTest.java | 4 +--- .../beam/sdk/transforms}/NoOpOldDoFn.java | 3 +-- .../sdk/transforms}/OldDoFnContextTest.java | 5 +---- .../beam/sdk/transforms}/OldDoFnTest.java | 7 +------ .../apache/beam/sdk/util/NameUtilsTest.java | 20 +++++++++++-------- 34 files changed, 49 insertions(+), 53 deletions(-) rename {runners/core-java/src/main/java/org/apache/beam/runners/core => sdks/java/core/src/main/java/org/apache/beam/sdk/transforms}/OldDoFn.java (97%) rename {runners/core-java/src/test/java/org/apache/beam/runners/core => sdks/java/core/src/test/java/org/apache/beam/sdk/transforms}/DoFnDelegatingAggregatorTest.java (96%) rename {runners/core-java/src/test/java/org/apache/beam/runners/core => sdks/java/core/src/test/java/org/apache/beam/sdk/transforms}/NoOpOldDoFn.java (96%) rename {runners/core-java/src/test/java/org/apache/beam/runners/core => sdks/java/core/src/test/java/org/apache/beam/sdk/transforms}/OldDoFnContextTest.java (92%) rename {runners/core-java/src/test/java/org/apache/beam/runners/core => sdks/java/core/src/test/java/org/apache/beam/sdk/transforms}/OldDoFnTest.java (95%) diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java index a241cad670d1..190d3b6b0c21 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/WindowBoundTranslator.java @@ -23,8 +23,8 @@ import org.apache.beam.runners.apex.translation.operators.ApexParDoOperator; import org.apache.beam.runners.core.AssignWindowsDoFn; import org.apache.beam.runners.core.DoFnAdapters; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java index 4c2b46121908..173434f66c33 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexGroupByKeyOperator.java @@ -44,7 +44,6 @@ import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.CoderException; @@ -53,6 +52,7 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.GroupByKey; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.CoderUtils; diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java index f47667e9f6ce..1a3387cb76a9 100644 --- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java +++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java @@ -43,7 +43,6 @@ import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.sdk.coders.Coder; @@ -51,6 +50,7 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.NullSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.UserCodeException; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java index bbf3574616bd..0eb1667c3a1c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/AssignWindowsDoFn.java @@ -21,7 +21,8 @@ import com.google.common.collect.Iterables; import java.util.Collection; -import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.WindowFn; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java index 23aba5895b90..0f5624f56166 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnAdapters.java @@ -26,6 +26,7 @@ import org.apache.beam.sdk.transforms.DoFn.Context; import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java index 66f95dbbea97..7c73a3491556 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunner.java @@ -19,6 +19,7 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index ae8467a26080..820bfcd3076e 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -24,6 +24,7 @@ import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java index ecce4fc4e4a2..d79683ada1fe 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowViaWindowSetDoFn.java @@ -21,6 +21,7 @@ import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java index 7e96136bd715..9a2f8fd02037 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/GroupAlsoByWindowsDoFn.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.core; import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.SystemDoFnInternal; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java index 9436ccffbeae..290171ad2280 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataDroppingDoFnRunner.java @@ -22,6 +22,7 @@ import com.google.common.base.Predicate; import com.google.common.collect.Iterables; import org.apache.beam.sdk.transforms.Aggregator; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index 2fe9226593e0..9808e5683069 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -27,10 +27,11 @@ import java.util.Set; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.ExecutionContext.StepContext; -import org.apache.beam.runners.core.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; +import org.apache.beam.sdk.transforms.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java index 97da9ee1c98e..4610069aa64d 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java @@ -23,7 +23,7 @@ import java.util.Arrays; import java.util.List; import org.apache.beam.runners.core.BaseExecutionContext.StepContext; - +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.SystemDoFnInternal; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java index 71c3aa429ae3..5d676dce55de 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunner.java @@ -18,8 +18,8 @@ package org.apache.beam.runners.flink; import java.io.Serializable; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; +import org.apache.beam.sdk.transforms.OldDoFn; /** * An interface that runs a {@link PerKeyCombineFn} with unified APIs using diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java index 90894f239d44..8ebeadf7d775 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/OldPerKeyCombineFnRunners.java @@ -17,13 +17,13 @@ */ package org.apache.beam.runners.flink; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PerKeyCombineFnRunner; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; import org.apache.beam.sdk.transforms.CombineFnBase.PerKeyCombineFn; import org.apache.beam.sdk.transforms.CombineWithContext; import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithContext; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.values.PCollectionView; /** diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 8b2bcc6baacf..2a4a68e7b5bb 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -19,10 +19,10 @@ import java.util.Map; import org.apache.beam.runners.core.DoFnAdapters; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java index 5ec6a773c8e0..1b4317280962 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -24,12 +24,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java index aeeabbf59ecb..a97bd46fc77b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -19,10 +19,10 @@ import java.util.Map; import org.apache.beam.runners.core.DoFnAdapters; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java index 7882b5f7a53e..a3d2b1849dd4 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputProcessContext.java @@ -19,8 +19,8 @@ import java.util.Collection; import java.util.Map; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java index ad7255b27902..c89027262cd9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkNoElementAssignContext.java @@ -17,7 +17,7 @@ */ package org.apache.beam.runners.flink.translation.functions; -import org.apache.beam.runners.core.OldDoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.joda.time.Instant; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index 7db30d113293..f5a908799ce3 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -24,12 +24,12 @@ import java.util.Comparator; import java.util.Iterator; import java.util.Map; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java index e955679ff8fb..53b98038adb9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkProcessContextBase.java @@ -24,11 +24,11 @@ import java.util.Collections; import java.util.Iterator; import java.util.Map; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.TimerInternals; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index 81e37f48c16d..a3fa0d41cc27 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -26,12 +26,12 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunner; import org.apache.beam.runners.flink.OldPerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java index 0db7f5a0764c..529b1cc9e528 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkSingleOutputProcessContext.java @@ -19,8 +19,8 @@ import java.util.Collection; import java.util.Map; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 2a8f5b7bdf6c..95f2bfdfc696 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -33,7 +33,6 @@ import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.ExecutionContext; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SideInputHandler; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; @@ -44,6 +43,7 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.join.RawUnionValue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.CoderUtils; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index d4273b228d02..cd6b5aa98270 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -42,12 +42,12 @@ import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.KeyedWorkItems; -import org.apache.beam.runners.core.OldDoFn; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.flink.translation.wrappers.DataInputViewWrapper; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.TimerInternals; diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java index 1541d80fd062..b84def8dbcea 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/util/DoFnInfo.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; @@ -43,7 +44,8 @@ public class DoFnInfo implements Serializable { private final Map> outputMap; /** - * Creates a {@link DoFnInfo} for the given {@link DoFn}. + * Creates a {@link DoFnInfo} for the given {@link Serializable} object, which is expected to be a + * {@link DoFn} or {@link OldDoFn} or other context-appropriate UDF blob. */ public static DoFnInfo forFn( Serializable doFn, diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java index cfaf0a64bf15..e03d3b15775c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DelegatingAggregator.java @@ -35,7 +35,7 @@ * @param the type of input element * @param the type of output element */ -public class DelegatingAggregator +class DelegatingAggregator implements Aggregator, Serializable { private static final AtomicInteger ID_GEN = new AtomicInteger(); private final int id; diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java similarity index 97% rename from runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java rename to sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java index b099721de8a1..7b0453302a39 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core; +package org.apache.beam.sdk.transforms; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; @@ -30,14 +30,7 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PipelineRunner; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DelegatingAggregator; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java index 72179a35c12d..1c59af74a581 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/NameUtils.java @@ -40,7 +40,7 @@ public interface NameOverride { } private static final String[] STANDARD_NAME_SUFFIXES = - new String[]{"DoFn", "CombineFn", "Fn"}; + new String[]{"OldDoFn", "DoFn", "CombineFn", "Fn"}; /** * Pattern to match a non-anonymous inner class. diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java similarity index 96% rename from runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java rename to sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java index b44e8a42f888..f51a6b0b1539 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/DoFnDelegatingAggregatorTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnDelegatingAggregatorTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core; +package org.apache.beam.sdk.transforms; import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.anyLong; @@ -23,9 +23,7 @@ import static org.mockito.Mockito.never; import static org.mockito.Mockito.verify; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DelegatingAggregator; import org.junit.Before; import org.junit.Rule; import org.junit.Test; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java similarity index 96% rename from runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java rename to sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java index 5cbea8c5e1cf..0db130db59dd 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/NoOpOldDoFn.java @@ -15,10 +15,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core; +package org.apache.beam.sdk.transforms; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java similarity index 92% rename from runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java rename to sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java index a1cd49deb3bc..b5cb286d416d 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnContextTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnContextTest.java @@ -15,15 +15,12 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core; +package org.apache.beam.sdk.transforms; import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; -import org.apache.beam.sdk.transforms.Sum; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java similarity index 95% rename from runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java rename to sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java index 651bc7257d13..1c767b17c9aa 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/OldDoFnTest.java @@ -15,7 +15,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.beam.runners.core; +package org.apache.beam.sdk.transforms; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.isA; @@ -25,12 +25,7 @@ import java.io.Serializable; import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Max; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Instant; diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java index 6848ea440ad4..b81aa366c43b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java @@ -42,18 +42,22 @@ public class NameUtilsTest { @Test public void testDropsStandardSuffixes() { + assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedOldDoFn", true)); assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedDoFn", true)); assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedFn", true)); + assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedOldDoFn", false)); assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedDoFn", false)); assertEquals("Embedded", NameUtils.approximateSimpleName("EmbeddedFn", false)); } @Test public void testDropsStandardSuffixesInAllComponents() { + assertEquals("Embedded", NameUtils.approximateSimpleName("SomeOldDoFn$EmbeddedFn", true)); assertEquals("Embedded", NameUtils.approximateSimpleName("SomeDoFn$EmbeddedDoFn", true)); assertEquals("Embedded", NameUtils.approximateSimpleName("SomeFn$EmbeddedFn", true)); + assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeOldDoFn$EmbeddedFn", false)); assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeDoFn$EmbeddedDoFn", false)); assertEquals("Some.Embedded", NameUtils.approximateSimpleName("SomeFn$EmbeddedFn", false)); } @@ -75,12 +79,12 @@ public void testDropsOuterClassNamesFalse() { /** * Inner class for simple name test. */ - private class EmbeddedDoFn { + private class EmbeddedOldDoFn { - private class DeeperEmbeddedDoFn extends EmbeddedDoFn {} + private class DeeperEmbeddedOldDoFn extends EmbeddedOldDoFn {} - private EmbeddedDoFn getEmbedded() { - return new DeeperEmbeddedDoFn(); + private EmbeddedOldDoFn getEmbedded() { + return new DeeperEmbeddedOldDoFn(); } } @@ -108,18 +112,18 @@ private interface AnonymousClass { @Test public void testSimpleName() { - assertEquals("Embedded", NameUtils.approximateSimpleName(new EmbeddedDoFn())); + assertEquals("Embedded", NameUtils.approximateSimpleName(new EmbeddedOldDoFn())); } @Test public void testAnonSimpleName() throws Exception { - assertEquals("Anonymous", NameUtils.approximateSimpleName(new EmbeddedDoFn() {})); + assertEquals("Anonymous", NameUtils.approximateSimpleName(new EmbeddedOldDoFn() {})); } @Test public void testNestedSimpleName() { - EmbeddedDoFn fn = new EmbeddedDoFn(); - EmbeddedDoFn inner = fn.getEmbedded(); + EmbeddedOldDoFn fn = new EmbeddedOldDoFn(); + EmbeddedOldDoFn inner = fn.getEmbedded(); assertEquals("DeeperEmbedded", NameUtils.approximateSimpleName(inner)); }