In [1]:
import random
import numpy as np

In [2]:
from secret_envs_wrapper import SecretEnv1

In [3]:
class PolicyIteration:
    """Tabular policy iteration for an environment that exposes its dynamics.

    The environment must provide ``num_states()``, ``num_rewards()``,
    ``reward(i)``, ``p(s, a, s_prime, r_index)`` and ``available_actions``.

    NOTE(review): ``p(s, a, s_prime, r_index)`` is treated as the joint
    probability of reaching ``s_prime`` while receiving the reward stored at
    index ``r_index``, and ``reward(i)`` as the value of the i-th reward --
    confirm against the environment wrapper.
    """

    def __init__(self, env, gamma=0.9, theta=0.001, max_iter=1000):
        """Store the hyper-parameters and start from an all-zero policy.

        Args:
            env: environment exposing the model methods listed on the class.
            gamma: discount factor.
            theta: evaluation stops once the largest change in a sweep is
                below this threshold.
            max_iter: cap on both improvement rounds and evaluation sweeps.
        """
        self.env = env
        self.gamma = gamma
        self.theta = theta
        self.max_iter = max_iter
        self.policy = np.zeros(self.env.num_states(), dtype=int)
        self.value_function = np.zeros(self.env.num_states())

    def policy_iteration(self):
        """Alternate evaluation and greedy improvement until the policy is stable.

        Returns:
            Tuple ``(policy, value_function)``: one action and one state value
            per state.
        """
        for _ in range(self.max_iter):
            self.value_function = self.policy_evaluation()

            policy_stable = True
            for s in range(self.env.num_states()):
                old_action = self.policy[s]
                self.policy[s] = self.greedy_policy(s)
                if old_action != self.policy[s]:
                    policy_stable = False

            if policy_stable:
                break

        return self.policy, self.value_function

    def policy_evaluation(self):
        """Iteratively evaluate the current policy, starting from zeros.

        The sweeps update ``self.value_function`` in place so that every
        backup bootstraps from the estimates being refined; backing up from
        a stale copy would stop after a single effective sweep.

        Returns:
            The evaluated state-value array (also stored on the instance).
        """
        num_states = self.env.num_states()
        self.value_function = np.zeros(num_states)
        for _ in range(self.max_iter):
            delta = 0.0
            for s in range(num_states):
                previous = self.value_function[s]
                self.value_function[s] = self.expected_value(s)
                delta = max(delta, abs(previous - self.value_function[s]))
            if delta < self.theta:
                break
        return self.value_function

    def expected_value(self, s):
        """Return the one-step backup of state ``s`` under the current policy."""
        return self.expected_value_for_action(s, self.policy[s])

    def greedy_policy(self, s):
        """Return the action with the highest one-step backup in state ``s``.

        Ties go to the earliest action in the list. When no action is
        available the current policy entry is returned unchanged.
        """
        actions = self._available_actions(s)
        if len(actions) == 0:
            return self.policy[s]
        best_action = actions[0]
        best_value = self.expected_value_for_action(s, best_action)
        for action in actions[1:]:
            value = self.expected_value_for_action(s, action)
            if value > best_value:
                best_action = action
                best_value = value
        return best_action

    def expected_value_for_action(self, s, action):
        """Return sum over (s', r) of p(s, a, s', r) * (reward(r) + gamma * V(s'))."""
        num_states = self.env.num_states()
        total = 0.0
        for r_index in range(self.env.num_rewards()):
            # The reward only depends on the reward index, so look it up once.
            reward = self.env.reward(r_index)
            for s_prime in range(num_states):
                transition_prob = self.env.p(s, action, s_prime, r_index)
                total += transition_prob * (
                    reward + self.gamma * self.value_function[s_prime]
                )
        return total

    def _available_actions(self, s):
        """Query the available actions, with or without a state argument.

        Mirrors ``ValueIteration.get_available_actions``: environments whose
        ``available_actions`` takes no argument raise ``TypeError`` on the
        first call. NOTE(review): the no-argument form presumably reports
        the actions of the environment's current state -- confirm.
        """
        try:
            return self.env.available_actions(s)
        except TypeError:
            return self.env.available_actions()

