In [None]:
import os
from typing import Callable, List, NamedTuple
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import pickle
from tqdm import tqdm_notebook as tqdm
from scipy.special import erf
from domain_model import DocumentManagement, StateVariable
from simulation import ACC, idm_approaching_pars, IDMPlus, acc_approaching_pars, \
    SimulationApproaching, ACCHDM, acc_idm_approaching_pars, HDM, hdm_approaching_pars, \
    acc_lead_braking_pars, acc_hdm_lead_braking_pars, idm_lead_braking_pars, SimulationLeadBraking, \
    SimulationString, SimulationCutIn, idm_cutin_pars, acc_cutin_pars, acc_idm_cutin_pars
from stats import KDE, kde_from_file
from case_study_approaching import check_validity_approaching
from case_study_lead_braking import check_validity_lead_braking

In [None]:
# When True, cached results on disk are ignored: everything is recomputed and the
# cache files are rewritten. Passed as the `overwrite` flag throughout this notebook.
OVERWRITE = False

## Show number of scenarios found

In [None]:
# Load the stored lead-vehicle-decelerating scenarios and report how many there are.
leaddec = DocumentManagement(os.path.join("data", "5_scenarios", "lead_braking2.json"))
print("Number of lead vehicle decelerating scenarios: {:d}"
      .format(len(leaddec.collections["scenario"])))

In [None]:
# Load the stored approaching-slower-vehicle scenarios and report how many there are.
approaching = DocumentManagement(os.path.join("data", "5_scenarios", "approaching_vehicle2.json"))
print("Number of approaching slower vehicle scenarios: {:d}"
      .format(len(approaching.collections["scenario"])))

In [None]:
# Load the stored cut-in scenarios and report how many there are.
cutins = DocumentManagement(os.path.join("data", "5_scenarios", "cut_in_scenarios2.json"))
print("Number of cut-in scenarios: {:d}".format(len(cutins.collections["scenario"])))

## Simulations

#### Approaching stopped vehicle (false positive) with 2 vehicles

In [None]:
# Simulation with two following vehicles: an ACC model and an IDM+ model, each with its
# own "approaching" parameter set (presumably the ego vehicle first, then the vehicle
# behind it -- TODO confirm against SimulationApproaching).
s = SimulationApproaching([ACC(), IDMPlus()], [acc_approaching_pars, idm_approaching_pars],
                          min_simulation_time=5)

In [None]:
# Single illustrative run with a stationary target (ratio_vtar_vego=0) and plotting enabled.
# NOTE(review): reactiontime=13 is far above the 0.3-4 range that prob_of_collision
# searches over -- confirm this value is intentional.
s.simulation(dict(vego=20, ratio_vtar_vego=0, amin=-6, init_position=20, reactiontime=13),
             plot=True, ignore_stop=[True, False])

## Estimating risk for false positives

A vehicle is detected at distance X
Ego is at speed V
Vehicle behind with same speed (modelled using IDM+, with delay)

In [None]:
# Mean and standard deviation of the log-normally distributed reaction time
# (presumably in seconds -- TODO confirm the source of these values).
REACTIONTIME_MEAN = .92
REACTIONTIME_STD = .28

# Parameters of the underlying normal distribution of log(reaction time), derived from
# the mean and standard deviation above.
mu_lognormal = np.log(REACTIONTIME_MEAN**2/np.sqrt(REACTIONTIME_MEAN**2+REACTIONTIME_STD**2))
sigma_lognormal = np.sqrt(np.log(1+REACTIONTIME_STD**2/REACTIONTIME_MEAN**2))


def pdf(x):
    """Probability density of the log-normal reaction-time distribution at x.

    :param x: Reaction time(s); must be strictly positive (the expression takes log(x)).
    :return: Density value(s) at x.
    """
    return (np.exp(-(np.log(x) - mu_lognormal)**2 / (2*sigma_lognormal**2)) /
            (x*sigma_lognormal*np.sqrt(2*np.pi)))


def cdf(x):
    """Cumulative probability of the log-normal reaction-time distribution at x.

    :param x: Reaction time(s); must be strictly positive (the expression takes log(x)).
    :return: Probability that a sampled reaction time is at most x.
    """
    return 0.5*(1 + erf((np.log(x) - mu_lognormal) / (sigma_lognormal*np.sqrt(2))))


def sample_reactiontime():
    """Draw one reaction time from the same log-normal distribution as pdf() and cdf().

    Uses the module-level parameters instead of re-deriving them, so the three helpers
    cannot drift apart. Draws from NumPy's global random state.

    :return: A single sampled reaction time.
    """
    return np.random.lognormal(mu_lognormal, sigma_lognormal)

In [None]:
# Module-level two-vehicle simulator (ACC model followed by IDM+ model) that
# fp_simulation calls for every false-positive run.
simulation = SimulationApproaching([ACC(), IDMPlus()], 
                                   [acc_approaching_pars, idm_approaching_pars],
                                   min_simulation_time=5)
def fp_simulation(speed, distance, reactiontime, thw):
    """Run one false-positive simulation with the module-level `simulation` object.

    The target is stationary (ratio_vtar_vego=0) and amin is fixed at -6.

    :param speed: Initial ego speed (passed as `vego`).
    :param distance: Initial distance to the falsely detected object (`init_position`).
    :param reactiontime: Reaction time passed to the simulation.
    :param thw: Time headway passed to the simulation.
    :return: Whatever `simulation.simulation` returns; callers read element [1].
    """
    scenario_pars = dict(vego=speed, ratio_vtar_vego=0, thw=thw,
                         amin=-6, init_position=distance, reactiontime=reactiontime)
    return simulation.simulation(scenario_pars, ignore_stop=[True, False])

def is_collision(speed, distance, reactiontime, thw):
    """Tell whether the second simulated vehicle collides in a false-positive run.

    A negative value in element [1] of the fp_simulation result is treated as a collision.

    :return: True when that value is below zero, otherwise False.
    """
    outcome = fp_simulation(speed, distance, reactiontime, thw)
    return True if outcome[1] < 0.0 else False

In [None]:
def prob_of_collision(speed, distance, thw):
    """Probability that the following vehicle collides, given a random reaction time.

    Bisects the reaction time between 0.3 and 4 to find the smallest value that still
    leads to a collision, then returns the probability mass of the reaction-time
    distribution above that value.

    :return: 1.0 if even the shortest reaction time collides, 0.0 if even the longest
        one does not, otherwise 1 - cdf(critical reaction time).
    """
    lower, upper = 0.3, 4
    if is_collision(speed, distance, lower, thw):
        return 1.0
    if not is_collision(speed, distance, upper, thw):
        return 0.0
    # Stop once the bracket spans a negligible amount of probability mass.
    while cdf(upper) - cdf(lower) > 0.00001:
        middle = (upper + lower) / 2
        if is_collision(speed, distance, middle, thw):
            upper = middle
        else:
            lower = middle
    return 1 - cdf((upper + lower) / 2)

In [None]:
def make_heatplot_fp(dmin, dmax, vmin, vmax, thw=1.0, overwrite=False):
    """Plot the collision probability of the rear vehicle for a false-positive detection.

    The probability is evaluated on a 50 x 50 grid of initial distances (rows) and
    speeds (columns). The grid is cached as a pickle; the figure is saved under "figs".

    :param dmin: Smallest initial distance (integer, also used in the file name).
    :param dmax: Largest initial distance (integer, also used in the file name).
    :param vmin: Smallest speed (integer, also used in the file name).
    :param vmax: Largest speed (integer, also used in the file name).
    :param thw: Time headway of the follower.
    :param overwrite: Recompute the grid even if a cached file exists.
    """
    name = "fp_prob_{:d}_{:d}_{:d}_{:d}_{:02.0f}".format(dmin, dmax, vmin, vmax, 10*thw)
    filename = os.path.join("data", "7_simulation_results", "{:s}.p".format(name))

    distances = np.linspace(dmin, dmax, 50)
    speeds = np.linspace(vmin, vmax, 50)

    if overwrite or not os.path.exists(filename):
        prob_collision = np.zeros((len(distances), len(speeds)))
        for row, distance in enumerate(tqdm(distances)):
            for col, speed in enumerate(speeds):
                prob_collision[row, col] = prob_of_collision(speed, distance, thw)
        with open(filename, "wb") as file:
            pickle.dump(prob_collision, file)
    else:
        # NOTE: pickle.load executes arbitrary code; only load cache files you created.
        with open(filename, "rb") as file:
            prob_collision = pickle.load(file)

    fig, _ = plt.subplots(1, 1)
    contours = plt.contourf(speeds, distances, prob_collision, cmap='hot')
    colorbar = fig.colorbar(contours)
    plt.title("THW of follower is {:3.1f} s".format(thw))
    plt.xlabel("Speed [m/s]")
    plt.ylabel("Initial distance of false positive [m]")
    colorbar.ax.set_ylabel("Probability of collision rear vehicle")

    fig.savefig(os.path.join("figs", "{:s}.png".format(name)))

