In [29]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg, count, window, desc

##### Initialize Spark session

In [30]:
spark = SparkSession.builder.appName("NYC Taxi Data Analysis").getOrCreate()

### Load the data into a DataFrame

In [31]:
data_path = "yellow_tripdata_2020-01.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)

### Show the schema of the dataframe

In [32]:
df.printSchema()

root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp (nullable = true)
 |-- tpep_dropoff_datetime: timestamp (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: integer (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)



# Query 1:
    Add a column named "Revenue" into DataFrame

In [33]:
from pyspark.sql.functions import col

df = df.withColumn("Revenue", col("Fare_amount") + col("Extra") + col("MTA_tax") + col("Improvement_surcharge") + col("Tip_amount") + col("Tolls_amount") + col("Total_amount"))
df.show()

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|           Revenue|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+------------------+
|       1| 2020-01-01 00:28:15|  2020-01-01 00:33:03|              1|          1.2|         1|                 N|         238|         239|           1|        6.0|  

# Query 2:
    Increasing count of total passengers in New York City by area

In [34]:
df.groupBy("PULocationID").sum("passenger_count").withColumnRenamed("sum(passenger_count)", "Total_passengers").orderBy("Total_passengers", ascending=False).show()

+------------+----------------+
|PULocationID|Total_passengers|
+------------+----------------+
|         237|          433243|
|         161|          425986|
|         236|          403347|
|         230|          360096|
|         162|          351011|
|         186|          338952|
|         132|          326402|
|          48|          297148|
|         142|          294502|
|         170|          289593|
|         234|          284965|
|         163|          267047|
|         239|          263583|
|          79|          244515|
|         141|          237341|
|          68|          227635|
|         164|          218138|
|         107|          215684|
|         238|          199367|
|         138|          197352|
+------------+----------------+
only showing top 20 rows



# Query 3:
    Real-time Average fare/total earning amount earned by 2 vendors

In [35]:
df.groupBy("VendorID").avg("Total_amount").withColumnRenamed("avg(Total_amount)", "Average_fare").orderBy("Average_fare", ascending=False).limit(2).show()

+--------+------------------+
|VendorID|      Average_fare|
+--------+------------------+
|    NULL|37.217091425863046|
|       2|18.648347164036302|
+--------+------------------+



# Query 4:
    Moving Count of payments made by each payment mode

In [36]:
df.groupBy("payment_type").count().withColumnRenamed("count", "Payment_count").orderBy("Payment_count", ascending=False).show()

+------------+-------------+
|payment_type|Payment_count|
+------------+-------------+
|           1|      4694897|
|           2|      1593834|
|        NULL|        65441|
|           3|        32770|
|           4|        18065|
|           5|            1|
+------------+-------------+



# Query 5:
    Highest two gaining vendors on a particular date with the number of passengers and total distance by cab

In [38]:
from pyspark.sql.functions import to_date

In [39]:
date = "2020-01-15"  # Example date
df_filtered = df.filter(to_date(col("tpep_pickup_datetime")) == date)
df_filtered.groupBy("VendorID").agg(
    {"passenger_count": "sum", "trip_distance": "sum", "Total_amount": "sum"}
).withColumnRenamed("sum(passenger_count)", "Total_passengers")\
.withColumnRenamed("sum(trip_distance)", "Total_distance")\
.withColumnRenamed("sum(Total_amount)", "Total_revenue")\
.orderBy("Total_revenue", ascending=False).limit(2).show()

+--------+------------------+----------------+------------------+
|VendorID|     Total_revenue|Total_passengers|    Total_distance|
+--------+------------------+----------------+------------------+
|       2| 2700441.549999132|          233339| 410271.7600000014|
|       1|1319816.5300006857|           82508|190960.49999999945|
+--------+------------------+----------------+------------------+



# Query 6:
    Most number of passengers between a route of two locations

In [40]:
df.groupBy("PULocationID", "DOLocationID").sum("passenger_count").withColumnRenamed("sum(passenger_count)", "Total_passengers").orderBy("Total_passengers", ascending=False).show()

+------------+------------+----------------+
|PULocationID|DOLocationID|Total_passengers|
+------------+------------+----------------+
|         237|         236|           67885|
|         236|         236|           57662|
|         236|         237|           56488|
|         237|         237|           49757|
|         264|         264|           44789|
|         239|         238|           30402|
|         239|         142|           28755|
|         161|         237|           27492|
|         142|         239|           27260|
|         186|         230|           25857|
|         141|         236|           25850|
|         238|         239|           25431|
|         237|         162|           25160|
|         230|         186|           24846|
|         237|         161|           24779|
|         186|         161|           23489|
|         237|         141|           22829|
|         263|         236|           22628|
|         239|         239|           22052|
|         

# Query 7:
    Get top pickup locations with most passengers in the last 5/10 seconds

In [41]:
top_pickup_locations = df.groupBy(window(col("tpep_pickup_datetime"), "10 seconds"), "PULocationID").agg(count("passenger_count").alias("total_passengers")).orderBy(desc("total_passengers")).limit(1)
top_pickup_locations.show()

+--------------------+------------+----------------+
|              window|PULocationID|total_passengers|
+--------------------+------------+----------------+
|{2020-01-08 23:11...|         142|              12|
+--------------------+------------+----------------+

