In [0]:
# Install the Kaggle dataset client into the running kernel's environment.
# NOTE(review): consider pinning a version (e.g. `kagglehub==X.Y.Z`) for reproducibility.
%pip install kagglehub

In [0]:
import kagglehub

# Fetch the latest version of the US-Accidents dataset; `path` is the local
# location that the Spark reader below loads from.
path = kagglehub.dataset_download("sobhanmoosavi/us-accidents")
print(f"Path to dataset files: {path}")

In [0]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.rdd import RDD
import matplotlib.pyplot as plt
import pandas as pd
import os

In [0]:
# One SparkSession for the whole notebook (getOrCreate returns an existing one if present).
# NOTE(review): executor memory/cores presumably only take effect on a cluster,
# not in local mode — confirm against the deployment.
spark = (
    SparkSession.builder
    .appName("COMP4651Project")
    .config("spark.driver.memory", "16g")
    .config("spark.executor.memory", "8g")
    .config("spark.executor.cores", "4")
    .config("spark.sql.files.maxPartitionBytes", "128mb")
    .getOrCreate()
)

# Read the downloaded dataset from the local filesystem ("file:" scheme), taking
# column names from the header row and letting Spark infer the column types.
df = spark.read.csv("file:" + path, header=True, inferSchema=True)

In [0]:
# Display the active SparkSession as this cell's output.
spark

In [0]:
# Print the column names and the types Spark inferred when reading the CSV (inferSchema=True).
df.printSchema()

# Missing (NA) values


In [0]:
from pyspark.sql import functions as F

# Null count per column in a single aggregation pass: `when` without `otherwise`
# yields null for non-null cells and `count` skips nulls, so each result is the
# number of missing values in that column.
null_counts = df.select(
    [F.count(F.when(df[c].isNull(), 1)).alias(c) for c in df.columns]
)

# Rows that keep at least 2 non-null values. `thresh` takes precedence over `how`,
# so passing `how="any"` alongside it has no effect and is omitted.
rows_with_min_values = df.na.drop(thresh=2).count()
print(f"Rows with at least 2 non-null values: {rows_with_min_values} of {df.count()}")

# RDD view of the data, used by the RDD-based cells below
rdd = df.rdd

# One row per column with its number of missing values
null_counts.toPandas().T

In [0]:
# Show the first row of the DataFrame (returned as a list containing one Row).
df.head(1)

In [0]:
# Peek at the first record through the RDD API and show its ID field.
first_element = rdd.take(1)
first_row = first_element[0]
first_row["ID"]

# Total cases

In [0]:
# Convert the DataFrame to an RDD (later cells reuse `rdd`)
rdd = df.rdd

# Map each row to a count of 1
mapped_rdd = rdd.map(lambda x: 1)

# Sum all the 1s. This is a plain, key-less aggregation. `fold` with a zero value
# returns 0 for an empty RDD, whereas `reduce` raises ValueError in that case.
total_cases = mapped_rdd.fold(0, lambda a, b: a + b)

print("Total number of cases:", total_cases)

# Count by Cities

In [0]:
# Word-count style aggregation on the RDD: one (City, 1) pair per accident,
# summed per city; the 15 busiest cities are then pulled into pandas.
mapped_rdd = rdd.map(lambda row: (row.City, 1))
reduced_rdd = mapped_rdd.reduceByKey(lambda left, right: left + right)
result_df = reduced_rdd.toDF(["City", "Cases"])
result_desc_df = (
    result_df
    .orderBy(col("Cases").desc())
    .limit(15)
    .toPandas()
)

In [0]:
# DataFrame-API version of the city count: Spark groups and counts natively
# instead of shipping rows through Python lambdas. `count` must be imported here;
# it is not a Python builtin and only `col` is imported at the top of the notebook.
from pyspark.sql.functions import count

# (The variable name is kept as-is because later cells reference it.)
result_df_imporved = df.groupBy("City").agg(count("*").alias("Cases"))
result_desc_df = result_df_imporved.orderBy(col("Cases").desc()).limit(15)
result_desc_df = result_desc_df.toPandas()

In [0]:
# Horizontal bars for the 15 cities with the most accidents
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(result_desc_df["City"], result_desc_df["Cases"], color="skyblue")
ax.set_xlabel("Number of Accidents")
ax.set_title("Top 15 Cities by Number of Accidents")
ax.invert_yaxis()  # rows are sorted descending, so the busiest city ends up on top
fig.tight_layout()

plt.show()


In [0]:
# The 15 cities with the fewest accidents. Many cities share the same small count,
# so sorting on Cases alone makes the selection arbitrary from run to run; the
# secondary sort on City makes it deterministic.
result_asec_df = (
    result_df_imporved
    .orderBy(col("Cases").asc(), col("City").asc())
    .limit(15)
    .toPandas()
)

In [0]:
# Horizontal bars for the 15 cities with the fewest accidents
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(result_asec_df["City"], result_asec_df["Cases"], color="skyblue")
ax.set_xlabel("Number of Accidents")
ax.set_title("Least 15 Cities by Number of Accidents")
# Rows are sorted ascending, so inverting the y-axis puts the city with the
# FEWEST accidents on top.
ax.invert_yaxis()
fig.tight_layout()

plt.show()


Find the total number of distinct cities in `result_df_imporved`

