In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, IntegerType, DoubleType
from pyspark.sql.functions import *

# Step 1: Initialize Spark
spark = SparkSession.builder.appName("SmartCityTrafficMonitoring").getOrCreate()

# Step 2: Mount Google Drive (already done by you)
# from google.colab import drive
# drive.mount('/content/drive')

# Step 3: File path
file_path = "/content/drive/My Drive/traffic_logs.csv"

# Inferred Schema
df_infer = spark.read.csv(file_path, header=True, inferSchema=True)
df_infer.printSchema()

# Manually Defined Schema
schema = StructType([
    StructField("LogID", StringType(), True),
    StructField("VehicleID", StringType(), True),
    StructField("EntryPoint", StringType(), True),
    StructField("ExitPoint", StringType(), True),
    StructField("EntryTime", TimestampType(), True),
    StructField("ExitTime", TimestampType(), True),
    StructField("VehicleType", StringType(), True),
    StructField("SpeedKMH", IntegerType(), True),
    StructField("TollPaid", IntegerType(), True),
])

df = spark.read.csv(file_path, header=True, schema=schema)
df.show()


root
 |-- LogID: string (nullable = true)
 |-- VehicleID: string (nullable = true)
 |-- EntryPoint: string (nullable = true)
 |-- ExitPoint: string (nullable = true)
 |-- EntryTime: timestamp (nullable = true)
 |-- ExitTime: timestamp (nullable = true)
 |-- VehicleType: string (nullable = true)
 |-- SpeedKMH: integer (nullable = true)
 |-- TollPaid: integer (nullable = true)

+-----+---------+----------+---------+-------------------+-------------------+-----------+--------+--------+
|LogID|VehicleID|EntryPoint|ExitPoint|          EntryTime|           ExitTime|VehicleType|SpeedKMH|TollPaid|
+-----+---------+----------+---------+-------------------+-------------------+-----------+--------+--------+
| L001|     V001|     GateA|    GateC|2024-05-01 08:01:00|2024-05-01 08:20:00|        Car|      60|      50|
| L002|     V002|     GateB|    GateC|2024-05-01 08:10:00|2024-05-01 08:45:00|      Truck|      45|     100|
| L003|     V003|     GateA|    GateD|2024-05-01 09:00:00|2024-05-01 09:18:0

In [2]:
df = df.withColumn("TripDurationMinutes", (unix_timestamp("ExitTime") - unix_timestamp("EntryTime"))/60)
df = df.withColumn("IsOverspeed", col("SpeedKMH") > 60)
df.select("LogID", "TripDurationMinutes", "IsOverspeed").show()


+-----+-------------------+-----------+
|LogID|TripDurationMinutes|IsOverspeed|
+-----+-------------------+-----------+
| L001|               19.0|      false|
| L002|               35.0|      false|
| L003|               18.0|      false|
| L004|               20.0|       true|
| L005|               35.0|      false|
+-----+-------------------+-----------+



In [4]:
# Average speed per VehicleType
df.groupBy("VehicleType").agg(avg("SpeedKMH").alias("AvgSpeed")).show()

# Total toll collected per EntryPoint
df.groupBy("EntryPoint").agg(sum("TollPaid").alias("TotalToll")).show()

# Most used ExitPoint
df.groupBy("ExitPoint").count().orderBy(desc("count")).show(1)


+-----------+--------+
|VehicleType|AvgSpeed|
+-----------+--------+
|       Bike|    55.0|
|        Car|    70.0|
|      Truck|    45.0|
|        Bus|    40.0|
+-----------+--------+

+----------+---------+
|EntryPoint|TotalToll|
+----------+---------+
|     GateA|       80|
|     GateB|      170|
|     GateC|       50|
+----------+---------+

+---------+-----+
|ExitPoint|count|
+---------+-----+
|    GateD|    2|
+---------+-----+
only showing top 1 row



In [5]:
from pyspark.sql.window import Window

# Rank vehicles by speed within VehicleType
w1 = Window.partitionBy("VehicleType").orderBy(desc("SpeedKMH"))
df = df.withColumn("SpeedRank", rank().over(w1))

# Last ExitTime for each vehicle
w2 = Window.partitionBy("VehicleID").orderBy("EntryTime")
df = df.withColumn("PrevExitTime", lag("ExitTime").over(w2))

