Skip to content

[SUPPORT] Unable to Use DynamoDB Based Lock with Hudi PySpark Job Locally #11391

@soumilshah1995

Description

@soumilshah1995

Description:
I'm encountering an issue while attempting to use DynamoDB Based Lock with an Apache Hudi PySpark job running locally. The goal is to have the job access DynamoDB in a specified region for locking purposes.

Configuration Used:

'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
'hoodie.cleaner.policy.failed.writes': 'LAZY',
'hoodie.write.lock.provider': 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
'hoodie.write.lock.dynamodb.table': 'hudi-lock-table',  # DynamoDB table name for locking
'hoodie.write.lock.dynamodb.region': curr_region,  # DynamoDB region
'hoodie.write.lock.dynamodb.endpoint_url': f'dynamodb.{curr_region}.amazonaws.com',
'hoodie.write.lock.dynamodb.billing_mode': 'PAY_PER_REQUEST',

Error Logs

org.apache.hudi.exception.HoodieException: Unable to load class
    at org.apache.hudi.common.util.ReflectionUtils.getClass(ReflectionUtils.java:58)
    ...
Caused by: java.lang.ClassNotFoundException: org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider
    ...

Code

try:
    import os
    import sys
    import uuid
    import pyspark
    import datetime
    from pyspark.sql import SparkSession
    from pyspark import SparkConf, SparkContext
    from faker import Faker
    import datetime
    from datetime import datetime
    import random
    import pandas as pd
    from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType
    from pyspark.sql.functions import col

    from datetime import datetim

    print("Imports loaded ")

except Exception as e:
    print("error", e)

HUDI_VERSION = '0.14.0'
SPARK_VERSION = '3.4'

os.environ["JAVA_HOME"] = "/opt/homebrew/opt/openjdk@11"
SUBMIT_ARGS = f"--packages org.apache.hudi:hudi-spark{SPARK_VERSION}-bundle_2.12:{HUDI_VERSION} pyspark-shell"
os.environ["PYSPARK_SUBMIT_ARGS"] = SUBMIT_ARGS
os.environ['PYSPARK_PYTHON'] = sys.executable

spark = SparkSession.builder \
    .config('spark.serializer', 'org.apache.spark.serializer.KryoSerializer') \
    .config('spark.sql.extensions', 'org.apache.spark.sql.hudi.HoodieSparkSessionExtension') \
    .config('className', 'org.apache.hudi') \
    .config('spark.sql.hive.convertMetastoreParquet', 'false') \
    .getOrCreate()

schema = StructType([
    StructField("orderID", StringType(), True),
    StructField("productSKU", StringType(), True),
    StructField("customerID", StringType(), True),
    StructField("orderDate", StringType(), True),
    StructField("orderAmount", FloatType(), True),
    StructField("state", StringType(), True)
])

# Create the data array with the additional state value
data = [
    ("order1", "prod001", "cust001", "2024-01-15", 150.00, "CA"),
    ("order002", "prod002", "cust002", "2024-01-16", 200.00, "NY"),
    ("order003", "prod003", "cust003", "2024-01-17", 300.00, "TX"),
    ("order004", "prod004", "cust004", "2024-01-18", 250.00, "FL"),
    ("order005", "prod005", "cust005", "2024-01-19", 100.00, "WA"),
    ("order006", "prod006", "cust006", "2024-01-20", 350.00, "CA"),
    ("order007", "prod007", "cust007", "2024-01-21", 400.00, "NY")
]

# Create the DataFrame
df = spark.createDataFrame(data, schema)

# Show the DataFrame with the new "state" column
df.show()


def write_to_hudi(spark_df,
                  table_name,
                  db_name,
                  method='upsert',
                  table_type='COPY_ON_WRITE',
                  recordkey='',
                  precombine='',
                  partition_fields='',
                  index_type='BLOOM',
                  curr_region='us-east-1'
                  ):
    path = f"file:///Users/soumilshah/IdeaProjects/SparkProject/tem/database={db_name}/table_name={table_name}"

    hudi_options = {
        'hoodie.table.name': table_name,
        'hoodie.datasource.write.table.type': table_type,
        'hoodie.datasource.write.table.name': table_name,
        'hoodie.datasource.write.operation': method,
        'hoodie.datasource.write.recordkey.field': recordkey,
        'hoodie.datasource.write.precombine.field': precombine,
        "hoodie.datasource.write.partitionpath.field": partition_fields,

        'hoodie.write.concurrency.mode': 'optimistic_concurrency_control',
        'hoodie.cleaner.policy.failed.writes': 'LAZY',
        'hoodie.write.lock.provider': 'org.apache.hudi.aws.transaction.lock.DynamoDBBasedLockProvider',
        'hoodie.write.lock.dynamodb.table': 'hudi-lock-table',  # DynamoDB table name for locking
        'hoodie.write.lock.dynamodb.region': curr_region,  # DynamoDB region
        'hoodie.write.lock.dynamodb.endpoint_url': f'dynamodb.{curr_region}.amazonaws.com',
        'hoodie.write.lock.dynamodb.billing_mode': 'PAY_PER_REQUEST',

    }
    print(hudi_options)

    print("\n")
    print(path)
    print("\n")

    spark_df.write.format("hudi"). \
        options(**hudi_options). \
        mode("append"). \
        save(path)


write_to_hudi(
    spark_df=df,
    db_name="default",
    table_name="orders",
    recordkey="orderID",
    precombine="orderDate",
    partition_fields="state",
    index_type="RECORD_INDEX"
)

Metadata

Metadata

Assignees

No one assigned

    Labels

    area:concurrencyConcurrency control and multi-writerpriority:highSignificant impact; potential bugsstatus:triagedIssue has been reviewed and categorized

    Type

    No type

    Projects

    Status

    ✅ Done

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions