From 21b682d84db4322c57b072a1ba8922fd165e9966 Mon Sep 17 00:00:00 2001 From: sunjincheng121 Date: Wed, 7 Jun 2017 08:04:55 +0800 Subject: [PATCH] TimeIndicators --- .../table/api/StreamTableEnvironment.scala | 66 ++++++++++--- .../datastream/TimeAttributesITCase.scala | 93 +++++++++++++++++-- 2 files changed, 136 insertions(+), 23 deletions(-) diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala index bc5038dda6b65..172588ef285d7 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/StreamTableEnvironment.scala @@ -32,12 +32,14 @@ import org.apache.calcite.tools.{RuleSet, RuleSets} import org.apache.flink.api.common.functions.MapFunction import org.apache.flink.api.common.typeinfo.{AtomicType, TypeInformation} import org.apache.flink.api.common.typeutils.CompositeType -import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2} +import org.apache.flink.api.java.tuple.{Tuple, Tuple2 => JTuple2} import org.apache.flink.api.java.typeutils.TupleTypeInfo import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment +import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks +import org.apache.flink.streaming.api.watermark.Watermark import org.apache.flink.table.calcite.RelTimeIndicatorConverter import org.apache.flink.table.explain.PlanJsonParser import org.apache.flink.table.expressions.{Expression, ProctimeAttribute, RowtimeAttribute, UnresolvedFieldReference} @@ -50,8 +52,8 @@ import org.apache.flink.table.runtime.{CRowInputJavaTupleOutputMapRunner, CRowIn import org.apache.flink.table.sinks.{AppendStreamTableSink, RetractStreamTableSink, TableSink, UpsertStreamTableSink} import org.apache.flink.table.sources.{DefinedRowtimeAttribute, StreamTableSource, TableSource} import org.apache.flink.table.typeutils.TypeCheckUtils -import org.apache.flink.types.Row - +import org.apache.flink.types.{LongValue, Record, Row} +import org.apache.flink.streaming.api.datastream.{DataStreamSource,SingleOutputStreamOperator} import _root_.scala.collection.JavaConverters._ /** @@ -398,7 +400,8 @@ abstract class StreamTableEnvironment( val (fieldNames, fieldIndexes) = getFieldInfo[T](streamType, fields) // validate and extract time attributes - val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, fields) + val (rowtime, proctime, needDefaultTimestampAssigner) = + validateAndExtractTimeAttributes(streamType, fields) // check if event-time is enabled if (rowtime.isDefined && execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) { @@ -407,13 +410,44 @@ abstract class StreamTableEnvironment( s"But is: ${execEnv.getStreamTimeCharacteristic}") } - val dataStreamTable = new DataStreamTable[T]( - dataStream, - fieldIndexes, - fieldNames, - rowtime, - proctime - ) + if (rowtime.isDefined) { + if (dataStream.isInstanceOf[DataStreamSource[_]] && !needDefaultTimestampAssigner) { + throw new TableException( + "[.rowtime] on virtual column must call [assignTimestampsAndWatermarks] method.") + } else if (!dataStream.isInstanceOf[DataStreamSource[_]] && needDefaultTimestampAssigner) { + throw new TableException( + "[.rowtime] on already existing column must must not call " + + "[assignTimestampsAndWatermarks] method.") + } + + if (needDefaultTimestampAssigner) { + val streamOperator = dataStream.assignTimestampsAndWatermarks( + new AssignerWithPunctuatedWatermarks[T] { + override def checkAndGetNextWatermark( + lastElement: T, extractedTimestamp: Long): Watermark = { + new Watermark(extractedTimestamp) + } + override def extractTimestamp( + element: T, + previousElementTimestamp: Long): Long = { + element match { + case e: Product => e.productElement(rowtime.get._1).asInstanceOf[Long] + case e: Row => e.getField(rowtime.get._1).asInstanceOf[Long] + case e: Tuple => e.getField(rowtime.get._1).asInstanceOf[Long] + case e: Record => e.getField(rowtime.get._1, classOf[LongValue]).getValue + case _ => + throw TableException("xxxxxxxxxxxxxxxx") + } + } + }) + val dataStreamTable = + new DataStreamTable[T](streamOperator, fieldIndexes, fieldNames, rowtime, proctime) + return registerTableInternal(name, dataStreamTable) + } + } + + val dataStreamTable = + new DataStreamTable[T](dataStream, fieldIndexes, fieldNames, rowtime, proctime) registerTableInternal(name, dataStreamTable) } @@ -426,7 +460,7 @@ abstract class StreamTableEnvironment( private def validateAndExtractTimeAttributes( streamType: TypeInformation[_], exprs: Array[Expression]) - : (Option[(Int, String)], Option[(Int, String)]) = { + : (Option[(Int, String)], Option[(Int, String)], Boolean) = { val fieldTypes: Array[TypeInformation[_]] = streamType match { case c: CompositeType[_] => (0 until c.getArity).map(i => c.getTypeAt(i)).toArray @@ -482,7 +516,13 @@ abstract class StreamTableEnvironment( "The proctime attribute may not have the same name as an another field.") } - (rowtime, proctime) + val needDefaultTimestampAssigner = if (proctime.isDefined) { + rowtime.isDefined && exprs.length == (fieldTypes.length + 1) + } else { + rowtime.isDefined && exprs.length == fieldTypes.length + } + + (rowtime, proctime, needDefaultTimestampAssigner) } /** diff --git a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala index c25dfdfb6211a..0c9a1c664f4d2 100644 --- a/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala +++ b/flink-libraries/flink-table/src/test/scala/org/apache/flink/table/runtime/datastream/TimeAttributesITCase.scala @@ -58,7 +58,6 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) val stream = env .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) } @@ -69,7 +68,6 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) val stream = env .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) .select('rowtime.rowtime) } @@ -81,7 +79,6 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val tEnv = TableEnvironment.getTableEnvironment(env) val stream = env .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) stream .toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) .window(Tumble over 2.millis on 'rowtime as 'w) @@ -89,6 +86,87 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { .select('w.end.rowtime, 'int.count as 'int) // no rowtime on non-window reference } + @Test(expected = classOf[TableException]) + def test1(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + val stream = env + .fromCollection(data).assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) + stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + + } + + @Test(expected = classOf[TableException]) + def test2(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env + .fromCollection(data) + val table = stream.toTable( + tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string, 'rowtime.rowtime) + } + + @Test + def testDefaultTimestampAssigner(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env + .fromCollection(data) + val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + + val t = table.select('rowtime.cast(Types.STRING)) + + val results = t.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.001", + "1970-01-01 00:00:00.002", + "1970-01-01 00:00:00.003", + "1970-01-01 00:00:00.004", + "1970-01-01 00:00:00.007", + "1970-01-01 00:00:00.008", + "1970-01-01 00:00:00.016") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + + @Test + def testDefaultTimestampAssigner2(): Unit = { + val env = StreamExecutionEnvironment.getExecutionEnvironment + env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) + val tEnv = TableEnvironment.getTableEnvironment(env) + StreamITCase.testResults = mutable.MutableList() + + val stream = env + .fromCollection(data) + val table = stream.toTable( + tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proc.proctime) + + val t = table.select('rowtime.cast(Types.STRING)) + + val results = t.toAppendStream[Row] + results.addSink(new StreamITCase.StringSink) + env.execute() + + val expected = Seq( + "1970-01-01 00:00:00.001", + "1970-01-01 00:00:00.002", + "1970-01-01 00:00:00.003", + "1970-01-01 00:00:00.004", + "1970-01-01 00:00:00.007", + "1970-01-01 00:00:00.008", + "1970-01-01 00:00:00.016") + assertEquals(expected.sorted, StreamITCase.testResults.sorted) + } + @Test def testCalcMaterialization(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment @@ -99,7 +177,8 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) - val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) + val table = stream.toTable( + tEnv, 'long, 'int, 'double, 'float, 'bigdec, 'string, 'rowtime.rowtime) val t = table.select('rowtime.cast(Types.STRING)) @@ -127,7 +206,6 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) val t = table @@ -154,7 +232,6 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) val table = stream.toTable( tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime) val func = new TableFunc @@ -185,7 +262,6 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) val table = stream.toTable( tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string, 'proctime.proctime) val func = new TableFunc @@ -216,7 +292,6 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) val table = stream.toTable( tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) @@ -253,7 +328,6 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) tEnv.registerTable("MyTable", table) @@ -282,7 +356,6 @@ class TimeAttributesITCase extends StreamingMultipleProgramsTestBase { val stream = env .fromCollection(data) - .assignTimestampsAndWatermarks(new TimestampWithEqualWatermark()) val table = stream.toTable(tEnv, 'rowtime.rowtime, 'int, 'double, 'float, 'bigdec, 'string) val t = table