[SUPPORT] Hudi Spark DataSource saves TimestampType as bigInt #2509

Open
zuyanton opened this issue Feb 1, 2021 · 25 comments
Labels: pre-0.10.0, priority:critical (production down; pipelines stalled; need help asap), schema-and-data-types, spark (issues related to spark), spark-sql

Comments
@zuyanton commented Feb 1, 2021

Describe the problem you faced

It looks like org.apache.spark.sql.types.TimestampType gets converted to bigint when saved to a Hudi table.

To Reproduce

create dataframe with TimestampType

var seq = Seq((1, "2020-01-01 11:22:30", 2, 2))
var df = seq.toDF("pk", "time_string", "partition", "sort_key")
df = df.withColumn("timestamp", col("time_string").cast(TimestampType))

preview dataframe

df.show
+---+-------------------+---------+--------+-------------------+
| pk|        time_string|partition|sort_key|          timestamp|
+---+-------------------+---------+--------+-------------------+
|  1|2020-01-01 11:22:30|        2|       2|2020-01-01 11:22:30|
+---+-------------------+---------+--------+-------------------+

save dataframe to hudi table

df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://location")

view hudi table

spark.sql("select * from testTable2").show

result: the timestamp column is bigint

+-------------------+--------------------+------------------+----------------------+--------------------+---+-------------------+--------+----------------+---------+
|_hoodie_commit_time|_hoodie_commit_seqno|_hoodie_record_key|_hoodie_partition_path|   _hoodie_file_name| pk|        time_string|sort_key|       timestamp|partition|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-------------------+--------+----------------+---------+
|     20210201004527|  20210201004527_0_1|              pk:1|                     2|2972ef96-279b-438...|  1|2020-01-01 11:22:30|       2|1577877750000000|        2|
+-------------------+--------------------+------------------+----------------------+--------------------+---+-------------------+--------+----------------+---------+

view schema

spark.sql("describe testTable2").show

result

+--------------------+---------+-------+
|            col_name|data_type|comment|
+--------------------+---------+-------+
| _hoodie_commit_time|   string|   null|
|_hoodie_commit_seqno|   string|   null|
|  _hoodie_record_key|   string|   null|
|_hoodie_partition...|   string|   null|
|   _hoodie_file_name|   string|   null|
|                  pk|      int|   null|
|         time_string|   string|   null|
|            sort_key|      int|   null|
|           timestamp|   bigint|   null|
|           partition|      int|   null|
|# Partition Infor...|         |       |
|          # col_name|data_type|comment|
|           partition|      int|   null|
+--------------------+---------+-------+

Environment Description

  • Hudi version : 0.7.0

  • Spark version :

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) :S3

  • Running on Docker? (yes/no) : no

Additional context

full code snippet

// Imports used in the spark-shell session
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.DataSourceWriteOptions
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.config.HoodieWriteConfig
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.hudi.common.model.DefaultHoodieRecordPayload
import org.apache.hadoop.hive.conf.HiveConf

// Derive the HiveServer2 host from the metastore URI for Hive sync
val hiveConf = new HiveConf()
val hiveMetastoreURI = hiveConf.get("hive.metastore.uris").replaceAll("thrift://", "")
val hiveServer2URI = hiveMetastoreURI.substring(0, hiveMetastoreURI.lastIndexOf(":"))

// Hudi write options (Hudi 0.7.0 option keys)
var hudiOptions = Map[String, String](
  HoodieWriteConfig.TABLE_NAME -> "testTable2",
  "hoodie.consistency.check.enabled" -> "true",
  DataSourceWriteOptions.STORAGE_TYPE_OPT_KEY -> "COPY_ON_WRITE",
  DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY -> "pk",
  DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY -> classOf[ComplexKeyGenerator].getName,
  DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY -> "partition",
  DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY -> "sort_key",
  DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY -> "true",
  DataSourceWriteOptions.HIVE_TABLE_OPT_KEY -> "testTable2",
  DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY -> "partition",
  DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY -> classOf[MultiPartKeysValueExtractor].getName,
  DataSourceWriteOptions.HIVE_URL_OPT_KEY -> s"jdbc:hive2://$hiveServer2URI:10000",
  "hoodie.payload.ordering.field" -> "sort_key",
  DataSourceWriteOptions.PAYLOAD_CLASS_OPT_KEY -> classOf[DefaultHoodieRecordPayload].getName
)

// Build a one-row DataFrame with a TimestampType column and write it to the Hudi table
//spark.sql("drop table if exists testTable1")
var seq = Seq((1, "2020-01-01 11:22:30", 2, 2))
var df = seq.toDF("pk", "time_string", "partition", "sort_key")
df = df.withColumn("timestamp", col("time_string").cast(TimestampType))
df.show
df.write.format("org.apache.hudi").options(hudiOptions).mode(SaveMode.Append).save("s3://location")
spark.sql("select * from testTable2").show
@n3nash (Contributor) commented Feb 2, 2021

@satishkotha Could you take a look at this one ?

@satishkotha (Member) commented Feb 2, 2021

Hi

If you set the support_timestamp property mentioned here, Hudi will convert the field to a timestamp type in Hive.
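For reference, a minimal sketch of enabling that property on the write path (the option key is the one quoted later in this thread; the table path and options map are the placeholders from the example above):

    // Sketch: add the hive-sync timestamp flag to the existing hudiOptions map.
    // Verify first that your Hive/Presto/Athena versions can read INT64 TIMESTAMP_MICROS.
    val hudiOptionsWithTs = hudiOptions + ("hoodie.datasource.hive_sync.support_timestamp" -> "true")

    df.write.format("org.apache.hudi")
      .options(hudiOptionsWithTs)
      .mode(SaveMode.Append)
      .save("s3://location")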

Note that you need to verify the compatibility of this with the Hive/Presto/Athena versions you are using. We made some changes to interpret the field correctly as a timestamp. Refer to this change in Presto for example. We made similar changes in our internal Hive deployment.

Some more background: Hudi uses the parquet-avro module, which converts timestamps to INT64 with the logical type TIMESTAMP_MICROS. Hive and other query engines expect timestamps in INT96 format, but INT96 is no longer supported. The recommended path forward is to deprecate INT96 and change query engines to work with the INT64 type; https://issues.apache.org/jira/browse/PARQUET-1883 has additional details.
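To illustrate the TIMESTAMP_MICROS point: the bigint in the original report (1577877750000000) is microseconds since the epoch, so it can be reinterpreted in the Spark shell as a quick sanity check (just a sketch against the repro table, not a fix):

    // Sketch: reinterpret the INT64 micros value as a timestamp for display.
    // Dividing by 1,000,000 yields epoch seconds, which Spark's cast to TimestampType expects.
    spark.sql("select * from testTable2")
      .withColumn("timestamp_readable", (col("timestamp") / 1000000L).cast(TimestampType))
      .select("pk", "timestamp", "timestamp_readable")
      .show(false)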

@rubenssoto commented:

Great to know. I will test this feature in Athena and Redshift Spectrum; if someone has already run this test, please let me know.

@zuyanton (Author) commented Feb 3, 2021

@satishkotha I added that parameter to my example. Now, after writing data to S3, when I run spark.sql("describe testTable3").show I get

+--------------------+---------+-------+  
|            col_name|data_type|comment|
+--------------------+---------+-------+
....
|           timestamp|timestamp|   null|

which is good; however, when I run spark.sql("select * from testTable3").show I get an exception:
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable

@satishkotha (Member) commented Feb 3, 2021

@zuyanton Yes, as I mentioned earlier, some changes are needed in the query engines. Refer to this change in Presto for example. See this ticket for how this is fixed upstream in Hive. You likely need to port this change to your Hive deployment to make this work (or you could upgrade your Hive version to 4).

@rubenssoto commented:

@satishkotha Could you help me explain to AWS support which fixes should be applied to Athena?

@umehrot2 Do you know if anything should be changed on EMR?

Thank you.

@satishkotha (Member) commented:

@rubenssoto AFAIK, Athena is built on top of Presto, so you could ask them to apply the Presto change above. You can say it is needed to interpret Parquet INT64 timestamps correctly.

vinothchandar added the schema-and-data-types and priority:major (degraded perf; unable to move forward; potential bugs) labels on Feb 6, 2021
@vinothchandar (Member) commented:

Thanks for jumping in @satishkotha

@vinothchandar (Member) commented:

Going back to @zuyanton's point, that is still from Spark. Are you suggesting that Spark's Hive version also needs to pick up the change? (That sounds painful.)

@rubenssoto commented:

Hi @vinothchandar @satishkotha @zuyanton,
I ran some tests with Redshift Spectrum and Athena. Redshift Spectrum worked very well, but Athena did not; I am attaching a screenshot.
(screenshot: Captura de Tela 2021-02-08 às 11 02 07)

Is there any workaround? I opened an AWS ticket, but it will probably take a while because of the difference in Presto versions.

I have some tables in regular Parquet with timestamp fields, and they work. What is the difference compared to Hudi?

thank you

@rubenssoto commented:

@vinothchandar @satishkotha @zuyanton

I think the only workaround here is to convert the timestamp column to a string. Do you have better ideas?
My timestamp column is not in microseconds, but my Hudi Avro timestamp is; does that make sense?

Thank you.
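For completeness, a minimal sketch of that string workaround on the write side (column names follow the earlier example; the date pattern is an assumption, adjust it to the precision you need):

    // Sketch: write the timestamp as a formatted string so every engine reads it the same way.
    val dfStringTs = df.withColumn("timestamp", date_format(col("timestamp"), "yyyy-MM-dd HH:mm:ss.SSS"))

    dfStringTs.write.format("org.apache.hudi")
      .options(hudiOptions)
      .mode(SaveMode.Append)
      .save("s3://location")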

@nsivabalan (Contributor) commented:

@rubenssoto : Here is a link to suggestions from Athena support on timestamp conversion.
#2123 (comment)

@rubenssoto commented:

@nsivabalan It worked, but I don't think a view is a good solution, because it creates a maintenance problem.

It is not Hudi's fault, so we need to wait for Athena, but I don't think it will be solved soon...

On the Hudi side, is there anything we can do? My timestamp is not in microseconds; it is in milliseconds.

@satishkotha (Member) commented:

I think if you query using the Spark datasource APIs, queries will be able to read the timestamp field correctly. When querying through Athena, I don't think there is another workaround, unfortunately.
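A minimal read sketch along those lines, using the path from the earlier write (on older Hudi releases a trailing glob such as /* may be needed for partitioned tables):

    // Sketch: read back through the Hudi Spark datasource; the schema keeps TimestampType.
    val readBack = spark.read.format("org.apache.hudi").load("s3://location")
    readBack.printSchema()   // the timestamp column should show as timestamp, not bigint
    readBack.select("pk", "timestamp").show(false)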

@rubenssoto commented Apr 9, 2021

Hello Guys,

@satishkotha @nsivabalan

Athena's behavior has changed.

(screenshots: Captura de Tela 2021-04-08 às 22 55 24, Captura de Tela 2021-04-08 às 22 58 09)

This is great news, but the BETWEEN operator doesn't work.

For example, this query works:
select count(1) FROM "order" WHERE created_date >= cast('2021-04-07 03:00:00.000' as timestamp)

and this query doesn't work:
select count(1) FROM "order" WHERE created_date between cast('2021-04-09 14:00:00.000' as timestamp) and cast('2021-04-09 15:00:00.000' as timestamp)
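If it helps while the Athena fix lands, BETWEEN is equivalent to a pair of inclusive comparisons, so the failing query could be rewritten as (untested here, same predicate semantics):
select count(1) FROM "order" WHERE created_date >= cast('2021-04-09 14:00:00.000' as timestamp) and created_date <= cast('2021-04-09 15:00:00.000' as timestamp)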

@nsivabalan (Contributor) commented:

@rubenssoto: Just in case you haven't seen it, #2544 talks about timestamps and Hive.

n3nash added this to Triaged in GI Tracker Board on Apr 22, 2021
vinothchandar changed the title from "[SUPPORT] Hudi saves TimestampType as bigInt" to "[SUPPORT] Hudi Spark DataSource saves TimestampType as bigInt" on Jun 5, 2021
@vinothchandar (Member) commented:

Just reading through this again. We definitely need to understand whether this is an issue even when using Spark as the only engine (i.e., no registration to HMS), and whether parquet-avro is the problem child.
Running this with the row writer enabled is a good way to quickly weed that out.
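A quick sketch of that experiment against the reproduction above (the config keys are standard Hudi datasource options; double-check the names against the Hudi version in use):

    // Sketch: bulk_insert with the row writer enabled bypasses the avro conversion path,
    // which helps isolate whether parquet-avro is responsible for the bigint schema.
    df.write.format("org.apache.hudi")
      .options(hudiOptions)
      .option("hoodie.datasource.write.operation", "bulk_insert")
      .option("hoodie.datasource.write.row.writer.enable", "true")
      .mode(SaveMode.Append)
      .save("s3://location")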

n3nash moved this from Triaged to Repro Needed in GI Tracker Board on Jun 9, 2021
nsivabalan added the priority:critical (production down; pipelines stalled; need help asap) label and removed the priority:major (degraded perf; unable to move forward; potential bugs) label on Aug 31, 2021
nsivabalan self-assigned this on Oct 19, 2021
@Gatsby-Lee commented:

AWS Glue 3

  • Spark: 3.1.1-amzn-0
  • Hive: 2.3.7-amzn-4
  • Hudi: 0.9

I had this issue.
Although Spark shows a timestamp type, the type I saw through AWS Athena was bigint.

I was able to handle this issue by setting this value when I insert data:
"hoodie.datasource.hive_sync.support_timestamp": "true"

But I am not sure if there is any downside to setting this value to true.

nsivabalan added the query-engine (trino, presto, athena, impala, etc.) label on Dec 12, 2021
@nsivabalan (Contributor) commented:

@Gatsby-Lee : hoodie.datasource.hive_sync.support_timestamp is the right way to go.

@rubenssoto: Is everything resolved on your end, or are you still having issues? Let us know; if things are resolved, feel free to close out the issue.

@Gatsby-Lee commented:

@nsivabalan

Thank you for your comment.
I am using "hoodie.datasource.hive_sync.support_timestamp".

BTW, AWS Athena fails to read the MoR real-time table (the read-optimized table is OK).
I found some articles that say this is related to the query engine (in this case, the managed Presto),
so I created a support ticket with AWS.

Is there any input you want me to provide to the AWS Athena team?

nsivabalan moved this from Repro Needed to Triaged, waiting for user to ack in GI Tracker Board on Dec 21, 2021
xushiyan moved this from Triaged, waiting for user to ack to Triaged in GI Tracker Board on Dec 28, 2021
@nsivabalan (Contributor) commented:

@umehrot2 @zhedoubushishi: Do you folks have any pointers on this?
@Gatsby-Lee: I believe Athena added support for real-time queries in one of its latest versions. Did you try using the latest Athena?

@Gatsby-Lee commented Jan 10, 2022

@nsivabalan
After I got your message, I queried the RT table again; it still fails.
I heard from AWS that the fix will be shipped soon.

@arpanrkl7 commented:

When I try to read using Spark SQL, I get the error below, which is the same one mentioned by @zuyanton:
java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hive.serde2.io.TimestampWritable

xushiyan added the spark-sql and spark (issues related to spark) labels and removed the schema-and-data-types and triaged labels on Jan 31, 2022
xushiyan moved this from Triaged to Awaiting Triage in GI Tracker Board on Jan 31, 2022
xushiyan added the schema-and-data-types label and removed the query-engine (trino, presto, athena, impala, etc.) label on May 18, 2022
@codope (Member) commented Jun 17, 2022

I think this is related to https://issues.apache.org/jira/browse/HUDI-83, and we have a patch. Can you please try it out with #3391?

codope moved this from Awaiting Triage to Awaiting Ack Triaged in GI Tracker Board on Jun 17, 2022
nsivabalan assigned bvaradar and unassigned nsivabalan on Sep 4, 2022
@codope
Copy link
Member

codope commented Sep 7, 2022

@zuyanton Did you get a chance to try out the suggested patch?
