## Imports Statments
Importing the Necessary libraries.

In [None]:
import os
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import sqlite3
import warnings
import folium
import ydata_profiling

from time import time
from sklearn import metrics
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report, confusion_matrix, accuracy_score
from folium import CircleMarker
from folium.plugins import HeatMap
from IPython.display import display, HTML
from sklearn.cluster import KMeans
from sklearn_extra.cluster import KMedoids
from sklearn.cluster import DBSCAN
from mlxtend.preprocessing import TransactionEncoder
from sklearn.feature_selection import (SelectKBest, chi2, f_regression, f_classif)
from mlxtend.frequent_patterns import apriori, association_rules
from ydata_profiling import ProfileReport
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import cross_val_score, cross_validate
from sklearn.linear_model import LogisticRegression
from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.svm import SVC
from sklearn.neighbors import KNeighborsClassifier, LocalOutlierFactor
from sklearn.naive_bayes import GaussianNB
from imblearn.over_sampling import SMOTE
from imblearn.under_sampling import RandomUnderSampler

warnings.filterwarnings("ignore", category=DeprecationWarning)
warnings.filterwarnings("ignore", category=FutureWarning)
warnings.simplefilter("ignore", UserWarning)
%matplotlib inline
warnings.filterwarnings("ignore")


## Data Loading and Preprocessing
Loading the Data from the Database and defining helper Classes and function to aid data retrieval.

In [None]:
class DatabaseUtil(object):
    """
    Class for extracting details from the Database & tabulating Queries
    """
    def __init__(self, db_name):
        """
        Constructor that initializes the class with The Database Name
        Connection and Cursor
        """ 
        self.db_name = db_name
        self.connection = None
        self.cursor = None
        
        
    def connect_database(self):
        """
        Connects to the database, Fetch all Table names
        """
        try:
            self.connection = sqlite3.connect(self.db_name)
            self.cursor = self.connection.cursor()
            print("Successfully Connected to the database")
        except sqlite3.Error as e:
            print("Error connecting to the database:", e)
            
        self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
        self.tables = self.cursor.fetchall()
    
    def disconnect_database(self):
        """
        Disconnects from the database
        """
        if self.connection:
            self.cursor.close()
            self.connection.close()
            print("Disconnected from the database")

    def get_table_names_and_attributes(self):
        """
        Get Table Names and Attributes
        """
        if not self.connection or not self.cursor:
            print("Please connect a database first")
            return
        
        accident_data_attribute = {} 
        for table in self.tables:   
            table_name = table[0]

            table_attributes = self.cursor.execute(f"PRAGMA table_info({table_name})").fetchall()
            accident_data_attribute[table_name] = [column[1] for column in table_attributes]
        return accident_data_attribute
    
    def get_row_counts(self):
        """
        Get Table Row Counts
        """
        if not self.connection or not self.cursor:
            print("Please connect a database first")
            return
        
        for table in self.tables:
            table_name = table[0]
            self.cursor.execute(f"SELECT COUNT(*) FROM {table_name};")
            row_count = self.cursor.fetchone()[0]
            print(f"Table Name: {table_name}, Row Counts: {row_count}")
            
    def get_primary_keys(self):
        """
        Get the list of primary keys for each table in the database.
        """
        if not self.connection or not self.cursor:
            print("Please connect a database first")
            return

        primary_keys = {}

        for table in self.tables:
            table_name = table[0]
            table_info = self.cursor.execute(f"PRAGMA table_info({table_name})").fetchall()
        
            # Find all columns where pk flag == 1
            pk_columns = [col[1] for col in table_info if col[-1] == 1]

            if pk_columns:
                primary_keys[table_name] = pk_columns if len(pk_columns) > 1 else pk_columns[0]
            else:
                primary_keys[table_name] = None  # No PK found

        return primary_keys


    def tabulate_query(self, query):
        """
        Using Query to make a dataframe
        """
        if not self.connection or not self.cursor:
            print("Please connect a database first")
            return
        
        cmd = query
        data_frame = pd.read_sql_query(query, self.connection)
        return data_frame

In [None]:
accident_data_v1 = DatabaseUtil('accident_data_v1.0.0_2023.db') # Initializing Database
accident_data_v1.connect_database() # Connecting the Database

In [None]:
table_names_and_attributes = accident_data_v1.get_table_names_and_attributes() # Get table names and attributes
table_names_and_attributes

In [None]:
table_primary_keys = accident_data_v1.get_primary_keys() # Get Primarys Keys
table_primary_keys

In [None]:
row_counts = accident_data_v1.get_row_counts() # Get Row Count
row_counts

### Extract Data FROM 2019 Into Dataframes

In [None]:
class AccidentAnalysis:
    """
    Class to manage accident data analysis for 2019.
    """

    def __init__(self, db_util):
        """
        Initializes with a DatabaseUtil object (already connected).
        """
        self.db_util = db_util
        self.accidents = None
        self.vehicles = None
        self.casualties = None

    def load_data(self, year=2019):
        """
        Loads accidents, vehicles, and casualties data for the given year.
        """
        print(f"Loading data for year {year}...")

        query_template = "SELECT * FROM {table} WHERE accident_year = {year}"

        self.accidents = self.db_util.tabulate_query(query_template.format(table="accident", year=year))
        self.vehicles = self.db_util.tabulate_query(query_template.format(table="vehicle", year=year))
        self.casualties = self.db_util.tabulate_query(query_template.format(table="casualty", year=year))

        print(f"Data loaded: {len(self.accidents)} accidents, {len(self.vehicles)} vehicles, {len(self.casualties)} casualties")


In [None]:
# Connect to DB using existing class
db_util = DatabaseUtil("accident_data_v1.0.0_2023.db")
db_util.connect_database()

# Create AccidentAnalysis object
analysis = AccidentAnalysis(db_util)

#  Load 2019 data
analysis.load_data(2019)

