# Smart City Traffic Monitoring System

Creating Pyspark session

In [1]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("TrafficMonitoring").getOrCreate()
spark

#### 1. Data Ingestion & Schema Analysis

Load CSV using PySpark with schema inference

In [2]:
from google.colab import drive

drive.mount('/content/drive')

traffic_df = spark.read.format('csv') \
                       .option('header', True) \
                       .option('inferSchema', True) \
                       .load('/content/drive/MyDrive/Assessment/traffic_logs.csv')

traffic_df.printSchema()

Mounted at /content/drive
root
 |-- LogID: string (nullable = true)
 |-- VehicleID: string (nullable = true)
 |-- EntryPoint: string (nullable = true)
 |-- ExitPoint: string (nullable = true)
 |-- EntryTime: timestamp (nullable = true)
 |-- ExitTime: timestamp (nullable = true)
 |-- VehicleType: string (nullable = true)
 |-- SpeedKMH: integer (nullable = true)
 |-- TollPaid : double (nullable = true)



 Manually define schema and compare

In [3]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, DoubleType

traffic_schema = StructType([
    StructField('LogID', StringType(), True),
    StructField('VehicleID', StringType(), True),
    StructField('EntryPoint', StringType(), True),
    StructField('ExitPoint', StringType(), True),
    StructField('EntryTime', TimestampType(), True),
    StructField('ExitTime', TimestampType(), True),
    StructField('VehicleType', StringType(), True),
    StructField('SpeedKMH', IntegerType(), True),
    StructField('TollPaid', DoubleType(), True)
])

traffic_df_manual = spark.read.format('csv') \
                              .option('header', True) \
                              .schema(traffic_schema) \
                              .load('/content/drive/MyDrive/Assessment/traffic_logs.csv')

traffic_df_manual.printSchema()

root
 |-- LogID: string (nullable = true)
 |-- VehicleID: string (nullable = true)
 |-- EntryPoint: string (nullable = true)
 |-- ExitPoint: string (nullable = true)
 |-- EntryTime: timestamp (nullable = true)
 |-- ExitTime: timestamp (nullable = true)
 |-- VehicleType: string (nullable = true)
 |-- SpeedKMH: integer (nullable = true)
 |-- TollPaid: double (nullable = true)



Comparing both schemas

In [6]:
print("\n inferSchema")
traffic_df.printSchema()

print("\n manualSchema")
traffic_df_manual.printSchema()


 inferSchema
root
 |-- LogID: string (nullable = true)
 |-- VehicleID: string (nullable = true)
 |-- EntryPoint: string (nullable = true)
 |-- ExitPoint: string (nullable = true)
 |-- EntryTime: timestamp (nullable = true)
 |-- ExitTime: timestamp (nullable = true)
 |-- VehicleType: string (nullable = true)
 |-- SpeedKMH: integer (nullable = true)
 |-- TollPaid : double (nullable = true)


 manualSchema
root
 |-- LogID: string (nullable = true)
 |-- VehicleID: string (nullable = true)
 |-- EntryPoint: string (nullable = true)
 |-- ExitPoint: string (nullable = true)
 |-- EntryTime: timestamp (nullable = true)
 |-- ExitTime: timestamp (nullable = true)
 |-- VehicleType: string (nullable = true)
 |-- SpeedKMH: integer (nullable = true)
 |-- TollPaid: double (nullable = true)



Remving leading and tailing spaces in column names

In [21]:
traffic_df = traffic_df.toDF(*[col_name.strip() for col_name in traffic_df.columns])

#### 2. Derived Column Creation


Calculate
TripDurationMinutes = ExitTime - EntryTime

In [22]:
from pyspark.sql.functions import unix_timestamp, col

traffic_df = traffic_df.withColumn('TripDurationMinutes',
                                            (unix_timestamp(col('ExitTime')) - unix_timestamp(col('EntryTime'))) / 60)

traffic_df.select(
                    'LogID',
                    'VehicleID',
                    'EntryTime',
                    'ExitTime',
                    'TripDurationMinutes'
                 ).show()


+-----+---------+-------------------+-------------------+-------------------+
|LogID|VehicleID|          EntryTime|           ExitTime|TripDurationMinutes|
+-----+---------+-------------------+-------------------+-------------------+
| L001|     V001|2024-05-01 08:01:00|2024-05-01 08:20:00|               19.0|
| L002|     V002|2024-05-01 08:10:00|2024-05-01 08:45:00|               35.0|
| L003|     V003|2024-05-01 09:00:00|2024-05-01 09:18:00|               18.0|
| L004|     V004|2024-05-01 09:15:00|2024-05-01 09:35:00|               20.0|
| L005|     V005|2024-05-01 10:05:00|2024-05-01 10:40:00|               35.0|
+-----+---------+-------------------+-------------------+-------------------+



