
[BUG][Spark] Reliability issues when upserting data to ADLS using Delta in Synapse Analytics #3286

Open
WarSame opened this issue Jun 20, 2024 · 0 comments
Labels: bug (Something isn't working)

WarSame commented Jun 20, 2024

Bug

Which Delta project/connector is this regarding?

  • Spark
  • Standalone
  • Flink
  • Kernel
  • Other (fill in here)

Describe the problem

When upserting data from an Azure Synapse Analytics Spark session to ADLS Delta tables, we are seeing significant reliability issues in our job. In particular, during the Delta upsert, executors fail by running out of memory (OOM). Once enough executors fail, the job fails, bringing down the pipeline.

Our data size is not particularly large.

According to Azure support, our nodes are reaching maximum memory and CPU usage, not spilling to disk, then shutting down.

We are not sure whether this is due to an issue with the Delta upsert, the Synapse Analytics runtime, or something else. However, we have been able to isolate the issue to the write step to ADLS by first writing to a Synapse data catalog table and then reading that back in to materialize the data and break the lineage, roughly as sketched below.
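
For context, the isolation step looks roughly like the sketch below; the intermediate table name "staging_upsert_input" is a placeholder, not our actual object name.

# Rough sketch of how we break the lineage before the Delta write: materialize
# the transformed DataFrame into a Synapse data catalog table, then read it
# back so the downstream merge starts from a fresh scan.
df.write.mode("overwrite").saveAsTable("staging_upsert_input")
materialized_df = spark.table("staging_upsert_input")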

Steps to reproduce

Create a table in the Synapse data catalog with around 40 GB of data across ~70 columns. It must contain columns named "year", "month", and "file_date", plus two PK columns "pk_1" and "pk_2": "pk_1" is not unique and is clustered on a few values, whereas the combination of "pk_1" and "pk_2" is unique. A synthetic approximation is sketched below.
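
If it helps with reproduction, a table of roughly the right shape can be approximated with synthetic data along the lines below; the column names match the description above, while the row count, filler columns, and table name are made up.

from pyspark.sql import functions as F

# Synthetic stand-in for the source table: partition columns, a skewed pk_1,
# a unique (pk_1, pk_2) combination, and filler columns to reach ~70 in total.
base = spark.range(0, 100_000_000).withColumnRenamed("id", "pk_2")
df = (base
      .withColumn("pk_1", (F.col("pk_2") % 5).cast("string"))  # clustered on a few values
      .withColumn("year", F.lit(2024))
      .withColumn("month", F.lit(5))
      .withColumn("file_date", F.lit("2024-05-01")))
for i in range(65):
    df = df.withColumn(f"col_{i}", F.rand())
df.write.mode("overwrite").saveAsTable("source_table")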

Use the code below to overwrite into the Delta table:

def overwrite_table(df: DataFrame):
    # targetLakePath is the ADLS path of the target Delta table, defined elsewhere.
    print("INFO: Creating or reinitializing the Structured Delta table")
    df.write \
        .format("delta") \
        .option("overwriteSchema", "true") \
        .partitionBy("year", "month", "file_date") \
        .mode("overwrite") \
        .save(targetLakePath)

Load a similarly sized amount of data with the same structure into another data catalog table.
Use the code below to upsert the data into the same Delta table:

from delta.tables import DeltaTable

# hive_partition_cols, pkList, df and targetLakePath are defined earlier in the job.
mergeList = []
mergeList.extend(hive_partition_cols)
mergeList.extend(pkList)

mergeColumnsList = [f"tgt.{mergeColumn} = src.{mergeColumn}" for mergeColumn in mergeList]
mergeColumns = " and ".join(mergeColumnsList)

print(f"INFO: merge condition: {mergeColumns}")

dtStruct = DeltaTable.forPath(spark, targetLakePath)

colsdict = determine_cols_to_update(df)
dtStruct.alias("tgt") \
    .merge(
        df.alias("src"),
        mergeColumns
    ) \
    .whenMatchedUpdate(set=colsdict) \
    .whenNotMatchedInsertAll() \
    .execute()

where the function referenced is:

def determine_cols_to_update(df: DataFrame):
    # auditID is set earlier in the job; the audit columns are excluded from the update.
    colsdict = {}
    for dfcolumn in df.columns:
        if dfcolumn not in ['Create_Audit_ID', 'Create_Audit_Date', 'Update_Audit_ID']:
            colsdict[f'tgt.{dfcolumn}'] = f'src.{dfcolumn}'
        elif dfcolumn == 'Update_Audit_ID':
            colsdict[f'tgt.{dfcolumn}'] = f'{auditID}'
    print(f"Updating on the following cols: {colsdict}")
    return colsdict

Observed results

The execution runs fairly quickly, but executors appear to be failing. This can be inferred because tasks fail in groups whose size matches the number of vCores allocated to each node, and the tasks fail with the message:

ExecutorLostFailure (executor 21 exited caused by one of the running tasks) Reason: Container from a bad node: container_e01_1718824424890_0001_01_000002 on host: vm-0a304008. Exit status: 137. Diagnostics: [2024-06-19 20:52:33.201]Container killed on request. Exit code is 137

I have also recently seen a failure that may be a Delta issue:

Job aborted due to stage failure: Task 99 in stage 84.0 failed 10 times, most recent failure: Lost task 99.9 in stage 84.0 (TID 13309) (vm-cb733359 executor 23): org.apache.spark.sql.delta.DeltaFileNotFoundException: File path abfss://{ADLS}.dfs.core.windows.net/structured/{table_path}/_temporary/f2a946f1-f9c3-4e21-a78b-accb192568bf/bitvector-files/year=2024/month=05/file_date=2024-05-01/part-00450-430a3400-ff55-4193-bfbb-74048082d1ae.c000.snappy.parquet.bin

I have not deleted this folder location nor truncated the Delta log.
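
If it helps with triage, the recent operations on the target table can be checked with the standard Delta history API; a quick sketch:

from delta.tables import DeltaTable

# List recent operations on the target table to rule out VACUUM or manual
# deletes as the cause of the missing temporary file.
dt = DeltaTable.forPath(spark, targetLakePath)
dt.history(20) \
  .select("version", "timestamp", "operation", "operationParameters") \
  .show(truncate=False)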

Expected results

I would expect Spark to spill to disk if required, rather than fully exhausting the resources of the nodes. There is currently no spilling.

I would also not generally expect Spark to need to spill in this case, because the amount of data is small relative to the memory available to the nodes, but Spark does keep multiple copies of data, so some spilling may be needed.
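
For reference, these are the standard Spark memory settings we have been looking at; the snippet below only reads them back from the session, and the fallback string is a placeholder.

# Exit code 137 usually means the container (not only the JVM heap) hit its
# memory limit, so the overhead allocation is one of the values of interest.
for key in ["spark.executor.memory",
            "spark.executor.memoryOverhead",
            "spark.memory.fraction",
            "spark.memory.storageFraction"]:
    print(key, "=", spark.conf.get(key, "<not set>"))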

I would also expect Delta to be able to find the files in its temporary folder, which does not seem to be happening. This could be a consequence of the executor failures, but the two do not seem to occur at the same time.

Further details

Our incoming dataset is around 40 GB before being loaded into Spark. This data is currently loaded into ADLS as Parquet without proper partitioning, which could affect how it is read in. We read it using last-modified timestamps (sketched below), but cannot apply Hive partitioning to it. Copying the data and applying proper partitions did not seem to resolve the failures.
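
For illustration, a read based on modification time can be expressed with the Parquet file source's modifiedAfter option; this is a sketch rather than our exact code, and sourceLakePath and the cutoff timestamp are placeholders.

# Incremental read of the unpartitioned Parquet data by last-modified time.
incoming_df = (spark.read
               .format("parquet")
               .option("modifiedAfter", "2024-06-01T00:00:00")
               .load(sourceLakePath))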

We are upserting into about 72 GB of data. Our target data is partitioned into 23 partitions, but all are used in the upsert.

We have run into failures with various node sizes and counts. We are currently using 6 XXL Azure nodes (64 vCores, 400 GB memory each) and still seeing these failures. The node size or count does not seem to significantly affect the failures.

We have tried various optimization techniques, including salting and repartitioning. We have not tried z-ordering (sketched below), but can try that if it might address the root cause.
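
If z-ordering is worth trying, we would run something along these lines against the target table, using the OPTIMIZE command available in Delta 2.x (untested on our side so far):

# Compact the target table and z-order by the merge keys so the merge has to
# rewrite fewer files; targetLakePath is the same path used above.
spark.sql(f"OPTIMIZE delta.`{targetLakePath}` ZORDER BY (pk_1, pk_2)")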

We have also tried doing a MERGE delete followed by an append (sketched below), and we have seen node failure issues with that approach as well.
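
The delete-then-append variant is roughly the following (simplified; the real job builds the condition from the same mergeColumns as above):

# Remove matching target rows via MERGE, then append the full incoming batch.
dtStruct = DeltaTable.forPath(spark, targetLakePath)
dtStruct.alias("tgt") \
    .merge(df.alias("src"), mergeColumns) \
    .whenMatchedDelete() \
    .execute()

df.write.format("delta").mode("append").save(targetLakePath)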

Environment information

  • Delta Lake version: 2.4
  • Spark version: 3.4
  • Scala version: 2.12.17

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • Yes. I can contribute a fix for this bug independently.
  • Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • No. I cannot contribute a bug fix at this time.
WarSame added the bug (Something isn't working) label on Jun 20, 2024