# Column names grouped under `cat_names` (presumably the categorical features —
# confirm against the code that consumes this list).
# NOTE(review): 'Side', 'Country' and 'Turning_Loop' are listed here but are not
# selected by the cleaning query in this file — verify they are still wanted.
cat_names = [
    # location
    'Side', 'Country', 'Timezone',
    # points of interest near the accident
    'Amenity', 'Bump', 'Crossing', 'Give_Way', 'Junction', 'No_Exit',
    'Railway', 'Roundabout', 'Station', 'Stop', 'Traffic_Calming',
    'Traffic_Signal', 'Turning_Loop',
    # period of day
    'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight',
    'Astronomical_Twilight',
]



In [0]:
from pyspark.sql.functions import col, concat_ws, split, explode, array_distinct, when, lower, lit
import re

In [0]:
# Path of the accidents CSV file that is passed to the Spark CSV reader's load().
data_file_path = "/Volumes/mlops_dev/corretco/data/US_Accidents_March23_short.csv"

In [0]:
# Load the accidents CSV with its header row. No schema is supplied or inferred,
# so the columns are read as strings and typed later by explicit CASTs.
raw_df = (
    spark.read.format("csv")
    .option("header", "true")
    # The CSV reader's delimiter option is named "sep" ("separator" is not a
    # recognised option name and would never take effect).
    .option("sep", ",")
    .load(data_file_path)
)

# Register the raw data so it can be queried from Spark SQL as `raw_accidents`.
raw_df.createOrReplaceTempView("raw_accidents")

### Useless Features

The 'ID' feature doesn't provide any useful information about the accidents themselves. 'TMC', 'Distance(mi)', 'End_Time' (we have the start time), 'Duration', 'End_Lat', and 'End_Lng' (we have the start location) can be collected only after the accident has already happened and hence cannot be predictors for serious accident prediction. For 'Description', the POI features have already been extracted from it by the dataset creators. Let's get rid of these features first. 'Country' and 'Turning_Loop' are dropped too, for they have only one class. More than 60% of the values of 'Number' and 'Wind_Chill(F)' are missing; we drop these columns too.