In [0]:
# Each row of the aggregated frame corresponds to one distinct City value.
num_cities = result_df_imporved.count()
print("Total number of cities in result_df_improved: " + str(num_cities))

Find the total number of accident cases in the top 15 cities and their share of all cases

In [0]:
# Accident cases in the top 15 cities and their share of all recorded cases.
sum_of_cases = result_desc_df["Cases"].sum()
share_pct = 100 * sum_of_cases / total_cases  # percentage, not a 0-1 fraction

print(f"The sum of cases in the top 15 cities is: {sum_of_cases}")
print(f"Percentage of all cases: {share_pct:.2f}%")

# Count By States

In [0]:
# Word-count pattern on the RDD, keyed by State: one (State, 1) pair per accident.
mapped_state_rdd = rdd.map(lambda row: (row.State, 1))
reduced_rdd = mapped_state_rdd.reduceByKey(lambda left, right: left + right)
result_df = reduced_rdd.toDF(["State", "Cases"])

# Busiest and quietest 15 states, each pulled into pandas for plotting.
result_desc_df = result_df.orderBy(col("Cases").desc()).limit(15).toPandas()
result_asec_df = result_df.orderBy(col("Cases")).limit(15).toPandas()

In [0]:
# Shares are computed against the total counted earlier in the notebook rather
# than a hard-coded figure, so they stay correct for any dataset version.
total_accidents = total_cases

# Horizontal bars for the 15 states with the most accidents
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(result_desc_df["State"], result_desc_df["Cases"], color="#87CEEB")
ax.set_xlabel("Number of Accidents")
ax.set_title("Top 15 States by Number of Accidents")
ax.invert_yaxis()  # descending rows -> busiest state on top

ax.grid(axis="x", linestyle="--", alpha=0.5)
ax.tick_params(axis="y", pad=10)  # extra room between labels and bars

# Annotate each bar with its count and its share of all accidents
for i, value in enumerate(result_desc_df["Cases"]):
    percentage = (value / total_accidents) * 100
    ax.text(value, i, f"{value:,} ({percentage:.2f}%)", va="center", ha="left")

fig.tight_layout()
plt.show()

In [0]:
# Percentages are relative to the total case count computed earlier.
total_accidents = total_cases

# Horizontal bars for the 15 states with the fewest accidents
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(result_asec_df["State"], result_asec_df["Cases"], color="#87CEEB")
ax.set_xlabel("Number of Accidents")
ax.set_title("Top 15 States with least no. of Accidents")
ax.invert_yaxis()  # ascending rows -> state with the fewest accidents on top

ax.grid(axis="x", linestyle="--", alpha=0.5)
ax.tick_params(axis="y", pad=10)  # extra room between labels and bars

# Label every bar with its count and share of all accidents
for i, value in enumerate(result_asec_df["Cases"]):
    percentage = value / total_accidents * 100
    ax.text(value, i, f"{value:,} ({percentage:.2f}%)", va="center", ha="left")

fig.tight_layout()
plt.show()

## Visualize accident locations (heatmap)

In [0]:
# Install folium (interactive Leaflet maps) into the running kernel's environment.
# NOTE(review): consider pinning a version for reproducibility.
%pip install folium

In [0]:
import folium
from folium.plugins import HeatMap

# Approximate number of points to plot; sampling keeps the rendered map responsive.
HEATMAP_SAMPLE_SIZE = 400000

# Step 1: sample coordinates with PySpark.
# Spark rejects sampling fractions above 1.0 without replacement (e.g. when the
# dataset has fewer rows than the target), so the fraction is clamped. A fixed
# seed makes the sample reproducible across runs.
total_rows = df.count()
sample_fraction = min(1.0, HEATMAP_SAMPLE_SIZE / total_rows) if total_rows else 0.0
df_sampled = (
    df.select("Start_Lat", "Start_Lng")
    .dropna()
    .sample(withReplacement=False, fraction=sample_fraction, seed=42)
)

# Convert to an RDD of (lat, lng) pairs and collect them to the driver once.
rdd = df_sampled.rdd.map(lambda row: (row["Start_Lat"], row["Start_Lng"]))
heat_data = rdd.collect()
if not heat_data:
    raise ValueError("No coordinates available to build the heatmap.")

# Step 2: center the map on the mean of the collected points. Computing this from
# `heat_data` ties the center to exactly the plotted points and avoids two extra
# Spark jobs over the source data.
center_lat = sum(lat for lat, _ in heat_data) / len(heat_data)
center_lng = sum(lng for _, lng in heat_data) / len(heat_data)
m = folium.Map(location=[center_lat, center_lng], zoom_start=5)

# Step 3: add the heatmap layer
HeatMap(heat_data, radius=10, blur=15, max_zoom=1).add_to(m)

# Step 4: display the map (uncomment to also save it)
# m.save("accidents_heatmap.html")
m

# Timezone Analysis

In [0]:
# Slower RDD alternative, kept for comparison (combineByKey-based count):
# rdd = df.rdd
# mapped_rdd = rdd.map(lambda x: (x.Timezone, 1))
# combined_rdd = mapped_rdd.combineByKey(
#     lambda value: value,             # createCombiner: first count for a key
#     lambda acc, value: acc + value,  # mergeValue: add within a partition
#     lambda acc1, acc2: acc1 + acc2,  # mergeCombiners: add across partitions
# )
# result_df = combined_rdd.toDF(["TimeZone", "Cases"]).toPandas()

# DataFrame API: Spark performs the grouping and counting natively.
from pyspark.sql import functions as F

timezone_counts = df.groupBy("Timezone").agg(F.count("*").alias("Cases"))
result_df = timezone_counts.toPandas()

print(result_df)



In [0]:
import seaborn as sns

# Bar chart of accident counts per timezone
fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(x="Timezone", y="Cases", data=result_df, palette="viridis", ax=ax)

ax.set_xlabel("Timezone", fontsize=12)
ax.set_ylabel("Number of Accidents", fontsize=12)
ax.set_title("Number of Accidents by Timezone", fontsize=14)
plt.setp(ax.get_xticklabels(), rotation=45, ha="right")  # slanted labels stay readable

fig.tight_layout()  # keep rotated labels from being clipped
plt.show()

# Severity analysis

In [0]:
# Cases per severity level via combineByKey: a new key starts from its value (1),
# values are added to the running count inside a partition, and the partial
# counts are summed across partitions.
rdd = df.rdd
mapped_rdd = rdd.map(lambda row: (row.Severity, 1))
combined_rdd = mapped_rdd.combineByKey(
    lambda first: first,
    lambda running, nxt: running + nxt,
    lambda part_a, part_b: part_a + part_b,
)
result_df = combined_rdd.toDF(["Severity", "Cases"]).toPandas()



In [0]:
# Display the per-severity case counts computed in the previous cell.
result_df

In [0]:
# Fixed color per severity level (same palette as the later severity charts).
# Rows are sorted by Severity so that slice order and colors do not depend on the
# unsorted row order produced by the RDD aggregation; unexpected levels get grey.
severity_palette = {1: "#ff9999", 2: "#66b3ff", 3: "#99ff99", 4: "#ffcc99"}
severity_pie_df = result_df.sort_values("Severity").reset_index(drop=True)

plt.figure(figsize=(8, 8))
plt.pie(
    severity_pie_df["Cases"],
    labels=severity_pie_df["Severity"],
    autopct="%1.1f%%",
    startangle=90,
    colors=[severity_palette.get(sev, "#cccccc") for sev in severity_pie_df["Severity"]],
)
plt.title("Distribution of Accident Severity")
plt.axis("equal")  # equal aspect ratio draws the pie as a circle
plt.show()

# Accident Duration Analysis


In [0]:
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import functions as F

# Namespaced `F.` functions are used instead of `from pyspark.sql.functions import
# round, ...`, which would shadow Python's builtin `round` for the rest of the notebook.

# Step 1: duration of each accident in seconds (End_Time - Start_Time). Rows whose
# duration is not positive (or is null) are filtered out, and the rest are rounded
# to whole minutes.
df_with_duration = (
    df.withColumn(
        "Duration_Seconds",
        F.unix_timestamp("End_Time") - F.unix_timestamp("Start_Time"),
    )
    .filter(F.col("Duration_Seconds") > 0)
    .withColumn("Duration_Minutes", F.round(F.col("Duration_Seconds") / 60))
)

# Number of accidents per rounded duration
duration_counts_df = df_with_duration.groupBy("Duration_Minutes").agg(F.count("*").alias("Count"))

# Denominator for the percentages: all accidents with a positive duration
total_counts = duration_counts_df.agg(F.sum("Count").alias("Total")).collect()[0]["Total"]

# Share of accidents for each duration, in percent
duration_counts_df = duration_counts_df.withColumn("Percentage", (F.col("Count") / total_counts) * 100)


In [0]:
# The ten most common rounded durations, ordered by frequency, as a pandas frame
top_10_durations_df = (
    duration_counts_df
    .orderBy(col("Count").desc())
    .limit(10)
    .toPandas()
)

In [0]:
# Bar chart of the share of accidents for the 10 most frequent durations.
# seaborn treats x as categorical and places bars at positions 0..n-1 in the
# order given by `order`. The frame is therefore sorted by duration, and that same
# order is used both for the bars and for the annotation positions below, so each
# label sits on its own bar.
plot_df = top_10_durations_df.sort_values("Duration_Minutes").reset_index(drop=True)
duration_order = plot_df["Duration_Minutes"].tolist()

fig, ax = plt.subplots(figsize=(10, 6))
sns.barplot(
    x="Duration_Minutes",
    y="Percentage",
    data=plot_df,
    order=duration_order,
    palette="viridis",
    edgecolor="black",
    ax=ax,
)

ax.set_title("Top 10 Frequent Accident Durations in Minutes", fontsize=16, fontweight="bold")
ax.set_xlabel("Duration (Minutes)", fontsize=14)
ax.set_ylabel("Percentage (%)", fontsize=14)

# Annotate each bar with its percentage, slightly above the bar top
for x_pos, pct in enumerate(plot_df["Percentage"]):
    ax.text(x=x_pos, y=pct + 0.5, s=f"{pct:.2f}%",
            ha="center", fontsize=12, fontweight="bold")

fig.tight_layout()
plt.show()

# Year Analysis

In [0]:
from pyspark.sql.functions import year

# Accidents per calendar year of Start_Time, ordered chronologically in pandas.
yearly_counts = df.groupBy(year("Start_Time")).count()
df_by_year = (
    yearly_counts.toPandas()
    .sort_values(by="year(Start_Time)")
    .reset_index(drop=True)
)

In [0]:
# Display accident counts per year (column "year(Start_Time)"), sorted by year.
df_by_year

# Group by year and visualize accident locations


In [0]:
from IPython.display import display

# Roughly this many points are sampled across all years.
YEARLY_HEATMAP_SAMPLE_SIZE = 600000

# Spark rejects sampling fractions above 1.0 without replacement, so the fraction
# is clamped; a fixed seed makes the sample reproducible. Rows with a missing year
# or coordinate are dropped in Spark before the pandas conversion, so Year is not
# promoted to float by missing values.
total_rows = df.count()
sample_fraction = min(1.0, YEARLY_HEATMAP_SAMPLE_SIZE / total_rows) if total_rows else 0.0

df_sampled = (
    df.selectExpr("year(Start_Time) as Year", "Start_Lat", "Start_Lng")
    .dropna()
    .sample(withReplacement=False, fraction=sample_fraction, seed=42)
    .toPandas()
)

# Step 2: build one heatmap per year present in the sample
years = df_sampled["Year"].unique()
heatmaps = {}  # year -> folium.Map

# The loop variable is `sample_year`, not `year`, so it does not shadow the
# `year` function imported from pyspark.sql.functions in an earlier cell.
for sample_year in sorted(years):
    df_filtered = df_sampled[df_sampled["Year"] == sample_year]
    if df_filtered.empty:
        continue

    # Center each map on that year's mean coordinates
    center_lat = df_filtered["Start_Lat"].mean()
    center_lng = df_filtered["Start_Lng"].mean()
    m = folium.Map(location=[center_lat, center_lng], zoom_start=5)

    heat_data = df_filtered[["Start_Lat", "Start_Lng"]].values.tolist()
    HeatMap(heat_data, radius=10, blur=15, max_zoom=1).add_to(m)

    heatmaps[sample_year] = m

    # m.save(f"accidents_heatmap_{sample_year}.html")

    # Label each map so the reader can tell which year it shows
    print(f"Accident heatmap for {sample_year}")
    display(m)

# Year Analysis based on Severity

In [0]:
from pyspark.sql.functions import year

# Accident counts for every (Severity, Year) combination
df_selected = df.select("Severity", "Start_Time")
year_col = year("Start_Time").alias("Year")
df_grouped = df_selected.groupBy("Severity", year_col).count()

# The aggregated result is small; bring it into pandas for inspection and plotting
df_grouped_pandas = df_grouped.toPandas()
df_grouped_pandas

In [0]:
import pandas as pd
import matplotlib.pyplot as plt

# Pivot: one row per Year, one column per Severity level, accident counts as values
pivot_data = df_grouped_pandas.pivot(
    index="Year", columns="Severity", values="count"
).fillna(0)

# Chronological order on the x-axis
pivot_data = pivot_data.sort_index()

# Total accidents per year (sum across the severity columns), drawn as a line
pivot_data["Total"] = pivot_data.sum(axis=1)

plt.figure(figsize=(12, 8))
severity_classes = pivot_data.columns[:-1]  # every column except the trailing "Total"

# Running top of the stack, kept as an independent Series. It is rebuilt with `+`
# rather than updated with `+=`: an in-place `+=` on a Series obtained via
# `pivot_data[severity]` can write back into pivot_data itself (pandas without
# Copy-on-Write), silently corrupting the first severity column.
bottom = pd.Series(0.0, index=pivot_data.index)
for severity in severity_classes:
    bar = plt.bar(
        pivot_data.index,
        pivot_data[severity],
        bottom=bottom,
        label=f"Severity {severity}",
    )
    bottom = bottom + pivot_data[severity]

# Line of yearly totals drawn over the stacked bars
plt.plot(
    pivot_data.index,
    pivot_data["Total"],
    color="red",
    marker="o",
    label="Total Accidents",
    linewidth=2,
)

plt.title("Number of Accidents by Year and Severity", fontsize=16)
plt.xlabel("Year", fontsize=14)
plt.ylabel("Number of Accidents", fontsize=14)
plt.xticks(pivot_data.index, rotation=45)
plt.legend(title="Severity", fontsize=12)
plt.grid(axis="y", linestyle="--", alpha=0.7)

plt.tight_layout()
plt.show()

# Month Analysis

In [0]:
from pyspark.sql.functions import month

# Accidents per calendar month of Start_Time, ordered by month number in pandas
monthly_counts = df.groupBy(month("Start_Time")).count()
df_by_month = (
    monthly_counts.toPandas()
    .sort_values(by="month(Start_Time)")
    .reset_index(drop=True)
)

In [0]:
# Bar chart of accident counts per month number
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(df_by_month["month(Start_Time)"], df_by_month["count"])
ax.set_title("Number of Accidents by Month")
ax.set_xlabel("Month")
ax.set_ylabel("Number of Accidents")
ax.grid(True)
plt.show()

## Group by month + Severity

In [0]:
# (month, severity) pairs taken from each row's Start_Time, via the RDD API
rdd = df.select("Start_Time", "Severity").rdd.map(
    lambda r: (r["Start_Time"].month, r["Severity"])
)

# The (month, severity) tuple itself is the key; count occurrences of each key
rdd_grouped = rdd.map(lambda pair: (pair, 1)).reduceByKey(lambda a, b: a + b)

# Flatten ((month, severity), count) into rows and load them into pandas by month
data = rdd_grouped.map(lambda kv: (*kv[0], kv[1])).collect()
df_grouped = (
    pd.DataFrame(data, columns=["Month", "Severity", "Count"])
    .sort_values(by="Month")
    .reset_index(drop=True)
)

In [0]:
import matplotlib.pyplot as plt
import pandas as pd

# Pivot for plotting: rows are months (x-axis), columns are severity levels,
# values are accident counts
df_pivot = df_grouped.pivot(index="Month", columns="Severity", values="Count").fillna(0)

plt.figure(figsize=(12, 8))

# Running top of the stack, kept as an independent Series and rebuilt with `+`.
# An in-place `+=` on `df_pivot[severity]` can write back into df_pivot
# (pandas without Copy-on-Write) and corrupt its first severity column.
bottom = pd.Series(0.0, index=df_pivot.index)
for severity in df_pivot.columns:
    plt.bar(
        df_pivot.index,
        df_pivot[severity],
        bottom=bottom,
        label=f"Severity {severity}",
    )
    bottom = bottom + df_pivot[severity]

plt.title("Number of Accidents by Month and Severity", fontsize=16)
plt.xlabel("Month", fontsize=14)
plt.ylabel("Number of Accidents", fontsize=14)
plt.xticks(ticks=df_pivot.index, labels=df_pivot.index, rotation=45)
plt.legend(title="Severity", fontsize=12, loc="upper right")
plt.grid(axis="y", linestyle="--", alpha=0.7)

plt.tight_layout()
plt.show()

# Day Analysis

In [0]:
from pyspark.sql.functions import date_format

# Full weekday name of each accident (pattern "EEEE"), counted per weekday
df_with_day = df.withColumn("Day_of_Week", date_format("Start_Time", "EEEE"))
df_grouped_by_day = df_with_day.groupBy("Day_of_Week").count()

# Calendar order used to sort the weekdays, Monday first
day_order = [
    "Monday", "Tuesday", "Wednesday", "Thursday",
    "Friday", "Saturday", "Sunday",
]

# An ordered categorical makes sort_values follow day_order instead of alphabetical order
df_grouped_by_day = (
    df_grouped_by_day.toPandas()
    .assign(
        Day_of_Week=lambda frame: pd.Categorical(
            frame["Day_of_Week"], categories=day_order, ordered=True
        )
    )
    .sort_values(by="Day_of_Week")
    .reset_index(drop=True)
)

print(df_grouped_by_day)

In [0]:
# Bar chart of accident counts per weekday
fig, ax = plt.subplots(figsize=(10, 6))
ax.bar(df_grouped_by_day["Day_of_Week"], df_grouped_by_day["count"], color="skyblue")
ax.set_title("Number of Accidents by Day of the Week", fontsize=16)
ax.set_xlabel("Day of the Week", fontsize=14)
ax.set_ylabel("Number of Accidents", fontsize=14)
ax.tick_params(axis="x", labelrotation=45, labelsize=12)
ax.tick_params(axis="y", labelsize=12)
ax.grid(axis="y", linestyle="--", alpha=0.7)
fig.tight_layout()

plt.show()

# Hour Analysis

Create an enhanced bar chart to visualize the number of accidents by hour of the day.

In [0]:
from pyspark.sql.functions import hour
import matplotlib.pyplot as plt
import pandas as pd

# Hour of day of each accident's Start_Time, via the RDD API
rdd = df.select("Start_Time").rdd.map(lambda row: row["Start_Time"].hour)

# (hour, 1) pairs summed per hour
rdd_grouped = rdd.map(lambda h: (h, 1)).reduceByKey(lambda a, b: a + b)

# The result is small: collect it to the driver and sort it by hour in pandas
data = rdd_grouped.collect()
df_grouped_by_hour = (
    pd.DataFrame(data, columns=["Hour", "Count"])
    .sort_values(by="Hour")
    .reset_index(drop=True)
)


In [0]:
import seaborn as sns

# Enhanced bar chart of accidents per hour of the day
fig, ax = plt.subplots(figsize=(14, 8))
sns.barplot(
    x="Hour",
    y="Count",
    data=df_grouped_by_hour,
    palette="viridis",
    edgecolor="black",
    ax=ax,
)

ax.set_title("Number of Accidents by Hour of the Day", fontsize=20, fontweight="bold", pad=20)
ax.set_xlabel("Hour of the Day (24-hour format)", fontsize=16, labelpad=10)
ax.set_ylabel("Number of Accidents", fontsize=16, labelpad=10)

# seaborn draws one bar per distinct Hour at positions 0..n-1 in sorted order. The
# frame is sorted by Hour with a fresh 0..n-1 index, so row position equals bar
# position. Ticks and annotations are placed by position, which keeps them aligned
# even if some hour has no accidents (instead of assuming exactly 24 bars).
bar_positions = list(range(len(df_grouped_by_hour)))
ax.set_xticks(bar_positions)
ax.set_xticklabels(df_grouped_by_hour["Hour"].tolist(), fontsize=12)
ax.tick_params(axis="y", labelsize=12)

ax.grid(axis="y", linestyle="--", color="gray", alpha=0.7)

# Annotate each bar with its count, slightly above the bar (offset computed once)
label_offset = df_grouped_by_hour["Count"].max() * 0.01
for position, count_value in zip(bar_positions, df_grouped_by_hour["Count"]):
    ax.text(
        x=position,
        y=count_value + label_offset,
        s=f"{int(count_value)}",
        ha="center",
        va="bottom",
        fontsize=11,
        color="black",
        weight="bold",
    )

fig.tight_layout()
plt.show()

Analyze the accident severity distribution by hour interval: count occurrences of each severity level and visualize the results as one pie chart per hour interval.

In [0]:
from pyspark.sql.functions import hour

# (hour of day, severity) pairs taken from each accident's Start_Time
rdd = df.select("Start_Time", "Severity").rdd.map(
    lambda row: (row["Start_Time"].hour, row["Severity"])
)

# Next: bucket the hours into 4-hour intervals (0-3, 4-7, 8-11, 12-15, 16-19, 20-23)
# with get_hour_interval, then count occurrences per (interval, severity) key.
def get_hour_interval(hour):
    """Map an hour of the day to its 4-hour bucket label.

    Values in 0-19 map to "0-3", "4-7", "8-11", "12-15" or "16-19" (both bounds
    inclusive); any other value falls through to "20-23".
    """
    buckets = (
        (0, 3, "0-3"),
        (4, 7, "4-7"),
        (8, 11, "8-11"),
        (12, 15, "12-15"),
        (16, 19, "16-19"),
    )
    for low, high, label in buckets:
        if low <= hour <= high:
            return label
    return "20-23"

# Count accidents per (hour interval, severity) key
rdd_grouped = rdd.map(lambda x: ((get_hour_interval(x[0]), x[1]), 1)).reduceByKey(lambda a, b: a + b)

# Flatten ((interval, severity), count) into rows for pandas
data = rdd_grouped.map(lambda x: (x[0][0], x[0][1], x[1])).collect()
df_grouped = pd.DataFrame(data, columns=["Hour_Interval", "Severity", "Count"])

# Fixed color per severity; unexpected levels get grey instead of raising KeyError
severity_colors = {1: "#ff9999", 2: "#66b3ff", 3: "#99ff99", 4: "#ffcc99"}

# Interval labels look like "12-15". Sorting on the numeric start hour gives
# chronological order; a plain string sort would put "12-15" before "4-7".
hour_intervals = sorted(
    df_grouped["Hour_Interval"].unique(),
    key=lambda label: int(label.split("-")[0]),
)

fig, axes = plt.subplots(2, 3, figsize=(18, 12))
axes = axes.flatten()

for i, interval in enumerate(hour_intervals):
    interval_data = df_grouped[df_grouped["Hour_Interval"] == interval]
    severity_counts = interval_data.groupby("Severity")["Count"].sum().reset_index()

    colors = [severity_colors.get(sev, "#cccccc") for sev in severity_counts["Severity"]]

    axes[i].pie(
        severity_counts["Count"],
        labels=[f"Severity {int(sev)}" for sev in severity_counts["Severity"]],
        autopct="%1.1f%%",
        startangle=90,
        colors=colors,
    )
    axes[i].set_title(f"Hour Interval {interval}")

# Hide any panel left unused when some interval has no data
for ax in axes[len(hour_intervals):]:
    ax.axis("off")

plt.suptitle("Distribution of Accident Severity by Hour Interval", fontsize=20, fontweight="bold")
plt.tight_layout(rect=[0, 0, 1, 0.95])
plt.show()


# Road Condition Analysis

Analyze the road conditions associated with accidents and visualize the top 8 road conditions.

In [0]:
from pyspark.sql.functions import col
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Step 1: road-feature flag columns to analyze
road_conditions = [
    "Amenity", "Bump", "Crossing", "Give_Way", "Junction", "No_Exit",
    "Railway", "Roundabout", "Station", "Stop", "Traffic_Calming",
    "Traffic_Signal", "Turning_Loop",
]

# Step 2: emit (flag_name, 1) for every truthy flag on a row, then total per flag
condition_counts_rdd = (
    df.select(road_conditions)
    .rdd.flatMap(lambda row: [(name, 1) for name in road_conditions if row[name]])
    .reduceByKey(lambda a, b: a + b)
)

# Step 3: bring the (small) per-flag totals into pandas
condition_counts = condition_counts_rdd.collect()
condition_df = pd.DataFrame(condition_counts, columns=["Condition", "Count"])

# Step 4: share of all accidents that have each flag set
total_accidents = df.count()
condition_df["Percentage"] = (condition_df["Count"] / total_accidents) * 100

# Step 5: the 8 most frequent flags
top_conditions_df = condition_df.sort_values(by="Percentage", ascending=False).head(8)

# Step 6: horizontal bar chart of those shares
fig, ax = plt.subplots(figsize=(12, 8))
sns.barplot(
    x="Percentage",
    y="Condition",
    data=top_conditions_df,
    palette=sns.color_palette("rainbow", n_colors=8),
    ax=ax,
)
ax.set_xlabel("Percentage of Accidents")
ax.set_title("Top 8 Road Conditions by Percentage of Accidents")
ax.grid(axis="x", linestyle="--", alpha=0.7)
fig.tight_layout()

plt.show()

Analyze road conditions related to accidents: count occurrences by severity and visualize the severity distribution for the top 5 road conditions.

In [0]:
from pyspark.sql.functions import col

# Step 1: road-feature flag columns to analyze
road_conditions = [
    "Amenity", "Bump", "Crossing", "Give_Way", "Junction", "No_Exit",
    "Railway", "Roundabout", "Station", "Stop", "Traffic_Calming",
    "Traffic_Signal", "Turning_Loop"
]

# One (condition, severity) pair for every truthy flag on each row
rdd = df.select(road_conditions + ["Severity"]).rdd.flatMap(
    lambda row: [(condition, row["Severity"]) for condition in road_conditions if row[condition]]
)

# Step 2: count occurrences per (condition, severity) key
rdd_grouped = rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(lambda a, b: a + b)

# Step 3: flatten ((condition, severity), count) into rows for pandas
data = rdd_grouped.map(lambda x: (x[0][0], x[0][1], x[1])).collect()
df_grouped = pd.DataFrame(data, columns=["Condition", "Severity", "Count"])

# Step 4: keep the 5 conditions with the most accidents overall
top_5_conditions = df_grouped.groupby("Condition")["Count"].sum().nlargest(5).index
df_top_5 = df_grouped[df_grouped["Condition"].isin(top_5_conditions)]

# Fixed color per severity; unexpected levels get grey instead of raising KeyError
severity_colors = {
    1: "#ff9999",  # Red
    2: "#66b3ff",  # Blue
    3: "#99ff99",  # Green
    4: "#ffcc99"   # Orange
}

# Step 5: one pie chart per top road condition
fig, axes = plt.subplots(1, 5, figsize=(20, 5))

for i, condition in enumerate(top_5_conditions):
    # The RDD output is not sorted; sorting by severity keeps slice order stable
    condition_data = df_top_5[df_top_5["Condition"] == condition].sort_values("Severity")
    axes[i].pie(
        condition_data["Count"],
        labels=[f"Severity {sev}" for sev in condition_data["Severity"]],
        autopct="%1.1f%%",
        startangle=140,
        colors=[severity_colors.get(sev, "#cccccc") for sev in condition_data["Severity"]],
    )
    axes[i].set_title(condition)

# Hide panels left empty when fewer than 5 conditions occur
for ax in axes[len(top_5_conditions):]:
    ax.axis("off")

plt.suptitle("Accident Severity Distribution for Top 5 Road Conditions", fontsize=16)
plt.tight_layout()
plt.show()


# Weather Analysis

Analyze and visualize the distribution of accidents by weather condition, displaying the top 10 conditions.

In [0]:
from pyspark.sql.functions import col
import pandas as pd
import matplotlib.pyplot as plt

# Accidents with a recorded (non-null) weather condition
weather_conditions_df = df.filter(col("Weather_Condition").isNotNull())

# Accident count per weather condition, most frequent first
weather_condition_counts = weather_conditions_df.groupBy("Weather_Condition").count().orderBy("count", ascending=False)

# Top 10 conditions as a pandas frame
top_10_weather_conditions = weather_condition_counts.limit(10)
weather_condition_df = top_10_weather_conditions.toPandas()

# Share of all accidents with a recorded weather condition
total_accidents = weather_conditions_df.count()
weather_condition_df["Percentage"] = (weather_condition_df["count"] / total_accidents) * 100

# The pie gets an "Other" slice for everything outside the top 10. Without it,
# autopct would normalize to the top 10 only, and the slices would disagree with
# the Percentage column printed below.
pie_labels = weather_condition_df["Weather_Condition"].tolist()
pie_counts = weather_condition_df["count"].tolist()
other_count = total_accidents - sum(pie_counts)
if other_count > 0:
    pie_labels.append("Other")
    pie_counts.append(other_count)

plt.figure(figsize=(14, 10))
plt.pie(pie_counts, labels=pie_labels, autopct='%1.1f%%', startangle=140, colors=plt.cm.Paired.colors)
plt.title("Distribution of Accidents by Weather Condition (Top 10 + Other)")
plt.axis('equal')  # equal aspect ratio draws the pie as a circle
plt.show()

# Additional statistics (the total excludes accidents without a weather condition)
print("Total number of accidents with a recorded weather condition:", total_accidents)
print("\nAccidents by Weather Condition (Top 10):")
print(weather_condition_df)

# Bar chart of the top 10 conditions for easier comparison
plt.figure(figsize=(12, 8))
plt.barh(weather_condition_df["Weather_Condition"], weather_condition_df["count"], color="skyblue")
plt.xlabel("Number of Accidents")
plt.title("Number of Accidents by Weather Condition (Top 10)")
plt.gca().invert_yaxis()  # descending rows -> most frequent condition on top
plt.grid(axis="x", linestyle="--", alpha=0.7)
plt.tight_layout()

plt.show()

Analyze weather data by calculating and visualizing the top 5 intervals (by percentage of accidents) for Temperature, Humidity, Pressure, Wind_Chill, Wind_Speed, and Visibility.

In [0]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from pyspark.sql import functions as F

# Define the columns to analyze
# Numeric weather measurements whose values are binned below
columns = [
    "Temperature(F)",
    "Humidity(%)",
    "Pressure(in)",
    "Wind_Chill(F)",
    "Wind_Speed(mph)",
    "Visibility(mi)",
]

# column name -> pandas DataFrame of that column's top intervals
intervals_dict = dict()

# Function to find top 5 intervals with highest percentage
# Default bin width per column. Ten-unit bins suit temperatures and percentages,
# but would collapse pressure (inches) or visibility (miles) readings into one or
# two bins, so those get finer widths. Unlisted columns fall back to 10.
DEFAULT_BIN_WIDTHS = {
    "Pressure(in)": 0.5,
    "Visibility(mi)": 1,
    "Wind_Speed(mph)": 5,
}


def find_top_intervals(df, column, bin_width=None):
    """Return the 5 most common value bins of ``column`` with their share of rows.

    Non-null values are rounded to the nearest multiple of ``bin_width``, so the
    ``Interval`` value is the center of its bin.

    Args:
        df: Spark DataFrame containing ``column``.
        column: Name of the numeric column to bin.
        bin_width: Width of each bin. When None, uses
            ``DEFAULT_BIN_WIDTHS.get(column, 10)``.

    Returns:
        pandas DataFrame with columns Interval, Count and Percentage (percent of
        non-null values), with at most 5 rows ordered by Percentage descending.
        On any error the message is printed and an empty frame with those
        columns is returned, so the caller can continue with other columns.
    """
    if bin_width is None:
        bin_width = DEFAULT_BIN_WIDTHS.get(column, 10)
    try:
        df_intervals = (
            df.select(column)
            .filter(F.col(column).isNotNull())
            .withColumn("Interval", F.round(F.col(column) / bin_width) * bin_width)
            .groupBy("Interval")
            .agg(F.count("*").alias("Count"))
        )

        total_count = df_intervals.agg(F.sum("Count")).collect()[0][0]

        # A sum over zero rows is null (None), so treat None like an empty column
        if total_count:
            df_intervals = df_intervals.withColumn("Percentage", (F.col("Count") / total_count) * 100)
        else:
            df_intervals = df_intervals.withColumn("Percentage", F.lit(0))

        return df_intervals.orderBy(F.col("Percentage").desc()).limit(5).toPandas()
    except Exception as e:
        # Best-effort per column: report the failure and let the caller move on
        print(f"Error processing column {column}: {e}")
        return pd.DataFrame(columns=["Interval", "Count", "Percentage"])

# Analyze each column and store the results
# First compute the top intervals for every column, then draw one bar chart
# per column that produced a non-empty result.
for column in columns:
    intervals_dict[column] = find_top_intervals(df, column)

for column, data in intervals_dict.items():
    if data.empty:
        continue  # the column failed or had no values; nothing to plot
    fig, ax = plt.subplots(figsize=(10, 6))
    sns.barplot(x="Interval", y="Percentage", data=data, palette="viridis", edgecolor="black", ax=ax)
    ax.set_title(f"Top 5 Intervals with Highest Percentage for {column}", fontsize=16)
    ax.set_xlabel("Interval", fontsize=14)
    ax.set_ylabel("Percentage (%)", fontsize=14)
    ax.grid(axis="y", linestyle="--", alpha=0.7)
    fig.tight_layout()
    plt.show()

Process accident data by weather condition and severity, count occurrences, and visualize the top 10 conditions as pie charts.

In [0]:
from pyspark.sql.functions import col

# Accidents with a recorded weather condition, as an RDD of Rows
weather_conditions = df.select("Weather_Condition", "Severity").filter(col("Weather_Condition").isNotNull()).rdd

# (weather_condition, severity) pair for each accident
rdd = weather_conditions.map(lambda row: (row["Weather_Condition"], row["Severity"]))

# Count occurrences per (weather_condition, severity) key
rdd_grouped = rdd.map(lambda x: ((x[0], x[1]), 1)).reduceByKey(lambda a, b: a + b)

# Flatten ((condition, severity), count) into rows for pandas
data = rdd_grouped.map(lambda x: (x[0][0], x[0][1], x[1])).collect()
df_grouped = pd.DataFrame(data, columns=["Weather_Condition", "Severity", "Count"])

# Keep only the 10 weather conditions with the most accidents overall
top_10_weather_conditions = df_grouped.groupby("Weather_Condition")["Count"].sum().nlargest(10).index
df_grouped = df_grouped[df_grouped["Weather_Condition"].isin(top_10_weather_conditions)]

# Fixed color per severity; unexpected levels get grey instead of raising KeyError
severity_colors = {
    1: "#ff9999",  # Red
    2: "#66b3ff",  # Blue
    3: "#99ff99",  # Green
    4: "#ffcc99"   # Orange
}

# 5x2 grid of pie charts, one per weather condition
fig, axes = plt.subplots(5, 2, figsize=(20, 25))
fig.suptitle("Accidents by Weather Condition and Severity (Top 10)", fontsize=20)
flat_axes = axes.flatten()  # row-major, so flat_axes[i] == axes[i // 2, i % 2]

for i, weather_condition in enumerate(top_10_weather_conditions):
    ax = flat_axes[i]
    # The RDD output is not sorted; sorting by severity keeps slice order stable
    data = df_grouped[df_grouped["Weather_Condition"] == weather_condition].sort_values("Severity")
    labels = [f"Severity {severity}" for severity in data["Severity"]]
    colors = [severity_colors.get(severity, "#cccccc") for severity in data["Severity"]]
    ax.pie(
        data["Count"],
        labels=labels,
        autopct="%1.1f%%",
        startangle=140,
        colors=colors,
    )
    ax.set_title(weather_condition, fontsize=16)

# Hide panels left empty when fewer than 10 conditions occur
for ax in flat_axes[len(top_10_weather_conditions):]:
    ax.axis("off")

plt.tight_layout(rect=[0, 0, 1, 0.96])
plt.show()
