# TASK 
Load NYC taxi data to DataLake/Blob_Storage/DataBricks and extract the data through dataframe in the notebook. Perform Following Queries using PySpark. 

Query 1. - Add a column named as "Revenue" into dataframe which is the sum of the below columns 'Fare_amount','Extra','MTA_tax','Improvement_surcharge','Tip_amount','Tolls_amount','Total_amount'

Query 2. - Increasing count of total passengers in New York City by area

Query 3. - Realtime Average fare/total earning amount earned by 2 vendors

Query 4. - Moving Count of payments made by each payment mode

Query 5. - Highest two gaining vendor's on a particular date with no of passenger and total distance by cab

Query 6. - Most no of passenger between a route of two location.

Query 7. - Get top pickup locations with most passengers in last 5/10 seconds.

In [0]:
#ingesting File
file_path = "/FileStore/tables/yellow_tripdata_2018_01-1.parquet"
nyc_df = spark.read.parquet(file_path)
display(nyc_df.limit(10))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,
1,2018-01-01T00:29:29.000+0000,2018-01-01T00:32:48.000+0000,3,0.5,1,N,143,143,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:38:08.000+0000,2018-01-01T00:48:24.000+0000,2,1.7,1,N,50,239,1,9.0,0.5,0.5,2.05,0.0,0.3,12.35,,
1,2018-01-01T00:49:29.000+0000,2018-01-01T00:51:53.000+0000,1,0.7,1,N,239,238,1,4.0,0.5,0.5,1.0,0.0,0.3,6.3,,
1,2018-01-01T00:56:38.000+0000,2018-01-01T01:01:05.000+0000,1,1.0,1,N,238,24,1,5.5,0.5,0.5,1.7,0.0,0.3,8.5,,
1,2018-01-01T00:17:04.000+0000,2018-01-01T00:22:24.000+0000,1,0.7,1,N,170,170,2,5.5,0.5,0.5,0.0,0.0,0.3,6.8,,


In [0]:
nyc_dataframe.printSchema()

root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- revenue: double (nullable = true)



In [0]:
# Query-1 Add a column named as "Revenue" into dataframe which is the sum of the below columns 'Fare_amount','Extra','MTA_tax','Improvement_surcharge','Tip_amount','Tolls_amount','Total_amount'
from pyspark.sql.functions import *
nyc_df = nyc_df.withColumn("revenue",nyc_df.fare_amount+nyc_df.extra+nyc_df.mta_tax+nyc_df.improvement_surcharge+nyc_df.tip_amount+nyc_df.tolls_amount+nyc_df.total_amount)
display(nyc_df.select("revenue").limit(10))

revenue
11.6
30.6
16.6
69.6
33.1
11.6
24.700000000000003
12.6
17.0
13.6


In [0]:
# Query - 2 Increasing count of total passengers in New York City by area
display(nyc_df.groupBy("PULocationID","DOLocationID").sum("passenger_count").limit(10))

PULocationID,DOLocationID,sum(passenger_count)
79,116,580
246,249,5073
234,144,7047
161,193,100
231,261,5812
143,211,260
90,231,6447
142,144,971
90,142,2835
249,225,400


In [0]:
# Query - 3 Realtime Average fare/total earning amount earned by 2 vendors
display(nyc_df.groupBy("VendorID").sum("total_amount"))

VendorID,sum(total_amount)
1,58181947.048458464
2,77530629.1266784


In [0]:
# Query - 4 Moving Count of payments made by each payment mode
display(nyc_df.groupBy("payment_type").count())

payment_type,count
1,6106416
3,43204
2,2599215
4,11852


In [0]:
# Query - 5 Highest two gaining vendor's on a particular date with no of passenger and total distance by cab
# taking out date from the timestamp
nyc_df = nyc_df.withColumn("date",to_date(col("tpep_pickup_datetime")))
display(nyc_df.groupBy('VendorId','date').agg(sum("passenger_count").alias("total_passenger_count"),sum("trip_distance").alias("total_distance")).sort(desc(col("total_passenger_count"))).limit(5))

VendorId,date,total_passenger_count,total_distance
2,2018-01-26,354497,513454.3299999982
2,2018-01-25,351475,518394.0400000009
2,2018-01-18,349639,508573.5399999963
2,2018-01-13,349181,500069.0500000009
2,2018-01-19,341985,500606.6799999969


In [0]:
# Query 6. - Most no of passenger between a route of two location.
display(nyc_df.groupBy("PULocationID","DOLocationID").agg(max("passenger_count").alias("maximum_passenger")).sort(desc(col("maximum_passenger"))).limit(10))

PULocationID,DOLocationID,maximum_passenger
236,138,9
264,68,9
264,261,9
264,265,9
265,265,9
1,1,9
161,95,9
163,163,9
161,1,9
138,265,9


In [0]:
# Query 7. - Get top pickup locations with most passengers in last 5/10 seconds.
from pyspark.sql.functions import *
from datetime import timedelta 

diff_timestamp =  nyc_df.select(max("tpep_pickup_datetime").alias("timestamp")).first()["timestamp"] - timedelta(seconds=10)
nyc_df_filter = nyc_df.filter(nyc_df["tpep_pickup_datetime"] > diff_timestamp)
nyc_df_filter = nyc_df_filter.groupBy("PULocationID").agg(sum("passenger_count").alias("total_passenger")).sort(desc(col("total_passenger")))

display(nyc_df_filter.limit(10))

PULocationID,total_passenger
48,2
