In [0]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("june19assignment1").getOrCreate()
spark

In [0]:
# Data Ingestion & Schema Analysis
# Load CSV using PySpark with schema inference
spark.conf.set("fs.azure.account.key.hestore.blob.core.windows.net","---------AccessKeyyy----------")

traffic_log_df=spark.read.csv("wasbs://june19assignment1@hestore.blob.core.windows.net/traffic_logs.csv",header=True,inferSchema=True)

traffic_log_df.printSchema()
# Manually define schema and compare
from pyspark.sql.types import *
from pyspark.sql.functions import *

traffic_log_schema=StructType([StructField("LogID",StringType(),True),StructField("VehicleID",StringType(),True),StructField("EntryPoint",StringType(),True),StructField("ExitPoint",StringType(),True),StructField("EntryTime",TimestampType(),True),StructField("ExitTime",TimestampType(),True),StructField("VehicleType",StringType(),True),StructField("SpeedKMH",IntegerType(),True),StructField("TollPaid",IntegerType(),True)])

traffic_log_df=spark.read.csv("wasbs://june19assignment1@hestore.blob.core.windows.net/traffic_logs.csv",header=True,schema=traffic_log_schema)
traffic_log_df.printSchema()
# Ensure EntryTime/ExitTime are timestamp
traffic_log_df=spark.read.csv("wasbs://june19assignment1@hestore.blob.core.windows.net/traffic_logs.csv",header=True,inferSchema=True)
traffic_log_df=traffic_log_df.withColumn("EntryTime",to_timestamp("EntryTime"))
traffic_log_df=traffic_log_df.withColumn("ExitTime",to_timestamp("ExitTime"))
traffic_log_df.printSchema()


root
 |-- LogID: string (nullable = true)
 |-- VehicleID: string (nullable = true)
 |-- EntryPoint: string (nullable = true)
 |-- ExitPoint: string (nullable = true)
 |-- EntryTime: timestamp (nullable = true)
 |-- ExitTime: timestamp (nullable = true)
 |-- VehicleType: string (nullable = true)
 |-- SpeedKMH: integer (nullable = true)
 |-- TollPaid: integer (nullable = true)

root
 |-- LogID: string (nullable = true)
 |-- VehicleID: string (nullable = true)
 |-- EntryPoint: string (nullable = true)
 |-- ExitPoint: string (nullable = true)
 |-- EntryTime: timestamp (nullable = true)
 |-- ExitTime: timestamp (nullable = true)
 |-- VehicleType: string (nullable = true)
 |-- SpeedKMH: integer (nullable = true)
 |-- TollPaid: integer (nullable = true)