# View results
analysis.accidents.head()
analysis.vehicles.head()
analysis.casualties.head()


**Disconnecting the Database after extracting all needed data into dataframes**

In [None]:
accident_data_v1.disconnect_database() # Disconecting the Database

## Data Cleaning, EDA and Imputation
Cleaning data, filling missing values and Visualise the dataframe

In [None]:
class CleanUpAnalyzeData(object):
    """
    Class cleaning data and filling missing values.
    """
    def __init__(self, data):
        """
        Constructor that initializes the class with the Dataframe
        """ 
        self.data = data
        self.columns_with_unknown_values = {}
        return data.info()
    
    def get_columns_with_initial_nan_values(self):
        """
        Get names of columns containing missing values
        """
        columns_with_missing_values = self.data.columns[self.data.isnull().any()]
        if len(columns_with_missing_values) == 0:
            return 'No Initial NAN values'
        else:
            for column in columns_with_missing_values:
                num_missing_values = self.data[column].isnull().sum()
                print(f"column '{column}' has {num_missing_values} missing values.")
                
            
            percentage_nan = (self.data.isna().sum().sum() /  len(self.data)) * 100
            if percentage_nan < 1:
                self.data.dropna(inplace=True)
                return(f"""The percentage of inital NaN values in the DataFrame is: {percentage_nan:.5f}% less than 1% and automatically dropped""")
            else:
                return(f"The percentage of inital NaN values in the DataFrame is: {percentage_nan:.5f}%")
            
    def display_columns_with_unknown_values(self):
        """
        Display Columns with -1 or empty spaces.
        """
        missing = self.data.isin([-1, '-1', '']).sum().to_dict()
        columns_with_unknown_values = {key: value for key, value in missing.items() if value > 0}
        
        if len(columns_with_unknown_values) > 0:
            self.columns_with_unknown_values = columns_with_unknown_values
            return columns_with_unknown_values
        else:
            return "No columns containing - 1 or '' "
    
    def missing_data_imputation(self):
        """
        Filling Missing the Values
        """
        if len(self.columns_with_unknown_values) >= 1:
            for column, missing_val_count in self.columns_with_unknown_values.items():
                # if a column missing value is greater than 40% drop column else fill with the mode value.
                if ((missing_val_count /  len(self.data)) * 100 ) > 40:
                    self.data = self.data.drop(column, axis=1)
                    print(f"Dropped {column} Column for having over 40% Missing Values")
                    
                else:
                    self.data[column].replace(['-1', -1, ''], np.nan, inplace=True) 
                    # filling age values with the Median
                    if 'age_' in column:
                        median_value = int(round(self.data[column].median()))
                        self.data[column].fillna(median_value, inplace=True)
                        self.data[column] = self.data[column].astype(int)
                        print(f"Filled the {column} column")
                    else:
                        # filling other categorical and string values with the mode.
                        mode_value = self.data[column].mode()[0]
                        if self.data[column].dtype == float or  self.data[column].dtype == int:
                            self.data[column].fillna(int(mode_value), inplace=True)
                            self.data[column] = self.data[column].astype(int)
                            print(f"Filled the {column} column")
                        else:
                            self.data[column].fillna(mode_value , inplace=True)
                            print(f"Filled the {column} column")
        else:
            print('No Missing Column Values to Fill')

    def overview_visualization(self, title=str):
        """
        Using Pandas Profilling to visualize the dataframe.
        """
        report = ProfileReport(self.data, title=title, progress_bar=True)
        return report
        
    def finalizie_cleaning(self):
        print('Finished Data Cleaning')
        return self.data
    

### Accident Data Cleaning
**cad means 'Clean Up Analyze Data'**

In [None]:
cad_accident_data = CleanUpAnalyzeData(analysis.accidents) # Initializing CLeaning and Overview Analysis to the accident data

In [None]:
cad_accident_data.get_columns_with_initial_nan_values() 

In [None]:
cad_accident_data.display_columns_with_unknown_values()

In [None]:
cad_accident_data.missing_data_imputation()

In [None]:
cad_accident_data.overview_visualization("Accident Data")

In [None]:
accident_data = cad_accident_data.finalizie_cleaning()

### Vehicle Data Cleaning

In [None]:
cad_vehicle_data = CleanUpAnalyzeData(analysis.vehicles) #Initializing CLeaning and Overview Analysis to the vehicle data

In [None]:
cad_vehicle_data.get_columns_with_initial_nan_values()

In [None]:
cad_vehicle_data.display_columns_with_unknown_values()

In [None]:
cad_vehicle_data.missing_data_imputation()

In [None]:
cad_vehicle_data.overview_visualization("Vehicle Table")

In [None]:
vehicle_data = cad_vehicle_data.finalizie_cleaning()

### Casualty Data Cleaning

In [None]:
cad_casualty_data = CleanUpAnalyzeData(analysis.casualties) #Initializing CLeaning and Overview Analysis to the casualty data

In [None]:
cad_casualty_data.get_columns_with_initial_nan_values()

In [None]:
cad_casualty_data.display_columns_with_unknown_values()

In [None]:
cad_casualty_data.missing_data_imputation()

In [None]:
cad_casualty_data.overview_visualization("Casualty Table")

In [None]:
casualty_data = cad_casualty_data.finalizie_cleaning()

## Analysis 

In [None]:
!pip install openpyxl

In [None]:
guide_path = 'dft-road-casualty-statistics-road-safety-open-dataset-data-guide-2024.xlsx'

try:
    data_guide = pd.read_excel(guide_path)
    data_guide.columns = data_guide.columns.str.strip().str.lower()  # Standardize columns
    print(" 2024 data guide loaded.")
except Exception as e:
    print(f" Error loading data guide: {e}")


