In [1]:
from typing import List

import pandas as pd
import numpy as np
from tqdm import tqdm

# Monte Carlo ES

In [2]:
class LineWorld:
    def __init__(self):
        self._agent_pos = 2

    #uniquement pour le monté Carlo ES
    @staticmethod
    def from_random_state() -> 'LineWorld':
        env = LineWorld()
        env._agent_pos = np.random.randint(1, 4)
        return env

    def available_action(self) -> List[int]:
        if self._agent_pos in [1, 2, 3]:
            return [0, 1]  # 0: left, 1: right
        return []

    def is_terminal(self) -> bool:
        return True if self._agent_pos in [0, 4] else False

    def state_id(self) -> int:
        return self._agent_pos

    def step(self, action: int):
        assert (not self.is_terminal())
        assert (action in self.available_action())
        if action == 0:
            self._agent_pos -= 1
        else:
            self._agent_pos += 1

    def score(self) -> float:
        if self._agent_pos == 0:
            return -1.0
        if self._agent_pos == 4:
            return 1.0
        return 0.0

    def display(self):
        for i in range(5):
            print('X' if self._agent_pos == i else '_', end='')
        print()

    def reset(self):
        self._agent_pos = 2

In [3]:
env = LineWorld()
env.display()
env.step(0)
env.display()

__X__
_X___


In [12]:
def naive_monte_carlo_with_exploring_start(env_type,
                                           gamma: float = 0.999,
                                           nb_iter: int = 10000,
                                           max_steps: int = 10):
    pi = {}
    Q = {}
    Returns = {}

    for it in tqdm(range(nb_iter)):
        env: LineWorld = env_type.from_random_state()

        trajectory = []
        steps_count = 0

        is_first_action = True

        while not env.is_terminal() and steps_count < max_steps:
            s = env.state_id()
            aa = env.available_action()

            if s not in pi:
                pi[s] = np.random.choice(aa)

            if is_first_action:
                a = np.random.choice(aa)
                is_first_action = False
            else:
                a = pi[s]

            prev_score = env.score()
            env.step(a)
            r = env.score() - prev_score
            trajectory.append((s, a, r, aa))
            steps_count += 1

        G = 0
        for (t, (s, a, r, aa)) in reversed(list(enumerate(trajectory))):
            G = gamma * G + r

            if all(map(lambda triplet: triplet[0] != s or triplet[1] != a, trajectory[:t])):
                if (s, a) not in Returns:
                    Returns[(s, a)] = []
                Returns[(s, a)].append(G)
                Q[(s, a)] = np.mean(Returns[(s, a)])

                best_a = None
                best_a_idx = 0
                for a in aa:
                    if (s, a) not in Q:
                        Q[(s, a)] = np.random.random()
                    if best_a is None or Q[(s, a)] > best_a_idx:
                        best_a = a
                        best_a_idx = Q[(s, a)]
                pi[s] = best_a
    return pi





In [13]:
naive_monte_carlo_with_exploring_start(LineWorld)

100%|██████████| 10000/10000 [00:03<00:00, 3208.71it/s]


{1: 1, 2: 1, 3: 1}

# Qlearning (of policy TD control)

In [38]:
def naive_q_learning(env_type,
                     gamma: float = 0.999,
                     nb_iter: int = 100000,
                     alpha: float = 0.1,
                     epsilon: float = 0.1):
    Q = {}
    env = env_type()
    for it in tqdm(range(nb_iter)):
        env.reset()

        while not env.is_terminal():
            s = env.state_id()
            aa = env.available_action()
            
            if s not in Q:
                Q[s] = {}
                for a in aa:
                    Q[s][a] = np.random.random()

            if np.random.random() < epsilon:
                a = np.random.choice(aa)
            else:
                q_s = [Q[s][a] for a in aa]
                best_a_index = np.argmax(q_s)
                a = aa[best_a_index]

            prev_score = env.score()
            env.step(a)
            r = env.score() - prev_score

            s_p = env.state_id()
            aa_p = env.available_action()

            if env.is_terminal():
                target = r
            else:
                if s_p not in Q:
                    Q[s_p] = {}
                    for a_p in aa_p:
                        Q[s_p][a_p] = np.random.random()
                        
                q_s_p = [Q[s_p][a_p] for a_p in aa_p]
                max_a_p = np.max(q_s_p)
                target = r + gamma * max_a_p
                
            Q[s][a] = (1 - alpha) * Q[s][a] + alpha * target

    pi = {}
    print(Q)
    for s in Q.keys():
        best_a = None
        best_a_score = 0.0

        for a, a_score in Q[s].items():
            if best_a is None or best_a_score <= a_score:
                best_a = a
                best_a_score = a_score
        pi[s] = best_a

    return pi, Q



In [39]:
naive_q_learning(LineWorld)

100%|██████████| 100000/100000 [00:02<00:00, 34535.43it/s]

{2: {0: 0.9970029989999992, 1: 0.9989999999999996}, 3: {0: 0.9980009999999991, 1: 0.9999999999999994}, 1: {0: -0.9999999999956554, 1: 0.9980009999999991}}





({2: 1, 3: 1, 1: 1},
 {2: {0: 0.9970029989999992, 1: 0.9989999999999996},
  3: {0: 0.9980009999999991, 1: 0.9999999999999994},
  1: {0: -0.9999999999956554, 1: 0.9980009999999991}})