In [0]:
# Ingest all Yellow Trip data files into the DBFS landing location.
# Further ingest from the S3 bucket.


In [0]:
from pyspark.sql.functions import dayofmonth, month
from pyspark.sql.types import *


In [0]:

# Define schema for yellow trip data
yellowTripSchema = (

                StructType
                (
                    [															
                        StructField("VendorID",                 IntegerType(), True),
                        StructField("Pickup_datetime",          TimestampType(), True),
                        StructField("Dropoff_datetime",         TimestampType(), True),
                        StructField("store_and_fwd_flag",       TimestampType(), True),
                        StructField("RatecodeID",               IntegerType(), True),
                        StructField("PickupLocationID",         IntegerType(), True),
                        StructField("DropLocationID",           IntegerType(), True),
                        StructField("Passenger_count",          IntegerType(), True),
                        StructField("Trip_distance",            DoubleType(), True),
                        StructField("Fare_amount",              DoubleType(), True),
                        StructField("extra",                    DoubleType(), True),
                        StructField("Mta_tax",                  DoubleType(), True),
                        StructField("Tip_amount",               DoubleType(), True),
                        StructField("Tolls_amount",             DoubleType(), True),
                        StructField("Ehail_fee",                StringType(), True),
                        StructField("Improvement_surcharge",    DoubleType(), True),
                        StructField("Total_amount",             DoubleType(), True),
                        StructField("Payment_type",             IntegerType(), True),
                        StructField("Trip_type",                IntegerType(), True),
                        StructField("Congestion_surcharge",     StringType(), True)

                    ]
                )


)

# Read yellow trip data for june 2019
all_months_df = spark.read.option("header","true").schema(yellowTripSchema).csv("dbfs:/FileStore/tables/hemanth/nyctaxidata/landing/yellowdata")

# all_months_df.printSchema()
# all_months_df.display()
# all_months_df.count()

# Filter valid yellow trip records
june_month_df = all_months_df.where(month("Pickup_datetime")==6).where("Pickup_datetime != Dropoff_datetime").where("Passenger_count != 0").where("Trip_distance != 0").where("Fare_amount != 0").where("Total_amount !=0")

In [0]:
# june_month_df.select("VendorID").distinct().display()

# Get the total valid trip count of yellow taxi for each Vendor for June
vendor_trip_count_df = june_month_df.groupBy("VendorID").count()

vendor_trip_count_df.display()

VendorID,count
1,18078
4,168
2,31393


In [0]:
# Get the total trip cost for each yellow taxi Vendor for June month for all valid trips
vendor_trip_cost_df = june_month_df.groupBy("VendorID").sum("Total_amount")

vendor_trip_cost_df.display()

VendorID,sum(Total_amount)
1,336815.8899999411
4,3339.250000000005
2,605676.1299999539


In [0]:
# Get the report of total passenger count for each yellow taxi Vendor for June month for all valid trips
vendor_passenger_count_df = june_month_df.groupBy("VendorID").sum("Passenger_count")

vendor_passenger_count_df.display()

VendorID,sum(Passenger_count)
1,2734740
4,24941
2,4712131


In [0]:
# Get the report of total trip distance for each yellow taxi Vendor
vendor_trip_distance_df = june_month_df.groupBy("VendorID").sum("Trip_distance")

vendor_trip_distance_df.display()

VendorID,sum(Trip_distance)
1,2695342.0
4,23936.0
2,4713530.0


In [0]:
# Get the report of total passenger count for each day
daily_passenger_count_df = june_month_df.groupBy(dayofmonth("Pickup_datetime").alias("day")).sum("Passenger_count").orderBy("day")

daily_passenger_count_df.display()

day,sum(Passenger_count)
1,7471243
2,305
3,264


In [0]:
# Get the report of total passenger count for each Vendor and each day
vendor_daily_passenger_count_df = june_month_df.groupBy("VendorID",dayofmonth("Pickup_datetime")).sum("Passenger_count").orderBy("VendorID", dayofmonth("Pickup_datetime"))

vendor_daily_passenger_count_df.display()

VendorID,dayofmonth(Pickup_datetime),sum(Passenger_count)
1,1,2734740
2,1,4711562
2,2,305
2,3,264
4,1,24941


In [0]:

# Save the results into DBFS location
vendor_trip_count_df.write.csv("dbfs:/FileStore/tables/hemanth/nyctaxidata/stage3_spark/passcount_vendorday/vendor_trip_count", header=True)
vendor_trip_cost_df.write.csv("dbfs:/FileStore/tables/hemanth/nyctaxidata/stage3_spark/vendor_trip_cost", header=True)
vendor_passenger_count_df.write.csv("dbfs:/FileStore/tables/hemanth/nyctaxidata/stage3_spark/vendor_passenger_count", header=True)
vendor_trip_distance_df.write.csv("dbfs:/FileStore/tables/hemanth/nyctaxidata/stage3_spark/vendor_trip_distance", header=True)
daily_passenger_count_df.write.csv("dbfs:/FileStore/tables/hemanth/nyctaxidata/stage3_spark/daily_passenger_count", header=True)
vendor_daily_passenger_count_df.write.csv("dbfs:/FileStore/tables/hemanth/nyctaxidata/stage3_spark/vendor_daily_passenger_count", header=True)
