From 42f6b27334672f66593ad7ddc1ece6c16432028e Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Wed, 22 Mar 2017 15:56:50 +0800 Subject: [PATCH 1/3] [BEAM-773] Implement Metrics support for Flink runner --- runners/flink/pom.xml | 8 +- .../flink/FlinkBatchTransformTranslators.java | 4 +- .../flink/FlinkBatchTranslationContext.java | 4 + .../runners/flink/FlinkPipelineOptions.java | 5 + .../beam/runners/flink/FlinkRunnerResult.java | 3 +- .../FlinkStreamingTransformTranslators.java | 13 + .../FlinkStreamingTranslationContext.java | 3 + .../metrics/DoFnRunnerWithMetricsUpdate.java | 91 +++++ .../flink/metrics/FlinkMetricContainer.java | 315 ++++++++++++++++++ .../flink/metrics/FlinkMetricResults.java | 146 ++++++++ .../runners/flink/metrics/package-info.java | 22 ++ .../functions/FlinkDoFnFunction.java | 10 + .../functions/FlinkStatefulDoFnFunction.java | 10 + .../wrappers/streaming/DoFnOperator.java | 13 +- .../streaming/SplittableDoFnOperator.java | 2 + .../streaming/WindowDoFnOperator.java | 2 + .../runners/flink/PipelineOptionsTest.java | 10 + .../flink/streaming/DoFnOperatorTest.java | 5 + 18 files changed, 661 insertions(+), 5 deletions(-) create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java diff --git a/runners/flink/pom.xml b/runners/flink/pom.xml index 6e1d3c570fa5..e88b68acda32 100644 --- a/runners/flink/pom.xml +++ b/runners/flink/pom.xml @@ -58,7 +58,6 @@ org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, org.apache.beam.sdk.testing.UsesSplittableParDo, - org.apache.beam.sdk.testing.UsesAttemptedMetrics, org.apache.beam.sdk.testing.UsesCommittedMetrics, 
org.apache.beam.sdk.testing.UsesTestStream @@ -92,7 +91,6 @@ org.apache.beam.sdk.testing.FlattenWithHeterogeneousCoders, org.apache.beam.sdk.testing.UsesSetState, org.apache.beam.sdk.testing.UsesMapState, - org.apache.beam.sdk.testing.UsesAttemptedMetrics, org.apache.beam.sdk.testing.UsesCommittedMetrics, org.apache.beam.sdk.testing.UsesTestStream, org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs @@ -177,6 +175,12 @@ ${flink.version} + + org.apache.flink + flink-metrics-core + ${flink.version} + + org.apache.flink flink-java diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index ff9521c009f2..57f677cd6cc7 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -571,7 +571,8 @@ public void translateNode( (KvCoder) context.getInput(transform).getCoder(); FlinkStatefulDoFnFunction doFnWrapper = new FlinkStatefulDoFnFunction<>( - (DoFn) doFn, windowingStrategy, sideInputStrategies, context.getPipelineOptions(), + (DoFn) doFn, context.getCurrentTransform().getFullName(), + windowingStrategy, sideInputStrategies, context.getPipelineOptions(), outputMap, transform.getMainOutputTag() ); @@ -585,6 +586,7 @@ public void translateNode( FlinkDoFnFunction doFnWrapper = new FlinkDoFnFunction( doFn, + context.getCurrentTransform().getFullName(), windowingStrategy, sideInputStrategies, context.getPipelineOptions(), diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java index 98dd0fb858eb..bb86cd9dc98f 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java +++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTranslationContext.java @@ -103,6 +103,10 @@ public void setCurrentTransform(AppliedPTransform currentTransform) { this.currentTransform = currentTransform; } + public AppliedPTransform getCurrentTransform() { + return currentTransform; + } + @SuppressWarnings("unchecked") public DataSet getSideInputDataSet(PCollectionView value) { return (DataSet) broadcastDataSets.get(value); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java index ef9afeaed52b..b769a6f31bfe 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPipelineOptions.java @@ -98,4 +98,9 @@ public interface FlinkPipelineOptions AbstractStateBackend getStateBackend(); void setStateBackend(AbstractStateBackend stateBackend); + @Description("Enable/disable Beam metrics in Flink Runner") + @Default.Boolean(true) + Boolean getEnableMetrics(); + void setEnableMetrics(Boolean enableMetrics); + } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java index 0682b5695f74..0f2462d20929 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkRunnerResult.java @@ -20,6 +20,7 @@ import java.io.IOException; import java.util.Collections; import java.util.Map; +import org.apache.beam.runners.flink.metrics.FlinkMetricResults; import org.apache.beam.sdk.AggregatorRetrievalException; import org.apache.beam.sdk.AggregatorValues; import org.apache.beam.sdk.PipelineResult; @@ -93,6 +94,6 @@ public State waitUntilFinish(Duration duration) { @Override public MetricResults metrics() { - throw new 
UnsupportedOperationException("The FlinkRunner does not currently support metrics."); + return new FlinkMetricResults(aggregators); } } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 71f315db75ae..273023687808 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -286,6 +286,7 @@ static class ParDoTranslationHelper { interface DoFnOperatorFactory { DoFnOperator createDoFnOperator( DoFn doFn, + String stepName, List> sideInputs, TupleTag mainOutputTag, List> additionalOutputTags, @@ -300,6 +301,7 @@ DoFnOperator createDoFnOperator( static void translateParDo( String transformName, DoFn doFn, + String stepName, PCollection input, List> sideInputs, Map, PValue> outputs, @@ -340,6 +342,7 @@ static void translateParDo( DoFnOperator doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, + context.getCurrentTransform().getFullName(), sideInputs, mainOutputTag, additionalOutputTags, @@ -365,6 +368,7 @@ static void translateParDo( DoFnOperator doFnOperator = doFnOperatorFactory.createDoFnOperator( doFn, + context.getCurrentTransform().getFullName(), sideInputs, mainOutputTag, additionalOutputTags, @@ -483,6 +487,7 @@ public void translateNode( ParDoTranslationHelper.translateParDo( transform.getName(), transform.getFn(), + context.getCurrentTransform().getFullName(), (PCollection) context.getInput(transform), transform.getSideInputs(), context.getOutputs(transform), @@ -493,6 +498,7 @@ public void translateNode( @Override public DoFnOperator createDoFnOperator( DoFn doFn, + String stepName, List> sideInputs, TupleTag mainOutputTag, List> additionalOutputTags, @@ -504,6 +510,7 @@ public DoFnOperator createDoFnOperator( Map> 
transformedSideInputs) { return new DoFnOperator<>( doFn, + stepName, inputCoder, mainOutputTag, additionalOutputTags, @@ -531,6 +538,7 @@ public void translateNode( ParDoTranslationHelper.translateParDo( transform.getName(), transform.newProcessFn(transform.getFn()), + context.getCurrentTransform().getFullName(), (PCollection>>) context.getInput(transform), transform.getSideInputs(), @@ -548,6 +556,7 @@ RawUnionValue> createDoFnOperator( DoFn< KeyedWorkItem>, OutputT> doFn, + String stepName, List> sideInputs, TupleTag mainOutputTag, List> additionalOutputTags, @@ -563,6 +572,7 @@ RawUnionValue> createDoFnOperator( Map> transformedSideInputs) { return new SplittableDoFnOperator<>( doFn, + stepName, inputCoder, mainOutputTag, additionalOutputTags, @@ -700,6 +710,7 @@ public void translateNode( WindowDoFnOperator> doFnOperator = new WindowDoFnOperator<>( reduceFn, + context.getCurrentTransform().getFullName(), (Coder) windowedWorkItemCoder, new TupleTag>>("main output"), Collections.>emptyList(), @@ -800,6 +811,7 @@ public void translateNode( WindowDoFnOperator doFnOperator = new WindowDoFnOperator<>( reduceFn, + context.getCurrentTransform().getFullName(), (Coder) windowedWorkItemCoder, new TupleTag>("main output"), Collections.>emptyList(), @@ -825,6 +837,7 @@ public void translateNode( WindowDoFnOperator doFnOperator = new WindowDoFnOperator<>( reduceFn, + context.getCurrentTransform().getFullName(), (Coder) windowedWorkItemCoder, new TupleTag>("main output"), Collections.>emptyList(), diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java index 1a943a3dbb1b..45ee14d25a0d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTranslationContext.java @@ -107,6 +107,9 @@ public 
TypeInformation> getTypeInfo(PCollection collecti return new CoderTypeInformation<>(windowedValueCoder); } + public AppliedPTransform getCurrentTransform() { + return currentTransform; + } @SuppressWarnings("unchecked") public T getInput(PTransform transform) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java new file mode 100644 index 000000000000..29a1a5248682 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/DoFnRunnerWithMetricsUpdate.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.metrics; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.beam.runners.core.DoFnRunner; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.joda.time.Instant; + +/** + * {@link DoFnRunner} decorator which registers + * {@link org.apache.beam.sdk.metrics.MetricsContainer}. It updates metrics to Flink metrics and + * accumulators in {@link #finishBundle()}. + */ +public class DoFnRunnerWithMetricsUpdate implements DoFnRunner { + + private final FlinkMetricContainer container; + private final DoFnRunner delegate; + + public DoFnRunnerWithMetricsUpdate( + String stepName, + DoFnRunner delegate, + RuntimeContext runtimeContext) { + this.delegate = delegate; + container = new FlinkMetricContainer(stepName, runtimeContext); + } + + @Override + public void startBundle() { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + delegate.startBundle(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void processElement(final WindowedValue elem) { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + delegate.processElement(elem); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void onTimer(final String timerId, final BoundedWindow window, final Instant timestamp, + final TimeDomain timeDomain) { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + delegate.onTimer(timerId, window, timestamp, timeDomain); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + + @Override + public void finishBundle() { + try 
(Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + delegate.finishBundle(); + } catch (IOException e) { + throw new RuntimeException(e); + } + + // update metrics + container.updateMetrics(); + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java new file mode 100644 index 000000000000..d020f69f2d80 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricContainer.java @@ -0,0 +1,315 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.metrics; + +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.metrics.DistributionData; +import org.apache.beam.sdk.metrics.GaugeData; +import org.apache.beam.sdk.metrics.MetricKey; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricUpdates; +import org.apache.beam.sdk.metrics.MetricsContainer; +import org.apache.flink.api.common.accumulators.Accumulator; +import org.apache.flink.api.common.accumulators.LongCounter; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.metrics.Counter; +import org.apache.flink.metrics.Gauge; + +/** + * Helper class for holding a {@link MetricsContainer} and forwarding Beam metrics to + * Flink accumulators and metrics. + */ +public class FlinkMetricContainer { + + private static final String METRIC_KEY_SEPARATOR = "__"; + static final String COUNTER_PREFIX = "__counter"; + static final String DISTRIBUTION_PREFIX = "__distribution"; + static final String GAUGE_PREFIX = "__gauge"; + + private final MetricsContainer metricsContainer; + private final RuntimeContext runtimeContext; + private final Map flinkCounterCache; + private final Map flinkDistributionGaugeCache; + private final Map flinkGaugeCache; + + public FlinkMetricContainer(String stepName, RuntimeContext runtimeContext) { + metricsContainer = new MetricsContainer(stepName); + this.runtimeContext = runtimeContext; + flinkCounterCache = new HashMap<>(); + flinkDistributionGaugeCache = new HashMap<>(); + flinkGaugeCache = new HashMap<>(); + } + + public MetricsContainer getMetricsContainer() { + return metricsContainer; + } + + public void updateMetrics() { + // update metrics + MetricUpdates updates = metricsContainer.getUpdates(); + if (updates != null) { + updateCounters(updates.counterUpdates()); + updateDistributions(updates.distributionUpdates()); + updateGauge(updates.gaugeUpdates()); + metricsContainer.commitUpdates(); + } + } + 
+ private void updateCounters(Iterable> updates) { + + for (MetricUpdates.MetricUpdate metricUpdate : updates) { + + String flinkMetricName = getFlinkMetricNameString(COUNTER_PREFIX, metricUpdate.getKey()); + Long update = metricUpdate.getUpdate(); + + // update flink metric + Counter counter = flinkCounterCache.get(flinkMetricName); + if (counter == null) { + counter = runtimeContext.getMetricGroup().counter(flinkMetricName); + flinkCounterCache.put(flinkMetricName, counter); + } + counter.dec(counter.getCount()); + counter.inc(update); + + // update flink accumulator + Accumulator accumulator = runtimeContext.getAccumulator(flinkMetricName); + if (accumulator == null) { + accumulator = new LongCounter(update); + runtimeContext.addAccumulator(flinkMetricName, accumulator); + } else { + accumulator.resetLocal(); + accumulator.add(update); + } + } + } + + private void updateDistributions(Iterable> updates) { + + for (MetricUpdates.MetricUpdate metricUpdate : updates) { + + String flinkMetricName = + getFlinkMetricNameString(DISTRIBUTION_PREFIX, metricUpdate.getKey()); + DistributionData update = metricUpdate.getUpdate(); + + // update flink metric + FlinkDistributionGauge gauge = flinkDistributionGaugeCache.get(flinkMetricName); + if (gauge == null) { + gauge = runtimeContext.getMetricGroup() + .gauge(flinkMetricName, new FlinkDistributionGauge(update)); + flinkDistributionGaugeCache.put(flinkMetricName, gauge); + } else { + gauge.update(update); + } + + // update flink accumulator + Accumulator accumulator = + runtimeContext.getAccumulator(flinkMetricName); + if (accumulator == null) { + accumulator = new FlinkDistributionDataAccumulator(update); + runtimeContext.addAccumulator(flinkMetricName, accumulator); + } else { + accumulator.resetLocal(); + accumulator.add(update); + } + } + } + + private void updateGauge(Iterable> updates) { + for (MetricUpdates.MetricUpdate metricUpdate : updates) { + + String flinkMetricName = + getFlinkMetricNameString(GAUGE_PREFIX, 
metricUpdate.getKey()); + GaugeData update = metricUpdate.getUpdate(); + + // update flink metric + FlinkGauge gauge = flinkGaugeCache.get(flinkMetricName); + if (gauge == null) { + gauge = runtimeContext.getMetricGroup() + .gauge(flinkMetricName, new FlinkGauge(update)); + flinkGaugeCache.put(flinkMetricName, gauge); + } else { + gauge.update(update); + } + + // update flink accumulator + Accumulator accumulator = + runtimeContext.getAccumulator(flinkMetricName); + if (accumulator == null) { + accumulator = new FlinkGaugeAccumulator(update); + runtimeContext.addAccumulator(flinkMetricName, accumulator); + } + accumulator.resetLocal(); + accumulator.add(update); + } + } + + private static String getFlinkMetricNameString(String prefix, MetricKey key) { + return prefix + + METRIC_KEY_SEPARATOR + key.stepName() + + METRIC_KEY_SEPARATOR + key.metricName().namespace() + + METRIC_KEY_SEPARATOR + key.metricName().name(); + } + + static MetricKey parseMetricKey(String flinkMetricName) { + String[] arr = flinkMetricName.split(METRIC_KEY_SEPARATOR); + return MetricKey.create(arr[2], MetricName.named(arr[3], arr[4])); + } + + /** + * Flink {@link Gauge} for {@link DistributionData}. + */ + public static class FlinkDistributionGauge implements Gauge { + + DistributionData data; + + FlinkDistributionGauge(DistributionData data) { + this.data = data; + } + + void update(DistributionData data) { + this.data = data; + } + + @Override + public DistributionData getValue() { + return data; + } + } + + /** + * Flink {@link Gauge} for {@link GaugeData}. + */ + public static class FlinkGauge implements Gauge { + + GaugeData data; + + FlinkGauge(GaugeData data) { + this.data = data; + } + + void update(GaugeData update) { + this.data = data.combine(update); + } + + @Override + public GaugeData getValue() { + return data; + } + } + + /** + * Flink {@link Accumulator} for {@link DistributionData}. 
+ */ + public static class FlinkDistributionDataAccumulator implements + Accumulator { + + private static final long serialVersionUID = 1L; + + private DistributionData data; + + public FlinkDistributionDataAccumulator(DistributionData data) { + this.data = data; + } + + @Override + public void add(DistributionData value) { + if (data == null) { + this.data = value; + } else { + this.data = this.data.combine(value); + } + } + + @Override + public DistributionData getLocalValue() { + return data; + } + + @Override + public void resetLocal() { + data = null; + } + + @Override + public void merge(Accumulator other) { + data = data.combine(other.getLocalValue()); + } + + @Override + public Accumulator clone() { + try { + super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + + return new FlinkDistributionDataAccumulator( + DistributionData.create(data.sum(), data.count(), data.min(), data.max())); + } + } + + /** + * Flink {@link Accumulator} for {@link GaugeData}. 
+ */ + public static class FlinkGaugeAccumulator implements Accumulator { + + private GaugeData data; + + public FlinkGaugeAccumulator(GaugeData data) { + this.data = data; + } + + @Override + public void add(GaugeData value) { + if (data == null) { + this.data = value; + } else { + this.data = this.data.combine(value); + } + } + + @Override + public GaugeData getLocalValue() { + return data; + } + + @Override + public void resetLocal() { + this.data = null; + } + + @Override + public void merge(Accumulator other) { + data = data.combine(other.getLocalValue()); + } + + @Override + public Accumulator clone() { + try { + super.clone(); + } catch (CloneNotSupportedException e) { + throw new RuntimeException(e); + } + + return new FlinkGaugeAccumulator( + GaugeData.create(data.value())); + } + } + +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java new file mode 100644 index 000000000000..263a68e80fe0 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/FlinkMetricResults.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.metrics; + + +import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.COUNTER_PREFIX; +import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.DISTRIBUTION_PREFIX; +import static org.apache.beam.runners.flink.metrics.FlinkMetricContainer.GAUGE_PREFIX; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import org.apache.beam.sdk.metrics.DistributionData; +import org.apache.beam.sdk.metrics.DistributionResult; +import org.apache.beam.sdk.metrics.GaugeData; +import org.apache.beam.sdk.metrics.GaugeResult; +import org.apache.beam.sdk.metrics.MetricFiltering; +import org.apache.beam.sdk.metrics.MetricKey; +import org.apache.beam.sdk.metrics.MetricName; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricResult; +import org.apache.beam.sdk.metrics.MetricResults; +import org.apache.beam.sdk.metrics.MetricsFilter; + +/** + * Implementation of {@link MetricResults} for the Flink Runner. 
+ */ +public class FlinkMetricResults extends MetricResults { + + private Map aggregators; + + public FlinkMetricResults(Map aggregators) { + this.aggregators = aggregators; + } + + @Override + public MetricQueryResults queryMetrics(MetricsFilter filter) { + return new FlinkMetricQueryResults(filter); + } + + private class FlinkMetricQueryResults implements MetricQueryResults { + + private MetricsFilter filter; + + FlinkMetricQueryResults(MetricsFilter filter) { + this.filter = filter; + } + + @Override + public Iterable> counters() { + List> result = new ArrayList<>(); + for (Map.Entry entry : aggregators.entrySet()) { + if (entry.getKey().startsWith(COUNTER_PREFIX)) { + MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey()); + if (MetricFiltering.matches(filter, metricKey)) { + result.add(new FlinkMetricResult<>( + metricKey.metricName(), metricKey.stepName(), (Long) entry.getValue())); + } + } + } + return result; + } + + @Override + public Iterable> distributions() { + List> result = new ArrayList<>(); + for (Map.Entry entry : aggregators.entrySet()) { + if (entry.getKey().startsWith(DISTRIBUTION_PREFIX)) { + MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey()); + DistributionData data = (DistributionData) entry.getValue(); + if (MetricFiltering.matches(filter, metricKey)) { + result.add(new FlinkMetricResult<>( + metricKey.metricName(), metricKey.stepName(), data.extractResult())); + } + } + } + return result; + } + + @Override + public Iterable> gauges() { + List> result = new ArrayList<>(); + for (Map.Entry entry : aggregators.entrySet()) { + if (entry.getKey().startsWith(GAUGE_PREFIX)) { + MetricKey metricKey = FlinkMetricContainer.parseMetricKey(entry.getKey()); + GaugeData data = (GaugeData) entry.getValue(); + if (MetricFiltering.matches(filter, metricKey)) { + result.add(new FlinkMetricResult<>( + metricKey.metricName(), metricKey.stepName(), data.extractResult())); + } + } + } + return result; + } + + } + + 
private static class FlinkMetricResult implements MetricResult { + private final MetricName name; + private final String step; + private final T result; + + FlinkMetricResult(MetricName name, String step, T result) { + this.name = name; + this.step = step; + this.result = result; + } + + @Override + public MetricName name() { + return name; + } + + @Override + public String step() { + return step; + } + + @Override + public T committed() { + throw new UnsupportedOperationException("Flink runner does not currently support committed" + + " metrics results. Please use 'attempted' instead."); + } + + @Override + public T attempted() { + return result; + } + } + +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java new file mode 100644 index 000000000000..cfe77e448300 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/package-info.java @@ -0,0 +1,22 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Internal metrics implementation of the Beam runner for Apache Flink. 
+ */ +package org.apache.beam.runners.flink.metrics; diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java index 51582afda7fb..68ac7806f328 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkDoFnFunction.java @@ -21,6 +21,8 @@ import java.util.Map; import org.apache.beam.runners.core.DoFnRunner; import org.apache.beam.runners.core.DoFnRunners; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -50,6 +52,7 @@ public class FlinkDoFnFunction private final SerializedPipelineOptions serializedOptions; private final DoFn doFn; + private final String stepName; private final Map, WindowingStrategy> sideInputs; private final WindowingStrategy windowingStrategy; @@ -61,6 +64,7 @@ public class FlinkDoFnFunction public FlinkDoFnFunction( DoFn doFn, + String stepName, WindowingStrategy windowingStrategy, Map, WindowingStrategy> sideInputs, PipelineOptions options, @@ -68,6 +72,7 @@ public FlinkDoFnFunction( TupleTag mainOutputTag) { this.doFn = doFn; + this.stepName = stepName; this.sideInputs = sideInputs; this.serializedOptions = new SerializedPipelineOptions(options); this.windowingStrategy = windowingStrategy; @@ -103,6 +108,11 @@ public void mapPartition( new FlinkAggregatorFactory(runtimeContext), windowingStrategy); + if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) + .getEnableMetrics()) { + doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, 
getRuntimeContext()); + } + doFnRunner.startBundle(); for (WindowedValue value : values) { diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java index c8193d29c9f1..3e02beeb84a1 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkStatefulDoFnFunction.java @@ -30,6 +30,8 @@ import org.apache.beam.runners.core.StateNamespace; import org.apache.beam.runners.core.StateNamespaces; import org.apache.beam.runners.core.TimerInternals; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.DoFn; @@ -55,6 +57,7 @@ public class FlinkStatefulDoFnFunction extends RichGroupReduceFunction>, WindowedValue> { private final DoFn, OutputT> dofn; + private String stepName; private final WindowingStrategy windowingStrategy; private final Map, WindowingStrategy> sideInputs; private final SerializedPipelineOptions serializedOptions; @@ -64,6 +67,7 @@ public class FlinkStatefulDoFnFunction public FlinkStatefulDoFnFunction( DoFn, OutputT> dofn, + String stepName, WindowingStrategy windowingStrategy, Map, WindowingStrategy> sideInputs, PipelineOptions pipelineOptions, @@ -71,6 +75,7 @@ public FlinkStatefulDoFnFunction( TupleTag mainOutputTag) { this.dofn = dofn; + this.stepName = stepName; this.windowingStrategy = windowingStrategy; this.sideInputs = sideInputs; this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); @@ -129,6 +134,11 @@ public TimerInternals timerInternals() { new 
FlinkAggregatorFactory(runtimeContext), windowingStrategy); + if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) + .getEnableMetrics()) { + doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext()); + } + doFnRunner.startBundle(); doFnRunner.processElement(currentValue); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java index 8a09286c2cfa..d3d907803819 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java @@ -47,6 +47,8 @@ import org.apache.beam.runners.core.StatefulDoFnRunner; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.runners.flink.metrics.DoFnRunnerWithMetricsUpdate; import org.apache.beam.runners.flink.translation.types.CoderTypeSerializer; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.runners.flink.translation.wrappers.SerializableFnAggregatorWrapper; @@ -139,7 +141,9 @@ public class DoFnOperator protected transient FlinkStateInternals stateInternals; - private Coder> inputCoder; + private final String stepName; + + private final Coder> inputCoder; private final Coder keyCoder; @@ -155,6 +159,7 @@ public class DoFnOperator public DoFnOperator( DoFn doFn, + String stepName, Coder> inputCoder, TupleTag mainOutputTag, List> additionalOutputTags, @@ -165,6 +170,7 @@ public DoFnOperator( PipelineOptions options, Coder keyCoder) { this.doFn = doFn; + this.stepName = stepName; this.inputCoder = inputCoder; this.mainOutputTag = mainOutputTag; 
this.additionalOutputTags = additionalOutputTags; @@ -321,6 +327,11 @@ public Aggregator createAggregatorFor stateCleaner); } + if ((serializedOptions.getPipelineOptions().as(FlinkPipelineOptions.class)) + .getEnableMetrics()) { + doFnRunner = new DoFnRunnerWithMetricsUpdate<>(stepName, doFnRunner, getRuntimeContext()); + } + pushbackDoFnRunner = SimplePushbackSideInputDoFnRunner.create(doFnRunner, sideInputs, sideInputHandler); } diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java index 40f70e402f2d..fb6762d6250d 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/SplittableDoFnOperator.java @@ -59,6 +59,7 @@ public class SplittableDoFnOperator< public SplittableDoFnOperator( DoFn>, FnOutputT> doFn, + String stepName, Coder< WindowedValue< KeyedWorkItem>>> inputCoder, @@ -72,6 +73,7 @@ public SplittableDoFnOperator( Coder keyCoder) { super( doFn, + stepName, inputCoder, mainOutputTag, additionalOutputTags, diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java index 9b2136c3c693..97187340dba4 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/WindowDoFnOperator.java @@ -52,6 +52,7 @@ public class WindowDoFnOperator public WindowDoFnOperator( SystemReduceFn systemReduceFn, + String stepName, Coder>> inputCoder, TupleTag> mainOutputTag, List> 
additionalOutputTags, @@ -63,6 +64,7 @@ public WindowDoFnOperator( Coder keyCoder) { super( null, + stepName, inputCoder, mainOutputTag, additionalOutputTags, diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java index 06187f63ec77..9bc2c3dd1a06 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/PipelineOptionsTest.java @@ -18,6 +18,7 @@ package org.apache.beam.runners.flink; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; @@ -93,6 +94,13 @@ public void testIgnoredFieldSerialization() { assertNull(deserialized.getStateBackend()); } + @Test + public void testEnableMetrics() { + FlinkPipelineOptions options = PipelineOptionsFactory.as(FlinkPipelineOptions.class); + options.setEnableMetrics(false); + assertFalse(options.getEnableMetrics()); + } + @Test public void testCaching() { PipelineOptions deserializedOptions = @@ -113,6 +121,7 @@ public void testNonNull() { public void parDoBaseClassPipelineOptionsNullTest() { DoFnOperator doFnOperator = new DoFnOperator<>( new TestDoFn(), + "stepName", WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), new TupleTag("main-output"), Collections.>emptyList(), @@ -133,6 +142,7 @@ public void parDoBaseClassPipelineOptionsSerializationTest() throws Exception { DoFnOperator doFnOperator = new DoFnOperator<>( new TestDoFn(), + "stepName", WindowedValue.getValueOnlyCoder(StringUtf8Coder.of()), new TupleTag("main-output"), Collections.>emptyList(), diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java 
b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java index 4c826d1e9594..4e18ac2029c9 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/DoFnOperatorTest.java @@ -112,6 +112,7 @@ public void testSingleOutput() throws Exception { DoFnOperator doFnOperator = new DoFnOperator<>( new IdentityDoFn(), + "stepName", windowedValueCoder, outputTag, Collections.>emptyList(), @@ -154,6 +155,7 @@ public void testMultiOutputOutput() throws Exception { DoFnOperator doFnOperator = new DoFnOperator<>( new MultiOutputDoFn(additionalOutput1, additionalOutput2), + "stepName", windowedValueCoder, mainOutput, ImmutableList.>of(additionalOutput1, additionalOutput2), @@ -212,6 +214,7 @@ public void processElement(ProcessContext context) { DoFnOperator> doFnOperator = new DoFnOperator<>( fn, + "stepName", windowedValueCoder, outputTag, Collections.>emptyList(), @@ -325,6 +328,7 @@ public void onTimer(OnTimerContext context, @StateId(stateId) ValueState KV, KV, WindowedValue>> doFnOperator = new DoFnOperator<>( fn, + "stepName", windowedValueCoder, outputTag, Collections.>emptyList(), @@ -420,6 +424,7 @@ public void testSideInputs(boolean keyed) throws Exception { DoFnOperator doFnOperator = new DoFnOperator<>( new IdentityDoFn(), + "stepName", windowedValueCoder, outputTag, Collections.>emptyList(), From d23b0e5810f315d8cae8b5ff08505ef25a64739d Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Wed, 29 Mar 2017 23:35:44 +0800 Subject: [PATCH 2/3] Add IO metrics to Flink runner --- .../flink/FlinkBatchTransformTranslators.java | 3 +- .../FlinkStreamingTransformTranslators.java | 2 + .../flink/metrics/ReaderInvocationUtil.java | 71 +++++++++++++++++++ .../wrappers/SourceInputFormat.java | 20 ++++-- .../streaming/io/BoundedSourceWrapper.java | 17 +++-- .../streaming/io/UnboundedSourceWrapper.java | 18 +++-- 
.../streaming/UnboundedSourceWrapperTest.java | 12 ++-- 7 files changed, 124 insertions(+), 19 deletions(-) create mode 100644 runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index 57f677cd6cc7..cb33fc11f57b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -136,7 +136,8 @@ public void translateNode(Read.Bounded transform, FlinkBatchTranslationContex DataSource> dataSource = new DataSource<>( context.getExecutionEnvironment(), - new SourceInputFormat<>(source, context.getPipelineOptions()), + new SourceInputFormat<>( + context.getCurrentTransform().getFullName(), source, context.getPipelineOptions()), typeInformation, name); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java index 273023687808..c024493f2885 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java @@ -155,6 +155,7 @@ public void translateNode( try { UnboundedSourceWrapper sourceWrapper = new UnboundedSourceWrapper<>( + context.getCurrentTransform().getFullName(), context.getPipelineOptions(), transform.getSource(), context.getExecutionEnvironment().getParallelism()); @@ -187,6 +188,7 @@ public void translateNode( try { BoundedSourceWrapper sourceWrapper = new BoundedSourceWrapper<>( + context.getCurrentTransform().getFullName(), context.getPipelineOptions(), transform.getSource(), 
context.getExecutionEnvironment().getParallelism()); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java new file mode 100644 index 000000000000..38263d98f687 --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/metrics/ReaderInvocationUtil.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.metrics; + +import java.io.Closeable; +import java.io.IOException; +import org.apache.beam.runners.flink.FlinkPipelineOptions; +import org.apache.beam.sdk.io.Source; +import org.apache.beam.sdk.metrics.MetricsEnvironment; +import org.apache.beam.sdk.options.PipelineOptions; + +/** + * Util for invoking {@link Source.Reader} methods that might require a + * {@link org.apache.beam.sdk.metrics.MetricsContainer} to be active. + * Source.Reader decorator which registers {@link org.apache.beam.sdk.metrics.MetricsContainer}. + * It update metrics to Flink metric and accumulator in start and advance. 
+ */ +public class ReaderInvocationUtil> { + + private final FlinkMetricContainer container; + private final Boolean enableMetrics; + + public ReaderInvocationUtil( + PipelineOptions options, + FlinkMetricContainer container) { + FlinkPipelineOptions flinkPipelineOptions = options.as(FlinkPipelineOptions.class); + enableMetrics = flinkPipelineOptions.getEnableMetrics(); + this.container = container; + } + + public boolean invokeStart(ReaderT reader) throws IOException { + if (enableMetrics) { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + boolean result = reader.start(); + container.updateMetrics(); + return result; + } + } else { + return reader.start(); + } + + } + public boolean invokeAdvance(ReaderT reader) throws IOException { + if (enableMetrics) { + try (Closeable ignored = + MetricsEnvironment.scopedMetricsContainer(container.getMetricsContainer())) { + boolean result = reader.advance(); + container.updateMetrics(); + return result; + } + } else { + return reader.advance(); + } + } +} diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java index 12be8eb0fa84..f2b81fc173b6 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/SourceInputFormat.java @@ -19,6 +19,8 @@ import java.io.IOException; import java.util.List; +import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; +import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Source; @@ -28,6 +30,7 @@ import org.apache.beam.sdk.util.WindowedValue; import 
org.apache.flink.api.common.io.DefaultInputSplitAssigner; import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.RichInputFormat; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplitAssigner; @@ -40,9 +43,10 @@ * Wrapper for executing a {@link Source} as a Flink {@link InputFormat}. */ public class SourceInputFormat - implements InputFormat, SourceInputSplit> { + extends RichInputFormat, SourceInputSplit> { private static final Logger LOG = LoggerFactory.getLogger(SourceInputFormat.class); + private final String stepName; private final BoundedSource initialSource; private transient PipelineOptions options; @@ -51,7 +55,11 @@ public class SourceInputFormat private transient BoundedSource.BoundedReader reader; private boolean inputAvailable = false; - public SourceInputFormat(BoundedSource initialSource, PipelineOptions options) { + private transient ReaderInvocationUtil> readerInvoker; + + public SourceInputFormat( + String stepName, BoundedSource initialSource, PipelineOptions options) { + this.stepName = stepName; this.initialSource = initialSource; this.serializedOptions = new SerializedPipelineOptions(options); } @@ -63,8 +71,12 @@ public void configure(Configuration configuration) { @Override public void open(SourceInputSplit sourceInputSplit) throws IOException { + FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext()); + readerInvoker = + new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer); + reader = ((BoundedSource) sourceInputSplit.getSource()).createReader(options); - inputAvailable = reader.start(); + inputAvailable = readerInvoker.invokeStart(reader); } @Override @@ -129,7 +141,7 @@ public WindowedValue nextRecord(WindowedValue t) throws IOException { final T current = reader.getCurrent(); final Instant timestamp = 
reader.getCurrentTimestamp(); // advance reader to have a record ready next time - inputAvailable = reader.advance(); + inputAvailable = readerInvoker.invokeAdvance(reader); return WindowedValue.of( current, timestamp, diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java index 2ed50249b905..a14268592a5b 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/BoundedSourceWrapper.java @@ -20,6 +20,8 @@ import com.google.common.annotations.VisibleForTesting; import java.util.ArrayList; import java.util.List; +import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; +import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.options.PipelineOptions; @@ -42,6 +44,7 @@ public class BoundedSourceWrapper private static final Logger LOG = LoggerFactory.getLogger(BoundedSourceWrapper.class); + private String stepName; /** * Keep the options so that we can initialize the readers. 
*/ @@ -66,9 +69,11 @@ public class BoundedSourceWrapper @SuppressWarnings("unchecked") public BoundedSourceWrapper( + String stepName, PipelineOptions pipelineOptions, BoundedSource source, int parallelism) throws Exception { + this.stepName = stepName; this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); long desiredBundleSize = source.getEstimatedSizeBytes(pipelineOptions) / parallelism; @@ -99,6 +104,10 @@ public void run(SourceContext> ctx) throws Exception { numSubtasks, localSources); + FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext()); + ReaderInvocationUtil> readerInvoker = + new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer); + readers = new ArrayList<>(); // initialize readers from scratch for (BoundedSource source : localSources) { @@ -109,13 +118,13 @@ public void run(SourceContext> ctx) throws Exception { // the easy case, we just read from one reader BoundedSource.BoundedReader reader = readers.get(0); - boolean dataAvailable = reader.start(); + boolean dataAvailable = readerInvoker.invokeStart(reader); if (dataAvailable) { emitElement(ctx, reader); } while (isRunning) { - dataAvailable = reader.advance(); + dataAvailable = readerInvoker.invokeAdvance(reader); if (dataAvailable) { emitElement(ctx, reader); @@ -131,7 +140,7 @@ public void run(SourceContext> ctx) throws Exception { // start each reader and emit data if immediately available for (BoundedSource.BoundedReader reader : readers) { - boolean dataAvailable = reader.start(); + boolean dataAvailable = readerInvoker.invokeStart(reader); if (dataAvailable) { emitElement(ctx, reader); } @@ -142,7 +151,7 @@ public void run(SourceContext> ctx) throws Exception { boolean hadData = false; while (isRunning && !readers.isEmpty()) { BoundedSource.BoundedReader reader = readers.get(currentReader); - boolean dataAvailable = reader.advance(); + boolean dataAvailable = readerInvoker.invokeAdvance(reader); if 
(dataAvailable) { emitElement(ctx, reader); diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java index bb9b58ad81c3..ee20fd591ace 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java @@ -22,6 +22,8 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; +import org.apache.beam.runners.flink.metrics.FlinkMetricContainer; +import org.apache.beam.runners.flink.metrics.ReaderInvocationUtil; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.coders.Coder; @@ -64,6 +66,7 @@ public class UnboundedSourceWrapper< private static final Logger LOG = LoggerFactory.getLogger(UnboundedSourceWrapper.class); + private final String stepName; /** * Keep the options so that we can initialize the localReaders. 
*/ @@ -131,9 +134,11 @@ public class UnboundedSourceWrapper< @SuppressWarnings("unchecked") public UnboundedSourceWrapper( + String stepName, PipelineOptions pipelineOptions, UnboundedSource source, int parallelism) throws Exception { + this.stepName = stepName; this.serializedOptions = new SerializedPipelineOptions(pipelineOptions); if (source.requiresDeduping()) { @@ -209,6 +214,11 @@ public void run(SourceContext> ctx) throws Exception { context = ctx; + FlinkMetricContainer metricContainer = new FlinkMetricContainer(stepName, getRuntimeContext()); + ReaderInvocationUtil> readerInvoker = + new ReaderInvocationUtil<>(serializedOptions.getPipelineOptions(), metricContainer); + + if (localReaders.size() == 0) { // do nothing, but still look busy ... // also, output a Long.MAX_VALUE watermark since we know that we're not @@ -238,7 +248,7 @@ public void run(SourceContext> ctx) throws Exception { // the easy case, we just read from one reader UnboundedSource.UnboundedReader reader = localReaders.get(0); - boolean dataAvailable = reader.start(); + boolean dataAvailable = readerInvoker.invokeStart(reader); if (dataAvailable) { emitElement(ctx, reader); } @@ -246,7 +256,7 @@ public void run(SourceContext> ctx) throws Exception { setNextWatermarkTimer(this.runtimeContext); while (isRunning) { - dataAvailable = reader.advance(); + dataAvailable = readerInvoker.invokeAdvance(reader); if (dataAvailable) { emitElement(ctx, reader); @@ -263,7 +273,7 @@ public void run(SourceContext> ctx) throws Exception { // start each reader and emit data if immediately available for (UnboundedSource.UnboundedReader reader : localReaders) { - boolean dataAvailable = reader.start(); + boolean dataAvailable = readerInvoker.invokeStart(reader); if (dataAvailable) { emitElement(ctx, reader); } @@ -274,7 +284,7 @@ public void run(SourceContext> ctx) throws Exception { boolean hadData = false; while (isRunning) { UnboundedSource.UnboundedReader reader = localReaders.get(currentReader); - boolean 
dataAvailable = reader.advance(); + boolean dataAvailable = readerInvoker.invokeAdvance(reader); if (dataAvailable) { emitElement(ctx, reader); diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java index 90f95d6b0468..0cb528a8141e 100644 --- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java +++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/UnboundedSourceWrapperTest.java @@ -111,7 +111,7 @@ public void testReaders() throws Exception { // elements later. TestCountingSource source = new TestCountingSource(numElements); UnboundedSourceWrapper, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, numSplits); + new UnboundedSourceWrapper<>("stepName", options, source, numSplits); assertEquals(numSplits, flinkWrapper.getSplitSources().size()); @@ -179,7 +179,7 @@ public void testRestore() throws Exception { // elements later. 
TestCountingSource source = new TestCountingSource(numElements); UnboundedSourceWrapper, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, numSplits); + new UnboundedSourceWrapper<>("stepName", options, source, numSplits); assertEquals(numSplits, flinkWrapper.getSplitSources().size()); @@ -270,7 +270,7 @@ public void close() { TestCountingSource restoredSource = new TestCountingSource(numElements); UnboundedSourceWrapper< KV, TestCountingSource.CounterMark> restoredFlinkWrapper = - new UnboundedSourceWrapper<>(options, restoredSource, numSplits); + new UnboundedSourceWrapper<>("stepName", options, restoredSource, numSplits); assertEquals(numSplits, restoredFlinkWrapper.getSplitSources().size()); @@ -343,7 +343,7 @@ public Coder getCheckpointMarkCoder() { } }; UnboundedSourceWrapper, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, numSplits); + new UnboundedSourceWrapper<>("stepName", options, source, numSplits); OperatorStateStore backend = mock(OperatorStateStore.class); @@ -370,7 +370,7 @@ public Coder getCheckpointMarkCoder() { UnboundedSourceWrapper< KV, TestCountingSource.CounterMark> restoredFlinkWrapper = - new UnboundedSourceWrapper<>(options, new TestCountingSource(numElements), + new UnboundedSourceWrapper<>("stepName", options, new TestCountingSource(numElements), numSplits); StreamSource restoredSourceOperator = new StreamSource<>(flinkWrapper); @@ -429,7 +429,7 @@ public void testSerialization() throws Exception { TestCountingSource source = new TestCountingSource(numElements); UnboundedSourceWrapper, TestCountingSource.CounterMark> flinkWrapper = - new UnboundedSourceWrapper<>(options, source, parallelism); + new UnboundedSourceWrapper<>("stepName", options, source, parallelism); InstantiationUtil.serializeObject(flinkWrapper); } From 244dc943dc48e2f419ad40a150e9b1df56e5a4de Mon Sep 17 00:00:00 2001 From: JingsongLi Date: Fri, 21 Apr 2017 09:56:46 +0800 
Subject: [PATCH 3/3] Ensure all Read outputs are consumed in Flink runner --- .../beam/runners/flink/FlinkStreamingPipelineTranslator.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index 0459ef775166..42d75cf9a261 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -25,6 +25,7 @@ import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; +import org.apache.beam.runners.core.construction.UnconsumedReads; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PTransformOverride; @@ -112,6 +113,8 @@ public void translate(Pipeline pipeline) { flinkRunner))) .build(); + // Ensure all outputs of all reads are consumed. + UnconsumedReads.ensureAllReadsConsumed(pipeline); pipeline.replaceAll(transformOverrides); super.translate(pipeline); }