In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
# Build (or reuse) the YARN-backed Spark session with Hive support so the
# aggregates below can be persisted with saveAsTable.
spark = (
    SparkSession.builder
    # Spark config keys are case-sensitive: the key is "useOldFetchProtocol"
    # (capital F); the previous "useOldfetchProtocol" spelling was ignored.
    .config("spark.shuffle.useOldFetchProtocol", "true")
    .config("spark.sql.warehouse.dir", "/user/itv010252/warehouse")
    # Builder.config() takes the key and the value as separate arguments; the
    # previous single "key=value" string registered a bogus key with value "None".
    # NOTE(review): nothing in this notebook reads Delta tables — confirm this
    # setting is needed at all (Delta's documented switch is
    # spark.databricks.delta.formatCheck.enabled).
    .config("spark.delta.formatCheck.enabled", "true")
    .enableHiveSupport()
    .master("yarn")
    .getOrCreate()
)

In [None]:
# Raw yellow-taxi trips: every file under yellow_22/. No format is passed, so
# Spark's configured default data source is used for reading.
raw_yellow = spark.read.load(
    "/user/itv010252/yellow_22/*"
)

In [None]:
# Raw green-taxi trips.
# NOTE(review): only the January 2022 file is loaded, whereas the yellow frame
# reads every file under yellow_22/. The month-wise fare comparison at the end
# can therefore only match January — confirm whether green_22/* was intended.
raw_green = (
    spark.read
    .load("/user/itv010252/green_22/green_tripdata_2022-01.parquet")
)

In [None]:
# DDL schema for taxi_zone_lookup.csv, applied when the lookup CSV is read.
schema = (
    "location_id integer,"
    "borough string,"
    "zone string,"
    "service_zone string"
)

In [None]:
# Taxi zone lookup (location id -> borough / zone / service zone). The CSV has
# a header row and is parsed with the explicit DDL schema rather than inference.
location_name = (
    spark.read
    .format("csv")
    .option("header", True)
    .schema(schema)
    .load("/user/itv010252/taxi_zone_lookup.csv")
)

In [None]:
# Turn on adaptive query execution for the remainder of this session.
spark.conf.set(key="spark.sql.adaptive.enabled", value=True)

In [None]:
# Restructuring the green taxi data according to business needs

In [None]:
# ehail_fee and trip_type are not part of the column layout selected later for
# either taxi service, so drop them up front.
modified_1_green = raw_green.drop(
    "ehail_fee",
    "trip_type",
)

In [None]:
# Keep only trips dropped off during calendar year 2022. The range is half-open
# on purpose: Column.between() is inclusive, so it also kept trips dropped off
# at exactly 2023-01-01 00:00:00.
modified_2_green = modified_1_green.filter(
    (col("lpep_dropoff_datetime") >= "2022-01-01")
    & (col("lpep_dropoff_datetime") < "2023-01-01")
)

In [None]:
# Keep only trips with a non-null, strictly positive total_amount; the revenue
# figures need a usable fare (zero/negative and missing totals are dropped).
modified_3_green = modified_2_green.filter(
    col("total_amount").isNotNull() & (col("total_amount") > 0)
)

In [None]:
# Put the green-taxi columns in the same order as the yellow-taxi selection so
# both services share one layout (dtype changes happen in the next cell).
modified_4_green = modified_3_green.select(
    [
        "lpep_pickup_datetime",
        "lpep_dropoff_datetime",
        "PULocationID",
        "DOLocationID",
        "passenger_count",
        "trip_distance",
        "fare_amount",
        "extra",
        "mta_tax",
        "tip_amount",
        "tolls_amount",
        "improvement_surcharge",
        "total_amount",
        "payment_type",
        "congestion_surcharge",
        "RatecodeID",
        "store_and_fwd_flag",
        "VendorID",
    ]
)