In [None]:
def get_label(field_name: str) -> dict:
    """
    Return a dictionary mapping code → label for a given field name.
    Works with the 2024 road safety guide and normalizes field names.
    """
    if 'field name' not in data_guide.columns or 'code/format' not in data_guide.columns or 'label' not in data_guide.columns:
        raise ValueError("Expected columns not found in data_guide.")

    match = data_guide[data_guide['field name'].str.strip().str.lower() == field_name.strip().lower()]
    
    if match.empty:
        print(f"No matching field found for '{field_name}'.")
        return {}
    
    # Build dictionary with safe keys (convert to int if possible)
    result = {}
    for _, row in match.iterrows():
        key = row['code/format']
        try:
            key = int(float(key))
        except:
            key = str(key).strip()
        result[key] = str(row['label']).strip()
    
    return result


In [None]:
created_plots = {}

def display_plot(ax, title, x, y,
                 show_x_bar=False, show_y_bar=False,
                 small=False, long=False, wide=False,
                 rotate_x=False, save_as=None):

    title_fontsize = 12
    label_fontsize = 10
    params_fontsize = 12
    legend_fontsize = 15
    sns.set_style('whitegrid')
    
    # plot sizing
    if long:
        size = (8, 10)
    elif wide:
        size = (18, 15)
        label_fontsize = 9
    elif small:
        size = (6, 6)
        title_fontsize = 8
        label_fontsize = 8
    else:
        size = (12, 12)

    fig = ax.get_figure()
    fig.set_size_inches(*size)

    # rotate x-axis
    if rotate_x:
        for label in ax.get_xticklabels():
            label.set_rotation(90)

    # annotate bars
    for p in ax.patches:
        if show_x_bar and not np.isnan(p.get_height()):
            ax.annotate(f'{round(p.get_height())}',
                        (p.get_x() + p.get_width() / 2, p.get_height()),
                        ha='center', va='bottom', fontsize=params_fontsize)
        if show_y_bar and not np.isnan(p.get_width()):
            ax.annotate(f'{round(p.get_width())}',
                        (p.get_width() + 5, p.get_y() + p.get_height() / 2),
                        va='center', fontsize=params_fontsize)

    ax.set_title(title, fontsize=title_fontsize)
    ax.set_xlabel(x, fontsize=label_fontsize)
    ax.set_ylabel(y, fontsize=label_fontsize)
    ax.tick_params(labelsize=params_fontsize)

    plt.tight_layout()
    created_plots[title] = fig

    if save_as:
        fig.savefig(save_as, dpi=300, bbox_inches='tight')

    return plt.show()


## Accident Analysis

Are there any particular hours of the day, and days of the week, on which these accidents are likely to occur?

In [None]:
#  Get the label mapping for 'day_of_week'
day_names = get_label('day_of_week')
day_order = list(day_names.values())

# Map the readable day names to the dataset
accident_data['day_name'] = accident_data['day_of_week'].map(day_names)

#  Plot the countplot
day_of_the_week_dist = sns.countplot(
    data=accident_data,
    x='day_name',
    order=day_order,
    palette='rocket_r',
    edgecolor='black',
    alpha=0.8
)

#  Annotate and display the plot
display_plot(day_of_the_week_dist, 
             'FIGURE 2: ACCIDENT DAY OF THE WEEK DISTRIBUTION', 
             'DAY', 'COUNT', 
             show_x_bar=True)



In [None]:
# describe day of the week
accident_data['day_of_week'].describe()

In [None]:
# plot accident time distribution
accident_data['decimal_time'] =  [int(date.split(':')[0]) + (int(date.split(':')[1]) / 60)  
                                                  for date in accident_data['time'] ]
day_of_the_time_dist = sns.histplot(accident_data, x='decimal_time', bins=20,
                                  edgecolor='black', alpha=0.4)

# initalize the display plot function
display_plot(day_of_the_time_dist, 'FIGURE 3: ACCIDENT TIME OF DAY DISTRIBUTION', 'TIME', 'COUNT', wide=True)

In [None]:
# get month from date
accident_data['month'] = pd.to_datetime(accident_data['date'], format='%d/%m/%Y').dt.month 

# plot accident severity by month
accident_data['accident_severity_label'] = accident_data['accident_severity'].map(
    get_label('accident_severity'))
accident_serverity_by_month = sns.countplot(x='month', hue='accident_severity_label', 
                                            data=accident_data, palette='Paired', 
                                            orient='v', dodge=False, edgecolor='black', alpha=0.8)
plt.legend(title='severity')

# initalize the display plot function
display_plot(accident_serverity_by_month, 'FIGURE 1: ACCIDENT SEVERITY BY MONTH', 'MONTH', 
             'COUNT', show_x_bar=True, wide = True)

In [None]:
# plot accident severity by day of the week
severity_day_of_the_week_dist = sns.countplot(data=accident_data, x='day_name',  
                                            hue='accident_severity_label',
                                            order=day_order, palette='Paired',
                                            edgecolor='black', dodge=False, alpha=0.8)
plt.legend(title='severity')

# initalize the display plot function
display_plot(severity_day_of_the_week_dist, 'ACCIDENT SEVERITY DAY OF THE WEEK', 'DAY', 
             'COUNT', show_x_bar=True, wide=True)

In [None]:
# plot accident severity based on time on day
severity_hour_dist = sns.histplot(data=accident_data, x='decimal_time', 
                                   hue='accident_severity_label', bins=20,
                                     palette='Paired', 
                                     legend= True, 
                                     edgecolor='black', multiple="stack")

# initalize the display plot function
display_plot(severity_hour_dist, 'ACCIDENT SEVERITY BY TIME OF DAY', 'TIME', 'COUNT', wide = True)

In [None]:
# plot accident severity based on road type
accident_data['road_type_label'] = accident_data['road_type'].map(
    get_label('road_type'))

accident_serverity_road_type = sns.countplot(x='road_type_label', hue='accident_severity_label', 
                                            data=accident_data, palette='Paired',
                                            orient='h', dodge=False, edgecolor='black', alpha=0.8)
plt.legend(title='severity')

