In [1]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [2]:
import numpy as np
import pandas as pd
from pathlib import Path
import psutil
import tqdm
import yaml
import glob
import math
import os
import gc
import re

from datetime import datetime
from multiprocessing import Pool
from IPython.display import clear_output

from scipy.stats import ttest_1samp, t, chi2_contingency
from statsmodels.stats.proportion import proportion_confint

from scipy.optimize import minimize
from scipy.special import betaln
from scipy.stats import beta

from sklearn.cluster import KMeans

import matplotlib.pyplot as plt
import seaborn as sns

In [3]:
import plotly.io as pio
# pio.renderers.default = "notebook"
# pio.renderers.default = "notebook_connected"
pio.renderers.default = "iframe"

import plotly.graph_objects as go
import plotly.express as px

from plotly.subplots import make_subplots

In [4]:
from src.components.feature_extraction import data_quality_check
from src.utils import get_root_directory, float_to_int

In [5]:
# Get root directory of the project
root_dir = get_root_directory()

In [6]:
# Maximize Column Display 
pd.set_option('display.max_colwidth', None)     # Display all content within each cell without truncation
pd.set_option('display.max_columns', None)      # Display all columns
pd.set_option('display.width', None)            # Display entire width of DataFrame is displayed

pd.set_option('display.max_rows', None)         # Display all rows

In [7]:
# Get the current CPU usage as a percentage
cpu_usage = psutil.cpu_percent(interval=1)  # Interval of 1 second
print(f"Current CPU usage: {cpu_usage}%")

# Get the per-core usage
cpu_per_core = psutil.cpu_percent(interval=1, percpu=True)
print(f"CPU usage per core: {cpu_per_core}")

# Get the total number of cores
cpu_cores = psutil.cpu_count()
print(f"Total CPU cores: {cpu_cores}")

Current CPU usage: 6.2%
CPU usage per core: [6.1, 6.1, 3.0, 4.1, 22.2, 7.0, 4.0, 5.1, 5.1, 4.0, 12.0, 7.1]
Total CPU cores: 12


# Recommendation

## FDOT D5

In [8]:
# Configurations
signal_ids = [
    "1285", "1290",
    "1300", "1315", "1325", "1330", 
    "1455", "1470", "1490",
    "1500", "1555",
    "1707", "1725", "1790", "1795", 
    "1960",
    "2055", 
    "2485", 
    "2665", 
    # "D5I-3000"
]

In [9]:
def load_data(dirpath: str, signal_id: str):
    # Cycle-level SPaT
    filepaths = f"{dirpath}/{signal_id}/*"
    filepaths = [filepath for filepath in glob.glob(filepaths)]

    data = []
    for filepath in filepaths:
        data.append(pd.read_pickle(filepath))
        
    df_id = pd.concat(data, axis=0, ignore_index=True)

    return df_id

In [10]:
def bootstrap_ci(data, func=np.mean, n_iterations=1000, alpha=0.05):
    """
    Calculate bootstrapped confidence intervals with flexibility for mean or median.
    
    Parameters:
    -----------
    data : array-like
        Input data for bootstrapping. NaN values will be ignored.
    func : callable
        Function to apply to each bootstrap sample (e.g., np.mean, np.median).
    n_iterations : int
        Number of bootstrap iterations (default: 1000).
    alpha : float
        Significance level for confidence intervals (default: 0.05).
    
    Returns:
    --------
    tuple
        Lower and upper bounds of the confidence interval.
        Returns (np.nan, np.nan) if the input data is entirely NaN.
    """
    # Remove NaN values
    data = np.array(data)
    data = data[~np.isnan(data)]
    
    # Handle case where all data is NaN
    if len(data) == 0:
        return np.nan, np.nan
    
    # Perform bootstrapping
    bootstrap_samples = [func(np.random.choice(data, size=len(data), replace=True)) for _ in range(n_iterations)]
    lower = np.percentile(bootstrap_samples, alpha / 2 * 100)
    upper = np.percentile(bootstrap_samples, (1 - alpha / 2) * 100)
    
    return lower, upper

In [11]:
# Delete unwanted file
# dirpath = Path("../data/production/atspm/fdot_d5/feature_extraction/")
# file_name = "2024-06-10.pkl"

# # Recursively search for the target file in all folders and subfolders
# for filepath in dirpath.rglob(file_name):
#     print(f"Deleting: {filepath}")
#     filepath.unlink() 

### Pedestrian Recall

#### v1

##### Identification of Critical Hours

###### Pedestrian Demand Probability

In [None]:
def calculate_demand_probability(df, A_col, C_col, output_col="demandProbability"):
    """
    Calculate pedestrian demand probability for columns in a DataFrame.

    Parameters:
    df (pd.DataFrame): The input DataFrame containing pedestrian activity and cycle length.
    A_col (str): Name of the column for pedestrian activity (average arrivals per hour).
    C_col (str): Name of the column for cycle length (seconds).
    output_col (str): Name of the column to store pedestrian demand probability.

    Returns:
    pd.DataFrame: The DataFrame with an added column for pedestrian demand probability.
    """
    def calculate_pedestrian_demand(A, C):
        if C <= 0:
            return None  # Handle invalid cycle length
        return 1 - math.exp(-(A * C) / 3600)
    
    # Apply the calculation row-wise
    df[output_col] = df.apply(
        lambda row: calculate_pedestrian_demand(row[A_col], row[C_col]), axis=1
    )
    return df

In [None]:
def calculate_ci_with_demand_probability(signal_id):
    # Cycle-level SPaT
    df_spat_id = load_data(
        dirpath="../data/production/atspm/fdot_d5/feature_extraction/feature/cycle/vehicle_signal/spat",
        signal_id=signal_id
    )
    columns = [column for column in df_spat_id.columns if not any(k in column for k in ["yellow", "redClearance", "red", "1", "3", "5", "7"])]
    df_spat_id = df_spat_id[columns]
    
    duration_columns = [column for column in df_spat_id.columns if "Duration" in column]
    df_spat_id = (
        pd.melt(df_spat_id, 
                id_vars=["signalID", "date", "cycleNo", "cycleBegin", "cycleEnd", "cycleLength"], 
                value_vars=duration_columns, value_name="greenDuration", var_name="phaseNo")
    )
    
    df_spat_id["phaseNo"] = (
        df_spat_id["phaseNo"].str.extract(r'Phase(\d)').astype(int)
    )

    # Cycle-level pedestrian signal details
    df_pedestrian_cycle_profile_id = load_data(
        dirpath="../data/production/atspm/fdot_d5/feature_extraction/signal_profile/cycle/pedestrian_signal",
        signal_id=signal_id
    )
    df_pedestrian_cycle_profile_id["walkDuration"] = (
        (df_pedestrian_cycle_profile_id["pedestrianWalkEnd"] - df_pedestrian_cycle_profile_id["pedestrianWalkBegin"]).dt.seconds
    )
    df_pedestrian_cycle_profile_id["pedestrianSignalDuration"] = (
        (df_pedestrian_cycle_profile_id["pedestrianClearanceEnd"] - df_pedestrian_cycle_profile_id["pedestrianWalkBegin"]).dt.seconds
    )
    df_pedestrian_cycle_profile_id = df_pedestrian_cycle_profile_id.drop(
        columns=["correctSequenceFlag", "pedestrianWalkBegin", "pedestrianWalkEnd", "pedestrianClearanceBegin", "pedestrianClearanceEnd", "pedestrianDontWalkBegin"]
    )

    # Cycle-level pedestrian activity
    df_pedestrian_activity_id = load_data(
        dirpath="../data/production/atspm/fdot_d5/feature_extraction/feature/cycle/pedestrian_traffic/activity",
        signal_id=signal_id
    )
    columns = [
        column for column in df_pedestrian_activity_id.columns if not any(k in column for k in ["Prev", "Curr", "45"])
    ]
    df_pedestrian_activity_id = df_pedestrian_activity_id[columns]
    df_pedestrian_activity_id["hour"] = df_pedestrian_activity_id["cycleBegin"].dt.hour
    
    activity_columns = [col for col in df_pedestrian_activity_id.columns if "Activity" in col]
    df_pedestrian_activity_id = df_pedestrian_activity_id.assign(
        **{
            col: df_pedestrian_activity_id.groupby(["signalID", "date", "hour"])[col].transform("sum")
            for col in activity_columns
        }
    )
    df_pedestrian_activity_id = (
        pd.melt(df_pedestrian_activity_id, 
                id_vars=["signalID", "date", "hour", "cycleNo"], 
                value_vars=activity_columns, value_name="pedestrianActivity", var_name="phaseNo")
    )
    df_pedestrian_activity_id["phaseNo"] = (
        df_pedestrian_activity_id["phaseNo"].str.extract(r'Phase(\d)').astype(int)
    )

    # Merge
    df_pedestrian_demand_prob_id = (
        pd.merge(df_spat_id, df_pedestrian_cycle_profile_id,  
                 how="left", on=["signalID", "date", "cycleNo", "cycleBegin", "cycleEnd", "phaseNo"])
    )
    df_pedestrian_demand_prob_id["hour"] = df_pedestrian_demand_prob_id["cycleBegin"].dt.hour
    
    df_pedestrian_demand_prob_id = (
        pd.merge(df_pedestrian_demand_prob_id, df_pedestrian_activity_id, 
                 how="left", on=["signalID", "date", "hour", "cycleNo", "phaseNo"])
    )
    df_pedestrian_demand_prob_id = (
        df_pedestrian_demand_prob_id.drop(columns=["cycleBegin", "cycleEnd"])
    )
    df_pedestrian_demand_prob_id["cycleNo"] = df_pedestrian_demand_prob_id["cycleNo"].astype(int)
    df_pedestrian_demand_prob_id["pedestrianActivity"] = (
        df_pedestrian_demand_prob_id["pedestrianActivity"].apply(lambda x: int(x) if pd.notna(x) else 0)
    )
    # df_pedestrian_demand_prob_id["pedestrianSignalDuration"] = (
    #     df_pedestrian_demand_prob_id.groupby(["signalID", "date", "hour", "phaseNo"])["pedestrianSignalDuration"].transform("min")
    # )
    # CI calculation
    df_pedestrian_demand_prob_id = (
        calculate_demand_probability(df_pedestrian_demand_prob_id, 
                                     A_col="pedestrianActivity",     
                                     C_col="cycleLength")
    )
    df_pedestrian_demand_prob_id["demandProbability"] = df_pedestrian_demand_prob_id["demandProbability"].replace(0, np.nan)

    
    df_pedestrian_demand_prob_id_hourly = (
        df_pedestrian_demand_prob_id
        .groupby(["signalID", "date", "hour", "phaseNo"])
        .agg(
            greenDurationAvg=("greenDuration", lambda x: np.nan if x.isna().all() else x.mean(skipna=True)),
            pedestrianSignalDurationMin=("pedestrianSignalDuration", lambda x: np.nan if x.isna().all() else x.min(skipna=True)),
            demandProbabilityAvg=("demandProbability", lambda x: np.nan if x.isna().all() else x.mean(skipna=True)),
            # demandProbabilityAvg=("demandProbability", lambda x: np.nan if x.isna().all() else x.max(skipna=True))
        )
        .reset_index()  
    )
    
    df_pedestrian_demand_prob_id_hourly["efficiencyRatioAvg"] = (
        df_pedestrian_demand_prob_id_hourly["greenDurationAvg"] / df_pedestrian_demand_prob_id_hourly["pedestrianSignalDurationMin"]
    )

    # Group by `phaseNo` and `hour` and calculate bootstrap confidence intervals and mean
    pedestrian_demand_probs_id = []
    
    for (signal_id, hour, phase_no), group in df_pedestrian_demand_prob_id_hourly.groupby(["signalID", "hour", "phaseNo"]):
        demand_probs = group["demandProbabilityAvg"].values
        efficiency_ratios = group["efficiencyRatioAvg"].values

        # Check if demand_probs has valid values (non-NaN)
        if len(demand_probs) > 0 and not np.isnan(demand_probs).all():
            demand_prob_lower, demand_prob_upper = bootstrap_ci(demand_probs, func=np.mean)
            demand_prob_mean = np.nanmean(demand_probs)
        else:
            # Handle the case where all values are NaN
            demand_prob_lower, demand_prob_upper, demand_prob_mean = np.nan, np.nan, np.nan
        
        # Check if efficiency_ratios has valid values (non-NaN)
        if len(efficiency_ratios) > 0 and not np.isnan(efficiency_ratios).all():
            efficiency_ratio_lower, efficiency_ratio_upper = bootstrap_ci(efficiency_ratios, func=np.mean)
            efficiency_ratio_mean = np.nanmean(efficiency_ratios)
        else:
            # Handle the case where all values are NaN
            efficiency_ratio_lower, efficiency_ratio_upper, efficiency_ratio_mean = np.nan, np.nan, np.nan
    
        # Append results to the list
        pedestrian_demand_probs_id.append(
            {
                "signalID": signal_id, "hour": hour, "phaseNo": phase_no,
                "demandProbabilityAvg": demand_prob_mean,
                "demandProbabilityAvgLower": demand_prob_lower,
                "demandProbabilityAvgUpper": demand_prob_upper,
                "efficiencyRatioAvg": efficiency_ratio_mean,
                "efficiencyRatioAvgLower": efficiency_ratio_lower,
                "efficiencyRatioAvgUpper": efficiency_ratio_upper
            }
        )
    
    # Create a results DataFrame
    df_pedestrian_demand_prob_id_hourly = pd.DataFrame(pedestrian_demand_probs_id)

    return df_pedestrian_demand_prob_id, df_pedestrian_demand_prob_id_hourly

In [None]:
# _, df_pedestrian_demand_prob_id_hourly = calculate_ci_with_demand_probability(signal_id="1500")

# phase_nos = df_pedestrian_demand_prob_id_hourly["phaseNo"].unique()

# # Create a 2x2 grid for subplots
# fig = make_subplots(
#     rows=2, cols=2,
#     subplot_titles=[f"Phase {phase_no}" for phase_no in phase_nos],
#     shared_xaxes=False, shared_yaxes=False,
#     vertical_spacing=0.15
# )

# # Iterate over phases and add traces
# row_col_mapping = [(1, 1), (1, 2), (2, 1), (2, 2)]  # Map phases to subplot positions
# for idx, (phase_no, (row, col)) in enumerate(zip(phase_nos, row_col_mapping)):
#     proc_df_pedestrian_demand_prob_id_hourly = df_pedestrian_demand_prob_id_hourly[df_pedestrian_demand_prob_id_hourly["phaseNo"] == phase_no]
    
#     # Calculate error bars
#     yerr_lower = (
#         proc_df_pedestrian_demand_prob_id_hourly["demandProbabilityAvg"] - proc_df_pedestrian_demand_prob_id_hourly["demandProbabilityAvgLower"]
#     )
#     yerr_upper = (
#         proc_df_pedestrian_demand_prob_id_hourly["demandProbabilityAvgUpper"] - proc_df_pedestrian_demand_prob_id_hourly["demandProbabilityAvg"]
#     )

#     # Add scatter plot with error bars
#     fig.add_trace(
#         go.Scatter(
#             x=proc_df_pedestrian_demand_prob_id_hourly["hour"],
#             y=proc_df_pedestrian_demand_prob_id_hourly["demandProbabilityAvg"],
#             error_y=dict(
#                 type="data",
#                 symmetric=False,
#                 array=yerr_upper,
#                 arrayminus=yerr_lower,
#                 color="blue",
#                 thickness=1.5,
#                 width=3,
#             ),
#             mode="markers+lines",
#             marker=dict(size=8),
#             name=f"Phase {phase_no}",
#         ),
#         row=row, col=col
#     )

#     # Add a horizontal threshold line
#     fig.add_shape(
#         type="line",
#         x0=0,
#         x1=23,
#         y0=0.6,
#         y1=0.6,
#         line=dict(color="red", dash="dash"),
#         row=row, col=col
#     )
#     # Add a horizontal threshold line
#     fig.add_shape(
#         type="line",
#         x0=0,
#         x1=23,
#         y0=0.4,
#         y1=0.4,
#         line=dict(color="orange", dash="dash"),
#         row=row, col=col
#     )

