<a href="https://colab.research.google.com/github/Zainabudp/US-Traffic-Accidents/blob/main/traffic_accidents.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Location of the accidents CSV that feeds the whole notebook.
# NOTE(review): this is an absolute path on a local Windows drive, so the cell only
# runs on a machine with that exact layout -- point DATA_PATH elsewhere when needed.
DATA_PATH = r"D:\US_Accidents_March23.csv\US_Accidents_March23.csv"
df = pd.read_csv(DATA_PATH)

In [None]:
# Display the dataframe that was just loaded
df

In [None]:
# Print a concise summary (columns and dtypes) of the loaded dataframe
df.info()

In [None]:
# Number of rows for each distinct 'Severity' value in the loaded data
df['Severity'].value_counts()

In [None]:
# Parse 'Start_Time' into datetimes; anything that cannot be parsed becomes NaT
# NOTE(review): if the column mixes several timestamp layouts, some pandas versions
# coerce every value that differs from the inferred layout to NaT -- check the
# "parsing errors" output below and consider passing an explicit format. TODO confirm.
df['Start_Time'] = pd.to_datetime(df['Start_Time'], errors='coerce')

# Show the rows whose timestamp could not be parsed
rows_with_errors = df.loc[df['Start_Time'].isna()]
print("Rows with parsing errors:")
print(rows_with_errors)

# For every year from 2016 to 2023, keep the first 10,000 rows of that year
# (rows with a NaT 'Start_Time' match no year and are therefore left out)
start_years = df['Start_Time'].dt.year
filtered_dfs = [df[start_years == year].head(10000) for year in range(2016, 2024)]

# Stack the per-year samples into one dataframe with a fresh 0..n-1 index
final_df = pd.concat(filtered_dfs, ignore_index=True)

# Display the first few rows of the final dataframe
final_df.head()


In [None]:
# Number of rows for each distinct 'Severity' value in the per-year sample
final_df['Severity'].value_counts()

In [None]:
def map_severity(severity):
    """Collapse a raw severity value into a two-level label.

    Returns 'Low' when ``severity`` equals 1 or 2, and 'High' for every
    other value.
    """
    return 'Low' if severity in (1, 2) else 'High'

# Derive the two-level target by passing every 'Severity' value through map_severity
final_df['Binary_Severity'] = final_df['Severity'].apply(map_severity)

# Show how many rows ended up in each of the two classes
binary_counts = final_df['Binary_Severity'].value_counts()
print(binary_counts)

In [None]:
missing_percentage = (final_df.isnull().sum() / len(df)) * 100

# Get the columns with missing values exceeding 10%
columns_to_drop = missing_percentage[missing_percentage > 10].index

# Drop the columns from the dataframe
final_df = final_df.drop(columns=columns_to_drop)

# Print the remaining columns
print("Columns after removing those with missing values more than 10%:")
print(final_df.columns)

In [None]:
df=final_df

In [None]:
# Columns to remove
columns_to_remove = ['ID', 'Start_Lat', 'Start_Lng', 'End_Lat', 'End_Lng', 'Description', 'Timezone']

# Remove specified columns
filtered_df = df.drop(columns=columns_to_remove)

In [None]:
filtered_df['County'].value_counts()

In [None]:
# Columns to remove
columns_to_remove = ['Street', 'Zipcode', 'Country','Airport_Code', 'Weather_Timestamp']

# Remove specified columns
filtered_df = filtered_df.drop(columns=columns_to_remove)

In [None]:
# Display the dataframe after the column removals
filtered_df

In [None]:
# Frequency of every distinct 'City', 'County' and 'State' value
city_counts = filtered_df['City'].value_counts()
county_counts = filtered_df['County'].value_counts()
state_counts = filtered_df['State'].value_counts()

# Values that occur fewer than 100 times are considered too rare to keep
city_to_remove = city_counts[city_counts < 100].index
county_to_remove = county_counts[county_counts < 100].index
state_to_remove = state_counts[state_counts < 100].index

