# Projet Deep Reinforcement Learning

##  l'Environnement TIC TAC TOE

#### Import des biblio

In [441]:
import numpy as np
import time
from typing import Callable
from copy import copy

#### Initiale les variables d'environnement

In [442]:
S = []
A = np.arange(9)


MY_COLOR = [0, 1, 0]
ADVERSARY_COLOR = [0, 0, 1]
EMPTY_CELL = [1, 0, 0]

In [443]:
def init_state(state = S, cell_list = [], cell_index = 0):
    if(cell_index == 9):
        state.append(cell_list)
        return
    for i in [[1, 0, 0], [0, 1, 0], [0, 0, 1]]:
        init_state(state, copy(cell_list + [i]),cell_index + 1)
    return

init_state()
S = np.asarray(S)


s_start = np.where(S == np.array([[1, 0, 0] for i in range(9)]))[0][0]
c_start = [0, 1, 0]

print(S[s_start])

[[1 0 0]
 [1 0 0]
 [1 0 0]
 [1 0 0]
 [1 0 0]
 [1 0 0]
 [1 0 0]
 [1 0 0]
 [1 0 0]]


#### definir les fonction d'environnement

In [444]:
def get_available(state):
    ret = []
    state = S[state]
    for i in range(len(state)):
        if(np.array_equal(state[i], EMPTY_CELL)):
            ret.append(i)
    return np.array(ret)

def step(state, action : int, color):
    assert action >= 0 and action < 9, "invalid"
    assert not is_terminal(state), "game is finished"
    assert np.array_equal(S[state, action], EMPTY_CELL), "cell is not empty"
    reward = 1
    
    state = np.copy(S[state])
    state[action] = color
    next_state = np.copy(state)
    next_color = [[0, 1, 1][i] - color[i] for i in range(3)]
    next_state_index = np.where(S == next_state)[0][0]
    
    if(len(get_available(next_state)) == 0):
        reward = 1
        return next_state_index, reward, next_color
    elif(is_terminal_by(next_state, color)):
        reward = 2
        return next_state_index, reward, next_color
    elif(is_terminal_by(next_state, next_color)):
        reward = -1
        return next_state_index, reward, next_color
    
    
    
    return next_state_index, reward, next_color



def get_winner(state):
    if(len(get_available(state)) == 0):
        return 0
    elif(is_terminal_by(state, MY_COLOR)):
        return 1
    elif(is_terminal_by(state, ADVERSARY_COLOR)):
        return -1
    
def reset():
    return s_start

def is_terminal_by(state_index : int, color = MY_COLOR):
    state = S[state_index]
    return np.array([len(np.unique(state[[0,1,2]], axis=0)) == 1 and np.array_equal(state[0],color),
    len(np.unique(state[[3, 4, 5]], axis=0)) == 1 and np.array_equal(state[3], color),
    len(np.unique(state[[6, 7, 8]], axis=0)) == 1 and np.array_equal(state[6], color),
    len(np.unique(state[[0, 3, 6]], axis=0)) == 1 and np.array_equal(state[0], color),
    len(np.unique(state[[1, 4, 7]], axis=0)) == 1 and np.array_equal(state[1], color),
    len(np.unique(state[[2, 5, 8]], axis=0)) == 1 and np.array_equal(state[2], color),
    len(np.unique(state[[0, 4, 8]], axis=0)) == 1 and np.array_equal(state[0], color),
    len(np.unique(state[[2, 4, 6]], axis=0)) == 1 and np.array_equal(state[2], color),]).any() or (len(get_available(state_index)) == 0)

def is_terminal(state : int):
    return is_terminal_by(state, MY_COLOR) or is_terminal_by(state, ADVERSARY_COLOR)

#### definire le policy random uniform

In [445]:
def tabular_random_uniform_policy(state_size: int, action_size: int) -> np.ndarray:
    assert action_size > 0
    return np.ones((state_size, 2, action_size)) / action_size

#### definir un utile pour simules des step

In [446]:
def step_until_the_end_of_the_episode_and_generate_trajectory(
        s0: int,
        c0: int,
        pi: np.ndarray,
        step_func: Callable,
        is_terminal_func: Callable,
        max_steps: int = 10
) -> ([int], [int], [int], [float]):
    s_list = []
    a_list = []
    r_list = []
    c_list = []
    st = s0
    ct = c0
    actions = np.arange(pi.shape[2])
    step = 0
    while not is_terminal_func(st) and step < max_steps:
        at = np.random.choice(actions, p=pi[st, (ct.index(1) - 1)])
        st_p, rt_p, ct_p = step_func(st, at, ct)
        s_list.append(st)
        a_list.append(at)
        c_list.append(ct)
        r_list.append(rt_p)
        st = st_p
        step += 1

    return s_list, a_list, c_list, r_list

