In [80]:
import findspark

In [81]:
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression, DecisionTreeClassifier, RandomForestClassifier, NaiveBayes
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [82]:
spark = SparkSession.builder.appName("PySpark in Jupyter").config("spark.executor.memory", "4g").config("spark.driver.memory", "4g").getOrCreate()

In [83]:
path_to_data = "results/cleaned_accidents"
df = spark.read.csv(path_to_data, header=False, inferSchema=True)

                                                                                

In [84]:
df = df.toDF("ID", "Source", "Severity", "Start_Time", "End_Time", "Year", "Month", "Hour", "Start_Lat", "Start_Lng", "City", "State", "Temperature", "Humidity", "Pressure", "Visibility", "Wind_Speed", "Precipitation", "Weather_Category", "Traffic_Signal")

In [85]:
df = df.drop(*["ID", "Source", "Start_Time", "End_Time", "Start_Lat", "Start_Lng"])

### Logistic Regression Model 

In [86]:
string_cols = ["City", "State", "Weather_Category"]
stages = []

for col in string_cols:
    indexer = StringIndexer(inputCol=col, outputCol=f"{col}Index", handleInvalid="skip")
    stages += [indexer]

In [87]:
numerical_cols = ["Year", "Month", "Hour", "Temperature", "Humidity", "Pressure", "Visibility", "Wind_Speed", "Precipitation", "Traffic_Signal"]
feature_cols = [f"{col}Index" for col in string_cols] + numerical_cols

In [88]:
base_stages = stages.copy()
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
stages.append(assembler)

In [89]:
log_reg = LogisticRegression(featuresCol='features', labelCol='Severity', family="multinomial")
stages.append(log_reg)

In [90]:
pipeline = Pipeline(stages=stages)

In [91]:
train_data, test_data = df.randomSplit([0.8, 0.2])
test_data_dt = test_data.select("*")
test_data_nb = test_data.select("*")
test_data_rf = test_data.select("*")
lr_model = pipeline.fit(train_data)
predictions = lr_model.transform(test_data)

                                                                                

In [92]:
evaluator = MulticlassClassificationEvaluator(labelCol="Severity", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print(f"Accuracy: {accuracy}")



Accuracy: 0.7930930591114191


                                                                                

### Decision Tree Model with 5 Multiple Levels

In [93]:
decision_tree_stages = base_stages.copy()
feature_columns_dt = [f for f in feature_cols if f not in ["CityIndex"]]
assembler = VectorAssembler(inputCols=feature_columns_dt, outputCol="features")
dec_tree = DecisionTreeClassifier(featuresCol="features", labelCol="Severity", maxDepth=5, maxBins=100)
decision_tree_stages.append(assembler)
decision_tree_stages.append(dec_tree)

In [94]:
dec_tree_pipeline = Pipeline(stages=decision_tree_stages)

In [95]:
dec_tree_model = dec_tree_pipeline.fit(train_data)
dec_tree_predictions = dec_tree_model.transform(test_data_dt)

                                                                                

In [None]:
dt_evaluator = MulticlassClassificationEvaluator(
    labelCol="Severity", predictionCol="dt_prediction", metricName="accuracy"
)
dt_accuracy = evaluator.evaluate(dec_tree_predictions)

accuracy_dec_tree = evaluator.evaluate(dec_tree_predictions)
print(f"Accuracy: {accuracy_dec_tree}")

[Stage 596:>                                                      (0 + 16) / 17]

### Random Forest

In [38]:
random_forest_stages = base_stages.copy()
feature_columns_rf = [f for f in feature_cols if f not in ["CityIndex"]]
assembler = VectorAssembler(inputCols=feature_columns_rf, outputCol="features")
rf = RandomForestClassifier(featuresCol="features", labelCol="Severity", numTrees=20, maxDepth=8, maxBins=100)
random_forest_stages.append(assembler)
random_forest_stages.append(rf)

In [39]:
rf_pipeline = Pipeline(stages=random_forest_stages)
rf_model = rf_pipeline.fit(train_data)
rf_predictions = rf_model.transform(test_data_rf)

24/11/24 21:01:08 WARN MemoryStore: Not enough space to cache rdd_904_0 in memory! (computed 40.8 MiB so far)
24/11/24 21:01:08 WARN MemoryStore: Not enough space to cache rdd_904_4 in memory! (computed 40.8 MiB so far)
24/11/24 21:01:08 WARN BlockManager: Persisting block rdd_904_0 to disk instead.
24/11/24 21:01:08 WARN BlockManager: Persisting block rdd_904_4 to disk instead.
24/11/24 21:01:08 WARN MemoryStore: Not enough space to cache rdd_904_2 in memory! (computed 40.8 MiB so far)
24/11/24 21:01:08 WARN BlockManager: Persisting block rdd_904_2 to disk instead.
24/11/24 21:01:08 WARN MemoryStore: Not enough space to cache rdd_904_8 in memory! (computed 40.8 MiB so far)
24/11/24 21:01:08 WARN MemoryStore: Not enough space to cache rdd_904_12 in memory! (computed 27.0 MiB so far)
24/11/24 21:01:08 WARN BlockManager: Persisting block rdd_904_8 to disk instead.
24/11/24 21:01:08 WARN BlockManager: Persisting block rdd_904_12 to disk instead.
24/11/24 21:01:08 WARN MemoryStore: Not eno

In [40]:
rf_evaluator = MulticlassClassificationEvaluator(
    labelCol="Severity", predictionCol="prediction", metricName="accuracy"
)

rf_accuracy = rf_evaluator.evaluate(rf_predictions)
print(f"Random Forest Test Accuracy: {rf_accuracy}")

24/11/24 21:01:39 WARN DAGScheduler: Broadcasting large task binary with size 1056.1 KiB

Random Forest Test Accuracy: 0.8032697347456303


                                                                                

In [50]:
feature_importances_rf = rf_model.stages[-1].featureImportances
for index, feature in enumerate(feature_importances_rf):
    print(f"Feature {feature_columns_rf[index]} has importance: {feature}")

Feature StateIndex has importance: 0.29385319558828005
Feature Weather_CategoryIndex has importance: 0.011215113293415312
Feature Year has importance: 0.49111195753170095
Feature Month has importance: 0.03326176896560343
Feature Hour has importance: 0.020170429414216092
Feature Temperature has importance: 0.004615902113005496
Feature Humidity has importance: 0.0022515236284440646
Feature Pressure has importance: 0.018264757686120438
Feature Visibility has importance: 0.0005283423112059942
Feature Wind_Speed has importance: 0.00119354882974698
Feature Precipitation has importance: 0.0004653935243234539
Feature Traffic_Signal has importance: 0.12306806711393774


### Naive Bayes

In [77]:
train_data_nb = train_data.select("*")
assembler = VectorAssembler(inputCols=["Temperature"], outputCol="temp_vector")
train_data_nb = assembler.transform(train_data_nb)

In [78]:
naive_bayes_stages = base_stages.copy()
assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")
nb = NaiveBayes(featuresCol="features", labelCol="Severity", modelType="multinomial")
naive_bayes_stages.append(assembler)
naive_bayes_stages.append(nb)

In [79]:
nb_pipeline = Pipeline(stages=naive_bayes_stages)
nb_model = nb_pipeline.fit(train_data_nb)
nb_predictions = nb_model.transform(test_data_nb)

24/11/24 21:35:32 ERROR Executor: Exception in task 3.0 in stage 439.0 (TID 4532)
java.lang.RuntimeException: Vector values MUST NOT be Negative, NaN or Infinity, but got [27.0,22.0,0.0,2019.0,3.0,1.0,-2.2,85.0,30.15,10.0,5.8,0.0,0.0]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:165)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:83)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfu

Py4JJavaError: An error occurred while calling o3409.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 439.0 failed 1 times, most recent failure: Lost task 3.0 in stage 439.0 (TID 4532) (172.31.204.238 executor driver): java.lang.RuntimeException: Vector values MUST NOT be Negative, NaN or Infinity, but got [27.0,22.0,0.0,2019.0,3.0,1.0,-2.2,85.0,30.15,10.0,5.8,0.0,0.0]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:165)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:83)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:114)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:878)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:878)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2790)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2726)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2725)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2725)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1211)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1211)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1211)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2989)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2928)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2917)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: java.lang.RuntimeException: Vector values MUST NOT be Negative, NaN or Infinity, but got [27.0,22.0,0.0,2019.0,3.0,1.0,-2.2,85.0,30.15,10.0,5.8,0.0,0.0]
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.project_doConsume_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.processInputs(ObjectAggregationIterator.scala:165)
	at org.apache.spark.sql.execution.aggregate.ObjectAggregationIterator.<init>(ObjectAggregationIterator.scala:83)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1(ObjectHashAggregateExec.scala:114)
	at org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec.$anonfun$doExecute$1$adapted(ObjectHashAggregateExec.scala:90)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:878)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:878)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
nb_evaluator = MulticlassClassificationEvaluator(
    labelCol="Severity", predictionCol="prediction", metricName="accuracy"
)
nb_accuracy = nb_evaluator.evaluate(nb_predictions)

print(f"Naive Bayes Test Accuracy: {nb_accuracy}")

24/11/24 21:35:32 WARN TaskSetManager: Lost task 9.0 in stage 439.0 (TID 4538) (172.31.204.238 executor driver): TaskKilled (Stage cancelled)
24/11/24 21:35:32 WARN TaskSetManager: Lost task 10.0 in stage 439.0 (TID 4539) (172.31.204.238 executor driver): TaskKilled (Stage cancelled)
24/11/24 21:35:32 WARN TaskSetManager: Lost task 15.0 in stage 439.0 (TID 4544) (172.31.204.238 executor driver): TaskKilled (Stage cancelled)
24/11/24 21:35:32 WARN TaskSetManager: Lost task 11.0 in stage 439.0 (TID 4540) (172.31.204.238 executor driver): TaskKilled (Stage cancelled)
24/11/24 21:35:32 WARN TaskSetManager: Lost task 12.0 in stage 439.0 (TID 4541) (172.31.204.238 executor driver): TaskKilled (Stage cancelled)
24/11/24 21:35:32 WARN TaskSetManager: Lost task 14.0 in stage 439.0 (TID 4543) (172.31.204.238 executor driver): TaskKilled (Stage cancelled)
24/11/24 21:35:32 WARN TaskSetManager: Lost task 13.0 in stage 439.0 (TID 4542) (172.31.204.238 executor driver): TaskKilled (Stage cancelled)
