# Q function == Neural network

In [1]:
#Q

import torch.nn as nn
import torch

class QFunction(nn.Module):
    def __init__(self, state_space, reward_space):
        super().__init__()
        self.fc1 = nn.Linear(state_space, 64)
        self.fc2 = nn.Linear(64, 64)
        self.fc3 = nn.Linear(64, reward_space)

    def forward(self, state):
        x = torch.relu(self.fc1(state))
        x = torch.relu(self.fc2(x))  # Utilisation correcte de x
        return self.fc3(x)


### Visual and testing function

In [2]:
import gymnasium as gym

def init_env(env_flag, max_action_by_epoch=None, render=False):
    if env_flag is None:  # CartPole
        if render:
            env = gym.make('CartPole-v1', 
                           max_episode_steps=max_action_by_epoch if max_action_by_epoch is not None else 200, 
                           render_mode="human")
        else:
            env = gym.make('CartPole-v1', 
                           max_episode_steps=max_action_by_epoch if max_action_by_epoch is not None else 200)
        input_dim = sum(env.observation_space.shape)
        output_dim = env.action_space.n  # Discrete actions
    else:  # Pendulum
        if render:
            env = gym.make('Pendulum-v1', 
                           max_episode_steps=max_action_by_epoch if max_action_by_epoch is not None else 60, 
                           render_mode="human")
        else:
            env = gym.make('Pendulum-v1', 
                           max_episode_steps=max_action_by_epoch if max_action_by_epoch is not None else 60)
        input_dim = sum(env.observation_space.shape)
        output_dim = 1  # Continuous action: single Q-value output

    return env, input_dim, output_dim


In [3]:
import torch
import numpy as np

def play_game(env_flag, q_function, render=False):
    env, _, _ = init_env(env_flag, render=render)
    state, _ = env.reset()

    while True:
        state_tensor = torch.tensor(state, dtype=torch.float32)
        q_values = q_function(state_tensor)
        
        if env_flag:  # Pendulum (actions continues)
            action = q_values.item()  # Récupère la valeur prédite pour l'action
            action = np.clip(action, env.action_space.low[0], env.action_space.high[0])  # Assure que l'action est dans les limites
            action = np.array([action])  # Formate comme tableau NumPy
        else:  # CartPole (actions discrètes)
            action = torch.argmax(q_values).item()
        
        next_state, reward, done, truncated, _ = env.step(action)

        cart_position = state[0]  # La position horizontale du chariot
        position_penalty = abs(cart_position) * 0.1  # Ajuste le coefficient 0.1 pour amplifier ou réduire le malus
        print(position_penalty)
        
        state = next_state

        if render:
            env.render()

        if done or truncated:
            break

    env.close()


# DQN Algorithm

In [4]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
from collections import deque


def sample_action(env, q_function, state, epsilon, env_flag):
    if np.random.rand() < epsilon:
        if env_flag is None:  # Discrete actions for CartPole
            return env.action_space.sample()
        else:  # Continuous actions for Pendulum
            return np.random.uniform(env.action_space.low, env.action_space.high)
    else:
        state_tensor = torch.tensor(state, dtype=torch.float32)
        if env_flag is None:
            return torch.argmax(q_function(state_tensor)).item()
        else:
            raw_action = q_function(state_tensor)  # Sortie brute du modèle
            # Appliquer tanh pour ramener l'action dans l'intervalle [-1, 1]
            normalized_action = torch.tanh(raw_action)
            # Projetée sur l'intervalle de l'espace d'action
            action = normalized_action.detach().numpy() * (env.action_space.high[0] - env.action_space.low[0]) / 2 + (env.action_space.high[0] + env.action_space.low[0]) / 2
            return action


def fill_replay_memory(env, replay_memory, run_to_fill_replay):
    for _ in range(run_to_fill_replay):
        state, _ = env.reset()
        for _ in range(1000):
            action = env.action_space.sample()
            next_state, reward, done, truncated, _ = env.step(action)
            replay_memory.append((state, action, reward, done, next_state))
            state = next_state
            if done or truncated:
                break


