# Data Cleaning

In [4]:
from pyspark.sql import SparkSession

# Local Spark session that uses every available core. The driver memory is raised to
# 4 GB because, in local mode, the driver JVM also executes all tasks.
# NOTE: spark.driver.memory only takes effect if no Spark JVM is running yet in this kernel.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("Accident")
    .config("spark.driver.memory", "4g")
    .getOrCreate()
)

# Keep notebook output readable: only surface ERROR-level Spark log messages.
spark.sparkContext.setLogLevel("ERROR")

In [5]:
# Load the raw accidents export. The first line holds the column names; inferSchema
# makes Spark scan the file to assign numeric/timestamp types instead of all-strings.
data = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv("../data/US_Accidents_March23.csv")
)
data.show(n=5, truncate=False)

                                                                                

+---+-------+--------+-------------------+-------------------+-----------------+------------------+-------+-------+------------+-------------------------------------------------------------------------------------+-------------------------+------------+----------+-----+----------+-------+----------+------------+-------------------+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+-----+--------+--------+--------+-------+-------+----------+-------+-----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+
|ID |Source |Severity|Start_Time         |End_Time           |Start_Lat        |Start_Lng         |End_Lat|End_Lng|Distance(mi)|Description                                                                          |Street                   |City        |County    |State|Zipcode   |Country|Timezone  |Airport_Code|Weather_Timestamp  |T

In [6]:
# Inspect the column types chosen by inferSchema (integers, doubles, timestamps, strings, ...).
data.printSchema()

