diff --git a/Dockerfile b/Dockerfile
index a2096be..1b3027d 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -6,6 +6,7 @@ ARG HADOOP_VERSION=3.2.4
 ARG AWS_SDK_VERSION=1.11.901
 ARG PYSPARK_VERSION=3.3.0
 ARG SOURCE_REGION_NAME='us-east-1'
+ARG HUDI_VERSION=0.12.2
 
 # yum updates, security updates for zlib, java installation and pyspark installation
 
@@ -15,9 +16,11 @@ RUN yum update -y && \
     yum -y install yum-plugin-versionlock && \
     yum -y versionlock add java-1.8.0-openjdk-1.8.0.352.b08-0.amzn2.0.1.x86_64 && \
     yum -y install java-1.8.0-openjdk && \
+    pip install --upgrade pip && \
     pip install pyspark==$PYSPARK_VERSION && \
     yum clean all
 
+
 # setting the environment variable and Spark path
 ENV SPARK_HOME="/var/lang/lib/python3.8/site-packages/pyspark"
 ENV PATH=$PATH:$SPARK_HOME/bin
@@ -34,7 +37,8 @@ ENV PATH=$SPARK_HOME/python:$PATH
 
 RUN mkdir $SPARK_HOME/conf && \
     echo "SPARK_LOCAL_IP=127.0.0.1" > $SPARK_HOME/conf/spark-env.sh && \
-    wget -q https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -P ${SPARK_HOME}/jars/ && \
+    wget -q https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -P ${SPARK_HOME}/jars/ && \
+    wget -q https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.3-bundle_2.12/${HUDI_VERSION}/hudi-spark3.3-bundle_2.12-${HUDI_VERSION}.jar -P ${SPARK_HOME}/jars/ && \
     wget -q https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -P ${SPARK_HOME}/jars/
 
 # JAVA_HOME depends upon the java version used
@@ -56,7 +60,7 @@ RUN chmod -R 755 $SPARK_HOME
 
 
 # Copy the Pyspark script to container
-COPY sparkOnAWSLambda.py ${LAMBDA_TASK_ROOT}
+COPY sparkLambdaHandler.py ${LAMBDA_TASK_ROOT}
 
 # calling the Lambda handler
-CMD [ "/var/task/sparkOnAWSLambda.lambda_handler" ]
\ No newline at end of file
+CMD [ "/var/task/sparkLambdaHandler.lambda_handler" ]
\ No newline at end of file
diff --git a/images/Github-diagram.jpg b/images/Github-diagram.jpg
new file mode 100644
index 0000000..d74b93e
Binary files /dev/null and b/images/Github-diagram.jpg differ
diff --git a/sparkOnAWSLambda.py b/spark-scripts/sparkOnAWSLambda.py
similarity index 94%
rename from sparkOnAWSLambda.py
rename to spark-scripts/sparkOnAWSLambda.py
index 97617af..a7049c1 100644
--- a/sparkOnAWSLambda.py
+++ b/spark-scripts/sparkOnAWSLambda.py
@@ -42,4 +42,5 @@ def lambda_handler(event, context):
 
     print("Started Writing the CSV file to Target S3 location ", target_path)
 
-    df.write.format("csv").save(target_path)
+    #df.write.format("csv").save(target_path)
+    df.write.format("hudi").save(target_path)
diff --git a/spark-scripts/spark_script_hudi.py b/spark-scripts/spark_script_hudi.py
new file mode 100644
index 0000000..bdad382
--- /dev/null
+++ b/spark-scripts/spark_script_hudi.py
@@ -0,0 +1,69 @@
+from pyspark.sql import SparkSession
+from pyspark.sql.types import *
+from pyspark.sql.functions import current_timestamp
+import sys
+import os
+
+def spark_script():
+    print("start...................")
+
+    input_path = os.environ['input_path']
+    target_path = os.environ['output_path']
+    s3_bucket = os.environ['s3_bucket']
+
+    aws_region = os.environ['REGION']
+    aws_access_key_id = os.environ['ACCESS_KEY_ID']
+    aws_secret_access_key = os.environ['SECRET_ACCESS_KEY']
+    session_token = os.environ['SESSION_TOKEN']
+
+
+    input_path = "s3a://"+s3_bucket+"/"+input_path
+    target_path = "s3a://"+s3_bucket+"/"+target_path
+
+    print(" ******* Input path ", input_path)
+    print(" ******* Target path ", target_path)
+
+    spark = SparkSession.builder \
+        .appName("Spark-on-AWS-Lambda") \
+        .master("local[*]") \
+        .config("spark.driver.bindAddress", "127.0.0.1") \
+        .config("spark.driver.memory", "5g") \
+        .config("spark.executor.memory", "5g") \
+        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
+        .config("spark.sql.hive.convertMetastoreParquet", "false") \
+        .config("spark.hadoop.hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
+        .config("hoodie.meta.sync.client.tool.class", "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool") \
+        .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
+        .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
+        .config("spark.hadoop.fs.s3a.session.token", session_token) \
+        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \
+        .enableHiveSupport().getOrCreate()
+
+
+
+    print("Started Reading the CSV file from S3 location ", input_path)
+
+    df = spark.read.option('header', 'true').csv(input_path)
+    df = df.withColumn("last_upd_timestamp", current_timestamp())
+    df.show()
+
+    hudi_options = {
+        'hoodie.table.name': 'customer_table',
+        'hoodie.datasource.write.recordkey.field': 'Customer_ID',
+        'hoodie.datasource.write.precombine.field': 'last_upd_timestamp',
+        'hoodie.insert.shuffle.parallelism': 2,
+        "hoodie.datasource.hive_sync.enable": "false",
+        "hoodie.datasource.hive_sync.database": "default",
+        "hoodie.datasource.hive_sync.table": "customer_table",
+        "hoodie.datasource.hive_sync.use_jdbc": "false",
+        "hoodie.datasource.hive_sync.mode": "hms",
+        "hoodie.write.markers.type": "direct", # It's not advisable to use this configuration. Working on workaround without using this config.
+        "hoodie.embed.timeline.server": "false" # It's not advisable to use this configuration. Working on workaround without using this config.
+    }
+
+    print("Started Writing the CSV file to Target hudi table ", target_path)
+    df.write.format("hudi").options(**hudi_options).mode("overwrite").save(target_path)
+    # df.write.format("csv").save(target_path)
+
+if __name__ == '__main__':
+    spark_script()
\ No newline at end of file
diff --git a/sparkLambdaHandler.py b/sparkLambdaHandler.py
new file mode 100644
index 0000000..122dcc7
--- /dev/null
+++ b/sparkLambdaHandler.py
@@ -0,0 +1,16 @@
+import boto3
+import sys
+import os
+import subprocess
+
+def lambda_handler(event, context):
+    print("start...................")
+    s3_bucket_script = os.environ['SCRIPT_BUCKET']
+    input_script = os.environ['SPARK_SCRIPT']
+    s3_client = boto3.client("s3")
+    s3_client.download_file(s3_bucket_script, input_script, "/tmp/spark_script.py")
+    # Set the environment variables for the Spark application
+    os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local pyspark-shell"
+    #ENV PYSPARK_SUBMIT_ARGS="--master local pyspark-shell"\
+    # Run the spark-submit command
+    subprocess.run(["spark-submit", "/tmp/spark_script.py"])
\ No newline at end of file