In [None]:
#Train
#Action = 2
#Memory
import gymnasium as gym
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
import matplotlib.pyplot as plt
import os
from tensorflow import keras
import sys
 
# Create the Freeway environment from Gymnasium's Atari (ALE) collection.
# Add render_mode="human" to the call below to watch the game while training.
env = gym.make('ALE/Freeway-v5', difficulty=0, mode=0)

# Height and width of a frame after the crop done in preprocess_observation.
img_height, img_width = 174, 84
 
 
def create_q_model(num_actions, input_shape=(174, 84, 4)):
    """Build the convolutional Q-network.

    Args:
        num_actions: number of output units, one Q-value per action.
        input_shape: shape of a single stacked observation, without the
            batch axis. Defaults to ``(174, 84, 4)``, the shape that was
            previously hard-coded here, so existing callers are unaffected.

    Returns:
        An uncompiled ``keras.Model`` mapping observations to one linear
        output per action.
    """
    inputs = layers.Input(shape=tuple(input_shape))

    # Convolutions over the stacked screen frames.
    layer1 = layers.Conv2D(32, 8, strides=4, activation="relu")(inputs)
    layer2 = layers.Conv2D(64, 4, strides=2, activation="relu")(layer1)
    layer3 = layers.Conv2D(64, 3, strides=1, activation="relu")(layer2)

    layer4 = layers.Flatten()(layer3)

    layer5 = layers.Dense(512, activation="relu")(layer4)
    action = layers.Dense(num_actions, activation="linear")(layer5)

    return keras.Model(inputs=inputs, outputs=action)
 
# Loss function used to train the model
def model_loss(target, network, action):
    """Return the mean of the action-masked squared error.

    Args:
        target: Q-targets; must broadcast against ``network``.
        network: Q-values predicted by the model.
        action: mask multiplied element-wise with the squared errors
            (train_freeway passes one-hot rows selecting the action taken).

    Returns:
        A scalar tensor: the mean, over every element, of
        ``(target - network) ** 2 * action``.
    """
    squared_error = tf.square(target - network)
    masked_error = squared_error * action
    return tf.reduce_mean(masked_error)
  
def reward_policy(reward, ob, action):
    """Shape the environment reward using the reduced RAM state.

    Args:
        reward: reward returned by the environment for this step.
        ob: reduced RAM state; ``ob[16] == 1`` flags a collision.
        action: index of the action chosen by the agent.

    Returns:
        ``1`` when the environment reward equals 1, ``-1`` on a collision,
        ``-0.25`` when ``action`` is not 1, otherwise ``reward`` unchanged.
        The checks are applied in that order.
    """
    if reward == 1:
        return 1
    if ob[16] == 1:  # Collision!
        return -1
    if action != 1:  # Penalise every action other than index 1
        return -0.25
    return reward

def reduce_state(ob):
    """Coarsen the RAM state in place and return it.

    Args:
        ob: mutable, indexable RAM state. Only indices 14, 16 and 108-117
            are read and rewritten.

    Returns:
        The same ``ob`` object after modification:
        * ``ob[16]`` becomes 0 if it was 255, else 1 (where the hit
          happened does not matter, only whether one happened);
        * ``ob[14]`` (chicken position) is integer-divided by 3;
        * each of ``ob[108]`` .. ``ob[117]`` (car positions) is set to 0
          when outside ``[20, 80]`` and integer-divided by 3 otherwise.
    """
    # Collapse the collision byte to a boolean flag.
    ob[16] = 0 if ob[16] == 255 else 1

    # Coarsen the chicken position.
    ob[14] //= 3

    for idx in range(108, 118):
        value = ob[idx]
        # Cars far away from the chicken do not need to be represented.
        ob[idx] = value // 3 if 20 <= value <= 80 else 0

    return ob
 
def preprocess_observation(observation):
    """Crop, grey-scale and standardise one RGB frame.

    Args:
        observation: RGB frame as returned by the environment.

    Returns:
        The 174x84 region whose top-left corner is at row 20, column 8,
        converted to a single grey channel and passed through
        ``tf.image.per_image_standardization``.
    """
    top, left = 20, 8
    height, width = 174, 84
    cropped = tf.image.crop_to_bounding_box(observation, top, left, height, width)
    gray = tf.image.rgb_to_grayscale(cropped)
    return tf.image.per_image_standardization(gray)
 
    
 