# initalize the display plot function
display_plot(accident_serverity_road_type, 'FIGURE 5: ACCIDENT SEVERITY BY ROAD TYPE','COUNT', 
             'ROAD TYPE', wide=True)

In [None]:
# accident severity by roady type by weather
accident_data['light_conditions_label'] = accident_data['light_conditions'].map(
    get_label('light_conditions'))

accident_serverity_road_type = sns.countplot(y='light_conditions_label', hue='accident_severity_label', 
                                            data=accident_data, palette='Paired',
                                            orient='h', dodge=False, edgecolor='black', alpha=0.8)
plt.legend(title='severity')

# initalize the display plot function
display_plot(accident_serverity_road_type,'FIGURE 4: ACCIDENT SEVERITY BY LIGHT CONDITIONS', 
             'COUNT', 'LIGHT CONDITIONS', wide=True)

In [None]:
# accident severity by weather
accident_data['weather_conditions_label'] = accident_data['weather_conditions'].map(
    get_label('weather_conditions'))

accident_serverity_road_type = sns.countplot(y='weather_conditions_label', hue='accident_severity_label', 
                                            data=accident_data, palette='Paired',
                                            orient='h', dodge=False, edgecolor='black', alpha=0.8)
plt.legend(title='severity')

# initalize the display plot function
display_plot(accident_serverity_road_type, 'ACCIDENT SEVERITY BY WEATHER CONDITIONS','COUNT', 
             'WEATHER', wide =True)

### Accident & Casualty Analysis

In [None]:
# merge accident casualty
accident_casualty =  pd.merge(accident_data, casualty_data, on='accident_index')
print(f"Number of duplicates: {accident_casualty.duplicated().sum()}")

*For pedestrians involved in accidents, are there significant hours of the day, and days of the week, on which they are more likely to be involved?*

In [None]:
pedestrian_accident_hours_day = accident_casualty[accident_casualty["casualty_class"] == 3].copy()

In [None]:
# plot pedestrian accidents by day of week
accident_casualty['day_name'] = accident_casualty['day_of_week'].map(day_names)

pedestrian_accident_day_dist = sns.countplot(data=pedestrian_accident_hours_day,
                     x='day_name',
                     order=day_order, palette='rocket_r',
                     edgecolor='black', alpha=0.8)


display_plot(pedestrian_accident_day_dist, 'FIGURE 11: PEDESTRIAN ACCIDENT DAY OF THE WEEK DISTRIBUTION', 'DAY', 
                                            'COUNT', True)

In [None]:
# plot pedestrian accident hours
pedestrian_accident_hours_dist = sns.histplot(pedestrian_accident_hours_day, 
                                             x='decimal_time', bins=20, 
                                              kde=True,
                                             edgecolor='black', alpha=0.4)

# initalize the display plot function
display_plot(pedestrian_accident_hours_dist, 'FIGURE 12: PEDESTRIAN ACCIDENT TIME OF DAY DISTRIBUTION', 
             'TIME', 'COUNT', wide=True)

In [None]:
# describe pedestrain time
pedestrian_accident_hours_day['decimal_time'].describe()

In [None]:
# map sex of casualty label
accident_casualty['sex_label'] = accident_casualty['sex_of_casualty'].map(get_label('sex_of_casualty'))

# plot severity by sex
severity_by_sex =  sns.countplot(x='accident_severity_label', hue='sex_label', 
                                 data=accident_casualty, palette='Paired',
                                edgecolor='black', alpha=0.8)
plt.legend(title='Gender')

# initalize the display plot function
display_plot(severity_by_sex, 'ACCIDENT SEVERITY DISTRIBUTION BY GENDER', 'SEVERITY', 'COUNT', True, wide=True)

In [None]:
# map casualty class label
accident_casualty['class_label'] = accident_casualty['casualty_class'].map(get_label('casualty_class'))

casualty_by_sex =  sns.countplot(x='class_label', hue='sex_label', 
                                 data=accident_casualty, palette='Paired',
                                edgecolor='black', alpha=0.9, dodge=True)
plt.legend(title='Gender')

# initalize the display plot function
display_plot(casualty_by_sex, 'FIGURE 9: CASUALTY CLASS BY GENDER', 'CLASS', 'COUNT', True, wide=True)

In [None]:
# plot casualty by sex
casualty_by_sex =  sns.countplot(x='class_label', hue='accident_severity_label', 
                                 data=accident_casualty, palette='Paired',
                                edgecolor='black', alpha=0.8, dodge=True)
plt.legend(title='severity')

# initalize the display plot function
display_plot(casualty_by_sex, 'FIGURE 10: CASUALTY CLASS BY ACCIDENT SEVERITY', 'CLASS', 'COUNT', True, wide=True)

In [None]:
# add casualty severity 
# plot casualty by sex
accident_casualty['casualty_severity_label'] = accident_casualty['casualty_severity'].map(
    get_label('casualty_severity'))
casualty_by_sex =  sns.countplot(x='casualty_severity_label', hue='sex_label', 
                                 data=accident_casualty, palette='Paired',
                                edgecolor='black', alpha=0.8, dodge=True)
plt.legend(title='Gender')

# initalize the display plot function
display_plot(casualty_by_sex, 'CASUALTY SEVERITY BY GENDER', 'CLASS', 'COUNT', True, wide= True)

In [None]:
# ploting casulty class by severity
casualty_by_sex =  sns.countplot(x='class_label', hue='casualty_severity_label', 
                                 data=accident_casualty, palette='Paired',
                                edgecolor='black', alpha=0.8, dodge=True)
plt.legend(title='severity')

# initalize the display plot function
display_plot(casualty_by_sex, 'CASUALTY CLASS BY CASUALTY SEVERITY', 'CLASS', 'COUNT', True, wide = True)

In [None]:
# map casualty type label
accident_casualty['casualty_type_label'] = accident_casualty['casualty_type'].map(get_label('casualty_type'))