#### Implement l'algorithms monte carlo es

In [447]:
def monte_carlo_es(
        s0 : int,
        c0 : np.ndarray,
        states_count: int,
        actions_count: int,
        step_func: Callable,
        is_terminal_func: Callable,
        max_episodes: int = 1000,
        max_steps_per_episode: int = 10,
        gamma: float = 0.99
) -> (np.ndarray, np.ndarray):
    pi = tabular_random_uniform_policy(states_count, actions_count)
    
    states = np.arange(states_count)
    actions = get_available(s0)

    Q = np.random.random((states_count, 2, actions_count))

    for s in range(states_count):
        if is_terminal_func(s):
            Q[s, :] = 0.0
            pi[s, :] = 0.0
            
        availble_cells = get_available(s)
        for c in range(2):
            for a in range(actions_count):
                if(a not in availble_cells):
                    pi[s, c, a] = 0.0
                    Q[s, c, a] = 0.0

    returns = np.zeros((states_count, 2, actions_count))
    returns_count = np.zeros((states_count, 2, actions_count))

    for episode_id in range(max_episodes):
        s_temp = s0
        
        a0 = np.random.choice(actions)
        
        s1, r1, c1 = step_func(s_temp, a0, c0)

        s_list, a_list, c_list , r_list = step_until_the_end_of_the_episode_and_generate_trajectory(s1, c1, pi, step_func,
                                                                                                  is_terminal_func,
                                                                                                  max_steps_per_episode)
        s_list.insert(0, s_temp)
        a_list.insert(0, a0)
        c_list.insert(0, c0)
        r_list.insert(0, 1)

        G = 0.0
        for t in reversed(range(len(s_list))):
            G = gamma * G + r_list[t]
            st = s_list[t]
            at = a_list[t]
            ct = c_list[t]
            # cr = [[0, 1, 1][i] - ct[i] for i in range(3)]
            if (st, ct, at) in zip(s_list[0:t], c_list[0:t], a_list[0:t]):
                continue

            returns[st, (ct.index(1) - 1), at] += G
            returns_count[st, (ct.index(1) - 1), at] += 1
            Q[st,(ct.index(1) - 1), at] = returns[st, (ct.index(1) - 1), at] / returns_count[st, (ct.index(1) - 1), at]
            pi[st, (ct.index(1) - 1), :] = 0.0
            pi[st, (ct.index(1) - 1), np.argmax(Q[st, (ct.index(1) - 1), :])] = 1.0
    return Q, pi

#### test l'algorithms monte_carlo_es

In [448]:
t1 = time.time()
s_start = np.array([[1,0,0],[0,1,0],[0,0,1],[0,1,0],[0,1,0],[0,0,1],[0,0,1],[1,0,0],[1,0,0]])
s_start = np.where(S==s_start)[0][0]
Q, Pi = monte_carlo_es(s_start, c_start, len(S), len(A), step, is_terminal,
                                                      max_episodes=1000, max_steps_per_episode=10)

print(Q[s_start, (c_start.index(1) - 1)])
print(Pi[s_start, (c_start.index(1) - 1)])
print(f"le time d'execution : {time.time() - t1}")

[10.46617457 10.46617457 10.46617457 10.46617457 10.46617457 10.46617457
 10.46617457 10.46617457 10.46617457]
[0. 1. 0. 0. 0. 0. 0. 0. 0.]
le time d'execution : 96.46855568885803


#### Implement l'algorithms on policy first visit monte carlo

