Skip to content

Commit

Permalink
Modify all test to LocalTime and modify functions to not use Time Zon…
Browse files Browse the repository at this point in the history
…e and support use java.sql.Time
  • Loading branch information
JingsongLi committed Jun 23, 2019
1 parent 156e150 commit ec88425
Show file tree
Hide file tree
Showing 42 changed files with 836 additions and 465 deletions.
Expand Up @@ -129,6 +129,27 @@ public static TypeInformation<java.sql.Timestamp> SQL_TIMESTAMP() {
return org.apache.flink.api.common.typeinfo.Types.SQL_TIMESTAMP;
}

/**
* Returns type information for a Table API LocalDate type.
*/
public static TypeInformation<java.time.LocalDate> LOCAL_DATE() {
return org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE;
}

/**
* Returns type information for a Table API LocalTime type.
*/
public static TypeInformation<java.time.LocalTime> LOCAL_TIME() {
return org.apache.flink.api.common.typeinfo.Types.LOCAL_TIME;
}

/**
* Returns type information for a Table API LocalDateTime type.
*/
public static TypeInformation<java.time.LocalDateTime> LOCAL_DATE_TIME() {
return org.apache.flink.api.common.typeinfo.Types.LOCAL_DATE_TIME;
}

/**
* Returns type information for a Table API interval of months.
*/
Expand Down
Expand Up @@ -23,7 +23,7 @@ import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, Objec
import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
import org.apache.flink.types.Row

import _root_.java.{lang, math, sql, util}
import _root_.java.{lang, math, sql, time, util}

import _root_.scala.annotation.varargs

Expand Down Expand Up @@ -92,6 +92,22 @@ object Types {
*/
val SQL_TIMESTAMP: TypeInformation[sql.Timestamp] = JTypes.SQL_TIMESTAMP

/**
* Returns type information for a Table API LocalDate type.
*/
val LOCAL_DATE: TypeInformation[time.LocalDate] = JTypes.LOCAL_DATE

/**
* Returns type information for a Table API LocalTime type.
*/
val LOCAL_TIME: TypeInformation[time.LocalTime] = JTypes.LOCAL_TIME

/**
* Returns type information for a Table API LocalDateTime type.
*/
val LOCAL_DATE_TIME: TypeInformation[time.LocalDateTime] = JTypes.LOCAL_DATE_TIME


/**
* Returns type information for a Table API interval of months.
*/
Expand Down
Expand Up @@ -24,7 +24,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer
import org.apache.flink.table.api.TableConfig
import org.apache.flink.table.codegen.CodeGenUtils._
import org.apache.flink.table.codegen.GenerateUtils.generateRecordStatement
import org.apache.flink.table.dataformat.GenericRow
import org.apache.flink.table.dataformat.{DataFormatConverters, GenericRow}
import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
import org.apache.flink.table.runtime.TableStreamOperator
import org.apache.flink.table.runtime.util.collections._
Expand Down Expand Up @@ -433,25 +433,45 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
}