False P (resulting from triggering condition)
 - Obtain impact speed heatmap
False N (resulting from triggering condition)  (3 -> 2 (ego) -> 1 (stationary, not detected))
 - Focus on this one

Low-mu (triggering condition) -> lead vehicle braking, approaching, cut-in, cut-out (scenario)
Slope (triggering condition) -> lead vehicle braking, approaching, cut-in, cut-out (scenario)

gap(t) = initial_gap(thw=1.1s) - 0.5*a*t^2
Make heatmap of impact speed

In [None]:
# Collision-probability heatmap for a follower time headway (THW) of 1.0.
make_heatplot_fp(10, 100, 5, 40, 1.0, overwrite=OVERWRITE)

In [None]:
# Collision-probability heatmap for a follower time headway (THW) of 0.6.
make_heatplot_fp(10, 100, 5, 40, 0.6, overwrite=OVERWRITE)

In [None]:
# Collision-probability heatmap for a follower time headway (THW) of 0.3.
make_heatplot_fp(10, 100, 5, 40, 0.3, overwrite=OVERWRITE)

#### Heatmap for impact speed

In [None]:
def find_reaction_time_collision(speed, distance, thw):
    """Find the smallest reaction time (within 0.1 .. 10) that leads to a collision.

    :return: -1 if even a reaction time of 10 gives no collision, 0.1 if the shortest
        reaction time already collides, otherwise the upper end of the bisection
        bracket once it is narrower than 0.01.
    """
    low, high = 0.1, 10
    if not is_collision(speed, distance, high, thw):
        return -1
    if is_collision(speed, distance, low, thw):
        return low
    while (high - low) >= 0.01:
        mid = (high + low) / 2
        if is_collision(speed, distance, mid, thw):
            high = mid
        else:
            low = mid
    # `high` is the bracket end known to collide.
    return high

In [None]:
def get_new_cdf(xmin, xmax=5):
    """Tabulate the reaction-time CDF restricted to [xmin, xmax] and rescaled to [0, 1].

    :param xmin: First grid point of the restricted range.
    :param xmax: Last grid point of the restricted range.
    :return: (x, y) with 100 grid points x and the rescaled CDF values y, so that
        y[0] == 0 and y[-1] == 1. If cdf(xmin) == cdf(xmax) the rescaling divides
        by zero and y contains NaN.
    """
    grid = np.linspace(xmin, xmax, 100)
    raw = np.array([cdf(point) for point in grid])
    shifted = raw - raw[0]
    rescaled = shifted / shifted[-1]
    return grid, rescaled

In [None]:
def make_heatplot_fp_vimpact(dmin, dmax, vmin, vmax, thw=1.0, overwrite=False):
    """Plot the mean impact speed of the rear vehicle for a false-positive detection.

    For every cell of a 50 x 50 (distance, speed) grid, 20 reaction times are taken at
    evenly spaced quantiles of the reaction-time distribution restricted to values at
    or above the smallest colliding reaction time. Runs without impact contribute 0.
    The raw results are cached as a pickle; the figure is saved under "figs".

    :param dmin: Smallest initial distance (integer, also used in the file name).
    :param dmax: Largest initial distance (integer, also used in the file name).
    :param vmin: Smallest speed (integer, also used in the file name).
    :param vmax: Largest speed (integer, also used in the file name).
    :param thw: Time headway of the follower.
    :param overwrite: Recompute the results even if a cached file exists.
    """
    name = "fp_vimpact_{:d}_{:d}_{:d}_{:d}_{:02.0f}".format(dmin, dmax, vmin, vmax, 10*thw)
    filename = os.path.join("data", "7_simulation_results", "{:s}.p".format(name))

    distances = np.linspace(dmin, dmax, 50)
    speeds = np.linspace(vmin, vmax, 50)
    n_simulations = 20

    if overwrite or not os.path.exists(filename):
        # Mid-points of n_simulations equally wide probability bins.
        quantiles = np.linspace(.5/n_simulations, 1-.5/n_simulations, n_simulations)
        vimpact = np.zeros((len(distances), len(speeds), n_simulations))
        for row, distance in enumerate(tqdm(distances)):
            for col, speed in enumerate(speeds):
                critical_time = find_reaction_time_collision(speed, distance, thw)
                if critical_time < 0:
                    # No reaction time in the searched range leads to a collision.
                    continue
                cdfx, cdfy = get_new_cdf(critical_time)
                for k, quantile in enumerate(quantiles):
                    reaction_time = np.interp(quantile, cdfy, cdfx)
                    outcome = fp_simulation(speed, distance, reaction_time, thw)[1]
                    if outcome < 0:  # Only use result if there is an impact.
                        vimpact[row, col, k] = -outcome
        with open(filename, "wb") as file:
            pickle.dump(vimpact, file)
    else:
        # NOTE: pickle.load executes arbitrary code; only load cache files you created.
        with open(filename, "rb") as file:
            vimpact = pickle.load(file)

    fig, _ = plt.subplots(1, 1)
    contours = plt.contourf(speeds, distances, np.mean(vimpact, axis=2), cmap='hot')
    colorbar = fig.colorbar(contours)
    plt.title("THW of follower is {:3.1f} s".format(thw))
    plt.xlabel("Speed [m/s]")
    plt.ylabel("Initial distance ego vehicle to ghost object [m]")
    colorbar.ax.set_ylabel("Expected impact speed with collision [m/s]")

    fig.savefig(os.path.join("figs", "{:s}.png".format(name)))

In [None]:
# Impact-speed heatmap for a follower time headway (THW) of 1.0.
make_heatplot_fp_vimpact(10, 100, 5, 40, 1.0, overwrite=OVERWRITE)

In [None]:
# Impact-speed heatmap for a follower time headway (THW) of 0.6.
make_heatplot_fp_vimpact(10, 100, 5, 40, 0.6, overwrite=OVERWRITE)

In [None]:
# Impact-speed heatmap for a follower time headway (THW) of 0.3.
make_heatplot_fp_vimpact(10, 100, 5, 40, 0.3, overwrite=OVERWRITE)

## Estimating risk for false negatives

#### Single simulation

In [None]:
# Single-follower simulator using the IDM+ model. NOTE: `s_fn` is rebound to an ACCHDM
# simulator a few cells further down.
s_fn = SimulationApproaching([IDMPlus()], [idm_approaching_pars], min_simulation_time=5)

In [None]:
# Illustrative run with a stationary target (ratio_vtar_vego=0) and plotting enabled.
s_fn.simulation(dict(vego=10, init_position=25.1, amin=-6, reactiontime=1.57,
                     ratio_vtar_vego=0), plot=True)

In [None]:
# Rebinds `s_fn` to a single-follower simulator using the ACCHDM model; func_fn reads
# this global, so all later false-negative heatmaps use this simulator.
s_fn = SimulationApproaching([ACCHDM()], [acc_idm_approaching_pars], min_simulation_time=5)

In [None]:
# Illustrative run with a stationary target; no init_position is given here, and both
# k1_acc and k2_acc are set to 0 (presumably disabling the ACC feedback -- TODO confirm).
s_fn.simulation(dict(vego=10, ratio_vtar_vego=0, amin=-6, reactiontime=1.59, k1_acc=0, k2_acc=0),
             plot=True)

#### Compute collision probability over varying speed