In [449]:
def on_policy_first_visit_monte_carlo(
        s0 : int,
        c0 : np.ndarray,
        states_count: int,
        actions_count: int,
        reset_func: Callable,
        step_func: Callable,
        is_terminal_func: Callable,
        max_episodes: int = 1000,
        max_steps_per_episode: int = 10,
        gamma: float = 0.99,
        epsilon: float = 0.1
) -> (np.ndarray, np.ndarray):
    pi = tabular_random_uniform_policy(states_count, actions_count)
    states = np.arange(states_count)
    actions = np.arange(actions_count)

    Q = np.random.random((states_count, actions_count))

    for s in range(states_count):
        if is_terminal_func(s):
            Q[s, :] = 0.0
            pi[s, :] = 0.0
            
        availble_cells = get_available(s)
        for c in range(2):
            for a in range(actions_count):
                if(a not in availble_cells):
                    pi[s, c, a] = 0.0
                    Q[s, c, a] = 0.0

    returns = np.zeros((states_count, actions_count))
    returns_count = np.zeros((states_count, actions_count))

    for episode_id in range(max_episodes):
        s0 = reset_func()
        
        s_list, a_list, c_list , r_list = step_until_the_end_of_the_episode_and_generate_trajectory(s1, c1, pi, step_func,
                                                                                                is_terminal_func,
                                                                                                max_steps_per_episode)
        s_list, a_list, _, r_list = step_until_the_end_of_the_episode_and_generate_trajectory(s0, pi, step_func,
                                                                                              is_terminal_func,
                                                                                              max_steps_per_episode)

        G = 0.0
        for t in reversed(range(len(s_list))):
            G = gamma * G + r_list[t]
            st = s_list[t]
            at = a_list[t]
            ct = c_list[t]
            if (st, ct, at) in zip(s_list[0:t],c_list[0:t] a_list[0:t]):
                continue

            returns[st, (ct.index(1) - 1),at] += G
            returns_count[st, (ct.index(1) - 1),at] += 1
            Q[st, (ct.index(1) - 1),at] = returns[st, (ct.index(1) - 1),at] / returns_count[st, (ct.index(1) - 1),at]
            pi[st, (ct.index(1) - 1), :] = epsilon / actions_count
            pi[st, (ct.index(1) - 1), np.argmax(Q[st, (ct.index(1) - 1), :])] = 1.0 - epsilon + epsilon / actions_count
    return Q, pi

SyntaxError: invalid syntax (<ipython-input-449-25b8873ee143>, line 51)

#### test l'algorithms on policy first visit monte carlo

In [None]:
t1 = time.time()
Q, Pi = on_policy_first_visit_monte_carlo(len(S), len(A),
                                            reset,
                                            step,
                                            is_terminal,
                                            max_episodes=10000, max_steps_per_episode=100)
print(Q)
print(Pi)
print(f"le time d'execution : {time.time() - t1}")

#### Implement l'algorithms off policy monte carlo control

In [None]:
def off_policy_monte_carlo_control(
        states_count: int,
        actions_count: int,
        reset_func: Callable,
        step_func: Callable,
        is_terminal_func: Callable,
        max_episodes: int = 1000,
        max_steps_per_episode: int = 10,
        gamma: float = 0.99,
        epsilon: float = 0.1,
        epsilon_greedy_behaviour_policy: bool = False
) -> (np.ndarray, np.ndarray):
    b = tabular_random_uniform_policy(states_count, actions_count)
    pi = tabular_random_uniform_policy(states_count, actions_count)
    states = np.arange(states_count)

    Q = np.random.random((states_count, actions_count))

    for s in states:
        if is_terminal_func(s):
            Q[s, :] = 0.0
            pi[s, :] = 0.0
        pi[s, :] = 0
        pi[s, np.argmax(Q[s, :])] = 1.0

    C = np.zeros((states_count, actions_count))

    for episode_id in range(max_episodes):
        if epsilon_greedy_behaviour_policy:
            for s in states:
                b[s, :] = epsilon / actions_count
                b[s, np.argmax(Q[s, :])] = 1.0 - epsilon + epsilon / actions_count

        s0 = reset_func()

        s_list, a_list, _, r_list = step_until_the_end_of_the_episode_and_generate_trajectory(s0, b, step_func,
                                                                                              is_terminal_func,
                                                                                              max_steps_per_episode)

        G = 0.0
        W = 1
        for t in reversed(range(len(s_list))):
            G = gamma * G + r_list[t]
            st = s_list[t]
            at = a_list[t]
            C[st, at] += W

            Q[st, at] += W / C[st, at] * (G - Q[st, at])
            pi[st, :] = 0
            pi[st, np.argmax(Q[st, :])] = 1.0
            if np.argmax(Q[st, :]) != at:
                break
            W = W / b[st, at]
    return Q, pi

#### test l'algorithms off policy monte carlo control

In [None]:
t1 = time.time()
Q, Pi = off_policy_monte_carlo_control(len(S), len(A),
                                           reset,
                                           step,
                                           is_terminal,
                                           max_episodes=10000, max_steps_per_episode=100)
print(Q)
print(Pi)
print(f"le time d'execution : {time.time() - t1}")