In [None]:
# Necessary imports
import findspark
findspark.init() # Find Spark installation

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, DoubleType, StringType, BooleanType, TimestampType, StructType, StructField

# For ML tasks (even if demonstrating MapReduce concepts, preprocessing often uses MLlib)
from pyspark.ml.feature import StringIndexer, VectorAssembler, StandardScaler, Bucketizer
from pyspark.ml import Pipeline
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector
from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix # For potential matrix operations

import math
import heapq
from collections import Counter, defaultdict
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd 

import sys
from pathlib import Path

sys.path.append(str(Path("../..").resolve()))

from src.data_ingestion import *
from src.data_preprocessing import *
from src.descriptive_analytics import *

from pyspark.sql import DataFrame
from pyspark.sql.functions import col
from pyspark.sql import functions as F

import seaborn as sns

import numpy as np

from itertools import combinations

from scipy import stats

import matplotlib.pyplot as plt

import pandas as pd
from pyspark.sql.window import Window

from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.linalg import Vectors




In [None]:
# Raw accident export, resolved relative to this notebook's directory.
DATA_PATH = "../../data/US_Accidents_March23.csv"

spark = init_spark()
df = load_data(spark, DATA_PATH)

In [None]:
from pyspark.sql.functions import hour, dayofweek, month, year, when, col, sqrt
from pyspark.ml.feature import Imputer
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from pyspark import StorageLevel
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from math import sqrt as math_sqrt

# Step 1: Preprocessing and Feature Engineering
# Calendar features derived from the accident start time.
df = df.withColumn("hour_of_day", hour(col("Start_Time")))
df = df.withColumn("day_of_week", dayofweek(col("Start_Time")))
df = df.withColumn("month", month(col("Start_Time")))
df = df.withColumn("year", year(col("Start_Time")))
# Coarse weather code: 0=Clear, 1=Rain, 2=Snow, 3=anything else (including null).
# NOTE(review): these are exact string matches, so variants such as "Light Rain"
# or "Heavy Snow" fall into code 3 — confirm against the column's distinct values.
df = df.withColumn("weather_condition_cat",
                   when(col("Weather_Condition") == "Clear", 0)
                   .when(col("Weather_Condition") == "Rain", 1)
                   .when(col("Weather_Condition") == "Snow", 2)
                   .otherwise(3))
# 1 for accidents starting between 18:00 and 05:59, otherwise 0.
df = df.withColumn("is_night",
                   when((col("hour_of_day") >= 18) | (col("hour_of_day") < 6), 1).otherwise(0))
# Binary flag: 1 when Severity is 3 or higher.
df = df.withColumn("severe_accident",
                   when(col("Severity") >= 3, 1).otherwise(0))

# Impute missing values (Imputer's default strategy is the column mean).
# NOTE(review): the *_imputed columns are not read by any cell visible here;
# the clustering below uses only coordinates plus the metadata columns.
imputer = Imputer(
    inputCols=["Temperature(F)", "Wind_Speed(mph)", "Humidity(%)"],
    outputCols=["Temperature_imputed", "Wind_Speed_imputed", "Humidity_imputed"]
)
df = imputer.fit(df).transform(df)

# Drop rows with missing critical columns (k-means needs both coordinates).
df = df.dropna(subset=["Severity", "Start_Lat", "Start_Lng"])

# Step 2: Prepare RDD for MapReduce
# Each record is (point, metadata):
#   point    = [Start_Lat, Start_Lng] as floats (the k-means feature vector)
#   metadata = the columns used afterwards to profile each cluster.
# Persisted because this RDD is scanned by takeSample, by every k-means iteration
# and by the final assignment. Without persistence, each of those Spark jobs
# recomputes the entire DataFrame lineage above, including the imputation
# transform. MEMORY_AND_DISK spills instead of recomputing when memory is short.
rdd = (
    df.select("Start_Lat", "Start_Lng", "Severity", "weather_condition_cat",
              "is_night", "severe_accident", "City", "State")
      .rdd
      .map(lambda row: (
          [float(row["Start_Lat"]), float(row["Start_Lng"])],
          {
              "Severity": row["Severity"],
              "weather_condition_cat": row["weather_condition_cat"],
              "is_night": row["is_night"],
              "severe_accident": row["severe_accident"],
              "City": row["City"],
              "State": row["State"]
          }
      ))
      .persist(StorageLevel.MEMORY_AND_DISK)
)

