In [1]:
from pyspark.sql import SparkSession
# Reuse the active SparkSession if one exists, otherwise create a new one.
spark = SparkSession.builder.getOrCreate()
# SparkContext that backs the session.
sc = spark.sparkContext
from pyspark.sql import Row
import numpy as np
import pandas as pd
from pyspark.sql.types import *
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
from pyspark.sql import functions as fn
from pyspark.ml import feature, regression, evaluation, Pipeline
import seaborn as sns
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.regression import LinearRegression
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import OneHotEncoderEstimator
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator,MulticlassClassificationEvaluator
from sklearn.metrics import classification_report
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.tuning import CrossValidator,ParamGridBuilder

In [2]:
# Do not delete or change this cell

import os

def is_databricks():
    """Report whether the notebook is running in a Databricks environment.

    The check is based solely on the DATABRICKS_RUNTIME_VERSION environment
    variable: if it is set (to any value) the function returns True,
    otherwise it returns False.
    """
    return os.getenv("DATABRICKS_RUNTIME_VERSION") is not None

def get_training_filename(data_file_name):
    """Return the full path of a data file for the current runtime environment.

    Params
      data_file_name: The base name of the data file to load

    Returns
      "/FileStore/tables/<data_file_name>" when running on Databricks
      (as reported by is_databricks()); otherwise the base name unchanged,
      i.e. the file is assumed to sit in the same directory as this notebook.
    """
    # Outside Databricks the data file lives next to the notebook
    if not is_databricks():
        return data_file_name

    # On Databricks, uploaded files are looked up under the FileStore tables dir
    return "/FileStore/tables/%s" % data_file_name

In [3]:
# Load the training split; the balancing below is applied to this data only.
# The first CSV row supplies the column names and Spark infers the column types.
training_df = spark.read.csv(
    get_training_filename("USAccident_train_categorical.csv"),
    header=True,
    inferSchema=True,
)

# Balancing for Multiclass

**To balance the multiclass data, we oversample class 4, which has the fewest rows, and undersample class 2, which has the most, so that both end up close to the size of class 3.**

## Oversampling Target 4

In [4]:
# Class 4 is replicated so that its row count approaches that of class 3.
# major_df holds the class 3 rows (the target size); minor_df holds the class 4 rows.
major_df = training_df.where(fn.col("Severity") == 3)
minor_df = training_df.where(fn.col("Severity") == 4)

In [5]:
# Number of whole copies of each class 4 row needed to approach the class 3 row count.
class3_count = major_df.count()
class4_count = minor_df.count()
if class4_count == 0:
    # Fail with a clear message instead of a bare ZeroDivisionError.
    raise ValueError("No rows with Severity == 4; cannot compute an oversampling ratio")
