From 8beab6746cbb6fd712b848b5008109d9a89d2b4b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-Baptiste=20Onofr=C3=A9?= Date: Thu, 25 Aug 2016 16:19:54 +0200 Subject: [PATCH] [BEAM-242] Enable checkstyle and fix checkstyle errors in Flink runner --- runners/flink/runner/pom.xml | 2 - .../FlinkPipelineExecutionEnvironment.java | 17 +- .../runners/flink/FlinkPipelineOptions.java | 30 ++-- .../runners/flink/FlinkRunnerRegistrar.java | 6 + .../beam/runners/flink/FlinkRunnerResult.java | 17 +- .../beam/runners/flink/TestFlinkRunner.java | 8 +- .../beam/runners/flink/package-info.java | 22 +++ .../FlinkBatchPipelineTranslator.java | 15 +- .../FlinkBatchTranslationContext.java | 10 +- .../translation/FlinkPipelineTranslator.java | 2 +- .../flink/translation/TranslationMode.java | 8 +- .../translation/functions/package-info.java | 22 +++ .../flink/translation/package-info.java | 22 +++ .../types/CoderTypeSerializer.java | 2 +- .../types/EncodedValueSerializer.java | 162 +++++++++--------- .../flink/translation/types/package-info.java | 22 +++ .../utils/SerializedPipelineOptions.java | 2 +- .../flink/translation/utils/package-info.java | 22 +++ .../wrappers/DataOutputViewWrapper.java | 2 +- .../translation/wrappers/package-info.java | 22 +++ .../wrappers/streaming/DoFnOperator.java | 2 +- .../streaming/SingletonKeyedWorkItem.java | 5 + .../SingletonKeyedWorkItemCoder.java | 14 +- .../streaming/WorkItemKeySelector.java | 3 +- .../streaming/io/UnboundedFlinkSink.java | 13 +- .../streaming/io/UnboundedFlinkSource.java | 29 ++-- .../streaming/io/UnboundedSocketSource.java | 46 +++-- .../wrappers/streaming/io/package-info.java | 22 +++ .../wrappers/streaming/package-info.java | 22 +++ .../runners/flink/PipelineOptionsTest.java | 3 + .../beam/runners/flink/WriteSinkITCase.java | 3 +- .../beam/runners/flink/package-info.java | 22 +++ .../streaming/FlinkStateInternalsTest.java | 3 +- .../flink/streaming/GroupByNullKeyTest.java | 6 + .../streaming/TopWikipediaSessionsITCase.java | 2 +- 
.../streaming/UnboundedSourceWrapperTest.java | 33 ++-- .../runners/flink/streaming/package-info.java | 22 +++ 37 files changed, 483 insertions(+), 182 deletions(-) create mode 100644 runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java create mode 100644 runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java create mode 100644 runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java create mode 100644 runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java create mode 100644 runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java create mode 100644 runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java create mode 100644 runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java create mode 100644 runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java create mode 100644 runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java create mode 100644 runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index 08adc607abf6..7c322804e5b9 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -234,12 +234,10 @@ - org.apache.maven.plugins diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java index d1977a445adf..a5d33b40b8f6 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java +++ 
b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineExecutionEnvironment.java @@ -37,14 +37,15 @@ /** * The class that instantiates and manages the execution of a given job. * Depending on if the job is a Streaming or Batch processing one, it creates - * the adequate execution environment ({@link ExecutionEnvironment} or {@link StreamExecutionEnvironment}), - * the necessary {@link FlinkPipelineTranslator} ({@link FlinkBatchPipelineTranslator} or - * {@link FlinkStreamingPipelineTranslator}) to transform the Beam job into a Flink one, and - * executes the (translated) job. + * the adequate execution environment ({@link ExecutionEnvironment} + * or {@link StreamExecutionEnvironment}), the necessary {@link FlinkPipelineTranslator} + * ({@link FlinkBatchPipelineTranslator} or {@link FlinkStreamingPipelineTranslator}) to + * transform the Beam job into a Flink one, and executes the (translated) job. */ public class FlinkPipelineExecutionEnvironment { - private static final Logger LOG = LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); + private static final Logger LOG = + LoggerFactory.getLogger(FlinkPipelineExecutionEnvironment.class); private final FlinkPipelineOptions options; @@ -79,8 +80,8 @@ public class FlinkPipelineExecutionEnvironment { * Depending on if the job is a Streaming or a Batch one, this method creates * the necessary execution environment and pipeline translator, and translates * the {@link org.apache.beam.sdk.values.PCollection} program into - * a {@link org.apache.flink.api.java.DataSet} or {@link org.apache.flink.streaming.api.datastream.DataStream} - * one. + * a {@link org.apache.flink.api.java.DataSet} + * or {@link org.apache.flink.streaming.api.datastream.DataStream} one. 
* */ public void translate(Pipeline pipeline) { this.flinkBatchEnv = null; @@ -213,7 +214,7 @@ private StreamExecutionEnvironment createStreamExecutionEnvironment() { // If the value is not -1, then the validity checks are applied. // By default, checkpointing is disabled. long checkpointInterval = options.getCheckpointingInterval(); - if(checkpointInterval != -1) { + if (checkpointInterval != -1) { if (checkpointInterval < 1) { throw new IllegalArgumentException("The checkpoint interval must be positive"); } diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index 6d1a8d040f1a..bd3cc2546509 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -34,7 +34,8 @@ /** * Options which can be used to configure a Flink PipelineRunner. */ -public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOptions, StreamingOptions { +public interface FlinkPipelineOptions + extends PipelineOptions, ApplicationNameOptions, StreamingOptions { /** * List of local files to make available to workers. @@ -43,8 +44,8 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp *

* The default value is the list of jars from the main program's classpath. */ - @Description("Jar-Files to send to all workers and put on the classpath. " + - "The default value is all files from the classpath.") + @Description("Jar-Files to send to all workers and put on the classpath. " + + "The default value is all files from the classpath.") @JsonIgnore List getFilesToStage(); void setFilesToStage(List value); @@ -65,9 +66,9 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp * Cluster in the JVM, "[collection]" will execute the pipeline on Java Collections while * "[auto]" will let the system decide where to execute the pipeline based on the environment. */ - @Description("Address of the Flink Master where the Pipeline should be executed. Can" + - " either be of the form \"host:port\" or one of the special values [local], " + - "[collection] or [auto].") + @Description("Address of the Flink Master where the Pipeline should be executed. Can" + + " either be of the form \"host:port\" or one of the special values [local], " + + "[collection] or [auto].") String getFlinkMaster(); void setFlinkMaster(String value); @@ -76,25 +77,28 @@ public interface FlinkPipelineOptions extends PipelineOptions, ApplicationNameOp Integer getParallelism(); void setParallelism(Integer value); - @Description("The interval between consecutive checkpoints (i.e. snapshots of the current pipeline state used for " + - "fault tolerance).") + @Description("The interval between consecutive checkpoints (i.e. snapshots of the current" + + "pipeline state used for fault tolerance).") @Default.Long(-1L) Long getCheckpointingInterval(); void setCheckpointingInterval(Long interval); - @Description("Sets the number of times that failed tasks are re-executed. " + - "A value of zero effectively disables fault tolerance. 
A value of -1 indicates " + - "that the system default value (as defined in the configuration) should be used.") + @Description("Sets the number of times that failed tasks are re-executed. " + + "A value of zero effectively disables fault tolerance. A value of -1 indicates " + + "that the system default value (as defined in the configuration) should be used.") @Default.Integer(-1) Integer getNumberOfExecutionRetries(); void setNumberOfExecutionRetries(Integer retries); - @Description("Sets the delay between executions. A value of {@code -1} indicates that the default value should be used.") + @Description("Sets the delay between executions. A value of {@code -1} " + + "indicates that the default value should be used.") @Default.Long(-1L) Long getExecutionRetryDelay(); void setExecutionRetryDelay(Long delay); - + /** + * Simple job name factory. + */ class JobNameFactory implements DefaultValueFactory { private static final DateTimeFormatter FORMATTER = DateTimeFormat.forPattern("MMddHHmmss").withZone(DateTimeZone.UTC); diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java index f328279bfba0..0e4b5130be91 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerRegistrar.java @@ -36,6 +36,9 @@ public class FlinkRunnerRegistrar { private FlinkRunnerRegistrar() { } + /** + * Pipeline runner registrar. + */ @AutoService(PipelineRunnerRegistrar.class) public static class Runner implements PipelineRunnerRegistrar { @Override @@ -46,6 +49,9 @@ public Iterable>> getPipelineRunners() { } } + /** + * Pipeline options registrar. 
+ */ @AutoService(PipelineOptionsRegistrar.class) public static class Options implements PipelineOptionsRegistrar { @Override diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index dd0733a671c9..90bb64d31046 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -35,9 +35,9 @@ public class FlinkRunnerResult implements PipelineResult { private final Map aggregators; private final long runtime; public FlinkRunnerResult(Map aggregators, long runtime) { - this.aggregators = (aggregators == null || aggregators.isEmpty()) ? - Collections.emptyMap() : - Collections.unmodifiableMap(aggregators); + this.aggregators = (aggregators == null || aggregators.isEmpty()) + ? Collections.emptyMap() + : Collections.unmodifiableMap(aggregators); this.runtime = runtime; } @@ -47,7 +47,8 @@ public State getState() { } @Override - public AggregatorValues getAggregatorValues(final Aggregator aggregator) throws AggregatorRetrievalException { + public AggregatorValues getAggregatorValues(final Aggregator aggregator) + throws AggregatorRetrievalException { // TODO provide a list of all accumulator step values Object value = aggregators.get(aggregator.getName()); if (value != null) { @@ -65,10 +66,10 @@ public Map getValuesAtSteps() { @Override public String toString() { - return "FlinkRunnerResult{" + - "aggregators=" + aggregators + - ", runtime=" + runtime + - '}'; + return "FlinkRunnerResult{" + + "aggregators=" + aggregators + + ", runtime=" + runtime + + '}'; } @Override diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java index dd231d6e6bdd..67a7d3891d3d 100644 --- 
a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/TestFlinkRunner.java @@ -26,6 +26,9 @@ import org.apache.beam.sdk.values.PInput; import org.apache.beam.sdk.values.POutput; +/** + * Test Flink runner. + */ public class TestFlinkRunner extends PipelineRunner { private FlinkRunner delegate; @@ -37,7 +40,8 @@ private TestFlinkRunner(FlinkPipelineOptions options) { } public static TestFlinkRunner fromOptions(PipelineOptions options) { - FlinkPipelineOptions flinkOptions = PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options); + FlinkPipelineOptions flinkOptions = + PipelineOptionsValidator.validate(FlinkPipelineOptions.class, options); return new TestFlinkRunner(flinkOptions); } @@ -50,7 +54,7 @@ public static TestFlinkRunner create(boolean streaming) { @Override public - OutputT apply(PTransform transform, InputT input) { + OutputT apply(PTransform transform, InputT input) { return delegate.apply(transform, input); } diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java new file mode 100644 index 000000000000..57f1e599ee50 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java index 66c48b07e23b..aa38bfbbf5d2 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java @@ -91,15 +91,20 @@ public void visitPrimitiveTransform(TransformTreeNode node) { // get the transformation corresponding to the node we are // currently visiting and translate it into its Flink alternative. 
PTransform transform = node.getTransform(); - BatchTransformTranslator translator = FlinkBatchTransformTranslators.getTranslator(transform); + BatchTransformTranslator translator = + FlinkBatchTransformTranslators.getTranslator(transform); if (translator == null) { LOG.info(node.getTransform().getClass().toString()); - throw new UnsupportedOperationException("The transform " + transform + " is currently not supported."); + throw new UnsupportedOperationException("The transform " + transform + + " is currently not supported."); } applyBatchTransform(transform, node, translator); } - private > void applyBatchTransform(PTransform transform, TransformTreeNode node, BatchTransformTranslator translator) { + private > void applyBatchTransform( + PTransform transform, + TransformTreeNode node, + BatchTransformTranslator translator) { @SuppressWarnings("unchecked") T typedTransform = (T) transform; @@ -116,8 +121,8 @@ public void visitPrimitiveTransform(TransformTreeNode node) { /** * A translator of a {@link PTransform}. */ - public interface BatchTransformTranslator { - void translateNode(Type transform, FlinkBatchTranslationContext context); + public interface BatchTransformTranslator { + void translateNode(TransformT transform, FlinkBatchTranslationContext context); } /** diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java index 835648e43ea8..611f5e6d9321 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java @@ -40,7 +40,7 @@ * {@link FlinkBatchTransformTranslators}. 
*/ public class FlinkBatchTranslationContext { - + private final Map> dataSets; private final Map, DataSet> broadcastDataSets; @@ -55,9 +55,9 @@ public class FlinkBatchTranslationContext { private final PipelineOptions options; private AppliedPTransform currentTransform; - + // ------------------------------------------------------------------------ - + public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) { this.env = env; this.options = options; @@ -66,7 +66,7 @@ public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions op this.danglingDataSets = new HashMap<>(); } - + // ------------------------------------------------------------------------ public Map> getDanglingDataSets() { @@ -80,7 +80,7 @@ public ExecutionEnvironment getExecutionEnvironment() { public PipelineOptions getPipelineOptions() { return options; } - + @SuppressWarnings("unchecked") public DataSet> getInputDataSet(PValue value) { // assume that the DataSet is used as an input if retrieved here diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java index 4db929b0641b..cba28e48e01b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkPipelineTranslator.java @@ -39,7 +39,7 @@ public void translate(Pipeline pipeline) { } /** - * Utility formatting method + * Utility formatting method. 
* @param n number of spaces to generate * @return String with "|" followed by n spaces */ diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java index 71eb6552b710..57b69aa14753 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/TranslationMode.java @@ -18,14 +18,14 @@ package org.apache.beam.runners.flink.translation; /** - * The translation mode of the Beam Pipeline + * The translation mode of the Beam Pipeline. */ public enum TranslationMode { - /** Uses the batch mode of Flink */ + /** Uses the batch mode of Flink. */ BATCH, - /** Uses the streaming mode of Flink */ + /** Uses the streaming mode of Flink. */ STREAMING -} \ No newline at end of file +} diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java new file mode 100644 index 000000000000..9f1121225b88 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/functions/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink.translation.functions; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java new file mode 100644 index 000000000000..af4b35491ba4 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. 
+ */ +package org.apache.beam.runners.flink.translation; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java index 4eda357f64eb..e210ed9d7b98 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/CoderTypeSerializer.java @@ -33,7 +33,7 @@ * Dataflow {@link org.apache.beam.sdk.coders.Coder Coders}. */ public class CoderTypeSerializer extends TypeSerializer { - + private Coder coder; public CoderTypeSerializer(Coder coder) { diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java index f3e667d575a5..41db61edbb49 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/EncodedValueSerializer.java @@ -18,7 +18,9 @@ package org.apache.beam.runners.flink.translation.types; import java.io.IOException; + import org.apache.beam.sdk.coders.Coder; + import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; @@ -28,84 +30,84 @@ */ public final class EncodedValueSerializer extends TypeSerializer { - private static final long serialVersionUID = 1L; - - private static final byte[] EMPTY = new byte[0]; - - @Override - public boolean isImmutableType() { - return true; - } - - @Override - public byte[] createInstance() { - return EMPTY; - } - - @Override - public byte[] copy(byte[] from) { - return from; - } - - 
@Override - public byte[] copy(byte[] from, byte[] reuse) { - return copy(from); - } - - @Override - public int getLength() { - return -1; - } - - - @Override - public void serialize(byte[] record, DataOutputView target) throws IOException { - if (record == null) { - throw new IllegalArgumentException("The record must not be null."); - } - - final int len = record.length; - target.writeInt(len); - target.write(record); - } - - @Override - public byte[] deserialize(DataInputView source) throws IOException { - final int len = source.readInt(); - byte[] result = new byte[len]; - source.readFully(result); - return result; - } - - @Override - public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException { - return deserialize(source); - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - final int len = source.readInt(); - target.writeInt(len); - target.write(source, len); - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof EncodedValueSerializer; - } - - @Override - public int hashCode() { - return this.getClass().hashCode(); - } - - @Override - public boolean equals(Object obj) { - return obj instanceof EncodedValueSerializer; - } - - @Override - public TypeSerializer duplicate() { - return this; - } + private static final long serialVersionUID = 1L; + + private static final byte[] EMPTY = new byte[0]; + + @Override + public boolean isImmutableType() { + return true; + } + + @Override + public byte[] createInstance() { + return EMPTY; + } + + @Override + public byte[] copy(byte[] from) { + return from; + } + + @Override + public byte[] copy(byte[] from, byte[] reuse) { + return copy(from); + } + + @Override + public int getLength() { + return -1; + } + + + @Override + public void serialize(byte[] record, DataOutputView target) throws IOException { + if (record == null) { + throw new IllegalArgumentException("The record must not be null."); + } + + final int len = 
record.length; + target.writeInt(len); + target.write(record); + } + + @Override + public byte[] deserialize(DataInputView source) throws IOException { + final int len = source.readInt(); + byte[] result = new byte[len]; + source.readFully(result); + return result; + } + + @Override + public byte[] deserialize(byte[] reuse, DataInputView source) throws IOException { + return deserialize(source); + } + + @Override + public void copy(DataInputView source, DataOutputView target) throws IOException { + final int len = source.readInt(); + target.writeInt(len); + target.write(source, len); + } + + @Override + public boolean canEqual(Object obj) { + return obj instanceof EncodedValueSerializer; + } + + @Override + public int hashCode() { + return this.getClass().hashCode(); + } + + @Override + public boolean equals(Object obj) { + return obj instanceof EncodedValueSerializer; + } + + @Override + public TypeSerializer duplicate() { + return this; + } } diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java new file mode 100644 index 000000000000..6fb3182f27c5 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/types/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink.translation.types; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java index 0c6cea8dfd29..fe2602b3377b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/SerializedPipelineOptions.java @@ -33,7 +33,7 @@ public class SerializedPipelineOptions implements Serializable { private final byte[] serializedOptions; - /** Lazily initialized copy of deserialized options */ + /** Lazily initialized copy of deserialized options. */ private transient PipelineOptions pipelineOptions; public SerializedPipelineOptions(PipelineOptions options) { diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java new file mode 100644 index 000000000000..5dedd53c7752 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/utils/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink.translation.utils; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java index 2cb9b188ff06..f2d9db222c56 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/DataOutputViewWrapper.java @@ -28,7 +28,7 @@ * {@link java.io.OutputStream}. 
*/ public class DataOutputViewWrapper extends OutputStream { - + private DataOutputView outputView; public DataOutputViewWrapper(DataOutputView outputView) { diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java new file mode 100644 index 000000000000..72f7debb434f --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. 
+ */ +package org.apache.beam.runners.flink.translation.wrappers; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 000d69f34823..db53e6a9f01c 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -169,7 +169,7 @@ public void open() throws Exception { currentInputWatermark = Long.MIN_VALUE; currentOutputWatermark = currentInputWatermark; - Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() { + Aggregator.AggregatorFactory aggregatorFactory = new Aggregator.AggregatorFactory() { @Override public Aggregator createAggregatorForDoFn( Class fnClass, diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java index 5751aac78123..6d2582b5da5b 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItem.java @@ -22,6 +22,11 @@ import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.WindowedValue; +/** + * Singleton keyed work item. 
+ * @param + * @param + */ public class SingletonKeyedWorkItem implements KeyedWorkItem { final K key; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java index 5e583e9013c2..37454a31c4bf 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SingletonKeyedWorkItemCoder.java @@ -35,7 +35,13 @@ import org.apache.beam.sdk.util.PropertyNames; import org.apache.beam.sdk.util.WindowedValue; -public class SingletonKeyedWorkItemCoder extends StandardCoder> { +/** + * Singleton keyed work item coder. + * @param + * @param + */ +public class SingletonKeyedWorkItemCoder + extends StandardCoder> { /** * Create a new {@link KeyedWorkItemCoder} with the provided key coder, element coder, and window * coder. 
@@ -68,7 +74,7 @@ private SingletonKeyedWorkItemCoder( this.keyCoder = keyCoder; this.elemCoder = elemCoder; this.windowCoder = windowCoder; - valueCoder= WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder); + valueCoder = WindowedValue.FullWindowedValueCoder.of(elemCoder, windowCoder); } public Coder getKeyCoder() { @@ -80,7 +86,9 @@ public Coder getElementCoder() { } @Override - public void encode(SingletonKeyedWorkItem value, OutputStream outStream, Context context) + public void encode(SingletonKeyedWorkItem value, + OutputStream outStream, + Context context) throws CoderException, IOException { Context nestedContext = context.nested(); keyCoder.encode(value.key(), outStream, nestedContext); diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java index 51d9e0c9399c..7829163e6a26 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WorkItemKeySelector.java @@ -33,7 +33,8 @@ * that all key comparisons/hashing happen on the encoded form. 
*/ public class WorkItemKeySelector - implements KeySelector>, ByteBuffer>, ResultTypeQueryable { + implements KeySelector>, ByteBuffer>, + ResultTypeQueryable { private final Coder keyCoder; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java index 2117e9d19353..5b01796fe1f9 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSink.java @@ -62,7 +62,8 @@ public void initialize(PipelineOptions options) throws Exception { } @Override - public void finalize(Iterable writerResults, PipelineOptions options) throws Exception { + public void finalize(Iterable writerResults, PipelineOptions options) + throws Exception { } @@ -70,12 +71,14 @@ public void finalize(Iterable writerResults, PipelineOptions options) th public Coder getWriterResultCoder() { return new Coder() { @Override - public void encode(Object value, OutputStream outStream, Context context) throws CoderException, IOException { + public void encode(Object value, OutputStream outStream, Context context) + throws CoderException, IOException { } @Override - public Object decode(InputStream inStream, Context context) throws CoderException, IOException { + public Object decode(InputStream inStream, Context context) + throws CoderException, IOException { return null; } @@ -110,7 +113,9 @@ public boolean isRegisterByteSizeObserverCheap(Object value, Context context) { } @Override - public void registerByteSizeObserver(Object value, ElementByteSizeObserver observer, Context context) throws Exception { + public void registerByteSizeObserver(Object value, + ElementByteSizeObserver observer, + Context context) 
throws Exception { } diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java index c6e0825648f4..ac20c34ff204 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedFlinkSource.java @@ -36,17 +36,19 @@ public class UnboundedFlinkSource extends UnboundedSource flinkSource; - /** Coder set during translation */ + /** Coder set during translation. */ private Coder coder; - /** Timestamp / watermark assigner for source; defaults to ingestion time */ - private AssignerWithPeriodicWatermarks flinkTimestampAssigner = new IngestionTimeExtractor(); + /** Timestamp / watermark assigner for source; defaults to ingestion time. 
*/ + private AssignerWithPeriodicWatermarks flinkTimestampAssigner = + new IngestionTimeExtractor(); public UnboundedFlinkSource(SourceFunction source) { flinkSource = checkNotNull(source); } - public UnboundedFlinkSource(SourceFunction source, AssignerWithPeriodicWatermarks timestampAssigner) { + public UnboundedFlinkSource(SourceFunction source, + AssignerWithPeriodicWatermarks timestampAssigner) { flinkSource = checkNotNull(source); flinkTimestampAssigner = checkNotNull(timestampAssigner); } @@ -60,19 +62,25 @@ public AssignerWithPeriodicWatermarks getFlinkTimestampAssigner() { } @Override - public List> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner."); + public List> generateInitialSplits( + int desiredNumSplits, + PipelineOptions options) throws Exception { + throw new RuntimeException("Flink Sources are supported only when " + + "running with the FlinkRunner."); } @Override - public UnboundedReader createReader(PipelineOptions options, @Nullable CheckpointMark checkpointMark) { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner."); + public UnboundedReader createReader(PipelineOptions options, + @Nullable CheckpointMark checkpointMark) { + throw new RuntimeException("Flink Sources are supported only when " + + "running with the FlinkRunner."); } @Nullable @Override public Coder getCheckpointMarkCoder() { - throw new RuntimeException("Flink Sources are supported only when running with the FlinkRunner."); + throw new RuntimeException("Flink Sources are supported only when " + + "running with the FlinkRunner."); } @@ -100,7 +108,8 @@ public void setFlinkTimestampAssigner(AssignerWithPeriodicWatermarks flinkTim * @param The type that the source function produces. * @return The wrapped source function. 
*/ - public static UnboundedSource of(SourceFunction flinkSource) { + public static UnboundedSource of( + SourceFunction flinkSource) { return new UnboundedFlinkSource<>(flinkSource); } diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java index 8d37fe70dfb8..96b51389deaf 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSocketSource.java @@ -38,9 +38,11 @@ import org.slf4j.LoggerFactory; /** - * An example unbounded Beam source that reads input from a socket. This is used mainly for testing and debugging. + * An example unbounded Beam source that reads input from a socket. + * This is used mainly for testing and debugging. 
* */ -public class UnboundedSocketSource extends UnboundedSource { +public class UnboundedSocketSource + extends UnboundedSource { private static final Coder DEFAULT_SOCKET_CODER = StringUtf8Coder.of(); @@ -60,7 +62,11 @@ public UnboundedSocketSource(String hostname, int port, char delimiter, long max this(hostname, port, delimiter, maxNumRetries, DEFAULT_CONNECTION_RETRY_SLEEP); } - public UnboundedSocketSource(String hostname, int port, char delimiter, long maxNumRetries, long delayBetweenRetries) { + public UnboundedSocketSource(String hostname, + int port, + char delimiter, + long maxNumRetries, + long delayBetweenRetries) { this.hostname = hostname; this.port = port; this.delimiter = delimiter; @@ -89,12 +95,15 @@ public long getDelayBetweenRetries() { } @Override - public List> generateInitialSplits(int desiredNumSplits, PipelineOptions options) throws Exception { - return Collections.>singletonList(this); + public List> generateInitialSplits( + int desiredNumSplits, + PipelineOptions options) throws Exception { + return Collections.>singletonList(this); } @Override - public UnboundedReader createReader(PipelineOptions options, @Nullable C checkpointMark) { + public UnboundedReader createReader(PipelineOptions options, + @Nullable CheckpointMarkT checkpointMark) { return new UnboundedSocketReader(this); } @@ -109,7 +118,8 @@ public Coder getCheckpointMarkCoder() { @Override public void validate() { checkArgument(port > 0 && port < 65536, "port is out of range"); - checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), or -1 (infinite retries)"); + checkArgument(maxNumRetries >= -1, "maxNumRetries must be zero or larger (num retries), " + + "or -1 (infinite retries)"); checkArgument(delayBetweenRetries >= 0, "delayBetweenRetries must be zero or positive"); } @@ -118,7 +128,11 @@ public Coder getDefaultOutputCoder() { return DEFAULT_SOCKET_CODER; } - public static class UnboundedSocketReader extends 
UnboundedSource.UnboundedReader implements Serializable { + /** + * Unbounded socket reader. + */ + public static class UnboundedSocketReader extends UnboundedSource.UnboundedReader + implements Serializable { private static final long serialVersionUID = 7526472295622776147L; private static final Logger LOG = LoggerFactory.getLogger(UnboundedSocketReader.class); @@ -138,7 +152,8 @@ public UnboundedSocketReader(UnboundedSocketSource source) { private void openConnection() throws IOException { this.socket = new Socket(); - this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), CONNECTION_TIMEOUT_TIME); + this.socket.connect(new InetSocketAddress(this.source.getHostname(), this.source.getPort()), + CONNECTION_TIMEOUT_TIME); this.reader = new BufferedReader(new InputStreamReader(this.socket.getInputStream())); this.isRunning = true; } @@ -149,11 +164,14 @@ public boolean start() throws IOException { while (!isRunning) { try { openConnection(); - LOG.info("Connected to server socket " + this.source.getHostname() + ':' + this.source.getPort()); + LOG.info("Connected to server socket " + this.source.getHostname() + ':' + + this.source.getPort()); return advance(); } catch (IOException e) { - LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + this.source.getPort() + ". Retrying in " + this.source.getDelayBetweenRetries() + " msecs..."); + LOG.info("Lost connection to server socket " + this.source.getHostname() + ':' + + this.source.getPort() + ". 
Retrying in " + + this.source.getDelayBetweenRetries() + " msecs..."); if (this.source.getMaxNumRetries() == -1 || attempt++ < this.source.getMaxNumRetries()) { try { @@ -167,7 +185,8 @@ public boolean start() throws IOException { } } } - LOG.error("Unable to connect to host " + this.source.getHostname() + " : " + this.source.getPort()); + LOG.error("Unable to connect to host " + this.source.getHostname() + + " : " + this.source.getPort()); return false; } @@ -211,7 +230,8 @@ public void close() throws IOException { this.reader.close(); this.socket.close(); this.isRunning = false; - LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + this.source.getPort() + "."); + LOG.info("Closed connection to server socket at " + this.source.getHostname() + ":" + + this.source.getPort() + "."); } @Override diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java new file mode 100644 index 000000000000..b431ce745bde --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink.translation.wrappers.streaming.io; diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java new file mode 100644 index 000000000000..0674871307e6 --- /dev/null +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. 
+ */ +package org.apache.beam.runners.flink.translation.wrappers.streaming; diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 32339dce5165..3c30fed923ba 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -52,6 +52,9 @@ */ public class PipelineOptionsTest { + /** + * Pipeline options. + */ public interface MyOptions extends FlinkPipelineOptions { @Description("Bla bla bla") @Default.String("Hello") diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java index 09881463f6c2..37eedb220bac 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java @@ -118,7 +118,8 @@ public void initialize(PipelineOptions options) throws Exception { } @Override - public void finalize(Iterable writerResults, PipelineOptions options) throws Exception { + public void finalize(Iterable writerResults, PipelineOptions options) + throws Exception { } diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java new file mode 100644 index 000000000000..57f1e599ee50 --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink; diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java index 711ae0001f79..628212a4e5f1 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/FlinkStateInternalsTest.java @@ -56,7 +56,8 @@ import org.junit.runners.JUnit4; /** - * Tests for {@link FlinkStateInternals}. This is based on the tests for {@code InMemoryStateInternals}. + * Tests for {@link FlinkStateInternals}. This is based on the tests for + * {@code InMemoryStateInternals}. 
*/ @RunWith(JUnit4.class) public class FlinkStateInternalsTest { diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java index ab98c27733b3..c6381ee4002e 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java @@ -36,6 +36,9 @@ import org.joda.time.Duration; import org.joda.time.Instant; +/** + * Test for GroupByNullKey. + */ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Serializable { @@ -58,6 +61,9 @@ protected void postSubmit() throws Exception { compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), resultPath); } + /** + * DoFn extracting user and timestamp. + */ public static class ExtractUserAndTimestamp extends OldDoFn, String> { private static final long serialVersionUID = 0; diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java index 64f978fa0f23..9410481f054e 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java @@ -38,7 +38,7 @@ /** - * Session window test + * Session window test. 
*/ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase implements Serializable { protected String resultPath; diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index a70ad49b9f73..73124a9eed72 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -56,14 +56,14 @@ public class UnboundedSourceWrapperTest { */ @Test public void testWithOneReader() throws Exception { - final int NUM_ELEMENTS = 20; + final int numElements = 20; final Object checkpointLock = new Object(); PipelineOptions options = PipelineOptionsFactory.create(); // this source will emit exactly NUM_ELEMENTS across all parallel readers, // afterwards it will stall. We check whether we also receive NUM_ELEMENTS // elements later. - TestCountingSource source = new TestCountingSource(NUM_ELEMENTS); + TestCountingSource source = new TestCountingSource(numElements); UnboundedSourceWrapper, TestCountingSource.CounterMark> flinkWrapper = new UnboundedSourceWrapper<>(options, source, 1); @@ -92,7 +92,7 @@ public void collect( StreamRecord>> windowedValueStreamRecord) { count++; - if (count >= NUM_ELEMENTS) { + if (count >= numElements) { throw new SuccessException(); } } @@ -116,14 +116,14 @@ public void close() { */ @Test public void testWithMultipleReaders() throws Exception { - final int NUM_ELEMENTS = 20; + final int numElements = 20; final Object checkpointLock = new Object(); PipelineOptions options = PipelineOptionsFactory.create(); // this source will emit exactly NUM_ELEMENTS across all parallel readers, // afterwards it will stall. We check whether we also receive NUM_ELEMENTS // elements later. 
- TestCountingSource source = new TestCountingSource(NUM_ELEMENTS); + TestCountingSource source = new TestCountingSource(numElements); UnboundedSourceWrapper, TestCountingSource.CounterMark> flinkWrapper = new UnboundedSourceWrapper<>(options, source, 4); @@ -149,10 +149,10 @@ public void emitWatermark(Watermark watermark) { @Override public void collect( - StreamRecord>> windowedValueStreamRecord) { + StreamRecord>> windowedValueStreamRecord) { count++; - if (count >= NUM_ELEMENTS) { + if (count >= numElements) { throw new SuccessException(); } } @@ -177,14 +177,14 @@ public void close() { */ @Test public void testRestore() throws Exception { - final int NUM_ELEMENTS = 20; + final int numElements = 20; final Object checkpointLock = new Object(); PipelineOptions options = PipelineOptionsFactory.create(); // this source will emit exactly NUM_ELEMENTS across all parallel readers, // afterwards it will stall. We check whether we also receive NUM_ELEMENTS // elements later. - TestCountingSource source = new TestCountingSource(NUM_ELEMENTS); + TestCountingSource source = new TestCountingSource(numElements); UnboundedSourceWrapper, TestCountingSource.CounterMark> flinkWrapper = new UnboundedSourceWrapper<>(options, source, 1); @@ -213,11 +213,11 @@ public void emitWatermark(Watermark watermark) { @Override public void collect( - StreamRecord>> windowedValueStreamRecord) { + StreamRecord>> windowedValueStreamRecord) { emittedElements.add(windowedValueStreamRecord.getValue().getValue()); count++; - if (count >= NUM_ELEMENTS / 2) { + if (count >= numElements / 2) { throw new SuccessException(); } } @@ -238,7 +238,7 @@ public void close() { byte[] snapshot = flinkWrapper.snapshotState(0, 0); // create a completely new source but restore from the snapshot - TestCountingSource restoredSource = new TestCountingSource(NUM_ELEMENTS); + TestCountingSource restoredSource = new TestCountingSource(numElements); UnboundedSourceWrapper< KV, TestCountingSource.CounterMark> 
restoredFlinkWrapper = new UnboundedSourceWrapper<>(options, restoredSource, 1); @@ -271,10 +271,10 @@ public void emitWatermark(Watermark watermark) { @Override public void collect( - StreamRecord>> windowedValueStreamRecord) { + StreamRecord>> windowedValueStreamRecord) { emittedElements.add(windowedValueStreamRecord.getValue().getValue()); count++; - if (count >= NUM_ELEMENTS / 2) { + if (count >= numElements / 2) { throw new SuccessException(); } } @@ -292,7 +292,7 @@ public void close() { assertTrue("Did not successfully read second batch of elements.", readSecondBatchOfElements); // verify that we saw all NUM_ELEMENTS elements - assertTrue(emittedElements.size() == NUM_ELEMENTS); + assertTrue(emittedElements.size() == numElements); } @SuppressWarnings("unchecked") @@ -310,7 +310,8 @@ private static void setupSourceOperator(StreamSource operator) { when(mockTask.getConfiguration()).thenReturn(cfg); when(mockTask.getEnvironment()).thenReturn(env); when(mockTask.getExecutionConfig()).thenReturn(executionConfig); - when(mockTask.getAccumulatorMap()).thenReturn(Collections.>emptyMap()); + when(mockTask.getAccumulatorMap()) + .thenReturn(Collections.>emptyMap()); operator.setup(mockTask, cfg, (Output< StreamRecord>) mock(Output.class)); } diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java new file mode 100644 index 000000000000..08a1e03ec243 --- /dev/null +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal implementation of the Beam runner for Apache Flink. + */ +package org.apache.beam.runners.flink.streaming;