top_10_casualty_classes = accident_casualty['casualty_type_label'].value_counts().nlargest(10).index

# Filter the DataFrame to keep only the rows with the top 10 casualty classes
accident_casualty_top_10 = accident_casualty[accident_casualty['casualty_type_label'].isin(top_10_casualty_classes)]

# Create a stacked countplot using Seaborn with the filtered data
casualty_type_by_sex_top_10 = sns.countplot(y='casualty_type_label', hue='sex_label', 
                                            data=accident_casualty_top_10, palette='Paired', 
                                            orient='h', dodge=False, edgecolor='black', alpha=0.7)

plt.legend(title='Gender')

# initalize the display plot function
display_plot(casualty_type_by_sex_top_10, 'TOP 10 CASUALTY TYPE BY GENDER', 'COUNT', 'TYPE')

In [None]:
# mad age band label
accident_casualty['age_band_label'] = accident_casualty['age_band_of_casualty'].map(
    get_label('age_band_of_casualty'))

# define correct age order
agebandidx = ['0 - 5', '6 - 10',
              '11 - 15', '16 - 20', 
              '21 - 25', '26 - 35', 
              '36 - 45', '46 - 55',
              '56 - 65', '66 - 75', 
              'Over 75']

# plot casualty by sex age group
casualty_by_sex_age_group =  sns.countplot(x='age_band_label', hue='sex_label', 
                                 data=accident_casualty, palette='Paired', orient='h',
                                edgecolor='black', alpha=0.8, 
                                order=agebandidx, dodge=False)
plt.legend(title='Gender')

# initalize the display plot function
display_plot(casualty_by_sex_age_group, 'FIGURE 8: CASUALTY AGE BAND BY GENDER', 'AGE BAND', 'COUNT', wide =True)

In [None]:
# pedestrian movement by severity
pedestrian_accident_hours_day['pedestrian_movement_label'] = pedestrian_accident_hours_day['pedestrian_movement'].map(
    get_label('pedestrian_movement'))

casualty_by_pedestrian_movement =  sns.countplot(y='pedestrian_movement_label', hue='accident_severity_label', 
                                 data=pedestrian_accident_hours_day, palette='Paired',
                                edgecolor='black', alpha=0.8, dodge=True)
plt.legend(title='Accident Severity')

# initalize the display plot function
display_plot(casualty_by_pedestrian_movement, 'SEVERITY BY PEDESTRIAN MOVEMENT', 'CLASS', 'COUNT')


### Accidents & Vehicles Analysis

In [None]:
accident_vehicle = pd.merge(accident_data, vehicle_data, on='accident_index')
print(f"Number of duplicates: {accident_vehicle.duplicated().sum()}")

*For motorbikes, are there significant hours of the day, and days of the week, on which accidents occur? We suggest a focus on: Motorcycle 125cc and under, Motorcycle over 125cc and up to 500cc, and Motorcycle over 500cc.*

In [None]:
# group motorcycle accident
motorcycle_accident = accident_vehicle[accident_vehicle["vehicle_type"].isin([5, 4, 3])].copy()

In [None]:
# map motorcycle vehicle label
motorcycle_accident["vehicle_type_label"] = motorcycle_accident["vehicle_type"].map(get_label("vehicle_type"))

In [None]:
# Get the unique vehicle types in the motorcycle accident data
vehicle_types = motorcycle_accident['vehicle_type_label'].unique()

In [None]:
# Loop through each vehicle type
fig_count = 14
for i, vehicle_type in enumerate(vehicle_types):
    # Filter the data for the current vehicle type
    data_subset = motorcycle_accident[motorcycle_accident['vehicle_type_label'] == vehicle_type]

    # Create a countplot for the current vehicle type
    plot = sns.countplot(data=data_subset, x='day_name', order=day_order, 
                     palette='rocket_r',
                     edgecolor='black', 
                     alpha=0.8)
    fig_count += 1
    # initalize the display plot function
    display_plot(plot, f"FIGURE {fig_count}: {vehicle_type}", 'DAYS', 'COUNT', True)

In [None]:
# Loop through each vehicle type
for j, vehicle_type in enumerate(vehicle_types):
    # Filter the data for the current vehicle type
    data_subset = motorcycle_accident[motorcycle_accident['vehicle_type_label'] == vehicle_type]
    
    # plot motorcycle accident time
    plot_time = sns.histplot(data_subset, x='decimal_time', edgecolor='black', alpha=0.5, bins=20)
    
    fig_count += 1
    # initalize the display plot function
    display_plot(plot_time, f"FIGURE {fig_count}: {vehicle_type}",'TIME', 'COUNT', wide = True)

In [None]:
# map accident vehicle type
accident_vehicle['vehicle_type_label'] = accident_vehicle['vehicle_type'].map(get_label('vehicle_type'))

top_10_casualty_classes = accident_vehicle['vehicle_type_label'].value_counts().nlargest(10).index

# Filter the DataFrame to keep only the rows with the top 10 vehicle type
accident_vehicle_top_10 = accident_vehicle[accident_vehicle['vehicle_type_label'].isin(top_10_casualty_classes)]

# Create a stacked countplot using Seaborn with the filtered data
casualty_type_by_sex_top_10 = sns.countplot(y='vehicle_type_label', hue='accident_severity_label', 
                                            data=accident_vehicle_top_10, palette='Paired', 
                                            orient='h', dodge=False, edgecolor='black', alpha=0.8)
plt.legend(title='Severity')

# initalize the display plot function
display_plot(casualty_type_by_sex_top_10, 'FIGURE 14: TOP 10 ACCIDENT VEHICLES BY SEVERITY', 
             'COUNT', 'TYPE', wide = True)

In [None]:
# map driver age band label
accident_vehicle['age_band_of_driver_label'] = accident_vehicle['age_band_of_driver'].map(
    get_label('age_band_of_driver'))

