
Allow original partition column value to be retrieved when using TimestampBasedKeyGen #14991


Description


Currently, because Spark by default omits partition values from the data files (encoding them into partition paths instead for partitioned tables), using TimestampBasedKeyGenerator with a timestamp-based partition column makes it impossible to retrieve the original value when reading from Spark, even though it is also persisted in the data file.

 
{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.hive.MultiPartKeysValueExtractor

val df = Seq((1, "z3", 30, "v1", "2018-09-23"), (2, "z3", 35, "v1", "2018-09-24")).toDF("id", "name", "age", "ts", "data_date")

// mor
df.write.format("hudi").
option(HoodieWriteConfig.TABLE_NAME, "issue_4417_mor").
option("hoodie.datasource.write.table.type", "MERGE_ON_READ").
option("hoodie.datasource.write.recordkey.field", "id").
option("hoodie.datasource.write.partitionpath.field", "data_date").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.TimestampBasedKeyGenerator").
option("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING").
option("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd").
option("hoodie.deltastreamer.keygen.timebased.timezone", "GMT+8:00").
option("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd").
mode(org.apache.spark.sql.SaveMode.Append).
save("file:///tmp/hudi/issue_4417_mor")

+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220110172709324|20220110172709324...|                 2|            2018/09/24|703e56d3-badb-40b...|  2|  z3| 35| v1|2018-09-24|
|  20220110172709324|20220110172709324...|                 1|            2018/09/23|58fde2b3-db0e-464...|  1|  z3| 30| v1|2018-09-23|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+

// cannot query any data
spark.read.format("hudi").load("file:///tmp/hudi/issue_4417_mor").where("data_date = '2018-09-24'").show
// still cannot query any data
spark.read.format("hudi").load("file:///tmp/hudi/issue_4417_mor").where("data_date = '2018/09/24'").show

// cow
df.write.format("hudi").
option(HoodieWriteConfig.TABLE_NAME, "issue_4417_cow").
option("hoodie.datasource.write.table.type", "COPY_ON_WRITE").
option("hoodie.datasource.write.recordkey.field", "id").
option("hoodie.datasource.write.partitionpath.field", "data_date").
option("hoodie.datasource.write.precombine.field", "ts").
option("hoodie.datasource.write.keygenerator.class", "org.apache.hudi.keygen.TimestampBasedKeyGenerator").
option("hoodie.deltastreamer.keygen.timebased.timestamp.type", "DATE_STRING").
option("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd").
option("hoodie.deltastreamer.keygen.timebased.timezone", "GMT+8:00").
option("hoodie.deltastreamer.keygen.timebased.input.dateformat", "yyyy-MM-dd").
mode(org.apache.spark.sql.SaveMode.Append).
save("file:///tmp/hudi/issue_4417_cow")

+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220110172721896|20220110172721896...|                 2|            2018/09/24|81cc7819-a0d1-4e6...|  2|  z3| 35| v1|2018/09/24|
|  20220110172721896|20220110172721896...|                 1|            2018/09/23|d428019b-a829-41a...|  1|  z3| 30| v1|2018/09/23|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
// cannot query any data
spark.read.format("hudi").load("file:///tmp/hudi/issue_4417_cow").where("data_date = '2018-09-24'").show

// but 2018/09/24 works
spark.read.format("hudi").load("file:///tmp/hudi/issue_4417_cow").where("data_date = '2018/09/24'").show  {code}
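For reference, the partition-path value produced by the writer options above can be reproduced outside of Hudi. This is a minimal sketch, assuming plain SimpleDateFormat semantics, of the input-to-output dateformat conversion the key generator is configured to perform here; it is not Hudi's actual key generator code:

```scala
import java.text.SimpleDateFormat
import java.util.TimeZone

// Sketch only: parse the column value with the configured input dateformat,
// then re-render it with the output dateformat in the configured timezone.
// This mirrors the writer options above, not Hudi's implementation.
def toPartitionPath(value: String): String = {
  val input  = new SimpleDateFormat("yyyy-MM-dd")
  val output = new SimpleDateFormat("yyyy/MM/dd")
  input.setTimeZone(TimeZone.getTimeZone("GMT+8:00"))
  output.setTimeZone(TimeZone.getTimeZone("GMT+8:00"))
  output.format(input.parse(value))
}

println(toPartitionPath("2018-09-24"))
// 2018/09/24 -- the value that ends up in _hoodie_partition_path
```

This is why `_hoodie_partition_path` holds `2018/09/24` while the record itself was written with `data_date = '2018-09-24'`.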
 

 

Comments

10/Jan/22 10:08 - biyan900116@gmail.com: [~sivabalann] [~xushiyan]

I have some doubts that need to be resolved.

In this case, the MOR and COW tables behave differently: MOR returns data_date in the original input format (yyyy-MM-dd), while COW returns it in the output dateformat (yyyy/MM/dd). Which one is right?

And should users query with the input format, or the output format?


10/Jan/22 12:18 - shivnarayan: My understanding is that it has to be the output dateformat (i.e. yyyy/MM/dd in this context). Looks like MOR needs to be fixed, but I am really surprised that the resulting partition format differs from one table type to another.


10/Jan/22 13:06 - biyan900116@gmail.com: Yeah, the difference between the two table types is weird. I'll modify the MOR behavior to match the COW table. But once that is done, it will not be compatible with existing MOR tables.


11/Jan/22 01:52 - taisenki: [~biyan900116@gmail.com] [~shivnarayan]

I think queries should use the input format; TimestampBasedKeyGenerator is only used for the partition key, not for the original data.

There is also a situation like this: when using a Read Optimized Query to read the same MOR data, the result comes back in the '2018/09/24' format:

{code:java}
scala> spark.time(spark.sql("select * from issue_4417_mor_ro where id = '1'").select("data_date").show)
+----------+
| data_date|
+----------+
|2018/09/23|
+----------+
Time taken: 12067 ms

scala> spark.time(spark.sql("select * from issue_4417_mor_rt where id = '1'").select("data_date").show)
+----------+
| data_date|
+----------+
|2018-09-23|
+----------+
Time taken: 30927 ms
{code}

 

So I think that, whether COW or MOR, the data in the partition column has not changed; the difference only comes from the file input format.

And we should still query by the input dateformat rather than the output dateformat, which is used only for the partition key.


14/Jan/22 00:10 - shivnarayan: Thanks for the update [~taisenki].

Can you try querying with _hoodie_partition_path and see how that pans out across the different table types and query types?


17/Jan/22 01:37 - taisenki: [~shivnarayan]

Querying with _hoodie_partition_path:
{code:java}
// MOR & SNAPSHOT QUERY
scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).load(tablePath).where("_hoodie_partition_path = '2018/09/24'").show
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220117090936761|20220117090936761...|                 2|            2018/09/24|8216950c-cf4c-4fc...|  2|  z3| 35| v1|2018-09-24|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+

// MOR & READ_OPTIMIZED QUERY
scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).load(tablePath).where("_hoodie_partition_path = '2018/09/24'").show
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220117090936761|20220117090936761...|                 2|            2018/09/24|8216950c-cf4c-4fc...|  2|  z3| 35| v1|2018/09/24|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+

// MOR & INCREMENTAL QUERY
scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 20220117090936760L).load(tablePath).where("_hoodie_partition_path = '2018/09/24'").show
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220117090936761|20220117090936761...|                 2|            2018/09/24|8216950c-cf4c-4fc...|  2|  z3| 35| v1|2018-09-24|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+ {code}
 

 

Querying with the partition column data_date:
{code:java}
// MOR & SNAPSHOT QUERY
scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).load(tablePath).where("data_date = '2018/09/24'").show
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name| id|name|age| ts|data_date|
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+

scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL).load(tablePath).where("data_date = '2018-09-24'").show
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name| id|name|age| ts|data_date|
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+

// MOR & READ_OPTIMIZED QUERY
scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).load(tablePath).where("data_date = '2018/09/24'").show
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220117090936761|20220117090936761...|                 2|            2018/09/24|8216950c-cf4c-4fc...|  2|  z3| 35| v1|2018/09/24|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+

scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).load(tablePath).where("data_date = '2018-09-24'").show
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name| id|name|age| ts|data_date|
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+

// MOR & INCREMENTAL QUERY
scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 20220117090936760L).load(tablePath).where("data_date = '2018-09-24'").show
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| id|name|age| ts| data_date|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+
|  20220117090936761|20220117090936761...|                 2|            2018/09/24|8216950c-cf4c-4fc...|  2|  z3| 35| v1|2018-09-24|
+-------------------+--------------------+------------------+----------------------+--------------------+---+----+---+---+----------+

scala> spark.read.format("hudi").option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_INCREMENTAL_OPT_VAL).option(DataSourceReadOptions.BEGIN_INSTANTTIME_OPT_KEY, 20220117090936760L).load(tablePath).where("data_date = '2018/09/24'").show
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|_hoodie_file_name| id|name|age| ts|data_date|
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
+-------------------+--------------------+------------------+----------------------+-----------------+---+----+---+---+---------+
{code}
 

 

So we can see that the results differ across query types for the partition column, and I prefer the incremental query's result format.


27/Jan/22 03:45 - biyan900116@gmail.com: [~shivnarayan] [~taisenki]

I also agree that the incremental query's result is better. Based on this, we have a to-do list:

For COW, the data_date field should return the original 'yyyy-MM-dd' format. This should be fixed.

For MOR, snapshot and read-optimized queries should respond correctly to 'data_date' predicates in the 'yyyy-MM-dd' format.
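Until the formats are unified, one possible workaround sketch (a hypothetical helper, not part of Hudi) is to translate the user-supplied input-format date into the output format and filter on _hoodie_partition_path, since the query results earlier in this thread show that column is rendered consistently across table and query types:

```scala
import java.text.SimpleDateFormat

// Hypothetical helper (not part of Hudi): rewrite an input-format date into the
// partition-path (output) format so the filter matches _hoodie_partition_path.
def partitionPathPredicate(inputDate: String): String = {
  val in  = new SimpleDateFormat("yyyy-MM-dd")
  val out = new SimpleDateFormat("yyyy/MM/dd")
  s"_hoodie_partition_path = '${out.format(in.parse(inputDate))}'"
}

println(partitionPathPredicate("2018-09-24"))
// _hoodie_partition_path = '2018/09/24'
```

Used, for example, as `spark.read.format("hudi").load(tablePath).where(partitionPathPredicate("2018-09-24"))`.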


18/Apr/22 22:46 - alexey.kudinkin: We had to roll back the original fix for this due to the performance regression it was producing. More details can be found in HUDI-3902.


16/Oct/24 01:56 - yihua: [~jonvex] is this already done as part of your previous fix?


16/Oct/24 21:16 - yihua: Jon has put up two fixes for reading the original value of the timestamp partition column (rather than deriving it from the string partition path): HUDI-5807 ([https://github.com//pull/11770]) and HUDI-8098 ([https://github.com//pull/11895]). So the issue is fixed.


16/Oct/24 21:18 - yihua: The new file-group-reader-based and HadoopFsRelation-based (using the new HoodieParquetFileFormat) query logic in Spark can read the original value of the timestamp partition column properly. The deprecated relation classes do not support this yet.


16/Oct/24 21:24 - yihua: We may not want to fix this issue in the deprecated relation classes, which are only used by clustering (to be revisited and fixed). We should follow up to remove the relation classes so that we only maintain the new reader code path going forward.
