In [1]:
import pyspark
import os

# Root directory for all data paths: the kernel's current working directory.
# (os.path.dirname("") is "", so abspath of it is simply the CWD.)
WDIR = os.path.abspath(os.getcwd())
print(f"pyspark.__version__: {pyspark.__version__}", WDIR, sep="\n")

pyspark.__version__: 3.5.1
/Users/ronaldfung/Projects/data-engineering-zoomcamp-2024/modules/05-batch


In [19]:
from pyspark.sql import SparkSession
from pyspark.sql import types

# Local Spark session on all available cores. getOrCreate() returns the
# already-active session if one exists, so re-running this cell is harmless.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [20]:
# Explicit read schemas keyed by vehicle type, used when reading the raw CSVs.
# Every column is declared nullable.
schemas = {
    "fhv": types.StructType([
        types.StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("dispatching_base_num", types.StringType()),
            ("pickup_datetime", types.TimestampType()),
            ("dropoff_datetime", types.TimestampType()),
            ("pulocationid", types.IntegerType()),
            ("dolocationid", types.IntegerType()),
            ("sr_flag", types.StringType()),
            ("affiliated_base_number", types.StringType()),
        )
    ]),
}


In [None]:
# Convert the raw (bronze) CSV files into parquet, one output folder per
# vehicle type / year / month. Only October 2019 FHV data is needed here;
# widen the ranges below to convert more months.
for vehicle_type in ["fhv"]:
    for year in range(2019, 2020):  # 2019 only
        for month in range(10, 11):  # October only
            # Anchor both input and output on WDIR so they resolve consistently.
            input_path = os.path.join(WDIR, "bucket", "bronze", vehicle_type, str(year), f"{month:02d}")
            output_path = os.path.join(WDIR, "bucket", "pq", vehicle_type, str(year), f"{month:02d}")

            df = (
                spark.read
                .option("header", "true")
                # NOTE(review): Spark normally infers the codec from the file
                # extension when reading; this option is likely a no-op - confirm.
                .option("compression", "gzip")
                .schema(schemas[vehicle_type])
                .csv(input_path)
            )

            # 6 output files per month. mode("overwrite") keeps the cell
            # re-runnable: the default save mode errors if the folder exists.
            (
                df
                .repartition(6)
                .write
                .mode("overwrite")
                .parquet(output_path)
            )

In [45]:
from pyspark.sql.functions import to_date, lit, unix_timestamp, round, trim, concat

In [50]:
# Load the October 2019 FHV parquet written by the conversion loop
# (same WDIR-anchored location it writes to).
df_fhv = spark.read.parquet(os.path.join(WDIR, "bucket", "pq", "fhv", "2019", "10"))

# Synthetic trip key "<base>|<pickup timestamp>". Some base numbers carry
# trailing spaces (e.g. "B01711         " in the output below), hence trim().
# The timestamp-to-string cast has no surrounding whitespace, so no trim there.
# concat() yields NULL if either part is NULL.
df_fhv = df_fhv.withColumn(
    'tripid',
    concat(
        trim(df_fhv.dispatching_base_num),
        lit('|'),
        df_fhv.pickup_datetime.cast('string'),
    ),
)
df_fhv.show(5)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+--------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|pulocationid|dolocationid|sr_flag|affiliated_base_number|              tripid|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+--------------------+
|              B02546|2019-10-04 12:01:47|2019-10-04 12:15:31|         264|         247|   NULL|                B02546|B02546|2019-10-04...|
|              B00037|2019-10-15 08:55:16|2019-10-15 09:21:04|         264|         155|   NULL|                B00037|B00037|2019-10-15...|
|              B00445|2019-10-07 03:54:41|2019-10-07 04:02:41|         252|         138|   NULL|                B00445|B00445|2019-10-07...|
|     B01711         |2019-10-20 14:53:46|2019-10-20 15:07:34|          73|          92|   NULL|       B01711         |B01711|2019-10-20...|
|            

