In [2]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.collections import LineCollection
from matplotlib.colors import ListedColormap, BoundaryNorm
from tqdm import tqdm
import pandas as pd
import seaborn as sns


In [106]:
%matplotlib qt

def surface_friction_dynamics(patch_locs=None, patch_sizes=None, friction_values=None, is_growings=None, road_length=None):
    num_patches = 10
    min_patch_size = 10 # m
    max_patch_size = 0.5 * road_length/num_patches # m
    if patch_locs is None:
        # initialize random patches        
        patch_locs = np.random.uniform(0, road_length, num_patches)
        is_growings = np.random.choice([True, False], num_patches)
        patch_sizes = np.random.uniform(min_patch_size, max_patch_size, num_patches)
        friction_values = np.random.uniform(0.2, 0.6, num_patches)
        return patch_locs, patch_sizes, friction_values, is_growings
    for i in range(len(patch_locs)):
        if np.random.rand() < 0.01*dt:
            # randomly change its location and size
            patch_locs[i] = np.random.uniform(0, road_length)
            patch_sizes[i] = np.random.uniform(min_patch_size, max_patch_size)
    return patch_locs, patch_sizes, friction_values, is_growings


def get_friction(x, patch_locs, patch_sizes, friction_values, road_length):
    # if any x is more than the road_length, wrap it around
    while any(x > road_length):
        x = np.where(x > road_length, x - road_length, x)
    friction = np.full_like(x, 0.8)  # Initialize with default friction value of 0.1
    for i in range(len(patch_locs)):
        patch_start = patch_locs[i]
        patch_end = patch_locs[i] + patch_sizes[i]
        # Apply friction values only within the patch bounds
        mask = (x >= patch_start) & (x < patch_end)
        friction[mask] = friction_values[i]
    return friction

def get_friction_estimate(x, x_, x_context, friction_context):
    # for each p and p_ in x and x_ find the index of context points between p and p_
    # then take the average of friction values of those context points
    fric_est = np.zeros_like(x)
    for i in range(len(x)):
        idx = np.where((x_context > x[i]) & (x_context < x_[i]))[0]
        # if there are no context points between x and x_, take the closest context point
        if len(idx) == 0:
            idx = np.argmin(np.abs(x_context - x[i]))
            fric_est[i] = friction_context[idx]
        else:    
            fric_est[i] = np.mean(friction_context[idx])
    return fric_est
    



def intelligent_driver_model(x, v, x_, v_, v_desired, s_0, vehicle_length, T=2, friction_estimates=0.5):
    delta = 4
    a = 2.5 # m/s^2 (comfortable acceleration)
    b = 4 # m/s^2 (comfortable deceleration)
    reaction_time = 0.5 # s

    T = reaction_time + v/(2*friction_estimates*9.81)

    s = x_ - x - vehicle_length
    delta_v = v - v_
    s_desired = s_0 + v*T + (v*delta_v) / (2 * np.sqrt(a * b))
    v_dot = a * (1 - (v/v_desired)**delta - (s_desired/s)**2)
    # make sure vechiles are not going backwards (negative velocity)
    v_dot[v <= 0] = np.maximum(v_dot[v <= 0], 0)
    return v_dot

num_vehicles = 100
ring_radius = 1e3 # m
max_speed = 40 # m/s
confortable_acceleration = 3 # m/s^2
confortable_deceleration = 5 # m/s^2
desired_min_space = 1 # m
vehicle_length = 5 # m
dt = 0.1 # s
max_timesteps = 36_000 # s
is_crashed = np.zeros(num_vehicles, dtype=bool)
online_plot = False

s = np.array([0])

tries = 0
while any(s < 0.01):
    tries += 1
    theta = np.random.uniform(0, 2*np.pi, num_vehicles)
    theta = np.sort(theta)
    # v = np.random.uniform(0, max_speed, num_vehicles)
    v = np.zeros(num_vehicles)
    x = ring_radius * theta
    x_ = np.roll(x, -1)
    x_[-1] = x[0] + ring_radius * 2 * np.pi
    s = x_ - x - vehicle_length
    if tries > 10000:
        raise Exception("Could not find a valid initial condition after 1000 tries. Try increasing the ring radius or decreasing the number of vehicles.")




