Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-21150][table-planner-blink] Introduce ExecEdge to connect two ExecNodes #14827

Merged
merged 1 commit into from
Feb 1, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.types.logical.LogicalType;

import java.util.Arrays;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
 * The representation of an edge connecting two {@link ExecNode}s.
 *
 * <p>An edge records its source node, its target node, the {@link Shuffle} applied to the records
 * travelling from source to target, and the {@link ShuffleMode} used for the data exchange.
 * Instances are immutable once constructed; use {@link #builder()} to create them.
 */
public class ExecEdge {

    /** Any type of shuffle is OK when passing through this edge. */
    public static final Shuffle ANY_SHUFFLE = new Shuffle(Shuffle.Type.ANY) {};

    /** Full records are provided for each parallelism of the target node. */
    public static final Shuffle BROADCAST_SHUFFLE = new Shuffle(Shuffle.Type.BROADCAST) {};

    /** The parallelism of the target node must be 1. */
    public static final Shuffle SINGLETON_SHUFFLE = new Shuffle(Shuffle.Type.SINGLETON) {};

    /** Records are shuffled in same parallelism (function call). */
    public static final Shuffle FORWARD_SHUFFLE = new Shuffle(Shuffle.Type.FORWARD) {};

    /** The source node of this edge. */
    private final ExecNode<?> source;
    /** The target node of this edge. */
    private final ExecNode<?> target;
    /** The {@link Shuffle} on this edge from source to target. */
    private final Shuffle shuffle;
    /** The {@link ShuffleMode} defines the data exchange mode on this edge. */
    private final ShuffleMode shuffleMode;

    /**
     * Creates an edge from {@code source} to {@code target}.
     *
     * @param source the node producing the records, must not be null
     * @param target the node consuming the records, must not be null
     * @param shuffle the shuffle applied on this edge, must not be null
     * @param shuffleMode the data exchange mode of this edge, must not be null
     * @throws NullPointerException if any argument is null
     * @throws TableException if the shuffle is not FORWARD or the shuffle mode is not PIPELINED
     */
    public ExecEdge(
            ExecNode<?> source, ExecNode<?> target, Shuffle shuffle, ShuffleMode shuffleMode) {
        // Messages make a forgotten Builder#source/#target call easy to diagnose.
        this.source = checkNotNull(source, "source should not be null.");
        this.target = checkNotNull(target, "target should not be null.");
        this.shuffle = checkNotNull(shuffle, "shuffle should not be null.");
        this.shuffleMode = checkNotNull(shuffleMode, "shuffleMode should not be null.");

        // TODO once FLINK-21224 [Remove BatchExecExchange and StreamExecExchange, and replace their
        //  functionality with ExecEdge] is finished, we should remove the following validation.
        if (shuffle.getType() != Shuffle.Type.FORWARD) {
            throw new TableException("Only FORWARD shuffle is supported now.");
        }
        if (shuffleMode != ShuffleMode.PIPELINED) {
            throw new TableException("Only PIPELINED shuffle mode is supported now.");
        }
    }

    /** Returns the source node of this edge. */
    public ExecNode<?> getSource() {
        return source;
    }

    /** Returns the target node of this edge. */
    public ExecNode<?> getTarget() {
        return target;
    }

    /** Returns the {@link Shuffle} on this edge from source to target. */
    public Shuffle getShuffle() {
        return shuffle;
    }

    /** Returns the {@link ShuffleMode} describing the data exchange mode on this edge. */
    public ShuffleMode getShuffleMode() {
        return shuffleMode;
    }

    /** Returns the output {@link LogicalType} of the data passing this edge. */
    public LogicalType getOutputType() {
        return source.getOutputType();
    }

    /**
     * Translates this edge into a Flink operator by delegating to the source node.
     *
     * @param planner The {@link Planner} of the translated Table.
     */
    public Transformation<?> translateToPlan(Planner planner) {
        return source.translateToPlan(planner);
    }

    @Override
    public String toString() {
        return "ExecEdge{"
                + "source="
                + source.getDescription()
                + ", target="
                + target.getDescription()
                + ", shuffle="
                + shuffle
                + ", shuffleMode="
                + shuffleMode
                + '}';
    }

    /** Returns a new {@link Builder} for an {@link ExecEdge}. */
    public static Builder builder() {
        return new Builder();
    }

    /**
     * Return hash {@link Shuffle}.
     *
     * @param keys hash keys, must not be null or empty
     */
    public static Shuffle hashShuffle(int[] keys) {
        return new HashShuffle(keys);
    }

    /**
     * Builder of the {@link ExecEdge}.
     *
     * <p>The shuffle defaults to {@link #FORWARD_SHUFFLE} and the shuffle mode defaults to {@link
     * ShuffleMode#PIPELINED}; source and target have no default and must be set before {@link
     * #build()}.
     */
    public static class Builder {
        private ExecNode<?> source;
        private ExecNode<?> target;
        private Shuffle shuffle = FORWARD_SHUFFLE;
        private ShuffleMode shuffleMode = ShuffleMode.PIPELINED;

        /** Sets the source node of the edge. */
        public Builder source(ExecNode<?> source) {
            this.source = source;
            return this;
        }

        /** Sets the target node of the edge. */
        public Builder target(ExecNode<?> target) {
            this.target = target;
            return this;
        }

        /** Sets the shuffle of the edge. */
        public Builder shuffle(Shuffle shuffle) {
            this.shuffle = shuffle;
            return this;
        }

        /**
         * Sets the shuffle of the edge to the one matching the given required distribution.
         *
         * @throws NullPointerException if {@code requiredDistribution} is null
         * @throws TableException if the distribution type has no corresponding shuffle
         */
        public Builder requiredDistribution(
                InputProperty.RequiredDistribution requiredDistribution) {
            return shuffle(fromRequiredDistribution(requiredDistribution));
        }

        /** Sets the data exchange mode of the edge. */
        public Builder shuffleMode(ShuffleMode shuffleMode) {
            this.shuffleMode = shuffleMode;
            return this;
        }

        /**
         * Creates the {@link ExecEdge}; all validation happens in the {@link ExecEdge}
         * constructor.
         */
        public ExecEdge build() {
            return new ExecEdge(source, target, shuffle, shuffleMode);
        }

        /** Maps a required distribution onto the equivalent {@link Shuffle}. */
        private static Shuffle fromRequiredDistribution(
                InputProperty.RequiredDistribution requiredDistribution) {
            checkNotNull(requiredDistribution, "requiredDistribution should not be null.");
            switch (requiredDistribution.getType()) {
                case ANY:
                    return ANY_SHUFFLE;
                case SINGLETON:
                    return SINGLETON_SHUFFLE;
                case BROADCAST:
                    return BROADCAST_SHUFFLE;
                case HASH:
                    InputProperty.HashDistribution hashDistribution =
                            (InputProperty.HashDistribution) requiredDistribution;
                    return hashShuffle(hashDistribution.getKeys());
                default:
                    throw new TableException(
                            "Unsupported RequiredDistribution type: "
                                    + requiredDistribution.getType());
            }
        }
    }

    /** The shuffle for records when passing this edge. */
    public abstract static class Shuffle {
        private final Type type;

        /**
         * @param type the kind of shuffle, must not be null
         * @throws NullPointerException if {@code type} is null
         */
        protected Shuffle(Type type) {
            // Fail at construction instead of later inside toString().
            this.type = checkNotNull(type, "type should not be null.");
        }

        /** Returns the kind of this shuffle. */
        public Type getType() {
            return type;
        }

        @Override
        public String toString() {
            return type.name();
        }

        /** Enumeration which describes the shuffle type for records when passing this edge. */
        public enum Type {
            /** Any type of shuffle is OK when passing through this edge. */
            ANY,

            /** Records are shuffled by hash when passing through this edge. */
            HASH,

            /** Full records are provided for each parallelism of the target node. */
            BROADCAST,

            /** The parallelism of the target node must be 1. */
            SINGLETON,

            /** Records are shuffled in same parallelism (function call). */
            FORWARD
        }
    }

    /** Records are shuffled by hash when passing through this edge. */
    public static class HashShuffle extends Shuffle {
        private final int[] keys;

        /**
         * @param keys the hash keys, must not be null or empty; the array is copied
         * @throws NullPointerException if {@code keys} is null
         * @throws IllegalArgumentException if {@code keys} is empty
         */
        public HashShuffle(int[] keys) {
            super(Type.HASH);
            // Defensive copy: later changes to the caller's array must not alter this shuffle.
            this.keys = checkNotNull(keys, "Hash keys must not be null.").clone();
            checkArgument(this.keys.length > 0, "Hash keys must not be empty.");
        }

        /** Returns a copy of the hash keys, so callers cannot modify this shuffle. */
        public int[] getKeys() {
            return keys.clone();
        }

        @Override
        public String toString() {
            return "HASH" + Arrays.toString(keys);
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,37 @@ public interface ExecNode<T> {
LogicalType getOutputType();

/**
* Returns a list of this node's input nodes. If there are no inputs, returns an empty list, not
* null.
* Returns a list of this node's input properties.
*
* @return List of this node's input nodes
* <p>NOTE: If there are no inputs, returns an empty list, not null.
*
* @return List of this node's input properties.
*/
List<ExecNode<?>> getInputNodes();
List<InputProperty> getInputProperties();

/**
* Returns a list of this node's input properties. If there are no inputs, returns an empty
* list, not null.
* Returns a list of this node's input {@link ExecEdge}s.
*
* @return List of this node's input properties
* <p>NOTE: If there are no inputs, returns an empty list, not null.
*/
List<InputProperty> getInputProperties();
List<ExecEdge> getInputEdges();

/**
* Sets the input {@link ExecEdge}s which connect this node and its input nodes.
*
* <p>NOTE: If there are no inputs, the given inputEdges should be empty, not null.
*
* @param inputEdges the input {@link ExecEdge}s.
*/
void setInputEdges(List<ExecEdge> inputEdges);

/**
* Replaces the <code>ordinalInParent</code><sup>th</sup> input. Once we introduce source node
* and target node for {@link InputProperty}, we will remove this method.
* Replaces the <code>ordinalInParent</code><sup>th</sup> input edge.
*
* @param ordinalInParent Position of the child input, 0 is the first
* @param newInputNode New node that should be put at position ordinalInParent
* @param index Position of the child input edge, 0 is the first.
* @param newInputEdge New edge that should be put at position `index`.
*/
void replaceInputNode(int ordinalInParent, ExecNode<?> newInputNode);
void replaceInputEdge(int index, ExecEdge newInputEdge);

/**
* Translates this node into a Flink operator.
Expand All @@ -85,7 +93,7 @@ public interface ExecNode<T> {
/**
* Accepts a visit from a {@link ExecNodeVisitor}.
*
* @param visitor ExecNodeVisitor
* @param visitor ExecNodeVisitor.
*/
void accept(ExecNodeVisitor visitor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
private final String description;
private final List<InputProperty> inputProperties;
private final LogicalType outputType;
// TODO remove this field once edge support `source` and `target`,
// and then we can get/set `inputNodes` through `inputEdges`.
private List<ExecNode<?>> inputNodes;
private List<ExecEdge> inputEdges;

private transient Transformation<T> transformation;

Expand All @@ -65,27 +63,28 @@ public LogicalType getOutputType() {
}

@Override
public List<ExecNode<?>> getInputNodes() {
checkNotNull(inputNodes, "inputNodes should not be null, please call setInputNodes first.");
return inputNodes;
public List<InputProperty> getInputProperties() {
return inputProperties;
}

@Override
public List<InputProperty> getInputProperties() {
return inputProperties;
public List<ExecEdge> getInputEdges() {
return checkNotNull(
inputEdges,
"inputEdges should not null, please call `setInputEdges(List<ExecEdge>)` first.");
}

// TODO remove this method once edge support `source` and `target`,
// and then we can get/set `inputNodes` through `inputEdges`.
public void setInputNodes(List<ExecNode<?>> inputNodes) {
checkArgument(checkNotNull(inputNodes).size() == inputProperties.size());
this.inputNodes = new ArrayList<>(inputNodes);
@Override
public void setInputEdges(List<ExecEdge> inputEdges) {
checkNotNull(inputEdges, "inputEdges should not be null.");
this.inputEdges = new ArrayList<>(inputEdges);
}

@Override
public void replaceInputNode(int ordinalInParent, ExecNode<?> newInputNode) {
checkArgument(ordinalInParent >= 0 && ordinalInParent < inputNodes.size());
inputNodes.set(ordinalInParent, newInputNode);
public void replaceInputEdge(int index, ExecEdge newInputEdge) {
List<ExecEdge> edges = getInputEdges();
checkArgument(index >= 0 && index < edges.size());
edges.set(index, newInputEdge);
}

public Transformation<T> translateToPlan(Planner planner) {
Expand All @@ -105,7 +104,8 @@ public void accept(ExecNodeVisitor visitor) {

/** Whether there is singleton exchange node as input. */
protected boolean inputsContainSingleton() {
return getInputNodes().stream()
return getInputEdges().stream()
.map(ExecEdge::getSource)
.anyMatch(
i ->
i instanceof CommonExecExchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ private ExecNode<?> generate(FlinkPhysicalRel rel) {
}

execNode = rel.translateToExecNode();
// connects the input/output nodes
((ExecNodeBase<?>) execNode).setInputNodes(inputNodes);
// connects the input nodes
List<ExecEdge> inputEdges = new ArrayList<>(inputNodes.size());
for (ExecNode<?> inputNode : inputNodes) {
inputEdges.add(ExecEdge.builder().source(inputNode).target(execNode).build());
}
execNode.setInputEdges(inputEdges);

visitedRels.put(rel, execNode);
return execNode;
Expand Down