# Boucle principale d'entraînement
def train_freeway():
    """Train the Q-network on Freeway, plot the learning curves, save the model.

    Uses the module-level ``env`` together with ``create_q_model``,
    ``preprocess_observation``, ``reduce_state``, ``reward_policy`` and
    ``model_loss``.

    For each of 100 episodes the agent plays epsilon-greedily, choosing a
    new action every 4th step between two actions (index 0 -> env action 2,
    index 1 -> env action 1). Each transition is stored at a random position
    in a replay memory capped at 10000 entries. After the episode the online
    network is fitted with one pass over the whole memory, and every third
    episode the target network is synchronised with it.

    Side effects: prints per-episode statistics, writes ``Figure_1.png`` and
    ``Figure_2.png``, shows both figures, and saves the trained network to
    ``model_output/Model.h5``.

    Keras progress output is silenced with ``verbose=0`` rather than by
    swapping ``sys.stdout`` for a devnull handle, so no file handle is left
    open, stdout cannot stay redirected after an exception, and the
    "running reward" log line is actually shown.
    """
    model = create_q_model(2)
    model_target = create_q_model(2)
    optimizer = keras.optimizers.RMSprop(learning_rate=0.0001, rho=0.99)

    eps = 1.0  # Epsilon greedy parameter
    epsilon_min = 0.1  # Minimum epsilon greedy parameter
    epsilon_max = 1.0  # Maximum epsilon greedy parameter
    epsilon_interval = epsilon_max - epsilon_min
    # Epsilon is decayed once per episode, so this counts episodes.
    epsilon_greedy_frames = 50

    batch_size = 32
    max_memory = 10000
    rewards_per_episode = []
    loss_per_episode = []
    states, rewards, next_states, actions = [], [], [], []
    score_plus_epi = []
    score_plus_epi_tot = []

    # Number of episodes (epochs)
    for epi in range(100):
        print("----------------------------new episode----------------------------")
        score_plus = []
        obs, info = env.reset()
        first_frame = preprocess_observation(obs)
        # Sliding window holding the four most recent frames.
        memory = [first_frame, first_frame, first_frame, first_frame]
        current_state = _stack_frames(memory)
        count = 0
        while True:
            # A new action is chosen every 4th step and repeated in between.
            if count % 4 == 0:
                if np.random.rand() < eps:
                    acte = np.random.randint(2)
                else:
                    q_values = model.predict(
                        np.expand_dims(current_state, axis=0), verbose=0)
                    acte = np.argmax(q_values)
                act = 2 if acte == 0 else 1

            observation, reward, terminated, truncated, info = env.step(act)
            new_frame = preprocess_observation(observation)
            ram_state = reduce_state(env.unwrapped.ale.getRAM())
            reward = reward_policy(reward, ram_state, acte)
            score_plus.append(reward)

            # One-hot mask selecting the Q-value of the action taken.
            mask = np.zeros(2)
            mask[acte] = 1

            # Slide the frame window to obtain the successor state.
            memory.pop(0)
            memory.append(new_frame)
            next_state = _stack_frames(memory)

            # Transitions go in at a random position so that consecutive
            # mini-batches are not made of consecutive frames.
            index = np.random.randint(len(states) + 1)
            states.insert(index, current_state)
            rewards.insert(index, reward)
            next_states.insert(index, next_state)
            actions.insert(index, mask)

            if len(states) > max_memory:
                states.pop(0)
                rewards.pop(0)
                next_states.pop(0)
                actions.pop(0)

            current_state = next_state
            count += 1
            if terminated or truncated:
                break

        loss = _fit_on_replay_memory(
            model, model_target, optimizer,
            states, rewards, next_states, actions, batch_size)

        if epi % 3 == 0:
            # update the the target network with new weights
            model_target.set_weights(model.get_weights())
            # Log details
            template = "running reward: {:.2f} at episode {}"
            print(template.format(np.mean(rewards), epi))

        print("Mean loss", loss)

        # Note: these sums run over the whole replay memory, not the episode.
        rewards_per_episode.append(sum(rewards))
        loss_per_episode.append(loss)
        print("Episode {}: Total Reward: {}, Epsilon: {:.4f}".format(epi + 1, sum(rewards), eps))
        print("Rewards mean:", np.mean(rewards))
        somme_positifs_alternative = sum(nombre for nombre in score_plus if nombre > 0)
        score_plus_epi.append(somme_positifs_alternative)
        score_plus_epi_tot.append(sum(score_plus))
        print("Score du jeux:", somme_positifs_alternative)
        print("Score des récompenses:", sum(score_plus))
        if sum(score_plus) > 25:
            print("Solved at episode {}!".format(epi))
        eps -= epsilon_interval / epsilon_greedy_frames
        eps = max(eps, epsilon_min)

    # Replay-memory reward sum and mean loss, per episode.
    _save_two_panel_figure(
        [
            (rewards_per_episode, 'Total Reward', 'Récompenses de la mémoire par épisode'),
            (loss_per_episode, 'Mean Loss', 'Loss moyenne par épisode'),
        ],
        'Figure_1.png',
    )
    # Game score (positive rewards only) and shaped-reward sum, per episode.
    _save_two_panel_figure(
        [
            (score_plus_epi, 'Total score', 'Score du jeux'),
            (score_plus_epi_tot, 'Total score', 'Score des récompenses'),
        ],
        'Figure_2.png',
    )

    output_dir = 'model_output/'
    os.makedirs(output_dir, exist_ok=True)
    model.save(output_dir + "Model.h5")