agebandidx = ['0 - 5', '6 - 10',
              '11 - 15', '16 - 20', 
              '21 - 25', '26 - 35', 
              '36 - 45', '46 - 55',
              '56 - 65', '66 - 75', 
              'Over 75']


# plot vehivle driver band
vehicle_driver_band_severity =  sns.countplot(x='age_band_of_driver_label', hue='accident_severity_label', 
                                 data=accident_vehicle, palette='Paired', orient='h',
                                edgecolor='black', alpha=0.8, 
                                order=agebandidx, dodge=True)
plt.legend(title='severity')

# initalize the display plot function
display_plot(vehicle_driver_band_severity, 'SEVERITY BY DRIVER AGE BAND', 'AGE BAND', 'COUNT', wide =True)

In [None]:
# Plotting the age band and vehicle type then calculate the counts
grouped = accident_vehicle.groupby(['age_band_of_driver_label', 'vehicle_type_label']).size().unstack()

# Reorder the index of 'grouped' DataFrame
age_band_order = agebandidx
grouped = grouped.reindex(age_band_order)

# Set the Seaborn color palette
sns.set_palette("colorblind")

# Create a figure with a 2x3 grid of subplots
fig, axs = plt.subplots(2, 3, figsize=(25, 18))

# Flatten the axs array to easily access individual subplots
axs = axs.flatten()

# Loop through each segment and plot the corresponding data
for i in range(6):
    start_col = i * len(grouped.columns) // 6
    end_col = (i + 1) * len(grouped.columns) // 6
    grouped.iloc[:, start_col:end_col].plot(kind='bar', stacked=True, ax=axs[i])
    axs[i].set_xlabel('Age Band')
    axs[i].set_ylabel('Count')
    axs[i].set_title(f'Figure 13: Vehicles Types by Age Band (Part {i+1})')
    axs[i].set_xticklabels(grouped.index, rotation=45)
    axs[i].legend(title='Vehicle Type')

# Adjust layout and show the plot

plt.tight_layout()
plt.savefig('accident_vehicle_type_plot.png', dpi=700, bbox_inches='tight', format='png', 
                                       transparent = True)

plt.show()


In [None]:
# map sex of driver label
accident_vehicle['sex_label'] = accident_vehicle['sex_of_driver'].map(
    get_label('sex_of_driver'))

# plot vehicle driver
vehicle_driver_band_gender =  sns.countplot(x='age_band_of_driver_label', hue='sex_label', 
                                 data=accident_vehicle, palette='Paired', orient='h',
                                edgecolor='black', alpha=0.7, 
                                order=agebandidx, dodge=False)
plt.legend(title='Gender')

# initalize the display plot function
display_plot(vehicle_driver_band_gender, 'DRIVER AGE BAND BY GENDER', 'AGE BAND', 'COUNT', wide = True)

In [None]:
# map  direction
accident_vehicle['vehicle_direction_fr_label'] = accident_vehicle['vehicle_direction_from'].map(
                                                get_label('vehicle_direction_from'))
# plot severity by direction
accident_vehicle_direction_from =  sns.countplot(y='vehicle_direction_fr_label', 
                                                hue='accident_severity_label', 
                                                orient='h',
                                                data=accident_vehicle, palette='Paired',
                                                edgecolor='black', alpha=0.8, dodge=False)

    
plt.legend(title='severity')

# initalize the display plot function
display_plot(accident_vehicle_direction_from, 'ACCIDENT SEVERITY BY DIRECTION FROM', 
             'COUNT', 'DIRECTION FROM', show_y_bar=True, wide= True)

## Clustering

In [None]:
humber_side_accidents = accident_data[accident_data['police_force']== 16].copy()
humber_side_accidents.head()

In [None]:
longitude_and_latitude = humber_side_accidents.loc[:, ['longitude', 'latitude']]
longitude_and_latitude

In [None]:
# using the elbow method to determine the best k for geographic clustering
max_k = 5
K = range(1,max_k+5, 1)
## iterations
distortions = []

for i in K:
    model = KMeans(n_clusters=i, init='k-means++', max_iter=300, n_init=10, random_state=0)
    model.fit(longitude_and_latitude)
    distortions.append(model.inertia_)

elbow_plot = sns.lineplot(x=K, y=distortions, marker='x', linestyle='-', color='blue')
display_plot(elbow_plot, 'FIGURE 23: ELBOW METHOD', 'NUMBER OF CLUSTERS', 'DISTORTION')

In [None]:
# defines K-Means
kmeans = KMeans(n_clusters = 5, random_state = 42)
kmeans.fit(longitude_and_latitude)

In [None]:
# predict labels
labels = kmeans.predict(longitude_and_latitude)
centroids = kmeans.cluster_centers_

In [None]:
labels[0:5]

In [None]:
centroids

In [None]:
locationtuple = np.array(list(zip(longitude_and_latitude['longitude'], longitude_and_latitude['latitude'])))
locationtuple

In [None]:
long_lat_severity = sns.scatterplot(data=humber_side_accidents, x="longitude", y="latitude", 
                                    hue="accident_severity_label",
                                    style="accident_severity_label", 
                                    legend="full", s=50, palette="Set1", rasterized = True)
sns.scatterplot(x=centroids[:,0], y=centroids[:,1], color = 'black', s=120)

display_plot(long_lat_severity, 'FIGURE 24: LONGITUDE LATITUDE ACCIDENT SEVERITY HUE',
             'LONGITUDE', 'LATITUDE', long=True)

In [None]:
humber_side_accidents["urban_or_rural_area_labels"] = humber_side_accidents["urban_or_rural_area"].map(
                                                                get_label("urban_or_rural_area"))
long_lat_urban = sns.scatterplot(data=humber_side_accidents, x="longitude", y="latitude", 
                                    hue="urban_or_rural_area_labels",
                                    s=50, palette="mako", rasterized = True)
sns.scatterplot(x=centroids[:,0], y=centroids[:,1], color = 'black', s=120)