In [0]:
# We arbitrarily keep one source only: different websites / databases use
# different collection and classification methodologies. We focus on the source
# offering the most data: Source1.
#
# The query below builds the cleaned dataset from `raw_accidents`:
#   * every retained column is cast to an explicit type;
#   * Year / Month / Weekday / Day / Hour / Minute are derived from Start_Time;
#   * Wind_Direction spellings are normalised: 'Calm' -> 'CALM',
#     'Variable' -> 'VAR', and each spelled-out cardinal name together with BOTH
#     of its neighbouring three-letter points is folded into the cardinal letter
#     (e.g. 'West', 'WSW' and 'WNW' -> 'W'); any other value passes through;
#   * a missing Precipitation(in) is replaced by the median precipitation.
#
# NOTE(review): the median CTE has no Source filter, so the fill value is the
# median over all rows of raw_accidents, not only Source1 — confirm intended.
clean_df = spark.sql("""
    WITH median_precipitation AS (
      SELECT median(CAST(`Precipitation(in)` as decimal(12,4))) AS median_precip
      FROM raw_accidents
      WHERE `Precipitation(in)` IS NOT NULL
    )
    SELECT
        CAST(`Severity` as string) as `Severity`,
        CAST(`Start_Time` as timestamp) as `Start_Time`,
        CAST(extract(YEAR FROM CAST(`Start_Time` as timestamp)) as int) as `Year`,
        CAST(extract(MONTH FROM CAST(`Start_Time` as timestamp)) as int) as `Month`,
        CAST(WEEKDAY(CAST(`Start_Time` as timestamp)) as int) as `Weekday`,
        CAST(extract(DAY FROM CAST(`Start_Time` as timestamp)) as int) as `Day`,
        CAST(extract(HOUR FROM CAST(`Start_Time` as timestamp)) as int) as `Hour`,
        CAST(extract(MINUTE FROM CAST(`Start_Time` as timestamp)) as int) as `Minute`,
        CAST(`Start_Lat` as decimal(38, 10)) as `Start_Lat`,
        CAST(`Start_Lng` as decimal(38, 10)) as `Start_Lng`,
        CAST(`Street` as string) as `Street`,
        CAST(`City` as string) as `City`,
        CAST(`County` as string) as `County`,
        CAST(`State` as string) as `State`,
        CAST(`Zipcode` as string) as `Zipcode`,
        CAST(`Timezone` as string) as `Timezone`,
        CAST(`Airport_Code` as string) as `Airport_Code`,
        CAST(`Weather_Timestamp` as timestamp) as `Weather_Timestamp`,
        CAST(`Temperature(F)` as decimal(38, 10)) as `Temperature(F)`,
        CAST(`Humidity(%)` as decimal(38, 10)) as `Humidity(%)`,
        CAST(`Pressure(in)` as decimal(38, 10)) as `Pressure(in)`,
        CAST(`Visibility(mi)` as decimal(38, 10)) as `Visibility(mi)`,
        CASE
          WHEN Wind_Direction = 'Calm' THEN 'CALM'
          WHEN Wind_Direction IN ('West', 'WSW', 'WNW') THEN 'W'
          WHEN Wind_Direction IN ('South', 'SSW', 'SSE') THEN 'S'
          WHEN Wind_Direction IN ('North', 'NNW', 'NNE') THEN 'N'
          WHEN Wind_Direction IN ('East', 'ESE', 'ENE') THEN 'E'
          WHEN Wind_Direction = 'Variable' THEN 'VAR'
          ELSE Wind_Direction
        END as `Wind_Direction`,
        CAST(`Wind_Speed(mph)` as decimal(38, 10)) as `Wind_Speed(mph)`,
        CASE
          WHEN `Precipitation(in)` IS NULL THEN (SELECT median_precip FROM median_precipitation)
          ELSE CAST(`Precipitation(in)` as decimal(38, 10))
        END as `Precipitation(in)`,
        CAST(`Weather_Condition` as string) as `Weather_Condition`,
        CAST(`Amenity` as boolean) as `Amenity`,
        CAST(`Bump` as boolean) as `Bump`,
        CAST(`Crossing` as boolean) as `Crossing`,
        CAST(`Give_Way` as boolean) as `Give_Way`,
        CAST(`Junction` as boolean) as `Junction`,
        CAST(`No_Exit` as boolean) as `No_Exit`,
        CAST(`Railway` as boolean) as `Railway`,
        CAST(`Roundabout` as boolean) as `Roundabout`,
        CAST(`Station` as boolean) as `Station`,
        CAST(`Stop` as boolean) as `Stop`,
        CAST(`Traffic_Calming` as boolean) as `Traffic_Calming`,
        CAST(`Traffic_Signal` as boolean) as `Traffic_Signal`,
        CAST(`Sunrise_Sunset` as string) as `Sunrise_Sunset`,
        CAST(`Civil_Twilight` as string) as `Civil_Twilight`,
        CAST(`Nautical_Twilight` as string) as `Nautical_Twilight`,
        CAST(`Astronomical_Twilight` as string) as `Astronomical_Twilight`
    FROM raw_accidents
    WHERE Source = 'Source1'
""")

In [0]:
# Special treatment of weather conditions.
#
# Weather_Condition is reduced to a few boolean flag columns. A flag is True
# when the lower-cased condition text contains at least one of its keywords,
# False when it contains none of them, and NULL when Weather_Condition is NULL.
weather_keywords = {
    'Clear': ['clear'],
    'Cloud': ['cloud', 'overcast'],
    'Rain': ['rain', 'storm'],
    'Heavy_Rain': ['heavy rain', 'rain shower', 'heavy t-storm', 'heavy thunderstorms'],
    'Snow': ['snow', 'sleet', 'ice'],
    'Heavy_Snow': ['heavy snow', 'heavy sleet', 'heavy ice pellets', 'snow showers', 'squalls'],
    'Fog': ['fog'],
}

# Flag names, in the order the columns are appended to the DataFrame.
weather = list(weather_keywords)

condition_text = lower(col('Weather_Condition'))
for condition in weather:
    # OR together one substring test per keyword of this flag.
    first_keyword, *other_keywords = weather_keywords[condition]
    matches = condition_text.contains(first_keyword)
    for keyword in other_keywords:
        matches = matches | condition_text.contains(keyword)

    # The NULL branch comes first so an unknown condition is not reported as False.
    clean_df = clean_df.withColumn(
        condition,
        when(col('Weather_Condition').isNull(), lit(None))
        .when(matches, True)
        .otherwise(False),
    )

In [0]:
# Discard every row that is missing a value in at least one of these columns.
required_columns = [
    'City',
    'Zipcode',
    'Airport_Code',
    'Sunrise_Sunset',
    'Civil_Twilight',
    'Nautical_Twilight',
    'Astronomical_Twilight',
]
clean_df = clean_df.dropna(how='any', subset=required_columns)

In [0]:
# Show the cleaned DataFrame in the cell output.
# NOTE(review): `display` is not defined or imported in this file — presumably
# the notebook environment's built-in helper; confirm.
display(clean_df)