# A row is discarded as soon as its city, county or state is one of the rare values.
# All three counts were taken before any row was removed, so a single combined mask
# selects the same rows as filtering on the three columns one after the other.
has_rare_value = (
    filtered_df['City'].isin(city_to_remove)
    | filtered_df['County'].isin(county_to_remove)
    | filtered_df['State'].isin(state_to_remove)
)
filtered_df = filtered_df[~has_rare_value]

In [None]:
# Display the dataframe after the rare city/county/state rows were filtered out
filtered_df

In [None]:
# Convert 'Start_Time' and 'End_Time' to datetime
# NOTE(review): no errors=/format= is given, so an unparseable or differently
# formatted 'End_Time' value raises here -- confirm against the data.
start_time = pd.to_datetime(filtered_df['Start_Time'])
end_time = pd.to_datetime(filtered_df['End_Time'])

# filtered_df is a boolean-mask selection at this point; writing columns into it one
# at a time (and dropping in place) triggers pandas' SettingWithCopyWarning.
# assign()/drop() build a new frame instead: the hour/year features are appended in
# the order Start_Hour, Start_Year, End_Hour, End_Year and the two raw timestamp
# columns are removed.
filtered_df = (
    filtered_df
    .assign(
        Start_Hour=start_time.dt.hour,
        Start_Year=start_time.dt.year,
        End_Hour=end_time.dt.hour,
        End_Year=end_time.dt.year,
    )
    .drop(columns=['Start_Time', 'End_Time'])
)

In [None]:
# Display the dataframe with the hour/year features in place of the raw timestamps
filtered_df

In [None]:
# Independent copy of the current frame; the plotting cell reads from this copy
df_viz=filtered_df.copy()

In [None]:
# Rows per 'Binary_Severity' class
severity_counts = df_viz['Binary_Severity'].value_counts()

# Bar chart of the class balance, drawn through the explicit Figure/Axes interface
fig, ax = plt.subplots(figsize=(8, 6))
severity_counts.plot(kind='bar', color='skyblue', ax=ax)
ax.set_title('Number of Severity Samples')
ax.set_xlabel('Severity')
ax.set_ylabel('Count')
ax.tick_params(axis='x', labelrotation=45)  # tilt the class names for readability
ax.grid(axis='y', linestyle='--', alpha=0.7)  # light horizontal guide lines
fig.tight_layout()  # keep the labels from being clipped
plt.show()


In [None]:
from sklearn.preprocessing import LabelEncoder
# One encoder instance is reused; fit_transform re-fits it on every column
label_encoder = LabelEncoder()

# Replace each object-typed column by integer codes.
# 'Binary_Severity' holds the strings 'Low'/'High', so the target is encoded here
# too. LabelEncoder numbers the classes in sorted order, which makes 'High' -> 0
# and 'Low' -> 1: class 1 in the later model cells therefore means 'Low'.
object_columns = [col for col in filtered_df.columns if filtered_df[col].dtype == 'object']
for col in object_columns:
    filtered_df[col] = label_encoder.fit_transform(filtered_df[col])

# Check the converted DataFrame
filtered_df.head()

In [None]:
# Pairwise correlation matrix of the encoded dataframe
filtered_df.corr()

In [None]:
# 'Binary_Severity' was derived from 'Severity', so the raw column is removed here
filtered_df = filtered_df.drop(columns=['Severity'])

In [None]:
# Calculate the correlation matrix
correlation_matrix = filtered_df.corr()

# Correlation of every feature with the target. The target's own entry is removed
# by label instead of assuming it is the first element after sorting.
target_correlation = correlation_matrix['Binary_Severity'].drop('Binary_Severity')

# The 10 features with the largest absolute correlation to 'Binary_Severity'
top_10_variables = target_correlation.abs().nlargest(10)

# Signed correlation values of those 10 features
correlation_matrix_top_10 = correlation_matrix.loc[top_10_variables.index, 'Binary_Severity']

