## Import and explore public dataset on UK road traffic accidents
#### Data available from Kaggle: https://www.kaggle.com/daveianhickey/2000-16-traffic-flow-england-scotland-wales
#### Guide to column definitions available here: https://beta.ukdataservice.ac.uk/datacatalogue/studies/study?id=7752&type=Data%20catalogue#!/documentation

#### Consider importing more data to improve size of undersampled dataset
#### Fix the random state (seed) for repeatability
#### Expand or reduce dimensions
#### Write results to a table and save

### 1. Import data from DBFS

In [4]:
# File location and type

def create_dataframe(location, file_type="csv", infer_schema="true",
                     first_row_is_header="true", delimiter=","):
  """Load a file from DBFS into a Spark DataFrame.

  Relies on the notebook-provided ``spark`` session.

  Args:
    location: Path of the file to load, e.g.
      "/FileStore/tables/accidents_2009_to_2011.csv".
    file_type: Source format handed to ``spark.read.format``.
    infer_schema: Value for the reader's "inferSchema" option.
    first_row_is_header: Value for the reader's "header" option.
    delimiter: Value for the reader's "sep" option.

  Returns:
    The DataFrame produced by the reader's ``load(location)`` call.
  """
  # The options below are CSV options; for other file types they are ignored.
  reader = (
    spark.read.format(file_type)
    .option("inferSchema", infer_schema)
    .option("header", first_row_is_header)
    .option("sep", delimiter)
  )
  return reader.load(location)

# Load the two accident extracts (2009-2011 and 2012-2014).
df_0911 = create_dataframe("/FileStore/tables/accidents_2009_to_2011.csv")
df_1214 = create_dataframe("/FileStore/tables/accidents_2012_to_2014.csv")

# Register each DataFrame as a temporary view so the %sql cells can query it.
# saveAsTable is deliberately not used: the data contains characters that are
# illegal for saveAsTable (presumably in the column names) but are accepted by
# createOrReplaceTempView.
for temp_table_name, accidents_df in (
    ("accidents_0911", df_0911),
    ("accidents_1214", df_1214),
):
  accidents_df.createOrReplaceTempView(temp_table_name)



In [5]:
%sql
-- Stack the rows of the 2009-2011 and 2012-2014 accident views into a single view.
-- UNION is a set operation rather than a join: it appends the rows of the second
-- SELECT to those of the first and then removes duplicate rows.
-- NOTE(review): use UNION ALL instead if every source row should be kept - confirm.
CREATE OR REPLACE TEMPORARY VIEW joined_tables AS
(SELECT *
FROM accidents_0911
UNION
SELECT *
FROM accidents_1214)

In [6]:
%sql
-- Count the rows in each Accident_severity class to see how balanced the classes are
SELECT Accident_severity, count(1)
FROM joined_tables
GROUP BY Accident_severity
ORDER BY Accident_severity

Accident_severity,count(1)
1,10488
2,124623
3,764876


### 3. Main task: train classifiers to predict accident severity from hand-picked inputs

In [8]:
%sql
-- Preview a limited subset of the available columns (preferring columns that are not geography-specific)
-- Rows with unknown or null values are filtered out (open question: is this necessary?)
-- Candidate column to add: day_of_week
-- NOTE(review): in Spark SQL, SORT BY orders rows within each partition only;
-- ORDER BY is needed for a guaranteed total order - confirm which is intended.
SELECT Accident_severity, Road_Type, Speed_limit,
Light_Conditions, Weather_Conditions, Road_Surface_Conditions, Urban_or_Rural_Area
FROM joined_tables
WHERE (Road_Type <> 'Unknown' AND Weather_Conditions <> 'Unknown' AND Road_Surface_Conditions IS NOT NULL
AND Urban_or_Rural_Area <> 3)
SORT BY Accident_severity

