# CMF Computation Tool

In [1]:
import pandas as pd

In [2]:
# Read all crash data
D1_data = pd.read_excel('data/D1_Phonolite_Input.xlsx', sheet_name = "Formatting Data")
D2_data = pd.read_excel('data/D2_LWA_Input.xlsx', sheet_name = "Formatting Data")
D6_data = pd.read_excel('data/D6_HFST_Input.xlsx', sheet_name = "Formatting Data")

# Read relevant curve data
D1_curve_data = pd.read_excel('data/D1_Phonolite_Input.xlsx', sheet_name = "Curve Info", )
D6_curve_data = pd.read_excel('data/D6_HFST_Input.xlsx', sheet_name = "Curve Info")

# Create Filter Datasets
D1_single_vehicle = D1_data[D1_data["Single Vehicle"] == "Yes"]
D1_curve_crashes = D1_data[D1_data["Vehicle_Ma"].str.contains("Negotiating a Curve")]
D1_wet_road = D1_data[D1_data["Surface_Co"] == "Wet"]

D6_single_vehicle = D6_data[D6_data["Single Vehicle"] == "Yes"]
D6_curve_crashes = D6_data[D6_data["Vehicle_Ma"].str.contains("Negotiating a Curve")]
D6_wet_road = D6_data[D6_data["Surface_Co"] == "Wet"]

In [3]:
# Import SPF coefficients
D1_total_coeff = pd.read_excel('data/D1_Phonolite_Input.xlsx', sheet_name = "Total Crashes SPF Coefficients", index_col = 0)
D1_single_vehicle_coeff = pd.read_excel('data/D1_Phonolite_Input.xlsx', sheet_name = "Single Vehicle SPF Coefficients", index_col = 0)
D1_curve_crashes_coeff = pd.read_excel('data/D1_Phonolite_Input.xlsx', sheet_name = "Curve Crashes SPF Coefficients", index_col = 0)
D1_wet_road_coeff = pd.read_excel('data/D1_Phonolite_Input.xlsx', sheet_name = "Wet Road SPF Coefficients", index_col = 0)

D6_total_coeff = pd.read_excel('data/D6_HFST_Input.xlsx', sheet_name = "Total Crashes SPF Coefficients", index_col = 0)
D6_single_vehicle_coeff = pd.read_excel('data/D6_HFST_Input.xlsx', sheet_name = "Single Vehicle SPF Coefficients", index_col = 0)
D6_curve_crashes_coeff = pd.read_excel('data/D6_HFST_Input.xlsx', sheet_name = "Curve Crashes SPF Coefficients", index_col = 0)
D6_wet_road_coeff = pd.read_excel('data/D6_HFST_Input.xlsx', sheet_name = "Wet Road SPF Coefficients", index_col = 0)

## Compute Naive CMFs

In [4]:
def naive_CMF(data: pd.DataFrame, years_before_treatment=4, years_after_treatment=3):
    freq_before = (len(data[data["Relation To Treatment"] == "before treatment"].index)) / years_before_treatment
    freq_after = len(data[data["Relation To Treatment"] == "after treatment"].index) / years_after_treatment
    if freq_before == 0:
        return 1
    return freq_after/freq_before

## Compute Empirical Bayes CMFs

**Helper Functions**

In [21]:
def count_curve_crashes(data: pd.DataFrame, curve_data: pd.DataFrame):
    # Helper function to calculate the total number of crashes on each curve for curve rating purposes
    
    # Finding crashes before treatment
    before_data = data[data["Relation To Treatment"] == "before treatment"]
    before_crash_counts = before_data.groupby("CurveID")["CurveID"].count()
    curve_data = pd.merge(curve_data, before_crash_counts.to_frame("Crashes Before"),  on="CurveID", how="left")

    # Finding crashes after treatment
    after_data = data[data["Relation To Treatment"] == "after treatment"]
    after_crash_counts = after_data.groupby("CurveID")["CurveID"].count()
    curve_data = pd.merge(curve_data, after_crash_counts.to_frame("Crashes After"),  on="CurveID", how="left")

    # Finding unknown crashes
    unknown_data = data[data["Relation To Treatment"] == "unknown"]
    unknown_crash_counts = unknown_data.groupby("CurveID")["CurveID"].count()
    curve_data = pd.merge(curve_data, unknown_crash_counts.to_frame("Unknown Crashes"),  on="CurveID", how="left")

    # Finding total crashes
    total_crash_counts = data.groupby("CurveID")["CurveID"].count()
    curve_data = pd.merge(curve_data, total_crash_counts.to_frame("Total Crashes"),  on="CurveID", how="left")
    
    # Making sure that categories with 0 crashes shows up as 0 crashes and not as NaN crashes
    curve_data.fillna(0, inplace=True)
    return curve_data

