In [None]:
import numpy as np
import dill
import matplotlib.pyplot as plt
import matplotlib.animation as animation
from matplotlib.collections import LineCollection
import matplotlib.patches as patches
import auxiliaries as aux

In [None]:
"""Define auxiliary functions and classes for the evaluation."""
class EmptyAgent:
    """Bare attribute container for the logged data of one agent.

    Instances are created without any attributes; `load_data` attaches
    `id`, `cl_x` and `cl_u` after construction.
    """

    def __init__(self):
        """Create an agent with no attributes set."""

def load_data(path):
    """Load a dill-serialised simulation result and wrap each agent's logs.

    Prints the path, the number of agents and the MAS type found in the file.

    Args:
        path: Location of the dill file; it is opened in binary read mode.

    Returns:
        A pair ``(data, agents)``. ``data`` is the deserialised object exactly
        as stored in the file; ``agents`` is a list with one ``EmptyAgent`` per
        key of ``data['agents']``, carrying that key as ``id`` and the entry's
        ``'cl_x'`` / ``'cl_u'`` values as ``cl_x`` / ``cl_u``.
    """
    import dill

    # NOTE(review): dill, like pickle, can run arbitrary code while
    # deserialising -- only open files that come from a trusted source.
    with open(path, 'rb') as handle:
        data = dill.load(handle)

    print(f"Data loaded from {path}.")
    print(f"Number of agents: {data['MAS_parameters']['num_agents']}")
    print(f"MAS type: {data['MAS_parameters']['MAS_type']}")

    def _wrap(agent_id):
        # Copy the closed-loop logs of one agent onto a fresh container.
        wrapped = EmptyAgent()
        wrapped.id = agent_id
        record = data['agents'][agent_id]
        wrapped.cl_x = record['cl_x']
        wrapped.cl_u = record['cl_u']
        return wrapped

    agents = [_wrap(agent_id) for agent_id in data['agents']]
    return data, agents

def extract_keys(d, parent_key=""):
    """Collect the keys of a nested dict as dotted paths.

    Args:
        d: Object to inspect. Anything that is not a ``dict`` contributes no
            keys.
        parent_key: Prefix for the keys found in ``d``. When it is falsy
            (the default), top-level keys are stored unchanged.

    Returns:
        A set with one entry per key at every nesting level; nested keys are
        joined to their parents with ``"."``.
    """
    # Leaves (non-dict values) end the recursion.
    if not isinstance(d, dict):
        return set()

    collected = set()
    for name, child in d.items():
        dotted = f"{parent_key}.{name}" if parent_key else name
        collected.add(dotted)
        collected |= extract_keys(child, dotted)
    return collected

# Read the parameters that define the boundaries.
# NOTE(review): `data` is assigned by `data, agents = load_data(path)` in a
# later cell, so on a fresh kernel this cell raises a NameError unless that
# cell has been run first.
(horizontal_length,
 vertical_length,
 vertical_length_tightened,
 exponent) = (
    data['cooperative_task'][field]
    for field in ('horizontal_length', 'vertical_length',
                  'vertical_length_tightened', 'exponent')
)
def lower_x(x, y, n, horizontal, vertical):
    """Evaluate the implicit function of the lower boundary.

    Computes ``(x / horizontal)**n + ((y + vertical) / 1)**n - 1``. The
    notebook draws the zero level of this value as a boundary curve.

    Args:
        x: First coordinate(s) of the evaluation point(s).
        y: Second coordinate(s) of the evaluation point(s).
        n: Exponent; must be even.
        horizontal: Scale by which ``x`` is divided.
        vertical: Offset added to ``y``.

    Returns:
        The value of the expression above.

    Raises:
        ValueError: If ``n`` is not even.
    """
    is_even = n % 2 == 0
    if not is_even:
        raise ValueError("n must be even")

    horizontal_term = (x / horizontal)**n
    # NOTE(review): the shifted y-term is divided by a fixed 1 -- confirm
    # that this scale is intended.
    vertical_term = ((y + vertical) / 1)**n
    return horizontal_term + vertical_term - 1

