Skip to content

Commit

Permalink
[FLINK-11896] [table-planner-blink] Introduce stream physical nodes (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
godfreyhe authored and HuangZhenQiu committed Apr 19, 2019
1 parent bdbb922 commit 2741423
Show file tree
Hide file tree
Showing 51 changed files with 2,848 additions and 142 deletions.
@@ -0,0 +1,41 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.plan;

/**
 * Describes which role an aggregate plays once an aggregation has (or has not) been
 * split into a partial phase and a final phase.
 *
 * @see org.apache.flink.table.plan.rules.physical.stream.SplitAggregateRule
 */
public enum PartialFinalType {

	/**
	 * The partial-aggregation phase: yields a partially distinct aggregated result,
	 * computed per group key and bucket number.
	 */
	PARTIAL,

	/**
	 * The final-aggregation phase: yields the final result from the partially
	 * distinct aggregated result.
	 */
	FINAL,

	/**
	 * An aggregate that has not been split.
	 */
	NONE
}
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.flink.table.api

import org.apache.flink.configuration.{Configuration, GlobalConfiguration}

import _root_.java.util.TimeZone
import _root_.java.math.MathContext

Expand Down Expand Up @@ -47,6 +49,11 @@ class TableConfig {
*/
private var maxGeneratedCodeLength: Int = 64000 // just an estimate

/**
* Holds the user-defined configuration, initially loaded via
* GlobalConfiguration.loadConfiguration().
*/
private var conf = GlobalConfiguration.loadConfiguration()

/**
* Sets the timezone for date/time/timestamp conversions.
*/
Expand Down Expand Up @@ -104,6 +111,19 @@ class TableConfig {
}
this.maxGeneratedCodeLength = maxGeneratedCodeLength
}

/**
  * Returns the user-defined configuration currently held by this [[TableConfig]].
  *
  * The internal instance itself is returned, not a copy.
  */
def getConf: Configuration = this.conf

/**
  * Sets the user-defined configuration.
  *
  * The stored configuration is replaced by a freshly loaded global configuration, and all
  * entries of the given `conf` are then added to it.
  * NOTE(review): presumably entries of `conf` override same-keyed global entries; confirm
  * against `Configuration.addAll`.
  *
  * @param conf the user-defined settings to add on top of the global configuration
  */
def setConf(conf: Configuration): Unit = {
  val refreshed = GlobalConfiguration.loadConfiguration()
  // Publish the fresh instance first, then merge the user settings into that same instance.
  this.conf = refreshed
  refreshed.addAll(conf)
}
}

object TableConfig {
Expand Down
Expand Up @@ -355,6 +355,11 @@ object FlinkTypeFactory {
case _ => false
}

/**
  * Checks whether the given type is a time indicator type flagged as event time.
  *
  * @param relDataType the type to inspect
  * @return `true` only for a [[TimeIndicatorRelDataType]] whose `isEventTime` is set,
  *         `false` for every other type
  */
def isRowtimeIndicatorType(relDataType: RelDataType): Boolean = relDataType match {
  case indicator: TimeIndicatorRelDataType => indicator.isEventTime
  case _ => false
}

def toInternalType(relDataType: RelDataType): InternalType = relDataType.getSqlTypeName match {
case BOOLEAN => InternalTypes.BOOLEAN
case TINYINT => InternalTypes.BYTE
Expand Down
Expand Up @@ -32,12 +32,12 @@ final class LogicalWatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
input: RelNode,
rowtimeField: String,
watermarkOffset: Long)
extends WatermarkAssigner(cluster, traits, input, rowtimeField, watermarkOffset) {
rowtimeFieldIndex: Option[Int],
watermarkDelay: Option[Long])
extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkDelay) {

override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): RelNode = {
new LogicalWatermarkAssigner(cluster, traits, inputs.get(0), rowtimeField, watermarkOffset)
new LogicalWatermarkAssigner(cluster, traits, inputs.get(0), rowtimeFieldIndex, watermarkDelay)
}
}

