
[BUG] Data corrupted in the timestamp field to 1970-01-01 19:45:30.000 after subsequent upsert run #4552

Closed
denys-tyshetskyy opened this issue Jan 10, 2022 · 14 comments
Labels: priority:critical (production down; pipelines stalled; Need help asap), writer-core (Issues relating to core transactions/write actions)

Comments

@denys-tyshetskyy

denys-tyshetskyy commented Jan 10, 2022

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at dev-subscribe@hudi.apache.org.

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

I am using Hudi as part of Glue jobs to manage data mutations in our data lake. The only modification applied during the ETL from raw to bronze (where Hudi is involved) is the introduction of a timestamp field derived from the epoch-seconds field in raw (i.e. 1602297930 -> 2020-10-10 02:45:30.000). I have noticed that for some entities, after subsequent ingestions, the timestamp field value becomes corrupted (even for records that are not present in the update). I managed to isolate bulk_insert as the source of the issue: the timestamp field gets reset to 1970-01-01 19:45:30.000. Just to note, when insert is used in place of bulk_insert for the initial ingest, the issue does not occur.

To Reproduce

Steps to reproduce the behavior:
Unfortunately it is pretty tricky to reproduce; it only happens for 2 of the 16 entity types we are using.
We are moving Stripe data into the data lake, and the entities that get the issue are invoices (https://stripe.com/docs/api/invoices) and customers (https://stripe.com/docs/api/customers).

  1. Create an AWS Glue ETL job with the following Hudi config:

    hudi_options = {
        "hoodie.table.create.schema": "default",
        "hoodie.table.name": tableName,
        "hoodie.datasource.write.recordkey.field": "id",
        "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.ComplexKeyGenerator",
        "hoodie.datasource.write.partitionpath.field": "field_1, field_2",
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.partition_fields": "field_1, field_2",
        "hoodie.cleaner.policy": "KEEP_LATEST_COMMITS",
        "hoodie.cleaner.commits.retained": 10,
        "hoodie.datasource.write.table.name": tableName,
        "hoodie.datasource.write.operation": "bulk_insert",
        "hoodie.parquet.compression.codec": "snappy",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.support_timestamp": "true",
        "hoodie.datasource.write.precombine.field": "lastupdate",
        "hoodie.datasource.hive_sync.database": f"{database_name}",
        "hoodie.datasource.hive_sync.table": tableName,
    }

    The transformation of the raw data looks as follows:

    sparkDF = dfc.select(list(dfc.keys())[0]).toDF()
    transformed = sparkDF.select("*", from_unixtime(col("created")).cast("timestamp").alias("created_ts"))
    resolvechoice4 = DynamicFrame.fromDF(transformed, glueContext, "transformedDF")
    return DynamicFrameCollection({"CustomTransform0": resolvechoice4}, glueContext)

  2. Write data to data lake
    sparkDF.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)

  3. Update the previously mentioned Hudi config so the write operation becomes upsert, and change the Spark save mode to append.

  4. Trigger Glue job again with updates

Expected behavior

I expect the data to remain consistent and the fields to keep their correct values.

Environment Description

  • Hudi version : 0.9.0

  • Spark version : 3.1

  • Hive version : Using Glue data catalogue as hive metastore

  • Hadoop version : N/A

  • Glue version: 3

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) :


Stacktrace

There is no error

@nsivabalan
Contributor

Yeah, we recently fixed the issue here. Can you try out the patch and let us know if it resolves the issue?

@nsivabalan nsivabalan added this to Awaiting Triage in GI Tracker Board via automation Jan 10, 2022
@nsivabalan nsivabalan moved this from Awaiting Triage to Triaged in GI Tracker Board Jan 10, 2022
@xushiyan xushiyan added priority:critical production down; pipelines stalled; Need help asap. writer-core Issues relating to core transactions/write actions labels Jan 18, 2022
@nsivabalan
Contributor

Closing this one out since we know the root cause and have a solution. Feel free to re-open if you have more questions; we'd be happy to help.

GI Tracker Board automation moved this from Triaged to Done Jan 18, 2022
@rafcis02

rafcis02 commented Jan 21, 2022

I tried the patch from #4203 but it still does not work for me.
The only difference is that I upsert data using a SQL MERGE INTO statement.
I run my job in AWS Glue: the first job execution dumps data to S3 and creates the table in the Glue Data Catalog (BULK_INSERT). Then I add the property hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled=true to the Glue table (tried both serde and table properties) so the SQL command can read it. But when I run the upsert job, it still corrupts all the timestamps within the files that are being updated.
@nsivabalan Could you help me with it, please?
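For reference, a minimal sketch of setting such a table property via Spark SQL (the database and table names here are placeholders; ALTER TABLE ... SET TBLPROPERTIES is standard Spark SQL):

# Placeholder database/table names; run from a SparkSession with the Hudi extension enabled.
spark.sql("""
    ALTER TABLE test_database.hudi_test
    SET TBLPROPERTIES (
        'hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled' = 'true'
    )
""")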

@nsivabalan
Contributor

Did you set the property when you did BULK_INSERT? I guess that's where the config is effective, or rather where the fix was made, if I am not wrong. Basically, can you ensure you set the config value in all operations (bulk_insert and upsert) and see what happens?
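A minimal PySpark sketch of what that would look like (the config key is the one discussed in this thread; the DataFrames, path and remaining options are placeholders):

# Placeholder names; the point is passing the same flag to both the bulk_insert and the upsert.
common = {
    "hoodie.table.name": "hudi_test",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.keygenerator.consistent.logical.timestamp.enabled": "true",
}

# Initial load
bulk_opts = {**common, "hoodie.datasource.write.operation": "bulk_insert"}
df_initial.write.format("hudi").options(**bulk_opts).mode("overwrite").save(base_path)

# Subsequent load
upsert_opts = {**common, "hoodie.datasource.write.operation": "upsert"}
df_updates.write.format("hudi").options(**upsert_opts).mode("append").save(base_path)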

@rafcis02

I've tried it for BULK_INSERT and UPSERT as well, but nothing works for me.

I prepared a sample test job so you can reproduce it or just review it (I hope I just misconfigured it 😀).
The upsert operation corrupts timestamps no matter whether I set this option or not; I tested it using both the DataFrame writer and SQL MERGE INTO.

AWS Glue 2.0 (Spark 2.4)
Hudi 0.10.1

import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig.TBL_NAME
import org.apache.hudi.hive.MultiPartKeysValueExtractor
import org.apache.hudi.hive.ddl.HiveSyncMode
import org.apache.hudi.hive.util.ConfigUtils
import org.apache.hudi.keygen.ComplexKeyGenerator
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions.{current_timestamp, expr, lit}
import org.apache.spark.sql.hudi.HoodieOptionConfig.{SQL_KEY_PRECOMBINE_FIELD, SQL_KEY_TABLE_PRIMARY_KEY, SQL_KEY_TABLE_TYPE}
import org.apache.spark.sql.hudi.{HoodieOptionConfig, HoodieSparkSessionExtension}
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.collection.JavaConverters.mapAsJavaMapConverter

case class HudiTableOptions(tableName: String,
                            databaseName: String,
                            operationType: String,
                            partitionColumns: List[String],
                            recordKeyColumns: List[String],
                            preCombineColumn: String) {

  def hudiTableOptions = Map(
    TABLE_TYPE.key() -> COW_TABLE_TYPE_OPT_VAL,
    OPERATION.key() -> operationType,
    TBL_NAME.key() -> tableName,
    RECORDKEY_FIELD.key() -> recordKeyColumns.mkString(","),
    PARTITIONPATH_FIELD.key() -> partitionColumns.mkString(","),
    KEYGENERATOR_CLASS_NAME.key() -> classOf[ComplexKeyGenerator].getName,
    PRECOMBINE_FIELD.key() -> preCombineColumn,
    URL_ENCODE_PARTITIONING.key() -> false.toString,
    KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key() -> true.toString
  )

  def hiveTableProperties = Map(
    SQL_KEY_TABLE_TYPE.sqlKeyName -> HoodieOptionConfig.SQL_VALUE_TABLE_TYPE_COW,
    SQL_KEY_TABLE_PRIMARY_KEY.sqlKeyName -> hudiTableOptions(RECORDKEY_FIELD.key()),
    SQL_KEY_PRECOMBINE_FIELD.sqlKeyName -> hudiTableOptions(PRECOMBINE_FIELD.key())
  )

  def hiveTableOptions = Map(
    HIVE_SYNC_MODE.key() -> HiveSyncMode.HMS.name(),
    HIVE_SYNC_ENABLED.key() -> true.toString,
    HIVE_DATABASE.key() -> databaseName,
    HIVE_TABLE.key() -> hudiTableOptions(TBL_NAME.key()),
    HIVE_PARTITION_FIELDS.key() -> hudiTableOptions(PARTITIONPATH_FIELD.key()),
    HIVE_PARTITION_EXTRACTOR_CLASS.key() -> classOf[MultiPartKeysValueExtractor].getName,
    HIVE_STYLE_PARTITIONING.key() -> true.toString,
    HIVE_SUPPORT_TIMESTAMP_TYPE.key() -> true.toString,
    HIVE_TABLE_SERDE_PROPERTIES.key() -> ConfigUtils.configToString(hiveTableProperties.asJava)
  )

  def writerOptions: Map[String, String] = hudiTableOptions ++ hiveTableOptions
}

object GlueApp {
  def main(args: Array[String]): Unit = {
    Logger.getRootLogger.setLevel(Level.WARN)
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    Logger.getLogger("org.apache.hudi").setLevel(Level.INFO)

    val spark: SparkSession = SparkSession.builder()
      .appName("Test")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("hive.metastore.glue.catalogid", "****")
      .withExtensions(new HoodieSparkSessionExtension())
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    val tableName = "hudi_test"
    val databaseName = "test_database"
    val path = "s3://s3bucketname/hudi-test/"
    val tableOptions = HudiTableOptions(tableName, databaseName, BULK_INSERT_OPERATION_OPT_VAL, List("year"), List("id"), "ts")

    val dataFrameForBulkInsert = Range(0, 10000).toDF("id")
      .withColumn("year", lit(2022))
      .withColumn("ts", current_timestamp() - expr("INTERVAL 100 DAYS"))
      .withColumn("other", current_timestamp() - expr("INTERVAL 150 DAYS"))

    val dataFrameForUpsert = Range(5000, 15000).toDF("id")
      .withColumn("year", lit(2022))
      .withColumn("ts", current_timestamp() - expr("INTERVAL 10 DAYS"))
      .withColumn("other", current_timestamp() - expr("INTERVAL 50 DAYS"))

    // ------------------------ BULK INSERT ------------------------------------------
    dataFrameForBulkInsert.write
      .format("org.apache.hudi")
      .options(tableOptions.writerOptions)
      .mode(SaveMode.Overwrite)
      .save(path)
    // -----------------------------------------------------------------------------

    Thread.sleep(10 * 1000)

    // --------------------- UPSERT Spark DataFrame Writer --------------------------
//    dataFrameForUpsert
//      .write
//      .format("org.apache.hudi")
//      .options(tableOptions.copy(operationType = UPSERT_OPERATION_OPT_VAL).writerOptions)
//      .mode(SaveMode.Append)
//      .save(path)
    // ------------------------------------------------------------------------------

    // --------------------- UPSERT SQL MERGE INTO-----------------------------------
    val mergeIntoStatement =
      s"""MERGE INTO $databaseName.$tableName AS t
         | USING source_data_set s
         | ON t.id=s.id
         | WHEN MATCHED THEN UPDATE SET *
         | WHEN NOT MATCHED THEN INSERT *
         |""".stripMargin
    dataFrameForUpsert.createTempView("source_data_set")
    spark.sql(mergeIntoStatement)
    // -------------------------------------------------------------------------------
  }
}

@nsivabalan
Contributor

nsivabalan commented Jan 31, 2022

@YannByron: Looks like the timestamp consistency fix we landed (you and Sagar, I guess) might have some gaps. Do you think you can follow up here, please?
Essentially, a bulk_insert followed by an upsert for the same set of record keys, with a timestamp as one of the columns, has some issues.

@nsivabalan nsivabalan reopened this Jan 31, 2022
GI Tracker Board automation moved this from Done to User Action Jan 31, 2022
@YannByron
Contributor

@denys-tyshetskyy @rafcis02
This issue, I think, is a bug involving bulk_insert and hoodie.datasource.write.row.writer.enable.
For now, you can disable hoodie.datasource.write.row.writer.enable when doing a bulk_insert; that will work well. We'll fix it asap.
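A minimal PySpark sketch of that workaround (the option key is as named above; the table name, DataFrame and path are placeholders):

# Placeholder table/path; the relevant line is the row-writer flag.
bulk_insert_options = {
    "hoodie.table.name": "hudi_test",
    "hoodie.datasource.write.operation": "bulk_insert",
    "hoodie.datasource.write.row.writer.enable": "false",  # workaround: disable the row writer for bulk_insert
}
df.write.format("hudi").options(**bulk_insert_options).mode("overwrite").save(base_path)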

@YannByron
Contributor

@denys-tyshetskyy @rafcis02
An update on this: you can set hoodie.parquet.outputtimestamptype to TIMESTAMP_MICROS when you bulk_insert data. It'll work nicely.
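Applied to the hudi_options from the original report, that would be a one-line addition (sketch only; the rest of the config stays as posted above):

# Write parquet timestamps as TIMESTAMP_MICROS during the initial bulk_insert.
hudi_options["hoodie.parquet.outputtimestamptype"] = "TIMESTAMP_MICROS"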

@rafcis02

rafcis02 commented Feb 7, 2022

Thanks a lot @YannByron, hoodie.parquet.outputtimestamptype works perfectly

@nsivabalan
Contributor

Thanks, Yann 👍
@rafcis02 @denys-tyshetskyy: if you are good, can we close the issue?

@denys-tyshetskyy
Author

Thanks guys, feel free to close

GI Tracker Board automation moved this from User Action to Done Feb 7, 2022
@roSimoes

roSimoes commented Apr 5, 2022

Hello @nsivabalan, I have the same problem.
The solution presented here solves the problem for the bulk_insert, but the upsert remains the same.

In my case I have a job that does a bulk_insert on the first load and then upserts data on subsequent loads; all these new loads have dates from 1970.

I'll leave my configs attached:
Initial load:

commonConfig = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'updated_date',
    'hoodie.datasource.write.recordkey.field': primaryKey,
    'hoodie.table.name': tableName,
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': database,
    'hoodie.datasource.hive_sync.table': tableName,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.support_timestamp': 'true',
    'path': targetPath
}

partitionDataConfig = {
    'hoodie.datasource.write.partitionpath.field': partitionKey,
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields': partitionKey
}

initLoadConfig = {
    'hoodie.bulkinsert.shuffle.parallelism': 68,
    'hoodie.datasource.write.operation': 'bulk_insert',
    'hoodie.datasource.write.payload.class': 'org.apache.hudi.common.model.EmptyHoodieRecordPayload',
    'hoodie.datasource.write.partitions.to.delete': targetPath,
    'hoodie.parquet.outputtimestamptype': 'TIMESTAMP_MICROS'
}

combinedConf = {**commonConfig, **partitionDataConfig, **initLoadConfig}

glueContext.write_dynamic_frame.from_options(frame=DataSource0, connection_type="marketplace.spark", connection_options=combinedConf)

Incremental load:

commonConfig = {
    'className': 'org.apache.hudi',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.write.precombine.field': 'updated_date',
    'hoodie.datasource.write.recordkey.field': primaryKey,
    'hoodie.table.name': tableName,
    'hoodie.consistency.check.enabled': 'true',
    'hoodie.datasource.hive_sync.database': database,
    'hoodie.datasource.hive_sync.table': tableName,
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.support_timestamp': 'true',
    'path': targetPath
}

partitionDataConfig = {
    'hoodie.datasource.write.partitionpath.field': partitionKey,
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.partition_fields': partitionKey
}

incrementalConfig = {
    'hoodie.upsert.shuffle.parallelism': 20,
    'hoodie.datasource.write.operation': 'upsert',
    'hoodie.cleaner.policy': 'KEEP_LATEST_COMMITS',
    'hoodie.parquet.outputtimestamptype': 'TIMESTAMP_MICROS'
}

combinedConf = {**commonConfig, **partitionDataConfig, **incrementalConfig}

DataSink0 = glueContext.write_dynamic_frame.from_options(frame=DynamicFrame.fromDF(DataSourceDF0, glueContext, "initDf"), connection_type="marketplace.spark", connection_options=combinedConf)

This problem is a critical issue. My partition key is partitionKey = 'created_date'; I upsert 66 records, and after the upsert I have more than 800 records with the wrong date, so it must have to do with the partitions.
Is there any way to force the upsert to use TIMESTAMP_MICROS?

Would it be possible for you to help me?
Thanks in advance.

@yuriy-os

yuriy-os commented Apr 6, 2022

I second that, @roSimoes; I have exactly the same use case: an initial bulk_insert of a big table and then upserting new loads into it.

Timestamps of upserted rows end up corrupted to 1970-01-01 as well. Have you found any solution?

@jasondavindev

The 0.11.0 release fixed it; see #5469.