Accident_severity,Road_Type,Speed_limit,Light_Conditions,Weather_Conditions,Road_Surface_Conditions,Urban_or_Rural_Area
1,Single carriageway,30,Daylight: Street light present,Fine without high winds,Dry,1
1,Single carriageway,30,Daylight: Street light present,Fine without high winds,Dry,1
1,Single carriageway,60,Darkness: Street lights present and lit,Fine without high winds,Flood (Over 3cm of water),2
1,Dual carriageway,50,Darkeness: No street lighting,Fine without high winds,Frost/Ice,2
1,Dual carriageway,70,Daylight: Street light present,Fine without high winds,Dry,2
1,Single carriageway,60,Daylight: Street light present,Fine without high winds,Dry,2
1,Dual carriageway,70,Daylight: Street light present,Fine without high winds,Dry,2
1,Single carriageway,50,Daylight: Street light present,Fine without high winds,Dry,2
1,Single carriageway,60,Daylight: Street light present,Fine without high winds,Dry,2
1,Single carriageway,60,Daylight: Street light present,Fine without high winds,Dry,2


### 4. Convert table subset to pandas dataframe

In [10]:
# Switch on Arrow for the Spark session before the toPandas() conversion below.
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

# Same columns and row filter as the exploratory %sql query, plus day_of_week.
selected_accidents = sql("""SELECT Accident_severity, Road_Type, Speed_limit,
Light_Conditions, Weather_Conditions, Road_Surface_Conditions, Urban_or_Rural_Area, day_of_week
FROM joined_tables
WHERE (Road_Type <> 'Unknown' AND Weather_Conditions <> 'Unknown' AND Road_Surface_Conditions IS NOT NULL
AND Urban_or_Rural_Area <> 3)""")

# Collect the filtered rows into a pandas DataFrame for scikit-learn.
pdDF = selected_accidents.toPandas()

### 5. Test-train split

In [12]:
from sklearn.model_selection import train_test_split

# Target: the severity label; features: every other selected column.
y = pdDF["Accident_severity"].values
X = pdDF.drop(columns=["Accident_severity"])

# Hold out 20% of the rows for testing; the fixed seed keeps the split repeatable.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, random_state=42, test_size=0.2
)


### 6. Linear regression, Decision Tree, Random Forest and SVM classification pipelines, all with one-hot encoding

In [14]:
%python
from sklearn.linear_model import LinearRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.svm import LinearSVC
from sklearn.ensemble import RandomForestClassifier

from sklearn.preprocessing import OneHotEncoder
from sklearn.pipeline import Pipeline
from sklearn import metrics
import numpy as np

# Observation so far: the models can't beat simply guessing the same class for all cases

# OneHotEncoder does not look for string columns: it treats every input column as
# categorical (numeric ones such as Speed_limit included) and emits one indicator
# column per distinct value. handle_unknown="ignore" encodes a category that was not
# seen during fit as all zeros instead of raising an error.
ohe = ("ohe", OneHotEncoder(handle_unknown="ignore"))
# A regressor used as a classifier: classification() rounds its continuous output to a label.
# NOTE(review): `normalize` was deprecated in scikit-learn 1.0 and removed in 1.2 -
# confirm the installed version accepts it.
class1 = ("lr", LinearRegression(fit_intercept=True, normalize=True))
class2 = ("lr2", DecisionTreeClassifier(random_state=0, max_depth = 16))  # author's observation: with max_depth = 2 or 3 every prediction is class 3
# author's observation: with max_depth > 15 the result is the same as the linear regressor
class3 = ("lr3", LinearSVC(random_state = 0, tol = 0.1))
class4 = ('lr4', RandomForestClassifier(random_state=0, max_depth = 16))

def classification(classifier, train_data, train_label, test_data, test_label):
  """Fit a one-hot-encoding + estimator pipeline and return its test accuracy.

  The pipeline is built from the module-level ``ohe`` step followed by
  ``classifier``; both step objects are fitted in place.

  Args:
    classifier: ``(name, estimator)`` tuple used as the final pipeline step.
    train_data: Feature table the pipeline is fitted on.
    train_label: Labels matching ``train_data``.
    test_data: Feature table to predict on.
    test_label: True labels matching ``test_data``.

  Returns:
    The ``metrics.accuracy_score`` of the rounded predictions against
    ``test_label``.
  """
  pipeline = Pipeline(steps=[ohe, classifier]).fit(train_data, train_label)
  # The regressor returns continuous values, so round every prediction to the
  # nearest integer label. np.rint works on the whole array at once instead of
  # looping over each prediction in Python.
  pred_label = np.rint(pipeline.predict(test_data)).astype(int)
  return metrics.accuracy_score(test_label, pred_label)