In [4]:
class ValueIteration:
    """Tabular value iteration for an environment that exposes its dynamics.

    The environment must provide ``num_states()``, ``num_rewards()``,
    ``reward(i)``, ``p(s, a, s_prime, r_index)`` and ``available_actions``.

    NOTE(review): ``reward(i)`` is treated as the value of the i-th reward,
    paired with the reward index passed to ``p`` -- confirm against the
    environment wrapper.
    """

    def __init__(self, env, gamma=0.9, theta=0.001, max_iter=1000):
        """Store the hyper-parameters and start from zero values and policy.

        Args:
            env: environment exposing the model methods listed on the class.
            gamma: discount factor.
            theta: iteration stops once the largest change in a sweep is
                below this threshold.
            max_iter: maximum number of sweeps.
        """
        self.env = env
        self.gamma = gamma
        self.theta = theta
        self.max_iter = max_iter
        self.value_function = np.zeros(self.env.num_states())
        self.policy = np.zeros(self.env.num_states(), dtype=int)

    def value_iteration(self):
        """Run synchronous Bellman optimality sweeps, then extract the policy.

        Returns:
            Tuple ``(policy, value_function)``.
        """
        for _ in range(self.max_iter):
            delta = 0
            # Synchronous update: every backup of this sweep reads the values
            # of the previous sweep.
            new_value_function = np.copy(self.value_function)
            for s in range(self.env.num_states()):
                v = self.value_function[s]
                new_value_function[s] = self.compute_value(s)
                delta = max(delta, abs(v - new_value_function[s]))
            self.value_function = new_value_function
            if delta < self.theta:
                break

        self.extract_policy()
        return self.policy, self.value_function

    def compute_value(self, s):
        """Return the best one-step backup over the actions available in ``s``.

        A state without any available action is given the value 0.
        """
        actions = self.get_available_actions(s)
        if len(actions) == 0:
            return 0.0
        return max(self.expected_value_for_action(s, a) for a in actions)

    def expected_value_for_action(self, s, action):
        """Return sum over (s', r) of p(s, a, s', r) * (reward(r) + gamma * V(s'))."""
        num_states = self.env.num_states()
        total = 0.0
        for r_index in range(self.env.num_rewards()):
            # Pair each probability with the reward of the same reward index.
            reward = self.env.reward(r_index)
            for s_prime in range(num_states):
                transition_prob = self.env.p(s, action, s_prime, r_index)
                total += transition_prob * (
                    reward + self.gamma * self.value_function[s_prime]
                )
        return total

    def extract_policy(self):
        """Set ``self.policy`` to the greedy action of every state.

        Ties go to the earliest action; states without available actions
        keep their current policy entry.
        """
        for s in range(self.env.num_states()):
            actions = self.get_available_actions(s)
            if len(actions) == 0:
                continue
            values = [self.expected_value_for_action(s, a) for a in actions]
            # np.argmax returns the first maximum, i.e. the earliest action.
            self.policy[s] = actions[int(np.argmax(values))]

    def get_available_actions(self, s):
        """Query the available actions, with or without a state argument.

        Environments whose ``available_actions`` takes no argument raise
        ``TypeError`` on the first call and are queried again without it.
        NOTE(review): the no-argument form presumably reports the actions
        of the environment's current state rather than of ``s`` -- confirm.
        """
        try:
            return self.env.available_actions(s)
        except TypeError:
            return self.env.available_actions()