# # Update layout
# fig.update_layout(
#     # title="Proportion of Cycles Recommended with PR by Hour with 95% Confidence Intervals",
#     height=800,
#     width=1400,
#     showlegend=False,
#     xaxis_title="Hour of Day",
#     yaxis_title="Probability of Pedestrian Demand",
#     yaxis=dict(range=[0, 1]),  # Set y-axis range
#     font=dict(size=14)
# )

# # Update axis labels for shared x/y axes
# fig.update_xaxes(
#     title_text="Hour of Day",
#     # dtick=1,
#     # tickvals=list(range(24)),
#     # ticktext=[f"{hour}:00" for hour in range(24)],
#     # tickformat="%H:00",
#     # tickangle=-45,
#     # row=2, col=1,  # Shared x-axis title position
#     title_font=dict(size=16),
#     tickfont=dict(size=16),
# )

# # Update y-axes for shared configuration
# fig.update_yaxes(
#     title_text="Probability of Pedestrian Demand", 
#     range=[0, 0.75],  # Set y-axis range for all subplots
#     tickvals=np.linspace(0, 1, 11),  # Tick values from 0 to 1 with step 0.1
#     title_font=dict(size=16),
#     tickfont=dict(size=16),
# )

# # Show plot
# fig.show()

In [None]:
# _, df_pedestrian_demand_prob_id_hourly = calculate_ci_with_demand_probability(signal_id="1500")

# phase_nos = df_pedestrian_demand_prob_id_hourly["phaseNo"].unique()

# # Create a 2x2 grid for subplots
# fig = make_subplots(
#     rows=2, cols=2,
#     subplot_titles=[f"Phase {phase_no}" for phase_no in phase_nos],
#     shared_xaxes=False, shared_yaxes=False,
#     vertical_spacing=0.15
# )

# # Iterate over phases and add traces
# row_col_mapping = [(1, 1), (1, 2), (2, 1), (2, 2)]  # Map phases to subplot positions
# for idx, (phase_no, (row, col)) in enumerate(zip(phase_nos, row_col_mapping)):
#     proc_df_pedestrian_demand_prob_id_hourly = df_pedestrian_demand_prob_id_hourly[df_pedestrian_demand_prob_id_hourly["phaseNo"] == phase_no]
    
#     # Calculate error bars
#     yerr_lower = (
#         proc_df_pedestrian_demand_prob_id_hourly["efficiencyRatioAvg"] - proc_df_pedestrian_demand_prob_id_hourly["efficiencyRatioAvgLower"]
#     )
#     yerr_upper = (
#         proc_df_pedestrian_demand_prob_id_hourly["efficiencyRatioAvgUpper"] - proc_df_pedestrian_demand_prob_id_hourly["efficiencyRatioAvg"]
#     )

#     # Add scatter plot with error bars
#     fig.add_trace(
#         go.Scatter(
#             x=proc_df_pedestrian_demand_prob_id_hourly["hour"],
#             y=proc_df_pedestrian_demand_prob_id_hourly["efficiencyRatioAvg"],
#             error_y=dict(
#                 type="data",
#                 symmetric=False,
#                 array=yerr_upper,
#                 arrayminus=yerr_lower,
#                 color="blue",
#                 thickness=1.5,
#                 width=3,
#             ),
#             mode="markers+lines",
#             marker=dict(size=8),
#             name=f"Phase {phase_no}",
#         ),
#         row=row, col=col
#     )

#     # Add a horizontal threshold line
#     fig.add_shape(
#         type="line",
#         x0=0,
#         x1=23,
#         y0=1,
#         y1=1,
#         line=dict(color="red", dash="dash"),
#         row=row, col=col
#     )

# # Update layout
# fig.update_layout(
#     # title="Proportion of Cycles Recommended with PR by Hour with 95% Confidence Intervals",
#     height=800,
#     width=1400,
#     showlegend=False,
#     xaxis_title="Hour of Day",
#     yaxis_title="Efficiency Ratio",
#     yaxis=dict(range=[0, 1]),  # Set y-axis range
#     font=dict(size=14)
# )

# # Update axis labels for shared x/y axes
# fig.update_xaxes(
#     title_text="Hour of Day",
#     # dtick=1,
#     # tickvals=list(range(24)),
#     # ticktext=[f"{hour}:00" for hour in range(24)],
#     # tickformat="%H:00",
#     # tickangle=-45,
#     # row=2, col=1,  # Shared x-axis title position
#     title_font=dict(size=16),
#     tickfont=dict(size=16),
# )

# # Update y-axes for shared configuration
# fig.update_yaxes(
#     title_text="Efficiency Ratio", 
#     range=[0, 0.75],  # Set y-axis range for all subplots
#     tickvals=np.linspace(0, 1, 11),  # Tick values from 0 to 1 with step 0.1
#     title_font=dict(size=16),
#     tickfont=dict(size=16),
# )

# # Show plot
# fig.show()

In [None]:
def hour_with_demand_probability(signal_id: str, with_efficiency: bool):
    dict_hour_with_demand_probability_id = {"signalID": [], "approachDir": [], "phaseNo": []}
    dict_hour_with_demand_probability_id.update(
        {str(hour): [] for hour in range(24)}
    )
    df_config_id = pd.read_csv(f"../data/interim/atspm/fdot_d5/signal_config/{signal_id}.csv")
    
    phase_nos_config = df_config_id["phaseNo"].unique().tolist()
    phase_nos_config = [int(phase_no) for phase_no in phase_nos_config if pd.notna(phase_no) and phase_no % 2 == 0]
    
    try:
        _, df_pedestrian_demand_prob_id_hourly = calculate_ci_with_demand_probability(signal_id=signal_id)
        
        phase_nos_demand = df_pedestrian_demand_prob_id_hourly["phaseNo"].unique().tolist()
        phase_nos_demand = [int(phase_no) for phase_no in phase_nos_demand]
    
        sequence = [2, 6, 4, 8]
        phase_nos = sorted(list(set(phase_nos_config + phase_nos_demand)), key=lambda x: sequence.index(x))
        
        for phase_no in phase_nos:
            dict_hour_with_demand_probability_id["signalID"].append(signal_id)
    
            if phase_no in phase_nos_config:
                approach_dir = df_config_id[df_config_id["phaseNo"] == phase_no]["approach"].unique()[0].split()[-1][0:-1]
            else:
                approach_dir = "NA"
        
            dict_hour_with_demand_probability_id["approachDir"].append(approach_dir)
            dict_hour_with_demand_probability_id["phaseNo"].append(phase_no)
        
            for i in range(24):
                proc_df_pedestrian_demand_prob_id_hourly = (
                    df_pedestrian_demand_prob_id_hourly
                    .query("hour == @i & phaseNo == @phase_no")
                )
        
                if len(proc_df_pedestrian_demand_prob_id_hourly) == 0:
                    dict_hour_with_demand_probability_id[str(i)].append(np.nan)
                else:
                    proc_df_pedestrian_demand_prob_id_hourly = proc_df_pedestrian_demand_prob_id_hourly.reset_index(drop=True)
                    demand_prob_lower = proc_df_pedestrian_demand_prob_id_hourly.loc[0, "demandProbabilityAvgLower"]
                    efficiency_prob_lower = proc_df_pedestrian_demand_prob_id_hourly.loc[0, "efficiencyRatioAvgLower"]
                    
                    if demand_prob_lower >= 0.6:
                        dict_hour_with_demand_probability_id[str(i)].append(1)
                    elif demand_prob_lower >= 0.4:
                        if with_efficiency:
                            if efficiency_prob_lower >= 1:
                                dict_hour_with_demand_probability_id[str(i)].append(1)
                            else:
                                dict_hour_with_demand_probability_id[str(i)].append(0)
                        else:
                            dict_hour_with_demand_probability_id[str(i)].append(1)
                    else:
                        dict_hour_with_demand_probability_id[str(i)].append(0)
    
        return pd.DataFrame(dict_hour_with_demand_probability_id)
    except:
        for phase_no in phase_nos_config:
            dict_hour_with_demand_probability_id["signalID"].append(signal_id)
            dict_hour_with_demand_probability_id["approachDir"].append("NA")
            dict_hour_with_demand_probability_id["phaseNo"].append(np.nan)
                                                                      
            for i in range(24):
                dict_hour_with_demand_probability_id[str(i)].append(np.nan)

        return pd.DataFrame(dict_hour_with_demand_probability_id)

In [None]:
# df_hour_with_demand_probability = pd.DataFrame()

# for signal_id in tqdm.tqdm(signal_ids):
#     df_hour_with_demand_probability_id = hour_with_demand_probability(signal_id=signal_id, with_efficiency=False)
#     df_hour_with_demand_probability = (
#         pd.concat([df_hour_with_demand_probability, df_hour_with_demand_probability_id], axis=0, ignore_index=True)
#     )

In [None]:
# float_to_int(df_hour_with_demand_probability)

###### Pedestrian Delay

In [None]:
def calculate_ci_with_delay(signal_id):
    df_pedestrian_delay_id = load_data(
        dirpath="../data/production/atspm/fdot_d5/feature_extraction/feature/cycle/pedestrian_traffic/delay",
        signal_id=signal_id
    )
    columns = [
        col for col in df_pedestrian_delay_id.columns if all(suffix not in col for suffix in ["CurrCycle", "PrevCycle"]) and ("Delay" in col) and df_pedestrian_delay_id[col].dtype != "O"
    ]
    
    df_pedestrian_delay_id["hour"] = df_pedestrian_delay_id["cycleBegin"].dt.hour
    df_pedestrian_delay_id = (
        pd.melt(df_pedestrian_delay_id, 
                id_vars=["signalID", "date", "hour"], 
                var_name="phaseNo",
                value_vars=columns,
                value_name="pedestrianDelay"
               )
    )
    df_pedestrian_delay_id["phaseNo"] = df_pedestrian_delay_id["phaseNo"].str.extract(r'Phase(\d)').astype(int)

    # Filter non-zero values
    df_pedestrian_delay_id = (
        df_pedestrian_delay_id[df_pedestrian_delay_id["pedestrianDelay"] > 0]
    )
    # df_pedestrian_delay_id["recommendation"] = (
    #     df_pedestrian_delay_id["pedestrianDelay"].apply(lambda x: 1 if x>=60 else 0)
    # )
    df_pedestrian_delay_id_hourly = (
        df_pedestrian_delay_id
        .groupby(["signalID", "date", "hour", "phaseNo"])
        .agg(
            pedestrianDelayAvg=("pedestrianDelay", "mean")
        )
        .reset_index()  
    )

    # Group by `phaseNo` and `hour` and calculate bootstrap confidence intervals and mean
    pedestrian_delays_id = []
    for (signal_id, hour, phase_no), group in df_pedestrian_delay_id_hourly.groupby(["signalID", "hour", "phaseNo"]):
        pedestrian_delays = group["pedestrianDelayAvg"].values
        
        pedestrian_delay_lower, pedestrian_delay_upper = bootstrap_ci(pedestrian_delays)
        pedestrian_delay_mean = np.mean(pedestrian_delays)  # Calculate the mean
        
        pedestrian_delays_id.append(
            {
                "signalID": signal_id, "hour": hour, "phaseNo": phase_no, 
                "pedestrianDelayAvg": pedestrian_delay_mean, 
                "pedestrianDelayAvgLower": pedestrian_delay_lower, 
                "pedestrianDelayAvgUpper": pedestrian_delay_upper
            }
        )
    
    # Create a results DataFrame
    df_pedestrian_delay_id_hourly = pd.DataFrame(pedestrian_delays_id)

    return df_pedestrian_delay_id, df_pedestrian_delay_id_hourly

In [None]:
# _, df_pedestrian_delay_id_hourly = calculate_ci_with_delay(signal_id="1500")
# phase_nos = df_pedestrian_delay_id_hourly["phaseNo"].unique()

# # Create a 2x2 grid for subplots
# fig = make_subplots(
#     rows=2, cols=2,
#     subplot_titles=[f"Phase {phase_no}" for phase_no in phase_nos],
#     shared_xaxes=False, shared_yaxes=False,
#     vertical_spacing=0.15
# )

# # Iterate over phases and add traces
# row_col_mapping = [(1, 1), (1, 2), (2, 1), (2, 2)]  # Map phases to subplot positions
# for idx, (phase_no, (row, col)) in enumerate(zip(phase_nos, row_col_mapping)):
#     proc_df_pedestrian_delay_id_hourly = df_pedestrian_delay_id_hourly[df_pedestrian_delay_id_hourly["phaseNo"] == phase_no]
    
#     # Calculate error bars
#     yerr_lower = (
#         proc_df_pedestrian_delay_id_hourly["pedestrianDelayAvg"] - proc_df_pedestrian_delay_id_hourly["pedestrianDelayAvgLower"]
#     )
#     yerr_upper = (
#         proc_df_pedestrian_delay_id_hourly["pedestrianDelayAvgUpper"] - proc_df_pedestrian_delay_id_hourly["pedestrianDelayAvg"]
#     )

#     # Add scatter plot with error bars
#     fig.add_trace(
#         go.Scatter(
#             x=proc_df_pedestrian_delay_id_hourly["hour"],
#             y=proc_df_pedestrian_delay_id_hourly["pedestrianDelayAvg"],
#             error_y=dict(
#                 type="data",
#                 symmetric=False,
#                 array=yerr_upper,
#                 arrayminus=yerr_lower,
#                 color="blue",
#                 thickness=1.5,
#                 width=3,
#             ),
#             mode="markers",
#             marker=dict(size=8),
#             name=f"Phase {phase_no}",
#         ),
#         row=row, col=col
#     )

#     # Add a horizontal threshold line
#     fig.add_shape(
#         type="line",
#         x0=0,
#         x1=23,
#         y0=45,
#         y1=45,
#         line=dict(color="red", dash="dash"),
#         row=row, col=col
#     )

# # Update layout
# fig.update_layout(
#     # title="Proportion of Cycles Recommended with PR by Hour with 95% Confidence Intervals",
#     height=800,
#     width=1400,
#     showlegend=False,
#     xaxis_title="Hour of Day",
#     yaxis_title="Pedestrian Delay (sec)",
#     font=dict(size=14)
# )

# # Update axis labels for shared x/y axes
# fig.update_xaxes(
#     title_text="Hour of Day",
#     # dtick=1,
#     # tickvals=list(range(24)),
#     # ticktext=[f"{hour}:00" for hour in range(24)],
#     # tickformat="%H:00",
#     # tickangle=-45,
#     # row=2, col=1,  # Shared x-axis title position
#     title_font=dict(size=16),
#     tickfont=dict(size=16),
# )

# # Update y-axes for shared configuration
# fig.update_yaxes(
#     title_text="Pedestrian Delay (sec)", 
#     title_font=dict(size=16),
#     tickfont=dict(size=16),
# )

# # Export the Plotly figure as a high-resolution image
# fig.write_image("../reports/6.2.png", width=1400, height=800, scale=2)

# # Show plot
# fig.show()

In [None]:
def hour_with_delay(signal_id):
    dict_hour_with_delay_id = {"signalID": [], "approachDir": [], "phaseNo": []}
    dict_hour_with_delay_id.update(
        {str(hour): [] for hour in range(24)}
    )
    df_config_id = pd.read_csv(f"../data/interim/atspm/fdot_d5/signal_config/{signal_id}.csv")
    
    phase_nos_config = df_config_id["phaseNo"].unique().tolist()
    phase_nos_config = [int(phase_no) for phase_no in phase_nos_config if pd.notna(phase_no) and phase_no % 2 == 0]
    
    try:
        _, df_pedestrian_delay_id_hourly = calculate_ci_with_delay(signal_id=signal_id)
        
        phase_nos_demand = df_pedestrian_delay_id_hourly["phaseNo"].unique().tolist()
        phase_nos_demand = [int(phase_no) for phase_no in phase_nos_demand]
    
        sequence = [2, 6, 4, 8]
        phase_nos = sorted(list(set(phase_nos_config + phase_nos_demand)), key=lambda x: sequence.index(x))
        
        for phase_no in phase_nos:
            dict_hour_with_delay_id["signalID"].append(signal_id)
    
            if phase_no in phase_nos_config:
                approach_dir = df_config_id[df_config_id["phaseNo"] == phase_no]["approach"].unique()[0].split()[-1][0:-1]
            else:
                approach_dir = "NA"
        
            dict_hour_with_delay_id["approachDir"].append(approach_dir)
            dict_hour_with_delay_id["phaseNo"].append(phase_no)
        
            for i in range(24):
                proc_df_pedestrian_delay_id_hourly = (
                    df_pedestrian_delay_id_hourly
                    .query("hour == @i & phaseNo == @phase_no")
                )
        
                if len(proc_df_pedestrian_delay_id_hourly) == 0:
                    dict_hour_with_delay_id[str(i)].append(np.nan)
                else:
                    proc_df_pedestrian_delay_id_hourly = proc_df_pedestrian_delay_id_hourly.reset_index(drop=True)
                    pedestrian_delay_lower = proc_df_pedestrian_delay_id_hourly.loc[0, "pedestrianDelayAvgLower"]
                    
                    if pedestrian_delay_lower >= 45: # HCM 2022
                        dict_hour_with_delay_id[str(i)].append(1)
                    else:
                        dict_hour_with_delay_id[str(i)].append(0)
    
        return pd.DataFrame(dict_hour_with_delay_id)
    except:
        for phase_no in phase_nos_config:
            dict_hour_with_delay_id["signalID"].append(signal_id)
            dict_hour_with_delay_id["approachDir"].append("NA")
            dict_hour_with_delay_id["phaseNo"].append(np.nan)
                                                                      
            for i in range(24):
                dict_hour_with_delay_id[str(i)].append(np.nan)

        return pd.DataFrame(dict_hour_with_delay_id)