/**
* Adds a reusable local timestamp to the beginning of the SAM of the generated class.
* Adds a reusable time to the beginning of the SAM of the generated [[Function]].
*/
def addReusableLocalTimestamp(): String = {
addReusableTimestamp()
def addReusableTime(): String = {
val fieldTerm = s"time"

val timestamp = addReusableTimestamp()

// declaration
reusableMemberStatements.add(s"private int $fieldTerm;")

// assignment
// adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
val field =
s"""
|$fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|if (time < 0) {
| time += ${DateTimeUtils.MILLIS_PER_DAY};
|}
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
}

/**
* Adds a reusable time to the beginning of the SAM of the generated class.
* Adds a reusable local date time to the beginning of the SAM of the generated class.
*/
def addReusableTime(): String = {
val fieldTerm = s"time"
def addReusableLocalDateTime(): String = {
val fieldTerm = s"localtimestamp"

val timeZone = addReusableTimeZone()
val timestamp = addReusableTimestamp()
// adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()

// declaration
reusableMemberStatements.add(s"private long $fieldTerm;")

// assignment
val field =
s"""
|final int $fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|if (time < 0) {
| time += ${DateTimeUtils.MILLIS_PER_DAY};
|}
|$fieldTerm = $timestamp + $timeZone.getOffset($timestamp);
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
Expand All @@ -462,14 +482,18 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
*/
def addReusableLocalTime(): String = {
val fieldTerm = s"localtime"
val timeZone = addReusableTimeZone()
val localtimestamp = addReusableLocalTimestamp()

val localtimestamp = addReusableLocalDateTime()

// declaration
reusableMemberStatements.add(s"private int $fieldTerm;")

// assignment
// adopted from org.apache.calcite.runtime.SqlFunctions.localTime()
val field =
s"""
|final int $fieldTerm = (int) ( ($localtimestamp + $timeZone.getOffset($localtimestamp))
| % ${DateTimeUtils.MILLIS_PER_DAY});
|""".stripMargin
s"""
|$fieldTerm = (int) ($localtimestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
}
Expand All @@ -479,19 +503,22 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
*/
def addReusableDate(): String = {
val fieldTerm = s"date"

val timestamp = addReusableTimestamp()
val time = addReusableTime()
val timeZone = addReusableTimeZone()

// declaration
reusableMemberStatements.add(s"private int $fieldTerm;")

// assignment
// adopted from org.apache.calcite.runtime.SqlFunctions.currentDate()
val field =
s"""
|final int $fieldTerm = (int) (($timestamp + $timeZone.getOffset($timestamp))
| / ${DateTimeUtils.MILLIS_PER_DAY});
|if ($time < 0) {
| $fieldTerm -= 1;
|}
|""".stripMargin
s"""
|$fieldTerm = (int) ($timestamp / ${DateTimeUtils.MILLIS_PER_DAY});
|if ($time < 0) {
| $fieldTerm -= 1;
|}
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
}
Expand All @@ -500,7 +527,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
* Adds a reusable TimeZone to the member area of the generated class.
*/
def addReusableTimeZone(): String = {
val zoneID = tableConfig.getTimeZone.getID
val zoneID = DataFormatConverters.DEFAULT_TIME_ZONE.getID
val stmt =
s"""private static final java.util.TimeZone $DEFAULT_TIMEZONE_TERM =
| java.util.TimeZone.getTimeZone("$zoneID");""".stripMargin
Expand Down
Expand Up @@ -369,14 +369,8 @@ object GenerateUtils {
generateNonNullLiteral(literalType, literalValue.toString, literalValue)

case TIMESTAMP_WITHOUT_TIME_ZONE =>
// Hack
// Currently, in RexLiteral/SqlLiteral(Calcite), TimestampString has no time zone.
// TimeString, DateString TimestampString are treated as UTC time/(unix time)
// when they are converted/formatted/validated
// Here, we adjust millis before Calcite solve TimeZone perfectly
val millis = literalValue.asInstanceOf[Long]
val adjustedValue = millis - ctx.tableConfig.getTimeZone.getOffset(millis)
generateNonNullLiteral(literalType, adjustedValue.toString + "L", adjustedValue)
generateNonNullLiteral(literalType, millis + "L", millis)

case INTERVAL_YEAR_MONTH =>
val decimal = BigDecimal(literalValue.asInstanceOf[JBigDecimal])
Expand Down
Expand Up @@ -217,6 +217,11 @@ object BuiltInMethods {
classOf[Int])

val TIMESTAMP_TO_STRING = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"timestampToString",
classOf[Long], classOf[Int])

val TIMESTAMP_TO_STRING_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"timestampToString",
classOf[Long], classOf[Int], classOf[TimeZone])
Expand All @@ -232,24 +237,43 @@ object BuiltInMethods {
val NOW_OFFSET = Types.lookupMethod(
classOf[SqlDateTimeUtils], "now", classOf[Long])

val DATE_FORMAT_STRING_STRING_STRING = Types.lookupMethod(
val DATE_FORMAT_STRING_STRING_STRING_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[String],
classOf[String], classOf[String], classOf[TimeZone])

val DATE_FORMAT_STIRNG_STRING = Types.lookupMethod(
val DATE_FORMAT_STIRNG_STRING_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[String], classOf[String], classOf[TimeZone])

val DATE_FORMAT_LONG_STRING = Types.lookupMethod(
val DATE_FORMAT_LONG_STRING_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[Long], classOf[String], classOf[TimeZone])

val DATE_FORMAT_STRING_STRING_STRING = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[String],
classOf[String], classOf[String])

val DATE_FORMAT_STIRNG_STRING = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[String], classOf[String])

val DATE_FORMAT_LONG_STRING = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateFormat", classOf[Long], classOf[String])

val UNIX_TIMESTAMP_FORMAT = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"unixTimestamp",
classOf[String],
classOf[String])

val UNIX_TIMESTAMP_FORMAT_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"unixTimestamp",
classOf[String],
classOf[String],
classOf[TimeZone])

val UNIX_TIMESTAMP_STR = Types.lookupMethod(
classOf[SqlDateTimeUtils], "unixTimestamp", classOf[String])

