In [None]:
import os

import matplotlib.patches as patches
import matplotlib.pyplot as plt
import numpy as np
from matplotlib.animation import FuncAnimation
from torch.utils.tensorboard import SummaryWriter

In [2]:
from sac.sac_torch_new import Agent
from env.car_2d_intersection import Car2DIntersection
from cbf.cbf_intersection import cbf_casadi

In [3]:
# Number of training episodes run by the training loop.
n_games = 10000

# Simulation environment shared by the agent, the observer and the CBF filter.
env = Car2DIntersection()

# The agent's input is one element shorter than the raw observation: the
# training loop removes one component (np.delete(observation, -3)) before the
# observation is handed to the agent, so keep the "- 1" in sync with that.
agent = Agent(
    env=env,
    n_actions=env.action_space.shape[0],
    input_dims=(env.observation_space.shape[0] - 1,),
)

In [4]:
def disturbance_observer(env, x, x_hat, a):
    """Estimate the disturbance from the state-prediction error.

    Evaluates ``d_hat = -a * (x_hat - x) / (exp(a * dt) - 1)`` element-wise,
    where ``dt`` is read from ``env.dt``.

    Parameters:
        env: Object exposing the time step as ``env.dt``.
        x (array-like): Measured state.
        x_hat (array-like): Predicted state, broadcastable against ``x``.
        a (float): Observer gain. Must be non-zero (``a == 0`` yields 0/0).

    Returns:
        np.ndarray: Disturbance estimate with the shape of ``x_hat - x``.
    """
    x_delta = np.asarray(x_hat, dtype=float) - np.asarray(x, dtype=float)
    # np.expm1 computes exp(a*dt) - 1 without the cancellation error (or the
    # outright underflow to 0) that the literal expression has for small a*dt.
    return -a * x_delta / np.expm1(a * env.dt)

def state_predictor(env, x, x_hat, u, a):
    """Predict the next state with one explicit Euler step of the car model.

    State layout (8 elements): ``[px, py, v, vx, vy, theta, cos(theta), sin(theta)]``.
    The step starts from the measured state ``x`` and integrates the nominal
    dynamics plus the disturbance estimate and the error-feedback term
    ``-a * (x_hat - x)``. Afterwards ``vx, vy`` and ``cos, sin`` are recomputed
    from the updated speed and heading so the redundant entries stay consistent.

    Parameters:
        env: Object exposing the time step as ``env.dt``.
        x (array-like): Measured state, 8 elements.
        x_hat (array-like): Previous predicted state, 8 elements.
        u (sequence): Control input ``(acceleration, front_wheel_angle)``.
        a (float): Observer gain. Must be non-zero (``a == 0`` yields 0/0).

    Returns:
        np.ndarray: New predicted state (a fresh array; ``x`` is not modified).

    Raises:
        ValueError: If ``x`` does not unpack into exactly 8 elements or ``u``
            into exactly 2.
    """
    dt = env.dt
    x = np.asarray(x, dtype=float)
    x_delta = np.asarray(x_hat, dtype=float) - x
    # np.expm1 avoids the cancellation error of exp(a*dt) - 1 for small a*dt.
    d_hat = -a * x_delta / np.expm1(a * dt)

    # Only speed and heading enter the nominal dynamics; the full unpack is
    # kept so that a state of the wrong length fails loudly.
    _, _, velocity, _, _, heading_angle, _, _ = x
    acceleration, front_wheel_angle = u
    # beta = np.arctan(0.5 * np.tan(front_wheel_angle))
    # angle_velocity = velocity * np.sin(beta)
    angle_velocity = 0.5 * velocity * np.tan(front_wheel_angle)

    # Time derivative in state order: px, py, v, vx, vy, theta, cos, sin.
    # The derived entries (vx, vy, cos, sin) are left at 0 here and rebuilt below.
    obs_dt = np.array([velocity * np.cos(heading_angle),
                       velocity * np.sin(heading_angle),
                       acceleration,
                       0,
                       0,
                       angle_velocity,
                       0,
                       0])
    x_ = x + (obs_dt + d_hat - a * x_delta) * dt

    # Rebuild the redundant components from the updated speed and heading.
    speed_next, heading_next = x_[2], x_[5]
    cos_next, sin_next = np.cos(heading_next), np.sin(heading_next)
    x_[3:5] = np.array([cos_next * speed_next, sin_next * speed_next])
    x_[-2:] = np.array([cos_next, sin_next])
    return x_

