## Module 5: Batch Processing

#### Download Data

In [1]:
#!wget https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-10.parquet

In [2]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('lesson1') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/05 19:51:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


#### Question 1: Install Spark and PySpark


In [3]:
spark.version

'3.5.5'

In [4]:
df = spark.read.option("header", "true").parquet("yellow_tripdata_2024-10.parquet")

                                                                                

In [5]:
# df.schema
print (df.count())
df.printSchema()

3833771
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



In [6]:
df.repartition(4)
df.write.parquet('yellow_tripdata/2024/10/', mode='overwrite')

                                                                                

#### Question 2: Yellow October 2024 
What is the average size of the Parquet (ending with .parquet extension) Files that were created (in MB)? Select the answer which most closely matches.
Ans: 25MB

#### Question 3: Count records


In [7]:
df.head()

Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 0, 30, 44), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 0, 48, 26), passenger_count=1, trip_distance=3.0, RatecodeID=1, store_and_fwd_flag='N', PULocationID=162, DOLocationID=246, payment_type=1, fare_amount=18.4, extra=1.0, mta_tax=0.5, tip_amount=1.5, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=24.9, congestion_surcharge=2.5, Airport_fee=0.0)

In [9]:
from pyspark.sql import functions as F

cnt = df.filter(F.date_format(F.col("tpep_pickup_datetime"), "yyyy-MM-dd") == "2024-10-15").count()

print(cnt)



128893


                                                                                

#### Question 4: Longest trip

In [10]:
df = df.withColumn('trip_time_hr', (F.unix_timestamp(F.col("tpep_dropoff_datetime")) - F.unix_timestamp(F.col("tpep_pickup_datetime")))/3600)

In [11]:
df.head()

Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 0, 30, 44), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 0, 48, 26), passenger_count=1, trip_distance=3.0, RatecodeID=1, store_and_fwd_flag='N', PULocationID=162, DOLocationID=246, payment_type=1, fare_amount=18.4, extra=1.0, mta_tax=0.5, tip_amount=1.5, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=24.9, congestion_surcharge=2.5, Airport_fee=0.0, trip_time_hr=0.295)

In [12]:
# df_hr_max = df.filter(F.col("trip_time_hr") == F.max(F.col("trip_time_hr")))
df.createOrReplaceTempView("taxi_data")
result_df = spark.sql("SELECT max(trip_time_hr) FROM taxi_data")


In [13]:
result_df.show()

+------------------+
| max(trip_time_hr)|
+------------------+
|162.61777777777777|
+------------------+



#### Question 5: User Interface
Spark’s User Interface which shows the application's dashboard runs on which local port?

_Ans: 4040_

#### Question 6: Least frequent pickup location zone


In [14]:
#!wget https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv

In [15]:
df_zones = spark.read.option("header", "true").csv("taxi_zone_lookup.csv")

In [16]:
df_zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
|         6|Staten Island|Arrochar/Fort Wad...|   Boro Zone|
|         7|       Queens|             Astoria|   Boro Zone|
|         8|       Queens|        Astoria Park|   Boro Zone|
|         9|       Queens|          Auburndale|   Boro Zone|
|        10|       Queens|        Baisley Park|   Boro Zone|
|        11|     Brooklyn|          Bath Beach|   Boro Zone|
|        12|    Manhattan|        Battery Park| Yellow Zone|
|        13|    Manhattan|   Battery Park City| Yellow Zone|
|        14|     Brookly

In [26]:
df_joined = df.join(df_zones, df_zones.LocationID == df.PULocationID, how='outer')
df_joined.createOrReplaceTempView("taxi_data_joined")


In [29]:
df_joined.head()

                                                                                

Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2024, 10, 1, 11, 30, 21), tpep_dropoff_datetime=datetime.datetime(2024, 10, 1, 11, 30, 25), passenger_count=2, trip_distance=0.0, RatecodeID=5, store_and_fwd_flag='N', PULocationID=1, DOLocationID=1, payment_type=1, fare_amount=115.0, extra=0.0, mta_tax=0.0, tip_amount=23.2, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=139.2, congestion_surcharge=0.0, Airport_fee=0.0, trip_time_hr=0.0011111111111111111, LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR')

In [23]:
count_df = spark.sql("SELECT count(1) FROM taxi_data_joined")
count_df.show()

[Stage 38:>                                                         (0 + 1) / 1]

+--------+
|count(1)|
+--------+
| 3833775|
+--------+



                                                                                

In [35]:
least_freq_df = spark.sql("""
    SELECT 
        Zone, 
        count(1) cnt
    FROM taxi_data_joined 
    group by Zone
    order by cnt asc
""")



In [37]:
least_freq_df.show(truncate=False)

[Stage 93:>                                                         (0 + 1) / 1]

+---------------------------------------------+---+
|Zone                                         |cnt|
+---------------------------------------------+---+
|Freshkills Park                              |1  |
|Great Kills Park                             |1  |
|Rikers Island                                |2  |
|Arden Heights                                |2  |
|Governor's Island/Ellis Island/Liberty Island|3  |
|Jamaica Bay                                  |3  |
|Green-Wood Cemetery                          |3  |
|Eltingville/Annadale/Prince's Bay            |4  |
|Rossville/Woodrow                            |4  |
|West Brighton                                |4  |
|Port Richmond                                |4  |
|Charleston/Tottenville                       |4  |
|Great Kills                                  |6  |
|Crotona Park                                 |6  |
|Mariners Harbor                              |7  |
|Heartland Village/Todt Hill                  |7  |
|Saint Georg

                                                                                