In [None]:
# df_hour_with_delay = pd.DataFrame()

# for signal_id in tqdm.tqdm(signal_ids):
#     df_critical_hour_with_delat_id = hour_with_delay(signal_id=signal_id)
#     df_hour_with_delay = (
#         pd.concat([df_hour_with_delay, df_critical_hour_with_delay_id], axis=0, ignore_index=True)
#     )

In [None]:
# float_to_int(df_hour_with_delay)

##### Recommendation

In [None]:
# pr_recommendations = (
#     float_to_int(df_hour_with_demand_probability.iloc[:, 3:].replace(np.nan, 1)).values 
#     * 
#     float_to_int(df_hour_with_delay.iloc[:, 3:].replace(np.nan, 1)).values
# )
# data = np.hstack(
#     [df_hour_with_demand_probability[["signalID", "approachDir", "phaseNo"]].values, pr_recommendations]
# )

# df_pr_recommendations = pd.DataFrame(data, columns=df_hour_with_demand_probability.columns)
# df_pr_recommendations["phaseNo"] = df_pr_recommendations[["phaseNo"]].astype("Int64")

# df_pr_recommendations.to_csv("../reports/recommendations/pr_recommendation_with_efficiency.csv", 
#                              index=False)

In [None]:
# pr_recommendations = (
#     float_to_int(df_hour_with_demand_probability.iloc[:, 3:].replace(np.nan, 1)).values 
#     * 
#     float_to_int(df_hour_with_delay.iloc[:, 3:].replace(np.nan, 1)).values
# )
# data = np.hstack(
#     [df_hour_with_demand_probability[["signalID", "approachDir", "phaseNo"]].values, pr_recommendations]
# )

# df_pr_recommendations = pd.DataFrame(data, columns=df_hour_with_demand_probability.columns)
# df_pr_recommendations["phaseNo"] = df_pr_recommendations[["phaseNo"]].astype("Int64")

# df_pr_recommendations.to_csv("../reports/recommendations/pr_recommendation_without_efficiency.csv", 
#                              index=False)

In [None]:
# proc_df_pr_recommendations = df_pr_recommendations[df_pr_recommendations["signalID"] == "1500"]

# # Create a 2x2 grid for the subplots
# fig = make_subplots(
#     rows=2,
#     cols=2,
#     subplot_titles=[f"Phase {phase}" for phase in proc_df_pr_recommendations["phaseNo"].unique()],
#     shared_xaxes=False,
#     # shared_yaxes=False,
#     vertical_spacing=0.6
# )

# # Define the color mapping for PR recommendations
# colors = {0: "#27ae60", 1: "#cb4335"}  # Green: PR Not Needed, Red: PR Needed

# # Add a heatmap for each phase in the 2x2 layout
# phase_nos = proc_df_pr_recommendations["phaseNo"].unique()

# for idx, phase_no in enumerate(phase_nos, start=1):
#     # Get subplot row and column
#     row, col = divmod(idx - 1, 2)
#     row += 1
#     col += 1

#     # Filter data for the current phase
#     proc_df_pr_recommendations_phase = (
#         proc_df_pr_recommendations[proc_df_pr_recommendations["phaseNo"] == phase_no].iloc[:, 3:]
#     )  # Extract hourly binary data
    
#     pr_recommendations_phase = proc_df_pr_recommendations_phase.values  # Convert to matrix
#     x_labels = list(range(24))  # Hours
#     y_labels = [
#         f"{row['signalID']} ({row['approachDir']})" for _, row in proc_df_pr_recommendations[proc_df_pr_recommendations["phaseNo"] == phase_no].iterrows()
#     ]

#     # Add heatmap trace
#     fig.add_trace(
#         go.Heatmap(
#             z=pr_recommendations_phase,
#             x=x_labels,
#             y=y_labels,
#             colorscale=[[0, "#27ae60"], [1, "#cb4335"]],  # Define two discrete colors
#             showscale=False,  # Disable color bar
#             zmin=0,
#             zmax=0.5,
#         ),
#         row=row,
#         col=col,
#     )

# # Add a legend manually
# fig.add_trace(
#     go.Scatter(
#         x=[None], y=[None],
#         mode="markers",
#         marker=dict(size=14, color="#27ae60"),
#         name="PR Not Recommended",
#     )
# )
# fig.add_trace(
#     go.Scatter(
#         x=[None], y=[None],
#         mode="markers",
#         marker=dict(size=14, color="#cb4335"),
#         name="PR Recommended",
#     )
# )

# # Update layout
# fig.update_layout(
#     height=400,
#     width=1400,
#     # title="Hourly Signal PR Recommendations for Each Phase",
#     # yaxis=dict(title="Signal ID (Direction)"),
# )

# # Update layout with legend font size
# fig.update_layout(
#     legend=dict(
#         orientation="h",  # Horizontal legend
#         x=0.5,            # Center horizontally
#         y=-0.4,           # Position below the plot
#         xanchor="center",
#         yanchor="top",
#         font=dict(size=14),  # Set font size for the legend items
#         # title=dict(text="Status", font=dict(size=14)),  # Title with font size
#     )
# )

# # Customize x-axis for each subplot
# for i in range(1, len(phase_nos) + 1):
#     fig.update_xaxes(
#         title="Hour of Day",
#         tickmode="array",
#         tickvals=list(range(24)),
#         ticktext=[f"{hour:02d}:00" for hour in range(24)],
#         row=((i - 1) // 2) + 1,
#         col=((i - 1) % 2) + 1,
#         tickangle=-90,
#         title_font=dict(size=14),
#         tickfont=dict(size=14),
#     )

#     fig.update_yaxes(
#         title_font=dict(size=16),
#         tickfont=dict(size=14),
#     )

# fig.show()

In [None]:
# proc_df_recommendation_with_delay = (
#     df_recommendation_with_delay
#     .groupby(["signalID"])
#     .agg(
#         **{
#             str(col): (str(col), "max")  
#             for col in range(24)            
#         }
#     )
#     .reset_index()
# )

# proc_df_hour_with_demand_probability = (
#     df_hour_with_demand_probability
#     .groupby(["signalID"])
#     .agg(
#         **{
#             str(col): (str(col), "max")  
#             for col in range(24)            
#         }
#     )
#     .reset_index()
# )

In [None]:
# recommendations = (
#     proc_df_recommendation_with_delay.iloc[:, 1:].values * proc_df_hour_with_demand_probability.iloc[:, 1:].values
# )
# data = np.hstack([np.array(signal_ids).reshape(-1, 1), recommendations])

# df_pr_recommendations = pd.DataFrame(data, columns=proc_df_recommendation_with_delay.columns)
# df_pr_recommendations.to_csv("../reports/recommendations/intersection_level/pr_recommendation_with_efficiency.csv", 
#                              index=False)

#### v2

##### Calculation of Pedestrian Presence Probability

In [None]:
def calculate_hourly_activity_summary(signal_id: str):
    df_spat_id = (
        load_data(
            dirpath="../data/production/atspm/fdot_d5/feature_extraction/feature/cycle/vehicle_signal/spat/",
            signal_id=signal_id
        )
    )
    
    df_pedestrian_activity_id = (
        load_data(
            dirpath="../data/production/atspm/fdot_d5/feature_extraction/feature/cycle/pedestrian_traffic/activity",
            signal_id=signal_id
        )
    )
    
    columns = [
        column for column in df_pedestrian_activity_id.columns if not any(k in column for k in ["45"])
    ]
    df_pedestrian_activity_id = df_pedestrian_activity_id[columns]
    df_pedestrian_activity_id = float_to_int(df_pedestrian_activity_id)
    
    # Join
    df_pedestrian_activity_id = pd.merge(
        df_spat_id[["signalID", "date", "cycleNo", "cycleBegin", "cycleEnd"]], df_pedestrian_activity_id, 
        on=["signalID", "date", "cycleNo", "cycleBegin", "cycleEnd"], 
        how="left"
    )
    
    # Shift
    columns = [column for column in df_pedestrian_activity_id.columns if "Prev" in column]
    
    for column in columns:
        df_pedestrian_activity_id[column] = df_pedestrian_activity_id[column].shift(-1)
    
    df_pedestrian_activity_id = df_pedestrian_activity_id.fillna(0)
    
    phase_nos = [
        int(column[-1]) for column in df_pedestrian_activity_id.columns if "90" in column and not any(k in column for k in ["Cycle"])
    ]
    
    dict_indicator = {
        f"pedestrianActivityIndicatorPhase{phase_no}": [] for phase_no in phase_nos
    }
    
    for i in range(len(df_pedestrian_activity_id)):
        for phase_no in phase_nos:
            cycle_no_curr = df_pedestrian_activity_id.loc[i, "cycleNo"]
            pedestrian_activity_curr = df_pedestrian_activity_id.loc[i, f"pedestrianActivity90Phase{phase_no}CurrCycle"]
            pedestrian_activity_prev = df_pedestrian_activity_id.loc[i, f"pedestrianActivity90Phase{phase_no}PrevCycle"]
            
            if i == len(df_pedestrian_activity_id) - 1:
                if pedestrian_activity_curr > 0 or pedestrian_activity_prev > 0:
                    dict_indicator[f"pedestrianActivityIndicatorPhase{phase_no}"].append(1)
                else:
                    dict_indicator[f"pedestrianActivityIndicatorPhase{phase_no}"].append(0)
            else:
                cycle_no_next = df_pedestrian_activity_id.loc[i+1, "cycleNo"]	
                if (cycle_no_next - cycle_no_curr) > 1:
                    dict_indicator[f"pedestrianActivityIndicatorPhase{phase_no}"].append(0)
                else:
                    if pedestrian_activity_curr > 0 or pedestrian_activity_prev > 0:
                        dict_indicator[f"pedestrianActivityIndicatorPhase{phase_no}"].append(1)
                    else:
                        dict_indicator[f"pedestrianActivityIndicatorPhase{phase_no}"].append(0)
    
    
    columns = [column for column in df_pedestrian_activity_id.columns if "Activity" in column]
    
    df_pedestrian_activity_id = df_pedestrian_activity_id.drop(columns=columns)
    df_pedestrian_activity_id = pd.concat([df_pedestrian_activity_id, pd.DataFrame(dict_indicator)], 
                                          axis=1)
    
    df_pedestrian_activity_id["hour"] = df_pedestrian_activity_id["cycleBegin"].dt.hour
        
    activity_columns = [col for col in df_pedestrian_activity_id.columns if "Activity" in col]
    
    df_pedestrian_activity_id_hourly = (
        df_pedestrian_activity_id
        .groupby(["signalID", "date", "hour"])[activity_columns]
        .agg(['sum', 'count'])
        .reset_index()
    )
    
    df_pedestrian_activity_id_hourly.columns = [
        ''.join([col[0], col[1].capitalize()]).rstrip('_') if isinstance(col, tuple) else col
        for col in df_pedestrian_activity_id_hourly.columns
    ] # Sum: Count of 1s; Count: Count of 1s and 0s

    sum_columns = [col for col in df_pedestrian_activity_id_hourly.columns if 'Sum' in col]
    df_pedestrian_activity_id_hourly_sum = pd.melt(
        df_pedestrian_activity_id_hourly, 
        id_vars=["signalID", "date", "hour"], 
        value_vars=sum_columns, 
        var_name="phaseNo", 
        value_name="cyclesWithPedestrian"
    )
    df_pedestrian_activity_id_hourly_sum["phaseNo"] = (
        df_pedestrian_activity_id_hourly_sum["phaseNo"].str.extract(r'Phase(\d+)').astype(int)
    )

    count_columns = [col for col in df_pedestrian_activity_id_hourly.columns if 'Count' in col]
    df_pedestrian_activity_id_hourly_count = pd.melt(
        df_pedestrian_activity_id_hourly, 
        id_vars=["signalID", "date", "hour"], 
        value_vars=count_columns, 
        var_name="phaseNo", 
        value_name="totalCycles"
    )
    df_pedestrian_activity_id_hourly_count["phaseNo"] = (
        df_pedestrian_activity_id_hourly_count["phaseNo"].str.extract(r'Phase(\d+)').astype(int)
    )
    
    df_pedestrian_activity_id_hourly = pd.merge(
        df_pedestrian_activity_id_hourly_sum, df_pedestrian_activity_id_hourly_count, 
        on=["signalID", "date", "hour", "phaseNo"],
        how="inner"
    )
    
    return df_pedestrian_activity_id_hourly

In [None]:
# df_pedestrian_activity_id_hourly = calculate_hourly_activity_summary(signal_id="1500")

# # Get unique phases
# phase_nos = sorted(df_pedestrian_activity_id_hourly["phaseNo"].unique())

# # Create a 2x2 grid of subplots
# fig = make_subplots(
#     rows=2,
#     cols=2,
#     subplot_titles=[f"Phase No: {phase_no}" for phase_no in phase_nos],
#     shared_yaxes=False,  
#     vertical_spacing=0.15,
# )

# # Map phase to subplot positions
# row_col_mapping = [(1, 1), (1, 2), (2, 1), (2, 2)]

# # Add boxplots for each phase
# for phase_no, (row, col) in zip(phase_nos, row_col_mapping):
#     df_pedestrian_activity_id_hourly_phase = (
#         df_pedestrian_activity_id_hourly[df_pedestrian_activity_id_hourly["phaseNo"] == phase_no]
#     )
#     df_pedestrian_activity_id_hourly_phase = df_pedestrian_activity_id_hourly_phase.copy()

#     df_pedestrian_activity_id_hourly_phase["pedestrianProportion"] = (
#         df_pedestrian_activity_id_hourly_phase['cyclesWithPedestrian'] / df_pedestrian_activity_id_hourly_phase['totalCycles']
#     )

#     fig.add_trace(
#         go.Box(
#             x=df_pedestrian_activity_id_hourly_phase["hour"],
#             y=df_pedestrian_activity_id_hourly_phase["pedestrianProportion"],
#             # boxpoints='all', 
#             # jitter=0.3,  
#             # pointpos=-1.8,  
#             name=f"Phase No: {phase_no}",  # Legend name (optional)
#             marker_color="#408eb3",
#         ),
#         row=row,
#         col=col
#     )

# # Update layout
# fig.update_layout(
#     height=800,
#     width=1400,
#     title_text="Day-to-Day Variability in Pedestrian Presence by Hour",
#     showlegend=False,  # Optional: Hide legend as subplot titles already explain phases
#     font=dict(size=14),
# )

# # Set consistent Y-axis range for all subplots (0 to 1 for proportions)
# fig.update_yaxes(
#     # range=[0, 1], 
#     title_text='Proportion of Cycles with Pedestrians'
# )
# fig.update_xaxes(title_text='Hour of Day')

# # Export the figure as a high-resolution image
# fig.write_image("pedestrian_proportion.png",  width=1400, height=800, scale=2)

# # Display plot
# fig.show()

In [None]:
def beta_binomial_log_likelihood(params, successes, trials):
    alpha, beta = params
    # Log-likelihood for the Beta-Binomial model
    log_likelihood = np.sum(
        betaln(successes + alpha, trials - successes + beta) - betaln(alpha, beta)
    )
    return -log_likelihood

