[FLINK-12844][table-planner-blink] Use default conversion class LocalDate/LocalTime/LocalDateTime for DateType/TimeType/TimestampType in blink #8762
This PR is rebased on #8757
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community
Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer or PMC member is required.
Bot commands
The @flinkbot bot supports the following commands:
flink-core/src/main/java/org/apache/flink/api/java/typeutils/TypeExtractor.java
ec88425 to 3dfcf7b
276bcfe to 58d7331
...common/src/main/java/org/apache/flink/table/types/utils/LegacyTypeInfoDataTypeConverter.java
...table-planner-blink/src/main/scala/org/apache/flink/table/codegen/CodeGeneratorContext.scala
...planner-blink/src/main/scala/org/apache/flink/table/sources/tsextractors/ExistingField.scala
85bd1dd to c833a38
LGTM
Thanks @JingsongLi. This is a great improvement that gives the blink planner better handling of timestamps. 👍
I left some comments below.
@@ -99,6 +102,9 @@
 addMapping(Types.FLOAT, DataTypes.FLOAT().bridgedTo(Float.class));
 addMapping(Types.DOUBLE, DataTypes.DOUBLE().bridgedTo(Double.class));
 addMapping(Types.BIG_DEC, createLegacyType(LogicalTypeRoot.DECIMAL, Types.BIG_DEC));
+addMapping(LocalTimeTypeInfo.LOCAL_DATE, DataTypes.DATE().bridgedTo(LocalDate.class));
+addMapping(LocalTimeTypeInfo.LOCAL_TIME, DataTypes.TIME(0).bridgedTo(LocalTime.class));
+addMapping(LocalTimeTypeInfo.LOCAL_DATE_TIME, DataTypes.TIMESTAMP(3).bridgedTo(LocalDateTime.class));
LocalTimeTypeInfo => Types
ok
@@ -500,7 +527,7 @@ class CodeGeneratorContext(val tableConfig: TableConfig) {
 * Adds a reusable TimeZone to the member area of the generated class.
 */
def addReusableTimeZone(): String = {
Rename this method to `addReusableLocalTimeZone` or `addReusableDefaultTimeZone`? That way the method name makes it clear that the time zone does not come from `TableConfig.getTimeZone` but is the JVM default time zone.
I think it should read `tableConfig.getTimeZone`, and it will be used by the upcoming support for `Timestamp with local zone`.
`addReusableLocalDateTime` should just read `TimeZone.getDefault`.
@JingsongLi Maybe we should update `TableConfig` and use `ZoneId` instead of the old `TimeZone` class. I haven't looked into the details, but I read:
A {@code ZoneId} is used to identify the rules used to convert between an {@link Instant} and a {@link LocalDateTime}.
@twalthr Good suggestion, +1 for using `ZoneId` with the Java 8 classes. `TimeZone` is an outdated and troublesome class.
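The quoted Javadoc describes `ZoneId` as the rule set for converting between `Instant` and `LocalDateTime`. A minimal sketch of that round trip, using only the JDK, could look like the following. The class and method names are hypothetical and are not part of Flink or of this PR.

```java
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.TimeZone;

// Hypothetical demo class; it only illustrates the JDK calls discussed above.
final class ZoneIdConversionDemo {

    /** Converts an Instant to a wall-clock LocalDateTime using the rules of the given zone. */
    public static LocalDateTime toLocal(Instant instant, ZoneId zone) {
        return LocalDateTime.ofInstant(instant, zone);
    }

    /** Converts a wall-clock LocalDateTime back to an Instant using the rules of the given zone. */
    public static Instant toInstant(LocalDateTime local, ZoneId zone) {
        return local.atZone(zone).toInstant();
    }

    /** Bridges a legacy java.util.TimeZone to the equivalent java.time.ZoneId. */
    public static ZoneId fromLegacy(TimeZone legacy) {
        return legacy.toZoneId();
    }

    public static void main(String[] args) {
        ZoneId zone = fromLegacy(TimeZone.getTimeZone("Asia/Shanghai"));
        LocalDateTime local = toLocal(Instant.EPOCH, zone);
        System.out.println(local);                  // wall-clock time of the epoch in that zone
        System.out.println(toInstant(local, zone)); // back to the original instant
    }
}
```

If `TableConfig` were to keep accepting a `TimeZone` for a while, `TimeZone.toZoneId()` would be one way to bridge the two representations; whether that is the right migration path is left to the follow-up discussion.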
@@ -302,7 +302,7 @@ class StreamExecGroupWindowAggregate(
 val builder = WindowOperatorBuilder
 .builder()
 .withInputFields(inputFields.toArray)
-val timeZoneOffset = -config.getTimeZone.getOffset(Calendar.ZONE_OFFSET)
+val timeZoneOffset = 0
Could you also remove the following constructor parameters for the time zone? I think they are not needed anymore.
Yeah, ok
@@ -399,6 +400,7 @@ class TemporalTypesTest extends ExpressionTestBase {
 )
 }

+@Ignore // TODO support timestamp with local time zone
Will we support this before the 1.9 release? Does that mean the `timeZone` option in `TableConfig` will not work for the blink planner?
That will come in the next PR, which adds support for timestamp with local zone.
@@ -160,24 +160,16 @@ public TimeZone getNewInstance(String tz) {
  * type used for UDF parameters ({@link java.sql.Timestamp}).
  *
  * <p>The internal long represents the time milliseconds since January 1, 1970, 00:00:00 GMT.
- * So we don't need to take TimeZone into account.
+ * we need a TimeZone.
Remove this comment, or add more details on why we need the time zone explicitly?
I'll change it to Calcite's implementation.
@@ -232,24 +237,43 @@ object BuiltInMethods {
 val NOW_OFFSET = Types.lookupMethod(
 classOf[SqlDateTimeUtils], "now", classOf[Long])

-val DATE_FORMAT_STRING_STRING_STRING = Types.lookupMethod(
+val DATE_FORMAT_STRING_STRING_STRING_TIME_ZONE = Types.lookupMethod(
We can remove all the methods with the `TIME_ZONE` suffix in this file. They are not used anymore.
I think they will be used in the next PR to support timestamp with local zone.
@@ -128,8 +135,21 @@
 * lost its specific Java format. Only DataType retains all its
 * Java format information.
 */
@SuppressWarnings("unchecked")
@Deprecated
Please add Javadoc stating which method should be used instead.
Also, should we replace all usages of this method with the new one?
 * @param context context for converter.
 */
@SuppressWarnings("unchecked")
public static DataFormatConverter getConverterForDataType(DataType originDataType, Context context) {
Can we pass a `TimeZone` instead of a `Context`? Will we add more parameters in the future?
To me, constructing a context this way is verbose.
public static DataFormatConverter getConverterForDataType(DataType originDataType) {
return getConverterForDataType(originDataType, new Context(DEFAULT_TIME_ZONE));
Shouldn't we use `TableConfig.getTimeZone` as the conversion time zone instead of the local time zone in the Flink framework?
After an offline discussion with @wuchong: `java.sql.Timestamp` is a special format. `valueOf` and `toString` in `sql.Timestamp` read `TimeZone.getDefault`, so here we should just use `TimeZone.getDefault` to keep `valueOf` and `toString` correct.
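The dependency on the JVM default time zone described above can be observed with the JDK alone. The following is a small sketch, not code from this PR: the class and method names are hypothetical, and the default zone is switched temporarily only to make the effect visible.

```java
import java.sql.Timestamp;
import java.util.TimeZone;

// Hypothetical demo class; not part of Flink.
final class TimestampDefaultZoneDemo {

    /** Parses a wall-clock string with Timestamp.valueOf while the given zone is the JVM default. */
    public static long epochMillisUnderZone(String wallClock, String zoneId) {
        TimeZone previous = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
            return Timestamp.valueOf(wallClock).getTime();
        } finally {
            TimeZone.setDefault(previous); // always restore the original default
        }
    }

    /** Renders epoch millis with Timestamp.toString while the given zone is the JVM default. */
    public static String renderUnderZone(long epochMillis, String zoneId) {
        TimeZone previous = TimeZone.getDefault();
        try {
            TimeZone.setDefault(TimeZone.getTimeZone(zoneId));
            return new Timestamp(epochMillis).toString();
        } finally {
            TimeZone.setDefault(previous);
        }
    }

    public static void main(String[] args) {
        String wallClock = "2019-06-17 12:00:00";
        long utc = epochMillisUnderZone(wallClock, "UTC");
        long shanghai = epochMillisUnderZone(wallClock, "Asia/Shanghai");
        // The same string maps to different instants depending on the default zone.
        System.out.println("difference in ms: " + (utc - shanghai));
        // The same instant prints as different wall-clock strings.
        System.out.println(renderUnderZone(0L, "UTC"));
        System.out.println(renderUnderZone(0L, "Asia/Shanghai"));
    }
}
```

Because both directions go through the default zone, a converter that has to round-trip `java.sql.Timestamp` values consistently with `valueOf` and `toString` needs to use that same zone, which is the argument made in the comment above.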
@@ -23,7 +23,7 @@ import org.apache.flink.api.java.typeutils.{MapTypeInfo, MultisetTypeInfo, Objec
 import org.apache.flink.table.typeutils.TimeIntervalTypeInfo
 import org.apache.flink.types.Row

-import _root_.java.{lang, math, sql, util}
+import _root_.java.{lang, math, sql, time, util}
I think this file can be deleted
Yes, I will delete it as the first commit.
@wuchong @KurtYoung Updated, PTAL
…Date/LocalTime/LocalDateTime for DateType/TimeType/TimestampType in blink
What is the purpose of the change
Currently we still use java.sql.Timestamp as the default conversion class for TimestampType.
We should follow the design of the new type system and use java.time.Local*** instead.
NOTE: this may affect user behaviour.
E.g. a UDF with eval(Object o) will now be passed a java.time.Local*** instead of a java.sql.***.
Compatibility method: use DataType.bridgedTo(java.sql.***.class);
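To illustrate the behaviour change for an `eval(Object o)` style function, here is a minimal JDK-only sketch. It does not use any Flink API: `EvalArgumentSketch` and its `eval` method are hypothetical stand-ins for a user function that wants to tolerate both the new `java.time.LocalDateTime` argument and the legacy `java.sql.Timestamp` one.

```java
import java.sql.Timestamp;
import java.time.LocalDateTime;

// Hypothetical stand-in for a user-defined function; not a Flink class.
final class EvalArgumentSketch {

    /** Accepts either conversion class and normalizes it to LocalDateTime. */
    public static LocalDateTime eval(Object o) {
        if (o instanceof LocalDateTime) {
            // What the planner would pass with the new default conversion class.
            return (LocalDateTime) o;
        }
        if (o instanceof Timestamp) {
            // Legacy conversion class; toLocalDateTime uses the JVM default time zone.
            return ((Timestamp) o).toLocalDateTime();
        }
        throw new IllegalArgumentException("Unsupported argument type: " + o);
    }

    public static void main(String[] args) {
        System.out.println(eval(LocalDateTime.of(2019, 6, 17, 12, 0)));
        System.out.println(eval(Timestamp.valueOf("2019-06-17 12:00:00")));
    }
}
```

A function written this way keeps working regardless of which class is passed in; the alternative named in the description is to request the old class explicitly via `bridgedTo`.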
Verifying this change
Unit tests (ut).
Does this pull request potentially affect one of the following parts:
@Public(Evolving): (no)
Documentation