def upper_x(x, y, n, horizontal, vertical):
    """Evaluate the implicit function of the upper boundary.

    Computes ``(x / horizontal)**n + ((y - vertical) / 1)**n - 1``. The
    notebook draws the zero level of this value as a boundary curve.

    Args:
        x: First coordinate(s) of the evaluation point(s).
        y: Second coordinate(s) of the evaluation point(s).
        n: Exponent; must be even.
        horizontal: Scale by which ``x`` is divided.
        vertical: Offset subtracted from ``y``.

    Returns:
        The value of the expression above.

    Raises:
        ValueError: If ``n`` is not even.
    """
    is_even = n % 2 == 0
    if not is_even:
        raise ValueError("n must be even")

    horizontal_term = (x / horizontal)**n
    # NOTE(review): the shifted y-term is divided by a fixed 1 -- confirm
    # that this scale is intended.
    vertical_term = ((y - vertical) / 1)**n
    return horizontal_term + vertical_term - 1

# Sample the plane on a 400 x 400 grid: x1 spans [-horizontal_length - 1,
# horizontal_length + 1] and x2 spans [-3, 3].
x1 = np.linspace(-horizontal_length - 1, horizontal_length + 1, num=400)
x2 = np.linspace(-3, 3, num=400)
X1, X2 = np.meshgrid(x1, x2)

# Boundary functions on the grid. Blx/Bux use vertical_length, Bly/Buy use
# vertical_length_tightened; "l" is lower_x and "u" is upper_x.
Blx, Bux = (
    boundary(X1, X2, exponent, horizontal_length, vertical_length)
    for boundary in (lower_x, upper_x)
)
Bly, Buy = (
    boundary(X1, X2, exponent, horizontal_length, vertical_length_tightened)
    for boundary in (lower_x, upper_x)
)

In [None]:
"""Define colours for plotting."""
# Palette of 11 hex colours. The plotting cells index it by position
# (e.g. colours[i] for the i-th agent in `agents`).
colours = [
    "#0072B2",  # blue
    "#D55E00",  # orange
    "#009E73",  # green
    "#CC79A7",  # magenta
    "#56B4E9",  # light blue
    "#E69F00",  # yellow-orange
    "#B22222",  # red
    "#6A3D9A",  # purple
    "#117733",  # teal green
    "#88CCEE",  # cyan
    "#DDCC77",  # muted yellow-orange
]

In [None]:
# User settings.
path = "./data/narrow_path_data.dill"  # Data file to evaluate.
animate = False                        # True renders the animation cell (needs ffmpeg).

# Load the stored simulation and the per-agent closed-loop logs.
data, agents = load_data(path)

# Echo every simulation parameter stored with the data.
for key, setting in data['sim_pars'].items():
    print(f"{key}:", setting)

In [None]:
"""Extract and transform data."""
max_sim_time = data['sim_data']['max_sim_time']

# Cost histories stored as lists are stacked and flattened into 1-D arrays.
# Each entry is checked on its own (instead of inferring the state of all four
# from 'cooperative_cost'), so re-running this cell or loading partially
# converted data never stacks an existing array a second time.
for cost_key in ('cooperative_cost', 'tracking_cost', 'change_cost', 'J'):
    if isinstance(data['sim_data'][cost_key], list):
        data['sim_data'][cost_key] = np.vstack(data['sim_data'][cost_key]).flatten()

# Closed-loop logs stored as lists are concatenated horizontally into arrays.
# cl_x and cl_u are tested independently for the same reason as above.
for agent in agents:
    if isinstance(agent.cl_x, list):
        agent.cl_x = np.hstack(agent.cl_x)
    if isinstance(agent.cl_u, list):
        agent.cl_u = np.hstack(agent.cl_u)
    

In [None]:
"""Value function"""
# Plot from t1 to t2.
t1 = 0
t2 = agents[0].cl_x.shape[1]-1

# Give this figure a label of its own: 'state evolution' is the label of the
# 2D position figure, and pyplot returns an existing figure when the label is
# already in use.
fig_V, ax_V = plt.subplots(figsize=(10, 4), num='value function')

# Number of cooperative-cost samples inside the plotted window.
stop_time = data['sim_data']['cooperative_cost'][t1:t2].shape[0]

# (key in data['sim_data'], legend label, first plotted step, colour, format args).
# The change cost is plotted from step 1 at the earliest.
cost_series = [
    ('cooperative_cost', 'cooperative', t1, colours[0], ()),
    ('tracking_cost', 'tracking', t1, colours[1], ()),
    ('change_cost', 'change', max(t1, 1), colours[2], ()),
    ('J', 'J', t1, colours[3], ('--',)),
]
for cost_key, cost_label, first_step, cost_colour, fmt in cost_series:
    values = data['sim_data'][cost_key][first_step:t2]
    # Build the time axis from the slice itself so that x and y always have
    # the same length, also when t1 > 0.
    steps = range(first_step, first_step + len(values))
    ax_V.plot(steps, values, *fmt, label=cost_label, color=cost_colour)