In [None]:
def calculate_ci_with_pedestrian_presence_probability(signal_id: str):
    df_pedestrian_activity_id_hourly = calculate_hourly_activity_summary(signal_id=signal_id)
    
    results = []
    
    # Get unique phases
    phase_nos = sorted(df_pedestrian_activity_id_hourly["phaseNo"].unique())
    
    for phase_no in phase_nos:
        df_pedestrian_activity_id_hourly_phase = (
            df_pedestrian_activity_id_hourly[df_pedestrian_activity_id_hourly["phaseNo"] == phase_no]
            .reset_index(drop=True)
        )
        signal_id = df_pedestrian_activity_id_hourly_phase.loc[0, "signalID"]
        
        for hour in np.arange(0, 24):
            df_pedestrian_activity_id_hourly_hour = (
                df_pedestrian_activity_id_hourly_phase[df_pedestrian_activity_id_hourly_phase["hour"] == hour]
            )
            successes = df_pedestrian_activity_id_hourly_hour["cyclesWithPedestrian"].values
            trials = df_pedestrian_activity_id_hourly_hour["totalCycles"].values
        
            # Skip hour if no data
            if len(successes) == 0:
                continue
        
            result = minimize(
                beta_binomial_log_likelihood,
                x0=[1, 1],
                args=(successes, trials),
                bounds=[(0.01, None), (0.01, None)],
            )
        
            alpha_hat, beta_hat = result.x
            mean_prob = alpha_hat / (alpha_hat + beta_hat)
        
            # 95% credible interval using Beta distribution
            lower_bound = beta.ppf(0.025, alpha_hat, beta_hat)
            upper_bound = beta.ppf(0.975, alpha_hat, beta_hat)
        
            results.append({
                'signalID': signal_id,
                'phaseNo': phase_no,
                'hour': hour,
                'alpha': alpha_hat,
                'beta': beta_hat,
                'probability': mean_prob,
                'lowerBound': lower_bound,
                'upperBound': upper_bound
            })
    
    df_pedestrian_presence_probability_id_hourly = pd.DataFrame(results)

    return df_pedestrian_presence_probability_id_hourly

##### Identification of Critical Hours

In [None]:
def concat_df():
    df_pedestrian_presence_probability_hourly = pd.DataFrame()
    
    for signal_id in tqdm.tqdm(signal_ids):
        try:
            df_pedestrian_presence_probability_id_hourly = calculate_ci_with_pedestrian_presence_probability(signal_id=signal_id)
            df_pedestrian_presence_probability_hourly = pd.concat(
                [df_pedestrian_presence_probability_hourly, df_pedestrian_presence_probability_id_hourly], 
                axis=0, ignore_index=True
            )
        except:
            print(signal_id)
    
    df_pedestrian_presence_probability_hourly = (
        df_pedestrian_presence_probability_hourly[df_pedestrian_presence_probability_hourly["phaseNo"].isin([2, 4, 6, 8])]
        .reset_index(drop=True)
    )

    return df_pedestrian_presence_probability_hourly

In [None]:
# # Filter and reshape data
# # turn_conflict_propensity_scores = (
# #     df_turn_conflict_propensity_scores_id_hourly
# #     .loc[:, "turnConflictPropensity"]
# #     .values
# # )

# pedestrian_presence_probability_hourly = (
#     df_pedestrian_presence_probability_hourly
#     .loc[:, "probability"]
#     .values
# )

# # Reshape data for K-Means
# pedestrian_presence_probability_hourly = pedestrian_presence_probability_hourly.reshape(-1, 1)

# # Apply k-means clustering with 2 clusters
# kmeans = KMeans(n_clusters=2, random_state=42)
# kmeans.fit(pedestrian_presence_probability_hourly)

# # Get cluster centroids and labels
# centroids = kmeans.cluster_centers_.flatten()
# labels = kmeans.labels_

# # Assuming pedestrian_presence_probability_hourly and labels are of equal length
# pedestrian_presence_probability_hourly = pedestrian_presence_probability_hourly.flatten()  # Ensure 1D array
# labels = labels.flatten()  # Ensure 1D array

# # Sort centroids to define thresholds logically
# centroids = np.sort(centroids)
# centroid = centroids[1]

# # Display thresholds
# print("Boundaries for Pedestrian Presence Probability:")
# print(f"Centroid: {centroid}")

# # Create a scatter plot using Plotly
# fig = px.scatter(
#     x=pedestrian_presence_probability_hourly, 
#     y=labels.astype(int),  # Ensure labels are integers
#     color=labels.astype(str),  # Convert labels to string for color grouping
#     labels={"x": "Pedestrian Presence Probability", "y": "Cluster Label", "color": "Cluster"},
# )

# # Add threshold lines
# fig.add_trace(go.Scatter(
#     x=[centroid, centroid],
#     y=[-0.5, max(labels) + 0.5],
#     mode="lines",
#     line=dict(color="red", dash="dash"),
#     name=f"Centroid: {np.round(centroid, 2)}"
# ))

# # Update layout with integer y-ticks
# fig.update_layout(
#     height=600,
#     width=1400,
#     xaxis=dict(title=dict(text="Pedestrian Presence Probability", font=dict(size=16))),
#     yaxis=dict(
#         title=dict(text="Cluster Label", font=dict(size=16)),
#         tickmode="linear",  # Ensures consistent tick intervals
#         tick0=0,  # Starting tick
#         dtick=1,  # Interval between ticks to force integers
#     ),
#     legend=dict(
#         title=dict(text="Cluster Legend", font=dict(size=16, color="black")),  # Bold legend title
#         font=dict(size=16, color="black"),  # Bold legend items
#     )
# )

# # Export the Plotly figure as a high-resolution image
# fig.write_image("../reports/6.1(a).png", width=1400, height=600, scale=2)

# # Show the plot
# fig.show()

In [None]:
def centroid():
    df_pedestrian_presence_probability_hourly = concat_df()

    pedestrian_presence_probability_hourly = (
        df_pedestrian_presence_probability_hourly
        .loc[:, "probability"]
        .values
    )
    
    # Reshape data for K-Means
    pedestrian_presence_probability_hourly = pedestrian_presence_probability_hourly.reshape(-1, 1)
    
    # Apply k-means clustering with 2 clusters
    kmeans = KMeans(n_clusters=2, random_state=42)
    kmeans.fit(pedestrian_presence_probability_hourly)
    
    # Get cluster centroids and labels
    centroids = kmeans.cluster_centers_.flatten()
    labels = kmeans.labels_
    
    # Assuming pedestrian_presence_probability_hourly and labels are of equal length
    pedestrian_presence_probability_hourly = pedestrian_presence_probability_hourly.flatten()  # Ensure 1D array
    labels = labels.flatten()  # Ensure 1D array
    
    # Sort centroids to define thresholds logically
    centroids = np.sort(centroids)
    centroid = centroids[1]

    return centroid, df_pedestrian_presence_probability_hourly

In [None]:
# centroid, df_pedestrian_presence_probability_hourly = centroid()

In [None]:
# df_pedestrian_presence_probability_id_hourly = (
#     df_pedestrian_presence_probability_hourly[df_pedestrian_presence_probability_hourly["signalID"] == "1500"]
# )
# phase_nos = df_pedestrian_presence_probability_id_hourly["phaseNo"].unique()
# phase_nos = [2, 6, 4, 8]

# # Create a 2x2 grid for subplots
# fig = make_subplots(
#     rows=2, cols=2,
#     subplot_titles=[f"Phase {phase_no}" for phase_no in phase_nos],
#     shared_xaxes=False, shared_yaxes=False,
#     vertical_spacing=0.15
# )

# # Iterate over phases and add traces
# row_col_mapping = [(1, 1), (1, 2), (2, 1), (2, 2)]  # Map phases to subplot positions
# for idx, (phase_no, (row, col)) in enumerate(zip(phase_nos, row_col_mapping)):
#     proc_df_pedestrian_presence_probability_id_hourly = (
#         df_pedestrian_presence_probability_id_hourly[df_pedestrian_presence_probability_id_hourly["phaseNo"] == phase_no]
#     )
    
#     # Calculate error bars
#     yerr_lower = (
#         proc_df_pedestrian_presence_probability_id_hourly["probability"] - proc_df_pedestrian_presence_probability_id_hourly["lowerBound"]
#     )
#     yerr_upper = (
#         proc_df_pedestrian_presence_probability_id_hourly["upperBound"] - proc_df_pedestrian_presence_probability_id_hourly["probability"]
#     )

#     # Add scatter plot with error bars
#     fig.add_trace(
#         go.Scatter(
#             x=proc_df_pedestrian_presence_probability_id_hourly["hour"],
#             y=proc_df_pedestrian_presence_probability_id_hourly["probability"],
#             error_y=dict(
#                 type="data",
#                 symmetric=False,
#                 array=yerr_upper,
#                 arrayminus=yerr_lower,
#                 color="blue",
#                 thickness=1.5,
#                 width=3,
#             ),
#             mode="markers+lines",
#             marker=dict(size=8),
#             name=f"Phase {phase_no}",
#         ),
#         row=row, col=col
#     )

#     # Add a horizontal threshold line
#     fig.add_shape(
#         type="line",
#         x0=0,
#         x1=23,
#         y0=0.13,
#         y1=0.13,
#         line=dict(color="red", dash="dash"),
#         row=row, col=col
#     )
#     # # Add a horizontal threshold line
#     # fig.add_shape(
#     #     type="line",
#     #     x0=0,
#     #     x1=23,
#     #     y0=0.05,
#     #     y1=0.05,
#     #     line=dict(color="orange", dash="dash"),
#     #     row=row, col=col
#     # )

# # Update layout
# fig.update_layout(
#     # title="Probability of Pedestrian Presence by Hour",
#     height=800,
#     width=1400,
#     showlegend=False,
#     xaxis_title="Hour of Day",
#     yaxis_title="Probability of Pedestrian Demand",
#     # yaxis=dict(range=[0, 1]),  
#     font=dict(size=14)
# )

# # Update axis labels for shared x/y axes
# fig.update_xaxes(
#     title_text="Hour of Day",
#     # dtick=1,
#     # tickvals=list(range(24)),
#     # ticktext=[f"{hour}:00" for hour in range(24)],
#     # tickformat="%H:00",
#     # tickangle=-45,
#     # row=2, col=1,  # Shared x-axis title position
#     title_font=dict(size=16),
#     tickfont=dict(size=16),
# )

# # Update y-axes for shared configuration
# fig.update_yaxes(
#     title_text="Probability of Pedestrian Presence", 
#     # range=[0, 0.75],  
#     tickvals=np.linspace(0, 1, 11),  # Tick values from 0 to 1 with step 0.1
#     title_font=dict(size=16),
#     tickfont=dict(size=16),
# )

# # Export the figure as a high-resolution image
# fig.write_image("../reports/6.1(b).png",  width=1400, height=800, scale=2)

# # Show plot
# fig.show()

In [None]:
def critical_hour_with_pedestrian_presence_probability(signal_id, centroid):
    dict_critical_hour_with_pedestrian_presence_probability_id = {"signalID": [], "approachDir": [], "phaseNo": []}
    dict_critical_hour_with_pedestrian_presence_probability_id.update(
        {str(hour): [] for hour in range(24)}
    )
    df_config_id = pd.read_csv(f"../data/interim/atspm/fdot_d5/signal_config/{signal_id}.csv")
    
    phase_nos_config = df_config_id["phaseNo"].unique().tolist()
    phase_nos_config = [int(phase_no) for phase_no in phase_nos_config if pd.notna(phase_no) and phase_no % 2 == 0]
    
    try:
        df_pedestrian_presence_probability_id_hourly = calculate_ci_with_pedestrian_presence_probability(signal_id=signal_id)
        df_pedestrian_presence_probability_id_hourly = (
            df_pedestrian_presence_probability_id_hourly[df_pedestrian_presence_probability_id_hourly["phaseNo"].isin([2, 4, 6, 8])]
            .reset_index(drop=True)
        )
        
        phase_nos_demand = df_pedestrian_presence_probability_id_hourly["phaseNo"].unique().tolist()
        phase_nos_demand = [int(phase_no) for phase_no in phase_nos_demand]
    
        sequence = [2, 6, 4, 8]
        phase_nos = sorted(list(set(phase_nos_config + phase_nos_demand)), key=lambda x: sequence.index(x))
        
        for phase_no in phase_nos:
            dict_critical_hour_with_pedestrian_presence_probability_id["signalID"].append(signal_id)
    
            if phase_no in phase_nos_config:
                approach_dir = df_config_id[df_config_id["phaseNo"] == phase_no]["approach"].unique()[0].split()[-1][0:-1]
            else:
                approach_dir = "NA"
        
            dict_critical_hour_with_pedestrian_presence_probability_id["approachDir"].append(approach_dir)
            dict_critical_hour_with_pedestrian_presence_probability_id["phaseNo"].append(phase_no)
        
            for i in range(24):
                proc_df_pedestrian_presence_probability_id_hourly = (
                    df_pedestrian_presence_probability_id_hourly
                    .query("hour == @i & phaseNo == @phase_no")
                )
        
                if len(proc_df_pedestrian_presence_probability_id_hourly) == 0:
                    dict_critical_hour_with_pedestrian_presence_probability_id[str(i)].append(np.nan)
                else:
                    proc_df_pedestrian_presence_probability_id_hourly = (
                        proc_df_pedestrian_presence_probability_id_hourly
                        .reset_index(drop=True)
                    )
                    pedestrian_presence_probability_lower = (
                        proc_df_pedestrian_presence_probability_id_hourly.loc[0, "lowerBound"]
                    )
                    
                    if pedestrian_presence_probability_lower >= centroid:
                        dict_critical_hour_with_pedestrian_presence_probability_id[str(i)].append(1)
                    else:
                        dict_critical_hour_with_pedestrian_presence_probability_id[str(i)].append(0)
    
        return pd.DataFrame(dict_critical_hour_with_pedestrian_presence_probability_id)
    except:
        for phase_no in phase_nos_config:
            dict_critical_hour_with_pedestrian_presence_probability_id["signalID"].append(signal_id)
            dict_critical_hour_with_pedestrian_presence_probability_id["approachDir"].append("NA")
            dict_critical_hour_with_pedestrian_presence_probability_id["phaseNo"].append(np.nan)
                                                                      
            for i in range(24):
                dict_critical_hour_with_pedestrian_presence_probability_id[str(i)].append(np.nan)

        return pd.DataFrame(dict_critical_hour_with_pedestrian_presence_probability_id)

In [None]:
# df_critical_hour_with_pedestrian_presence_probability = pd.DataFrame()

# for signal_id in tqdm.tqdm(signal_ids):
#     df_critical_hour_with_pedestrian_presence_probability_id = critical_hour_with_pedestrian_presence_probability(
#         signal_id=signal_id, centroid=centroid
#     )
#     df_critical_hour_with_pedestrian_presence_probability = (
#         pd.concat(
#             [df_critical_hour_with_pedestrian_presence_probability, df_critical_hour_with_pedestrian_presence_probability_id], 
#             axis=0, ignore_index=True)
#     )

##### Recommendation

In [None]:
# df_pr_recommendations = float_to_int(df_critical_hour_with_pedestrian_presence_probability)
# df_pr_recommendations.to_csv("../reports/recommendations/pr_recommendations.csv", 
#                              index=False)

In [None]:
# proc_df_pr_recommendations = df_pr_recommendations[df_pr_recommendations["signalID"] == "1500"]

# # Create a 2x2 grid for the subplots
# fig = make_subplots(
#     rows=2,
#     cols=2,
#     subplot_titles=[f"Phase {phase}" for phase in proc_df_pr_recommendations["phaseNo"].unique()],
#     shared_xaxes=False,
#     # shared_yaxes=False,
#     vertical_spacing=0.6
# )

# # Define the color mapping for PR recommendations
# colors = {0: "#27ae60", 1: "#cb4335"}  # Green: PR Not Needed, Red: PR Needed

# # Add a heatmap for each phase in the 2x2 layout
# phase_nos = proc_df_pr_recommendations["phaseNo"].unique()

# for idx, phase_no in enumerate(phase_nos, start=1):
#     # Get subplot row and column
#     row, col = divmod(idx - 1, 2)
#     row += 1
#     col += 1