In [None]:
# Align the green frame with the yellow layout:
#  - payment_type cast to long (presumably to match the yellow data — verify),
#  - a constant Taxi_type tag,
#  - airport_fee = 0.0, since it is not among the selected green columns,
#  - lpep_* timestamps renamed to the tpep_* names used for yellow taxis.
modified_5_green = (
    modified_4_green
    .withColumn("payment_type", col("payment_type").cast(LongType()))
    .withColumn("Taxi_type", lit("Green_taxi"))
    .withColumn("airport_fee", lit(0.0))
    .withColumnRenamed("lpep_pickup_datetime", "tpep_pickup_datetime")
    .withColumnRenamed("lpep_dropoff_datetime", "tpep_dropoff_datetime")
)

In [None]:
# Derive the columns used by the revenue and weekend analyses: pickup month
# name, weekday name, and a time-of-day bucket.
#
# The bucket is computed from the pickup hour (0-23), so every hour maps to
# exactly one label. The previous string comparison on
# date_format(..., "HH:MM:SS") was wrong: "MM" is month-of-year and "SS" is
# fraction-of-second in Spark datetime patterns, the strict >/< bounds left
# gaps at every boundary, and unpadded limits such as "4:00:00" compared
# lexically (hours 04-05 landed in Mid_night; Early_morning never matched).
green_pickup_hour = hour("tpep_pickup_datetime")
modified_6_green = (
    modified_5_green
    .withColumn("Month", date_format("tpep_pickup_datetime", "MMMM"))
    .withColumn("day_of_week", date_format("tpep_pickup_datetime", "EEEE"))
    .withColumn(
        "time",
        when(green_pickup_hour.between(6, 11), "Morning")
        .when(green_pickup_hour.between(12, 15), "Noon")
        .when(green_pickup_hour.between(16, 19), "evening")
        .when(green_pickup_hour.between(20, 23), "Night")
        .when(green_pickup_hour.between(0, 3), "Mid_night")
        .when(green_pickup_hour.between(4, 5), "Early_morning")
        # Only reachable when the pickup timestamp is null.
        .otherwise("D"),
    )
)

In [None]:
# Making similar changes to the raw yellow taxi dataframe

In [None]:
# Keep only trips dropped off during calendar year 2022. The range is half-open
# on purpose: Column.between() is inclusive, so it also kept trips dropped off
# at exactly 2023-01-01 00:00:00.
modified_1_yellow = raw_yellow.filter(
    (col("tpep_dropoff_datetime") >= "2022-01-01")
    & (col("tpep_dropoff_datetime") < "2023-01-01")
)

In [None]:
# Keep only trips with a non-null, strictly positive total_amount (same rule
# as applied to the green-taxi frame).
modified_2_yellow = modified_1_yellow.filter(
    col("total_amount").isNotNull() & (col("total_amount") > 0)
)

In [None]:
# Tag every yellow-taxi row with its service type. The trailing line-continuation
# backslash that ended this cell was removed: a cell cannot end mid-statement.
modified_3_yellow = modified_2_yellow.withColumn("Taxi_type", lit("yellow_taxi"))

In [None]:
# Select the shared column layout (same order as the green-taxi frame, plus
# Taxi_type and airport_fee).
modified_4_yellow = modified_3_yellow.select(
    [
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
        "PULocationID",
        "DOLocationID",
        "passenger_count",
        "trip_distance",
        "fare_amount",
        "extra",
        "mta_tax",
        "tip_amount",
        "tolls_amount",
        "improvement_surcharge",
        "total_amount",
        "payment_type",
        "congestion_surcharge",
        "RatecodeID",
        "store_and_fwd_flag",
        "VendorID",
        "Taxi_type",
        "airport_fee",
    ]
)