In [None]:
def find_breakpoint(func, minimum=.2, maximum=5, tol=1e-5, **kwargs):
    """ Find the point where the function switches from positive to negative.

    The first element of func's return value is assumed to be positive for small x and
    negative for large x. The switch point is located by bisection; the search stops
    once the bracket spans less than `tol` of probability mass of the reaction-time
    distribution (`cdf`).

    :param func: The function for which to find the change point. Called as
        func(x, **kwargs); element [0] of its return value is inspected.
    :param minimum: The minimum x-value to search for.
    :param maximum: The maximum x-value to search for.
    :param tol: Search until (cdf(maximum)-cdf(minimum)) is within this tolerance.
    :param kwargs: Parameters to be passed to func.
    :return: (x of change point, probability that sample is larger than x,
              latest simulation result). If func is still positive at `maximum`, this
              is (maximum, 0.0, result); if func is already negative at `minimum`, this
              is (minimum, 1.0, result).
    """
    result = func(maximum, **kwargs)[0]
    if result > 0:
        # Still positive at the largest x: the sign never flips within the range.
        return maximum, 0.0, result
    result = func(minimum, **kwargs)[0]
    if result < 0:
        # Already negative at the smallest x: every sample lies beyond the change point.
        return minimum, 1.0, result
    while cdf(maximum) - cdf(minimum) > tol:
        midpoint = (maximum+minimum)/2
        result = func(midpoint, **kwargs)[0]
        if result > 0:
            minimum = midpoint
        else:
            maximum = midpoint
    midpoint = (maximum+minimum)/2
    return midpoint, 1-cdf(midpoint), result

In [None]:
def func_fn(reactiontime, speed, ttcwarning):
    """Run one false-negative simulation with the global `s_fn` simulator.

    The target is stationary and the initial distance equals ttcwarning * speed, i.e.
    the distance at which the time-to-collision equals `ttcwarning`.

    :return: Element [0] of the simulation result.
    """
    scenario_pars = dict(vego=speed, ratio_vtar_vego=0, amin=-6,
                         init_position=ttcwarning*speed, reactiontime=reactiontime)
    outcome = s_fn.simulation(scenario_pars)
    return outcome[0]

In [None]:
def make_heatplot_fn(vmin, vmax, ttcmin, ttcmax, overwrite=False):
    """Plot the collision probability with the front vehicle for a false negative.

    The probability is evaluated on a 50 x 50 grid of warning TTCs (rows) and initial
    ego speeds (columns). The grid is cached as a pickle; the figure is saved under
    "figs".

    :param vmin: Smallest speed (integer, also used in the file name).
    :param vmax: Largest speed (integer, also used in the file name).
    :param ttcmin: Smallest warning TTC (integer, also used in the file name).
    :param ttcmax: Largest warning TTC (integer, also used in the file name).
    :param overwrite: Recompute the grid even if a cached file exists.
    """
    name = "fn_prob_v{:d}_{:d}_ttc{:d}_{:d}".format(vmin, vmax, ttcmin, ttcmax)
    filename = os.path.join("data", "7_simulation_results", "{:s}.p".format(name))

    speeds = np.linspace(vmin, vmax, 50)
    ttcs = np.linspace(ttcmin, ttcmax, 50)

    if overwrite or not os.path.exists(filename):
        prob_collision = np.zeros((len(ttcs), len(speeds)))
        for row, ttc in enumerate(tqdm(ttcs)):
            for col, speed in enumerate(speeds):
                breakpoint_info = find_breakpoint(func_fn, speed=speed, ttcwarning=ttc)
                prob_collision[row, col] = breakpoint_info[1]
        with open(filename, "wb") as file:
            pickle.dump(prob_collision, file)
    else:
        # NOTE: pickle.load executes arbitrary code; only load cache files you created.
        with open(filename, "rb") as file:
            prob_collision = pickle.load(file)

    fig, _ = plt.subplots(1, 1)
    contours = plt.contourf(speeds, ttcs, prob_collision, cmap='hot', levels=np.linspace(0, 1, 6))
    colorbar = fig.colorbar(contours)
    plt.title("Probability of collision")
    plt.xlabel("Initial speed of ego vehicle [m/s]")
    plt.ylabel("TTC at which warning is given [s]")
    colorbar.ax.set_ylabel("Probability of collision with front vehicle")

    fig.savefig(os.path.join("figs", "{:s}.png".format(name)))

In [None]:
# False-negative collision-probability heatmap for speeds 5..30 and warning TTCs 1..4.
make_heatplot_fn(5, 30, 1, 4, overwrite=OVERWRITE)

#### Compute expected impact speed

In [None]:
def make_heatplot_fn_vimpact(vmin, vmax, ttcmin, ttcmax, overwrite=False):
    """Plot the mean impact speed with the front vehicle for a false negative.

    For every cell of a 50 x 50 (warning TTC, speed) grid, 20 reaction times are taken
    at evenly spaced quantiles of the reaction-time distribution restricted to values
    at or above the critical reaction time found by find_breakpoint. Runs without
    impact contribute 0. The raw results are cached as a pickle; the figure is saved
    under "figs".

    :param vmin: Smallest speed (integer, also used in the file name).
    :param vmax: Largest speed (integer, also used in the file name).
    :param ttcmin: Smallest warning TTC (integer, also used in the file name).
    :param ttcmax: Largest warning TTC (integer, also used in the file name).
    :param overwrite: Recompute the results even if a cached file exists.
    """
    name = "fn_vimpact_v{:d}_{:d}_ttc{:d}_{:d}".format(vmin, vmax, ttcmin, ttcmax)
    filename = os.path.join("data", "7_simulation_results", "{:s}.p".format(name))

    speeds = np.linspace(vmin, vmax, 50)
    ttcs = np.linspace(ttcmin, ttcmax, 50)
    n_simulations = 20

    if not overwrite and os.path.exists(filename):
        # NOTE: pickle.load executes arbitrary code; only load cache files you created.
        with open(filename, "rb") as file:
            vimpact = pickle.load(file)
    else:
        # Mid-points of n_simulations equally wide probability bins (loop invariant).
        cdf_q = np.linspace(.5/n_simulations, 1-.5/n_simulations, n_simulations)
        vimpact = np.zeros((len(ttcs), len(speeds), n_simulations))
        for i, ttc in enumerate(tqdm(ttcs)):
            for j, speed in enumerate(speeds):
                min_reaction_time, prob, _ = find_breakpoint(func_fn, speed=speed,
                                                             ttcwarning=ttc)
                # find_breakpoint never returns a negative x; "no collision for any
                # reaction time" is signalled by a zero probability instead. Without
                # this check the restricted CDF below collapses to a single point and
                # its normalisation divides zero by zero.
                if prob <= 0:
                    continue
                cdfx, cdfy = get_new_cdf(min_reaction_time)
                for k in range(n_simulations):
                    reaction_time = np.interp(cdf_q[k], cdfy, cdfx)
                    result = func_fn(reaction_time, speed, ttc)
                    if result < 0:  # Only use result if there is an impact.
                        vimpact[i, j, k] = -result
        with open(filename, "wb") as file:
            pickle.dump(vimpact, file)

    f, ax = plt.subplots(1, 1)
    heatmap = plt.contourf(speeds, ttcs, np.mean(vimpact, axis=2), cmap='hot')
    cbar = f.colorbar(heatmap)
    plt.title("Expected impact speed in case of collision")
    plt.xlabel("Initial speed of ego vehicle [m/s]")
    plt.ylabel("TTC at which warning is given [s]")
    cbar.ax.set_ylabel("Expected impact speed with collision [m/s]")

    f.savefig(os.path.join("figs", "{:s}.png".format(name)))

In [None]:
# False-negative impact-speed heatmap for speeds 5..30 and warning TTCs 1..4.
make_heatplot_fn_vimpact(5, 30, 1, 4, overwrite=OVERWRITE)

## Construct the KDEs

#### Lead vehicle braking

Three parameters:

- Initial speed ego vehicle (and lead vehicle);
- Mean deceleration of lead vehicle;
- Speed difference of lead vehicle.

In [None]:
# Load the lead-vehicle-braking KDE from its cache file, or fit it on the parameters
# [start speed, mean deceleration, speed difference] extracted from the scenarios.
filename = os.path.join("data", "6_kde", "scenario_lead_braking.p")
if os.path.exists(filename) and not OVERWRITE:
    kde_lead_braking = kde_from_file(filename)
else:
    pars = []
    for key in leaddec.collections["scenario"]:
        scenario = leaddec.get_item("scenario", key)
        
        # Zero defaults: a scenario without a "deceleration target" activity fails the
        # positivity filter below and is skipped.
        vstart, vdiff, amean = 0, 0, 0
        for activity in scenario.activities:
            if activity.name == "deceleration target":
                # Presumably the speed at the start and at the end of the activity --
                # TODO confirm what get_state(...)[0] holds.
                vstart, vend = activity.get_state(time=[activity.get_tstart(), 
                                                        activity.get_tend()])[0]
                vdiff = vstart-vend
                amean = vdiff/(activity.get_tend()-activity.get_tstart())
                break  # Only the first matching activity is used.

        # Keep only scenarios with strictly positive speed, speed drop and deceleration.
        if vstart > 0 and vdiff > 0 and amean > 0:
            pars.append([vstart, amean, vdiff])

    kde_lead_braking = KDE(np.array(pars))
    kde_lead_braking.compute_bandwidth()
    kde_lead_braking.pickle(filename)

