Skip to content
This repository has been archived by the owner on Aug 3, 2020. It is now read-only.

[FLINK-11986] [state backend, tests] Add micro benchmark for state operations #13

Merged
merged 2 commits into from
Apr 30, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,32 @@ This repository contains sets of micro benchmarks designed to run on single mach
their changes.

The main methods defined in the various classes (test cases) are using [jmh](http://openjdk.java.net/projects/code-tools/jmh/) micro
benchmark suite to define runners to execute those test cases. You can either execute
whole benchmark suite (which takes ~1hour) at once:
benchmark suite to define runners to execute those test cases. You can execute the
default benchmark suite (which takes ~1 hour) at once:

```
mvn -Dflink.version=1.5.0 clean install exec:exec
```

or if you want to execute just one benchmark, the best approach is to execute selected main function manually.
For example from your IDE (hint there is a plugin for Intellij IDEA). In that case don't forget about selecting
`flink.version`, default value for the property is defined in pom.xml.
There is also a separate benchmark suite for state backends, and you can execute this suite
(which takes ~1 hour) using the command below:

```
mvn -Dflink.version=1.5.0 clean package exec:exec \
-Dexec.executable=java -Dexec.args=-jar target/benchmarks.jar -rf csv org.apache.flink.state.benchmark.*
```

If you want to execute just one benchmark, the best approach is to execute selected main function manually.
There are two main ways:

1. From your IDE (hint: there is a plugin for IntelliJ IDEA).
* In this case don't forget about selecting `flink.version`, default value for the property is defined in pom.xml.

2. From command line, using command like:
```
mvn -Dflink.version=1.5.0 clean package exec:exec \
-Dexec.executable=java -Dexec.args=-jar target/benchmarks.jar <benchmark_class>
```

## Code structure

Expand Down
32 changes: 31 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ under the License.

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.7-SNAPSHOT</flink.version>
<flink.version>1.8-SNAPSHOT</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
Expand Down Expand Up @@ -171,6 +171,36 @@ under the License.
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.0</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>benchmarks</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>org.openjdk.jmh.Main</mainClass>
</transformer>
<transformer implementation="org.apache.maven.plugins.shade.resource.AppendingTransformer">
<resource>reference.conf</resource>
</transformer>
<!-- The service transformer is needed to merge META-INF/services files -->
<transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/>
<transformer implementation="org.apache.maven.plugins.shade.resource.ApacheNoticeResourceTransformer">
<projectName>Apache Flink</projectName>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
Expand Down
6 changes: 4 additions & 2 deletions save_jmh_result.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
parser.add_argument('--dry', dest='dry', action='store_true')
parser.add_argument('--codespeed', dest='codespeed', default=DEFAULT_CODESPEED_URL,
help='codespeed url, default: %s' % DEFAULT_CODESPEED_URL)
parser.add_argument('--project', dest='project', default="Flink")
parser.add_argument('--exec', dest='executable', default="Flink")

def readData(args):
results = []
Expand Down Expand Up @@ -58,8 +60,8 @@ def readData(args):
results.append({
'commitid': args.commit,
'branch': args.branch,
'project': 'Flink',
'executable': 'Flink',
'project': args.project,
'executable': args.executable,
'benchmark': name,
'environment': args.environment,
'lessisbetter': False,
Expand Down
25 changes: 25 additions & 0 deletions src/main/java/org/apache/flink/state/benchmark/BackendType.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.state.benchmark;

/**
 * The kinds of keyed state backend the state benchmarks can run against.
 */
public enum BackendType {
    /** Heap keyed state backend (state kept on the JVM heap). */
    HEAP,
    /** RocksDB keyed state backend (state kept in an embedded RocksDB instance). */
    ROCKSDB
}
124 changes: 124 additions & 0 deletions src/main/java/org/apache/flink/state/benchmark/BackendUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.benchmark;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.LocalRecoveryDirectoryProviderImpl;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackendBuilder;
import org.apache.flink.runtime.state.heap.HeapPriorityQueueSetFactory;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.IOUtils;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;

import java.io.File;
import java.io.IOException;
import java.util.Collections;

import static org.apache.flink.state.benchmark.StateBenchmarkConstants.dbDirName;
import static org.apache.flink.state.benchmark.StateBenchmarkConstants.recoveryDirName;
import static org.apache.flink.state.benchmark.StateBenchmarkConstants.rootDirName;

/**
 * Utils to create keyed state backends for the state benchmarks. Each backend is
 * built over fresh temporary directories so benchmark runs do not interfere.
 */
public class BackendUtils {

    /**
     * Creates a RocksDB keyed state backend keyed by {@link Long}, with its DB
     * instance and local-recovery directories placed under a fresh temp root.
     *
     * @return a ready-to-use {@link RocksDBKeyedStateBackend}
     * @throws IOException if the working directories cannot be created or the
     *         backend fails to build
     */
    static RocksDBKeyedStateBackend<Long> createRocksDBKeyedStateBackend() throws IOException {
        File rootDir = prepareDirectory(rootDirName, null);
        File recoveryBaseDir = prepareDirectory(recoveryDirName, rootDir);
        File dbPathFile = prepareDirectory(dbDirName, rootDir);
        DBOptions dbOptions = new DBOptions().setCreateIfMissing(true);
        ExecutionConfig executionConfig = new ExecutionConfig();
        // Column family options are produced per state by the factory argument
        // below. A previously declared ColumnFamilyOptions local was removed:
        // it was never handed to the builder, so it only leaked its native
        // handle on the success path.
        RocksDBKeyedStateBackendBuilder<Long> builder = new RocksDBKeyedStateBackendBuilder<>(
            "Test",
            Thread.currentThread().getContextClassLoader(),
            dbPathFile,
            dbOptions,
            stateName -> PredefinedOptions.DEFAULT.createColumnOptions(),
            null,
            LongSerializer.INSTANCE,
            2,
            new KeyGroupRange(0, 1),
            executionConfig,
            new LocalRecoveryConfig(false, new LocalRecoveryDirectoryProviderImpl(recoveryBaseDir, new JobID(), new JobVertexID(), 0)),
            RocksDBStateBackend.PriorityQueueStateType.ROCKSDB,
            TtlTimeProvider.DEFAULT,
            new UnregisteredMetricsGroup(),
            Collections.emptyList(),
            AbstractStateBackend.getCompressionDecorator(executionConfig),
            new CloseableRegistry());
        try {
            return builder.build();
        } catch (Exception e) {
            // Release the native RocksDB options handle if the build fails;
            // on success the backend takes ownership of it.
            IOUtils.closeQuietly(dbOptions);
            throw e;
        }
    }

    /**
     * Creates a heap keyed state backend keyed by {@link Long}, with a fresh
     * temp directory for local recovery.
     *
     * @return a ready-to-use {@link HeapKeyedStateBackend}
     * @throws IOException if the working directories cannot be created or the
     *         backend fails to build
     */
    static HeapKeyedStateBackend<Long> createHeapKeyedStateBackend() throws IOException {
        File rootDir = prepareDirectory(rootDirName, null);
        File recoveryBaseDir = prepareDirectory(recoveryDirName, rootDir);
        KeyGroupRange keyGroupRange = new KeyGroupRange(0, 1);
        int numberOfKeyGroups = keyGroupRange.getNumberOfKeyGroups();
        ExecutionConfig executionConfig = new ExecutionConfig();
        HeapPriorityQueueSetFactory priorityQueueSetFactory =
            new HeapPriorityQueueSetFactory(keyGroupRange, numberOfKeyGroups, 128);
        HeapKeyedStateBackendBuilder<Long> backendBuilder = new HeapKeyedStateBackendBuilder<>(
            null,
            // Use the stateless singleton for consistency with the RocksDB
            // variant above (was: new LongSerializer()).
            LongSerializer.INSTANCE,
            Thread.currentThread().getContextClassLoader(),
            numberOfKeyGroups,
            keyGroupRange,
            executionConfig,
            TtlTimeProvider.DEFAULT,
            Collections.emptyList(),
            AbstractStateBackend.getCompressionDecorator(executionConfig),
            new LocalRecoveryConfig(false, new LocalRecoveryDirectoryProviderImpl(recoveryBaseDir, new JobID(), new JobVertexID(), 0)),
            priorityQueueSetFactory,
            false,
            new CloseableRegistry()
        );
        return backendBuilder.build();
    }

    /**
     * Creates a fresh, empty directory whose unique name starts with {@code prefix}.
     * Uses the createTempFile-then-delete trick to obtain a unique path, then
     * creates a directory at that path.
     *
     * @param prefix name prefix for the directory
     * @param parentDir parent to create under, or {@code null} for the default temp dir
     * @throws IOException if the path cannot be cleaned up or the directory created
     */
    private static File prepareDirectory(String prefix, File parentDir) throws IOException {
        File target = File.createTempFile(prefix, "", parentDir);
        if (target.exists() && !target.delete()) {
            throw new IOException("Target dir {" + target.getAbsolutePath() + "} exists but failed to clean it up");
        } else if (!target.mkdirs()) {
            throw new IOException("Failed to create target directory: " + target.getAbsolutePath());
        }
        return target;
    }
}
107 changes: 107 additions & 0 deletions src/main/java/org/apache/flink/state/benchmark/ListStateBenchmark.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.state.benchmark;

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import org.openjdk.jmh.runner.options.VerboseMode;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.flink.state.benchmark.StateBenchmarkConstants.listValueCount;
import static org.apache.flink.state.benchmark.StateBenchmarkConstants.setupKeyCount;

/**
 * Implementation for list state benchmark testing: measures update, add, get,
 * iterate and addAll operations on {@link ListState} for the configured backend.
 */
public class ListStateBenchmark extends StateBenchmarkBase {
    private ListState<Long> listState;
    // Pre-built list of random values shared by the update/addAll benchmarks,
    // so list construction cost is excluded from the measured operation.
    private List<Long> dummyValues;

    public static void main(String[] args) throws RunnerException {
        Options opt = new OptionsBuilder()
            .verbosity(VerboseMode.NORMAL)
            .include(".*" + ListStateBenchmark.class.getSimpleName() + ".*")
            .build();

        new Runner(opt).run();
    }

    /**
     * Creates the keyed state backend and seeds it with {@code setupKeyCount}
     * single-element lists so the benchmarks operate on pre-existing state.
     */
    @Setup
    public void setUp() throws Exception {
        keyedStateBackend = createKeyedStateBackend();
        listState = keyedStateBackend.getPartitionedState(
            VoidNamespace.INSTANCE,
            VoidNamespaceSerializer.INSTANCE,
            new ListStateDescriptor<>("listState", Long.class));
        dummyValues = new ArrayList<>(listValueCount);
        for (int i = 0; i < listValueCount; ++i) {
            dummyValues.add(random.nextLong());
        }
        for (int i = 0; i < setupKeyCount; ++i) {
            keyedStateBackend.setCurrentKey((long) i);
            listState.add(random.nextLong());
        }
        keyIndex = new AtomicInteger();
    }

    /** Measures replacing the whole list under an existing (setup) key. */
    @Benchmark
    public void listUpdate(KeyValue keyValue) throws Exception {
        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
        listState.update(keyValue.listValue);
    }

    /** Measures appending a single element under a new key. */
    @Benchmark
    public void listAdd(KeyValue keyValue) throws Exception {
        keyedStateBackend.setCurrentKey(keyValue.newKey);
        // Bug fix: this previously called update(keyValue.listValue), which
        // duplicated listUpdate and never exercised ListState#add. Appending a
        // random value mirrors the seeding loop in setUp().
        // NOTE(review): nextLong() adds RNG cost to the measured path; a value
        // precomputed in KeyValue would be cleaner — TODO confirm.
        listState.add(random.nextLong());
    }

    /** Measures fetching the list handle for an existing key, without iterating. */
    @Benchmark
    public Iterable<Long> listGet(KeyValue keyValue) throws Exception {
        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
        return listState.get();
    }

    /** Measures fetching and fully iterating the list of an existing key. */
    @Benchmark
    public void listGetAndIterate(KeyValue keyValue, Blackhole bh) throws Exception {
        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
        Iterable<Long> iterable = listState.get();
        for (Long value : iterable) {
            bh.consume(value);
        }
    }

    /** Measures bulk-appending the pre-built value list to an existing key. */
    @Benchmark
    public void listAddAll(KeyValue keyValue) throws Exception {
        keyedStateBackend.setCurrentKey(keyValue.setUpKey);
        listState.addAll(dummyValues);
    }
}
Loading