In [5]:
class MonteCarloES:
    """First-visit Monte Carlo control with a random first action.

    The environment is driven through ``reset()``, ``step(action)``,
    ``state_id()``, ``is_game_over()``, ``available_actions()`` and
    ``reward(...)``.

    NOTE(review): ``env.reward`` is queried with the next state's id, while
    the environment also exposes ``num_rewards()`` and a reward-indexed
    ``p()`` (used by the dynamic-programming classes in this file) --
    confirm that ``reward`` really accepts a state id.
    """

    def __init__(self, env, gamma=0.9, episodes=1000):
        """Create a random deterministic policy and empty return statistics.

        Args:
            env: environment to learn on.
            gamma: discount factor.
            episodes: number of training episodes.
        """
        self.env = env
        self.gamma = gamma
        self.episodes = episodes
        self.policy = np.random.choice(self.env.available_actions(), size=self.env.num_states())
        # Despite its name this is a (state, action) table of action values.
        self.value_function = np.zeros((self.env.num_states(), self.env.num_actions()))
        self.returns = {
            (state, action): []
            for state in range(self.env.num_states())
            for action in range(self.env.num_actions())
        }

    def generate_episode(self, start_state=None, start_action=None):
        """Reset the environment and play one episode.

        Args:
            start_state: if given, recorded as the state of the first step
                instead of the environment's own state id. The environment
                is NOT moved to this state, so only pass it when it matches
                the state ``reset()`` leaves the environment in.
            start_action: first action to play; drawn uniformly from the
                available actions (after the reset) when omitted.

        Returns:
            List of ``(state, action, reward)`` tuples. Episodes are cut
            after roughly 1000 steps.
        """
        episode = []
        self.env.reset()
        state = self.env.state_id() if start_state is None else start_state
        if start_action is None:
            action = np.random.choice(self.env.available_actions())
        else:
            action = start_action

        steps = 0
        while True:
            self.env.step(action)
            next_state = self.env.state_id()
            reward = self.env.reward(next_state)
            done = self.env.is_game_over()

            episode.append((state, action, reward))
            if done:
                break
            state = next_state
            # NOTE(review): later actions are drawn uniformly at random, not
            # from self.policy, so the returns estimate the random policy.
            action = np.random.choice(self.env.available_actions())
            steps += 1
            if steps > 1000:
                print("Episode too long, stopping")
                break

        return episode

    def monte_carlo_es(self):
        """Run first-visit Monte Carlo control for ``self.episodes`` episodes.

        Episodes start wherever ``reset()`` puts the environment: the calls
        used in this class offer no way to force a start state, so only the
        first action is randomised. Labelling the first transition with an
        arbitrary state would credit returns to states never visited.

        Returns:
            Tuple ``(policy, value_function)`` where ``value_function`` is
            the (state, action) table of mean returns.
        """
        for episode_num in range(self.episodes):
            episode = self.generate_episode()

            # Index of the first occurrence of every (state, action) pair,
            # computed once instead of rescanning the prefix at every step.
            first_visit = {}
            for t, (state, action, _) in enumerate(episode):
                first_visit.setdefault((state, action), t)

            G = 0
            for t in reversed(range(len(episode))):
                state, action, reward = episode[t]
                G = self.gamma * G + reward
                if first_visit[(state, action)] != t:
                    continue
                self.returns[(state, action)].append(G)
                self.value_function[state, action] = np.mean(self.returns[(state, action)])
                self.policy[state] = np.argmax(self.value_function[state, :])

            if episode_num % 100 == 0:
                print(f"Episode {episode_num}/{self.episodes} completed")

        return self.policy, self.value_function

    def find_best_path(self, start_state):
        """Follow the learned policy and return the list of visited states.

        The environment is neither reset nor moved to ``start_state``; the
        caller must make sure it is already in that state. The walk stops
        when the environment reports game over or after roughly 1000 steps.
        """
        state = start_state
        path = [state]
        steps = 0
        while True:
            action = self.policy[state]
            self.env.step(action)
            state = self.env.state_id()
            path.append(state)
            if self.env.is_game_over():
                break
            steps += 1
            if steps > 1000:
                print("Path too long, stopping")
                break
        return path