#### Approaching slower vehicle

Two parameters:

- Initial speed ego vehicle;
- Ratio of the target vehicle speed over the ego vehicle speed, in [0, 1).

In [None]:
# Load the approaching-slower-vehicle KDE from its cache file, or fit it on the
# parameters [ego speed, target speed / ego speed] extracted from the scenarios.
filename = os.path.join("data", "6_kde", "scenario_approaching.p")
if os.path.exists(filename) and not OVERWRITE:
    kde_approaching = kde_from_file(filename)
else:
    pars = []
    for key in approaching.collections["scenario"]:
        scenario = approaching.get_item("scenario", key)
        # Both states are read at the start time of the scenario.
        vego = scenario.get_state(scenario.get_actor_by_name("ego vehicle"), StateVariable.SPEED,
                                  scenario.get_tstart())
        # Presumably element [0] of the LON_TARGET state is the target speed -- TODO confirm.
        vtarget = scenario.get_state(scenario.get_actor_by_name("target vehicle"),
                                     StateVariable.LON_TARGET, scenario.get_tstart())[0]
        # Only scenarios in which the ego vehicle is faster are kept, so the ratio is below 1.
        if vego > vtarget:
            pars.append([vego, vtarget/vego])

    kde_approaching = KDE(np.array(pars))
    kde_approaching.compute_bandwidth()
    kde_approaching.pickle(filename)

#### Cut-in

Three parameters

- Distance at cut-in
- Speed lead vehicle
- Speed ego vehicle

In [None]:
# Load the cut-in KDE from its cache file, or fit it on the parameters
# [distance, target speed, ego speed] extracted from the scenarios.
filename = os.path.join("data", "6_kde", "scenario_cutin.p")
if os.path.exists(filename) and not OVERWRITE:
    kde_cutin = kde_from_file(filename)
else:
    pars = []
    for key in cutins.collections["scenario"]:
        scenario = cutins.get_item("scenario", key)
        # States are read at the midpoint of the scenario's time span (presumably the
        # moment of the cut-in -- TODO confirm).
        t_center = (scenario.get_tstart() + scenario.get_tend())/2
        vtarget, distance = scenario.get_state(scenario.get_actor_by_name("target vehicle"), 
                                               StateVariable.LON_TARGET, t_center)
        vego = scenario.get_state(scenario.get_actor_by_name("ego vehicle"), StateVariable.SPEED,
                                  t_center)
        # Only scenarios with a strictly positive distance are kept.
        if distance > 0:
            pars.append([distance, vtarget, vego])

    kde_cutin = KDE(np.array(pars))
    kde_cutin.compute_bandwidth()
    kde_cutin.pickle(filename)

## Risk in 'normal' situation

To calculate the risk, we need to apply importance sampling!

In [None]:
# Case study options list
# Bundles everything one Monte Carlo / importance sampling case study needs.
CaseStudy = NamedTuple("CaseStudy", [("name", str),  # Used in the cache file names.
                                     ("n", int),  # Number of accepted parameter samples.
                                     ("parameters", List[str]),  # Names of the sampled parameters.
                                     ("default_parameters", dict),  # Fixed inputs merged into every run.
                                     ("percentile", int),  # Percentage of lowest-"result" runs used for the IS density.
                                     # Object whose .simulation(pars) is called; the cells
                                     # below also pass simulators of other classes.
                                     ("simulator", SimulationString),
                                     ("kde", KDE),  # Original parameter density (sampled and scored).
                                     ("func_validity_check", Callable),  # pars -> bool, rejects samples.
                                     ("func_get_result", Callable),  # (options, par_dict) -> result.
                                     ("func_process_result", Callable)])  # result -> kpi.

In [None]:
def generate_parms(options, par_func):
    """Draw `options.n` valid parameter vectors by rejection sampling.

    The global NumPy random state is reset to seed 0 first. Each sample is taken from
    par_func()[0] and accepted when options.func_validity_check returns a true value.

    :param options: Object with attributes n, parameters and func_validity_check.
    :param par_func: Callable without arguments; element [0] of its result is one sample.
    :return: (df, pars) where `pars` is the (n, number of parameters) array of accepted
        samples and `df` holds the same values plus a "tries" column with the number
        of draws needed for each accepted sample.
    """
    np.random.seed(0)
    n_samples = options.n
    pars = np.zeros((n_samples, len(options.parameters)))
    tries = np.zeros(n_samples)
    for row in range(n_samples):
        # Keep overwriting this row until the drawn sample is valid.
        while True:
            pars[row, :] = par_func()[0]
            tries[row] += 1
            if options.func_validity_check(pars[row, :]):
                break
    df = pd.DataFrame(data=pars, columns=options.parameters)
    df["tries"] = tries
    return df, pars

In [None]:
def do_simulations(df, pars, options):
    """Simulate every parameter vector and store the outcome in `df` (in place).

    Adds the columns "result" (output of options.func_get_result) and "kpi" (output of
    options.func_process_result applied to the stored result). Entries of
    options.default_parameters override sampled parameters with the same name.

    :param df: DataFrame that receives the two new columns.
    :param pars: Iterable of parameter vectors, ordered like options.parameters.
    :param options: Case study options.
    """
    n_runs = options.n
    result = np.zeros(n_runs)
    kpi = np.zeros(n_runs)
    for run, values in enumerate(tqdm(pars)):
        run_pars = dict(zip(options.parameters, values))
        run_pars.update(options.default_parameters)
        result[run] = options.func_get_result(options, run_pars)
        # The KPI is computed from the value as stored in the float array.
        kpi[run] = options.func_process_result(result[run])
    df["result"] = result
    df["kpi"] = kpi

In [None]:
def monte_carlo(options, overwrite=False):
    """Run (or load from cache) the plain Monte Carlo simulations of a case study.

    Parameters are sampled from options.kde. Results are cached as
    data/7_simulation_results/<name>_mc.csv.

    :param options: Case study options.
    :param overwrite: Recompute even if the cached CSV exists.
    :return: DataFrame with the parameters, "tries", "result" and "kpi".
    """
    filename_df = os.path.join("data", "7_simulation_results",
                               "{:s}_mc.csv".format(options.name))
    cached = os.path.exists(filename_df) and not overwrite
    if cached:
        return pd.read_csv(filename_df, index_col=0)

    df, pars = generate_parms(options, options.kde.sample)
    do_simulations(df, pars, options)
    df.to_csv(filename_df)
    return df

In [None]:
def mc_result(df_mc):
    """Print the Monte Carlo estimate of the collision probability and its standard error.

    The estimate is the mean of the "kpi" column; the uncertainty is
    sqrt(sum of squared deviations) / number of simulations.

    NOTE(review): the values are printed with a "%" sign but are not multiplied by
    100 -- confirm whether the label or the scaling is intended.

    :param df_mc: DataFrame with a "kpi" column.
    """
    kpi = df_mc["kpi"]
    n_sim = len(df_mc)
    prob = np.mean(kpi)
    sigma = np.sqrt(np.sum((kpi - prob)**2)) / n_sim
    report = ("Monte Carlo:         Probability of collision: {:.3e} %".format(prob)
              + " +/- {:.3e} %".format(sigma)
              + " ({:d} simulations)".format(n_sim))
    print(report)

In [None]:
def create_is(df_mc, options, overwrite=False):
    """Build (or load from cache) the importance sampling density of a case study.

    The density is a KDE fitted on the parameters of the Monte Carlo runs with the
    lowest "result" values; options.n * options.percentile // 100 runs are used.
    The KDE is cached as data/6_kde/<name>_is.p.

    :param df_mc: Monte Carlo results with a "result" column and the parameter columns.
    :param options: Case study options.
    :param overwrite: Refit even if the cached KDE exists.
    :return: The importance sampling KDE.
    """
    filename = os.path.join("data", "6_kde", "{:s}_is.p".format(options.name))
    if os.path.exists(filename) and not overwrite:
        return kde_from_file(filename)

    n_selected = options.n*options.percentile//100
    most_critical = df_mc.sort_values("result").iloc[:n_selected]
    kde = KDE(most_critical[options.parameters].values)
    kde.compute_bandwidth()
    kde.pickle(filename)
    return kde