# Heatmap with one row per feature (y-axis) and a single 'Binary_Severity'
# column (x-axis); the axis labels follow that layout
plt.figure(figsize=(10, 8))
sns.heatmap(correlation_matrix_top_10.to_frame(), annot=True, cmap='coolwarm', fmt=".2f", linewidths=.5)
plt.title('Correlation with Severity - Top 10 Variables')
plt.xlabel('Severity')
plt.ylabel('Variable')
plt.xticks(rotation=45)
plt.yticks(rotation=0)
plt.tight_layout()
plt.show()


In [None]:
# List the columns currently present in the frame
filtered_df.columns

In [None]:
# Number of missing values in each column
filtered_df.isnull().sum()

In [None]:
# Import SimpleImputer
from sklearn.impute import SimpleImputer

# Imputation strategy per column dtype: median for float64, most frequent value otherwise
imputer_strategies = {
    'float64': 'median',
    'bool': 'most_frequent',
    'int32': 'most_frequent',
    'int64': 'most_frequent'
}

# Impute every feature column that still contains missing values.
# NOTE(review): the fill values are computed on the whole frame, i.e. before the
# train/test split performed later in the notebook.
for col in filtered_df.columns:
    if col == 'Binary_Severity' or not filtered_df[col].isnull().any():
        continue  # skip the target and columns without missing values

    # A dtype that is not listed in the table falls back to the most frequent value
    # instead of raising KeyError
    strategy = imputer_strategies.get(str(filtered_df[col].dtype), 'most_frequent')

    # A fresh imputer per column, rather than one instance whose `strategy`
    # attribute is overwritten between fits
    imputer = SimpleImputer(strategy=strategy)

    # fit_transform returns an (n_rows, 1) array; flatten it to a 1-D column
    filtered_df[col] = imputer.fit_transform(filtered_df[[col]]).ravel()

# Verify if missing values are imputed
filtered_df.isnull().sum()

In [None]:
# Print a concise summary of the frame at this stage
filtered_df.info()

In [None]:
from sklearn.preprocessing import LabelEncoder

# Fresh encoder, re-fitted on each boolean column below
label_encoder = LabelEncoder()

# Turn every bool-typed column into integer codes
bool_columns = [col for col in filtered_df.columns if filtered_df[col].dtype == 'bool']
for col in bool_columns:
    filtered_df[col] = label_encoder.fit_transform(filtered_df[col])

# Check the converted DataFrame
filtered_df.head()

## Default model building

In [None]:
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from xgboost import XGBClassifier
from sklearn.neural_network import MLPClassifier
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score, roc_curve, roc_auc_score, classification_report
import matplotlib.pyplot as plt
import numpy as np
import time
from sklearn.preprocessing import StandardScaler

In [None]:
# Separate the target column from the feature columns
y = filtered_df['Binary_Severity']
X = filtered_df.drop(columns=['Binary_Severity'])

# Hold out 20% of the rows as a test set; the fixed seed makes the split repeatable
X_train, X_test, y_train, y_test = train_test_split(X, y, random_state=42, test_size=0.2)

# Standardise the features: the scaler is fitted on the training split only and
# then applied unchanged to both splits
scaler = StandardScaler().fit(X_train)
X_train_normalized = scaler.transform(X_train)
X_test_normalized = scaler.transform(X_test)

