In [None]:
from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one; otherwise start a new one.
spark = SparkSession.builder.appName("Read CSV Example").getOrCreate()

In [None]:
# Load the raw traffic CSV: the first line holds column names, and column
# types are inferred by scanning the data.
df = spark.read.csv(
    "traffic_data_large.csv",
    header=True,
    inferSchema=True,
)

In [None]:
# Inspect the schema produced by inferSchema. Per the captured output,
# vehicle_count comes back as string (presumably some values are not plain
# integers), which is why a cleaned integer column is derived from it.
df.printSchema()

root
 |-- sensor_id: string (nullable = true)
 |-- location: string (nullable = true)
 |-- road_name: string (nullable = true)
 |-- vehicle_count: string (nullable = true)
 |-- avg_speed: double (nullable = true)
 |-- temperature: integer (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- status: string (nullable = true)



In [None]:
# Action: number of rows read from the CSV (runs a Spark job).
df.count()

500000

In [None]:
from pyspark.sql import functions as F
# Strip leading/trailing whitespace from string columns only.
# Applying F.trim to every column implicitly casts numeric columns
# (e.g. avg_speed, temperature) to string, silently losing their types.
# A single select also avoids the plan growth of withColumn in a loop.
df = df.select([
    F.trim(F.col(name)).alias(name) if dtype == "string" else F.col(name)
    for name, dtype in df.dtypes
])

In [None]:
# Integer vehicle count: only values made entirely of ASCII digits are cast.
# Anything else (blank, text, signs, decimals, null) becomes null because
# the when() has no otherwise() branch.
# NOTE(review): digit strings beyond the int range overflow the cast
# (null, or an error when ANSI mode is on) — confirm input magnitudes.
df = df.withColumn(
    "vehicle_count_clean",
    F.when(
        F.col("vehicle_count").rlike("^[0-9]+$"),
        F.col("vehicle_count").cast("int"),
    ),
)

In [None]:
# Numeric copy of avg_speed in its own column. The cast is a no-op if the
# column is already double; it matters if trimming turned it into a string.
df = df.withColumn("avg_speed_clean", df["avg_speed"].cast("double"))

In [None]:
# Parse the raw timestamp string by trying each accepted layout in order and
# keeping the first that succeeds (coalesce returns the first non-null).
# try_to_timestamp yields null on a mismatch. Plain to_timestamp raises
# instead when ANSI mode is on, and the query plan's to_timestamp(..., true)
# suggests it is on here. That would abort the job on the first row not
# matching the first layout, so coalesce could never fall through.
# Requires PySpark >= 3.5; the format must be passed as a Column (lit).
df = df.withColumn(
    "event_time",
    F.coalesce(*[
        F.try_to_timestamp(F.col("timestamp"), F.lit(fmt))
        for fmt in (
            "yyyy-MM-dd HH:mm:ss",
            "dd/MM/yyyy HH:mm:ss",
            "yyyy/MM/dd HH:mm:ss",
        )
    ]),
)

Phase 3: data-quality checks and analysis


In [None]:
# Rows with no usable vehicle count (null or non-numeric in the source).
df.where(F.col("vehicle_count_clean").isNull()).count()

49873

In [None]:
# Rows whose timestamp matched none of the accepted layouts (or was null).
df.where(F.isnull("event_time")).count()

In [None]:
# Keep only rows whose status is exactly "ACTIVE" (case-sensitive match).
clean_df = df.where(F.col("status") == "ACTIVE")

In [None]:
# Total vehicles per road, busiest first (sum ignores null counts).
# .show() makes the aggregation actually run and prints its rows; a bare
# DataFrame expression only renders its schema.
(
    clean_df.groupBy("road_name")
    .agg(F.sum("vehicle_count_clean").alias("total_vehicle_count"))
    .orderBy(F.desc("total_vehicle_count"))
    .show()
)

DataFrame[road_name: string, total_vehicle_count: bigint]

In [None]:
# Total vehicles per location and hour of day, busiest first.
# Sort by the aggregate's alias: there is no "vehicles" column, and ordering
# by it fails analysis. Rows with an unparsed event_time group under hour=null.
(
    clean_df.groupBy("location", F.hour("event_time").alias("hour"))
    .agg(F.sum("vehicle_count_clean").alias("total_vehicle_count"))
    .orderBy("total_vehicle_count", ascending=False)
    .show()
)

In [None]:
# Mean speed per road, fastest first. .show() runs the job and prints the
# rows; the bare DataFrame expression only displayed its schema.
(
    clean_df.groupBy("road_name")
    .agg(F.avg("avg_speed_clean").alias("avg_speed"))
    .orderBy("avg_speed", ascending=False)
    .show()
)

DataFrame[road_name: string, avg_speed: double]

In [None]:
from pyspark.sql.window import Window
# Global (unpartitioned) window ordered by ascending avg_speed. Spark moves
# all rows into a single partition for it, so it only suits small inputs
# such as a per-road aggregate.
road_window = Window.orderBy(F.asc("avg_speed"))
# Per-location window, highest vehicle_count_clean first (nulls last).
loc_window = Window.partitionBy("location").orderBy(F.col("vehicle_count_clean").desc())

In [None]:
# Average speed per road, ranked over road_window (ascending speed, so rank 1
# is the slowest road — presumably the most congested).
congestion_rank = (
    clean_df
    .groupBy("road_name")
    .agg(F.avg("avg_speed_clean").alias("avg_speed"))
    .withColumn("rank", F.rank().over(road_window))
)

In [None]:
# Rank every reading within its location by vehicle count (highest = 1).
# rank() gives ties the same value, so several rows can share a rank.
ranked_roads = clean_df.select("*", F.rank().over(loc_window).alias("rank"))

In [None]:
# Top three readings by vehicle count within each location (ties can add
# extra rows). .show() runs the query instead of only echoing the schema.
ranked_roads.filter(F.col("rank") <= 3).show()

DataFrame[sensor_id: string, location: string, road_name: string, vehicle_count: string, avg_speed: string, temperature: string, timestamp: string, status: string, vehicle_count_clean: int, avg_speed_clean: double, event_time: timestamp, rank: int]

In [None]:
# Per-sensor time ordering for the lag() comparisons. Ascending order puts
# rows with a null event_time (unparsed timestamps) first.
w = Window.partitionBy(F.col("sensor_id")).orderBy(F.col("event_time").asc())

In [None]:
# Readings whose speed fell below 70% of the same sensor's previous reading
# (a sudden slowdown). The first reading per sensor has prev_speed = null,
# so the comparison is null and the row is dropped.
(
    clean_df
    .withColumn("prev_speed", F.lag("avg_speed_clean").over(w))
    .filter(F.col("avg_speed_clean") < F.col("prev_speed") * 0.7)
    .show()
)

DataFrame[sensor_id: string, location: string, road_name: string, vehicle_count: string, avg_speed: string, temperature: string, timestamp: string, status: string, vehicle_count_clean: int, avg_speed_clean: double, event_time: timestamp, prev_speed: double]

In [None]:
# Readings whose vehicle count jumped above 150% of the same sensor's
# previous reading (a traffic spike). The comparison must be `>`; `<` would
# match every reading that did NOT spike. The first reading per sensor has
# prev_count = null and is dropped.
(
    clean_df
    .withColumn("prev_count", F.lag("vehicle_count_clean").over(w))
    .filter(F.col("vehicle_count_clean") > F.col("prev_count") * 1.5)
    .show()
)

DataFrame[sensor_id: string, location: string, road_name: string, vehicle_count: string, avg_speed: string, temperature: string, timestamp: string, status: string, vehicle_count_clean: int, avg_speed_clean: double, event_time: timestamp, prev_count: int]

In [None]:
# Repartition by location first, then cache the result. Later actions (RDD
# aggregations, set operations, the partitioned write) then reuse the
# shuffled, cached data. Caching before repartitioning left the frame used
# downstream uncached, so its shuffle was recomputed on every action.
# NOTE: re-running this cell stacks another repartition on clean_df.
print("partitions before:", clean_df.rdd.getNumPartitions())
clean_df = clean_df.repartition("location").cache()
print("partitions after:", clean_df.rdd.getNumPartitions())
clean_df.explain(True)

== Parsed Logical Plan ==
'Filter '`=`('status, ACTIVE)
+- Project [sensor_id#46, location#47, road_name#48, vehicle_count#49, avg_speed#50, temperature#51, timestamp#52, status#53, vehicle_count_clean#91, avg_speed_clean#100, coalesce(to_timestamp(timestamp#52, Some(yyyy-MM-dd HH:mm:ss), TimestampType, Some(Etc/UTC), true), to_timestamp(timestamp#52, Some(dd/MM/yyyy HH:mm:ss), TimestampType, Some(Etc/UTC), true), to_timestamp(timestamp#52, Some(yyyy/MM/dd HH:mm:ss), TimestampType, Some(Etc/UTC), true)) AS event_time#101]
   +- Project [sensor_id#46, location#47, road_name#48, vehicle_count#49, avg_speed#50, temperature#51, timestamp#52, status#53, vehicle_count_clean#91, cast(avg_speed#50 as double) AS avg_speed_clean#100]
      +- Project [sensor_id#46, location#47, road_name#48, vehicle_count#49, avg_speed#50, temperature#51, timestamp#52, status#53, CASE WHEN RLIKE(vehicle_count#49, ^[0-9]+$) THEN cast(vehicle_count#49 as int) END AS vehicle_count_clean#91]
         +- Project [sen

In [None]:
# Drop down to the RDD API (an RDD of Row objects) for the low-level
# aggregations that follow.
traffic_rdd=clean_df.rdd

In [None]:
# Total vehicle count across all rows of traffic_rdd (null counts add 0).
# RDD.sum() returns 0 for an empty RDD, whereas reduce() raises ValueError.
traffic_rdd.map(lambda row: row.vehicle_count_clean or 0).sum()

In [None]:
# Number of readings per location. reduceByKey is a lazy transformation, so
# collect() is needed to run the job and return the (location, count) pairs
# to the driver. Assumes few distinct locations — confirm before collecting.
(
    traffic_rdd
    .map(lambda row: (row.location, 1))
    .reduceByKey(lambda a, b: a + b)
    .collect()
)

In [None]:
# Roads with at least one slow reading (avg_speed_clean < 25) and roads with
# at least one busy reading (vehicle_count_clean > 60).
# NOTE(review): units of both thresholds are not stated here — confirm.
slow_roads = clean_df.filter(F.col("avg_speed_clean") < 25).select("road_name")
busy_roads = clean_df.filter(F.col("vehicle_count_clean") > 60).select("road_name")

# Show both set results: previously the intersect result was discarded, and
# neither query was ever executed. Both operations return distinct rows.
# Roads that were both slow and busy at some point.
slow_roads.intersect(busy_roads).show()
# Roads that were slow at some point but never busy.
slow_roads.subtract(busy_roads).show()

DataFrame[road_name: string]

In [None]:
# Persist clean_df as Parquet with one sub-directory per location value,
# replacing any previous output at this path.
clean_df.write.parquet("traffic_parquet", mode="overwrite", partitionBy="location")

In [None]:
# Persist the per-road congestion ranking as ORC, replacing earlier output.
congestion_rank.write.orc("traffic_congestion_orc", mode="overwrite")

In [None]:
# Read both outputs back to confirm they were written (first 20 rows each).
spark.read.format("parquet").load("traffic_parquet").show()
spark.read.format("orc").load("traffic_congestion_orc").show()