Skip to content

Commit

Permalink
[FLINK-2871] support outer join for hash on build side.
Browse files Browse the repository at this point in the history
this commit support full outer join includes:
    1. left outer join with REPARTITION_HASH_FIRST.
    2. right outer join with REPARTITION_HASH_SECOND.
    3. fullouter join with REPARTITION_HASH_FIRST and REPARTITION_HASH_SECOND.

this close #1469
  • Loading branch information
chengxiang li committed Jan 27, 2016
1 parent a7d4334 commit eaf540f
Show file tree
Hide file tree
Showing 37 changed files with 1,754 additions and 253 deletions.
Expand Up @@ -841,6 +841,7 @@ public <R> JoinOperatorSetsBase<T, R> leftOuterJoin(DataSet<R> other, JoinHint s
switch(strategy) {
case OPTIMIZER_CHOOSES:
case REPARTITION_SORT_MERGE:
case REPARTITION_HASH_FIRST:
case REPARTITION_HASH_SECOND:
case BROADCAST_HASH_SECOND:
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER);
Expand Down Expand Up @@ -891,6 +892,7 @@ public <R> JoinOperatorSetsBase<T, R> rightOuterJoin(DataSet<R> other, JoinHint
case OPTIMIZER_CHOOSES:
case REPARTITION_SORT_MERGE:
case REPARTITION_HASH_FIRST:
case REPARTITION_HASH_SECOND:
case BROADCAST_HASH_FIRST:
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER);
default:
Expand Down Expand Up @@ -938,6 +940,8 @@ public <R> JoinOperatorSetsBase<T, R> fullOuterJoin(DataSet<R> other, JoinHint s
switch(strategy) {
case OPTIMIZER_CHOOSES:
case REPARTITION_SORT_MERGE:
case REPARTITION_HASH_FIRST:
case REPARTITION_HASH_SECOND:
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER);
default:
throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: "+strategy);
Expand Down
Expand Up @@ -181,7 +181,7 @@ public void testFullOuterStrategy2() {
this.testFullOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
}

@Test(expected = InvalidProgramException.class)
@Test
public void testFullOuterStrategy3() {
this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
}
Expand All @@ -191,7 +191,7 @@ public void testFullOuterStrategy4() {
this.testFullOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
}

@Test(expected = InvalidProgramException.class)
@Test
public void testFullOuterStrategy5() {
this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
}
Expand Down
Expand Up @@ -192,7 +192,7 @@ public void testLeftOuterStrategy4() {
this.testLeftOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
}

@Test(expected = InvalidProgramException.class)
@Test
public void testLeftOuterStrategy5() {
this.testLeftOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
}
Expand Down
Expand Up @@ -181,7 +181,7 @@ public void testRightOuterStrategy2() {
this.testRightOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
}

@Test(expected = InvalidProgramException.class)
@Test
public void testRightOuterStrategy3() {
this.testRightOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
}
Expand Down
Expand Up @@ -202,10 +202,14 @@ public void costOperator(PlanNode n) {
break;
case HYBRIDHASH_BUILD_FIRST:
case RIGHT_HYBRIDHASH_BUILD_FIRST:
case LEFT_HYBRIDHASH_BUILD_FIRST:
case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_SECOND:
case LEFT_HYBRIDHASH_BUILD_SECOND:
case RIGHT_HYBRIDHASH_BUILD_SECOND:
case FULL_OUTER_HYBRIDHASH_BUILD_SECOND:
addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_FIRST_CACHED:
Expand Down
Expand Up @@ -25,8 +25,12 @@
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
import org.apache.flink.optimizer.operators.HashFullOuterJoinBuildFirstDescriptor;
import org.apache.flink.optimizer.operators.HashFullOuterJoinBuildSecondDescriptor;
import org.apache.flink.optimizer.operators.HashLeftOuterJoinBuildFirstDescriptor;
import org.apache.flink.optimizer.operators.HashLeftOuterJoinBuildSecondDescriptor;
import org.apache.flink.optimizer.operators.HashRightOuterJoinBuildFirstDescriptor;
import org.apache.flink.optimizer.operators.HashRightOuterJoinBuildSecondDescriptor;
import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
import org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor;
import org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor;
Expand Down Expand Up @@ -99,8 +103,10 @@ private List<OperatorDescriptorDual> createLeftOuterJoinDescriptors(JoinHint hin
case BROADCAST_HASH_SECOND:
list.add(new HashLeftOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, true, false));
break;
case BROADCAST_HASH_FIRST:
case REPARTITION_HASH_FIRST:
list.add(new HashLeftOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, false, true));
break;
case BROADCAST_HASH_FIRST:
default:
throw new CompilerException("Invalid join hint: " + hint + " for left outer join");
}
Expand All @@ -124,8 +130,10 @@ private List<OperatorDescriptorDual> createRightOuterJoinDescriptors(JoinHint hi
case BROADCAST_HASH_FIRST:
list.add(new HashRightOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, true, false));
break;
case BROADCAST_HASH_SECOND:
case REPARTITION_HASH_SECOND:
list.add(new HashRightOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, false, true));
break;
case BROADCAST_HASH_SECOND:
default:
throw new CompilerException("Invalid join hint: " + hint + " for right outer join");
}
Expand All @@ -142,10 +150,14 @@ private List<OperatorDescriptorDual> createFullOuterJoinDescriptors(JoinHint hin
case REPARTITION_SORT_MERGE:
list.add(new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2));
break;
case REPARTITION_HASH_FIRST:
list.add(new HashFullOuterJoinBuildFirstDescriptor(this.keys1, this.keys2));
break;
case REPARTITION_HASH_SECOND:
case BROADCAST_HASH_SECOND:
list.add(new HashFullOuterJoinBuildSecondDescriptor(this.keys1, this.keys2));
break;
case BROADCAST_HASH_FIRST:
case REPARTITION_HASH_FIRST:
case BROADCAST_HASH_SECOND:
default:
throw new CompilerException("Invalid join hint: " + hint + " for full outer join");
}
Expand Down
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.operators;

