Skip to content

Commit

Permalink
Align the refactoring design of FLINK-11884
Browse files Browse the repository at this point in the history
  • Loading branch information
sunjincheng121 committed Mar 28, 2019
1 parent 8803dd2 commit 3d33ad4
Show file tree
Hide file tree
Showing 3 changed files with 139 additions and 100 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ import org.apache.flink.table.util.JavaScalaConversionUtil.toJava

import _root_.scala.collection.JavaConversions._
import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable.ListBuffer

/**
* The implementation of the [[Table]].
Expand Down Expand Up @@ -414,22 +413,6 @@ class TableImpl(
new OverWindowedTableImpl(this, overWindows)
}

/**
* Registers an unique table name under the table environment
* and return the registered table name.
*/
override def toString: String = {
if (tableName == null) {
tableName = "UnnamedTable$" + tableEnv.attrNameCntr.getAndIncrement()
tableEnv.registerTable(tableName, this)
}
tableName
}

private def wrap(operation: TableOperation): Table = {
new TableImpl(tableEnv, operation)
}

override def addColumns(fields: String): Table = {
addColumns(ExpressionParser.parseExpressionList(fields): _*);
}
Expand All @@ -443,7 +426,6 @@ class TableImpl(
}

override def addColumns(replaceIfExist: Boolean, fields: Expression*): Table = {

val expressionsWithResolvedCalls = fields.map(_.accept(callResolver)).asJava
val extracted = extractAggregationsAndProperties(
expressionsWithResolvedCalls,
Expand All @@ -456,98 +438,40 @@ class TableImpl(
s"The added field expression cannot be an aggregation, find [${aggNames.head}].")
}

val childFields = operationTree
.asInstanceOf[LogicalNode].output.map(a => UnresolvedFieldReference(a.name))

if (replaceIfExist) {

val finalFields = new ListBuffer[Expression]()
val addFields = fields.map(tableEnv.expressionBridge.bridge)
childFields.foreach(e => finalFields.append(e))

// replace field if exist.
addFields.foreach {
case e@Alias(_, name, _) =>
val index = finalFields.indexWhere(p => p match {
case u: UnresolvedFieldReference => u.name.equals(name)
case a: Alias => a.name.equals(name)
case _ => false
})
if (index >= 0) {
finalFields(index) = e
} else {
finalFields.append(e)
}
case e =>
throw new TableException(
s"Should add an alias to the [$e], if replaceIfExist is true.")
}
select(finalFields: _*)
} else {
select(childFields ++ fields:_*)
}
wrap(operationTreeBuilder.addColumn(
replaceIfExist, expressionsWithResolvedCalls, operationTree))
}

override def renameColumns(fields: String): Table = {
renameColumns(ExpressionParser.parseExpressionList(fields): _*)
}

override def renameColumns(fields: Expression*): Table = {

val childFields = operationTree
.asInstanceOf[LogicalNode].output.map(a => UnresolvedFieldReference(a.name))

val finalFields = childFields.map(e => e.asInstanceOf[Expression]).toArray
val renameFields = fields.map(tableEnv.expressionBridge.bridge)

// Rename existing fields
renameFields.foreach {
case e@Alias(child: UnresolvedFieldReference, _, _) =>
val index = finalFields.indexWhere(p => p match {
case u: UnresolvedFieldReference => u.name.equals(child.name)
case _ => false
})
if (index >= 0) {
finalFields(index) = e
} else {
throw new TableException(s"Rename field [${child.name}] does not exist in source table.")
}
case e =>
throw new TableException(
s"Unexpected field expression type [$e]. " +
s"Renaming must add an alias to the original field, e.g., a as a1.")
}
select(finalFields: _*)
wrap(operationTreeBuilder.renameColumn(fields, operationTree))
}

override def dropColumns(fields: String): Table = {
dropColumns(ExpressionParser.parseExpressionList(fields): _*)
}

