Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
f0192dc
[FLINK-14599][table-planner-blink] Add setTimestamp/getTimestamp inte…
docete Nov 5, 2019
90df5e4
[FLINK-14599][table-planner-blink] Use SqlTimestamp as internal repre…
docete Nov 5, 2019
22fa480
[FLINK-14599][table-planner-blink] Use SqlTimestamp in CodeGenUtils
docete Nov 5, 2019
88082bb
[FLINK-14599][table-planner-blink] Fix extract() built-in function fo…
docete Nov 5, 2019
e6d75bc
[FLINK-14599][table-planner-blink] Fix casting from Timestamp to Stri…
docete Nov 5, 2019
0ec0b46
[FLINK-14599][table-planner-blink] Fix floor()/ceil() built-in functi…
docete Nov 5, 2019
d44a523
[FLINK-14599][table-planner-blink] Fix time interval arithmetic for T…
docete Nov 5, 2019
a7ff4db
[FLINK-14599][table-planner-blink] Fix cast/reinterpret/compare built…
docete Nov 6, 2019
7cc6cf4
[FLINK-14599][table-planner-blink] Fix Timestamp literal and fixup ca…
docete Nov 6, 2019
886eac4
[FLINK-14599][table-planner-blink] Fix count distinct on timestamp fi…
docete Nov 6, 2019
a207224
[FLINK-14599][table-planner-blink] Fix timestampdiff built-in functio…
docete Nov 6, 2019
77d1789
[FLINK-14599][table-planner-blink] Fix toTimestamp built-in function
docete Nov 6, 2019
4a83ff6
[FLINK-14599][table-planner-blink] Fix current_timestamp built-in fun…
docete Nov 6, 2019
1e4b5f4
[FLINK-14599][table-planner-blink] Fix udx which use long as Timestam…
docete Nov 6, 2019
f79164e
[FLINK-14599][table-planner-blink] Fix LookupJoinTest::testInvalidLoo…
docete Nov 6, 2019
e76461e
[FLINK-14599][table-planner-blink] Fix comparator of Timestamp type
docete Nov 6, 2019
89e2ab2
[FLINK-14599][table-planner-blink] Fix ROWTIME/PROCTIME and window pr…
docete Nov 6, 2019
d3b1bc9
[FLINK-14599][table-planner-blink] Fix PROCTIME in match recognize
docete Nov 6, 2019
287c4a9
[FLINK-14599][table-planner-blink] Introduce LegacyTimestampTypeInfo …
docete Nov 6, 2019
52cad96
[FLINK-14599][table-planner-blink] Support precision of Timestamp typ…
docete Nov 6, 2019
11222cd
[FLINK-14599][table-planner-blink] Support precision of Timestamp typ…
docete Nov 8, 2019
8860597
fixup: address comments
docete Nov 8, 2019
dfeca05
fixup: fix checkstyle
docete Nov 9, 2019
d17efb5
[FLINK-14599][table-planner-blink] Make the default precision of Time…
docete Nov 11, 2019
c3412c4
[FLINK-14599][table-planner-blink] Make the interpreted string of Tim…
docete Nov 12, 2019
6b82048
fixup: ignore Gregorian shift
docete Nov 12, 2019
aab2160
fixup: address comments
docete Nov 13, 2019
ee2854a
fixup: store nanoOfMillisecond in fixed-length part to save 4 bytes p…
docete Nov 13, 2019
212e234
fixup: address comments
docete Nov 13, 2019
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ public final class TimestampType extends LogicalType {

private static final Set<String> INPUT_OUTPUT_CONVERSION = conversionSet(
java.sql.Timestamp.class.getName(),
java.time.LocalDateTime.class.getName());
java.time.LocalDateTime.class.getName(),
"org.apache.flink.table.dataformat.SqlTimestamp");

private static final Class<?> DEFAULT_CONVERSION = java.time.LocalDateTime.class;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.table.planner.expressions.converter.CallExpressionConvertRule.ConvertContext;
import org.apache.flink.table.types.logical.DecimalType;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.TimestampType;

