In [12]:
import findspark
findspark.init()

import pyspark  
from pyspark.sql import SparkSession
import os
import pyspark.sql.functions as F

In [13]:
spark = SparkSession\
    .builder\
    .master("local[*]")\
    .appName('HelloWorld')\
    .config("spark.driver.memory", "6G") \
    .config("spark.executor.memory", "6G") \
    .config("spark.driver.maxResultSize", "6G") \
    .getOrCreate()


In [14]:
df_yellow = spark.read.parquet("../resources/datasets/yellow/*/*")
df_green = spark.read.parquet("../resources/datasets/green/*/*")

set(df_yellow.columns) & set(df_green.columns) # intersection of columns using & operator



{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

Rows `lpep_pickup_datetime` and `lpep_dropoff_datetime` are not present in the intersection.  We can rename the columns.

In [15]:
df_green = df_green\
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")

#Same with yellow
df_yellow = df_yellow\
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")

common_columns = set(df_yellow.columns) & set(df_green.columns) # intersection of columns using & operator
print("Common columns: ", common_columns)
print("Not in both: ", set(df_green.columns).symmetric_difference(set(df_yellow.columns))) # symmetric difference of columns using symmetric_difference() method

Common columns:  {'PULocationID', 'passenger_count', 'improvement_surcharge', 'congestion_surcharge', 'payment_type', 'dropoff_datetime', 'tip_amount', 'mta_tax', 'extra', 'VendorID', 'RatecodeID', 'total_amount', 'DOLocationID', 'pickup_datetime', 'tolls_amount', 'trip_distance', 'fare_amount', 'store_and_fwd_flag'}
Not in both:  {'trip_type', 'airport_fee', 'ehail_fee'}


Adding `service_type` column to differentiate the two types of taxi services.

In [16]:
df_yellow = df_yellow.select(*common_columns).withColumn("service_type", F.lit("yellow"))
df_green = df_green.select(*common_columns).withColumn("service_type", F.lit("green"))

Now union the two dataframes

In [17]:
df_trips_data = df_green.unionAll(df_yellow)
print("Yellow count: ", df_yellow.count())
print("Green count: ", df_green.count())
print("Union count: ", df_trips_data.count())

print("Alternative way: ")
df_trips_data.groupBy("service_type").count().show()

Yellow count:  84598444
Green count:  6300985
Union count:  90899429
Alternative way: 
+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 6300985|
|      yellow|84598444|
+------------+--------+



### Loading dim zones for use later in joins.


In [18]:
dim_zones  = spark.read.csv("../resources/datasets/taxi+_zone_lookup.csv", header=True)
dim_zones.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



## Temporary Tables
Pyspark can create temporary tables from dataframes.  These tables are only available to the current session.

Note: SQL statement is copied and modified from `fact_trips.sql` and `dim_monthly_zone_revenue.sql` from Week 4.

In [19]:
df_trips_data.createOrReplaceTempView('trips_data')
dim_zones.createOrReplaceTempView('dim_zones')

In [20]:
#Equivalent to fact_trips.sql
fact_trips_sql = spark.sql("""
SELECT 
    -- trips_data.tripid,
    trips_data.vendorid,
    trips_data.service_type,
    trips_data.ratecodeid,
    trips_data.PULocationID,
    pickup_zone.borough AS pickup_borough,
    pickup_zone.zone AS pickup_zone,
    trips_data.DOLocationID,
    dropoff_zone.borough AS dropoff_borough,
    dropoff_zone.zone AS dropoff_zone,

    trips_data.pickup_datetime,
    trips_data.dropoff_datetime,
    trips_data.store_and_fwd_flag,
    trips_data.passenger_count,
    trips_data.trip_distance,
    -- trips_data.trip_type, 
    trips_data.fare_amount, 
    trips_data.extra, 
    trips_data.mta_tax, 
    trips_data.tip_amount, 
    trips_data.tolls_amount, 
    -- trips_data.ehail_fee, 
    trips_data.improvement_surcharge, 
    trips_data.total_amount, 
    trips_data.payment_type, 
    -- trips_data.payment_type_description, 
    trips_data.congestion_surcharge
FROM trips_data
INNER JOIN dim_zones AS pickup_zone
    ON trips_data.PULocationID = pickup_zone.locationid
INNER JOIN dim_zones AS dropoff_zone
    ON trips_data.DOLocationID = dropoff_zone.locationid
""")
fact_trips_sql.createOrReplaceTempView('fact_trips')

In [21]:
#Equivalent to dm_monthly_zone_revenue.sql
dm_monthly_zone_revenue_sql = spark.sql("""
SELECT
    pickup_zone AS revenue_zone,
    MONTH(pickup_datetime) AS month,
    service_type,

    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    -- SUM(ehail_fee) AS revenue_monthly_ehail_fee,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    COUNT(*) AS total_monthly_trips,
    AVG(pASsenger_count) AS avg_montly_pASsenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance

FROM fact_trips
GROUP BY pickup_zone, MONTH(pickup_datetime), service_type
ORDER BY pickup_zone, MONTH(pickup_datetime), service_type""")

dm_monthly_zone_revenue_sql.write.option("header", "true").parquet("../resources/datasets/dm_monthly_zone_revenue.parquet")


AnalysisException: path file:/d:/Educational Others/2023 Data Engineering Zoomcamp/resources/datasets/dm_monthly_zone_revenue.parquet already exists.

In [None]:
# monthly_sql = spark.sql("SELECT revenue_zone, month, service_type, revenue_monthly_fare, avg_montly_trip_distance FROM dm_monthly_zone_revenue")
dm_monthly_zone_revenue_sql.show(5)
dm_monthly_zone_revenue_sql.coalesce(1).write.parquet('../resources/datasets/monthly_zone_revenue.parquet', mode='overwrite')


+--------------------+-----+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+-------------------+--------------------------+------------------------+
|        revenue_zone|month|service_type|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|total_monthly_trips|avg_montly_pASsenger_count|avg_montly_trip_distance|
+--------------------+-----+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+-------------------+--------------------------+---