Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Experiment with changes on RSM API #437

Draft
wants to merge 5 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,5 @@ build/
rpm/
rpmbuild/
*.sh
# ignore benchmark outputs
io.aiven.kafka.tieredstorage*/
5 changes: 5 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,8 @@ docker_image: build
.PHONY: docker_push
docker_push:
docker push $(IMAGE_TAG)

# Prepare kernel to capture CPU events
async_profiler_cpu_kernel-prep:
sudo sh -c 'echo 1 >/proc/sys/kernel/perf_event_paranoid'
sudo sh -c 'echo 0 >/proc/sys/kernel/kptr_restrict'
121 changes: 121 additions & 0 deletions benchmarks/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
### Benchmarks module

> Borrowed from https://github.com/apache/kafka/blob/trunk/jmh-benchmarks

This module contains benchmarks written using [JMH](https://openjdk.java.net/projects/code-tools/jmh/) from OpenJDK.

### Running benchmarks

If you want to set specific JMH flags or only run certain benchmarks, passing arguments via
gradle tasks is cumbersome. These are simplified by the provided `jmh.sh` script.

The default behavior is to run all benchmarks:

./benchmarks/jmh.sh

Pass a pattern or name after the command to select the benchmarks:

./benchmarks/jmh.sh TransformBench

Check which benchmarks that match the provided pattern:

./benchmarks/jmh.sh -l TransformBench

Run a specific test and override the number of forks, iterations and warm-up iteration to `2`:

./benchmarks/jmh.sh -f 2 -i 2 -wi 2 TransformBench

Run a specific test with async and GC profilers on Linux and flame graph output:

./benchmarks/jmh.sh -prof gc -prof async:libPath=/path/to/libasyncProfiler.so\;output=flamegraph TransformBench

The following sections cover async profiler and GC profilers in more detail.

### Using JMH with async profiler

It's good practice to check profiler output for micro-benchmarks in order to verify that they represent the expected
application behavior and measure what you expect to measure. Some example pitfalls include the use of expensive mocks
or accidental inclusion of test setup code in the benchmarked code. JMH includes
[async-profiler](https://github.com/jvm-profiling-tools/async-profiler) integration that makes this easy:

./benchmarks/jmh.sh -prof async:libPath=/path/to/libasyncProfiler.so

or if having async-profiler on environment variable `export LD_LIBRARY_PATH=/opt/async-profiler-2.9-linux-x64/build/`

./benchmarks/jmh.sh -prof async

With flame graph output (the semicolon is escaped to ensure it is not treated as a command separator):

./benchmarks/jmh.sh -prof async:libPath=/path/to/libasyncProfiler.so\;output=flamegraph

Simultaneous cpu, allocation and lock profiling with async profiler 2.0 and jfr output (the semicolon is
escaped to ensure it is not treated as a command separator):

./benchmarks/jmh.sh -prof async:libPath=/path/to/libasyncProfiler.so\;output=jfr\;alloc\;lock TransformBench

A number of arguments can be passed to configure async profiler, run the following for a description:

./benchmarks/jmh.sh -prof async:help

### Using JMH GC profiler

It's good practice to run your benchmark with `-prof gc` to measure its allocation rate:

./benchmarks/jmh.sh -prof gc

Of particular importance is the `norm` alloc rates, which measure the allocations per operation rather than allocations
per second which can increase when you have make your code faster.

### Running JMH outside gradle

The JMH benchmarks can be run outside gradle as you would with any executable jar file:

java -jar ./benchmarks/build/libs/kafka-benchmarks-*.jar -f2 TransformBench

### Gradle Tasks

If no benchmark mode is specified, the default is used which is throughput. It is assumed that users run
the gradle tasks with `./gradlew` from the root of the Kafka project.

* `benchmarks:shadowJar` - creates the uber jar required to run the benchmarks.

* `benchmarks:jmh` - runs the `clean` and `shadowJar` tasks followed by all the benchmarks.

### JMH Options
Some common JMH options are:

```text

-e <regexp+> Benchmarks to exclude from the run.

-f <int> How many times to fork a single benchmark. Use 0 to
disable forking altogether. Warning: disabling
forking may have detrimental impact on benchmark
and infrastructure reliability, you might want
to use different warmup mode instead.

-i <int> Number of measurement iterations to do. Measurement
iterations are counted towards the benchmark score.
(default: 1 for SingleShotTime, and 5 for all other
modes)

-l List the benchmarks that match a filter, and exit.

-lprof List profilers, and exit.

-o <filename> Redirect human-readable output to a given file.

-prof <profiler> Use profilers to collect additional benchmark data.
Some profilers are not available on all JVMs and/or
all OSes. Please see the list of available profilers
with -lprof.

-v <mode> Verbosity mode. Available modes are: [SILENT, NORMAL,
EXTRA]

-wi <int> Number of warmup iterations to do. Warmup iterations
are not counted towards the benchmark score. (default:
0 for SingleShotTime, and 5 for all other modes)
```

To view all options run jmh with the -h flag.
67 changes: 67 additions & 0 deletions benchmarks/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2021 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

// JMH execution borrowed from https://github.com/apache/kafka/blob/trunk/jmh-benchmarks


apply plugin: 'com.github.johnrengelman.shadow'

shadowJar {
archiveBaseName = 'kafka-ts-benchmarks'
}

ext {
jmhVersion = "1.36"
}

dependencies {
implementation project(':core')
implementation project(':storage:s3')
implementation project(':storage:gcs')
implementation project(':storage:azure')
implementation group: "org.apache.kafka", name: "kafka-storage-api", version: kafkaVersion
implementation group: "org.apache.kafka", name: "kafka-clients", version: kafkaVersion

implementation "org.openjdk.jmh:jmh-core:$jmhVersion"
implementation "org.openjdk.jmh:jmh-core-benchmarks:$jmhVersion"
annotationProcessor "org.openjdk.jmh:jmh-generator-annprocess:$jmhVersion"

implementation "org.slf4j:slf4j-log4j12:1.7.36"
}

jar {
manifest {
attributes "Main-Class": "org.openjdk.jmh.Main"
}
}

tasks.register('jmh', JavaExec) {
dependsOn ':benchmarks:clean'
dependsOn ':benchmarks:shadowJar'

mainClass = "-jar"

doFirst {
if (System.getProperty("jmhArgs")) {
args System.getProperty("jmhArgs").split(' ')
}
args = [shadowJar.getArchiveFile(), *args]
}
}

javadoc {
enabled = false
}
42 changes: 42 additions & 0 deletions benchmarks/jmh.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env bash
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

base_dir=$(dirname $0)
jmh_project_name="benchmarks"

if [ ${base_dir} == "." ]; then
gradlew_dir=".."
elif [ ${base_dir##./} == "${jmh_project_name}" ]; then
gradlew_dir="."
else
echo "JMH Benchmarks need to be run from the root of the kafka repository or the 'benchmarks' directory"
exit
fi

gradleCmd="${gradlew_dir}/gradlew"
libDir="${base_dir}/build/libs"

echo "running gradlew :benchmarks:clean :benchmarks:shadowJar"

$gradleCmd :benchmarks:clean :benchmarks:shadowJar

echo "gradle build done"

echo "running JMH with args: $@"

java -jar ${libDir}/kafka-ts-benchmarks-*.jar "$@"

echo "JMH benchmarks done"
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
/*
* Copyright 2023 Aiven Oy
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.aiven.kafka.tieredstorage.benchs.fetch;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;

import org.apache.kafka.common.TopicIdPartition;
import org.apache.kafka.common.Uuid;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
import org.apache.kafka.server.log.remote.storage.RemoteResourceNotFoundException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageException;
import org.apache.kafka.server.log.remote.storage.RemoteStorageManager.IndexType;

import io.aiven.kafka.tieredstorage.RemoteStorageManager;
import io.aiven.kafka.tieredstorage.chunkmanager.cache.DiskBasedChunkCache;

import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Fork;
import org.openjdk.jmh.annotations.Level;
import org.openjdk.jmh.annotations.Measurement;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.Warmup;
import org.openjdk.jmh.infra.Blackhole;

@State(Scope.Benchmark)
@Fork(value = 1)
@Warmup(iterations = 4, time = 10)
@Measurement(iterations = 16, time = 30)
@BenchmarkMode({Mode.SampleTime})
@OutputTimeUnit(TimeUnit.MILLISECONDS)
public class FetchIndexesBenchmark {

// s3://jeqo-test1/tiered-storage-demo/t1-QW1...-Q/0/00000000000000057362-ABC....indexes
final TopicIdPartition tip =
new TopicIdPartition(Uuid.fromString("QW1-OwYOSt6w-CBTuczz-Q"), 0, "t1");
final RemoteLogSegmentMetadata meta = new RemoteLogSegmentMetadata(
new RemoteLogSegmentId(tip, Uuid.fromString("AYSp9LTtQyqRJF6u5bHdVg")), 57362L, 58302L - 1,
0, 0, 0, 200 * 1024 * 1024, Map.of(0, 0L));

RemoteStorageManager rsm = new RemoteStorageManager();

@Setup(Level.Trial)
public void setup() throws IOException {
final var tmpDir = Files.createTempDirectory("rsm-cache");
final var compression = false;
final var encryption = false;
final var cacheClass = DiskBasedChunkCache.class.getCanonicalName();
// Configure the RSM.
final var cacheDir = tmpDir.resolve("cache");
Files.createDirectories(cacheDir);

final var props = new Properties();
props.load(Files.newInputStream(Path.of("rsm.properties")));
final Map<String, String> config = new HashMap<>();
props.forEach((k, v) -> config.put((String) k, (String) v));
// 4MiB
final int chunkSize = 4 * 1024 * 1024;
config.putAll(Map.of(
"chunk.size", Integer.toString(chunkSize),
"compression.enabled", Boolean.toString(compression),
"encryption.enabled", Boolean.toString(encryption),
"chunk.cache.class", cacheClass,
"chunk.cache.path", cacheDir.toString(),
"chunk.cache.size", Integer.toString(100 * 1024 * 1024),
"custom.metadata.fields.include", "REMOTE_SIZE,OBJECT_PREFIX,OBJECT_KEY"
));

rsm.configure(config);
}

@Benchmark
public void fetchIndexesV1(final Blackhole b) throws RemoteStorageException {
final var offsetindex = rsm.fetchIndex(meta, IndexType.OFFSET);
final var timeindex = rsm.fetchIndex(meta, IndexType.TIMESTAMP);
try {
final var txnindex = rsm.fetchIndex(meta, IndexType.TRANSACTION);
b.consume(offsetindex);
b.consume(timeindex);
b.consume(txnindex);
} catch (final RemoteResourceNotFoundException e) {
b.consume(offsetindex);
b.consume(timeindex);
}
}

@Benchmark
public void fetchIndexesV2(final Blackhole b) throws RemoteStorageException {
final var indexes = rsm.fetchIndexes(
meta,
Set.of(IndexType.OFFSET, IndexType.TIMESTAMP, IndexType.TRANSACTION));
b.consume(indexes);
}


@Benchmark
public void fetchAllIndexesV2(final Blackhole b) throws RemoteStorageException {
final var indexes = rsm.fetchAllIndexes(meta);
b.consume(indexes);
}
}
21 changes: 21 additions & 0 deletions benchmarks/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
#
# Copyright 2023 Aiven Oy
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

log4j.rootLogger=WARN, stdout

log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
Loading