In [5]:
def save_trajectory_1(episode, obs_history, obstacles_history, sense_range, info, writer):
    """
    Generates and saves an animated GIF of a car's trajectory and surrounding obstacles.

    The GIF is written to ``tmp/plot/epi_{episode}_{info}.gif``; the directory is
    created if it does not exist. The figure is always closed, even when
    rendering or saving fails.

    Parameters:
        episode (int): Episode number, used in the plot title and the file name.
        obs_history (list): Per-frame car observations; elements 0 and 1 of each
            entry are plotted as x and y.
        obstacles_history (list): Per-frame collections of obstacles; elements 0
            and 1 of each obstacle are used as its x and y. The first frame
            determines how many obstacle circles are drawn.
        sense_range (float): Radius of the dashed circle drawn around the car.
            ``np.inf`` disables the circle.
        info: Value interpolated into the file name.
        writer: Currently unused; kept so existing callers keep working.
    """
    output_dir = 'tmp/plot'
    fig, ax = plt.subplots()
    try:
        ax.set_xlim([-15, 15])
        ax.set_ylim([-15, 15])
        ax.set_aspect('equal', adjustable='box')
        ax.set_xlabel('X Coordinate')
        ax.set_ylabel('Y Coordinate')
        ax.set_title(f'Car Trajectory - Episode {episode}')
        ax.grid(True)

        line, = ax.plot([], [], marker='o', markersize=2, linestyle='-')
        obstacles_patches = [
            patches.Circle((obstacle[0], obstacle[1]), 0.4, fill=False, color='red')
            for obstacle in obstacles_history[0]
        ]
        for patch in obstacles_patches:
            ax.add_patch(patch)

        # Adding lines and arcs for lanes and turns
        # Implementation details are assumed to be correct and are not repeated here for brevity

        current_pos, = ax.plot([], [], marker='o', markersize=5, color='green')
        car_circle = None
        if sense_range != np.inf:
            car_circle = patches.Circle((0, 0), sense_range, fill=False, color='green', linestyle='--')
            ax.add_patch(car_circle)

        def update(frame):
            """Move every animated artist to its position at ``frame``."""
            trail = obs_history[:frame + 1]
            line.set_data([obs[0] for obs in trail], [obs[1] for obs in trail])

            car_x, car_y = obs_history[frame][0], obs_history[frame][1]
            current_pos.set_data([car_x], [car_y])

            # With blit=True every artist that changes must be returned,
            # not only the trail and the obstacles.
            artists = [line, current_pos]
            if car_circle is not None:
                car_circle.center = car_x, car_y
                artists.append(car_circle)

            for patch, obstacle in zip(obstacles_patches, obstacles_history[frame]):
                patch.center = (obstacle[0], obstacle[1])
            return artists + obstacles_patches

        anim = FuncAnimation(fig, update, frames=len(obs_history), interval=100, blit=True)

        # anim.save does not create missing directories.
        os.makedirs(output_dir, exist_ok=True)
        gif_path = f'{output_dir}/epi_{episode}_{info}.gif'
        anim.save(gif_path, writer='pillow', fps=30)
    finally:
        # Release the figure even if saving fails, so repeated calls from the
        # training loop do not accumulate open figures.
        plt.close(fig)

In [6]:
def save_obs(writer, observation, observation_hat, i, step):
    """Log selected components of the true and the estimated state for one step.

    For both ``observation`` (tag prefix ``Observation``) and ``observation_hat``
    (tag prefix ``Observation_hat``), the entries at indices 0, 1, 2 and 5 are
    written as scalars named ``x``, ``y``, ``v`` and ``theta`` under
    ``<prefix>/Episode_{i}/Step/<name>`` at global step ``step``.

    Parameters:
        writer: Object with an ``add_scalar(tag, value, step)`` method.
        observation: Indexable true state.
        observation_hat: Indexable estimated state.
        i: Episode index, embedded in the tag.
        step: Step index passed through to ``add_scalar``.
    """
    logged_components = (("x", 0), ("y", 1), ("v", 2), ("theta", 5))
    sources = (("Observation", observation), ("Observation_hat", observation_hat))

    for prefix, state in sources:
        for label, index in logged_components:
            writer.add_scalar(f"{prefix}/Episode_{i}/Step/{label}", state[index], step)