ax_V.set_xlabel('time steps')
ax_V.set_title('Value function over time')
ax_V.grid(True)
ax_V.legend()

# Set the y-axis to logarithmic scale.
# ax_V.set_yscale('log')

plt.show()

print(f'Value function difference between the first and last time step: {data["sim_data"]["J"][-1] - data["sim_data"]["J"][0]}')
print(f'Value function at start: {data["sim_data"]["J"][0]}')
print(f'Value function at stop:  {data["sim_data"]["J"][-1]}')
print(f'Cooperation cost at stop: {data["sim_data"]["cooperative_cost"][-1]}')
print(f'Tracking cost at stop: {data["sim_data"]["tracking_cost"][-1]}')

In [None]:
"""2D position"""
# Plot from t1 to t2.
t1 = 0  # alternative start: max_sim_time+1-max(2*T, 5)
t2 = max_sim_time+1
step = 1  # only used by the optional export code that is commented out below

# Select a feasible start time (the end time is clamped per agent below).
t1 = min(t1, max_sim_time+1)

# Draw the evolution in state space:
fig_cl, ax_cl = plt.subplots(figsize=(10, 5), num='state evolution')

for i, agent in enumerate(agents):
    cl_x = agent.cl_x
    # Wrap around the palette so that more agents than colours do not raise.
    agent_colour = colours[i % len(colours)]

    # Clamp the end index to this agent's own log; the result does not depend
    # on the order of the agents.
    t_end = min(t2, cl_x.shape[1]-1)

    # Plot the 2D trajectory between the two markers drawn below.
    ax_cl.plot(cl_x[0, t1:t_end+1], cl_x[1, t1:t_end+1], color=agent_colour)
    ax_cl.plot(cl_x[0, t1], cl_x[1, t1], color=agent_colour, marker='o', markersize=6)  # Mark the initial state with a circle.
    ax_cl.plot(cl_x[0, t_end], cl_x[1, t_end], color=agent_colour, marker='x', markersize=6)  # Mark the final state with a cross.

    # Optional export of velocity arrows and positions for LaTeX (disabled):
    # quiver_tikz = []
    # vel_norm = 2
    # colour = 'vivid_blue' if i == 0 else 'tomato_red' if i == 1 else 'black'
    # for j in range(t1, t2, step):
    #     x1, y1 = cl_x[0, j], cl_x[1, j]
    #     vx, vy = cl_x[2, j], cl_x[3, j]
    #     x2, y2 = x1 + 0.7 * vx, y1 + 0.7 * vy  # Scale factor for velocity
    #     velocity_mag = np.sqrt(vx**2 + vy**2)  # Compute velocity magnitude
    #     velocity_scale = 0.1 + 2 * (velocity_mag / vel_norm)
    #     quiver_tikz.append(f"\\draw [{colour}, -{{Triangle[scale={velocity_scale}]}}] (axis cs:{x1},{y1}) -- (axis cs:{x2},{y2});")

    # with open(f"quiver_arrows_{agent.id}.tex", "w") as f:
    #     f.write("\n".join(quiver_tikz))

    # trajectory_table = "x y\n"  # Table header for LaTeX (pgfplots requires column names)

    # for j in range(t1, t2, step):
    #     trajectory_table += f"{cl_x[0, j]} {cl_x[1, j]}\n"

    # # Write to file.
    # with open(f"position_{agent.id}.tex", "w") as f:
    #     f.write(trajectory_table)

ax_cl.set_xlabel('$x_1$')
ax_cl.set_ylabel('$x_2$')
# Zero level sets of the boundary functions: solid black for Blx/Bux, dashed
# grey for the tightened variants Bly/Buy.
ax_cl.contour(X1, X2, Blx, levels=[0], colors='black', linewidths=1)  # Plot the boundary for the narrow path.
ax_cl.contour(X1, X2, Bux, levels=[0], colors='black', linewidths=1)  # Plot the boundary for the narrow path.
ax_cl.contour(X1, X2, Bly, levels=[0], colors='gray', linewidths=1, linestyles='--')  # Plot the tightened boundary constraints for the artificial cooperation reference trajectory.
ax_cl.contour(X1, X2, Buy, levels=[0], colors='gray', linewidths=1, linestyles='--')  # Plot the tightened boundary constraints for the artificial cooperation reference trajectory.
ax_cl.grid()

ax_cl.set_xlim(-20.5, 20.5)
ax_cl.set_ylim(-0.5, 0.5)