In [None]:
def importance_sampling(kde, options, overwrite=False):
    """Run (or load from cache) the importance sampling simulations of a case study.

    Parameters are sampled from `kde`; for every sample the density under the original
    distribution (options.kde) and under `kde` is stored so that the results can be
    re-weighted. Results are cached as data/7_simulation_results/<name>_is.csv.

    :param kde: Importance sampling density.
    :param options: Case study options.
    :param overwrite: Recompute even if the cached CSV exists.
    :return: DataFrame with the parameters, "tries", "density_orig", "density_is",
        "result" and "kpi".
    """
    filename_df = os.path.join("data", "7_simulation_results",
                               "{:s}_is.csv".format(options.name))
    cached = os.path.exists(filename_df) and not overwrite
    if cached:
        return pd.read_csv(filename_df, index_col=0)

    # Sample the parameters and record both densities before simulating.
    df, pars = generate_parms(options, kde.sample)
    original_density = options.kde.score_samples(pars)
    df["density_orig"] = original_density
    sampling_density = kde.score_samples(pars)
    df["density_is"] = sampling_density
    do_simulations(df, pars, options)

    # Persist the results.
    df.to_csv(filename_df)
    return df

In [None]:
def is_result(df_is, df_mc):
    """Print the importance sampling estimate of the collision probability.

    Every KPI is weighted with the density ratio original / importance, then scaled by
    the ratio of the mean number of tries of the Monte Carlo run over that of the
    importance sampling run (this accounts for the rejected samples in both runs).

    NOTE(review): the values are printed with a "%" sign but are not multiplied by
    100 -- confirm whether the label or the scaling is intended.

    :param df_is: Importance sampling results ("kpi", "density_orig", "density_is", "tries").
    :param df_mc: Monte Carlo results (only "tries" is used).
    """
    weighted = df_is["kpi"] * df_is["density_orig"] / df_is["density_is"]
    mean_tries_mc = np.sum(df_mc["tries"]) / len(df_mc)
    mean_tries_is = np.sum(df_is["tries"]) / len(df_is)
    values = weighted * mean_tries_mc
    values = values / mean_tries_is
    n_sim = len(values)
    prob = np.mean(values)
    sigma = np.sqrt(np.sum((values - prob)**2)) / n_sim
    report = ("Importance sampling: Probability of collision: {:.3e} %".format(prob)
              + " +/- {:.3e} %".format(sigma)
              + " ({:d} simulations)".format(n_sim))
    print(report)

In [None]:
def case_study(options, overwrite=False):
    """Run a complete case study: Monte Carlo first, then importance sampling.

    Prints both probability estimates along the way.

    :param options: Case study options.
    :param overwrite: Passed on to every caching step.
    :return: (Monte Carlo results, importance sampling results) as DataFrames.
    """
    mc_frame = monte_carlo(options, overwrite=overwrite)
    mc_result(mc_frame)
    sampling_kde = create_is(mc_frame, options, overwrite=overwrite)
    is_frame = importance_sampling(sampling_kde, options, overwrite=overwrite)
    is_result(is_frame, mc_frame)
    return mc_frame, is_frame

#### Lead vehicle braking

In [None]:
def get_kpi_idm(result):
    """KPI for models with a reaction time: 1 - result, but never below zero."""
    complement = 1 - result
    return 0 if 0 > complement else complement
def get_kpi_acc(result):
    """KPI for deterministic models: 1 when the result is negative, else 0."""
    return 1 if result < 0 else 0
def check_lb_pars(pars):
    """Validate a lead-braking sample [v0, amean, dv].

    Valid samples have 0 < dv <= v0 (so the end speed is not negative) and amean > 0.
    """
    speed_drop_invalid = pars[2] <= 0 or pars[2] > pars[0]
    if speed_drop_invalid or pars[1] <= 0:
        return False
    return True
def func_for_breakpoint(reactiontime, options, **kwargs):
    """Adapter for find_breakpoint: simulate with the given reaction time.

    :param reactiontime: Reaction time to simulate with.
    :param options: Object whose `simulator.simulation` is called.
    :param kwargs: Remaining simulation parameters.
    :return: The simulation result.
    """
    pars = {"reactiontime": reactiontime}
    pars.update(kwargs)
    return options.simulator.simulation(pars)
def func_idm(options, par_dict):
    """Result function for models with a random reaction time.

    Searches for the critical reaction time with find_breakpoint. When the collision
    probability is positive, 1 - probability is returned, so that get_kpi_idm recovers
    the probability. Otherwise 1 + (last simulation result) is returned, which keeps
    collision-free runs ordered by their simulation result.
    """
    _, prob, result = find_breakpoint(func_for_breakpoint, options=options, **par_dict)
    return 1 - prob if prob > 0 else 1 + result
def func_acc(options, par_dict):
    """Result function for deterministic models: the plain simulation result."""
    simulator = options.simulator
    return simulator.simulation(par_dict)
    
# One lead-braking simulator per follower model: IDM+, ACC and ACCHDM.
s_lb_idm = SimulationLeadBraking(follower=IDMPlus(), follower_parameters=idm_lead_braking_pars)
s_lb_acc = SimulationLeadBraking(follower=ACC(), follower_parameters=acc_lead_braking_pars)
s_lb_accidm = SimulationLeadBraking(follower=ACCHDM(), follower_parameters=acc_hdm_lead_braking_pars)

In [None]:
# Options shared by all lead-braking case studies. NOTE: the variable
# `default_parameters` also contains a key of the same name (the fixed simulation inputs).
default_parameters = dict(n=2000,
                          default_parameters=dict(amin=-6),
                          parameters=["v0", "amean", "dv"],
                          percentile=10,
                          kde=kde_lead_braking,
                          func_validity_check=check_lb_pars)
# Lead braking with the IDM+ follower, evaluated over the random reaction time.
parameters = dict(name="lead_braking_idm",
                  simulator=s_lb_idm,
                  func_get_result=func_idm,
                  func_process_result=get_kpi_idm)
parameters.update(default_parameters)
df_mc, df_is = case_study(CaseStudy(**parameters), overwrite=OVERWRITE)

In [None]:
# Lead braking with the ACC follower; KPI is 1 for a negative result, else 0.
# The returned frames are plotted in the next cell.
parameters = dict(name="lead_braking_acc",
                  simulator=s_lb_acc,
                  func_get_result=func_acc,
                  func_process_result=get_kpi_acc)
parameters.update(default_parameters)
df_mc, df_is = case_study(CaseStudy(**parameters), overwrite=OVERWRITE)

In [None]:
# Compare where plain Monte Carlo and importance sampling place their samples in the
# (speed difference, initial speed) plane.
plt.plot(df_mc["dv"], df_mc["v0"], '.', label="Normal sampling")
plt.plot(df_is["dv"], df_is["v0"], '.', label="Importance sampling")
plt.xlabel("Speed difference [m/s]")
plt.ylabel("Initial speed [m/s]")
plt.legend()

In [None]:
# Lead braking with the ACCHDM follower, evaluated over the random reaction time.
parameters = dict(name="lead_braking_accidm",
                  simulator=s_lb_accidm,
                  func_get_result=func_idm,
                  func_process_result=get_kpi_idm)
parameters.update(default_parameters)
_ = case_study(CaseStudy(**parameters), overwrite=OVERWRITE)

#### Approaching slower vehicle

In [None]:
def check_as_pars(pars):
    """Validate an approaching sample [vego, ratio_vtar_vego].

    Valid samples have vego > 0 and 0 <= ratio_vtar_vego < 1.
    """
    ratio_invalid = pars[1] < 0 or pars[1] >= 1
    return not (pars[0] <= 0 or ratio_invalid)
    
# One single-follower approaching simulator per follower model: IDM+, ACC and ACCHDM.
s_as_idm = SimulationApproaching([IDMPlus()], [idm_approaching_pars], min_simulation_time=5)
s_as_acc = SimulationApproaching([ACC()], [acc_approaching_pars], min_simulation_time=5)
s_as_accidm = SimulationApproaching([ACCHDM()], [acc_idm_approaching_pars], min_simulation_time=5)

In [None]:
# Options shared by all approaching-slower-vehicle case studies (rebinds the
# `default_parameters` variable used for the lead-braking studies).
default_parameters = dict(n=2000,
                          default_parameters=dict(amin=-6),
                          parameters=["vego", "ratio_vtar_vego"],
                          percentile=10,
                          kde=kde_approaching,
                          func_validity_check=check_as_pars)
# Approaching with the IDM+ follower, evaluated over the random reaction time.
parameters = dict(name="approaching_slower_idm",
                  simulator=s_as_idm,
                  func_get_result=func_idm,
                  func_process_result=get_kpi_idm)