# Step 3: Initialize Cluster Centers
k = 5
num_iterations = 10
init_oversample = 20  # candidate rows sampled per requested center

# Seed with k *distinct* locations. Sampling only k raw rows can return repeated
# coordinates. A repeated center never wins a point (the assignment step keeps
# the first, lowest-index center on ties), so it would stay empty for every
# iteration. Oversample, then de-duplicate while preserving sampled order.
candidate_rows = rdd.takeSample(False, k * init_oversample, seed=42)
distinct_points = dict.fromkeys(tuple(point) for point, _ in candidate_rows)
centers = [list(point) for point in distinct_points][:k]
if len(centers) < k:
    # Fail loudly here instead of with an IndexError inside the k-means loop.
    raise ValueError(
        f"Need {k} distinct locations to seed k-means, found only {len(centers)} "
        f"in a sample of {len(candidate_rows)} rows"
    )

def euclidean_distance(point1, point2):
    """Return the straight-line (L2) distance between two coordinate sequences.

    Coordinates are paired positionally with ``zip``, so extra trailing
    coordinates on the longer sequence are ignored. Two empty sequences give 0.0.
    """
    squared_total = 0
    for coord_a, coord_b in zip(point1, point2):
        squared_total += (coord_a - coord_b) ** 2
    return math_sqrt(squared_total)

# Step 4: MapReduce K-means Implementation
# Map:     tag each point with the index of its nearest center and a count of 1.
# Combine: reduceByKey adds coordinate vectors and counts per cluster with a
#          map-side combine. Each partition therefore ships at most k small
#          (sums, count) pairs. groupByKey would instead ship every point (plus
#          its metadata dict) and buffer a whole cluster inside a single task.
# Reduce:  on the driver, divide sums by counts to obtain the new centers.

convergence_tol = 1e-6  # stop early once no center moves more than this (coordinate units)


def nearest_center_index(point, candidate_centers):
    """Return the index of the center in ``candidate_centers`` closest to ``point``.

    Distance is ``euclidean_distance``. Ties resolve to the lowest index (strict
    ``<``), and an empty ``candidate_centers`` yields 0.
    """
    closest_center_idx = 0
    min_distance = float("inf")
    for idx, center in enumerate(candidate_centers):
        distance = euclidean_distance(point, center)
        if distance < min_distance:
            min_distance = distance
            closest_center_idx = idx
    return closest_center_idx


def merge_partial_sums(left, right):
    """Combine two ``(coordinate_sums, count)`` pairs without mutating either input."""
    left_sums, left_count = left
    right_sums, right_count = right
    return [a + b for a, b in zip(left_sums, right_sums)], left_count + right_count


for iteration in range(num_iterations):
    # Snapshot the current centers and bind them into the task function through
    # a default argument. The map then uses exactly this iteration's centers,
    # independent of when the global ``centers`` happens to be pickled.
    current_centers = [list(center) for center in centers]
    cluster_sums = (
        rdd.map(lambda pm, cc=current_centers: (nearest_center_index(pm[0], cc), (pm[0], 1)))
           .reduceByKey(merge_partial_sums)
           .collectAsMap()
    )

    # A cluster that received no points keeps its previous center.
    new_centers = [
        [coord / cluster_sums[i][1] for coord in cluster_sums[i][0]]
        if i in cluster_sums else centers[i]
        for i in range(k)
    ]
    max_shift = max(
        (euclidean_distance(old, new) for old, new in zip(centers, new_centers)),
        default=0.0,
    )
    centers = new_centers

    print(f"Iteration {iteration + 1}: Centers updated (max shift {max_shift:.6g})")
    if max_shift <= convergence_tol:
        # Further passes would reproduce (almost exactly) the same centers.
        print(f"Converged after {iteration + 1} iterations")
        break