plt.show()

In [None]:
"""All states and inputs"""
# Define time range
t1 = 0
t2_state = max_sim_time + 1
t2_input = max_sim_time

# Number of states and inputs
num_states = 4
num_inputs = 2

# Agents to plot
agents2plot = agents[:]

# Select feasible start time
t1 = min(t1, max_sim_time + 1)

# Create a figure with multiple subplots: states fill the first two rows,
# inputs the last row.
fig, axes = plt.subplots(3, 2, figsize=(12, 12), num='state and input evolution')


def _plot_component(ax, attr, idx, symbol, kind, t_stop):
    """Plot one component of a closed-loop signal for every selected agent.

    Args:
        ax: Axes to draw on.
        attr: Name of the agent attribute holding the signal ('cl_x' or 'cl_u').
        idx: Row of the signal to plot (0-based); shown 1-based in labels.
        symbol: Letter used in legend and axis labels ('x' or 'u').
        kind: Word used in the title ('state' or 'input').
        t_stop: Last time step to plot; clamped to the length of each log.
    """
    for i, agent in enumerate(agents):
        if agent not in agents2plot:
            continue
        signal = getattr(agent, attr)
        tf = min(t_stop, signal.shape[1] - 1)
        # Wrap around the palette so that more agents than colours do not raise.
        ax.plot(range(t1, tf+1), signal[idx, t1:tf+1],
                color=colours[i % len(colours)], label=f'{agent.id}_{symbol}{idx+1}',
                markersize=0, linewidth=2, marker='o')

    # Avoid a degenerate y-axis when both limits are (numerically) zero.
    if np.linalg.norm(ax.get_ylim()) < 1e-8:
        ax.set_ylim(-0.1, 0.1)

    ax.grid()
    ax.legend()
    ax.set_title(f'Closed-loop {kind} ${symbol}_{idx+1}$')
    ax.set_xlabel('time steps')
    ax.set_ylabel(f'${symbol}_{idx+1}$')


