[FLINK-2871] support outer join for hash join on build side. #1469

Closed
wants to merge 4 commits
@@ -841,6 +841,7 @@ public <R> JoinOperatorSetsBase<T, R> leftOuterJoin(DataSet<R> other, JoinHint s
switch(strategy) {
case OPTIMIZER_CHOOSES:
case REPARTITION_SORT_MERGE:
+case REPARTITION_HASH_FIRST:
case REPARTITION_HASH_SECOND:
case BROADCAST_HASH_SECOND:
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.LEFT_OUTER);
@@ -891,6 +892,7 @@ public <R> JoinOperatorSetsBase<T, R> rightOuterJoin(DataSet<R> other, JoinHint
case OPTIMIZER_CHOOSES:
case REPARTITION_SORT_MERGE:
case REPARTITION_HASH_FIRST:
+case REPARTITION_HASH_SECOND:
case BROADCAST_HASH_FIRST:
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.RIGHT_OUTER);
default:
@@ -938,6 +940,8 @@ public <R> JoinOperatorSetsBase<T, R> fullOuterJoin(DataSet<R> other, JoinHint s
switch(strategy) {
case OPTIMIZER_CHOOSES:
case REPARTITION_SORT_MERGE:
+case REPARTITION_HASH_FIRST:
+case REPARTITION_HASH_SECOND:
return new JoinOperatorSetsBase<>(this, other, strategy, JoinType.FULL_OUTER);
default:
throw new InvalidProgramException("Invalid JoinHint for FullOuterJoin: "+strategy);
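A minimal usage sketch of what the hint-validation change above enables at the DataSet API level (the data sets and join function here are hypothetical, not part of this patch): REPARTITION_HASH_FIRST is now accepted for a full outer join, where it previously fell through to the InvalidProgramException default branch.

import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class OuterJoinHashHintExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<Long, String>> left = env.fromElements(
                new Tuple2<>(1L, "a"), new Tuple2<>(2L, "b"));
        DataSet<Tuple2<Long, String>> right = env.fromElements(
                new Tuple2<>(2L, "x"), new Tuple2<>(3L, "y"));

        // With this patch, the hash table may be built on the outer (first) input.
        left.fullOuterJoin(right, JoinHint.REPARTITION_HASH_FIRST)
            .where(0)
            .equalTo(0)
            .with(new JoinFunction<Tuple2<Long, String>, Tuple2<Long, String>, String>() {
                @Override
                public String join(Tuple2<Long, String> l, Tuple2<Long, String> r) {
                    // Either side can be null for keys that exist only on the other side.
                    return (l == null ? "null" : l.f1) + "," + (r == null ? "null" : r.f1);
                }
            })
            .print();
    }
}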
@@ -181,7 +181,7 @@ public void testFullOuterStrategy2() {
this.testFullOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
}

-@Test(expected = InvalidProgramException.class)
+@Test
public void testFullOuterStrategy3() {
this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
}
@@ -191,7 +191,7 @@ public void testFullOuterStrategy4() {
this.testFullOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
}

-@Test(expected = InvalidProgramException.class)
+@Test
public void testFullOuterStrategy5() {
this.testFullOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
}
@@ -192,7 +192,7 @@ public void testLeftOuterStrategy4() {
this.testLeftOuterStrategies(JoinHint.BROADCAST_HASH_SECOND);
}

-@Test(expected = InvalidProgramException.class)
+@Test
public void testLeftOuterStrategy5() {
this.testLeftOuterStrategies(JoinHint.REPARTITION_HASH_FIRST);
}
@@ -181,7 +181,7 @@ public void testRightOuterStrategy2() {
this.testRightOuterStrategies(JoinHint.REPARTITION_SORT_MERGE);
}

-@Test(expected = InvalidProgramException.class)
+@Test
public void testRightOuterStrategy3() {
this.testRightOuterStrategies(JoinHint.REPARTITION_HASH_SECOND);
}
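The test changes above flip these hints from "expected to be rejected" to "expected to produce a valid plan". A hedged sketch of the same behaviour from user code (class and variable names hypothetical): the repartition hash hints now pass hint validation for full outer joins, while the broadcast hints still throw InvalidProgramException.

import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class FullOuterJoinHintValidation {
    public static void main(String[] args) {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Tuple2<Long, String>> a = env.fromElements(new Tuple2<>(1L, "a"));
        DataSet<Tuple2<Long, String>> b = env.fromElements(new Tuple2<>(1L, "b"));

        // Accepted after this patch (previously threw InvalidProgramException).
        a.fullOuterJoin(b, JoinHint.REPARTITION_HASH_FIRST);
        a.fullOuterJoin(b, JoinHint.REPARTITION_HASH_SECOND);

        // Broadcast hints remain invalid for full outer joins.
        try {
            a.fullOuterJoin(b, JoinHint.BROADCAST_HASH_FIRST);
        } catch (InvalidProgramException expected) {
            System.out.println("still rejected: " + expected.getMessage());
        }
    }
}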
@@ -202,10 +202,14 @@ public void costOperator(PlanNode n) {
break;
case HYBRIDHASH_BUILD_FIRST:
case RIGHT_HYBRIDHASH_BUILD_FIRST:
+case LEFT_HYBRIDHASH_BUILD_FIRST:
+case FULL_OUTER_HYBRIDHASH_BUILD_FIRST:
addHybridHashCosts(firstInput, secondInput, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_SECOND:
case LEFT_HYBRIDHASH_BUILD_SECOND:
+case RIGHT_HYBRIDHASH_BUILD_SECOND:
+case FULL_OUTER_HYBRIDHASH_BUILD_SECOND:
addHybridHashCosts(secondInput, firstInput, driverCosts, costWeight);
break;
case HYBRIDHASH_BUILD_FIRST_CACHED:
@@ -25,8 +25,12 @@
import org.apache.flink.optimizer.CompilerException;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.operators.AbstractJoinDescriptor;
+import org.apache.flink.optimizer.operators.HashFullOuterJoinBuildFirstDescriptor;
+import org.apache.flink.optimizer.operators.HashFullOuterJoinBuildSecondDescriptor;
+import org.apache.flink.optimizer.operators.HashLeftOuterJoinBuildFirstDescriptor;
import org.apache.flink.optimizer.operators.HashLeftOuterJoinBuildSecondDescriptor;
import org.apache.flink.optimizer.operators.HashRightOuterJoinBuildFirstDescriptor;
+import org.apache.flink.optimizer.operators.HashRightOuterJoinBuildSecondDescriptor;
import org.apache.flink.optimizer.operators.OperatorDescriptorDual;
import org.apache.flink.optimizer.operators.SortMergeFullOuterJoinDescriptor;
import org.apache.flink.optimizer.operators.SortMergeLeftOuterJoinDescriptor;
@@ -99,8 +103,10 @@ private List<OperatorDescriptorDual> createLeftOuterJoinDescriptors(JoinHint hin
case BROADCAST_HASH_SECOND:
list.add(new HashLeftOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, true, false));
break;
-case BROADCAST_HASH_FIRST:
case REPARTITION_HASH_FIRST:
+list.add(new HashLeftOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, false, true));
+break;
+case BROADCAST_HASH_FIRST:
default:
throw new CompilerException("Invalid join hint: " + hint + " for left outer join");
}
@@ -124,8 +130,10 @@ private List<OperatorDescriptorDual> createRightOuterJoinDescriptors(JoinHint hi
case BROADCAST_HASH_FIRST:
list.add(new HashRightOuterJoinBuildFirstDescriptor(this.keys1, this.keys2, true, false));
break;
-case BROADCAST_HASH_SECOND:
case REPARTITION_HASH_SECOND:
+list.add(new HashRightOuterJoinBuildSecondDescriptor(this.keys1, this.keys2, false, true));
+break;
+case BROADCAST_HASH_SECOND:
default:
throw new CompilerException("Invalid join hint: " + hint + " for right outer join");
}
@@ -142,10 +150,14 @@ private List<OperatorDescriptorDual> createFullOuterJoinDescriptors(JoinHint hin
case REPARTITION_SORT_MERGE:
list.add(new SortMergeFullOuterJoinDescriptor(this.keys1, this.keys2));
break;
+case REPARTITION_HASH_FIRST:
+list.add(new HashFullOuterJoinBuildFirstDescriptor(this.keys1, this.keys2));
+break;
case REPARTITION_HASH_SECOND:
-case BROADCAST_HASH_SECOND:
+list.add(new HashFullOuterJoinBuildSecondDescriptor(this.keys1, this.keys2));
+break;
case BROADCAST_HASH_FIRST:
-case REPARTITION_HASH_FIRST:
+case BROADCAST_HASH_SECOND:
default:
throw new CompilerException("Invalid join hint: " + hint + " for full outer join");
}
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.operators;

import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;

import java.util.Collections;
import java.util.List;

public class HashFullOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor {

public HashFullOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2) {
super(keys1, keys2, false, false, true);
}

@Override
public DriverStrategy getStrategy() {
return DriverStrategy.FULL_OUTER_HYBRIDHASH_BUILD_FIRST;
}

@Override
protected List<LocalPropertiesPair> createPossibleLocalProperties() {
// all properties are possible
return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
}

@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2) {
return true;
}

@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {

String nodeName = "FullOuterJoin("+node.getOperator().getName()+")";
return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
}

@Override
public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
return new LocalProperties();
}
}
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.operators;

import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;

import java.util.Collections;
import java.util.List;

public class HashFullOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor {
public HashFullOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2) {
super(keys1, keys2, false, false, true);
}

@Override
public DriverStrategy getStrategy() {
return DriverStrategy.FULL_OUTER_HYBRIDHASH_BUILD_SECOND;
}

@Override
protected List<LocalPropertiesPair> createPossibleLocalProperties() {
// all properties are possible
return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
}

@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2) {
return true;
}

@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {

String nodeName = "FullOuterJoin("+node.getOperator().getName()+")";
return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
}

@Override
public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
return new LocalProperties();
}
}
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.operators;

import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;

import java.util.Collections;
import java.util.List;

public class HashLeftOuterJoinBuildFirstDescriptor extends AbstractJoinDescriptor{
public HashLeftOuterJoinBuildFirstDescriptor(FieldList keys1, FieldList keys2,
boolean broadcastSecondAllowed, boolean repartitionAllowed) {
super(keys1, keys2, false, broadcastSecondAllowed, repartitionAllowed);
}

@Override
public DriverStrategy getStrategy() {
return DriverStrategy.LEFT_HYBRIDHASH_BUILD_FIRST;
}

@Override
protected List<OperatorDescriptorDual.LocalPropertiesPair> createPossibleLocalProperties() {
// all properties are possible
return Collections.singletonList(new OperatorDescriptorDual.LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
}

@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2) {
return true;
}

@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {

String nodeName = "LeftOuterJoin("+node.getOperator().getName()+")";
return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
}

@Override
public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
return new LocalProperties();
}
}
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.flink.optimizer.operators;

import org.apache.flink.api.common.operators.util.FieldList;
import org.apache.flink.optimizer.dag.TwoInputNode;
import org.apache.flink.optimizer.dataproperties.LocalProperties;
import org.apache.flink.optimizer.dataproperties.RequestedLocalProperties;
import org.apache.flink.optimizer.plan.Channel;
import org.apache.flink.optimizer.plan.DualInputPlanNode;
import org.apache.flink.runtime.operators.DriverStrategy;

import java.util.Collections;
import java.util.List;

public class HashRightOuterJoinBuildSecondDescriptor extends AbstractJoinDescriptor {
public HashRightOuterJoinBuildSecondDescriptor(FieldList keys1, FieldList keys2,
boolean broadcastFirstAllowed, boolean repartitionAllowed) {
super(keys1, keys2, broadcastFirstAllowed, false, repartitionAllowed);
}

@Override
public DriverStrategy getStrategy() {
return DriverStrategy.RIGHT_HYBRIDHASH_BUILD_SECOND;
}

@Override
protected List<LocalPropertiesPair> createPossibleLocalProperties() {
// all properties are possible
return Collections.singletonList(new LocalPropertiesPair(new RequestedLocalProperties(), new RequestedLocalProperties()));
}

@Override
public boolean areCoFulfilled(RequestedLocalProperties requested1, RequestedLocalProperties requested2,
LocalProperties produced1, LocalProperties produced2) {
return true;
}

@Override
public DualInputPlanNode instantiate(Channel in1, Channel in2, TwoInputNode node) {

String nodeName = "RightOuterJoin("+node.getOperator().getName()+")";
return new DualInputPlanNode(node, nodeName, in1, in2, getStrategy(), this.keys1, this.keys2);
}

@Override
public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperties in2) {
return new LocalProperties();
}
}