In [1]:
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/04 21:06:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df = spark.read \
    .option("header", "true") \
    .parquet('yellow_tripdata_2024-10.parquet')

df.show()

                                                                                

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|Airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2024-10-01 00:30:44|  2024-10-01 00:48:26|              1|          3.0|         1|                 N|         162|         246|           1|       18.4|  1.0|    0.5|       1.

In [10]:
df.schema

StructType([StructField('VendorID', IntegerType(), True), StructField('tpep_pickup_datetime', TimestampNTZType(), True), StructField('tpep_dropoff_datetime', TimestampNTZType(), True), StructField('passenger_count', LongType(), True), StructField('trip_distance', DoubleType(), True), StructField('RatecodeID', LongType(), True), StructField('store_and_fwd_flag', StringType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('payment_type', LongType(), True), StructField('fare_amount', DoubleType(), True), StructField('extra', DoubleType(), True), StructField('mta_tax', DoubleType(), True), StructField('tip_amount', DoubleType(), True), StructField('tolls_amount', DoubleType(), True), StructField('improvement_surcharge', DoubleType(), True), StructField('total_amount', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('Airport_fee', DoubleType(), True)])

In [5]:
df = df.repartition(4)
df.write.parquet('data_split/')

                                                                                

## How many taxi trips were there on the 15th of October?

In [24]:
df.registerTempTable('trips_data')
df_result = spark.sql("""
SELECT count(*)
FROM
    trips_data
WHERE DATE(tpep_pickup_datetime)="2024-10-15"
""").show()

+--------+
|count(1)|
+--------+
|  128893|
+--------+



In [30]:
df.registerTempTable('trips_data')
df_result = spark.sql("""
SELECT (UNIX_TIMESTAMP(tpep_dropoff_datetime) - UNIX_TIMESTAMP(tpep_pickup_datetime))/3600 AS hours
FROM
    trips_data
ORDER BY hours DESC
LIMIT 3
""").show()



+------------------+
|             hours|
+------------------+
|162.61777777777777|
|           143.325|
|137.76055555555556|
+------------------+



                                                                                

In [42]:
df.registerTempTable('trips_data')
df_result = spark.sql("""
SELECT PULocationID,count(1) as pickup_cnt
FROM
    trips_data
GROUP BY PULocationID
ORDER by pickup_cnt
LIMIT 3
""").show()

+------------+----------+
|PULocationID|pickup_cnt|
+------------+----------+
|         105|         1|
|         199|         2|
|           5|         2|
+------------+----------+



In [44]:
lookup_df = spark.read.option("header", "true").csv('taxi_zone_lookup.csv')

In [46]:
lookup_df.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [52]:
lookup_df.registerTempTable('lookup_data')
df_result = spark.sql("""
SELECT lookup_data.Zone,count(1) as pickup_cnt
FROM
    trips_data 
JOIN lookup_data 
ON trips_data.PULocationID = lookup_data.LocationID
GROUP BY lookup_data.Zone
ORDER by pickup_cnt
LIMIT 3
""").show()

+--------------------+----------+
|                Zone|pickup_cnt|
+--------------------+----------+
|Governor's Island...|         1|
|       Rikers Island|         2|
|       Arden Heights|         2|
+--------------------+----------+

