Skip to content

Commit

Permalink
[FLINK-12844][table-planner-blink] Use default conversion class Local…
Browse files Browse the repository at this point in the history
…Date/LocalTime/LocalDateTime for DateType/TimeType/TimestampType

This closes #8762
  • Loading branch information
JingsongLi authored and KurtYoung committed Jul 8, 2019
1 parent 53ccd70 commit 2e09d94
Show file tree
Hide file tree
Showing 60 changed files with 1,471 additions and 1,060 deletions.
Expand Up @@ -129,6 +129,27 @@ public static TypeInformation<java.sql.Timestamp> SQL_TIMESTAMP() {
return org.apache.flink.api.common.typeinfo.Types.SQL_TIMESTAMP;
}

/**
* Returns type information for a Table API LocalDate type.
*/
public static TypeInformation<java.time.LocalDate> LOCAL_DATE() {
return org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE;
}

/**
* Returns type information for a Table API LocalTime type.
*/
public static TypeInformation<java.time.LocalTime> LOCAL_TIME() {
return org.apache.flink.api.common.typeinfo.Types.LOCAL_TIME;
}

/**
* Returns type information for a Table API LocalDateTime type.
*/
public static TypeInformation<java.time.LocalDateTime> LOCAL_DATE_TIME() {
return org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME;
}

/**
* Returns type information for a Table API interval of months.
*/
Expand Down
Expand Up @@ -48,7 +48,9 @@
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -99,6 +101,9 @@ public final class LegacyTypeInfoDataTypeConverter {
addMapping(Types.FLOAT, DataTypes.FLOAT().bridgedTo(Float.class));
addMapping(Types.DOUBLE, DataTypes.DOUBLE().bridgedTo(Double.class));
addMapping(Types.BIG_DEC, createLegacyType(LogicalTypeRoot.DECIMAL, Types.BIG_DEC));
addMapping(Types.LOCAL_DATE, DataTypes.DATE().bridgedTo(LocalDate.class));
addMapping(Types.LOCAL_TIME, DataTypes.TIME(0).bridgedTo(LocalTime.class));
addMapping(Types.LOCAL_DATE_TIME, DataTypes.TIMESTAMP(3).bridgedTo(LocalDateTime.class));
addMapping(Types.SQL_DATE, DataTypes.DATE().bridgedTo(java.sql.Date.class));
addMapping(Types.SQL_TIME, DataTypes.TIME(0).bridgedTo(java.sql.Time.class));
addMapping(Types.SQL_TIMESTAMP, DataTypes.TIMESTAMP(3).bridgedTo(java.sql.Timestamp.class));
Expand Down
Expand Up @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.GenerateUtils.generateRecordStatement
import org.apache.flink.table.dataformat.GenericRow
import org.apache.flink.table.dataformat.{DataFormatConverters, GenericRow}
import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
import org.apache.flink.table.runtime.TableStreamOperator
import org.apache.flink.table.runtime.util.collections._
Expand Down Expand Up @@ -433,22 +433,21 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}

/**
* Adds a reusable local timestamp to the beginning of the SAM of the generated class.
*/
def addReusableLocalTimestamp(): String = {
addReusableTimestamp()
}

/**
* Adds a reusable time to the beginning of the SAM of the generated class.
* Adds a reusable time to the beginning of the SAM of the generated [[Function]].
*/
def addReusableTime(): String = {
val fieldTerm = s"time"

val timestamp = addReusableTimestamp()

// declaration
reusableMemberStatements.add(s"private int $fieldTerm;")

// assignment
// adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
val field =
s"""
|final int $fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|$fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|if (time < 0) {
| time += ${DateTimeUtils.MILLIS_PER_DAY};
|}
Expand All @@ -457,19 +456,43 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
fieldTerm
}

/**
* Adds a reusable local date time to the beginning of the SAM of the generated class.
*/
def addReusableLocalDateTime(): String = {
val fieldTerm = s"localtimestamp"

val timestamp = addReusableTimestamp()

// declaration
reusableMemberStatements.add(s"private long $fieldTerm;")

// assignment
val field =
s"""
|$fieldTerm = $timestamp + java.util.TimeZone.getDefault().getOffset($timestamp);
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
}