def train_dqn(
    replay_memory_size=300,
    run_to_fill_replay=50,
    max_epoch=300,
    max_action_by_epoch=1000,
    fixed_learning_rate=0.5,
    batch_size=64,
    epsilon=0.05,
    horizon=0.99,
    update_q_step=30,
    update_epsilon_decay=0.99,
    env_flag=None
):
    env, input_dim, output_dim = init_env(env_flag, max_action_by_epoch)
    replay_memory = deque(maxlen=replay_memory_size)
    q_function = QFunction(input_dim, output_dim)
    chapeau_q_function = QFunction(input_dim, output_dim)
    optimizer = torch.optim.RMSprop(q_function.parameters(), lr=fixed_learning_rate)

    fill_replay_memory(env, replay_memory, run_to_fill_replay)
 
    total_rewards = 0
    for epoch in range(max_epoch):
        state, _ = env.reset()

        for i in range(10000):
            action = sample_action(env, q_function, state, epsilon, env_flag)
            next_state, reward, done, truncated, _ = env.step(action)

            cart_position = state[0]  # La position horizontale du chariot
            position_penalty = abs(cart_position) * 0.1  # Ajuste le coefficient 0.1 pour amplifier ou réduire le malus
            
            reward = reward - position_penalty

            total_rewards += reward
            replay_memory.append((state, action, reward, done, next_state))

            if len(replay_memory) < batch_size:
                if done or truncated:
                    break
                continue

            batch = random.sample(replay_memory, batch_size)
            states, actions, rewards, dones, next_states = map(
                lambda x: torch.tensor(np.array(x), dtype=torch.float32),
                zip(*batch)
            )

            if env_flag is None:  # Discrete actions
                actions = actions.to(torch.long).view(-1, 1)

            next_q_values = chapeau_q_function(next_states)
            max_next_q_values = torch.max(next_q_values, dim=1)[0] if env_flag is None else next_q_values.squeeze()
            experience_result = rewards + horizon * (1 - dones) * max_next_q_values.detach()

            q_values = q_function(states)
            q_values_for_actions = (
                q_values.gather(1, actions) if env_flag is None else q_values.squeeze()
            )

            loss = ((experience_result - q_values_for_actions.squeeze()) ** 2).mean()

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            state = next_state

            if done or truncated:
                break

        if epoch % update_q_step == 0:
            chapeau_q_function.load_state_dict(q_function.state_dict())

        epsilon = max(0.01, epsilon * update_epsilon_decay)

        if epoch and epoch % 10 == 0:
            print(f"Epoch: {epoch}, Avg Reward (last 10 runs): {total_rewards / 10:.2f}, Loss: {loss:.4f}")
            play_game(env_flag, q_function, True)
            total_rewards = 0
    
    play_game(env_flag, q_function, True)
    env.close()
    return q_function




# Grid Search

In [5]:
import itertools

def grid_search(hyperparameter_space, fixed_params, train_function, env_flag=None):
    """
    Effectue un grid search sur les hyperparamètres.
    
    Args:
        env: L'environnement d'entraînement.
        hyperparameter_space: Un dictionnaire des hyperparamètres à tester, avec des listes de valeurs.
        fixed_params: Un dictionnaire des paramètres fixes.
        train_function: Une fonction pour entraîner le modèle. Elle doit retourner le modèle entraîné.
        evaluation_function: Une fonction pour évaluer le modèle. Elle doit retourner une récompense totale.
    
    Returns:
        Le meilleur modèle, la meilleure récompense, et les meilleurs paramètres.
    """
    param_keys = list(hyperparameter_space.keys())
    param_values = list(hyperparameter_space.values())
    param_grid = list(itertools.product(*param_values))

    for param_set in param_grid:
        # Préparer les paramètres actuels
        current_params = {key: value for key, value in zip(param_keys, param_set)}
        print(f"_________________\nTesting with params: {current_params}")
        
        # Combiner les paramètres fixes avec les hyperparamètres testés
        all_params = {**fixed_params, **current_params}

        # Entraîner le modèle
        train_function(**all_params)


In [6]:
#CARTPOLE

hyperparameter_space = {}
fixed_params = {'env_flag':None, 'fixed_learning_rate': 0.0009, 'epsilon': 0.1, 'batch_size': 128, 'max_action_by_epoch': 200, 'replay_memory_size': 200, 'run_to_fill_replay': 120, 'max_epoch': 100, 'horizon': 0.99, 'update_q_step': 2, 'update_epsilon_decay': 0.99}

# Lancer le grid search
grid_search(
    hyperparameter_space,
    fixed_params,
    train_dqn,  # Fonction d'entraînement
)

_________________
Testing with params: {}
0.0033409668
0.003260589
0.003571307
0.004273119
0.0053661685
0.006850722
0.008727122
0.010995726
0.012877173
0.0019008023
0.001820302
0.0013494762
0.0004883406
0.00076325197
0.002405592
0.004439076
0.006864149
0.009681224
0.00010732189
0.00010689672
0.0002830446
0.0010624932
0.0022316158
0.003790737
0.005740305
0.007300404
0.00847189
0.0092555275
0.0104318755
0.012001082
0.01396333
0.00025805368
0.00030908833
0.0007494705
0.0015792212
0.0027985373
0.0044077714
0.0064073964
0.008797953
0.011579969
0.014753862
0.0010497393
0.0011184402
0.0015763555
0.0024234648
0.0036599275
0.0052860677
0.0073023424
0.008928824
0.0109466845
0.013356325
0.01615812
0.004814997
0.0047823903
0.0043586195
0.0035436843
0.0023374392
0.0007396206
0.0012501107
0.0036321
0.005627022
0.003299615
0.0032074617
0.0035052213
0.004192874
0.00527057
0.006738612
0.007816955
0.009286661
0.0111480635
0.012621433
0.013707778
0.014408006
0.014722869
0.015431831
0.015756177
0.00438305

KeyboardInterrupt: 

In [7]:
fixed_params = {'env_flag':True, 'fixed_learning_rate': 0.0001, 'epsilon': 0.1, 'batch_size': 128, 'max_action_by_epoch': 200, 'replay_memory_size': 500, 'run_to_fill_replay': 300, 'max_epoch': 1000, 'horizon': 1, 'update_q_step': 10, 'update_epsilon_decay': 0.995}
hyperparameter_space = {}

grid_search(
    hyperparameter_space,
    fixed_params,
    train_dqn,
    env_flag=True
)

_________________
Testing with params: {}


KeyboardInterrupt: 

In [4]:
x = 5

y = 5


print(x)

def plus_2(x):
    return x + 2

print(plus_2(x))

5
7
