In [2]:
import pyspark
from pyspark.sql import SparkSession, types
from pyspark.sql import functions as F
from pyspark.sql.functions import sha2, concat_ws

In [3]:
# Local Spark session using all available cores ("local[*]"); getOrCreate()
# hands back the already-running session when this cell is re-executed.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [4]:
# Load every green-taxi parquet file; the two wildcard levels presumably map to
# year/month sub-directories (TODO confirm against the job that wrote them).
df_green = spark.read.format('parquet').load('data/pq/green/*/*')

In [5]:
# Print the green schema (names, types, nullability) to compare with yellow below.
df_green.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- lpep_pickup_datetime: timestamp (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- trip_type: integer (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [6]:
# Load every yellow-taxi parquet file; the two wildcard levels presumably map to
# year/month sub-directories (TODO confirm against the job that wrote them).
df_yellow = spark.read.format('parquet').load('data/pq/yellow/*/*')

In [7]:
# Print the yellow schema; unlike green it has no ehail_fee / trip_type columns
# and prefixes its timestamps with tpep_ instead of lpep_.
df_yellow.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



In [8]:
# Green column names in schema order (timestamps still carry the lpep_ prefix).
df_green.schema.names

['VendorID',
 'lpep_pickup_datetime',
 'lpep_dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'ehail_fee',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'trip_type',
 'congestion_surcharge']

In [9]:
# Drop the green-specific "lpep_" prefix so both datasets share timestamp
# column names. withColumnRenamed is a no-op when the source column is absent,
# so re-running this cell leaves df_green unchanged.
df_green = (
    df_green
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')
)

In [10]:
# Yellow column names in schema order (timestamps still carry the tpep_ prefix).
df_yellow.schema.names

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'store_and_fwd_flag',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge']

In [11]:
# Drop the yellow-specific "tpep_" prefix so both datasets share timestamp
# column names. withColumnRenamed is a no-op when the source column is absent,
# so re-running this cell leaves df_yellow unchanged.
df_yellow = (
    df_yellow
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime')
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')
)

In [12]:
# Columns present in both datasets (a set, so the displayed order carries no meaning).
set(df_green.columns).intersection(df_yellow.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [13]:
# Columns shared by both datasets, kept in green's column order (a plain set
# intersection would lose that order). Membership is checked against a set for
# O(1) lookups. The comprehension keeps its loop variable local instead of
# leaking a global named `col`, which would shadow pyspark's `col` function if
# it is ever imported in this notebook.
yellow_columns = set(df_yellow.columns)
common_columns = [name for name in df_green.columns if name in yellow_columns]

In [14]:
# Display the ordered list of shared columns used by both selects below.
common_columns

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [15]:
# Project green onto the shared columns and tag every row with its source.
df_green_sel = (
    df_green
    .select(common_columns)
    .withColumn('service_type', F.lit('green'))
)

In [16]:
# Project yellow onto the shared columns and tag every row with its source.
df_yellow_sel = (
    df_yellow
    .select(common_columns)
    .withColumn('service_type', F.lit('yellow'))
)

In [17]:
# Stack green and yellow trips into one DataFrame (UNION ALL semantics: no
# de-duplication). unionByName matches columns by name instead of position, so
# the result cannot silently misalign if the two selects above ever list their
# columns in different orders; output column order follows df_green_sel.
df_trips_data = df_green_sel.unionByName(df_yellow_sel)

In [18]:
# Surrogate trip key: SHA-256 (hex) of "VendorID|pickup_datetime".
# concat_ws takes the SEPARATOR as its first argument, so both key columns must
# come after it; passing 'VendorID' first would use it as the separator and hash
# only pickup_datetime.
# NOTE(review): concat_ws skips NULL inputs, and VendorID + pickup time is not
# guaranteed unique (a green and a yellow trip can collide) - confirm whether
# service_type should also be part of the key.
df_trips_data = df_trips_data.withColumn(
    'tripid',
    sha2(concat_ws('|', 'VendorID', 'pickup_datetime'), 256),
)

In [19]:
# Sanity check: row count per service_type after the union.
(
    df_trips_data
    .groupBy('service_type')
    .count()
    .show()
)

+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 1734051|
|      yellow|24648499|
+------------+--------+



In [20]:
# Expose the combined trips to Spark SQL under the name used in the FROM clause below.
df_trips_data.createOrReplaceTempView("df_trips_data")

In [21]:
# Monthly revenue report: one row per (pickup zone, calendar month, service type).
# date_trunc('month', ...) maps each pickup to the first instant of its month.
# GROUP BY refers to the SELECT aliases revenue_zone / revenue_month, which
# relies on Spark SQL's group-by-alias support (spark.sql.groupByAliases).
# tripid is built with sha2 over concat_ws, which skips NULL inputs, so it
# should never be NULL; COUNT(tripid) therefore behaves like COUNT(*).
df_results = spark.sql("""
                        SELECT 
                        -- Reveneue grouping 
                        PULocationID AS revenue_zone,
                        date_trunc('month', pickup_datetime) AS revenue_month, 
                        
                        service_type, 
                        
                        -- Revenue calculation 
                        SUM(fare_amount) AS revenue_monthly_fare,
                        SUM(extra) AS revenue_monthly_extra,
                        SUM(mta_tax) AS revenue_monthly_mta_tax,
                        SUM(tip_amount) AS revenue_monthly_tip_amount,
                        SUM(tolls_amount) AS revenue_monthly_tolls_amount,
                        SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
                        SUM(total_amount) AS revenue_monthly_total_amount,
                        
                        -- Additional calculations
                        COUNT(tripid) AS total_monthly_trips,
                        AVG(passenger_count) AS avg_monthly_passenger_count,
                        AVG(trip_distance) AS avg_monthly_trip_distance
                        
                        FROM df_trips_data
                        GROUP BY 
                        	revenue_zone,
                        	revenue_month,
                        	service_type
                        --LIMIT 10;
                    """)

In [24]:
# Preview the first 5 report rows; the vertical layout keeps the wide rows readable.
df_results.show(n=5, vertical=True)

-RECORD 0----------------------------------------------------
 revenue_zone                          | 20                  
 revenue_month                         | 2020-01-01 00:00:00 
 service_type                          | green               
 revenue_monthly_fare                  | 11375.420000000002  
 revenue_monthly_extra                 | 681.0               
 revenue_monthly_mta_tax               | 131.0               
 revenue_monthly_tip_amount            | 90.62               
 revenue_monthly_tolls_amount          | 479.86000000000024  
 revenue_monthly_improvement_surcharge | 147.29999999999933  
 revenue_monthly_total_amount          | 12923.200000000019  
 total_monthly_trips                   | 545                 
 avg_monthly_passenger_count           | 1.2297872340425533  
 avg_monthly_trip_distance             | 4.872311926605502   
-RECORD 1----------------------------------------------------
 revenue_zone                          | 262                 
 revenue

In [27]:
# Persist the report. coalesce(1) collapses it to a single partition, so the
# directory receives one parquet part file; 'overwrite' replaces everything
# already under the path, including any sub-directories.
df_results.coalesce(1).write.mode('overwrite').parquet('data/report/revenue/')

In [28]:
# NOTE(review): this writes the same df_results again, without coalesce(1) and
# with no 2020 filter, into a sub-directory of the dataset written by the
# previous cell. A recursive read of data/report/revenue/ may then pick up both
# copies, and re-running the previous cell ('overwrite') deletes this one.
# Confirm the intended destination before relying on either output.
df_results.write.mode('overwrite').parquet('data/report/revenue/2020/')