display_plot(long_lat_urban, 'FIGURE 25: LONGITUDE LATITUDE ACCIDENT URBAN & RURAL', 
             'LONGITUDE', 'LATITUDE', long=True)

In [None]:
contamination = [0.0025, 0.001]
model = LocalOutlierFactor(n_neighbors=30,contamination=contamination[0])
y_pred = model.fit_predict(longitude_and_latitude)

LOF_Scores = model.negative_outlier_factor_
LOF_pred = pd.Series(y_pred).replace([-1,1], [1,0])
LOF_pred.value_counts()

In [None]:
LOF_anomalies = longitude_and_latitude[LOF_pred.values == 1]
LOF_anomalies

In [None]:
long_lat_anomailes = sns.scatterplot(data=humber_side_accidents, x="longitude", y="latitude", 
                                      palette="husl")
sns.scatterplot(x=LOF_anomalies['longitude'], y=LOF_anomalies['latitude'], alpha = 0.9, marker='o', color='red')


display_plot(long_lat_anomailes, 'HUMBER SIDE LONGITUDE LATITUDE ACCIDENT ANOMALIES', 
             'LONGITUDE', 'LATITUDE', long=True)

## Viewining the cluster location on the humber side map

In [None]:
latitude, longitude = 53.574501, -0.095008
# Number of clusters you want to create
num_clusters = 5

# Select only the longitude and latitude columns for clustering
X = humber_side_accidents[['longitude', 'latitude']]
humber_side_accidents['cluster'] = labels

# List comprehension to make out list of lists
heat_data = [[row['latitude'],row['longitude']] for index, row in longitude_and_latitude.iterrows()]

map_cluster = folium.Map(location=[latitude, longitude], zoom_start = 8.8)
HeatMap(X.values).add_to(map_cluster)

# Define cluster colors
cluster_colors = ['yellow', 'green', 'brown', 'blue', 'orange']

# Define hue colors (e.g., for different accident severity)
hue_colors = {
    'Fatal': '#4daf4a',
    'Slight': '#377eb8',
    'Serious': '#e41a1c'
    # Add more severity and their corresponding colors here
}

# Add the points to the map with corresponding colors for each cluster and hue for each severity
for cluster_id in range(num_clusters):
    cluster_data = humber_side_accidents[humber_side_accidents['cluster'] == cluster_id]
    for idx, row in cluster_data.iterrows():
        # Get the severity for the current point
        category = row['accident_severity_label']
        
        # Choose the color for the point based on cluster and hue (severity)
        color = cluster_colors[cluster_id]
        if category in hue_colors:
            color = hue_colors[category]
        
            folium.CircleMarker(
                location=[row['latitude'], row['longitude']],
                radius=5,
                color=cluster_colors[cluster_id],
                fill=True,
                fill_color=color,
                fill_opacity=0.8
            ).add_to(map_cluster)


In [None]:
map_cluster.save('humberside_cluster_map.html')

In [None]:
map_cluster

## Analyzing Accident Severity with Apriori Algorithm

In [None]:
# creating data encoding for Apriori Algorithm.
accident_details_encoding = pd.DataFrame()

# list of selected columns
selected_columns = ['accident_severity', 'road_type', 
                    'junction_detail', 'vehicle_type', 
                    'speed_limit', 'first_point_of_impact', 
                    'number_of_vehicles', 'number_of_casualties']

# looping over columns to encode data
encoded_data = []
for column in selected_columns:
    dummies = pd.get_dummies(accident_vehicle[column], prefix = column)
    encoded_data.append(dummies)
    
accident_details_encoding = pd.concat(encoded_data, axis=1)
    

In [None]:
# first 5 encoded data
accident_details_encoding.head()

In [None]:
# define list of frequent set
frequent_set = apriori(accident_details_encoding, min_support=0.4, use_colnames=True)

In [None]:
frequent_set

In [None]:
# defining rules
rules_by_lift = association_rules(frequent_set, metric='lift', min_threshold=1)
rules_by_lift.head(10)

In [None]:

plt.figure(figsize=(10, 6))
plt.scatter(rules_by_lift['support'], rules_by_lift['lift'], alpha=0.7, edgecolor='black')
plt.title('FIGURE: Association Rule Strength (Lift vs Support)')
plt.xlabel('Support')
plt.ylabel('Lift')
plt.grid(True)
plt.show()


## Time Series 
create a separate time series model for each policing area chosen to predict weekly accident counts for 2019 based on historical data from 2017 to 2018. 

In [None]:
accident_data_v1 = DatabaseUtil('accident_data_v1.0.0_2023.db')
accident_data_v1.connect_database()

In [None]:
accidents_all_years = accident_data_v1.tabulate_query("""
    SELECT *
    FROM accident
    WHERE accident_year IN (2017, 2018, 2019)
""")

In [None]:
accidents_all_years.head()
accidents_all_years['accident_year'].value_counts()

In [None]:
# convert date column to datetime format
accidents_all_years['date'] = pd.to_datetime(accidents_all_years['date'], format='%d/%m/%Y')

In [None]:
# Create weekly buckets
accidents_all_years['week'] = accidents_all_years['date'].dt.to_period('W').apply(lambda r: r.start_time)

In [None]:
get_label("police_force")

In [None]:
# Grouop by week and police force
selected_police_forces = [1, 6, 17]

weekly_grouped = (
    accidents_all_years[accidents_all_years['police_force'].isin(selected_police_forces)]
    .groupby(['police_force', 'week'])
    .size()
    .reset_index(name='weekly_accidents')
)
weekly_grouped.head()

In [None]:
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import matplotlib.pyplot as plt

# Pick one force
pf = 1
data_pf = weekly_grouped[weekly_grouped['police_force'] == pf].set_index('week')

# Split into train (2017-2018) and test (2019)
train = data_pf[data_pf.index.year < 2019]
test = data_pf[data_pf.index.year == 2019]