df.select("VehicleID", "SpeedKMH", "SpeedRank", "EntryTime", "ExitTime", "PrevExitTime").show()


+---------+--------+---------+-------------------+-------------------+------------+
|VehicleID|SpeedKMH|SpeedRank|          EntryTime|           ExitTime|PrevExitTime|
+---------+--------+---------+-------------------+-------------------+------------+
|     V001|      60|        2|2024-05-01 08:01:00|2024-05-01 08:20:00|        NULL|
|     V002|      45|        1|2024-05-01 08:10:00|2024-05-01 08:45:00|        NULL|
|     V003|      55|        1|2024-05-01 09:00:00|2024-05-01 09:18:00|        NULL|
|     V004|      80|        1|2024-05-01 09:15:00|2024-05-01 09:35:00|        NULL|
|     V005|      40|        1|2024-05-01 10:05:00|2024-05-01 10:40:00|        NULL|
+---------+--------+---------+-------------------+-------------------+------------+



In [6]:
df = df.withColumn("IdleTime", (unix_timestamp("EntryTime") - unix_timestamp("PrevExitTime"))/60)
df.select("VehicleID", "EntryTime", "ExitTime", "IdleTime").show()


+---------+-------------------+-------------------+--------+
|VehicleID|          EntryTime|           ExitTime|IdleTime|
+---------+-------------------+-------------------+--------+
|     V001|2024-05-01 08:01:00|2024-05-01 08:20:00|    NULL|
|     V002|2024-05-01 08:10:00|2024-05-01 08:45:00|    NULL|
|     V003|2024-05-01 09:00:00|2024-05-01 09:18:00|    NULL|
|     V004|2024-05-01 09:15:00|2024-05-01 09:35:00|    NULL|
|     V005|2024-05-01 10:05:00|2024-05-01 10:40:00|    NULL|
+---------+-------------------+-------------------+--------+



In [7]:
# Speed > 70 and TripDuration < 10
df.filter((col("SpeedKMH") > 70) & (col("TripDurationMinutes") < 10)).show()

# Paid less toll for longer trips (Assume suspicious if > 30 min but toll < 50)
df.filter((col("TripDurationMinutes") > 30) & (col("TollPaid") < 50)).show()

# Suspicious backtracking (ExitPoint alphabetically earlier than EntryPoint)
df.filter(col("ExitPoint") < col("EntryPoint")).show()


+-----+---------+----------+---------+---------+--------+-----------+--------+--------+-------------------+-----------+---------+------------+--------+
|LogID|VehicleID|EntryPoint|ExitPoint|EntryTime|ExitTime|VehicleType|SpeedKMH|TollPaid|TripDurationMinutes|IsOverspeed|SpeedRank|PrevExitTime|IdleTime|
+-----+---------+----------+---------+---------+--------+-----------+--------+--------+-------------------+-----------+---------+------------+--------+
+-----+---------+----------+---------+---------+--------+-----------+--------+--------+-------------------+-----------+---------+------------+--------+

+-----+---------+----------+---------+---------+--------+-----------+--------+--------+-------------------+-----------+---------+------------+--------+
|LogID|VehicleID|EntryPoint|ExitPoint|EntryTime|ExitTime|VehicleType|SpeedKMH|TollPaid|TripDurationMinutes|IsOverspeed|SpeedRank|PrevExitTime|IdleTime|
+-----+---------+----------+---------+---------+--------+-----------+--------+--------+

In [8]:
# Prepare vehicle_registry.csv
vehicle_registry_path = "/content/drive/My Drive/vehicle_registry.csv"

registry_schema = StructType([
    StructField("VehicleID", StringType(), True),
    StructField("OwnerName", StringType(), True),
    StructField("Model", StringType(), True),
    StructField("RegisteredCity", StringType(), True),
])

df_registry = spark.read.csv(vehicle_registry_path, header=True, schema=registry_schema)

# Join
df_joined = df.join(df_registry, on="VehicleID", how="left")

# Group trips by RegisteredCity
df_joined.groupBy("RegisteredCity").agg(count("*").alias("TotalTrips")).show()


+--------------+----------+
|RegisteredCity|TotalTrips|
+--------------+----------+
|     Bangalore|         1|
|       Chennai|         1|
|        Mumbai|         1|
|          Pune|         1|
|         Delhi|         1|
+--------------+----------+