/**
* Adds a reusable local time to the beginning of the SAM of the generated class.
*/
def addReusableLocalTime(): String = {
val fieldTerm = s"localtime"
val timeZone = addReusableTimeZone()
val localtimestamp = addReusableLocalTimestamp()

val localtimestamp = addReusableLocalDateTime()

// declaration
reusableMemberStatements.add(s"private int $fieldTerm;")

// assignment
// adopted from org.apache.calcite.runtime.SqlFunctions.localTime()
val field =
s"""
|final int $fieldTerm = (int) ( ($localtimestamp + $timeZone.getOffset($localtimestamp))
| % ${DateTimeUtils.MILLIS_PER_DAY});
|""".stripMargin
s"""
|$fieldTerm = (int) ($localtimestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
}
Expand All @@ -479,15 +502,18 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
*/
def addReusableDate(): String = {
val fieldTerm = s"date"

val timestamp = addReusableTimestamp()
val time = addReusableTime()
val timeZone = addReusableTimeZone()

// declaration
reusableMemberStatements.add(s"private int $fieldTerm;")

// assignment
// adopted from org.apache.calcite.runtime.SqlFunctions.currentDate()
val field =
s"""
|final int $fieldTerm = (int) (($timestamp + $timeZone.getOffset($timestamp))
| / ${DateTimeUtils.MILLIS_PER_DAY});
|$fieldTerm = (int) ($timestamp / ${DateTimeUtils.MILLIS_PER_DAY});
|if ($time < 0) {
| $fieldTerm -= 1;
|}
Expand Down
Expand Up @@ -369,14 +369,8 @@ object GenerateUtils {
generateNonNullLiteral(literalType, literalValue.toString, literalValue)

case TIMESTAMP_WITHOUT_TIME_ZONE =>
// Hack
// Currently, in RexLiteral/SqlLiteral(Calcite), TimestampString has no time zone.
// TimeString, DateString TimestampString are treated as UTC time/(unix time)
// when they are converted/formatted/validated
// Here, we adjust millis before Calcite solve TimeZone perfectly
val millis = literalValue.asInstanceOf[Long]
val adjustedValue = millis - ctx.tableConfig.getTimeZone.getOffset(millis)
generateNonNullLiteral(literalType, adjustedValue.toString + "L", adjustedValue)
generateNonNullLiteral(literalType, millis + "L", millis)

case INTERVAL_YEAR_MONTH =>
val decimal = BigDecimal(literalValue.asInstanceOf[JBigDecimal])
Expand Down
Expand Up @@ -217,6 +217,11 @@ object BuiltInMethods {
classOf[Int])

val TIMESTAMP_TO_STRING = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"timestampToString",
classOf[Long], classOf[Int])

val TIMESTAMP_TO_STRING_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"timestampToString",
classOf[Long], classOf[Int], classOf[TimeZone])
Expand All @@ -232,24 +237,43 @@ object BuiltInMethods {
val NOW_OFFSET = Types.lookupMethod(
classOf[SqlDateTimeUtils], "now", classOf[Long])

val DATE_FORMAT_STRING_STRING_STRING = Types.lookupMethod(
val DATE_FORMAT_STRING_STRING_STRING_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[String],
classOf[String], classOf[String], classOf[TimeZone])

val DATE_FORMAT_STIRNG_STRING = Types.lookupMethod(
val DATE_FORMAT_STIRNG_STRING_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[String], classOf[String], classOf[TimeZone])

val DATE_FORMAT_LONG_STRING = Types.lookupMethod(
val DATE_FORMAT_LONG_STRING_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[Long], classOf[String], classOf[TimeZone])

val DATE_FORMAT_STRING_STRING_STRING = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[String],
classOf[String], classOf[String])

val DATE_FORMAT_STIRNG_STRING = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[String], classOf[String])

val DATE_FORMAT_LONG_STRING = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[Long], classOf[String])

val UNIX_TIMESTAMP_FORMAT = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"unixTimestamp",
classOf[String],
classOf[String])

val UNIX_TIMESTAMP_FORMAT_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"unixTimestamp",
classOf[String],
classOf[String],
classOf[TimeZone])

val UNIX_TIMESTAMP_STR = Types.lookupMethod(
classOf[SqlDateTimeUtils], "unixTimestamp", classOf[String])

val UNIX_TIMESTAMP_STR_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "unixTimestamp", classOf[String], classOf[TimeZone])

val UNIX_TIMESTAMP = Types.lookupMethod(
Expand All @@ -259,41 +283,77 @@ object BuiltInMethods {
classOf[SqlDateTimeUtils], "unixTimestamp", classOf[Long])

val FROM_UNIXTIME_FORMAT = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String], classOf[TimeZone])
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String])

val FROM_UNIXTIME = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[TimeZone])
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long])

val FROM_UNIXTIME_AS_DOUBLE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double], classOf[TimeZone])
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double])

val FROM_UNIXTIME_AS_DECIMAL = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Decimal])