In [6]:
class MonteCarloOnPolicy:
    """On-policy first-visit Monte Carlo control with an epsilon-greedy policy.

    NOTE(review): ``env.reward`` is queried with the next state's id, while
    the environment also exposes ``num_rewards()`` and a reward-indexed
    ``p()`` (used by the dynamic-programming classes in this file) --
    confirm that ``reward`` really accepts a state id.
    """

    def __init__(self, env, gamma=0.9, epsilon=0.1, episodes=1000):
        """Set up a uniform stochastic policy and empty return statistics.

        Args:
            env: environment to learn on.
            gamma: discount factor.
            epsilon: exploration mass of the epsilon-greedy policy.
            episodes: number of training episodes.
        """
        self.env = env
        self.gamma = gamma
        self.epsilon = epsilon
        self.episodes = episodes
        self.num_states = self.env.num_states()
        self.num_actions = self.env.num_actions()

        # Every row holds equal weights (they sum to epsilon, not 1); rows
        # are normalised when actions are sampled, which makes them uniform.
        self.policy = np.ones((self.num_states, self.num_actions)) * (self.epsilon / self.num_actions)
        self.state_action_values = np.zeros((self.num_states, self.num_actions))
        self.returns = {
            (state, action): []
            for state in range(self.num_states)
            for action in range(self.num_actions)
        }

    def generate_episode(self):
        """Reset the environment and play one episode with the current policy.

        Returns:
            List of ``(state, action, reward)`` tuples.
        """
        self.env.reset()
        state = self.env.state_id()
        episode = []
        done = False

        while not done:
            action_probs = self.policy[state]
            action_probs = action_probs / np.sum(action_probs)  # normalise the weights
            action = np.random.choice(self.num_actions, p=action_probs)
            self.env.step(action)
            next_state = self.env.state_id()
            reward = self.env.reward(next_state)
            done = self.env.is_game_over()

            episode.append((state, action, reward))
            state = next_state

        return episode

    def monte_carlo_on_policy(self):
        """Run first-visit Monte Carlo control for ``self.episodes`` episodes.

        Returns:
            Tuple ``(policy, state_action_values)``: the epsilon-greedy
            action probabilities and the mean first-visit returns.
        """
        for episode_num in range(self.episodes):
            episode = self.generate_episode()

            # The episode is walked backwards, so "first seen in this loop"
            # would be the LAST visit; record the true first occurrence of
            # each pair up front instead.
            first_visit = {}
            for t, (state, action, _) in enumerate(episode):
                first_visit.setdefault((state, action), t)

            G = 0
            for t in reversed(range(len(episode))):
                state, action, reward = episode[t]
                G = self.gamma * G + reward
                if first_visit[(state, action)] != t:
                    continue
                self.returns[(state, action)].append(G)
                self.state_action_values[state][action] = np.mean(self.returns[(state, action)])

                # Epsilon-greedy improvement: epsilon is spread over all
                # actions and the remaining mass goes to the greedy one.
                best_action = np.argmax(self.state_action_values[state])
                self.policy[state] = self.epsilon / self.num_actions
                self.policy[state][best_action] += 1 - self.epsilon

            if episode_num % 100 == 0:
                print(f"Episode {episode_num}/{self.episodes} completed")

        return self.policy, self.state_action_values

    def find_best_path(self, start_state):
        """Follow the most probable action of each state and return the path.

        The environment is neither reset nor moved to ``start_state``; the
        caller must make sure it is already in that state. The walk stops
        when the environment reports game over or after roughly 1000 steps.
        """
        state = start_state
        path = [state]
        steps = 0
        while True:
            action = np.argmax(self.policy[state])  # most probable action
            self.env.step(action)
            state = self.env.state_id()
            path.append(state)
            if self.env.is_game_over():
                break
            steps += 1
            if steps > 1000:
                print("Path too long, stopping")
                break
        return path

