In [39]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import count as pyspark_count,count, when, isnan, sum,coalesce,col, concat_ws
import os

In [40]:
spark = SparkSession.builder \
    .appName("Traffic Data Transformation") \
    .config("spark.hadoop.fs.defaultFS", "file:///") \
    .config("spark.sql.warehouse.dir", os.getcwd()) \
    .getOrCreate()


In [41]:
data_path='Automated_Traffic_Volume_Counts_20241218.csv'
data_df=spark.read.option('header','true').csv(data_path)

In [42]:
data_df.printSchema()

root
 |-- RequestID: string (nullable = true)
 |-- Boro: string (nullable = true)
 |-- Yr: string (nullable = true)
 |-- M: string (nullable = true)
 |-- D: string (nullable = true)
 |-- HH: string (nullable = true)
 |-- MM: string (nullable = true)
 |-- Vol: string (nullable = true)
 |-- SegmentID: string (nullable = true)
 |-- WktGeom: string (nullable = true)
 |-- street: string (nullable = true)
 |-- fromSt: string (nullable = true)
 |-- toSt: string (nullable = true)
 |-- Direction: string (nullable = true)



In [43]:
data_df.show(5)

+---------+------+----+---+---+---+---+---+---------+--------------------+--------------+--------------------+--------+---------+
|RequestID|  Boro|  Yr|  M|  D| HH| MM|Vol|SegmentID|             WktGeom|        street|              fromSt|    toSt|Direction|
+---------+------+----+---+---+---+---+---+---------+--------------------+--------------+--------------------+--------+---------+
|    32970|Queens|2021|  4| 30|  2|  0|  0|   149701|POINT (997407.099...|PULASKI BRIDGE|Newtown Creek Sho...|Dead end|       NB|
|    32970|Queens|2021|  4| 30|  2| 15|  1|   149701|POINT (997407.099...|PULASKI BRIDGE|Newtown Creek Sho...|Dead end|       NB|
|    32970|Queens|2021|  4| 30|  2| 30|  0|   149701|POINT (997407.099...|PULASKI BRIDGE|Newtown Creek Sho...|Dead end|       NB|
|    32970|Queens|2021|  4| 30|  2| 45|  0|   149701|POINT (997407.099...|PULASKI BRIDGE|Newtown Creek Sho...|Dead end|       NB|
|    32970|Queens|2021|  4| 30|  3|  0|  1|   149701|POINT (997407.099...|PULASKI BRIDGE|N

In [44]:
data_count=data_df.count()
print('Total Records:',data_count)

Total Records: 1712605


In [45]:
data_df=data_df.withColumn('Date',concat_ws('-',col('Yr'),col('M'),col('D')))


In [46]:
data_df=data_df.withColumn('Time',concat_ws(':',col("HH"),col("MM")))

In [47]:
data_df = data_df.drop("Yr", "M", "D", "HH", "MM",'Data') 

In [48]:
data_df.show(5)

+---------+------+---+---------+--------------------+--------------+--------------------+--------+---------+---------+----+
|RequestID|  Boro|Vol|SegmentID|             WktGeom|        street|              fromSt|    toSt|Direction|     Date|Time|
+---------+------+---+---------+--------------------+--------------+--------------------+--------+---------+---------+----+
|    32970|Queens|  0|   149701|POINT (997407.099...|PULASKI BRIDGE|Newtown Creek Sho...|Dead end|       NB|2021-4-30| 2:0|
|    32970|Queens|  1|   149701|POINT (997407.099...|PULASKI BRIDGE|Newtown Creek Sho...|Dead end|       NB|2021-4-30|2:15|
|    32970|Queens|  0|   149701|POINT (997407.099...|PULASKI BRIDGE|Newtown Creek Sho...|Dead end|       NB|2021-4-30|2:30|
|    32970|Queens|  0|   149701|POINT (997407.099...|PULASKI BRIDGE|Newtown Creek Sho...|Dead end|       NB|2021-4-30|2:45|
|    32970|Queens|  1|   149701|POINT (997407.099...|PULASKI BRIDGE|Newtown Creek Sho...|Dead end|       NB|2021-4-30| 3:0|
+-------

In [49]:
data_df=data_df.orderBy(col("Date").desc())

In [50]:
data_df.show(10)

+---------+--------+---+---------+--------------------+------------------+---------------+--------------------+---------+--------+-----+
|RequestID|    Boro|Vol|SegmentID|             WktGeom|            street|         fromSt|                toSt|Direction|    Date| Time|
+---------+--------+---+---------+--------------------+------------------+---------------+--------------------+---------+--------+-----+
|    37697|Brooklyn|215|    28962|POINT (990590.819...|   FLATBUSH AVENUE|Atlantic Avenue|Eastern Parkway Line|       SB|2024-6-9|11:15|
|    37697|Brooklyn|199|    28962|POINT (990590.819...|   FLATBUSH AVENUE|Atlantic Avenue|Eastern Parkway Line|       NB|2024-6-9|  0:0|
|    37699|  Queens|126|    75814|POINT (1012773.57...|NORTHERN BOULEVARD|      69 Street|           70 Street|       EB|2024-6-9|  0:0|
|    37697|Brooklyn|108|    28962|POINT (990590.819...|   FLATBUSH AVENUE|Atlantic Avenue|Eastern Parkway Line|       NB|2024-6-9| 2:30|
|    37697|Brooklyn|240|    28962|POINT (

In [51]:
null_counts = data_df.select([pyspark_count(when(col(c).isNull() | isnan(c), c)).alias(c) for c in data_df.columns])


In [52]:
null_counts.show()

+---------+----+---+---------+-------+------+------+----+---------+----+----+
|RequestID|Boro|Vol|SegmentID|WktGeom|street|fromSt|toSt|Direction|Date|Time|
+---------+----+---+---------+-------+------+------+----+---------+----+----+
|        0|   0|  0|        0|      0|     0|     0|1246|        0|   0|   0|
+---------+----+---+---------+-------+------+------+----+---------+----+----+



In [53]:
data_df=data_df.filter(col("toSt").isNotNull())

In [54]:
traffic_by_street=data_df.groupBy

In [55]:
popular_routes=data_df.groupby('fromSt','toSt').agg(count("*").alias('Route_frequency'),sum('Vol').alias("Total_Volume")).orderBy(col('Route_frequency')).orderBy("Route_frequency",ascending=False)

In [56]:
popular_routes.show(10)

+--------------------+--------------------+---------------+------------+
|              fromSt|                toSt|Route_frequency|Total_Volume|
+--------------------+--------------------+---------------+------------+
|            Dead End|            Dead end|          71389| 1.3382925E7|
|    BOROUGH BOUNDARY|       BODY OF WATER|          17593|   5107796.0|
|           45 Avenue|           46 Avenue|          11317|    567782.0|
|Harlem River Shor...|Harlem River Shor...|          10677|   2142447.0|
|            Dead End|Bronx River Shore...|           9054|   1274906.0|
|      West 60 Street|      West 61 Street|           8526|    927837.0|
|       Borden Avenue|            Dead end|           8224|    587137.0|
|Newtown Creek Sho...|            Dead end|           7980|   2141862.0|
|       Flushing Line|       Flushing Line|           7812|    799714.0|
|            Dead End|Harlem River Shor...|           6822|   2260627.0|
+--------------------+--------------------+--------

In [57]:
departure_trends=data_df.groupBy("fromSt").agg(
    count("*").alias("Total_departures"),
    sum("Vol").alias("Total_Volume")
).orderBy("Total_departures",ascending=False)

departure_trends.show(10)

+--------------------+----------------+------------+
|              fromSt|Total_departures|Total_Volume|
+--------------------+----------------+------------+
|            Dead End|          195077| 3.3134959E7|
|            Dead end|           24043|   2285230.0|
|    BOROUGH BOUNDARY|           17751|   5151806.0|
|           45 Avenue|           11911|    586064.0|
|Harlem River Shor...|           10677|   2142447.0|
|       Borden Avenue|            9376|    743295.0|
|      West 60 Street|            8526|    927837.0|
|       Flushing Line|            8096|    815804.0|
|Newtown Creek Sho...|            7980|   2141862.0|
|               Alley|            7548|    244611.0|
+--------------------+----------------+------------+
only showing top 10 rows



In [58]:
destination_trends=data_df.groupBy("toSt").agg(
    count("*").alias("Total_arrivals"),
    sum("Vol").alias("Total_Volume")
).orderBy("Total_arrivals",ascending=False)

destination_trends.show(10)

+--------------------+--------------+------------+
|                toSt|Total_arrivals|Total_Volume|
+--------------------+--------------+------------+
|            Dead end|        202638| 3.7960738E7|
|       BODY OF WATER|         32852|   9315257.0|
|            DEAD END|         25380|   4836385.0|
|Harlem River Shor...|         22597|   5384624.0|
|Bronx River Shore...|         13173|   1980249.0|
|            Dead End|         12004|   1624061.0|
|           46 Avenue|         11317|    567782.0|
|                BEND|         11142|   2825734.0|
|       Flushing Line|          9348|    915322.0|
|       8 Avenue Line|          9102|    990829.0|
+--------------------+--------------+------------+
only showing top 10 rows



In [59]:
print("Popular Routes (Departure -> Destination):")
popular_routes = data_df.groupBy("fromSt", "toSt",'Time').agg(
    count("*").alias("route_frequency"),
    sum("Vol").alias("total_volume")
).orderBy("route_frequency", ascending=False)
popular_routes.show(10)

Popular Routes (Departure -> Destination):
+--------+--------+-----+---------------+------------+
|  fromSt|    toSt| Time|route_frequency|total_volume|
+--------+--------+-----+---------------+------------+
|Dead End|Dead end| 8:30|            754|    180946.0|
|Dead End|Dead end| 8:15|            753|    183678.0|
|Dead End|Dead end| 7:45|            752|    182732.0|
|Dead End|Dead end| 9:45|            750|    168656.0|
|Dead End|Dead end| 7:30|            749|    181408.0|
|Dead End|Dead end|10:15|            749|    168232.0|
|Dead End|Dead end| 8:45|            749|    177118.0|
|Dead End|Dead end|10:45|            749|    166930.0|
|Dead End|Dead end|10:30|            748|    167949.0|
|Dead End|Dead end| 2:45|            747|     29616.0|
+--------+--------+-----+---------------+------------+
only showing top 10 rows



In [60]:
print("Departure Trends with Time:")
departure_trends_time = data_df.groupBy("fromSt", "Time").agg(
    count("*").alias("total_departures"),
    sum("Vol").alias("total_volume")
).orderBy("total_departures", ascending=False)
departure_trends_time.show(10)

print("Destination Trends with Time:")
destination_trends_time = data_df.groupBy("toSt", "Time").agg(
    count("*").alias("total_arrivals"),
    sum("Vol").alias("total_volume")
).orderBy("total_arrivals", ascending=False)
destination_trends_time.show(10)

Departure Trends with Time:
+--------+-----+----------------+------------+
|  fromSt| Time|total_departures|total_volume|
+--------+-----+----------------+------------+
|Dead End| 7:30|            2016|    436243.0|
|Dead End| 9:45|            2015|    413067.0|
|Dead End| 8:15|            2014|    442786.0|
|Dead End| 8:45|            2014|    429337.0|
|Dead End| 7:15|            2014|    419434.0|
|Dead End| 7:45|            2014|    443093.0|
|Dead End|10:30|            2013|    411361.0|
|Dead End|10:45|            2013|    412334.0|
|Dead End| 9:30|            2012|    415057.0|
|Dead End|10:15|            2012|    406715.0|
+--------+-----+----------------+------------+
only showing top 10 rows

Destination Trends with Time:
+--------+-----+--------------+------------+
|    toSt| Time|total_arrivals|total_volume|
+--------+-----+--------------+------------+
|Dead end| 8:45|          2123|    492154.0|
|Dead end| 8:30|          2122|    495082.0|
|Dead end| 7:30|          2120|  

In [61]:
departures = data_df.groupBy("fromSt", "Time").agg(
    sum("Vol").alias("departures")
).withColumnRenamed("Time", "departure_time")

arrivals = data_df.groupBy("toSt", "Time").agg(
    sum("Vol").alias("arrivals")
).withColumnRenamed("Time", "arrival_time")

departures_alias = departures.alias("dep")
arrivals_alias = arrivals.alias("arr")

traffic_discrepancies = departures_alias.join(
    arrivals_alias,
    (col("dep.fromSt") == col("arr.toSt")) & (col("dep.departure_time") == col("arr.arrival_time")),
    "full_outer"
).select(
    coalesce(col("dep.fromSt"), col("arr.toSt")).alias("street"),
    coalesce(col("dep.departure_time"), col("arr.arrival_time")).alias("Time"),
    coalesce(col("dep.departures"), col("arr.arrivals")).alias("departures"),
    coalesce(col("arr.arrivals"), col("dep.departures")).alias("arrivals")
)

traffic_discrepancies = traffic_discrepancies.withColumn(
    "net_flow", col("departures") - col("arrivals")
)

traffic_discrepancies.orderBy("net_flow", ascending=False).show(10)

time_discrepancies = traffic_discrepancies.groupBy("Time").agg(
    sum("net_flow").alias("total_discrepancy")
).orderBy("total_discrepancy", ascending=False)
time_discrepancies.show(10)


+--------+-----+----------+--------+--------+
|  street| Time|departures|arrivals|net_flow|
+--------+-----+----------+--------+--------+
|Dead End|17:15|  514808.0| 25401.0|489407.0|
|Dead End|15:30|  512229.0| 24174.0|488055.0|
|Dead End|16:15|  510908.0| 24665.0|486243.0|
|Dead End|15:15|  509605.0| 24216.0|485389.0|
|Dead End|15:45|  509000.0| 24109.0|484891.0|
|Dead End|16:30|  509532.0| 24813.0|484719.0|
|Dead End|17:30|  506843.0| 24586.0|482257.0|
|Dead End|16:45|  506826.0| 24713.0|482113.0|
|Dead End| 16:0|  502152.0| 24454.0|477698.0|
|Dead End| 17:0|  501638.0| 25100.0|476538.0|
+--------+-----+----------+--------+--------+
only showing top 10 rows

+-----+-----------------+
| Time|total_discrepancy|
+-----+-----------------+
| 12:0|           6160.0|
| 11:0|           5958.0|
|14:10|              0.0|
| 5:10|              0.0|
|11:20|              0.0|
| 9:50|              0.0|
| 6:20|              0.0|
| 2:20|              0.0|
| 0:40|              0.0|
|10:40|           

In [62]:
hubs = traffic_discrepancies.withColumn(
    "hub_type",
    when(col("net_flow") > 0, "Outflow Hub")
    .when(col("net_flow") < 0, "Inflow Hub")
    .otherwise("Balanced")
)

print("Inflow Hubs (Arrivals > Departures):")
hubs.filter(hubs["hub_type"] == "Inflow Hub").orderBy("net_flow", ascending=True).show(10)

print("Outflow Hubs (Departures > Arrivals):")
hubs.filter(hubs["hub_type"] == "Outflow Hub").orderBy("net_flow", ascending=False).show(10)

Inflow Hubs (Arrivals > Departures):
+--------+-----+----------+--------+---------+----------+
|  street| Time|departures|arrivals| net_flow|  hub_type|
+--------+-----+----------+--------+---------+----------+
|Dead end|15:30|   35963.0|576674.0|-540711.0|Inflow Hub|
|Dead end|16:15|   35381.0|574529.0|-539148.0|Inflow Hub|
|Dead end|17:15|   35636.0|574102.0|-538466.0|Inflow Hub|
|Dead end|15:15|   36155.0|574413.0|-538258.0|Inflow Hub|
|Dead end|15:45|   35406.0|570410.0|-535004.0|Inflow Hub|
|Dead end|16:30|   35814.0|570726.0|-534912.0|Inflow Hub|
|Dead end|16:45|   35687.0|569697.0|-534010.0|Inflow Hub|
|Dead end| 16:0|   35792.0|568625.0|-532833.0|Inflow Hub|
|Dead end|14:30|   34487.0|567074.0|-532587.0|Inflow Hub|
|Dead end| 17:0|   35503.0|566887.0|-531384.0|Inflow Hub|
+--------+-----+----------+--------+---------+----------+
only showing top 10 rows

Outflow Hubs (Departures > Arrivals):
+--------+-----+----------+--------+--------+-----------+
|  street| Time|departures|ar

In [63]:
for row in hubs.collect():
    if row['hub_type'] == "Outflow Hub":
        print(f"Increase green signal duration for {row['street']}")
    elif row['hub_type'] == "Inflow Hub":
        print(f"Decrease green signal duration for {row['street']}")

Decrease green signal duration for 1 AV
Increase green signal duration for 1 AV
Increase green signal duration for 1 AV
Increase green signal duration for 1 AV
Decrease green signal duration for 1 AV
Increase green signal duration for 1 AV
Increase green signal duration for 1 AV
Increase green signal duration for 1 AV
Increase green signal duration for 1 AV
Increase green signal duration for 1 AV
Increase green signal duration for 1 AV
Decrease green signal duration for 1 Avenue
Decrease green signal duration for 1 Avenue
Decrease green signal duration for 1 Avenue
Decrease green signal duration for 1 Avenue
Decrease green signal duration for 1 Avenue
Decrease green signal duration for 1 Avenue
Decrease green signal duration for 1 Avenue
Decrease green signal duration for 1 Avenue
Decrease green signal duration for 1 Avenue
Increase green signal duration for 10 AV
Increase green signal duration for 10 AV
Increase green signal duration for 10 AV
Increase green signal duration for 10 AV


In [37]:
data_df = data_df.toPandas()

data_df["prev_vol_1"] = data_df["Vol"].shift(1)
data_df["prev_vol_2"] = data_df["Vol"].shift(2)
data_df["prev_vol_3"] = data_df["Vol"].shift(3)

data_df.dropna(inplace=True)

data_df["Vol"] = data_df["Vol"].astype(float)
data_df["prev_vol_1"] = data_df["prev_vol_1"].astype(float)
data_df["prev_vol_2"] = data_df["prev_vol_2"].astype(float)
data_df["prev_vol_3"] = data_df["prev_vol_3"].astype(float)

feature_cols = ["prev_vol_1", "prev_vol_2", "prev_vol_3"]
X = data_df[feature_cols]
y = data_df["Vol"]

scaler = MinMaxScaler()
X_normalized = scaler.fit_transform(X)

X_train, X_test, y_train, y_test = train_test_split(X_normalized, y, test_size=0.2, random_state=42)

model = Sequential([
    Dense(64, activation='relu', input_shape=(X_train.shape[1],)),
    Dense(32, activation='relu'),
    Dense(1)
])

model.compile(optimizer='adam', loss='mse', metrics=['mae'])

model.fit(X_train, y_train, epochs=25, batch_size=32, validation_data=(X_test, y_test))

loss, mae = model.evaluate(X_test, y_test)
print(f"Mean Absolute Error (MAE): {mae}")

latest_data = data_df.iloc[-3:][feature_cols]
latest_data_normalized = scaler.transform(latest_data)

predicted_volume = model.predict(latest_data_normalized)
print("Predicted Volume:")
print(predicted_volume)

  super().__init__(activity_regularizer=activity_regularizer, **kwargs)


Epoch 1/25
[1m42784/42784[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 1ms/step - loss: 8540.1553 - mae: 35.1692 - val_loss: 2388.1340 - val_mae: 22.8152
Epoch 2/25
[1m42784/42784[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 1ms/step - loss: 2280.9097 - mae: 18.6952 - val_loss: 2189.9077 - val_mae: 18.2998
Epoch 3/25
[1m42784/42784[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 1ms/step - loss: 2160.6206 - mae: 18.5869 - val_loss: 2152.7314 - val_mae: 18.1440
Epoch 4/25
[1m42784/42784[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m47s[0m 1ms/step - loss: 2133.7603 - mae: 18.6076 - val_loss: 2146.6501 - val_mae: 18.6816
Epoch 5/25
[1m42784/42784[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m43s[0m 1ms/step - loss: 2146.0391 - mae: 18.4387 - val_loss: 2182.9980 - val_mae: 19.9031
Epoch 6/25
[1m42784/42784[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m42s[0m 990us/step - loss: 2048.3865 - mae: 18.2599 - val_loss: 2082.4031 - val_mae: 17.6223
Epoch 7/

In [38]:
spark.stop()