**Setup & Read Data**

In [0]:
import os

# Step 1: Authenticate Spark against the storage account.
# The account key is read from the environment rather than being embedded in
# the notebook, so the secret never ends up in version control or in a shared
# export. A KeyError here means AZURE_STORAGE_ACCOUNT_KEY is not set.
# NOTE(review): the key that used to be hardcoded in this cell must be treated
# as leaked and rotated. On Databricks a secret scope
# (dbutils.secrets.get) is the usual alternative to an environment variable.
storage_account_key = os.environ["AZURE_STORAGE_ACCOUNT_KEY"]
spark.conf.set(
  "fs.azure.account.key.assgn.blob.core.windows.net",
  storage_account_key
)

# Step 2: Define file path (change filename if needed)
file_path = "wasbs://nyc-taxi-data@assgn.blob.core.windows.net/nyc-taxi-data.csv"

# Step 3: Read the CSV into a DataFrame.
# header=True takes column names from the first row; inferSchema=True lets
# Spark infer each column's type from the data.
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Step 4: Show sample data
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+
|       1| 2020-01-01 00:00:00|  2020-01-01 00:30:00|              1|         1.47|         1|                 Y|         165|          79|           1|       17.5| 0.74|   0.14|      2.84|        1.81|                 0.44|       23.47|
|       2| 2020-01-01 01:00:00|  2020-01-01 01:3

**Query 1 :**  Add a column named as ""Revenue"" into dataframe which is the sum of the below columns 'Fare_amount','Extra','MTA_tax','Improvement_surcharge','Tip_amount','Tolls_amount','Total_amount

In [0]:
from pyspark.sql.functions import col, sum as _sum

# Columns that make up Revenue, listed in the order they are added together.
revenue_parts = [
    "fare_amount",
    "extra",
    "mta_tax",
    "improvement_surcharge",
    "tip_amount",
    "tolls_amount",
    "total_amount",
]

# Fold the columns left to right: fare_amount + extra + ... + total_amount.
revenue_expr = col(revenue_parts[0])
for part_name in revenue_parts[1:]:
    revenue_expr = revenue_expr + col(part_name)

df = df.withColumn("Revenue", revenue_expr)

# Preview Revenue beside its inputs. improvement_surcharge is part of the sum
# above but is not among the columns displayed here.
preview_columns = [
    "Revenue",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "total_amount",
]
df.select(*preview_columns).show(5)


+------------------+-----------+-----+-------+----------+------------+------------+
|           Revenue|fare_amount|extra|mta_tax|tip_amount|tolls_amount|total_amount|
+------------------+-----------+-----+-------+----------+------------+------------+
|             46.94|       17.5| 0.74|   0.14|      2.84|        1.81|       23.47|
|120.25999999999999|      43.32| 0.86|   0.41|     11.75|        3.62|       60.13|
|              37.6|       5.01| 1.69|   0.14|     10.73|         1.0|        18.8|
|             33.76|        5.8| 0.54|   0.41|       8.9|        1.02|       16.88|
|             92.38|      32.11| 0.31|   0.27|     13.44|        0.05|       46.19|
+------------------+-----------+-----+-------+----------+------------+------------+
only showing top 5 rows



**Query 2** - Increasing count of total passengers in New York City by area

In [0]:
# Sum passenger_count per pickup location, then display the ten locations
# with the largest totals (sorted descending).
passengers_by_pickup = df.groupBy("PULocationID").agg(
    _sum("passenger_count").alias("TotalPassengers")
)
passengers_by_pickup.orderBy("TotalPassengers", ascending=False).show(10)

+------------+---------------+
|PULocationID|TotalPassengers|
+------------+---------------+
|         227|            600|
|          59|            560|
|          17|            520|
|          98|            520|
|          40|            440|
|          29|            400|
|         215|            400|
|         108|            320|
|         164|            320|
|           3|            320|
+------------+---------------+
only showing top 10 rows



**Query 3** - Realtime Average fare/total earning amount earned by 2 vendors

In [0]:
# Average total_amount per vendor. The dict-style aggregation names its output
# column "avg(total_amount)", which is then renamed for display.
avg_total_by_vendor = df.groupBy("VendorID").agg({"total_amount": "avg"})
vendor_earnings = avg_total_by_vendor.withColumnRenamed(
    "avg(total_amount)", "AverageEarnings"
)
vendor_earnings.show()