#     # Filter data for the current phase
#     proc_df_pr_recommendations_phase = (
#         proc_df_pr_recommendations[proc_df_pr_recommendations["phaseNo"] == phase_no].iloc[:, 3:]
#     )  # Extract hourly binary data
    
#     pr_recommendations_phase = proc_df_pr_recommendations_phase.values  # Convert to matrix
#     x_labels = list(range(24))  # Hours
#     y_labels = [
#         f"{row['signalID']} ({row['approachDir']})" for _, row in proc_df_pr_recommendations[proc_df_pr_recommendations["phaseNo"] == phase_no].iterrows()
#     ]

#     # Add heatmap trace
#     fig.add_trace(
#         go.Heatmap(
#             z=pr_recommendations_phase,
#             x=x_labels,
#             y=y_labels,
#             colorscale=[[0, "#27ae60"], [1, "#cb4335"]],  # Define two discrete colors
#             showscale=False,  # Disable color bar
#             zmin=0,
#             zmax=0.5,
#         ),
#         row=row,
#         col=col,
#     )

# # Add a legend manually
# fig.add_trace(
#     go.Scatter(
#         x=[None], y=[None],
#         mode="markers",
#         marker=dict(size=14, color="#27ae60"),
#         name="PR Not Recommended",
#     )
# )
# fig.add_trace(
#     go.Scatter(
#         x=[None], y=[None],
#         mode="markers",
#         marker=dict(size=14, color="#cb4335"),
#         name="PR Recommended",
#     )
# )

# # Update layout
# fig.update_layout(
#     height=400,
#     width=1400,
#     # title="Hourly Signal PR Recommendations for Each Phase",
#     # yaxis=dict(title="Signal ID (Direction)"),
# )

# # Update layout with legend font size
# fig.update_layout(
#     legend=dict(
#         orientation="h",  # Horizontal legend
#         x=0.5,            # Center horizontally
#         y=-0.4,           # Position below the plot
#         xanchor="center",
#         yanchor="top",
#         font=dict(size=14),  # Set font size for the legend items
#         # title=dict(text="Status", font=dict(size=14)),  # Title with font size
#     )
# )

# # Customize x-axis for each subplot
# for i in range(1, len(phase_nos) + 1):
#     fig.update_xaxes(
#         title="Hour of Day",
#         tickmode="array",
#         tickvals=list(range(24)),
#         ticktext=[f"{hour:02d}:00" for hour in range(24)],
#         row=((i - 1) // 2) + 1,
#         col=((i - 1) % 2) + 1,
#         tickangle=-90,
#         title_font=dict(size=14),
#         tickfont=dict(size=14),
#     )

#     fig.update_yaxes(
#         title_font=dict(size=16),
#         tickfont=dict(size=14),
#     )

# # Export the figure as a high-resolution image
# fig.write_image("../reports/6.2.png",  width=1400, height=400, scale=2)

# fig.show()

### Leading Pedestrian Interval and No Right Turn on Red

#### v1

##### Calculation of Pedestrian-Vehicle (Right-Turn) Conflict Propensity

In [None]:
def hourly_aggregate(df):
    """
    Aggregates a DataFrame to calculate hourly statistics.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing cycle-level data.

    Returns
    -------
    pd.DataFrame
        The aggregated DataFrame with hourly statistics.
    """
    # Convert cycleBegin to datetime if it exists
    if "cycleBegin" in df.columns:
        df["cycleBegin"] = pd.to_datetime(df["cycleBegin"])  # Ensure cycleBegin is in datetime format

        # Add an hour column for grouping (extract hour from cycleBegin)
        df["hour"] = df["cycleBegin"].dt.hour

        # # Add isWeekday column to distinguish weekdays (True for Monday-Friday, False for weekends)
        # df["isWeekday"] = df["cycleBegin"].dt.dayofweek < 5

        # Drop all datetime columns after extracting necessary information
        df = df.loc[:, ~df.columns.map(lambda col: pd.api.types.is_datetime64_any_dtype(df[col]))]

    # Drop unnecessary columns
    if "cycleNo" in df.columns:  # Drop the cycle number column if it exists
        df = df.drop(columns="cycleNo")

    # Define column categories for aggregation
    occupancy_columns = [col for col in df.columns if "Occupancy" in col]  # Columns related to occupancy
    volume_columns = [col for col in df.columns if "Volume" in col]    # Columns related to activity
    
    duration_columns = [col for col in df.columns if "Duration" in col]   # Columns related to duration
    activity_columns = [col for col in df.columns if "Activity" in col and col not in duration_columns]  # Activity columns excluding durations

    df[occupancy_columns + volume_columns + duration_columns + activity_columns] = (
        df
        [occupancy_columns + volume_columns + duration_columns + activity_columns]
        .replace(0, np.nan)
    )
    
    # Prepare the aggregation dictionary
    agg_dict = {
        **{col: "mean" for col in occupancy_columns + duration_columns},  # Calculate the mean for occupancy columns
        **{col: "sum" for col in activity_columns + volume_columns}       # Calculate the sum for activity and activity columns
    }

    # Perform the aggregation by grouping by signalID, date, isWeekday, and hour
    # df_hourly = df.groupby(["signalID", "date", "isWeekday", "hour"]).agg(agg_dict).reset_index()
    df_hourly = df.groupby(["signalID", "date", "hour"]).agg(agg_dict).reset_index()

    # Calculate additional statistics (max and min) for the grouped data
    # agg_dict = {
    #     **{col: ["max"] for col in df.columns if col not in ["signalID", "date", "isWeekday", "hour"]}
    # }
    # df_hourly_stats = df_hourly.groupby(["signalID", "isWeekday", "hour"]).agg(agg_dict).reset_index()
    agg_dict = {
        **{col: ["max"] for col in df.columns if col not in ["signalID", "date", "hour"]}
    }
    df_hourly_stats = df_hourly.groupby(["signalID", "hour"]).agg(agg_dict).reset_index()

    # Flatten multi-level column names for readability
    df_hourly_stats.columns = [
        f"{col[0]}{col[1].capitalize()}" if col[1] else col[0] for col in df_hourly_stats.columns.to_flat_index()
    ]

    # Merge max/min results back with the aggregated hourly data
    # df_hourly = df_hourly.merge(
    #     df_hourly_stats,
    #     on=["signalID", "isWeekday", "hour"],
    #     how="left"
    # )

    df_hourly = df_hourly.merge(
        df_hourly_stats,
        on=["signalID", "hour"],
        how="left"
    )

    # Filter and sort the columns
    columns = [
        column for column in df_hourly.columns
        for param in ["ActivityDuration", "Occupancy", "Volume", "ActivityPhase"]
        if param in column
    ]
    columns = sorted(columns, key=lambda x: x.split("Phase")[-1][0])  # Sort columns by phase number

    # Reorganize DataFrame columns to make it consistent and intuitive
    # df_hourly = df_hourly[["signalID", "date", "isWeekday", "hour"] + columns]
    df_hourly = df_hourly[["signalID", "date", "hour"] + columns]

    # Return the aggregated DataFrame
    return df_hourly

In [None]:
def compute_diminishing_returns(array, k=0.1):
    """
    Apply diminishing returns using an exponential decay function.

    Parameters:
        array (np.ndarray): The input array of values.
        k (float): The decay rate (higher values result in faster diminishing returns).
    
    Returns:
        np.ndarray: Transformed array with diminishing returns applied.
    """
    if not isinstance(array, np.ndarray):
        raise ValueError("Input must be a NumPy array.")
    
    # Apply the diminishing return formula: 1 - e^(-k * x)
    return 1 - np.exp(-k * array)

In [None]:
def sigmoid(array):
    """
    Apply the sigmoid function to a NumPy array.

    Parameters:
        array (np.ndarray): The input array of values.
    
    Returns:
        np.ndarray: Transformed array after applying the sigmoid function.
    """
    if not isinstance(array, np.ndarray):
        raise ValueError("Input must be a NumPy array.")
    
    # Apply the sigmoid formula: 1 / (1 + e^(-x))
    return 1 / (1 + np.exp(-array))

In [None]:
def compute_conflict_propensity_scores(df: pd.DataFrame, dict_lane_type_weights: dict = None):
    """
    Computes conflict propensity scores for each phase in each row of the DataFrame,
    considering all lanes and combining them into phase-level scores using weighted averages.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame with dynamically generated columns representing phases, e.g.,
        'vehicleOccupancyPhase4R', 'vehicleActivityPhase4R', etc.

    dict_lane_type_weights : dict, optional
        Dictionary mapping lane type suffixes (e.g., 'R', 'TR', 'LTR') to numeric weights.
        Example:
            {
                "R":   1.0,    # Right-only
                "TR":  0.5,    # Through+Right
                "LTR": 0.33    # Left+Through+Right
            }
        If None, default weights will be used.

    Returns
    -------
    pd.DataFrame
        A DataFrame with phase-level conflict propensity scores for each row.
    """
    # Assign default lane type weights if none are provided
    if dict_lane_type_weights is None:
        dict_lane_type_weights = {
            "R":   1.0,   # Right-only
            "TR":  0.5,   # Through+Right
            "LTR": 0.33   # Left+Through+Right
        }

    # Extract column names and identify unique phases dynamically
    columns = df.columns
    phase_nos = sorted(
        {col.split("Phase")[-1][0] for col in columns if "Phase" in col and col.split("Phase")[-1][0].isdigit()}
    )

    # Initialize a dictionary to store conflict propensity scores for each phase
    dict_conflict_propensity_scores = {
        f"turnConflictPropensityPhase{phase_no}": [] for phase_no in phase_nos
    }

    # Compute conflict propensity scores for each phase
    for phase_no in phase_nos:
        weighted_lane_scores = np.zeros(len(df))  # Initialize weighted scores
        total_lane_weight = 0  # Initialize total lane weight for normalization

        # Calculate total lane weights for normalization
        for lane_type, lane_weight in dict_lane_type_weights.items():
            volume_col = f"vehicleVolumePhase{phase_no}{lane_type}"
            occupancy_col = f"vehicleOccupancyPhase{phase_no}{lane_type}"

            # Only consider lanes that exist in the dataset
            if any(col in columns for col in [volume_col, occupancy_col]):
                total_lane_weight += lane_weight

        # Safeguard: Skip if no valid lanes exist
        if total_lane_weight == 0:
            continue

        # Process each lane type
        for lane_type, lane_weight in dict_lane_type_weights.items():
            volume_col = f"vehicleVolumePhase{phase_no}{lane_type}"
            occupancy_col = f"vehicleOccupancyPhase{phase_no}{lane_type}"
            ped_activity_col = f"pedestrianActivityPhase{phase_no}"
            ped_activity_duration_col = f"pedestrianActivityDurationPhase{phase_no}"

            # Ensure pedestrian activity columns exist
            if ped_activity_col not in columns or ped_activity_duration_col not in columns:
                continue

            # Compute vehicle factors
            vehicle_factors = np.zeros(len(df))  # Initialize as zeros
            if all(col in columns for col in [occupancy_col, volume_col]):
                # Calculate occupancy and volume factors if both columns exist
                volume_factors = compute_diminishing_returns(df[volume_col].values, k=0.2) * 0.5
                occupancy_factors = (df[occupancy_col] / df[ped_activity_duration_col].replace(0, 1)).values * 0.5
                vehicle_factors = volume_factors + occupancy_factors
            elif occupancy_col in columns:
                vehicle_factors = (df[occupancy_col] / df[ped_activity_duration_col].replace(0, 1)).values
            elif volume_col in columns:
                vehicle_factors = compute_diminishing_returns(df[volume_col].values, k=0.2)

            # Compute pedestrian factors
            pedestrian_factors = df[ped_activity_col].values

            # Compute lane-level scores
            lane_scores = compute_diminishing_returns(vehicle_factors * pedestrian_factors, k=0.25)

            # Update weighted scores
            weighted_lane_scores += (lane_scores * lane_weight) / (lane_weight / total_lane_weight)

        # Store phase-level scores
        dict_conflict_propensity_scores[f"turnConflictPropensityPhase{phase_no}"] = weighted_lane_scores.tolist()

    # Return the final DataFrame with conflict propensity scores
    return pd.DataFrame(dict_conflict_propensity_scores)

In [None]:
def get_conflict_propensity_thresholds(data, n_clusters=3, random_state=42):
    """
    Calculate thresholds for conflict propensity using k-means clustering.

    Parameters:
    data (numpy.ndarray): Array of conflict propensity scores.
    n_clusters (int): Number of clusters for k-means.
    random_state (int): Random state for reproducibility.

    Returns:
    tuple: (medium_propensity_threshold, high_propensity_threshold)
    """
    # Reshape data if needed
    if len(data.shape) == 1:
        data = data.reshape(-1, 1)

    # Apply k-means clustering
    kmeans = KMeans(n_clusters=n_clusters, random_state=random_state)
    kmeans.fit(data)

    # Get cluster centroids and sort them
    centroids = np.sort(kmeans.cluster_centers_.flatten())

    # Define thresholds
    medium_propensity_threshold = centroids[1]
    high_propensity_threshold = centroids[2]

    return medium_propensity_threshold, high_propensity_threshold

In [None]:
def caluate_ci_with_conflict_propensity(signal_id):
    df_turn_conflict_propensity_id = load_data(
        dirpath="../data/production/atspm/fdot_d5/feature_extraction/feature/cycle/pedestrian_traffic/turn_conflict_intensity",
        signal_id=f"{signal_id}"
    )
    
    df_turn_conflict_propensity_id["hour"] = df_turn_conflict_propensity_id["cycleBegin"].dt.hour
    
    columns = (
        [col for col in df_turn_conflict_propensity_id.columns if not any(key in col for key in ["cycle", "Begin", "End"])]
    )
    columns = sorted(columns, key=lambda x: x[0] + x[-1])
    
    df_turn_conflict_propensity_id = df_turn_conflict_propensity_id[columns]
    
    df_turn_conflict_propensity_id_hourly = hourly_aggregate(df=df_turn_conflict_propensity_id)
    df_turn_conflict_propensity_scores_id_hourly = compute_conflict_propensity_scores(df=df_turn_conflict_propensity_id_hourly)
    columns = df_turn_conflict_propensity_scores_id_hourly.columns.tolist()
    
    df_turn_conflict_propensity_scores_id_hourly["signalID"] = df_turn_conflict_propensity_id_hourly[["signalID"]]
    df_turn_conflict_propensity_scores_id_hourly["date"] = df_turn_conflict_propensity_id_hourly[["date"]]
    df_turn_conflict_propensity_scores_id_hourly["hour"] = df_turn_conflict_propensity_id_hourly[["hour"]]
    
    df_turn_conflict_propensity_scores_id_hourly = (
        pd.melt(df_turn_conflict_propensity_scores_id_hourly, 
                id_vars=["signalID", "date", "hour"], 
                value_vars=columns, var_name="phaseNo", value_name="turnConflictPropensity")
    )
    
    df_turn_conflict_propensity_scores_id_hourly["phaseNo"] = (
        df_turn_conflict_propensity_scores_id_hourly["phaseNo"]
        .str
        .extract(r"Phase(\d)")
        .astype(int)
    )
    
    df_turn_conflict_propensity_scores_id_hourly = (
        df_turn_conflict_propensity_scores_id_hourly[df_turn_conflict_propensity_scores_id_hourly["turnConflictPropensity"] > 0]
    )

    # Group by `phaseNo` and `hour` and calculate bootstrap confidence intervals and mean
    turn_conflict_propensity_scores_id = []
    for (signal_id, hour, phase_no), group in df_turn_conflict_propensity_scores_id_hourly.groupby(["signalID", "hour", "phaseNo"]):
        values = group["turnConflictPropensity"].values
        lower, upper = bootstrap_ci(values)
        mean_value = np.mean(values)  # Calculate the mean
        turn_conflict_propensity_scores_id.append(
            {
                "signalID": signal_id, "hour": hour, "phaseNo": phase_no, 
                "turnConflictPropensityAvg": mean_value, "lowerCI": lower, "upperCI": upper
            }
        )
    
    # Create a results DataFrame
    proc_df_turn_conflict_propensity_scores_id_hourly = pd.DataFrame(turn_conflict_propensity_scores_id)

    return df_turn_conflict_propensity_scores_id_hourly, proc_df_turn_conflict_propensity_scores_id_hourly

In [None]:
# !pip install -U kaleido

