In [0]:
# List the uploaded CSV in DBFS to confirm it is present before loading it

%fs ls FileStore/tables/accidents12.csv

path,name,size
dbfs:/FileStore/tables/accidents12.csv,accidents12.csv,859047752


In [0]:
# Load the accidents CSV into a Spark DataFrame; the first row is treated as
# the header and column types are inferred from the data.

path = "/FileStore/tables/accidents12.csv"

df = (
    sqlContext.read
    .format('com.databricks.spark.csv')
    .option('header', 'true')
    .option('inferSchema', 'true')
    .load(path)
)

In [0]:
# Keep only the columns that the rest of the notebook uses

reqcol = [
    'Start_Time',
    'Severity',
    'Side',
    'Temperature(F)',
    'Humidity(%)',
    'Pressure(in)',
    'Visibility(mi)',
    'Weather_Condition',
    'Junction',
    'Traffic_Signal',
    'Sunrise_Sunset',
]

filtered_df = df.select(*reqcol)

# Show the column names and types of the reduced frame
filtered_df.printSchema()

In [0]:
# mapping response variable to a binary outcome

from pyspark.sql.types import *

# binning severity

def response_variable(inp):
  """Bin a severity score into a binary label.

  Args:
    inp: Numeric severity score, or None when the value is missing.

  Returns:
    1 when ``inp`` is 3 or higher, 0 when it is lower, and None when
    ``inp`` is None.
  """
  # A missing severity must stay missing: comparing None with an int raises
  # TypeError on Python 3 (and would quietly yield label 0 on Python 2).
  # Rows with a null label are removed by the later na.drop() step.
  if inp is None:
    return None
  return 1 if inp >= 3 else 0
  
# udf() is first used in this cell, but the notebook's wildcard import of
# pyspark.sql.functions only happens in a later cell. Import it explicitly so
# this cell does not depend on the runtime pre-defining the name.
from pyspark.sql.functions import udf

# Replace the raw severity score with its binary label
mapper = udf(response_variable, IntegerType())
filtered_df = filtered_df.withColumn("Severity", mapper("Severity"))


In [0]:
# weather condition binning

def weather_condition(inp):
  """Collapse a detailed weather description into a coarse category.

  Returns "rain", "cloudy", "fog", "clear" or "snow" for the descriptions
  listed in the table below. Any other non-None value is returned unchanged,
  and None yields None.
  """
  # Ordered (category, descriptions) pairs; the first group containing the
  # input decides the result.
  bins = (
    ("rain", (
      "Thunderstorm", "Light Freezing Rain", "Rain Showers", "Drizzle",
      "Heavy Drizzle", "Heavy Rain", "Light Rain Showers", "Light Rain",
      "Rain", "Light Drizzle", "Heavy Freezing Rain", "Heavy Rain Showers",
      "Light Thunderstorm", "Thunderstorms and Rain",
      "Heavy Thunderstorms and Rain", "Light Freezing Drizzle",
      "Light Thunderstorms and Rain", "Heavy Freezing Drizzle",
    )),
    ("cloudy", (
      "Scattered Clouds", "Partly Cloudy", "Mostly Cloudy", "Funnel Cloud",
      "Light Haze", "Haze", "Overcast",
    )),
    ("fog", (
      "Shallow Fog", "Volcanic Ash", "Widespread Dust", "Squalls",
      "Patches of Fog", "Fog", "Light Freezing Fog", "Sand", "Light Fog",
      "Blowing Sand", "Mist", "Heavy Smoke", "Smoke", "Dust Whirls",
    )),
    ("clear", (
      "Clear",
    )),
    ("snow", (
      "Low Drifting Snow", "Blowing Snow", "Snow Grains", "Snow",
      "Light Snow", "Heavy Snow", "Heavy Ice Pellets", "Snow Showers",
      "Heavy Blowing Snow", "Small Hail", "Hail", "Light Hail",
      "Light Blowing Snow", "Light Snow Grains", "Light Snow Showers",
      "Light Thunderstorms and Snow", "Ice Pellets", "Light Ice Pellets",
      "Heavy Thunderstorms and Snow", "Heavy Thunderstorms with Small Hail",
      "Thunderstorms and Snow",
    )),
  )

  for category, descriptions in bins:
    if inp in descriptions:
      return category

  # Unlisted descriptions pass through untouched; a None input stays None.
  return inp
  
# Wrap the binning function as a Spark UDF and overwrite Weather_Condition
# with its binned value

mapper = udf(f=weather_condition, returnType=StringType())
filtered_df = filtered_df.withColumn(
    "Weather_Condition",
    mapper(filtered_df["Weather_Condition"]),
)

In [0]:
from pyspark.sql.functions import *

