10 changes: 7 additions & 3 deletions Dockerfile
@@ -6,6 +6,7 @@ ARG HADOOP_VERSION=3.2.4
ARG AWS_SDK_VERSION=1.11.901
ARG PYSPARK_VERSION=3.3.0
ARG SOURCE_REGION_NAME='us-east-1'
ARG HUDI_VERSION=0.12.2


# yum updates, security updates for zlib, java installation and pyspark installation
@@ -15,9 +16,11 @@ RUN yum update -y && \
yum -y install yum-plugin-versionlock && \
yum -y versionlock add java-1.8.0-openjdk-1.8.0.352.b08-0.amzn2.0.1.x86_64 && \
yum -y install java-1.8.0-openjdk && \
pip install --upgrade pip && \
pip install pyspark==$PYSPARK_VERSION && \
yum clean all


# setting the environment variable and Spark path
ENV SPARK_HOME="/var/lang/lib/python3.8/site-packages/pyspark"
ENV PATH=$PATH:$SPARK_HOME/bin
@@ -34,7 +37,8 @@ ENV PATH=$SPARK_HOME/python:$PATH

RUN mkdir $SPARK_HOME/conf && \
echo "SPARK_LOCAL_IP=127.0.0.1" > $SPARK_HOME/conf/spark-env.sh && \
wget -q https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -P ${SPARK_HOME}/jars/ && \
wget -q https://repo1.maven.org/maven2/org/apache/hadoop/hadoop-aws/${HADOOP_VERSION}/hadoop-aws-${HADOOP_VERSION}.jar -P ${SPARK_HOME}/jars/ && \
wget -q https://repo1.maven.org/maven2/org/apache/hudi/hudi-spark3.3-bundle_2.12/${HUDI_VERSION}/hudi-spark3.3-bundle_2.12-${HUDI_VERSION}.jar -P ${SPARK_HOME}/jars/ && \
wget -q https://repo1.maven.org/maven2/com/amazonaws/aws-java-sdk-bundle/${AWS_SDK_VERSION}/aws-java-sdk-bundle-${AWS_SDK_VERSION}.jar -P ${SPARK_HOME}/jars/

# JAVA_HOME depends upon the java version used
@@ -56,7 +60,7 @@ RUN chmod -R 755 $SPARK_HOME

# Copy the Pyspark script to container

COPY sparkOnAWSLambda.py ${LAMBDA_TASK_ROOT}
COPY sparkLambdaHandler.py ${LAMBDA_TASK_ROOT}

# calling the Lambda handler
CMD [ "/var/task/sparkOnAWSLambda.lambda_handler" ]
CMD [ "/var/task/sparkLambdaHandler.lambda_handler" ]
Binary file added images/Github-diagram.jpg
3 changes: 2 additions & 1 deletion sparkOnAWSLambda.py → spark-scripts/sparkOnAWSLambda.py
@@ -42,4 +42,5 @@ def lambda_handler(event, context):


print("Started Writing the CSV file to Target S3 location ", target_path)
df.write.format("csv").save(target_path)
#df.write.format("csv").save(target_path)
df.write.format("hudi").save(target_path)
69 changes: 69 additions & 0 deletions spark-scripts/spark_script_hudi.py
@@ -0,0 +1,69 @@
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import current_timestamp
import sys
import os

def spark_script():
    print("start...................")

    input_path = os.environ['input_path']
    target_path = os.environ['output_path']
    s3_bucket = os.environ['s3_bucket']

    aws_region = os.environ['REGION']
    aws_access_key_id = os.environ['ACCESS_KEY_ID']
    aws_secret_access_key = os.environ['SECRET_ACCESS_KEY']
    session_token = os.environ['SESSION_TOKEN']

    input_path = "s3a://"+s3_bucket+"/"+input_path
    target_path = "s3a://"+s3_bucket+"/"+target_path

    print(" ******* Input path ", input_path)
    print(" ******* Target path ", target_path)

    spark = SparkSession.builder \
        .appName("Spark-on-AWS-Lambda") \
        .master("local[*]") \
        .config("spark.driver.bindAddress", "127.0.0.1") \
        .config("spark.driver.memory", "5g") \
        .config("spark.executor.memory", "5g") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .config("spark.sql.hive.convertMetastoreParquet", "false") \
        .config("spark.hadoop.hive.metastore.client.factory.class", "com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory") \
        .config("hoodie.meta.sync.client.tool.class", "org.apache.hudi.aws.sync.AwsGlueCatalogSyncTool") \
        .config("spark.hadoop.fs.s3a.access.key", aws_access_key_id) \
        .config("spark.hadoop.fs.s3a.secret.key", aws_secret_access_key) \
        .config("spark.hadoop.fs.s3a.session.token", session_token) \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.TemporaryAWSCredentialsProvider") \
        .enableHiveSupport().getOrCreate()

    print("Started Reading the CSV file from S3 location ", input_path)

    df = spark.read.option('header', 'true').csv(input_path)
    df = df.withColumn("last_upd_timestamp", current_timestamp())
    df.show()

    hudi_options = {
        'hoodie.table.name': 'customer_table',
        'hoodie.datasource.write.recordkey.field': 'Customer_ID',
        'hoodie.datasource.write.precombine.field': 'last_upd_timestamp',
        'hoodie.insert.shuffle.parallelism': 2,
        "hoodie.datasource.hive_sync.enable": "false",
        "hoodie.datasource.hive_sync.database": "default",
        "hoodie.datasource.hive_sync.table": "customer_table",
        "hoodie.datasource.hive_sync.use_jdbc": "false",
        "hoodie.datasource.hive_sync.mode": "hms",
        "hoodie.write.markers.type": "direct",  # Not advisable to use this configuration; working on a workaround that avoids it.
        "hoodie.embed.timeline.server": "false"  # Not advisable to use this configuration; working on a workaround that avoids it.
    }

    print("Started Writing the CSV file to Target hudi table ", target_path)
    df.write.format("hudi").options(**hudi_options).mode("overwrite").save(target_path)
    # df.write.format("csv").save(target_path)

if __name__ == '__main__':
    spark_script()
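Not part of this PR: a minimal verification sketch showing how the Hudi table written above could be read back from the same target path. It assumes a SparkSession configured like the one in spark_script(); _hoodie_commit_time and _hoodie_record_key are metadata columns Hudi adds to every record.

from pyspark.sql import SparkSession

def read_hudi_table(spark: SparkSession, target_path: str):
    # Snapshot query: load the latest committed view of the table
    snapshot = spark.read.format("hudi").load(target_path)
    # Inspect Hudi's metadata columns alongside the columns written above
    snapshot.select("_hoodie_commit_time", "_hoodie_record_key",
                    "Customer_ID", "last_upd_timestamp").show(10, truncate=False)
    return snapshot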
16 changes: 16 additions & 0 deletions sparkLambdaHandler.py
@@ -0,0 +1,16 @@
import boto3
import sys
import os
import subprocess

def lambda_handler(event, context):
    print("start...................")
    s3_bucket_script = os.environ['SCRIPT_BUCKET']
    input_script = os.environ['SPARK_SCRIPT']
    s3_client = boto3.client("s3")
    s3_client.download_file(s3_bucket_script, input_script, "/tmp/spark_script.py")
    # Set the environment variables for the Spark application
    os.environ["PYSPARK_SUBMIT_ARGS"] = "--master local pyspark-shell"
    #ENV PYSPARK_SUBMIT_ARGS="--master local pyspark-shell"\
    # Run the spark-submit command
    subprocess.run(["spark-submit", "/tmp/spark_script.py"])
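Not part of this PR: once the image is deployed as a Lambda function, this handler can be triggered with boto3. The function name below is a placeholder; because the handler reads SCRIPT_BUCKET and SPARK_SCRIPT from the function's environment variables, an empty event payload is enough.

import json
import boto3

lambda_client = boto3.client("lambda")
response = lambda_client.invoke(
    FunctionName="spark-on-aws-lambda",  # placeholder, use the deployed function's name
    InvocationType="RequestResponse",
    Payload=json.dumps({}),
)
print(response["StatusCode"], response["Payload"].read().decode())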