def calculate_frequencies(curve_data: pd.DataFrame, years_before_treatment, years_after_treatment):
    # Helper function to calculate crash frequencies on curves
    
    # Calculate crash frequencies before treatment
    crash_frequency_before = curve_data["Crashes Before"] / years_before_treatment
    curve_data = pd.merge(curve_data, crash_frequency_before.to_frame("Crash Frequency Before"), left_index=True, right_index=True)
    
    # Calculate crash frequencies after treatment
    crash_frequency_after = curve_data["Crashes After"] / years_after_treatment
    curve_data = pd.merge(curve_data, crash_frequency_after.to_frame("Crash Frequency After"), left_index=True, right_index=True)    
    
    return curve_data

def calculate_curve_ratings(curve_data: pd.DataFrame):
    # Helper function to calculate curve AADT and crash frequency ratings
    
    # Clean through curve data
    curve_AADTs = curve_data["Average AADT"]
    curve_crash_counts = curve_data["Total Crashes"]
    
    # Calculate the curve AADT and crash ratings, and make sure to output the bin boundaries
    AADT_ratings, AADT_bins = pd.qcut(curve_AADTs, 3, ["Low AADT", "Medium AADT", "High AADT"], retbins=True)
    crash_ratings, crash_bins = pd.qcut(curve_crash_counts, 2, ["Low crash frequency", "High crash frequency"], retbins=True)
    
    # Join the calculated ratings to the dataset
    curve_data = pd.merge(curve_data, AADT_ratings.to_frame("AADT Rating"), left_index=True, right_index=True)
    curve_data = pd.merge(curve_data, crash_ratings.to_frame("Crash Frequency Rating"), left_index=True, right_index=True)

    return curve_data, AADT_bins, crash_bins

def calculate_SPF_frequencies(curve_data: pd.DataFrame, coefficients: pd.DataFrame):
    # Helper function to calculate SPF predicted crash frequencies for before and after treatment
    
    # Define the SPF coefficients for legibility
    years = coefficients.loc["Years"][0]
    intercept = coefficients.loc["(Intercept)"][0] / years
    divided = coefficients.loc["divided"][0] / years
    ln_deflection_angle = coefficients.loc["log(deflection_angle)"][0] / years
    length = coefficients.loc["length"][0] / years
    ln_AADT = coefficients.loc["log(AADT)"][0] / years
    
    # Calculate the SPF frequency before
    SPF_before = intercept + (curve_data["Divided"] * divided) + (curve_data["log(devangle)"] * ln_deflection_angle) + (curve_data["Curve Length"] * length) + (curve_data["log(AADT Before)"] * ln_AADT)
    curve_data = pd.merge(curve_data, SPF_before.to_frame("SPF Frequency Before"), left_index=True, right_index=True)
    
    # Calculate the SPF frequency after
    SPF_after = intercept + (curve_data["Divided"] * divided) + (curve_data["log(devangle)"] * ln_deflection_angle) + (curve_data["Curve Length"] * length) + (curve_data["log(AADT After)"] * ln_AADT)
    curve_data = pd.merge(curve_data, SPF_after.to_frame("SPF Frequency After"), left_index=True, right_index=True)
    
    return curve_data

def calculate_cumulative_values(curve_data: pd.DataFrame, coefficients: pd.DataFrame):
    # Helper function to find aggregate values
    
    # Calculation of cumulative frequencies
    total_frequency_before = curve_data["Crash Frequency Before"].sum()
    total_frequency_after = curve_data["Crash Frequency After"].sum()
    total_spf_frequency_before = curve_data["SPF Frequency Before"].sum()
    total_spf_frequency_after = curve_data["SPF Frequency After"].sum()
    
    # Calculation of weight of SPF
    dispersion = coefficients.loc["Dispersion"][0]
    weight_of_spf = 1 / (1 + dispersion * total_spf_frequency_before)

    # Calculation of expected crashes before and after
    expected_crashes_before = (weight_of_spf * total_spf_frequency_before) + (1 - weight_of_spf) * (total_frequency_before)
    comparison_ratio = total_spf_frequency_after / total_spf_frequency_before
    expected_crashes_after = expected_crashes_before * comparison_ratio
    
    # Collect all calculated values into a dictionary
    cumulative_dict = {"Total Frequency Before" : total_frequency_before,
                       "Total Frequency After" : total_frequency_after,
                       "Total SPF Frequency Before" : total_spf_frequency_before,
                       "Total SPF Frequency After" : total_spf_frequency_after,
                       "Weight of SPF" : weight_of_spf,
                       "Expected Crashes Before" : expected_crashes_before,
                       "Comparison Ratio" : comparison_ratio,
                       "Expected Crashes After" : expected_crashes_after,
                      }
    
    return cumulative_dict