# Discard every row that has a null in any of the selected columns
filtered_df = filtered_df.dropna()

# Cast Start_Time to a timestamp, then project month, day of month, hour and
# week of year out of it

filtered_df = filtered_df.withColumn(
    "Start_Time",
    filtered_df["Start_Time"].cast("timestamp"),
)
time = filtered_df.select(
    month(filtered_df["Start_Time"]).alias("dt_month"),
    dayofmonth(filtered_df["Start_Time"]).alias("dt_day"),
    hour(filtered_df["Start_Time"]).alias("dt_hr"),
    weekofyear(filtered_df["Start_Time"]).alias("dt_week"),
)

# creating function to combine two dataframes by joining on index

def zipindexdf(df):
    """Return a new DataFrame equal to ``df`` plus a trailing ``index`` column.

    The index is the position assigned by ``RDD.zipWithIndex`` and is stored
    as a non-nullable long.

    Args:
        df: Spark DataFrame to index. It is left unmodified.

    Returns:
        A DataFrame with the columns of ``df`` followed by ``index``.
    """
    # StructType.add() appends to the StructType it is called on and returns
    # that same object, so calling it on df.schema would also alter the schema
    # object held by the input DataFrame. Build a fresh StructType instead.
    indexed_schema = StructType(
        df.schema.fields + [StructField("index", LongType(), False)]
    )
    # zipWithIndex yields (row, position) pairs; flatten each into one record.
    indexed_rows = df.rdd.zipWithIndex().map(lambda pair: list(pair[0]) + [pair[1]])
    return indexed_rows.toDF(indexed_schema)
  
# Add the calendar features directly to filtered_df and drop the raw
# timestamp. Going through zipindexdf() and joining the separate `time` frame
# on a row index yields the same columns, but costs two RDD round trips plus a
# shuffle join, and is only correct when both frames enumerate their rows in
# exactly the same order. Computing the columns on the same rows has neither
# problem.
filtered_df = (
    filtered_df
    .withColumn("dt_month", month(filtered_df["Start_Time"]))
    .withColumn("dt_day", dayofmonth(filtered_df["Start_Time"]))
    .withColumn("dt_hr", hour(filtered_df["Start_Time"]))
    .withColumn("dt_week", weekofyear(filtered_df["Start_Time"]))
    .drop("Start_Time")
)

In [0]:
# 90/10 train/test split with a fixed seed; both parts are marked for caching
training_data, test_data = [
    part.cache()
    for part in filtered_df.randomSplit(weights=[0.9, 0.1], seed=11)
]

In [0]:
# one hot encoding of variables

from pyspark.ml.feature import StringIndexer, OneHotEncoder
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline

# String columns are first indexed to "<name>_num"; those indices and the
# integer time columns are then one-hot encoded to "<name>_one_hot".
string_to_num = ["Side", "Weather_Condition", "Sunrise_Sunset"]
num_to_one_hot = [name + "_num" for name in string_to_num] + ["dt_month", "dt_day", "dt_hr", "dt_week"]

# NOTE(review): StringIndexer's default handleInvalid setting raises on labels
# that were not seen during fit -- confirm that rare Weather_Condition values
# cannot occur only in test_data.
stage_string = [
    StringIndexer(inputCol=name, outputCol=name + "_num")
    for name in string_to_num
]
stage_one_hot = [
    OneHotEncoder(inputCol=name, outputCol=name + "_one_hot")
    for name in num_to_one_hot
]

# Fit every stage on the training split only
ppl = Pipeline(stages=stage_string + stage_one_hot)
ppl_fit = ppl.fit(training_data)

# Encode the training split, then drop the raw and intermediate columns so
# only the numeric inputs and the one-hot vectors remain
training_data = ppl_fit.transform(training_data).drop(*(num_to_one_hot + string_to_num))


In [0]:
# combining features into single vector to be fed into ML algorithms

# Numeric / boolean inputs first, then the one-hot encoded vectors
input_col = [
    'Temperature(F)',
    'Humidity(%)',
    'Pressure(in)',
    'Visibility(mi)',
    'Junction',
    'Traffic_Signal',
    'Side_num_one_hot',
    'Weather_Condition_num_one_hot',
    'Sunrise_Sunset_num_one_hot',
    'dt_month_one_hot',
    'dt_day_one_hot',
    'dt_hr_one_hot',
    'dt_week_one_hot',
]

assembler = VectorAssembler(inputCols=input_col, outputCol="features")

# Keep only the assembled feature vector and the label
training_data = assembler.transform(training_data).select("features", "Severity")



In [0]:
# preparing test data