# Step 5: Assign Final Clusters
def assign_final_cluster(point_metadata):
    """Flatten one ``(point, metadata)`` record and label it with its nearest center.

    The label is the index into the notebook-global ``centers`` list with the
    smallest ``euclidean_distance`` to ``point``. On ties the lowest index wins;
    with no centers the label is 0. Returns a dict with the point's two
    coordinates, ``cluster``, and the six metadata fields copied unchanged.
    """
    point, metadata = point_metadata
    distances = [euclidean_distance(point, center) for center in centers]

    best_idx, best_distance = 0, float("inf")
    for idx, dist in enumerate(distances):
        if dist < best_distance:
            best_idx, best_distance = idx, dist

    record = {"Start_Lat": point[0], "Start_Lng": point[1], "cluster": best_idx}
    for field in ("Severity", "weather_condition_cat", "is_night",
                  "severe_accident", "City", "State"):
        record[field] = metadata[field]
    return record

final_rdd = rdd.map(assign_final_cluster)
# Cached because df_clustered feeds several actions: the plot sample, the
# summaries below and the closest-city lookup in the next cell. Without caching,
# each action re-runs the cluster assignment over every point.
df_clustered = spark.createDataFrame(final_rdd).cache()

# Step 6: Visualize Clusters
# Collect only a random sample to the driver. toPandas() on every row can exhaust
# driver memory, and an over-plotted scatter shows nothing extra.
max_plot_points = 200_000
total_points = df_clustered.count()  # also materializes the cache
plot_fraction = min(1.0, max_plot_points / total_points) if total_points else 1.0
df_pd = (
    df_clustered
    .sample(withReplacement=False, fraction=plot_fraction, seed=42)
    .select("Start_Lat", "Start_Lng", "cluster", "Severity",
            "weather_condition_cat", "is_night", "severe_accident",
            "City", "State")
    .toPandas()
)

# centers[i] is [lat, lng]; longitude goes on x and latitude on y so the
# scatter reads like a map rather than a rotated one.
cluster_centers = np.array(centers)
fig, ax = plt.subplots(figsize=(10, 6))
scatter = ax.scatter(df_pd['Start_Lng'], df_pd['Start_Lat'], c=df_pd['cluster'],
                     cmap='viridis', alpha=0.5, s=10)
fig.colorbar(scatter, ax=ax, label='Cluster Label')
ax.scatter(cluster_centers[:, 1], cluster_centers[:, 0], marker='x', color='red',
           s=100, label="Cluster Centers")
ax.set_xlabel('Longitude')
ax.set_ylabel('Latitude')
ax.set_title('Geographical Clusters of US Accidents (MapReduce K-means)')
ax.legend()
plt.show()

# Step 7: Analyze Clusters for Insights
# Per-cluster size, mean severity, and the share of night / severe accidents
# (the means of the 0/1 flags).
cluster_summary = (
    df_clustered.groupBy("cluster")
    .agg(
        F.count("*").alias("Accident_Count"),
        F.avg("Severity").alias("Avg_Severity"),
        F.avg("is_night").alias("Night_Accident_Ratio"),
        F.avg("severe_accident").alias("Severe_Accident_Ratio")
    )
)

# Most frequent weather code per cluster.
# A row_number window picks exactly one row per cluster, deterministically:
# highest count first, with ties broken by the lower weather code.
# orderBy(...).groupBy(...).agg(F.first(...)) is not reliable, because Spark
# does not guarantee that a groupBy preserves an earlier sort order.
weather_counts = df_clustered.groupBy("cluster", "weather_condition_cat").count()
top_weather_window = (
    Window.partitionBy("cluster")
    .orderBy(col("count").desc(), col("weather_condition_cat").asc())
)
weather_summary = (
    weather_counts
    .withColumn("weather_rank", F.row_number().over(top_weather_window))
    .where(col("weather_rank") == 1)
    .select(
        "cluster",
        col("weather_condition_cat").alias("Top_Weather_Code"),
        col("count").alias("Weather_Count"),
    )
    .withColumn("Top_Weather",
                when(col("Top_Weather_Code") == 0, "Clear")
                .when(col("Top_Weather_Code") == 1, "Rain")
                .when(col("Top_Weather_Code") == 2, "Snow")
                .otherwise("Other"))
)

