#### Read Trip Data 

In [0]:
# Load the raw green-taxi trip records.
# Parquet files carry their own schema, so no schema-inference option is passed
# (inferSchema is a CSV reader option and has no effect on a Parquet read).
trip_data_df = spark.read.parquet('/mnt/synapselearningadls/raw/trip_data_green_parquet')

In [0]:
# Preview the first 10 trip records to sanity-check the load.
trip_data_preview_df = trip_data_df.limit(10)
display(trip_data_preview_df)

VendorID,lpep_pickup_datetime,lpep_dropoff_datetime,store_and_fwd_flag,RatecodeID,PULocationID,DOLocationID,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type,congestion_surcharge,year,month
2,2019-12-18T15:52:30Z,2019-12-18T15:54:39Z,N,1,264,264,5,0.0,3.5,0.5,0.5,0.01,0.0,,0.3,4.81,1,1,0.0,2020,1
2,2020-01-01T00:45:58Z,2020-01-01T00:56:39Z,N,5,66,65,2,1.28,20.0,0.0,0.0,4.06,0.0,,0.3,24.36,1,2,0.0,2020,1
2,2020-01-01T00:41:38Z,2020-01-01T00:52:49Z,N,1,181,228,1,2.47,10.5,0.5,0.5,3.54,0.0,,0.3,15.34,1,1,0.0,2020,1
1,2020-01-01T00:52:46Z,2020-01-01T01:14:21Z,N,1,129,263,2,6.3,21.0,3.25,0.5,0.0,0.0,,0.3,25.05,2,1,2.75,2020,1
1,2020-01-01T00:19:57Z,2020-01-01T00:30:56Z,N,1,210,150,1,2.3,10.0,0.5,0.5,0.0,0.0,,0.3,11.3,1,1,0.0,2020,1
1,2020-01-01T00:52:33Z,2020-01-01T01:09:54Z,N,1,35,39,1,3.0,13.5,0.5,0.5,0.0,0.0,,0.3,14.8,1,1,0.0,2020,1
2,2020-01-01T00:10:18Z,2020-01-01T00:22:16Z,N,1,25,61,1,2.77,11.0,0.5,0.5,0.0,0.0,,0.3,12.3,2,1,0.0,2020,1
2,2020-01-01T01:03:14Z,2020-01-01T01:29:45Z,N,1,225,89,1,4.98,20.5,0.5,0.5,0.0,0.0,,0.3,21.8,2,1,0.0,2020,1
2,2020-01-01T00:04:11Z,2020-01-01T00:09:48Z,N,1,129,129,1,0.71,5.5,0.5,0.5,0.0,0.0,,0.3,6.8,2,1,0.0,2020,1
2,2020-01-01T00:25:52Z,2020-01-01T00:32:16Z,N,1,129,83,1,0.8,5.5,0.5,0.5,0.0,0.0,,0.3,6.8,2,1,0.0,2020,1


#### Read Taxi Zone

In [0]:
# Load the taxi-zone lookup: the first CSV row is the header and column types are inferred.
taxi_zone_df = (
    spark.read
    .option('header', True)
    .option('inferSchema', True)
    .csv('/mnt/synapselearningadls/raw/taxi_zone.csv')
)

In [0]:
# Preview the first 10 taxi zones.
taxi_zone_preview_df = taxi_zone_df.limit(10)
display(taxi_zone_preview_df)

LocationID,Borough,Zone,service_zone
1,EWR,Newark Airport,EWR
2,Queens,Jamaica Bay,Boro Zone
3,Bronx,Allerton/Pelham Gardens,Boro Zone
4,Manhattan,Alphabet City,Yellow Zone
5,Staten Island,Arden Heights,Boro Zone
6,Staten Island,Arrochar/Fort Wadsworth,Boro Zone
7,Queens,Astoria,Boro Zone
8,Queens,Astoria Park,Boro Zone
9,Queens,Auburndale,Boro Zone
10,Queens,Baisley Park,Boro Zone


#### Read Payment Type 

In [0]:
# Load the payment-type lookup from its JSON file.
payment_type_df = (
    spark.read
    .format('json')
    .load('/mnt/synapselearningadls/raw/payment_type.json')
)

In [0]:
# Show the full payment-type lookup (no row limit is applied here).
display(payment_type_df)

payment_type,payment_type_desc
1,Credit card
2,Cash
3,No charge
4,Dispute
5,Unknown
6,Voided trip


#### Read Calendar

In [0]:
# Load the calendar dimension: header row supplies column names, types are inferred.
calendar_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv('/mnt/synapselearningadls/raw/calendar.csv')
)

In [0]:
# Preview the first 10 calendar rows.
calendar_preview_df = calendar_df.limit(10)
display(calendar_preview_df)

date_key,date,year,month,day,day_name,day_of_year,week_of_month,week_of_year,month_name,year_month,year_week
20200101,2020-01-01,2020,1,1,Wednesday,1,1,1,January,202001,202001
20200102,2020-01-02,2020,1,2,Thursday,2,1,1,January,202001,202001
20200103,2020-01-03,2020,1,3,Friday,3,1,1,January,202001,202001
20200104,2020-01-04,2020,1,4,Saturday,4,1,1,January,202001,202001
20200105,2020-01-05,2020,1,5,Sunday,5,2,2,January,202001,202002
20200106,2020-01-06,2020,1,6,Monday,6,2,2,January,202001,202002
20200107,2020-01-07,2020,1,7,Tuesday,7,2,2,January,202001,202002
20200108,2020-01-08,2020,1,8,Wednesday,8,2,2,January,202001,202002
20200109,2020-01-09,2020,1,9,Thursday,9,2,2,January,202001,202002
20200110,2020-01-10,2020,1,10,Friday,10,2,2,January,202001,202002


#### Read Trip Type 

In [0]:
# Load the trip-type lookup from a tab-separated file with a header row.
# No schema inference is requested for this read.
trip_type_df = (
    spark.read
    .options(sep=r'\t', header=True)
    .csv('/mnt/synapselearningadls/raw/trip_type.tsv')
)

In [0]:
# Show the full trip-type lookup (no row limit is applied here).
display(trip_type_df)

trip_type,trip_type_desc
1,Street-hail
2,Dispatch


#### Join and Transform Data 

In [0]:
from pyspark.sql.types import DateType

In [0]:
from pyspark.sql.functions import col, count, when, sum, round, cast

In [0]:
# Build the daily trip summary per (year, month, borough, trip date, day name).
#
# Step 1: attach zone, calendar, payment-type and trip-type lookups to each trip.
#         join() defaults to an inner join, so trips without a match in any lookup are dropped.
# Step 2: aggregate counts, distance, duration (minutes) and fare per group.
# Step 3: flag weekend days and pick the final column order.


def _count_matching(desc_column, label, output_name):
    """Return an aggregate that counts rows whose `desc_column` matches `label` (case-insensitive)."""
    is_match = col(desc_column).ilike(label)
    return sum(when(is_match, 1).otherwise(0)).alias(output_name)


# Step 1 -- enrich trips with their lookup rows.
pickup_date = trip_data_df.lpep_pickup_datetime.cast(DateType())
enriched_trips_df = (
    trip_data_df
    .join(taxi_zone_df, trip_data_df.PULocationID == taxi_zone_df.LocationID)
    .join(calendar_df, pickup_date == calendar_df.date)
    .join(payment_type_df, trip_data_df.payment_type == payment_type_df.payment_type)
    .join(trip_type_df, trip_data_df.trip_type == trip_type_df.trip_type)
)

# Step 2 -- aggregate per group.
# Casting a timestamp to long gives seconds, so the difference divided by 60 is minutes.
trip_minutes = (col('lpep_dropoff_datetime').cast('long') - col('lpep_pickup_datetime').cast('long')) / 60
daily_metrics = [
    _count_matching('payment_type_desc', 'Cash', 'cash_trip_count'),
    _count_matching('payment_type_desc', 'Credit card', 'card_trip_count'),
    _count_matching('trip_type_desc', 'Dispatch', 'despatch_trip_count'),
    _count_matching('trip_type_desc', 'Street-hail', 'street_hail_trip_count'),
    round(sum(col('trip_distance')), 2).alias('trip_distance'),
    round(sum(trip_minutes), 0).alias('trip_duration'),
    round(sum('fare_amount'), 2).alias('fare_amount'),
]
grouped_trips = enriched_trips_df.groupBy(
    trip_data_df.year,
    trip_data_df.month,
    'borough',
    col('lpep_pickup_datetime').cast(DateType()).alias('trip_date'),
    'day_name',
)
daily_summary_df = grouped_trips.agg(*daily_metrics)

# Step 3 -- weekend indicator and final projection.
is_weekend = col('day_name').isin('Saturday', 'Sunday')
final_taxi_df = (
    daily_summary_df
    .withColumn('trip_day_week_end_ind', when(is_weekend, 'Y').otherwise('N'))
    .select(
        trip_data_df.year,
        trip_data_df.month,
        'borough',
        'trip_date',
        col('day_name').alias('trip_day'),
        'trip_day_week_end_ind',
        'cash_trip_count',
        'card_trip_count',
        'despatch_trip_count',
        'street_hail_trip_count',
        'trip_distance',
        'trip_duration',
        'fare_amount',
    )
)

#  To calculate trip_duration in minutes, both timestamps are cast to long and their difference is divided by 60. Casting a timestamp to long in PySpark converts it to a Unix timestamp, i.e. the number of seconds that have elapsed since 1970-01-01 00:00:00 UTC. Please note that this does not account for leap seconds.

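#  This transformation can also easily be done in SQL using `SUM(DATEDIFF(MINUTE, lpep_pickup_datetime, lpep_dropoff_datetime))` (the start time comes first and the end time second; reversing them yields negative durations).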

In [0]:
# Preview the first 15 rows of the aggregated daily summary.
final_taxi_preview_df = final_taxi_df.limit(15)
display(final_taxi_preview_df)

year,month,borough,trip_date,trip_day,trip_day_week_end_ind,cash_trip_count,card_trip_count,despatch_trip_count,street_hail_trip_count,trip_distance,trip_duration,fare_amount
2020,2,Queens,2020-03-01,Sunday,Y,7,3,0,10,32.0,106.0,118.5
2020,1,Queens,2020-01-10,Friday,N,1809,1689,97,3423,9175.7,59035.0,42050.42
2020,1,Queens,2020-01-09,Thursday,N,1761,1630,102,3314,8541.04,61778.0,39608.31
2020,2,Manhattan,2020-02-01,Saturday,Y,1983,2221,65,4184,9718.39,68776.0,42592.03
2020,3,Bronx,2020-03-19,Thursday,N,97,87,33,154,665.93,4422.0,3237.08
2020,1,Bronx,2020-01-16,Thursday,N,181,273,58,402,1434.36,8179.0,7426.49
2020,3,Brooklyn,2020-02-19,Wednesday,N,1,0,0,1,7.97,30.0,26.0
2020,1,Staten Island,2020-01-29,Wednesday,N,1,3,0,4,52.07,91.0,146.5
2020,2,Bronx,2020-02-11,Tuesday,N,133,202,52,285,1153.04,7448.0,5824.74
2020,3,Staten Island,2020-03-21,Saturday,Y,0,1,0,1,31.87,44.0,85.0


##### Writing transformed file in processed container

In [0]:
# Persist the daily summary as a Delta table partitioned by year and month.
# 'overwrite' is used instead of 'append' so the notebook is idempotent:
# final_taxi_df is rebuilt from the complete raw trip dataset on every run,
# so appending would add a duplicate copy of every row each time the notebook is re-executed.
# NOTE(review): this replaces whatever is already stored at the target path -- confirm
# no other job writes to this location.
(
    final_taxi_df.write
    .mode('overwrite')
    .partitionBy('year', 'month')
    .format('delta')
    .save('/mnt/synapselearningadls/processed/trip_data_green')
)