Expand Up @@ -109,9 +109,9 @@ abstract class Rank(
}.mkString(", ")
super.explainTerms(pw)
.item("rankFunction", rankFunction)
.item("partitionBy", partitionKey.map(i => s"$$$i").mkString(","))
.item("orderBy", Rank.sortFieldsToString(sortCollation))
.item("rankRange", rankRange.toString())
.item("partitionBy", partitionKey.map(i => s"$$$i").mkString(","))
.item("orderBy", RelExplainUtil.collationToString(sortCollation))
.item("select", select)
}

Expand Down Expand Up @@ -168,24 +168,3 @@ case class VariableRankRange(rankEndIndex: Int) extends RankRange {
s"rankEnd=$$$rankEndIndex"
}
}

/**
  * String rendering helpers for the sort collation of a rank node.
  */
object Rank {

  /**
    * Renders each collation field as `$<fieldIndex> <order short name>`, joined by ", ".
    *
    * @param collationSort the collation whose fields are rendered
    */
  def sortFieldsToString(collationSort: RelCollation): String = {
    val indexedOrders = for (collation <- collationSort.getFieldCollations)
      yield collation.getFieldIndex -> FlinkRelOptUtil.directionToOrder(collation.getDirection)

    indexedOrders
      .map { case (fieldIdx, sortOrder) => s"$$$fieldIdx ${sortOrder.getShortName}" }
      .mkString(", ")
  }

