Problem Statement:
When writing data to a Hudi table with the option {{hoodie.parquet.outputtimestamptype=TIMESTAMP_MILLIS}}, the setting is not honored and the Parquet files are always written with {{timestamp-micros}}.
Solution:
Hudi should honor the {{hoodie.parquet.outputtimestamptype=TIMESTAMP_MILLIS}} setting when specified in the Hudi options.
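To make the difference between the two units concrete, here is a minimal sketch in plain Scala (JDK classes only, no Spark or Hudi) that converts the sample timestamps from this issue to epoch microseconds and epoch milliseconds. The object and method names are hypothetical, and the use of floor division for the millisecond value is an assumption about how a writer would drop the sub-millisecond digits, not a statement about Hudi's implementation.

```scala
import java.sql.Timestamp

object TimestampPrecisionSketch {
  // Microseconds since the epoch, the unit behind a timestamp-micros column.
  def toEpochMicros(ts: Timestamp): Long = {
    val instant = ts.toInstant
    Math.addExact(
      Math.multiplyExact(instant.getEpochSecond, 1000000L),
      (instant.getNano / 1000).toLong
    )
  }

  // Milliseconds since the epoch, the unit behind a timestamp-millis column.
  // Assumption: sub-millisecond digits are dropped with floor division.
  def toEpochMillis(ts: Timestamp): Long =
    Math.floorDiv(toEpochMicros(ts), 1000L)

  def main(args: Array[String]): Unit = {
    val samples = Seq(
      "2023-10-01 10:00:00.540040",
      "2023-10-01 11:30:00.240030",
      "2023-10-01 12:45:00.140022"
    )
    samples.foreach { s =>
      val ts = Timestamp.valueOf(s)
      val micros = toEpochMicros(ts)
      val millis = toEpochMillis(ts)
      // Microseconds that cannot be represented at millisecond precision.
      val dropped = micros - millis * 1000L
      println(s"$s micros=$micros millis=$millis droppedMicros=$dropped")
    }
  }
}
```

The absolute epoch values depend on the JVM's default time zone, because {{Timestamp.valueOf}} interprets the string as local time. The number of dropped microseconds does not.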
Reproducible Code:
For more details, refer to the following Hudi issue.
{{[https://github.com//issues/12339]}}
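One way to check which unit a written file actually uses is to print the schema stored in its Parquet footer. The sketch below assumes the Hadoop and Parquet libraries that ship with Spark are on the classpath. The object name {{ParquetSchemaCheck}} and the file path argument are hypothetical. The timestamp column's annotation in the printed schema shows whether it was stored in milliseconds or microseconds; the exact text of that annotation varies with the Parquet version.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

object ParquetSchemaCheck {
  // Returns the Parquet message schema from the footer of a single file.
  def footerSchema(file: String): String = {
    val input = HadoopInputFile.fromPath(new Path(file), new Configuration())
    val reader = ParquetFileReader.open(input)
    try reader.getFooter.getFileMetaData.getSchema.toString
    finally reader.close()
  }

  def main(args: Array[String]): Unit = {
    // args(0): path to one Parquet base file under the table's base path
    // (hypothetical input, supplied by the caller).
    println(footerSchema(args(0)))
  }
}
```

Running this against a base file from each write operation makes it easy to compare how the timestamp column was stored.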
JIRA info
Comments
05/Feb/25 05:58;ktblsva;Looks like it works for BULK_INSERT mode:
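{code:java}
import org.apache.hudi.DataSourceWriteOptions
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType, TimestampType}
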
val name = this.getClass.getSimpleName.replace("$", "")
val sparkConf = new SparkConf().setAppName(name).setIfMissing("spark.master", "local[2]")
val spark = SparkSession.builder.appName(name).config(sparkConf)
.config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
.config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
.config("spark.sql.hive.convertMetastoreParquet", "false")
.getOrCreate()
val tableName = name
val basePath = s"file:///tmp/warehouse/$tableName"
val schema = StructType(Array(
StructField("field1", IntegerType, nullable = false),
StructField("field2", StringType, nullable = true),
StructField("field3", TimestampType, nullable = false)
))
val data = Seq(
Row(1, "A", java.sql.Timestamp.valueOf("2023-10-01 10:00:00.540040")),
Row(2, "B", java.sql.Timestamp.valueOf("2023-10-01 11:30:00.240030")),
Row(3, "C", java.sql.Timestamp.valueOf("2023-10-01 12:45:00.140022"))
)
val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
// Hudi write options
val hudiOptions = Map(
"hoodie.table.name" -> tableName,
"hoodie.datasource.write.recordkey.field" -> "field1",
"hoodie.datasource.write.precombine.field" -> "field2",
"hoodie.parquet.outputtimestamptype" -> "TIMESTAMP_MILLIS",
DataSourceWriteOptions.OPERATION.key -> DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL,
//"hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled" -> "true"
)
// Write the DataFrame to Hudi
df.write.format("hudi").options(hudiOptions).mode("overwrite").save(basePath)
spark.stop()
{code};;;
13/Feb/25 15:11;rangareddy.avula@gmail.com;Hi [~ktblsva]
We need to make sure it works for every write operation, not only BULK_INSERT.;;;