num_context_points = 200
num_crashed = 0
data = np.zeros((max_timesteps, num_vehicles, 5))
context = []
# df = pd.DataFrame(data, columns=['t','x', 'v', 'is_crashed', 'friction_true', 'friction_measurement'])
t = 0
i = 0
patch_locs, patch_sizes, friction_values, is_growings = surface_friction_dynamics(road_length=ring_radius * 2 * np.pi)

if online_plot:
    plt.close('all')
    fig, axs = plt.subplots(1, 2)
    # axs[1] = plt.subplot(1, 2, 2, polar=True)

####################################################################################################
######################################### START SIMULATION #########################################
####################################################################################################

for i in tqdm(range(max_timesteps)):
    x_ = np.roll(x, -1)
    x_[-1] = x[0] + ring_radius * 2 * np.pi
    v_ = np.roll(v, -1)


    """ Crash Simulator"""
    # if vehicle is_crashed, set its velocity to 0 for the rest of the simulation
    is_crashed_new = x_ - x < vehicle_length
    is_crashed = is_crashed | is_crashed_new
    if any(is_crashed):
        # print(is_crashed)
        idx = np.where(is_crashed)[0]
        v[idx] = 0
    # number of vehicles that have crashed
    num_crashed = np.sum(is_crashed)
    

    """Stop Sign Simulator"""
    # randomly select 1/10 of vehicles to change its x_ value for a 30-sec stop (assuming dt=0.1s)
    num_stop_signs = num_vehicles // 10
    if i % 300 == 0:
        stopped = np.random.choice(num_vehicles, num_stop_signs, replace=False)
        stop_locations = x_[stopped]

    x_[stopped] = stop_locations
    v_[stopped] = stop_locations*0
        

    fric_est = get_friction_estimate(x, x_, x_context, friction_context)
    # print(fric_est.shape)

    # fric_est = 0.8 * np.ones_like(x)

    """ Intelligent Driver Model"""
    driver_accelerations = intelligent_driver_model(x, v, x_, v_, max_speed, desired_min_space, vehicle_length, friction_estimates=fric_est)
    
    
    """ Dynamic Friction Simulator """
    patch_locs, patch_sizes, friction_values, is_growing = surface_friction_dynamics(patch_locs=patch_locs, 
                                                                                     patch_sizes=patch_sizes,
                                                                                       friction_values=friction_values,
                                                                                         is_growings=is_growings,
                                                                                           road_length=ring_radius * 2 * np.pi)
    
    # get friction values for each vehicle
    friction_trues = get_friction(x, patch_locs, patch_sizes, friction_values, ring_radius * 2 * np.pi)
    # make them noisy
    friction_measurements = friction_trues + np.random.normal(0, 0.2, num_vehicles)
    x_context = np.linspace(0, ring_radius * 2 * np.pi, num_context_points)
    # x_context = np.random.uniform(0, ring_radius * 2 * np.pi, num_context_points)
    x_context = np.sort(x_context)
    
    friction_context = get_friction(x_context, patch_locs, patch_sizes, friction_values, ring_radius * 2 * np.pi)
    # Create a colormap
    cmap = plt.get_cmap('coolwarm')
    norm = plt.Normalize(0.2, 0.8)
    X_plot = np.cos(x_context/ring_radius)
    Y_plot = np.sin(x_context/ring_radius)
    # Create line segments
    points = np.array([X_plot, Y_plot]).T.reshape(-1, 1, 2)
    segments = np.concatenate([points[:-1], points[1:]], axis=1)
    # Create the LineCollection object
    lc = LineCollection(segments, cmap=cmap, norm=norm, linewidths=10, alpha=.5)
    lc.set_array(friction_context)  # Set the colors according to z

    """ Apply Friction on the Dynamics"""
    # friction_trues = 1
    a = np.clip(driver_accelerations, -9.81*friction_trues, 9.81*friction_trues)

    x += v * dt
    v += a * dt
    t += dt

    data[i, :, 0] = x
    data[i, :, 1] = v
    data[i, :, 2] = is_crashed
    data[i, :, 3] = friction_trues
    data[i, :, 4] = friction_measurements
    if i % int(10/dt) == 0:
        for i in range(len(x_context)):
            context.append([t, x_context[i], friction_context[i]])


    if online_plot:

        # make status_color array to default blue
        status_color = np.full(num_vehicles, 'b')
        # make the color of the vehicles that have abs(dirver_accelerations) more than value a yellow to indicate skidding
        status_color = np.where(np.abs(driver_accelerations) > np.abs(a), 'y', status_color) 
        # make the status_color red for vehicles that have crashed
        status_color = np.where(is_crashed, 'r', status_color)
        
        axs[0].add_collection(lc)
        axs[0].scatter(np.cos(x/ring_radius), np.sin(x/ring_radius), c=status_color, cmap='viridis', s=10)
        axs[0].set_xlim(-1.2, 1.2)
        axs[0].set_ylim(-1.2, 1.2)
        axs[0].set_title("Number of Crashes: {}".format(num_crashed))
        # plt.axes().axis('equal')
        # axcb = fig.colorbar(lc)  # Add a colorbar
        # axcb.set_label('Surface friction coefficient')

        # axs[1].cla()
        # axs[1].plot(np.linspace(0,2*np.pi,len(test_friction)), 1+test_friction, color='k', alpha=0.5)
        # axs[1].set_title("Friction")
        # axs[1].set_ylim(0, 1)


        plt.pause(0.001)
        axs[0].cla()



 25%|██▌       | 9150/36000 [00:18<00:54, 495.65it/s]


