[GLUTEN-10247][FLINK] Support date_format function#10248
[GLUTEN-10247][FLINK] Support date_format function#10248shuai-xu merged 29 commits intoapache:mainfrom
Conversation
| Arrays.asList(Row.of(1, "2024-12-31 12:12:12"), Row.of(2, "2025-02-28 12:12:12")); | ||
| createSimpleBoundedValuesTable("dateFormatTbl", "a int, b string", rows); | ||
| String query = | ||
| "select a, DATE_FORMAT(cast(b as Timestamp(3)), 'yyyy-MM-dd') from dateFormatTbl"; |
There was a problem hiding this comment.
In Spark and Velox, date_format respects the configured session timezone. I assume Flink should also respect it.
The test here cannot expose the issue that local session timezone is not respected. The default timezone (utc) is used in cast string to timestamp. Then, the same default timezone is used for date_format. As long as the timezone is consistent, there is no issue exposed. But the actual timezone configured for Flink is not actually respected.
I think we need to pass Flink's session timezone to Velox's config, as Gluten Spark code does.
There was a problem hiding this comment.
Now I have pass the flink session timezone to velox, and add the ut to test this. please take review again. @PHILO-HE
There was a problem hiding this comment.
It seems not properly to pass the session timezone config to the velox, as in StatefulTask#initOperators will check the memory reversed, and would throw exceptions if it's not 0.
ce89ecd to
3ad14c9
Compare
| Map<String, String> configMap = new HashMap<>(); | ||
| if (config.get(ADJUST_TIMESTMP_TO_SESSION_TIMEZONE)) { | ||
| String localTimeZone = config.get(TableConfigOptions.LOCAL_TIME_ZONE); | ||
| configMap.put(keyVeloxAdjustTimestampToSessionTimeZone, "true"); |
There was a problem hiding this comment.
In Spark, we always set this to true to let velox respect configured session timezone. I assume we should also do this for Flink, not depending on the user's setting for ADJUST_TIMESTMP_TO_SESSION_TIMEZONE.
| VeloxQueryConfig.ADJUST_TIMESTMP_TO_SESSION_TIMEZONE.key(), | ||
| "true", | ||
| TableConfigOptions.LOCAL_TIME_ZONE.key(), | ||
| "America/Los_Angeles"); |
There was a problem hiding this comment.
In this designed test, the result cannot reflect the timezone is changed. As I mentioned, both cast & date_format are respecting the timezone. As long as the timezone they are using is same, the result aligns with the input date time string, even though America/Los_Angeles is not passed to velox but the default timezone is in use. So the test here cannot confirm this new timezone is passed to Velox and taking effect.
I haven't come up with a good way. Do you have any idea?
There was a problem hiding this comment.
I think we can set some illegal timezone value to velox, but which is correct to flink , e.g : default timezone. And velox would throw a exception with invalid timezone, but flink can pass it. and in contrast, we pass a valid timezone to the test, e.g : America/Los_Angeles, can get the correct result. and it will prove that the timezone is passed to velox. @PHILO-HE
There was a problem hiding this comment.
@KevinyhZou, it may not well test the behavior of date_format function on respecting timezone.
Could you confirm if the suggestion below makes sense?
-
Set timezone to UTC
CREATE TEMPRARY VIEW my_view AS SELECT CAST(b AS TIMESTAMP) as c1 FROM tbl; -
Set timezone to "America/Los_Angeles"
SELECT date_format(c1) FROM my_view;
Check the result to see if it matches the expected string (-8 hours offset, compared with the string column b). -
Set timezone to "Asia/Shanghai"
SELECT date_format(c1) FROM my_view;
Check the result to see if it matches the expected string (+8 hours offset, compared with the string column b).
There was a problem hiding this comment.
It does not work. It always return the same string while time zone set to different(UTC, Asia/Shanghai, America/Los_Angeles).
There was a problem hiding this comment.
Do you know the reason? If different timestamp is set in Velox, date_format should return different date/time string for the same timestamp input.
There was a problem hiding this comment.
The flink create view is logical view, data is not materialized, so you described above it is the same as select ... date_format(cast(xxx as timestamp), 'yyyy-MM-dd HH:mm:ss'), and this is different from spark.
And we can still test this, by insert the result of 1st step to a table instead of view, and then the 2nd and 3rd step result can be affected by the timezone, but the timestamp with timezone type is not supported, so the 1st step would cause errors, and I have tried this yesterday.
There was a problem hiding this comment.
Do you mean the inserting will create TIMESTAMP_LTZ type in the 1st step, not TIMESTAMP type?
There was a problem hiding this comment.
Yes, similar to that. cast(xxx as Timestamp(3)) will return a Timestamp vector with timezone by using the arrow's TimeStampMilliTZVector, and now we only map the TimeStampMilliVector to flink's timestamp type, but not do anything about TimeStampMilliTZVector and flink's timestamp type with timezone, like TIMESTAMP_LTZ. maybe we should open another issue for this?
There was a problem hiding this comment.
@KevinyhZou, if I understand correctly, we should ensure the result type is Flink's timestamp type for cast(xxx as timestamp). You can submit another PR to fix the issue.
I investigated Flink just now. It seems Flink's timestamp type is corresponding to Spark's timestamp_ntz type (no counterpart in Velox), and Flinks' timestamp_ltz type is corresponding to Spark/Velox's timestamp type. Please confirm this to avoid wrong type mapping.
In Flink, when casting datetime string to timestamp_ltz or outputting string represented result for timestamp_ltz, timezone is respected. But regarding timestamp type, timezone is not respected in such conversions.
Both timestamp and timestamp_ltz are supported for Flink's date_format function. So if timestamp is used, the result is not influenced by session timezone. But if timestamp_ltz is used, the result should be adjusted based on session timezone.
| return Config.create(configMap); | ||
| } | ||
|
|
||
| private static boolean isConfigUsedForTest(Configuration config) { |
There was a problem hiding this comment.
Why need to add test related code here?
| public void close() throws Exception { | ||
| inputQueue.close(); | ||
| task.close(); | ||
| if (task != null) { |
|
depends on pr: bigo-sg/velox#9 |
|
I have consider timestamp timezone in |
There was a problem hiding this comment.
Thank for your updating. The added tests are what I wanted, which ensures timezone is actually respected. I posted only few minor comments.
BTW, are there any other code changes required to map Flink's ? Could you verify the query is offloaded to Gluten with this PR? Thanks.timestampLtz type to Velox's timestamp type
I note the type mapping already exists. It would be better to add some comments to clarify.
https://github.com/apache/incubator-gluten/blob/4d2a200c45d6a8849124d6eed4d54f1ae48568fd/gluten-flink/runtime/src/main/java/org/apache/gluten/util/LogicalTypeConverter.java#L109-L111
gluten-flink/runtime/src/main/java/org/apache/gluten/table/runtime/config/VeloxQueryConfig.java
Show resolved
Hide resolved
| Arrays.asList( | ||
| Row.of(1, LocalDateTime.parse("2024-12-31 12:12:12", formatter)), | ||
| Row.of(2, LocalDateTime.parse("2025-02-28 12:12:12", formatter))); | ||
| createSimpleBoundedValuesTable("dateFormatTbl", "a int, b Timestamp(3)", rows); |
There was a problem hiding this comment.
Nit:
Use descriptive table name: timestampTable,
| Row.of( | ||
| 2, | ||
| LocalDateTime.parse("2025-02-28 12:12:12", formatter).toInstant(ZoneOffset.UTC))); | ||
| createSimpleBoundedValuesTable("dateFormatTblLTZ", "a int, b Timestamp_LTZ(3)", rows); |
There was a problem hiding this comment.
dateFormatTblLTZ->timestampLtzTable
| 2, | ||
| LocalDateTime.parse("2025-02-28 12:12:12", formatter).toInstant(ZoneOffset.UTC))); | ||
| createSimpleBoundedValuesTable("dateFormatTblLTZ", "a int, b Timestamp_LTZ(3)", rows); | ||
| String query1 = |
There was a problem hiding this comment.
Nit: maybe, just use "query = " to overwrite the previous value.
...link/ut/src/test/java/org/apache/gluten/table/runtime/stream/custom/ScalarFunctionsTest.java
Show resolved
Hide resolved
| "+I[1, 2024-12-31, 2024-12-31 12:12:12]", "+I[2, 2025-02-28, 2025-02-28 12:12:12]")); | ||
|
|
||
| Map<String, String> configs = new HashMap<>(); | ||
| configs.put(TableConfigOptions.LOCAL_TIME_ZONE.key(), "America/Los_Angeles"); |
There was a problem hiding this comment.
I note in Flink, the following code can be called to set timezone. It may be better to directly call such API to do the setting before the test. Then, no need to do the setting inside runAndCheck.
tEnv.getConfig().setLocalTimeZone(ZoneId.of("Asia/Shanghai"));|
I think the failure reason is the pr: bigo-sg/velox#9 that not merged. But from the ci log, it seems to be was blocked for 4 hours, and then was cancelled. @PHILO-HE . Please help to reiew:bigo-sg/velox#9 @shuai-xu , and merge it first if it's no problems. |
|
@KevinyhZou, you can temporarily change the CI code to use your Velox branch for testing, then to see if your patch can fix this failure. Another question. |
Timestamp with timezone would be specified with |
f36f78a to
0bd0d77
Compare
@KevinyhZou, please add the below command in flink.yml to install a new version of tzdata. We have merged some patch in upstream Velox to remove the dependency on new tzdata, and we've also removed the installation for new tzdata from docker image. I assume an old Velox version is used by Gluten Flink. So before the upgrade, we have to install new tzdata in runtime environment (not only the requirement for CI), as long as timezone is used in Velox. |
@KevinyhZou, would you mind clarifying how this works if both Flink's timestamp and timestampLTZ are used in one session, even in one query? |
|
Both |
|
@KevinyhZou, thanks for the clarification. My understanding is that TimestampLTZ data is internally represented as a UTC timestamp, even though it is associated with a specific local timezone, which is different from Timestamp data. |
PHILO-HE
left a comment
There was a problem hiding this comment.
Looks good. Thanks for your continuous efforts. Please revert the changes for Velox repo in flink.yml.
|
done |
|
@shuai-xu, do you have any comments? |
| memoryManager = MemoryManager.create(AllocationListener.NOOP); | ||
| session = Velox4j.newSession(memoryManager); | ||
| query = new Query(glutenPlan, Config.empty(), ConnectorConfig.empty()); | ||
| query = |
There was a problem hiding this comment.
It seems missing the GlutenVectorOneInputOperator
|
@KevinyhZou, could you check if the CI failure is related? |
|
velox4j has not been updated yet,and we will re-run the ci after it update. @PHILO-HE |
What changes were proposed in this pull request?
(Please fill in changes proposed in this fix)
(Fixes: #10247)
How was this patch tested?
UT