diff --git a/build.gradle b/build.gradle index 56cecbe7ae67..fe1655b8ca7b 100644 --- a/build.gradle +++ b/build.gradle @@ -2537,6 +2537,8 @@ project(':jmh-benchmarks') { implementation project(':storage') implementation project(':streams') implementation project(':core') + implementation project(':connect:api') + implementation project(':connect:transforms') implementation project(':clients').sourceSets.test.output implementation project(':core').sourceSets.test.output diff --git a/checkstyle/import-control-jmh-benchmarks.xml b/checkstyle/import-control-jmh-benchmarks.xml index 4cbd34f89cbf..1160e3f67dfd 100644 --- a/checkstyle/import-control-jmh-benchmarks.xml +++ b/checkstyle/import-control-jmh-benchmarks.xml @@ -54,6 +54,7 @@ + diff --git a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java index fb02577e99ac..a70acb60a58a 100644 --- a/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java +++ b/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/ReplaceField.java @@ -33,8 +33,10 @@ import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import static org.apache.kafka.connect.transforms.util.Requirements.requireMap; import static org.apache.kafka.connect.transforms.util.Requirements.requireStruct; @@ -80,8 +82,8 @@ public String toString() { private static final String PURPOSE = "field replacement"; - private List exclude; - private List include; + private Set exclude; + private Set include; private Map renames; private Map reverseRenames; @@ -94,8 +96,8 @@ public void configure(Map configs) { {ConfigName.EXCLUDE, "blacklist"}, })); - exclude = config.getList(ConfigName.EXCLUDE); - include = config.getList(ConfigName.INCLUDE); + exclude = new HashSet<>(config.getList(ConfigName.EXCLUDE)); + include = new HashSet<>(config.getList(ConfigName.INCLUDE)); renames = parseRenameMappings(config.getList(ConfigName.RENAME)); reverseRenames = invert(renames); diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ReplaceFieldBenchmark.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ReplaceFieldBenchmark.java new file mode 100644 index 000000000000..808441615066 --- /dev/null +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/connect/ReplaceFieldBenchmark.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.jmh.connect; + +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.transforms.ReplaceField; +import org.openjdk.jmh.annotations.Benchmark; +import org.openjdk.jmh.annotations.BenchmarkMode; +import org.openjdk.jmh.annotations.Fork; +import org.openjdk.jmh.annotations.Level; +import org.openjdk.jmh.annotations.Measurement; +import org.openjdk.jmh.annotations.Mode; +import org.openjdk.jmh.annotations.OutputTimeUnit; +import org.openjdk.jmh.annotations.Param; +import org.openjdk.jmh.annotations.Scope; +import org.openjdk.jmh.annotations.Setup; +import org.openjdk.jmh.annotations.State; +import org.openjdk.jmh.annotations.Warmup; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + + +/** + * This benchmark tests the performance of the {@link ReplaceField} {@link org.apache.kafka.connect.transforms.Transformation SMT} + * when configured with a large number of include and exclude fields and applied on a {@link SourceRecord} containing a similarly + * large number of fields. + */ +@State(Scope.Benchmark) +@Fork(value = 1) +@Warmup(iterations = 3) +@Measurement(iterations = 5) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.NANOSECONDS) +public class ReplaceFieldBenchmark { + + @Param({"100", "1000", "10000"}) + private int valueFieldCount; + @Param({"1", "100", "10000"}) + private int includeExcludeFieldCount; + private ReplaceField replaceFieldSmt; + private SourceRecord record; + + @Setup(Level.Trial) + public void setup() { + this.replaceFieldSmt = new ReplaceField.Value<>(); + Map replaceFieldConfigs = new HashMap<>(); + replaceFieldConfigs.put("exclude", + IntStream.range(0, 2 * includeExcludeFieldCount).filter(x -> (x & 1) == 0).mapToObj(x -> "Field-" + x).collect(Collectors.joining(","))); + replaceFieldConfigs.put("include", + IntStream.range(0, 2 * includeExcludeFieldCount).filter(x -> (x & 1) == 1).mapToObj(x -> "Field-" + x).collect(Collectors.joining(","))); + replaceFieldSmt.configure(replaceFieldConfigs); + + Map value = new HashMap<>(); + IntStream.range(0, valueFieldCount).forEach(x -> value.put("Field-" + x, new Object())); + this.record = new SourceRecord(null, null, null, null, null, value); + } + + @Benchmark + public void includeExcludeReplaceFieldBenchmark() { + replaceFieldSmt.apply(record); + } +}