In [7]:
class MonteCarloOffPolicy:
    """Off-policy Monte Carlo control with weighted importance sampling.

    Episodes are generated by a fixed epsilon-soft behaviour policy
    (``b_policy``) while a greedy target policy (``policy``) is learned.

    NOTE(review): ``env.reward`` is queried with the next state's id, while
    the environment also exposes ``num_rewards()`` and a reward-indexed
    ``p()`` (used by the dynamic-programming classes in this file) --
    confirm that ``reward`` really accepts a state id.
    """

    def __init__(self, env, gamma=0.9, epsilon=0.1, episodes=1000):
        """Build the target/behaviour policies and the value tables.

        Args:
            env: environment to learn on.
            gamma: discount factor.
            epsilon: exploration mass of the behaviour policy.
            episodes: number of training episodes.
        """
        self.env = env
        self.gamma = gamma
        self.epsilon = epsilon
        self.episodes = episodes
        self.num_states = self.env.num_states()
        self.num_actions = self.env.num_actions()

        # Target policy (policy) and behaviour policy (b_policy) both start
        # with epsilon spread evenly over the actions.
        self.policy = np.ones((self.num_states, self.num_actions)) * (self.epsilon / self.num_actions)
        self.b_policy = np.ones((self.num_states, self.num_actions)) * (self.epsilon / self.num_actions)
        self.state_action_values = np.zeros((self.num_states, self.num_actions))
        # Cumulative importance weights, one per (state, action) pair.
        self.C = np.zeros((self.num_states, self.num_actions))

        # The behaviour policy puts the remaining 1 - epsilon on one random
        # action per state; it is never updated afterwards.
        for state in range(self.num_states):
            favoured_action = np.random.choice(self.num_actions)
            self.b_policy[state][favoured_action] += 1 - self.epsilon

    def generate_episode(self, policy):
        """Reset the environment and play one episode following ``policy``.

        Args:
            policy: (state, action) array of action weights; each row is
                normalised before sampling.

        Returns:
            List of ``(state, action, reward)`` tuples.
        """
        self.env.reset()
        state = self.env.state_id()
        episode = []
        done = False

        while not done:
            action_probs = policy[state]
            action_probs = action_probs / np.sum(action_probs)  # normalise the weights
            action = np.random.choice(self.num_actions, p=action_probs)
            self.env.step(action)
            next_state = self.env.state_id()
            reward = self.env.reward(next_state)
            done = self.env.is_game_over()

            episode.append((state, action, reward))
            state = next_state

        return episode

    def monte_carlo_off_policy(self):
        """Learn a greedy target policy from behaviour-policy episodes.

        Each episode is processed backwards. Every step of the tail updates
        the weighted average of its (state, action) pair; processing stops
        at the first action that disagrees with the greedy target policy,
        because the importance weight of everything earlier is zero.

        Returns:
            Tuple ``(policy, state_action_values)``.
        """
        for episode_num in range(self.episodes):
            episode = self.generate_episode(self.b_policy)
            G = 0
            W = 1

            for t in reversed(range(len(episode))):
                state, action, reward = episode[t]
                G = self.gamma * G + reward

                # Weighted importance-sampling update on every visit: the
                # cumulative weight C and the running weight W must stay in
                # step, so no visit of the tail may be skipped.
                self.C[state][action] += W
                self.state_action_values[state][action] += (W / self.C[state][action]) * (
                    G - self.state_action_values[state][action]
                )

                # Make the target policy greedy for this state.
                best_action = np.argmax(self.state_action_values[state])
                self.policy[state] = np.zeros(self.num_actions)
                self.policy[state][best_action] = 1.0

                if action != best_action:
                    break

                # The target policy is deterministic, so the importance
                # ratio is 1 / b(action | state); 1e-10 guards the division.
                W = W / (self.b_policy[state][action] + 1e-10)

            if episode_num % 100 == 0:
                print(f"Episode {episode_num}/{self.episodes} completed")

        return self.policy, self.state_action_values

    def find_best_path(self, start_state):
        """Follow the most probable action of each state and return the path.

        The environment is neither reset nor moved to ``start_state``; the
        caller must make sure it is already in that state. The walk stops
        when the environment reports game over or after roughly 1000 steps.
        """
        state = start_state
        path = [state]
        steps = 0
        while True:
            action = np.argmax(self.policy[state])  # most probable action
            self.env.step(action)
            state = self.env.state_id()
            path.append(state)
            if self.env.is_game_over():
                break
            steps += 1
            if steps > 1000:
                print("Path too long, stopping")
                break
        return path

