Skip to content

Commit

Permalink
[FLINK-22881] Parameterize InputBenchmark test to use FLIP-27 source
Browse files Browse the repository at this point in the history
This closes dataArtisans#13
  • Loading branch information
dawidwys committed Jun 9, 2021
1 parent bb597ea commit 66b6c7c
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 14 deletions.
31 changes: 17 additions & 14 deletions src/main/java/org/apache/flink/benchmark/InputBenchmark.java
Expand Up @@ -18,14 +18,15 @@

package org.apache.flink.benchmark;

import org.apache.flink.benchmark.functions.LongSource;
import org.apache.flink.benchmark.functions.LongSourceType;
import org.apache.flink.benchmark.functions.MultiplyByTwo;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
Expand All @@ -34,31 +35,33 @@

@OperationsPerInvocation(value = InputBenchmark.RECORDS_PER_INVOCATION)
public class InputBenchmark extends BenchmarkBase {

public static final int RECORDS_PER_INVOCATION = 15_000_000;
private static final long CHECKPOINT_INTERVAL_MS = 100;

public static void main(String[] args)
throws RunnerException {
throws RunnerException {
Options options = new OptionsBuilder()
.verbosity(VerboseMode.NORMAL)
.include(".*" + InputBenchmark.class.getCanonicalName() + ".*")
.build();
.verbosity(VerboseMode.NORMAL)
.include(".*" + InputBenchmark.class.getCanonicalName() + ".*")
.build();

new Runner(options).run();
}

@Param({"LEGACY", "F27_UNBOUNDED"})
public LongSourceType sourceType;

@Benchmark
public void mapSink(FlinkEnvironmentContext context) throws Exception {

StreamExecutionEnvironment env = context.env;
env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
env.setParallelism(1);

DataStreamSource<Long> source = env.addSource(new LongSource(RECORDS_PER_INVOCATION));
DataStreamSource<Long> source = sourceType.source(env, RECORDS_PER_INVOCATION);
source
.map(new MultiplyByTwo())
.addSink(new DiscardingSink<>());
.map(new MultiplyByTwo())
.addSink(new DiscardingSink<>());

env.execute();
}
Expand All @@ -70,12 +73,12 @@ public void mapRebalanceMapSink(FlinkEnvironmentContext context) throws Exceptio
env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
env.setParallelism(1);

DataStreamSource<Long> source = env.addSource(new LongSource(RECORDS_PER_INVOCATION));
DataStreamSource<Long> source = sourceType.source(env, RECORDS_PER_INVOCATION);
source
.map(new MultiplyByTwo())
.rebalance()
.map((Long in) -> in)
.addSink(new DiscardingSink<>());
.map(new MultiplyByTwo())
.rebalance()
.map((Long in) -> in)
.addSink(new DiscardingSink<>());

env.execute();
}
Expand Down
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.benchmark.functions;

import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.api.connector.source.SplitEnumerator;
import org.apache.flink.api.connector.source.SplitEnumeratorContext;
import org.apache.flink.api.connector.source.lib.NumberSequenceSource;
import org.apache.flink.api.connector.source.lib.util.IteratorSourceEnumerator;

import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

/**
* The source should produce same records as {@link LongSource}.
*
* <p>{@link LongSource} generates records from 0 to {@code maxValue} for every parallel instance.
* The original {@link NumberSequenceSource} would split the range 0 to {@code maxValue} between all subtasks.
*/
public class LongNewSource extends NumberSequenceSource {
private final Boundedness boundedness;
private final long maxValue;

public LongNewSource(Boundedness boundedness, long maxValue) {
super(-1, -1); // we do not use the from/to of the underlying source
this.boundedness = boundedness;
this.maxValue = maxValue;
}

@Override
public Boundedness getBoundedness() {
return boundedness;
}

@Override
public SplitEnumerator<NumberSequenceSplit, Collection<NumberSequenceSplit>> createEnumerator(
SplitEnumeratorContext<NumberSequenceSplit> splitEnumeratorContext) {

final List<NumberSequenceSplit> splits =
IntStream.range(0, splitEnumeratorContext.currentParallelism())
.mapToObj(
id -> new NumberSequenceSplit(String.valueOf(id), 0, maxValue)
)
.collect(Collectors.toList());
return new IteratorSourceEnumerator<>(splitEnumeratorContext, splits);
}
}
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.benchmark.functions;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

import java.util.function.BiFunction;

/**
* Enum based factory for different Long sources.
*/
public enum LongSourceType {
LEGACY((env, maxValue) -> {
return env.addSource(new LongSource(maxValue));
}),
F27_BOUNDED((env, maxValue) -> {
return env.fromSource(
new LongNewSource(Boundedness.BOUNDED, maxValue),
WatermarkStrategy.noWatermarks(),
"NewLongSource");
}),
F27_UNBOUNDED((env, maxValue) -> {
return env.fromSource(
new LongNewSource(Boundedness.CONTINUOUS_UNBOUNDED, maxValue),
WatermarkStrategy.noWatermarks(),
"NewLongSource");
});
private final BiFunction<StreamExecutionEnvironment, Long, DataStreamSource<Long>> factory;

LongSourceType(BiFunction<StreamExecutionEnvironment, Long, DataStreamSource<Long>> factory) {
this.factory = factory;
}

public DataStreamSource<Long> source(StreamExecutionEnvironment environment, long maxValue) {
return factory.apply(environment, maxValue);
}
};

0 comments on commit 66b6c7c

Please sign in to comment.