From 57195358592548e6f7e05bc8e4e292b126a726c5 Mon Sep 17 00:00:00 2001 From: Ben Chambers Date: Thu, 23 Jun 2016 22:27:05 -0700 Subject: [PATCH 1/2] Remove many uses of .named methods Specifically, remove uses of: - Window.named - AvroIO.named - PubSubIO.named - TextIO.named - BigQueryIO.named - Read.named --- .../beam/examples/DebuggingWordCount.java | 2 +- .../org/apache/beam/examples/WordCount.java | 4 +- .../apache/beam/examples/complete/TfIdf.java | 3 +- .../complete/TopWikipediaSessions.java | 2 +- .../examples/cookbook/DatastoreWordCount.java | 2 +- .../beam/examples/cookbook/DeDupExample.java | 5 +- .../examples/cookbook/TriggerExample.java | 4 +- .../examples/complete/game/GameStats.java | 28 +-- .../complete/game/HourlyTeamScore.java | 5 +- .../examples/complete/game/LeaderBoard.java | 8 +- .../beam/runners/flink/examples/TFIDF.java | 3 +- .../runners/flink/examples/WordCount.java | 4 +- .../examples/streaming/AutoComplete.java | 9 +- .../examples/streaming/JoinExamples.java | 13 +- .../KafkaWindowedWordCountExample.java | 2 +- .../examples/streaming/WindowedWordCount.java | 3 +- .../beam/runners/dataflow/DataflowRunner.java | 2 +- .../DataflowPipelineTranslatorTest.java | 6 +- .../runners/dataflow/DataflowRunnerTest.java | 18 +- .../runners/spark/SimpleWordCountTest.java | 3 +- .../org/apache/beam/sdk/io/BigQueryIO.java | 1 - .../beam/sdk/io/AvroIOGeneratedClassTest.java | 192 +++++------------- .../org/apache/beam/sdk/io/AvroIOTest.java | 5 +- .../apache/beam/sdk/io/BigQueryIOTest.java | 82 +++----- .../beam/sdk/io/FileBasedSourceTest.java | 5 +- .../org/apache/beam/sdk/io/PubsubIOTest.java | 4 - .../org/apache/beam/sdk/io/TextIOTest.java | 37 +--- .../org/apache/beam/sdk/io/XmlSourceTest.java | 19 +- .../beam/sdk/runners/TransformTreeTest.java | 4 +- .../sdk/transforms/windowing/WindowTest.java | 6 +- .../transforms/windowing/WindowingTest.java | 2 +- .../src/main/java/DebuggingWordCount.java | 2 +- .../src/main/java/WordCount.java | 4 +- 33 files changed, 158 insertions(+), 331 deletions(-) diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java index 85823c2005cb1..8d85d44baefb7 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java @@ -173,7 +173,7 @@ public static void main(String[] args) { Pipeline p = Pipeline.create(options); PCollection> filteredWords = - p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) + p.apply("ReadLines", TextIO.Read.from(options.getInputFile())) .apply(new WordCount.CountWords()) .apply(ParDo.of(new FilterTextFn(options.getFilterPattern()))); diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java index cf6c45aac4a0e..af16c445a7ac5 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java @@ -205,10 +205,10 @@ public static void main(String[] args) { // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the // static FormatAsTextFn() to the ParDo transform. 
- p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile())) + p.apply("ReadLines", TextIO.Read.from(options.getInputFile())) .apply(new CountWords()) .apply(MapElements.via(new FormatAsTextFn())) - .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); + .apply("WriteCounts", TextIO.Write.to(options.getOutput())); p.run(); } diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java index 23653d6aab643..83053142737eb 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java @@ -187,8 +187,7 @@ public PCollection> apply(PInput input) { } PCollection> oneUriToLines = pipeline - .apply(TextIO.Read.from(uriString) - .named("TextIO.Read(" + uriString + ")")) + .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString)) .apply("WithKeys(" + uriString + ")", WithKeys.of(uri)); urisToLines = urisToLines.and(oneUriToLines); diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java index 5d95e3f4e6edb..80b3ade94fe6b 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java +++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java @@ -217,7 +217,7 @@ public static void main(String[] args) { .from(options.getInput()) .withCoder(TableRowJsonCoder.of())) .apply(new ComputeTopSessions(samplingThreshold)) - .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput())); + .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput())); p.run(); } diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java index 7578d7955bc4a..b070f9409d350 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java @@ -193,7 +193,7 @@ public static interface Options extends PipelineOptions { */ public static void writeDataToDatastore(Options options) { Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) + p.apply("ReadLines", TextIO.Read.from(options.getInput())) .apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind()))) .apply(DatastoreIO.writeTo(options.getProject())); diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java index db65543101bf4..d573bcd9d295e 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java @@ -89,10 +89,9 @@ public static void main(String[] args) Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) + p.apply("ReadLines", TextIO.Read.from(options.getInput())) .apply(RemoveDuplicates.create()) - .apply(TextIO.Write.named("DedupedShakespeare") - .to(options.getOutput())); + .apply("DedupedShakespeare", 
TextIO.Write.to(options.getOutput())); p.run(); } diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java index ab1fb6620954c..ff4909b610cce 100644 --- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java +++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java @@ -467,7 +467,7 @@ public static void main(String[] args) throws Exception { TableReference tableRef = getTableReference(options.getProject(), options.getBigQueryDataset(), options.getBigQueryTable()); - PCollectionList resultList = pipeline.apply(PubsubIO.Read.named("ReadPubsubInput") + PCollectionList resultList = pipeline.apply("ReadPubsubInput", PubsubIO.Read .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY) .topic(options.getPubsubTopic())) .apply(ParDo.of(new ExtractFlowInfo())) @@ -493,7 +493,7 @@ private static Pipeline runInjector(TrafficFlowOptions options){ copiedOptions.setJobName(options.getJobName() + "-injector"); Pipeline injectorPipeline = Pipeline.create(copiedOptions); injectorPipeline - .apply(TextIO.Read.named("ReadMyFile").from(options.getInput())) + .apply("ReadMyFile", TextIO.Read.from(options.getInput())) .apply("InsertRandomDelays", ParDo.of(new InsertDelays())) .apply(IntraBundleParallelization.of(PubsubFileInjector .withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY) diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java index ad8b49e0ab58f..b1cb312f58a17 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java @@ -260,10 +260,8 @@ public static void main(String[] args) throws Exception { // Calculate the total score per user over fixed windows, and // cumulative updates for late data. final PCollectionView> spammersView = userEvents - .apply(Window.named("FixedWindowsUser") - .>into(FixedWindows.of( - Duration.standardMinutes(options.getFixedWindowDuration()))) - ) + .apply("FixedWindowsUser", Window.>into( + FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration())))) // Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate. // These might be robots/spammers. @@ -278,10 +276,8 @@ public static void main(String[] args) throws Exception { // suspected robots-- to filter out scores from those users from the sum. // Write the results to BigQuery. rawEvents - .apply(Window.named("WindowIntoFixedWindows") - .into(FixedWindows.of( - Duration.standardMinutes(options.getFixedWindowDuration()))) - ) + .apply("WindowIntoFixedWindows", Window.into( + FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration())))) // Filter out the detected spammer users, using the side input derived above. 
.apply("FilterOutSpammers", ParDo .withSideInputs(spammersView) @@ -299,8 +295,8 @@ public void processElement(ProcessContext c) { // [END DocInclude_FilterAndCalc] // Write the result to BigQuery .apply("WriteTeamSums", - new WriteWindowedToBigQuery>( - options.getTablePrefix() + "_team", configureWindowedWrite())); + new WriteWindowedToBigQuery>( + options.getTablePrefix() + "_team", configureWindowedWrite())); // [START DocInclude_SessionCalc] @@ -309,10 +305,9 @@ public void processElement(ProcessContext c) { // This information could help the game designers track the changing user engagement // as their set of games changes. userEvents - .apply(Window.named("WindowIntoSessions") - .>into( - Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap()))) - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) + .apply("WindowIntoSessions", Window.>into( + Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap()))) + .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())) // For this use, we care only about the existence of the session, not any particular // information aggregated over it, so the following is an efficient way to do that. .apply(Combine.perKey(x -> 0)) @@ -321,9 +316,8 @@ public void processElement(ProcessContext c) { // [END DocInclude_SessionCalc] // [START DocInclude_Rewindow] // Re-window to process groups of session sums according to when the sessions complete. - .apply(Window.named("WindowToExtractSessionMean") - .into( - FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration())))) + .apply("WindowToExtractSessionMean", Window.into( + FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration())))) // Find the mean session duration in each window. .apply(Mean.globally().withoutDefaults()) // Write this info to a BigQuery table. diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java index 7a808acd0fcb5..e489607dee4d6 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java @@ -178,9 +178,8 @@ public static void main(String[] args) throws Exception { // Add an element timestamp based on the event log, and apply fixed windowing. .apply("AddEventTimestamps", WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp()))) - .apply(Window.named("FixedWindowsTeam") - .into(FixedWindows.of( - Duration.standardMinutes(options.getWindowDuration())))) + .apply("FixedWindowsTeam", Window.into( + FixedWindows.of(Duration.standardMinutes(options.getWindowDuration())))) // [END DocInclude_HTSAddTsAndWindow] // Extract and sum teamname/score pairs from the event data. diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java index 2c608aa094835..a14d533413373 100644 --- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java +++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java @@ -190,9 +190,8 @@ public static void main(String[] args) throws Exception { // [START DocInclude_WindowAndTrigger] // Extract team/score pairs from the event stream, using hour-long windows by default. 
gameEvents
-        .apply(Window.named("LeaderboardTeamFixedWindows")
-            .into(FixedWindows.of(
-                Duration.standardMinutes(options.getTeamWindowDuration())))
+        .apply("LeaderboardTeamFixedWindows", Window.into(
+            FixedWindows.of(Duration.standardMinutes(options.getTeamWindowDuration())))
           // We will get early (speculative) results as well as cumulative
           // processing of late data.
           .triggering(
@@ -215,8 +214,7 @@ public static void main(String[] args) throws Exception {
     // Extract user/score pairs from the event stream using processing time, via global windowing.
     // Get periodic updates on all users' running scores.
     gameEvents
-        .apply(Window.named("LeaderboardUserGlobalWindow")
-            .into(new GlobalWindows())
+        .apply("LeaderboardUserGlobalWindow", Window.into(new GlobalWindows())
            // Get periodic results every ten minutes.
            .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                .plusDelayOf(TEN_MINUTES)))
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 084ac7c16c7a4..56737a4c8e1ae 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -191,8 +191,7 @@ public PCollection<KV<URI, String>> apply(PInput input) {
       }
 
       PCollection<KV<URI, String>> oneUriToLines = pipeline
-          .apply(TextIO.Read.from(uriString)
-              .named("TextIO.Read(" + uriString + ")"))
+          .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
           .apply("WithKeys(" + uriString + ")", WithKeys.of(uri));
 
       urisToLines = urisToLines.and(oneUriToLines);
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index 28176228a2110..2d95c978b3ffb 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -109,10 +109,10 @@ public static void main(String[] args) {
 
     Pipeline p = Pipeline.create(options);
 
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+    p.apply("ReadLines", TextIO.Read.from(options.getInput()))
         .apply(new CountWords())
         .apply(MapElements.via(new FormatAsTextFn()))
-        .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+        .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
     p.run();
   }
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index ed1178130ff65..c0ff85d6e80bb 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -44,7 +44,6 @@
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 
@@ -380,14 +379,14 @@ public static void main(String[] args) throws IOException {
     options.setExecutionRetryDelay(3000L);
     options.setRunner(FlinkRunner.class);
 
-    PTransform<PBegin, PCollection<String>> readSource =
-        Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream");
-    WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+    WindowFn<Object, ?> windowFn =
+        FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
 
     // Create the pipeline.
     Pipeline p = Pipeline.create(options);
     PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
-      .apply(readSource)
+      .apply("WordStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
       .apply(ParDo.of(new ExtractWordsFn()))
       .apply(Window.<String>into(windowFn)
               .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index 0828ddc580012..f456b2730845d 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -24,7 +24,6 @@
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -34,7 +33,6 @@
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -135,22 +133,19 @@ public static void main(String[] args) throws Exception {
     options.setExecutionRetryDelay(3000L);
     options.setRunner(FlinkRunner.class);
 
-    PTransform<PBegin, PCollection<String>> readSourceA =
-        Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");
-    PTransform<PBegin, PCollection<String>> readSourceB =
-        Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream");
-
     WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
 
     Pipeline p = Pipeline.create(options);
 
     // the following two 'applys' create multiple inputs to our pipeline, one for each
     // of our two input sources.
- PCollection streamA = p.apply(readSourceA) + PCollection streamA = p + .apply("FirstStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3))) .apply(Window.into(windowFn) .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) .discardingFiredPanes()); - PCollection streamB = p.apply(readSourceB) + PCollection streamB = p + .apply("SecondStream", Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3))) .apply(Window.into(windowFn) .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) .discardingFiredPanes()); diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java index b14c5ae526630..4e81420506c80 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java @@ -132,7 +132,7 @@ public static void main(String[] args) { new SimpleStringSchema(), p); PCollection words = pipeline - .apply(Read.named("StreamingWordCount").from(UnboundedFlinkSource.of(kafkaConsumer))) + .apply("StreamingWordCount", Read.from(UnboundedFlinkSource.of(kafkaConsumer))) .apply(ParDo.of(new ExtractWordsFn())) .apply(Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize()))) .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java index f72b705a54302..1b532a70efa44 100644 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java +++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java @@ -119,7 +119,8 @@ public static void main(String[] args) throws IOException { Pipeline pipeline = Pipeline.create(options); PCollection words = pipeline - .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount")) + .apply("StreamingWordCount", + Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3))) .apply(ParDo.of(new ExtractWordsFn())) .apply(Window.into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize())) .every(Duration.standardSeconds(options.getSlide()))) diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index d47d285d13bfa..7ff247a96d585 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -2686,7 +2686,7 @@ public PCollection apply(PInput input) { try { Coder coder = transform.getDefaultOutputCoder(input); return Pipeline.applyTransform( - input, PubsubIO.Read.named("StartingSignal").subscription("_starting_signal/")) + "StartingSignal", input, PubsubIO.Read.subscription("_starting_signal/")) .apply(ParDo.of(new OutputNullKv())) .apply("GlobalSingleton", 
Window.>into(new GlobalWindows()) .triggering(AfterPane.elementCountAtLeast(1)) diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java index c3a6a1140e128..e04a1fcb1e294 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java @@ -127,8 +127,8 @@ private Pipeline buildPipeline(DataflowPipelineOptions options) { options.setRunner(DataflowRunner.class); Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object")) - .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object")); + p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object")) + .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object")); return p; } @@ -458,7 +458,7 @@ public void testPredefinedAddStep() throws Exception { // Create a pipeline that the predefined step will be embedded into Pipeline pipeline = Pipeline.create(options); - pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in")) + pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in")) .apply(ParDo.of(new NoOpFn())) .apply(new EmbeddedTransform(predefinedStep.clone())) .apply(ParDo.of(new NoOpFn())); diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java index e094d0d3ed382..999dc3ad22ac8 100644 --- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java +++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.dataflow; import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.equalTo; @@ -151,8 +152,8 @@ private Pipeline buildDataflowPipeline(DataflowPipelineOptions options) { options.setRunner(DataflowRunner.class); Pipeline p = Pipeline.create(options); - p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object")) - .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object")); + p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object")) + .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object")); return p; } @@ -464,7 +465,7 @@ public void testNonGcsFilePathInReadFailure() throws IOException { ArgumentCaptor jobCaptor = ArgumentCaptor.forClass(Job.class); Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); - p.apply(TextIO.Read.named("ReadMyNonGcsFile").from(tmpFolder.newFile().getPath())); + p.apply("ReadMyNonGcsFile", TextIO.Read.from(tmpFolder.newFile().getPath())); thrown.expectCause(Matchers.allOf( instanceOf(IllegalArgumentException.class), @@ -477,11 +478,11 @@ public void testNonGcsFilePathInReadFailure() throws IOException { @Test public void testNonGcsFilePathInWriteFailure() throws IOException { Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - PCollection pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")); + PCollection pc = 
p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object")); thrown.expect(IllegalArgumentException.class); thrown.expectMessage(containsString("expected a valid 'gs://' path but was given")); - pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file")); + pc.apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file")); } @Test @@ -489,8 +490,7 @@ public void testMultiSlashGcsFileReadPath() throws IOException { ArgumentCaptor jobCaptor = ArgumentCaptor.forClass(Job.class); Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor)); - p.apply(TextIO.Read.named("ReadInvalidGcsFile") - .from("gs://bucket/tmp//file")); + p.apply("ReadInvalidGcsFile", TextIO.Read.from("gs://bucket/tmp//file")); thrown.expectCause(Matchers.allOf( instanceOf(IllegalArgumentException.class), @@ -502,11 +502,11 @@ public void testMultiSlashGcsFileReadPath() throws IOException { @Test public void testMultiSlashGcsFileWritePath() throws IOException { Pipeline p = buildDataflowPipeline(buildPipelineOptions()); - PCollection pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object")); + PCollection pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object")); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("consecutive slashes"); - pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file")); + pc.apply("WriteInvalidGcsFile", TextIO.Write.to("gs://bucket/tmp//file")); } @Test diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java index da3fa7af9f973..6a3edd7e3cd5a 100644 --- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java +++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java @@ -89,8 +89,7 @@ public void testOutputFile() throws Exception { PCollection output = inputWords.apply(new CountWords()); File outputFile = testFolder.newFile(); - output.apply( - TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding()); + output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding()); EvaluationResult res = SparkRunner.create().run(p); res.close(); diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java index 6a36c8dfeaa1b..7cac7052fb50a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java @@ -518,7 +518,6 @@ public void validate(PInput input) { + " query without a result flattening preference"); } - // Only verify existence/correctness if validation is enabled. if (validate) { // Check for source table/query presence for early failure notification. 
// Note that a presence check can fail if the table or dataset are created by earlier diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java index da886dee37930..6e26d33d92e4b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java @@ -136,6 +136,18 @@ private List readAvroFile() throws IOException { return users; } + void runTestRead( + String applyName, AvroIO.Read.Bound read, String expectedName, T[] expectedOutput) + throws Exception { + generateAvroFile(generateAvroObjects()); + + TestPipeline p = TestPipeline.create(); + PCollection output = p.apply(applyName, read); + PAssert.that(output).containsInAnyOrder(expectedOutput); + p.run(); + assertEquals(expectedName, output.getName()); + } + void runTestRead(AvroIO.Read.Bound read, String expectedName, T[] expectedOutput) throws Exception { generateAvroFile(generateAvroObjects()); @@ -158,28 +170,16 @@ public void testReadFromGeneratedClass() throws Exception { AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()), "AvroIO.Read/Read.out", generateAvroObjects()); - runTestRead( - AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(AvroGeneratedUser.class), + runTestRead("MyRead", + AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class), "MyRead/Read.out", generateAvroObjects()); - runTestRead( - AvroIO.Read.named("MyRead").withSchema(AvroGeneratedUser.class).from(avroFile.getPath()), + runTestRead("MyRead", + AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()), "MyRead/Read.out", generateAvroObjects()); - runTestRead( - AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class).named("HerRead"), - "HerRead/Read.out", - generateAvroObjects()); - runTestRead( - AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(AvroGeneratedUser.class), - "HerRead/Read.out", - generateAvroObjects()); - runTestRead( - AvroIO.Read.withSchema(AvroGeneratedUser.class).named("HerRead").from(avroFile.getPath()), - "HerRead/Read.out", - generateAvroObjects()); - runTestRead( - AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()).named("HerRead"), + runTestRead("HerRead", + AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class), "HerRead/Read.out", generateAvroObjects()); } @@ -195,28 +195,20 @@ public void testReadFromSchema() throws Exception { AvroIO.Read.withSchema(schema).from(avroFile.getPath()), "AvroIO.Read/Read.out", generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schema), + runTestRead("MyRead", + AvroIO.Read.from(avroFile.getPath()).withSchema(schema), "MyRead/Read.out", generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.named("MyRead").withSchema(schema).from(avroFile.getPath()), + runTestRead("MyRead", + AvroIO.Read.withSchema(schema).from(avroFile.getPath()), "MyRead/Read.out", generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.from(avroFile.getPath()).withSchema(schema).named("HerRead"), - "HerRead/Read.out", - generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schema), - "HerRead/Read.out", - generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.withSchema(schema).named("HerRead").from(avroFile.getPath()), + 
runTestRead("HerRead", + AvroIO.Read.from(avroFile.getPath()).withSchema(schema), "HerRead/Read.out", generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.withSchema(schema).from(avroFile.getPath()).named("HerRead"), + runTestRead("HerRead", + AvroIO.Read.from(avroFile.getPath()).withSchema(schema), "HerRead/Read.out", generateAvroGenericRecords()); } @@ -232,28 +224,12 @@ public void testReadFromSchemaString() throws Exception { AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()), "AvroIO.Read/Read.out", generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schemaString), - "MyRead/Read.out", - generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.named("MyRead").withSchema(schemaString).from(avroFile.getPath()), + runTestRead("MyRead", + AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString), "MyRead/Read.out", generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString).named("HerRead"), - "HerRead/Read.out", - generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schemaString), - "HerRead/Read.out", - generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.withSchema(schemaString).named("HerRead").from(avroFile.getPath()), - "HerRead/Read.out", - generateAvroGenericRecords()); - runTestRead( - AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()).named("HerRead"), + runTestRead("HerRead", + AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()), "HerRead/Read.out", generateAvroGenericRecords()); } @@ -276,106 +252,34 @@ void runTestWrite(AvroIO.Write.Bound write, String expectedName) @Test @Category(NeedsRunner.class) public void testWriteFromGeneratedClass() throws Exception { - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .withSchema(AvroGeneratedUser.class), - "AvroIO.Write"); - runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class) - .to(avroFile.getPath()), - "AvroIO.Write"); - runTestWrite(AvroIO.Write.named("MyWrite") - .to(avroFile.getPath()) - .withSchema(AvroGeneratedUser.class), - "MyWrite"); - runTestWrite(AvroIO.Write.named("MyWrite") - .withSchema(AvroGeneratedUser.class) - .to(avroFile.getPath()), - "MyWrite"); - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .withSchema(AvroGeneratedUser.class) - .named("HerWrite"), - "HerWrite"); - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .named("HerWrite") - .withSchema(AvroGeneratedUser.class), - "HerWrite"); - runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class) - .named("HerWrite") - .to(avroFile.getPath()), - "HerWrite"); - runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class) - .to(avroFile.getPath()) - .named("HerWrite"), - "HerWrite"); + runTestWrite( + AvroIO.Write.to(avroFile.getPath()).withSchema(AvroGeneratedUser.class), + "AvroIO.Write"); + runTestWrite( + AvroIO.Write.withSchema(AvroGeneratedUser.class).to(avroFile.getPath()), + "AvroIO.Write"); } @Test @Category(NeedsRunner.class) public void testWriteFromSchema() throws Exception { - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .withSchema(schema), - "AvroIO.Write"); - runTestWrite(AvroIO.Write.withSchema(schema) - .to(avroFile.getPath()), - "AvroIO.Write"); - runTestWrite(AvroIO.Write.named("MyWrite") - .to(avroFile.getPath()) - .withSchema(schema), - "MyWrite"); - runTestWrite(AvroIO.Write.named("MyWrite") - .withSchema(schema) - .to(avroFile.getPath()), - "MyWrite"); - 
runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .withSchema(schema) - .named("HerWrite"), - "HerWrite"); - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .named("HerWrite") - .withSchema(schema), - "HerWrite"); - runTestWrite(AvroIO.Write.withSchema(schema) - .named("HerWrite") - .to(avroFile.getPath()), - "HerWrite"); - runTestWrite(AvroIO.Write.withSchema(schema) - .to(avroFile.getPath()) - .named("HerWrite"), - "HerWrite"); + runTestWrite( + AvroIO.Write.to(avroFile.getPath()).withSchema(schema), + "AvroIO.Write"); + runTestWrite( + AvroIO.Write.withSchema(schema).to(avroFile.getPath()), + "AvroIO.Write"); } @Test @Category(NeedsRunner.class) public void testWriteFromSchemaString() throws Exception { - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .withSchema(schemaString), - "AvroIO.Write"); - runTestWrite(AvroIO.Write.withSchema(schemaString) - .to(avroFile.getPath()), - "AvroIO.Write"); - runTestWrite(AvroIO.Write.named("MyWrite") - .to(avroFile.getPath()) - .withSchema(schemaString), - "MyWrite"); - runTestWrite(AvroIO.Write.named("MyWrite") - .withSchema(schemaString) - .to(avroFile.getPath()), - "MyWrite"); - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .withSchema(schemaString) - .named("HerWrite"), - "HerWrite"); - runTestWrite(AvroIO.Write.to(avroFile.getPath()) - .named("HerWrite") - .withSchema(schemaString), - "HerWrite"); - runTestWrite(AvroIO.Write.withSchema(schemaString) - .named("HerWrite") - .to(avroFile.getPath()), - "HerWrite"); - runTestWrite(AvroIO.Write.withSchema(schemaString) - .to(avroFile.getPath()) - .named("HerWrite"), - "HerWrite"); + runTestWrite( + AvroIO.Write.to(avroFile.getPath()).withSchema(schemaString), + "AvroIO.Write"); + runTestWrite( + AvroIO.Write.withSchema(schemaString).to(avroFile.getPath()), + "AvroIO.Write"); } // TODO: for Write only, test withSuffix, withNumShards, diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java index 13c1bcf2a76da..8625b1050101c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -81,10 +82,6 @@ public void testWriteWithoutValidationFlag() throws Exception { public void testAvroIOGetName() { assertEquals("AvroIO.Read", AvroIO.Read.from("gs://bucket/foo*/baz").getName()); assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName()); - assertEquals("ReadMyFile", - AvroIO.Read.named("ReadMyFile").from("gs://bucket/foo*/baz").getName()); - assertEquals("WriteMyFile", - AvroIO.Write.named("WriteMyFile").to("gs://bucket/foo/baz").getName()); } @DefaultCoder(AvroCoder.class) diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java index a1daf722d3579..f0d3fcece2757 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java @@ -340,8 +340,7 @@ private void checkReadTableObject( checkReadTableObjectWithValidate(bound, project, dataset, table, true); } - private void checkReadQueryObject( - 
BigQueryIO.Read.Bound bound, String query) { + private void checkReadQueryObject(BigQueryIO.Read.Bound bound, String query) { checkReadQueryObjectWithValidate(bound, query, true); } @@ -393,15 +392,13 @@ public void setUp() throws IOException { @Test public void testBuildTableBasedSource() { - BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable") - .from("foo.com:project:somedataset.sometable"); + BigQueryIO.Read.Bound bound = BigQueryIO.Read.from("foo.com:project:somedataset.sometable"); checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable"); } @Test public void testBuildQueryBasedSource() { - BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyQuery") - .fromQuery("foo_query"); + BigQueryIO.Read.Bound bound = BigQueryIO.Read.fromQuery("foo_query"); checkReadQueryObject(bound, "foo_query"); } @@ -409,8 +406,8 @@ public void testBuildQueryBasedSource() { public void testBuildTableBasedSourceWithoutValidation() { // This test just checks that using withoutValidation will not trigger object // construction errors. - BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable") - .from("foo.com:project:somedataset.sometable").withoutValidation(); + BigQueryIO.Read.Bound bound = + BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation(); checkReadTableObjectWithValidate(bound, "foo.com:project", "somedataset", "sometable", false); } @@ -418,15 +415,15 @@ public void testBuildTableBasedSourceWithoutValidation() { public void testBuildQueryBasedSourceWithoutValidation() { // This test just checks that using withoutValidation will not trigger object // construction errors. - BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable") - .fromQuery("some_query").withoutValidation(); + BigQueryIO.Read.Bound bound = + BigQueryIO.Read.fromQuery("some_query").withoutValidation(); checkReadQueryObjectWithValidate(bound, "some_query", false); } @Test public void testBuildTableBasedSourceWithDefaultProject() { - BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable") - .from("somedataset.sometable"); + BigQueryIO.Read.Bound bound = + BigQueryIO.Read.from("somedataset.sometable"); checkReadTableObject(bound, null, "somedataset", "sometable"); } @@ -436,8 +433,7 @@ public void testBuildSourceWithTableReference() { .setProjectId("foo.com:project") .setDatasetId("somedataset") .setTableId("sometable"); - BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable") - .from(table); + BigQueryIO.Read.Bound bound = BigQueryIO.Read.from(table); checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable"); } @@ -457,18 +453,7 @@ public void testValidateReadSetsDefaultProject() { thrown.expectMessage( Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence")) .or(Matchers.containsString("BigQuery dataset not found for table"))); - p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef)); - } - - @Test - @Category(RunnableOnService.class) - public void testBuildSourceWithoutTableOrQuery() { - Pipeline p = TestPipeline.create(); - thrown.expect(IllegalStateException.class); - thrown.expectMessage( - "Invalid BigQuery read operation, either table reference or query has to be set"); - p.apply(BigQueryIO.Read.named("ReadMyTable")); - p.run(); + p.apply(BigQueryIO.Read.from(tableRef)); } @Test @@ -490,8 +475,8 @@ public void testBuildSourceWithTableAndQuery() { thrown.expectMessage( "Invalid BigQuery read operation. 
Specifies both a query and a table, only one of these" + " should be provided"); - p.apply( - BigQueryIO.Read.named("ReadMyTable") + p.apply("ReadMyTable", + BigQueryIO.Read .from("foo.com:project:somedataset.sometable") .fromQuery("query")); p.run(); @@ -505,8 +490,8 @@ public void testBuildSourceWithTableAndFlatten() { thrown.expectMessage( "Invalid BigQuery read operation. Specifies a" + " table with a result flattening preference, which is not configurable"); - p.apply( - BigQueryIO.Read.named("ReadMyTable") + p.apply("ReadMyTable", + BigQueryIO.Read .from("foo.com:project:somedataset.sometable") .withoutResultFlattening()); p.run(); @@ -521,7 +506,7 @@ public void testBuildSourceWithTableAndFlattenWithoutValidation() { "Invalid BigQuery read operation. Specifies a" + " table with a result flattening preference, which is not configurable"); p.apply( - BigQueryIO.Read.named("ReadMyTable") + BigQueryIO.Read .from("foo.com:project:somedataset.sometable") .withoutValidation() .withoutResultFlattening()); @@ -644,8 +629,7 @@ public void testBuildSourceDisplayData() { @Test public void testBuildSink() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") - .to("foo.com:project:somedataset.sometable"); + BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable"); checkWriteObject( bound, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY); @@ -655,8 +639,8 @@ public void testBuildSink() { public void testBuildSinkwithoutValidation() { // This test just checks that using withoutValidation will not trigger object // construction errors. - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") - .to("foo.com:project:somedataset.sometable").withoutValidation(); + BigQueryIO.Write.Bound bound = + BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation(); checkWriteObjectWithValidate( bound, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, false); @@ -664,8 +648,7 @@ public void testBuildSinkwithoutValidation() { @Test public void testBuildSinkDefaultProject() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") - .to("somedataset.sometable"); + BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable"); checkWriteObject( bound, null, "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY); @@ -677,8 +660,7 @@ public void testBuildSinkWithTableReference() { .setProjectId("foo.com:project") .setDatasetId("somedataset") .setTableId("sometable"); - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") - .to(table); + BigQueryIO.Write.Bound bound = BigQueryIO.Write.to(table); checkWriteObject( bound, "foo.com:project", "somedataset", "sometable", null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY); @@ -691,14 +673,14 @@ public void testBuildSinkWithoutTable() { thrown.expect(IllegalStateException.class); thrown.expectMessage("must set the table reference"); p.apply(Create.of().withCoder(TableRowJsonCoder.of())) - .apply(BigQueryIO.Write.named("WriteMyTable")); + .apply(BigQueryIO.Write.withoutValidation()); } @Test public void testBuildSinkWithSchema() { TableSchema schema = new TableSchema(); - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") - .to("foo.com:project:somedataset.sometable").withSchema(schema); + BigQueryIO.Write.Bound bound = 
+ BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema); checkWriteObject( bound, "foo.com:project", "somedataset", "sometable", schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY); @@ -706,7 +688,7 @@ public void testBuildSinkWithSchema() { @Test public void testBuildSinkWithCreateDispositionNever() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") + BigQueryIO.Write.Bound bound = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withCreateDisposition(CreateDisposition.CREATE_NEVER); checkWriteObject( @@ -716,7 +698,7 @@ public void testBuildSinkWithCreateDispositionNever() { @Test public void testBuildSinkWithCreateDispositionIfNeeded() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") + BigQueryIO.Write.Bound bound = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED); checkWriteObject( @@ -726,7 +708,7 @@ public void testBuildSinkWithCreateDispositionIfNeeded() { @Test public void testBuildSinkWithWriteDispositionTruncate() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") + BigQueryIO.Write.Bound bound = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE); checkWriteObject( @@ -736,7 +718,7 @@ public void testBuildSinkWithWriteDispositionTruncate() { @Test public void testBuildSinkWithWriteDispositionAppend() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") + BigQueryIO.Write.Bound bound = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withWriteDisposition(WriteDisposition.WRITE_APPEND); checkWriteObject( @@ -746,7 +728,7 @@ public void testBuildSinkWithWriteDispositionAppend() { @Test public void testBuildSinkWithWriteDispositionEmpty() { - BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable") + BigQueryIO.Write.Bound bound = BigQueryIO.Write .to("foo.com:project:somedataset.sometable") .withWriteDisposition(WriteDisposition.WRITE_EMPTY); checkWriteObject( @@ -794,7 +776,7 @@ private void testWriteValidatesDataset(boolean streaming) { Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence")) .or(Matchers.containsString("BigQuery dataset not found for table"))); p.apply(Create.of().withCoder(TableRowJsonCoder.of())) - .apply(BigQueryIO.Write.named("WriteMyTable") + .apply(BigQueryIO.Write .to(tableRef) .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED) .withSchema(new TableSchema())); @@ -878,8 +860,6 @@ public void testCoder_nullCell() throws CoderException { public void testBigQueryIOGetName() { assertEquals("BigQueryIO.Read", BigQueryIO.Read.from("somedataset.sometable").getName()); assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName()); - assertEquals("ReadMyTable", BigQueryIO.Read.named("ReadMyTable").getName()); - assertEquals("WriteMyTable", BigQueryIO.Write.named("WriteMyTable").getName()); } @Test @@ -915,7 +895,7 @@ public void testWriteValidateFailsNoTableAndNoTableSpec() { thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform"); TestPipeline.create() .apply(Create.of()) - .apply(BigQueryIO.Write.named("name")); + .apply("name", BigQueryIO.Write.withoutValidation()); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java index 
b0c577d0b822a..c9f4079e5c1aa 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -717,7 +718,7 @@ public void testDataflowFile() throws IOException { File file = createFileWithData(fileName, data); TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 64, null); - PCollection output = p.apply(Read.from(source).named("ReadFileData")); + PCollection output = p.apply("ReadFileData", Read.from(source)); PAssert.that(output).containsInAnyOrder(data); p.run(); @@ -743,7 +744,7 @@ public void testDataflowFilePattern() throws IOException { TestFileBasedSource source = new TestFileBasedSource(new File(file1.getParent(), "file*").getPath(), 64, null); - PCollection output = p.apply(Read.from(source).named("ReadFileData")); + PCollection output = p.apply("ReadFileData", Read.from(source)); List expectedResults = new ArrayList(); expectedResults.addAll(data1); diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java index eaf452d8bafa7..efa1cd2717c93 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java @@ -45,10 +45,6 @@ public void testPubsubIOGetName() { PubsubIO.Read.topic("projects/myproject/topics/mytopic").getName()); assertEquals("PubsubIO.Write", PubsubIO.Write.topic("projects/myproject/topics/mytopic").getName()); - assertEquals("ReadMyTopic", - PubsubIO.Read.named("ReadMyTopic").topic("projects/myproject/topics/mytopic").getName()); - assertEquals("WriteMyTopic", - PubsubIO.Write.named("WriteMyTopic").topic("projects/myproject/topics/mytopic").getName()); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java index c3a50849020f9..df598c8606c7f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java @@ -22,6 +22,7 @@ import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY; import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; @@ -44,14 +45,12 @@ import org.apache.beam.sdk.testing.SourceTestUtils; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.CoderUtils; import org.apache.beam.sdk.util.GcsUtil; import org.apache.beam.sdk.util.IOChannelUtils; import org.apache.beam.sdk.util.gcsfs.GcsPath; import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PDone; import 
com.google.common.collect.ImmutableList; @@ -167,16 +166,9 @@ public void testReadNamed() throws Exception { } { - PCollection output2 = - p.apply(TextIO.Read.named("MyRead").from(file)); + PCollection output2 = p.apply("MyRead", TextIO.Read.from(file)); assertEquals("MyRead/Read.out", output2.getName()); } - - { - PCollection output3 = - p.apply(TextIO.Read.from(file).named("HerRead")); - assertEquals("HerRead/Read.out", output3.getName()); - } } @Test @@ -298,27 +290,6 @@ public void testWriteEmptyInts() throws Exception { runTestWrite(NO_INTS_ARRAY, TextualIntegerCoder.of()); } - @Test - public void testWriteNamed() { - { - PTransform, PDone> transform1 = - TextIO.Write.to("/tmp/file.txt"); - assertEquals("TextIO.Write", transform1.getName()); - } - - { - PTransform, PDone> transform2 = - TextIO.Write.named("MyWrite").to("/tmp/file.txt"); - assertEquals("MyWrite", transform2.getName()); - } - - { - PTransform, PDone> transform3 = - TextIO.Write.to("/tmp/file.txt").named("HerWrite"); - assertEquals("HerWrite", transform3.getName()); - } - } - @Test @Category(NeedsRunner.class) public void testShardedWrite() throws Exception { @@ -620,12 +591,8 @@ public void testGZIPReadWhenUncompressed() throws Exception { public void testTextIOGetName() { assertEquals("TextIO.Read", TextIO.Read.from("somefile").getName()); assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName()); - assertEquals("ReadMyFile", TextIO.Read.named("ReadMyFile").from("somefile").getName()); - assertEquals("WriteMyFile", TextIO.Write.named("WriteMyFile").to("somefile").getName()); assertEquals("TextIO.Read", TextIO.Read.from("somefile").toString()); - assertEquals( - "ReadMyFile [TextIO.Read]", TextIO.Read.named("ReadMyFile").from("somefile").toString()); } @Test diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java index eb654685bb575..37e3881888136 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails; import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; + import static org.hamcrest.Matchers.both; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.containsString; @@ -582,7 +583,7 @@ public void testReadXMLSmallPipeline() throws IOException { .withRecordClass(Train.class) .withMinBundleSize(1024); - PCollection output = p.apply(Read.from(source).named("ReadFileData")); + PCollection output = p.apply("ReadFileData", Read.from(source)); List expectedResults = ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null), @@ -672,7 +673,7 @@ public void testReadXMLLargePipeline() throws IOException { .withRecordElement("train") .withRecordClass(Train.class) .withMinBundleSize(1024); - PCollection output = p.apply(Read.from(source).named("ReadFileData")); + PCollection output = p.apply("ReadFileData", Read.from(source)); PAssert.that(output).containsInAnyOrder(trains); p.run(); @@ -814,13 +815,13 @@ public void testReadXMLFilePattern() throws IOException { Pipeline p = TestPipeline.create(); - XmlSource source = XmlSource.from(file.getParent() + "/" - + "temp*.xml") - .withRootElement("trains") - 
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
index eb654685bb575..37e3881888136 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
@@ -21,6 +21,7 @@
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -582,7 +583,7 @@ public void testReadXMLSmallPipeline() throws IOException {
             .withRecordClass(Train.class)
             .withMinBundleSize(1024);
 
-    PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData"));
+    PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
 
     List<Train> expectedResults =
         ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null),
@@ -672,7 +673,7 @@ public void testReadXMLLargePipeline() throws IOException {
             .withRecordElement("train")
             .withRecordClass(Train.class)
             .withMinBundleSize(1024);
-    PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData"));
+    PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
 
     PAssert.that(output).containsInAnyOrder(trains);
     p.run();
@@ -814,13 +815,13 @@ public void testReadXMLFilePattern() throws IOException {
 
     Pipeline p = TestPipeline.create();
 
-    XmlSource<Train> source = XmlSource.<Train>from(file.getParent() + "/"
-        + "temp*.xml")
-        .withRootElement("trains")
-        .withRecordElement("train")
-        .withRecordClass(Train.class)
-        .withMinBundleSize(1024);
-    PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData"));
+    XmlSource<Train> source =
+        XmlSource.<Train>from(file.getParent() + "/" + "temp*.xml")
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(1024);
+    PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
 
     List<Train> expectedResults = new ArrayList<>();
     expectedResults.addAll(trains1);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 0c992c4910fc4..08c39962c7692 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -120,9 +120,9 @@ public void testCompositeCapture() throws Exception {
 
     Pipeline p = TestPipeline.create();
 
-    p.apply(TextIO.Read.named("ReadMyFile").from(inputFile.getPath()))
+    p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
         .apply(Sample.any(10))
-        .apply(TextIO.Write.named("WriteMyFile").to(outputFile.getPath()));
+        .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
 
     final EnumSet<TransformsSeen> visited = EnumSet.noneOf(TransformsSeen.class);
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index c858f32d2663f..76bc03831b842 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -119,11 +119,11 @@ public void testWindowIntoPropagatesLateness() {
     FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25));
     WindowingStrategy<?, ?> strategy = TestPipeline.create()
         .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
-        .apply(Window.named("WindowInto10").into(fixed10)
+        .apply("WindowInto10", Window.into(fixed10)
             .withAllowedLateness(Duration.standardDays(1))
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
             .accumulatingFiredPanes())
-        .apply(Window.named("WindowInto25").into(fixed25))
+        .apply("WindowInto25", Window.into(fixed25))
         .getWindowingStrategy();
 
     assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
@@ -272,7 +272,7 @@ public void testDisplayData() {
   @Test
   public void testDisplayDataExcludesUnspecifiedProperties() {
-    Window.Bound<?> onlyHasAccumulationMode = Window.named("foobar").discardingFiredPanes();
+    Window.Bound<?> onlyHasAccumulationMode = Window.discardingFiredPanes();
 
     assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf(
         "windowFn",
         "trigger",
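The WindowTest changes show the same convention holding for non-IO transforms: Window.named(...).into(...) becomes a named apply() of Window.into(...). Sketched below with invented durations:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.StringUtf8Coder;
    import org.apache.beam.sdk.testing.TestPipeline;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.windowing.FixedWindows;
    import org.apache.beam.sdk.transforms.windowing.Window;
    import org.apache.beam.sdk.values.PCollection;
    import org.joda.time.Duration;

    public class WindowNamingSketch {
      public static void main(String[] args) {
        Pipeline p = TestPipeline.create();
        PCollection<String> words =
            p.apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()));

        // Naming happens at apply time, so two applications of the same
        // transform class stay distinguishable in the pipeline graph.
        words
            .apply("WindowBySecond",
                Window.<String>into(FixedWindows.of(Duration.standardSeconds(1))))
            .apply("WindowByMinute",
                Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
      }
    }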
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 21f58dfcd5b2b..c1e092acf8503 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -221,7 +221,7 @@ public void testTextIoInput() throws Exception {
     Pipeline p = TestPipeline.create();
     PCollection<String> output = p.begin()
-        .apply(TextIO.Read.named("ReadLines").from(filename))
+        .apply("ReadLines", TextIO.Read.from(filename))
         .apply(ParDo.of(new ExtractWordsWithTimestampsFn()))
         .apply(new WindowedCount(FixedWindows.of(Duration.millis(10))));
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
index 3306cb4023c2e..c0e5b17247fb5 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
@@ -174,7 +174,7 @@ public static void main(String[] args) {
     Pipeline p = Pipeline.create(options);
 
     PCollection<KV<String, Long>> filteredWords =
-        p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+        p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
         .apply(new WordCount.CountWords())
         .apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
index 07ed6d0ffc9c7..803e800541b56 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -195,10 +195,10 @@ public static void main(String[] args) {
 
     // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
     // static FormatAsTextFn() to the ParDo transform.
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
      .apply(new CountWords())
      .apply(ParDo.of(new FormatAsTextFn()))
-     .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
     p.run();
   }

From fc52a10259cd045f4b55ec59b2ae87c02c926ed4 Mon Sep 17 00:00:00 2001
From: Ben Chambers
Date: Thu, 23 Jun 2016 17:55:24 -0700
Subject: [PATCH 2/2] Remove many definitions of named methods

Specifically, remove the occurrences in:
- Window
- AvroIO
- PubsubIO
- TextIO
- BigQueryIO
- Read
---
 .../java/org/apache/beam/sdk/io/AvroIO.java        | 53 +++---------
 .../org/apache/beam/sdk/io/BigQueryIO.java         | 42 +-------------
 .../java/org/apache/beam/sdk/io/PubsubIO.java      | 35 +-----------
 .../java/org/apache/beam/sdk/io/Read.java          | 29 +---------
 .../java/org/apache/beam/sdk/io/TextIO.java        | 55 ++++---------
 .../org/apache/beam/sdk/io/package-info.java       |  6 +-
 .../beam/sdk/transforms/windowing/Window.java      | 42 --------------
 7 files changed, 25 insertions(+), 237 deletions(-)

diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 4b40c01eedfc8..604051b9e4015 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -55,9 +53,7 @@
  * {@link AvroIO.Read}, specifying {@link AvroIO.Read#from} to specify
  * the path of the file(s) to read from (e.g., a local filename or
  * filename pattern if running locally, or a Google Cloud Storage
- * filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}), and optionally
- * {@link AvroIO.Read#named} to specify the name of the pipeline step.
+ * filename or filename pattern of the form {@code "gs://<bucket>/<filepath>"}).
  *
  * <p>It is required to specify {@link AvroIO.Read#withSchema}. To
  * read specific records, such as Avro-generated classes, provide an
@@ -73,15 +71,15 @@
  * // A simple Read of a local file (only runs locally):
  * PCollection<AvroAutoGenClass> records =
  *     p.apply(AvroIO.Read.from("/path/to/file.avro")
- *                 .withSchema(AvroAutoGenClass.class));
+ *         .withSchema(AvroAutoGenClass.class));
 *
 * // A Read from a GCS file (runs locally and via the Google Cloud
 * // Dataflow service):
 * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
 * PCollection<GenericRecord> records =
- *     p.apply(AvroIO.Read.named("ReadFromAvro")
- *                        .from("gs://my_bucket/path/to/records-*.avro")
- *                        .withSchema(schema));
+ *     p.apply(AvroIO.Read
+ *         .from("gs://my_bucket/path/to/records-*.avro")
+ *         .withSchema(schema));
 * }</pre>
 *
 * <p>To write a {@link PCollection} to one or more Avro files, use
@@ -110,10 +108,10 @@
  * // Dataflow service):
  * Schema schema = new Schema.Parser().parse(new File("schema.avsc"));
  * PCollection<GenericRecord> records = ...;
- * records.apply(AvroIO.Write.named("WriteToAvro")
- *                           .to("gs://my_bucket/path/to/numbers")
- *                           .withSchema(schema)
- *                           .withSuffix(".avro"));
+ * records.apply("WriteToAvro", AvroIO.Write
+ *     .to("gs://my_bucket/path/to/numbers")
+ *     .withSchema(schema)
+ *     .withSuffix(".avro"));
 * }</pre>
 *
 * <h3>Permissions</h3>
@@ -128,12 +126,6 @@ public class AvroIO {
    * the decoding of each record.
    */
   public static class Read {
-    /**
-     * Returns a {@link PTransform} with the given step name.
-     */
-    public static Bound<GenericRecord> named(String name) {
-      return new Bound<>(GenericRecord.class).named(name);
-    }
 
     /**
      * Returns a {@link PTransform} that reads from the file(s)
@@ -221,16 +213,6 @@ public static class Bound<T> extends PTransform<PInput, PCollection<T>> {
       this.validate = validate;
     }
 
-    /**
-     * Returns a new {@link PTransform} that's like this one but
-     * with the given step name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Bound<T> named(String name) {
-      return new Bound<>(name, filepattern, type, schema, validate);
-    }
-
     /**
      * Returns a new {@link PTransform} that's like this one but
      * that reads from the file(s) with the given name or pattern.
@@ -366,12 +348,6 @@ private Read() {}
    * multiple Avro files matching a sharding pattern).
    */
   public static class Write {
-    /**
-     * Returns a {@link PTransform} with the given step name.
-     */
-    public static Bound<GenericRecord> named(String name) {
-      return new Bound<>(GenericRecord.class).named(name);
-    }
 
     /**
      * Returns a {@link PTransform} that writes to the file(s)
@@ -520,17 +496,6 @@ public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
       this.validate = validate;
     }
 
-    /**
-     * Returns a new {@link PTransform} that's like this one but
-     * with the given step name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Bound<T> named(String name) {
-      return new Bound<>(
-          name, filenamePrefix, filenameSuffix, numShards, shardTemplate, type, schema, validate);
-    }
-
     /**
      * Returns a new {@link PTransform} that's like this one but
      * that writes to the file(s) with the given filename prefix.
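Assembling the revised AvroIO javadoc into one skeletal pipeline, with the write step named at apply time as in the updated example; the schema file and bucket paths are the placeholders from the docs:

    import java.io.File;
    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.AvroIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class AvroRoundTripSketch {
      public static void main(String[] args) throws Exception {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
        Schema schema = new Schema.Parser().parse(new File("schema.avsc"));

        // Read generic records; the step name is optional and lives in apply().
        PCollection<GenericRecord> records =
            p.apply("ReadFromAvro", AvroIO.Read
                .from("gs://my_bucket/path/to/records-*.avro")
                .withSchema(schema));

        // Write them back out, again naming the application, not the transform.
        records.apply("WriteToAvro", AvroIO.Write
            .to("gs://my_bucket/path/to/records")
            .withSchema(schema)
            .withSuffix(".avro"));

        p.run();
      }
    }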
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 7cac7052fb50a..a9d85b8e10de0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -162,8 +162,7 @@
  * This produces a {@link PCollection} of {@link TableRow TableRows} as output:
  * <pre>{@code
  * PCollection<TableRow> shakespeare = pipeline.apply(
- *     BigQueryIO.Read.named("Read")
- *                    .from("clouddataflow-readonly:samples.weather_stations"));
+ *     BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
 * }</pre>
 *
 * <p>See {@link TableRow} for more information on the {@link TableRow} object.
@@ -174,8 +173,7 @@
 *
 * <pre>{@code
 * PCollection<TableRow> shakespeare = pipeline.apply(
- *     BigQueryIO.Read.named("Read")
- *                    .fromQuery("SELECT year, mean_temp FROM samples.weather_stations"));
+ *     BigQueryIO.Read.fromQuery("SELECT year, mean_temp FROM samples.weather_stations"));
 * }</pre>
 *
 * <p>When creating a BigQuery input transform, users should provide either a query or a table.
@@ -193,7 +191,6 @@
  * TableSchema schema = new TableSchema().setFields(fields);
  *
  * quotes.apply(BigQueryIO.Write
- *     .named("Write")
  *     .to("my-project:output.output_table")
  *     .withSchema(schema)
  *     .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));
@@ -214,7 +211,6 @@
  * PCollection<TableRow> quotes = ...
  * quotes.apply(Window.into(CalendarWindows.days(1)))
  *     .apply(BigQueryIO.Write
- *         .named("Write")
  *         .withSchema(schema)
  *         .to(new SerializableFunction<BoundedWindow, String>() {
  *           public String apply(BoundedWindow window) {
@@ -345,13 +341,6 @@ public static String toTableSpec(TableReference ref) {
    * }}
    */
   public static class Read {
-    /**
-     * Returns a {@link Read.Bound} with the given name. The BigQuery table or query to be read
-     * from has not yet been configured.
-     */
-    public static Bound named(String name) {
-      return new Bound().named(name);
-    }
 
     /**
      * Reads a BigQuery table specified as {@code "[project_id]:[dataset_id].[table_id]"} or
@@ -428,15 +417,6 @@ private Bound(
       this.testBigQueryServices = testBigQueryServices;
     }
 
-    /**
-     * Returns a copy of this transform using the name associated with this transformation.
-     *
-     * <p>Does not modify this object.
-     */
-    public Bound named(String name) {
-      return new Bound(name, query, jsonTableRef, validate, flattenResults, testBigQueryServices);
-    }
-
     /**
      * Returns a copy of this transform that reads from the specified table. Refer to
      * {@link #parseTableSpec(String)} for the specification format.
@@ -1371,14 +1351,6 @@ public enum WriteDisposition {
     WRITE_EMPTY
   }
 
-  /**
-   * Creates a write transformation with the given transform name. The BigQuery table to be
-   * written has not yet been configured.
-   */
-  public static Bound named(String name) {
-    return new Bound().named(name);
-  }
-
   /**
    * Creates a write transformation for the given table specification.
   *
@@ -1521,16 +1493,6 @@ private Bound(String name, @Nullable String jsonTableRef,
       this.testBigQueryServices = testBigQueryServices;
     }
 
-    /**
-     * Returns a copy of this write transformation, but with the specified transform name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Bound named(String name) {
-      return new Bound(name, jsonTableRef, tableRefFunction, jsonSchema, createDisposition,
-          writeDisposition, validate, testBigQueryServices);
-    }
-
     /**
      * Returns a copy of this write transformation, but writing to the specified table. Refer to
      * {@link #parseTableSpec(String)} for the specification format.
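A compact sketch combining the read and write halves of the revised BigQueryIO javadoc; the table names, schema field and write disposition are the ones the examples above use:

    import com.google.api.services.bigquery.model.TableFieldSchema;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.BigQueryIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class BigQuerySketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Read: no more .named("Read"); name the application if desired.
        PCollection<TableRow> weather = pipeline.apply("ReadStations",
            BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));

        // A one-field schema, just to make the write well-formed.
        List<TableFieldSchema> fields = new ArrayList<>();
        fields.add(new TableFieldSchema().setName("year").setType("INTEGER"));
        TableSchema schema = new TableSchema().setFields(fields);

        weather.apply("WriteStations", BigQueryIO.Write
            .to("my-project:output.output_table")
            .withSchema(schema)
            .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

        pipeline.run();
      }
    }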
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
index c6de8b424bc7a..ecb1f0a6f2631 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/PubsubIO.java
@@ -55,6 +55,7 @@
 import java.util.List;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+
 import javax.annotation.Nullable;
 
 /**
@@ -369,13 +370,6 @@ public String asPath() {
    * {@link Bound#maxNumRecords(int)} or {@link Bound#maxReadTime(Duration)} must be set.
    */
   public static class Read {
-    /**
-     * Creates and returns a transform for reading from Cloud Pub/Sub with the specified transform
-     * name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).named(name);
-    }
 
     /**
      * Creates and returns a transform for reading from a Cloud Pub/Sub topic. Mutually exclusive
@@ -530,16 +524,6 @@ private Bound(String name, PubsubSubscription subscription, PubsubTopic topic,
       this.maxReadTime = maxReadTime;
     }
 
-    /**
-     * Returns a transform that's like this one but with the given step name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Bound<T> named(String name) {
-      return new Bound<>(
-          name, subscription, topic, timestampLabel, coder, idLabel, maxNumRecords, maxReadTime);
-    }
-
     /**
      * Returns a transform that's like this one but reading from the
      * given subscription.
@@ -833,13 +817,6 @@ private PubsubIO() {}
    */
   // TODO: Support non-String encodings.
   public static class Write {
-    /**
-     * Creates a transform that writes to Pub/Sub with the given step name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_PUBSUB_CODER).named(name);
-    }
-
     /**
      * Creates a transform that publishes to the specified topic.
     *
@@ -916,16 +893,6 @@ private Bound(
       this.coder = coder;
     }
 
-    /**
-     * Returns a new transform that's like this one but with the specified step
-     * name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Bound<T> named(String name) {
-      return new Bound<>(name, topic, timestampLabel, idLabel, coder);
-    }
-
     /**
      * Returns a new transform that's like this one but that writes to the specified
      * topic.
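The PubsubIO surface after the removal, as a sketch; the topic strings follow the projects/.../topics/... form used in the tests, and maxNumRecords (named in the Read javadoc above) bounds the otherwise unbounded source so a batch runner can finish:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.PubsubIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class PubsubSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Read a bounded slice of the topic, naming the step in apply().
        PCollection<String> messages = p.apply("ReadMyTopic",
            PubsubIO.Read.topic("projects/myproject/topics/mytopic")
                .maxNumRecords(100));

        // Publish to a second (hypothetical) topic.
        messages.apply("WriteMyTopic",
            PubsubIO.Write.topic("projects/myproject/topics/myothertopic"));

        p.run();
      }
    }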
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
index c0440f260104e..e13ff06dc39db 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Read.java
@@ -38,17 +38,10 @@
  *
  * <p>Usage example:
  * <pre>
  * Pipeline p = Pipeline.create();
- * p.apply(Read.from(new MySource().withFoo("foo").withBar("bar"))
- *             .named("foobar"));
+ * p.apply(Read.from(new MySource().withFoo("foo").withBar("bar")));
  * </pre>
 */
public class Read {
-  /**
-   * Returns a new {@code Read} {@code PTransform} builder with the given name.
-   */
-  public static Builder named(String name) {
-    return new Builder(name);
-  }
 
  /**
   * Returns a new {@code Read.Bounded} {@code PTransform} reading from the given
@@ -104,16 +97,6 @@ private Bounded(@Nullable String name, BoundedSource<T> source) {
      this.source = SerializableUtils.ensureSerializable(source);
    }
 
-    /**
-     * Returns a new {@code Bounded} {@code PTransform} that's like this one but
-     * has the given name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Bounded<T> named(String name) {
-      return new Bounded<T>(name, source);
-    }
-
    @Override
    protected Coder<T> getDefaultOutputCoder() {
      return source.getDefaultOutputCoder();
    }
@@ -161,16 +144,6 @@ private Unbounded(@Nullable String name, UnboundedSource<T, ?> source) {
      this.source = SerializableUtils.ensureSerializable(source);
    }
 
-    /**
-     * Returns a new {@code Unbounded} {@code PTransform} that's like this one but
-     * has the given name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Unbounded<T> named(String name) {
-      return new Unbounded<T>(name, source);
-    }
-
    /**
     * Returns a new {@link BoundedReadFromUnboundedSource} that reads a bounded amount
     * of data from the given {@link UnboundedSource}. The bound is specified as a number
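For Read itself, the updated javadoc example covers the bounded case; the unbounded half pairs with the BoundedReadFromUnboundedSource mentioned above. A hedged sketch: MySource follows the javadoc's own hypothetical source, MyUnboundedSource is likewise a stand-in, and the withMaxNumRecords method name is assumed from this SDK generation rather than quoted from this patch:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.Read;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class ReadSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Bounded source, named at apply time as in the tests above.
        p.apply("ReadBounded",
            Read.from(new MySource().withFoo("foo").withBar("bar")));

        // Unbounded source capped at a fixed number of records, yielding a
        // BoundedReadFromUnboundedSource under the hood.
        p.apply("ReadCapped",
            Read.from(new MyUnboundedSource()).withMaxNumRecords(1000));

        p.run();
      }
    }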
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index a7e5e296756f0..7e7a3e6caa217 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -58,8 +57,7 @@
  * the path of the file(s) to read from (e.g., a local filename or
  * filename pattern if running locally, or a Google Cloud Storage
  * filename or filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). You may optionally call
- * {@link TextIO.Read#named(String)} to specify the name of the pipeline step.
+ * {@code "gs://<bucket>/<filepath>"}).
  *
  * <p>By default, {@link TextIO.Read} returns a {@link PCollection} of {@link String Strings},
  * each corresponding to one line of an input UTF-8 text file. To convert directly from the raw
@@ -78,9 +77,9 @@
  * // A fully-specified Read from a GCS file (runs locally and via the
  * // Google Cloud Dataflow service):
  * PCollection<Integer> numbers =
- *     p.apply(TextIO.Read.named("ReadNumbers")
- *                        .from("gs://my_bucket/path/to/numbers-*.txt")
- *                        .withCoder(TextualIntegerCoder.of()));
+ *     p.apply("ReadNumbers", TextIO.Read
+ *         .from("gs://my_bucket/path/to/numbers-*.txt")
+ *         .withCoder(TextualIntegerCoder.of()));
 * }</pre>
 *
 * <p>To write a {@link PCollection} to one or more text files, use
 * {@link TextIO.Write}, specifying {@link TextIO.Write#to(String)} to specify
 * the path of the file to write to (e.g., a local filename or sharded
 * filename pattern if running locally, or a Google Cloud Storage
 * filename or sharded filename pattern of the form
- * {@code "gs://<bucket>/<filepath>"}). You can optionally name the resulting transform using
- * {@link TextIO.Write#named(String)}, and you can use {@link TextIO.Write#withCoder(Coder)}
- * to specify the Coder to use to encode the Java values into text lines.
+ * {@code "gs://<bucket>/<filepath>"}). You can use {@link TextIO.Write#withCoder(Coder)}
+ * to specify the {@link Coder} to use to encode the Java values into text lines.
 *
 * <p>Any existing files with the same names as generated output files
 * will be overwritten.
@@ -104,10 +102,10 @@
  * // A fully-specified Write to a sharded GCS file (runs locally and via the
  * // Google Cloud Dataflow service):
  * PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.named("WriteNumbers")
- *                           .to("gs://my_bucket/path/to/numbers")
- *                           .withSuffix(".txt")
- *                           .withCoder(TextualIntegerCoder.of()));
+ * numbers.apply("WriteNumbers", TextIO.Write
+ *     .to("gs://my_bucket/path/to/numbers")
+ *     .withSuffix(".txt")
+ *     .withCoder(TextualIntegerCoder.of()));
 * }</pre>
 *
 * <h3>Permissions</h3>
 *
@@ -130,12 +128,6 @@ public class TextIO {
    * {@link #withCoder(Coder)} to change the return type.
    */
   public static class Read {
-    /**
-     * Returns a transform for reading text files that uses the given step name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_TEXT_CODER).named(name);
-    }
 
     /**
      * Returns a transform for reading text files that reads from the file(s)
@@ -226,16 +218,6 @@ private Bound(String name, String filepattern, Coder<T> coder, boolean validate,
       this.compressionType = compressionType;
     }
 
-    /**
-     * Returns a new transform for reading from text files that's like this one but
-     * with the given step name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Bound<T> named(String name) {
-      return new Bound<>(name, filepattern, coder, validate, compressionType);
-    }
-
     /**
      * Returns a new transform for reading from text files that's like this one but
      * that reads from the file(s) with the given name or pattern. See {@link TextIO.Read#from}
@@ -387,12 +369,6 @@ private Read() {}
    * element of the input collection encoded into its own line.
    */
   public static class Write {
-    /**
-     * Returns a transform for writing to text files with the given step name.
-     */
-    public static Bound<String> named(String name) {
-      return new Bound<>(DEFAULT_TEXT_CODER).named(name);
-    }
 
     /**
      * Returns a transform for writing to text files that writes to the file(s)
@@ -519,17 +495,6 @@ private Bound(String name, String filenamePrefix, String filenameSuffix, Coder<T> coder,
       this.validate = validate;
     }
 
-    /**
-     * Returns a transform for writing to text files that's like this one but
-     * with the given step name.
-     *
-     * <p>Does not modify this object.
-     */
-    public Bound<T> named(String name) {
-      return new Bound<>(name, filenamePrefix, filenameSuffix, coder, numShards,
-          shardTemplate, validate);
-    }
-
     /**
      * Returns a transform for writing to text files that's like this one but
      * that writes to the file(s) with the given filename prefix.
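The two TextIO javadoc examples, joined into one runnable skeleton; the bucket paths and the textual-integer coder are the placeholders the docs use:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.TextualIntegerCoder;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.values.PCollection;

    public class TextRoundTripSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Read integers stored one per line, naming the step in apply().
        PCollection<Integer> numbers =
            p.apply("ReadNumbers", TextIO.Read
                .from("gs://my_bucket/path/to/numbers-*.txt")
                .withCoder(TextualIntegerCoder.of()));

        // Write them back as sharded text files; existing files are overwritten.
        numbers.apply("WriteNumbers", TextIO.Write
            .to("gs://my_bucket/path/to/numbers")
            .withSuffix(".txt")
            .withCoder(TextualIntegerCoder.of()));

        p.run();
      }
    }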
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index c2c0685594c7b..432c5df9ba2c7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -25,14 +25,12 @@
  * from existing storage:
  * <pre>{@code
  * PCollection<TableRow> inputData = pipeline.apply(
- *     BigQueryIO.Read.named("Read")
- *                    .from("clouddataflow-readonly:samples.weather_stations");
+ *     BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));
 * }</pre>
 * and {@code Write} transforms that persist PCollections to external storage:
 * <pre> {@code
 * PCollection numbers = ...;
- * numbers.apply(TextIO.Write.named("WriteNumbers")
- *                           .to("gs://my_bucket/path/to/numbers"));
+ * numbers.apply(TextIO.Write.to("gs://my_bucket/path/to/numbers"));
 * } </pre>
 */
package org.apache.beam.sdk.io;
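Tying the package-level examples together: one read transform and one write transform in a single pipeline, with Create standing in for whatever upstream processing would produce the strings to write:

    import com.google.api.services.bigquery.model.TableRow;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.BigQueryIO;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;

    public class IoPackageSketch {
      public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // A Read transform materializes existing storage as a PCollection.
        PCollection<TableRow> inputData = pipeline.apply(
            BigQueryIO.Read.from("clouddataflow-readonly:samples.weather_stations"));

        // A Write transform persists a PCollection to external storage.
        PCollection<String> numbers = pipeline.apply(Create.of("1", "2", "3"));
        numbers.apply(TextIO.Write.to("gs://my_bucket/path/to/numbers"));

        pipeline.run();
      }
    }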
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index dde5c0536abd1..bc122e2eead13 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -158,21 +158,6 @@ public enum ClosingBehavior {
     FIRE_IF_NON_EMPTY;
   }
 
-  /**
-   * Creates a {@code Window} {@code PTransform} with the given name.
-   *
-   * <p>See the discussion of Naming in
-   * {@link org.apache.beam.sdk.transforms.ParDo} for more explanation.
-   *
-   * <p>The resulting {@code PTransform} is incomplete, and its input/output
-   * type is not yet bound. Use {@link Window.Unbound#into} to specify the
-   * {@link WindowFn} to use, which will also bind the input/output type of this
-   * {@code PTransform}.
-   */
-  public static Unbound named(String name) {
-    return new Unbound().named(name);
-  }
-
   /**
    * Creates a {@code Window} {@code PTransform} that uses the given
    * {@link WindowFn} to window the data.
@@ -254,19 +239,6 @@ public static class Unbound {
       this.name = name;
     }
 
-    /**
-     * Returns a new {@code Window} transform that's like this
-     * transform but with the specified name. Does not modify this
-     * transform. The resulting transform is still incomplete.
-     *
-     * <p>See the discussion of Naming in
-     * {@link org.apache.beam.sdk.transforms.ParDo} for more
-     * explanation.
-     */
-    public Unbound named(String name) {
-      return new Unbound(name);
-    }
-
     /**
      * Returns a new {@code Window} {@code PTransform} that's like this
      * transform but that will use the given {@link WindowFn}, and that has
@@ -407,20 +379,6 @@ private Bound<T> into(WindowFn<? super T, ?> windowFn) {
           name, windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
     }
 
-    /**
-     * Returns a new {@code Window} {@code PTransform} that's like this
-     * {@code PTransform} but with the specified name. Does not
-     * modify this {@code PTransform}.
-     *
-     * <p>See the discussion of Naming in
-     * {@link org.apache.beam.sdk.transforms.ParDo} for more
-     * explanation.
-     */
-    public Bound<T> named(String name) {
-      return new Bound<>(
-          name, windowFn, trigger, mode, allowedLateness, closingBehavior, outputTimeFn);
-    }
-
     /**
      * Sets a non-default trigger for this {@code Window} {@code PTransform}.
      * Elements that are assigned to a specific window will be output when