import org.apache.calcite.avatica.util.DateTimeUtils;
import org.apache.calcite.avatica.util.TimeUnit;
Expand All @@ -54,6 +55,8 @@
import org.apache.calcite.util.TimestampWithTimeZoneString;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.time.LocalDateTime;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Date;
Expand Down Expand Up @@ -135,8 +138,26 @@ public RexNode visit(ValueLiteralExpression valueLiteral) {
return relBuilder.getRexBuilder().makeTimeLiteral(TimeString.fromCalendarFields(
valueAsCalendar(extractValue(valueLiteral, java.sql.Time.class))), 0);
case TIMESTAMP_WITHOUT_TIME_ZONE:
return relBuilder.getRexBuilder().makeTimestampLiteral(TimestampString.fromCalendarFields(
valueAsCalendar(extractValue(valueLiteral, java.sql.Timestamp.class))), 3);
TimestampType timestampType = (TimestampType) type;
Class<?> clazz = valueLiteral.getOutputDataType().getConversionClass();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • First, these logical should in extractValue.
  • Second, we should just use valueLiteral.getValueAs.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can just use valueLiteral.getValueAs(LocalDateTime.class).

LocalDateTime datetime = null;
if (clazz == LocalDateTime.class) {
datetime = valueLiteral.getValueAs(LocalDateTime.class)
.orElseThrow(() -> new TableException("Invalid literal."));
} else if (clazz == Timestamp.class) {
datetime = valueLiteral.getValueAs(Timestamp.class)
.orElseThrow(() -> new TableException("Invalid literal.")).toLocalDateTime();
} else {
throw new TableException(String.format("Invalid literal of %s.", clazz.getCanonicalName()));
}
return relBuilder.getRexBuilder().makeTimestampLiteral(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We could have util in SqlDateTimeUtils or other class: toTimestampString(LocalDateTime)

new TimestampString(
datetime.getYear(),
datetime.getMonthValue(),
datetime.getDayOfMonth(),
datetime.getHour(),
datetime.getMinute(),
datetime.getSecond()).withNanos(datetime.getNano()), timestampType.getPrecision());
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
TimeZone timeZone = TimeZone.getTimeZone(((FlinkContext) ((FlinkRelBuilder) this.relBuilder)
.getCluster().getPlanner().getContext()).getTableConfig().getLocalTimeZone());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ public SqlMonotonicity getMonotonicity(SqlOperatorBinding call) {
public static final SqlFunction TO_TIMESTAMP = new SqlFunction(
"TO_TIMESTAMP",
SqlKind.OTHER_FUNCTION,
ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP), SqlTypeTransforms.FORCE_NULLABLE),
ReturnTypes.cascade(ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3), SqlTypeTransforms.FORCE_NULLABLE),
null,
OperandTypes.or(
OperandTypes.family(SqlTypeFamily.CHARACTER),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ public ProctimeMaterializeSqlFunction() {
"PROCTIME_MATERIALIZE",
SqlKind.OTHER_FUNCTION,
ReturnTypes.cascade(
ReturnTypes.explicit(SqlTypeName.TIMESTAMP),
ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3),
SqlTypeTransforms.TO_NULLABLE),
InferTypes.RETURN_TYPE,
OperandTypes.family(SqlTypeFamily.TIMESTAMP),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class FlinkTypeFactory(typeSystem: RelDataTypeSystem) extends JavaTypeFactoryImp
timestampType.getKind match {
case TimestampKind.PROCTIME => createProctimeIndicatorType(true)
case TimestampKind.ROWTIME => createRowtimeIndicatorType(true)
case TimestampKind.REGULAR => createSqlType(TIMESTAMP)
case TimestampKind.REGULAR => createSqlType(TIMESTAMP, timestampType.getPrecision)
}
case _ =>
seenTypes.get(t) match {
Expand Down Expand Up @@ -424,11 +424,12 @@ object FlinkTypeFactory {
// blink runner support precision 3, but for consistent with flink runner, we set to 0.
new TimeType()
case TIMESTAMP =>
if (relDataType.getPrecision > 3) {
val precision = relDataType.getPrecision
if (precision > TimestampType.MAX_PRECISION || precision < TimestampType.MIN_PRECISION) {
throw new TableException(
s"TIMESTAMP precision is not supported: ${relDataType.getPrecision}")
s"TIMESTAMP precision is not supported: $precision")
}
new TimestampType(3)
new TimestampType(precision)
case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
if (relDataType.getPrecision > 3) {
throw new TableException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
package org.apache.flink.table.planner.calcite

import org.apache.flink.table.runtime.typeutils.TypeCheckUtils
import org.apache.flink.table.types.logical.{DecimalType, LogicalType}

import org.apache.flink.table.types.logical.{DecimalType, LogicalType, TimestampType}
import org.apache.calcite.rel.`type`.{RelDataType, RelDataTypeFactory, RelDataTypeSystemImpl}
import org.apache.calcite.sql.`type`.{SqlTypeName, SqlTypeUtil}

Expand All @@ -41,8 +40,12 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl {
case SqlTypeName.VARCHAR | SqlTypeName.VARBINARY =>
Int.MaxValue

// by default we support timestamp with microseconds precision
case SqlTypeName.TIMESTAMP =>
TimestampType.DEFAULT_PRECISION

// we currently support only timestamps with milliseconds precision
case SqlTypeName.TIMESTAMP | SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
case SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
3

case _ =>
Expand All @@ -53,6 +56,10 @@ class FlinkTypeSystem extends RelDataTypeSystemImpl {
case SqlTypeName.VARCHAR | SqlTypeName.CHAR | SqlTypeName.VARBINARY | SqlTypeName.BINARY =>
Int.MaxValue

// The maximal precision of TIMESTAMP is 3, change it to 9 to support nanoseconds precision
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is this "maximal precision of TIMESTAMP is 3" mean? Is that a typo?

case SqlTypeName.TIMESTAMP =>
TimestampType.MAX_PRECISION

case _ =>
super.getMaxPrecision(typeName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,8 @@ object CodeGenUtils {

val STRING_UTIL: String = className[BinaryStringUtil]

val SQL_TIMESTAMP: String = className[SqlTimestamp]

// ----------------------------------------------------------------------------------------

private val nameCounter = new AtomicInteger
Expand Down Expand Up @@ -133,7 +135,7 @@ object CodeGenUtils {

case DATE => "int"
case TIME_WITHOUT_TIME_ZONE => "int"
case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => "long"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => "long"
case INTERVAL_YEAR_MONTH => "int"
case INTERVAL_DAY_TIME => "long"

Expand All @@ -151,7 +153,7 @@ object CodeGenUtils {

case DATE => className[JInt]
case TIME_WITHOUT_TIME_ZONE => className[JInt]
case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => className[JLong]
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => className[JLong]
case INTERVAL_YEAR_MONTH => className[JInt]
case INTERVAL_DAY_TIME => className[JLong]

Expand All @@ -162,6 +164,7 @@ object CodeGenUtils {
case ARRAY => className[BaseArray]
case MULTISET | MAP => className[BaseMap]
case ROW => className[BaseRow]
case TIMESTAMP_WITHOUT_TIME_ZONE => className[SqlTimestamp]

case ANY => className[BinaryGeneric[_]]
}
Expand Down Expand Up @@ -190,7 +193,7 @@ object CodeGenUtils {
case VARCHAR | CHAR => s"$BINARY_STRING.EMPTY_UTF8"

case DATE | TIME_WITHOUT_TIME_ZONE => "-1"
case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE => "-1L"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => "-1L"
case INTERVAL_YEAR_MONTH => "-1"
case INTERVAL_DAY_TIME => "-1L"

Expand Down Expand Up @@ -223,7 +226,8 @@ object CodeGenUtils {
case DECIMAL => s"$term.hashCode()"
case DATE => s"${className[JInt]}.hashCode($term)"
case TIME_WITHOUT_TIME_ZONE => s"${className[JInt]}.hashCode($term)"
case TIMESTAMP_WITHOUT_TIME_ZONE | TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
case TIMESTAMP_WITHOUT_TIME_ZONE => s"$term.hashCode()"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE =>
s"${className[JLong]}.hashCode($term)"
case INTERVAL_YEAR_MONTH => s"${className[JInt]}.hashCode($term)"
case INTERVAL_DAY_TIME => s"${className[JLong]}.hashCode($term)"
Expand Down Expand Up @@ -414,8 +418,10 @@ object CodeGenUtils {
// temporal types
case DATE => s"$rowTerm.getInt($indexTerm)"
case TIME_WITHOUT_TIME_ZONE => s"$rowTerm.getInt($indexTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$rowTerm.getLong($indexTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val dt = t.asInstanceOf[TimestampType]
s"$rowTerm.getTimestamp($indexTerm, ${dt.getPrecision})"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$rowTerm.getLong($indexTerm)"
case INTERVAL_YEAR_MONTH => s"$rowTerm.getInt($indexTerm)"
case INTERVAL_DAY_TIME => s"$rowTerm.getLong($indexTerm)"

Expand Down Expand Up @@ -539,8 +545,10 @@ object CodeGenUtils {
case BOOLEAN => s"$binaryRowTerm.setBoolean($index, $fieldValTerm)"
case DATE => s"$binaryRowTerm.setInt($index, $fieldValTerm)"
case TIME_WITHOUT_TIME_ZONE => s"$binaryRowTerm.setInt($index, $fieldValTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$binaryRowTerm.setLong($index, $fieldValTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val dt = t.asInstanceOf[TimestampType]
s"$binaryRowTerm.setTimestamp($index, $fieldValTerm, ${dt.getPrecision})"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$binaryRowTerm.setLong($index, $fieldValTerm)"
case INTERVAL_YEAR_MONTH => s"$binaryRowTerm.setInt($index, $fieldValTerm)"
case INTERVAL_DAY_TIME => s"$binaryRowTerm.setLong($index, $fieldValTerm)"
case DECIMAL =>
Expand Down Expand Up @@ -568,8 +576,7 @@ object CodeGenUtils {
case BOOLEAN => s"$rowTerm.setBoolean($indexTerm, $fieldTerm)"
case DATE => s"$rowTerm.setInt($indexTerm, $fieldTerm)"
case TIME_WITHOUT_TIME_ZONE => s"$rowTerm.setInt($indexTerm, $fieldTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$rowTerm.setLong($indexTerm, $fieldTerm)"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$rowTerm.setLong($indexTerm, $fieldTerm)"
case INTERVAL_YEAR_MONTH => s"$rowTerm.setInt($indexTerm, $fieldTerm)"
case INTERVAL_DAY_TIME => s"$rowTerm.setLong($indexTerm, $fieldTerm)"
case _ => s"$rowTerm.setNonPrimitiveValue($indexTerm, $fieldTerm)"
Expand All @@ -590,8 +597,7 @@ object CodeGenUtils {
case DOUBLE => s"$arrayTerm.setNullDouble($index)"
case TIME_WITHOUT_TIME_ZONE => s"$arrayTerm.setNullInt($index)"
case DATE => s"$arrayTerm.setNullInt($index)"
case TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$arrayTerm.setNullLong($index)"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$arrayTerm.setNullLong($index)"
case INTERVAL_YEAR_MONTH => s"$arrayTerm.setNullInt($index)"
case INTERVAL_DAY_TIME => s"$arrayTerm.setNullLong($index)"
case _ => s"$arrayTerm.setNullLong($index)"
Expand Down Expand Up @@ -640,8 +646,10 @@ object CodeGenUtils {
s"$writerTerm.writeDecimal($indexTerm, $fieldValTerm, ${dt.getPrecision})"
case DATE => s"$writerTerm.writeInt($indexTerm, $fieldValTerm)"
case TIME_WITHOUT_TIME_ZONE => s"$writerTerm.writeInt($indexTerm, $fieldValTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE |
TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$writerTerm.writeLong($indexTerm, $fieldValTerm)"
case TIMESTAMP_WITHOUT_TIME_ZONE =>
val dt = t.asInstanceOf[TimestampType]
s"$writerTerm.writeTimestamp($indexTerm, $fieldValTerm, ${dt.getPrecision})"
case TIMESTAMP_WITH_LOCAL_TIME_ZONE => s"$writerTerm.writeLong($indexTerm, $fieldValTerm)"
case INTERVAL_YEAR_MONTH => s"$writerTerm.writeInt($indexTerm, $fieldValTerm)"
case INTERVAL_DAY_TIME => s"$writerTerm.writeLong($indexTerm, $fieldValTerm)"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,8 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
val fieldTerm = s"timestamp"
val field =
s"""
|final long $fieldTerm = java.lang.System.currentTimeMillis();
|final $SQL_TIMESTAMP $fieldTerm =
| $SQL_TIMESTAMP.fromEpochMillis(java.lang.System.currentTimeMillis());
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
Expand All @@ -431,7 +432,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
// adopted from org.apache.calcite.runtime.SqlFunctions.currentTime()
val field =
s"""
|$fieldTerm = (int) ($timestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|$fieldTerm = (int) ($timestamp.getMillisecond() % ${DateTimeUtils.MILLIS_PER_DAY});
|if (time < 0) {
| time += ${DateTimeUtils.MILLIS_PER_DAY};
|}
Expand All @@ -449,12 +450,14 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
val timestamp = addReusableTimestamp()

// declaration
reusableMemberStatements.add(s"private long $fieldTerm;")
reusableMemberStatements.add(s"private $SQL_TIMESTAMP $fieldTerm;")

// assignment
val field =
s"""
|$fieldTerm = $timestamp + java.util.TimeZone.getDefault().getOffset($timestamp);
|$fieldTerm = $SQL_TIMESTAMP.fromEpochMillis(
| $timestamp.getMillisecond() +
| java.util.TimeZone.getDefault().getOffset($timestamp.getMillisecond()));
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
Expand All @@ -475,7 +478,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
// adopted from org.apache.calcite.runtime.SqlFunctions.localTime()
val field =
s"""
|$fieldTerm = (int) ($localtimestamp % ${DateTimeUtils.MILLIS_PER_DAY});
|$fieldTerm = (int) ($localtimestamp.getMillisecond() % ${DateTimeUtils.MILLIS_PER_DAY});
|""".stripMargin
reusablePerRecordStatements.add(field)
fieldTerm
Expand All @@ -497,7 +500,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
// adopted from org.apache.calcite.runtime.SqlFunctions.currentDate()
val field =
s"""
|$fieldTerm = (int) ($timestamp / ${DateTimeUtils.MILLIS_PER_DAY});
|$fieldTerm = (int) ($timestamp.getMillisecond() / ${DateTimeUtils.MILLIS_PER_DAY});
|if ($time < 0) {
| $fieldTerm -= 1;
|}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import org.apache.flink.table.runtime.typeutils.TypeCheckUtils
import org.apache.flink.table.runtime.typeutils.TypeCheckUtils.{isNumeric, isTemporal, isTimeInterval}
import org.apache.flink.table.types.logical._
import org.apache.flink.table.typeutils.TimeIndicatorTypeInfo

import org.apache.calcite.rex._
import org.apache.calcite.sql.SqlOperator
import org.apache.calcite.sql.`type`.{ReturnTypes, SqlTypeName}
import org.apache.calcite.util.TimestampString

import scala.collection.JavaConversions._

Expand Down Expand Up @@ -388,7 +388,12 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)

override def visitLiteral(literal: RexLiteral): GeneratedExpression = {
val resultType = FlinkTypeFactory.toLogicalType(literal.getType)
val value = literal.getValue3
val value = resultType.getTypeRoot match {
case LogicalTypeRoot.TIMESTAMP_WITHOUT_TIME_ZONE =>
literal.getValueAs(classOf[TimestampString])
case _ =>
literal.getValue3
}
generateLiteral(ctx, resultType, value)
}

Expand Down Expand Up @@ -730,7 +735,7 @@ class ExprCodeGenerator(ctx: CodeGeneratorContext, nullableInput: Boolean)
generateProctimeTimestamp(ctx, contextTerm)

case STREAMRECORD_TIMESTAMP =>
generateRowtimeAccess(ctx, contextTerm)
generateTimestampAccess(ctx, contextTerm)

case _: SqlThrowExceptionFunction =>
val nullValue = generateNullLiteral(resultType, nullCheck = true)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,22 +23,22 @@ import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.MetricGroup
import org.apache.flink.table.api.{TableConfig, TableException}
import org.apache.flink.table.dataformat.BinaryStringUtil.safeToString
import org.apache.flink.table.dataformat.{BinaryString, Decimal, GenericRow}
import org.apache.flink.table.dataformat.{BinaryString, Decimal, GenericRow, SqlTimestamp}
import org.apache.flink.table.functions.{FunctionContext, UserDefinedFunction}
import org.apache.flink.table.planner.calcite.FlinkTypeFactory
import org.apache.flink.table.planner.codegen.FunctionCodeGenerator.generateFunction
import org.apache.flink.table.planner.plan.utils.PythonUtil.containsPythonCall
import org.apache.flink.table.runtime.functions.SqlDateTimeUtils
import org.apache.flink.table.types.logical.RowType

import org.apache.calcite.avatica.util.ByteString
import org.apache.calcite.rex.{RexBuilder, RexExecutor, RexNode}
import org.apache.calcite.sql.`type`.SqlTypeName
import org.apache.calcite.util.TimestampString
import org.apache.commons.lang3.StringEscapeUtils

import java.io.File
import java.util.TimeZone


import scala.collection.JavaConverters._
import scala.collection.mutable.ListBuffer

Expand Down Expand Up @@ -172,6 +172,19 @@ class ExpressionReducer(
}
reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
reducedIdx += 1
case SqlTypeName.TIMESTAMP =>
val reducedValue = reduced.getField(reducedIdx)
val value = if (reducedValue != null) {
val ts = reducedValue.asInstanceOf[SqlTimestamp]
val milliseconds = ts.getMillisecond
val nanoseconds = ts.toLocalDateTime.getNano
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just use LocalDateTime to TimestampString?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A little tricky here, take CAST('1500-04-30 00:00:00' AS TIMESTAMP(3)) as an example:
we use SqlDateTimeUtils.toTimestamp[1] to cast the string to SqlTimestamp and use ExpressionReducer[2] to make a Timestamp literal.

The [1] step considers Gregorian cutovers and again the [2] step uses TimestampString.fromMillisSinceEpoch which calls our copied DateTimeUtils.julianToString considers Gregorian cutovers too. Two wrongs make a correct result.

I will change the toTimestamp and this ExpressionReducer to ignore Gregorian cutovers in next PR and that will be the final one.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you can fix it, and it is very worthwhile to fix. Otherwise it will bring a lot of strange code.

val timestampString = TimestampString.fromMillisSinceEpoch(milliseconds)
timestampString.withNanos(nanoseconds)
} else {
reducedValue
}
reducedValues.add(maySkipNullLiteralReduce(rexBuilder, value, unreduced))
reducedIdx += 1
case _ =>
val reducedValue = reduced.getField(reducedIdx)
// RexBuilder handle double literal incorrectly, convert it into BigDecimal manually
Expand Down
Loading