Add
IsOverspeed = SpeedKMH > 60

In [23]:
traffic_df = traffic_df.withColumn('IsOverspeed', col('SpeedKMH') > 60)

traffic_df.select(
                    'LogID',
                    'VehicleID',
                    'SpeedKMH',
                    'VehicleType',
                    'IsOverspeed'
                 ).show()

+-----+---------+--------+-----------+-----------+
|LogID|VehicleID|SpeedKMH|VehicleType|IsOverspeed|
+-----+---------+--------+-----------+-----------+
| L001|     V001|      60|        Car|      false|
| L002|     V002|      45|      Truck|      false|
| L003|     V003|      55|       Bike|      false|
| L004|     V004|      80|        Car|       true|
| L005|     V005|      40|        Bus|      false|
+-----+---------+--------+-----------+-----------+



3. Vehicle Behavior Aggregations


Average speed per
VehicleType

In [24]:
from pyspark.sql.functions import avg
avg_speed_df = traffic_df.groupBy('VehicleType') \
                      .agg(avg('SpeedKMH').alias('AverageSpeed'))
avg_speed_df.show()


+-----------+------------+
|VehicleType|AverageSpeed|
+-----------+------------+
|       Bike|        55.0|
|        Car|        70.0|
|      Truck|        45.0|
|        Bus|        40.0|
+-----------+------------+



 Total toll collected per gate (EntryPoint)

In [26]:
from pyspark.sql.functions import sum
total_toll_collected_df = traffic_df.groupBy('EntryPoint') \
                                    .agg(sum('TollPaid').alias('TotalTollCollected')) \
                                    .orderBy('TotalTollCollected', ascending=False)

total_toll_collected_df.show()

+----------+------------------+
|EntryPoint|TotalTollCollected|
+----------+------------------+
|     GateB|             170.0|
|     GateA|              80.0|
|     GateC|              50.0|
+----------+------------------+



 Most used ExitPoint

In [28]:
most_used_exit_point_df = traffic_df.groupBy('ExitPoint') \
                                    .count() \
                                    .withColumnRenamed('count', 'ExitPointCount') \
                                    .orderBy('ExitPointCount', ascending=False)

most_used_exit_point_df.show()

+---------+--------------+
|ExitPoint|ExitPointCount|
+---------+--------------+
|    GateD|             2|
|    GateC|             2|
|    GateA|             1|
+---------+--------------+



#### 4  Window Functions


Rank vehicles by speed within
VehicleType

In [31]:
from pyspark.sql.functions import row_number, desc
from pyspark.sql import Window

window_df = Window.partitionBy('VehicleType').orderBy(desc('SpeedKMH'))
vehicles_by_speed = traffic_df.withColumn('SpeedRank', row_number().over(window_df))

vehicles_by_speed.select(
                          'VehicleID',
                          'VehicleType',
                          'SpeedKMH',
                          'SpeedRank'
                        ).show()

+---------+-----------+--------+---------+
|VehicleID|VehicleType|SpeedKMH|SpeedRank|
+---------+-----------+--------+---------+
|     V003|       Bike|      55|        1|
|     V005|        Bus|      40|        1|
|     V004|        Car|      80|        1|
|     V001|        Car|      60|        2|
|     V002|      Truck|      45|        1|
+---------+-----------+--------+---------+



Find last exit time for each vehicle using
lag()

In [35]:
from pyspark.sql.functions import lag, coalesce

window_df = Window.orderBy('EntryTime')
last_exit_time_df = traffic_df.withColumn('LastExitTime', lag('ExitTime').over(window_df))

last_exit_time_df = last_exit_time_df.withColumn('LastExitTime',
                                                  coalesce('LastExitTime', 'ExitTime')
                                                 )

last_exit_time_df.select(
                          'LogID',
                          'VehicleID',
                          'VehicleType',
                          'EntryPoint',
                          'EntryTime',
                          'ExitPoint',
                          'ExitTime',
                          'LastExitTime'
                        ).show()

