# Install Spark and Initiate Spark Session

In [249]:
# Install Pyspark

!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys

import findspark
# Per the findspark docs, init() makes pyspark importable from this kernel.
findspark.init()
# The return value of find() is discarded here; the call is kept as in the original cell.
findspark.find()

import pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, explode, create_map, countDistinct


[33m0% [Working][0m            Get:1 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
[33m0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [[0m[33m0% [Waiting for headers] [Connecting to security.ubuntu.com (185.125.190.36)] [[0m                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [784 kB]
Get:5 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Hit:7 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 http://archive.ubuntu.com/ubuntu jammy-backports In

In [250]:
# Start (or reuse) the Spark session used by every cell below

spark = SparkSession.builder.appName("FormatETL").getOrCreate()
# SparkSession's constructor expects a SparkContext, not another session, so wrapping
# `spark` in SparkSession(...) only worked through private attributes. The session already
# exposes .sql(), so `sqlContext` is kept purely as an alias for the cells that call
# sqlContext.sql(...).
sqlContext = spark

# Log errors only, so warnings do not clutter the cell outputs

spark.sparkContext.setLogLevel("ERROR")

# Read and Load CSV

In [251]:
# Read the complaints CSV; the first row is the header and column types are inferred

complaints_csv_df = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("Consumer_Complaints.csv")
)

In [252]:
# Print the column names and inferred types of the complaints dataframe

complaints_csv_df.printSchema()

root
 |-- Complaint ID: integer (nullable = true)
 |-- Submitted via: string (nullable = true)
 |-- Date submitted: date (nullable = true)
 |-- Date received: date (nullable = true)
 |-- State: string (nullable = true)
 |-- Product: string (nullable = true)
 |-- Sub-product: string (nullable = true)
 |-- Issue: string (nullable = true)
 |-- Sub-issue: string (nullable = true)
 |-- Company public response: string (nullable = true)
 |-- Company response to consumer: string (nullable = true)
 |-- Timely response?: string (nullable = true)



In [253]:
# Preview the first 10 rows of the complaints dataframe

complaints_csv_df.show(n=10)

+------------+-------------+--------------+-------------+-----+--------------------+--------------------+--------------------+--------------------+-----------------------+----------------------------+----------------+
|Complaint ID|Submitted via|Date submitted|Date received|State|             Product|         Sub-product|               Issue|           Sub-issue|Company public response|Company response to consumer|Timely response?|
+------------+-------------+--------------+-------------+-----+--------------------+--------------------+--------------------+--------------------+-----------------------+----------------------------+----------------+
|     4848023|     Referral|    2021-10-24|   2021-10-27|   NY|            Mortgage|Conventional home...|Applying for a mo...|                NULL|   Company has respo...|        Closed with expla...|             Yes|
|     3621464|          Web|    2020-04-24|   2020-04-24|   FL|Money transfer, v...|Refund anticipati...|Lost or stolen check|  

In [254]:
# Print the dataframe shape as a (row count, column count) tuple

print((complaints_csv_df.count(), len(complaints_csv_df.columns)))

(62516, 12)


In [255]:
# Number of complaints per state, with the state code exposed as State_Abbrv
from pyspark.sql.functions import count

complaints_state = (
    complaints_csv_df
    .groupBy('State')
    .agg(count('Complaint ID').alias('Complaints_Count'))
    .withColumnRenamed("State", "State_Abbrv")
)

complaints_state.show()

+-----------+----------------+
|State_Abbrv|Complaints_Count|
+-----------+----------------+
|         AZ|            1516|
|         SC|             822|
|         LA|             246|
|         MN|             382|
|         NJ|            2664|
|         DC|             353|
|         OR|             620|
|         VA|            1731|
|         RI|             249|
|         KY|             157|
|         WY|              22|
|         NH|             199|
|         MI|            1395|
|         NV|            1221|
|         WI|             291|
|         ID|             122|
|         CA|           13709|
|         CT|            1097|
|         NE|              83|
|         MT|              70|
+-----------+----------------+
only showing top 20 rows



# Read and Load JSON

In [256]:
# Read the state population JSON; "multiline" is enabled on the reader for this file

population_json_df = (
    spark.read
    .format("json")
    .option("multiline", "true")
    .load("us-states-population.json")
)

In [257]:
# Print the schema: one struct column per state code, each holding pop_2014 and state

population_json_df.printSchema()

root
 |-- AK: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- AL: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- AR: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- AZ: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- CA: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- CO: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- CT: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- DC: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- DE: struct (nullable = true)
 |    |--

In [258]:
# Preview the raw population dataframe (one wide row with a column per state code)
population_json_df.show(n=10)

+----------------+------------------+-------------------+------------------+--------------------+-------------------+--------------------+--------------------+------------------+-------------------+-------------------+-----------------+---------------+----------------+--------------------+------------------+-----------------+-------------------+--------------------+--------------------+-------------------+----------------+-------------------+--------------------+-------------------+--------------------+------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-----------------+--------------------+----------------+-------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+---------------+-------------------+-----------------+--------------------+--------------------+--------------------+--------------

In [259]:
# Add a constant "State" column whose only value is "Population"; the pandas transpose
# step uses it as the index, so the transposed value column ends up named "Population"

population_json_df = population_json_df.withColumn("State", lit("Population"))

population_json_df.show(n=10)


+----------------+------------------+-------------------+------------------+--------------------+-------------------+--------------------+--------------------+------------------+-------------------+-------------------+-----------------+---------------+----------------+--------------------+------------------+-----------------+-------------------+--------------------+--------------------+-------------------+----------------+-------------------+--------------------+-------------------+--------------------+------------------+--------------------+--------------------+-------------------+--------------------+--------------------+--------------------+-----------------+--------------------+----------------+-------------------+-----------------+--------------------+--------------------+--------------------+--------------------+--------------------+-----------------+---------------+-------------------+-----------------+--------------------+--------------------+--------------------+--------------

In [260]:
# Print the schema again after adding the "State" column
population_json_df.printSchema()

root
 |-- AK: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- AL: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- AR: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- AZ: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- CA: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- CO: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- CT: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- DC: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)
 |-- DE: struct (nullable = true)
 |    |--

In [261]:
# Transpose via pandas: the single wide row (one column per state code) becomes one row
# per state code, with the struct values under a "Population" column.
# Note that after this cell population_json_df is a pandas dataframe, not a Spark one.

population_json_df = (
    population_json_df
    .toPandas()
    .set_index("State")
    .transpose()
    .reset_index()
)

population_json_df.head()


State,index,Population
0,AK,"(736732, Alaska)"
1,AL,"(4849377, Alabama)"
2,AR,"(2966369, Arkansas)"
3,AZ,"(6731484, Arizona)"
4,CA,"(38802500, California)"


In [262]:
# Convert the transposed pandas dataframe back into a PySpark dataframe

population_json_df = spark.createDataFrame(population_json_df)

# Print the schema of the converted dataframe
population_json_df.printSchema()

root
 |-- index: string (nullable = true)
 |-- Population: struct (nullable = true)
 |    |-- pop_2014: long (nullable = true)
 |    |-- state: string (nullable = true)



In [263]:
# Flatten the Population struct: keep `index` and promote the struct's fields
# (pop_2014, state) to top-level columns

population_json_df = population_json_df.select("index","Population.*")


In [264]:
# Preview the first 10 rows of the flattened population dataframe

population_json_df.show(n=10)

+-----+--------+--------------------+
|index|pop_2014|               state|
+-----+--------+--------------------+
|   AK|  736732|              Alaska|
|   AL| 4849377|             Alabama|
|   AR| 2966369|            Arkansas|
|   AZ| 6731484|             Arizona|
|   CA|38802500|          California|
|   CO| 5355866|            Colorado|
|   CT| 3596677|         Connecticut|
|   DC|  658893|District of Columbia|
|   DE|  935614|            Delaware|
|   FL|19893297|             Florida|
+-----+--------+--------------------+
only showing top 10 rows



# Read and Load Parquet

In [276]:
# Read the state latitude/longitude Parquet file into a dataframe
location_parquet_df = (
    spark.read
    .format("parquet")
    .load("US _States_ Long_Lat.parquet")
)

In [277]:
# Print the column names and types of the location dataframe
location_parquet_df.printSchema()

root
 |-- state&teritory: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- Name: string (nullable = true)



In [278]:
# Preview the first 10 rows of the location dataframe

location_parquet_df.show(10)

+--------------+---------+-----------+--------------------+
|state&teritory| latitude|  longitude|                Name|
+--------------+---------+-----------+--------------------+
|            AK|63.588753|-154.493062|              Alaska|
|            AL|32.318231| -86.902298|             Alabama|
|            AR| 35.20105| -91.831833|            Arkansas|
|            AZ|34.048928|-111.093731|             Arizona|
|            CA|36.778261|-119.417932|          California|
|            CO|39.550051|-105.782067|            Colorado|
|            CT|41.603221| -73.087749|         Connecticut|
|            DC|38.905985| -77.033418|District of Columbia|
|            DE|38.910832|  -75.52767|            Delaware|
|            FL|27.664827| -81.515754|             Florida|
+--------------+---------+-----------+--------------------+
only showing top 10 rows



In [279]:
# Count the rows in the location dataframe
location_parquet_df.count()

61

# Creating Temp Views

In [269]:
# Register each source dataframe as a temporary view so it can be queried with SQL
complaints_csv_df.createOrReplaceTempView("tempCSV")
population_json_df.createOrReplaceTempView("tempJSON")
location_parquet_df.createOrReplaceTempView("tempParquet")

In [270]:
# Query the complaints view with SQL and show 10 rows
sqlContext.sql("SELECT * FROM tempCSV LIMIT 10").show()

+------------+-------------+--------------+-------------+-----+--------------------+--------------------+--------------------+--------------------+-----------------------+----------------------------+----------------+
|Complaint ID|Submitted via|Date submitted|Date received|State|             Product|         Sub-product|               Issue|           Sub-issue|Company public response|Company response to consumer|Timely response?|
+------------+-------------+--------------+-------------+-----+--------------------+--------------------+--------------------+--------------------+-----------------------+----------------------------+----------------+
|     4848023|     Referral|    2021-10-24|   2021-10-27|   NY|            Mortgage|Conventional home...|Applying for a mo...|                NULL|   Company has respo...|        Closed with expla...|             Yes|
|     3621464|          Web|    2020-04-24|   2020-04-24|   FL|Money transfer, v...|Refund anticipati...|Lost or stolen check|  

In [271]:
# Query the population view with SQL and show 10 rows
sqlContext.sql("SELECT * FROM tempJSON LIMIT 10").show()

+-----+--------+--------------------+
|index|pop_2014|               state|
+-----+--------+--------------------+
|   AK|  736732|              Alaska|
|   AL| 4849377|             Alabama|
|   AR| 2966369|            Arkansas|
|   AZ| 6731484|             Arizona|
|   CA|38802500|          California|
|   CO| 5355866|            Colorado|
|   CT| 3596677|         Connecticut|
|   DC|  658893|District of Columbia|
|   DE|  935614|            Delaware|
|   FL|19893297|             Florida|
+-----+--------+--------------------+



In [272]:
# Query the location view with SQL and show 10 rows
sqlContext.sql("SELECT * FROM tempParquet LIMIT 10").show()

+--------------+---------+-----------+--------------------+
|state&teritory| latitude|  longitude|                Name|
+--------------+---------+-----------+--------------------+
|            AK|63.588753|-154.493062|              Alaska|
|            AL|32.318231| -86.902298|             Alabama|
|            AR| 35.20105| -91.831833|            Arkansas|
|            AZ|34.048928|-111.093731|             Arizona|
|            CA|36.778261|-119.417932|          California|
|            CO|39.550051|-105.782067|            Colorado|
|            CT|41.603221| -73.087749|         Connecticut|
|            DC|38.905985| -77.033418|District of Columbia|
|            DE|38.910832|  -75.52767|            Delaware|
|            FL|27.664827| -81.515754|             Florida|
+--------------+---------+-----------+--------------------+



# Join data

In [273]:

# Attach population and full state name (from the JSON data) to the per-state complaint
# counts, matching on the state code; the redundant `index` key column is dropped

joined_df_1 = complaints_state.join(
    population_json_df,
    on=complaints_state['State_Abbrv'] == population_json_df['index'],
    how='inner',
).drop('index')

joined_df_1.show(10)



+-----------+----------------+--------+--------------------+
|State_Abbrv|Complaints_Count|pop_2014|               state|
+-----------+----------------+--------+--------------------+
|         AK|             110|  736732|              Alaska|
|         AL|             269| 4849377|             Alabama|
|         AR|             266| 2966369|            Arkansas|
|         AZ|            1516| 6731484|             Arizona|
|         CA|           13709|38802500|          California|
|         CO|             576| 5355866|            Colorado|
|         CT|            1097| 3596677|         Connecticut|
|         DC|             353|  658893|District of Columbia|
|         DE|             268|  935614|            Delaware|
|         FL|            6488|19893297|             Florida|
+-----------+----------------+--------+--------------------+
only showing top 10 rows



In [274]:
# Add latitude/longitude from the Parquet data, matching on the state code; the Parquet
# key and name columns are dropped because joined_df_1 already carries both
joined_df_2 = joined_df_1.join(
    location_parquet_df,
    on=joined_df_1['State_Abbrv'] == location_parquet_df['state&teritory'],
    how='inner',
).drop("state&teritory", 'Name')
joined_df_2.show(10)

+-----------+----------------+--------+--------------------+---------+-----------+
|State_Abbrv|Complaints_Count|pop_2014|               state| latitude|  longitude|
+-----------+----------------+--------+--------------------+---------+-----------+
|         AK|             110|  736732|              Alaska|63.588753|-154.493062|
|         AL|             269| 4849377|             Alabama|32.318231| -86.902298|
|         AR|             266| 2966369|            Arkansas| 35.20105| -91.831833|
|         AZ|            1516| 6731484|             Arizona|34.048928|-111.093731|
|         CA|           13709|38802500|          California|36.778261|-119.417932|
|         CO|             576| 5355866|            Colorado|39.550051|-105.782067|
|         CT|            1097| 3596677|         Connecticut|41.603221| -73.087749|
|         DC|             353|  658893|District of Columbia|38.942142| -77.025955|
|         DC|             353|  658893|District of Columbia|38.905985| -77.033418|
|   

In [275]:
# Put the columns in their final order and keep a single row per state.
# NOTE(review): the joined data holds two rows for District of Columbia with different
# coordinates; dropDuplicates keeps one of them, and which one is not controlled here.
final_df = (
    joined_df_2
    .select("state", 'State_Abbrv', "Complaints_Count", "pop_2014", "latitude", 'longitude')
    .dropDuplicates(['state'])
)
final_df.show(10)

+--------------------+-----------+----------------+--------+---------+-----------+
|               state|State_Abbrv|Complaints_Count|pop_2014| latitude|  longitude|
+--------------------+-----------+----------------+--------+---------+-----------+
|             Alabama|         AL|             269| 4849377|32.318231| -86.902298|
|              Alaska|         AK|             110|  736732|63.588753|-154.493062|
|             Arizona|         AZ|            1516| 6731484|34.048928|-111.093731|
|            Arkansas|         AR|             266| 2966369| 35.20105| -91.831833|
|          California|         CA|           13709|38802500|36.778261|-119.417932|
|            Colorado|         CO|             576| 5355866|39.550051|-105.782067|
|         Connecticut|         CT|            1097| 3596677|41.603221| -73.087749|
|            Delaware|         DE|             268|  935614|38.910832|  -75.52767|
|District of Columbia|         DC|             353|  658893|38.942142| -77.025955|
|   

# Convert and Store Data

In [232]:
# Store the final dataframe as CSV at 'ETL_df.csv'.
# mode("overwrite") lets this cell be re-run: the default save mode raises an error when
# the output path already exists.
# NOTE(review): no header row is written, so the column names are not in the CSV output.

final_df.write.mode("overwrite").csv('ETL_df.csv')

In [233]:
# Store the final dataframe as JSON at 'ETL_df.json'.
# mode("overwrite") lets this cell be re-run: the default save mode raises an error when
# the output path already exists.

final_df.write.mode("overwrite").json('ETL_df.json')

In [234]:
# Store the final dataframe as Parquet at 'ETL_df.parquet'.
# mode("overwrite") lets this cell be re-run: the default save mode raises an error when
# the output path already exists.

final_df.write.mode("overwrite").parquet('ETL_df.parquet')