Import the required libraries. This notebook is meant to run in a Databricks workspace, which provides `spark`, `dbutils`, and `display`.

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import urllib

List the files in the default Databricks workspace-stack S3 bucket, mounted under `/mnt`.

In [None]:
# Show the mounted bucket's contents as a rich Databricks table.
bucket_listing = dbutils.fs.ls("/mnt/databricks-workspace-stack-9a37a-bucket")
display(bucket_listing)

path,name,size,modificationTime
dbfs:/mnt/databricks-workspace-stack-9a37a-bucket/mumbai-prod/,mumbai-prod/,0,1696963970756
dbfs:/mnt/databricks-workspace-stack-9a37a-bucket/yellow_tripdata_2022-01.parquet,yellow_tripdata_2022-01.parquet,38139949,1696941884000
dbfs:/mnt/databricks-workspace-stack-9a37a-bucket/yellow_tripdata_2023-01.parquet,yellow_tripdata_2023-01.parquet,47673370,1696941884000


Read the January 2022 and January 2023 Parquet files into Spark DataFrames.

In [None]:
file_type = "parquet"

# Load both January extracts with an identically configured reader;
# unpacking keeps the 2022 / 2023 order of the paths below.
df_jan2022, df_jan2023 = (
    spark.read.format(file_type).load(trip_file)
    for trip_file in (
        '/mnt/databricks-workspace-stack-9a37a-bucket/yellow_tripdata_2022-01.parquet',
        '/mnt/databricks-workspace-stack-9a37a-bucket/yellow_tripdata_2023-01.parquet',
    )
)

In [None]:
# Preview the first rows of the January 2023 data (defaults spelled out).
df_jan2023.show(n=20, truncate=True)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0|    0.5|       0.

Union both DataFrames so the two months can be aggregated together and written as a single Snowflake warehouse table. That table will feed the analytics dashboard in Tableau.

In [None]:
# Stack the two monthly extracts into one DataFrame.
# unionByName matches columns by name rather than by position (as `union` does),
# so a column that is ordered differently in one month's file cannot silently
# land under the wrong header.
union_df = df_jan2022.unionByName(df_jan2023)

In [None]:
# Preview the first rows of the combined DataFrame (defaults spelled out).
union_df.show(n=20, truncate=True)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       1| 2022-01-01 00:35:40|  2022-01-01 00:53:29|            2.0|          3.8|       1.0|                 N|         142|         236|           1|       14.5|  3.0|    0.5|      3.6

In [None]:
# Total number of trips across both months; the bare name on the last
# line makes the notebook display the value.
total_trips = union_df.count()
total_trips

5530697

Count the outliers: trips whose pickup month is not January, or whose pickup year is neither 2022 nor 2023.

In [None]:
# Expose the combined DataFrame to Spark SQL as `union_df`.
# createOrReplaceTempView replaces the deprecated registerTempTable and can be
# re-run safely, because it overwrites an existing view with the same name.
union_df.createOrReplaceTempView('union_df')

# Count the rows that the aggregation below drops: pickups outside January 2022/2023.
# A NULL pickup timestamp makes both the month and year predicates NULL, so such
# rows would otherwise be missing from this count AND from the aggregate.
# They are counted explicitly here.
df_outliers = spark.sql("""
                      select count(1) from union_df
                      where tpep_pickup_datetime IS NULL
                      OR month(tpep_pickup_datetime) != 1
                      OR year(tpep_pickup_datetime) not in (2022, 2023)
                      """)

In [None]:
# Display the single-row outlier count (defaults spelled out).
df_outliers.show(n=20, truncate=True)

+--------+
|count(1)|
+--------+
|     100|
+--------+



Aggregate the revenue columns by pickup location zone and year, excluding the outliers identified above. The transformation is written in Spark SQL.

In [None]:
# Aggregate revenue per pickup zone and year. Only January 2022 / January 2023
# pickups are kept. The month/year filters use integer literals, matching the
# integer results of month()/year() and the literals in the outlier query,
# instead of relying on implicit string-to-int coercion.
# Zone and year are cast to strings in the output.
# NOTE(review): the `avg_montly_*` alias spelling is kept on purpose. These aliases
# become column names of the Snowflake table written below, so renaming them
# would presumably break downstream consumers (e.g. the Tableau dashboard).
df_result = spark.sql("""
SELECT 
    -- Revenue grouping 
    cast(PULocationID as string) AS revenue_zone,

    cast(year(tpep_pickup_datetime) as string) AS revenue_year, 

    -- Revenue calculation 
    ROUND(SUM(fare_amount)) AS revenue_monthly_fare,
    ROUND(SUM(extra)) AS revenue_monthly_extra,
    ROUND(SUM(mta_tax)) AS revenue_monthly_mta_tax,
    ROUND(SUM(tip_amount)) AS revenue_monthly_tip_amount,
    ROUND(SUM(tolls_amount)) AS revenue_monthly_tolls_amount,
    ROUND(SUM(improvement_surcharge)) AS revenue_monthly_improvement_surcharge,
    ROUND(SUM(total_amount)) AS revenue_monthly_total_amount,
    ROUND(SUM(congestion_surcharge)) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    ROUND(AVG(passenger_count),2) AS avg_montly_passenger_count,
    ROUND(AVG(trip_distance),2) AS avg_montly_trip_distance
FROM
    union_df
WHERE month(tpep_pickup_datetime) = 1
  AND year(tpep_pickup_datetime) IN (2022, 2023)
GROUP BY
    1, 2
ORDER BY cast(revenue_zone as integer), revenue_year
""")



In [None]:
df_result.show()

+------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|revenue_zone|revenue_year|revenue_monthly_fare|revenue_monthly_extra|revenue_monthly_mta_tax|revenue_monthly_tip_amount|revenue_monthly_tolls_amount|revenue_monthly_improvement_surcharge|revenue_monthly_total_amount|revenue_monthly_congestion_surcharge|avg_montly_passenger_count|avg_montly_trip_distance|
+------------+------------+--------------------+---------------------+-----------------------+--------------------------+----------------------------+-------------------------------------+----------------------------+------------------------------------+--------------------------+------------------------+
|           1|        2022|             25341.0|                  9.0|         

Define the Snowflake connection options and credentials, then load the aggregated result into a Snowflake table.

In [None]:
# Snowflake connector options used by the write below.
# The user name and password are read from a Databricks secret scope rather than
# being hardcoded, where they would leak into version control and notebook exports.
# TODO(review): "snowflake" / "user" / "password" are placeholder scope and key
# names. Create them (or adjust these names) before running this cell.
SNOWFLAKE_SECRET_SCOPE = "snowflake"

options = {
    "sfURL": "https://wvaatlz-n******.snowflakecomputing.com",
    "sfUser": dbutils.secrets.get(scope=SNOWFLAKE_SECRET_SCOPE, key="user"),
    "sfPassword": dbutils.secrets.get(scope=SNOWFLAKE_SECRET_SCOPE, key="password"),
    "sfDatabase": "data_eng_project",
    "sfSchema": "nyc_taxi_data",
    "sfWarehouse": "COMPUTE_WH",
}


In [None]:
# Replace the contents of the Snowflake table with the aggregated result.
(
    df_result.write
    .format("snowflake")
    .options(**options)
    .option("dbtable", "yellow_taxi_revenue_stats")
    .mode("overwrite")
    .save()
)