In [0]:
from pyspark.sql import SparkSession

Load NYC taxi data to DataLake/Blob_Storage/DataBricks and extract the data through dataframe in the notebook.
Perform Following Queries using PySpark.
Query 1. - Add a column named as "Revenue" into dataframe which is the sum of the below columns 'Fare_amount','Extra','MTA_tax','Improvement_surcharge','Tip_amount','Tolls_amount','Total_amount'

Query 2. - Increasing count of total passengers in New York City by area

Query 3. - Realtime Average fare/total earning amount earned by 2 vendors

Query 4. - Moving Count of payments made by each payment mode

Query 5. - Highest two gaining vendor's on a particular date with no of passenger and total distance by cab

Query 6. - Most no of passenger between a route of two location.

Query 7. - Get top pickup locations with most passengers in last 5/10 seconds.

In [0]:
#DATA LOAD AND PRINTING SCHEMA
path="/FileStore/tables/yellow_tripdata_2018_01.parquet"
dataframe = spark.read.parquet(path,header=True)
display(dataframe.limit(5))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,


In [0]:
#Query-1:Adding a new column having sum of the values of the given columns
from pyspark.sql.functions import *
df=dataframe.withColumn("Revenue",col("Fare_amount")+col("Extra")+col("MTA_tax")+col("Improvement_surcharge")+col("Tip_amount")+col("Tolls_amount")+col("Total_amount"))
display(df.select("Revenue").limit(5))

Revenue
11.6
30.6
16.6
69.6
33.1


In [0]:
#Query-2: Increasing count of total passengers in New York City by area
display(df.groupBy("PULocationID","DOLocationID").sum("passenger_count").limit(5))

PULocationID,DOLocationID,sum(passenger_count)
79,116,580
246,249,5073
234,144,7047
161,193,100
231,261,5812


In [0]:
#Query-3 :Realtime Average fare/total earning amount earned by 2 vendors

display(df.groupBy("VendorID").avg("total_amount").limit(5))

VendorID,avg(total_amount)
1,15.127384289902135
2,15.775723474073514


In [0]:
#Query-4 :Moving Count of payments made by each payment mode

display(df.groupBy("payment_type").count().limit(5))

payment_type,count
1,6106416
3,43204
2,2599215
4,11852


In [0]:
#Query-5 :Highest two gaining vendor's on a particular date with no of passenger and total distance by cab

df=df.withColumn("Date",to_date(col("tpep_pickup_datetime")))
temp=(df.groupBy("Date","VendorId").agg(sum("passenger_count").alias("passenger_count"),sum("trip_distance")))
display(temp.sort(desc(col("passenger_count"))).limit(5))

Date,VendorId,passenger_count,sum(trip_distance)
2018-01-26,2,354497,513454.3299999982
2018-01-25,2,351475,518394.0400000009
2018-01-18,2,349639,508573.5399999963
2018-01-13,2,349181,500069.0500000009
2018-01-19,2,341985,500606.6799999969


In [0]:
datedf=df.select("Date")
display(datedf.distinct())

Date
2009-01-01
2018-01-23
2001-01-05
2018-01-11
2018-01-08
2018-01-18
2018-01-14
2018-01-05
2018-01-13
2018-02-01


In [0]:
#Query-6 :Most no of passenger between a route of two location
#PULocationID,DOLocationID
temp=df.groupBy("PULocationID","DOLocationID").agg(sum(col("passenger_count")).alias("total_passenger"))
display(temp.sort(desc(col("total_passenger"))).limit(1))

PULocationID,DOLocationID,total_passenger
264,264,186705


In [0]:
#Query-7 :Get top pickup locations with most passengers in last 5/10 seconds.
#df=df.withColumn("TIME",col("tpep_pickup_datetime")-col("Date"))
#display(df.select("TIME"))
#display(df.groupBy("PULocationID").count().sort(desc("count")).limit(5))

In [0]:
#Sameer Code
# q7=(df.groupBy(window("tpep_pickup_datetime","5 seconds"))\
# .agg(sum("passenger_count").alias("sum_passenger"),count("PULocationID").alias("Top_pickup")))

In [0]:
#Query-7 :Get top pickup locations with most passengers in last 5/10 seconds.
from datetime import timedelta
#Query-7 :Get top pickup locations with most passengers in last 5/10 seconds
freq_time=df.agg(max(col("tpep_pickup_datetime"))).collect()
time_frame=freq_time[0][0]-timedelta(seconds=5)
q7=df.filter(df.tpep_pickup_datetime>=time_frame)
abc=q7.groupBy("PULocationID").sum("passenger_count")
display(abc.sort(desc(col("sum(passenger_count)"))).limit(1))

PULocationID,sum(passenger_count)
48,2


In [0]:
display(freq_time)

max(tpep_pickup_datetime)
2018-07-27T04:06:37.000+0000


In [0]:
# q7=q7.sort(desc(col("time")))
# q7=q7.sort("time")
# display(q7.groupBy(sum(passenger_count)))
# display(abc.sum("passenger_count").sort(desc("sum(passenger_count)")).limit(1))
# abc=q7.groupBy("PULocationID").sum("passenger_count")
# # abc=abc.sort(col("sum(passenger_count)"))
# display(abc.sort(desc("sum(passenger_count)")).limit(1))

Column<'max(tpep_pickup_datetime)'>

In [0]:
# q7.groupBy(window("tpep_pickup_datetime","5 seconds")).agg(sum("passenger_count").alias("sum_passenger"),count("PULocationID").alias("Top_pickup"))
# display(q7.groupBy(window("time","5 seconds")).agg(sum("passenger_count").alias("sum_passenger"),count("PULocationID").alias("Top_pickup")).limit(5))