In [None]:
# Derive the columns used by the revenue and weekend analyses: pickup month
# name, weekday name, and a time-of-day bucket.
#
# The bucket is computed from the pickup hour (0-23), so every hour maps to
# exactly one label. The previous string comparison on
# date_format(..., "HH:MM:SS") was wrong: "MM" is month-of-year and "SS" is
# fraction-of-second in Spark datetime patterns, the strict >/< bounds left
# gaps at every boundary, and unpadded limits such as "4:00:00" compared
# lexically (hours 04-05 landed in Mid_night; Early_morning never matched).
yellow_pickup_hour = hour("tpep_pickup_datetime")
modified_5_yellow = (
    modified_4_yellow
    .withColumn("Month", date_format("tpep_pickup_datetime", "MMMM"))
    .withColumn("day_of_week", date_format("tpep_pickup_datetime", "EEEE"))
    .withColumn(
        "time",
        when(yellow_pickup_hour.between(6, 11), "Morning")
        .when(yellow_pickup_hour.between(12, 15), "Noon")
        .when(yellow_pickup_hour.between(16, 19), "evening")
        .when(yellow_pickup_hour.between(20, 23), "Night")
        .when(yellow_pickup_hour.between(0, 3), "Mid_night")
        .when(yellow_pickup_hour.between(4, 5), "Early_morning")
        # Only reachable when the pickup timestamp is null.
        .otherwise("D"),
    )
)

In [None]:
# Grouping and aggregating data to find the total revenue generated per month for both taxi types

In [None]:
# Total fare collected per pickup month for green taxis. `sum` here is
# pyspark.sql.functions.sum (the wildcard import shadows the builtin).
farecollected_green = (
    modified_6_green
    .groupBy("Month")
    .agg(sum("total_amount").alias("each_month_total"))
)

In [None]:
# Total fare collected per pickup month for yellow taxis. `sum` here is
# pyspark.sql.functions.sum (the wildcard import shadows the builtin).
farecollected_yellow = (
    modified_5_yellow
    .groupBy("Month")
    .agg(sum("total_amount").alias("each_month_total"))
)

In [None]:
# Filtering, grouping and aggregating to find the total rides taken over the weekend for both taxi types

In [None]:
# Weekend window for yellow taxis: all of Saturday and Sunday, plus Friday
# pickups in the "evening" or "Night" buckets.
# NOTE(review): the original condition listed "Night" twice — confirm that no
# third bucket (e.g. "Mid_night") was meant to be included.
filtered_yellow = modified_5_yellow.filter(
    (col("day_of_week") == "Saturday")
    | ((col("day_of_week") == "Friday") & col("time").isin("evening", "Night"))
    | (col("day_of_week") == "Sunday")
)

In [None]:
# Per pickup location and weekday (weekend window only): passengers carried and
# number of rides with a non-null pickup timestamp, for yellow taxis.
Location_total_yellow = (
    filtered_yellow
    .groupBy("PULocationID", "day_of_week")
    .agg(
        sum("passenger_count").alias("total_passengers_yellowtaxi"),
        count("tpep_pickup_datetime").alias("total_rides_yellowtaxi"),
    )
)

In [None]:
# Weekend window for green taxis: all of Saturday and Sunday, plus Friday
# pickups in the "evening" or "Night" buckets.
# NOTE(review): the original condition listed "Night" twice — confirm that no
# third bucket (e.g. "Mid_night") was meant to be included.
filtered_green = modified_6_green.filter(
    (col("day_of_week") == "Saturday")
    | ((col("day_of_week") == "Friday") & col("time").isin("evening", "Night"))
    | (col("day_of_week") == "Sunday")
)

In [None]:
# Per pickup location and weekday (weekend window only): passengers carried and
# number of rides with a non-null pickup timestamp, for green taxis.
Location_total_green = (
    filtered_green
    .groupBy("PULocationID", "day_of_week")
    .agg(
        sum("passenger_count").alias("total_passengers_greentaxi"),
        count("tpep_pickup_datetime").alias("total_rides_greentaxi"),
    )
)

In [None]:
# Writing the above dataframes as Spark tables to derive business insights;
# since the data is aggregated, the tables are small and can be queried easily