In [None]:
# df_turn_conflict_propensity_scores_id_hourly, proc_df_turn_conflict_propensity_scores_id_hourly = (
#     caluate_ci_with_conflict_propensity(signal_id="1500")
# )

# # Filter and reshape data
# # turn_conflict_propensity_scores = (
# #     df_turn_conflict_propensity_scores_id_hourly
# #     .loc[:, "turnConflictPropensity"]
# #     .values
# # )

# turn_conflict_propensity_scores = (
#     proc_df_turn_conflict_propensity_scores_id_hourly
#     .loc[:, "turnConflictPropensityAvg"]
#     .values
# )

# # Reshape data for K-Means
# turn_conflict_propensity_scores = turn_conflict_propensity_scores.reshape(-1, 1)

# # Apply k-means clustering with 3 clusters (low, medium, high propensity)
# kmeans = KMeans(n_clusters=3, random_state=42)
# kmeans.fit(turn_conflict_propensity_scores)

# # Get cluster centroids and labels
# centroids = kmeans.cluster_centers_.flatten()
# labels = kmeans.labels_

# # Assuming turn_conflict_propensity_scores and labels are of equal length
# turn_conflict_propensity_scores = turn_conflict_propensity_scores.flatten()  # Ensure 1D array
# labels = labels.flatten()  # Ensure 1D array

# # Sort centroids to define thresholds logically
# centroids = np.sort(centroids)
# medium_propensity_centroid = centroids[1]
# high_propensity_centroid = centroids[2]

# # Display thresholds
# print("Boundaries for Conflict Propensity:")
# print(f"Medium Propensity Centroid: {medium_propensity_centroid}")
# print(f"High Propensity Centroid: {high_propensity_centroid}")

# # Create a scatter plot using Plotly
# fig = px.scatter(
#     x=turn_conflict_propensity_scores, 
#     y=labels.astype(int),  # Ensure labels are integers
#     color=labels.astype(str),  # Convert labels to string for color grouping
#     labels={"x": "Conflict Propensity", "y": "Cluster Label", "color": "Cluster"},
#     # title="Clustering for Conflict Propensity"
# )

# # Add threshold lines
# fig.add_trace(go.Scatter(
#     x=[medium_propensity_centroid, medium_propensity_centroid],
#     y=[-0.5, max(labels) + 0.5],
#     mode="lines",
#     line=dict(color="orange", dash="dash"),
#     name=f"Medium Propensity Centroid: {np.round(medium_propensity_centroid, 2)}"
# ))
# fig.add_trace(go.Scatter(
#     x=[high_propensity_centroid, high_propensity_centroid],
#     y=[-0.5, max(labels) + 0.5],
#     mode="lines",
#     line=dict(color="red", dash="dash"),
#     name=f"High Propensity Centroid: {np.round(high_propensity_centroid, 2)}"
# ))

# # Update layout with integer y-ticks
# fig.update_layout(
#     height=600,
#     width=1400,
#     xaxis=dict(title=dict(text="Conflict Propensity", font=dict(size=16))),
#     yaxis=dict(
#         title=dict(text="Cluster Label", font=dict(size=16)),
#         tickmode="linear",  # Ensures consistent tick intervals
#         tick0=0,  # Starting tick
#         dtick=1,  # Interval between ticks to force integers
#     ),
#     legend=dict(
#         title=dict(text="Cluster Legend", font=dict(size=16, color="black")),  # Bold legend title
#         font=dict(size=16, color="black"),  # Bold legend items
#     )
# )

# # Export the Plotly figure as a high-resolution image
# fig.write_image("../reports/7.2(a).png", width=1400, height=600, scale=2)

# # Show the plot
# fig.show()

In [None]:
# df_turn_conflict_propensity_scores_id_hourly, proc_df_turn_conflict_propensity_scores_id_hourly = (
#     caluate_ci_with_conflict_propensity(signal_id="1500")
# )
# phase_nos = proc_df_turn_conflict_propensity_scores_id_hourly["phaseNo"].unique().tolist()
        
# # Create a 2x2 grid for subplots
# fig = make_subplots(
#     rows=2, cols=2,
#     subplot_titles=[f"Phase {phase_no}" for phase_no in phase_nos],
#     shared_xaxes=False, shared_yaxes=False,
#     vertical_spacing=0.15
# )

# # Iterate over phases and add traces
# row_col_mapping = [(1, 1), (1, 2), (2, 1), (2, 2)]  # Map phases to subplot positions
# for idx, (phase_no, (row, col)) in enumerate(zip(phase_nos, row_col_mapping)):
#     proc_df_turn_conflict_propensity_scores_id_hourly_phase = (
#         proc_df_turn_conflict_propensity_scores_id_hourly[proc_df_turn_conflict_propensity_scores_id_hourly["phaseNo"] == phase_no]
#     )

#     turn_conflict_propensity_scores = (
#         proc_df_turn_conflict_propensity_scores_id_hourly
#         .loc[:, "turnConflictPropensityAvg"]
#         .values
#     )
#     # turn_conflict_propensity_scores = (
#     #     # df_turn_conflict_propensity_scores_id_hourly[df_turn_conflict_propensity_scores_id_hourly["phaseNo"] == phase_no]
#     #     df_turn_conflict_propensity_scores_id_hourly
#     #     .loc[:, "turnConflictPropensity"]
#     #     .values
#     # )
    
#     medium_propensity_threshold, high_propensity_threshold = get_conflict_propensity_thresholds(turn_conflict_propensity_scores)
    
#     # Calculate error bars
#     yerr_lower = proc_df_turn_conflict_propensity_scores_id_hourly_phase["turnConflictPropensityAvg"] - proc_df_turn_conflict_propensity_scores_id_hourly_phase["lowerCI"]
#     yerr_upper = proc_df_turn_conflict_propensity_scores_id_hourly_phase["upperCI"] - proc_df_turn_conflict_propensity_scores_id_hourly_phase["turnConflictPropensityAvg"]

#     # Add scatter plot with error bars
#     fig.add_trace(
#         go.Scatter(
#             x=proc_df_turn_conflict_propensity_scores_id_hourly_phase["hour"],
#             y=proc_df_turn_conflict_propensity_scores_id_hourly_phase["turnConflictPropensityAvg"],
#             error_y=dict(
#                 type="data",
#                 symmetric=False,
#                 array=yerr_upper,
#                 arrayminus=yerr_lower,
#                 color="blue",
#                 thickness=1.5,
#                 width=3,
#             ),
#             mode="markers+lines",
#             marker=dict(size=8),
#             name=f"Phase {phase_no}",
#         ),
#         row=row, col=col
#     )

#     # Add a horizontal threshold line
#     fig.add_shape(
#         type="line",
#         x0=0,
#         x1=23,
#         y0=medium_propensity_threshold,
#         y1=medium_propensity_threshold,
#         line=dict(color="orange", dash="dash"),
#         row=row, col=col
#     )
#     # Add a horizontal threshold line
#     fig.add_shape(
#         type="line",
#         x0=0,
#         x1=23,
#         y0=high_propensity_threshold,
#         y1=high_propensity_threshold,
#         line=dict(color="red", dash="dash"),
#         row=row, col=col
#     )

# # Update layout
# fig.update_layout(
#     # title="Proportion of Cycles Recommended with PR by Hour with 95% Confidence Intervals",
#     height=800,
#     width=1400,
#     showlegend=False,
#     xaxis_title="Hour of Day",
#     yaxis_title="Conflict Propensity",
#     # yaxis=dict(range=[0, 1]),  # Set y-axis range
#     font=dict(size=14)
# )

# # Update axis labels for shared x/y axes
# fig.update_xaxes(
#     title_text="Hour of Day",
#     # dtick=1,
#     # tickvals=list(range(24)),
#     # ticktext=[f"{hour}:00" for hour in range(24)],
#     # tickformat="%H:00",
#     # tickangle=-45,
#     # row=2, col=1,  # Shared x-axis title position
#     title_font=dict(size=16),
#     tickfont=dict(size=16),
# )

# # Update y-axes for shared configuration
# fig.update_yaxes(
#     title_text="Conflict Propensity", 
#     # range=[0, 1],  # Set y-axis range for all subplots
#     # tickvals=np.linspace(0, 1, 11),  # Tick values from 0 to 1 with step 0.1
#     title_font=dict(size=16),
#     tickfont=dict(size=16),
# )

# # Export the Plotly figure as a high-resolution image
# fig.write_image("../reports/7.2(b).png", width=1400, height=800, scale=2)

# # Show plot
# fig.show()

##### Identification of Critical Hour

In [None]:
def hour_with_conflict_propensity(signal_id, metric="lpi"):
    dict_hour_with_conflict_propensity_id = {"signalID": [], "approachDir": [], "phaseNo": []}
    dict_hour_with_conflict_propensity_id.update(
        {str(hour): [] for hour in range(24)}
    )
    df_config_id = pd.read_csv(f"../data/interim/atspm/fdot_d5/signal_config/{signal_id}.csv")
    
    phase_nos_config = df_config_id["phaseNo"].unique().tolist()
    phase_nos_config = [int(phase_no) for phase_no in phase_nos_config if pd.notna(phase_no) and phase_no % 2 == 0]
    
    try:
        df_turn_conflict_propensity_scores_id_hourly, proc_df_turn_conflict_propensity_scores_id_hourly = (
            caluate_ci_with_conflict_propensity(signal_id=signal_id)
        )
        
        phase_nos_conflict_propensity = proc_df_turn_conflict_propensity_scores_id_hourly["phaseNo"].unique().tolist()
        phase_nos_conflict_propensity = [int(phase_no) for phase_no in phase_nos_conflict_propensity]
        
        sequence = [2, 6, 4, 8]
        phase_nos = sorted(list(set(phase_nos_config + phase_nos_conflict_propensity)), key=lambda x: sequence.index(x))
    
        turn_conflict_propensity_scores = (
            proc_df_turn_conflict_propensity_scores_id_hourly
            # [proc_df_turn_conflict_propensity_scores_id_hourly["phaseNo"] == phase_no]
            .loc[:, "turnConflictPropensityAvg"]
            .values
        )
        
        # turn_conflict_propensity_scores = (
        #     df_turn_conflict_propensity_scores_id_hourly
        #     # df_turn_conflict_propensity_scores_id_hourly[df_turn_conflict_propensity_scores_id_hourly["phaseNo"] == phase_no]
        #     .loc[:, "turnConflictPropensity"]
        #     .values
        # )
        
        medium_propensity_threshold, high_propensity_threshold = get_conflict_propensity_thresholds(turn_conflict_propensity_scores)
        
        if metric == "lpi": # leading pedestrian interval
            threshold = medium_propensity_threshold
        elif metric == "nrtor": # no right turn on red
            threshold = high_propensity_threshold
        
        for phase_no in phase_nos:
            dict_hour_with_conflict_propensity_id["signalID"].append(signal_id)
    
            if phase_no in phase_nos_config:
                approach = df_config_id[df_config_id["phaseNo"] == phase_no]["approach"].unique()[0]
                approach_dir = df_config_id[df_config_id["phaseNo"] == phase_no]["approach"].unique()[0].split()[-1][0:-1]
            else:
                approach = "NA"
                approach_dir = "NA"
        
            dict_hour_with_conflict_propensity_id["approachDir"].append(approach_dir)
            dict_hour_with_conflict_propensity_id["phaseNo"].append(phase_no)
    
            lane_types = df_config_id[df_config_id["phaseNo"] == phase_no]["laneType"].unique().tolist()
            lane_types = [lane_type for lane_type in lane_types if "Right" in lane_type and len(lane_type.split()) == 1]
    
            if approach != "NA" and len(lane_types) != 0:
                stop_bar_distances = (
                    df_config_id.query("approach == @approach and laneType == @lane_types[-1]")["stopBarDistance"].values.tolist()
                ) 
                # if stop_bar_distances == []:
                #     stop_bar_distances = [np.nan]
            else:
                stop_bar_distances = (
                    df_config_id.query("phaseNo == @phase_nos")["stopBarDistance"].values.tolist()
                ) 
    
            if (all(pd.isna(stop_bar_distance) for stop_bar_distance in stop_bar_distances) or approach == "NA") and len(lane_types) != 0:
                for i in range(24):
                    dict_hour_with_conflict_propensity_id[str(i)].append(np.nan)
            else:
                for i in range(24):
                    proc_df_turn_conflict_propensity_scores_id_hourly_phase = (
                        proc_df_turn_conflict_propensity_scores_id_hourly
                        .query("hour == @i & phaseNo == @phase_no")
                    )
            
                    if len(proc_df_turn_conflict_propensity_scores_id_hourly_phase) == 0:
                        dict_hour_with_conflict_propensity_id[str(i)].append(0)
                    else:
                        proc_df_turn_conflict_propensity_scores_id_hourly_phase = (
                            proc_df_turn_conflict_propensity_scores_id_hourly_phase.reset_index(drop=True)
                        )
                        lower_ci = proc_df_turn_conflict_propensity_scores_id_hourly_phase.loc[0, "lowerCI"]
                        
                        if lower_ci >= threshold:
                            dict_hour_with_conflict_propensity_id[str(i)].append(1)
                        else:
                            dict_hour_with_conflict_propensity_id[str(i)].append(0)
    
        return pd.DataFrame(dict_hour_with_conflict_propensity_id)
    except:
        for phase_no in phase_nos_config:
            dict_hour_with_conflict_propensity_id["signalID"].append(signal_id)
            dict_hour_with_conflict_propensity_id["approachDir"].append("NA")
            dict_hour_with_conflict_propensity_id["phaseNo"].append(np.nan)
                                                                      
            for i in range(24):
                dict_hour_with_conflict_propensity_id[str(i)].append(np.nan)

        return pd.DataFrame(dict_hour_with_conflict_propensity_id)

###### LPI

In [None]:
# df_lpi_hour_with_conflict_propensity = pd.DataFrame()

# for signal_id in tqdm.tqdm(signal_ids):
#     df_lpi_hour_with_conflict_propensity_id = hour_with_conflict_propensity(signal_id=signal_id, metric="lpi")
#     df_lpi_hour_with_conflict_propensity = (
#         pd.concat([df_lpi_hour_with_conflict_propensity, df_lpi_hour_with_conflict_propensity_id], axis=0, ignore_index=True)
#     )

###### NRTOR

In [None]:
# df_nrtor_hour_with_conflict_propensity = pd.DataFrame()

# for signal_id in tqdm.tqdm(signal_ids):
#     df_nrtor_hour_with_conflict_propensity_id = hour_with_conflict_propensity(signal_id=signal_id, metric="nrtor")
#     df_nrtor_hour_with_conflict_propensity = (
#         pd.concat([df_nrtor_hour_with_conflict_propensity, df_nrtor_hour_with_conflict_propensity_id], axis=0, ignore_index=True)
#     )

##### Recommendation

###### LPI

In [None]:
# # LPI Recommendation
# df_lpi_recommendations = float_to_int(df_lpi_hour_with_conflict_propensity)
# df_lpi_recommendations.to_csv("../reports/recommendations/lpi_recommendation.csv", 
#                               index=False)

In [None]:
# proc_df_lpi_recommendations = (
#     df_lpi_recommendations[df_lpi_recommendations["signalID"] == "1500"]
# )

# # Create a 2x2 grid for the subplots
# fig = make_subplots(
#     rows=2,
#     cols=2,
#     subplot_titles=[f"Phase {phase}" for phase in proc_df_lpi_recommendations["phaseNo"].unique()],
#     shared_xaxes=False,
#     # shared_yaxes=False,
#     vertical_spacing=0.6
# )

# # Define the color mapping for LPI recommendations
# colors = {0: "#27ae60", 1: "#cb4335"}  # Green: LPI Not Needed, Red: LPI Needed

# # Add a heatmap for each phase in the 2x2 layout
# phase_nos = proc_df_lpi_recommendations["phaseNo"].unique()

# for idx, phase_no in enumerate(phase_nos, start=1):
#     # Get subplot row and column
#     row, col = divmod(idx - 1, 2)
#     row += 1
#     col += 1

#     # Filter data for the current phase
#     proc_df_lpi_recommendations_phase = (
#         proc_df_lpi_recommendations[proc_df_lpi_recommendations["phaseNo"] == phase_no].iloc[:, 3:]
#     )  # Extract hourly binary data
    
