From eaf540fd1c9c7395a7f278b9a4f25e42cf8d56b0 Mon Sep 17 00:00:00 2001
From: chengxiang li
Date: Wed, 27 Jan 2016 11:49:05 +0800
Subject: [PATCH] [FLINK-2871] Support outer join for hash on build side.

This commit adds support for outer joins whose outer side is the build
side of the hybrid hash table. It includes:
1. left outer join with REPARTITION_HASH_FIRST.
2. right outer join with REPARTITION_HASH_SECOND.
3. full outer join with REPARTITION_HASH_FIRST and REPARTITION_HASH_SECOND.

This closes #1469
---
 .../org/apache/flink/api/java/DataSet.java    |   4 +
 .../operator/FullOuterJoinOperatorTest.java   |   4 +-
 .../operator/LeftOuterJoinOperatorTest.java   |   2 +-
 .../operator/RightOuterJoinOperatorTest.java  |   2 +-
 .../flink/optimizer/costs/CostEstimator.java  |   4 +
 .../flink/optimizer/dag/OuterJoinNode.java    |  20 +-
 ...HashFullOuterJoinBuildFirstDescriptor.java |  65 +++
 ...ashFullOuterJoinBuildSecondDescriptor.java |  64 +++
 ...HashLeftOuterJoinBuildFirstDescriptor.java |  65 +++
 ...shRightOuterJoinBuildSecondDescriptor.java |  65 +++
 .../AbstractCachedBuildSideJoinDriver.java    |   4 +
 .../runtime/operators/DriverStrategy.java     |  16 +-
 .../operators/FullOuterJoinDriver.java        |  50 ++-
 .../flink/runtime/operators/JoinDriver.java   |   4 +
 .../operators/LeftOuterJoinDriver.java        |  28 +-
 .../operators/RightOuterJoinDriver.java       |  28 +-
 .../operators/hash/MutableHashTable.java      | 382 ++++++++++++++++--
 .../NonReusingBuildFirstHashJoinIterator.java |  46 ++-
 ...gBuildFirstReOpenableHashJoinIterator.java |   5 +-
 ...NonReusingBuildSecondHashJoinIterator.java |  49 ++-
 ...BuildSecondReOpenableHashJoinIterator.java |   5 +-
 .../hash/ReOpenableMutableHashTable.java      |   4 +-
 .../ReusingBuildFirstHashJoinIterator.java    |  58 ++-
 ...gBuildFirstReOpenableHashJoinIterator.java |   5 +-
 .../ReusingBuildSecondHashJoinIterator.java   |  58 ++-
 ...BuildSecondReOpenableHashJoinIterator.java |   5 +-
 .../flink/runtime/operators/util/BitSet.java  | 108 +++++
 .../operators/hash/HashTableITCase.java       | 142 ++++++-
 .../runtime/operators/hash/HashTableTest.java |   4 +-
 .../NonReusingHashJoinIteratorITCase.java     | 303 ++++++++++++--
 .../NonReusingReOpenableHashTableITCase.java  |  11 +-
 .../hash/ReusingHashJoinIteratorITCase.java   | 250 ++++++++++--
 .../ReusingReOpenableHashTableITCase.java     |   7 +-
 .../runtime/operators/util/BitSetTest.java    |  86 ++++
 .../util/HashVsSortMiniBenchmark.java         |   8 +-
 .../javaApiOperators/OuterJoinITCase.java     |  44 +-
 .../HashTableRecordWidthCombinations.java     |   2 +-
 37 files changed, 1754 insertions(+), 253 deletions(-)
 create mode 100644 flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java
 create mode 100644 flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java
 create mode 100644 flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java
 create mode 100644 flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java
 create mode 100644 flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java
 create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java

diff --git a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
index 56331858d66e7..6db32c5ad5e4a 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/DataSet.java
@@ -841,6 +841,7 @@ public JoinOperatorSetsBase
leftOuterJoin(DataSet other, JoinHint s switch(strategy) { case OPTIMIZER_CHOOSES: case REPARTITION_SORT_MERGE: + case REPARTITION_HASH_FIRST: case REPARTITION_HASH_SECOND: case BROADCAST_HASH_SECOND: return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER); @@ -891,6 +892,7 @@ public JoinOperatorSetsBase rightOuterJoin(DataSet other, JoinHint case OPTIMIZER_CHOOSES: case REPARTITION_SORT_MERGE: case REPARTITION_HASH_FIRST: + case REPARTITION_HASH_SECOND: case BROADCAST_HASH_FIRST: return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER); default: @@ -938,6 +940,8 @@ public JoinOperatorSetsBase fullOuterJoin(DataSet other, JoinHint s switch(strategy) { case OPTIMIZER_CHOOSES: case REPARTITION_SORT_MERGE: + case REPARTITION_HASH_FIRST: + case REPARTITION_HASH_SECOND: return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER); default: throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: "+strategy); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java index 794f0d405e41c..9f2aa41970fb5 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/FullOuterJoinOperatorTest.java @@ -181,7 +181,7 @@ public void testFullOuterStrategy2() { this.testFullOuterStrategies(JoinHint.REPARTITION_SORT_MERGE); } - @Test(expected = InvalidProgramException.class) + @Test public void testFullOuterStrategy3() { this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_SECOND); } @@ -191,7 +191,7 @@ public void testFullOuterStrategy4() { this.testFullOuterStrategies(JoinHint.BROADCAST_HASH_SECOND); } - @Test(expected = InvalidProgramException.class) + @Test public void testFullOuterStrategy5() { this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_FIRST); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java index cab06c2dee33d..bfcc3e874d245 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/LeftOuterJoinOperatorTest.java @@ -192,7 +192,7 @@ public void testLeftOuterStrategy4() { this.testLeftOuterStrategies(JoinHint.BROADCAST_HASH_SECOND); } - @Test(expected = InvalidProgramException.class) + @Test public void testLeftOuterStrategy5() { this.testLeftOuterStrategies(JoinHint.REPARTITION_HASH_FIRST); } diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java index 411edd5d71118..709d830e5ff8b 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/RightOuterJoinOperatorTest.java @@ -181,7 +181,7 @@ public void testRightOuterStrategy2() { this.testRightOuterStrategies(JoinHint.REPARTITION_SORT_MERGE); } - @Test(expected = InvalidProgramException.class) + @Test public void testRightOuterStrategy3() { this.testRightOuterStrategies(JoinHint.REPARTITION_HASH_SECOND); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java index 78d61d1e586a4..9cc5e48b9c58e 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/costs/CostEstimator.java @@ -202,10 +202,14 @@ public void costOperator(PlanNode n) { break; case HYBRIDHASH_BUILD_FIRST: case RIGHT_HYBRIDHASH_BUILD_FIRST: + case LEFT_HYBRIDHASH_BUILD_FIRST: + case FULL_OUTER_HYBRIDHASH_BUILD_FIRST: addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight); break; case HYBRIDHASH_BUILD_SECOND: case LEFT_HYBRIDHASH_BUILD_SECOND: + case RIGHT_HYBRIDHASH_BUILD_SECOND: + case FULL_OUTER_HYBRIDHASH_BUILD_SECOND: addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight); break; case HYBRIDHASH_BUILD_FIRST_CACHED: diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java index 0784de3b82c61..7c5c849cd08c7 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/OuterJoinNode.java @@ -25,8 +25,12 @@ import org.apache.flink.optimizer.CompilerException; import org.apache.flink.optimizer.DataStatistics; import org.apache.flink.optimizer.operators.AbstractJoinDescriptor; +import org.apache.flink.optimizer.operators.HashFullOuterJoinBuildFirstDescriptor; +import org.apache.flink.optimizer.operators.HashFullOuterJoinBuildSecondDescriptor; +import org.apache.flink.optimizer.operators.HashLeftOuterJoinBuildFirstDescriptor; import org.apache.flink.optimizer.operators.HashLeftOuterJoinBuildSecondDescriptor; import org.apache.flink.optimizer.operators.HashRightOuterJoinBuildFirstDescriptor; +import org.apache.flink.optimizer.operators.HashRightOuterJoinBuildSecondDescriptor; import org.apache.flink.optimizer.operators.OperatorDescriptorDual; import org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor; import org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor; @@ -99,8 +103,10 @@ private List createLeftOuterJoinDescriptors(JoinHint hin case BROADCAST_HASH_SECOND: list.add(new HashLeftOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, true, false)); break; - case BROADCAST_HASH_FIRST: case REPARTITION_HASH_FIRST: + list.add(new HashLeftOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, false, true)); + break; + case BROADCAST_HASH_FIRST: default: throw new CompilerException("Invalid join hint: " + hint + " for left outer join"); } @@ -124,8 +130,10 @@ private List createRightOuterJoinDescriptors(JoinHint hi case BROADCAST_HASH_FIRST: list.add(new HashRightOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, true, false)); break; - case BROADCAST_HASH_SECOND: case REPARTITION_HASH_SECOND: + list.add(new HashRightOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, false, true)); + break; + case BROADCAST_HASH_SECOND: default: throw new CompilerException("Invalid join hint: " + hint + " for right outer join"); } @@ -142,10 +150,14 @@ private List createFullOuterJoinDescriptors(JoinHint hin case REPARTITION_SORT_MERGE: list.add(new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2)); break; + case REPARTITION_HASH_FIRST: + list.add(new HashFullOuterJoinBuildFirstDescriptor(this.keys1, this.keys2)); + break; case REPARTITION_HASH_SECOND: - case BROADCAST_HASH_SECOND: + list.add(new 
HashFullOuterJoinBuildSecondDescriptor(this.keys1, this.keys2)); + break; case BROADCAST_HASH_FIRST: - case REPARTITION_HASH_FIRST: + case BROADCAST_HASH_SECOND: default: throw new CompilerException("Invalid join hint: " + hint + " for full outer join"); } diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java new file mode 100644 index 0000000000000..49852484b617a --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildFirstDescriptor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.optimizer.operators; + +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.dag.TwoInputNode; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +import java.util.Collections; +import java.util.List; + +public class HashFullOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor { + + public HashFullOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2) { + super(keys1, keys2, false, false, true); + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.FULL_OUTER_HYBRIDHASH_BUILD_FIRST; + } + + @Override + protected List createPossibleLocalProperties() { + // all properties are possible + return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties())); + } + + @Override + public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, + LocalProperties produced1, LocalProperties produced2) { + return true; + } + + @Override + public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { + + String nodeName = "FullOuterJoin("+node.getOperator().getName()+")"; + return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { + return new LocalProperties(); + } +} diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java new file mode 100644 index 0000000000000..d605a19179af4 --- /dev/null +++ 
b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashFullOuterJoinBuildSecondDescriptor.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.optimizer.operators; + +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.dag.TwoInputNode; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +import java.util.Collections; +import java.util.List; + +public class HashFullOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor { + public HashFullOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2) { + super(keys1, keys2, false, false, true); + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.FULL_OUTER_HYBRIDHASH_BUILD_SECOND; + } + + @Override + protected List createPossibleLocalProperties() { + // all properties are possible + return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties())); + } + + @Override + public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, + LocalProperties produced1, LocalProperties produced2) { + return true; + } + + @Override + public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { + + String nodeName = "FullOuterJoin("+node.getOperator().getName()+")"; + return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { + return new LocalProperties(); + } +} diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java new file mode 100644 index 0000000000000..ab4e106591183 --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashLeftOuterJoinBuildFirstDescriptor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.optimizer.operators; + +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.dag.TwoInputNode; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +import java.util.Collections; +import java.util.List; + +public class HashLeftOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor{ + public HashLeftOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2, + boolean broadcastSecondAllowed, boolean repartitionAllowed) { + super(keys1, keys2, false, broadcastSecondAllowed, repartitionAllowed); + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.LEFT_HYBRIDHASH_BUILD_FIRST; + } + + @Override + protected List createPossibleLocalProperties() { + // all properties are possible + return Collections.singletonList(new OperatorDescriptorDual.LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties())); + } + + @Override + public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, + LocalProperties produced1, LocalProperties produced2) { + return true; + } + + @Override + public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { + + String nodeName = "LeftOuterJoin("+node.getOperator().getName()+")"; + return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { + return new LocalProperties(); + } +} diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java new file mode 100644 index 0000000000000..7bb8f1e972e8c --- /dev/null +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/HashRightOuterJoinBuildSecondDescriptor.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.flink.optimizer.operators; + +import org.apache.flink.api.common.operators.util.FieldList; +import org.apache.flink.optimizer.dag.TwoInputNode; +import org.apache.flink.optimizer.dataproperties.LocalProperties; +import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties; +import org.apache.flink.optimizer.plan.Channel; +import org.apache.flink.optimizer.plan.DualInputPlanNode; +import org.apache.flink.runtime.operators.DriverStrategy; + +import java.util.Collections; +import java.util.List; + +public class HashRightOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor { + public HashRightOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2, + boolean broadcastFirstAllowed, boolean repartitionAllowed) { + super(keys1, keys2, broadcastFirstAllowed, false, repartitionAllowed); + } + + @Override + public DriverStrategy getStrategy() { + return DriverStrategy.RIGHT_HYBRIDHASH_BUILD_SECOND; + } + + @Override + protected List createPossibleLocalProperties() { + // all properties are possible + return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties())); + } + + @Override + public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2, + LocalProperties produced1, LocalProperties produced2) { + return true; + } + + @Override + public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) { + + String nodeName = "RightOuterJoin("+node.getOperator().getName()+")"; + return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2); + } + + @Override + public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) { + return new LocalProperties(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java index 9ac2ed6251956..e034dd6504c24 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AbstractCachedBuildSideJoinDriver.java @@ -95,6 +95,7 @@ public void initialize() throws Exception { this.taskContext.getOwningNepheleTask(), availableMemory, false, + false, hashJoinUseBitMaps); @@ -110,6 +111,7 @@ public void initialize() throws Exception { this.taskContext.getOwningNepheleTask(), availableMemory, false, + false, hashJoinUseBitMaps); } else { @@ -128,6 +130,7 @@ public void initialize() throws Exception { this.taskContext.getOwningNepheleTask(), availableMemory, false, + false, hashJoinUseBitMaps); @@ -143,6 +146,7 @@ public void initialize() throws Exception { this.taskContext.getOwningNepheleTask(), availableMemory, false, + false, hashJoinUseBitMaps); } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java index d01594eef9c36..e43350bdb6458 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/DriverStrategy.java @@ -88,12 +88,20 @@ public enum DriverStrategy { HYBRIDHASH_BUILD_FIRST_CACHED(BuildFirstCachedJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2), // cached variant of HYBRIDHASH_BUILD_SECOND, that can only be used 
inside of iterations
 	HYBRIDHASH_BUILD_SECOND_CACHED(BuildSecondCachedJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
-	
-	// right outer join, the first input is build side, the second side is probe side of a hybrid hash table
+
+	// right outer join, the first input is build side, the second input is probe side of a hybrid hash table.
 	RIGHT_HYBRIDHASH_BUILD_FIRST(RightOuterJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
-	// left outer join, the second input is build side, the first side is probe side of a hybrid hash table
+	// right outer join, the first input is probe side, the second input is build side of a hybrid hash table.
+	RIGHT_HYBRIDHASH_BUILD_SECOND(RightOuterJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
+	// left outer join, the first input is build side, the second input is probe side of a hybrid hash table.
+	LEFT_HYBRIDHASH_BUILD_FIRST(LeftOuterJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
+	// left outer join, the first input is probe side, the second input is build side of a hybrid hash table.
 	LEFT_HYBRIDHASH_BUILD_SECOND(LeftOuterJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
-	
+	// full outer join, the first input is build side, the second input is the probe side of a hybrid hash table.
+	FULL_OUTER_HYBRIDHASH_BUILD_FIRST(FullOuterJoinDriver.class, null, FULL_DAM, MATERIALIZING, 2),
+	// full outer join, the first input is probe side, the second input is the build side of a hybrid hash table.
+	FULL_OUTER_HYBRIDHASH_BUILD_SECOND(FullOuterJoinDriver.class, null, MATERIALIZING, FULL_DAM, 2),
+
 	// the second input is inner loop, the first input is outer loop and block-wise processed
 	NESTEDLOOP_BLOCKED_OUTER_FIRST(CrossDriver.class, null, MATERIALIZING, FULL_DAM, 0),
 	// the first input is inner loop, the second input is outer loop and block-wise processed
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
index 2c01fec2e294f..a41a6ec6e664f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/FullOuterJoinDriver.java
@@ -24,6 +24,10 @@
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.memory.MemoryManager;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator;
+import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator;
 import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator;
 import org.apache.flink.runtime.operators.util.JoinTaskIterator;
@@ -66,6 +70,28 @@ protected JoinTaskIterator getReusingOuterJoinIterator(
 				numPages,
 				super.taskContext.getOwningNepheleTask()
 			);
+		case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
+			return new ReusingBuildFirstHashJoinIterator<>(in1, in2,
+					serializer1, comparator1,
+					serializer2, comparator2,
+					pairComparatorFactory.createComparator21(comparator1, comparator2),
+					memoryManager, ioManager,
+					this.taskContext.getOwningNepheleTask(),
+					driverMemFraction,
+					true,
+					true,
+					false);
+		case FULL_OUTER_HYBRIDHASH_BUILD_SECOND:
+			return new
ReusingBuildSecondHashJoinIterator<>(in1, in2, + serializer1, comparator1, + serializer2, comparator2, + pairComparatorFactory.createComparator12(comparator1, comparator2), + memoryManager, ioManager, + this.taskContext.getOwningNepheleTask(), + driverMemFraction, + true, + true, + false); default: throw new Exception("Unsupported driver strategy for full outer join driver: " + driverStrategy.name()); } @@ -102,8 +128,30 @@ protected JoinTaskIterator getNonReusingOuterJoinIterator( numPages, super.taskContext.getOwningNepheleTask() ); + case FULL_OUTER_HYBRIDHASH_BUILD_FIRST: + return new NonReusingBuildFirstHashJoinIterator<>(in1, in2, + serializer1, comparator1, + serializer2, comparator2, + pairComparatorFactory.createComparator21(comparator1, comparator2), + memoryManager, ioManager, + this.taskContext.getOwningNepheleTask(), + driverMemFraction, + true, + true, + false); + case FULL_OUTER_HYBRIDHASH_BUILD_SECOND: + return new NonReusingBuildSecondHashJoinIterator<>(in1, in2, + serializer1, comparator1, + serializer2, comparator2, + pairComparatorFactory.createComparator12(comparator1, comparator2), + memoryManager, ioManager, + this.taskContext.getOwningNepheleTask(), + driverMemFraction, + true, + true, + false); default: throw new Exception("Unsupported driver strategy for full outer join driver: " + driverStrategy.name()); } } -} \ No newline at end of file +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java index c55843a619aaa..f7ad8d1e3f9e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/JoinDriver.java @@ -141,6 +141,7 @@ public void prepare() throws Exception{ this.taskContext.getOwningNepheleTask(), fractionAvailableMemory, false, + false, hashJoinUseBitMaps); break; case HYBRIDHASH_BUILD_SECOND: @@ -152,6 +153,7 @@ public void prepare() throws Exception{ this.taskContext.getOwningNepheleTask(), fractionAvailableMemory, false, + false, hashJoinUseBitMaps); break; default: @@ -176,6 +178,7 @@ public void prepare() throws Exception{ this.taskContext.getOwningNepheleTask(), fractionAvailableMemory, false, + false, hashJoinUseBitMaps); break; case HYBRIDHASH_BUILD_SECOND: @@ -187,6 +190,7 @@ public void prepare() throws Exception{ this.taskContext.getOwningNepheleTask(), fractionAvailableMemory, false, + false, hashJoinUseBitMaps); break; default: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java index 49d3648a7b10a..41bb54d852653 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/LeftOuterJoinDriver.java @@ -24,7 +24,9 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.MemoryManager; +import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator; import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator; +import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator; import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator; import 
org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator; import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator; @@ -68,6 +70,17 @@ protected JoinTaskIterator getReusingOuterJoinIterator( numPages, super.taskContext.getOwningNepheleTask() ); + case LEFT_HYBRIDHASH_BUILD_FIRST: + return new ReusingBuildFirstHashJoinIterator<>(in1, in2, + serializer1, comparator1, + serializer2, comparator2, + pairComparatorFactory.createComparator21(comparator1, comparator2), + memoryManager, ioManager, + this.taskContext.getOwningNepheleTask(), + driverMemFraction, + false, + true, + false); case LEFT_HYBRIDHASH_BUILD_SECOND: return new ReusingBuildSecondHashJoinIterator<>(in1, in2, serializer1, comparator1, @@ -77,6 +90,7 @@ protected JoinTaskIterator getReusingOuterJoinIterator( this.taskContext.getOwningNepheleTask(), driverMemFraction, true, + false, false); default: throw new Exception("Unsupported driver strategy for left outer join driver: " + driverStrategy.name()); @@ -114,6 +128,17 @@ protected JoinTaskIterator getNonReusingOuterJoinIterator( numPages, super.taskContext.getOwningNepheleTask() ); + case LEFT_HYBRIDHASH_BUILD_FIRST: + return new NonReusingBuildFirstHashJoinIterator<>(in1, in2, + serializer1, comparator1, + serializer2, comparator2, + pairComparatorFactory.createComparator21(comparator1, comparator2), + memoryManager, ioManager, + this.taskContext.getOwningNepheleTask(), + driverMemFraction, + false, + true, + false); case LEFT_HYBRIDHASH_BUILD_SECOND: return new NonReusingBuildSecondHashJoinIterator<>(in1, in2, serializer1, comparator1, @@ -123,9 +148,10 @@ protected JoinTaskIterator getNonReusingOuterJoinIterator( this.taskContext.getOwningNepheleTask(), driverMemFraction, true, + false, false); default: throw new Exception("Unsupported driver strategy for left outer join driver: " + driverStrategy.name()); } } -} \ No newline at end of file +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java index 1b67397c7c6cb..96f65b4d332fa 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/RightOuterJoinDriver.java @@ -25,7 +25,9 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.operators.hash.NonReusingBuildFirstHashJoinIterator; +import org.apache.flink.runtime.operators.hash.NonReusingBuildSecondHashJoinIterator; import org.apache.flink.runtime.operators.hash.ReusingBuildFirstHashJoinIterator; +import org.apache.flink.runtime.operators.hash.ReusingBuildSecondHashJoinIterator; import org.apache.flink.runtime.operators.sort.NonReusingMergeOuterJoinIterator; import org.apache.flink.runtime.operators.sort.ReusingMergeOuterJoinIterator; import org.apache.flink.runtime.operators.util.JoinTaskIterator; @@ -77,6 +79,18 @@ protected JoinTaskIterator getReusingOuterJoinIterator( this.taskContext.getOwningNepheleTask(), driverMemFraction, true, + false, + false); + case RIGHT_HYBRIDHASH_BUILD_SECOND: + return new ReusingBuildSecondHashJoinIterator<>(in1, in2, + serializer1, comparator1, + serializer2, comparator2, + pairComparatorFactory.createComparator12(comparator1, comparator2), + memoryManager, ioManager, + this.taskContext.getOwningNepheleTask(), + driverMemFraction, + false, + true, false); default: 
throw new Exception("Unsupported driver strategy for right outer join driver: " + driverStrategy.name()); @@ -123,9 +137,21 @@ protected JoinTaskIterator getNonReusingOuterJoinIterator( this.taskContext.getOwningNepheleTask(), driverMemFraction, true, + false, + false); + case RIGHT_HYBRIDHASH_BUILD_SECOND: + return new NonReusingBuildSecondHashJoinIterator<>(in1, in2, + serializer1, comparator1, + serializer2, comparator2, + pairComparatorFactory.createComparator12(comparator1, comparator2), + memoryManager, ioManager, + this.taskContext.getOwningNepheleTask(), + driverMemFraction, + false, + true, false); default: throw new Exception("Unsupported driver strategy for right outer join driver: " + driverStrategy.name()); } } -} \ No newline at end of file +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java index 0bf4433b49049..ce4e8271fc5dc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/MutableHashTable.java @@ -24,6 +24,7 @@ import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.flink.runtime.operators.util.BitSet; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -57,7 +58,7 @@ *
  * +----------------------------- Bucket x ----------------------------
  * |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) | reserved (4 bytes) |
+ * | next-bucket-in-chain-pointer (8 bytes) | probedFlags (2 bytes) | reserved (2 bytes) |
  * |
  * |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
  * | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
@@ -67,7 +68,7 @@
  * |
  * +---------------------------- Bucket x + 1--------------------------
  * |Partition (1 byte) | Status (1 byte) | element count (2 bytes) |
- * | next-bucket-in-chain-pointer (8 bytes) | reserved (4 bytes) |
+ * | next-bucket-in-chain-pointer (8 bytes) | probedFlags (2 bytes) | reserved (2 bytes) |
  * |
  * |hashCode 1 (4 bytes) | hashCode 2 (4 bytes) | hashCode 3 (4 bytes) |
  * | ... hashCode n-1 (4 bytes) | hashCode n (4 bytes)
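[For reference: the diagram above pins the bucket header at 16 bytes, followed by the hash code area and then the pointer area. The following is a minimal offset sketch, not part of the patch; it assumes Flink's 128-byte hash buckets, from which the capacity of 9 entries per bucket (also cited in the probedFlags comment further down) follows.]

final class BucketLayoutSketch {
	// bucket header, 16 bytes in total
	static final int HEADER_PARTITION_OFFSET = 0;     // partition (1 byte)
	static final int HEADER_STATUS_OFFSET = 1;        // status (1 byte)
	static final int HEADER_COUNT_OFFSET = 2;         // element count (2 bytes)
	static final int HEADER_FORWARD_OFFSET = 4;       // next-bucket-in-chain pointer (8 bytes)
	static final int HEADER_PROBED_FLAGS_OFFSET = 12; // probedFlags (2 bytes), then 2 reserved bytes
	static final int BUCKET_HEADER_LENGTH = 16;

	static final int NUM_ENTRIES_PER_BUCKET = 9;      // 16 + 9 * (4 + 8) = 124 <= 128 bytes
	static final int HASH_CODE_LEN = 4;
	static final int POINTER_LEN = 8;

	/** Offset of the i-th hash code in a bucket (hash codes directly follow the header). */
	static int hashCodeOffset(int bucketOffset, int i) {
		return bucketOffset + BUCKET_HEADER_LENGTH + i * HASH_CODE_LEN;
	}

	/** Offset of the i-th record pointer (pointers follow the hash code area). */
	static int pointerOffset(int bucketOffset, int i) {
		return bucketOffset + BUCKET_HEADER_LENGTH + NUM_ENTRIES_PER_BUCKET * HASH_CODE_LEN + i * POINTER_LEN;
	}
}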
@@ -163,7 +164,12 @@ public class MutableHashTable implements MemorySegmentSource {
 	 * Offset of the field in the bucket header that holds the forward pointer to its
 	 * first overflow bucket.
 	 */
-	private static final int HEADER_FORWARD_OFFSET = 4;	
+	private static final int HEADER_FORWARD_OFFSET = 4;
+	
+	/**
+	 * Offset of the field in the bucket header that holds the probed bit set.
+	 */
+	static final int HEADER_PROBED_FLAGS_OFFSET = 12;
 	
 	/**
 	 * Constant for the forward pointer, indicating that the pointer is not set. 
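[The initTable() and insertBucketEntry() changes later in this patch both zero the new probedFlags field whenever a bucket or overflow bucket is created. Condensed into one hedged sketch; the helper and its arguments are illustrative, and the inlined not-set marker stands in for BUCKET_FORWARD_POINTER_NOT_SET.]

import org.apache.flink.core.memory.MemorySegment;

final class BucketHeaderInitSketch {
	/** Writes a fresh in-memory bucket header, following the offsets used by this patch. */
	static void initBucketHeader(MemorySegment seg, int bucketOffset, byte partition, byte statusInMemory) {
		seg.put(bucketOffset + 0, partition);       // HEADER_PARTITION_OFFSET
		seg.put(bucketOffset + 1, statusInMemory);  // HEADER_STATUS_OFFSET
		seg.putShort(bucketOffset + 2, (short) 0);  // HEADER_COUNT_OFFSET: no elements yet
		seg.putLong(bucketOffset + 4, ~0x0L);       // HEADER_FORWARD_OFFSET: overflow pointer not set
		seg.putShort(bucketOffset + 12, (short) 0); // HEADER_PROBED_FLAGS_OFFSET: nothing probed yet
	}
}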
@@ -301,7 +307,8 @@ public class MutableHashTable implements MemorySegmentSource {
 	protected MemorySegment[] buckets;
 
 	/** The bloom filter utility used to transform hash buckets of spilled partitions into a
-	 * probabilistic filter */
+	 * probabilistic filter
+	 */
 	private BloomFilter bloomFilter;
 	
 	/**
@@ -333,11 +340,24 @@ public class MutableHashTable implements MemorySegmentSource {
 	 * If true, build side partitions are kept for multiple probe steps.
 	 */
 	protected boolean keepBuildSidePartitions;
+
+	/**
+	 * BitSet used to mark whether an element (in the build side) has been successfully matched during
+	 * the probe phase. As there are at most 9 elements in each bucket, we assign 2 bytes to the BitSet.
+	 */
+	private final BitSet probedSet = new BitSet(2);
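[For illustration: the iterators later in this patch share this single BitSet instance by re-pointing it at each visited bucket's probedFlags field before reading or setting bits. A sketch under that assumption; the helper class and its parameters are invented for the example, while BitSet is the new flink-runtime utility this commit adds.]

import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.operators.util.BitSet;

final class ProbedFlagsSketch {
	static final int HEADER_PROBED_FLAGS_OFFSET = 12;

	/** Marks the entry at entryIndex of the given bucket as matched, as the probe phase does. */
	static void markProbed(BitSet probedSet, MemorySegment bucket, int bucketInSegmentOffset, int entryIndex) {
		// point the shared BitSet at this bucket's 2-byte probedFlags field
		probedSet.setMemorySegment(bucket, bucketInSegmentOffset + HEADER_PROBED_FLAGS_OFFSET);
		probedSet.set(entryIndex);
	}

	/** True if the entry matched at least once; the unmatched-build scan skips such entries. */
	static boolean wasProbed(BitSet probedSet, MemorySegment bucket, int bucketInSegmentOffset, int entryIndex) {
		probedSet.setMemorySegment(bucket, bucketInSegmentOffset + HEADER_PROBED_FLAGS_OFFSET);
		return probedSet.get(entryIndex);
	}
}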
 	
 	protected boolean furtherPartitioning;
 	
 	private boolean running = true;
 	
+	private boolean buildSideOuterJoin = false;
+	
+	private MutableObjectIterator unmatchedBuildIterator;
+	
+	private boolean probeMatchedPhase = true;
+	
+	private boolean unmatchedBuildVisited = false;
 	
 	// ------------------------------------------------------------------------
 	//                         Construction and Teardown
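[Taken together, the fields above drive a three-phase flow when the build side is outer. The nextRecord() hunk later in this patch reduces to the following; a reformatted excerpt with added comments, not new logic.]

	public boolean nextRecord() throws IOException {
		if (buildSideOuterJoin) {
			// 1) join the current probe record, 2) then sweep each in-memory bucket once for
			// build-side records whose probed flag was never set, 3) then move on to the
			// next (possibly spilled) partition.
			return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition();
		} else {
			return processProbeIter() || prepareNextPartition();
		}
	}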
@@ -422,13 +442,32 @@ public MutableHashTable(TypeSerializer buildSideSerializer, TypeSerializer

buildSide, final MutableObjectIterator probeSide) - throws IOException - { + throws IOException { + + open(buildSide, probeSide, false); + } + + /** + * Opens the hash join. This method reads the build-side input and constructs the initial + * hash table, gradually spilling partitions that do not fit into memory. + * + * @param buildSide Build side input. + * @param probeSide Probe side input. + * @param buildOuterJoin Whether outer join on build side. + * @throws IOException Thrown, if an I/O problem occurs while spilling a partition. + */ + public void open(final MutableObjectIterator buildSide, final MutableObjectIterator probeSide, + boolean buildOuterJoin) throws IOException { + + this.buildSideOuterJoin = buildOuterJoin; + // sanity checks if (!this.closed.compareAndSet(true, false)) { throw new IllegalStateException("Hash Join cannot be opened, because it is currently not closed."); @@ -446,12 +485,16 @@ public void open(final MutableObjectIterator buildSide, final MutableObjectI this.probeIterator = new ProbeIterator(probeSide, this.probeSideSerializer.createInstance()); // the bucket iterator can remain constant over the time - this.bucketIterator = new HashBucketIterator(this.buildSideSerializer, this.recordComparator); + this.bucketIterator = new HashBucketIterator(this.buildSideSerializer, this.recordComparator, probedSet, buildOuterJoin); } protected boolean processProbeIter() throws IOException{ final ProbeIterator probeIter = this.probeIterator; final TypeComparator probeAccessors = this.probeSideComparator; + + if (!this.probeMatchedPhase) { + return false; + } PT next; while ((next = probeIter.next()) != null) { @@ -486,11 +529,39 @@ protected boolean processProbeIter() throws IOException{ } } // -------------- partition done --------------- - + return false; } + protected boolean processUnmatchedBuildIter() throws IOException { + if (this.unmatchedBuildVisited) { + return false; + } + + this.probeMatchedPhase = false; + UnmatchedBuildIterator unmatchedBuildIter = new UnmatchedBuildIterator<>(this.buildSideSerializer, this.numBuckets, + this.bucketsPerSegmentBits, this.bucketsPerSegmentMask, this.buckets, this.partitionsBeingBuilt, probedSet); + this.unmatchedBuildIterator = unmatchedBuildIter; + + // There maybe none unmatched build element, so we add a verification here to make sure we do not return (null, null) to user. + if (unmatchedBuildIter.next() == null) { + this.unmatchedBuildVisited = true; + return false; + } + + unmatchedBuildIter.back(); + + // While visit the unmatched build elements, the probe element is null, and the unmatchedBuildIterator + // would iterate all the unmatched build elements, so we return false during the second calling of this method. 
+ this.unmatchedBuildVisited = true; + return true; + } + protected boolean prepareNextPartition() throws IOException { + + this.probeMatchedPhase = true; + this.unmatchedBuildVisited = false; + // finalize and cleanup the partitions of the current table int buffersAvailable = 0; for (int i = 0; i < this.partitionsBeingBuilt.size(); i++) { @@ -551,9 +622,11 @@ protected boolean prepareNextPartition() throws IOException { } public boolean nextRecord() throws IOException { - - final boolean probeProcessing = processProbeIter(); - return probeProcessing || prepareNextPartition(); + if (buildSideOuterJoin) { + return processProbeIter() || processUnmatchedBuildIter() || prepareNextPartition(); + } else { + return processProbeIter() || prepareNextPartition(); + } } public HashBucketIterator getMatchesFor(PT record) throws IOException { @@ -582,11 +655,19 @@ public HashBucketIterator getMatchesFor(PT record) throws IOException { } public PT getCurrentProbeRecord() { - return this.probeIterator.getCurrent(); + if (this.probeMatchedPhase) { + return this.probeIterator.getCurrent(); + } else { + return null; + } } - public HashBucketIterator getBuildSideIterator() { - return this.bucketIterator; + public MutableObjectIterator getBuildSideIterator() { + if (this.probeMatchedPhase) { + return this.bucketIterator; + } else { + return this.unmatchedBuildIterator; + } } /** @@ -976,7 +1057,10 @@ final void insertBucketEntry(final HashPartition p, final MemorySegment overflowSeg.putLong(overflowBucketOffset + BUCKET_POINTER_START_OFFSET, pointer); // pointer // set the count to one - overflowSeg.putShort(overflowBucketOffset + HEADER_COUNT_OFFSET, (short) 1); + overflowSeg.putShort(overflowBucketOffset + HEADER_COUNT_OFFSET, (short) 1); + + // initiate the probed bitset to 0. 
+ overflowSeg.putShort(overflowBucketOffset + HEADER_PROBED_FLAGS_OFFSET, (short) 0); } } @@ -1049,6 +1133,7 @@ protected void initTable(int numBuckets, byte numPartitions) { seg.put(bucketOffset + HEADER_STATUS_OFFSET, BUCKET_STATUS_IN_MEMORY); seg.putShort(bucketOffset + HEADER_COUNT_OFFSET, (short) 0); seg.putLong(bucketOffset + HEADER_FORWARD_OFFSET, BUCKET_FORWARD_POINTER_NOT_SET); + seg.putShort(bucketOffset + HEADER_PROBED_FLAGS_OFFSET, (short) 0); } table[i] = seg; @@ -1417,14 +1502,19 @@ public static class HashBucketIterator implements MutableObjectIterator< private MemorySegment originalBucket; private long lastPointer; + + private BitSet probedSet; - - HashBucketIterator(TypeSerializer accessor, TypePairComparator comparator) { + private boolean isBuildOuterJoin = false; + + HashBucketIterator(TypeSerializer accessor, TypePairComparator comparator, + BitSet probedSet, boolean isBuildOuterJoin) { this.accessor = accessor; this.comparator = comparator; + this.probedSet = probedSet; + this.isBuildOuterJoin = isBuildOuterJoin; } - void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition partition, int searchHashCode, int bucketInSegmentOffset) { @@ -1435,26 +1525,24 @@ void set(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition implements MutableObjectIterator { + + private final TypeSerializer accessor; + + private final long totalBucketNumber; + + private final int bucketsPerSegmentBits; + + private final int bucketsPerSegmentMask; + + private final MemorySegment[] buckets; + + private final ArrayList> partitionsBeingBuilt; + + private final BitSet probedSet; + + private MemorySegment bucketSegment; + + private MemorySegment[] overflowSegments; + + private HashPartition partition; + + private int scanCount; + + private int bucketInSegmentOffset; + + private int countInSegment; + + private int numInSegment; + + UnmatchedBuildIterator( + TypeSerializer accessor, + long totalBucketNumber, + int bucketsPerSegmentBits, + int bucketsPerSegmentMask, + MemorySegment[] buckets, + ArrayList> partitionsBeingBuilt, + BitSet probedSet) { + + this.accessor = accessor; + this.totalBucketNumber = totalBucketNumber; + this.bucketsPerSegmentBits = bucketsPerSegmentBits; + this.bucketsPerSegmentMask = bucketsPerSegmentMask; + this.buckets = buckets; + this.partitionsBeingBuilt = partitionsBeingBuilt; + this.probedSet = probedSet; + init(); + } + + private void init() { + scanCount = -1; + while (!moveToNextBucket()) { + if (scanCount >= totalBucketNumber) { + break; + } + } + } + + public BT next(BT reuse) { + // search unprobed record in bucket, while found none, move to next bucket and search. + while (true) { + BT result = nextInBucket(reuse); + if (result == null) { + if (!moveToNextOnHeapBucket()) { + return null; + } + } else { + return result; + } + } + } + + public BT next() { + // search unProbed record in bucket, while found none, move to next bucket and search. + while (true) { + BT result = nextInBucket(); + if (result == null) { + // return null while there is no more bucket. + if (!moveToNextOnHeapBucket()) { + return null; + } + } else { + return result; + } + } + } + + /** + * Loop to make sure that it would move to next on heap bucket, return true while move to a on heap bucket, + * return false if there is no more bucket. 
+ */ + private boolean moveToNextOnHeapBucket() { + while (!moveToNextBucket()) { + if (scanCount >= totalBucketNumber) { + return false; + } + } + return true; + } + + /** + * Move to next bucket, return true while move to a on heap bucket, return false while move to a spilled bucket + * or there is no more bucket. + */ + private boolean moveToNextBucket() { + scanCount++; + if (scanCount > totalBucketNumber - 1) { + return false; + } + // move to next bucket, update all the current bucket status with new bucket information. + final int bucketArrayPos = scanCount >> this.bucketsPerSegmentBits; + final int currentBucketInSegmentOffset = (scanCount & this.bucketsPerSegmentMask) << NUM_INTRA_BUCKET_BITS; + MemorySegment currentBucket = this.buckets[bucketArrayPos]; + final int partitionNumber = currentBucket.get(currentBucketInSegmentOffset + HEADER_PARTITION_OFFSET); + final HashPartition p = this.partitionsBeingBuilt.get(partitionNumber); + if (p.isInMemory()) { + setBucket(currentBucket, p.overflowSegments, p, currentBucketInSegmentOffset); + return true; + } else { + return false; + } + } + + // update current bucket status. + private void setBucket(MemorySegment bucket, MemorySegment[] overflowSegments, HashPartition partition, + int bucketInSegmentOffset) { + this.bucketSegment = bucket; + this.overflowSegments = overflowSegments; + this.partition = partition; + this.bucketInSegmentOffset = bucketInSegmentOffset; + this.countInSegment = bucket.getShort(bucketInSegmentOffset + HEADER_COUNT_OFFSET); + this.numInSegment = 0; + // reset probedSet with probedFlags offset in this bucket. + this.probedSet.setMemorySegment(bucketSegment, this.bucketInSegmentOffset + HEADER_PROBED_FLAGS_OFFSET); + } + + private BT nextInBucket(BT reuse) { + // loop over all segments that are involved in the bucket (original bucket plus overflow buckets) + while (true) { + while (this.numInSegment < this.countInSegment) { + boolean probed = probedSet.get(numInSegment); + if (!probed) { + final long pointer = this.bucketSegment.getLong(this.bucketInSegmentOffset + + BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN)); + try { + this.partition.setReadPosition(pointer); + reuse = this.accessor.deserialize(reuse, this.partition); + this.numInSegment++; + return reuse; + } catch (IOException ioex) { + throw new RuntimeException("Error deserializing key or value from the hashtable: " + + ioex.getMessage(), ioex); + } + } else { + this.numInSegment++; + } + } + // this segment is done. check if there is another chained bucket + final long forwardPointer = this.bucketSegment.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET); + if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) { + return null; + } + + final int overflowSegNum = (int) (forwardPointer >>> 32); + this.bucketSegment = this.overflowSegments[overflowSegNum]; + this.bucketInSegmentOffset = (int) forwardPointer; + this.countInSegment = this.bucketSegment.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET); + this.numInSegment = 0; + // reset probedSet with probedFlags offset in this bucket. 
+ this.probedSet.setMemorySegment(bucketSegment, this.bucketInSegmentOffset + HEADER_PROBED_FLAGS_OFFSET); + } + } + + private BT nextInBucket() { + // loop over all segments that are involved in the bucket (original bucket plus overflow buckets) + while (true) { + while (this.numInSegment < this.countInSegment) { + boolean probed = probedSet.get(numInSegment); + if (!probed) { + final long pointer = this.bucketSegment.getLong(this.bucketInSegmentOffset + + BUCKET_POINTER_START_OFFSET + (this.numInSegment * POINTER_LEN)); + try { + this.partition.setReadPosition(pointer); + BT result = this.accessor.deserialize(this.partition); + this.numInSegment++; + return result; + } catch (IOException ioex) { + throw new RuntimeException("Error deserializing key or value from the hashtable: " + + ioex.getMessage(), ioex); + } + } else { + this.numInSegment++; + } + } + + // this segment is done. check if there is another chained bucket + final long forwardPointer = this.bucketSegment.getLong(this.bucketInSegmentOffset + HEADER_FORWARD_OFFSET); + if (forwardPointer == BUCKET_FORWARD_POINTER_NOT_SET) { + return null; + } + + final int overflowSegNum = (int) (forwardPointer >>> 32); + this.bucketSegment = this.overflowSegments[overflowSegNum]; + this.bucketInSegmentOffset = (int) forwardPointer; + this.countInSegment = this.bucketSegment.getShort(this.bucketInSegmentOffset + HEADER_COUNT_OFFSET); + this.numInSegment = 0; + // reset probedSet with probedFlags offset in this bucket. + this.probedSet.setMemorySegment(bucketSegment, this.bucketInSegmentOffset + HEADER_PROBED_FLAGS_OFFSET); + } + } + + public void back() { + this.numInSegment--; + } + } + // ====================================================================================================== public static final class ProbeIterator { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java index 3b940c2238b3c..2c21619ab9a5d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstHashJoinIterator.java @@ -53,7 +53,9 @@ public class NonReusingBuildFirstHashJoinIterator extends HashJoinIte private final MutableObjectIterator secondInput; - private final boolean joinWithEmptyBuildSide; + private final boolean probeSideOuterJoin; + + private final boolean buildSideOuterJoin; private volatile boolean running = true; @@ -70,7 +72,8 @@ public NonReusingBuildFirstHashJoinIterator( MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction, - boolean joinWithEmptyBuildSide, + boolean probeSideOuterJoin, + boolean buildSideOuterJoin, boolean useBitmapFilters) throws MemoryAllocationException { this.memManager = memManager; @@ -78,10 +81,11 @@ public NonReusingBuildFirstHashJoinIterator( this.secondInput = secondInput; this.probeSideSerializer = serializer2; - if(useBitmapFilters && joinWithEmptyBuildSide) { + if(useBitmapFilters && probeSideOuterJoin) { throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side"); } - this.joinWithEmptyBuildSide = joinWithEmptyBuildSide; + this.probeSideOuterJoin = probeSideOuterJoin; + this.buildSideOuterJoin = buildSideOuterJoin; this.hashJoin = getHashJoin(serializer1, comparator1, serializer2, 
comparator2, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters); @@ -91,7 +95,7 @@ public NonReusingBuildFirstHashJoinIterator( @Override public void open() throws IOException, MemoryAllocationException, InterruptedException { - this.hashJoin.open(this.firstInput, this.secondInput); + this.hashJoin.open(this.firstInput, this.secondInput, this.buildSideOuterJoin); } @@ -112,14 +116,14 @@ public final boolean callWithNextKey(FlatJoinFunction matchFunction, if (this.hashJoin.nextRecord()) { // we have a next record, get the iterators to the probe and build side values - final MutableHashTable.HashBucketIterator buildSideIterator = this.hashJoin.getBuildSideIterator(); - V1 nextBuildSideRecord; - + final MutableObjectIterator buildSideIterator = this.hashJoin.getBuildSideIterator(); + final V2 probeRecord = this.hashJoin.getCurrentProbeRecord(); + V1 nextBuildSideRecord = buildSideIterator.next(); + // get the first build side value - if ((nextBuildSideRecord = buildSideIterator.next()) != null) { + if (probeRecord != null && nextBuildSideRecord != null) { V1 tmpRec; - final V2 probeRecord = this.hashJoin.getCurrentProbeRecord(); - + // check if there is another build-side value if ((tmpRec = buildSideIterator.next()) != null) { // more than one build-side value --> copy the probe side @@ -144,13 +148,23 @@ public final boolean callWithNextKey(FlatJoinFunction matchFunction, // only single pair matches matchFunction.join(nextBuildSideRecord, probeRecord, collector); } - } - else if(joinWithEmptyBuildSide) { - // build side is empty, join with null - final V2 probeRecord = this.hashJoin.getCurrentProbeRecord(); + } else { + // while probe side outer join, join current probe record with null. + if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) { + matchFunction.join(null, probeRecord, collector); + } - matchFunction.join(null, probeRecord, collector); + // while build side outer join, iterate all build records which have not been probed before, + // and join with null. 
+ if (buildSideOuterJoin && probeRecord == null && nextBuildSideRecord != null) { + matchFunction.join(nextBuildSideRecord, null, collector); + + while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) { + matchFunction.join(nextBuildSideRecord, null, collector); + } + } } + return true; } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java index 77521af9139de..1c193d15c0727 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildFirstReOpenableHashJoinIterator.java @@ -49,12 +49,13 @@ public NonReusingBuildFirstReOpenableHashJoinIterator( IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction, - boolean joinWithEmptyBuildSide, + boolean probeSideOuterJoin, + boolean buildSideOuterJoin, boolean useBitmapFilters) throws MemoryAllocationException { super(firstInput, secondInput, serializer1, comparator1, serializer2, comparator2, pairComparator, memManager, ioManager, ownerTask, - memoryFraction, joinWithEmptyBuildSide, useBitmapFilters); + memoryFraction, probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters); reopenHashTable = (ReOpenableMutableHashTable) hashJoin; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java index 9ea0b743f7d3b..2ac22aea4c84b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondHashJoinIterator.java @@ -52,7 +52,9 @@ public class NonReusingBuildSecondHashJoinIterator extends HashJoinIt private final MutableObjectIterator secondInput; - private final boolean joinWithEmptyBuildSide; + private final boolean buildSideOuterJoin; + + private final boolean probeSideOuterJoin; private volatile boolean running = true; @@ -69,7 +71,8 @@ public NonReusingBuildSecondHashJoinIterator( MemoryManager memManager, IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction, - boolean joinWithEmptyBuildSide, + boolean probeSideOuterJoin, + boolean buildSideOuterJoin, boolean useBitmapFilters) throws MemoryAllocationException { this.memManager = memManager; @@ -77,20 +80,21 @@ public NonReusingBuildSecondHashJoinIterator( this.secondInput = secondInput; this.probeSideSerializer = serializer1; - if(useBitmapFilters && joinWithEmptyBuildSide) { + if(useBitmapFilters && probeSideOuterJoin) { throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side"); } - this.joinWithEmptyBuildSide = joinWithEmptyBuildSide; + this.probeSideOuterJoin = probeSideOuterJoin; + this.buildSideOuterJoin = buildSideOuterJoin; this.hashJoin = getHashJoin(serializer2, comparator2, serializer1, comparator1, pairComparator, memManager, ioManager, ownerTask, memoryFraction, useBitmapFilters); } - + // -------------------------------------------------------------------------------------------- @Override public void open() throws IOException, MemoryAllocationException, InterruptedException { - 
this.hashJoin.open(this.secondInput, this.firstInput); + this.hashJoin.open(this.secondInput, this.firstInput, buildSideOuterJoin); } @@ -110,14 +114,14 @@ public boolean callWithNextKey(FlatJoinFunction matchFunction, Collec if (this.hashJoin.nextRecord()) { // we have a next record, get the iterators to the probe and build side values - final MutableHashTable.HashBucketIterator buildSideIterator = this.hashJoin.getBuildSideIterator(); - V2 nextBuildSideRecord; - - // get the first build side value - if ((nextBuildSideRecord = buildSideIterator.next()) != null) { + final MutableObjectIterator buildSideIterator = this.hashJoin.getBuildSideIterator(); + + final V1 probeRecord = this.hashJoin.getCurrentProbeRecord(); + V2 nextBuildSideRecord = buildSideIterator.next(); + + if (probeRecord != null && nextBuildSideRecord != null) { V2 tmpRec; - final V1 probeRecord = this.hashJoin.getCurrentProbeRecord(); - + // check if there is another build-side value if ((tmpRec = buildSideIterator.next()) != null) { // more than one build-side value --> copy the probe side @@ -142,13 +146,22 @@ public boolean callWithNextKey(FlatJoinFunction matchFunction, Collec // only single pair matches matchFunction.join(probeRecord, nextBuildSideRecord, collector); } - } - else if(joinWithEmptyBuildSide) { - // build side is empty, join with null - final V1 probeRecord = this.hashJoin.getCurrentProbeRecord(); + } else { + // for a probe-side outer join, join the current probe record with null. + if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) { + matchFunction.join(probeRecord, null, collector); + } - matchFunction.join(probeRecord, null, collector); + // for a build-side outer join, iterate over all build-side records that were never matched during probing, + // and join each of them with null.
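Which combination of the two new flags a given outer-join variant uses follows from which input gets hashed: the preserved side is a build-side outer join when it is built and a probe-side outer join when it is streamed. The mapping below is a sketch derived from the test expectations in this patch (for example, build-first plus probeSideOuterJoin reproduces a right outer join on (first, second)); the enum and helper names are ours, not Flink API.

    public class OuterJoinFlagMappingSketch {

        enum OuterJoinType { LEFT, RIGHT, FULL }

        // returns { probeSideOuterJoin, buildSideOuterJoin }
        static boolean[] hashJoinFlags(OuterJoinType type, boolean buildIsFirstInput) {
            switch (type) {
                case LEFT:
                    // left (first) input preserved: outer on the build side iff the first input is built
                    return buildIsFirstInput ? new boolean[] { false, true } : new boolean[] { true, false };
                case RIGHT:
                    // right (second) input preserved: mirror image of the LEFT case
                    return buildIsFirstInput ? new boolean[] { true, false } : new boolean[] { false, true };
                default:
                    // FULL outer preserves both sides
                    return new boolean[] { true, true };
            }
        }
    }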
+ if (buildSideOuterJoin && probeRecord == null && nextBuildSideRecord != null) { + matchFunction.join(null, nextBuildSideRecord, collector); + while (this.running && ((nextBuildSideRecord = buildSideIterator.next()) != null)) { + matchFunction.join(null, nextBuildSideRecord, collector); + } + } } + return true; } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java index c9c9165efa507..c9d7d0d2ddb55 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/NonReusingBuildSecondReOpenableHashJoinIterator.java @@ -49,12 +49,13 @@ public NonReusingBuildSecondReOpenableHashJoinIterator( IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction, - boolean joinWithEmptyBuildSide, + boolean probeSideOuterJoin, + boolean buildSideOuterJoin, boolean useBitmapFilters) throws MemoryAllocationException { super(firstInput, secondInput, serializer1, comparator1, serializer2, comparator2, pairComparator, memManager, ioManager, ownerTask, - memoryFraction, joinWithEmptyBuildSide, useBitmapFilters); + memoryFraction, probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters); reopenHashTable = (ReOpenableMutableHashTable) hashJoin; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java index b7a72622e2ba9..b9ddff8d6a092 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReOpenableMutableHashTable.java @@ -63,8 +63,8 @@ public ReOpenableMutableHashTable(TypeSerializer buildSideSerializer, } @Override - public void open(MutableObjectIterator buildSide, MutableObjectIterator probeSide) throws IOException { - super.open(buildSide, probeSide); + public void open(MutableObjectIterator buildSide, MutableObjectIterator probeSide, boolean buildSideOuterJoin) throws IOException { + super.open(buildSide, probeSide, buildSideOuterJoin); initialPartitions = new ArrayList>( partitionsBeingBuilt ); initialPartitionFanOut = (byte) partitionsBeingBuilt.size(); initialBucketCount = this.numBuckets; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java index c1e601d241e93..78e0ab6bc6886 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstHashJoinIterator.java @@ -56,7 +56,9 @@ public class ReusingBuildFirstHashJoinIterator extends HashJoinIterat private final MutableObjectIterator secondInput; - private final boolean joinWithEmptyBuildSide; + private final boolean probeSideOuterJoin; + + private final boolean buildSideOuterJoin; private volatile boolean running = true; @@ -74,7 +76,8 @@ public ReusingBuildFirstHashJoinIterator( IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction, - boolean joinWithEmptyBuildSide, + 
boolean probeSideOuterJoin, + boolean buildSideOuterJoin, boolean useBitmapFilters) throws MemoryAllocationException { this.memManager = memManager; @@ -82,10 +85,11 @@ public ReusingBuildFirstHashJoinIterator( this.secondInput = secondInput; this.probeSideSerializer = serializer2; - if(useBitmapFilters && joinWithEmptyBuildSide) { + if(useBitmapFilters && probeSideOuterJoin) { throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side"); } - this.joinWithEmptyBuildSide = joinWithEmptyBuildSide; + this.probeSideOuterJoin = probeSideOuterJoin; + this.buildSideOuterJoin = buildSideOuterJoin; this.nextBuildSideObject = serializer1.createInstance(); this.tempBuildSideRecord = serializer1.createInstance(); @@ -98,7 +102,7 @@ public ReusingBuildFirstHashJoinIterator( @Override public void open() throws IOException, MemoryAllocationException, InterruptedException { - this.hashJoin.open(this.firstInput, this.secondInput); + this.hashJoin.open(this.firstInput, this.secondInput, buildSideOuterJoin); } @@ -119,40 +123,34 @@ public final boolean callWithNextKey(FlatJoinFunction matchFunction, if (this.hashJoin.nextRecord()) { // we have a next record, get the iterators to the probe and build side values - final MutableHashTable.HashBucketIterator buildSideIterator = this.hashJoin.getBuildSideIterator(); - V1 nextBuildSideRecord = this.nextBuildSideObject; - - // get the first build side value - if ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null) { - V1 tmpRec = this.tempBuildSideRecord; - final V2 probeRecord = this.hashJoin.getCurrentProbeRecord(); - - // check if there is another build-side value - if ((tmpRec = buildSideIterator.next(tmpRec)) != null) { + final MutableObjectIterator buildSideIterator = this.hashJoin.getBuildSideIterator(); - // call match on the first pair + final V2 probeRecord = this.hashJoin.getCurrentProbeRecord(); + V1 nextBuildSideRecord = buildSideIterator.next(this.nextBuildSideObject); + + if (probeRecord != null && nextBuildSideRecord != null) { + matchFunction.join(nextBuildSideRecord, probeRecord, collector); + + while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) { matchFunction.join(nextBuildSideRecord, probeRecord, collector); - - // call match on the second pair - matchFunction.join(tmpRec, probeRecord, collector); - + } + } else { + if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) { + matchFunction.join(null, probeRecord, collector); + } + + if (buildSideOuterJoin && probeRecord == null && nextBuildSideRecord != null) { + // call match on the first pair + matchFunction.join(nextBuildSideRecord, null, collector); + while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) { // call match on the next pair // make sure we restore the value of the probe side record - matchFunction.join(nextBuildSideRecord, probeRecord, collector); + matchFunction.join(nextBuildSideRecord, null, collector); } } - else { - // only single pair matches - matchFunction.join(nextBuildSideRecord, probeRecord, collector); - } } - else if(joinWithEmptyBuildSide) { - // build side is empty, join with null - final V2 probeRecord = this.hashJoin.getCurrentProbeRecord(); - matchFunction.join(null, probeRecord, collector); - } return true; } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java 
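All of the iterator constructors replace the single joinWithEmptyBuildSide flag with the (probeSideOuterJoin, buildSideOuterJoin) pair. Judging from the removed else-if branches, which only ever joined an unmatched probe record with null, the old flag corresponds exactly to a probe-side outer join, so existing call sites translate as in this hypothetical adapter (the class and method names are ours):

    public class LegacyFlagAdapterSketch {

        // joinWithEmptyBuildSide == true meant "preserve unmatched probe records"
        static boolean[] fromJoinWithEmptyBuildSide(boolean joinWithEmptyBuildSide) {
            // { probeSideOuterJoin, buildSideOuterJoin }
            return new boolean[] { joinWithEmptyBuildSide, false };
        }
    }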
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java index 1cc3f9124e880..5e29bc58fa1c2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildFirstReOpenableHashJoinIterator.java @@ -49,13 +49,14 @@ public ReusingBuildFirstReOpenableHashJoinIterator( IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction, - boolean joinWithEmptyBuildSide, + boolean probeSideOuterJoin, + boolean buildSideOuterJoin, boolean useBitmapFilters) throws MemoryAllocationException { super(firstInput, secondInput, serializer1, comparator1, serializer2, comparator2, pairComparator, memManager, ioManager, ownerTask, - memoryFraction, joinWithEmptyBuildSide, useBitmapFilters); + memoryFraction, probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters); reopenHashTable = (ReOpenableMutableHashTable) hashJoin; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java index 4402665499c04..fc9788c85d7dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondHashJoinIterator.java @@ -56,7 +56,9 @@ public class ReusingBuildSecondHashJoinIterator extends HashJoinItera private final MutableObjectIterator secondInput; - private final boolean joinWithEmptyBuildSide; + private final boolean probeSideOuterJoin; + + private final boolean buildSideOuterJoin; private volatile boolean running = true; @@ -74,7 +76,8 @@ public ReusingBuildSecondHashJoinIterator( IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction, - boolean joinWithEmptyBuildSide, + boolean probeSideOuterJoin, + boolean buildSideOuterJoin, boolean useBitmapFilters) throws MemoryAllocationException { this.memManager = memManager; @@ -82,10 +85,11 @@ public ReusingBuildSecondHashJoinIterator( this.secondInput = secondInput; this.probeSideSerializer = serializer1; - if(useBitmapFilters && joinWithEmptyBuildSide) { + if(useBitmapFilters && probeSideOuterJoin) { throw new IllegalArgumentException("Bitmap filter may not be activated for joining with empty build side"); } - this.joinWithEmptyBuildSide = joinWithEmptyBuildSide; + this.probeSideOuterJoin = probeSideOuterJoin; + this.buildSideOuterJoin = buildSideOuterJoin; this.nextBuildSideObject = serializer2.createInstance(); this.tempBuildSideRecord = serializer2.createInstance(); @@ -98,7 +102,7 @@ public ReusingBuildSecondHashJoinIterator( @Override public void open() throws IOException, MemoryAllocationException, InterruptedException { - this.hashJoin.open(this.secondInput, this.firstInput); + this.hashJoin.open(this.secondInput, this.firstInput, buildSideOuterJoin); } @Override @@ -118,39 +122,29 @@ public boolean callWithNextKey(FlatJoinFunction matchFunction, Collec if (this.hashJoin.nextRecord()) { // we have a next record, get the iterators to the probe and build side values - final MutableHashTable.HashBucketIterator buildSideIterator = this.hashJoin.getBuildSideIterator(); - V2 nextBuildSideRecord = this.nextBuildSideObject; - - // get the first build side value - if ((nextBuildSideRecord = 
buildSideIterator.next(nextBuildSideRecord)) != null) { - V2 tmpRec = this.tempBuildSideRecord; - final V1 probeRecord = this.hashJoin.getCurrentProbeRecord(); - - // check if there is another build-side value - if ((tmpRec = buildSideIterator.next(tmpRec)) != null) { - // call match on the first pair + final MutableObjectIterator buildSideIterator = this.hashJoin.getBuildSideIterator(); + final V1 probeRecord = this.hashJoin.getCurrentProbeRecord(); + V2 nextBuildSideRecord = buildSideIterator.next(this.nextBuildSideObject); + + if (probeRecord != null && nextBuildSideRecord != null) { + matchFunction.join(probeRecord, nextBuildSideRecord, collector); + + while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) { matchFunction.join(probeRecord, nextBuildSideRecord, collector); - - // call match on the second pair - matchFunction.join(probeRecord, tmpRec, collector); - + } + } else { + if (probeSideOuterJoin && probeRecord != null && nextBuildSideRecord == null) { + matchFunction.join(probeRecord, null, collector); + } + + if (buildSideOuterJoin && probeRecord == null && nextBuildSideRecord != null) { + matchFunction.join(null, nextBuildSideRecord, collector); while (this.running && ((nextBuildSideRecord = buildSideIterator.next(nextBuildSideRecord)) != null)) { - // call match on the next pair - // make sure we restore the value of the probe side record - matchFunction.join(probeRecord, nextBuildSideRecord, collector); + matchFunction.join(null, nextBuildSideRecord, collector); } } - else { - // only single pair matches - matchFunction.join(probeRecord, nextBuildSideRecord, collector); - } } - else if(joinWithEmptyBuildSide) { - // build side is empty, join with null - final V1 probeRecord = this.hashJoin.getCurrentProbeRecord(); - matchFunction.join(probeRecord, null, collector); - } return true; } else { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java index 190398fde82f3..e603ea8597b5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/hash/ReusingBuildSecondReOpenableHashJoinIterator.java @@ -49,12 +49,13 @@ public ReusingBuildSecondReOpenableHashJoinIterator( IOManager ioManager, AbstractInvokable ownerTask, double memoryFraction, - boolean joinWithEmptyBuildSide, + boolean probeSideOuterJoin, + boolean buildSideOuterJoin, boolean useBitmapFilters) throws MemoryAllocationException { super(firstInput, secondInput, serializer1, comparator1, serializer2, comparator2, pairComparator, memManager, ioManager, ownerTask, - memoryFraction, joinWithEmptyBuildSide, useBitmapFilters); + memoryFraction, probeSideOuterJoin, buildSideOuterJoin, useBitmapFilters); reopenHashTable = (ReOpenableMutableHashTable) hashJoin; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java new file mode 100644 index 0000000000000..6054727954996 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/BitSet.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
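The Reusing* variants differ from the NonReusing* ones only in that they pass a reuse instance into buildSideIterator.next(reuse) instead of allocating per record. The contract, illustrated below with a plain-Java stand-in for MutableObjectIterator, is that the iterator may fill the passed instance or hand back a different one, so the caller must always continue with the returned reference; this sketch uses our own names and is not the patch's code.

    public class ReusePatternSketch {

        // Stand-in for Flink's MutableObjectIterator#next(reuse): returns null when
        // exhausted, and may either fill the passed instance or return another one.
        interface ReusingIterator<E> {
            E next(E reuse);
        }

        public static void main(String[] args) {
            final int[] values = { 1, 2, 3 };
            ReusingIterator<int[]> it = new ReusingIterator<int[]>() {
                private int pos;

                @Override
                public int[] next(int[] reuse) {
                    if (pos >= values.length) {
                        return null; // exhausted
                    }
                    reuse[0] = values[pos++]; // fill the reused instance
                    return reuse;
                }
            };

            int[] record = new int[1];
            // always continue with the returned reference, never the old one
            while ((record = it.next(record)) != null) {
                System.out.println(record[0]);
            }
        }
    }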
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.operators.util; + +import com.google.common.base.Preconditions; +import org.apache.flink.core.memory.MemorySegment; + +public class BitSet { + private MemorySegment memorySegment; + + // MemorySegment byte array offset. + private int offset; + + // The BitSet byte size. + private int byteLength; + + // The BitSet bit size. + private int bitLength; + + private final int BYTE_POSITION_MASK = 0xfffffff8; + private final int BYTE_INDEX_MASK = 0x00000007; + + public BitSet(int byteSize) { + Preconditions.checkArgument(byteSize > 0, "byte size should be greater than 0."); + this.byteLength = byteSize; + this.bitLength = byteSize << 3; + } + + public void setMemorySegment(MemorySegment memorySegment, int offset) { + Preconditions.checkArgument(memorySegment != null, "MemorySegment can not be null."); + Preconditions.checkArgument(offset >= 0, "Offset should be a non-negative integer."); + Preconditions.checkArgument(offset + byteLength <= memorySegment.size(), + "Could not set MemorySegment, the remaining buffer is not large enough."); + this.memorySegment = memorySegment; + this.offset = offset; + } + + /** + * Sets the bit at the specified index. + * + * @param index - position + */ + public void set(int index) { + Preconditions.checkArgument(index < bitLength && index >= 0, + String.format("Input index [%d] is out of the BitSet's range [0, %d).", index, bitLength)); + + int byteIndex = (index & BYTE_POSITION_MASK) >>> 3; + byte current = memorySegment.get(offset + byteIndex); + current |= (1 << (index & BYTE_INDEX_MASK)); + memorySegment.put(offset + byteIndex, current); + } + + /** + * Returns true if the bit at the specified index is set. + * + * @param index - position + * @return - value at the bit position + */ + public boolean get(int index) { + Preconditions.checkArgument(index < bitLength && index >= 0, + String.format("Input index [%d] is out of the BitSet's range [0, %d).", index, bitLength)); + + int byteIndex = (index & BYTE_POSITION_MASK) >>> 3; + byte current = memorySegment.get(offset + byteIndex); + return (current & (1 << (index & BYTE_INDEX_MASK))) != 0; + } + + /** + * Returns the number of bits in this BitSet. + */ + public int bitSize() { + return bitLength; + } + + /** + * Clear the bit set.
+ */ + public void clear() { + for (int i = 0; i < byteLength; i++) { + memorySegment.put(offset + i, (byte) 0); + } + } + + @Override + public String toString() { + StringBuilder output = new StringBuilder(); + output.append("BitSet:\n"); + output.append("\tMemorySegment:").append(memorySegment.size()).append("\n"); + output.append("\tOffset:").append(offset).append("\n"); + output.append("\tLength:").append(byteLength).append("\n"); + return output.toString(); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java index 4afa11434c92a..ed709c0376113 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableITCase.java @@ -172,7 +172,7 @@ public void testInMemoryMutableHashTable() throws IOException int numRecordsInJoinResult = 0; while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -222,7 +222,7 @@ public void testSpillingHashJoinOneRecursionPerformance() throws IOException int numRecordsInJoinResult = 0; while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -279,7 +279,7 @@ public void testSpillingHashJoinOneRecursionValidity() throws IOException int key = 0; - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); if ((record = buildSide.next(recordReuse)) != null) { numBuildValues = 1; key = record.getField(0, IntValue.class).getValue(); @@ -392,7 +392,7 @@ public void testSpillingHashJoinWithMassiveCollisions() throws IOException final Record probeRec = join.getCurrentProbeRecord(); int key = probeRec.getField(0, IntValue.class).getValue(); - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); if ((record = buildSide.next(recordReuse)) != null) { numBuildValues = 1; Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); @@ -505,7 +505,7 @@ public void testSpillingHashJoinWithTwoRecursions() throws IOException final Record probeRec = join.getCurrentProbeRecord(); int key = probeRec.getField(0, IntValue.class).getValue(); - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); if ((record = buildSide.next(recordReuse)) != null) { numBuildValues = 1; Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getField(0, IntValue.class).getValue()); @@ -608,7 +608,7 @@ public void testFailingHashJoinTooManyRecursions() throws IOException try { while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); if (buildSide.next(recordReuse) == null) { fail("No build side values found for a probe key."); } @@ -666,7 +666,7 @@ public void testSparseProbeSpilling() throws IOException, MemoryAllocationExcept int numRecordsInJoinResult = 0; while (join.nextRecord()) { - 
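The new BitSet addresses bits inside a caller-provided MemorySegment: (index & BYTE_POSITION_MASK) >>> 3 selects the byte (it is equivalent to index >>> 3) and index & BYTE_INDEX_MASK selects the bit within that byte. A usage sketch follows, assuming the patch is applied and that MemorySegmentFactory.allocateUnpooledSegment is available in this Flink version (treat the factory call as an assumption):

    import org.apache.flink.core.memory.MemorySegment;
    import org.apache.flink.core.memory.MemorySegmentFactory;
    import org.apache.flink.runtime.operators.util.BitSet;

    public class BitSetUsageSketch {

        public static void main(String[] args) {
            // 1024 bytes of backing memory expose 1024 * 8 = 8192 addressable bits
            MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);

            BitSet bitSet = new BitSet(1024);
            bitSet.setMemorySegment(segment, 0);

            bitSet.set(42);
            System.out.println(bitSet.get(42)); // true
            System.out.println(bitSet.get(43)); // false

            bitSet.clear();
            System.out.println(bitSet.get(42)); // false
        }
    }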
HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -715,7 +715,7 @@ public void validateSpillingDuringInsertion() throws IOException, MemoryAllocati * NUM_PROBE_VALS; while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -770,7 +770,7 @@ public void testInMemoryMutableHashTableIntPair() throws IOException int numRecordsInJoinResult = 0; while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -819,7 +819,7 @@ public void testSpillingHashJoinOneRecursionPerformanceIntPair() throws IOExcept int numRecordsInJoinResult = 0; while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -879,7 +879,7 @@ public void testSpillingHashJoinOneRecursionValidityIntPair() throws IOException int key = 0; - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); if ((record = buildSide.next(recordReuse)) != null) { numBuildValues = 1; key = record.getKey(); @@ -995,7 +995,7 @@ public void testSpillingHashJoinWithMassiveCollisionsIntPair() throws IOExceptio final IntPair probeRec = join.getCurrentProbeRecord(); int key = probeRec.getKey(); - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); if ((record = buildSide.next(recordReuse)) != null) { numBuildValues = 1; Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getKey()); @@ -1107,7 +1107,7 @@ public void testSpillingHashJoinWithTwoRecursionsIntPair() throws IOException final IntPair probeRec = join.getCurrentProbeRecord(); int key = probeRec.getKey(); - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); if ((record = buildSide.next(recordReuse)) != null) { numBuildValues = 1; Assert.assertEquals("Probe-side key was different than build-side key.", key, record.getKey()); @@ -1210,7 +1210,7 @@ public void testFailingHashJoinTooManyRecursionsIntPair() throws IOException try { while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); if (buildSide.next(recordReuse) == null) { fail("No build side values found for a probe key."); } @@ -1267,7 +1267,7 @@ public void testSparseProbeSpillingIntPair() throws IOException, MemoryAllocatio int numRecordsInJoinResult = 0; while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -1316,7 +1316,7 @@ public void validateSpillingDuringInsertionIntPair() throws IOException, MemoryA * NUM_PROBE_VALS; while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = 
join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -1363,7 +1363,7 @@ public void testInMemoryReOpen() throws IOException int numRecordsInJoinResult = 0; while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -1386,7 +1386,7 @@ public void testInMemoryReOpen() throws IOException numRecordsInJoinResult = 0; while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -1439,7 +1439,7 @@ public void testInMemoryReOpenWithSmallMemory() throws Exception { int numRecordsInJoinResult = 0; while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -1462,7 +1462,7 @@ public void testInMemoryReOpenWithSmallMemory() throws Exception { numRecordsInJoinResult = 0; while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -1524,7 +1524,7 @@ public void testBucketsNotFulfillSegment() throws Exception { int numRecordsInJoinResult = 0; while (join.nextRecord()) { - HashBucketIterator buildSide = join.getBuildSideIterator(); + MutableObjectIterator buildSide = join.getBuildSideIterator(); while (buildSide.next(recordReuse) != null) { numRecordsInJoinResult++; } @@ -1534,6 +1534,106 @@ public void testBucketsNotFulfillSegment() throws Exception { join.close(); this.memManager.release(join.getFreedMemory()); } + + @Test + public void testHashWithBuildSideOuterJoin1() throws Exception { + final int NUM_KEYS = 20000; + final int BUILD_VALS_PER_KEY = 1; + final int PROBE_VALS_PER_KEY = 1; + + // create a build input that gives 40000 pairs with 1 value sharing the same key + MutableObjectIterator buildInput = new UniformIntPairGenerator(2 * NUM_KEYS, BUILD_VALS_PER_KEY, false); + + // create a probe input that gives 20000 pairs with 1 value sharing a key + MutableObjectIterator probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true); + + // allocate the memory for the HashTable + List memSegments; + try { + // 33 is the minimum number of pages required to perform the hash join on these inputs + memSegments = this.memManager.allocatePages(MEM_OWNER, 33); + } + catch (MemoryAllocationException maex) { + fail("Memory for the Join could not be provided."); + return; + } + + // ---------------------------------------------------------------------------------------- + + final MutableHashTable join = new MutableHashTable( + this.pairBuildSideAccesssor, this.pairProbeSideAccesssor, + this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator, + memSegments, ioManager); + join.open(buildInput, probeInput, true); + + final IntPair recordReuse = new IntPair(); + int numRecordsInJoinResult = 0; + + while (join.nextRecord()) { + MutableObjectIterator buildSide = join.getBuildSideIterator(); + while (buildSide.next(recordReuse) != null) { + numRecordsInJoinResult++; + } + } + Assert.assertEquals("Wrong number of records in join result.", 2 * NUM_KEYS * 
BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult); + + join.close(); + this.memManager.release(join.getFreedMemory()); + } + + @Test + public void testHashWithBuildSideOuterJoin2() throws Exception { + final int NUM_KEYS = 40000; + final int BUILD_VALS_PER_KEY = 2; + final int PROBE_VALS_PER_KEY = 1; + + // The key ranges of the probe and build sides fully overlap, so no unmatched build elements + // remain after the probe phase; make sure the build-side outer join works correctly in this case. + + // create a build input that gives 80000 pairs with 2 values sharing the same key + MutableObjectIterator buildInput = new UniformIntPairGenerator(NUM_KEYS, BUILD_VALS_PER_KEY, false); + + // create a probe input that gives 40000 pairs with 1 value sharing a key + MutableObjectIterator probeInput = new UniformIntPairGenerator(NUM_KEYS, PROBE_VALS_PER_KEY, true); + + // allocate the memory for the HashTable + List memSegments; + try { + // 33 is the minimum number of pages required to perform the hash join on these inputs + memSegments = this.memManager.allocatePages(MEM_OWNER, 33); + } + catch (MemoryAllocationException maex) { + fail("Memory for the Join could not be provided."); + return; + } + + // ---------------------------------------------------------------------------------------- + + final MutableHashTable join = new MutableHashTable( + this.pairBuildSideAccesssor, this.pairProbeSideAccesssor, + this.pairBuildSideComparator, this.pairProbeSideComparator, this.pairComparator, + memSegments, ioManager); + join.open(buildInput, probeInput, true); + + final IntPair recordReuse = new IntPair(); + int numRecordsInJoinResult = 0; + + while (join.nextRecord()) { + MutableObjectIterator buildSide = join.getBuildSideIterator(); + IntPair next = buildSide.next(recordReuse); + if (next == null && join.getCurrentProbeRecord() == null) { + fail("Join result must not contain a pair where both the probe and the build element are null."); + } + while (next != null) { + numRecordsInJoinResult++; + next = buildSide.next(recordReuse); + } + } + Assert.assertEquals("Wrong number of records in join result.", NUM_KEYS * BUILD_VALS_PER_KEY * PROBE_VALS_PER_KEY, numRecordsInJoinResult); + + join.close(); + this.memManager.release(join.getFreedMemory()); + } // ============================================================================================ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java index 6ef6d47f2a0a4..7c385fc661a1c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/HashTableTest.java @@ -131,7 +131,7 @@ public void testBufferMissingForProbing() { try { while (table.nextRecord()) { - MutableHashTable.HashBucketIterator, Long> matches = table.getBuildSideIterator(); + MutableObjectIterator> matches = table.getBuildSideIterator(); while (matches.next() != null); } } @@ -240,7 +240,7 @@ public void testSpillingWhenBuildingTableWithoutOverflow() throws Exception { new ByteArrayIterator(1, 128,(byte) 1))); while(table.nextRecord()) { - MutableHashTable.HashBucketIterator iterator = table.getBuildSideIterator(); + MutableObjectIterator iterator = table.getBuildSideIterator(); int counter = 0; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java 
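The counts asserted by the two new HashTableITCase tests fall directly out of the key ranges. In testHashWithBuildSideOuterJoin1 the build side covers keys 0..39999 while the probe side covers only 0..19999, so half of the build records match and the other half are emitted against null; in testHashWithBuildSideOuterJoin2 the ranges coincide, so every build record matches and nothing is emitted with a null probe side. A quick sanity computation in plain Java (our names, not the test's):

    public class ExpectedCountSketch {

        public static void main(String[] args) {
            // testHashWithBuildSideOuterJoin1: build keys 0..39999, probe keys 0..19999
            int numKeys = 20000;
            int matched = numKeys;    // each probe key hits exactly one build pair
            int unmatched = numKeys;  // build keys 20000..39999 are joined with null
            System.out.println(matched + unmatched); // 40000 == 2 * NUM_KEYS

            // testHashWithBuildSideOuterJoin2: both sides cover keys 0..39999,
            // two build values per key, so every build record is matched once
            int numKeys2 = 40000;
            int buildValsPerKey = 2;
            System.out.println(numKeys2 * buildValsPerKey); // 80000
        }
    }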
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java index aff78135e6672..cc5c4726f58bb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingHashJoinIteratorITCase.java @@ -147,7 +147,7 @@ public void testBuildFirst() { new NonReusingBuildFirstHashJoinIterator<>( input1, input2, this.recordSerializer, this.record1Comparator, this.recordSerializer, this.record2Comparator, this.recordPairComparator, - this.memoryManager, ioManager, this.parentTask, 1.0, false, true); + this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true); iterator.open(); @@ -234,7 +234,7 @@ public void testBuildFirstWithHighNumberOfCommonKeys() new NonReusingBuildFirstHashJoinIterator<>( input1, input2, this.recordSerializer, this.record1Comparator, this.recordSerializer, this.record2Comparator, this.recordPairComparator, - this.memoryManager, ioManager, this.parentTask, 1.0, false, true); + this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true); iterator.open(); @@ -283,7 +283,7 @@ public void testBuildSecond() { new NonReusingBuildSecondHashJoinIterator<>( input1, input2, this.recordSerializer, this.record1Comparator, this.recordSerializer, this.record2Comparator, this.recordPairComparator, - this.memoryManager, ioManager, this.parentTask, 1.0, false, true); + this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true); iterator.open(); @@ -370,7 +370,7 @@ public void testBuildSecondWithHighNumberOfCommonKeys() new NonReusingBuildSecondHashJoinIterator<>( input1, input2, this.recordSerializer, this.record1Comparator, this.recordSerializer, this.record2Comparator, this.recordPairComparator, - this.memoryManager, ioManager, this.parentTask, 1.0, false, true); + this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true); iterator.open(); @@ -417,7 +417,7 @@ public void testBuildFirstWithMixedDataTypes() { new NonReusingBuildSecondHashJoinIterator<>( input1, input2, this.pairSerializer, this.pairComparator, this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator, - this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true); + this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true); iterator.open(); @@ -464,7 +464,7 @@ public void testBuildSecondWithMixedDataTypes() { new NonReusingBuildFirstHashJoinIterator<>( input1, input2, this.pairSerializer, this.pairComparator, this.recordSerializer, this.record2Comparator, this.recordPairPairComparator, - this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true); + this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true); iterator.open(); @@ -484,43 +484,43 @@ public void testBuildSecondWithMixedDataTypes() { Assert.fail("An exception occurred during the test: " + e.getMessage()); } } - + @Test - public void testBuildFirstJoinWithEmptyBuild() { + public void testBuildFirstAndProbeSideOuterJoin() { try { TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); TupleGenerator generator2 = new TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); - + final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); 
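All of these iterator ITCases verify results the same way: the expected matches are collected into a map from key to the still-expected pairs, the TupleMatchRemovingJoin matcher removes each pair the join actually emits, and the test finally asserts that every remaining collection is empty. A simplified, self-contained sketch of that pattern (string-encoded pairs instead of Tuple2 records; class and method names are ours):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.HashMap;
    import java.util.Map;

    public class RemovalCheckSketch {

        public static void main(String[] args) {
            Map<Integer, Collection<String>> expected = new HashMap<>();
            expected.put(1, new ArrayList<>(Arrays.asList("a+x", "null+y")));

            // every pair the join emits is removed from the expectation...
            emit(expected, 1, "a+x");
            emit(expected, 1, "null+y");

            // ...so empty collections afterwards prove exactly the expected pairs appeared
            expected.values().removeIf(Collection::isEmpty);
            System.out.println(expected.isEmpty() ? "OK" : "missing matches: " + expected);
        }

        static void emit(Map<Integer, Collection<String>> expected, int key, String pair) {
            Collection<String> values = expected.get(key);
            if (values == null || !values.remove(pair)) {
                throw new AssertionError("unexpected join result " + pair + " for key " + key);
            }
        }
    }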
- + // collect expected data final Map> expectedMatchesMap = rightOuterJoinTuples( collectTupleData(input1), collectTupleData(input2)); - + final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap); final Collector> collector = new DiscardingOutputCollector<>(); - + // reset the generators generator1.reset(); generator2.reset(); input1.reset(); input2.reset(); - + // compare with iterator values NonReusingBuildFirstHashJoinIterator, Tuple2, Tuple2> iterator = new NonReusingBuildFirstHashJoinIterator<>( input1, input2, this.recordSerializer, this.record1Comparator, this.recordSerializer, this.record2Comparator, this.recordPairComparator, - this.memoryManager, ioManager, this.parentTask, 1.0, true, false); - + this.memoryManager, ioManager, this.parentTask, 1.0, true, false, false); + iterator.open(); - + while (iterator.callWithNextKey(matcher, collector)); - + iterator.close(); - + // assert that each expected match was seen for (Entry> entry : expectedMatchesMap.entrySet()) { if (!entry.getValue().isEmpty()) { @@ -533,43 +533,190 @@ public void testBuildFirstJoinWithEmptyBuild() { Assert.fail("An exception occurred during the test: " + e.getMessage()); } } - + + @Test + public void testBuildFirstAndBuildSideOuterJoin() { + try { + TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + TupleGenerator generator2 = new TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + + final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); + + // collect expected data + final Map> expectedMatchesMap = leftOuterJoinTuples( + collectTupleData(input1), + collectTupleData(input2)); + + final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap); + final Collector> collector = new DiscardingOutputCollector<>(); + + // reset the generators + generator1.reset(); + generator2.reset(); + input1.reset(); + input2.reset(); + + // compare with iterator values + NonReusingBuildFirstHashJoinIterator, Tuple2, Tuple2> iterator = + new NonReusingBuildFirstHashJoinIterator<>( + input1, input2, this.recordSerializer, this.record1Comparator, + this.recordSerializer, this.record2Comparator, this.recordPairComparator, + this.memoryManager, ioManager, this.parentTask, 1.0, false, true, false); + + iterator.open(); + + while (iterator.callWithNextKey(matcher, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + @Test - public void testBuildSecondJoinWithEmptyBuild() { + public void testBuildFirstAndFullOuterJoin() { + try { + TupleGenerator generator1 = new TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + TupleGenerator generator2 = new TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + + final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); + + // collect expected data + final Map> 
expectedMatchesMap = fullOuterJoinTuples( + collectTupleData(input1), + collectTupleData(input2)); + + final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap); + final Collector> collector = new DiscardingOutputCollector<>(); + + // reset the generators + generator1.reset(); + generator2.reset(); + input1.reset(); + input2.reset(); + + // compare with iterator values + NonReusingBuildFirstHashJoinIterator, Tuple2, Tuple2> iterator = + new NonReusingBuildFirstHashJoinIterator<>( + input1, input2, this.recordSerializer, this.record1Comparator, + this.recordSerializer, this.record2Comparator, this.recordPairComparator, + this.memoryManager, ioManager, this.parentTask, 1.0, true, true, false); + + iterator.open(); + + while (iterator.callWithNextKey(matcher, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + @Test + public void testBuildSecondAndProbeSideOuterJoin() { try { TupleGenerator generator1 = new TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); - + final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); - + // collect expected data final Map> expectedMatchesMap = leftOuterJoinTuples( collectTupleData(input1), collectTupleData(input2)); - + final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap); final Collector> collector = new DiscardingOutputCollector<>(); - + // reset the generators generator1.reset(); generator2.reset(); input1.reset(); input2.reset(); - + // compare with iterator values NonReusingBuildSecondHashJoinIterator, Tuple2, Tuple2> iterator = new NonReusingBuildSecondHashJoinIterator<>( input1, input2, this.recordSerializer, this.record1Comparator, this.recordSerializer, this.record2Comparator, this.recordPairComparator, - this.memoryManager, ioManager, this.parentTask, 1.0, true, false); - + this.memoryManager, ioManager, this.parentTask, 1.0, true, false, false); + iterator.open(); - + while (iterator.callWithNextKey(matcher, collector)); - + iterator.close(); - + + // assert that each expected match was seen + for (Entry> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + + @Test + public void testBuildSecondAndBuildSideOuterJoin() { + try { + TupleGenerator generator1 = new TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + + final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); + + // collect expected data + final Map> expectedMatchesMap = 
rightOuterJoinTuples( + collectTupleData(input1), + collectTupleData(input2)); + + final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap); + final Collector> collector = new DiscardingOutputCollector<>(); + + // reset the generators + generator1.reset(); + generator2.reset(); + input1.reset(); + input2.reset(); + + // compare with iterator values + NonReusingBuildSecondHashJoinIterator, Tuple2, Tuple2> iterator = + new NonReusingBuildSecondHashJoinIterator<>( + input1, input2, this.recordSerializer, this.record1Comparator, + this.recordSerializer, this.record2Comparator, this.recordPairComparator, + this.memoryManager, ioManager, this.parentTask, 1.0, false, true, false); + + iterator.open(); + + while (iterator.callWithNextKey(matcher, collector)); + + iterator.close(); + // assert that each expected match was seen for (Entry> entry : expectedMatchesMap.entrySet()) { if (!entry.getValue().isEmpty()) { @@ -583,6 +730,55 @@ public void testBuildSecondJoinWithEmptyBuild() { } } + @Test + public void testBuildSecondAndFullOuterJoin() { + try { + TupleGenerator generator1 = new TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + TupleGenerator generator2 = new TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH); + + final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE); + final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE); + + // collect expected data + final Map> expectedMatchesMap = fullOuterJoinTuples( + collectTupleData(input1), + collectTupleData(input2)); + + final FlatJoinFunction matcher = new TupleMatchRemovingJoin(expectedMatchesMap); + final Collector> collector = new DiscardingOutputCollector<>(); + + // reset the generators + generator1.reset(); + generator2.reset(); + input1.reset(); + input2.reset(); + + // compare with iterator values + NonReusingBuildSecondHashJoinIterator, Tuple2, Tuple2> iterator = + new NonReusingBuildSecondHashJoinIterator<>( + input1, input2, this.recordSerializer, this.record1Comparator, + this.recordSerializer, this.record2Comparator, this.recordPairComparator, + this.memoryManager, ioManager, this.parentTask, 1.0, true, true, false); + + iterator.open(); + + while (iterator.callWithNextKey(matcher, collector)); + + iterator.close(); + + // assert that each expected match was seen + for (Entry> entry : expectedMatchesMap.entrySet()) { + if (!entry.getValue().isEmpty()) { + Assert.fail("Collection for key " + entry.getKey() + " is not empty"); + } + } + } + catch (Exception e) { + e.printStackTrace(); + Assert.fail("An exception occurred during the test: " + e.getMessage()); + } + } + // -------------------------------------------------------------------------------------------- // Utilities // -------------------------------------------------------------------------------------------- @@ -680,6 +876,53 @@ public static Map> rightOuterJoinTuples( return map; } + + public static Map> fullOuterJoinTuples( + Map> leftMap, + Map> rightMap) + { + Map> map = new HashMap<>(); + + for (Integer key : rightMap.keySet()) { + Collection leftValues = leftMap.get(key); + Collection rightValues = rightMap.get(key); + + if (!map.containsKey(key)) { + map.put(key, new ArrayList()); + } + + Collection matchedValues = map.get(key); + + for (String rightValue : rightValues) { + if(leftValues != null) { + for (String leftValue : leftValues) { + matchedValues.add(new TupleMatch(leftValue, 
rightValue)); + } + } + else { + matchedValues.add(new TupleMatch(null, rightValue)); + } + } + } + + for (Integer key : leftMap.keySet()) { + Collection leftValues = leftMap.get(key); + Collection rightValues = rightMap.get(key); + if (rightValues == null) { + if (!map.containsKey(key)) { + map.put(key, new ArrayList()); + } + + Collection matchedValues = map.get(key); + + for (String leftValue : leftValues) { + matchedValues.add(new TupleMatch(leftValue, null)); + } + } + } + + return map; + } public static Map> joinIntPairs( Map> leftMap, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java index 814d0ed13724c..576cbd4f53372 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/NonReusingReOpenableHashTableITCase.java @@ -29,7 +29,6 @@ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.memory.MemoryAllocationException; import org.apache.flink.runtime.memory.MemoryManager; -import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator; import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch; import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin; import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector; @@ -228,7 +227,7 @@ private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGe new NonReusingBuildFirstReOpenableHashJoinIterator<>( buildInput, probeInput, this.recordSerializer, this.record1Comparator, this.recordSerializer, this.record2Comparator, this.recordPairComparator, - this.memoryManager, ioManager, this.parentTask, 1.0, false, true); + this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true); iterator.open(); // do first join with both inputs @@ -342,8 +341,8 @@ public void testSpillingHashJoinWithMassiveCollisions() throws IOException final Tuple2 probeRec = join.getCurrentProbeRecord(); Integer key = probeRec.f0; - - HashBucketIterator, Tuple2> buildSide = join.getBuildSideIterator(); + + MutableObjectIterator> buildSide = join.getBuildSideIterator(); if ((record = buildSide.next(recordReuse)) != null) { numBuildValues = 1; Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); @@ -456,8 +455,8 @@ public void testSpillingHashJoinWithTwoRecursions() throws IOException final Tuple2 probeRec = join.getCurrentProbeRecord(); Integer key = probeRec.f0; - - HashBucketIterator, Tuple2> buildSide = join.getBuildSideIterator(); + + MutableObjectIterator> buildSide = join.getBuildSideIterator(); if ((record = buildSide.next(recordReuse)) != null) { numBuildValues = 1; Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java index af814c3b17332..1d846ed8edfc0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingHashJoinIteratorITCase.java @@ 
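The fullOuterJoinTuples utility above builds the expectation in two passes: every right-side value is paired with all left-side values of its key (or with null when the key is absent on the left), and then every key that exists only on the left is paired with null on the right. A compact stand-alone version with tiny inputs, illustrative only and simplified to List values:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class FullOuterExpectationSketch {

        public static void main(String[] args) {
            Map<Integer, List<String>> left = new HashMap<>();
            left.put(1, Arrays.asList("a"));
            left.put(2, Arrays.asList("b"));

            Map<Integer, List<String>> right = new HashMap<>();
            right.put(2, Arrays.asList("x"));
            right.put(3, Arrays.asList("y"));

            Map<Integer, List<String>> result = new HashMap<>();

            // pass 1: each right value joins all left values of its key, or null
            for (Map.Entry<Integer, List<String>> e : right.entrySet()) {
                List<String> leftValues = left.get(e.getKey());
                List<String> out = result.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
                for (String r : e.getValue()) {
                    if (leftValues == null) {
                        out.add("(null," + r + ")");
                    } else {
                        for (String l : leftValues) {
                            out.add("(" + l + "," + r + ")");
                        }
                    }
                }
            }

            // pass 2: keys that exist only on the left join with null on the right
            for (Map.Entry<Integer, List<String>> e : left.entrySet()) {
                if (!right.containsKey(e.getKey())) {
                    List<String> out = result.computeIfAbsent(e.getKey(), k -> new ArrayList<>());
                    for (String l : e.getValue()) {
                        out.add("(" + l + ",null)");
                    }
                }
            }

            // contains (a,null) for key 1, (b,x) for key 2, (null,y) for key 3
            System.out.println(result);
        }
    }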
-55,6 +55,7 @@
 import org.junit.Before;
 import org.junit.Test;
 
+import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.fullOuterJoinTuples;
 import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.joinTuples;
 import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.leftOuterJoinTuples;
 import static org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.rightOuterJoinTuples;
@@ -155,7 +156,7 @@ public void testBuildFirst() {
 			new ReusingBuildFirstHashJoinIterator<>(
 				input1, input2, this.recordSerializer, this.record1Comparator,
 				this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-				this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+				this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 
 		iterator.open();
@@ -242,7 +243,7 @@ public void testBuildFirstWithHighNumberOfCommonKeys()
 			new ReusingBuildFirstHashJoinIterator<>(
 				input1, input2, this.recordSerializer, this.record1Comparator,
 				this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-				this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+				this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 
 		iterator.open();
@@ -291,7 +292,7 @@ public void testBuildSecond() {
 			new ReusingBuildSecondHashJoinIterator<>(
 				input1, input2, this.recordSerializer, this.record1Comparator,
 				this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-				this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+				this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 
 		iterator.open();
@@ -378,7 +379,7 @@ public void testBuildSecondWithHighNumberOfCommonKeys()
 			new ReusingBuildSecondHashJoinIterator<>(
 				input1, input2, this.recordSerializer, this.record1Comparator,
 				this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-				this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+				this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 
 		iterator.open();
@@ -425,7 +426,7 @@ public void testBuildFirstWithMixedDataTypes() {
 			new ReusingBuildSecondHashJoinIterator<>(
 				input1, input2, this.pairSerializer, this.pairComparator,
 				this.recordSerializer, this.record2Comparator, this.pairRecordPairComparator,
-				this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
+				this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true);
 
 		iterator.open();
@@ -472,7 +473,7 @@ public void testBuildSecondWithMixedDataTypes() {
 			new ReusingBuildFirstHashJoinIterator<>(
 				input1, input2, this.pairSerializer, this.pairComparator,
 				this.recordSerializer, this.record2Comparator, this.recordPairPairComparator,
-				this.memoryManager, this.ioManager, this.parentTask, 1.0, false, true);
+				this.memoryManager, this.ioManager, this.parentTask, 1.0, false, false, true);
 
 		iterator.open();
@@ -492,9 +493,9 @@ public void testBuildSecondWithMixedDataTypes() {
 			Assert.fail("An exception occurred during the test: " + e.getMessage());
 		}
 	}
-
+
 	@Test
-	public void testBuildFirstJoinWithEmptyBuild() {
+	public void testBuildFirstAndProbeSideOuterJoin() {
 		try {
 			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
@@ -506,7 +507,7 @@ public void testBuildFirstJoinWithEmptyBuild() {
 			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = rightOuterJoinTuples(
 					collectTupleData(input1),
 					collectTupleData(input2));
-
+
 			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
 			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
@@ -515,20 +516,69 @@ public void testBuildFirstJoinWithEmptyBuild() {
 			generator2.reset();
 			input1.reset();
 			input2.reset();
-
+
 			// compare with iterator values
 			ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
 				new ReusingBuildFirstHashJoinIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator,
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
-
+					this.memoryManager, ioManager, this.parentTask, 1.0, true, false, false);
+
 			iterator.open();
-
+
 			while (iterator.callWithNextKey(matcher, collector));
-
+
 			iterator.close();
-
+
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testBuildFirstAndBuildSideOuterJoin() {
+		try {
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+			// collect expected data
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = leftOuterJoinTuples(
+					collectTupleData(input1),
+					collectTupleData(input2));
+
+			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+
+			// compare with iterator values
+			ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildFirstHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, true, false);
+
+			iterator.open();
+
+			while (iterator.callWithNextKey(matcher, collector));
+
+			iterator.close();
+
 			// assert that each expected match was seen
 			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
@@ -541,43 +591,190 @@ public void testBuildFirstJoinWithEmptyBuild() {
 			Assert.fail("An exception occurred during the test: " + e.getMessage());
 		}
 	}
+
+	@Test
+	public void testBuildFirstAndFullOuterJoin() {
+		try {
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 500, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 1000, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+			// collect expected data
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = fullOuterJoinTuples(
+					collectTupleData(input1),
+					collectTupleData(input2));
+
+			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+
+			// compare with iterator values
+			ReusingBuildFirstHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildFirstHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, true, true, false);
+
+			iterator.open();
+
+			while (iterator.callWithNextKey(matcher, collector));
+
+			iterator.close();
+
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
 	@Test
-	public void testBuildSecondJoinWithEmptyBuild() {
+	public void testBuildSecondAndProbeSideOuterJoin() {
 		try {
 			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
 			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
-
+
 			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
 			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
-
+
 			// collect expected data
 			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = leftOuterJoinTuples(
 					collectTupleData(input1),
 					collectTupleData(input2));
-
+
 			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
 			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
-
+
 			// reset the generators
 			generator1.reset();
 			generator2.reset();
 			input1.reset();
 			input2.reset();
-
+
 			// compare with iterator values
 			ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
 				new ReusingBuildSecondHashJoinIterator<>(
 					input1, input2, this.recordSerializer, this.record1Comparator,
 					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-					this.memoryManager, ioManager, this.parentTask, 1.0, true, false);
-
+					this.memoryManager, ioManager, this.parentTask, 1.0, true, false, false);
+
 			iterator.open();
-
+
 			while (iterator.callWithNextKey(matcher, collector));
-
+
 			iterator.close();
-
+
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testBuildSecondAndBuildSideOuterJoin() {
+		try {
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+			// collect expected data
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = rightOuterJoinTuples(
+					collectTupleData(input1),
+					collectTupleData(input2));
+
+			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+
+			// compare with iterator values
+			ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildSecondHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, false, true, false);
+
+			iterator.open();
+
+			while (iterator.callWithNextKey(matcher, collector));
+
+			iterator.close();
+
+			// assert that each expected match was seen
+			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
+				if (!entry.getValue().isEmpty()) {
+					Assert.fail("Collection for key " + entry.getKey() + " is not empty");
+				}
+			}
+		}
+		catch (Exception e) {
+			e.printStackTrace();
+			Assert.fail("An exception occurred during the test: " + e.getMessage());
+		}
+	}
+
+	@Test
+	public void testBuildSecondAndFullOuterJoin() {
+		try {
+			TestData.TupleGenerator generator1 = new TestData.TupleGenerator(SEED1, 1000, 4096, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+			TestData.TupleGenerator generator2 = new TestData.TupleGenerator(SEED2, 500, 2048, KeyMode.RANDOM, ValueMode.RANDOM_LENGTH);
+
+			final TestData.TupleGeneratorIterator input1 = new TestData.TupleGeneratorIterator(generator1, INPUT_1_SIZE);
+			final TestData.TupleGeneratorIterator input2 = new TestData.TupleGeneratorIterator(generator2, INPUT_2_SIZE);
+
+			// collect expected data
+			final Map<Integer, Collection<TupleMatch>> expectedMatchesMap = fullOuterJoinTuples(
+					collectTupleData(input1),
+					collectTupleData(input2));
+
+			final FlatJoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> matcher = new TupleMatchRemovingJoin(expectedMatchesMap);
+			final Collector<Tuple2<Integer, String>> collector = new DiscardingOutputCollector<>();
+
+			// reset the generators
+			generator1.reset();
+			generator2.reset();
+			input1.reset();
+			input2.reset();
+
+			// compare with iterator values
+			ReusingBuildSecondHashJoinIterator<Tuple2<Integer, String>, Tuple2<Integer, String>, Tuple2<Integer, String>> iterator =
+				new ReusingBuildSecondHashJoinIterator<>(
+					input1, input2, this.recordSerializer, this.record1Comparator,
+					this.recordSerializer, this.record2Comparator, this.recordPairComparator,
+					this.memoryManager, ioManager, this.parentTask, 1.0, true, true, false);
+
+			iterator.open();
+
+			while (iterator.callWithNextKey(matcher, collector));
+
+			iterator.close();
+
 			// assert that each expected match was seen
 			for (Entry<Integer, Collection<TupleMatch>> entry : expectedMatchesMap.entrySet()) {
 				if (!entry.getValue().isEmpty()) {
@@ -590,7 +787,6 @@ public void testBuildSecondJoinWithEmptyBuild() {
 			Assert.fail("An exception occurred during the test: " + e.getMessage());
 		}
 	}
-
 	// --------------------------------------------------------------------------------------------
 	//                                           Utilities
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
index d2229477a6af0..6afde162d71e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/hash/ReusingReOpenableHashTableITCase.java
@@ -44,7 +44,6 @@ import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatchRemovingJoin;
 import org.apache.flink.runtime.operators.hash.NonReusingHashJoinIteratorITCase.TupleMatch;
-import org.apache.flink.runtime.operators.hash.MutableHashTable.HashBucketIterator;
 import org.apache.flink.runtime.operators.testutils.DiscardingOutputCollector;
 import org.apache.flink.runtime.operators.testutils.DummyInvokable;
 import org.apache.flink.runtime.operators.testutils.TestData;
@@ -230,7 +229,7 @@ private void doTest(TestData.TupleGeneratorIterator buildInput, TestData.TupleGe
 			new ReusingBuildFirstReOpenableHashJoinIterator<>(
 				buildInput, probeInput, this.recordSerializer, this.record1Comparator,
 				this.recordSerializer, this.record2Comparator, this.recordPairComparator,
-				this.memoryManager, ioManager, this.parentTask, 1.0, false, true);
+				this.memoryManager, ioManager, this.parentTask, 1.0, false, false, true);
 
 		iterator.open();
 		// do first join with both inputs
@@ -341,7 +340,7 @@ public void testSpillingHashJoinWithMassiveCollisions() throws IOException {
 				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
 				Integer key = probeRec.f0;
 
-				HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
+				MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
 					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
@@ -456,7 +455,7 @@ public void testSpillingHashJoinWithTwoRecursions() throws IOException
 				final Tuple2<Integer, Integer> probeRec = join.getCurrentProbeRecord();
 				Integer key = probeRec.f0;
 
-				HashBucketIterator<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
+				MutableObjectIterator<Tuple2<Integer, Integer>> buildSide = join.getBuildSideIterator();
 				if ((record = buildSide.next(recordReuse)) != null) {
 					numBuildValues = 1;
 					Assert.assertEquals("Probe-side key was different than build-side key.", key, record.f0);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
new file mode 100644
index 0000000000000..ec8ae2b4cf861
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/BitSetTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.runtime.operators.util;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class BitSetTest {
+
+	private BitSet bitSet;
+	int byteSize = 1024;
+	MemorySegment memorySegment = MemorySegmentFactory.allocateUnpooledSegment(byteSize);
+
+	@Before
+	public void init() {
+		bitSet = new BitSet(byteSize);
+		bitSet.setMemorySegment(memorySegment, 0);
+		bitSet.clear();
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void verifyBitSetSize1() {
+		bitSet.setMemorySegment(memorySegment, 1);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void verifyBitSetSize2() {
+		bitSet.setMemorySegment(null, 1);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void verifyBitSetSize3() {
+		bitSet.setMemorySegment(memorySegment, -1);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void verifyInputIndex1() {
+		bitSet.set(8 * byteSize + 1);
+	}
+
+	@Test(expected = IllegalArgumentException.class)
+	public void verifyInputIndex2() {
+		bitSet.set(-1);
+	}
+
+	@Test
+	public void testSetValues() {
+		int bitSize = bitSet.bitSize();
+		assertEquals(bitSize, 8 * 1024);
+		for (int i = 0; i < bitSize; i++) {
+			assertFalse(bitSet.get(i));
+			if (i % 2 == 0) {
+				bitSet.set(i);
+			}
+		}
+
+		for (int i = 0; i < bitSize; i++) {
+			if (i % 2 == 0) {
+				assertTrue(bitSet.get(i));
+			} else {
+				assertFalse(bitSet.get(i));
+			}
+		}
+	}
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
index 4f18494fbf8c5..6205de4d381a7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/util/HashVsSortMiniBenchmark.java
@@ -130,11 +130,11 @@ public void testSortBothMerge() {
 			final UnilateralSortMerger<Tuple2<Integer, String>> sorter1 = new UnilateralSortMerger<>(
 					this.memoryManager, this.ioManager, input1, this.parentTask, this.serializer1,
-					this.comparator1.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
+					this.comparator1.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, true);
 			final UnilateralSortMerger<Tuple2<Integer, String>> sorter2 = new UnilateralSortMerger<>(
 					this.memoryManager, this.ioManager, input2, this.parentTask, this.serializer2,
-					this.comparator2.duplicate(), MEMORY_FOR_SORTER, 128, 0.8f, true);
+					this.comparator2.duplicate(), (double)MEMORY_FOR_SORTER/MEMORY_SIZE, 128, 0.8f, true);
 
 			final MutableObjectIterator<Tuple2<Integer, String>> sortedInput1 = sorter1.getIterator();
 			final MutableObjectIterator<Tuple2<Integer, String>> sortedInput2 = sorter2.getIterator();
@@ -184,7 +184,7 @@ public void testBuildFirst() {
 				new ReusingBuildFirstHashJoinIterator<>(
 					input1, input2, this.serializer1.getSerializer(), this.comparator1,
 					this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
-					this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
+					this.memoryManager, this.ioManager, this.parentTask, 1, false, false, true);
 
 			iterator.open();
@@ -223,7 +223,7 @@ public void testBuildSecond() {
 				new ReusingBuildSecondHashJoinIterator<>(
 					input1, input2, this.serializer1.getSerializer(), this.comparator1,
 					this.serializer2.getSerializer(), this.comparator2, this.pairComparator11,
-					this.memoryManager, this.ioManager, this.parentTask, MEMORY_SIZE, false, true);
+					this.memoryManager, this.ioManager, this.parentTask, 1, false, false, true);
 
 			iterator.open();
diff --git a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
index c2dca6655d2b5..5215a36623169 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/javaApiOperators/OuterJoinITCase.java
@@ -18,6 +18,7 @@
 package org.apache.flink.test.javaApiOperators;
 
+import org.apache.flink.api.common.InvalidProgramException;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
@@ -30,6 +31,7 @@
 import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.api.java.tuple.Tuple7;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.optimizer.CompilerException;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.CustomType;
 import org.apache.flink.test.javaApiOperators.util.CollectionDataSets.POJO;
@@ -57,14 +59,24 @@ public void testLeftOuterJoin1() throws Exception {
 
 	@Test
 	public void testLeftOuterJoin2() throws Exception {
-		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
+		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST);
 	}
 
 	@Test
 	public void testLeftOuterJoin3() throws Exception {
+		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
+	}
+
+	@Test
+	public void testLeftOuterJoin4() throws Exception {
 		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
 	}
 
+	@Test (expected = InvalidProgramException.class)
+	public void testLeftOuterJoin5() throws Exception {
+		testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
+	}
+
 	private void testLeftOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
 		/*
 		 * UDF Join on tuples with key field positions
@@ -102,9 +114,19 @@ public void testRightOuterJoin2() throws Exception {
 
 	@Test
 	public void testRightOuterJoin3() throws Exception {
+		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
+	}
+
+	@Test
+	public void testRightOuterJoin4() throws Exception {
 		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
 	}
 
+	@Test (expected = InvalidProgramException.class)
+	public void testRightOuterJoin5() throws Exception {
+		testRightOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
+	}
+
 	private void testRightOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
 		/*
 		 * UDF Join on tuples with key field positions
@@ -135,6 +157,26 @@ public void testFullOuterJoin1() throws Exception {
 		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_SORT_MERGE);
 	}
 
+	@Test
+	public void testFullOuterJoin2() throws Exception {
+		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_FIRST);
+	}
+
+	@Test
+	public void testFullOuterJoin3() throws Exception {
+		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.REPARTITION_HASH_SECOND);
+	}
+
+	@Test (expected = InvalidProgramException.class)
+	public void testFullOuterJoin4() throws Exception {
+		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_FIRST);
+	}
+
+	@Test (expected = InvalidProgramException.class)
+	public void testFullOuterJoin5() throws Exception {
+		testFullOuterJoinOnTuplesWithKeyPositions(JoinHint.BROADCAST_HASH_SECOND);
+	}
+
 	private void testFullOuterJoinOnTuplesWithKeyPositions(JoinHint hint) throws Exception {
 		/*
 		 * UDF Join on tuples with key field positions
diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
index cb20e72e42b34..06921967b62b5 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/manual/HashTableRecordWidthCombinations.java
@@ -150,7 +150,7 @@ public Long next() {
 
 			try {
 				while (table.nextRecord()) {
-					MutableHashTable.HashBucketIterator<Tuple2<Long, byte[]>, Long> matches = table.getBuildSideIterator();
+					MutableObjectIterator<Tuple2<Long, byte[]>> matches = table.getBuildSideIterator();
 					while (matches.next() != null);
 				}
 			}
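
Note (illustration only, not part of the patch): the sketch below shows the user-facing effect of this change at the DataSet API level, namely that a hash-based strategy may now build its table on an outer side. The class name and sample data are invented for the example, and the JoinHint import path is assumed to match the DataSet API of this era.

    import org.apache.flink.api.common.functions.JoinFunction;
    import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.api.java.tuple.Tuple2;

    // Hypothetical example class; not included in this patch.
    public class OuterJoinHintExample {

        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

            DataSet<Tuple2<Integer, String>> left = env.fromElements(
                    new Tuple2<>(1, "a"), new Tuple2<>(2, "b"));
            DataSet<Tuple2<Integer, String>> right = env.fromElements(
                    new Tuple2<>(2, "x"), new Tuple2<>(3, "y"));

            // REPARTITION_HASH_FIRST builds the hash table on the first input,
            // which is also the outer side of a left outer join. Before this
            // patch, leftOuterJoin rejected this hint with an InvalidProgramException.
            left.leftOuterJoin(right, JoinHint.REPARTITION_HASH_FIRST)
                .where(0)
                .equalTo(0)
                .with(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String>() {
                    @Override
                    public String join(Tuple2<Integer, String> l, Tuple2<Integer, String> r) {
                        // r is null for left records without a matching right record
                        return l.f1 + "," + (r == null ? "null" : r.f1);
                    }
                })
                .print();
        }
    }

The broadcast hints remain invalid whenever the broadcast input is an outer side, because every parallel instance would see a copy of that input and emit its unmatched records redundantly; the InvalidProgramException tests above pin that behavior down.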