root
 |-- ID: string (nullable = true)
 |-- Source: string (nullable = true)
 |-- Severity: integer (nullable = true)
 |-- Start_Time: timestamp (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- Start_Lat: double (nullable = true)
 |-- Start_Lng: double (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- Airport_Code: string (nullable = true)
 |-- Weather_Timestamp: timestamp (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Wind_Chill(F): double (nullable = true)
 |-- Humidity(%): double (nullable = true)
 |-- Pressure(in): double (nullable = true)
 |-- V

## Drop Useless Columns

In [7]:
# Columns removed from the dataset before further cleaning, kept in their original order.
columns_to_drop = [
    # record identifiers / provenance
    "ID", "Source",
    # free text and address-level detail
    "Description", "Street", "Zipcode", "Airport_Code",
    # end-of-segment coordinates
    "End_Lat", "End_Lng",
    # country, timezone and weather-observation time
    "Country", "Timezone", "Weather_Timestamp",
]


In [8]:
# Remove every column listed in `columns_to_drop` from the DataFrame.
data = data.drop(*columns_to_drop)

## Encode String and Boolean Columns

In [9]:
# Categorical string columns that the next cell indexes and one-hot encodes.
columns_to_encode = (
    # day/night indicators at different twilight definitions
    ["Sunrise_Sunset", "Civil_Twilight", "Nautical_Twilight", "Astronomical_Twilight"]
    # weather descriptors
    + ["Weather_Condition", "Wind_Direction"]
    # location
    + ["State", "City"]
)

In [10]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

# Stage 1: map each categorical string column to a numeric label index "<col>_indexed".
# handleInvalid='keep' puts labels not seen during fit into an extra index instead of failing.
indexers = [
    StringIndexer(
        inputCol=name,
        outputCol=f"{name}_indexed",
        handleInvalid='keep',
    )
    for name in columns_to_encode
]

# Stage 2: one-hot encode every index column into a vector column "<col>_encoded".
encoders = [
    OneHotEncoder(inputCol=f"{name}_indexed", outputCol=f"{name}_encoded")
    for name in columns_to_encode
]

# Indexers precede encoders so each encoder's input column exists when it is fitted.
pipeline = Pipeline(stages=[*indexers, *encoders])
model = pipeline.fit(data)
data = model.transform(data)


                                                                                

In [11]:
from pyspark.sql.functions import col

# Cast every boolean flag column to an integer 0/1 column (presumably so downstream
# numeric processing treats them like any other number).
# The dtype comes straight from `data.dtypes` rather than building a one-column projection
# per column. The loop variable is `name`, so it never reads as shadowing the imported `col`.
boolean_columns = [name for name, dtype in data.dtypes if dtype == 'boolean']

# Do all casts in ONE projection. The PySpark docs warn that calling withColumn in a loop
# stacks one projection per call and can produce very large plans.
# Column order and names are preserved; non-boolean columns pass through untouched.
data = data.select([
    col(name).cast("integer").alias(name) if name in boolean_columns else col(name)
    for name in data.columns
])

## Missing Values (NULL or NaN)

On calcule, pour chaque colonne, la proportion (entre 0 et 1) de valeurs NULL ou NaN, pour s'assurer qu'elles ne sont pas majoritaires.

In [12]:
from pyspark.sql.functions import col, isnan, when, count

# Per-column fraction (0..1) of missing entries.
# Floating-point columns can hold NaN as well as NULL, so both count as missing there.
# Every other type only checks NULL.
total_rows = data.count()
col_type_dict = dict(data.dtypes)

# when(cond, name) returns the (non-null) literal string `name` on rows where cond holds and
# NULL elsewhere, so count() tallies exactly the missing rows of that column.
missing_df = data.select(*[
    (
        count(when(
            (col(name).isNull() | isnan(name))
            if col_type_dict[name] in ("double", "float")
            else col(name).isNull(),
            name,
        )) / total_rows
    ).alias(name)
    for name in data.columns
])

missing_df.show()




+--------+----------+--------+---------+---------+------------+--------------------+------+-----+--------------------+-------------------+-------------------+-------------------+--------------------+--------------------+-------------------+------------------+-------------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------------+--------------------+--------------------+---------------------+----------------------+----------------------+-------------------------+-----------------------------+-------------------------+----------------------+-------------+------------+----------------------+----------------------+-------------------------+-----------------------------+-------------------------+----------------------+-------------+------------+
|Severity|Start_Time|End_Time|Start_Lat|Start_Lng|Distance(mi)|                City|County|State|      Temperature(F)|      Wind_Chill(F)|        Humidity(

                                                                                

In [13]:
# Impute missing values in numeric (double/integer) columns with each column's
# approximate median (relative error 0.01).
numeric_cols = [f.name for f in data.schema.fields if f.dataType.simpleString() in ["double", "integer"]]

# A single multi-column approxQuantile call computes every median in one scan of the data.
# Calling it per column re-runs the whole lineage (CSV read + pipeline) once per column.
# Per the PySpark docs, nulls are ignored per column.
# NOTE(review): older Spark 2.2 dropped rows with a null in ANY listed column in the
# multi-column form — confirm the Spark version is >= 3.x.
medians = data.approxQuantile(numeric_cols, [0.5], 0.01) if numeric_cols else []

# An all-null column yields an empty quantile list. The old `[0]` indexing raised
# IndexError on it, so such columns are now skipped (left unfilled) instead.
fill_values = {name: quantiles[0] for name, quantiles in zip(numeric_cols, medians) if quantiles}

# One fillna with a dict instead of one stacked fillna projection per column.
data = data.fillna(fill_values)

                                                                                

In [14]:
# Replace missing values in every string-typed column with the placeholder "Unknown".
# NOTE(review): this runs after the StringIndexer/OneHotEncoder cell, so it only changes the
# raw string columns, not the already-computed *_indexed / *_encoded features.
categorical_cols = [
    field.name
    for field in data.schema.fields
    if field.dataType.simpleString() == "string"
]

data = data.fillna(dict.fromkeys(categorical_cols, "Unknown"))

In [15]:
# Preview the first 5 cleaned rows without truncating long cell values.
data.show(n=5, truncate=False)

+--------+-------------------+-------------------+-----------------+------------------+------------+------------+----------+-----+--------------+-------------+-----------+------------+--------------+--------------+---------------+-----------------+-----------------+-------+----+--------+--------+--------+-------+-------+----------+-------+----+---------------+--------------+------------+--------------+--------------+-----------------+---------------------+----------------------+----------------------+-------------------------+-----------------------------+-------------------------+----------------------+-------------+------------+----------------------+----------------------+-------------------------+-----------------------------+-------------------------+----------------------+---------------+--------------------+
|Severity|Start_Time         |End_Time           |Start_Lat        |Start_Lng         |Distance(mi)|City        |County    |State|Temperature(F)|Wind_Chill(F)|Humidity(%)|Pr

In [17]:
# Persist the cleaned DataFrame (including the *_indexed / *_encoded columns) as Parquet,
# replacing any previous output at this path.
data.write.parquet("../data/processed_accidents.parquet", mode="overwrite")


                                                                                

In [18]:
# Stop the Spark session (and its underlying SparkContext) now that the cleaned data is written.
spark.stop()