In [1]:
import findspark
findspark.init()
import pyspark

from pyspark.sql.functions import *
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType


# Create the Spark session used by the rest of the notebook, or reuse the one
# that already exists, registered under the application name "US_accidents".
spark = (
    SparkSession.builder
    .appName("US_accidents")
    .getOrCreate()
)
print(spark)

<pyspark.sql.session.SparkSession object at 0x000001A634E423A0>


In [2]:
# Read the raw accidents CSV: first row is the header, column types are
# inferred, and the literal text "null" is treated as a missing value.
df = (
    spark.read
    .options(header="True", InferSchema="True", nullValue="null")
    .csv("dataset/US_Accidents_Dec21_updated.csv")
)

# Split the accident start date into its calendar parts and derive Duration.
# NOTE(review): Duration is in minutes only if Start_Time/End_Time were
# inferred as timestamps (the long cast is then in seconds) - confirm.
start_time = col("Start_Time")
end_time = col("End_Time")
df = (
    df
    .withColumn("dayofweek", dayofweek(start_time))
    .withColumn("year", year(start_time))
    .withColumn("month", month(start_time))
    .withColumn("dayofmonth", dayofmonth(start_time))
    .withColumn("hour", hour(start_time))
    .withColumn("Duration", (end_time.cast("long") - start_time.cast("long")) / 60)
)

# Cast the boolean road-feature columns to int, one column at a time.
# Replacing a column under its own name keeps its position in the schema.
for flag_column in (
    "Amenity",
    "Crossing",
    "Give_Way",
    "Junction",
    "No_Exit",
    "Railway",
    "Roundabout",
    "Station",
    "Stop",
    "Traffic_Calming",
    "Traffic_Signal",
):
    df = df.withColumn(flag_column, col(flag_column).cast(IntegerType()))

In [3]:
# Columns to drop, and why:
# - Start_Time, End_Time, Weather_Timestamp: no longer needed.
# - Number: about 1,700,000 missing values and extremely low correlation with
#   the target variable.
# - Country: always the same value.
# - ID, Description: caused an error when fitting the model in the pipeline;
#   both are unnecessary.
# - End_Lng, End_Lat, Bump, Wind_Chill(F): highly correlated with other
#   variables according to the correlation analysis.
# - Turning_Loop: no correlation with the other variables.
# - Sunrise_Sunset and the three *_Twilight columns: not needed, the hour
#   column is available instead.
# - Airport_Code, Timezone: not important and have a few missing values.
columns_to_drop = [
    'Start_Time', 'End_Time', 'Weather_Timestamp',
    'Number',
    'Country',
    'Description', 'ID',
    'End_Lng', 'End_Lat', 'Bump',
    'Turning_Loop',
    'Wind_Chill(F)',
    'Sunrise_Sunset', 'Civil_Twilight', 'Nautical_Twilight', 'Astronomical_Twilight',
    'Airport_Code', 'Timezone',
]
df = df.drop(*columns_to_drop)

# Precipitation is filled with 0 instead of being dropped: most of its missing
# values occur when the weather condition was not rainy, so there was simply
# no precipitation measurement to record.
df = df.na.fill(value=0, subset=["Precipitation(in)"])

# Missing values remaining per column at this point:
# Street: 2; City: 137; Zipcode: 1319; Temperature: 69274; Humidity: 73092;
# Pressure: 59200; Visibility: 70546; Wind_Direction: 73775;
# Wind_Speed: 157944; Weather_Condition: 70636.
# Drop every row that still has a missing value (187383 rows lost in total).
df = df.na.drop()

In [4]:
# Class balance of the target: number of rows for each Severity level.
# DataFrame.show() only prints and returns None, so the aggregated DataFrame
# is stored in newdf first and displayed in a separate step; chaining .show()
# onto the assignment left newdf set to None.
newdf = df.groupby("Severity").count()
newdf.show()

+--------+-------+
|Severity|  count|
+--------+-------+
|       3| 135914|
|       4| 112405|
|       2|2384141|
|       1|  25499|
+--------+-------+



In [5]:
import pyspark.sql.functions as F
# Shuffle the rows by sorting on a random column, so the partitions are not
# ordered by date and each one includes a mix of dates (years).
# The seed makes the random column repeatable between runs instead of
# different on every execution.
# NOTE(review): Spark documents rand() as non-deterministic in the general
# case, so repeatability presumably also needs the same input partitioning.
df = df.orderBy(F.rand(seed=42))

In [7]:
# Export the prepared data as CSV with a header row. The frame is first
# repartitioned into a single partition, and the writer then partitions the
# output on disk by the Severity column.
# NOTE(review): no save mode is set, so re-running this cell presumably fails
# if "dataset/severity" already exists - confirm.
single_partition_df = df.repartition(1)
severity_writer = single_partition_df.write.option("header", True).partitionBy("Severity")
severity_writer.csv("dataset/severity")