print('Full dataset accuracy')

# Baseline: predict class 3 for every test row.
all_class_3 = np.full(len(y_test), 3.0)
print('Accuracy - All same class: ' + str(np.round(metrics.accuracy_score(y_test, all_class_3), 5)))

# Train and score each model in turn on the full (imbalanced) dataset.
for report_prefix, model_step in (
    ('Accuracy - Lin. Reg.: ', class1),
    ('Accuracy - Dec. Tree: ', class2),
    ('Accuracy - SVM: ', class3),
    ('Accuracy - Ran. For.: ', class4),
):
  model_accuracy = classification(model_step, X_train, y_train, X_test, y_test)
  print(report_prefix + str(np.round(model_accuracy, 5)))



### 7. Try undersampling to balance the dataset (obtain roughly equal numbers of the three accident_severity classes)

In [16]:
import pandas as pd

# Fixed seed so the undersampled dataset, and every result derived from it,
# is the same on each run.
UNDERSAMPLE_SEED = 42

severity_groups = pdDF.groupby('Accident_severity')
# Row count of the rarest severity class, computed once instead of once per group.
smallest_class_size = severity_groups.size().min()
# Undersampling: draw `smallest_class_size` rows at random (without replacement)
# from each severity class, so every class ends up with the same number of rows.
g = pd.DataFrame(
  severity_groups.apply(
    lambda x: x.sample(smallest_class_size, random_state=UNDERSAMPLE_SEED).reset_index(drop=True)
  )
)
# undersampled dataset requires further exploration to check the size and composition

## Convert into Spark DataFrame
spark_g = spark.createDataFrame(g)
## Write Frame out as Table
spark_g.write.mode("overwrite").saveAsTable("under_sampled")

In [17]:
%sql
-- Inspect the severity label and weather condition of the rows saved to the under_sampled table
SELECT Accident_severity, Weather_conditions
FROM under_sampled


Accident_severity,Weather_conditions
1,Fine without high winds
1,Fine without high winds
1,Raining without high winds
1,Raining without high winds
1,Fine without high winds
1,Raining without high winds
1,Fine without high winds
1,Raining without high winds
1,Other
1,Fine without high winds


In [18]:
# Target and features of the undersampled (class-balanced) dataset.
y_und = g["Accident_severity"].values
X_und = g.drop(columns=["Accident_severity"])

# Same 80/20 split and seed as used for the full dataset.
X_train_und, X_test_und, y_train_und, y_test_und = train_test_split(
    X_und, y_und, random_state=42, test_size=0.2
)

In [19]:
# (label, pipeline step) pairs reported in both experiments below.
report_models = (
    ('Lin. Reg.', class1),
    ('Dec. Tree', class2),
    ('SVM', class3),
    ('Ran. For.', class4),
)

# Each experiment trains on the undersampled training split and differs only in
# the test set it is scored on.
report_experiments = (
    ("Accuracy on undersampled dataset", X_test_und, y_test_und),
    ("Trained on undersampled dataset; tested on full dataset", X_test, y_test),
)

for experiment_index, (report_title, eval_data, eval_label) in enumerate(report_experiments):
  if experiment_index:
    # Blank line between the two reports.
    print('')
  print(report_title)
  # Baseline: predict class 3 for every test row.
  baseline_accuracy = metrics.accuracy_score(eval_label, np.ones(len(eval_label))*3)
  print('Accuracy - All same class: ' + str(np.round(baseline_accuracy, 5)))
  for model_label, model_step in report_models:
    model_accuracy = classification(model_step, X_train_und, y_train_und, eval_data, eval_label)
    print('Accuracy - ' + model_label + ': ' + str(np.round(model_accuracy, 5)))