# Closest city/state to each cluster center is computed in the next cell. That
# cell joins each point only to its own cluster's center. An all-points x
# all-centers crossJoin is deliberately not built here: it was never consumed,
# and it would measure distances to other clusters' centers as well.



In [None]:
# 1) Build a DataFrame of the final centers, keyed by the same cluster index
centers_df = spark.createDataFrame(
    [(i, float(center[0]), float(center[1])) for i, center in enumerate(centers)],
    ["center_cluster", "center_lat", "center_lng"]
)

# 2) Join each accident point to its own cluster's center
#    (inner join on df_clustered.cluster == centers_df.center_cluster) and
#    measure the planar distance in coordinate units.
df_with_centers = (
    df_clustered.alias("d")
      .join(centers_df.alias("c"),
            F.col("d.cluster") == F.col("c.center_cluster"))
      .withColumn(
          "distance",
          F.sqrt(
            (F.col("d.Start_Lat") - F.col("c.center_lat"))**2 +
            (F.col("d.Start_Lng") - F.col("c.center_lng"))**2
          ).cast(DoubleType())
      )
)

# 3) For each cluster, find the minimum distance
min_distances = (
    df_with_centers
      .groupBy("cluster")
      .agg(F.min("distance").alias("min_distance"))
)

# 4) City/State of the point that realizes that minimum distance.
#    Several points can tie exactly on the minimum with different City/State
#    values. Keep exactly one per cluster (alphabetical, nulls last) so the join
#    into cluster_insights below cannot duplicate a cluster's row.
closest_tie_break = (
    Window.partitionBy("cluster")
    .orderBy(F.col("City").asc_nulls_last(), F.col("State").asc_nulls_last())
)
closest_points = (
    df_with_centers
      .join(min_distances, on="cluster")
      .where(F.col("distance") == F.col("min_distance"))
      .select("cluster", "City", "State")
      .withColumn("_tie_rank", F.row_number().over(closest_tie_break))
      .where(F.col("_tie_rank") == 1)
      .drop("_tie_rank")
)

# Show to verify
closest_points.show(truncate=False)

# Combine insights: one row per cluster, ordered by cluster so that the printed
# table, the CSV and downstream bar charts have a stable order.
cluster_insights = (
    cluster_summary.join(weather_summary, "cluster")
                   .join(closest_points, "cluster")
                   .select(
                       "cluster", "Accident_Count", "Avg_Severity",
                       "Night_Accident_Ratio", "Severe_Accident_Ratio",
                       "Top_Weather", "Weather_Count", "City", "State"
                   )
                   .orderBy("cluster")
)

# Display insights
cluster_insights.show(truncate=False)

# Save insights (small: one row per cluster, so toPandas is safe here)
cluster_insights.toPandas().to_csv("cluster_insights_mapreduce.csv", index=False)

In [None]:
import matplotlib.pyplot as plt

# The insights table has one row per cluster, so collecting it is cheap.
cluster_insights_pd = cluster_insights.toPandas()

# Bar chart: number of accidents assigned to each cluster.
fig, ax = plt.subplots(figsize=(5, 3))
cluster_ids = cluster_insights_pd['cluster']
ax.bar(cluster_ids, cluster_insights_pd['Accident_Count'], color='skyblue')
ax.set(title='Accident Count by Cluster', xlabel='Cluster', ylabel='Accident Count')
ax.set_xticks(cluster_ids)
fig.tight_layout()
plt.show()


In [None]:
# Plot Average Severity by Cluster (uses cluster_insights_pd from the previous cell)
fig, ax = plt.subplots(figsize=(5, 3))
cluster_ids = cluster_insights_pd['cluster']
ax.bar(cluster_ids, cluster_insights_pd['Avg_Severity'], color='lightcoral')
ax.set(title='Average Severity by Cluster', xlabel='Cluster', ylabel='Average Severity')
ax.set_xticks(cluster_ids)
fig.tight_layout()
plt.show()