From 334014d76e24f8424ad494c1a06c5feab65ebe84 Mon Sep 17 00:00:00 2001 From: Alexey Diomin Date: Tue, 10 Jan 2017 22:04:41 +0400 Subject: [PATCH] remove duplicated tests --- .../NonReusingReOpenableHashTableITCase.java | 421 +---------------- .../hash/ReOpenableHashTableITCase.java | 222 +++++++++ .../hash/ReOpenableHashTableTestBase.java | 193 ++++++++ .../ReusingReOpenableHashTableITCase.java | 429 +----------------- 4 files changed, 429 insertions(+), 836 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java index 576cbd4f53372..6b4e170630f45 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java @@ -19,190 +19,34 @@ package org.apache.flink.runtime.operators.hash; -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memory.MemoryAllocationException; -import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch; import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin; import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; -import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.runtime.operators.testutils.TestData; import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator; -import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode; -import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode; -import org.apache.flink.runtime.operators.testutils.UnionIterator; import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; -import org.junit.After; import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.typeutils.GenericPairComparator; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator; -import static org.junit.Assert.fail; +import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples; +import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData; /** * Test specialized hash join that keeps the build side data (in memory and on hard disk) * This is used for iterative tasks. */ -public class NonReusingReOpenableHashTableITCase { +public class NonReusingReOpenableHashTableITCase extends ReOpenableHashTableTestBase { - private static final int PAGE_SIZE = 8 * 1024; - private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages. 
- - private static final long SEED1 = 561349061987311L; - private static final long SEED2 = 231434613412342L; - - private static final int NUM_PROBES = 3; // number of reopenings of hash join - - private final AbstractInvokable parentTask = new DummyInvokable(); - - private IOManager ioManager; - private MemoryManager memoryManager; - - private TypeSerializer> recordSerializer; - private TypeComparator> record1Comparator; - private TypeComparator> record2Comparator; - private TypePairComparator, Tuple2> recordPairComparator; - - - - - private static final AbstractInvokable MEM_OWNER = new DummyInvokable(); - private TypeSerializer> recordBuildSideAccesssor; - private TypeSerializer> recordProbeSideAccesssor; - private TypeComparator> recordBuildSideComparator; - private TypeComparator> recordProbeSideComparator; - private TypePairComparator, Tuple2> pactRecordComparator; - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Before - public void beforeTest() { - this.recordSerializer = TestData.getIntStringTupleSerializer(); - - this.record1Comparator = TestData.getIntStringTupleComparator(); - this.record2Comparator = TestData.getIntStringTupleComparator(); - this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator); - - this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer(); - this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer(); - this.recordBuildSideComparator = TestData.getIntIntTupleComparator(); - this.recordProbeSideComparator = TestData.getIntIntTupleComparator(); - this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator); - - this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true); - this.ioManager = new IOManagerAsync(); - } - - @After - public void afterTest() { - if (this.ioManager != null) { - this.ioManager.shutdown(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager 
failed to properly shut down."); - } - this.ioManager = null; - } - - if (this.memoryManager != null) { - Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", - this.memoryManager.verifyEmpty()); - this.memoryManager.shutdown(); - this.memoryManager = null; - } - } - - - /** - * Test behavior with overflow buckets (Overflow buckets must be initialized correctly - * if the input is reopened again) - */ - @Test - public void testOverflow() { - - int buildSize = 1000; - int probeSize = 1000; - try { - TupleGenerator bgen = new TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH); - TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); - - final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize); - final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize); - doTest(buildInput,probeInput, bgen, pgen); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - /** - * Verify proper operation if the build side is spilled to disk. 
- */ - @Test - public void testDoubleProbeSpilling() { - - int buildSize = 1000; - int probeSize = 1000; - try { - TupleGenerator bgen = new TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); - TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); - - final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize); - final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize); - doTest(buildInput,probeInput, bgen, pgen); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - /** - * This test case verifies that hybrid hash join is able to handle multiple probe phases - * when the build side fits completely into memory. - */ - @Test - public void testDoubleProbeInMemory() { - - int buildSize = 1000; - int probeSize = 1000; - try { - TupleGenerator bgen = new TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH); - TupleGenerator pgen = new TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH); - - final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize); - final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize); - - doTest(buildInput,probeInput, bgen, pgen); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception { + protected void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception { // collect expected data - final Map> expectedFirstMatchesMap = 
NonReusingHashJoinIteratorITCase.joinTuples(NonReusingHashJoinIteratorITCase.collectTupleData(buildInput), NonReusingHashJoinIteratorITCase.collectTupleData(probeInput)); + final Map> expectedFirstMatchesMap = joinTuples(collectTupleData(buildInput), collectTupleData(probeInput)); final List>> expectedNMatchesMapList = new ArrayList<>(NUM_PROBES); final FlatJoinFunction[] nMatcher = new TupleMatchRemovingJoin[NUM_PROBES]; @@ -259,260 +103,5 @@ private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGe iterator.close(); } - // - // - // Tests taken from HahTableITCase! - // - // - - private MutableObjectIterator> getProbeInput(final int numKeys, - final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) { - MutableObjectIterator> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true); - MutableObjectIterator> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5); - MutableObjectIterator> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5); - List>> probes = new ArrayList<>(); - probes.add(probe1); - probes.add(probe2); - probes.add(probe3); - return new UnionIterator<>(probes); - } - - @Test - public void testSpillingHashJoinWithMassiveCollisions() throws IOException - { - // the following two values are known to have a hash-code collision on the initial level. 
- // we use them to make sure one partition grows over-proportionally large - final int REPEATED_VALUE_1 = 40559; - final int REPEATED_VALUE_2 = 92882; - final int REPEATED_VALUE_COUNT_BUILD = 200000; - final int REPEATED_VALUE_COUNT_PROBE = 5; - - final int NUM_KEYS = 1000000; - final int BUILD_VALS_PER_KEY = 3; - final int PROBE_VALS_PER_KEY = 10; - - // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys - MutableObjectIterator> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false); - MutableObjectIterator> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD); - MutableObjectIterator> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD); - List>> builds = new ArrayList<>(); - builds.add(build1); - builds.add(build2); - builds.add(build3); - MutableObjectIterator> buildInput = new UnionIterator<>(builds); - - - - - // allocate the memory for the HashTable - List memSegments; - try { - memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896); - } - catch (MemoryAllocationException maex) { - fail("Memory for the Join could not be provided."); - return; - } - - // create the map for validating the results - HashMap map = new HashMap(NUM_KEYS); - - // ---------------------------------------------------------------------------------------- - - final ReOpenableMutableHashTable, Tuple2> join = new ReOpenableMutableHashTable<>( - this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, - this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, - memSegments, ioManager, true); - - for (int probe = 0; probe < NUM_PROBES; probe++) { - // create a probe input that gives 10 million pairs with 10 values sharing a key - MutableObjectIterator> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2); - if(probe == 
0) { - join.open(buildInput, probeInput); - } else { - join.reopenProbe(probeInput); - } - - Tuple2 record; - final Tuple2 recordReuse = new Tuple2<>(); - - while (join.nextRecord()) { - long numBuildValues = 0; - - final Tuple2 probeRec = join.getCurrentProbeRecord(); - Integer key = probeRec.f0; - - MutableObjectIterator> buildSide = join.getBuildSideIterator(); - if ((record = buildSide.next(recordReuse)) != null) { - numBuildValues = 1; - Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); - } - else { - fail("No build side values found for a probe key."); - } - while ((record = buildSide.next(record)) != null) { - numBuildValues++; - Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); - } - - Long contained = map.get(key); - if (contained == null) { - contained = numBuildValues; - } - else { - contained = contained + numBuildValues; - } - - map.put(key, contained); - } - } - - join.close(); - - Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size()); - for (Entry entry : map.entrySet()) { - long val = entry.getValue(); - int key = entry.getKey(); - - if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) { - Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, - (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val); - } else { - Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, - PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val); - } - } - - - // ---------------------------------------------------------------------------------------- - - this.memoryManager.release(join.getFreedMemory()); - } - - /* - * This test is basically identical to the "testSpillingHashJoinWithMassiveCollisions" test, only that the number - * of repeated values (causing bucket collisions) are large enough to make sure that their target partition no longer - * 
fits into memory by itself and needs to be repartitioned in the recursion again. - */ - @Test - public void testSpillingHashJoinWithTwoRecursions() throws IOException - { - // the following two values are known to have a hash-code collision on the first recursion level. - // we use them to make sure one partition grows over-proportionally large - final int REPEATED_VALUE_1 = 40559; - final int REPEATED_VALUE_2 = 92882; - final int REPEATED_VALUE_COUNT_BUILD = 200000; - final int REPEATED_VALUE_COUNT_PROBE = 5; - - final int NUM_KEYS = 1000000; - final int BUILD_VALS_PER_KEY = 3; - final int PROBE_VALS_PER_KEY = 10; - - // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys - MutableObjectIterator> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false); - MutableObjectIterator> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD); - MutableObjectIterator> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD); - List>> builds = new ArrayList<>(); - builds.add(build1); - builds.add(build2); - builds.add(build3); - MutableObjectIterator> buildInput = new UnionIterator<>(builds); - - - // allocate the memory for the HashTable - List memSegments; - try { - memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896); - } - catch (MemoryAllocationException maex) { - fail("Memory for the Join could not be provided."); - return; - } - - // create the map for validating the results - HashMap map = new HashMap(NUM_KEYS); - - // ---------------------------------------------------------------------------------------- - - final ReOpenableMutableHashTable, Tuple2> join = new ReOpenableMutableHashTable<>( - this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, - this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, - memSegments, ioManager, true); - - 
for (int probe = 0; probe < NUM_PROBES; probe++) { - // create a probe input that gives 10 million pairs with 10 values sharing a key - MutableObjectIterator> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2); - if (probe == 0) { - join.open(buildInput, probeInput); - } else { - join.reopenProbe(probeInput); - } - Tuple2 record; - final Tuple2 recordReuse = new Tuple2<>(); - - while (join.nextRecord()) { - long numBuildValues = 0; - - final Tuple2 probeRec = join.getCurrentProbeRecord(); - Integer key = probeRec.f0; - - MutableObjectIterator> buildSide = join.getBuildSideIterator(); - if ((record = buildSide.next(recordReuse)) != null) { - numBuildValues = 1; - Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); - } - else { - fail("No build side values found for a probe key."); - } - while ((record = buildSide.next(recordReuse)) != null) { - numBuildValues++; - Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); - } - - Long contained = map.get(key); - if (contained == null) { - contained = numBuildValues; - } - else { - contained = contained + numBuildValues; - } - - map.put(key, contained); - } - } - - join.close(); - Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size()); - for (Entry entry : map.entrySet()) { - long val = entry.getValue(); - int key = entry.getKey(); - - if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) { - Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, - (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val); - } else { - Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, - PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val); - } - } - - - // ---------------------------------------------------------------------------------------- - - 
this.memoryManager.release(join.getFreedMemory()); - } - - static Map> deepCopy(Map> expectedSecondMatchesMap) { - Map> copy = new HashMap<>(expectedSecondMatchesMap.size()); - for(Entry> entry : expectedSecondMatchesMap.entrySet()) { - List matches = new ArrayList(entry.getValue().size()); - for(TupleMatch m : entry.getValue()) { - matches.add(m); - } - copy.put(entry.getKey(), matches); - } - return copy; - } - } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java new file mode 100644 index 0000000000000..f667c878e20c3 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableITCase.java @@ -0,0 +1,222 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.runtime.operators.hash; + +import org.apache.flink.api.common.typeutils.GenericPairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.MemorySegment; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryAllocationException; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.runtime.operators.testutils.TestData; +import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator; +import org.apache.flink.runtime.operators.testutils.UnionIterator; +import org.apache.flink.util.MutableObjectIterator; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.fail; + +public class ReOpenableHashTableITCase { + + private static final int PAGE_SIZE = 8 * 1024; + private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 1000 pages.
+ + private static final int NUM_PROBES = 3; // number of reopenings of hash join + + private IOManager ioManager; + private MemoryManager memoryManager; + + private static final AbstractInvokable MEM_OWNER = new DummyInvokable(); + private TypeSerializer> recordBuildSideAccesssor; + private TypeSerializer> recordProbeSideAccesssor; + private TypeComparator> recordBuildSideComparator; + private TypeComparator> recordProbeSideComparator; + private TypePairComparator, Tuple2> pactRecordComparator; + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Before + public void beforeTest() { + this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer(); + this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer(); + this.recordBuildSideComparator = TestData.getIntIntTupleComparator(); + this.recordProbeSideComparator = TestData.getIntIntTupleComparator(); + this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator); + + this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true); + this.ioManager = new IOManagerAsync(); + } + + @After + public void afterTest() { + if (this.ioManager != null) { + this.ioManager.shutdown(); + if (!this.ioManager.isProperlyShutDown()) { + Assert.fail("I/O manager failed to properly shut down."); + } + this.ioManager = null; + } + + if (this.memoryManager != null) { + Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", + this.memoryManager.verifyEmpty()); + this.memoryManager.shutdown(); + this.memoryManager = null; + } + } + + private MutableObjectIterator> getProbeInput(final int numKeys, + final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) { + MutableObjectIterator> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true); + MutableObjectIterator> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5); + MutableObjectIterator> probe3 = new 
TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5); + List>> probes = new ArrayList<>(); + probes.add(probe1); + probes.add(probe2); + probes.add(probe3); + return new UnionIterator<>(probes); + } + + @Test + public void testSpillingHashJoinWithMassiveCollisions() throws IOException + { + // the following two values are known to have a hash-code collision on the initial level. + // we use them to make sure one partition grows over-proportionally large + final int REPEATED_VALUE_1 = 40559; + final int REPEATED_VALUE_2 = 92882; + final int REPEATED_VALUE_COUNT_BUILD = 200000; + final int REPEATED_VALUE_COUNT_PROBE = 5; + + final int NUM_KEYS = 1000000; + final int BUILD_VALS_PER_KEY = 3; + final int PROBE_VALS_PER_KEY = 10; + + // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys + MutableObjectIterator> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false); + MutableObjectIterator> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD); + MutableObjectIterator> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD); + List>> builds = new ArrayList<>(); + builds.add(build1); + builds.add(build2); + builds.add(build3); + MutableObjectIterator> buildInput = new UnionIterator<>(builds); + + + + + // allocate the memory for the HashTable + List memSegments; + try { + memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896); + } + catch (MemoryAllocationException maex) { + fail("Memory for the Join could not be provided."); + return; + } + + // create the map for validating the results + HashMap map = new HashMap(NUM_KEYS); + + // ---------------------------------------------------------------------------------------- + + final ReOpenableMutableHashTable, Tuple2> join = new ReOpenableMutableHashTable<>( + this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, + 
this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, + memSegments, ioManager, true); + + for (int probe = 0; probe < NUM_PROBES; probe++) { + // create a probe input that gives 10 million pairs with 10 values sharing a key + MutableObjectIterator> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2); + if(probe == 0) { + join.open(buildInput, probeInput); + } else { + join.reopenProbe(probeInput); + } + + Tuple2 record; + final Tuple2 recordReuse = new Tuple2<>(); + + while (join.nextRecord()) { + long numBuildValues = 0; + + final Tuple2 probeRec = join.getCurrentProbeRecord(); + Integer key = probeRec.f0; + + MutableObjectIterator> buildSide = join.getBuildSideIterator(); + if ((record = buildSide.next(recordReuse)) != null) { + numBuildValues = 1; + Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); + } + else { + fail("No build side values found for a probe key."); + } + while ((record = buildSide.next(recordReuse)) != null) { + numBuildValues++; + Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); + } + + Long contained = map.get(key); + if (contained == null) { + contained = numBuildValues; + } + else { + contained = contained + numBuildValues; + } + + map.put(key, contained); + } + } + + join.close(); + Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size()); + for (Map.Entry entry : map.entrySet()) { + long val = entry.getValue(); + int key = entry.getKey(); + + if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) { + Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, + (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val); + } else { + Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, + PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val); + } + } 
+ + + // ---------------------------------------------------------------------------------------- + + this.memoryManager.release(join.getFreedMemory()); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java new file mode 100644 index 0000000000000..c1b87b025f33d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReOpenableHashTableTestBase.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + +package org.apache.flink.runtime.operators.hash; + +import org.apache.flink.api.common.typeutils.GenericPairComparator; +import org.apache.flink.api.common.typeutils.TypeComparator; +import org.apache.flink.api.common.typeutils.TypePairComparator; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.MemoryType; +import org.apache.flink.runtime.io.disk.iomanager.IOManager; +import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; +import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; +import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch; +import org.apache.flink.runtime.operators.testutils.DummyInvokable; +import org.apache.flink.runtime.operators.testutils.TestData; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGeneratorIterator; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode; + +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + + +import java.util.*; + +public abstract class ReOpenableHashTableTestBase { + + protected static final int PAGE_SIZE = 8 * 1024; + protected static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 1000 pages.
+ + protected static final long SEED1 = 561349061987311L; + protected static final long SEED2 = 231434613412342L; + + protected static final int NUM_PROBES = 3; // number of reopenings of hash join + + protected final AbstractInvokable parentTask = new DummyInvokable(); + + protected IOManager ioManager; + protected MemoryManager memoryManager; + + protected TypeSerializer> recordSerializer; + protected TypeComparator> record1Comparator; + protected TypeComparator> record2Comparator; + protected TypePairComparator, Tuple2> recordPairComparator; + + protected TypeSerializer> recordBuildSideAccesssor; + protected TypeSerializer> recordProbeSideAccesssor; + protected TypeComparator> recordBuildSideComparator; + protected TypeComparator> recordProbeSideComparator; + protected TypePairComparator, Tuple2> pactRecordComparator; + + @SuppressWarnings({"unchecked", "rawtypes"}) + @Before + public void beforeTest() { + this.recordSerializer = TestData.getIntStringTupleSerializer(); + + this.record1Comparator = TestData.getIntStringTupleComparator(); + this.record2Comparator = TestData.getIntStringTupleComparator(); + this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator); + + this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer(); + this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer(); + this.recordBuildSideComparator = TestData.getIntIntTupleComparator(); + this.recordProbeSideComparator = TestData.getIntIntTupleComparator(); + this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator); + + this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true); + this.ioManager = new IOManagerAsync(); + } + + @After + public void afterTest() { + if (this.ioManager != null) { + this.ioManager.shutdown(); + if (!this.ioManager.isProperlyShutDown()) { + Assert.fail("I/O manager failed to properly shut down."); + } + 
this.ioManager = null; + } + + if (this.memoryManager != null) { + Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", + this.memoryManager.verifyEmpty()); + this.memoryManager.shutdown(); + this.memoryManager = null; + } + } + + /** + * Test behavior with overflow buckets (Overflow buckets must be initialized correctly + * if the input is reopened again) + */ + @Test + public void testOverflow() { + + int buildSize = 1000; + int probeSize = 1000; + try { + TupleGenerator bgen = new TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH); + TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); + + final TupleGeneratorIterator buildInput = new TupleGeneratorIterator(bgen, buildSize); + final TupleGeneratorIterator probeInput = new TupleGeneratorIterator(pgen, probeSize); + doTest(buildInput,probeInput, bgen, pgen); + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + /** + * Verify proper operation if the build side is spilled to disk. + */ + @Test + public void testDoubleProbeSpilling() { + + int buildSize = 1000; + int probeSize = 1000; + try { + TupleGenerator bgen = new TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); + TupleGenerator pgen = new TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); + + final TupleGeneratorIterator buildInput = new TupleGeneratorIterator(bgen, buildSize); + final TupleGeneratorIterator probeInput = new TupleGeneratorIterator(pgen, probeSize); + doTest(buildInput,probeInput, bgen, pgen); + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + /** + * This test case verifies that hybrid hash join is able to handle multiple probe phases + * when the build side fits completely into memory. 
+ */ + @Test + public void testDoubleProbeInMemory() { + + int buildSize = 1000; + int probeSize = 1000; + try { + TupleGenerator bgen = new TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH); + TupleGenerator pgen = new TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH); + + final TupleGeneratorIterator buildInput = new TupleGeneratorIterator(bgen, buildSize); + final TupleGeneratorIterator probeInput = new TupleGeneratorIterator(pgen, probeSize); + + doTest(buildInput,probeInput, bgen, pgen); + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + abstract protected void doTest(TupleGeneratorIterator buildInput, TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, TestData.TupleGenerator pgen) throws Exception; + + static Map> deepCopy(Map> expectedSecondMatchesMap) { + Map> copy = new HashMap<>(expectedSecondMatchesMap.size()); + for(Map.Entry> entry : expectedSecondMatchesMap.entrySet()) { + List matches = new ArrayList(entry.getValue().size()); + for(TupleMatch m : entry.getValue()) { + matches.add(m); + } + copy.put(entry.getKey(), matches); + } + return copy; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java index 6afde162d71e4..af3a89466a56a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java @@ -19,44 +19,21 @@ package org.apache.flink.runtime.operators.hash; -import static org.junit.Assert.fail; +import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch; +import 
org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin; +import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; +import org.apache.flink.runtime.operators.testutils.TestData; +import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator; +import org.apache.flink.util.Collector; +import org.junit.Assert; -import java.io.IOException; import java.util.ArrayList; import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Map.Entry; import org.apache.flink.api.common.functions.FlatJoinFunction; -import org.apache.flink.api.common.typeutils.GenericPairComparator; - -import org.apache.flink.api.common.typeutils.TypeComparator; -import org.apache.flink.api.common.typeutils.TypePairComparator; -import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.memory.MemorySegment; -import org.apache.flink.core.memory.MemoryType; -import org.apache.flink.runtime.io.disk.iomanager.IOManager; -import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; -import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.memory.MemoryAllocationException; -import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin; -import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch; -import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; -import org.apache.flink.runtime.operators.testutils.DummyInvokable; -import org.apache.flink.runtime.operators.testutils.TestData; -import org.apache.flink.runtime.operators.testutils.UnionIterator; -import org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.KeyMode; -import 
org.apache.flink.runtime.operators.testutils.TestData.TupleGenerator.ValueMode; -import org.apache.flink.runtime.operators.testutils.UniformIntTupleGenerator; -import org.apache.flink.util.Collector; -import org.apache.flink.util.MutableObjectIterator; -import org.junit.After; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples; import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.collectTupleData; @@ -65,144 +42,9 @@ * Test specialized hash join that keeps the build side data (in memory and on hard disk) * This is used for iterative tasks. */ -public class ReusingReOpenableHashTableITCase { - - private static final int PAGE_SIZE = 8 * 1024; - private static final long MEMORY_SIZE = PAGE_SIZE * 1000; // 100 Pages. +public class ReusingReOpenableHashTableITCase extends ReOpenableHashTableTestBase { - private static final long SEED1 = 561349061987311L; - private static final long SEED2 = 231434613412342L; - - private static final int NUM_PROBES = 3; // number of reopenings of hash join - - private final AbstractInvokable parentTask = new DummyInvokable(); - - private IOManager ioManager; - private MemoryManager memoryManager; - - private TypeSerializer> recordSerializer; - private TypeComparator> record1Comparator; - private TypeComparator> record2Comparator; - private TypePairComparator, Tuple2> recordPairComparator; - - - - - private static final AbstractInvokable MEM_OWNER = new DummyInvokable(); - private TypeSerializer> recordBuildSideAccesssor; - private TypeSerializer> recordProbeSideAccesssor; - private TypeComparator> recordBuildSideComparator; - private TypeComparator> recordProbeSideComparator; - private TypePairComparator, Tuple2> pactRecordComparator; - - @SuppressWarnings({"unchecked", "rawtypes"}) - @Before - public void beforeTest() { - this.recordSerializer = 
TestData.getIntStringTupleSerializer(); - - this.record1Comparator = TestData.getIntStringTupleComparator(); - this.record2Comparator = TestData.getIntStringTupleComparator(); - this.recordPairComparator = new GenericPairComparator(this.record1Comparator, this.record2Comparator); - - this.recordBuildSideAccesssor = TestData.getIntIntTupleSerializer(); - this.recordProbeSideAccesssor = TestData.getIntIntTupleSerializer(); - this.recordBuildSideComparator = TestData.getIntIntTupleComparator(); - this.recordProbeSideComparator = TestData.getIntIntTupleComparator(); - this.pactRecordComparator = new GenericPairComparator(this.recordBuildSideComparator, this.recordProbeSideComparator); - - this.memoryManager = new MemoryManager(MEMORY_SIZE, 1, PAGE_SIZE, MemoryType.HEAP, true); - this.ioManager = new IOManagerAsync(); - } - - @After - public void afterTest() { - if (this.ioManager != null) { - this.ioManager.shutdown(); - if (!this.ioManager.isProperlyShutDown()) { - Assert.fail("I/O manager failed to properly shut down."); - } - this.ioManager = null; - } - - if (this.memoryManager != null) { - Assert.assertTrue("Memory Leak: Not all memory has been returned to the memory manager.", - this.memoryManager.verifyEmpty()); - this.memoryManager.shutdown(); - this.memoryManager = null; - } - } - - - /** - * Test behavior with overflow buckets (Overflow buckets must be initialized correctly - * if the input is reopened again) - */ - @Test - public void testOverflow() { - - int buildSize = 1000; - int probeSize = 1000; - try { - TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 200, 1024, KeyMode.RANDOM, ValueMode.FIX_LENGTH); - TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); - - final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize); - final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize); - 
doTest(buildInput,probeInput, bgen, pgen); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - /** - * Verify proper operation if the build side is spilled to disk. - */ - @Test - public void testDoubleProbeSpilling() { - - int buildSize = 1000; - int probeSize = 1000; - try { - TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); - TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 1024, KeyMode.SORTED, ValueMode.FIX_LENGTH); - - final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize); - final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize); - doTest(buildInput,probeInput, bgen, pgen); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - /** - * This test case verifies that hybrid hash join is able to handle multiple probe phases - * when the build side fits completely into memory. 
- */ - @Test - public void testDoubleProbeInMemory() { - - int buildSize = 1000; - int probeSize = 1000; - try { - TestData.TupleGenerator bgen = new TestData.TupleGenerator(SEED1, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH); - TestData.TupleGenerator pgen = new TestData.TupleGenerator(SEED2, 0, 28, KeyMode.SORTED, ValueMode.FIX_LENGTH); - - final TestData.TupleGeneratorIterator buildInput = new TestData.TupleGeneratorIterator(bgen, buildSize); - final TestData.TupleGeneratorIterator probeInput = new TestData.TupleGeneratorIterator(pgen, probeSize); - - doTest(buildInput,probeInput, bgen, pgen); - } - catch (Exception e) { - e.printStackTrace(); - Assert.fail("An exception occurred during the test: " + e.getMessage()); - } - } - - private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TestData.TupleGenerator bgen, TestData.TupleGenerator pgen) throws Exception { + protected void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGeneratorIterator probeInput, TupleGenerator bgen, TupleGenerator pgen) throws Exception { // collect expected data final Map> expectedFirstMatchesMap = joinTuples(collectTupleData(buildInput), collectTupleData(probeInput)); @@ -260,257 +102,4 @@ private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGe iterator.close(); } - - // - // - // Tests taken from HahTableITCase! 
- // - // - - private MutableObjectIterator> getProbeInput(final int numKeys, - final int probeValsPerKey, final int repeatedValue1, final int repeatedValue2) { - MutableObjectIterator> probe1 = new UniformIntTupleGenerator(numKeys, probeValsPerKey, true); - MutableObjectIterator> probe2 = new TestData.ConstantIntIntTuplesIterator(repeatedValue1, 17, 5); - MutableObjectIterator> probe3 = new TestData.ConstantIntIntTuplesIterator(repeatedValue2, 23, 5); - List>> probes = new ArrayList<>(); - probes.add(probe1); - probes.add(probe2); - probes.add(probe3); - return new UnionIterator<>(probes); - } - - @Test - public void testSpillingHashJoinWithMassiveCollisions() throws IOException { - // the following two values are known to have a hash-code collision on the initial level. - // we use them to make sure one partition grows over-proportionally large - final int REPEATED_VALUE_1 = 40559; - final int REPEATED_VALUE_2 = 92882; - final int REPEATED_VALUE_COUNT_BUILD = 200000; - final int REPEATED_VALUE_COUNT_PROBE = 5; - - final int NUM_KEYS = 1000000; - final int BUILD_VALS_PER_KEY = 3; - final int PROBE_VALS_PER_KEY = 10; - - // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys - MutableObjectIterator> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false); - MutableObjectIterator> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD); - MutableObjectIterator> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD); - List>> builds = new ArrayList<>(); - builds.add(build1); - builds.add(build2); - builds.add(build3); - MutableObjectIterator> buildInput = new UnionIterator<>(builds); - - // allocate the memory for the HashTable - List memSegments; - try { - memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896); - } - catch (MemoryAllocationException maex) { - fail("Memory for the 
Join could not be provided."); - return; - } - - // create the map for validating the results - HashMap map = new HashMap(NUM_KEYS); - - // ---------------------------------------------------------------------------------------- - - final ReOpenableMutableHashTable, Tuple2> join = new ReOpenableMutableHashTable<>( - this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, - this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, - memSegments, ioManager, true); - - for(int probe = 0; probe < NUM_PROBES; probe++) { - // create a probe input that gives 10 million pairs with 10 values sharing a key - MutableObjectIterator> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2); - if(probe == 0) { - join.open(buildInput, probeInput); - } else { - join.reopenProbe(probeInput); - } - - Tuple2 record; - final Tuple2 recordReuse = new Tuple2<>(); - - while (join.nextRecord()) { - long numBuildValues = 0; - - final Tuple2 probeRec = join.getCurrentProbeRecord(); - Integer key = probeRec.f0; - - MutableObjectIterator> buildSide = join.getBuildSideIterator(); - if ((record = buildSide.next(recordReuse)) != null) { - numBuildValues = 1; - Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); - } - else { - fail("No build side values found for a probe key."); - } - while ((record = buildSide.next(record)) != null) { - numBuildValues++; - Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); - } - - Long contained = map.get(key); - if (contained == null) { - contained = numBuildValues; - } - else { - contained = contained + numBuildValues; - } - - map.put(key, contained); - } - } - - join.close(); - - Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size()); - for (Map.Entry entry : map.entrySet()) { - long val = entry.getValue(); - int key = entry.getKey(); - - if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) { - 
Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, - (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val); - } else { - Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, - PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val); - } - } - - - // ---------------------------------------------------------------------------------------- - - this.memoryManager.release(join.getFreedMemory()); - } - - /* - * This test is basically identical to the "testSpillingHashJoinWithMassiveCollisions" test, only that the number - * of repeated values (causing bucket collisions) are large enough to make sure that their target partition no longer - * fits into memory by itself and needs to be repartitioned in the recursion again. - */ - @Test - public void testSpillingHashJoinWithTwoRecursions() throws IOException - { - // the following two values are known to have a hash-code collision on the first recursion level. 
- // we use them to make sure one partition grows over-proportionally large - final int REPEATED_VALUE_1 = 40559; - final int REPEATED_VALUE_2 = 92882; - final int REPEATED_VALUE_COUNT_BUILD = 200000; - final int REPEATED_VALUE_COUNT_PROBE = 5; - - final int NUM_KEYS = 1000000; - final int BUILD_VALS_PER_KEY = 3; - final int PROBE_VALS_PER_KEY = 10; - - // create a build input that gives 3 million pairs with 3 values sharing the same key, plus 400k pairs with two colliding keys - MutableObjectIterator> build1 = new UniformIntTupleGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false); - MutableObjectIterator> build2 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_1, 17, REPEATED_VALUE_COUNT_BUILD); - MutableObjectIterator> build3 = new TestData.ConstantIntIntTuplesIterator(REPEATED_VALUE_2, 23, REPEATED_VALUE_COUNT_BUILD); - List>> builds = new ArrayList<>(); - builds.add(build1); - builds.add(build2); - builds.add(build3); - MutableObjectIterator> buildInput = new UnionIterator<>(builds); - - - // allocate the memory for the HashTable - List memSegments; - try { - memSegments = this.memoryManager.allocatePages(MEM_OWNER, 896); - } - catch (MemoryAllocationException maex) { - fail("Memory for the Join could not be provided."); - return; - } - - // create the map for validating the results - HashMap map = new HashMap(NUM_KEYS); - - // ---------------------------------------------------------------------------------------- - - final ReOpenableMutableHashTable, Tuple2> join = new ReOpenableMutableHashTable<>( - this.recordBuildSideAccesssor, this.recordProbeSideAccesssor, - this.recordBuildSideComparator, this.recordProbeSideComparator, this.pactRecordComparator, - memSegments, ioManager, true); - - for (int probe = 0; probe < NUM_PROBES; probe++) { - // create a probe input that gives 10 million pairs with 10 values sharing a key - MutableObjectIterator> probeInput = getProbeInput(NUM_KEYS, PROBE_VALS_PER_KEY, REPEATED_VALUE_1, REPEATED_VALUE_2); - if(probe == 0) 
{ - join.open(buildInput, probeInput); - } else { - join.reopenProbe(probeInput); - } - Tuple2 record; - final Tuple2 recordReuse = new Tuple2<>(); - - while (join.nextRecord()) - { - long numBuildValues = 0; - - final Tuple2 probeRec = join.getCurrentProbeRecord(); - Integer key = probeRec.f0; - - MutableObjectIterator> buildSide = join.getBuildSideIterator(); - if ((record = buildSide.next(recordReuse)) != null) { - numBuildValues = 1; - Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); - } - else { - fail("No build side values found for a probe key."); - } - while ((record = buildSide.next(recordReuse)) != null) { - numBuildValues++; - Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); - } - - Long contained = map.get(key); - if (contained == null) { - contained = numBuildValues; - } - else { - contained = contained + numBuildValues; - } - - map.put(key, contained); - } - } - - join.close(); - Assert.assertEquals("Wrong number of keys", NUM_KEYS, map.size()); - for (Map.Entry entry : map.entrySet()) { - long val = entry.getValue(); - int key = entry.getKey(); - - if( key == REPEATED_VALUE_1 || key == REPEATED_VALUE_2) { - Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, - (PROBE_VALS_PER_KEY + REPEATED_VALUE_COUNT_PROBE) * (BUILD_VALS_PER_KEY + REPEATED_VALUE_COUNT_BUILD) * NUM_PROBES, val); - } else { - Assert.assertEquals("Wrong number of values in per-key cross product for key " + key, - PROBE_VALS_PER_KEY * BUILD_VALS_PER_KEY * NUM_PROBES, val); - } - } - - - // ---------------------------------------------------------------------------------------- - - this.memoryManager.release(join.getFreedMemory()); - } - - - static Map> deepCopy(Map> expectedSecondMatchesMap) { - Map> copy = new HashMap<>(expectedSecondMatchesMap.size()); - for(Map.Entry> entry : expectedSecondMatchesMap.entrySet()) { - List matches = new 
ArrayList(entry.getValue().size()); - for(TupleMatch m : entry.getValue()) { - matches.add(m); - } - copy.put(entry.getKey(), matches); - } - return copy; - } }