In [None]:
# Persist the weekend yellow-taxi aggregate into mydb_010252, the database the
# queries at the end run against. The `use mydb_010252` cell only executes after
# these writes, so an unqualified name would land in whatever database is
# current at this point. Overwrite so a re-run doesn't fail with
# "table already exists".
Location_total_yellow.write.mode("overwrite").saveAsTable("mydb_010252.Location_total_yellowtaxi")

In [None]:
# Persist the weekend green-taxi aggregate into mydb_010252, the database the
# queries at the end run against. The `use mydb_010252` cell only executes after
# these writes, so an unqualified name would land in whatever database is
# current at this point. Overwrite so a re-run doesn't fail with
# "table already exists".
Location_total_green.write.mode("overwrite").saveAsTable("mydb_010252.Location_total_green")

In [None]:
# Persist the monthly green-taxi fare totals into mydb_010252, the database the
# queries at the end run against. The `use mydb_010252` cell only executes after
# these writes, so an unqualified name would land in whatever database is
# current at this point. Overwrite so a re-run doesn't fail with
# "table already exists".
farecollected_green.write.mode("overwrite").saveAsTable("mydb_010252.farecollected_green")

In [None]:
# Persist the monthly yellow-taxi fare totals into mydb_010252, the database the
# queries at the end run against. The `use mydb_010252` cell only executes after
# these writes, so an unqualified name would land in whatever database is
# current at this point. Overwrite so a re-run doesn't fail with
# "table already exists".
farecollected_yellow.write.mode("overwrite").saveAsTable("mydb_010252.farecollected_yellow")

In [None]:
# Persist the zone lookup into mydb_010252 so the queries at the end can join
# against it. The `use mydb_010252` cell only executes after these writes, so an
# unqualified name would land in whatever database is current at this point.
# Overwrite so a re-run doesn't fail with "table already exists".
location_name.write.mode("overwrite").saveAsTable("mydb_010252.location_name")

In [None]:
# Switch the session's current database so the unqualified table names in the
# queries below resolve inside mydb_010252.
# NOTE(review): the saveAsTable cells above run before this switch; they must
# write into mydb_010252 explicitly, or the queries below won't find the tables.
spark.sql("use mydb_010252")

In [None]:
# List the tables in the current database, e.g. to check the writes above are visible.
tables_df = spark.sql("show tables")
tables_df.show(truncate=False)

In [None]:
# Running sample queries to find business insights

In [None]:
# Weekend yellow-taxi rides per pickup zone and weekday, busiest first, with
# the zone name looked up from location_name.
yellow_zone_rides_sql = """select y.PULocationID,y.day_of_week,y.total_rides_yellowtaxi,l.zone from Location_total_yellowtaxi as y 
join location_name as l on l.location_id = y.PULocationID order by total_rides_yellowtaxi desc"""
spark.sql(yellow_zone_rides_sql).show(truncate=False)

In [None]:
# Weekend green-taxi rides per pickup zone and weekday, busiest first, with
# the zone name looked up from location_name.
green_zone_rides_sql = """select  y.PULocationID,y.day_of_week,y.total_rides_greentaxi,l.zone from Location_total_green as y
 join location_name as l on l.location_id = y.PULocationID order by total_rides_greentaxi desc"""
spark.sql(green_zone_rides_sql).show(truncate=False)

In [None]:
# Monthly fare totals for both services side by side, rounded to whole units.
# The inner join keeps only months present in both tables, and rows are ordered
# by the yellow-taxi total (not by calendar month).
monthly_fare_sql = """SELECT y.month,CAST(y.each_month_total AS DECIMAL(30, 0)) AS monthwise_fare_collected_yellow, 
CAST(g.each_month_total AS DECIMAL(30, 0)) AS monthwise_fare_collected_green FROM farecollected_yellow as y 
join farecollected_green as g on y.month = g.month order by monthwise_fare_collected_yellow"""
spark.sql(monthly_fare_sql).show()

In [None]:
# Shut down the Spark session created at the top of the notebook; any cell run
# after this needs a new session.
spark.stop()