+-----+---------+-----------+----------+-------------------+---------+-------------------+-------------------+
|LogID|VehicleID|VehicleType|EntryPoint|          EntryTime|ExitPoint|           ExitTime|       LastExitTime|
+-----+---------+-----------+----------+-------------------+---------+-------------------+-------------------+
| L001|     V001|        Car|     GateA|2024-05-01 08:01:00|    GateC|2024-05-01 08:20:00|2024-05-01 08:20:00|
| L002|     V002|      Truck|     GateB|2024-05-01 08:10:00|    GateC|2024-05-01 08:45:00|2024-05-01 08:20:00|
| L003|     V003|       Bike|     GateA|2024-05-01 09:00:00|    GateD|2024-05-01 09:18:00|2024-05-01 08:45:00|
| L004|     V004|        Car|     GateC|2024-05-01 09:15:00|    GateD|2024-05-01 09:35:00|2024-05-01 09:18:00|
| L005|     V005|        Bus|     GateB|2024-05-01 10:05:00|    GateA|2024-05-01 10:40:00|2024-05-01 09:35:00|
+-----+---------+-----------+----------+-------------------+---------+-------------------+-------------------+



#### Session Segmentation

Group by VehicleID to simulate route sessions

In [41]:
from pyspark.sql.functions import count, sum, min, max

sessions_df = traffic_df.groupBy('VehicleID') \
                        .agg(
                              count('*').alias('TripCount'),
                              sum('TollPaid').alias('TotalTollPaid'),
                              min('EntryTime').alias('FirstEntryTime'),
                              max('ExitTime').alias('LastExitTime'),
                              max('SpeedKMH').alias('MaxSpeedKMH')
                        )

sessions_df.show()

+---------+---------+-------------+-------------------+-------------------+-----------+
|VehicleID|TripCount|TotalTollPaid|     FirstEntryTime|       LastExitTime|MaxSpeedKMH|
+---------+---------+-------------+-------------------+-------------------+-----------+
|     V004|        1|         50.0|2024-05-01 09:15:00|2024-05-01 09:35:00|         80|
|     V005|        1|         70.0|2024-05-01 10:05:00|2024-05-01 10:40:00|         40|
|     V001|        1|         50.0|2024-05-01 08:01:00|2024-05-01 08:20:00|         60|
|     V003|        1|         30.0|2024-05-01 09:00:00|2024-05-01 09:18:00|         55|
|     V002|        1|        100.0|2024-05-01 08:10:00|2024-05-01 08:45:00|         45|
+---------+---------+-------------+-------------------+-------------------+-----------+



Find duration between subsequent trips (idle time)

In [45]:
from pyspark.sql.functions import when

window_spec = Window.orderBy('EntryTime')

# Calculating previous trip ExitTime
idl_time_df = traffic_df.withColumn('PrevExitTime', lag('ExitTime').over(window_spec))

# Calculating idle time of EntryTime and previous ExitTime
idl_time_df = idl_time_df.withColumn('IdleTimeMinutes',
                                    (col('EntryTime').cast('long') - col('PrevExitTime').cast('long')) / 60)

# Replacing negative idle times with 0
idl_time_df = idl_time_df.withColumn('IdleTimeMinutes',
                                     when(col('IdleTimeMinutes') < 0, 0).otherwise(col('IdleTimeMinutes')))


idl_time_df.select(
                    'VehicleID',
                    'EntryTime',
                    'ExitTime',
                    'PrevExitTime',
                    'IdleTimeMinutes'
                   ).show()


+---------+-------------------+-------------------+-------------------+---------------+
|VehicleID|          EntryTime|           ExitTime|       PrevExitTime|IdleTimeMinutes|
+---------+-------------------+-------------------+-------------------+---------------+
|     V001|2024-05-01 08:01:00|2024-05-01 08:20:00|               NULL|           NULL|
|     V002|2024-05-01 08:10:00|2024-05-01 08:45:00|2024-05-01 08:20:00|            0.0|
|     V003|2024-05-01 09:00:00|2024-05-01 09:18:00|2024-05-01 08:45:00|           15.0|
|     V004|2024-05-01 09:15:00|2024-05-01 09:35:00|2024-05-01 09:18:00|            0.0|
|     V005|2024-05-01 10:05:00|2024-05-01 10:40:00|2024-05-01 09:35:00|           30.0|
+---------+-------------------+-------------------+-------------------+---------------+



#### 6. Anomaly Detection

Identify vehicles with speed > 70 and TripDuration < 10 minutes

In [53]:
speed_trip_anomaly_df = traffic_df.filter((col('SpeedKMH') > 70) & (col('TripDurationMinutes') < 10)) \
                                  .select(
                                            'LogID',
                                            'VehicleID',
                                            'EntryPoint',
                                            'EntryTime',
                                            'ExitPoint',
                                            'ExitTime',
                                            'VehicleType',
                                            'SpeedKMH',
                                            'TripDurationMinutes'
                                         )