root
 |-- LogID: string (nullable = true)
 |-- VehicleID: string (nullable = true)
 |-- EntryPoint: string (nullable = true)
 |-- ExitPoint: string (nullable = true)
 |-- EntryTime: timestamp (nullable = true)
 |-- ExitTime: timestamp (nullab

In [0]:
# Derived Column Creation
# Calculate TripDurationMinutes = ExitTime - EntryTime
traffic_log_df=traffic_log_df.withColumn("TripDurationMinutes",round((unix_timestamp("ExitTime")-unix_timestamp("EntryTime"))/60))
# Add IsOverspeed = SpeedKMH > 60
traffic_log_df=traffic_log_df.withColumn("IsOverspeed",when(col("SpeedKMH")>60,1).otherwise(0))
traffic_log_df.printSchema()
traffic_log_df.show()

root
 |-- LogID: string (nullable = true)
 |-- VehicleID: string (nullable = true)
 |-- EntryPoint: string (nullable = true)
 |-- ExitPoint: string (nullable = true)
 |-- EntryTime: timestamp (nullable = true)
 |-- ExitTime: timestamp (nullable = true)
 |-- VehicleType: string (nullable = true)
 |-- SpeedKMH: integer (nullable = true)
 |-- TollPaid: integer (nullable = true)
 |-- TripDurationMinutes: double (nullable = true)
 |-- IsOverspeed: integer (nullable = false)

+-----+---------+----------+---------+-------------------+-------------------+-----------+--------+--------+-------------------+-----------+
|LogID|VehicleID|EntryPoint|ExitPoint|          EntryTime|           ExitTime|VehicleType|SpeedKMH|TollPaid|TripDurationMinutes|IsOverspeed|
+-----+---------+----------+---------+-------------------+-------------------+-----------+--------+--------+-------------------+-----------+
| L001|     V001|     GateA|    GateC|2024-05-01 08:01:00|2024-05-01 08:20:00|        Car|      60|   

In [0]:
# Vehicle Behavior Aggregations
# Average speed per VehicleType
traffic_log_df.groupBy("VehicleType").agg(avg("SpeedKMH").alias("AvgSpeedKMH")).show()
# Total toll collected per gate (EntryPoint)
traffic_log_df.groupBy("ExitPoint").agg(sum("TollPaid").alias("TotalTollPaid")).show
# Most used ExitPoint
traffic_log_df.groupBy("ExitPoint").count().orderBy(desc("count")).show(1)

+-----------+-----------+
|VehicleType|AvgSpeedKMH|
+-----------+-----------+
|       Bike|       55.0|
|        Car|       70.0|
|      Truck|       45.0|
|        Bus|       40.0|
+-----------+-----------+

+---------+-----+
|ExitPoint|count|
+---------+-----+
|    GateD|    2|
+---------+-----+
only showing top 1 row



In [0]:
# Window Functions
# Rank vehicles by speed within VehicleType
from pyspark.sql.window import Window
traffic_log_df_window=traffic_log_df.withColumn("Rank",dense_rank().over(Window.partitionBy("VehicleType").orderBy(desc("SpeedKMH"))))
t=traffic_log_df_window
traffic_log_df_window.show()
# Find last exit time for each vehicle using lag()
traffic_log_df_window=traffic_log_df_window.withColumn("LastExitTime",lag("ExitTime",1).over(Window.partitionBy("VehicleID").orderBy(desc("ExitTime"))))
traffic_log_df_window.show()

+-----+---------+----------+---------+-------------------+-------------------+-----------+--------+--------+-------------------+-----------+----+
|LogID|VehicleID|EntryPoint|ExitPoint|          EntryTime|           ExitTime|VehicleType|SpeedKMH|TollPaid|TripDurationMinutes|IsOverspeed|Rank|
+-----+---------+----------+---------+-------------------+-------------------+-----------+--------+--------+-------------------+-----------+----+
| L003|     V003|     GateA|    GateD|2024-05-01 09:00:00|2024-05-01 09:18:00|       Bike|      55|      30|               18.0|          0|   1|
| L005|     V005|     GateB|    GateA|2024-05-01 10:05:00|2024-05-01 10:40:00|        Bus|      40|      70|               35.0|          0|   1|
| L004|     V004|     GateC|    GateD|2024-05-01 09:15:00|2024-05-01 09:35:00|        Car|      80|      50|               20.0|          1|   1|
| L001|     V001|     GateA|    GateC|2024-05-01 08:01:00|2024-05-01 08:20:00|        Car|      60|      50|               1

In [0]:
# Session Segmentation
# Group by VehicleID to simulate route sessions
traffic_log_df_window=traffic_log_df_window.groupBy("VehicleID").agg(min("EntryTime").alias("SessionStart"),max("ExitTime").alias("SessionEnd"))
traffic_log_df_window.show()
# Find duration between subsequent trips (idle time)
traffic_log_df_window.withColumn("IdleTime",round((unix_timestamp("SessionEnd")-unix_timestamp("SessionStart"))/60)).show()


+---------+-------------------+-------------------+
|VehicleID|       SessionStart|         SessionEnd|
+---------+-------------------+-------------------+
|     V004|2024-05-01 09:15:00|2024-05-01 09:35:00|
|     V005|2024-05-01 10:05:00|2024-05-01 10:40:00|
|     V001|2024-05-01 08:01:00|2024-05-01 08:20:00|
|     V003|2024-05-01 09:00:00|2024-05-01 09:18:00|
|     V002|2024-05-01 08:10:00|2024-05-01 08:45:00|
+---------+-------------------+-------------------+

+---------+-------------------+-------------------+--------+
|VehicleID|       SessionStart|         SessionEnd|IdleTime|
+---------+-------------------+-------------------+--------+
|     V004|2024-05-01 09:15:00|2024-05-01 09:35:00|    20.0|
|     V005|2024-05-01 10:05:00|2024-05-01 10:40:00|    35.0|
|     V001|2024-05-01 08:01:00|2024-05-01 08:20:00|    19.0|
|     V003|2024-05-01 09:00:00|2024-05-01 09:18:00|    18.0|
|     V002|2024-05-01 08:10:00|2024-05-01 08:45:00|    35.0|
+---------+-------------------+------------

In [0]:
# Anomaly Detection
# Identify vehicles with speed > 70 and TripDuration < 10 minutes
t.filter((col("SpeedKMH")>70) & (col("TripDurationMinutes")<10)).show()
# Vehicles that paid less toll for longer trips
traffic_log_df=traffic_log_df.withColumn("TripTime",round((unix_timestamp("ExitTime")-unix_timestamp("EntryTime"))/60))
traffic_log_df.filter((col("TripTime")<60) & (col("TollPaid")<100)).show()
# Suspicious backtracking (ExitPoint earlier than EntryPoint)
traffic_log_df.filter((col("ExitTime")>col("EntryTime"))).show()

+-----+---------+----------+---------+---------+--------+-----------+--------+--------+-------------------+-----------+----+
|LogID|VehicleID|EntryPoint|ExitPoint|EntryTime|ExitTime|VehicleType|SpeedKMH|TollPaid|TripDurationMinutes|IsOverspeed|Rank|
+-----+---------+----------+---------+---------+--------+-----------+--------+--------+-------------------+-----------+----+
+-----+---------+----------+---------+---------+--------+-----------+--------+--------+-------------------+-----------+----+

+-----+---------+----------+---------+-------------------+-------------------+-----------+--------+--------+-------------------+-----------+--------+
|LogID|VehicleID|EntryPoint|ExitPoint|          EntryTime|           ExitTime|VehicleType|SpeedKMH|TollPaid|TripDurationMinutes|IsOverspeed|TripTime|
+-----+---------+----------+---------+-------------------+-------------------+-----------+--------+--------+-------------------+-----------+--------+
| L001|     V001|     GateA|    GateC|2024-05-01 

In [0]:
from pyspark.sql import Row

vehicle_data = [
    Row(VehicleID="V001", OwnerName="Anil", Model="Hyundai i20", RegisteredCity="Delhi"),
    Row(VehicleID="V002", OwnerName="Rakesh", Model="Tata Truck", RegisteredCity="Chennai"),
    Row(VehicleID="V003", OwnerName="Sana", Model="Yamaha R15", RegisteredCity="Mumbai"),
    Row(VehicleID="V004", OwnerName="Neha", Model="Honda City", RegisteredCity="Bangalore"),
    Row(VehicleID="V005", OwnerName="Zoya", Model="Volvo Bus", RegisteredCity="Pune"),
]

df_registry = spark.createDataFrame(vehicle_data)

# Join and group trips by RegisteredCity
df_joined = traffic_log_df.join(df_registry, on="VehicleID", how="left")
df_joined.groupBy("RegisteredCity").count().show()

+--------------+-----+
|RegisteredCity|count|
+--------------+-----+
|     Bangalore|    1|
|       Chennai|    1|
|        Mumbai|    1|
|          Pune|    1|
|         Delhi|    1|
+--------------+-----+



In [0]:
# Q8_DeltaLakeFeatures

from pyspark.sql.functions import col, expr
from delta.tables import DeltaTable

# Save as Delta
traffic_log_df.write.format("delta").mode("overwrite").save("dbfs:/Workspace/Shared/traffic_delta")

# Load Delta table
traffic_delta = DeltaTable.forPath(spark, "dbfs:/Workspace/Shared/traffic_delta")

# Merge: Update tolls for Bikes (+10)
traffic_delta.alias("old").merge(
    traffic_log_df.filter(col("VehicleType") == "Bike").withColumn("TollPaid", expr("TollPaid + 10")).alias("new"),
    "old.LogID = new.LogID"
).whenMatchedUpdate(set={"TollPaid": "new.TollPaid"}).execute()

# Delete trips > 60 minutes
traffic_delta.delete("TripDurationMinutes > 60")

# Describe history programmatically
traffic_delta.history().show(truncate=False)

# Read version 0
df_v0 = spark.read.format("delta").option("versionAsOf", 0).load("dbfs:/Workspace/Shared/traffic_delta")
df_v0.show()


+-------+-------------------+----------------+----------------------------------+---------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----+------------------+--------------------+-----------+-----------------+-------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [0]:
# Advanced Conditions
# when/otherwise : Tag trip type as:
# "Short" <15min
# "Medium" 15-30min
# "Long" >30min
traffic_log_df_t2=traffic_log_df.withColumn("TripType",when(col("TripDurationMinutes")<15,"Short").when((col("TripDurationMinutes")>=15) & (col("TripDurationMinutes")<=30),"Medium").otherwise("Long"))
traffic_log_df_t2.show()
# Flag vehicles with more than 3 trips in a day
traffic_log_df_t=traffic_log_df.withColumn("TripDate",date_format("EntryTime","yyyy-MM-dd"))
traffic_log_df_t=traffic_log_df_t.groupBy("VehicleID","TripDate").count().withColumnRenamed("count","TripCount")
traffic_log_df_t=traffic_log_df_t.withColumn("IsMoreThan3Trips",when(col("TripCount")>3,1).otherwise(0))
traffic_log_df_t.show()

+-----+---------+----------+---------+-------------------+-------------------+-----------+--------+--------+-------------------+-----------+--------+--------+
|LogID|VehicleID|EntryPoint|ExitPoint|          EntryTime|           ExitTime|VehicleType|SpeedKMH|TollPaid|TripDurationMinutes|IsOverspeed|TripTime|TripType|
+-----+---------+----------+---------+-------------------+-------------------+-----------+--------+--------+-------------------+-----------+--------+--------+
| L001|     V001|     GateA|    GateC|2024-05-01 08:01:00|2024-05-01 08:20:00|        Car|      60|      50|               19.0|          0|    19.0|  Medium|
| L002|     V002|     GateB|    GateC|2024-05-01 08:10:00|2024-05-01 08:45:00|      Truck|      45|     100|               35.0|          0|    35.0|    Long|
| L003|     V003|     GateA|    GateD|2024-05-01 09:00:00|2024-05-01 09:18:00|       Bike|      55|      30|               18.0|          0|    18.0|  Medium|
| L004|     V004|     GateC|    GateD|2024-05-

In [0]:
# Export & Reporting
# Write final enriched DataFrame to:
# Parquet partitioned by VehicleType
traffic_log_df.printSchema()

traffic_log_df.write.mode('overwrite').partitionBy("VehicleType").format("parquet").mode("overwrite").save("dbfs:/FileStore/shared_uploads/traffic_logs_enriched")
# CSV for dashboards
traffic_log_df.write.mode('overwrite').format("csv").mode("overwrite").save("dbfs:/FileStore/shared_uploads/traffic_logs_summary")
# Create summary SQL View: total toll by VehicleType + ExitPoint
spark.sql("CREATE OR REPLACE TEMP VIEW traffic_logs_summary AS SELECT VehicleType, ExitPoint, sum(TollPaid) AS TotalTollPaid FROM traffic_logs_delta GROUP BY VehicleType, ExitPoint")

root
 |-- LogID: string (nullable = true)
 |-- VehicleID: string (nullable = true)
 |-- EntryPoint: string (nullable = true)
 |-- ExitPoint: string (nullable = true)
 |-- EntryTime: timestamp (nullable = true)
 |-- ExitTime: timestamp (nullable = true)
 |-- VehicleType: string (nullable = true)
 |-- SpeedKMH: integer (nullable = true)
 |-- TollPaid: integer (nullable = true)
 |-- TripDurationMinutes: double (nullable = true)
 |-- IsOverspeed: integer (nullable = false)
 |-- TripTime: double (nullable = true)



DataFrame[]