
Commit

[FLINK-12799][table] Improve expression based TableSchema extraction from DataStream/DataSet
dawidwys committed Jun 13, 2019
1 parent 180fe43 commit d894d35
Showing 8 changed files with 474 additions and 476 deletions.

Large diffs are not rendered by default.

@@ -32,7 +32,7 @@ import org.apache.flink.table.catalog.CatalogManager
import org.apache.flink.table.descriptors.{BatchTableDescriptor, ConnectorDescriptor}
import org.apache.flink.table.explain.PlanJsonParser
import org.apache.flink.table.expressions.BuiltInFunctionDefinitions.TIME_ATTRIBUTES
-import org.apache.flink.table.expressions.{CallExpression, Expression}
+import org.apache.flink.table.expressions.{CallExpression, Expression, ExpressionDefaultVisitor}
import org.apache.flink.table.operations.DataSetQueryOperation
import org.apache.flink.table.plan.nodes.FlinkConventions
import org.apache.flink.table.plan.nodes.dataset.DataSetRel
@@ -42,10 +42,11 @@ import org.apache.flink.table.runtime.MapRunner
import org.apache.flink.table.sinks._
import org.apache.flink.table.sources.{BatchTableSource, InputFormatTableSource, TableSource, TableSourceUtil}
import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
-import org.apache.flink.table.typeutils.FieldInfoUtils.{calculateTableSchema, getFieldsInfo, validateInputTypeInfo}
+import org.apache.flink.table.typeutils.FieldInfoUtils.{getFieldsInfo, validateInputTypeInfo}
import org.apache.flink.table.utils.TableConnectorUtils
import org.apache.flink.types.Row

+import _root_.scala.collection.JavaConverters._
/**
* The abstract base class for the implementation of batch TableEnvironments.
*
@@ -206,23 +207,35 @@ abstract class BatchTableEnvImpl(

    val fieldsInfo = fields match {
      case Some(f) =>
-       if (f.exists(f =>
-         f.isInstanceOf[CallExpression] &&
-           TIME_ATTRIBUTES.contains(f.asInstanceOf[CallExpression].getFunctionDefinition))) {
-         throw new ValidationException(
-           ".rowtime and .proctime time indicators are not allowed in a batch environment.")
-       }

+       checkNoTimeAttributes(f)
        getFieldsInfo[T](inputType, f)

      case None => getFieldsInfo[T](inputType)
    }

-   val tableOperation = new DataSetQueryOperation[T](dataSet,
+   val tableOperation = new DataSetQueryOperation[T](
+     dataSet,
      fieldsInfo.getIndices,
-     calculateTableSchema(inputType, fieldsInfo.getIndices, fieldsInfo.getFieldNames))
+     fieldsInfo.toTableSchema)
    tableOperation
  }

+ private def checkNoTimeAttributes[T](f: Array[Expression]) = {
+   if (f.exists(f =>
+     f.accept(new ExpressionDefaultVisitor[Boolean] {
+
+       override def visitCall(call: CallExpression): Boolean = {
+         TIME_ATTRIBUTES.contains(call.getFunctionDefinition) ||
+           call.getChildren.asScala.exists(_.accept(this))
+       }
+
+       override protected def defaultMethod(expression: Expression): Boolean = false
+     }))) {
+     throw new ValidationException(
+       ".rowtime and .proctime time indicators are not allowed in a batch environment.")
+   }
+ }
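The new helper walks the entire expression tree, so a time indicator hidden below the top level (for example behind an alias) is caught as well; the old check only inspected the outermost expression. A minimal sketch of the resulting behaviour against the Flink 1.9-era Scala batch API (the dataset contents and field names are invented):

```scala
import org.apache.flink.api.scala._
import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.scala._

object BatchTimeAttributeCheck {
  def main(args: Array[String]): Unit = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = BatchTableEnvironment.create(env)

    val sales: DataSet[(Long, String)] = env.fromElements((1L, "a"), (2L, "b"))

    // The old check only looked at top-level expressions, so an aliased
    // indicator such as 'ts.rowtime as 'eventTime could slip through.
    // The visitor now finds it in the call's children and rejects it.
    try {
      tEnv.fromDataSet(sales, 'ts.rowtime as 'eventTime, 'name)
    } catch {
      case e: ValidationException => println(s"rejected as expected: ${e.getMessage}")
    }
  }
}
```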

/**
* Returns the built-in normalization rules that are defined by the environment.
*/
@@ -28,7 +28,6 @@ import org.apache.calcite.sql2rel.RelDecorrelator
import org.apache.calcite.tools.{RuleSet, RuleSets}
import org.apache.flink.api.common.functions.MapFunction
import org.apache.flink.api.common.typeinfo.{SqlTimeTypeInfo, TypeInformation}
-import org.apache.flink.api.common.typeutils.CompositeType
import org.apache.flink.api.java.tuple.{Tuple2 => JTuple2}
import org.apache.flink.api.java.typeutils.{RowTypeInfo, TupleTypeInfo}
import org.apache.flink.api.scala.typeutils.CaseClassTypeInfo
@@ -52,8 +51,7 @@ import org.apache.flink.table.runtime.{CRowMapRunner, OutputRowtimeProcessFuncti
import org.apache.flink.table.sinks._
import org.apache.flink.table.sources.{StreamTableSource, TableSource, TableSourceUtil}
import org.apache.flink.table.types.utils.TypeConversions.fromDataTypeToLegacyInfo
-import org.apache.flink.table.typeutils.FieldInfoUtils.{calculateTableSchema, getFieldsInfo, isReferenceByPosition}
-import org.apache.flink.table.typeutils.{TimeIndicatorTypeInfo, TypeCheckUtils}
+import org.apache.flink.table.typeutils.FieldInfoUtils.getFieldsInfo

import _root_.scala.collection.JavaConverters._

@@ -325,239 +323,31 @@ abstract class StreamTableEnvImpl(
    val streamType = dataStream.getType

    // get field names and types for all non-replaced fields
-   val (indices, names) = fields match {
+   val fieldsInfo = fields match {
      case Some(f) =>
        // validate and extract time attributes
        val fieldsInfo = getFieldsInfo[T](streamType, f)
-       val (rowtime, proctime) = validateAndExtractTimeAttributes(streamType, f)

        // check if event-time is enabled
-       if (rowtime.isDefined &&
+       if (fieldsInfo.isRowtimeDefined &&
          execEnv.getStreamTimeCharacteristic != TimeCharacteristic.EventTime) {
-         throw new TableException(
+         throw new ValidationException(
            s"A rowtime attribute requires an EventTime time characteristic in stream environment" +
              s". But is: ${execEnv.getStreamTimeCharacteristic}")
        }

-       // adjust field indexes and field names
-       val indexesWithIndicatorFields = adjustFieldIndexes(
-         fieldsInfo.getIndices,
-         rowtime,
-         proctime)
-       val namesWithIndicatorFields = adjustFieldNames(
-         fieldsInfo.getFieldNames,
-         rowtime,
-         proctime)
-
-       (indexesWithIndicatorFields, namesWithIndicatorFields)
+       fieldsInfo
      case None =>
-       val fieldsInfo = getFieldsInfo[T](streamType)
-       (fieldsInfo.getIndices, fieldsInfo.getFieldNames)
+       getFieldsInfo[T](streamType)
    }

    val dataStreamTable = new DataStreamQueryOperation(
      dataStream,
-     indices,
-     calculateTableSchema(streamType, indices, names))
+     fieldsInfo.getIndices,
+     fieldsInfo.toTableSchema)
    dataStreamTable
  }
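On the stream side, the rowtime check now comes from FieldsInfo.isRowtimeDefined and fails with a ValidationException rather than a TableException. A minimal sketch of a conversion that satisfies it, against the Flink 1.9-era Scala streaming API (stream contents and names are invented):

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

object StreamRowtimeConversion {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // without this, declaring 'ts.rowtime below fails with the
    // ValidationException thrown in the hunk above
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val tEnv = StreamTableEnvironment.create(env)
    val clicks: DataStream[(Long, String)] = env.fromElements((1L, "a"), (2L, "b"))

    // field 0 (a Long) becomes the event-time attribute; the schema is
    // derived via fieldsInfo.toTableSchema
    val table = tEnv.fromDataStream(clicks, 'ts.rowtime, 'user)
    table.printSchema()
  }
}
```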

- /**
-   * Checks for at most one rowtime and proctime attribute.
-   * Returns the time attributes.
-   *
-   * @return rowtime attribute and proctime attribute
-   */
- private def validateAndExtractTimeAttributes(
-     streamType: TypeInformation[_],
-     exprs: Array[Expression])
-   : (Option[(Int, String)], Option[(Int, String)]) = {
-
-   val (isRefByPos, fieldTypes) = streamType match {
-     case c: CompositeType[_] =>
-       // determine schema definition mode (by position or by name)
-       (isReferenceByPosition(c, exprs), (0 until c.getArity).map(i => c.getTypeAt(i)).toArray)
-     case t: TypeInformation[_] =>
-       (false, Array(t))
-   }
-
-   var fieldNames: List[String] = Nil
-   var rowtime: Option[(Int, String)] = None
-   var proctime: Option[(Int, String)] = None
-
-   def checkRowtimeType(t: TypeInformation[_]): Unit = {
-     if (!(TypeCheckUtils.isLong(t) || TypeCheckUtils.isTimePoint(t))) {
-       throw new TableException(
-         s"The rowtime attribute can only replace a field with a valid time type, " +
-           s"such as Timestamp or Long. But was: $t")
-     }
-   }
-
-   def extractRowtime(idx: Int, name: String, origName: Option[String]): Unit = {
-     if (rowtime.isDefined) {
-       throw new TableException(
-         "The rowtime attribute can only be defined once in a table schema.")
-     } else {
-       // if the fields are referenced by position,
-       // it is possible to replace an existing field or append the time attribute at the end
-       if (isRefByPos) {
-         // aliases are not permitted
-         if (origName.isDefined) {
-           throw new TableException(
-             s"Invalid alias '${origName.get}' because fields are referenced by position.")
-         }
-         // check type of field that is replaced
-         if (idx < fieldTypes.length) {
-           checkRowtimeType(fieldTypes(idx))
-         }
-       }
-       // check reference-by-name
-       else {
-         val aliasOrName = origName.getOrElse(name)
-         streamType match {
-           // both alias and reference must have a valid type if they replace a field
-           case ct: CompositeType[_] if ct.hasField(aliasOrName) =>
-             val t = ct.getTypeAt(ct.getFieldIndex(aliasOrName))
-             checkRowtimeType(t)
-           // alias could not be found
-           case _ if origName.isDefined =>
-             throw new TableException(s"Alias '${origName.get}' must reference an existing field.")
-           case _ => // ok
-         }
-       }
-
-       rowtime = Some(idx, name)
-     }
-   }
-
-   def extractProctime(idx: Int, name: String): Unit = {
-     if (proctime.isDefined) {
-       throw new TableException(
-         "The proctime attribute can only be defined once in a table schema.")
-     } else {
-       // if the fields are referenced by position,
-       // it is only possible to append the time attribute at the end
-       if (isRefByPos) {
-
-         // check that proctime is only appended
-         if (idx < fieldTypes.length) {
-           throw new TableException(
-             "The proctime attribute can only be appended to the table schema and not replace " +
-               s"an existing field. Please move '$name' to the end of the schema.")
-         }
-       }
-       // check reference-by-name
-       else {
-         streamType match {
-           // proctime attribute must not replace a field
-           case ct: CompositeType[_] if ct.hasField(name) =>
-             throw new TableException(
-               s"The proctime attribute '$name' must not replace an existing field.")
-           case _ => // ok
-         }
-       }
-       proctime = Some(idx, name)
-     }
-   }
-
-   val bridgedFields = exprs.map(expressionBridge.bridge).toArray[Expression]
-   bridgedFields.zipWithIndex.foreach {
-     case (RowtimeAttribute(UnresolvedFieldReference(name)), idx) =>
-       extractRowtime(idx, name, None)
-
-     case (Alias(RowtimeAttribute(UnresolvedFieldReference(origName)), name, _), idx) =>
-       extractRowtime(idx, name, Some(origName))
-
-     case (ProctimeAttribute(UnresolvedFieldReference(name)), idx) =>
-       extractProctime(idx, name)
-
-     case (Alias(ProctimeAttribute(UnresolvedFieldReference(_)), name, _), idx) =>
-       extractProctime(idx, name)
-
-     case (UnresolvedFieldReference(name), _) => fieldNames = name :: fieldNames
-
-     case (Alias(UnresolvedFieldReference(_), name, _), _) => fieldNames = name :: fieldNames
-
-     case (e, _) =>
-       throw new TableException(s"Time attributes can only be defined on field references. " +
-         s"Rowtime attributes can replace existing fields, proctime attributes can not. " +
-         s"But was: $e")
-   }
-
-   if (rowtime.isDefined && fieldNames.contains(rowtime.get._2)) {
-     throw new TableException(
-       "The rowtime attribute may not have the same name as an another field.")
-   }
-
-   if (proctime.isDefined && fieldNames.contains(proctime.get._2)) {
-     throw new TableException(
-       "The proctime attribute may not have the same name as an another field.")
-   }
-
-   (rowtime, proctime)
- }
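The removed validator encoded the placement rules for time attributes, which now live behind getFieldsInfo. A hedged summary of those rules as a runnable sketch (the rules are taken from the removed code above; the environment setup and all names are invented):

```scala
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._
import org.apache.flink.table.api.scala._

object TimeAttributePlacement {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    val tEnv = StreamTableEnvironment.create(env)

    // fields referenced by position: tuple field 0 is a Long
    val clicks: DataStream[(Long, String)] = env.fromElements((42L, "a"))

    // a rowtime attribute may replace a Long/Timestamp field...
    tEnv.fromDataStream(clicks, 'ts.rowtime, 'user).printSchema()

    // ...whereas proctime may only be appended after all existing fields
    tEnv.fromDataStream(clicks, 'ts, 'user, 'p.proctime).printSchema()

    // Both of the following violate the rules above and throw:
    //   tEnv.fromDataStream(clicks, 'p.proctime, 'user)       // replaces a field
    //   tEnv.fromDataStream(clicks, 'ts.rowtime as 'x, 'user) // alias by position
  }
}
```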

- /**
-   * Injects markers for time indicator fields into the field indexes.
-   *
-   * @param fieldIndexes The field indexes into which the time indicators markers are injected.
-   * @param rowtime An optional rowtime indicator
-   * @param proctime An optional proctime indicator
-   * @return An adjusted array of field indexes.
-   */
- private def adjustFieldIndexes(
-     fieldIndexes: Array[Int],
-     rowtime: Option[(Int, String)],
-     proctime: Option[(Int, String)]): Array[Int] = {
-
-   // inject rowtime field
-   val withRowtime = rowtime match {
-     case Some(rt) =>
-       fieldIndexes.patch(rt._1, Seq(TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER), 0)
-     case _ =>
-       fieldIndexes
-   }
-
-   // inject proctime field
-   val withProctime = proctime match {
-     case Some(pt) =>
-       withRowtime.patch(pt._1, Seq(TimeIndicatorTypeInfo.PROCTIME_STREAM_MARKER), 0)
-     case _ =>
-       withRowtime
-   }
-
-   withProctime
- }
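This removed helper (and adjustFieldNames below) relied on Scala's patch with a replacement count of 0, i.e. a pure insertion. A standalone demonstration of that mechanic (the value -1 is an illustrative stand-in for TimeIndicatorTypeInfo.ROWTIME_STREAM_MARKER; the real constants are negative sentinel indexes):

```scala
object PatchDemo {
  def main(args: Array[String]): Unit = {
    val fieldIndexes = Array(0, 1, 2)

    // patch(from, seq, replaced = 0) inserts without dropping anything:
    // a rowtime declared at position 1 shifts the remaining indexes right.
    val rowtimeMarker = -1 // stand-in for the rowtime stream marker constant
    val withRowtime = fieldIndexes.patch(1, Seq(rowtimeMarker), 0)

    println(withRowtime.mkString(", ")) // 0, -1, 1, 2
  }
}
```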

- /**
-   * Injects names of time indicator fields into the list of field names.
-   *
-   * @param fieldNames The array of field names into which the time indicator field names are
-   *                   injected.
-   * @param rowtime An optional rowtime indicator
-   * @param proctime An optional proctime indicator
-   * @return An adjusted array of field names.
-   */
- private def adjustFieldNames(
-     fieldNames: Array[String],
-     rowtime: Option[(Int, String)],
-     proctime: Option[(Int, String)]): Array[String] = {
-
-   // inject rowtime field
-   val withRowtime = rowtime match {
-     case Some(rt) => fieldNames.patch(rt._1, Seq(rowtime.get._2), 0)
-     case _ => fieldNames
-   }
-
-   // inject proctime field
-   val withProctime = proctime match {
-     case Some(pt) => withRowtime.patch(pt._1, Seq(proctime.get._2), 0)
-     case _ => withRowtime
-   }
-
-   withProctime
- }
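The same two-step patch ran over the field names. A self-contained rerun of both steps with invented names, appending a rowtime "ts" and a proctime "pt" behind two physical fields:

```scala
object AdjustNamesDemo {
  def main(args: Array[String]): Unit = {
    // physical fields of the input, e.g. a DataStream[(Long, String)]
    val fieldNames = Array("user", "amount")

    // a rowtime "ts" appended at position 2, a proctime "pt" after it
    val rowtime = Some((2, "ts"))
    val proctime = Some((3, "pt"))

    val withRowtime = rowtime match {
      case Some((idx, name)) => fieldNames.patch(idx, Seq(name), 0)
      case None => fieldNames
    }
    val withProctime = proctime match {
      case Some((idx, name)) => withRowtime.patch(idx, Seq(name), 0)
      case None => withRowtime
    }

    println(withProctime.mkString(", ")) // user, amount, ts, pt
  }
}
```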

/**
* Returns the decoration rule set for this environment
* including a custom RuleSet configuration.