# Fit model
model = ExponentialSmoothing(train['weekly_accidents'], trend='add', seasonal='add', seasonal_periods=52).fit()

# Forecast for 2019
forecast = model.forecast(len(test))

# Plot
plt.figure(figsize=(12, 5))
plt.plot(train.index, train['weekly_accidents'], label='Train')
plt.plot(test.index, test['weekly_accidents'], label='Actual 2019')
plt.plot(test.index, forecast, label='Forecast 2019')
plt.title(f"Police Force {pf} Weekly Accident Forecast")
plt.legend()
plt.show()


In [None]:
for pf in [1, 6, 17]:
    data_pf = weekly_grouped[weekly_grouped['police_force'] == pf].set_index('week')
    train = data_pf[data_pf.index.year < 2019]
    test = data_pf[data_pf.index.year == 2019]
    
    model = ExponentialSmoothing(train['weekly_accidents'], trend='add', seasonal='add', seasonal_periods=52).fit()
    forecast = model.forecast(len(test))
    
    plt.figure(figsize=(12, 5))
    plt.plot(train.index, train['weekly_accidents'], label='Train')
    plt.plot(test.index, test['weekly_accidents'], label='Actual 2019')
    plt.plot(test.index, forecast, label='Forecast 2019')
    plt.title(f"Police Force {pf} Weekly Accident Forecast")
    plt.legend()
    plt.show()


In [None]:
from sklearn.metrics import mean_absolute_error, mean_squared_error

def forecast_weekly_by_force(weekly_grouped, forces=[1, 6, 17], seasonal_periods=52):
    results = []

    for pf in forces:
        df_pf = weekly_grouped[weekly_grouped['police_force'] == pf].set_index('week').sort_index()

        train = df_pf[df_pf.index.year < 2019]['weekly_accidents']
        test  = df_pf[df_pf.index.year == 2019]['weekly_accidents']

        if len(train) < seasonal_periods + 5 or len(test) == 0:
            print(f"Police Force {pf}: Not enough data to model. Skipping.")
            continue

        # Fit Holt-Winters model
        model = ExponentialSmoothing(train, trend='add', seasonal='add', seasonal_periods=seasonal_periods)
        model_fit = model.fit(optimized=True)
        forecast = model_fit.forecast(len(test))

        # Metrics
        mae = mean_absolute_error(test, forecast)
        rmse = np.sqrt(mean_squared_error(test, forecast))
        results.append({'Police Force': pf, 'MAE': mae, 'RMSE': rmse})
        print(f"Police Force {pf} — MAE: {mae:.2f}, RMSE: {rmse:.2f}")

    return pd.DataFrame(results).sort_values(by='RMSE')


        
# Run the function
metrics_df = forecast_weekly_by_force(weekly_grouped, forces=[1, 6, 17])
display(metrics_df)


## LSOA forecasting for the city of hull

In [None]:
accident_data['date'] = pd.to_datetime(accident_data['date'], format='%d/%m/%Y')

# Filter just for Hull (local_authority_district 31 = Kingston upon Hull)
hull_lsoas_only = accident_data[
    (accident_data['accident_year'] == 2019) &
    (accident_data['date'].between('2019-01-01', '2019-03-31')) &
    (accident_data['local_authority_district'] == 31)
]

# Get top 30 LSOAs based on accident count
top_30_lsoas = (
    hull_lsoas_only['lsoa_of_accident_location']
    .value_counts()
    .nlargest(30)
    .index
)


In [None]:
# Filter Jan–June 2019 data for top 30 LSOAs
hull_top30_jan_jul = accident_data[
    (accident_data['accident_year'] == 2019) &
    (accident_data['lsoa_of_accident_location'].isin(top_30_lsoas)) &
    (accident_data['local_authority_district'] == 31)
]

# Aggregate daily accident counts
daily_counts = (
    hull_top30_jan_jul
    .groupby('date')
    .size()
    .reindex(pd.date_range('2019-01-01', '2019-07-31'), fill_value=0)
    .rename('daily_accidents')
    .to_frame()
)

In [None]:
from statsmodels.tsa.holtwinters import ExponentialSmoothing

# Split data
train = daily_counts[:'2019-06-30']
test = daily_counts['2019-07-01':]

# Fit model (additive trend & seasonality — daily)
model = ExponentialSmoothing(train['daily_accidents'], trend='add', seasonal=None)
model_fit = model.fit()

# Forecast July
forecast = model_fit.forecast(len(test))


In [None]:
plt.figure(figsize=(14, 6))
plt.plot(train.index, train['daily_accidents'], label='Train (Jan–Jun)', color='blue')
plt.plot(test.index, test['daily_accidents'], label='Actual (July)', color='black')
plt.plot(test.index, forecast, label='Forecast (July)', linestyle='--', color='orange')
plt.title('Daily Accident Forecast for Hull Top 30 LSOAs – July 2019')
plt.xlabel('Date'); plt.ylabel('Accident Count')
plt.legend()
plt.tight_layout()
plt.show()

In [None]:
mae = mean_absolute_error(test['daily_accidents'], forecast)
rmse = np.sqrt(mean_squared_error(test['daily_accidents'], forecast))

print(f"MAE: {mae:.2f}, RMSE: {rmse:.2f}")

In [None]:
accident_data_v1.disconnect_database() # Disconecting the Database

In [None]:
#### Save images.

In [None]:
def save_created_plots(created_plots):
    folder_path = 'plot_visualisation' # Create plot destination.

    try: 
        if not os.path.exists(folder_path): # Check if folder already exists
            os.mkdir(folder_path)

        for plot_name, plot_figure in created_plots.items():
            plot_figure.figure.savefig('./' + folder_path + '/' + plot_name +'.png',
                                       dpi=500, bbox_inches='tight', format='png', 
                                       transparent = True)

        print(f"Saved Created Plots, see the '{folder_path}' folder for images.")
    except OSError as error: 
        print(error) 
    

save_created_plots(created_plots)