Question 3 - Count FHV trips with a pickup date of 15 October 2019

In [23]:
# Number of trips whose pickup_datetime falls on the calendar date 2019-10-15.
# to_date() already returns a Column, so no lit() wrapper is needed; the
# string literal is coerced to a date for the comparison.
(
    df_fhv
    .where(to_date(df_fhv.pickup_datetime) == "2019-10-15")
    .count()
)

62610

Question 4 - Length of the longest trip (in hours)

In [61]:
# Top 5 trips by duration in hours (rounded to 2 decimals).
# `round` is pyspark.sql.functions.round (imported earlier), not the builtin.
# NOTE: the leading rows have dropoff years such as 2091 and 2029, so the raw
# maximum reflects bad source data rather than a genuine trip.
(
    df_fhv
    .withColumn(
        'diff_in_hours',
        round(
            (unix_timestamp("dropoff_datetime") - unix_timestamp('pickup_datetime')) / 3600,
            2,
        ),
    )
    .sort('diff_in_hours', ascending=False)
    .select('dispatching_base_num', 'pickup_datetime', 'dropoff_datetime', 'diff_in_hours')
    .show(5)
)

+--------------------+-------------------+-------------------+-------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|diff_in_hours|
+--------------------+-------------------+-------------------+-------------+
|              B02832|2019-10-28 09:00:00|2091-10-28 09:30:00|     631152.5|
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|     631152.5|
|              B02416|2019-10-31 23:46:33|2029-11-01 00:13:00|     87672.44|
|     B00746         |2019-10-01 21:43:42|2027-10-01 21:45:23|     70128.03|
|              B02921|2019-10-17 14:00:00|2020-10-18 00:00:00|       8794.0|
+--------------------+-------------------+-------------------+-------------+
only showing top 5 rows



Question 6 - Least frequent pickup location zone

In [51]:
# Taxi zone lookup table. No schema or inferSchema is supplied, so every
# column (including LocationID) is read as a string.
df_zone_data = spark.read.csv(
    os.path.join(WDIR, "data", 'taxi+_zone_lookup.csv'),
    header=True,
)
df_zone_data.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [39]:
# Inspect column types: all lookup columns come back as strings (no schema given).
df_zone_data.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [60]:
# Pickup counts per zone name, ascending, so the least frequent zones come first.
# LocationID is a string in the lookup table while pulocationid is an integer,
# so cast the key explicitly instead of relying on implicit type coercion.
# Caveats:
#  - Left join: trips whose pulocationid has no lookup match fall into a NULL Zone group.
#  - Zones with no pickups at all never appear (the join is driven from df_fhv).
#  - If several LocationIDs share a Zone name, their counts are merged.
(
    df_fhv.select('tripid', 'pulocationid')
    .join(
        df_zone_data.select('LocationID', 'Zone'),
        df_fhv.pulocationid == df_zone_data.LocationID.cast('int'),
        "left",
    )
    .groupBy('Zone')
    .count()
    .orderBy('count', ascending=True)
    .show()
)


+--------------------+-----+
|                Zone|count|
+--------------------+-----+
|         Jamaica Bay|    1|
|Governor's Island...|    2|
| Green-Wood Cemetery|    5|
|       Broad Channel|    8|
|     Highbridge Park|   14|
|        Battery Park|   15|
|Saint Michaels Ce...|   23|
|Breezy Point/Fort...|   25|
|Marine Park/Floyd...|   26|
|        Astoria Park|   29|
|    Inwood Hill Park|   39|
|       Willets Point|   47|
|Forest Park/Highl...|   53|
|  Brooklyn Navy Yard|   57|
|        Crotona Park|   62|
|        Country Club|   77|
|     Freshkills Park|   89|
|       Prospect Park|   98|
|     Columbia Street|  105|
|  South Williamsburg|  110|
+--------------------+-----+
only showing top 20 rows