+--------+------------------+
|VendorID|   AverageEarnings|
+--------+------------------+
|       1| 39.20774834437135|
|       2|38.514362416107296|
+--------+------------------+



**Query 4** - Moving Count of payments made by each payment mode

In [0]:
# Number of trips per payment type, most frequent payment type first.
payment_counts = df.groupBy("payment_type").count()
payment_counts.orderBy("count", ascending=False).show()


+------------+-----+
|payment_type|count|
+------------+-----+
|           4| 2720|
|           2| 2640|
|           5| 2400|
|           1| 2240|
|           3| 2000|
+------------+-----+



**Query 5** - Highest two gaining vendor's on a particular date with no of passenger and total distance by cab

In [0]:
from pyspark.sql.functions import to_date

# Derive the calendar date of each pickup so trips can be grouped per day.
df_with_date = df.withColumn("trip_date", to_date("tpep_pickup_datetime"))

# Per (date, vendor) totals: earnings, passengers carried and distance driven.
daily_vendor_totals = [
    _sum("total_amount").alias("TotalEarnings"),
    _sum("passenger_count").alias("TotalPassengers"),
    _sum("trip_distance").alias("TotalDistance"),
]

# Show the two (date, vendor) groups with the highest total earnings.
(
    df_with_date
    .groupBy("trip_date", "VendorID")
    .agg(*daily_vendor_totals)
    .orderBy("TotalEarnings", ascending=False)
    .show(2)
)


+----------+--------+------------------+---------------+------------------+
| trip_date|VendorID|     TotalEarnings|TotalPassengers|     TotalDistance|
+----------+--------+------------------+---------------+------------------+
|2020-01-11|       1| 28127.19999999997|           1760|3619.6000000000067|
|2020-01-08|       1|27503.599999999973|           1560|3916.0000000000114|
+----------+--------+------------------+---------------+------------------+
only showing top 2 rows



**Query 6** - Most no of passenger between a route of two location.

In [0]:
# A route is a (pickup location, drop-off location) pair; total the passengers
# carried on each route and show the single route with the largest total.
passengers_by_route = df.groupBy("PULocationID", "DOLocationID").agg(
    _sum("passenger_count").alias("TotalPassengers")
)
passengers_by_route.orderBy("TotalPassengers", ascending=False).show(1)


+------------+------------+---------------+
|PULocationID|DOLocationID|TotalPassengers|
+------------+------------+---------------+
|          57|          11|            160|
+------------+------------+---------------+
only showing top 1 row



**Query 7** - Get top pickup locations with most passengers in last 5/10 seconds."

In [0]:
from datetime import timedelta

from pyspark.sql.functions import max as _max

# Width of the "recent" window in seconds; the query asks for the last 5 or 10
# seconds, so switch this between 5 and 10 as needed.
WINDOW_SECONDS = 10

# Cast explicitly so the collected maximum is a Python datetime even if schema
# inference did not type this column as a timestamp.
pickup_ts = df["tpep_pickup_datetime"].cast("timestamp")

# The data is a static file, so "now" is taken to be the latest pickup time
# present in the dataset.
latest_time = df.agg(_max(pickup_ts)).collect()[0][0]

if latest_time is None:
    # No non-null pickup timestamps (e.g. an empty DataFrame): keep the
    # pipeline running and display an empty result.
    recent_trips = df.limit(0)
else:
    # Keep every trip inside the trailing window. Comparing against
    # latest_time alone would keep only rows at exactly the maximum timestamp.
    window_start = latest_time - timedelta(seconds=WINDOW_SECONDS)
    recent_trips = df.filter(pickup_ts >= window_start)

# Total passengers per pickup location within the window, largest first.
recent_trips \
  .groupBy("PULocationID") \
  .agg(_sum("passenger_count").alias("TotalPassengers")) \
  .orderBy("TotalPassengers", ascending=False).show()

+------------+---------------+
|PULocationID|TotalPassengers|
+------------+---------------+
|         134|             80|
+------------+---------------+



**Save as External Parquet file**

In [0]:
# Write the DataFrame (including the Revenue column) as Parquet into its own
# folder. The container root must not be the target: it holds the source CSV,
# and mode("overwrite") replaces whatever already exists at the target path.
output_path = "wasbs://nyc-taxi-data@assgn.blob.core.windows.net/output/nyc_taxi_revenue_parquet"

# "overwrite" makes the cell safely re-runnable: a previous export in this
# folder is replaced instead of causing a "path already exists" failure.
df.write.mode("overwrite").parquet(output_path)