# converting to numeric and one hot
# Reuse the pipeline fitted on the training split, drop the raw and
# intermediate columns, assemble the feature vector and keep features + label
test_data = (
    assembler
    .transform(
        ppl_fit.transform(test_data).drop(
            "Side_num", "Weather_Condition_num", "Sunrise_Sunset_num",
            "dt_month", "dt_day", "dt_hr", "dt_week",
            "Side", "Weather_Condition", "Sunrise_Sunset",
        )
    )
    .select("features", "Severity")
)



In [0]:
# applying Logistic regression

from pyspark.ml.classification import LogisticRegression

# Logistic regression on the assembled features, capped at 10 iterations,
# with feature standardization switched on
lrgen = LogisticRegression(
    featuresCol="features",
    labelCol="Severity",
    maxIter=10,
    standardization=True,
)
linearModelgen = lrgen.fit(training_data)

In [0]:
# predicting on test data

predictions = linearModelgen.transform(test_data)

# Fetch label and prediction in ONE action. Two separate collect() calls
# evaluate the scoring plan twice and only line up if both jobs return rows in
# the same order; reading both columns from the same Row keeps every pair
# aligned by construction.
label_pred_rows = predictions.select("Severity", "prediction").collect()

# Hand sklearn flat lists of values rather than lists of single-field Rows
y_true = [row["Severity"] for row in label_pred_rows]
y_pred = [row["prediction"] for row in label_pred_rows]

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))

In [0]:
# predicting on train data

predictions_train = linearModelgen.transform(training_data)

# Fetch label and prediction in ONE action. Two separate collect() calls
# evaluate the scoring plan twice and only line up if both jobs return rows in
# the same order; reading both columns from the same Row keeps every pair
# aligned by construction.
label_pred_rows = predictions_train.select("Severity", "prediction").collect()

# Hand sklearn flat lists of values rather than lists of single-field Rows
y_true = [row["Severity"] for row in label_pred_rows]
y_pred = [row["prediction"] for row in label_pred_rows]

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))

In [0]:
# Applying Random forest

from pyspark.ml.classification import RandomForestClassifier as RF


# Random forest with 200 trees on the assembled features
rf = RF(
    featuresCol='features',
    labelCol='Severity',
    numTrees=200,
)
fit = rf.fit(training_data)

In [0]:
# Rf on training data

predictions_train = fit.transform(training_data)

# Fetch label and prediction in ONE action. Two separate collect() calls
# evaluate the scoring plan twice and only line up if both jobs return rows in
# the same order; reading both columns from the same Row keeps every pair
# aligned by construction.
label_pred_rows = predictions_train.select("Severity", "prediction").collect()

# Hand sklearn flat lists of values rather than lists of single-field Rows
y_true = [row["Severity"] for row in label_pred_rows]
y_pred = [row["prediction"] for row in label_pred_rows]

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))

In [0]:
# Rf on test data

predictions_test = fit.transform(test_data)

# Fetch label and prediction in ONE action. Two separate collect() calls
# evaluate the scoring plan twice and only line up if both jobs return rows in
# the same order; reading both columns from the same Row keeps every pair
# aligned by construction.
label_pred_rows = predictions_test.select("Severity", "prediction").collect()

# Hand sklearn flat lists of values rather than lists of single-field Rows
y_true = [row["Severity"] for row in label_pred_rows]
y_pred = [row["prediction"] for row in label_pred_rows]

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))

In [0]:
# Applying Decision tree classifier

from pyspark.ml.classification import DecisionTreeClassifier

# Single decision tree on the assembled features, library defaults otherwise
dt = DecisionTreeClassifier(
    featuresCol="features",
    labelCol="Severity",
)
dt_fit = dt.fit(training_data)

In [0]:
# Evaluating on training data 

predictions_train = dt_fit.transform(training_data)

# Fetch label and prediction in ONE action. Two separate collect() calls
# evaluate the scoring plan twice and only line up if both jobs return rows in
# the same order; reading both columns from the same Row keeps every pair
# aligned by construction.
label_pred_rows = predictions_train.select("Severity", "prediction").collect()

# Hand sklearn flat lists of values rather than lists of single-field Rows
y_true = [row["Severity"] for row in label_pred_rows]
y_pred = [row["prediction"] for row in label_pred_rows]

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))

In [0]:
# Evaluating on test data 
predictions_test = dt_fit.transform(test_data)

# Fetch label and prediction in ONE action. Two separate collect() calls
# evaluate the scoring plan twice and only line up if both jobs return rows in
# the same order; reading both columns from the same Row keeps every pair
# aligned by construction.
label_pred_rows = predictions_test.select("Severity", "prediction").collect()

# Hand sklearn flat lists of values rather than lists of single-field Rows
y_true = [row["Severity"] for row in label_pred_rows]
y_pred = [row["prediction"] for row in label_pred_rows]

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))