[FLINK-1788] [table] Make logical plans transformable
aljoscha committed Apr 3, 2015
1 parent 2cbbd32 commit 2d55cf0
Showing 65 changed files with 505 additions and 380 deletions.
6 changes: 3 additions & 3 deletions docs/linq.md
@@ -69,9 +69,9 @@ can join to Tables:
 {% highlight scala %}
 case class MyResult(a: String, b: Int)
 
-val input1 = env.fromElements(...).as('a, 'b)
-val input2 = env.fromElements(...).as('c, 'd)
-val joined = input1.join(input2).where("b = a && d > 42").select("a, d").as[MyResult]
+val input1 = env.fromElements(...).toTable('a, 'b)
+val input2 = env.fromElements(...).toTable('c, 'd)
+val joined = input1.join(input2).where("b = a && d > 42").select("a, d").toSet[MyResult]
 {% endhighlight %}
 
 Notice, how a DataSet can be converted to a Table by using `as` and specifying new
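As a usage sketch of the renamed conversions (not part of the commit): assuming a Scala ExecutionEnvironment named env, the org.apache.flink.api.scala.table._ implicits, concrete sample data, and a join predicate adjusted to reference both inputs (the snippet in the hunk compares b = a), the names from the updated docs compose like this:

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._

    case class MyResult(a: String, b: Int)

    val env = ExecutionEnvironment.getExecutionEnvironment

    // Name the fields while converting each DataSet into a Table.
    val input1 = env.fromElements(("hello", 1), ("world", 2)).toTable('a, 'b)
    val input2 = env.fromElements(("hello", 43), ("world", 7)).toTable('c, 'd)

    // Join, project, and convert the result back into a typed DataSet.
    val joined = input1.join(input2)
      .where("a = c && d > 42")
      .select("a, d")
      .toSet[MyResult]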
JavaBatchTranslator.scala:

@@ -28,33 +28,33 @@ import org.apache.flink.api.java.operators.JoinOperator.EquiJoin
 import org.apache.flink.api.java.operators.Keys.ExpressionKeys
 import org.apache.flink.api.java.operators.{GroupReduceOperator, Keys, MapOperator, UnsortedGrouping}
 import org.apache.flink.api.java.{DataSet => JavaDataSet}
-import org.apache.flink.api.table.analysis.ExtractEquiJoinFields
-import org.apache.flink.api.table.operations._
+import org.apache.flink.api.table.expressions.analysis.ExtractEquiJoinFields
+import org.apache.flink.api.table.plan._
 import org.apache.flink.api.table.runtime.{ExpressionAggregateFunction, ExpressionFilterFunction, ExpressionJoinFunction, ExpressionSelectFunction}
-import org.apache.flink.api.table.tree._
+import org.apache.flink.api.table.expressions._
 import org.apache.flink.api.table.typeinfo.{RenameOperator, RenamingProxyTypeInfo, RowTypeInfo}
 import org.apache.flink.api.table.{ExpressionException, Row, Table}
 
 /**
- * [[TableTranslator]] for creating [[Table]]s from Java [[org.apache.flink.api.java.DataSet]]s and
+ * [[PlanTranslator]] for creating [[Table]]s from Java [[org.apache.flink.api.java.DataSet]]s and
  * translating them back to Java [[org.apache.flink.api.java.DataSet]]s.
  */
-class JavaBatchTranslator extends TableTranslator {
+class JavaBatchTranslator extends PlanTranslator {
 
   type Representation[A] = JavaDataSet[A]
 
   override def createTable[A](
       repr: Representation[A],
       inputType: CompositeType[A],
       expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type] = {
+      resultFields: Seq[(String, TypeInformation[_])]): Table = {
 
     val rowDataSet = createSelect(expressions, repr, inputType)
 
-    Table(Root(rowDataSet, resultFields), this)
+    Table(Root(rowDataSet, resultFields))
   }
 
-  override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
+  override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): JavaDataSet[A] = {
 
     if (tpe.getTypeClass == classOf[Row]) {
       // shortcut for DataSet[Row]
@@ -113,13 +113,14 @@ class JavaBatchTranslator extends TableTranslator {
     operator
   }
 
-  private def translateInternal(op: Operation): JavaDataSet[Row] = {
+  private def translateInternal(op: PlanNode): JavaDataSet[Row] = {
     op match {
       case Root(dataSet: JavaDataSet[Row], resultFields) =>
         dataSet
 
       case Root(_, _) =>
-        throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op)
+        throw new ExpressionException("Invalid Root for JavaBatchTranslator: " + op + ". " +
+          "Did you try converting a Table based on a DataSet to a DataStream or vice-versa?")
 
       case GroupBy(_, fields) =>
         throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
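The Operation-to-PlanNode rename underlines that the translators are plain pattern matches over a logical-plan ADT. A condensed, hypothetical sketch of that shape, showing only the two node types visible in this hunk (the real hierarchy lives in org.apache.flink.api.table.plan and carries TypeInformation for the result fields):

    // Hypothetical, simplified plan ADT for illustration only.
    sealed trait PlanNode
    case class Root[T](input: T, resultFields: Seq[String]) extends PlanNode
    case class GroupBy(input: PlanNode, fields: Seq[String]) extends PlanNode

    // Translators walk the tree exactly like translateInternal above.
    def describe(op: PlanNode): String = op match {
      case Root(_, fields)      => s"root[${fields.mkString(", ")}]"
      case GroupBy(input, keys) => s"groupBy[${keys.mkString(", ")}](${describe(input)})"
    }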
JavaStreamingTranslator.scala:

@@ -23,38 +23,38 @@ import java.lang.reflect.Modifier
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
-import org.apache.flink.api.table.operations._
+import org.apache.flink.api.table.plan._
 import org.apache.flink.api.table.runtime.{ExpressionFilterFunction, ExpressionSelectFunction}
-import org.apache.flink.api.table.tree._
+import org.apache.flink.api.table.expressions._
 import org.apache.flink.api.table.typeinfo.RowTypeInfo
 import org.apache.flink.api.table.{ExpressionException, Row, Table}
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.invokable.operator.MapInvokable
 
 /**
- * [[TableTranslator]] for creating [[Table]]s from Java [[DataStream]]s and
+ * [[PlanTranslator]] for creating [[Table]]s from Java [[DataStream]]s and
  * translating them back to Java [[DataStream]]s.
  *
  * This is very limited right now. Only select and filter are implemented. Also, the expression
  * operations must be extended to allow windowing operations.
  */
-class JavaStreamingTranslator extends TableTranslator {
+class JavaStreamingTranslator extends PlanTranslator {
 
   type Representation[A] = DataStream[A]
 
   override def createTable[A](
       repr: Representation[A],
       inputType: CompositeType[A],
       expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type] = {
+      resultFields: Seq[(String, TypeInformation[_])]): Table = {
 
     val rowDataStream = createSelect(expressions, repr, inputType)
 
-    new Table(Root(rowDataStream, resultFields), this)
+    new Table(Root(rowDataStream, resultFields))
   }
 
-  override def translate[A](op: Operation)(implicit tpe: TypeInformation[A]): DataStream[A] = {
+  override def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): DataStream[A] = {
 
     if (tpe.getTypeClass == classOf[Row]) {
       // shortcut for DataSet[Row]
@@ -112,13 +112,14 @@ class JavaStreamingTranslator extends TableTranslator {
     resultSet.transform(opName, outputType, new MapInvokable[Row, A](function))
   }
 
-  private def translateInternal(op: Operation): DataStream[Row] = {
+  private def translateInternal(op: PlanNode): DataStream[Row] = {
     op match {
       case Root(dataSet: DataStream[Row], resultFields) =>
         dataSet
 
       case Root(_, _) =>
-        throw new ExpressionException("Invalid Root for JavaStreamingTranslator: " + op)
+        throw new ExpressionException("Invalid Root for JavaStreamingTranslator: " + op + ". " +
+          "Did you try converting a Table based on a DataSet to a DataStream or vice-versa?")
 
       case GroupBy(_, fields) =>
         throw new ExpressionException("Dangling GroupBy operation. Did you forget a " +
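The extended error message targets one specific misuse: handing a Table whose Root wraps one backend's representation to the other backend's translator. A hedged sketch of how that is triggered (someWordCounts and MyResult are hypothetical; toTable and toStream are the TableEnvironment methods shown in the next file):

    val tableEnv = new TableEnvironment

    // Root captures a DataSet[Row] here, because the input is a batch DataSet.
    val batchTable = tableEnv.toTable(someWordCounts, "word, count")

    // Asking the streaming translator to translate a batch-rooted plan falls
    // through to the Root(_, _) case above and throws an ExpressionException
    // ending in "Did you try converting a Table based on a DataSet to a
    // DataStream or vice-versa?".
    tableEnv.toStream(batchTable, classOf[MyResult])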
TableEnvironment.scala:

@@ -17,6 +17,7 @@
  */
 package org.apache.flink.api.java.table
 
+import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.DataSet
 import org.apache.flink.api.java.typeutils.TypeExtractor
 import org.apache.flink.api.table.Table
@@ -43,17 +44,17 @@ class TableEnvironment {
    * This will transform the set containing elements of two fields to a table where the fields
    * are named a and b.
    */
-  def toTable[T](set: DataSet[T], fields: String): Table[JavaBatchTranslator] = {
-    new JavaBatchTranslator().createTable(set, fields).asInstanceOf[Table[JavaBatchTranslator]]
+  def toTable[T](set: DataSet[T], fields: String): Table = {
+    new JavaBatchTranslator().createTable(set, fields)
   }
 
   /**
    * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]].
    * The fields of the DataSet type are used to name the
    * [[org.apache.flink.api.table.Table]] fields.
    */
-  def toTable[T](set: DataSet[T]): Table[JavaBatchTranslator] = {
-    new JavaBatchTranslator().createTable(set).asInstanceOf[Table[JavaBatchTranslator]]
+  def toTable[T](set: DataSet[T]): Table = {
+    new JavaBatchTranslator().createTable(set)
   }
 
   /**
@@ -69,18 +70,17 @@ class TableEnvironment {
    * This will transform the set containing elements of two fields to a table where the fields
    * are named a and b.
    */
-  def toTable[T](set: DataStream[T], fields: String): Table[JavaStreamingTranslator] = {
+  def toTable[T](set: DataStream[T], fields: String): Table = {
     new JavaStreamingTranslator().createTable(set, fields)
-      .asInstanceOf[Table[JavaStreamingTranslator]]
   }
 
   /**
    * Transforms the given DataSet to a [[org.apache.flink.api.table.Table]].
    * The fields of the DataSet type are used to name the
    * [[org.apache.flink.api.table.Table]] fields.
    */
-  def toTable[T](set: DataStream[T]): Table[JavaStreamingTranslator] = {
-    new JavaStreamingTranslator().createTable(set).asInstanceOf[Table[JavaStreamingTranslator]]
+  def toTable[T](set: DataStream[T]): Table = {
+    new JavaStreamingTranslator().createTable(set)
   }
 
   /**
@@ -90,10 +90,9 @@ class TableEnvironment {
    * fields and the types must match.
    */
   @SuppressWarnings(Array("unchecked"))
-  def toSet[T](
-      table: Table[JavaBatchTranslator],
-      clazz: Class[T]): DataSet[T] = {
-    table.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataSet[T]]
+  def toSet[T](table: Table, clazz: Class[T]): DataSet[T] = {
+    new JavaBatchTranslator().translate[T](table.operation)(
+      TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
   }
 
   /**
@@ -103,10 +102,10 @@ class TableEnvironment {
    * fields and the types must match.
    */
   @SuppressWarnings(Array("unchecked"))
-  def toStream[T](
-      table: Table[JavaStreamingTranslator],
-      clazz: Class[T]): DataStream[T] = {
-    table.as(TypeExtractor.createTypeInfo(clazz)).asInstanceOf[DataStream[T]]
+  def toStream[T](table: Table, clazz: Class[T]): DataStream[T] = {
+    new JavaStreamingTranslator().translate[T](table.operation)(
+      TypeExtractor.createTypeInfo(clazz).asInstanceOf[TypeInformation[T]])
   }
 }
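The payoff of this hunk is that Table no longer drags a translator type parameter around; it is just a handle on a logical plan, and any backend can pick that plan up via table.operation. A condensed sketch of the new contract, using simplified stand-in types rather than the real classes:

    import org.apache.flink.api.common.typeinfo.TypeInformation

    // Stand-ins: the real PlanNode lives in org.apache.flink.api.table.plan.
    trait PlanNode
    class Table(val operation: PlanNode)

    // Every backend implements the same interface over its own representation,
    // mirroring the PlanTranslator used by the translators in this commit.
    abstract class PlanTranslator {
      type Representation[A]
      def translate[A](op: PlanNode)(implicit tpe: TypeInformation[A]): Representation[A]
    }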


DataSetConversions.scala:

@@ -18,7 +18,7 @@
 package org.apache.flink.api.scala.table
 
 import org.apache.flink.api.table._
-import org.apache.flink.api.table.tree.{UnresolvedFieldReference, Expression}
+import org.apache.flink.api.table.expressions.{UnresolvedFieldReference, Expression}
 import org.apache.flink.api.common.typeutils.CompositeType
 
 import org.apache.flink.api.scala._
@@ -40,7 +40,7 @@ class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) {
    * This results in a [[Table]] that has field `a` of type `String` and field `b`
    * of type `Int`.
    */
-  def as(fields: Expression*): Table[ScalaBatchTranslator] = {
+  def as(fields: Expression*): Table = {
     new ScalaBatchTranslator().createTable(set, fields.toArray)
   }
 
@@ -58,7 +58,7 @@ class DataSetConversions[T](set: DataSet[T], inputType: CompositeType[T]) {
    * Here, the result is a [[Table]] that has field `_1` of type `String` and field `_2`
    * of type `Int`.
    */
-  def toTable: Table[ScalaBatchTranslator] = {
+  def toTable: Table = {
     val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
     as(resultFields: _*)
   }
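A small usage sketch of these two conversions (assuming the org.apache.flink.api.scala.table._ implicits that expose them on DataSet; the sample data is made up):

    import org.apache.flink.api.scala._
    import org.apache.flink.api.scala.table._
    import org.apache.flink.api.table.Table

    val env = ExecutionEnvironment.getExecutionEnvironment
    val input = env.fromElements(("hello", 1), ("world", 2))

    // Explicit names: a Table with field 'a of type String and 'b of type Int.
    val named: Table = input.as('a, 'b)

    // Names taken from the tuple type: fields _1 and _2.
    val defaulted: Table = input.toTable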
DataStreamConversions.scala:

@@ -20,7 +20,7 @@ package org.apache.flink.api.scala.table
 
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.table._
-import org.apache.flink.api.table.tree.{Expression, UnresolvedFieldReference}
+import org.apache.flink.api.table.expressions.{Expression, UnresolvedFieldReference}
 import org.apache.flink.streaming.api.scala.DataStream
 
 class DataStreamConversions[T](stream: DataStream[T], inputType: CompositeType[T]) {
@@ -37,11 +37,11 @@ class DataStreamConversions[T](stream: DataStream[T], inputType: CompositeType[T]) {
    * of type `Int`.
    */
 
-  def as(fields: Expression*): Table[ScalaStreamingTranslator] = {
+  def as(fields: Expression*): Table = {
     new ScalaStreamingTranslator().createTable(
       stream,
       fields.toArray,
-      checkDeterministicFields = true).asInstanceOf[Table[ScalaStreamingTranslator]]
+      checkDeterministicFields = true)
   }
 
   /**
@@ -59,7 +59,7 @@ class DataStreamConversions[T](stream: DataStream[T], inputType: CompositeType[T]) {
    * of type `Int`.
    */
 
-  def toTable: Table[ScalaStreamingTranslator] = {
+  def toTable: Table = {
     val resultFields = inputType.getFieldNames.map(UnresolvedFieldReference)
     as(resultFields: _*)
   }
ScalaBatchTranslator.scala:

@@ -21,9 +21,9 @@ package org.apache.flink.api.scala.table
 
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.table.JavaBatchTranslator
-import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.api.scala.wrap
-import org.apache.flink.api.table.operations._
+import org.apache.flink.api.table.plan._
 import org.apache.flink.api.table.Table
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.scala.DataSet
@@ -32,25 +32,25 @@ import scala.reflect.ClassTag
 
 
 /**
- * [[TableTranslator]] for creating [[Table]]s from Scala [[DataSet]]s and
+ * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataSet]]s and
  * translating them back to Scala [[DataSet]]s.
  */
-class ScalaBatchTranslator extends TableTranslator {
+class ScalaBatchTranslator extends PlanTranslator {
 
   private val javaTranslator = new JavaBatchTranslator
 
   type Representation[A] = DataSet[A]
 
   def createTable[A](
       repr: DataSet[A],
-      fields: Array[Expression]): Table[ScalaBatchTranslator] = {
+      fields: Array[Expression]): Table = {
 
     val result = javaTranslator.createTable(repr.javaSet, fields)
 
-    new Table[ScalaBatchTranslator](result.operation, this)
+    new Table(result.operation)
   }
 
-  override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataSet[O] = {
+  override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataSet[O] = {
     // fake it till you make it ...
     wrap(javaTranslator.translate(op))(ClassTag.AnyRef.asInstanceOf[ClassTag[O]])
   }
@@ -59,10 +59,10 @@ class ScalaBatchTranslator extends TableTranslator {
       repr: Representation[A],
       inputType: CompositeType[A],
       expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type] = {
+      resultFields: Seq[(String, TypeInformation[_])]): Table = {
 
     val result = javaTranslator.createTable(repr.javaSet, inputType, expressions, resultFields)
 
-    Table(result.operation, this)
+    Table(result.operation)
   }
 }
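The "fake it till you make it" comment refers to conjuring the ClassTag that the Scala DataSet wrapper wants when the element type O is only known through TypeInformation. A standalone sketch of the trick (my reading, not from the commit): it presumably holds up because the tag only determines the runtime class used for erased-array creation.

    import scala.reflect.ClassTag

    // Summon a ClassTag for an arbitrary O by casting the AnyRef tag; O is
    // erased at runtime anyway, so the tag only affects arrays created from it.
    def fakeTag[O]: ClassTag[O] = ClassTag.AnyRef.asInstanceOf[ClassTag[O]]

    val tag: ClassTag[String] = fakeTag[String]
    println(tag.runtimeClass) // class java.lang.Object, not java.lang.String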
ScalaStreamingTranslator.scala:

@@ -22,24 +22,24 @@ import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.common.typeutils.CompositeType
 import org.apache.flink.api.java.table.JavaStreamingTranslator
 import org.apache.flink.api.table.Table
-import org.apache.flink.api.table.operations._
-import org.apache.flink.api.table.tree.Expression
+import org.apache.flink.api.table.plan._
+import org.apache.flink.api.table.expressions.Expression
 import org.apache.flink.streaming.api.scala.{DataStream, javaToScalaStream}
 
 /**
- * [[TableTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
+ * [[PlanTranslator]] for creating [[Table]]s from Scala [[DataStream]]s and
  * translating them back to Scala [[DataStream]]s.
  *
 * This is very limited right now. Only select and filter are implemented. Also, the expression
 * operations must be extended to allow windowing operations.
 */
-class ScalaStreamingTranslator extends TableTranslator {
+class ScalaStreamingTranslator extends PlanTranslator {
 
   private val javaTranslator = new JavaStreamingTranslator
 
   override type Representation[A] = DataStream[A]
 
-  override def translate[O](op: Operation)(implicit tpe: TypeInformation[O]): DataStream[O] = {
+  override def translate[O](op: PlanNode)(implicit tpe: TypeInformation[O]): DataStream[O] = {
     // fake it till you make it ...
     javaToScalaStream(javaTranslator.translate(op))
   }
@@ -48,11 +48,11 @@ class ScalaStreamingTranslator extends TableTranslator {
       repr: Representation[A],
       inputType: CompositeType[A],
       expressions: Array[Expression],
-      resultFields: Seq[(String, TypeInformation[_])]): Table[this.type] = {
+      resultFields: Seq[(String, TypeInformation[_])]): Table = {
 
     val result =
       javaTranslator.createTable(repr.getJavaStream, inputType, expressions, resultFields)
 
-    new Table(result.operation, this)
+    new Table(result.operation)
   }
 }