# Floor division truncates exactly like the previous int(a / b). The max() keeps at
# least one copy: a ratio of 0 would make the later explode() emit no rows at all,
# silently removing class 4 whenever it is already larger than class 3.
oversampling_ratio = max(1, class3_count // class4_count)

In [6]:
# One entry (0 .. oversampling_ratio - 1) per copy of each class 4 row to emit.
a = range(0, oversampling_ratio)

In [7]:
# Oversample class 4: attach an array column holding one literal per requested copy,
# explode it so every row is repeated once per literal, then discard the helper column.
oversampled_df = (
    minor_df
    .withColumn("dummy", fn.explode(fn.array([fn.lit(copy_no) for copy_no in a])))
    .drop("dummy")
)

## Undersampling Target 2

In [8]:
# Class 2 is down-sampled so that its row count approaches that of class 3.
# The names major_df / minor_df are reused: from here on they hold class 2 and class 3.
major_df = training_df.where(fn.col("Severity") == 2)
minor_df = training_df.where(fn.col("Severity") == 3)

In [9]:
# Whole-number ratio of class 2 rows to class 3 rows; its reciprocal is used as the
# undersampling fraction.
class2_count = major_df.count()
class3_count = minor_df.count()
if class3_count == 0:
    # Fail with a clear message instead of a bare ZeroDivisionError.
    raise ValueError("No rows with Severity == 3; cannot compute an undersampling ratio")
# Floor division truncates exactly like the previous int(a / b). The max() prevents a
# ratio of 0 (class 2 smaller than class 3), which would make 1/ratio divide by zero.
ratio = max(1, class2_count // class3_count)

In [10]:
# Undersample class 2 without replacement, keeping roughly one row in every `ratio`.
# A ratio below 1 keeps every row instead of dividing by zero, and the fixed seed makes
# the sample reproducible when the notebook is re-run.
sampling_fraction = 1.0 / ratio if ratio >= 1 else 1.0
undersampled_df = major_df.sample(withReplacement=False, fraction=sampling_fraction, seed=42)

In [11]:
# Class 3 is the reference class and is carried over without any resampling.
unsampled_class_data = training_df.where(fn.col("Severity") == 3)

In [12]:
# Assemble the balanced multiclass set: class 3 as-is, then the undersampled class 2,
# then the oversampled class 4. Rows are appended by column position.
# NOTE(review): rows whose Severity is not 2, 3 or 4 are absent from the result —
# confirm this is intended.
temp_data = unsampled_class_data.union(undersampled_df)
balanced_data = temp_data.union(oversampled_df)

In [13]:
# Collect the balanced frame to the driver as a pandas DataFrame and write it out
# as CSV (the pandas index is not written).
balanced_data.toPandas().to_csv(
    "USAccident_balanced_train_categorical.csv",
    index=False,
)

### One Hot Encoding for balanced data

In [14]:
# Base names of every categorical column in the dataset
categorical_columns = [
    "Source",
    "Side",
    "Wind_Direction",
    "month_of_year",
    "day_of_week",
    "TMC",
    "Sunrise_Sunset",
    "Civil_Twilight",
    "Nautical_Twilight",
    "Astronomical_Twilight",
    "Hour",
]

In [16]:
# Names of the index-encoded columns (base name + "_Index") that will be one hot encoded
categorical_columns2 = ["%s_Index" % name for name in categorical_columns]

In [17]:
# Expand every "<name>_Index" column into 0/1 dummy columns named "<name>_Index_<k>",
# one per distinct index value k present in the balanced data.
for category in categorical_columns2:
    # Distinct values are collected to the driver directly from the DataFrame.
    distinct_rows = balanced_data.select(category).distinct().collect()
    # Nulls are skipped because float(None) would raise; rows with a null index get 0
    # in every dummy. Sorting gives the dummy columns a stable order, since distinct()
    # returns values in no fixed order.
    categ = sorted(row[0] for row in distinct_rows if row[0] is not None)
    exprs = [
        fn.when(fn.col(category) == cat, 1).otherwise(0)
          .alias(category + "_" + str(int(float(cat))))
        for cat in categ
    ]
    # The new dummies are placed in front of the existing columns.
    balanced_data = balanced_data.select(exprs + balanced_data.columns)

In [18]:
# Remove the "<name>_Index" columns now that they have been expanded into dummies
balanced_data = balanced_data.drop(*categorical_columns2)

In [19]:
# Drop one dummy per categorical column — the one for index value 0
# ("<name>_Index_0") — so each column is left with one dummy fewer than its categories.
balanced_data = balanced_data.drop(
    *["%s_Index_0" % name for name in categorical_columns]
)

In [20]:
# Collect the one hot encoded frame to the driver and write it out as CSV
# (the pandas index is not written).
balanced_data.toPandas().to_csv(
    "USAccident_balanced_train_categorical_OHE.csv",
    index=False,
)

# Balancing for Binary Data

In [21]:
# Collapse Severity into a binary target: severities 1 and 2 become 0,
# every other value becomes 1.
training_df_new = training_df.withColumn(
    "Severity",
    fn.when(
        (training_df["Severity"] == 1) | (training_df["Severity"] == 2),
        0,
    ).otherwise(1),
)

In [22]:
# Class 0 is down-sampled so that its row count approaches that of class 1.
# major_df holds the class 0 rows; minor_df holds the class 1 rows.
major_df = training_df_new.where(fn.col("Severity") == 0)
minor_df = training_df_new.where(fn.col("Severity") == 1)

In [23]:
# Whole-number ratio of class 0 rows to class 1 rows; its reciprocal is used as the
# undersampling fraction.
class0_count = major_df.count()
class1_count = minor_df.count()
if class1_count == 0:
    # Fail with a clear message instead of a bare ZeroDivisionError.
    raise ValueError("No rows with Severity == 1; cannot compute an undersampling ratio")
# Floor division truncates exactly like the previous int(a / b). The max() prevents a
# ratio of 0 (class 0 smaller than class 1), which would make 1/ratio divide by zero.
ratio = max(1, class0_count // class1_count)

In [24]:
# Undersample class 0 without replacement, keeping roughly one row in every `ratio`.
# A ratio below 1 keeps every row instead of dividing by zero, and the fixed seed makes
# the sample reproducible when the notebook is re-run.
sampling_fraction = 1.0 / ratio if ratio >= 1 else 1.0
undersampled_df = major_df.sample(withReplacement=False, fraction=sampling_fraction, seed=42)

In [25]:
# Balanced binary set: every class 1 row followed by the undersampled class 0 rows
# (rows are appended by column position).
balanced_data_binary = (
    training_df_new
    .where(fn.col("Severity") == 1)
    .union(undersampled_df)
)

In [26]:
# Collect the balanced binary frame to the driver and write it out as CSV
# (the pandas index is not written).
balanced_data_binary.toPandas().to_csv(
    "USAccident_balanced_train_binary.csv",
    index=False,
)

### One Hot Encoding for balanced data

In [27]:
# Base names of every categorical column in the dataset
categorical_columns = [
    "Source",
    "Side",
    "Wind_Direction",
    "month_of_year",
    "day_of_week",
    "TMC",
    "Sunrise_Sunset",
    "Civil_Twilight",
    "Nautical_Twilight",
    "Astronomical_Twilight",
    "Hour",
]

In [29]:
# Names of the index-encoded columns (base name + "_Index") that will be one hot encoded
categorical_columns2 = ["%s_Index" % name for name in categorical_columns]

In [32]:
# Expand every "<name>_Index" column into 0/1 dummy columns named "<name>_Index_<k>",
# one per distinct index value k present in the balanced binary data.
for category in categorical_columns2:
    # Distinct values are collected to the driver directly from the DataFrame.
    distinct_rows = balanced_data_binary.select(category).distinct().collect()
    # Nulls are skipped because float(None) would raise; rows with a null index get 0
    # in every dummy. Sorting gives the dummy columns a stable order, since distinct()
    # returns values in no fixed order.
    categ = sorted(row[0] for row in distinct_rows if row[0] is not None)
    exprs = [
        fn.when(fn.col(category) == cat, 1).otherwise(0)
          .alias(category + "_" + str(int(float(cat))))
        for cat in categ
    ]
    # The new dummies are placed in front of the existing columns.
    balanced_data_binary = balanced_data_binary.select(exprs + balanced_data_binary.columns)

In [33]:
# Remove the "<name>_Index" columns now that they have been expanded into dummies
balanced_data_binary = balanced_data_binary.drop(*categorical_columns2)

In [34]:
# Drop one dummy per categorical column — the one for index value 0
# ("<name>_Index_0") — so each column is left with one dummy fewer than its categories.
balanced_data_binary = balanced_data_binary.drop(
    *["%s_Index_0" % name for name in categorical_columns]
)

In [35]:
# Collect the one hot encoded binary frame to the driver and write it out as CSV
# (the pandas index is not written).
balanced_data_binary.toPandas().to_csv(
    "USAccident_balanced_train_binary_OHE.csv",
    index=False,
)