From 8f54b00f902e3c6a76cc19ebd9d3ccc5d3654a36 Mon Sep 17 00:00:00 2001 From: gallenvara Date: Fri, 16 Oct 2015 16:48:30 +0800 Subject: [PATCH 1/4] [FLINK-2853] [tests] Apply JMH on MutableHashTablePerformanceBenchmark class. --- flink-benchmark/pom.xml | 9 +- .../MutableHashTablePerformanceBenchmark.java | 360 ++++++++++++++++++ flink-runtime/pom.xml | 7 +- pom.xml | 10 + 4 files changed, 384 insertions(+), 2 deletions(-) create mode 100644 flink-benchmark/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java diff --git a/flink-benchmark/pom.xml b/flink-benchmark/pom.xml index b139be5a7ce32..1bd1c1af3eaff 100644 --- a/flink-benchmark/pom.xml +++ b/flink-benchmark/pom.xml @@ -36,7 +36,7 @@ under the License. UTF-8 - 1.4.1 + 1.11 benchmarks @@ -57,6 +57,13 @@ under the License. ${jmh.version} provided + + org.apache.flink + flink-runtime + 0.10-SNAPSHOT + test-jar + test + diff --git a/flink-benchmark/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java new file mode 100644 index 0000000000000..deb98efb11683 --- /dev/null +++ b/flink-benchmark/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java @@ -0,0 +1,360 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.operators.hash; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; + +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.runtime.operators.testutils.types.StringPair; +import org.apache.flink.runtime.operators.testutils.types.StringPairComparator; +import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator; +import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer; +import org.apache.flink.util.MutableObjectIterator; + +import org.junit.Assert; +import org.openjdk.jmh.annotations.*; +import org.openjdk.jmh.runner.Runner; +import org.openjdk.jmh.runner.options.Options; +import org.openjdk.jmh.runner.options.OptionsBuilder; + +import static org.junit.Assert.fail; + +@State(Scope.Thread) +@BenchmarkMode(Mode.AverageTime) +@OutputTimeUnit(TimeUnit.MILLISECONDS) +public class MutableHashTablePerformanceBenchmark { + + private static final AbstractInvokable MEM_OWNER = new DummyInvokable(); + + private MemoryManager memManager; + private IOManager ioManager; + + private TypeSerializer pairBuildSideAccesssor; + private TypeSerializer pairProbeSideAccesssor; + private TypeComparator pairBuildSideComparator; + private TypeComparator pairProbeSideComparator; + private TypePairComparator pairComparator; + + private static final String COMMENT = "this comments should contains a 96 byte data, 100 plus another integer value and seperator char."; + + + @Setup + public void setup() { + this.pairBuildSideAccesssor = new StringPairSerializer(); + this.pairProbeSideAccesssor = new StringPairSerializer(); + this.pairBuildSideComparator = new StringPairComparator(); + this.pairProbeSideComparator = new StringPairComparator(); + this.pairComparator = new StringPairPairComparator(); + + this.memManager = new MemoryManager(64 * 1024 * 1024, 1); + this.ioManager = new IOManagerAsync(); + } + + @TearDown + public void tearDown() { + // shut down I/O manager and Memory Manager and verify the correct shutdown + this.ioManager.shutdown(); + if (!this.ioManager.isProperlyShutDown()) { + fail("I/O manager was not property shut down."); + } + if (!this.memManager.verifyEmpty()) { + fail("Not all memory was properly released to the memory manager --> Memory Leak."); + } + } + + @Benchmark + public void compareMutableHashTableWithBloomFilter1() throws IOException { + // ----------------------------------------------90% filtered during probe spill phase----------------------------------------- + // create a build input with 1000000 records with key spread between [0 -- 10000000] with 
step of 10 for nearby records. + int buildSize = 1000000; + int buildStep = 10; + int buildScope = buildStep * buildSize; + // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records. + int probeSize = 5000000; + int probeStep = 1; + int probeScope = buildSize; + + int expectedResult = 500000; + + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); + + System.out.println("HybridHashJoin2:"); + System.out.println("Build input size: " + 100 * buildSize); + System.out.println("Probe input size: " + 100 * probeSize); + System.out.println("Available memory: " + this.memManager.getMemorySize()); + System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); + } + + @Benchmark + public void compareMutableHashTableWithoutBloomFilter1() throws IOException { + // ----------------------------------------------90% filtered during probe spill phase----------------------------------------- + // create a build input with 1000000 records with key spread between [0 -- 10000000] with step of 10 for nearby records. + int buildSize = 1000000; + int buildStep = 10; + int buildScope = buildStep * buildSize; + // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records. + int probeSize = 5000000; + int probeStep = 1; + int probeScope = buildSize; + + int expectedResult = 500000; + + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); + + System.out.println("HybridHashJoin2:"); + System.out.println("Build input size: " + 100 * buildSize); + System.out.println("Probe input size: " + 100 * probeSize); + System.out.println("Available memory: " + this.memManager.getMemorySize()); + System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); + } + + @Benchmark + public void compareMutableHashTableWithBloomFilter2() throws IOException { + // ----------------------------------------------80% filtered during probe spill phase----------------------------------------- + // create a build input with 1000000 records with key spread between [0 -- 5000000] with step of 5 for nearby records. + int buildSize = 1000000; + int buildStep = 5; + int buildScope = buildStep * buildSize; + // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records. + int probeSize = 5000000; + int probeStep = 1; + int probeScope = buildSize; + + int expectedResult = 1000000; + + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); + + System.out.println("HybridHashJoin3:"); + System.out.println("Build input size: " + 100 * buildSize); + System.out.println("Probe input size: " + 100 * probeSize); + System.out.println("Available memory: " + this.memManager.getMemorySize()); + System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); + } + + @Benchmark + public void compareMutableHashTableWithoutBloomFilter2() throws IOException { + // ----------------------------------------------80% filtered during probe spill phase----------------------------------------- + // create a build input with 1000000 records with key spread between [0 -- 5000000] with step of 5 for nearby records. 
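+ // A hedged aside on the arithmetic (derived from the build/probe comments in
+ // these methods, not part of the original patch): build keys are the multiples
+ // of buildStep inside buildScope, probe keys cover [0, probeScope) with each
+ // key occurring probeSize / probeScope = 5 times, so the join emits
+ //   expectedResult = (probeScope / buildStep) * (probeSize / probeScope)
+ //                  = probeSize / buildStep = 5000000 / 5 = 1000000
+ // records here; the same formula yields the constant used in every scenario.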
+ int buildSize = 1000000; + int buildStep = 5; + int buildScope = buildStep * buildSize; + // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records. + int probeSize = 5000000; + int probeStep = 1; + int probeScope = buildSize; + + int expectedResult = 1000000; + + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); + + System.out.println("HybridHashJoin3:"); + System.out.println("Build input size: " + 100 * buildSize); + System.out.println("Probe input size: " + 100 * probeSize); + System.out.println("Available memory: " + this.memManager.getMemorySize()); + System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); + } + + @Benchmark + public void compareMutableHashTableWithBloomFilter3() throws IOException { + // ----------------------------------------------50% filtered during probe spill phase------------------------------------------------- + // create a build input with 1000000 records with key spread between [0 -- 2000000] with step of 2 for nearby records. + int buildSize = 1000000; + int buildStep = 2; + int buildScope = buildStep * buildSize; + // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records. + int probeSize = 5000000; + int probeStep = 1; + int probeScope = buildSize; + + int expectedResult = 2500000; + + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); + + System.out.println("HybridHashJoin4:"); + System.out.println("Build input size: " + 100 * buildSize); + System.out.println("Probe input size: " + 100 * probeSize); + System.out.println("Available memory: " + this.memManager.getMemorySize()); + System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); + } + + @Benchmark + public void compareMutableHashTableWithoutBloomFilter3() throws IOException { + // ----------------------------------------------50% filtered during probe spill phase------------------------------------------------- + // create a build input with 1000000 records with key spread between [0 -- 2000000] with step of 2 for nearby records. + int buildSize = 1000000; + int buildStep = 2; + int buildScope = buildStep * buildSize; + // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records. + int probeSize = 5000000; + int probeStep = 1; + int probeScope = buildSize; + + int expectedResult = 2500000; + + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); + + System.out.println("HybridHashJoin4:"); + System.out.println("Build input size: " + 100 * buildSize); + System.out.println("Probe input size: " + 100 * probeSize); + System.out.println("Available memory: " + this.memManager.getMemorySize()); + System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); + } + + @Benchmark + public void compareMutableHashTableWithBloomFilter4() throws IOException { + // ----------------------------------------------0% filtered during probe spill phase----------------------------------------- + // create a build input with 1000000 records with key spread between [0 -- 1000000] with step of 1 for nearby records. 
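+ // Note (hedged, not in the original patch): with buildStep = 1 the build side
+ // covers every probe key in [0, 1000000), so the bloom filter cannot reject
+ // any probe record before spilling (the 0% case) and the expected join
+ // cardinality is simply probeSize / buildStep.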
+ int buildSize = 1000000; + int buildStep = 1; + int buildScope = buildStep * buildSize; + // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records. + int probeSize = 5000000; + int probeStep = 1; + int probeScope = buildSize; + + int expectedResult = probeSize / buildStep; + + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); + + System.out.println("HybridHashJoin5:"); + System.out.println("Build input size: " + 100 * buildSize); + System.out.println("Probe input size: " + 100 * probeSize); + System.out.println("Available memory: " + this.memManager.getMemorySize()); + System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); + } + + @Benchmark + public void compareMutableHashTableWithoutBloomFilter4() throws IOException { + // ----------------------------------------------0% filtered during probe spill phase----------------------------------------- + // create a build input with 1000000 records with key spread between [0 -- 1000000] with step of 1 for nearby records. + int buildSize = 1000000; + int buildStep = 1; + int buildScope = buildStep * buildSize; + // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records. + int probeSize = 5000000; + int probeStep = 1; + int probeScope = buildSize; + + int expectedResult = probeSize / buildStep; + + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); + + System.out.println("HybridHashJoin5:"); + System.out.println("Build input size: " + 100 * buildSize); + System.out.println("Probe input size: " + 100 * probeSize); + System.out.println("Available memory: " + this.memManager.getMemorySize()); + System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); + } + + private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize, + int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException { + + InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope); + InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope); + + // allocate the memory for the HashTable + List memSegments; + try { + // 33 is minimum number of pages required to perform hash join this inputs + memSegments = this.memManager.allocatePages(MEM_OWNER, (int) (this.memManager.getMemorySize() / this.memManager.getPageSize())); + } catch (MemoryAllocationException maex) { + fail("Memory for the Join could not be provided."); + return -1; + } + + // ---------------------------------------------------------------------------------------- + + final MutableHashTable join = new MutableHashTable( + this.pairBuildSideAccesssor, this.pairProbeSideAccesssor, + this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator, + memSegments, ioManager, enableBloomFilter); + join.open(buildIterator, probeIterator); + + final StringPair recordReuse = new StringPair(); + int numRecordsInJoinResult = 0; + + while (join.nextRecord()) { + MutableHashTable.HashBucketIterator buildSide = join.getBuildSideIterator(); + while (buildSide.next(recordReuse) != null) { + numRecordsInJoinResult++; + } + } + Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult); + 
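+ // Note (an aside, not in the original patch): besides verifying correctness,
+ // this assertion consumes numRecordsInJoinResult, which helps keep the JIT
+ // from eliminating the probe loop of this void benchmark method as dead code;
+ // JMH's org.openjdk.jmh.infra.Blackhole would be the idiomatic sink here.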
+ join.close(); + // ---------------------------------------------------------------------------------------- + + this.memManager.release(join.getFreedMemory()); + //return cost; + return 1; + } + + + static class InputIterator implements MutableObjectIterator { + + private int numLeft; + private int distance; + private int scope; + + public InputIterator(int size, int distance, int scope) { + this.numLeft = size; + this.distance = distance; + this.scope = scope; + } + + @Override + public StringPair next(StringPair reuse) throws IOException { + if (this.numLeft > 0) { + numLeft--; + int currentKey = (numLeft * distance) % scope; + reuse.setKey(Integer.toString(currentKey)); + reuse.setValue(COMMENT); + return reuse; + } else { + return null; + } + } + + @Override + public StringPair next() throws IOException { + return next(new StringPair()); + } + } + + public static void main(String[] args) throws Exception { + Options opt = new OptionsBuilder() + .include(MutableHashTablePerformanceBenchmark.class.getSimpleName()) + .warmupIterations(2) + .measurementIterations(2) + .forks(1) + .build(); + new Runner(opt).run(); + } +} diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 9db82b28aa3ec..89b53e8e5b5cf 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -192,7 +192,12 @@ under the License. ${curator.version} test - + + org.apache.flink + flink-runtime + 0.10-SNAPSHOT + + diff --git a/pom.xml b/pom.xml index 00952c780c93d..9ac4e7758177e 100644 --- a/pom.xml +++ b/pom.xml @@ -167,6 +167,16 @@ under the License. jar test + + org.openjdk.jmh + jmh-core + 1.11 + + + org.openjdk.jmh + jmh-generator-annprocess + 1.11 + From 313c0102b677ef2ff753932fb6e7667a8ef3cf36 Mon Sep 17 00:00:00 2001 From: gallenvara Date: Tue, 20 Oct 2015 11:13:38 +0800 Subject: [PATCH 2/4] Modify the pom file and move the benchmark class to flink-benchmark module. --- .../hash/MutableHashTablePerformanceBenchmark.java | 3 ++- flink-runtime/pom.xml | 5 ----- pom.xml | 10 ---------- 3 files changed, 2 insertions(+), 16 deletions(-) rename flink-benchmark/src/test/java/org/apache/flink/{runtime/operators => benchmark/runtime/operates}/hash/MutableHashTablePerformanceBenchmark.java (99%) diff --git a/flink-benchmark/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operates/hash/MutableHashTablePerformanceBenchmark.java similarity index 99% rename from flink-benchmark/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java rename to flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operates/hash/MutableHashTablePerformanceBenchmark.java index deb98efb11683..4cc8458f910d7 100644 --- a/flink-benchmark/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java +++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operates/hash/MutableHashTablePerformanceBenchmark.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.runtime.operators.hash; +package org.apache.flink.benchmark.runtime.operates.hash; import java.io.IOException; import java.util.List; @@ -31,6 +31,7 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryAllocationException; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.hash.MutableHashTable; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.operators.testutils.types.StringPair; import org.apache.flink.runtime.operators.testutils.types.StringPairComparator; diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 89b53e8e5b5cf..3ea5666500299 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -192,11 +192,6 @@ under the License. ${curator.version} test - - org.apache.flink - flink-runtime - 0.10-SNAPSHOT - diff --git a/pom.xml b/pom.xml index 9ac4e7758177e..00952c780c93d 100644 --- a/pom.xml +++ b/pom.xml @@ -167,16 +167,6 @@ under the License. jar test - - org.openjdk.jmh - jmh-core - 1.11 - - - org.openjdk.jmh - jmh-generator-annprocess - 1.11 - From ff41f9b0e1222127480752cf49b4f93795ca2314 Mon Sep 17 00:00:00 2001 From: gallenvara Date: Tue, 20 Oct 2015 11:44:28 +0800 Subject: [PATCH 3/4] Modify the pom file and correct the output information. --- flink-benchmark/pom.xml | 2 +- .../MutableHashTablePerformanceBenchmark.java | 16 ++++++++-------- flink-runtime/pom.xml | 2 +- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/flink-benchmark/pom.xml b/flink-benchmark/pom.xml index 1bd1c1af3eaff..560c5904e364e 100644 --- a/flink-benchmark/pom.xml +++ b/flink-benchmark/pom.xml @@ -60,7 +60,7 @@ under the License. org.apache.flink flink-runtime - 0.10-SNAPSHOT + ${project.version} test-jar test diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operates/hash/MutableHashTablePerformanceBenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operates/hash/MutableHashTablePerformanceBenchmark.java index 4cc8458f910d7..6efa80154966a 100644 --- a/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operates/hash/MutableHashTablePerformanceBenchmark.java +++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operates/hash/MutableHashTablePerformanceBenchmark.java @@ -106,7 +106,7 @@ public void compareMutableHashTableWithBloomFilter1() throws IOException { this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); - System.out.println("HybridHashJoin2:"); + System.out.println("HybridHashJoin1:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); @@ -129,7 +129,7 @@ public void compareMutableHashTableWithoutBloomFilter1() throws IOException { this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); - System.out.println("HybridHashJoin2:"); + System.out.println("HybridHashJoin1:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); @@ -152,7 +152,7 @@ public void compareMutableHashTableWithBloomFilter2() throws IOException { this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, 
expectedResult, true); - System.out.println("HybridHashJoin3:"); + System.out.println("HybridHashJoin2:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); @@ -175,7 +175,7 @@ public void compareMutableHashTableWithoutBloomFilter2() throws IOException { this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); - System.out.println("HybridHashJoin3:"); + System.out.println("HybridHashJoin2:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); @@ -198,7 +198,7 @@ public void compareMutableHashTableWithBloomFilter3() throws IOException { this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); - System.out.println("HybridHashJoin4:"); + System.out.println("HybridHashJoin3:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); @@ -221,7 +221,7 @@ public void compareMutableHashTableWithoutBloomFilter3() throws IOException { this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); - System.out.println("HybridHashJoin4:"); + System.out.println("HybridHashJoin3:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); @@ -244,7 +244,7 @@ public void compareMutableHashTableWithBloomFilter4() throws IOException { this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); - System.out.println("HybridHashJoin5:"); + System.out.println("HybridHashJoin4:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); @@ -267,7 +267,7 @@ public void compareMutableHashTableWithoutBloomFilter4() throws IOException { this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); - System.out.println("HybridHashJoin5:"); + System.out.println("HybridHashJoin4:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml index 3ea5666500299..9db82b28aa3ec 100644 --- a/flink-runtime/pom.xml +++ b/flink-runtime/pom.xml @@ -192,7 +192,7 @@ under the License. ${curator.version} test - + From e5e96561cd6bec34b19fcbde54b21eeb213ba821 Mon Sep 17 00:00:00 2001 From: gallenvara Date: Wed, 21 Oct 2015 18:24:49 +0800 Subject: [PATCH 4/4] Add the dependencies to the pom file in flink-benchmark. 
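A note on the hunks below: the XML element names in the pom diffs were stripped somewhere in transit, leaving only their text content. Judging from the surviving tokens (the usual groupId/artifactId/version/type/scope values), the dependency blocks this patch adds presumably read as follows; the indentation and element order are assumptions:

	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-runtime</artifactId>
		<version>${project.version}</version>
		<type>test-jar</type>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-core</artifactId>
		<version>${project.version}</version>
		<type>test-jar</type>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.apache.flink</groupId>
		<artifactId>flink-java</artifactId>
		<version>${project.version}</version>
		<type>test-jar</type>
		<scope>test</scope>
	</dependency>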
--- flink-benchmark/pom.xml | 28 +- .../MutableHashTablePerformanceBenchmark.java | 122 ++++---- .../MutableHashTablePerformanceBenchmark.java | 262 ------------------ 3 files changed, 81 insertions(+), 331 deletions(-) rename flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/{operates => operators}/hash/MutableHashTablePerformanceBenchmark.java (96%) delete mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java diff --git a/flink-benchmark/pom.xml b/flink-benchmark/pom.xml index 560c5904e364e..42f1c892930df 100644 --- a/flink-benchmark/pom.xml +++ b/flink-benchmark/pom.xml @@ -57,13 +57,27 @@ under the License. ${jmh.version} provided - - org.apache.flink - flink-runtime - ${project.version} - test-jar - test - + + org.apache.flink + flink-runtime + ${project.version} + test-jar + test + + + org.apache.flink + flink-core + ${project.version} + test-jar + test + + + org.apache.flink + flink-java + ${project.version} + test-jar + test + diff --git a/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operates/hash/MutableHashTablePerformanceBenchmark.java b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java similarity index 96% rename from flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operates/hash/MutableHashTablePerformanceBenchmark.java rename to flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java index 6efa80154966a..186c595e76f38 100644 --- a/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operates/hash/MutableHashTablePerformanceBenchmark.java +++ b/flink-benchmark/src/test/java/org/apache/flink/benchmark/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java @@ -16,7 +16,7 @@ * limitations under the License. 
*/ -package org.apache.flink.benchmark.runtime.operates.hash; +package org.apache.flink.benchmark.runtime.operators.hash; import java.io.IOException; import java.util.List; @@ -51,21 +51,20 @@ @BenchmarkMode(Mode.AverageTime) @OutputTimeUnit(TimeUnit.MILLISECONDS) public class MutableHashTablePerformanceBenchmark { - + private static final AbstractInvokable MEM_OWNER = new DummyInvokable(); - + private MemoryManager memManager; private IOManager ioManager; - + private TypeSerializer pairBuildSideAccesssor; private TypeSerializer pairProbeSideAccesssor; private TypeComparator pairBuildSideComparator; private TypeComparator pairProbeSideComparator; private TypePairComparator pairComparator; - + private static final String COMMENT = "this comments should contains a 96 byte data, 100 plus another integer value and seperator char."; - - + @Setup public void setup() { this.pairBuildSideAccesssor = new StringPairSerializer(); @@ -73,11 +72,11 @@ public void setup() { this.pairBuildSideComparator = new StringPairComparator(); this.pairProbeSideComparator = new StringPairComparator(); this.pairComparator = new StringPairPairComparator(); - + this.memManager = new MemoryManager(64 * 1024 * 1024, 1); this.ioManager = new IOManagerAsync(); } - + @TearDown public void tearDown() { // shut down I/O manager and Memory Manager and verify the correct shutdown @@ -89,7 +88,7 @@ public void tearDown() { fail("Not all memory was properly released to the memory manager --> Memory Leak."); } } - + @Benchmark public void compareMutableHashTableWithBloomFilter1() throws IOException { // ----------------------------------------------90% filtered during probe spill phase----------------------------------------- @@ -101,18 +100,18 @@ public void compareMutableHashTableWithBloomFilter1() throws IOException { int probeSize = 5000000; int probeStep = 1; int probeScope = buildSize; - + int expectedResult = 500000; - + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); - + System.out.println("HybridHashJoin1:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); } - + @Benchmark public void compareMutableHashTableWithoutBloomFilter1() throws IOException { // ----------------------------------------------90% filtered during probe spill phase----------------------------------------- @@ -124,18 +123,18 @@ public void compareMutableHashTableWithoutBloomFilter1() throws IOException { int probeSize = 5000000; int probeStep = 1; int probeScope = buildSize; - + int expectedResult = 500000; - + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); - + System.out.println("HybridHashJoin1:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); } - + @Benchmark public void compareMutableHashTableWithBloomFilter2() throws IOException { // ----------------------------------------------80% filtered during probe spill phase----------------------------------------- @@ -147,18 +146,18 @@ public 
void compareMutableHashTableWithBloomFilter2() throws IOException { int probeSize = 5000000; int probeStep = 1; int probeScope = buildSize; - + int expectedResult = 1000000; - + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); - + System.out.println("HybridHashJoin2:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); } - + @Benchmark public void compareMutableHashTableWithoutBloomFilter2() throws IOException { // ----------------------------------------------80% filtered during probe spill phase----------------------------------------- @@ -170,18 +169,18 @@ public void compareMutableHashTableWithoutBloomFilter2() throws IOException { int probeSize = 5000000; int probeStep = 1; int probeScope = buildSize; - + int expectedResult = 1000000; - + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); - + System.out.println("HybridHashJoin2:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); } - + @Benchmark public void compareMutableHashTableWithBloomFilter3() throws IOException { // ----------------------------------------------50% filtered during probe spill phase------------------------------------------------- @@ -193,18 +192,18 @@ public void compareMutableHashTableWithBloomFilter3() throws IOException { int probeSize = 5000000; int probeStep = 1; int probeScope = buildSize; - + int expectedResult = 2500000; - + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); - + System.out.println("HybridHashJoin3:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); } - + @Benchmark public void compareMutableHashTableWithoutBloomFilter3() throws IOException { // ----------------------------------------------50% filtered during probe spill phase------------------------------------------------- @@ -216,18 +215,18 @@ public void compareMutableHashTableWithoutBloomFilter3() throws IOException { int probeSize = 5000000; int probeStep = 1; int probeScope = buildSize; - + int expectedResult = 2500000; - + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); - + System.out.println("HybridHashJoin3:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); } - + @Benchmark public void compareMutableHashTableWithBloomFilter4() throws IOException { // ----------------------------------------------0% filtered during probe 
spill phase----------------------------------------- @@ -239,18 +238,18 @@ public void compareMutableHashTableWithBloomFilter4() throws IOException { int probeSize = 5000000; int probeStep = 1; int probeScope = buildSize; - + int expectedResult = probeSize / buildStep; - + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); - + System.out.println("HybridHashJoin4:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); } - + @Benchmark public void compareMutableHashTableWithoutBloomFilter4() throws IOException { // ----------------------------------------------0% filtered during probe spill phase----------------------------------------- @@ -262,24 +261,24 @@ public void compareMutableHashTableWithoutBloomFilter4() throws IOException { int probeSize = 5000000; int probeStep = 1; int probeScope = buildSize; - + int expectedResult = probeSize / buildStep; - + this.hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); - + System.out.println("HybridHashJoin4:"); System.out.println("Build input size: " + 100 * buildSize); System.out.println("Probe input size: " + 100 * probeSize); System.out.println("Available memory: " + this.memManager.getMemorySize()); System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); } - + private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize, - int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException { - + int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException { + InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope); InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope); - + // allocate the memory for the HashTable List memSegments; try { @@ -289,18 +288,18 @@ private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int pr fail("Memory for the Join could not be provided."); return -1; } - + // ---------------------------------------------------------------------------------------- - + final MutableHashTable join = new MutableHashTable( - this.pairBuildSideAccesssor, this.pairProbeSideAccesssor, - this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator, - memSegments, ioManager, enableBloomFilter); + this.pairBuildSideAccesssor, this.pairProbeSideAccesssor, + this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator, + memSegments, ioManager, enableBloomFilter); join.open(buildIterator, probeIterator); - + final StringPair recordReuse = new StringPair(); int numRecordsInJoinResult = 0; - + while (join.nextRecord()) { MutableHashTable.HashBucketIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { @@ -308,28 +307,27 @@ private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int pr } } Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult); - + join.close(); // ---------------------------------------------------------------------------------------- - + 
this.memManager.release(join.getFreedMemory()); - //return cost; return 1; } - - + + static class InputIterator implements MutableObjectIterator { - + private int numLeft; private int distance; private int scope; - + public InputIterator(int size, int distance, int scope) { this.numLeft = size; this.distance = distance; this.scope = scope; } - + @Override public StringPair next(StringPair reuse) throws IOException { if (this.numLeft > 0) { @@ -342,13 +340,13 @@ public StringPair next(StringPair reuse) throws IOException { return null; } } - + @Override public StringPair next() throws IOException { return next(new StringPair()); } } - + public static void main(String[] args) throws Exception { Options opt = new OptionsBuilder() .include(MutableHashTablePerformanceBenchmark.class.getSimpleName()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java deleted file mode 100644 index 70c94277b6139..0000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/MutableHashTablePerformanceBenchmark.java +++ /dev/null @@ -1,262 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.operators.hash; - -import java.io.IOException; -import java.util.List; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memory.MemoryAllocationException; -import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.testutils.DummyInvokable; -import org.apache.flink.runtime.operators.testutils.types.StringPair; -import org.apache.flink.runtime.operators.testutils.types.StringPairComparator; -import org.apache.flink.runtime.operators.testutils.types.StringPairPairComparator; -import org.apache.flink.runtime.operators.testutils.types.StringPairSerializer; -import org.apache.flink.util.MutableObjectIterator; - -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -import static org.junit.Assert.fail; - -public class MutableHashTablePerformanceBenchmark { - - private static final AbstractInvokable MEM_OWNER = new DummyInvokable(); - - private MemoryManager memManager; - private IOManager ioManager; - - private TypeSerializer pairBuildSideAccesssor; - private TypeSerializer pairProbeSideAccesssor; - private TypeComparator pairBuildSideComparator; - private TypeComparator pairProbeSideComparator; - private TypePairComparator pairComparator; - - private static final String COMMENT = "this comments should contains a 96 byte data, 100 plus another integer value and seperator char."; - - - @Before - public void setup() { - this.pairBuildSideAccesssor = new StringPairSerializer(); - this.pairProbeSideAccesssor = new StringPairSerializer(); - this.pairBuildSideComparator = new StringPairComparator(); - this.pairProbeSideComparator = new StringPairComparator(); - this.pairComparator = new StringPairPairComparator(); - - this.memManager = new MemoryManager(64 * 1024 * 1024, 1); - this.ioManager = new IOManagerAsync(); - } - - @After - public void tearDown() { - // shut down I/O manager and Memory Manager and verify the correct shutdown - this.ioManager.shutdown(); - if (!this.ioManager.isProperlyShutDown()) { - fail("I/O manager was not property shut down."); - } - if (!this.memManager.verifyEmpty()) { - fail("Not all memory was properly released to the memory manager --> Memory Leak."); - } - } - - @Test - public void compareMutableHashTablePerformance1() throws IOException { - // ----------------------------------------------90% filtered during probe spill phase----------------------------------------- - // create a build input with 1000000 records with key spread between [0 -- 10000000] with step of 10 for nearby records. - int buildSize = 1000000; - int buildStep = 10; - int buildScope = buildStep * buildSize; - // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records. 
- int probeSize = 5000000; - int probeStep = 1; - int probeScope = buildSize; - - int expectedResult = 500000; - - long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); - long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); - - System.out.println("HybridHashJoin2:"); - System.out.println("Build input size: " + 100 * buildSize); - System.out.println("Probe input size: " + 100 * probeSize); - System.out.println("Available memory: " + this.memManager.getMemorySize()); - System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); - System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost)); - } - - @Test - public void compareMutableHashTablePerformance2() throws IOException { - // ----------------------------------------------80% filtered during probe spill phase----------------------------------------- - // create a build input with 1000000 records with key spread between [0 -- 5000000] with step of 5 for nearby records. - int buildSize = 1000000; - int buildStep = 5; - int buildScope = buildStep * buildSize; - // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records. - int probeSize = 5000000; - int probeStep = 1; - int probeScope = buildSize; - - int expectedResult = 1000000; - - long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); - long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); - - System.out.println("HybridHashJoin3:"); - System.out.println("Build input size: " + 100 * buildSize); - System.out.println("Probe input size: " + 100 * probeSize); - System.out.println("Available memory: " + this.memManager.getMemorySize()); - System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); - System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost)); - } - - @Test - public void compareMutableHashTablePerformance3() throws IOException { - // ----------------------------------------------50% filtered during probe spill phase------------------------------------------------- - // create a build input with 1000000 records with key spread between [0 -- 2000000] with step of 2 for nearby records. - int buildSize = 1000000; - int buildStep = 2; - int buildScope = buildStep * buildSize; - // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records. 
- int probeSize = 5000000; - int probeStep = 1; - int probeScope = buildSize; - - int expectedResult = 2500000; - - long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); - long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); - - System.out.println("HybridHashJoin4:"); - System.out.println("Build input size: " + 100 * buildSize); - System.out.println("Probe input size: " + 100 * probeSize); - System.out.println("Available memory: " + this.memManager.getMemorySize()); - System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); - System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost)); - } - - @Test - public void compareMutableHashTablePerformance4() throws IOException { - // ----------------------------------------------0% filtered during probe spill phase----------------------------------------- - // create a build input with 1000000 records with key spread between [0 -- 1000000] with step of 1 for nearby records. - int buildSize = 1000000; - int buildStep = 1; - int buildScope = buildStep * buildSize; - // create a probe input with 5000000 records with key spread between [0 -- 1000000] with distance of 1 for nearby records. - int probeSize = 5000000; - int probeStep = 1; - int probeScope = buildSize; - - int expectedResult = probeSize / buildStep; - - long withBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, true); - long withoutBloomFilterCost = hybridHashJoin(buildSize, buildStep, buildScope, probeSize, probeStep, probeScope, expectedResult, false); - - System.out.println("HybridHashJoin5:"); - System.out.println("Build input size: " + 100 * buildSize); - System.out.println("Probe input size: " + 100 * probeSize); - System.out.println("Available memory: " + this.memManager.getMemorySize()); - System.out.println("Probe record be filtered before spill: " + (1 - (double)probeScope / buildScope) * 100 + "% percent."); - System.out.println(String.format("Cost: without bloom filter(%d), with bloom filter(%d)", withoutBloomFilterCost, withBloomFilterCost)); - } - - private long hybridHashJoin(int buildSize, int buildStep, int buildScope, int probeSize, - int probeStep, int probeScope, int expectedResultSize, boolean enableBloomFilter) throws IOException { - - InputIterator buildIterator = new InputIterator(buildSize, buildStep, buildScope); - InputIterator probeIterator = new InputIterator(probeSize, probeStep, probeScope); - - // allocate the memory for the HashTable - List memSegments; - try { - // 33 is minimum number of pages required to perform hash join this inputs - memSegments = this.memManager.allocatePages(MEM_OWNER, (int) (this.memManager.getMemorySize() / this.memManager.getPageSize())); - } catch (MemoryAllocationException maex) { - fail("Memory for the Join could not be provided."); - return -1; - } - - // ---------------------------------------------------------------------------------------- - - long start = System.currentTimeMillis(); - final MutableHashTable join = new MutableHashTable( - this.pairBuildSideAccesssor, this.pairProbeSideAccesssor, - this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator, - memSegments, ioManager, enableBloomFilter); - join.open(buildIterator, 
probeIterator); - - final StringPair recordReuse = new StringPair(); - int numRecordsInJoinResult = 0; - - while (join.nextRecord()) { - MutableHashTable.HashBucketIterator buildSide = join.getBuildSideIterator(); - while (buildSide.next(recordReuse) != null) { - numRecordsInJoinResult++; - } - } - Assert.assertEquals("Wrong number of records in join result.", expectedResultSize, numRecordsInJoinResult); - - join.close(); - long cost = System.currentTimeMillis() - start; - // ---------------------------------------------------------------------------------------- - - this.memManager.release(join.getFreedMemory()); - return cost; - } - - - static class InputIterator implements MutableObjectIterator { - - private int numLeft; - private int distance; - private int scope; - - public InputIterator(int size, int distance, int scope) { - this.numLeft = size; - this.distance = distance; - this.scope = scope; - } - - @Override - public StringPair next(StringPair reuse) throws IOException { - if (this.numLeft > 0) { - numLeft--; - int currentKey = (numLeft * distance) % scope; - reuse.setKey(Integer.toString(currentKey)); - reuse.setValue(COMMENT); - return reuse; - } else { - return null; - } - } - - @Override - public StringPair next() throws IOException { - return next(new StringPair()); - } - } -}
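A closing observation: after this series the benchmark still carries eight near-identical compareMutableHashTable* methods that differ only in buildStep (10/5/2/1, i.e. 90%/80%/50%/0% of probe records filtered before spilling) and in the bloom-filter flag. A hedged sketch of how JMH's @Param support could collapse them into a single method; the fields and method below would sit inside MutableHashTablePerformanceBenchmark (which already imports org.openjdk.jmh.annotations.*), replacing the eight compare* methods, and it assumes hybridHashJoin(...) stays accessible from there:

	// Sketch: inside MutableHashTablePerformanceBenchmark, replacing the eight
	// compare* methods (setup(), tearDown() and hybridHashJoin(...) stay as-is).

	@Param({"10", "5", "2", "1"})   // 90% / 80% / 50% / 0% filtered before spilling
	public int buildStep;

	@Param({"true", "false"})
	public boolean enableBloomFilter;

	@Benchmark
	public void hybridHashJoinScenario() throws IOException {
		int buildSize = 1000000;
		int buildScope = buildStep * buildSize;
		int probeSize = 5000000;
		int probeStep = 1;
		int probeScope = buildSize;
		// every scenario's expected join cardinality reduces to probeSize / buildStep
		int expectedResult = probeSize / buildStep;
		this.hybridHashJoin(buildSize, buildStep, buildScope,
				probeSize, probeStep, probeScope, expectedResult, enableBloomFilter);
	}

JMH would then report each (buildStep, enableBloomFilter) combination as a separate run, which also moves the System.out.println reporting out of the timed path.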