Skip to content

Commit

Permalink
[FLINK-21150][table-planner-blink] Introduce ExecEdge to connect two …
Browse files Browse the repository at this point in the history
…ExecNodes
  • Loading branch information
godfreyhe committed Feb 1, 2021
1 parent d859920 commit 4fe91df
Show file tree
Hide file tree
Showing 75 changed files with 722 additions and 303 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.planner.plan.nodes.exec;

import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.transformations.ShuffleMode;
import org.apache.flink.table.api.TableException;
import org.apache.flink.table.delegation.Planner;
import org.apache.flink.table.types.logical.LogicalType;

import java.util.Arrays;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/** The representation of an edge connecting two {@link ExecNode}. */
public class ExecEdge {
/** The source node of this edge. */
private final ExecNode<?> source;
/** The target node of this edge. */
private final ExecNode<?> target;
/** The {@link Shuffle} on this edge from source to target. */
private final Shuffle shuffle;
/** The {@link ShuffleMode} defines the data exchange mode on this edge. */
private final ShuffleMode shuffleMode;

public ExecEdge(
ExecNode<?> source, ExecNode<?> target, Shuffle shuffle, ShuffleMode shuffleMode) {
this.source = checkNotNull(source);
this.target = checkNotNull(target);
this.shuffle = checkNotNull(shuffle);
this.shuffleMode = checkNotNull(shuffleMode);

// TODO once FLINK-21224 [Remove BatchExecExchange and StreamExecExchange, and replace their
// functionality with ExecEdge] is finished, we should remove the following validation.
if (shuffle.getType() != Shuffle.Type.FORWARD) {
throw new TableException("Only FORWARD shuffle is supported now.");
}
if (shuffleMode != ShuffleMode.PIPELINED) {
throw new TableException("Only PIPELINED shuffle mode is supported now.");
}
}

public ExecNode<?> getSource() {
return source;
}

public ExecNode<?> getTarget() {
return target;
}

public Shuffle getShuffle() {
return shuffle;
}

public ShuffleMode getShuffleMode() {
return shuffleMode;
}

/** Returns the output {@link LogicalType} of the data passing this edge. */
public LogicalType getOutputType() {
return source.getOutputType();
}

@Override
public String toString() {
return "ExecEdge{"
+ "source="
+ source.getDescription()
+ ", target="
+ target.getDescription()
+ ", shuffle="
+ shuffle
+ ", shuffleMode="
+ shuffleMode
+ '}';
}

public static Builder builder() {
return new Builder();
}

/** Builder of the {@link ExecEdge}. */
public static class Builder {
private ExecNode<?> source;
private ExecNode<?> target;
private Shuffle shuffle = FORWARD_SHUFFLE;
private ShuffleMode shuffleMode = ShuffleMode.PIPELINED;

public Builder source(ExecNode<?> source) {
this.source = source;
return this;
}

public Builder target(ExecNode<?> target) {
this.target = target;
return this;
}

public Builder shuffle(Shuffle shuffle) {
this.shuffle = shuffle;
return this;
}

public Builder requiredDistribution(
InputProperty.RequiredDistribution requiredDistribution) {
return shuffle(fromRequiredDistribution(requiredDistribution));
}

public Builder shuffleMode(ShuffleMode shuffleMode) {
this.shuffleMode = shuffleMode;
return this;
}

public ExecEdge build() {
return new ExecEdge(source, target, shuffle, shuffleMode);
}

private Shuffle fromRequiredDistribution(
InputProperty.RequiredDistribution requiredDistribution) {
switch (requiredDistribution.getType()) {
case ANY:
return ANY_SHUFFLE;
case SINGLETON:
return SINGLETON_SHUFFLE;
case BROADCAST:
return BROADCAST_SHUFFLE;
case HASH:
InputProperty.HashDistribution hashDistribution =
(InputProperty.HashDistribution) requiredDistribution;
return hashShuffle(hashDistribution.getKeys());
default:
throw new TableException(
"Unsupported RequiredDistribution type: "
+ requiredDistribution.getType());
}
}
}

/** The shuffle for records when passing this edge. */
public abstract static class Shuffle {
private final Type type;

protected Shuffle(Type type) {
this.type = type;
}

public Type getType() {
return type;
}

@Override
public String toString() {
return type.name();
}

/** Enumeration which describes the shuffle type for records when passing this edge. */
public enum Type {
/** Any type of shuffle is OK when passing through this edge. */
ANY,

/** Records are shuffle by hash when passing through this edge. */
HASH,

/** Full records are provided for each parallelism of the target node. */
BROADCAST,

/** The parallelism of the target node must be 1. */
SINGLETON,

/** Records are shuffled in same parallelism (function call). */
FORWARD
}
}

/** Records are shuffle by hash when passing through this edge. */
public static class HashShuffle extends Shuffle {
private final int[] keys;

public HashShuffle(int[] keys) {
super(Type.HASH);
this.keys = checkNotNull(keys);
checkArgument(keys.length > 0, "Hash keys must no be empty.");
}

public int[] getKeys() {
return keys;
}

@Override
public String toString() {
return "HASH" + Arrays.toString(keys);
}
}

/** Any type of shuffle is OK when passing through this edge. */
public static final Shuffle ANY_SHUFFLE = new Shuffle(Shuffle.Type.ANY) {};

/** Full records are provided for each parallelism of the target node. */
public static final Shuffle BROADCAST_SHUFFLE = new Shuffle(Shuffle.Type.BROADCAST) {};

/** The parallelism of the target node must be 1. */
public static final Shuffle SINGLETON_SHUFFLE = new Shuffle(Shuffle.Type.SINGLETON) {};

/** Records are shuffled in same parallelism (function call). */
public static final Shuffle FORWARD_SHUFFLE = new Shuffle(Shuffle.Type.FORWARD) {};

/**
* Return hash {@link Shuffle}.
*
* @param keys hash keys
*/
public static Shuffle hashShuffle(int[] keys) {
return new HashShuffle(keys);
}

/**
* Translates this edge into a Flink operator.
*
* @param planner The {@link Planner} of the translated Table.
*/
public Transformation<?> translateToPlan(Planner planner) {
return source.translateToPlan(planner);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -49,29 +49,37 @@ public interface ExecNode<T> {
LogicalType getOutputType();

/**
* Returns a list of this node's input nodes. If there are no inputs, returns an empty list, not
* null.
* Returns a list of this node's input properties.
*
* @return List of this node's input nodes
* <p>NOTE: If there are no inputs, returns an empty list, not null.
*
* @return List of this node's input properties.
*/
List<ExecNode<?>> getInputNodes();
List<InputProperty> getInputProperties();

/**
* Returns a list of this node's input properties. If there are no inputs, returns an empty
* list, not null.
* Returns a list of this node's input {@link ExecEdge}s.
*
* @return List of this node's input properties
* <p>NOTE: If there are no inputs, returns an empty list, not null.
*/
List<InputProperty> getInputProperties();
List<ExecEdge> getInputEdges();

/**
* Sets the input {@link ExecEdge}s which connect this nodes and its input nodes.
*
* <p>NOTE: If there are no inputs, the given inputEdges should be empty, not null.
*
* @param inputEdges the input {@link ExecEdge}s.
*/
void setInputEdges(List<ExecEdge> inputEdges);

/**
* Replaces the <code>ordinalInParent</code><sup>th</sup> input. Once we introduce source node
* and target node for {@link InputProperty}, we will remove this method.
* Replaces the <code>ordinalInParent</code><sup>th</sup> input edge.
*
* @param ordinalInParent Position of the child input, 0 is the first
* @param newInputNode New node that should be put at position ordinalInParent
* @param index Position of the child input edge, 0 is the first.
* @param newInputEdge New edge that should be put at position `index`.
*/
void replaceInputNode(int ordinalInParent, ExecNode<?> newInputNode);
void replaceInputEdge(int index, ExecEdge newInputEdge);

/**
* Translates this node into a Flink operator.
Expand All @@ -85,7 +93,7 @@ public interface ExecNode<T> {
/**
* Accepts a visit from a {@link ExecNodeVisitor}.
*
* @param visitor ExecNodeVisitor
* @param visitor ExecNodeVisitor.
*/
void accept(ExecNodeVisitor visitor);
}
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,7 @@ public abstract class ExecNodeBase<T> implements ExecNode<T> {
private final String description;
private final List<InputProperty> inputProperties;
private final LogicalType outputType;
// TODO remove this field once edge support `source` and `target`,
// and then we can get/set `inputNodes` through `inputEdges`.
private List<ExecNode<?>> inputNodes;
private List<ExecEdge> inputEdges;

private transient Transformation<T> transformation;

Expand All @@ -65,27 +63,28 @@ public LogicalType getOutputType() {
}

@Override
public List<ExecNode<?>> getInputNodes() {
checkNotNull(inputNodes, "inputNodes should not be null, please call setInputNodes first.");
return inputNodes;
public List<InputProperty> getInputProperties() {
return inputProperties;
}

@Override
public List<InputProperty> getInputProperties() {
return inputProperties;
public List<ExecEdge> getInputEdges() {
return checkNotNull(
inputEdges,
"inputEdges should not null, please call `setInputEdges(List<ExecEdge>)` first.");
}

// TODO remove this method once edge support `source` and `target`,
// and then we can get/set `inputNodes` through `inputEdges`.
public void setInputNodes(List<ExecNode<?>> inputNodes) {
checkArgument(checkNotNull(inputNodes).size() == inputProperties.size());
this.inputNodes = new ArrayList<>(inputNodes);
@Override
public void setInputEdges(List<ExecEdge> inputEdges) {
checkNotNull(inputEdges, "inputEdges should not be null.");
this.inputEdges = new ArrayList<>(inputEdges);
}

@Override
public void replaceInputNode(int ordinalInParent, ExecNode<?> newInputNode) {
checkArgument(ordinalInParent >= 0 && ordinalInParent < inputNodes.size());
inputNodes.set(ordinalInParent, newInputNode);
public void replaceInputEdge(int index, ExecEdge newInputEdge) {
List<ExecEdge> edges = getInputEdges();
checkArgument(index >= 0 && index < edges.size());
edges.set(index, newInputEdge);
}

public Transformation<T> translateToPlan(Planner planner) {
Expand All @@ -105,7 +104,8 @@ public void accept(ExecNodeVisitor visitor) {

/** Whether there is singleton exchange node as input. */
protected boolean inputsContainSingleton() {
return getInputNodes().stream()
return getInputEdges().stream()
.map(ExecEdge::getSource)
.anyMatch(
i ->
i instanceof CommonExecExchange
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,12 @@ private ExecNode<?> generate(FlinkPhysicalRel rel) {
}

execNode = rel.translateToExecNode();
// connects the input/output nodes
((ExecNodeBase<?>) execNode).setInputNodes(inputNodes);
// connects the input nodes
List<ExecEdge> inputEdges = new ArrayList<>(inputNodes.size());
for (ExecNode<?> inputNode : inputNodes) {
inputEdges.add(ExecEdge.builder().source(inputNode).target(execNode).build());
}
execNode.setInputEdges(inputEdges);

visitedRels.put(rel, execNode);
return execNode;
Expand Down

0 comments on commit 4fe91df

Please sign in to comment.