In [2]:
!pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.2.tar.gz (317.3 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.3/317.3 MB[0m [31m4.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.2-py2.py3-none-any.whl size=317812365 sha256=2ad6e17bc054851be8c8b77281980ae597440bf8443d3ff92a1ba6299e850332
  Stored in directory: /root/.cache/pip/wheels/34/34/bd/03944534c44b677cd5859f248090daa9fb27b3c8f8e5f49574
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.2


# Reading the CSV file from a URL and creating a Spark DataFrame


In [3]:
# Import necessary libraries
from pyspark.sql import SparkSession
import pandas as pd
import numpy as np
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import confusion_matrix, accuracy_score, classification_report
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer
import requests

In [4]:
# Start a Spark session for this notebook, or reuse one that already exists
spark = (
    SparkSession.builder
    .appName("ReadCSVFromURL")
    .getOrCreate()
)

In [5]:
# Location of the collisions CSV file (raw file served from GitHub)
url = (
    "https://raw.githubusercontent.com/"
    "MoonWalkerANM/Traffic_Collisions_Project4/main/collisions_df_la.csv"
)

In [6]:
# Fetch the content from the URL.
# The timeout keeps the cell from hanging indefinitely on a stalled connection,
# and raise_for_status() stops the notebook on an HTTP error instead of letting
# an error page be written to disk and parsed as CSV by the following cells.
response = requests.get(url, timeout=60)
response.raise_for_status()

In [7]:
# Write the downloaded text to a temporary file and remember its path.
# - encoding='utf-8' makes the on-disk encoding explicit instead of depending
#   on the platform default.
# - newline='' writes line endings exactly as they appear in the response text
#   (no platform newline translation).
# - delete=False keeps the file after the `with` block so it can be read by
#   path afterwards; it is removed explicitly by the cleanup cell.
import tempfile
with tempfile.NamedTemporaryFile(
    mode='w', encoding='utf-8', newline='', suffix='.csv', delete=False
) as temp_file:
    temp_file.write(response.text)
    temp_file_path = temp_file.name

In [8]:
# Load the temporary CSV into a Spark DataFrame: the first line provides the
# column names and the column types are inferred from the data.
df_spark = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(temp_file_path)
)

In [9]:
# Show the DataFrame schema and first few rows
# printSchema() lists every column with its inferred type and nullability;
# show() prints a tabular preview of the leading rows.
df_spark.printSchema()
df_spark.show()

root
 |-- Severity: integer (nullable = true)
 |-- End_Time: timestamp (nullable = true)
 |-- End_Lat: double (nullable = true)
 |-- End_Lng: double (nullable = true)
 |-- Distance(mi): double (nullable = true)
 |-- Description: string (nullable = true)
 |-- Street: string (nullable = true)
 |-- City: string (nullable = true)
 |-- County: string (nullable = true)
 |-- State: string (nullable = true)
 |-- Zipcode: string (nullable = true)
 |-- Temperature(F): double (nullable = true)
 |-- Visibility(mi): double (nullable = true)
 |-- Wind_Speed(mph): double (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Railway: boolean (nullable = true)
 |-- Stop: boolean (nullable = true)
 |-- Traffic_Signal: boolean (nullable = true)
 |-- Sunrise_Sunset: string (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Weekday/Weekend: string (nullable = true)
 |-- Day_of_the_Week: string (nullable = t

# Data Preprocessing

In [10]:
# Name of the column to predict and the input columns used as predictors.
# NOTE(review): 'Left_Zipcode' does not appear in the (truncated) schema output
# shown earlier in this notebook — confirm it exists and is numeric.
target_column = 'Severity'
feature_columns = [
    'End_Lat',
    'End_Lng',
    'Left_Zipcode',
]

In [11]:
# Assemble features into a vector
# Only configures the assembler: the columns listed in feature_columns will be
# combined into a single vector column named 'features' when transform() runs.
assembler = VectorAssembler(inputCols=feature_columns, outputCol='features')

In [12]:
# Create the feature vector
# Produces a new DataFrame from df_spark with the assembled 'features' column.
df_features = assembler.transform(df_spark)

In [13]:
# Index the target variable.
# StringIndexer orders labels by descending frequency by default, so label 0.0
# is the most common Severity value, not necessarily the lowest one.
indexer = StringIndexer(inputCol=target_column, outputCol='label')

# Keep the fitted model: its `labels` list is the only record of which original
# Severity value each label index stands for, and is needed to interpret the
# confusion matrix and classification report later on.
indexer_model = indexer.fit(df_features)
print("Label index -> Severity:", dict(enumerate(indexer_model.labels)))

# Transform the data to include the indexed label
df_indexed = indexer_model.transform(df_features)

In [14]:
# Select only the features and label columns
# Every other column of df_indexed is dropped; the downstream cells work only
# with the assembled 'features' vector and the indexed 'label'.
df_final = df_indexed.select('features', 'label')

# Splitting the Data


In [15]:
# Randomly partition the rows into training and test sets with weights 0.8 / 0.2;
# the fixed seed makes the random assignment repeatable.
train_df, test_df = df_final.randomSplit(weights=[0.8, 0.2], seed=78)

# Scaling the Features


In [18]:
# Scale the assembled 'features' vector into 'scaled_features'; the scaler is
# wrapped in a single-stage Pipeline.
scaler = StandardScaler(inputCol='features', outputCol='scaled_features')
pipeline = Pipeline(stages=[scaler])

# Fit on the training split only ...
scaler_model = pipeline.fit(train_df)

# ... then apply the same fitted transformation to both splits
train_df_scaled, test_df_scaled = (
    scaler_model.transform(split_df) for split_df in (train_df, test_df)
)

# Model Training and Evaluation

In [19]:
# Collect both scaled splits into pandas so scikit-learn can consume them
train_pd, test_pd = train_df_scaled.toPandas(), test_df_scaled.toPandas()

# Features become plain Python lists of Spark vectors; labels stay pandas Series
X_train, y_train = train_pd['scaled_features'].tolist(), train_pd['label']
X_test, y_test = test_pd['scaled_features'].tolist(), test_pd['label']

In [20]:
# Convert the lists to NumPy arrays.
# X_train / X_test are overwritten by this cell, so on a second run they already
# hold ndarray rows, which have no .toArray() method. Such rows are passed
# through unchanged, which makes the cell safe to re-run.
X_train = np.array([x.toArray() if hasattr(x, 'toArray') else x for x in X_train])
X_test = np.array([x.toArray() if hasattr(x, 'toArray') else x for x in X_test])
y_train = np.array(y_train)
y_test = np.array(y_test)

In [21]:
# Random forest classifier (scikit-learn): 500 trees, fixed random_state for
# repeatable results. fit() returns the estimator itself, so `rf` is the
# fitted model.
rf = RandomForestClassifier(n_estimators=500, random_state=78).fit(X_train, y_train)

# Predicted labels for the held-out test features
y_pred = rf.predict(X_test)

In [22]:
# Score the predictions against the test labels.
# The class values reported here are the indices from the StringIndexer 'label'
# column, not the raw Severity values.
conf_matrix, accuracy, class_report = (
    metric(y_test, y_pred)
    for metric in (confusion_matrix, accuracy_score, classification_report)
)

# Each heading is followed by its metric on the next line
print("Confusion Matrix:", conf_matrix, sep="\n")
print(f"Accuracy Score: {accuracy}")
print("Classification Report:", class_report, sep="\n")

Confusion Matrix:
[[10589    18    16]
 [   64    10     5]
 [   35     3    13]]
Accuracy Score: 0.9868873802659723
Classification Report:
              precision    recall  f1-score   support

         0.0       0.99      1.00      0.99     10623
         1.0       0.32      0.13      0.18        79
         2.0       0.38      0.25      0.31        51

    accuracy                           0.99     10753
   macro avg       0.57      0.46      0.49     10753
weighted avg       0.98      0.99      0.98     10753



# Remove Temporary File

In [23]:
# Clean up the temporary file.
# The existence check makes the cell safe to re-run: once the file has been
# removed, a second run does nothing instead of raising FileNotFoundError.
import os
if os.path.exists(temp_file_path):
    os.remove(temp_file_path)

# Stop Spark Session


In [24]:
# Stop Spark session
# Last step of the notebook: cells that use `spark` or the Spark DataFrames
# must be run before this one.
spark.stop()