In [8]:
class DynaQAgent:
    """Tabular Dyna-Q agent: Q-learning plus planning on a learned model.

    The environment is used through ``reset()`` returning the initial state
    and ``step(action)`` returning ``(next_state, reward, done, info)``.

    NOTE(review): the other agents in this file drive their environment
    through ``state_id()`` / ``is_game_over()`` and ignore the return value
    of ``step`` -- confirm the environment passed here really returns these
    tuples.
    """

    def __init__(self, environment, gamma=0.9, alpha=0.1, epsilon=0.1, n=10):
        """Create zeroed Q-values and an initial model.

        Args:
            environment: environment to learn on.
            gamma: discount factor.
            alpha: learning rate.
            epsilon: probability of taking a random action.
            n: number of planning updates after every real step.
        """
        self.environment = environment
        self.gamma = gamma
        self.alpha = alpha
        self.epsilon = epsilon
        self.n = n
        self.num_states = environment.num_states()
        self.num_actions = environment.num_actions()
        self.q_values = np.zeros((self.num_states, self.num_actions))
        self.model = {}
        self.initialize_model()

    def initialize_model(self):
        """Fill the model with placeholders and forget all observed pairs.

        Every (state, action) pair maps to ``(reward, next_state)``; the
        placeholder is a zero-reward self-loop. Placeholders are never used
        for planning: only pairs recorded by ``update_model`` are replayed.
        """
        for state in range(self.num_states):
            for action in range(self.num_actions):
                self.model[(state, action)] = (0, state)
        # Pairs really experienced; the list gives O(1) random sampling and
        # the set gives O(1) membership tests.
        self._observed_pairs = []
        self._observed_lookup = set()

    def select_action(self, state):
        """Return an epsilon-greedy action for ``state``."""
        if np.random.rand() < self.epsilon:
            return np.random.choice(self.num_actions)
        return np.argmax(self.q_values[state])

    def update_model(self, state, action, reward, next_state):
        """Record the latest observed outcome of taking ``action`` in ``state``."""
        key = (state, action)
        self.model[key] = (reward, next_state)
        if key not in self._observed_lookup:
            self._observed_lookup.add(key)
            self._observed_pairs.append(key)

    def planning_step(self):
        """Replay ``self.n`` simulated transitions drawn from observed pairs.

        Sampling is restricted to previously experienced pairs: replaying
        the placeholder self-loops of unvisited pairs would distort their
        Q-values with transitions that never happened. Nothing is done
        before the first real observation.
        """
        if not self._observed_pairs:
            return
        for _ in range(self.n):
            state, action = random.choice(self._observed_pairs)
            reward, next_state = self.model[(state, action)]
            if next_state is None:
                continue
            if next_state >= self.num_states:
                # Out-of-range ids are clamped to the last state.
                next_state = self.num_states - 1
            target = reward + self.gamma * np.max(self.q_values[next_state])
            self.q_values[state][action] += self.alpha * (target - self.q_values[state][action])

    def dyna_q(self, episodes=100):
        """Train for ``episodes`` episodes.

        Args:
            episodes: number of training episodes.

        Returns:
            Tuple ``(policy, q_values)`` where ``policy`` is a one-hot
            (state, action) array of greedy actions.

        Raises:
            ValueError: if ``environment.step`` returns ``None``.
            IndexError: if the current state or the chosen action is out of
                range.
        """
        for episode in range(episodes):
            state = self.environment.reset()
            done = False
            while not done:
                action = self.select_action(state)
                try:
                    result = self.environment.step(action)
                    if result is None:
                        raise ValueError("The step method returned None.")

                    next_state, reward, done, _ = result
                except Exception as e:
                    # Report where the failure happened, then propagate it.
                    print(f"Exception: {e}. State: {state}, Action: {action}")
                    raise

                if next_state is None or state is None or action is None:
                    print(f"Unexpected None value. State: {state}, Action: {action}, Next State: {next_state}")
                    continue  # skip this step

                if state >= self.num_states or action >= self.num_actions:
                    raise IndexError(f"Invalid index. State: {state}, Action: {action}")
                if next_state >= self.num_states:
                    # Out-of-range ids are clamped to the last state.
                    next_state = self.num_states - 1

                target = reward + self.gamma * np.max(self.q_values[next_state])
                self.q_values[state][action] += self.alpha * (target - self.q_values[state][action])
                self.update_model(state, action, reward, next_state)
                self.planning_step()
                state = next_state

            if episode % 10 == 0:
                print(f"Episode {episode}/{episodes} completed")

        # Build the greedy policy as a one-hot row per state.
        policy = np.zeros((self.num_states, self.num_actions))
        for state in range(self.num_states):
            best_action = np.argmax(self.q_values[state])
            policy[state][best_action] = 1.0

        return policy, self.q_values

    def find_best_path_for_goal(self, start_state, max_steps=1000):
        """Follow the greedy policy and return the list of visited states.

        The environment is stepped with the same integer action indices
        used during training. It is neither reset nor moved to
        ``start_state``; the caller must make sure it is already there.

        Args:
            start_state: state the walk is assumed to start from.
            max_steps: maximum number of steps, so that a cyclic greedy
                policy cannot loop forever.

        Returns:
            The visited states, starting with ``start_state``.
        """
        state = start_state
        path = [state]
        for _ in range(max_steps):
            action = np.argmax(self.q_values[state])
            next_state, _reward, done, _ = self.environment.step(action)
            path.append(next_state)
            if done:
                return path
            state = next_state
        print("Path too long, stopping")
        return path

