In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types

In [2]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


25/03/06 21:09:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
spark.version

'3.3.2'

In [4]:
!ls -lh yellow_tripdata_2024-10.parquet

-rw-rw-r-- 1 ayo ayo 62M Dec 18 21:21 yellow_tripdata_2024-10.parquet


In [5]:
df_yellow = spark.read.parquet('yellow_tripdata_2024-10.parquet')

                                                                                

In [6]:
df_yellow.show()

[Stage 1:>                                                          (0 + 1) / 1]

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

                                                                                

In [8]:
df_yellow = spark.read \
    .option("header", "true") \
    .parquet('yellow_tripdata_2024-10.parquet')


In [9]:
df_yellow.printSchema

<bound method DataFrame.printSchema of DataFrame[VendorID: int, tpep_pickup_datetime: timestamp, tpep_dropoff_datetime: timestamp, passenger_count: bigint, trip_distance: double, RatecodeID: bigint, store_and_fwd_flag: string, PULocationID: int, DOLocationID: int, payment_type: bigint, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, Airport_fee: double]>

In [10]:
df_yellow = df_yellow.repartition(4)

In [11]:
df_yellow.write.parquet('data/homework/yellow/trip_data/2024/10/', mode='overwrite')

                                                                                

**Q3**: How many taxi trips were there on the 15th of October?

In [14]:
from pyspark.sql import functions as F

In [22]:
df_yellow \
    .withColumn('pickup_date', F.to_date(df_yellow.tpep_pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df_yellow.tpep_dropoff_datetime)) \
    .filter("pickup_date = '2024-10-15'") \
    .count()

128893

In [17]:
df_yellow.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [19]:
df_yellow.registerTempTable('yellow_tripdata_2024_10')



In [21]:
spark.sql("""
SELECT
    COUNT(1)
FROM 
    yellow_tripdata_2024_10
WHERE
    tpep_pickup_datetime BETWEEN '2024-10-15 00:00:00' AND '2024-10-15 23:59:59' 
LIMIT 20;
;
""").show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



**Q4**: Longest trip for each day

In [23]:
df_yellow.columns

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'Airport_fee']

In [39]:
df_yellow \
    .withColumn('duration', df_yellow.tpep_dropoff_datetime.cast('long') - df_yellow.tpep_pickup_datetime.cast('long')) \
    .withColumn('pickup_date', F.to_date(df_yellow.tpep_pickup_datetime)) \
    .groupBy('pickup_date') \
        .max('duration') \
    .orderBy('max(duration)', ascending=False) \
    .limit(3) \
    .show()

[Stage 86:>                                                         (0 + 4) / 4]

+-----------+-------------+
|pickup_date|max(duration)|
+-----------+-------------+
| 2024-10-16|       585424|
| 2024-10-03|       515970|
| 2024-10-22|       495938|
+-----------+-------------+



                                                                                

In [43]:
spark.sql("""
SELECT
    MAX(((CAST(tpep_dropoff_datetime AS LONG) - CAST(tpep_pickup_datetime AS LONG)) / 60) /60) AS max_duration_HOURS
FROM 
    yellow_tripdata_2024_10
""").show()



+------------------+
|max_duration_HOURS|
+------------------+
| 162.6177777777778|
+------------------+



                                                                                

**Q6** Least frequent pickup location zone

In [44]:
df_zones = spark.read \
    .option("header", "true") \
    .csv('taxi_zone_lookup.csv')

In [46]:
df_zones.schema

StructType([StructField('LocationID', StringType(), True), StructField('Borough', StringType(), True), StructField('Zone', StringType(), True), StructField('service_zone', StringType(), True)])

In [47]:
df_zones.columns

['LocationID', 'Borough', 'Zone', 'service_zone']

In [48]:
df_zones.registerTempTable('zones')

In [50]:
spark.sql("""
SELECT
    CONCAT(pul.Zone, ' / ', ytd.PULocationID) AS pu_do_pair,
    COUNT(1)
FROM 
    yellow_tripdata_2024_10 ytd LEFT JOIN zones pul ON ytd.PULocationID = pul.LocationID
GROUP BY 
    1
ORDER BY
    2 ASC
LIMIT 5;
""").show()

[Stage 114:>                                                        (0 + 4) / 4]

+--------------------+--------+
|          pu_do_pair|count(1)|
+--------------------+--------+
|Governor's Island...|       1|
|   Arden Heights / 5|       2|
| Rikers Island / 199|       2|
|     Jamaica Bay / 2|       3|
|Green-Wood Cemete...|       3|
+--------------------+--------+



                                                                                