# Spark ETL with Files (CSV/JSON/Parquet/Text)

Reference Notebook: 

Reference: https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page

In [1]:
#load the required library
from pyspark.sql import SparkSession

In [2]:
# Start the Spark session (reuses an already-running session if one exists)
spark = SparkSession.builder.appName("ETL with files").getOrCreate()

# Alias used by the cells that call sqlContext.sql(...). The session itself is
# the SQL entry point; SparkSession's constructor expects a SparkContext, so
# the session must not be wrapped in another SparkSession.
sqlContext = spark

# Do not show warnings, only show errors
spark.sparkContext.setLogLevel("ERROR")



# The code below reads a CSV file into a Spark DataFrame

In [9]:
# Read the CSV into a Spark DataFrame: the first line supplies the column
# names and the column types are inferred from the data
csvdf = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("nyc_taxi_zone.csv")
)

In [10]:
# Print the column names, types and nullability of the CSV DataFrame
csvdf.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [11]:
# Preview the first 10 rows of the CSV DataFrame
csvdf.show(10)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 10 rows



# The code below reads a JSON file into a Spark DataFrame

In [15]:
# Read the JSON file into a Spark DataFrame; "multiline" lets a single JSON
# record span several lines of the file
jsondf = (
    spark.read.format("json")
    .option("multiline", "true")
    .load("nyc_taxi_zone.json")
)

In [12]:
# Print the column names, types and nullability of the JSON DataFrame
jsondf.printSchema()

root
 |-- Borough: string (nullable = true)
 |-- LocationID: long (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [13]:
# Preview the first 10 rows of the JSON DataFrame
jsondf.show(10)

+-------------+----------+--------------------+------------+
|      Borough|LocationID|                Zone|service_zone|
+-------------+----------+--------------------+------------+
|          EWR|         1|      Newark Airport|         EWR|
|       Queens|         2|         Jamaica Bay|   Boro Zone|
|        Bronx|         3|Allerton/Pelham G...|   Boro Zone|
|    Manhattan|         4|       Alphabet City| Yellow Zone|
|Staten Island|         5|       Arden Heights|   Boro Zone|
|Staten Island|         6|Arrochar/Fort Wad...|   Boro Zone|
|       Queens|         7|             Astoria|   Boro Zone|
|       Queens|         8|        Astoria Park|   Boro Zone|
|       Queens|         9|          Auburndale|   Boro Zone|
|       Queens|        10|        Baisley Park|   Boro Zone|
+-------------+----------+--------------------+------------+
only showing top 10 rows



# The code below reads a Parquet file into a Spark DataFrame

In [17]:
# Read the Parquet file into a Spark DataFrame
parquetdf = (
    spark.read.format("parquet")
    .load("yellow_tripdata_2024-01.parquet")
)

In [18]:
# Count the rows in the Parquet-backed DataFrame
parquetdf.count()

2964624

The code below reads a text file into a Spark DataFrame

In [20]:
# Read the text file with no extra options
textdf = spark.read.format("text").load("sample.txt")

In [21]:
textdf.show()
# No parsing is applied: each line of the file becomes one row in a single
# column named `value`, so the comma-separated fields are not split apart

+------------+
|       value|
+------------+
|id,name,Fare|
|  1,Adam,1.5|
|   2,Bob,1.8|
| 3,Chris,2.1|
|4,Daniel,0.7|
| 5,Ethan,3.9|
+------------+



In [24]:
# Use "," instead of the newline as the record separator: every
# comma-delimited chunk becomes its own row, and the original line breaks
# stay embedded inside the values
textdf2 = (
    spark.read
    .option("lineSep", ",")
    .text("sample.txt")
)

In [25]:
textdf2.show()

+-------+
|  value|
+-------+
|     id|
|   name|
|Fare\n1|
|   Adam|
| 1.5\n2|
|    Bob|
| 1.8\n3|
|  Chris|
| 2.1\n4|
| Daniel|
| 0.7\n5|
|  Ethan|
|    3.9|
+-------+



# Create temporary views in the Spark session for the DataFrames

In [27]:
# Expose each DataFrame to Spark SQL under its own temporary view name
views_by_name = {
    "tempCSV_table": csvdf,
    "tempJSON_table": jsondf,
    "tempParquet_table": parquetdf,
    "tempTxt_table": textdf2,
}
for view_name, frame in views_by_name.items():
    frame.createOrReplaceTempView(view_name)

In [28]:
# Query the CSV-backed temporary view with Spark SQL and display the result
csv_preview = sqlContext.sql("SELECT * FROM tempCSV_table LIMIT 10")
csv_preview.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
+----------+-------------+--------------------+------------+



In [29]:
# Query the JSON-backed temporary view with Spark SQL and display the result
json_preview = sqlContext.sql("SELECT * FROM tempJSON_table LIMIT 10")
json_preview.show()

+-------------+----------+--------------------+------------+
|      Borough|LocationID|                Zone|service_zone|
+-------------+----------+--------------------+------------+
|          EWR|         1|      Newark Airport|         EWR|
|       Queens|         2|         Jamaica Bay|   Boro Zone|
|        Bronx|         3|Allerton/Pelham G...|   Boro Zone|
|    Manhattan|         4|       Alphabet City| Yellow Zone|
|Staten Island|         5|       Arden Heights|   Boro Zone|
|Staten Island|         6|Arrochar/Fort Wad...|   Boro Zone|
|       Queens|         7|             Astoria|   Boro Zone|
|       Queens|         8|        Astoria Park|   Boro Zone|
|       Queens|         9|          Auburndale|   Boro Zone|
|       Queens|        10|        Baisley Park|   Boro Zone|
+-------------+----------+--------------------+------------+



In [30]:
# Query the Parquet-backed temporary view with Spark SQL and display the result
parquet_preview = sqlContext.sql("SELECT * FROM tempParquet_table LIMIT 10")
parquet_preview.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-01-01 00:57:55|  2024-01-01 01:17:43|              1|         1.72|         1|                 N|         186|          79|           2|       17.7|  1.0|    0.5|       0.