In [11]:
from pyspark.sql.functions import when

# Simulated Save as Delta Table
# (In real Delta Lake environment use format("delta"))
delta_simulated_path = "/content/drive/My Drive/delta/traffic_logs_simulated"
df.write.mode("overwrite").parquet(delta_simulated_path)  # Simulate Delta Table using Parquet
print("Traffic logs saved as simulated Delta table (Parquet format).")

# Simulated MERGE INTO (Update TollPaid for Bikes)
df_bike_updated = df.withColumn(
    "TollPaid",
    when(col("VehicleType") == "Bike", 40).otherwise(col("TollPaid"))
)
df_bike_updated.select("VehicleID", "VehicleType", "TollPaid").show()

# Simulated DELETE FROM Delta Table (Trips longer than 60 mins)
df_filtered = df_bike_updated.filter(col("TripDurationMinutes") <= 60)
df_filtered.select("VehicleID", "TripDurationMinutes").show()

# Simulated VERSIONING by saving different versions manually
version_1_path = "/content/drive/My Drive/delta/traffic_logs_version_1"
version_2_path = "/content/drive/My Drive/delta/traffic_logs_version_2"

# Version 1 - original
df.write.mode("overwrite").parquet(version_1_path)

# Version 2 - after update + delete
df_filtered.write.mode("overwrite").parquet(version_2_path)

# Simulated DESCRIBE HISTORY (Manually printing version info)
from datetime import datetime

print("\nSimulated Delta History:")
print(f"Version 1: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - Original saved")
print(f"Version 2: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} - After Bike toll update and trip filter")


Traffic logs saved as simulated Delta table (Parquet format).
+---------+-----------+--------+
|VehicleID|VehicleType|TollPaid|
+---------+-----------+--------+
|     V001|        Car|      50|
|     V002|      Truck|     100|
|     V003|       Bike|      40|
|     V004|        Car|      50|
|     V005|        Bus|      70|
+---------+-----------+--------+

+---------+-------------------+
|VehicleID|TripDurationMinutes|
+---------+-------------------+
|     V001|               19.0|
|     V002|               35.0|
|     V003|               18.0|
|     V004|               20.0|
|     V005|               35.0|
+---------+-------------------+


Simulated Delta History:
Version 1: 2025-06-19 04:41:46 - Original saved
Version 2: 2025-06-19 04:41:46 - After Bike toll update and trip filter


In [12]:
# Trip Type Tagging
df = df.withColumn("TripType", when(col("TripDurationMinutes") < 15, "Short")
                                  .when(col("TripDurationMinutes") <= 30, "Medium")
                                  .otherwise("Long"))

# Trips per day per vehicle
df = df.withColumn("TripDate", to_date("EntryTime"))
vehicle_trip_counts = df.groupBy("VehicleID", "TripDate").agg(count("*").alias("DailyTrips"))

# Flag vehicles with >3 trips/day
vehicle_trip_counts.filter(col("DailyTrips") > 3).show()


+---------+--------+----------+
|VehicleID|TripDate|DailyTrips|
+---------+--------+----------+
+---------+--------+----------+



In [13]:
# Write as Parquet partitioned by VehicleType
df.write.mode("overwrite").partitionBy("VehicleType").parquet("/content/drive/My Drive/output/traffic_parquet")

# Write as CSV for dashboards
df.write.mode("overwrite").option("header", True).csv("/content/drive/My Drive/output/traffic_csv")

# Register temporary SQL View
df.createOrReplaceTempView("traffic_view")

# SQL Summary View: Total toll by VehicleType + ExitPoint
summary = spark.sql("""
    SELECT VehicleType, ExitPoint, SUM(TollPaid) AS TotalToll
    FROM traffic_view
    GROUP BY VehicleType, ExitPoint
    ORDER BY VehicleType, TotalToll DESC
""")
summary.show()


+-----------+---------+---------+
|VehicleType|ExitPoint|TotalToll|
+-----------+---------+---------+
|       Bike|    GateD|       30|
|        Bus|    GateA|       70|
|        Car|    GateD|       50|
|        Car|    GateC|       50|
|      Truck|    GateC|      100|
+-----------+---------+---------+

