
# Chapter 4 -> Spark ETL with AWS (S3 bucket)

Tasks:
1. Install required Spark libraries
2. Create connection with AWS S3 bucket
3. Read data from S3 bucket and store into DataFrame
4. Transform data
5. Write data into Parquet file
6. Write data into JSON file

Reference:
https://aws.amazon.com/marketplace/pp/prodview-yljvnozhnwige?sr=0-20&ref_=beagle&applicationId=AWSMPContessa#resources

AWS CLI commands (the bucket is public, so `--no-sign-request` skips loading AWS credentials) <br/>
aws s3 ls --no-sign-request s3://multi-token-completion/human_annotated/ <br/>
aws s3 cp --no-sign-request s3://multi-token-completion/human_annotated/labeled_completions.csv test.csv
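The AWS CLI addresses objects as `s3://bucket/key`, while Spark reaches S3 through Hadoop's S3A connector, which uses the `s3a://` scheme (as in the read cell later in this notebook). A minimal sketch of a hypothetical helper, `to_s3a_uri`, that converts the CLI form into the Spark form, assuming plain `s3://` input:

```python
def to_s3a_uri(uri: str) -> str:
    """Convert an AWS CLI style s3:// URI into the s3a:// form used by Spark's S3A connector.

    Hypothetical helper for illustration; URIs that already use s3a:// are returned unchanged.
    """
    if uri.startswith("s3a://"):
        return uri
    if uri.startswith("s3://"):
        # Keep bucket and object key exactly as given, only swap the scheme
        return "s3a://" + uri[len("s3://"):]
    raise ValueError(f"expected an s3:// or s3a:// URI, got {uri!r}")


print(to_s3a_uri("s3://multi-token-completion/human_annotated/labeled_completions.csv"))
# prints s3a://multi-token-completion/human_annotated/labeled_completions.csv
```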

In [2]:
# Load the required library
from pyspark.sql import SparkSession

In [3]:
# Start the Spark session; the anonymous credentials provider lets S3A read this public bucket without AWS keys
spark = (
    SparkSession.builder.appName("chapter4")
    .config("fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider")
    .getOrCreate()
)
# Show only errors, not warnings
spark.sparkContext.setLogLevel("ERROR")

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/03/13 22:23:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
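The session above assumes the S3A connector (the `hadoop-aws` jar and its AWS SDK dependency) is already on Spark's classpath. If it is not, a sketch like the following can pull it in through `spark.jars.packages`. The version `3.3.4` is an assumption: it must match the Hadoop version bundled with your Spark build. The `spark.hadoop.` prefix is Spark's documented way to forward a property into the Hadoop configuration.

```python
# Assumption: the Spark build bundles Hadoop 3.3.4; change this to match your installation
HADOOP_AWS_VERSION = "3.3.4"


def s3a_anonymous_conf(hadoop_aws_version: str = HADOOP_AWS_VERSION) -> dict:
    """Spark settings for reading a public S3 bucket anonymously through S3A."""
    return {
        # Resolved from Maven when the session's JVM starts; pulls in the AWS SDK transitively
        "spark.jars.packages": f"org.apache.hadoop:hadoop-aws:{hadoop_aws_version}",
        # spark.hadoop.* entries are copied into the Hadoop configuration
        "spark.hadoop.fs.s3a.aws.credentials.provider":
            "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider",
    }


def build_session(app_name: str = "chapter4"):
    # Imported here so the config helper above can be inspected without PySpark installed
    from pyspark.sql import SparkSession

    builder = SparkSession.builder.appName(app_name)
    for key, value in s3a_anonymous_conf().items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

`spark.jars.packages` only takes effect when the JVM is launched, so `build_session()` has to run before any other Spark session exists in the kernel; otherwise `getOrCreate()` returns the existing session without the extra jars.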


Use the AWS CLI to check the folder structure:

![image.png](attachment:d6c3bdd2-5409-4d0f-ae95-2846b3437eac.png)

You can also download a sample Parquet file:

![image.png](attachment:f9c91ab9-a4a6-4205-97a7-423c9ff9fea8.png)

2. Create connection with AWS S3 bucket
3. Read data from S3 bucket and store into DataFrame

In [13]:
# Equivalent shorthand: df = spark.read.option("header", "true").csv("s3a://multi-token-completion/human_annotated/labeled_completions.csv")
df = spark.read.format("csv").option("header", "true").load("s3a://multi-token-completion/human_annotated/labeled_completions.csv")

In [14]:
df.printSchema()

root
 |-- _c0: string (nullable = true)
 |-- masked_sent: string (nullable = true)
 |-- original_phrase: string (nullable = true)
 |-- suggested_completion: string (nullable = true)
 |-- is_valid: string (nullable = true)
 |-- is_fact: string (nullable = true)
 |-- is_more_abstract: string (nullable = true)
 |-- is_factually_correct: string (nullable = true)
 |-- method: string (nullable = true)
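`printSchema()` reports every column as `string`, because the CSV reader does not infer types unless asked to (`inferSchema`). One possible transform for step 4 is casting the label columns to numeric types. The sketch below builds `selectExpr` expressions for that; the type map and the rename of `_c0` to `id` are assumptions read off the sample rows shown by `df.show()`, not something the dataset documents.

```python
# Assumed target types, inferred from the sample values shown by df.show()
TYPE_MAP = {
    "_c0": "int",
    "is_valid": "int",
    "is_fact": "int",
    "is_more_abstract": "double",
    "is_factually_correct": "double",
}
# Hypothetical rename: the unnamed first CSV column looks like a row index
RENAMES = {"_c0": "id"}


def cast_expressions(columns, type_map=TYPE_MAP, renames=RENAMES):
    """Build Spark SQL expressions that cast and rename columns, keeping column order."""
    exprs = []
    for name in columns:
        target = renames.get(name, name)
        if name in type_map:
            exprs.append(f"CAST(`{name}` AS {type_map[name]}) AS `{target}`")
        else:
            exprs.append(f"`{name}` AS `{target}`")
    return exprs


def cast_columns(df):
    """Apply the casts; with ANSI mode off (Spark 3.x default), unparseable values become NULL."""
    return df.selectExpr(*cast_expressions(df.columns))
```

For example, `typed_df = cast_columns(df)` followed by `typed_df.printSchema()` should list `id`, `is_valid` and `is_fact` as `int` and the two `is_*` score columns as `double`.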



In [15]:
df.show()


+---+--------------------+-------------------+--------------------+--------+-------+----------------+--------------------+-------+
|_c0|         masked_sent|    original_phrase|suggested_completion|is_valid|is_fact|is_more_abstract|is_factually_correct| method|
+---+--------------------+-------------------+--------------------+--------+-------+----------------+--------------------+-------+
|  0|Line B was intend...|the State of Mexico|        buenos aires|       1|      1|             0.0|                 0.0|  t5-3b|
|  1|Line B was intend...|the State of Mexico|      Rio de Janeiro|       1|      1|             0.0|                 0.0|    ilm|
|  2|Line B was intend...|the State of Mexico|              Cuenca|       1|      1|             0.0|                 0.0|   EMAT|
|  3|Line B was intend...|the State of Mexico|            the u.s.|       1|      1|             1.0|                 0.0|t5-base|
|  4|Lloyd Richards, s...|             Raisin|                 day|       1|      1


4. Transform data

In [16]:
print('Register the DataFrame as a SQL temporary view: tempSource')
df.createOrReplaceTempView('tempSource')

Register the DataFrame as a SQL temporary view: tempSource


In [17]:
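print('Displaying top 10 rows: ')
# In plain Jupyter, display() only prints the DataFrame's schema (see output below); call .show(10) to print the rows
display(spark.sql('SELECT * FROM tempSource LIMIT 10'))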

Displaying top 10 rows: 


DataFrame[_c0: string, masked_sent: string, original_phrase: string, suggested_completion: string, is_valid: string, is_fact: string, is_more_abstract: string, is_factually_correct: string, method: string]

In [18]:
# Keep at most 1000 rows; without ORDER BY, Spark does not guarantee which rows are kept
newdf = spark.sql('SELECT * FROM tempSource LIMIT 1000')

5. Write data into Parquet file
6. Write data into JSON file

In [19]:
newdf.write.format("parquet").option("compression","snappy").save("parquetdata1",mode='append')


In [20]:
# Write the same rows as CSV with a header row (output directory: csvdata1)
newdf.write.format("csv").option("header", "true").save("csvdata1", mode='append')