In [None]:
# Initialize classifiers
classifiers = {
    'XGBoost': XGBClassifier(random_state=42),
    'Random Forest': RandomForestClassifier(random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(random_state=42),
    'MLP': MLPClassifier(random_state=42, max_iter=1000)
}

# The cross-validation input is the same for every classifier, so scale it once.
# NOTE(review): this applies the scaler fitted on the training split to the whole
# dataset, so the CV folds also contain the held-out test rows.
X_all_normalized = scaler.transform(X)

# Labels shown on the confusion-matrix axes
class_labels = np.unique(y_test)
tick_marks = np.arange(len(class_labels))

# Train and evaluate each classifier
for name, clf in classifiers.items():
    # Time only the fit on the training split
    start_time = time.time()
    clf.fit(X_train_normalized, y_train)
    training_time = time.time() - start_time

    # 5-fold cross-validation (cross_val_score fits clones; `clf` itself is untouched)
    cv_scores = cross_val_score(clf, X_all_normalized, y, cv=5)
    mean_cv_score = np.mean(cv_scores)

    # Predict labels for the held-out test split
    y_pred = clf.predict(X_test_normalized)

    # Calculate evaluation metrics
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted')
    recall = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')

    # Probability assigned to class 1, located through classes_ for every model
    # (previously done for the MLP only, with an extra unused predict() call)
    pos_index = np.flatnonzero(clf.classes_ == 1)[0]
    y_pred_prob = clf.predict_proba(X_test_normalized)[:, pos_index]

    # Calculate ROC curve
    fpr, tpr, thresholds = roc_curve(y_test, y_pred_prob)

    # Calculate AUC score
    auc_score = roc_auc_score(y_test, y_pred_prob)

    # Plot ROC curve against the chance diagonal
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, label=f'AUC = {auc_score:.2f}')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'{name} ROC Curve')
    plt.legend(loc='lower right')
    plt.show()

    # Print classification report
    print(f"{name} Classification Report:")
    print(classification_report(y_test, y_pred))

    # Calculate confusion matrix
    cm = confusion_matrix(y_test, y_pred)

    # Plot confusion matrix with imshow
    plt.figure(figsize=(10, 7))
    plt.imshow(cm, cmap='Blues', interpolation='nearest')
    plt.colorbar()
    plt.xticks(tick_marks, class_labels, rotation=45)
    plt.yticks(tick_marks, class_labels)

    # Write the count into each cell; the text colour switches above half the maximum
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, format(cm[i, j], 'd'),
                     horizontalalignment="center",
                     color="green" if cm[i, j] > thresh else "black")

    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(f'{name} Confusion Matrix')
    plt.tight_layout()
    plt.show()

    # Print evaluation metrics
    print(f"{name} Evaluation Metrics:")
    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)
    print("F1 Score:", f1)
    print("Mean Cross-validation Score:", mean_cv_score)
    print("Training Time:", training_time, "seconds")
    print()


## Feature Selection

In [None]:
import warnings
# Ignore warnings
warnings.filterwarnings("ignore")
from sklearn.model_selection import train_test_split

In [None]:
from sklearn.feature_selection import SelectKBest
from sklearn.feature_selection import mutual_info_classif

from functools import partial

# mutual_info_classif draws random numbers internally, so without a seed the scores
# (and possibly the selected features) change from run to run. Pin the seed.
seeded_mutual_info = partial(mutual_info_classif, random_state=42)

# Select the top 10 features using mutual information
selector = SelectKBest(score_func=seeded_mutual_info, k=10)
X_selected = selector.fit_transform(X_train, y_train)

# Column positions and names of the selected features
# (taken from X_train, the frame the selector was fitted on)
selected_features_indices = selector.get_support(indices=True)
selected_features = X_train.columns[selected_features_indices]

# Print the names of the selected features
print("Selected Features:")
for feature in selected_features:
    print(feature)


In [None]:
# Selector scores of the features that were kept
feature_scores = selector.scores_[selected_features_indices]

# Order names and scores from the highest score to the lowest
sorted_indices = np.argsort(feature_scores)[::-1]
sorted_features = selected_features[sorted_indices]
sorted_scores = feature_scores[sorted_indices]

# Horizontal bar chart with one bar per selected feature
bar_positions = range(len(sorted_features))
fig, ax = plt.subplots(figsize=(10, 6))
ax.barh(bar_positions, sorted_scores, align='center')
ax.set_yticks(bar_positions)
ax.set_yticklabels(sorted_features)
ax.set_xlabel('Feature Importance Score')
ax.set_ylabel('Feature')
ax.set_title('Top 10 Selected Features')
plt.show()