#     lpi_recommendations_phase = proc_df_lpi_recommendations_phase.values  # Convert to matrix
#     x_labels = list(range(24))  # Hours
#     y_labels = [
#         f"{row['signalID']} ({row['approachDir']})" for _, row in proc_df_lpi_recommendations[proc_df_lpi_recommendations["phaseNo"] == phase_no].iterrows()
#     ]

#     # Add heatmap trace
#     fig.add_trace(
#         go.Heatmap(
#             z=lpi_recommendations_phase,
#             x=x_labels,
#             y=y_labels,
#             colorscale=[[0, "#27ae60"], [1, "#cb4335"]],  # Define two discrete colors
#             showscale=False,  # Disable color bar
#             zmin=0,
#             zmax=0.5,
#         ),
#         row=row,
#         col=col,
#     )

# # Add a legend manually
# fig.add_trace(
#     go.Scatter(
#         x=[None], y=[None],
#         mode="markers",
#         marker=dict(size=14, color="#27ae60"),
#         name="LPI Not Recommended",
#     )
# )
# fig.add_trace(
#     go.Scatter(
#         x=[None], y=[None],
#         mode="markers",
#         marker=dict(size=14, color="#cb4335"),
#         name="LPI Recommended",
#     )
# )

# # Update layout
# fig.update_layout(
#     height=400,
#     width=1400,
#     # title="Hourly Signal PR Recommendations for Each Phase",
#     # yaxis=dict(title="Signal ID (Direction)"),
# )

# # Update layout with legend font size
# fig.update_layout(
#     legend=dict(
#         orientation="h",  # Horizontal legend
#         x=0.5,            # Center horizontally
#         y=-0.4,           # Position below the plot
#         xanchor="center",
#         yanchor="top",
#         font=dict(size=14),  # Set font size for the legend items
#         # title=dict(text="Status", font=dict(size=14)),  # Title with font size
#     )
# )

# # Customize x-axis for each subplot
# for i in range(1, len(phase_nos) + 1):
#     fig.update_xaxes(
#         title="Hour of Day",
#         tickmode="array",
#         tickvals=list(range(24)),
#         ticktext=[f"{hour:02d}:00" for hour in range(24)],
#         row=((i - 1) // 2) + 1,
#         col=((i - 1) % 2) + 1,
#         tickangle=-90,
#         title_font=dict(size=14),
#         tickfont=dict(size=14),
#     )

#     fig.update_yaxes(
#         title_font=dict(size=16),
#         tickfont=dict(size=14),
#     )

# fig.show()

###### NRTOR

In [None]:
# # NRTOR Recommendation
# df_nrtor_recommendations = float_to_int(df_nrtor_hour_with_conflict_propensity)
# df_nrtor_recommendations.to_csv("../reports/recommendations/nrtor_recommendation.csv", 
#                               index=False)

In [None]:
# proc_df_nrtor_recommendations = (
#     df_nrtor_recommendations[df_nrtor_recommendations["signalID"] == "1500"]
# )

# # Create a 2x2 grid for the subplots
# fig = make_subplots(
#     rows=2,
#     cols=2,
#     subplot_titles=[f"Phase {phase}" for phase in proc_df_nrtor_recommendations["phaseNo"].unique()],
#     shared_xaxes=False,
#     # shared_yaxes=False,
#     vertical_spacing=0.6
# )

# # Define the color mapping for NRTOR recommendations
# colors = {0: "#27ae60", 1: "#cb4335"}  # Green: NRTOR Not Needed, Red: NRTOR Needed

# # Add a heatmap for each phase in the 2x2 layout
# phase_nos = proc_df_nrtor_recommendations["phaseNo"].unique()

# for idx, phase_no in enumerate(phase_nos, start=1):
#     # Get subplot row and column
#     row, col = divmod(idx - 1, 2)
#     row += 1
#     col += 1

#     # Filter data for the current phase
#     proc_df_nrtor_recommendations_phase = (
#         proc_df_nrtor_recommendations[proc_df_nrtor_recommendations["phaseNo"] == phase_no].iloc[:, 3:]
#     )  # Extract hourly binary data
    
#     nrtor_recommendations_phase = proc_df_nrtor_recommendations_phase.values  # Convert to matrix
#     x_labels = list(range(24))  # Hours
#     y_labels = [
#         f"{row['signalID']} ({row['approachDir']})" for _, row in proc_df_nrtor_recommendations[proc_df_nrtor_recommendations["phaseNo"] == phase_no].iterrows()
#     ]

#     # Add heatmap trace
#     fig.add_trace(
#         go.Heatmap(
#             z=nrtor_recommendations_phase,
#             x=x_labels,
#             y=y_labels,
#             colorscale=[[0, "#27ae60"], [1, "#cb4335"]],  # Define two discrete colors
#             showscale=False,  # Disable color bar
#             zmin=0,
#             zmax=0.5,
#         ),
#         row=row,
#         col=col,
#     )

# # Add a legend manually
# fig.add_trace(
#     go.Scatter(
#         x=[None], y=[None],
#         mode="markers",
#         marker=dict(size=14, color="#27ae60"),
#         name="NRTOR Not Recommended",
#     )
# )
# fig.add_trace(
#     go.Scatter(
#         x=[None], y=[None],
#         mode="markers",
#         marker=dict(size=14, color="#cb4335"),
#         name="NRTOR Recommended",
#     )
# )

# # Update layout
# fig.update_layout(
#     height=400,
#     width=1400,
#     # title="Hourly Signal PR Recommendations for Each Phase",
#     # yaxis=dict(title="Signal ID (Direction)"),
# )

# # Update layout with legend font size
# fig.update_layout(
#     legend=dict(
#         orientation="h",  # Horizontal legend
#         x=0.5,            # Center horizontally
#         y=-0.4,           # Position below the plot
#         xanchor="center",
#         yanchor="top",
#         font=dict(size=14),  # Set font size for the legend items
#         # title=dict(text="Status", font=dict(size=14)),  # Title with font size
#     )
# )

# # Customize x-axis for each subplot
# for i in range(1, len(phase_nos) + 1):
#     fig.update_xaxes(
#         title="Hour of Day",
#         tickmode="array",
#         tickvals=list(range(24)),
#         ticktext=[f"{hour:02d}:00" for hour in range(24)],
#         row=((i - 1) // 2) + 1,
#         col=((i - 1) % 2) + 1,
#         tickangle=-90,
#         title_font=dict(size=14),
#         tickfont=dict(size=14),
#     )

#     fig.update_yaxes(
#         title_font=dict(size=16),
#         tickfont=dict(size=14),
#     )

# fig.show()

#### v2

##### Calculation of Pedestrian-Vehicle (Right-Turn) Conflict Propensity

In [16]:
def hourly_aggregate(df):
    """
    Aggregates a DataFrame to calculate hourly statistics.

    Parameters
    ----------
    df : pd.DataFrame
        The input DataFrame containing cycle-level data.

    Returns
    -------
    pd.DataFrame
        The aggregated DataFrame with hourly statistics.
    """
    df = df.copy()
    
    # Define column categories for aggregation
    columns = [col for col in df.columns if any(key in col for key in ["Duration", "Occupancy"])]  
    
    df[columns] = df[columns].replace(0, np.nan)
    
    # Prepare the aggregation dictionary
    agg_dict = {
        **{col: "mean" for col in columns}  
    }

    # Perform the aggregation by grouping by signalID, date, and hour
    df_hourly = df.groupby(["signalID", "date", "hour"]).agg(agg_dict).reset_index()

    # Filter and sort the columns
    columns = [
        column for column in df_hourly.columns
        for param in ["Duration", "Occupancy"]
        if param in column
    ]
    columns = sorted(columns, key=lambda x: x.split("Phase")[-1][0])  # Sort columns by phase number

    # Reorganize DataFrame columns to make it consistent and intuitive
    df_hourly = df_hourly[["signalID", "date", "hour"] + columns]

    # Return the aggregated DataFrame
    return df_hourly

In [179]:
def compute_diminishing_returns(array, k=0.1):
    """
    Apply diminishing returns using an exponential decay function, returning 0 for NaN values.

    Parameters:
        array (np.ndarray): The input array of values.
        k (float): The decay rate (higher values result in faster diminishing returns).
    
    Returns:
        np.ndarray: Transformed array with diminishing returns applied, with NaN values returning 0.
    """
    if not isinstance(array, np.ndarray):
        raise ValueError("Input must be a NumPy array.")
    
    # Replace NaN values with 0 before computing diminishing returns
    array = np.nan_to_num(array, nan=0)
    
    # Apply the diminishing return formula: 1 - e^(-k * x)
    return 1 - np.exp(-k * array)

In [210]:
def compute_sigmoid_scaling(array, k=0.1, C=None):
    """
    Apply sigmoid transformation to scale values between 0 and 1.

    Parameters:
        array (np.ndarray): The input array of values.
        k (float): The scaling factor controlling sensitivity of the sigmoid function.
        C (float, optional): The threshold parameter. 

    Returns:
        np.ndarray: Transformed array with sigmoid scaling applied.
    """
    if not isinstance(array, np.ndarray):
        raise ValueError("Input must be a NumPy array.")
    
    # Replace NaN values with 0 before computing sigmoid scaling
    array = np.nan_to_num(array, nan=0)

    # Set threshold C if not provided
    if C is None:
        # C = np.median(array)
        C = np.percentile(array, 1)  # Use 75th percentile instead of median

    # Apply sigmoid function: 1 / (1 + e^(-k * (x - C)))
    return 1 / (1 + np.exp(-k * (array - C)))


In [291]:
def compute_conflict_propensity_scores(df: pd.DataFrame, dict_lane_type_weights: dict = None, k: float = None):
    """
    Computes conflict propensity scores for each phase in each row of the DataFrame,
    considering all lanes and combining them into phase-level scores using weighted averages.

    Parameters
    ----------
    df : pd.DataFrame
        DataFrame with dynamically generated columns representing phases, e.g.,
        'vehicleOccupancyPhase4R', 'vehicleActivityPhase4R', etc.

    dict_lane_type_weights : dict, optional
        Dictionary mapping lane type suffixes (e.g., 'R', 'TR', 'LTR') to numeric weights.
        Example:
            {
                "R":   1.0,    # Right-only
                "TR":  0.5,    # Through+Right
                "LTR": 0.33    # Left+Through+Right
            }
        If None, default weights will be used.
        
    k (float): The decay rate (higher values result in faster diminishing returns).

    Returns
    -------
    pd.DataFrame
        A DataFrame with phase-level conflict propensity scores for each row.
    """
    # df_turn_conflict_propensity_hourly = concat_df()
    
    # Assign default lane type weights if none are provided
    if dict_lane_type_weights is None:
        dict_lane_type_weights = {
            "R":   1.0,   # Right-only
            "TR":  0.5,   # Through+Right
            "LTR": 0.33   # Left+Through+Right
        }

    # Extract column names and identify unique phases dynamically
    columns = df.columns
    phase_nos = sorted(
        {int(col.split("Phase")[-1][0]) for col in columns if "Phase" in col and col.split("Phase")[-1][0].isdigit()}
    )
    phase_nos = [phase_no for phase_no in phase_nos if phase_no in [2, 4, 6, 8]]

    # Initialize a dictionary to store conflict propensity scores for each phase
    dict_conflict_propensity_scores = {
        f"turnConflictPropensityPhase{phase_no}": [] for phase_no in phase_nos
    }

    # Compute conflict propensity scores for each phase
    for phase_no in phase_nos:
        weighted_lane_scores = np.zeros(len(df))  # Initialize weighted scores
        total_lane_weight = 0  # Initialize total lane weight for normalization

        # Calculate total lane weights for normalization
        for lane_type, lane_weight in dict_lane_type_weights.items():
            vehicle_occupancy_col = f"vehicleOccupancyPhase{phase_no}{lane_type}"

            # Only consider lanes that exist in the dataset
            if any(col in columns for col in [vehicle_occupancy_col]):
                total_lane_weight += lane_weight

        # Safeguard: Skip if no valid lanes exist
        if total_lane_weight == 0:
            continue

        # Process each lane type
        for lane_type, lane_weight in dict_lane_type_weights.items():
            vehicle_occupancy_col = f"vehicleOccupancyPhase{phase_no}{lane_type}"
            pedestrian_activity_duration_col = f"pedestrianActivityDurationPhase{phase_no}"

            # Ensure pedestrian activity columns exist
            if pedestrian_activity_duration_col not in columns or vehicle_occupancy_col not in columns:
                continue

            # Calculate exposure 
            pedestrian_activity_durations = df[pedestrian_activity_duration_col].values
            vehicle_occupancies = df[vehicle_occupancy_col].values

            # exposures = pedestrian_activity_durations * vehicle_occupancies
            # exposures = np.sqrt(pedestrian_activity_durations * vehicle_occupancies)
            exposures = (
                (2 * (pedestrian_activity_durations * vehicle_occupancies)) / (pedestrian_activity_durations + vehicle_occupancies)
            )

            # # Determine k
            # all_exposures = (
            #     df_turn_conflict_propensity_hourly[pedestrian_activity_duration_col].values * 
            #     df_turn_conflict_propensity_hourly[vehicle_occupancy_col].values
            # )

            # k = 1 / np.nanmax(all_exposures) if np.nanmax(all_exposures) > 0 else 0

            # Calculate dimishing return
            diminishing_returns = compute_diminishing_returns(array=exposures, k=k)

            # Calculate lane-level scores
            lane_scores = lane_weight * diminishing_returns
            # lane_scores = diminishing_returns

            # # Calculate dimishing return
            # sigmoid_scaling = compute_sigmoid_scaling(array=exposures, k=k)

            # # Calculate lane-level scores
            # lane_scores = lane_weight * sigmoid_scaling

            # Compute weighted lane-level scores
            weighted_lane_scores = lane_scores * (lane_weight / total_lane_weight)

        # Store phase-level scores
        dict_conflict_propensity_scores[f"turnConflictPropensityPhase{phase_no}"] = weighted_lane_scores.tolist()
        # dict_conflict_propensity_scores[f"turnConflictPropensityPhase{phase_no}"] = exposures.tolist()

    # Return the final DataFrame with conflict propensity scores
    return pd.DataFrame(dict_conflict_propensity_scores)

In [263]:
def caluate_ci_with_conflict_propensity(signal_id: str, k: float = None):
    df_turn_conflict_propensity_id = load_data(
        dirpath="../data/production/atspm/fdot_d5/feature_extraction/feature/cycle/pedestrian_traffic/turn_conflict_intensity",
        signal_id=f"{signal_id}"
    )
    df_turn_conflict_propensity_id["hour"] = df_turn_conflict_propensity_id["cycleBegin"].dt.hour
    
    columns = (
        [col for col in df_turn_conflict_propensity_id.columns if any(key in col for key in ["signalID", "date", "hour", "Duration", "Occupancy"])]
    )
    columns = sorted(columns, key=lambda x: x[0] + x[-1])
    
    df_turn_conflict_propensity_id = df_turn_conflict_propensity_id[columns]
    
    df_turn_conflict_propensity_id_hourly = hourly_aggregate(df=df_turn_conflict_propensity_id)
    df_turn_conflict_propensity_scores_id_hourly = (
        compute_conflict_propensity_scores(df=df_turn_conflict_propensity_id_hourly, k=k)
    )
    columns = df_turn_conflict_propensity_scores_id_hourly.columns.tolist()
    
    df_turn_conflict_propensity_scores_id_hourly["signalID"] = df_turn_conflict_propensity_id_hourly[["signalID"]]
    df_turn_conflict_propensity_scores_id_hourly["date"] = df_turn_conflict_propensity_id_hourly[["date"]]
    df_turn_conflict_propensity_scores_id_hourly["hour"] = df_turn_conflict_propensity_id_hourly[["hour"]]
    
    df_turn_conflict_propensity_scores_id_hourly = (
        pd.melt(df_turn_conflict_propensity_scores_id_hourly, 
                id_vars=["signalID", "date", "hour"], 
                value_vars=columns, var_name="phaseNo", value_name="turnConflictPropensity")
    )
    
    df_turn_conflict_propensity_scores_id_hourly["phaseNo"] = (
        df_turn_conflict_propensity_scores_id_hourly["phaseNo"]
        .str
        .extract(r"Phase(\d)")
        .astype(int)
    )
    
    df_turn_conflict_propensity_scores_id_hourly = (
        df_turn_conflict_propensity_scores_id_hourly[df_turn_conflict_propensity_scores_id_hourly["turnConflictPropensity"] > 0]
    )

    # Group by `phaseNo` and `hour` and calculate bootstrap confidence intervals and mean
    turn_conflict_propensity_scores_id = []
    for (signal_id, hour, phase_no), group in df_turn_conflict_propensity_scores_id_hourly.groupby(["signalID", "hour", "phaseNo"]):
        values = group["turnConflictPropensity"].values
        lower, upper = bootstrap_ci(values)
        mean_value = np.mean(values)  # Calculate the mean
        turn_conflict_propensity_scores_id.append(
            {
                "signalID": signal_id, "hour": hour, "phaseNo": phase_no, 
                "turnConflictPropensityAvg": mean_value, "lowerCI": lower, "upperCI": upper
            }
        )
    
    # Create a results DataFrame
    proc_df_turn_conflict_propensity_scores_id_hourly = pd.DataFrame(turn_conflict_propensity_scores_id)

    return df_turn_conflict_propensity_scores_id_hourly, proc_df_turn_conflict_propensity_scores_id_hourly

