# Accident Propensity Index Calculation v16

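This version works with route points flagged 0 or 1: only the points flagged 1 are used for the calculations, but all points (flagged 0 and 1) are written to the output. In addition to the API value, the output contains the API hex color code, the number of accidents, the weather conditions, and the times of day of the accidents for each route segment.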

In [1]:
# Import relevant libraries
import pandas as pd
import math
import numpy as np
import json
import os.path

In [2]:
# function to calculate the distance between two points
def distance(point1, point2):
    """Return the approximate distance in kilometers between two (lat, lon) points.

    Uses a flat-earth approximation: the latitude difference is scaled by a
    fixed km-per-degree factor, and the longitude difference is scaled by a
    fixed km-per-degree factor shrunk by the cosine of the mean latitude.

    Args:
        point1: pair (lat, lon) in decimal degrees.
        point2: pair (lat, lon) in decimal degrees.

    Returns:
        The distance in kilometers as a float.
    """
    lat1, lon1 = point1
    lat2, lon2 = point2
    km_per_lat = 110.574
    km_per_lon = 111.320
    # math.cos expects radians, while the coordinates are given in degrees
    mean_lat_rad = math.radians((lat1 + lat2) / 2)
    dx = (lon2 - lon1) * km_per_lon * math.cos(mean_lat_rad)
    dy = (lat2 - lat1) * km_per_lat
    return math.hypot(dx, dy)

# function to calculate the distance between a point and a line segment
def distance_to_segment(point, segment_start, segment_end):
    """Return the distance from a point to the closest point of a line segment.

    The closest point is found by projecting ``point`` onto the segment using
    the raw coordinate values (no unit scaling is applied before projecting),
    clamping the projection to the segment's end points. The final distance
    is then measured with ``distance``.

    Args:
        point: coordinate pair of the point to measure from.
        segment_start: coordinate pair of the first segment end.
        segment_end: coordinate pair of the second segment end.

    Returns:
        Whatever ``distance`` returns for the point and its closest point on
        the segment.
    """
    p_u, p_v = point
    start_u, start_v = segment_start
    end_u, end_v = segment_end

    delta_u = end_u - start_u
    delta_v = end_v - start_v
    length_sq = delta_u * delta_u + delta_v * delta_v

    # degenerate segment: both ends coincide, so measure to that single point
    if length_sq == 0:
        return distance(point, segment_start)

    # relative position of the projection along the segment, limited to [0, 1]
    projection = ((p_u - start_u) * delta_u + (p_v - start_v) * delta_v) / length_sq
    capped = min(1, projection)
    fraction = max(0, capped)

    closest = (start_u + fraction * delta_u, start_v + fraction * delta_v)
    return distance(point, closest)

# function to find accidents on a given route within a maximum distance
def find_accidents_on_route(start_point, end_point, all_relevant_accidents, max_distance=0.05):
    """Return the accidents that lie close to the segment start_point -> end_point.

    Args:
        start_point: coordinate pair of the segment start, in the same order
            as the accident coordinates (Start_Lat, Start_Lng).
        end_point: coordinate pair of the segment end.
        all_relevant_accidents: DataFrame with at least the columns
            'Start_Lat' and 'Start_Lng'.
        max_distance: maximal distance of accidents from the route, in the
            unit returned by ``distance_to_segment`` (default 0.05).

    Returns:
        The rows of ``all_relevant_accidents`` whose distance to the segment
        is at most ``max_distance``.
    """
    # nothing to filter; return early so no mask has to be built for an empty frame
    if all_relevant_accidents.empty:
        return all_relevant_accidents.copy()

    # boolean mask, one entry per accident row, in row order
    mask = np.array(
        [
            distance_to_segment((lat, lng), start_point, end_point) <= max_distance
            for lat, lng in zip(all_relevant_accidents['Start_Lat'], all_relevant_accidents['Start_Lng'])
        ],
        dtype=bool,
    )

    # return the accidents that match the mask
    return all_relevant_accidents.loc[mask]

In [3]:
# main function that splits route data into segments (5 by default) and finds corresponding accidents, calculates metrics, and generates an output json file

# divisor applied to a segment's summed accident severity to obtain its api value
# NOTE(review): presumably a dataset-wide total used for normalization - confirm its origin
_API_DIVISOR = 6035011


def _tile_filename(start_point, end_point):
    """Return the path of the accident csv that covers the midpoint of a coordinate pair."""
    mid_lat = (start_point[0] + end_point[0]) / 2
    mid_lng = (start_point[1] + end_point[1]) / 2
    # file key: first 4 characters of the latitude and first 5 characters of the longitude
    dataset_id = f'{str(float(mid_lat))[:4]}_{str(float(mid_lng))[:5]}'
    return f'data/ga_accidents_{dataset_id}.csv'


def _collect_segment_accidents(segment_df, tile_cache):
    """Return the de-duplicated accidents found along one route segment."""
    # reduce route data to those rows that are flagged with assign == 1
    assigned = segment_df.loc[segment_df['assign'] == 1, ['lat', 'lng']]
    coords = list(zip(assigned['lat'].tolist(), assigned['lng'].tolist()))

    matches = []
    # loop through the pairs of subsequent coordinates of the segment
    for start_point, end_point in zip(coords, coords[1:]):
        filename = _tile_filename(start_point, end_point)
        # each file is read at most once per run; None marks a file that does not exist
        # (there might be route parts without an accident file)
        if filename not in tile_cache:
            tile_cache[filename] = pd.read_csv(filename) if os.path.isfile(filename) else None
        tile = tile_cache[filename]
        if tile is None:
            continue
        matches.append(find_accidents_on_route(start_point, end_point, tile))

    if not matches:
        return pd.DataFrame()
    # neighbouring coordinate pairs can match the same accident, so drop duplicate rows
    return pd.concat(matches, ignore_index=True).drop_duplicates()


def _api_value(accidents_df):
    """Return the api value of a segment: summed severity divided by _API_DIVISOR."""
    # a segment without any accident data has no 'Severity' column at all
    total = accidents_df['Severity'].sum() if 'Severity' in accidents_df.columns else 0
    return round(float(total) / _API_DIVISOR, 8)


def _hex_color(value):
    """Convert a value in [0, 1] to a hex color code on a gradient from green to red."""
    r = int(255 * value)
    g = int(255 * (1 - value))
    b = 0
    return f'{r:02x}{g:02x}{b:02x}'


def _api_colors(api_values):
    """Return one hex color per api value, min-max normalized across all values."""
    if not api_values:
        return []
    lowest = min(api_values)
    spread = max(api_values) - lowest
    # if all values are identical there is no gradient; use the low (green) end for all
    return [_hex_color((value - lowest) / spread if spread else 0.0) for value in api_values]


def _hour_histogram(accidents_df):
    """Return a dict mapping every hour 0..23 to the number of accidents that started in it."""
    counts = dict.fromkeys(range(24), 0)
    if 'Start_Time' in accidents_df.columns and not accidents_df.empty:
        hours = pd.to_datetime(accidents_df['Start_Time']).dt.hour
        for hour, count in hours.value_counts().items():
            counts[int(hour)] = int(count)
    return counts


def _top_weather(accidents_df, top_n=5):
    """Return the top_n most frequent weather conditions and their counts."""
    if 'Weather_Condition' not in accidents_df.columns:
        return {}
    top = accidents_df['Weather_Condition'].value_counts().head(top_n)
    return {condition: int(count) for condition, count in top.items()}


def find_accidents(route_data, num_segments=5, output_path="backend_output.json"):
    """Split a route into segments, gather accident metrics per segment and export them as json.

    Args:
        route_data: path of a json file holding a list of items of the form
            [{"lat": ..., "lng": ...}, assign], where assign is 0 or 1.
        num_segments: number of (nearly) equally sized parts the route is
            split into (default 5).
        output_path: path of the json file that is written
            (default "backend_output.json").

    Returns:
        None. The result is written to ``output_path`` with one entry
        "segment_<n>" per segment containing "api_value", "api_color",
        "num_accidents", "accident_time", "weather_condition" and "route".
    """
    # load route data from json file
    with open(route_data) as f:
        json_data = json.load(f)

    # create pandas DataFrame from loaded route data with columns lat, lng, assign (with the 0 or 1)
    route_df = pd.DataFrame(
        [{"lat": item[0]["lat"], "lng": item[0]["lng"], "assign": item[1]} for item in json_data],
        columns=["lat", "lng", "assign"],
    )

    # split the row positions (not the DataFrame itself) into nearly equal parts
    position_chunks = np.array_split(np.arange(len(route_df)), num_segments)
    segment_routes = [route_df.iloc[chunk] for chunk in position_chunks]

    # find the accidents of every segment; accident files are shared between segments via the cache
    tile_cache = {}
    segment_accidents = [_collect_segment_accidents(segment_df, tile_cache) for segment_df in segment_routes]

    # calculate api per segment and convert the api values into colors
    api_values = [_api_value(accidents_df) for accidents_df in segment_accidents]
    api_colors = _api_colors(api_values)

    # create a dictionary to store the JSON data, one entry per segment
    json_dict = {}
    segments = zip(segment_routes, segment_accidents, api_values, api_colors)
    for number, (segment_df, accidents_df, api_value, api_color) in enumerate(segments, start=1):
        json_dict[f"segment_{number}"] = {
            "api_value": api_value,
            "api_color": api_color,
            "num_accidents": len(accidents_df),
            "accident_time": _hour_histogram(accidents_df),
            "weather_condition": _top_weather(accidents_df),
            "route": segment_df[["lat", "lng"]].to_dict(orient="records"),
        }

    # export the JSON file
    with open(output_path, "w") as f:
        json.dump(json_dict, f)

In [5]:
# run the main function with the route data
# (reads the route from 'athens_GA_path.json'; find_accidents writes its result to 'backend_output.json')
find_accidents('athens_GA_path.json')