import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;

import java.util.Collections;
import java.util.List;

public class HashFullOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor {

public HashFullOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2) {
super(keys1, keys2, false, false, true);
}

@Override
public DriverStrategy getStrategy() {
return DriverStrategy.FULL_OUTER_HYBRIDHASH_BUILD_FIRST;
}

@Override
protected List<LocalPropertiesPair> createPossibleLocalProperties() {
// all properties are possible
return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
}

@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2) {
return true;
}

@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {

String nodeName = "FullOuterJoin("+node.getOperator().getName()+")";
return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
}

@Override
public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
return new LocalProperties();
}
}
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.operators;

import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;

import java.util.Collections;
import java.util.List;

public class HashFullOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor {
public HashFullOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2) {
super(keys1, keys2, false, false, true);
}

@Override
public DriverStrategy getStrategy() {
return DriverStrategy.FULL_OUTER_HYBRIDHASH_BUILD_SECOND;
}

@Override
protected List<LocalPropertiesPair> createPossibleLocalProperties() {
// all properties are possible
return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
}

@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2) {
return true;
}

@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {

String nodeName = "FullOuterJoin("+node.getOperator().getName()+")";
return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
}

@Override
public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
return new LocalProperties();
}
}
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.operators;

import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;

import java.util.Collections;
import java.util.List;

public class HashLeftOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor{
public HashLeftOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2,
boolean broadcastSecondAllowed, boolean repartitionAllowed) {
super(keys1, keys2, false, broadcastSecondAllowed, repartitionAllowed);
}

@Override
public DriverStrategy getStrategy() {
return DriverStrategy.LEFT_HYBRIDHASH_BUILD_FIRST;
}

@Override
protected List<OperatorDescriptorDual.LocalPropertiesPair> createPossibleLocalProperties() {
// all properties are possible
return Collections.singletonList(new OperatorDescriptorDual.LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
}

@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2) {
return true;
}

@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {

String nodeName = "LeftOuterJoin("+node.getOperator().getName()+")";
return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
}

@Override
public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
return new LocalProperties();
}
}
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.operators;

import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;

import java.util.Collections;
import java.util.List;

public class HashRightOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor {
public HashRightOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2,
boolean broadcastFirstAllowed, boolean repartitionAllowed) {
super(keys1, keys2, broadcastFirstAllowed, false, repartitionAllowed);
}

@Override
public DriverStrategy getStrategy() {
return DriverStrategy.RIGHT_HYBRIDHASH_BUILD_SECOND;
}

@Override
protected List<LocalPropertiesPair> createPossibleLocalProperties() {
// all properties are possible
return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
}

@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2) {
return true;
}

@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {

String nodeName = "RightOuterJoin("+node.getOperator().getName()+")";
return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
}

@Override
public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
return new LocalProperties();
}
}

0 comments on commit eaf540f

Please sign in to comment.