## --- Plot all states ---
for idx_state in range(num_states):
    _plot_component(axes[idx_state // 2, idx_state % 2], 'cl_x', idx_state, 'x', 'state', t2_state)

## --- Plot all inputs ---
for idx_input in range(num_inputs):
    _plot_component(axes[-1, idx_input], 'cl_u', idx_input, 'u', 'input', t2_input)

# Adjust the layout and draw the plot.
plt.tight_layout()
plt.show()


In [None]:
"""Collision"""
# Plot from t1 to t2.
t1 = 0
t2 = max_sim_time + 1

fig_d, axes = plt.subplots(1, 2, figsize=(12, 6), num='distance', sharex=True)

ax_d = axes[0]  # Original plot
ax_zoom = axes[1]  # Zoomed-in plot

collision_distance = data['MAS_parameters']['collision_distance']

# Visit every unordered pair of agents exactly once.
pair_index = 0
for idx_a1, a1 in enumerate(agents):
    for a2 in agents[idx_a1 + 1:]:
        # Use this cell's own t2 and clamp to the shorter of the two logs so
        # that neither trajectory is indexed out of range.
        tf = min(t2, a1.cl_x.shape[1] - 1, a2.cl_x.shape[1] - 1)

        # Euclidean distance between the first two state components per step.
        distances = np.linalg.norm(a1.cl_x[0:2, t1:tf + 1] - a2.cl_x[0:2, t1:tf + 1], axis=0)

        # One colour per pair (wrapping around the palette), so that pairs
        # sharing their first agent remain distinguishable.
        pair_colour = colours[pair_index % len(colours)]
        pair_index += 1

        # Plot on both axes.
        for ax in [ax_d, ax_zoom]:
            ax.plot(range(t1, tf + 1), distances, color=pair_colour,
                    label=f'$\\Vert z_{{{a1.id[1:]}}} - z_{{{a2.id[1:]}}} \\Vert$',
                    markersize=0, linewidth=2, marker='o')

# Plot collision threshold on both axes; a horizontal line does not depend on
# the time range of any particular pair.
for ax in [ax_d, ax_zoom]:
    ax.axhline(collision_distance, color='black', label='minimum distance',
               linewidth=2, linestyle='--')

# Labels, titles, and grid
ax_d.set_xlabel('$t$')
ax_d.set_ylabel("distance between agents' positions")
ax_d.set_title(f'Distance from t = {t1} to t = {t2}')
ax_d.grid(True)
ax_d.legend()

# Zoomed-in plot settings: a band of +/- 0.05 around the threshold.
y_min = collision_distance - 0.05
y_max = collision_distance + 0.05
ax_zoom.set_ylim(y_min, y_max)
ax_zoom.set_xlabel('$t$')
ax_zoom.set_title('Zoomed-in on collision threshold')
ax_zoom.grid(True)

plt.tight_layout()
plt.show()

In [None]:
"""Animation"""
if animate:
    # Define the figure and axis
    fig, ax = plt.subplots(figsize=(10, 10))
    ax.set_xlim(-20.5, 20.5)
    ax.set_ylim(-0.5, 0.5)
    ax.grid()
    ax.set_xlabel('$x_1$')
    ax.set_ylabel('$x_2$')

    # RGB palette of the animation; indexed per agent (wrapping around).
    dark_rgb_colours = [
        (0.00, 0.45, 0.70),
        (0.83, 0.37, 0.00),
        (0.00, 0.62, 0.45),
        (0.94, 0.89, 0.26),
        (0.80, 0.47, 0.65),
        (0.34, 0.71, 0.91),
        (0.90, 0.56, 0.00),
        (0.60, 0.60, 0.60),
        (0.70, 0.13, 0.13),
        (0.42, 0.24, 0.60),
        (0.07, 0.45, 0.20),
        (0.53, 0.80, 0.93),
        (0.87, 0.80, 0.47),
    ]

    ax.contour(X1, X2, Blx, levels=[0], colors='black', linewidths=1)
    ax.contour(X1, X2, Bux, levels=[0], colors='black', linewidths=1)

    # Frame range: stop at the last index available in every agent's log so
    # that no frame indexes past the end of a shorter trajectory.
    t1 = 0
    t2 = min(agent.cl_x.shape[1] for agent in agents) - 1
    step = 1
    history_length = 30  # Number of previous steps to fade out

    # Plot elements to be updated
    agent_plots = []  # Stores the position markers
    trail_collections = []  # Stores the faded trail collections

    # Initialize the plot elements
    for i, agent in enumerate(agents):
        rgb = dark_rgb_colours[i % len(dark_rgb_colours)]

        # Current position marker
        agent_plot, = ax.plot([], [], color=rgb, marker='o', markersize=6, linestyle='None')
        agent_plots.append(agent_plot)

        # Create a LineCollection for the fading trail with the correct colour
        trail_collection = LineCollection([], linewidth=1.5, colors=[rgb])
        trail_collections.append(trail_collection)
        ax.add_collection(trail_collection)

    def update(frame):
        """Move each agent's marker to `frame` and redraw its fading trail.

        Args:
            frame: Time index into the agents' `cl_x` logs.

        Returns:
            The list of artists that were updated (markers, then trails).
        """
        for i, agent in enumerate(agents):
            rgb = dark_rgb_colours[i % len(dark_rgb_colours)]

            # set_data expects sequences, hence the one-element arrays.
            current_x = np.array([agent.cl_x[0, frame]])
            current_y = np.array([agent.cl_x[1, frame]])

            # The trail covers at most `history_length` steps before `frame`.
            past_start = max(0, frame - history_length)
            trail_x = agent.cl_x[0, past_start:frame+1]
            trail_y = agent.cl_x[1, past_start:frame+1]

            # Update the agent's position marker
            agent_plots[i].set_data(current_x, current_y)

            # Create faded segments only if there are enough points
            if len(trail_x) > 1:
                segments = [((trail_x[j], trail_y[j]), (trail_x[j+1], trail_y[j+1])) for j in range(len(trail_x)-1)]

                # Alpha rises from 0.1 (oldest segment) to 1 (newest segment).
                alpha_values = np.linspace(0.1, 1, len(segments))

                # Agent colour as RGBA with the per-segment alpha applied
                faded_colors = [(rgb[0], rgb[1], rgb[2], alpha) for alpha in alpha_values]

                # Update the LineCollection with new segments and colors
                trail_collections[i].set_segments(segments)
                trail_collections[i].set_color(faded_colors)  # Set individual segment colors
            else:
                trail_collections[i].set_segments([])  # Clear trail if no valid segments

        return agent_plots + trail_collections


    # Create animation (disable blitting for compatibility). The range ends at
    # t2 + 1 so that the final logged state (index t2) is rendered as well.
    ani = animation.FuncAnimation(fig, update, frames=range(t1, t2 + 1, step), interval=100, blit=False)

    filename = "./data/" + "narrow_path.mp4"
    ani.save(filename, writer="ffmpeg", dpi=300)