parameters.update(default_parameters)
_ = case_study(CaseStudy(**parameters), overwrite=OVERWRITE)

In [None]:
# Approaching with the ACC follower; KPI is 1 for a negative result, else 0.
parameters = dict(name="approaching_slower_acc",
                  simulator=s_as_acc,
                  func_get_result=func_acc,
                  func_process_result=get_kpi_acc)
parameters.update(default_parameters)
_ = case_study(CaseStudy(**parameters), overwrite=OVERWRITE)

In [None]:
# Approaching with the ACCHDM follower, evaluated over the random reaction time.
parameters = dict(name="approaching_slower_accidm",
                  simulator=s_as_accidm,
                  func_get_result=func_idm,
                  func_process_result=get_kpi_idm)
parameters.update(default_parameters)
df_mc, df_is = case_study(CaseStudy(**parameters), overwrite=OVERWRITE)

#### Cut-in

In [None]:
def check_ci_pars(pars):
    """Validate a cut-in sample [dinit, vlead, vego].

    Valid samples have dinit > 0, vlead >= 0 and vego > 0.
    """
    return not (pars[0] <= 0 or pars[1] < 0 or pars[2] <= 0)
    
# One cut-in simulator per follower model: IDM+, ACC and ACCHDM.
s_ci_idm = SimulationCutIn(IDMPlus(), idm_cutin_pars, min_simulation_time=5)
s_ci_acc = SimulationCutIn(ACC(), acc_cutin_pars, min_simulation_time=5)
s_ci_accidm = SimulationCutIn(ACCHDM(), acc_idm_cutin_pars, min_simulation_time=5)

In [None]:
# Options shared by all cut-in case studies (rebinds the `default_parameters`
# variable used for the previous studies).
default_parameters = dict(n=2000,
                          default_parameters=dict(amin=-6),
                          parameters=["dinit", "vlead", "vego"],
                          percentile=10,
                          kde=kde_cutin,
                          func_validity_check=check_ci_pars)
# Cut-in with the IDM+ follower, evaluated over the random reaction time.
parameters = dict(name="cutin_idm",
                  simulator=s_ci_idm,
                  func_get_result=func_idm,
                  func_process_result=get_kpi_idm)
parameters.update(default_parameters)
_ = case_study(CaseStudy(**parameters), overwrite=OVERWRITE)

In [None]:
# Cut-in with the ACC follower; KPI is 1 for a negative result, else 0.
# The returned frames are plotted in the next cell.
parameters = dict(name="cutin_acc",
                  simulator=s_ci_acc,
                  func_get_result=func_acc,
                  func_process_result=get_kpi_acc)
parameters.update(default_parameters)
df_mc, df_is = case_study(CaseStudy(**parameters), overwrite=OVERWRITE)

In [None]:
# Compare where plain Monte Carlo and importance sampling place their samples in the
# (cut-in distance, ego speed minus lead speed) plane. Each series must use its own
# frame for both speeds; mixing frames pairs unrelated samples by index.
plt.plot(df_mc["dinit"], df_mc["vego"]-df_mc["vlead"], '.', label="Normal sampling")
plt.plot(df_is["dinit"], df_is["vego"]-df_is["vlead"], '.', label="Importance sampling")
plt.xlabel("Distance at cut-in [m]")
plt.ylabel("Speed ego vehicle minus speed lead vehicle [m/s]")
plt.legend()

In [None]:
# Cut-in with the ACCHDM follower, evaluated over the random reaction time.
# NOTE(review): the name "cutin_slower_accidm" does not follow the "cutin_<model>"
# pattern of the other cut-in studies; it determines the cache file names, so confirm
# before renaming.
parameters = dict(name="cutin_slower_accidm",
                  simulator=s_ci_accidm,
                  func_get_result=func_idm,
                  func_process_result=get_kpi_idm)
parameters.update(default_parameters)
df_mc, df_is = case_study(CaseStudy(**parameters), overwrite=OVERWRITE)

In [None]:
# NOTE(review): unfinished stub -- it only binds a local variable and returns None;
# nothing in the visible part of the notebook calls it.
def risk(overwrite=False):
    nmc = 1000

In [None]:
def risk_lead_braking(follower, follower_parms, name_follower, amin=-3, overwrite=False):
    """Monte Carlo estimate of the collision risk in lead-braking scenarios.

    NOTE(review): the body matches risk_lowmu_lead_braking defined later in this
    notebook, including the "risk_lowmu_leadbraking_..." cache file name, so both
    functions share the same cache files.

    The global NumPy random state is reset to seed 0 on every call.

    :param follower: Follower model instance.
    :param follower_parms: Parameter function/set for the follower.
    :param name_follower: Follower name used in the cache file name.
    :param amin: Value passed to the simulation as `amin`.
    :param overwrite: Recompute even if the cached file exists.
    :return: Array with one simulation result per sample; negative means collision.
    """
    n_samples = 10000
    np.random.seed(0)

    name = "risk_lowmu_leadbraking_{:s}_amin{:.0f}".format(name_follower, int(-amin))
    filename = os.path.join("data", "7_simulation_results", "{:s}.p".format(name))
    if not os.path.exists(filename) or overwrite:
        simulator = SimulationLeadBraking(follower=follower, follower_parameters=follower_parms)
        result = np.zeros(n_samples)
        for run in tqdm(range(n_samples)):
            # Rejection sampling: draw until the sample is valid.
            while True:
                pars = kde_lead_braking.sample()[0]
                if check_validity_lead_braking(pars):
                    break
            scenario_pars = dict(v0=pars[0], amean=pars[1], dv=pars[2], amin=amin)
            result[run] = simulator.simulation(scenario_pars)

        with open(filename, "wb") as file:
            pickle.dump(result, file)
    else:
        # NOTE: pickle.load executes arbitrary code; only load cache files you created.
        with open(filename, "rb") as file:
            result = pickle.load(file)

    print("Probability of collision: {:.4f}".format(np.mean(result < 0)))
    return result

## Risks with low-$\mu$

#### Lead vehicle decelerating

In [None]:
def risk_lowmu_lead_braking(follower, follower_parms, name_follower, amin=-3, overwrite=False):
    """Monte Carlo estimate of the collision risk in lead-braking scenarios with limited braking.

    Draws 10000 valid samples from kde_lead_braking and simulates each one with the
    given `amin`. Results are cached as a pickle whose name contains the follower name
    and int(-amin). The global NumPy random state is reset to seed 0 on every call.

    :param follower: Follower model instance.
    :param follower_parms: Parameter function/set for the follower.
    :param name_follower: Follower name used in the cache file name.
    :param amin: Value passed to the simulation as `amin`.
    :param overwrite: Recompute even if the cached file exists.
    :return: Array with one simulation result per sample; negative means collision.
    """
    n_samples = 10000
    np.random.seed(0)

    name = "risk_lowmu_leadbraking_{:s}_amin{:.0f}".format(name_follower, int(-amin))
    filename = os.path.join("data", "7_simulation_results", "{:s}.p".format(name))
    if not os.path.exists(filename) or overwrite:
        simulator = SimulationLeadBraking(follower=follower, follower_parameters=follower_parms)
        result = np.zeros(n_samples)
        for run in tqdm(range(n_samples)):
            # Rejection sampling: draw until the sample is valid.
            while True:
                pars = kde_lead_braking.sample()[0]
                if check_validity_lead_braking(pars):
                    break
            scenario_pars = dict(v0=pars[0], amean=pars[1], dv=pars[2], amin=amin)
            result[run] = simulator.simulation(scenario_pars)

        with open(filename, "wb") as file:
            pickle.dump(result, file)
    else:
        # NOTE: pickle.load executes arbitrary code; only load cache files you created.
        with open(filename, "rb") as file:
            result = pickle.load(file)

    print("Probability of collision: {:.4f}".format(np.mean(result < 0)))
    return result

In [None]:
# Low-mu lead-braking risk for the IDM+ follower (default amin=-3).
r_idm = risk_lowmu_lead_braking(IDMPlus(), idm_lead_braking_pars, "idmplus")

In [None]:
# Low-mu lead-braking risk for the ACC follower (default amin=-3).
r_acc = risk_lowmu_lead_braking(ACC(), acc_lead_braking_pars, "acc")

In [None]:
# Low-mu lead-braking risk for the ACCHDM follower (default amin=-3).
r_accidm = risk_lowmu_lead_braking(ACCHDM(), acc_hdm_lead_braking_pars, "accidm")

#### Approaching slower vehicle

