Skip to content

[SUPPORT] Hudi Bootstrap with METADATA_ONLY with Hive Sync Fails on EMR Serverlkess 6.10 #8565

@soumilshah1995

Description

@soumilshah1995

Step 1 Download the Sample FIles

Link : https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link

Step 2 : Aditya mentioned to use following Jar files mentioned on ticket #8412

image

Step 3: Submit Job


"""
Download the dataset
https://drive.google.com/drive/folders/1BwNEK649hErbsWcYLZhqCWnaXFX3mIsg?usp=share_link

"""
try:
    import json
    import uuid
    import os
    import boto3
    from dotenv import load_dotenv

    load_dotenv("../.env")
except Exception as e:
    pass

global AWS_ACCESS_KEY
global AWS_SECRET_KEY
global AWS_REGION_NAME

AWS_ACCESS_KEY = os.getenv("DEV_ACCESS_KEY")
AWS_SECRET_KEY = os.getenv("DEV_SECRET_KEY")
AWS_REGION_NAME = os.getenv("DEV_REGION")

client = boto3.client("emr-serverless",
                      aws_access_key_id=AWS_ACCESS_KEY,
                      aws_secret_access_key=AWS_SECRET_KEY,
                      region_name=AWS_REGION_NAME)


def lambda_handler_test_emr(event, context):
    # ------------------Hudi settings ---------------------------------------------
    glue_db = "hudidb"
    table_name = "tbl_invoices_bootstrap"
    op = "UPSERT"
    table_type = "COPY_ON_WRITE"

    record_key = 'invoiceid'
    precombine = "replicadmstimestamp"
    target_path = "s3://delta-streamer-demo-hudi/hudi/"
    raw_path = "s3://delta-streamer-demo-hudi/raw/"
    partition_feild = "destinationstate"
    MODE = 'METADATA_ONLY'  # FULL_RECORD  | METADATA_ONLY

    # ---------------------------------------------------------------------------------
    #                                       EMR
    # --------------------------------------------------------------------------------
    ApplicationId = os.getenv("ApplicationId")
    ExecutionTime = 600
    ExecutionArn = os.getenv("ExecutionArn")
    JobName = 'delta_streamer_bootstrap_{}'.format(table_name)
    jar_path = "s3://delta-streamer-demo-hudi/jar_test/hudi-utilities-bundle_2.12-0.13.0.jar"
    # --------------------------------------------------------------------------------


    spark_submit_parameters = ' --conf spark.serializer=org.apache.spark.serializer.KryoSerializer'
    spark_submit_parameters += ' --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer'

    arguments = [
        '--run-bootstrap',
        "--target-base-path", target_path,
        "--target-table", table_name,
        "--table-type", table_type,
        "--hoodie-conf", "hoodie.bootstrap.base.path={}".format(raw_path),
        "--hoodie-conf", "hoodie.datasource.write.recordkey.field={}".format(record_key),
        "--hoodie-conf", "hoodie.datasource.write.precombine.field={}".format(precombine),
        "--hoodie-conf", "hoodie.datasource.write.partitionpath.field={}".format(partition_feild),
        "--hoodie-conf", "hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator",
        "--hoodie-conf","hoodie.bootstrap.full.input.provider=org.apache.hudi.bootstrap.SparkParquetBootstrapDataProvider",
        "--hoodie-conf","hoodie.bootstrap.mode.selector=org.apache.hudi.client.bootstrap.selector.BootstrapRegexModeSelector",
        "--hoodie-conf", "hoodie.bootstrap.mode.selector.regex.mode={}".format(MODE),
        "--enable-sync",
        "--hoodie-conf", "hoodie.database.name={}".format(glue_db),
        "--hoodie-conf", "hoodie.datasource.hive_sync.enable=true",
        "--hoodie-conf", "hoodie.datasource.hive_sync.table={}".format(table_name),
        "--hoodie-conf", "hoodie.datasource.hive_sync.partition_fields={}".format(partition_feild),

    ]

    response = client.start_job_run(
        applicationId=ApplicationId,
        clientToken=uuid.uuid4().__str__(),
        executionRoleArn=ExecutionArn,
        jobDriver={
            'sparkSubmit': {
                'entryPoint': jar_path,
                'entryPointArguments': arguments,
                'sparkSubmitParameters': spark_submit_parameters
            },
        },
        executionTimeoutMinutes=ExecutionTime,
        name=JobName,
    )
    print("response", end="\n")
    print(response)


lambda_handler_test_emr(context=None, event=None)

Error Logs

image

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    Status

    ✅ Done

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions