# US Accidents Dataset Preprocessing

In this notebook, we demonstrate our preprocessing workflow for the US Accidents dataset using PySpark. We clean, transform, and prepare the data, and by the end of the notebook we generate a new, cleaned CSV file that is ready for analysis and modeling in a separate notebook.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType
from pyspark.sql.functions import count, when, isnull, udf, col, concat_ws, trim, upper
from pyspark.ml.feature import QuantileDiscretizer, VectorAssembler
from pyspark.ml.stat import Correlation

## Data Loading

In this step, we load the US Accidents dataset into a PySpark DataFrame. The raw file contains **7,728,394 rows** and **46 columns**.

In [2]:
# Initialize Spark session with increased memory (Kaggle Max is 30GiB)
spark = SparkSession.builder.appName("project") \
    .master("local[*]") \
    .config("spark.driver.memory", "28g") \
    .config("spark.executor.memory", "28g") \
    .getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

data_path = "/kaggle/input/us-accidents/US_Accidents_March23.csv"

# Load the CSV data into a PySpark DataFrame
df = spark.read.csv(data_path, header=True, inferSchema=True).cache()

print(f"Loaded {df.count()} rows and {len(df.columns)} columns.")

Loaded 7728394 rows and 46 columns.

## Detecting Missing Values

We analyze the dataset for missing (NULL) values by counting the NULLs in each column and expressing each count as a percentage of all rows. Four columns have a high proportion of missing values: **End_Lat** and **End_Lng** (44.03% each), **Precipitation(in)** (28.51%), and **Wind_Chill(F)** (25.87%). Because so much of their data is missing, we discard these columns in the next step.

In [3]:
# Count total rows in the DataFrame
row_count = df.count()

# Calculate NULL count for each column
null_count_by_column = df.select([
    count(when(isnull(column_name), column_name)).alias(column_name) for column_name in df.columns
]).collect()[0].asDict()

# Calculate NULL percentage for each column
null_percentage_by_column = {
    column_name: (null_count / row_count * 100) for column_name, null_count in null_count_by_column.items()
}

# Sort columns by NULL count in descending order
null_stats_sorted = sorted(null_count_by_column.items(), key=lambda item: item[1], reverse=True)

print("=== NULL Count and Percentage by Column ===")
for column_name, null_count in null_stats_sorted:
    null_percentage = null_percentage_by_column[column_name]
    print(f"{column_name}: {null_count} ({null_percentage:.2f}%)")



=== NULL Count and Percentage by Column ===
End_Lat: 3402762 (44.03%)
End_Lng: 3402762 (44.03%)
Precipitation(in): 2203586 (28.51%)
Wind_Chill(F): 1999019 (25.87%)
Wind_Speed(mph): 571233 (7.39%)
Visibility(mi): 177098 (2.29%)
Wind_Direction: 175206 (2.27%)
Humidity(%): 174144 (2.25%)
Weather_Condition: 173459 (2.24%)
Temperature(F): 163853 (2.12%)
Pressure(in): 140679 (1.82%)
Weather_Timestamp: 120228 (1.56%)
Sunrise_Sunset: 23246 (0.30%)
Civil_Twilight: 23246 (0.30%)
Nautical_Twilight: 23246 (0.30%)
Astronomical_Twilight: 23246 (0.30%)
Airport_Code: 22635 (0.29%)
Street: 10869 (0.14%)
Timezone: 7808 (0.10%)
Zipcode: 1915 (0.02%)
City: 253 (0.00%)
Description: 5 (0.00%)
ID: 0 (0.00%)
Source: 0 (0.00%)
Severity: 0 (0.00%)
Start_Time: 0 (0.00%)
End_Time: 0 (0.00%)
Start_Lat: 0 (0.00%)
Start_Lng: 0 (0.00%)
Distance(mi): 0 (0.00%)
County: 0 (0.00%)
State: 0 (0.00%)
Country: 0 (0.00%)
Amenity: 0 (0.00%)
Bump: 0 (0.00%)
Crossing: 0 (0.00%)
Give_Way: 0 (0.00%)
Junction: 0 (0.00%)
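No_Exit: 0 (0.00%)
Railway: 0 (0.00%)
Roundabout: 0 (0.00%)
Station: 0 (0.00%)
Stop: 0 (0.00%)
Traffic_Calming: 0 (0.00%)
Traffic_Signal: 0 (0.00%)
Turning_Loop: 0 (0.00%)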

## Handling Missing Data

To ensure data quality, we first dropped the columns with more than 25% missing values: **End_Lat**, **End_Lng**, **Wind_Chill(F)**, and **Precipitation(in)**. In every remaining column at most 7.39% of values were missing (**Wind_Speed(mph)**), so we then dropped all rows containing any NULL. This removed 676,838 rows (about 8.8%) and left a fully clean dataset of **7,051,556 rows** and **42 columns**.

In [4]:
# Drop columns with more than 25% NULLs
# (loop variable is `name`, not `col`, to avoid confusion with pyspark's col function)
columns_to_drop = [name for name, pct in null_percentage_by_column.items() if pct > 25]
df = df.drop(*columns_to_drop)
print(f"Dropped columns with >25% NULLs: {columns_to_drop}")

# Recompute NULL counts and percentages after dropping columns
row_count = df.count()
null_count_by_column = df.select([
    count(when(isnull(column_name), column_name)).alias(column_name) for column_name in df.columns
]).collect()[0].asDict()
null_percentage_by_column = {
    column_name: (null_count / row_count * 100) for column_name, null_count in null_count_by_column.items()
}
null_stats_sorted = sorted(null_count_by_column.items(), key=lambda item: item[1], reverse=True)

print("=== NULL Count and Percentage by Column (After Drop) ===")
for column_name, null_count in null_stats_sorted:
    null_percentage = null_percentage_by_column[column_name]
    print(f"{column_name}: {null_count} ({null_percentage:.2f}%)")

# Drop all rows with any remaining NULLs (row-wise drop)
# This step is safe as the remaining NULL percentages are very low.
df = df.na.drop()
print("\nDropped all rows with any NULLs row-wise.")

# Confirm dataset is fully clean
row_count = df.count()
col_count = len(df.columns)
null_count_by_column = df.select([
    count(when(isnull(column_name), column_name)).alias(column_name) for column_name in df.columns
]).collect()[0].asDict()

print(f"\nAfter all drops, row count: {row_count}, column count: {col_count}")
print("\n=== NULL Count by Column (Should all be 0) ===")
for column_name, null_count in null_count_by_column.items():
    print(f"{column_name}: {null_count}")

print("\nAll NULLs have been dropped.")

Dropped columns with >25% NULLs: ['End_Lat', 'End_Lng', 'Wind_Chill(F)', 'Precipitation(in)']

=== NULL Count and Percentage by Column (After Drop) ===
Wind_Speed(mph): 571233 (7.39%)
Visibility(mi): 177098 (2.29%)
Wind_Direction: 175206 (2.27%)
Humidity(%): 174144 (2.25%)
Weather_Condition: 173459 (2.24%)
Temperature(F): 163853 (2.12%)
Pressure(in): 140679 (1.82%)
Weather_Timestamp: 120228 (1.56%)
Sunrise_Sunset: 23246 (0.30%)
Civil_Twilight: 23246 (0.30%)
Nautical_Twilight: 23246 (0.30%)
Astronomical_Twilight: 23246 (0.30%)
Airport_Code: 22635 (0.29%)
Street: 10869 (0.14%)
Timezone: 7808 (0.10%)
Zipcode: 1915 (0.02%)
City: 253 (0.00%)
Description: 5 (0.00%)
ID: 0 (0.00%)
Source: 0 (0.00%)
Severity: 0 (0.00%)
Start_Time: 0 (0.00%)
End_Time: 0 (0.00%)
Start_Lat: 0 (0.00%)
Start_Lng: 0 (0.00%)
Distance(mi): 0 (0.00%)
County: 0 (0.00%)
State: 0 (0.00%)
Country: 0 (0.00%)
Amenity: 0 (0.00%)
Bump: 0 (0.00%)
Crossing: 0 (0.00%)
Give_Way: 0 (0.00%)
Junction: 0 (0.00%)
No_Exit: 0 (0.00%)
Railway: 0 (0.00%)
Roundabout: 0 (0.00%)
Station: 0 (0.00%)
Stop: 0 (0.00%)
Traffic_Calming: 0 (0.00%)
Traffic_Signal: 0 (0.00%)
Turning_Loop: 0 (0.00%)

Dropped all rows with any NULLs row-wise.

After all drops, row count: 7051556, column count: 42

=== NULL Count by Column (Should all be 0) ===
ID: 0
Source: 0
Severity: 0
Start_Time: 0
End_Time: 0
Start_Lat: 0
Start_Lng: 0
Distance(mi): 0
Description: 0
Street: 0
City: 0
County: 0
State: 0
Zipcode: 0
Country: 0
Timezone: 0
Airport_Code: 0
Weather_Timestamp: 0
Temperature(F): 0
Humidity(%): 0
Pressure(in): 0
Visibility(mi): 0
Wind_Direction: 0
Wind_Speed(mph): 0
Weather_Condition: 0
Amenity: 0
Bump: 0
Crossing: 0
Give_Way: 0
Junction: 0
No_Exit: 0
Railway: 0
Roundabout: 0
Station: 0
Stop: 0
Traffic_Calming: 0
Traffic_Signal: 0
Turning_Loop: 0
Sunrise_Sunset: 0
Civil_Twilight: 0
Nautical_Twilight: 0
Astronomical_Twilight: 0

All NULLs have been dropped.
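The two-stage rule used above (drop any column whose NULL share exceeds 25%, then drop every row that still contains a NULL) can be sketched in plain Python for readers without a Spark session. This is a minimal illustration, not the notebook's code: rows are assumed to be dicts with identical keys, `None` stands in for NULL, and the function names and toy rows are hypothetical.

```python
# Hypothetical helper names; a plain-Python sketch, not the notebook's Spark code.
from typing import Any, Dict, List, Optional

Row = Dict[str, Optional[Any]]


def null_percentages(rows: List[Row]) -> Dict[str, float]:
    """Percentage of None values in each column (all rows share the same keys)."""
    if not rows:
        return {}
    total = len(rows)
    return {c: 100.0 * sum(r[c] is None for r in rows) / total for c in rows[0]}


def drop_sparse_columns_then_rows(rows: List[Row], max_null_pct: float = 25.0) -> List[Row]:
    """Stage 1: drop columns whose NULL share exceeds max_null_pct.
    Stage 2: drop any row that still holds a NULL (the df.na.drop() step)."""
    pct = null_percentages(rows)
    keep = [c for c, p in pct.items() if p <= max_null_pct]
    trimmed = [{c: r[c] for c in keep} for r in rows]
    return [r for r in trimmed if all(v is not None for v in r.values())]


# Hypothetical toy rows: End_Lat is 75% NULL, Wind_Speed is exactly 25% NULL
rows = [
    {"Severity": 2, "End_Lat": None, "Wind_Speed": 5.0},
    {"Severity": 3, "End_Lat": None, "Wind_Speed": None},
    {"Severity": 2, "End_Lat": 40.1, "Wind_Speed": 7.0},
    {"Severity": 1, "End_Lat": None, "Wind_Speed": 3.0},
]
cleaned = drop_sparse_columns_then_rows(rows)
print(cleaned)
```

Like `pct > 25` in the notebook, a column at exactly 25% is kept, so `Wind_Speed` survives the first stage and only the row holding its NULL is removed in the second.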


## Dropping Irrelevant Columns

Before dropping, we inspected the unique values in the **Source**, **Country**, and **Turning_Loop** columns. We then removed the following columns as they do not provide analytical value:
- **ID**: Unique identifier, not useful for analysis.
- **Country**: Contains only one value ('US').
- **Source**: Contains only three values (Source1, Source2, Source3) that label the data source of each record; this describes data collection rather than the accident itself.
- **Turning_Loop**: Contains only one value (False).

After this step, 38 columns remain.

In [5]:
# Show unique values for 'Source', 'Country', and 'Turning_Loop' before dropping
if "Source" in df.columns:
    print("Unique values in 'Source':")
    df.select("Source").distinct().show(truncate=False)

if "Country" in df.columns:
    print("Unique values in 'Country':")
    df.select("Country").distinct().show(truncate=False)

if "Turning_Loop" in df.columns:
    print("Unique values in 'Turning_Loop':")
    df.select("Turning_Loop").distinct().show(truncate=False)

# Identify columns to drop
cols_to_drop = []
if "ID" in df.columns:
    cols_to_drop.append("ID")
if "Country" in df.columns:
    cols_to_drop.append("Country")
if "Source" in df.columns:
    cols_to_drop.append("Source")
if "Turning_Loop" in df.columns:
    cols_to_drop.append("Turning_Loop")

# Drop the identified columns
df = df.drop(*cols_to_drop)
print(f"Dropped columns: {cols_to_drop}")

# Document reasoning
print("""
Reasoning:
- 'ID' is a unique identifier for each row and does not add analytical value.
- 'Country' has only one value ('US'), so it does not help distinguish data.
- 'Source' has only 3 values and does not add analytical value.
- 'Turning_Loop' has only one value and does not add analytical value.
""")

print(f"\nAfter all drops, {len(df.columns)} columns remain: {', '.join(df.columns)}")

Unique values in 'Source':

+-------+
|Source |
+-------+
|Source3|
|Source2|
|Source1|
+-------+

Unique values in 'Country':

+-------+
|Country|
+-------+
|US     |
+-------+

Unique values in 'Turning_Loop':




+------------+
|Turning_Loop|
+------------+
|false       |
+------------+

Dropped columns: ['ID', 'Country', 'Source', 'Turning_Loop']

Reasoning:
- 'ID' is a unique identifier for each row and does not add analytical value.
- 'Country' has only one value ('US'), so it does not help distinguish data.
- 'Source' has only 3 values and does not add analytical value.
- 'Turning_Loop' has only one value and does not add analytical value.


After all drops, 38 columns remain: Severity, Start_Time, End_Time, Start_Lat, Start_Lng, Distance(mi), Description, Street, City, County, State, Zipcode, Timezone, Airport_Code, Weather_Timestamp, Temperature(F), Humidity(%), Pressure(in), Visibility(mi), Wind_Direction, Wind_Speed(mph), Weather_Condition, Amenity, Bump, Crossing, Give_Way, Junction, No_Exit, Railway, Roundabout, Station, Stop, Traffic_Calming, Traffic_Signal, Sunrise_Sunset, Civil_Twilight, Nautical_Twilight, Astronomical_Twilight
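The `Country` and `Turning_Loop` checks above amount to finding columns with a single distinct value. A plain-Python sketch of that test follows; the function name and sample rows are hypothetical. In Spark, a comparable one-pass check could aggregate `countDistinct` over each column, though that is costly on millions of rows. `ID` and `Source` are not caught by this test; they were removed for the other reasons listed above.

```python
# Hypothetical helper name; a plain-Python sketch, not the notebook's Spark code.
from typing import Any, Dict, List, Set


def single_valued_columns(rows: List[Dict[str, Any]]) -> List[str]:
    """Return the columns that hold exactly one distinct value across all rows."""
    if not rows:
        return []
    distinct: Dict[str, Set[Any]] = {c: set() for c in rows[0]}
    for r in rows:
        for c, v in r.items():
            distinct[c].add(v)
    return [c for c, values in distinct.items() if len(values) == 1]


# Hypothetical sample mirroring the inspected columns
sample = [
    {"Country": "US", "Turning_Loop": False, "Source": "Source1", "Severity": 2},
    {"Country": "US", "Turning_Loop": False, "Source": "Source2", "Severity": 3},
    {"Country": "US", "Turning_Loop": False, "Source": "Source3", "Severity": 2},
]
print(single_valued_columns(sample))  # ['Country', 'Turning_Loop']
```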


## Checking for Redundant or Highly Correlated Columns

We now inspect the dataset for redundant or highly correlated columns. We examine sample values from groups of columns with similar meanings and compute the correlation matrix of the continuous variables to decide whether any columns should be removed in the next step.

**Findings:**
- **Timestamps:** Both `Start_Time` and `End_Time` are present and provide complementary information about accident duration.
- **Location:** Columns like `Street`, `City`, `County`, `State`, and `Zipcode` offer different levels of location granularity, but each may be useful for different types of analysis.
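- **Twilight Columns:** `Sunrise_Sunset`, `Civil_Twilight`, `Nautical_Twilight`, and `Astronomical_Twilight` each take only two values (`Day`/`Night`) and encode the same day/night distinction at different sun-angle thresholds (civil, nautical, and astronomical twilight end when the sun is 6°, 12°, and 18° below the horizon), so they are likely redundant.
- **Continuous Variables:** No pair of continuous variables is strongly correlated. The largest absolute off-diagonal value in the correlation matrix is 0.39 (`Humidity(%)` vs. `Visibility(mi)`), followed by 0.33 (`Temperature(F)` vs. `Humidity(%)`), so we do not need to drop any of them due to multicollinearity.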

Based on these results, we will consider dropping redundant twilight columns in the next step.
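The multicollinearity check relies on pairwise Pearson correlations, which is what `Correlation.corr` computes by default. The sketch below shows the same idea in plain Python: compute r for every pair of columns and flag pairs whose absolute value reaches a threshold. The 0.9 threshold, the function names, and the toy data are assumptions for illustration; applied to the matrix printed by the next cell, whose largest off-diagonal magnitude is about 0.39, it would flag nothing.

```python
# Hypothetical helper names; a plain-Python sketch, not the notebook's Spark code.
import math
from itertools import combinations
from typing import Dict, List, Tuple


def pearson(x: List[float], y: List[float]) -> float:
    """Pearson correlation coefficient of two equal-length samples."""
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    cov = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    sd_x = math.sqrt(sum((a - mean_x) ** 2 for a in x))
    sd_y = math.sqrt(sum((b - mean_y) ** 2 for b in y))
    return cov / (sd_x * sd_y)


def highly_correlated_pairs(
    columns: Dict[str, List[float]], threshold: float = 0.9
) -> List[Tuple[str, str, float]]:
    """Return (col_a, col_b, r) for every pair with |r| >= threshold (assumed cutoff)."""
    flagged = []
    for a, b in combinations(columns, 2):
        r = pearson(columns[a], columns[b])
        if abs(r) >= threshold:
            flagged.append((a, b, r))
    return flagged


# Toy data: temp_c is an exact linear function of temp_f, humidity is not
temp_f = [50.0, 60.0, 70.0, 80.0]
data = {
    "temp_f": temp_f,
    "temp_c": [(f - 32) / 1.8 for f in temp_f],
    "humidity": [80.0, 40.0, 90.0, 30.0],
}
print(highly_correlated_pairs(data))  # only the (temp_f, temp_c) pair is flagged
```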

In [6]:
print("=== Checking for Redundant or Highly Correlated Columns ===")

# Inspect columns with similar meanings
potential_redundant_groups = [
    ["Start_Time", "End_Time"],  # Both are timestamps
    ["Street", "City", "County", "State", "Zipcode"],  # Location granularity
    ["Sunrise_Sunset", "Civil_Twilight", "Nautical_Twilight", "Astronomical_Twilight"],  # Twilight/time-of-day
]

for group in potential_redundant_groups:
    present = [c for c in group if c in df.columns]
    if len(present) > 1:
        print(f"\nSample values for columns: {present}")
        df.select(*present).show(5, truncate=False)

# Check correlation between continuous variables
continuous_cols = [
    "Distance(mi)", "Temperature(F)", "Humidity(%)", "Pressure(in)", "Visibility(mi)", "Wind_Speed(mph)"
]
present_continuous = [c for c in continuous_cols if c in df.columns]

if len(present_continuous) > 1:
    assembler = VectorAssembler(inputCols=present_continuous, outputCol="features_corr")
    df_corr = assembler.transform(df.select(*present_continuous).dropna())
    corr_matrix = Correlation.corr(df_corr, "features_corr").head()[0].toArray()
    print("\nCorrelation matrix for continuous variables:")
    print(f"Columns: {present_continuous}")
    print(corr_matrix)

# Show unique values for twilight columns to check for redundancy
twilight_cols = ["Sunrise_Sunset", "Civil_Twilight", "Nautical_Twilight", "Astronomical_Twilight"]
for col_name in twilight_cols:
    if col_name in df.columns:
        print(f"\nUnique values in '{col_name}':")
        df.select(col_name).distinct().show(truncate=False)

=== Checking for Redundant or Highly Correlated Columns ===

Sample values for columns: ['Start_Time', 'End_Time']
+-------------------+-------------------+
|Start_Time         |End_Time           |
+-------------------+-------------------+
|2016-02-08 06:49:27|2016-02-08 07:19:27|
|2016-02-08 07:23:34|2016-02-08 07:53:34|
|2016-02-08 07:39:07|2016-02-08 08:09:07|
|2016-02-08 07:44:26|2016-02-08 08:14:26|
|2016-02-08 07:59:35|2016-02-08 08:29:35|
+-------------------+-------------------+
only showing top 5 rows


Sample values for columns: ['Street', 'City', 'County', 'State', 'Zipcode']
+-------------------------+------------+----------+-----+----------+
|Street                   |City        |County    |State|Zipcode   |
+-------------------------+------------+----------+-----+----------+
|State Route 32           |Williamsburg|Clermont  |OH   |45176     |
|I-75 S                   |Dayton      |Montgomery|OH   |45417     |
|Miamisburg Centerville Rd|Dayton      |Montgomery|OH   |454

Correlation matrix for continuous variables:
Columns: ['Distance(mi)', 'Temperature(F)', 'Humidity(%)', 'Pressure(in)', 'Visibility(mi)', 'Wind_Speed(mph)']
[[ 1.         -0.05598764  0.01140223 -0.09040334 -0.0395247   0.00874711]
 [-0.05598764  1.         -0.33053564  0.12005678  0.22400662  0.03458547]
 [ 0.01140223 -0.33053564  1.          0.10986754 -0.38682745 -0.17264725]
 [-0.09040334  0.12005678  0.10986754  1.          0.04117808 -0.0222837 ]
 [-0.0395247   0.22400662 -0.38682745  0.04117808  1.          0.01454486]
 [ 0.00874711  0.03458547 -0.17264725 -0.0222837   0.01454486  1.        ]]

Unique values in 'Sunrise_Sunset':

+--------------+
|Sunrise_Sunset|
+--------------+
|Night         |
|Day           |
+--------------+


Unique values in 'Civil_Twilight':

+--------------+
|Civil_Twilight|
+--------------+
|Night         |
|Day           |
+--------------+


Unique values in 'Nautical_Twilight':

+-----------------+
|Nautical_Twilight|
+-----------------+
|Night            |
|Day              |
+-----------------+


Unique values in 'Astronomical_Twilight':




+---------------------+
|Astronomical_Twilight|
+---------------------+
|Night                |
|Day                  |
+---------------------+
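Having only `Day`/`Night` values does not by itself make two columns redundant; what matters is how often they agree row by row. A minimal plain-Python sketch of that agreement rate follows (function name and sample values are hypothetical). On the Spark DataFrame, the same quantity could be computed as `df.filter(col(a) == col(b)).count() / df.count()` for each pair.

```python
# Hypothetical helper name; a plain-Python sketch, not the notebook's Spark code.
from itertools import combinations
from typing import Dict, List, Tuple


def agreement_rates(columns: Dict[str, List[str]]) -> Dict[Tuple[str, str], float]:
    """Fraction of rows on which each pair of categorical columns has the same label."""
    rates = {}
    for a, b in combinations(columns, 2):
        same = sum(x == y for x, y in zip(columns[a], columns[b]))
        rates[(a, b)] = same / len(columns[a])
    return rates


# Hypothetical labels for four accidents
twilight = {
    "Sunrise_Sunset": ["Day", "Night", "Night", "Day"],
    "Civil_Twilight": ["Day", "Day", "Night", "Day"],
}
print(agreement_rates(twilight))  # {('Sunrise_Sunset', 'Civil_Twilight'): 0.75}
```

A rate close to 1.0 supports dropping the extra columns; a markedly lower rate would mean the columns carry different information around dawn and dusk.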


## Dropping Redundant Twilight Columns

Based on our previous inspection, we drop the redundant twilight columns (**Civil_Twilight**, **Nautical_Twilight**, and **Astronomical_Twilight**) and keep only **Sunrise_Sunset** for simplicity. We retain **End_Time** for accident duration analysis and **Street** for potential rural/urban analysis in future notebooks.

In [7]:
cols_to_drop = []

# Drop redundant twilight columns (keep only 'Sunrise_Sunset')
twilight_cols_to_drop = ["Civil_Twilight", "Nautical_Twilight", "Astronomical_Twilight"]
cols_to_drop += [name for name in twilight_cols_to_drop if name in df.columns]

# Keep 'End_Time' to allow for accident duration analysis
# Keep 'Street' to combine with State for Rural/Urban Analysis

df = df.drop(*cols_to_drop)
print(f"Dropped columns due to redundancy: {cols_to_drop}")

print("""
Reasoning:
- Kept 'Street' for potential rural/urban analysis.
- Kept 'End_Time' to allow for analysis of accident durations.
- Dropped twilight columns except 'Sunrise_Sunset' as they are redundant.
""")

print(f"\nAfter this step, {len(df.columns)} columns remain: {', '.join(df.columns)}")

Dropped columns due to redundancy: ['Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight']

Reasoning:
- Kept 'Street' for potential rural/urban analysis.
- Kept 'End_Time' to allow for analysis of accident durations.
- Dropped twilight columns except 'Sunrise_Sunset' as they are redundant.


After this step, 35 columns remain: Severity, Start_Time, End_Time, Start_Lat, Start_Lng, Distance(mi), Description, Street, City, County, State, Zipcode, Timezone, Airport_Code, Weather_Timestamp, Temperature(F), Humidity(%), Pressure(in), Visibility(mi), Wind_Direction, Wind_Speed(mph), Weather_Condition, Amenity, Bump, Crossing, Give_Way, Junction, No_Exit, Railway, Roundabout, Station, Stop, Traffic_Calming, Traffic_Signal, Sunrise_Sunset


## Outlier Detection and Removal

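We detect outliers in each continuous variable by computing its 2nd and 98th percentiles with `approxQuantile` (relative error 0.01, so the cutoffs are approximate). We then remove every row in which any of these variables falls outside its range; because a single out-of-range variable is enough to remove a row, the removals compound across the six variables.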

We experimented with different thresholds:
- Using the 1st and 99th percentiles removed very few rows (roughly 7 million remained).
- Using the 2.5th and 97.5th percentiles reduced the dataset to about 4 million rows.
- The 2nd and 98th percentiles provided a good balance, resulting in a cleaned dataset of **6,141,325 rows**.
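The trimming rule can be sketched without Spark as follows. Two things differ from the notebook cell and are assumptions of this sketch: percentiles here are exact nearest-rank values, whereas `approxQuantile(..., 0.01)` returns approximate ones, and the function names and toy data are hypothetical. As in the cell, all bounds are computed on the untrimmed data first, bounds are inclusive, and a row is kept only if every variable lies within its range.

```python
# Hypothetical helper names; a plain-Python sketch, not the notebook's Spark code.
import math
from typing import Dict, List, Sequence, Tuple


def nearest_rank_percentile(values: Sequence[float], p: float) -> float:
    """Exact nearest-rank percentile for p in (0, 100]."""
    ordered = sorted(values)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


def trim_outliers(
    rows: List[Dict[str, float]], cols: List[str], lo: float = 2, hi: float = 98
) -> List[Dict[str, float]]:
    # Compute every column's bounds once, on the untrimmed rows
    bounds: Dict[str, Tuple[float, float]] = {}
    for c in cols:
        values = [r[c] for r in rows]
        bounds[c] = (nearest_rank_percentile(values, lo), nearest_rank_percentile(values, hi))
    # Keep a row only if every column is inside its inclusive [lower, upper] range
    return [r for r in rows if all(bounds[c][0] <= r[c] <= bounds[c][1] for c in cols)]


# Toy data: x runs 1..100 and y runs 100..1, so each column alone keeps 97 rows
rows = [{"x": float(i), "y": float(101 - i)} for i in range(1, 101)]
kept = trim_outliers(rows, ["x", "y"])
print(len(kept))  # 96
```

The drop from 97 to 96 rows shows the compounding effect on a small scale: a row trimmed for `y` is lost even when its `x` value is within range.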

In [8]:
print("=== Outlier Detection for Continuous Variables (2nd/98th Percentiles) ===")

continuous_cols = [
    "Distance(mi)", "Temperature(F)", "Humidity(%)", "Pressure(in)", "Visibility(mi)", "Wind_Speed(mph)"
]
present_continuous = [c for c in continuous_cols if c in df.columns]

outlier_bounds_2_98 = {}

for c in present_continuous:
    # Calculate 2nd and 98th percentiles for each column
    q2, q98 = df.approxQuantile(c, [0.02, 0.98], 0.01)
    outlier_bounds_2_98[c] = (q2, q98)
    print(f"{c}: 2nd percentile = {q2}, 98th percentile = {q98}")

print("\nSuggested action: Remove values outside [2nd, 98th] percentiles for each variable.")

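# Remove outliers outside [2nd, 98th] percentiles.
# Use lower_bound/upper_bound rather than lower/upper: assigning to `upper`
# would shadow pyspark.sql.functions.upper imported in the first cell.
for c in present_continuous:
    lower_bound, upper_bound = outlier_bounds_2_98[c]
    df = df.filter((col(c) >= lower_bound) & (col(c) <= upper_bound))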

print("\nOutliers outside [2nd, 98th] percentiles have been removed for continuous variables.")
print(f"After outlier removal, row count: {df.count()}")

=== Outlier Detection for Continuous Variables (2nd/98th Percentiles) ===
Distance(mi): 2nd percentile = 0.0, 98th percentile = 4.496
Temperature(F): 2nd percentile = 17.0, 98th percentile = 93.0
Humidity(%): 2nd percentile = 15.0, 98th percentile = 100.0
Pressure(in): 2nd percentile = 25.27, 98th percentile = 30.35
Visibility(mi): 2nd percentile = 1.0, 98th percentile = 10.0
Wind_Speed(mph): 2nd percentile = 0.0, 98th percentile = 20.0

Suggested action: Remove values outside [2nd, 98th] percentiles for each variable.

Outliers outside [2nd, 98th] percentiles have been removed for continuous variables.
After outlier removal, row count: 6141325

## Equal-Width Binning for Continuous Variables

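We discretize the main continuous variables using equal-width binning. For each variable, the binned version takes the original column name and the raw values are kept in a `_cont` column. Temperature is first converted from Fahrenheit to Celsius, so it appears as `Temperature(C)` (binned) and `Temperature(C)_cont` (raw), and the Fahrenheit column is dropped. The bin ranges follow the outlier bounds above, except that the lower limits for Humidity (17.0) and Pressure (25.46) sit slightly above the corresponding 2nd-percentile cutoffs (15.0 and 25.27); values between these points are still assigned to the lowest bin, so that bin's label understates its true range. Because the value counts below are ordered by label string, they do not always appear in numeric order.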
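The binning cell hard-codes separate `when` chains for 3 and 4 bins. A generalized plain-Python sketch of the same equal-width rule, including the Fahrenheit-to-Celsius conversion `C = (F - 32) / 1.8`, is shown below; the helper names are hypothetical. Like the `when`/`otherwise` chain, it assigns values below the first edge to the lowest bin and values at or above the last inner edge to the top bin. In Spark, `pyspark.ml.feature.Bucketizer` with explicit `splits` is a built-in alternative, though it produces numeric bucket indices rather than text labels.

```python
# Hypothetical helper names; a plain-Python sketch, not the notebook's Spark code.
from typing import List


def fahrenheit_to_celsius(f: float) -> float:
    return (f - 32) / 1.8


def equal_width_edges(lo: float, hi: float, bins: int) -> List[float]:
    """Return bins + 1 edges splitting [lo, hi] into equal-width intervals."""
    width = (hi - lo) / bins
    return [lo + i * width for i in range(bins)] + [hi]


def bin_label(value: float, edges: List[float]) -> str:
    """Label value with its interval, formatted like the notebook's labels.
    Values below edges[0] land in the first bin and values above edges[-1]
    in the last one, mirroring the when/otherwise chain."""
    labels = [f"{a:.2f}–{b:.2f}" for a, b in zip(edges, edges[1:])]
    for inner_edge, label in zip(edges[1:-1], labels):
        if value < inner_edge:
            return label
    return labels[-1]


# Temperature: bounds are converted to Celsius first, then split into 4 bins
celsius_edges = equal_width_edges(fahrenheit_to_celsius(17.0), fahrenheit_to_celsius(93.0), 4)
print(bin_label(fahrenheit_to_celsius(50.0), celsius_edges))  # 2.22–12.78

# Humidity: 15.0 passed the outlier filter but lies below the 17.0 lower edge
humidity_edges = equal_width_edges(17.0, 100.0, 4)
print(bin_label(15.0, humidity_edges))  # 17.00–37.75
```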

In [9]:
# Define binning settings for each continuous variable
equal_width_bins = {
    "Distance(mi)": {"min": 0.0, "max": 4.5, "bins": 3},
    "Temperature(F)": {"min": 17.0, "max": 93.0, "bins": 4},  # Converted to Celsius
    "Humidity(%)": {"min": 17.0, "max": 100.0, "bins": 4},
    "Pressure(in)": {"min": 25.46, "max": 30.35, "bins": 3},
    "Visibility(mi)": {"min": 1.0, "max": 10.0, "bins": 3},
    "Wind_Speed(mph)": {"min": 0.0, "max": 20.0, "bins": 4}
}

for col_name, settings in equal_width_bins.items():
    if col_name in df.columns:
        # Rename original to _cont
        if f"{col_name}_cont" not in df.columns:
            df = df.withColumnRenamed(col_name, f"{col_name}_cont")
        min_v, max_v, bins = settings["min"], settings["max"], settings["bins"]
        width = (max_v - min_v) / bins
        if col_name == "Temperature(F)":
            # Convert to Celsius
            c_min = (min_v - 32) / 1.8
            c_max = (max_v - 32) / 1.8
            c_width = (c_max - c_min) / bins
            df = df.withColumn("Temperature(C)_cont", (col("Temperature(F)_cont") - 32) / 1.8)
            # Bin in Celsius
            bin_expr = (
                when(col("Temperature(C)_cont") < c_min + c_width, f"{c_min:.2f}–{c_min + c_width:.2f}")
                .when(col("Temperature(C)_cont") < c_min + 2 * c_width, f"{c_min + c_width:.2f}–{c_min + 2 * c_width:.2f}")
                .when(col("Temperature(C)_cont") < c_min + 3 * c_width, f"{c_min + 2 * c_width:.2f}–{c_min + 3 * c_width:.2f}")
                .otherwise(f"{c_min + 3 * c_width:.2f}–{c_max:.2f}")
            )
            df = df.withColumn("Temperature(C)", bin_expr)
        else:
            # Bin the rest
            if bins == 3:
                bin_expr = (
                    when(col(f"{col_name}_cont") < min_v + width, f"{min_v:.2f}–{min_v + width:.2f}")
                    .when(col(f"{col_name}_cont") < min_v + 2 * width, f"{min_v + width:.2f}–{min_v + 2 * width:.2f}")
                    .otherwise(f"{min_v + 2 * width:.2f}–{max_v:.2f}")
                )
            elif bins == 4:
                bin_expr = (
                    when(col(f"{col_name}_cont") < min_v + width, f"{min_v:.2f}–{min_v + width:.2f}")
                    .when(col(f"{col_name}_cont") < min_v + 2 * width, f"{min_v + width:.2f}–{min_v + 2 * width:.2f}")
                    .when(col(f"{col_name}_cont") < min_v + 3 * width, f"{min_v + 2 * width:.2f}–{min_v + 3 * width:.2f}")
                    .otherwise(f"{min_v + 3 * width:.2f}–{max_v:.2f}")
                )
            df = df.withColumn(col_name, bin_expr)

# Remove Fahrenheit columns after conversion
if "Temperature(F)_cont" in df.columns:
    df = df.drop("Temperature(F)_cont")
if "Temperature(F)" in df.columns:
    df = df.drop("Temperature(F)")

# Show a sample of the new discretized columns to verify binning
discretized_cols = ["Distance(mi)", "Temperature(C)", "Humidity(%)", "Pressure(in)", "Visibility(mi)", "Wind_Speed(mph)"]
for col_name in discretized_cols:
    if col_name in df.columns:
        print(f"\nValue counts for '{col_name}':")
        df.groupBy(col_name).count().orderBy(col_name).show(truncate=False)

# Show a few rows to confirm columns exist and are populated
print("\nSample rows with discretized columns:")
df.select(discretized_cols).show(5, truncate=False)


Value counts for 'Distance(mi)':

+------------+-------+
|Distance(mi)|count  |
+------------+-------+
|0.00–1.50   |5674110|
|1.50–3.00   |343977 |
|3.00–4.50   |123238 |
+------------+-------+


Value counts for 'Temperature(C)':

+--------------+-------+
|Temperature(C)|count  |
+--------------+-------+
|-8.33–2.22    |472710 |
|12.78–23.33   |2390579|
|2.22–12.78    |1410634|
|23.33–33.89   |1867402|
+--------------+-------+


Value counts for 'Humidity(%)':

+------------+-------+
|Humidity(%) |count  |
+------------+-------+
|17.00–37.75 |729271 |
|37.75–58.50 |1532462|
|58.50–79.25 |1923952|
|79.25–100.00|1955640|
+------------+-------+


Value counts for 'Pressure(in)':

+------------+-------+
|Pressure(in)|count  |
+------------+-------+
|25.46–27.09 |117139 |
|27.09–28.72 |277357 |
|28.72–30.35 |5746829|
+------------+-------+


Value counts for 'Visibility(mi)':

+--------------+-------+
|Visibility(mi)|count  |
+--------------+-------+
|1.00–4.00     |309867 |
|4.00–7.00     |323758 |
|7.00–10.00    |5507700|
+--------------+-------+


Value counts for 'Wind_Speed(mph)':

+---------------+-------+
|Wind_Speed(mph)|count  |
+---------------+-------+
|0.00–5.00      |1672798|
|10.00–15.00    |1229628|
|15.00–20.00    |498514 |
|5.00–10.00     |2740385|
+---------------+-------+


Sample rows with discretized columns:
+------------+--------------+------------+------------+--------------+---------------+
|Distance(mi)|Temperature(C)|Humidity(%) |Pressure(in)|Visibility(mi)|Wind_Speed(mph)|
+------------+--------------+------------+------------+--------------+---------------+
|0.00–1.50   |2.22–12.78    |79.25–100.00|28.72–30.35 |7.00–10.00    |0.00–5.00      |
|0.00–1.50   |-8.33–2.22    |79.25–100.00|28.72–30.35 |7.00–10.00    |0.00–5.00      |
|0.00–1.50   |2.22–12.78    |79.25–100.00|28.72–30.35 |4.00–7.00     |0.00–5.00      |
|0.00–1.50   |2.22–12.78    |79.25–100.00|28.72–30.35 |7.00–10.00    |0.00–5.00      |
|0.00–1.50   |-8.33–2.22    |79.25–100.00|28.72–30.35 |7.00–10.00    |0.00–5.00      |
+------------+--------------+------------+------------+--------------+---------------+

## Weather Condition Grouping

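We simplify the `Weather_Condition` column by mapping the detailed weather descriptions to seven broader groups (e.g., "Rainy", "Snowy", "Cloudy") so that our analysis focuses on high-level weather patterns. Rows labeled `N/A Precipitation`, as well as any rows whose condition does not appear in the mapping, are dropped before the grouping is applied.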

In [10]:
# Drop rows where Weather_Condition is 'N/A Precipitation' (only 1000 rows)
if "Weather_Condition" in df.columns:
    df = df.filter(col("Weather_Condition") != "N/A Precipitation")

# Define mapping from detailed weather conditions to broader groups
weather_groups = {
    "Clear": ["Clear", "Fair"],

    "Cloudy": [
        "Partly Cloudy", "Mostly Cloudy", "Cloudy", "Overcast", "Scattered Clouds"
    ],

    "Hazy & Dusty": [
        "Fog", "Shallow Fog", "Patches of Fog", "Partial Fog", "Light Fog", "Mist",
        "Light Haze", "Haze", "Smoke",
        "Freezing Fog", "Light Freezing Fog",
        "Widespread Dust", "Blowing Dust", "Sand / Dust Whirls Nearby", "Sand / Dust Whirlwinds"
    ],

    "Rainy": [
        "Rain", "Light Rain", "Heavy Rain", "Rain Showers", "Rain Shower", 
        "Light Rain Showers", "Heavy Rain Showers", "Showers in the Vicinity",
        "Drizzle", "Light Drizzle", "Heavy Drizzle", "Heavy Rain Shower", "Light Rain Shower", "Squalls"
    ],

    "Thunderstorm or Hail": [
        "Thunderstorm", "Thunder", "T-Storm", "Heavy T-Storm", "Thunder in the Vicinity", 
        "Thunderstorms and Rain", "Light Thunderstorms and Rain", "Heavy Thunderstorms and Rain",
        "Thunder and Hail", "Light Thunderstorm", "Light Rain with Thunder",
        "Heavy Thunderstorms with Small Hail", "Thunder / Wintry Mix",
        "Snow and Thunder", "Light Snow with Thunder", "Light Thunderstorms and Snow",
        "Funnel Cloud", "Tornado",
        "Hail", "Small Hail", "Light Hail"
    ],

    "Snowy": [
        "Snow", "Light Snow", "Heavy Snow", "Snow Showers", "Light Snow Showers",
        "Heavy Snow Shower", "Light Snow Shower", "Drifting Snow", "Blowing Snow",
        "Light Blowing Snow", "Snow Grains", "Light Snow Grains", "Wintry Mix",
        "Snow and Sleet", "Light Snow and Sleet"
    ],

    "Freezing Rain & Ice": [
        "Freezing Rain", "Light Freezing Rain", "Heavy Freezing Rain",
        "Freezing Drizzle", "Light Freezing Drizzle", "Heavy Freezing Drizzle",
        "Ice Pellets", "Light Ice Pellets", "Heavy Ice Pellets",
        "Sleet", "Light Sleet", "Heavy Sleet", "Rain and Sleet"
    ]

}

# Invert mapping for fast lookup
weather_map = {}
for group, items in weather_groups.items():
    for item in items:
        weather_map[item] = group

# Keep only rows with Weather_Condition in the mapping
if "Weather_Condition" in df.columns:
    mapped_conditions = list(weather_map.keys())
    df = df.filter(col("Weather_Condition").isin(mapped_conditions))

def map_weather(cond):
    if cond is None:
        return "Unknown"
    return weather_map[cond]  # No default, since all values are mapped

weather_udf = udf(map_weather, StringType())
if "Weather_Condition" in df.columns:
    df = df.withColumn("Weather_Condition", weather_udf(df["Weather_Condition"]))
    print("Unique values in 'Weather_Condition' after grouping:")
    df.select("Weather_Condition").distinct().show(truncate=False)

Unique values in 'Weather_Condition' after grouping:

+--------------------+
|Weather_Condition   |
+--------------------+
|Cloudy              |
|Snowy               |
|Thunderstorm or Hail|
|Clear               |
|Rainy               |
|Freezing Rain & Ice |
|Hazy & Dusty        |
+--------------------+
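The grouping step above inverts `weather_groups` into `weather_map`; if a condition were accidentally listed under two groups, the later group would silently win. The plain-Python sketch below (function names hypothetical) adds a guard against that and applies the same keep-only-mapped-values rule. In Spark itself, a map literal built with `pyspark.sql.functions.create_map` could replace the Python UDF and avoid per-row serialization; that is a suggested alternative, not what the notebook does.

```python
# Hypothetical helper names; a plain-Python sketch, not the notebook's Spark code.
from typing import Dict, List, Optional


def invert_groups(groups: Dict[str, List[str]]) -> Dict[str, str]:
    """Build a condition -> group lookup, refusing conflicting assignments."""
    lookup: Dict[str, str] = {}
    for group, conditions in groups.items():
        for cond in conditions:
            if lookup.get(cond, group) != group:
                raise ValueError(f"{cond!r} is listed under {lookup[cond]!r} and {group!r}")
            lookup[cond] = group
    return lookup


def group_conditions(conditions: List[Optional[str]], lookup: Dict[str, str]) -> List[str]:
    """Map known conditions to their group; unmapped values (and None) are dropped."""
    return [lookup[c] for c in conditions if c in lookup]


# Small excerpt of the notebook's mapping
lookup = invert_groups({"Clear": ["Clear", "Fair"], "Rainy": ["Rain", "Light Rain"]})
print(group_conditions(["Fair", "Light Rain", "N/A Precipitation", None], lookup))
# ['Clear', 'Rainy']
```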



## Urban vs. Rural Classification

We enrich our dataset by classifying each accident as urban or rural. We build an uppercased `CITY, STATE` key from the `City` and `State` columns and left-join it against a list of US urban cities (the uppercased `NAME` column of the cities file). Accidents whose key matches an entry are labeled "Urban"; all others are labeled "Rural".

In [11]:
from pyspark.sql.functions import concat_ws, trim, upper, col, lit

# Load the US urban cities dataset
cities_path = "/kaggle/input/us-urban-cities/US Urban Cities.csv"
df_cities = spark.read.csv(cities_path, header=True, inferSchema=True)

# Build a trimmed, upper-cased "CITY, STATE" key on the accident side
df = df.withColumn(
    "city_state_key",
    upper(concat_ws(", ", trim(col("City")), trim(col("State"))))
)

# Normalise the city list the same way and keep each key once, so the
# left join cannot duplicate accident rows when a city is listed twice
urban_keys = (
    df_cities.select(upper(trim(col("NAME"))).alias("city_state_key"))
    .distinct()
    .withColumn("Urban_Rural", lit("Urban"))
)

# Left join: matched keys get "Urban", unmatched ones get NULL
df = df.join(urban_keys, on="city_state_key", how="left")

# Label every unmatched accident as "Rural"
df = df.fillna("Rural", subset=["Urban_Rural"])

# Drop the helper key column
df = df.drop("city_state_key")

# Check the result: show counts for Urban and Rural
df.groupBy("Urban_Rural").count().show()



+-----------+-------+
|Urban_Rural|  count|
+-----------+-------+
|      Urban|1884199|
|      Rural|4255990|
+-----------+-------+




## Final Dataset: Before vs. After Preprocessing

- **Start:** 7,728,394 rows, 46 columns (including missing values, redundant columns, and raw features)
- **End:** 6,140,189 rows, 42 columns (cleaned, binned, grouped, and enriched)

**Key changes:**  
- Removed columns with a high share of missing values or low analytical value  
- Filtered outliers and binned continuous variables  
- Grouped weather conditions  
- Added Urban/Rural classification
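The before/after figures can be cross-checked with a few lines of arithmetic. The numbers below are copied from the outputs in this notebook.

The final check only confirms that every retained row carries exactly one of the two labels. Both counts are taken after the join, so it cannot detect row duplication introduced by the join itself.

```python
# Figures copied from the notebook outputs
rows_before, cols_before = 7_728_394, 46
rows_after, cols_after = 6_140_189, 42
urban, rural = 1_884_199, 4_255_990

rows_removed = rows_before - rows_after
print(f"Rows removed:  {rows_removed:,} ({rows_removed / rows_before:.2%})")
print(f"Rows retained: {rows_after / rows_before:.2%}")
print(f"Net column change: {cols_after - cols_before}")

# Every retained row should have received exactly one Urban/Rural label
assert urban + rural == rows_after
print(f"Urban share: {urban / rows_after:.2%}, Rural share: {rural / rows_after:.2%}")
```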

In [12]:
print(f"Rows: {df.count()}, Columns: {len(df.columns)}")
df.printSchema()



Rows: 6140189, Columns: 42
root
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- Distance(mi)_cont: double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Humidity(%)_cont: double (nullable = true)
 |-- Pressure(in)_cont: double (nullable = true)
 |-- Visibility(mi)_cont: double (nullable = true)
 |-- Wind_Direction: string (nullable = true)
 |-- Wind_Speed(mph)_cont: double (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Amenity: boolean (nullable = true)
 |-- B


## Exporting the Cleaned Dataset

In [13]:
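import os
import shutil
import pandas as pd  # used to read the export back in the next cell

# Spark writes a directory of part files rather than a single file;
# coalesce(1) collapses the data into one partition, hence one part file
output_dir = "/kaggle/working/US_Accidents_Cleaned"
df.coalesce(1).write.mode("overwrite").option("header", True).csv(output_dir)

print(f"Cleaned dataset exported as a directory to: {output_dir}")

# Stop the Spark session; the remaining steps are plain local file operations
spark.stop()

# Move the single part file to a stable CSV path, then remove the Spark
# output directory (including its _SUCCESS marker and .crc checksum files)
part_file = [f for f in os.listdir(output_dir) if f.startswith("part-") and f.endswith(".csv")][0]
shutil.move(os.path.join(output_dir, part_file), "/kaggle/working/US_Accidents_Cleaned.csv")
shutil.rmtree(output_dir)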


Cleaned dataset exported as a directory to: /kaggle/working/US_Accidents_Cleaned
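The export cell takes the first entry of a list comprehension. That raises `IndexError` if no part file exists, and it would silently keep just one shard if the write ever produced several.

A slightly more defensive variant is sketched below as a hypothetical helper. `collapse_single_part_csv` is our own name, not a Spark API. It checks that exactly one `part-*.csv` file is present before moving it. The usage example builds a fake output directory in a temporary folder, so it runs without Spark.

```python
import os
import shutil
import tempfile
from pathlib import Path


def collapse_single_part_csv(output_dir: str, target_file: str) -> str:
    """Move the lone part-*.csv out of a Spark output directory, then delete it.

    Hypothetical helper: assumes the DataFrame was written with coalesce(1),
    so any count other than one part file is treated as an error.
    """
    parts = sorted(Path(output_dir).glob("part-*.csv"))
    if len(parts) != 1:
        raise RuntimeError(
            f"expected exactly one part file in {output_dir}, found {len(parts)}"
        )
    shutil.move(str(parts[0]), target_file)
    shutil.rmtree(output_dir)  # also removes _SUCCESS and hidden .crc files
    return target_file


# Usage with a fake Spark output directory in a temporary location
with tempfile.TemporaryDirectory() as tmp:
    out_dir = os.path.join(tmp, "US_Accidents_Cleaned")
    os.makedirs(out_dir)
    Path(out_dir, "part-00000-demo.csv").write_text("Severity,City\n2,Austin\n")
    Path(out_dir, "_SUCCESS").touch()
    Path(out_dir, ".part-00000-demo.csv.crc").touch()
    final_path = collapse_single_part_csv(
        out_dir, os.path.join(tmp, "US_Accidents_Cleaned.csv")
    )
    moved_text = Path(final_path).read_text()
    dir_removed = not os.path.exists(out_dir)

print(moved_text)
print("output directory removed:", dir_removed)
```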


## Reading Exported Dataset in Pandas

In [14]:
df_pd = pd.read_csv("/kaggle/working/US_Accidents_Cleaned.csv")
print(df_pd.info())
print(df_pd.head())

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 6140189 entries, 0 to 6140188
Data columns (total 42 columns):
 #   Column                Dtype  
---  ------                -----  
 0   Severity              int64  
 1   Start_Time            object 
 2   End_Time              object 
 3   Start_Lat             float64
 4   Start_Lng             float64
 5   Distance(mi)_cont     float64
 6   Description           object 
 7   Street                object 
 8   City                  object 
 9   County                object 
 10  State                 object 
 11  Zipcode               object 
 12  Timezone              object 
 13  Airport_Code          object 
 14  Weather_Timestamp     object 
 15  Humidity(%)_cont      float64
 16  Pressure(in)_cont     float64
 17  Visibility(mi)_cont   float64
 18  Wind_Direction        object 
 19  Wind_Speed(mph)_cont  float64
 20  Weather_Condition     object 
 21  Amenity               bool   
 22  Bump                  bool   
 23  Crossin