In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()
print(spark)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/03/07 02:57:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/07 02:58:00 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
<pyspark.sql.session.SparkSession object at 0x7f5f264fede0>


In [4]:
df = spark.read.parquet(('yellow_tripdata_2024-10.parquet'), header=True)

df.show(1)

                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

Q1
Answer - 3.3.2

In [5]:
spark.version

'3.3.2'

Q2
Answer - 25MB

In [6]:
# Repartition Dataframe to 4 partitions and save it to parquet.
df = df.repartition(4)
     


In [7]:
df.write.parquet('yellow_trip_4_partitions', mode='overwrite')

                                                                                

In [8]:
!ls -lh yellow_trip_4_partitions/

total 97M
-rw-r--r-- 1 ars ars   0 Mar  7 02:58 _SUCCESS
-rw-r--r-- 1 ars ars 25M Mar  7 02:58 part-00000-de7bf1e5-6cac-476c-b99d-2470180f103a-c000.snappy.parquet
-rw-r--r-- 1 ars ars 25M Mar  7 02:58 part-00001-de7bf1e5-6cac-476c-b99d-2470180f103a-c000.snappy.parquet
-rw-r--r-- 1 ars ars 25M Mar  7 02:58 part-00002-de7bf1e5-6cac-476c-b99d-2470180f103a-c000.snappy.parquet
-rw-r--r-- 1 ars ars 25M Mar  7 02:58 part-00003-de7bf1e5-6cac-476c-b99d-2470180f103a-c000.snappy.parquet


In [9]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)



Q3
Answer - 125,567 (closer to 128893 which i get from query below)

In [10]:
df.createOrReplaceTempView('yellow')

In [11]:
spark.sql("""
SELECT 
    count(*)
FROM 
    yellow
WHERE 
    CAST(tpep_pickup_datetime as date) = '2024-10-15'
""").show()



+--------+
|count(1)|
+--------+
|  128893|
+--------+



                                                                                

Q4
Answer - 162

In [12]:
spark.sql("""
SELECT 
    TIMESTAMPDIFF(HOUR, tpep_pickup_datetime, tpep_dropoff_datetime) AS whole_hours_diff
FROM 
    yellow
WHERE 
    1=1
ORDER BY whole_hours_diff desc
LIMIT 1
""").show()

[Stage 13:>                                                         (0 + 4) / 4]

+----------------+
|whole_hours_diff|
+----------------+
|             162|
+----------------+



                                                                                

Q5
Answer - 4040

--Monitoring
https://spark.apache.org/docs/3.5.4/cluster-overview.html

Q6
Answer - Governor's Island/Ellis Island/Liberty Island

In [13]:
df_lookup = spark.read.csv(('taxi_zone_lookup.csv'), header=True, inferSchema = True)

In [14]:
df_lookup.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [15]:
df_lookup.printSchema()

root
 |-- LocationID: integer (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [16]:
df_lookup.createOrReplaceTempView('zones')

In [17]:
spark.sql("""
SELECT 
    z.Zone,
    COUNT(*) as trip_count
FROM 
    yellow y
JOIN 
    zones z
ON 
    y.PULocationID = z.LocationID
GROUP BY 
    z.Zone
ORDER BY
    trip_count ASC
LIMIT 
    1
""").show()

[Stage 21:>                                                         (0 + 4) / 4]

+--------------------+----------+
|                Zone|trip_count|
+--------------------+----------+
|Governor's Island...|         1|
+--------------------+----------+



                                                                                