## Model Tuning

In [None]:
from sklearn.model_selection import GridSearchCV
from sklearn.metrics import classification_report, confusion_matrix, roc_curve, auc
import matplotlib.pyplot as plt
import numpy as np
import time

# Initialize classifiers with default hyperparameters
classifiers = {
    'XGBoost': XGBClassifier(random_state=42),
    'Random Forest': RandomForestClassifier(random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(random_state=42),
    'MLP': MLPClassifier(random_state=42, max_iter=1000)
}

# Hyperparameters grid for grid search
param_grid = {
    'XGBoost': {'n_estimators': [100, 200, 300],
                'max_depth': [3, 5, 7],
                'learning_rate': [0.01, 0.1, 0.2]},
    'Random Forest': {'n_estimators': [100, 200, 300],
                      'max_depth': [10, 20],
                      'min_samples_split': [2, 5]},
    'Gradient Boosting': {'n_estimators': [100, 200, 300],
                          'learning_rate': [0.01, 0.1],
                          'max_depth': [3, 5]},
    'MLP': {'hidden_layer_sizes': [(64, 32, 16, 8), (100,)],
            'activation': ['relu', 'tanh'],
            'alpha': [0.0001, 0.001],
            'learning_rate': ['constant', 'adaptive']}
}

# Perform grid search and hyperparameter tuning for each classifier
tuned_classifiers = {}
for name, clf in classifiers.items():
    # Time the whole 5-fold grid search
    start_time = time.time()
    grid_search = GridSearchCV(clf, param_grid[name], cv=5)
    grid_search.fit(X_train_normalized, y_train)
    training_time = time.time() - start_time

    # GridSearchCV (refit=True by default) has already re-fitted the best
    # configuration on the full training split, so best_estimator_ is ready to use;
    # fitting it a second time only repeated that work.
    best_clf = grid_search.best_estimator_

    # Store the tuned model, its parameters and the search time
    tuned_classifiers[name] = (best_clf, grid_search.best_params_, training_time)

# Labels shown on the confusion-matrix axes
class_labels = np.unique(y_test)
tick_marks = np.arange(len(class_labels))

# Evaluate the tuned models on the normalized testing data
for name, (clf, best_params, training_time) in tuned_classifiers.items():
    # Predict labels
    y_pred = clf.predict(X_test_normalized)

    # Print best parameters
    print(f"{name} Best Parameters:", best_params)

    # Print classification report
    print(f"{name} Classification Report:")
    print(classification_report(y_test, y_pred))

    # Calculate confusion matrix
    cm = confusion_matrix(y_test, y_pred)

    # Plot confusion matrix
    plt.figure(figsize=(10, 7))
    plt.imshow(cm, cmap='Blues', interpolation='nearest')
    plt.colorbar()
    plt.xticks(tick_marks, class_labels, rotation=45)
    plt.yticks(tick_marks, class_labels)
    # Write the count into each cell; the text colour switches above half the maximum
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, format(cm[i, j], 'd'),
                     horizontalalignment="center",
                     color="green" if cm[i, j] > thresh else "black")
    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(f'{name} Confusion Matrix')
    plt.tight_layout()
    plt.show()

    # Plot ROC curve from the second predict_proba column
    y_pred_prob = clf.predict_proba(X_test_normalized)[:, 1]
    fpr, tpr, _ = roc_curve(y_test, y_pred_prob)
    roc_auc = auc(fpr, tpr)
    plt.figure()
    plt.plot(fpr, tpr, color='darkorange', lw=2, label=f'ROC curve (area = {roc_auc:.2f})')
    plt.plot([0, 1], [0, 1], color='navy', lw=2, linestyle='--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'{name} ROC Curve')
    plt.legend(loc="lower right")
    plt.show()

    # Print evaluation metrics
    print(f"{name} Evaluation Metrics:")
    print("Training Time:", training_time, "seconds")
    print()


In [None]:
from sklearn.decomposition import PCA
# Apply PCA to reduce dimensionality
pca = PCA(n_components=10)  # Specify the number of principal components
X_train_pca = pca.fit_transform(X_train_normalized)
X_test_pca = pca.transform(X_test_normalized)

# The PCA was fitted on *scaled* features, so the full dataset has to go through
# the scaler before it is projected. Passing the raw X to pca.transform() would
# cross-validate on a differently scaled input than the models are trained on.
# The projection is the same for every classifier, so it is computed once.
X_all_pca = pca.transform(scaler.transform(X))

# Initialize classifiers
classifiers = {
    'XGBoost': XGBClassifier(random_state=42),
    'Random Forest': RandomForestClassifier(random_state=42),
    'Gradient Boosting': GradientBoostingClassifier(random_state=42),
    'MLP': MLPClassifier(random_state=42, max_iter=1000)
}

# Labels shown on the confusion-matrix axes
class_labels = np.unique(y_test)
tick_marks = np.arange(len(class_labels))

# Train and evaluate each classifier
for name, clf in classifiers.items():
    # Time only the fit on the PCA-projected training split
    start_time = time.time()
    clf.fit(X_train_pca, y_train)
    training_time = time.time() - start_time

    # 5-fold cross-validation on the scaled-then-projected full dataset
    cv_scores = cross_val_score(clf, X_all_pca, y, cv=5)
    mean_cv_score = np.mean(cv_scores)

    # Predict labels for the held-out test split
    y_pred = clf.predict(X_test_pca)

    # Calculate evaluation metrics
    accuracy = accuracy_score(y_test, y_pred)
    precision = precision_score(y_test, y_pred, average='weighted')
    recall = recall_score(y_test, y_pred, average='weighted')
    f1 = f1_score(y_test, y_pred, average='weighted')

    # Probability assigned to class 1, located through classes_ for every model
    # (previously done for the MLP only, with an extra unused predict() call)
    pos_index = np.flatnonzero(clf.classes_ == 1)[0]
    y_pred_prob = clf.predict_proba(X_test_pca)[:, pos_index]

    # Calculate ROC curve
    fpr, tpr, thresholds = roc_curve(y_test, y_pred_prob)

    # Calculate AUC score
    auc_score = roc_auc_score(y_test, y_pred_prob)

    # Plot ROC curve against the chance diagonal
    plt.figure(figsize=(8, 6))
    plt.plot(fpr, tpr, label=f'AUC = {auc_score:.2f}')
    plt.plot([0, 1], [0, 1], 'k--')
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title(f'{name} ROC Curve')
    plt.legend(loc='lower right')
    plt.show()

    # Print classification report
    print(f"{name} Classification Report:")
    print(classification_report(y_test, y_pred))

    # Calculate confusion matrix
    cm = confusion_matrix(y_test, y_pred)

    # Plot confusion matrix with imshow
    plt.figure(figsize=(10, 7))
    plt.imshow(cm, cmap='Blues', interpolation='nearest')
    plt.colorbar()
    plt.xticks(tick_marks, class_labels, rotation=45)
    plt.yticks(tick_marks, class_labels)

    # Write the count into each cell; the text colour switches above half the maximum
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            plt.text(j, i, format(cm[i, j], 'd'),
                     horizontalalignment="center",
                     color="green" if cm[i, j] > thresh else "black")

    plt.xlabel('Predicted')
    plt.ylabel('True')
    plt.title(f'{name} Confusion Matrix')
    plt.tight_layout()
    plt.show()

    # Print evaluation metrics
    print(f"{name} Evaluation Metrics:")
    print("Accuracy:", accuracy)
    print("Precision:", precision)
    print("Recall:", recall)
    print("F1 Score:", f1)
    print("Mean Cross-validation Scores:", mean_cv_score)
    print("Training Time:", training_time, "seconds")
    print()