  /**
    * Renders each collation field as `<fieldName> <order short name>`, joined by ", ".
    * Field names are looked up by field index in `inputType`.
    *
    * @param collationSort the collation whose fields are rendered
    * @param inputType     the row type that supplies the field names
    */
  def sortFieldsToString(collationSort: RelCollation, inputType: RelDataType): String = {
    val indexedOrders = for (collation <- collationSort.getFieldCollations)
      yield collation.getFieldIndex -> FlinkRelOptUtil.directionToOrder(collation.getDirection)
    val names = inputType.getFieldNames

    indexedOrders
      .map { case (fieldIdx, sortOrder) => s"${names.get(fieldIdx)} ${sortOrder.getShortName}" }
      .mkString(", ")
  }
}
Expand Up @@ -32,21 +32,21 @@ import scala.collection.JavaConversions._
abstract class WatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
inputNode: RelNode,
val rowtimeField: String,
val watermarkOffset: Long)
extends SingleRel(cluster, traits, inputNode) {
inputRel: RelNode,
val rowtimeFieldIndex: Option[Int],
val watermarkDelay: Option[Long])
extends SingleRel(cluster, traits, inputRel) {

override def deriveRowType(): RelDataType = {
val inputRowType = inputNode.getRowType
val inputRowType = inputRel.getRowType
val typeFactory = cluster.getTypeFactory.asInstanceOf[FlinkTypeFactory]

val newFieldList = inputRowType.getFieldList.map { f =>
if (f.getName.equals(rowtimeField)) {
val rowtimeIndicatorType = typeFactory.createRowtimeIndicatorType()
new RelDataTypeFieldImpl(rowtimeField, f.getIndex, rowtimeIndicatorType)
} else {
f
rowtimeFieldIndex match {
case Some(index) if f.getIndex == index =>
val rowtimeIndicatorType = typeFactory.createRowtimeIndicatorType()
new RelDataTypeFieldImpl(f.getName, f.getIndex, rowtimeIndicatorType)
case _ => f
}
}

Expand All @@ -57,8 +57,9 @@ abstract class WatermarkAssigner(

override def explainTerms(pw: RelWriter): RelWriter = {
super.explainTerms(pw)
.item("fields", getRowType.getFieldNames)
.item("rowtimeField", rowtimeField)
.item("watermarkOffset", watermarkOffset)
.item("fields", getRowType.getFieldNames.mkString(", "))
.itemIf("rowtimeField", getRowType.getFieldNames.get(rowtimeFieldIndex.getOrElse(0)),
rowtimeFieldIndex.isDefined)
.itemIf("watermarkDelay", watermarkDelay.getOrElse(0L), watermarkDelay.isDefined)
}
}
Expand Up @@ -21,7 +21,7 @@ package org.apache.flink.table.plan.nodes.common
import org.apache.flink.table.plan.`trait`.FlinkRelDistribution
import org.apache.flink.table.plan.cost.FlinkCost._
import org.apache.flink.table.plan.cost.FlinkCostFactory
import org.apache.flink.table.plan.nodes.FlinkRelNode
import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel

import org.apache.calcite.plan.{RelOptCluster, RelOptCost, RelOptPlanner, RelTraitSet}
import org.apache.calcite.rel.core.Exchange
Expand All @@ -33,13 +33,13 @@ import scala.collection.JavaConverters._
/**
* Base class for flink [[Exchange]].
*/
abstract class CommonExchange(
abstract class CommonPhysicalExchange(
cluster: RelOptCluster,
traitSet: RelTraitSet,
relNode: RelNode,
relDistribution: RelDistribution)
extends Exchange(cluster, traitSet, relNode, relDistribution)
with FlinkRelNode {
with FlinkPhysicalRel {

override def computeSelfCost(planner: RelOptPlanner, mq: RelMetadataQuery): RelOptCost = {
val inputRows = mq.getRowCount(input)
Expand Down
@@ -0,0 +1,78 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.table.plan.nodes.common

import org.apache.flink.table.plan.FlinkJoinRelType
import org.apache.flink.table.plan.nodes.physical.FlinkPhysicalRel
import org.apache.flink.table.plan.util.{FlinkRelOptUtil, RelExplainUtil}

import org.apache.calcite.rel.RelWriter
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeField}
import org.apache.calcite.rel.core.{Join, SemiJoin}
import org.apache.calcite.sql.validate.SqlValidatorUtil
import org.apache.calcite.util.mapping.IntPair

import java.util
import java.util.Collections

import scala.collection.JavaConversions._

/**
  * Base physical class for flink [[Join]].
  *
  * Provides lazily computed join metadata (join info, key pairs, flink join type and the
  * row type used to render the condition) plus a common `explainTerms` implementation.
  */
trait CommonPhysicalJoin extends Join with FlinkPhysicalRel {

  /**
    * `joinInfo` is the result of `FlinkRelOptUtil.createJoinInfo` for the left input, the
    * right input and the join condition. `filterNulls` holds the boolean flags collected in
    * the list handed to that call, converted to an `Array[Boolean]`.
    */
  lazy val (joinInfo, filterNulls) = {
    val nullFilterFlags = new util.ArrayList[java.lang.Boolean]
    val info = FlinkRelOptUtil.createJoinInfo(getLeft, getRight, getCondition, nullFilterFlags)
    (info, nullFilterFlags.map(_.booleanValue()).toArray)
  }

  /** The pairs of `joinInfo` as an immutable Scala list. */
  lazy val keyPairs: List[IntPair] = joinInfo.pairs.toList

  // TODO supports FlinkJoinRelType.ANTI
  /**
    * The flink join type: `SEMI` for a [[SemiJoin]], otherwise the conversion of
    * `getJoinType`.
    *
    * `this` is always a [[Join]], so no further fallback case is needed.
    */
  lazy val flinkJoinType: FlinkJoinRelType = this match {
    case _: SemiJoin => FlinkJoinRelType.SEMI
    case _ => FlinkJoinRelType.toFlinkJoinRelType(getJoinType)
  }

  /**
    * The row type against which the join condition is rendered in `explainTerms`.
    *
    * For a [[SemiJoin]] this is derived from both inputs' row types, which differs from the
    * SemiJoin's own row type. For every other join it is simply `getRowType`.
    */
  lazy val inputRowType: RelDataType = this match {
    case sj: SemiJoin =>
      // Combines inputs' RowType, the result is different from SemiJoin's RowType.
      SqlValidatorUtil.deriveJoinRowType(
        sj.getLeft.getRowType,
        sj.getRight.getRowType,
        getJoinType,
        sj.getCluster.getTypeFactory,
        null,
        Collections.emptyList[RelDataTypeField]
      )
    case _ => getRowType
  }

  /**
    * Writes the two inputs followed by the `joinType`, `where` and `select` items.
    *
    * @param pw the writer to which the terms are added
    * @return the writer returned by the last `item` call
    */
  override def explainTerms(pw: RelWriter): RelWriter = {
    pw.input("left", getLeft).input("right", getRight)
      .item("joinType", RelExplainUtil.joinTypeToString(flinkJoinType))
      .item("where",
        RelExplainUtil.expressionToString(getCondition, inputRowType, getExpressionString))
      .item("select", getRowType.getFieldNames.mkString(", "))
  }
}
Expand Up @@ -19,6 +19,7 @@ package org.apache.flink.table.plan.nodes.logical

import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.calcite.{LogicalRank, Rank, RankRange}
import org.apache.flink.table.plan.util.RelExplainUtil

import org.apache.calcite.plan._
import org.apache.calcite.rel.`type`.RelDataType
Expand Down Expand Up @@ -60,7 +61,7 @@ class FlinkLogicalRank(
pw.item("input", getInput)
.item("rankFunction", rankFunction)
.item("partitionBy", partitionKey.map(inputFieldNames.get(_)).mkString(","))
.item("orderBy", Rank.sortFieldsToString(sortCollation, input.getRowType))
.item("orderBy", RelExplainUtil.collationToString(sortCollation, input.getRowType))
.item("rankRange", rankRange.toString(inputFieldNames))
.item("select", getRowType.getFieldNames.mkString(", "))
}
Expand Down
Expand Up @@ -35,16 +35,16 @@ class FlinkLogicalWatermarkAssigner(
cluster: RelOptCluster,
traits: RelTraitSet,
input: RelNode,
rowtimeField: String,
watermarkOffset: Long)
extends WatermarkAssigner(cluster, traits, input, rowtimeField, watermarkOffset)
rowtimeFieldIndex: Option[Int],
watermarkDelay: Option[Long])
extends WatermarkAssigner(cluster, traits, input, rowtimeFieldIndex, watermarkDelay)
with FlinkLogicalRel {

override def copy(
traitSet: RelTraitSet,
inputs: util.List[RelNode]): RelNode = {
new FlinkLogicalWatermarkAssigner(
cluster, traitSet, inputs.get(0), rowtimeField, watermarkOffset)
cluster, traitSet, inputs.get(0), rowtimeFieldIndex, watermarkDelay)
}

}
Expand All @@ -60,8 +60,8 @@ class FlinkLogicalWatermarkAssignerConverter extends ConverterRule(
val newInput = RelOptRule.convert(watermark.getInput, FlinkConventions.LOGICAL)
FlinkLogicalWatermarkAssigner.create(
newInput,
watermark.rowtimeField,
watermark.watermarkOffset)
watermark.rowtimeFieldIndex,
watermark.watermarkDelay)
}
}

Expand All @@ -70,12 +70,10 @@ object FlinkLogicalWatermarkAssigner {

def create(
input: RelNode,
rowtimeField: String,
watermarkOffset: Long): FlinkLogicalWatermarkAssigner = {
rowtimeFieldIndex: Option[Int],
watermarkDelay: Option[Long]): FlinkLogicalWatermarkAssigner = {
val cluster = input.getCluster
val traitSet = cluster.traitSet().replace(FlinkConventions.LOGICAL).simplify()
new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtimeField, watermarkOffset)
new FlinkLogicalWatermarkAssigner(cluster, traitSet, input, rowtimeFieldIndex, watermarkDelay)
}
}


Expand Up @@ -18,7 +18,7 @@

package org.apache.flink.table.plan.nodes.physical.batch

import org.apache.flink.table.plan.nodes.common.CommonExchange
import org.apache.flink.table.plan.nodes.common.CommonPhysicalExchange

import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
import org.apache.calcite.rel.{RelDistribution, RelNode}
Expand Down Expand Up @@ -76,7 +76,7 @@ class BatchExecExchange(
traitSet: RelTraitSet,
inputRel: RelNode,
relDistribution: RelDistribution)
extends CommonExchange(cluster, traitSet, inputRel, relDistribution)
extends CommonPhysicalExchange(cluster, traitSet, inputRel, relDistribution)
with BatchPhysicalRel {

override def copy(
Expand Down

0 comments on commit 2741423

Please sign in to comment.