speed_trip_anomaly_df.show()

+-----+---------+----------+---------+---------+--------+-----------+--------+-------------------+
|LogID|VehicleID|EntryPoint|EntryTime|ExitPoint|ExitTime|VehicleType|SpeedKMH|TripDurationMinutes|
+-----+---------+----------+---------+---------+--------+-----------+--------+-------------------+
+-----+---------+----------+---------+---------+--------+-----------+--------+-------------------+



Vehicles that paid less toll for longer trips

In [54]:
from pyspark.sql.functions import col


less_toll_anomaly = traffic_df.withColumn('TollPerMinute',
                                          col('TollPaid') / col('TripDurationMinutes'))


less_toll_anomaly = less_toll_anomaly.filter((col('TripDurationMinutes') > 30) &(col('TollPerMinute') < 2))

less_toll_anomaly.select(
                          'VehicleID',
                          'TripDurationMinutes',
                          'TollPaid',
                          'TollPerMinute'
                        ).show()


+---------+-------------------+--------+-------------+
|VehicleID|TripDurationMinutes|TollPaid|TollPerMinute|
+---------+-------------------+--------+-------------+
+---------+-------------------+--------+-------------+



Suspicious backtracking (ExitPoint earlier than EntryPoint)

In [58]:
suspicious_backtracking_df = traffic_df.filter(col('ExitPoint') < col('EntryPoint'))
suspicious_backtracking_df.select(
                                   'LogID',
                                   'VehicleID',
                                   'VehicleType',
                                   'EntryPoint',
                                   'EntryTime',
                                   'ExitPoint',
                                   'ExitTime'
                                  ).show()

+-----+---------+-----------+----------+-------------------+---------+-------------------+
|LogID|VehicleID|VehicleType|EntryPoint|          EntryTime|ExitPoint|           ExitTime|
+-----+---------+-----------+----------+-------------------+---------+-------------------+
| L005|     V005|        Bus|     GateB|2024-05-01 10:05:00|    GateA|2024-05-01 10:40:00|
+-----+---------+-----------+----------+-------------------+---------+-------------------+



#### 7. Join with Metadata

 Prepare this small
vehicle_registry.csv

In [59]:
vehicle_registry_schema = StructType(
    [
    StructField("VehicleID", StringType(), True),
    StructField("OwnerName", StringType(), True),
    StructField("Model", StringType(), True),
    StructField("RegisteredCity", StringType(), True)
    ]
  )

data = [
    ("V001", "Anil", "Hyundai i20", "Delhi"),
    ("V002", "Rakesh", "Tata Truck", "Chennai"),
    ("V003", "Sana", "Yamaha R15", "Mumbai"),
    ("V004", "Neha", "Honda City", "Bangalore"),
    ("V005","Zoya","Volvo Bus","Pune")
]

vehicle_registry_df = spark.createDataFrame(data, vehicle_registry_schema)

vehicle_registry_df.printSchema()


root
 |-- VehicleID: string (nullable = true)
 |-- OwnerName: string (nullable = true)
 |-- Model: string (nullable = true)
 |-- RegisteredCity: string (nullable = true)



 Join and group trips by
RegisteredCity

In [63]:
joined_df = traffic_df.join(vehicle_registry_df, on='VehicleID', how='left')

grouped_trips_df = joined_df.groupBy('RegisteredCity') \
                            .agg(
                                  count('*').alias('TotalTrips'),
                                  sum('TollPaid').alias('TotalTollCollected')
                                ) \
                            .orderBy('TotalTrips', ascending=False)

grouped_trips_df.show()

+--------------+----------+------------------+
|RegisteredCity|TotalTrips|TotalTollCollected|
+--------------+----------+------------------+
|     Bangalore|         1|              50.0|
|       Chennai|         1|             100.0|
|        Mumbai|         1|              30.0|
|          Pune|         1|              70.0|
|         Delhi|         1|              50.0|
+--------------+----------+------------------+



#### 8. Delta Lake Features

Save traffic_logs as Delta Table

In [None]:
traffic_df.write.format('delta') \
                .mode('overwrite') \
                .save('/content/drive/MyDrive/Assessment/transformed_traffic_logs')

 Apply
MERGE INTO to update toll rates for all Bikes

In [None]:
spark.sql("""
          MERGE INTO delta.`/content/drive/MyDrive/Assessment/transformed_traffic_logs` AS target
          USING (
                  SELECT
                    * FROM delta.`/content/drive/MyDrive/Assessment/transformed_traffic_logs`
                  WHERE VehicleType = 'Bike'
          ) AS source
          ON target.LogID = source.LogID
          WHEN MATCHED THEN
            UPDATE SET target.TollPaid = target.TollPaid + 10
""")

