# Reinforcement learning for robotics
This is the initial notebook that you will need to fill out through the semester. 

### Setup
First let's make sure that everything is working properly:

In [None]:
import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
import os
import pickle
import torch.nn as nn
import torch
import torch.distributions as dist
from torch.distributions import Normal
from IPython.display import clear_output
assert gym.__version__=='1.0.0',"You need a newer version of gym"
print("Everything seems good")

### Outline
As teaching a robot how to walk is tricky, you will first test your algorithm on a much simpler task: Balancing an inverted pendulum.
This week, you will:
- Setup a first enviromnent
- Run a random policy
- Modify the distribution of this policy

In [None]:
#create the environment
envname="InvertedPendulum-v5" 
env = gym.make_vec(envname,num_envs=1,render_mode='rgb_array',vectorization_mode='sync')

This environment is called InvertedPendulum, and is running in the Mujoco simulator. You can check what it can do by reading the [documentation](https://gymnasium.farama.org/environments/mujoco/inverted_pendulum/)

Your first task is to find what are the state space and the action space. Additionally, answer the following questions:
- What is the dimension of the state space?
- What is the dimension of the action space?
- How could you get these dimentions directly in your code?
- When your ran these commands, you should have gotten an array of dimension 2. What does each of the dimension represent?

_Answers_: - What is the dimension of the state space? 4
- What is the dimension of the action space? 1   

In [None]:
obs, info = env.reset()
print("State space dimension:", env.single_observation_space.shape)
print("Action space dimension:", env.single_action_space.shape)
print("Action space low:", env.single_action_space.low)
print("Action space high:", env.single_action_space.high)

state_dim = env.single_observation_space.shape[0]
action_dim = env.single_action_space.shape[0]
num_params = state_dim * action_dim

print("Number of parameters (linear model without bias):", num_params)

### Visualization
Gymnasium is providing a visualisation function, let's try it

In [None]:
def render_notebook(env,id,title=""):
    clear_output(wait=True)
    plt.imshow(env.render()[id])
    plt.axis('off')
    plt.tight_layout()
    plt.title(title)
    plt.show()
    plt.pause(0.1)

In [None]:
obs, info = env.reset()
render_notebook(env, 0, "Example state:")


Nice right?

### Random policy
Now you will try to implement a random policy: Uniformely chose a random action at each time step.

In [None]:
terminated = [False]
env.reset()
ret=0
while not all(terminated):
    action = np.random.uniform(-3, 3, size= env.action_space.shape)
    _,reward, terminated,truncated,info = env.step(action)
    terminated = terminated|truncated
    ret+=reward
    render_notebook(env,0,f"{ret=}")

### Other distribution
This policy is quite terrible, so let's try to improve it by using a gaussian distribution instead. Test several standard deviations and see which one works best

In [None]:

terminated = [False]
std = 0.1
env.reset()
ret=0
while not all(terminated):
    action = np.random.normal(0, std, size=env.action_space.shape)
    _,reward, terminated,truncated,info = env.step(action)
    terminated = terminated|truncated
    ret+=reward
    render_notebook(env,0,f"{ret=}")

This concludes this first part of the project, next week we will try to implement a feedback controler in this system. 
In the meantime, feel free to get more confortable with the documentation of gymnasium

### Feedback Policy

FeedBack Policy u=-Kx avec K a determiner

In [None]:
K = np.array([[1.0 , 2.0, 3.0, 4.0]]) 
terminated = [False]
env.reset()
ret=0
while not all(terminated):
    action = np.dot(K, obs.T)
    obs,reward, terminated,truncated,info = env.step(action)
    terminated = terminated|truncated
    ret+=reward
    render_notebook(env,0,f"{ret=}")
    

In [None]:
def simulate(K):
    obs, info = env.reset()
    terminated = [False]
    ret = 0
    while not all(terminated):
        action = np.dot(K, obs[0])
        obs, reward, terminated, truncated, info = env.step([action])
        terminated = terminated | truncated
        ret += reward
    return ret

bret = -np.inf  
bK = None
num = 100000
for i in range(num):
    K_cand = np.random.uniform(low=-5, high=5, size=(
                                                     env.action_space.shape[1],
                                                     env.single_observation_space.shape[0]))
    cret = simulate(K_cand)
    if cret > bret:
        bret = cret
        bK = K_cand 
    if np.isclose(cret.item(), 1000.0):
        print(f"Iteration {i}: reward = {cret}, best reward = {bret}")
        break
    print(f"Iteration {i}: reward = {cret}, best reward = {bret}")

print(f"Best K found : {bK}", f"best Reward = {bret}")


In [None]:
obs, info = env.reset()
terminated = [False]
ret = 0

while not all(terminated):
    action = np.dot(bK, obs.T)
    obs, reward, terminated, truncated, info = env.step(action)
    print(obs)
    terminated = terminated | truncated
    ret += reward
    render_notebook(env, 0, f"{ret=}")

### Reinforcement learning (RL) Week 3 (test) Flo

In [None]:
#policy : a= K s   => deterministic, linear, parameterized policy 
#grad compute with finite difference Not WORKING

def episode(K):
    obs, info = env.reset()
    terminated = [False]
    ret = 0.0
    while not all(terminated):
        action = -np.dot(K, obs[0])
        action = np.clip(action, -3.0, 3.0)
        obs, reward, terminated, truncated, info = env.step([action])
        terminated = terminated | truncated
        ret += float(reward[0]) 
        
    return ret


def compute_J(K, n_episodes=100):
    total = 0.0
    for i in range(n_episodes):
        total += episode(K)
    return total / n_episodes


def compute_gradJ(K, h=0.01):
    gradJ = np.zeros_like(K)
    for i in range(K.shape[0]):
        for j in range(K.shape[1]):
            K_plus = K.copy()
            K_minus = K.copy()

            K_plus[i, j]  += h
            K_minus[i, j] -= h

            r_plus  = compute_J(K_plus)
            r_minus = compute_J(K_minus)

            gradJ[i, j] = (r_plus - r_minus) / (2.0 * h)

    return gradJ


def training(a, iterations, h):
    K = np.random.uniform(low=-5, high=5, size=(env.action_space.shape[1],env.single_observation_space.shape[0]))
    #K = np.array([[-0.0, -4, -1, -2.3]]).reshape(env.action_space.shape[1],env.single_observation_space.shape[0])
    best_reward = -np.inf
    best_K = K.copy()
    iteration_reached = None
    target = 1000.0
    for i in range(iterations):
        gradJ = compute_gradJ(K, h=h)
        K = K + a * gradJ
        cret = compute_J(K)
        if cret > best_reward:
            best_reward = cret
            best_K = K.copy()
        print(f"Iteration {i}: reward = {cret:.2f}, best reward = {best_reward:.2f}")
        print(K)
        print(gradJ)
        if cret >= target:
            iteration_reached = i
            print(f">>> Cible de reward {target} atteinte à l'itération {i}.")
            break
    print(f"Best K found: {best_K}")
    print(f"Best reward: {best_reward:.2f}")

    return best_K, best_reward, iteration_reached



training(a=0.001, iterations=100000, h=0.00001)

In [None]:
##RL linéaire
K = np.random.uniform(low=-3, high=3, size=(env.action_space.shape[1],env.single_observation_space.shape[0]))
gamma = 0.99
alpha = 0.01 
sigma = 0.5
bret = -np.inf  
bK = None

def policy(s):
    return np.random.normal(np.dot(K,s),sigma)

def compute_log_gradient(s, a):
    mu = np.dot(K, s)
    return ((a - mu) * s / sigma**2) 

for episode in range(5000):
    obs, info = env.reset()
    terminated = [False]
    ret = 0.0
    obss,actions,rewards=[],[],[]
    while not all(terminated):
        s = obs[0]
        action = policy(s)
        action = np.clip(action, -3.0, 3.0)
        next_obs, reward, terminated, truncated, info = env.step([action])
        terminated = terminated | truncated
        obss.append(s)
        actions.append(action)  
        rewards.append(reward)
        ret += reward
        obs = next_obs

    if ret >= bret:
        bret = ret 
        bK = K.copy()

    returns = np.zeros(len(rewards))
    G = 0
    for t in reversed(range(len(rewards))):
        G = rewards[t] + gamma * G
        returns[t] = G
    
    alpha = 0.0005 / (1 + 0.00005 * episode)
    
    for t in range(len(obss)):
        grad = compute_log_gradient(obss[t], actions[t])  
        K += alpha * grad * returns[t]  
    print(f"Episode {episode}, Total Reward: {ret}, Best Reward: {bret}, K:{K}" )
    
print(bK)

In [None]:
obs, info = env.reset()
terminated = [False]
ret = 0

while not all(terminated):
    action = np.dot(bK, obs.T)
    obs, reward, terminated, truncated, info = env.step(action)
    terminated = terminated | truncated
    ret += reward
    render_notebook(env, 0, f"{ret=}")


In [None]:
obs.T.shape

In [None]:
bK.shape

##NON linear Policy using tanh version qui fonctionne

In [None]:
K = np.random.uniform(low=-1, high=1, size=(env.action_space.shape[1],env.single_observation_space.shape[0]))
gamma = 0.99
sigma = 0.5
bret = -np.inf  
bK = None
alpha = 0.000005
patience = 200
cc=0
maxret=1000


def policy(s):
    return np.random.normal(env.single_action_space.high * np.tanh(np.dot(K, s)), sigma)

def compute_log_gradient(s, a):
    z  = np.dot(K, s)         
    mu = env.single_action_space.high * np.tanh(z)       #env.single_action_space.high = 3 for InvertedPendulum-v5
    return np.outer(((a - mu) * env.single_action_space.high * (1 - np.tanh(z)**2)) / sigma**2,s)
for episode in range(100000):
    obs, info = env.reset()
    terminated = [False]
    ret = 0.0
    obss,actions,rewards=[],[],[]
    while not all(terminated):
        s = obs[0]
        action = policy(s)
        action = np.clip(action, env.single_action_space.low, env.single_action_space.high)
        next_obs, reward, terminated, truncated, info = env.step([action])
        terminated = terminated | truncated
        obss.append(s)
        actions.append(action)  
        rewards.append(reward)
        ret += reward
        obs = next_obs

    if ret >= bret:
        bret = ret 
        bK = K.copy()

    returns = np.zeros(len(rewards))
    G = 0
    for t in reversed(range(len(rewards))):
        G = rewards[t] + gamma * G
        returns[t] = G
    
    for t in range(len(obss)):
        grad = compute_log_gradient(obss[t], actions[t])  
        K += alpha * grad * returns[t]
    print(f"Episode {episode}, Total Reward: {ret}, Best Reward: {bret}, K:{K}" )
    if ret>=maxret:
        cc+=1
    else:
        cc=0
    
    if cc>=patience:
        print(f"Early stopping: {patience} consecutive episodes with reward >= {maxret}")
        break
    

In [None]:
obs, info = env.reset()
terminated = [False]
ret = 0

while not all(terminated):
    action = np.dot(K, obs.T)
    obs, reward, terminated, truncated, info = env.step(action)
    terminated = terminated | truncated
    ret += reward
    render_notebook(env, 0, f"{ret=}")

### Reinforcement learning (RL) Week 3 (test) Selim

 One run

In [None]:
state_dim = env.observation_space.shape[1]
action_dim = env.action_space.shape[1]

# Initialize policy parameters
theta = np.random.randn(action_dim,state_dim)*0.1
best_reward = -np.inf
best_theta = None
action_std=0.25
num_episodes=5000
gamma=0.99
time = 0

def policy(state):
    mean = np.dot(theta,state)
    action=np.random.normal(mean, action_std)
    return action

def compute_log_gradient(state, action):
    mean = np.dot(theta, state)
    log_gradient = np.outer((action - mean), state) / action_std**2
    return log_gradient

gradient_norms = []
reward_each_episode=[]
for episode in range(num_episodes):
    obs, info = env.reset()
    state = obs[0]
    trajectory = [] 
    total_reward = 0
    done = False
    while not done:
        action = policy(state)
        print(action)
        next_state, reward, done, truncated, _ = env.step([action])
        done = done or truncated
        trajectory.append((state, action, reward))  # Store in trajectory
        total_reward += reward
        state = next_state[0]

    # Track best performance
    if total_reward == best_reward:
        time += 1  
    else:
        time = 0  

    if time >= 20:  # Stop if no improvement for 20 episodes
        break
    
    if total_reward > best_reward:  # Update best_reward only if strictly better
        best_reward = total_reward
        best_theta = theta  

    returns = np.zeros(len(trajectory))
    G = 0
    for t in reversed(range(len(trajectory))):
        G = trajectory[t][2] + gamma * G 
        returns[t] = G
    
    alpha = 0.001
    for t, (state, action, _) in enumerate(trajectory):
        grad = compute_log_gradient(state, action)
        theta += alpha * grad * returns[t]
        
        grad_norm = 0
        grad_norm = np.linalg.norm(grad) 
        gradient_norms.append(grad_norm)  
    print(theta)     
    reward_each_episode.append(total_reward)
    print(f"Episode {episode}, Total Reward: {total_reward}, Best Reward: {best_reward}")

theta = best_theta
np.save('trained_theta.npy', theta)

In [None]:
# Load trained parameters
theta = np.load('trained_theta.npy')
obs, info = env.reset()
terminated = [False]
ret = 0
while not all(terminated):
    state=obs[0]
    action = np.clip(np.dot(theta,state), env.action_space.low, env.action_space.high)  # Get action from the trained policy
    obs, reward, terminated, truncated, info = env.step(action)
    terminated = terminated | truncated
    ret += reward
    render_notebook(env, 0, f"{ret=}")

Alpha plot

In [None]:
state_dim = env.observation_space.shape[1]
action_dim = env.action_space.shape[1]

action_std=0.25
num_episodes=5000
gamma=0.99
alphas=[0.00001,0.0001,0.001]
number_of_runs=10

def policy(state):
    mean = np.dot(theta,state)
    action=np.random.normal(mean, action_std)
    return action

def compute_log_gradient(state, action): 
    mean = np.dot(theta, state)
    log_gradient = np.outer((action - mean), state) / action_std**2
    return log_gradient

results_file = "saved_results_alpha.pkl"
overwrite_existing = True  # set to True if you want to force rerun

# Automatically load if it exists and we don't want to overwrite
if os.path.exists(results_file) and not overwrite_existing:
    print("Loading previously saved results...")
    with open(results_file, "rb") as f:
        results = pickle.load(f)
else:
    print("Starting new experiment. Previous results will be replaced.")
    results = {alpha: [] for alpha in alphas}

    for alpha in alphas:
        print(f"\n=== Testing alpha = {alpha} ===")
        run_rewards = []
        for run in range(number_of_runs):
            print(f"\n  -> Run {run + 1}/{number_of_runs} (alpha={alpha})")
            time = 0
            theta = np.random.randn(action_dim, state_dim) * 0.1
            best_reward = -np.inf
            best_theta = None
            reward_each_episode = []

            for episode in range(num_episodes):
                obs, info = env.reset()
                state = obs[0]
                trajectory = []
                total_reward = 0
                done = False
                while not done:
                    action = policy(state)
                    next_state, reward, done, truncated, _ = env.step([action])
                    done = done or truncated
                    trajectory.append((state, action, reward))
                    total_reward += reward
                    state = next_state[0]

                if total_reward == best_reward:
                    time += 1
                else:
                    time = 0

                if time >= 100:
                    break

                if total_reward > best_reward:
                    best_reward = total_reward
                    best_theta = theta

                returns = np.zeros(len(trajectory))
                G = 0
                for t in reversed(range(len(trajectory))):
                    G = trajectory[t][2] + gamma * G
                    returns[t] = G

                for t, (state, action, _) in enumerate(trajectory):
                    grad = compute_log_gradient(state, action)
                    theta += alpha * grad * returns[t]

                reward_each_episode.append(total_reward)

            run_rewards.append(reward_each_episode)
            print(f"  -> Run {run + 1} finished. Best Reward: {best_reward}")

        results[alpha] = run_rewards
        print(f"=== Finished testing alpha = {alpha} ===\n")

    # Save results
    with open(results_file, "wb") as f:
        pickle.dump(results, f)
    print(f"Results saved to {results_file}")


In [None]:
# Load saved results from file
with open("saved_results_alpha.pkl", "rb") as f:
        results = pickle.load(f)

# Now plot from loaded data
for alpha in results:
    if not results.get(alpha) or len(results[alpha]) == 0:
        print(f"Skipping action_std={alpha} (no runs available)")
        continue

    plt.figure(figsize=(8, 5))

    # Find the longest and shortest run lengths
    max_len = max(len(run) for run in results[alpha])
    min_len = min(len(run) for run in results[alpha])
    fastest_run_idx = [i for i, run in enumerate(results[alpha]) if len(run) == min_len][0]

    padded_runs = []
    for run_rewards in results[alpha]:
        run_rewards = np.array(run_rewards).flatten()
        pad_length = max_len - len(run_rewards)
        if pad_length > 0:
            padded = np.concatenate([run_rewards, np.full(pad_length, 1000.0)])
        else:
            padded = run_rewards
        padded_runs.append(padded)

    all_runs = np.stack(padded_runs)

    # Compute mean and std
    mean_rewards = np.mean(all_runs,axis=0)
    lower_percentile = np.percentile(all_runs, 5, axis=0)
    upper_percentile = np.percentile(all_runs,95, axis=0)

    episodes = np.arange(len(mean_rewards))
    plt.plot(episodes, mean_rewards, label=f"Mean Reward (alpha={alpha})", color="green")
    plt.fill_between(episodes, lower_percentile, upper_percentile, alpha=0.2, color="green", label="5–95 percentile")
    # Mark the fastest run
    plt.axvline(x=min_len, color="red", linestyle="--", linewidth=1.5, label=f"Fastest run (ep. {min_len})")

    plt.xlabel("Episodes")
    plt.ylabel("Reward")
    plt.title(f"Mean Learning Curve for alpha={alpha}")
    plt.grid(True)
    plt.legend()
    plt.savefig(f"mean_curve_alpha_{alpha}.png")
    plt.show()

Action std plot

In [None]:
state_dim = env.observation_space.shape[1]
action_dim = env.action_space.shape[1]

action_stds=[0.2,0.3,0.4]
num_episodes=5000
gamma=0.99
alpha=0.0001
number_of_runs=10

def policy(state):
    mean = np.dot(theta,state)
    action=np.random.normal(mean, action_std)
    return action

def compute_log_gradient(state, action): 
    mean = np.dot(theta, state)
    log_gradient = np.outer((action - mean), state) / action_std**2
    return log_gradient

results_file = "saved_results_actionstd.pkl"
overwrite_existing = True  # set to True if you want to force rerun

# Automatically load if it exists and we don't want to overwrite
if os.path.exists(results_file) and not overwrite_existing:
    print("Loading previously saved results...")
    with open(results_file, "rb") as f:
        results = pickle.load(f)
else:
    print("Starting new experiment. Previous results will be replaced.")
    results = {action_std: [] for action_std in action_stds}

    for action_std in action_stds:
        print(f"\n=== Testing action_std = {action_std} ===")
        run_rewards = []
        for run in range(number_of_runs):
            print(f"\n  -> Run {run + 1}/{number_of_runs} (action_std={action_std})")
            time = 0
            theta = np.random.randn(action_dim, state_dim) * 0.1
            best_reward = -np.inf
            best_theta = None
            reward_each_episode = []

            for episode in range(num_episodes):
                obs, info = env.reset()
                state = obs[0]
                trajectory = []
                total_reward = 0
                done = False
                while not done:
                    action = policy(state)
                    next_state, reward, done, truncated, _ = env.step([action])
                    done = done or truncated
                    trajectory.append((state, action, reward))
                    total_reward += reward
                    state = next_state[0]

                if total_reward == best_reward:
                    time += 1
                else:
                    time = 0

                if time >= 100:
                    break

                if total_reward > best_reward:
                    best_reward = total_reward
                    best_theta = theta

                returns = np.zeros(len(trajectory))
                G = 0
                for t in reversed(range(len(trajectory))):
                    G = trajectory[t][2] + gamma * G
                    returns[t] = G

                for t, (state, action, _) in enumerate(trajectory):
                    grad = compute_log_gradient(state, action)
                    theta += alpha * grad * returns[t]

                reward_each_episode.append(total_reward)

            run_rewards.append(reward_each_episode)
            print(f"  -> Run {run + 1} finished. Best Reward: {best_reward}")

        results[action_std] = run_rewards
        print(f"=== Finished testing std = {action_std} ===\n")

    # Save results
    with open(results_file, "wb") as f:
        pickle.dump(results, f)
    print(f"Results saved to {results_file}")


In [None]:
# Load saved results from file
with open("saved_results_actionstd.pkl", "rb") as f:
        results = pickle.load(f)

# Now plot from loaded data
for action_std in results:
    if not results.get(action_std) or len(results[action_std]) == 0:
        print(f"Skipping action_std={action_std} (no runs available)")
        continue

    plt.figure(figsize=(8, 5))

    # Find the longest and shortest run lengths
    max_len = max(len(run) for run in results[action_std])
    min_len = min(len(run) for run in results[action_std])
    fastest_run_idx = [i for i, run in enumerate(results[action_std]) if len(run) == min_len][0]

    padded_runs = []
    for run_rewards in results[action_std]:
        run_rewards = np.array(run_rewards).flatten()
        pad_length = max_len - len(run_rewards)
        if pad_length > 0:
            padded = np.concatenate([run_rewards, np.full(pad_length, 1000.0)])
        else:
            padded = run_rewards
        padded_runs.append(padded)

    all_runs = np.stack(padded_runs)

    mean_rewards = np.mean(all_runs,axis=0)
    lower_percentile = np.percentile(all_runs, 5, axis=0)
    upper_percentile = np.percentile(all_runs,95, axis=0)

    episodes = np.arange(len(mean_rewards))
    plt.plot(episodes, mean_rewards, label=f"Mean Reward (action_std={action_std})", color="green")
    plt.fill_between(episodes, lower_percentile, upper_percentile, alpha=0.2, color="green", label="5–95 percentile")
    # Mark the fastest run
    plt.axvline(x=min_len, color="red", linestyle="--", linewidth=1.5, label=f"Fastest run (ep. {min_len})")

    plt.xlabel("Episodes")
    plt.ylabel("Reward")
    plt.title(f"Mean Learning Curve for action_std={action_std}")
    plt.grid(True)
    plt.legend()
    plt.savefig(f"mean_curve_action_std_{action_std}.png")
    plt.show()



### LQR Loi optimale mais instable 

In [None]:
from scipy.linalg import solve_discrete_are

def solve_discounted_lqr(A, B, Q, R, gamma, max_iter=10000, tol=1e-10):
    n = A.shape[0]
    P = Q.copy()
    for _ in range(max_iter):
        S = R + gamma*(B.T @ P @ B)
        S_inv = np.linalg.inv(S)
        P_next = Q + gamma*(A.T @ P @ A) \
               - gamma**2 * (A.T @ P @ B) @ S_inv @ (B.T @ P @ A)
        if np.linalg.norm(P_next - P) < tol:
            P = P_next
            break
        P = P_next

    S = R + gamma*(B.T @ P @ B)
    S_inv = np.linalg.inv(S)
    K = -S_inv @ (gamma * B.T @ P @ A)
    return P, K

def solve_lqr_classic(A, B, Q, R):

    P = solve_discrete_are(A, B, Q, R)
    
    S_inv = np.linalg.inv(R + B.T @ P @ B)
    K = -S_inv @ (B.T @ P @ A)
    
    return P, K


In [None]:
#A and B given

A= np.array([[ 1.00000000e+00, -2.35296084e-03,  3.99338571e-02, 1.21166283e-04],
             [-1.38777878e-16,  1.02354968e+00,  1.52695351e-04, 3.87872361e-02],
             [ 1.19739288e-15, -1.16537817e-01,  9.96701460e-01, 5.20975868e-03],
             [-5.46784840e-15,  1.16687611e+00,  7.56271952e-03, 9.47825276e-01]])
B = np.array([[ 0.00664611],
              [-0.01530383],
              [ 0.33142047],
              [-0.7575789 ]])

Q = np.diag([1.0, 1.0, 1000.0, 1.0])
R = np.array([[0.1]])
#A = np.array([[2]])
#B = np.array([[1]])
#Q = np.array([[1]])
#R = np.array([[1]])
found_controller = False
for gamma in np.arange(1, 0, -0.01):
    P_sol, K_sol = solve_discounted_lqr(A, B, Q, R, gamma)
    A_cl = A + B @ K_sol 
    eigvals, _ = np.linalg.eig(A_cl)
    spectral_radius = np.max(np.abs(eigvals))
    print(f"gamma = {gamma:.4f} | Spectral Radius = {spectral_radius:.4f}")
    if spectral_radius > 1.0:
        print(f"Succes : For gamma = {gamma:.4f}, closed-loop system unstable (spectral radius = {spectral_radius:.4f}).")
        found_controller = True
        break

if not found_controller:
    print("No controller found with eigenvalues > 1 in the gamma interval [0,1].")

obs, info = env.reset()
terminated = [False]
ret = 0
while not all(terminated):
    action = np.dot(K_sol, obs.T)
    action = np.clip(action, env.action_space.low, env.action_space.high)
    obs, reward, terminated, truncated, info = env.step(action)
    terminated = terminated | truncated
    ret += reward
print(f"Reward total dans l'environnement : {ret}")

In [None]:
A= np.array([[ 1.00000000e+00, -2.35296084e-03,  3.99338571e-02, 1.21166283e-04],
             [-1.38777878e-16,  1.02354968e+00,  1.52695351e-04, 3.87872361e-02],
             [ 1.19739288e-15, -1.16537817e-01,  9.96701460e-01, 5.20975868e-03],
             [-5.46784840e-15,  1.16687611e+00,  7.56271952e-03, 9.47825276e-01]])
B = np.array([[ 0.00664611],
              [-0.01530383],
              [ 0.33142047],
              [-0.7575789 ]])

Q = np.diag([1.0, 1.0, 100.0, 1.0])
R = np.array([[0.1]])
print(f"gamma = {gamma:.4f}")
P_sol, K_sol = solve_discounted_lqr(A, B, Q, R,gamma)
print("P matrix:\n", P_sol)
print("K gain:\n", K_sol)

A_cl = A + B @ K_sol 
eigvals, _ = np.linalg.eig(A_cl)
print("closed-loop eigenvalues:\n", eigvals)

spectral_radius = np.max(np.abs(eigvals))
print(f"Spectral Radius = {spectral_radius}")
if spectral_radius < 1.0:
    print("=> stable")
else:
    print("=> unstable.")

obs, info = env.reset()
terminated = [False]
ret = 0
while not all(terminated):
    action = np.dot(K_sol, obs.T) 
    action = np.clip(action, env.action_space.low, env.action_space.high)
    obs, reward, terminated, truncated, info = env.step(action)
    terminated = terminated | truncated
    ret += reward
print(f"Total reward in env: {ret}")
# We get something stable localy stable (ie spectral radius of closed systems <1) but when we launch the env it's unstable after approx 25 iterations.
# The discount create a myopic controller, it does not stabilize the system globally (for 1000 iterations) but it stabilizes it for the launch (for the first iterations)


In [None]:
##RL linéaire
K = np.array([-3.00969248e-06,7.93095359e+00,9.70670245e-03,1.36614601e+00]).reshape(env.action_space.shape[1],env.single_observation_space.shape[0])
print("K =",K)
print(f"gamma = {gamma:.4f}")
sigma = 0.25
bret = -np.inf  
bK = None

def policy(s):
    return np.random.normal(np.dot(K,s),sigma)

def compute_log_gradient(s, a):
    mu = np.dot(K, s)
    return ((a - mu) * s / sigma**2) 

for episode in range(5000):
    obs, info = env.reset()
    terminated = [False]
    ret = 0.0
    obss,actions,rewards=[],[],[]
    while not all(terminated):
        s = obs[0]
        action = policy(s)
        action = np.clip(action, -3.0, 3.0)
        next_obs, reward, terminated, truncated, info = env.step([action])
        terminated = terminated | truncated
        obss.append(s)
        actions.append(action)  
        rewards.append(reward)
        ret += reward
        obs = next_obs

    if ret >= bret:
        bret = ret 
        bK = K.copy()

    returns = np.zeros(len(rewards))
    G = 0
    for t in reversed(range(len(rewards))):
        G = rewards[t] + gamma * G
        returns[t] = G
    
    alpha = 0.000000005 / (1 + 0.00005 * episode)
    
    for t in range(len(obss)):
        grad = compute_log_gradient(obss[t], actions[t])  
        K += alpha * grad * returns[t]  
    print(f"Episode {episode}, Total Reward: {ret}, Best Reward: {bret}, K:{K}" )
    
print(bK)

### Neural Network

In [None]:
#create the environment with parallel envs
envname="InvertedPendulum-v5" 
env = gym.make_vec(envname,num_envs=8,render_mode='rgb_array',vectorization_mode='sync')

In [None]:
gamma=0.99
num_episodes=500
action_std=0.3

class Network(nn.Module):
    def __init__(self):
        super().__init__()
        self.layer1 = nn.Linear(4, 10,device='cuda')  # input size 1 → output size 10
        self.layer2 = nn.Linear(10, 10,device='cuda')  
        self.layer3 = nn.Linear(10, 1,device='cuda')  # 10 → 1

    def forward(self, state):
        output = torch.relu(self.layer1(state))
        output = torch.relu(self.layer2(output))
        output = self.layer3(output)
        return output

def rollout(env, policy_net, max_steps=1000):
    state, _ = env.reset()

    alive = torch.ones(8, dtype=torch.bool, device='cuda') ## (8,)
    states, actions, rewards, log_probs = [], [], [], []

    for _ in range(max_steps):

        if not alive.any(): ## alive.any(): True if any entry in alive is True
            break

        state_tensor = torch.tensor(state, dtype=torch.float32,device='cuda') ## (8,4) on GPU 
        mean = policy_net(state_tensor) ## (8,4)→(8,1)
        dist = Normal(mean, action_std)
        action = dist.sample() ## (8,1)
        log_prob = dist.log_prob(action).sum(dim=-1) ## (8,1).sum(dim=-1) -> (8,)
        
        next_state, reward, done, truncated, _ = env.step(action.cpu().numpy())
        done = np.logical_or(done, truncated)
        
        # mask out dead envs
        reward = torch.tensor(reward, device='cuda') ##(8,)
        reward = reward * alive.float()
        mask=alive.float().unsqueeze(1) ## (8,)-> (8,1)
        action = action * mask ##(8,1)
    
        states.append(state) ## (N,4)
        actions.append(action) ## (N,1)
        rewards.append(reward) ## (N,)
        log_probs.append(log_prob * alive.float()) ## (N,)

        alive = alive & (~torch.tensor(done, device='cuda'))
        
        state = next_state
    
    return states, actions, rewards, log_probs

def compute_returns(rewards, gamma=0.99):
    T = len(rewards)
    returns = torch.zeros(T, dtype=torch.float32, device='cuda')
    G = 0.0
    for t in range(T-1, -1, -1):    
        G = rewards[t] + gamma * G
        returns[t] = G
    return returns

policy_net=Network()
optimizer = torch.optim.Adam(policy_net.parameters(), lr=1e-2)

for episode in range(num_episodes):
    states, actions, rewards, log_probs=rollout(env,policy_net)
    all_returns=[]
    # Split rewards per environment
    for env_idx in range(8):
        # Collect rewards only for env_idx
        rewards_per_env = [rewards[t][env_idx] for t in range(len(rewards))]
        returns = compute_returns(rewards_per_env, gamma)
        all_returns.append(returns)

    returns_tensor = torch.stack(all_returns, dim=1)##(N,8) 
    returns_tensor=(returns_tensor-returns_tensor.mean())/(returns_tensor.std()+0.0000001)
    log_probs_tensor = torch.stack(log_probs) ##(N,8)
    loss = -(log_probs_tensor * returns_tensor).sum() ##scalar

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    rewards_tensor = torch.stack(rewards, dim=0)  ## (N, 8)
    total_per_env      = rewards_tensor.sum(dim=0) ## (8,)
    avg_total_reward   = total_per_env.mean().item() ## scalar

    print("Avg total return:", avg_total_reward,"episode:", episode)
    if avg_total_reward >= 999:
        break
    

In [None]:
obs, _ = env.reset()
terminated = [False]
ret = 0

while not all(terminated):
    state = obs

    with torch.no_grad():
        state_tensor = torch.tensor(state, dtype=torch.float32,device='cuda') # shape: [1, 4]
        mean= policy_net(state_tensor)
        action = mean.cpu().numpy()  # Use mean as deterministic action at test time
    # Step in the environment
    obs, reward, terminated, truncated, info = env.step(action)
    ret += reward[0].item()
    
    # Visualize
    render_notebook(env, 0, f"{ret=:.2f}")