In [22]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

In [42]:
# Report the PySpark version without needing a live session: this cell sits above
# the SparkSession cell, so referencing `spark` here fails on Restart & Run All.
# NOTE(review): this is the Python package version; it matches the runtime for a
# pip-installed local[*] session, but may differ when attaching to a remote cluster.
pyspark.__version__

'3.5.5'

In [2]:
# Create (or reuse, via getOrCreate) a local Spark session that uses all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/22 18:09:14 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/22 18:09:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/03/22 18:09:16 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
25/03/22 18:09:16 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.


In [3]:
# Load every parquet file two directory levels below data/pq/green
# (presumably <year>/<month> partitions — confirm against the ingestion step).
df_green = spark.read.format('parquet').load('data/pq/green/*/*')

                                                                                

In [4]:
# Preview the first 10 green trips without truncating cell values.
df_green.show(n=10, truncate=False)

                                                                                

+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|VendorID|lpep_pickup_datetime|lpep_dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|payment_type|trip_type|congestion_surcharge|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+
|2       |2020-01-23 13:10:15 |2020-01-23 13:38:16  |N                 |1         |74          |130         |1              |12.77        |36.0       |0.0  |0.5   

In [5]:
# Load every parquet file two directory levels below data/pq/yellow
# (presumably <year>/<month> partitions — confirm against the ingestion step).
df_yellow = spark.read.format('parquet').load('data/pq/yellow/*/*')

In [6]:
# Preview the first 5 yellow trips without truncating cell values.
df_yellow.show(n=5, truncate=False)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|2       |2020-01-06 09:18:38 |2020-01-06 09:33:56  |1              |3.03         |1         |N                 |263         |233         |1           |12.0       |0.0  |0.5    |1.5       |0.0         |0.3                  

In [12]:
# Give green's lpep_* timestamps the service-neutral names shared with yellow,
# so they survive the common-column selection and union further down.
# (Renaming a column that is absent is a no-op, so re-running this cell is safe.)
df_green = df_green.withColumnsRenamed({
    'lpep_pickup_datetime': 'pickup_datetime',
    'lpep_dropoff_datetime': 'dropoff_datetime',
})

In [13]:
# Give yellow's tpep_* timestamps the service-neutral names shared with green,
# so they survive the common-column selection and union further down.
# (Renaming a column that is absent is a no-op, so re-running this cell is safe.)
df_yellow = df_yellow.withColumnsRenamed({
    'tpep_pickup_datetime': 'pickup_datetime',
    'tpep_dropoff_datetime': 'dropoff_datetime',
})

In [16]:
# Keep only the columns present in BOTH the green and yellow schemas, preserving
# green's column order, so both frames can be projected onto one shared layout
# before they are stacked positionally by the union below.
yellow_columns = set(df_yellow.columns)
common_columns = [name for name in df_green.columns if name in yellow_columns]

# Guard against hidden notebook state: if the timestamp-rename cells were skipped,
# green's lpep_* and yellow's tpep_* columns differ and would silently drop out here,
# breaking the pickup_datetime-based report later on.
assert {'pickup_datetime', 'dropoff_datetime'} <= set(common_columns), \
    "run the column-rename cells before computing common_columns"

In [23]:
# Project green trips onto the shared column list and tag each row with its source.
df_green_sel = (
    df_green
    .select(common_columns)
    .withColumn('service_type', F.lit('green'))
)

In [25]:
# Project yellow trips onto the shared column list and tag each row with its source.
df_yellow_sel = (
    df_yellow
    .select(common_columns)
    .withColumn('service_type', F.lit('yellow'))
)

In [27]:
# Stack green and yellow trips (UNION ALL semantics: duplicates are kept).
# Columns are matched by position, which is safe because both selections were
# built from the same `common_columns` list plus a trailing `service_type`.
df_trips_data = df_green_sel.union(df_yellow_sel)

In [28]:
# Sanity check: number of trips contributed by each service type.
trip_counts_by_service = df_trips_data.groupBy('service_type').count()
trip_counts_by_service.show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



                                                                                

In [30]:
# Expose the combined trips to Spark SQL as the session-scoped view `trips_data`.
# `registerTempTable` has been deprecated since Spark 2.0 (it emits a FutureWarning
# in 3.x); `createOrReplaceTempView` is the supported replacement and, like the
# original, silently replaces an existing view of the same name on re-run.
df_trips_data.createOrReplaceTempView('trips_data')



In [31]:
# The same per-service trip count as the DataFrame-API cell, expressed in Spark SQL
# against the `trips_data` view, to confirm both paths agree.
service_counts_sql = """
Select service_type, count(1) as count
from trips_data
group by service_type
"""
spark.sql(service_counts_sql).show()



+------------+--------+
|service_type|   count|
+------------+--------+
|       green| 2304517|
|      yellow|39649199|
+------------+--------+



                                                                                

In [55]:
# Monthly revenue report, one row per (pickup zone, calendar month, service type),
# computed from the `trips_data` view registered earlier.
# Every fee/surcharge column shared by both schemas is summed separately. This
# includes congestion_surcharge, which both green and yellow data carry; without
# it, the component sums cannot reconcile with revenue_monthly_total_amount.
# Grouping by select-list aliases relies on Spark SQL's default groupByAliases=true.
df_result = spark.sql("""
    select 
    -- Revenue grouping 
    PULocationID as revenue_zone,
    date_trunc('month', cast(pickup_datetime as TIMESTAMP)) as revenue_month,
    service_type, 

    -- Revenue calculation 
    sum(fare_amount) as revenue_monthly_fare,
    sum(extra) as revenue_monthly_extra,
    sum(mta_tax) as revenue_monthly_mta_tax,
    sum(tip_amount) as revenue_monthly_tip_amount,
    sum(tolls_amount) as revenue_monthly_tolls_amount,
    sum(improvement_surcharge) as revenue_monthly_improvement_surcharge,
    sum(congestion_surcharge) as revenue_monthly_congestion_surcharge,
    sum(total_amount) as revenue_monthly_total_amount,

    -- Additional calculations
    avg(passenger_count) as avg_monthly_passenger_count,
    avg(trip_distance) as avg_monthly_trip_distance

    from trips_data
    group by revenue_zone, revenue_month, service_type
""")

In [56]:
# Persist the report as parquet, replacing any previous output at this path.
# coalesce(1) collapses the result to a single partition, so one part file is written.
(
    df_result
    .coalesce(1)
    .write
    .mode('overwrite')
    .parquet('data/report/revenue/')
)

                                                                                