def _stack_frames(frames):
    """Concatenate the frames of the sliding window along the last axis."""
    return np.concatenate(np.array(frames), axis=-1)


def _fit_on_replay_memory(model, model_target, optimizer, states, rewards,
                          next_states, actions, batch_size, gamma=0.99):
    """Run one pass of mini-batch updates over the whole replay memory.

    Targets ``r + gamma * max_a Q_target(s', a)`` are computed once, up
    front, with ``model_target``; ``model`` is then updated batch by batch
    using ``model_loss``. Returns the mean of the mini-batch losses.
    """
    size = len(next_states)

    tf_states = tf.convert_to_tensor(np.array(states), dtype=tf.float32)
    tf_next_states = tf.convert_to_tensor(np.array(next_states), dtype=tf.float32)
    tf_actions = tf.convert_to_tensor(np.array(actions), dtype=tf.float32)
    rewards_column = np.asarray(rewards, dtype=np.float32).reshape(-1, 1)

    next_q = model_target.predict(tf_next_states, verbose=0)
    q_targets = tf.convert_to_tensor(
        rewards_column + gamma * np.max(next_q, axis=1).reshape(size, 1),
        dtype=tf.float32,
    )

    losses = []
    for start in range(0, size, batch_size):
        stop = min(start + batch_size, size)
        with tf.GradientTape() as tape:
            predictions = model(tf_states[start:stop])
            loss = model_loss(q_targets[start:stop], predictions,
                              tf_actions[start:stop])
        gradients = tape.gradient(loss, model.trainable_variables)
        optimizer.apply_gradients(zip(gradients, model.trainable_variables))
        losses.append(loss.numpy())

    return np.mean(losses)


def _save_two_panel_figure(panels, filename):
    """Plot curves side by side, save the figure to ``filename`` and show it.

    ``panels`` is a sequence of ``(values, ylabel, title)`` tuples, one per
    subplot, drawn left to right on a 1x2 grid.
    """
    plt.figure()
    for position, (values, ylabel, title) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(values)
        plt.xlabel('Épisode')
        plt.ylabel(ylabel)
        plt.title(title)
    # Adjust the spacing between the subplots.
    plt.tight_layout()
    plt.savefig(filename)
    plt.show()
 
# Run the main training routine; the environment is closed even if
# training stops with an exception.
try:
    train_freeway()
finally:
    env.close()

In [None]:
#Test
import gymnasium as gym
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers
from tensorflow.keras.models import Sequential,load_model
import matplotlib.pyplot as plt
import os
import sys
 
 
img_height, img_width = 174, 84  # frame size after the crop in preprocess_observation
def preprocess_observation(observation):
    """Turn one RGB frame into the single-channel input used by the model.

    The frame is cropped to a 174x84 window starting at row 20, column 8,
    converted to grey scale, and finally passed through
    ``tf.image.per_image_standardization``.
    """
    window = tf.image.crop_to_bounding_box(
        observation,
        20,   # offset from the top
        8,    # offset from the left
        174,  # window height
        84,   # window width
    )
    return tf.image.per_image_standardization(tf.image.rgb_to_grayscale(window))
 
 
# Evaluation environment: same game settings as training, rendered on screen.
env = gym.make('ALE/Freeway-v5', render_mode="human", difficulty=0, mode=0)

try:
    output_dir = 'model_output/'
    model = load_model(output_dir + "Model.h5")

    # Number of evaluation episodes to play.
    test_episodes = 5

    for episode in range(test_episodes):
        state, info = env.reset()
        state = preprocess_observation(state)
        # Sliding window holding the four most recent frames.
        memory = [state, state, state, state]
        total_reward = 0
        while True:
            # Stack the four frames along the last axis and ask the model
            # for Q-values. verbose=0 silences Keras' progress output, so
            # stdout no longer has to be swapped for a new devnull handle
            # on every step.
            memory_array = np.array(memory)
            memory_array_rgb = np.concatenate(memory_array, axis=-1)
            q_values = model.predict(
                np.expand_dims(memory_array_rgb, axis=0), verbose=0)
            action = int(np.argmax(q_values[0]))

            # Model output 0 corresponds to env action 2; output 1 is
            # already env action 1.
            if action == 0:
                action = 2

            # Apply the action to the environment.
            next_state, reward, done, truncated, info = env.step(action)
            next_state = preprocess_observation(next_state)
            memory.pop(0)
            memory.append(next_state)
            total_reward += reward

            # Stop on truncation as well as termination, as the training
            # loop does; otherwise a truncated episode never ends.
            if done or truncated:
                print("Episode {}: Total Reward: {}".format(episode + 1, total_reward))
                break
finally:
    # Release the environment even if loading or playing fails.
    env.close()