In [None]:
def risk_lowmu_approaching(follower, follower_parms, name_follower, amin=-3, overwrite=False):
    """Monte Carlo estimate of the collision risk when approaching a slower vehicle with limited braking.

    Draws 10000 valid samples from kde_approaching and simulates each one with the
    given `amin`. Results are cached as a pickle whose name contains the follower name
    and int(-amin). The global NumPy random state is reset to seed 0 on every call.

    :param follower: Follower model instance.
    :param follower_parms: Parameter function/set for the follower.
    :param name_follower: Follower name used in the cache file name.
    :param amin: Value passed to the simulation as `amin`.
    :param overwrite: Recompute even if the cached file exists.
    :return: Array with one simulation result per sample; negative means collision.
    """
    n_samples = 10000
    np.random.seed(0)

    name = "risk_lowmu_approaching_{:s}_amin{:.0f}".format(name_follower, int(-amin))
    filename = os.path.join("data", "7_simulation_results", "{:s}.p".format(name))
    if not os.path.exists(filename) or overwrite:
        simulator = SimulationApproaching([follower], [follower_parms], min_simulation_time=5)
        result = np.zeros(n_samples)
        for run in tqdm(range(n_samples)):
            # Rejection sampling: draw until the sample is valid.
            while True:
                pars = kde_approaching.sample()[0]
                if check_validity_approaching(pars):
                    break
            scenario_pars = dict(vego=pars[0], ratio_vtar_vego=pars[1], amin=amin)
            result[run] = simulator.simulation(scenario_pars)

        with open(filename, "wb") as file:
            pickle.dump(result, file)
    else:
        # NOTE: pickle.load executes arbitrary code; only load cache files you created.
        with open(filename, "rb") as file:
            result = pickle.load(file)

    print("Probability of collision: {:.4f}".format(np.mean(result < 0)))
    return result

In [None]:
# Low-mu approaching risk for the IDM+ follower (default amin=-3); rebinds `r_idm`.
r_idm = risk_lowmu_approaching(IDMPlus(), idm_approaching_pars, "idmplus")

In [None]:
# Low-mu approaching risk for the ACC follower (default amin=-3); rebinds `r_acc`.
r_acc = risk_lowmu_approaching(ACC(), acc_approaching_pars, "acc")

In [None]:
# Low-mu approaching risk for the ACCHDM follower (default amin=-3); rebinds `r_accidm`.
r_accidm = risk_lowmu_approaching(ACCHDM(), acc_idm_approaching_pars, "accidm")

#### Cut-in

In [None]:
def check_validity_cutin(pars):
    """Validate a cut-in sample [distance, target speed, ego speed].

    Both speeds and the initial distance must be strictly positive.
    """
    speeds_invalid = pars[1] <= 0 or pars[2] <= 0
    return not (speeds_invalid or pars[0] <= 0)

In [None]:
def risk_lowmu_cutin(follower, follower_parms, name_follower, amin=-3, overwrite=False):
    """Estimate the collision risk in the cut-in scenario for a given `amin`.

    Runs 10000 Monte Carlo iterations. Each draws a parameter vector from the
    notebook-level `kde_cutin` (redrawing until `check_validity_cutin` accepts
    it). The cut-in is simulated with a single-follower `SimulationApproaching`
    that starts at the sampled initial distance. The per-run values are pickled
    under ``data/7_simulation_results`` and reused by later calls.

    Parameters
    ----------
    follower
        Driver-model instance to simulate (the notebook passes e.g. `IDMPlus()`).
    follower_parms
        Parameter object handed to `SimulationApproaching` together with `follower`.
    name_follower : str
        Tag that becomes part of the cache file name.
    amin : number, optional
        Forwarded as `amin` to every simulation run. Only `int(-amin)` enters the
        cache file name, so values that truncate to the same integer share one
        cache file.
    overwrite : bool, optional
        If true, simulate again even when a cache file already exists.

    Returns
    -------
    numpy.ndarray
        One value per run (or whatever the cache file holds); values below zero
        are counted as collisions in the printed probability.
    """
    n_samples = 10000
    # The global NumPy RNG is reseeded on every call, also when the cache is used.
    np.random.seed(0)

    cache_name = "risk_lowmu_cutin_{:s}_amin{:.0f}".format(name_follower, int(-amin))
    cache_path = os.path.join("data", "7_simulation_results", "{:s}.p".format(cache_name))

    if overwrite or not os.path.exists(cache_path):
        simulator = SimulationApproaching([follower], [follower_parms], min_simulation_time=5)
        result = np.zeros(n_samples)
        for idx in tqdm(range(n_samples)):
            # Rejection sampling: keep drawing until the sample passes the validity check.
            while True:
                sample = kde_cutin.sample()[0]
                if check_validity_cutin(sample):
                    break

            if sample[1] >= sample[2]:
                # Target is at least as fast as ego: no simulation, the initial
                # distance itself is stored as the outcome.
                result[idx] = sample[0]
            else:
                scenario = {
                    "vego": sample[2],
                    "ratio_vtar_vego": sample[1] / sample[2],
                    "amin": amin,
                    "init_position": sample[0],
                }
                result[idx] = simulator.simulation(scenario)

        with open(cache_path, "wb") as handle:
            pickle.dump(result, handle)
    else:
        # Cache files are produced by this notebook; do not point this at untrusted pickles.
        with open(cache_path, "rb") as handle:
            result = pickle.load(handle)

    collision_probability = np.mean(result < 0)
    print("Probability of collision: {:.4f}".format(collision_probability))
    return result

In [None]:
# Cut-in risk for the IDMPlus follower. The *approaching* parameter set is passed because
# risk_lowmu_cutin runs a SimulationApproaching.
# NOTE(review): idm_cutin_pars and SimulationCutIn are imported at the top of the notebook
# but not used here -- confirm which pairing is intended.
r_idm = risk_lowmu_cutin(
    follower=IDMPlus(),
    follower_parms=idm_approaching_pars,
    name_follower="idmplus",
)

In [None]:
# Cut-in risk for the ACC follower, again with the approaching parameter set
# (risk_lowmu_cutin runs a SimulationApproaching; acc_cutin_pars is imported but unused here).
r_acc = risk_lowmu_cutin(
    follower=ACC(),
    follower_parms=acc_approaching_pars,
    name_follower="acc",
)

In [None]:
# Cut-in risk for the ACCHDM follower, cached under the "accidm" tag
# (approaching parameter set; acc_idm_cutin_pars is imported but unused here).
r_accidm = risk_lowmu_cutin(
    follower=ACCHDM(),
    follower_parms=acc_idm_approaching_pars,
    name_follower="accidm",
)

## Risk of late notice

#### Lead vehicle decelerating

In [None]:
def risk_latenotice_lead_braking(follower, follower_parms, name_follower, distance_notice=60,
                                 overwrite=False):
    """Estimate the collision risk in the lead-braking scenario with limited notice distance.

    Runs 10000 Monte Carlo simulations. Each run draws a parameter vector from
    the notebook-level `kde_lead_braking` (redrawing until
    `check_validity_lead_braking` accepts it) and simulates it with
    `SimulationLeadBraking` using a fixed `amin` of -6. The notice distance is
    passed to the simulation under a key that depends on `name_follower`.
    Results are pickled under ``data/7_simulation_results`` and reused by later
    calls.

    Parameters
    ----------
    follower
        Driver-model instance to simulate (the notebook passes e.g. `IDMPlus()`).
    follower_parms
        Parameter object handed to `SimulationLeadBraking` as `follower_parameters`.
    name_follower : str
        Tag used in the cache file name. It also selects how the limit is applied:
        "idmplus" and "accidm" receive it as `max_view`, "acc" and "accidm" as
        `sensor_range`; any other tag runs without a limit.
    distance_notice : number, optional
        Distance limit forwarded to the simulation. Only `int(distance_notice)`
        enters the cache file name.
    overwrite : bool, optional
        If true, simulate again even when a cache file already exists.

    Returns
    -------
    numpy.ndarray
        One value per run (or whatever the cache file holds); values below zero
        are counted as collisions in the printed probability.
    """
    n_samples = 10000
    # The global NumPy RNG is reseeded on every call, also when the cache is used.
    np.random.seed(0)

    cache_name = "risk_latenotice_leadbraking_{:s}_d{:d}".format(name_follower, int(distance_notice))
    cache_path = os.path.join("data", "7_simulation_results", "{:s}.p".format(cache_name))

    if overwrite or not os.path.exists(cache_path):
        simulator = SimulationLeadBraking(follower=follower, follower_parameters=follower_parms)
        result = np.zeros(n_samples)
        for idx in tqdm(range(n_samples)):
            # Rejection sampling: keep drawing until the sample passes the validity check.
            while True:
                sample = kde_lead_braking.sample()[0]
                if check_validity_lead_braking(sample):
                    break

            scenario = {"v0": sample[0], "amean": sample[1], "dv": sample[2], "amin": -6}
            # The tag decides under which key(s) the notice distance is handed over.
            if name_follower in ("idmplus", "accidm"):
                scenario["max_view"] = distance_notice
            if name_follower in ("acc", "accidm"):
                scenario["sensor_range"] = distance_notice
            result[idx] = simulator.simulation(scenario)

        with open(cache_path, "wb") as handle:
            pickle.dump(result, handle)
    else:
        # Cache files are produced by this notebook; do not point this at untrusted pickles.
        with open(cache_path, "rb") as handle:
            result = pickle.load(handle)

    collision_probability = np.mean(result < 0)
    print("Probability of collision: {:.4f}".format(collision_probability))
    return result

