EMR CLI run throws error #24

Open
soumilshah1995 opened this issue Aug 31, 2023 · 8 comments

Comments

@soumilshah1995

Hello, I am using macOS and running a Docker container.


emr run \
--entry-point entrypoint.py \
--application-id <ID> \
--job-role arn:aws:iam::043916019468:role/AmazonEMR-ExecutionRole-1693489747108 \
--s3-code-uri s3://jXXXv/emr_scripts/ \
--spark-submit-opts "--conf spark.jars=/usr/lib/hudi/hudi-spark-bundle.jar --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.hadoop.hive.metastore.client.factory.class=com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory"  \
--build \
--wait

Error

[emr-cli]: Packaging assets into dist/
[+] Building 23.2s (10/15)                                                                                                                                                                                docker:desktop-linux
 => [internal] load build definition from Dockerfile                                                                                                                                                                      0.0s
 => => transferring dockerfile: 2.76kB                                                                                                                                                                                    0.0s
 => [internal] load .dockerignore                                                                                                                                                                                         0.0s
 => => transferring context: 83B                                                                                                                                                                                          0.0s
 => [internal] load metadata for docker.io/library/amazonlinux:2                                                                                                                                                          0.6s
 => [auth] library/amazonlinux:pull token for registry-1.docker.io                                                                                                                                                        0.0s
 => [base 1/7] FROM docker.io/library/amazonlinux:2@sha256:e218c279c7954a94c6f4c8ab106c3ea389675d429feec903bc4c93fa66ed4fd0                                                                                               0.0s
 => [internal] load build context                                                                                                                                                                                         0.0s
 => => transferring context: 486B                                                                                                                                                                                         0.0s
 => CACHED [base 2/7] RUN yum install -y python3 tar gzip                                                                                                                                                                 0.0s
 => CACHED [base 3/7] RUN python3 -m venv /opt/venv                                                                                                                                                                       0.0s
 => CACHED [base 4/7] RUN python3 -m pip install --upgrade pip                                                                                                                                                            0.0s
 => ERROR [base 5/7] RUN curl -sSL https://install.python-poetry.org | python3 -                                                                                                                                         22.6s
------
 > [base 5/7] RUN curl -sSL https://install.python-poetry.org | python3 -:
22.53 Retrieving Poetry metadata
22.53 
22.53 # Welcome to Poetry!
22.53 
22.53 This will download and install the latest version of Poetry,
22.53 a dependency and package manager for Python.
22.53 
22.53 It will add the `poetry` command to Poetry's bin directory, located at:
22.53 
22.53 /root/.local/bin
22.53 
22.53 You can uninstall at any time by executing this script with the --uninstall option,
22.53 and these changes will be reverted.
22.53 
22.53 Installing Poetry (1.6.1)
22.53 Installing Poetry (1.6.1): Creating environment
22.53 Installing Poetry (1.6.1): Installing Poetry
22.53 Installing Poetry (1.6.1): An error occurred. Removing partial environment.
22.53 Poetry installation failed.
22.53 See /poetry-installer-error-ulqu13bo.log for error logs.
------
Dockerfile:32
--------------------
  30 |     
  31 |     RUN python3 -m pip install --upgrade pip
  32 | >>> RUN curl -sSL https://install.python-poetry.org | python3 -
  33 |     
  34 |     ENV PATH="$PATH:/root/.local/bin"
--------------------
ERROR: failed to solve: process "/bin/sh -c curl -sSL https://install.python-poetry.org | python3 -" did not complete successfully: exit code: 1
Traceback (most recent call last):
  File "/Users/soumilnitinshah/IdeaProjects/DemoProject/venv/bin/emr", line 8, in <module>
    sys.exit(cli())
             ^^^^^
  File "/Users/soumilnitinshah/IdeaProjects/DemoProject/venv/lib/python3.11/site-packages/click/core.py", line 829, in __call__
    return self.main(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/soumilnitinshah/IdeaProjects/DemoProject/venv/lib/python3.11/site-packages/click/core.py", line 782, in main
    rv = self.invoke(ctx)
         ^^^^^^^^^^^^^^^^
  File "/Users/soumilnitinshah/IdeaProjects/DemoProject/venv/lib/python3.11/site-packages/click/core.py", line 1259, in invoke
    return _process_result(sub_ctx.command.invoke(sub_ctx))
                           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/soumilnitinshah/IdeaProjects/DemoProject/venv/lib/python3.11/site-packages/click/core.py", line 1066, in invoke
    return ctx.invoke(self.callback, **ctx.params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/soumilnitinshah/IdeaProjects/DemoProject/venv/lib/python3.11/site-packages/click/core.py", line 610, in invoke
    return callback(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/soumilnitinshah/IdeaProjects/DemoProject/venv/lib/python3.11/site-packages/click/decorators.py", line 33, in new_func
    return f(get_current_context().obj, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/soumilnitinshah/IdeaProjects/DemoProject/venv/lib/python3.11/site-packages/emr_cli/emr_cli.py", line 238, in run
    p.build()
  File "/Users/soumilnitinshah/IdeaProjects/DemoProject/venv/lib/python3.11/site-packages/emr_cli/packaging/python_project.py", line 56, in build

@soumilshah1995
Author

We need to do this in the Dockerfile:


# FIX
RUN curl -sSL https://install.python-poetry.org | python3 - --version 1.1.11
# REMOVE THIS 
# RUN curl -sSL https://install.python-poetry.org | python3 -

@dacort
Contributor

dacort commented Aug 31, 2023

Thanks @soumilshah1995 for filing. Yea, I noticed in #23 that something changed with Poetry 1.6.x and it no longer installs on Python 3.7, even though 3.7 support wasn't (officially) deprecated until 1.6.1 I think.

That said, 1.5.x still works but I should fix this in the Dockerfile template.

@soumilshah1995
Author

There is one more issue.

I have been working on a project where I am trying to use boto3 and create an SQS client. It looks like, because the EMR CLI template uses Python 3.7 (which is deprecated), the SQS client throws an error and shows a warning in the logs. My suggestion would be to upgrade the Python version in the template as well.


/home/hadoop/environment/lib64/python3.7/site-packages/boto3/compat.py:82: PythonDeprecationWarning: Boto3 will no longer support Python 3.7 starting December 13, 2023. To continue receiving service updates, bug fixes, and security updates please upgrade to Python 3.8 or later. More information can be found here: https://aws.amazon.com/blogs/developer/python-support-policy-updates-for-aws-sdks-and-tools/
  warnings.warn(warning, PythonDeprecationWarning)

@dacort
Contributor

dacort commented Sep 6, 2023

Yea, the CLI maintains compatibility with 3.7 today because EMR still uses 3.7. Once that changes, I'll deprecate 3.7 support.

@soumilshah1995
Author

Any suggestions on how to fix this? I want to use the boto3 SQS client, but it looks like it does not work as intended due to the older Python version. Any suggestions would be great.

@dacort
Contributor

dacort commented Sep 11, 2023

That's just a warning, right? The message indicates boto3 updates will no longer be available after December 2023, but for now you should be fine.

Does that warning message cause any errors?
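
If it's only the log noise that's a concern, the warning can be filtered in the job script while staying on Python 3.7. A rough sketch, assuming the warning is still emitted from boto3.compat as shown in the log above (the filter goes in before importing boto3 in case it fires at import time):

import warnings

# Install the filter before importing boto3 so it also covers a warning
# emitted at import time; matching on the module keeps other warnings visible
warnings.filterwarnings("ignore", module=r"boto3\.compat")

import boto3  # noqa: E402

sqs = boto3.client("sqs", region_name="us-east-1")  # placeholder region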

@soumilshah1995
Author

Yes, I am not able to poll my SQS queue for messages. I was using S3 events + SQS and consuming the messages in EMR.

I tried executing the code in EMR Serverless using the EMR CLI and had a really tough time getting it to work because the boto3 SQS client was not working correctly. I was using EMR 6.9.

Code

import sys, json
from pyspark.sql import SparkSession
import boto3

# Create a single SparkSession with Hive support enabled
spark = (
    SparkSession.builder.appName("hudi_cow")
    .enableHiveSupport()
    .getOrCreate()
)


class Poller:
    def __init__(self, queue_url):
        self.queue_url = queue_url
        print("IN CLASS")
        self.sqs_client = boto3.client('sqs')
        print("Creating client object BOTO 3")
        self.batch_size = 10
        self.messages_to_delete = []

    def get_messages(self, batch_size):
        response = self.sqs_client.receive_message(
            QueueUrl=self.queue_url,
            MaxNumberOfMessages=batch_size,
            WaitTimeSeconds=20
        )

        if 'Messages' in response:
            messages = response['Messages']
            for message in messages:
                print("Message", message)
                self.messages_to_delete.append({
                    'ReceiptHandle': message['ReceiptHandle'],
                    'Body': message['Body']
                })
            return messages
        else:
            return []

    def commit(self):
        for message in self.messages_to_delete:
            self.sqs_client.delete_message(
                QueueUrl=self.queue_url,
                ReceiptHandle=message['ReceiptHandle']
            )
        self.messages_to_delete = []


def read_data_s3(path, format):
    # Use the reader that matches the requested format
    if format == "json":
        spark_df = spark.read.json(path)
    elif format == "parquet":
        spark_df = spark.read.parquet(path)
    else:
        raise ValueError(f"Unsupported format: {format}")

    spark_df.show()
    return spark_df


def upsert_hudi_table(glue_database, table_name, record_id, precomb_key, table_type, spark_df, partition_fields,
                      enable_partition, enable_cleaner, enable_hive_sync, enable_clustering,
                      enable_meta_data_indexing,
                      use_sql_transformer, sql_transformer_query,
                      target_path, index_type, method='upsert', clustering_column='default'):
    """
    Upserts a dataframe into a Hudi table.

    Args:
        glue_database (str): The name of the glue database.
        table_name (str): The name of the Hudi table.
        record_id (str): The name of the field in the dataframe that will be used as the record key.
        precomb_key (str): The name of the field in the dataframe that will be used for pre-combine.
        table_type (str): The Hudi table type (e.g., COPY_ON_WRITE, MERGE_ON_READ).
        spark_df (pyspark.sql.DataFrame): The dataframe to upsert.
        partition_fields (str): The field(s) used to partition the data.
        enable_partition (bool): Whether or not to enable partitioning.
        enable_cleaner (bool): Whether or not to enable data cleaning.
        enable_hive_sync (bool): Whether or not to enable syncing with Hive.
        use_sql_transformer (bool): Whether or not to use SQL to transform the dataframe before upserting.
        sql_transformer_query (str): The SQL query to use for data transformation.
        target_path (str): The path to the target Hudi table.
        method (str): The Hudi write method to use (default is 'upsert').
        index_type : BLOOM or GLOBAL_BLOOM
    Returns:
        None
    """
    # These are the basic settings for the Hoodie table
    hudi_final_settings = {
        "hoodie.table.name": table_name,
        "hoodie.datasource.write.table.type": table_type,
        "hoodie.datasource.write.operation": method,
        "hoodie.datasource.write.recordkey.field": record_id,
        "hoodie.datasource.write.precombine.field": precomb_key,
    }

    # These settings enable syncing with Hive
    hudi_hive_sync_settings = {
        "hoodie.parquet.compression.codec": "gzip",
        "hoodie.datasource.hive_sync.enable": "true",
        "hoodie.datasource.hive_sync.database": glue_database,
        "hoodie.datasource.hive_sync.table": table_name,
        "hoodie.datasource.hive_sync.partition_extractor_class": "org.apache.hudi.hive.MultiPartKeysValueExtractor",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
    }

    # These settings enable automatic cleaning of old data
    hudi_cleaner_options = {
        "hoodie.clean.automatic": "true",
        "hoodie.clean.async": "true",
        "hoodie.cleaner.policy": 'KEEP_LATEST_FILE_VERSIONS',
        "hoodie.cleaner.fileversions.retained": "3",
        "hoodie-conf hoodie.cleaner.parallelism": '200',
        'hoodie.cleaner.commits.retained': 5
    }

    # These settings enable partitioning of the data
    partition_settings = {
        "hoodie.datasource.write.partitionpath.field": partition_fields,
        "hoodie.datasource.hive_sync.partition_fields": partition_fields,
        "hoodie.datasource.write.hive_style_partitioning": "true",
    }

    hudi_clustering = {
        "hoodie.clustering.execution.strategy.class": "org.apache.hudi.client.clustering.run.strategy.SparkSortAndSizeExecutionStrategy",
        "hoodie.clustering.inline": "true",
        "hoodie.clustering.plan.strategy.sort.columns": clustering_column,
        "hoodie.clustering.plan.strategy.target.file.max.bytes": "1073741824",
        "hoodie.clustering.plan.strategy.small.file.limit": "629145600"
    }

    # Define a dictionary with the index settings for Hudi
    hudi_index_settings = {
        "hoodie.index.type": index_type,  # Specify the index type for Hudi
    }

    # Define a dictionary with the file size settings
    hudi_file_size = {
        "hoodie.parquet.max.file.size": 512 * 1024 * 1024,  # 512MB
        "hoodie.parquet.small.file.limit": 104857600,  # 100MB
    }

    hudi_meta_data_indexing = {
        "hoodie.metadata.enable": "true",
        "hoodie.metadata.index.async": "false",
        "hoodie.metadata.index.column.stats.enable": "true",
        "hoodie.metadata.index.check.timeout.seconds": "60",
        "hoodie.write.concurrency.mode": "optimistic_concurrency_control",
        "hoodie.write.lock.provider": "org.apache.hudi.client.transaction.lock.InProcessLockProvider"
    }

    if enable_meta_data_indexing == True or enable_meta_data_indexing == "True" or enable_meta_data_indexing == "true":
        for key, value in hudi_meta_data_indexing.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    if enable_clustering == True or enable_clustering == "True" or enable_clustering == "true":
        for key, value in hudi_clustering.items():
            hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    # Add the Hudi index settings to the final settings dictionary
    for key, value in hudi_index_settings.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    for key, value in hudi_file_size.items():
        hudi_final_settings[key] = value  # Add the key-value pair to the final settings dictionary

    # If partitioning is enabled, add the partition settings to the final settings
    if enable_partition == "True" or enable_partition == "true" or enable_partition == True:
        for key, value in partition_settings.items(): hudi_final_settings[key] = value

    # If data cleaning is enabled, add the cleaner options to the final settings
    if enable_cleaner == "True" or enable_cleaner == "true" or enable_cleaner == True:
        for key, value in hudi_cleaner_options.items(): hudi_final_settings[key] = value

    # If Hive syncing is enabled, add the Hive sync settings to the final settings
    if enable_hive_sync == "True" or enable_hive_sync == "true" or enable_hive_sync == True:
        for key, value in hudi_hive_sync_settings.items(): hudi_final_settings[key] = value

    # If there is data to write, apply any SQL transformations and write to the target path
    if spark_df.count() > 0:
        if use_sql_transformer == "True" or use_sql_transformer == "true" or use_sql_transformer == True:
            spark_df.createOrReplaceTempView("temp")
            spark_df = spark.sql(sql_transformer_query)

        spark_df.write.format("hudi"). \
            options(**hudi_final_settings). \
            mode("append"). \
            save(target_path)


def process_message(messages, file_format='json'):
    try:
        batch_files = []

        for message in messages:
            payload = json.loads(message['Body'])
            records = payload['Records']
            s3_files = [f"s3://{record['s3']['bucket']['name']}/{record['s3']['object']['key']}" for record in records]
            print('s3_files', s3_files)

            for item in s3_files: batch_files.append(item)

        if batch_files != []:
            spark_df = read_data_s3(
                path=batch_files,
                format=file_format
            )
            print("**************")
            spark_df.show()
            print("**************")

            upsert_hudi_table(
                glue_database="hudidb",
                table_name="customers",
                record_id="customer_id",
                precomb_key="ts",
                table_type='COPY_ON_WRITE',
                partition_fields="state",
                method='upsert',
                index_type='BLOOM',
                enable_partition=True,
                enable_cleaner=True,
                enable_hive_sync=False,
                enable_clustering=False,
                clustering_column='default',
                enable_meta_data_indexing='true',
                use_sql_transformer=False,
                sql_transformer_query='default',
                target_path="s3://datateam-sandbox-qa-demo/silver/table_name=customers/",
                spark_df=spark_df,
            )

    except Exception as e:
        print("Error processing message:", e)
        raise Exception("Error processing message:", e)


def run_job():
    print("IN")
    queue_url = '<URL>'
    print(queue_url)
    print("*")
    poller = Poller(queue_url)
    print("Eneteing while loop")


    while True:
        messages = poller.get_messages(poller.batch_size)
        if not messages:
            print("No messages to process. Exiting.")
            break
        else:
            process_message(messages)

        poller.commit()


run_job()

I could not get the boto3 SQS client to work for some reason.
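
One way to narrow this down is to run the SQS call on its own, without Spark or Hudi, as a tiny standalone job. If that also fails or hangs, the problem is likely IAM permissions, VPC/networking, or the boto3 version rather than the polling code itself. A minimal sketch, with a placeholder queue URL and region:

import boto3

# Placeholders: substitute the real queue URL and region for the account
QUEUE_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/my-queue"
REGION = "us-east-1"

sqs = boto3.client("sqs", region_name=REGION)

# Long-poll once and print whatever comes back; an AccessDenied error or a
# timeout here points at configuration rather than the Spark code
response = sqs.receive_message(
    QueueUrl=QUEUE_URL,
    MaxNumberOfMessages=1,
    WaitTimeSeconds=5,
)
print(response.get("Messages", []))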

@dacort
Contributor

dacort commented Sep 11, 2023

Ah got it. Does your EMR Serverless job error out at all? I haven't made any SQS samples, but might be able to check tomorrow.
