From 5200ebf3586a0424093f160c287394c85f466d73 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Wed, 18 May 2016 18:46:34 +0200 Subject: [PATCH] [BEAM-286] Reorganize flink runner module to follow other runners structure --- runners/flink/examples/pom.xml | 109 ----- .../beam/runners/flink/examples/TFIDF.java | 454 ------------------ .../runners/flink/examples/WordCount.java | 120 ----- .../examples/streaming/AutoComplete.java | 403 ---------------- .../examples/streaming/JoinExamples.java | 163 ------- .../examples/streaming/KafkaIOExamples.java | 337 ------------- .../KafkaWindowedWordCountExample.java | 149 ------ .../examples/streaming/WindowedWordCount.java | 137 ------ runners/flink/pom.xml | 136 +++++- runners/flink/runner/pom.xml | 177 ------- .../flink/DefaultParallelismFactory.java | 0 .../FlinkPipelineExecutionEnvironment.java | 0 .../runners/flink/FlinkPipelineOptions.java | 0 .../runners/flink/FlinkPipelineRunner.java | 0 .../runners/flink/FlinkRunnerRegistrar.java | 0 .../beam/runners/flink/FlinkRunnerResult.java | 0 .../beam/runners/flink/io/ConsoleIO.java | 0 .../FlinkBatchPipelineTranslator.java | 0 .../FlinkBatchTransformTranslators.java | 0 .../FlinkBatchTranslationContext.java | 0 .../translation/FlinkPipelineTranslator.java | 0 .../FlinkStreamingPipelineTranslator.java | 0 .../FlinkStreamingTransformTranslators.java | 0 .../FlinkStreamingTranslationContext.java | 0 .../FlinkCoGroupKeyedListAggregator.java | 0 .../functions/FlinkCreateFunction.java | 0 .../functions/FlinkDoFnFunction.java | 0 .../FlinkKeyedListAggregationFunction.java | 0 .../FlinkMultiOutputDoFnFunction.java | 0 .../FlinkMultiOutputPruningFunction.java | 0 .../functions/FlinkPartialReduceFunction.java | 0 .../functions/FlinkReduceFunction.java | 0 .../translation/functions/UnionCoder.java | 0 .../translation/types/CoderComparator.java | 0 .../types/CoderTypeInformation.java | 0 .../types/CoderTypeSerializer.java | 0 .../flink/translation/types/FlinkCoder.java | 0 .../InspectableByteArrayOutputStream.java | 0 .../translation/types/KvCoderComperator.java | 0 .../types/KvCoderTypeInformation.java | 0 .../types/VoidCoderTypeSerializer.java | 0 .../utils/SerializedPipelineOptions.java | 0 .../wrappers/CombineFnAggregatorWrapper.java | 0 .../wrappers/DataInputViewWrapper.java | 0 .../wrappers/DataOutputViewWrapper.java | 0 .../SerializableFnAggregatorWrapper.java | 0 .../wrappers/SinkOutputFormat.java | 0 .../wrappers/SourceInputFormat.java | 0 .../wrappers/SourceInputSplit.java | 0 .../streaming/FlinkAbstractParDoWrapper.java | 0 .../FlinkGroupAlsoByWindowWrapper.java | 0 .../streaming/FlinkGroupByKeyWrapper.java | 0 .../FlinkParDoBoundMultiWrapper.java | 0 .../streaming/FlinkParDoBoundWrapper.java | 0 .../io/FlinkStreamingCreateFunction.java | 0 .../streaming/io/UnboundedFlinkSink.java | 0 .../streaming/io/UnboundedFlinkSource.java | 0 .../streaming/io/UnboundedSocketSource.java | 0 .../streaming/io/UnboundedSourceWrapper.java | 0 .../state/AbstractFlinkTimerInternals.java | 0 .../streaming/state/FlinkStateInternals.java | 0 .../state/StateCheckpointReader.java | 0 .../streaming/state/StateCheckpointUtils.java | 0 .../state/StateCheckpointWriter.java | 0 .../wrappers/streaming/state/StateType.java | 0 .../src/main/resources/log4j.properties | 0 .../apache/beam/runners/flink/AvroITCase.java | 0 .../beam/runners/flink/FlattenizeITCase.java | 0 .../flink/FlinkRunnerRegistrarTest.java | 0 .../beam/runners/flink/FlinkTestPipeline.java | 0 .../runners/flink/JoinExamplesITCase.java | 0 .../runners/flink/MaybeEmptyTestITCase.java | 0 .../runners/flink/ParDoMultiOutputITCase.java | 0 .../runners/flink/PipelineOptionsTest.java | 0 .../beam/runners/flink/ReadSourceITCase.java | 0 .../flink/ReadSourceStreamingITCase.java | 0 .../flink/RemoveDuplicatesEmptyITCase.java | 0 .../runners/flink/RemoveDuplicatesITCase.java | 0 .../beam/runners/flink/SideInputITCase.java | 0 .../beam/runners/flink/TfIdfITCase.java | 0 .../beam/runners/flink/WordCountITCase.java | 0 .../runners/flink/WordCountJoin2ITCase.java | 0 .../runners/flink/WordCountJoin3ITCase.java | 0 .../beam/runners/flink/WriteSinkITCase.java | 0 .../streaming/GroupAlsoByWindowTest.java | 0 .../flink/streaming/GroupByNullKeyTest.java | 0 .../streaming/StateSerializationTest.java | 0 .../flink/streaming/TestCountingSource.java | 0 .../streaming/TopWikipediaSessionsITCase.java | 0 .../streaming/UnboundedSourceWrapperTest.java | 0 .../beam/runners/flink/util/JoinExamples.java | 0 .../src/test/resources/log4j-test.properties | 0 92 files changed, 115 insertions(+), 2070 deletions(-) delete mode 100644 runners/flink/examples/pom.xml delete mode 100644 runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java delete mode 100644 runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java delete mode 100644 runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java delete mode 100644 runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java delete mode 100644 runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java delete mode 100644 runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java delete mode 100644 runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java delete mode 100644 runners/flink/runner/pom.xml rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java (100%) rename runners/flink/{runner => }/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java (100%) rename runners/flink/{runner => }/src/main/resources/log4j.properties (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/AvroITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java (100%) rename runners/flink/{runner => }/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java (100%) rename runners/flink/{runner => }/src/test/resources/log4j-test.properties (100%) diff --git a/runners/flink/examples/pom.xml b/runners/flink/examples/pom.xml deleted file mode 100644 index e5bab3e9c03e6..0000000000000 --- a/runners/flink/examples/pom.xml +++ /dev/null @@ -1,109 +0,0 @@ - - - - - 4.0.0 - - - org.apache.beam - flink-runner-parent - 0.1.0-incubating-SNAPSHOT - ../pom.xml - - - flink-runner-examples_2.10 - - Apache Beam :: Runners :: Flink :: Examples - - jar - - - - org.apache.beam.runners.flink.examples.WordCount - kinglear.txt - wordcounts.txt - 1 - - - - - disable-runnable-on-service-tests - - true - - - - - org.apache.maven.plugins - maven-surefire-plugin - - - runnable-on-service-tests - - true - - - - - - - - - - - - - org.apache.beam - flink-runner_2.10 - ${project.version} - - - - - - - - - - org.codehaus.mojo - exec-maven-plugin - - java - - -classpath - - ${clazz} - --input=${input} - --output=${output} - --parallelism=${parallelism} - - - - - - - - - diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java deleted file mode 100644 index 0afde0a579781..0000000000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java +++ /dev/null @@ -1,454 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples; - -import org.apache.beam.runners.flink.FlinkPipelineOptions; -import org.apache.beam.runners.flink.FlinkPipelineRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.KvCoder; -import org.apache.beam.sdk.coders.StringDelegateCoder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.GcsOptions; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.options.Validation; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.Keys; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.RemoveDuplicates; -import org.apache.beam.sdk.transforms.Values; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.transforms.WithKeys; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.util.GcsUtil; -import org.apache.beam.sdk.util.gcsfs.GcsPath; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.PDone; -import org.apache.beam.sdk.values.PInput; -import org.apache.beam.sdk.values.TupleTag; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.File; -import java.io.IOException; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.HashSet; -import java.util.Set; - -/** - * An example that computes a basic TF-IDF search table for a directory or GCS prefix. - * - *

Concepts: joining data; side inputs; logging - * - *

To execute this pipeline locally, specify general pipeline configuration: - *

{@code
- *   --project=YOUR_PROJECT_ID
- * }
- * and a local output file or output prefix on GCS: - *
{@code
- *   --output=[YOUR_LOCAL_FILE | gs://YOUR_OUTPUT_PREFIX]
- * }
- * - *

To execute this pipeline using the Dataflow service, specify pipeline configuration: - *

{@code
- *   --project=YOUR_PROJECT_ID
- *   --stagingLocation=gs://YOUR_STAGING_DIRECTORY
- *   --runner=BlockingDataflowPipelineRunner
- * and an output prefix on GCS:
- *   --output=gs://YOUR_OUTPUT_PREFIX
- * }
- * - *

The default input is {@code gs://dataflow-samples/shakespeare/} and can be overridden with - * {@code --input}. - */ -public class TFIDF { - /** - * Options supported by {@link TFIDF}. - *

- * Inherits standard configuration options. - */ - private interface Options extends PipelineOptions, FlinkPipelineOptions { - @Description("Path to the directory or GCS prefix containing files to read from") - @Default.String("gs://dataflow-samples/shakespeare/") - String getInput(); - void setInput(String value); - - @Description("Prefix of output URI to write to") - @Validation.Required - String getOutput(); - void setOutput(String value); - } - - /** - * Lists documents contained beneath the {@code options.input} prefix/directory. - */ - public static Set listInputDocuments(Options options) - throws URISyntaxException, IOException { - URI baseUri = new URI(options.getInput()); - - // List all documents in the directory or GCS prefix. - URI absoluteUri; - if (baseUri.getScheme() != null) { - absoluteUri = baseUri; - } else { - absoluteUri = new URI( - "file", - baseUri.getAuthority(), - baseUri.getPath(), - baseUri.getQuery(), - baseUri.getFragment()); - } - - Set uris = new HashSet<>(); - if (absoluteUri.getScheme().equals("file")) { - File directory = new File(absoluteUri); - for (String entry : directory.list()) { - File path = new File(directory, entry); - uris.add(path.toURI()); - } - } else if (absoluteUri.getScheme().equals("gs")) { - GcsUtil gcsUtil = options.as(GcsOptions.class).getGcsUtil(); - URI gcsUriGlob = new URI( - absoluteUri.getScheme(), - absoluteUri.getAuthority(), - absoluteUri.getPath() + "*", - absoluteUri.getQuery(), - absoluteUri.getFragment()); - for (GcsPath entry : gcsUtil.expand(GcsPath.fromUri(gcsUriGlob))) { - uris.add(entry.toUri()); - } - } - - return uris; - } - - /** - * Reads the documents at the provided uris and returns all lines - * from the documents tagged with which document they are from. - */ - public static class ReadDocuments - extends PTransform>> { - private static final long serialVersionUID = 0; - - private Iterable uris; - - public ReadDocuments(Iterable uris) { - this.uris = uris; - } - - @Override - public Coder getDefaultOutputCoder() { - return KvCoder.of(StringDelegateCoder.of(URI.class), StringUtf8Coder.of()); - } - - @Override - public PCollection> apply(PInput input) { - Pipeline pipeline = input.getPipeline(); - - // Create one TextIO.Read transform for each document - // and add its output to a PCollectionList - PCollectionList> urisToLines = - PCollectionList.empty(pipeline); - - // TextIO.Read supports: - // - file: URIs and paths locally - // - gs: URIs on the service - for (final URI uri : uris) { - String uriString; - if (uri.getScheme().equals("file")) { - uriString = new File(uri).getPath(); - } else { - uriString = uri.toString(); - } - - PCollection> oneUriToLines = pipeline - .apply(TextIO.Read.from(uriString) - .named("TextIO.Read(" + uriString + ")")) - .apply("WithKeys(" + uriString + ")", WithKeys.of(uri)); - - urisToLines = urisToLines.and(oneUriToLines); - } - - return urisToLines.apply(Flatten.>pCollections()); - } - } - - /** - * A transform containing a basic TF-IDF pipeline. The input consists of KV objects - * where the key is the document's URI and the value is a piece - * of the document's content. The output is mapping from terms to - * scores for each document URI. - */ - public static class ComputeTfIdf - extends PTransform>, PCollection>>> { - private static final long serialVersionUID = 0; - - public ComputeTfIdf() { } - - @Override - public PCollection>> apply( - PCollection> uriToContent) { - - // Compute the total number of documents, and - // prepare this singleton PCollectionView for - // use as a side input. - final PCollectionView totalDocuments = - uriToContent - .apply("GetURIs", Keys.create()) - .apply("RemoveDuplicateDocs", RemoveDuplicates.create()) - .apply(Count.globally()) - .apply(View.asSingleton()); - - // Create a collection of pairs mapping a URI to each - // of the words in the document associated with that that URI. - PCollection> uriToWords = uriToContent - .apply(ParDo.named("SplitWords").of( - new DoFn, KV>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - URI uri = c.element().getKey(); - String line = c.element().getValue(); - for (String word : line.split("\\W+")) { - // Log INFO messages when the word “love” is found. - if (word.toLowerCase().equals("love")) { - LOG.info("Found {}", word.toLowerCase()); - } - - if (!word.isEmpty()) { - c.output(KV.of(uri, word.toLowerCase())); - } - } - } - })); - - // Compute a mapping from each word to the total - // number of documents in which it appears. - PCollection> wordToDocCount = uriToWords - .apply("RemoveDuplicateWords", RemoveDuplicates.>create()) - .apply(Values.create()) - .apply("CountDocs", Count.perElement()); - - // Compute a mapping from each URI to the total - // number of words in the document associated with that URI. - PCollection> uriToWordTotal = uriToWords - .apply("GetURIs2", Keys.create()) - .apply("CountWords", Count.perElement()); - - // Count, for each (URI, word) pair, the number of - // occurrences of that word in the document associated - // with the URI. - PCollection, Long>> uriAndWordToCount = uriToWords - .apply("CountWordDocPairs", Count.>perElement()); - - // Adjust the above collection to a mapping from - // (URI, word) pairs to counts into an isomorphic mapping - // from URI to (word, count) pairs, to prepare for a join - // by the URI key. - PCollection>> uriToWordAndCount = uriAndWordToCount - .apply(ParDo.named("ShiftKeys").of( - new DoFn, Long>, KV>>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - URI uri = c.element().getKey().getKey(); - String word = c.element().getKey().getValue(); - Long occurrences = c.element().getValue(); - c.output(KV.of(uri, KV.of(word, occurrences))); - } - })); - - // Prepare to join the mapping of URI to (word, count) pairs with - // the mapping of URI to total word counts, by associating - // each of the input PCollection> with - // a tuple tag. Each input must have the same key type, URI - // in this case. The type parameter of the tuple tag matches - // the types of the values for each collection. - final TupleTag wordTotalsTag = new TupleTag<>(); - final TupleTag> wordCountsTag = new TupleTag<>(); - KeyedPCollectionTuple coGbkInput = KeyedPCollectionTuple - .of(wordTotalsTag, uriToWordTotal) - .and(wordCountsTag, uriToWordAndCount); - - // Perform a CoGroupByKey (a sort of pre-join) on the prepared - // inputs. This yields a mapping from URI to a CoGbkResult - // (CoGroupByKey Result). The CoGbkResult is a mapping - // from the above tuple tags to the values in each input - // associated with a particular URI. In this case, each - // KV group a URI with the total number of - // words in that document as well as all the (word, count) - // pairs for particular words. - PCollection> uriToWordAndCountAndTotal = coGbkInput - .apply("CoGroupByUri", CoGroupByKey.create()); - - // Compute a mapping from each word to a (URI, term frequency) - // pair for each URI. A word's term frequency for a document - // is simply the number of times that word occurs in the document - // divided by the total number of words in the document. - PCollection>> wordToUriAndTf = uriToWordAndCountAndTotal - .apply(ParDo.named("ComputeTermFrequencies").of( - new DoFn, KV>>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - URI uri = c.element().getKey(); - Long wordTotal = c.element().getValue().getOnly(wordTotalsTag); - - for (KV wordAndCount - : c.element().getValue().getAll(wordCountsTag)) { - String word = wordAndCount.getKey(); - Long wordCount = wordAndCount.getValue(); - Double termFrequency = wordCount.doubleValue() / wordTotal.doubleValue(); - c.output(KV.of(word, KV.of(uri, termFrequency))); - } - } - })); - - // Compute a mapping from each word to its document frequency. - // A word's document frequency in a corpus is the number of - // documents in which the word appears divided by the total - // number of documents in the corpus. Note how the total number of - // documents is passed as a side input; the same value is - // presented to each invocation of the DoFn. - PCollection> wordToDf = wordToDocCount - .apply(ParDo - .named("ComputeDocFrequencies") - .withSideInputs(totalDocuments) - .of(new DoFn, KV>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - String word = c.element().getKey(); - Long documentCount = c.element().getValue(); - Long documentTotal = c.sideInput(totalDocuments); - Double documentFrequency = documentCount.doubleValue() - / documentTotal.doubleValue(); - - c.output(KV.of(word, documentFrequency)); - } - })); - - // Join the term frequency and document frequency - // collections, each keyed on the word. - final TupleTag> tfTag = new TupleTag<>(); - final TupleTag dfTag = new TupleTag<>(); - PCollection> wordToUriAndTfAndDf = KeyedPCollectionTuple - .of(tfTag, wordToUriAndTf) - .and(dfTag, wordToDf) - .apply(CoGroupByKey.create()); - - // Compute a mapping from each word to a (URI, TF-IDF) score - // for each URI. There are a variety of definitions of TF-IDF - // ("term frequency - inverse document frequency") score; - // here we use a basic version that is the term frequency - // divided by the log of the document frequency. - - return wordToUriAndTfAndDf - .apply(ParDo.named("ComputeTfIdf").of( - new DoFn, KV>>() { - private static final long serialVersionUID1 = 0; - - @Override - public void processElement(ProcessContext c) { - String word = c.element().getKey(); - Double df = c.element().getValue().getOnly(dfTag); - - for (KV uriAndTf : c.element().getValue().getAll(tfTag)) { - URI uri = uriAndTf.getKey(); - Double tf = uriAndTf.getValue(); - Double tfIdf = tf * Math.log(1 / df); - c.output(KV.of(word, KV.of(uri, tfIdf))); - } - } - })); - } - - // Instantiate Logger. - // It is suggested that the user specify the class name of the containing class - // (in this case ComputeTfIdf). - private static final Logger LOG = LoggerFactory.getLogger(ComputeTfIdf.class); - } - - /** - * A {@link PTransform} to write, in CSV format, a mapping from term and URI - * to score. - */ - public static class WriteTfIdf - extends PTransform>>, PDone> { - private static final long serialVersionUID = 0; - - private String output; - - public WriteTfIdf(String output) { - this.output = output; - } - - @Override - public PDone apply(PCollection>> wordToUriAndTfIdf) { - return wordToUriAndTfIdf - .apply(ParDo.named("Format").of(new DoFn>, String>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - c.output(String.format("%s,\t%s,\t%f", - c.element().getKey(), - c.element().getValue().getKey(), - c.element().getValue().getValue())); - } - })) - .apply(TextIO.Write - .to(output) - .withSuffix(".csv")); - } - } - - public static void main(String[] args) throws Exception { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - - options.setRunner(FlinkPipelineRunner.class); - - Pipeline pipeline = Pipeline.create(options); - pipeline.getCoderRegistry().registerCoder(URI.class, StringDelegateCoder.of(URI.class)); - - pipeline - .apply(new ReadDocuments(listInputDocuments(options))) - .apply(new ComputeTfIdf()) - .apply(new WriteTfIdf(options.getOutput())); - - pipeline.run(); - } -} diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java deleted file mode 100644 index 702fb63ec68d5..0000000000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java +++ /dev/null @@ -1,120 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples; - -import org.apache.beam.runners.flink.FlinkPipelineOptions; -import org.apache.beam.runners.flink.FlinkPipelineRunner; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.MapElements; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.SimpleFunction; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -public class WordCount { - - public static class ExtractWordsFn extends DoFn { - private final Aggregator emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - public static class CountWords extends PTransform, - PCollection>> { - @Override - public PCollection> apply(PCollection lines) { - - // Convert lines of text into individual words. - PCollection words = lines.apply( - ParDo.of(new ExtractWordsFn())); - - // Count the number of times each word occurs. - PCollection> wordCounts = - words.apply(Count.perElement()); - - return wordCounts; - } - } - - /** A SimpleFunction that converts a Word and Count into a printable string. */ - public static class FormatAsTextFn extends SimpleFunction, String> { - @Override - public String apply(KV input) { - return input.getKey() + ": " + input.getValue(); - } - } - - /** - * Options supported by {@link WordCount}. - *

- * Inherits standard configuration options. - */ - public interface Options extends PipelineOptions, FlinkPipelineOptions { - @Description("Path of the file to read from") - @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") - String getInput(); - void setInput(String value); - - @Description("Path of the file to write to") - String getOutput(); - void setOutput(String value); - } - - public static void main(String[] args) { - - Options options = PipelineOptionsFactory.fromArgs(args).withValidation() - .as(Options.class); - options.setRunner(FlinkPipelineRunner.class); - - Pipeline p = Pipeline.create(options); - - p.apply(TextIO.Read.named("ReadLines").from(options.getInput())) - .apply(new CountWords()) - .apply(MapElements.via(new FormatAsTextFn())) - .apply(TextIO.Write.named("WriteCounts").to(options.getOutput())); - - p.run(); - } - -} diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java deleted file mode 100644 index 9d1168ba40eef..0000000000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java +++ /dev/null @@ -1,403 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples.streaming; - -import org.apache.beam.runners.flink.FlinkPipelineRunner; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.DefaultCoder; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.Filter; -import org.apache.beam.sdk.transforms.Flatten; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Partition; -import org.apache.beam.sdk.transforms.Partition.PartitionFn; -import org.apache.beam.sdk.transforms.SerializableFunction; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.Top; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionList; - -import org.joda.time.Duration; - -import java.io.IOException; -import java.util.List; - -/** - * To run the example, first open a socket on a terminal by executing the command: - *

  • - *
  • - * nc -lk 9999 - *
  • - * - * and then launch the example. Now whatever you type in the terminal is going to be - * the input to the program. - * */ -public class AutoComplete { - - /** - * A PTransform that takes as input a list of tokens and returns - * the most common tokens per prefix. - */ - public static class ComputeTopCompletions - extends PTransform, PCollection>>> { - private static final long serialVersionUID = 0; - - private final int candidatesPerPrefix; - private final boolean recursive; - - protected ComputeTopCompletions(int candidatesPerPrefix, boolean recursive) { - this.candidatesPerPrefix = candidatesPerPrefix; - this.recursive = recursive; - } - - public static ComputeTopCompletions top(int candidatesPerPrefix, boolean recursive) { - return new ComputeTopCompletions(candidatesPerPrefix, recursive); - } - - @Override - public PCollection>> apply(PCollection input) { - PCollection candidates = input - // First count how often each token appears. - .apply(new Count.PerElement()) - - // Map the KV outputs of Count into our own CompletionCandiate class. - .apply(ParDo.named("CreateCompletionCandidates").of( - new DoFn, CompletionCandidate>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - CompletionCandidate cand = new CompletionCandidate(c.element().getKey(), c.element().getValue()); - c.output(cand); - } - })); - - // Compute the top via either a flat or recursive algorithm. - if (recursive) { - return candidates - .apply(new ComputeTopRecursive(candidatesPerPrefix, 1)) - .apply(Flatten.>>pCollections()); - } else { - return candidates - .apply(new ComputeTopFlat(candidatesPerPrefix, 1)); - } - } - } - - /** - * Lower latency, but more expensive. - */ - private static class ComputeTopFlat - extends PTransform, - PCollection>>> { - private static final long serialVersionUID = 0; - - private final int candidatesPerPrefix; - private final int minPrefix; - - public ComputeTopFlat(int candidatesPerPrefix, int minPrefix) { - this.candidatesPerPrefix = candidatesPerPrefix; - this.minPrefix = minPrefix; - } - - @Override - public PCollection>> apply( - PCollection input) { - return input - // For each completion candidate, map it to all prefixes. - .apply(ParDo.of(new AllPrefixes(minPrefix))) - - // Find and return the top candiates for each prefix. - .apply(Top.largestPerKey(candidatesPerPrefix) - .withHotKeyFanout(new HotKeyFanout())); - } - - private static class HotKeyFanout implements SerializableFunction { - private static final long serialVersionUID = 0; - - @Override - public Integer apply(String input) { - return (int) Math.pow(4, 5 - input.length()); - } - } - } - - /** - * Cheaper but higher latency. - * - *

    Returns two PCollections, the first is top prefixes of size greater - * than minPrefix, and the second is top prefixes of size exactly - * minPrefix. - */ - private static class ComputeTopRecursive - extends PTransform, - PCollectionList>>> { - private static final long serialVersionUID = 0; - - private final int candidatesPerPrefix; - private final int minPrefix; - - public ComputeTopRecursive(int candidatesPerPrefix, int minPrefix) { - this.candidatesPerPrefix = candidatesPerPrefix; - this.minPrefix = minPrefix; - } - - private class KeySizePartitionFn implements PartitionFn>> { - private static final long serialVersionUID = 0; - - @Override - public int partitionFor(KV> elem, int numPartitions) { - return elem.getKey().length() > minPrefix ? 0 : 1; - } - } - - private static class FlattenTops - extends DoFn>, CompletionCandidate> { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - for (CompletionCandidate cc : c.element().getValue()) { - c.output(cc); - } - } - } - - @Override - public PCollectionList>> apply( - PCollection input) { - if (minPrefix > 10) { - // Base case, partitioning to return the output in the expected format. - return input - .apply(new ComputeTopFlat(candidatesPerPrefix, minPrefix)) - .apply(Partition.of(2, new KeySizePartitionFn())); - } else { - // If a candidate is in the top N for prefix a...b, it must also be in the top - // N for a...bX for every X, which is typlically a much smaller set to consider. - // First, compute the top candidate for prefixes of size at least minPrefix + 1. - PCollectionList>> larger = input - .apply(new ComputeTopRecursive(candidatesPerPrefix, minPrefix + 1)); - // Consider the top candidates for each prefix of length minPrefix + 1... - PCollection>> small = - PCollectionList - .of(larger.get(1).apply(ParDo.of(new FlattenTops()))) - // ...together with those (previously excluded) candidates of length - // exactly minPrefix... - .and(input.apply(Filter.by(new SerializableFunction() { - private static final long serialVersionUID = 0; - - @Override - public Boolean apply(CompletionCandidate c) { - return c.getValue().length() == minPrefix; - } - }))) - .apply("FlattenSmall", Flatten.pCollections()) - // ...set the key to be the minPrefix-length prefix... - .apply(ParDo.of(new AllPrefixes(minPrefix, minPrefix))) - // ...and (re)apply the Top operator to all of them together. - .apply(Top.largestPerKey(candidatesPerPrefix)); - - PCollection>> flattenLarger = larger - .apply("FlattenLarge", Flatten.>>pCollections()); - - return PCollectionList.of(flattenLarger).and(small); - } - } - } - - /** - * A DoFn that keys each candidate by all its prefixes. - */ - private static class AllPrefixes - extends DoFn> { - private static final long serialVersionUID = 0; - - private final int minPrefix; - private final int maxPrefix; - public AllPrefixes(int minPrefix) { - this(minPrefix, Integer.MAX_VALUE); - } - public AllPrefixes(int minPrefix, int maxPrefix) { - this.minPrefix = minPrefix; - this.maxPrefix = maxPrefix; - } - @Override - public void processElement(ProcessContext c) { - String word = c.element().value; - for (int i = minPrefix; i <= Math.min(word.length(), maxPrefix); i++) { - KV kv = KV.of(word.substring(0, i), c.element()); - c.output(kv); - } - } - } - - /** - * Class used to store tag-count pairs. - */ - @DefaultCoder(AvroCoder.class) - static class CompletionCandidate implements Comparable { - private long count; - private String value; - - public CompletionCandidate(String value, long count) { - this.value = value; - this.count = count; - } - - public String getValue() { - return value; - } - - // Empty constructor required for Avro decoding. - @SuppressWarnings("unused") - public CompletionCandidate() {} - - @Override - public int compareTo(CompletionCandidate o) { - if (this.count < o.count) { - return -1; - } else if (this.count == o.count) { - return this.value.compareTo(o.value); - } else { - return 1; - } - } - - @Override - public boolean equals(Object other) { - if (other instanceof CompletionCandidate) { - CompletionCandidate that = (CompletionCandidate) other; - return this.count == that.count && this.value.equals(that.value); - } else { - return false; - } - } - - @Override - public int hashCode() { - return Long.valueOf(count).hashCode() ^ value.hashCode(); - } - - @Override - public String toString() { - return "CompletionCandidate[" + value + ", " + count + "]"; - } - } - - static class ExtractWordsFn extends DoFn { - private final Aggregator emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - /** - * Takes as input a the top candidates per prefix, and emits an entity - * suitable for writing to Datastore. - */ - static class FormatForPerTaskLocalFile extends DoFn>, String> - implements DoFn.RequiresWindowAccess{ - - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - StringBuilder str = new StringBuilder(); - KV> elem = c.element(); - - str.append(elem.getKey() +" @ "+ c.window() +" -> "); - for(CompletionCandidate cand: elem.getValue()) { - str.append(cand.toString() + " "); - } - System.out.println(str.toString()); - c.output(str.toString()); - } - } - - /** - * Options supported by this class. - * - *

    Inherits standard Dataflow configuration options. - */ - private interface Options extends WindowedWordCount.StreamingWordCountOptions { - @Description("Whether to use the recursive algorithm") - @Default.Boolean(true) - Boolean getRecursive(); - void setRecursive(Boolean value); - } - - public static void main(String[] args) throws IOException { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setStreaming(true); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkPipelineRunner.class); - - PTransform> readSource = - Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream"); - WindowFn windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); - - // Create the pipeline. - Pipeline p = Pipeline.create(options); - PCollection>> toWrite = p - .apply(readSource) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.into(windowFn) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()) - .apply(ComputeTopCompletions.top(10, options.getRecursive())); - - toWrite - .apply(ParDo.named("FormatForPerTaskFile").of(new FormatForPerTaskLocalFile())) - .apply(TextIO.Write.to("./outputAutoComplete.txt")); - - p.run(); - } -} diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java deleted file mode 100644 index d3e963d91255a..0000000000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java +++ /dev/null @@ -1,163 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples.streaming; - -import org.apache.beam.runners.flink.FlinkPipelineRunner; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.join.CoGbkResult; -import org.apache.beam.sdk.transforms.join.CoGroupByKey; -import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.transforms.windowing.WindowFn; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PBegin; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.TupleTag; - -import org.joda.time.Duration; - -/** - * To run the example, first open two sockets on two terminals by executing the commands: - *

  • - *
  • - * nc -lk 9999, and - *
  • - *
  • - * nc -lk 9998 - *
  • - * - * and then launch the example. Now whatever you type in the terminal is going to be - * the input to the program. - * */ -public class JoinExamples { - - static PCollection joinEvents(PCollection streamA, - PCollection streamB) throws Exception { - - final TupleTag firstInfoTag = new TupleTag<>(); - final TupleTag secondInfoTag = new TupleTag<>(); - - // transform both input collections to tuple collections, where the keys are country - // codes in both cases. - PCollection> firstInfo = streamA.apply( - ParDo.of(new ExtractEventDataFn())); - PCollection> secondInfo = streamB.apply( - ParDo.of(new ExtractEventDataFn())); - - // country code 'key' -> CGBKR (, ) - PCollection> kvpCollection = KeyedPCollectionTuple - .of(firstInfoTag, firstInfo) - .and(secondInfoTag, secondInfo) - .apply(CoGroupByKey.create()); - - // Process the CoGbkResult elements generated by the CoGroupByKey transform. - // country code 'key' -> string of , - PCollection> finalResultCollection = - kvpCollection.apply(ParDo.named("Process").of( - new DoFn, KV>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - KV e = c.element(); - String key = e.getKey(); - - String defaultA = "NO_VALUE"; - - // the following getOnly is a bit tricky because it expects to have - // EXACTLY ONE value in the corresponding stream and for the corresponding key. - - String lineA = e.getValue().getOnly(firstInfoTag, defaultA); - for (String lineB : c.element().getValue().getAll(secondInfoTag)) { - // Generate a string that combines information from both collection values - c.output(KV.of(key, "Value A: " + lineA + " - Value B: " + lineB)); - } - } - })); - - return finalResultCollection - .apply(ParDo.named("Format").of(new DoFn, String>() { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - String result = c.element().getKey() + " -> " + c.element().getValue(); - System.out.println(result); - c.output(result); - } - })); - } - - static class ExtractEventDataFn extends DoFn> { - private static final long serialVersionUID = 0; - - @Override - public void processElement(ProcessContext c) { - String line = c.element().toLowerCase(); - String key = line.split("\\s")[0]; - c.output(KV.of(key, line)); - } - } - - private interface Options extends WindowedWordCount.StreamingWordCountOptions { - - } - - public static void main(String[] args) throws Exception { - Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class); - options.setStreaming(true); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkPipelineRunner.class); - - PTransform> readSourceA = - Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream"); - PTransform> readSourceB = - Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream"); - - WindowFn windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize())); - - Pipeline p = Pipeline.create(options); - - // the following two 'applys' create multiple inputs to our pipeline, one for each - // of our two input sources. - PCollection streamA = p.apply(readSourceA) - .apply(Window.into(windowFn) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - PCollection streamB = p.apply(readSourceB) - .apply(Window.into(windowFn) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - - PCollection formattedResults = joinEvents(streamA, streamB); - formattedResults.apply(TextIO.Write.to("./outputJoin.txt")); - p.run(); - } - -} diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java deleted file mode 100644 index af6bb351f1ee9..0000000000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaIOExamples.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples.streaming; - -import org.apache.beam.runners.flink.FlinkPipelineOptions; -import org.apache.beam.runners.flink.FlinkPipelineRunner; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSink; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.coders.AvroCoder; -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.Write; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.PCollection; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; - -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.io.Serializable; -import java.util.Properties; - -/** - * Recipes/Examples that demonstrate how to read/write data from/to Kafka. - */ -public class KafkaIOExamples { - - - private static final String KAFKA_TOPIC = "input"; // Default kafka topic to read from - private static final String KAFKA_AVRO_TOPIC = "output"; // Default kafka topic to read from - private static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact - private static final String GROUP_ID = "myGroup"; // Default groupId - private static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka - - /** - * Read/Write String data to Kafka - */ - public static class KafkaString { - - /** - * Read String data from Kafka - */ - public static class ReadStringFromKafka { - - public static void main(String[] args) { - - Pipeline p = initializePipeline(args); - KafkaOptions options = getOptions(p); - - FlinkKafkaConsumer08 kafkaConsumer = - new FlinkKafkaConsumer08<>(options.getKafkaTopic(), - new SimpleStringSchema(), getKafkaProps(options)); - - p - .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer))) - .apply(ParDo.of(new PrintFn<>())); - - p.run(); - - } - - } - - /** - * Write String data to Kafka - */ - public static class WriteStringToKafka { - - public static void main(String[] args) { - - Pipeline p = initializePipeline(args); - KafkaOptions options = getOptions(p); - - PCollection words = - p.apply(Create.of("These", "are", "some", "words")); - - FlinkKafkaProducer08 kafkaSink = - new FlinkKafkaProducer08<>(options.getKafkaTopic(), - new SimpleStringSchema(), getKafkaProps(options)); - - words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink))); - - p.run(); - } - - } - } - - /** - * Read/Write Avro data to Kafka - */ - public static class KafkaAvro { - - /** - * Read Avro data from Kafka - */ - public static class ReadAvroFromKafka { - - public static void main(String[] args) { - - Pipeline p = initializePipeline(args); - KafkaOptions options = getOptions(p); - - FlinkKafkaConsumer08 kafkaConsumer = - new FlinkKafkaConsumer08<>(options.getKafkaAvroTopic(), - new AvroSerializationDeserializationSchema<>(MyType.class), getKafkaProps(options)); - - p - .apply(Read.from(UnboundedFlinkSource.of(kafkaConsumer))) - .apply(ParDo.of(new PrintFn<>())); - - p.run(); - - } - - } - - /** - * Write Avro data to Kafka - */ - public static class WriteAvroToKafka { - - public static void main(String[] args) { - - Pipeline p = initializePipeline(args); - KafkaOptions options = getOptions(p); - - PCollection words = - p.apply(Create.of( - new MyType("word", 1L), - new MyType("another", 2L), - new MyType("yet another", 3L))); - - FlinkKafkaProducer08 kafkaSink = - new FlinkKafkaProducer08<>(options.getKafkaAvroTopic(), - new AvroSerializationDeserializationSchema<>(MyType.class), getKafkaProps(options)); - - words.apply(Write.to(UnboundedFlinkSink.of(kafkaSink))); - - p.run(); - - } - } - - /** - * Serialiation/Deserialiation schema for Avro types - * @param - */ - static class AvroSerializationDeserializationSchema - implements SerializationSchema, DeserializationSchema { - - private final Class avroType; - - private final AvroCoder coder; - private transient ByteArrayOutputStream out; - - AvroSerializationDeserializationSchema(Class clazz) { - this.avroType = clazz; - this.coder = AvroCoder.of(clazz); - this.out = new ByteArrayOutputStream(); - } - - @Override - public byte[] serialize(T element) { - if (out == null) { - out = new ByteArrayOutputStream(); - } - try { - out.reset(); - coder.encode(element, out, Coder.Context.NESTED); - } catch (IOException e) { - throw new RuntimeException("Avro encoding failed.", e); - } - return out.toByteArray(); - } - - @Override - public T deserialize(byte[] message) throws IOException { - return coder.decode(new ByteArrayInputStream(message), Coder.Context.NESTED); - } - - @Override - public boolean isEndOfStream(T nextElement) { - return false; - } - - @Override - public TypeInformation getProducedType() { - return TypeExtractor.getForClass(avroType); - } - } - - /** - * Custom type for Avro serialization - */ - static class MyType implements Serializable { - - public MyType() {} - - MyType(String word, long count) { - this.word = word; - this.count = count; - } - - String word; - long count; - - @Override - public String toString() { - return "MyType{" + - "word='" + word + '\'' + - ", count=" + count + - '}'; - } - } - } - - // -------------- Utilities -------------- - - /** - * Custom options for the Pipeline - */ - public interface KafkaOptions extends FlinkPipelineOptions { - @Description("The Kafka topic to read from") - @Default.String(KAFKA_TOPIC) - String getKafkaTopic(); - - void setKafkaTopic(String value); - - void setKafkaAvroTopic(String value); - - @Description("The Kafka topic to read from") - @Default.String(KAFKA_AVRO_TOPIC) - String getKafkaAvroTopic(); - - @Description("The Kafka Broker to read from") - @Default.String(KAFKA_BROKER) - String getBroker(); - - void setBroker(String value); - - @Description("The Zookeeper server to connect to") - @Default.String(ZOOKEEPER) - String getZookeeper(); - - void setZookeeper(String value); - - @Description("The groupId") - @Default.String(GROUP_ID) - String getGroup(); - - void setGroup(String value); - } - - /** - * Initializes some options for the Flink runner - * @param args The command line args - * @return the pipeline - */ - private static Pipeline initializePipeline(String[] args) { - KafkaOptions options = - PipelineOptionsFactory.fromArgs(args).as(KafkaOptions.class); - - options.setStreaming(true); - options.setRunner(FlinkPipelineRunner.class); - - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - - return Pipeline.create(options); - } - - /** - * Gets KafkaOptions from the Pipeline - * @param p the pipeline - * @return KafkaOptions - */ - private static KafkaOptions getOptions(Pipeline p) { - return p.getOptions().as(KafkaOptions.class); - } - - /** - * Helper method to set the Kafka props from the pipeline options. - * @param options KafkaOptions - * @return Kafka props - */ - private static Properties getKafkaProps(KafkaOptions options) { - - Properties props = new Properties(); - props.setProperty("zookeeper.connect", options.getZookeeper()); - props.setProperty("bootstrap.servers", options.getBroker()); - props.setProperty("group.id", options.getGroup()); - - return props; - } - - /** - * Print contents to stdout - * @param type of the input - */ - private static class PrintFn extends DoFn { - - @Override - public void processElement(ProcessContext c) throws Exception { - System.out.println(c.element().toString()); - } - } - -} diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java deleted file mode 100644 index abb9feaaac717..0000000000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java +++ /dev/null @@ -1,149 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples.streaming; - -import org.apache.beam.runners.flink.FlinkPipelineRunner; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedFlinkSource; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.joda.time.Duration; - -import java.util.Properties; - -public class KafkaWindowedWordCountExample { - - static final String KAFKA_TOPIC = "test"; // Default kafka topic to read from - static final String KAFKA_BROKER = "localhost:9092"; // Default kafka broker to contact - static final String GROUP_ID = "myGroup"; // Default groupId - static final String ZOOKEEPER = "localhost:2181"; // Default zookeeper to connect to for Kafka - - public static class ExtractWordsFn extends DoFn { - private final Aggregator emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - public static class FormatAsStringFn extends DoFn, String> { - @Override - public void processElement(ProcessContext c) { - String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); - System.out.println(row); - c.output(row); - } - } - - public interface KafkaStreamingWordCountOptions extends WindowedWordCount.StreamingWordCountOptions { - @Description("The Kafka topic to read from") - @Default.String(KAFKA_TOPIC) - String getKafkaTopic(); - - void setKafkaTopic(String value); - - @Description("The Kafka Broker to read from") - @Default.String(KAFKA_BROKER) - String getBroker(); - - void setBroker(String value); - - @Description("The Zookeeper server to connect to") - @Default.String(ZOOKEEPER) - String getZookeeper(); - - void setZookeeper(String value); - - @Description("The groupId") - @Default.String(GROUP_ID) - String getGroup(); - - void setGroup(String value); - - } - - public static void main(String[] args) { - PipelineOptionsFactory.register(KafkaStreamingWordCountOptions.class); - KafkaStreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).as(KafkaStreamingWordCountOptions.class); - options.setJobName("KafkaExample - WindowSize: " + options.getWindowSize() + " seconds"); - options.setStreaming(true); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkPipelineRunner.class); - - System.out.println(options.getKafkaTopic() +" "+ options.getZookeeper() +" "+ options.getBroker() +" "+ options.getGroup() ); - Pipeline pipeline = Pipeline.create(options); - - Properties p = new Properties(); - p.setProperty("zookeeper.connect", options.getZookeeper()); - p.setProperty("bootstrap.servers", options.getBroker()); - p.setProperty("group.id", options.getGroup()); - - // this is the Flink consumer that reads the input to - // the program from a kafka topic. - FlinkKafkaConsumer08 kafkaConsumer = new FlinkKafkaConsumer08<>( - options.getKafkaTopic(), - new SimpleStringSchema(), p); - - PCollection words = pipeline - .apply(Read.named("StreamingWordCount").from(UnboundedFlinkSource.of(kafkaConsumer))) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize()))) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - - PCollection> wordCounts = - words.apply(Count.perElement()); - - wordCounts.apply(ParDo.of(new FormatAsStringFn())) - .apply(TextIO.Write.to("./outputKafka.txt")); - - pipeline.run(); - } -} diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java deleted file mode 100644 index e803e6ed80ea7..0000000000000 --- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java +++ /dev/null @@ -1,137 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.examples.streaming; - -import org.apache.beam.runners.flink.FlinkPipelineRunner; -import org.apache.beam.runners.flink.translation.wrappers.streaming.io.UnboundedSocketSource; -import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.Read; -import org.apache.beam.sdk.io.TextIO; -import org.apache.beam.sdk.options.Default; -import org.apache.beam.sdk.options.Description; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Count; -import org.apache.beam.sdk.transforms.DoFn; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.AfterWatermark; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; - -import org.joda.time.Duration; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; - -/** - * To run the example, first open a socket on a terminal by executing the command: - *
  • - *
  • - * nc -lk 9999 - *
  • - * - * and then launch the example. Now whatever you type in the terminal is going to be - * the input to the program. - * */ -public class WindowedWordCount { - - private static final Logger LOG = LoggerFactory.getLogger(WindowedWordCount.class); - - static final long WINDOW_SIZE = 10; // Default window duration in seconds - static final long SLIDE_SIZE = 5; // Default window slide in seconds - - static class FormatAsStringFn extends DoFn, String> { - @Override - public void processElement(ProcessContext c) { - String row = c.element().getKey() + " - " + c.element().getValue() + " @ " + c.timestamp().toString(); - c.output(row); - } - } - - static class ExtractWordsFn extends DoFn { - private final Aggregator emptyLines = - createAggregator("emptyLines", new Sum.SumLongFn()); - - @Override - public void processElement(ProcessContext c) { - if (c.element().trim().isEmpty()) { - emptyLines.addValue(1L); - } - - // Split the line into words. - String[] words = c.element().split("[^a-zA-Z']+"); - - // Output each word encountered into the output PCollection. - for (String word : words) { - if (!word.isEmpty()) { - c.output(word); - } - } - } - } - - public interface StreamingWordCountOptions extends org.apache.beam.runners.flink.examples.WordCount.Options { - @Description("Sliding window duration, in seconds") - @Default.Long(WINDOW_SIZE) - Long getWindowSize(); - - void setWindowSize(Long value); - - @Description("Window slide, in seconds") - @Default.Long(SLIDE_SIZE) - Long getSlide(); - - void setSlide(Long value); - } - - public static void main(String[] args) throws IOException { - StreamingWordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(StreamingWordCountOptions.class); - options.setStreaming(true); - options.setWindowSize(10L); - options.setSlide(5L); - options.setCheckpointingInterval(1000L); - options.setNumberOfExecutionRetries(5); - options.setExecutionRetryDelay(3000L); - options.setRunner(FlinkPipelineRunner.class); - - LOG.info("Windpwed WordCount with Sliding Windows of " + options.getWindowSize() + - " sec. and a slide of " + options.getSlide()); - - Pipeline pipeline = Pipeline.create(options); - - PCollection words = pipeline - .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount")) - .apply(ParDo.of(new ExtractWordsFn())) - .apply(Window.into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize())) - .every(Duration.standardSeconds(options.getSlide()))) - .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO) - .discardingFiredPanes()); - - PCollection> wordCounts = - words.apply(Count.perElement()); - - wordCounts.apply(ParDo.of(new FormatAsStringFn())) - .apply(TextIO.Write.to("./outputWordCount.txt")); - - pipeline.run(); - } -} diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index 5e05e6a09f20d..02aeffdd016d4 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -28,17 +28,11 @@ ../pom.xml - flink-runner-parent - 0.1.0-incubating-SNAPSHOT + flink-runner_2.10 Apache Beam :: Runners :: Flink - pom - - - runner - examples - + jar UTF-8 @@ -46,19 +40,119 @@ 1.0.0 - - - apache.snapshots - Apache Development Snapshot Repository - https://repository.apache.org/content/repositories/snapshots/ - - false - - - true - - - + + + disable-runnable-on-service-tests + + true + + + + + org.apache.maven.plugins + maven-surefire-plugin + + + runnable-on-service-tests + + true + + + + + + + + + + + + + org.apache.flink + flink-streaming-java_2.10 + ${flink.version} + + + org.apache.flink + flink-java + ${flink.version} + + + org.apache.flink + flink-clients_2.10 + ${flink.version} + + + + org.apache.flink + flink-connector-kafka-0.8_2.10 + ${flink.version} + + + org.apache.flink + flink-avro_2.10 + ${flink.version} + + + + org.apache.beam + java-sdk-all + + + org.slf4j + slf4j-jdk14 + + + + + + org.apache.beam + runners-core + + + org.slf4j + slf4j-jdk14 + + + + + + + org.apache.beam + java-examples-all + + + org.slf4j + slf4j-jdk14 + + + test + + + org.apache.flink + flink-streaming-java_2.10 + ${flink.version} + test + test-jar + + + org.apache.flink + flink-test-utils_2.10 + ${flink.version} + test + + + org.mockito + mockito-all + test + + + + com.google.auto.service + auto-service + true + + diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml deleted file mode 100644 index a53a386c28284..0000000000000 --- a/runners/flink/runner/pom.xml +++ /dev/null @@ -1,177 +0,0 @@ - - - - - 4.0.0 - - - org.apache.beam - flink-runner-parent - 0.1.0-incubating-SNAPSHOT - ../pom.xml - - - flink-runner_2.10 - - Apache Beam :: Runners :: Flink :: Core - - jar - - - - disable-runnable-on-service-tests - - true - - - - - org.apache.maven.plugins - maven-surefire-plugin - - - runnable-on-service-tests - - true - - - - - - - - - - - - - org.apache.flink - flink-streaming-java_2.10 - ${flink.version} - - - org.apache.flink - flink-java - ${flink.version} - - - org.apache.flink - flink-clients_2.10 - ${flink.version} - - - - org.apache.flink - flink-connector-kafka-0.8_2.10 - ${flink.version} - - - org.apache.flink - flink-avro_2.10 - ${flink.version} - - - - org.apache.beam - java-sdk-all - - - org.slf4j - slf4j-jdk14 - - - - - - org.apache.beam - runners-core - - - org.slf4j - slf4j-jdk14 - - - - - - - org.apache.beam - java-examples-all - - - org.slf4j - slf4j-jdk14 - - - test - - - org.apache.flink - flink-streaming-java_2.10 - ${flink.version} - test - test-jar - - - org.apache.flink - flink-test-utils_2.10 - ${flink.version} - test - - - org.mockito - mockito-all - test - - - - com.google.auto.service - auto-service - true - - - - - - - - - - - org.apache.maven.plugins - maven-failsafe-plugin - - - - - org.apache.maven.plugins - maven-surefire-plugin - - - - - - - diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/DefaultParallelismFactory.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineRunner.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/io/ConsoleIO.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingPipelineTranslator.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTranslationContext.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCoGroupKeyedListAggregator.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkCreateFunction.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkKeyedListAggregationFunction.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputDoFnFunction.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMultiOutputPruningFunction.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/UnionCoder.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderComparator.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeInformation.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/FlinkCoder.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/InspectableByteArrayOutputStream.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderComperator.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/KvCoderTypeInformation.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/types/VoidCoderTypeSerializer.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/CombineFnAggregatorWrapper.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataInputViewWrapper.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SerializableFnAggregatorWrapper.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SinkOutputFormat.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputSplit.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkAbstractParDoWrapper.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupByKeyWrapper.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundMultiWrapper.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkParDoBoundWrapper.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/AbstractFlinkTimerInternals.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/FlinkStateInternals.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointReader.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointUtils.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateCheckpointWriter.java diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java similarity index 100% rename from runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java rename to runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/state/StateType.java diff --git a/runners/flink/runner/src/main/resources/log4j.properties b/runners/flink/src/main/resources/log4j.properties similarity index 100% rename from runners/flink/runner/src/main/resources/log4j.properties rename to runners/flink/src/main/resources/log4j.properties diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/AvroITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/AvroITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/FlattenizeITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkRunnerRegistrarTest.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/FlinkTestPipeline.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/JoinExamplesITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/MaybeEmptyTestITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/ParDoMultiOutputITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesEmptyITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/RemoveDuplicatesITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/SideInputITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/TfIdfITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin2ITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/WordCountJoin3ITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/StateSerializationTest.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TestCountingSource.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java similarity index 100% rename from runners/flink/runner/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java rename to runners/flink/src/test/java/org/apache/beam/runners/flink/util/JoinExamples.java diff --git a/runners/flink/runner/src/test/resources/log4j-test.properties b/runners/flink/src/test/resources/log4j-test.properties similarity index 100% rename from runners/flink/runner/src/test/resources/log4j-test.properties rename to runners/flink/src/test/resources/log4j-test.properties