def calculate_final_outputs(cumulative_df: pd.DataFrame):
    # Helper function to calculate variance, CMF, and standard error
    
    # Calculate variance of expected after
    variance_of_expected_after = cumulative_dict["Expected Crashes After"] * cumulative_dict["Comparison Ratio"] * (1 - cumulative_dict["Weight of SPF"])
    
    # Calculate CMF
    empirical_bayes_cmf = (cumulative_dict["Total Frequency After"] / cumulative_dict["Expected Crashes After"]) / (1 + (variance_of_expected_after / (cumulative_dict["Expected Crashes After"] ** 2)))
    
    # Calculate variance of total frequency after
    # variance_of_total_frequency_after = ?????
    
    # Calculate standard deviation of CMF
    standard_deviation = ((empirical_bayes_cmf ** 2) *
                            (
                                ((variance_of_expected_after / (cumulative_dict["Expected Crashes After"] ** 2)) + (variance_of_total_frequency_after / (cumulative_dict["Total Frequency After"] ** 2)))
                                /
                                ((1 + variance_of_expected_after / (cumulative_dict["Expected Crashes After"] ** 2) ** 2))
                            )
                         ) ** 0.5
    
    return None

**Empirical Bayes Function**

In [9]:
def empirical_bayes_CMF(data: pd.DataFrame, curve_data: pd.DataFrame, coefficients: pd.DataFrame, years_before_treatment=4, years_after_treatment=3):
    curve_data = count_curve_crashes(data, curve_data)
    curve_data = calculate_frequencies(curve_data, years_before_treatment, years_after_treatment)
    curve_data, AADT_bins, crash_bins = calculate_curve_ratings(curve_data)
    curve_data = calculate_SPF_frequencies(curve_data, coefficients)
    cumulative_df = calculate_cumulative_values(curve_data, coefficients)

    return None

In [22]:
# curve = D1_curve_data["CurveID"].iloc[0]
# test = D1_curve_data.query("`CurveID` == @curve")
# display(test)

data = D1_data.copy()
curve_data = D1_curve_data.copy()
coefficients = D1_total_coeff
years_before_treatment = 4
years_after_treatment = 3

curve_data = count_curve_crashes(data, curve_data)
curve_data = calculate_frequencies(curve_data, years_before_treatment, years_after_treatment)
curve_data, AADT_bins, crash_bins = calculate_curve_ratings(curve_data)
curve_data = calculate_SPF_frequencies(curve_data, coefficients)
cumulative_dict = calculate_cumulative_values(curve_data, coefficients)

variance_of_expected_after = cumulative_dict["Expected Crashes After"] * cumulative_dict["Comparison Ratio"] * (1 - cumulative_dict["Weight of SPF"])
empirical_bayes_cmf = (cumulative_dict["Total Frequency After"] / cumulative_dict["Expected Crashes After"]) / (1 + (variance_of_expected_after / (cumulative_dict["Expected Crashes After"] ** 2)))

variance_of_total_frequency_after = 1
standard_deviation = ((empirical_bayes_cmf ** 2) *
                        (
                            ((variance_of_expected_after / (cumulative_dict["Expected Crashes After"] ** 2)) + (variance_of_total_frequency_after / (cumulative_dict["Total Frequency After"] ** 2)))
                            /
                            ((1 + variance_of_expected_after / (cumulative_dict["Expected Crashes After"] ** 2) ** 2))
                        )
                     ) ** 0.5

display(variance_of_expected_after, empirical_bayes_cmf, standard_deviation)
display(curve_data)

43.09607025131798

0.9125340545939149

0.14172321653487313