val UNIX_TIMESTAMP_STR_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "unixTimestamp", classOf[String], classOf[TimeZone])

val UNIX_TIMESTAMP = Types.lookupMethod(
Expand All @@ -259,41 +283,77 @@ object BuiltInMethods {
classOf[SqlDateTimeUtils], "unixTimestamp", classOf[Long])

val FROM_UNIXTIME_FORMAT = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String], classOf[TimeZone])
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String])

val FROM_UNIXTIME = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[TimeZone])
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long])

val FROM_UNIXTIME_AS_DOUBLE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double], classOf[TimeZone])
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double])

val FROM_UNIXTIME_AS_DECIMAL = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Decimal])

val FROM_UNIXTIME_FORMAT_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[String], classOf[TimeZone])

val FROM_UNIXTIME_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Long], classOf[TimeZone])

val FROM_UNIXTIME_AS_DOUBLE_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Double], classOf[TimeZone])

val FROM_UNIXTIME_AS_DECIMAL_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "fromUnixtime", classOf[Decimal], classOf[TimeZone])

val DATEDIFF_T_S = Types.lookupMethod(
val DATEDIFF_T_S_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[String], classOf[TimeZone])

val DATEDIFF_S_S = Types.lookupMethod(
val DATEDIFF_S_S_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[String], classOf[String], classOf[TimeZone])

val DATEDIFF_S_T = Types.lookupMethod(
val DATEDIFF_S_T_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[String], classOf[Long], classOf[TimeZone])

val DATEDIFF_T_T = Types.lookupMethod(
val DATEDIFF_T_T_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[Long], classOf[TimeZone])

val DATE_SUB_S = Types.lookupMethod(
val DATEDIFF_T_S = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[String])

val DATEDIFF_S_S = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[String], classOf[String])

val DATEDIFF_S_T = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[String], classOf[Long])

val DATEDIFF_T_T = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateDiff", classOf[Long], classOf[Long])

val DATE_SUB_S_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateSub", classOf[String], classOf[Int], classOf[TimeZone])

val DATE_SUB_T = Types.lookupMethod(
val DATE_SUB_T_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateSub", classOf[Long], classOf[Int], classOf[TimeZone])

val DATE_ADD_S = Types.lookupMethod(
val DATE_SUB_S = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateSub", classOf[String], classOf[Int])

val DATE_SUB_T = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateSub", classOf[Long], classOf[Int])

val DATE_ADD_S_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateAdd", classOf[String], classOf[Int], classOf[TimeZone])

val DATE_ADD_T = Types.lookupMethod(
val DATE_ADD_T_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateAdd", classOf[Long], classOf[Int], classOf[TimeZone])

val DATE_ADD_S = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateAdd", classOf[String], classOf[Int])

val DATE_ADD_T = Types.lookupMethod(
classOf[SqlDateTimeUtils], "dateAdd", classOf[Long], classOf[Int])

val INT_TO_DATE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toDate",
Expand All @@ -317,9 +377,19 @@ object BuiltInMethods {
val STRING_TO_TIMESTAMP = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toTimestamp",
classOf[String], classOf[TimeZone])
classOf[String])

val STRING_TO_TIMESTAMP_WITH_FORMAT = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toTimestamp",
classOf[String], classOf[String])

val STRING_TO_TIMESTAMP_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toTimestamp",
classOf[String], classOf[TimeZone])

val STRING_TO_TIMESTAMP_WITH_FORMAT_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"toTimestamp",
classOf[String], classOf[String], classOf[TimeZone])
Expand All @@ -329,7 +399,7 @@ object BuiltInMethods {
"fromTimestamp",
classOf[Long])

val EXTRACT_FROM_TIMESTAMP = Types.lookupMethod(
val EXTRACT_FROM_TIMESTAMP_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"extractFromTimestamp",
classOf[TimeUnitRange], classOf[Long], classOf[TimeZone])
Expand All @@ -349,12 +419,12 @@ object BuiltInMethods {
"extractYearMonth",
classOf[TimeUnitRange], classOf[Int])

val TIMESTAMP_FLOOR = Types.lookupMethod(
val TIMESTAMP_FLOOR_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"timestampFloor",
classOf[TimeUnitRange], classOf[Long], classOf[TimeZone])

val TIMESTAMP_CEIL = Types.lookupMethod(
val TIMESTAMP_CEIL_TIME_ZONE = Types.lookupMethod(
classOf[SqlDateTimeUtils],
"timestampCeil",
classOf[TimeUnitRange], classOf[Long], classOf[TimeZone])
Expand Down

0 comments on commit ec88425

Please sign in to comment.