In [0]:
import pyspark.sql.functions as F

In [0]:
file_path="/FileStore/tables/yellow_tripdata_2018_01.parquet"
df=spark.read.parquet(file_path)
display(df.limit(5))

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,


Query 1. - Add a column named as "Revenue" into dataframe which is the sum of the below columns 'Fare_amount','Extra','MTA_tax','Improvement_surcharge','Tip_amount','Tolls_amount','Total_amount'

In [0]:
df.withColumn("Revenue",
              F.col('Fare_amount')+ F.col('Extra')+F.col('MTA_tax')+F.col('Improvement_surcharge')+F.col('Tip_amount')+F.col('Tolls_amount')+F.col('Total_amount')).limit(5).display()

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee,Revenue
1,2018-01-01T00:21:05.000+0000,2018-01-01T00:24:23.000+0000,1,0.5,1,N,41,24,2,4.5,0.5,0.5,0.0,0.0,0.3,5.8,,,11.6
1,2018-01-01T00:44:55.000+0000,2018-01-01T01:03:05.000+0000,1,2.7,1,N,239,140,2,14.0,0.5,0.5,0.0,0.0,0.3,15.3,,,30.6
1,2018-01-01T00:08:26.000+0000,2018-01-01T00:14:21.000+0000,2,0.8,1,N,262,141,1,6.0,0.5,0.5,1.0,0.0,0.3,8.3,,,16.6
1,2018-01-01T00:20:22.000+0000,2018-01-01T00:52:51.000+0000,1,10.2,1,N,140,257,2,33.5,0.5,0.5,0.0,0.0,0.3,34.8,,,69.6
1,2018-01-01T00:09:18.000+0000,2018-01-01T00:27:06.000+0000,2,2.5,1,N,246,239,1,12.5,0.5,0.5,2.75,0.0,0.3,16.55,,,33.1


Query 2. - Increasing count of total passengers in New York City by area

In [0]:
df.groupby("PULocationID", "DOLocationID").sum("passenger_count").limit(5).display()


PULocationID,DOLocationID,sum(passenger_count)
79,116,580
246,249,5073
234,144,7047
161,193,100
231,261,5812


Query 3. - Realtime Average fare/total earning amount earned by 2 vendors

In [0]:
df.groupBy("VendorID").avg("total_amount").limit(5).display()

VendorID,avg(total_amount)
1,15.127384289902135
2,15.775723474073514


Query 4. - Moving Count of payments made by each payment mode

In [0]:
df.groupBy("payment_type").count().limit(5).display()

payment_type,count
1,6106416
3,43204
2,2599215
4,11852


Query 5. - Highest two gaining vendor's on a particular date with no of passenger and total distance by cab

In [0]:
display(df.select('VendorID', 'tpep_pickup_datetime', 'passenger_count', 'trip_distance').sort(df.trip_distance.desc()).limit(2))


VendorID,tpep_pickup_datetime,passenger_count,trip_distance
2,2018-01-30T11:41:02.000+0000,1,189483.84
1,2018-01-08T19:44:54.000+0000,0,830.8


Query 6. - Most no of passenger between a route of two location.

In [0]:
from pyspark.sql.functions import sum as _sum
df.groupBy('PULocationID','DOLocationID').agg(_sum('passenger_count').alias('TotalPassenger')).orderBy(F.col('TotalPassenger').desc()).limit(1).display()

PULocationID,DOLocationID,TotalPassenger
264,264,186705


Query 7. - Get top pickup locations with most passengers in last 5/10 seconds.

In [0]:
q7 = (df.groupBy(F.window('tpep_pickup_datetime', '5 seconds'))
      .agg(_sum('passenger_count').alias('sum_passenger'), F.count('PULocationID').alias('top_pick')))
qf = q7.orderBy(F.col('top_pick').desc())
display(qf.limit(5))

window,sum_passenger,top_pick
"List(2018-01-12T15:15:05.000+0000, 2018-01-12T15:15:10.000+0000)",84,64
"List(2018-01-27T15:15:25.000+0000, 2018-01-27T15:15:30.000+0000)",74,64
"List(2018-01-06T15:15:00.000+0000, 2018-01-06T15:15:05.000+0000)",71,56
"List(2018-01-07T15:15:00.000+0000, 2018-01-07T15:15:05.000+0000)",78,54
"List(2018-01-13T15:15:00.000+0000, 2018-01-13T15:15:05.000+0000)",74,53
