
[SUPPORT] Upsert overwriting ordering field with invalid value #5469

Closed
jasondavindev opened this issue Apr 29, 2022 · 5 comments
Assignees: yihua
Labels: priority:critical, spark, writer-core

Comments


jasondavindev commented Apr 29, 2022

Describe the problem you faced

I'm writing an application that upserts records into a table. The problem is that when an upsert operation runs, the ordering column of records that exist in the base table but are absent from the incoming data is overwritten with an invalid value.
E.g.
The base table has a record with id = 1 and createddate = 2022-04-01.
The incoming data has a record with id = 2 and createddate = 2022-04-02.

After the upsert operation, the createddate of the record with id = 1 is changed to 1970-xx-xx, while the record with id = 2 remains intact.

To Reproduce

from pyspark.sql.functions import expr
from pyspark.sql import DataFrame, SparkSession

database = 'db'
table = 'tb'
table_path = f'/{database}/{table}'

spark = SparkSession.builder.config(
    'spark.sql.shuffle.partitions', '4').enableHiveSupport().getOrCreate()

options = {
    # CustomKeyGenerator: 'field:simple' means partition on the 'field'
    # column with simple (non-timestamp) semantics.
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.CustomKeyGenerator',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'field:simple',
    # createddate is the ordering (precombine) field -- the column that
    # ends up corrupted after the upsert.
    'hoodie.datasource.write.precombine.field': 'createddate',
    'hoodie.payload.event.time.field': 'createddate',
    'hoodie.datasource.write.table.type': 'COPY_ON_WRITE',
    'hoodie.table.name': table,

    # Hive sync settings
    'hoodie.datasource.write.hive_style_partitioning': 'true',
    'hoodie.datasource.hive_sync.partition_extractor_class': 'org.apache.hudi.hive.MultiPartKeysValueExtractor',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.support_timestamp': 'true',
    'hoodie.datasource.hive_sync.database': database,
    'hoodie.datasource.hive_sync.table': table,
    'hoodie.datasource.hive_sync.partition_fields': 'field',
}

full = spark.read.parquet('/opt/spark/conf/full/')
delta = spark.read.json('/opt/spark/conf/delta')

# Truncate the raw string to second precision and cast to timestamp.
full_parse: DataFrame = full \
    .withColumn('createddate', expr('cast(substr(createddate, 1, 19) as timestamp)'))

delta_parse: DataFrame = delta \
    .withColumn('createddate', expr('cast(substr(createddate, 1, 19) as timestamp)'))

full_parse \
    .write \
    .format('org.apache.hudi') \
    .options(**options) \
    .option('hoodie.datasource.write.operation', 'bulk_insert') \
    .mode('overwrite') \
    .save(table_path)

delta_parse \
    .write \
    .format('org.apache.hudi') \
    .options(**options) \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .mode('append') \
    .save(table_path)
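
To check for corrupted rows after the upsert, the table can be read back and pre-1971 dates counted. A minimal sketch (equivalent to the spark-sql queries further below):

check = spark.read.format('org.apache.hudi').load(table_path)
# count is > 0 whenever the bug has corrupted the ordering field
print(check.filter("createddate < '1971-01-01'").count())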

Example full file content

+------------------+-------------------+-----------+-------------------+------------------+---------+----------------+--------+------------------+
|createdbyid       |createddate        |datatype   |field              |id                |isdeleted|newvalue        |oldvalue|parentid          |
+------------------+-------------------+-----------+-------------------+------------------+---------+----------------+--------+------------------+
|0055G00000808dFQAQ|2022-03-16 16:55:13|DynamicEnum|Status_do_Imovel__c|0175G0000jIvmN7QAJ|false    |Visita Cancelada|null    |a015G00000kpbM3QAI|
+------------------+-------------------+-----------+-------------------+------------------+---------+----------------+--------+------------------+

After the upsert operation

+------------------+-----------------------+-----------+-------------------+------------------+---------+----------------+----------------------+------------------+
|createdbyid       |createddate            |datatype   |field              |id                |isdeleted|newvalue        |oldvalue              |parentid          |
+------------------+-----------------------+-----------+-------------------+------------------+---------+----------------+----------------------+------------------+
|0055G00000808dFQAQ|1970-01-20 01:37:29.713|DynamicEnum|Status_do_Imovel__c|0175G0000jIvmN7QAJ|false    |Visita Cancelada|null                  |a015G00000kpbM3QAI|
+------------------+-----------------------+-----------+-------------------+------------------+---------+----------------+----------------------+------------------+

Note: a seemingly random number of records is affected by this bug; each execution affects a different number of records.

1st execution

spark-sql> select count(id) from db.tb where createddate < '1971-01-01';
97801

2nd execution

spark-sql> select count(id) from db.tb where createddate < '1971-01-01';
76356
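
A plausible explanation for the specific corrupted value, inferred from the numbers above rather than confirmed in this thread: 2022-03-16 16:55:13 UTC is 1647449713 in epoch seconds, and reading that same number as epoch milliseconds gives exactly 1970-01-20 01:37:29.713. So the ordering field appears to be rewritten with its value in the wrong time unit. A small sketch, assuming the timestamps shown are UTC:

from datetime import datetime, timezone

# Original value from the base table (assumed UTC).
orig = datetime(2022, 3, 16, 16, 55, 13, tzinfo=timezone.utc)
epoch_seconds = int(orig.timestamp())           # 1647449713

# The same number read as *milliseconds* since the epoch
# reproduces the corrupted value exactly.
corrupted = datetime.fromtimestamp(epoch_seconds / 1000, tz=timezone.utc)
print(corrupted)                                # 1970-01-20 01:37:29.713000+00:00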

Environment Description

  • Hudi version : 0.10.0

  • Spark version : 3.1.2

  • Storage (HDFS/S3/GCS..) : Local

  • Running on Docker? (yes/no) : Yes

yihua (Contributor) commented Apr 30, 2022

@jasondavindev Thanks for reporting this issue with detailed information; I'll try to reproduce it. Have you tried the latest master of Hudi to see if the problem still exists?

yihua added the priority:critical, spark, and writer-core labels Apr 30, 2022
yihua added this to Awaiting Triage in GI Tracker Board via automation Apr 30, 2022
yihua self-assigned this Apr 30, 2022
jasondavindev (Author) commented

I tried version 0.10.1 and the error still occurs. With version 0.11.0, released this weekend, no records were found with a wrong date.

yihua (Contributor) commented May 2, 2022

@jasondavindev Thanks for confirming. If this is solved by the Hudi 0.11.0 release and there is no other ask for this issue, feel free to close it.

jasondavindev (Author) commented

When the INSERT operation is used instead of BULK_INSERT, no records are affected. Thus the bug appears to be in BULK_INSERT.
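
For reference, the difference from the reproduction script above is just the write operation used for the initial load (a sketch; the options dict and everything else stay the same):

# Initial load with 'insert' instead of 'bulk_insert'; per the report,
# the subsequent upsert then corrupts no records.
full_parse \
    .write \
    .format('org.apache.hudi') \
    .options(**options) \
    .option('hoodie.datasource.write.operation', 'insert') \
    .mode('overwrite') \
    .save(table_path)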

yihua (Contributor) commented Jun 29, 2022

@jasondavindev to clarify, do you still see issues with BULK_INSERT in 0.11.0?