In [None]:
# Late-notice lead-braking risk for the IDMPlus follower (default notice distance);
# reassigns r_idm.
r_idm = risk_latenotice_lead_braking(
    follower=IDMPlus(),
    follower_parms=idm_lead_braking_pars,
    name_follower="idmplus",
)

In [None]:
# Late-notice lead-braking risk for the ACC follower (default notice distance);
# reassigns r_acc.
r_acc = risk_latenotice_lead_braking(
    follower=ACC(),
    follower_parms=acc_lead_braking_pars,
    name_follower="acc",
)

In [None]:
# Late-notice lead-braking risk for the ACCHDM follower; the "accidm" tag makes the
# function set both max_view and sensor_range.
r_accidm = risk_latenotice_lead_braking(
    follower=ACCHDM(),
    follower_parms=acc_hdm_lead_braking_pars,
    name_follower="accidm",
)

#### Approaching slower vehicle

In [None]:
def risk_latenotice_approaching(follower, follower_parms, name_follower, distance_notice=60,
                                overwrite=False):
    """Estimate the collision risk in the approaching scenario with limited notice distance.

    Runs 10000 Monte Carlo simulations. Each run draws a parameter vector from
    the notebook-level `kde_approaching` (redrawing until
    `check_validity_approaching` accepts it) and simulates it with a
    single-follower `SimulationApproaching` using a fixed `amin` of -6. The
    notice distance is passed to the simulation under a key that depends on
    `name_follower`. Results are pickled under ``data/7_simulation_results`` and
    reused by later calls.

    Parameters
    ----------
    follower
        Driver-model instance to simulate (the notebook passes e.g. `IDMPlus()`).
    follower_parms
        Parameter object handed to `SimulationApproaching` together with `follower`.
    name_follower : str
        Tag used in the cache file name. It also selects how the limit is applied:
        "idmplus" and "accidm" receive it as `max_view`, "acc" and "accidm" as
        `sensor_range`; any other tag runs without a limit.
    distance_notice : number, optional
        Distance limit forwarded to the simulation; defaults to 60 like
        `risk_latenotice_lead_braking`. Only `int(distance_notice)` enters the
        cache file name.
    overwrite : bool, optional
        If true, simulate again even when a cache file already exists.

    Returns
    -------
    numpy.ndarray
        One value per run (or whatever the cache file holds); values below zero
        are counted as collisions in the printed probability.
    """
    nmc = 10000
    # The global NumPy RNG is reseeded on every call, also when the cache is used.
    np.random.seed(0)

    name = "risk_latenotice_approaching_{:s}_d{:.0f}".format(name_follower, int(distance_notice))
    filename = os.path.join("data", "7_simulation_results", "{:s}.p".format(name))
    if os.path.exists(filename) and not overwrite:
        # Cache files are produced by this notebook; do not point this at untrusted pickles.
        with open(filename, "rb") as file:
            result = pickle.load(file)
    else:
        simulator = SimulationApproaching([follower], [follower_parms], min_simulation_time=5)
        result = np.zeros(nmc)
        for i in tqdm(range(nmc)):
            # Rejection sampling: redraw until the sample passes the validity check.
            pars = kde_approaching.sample()[0]
            while not check_validity_approaching(pars):
                pars = kde_approaching.sample()[0]
            parms = dict(vego=pars[0], ratio_vtar_vego=pars[1], amin=-6)
            # The tag decides under which key(s) the notice distance is handed over.
            if name_follower in ["idmplus", "accidm"]:
                parms["max_view"] = distance_notice
            if name_follower in ["acc", "accidm"]:
                parms["sensor_range"] = distance_notice
            result[i] = simulator.simulation(parms)

        with open(filename, "wb") as file:
            pickle.dump(result, file)

    print("Probability of collision: {:.4f}".format(np.mean(result < 0)))
    return result

In [None]:
# Late-notice approaching risk for the IDMPlus follower; reassigns r_idm.
r_idm = risk_latenotice_approaching(
    follower=IDMPlus(),
    follower_parms=idm_approaching_pars,
    name_follower="idmplus",
)

In [None]:
# Late-notice approaching risk for the ACC follower; reassigns r_acc.
r_acc = risk_latenotice_approaching(
    follower=ACC(),
    follower_parms=acc_approaching_pars,
    name_follower="acc",
)

In [None]:
# Late-notice approaching risk for the ACCHDM follower, cached under the "accidm" tag.
r_accidm = risk_latenotice_approaching(
    follower=ACCHDM(),
    follower_parms=acc_idm_approaching_pars,
    name_follower="accidm",
)

#### Cut-in

In [None]:
def risk_lowmu_cutin(follower, follower_parms, name_follower, amin=-3, overwrite=False):
    """Estimate the collision risk in the cut-in scenario for a given `amin`.

    NOTE(review): a function with this exact name, signature and behaviour is
    already defined earlier in the notebook (under the low-mu cut-in section);
    this cell redefines it and reads/writes the same cache file. Since the cell
    sits under the "Risk of late notice" heading, it presumably was meant to
    become a late-notice variant with a notice distance -- confirm with the
    author and rename or remove.

    Runs 10000 Monte Carlo iterations. Each draws a parameter vector from the
    notebook-level `kde_cutin` (redrawing until `check_validity_cutin` accepts
    it). The cut-in is simulated with a single-follower `SimulationApproaching`
    that starts at the sampled initial distance. The per-run values are pickled
    under ``data/7_simulation_results`` and reused by later calls.

    Parameters
    ----------
    follower
        Driver-model instance to simulate.
    follower_parms
        Parameter object handed to `SimulationApproaching` together with `follower`.
    name_follower : str
        Tag that becomes part of the cache file name.
    amin : number, optional
        Forwarded as `amin` to every simulation run; only `int(-amin)` enters the
        cache file name.
    overwrite : bool, optional
        If true, simulate again even when a cache file already exists.

    Returns
    -------
    numpy.ndarray
        One value per run (or whatever the cache file holds); values below zero
        are counted as collisions in the printed probability.
    """
    n_runs = 10000
    # The global NumPy RNG is reseeded on every call, also when the cache is used.
    np.random.seed(0)

    stem = "risk_lowmu_cutin_{:s}_amin{:.0f}".format(name_follower, int(-amin))
    pickle_path = os.path.join("data", "7_simulation_results", "{:s}.p".format(stem))

    if overwrite or not os.path.exists(pickle_path):
        simulator = SimulationApproaching([follower], [follower_parms], min_simulation_time=5)
        result = np.zeros(n_runs)
        for run in tqdm(range(n_runs)):
            # Rejection sampling: keep drawing until the sample passes the validity check.
            while True:
                drawn = kde_cutin.sample()[0]
                if check_validity_cutin(drawn):
                    break

            if drawn[1] >= drawn[2]:
                # Target is at least as fast as ego: no simulation, the initial
                # distance itself is stored as the outcome.
                result[run] = drawn[0]
            else:
                result[run] = simulator.simulation({
                    "vego": drawn[2],
                    "ratio_vtar_vego": drawn[1] / drawn[2],
                    "amin": amin,
                    "init_position": drawn[0],
                })

        with open(pickle_path, "wb") as sink:
            pickle.dump(result, sink)
    else:
        # Cache files are produced by this notebook; do not point this at untrusted pickles.
        with open(pickle_path, "rb") as source:
            result = pickle.load(source)

    share_colliding = np.mean(result < 0)
    print("Probability of collision: {:.4f}".format(share_colliding))
    return result