Unnamed: 0,CurveID,Divided,Deflection Angle,Curve Length,Average AADT After,Average AADT Before,BBI 95th,Speed Diff,Average AADT,log(devangle),...,Crashes Before,Crashes After,Unknown Crashes,Total Crashes,Crash Frequency Before,Crash Frequency After,AADT Rating,Crash Frequency Rating,SPF Frequency Before,SPF Frequency After
0,1,0,61.2,1245.6,3493.333333,3170.0,6.57,0.0,3331.666667,4.114147,...,0.0,1.0,0.0,1,0.00,0.333333,Medium AADT,Low crash frequency,0.312141,0.321154
1,2,0,77.4,1108.0,2526.666667,1977.5,2.85,1.1,2252.083333,4.348987,...,3.0,0.0,1.0,4,0.75,0.000000,Medium AADT,High crash frequency,0.272302,0.295044
2,3,0,56.8,765.9,2990.000000,2582.5,10.68,6.3,2786.250000,4.039536,...,2.0,2.0,0.0,4,0.50,0.666667,Medium AADT,High crash frequency,0.275330,0.288927
3,4,0,51.0,869.8,2990.000000,2582.5,6.72,0.0,2786.250000,3.931826,...,4.0,0.0,2.0,6,1.00,0.000000,Medium AADT,High crash frequency,0.274805,0.288402
4,7,0,47.7,676.2,476.666667,430.0,6.96,0.0,453.333333,3.864931,...,1.0,0.0,0.0,1,0.25,0.000000,Low AADT,Low crash frequency,0.099961,0.109522
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
63,89,0,42.1,465.6,1663.333333,1135.0,6.96,0.0,1399.166667,3.740048,...,2.0,1.0,0.0,3,0.50,0.333333,Low AADT,High crash frequency,0.178967,0.214435
64,91,0,55.1,419.6,1663.333333,1135.0,13.25,10.4,1399.166667,4.009150,...,0.0,1.0,0.0,1,0.00,0.333333,Low AADT,Low crash frequency,0.187025,0.222492
65,92,0,90.5,466.4,1663.333333,1135.0,13.03,10.4,1399.166667,4.505350,...,6.0,10.0,2.0,18,1.50,3.333333,Low AADT,High crash frequency,0.206040,0.241508
66,93,0,47.2,570.7,1663.333333,1135.0,4.28,0.0,1399.166667,3.854394,...,1.0,1.0,0.0,2,0.25,0.333333,Low AADT,Low crash frequency,0.186328,0.221795


In [7]:
naive_CMF(D6_data)

0.6806387225548902

### Total CMF

In [None]:
# pivot table
total_cmf_table = get_pivot_counts(data=crash_data, field='Relation_To_HFST_Treatment_No_Covid')
total_cmf_table


In [None]:
# cmf results
total_cmf_table = get_pivot_counts(data=crash_data, field='Relation_To_HFST_Treatment_No_Covid')
total_cmf = compute_cmf(total_cmf_table, years_after_treatment=3, years_before_treatment=3)
print("total cmf without covid data: ", total_cmf.round(2))

total_cmf_table = get_pivot_counts(data=crash_data, field='Relation_To_HFST_Treatment')
total_cmf = compute_cmf(total_cmf_table, years_after_treatment=4, years_before_treatment=3)
print("total cmf with covid data: ", total_cmf.round(2))


### Single Vehicle CMF


In [None]:
# cmf results
filtered_data = crash_data[crash_data['Single_Vehicle']]
pivot_table = get_pivot_counts(data=filtered_data, field='Relation_To_HFST_Treatment_No_Covid')
cmf = compute_cmf(pivot_table, years_after_treatment=3, years_before_treatment=3)
print("total cmf without covid data: ", cmf.round(2))

filtered_data = crash_data[crash_data['Single_Vehicle']]
pivot_table = get_pivot_counts(data=filtered_data, field='Relation_To_HFST_Treatment')
cmf = compute_cmf(pivot_table, years_after_treatment=4, years_before_treatment=3)
print("total cmf with covid data: ", cmf.round(2))


### Surface Condition

In [None]:
# cmf results
filtered_data = crash_data[crash_data['Surface_Co']=='Wet']
pivot_table = get_pivot_counts(data=filtered_data, field='Relation_To_HFST_Treatment_No_Covid')
cmf = compute_cmf(pivot_table, years_after_treatment=3, years_before_treatment=3)
print("total cmf without covid data: ", cmf.round(2))

filtered_data = crash_data[crash_data['Surface_Co']=='Wet']
pivot_table = get_pivot_counts(data=filtered_data, field='Relation_To_HFST_Treatment')
cmf = compute_cmf(pivot_table, years_after_treatment=4, years_before_treatment=3)
print("total cmf with covid data: ", cmf.round(2))
