In [1]:
spark

In [2]:
from pyspark.sql.functions import col, isnan, isnull, when, count, udf, mean
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType
import matplotlib.pyplot as plt
import numpy as np

In [21]:
#Set up the path
bucket = 'my-bigdata-project-sh'
filename = '/landing/Motor_Vehicle_Collisions_-_Crashes_20240213 (1).csv'
file_path = 'gs://' + bucket + filename
# Read data from GCS bucket to spark DF
sdf = spark.read.csv(file_path,header=True)

In [22]:
sdf.printSchema()

root
 |-- CRASH DATE: string (nullable = true)
 |-- CRASH TIME: string (nullable = true)
 |-- BOROUGH: string (nullable = true)
 |-- ZIP CODE: string (nullable = true)
 |-- LATITUDE: string (nullable = true)
 |-- LONGITUDE: string (nullable = true)
 |-- LOCATION: string (nullable = true)
 |-- ON STREET NAME: string (nullable = true)
 |-- CROSS STREET NAME: string (nullable = true)
 |-- OFF STREET NAME: string (nullable = true)
 |-- NUMBER OF PERSONS INJURED: string (nullable = true)
 |-- NUMBER OF PERSONS KILLED: string (nullable = true)
 |-- NUMBER OF PEDESTRIANS INJURED: string (nullable = true)
 |-- NUMBER OF PEDESTRIANS KILLED: string (nullable = true)
 |-- NUMBER OF CYCLIST INJURED: string (nullable = true)
 |-- NUMBER OF CYCLIST KILLED: string (nullable = true)
 |-- NUMBER OF MOTORIST INJURED: string (nullable = true)
 |-- NUMBER OF MOTORIST KILLED: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 2: string (nu

In [23]:
sdf.show()

24/05/18 03:19:13 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+----------+---------+--------+---------+----------+--------------------+--------------------+--------------------+--------------------+-------------------------+------------------------+-----------------------------+----------------------------+-------------------------+------------------------+--------------------------+-------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+-----------------------------+------------+--------------------+--------------------+-------------------+-------------------+-------------------+
|CRASH DATE|CRASH TIME|  BOROUGH|ZIP CODE| LATITUDE| LONGITUDE|            LOCATION|      ON STREET NAME|   CROSS STREET NAME|     OFF STREET NAME|NUMBER OF PERSONS INJURED|NUMBER OF PERSONS KILLED|NUMBER OF PEDESTRIANS INJURED|NUMBER OF PEDESTRIANS KILLED|NUMBER OF CYCLIST INJURED|NUMBER OF CYCLIST KILLED|NUMBER OF MOTORIST INJURED|NUMBER OF MOTORIST KILLED|CONTRIBUTING

In [24]:
sdf.columns

['CRASH DATE',
 'CRASH TIME',
 'BOROUGH',
 'ZIP CODE',
 'LATITUDE',
 'LONGITUDE',
 'LOCATION',
 'ON STREET NAME',
 'CROSS STREET NAME',
 'OFF STREET NAME',
 'NUMBER OF PERSONS INJURED',
 'NUMBER OF PERSONS KILLED',
 'NUMBER OF PEDESTRIANS INJURED',
 'NUMBER OF PEDESTRIANS KILLED',
 'NUMBER OF CYCLIST INJURED',
 'NUMBER OF CYCLIST KILLED',
 'NUMBER OF MOTORIST INJURED',
 'NUMBER OF MOTORIST KILLED',
 'CONTRIBUTING FACTOR VEHICLE 1',
 'CONTRIBUTING FACTOR VEHICLE 2',
 'CONTRIBUTING FACTOR VEHICLE 3',
 'CONTRIBUTING FACTOR VEHICLE 4',
 'CONTRIBUTING FACTOR VEHICLE 5',
 'COLLISION_ID',
 'VEHICLE TYPE CODE 1',
 'VEHICLE TYPE CODE 2',
 'VEHICLE TYPE CODE 3',
 'VEHICLE TYPE CODE 4',
 'VEHICLE TYPE CODE 5']

In [38]:
sdf = sdf.drop('LATITUDE',
 'LONGITUDE',
 'LOCATION',
 'ON STREET NAME',
 'CROSS STREET NAME',
 'OFF STREET NAME','CONTRIBUTING FACTOR VEHICLE 2',
 'CONTRIBUTING FACTOR VEHICLE 3',
 'CONTRIBUTING FACTOR VEHICLE 4',
 'CONTRIBUTING FACTOR VEHICLE 5','BOROUGH',
 'ZIP CODE', 'CRASH DATE',
 'CRASH TIME', 'VEHICLE TYPE CODE 2',
 'VEHICLE TYPE CODE 3',
 'VEHICLE TYPE CODE 4',
 'VEHICLE TYPE CODE 5','NUMBER OF PERSONS KILLED',
 'NUMBER OF PEDESTRIANS INJURED',
 'NUMBER OF PEDESTRIANS KILLED',
 'NUMBER OF CYCLIST INJURED',
 'NUMBER OF CYCLIST KILLED',
 'NUMBER OF MOTORIST INJURED',
 'NUMBER OF MOTORIST KILLED')


sdf.columns

['NUMBER OF PERSONS INJURED',
 'CONTRIBUTING FACTOR VEHICLE 1',
 'COLLISION_ID',
 'VEHICLE TYPE CODE 1']

In [26]:
null = sdf.select([count(when(isnull(c), c)).alias(c) for c in sdf.columns])
print(null.limit(10).toPandas())

[Stage 13:>                                                         (0 + 1) / 1]

   NUMBER OF PERSONS INJURED  NUMBER OF PERSONS KILLED  \
0                         19                        33   

   NUMBER OF PEDESTRIANS INJURED  NUMBER OF PEDESTRIANS KILLED  \
0                              2                             2   

   NUMBER OF CYCLIST INJURED  NUMBER OF CYCLIST KILLED  \
0                          1                         1   

   NUMBER OF MOTORIST INJURED  NUMBER OF MOTORIST KILLED  \
0                           1                          2   

   CONTRIBUTING FACTOR VEHICLE 1  COLLISION_ID  VEHICLE TYPE CODE 1  
0                           6720             2                13480  


                                                                                

In [27]:
sdf=sdf.dropna()
null = sdf.select([count(when(isnull(c), c)).alias(c) for c in sdf.columns])
print(null.limit(10).toPandas())



   NUMBER OF PERSONS INJURED  NUMBER OF PERSONS KILLED  \
0                          0                         0   

   NUMBER OF PEDESTRIANS INJURED  NUMBER OF PEDESTRIANS KILLED  \
0                              0                             0   

   NUMBER OF CYCLIST INJURED  NUMBER OF CYCLIST KILLED  \
0                          0                         0   

   NUMBER OF MOTORIST INJURED  NUMBER OF MOTORIST KILLED  \
0                           0                          0   

   CONTRIBUTING FACTOR VEHICLE 1  COLLISION_ID  VEHICLE TYPE CODE 1  
0                              0             0                    0  


                                                                                

In [39]:
sdf.show(10)

+-------------------------+-----------------------------+------------+--------------------+
|NUMBER OF PERSONS INJURED|CONTRIBUTING FACTOR VEHICLE 1|COLLISION_ID| VEHICLE TYPE CODE 1|
+-------------------------+-----------------------------+------------+--------------------+
|                      2.0|         Aggressive Drivin...|     4455765|               Sedan|
|                      1.0|            Pavement Slippery|     4513547|               Sedan|
|                      0.0|         Following Too Clo...|     4541903|               Sedan|
|                      0.0|                  Unspecified|     4456314|               Sedan|
|                      0.0|                  Unspecified|     4407458|                Dump|
|                      0.0|          Passing Too Closely|     4486555|               Sedan|
|                      2.0|                  Unspecified|     4486660|               Sedan|
|                      0.0|          Driver Inexperience|     4487074|          

In [40]:
sdf.printSchema()

root
 |-- NUMBER OF PERSONS INJURED: float (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- COLLISION_ID: string (nullable = true)
 |-- VEHICLE TYPE CODE 1: string (nullable = true)



In [41]:
sdf = sdf.withColumn('NUMBER OF PERSONS INJURED', col('NUMBER OF PERSONS INJURED').cast('float'))

In [42]:
sdf.printSchema()

root
 |-- NUMBER OF PERSONS INJURED: float (nullable = true)
 |-- CONTRIBUTING FACTOR VEHICLE 1: string (nullable = true)
 |-- COLLISION_ID: string (nullable = true)
 |-- VEHICLE TYPE CODE 1: string (nullable = true)



In [32]:
bucket = 'my-bigdata-project-sh'
filename = 'trusted/Collisions.parquet'
file_path = 'gs://' + bucket + '/' + filename 

sdf.write.mode("overwrite").parquet(file_path)

                                                                                

In [43]:
# Pipeline Example: Importing functions and modules
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml import Pipeline
# Import the logistic regression model
from pyspark.ml.classification import LogisticRegression, LogisticRegressionModel
# Import the evaluation module
from pyspark.ml.evaluation import *
# Import the model tuning module
from pyspark.ml.tuning import *
import numpy as np

In [44]:
# Pipeline Example: Adding Indexers, Encoders  and Assembler
# Create a label.  =1 if soda, =0 otherwise

# Create an indexer for the three string based columns. 
indexer = StringIndexer(inputCols=["CONTRIBUTING FACTOR VEHICLE 1", "VEHICLE TYPE CODE 1"], outputCols=["CONTRIBUTING FACTOR VEHICLE 1Index", "VEHICLE TYPE CODE 1Index"])

# Create an encoder for the three indexes and the age integer column.
encoder = OneHotEncoder(inputCols=["CONTRIBUTING FACTOR VEHICLE 1Index", "VEHICLE TYPE CODE 1Index"],
                        outputCols=["CONTRIBUTINGVector", "VEHICLETYPEVector"],
                        dropLast=False)

# Create an assembler for the individual feature vectors and the float/double columns
assembler = VectorAssembler(inputCols=["CONTRIBUTINGVector", "VEHICLETYPEVector"], 
                            outputCol="features")

In [46]:
# Pipeline Example: Adding Stages to the Pipeline
# Create the pipeline
Vehicle_pipe = Pipeline(stages=[indexer, encoder, assembler])

# Call .fit to transform the data
transformed_sdf = Vehicle_pipe.fit(sdf).transform(sdf)

# transformed_sdf.show(5)
# Review the transformed features
transformed_sdf.select('features').show()

                                                                                

+--------------------+
|            features|
+--------------------+
|(1672,[22,61],[1....|
|(1672,[17,61],[1....|
|(1672,[3,61],[1.0...|
|(1672,[0,61],[1.0...|
|(1672,[0,85],[1.0...|
|(1672,[7,61],[1.0...|
|(1672,[0,61],[1.0...|
|(1672,[12,61],[1....|
|(1672,[7,61],[1.0...|
|(1672,[6,62],[1.0...|
|(1672,[8,61],[1.0...|
|(1672,[10,62],[1....|
|(1672,[13,61],[1....|
|(1672,[16,61],[1....|
|(1672,[7,100],[1....|
|(1672,[31,61],[1....|
|(1672,[11,62],[1....|
|(1672,[0,62],[1.0...|
|(1672,[3,62],[1.0...|
|(1672,[3,62],[1.0...|
+--------------------+
only showing top 20 rows



In [47]:
# Pipeline Example: Model Specification

# Optional: Take a small sample of the data while developing the rest of the code
transformed_sdf = transformed_sdf.sample(False, .25)

# Split the data into 70% training and 30% test sets  
trainingData, testData = transformed_sdf.randomSplit([0.7, 0.3], seed=42)

# You can stop here and run .show() to see what trainingData and testData look like.
# trainingData.show()

# Create a LogisticRegression Estimator
lr = LogisticRegression(featuresCol="features", labelCol="NUMBER OF PERSONS INJURED")

# Fit the model to the training data - This can take a long time depending on the size of the data
model = lr.fit(trainingData)

# Show model coefficients and intercept
print("Coefficients: ", model.coefficients)
print("Intercept: ", model.intercept)
# pd.DataFrame({'coefficients':model.coefficients, 'feature':list(pd.DataFrame(trainingData.schema["features"].metadata["ml_attr"]["attrs"]['numeric']).sort_values('idx')['name'])})

                                                                                

Py4JJavaError: An error occurred while calling o1029.coefficients.
: org.apache.spark.SparkException: Multinomial models contain a matrix of coefficients, use coefficientMatrix instead.
	at org.apache.spark.ml.classification.LogisticRegressionModel.coefficients(LogisticRegression.scala:1080)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:829)


In [48]:
# Test the model on the testData
test_results = model.transform(testData)

confusion_matrix = test_results.groupby('"NUMBER OF PERSONS INJURED"').pivot('prediction').count().fillna(0).collect()

def calculate_recall_precision(confusion_matrix):
    tn = confusion_matrix[0][1]  # True Negative
    fp = confusion_matrix[0][2]  # False Positive
    fn = confusion_matrix[1][1]  # False Negative
    tp = confusion_matrix[1][2]  # True Positive
    precision = tp / ( tp + fp )            
    recall = tp / ( tp + fn )
    accuracy = ( tp + tn ) / ( tp + tn + fp + fn )
    f1_score = 2 * ( ( precision * recall ) / ( precision + recall ) )
    return accuracy, precision, recall, f1_score

print("Accuracy, Precision, Recall, F1 Score")
print( calculate_recall_precision(confusion_matrix) )

                                                                                

AnalysisException: Column '`"NUMBER OF PERSONS INJURED"`' does not exist. Did you mean one of the following? [NUMBER OF PERSONS INJURED, VEHICLE TYPE CODE 1, VEHICLE TYPE CODE 1Index, COLLISION_ID, CONTRIBUTING FACTOR VEHICLE 1, CONTRIBUTINGVector, VEHICLETYPEVector, rawPrediction, CONTRIBUTING FACTOR VEHICLE 1Index, features, prediction, probability];
'Pivot ArrayBuffer('"NUMBER OF PERSONS INJURED"), prediction#2295: double, [0.0, 1.0, 2.0, 7.0], [count(1)]
+- Project [NUMBER OF PERSONS INJURED#1844, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887, CONTRIBUTING FACTOR VEHICLE 1Index#2070, VEHICLE TYPE CODE 1Index#2071, CONTRIBUTINGVector#2092, VEHICLETYPEVector#2093, features#2115, rawPrediction#2263, probability#2277, UDF(rawPrediction#2263) AS prediction#2295]
   +- Project [NUMBER OF PERSONS INJURED#1844, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887, CONTRIBUTING FACTOR VEHICLE 1Index#2070, VEHICLE TYPE CODE 1Index#2071, CONTRIBUTINGVector#2092, VEHICLETYPEVector#2093, features#2115, rawPrediction#2263, UDF(rawPrediction#2263) AS probability#2277]
      +- Project [NUMBER OF PERSONS INJURED#1844, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887, CONTRIBUTING FACTOR VEHICLE 1Index#2070, VEHICLE TYPE CODE 1Index#2071, CONTRIBUTINGVector#2092, VEHICLETYPEVector#2093, features#2115, UDF(features#2115) AS rawPrediction#2263]
         +- Sample 0.7, 1.0, false, 42
            +- Sort [NUMBER OF PERSONS INJURED#1844 ASC NULLS FIRST, CONTRIBUTING FACTOR VEHICLE 1#881 ASC NULLS FIRST, COLLISION_ID#886 ASC NULLS FIRST, VEHICLE TYPE CODE 1#887 ASC NULLS FIRST, CONTRIBUTING FACTOR VEHICLE 1Index#2070 ASC NULLS FIRST, VEHICLE TYPE CODE 1Index#2071 ASC NULLS FIRST, CONTRIBUTINGVector#2092 ASC NULLS FIRST, VEHICLETYPEVector#2093 ASC NULLS FIRST, features#2115 ASC NULLS FIRST], false
               +- Sample 0.0, 0.25, false, 8601964020331560155
                  +- Project [NUMBER OF PERSONS INJURED#1844, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887, CONTRIBUTING FACTOR VEHICLE 1Index#2070, VEHICLE TYPE CODE 1Index#2071, CONTRIBUTINGVector#2092, VEHICLETYPEVector#2093, UDF(struct(CONTRIBUTINGVector, CONTRIBUTINGVector#2092, VEHICLETYPEVector, VEHICLETYPEVector#2093)) AS features#2115]
                     +- Project [NUMBER OF PERSONS INJURED#1844, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887, CONTRIBUTING FACTOR VEHICLE 1Index#2070, VEHICLE TYPE CODE 1Index#2071, UDF(cast(CONTRIBUTING FACTOR VEHICLE 1Index#2070 as double), 0) AS CONTRIBUTINGVector#2092, UDF(cast(VEHICLE TYPE CODE 1Index#2071 as double), 1) AS VEHICLETYPEVector#2093]
                        +- Project [NUMBER OF PERSONS INJURED#1844, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887, UDF(cast(CONTRIBUTING FACTOR VEHICLE 1#881 as string)) AS CONTRIBUTING FACTOR VEHICLE 1Index#2070, UDF(cast(VEHICLE TYPE CODE 1#887 as string)) AS VEHICLE TYPE CODE 1Index#2071]
                           +- Project [cast(NUMBER OF PERSONS INJURED#1322 as float) AS NUMBER OF PERSONS INJURED#1844, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887]
                              +- Project [NUMBER OF PERSONS INJURED#1322, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887]
                                 +- Project [NUMBER OF PERSONS INJURED#1322, NUMBER OF PERSONS KILLED#1334, NUMBER OF PEDESTRIANS INJURED#1346, NUMBER OF PEDESTRIANS KILLED#1358, NUMBER OF CYCLIST INJURED#1370, NUMBER OF CYCLIST KILLED#1382, NUMBER OF MOTORIST INJURED#1394, cast(NUMBER OF MOTORIST KILLED#880 as float) AS NUMBER OF MOTORIST KILLED#1406, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887]
                                    +- Project [NUMBER OF PERSONS INJURED#1322, NUMBER OF PERSONS KILLED#1334, NUMBER OF PEDESTRIANS INJURED#1346, NUMBER OF PEDESTRIANS KILLED#1358, NUMBER OF CYCLIST INJURED#1370, NUMBER OF CYCLIST KILLED#1382, cast(NUMBER OF MOTORIST INJURED#879 as float) AS NUMBER OF MOTORIST INJURED#1394, NUMBER OF MOTORIST KILLED#880, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887]
                                       +- Project [NUMBER OF PERSONS INJURED#1322, NUMBER OF PERSONS KILLED#1334, NUMBER OF PEDESTRIANS INJURED#1346, NUMBER OF PEDESTRIANS KILLED#1358, NUMBER OF CYCLIST INJURED#1370, cast(NUMBER OF CYCLIST KILLED#878 as float) AS NUMBER OF CYCLIST KILLED#1382, NUMBER OF MOTORIST INJURED#879, NUMBER OF MOTORIST KILLED#880, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887]
                                          +- Project [NUMBER OF PERSONS INJURED#1322, NUMBER OF PERSONS KILLED#1334, NUMBER OF PEDESTRIANS INJURED#1346, NUMBER OF PEDESTRIANS KILLED#1358, cast(NUMBER OF CYCLIST INJURED#877 as float) AS NUMBER OF CYCLIST INJURED#1370, NUMBER OF CYCLIST KILLED#878, NUMBER OF MOTORIST INJURED#879, NUMBER OF MOTORIST KILLED#880, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887]
                                             +- Project [NUMBER OF PERSONS INJURED#1322, NUMBER OF PERSONS KILLED#1334, NUMBER OF PEDESTRIANS INJURED#1346, cast(NUMBER OF PEDESTRIANS KILLED#876 as float) AS NUMBER OF PEDESTRIANS KILLED#1358, NUMBER OF CYCLIST INJURED#877, NUMBER OF CYCLIST KILLED#878, NUMBER OF MOTORIST INJURED#879, NUMBER OF MOTORIST KILLED#880, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887]
                                                +- Project [NUMBER OF PERSONS INJURED#1322, NUMBER OF PERSONS KILLED#1334, cast(NUMBER OF PEDESTRIANS INJURED#875 as float) AS NUMBER OF PEDESTRIANS INJURED#1346, NUMBER OF PEDESTRIANS KILLED#876, NUMBER OF CYCLIST INJURED#877, NUMBER OF CYCLIST KILLED#878, NUMBER OF MOTORIST INJURED#879, NUMBER OF MOTORIST KILLED#880, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887]
                                                   +- Project [NUMBER OF PERSONS INJURED#1322, cast(NUMBER OF PERSONS KILLED#874 as float) AS NUMBER OF PERSONS KILLED#1334, NUMBER OF PEDESTRIANS INJURED#875, NUMBER OF PEDESTRIANS KILLED#876, NUMBER OF CYCLIST INJURED#877, NUMBER OF CYCLIST KILLED#878, NUMBER OF MOTORIST INJURED#879, NUMBER OF MOTORIST KILLED#880, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887]
                                                      +- Project [cast(NUMBER OF PERSONS INJURED#873 as float) AS NUMBER OF PERSONS INJURED#1322, NUMBER OF PERSONS KILLED#874, NUMBER OF PEDESTRIANS INJURED#875, NUMBER OF PEDESTRIANS KILLED#876, NUMBER OF CYCLIST INJURED#877, NUMBER OF CYCLIST KILLED#878, NUMBER OF MOTORIST INJURED#879, NUMBER OF MOTORIST KILLED#880, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887]
                                                         +- Filter atleastnnonnulls(11, NUMBER OF PERSONS INJURED#873, NUMBER OF PERSONS KILLED#874, NUMBER OF PEDESTRIANS INJURED#875, NUMBER OF PEDESTRIANS KILLED#876, NUMBER OF CYCLIST INJURED#877, NUMBER OF CYCLIST KILLED#878, NUMBER OF MOTORIST INJURED#879, NUMBER OF MOTORIST KILLED#880, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887)
                                                            +- Project [NUMBER OF PERSONS INJURED#873, NUMBER OF PERSONS KILLED#874, NUMBER OF PEDESTRIANS INJURED#875, NUMBER OF PEDESTRIANS KILLED#876, NUMBER OF CYCLIST INJURED#877, NUMBER OF CYCLIST KILLED#878, NUMBER OF MOTORIST INJURED#879, NUMBER OF MOTORIST KILLED#880, CONTRIBUTING FACTOR VEHICLE 1#881, COLLISION_ID#886, VEHICLE TYPE CODE 1#887]
                                                               +- Relation [CRASH DATE#863,CRASH TIME#864,BOROUGH#865,ZIP CODE#866,LATITUDE#867,LONGITUDE#868,LOCATION#869,ON STREET NAME#870,CROSS STREET NAME#871,OFF STREET NAME#872,NUMBER OF PERSONS INJURED#873,NUMBER OF PERSONS KILLED#874,NUMBER OF PEDESTRIANS INJURED#875,NUMBER OF PEDESTRIANS KILLED#876,NUMBER OF CYCLIST INJURED#877,NUMBER OF CYCLIST KILLED#878,NUMBER OF MOTORIST INJURED#879,NUMBER OF MOTORIST KILLED#880,CONTRIBUTING FACTOR VEHICLE 1#881,CONTRIBUTING FACTOR VEHICLE 2#882,CONTRIBUTING FACTOR VEHICLE 3#883,CONTRIBUTING FACTOR VEHICLE 4#884,CONTRIBUTING FACTOR VEHICLE 5#885,COLLISION_ID#886,... 5 more fields] csv


In [1]:
from pyspark.sql import SparkSession
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

# Initialize Spark session
spark = SparkSession.builder.appName("MotorVehicleCollisions").getOrCreate()

# Load CSV file into DataFrame
file_path = "Motor_Vehicle_Collisions_-_Crashes_20240213 (1).csv"
df = spark.read.csv(file_path, header=True, inferSchema=True)

# Show the schema to understand the structure of the DataFrame
df.printSchema()

# Select relevant columns and filter for rows where people were killed
df_filtered = df.select("CONTRIBUTING_FACTOR_VEHICLE_1", "NUMBER_OF_PERSONS_KILLED")

# Group by contributing factor and sum the number of people killed
df_grouped = df_filtered.groupBy("CONTRIBUTING_FACTOR_VEHICLE_1") \
    .sum("NUMBER_OF_PERSONS_KILLED") \
    .withColumnRenamed("sum(NUMBER_OF_PERSONS_KILLED)", "TOTAL_PERSONS_KILLED")

# Convert to Pandas DataFrame for visualization
pd_df = df_grouped.toPandas()

# Sort the Pandas DataFrame by the number of people killed for better visualization
pd_df = pd_df.sort_values(by="TOTAL_PERSONS_KILLED", ascending=False)

# Plotting using Seaborn
plt.figure(figsize=(12, 8))
sns.barplot(x="TOTAL_PERSONS_KILLED", y="CONTRIBUTING_FACTOR_VEHICLE_1", data=pd_df)
plt.xlabel("Total Persons Killed")
plt.ylabel("Contributing Factor")
plt.title("Number of People Killed Based on Contributing Factor")
plt.show()

# Stop Spark session
spark.stop()


24/05/18 19:09:51 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


AnalysisException: Path does not exist: hdfs://cluster20240201a-m/user/root/Motor_Vehicle_Collisions_-_Crashes_20240213 (1).csv