override def dropColumns(fields: Expression*): Table = {
val childFields = operationTree
.asInstanceOf[LogicalNode].output.map(a => UnresolvedFieldReference(a.name))
val dropFields = fields.map(tableEnv.expressionBridge.bridge)

val finalFields = childFields.toBuffer

// Remove the fields which should be deleted in the final list
dropFields.distinct.foreach {
case UnresolvedFieldReference(name) =>
val index = finalFields.indexWhere(p => p match {
case u: UnresolvedFieldReference => u.name.equalsIgnoreCase(name)
case _ => false
})

if (index >= 0) {
finalFields.remove(index)
} else {
throw new TableException(s"Drop field [$name] does not exist in source table.")
}
case e =>
throw new TableException(s"Unexpected field expression type [$e].")
wrap(operationTreeBuilder.dropColumn(fields, operationTree))
}

/**
* Registers an unique table name under the table environment
* and return the registered table name.
*/
override def toString: String = {
if (tableName == null) {
tableName = "UnnamedTable$" + tableEnv.attrNameCntr.getAndIncrement()
tableEnv.registerTable(tableName, this)
}
select(finalFields: _*)
tableName
}

private def wrap(operation: TableOperation): Table = {
new TableImpl(tableEnv, operation)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ import java.util.{Optional, List => JList, Map => JMap}

import org.apache.flink.api.java.operators.join.JoinType
import org.apache.flink.table.api._
import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionBridge, Ordering, PlannerExpression, UnresolvedAlias, WindowProperty}
import org.apache.flink.table.expressions.{Alias, Asc, Expression, ExpressionBridge, Ordering, PlannerExpression, UnresolvedAlias, UnresolvedFieldReference, WindowProperty}
import org.apache.flink.table.functions.utils.UserDefinedFunctionUtils
import org.apache.flink.table.operations.TableOperation
import org.apache.flink.table.plan.ProjectionTranslator.{expandProjectList, flattenExpression, resolveOverWindows}
import org.apache.flink.table.plan.logical._
import org.apache.flink.table.util.JavaScalaConversionUtil.toScala

import _root_.scala.collection.JavaConverters._
import _root_.scala.collection.mutable.ListBuffer

/**
* Builder for [[[Operation]] tree.
Expand All @@ -54,6 +55,121 @@ class OperationTreeBuilder(private val tableEnv: TableEnvironment) {
Project(convertedProjectList, childNode, explicitAlias).validate(tableEnv)
}

def addColumn(
replaceIfExist: Boolean,
fieldLists: JList[Expression],
child: TableOperation)
: TableOperation = {

val childNode = child.asInstanceOf[LogicalNode]
val childFields = childNode.output.map(a => UnresolvedFieldReference(a.name))

if (replaceIfExist) {
val finalFields = new ListBuffer[Expression]()
val addFields = fieldLists.asScala.map(expressionBridge.bridge)
childFields.foreach(e => finalFields.append(e))

// replace field if exist.
addFields.foreach {
case e@Alias(_, name, _) =>
val index = finalFields.indexWhere(
p => p match {
case u: UnresolvedFieldReference => u.name.equals(name)
case a: Alias => a.name.equals(name)
case _ => false
})
if (index >= 0) {
finalFields(index) = e
} else {
finalFields.append(e)
}
case e =>
throw new TableException(
s"Should add an alias to the [$e], if replaceIfExist is true.")
}

val convertedProjectList = finalFields
.map(expressionBridge.bridge)
.flatMap(expr => flattenExpression(expr, childNode, tableEnv))
.map(UnresolvedAlias).toList

Project(convertedProjectList, childNode, false).validate(tableEnv)
} else {

val convertedProjectList = (childFields ++ fieldLists.asScala)
.map(expressionBridge.bridge)
.flatMap(expr => flattenExpression(expr, childNode, tableEnv))
.map(UnresolvedAlias).toList

Project(convertedProjectList, childNode, false).validate(tableEnv)
}
}

def renameColumn(
fieldLists: JList[Expression],
child: TableOperation)
: TableOperation = {

val childNode = child.asInstanceOf[LogicalNode]
val childFields = childNode.output.map(a => UnresolvedFieldReference(a.name))

val finalFields = childFields.map(e => e.asInstanceOf[Expression]).toArray
val renameFields = fieldLists.asScala.map(tableEnv.expressionBridge.bridge)

// Rename existing fields
renameFields.foreach {
case e@Alias(child: UnresolvedFieldReference, _, _) =>
val index = finalFields.indexWhere(
p => p match {
case u: UnresolvedFieldReference => u.name.equals(child.name)
case _ => false
})
if (index >= 0) {
finalFields(index) = e
} else {
throw new TableException(s"Rename field [${child.name}] does not exist in source table.")
}
case e =>
throw new TableException(
s"Unexpected field expression type [$e]. " +
s"Renaming must add an alias to the original field, e.g., a as a1.")
}
val convertedProjectList = finalFields.map(f => UnresolvedAlias(expressionBridge.bridge(f)))

Project(convertedProjectList, childNode, false).validate(tableEnv)
}

def dropColumn(
fieldLists: JList[Expression],
child: TableOperation)
: TableOperation = {
val childNode = child.asInstanceOf[LogicalNode]
val childFields = childNode.output.map(a => UnresolvedFieldReference(a.name))
val dropFields = fieldLists.asScala.map(tableEnv.expressionBridge.bridge)

val finalFields = childFields.toBuffer

// Remove the fields which should be deleted in the final list
dropFields.distinct.foreach {
case UnresolvedFieldReference(name) =>
val index = finalFields.indexWhere(
p => p match {
case u: UnresolvedFieldReference => u.name.equalsIgnoreCase(name)
case _ => false
})

if (index >= 0) {
finalFields.remove(index)
} else {
throw new TableException(s"Drop field [$name] does not exist in source table.")
}
case e =>
throw new TableException(s"Unexpected field expression type [$e].")
}
val convertedProjectList = finalFields.map(f => UnresolvedAlias(expressionBridge.bridge(f)))
Project(convertedProjectList, childNode, false).validate(tableEnv)
}

def aggregate(
groupingExpressions: JList[Expression],
namedAggregates: JMap[Expression, String],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -365,20 +365,19 @@ class CalcITCase extends AbstractTestBase {

val result = t
.addColumns("concat(c, 'sunny') as kid")
.addColumns(row(1, "str").flatten())
.addColumns(true, concat('c, "_kid") as 'kid, concat('c, "kid") as 'kid)
.addColumns(true, concat('c, " is a kid") as 'kid)
.select('a, 'b, 'kid, 'c)
.addColumns(true, concat('c, " is a kid") as 'kid)
.select('a, 'b, 'c, 'kid)
.addColumns("'last'")
.addColumns('a + 2, 'b as 'b2)

result.addSink(new StreamITCase.StringSink[Row])
env.execute()

val expected = mutable.MutableList(
"1,1,Kevin,Kevin is a kid,last,3,1",
"2,2,Sunny,Sunny is a kid,last,4,2"
"1,1,Kevin,Kevin is a kid,1,str,last,3,1",
"2,2,Sunny,Sunny is a kid,1,str,last,4,2"
)
assertEquals(expected.sorted, StreamITCase.testResults.sorted)
}
Expand Down

0 comments on commit 3d33ad4

Please sign in to comment.