<font size="5">**Part 3: Prediction of Accident Severity**</font>

<font size = "4">**Objectives:**</font>

•	Develop sophisticated predictive models that can accurately forecast the severity of traffic accidents based on a multifaceted set of factors, including location, time of day, weather conditions, and types of vehicles involved. This initiative aims to leverage machine learning algorithms to anticipate the potential impact of accidents.

•	Employ machine learning techniques to categorize crashes into distinct severity levels. This classification will aid emergency response teams in prioritizing incidents and optimizing resource deployment, ultimately enhancing the efficiency and effectiveness of emergency services.


<font size="5">**Install and update the libraries**</font>

In [1]:
# Install Java
!apt-get install openjdk-11-jdk-headless -qq > /dev/null

# Download Spark
!wget -q https://dlcdn.apache.org/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz

# Unzip the Spark archive
!tar xf spark-3.5.1-bin-hadoop3.tgz

# Install findspark - a library that makes it easier to locate Spark
!pip install -q findspark

!pip install --upgrade seaborn

Collecting seaborn
  Downloading seaborn-0.13.2-py3-none-any.whl (294 kB)
Installing collected packages: seaborn
  Attempting uninstall: seaborn
    Found existing installation: seaborn 0.13.1
    Uninstalling seaborn-0.13.1:
      Successfully uninstalled seaborn-0.13.1
Successfully installed seaborn-0.13.2


<font size="5">**Mount Google Drive for data loading**</font>

In [2]:
from google.colab import drive
import os
import findspark

drive.mount('/content/drive')

# Set Java and Spark home based on the installation paths
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"


Mounted at /content/drive


<font size="5">**Start the SparkSession and load the data into it**</font>

In [3]:
import findspark
findspark.init()

from pyspark.sql import SparkSession

# Now you can create a SparkSession without the FileNotFoundError
spark = SparkSession.builder \
    .appName("NYC Motor Vehicle Collisions Analysis") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .master("local[*]") \
    .getOrCreate()
data_path = "/content/drive/MyDrive/Motor_Vehicle_Collisions_-_Crashes.csv"
df = spark.read.csv(data_path, header=True, inferSchema=True)
df.printSchema()

root
 |-- CRASH DATE: string (nullable = true)
 |-- CRASH TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- CROSS STREET NAME: string (nullable = true)
 |-- OFF STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: integer (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: integer (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: integer (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: integer (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 2: strin

<font size="5">**Standardize the features**</font>

In [4]:
from pyspark.sql.functions import col, count, when

# Standardize the vehicle type
replacement_dict = {
    "SPORT UTILITY / STATION WAGON": "Station Wagon/Sport Utility Vehicle",
    "PICK-UP TRUCK": "Pick-up Truck",
    "TAXI": "Taxi",
    "4 dr sedan": "Sedan"
}

for original_value, new_value in replacement_dict.items():
    df = df.withColumn("VEHICLE TYPE CODE 1",
                       when(col("VEHICLE TYPE CODE 1") == original_value, new_value)
                       .otherwise(col("VEHICLE TYPE CODE 1")))
    df = df.withColumn("VEHICLE TYPE CODE 2",
                       when(col("VEHICLE TYPE CODE 2") == original_value, new_value)
                       .otherwise(col("VEHICLE TYPE CODE 2")))
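
The loop above performs an exact, case-sensitive match on each key and leaves every other value (including nulls) untouched. A plain-Python sketch of the same rule, runnable without a Spark session, makes that pass-through behaviour easy to check. The helper name `standardize_vehicle_type` and the sample values are hypothetical and not part of the notebook.

```python
# Plain-Python mirror of the when/otherwise replacement in cell 4.
# `standardize_vehicle_type` is a hypothetical helper used only for illustration.
replacement_dict = {
    "SPORT UTILITY / STATION WAGON": "Station Wagon/Sport Utility Vehicle",
    "PICK-UP TRUCK": "Pick-up Truck",
    "TAXI": "Taxi",
    "4 dr sedan": "Sedan",
}


def standardize_vehicle_type(value):
    """Return the standardized label, or the input unchanged if no exact match."""
    if value is None:
        return None  # nulls stay null, as with .otherwise(col(...))
    return replacement_dict.get(value, value)


# Made-up sample values: note that "taxi" (lower case) is not a key and passes through.
samples = ["TAXI", "taxi", "4 dr sedan", "Sedan", None]
standardized = [standardize_vehicle_type(v) for v in samples]
print(standardized)
```

Values that differ from a key only by case are not merged, so a case-insensitive lookup may be worth considering if such variants occur in the data.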

In [5]:
from pyspark.sql.functions import to_timestamp, concat, lit, col, year, month, hour, date_format


# "CRASH DATE" is in the format "MM/dd/yyyy" and "CRASH TIME" is in "HH:mm"
# Concatenate date and time into a single column with a space separator
df = df.withColumn("CRASH DATE TIME", to_timestamp(concat(col("CRASH DATE"), lit(" "), col("CRASH TIME")), 'MM/dd/yyyy HH:mm'))

# Extract calendar components from the combined timestamp ("CRASH WEEK" holds the abbreviated day of the week)
df = df.withColumn("CRASH YEAR", year("CRASH DATE TIME"))
df = df.withColumn("CRASH MONTH", month("CRASH DATE TIME"))
df = df.withColumn("CRASH MONTH NAME", date_format("CRASH DATE TIME", 'MMM'))
df = df.withColumn("CRASH HOUR", hour("CRASH DATE TIME"))
df = df.withColumn("CRASH WEEK", date_format("CRASH DATE TIME", 'E'))

# Show the results to verify
# df.select("CRASH DATE TIME", "CRASH YEAR", "CRASH MONTH", "CRASH HOUR", "CRASH WEEK").show(5)
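
As a rough cross-check of the parsing step, the same fields can be derived in plain Python. This is only a sketch: it assumes that `%m/%d/%Y %H:%M` is an adequate stand-in for the Spark pattern `'MM/dd/yyyy HH:mm'`, and the helper `crash_components` and the sample date are made up for illustration.

```python
from datetime import datetime

# Assumed Python equivalent of the Spark pattern 'MM/dd/yyyy HH:mm'.
SPARK_LIKE_FORMAT = "%m/%d/%Y %H:%M"
DAY_NAMES = ["Mon", "Tue", "Wed", "Thu", "Fri", "Sat", "Sun"]


def crash_components(crash_date, crash_time):
    """Parse the two string columns and return the derived fields."""
    ts = datetime.strptime(f"{crash_date} {crash_time}", SPARK_LIKE_FORMAT)
    return {
        "CRASH YEAR": ts.year,
        "CRASH MONTH": ts.month,
        "CRASH HOUR": ts.hour,
        # Abbreviated day of the week, comparable to the 'E' pattern
        "CRASH WEEK": DAY_NAMES[ts.weekday()],
    }


# Made-up sample values, not taken from the dataset.
example = crash_components("09/11/2021", "2:39")
print(example)
```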


In [6]:
# Map the crash month to a season (the Spark session and `df` come from the cells above)
df = df.withColumn(
    "CRASH SEASON",
    when((col("CRASH MONTH") >= 3) & (col("CRASH MONTH") <= 5), "Spring")
    .when((col("CRASH MONTH") >= 6) & (col("CRASH MONTH") <= 8), "Summer")
    .when((col("CRASH MONTH") >= 9) & (col("CRASH MONTH") <= 11), "Autumn")
    .otherwise("Winter")
)

df.select("CRASH MONTH", "CRASH SEASON").show()

+-----------+------------+
|CRASH MONTH|CRASH SEASON|
+-----------+------------+
|          9|      Autumn|
|          3|      Spring|
|          6|      Summer|
|          9|      Autumn|
|         12|      Winter|
|          4|      Spring|
|         12|      Winter|
|         12|      Winter|
|         12|      Winter|
|         12|      Winter|
|         12|      Winter|
|         12|      Winter|
|         12|      Winter|
|         12|      Winter|
|         12|      Winter|
|         12|      Winter|
|         12|      Winter|
|         12|      Winter|
|         12|      Winter|
|         12|      Winter|
+-----------+------------+
only showing top 20 rows
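
One property of the `when`/`otherwise` chain is worth spelling out: a null month never satisfies a `when` branch, so it falls through to `"Winter"`. The plain-Python mirror below is a sketch for reasoning about that rule; `crash_season` is a hypothetical helper, not part of the notebook.

```python
def crash_season(month):
    """Mirror of the CRASH SEASON when/otherwise chain."""
    if month is not None and 3 <= month <= 5:
        return "Spring"
    if month is not None and 6 <= month <= 8:
        return "Summer"
    if month is not None and 9 <= month <= 11:
        return "Autumn"
    # Reached for months 12, 1 and 2, and also for a missing month,
    # because a comparison against null never matches a when() branch.
    return "Winter"


season_by_month = {m: crash_season(m) for m in range(1, 13)}
print(season_by_month)
print(crash_season(None))
```

If timestamps that fail to parse should not be counted as winter crashes, filtering out null `CRASH MONTH` values first would avoid this.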



In [7]:
# Bucket the crash hour into a time-of-day category for later one-hot encoding
df = df.withColumn(
    "CRASH DAYTIME",
    when((col("CRASH HOUR") >= 6) & (col("CRASH HOUR") < 13), "Morning")
    .when((col("CRASH HOUR") >= 13) & (col("CRASH HOUR") < 19), "Afternoon")
    .when((col("CRASH HOUR") >= 19) & (col("CRASH HOUR") <= 23), "Evening")
    .otherwise("Early Morning")
)

df.select("CRASH HOUR", "CRASH DAYTIME").show()

+----------+-------------+
|CRASH HOUR|CRASH DAYTIME|
+----------+-------------+
|         2|Early Morning|
|        11|      Morning|
|         6|      Morning|
|         9|      Morning|
|         8|      Morning|
|        12|      Morning|
|        17|    Afternoon|
|         8|      Morning|
|        21|      Evening|
|        14|    Afternoon|
|         0|Early Morning|
|        16|    Afternoon|
|         8|      Morning|
|         0|Early Morning|
|        23|      Evening|
|        17|    Afternoon|
|        20|      Evening|
|         1|Early Morning|
|        19|      Evening|
|        14|    Afternoon|
+----------+-------------+
only showing top 20 rows
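
The four time-of-day buckets do not have equal width. A short plain-Python sketch (with a hypothetical helper `crash_daytime` that mirrors the chain above) counts how many hours land in each bucket and confirms that all 24 hours are covered.

```python
from collections import Counter


def crash_daytime(hour):
    """Mirror of the CRASH DAYTIME when/otherwise chain."""
    if hour is not None and 6 <= hour < 13:
        return "Morning"
    if hour is not None and 13 <= hour < 19:
        return "Afternoon"
    if hour is not None and 19 <= hour <= 23:
        return "Evening"
    # Hours 0 to 5, and any missing hour, end up here.
    return "Early Morning"


hours_per_bucket = Counter(crash_daytime(h) for h in range(24))
print(hours_per_bucket)
```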



In [8]:
retain = ["CRASH DAYTIME", "CRASH WEEK", "CRASH SEASON","LATITUDE", "LONGITUDE", "NUMBER OF PERSONS INJURED", "NUMBER OF PERSONS KILLED",
          "CONTRIBUTING FACTOR VEHICLE 1", "CONTRIBUTING FACTOR VEHICLE 2",
          "VEHICLE TYPE CODE 1", "VEHICLE TYPE CODE 2"]

# Select the retained columns
nyc_traffic_collisions_analysis = df.select(*retain)

# Remove duplicate rows
nyc_traffic_collisions_analysis = nyc_traffic_collisions_analysis.dropDuplicates()

# Use printSchema() to inspect the structure, describe().show() for summary statistics, or count() for the number of rows
nyc_traffic_collisions_analysis.printSchema()
# nyc_traffic_collisions_analysis.describe().show()
# print("Total rows:", nyc_traffic_collisions_analysis.count())

root
 |-- CRASH DAYTIME: string (nullable = false)
 |-- CRASH WEEK: string (nullable = true)
 |-- CRASH SEASON: string (nullable = false)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: integer (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 2: string (nullable = true)
 |-- VEHICLE TYPE CODE 1: string (nullable = true)
 |-- VEHICLE TYPE CODE 2: string (nullable = true)



In [9]:
# nyc_traffic_collisions_analysis.select("VEHICLE TYPE CODE 1").distinct().count()

In [10]:
# nyc_traffic_collisions_analysis.select("VEHICLE TYPE CODE 2").distinct().count()

In [11]:
# nyc_traffic_collisions_analysis.select("CONTRIBUTING FACTOR VEHICLE 1").distinct().count()

In [12]:
# nyc_traffic_collisions_analysis.select("CONTRIBUTING FACTOR VEHICLE 2").distinct().count()

In [13]:
# Create one more column for evaluating the severity
from pyspark.sql.types import IntegerType
nyc_traffic_collisions_analysis = nyc_traffic_collisions_analysis.withColumn("NUMBER OF PERSONS INJURED", col("NUMBER OF PERSONS INJURED").cast(IntegerType()))
nyc_traffic_collisions_analysis = nyc_traffic_collisions_analysis.withColumn(
    "SEVERITY",
    # No one injured and no one killed
    when((col("NUMBER OF PERSONS INJURED") == 0) & (col("NUMBER OF PERSONS KILLED") == 0), "No People Loss")

    # No deaths and 1 to 4 people injured
    .when((((col("NUMBER OF PERSONS INJURED") > 0) & (col("NUMBER OF PERSONS INJURED") < 5)) & (col("NUMBER OF PERSONS KILLED") == 0)), "Light Severity")

    # Every remaining row with 0 to 4 deaths: as written, the second clause of the OR
    # subsumes the first, so the injury count does not affect this branch
    .when((((10 > col("NUMBER OF PERSONS INJURED")) & (col("NUMBER OF PERSONS INJURED") > 5)) & ((col("NUMBER OF PERSONS KILLED") < 5) & (col("NUMBER OF PERSONS KILLED") >= 0))) | \
     ((col("NUMBER OF PERSONS KILLED") < 5) & (col("NUMBER OF PERSONS KILLED") >= 0)), "Medium Severity")

    # 5 or more deaths, or a missing death count
    .otherwise("Heavy Severity")
)

# nyc_traffic_collisions_analysis.select("NUMBER OF PERSONS INJURED", "NUMBER OF PERSONS KILLED", "SEVERITY").show(20)
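
To see which label a given pair of counts receives, the chain in cell 13 can be mirrored in plain Python. The sketch below follows the conditions as they are written, including how null values propagate; `severity` is a hypothetical helper and the inputs are made up. If a stricter definition of "Medium Severity" was intended (for example, requiring at least one death), the third branch of the original chain would need to change.

```python
def severity(injured, killed):
    """Mirror of the SEVERITY when/otherwise chain as written in cell 13."""
    if killed is None:
        # Every branch tests the death count, and a null comparison never matches.
        return "Heavy Severity"
    if injured is not None:
        if injured == 0 and killed == 0:
            return "No People Loss"
        if 0 < injured < 5 and killed == 0:
            return "Light Severity"
    # The second clause of the OR in the third branch only looks at deaths.
    if 0 <= killed < 5:
        return "Medium Severity"
    return "Heavy Severity"


# Made-up (injured, killed) pairs.
examples = [(0, 0), (3, 0), (5, 0), (12, 0), (0, 1), (2, 5), (None, 0), (0, None)]
labels = [severity(i, k) for i, k in examples]
for pair, label in zip(examples, labels):
    print(pair, "->", label)
```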

In [14]:
# nyc_traffic_collisions_analysis.show()

<font size="5">**Process the data with NA values**</font>

* Many records specify only one contributing factor or vehicle type. Rather than dropping these records, we fill the missing fields with "Unspecified".
* In contrast, records in which both contributing factors and both vehicle type codes are null carry no vehicle information, so we drop them.
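
The two rules above can be sketched on plain dictionaries to make the intended behaviour concrete. This is an illustration only: `clean_row` is a hypothetical helper and the sample rows are made up.

```python
VEHICLE_FIELDS = (
    "CONTRIBUTING FACTOR VEHICLE 1",
    "CONTRIBUTING FACTOR VEHICLE 2",
    "VEHICLE TYPE CODE 1",
    "VEHICLE TYPE CODE 2",
)


def clean_row(row):
    """Return None if the row should be dropped, else a copy with nulls filled."""
    missing = [f for f in VEHICLE_FIELDS if row.get(f) is None]
    if len(missing) == len(VEHICLE_FIELDS):
        return None  # no vehicle information at all: drop the row
    filled = dict(row)
    for field in missing:
        filled[field] = "Unspecified"
    return filled


# Made-up rows: the first has partial information, the second has none.
rows = [
    {"CONTRIBUTING FACTOR VEHICLE 1": "Driver Inattention/Distraction",
     "VEHICLE TYPE CODE 1": "Sedan"},
    {},
]
cleaned = [c for c in (clean_row(r) for r in rows) if c is not None]
print(cleaned)
```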

In [15]:
# Handle missing values

# Rows with no contributing factor and no vehicle type for either vehicle; these are removed below
nyc_traffic_collisions_analysis_NA = nyc_traffic_collisions_analysis.filter((nyc_traffic_collisions_analysis["CONTRIBUTING FACTOR VEHICLE 1"].isNull() == True) \
                                                                         & (nyc_traffic_collisions_analysis["CONTRIBUTING FACTOR VEHICLE 2"].isNull()==True) \
                                                                         & (nyc_traffic_collisions_analysis["VEHICLE TYPE CODE 1"].isNull()==True) & \
                                                                         (nyc_traffic_collisions_analysis["VEHICLE TYPE CODE 2"].isNull()==True))
nyc_traffic_collisions_analysis_NA.count()
nyc_traffic_collisions_analysis_NAProcessed = nyc_traffic_collisions_analysis.subtract(nyc_traffic_collisions_analysis_NA)
nyc_traffic_collisions_analysis_NAProcessed = nyc_traffic_collisions_analysis_NAProcessed.dropna(subset=['LATITUDE','LONGITUDE'])
nyc_traffic_collisions_analysis_NAProcessed.count()

1834112

In [16]:
# Fill the remaining nulls in the four vehicle-related columns with "Unspecified"
nyc_traffic_collisions_analysis_NAProcessed_2 = nyc_traffic_collisions_analysis_NAProcessed.fillna({"CONTRIBUTING FACTOR VEHICLE 1": "Unspecified",\
                        "CONTRIBUTING FACTOR VEHICLE 2": "Unspecified",
                        "VEHICLE TYPE CODE 1": "Unspecified",
                        "VEHICLE TYPE CODE 2": "Unspecified"})

In [17]:
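# Keep only rows where all four vehicle-related columns hold a specified value
# (this also removes any rows that already contained "Unspecified" in the source data)
nyc_traffic_collisions_analysis_NAProcessed_2 = nyc_traffic_collisions_analysis_NAProcessed_2.filter((nyc_traffic_collisions_analysis_NAProcessed_2["CONTRIBUTING FACTOR VEHICLE 1"] != "Unspecified") \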
                                                                         & (nyc_traffic_collisions_analysis_NAProcessed_2["CONTRIBUTING FACTOR VEHICLE 2"] != "Unspecified") \
                                                                         & (nyc_traffic_collisions_analysis_NAProcessed_2["VEHICLE TYPE CODE 1"] != "Unspecified") & \
                                                                         (nyc_traffic_collisions_analysis_NAProcessed_2["VEHICLE TYPE CODE 2"] != "Unspecified"))

<font size="5">**Build the one-hot encoders**</font>
* Each string attribute is first mapped to a numeric index with `StringIndexer` and then expanded into a one-hot vector with `OneHotEncoder`, ready for model training.
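
The two-step encoding can be illustrated without Spark. The sketch below assumes the default settings as I understand them: `StringIndexer` orders labels by descending frequency (ties broken alphabetically) and `OneHotEncoder` drops the last category, which is then represented by an all-zeros vector. The helpers `fit_string_index` and `one_hot` and the sample labels are hypothetical.

```python
from collections import Counter


def fit_string_index(values):
    """Map each label to an index, most frequent first (ties alphabetical)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, num_categories, drop_last=True):
    """Dense one-hot vector; with drop_last the final category is all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    if index < size:
        vector[index] = 1.0
    return vector


# Made-up sample of season labels.
seasons = ["Winter", "Winter", "Winter", "Summer", "Summer", "Spring", "Autumn"]
season_index = fit_string_index(seasons)
encoded = {label: one_hot(i, len(season_index)) for label, i in season_index.items()}
print(season_index)
print(encoded)
```

With four categories the vector has three slots, which is why some rows in the assembled feature vector show no active index for a given attribute.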

In [18]:
from pyspark.ml.feature import OneHotEncoder, StringIndexer
# One-hot encode the time of day, day of week, season, contributing factors, vehicle types, and severity
# To save memory, drop each original column once it has been converted

# CRASH DAYTIME
crash_daytime_indexer = StringIndexer(inputCol = "CRASH DAYTIME", outputCol = "CRASH DAYTIME INDEX").fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = crash_daytime_indexer.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
crash_daytime_encoder = OneHotEncoder(inputCol = "CRASH DAYTIME INDEX", outputCol = "CRASH DAYTIME VECTOR")
ohe = crash_daytime_encoder.fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = ohe.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = nyc_traffic_collisions_analysis_NAProcessed_2.drop("CRASH DAYTIME")

# CRASH WEEK
crash_week_indexer = StringIndexer(inputCol = "CRASH WEEK", outputCol = "CRASH WEEK INDEX").fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = crash_week_indexer.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
crash_week_encoder = OneHotEncoder(inputCol = "CRASH WEEK INDEX", outputCol = "CRASH WEEK VECTOR")
ohe = crash_week_encoder.fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = ohe.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = nyc_traffic_collisions_analysis_NAProcessed_2.drop("CRASH WEEK")

# CRASH SEASON
crash_season_indexer = StringIndexer(inputCol = "CRASH SEASON", outputCol = "CRASH SEASON INDEX").fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = crash_season_indexer.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
crash_season_encoder = OneHotEncoder(inputCol = "CRASH SEASON INDEX", outputCol = "CRASH SEASON VECTOR")
ohe = crash_season_encoder.fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = ohe.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = nyc_traffic_collisions_analysis_NAProcessed_2.drop("CRASH SEASON")

# CONTRIBUTING FACTOR VEHICLE 1
cfv1_indexer = StringIndexer(inputCol = "CONTRIBUTING FACTOR VEHICLE 1", outputCol = "CONTRIBUTING FACTOR VEHICLE 1 INDEX").fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = cfv1_indexer.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
cfv1_encoder = OneHotEncoder(inputCol = "CONTRIBUTING FACTOR VEHICLE 1 INDEX", outputCol = "CONTRIBUTING FACTOR VEHICLE 1 VECTOR")
ohe = cfv1_encoder.fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = ohe.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = nyc_traffic_collisions_analysis_NAProcessed_2.drop("CONTRIBUTING FACTOR VEHICLE 1")

# CONTRIBUTING FACTOR VEHICLE 2
cfv2_indexer = StringIndexer(inputCol = "CONTRIBUTING FACTOR VEHICLE 2", outputCol = "CONTRIBUTING FACTOR VEHICLE 2 INDEX").fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = cfv2_indexer.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
cfv2_encoder = OneHotEncoder(inputCol = "CONTRIBUTING FACTOR VEHICLE 2 INDEX", outputCol = "CONTRIBUTING FACTOR VEHICLE 2 VECTOR")
ohe = cfv2_encoder.fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = ohe.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = nyc_traffic_collisions_analysis_NAProcessed_2.drop("CONTRIBUTING FACTOR VEHICLE 2")

# VEHICLE TYPE CODE 1
vtc1_indexer = StringIndexer(inputCol = "VEHICLE TYPE CODE 1", outputCol = "VEHICLE TYPE CODE 1 INDEX").fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = vtc1_indexer.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
vtc1_encoder = OneHotEncoder(inputCol = "VEHICLE TYPE CODE 1 INDEX", outputCol = "VEHICLE TYPE CODE 1 VECTOR")
ohe = vtc1_encoder.fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = ohe.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = nyc_traffic_collisions_analysis_NAProcessed_2.drop("VEHICLE TYPE CODE 1")

# VEHICLE TYPE CODE 2
vtc2_indexer = StringIndexer(inputCol = "VEHICLE TYPE CODE 2", outputCol = "VEHICLE TYPE CODE 2 INDEX").fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = vtc2_indexer.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
vtc2_encoder = OneHotEncoder(inputCol = "VEHICLE TYPE CODE 2 INDEX", outputCol = "VEHICLE TYPE CODE 2 VECTOR")
ohe = vtc2_encoder.fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = ohe.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = nyc_traffic_collisions_analysis_NAProcessed_2.drop("VEHICLE TYPE CODE 2")

# SEVERITY
severity_indexer = StringIndexer(inputCol = "SEVERITY", outputCol = "SEVERITY INDEX").fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = severity_indexer.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
severity_encoder = OneHotEncoder(inputCol = "SEVERITY INDEX", outputCol = "SEVERITY VECTOR")
ohe = severity_encoder.fit(nyc_traffic_collisions_analysis_NAProcessed_2)
nyc_traffic_collisions_analysis_NAProcessed_2 = ohe.transform(nyc_traffic_collisions_analysis_NAProcessed_2)
# nyc_traffic_collisions_analysis_NAProcessed_2 = nyc_traffic_collisions_analysis_NAProcessed_2.drop("VEHICLE TYPE CODE 2")

# nyc_traffic_collisions_analysis_NAProcessed_2.show(3)

<font size="5">**Drop unnecessary columns and assemble them into one vector**</font>

In [19]:
# Select the columns needed for training
needed = ["CRASH DAYTIME VECTOR", "CRASH WEEK VECTOR", "CRASH SEASON VECTOR", "LATITUDE", "LONGITUDE",\
          "CONTRIBUTING FACTOR VEHICLE 1 VECTOR", "CONTRIBUTING FACTOR VEHICLE 2 VECTOR", "VEHICLE TYPE CODE 1 VECTOR", "VEHICLE TYPE CODE 2 VECTOR","SEVERITY INDEX"]
nyc_process_data = nyc_traffic_collisions_analysis_NAProcessed_2.select(*needed)
# nyc_process_data.show()

In [20]:
from pyspark.ml.feature import VectorAssembler
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType
nyc_assembler = VectorAssembler(inputCols=["CRASH DAYTIME VECTOR", "CRASH WEEK VECTOR", "CRASH SEASON VECTOR", "LATITUDE", "LONGITUDE",\
          "CONTRIBUTING FACTOR VEHICLE 1 VECTOR", "CONTRIBUTING FACTOR VEHICLE 2 VECTOR", "VEHICLE TYPE CODE 1 VECTOR", "VEHICLE TYPE CODE 2 VECTOR"], outputCol="features")
nyc_process_data = nyc_assembler.transform(nyc_process_data)

nyc_process_data.select(['features','SEVERITY INDEX']).show(10)
# # UDF
# get_vector_size = udf(lambda vector: len(vector), IntegerType())
# nyc_process_data_with_size = nyc_process_data.withColumn("feature_size", get_vector_size("features"))
# nyc_process_data_with_size.select("feature_size").show()

+--------------------+--------------+
|            features|SEVERITY INDEX|
+--------------------+--------------+
|(1087,[1,6,9,12,1...|           0.0|
|(1087,[1,4,11,12,...|           1.0|
|(1087,[0,5,10,12,...|           1.0|
|(1087,[1,3,12,13,...|           0.0|
|(1087,[1,7,11,12,...|           0.0|
|(1087,[1,3,9,12,1...|           0.0|
|(1087,[1,8,10,12,...|           0.0|
|(1087,[0,4,12,13,...|           1.0|
|(1087,[0,4,11,12,...|           1.0|
|(1087,[0,6,9,12,1...|           0.0|
+--------------------+--------------+
only showing top 10 rows
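
The sparse notation `(1087,[indices],[values])` gives the vector length followed by the active positions. Assuming all four time-of-day buckets, all seven weekdays and all four seasons occur in the data, and that each encoder drops its last category, the leading positions can be worked out as in the sketch below. The helper `block_offsets` is hypothetical, and the sizes of the four high-cardinality blocks are not shown in the output, so only their combined width is derived.

```python
def block_offsets(widths):
    """Return {name: (start, end)} half-open index ranges and the total width."""
    offsets = {}
    start = 0
    for name, width in widths:
        offsets[name] = (start, start + width)
        start += width
    return offsets, start


# Widths under the assumptions stated above (category count minus one).
leading_blocks = [
    ("CRASH DAYTIME VECTOR", 4 - 1),
    ("CRASH WEEK VECTOR", 7 - 1),
    ("CRASH SEASON VECTOR", 4 - 1),
    ("LATITUDE", 1),
    ("LONGITUDE", 1),
]
offsets, used = block_offsets(leading_blocks)
remaining = 1087 - used  # slots shared by the contributing-factor and vehicle-type vectors
print(offsets)
print(used, remaining)
```

This layout is consistent with the output above, where indices 12 and 13 (latitude and longitude) are active in the rows that show them.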



In [21]:
model_df = nyc_process_data.select(['features', 'SEVERITY INDEX'])
train_df, test_df = model_df.randomSplit([0.8, 0.2])
print(train_df.count())

191018
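
`randomSplit` is called here without a seed, so the split and the training-set size can differ from run to run; the method accepts an optional `seed` argument. The idea of a seeded split can be sketched in plain Python as follows. `seeded_split` is a hypothetical helper, and the seed value 42 is arbitrary.

```python
import random


def seeded_split(rows, train_fraction=0.8, seed=42):
    """Assign each row to train or test with a reproducible random draw."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(1000))  # stand-in for the rows of model_df
train_a, test_a = seeded_split(rows)
train_b, test_b = seeded_split(rows)
print(len(train_a), len(test_a))
```

As with `randomSplit`, the fractions are only approximate: each row is drawn independently, so the training share fluctuates around 80%.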


<font size="5">**Method: Random Forest**</font>
* We chose a random forest because it generally performs well on multi-class classification tasks.
* Varying the hyperparameters changed performance only slightly. The main lever for improving performance is the choice of input features.

In [26]:
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
num_Trees = [50]
maxDepths = [6]
impurities = ["gini"]
seed=17721487907

for num_Tree in num_Trees:
  for maxDepth in maxDepths:
    for impurity in impurities:
      print(f"Start Process -- numTrees: {num_Tree}, maxDepths: {maxDepth}, impurity: {impurity}")
      rf_classifier = RandomForestClassifier(labelCol='SEVERITY INDEX', maxDepth = maxDepth ,maxMemoryInMB=512, impurity=impurity, numTrees = num_Tree, seed = seed).fit(train_df)

      # Test
      rf_prediction = rf_classifier.transform(test_df)
      rf_accuracy = MulticlassClassificationEvaluator(labelCol = "SEVERITY INDEX", metricName = "accuracy").evaluate(rf_prediction)
      print(f" Result of numTrees: {num_Tree}, maxDepths: {maxDepth}, impurity: {impurity}")
      print("The accuracy of RF on test data is {0:.0%}".format(rf_accuracy))

      rf_precision = MulticlassClassificationEvaluator(labelCol = "SEVERITY INDEX", metricName = "weightedPrecision").evaluate(rf_prediction)
      print("The precision rate on test data is {0:.0%}".format(rf_precision))
      print("-------------------------------------------------------------------------")

Start Process -- numTrees: 50, maxDepths: 6, impurity: gini
 Result of numTrees: 50, maxDepths: 6, impurity: gini
The accuracy of RF on test data is 76%
The precision rate on test data is 78%
-------------------------------------------------------------------------
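
The two reported numbers measure different things. Accuracy is the share of correct predictions, while weighted precision averages the per-class precision using each class's share of the true labels as its weight. The sketch below computes both from (label, prediction) pairs in plain Python, following that definition as I understand it; the helper names and the sample pairs are made up and do not come from the model above.

```python
from collections import Counter


def accuracy(pairs):
    """Fraction of pairs where the prediction equals the label."""
    return sum(1 for label, pred in pairs if label == pred) / len(pairs)


def weighted_precision(pairs):
    """Per-class precision, weighted by each class's share of the true labels."""
    label_counts = Counter(label for label, _ in pairs)
    predicted_counts = Counter(pred for _, pred in pairs)
    correct_counts = Counter(label for label, pred in pairs if label == pred)
    total = len(pairs)
    score = 0.0
    for cls, n_true in label_counts.items():
        n_pred = predicted_counts.get(cls, 0)
        # A class that is never predicted contributes a precision of 0.
        precision = correct_counts.get(cls, 0) / n_pred if n_pred else 0.0
        score += precision * n_true / total
    return score


# Made-up (label, prediction) pairs for three classes.
pairs = [(0, 0), (0, 0), (0, 0), (0, 1), (1, 1), (1, 0), (2, 0), (2, 2)]
acc = accuracy(pairs)
wp = weighted_precision(pairs)
print(acc, wp)
```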


In [23]:
# from pyspark.ml.classification import MultilayerPerceptronClassifier
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator


# # specify layers for the neural network:
# # input layer of size 1087 (features), two hidden layers of size 256 and 64,
# # and output layer of size 4 (classes)
# layers = [1087,256,64,4]

# # create the trainer and set its parameters
# trainer = MultilayerPerceptronClassifier(labelCol="SEVERITY INDEX", maxIter=100, layers=layers, blockSize=128, seed=1234)

# # train the model
# model = trainer.fit(train_df)

# # # compute accuracy on the test set
# result = model.transform(test_df)

# predictionAndLabels = result.select("prediction", "SEVERITY INDEX")
# evaluator = MulticlassClassificationEvaluator(labelCol = "SEVERITY INDEX",metricName="accuracy")
# print("Test set accuracy = " + str(evaluator.evaluate(predictionAndLabels)))

In [24]:
# from pyspark.ml.classification import GBTClassifier
# # Note: Spark's GBTClassifier supports binary labels only, so it cannot be fit on a multi-class SEVERITY INDEX as is
# gbt_classifier = GBTClassifier(featuresCol='features',labelCol='SEVERITY INDEX', seed = seed).fit(train_df)

# gbt_prediction = gbt_classifier.transform(test_df)
# gbt_accuracy = MulticlassClassificationEvaluator(labelCol = "SEVERITY INDEX", metricName = "accuracy").evaluate(gbt_prediction)
# print("The accuracy of GBT on test data is {0:.0%}".format(gbt_accuracy))

# gbt_precision = MulticlassClassificationEvaluator(labelCol = "SEVERITY INDEX", metricName = "weightedPrecision").evaluate(gbt_prediction)
# print("The precision rate on test data is {0:.0%}".format(gbt_precision))
# print("-------------------------------------------------------------------------")

In [25]:
# from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# rf_prediction = rf_classifier.transform(test_df)
# rf_accuracy = MulticlassClassificationEvaluator(labelCol = "SEVERITY INDEX", metricName = "accuracy").evaluate(rf_prediction)
# print("The accuracy of RF on test data is {0:.0%}".format(rf_accuracy))

# rf_precision = MulticlassClassificationEvaluator(labelCol = "SEVERITY INDEX", metricName = "weightedPrecision").evaluate(rf_prediction)
# print("The precision rate on test data is {0:.0%}".format(rf_precision))