spark.sql("""
          SELECT
            VehicleID,
            VehicleType,
            TollPaid
          FROM delta.`/content/drive/MyDrive/Assessment/transformed_traffic_logs`
          WHERE VehicleType = 'Bike'
""").show()


Use
DESCRIBE HISTORY and
VERSION AS OF

In [None]:
# DESCRIBE HISTORY
spark.sql("""
          DESCRIBE HISTORY delta.`/content/drive/MyDrive/Assessment/transformed_traffic_logs`
          """).show()


# VERSION AS OF
df_version = spark.read.format('delta') \
                       .option('versionAsOf', 0) \
                       .load('/content/drive/MyDrive/Assessment/transformed_traffic_logs')

df_version.show()


#### 9. Advanced Conditions

when/otherwise : Tag trip type as: \
 "Short" <15min \
 "Medium" 15-30min \
 "Long" >30min

In [66]:
traffic_df = traffic_df.withColumn('TripType',
                                    when(col('TripDurationMinutes') < 15, 'Short')
                                    .when((col('TripDurationMinutes') >= 15) & (col('TripDurationMinutes') <= 30), 'Medium')
                                    .otherwise('Long')
                                   )

traffic_df.select(
                  'VehicleID',
                  'TripDurationMinutes',
                  'TripType').show()


+---------+-------------------+--------+
|VehicleID|TripDurationMinutes|TripType|
+---------+-------------------+--------+
|     V001|               19.0|  Medium|
|     V002|               35.0|    Long|
|     V003|               18.0|  Medium|
|     V004|               20.0|  Medium|
|     V005|               35.0|    Long|
+---------+-------------------+--------+



 Flag vehicles with more than 3 trips in a day

In [67]:
from pyspark.sql.functions import to_date

traffic_df = traffic_df.withColumn('TripDate', to_date(col('EntryTime')))

trip_window = Window.partitionBy('VehicleID', 'TripDate')

traffic_df = traffic_df.withColumn('TripCount', count('*').over(trip_window))

traffic_df = traffic_df.withColumn('Flag',
                                   when(col('TripCount') > 3, True) \
                                  .otherwise(False))

traffic_df.select(
                  'VehicleID',
                  'EntryTime',
                  'TripDate',
                  'TripCount',
                  'Flag').show()


+---------+-------------------+----------+---------+-----+
|VehicleID|          EntryTime|  TripDate|TripCount| Flag|
+---------+-------------------+----------+---------+-----+
|     V001|2024-05-01 08:01:00|2024-05-01|        1|false|
|     V002|2024-05-01 08:10:00|2024-05-01|        1|false|
|     V003|2024-05-01 09:00:00|2024-05-01|        1|false|
|     V004|2024-05-01 09:15:00|2024-05-01|        1|false|
|     V005|2024-05-01 10:05:00|2024-05-01|        1|false|
+---------+-------------------+----------+---------+-----+



#### 10. Export & Reporting

Write final enriched DataFrame to:
 Parquet partitioned by VehicleType

In [68]:
traffic_df.write.format('parquet') \
                .mode('overwrite') \
                .partitionBy('VehicleType') \
                .save('/content/drive/MyDrive/Assessment/enriched_traffic_logs')

Write final enriched DataFrame to: CSV for Dashboards

In [69]:
traffic_df.write.format('csv') \
                .mode('overwrite') \
                .option('header', True) \
                .save('/content/drive/MyDrive/Assessment/enriched_traffic_logs_csv')

 Create summary SQL View: total toll by VehicleType + ExitPoint

In [72]:
traffic_df.createOrReplaceTempView('traffic_logs')

spark.sql("""
          CREATE OR REPLACE TEMP VIEW toll_summary AS
          SELECT
            VehicleType,
            ExitPoint,
            SUM(TollPaid) AS TotalTollCollected
        FROM traffic_logs
        GROUP BY VehicleType, ExitPoint
        ORDER BY TotalTollCollected DESC
""")

spark.sql("SELECT * FROM toll_summary").show()

+-----------+---------+------------------+
|VehicleType|ExitPoint|TotalTollCollected|
+-----------+---------+------------------+
|      Truck|    GateC|             100.0|
|        Bus|    GateA|              70.0|
|        Car|    GateD|              50.0|
|        Car|    GateC|              50.0|
|       Bike|    GateD|              30.0|
+-----------+---------+------------------+