In [None]:
# Run policy iteration on a fresh SecretEnv1 and print the resulting
# policy and state values.
env = SecretEnv1()
policy_iteration = PolicyIteration(env)
policy, value_function = policy_iteration.policy_iteration()
print("Politique optimale : ", policy)
print("Fonction de valeur optimale : ", value_function)

In [None]:
# Run value iteration on a fresh SecretEnv1 and print the results.
# NOTE: `value` holds the returned policy (first element of the tuple),
# not a value function.
env = SecretEnv1()
value_iteration = ValueIteration(env)
value, value_function = value_iteration.value_iteration()
print("Politique optimale : ", value)
print("Fonction de valeur optimale : ", value_function)

In [None]:
# Run MonteCarloES for 10 episodes on a fresh SecretEnv1 and print the results.
# NOTE: `value` holds the returned policy and `value_function` the
# (state, action) table of mean returns.
env = SecretEnv1()
monte_carlo_es = MonteCarloES(env, episodes=10)
value, value_function = monte_carlo_es.monte_carlo_es()
print("Politique optimale : ", value)
print("Fonction de valeur optimale : ", value_function)

In [None]:
# Run MonteCarloOnPolicy for 10 episodes on a fresh SecretEnv1 and print the results.
# NOTE: `value` holds the returned action-probability table and
# `value_function` the (state, action) value table.
env = SecretEnv1()
monte_carlo_on_policy = MonteCarloOnPolicy(env, episodes=10)
value, value_function = monte_carlo_on_policy.monte_carlo_on_policy()
print("Politique optimale : ", value)
print("Fonction de valeur optimale : ", value_function)

In [None]:
# Run MonteCarloOffPolicy for 10 episodes on a fresh SecretEnv1 and print the results.
# NOTE: `value` holds the returned target-policy table and
# `value_function` the (state, action) value table.
env = SecretEnv1()
monte_carlo_off_policy = MonteCarloOffPolicy(env, episodes=10)
value, value_function = monte_carlo_off_policy.monte_carlo_off_policy()
print("Politique optimale : ", value)
print("Fonction de valeur optimale : ", value_function)

In [None]:
# Run DynaQAgent with its default 100 episodes on a fresh SecretEnv1 and
# print the results.
# NOTE: `value` holds the returned one-hot greedy policy and
# `value_function` the Q-value table.
env = SecretEnv1()
dyna_q = DynaQAgent(env)
value, value_function = dyna_q.dyna_q()
print("Politique optimale : ", value)
print("Fonction de valeur optimale : ", value_function)