val FROM_UNIXTIME_FORMAT_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String], classOf[TimeZone])

val FROM_UNIXTIME_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[TimeZone])

val FROM_UNIXTIME_AS_DOUBLE_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double], classOf[TimeZone])

val FROM_UNIXTIME_AS_DECIMAL_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Decimal], classOf[TimeZone])

val DATEDIFF_T_S = Types.lookupMethod(
val DATEDIFF_T_S_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[String], classOf[TimeZone])

val DATEDIFF_S_S = Types.lookupMethod(
val DATEDIFF_S_S_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[String], classOf[String], classOf[TimeZone])

val DATEDIFF_S_T = Types.lookupMethod(
val DATEDIFF_S_T_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[String], classOf[Long], classOf[TimeZone])

val DATEDIFF_T_T = Types.lookupMethod(
val DATEDIFF_T_T_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[Long], classOf[TimeZone])

val DATE_SUB_S = Types.lookupMethod(
val DATEDIFF_T_S = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[String])

val DATEDIFF_S_S = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[String], classOf[String])

val DATEDIFF_S_T = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[String], classOf[Long])

val DATEDIFF_T_T = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[Long])

val DATE_SUB_S_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateSub", classOf[String], classOf[Int], classOf[TimeZone])

val DATE_SUB_T = Types.lookupMethod(
val DATE_SUB_T_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateSub", classOf[Long], classOf[Int], classOf[TimeZone])

val DATE_ADD_S = Types.lookupMethod(
val DATE_SUB_S = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateSub", classOf[String], classOf[Int])

val DATE_SUB_T = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateSub", classOf[Long], classOf[Int])

val DATE_ADD_S_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateAdd", classOf[String], classOf[Int], classOf[TimeZone])

val DATE_ADD_T = Types.lookupMethod(
val DATE_ADD_T_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateAdd", classOf[Long], classOf[Int], classOf[TimeZone])

val DATE_ADD_S = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateAdd", classOf[String], classOf[Int])

val DATE_ADD_T = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateAdd", classOf[Long], classOf[Int])

val INT_TO_DATE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toDate",
Expand All @@ -317,9 +377,19 @@ object BuiltInMethods {
val STRING_TO_TIMESTAMP = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toTimestamp",
classOf[String], classOf[TimeZone])
classOf[String])

val STRING_TO_TIMESTAMP_WITH_FORMAT = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toTimestamp",
classOf[String], classOf[String])

val STRING_TO_TIMESTAMP_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toTimestamp",
classOf[String], classOf[TimeZone])

val STRING_TO_TIMESTAMP_WITH_FORMAT_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toTimestamp",
classOf[String], classOf[String], classOf[TimeZone])
Expand All @@ -329,7 +399,7 @@ object BuiltInMethods {
"fromTimestamp",
classOf[Long])

val EXTRACT_FROM_TIMESTAMP = Types.lookupMethod(
val EXTRACT_FROM_TIMESTAMP_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"extractFromTimestamp",
classOf[TimeUnitRange], classOf[Long], classOf[TimeZone])
Expand All @@ -349,12 +419,12 @@ object BuiltInMethods {
"extractYearMonth",
classOf[TimeUnitRange], classOf[Int])

val TIMESTAMP_FLOOR = Types.lookupMethod(
val TIMESTAMP_FLOOR_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"timestampFloor",
classOf[TimeUnitRange], classOf[Long], classOf[TimeZone])

val TIMESTAMP_CEIL = Types.lookupMethod(
val TIMESTAMP_CEIL_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"timestampCeil",
classOf[TimeUnitRange], classOf[Long], classOf[TimeZone])
Expand Down
Expand Up @@ -38,7 +38,7 @@ class CurrentTimePointCallGen(local: Boolean) extends CallGenerator {
generateNonNullField(returnType, time)

case TIMESTAMP_WITHOUT_TIME_ZONE if local =>
val timestamp = ctx.addReusableLocalTimestamp()
val timestamp = ctx.addReusableLocalDateTime()
generateNonNullField(returnType, timestamp)

case DATE =>
Expand All @@ -50,6 +50,7 @@ class CurrentTimePointCallGen(local: Boolean) extends CallGenerator {
generateNonNullField(returnType, time)

case TIMESTAMP_WITHOUT_TIME_ZONE =>
// TODO CURRENT_TIMESTAMP should return TIMESTAMP WITH TIME ZONE
val timestamp = ctx.addReusableTimestamp()
generateNonNullField(returnType, timestamp)
}
Expand Down

0 comments on commit 2e09d94

Please sign in to comment.