KeyboardInterrupt: 

In [6]:
# build a set of colors using num_vehicles
colors = plt.cm.viridis(np.linspace(0, 1, num_vehicles))

# make two subplots
fig, axs = plt.subplots(2)

for i in range(num_vehicles):

    axs[0].plot(data[:, i, 0], color='k', alpha=0.1)
    axs[0].set_title("Position")

    axs[1].plot(data[:, i, 1], color='k', alpha=0.1)
    axs[1].set_title("Velocity")

plt.show()



In [6]:
print(data.shape)

(1000, 200, 5)


In [44]:
# collapse data into a pandas dataframe where each row is timestep, vehicle_index, x, v, is_crashed, friction_true, friction_measurement
df = pd.DataFrame(data.reshape(-1, 5), columns=['x', 'v', 'is_crashed', 'friction_true', 'friction_measurement'])

# make a vehicle index column
df['vehicle_index'] = np.tile(np.arange(num_vehicles), max_timesteps)

# make a timestep column
df['timestep'] = np.repeat(np.arange(max_timesteps), num_vehicles)

print(df.head())




            x         v  is_crashed  friction_true  friction_measurement  \
0  192.195160  0.299993         0.0       0.800000              0.980482   
1  409.442578  0.299942         0.0       0.800000              0.340901   
2  486.492860  0.203600         0.0       0.207543              0.415506   
3  544.584432  0.203600         0.0       0.207543              0.347130   
4  560.101318  0.203600         0.0       0.207543              0.319018   

   vehicle_index  timestep  
0              0         0  
1              1         0  
2              2         0  
3              3         0  
4              4         0  


In [105]:

context = np.array(context)
# randomly select 10000 context points
# context = context[np.random.choice(context.shape[0], 10000, replace=False)]
print(context.shape)
# convert context into a pandas dataframe
df_context = pd.DataFrame(context.reshape(-1, 3), columns=['t', 'x', 'friction'])
# df_context['timestep'] = np.repeat(np.arange(max_timesteps), num_context_points)


# sns.scatterplot(data=df_context, x='t', y='x', hue='friction', palette='coolwarm', alpha=0.5, edgecolor=None, size=0.01)

from scipy.interpolate import griddata

# Preparing data for interpolation
# Creating mesh grid for t and x
grid_t, grid_x = np.meshgrid(
    np.linspace(df_context['t'].min(), df_context['t'].max(), 100),   # 100 points from min to max of t
    np.linspace(df_context['x'].min(), df_context['x'].max(), 100)   # 100 points from min to max of x
)

# Interpolating friction data onto the mesh grid
grid_friction = griddata(
    points=(df_context['t'], df_context['x']),
    values=df_context['friction'],
    xi=(grid_t, grid_x),
    method='linear'  # 'linear' interpolation; can also use 'nearest' or 'cubic'
)

# Plotting the contour plot
plt.figure(figsize=(10, 8))
contour = plt.contourf(grid_x, grid_t, grid_friction, levels=50, cmap='viridis')
plt.title('Filled Contour Plot of Friction')
plt.xlabel('x')
plt.ylabel('t')
plt.colorbar(contour)
plt.show()


(100000, 3)
