From 66b6c7c90bfa6c994b9d76fa286b4ea08f69d8cb Mon Sep 17 00:00:00 2001 From: Dawid Wysakowicz Date: Tue, 8 Jun 2021 14:45:04 +0200 Subject: [PATCH] [FLINK-22881] Parameterize InputBenchmark test to use FLIP-27 source This closes #13 --- .../flink/benchmark/InputBenchmark.java | 31 +++++---- .../benchmark/functions/LongNewSource.java | 65 +++++++++++++++++++ .../benchmark/functions/LongSourceType.java | 56 ++++++++++++++++ 3 files changed, 138 insertions(+), 14 deletions(-) create mode 100644 src/main/java/org/apache/flink/benchmark/functions/LongNewSource.java create mode 100644 src/main/java/org/apache/flink/benchmark/functions/LongSourceType.java diff --git a/src/main/java/org/apache/flink/benchmark/InputBenchmark.java b/src/main/java/org/apache/flink/benchmark/InputBenchmark.java index 0d70cc1..d35d592 100644 --- a/src/main/java/org/apache/flink/benchmark/InputBenchmark.java +++ b/src/main/java/org/apache/flink/benchmark/InputBenchmark.java @@ -18,7 +18,7 @@ package org.apache.flink.benchmark; -import org.apache.flink.benchmark.functions.LongSource; +import org.apache.flink.benchmark.functions.LongSourceType; import org.apache.flink.benchmark.functions.MultiplyByTwo; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -26,6 +26,7 @@ import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.OperationsPerInvocation; +import org.openjdk.jmh.annotations.Param; import org.openjdk.jmh.runner.Runner; import org.openjdk.jmh.runner.RunnerException; import org.openjdk.jmh.runner.options.Options; @@ -34,20 +35,22 @@ @OperationsPerInvocation(value = InputBenchmark.RECORDS_PER_INVOCATION) public class InputBenchmark extends BenchmarkBase { - public static final int RECORDS_PER_INVOCATION = 15_000_000; private static final long CHECKPOINT_INTERVAL_MS = 100; public static void main(String[] args) - throws RunnerException { + throws RunnerException { Options 
options = new OptionsBuilder() - .verbosity(VerboseMode.NORMAL) - .include(".*" + InputBenchmark.class.getCanonicalName() + ".*") - .build(); + .verbosity(VerboseMode.NORMAL) + .include(".*" + InputBenchmark.class.getCanonicalName() + ".*") + .build(); new Runner(options).run(); } + @Param({"LEGACY", "F27_UNBOUNDED"}) + public LongSourceType sourceType; + @Benchmark public void mapSink(FlinkEnvironmentContext context) throws Exception { @@ -55,10 +58,10 @@ public void mapSink(FlinkEnvironmentContext context) throws Exception { env.enableCheckpointing(CHECKPOINT_INTERVAL_MS); env.setParallelism(1); - DataStreamSource source = env.addSource(new LongSource(RECORDS_PER_INVOCATION)); + DataStreamSource source = sourceType.source(env, RECORDS_PER_INVOCATION); source - .map(new MultiplyByTwo()) - .addSink(new DiscardingSink<>()); + .map(new MultiplyByTwo()) + .addSink(new DiscardingSink<>()); env.execute(); } @@ -70,12 +73,12 @@ public void mapRebalanceMapSink(FlinkEnvironmentContext context) throws Exceptio env.enableCheckpointing(CHECKPOINT_INTERVAL_MS); env.setParallelism(1); - DataStreamSource source = env.addSource(new LongSource(RECORDS_PER_INVOCATION)); + DataStreamSource source = sourceType.source(env, RECORDS_PER_INVOCATION); source - .map(new MultiplyByTwo()) - .rebalance() - .map((Long in) -> in) - .addSink(new DiscardingSink<>()); + .map(new MultiplyByTwo()) + .rebalance() + .map((Long in) -> in) + .addSink(new DiscardingSink<>()); env.execute(); } diff --git a/src/main/java/org/apache/flink/benchmark/functions/LongNewSource.java b/src/main/java/org/apache/flink/benchmark/functions/LongNewSource.java new file mode 100644 index 0000000..5066847 --- /dev/null +++ b/src/main/java/org/apache/flink/benchmark/functions/LongNewSource.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.functions;
+
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
+import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/**
+ * The source should produce same records as {@link LongSource}.
+ *
+ * <p>{@link LongSource} generates records from 0 to {@code maxValue} for every parallel instance.
+ * The original {@link NumberSequenceSource} would split the range 0 to {@code maxValue} between all subtasks.
+ */
+public class LongNewSource extends NumberSequenceSource {
+    private final Boundedness boundedness;
+    private final long maxValue;
+
+    public LongNewSource(Boundedness boundedness, long maxValue) {
+        super(-1, -1); // we do not use the from/to of the underlying source
+        this.boundedness = boundedness;
+        this.maxValue = maxValue;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return boundedness;
+    }
+
+    @Override
+    public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> createEnumerator(
+            SplitEnumeratorContext<NumberSequenceSplit> splitEnumeratorContext) {
+
+        final List<NumberSequenceSplit> splits =
+                IntStream.range(0, splitEnumeratorContext.currentParallelism())
+                        .mapToObj(
+                                id -> new NumberSequenceSplit(String.valueOf(id), 0, maxValue)
+                        )
+                        .collect(Collectors.toList());
+        return new IteratorSourceEnumerator<>(splitEnumeratorContext, splits);
+    }
+}
diff --git a/src/main/java/org/apache/flink/benchmark/functions/LongSourceType.java b/src/main/java/org/apache/flink/benchmark/functions/LongSourceType.java
new file mode 100644
index 0000000..2a33a19
--- /dev/null
+++ b/src/main/java/org/apache/flink/benchmark/functions/LongSourceType.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.benchmark.functions;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import java.util.function.BiFunction;
+
+/**
+ * Enum based factory for different Long sources.
+ */
+public enum LongSourceType {
+    LEGACY((env, maxValue) -> {
+        return env.addSource(new LongSource(maxValue));
+    }),
+    F27_BOUNDED((env, maxValue) -> {
+        return env.fromSource(
+                new LongNewSource(Boundedness.BOUNDED, maxValue),
+                WatermarkStrategy.noWatermarks(),
+                "NewLongSource");
+    }),
+    F27_UNBOUNDED((env, maxValue) -> {
+        return env.fromSource(
+                new LongNewSource(Boundedness.CONTINUOUS_UNBOUNDED, maxValue),
+                WatermarkStrategy.noWatermarks(),
+                "NewLongSource");
+    });
+    private final BiFunction<StreamExecutionEnvironment, Long, DataStreamSource<Long>> factory;
+
+    LongSourceType(BiFunction<StreamExecutionEnvironment, Long, DataStreamSource<Long>> factory) {
+        this.factory = factory;
+    }
+
+    public DataStreamSource<Long> source(StreamExecutionEnvironment environment, long maxValue) {
+        return factory.apply(environment, maxValue);
+    }
+};