In [None]:
writer = SummaryWriter('tmp/runs')
best_score = env.reward_range[0]
score_history = []
global_step = 0
env.set_seed(4)

# Experiment switches.
sense_range = np.inf        # forwarded to the CBF filter and to the trajectory plot
use_DOB = True              # estimate the disturbance and train on the predicted state
use_CBF = True              # pass the agent's action through cbf_casadi before stepping
save_runs = False           # enable TensorBoard logging (save_obs, agent.learn, avg_score)
real_world = True           # forwarded to env.step
high_order_model = True     # forwarded to env.step

# Formatting of the per-episode console summary; loop-invariant, so set once.
np.set_printoptions(formatter={'float': '{:0.2f}'.format})

# try/finally guarantees the writer is flushed and closed even when the
# (long) training run is interrupted or an episode raises.
try:
    for i in range(n_games):
        observation, _ = env.reset()  # px, py, v, vx, vy, theta, cos, sin
        modified_observation = np.delete(observation, -3)  # delete theta for better training
        observation_hat = np.array(observation)
        modified_observation_hat = np.delete(observation_hat, -3)
        dob_param = 1e-2  # gain shared by disturbance_observer and state_predictor
        done = False
        score = 0
        info = {}
        step = 0
        reward = 0
        observation_history = [np.array(observation)]
        obstacles_history = [np.array(env.other_cars)]
        last_action = np.zeros(env.action_space.shape[0])

        while not done and step < 500:
            # Early stop: the car is more than 20 units from the origin along x or y.
            if np.any(np.abs(observation[:2]) > 20):
                break
            if save_runs:
                save_obs(writer, observation, observation_hat, i, step)

            # The policy always acts on the predicted (theta-free) state.
            action = agent.choose_action(modified_observation_hat)
            d_hat = np.zeros(observation.shape)
            if use_DOB:
                d_hat = disturbance_observer(env, observation, observation_hat, dob_param)
            if use_CBF:
                action = np.array(cbf_casadi(env, observation, action, sense_range, d_hat))

            # Predict with the action that is actually applied, then step the environment.
            observation_hat = state_predictor(env, observation, observation_hat, action, dob_param)
            observation_, reward, done, _, info = env.step(action, last_action, real_world, high_order_model)
            modified_observation_ = np.delete(observation_, -3)
            modified_observation_hat_ = np.delete(observation_hat, -3)
            score += reward

            # Store the transition in the same state representation the run is configured for.
            if use_DOB:
                agent.remember(modified_observation_hat, action, reward, modified_observation_hat_, done)
            else:
                agent.remember(modified_observation, action, reward, modified_observation_, done)

            agent.learn(i, step, global_step, writer, save_runs)

            observation = observation_
            modified_observation = modified_observation_
            modified_observation_hat = modified_observation_hat_
            last_action = action
            observation_history.append(np.array(observation))
            obstacles_history.append(np.array(env.other_cars))
            step += 1
            global_step += 1

        score_history.append(score)
        avg_score = np.mean(score_history[-100:])  # mean over the last (up to) 100 episodes

        if save_runs:
            writer.add_scalar('avg_score/Episode', avg_score, i)

        print('episode', i, ', after', step, 'steps: ', info, ', last state: ', observation[[0,1,2,5]], ', with reward: ', np.array([reward]))

        # Only successful episodes are rendered. Alternative triggers:
        #   any('collision' in element for element in info)
        #   info == {'collision'} or info == {'goal_reached'}
        if info == {'goal_reached'}:
            save_trajectory_1(i, observation_history, obstacles_history, sense_range, info, writer)
finally:
    writer.close()

In [None]:
# Persist the agent by calling Agent.save_models(). What is written and where
# is decided inside sac/sac_torch_new.py (not shown in this notebook).
agent.save_models()

In [None]:
# Restore the agent by calling Agent.load_models(). Presumably this reads what
# save_models() wrote — confirm in sac/sac_torch_new.py. Requires `agent` to
# have been constructed by the setup cell first.
agent.load_models()