##### Identification of Critical Hours

In [301]:
# def concat_df():
#     df_turn_conflict_propensity_hourly = pd.DataFrame()
    
#     for signal_id in signal_ids:
#         try:
#             df_turn_conflict_propensity_id = load_data(
#                 dirpath="../data/production/atspm/fdot_d5/feature_extraction/feature/cycle/pedestrian_traffic/turn_conflict_intensity",
#                 signal_id=f"{signal_id}"
#             )
#             df_turn_conflict_propensity_id["hour"] = df_turn_conflict_propensity_id["cycleBegin"].dt.hour
            
#             columns = (
#                 [col for col in df_turn_conflict_propensity_id.columns if any(key in col for key in ["signalID", "date", "hour", "Duration", "Occupancy"])]
#             )
#             columns = sorted(columns, key=lambda x: x[0] + x[-1])
            
#             df_turn_conflict_propensity_id = df_turn_conflict_propensity_id[columns]
#             df_turn_conflict_propensity_id_hourly = hourly_aggregate(df=df_turn_conflict_propensity_id)

#             df_turn_conflict_propensity_hourly = pd.concat(
#                 [df_turn_conflict_propensity_hourly, df_turn_conflict_propensity_id_hourly], 
#                 axis=0, ignore_index=True
#             )
#         except:
#             print(signal_id)

#     return df_turn_conflict_propensity_hourly

In [300]:
# df_turn_conflict_propensity_scores_id_hourly, proc_df_turn_conflict_propensity_scores_id_hourly = (
#     caluate_ci_with_conflict_propensity(signal_id="1500", k=0.01)
# )
# phase_nos = proc_df_turn_conflict_propensity_scores_id_hourly["phaseNo"].unique().tolist()
# phase_nos = [2, 6, 4, 8]
        
# # Create a 2x2 grid for subplots
# fig = make_subplots(
#     rows=2, cols=2,
#     subplot_titles=[f"Phase {phase_no}" for phase_no in phase_nos],
#     shared_xaxes=False, shared_yaxes=False,
#     vertical_spacing=0.15
# )

# # Iterate over phases and add traces
# row_col_mapping = [(1, 1), (1, 2), (2, 1), (2, 2)]  # Map phases to subplot positions
# for idx, (phase_no, (row, col)) in enumerate(zip(phase_nos, row_col_mapping)):
#     proc_df_turn_conflict_propensity_scores_id_hourly_phase = (
#         proc_df_turn_conflict_propensity_scores_id_hourly[proc_df_turn_conflict_propensity_scores_id_hourly["phaseNo"] == phase_no]
#     )

#     turn_conflict_propensity_scores = (
#         proc_df_turn_conflict_propensity_scores_id_hourly
#         .loc[:, "turnConflictPropensityAvg"]
#         .values
#     )
#     # turn_conflict_propensity_scores = (
#     #     # df_turn_conflict_propensity_scores_id_hourly[df_turn_conflict_propensity_scores_id_hourly["phaseNo"] == phase_no]
#     #     df_turn_conflict_propensity_scores_id_hourly
#     #     .loc[:, "turnConflictPropensity"]
#     #     .values
#     # )
    
#     # Calculate error bars
#     yerr_lower = proc_df_turn_conflict_propensity_scores_id_hourly_phase["turnConflictPropensityAvg"] - proc_df_turn_conflict_propensity_scores_id_hourly_phase["lowerCI"]
#     yerr_upper = proc_df_turn_conflict_propensity_scores_id_hourly_phase["upperCI"] - proc_df_turn_conflict_propensity_scores_id_hourly_phase["turnConflictPropensityAvg"]

#     # Add scatter plot with error bars
#     fig.add_trace(
#         go.Scatter(
#             x=proc_df_turn_conflict_propensity_scores_id_hourly_phase["hour"],
#             y=proc_df_turn_conflict_propensity_scores_id_hourly_phase["turnConflictPropensityAvg"],
#             error_y=dict(
#                 type="data",
#                 symmetric=False,
#                 array=yerr_upper,
#                 arrayminus=yerr_lower,
#                 color="blue",
#                 thickness=1.5,
#                 width=3,
#             ),
#             mode="markers+lines",
#             marker=dict(size=8),
#             name=f"Phase {phase_no}",
#         ),
#         row=row, col=col
#     )

#     # Add a horizontal threshold line
#     fig.add_shape(
#         type="line",
#         x0=0,
#         x1=23,
#         y0=0.5,
#         y1=0.5,
#         line=dict(color="red", dash="dash"),
#         row=row, col=col
#     )

# # Update layout
# fig.update_layout(
#     # title="Proportion of Cycles Recommended with PR by Hour with 95% Confidence Intervals",
#     height=800,
#     width=1400,
#     showlegend=False,
#     xaxis_title="Hour of Day",
#     yaxis_title="Conflict Propensity",
#     # yaxis=dict(range=[0, 1]),  # Set y-axis range
#     font=dict(size=14)
# )

# # Update axis labels for shared x/y axes
# fig.update_xaxes(
#     title_text="Hour of Day",
#     # dtick=1,
#     # tickvals=list(range(24)),
#     # ticktext=[f"{hour}:00" for hour in range(24)],
#     # tickformat="%H:00",
#     # tickangle=-45,
#     # row=2, col=1,  # Shared x-axis title position
#     title_font=dict(size=16),
#     tickfont=dict(size=16),
# )

# # Update y-axes for shared configuration
# fig.update_yaxes(
#     title_text="Conflict Propensity", 
#     range=[0, 1],  # Set y-axis range for all subplots
#     # tickvals=np.linspace(0, 1, 11),  # Tick values from 0 to 1 with step 0.1
#     title_font=dict(size=16),
#     tickfont=dict(size=16),
# )

# # Export the Plotly figure as a high-resolution image
# fig.write_image("../reports/7.1.png", width=1400, height=800, scale=2)

# # Show plot
# fig.show()

In [323]:
def critical_hour_with_conflict_propensity(signal_id, threshold, k):
    dict_hour_with_conflict_propensity_id = {"signalID": [], "approachDir": [], "phaseNo": []}
    dict_hour_with_conflict_propensity_id.update(
        {str(hour): [] for hour in range(24)}
    )
    df_config_id = pd.read_csv(f"../data/interim/atspm/fdot_d5/signal_config/{signal_id}.csv")
    
    phase_nos_config = df_config_id["phaseNo"].unique().tolist()
    phase_nos_config = [int(phase_no) for phase_no in phase_nos_config if pd.notna(phase_no) and phase_no % 2 == 0]
    
    try:
        _, proc_df_turn_conflict_propensity_scores_id_hourly = (
            caluate_ci_with_conflict_propensity(signal_id=signal_id, k=k)
        )

        proc_df_turn_conflict_propensity_scores_id_hourly = (
            proc_df_turn_conflict_propensity_scores_id_hourly
            [proc_df_turn_conflict_propensity_scores_id_hourly["phaseNo"].isin([2, 4, 6, 8])]
            .reset_index(drop=True)
        )
        
        phase_nos_conflict_propensity = proc_df_turn_conflict_propensity_scores_id_hourly["phaseNo"].unique().tolist()
        phase_nos_conflict_propensity = [int(phase_no) for phase_no in phase_nos_conflict_propensity]
        
        sequence = [2, 6, 4, 8]
        phase_nos = sorted(list(set(phase_nos_config + phase_nos_conflict_propensity)), key=lambda x: sequence.index(x))
        
        for phase_no in phase_nos:
            dict_hour_with_conflict_propensity_id["signalID"].append(signal_id)
    
            if phase_no in phase_nos_config:
                approach = df_config_id[df_config_id["phaseNo"] == phase_no]["approach"].unique()[0]
                approach_dir = df_config_id[df_config_id["phaseNo"] == phase_no]["approach"].unique()[0].split()[-1][0:-1]
            else:
                approach = "NA"
                approach_dir = "NA"
        
            dict_hour_with_conflict_propensity_id["approachDir"].append(approach_dir)
            dict_hour_with_conflict_propensity_id["phaseNo"].append(phase_no)
    
            lane_types = df_config_id[df_config_id["phaseNo"] == phase_no]["laneType"].unique().tolist()
            lane_types = [lane_type for lane_type in lane_types if "Right" in lane_type and len(lane_type.split()) == 1]
    
            if approach != "NA" and len(lane_types) != 0:
                stop_bar_distances = (
                    df_config_id.query("approach == @approach and laneType == @lane_types[-1]")["stopBarDistance"].values.tolist()
                ) 
                # if stop_bar_distances == []:
                #     stop_bar_distances = [np.nan]
            else:
                stop_bar_distances = (
                    df_config_id.query("phaseNo == @phase_nos")["stopBarDistance"].values.tolist()
                ) 
    
            if (all(pd.isna(stop_bar_distance) for stop_bar_distance in stop_bar_distances) or approach == "NA") and len(lane_types) != 0:
                for i in range(24):
                    dict_hour_with_conflict_propensity_id[str(i)].append(np.nan)
            else:
                for i in range(24):
                    proc_df_turn_conflict_propensity_scores_id_hourly_phase = (
                        proc_df_turn_conflict_propensity_scores_id_hourly
                        .query("hour == @i & phaseNo == @phase_no")
                    )
            
                    if len(proc_df_turn_conflict_propensity_scores_id_hourly_phase) == 0:
                        dict_hour_with_conflict_propensity_id[str(i)].append(0)
                    else:
                        proc_df_turn_conflict_propensity_scores_id_hourly_phase = (
                            proc_df_turn_conflict_propensity_scores_id_hourly_phase.reset_index(drop=True)
                        )
                        lower_ci = proc_df_turn_conflict_propensity_scores_id_hourly_phase.loc[0, "lowerCI"]
                        
                        if lower_ci >= threshold:
                            dict_hour_with_conflict_propensity_id[str(i)].append(1)
                        else:
                            dict_hour_with_conflict_propensity_id[str(i)].append(0)
    
        return pd.DataFrame(dict_hour_with_conflict_propensity_id)
    except:
        for phase_no in phase_nos_config:
            dict_hour_with_conflict_propensity_id["signalID"].append(signal_id)
            dict_hour_with_conflict_propensity_id["approachDir"].append("NA")
            dict_hour_with_conflict_propensity_id["phaseNo"].append(np.nan)
                                                                      
            for i in range(24):
                dict_hour_with_conflict_propensity_id[str(i)].append(np.nan)

        return pd.DataFrame(dict_hour_with_conflict_propensity_id)

In [324]:
df_critical_hour_with_conflict_propensity = pd.DataFrame()

for signal_id in tqdm.tqdm(signal_ids):
    df_critical_hour_with_conflict_propensity_id = critical_hour_with_conflict_propensity(
        signal_id=signal_id, threshold=0.5, k=0.025
    )
    df_critical_hour_with_conflict_propensity = (
        pd.concat(
            [df_critical_hour_with_conflict_propensity, df_critical_hour_with_conflict_propensity_id], 
            axis=0, ignore_index=True)
    )

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 19/19 [00:15<00:00,  1.20it/s]


##### Recommendation

In [327]:
df_lpi_nrtor_recommendations = float_to_int(df_critical_hour_with_conflict_propensity)
df_lpi_nrtor_recommendations.to_csv("../reports/recommendations/lpi_nrtor_recommendations.csv", 
                                    index=False)

In [322]:
# proc_df_lpi_nrtor_recommendations = df_lpi_nrtor_recommendations[df_lpi_nrtor_recommendations["signalID"] == "1500"]

# # Create a 2x2 grid for the subplots
# fig = make_subplots(
#     rows=2,
#     cols=2,
#     subplot_titles=[f"Phase {phase}" for phase in proc_df_lpi_nrtor_recommendations["phaseNo"].unique()],
#     shared_xaxes=False,
#     # shared_yaxes=False,
#     vertical_spacing=0.6
# )

# # Define the color mapping for PR recommendations
# colors = {0: "#27ae60", 1: "#cb4335"}  # Green: PR Not Needed, Red: PR Needed

# # Add a heatmap for each phase in the 2x2 layout
# phase_nos = proc_df_lpi_nrtor_recommendations["phaseNo"].unique()

# for idx, phase_no in enumerate(phase_nos, start=1):
#     # Get subplot row and column
#     row, col = divmod(idx - 1, 2)
#     row += 1
#     col += 1

#     # Filter data for the current phase
#     proc_df_lpi_nrtor_recommendations_phase = (
#         proc_df_lpi_nrtor_recommendations[proc_df_lpi_nrtor_recommendations["phaseNo"] == phase_no].iloc[:, 3:]
#     )  # Extract hourly binary data
    
#     lpi_nrtor_recommendations_phase = proc_df_lpi_nrtor_recommendations_phase.values  # Convert to matrix
#     x_labels = list(range(24))  # Hours
#     y_labels = [
#         f"{row['signalID']} ({row['approachDir']})" for _, row in proc_df_lpi_nrtor_recommendations[proc_df_lpi_nrtor_recommendations["phaseNo"] == phase_no].iterrows()
#     ]

#     # Add heatmap trace
#     fig.add_trace(
#         go.Heatmap(
#             z=lpi_nrtor_recommendations_phase,
#             x=x_labels,
#             y=y_labels,
#             colorscale=[[0, "#27ae60"], [1, "#cb4335"]],  # Define two discrete colors
#             showscale=False,  # Disable color bar
#             zmin=0,
#             zmax=0.5,
#         ),
#         row=row,
#         col=col,
#     )

# # Add a legend manually
# fig.add_trace(
#     go.Scatter(
#         x=[None], y=[None],
#         mode="markers",
#         marker=dict(size=14, color="#27ae60"),
#         name="NRTOR Not Recommended",
#     )
# )
# fig.add_trace(
#     go.Scatter(
#         x=[None], y=[None],
#         mode="markers",
#         marker=dict(size=14, color="#cb4335"),
#         name="NRTOR Recommended",
#     )
# )

# # Update layout
# fig.update_layout(
#     height=400,
#     width=1400,
#     # title="Hourly Signal PR Recommendations for Each Phase",
#     # yaxis=dict(title="Signal ID (Direction)"),
# )

# # Update layout with legend font size
# fig.update_layout(
#     legend=dict(
#         orientation="h",  # Horizontal legend
#         x=0.5,            # Center horizontally
#         y=-0.4,           # Position below the plot
#         xanchor="center",
#         yanchor="top",
#         font=dict(size=14),  # Set font size for the legend items
#         # title=dict(text="Status", font=dict(size=14)),  # Title with font size
#     )
# )

# # Customize x-axis for each subplot
# for i in range(1, len(phase_nos) + 1):
#     fig.update_xaxes(
#         title="Hour of Day",
#         tickmode="array",
#         tickvals=list(range(24)),
#         ticktext=[f"{hour:02d}:00" for hour in range(24)],
#         row=((i - 1) // 2) + 1,
#         col=((i - 1) % 2) + 1,
#         tickangle=-90,
#         title_font=dict(size=14),
#         tickfont=dict(size=14),
#     )

#     fig.update_yaxes(
#         title_font=dict(size=16),
#         tickfont=dict(size=14),
#     )

# # Export the figure as a high-resolution image
# fig.write_image("../reports/7.2(b).png",  width=1400, height=400, scale=2)

# fig.show()