In [1]:
# Pi(s,a)
from dataclasses import dataclass
from typing import Dict
import numpy as np


In [2]:
Policy = Dict[int, Dict[int, float]]

# V(s)
ValueFunction = Dict[int, float]

# Q(s,a)
ActionValueFunction = Dict[int, Dict[int, float]]


# Pi(s,a) and V(s)
@dataclass
class PolicyAndValueFunction:
    pi: Policy
    v: ValueFunction


# Pi(s,a) and Q(s,a)
@dataclass
class PolicyAndActionValueFunction:
    pi: Policy
    q: ActionValueFunction


In [3]:

class MDPEnv:
    def states(self) -> np.ndarray:
        pass

    def actions(self) -> np.ndarray:
        pass

    def rewards(self) -> np.ndarray:
        pass

    def is_state_terminal(self, s: int) -> bool:
        pass

    def transition_probability(self, s: int, a: int, s_p: int, r: float) -> float:
        pass

    def view_state(self, s: int):
        pass


class SingleAgentEnv:
    def state_id(self) -> int:
        pass

    def is_game_over(self) -> bool:
        pass

    def act_with_action_id(self, action_id: int):
        pass

    def score(self) -> float:
        pass

    def available_actions_ids(self) -> np.ndarray:
        pass

    def reset(self):
        pass

    def view(self):
        pass

    def reset_random(self):
        pass


class DeepSingleAgentWithDiscreteActionsEnv:
    def state_description(self) -> np.ndarray:
        pass

    def state_description_length(self) -> int:
        pass

    def max_actions_count(self) -> int:
        pass

    def is_game_over(self) -> bool:
        pass

    def act_with_action_id(self, action_id: int):
        pass

    def score(self) -> float:
        pass

    def available_actions_ids(self) -> np.ndarray:
        pass

    def reset(self):
        pass

In [4]:
class LineWorld(MDPEnv):
    def __init__(self, cells_count: int):
        self.cells_count = cells_count
        self.__states = np.arange(self.cells_count)
        self.__actions = np.array([0, 1])
        self.__rewards = np.array([-1, 0, 1])
        self.probality = self.probality_setup()

    def probality_setup(self):
        p = np.zeros((len(self.__states), len(self.__actions), len(self.__states), len(self.__rewards)))
        for s in range(1, self.cells_count - 2):
            p[s, 1, s + 1, 1] = 1.0

        for s in range(2, self.cells_count - 1):
            p[s, 0, s - 1, 1] = 1.0

        p[self.cells_count - 2, 1, self.cells_count - 1, 2] = 1.0
        p[1, 0, 0, 0] = 1.0
        return p

    def states(self) -> np.ndarray:
        return self.__states

    def actions(self) -> np.ndarray:
        return self.__actions

    def rewards(self) -> np.ndarray:
        return self.__rewards

    def is_state_terminal(self, s: int) -> bool:
        return s == self.cells_count - 1 or s == 0

    def transition_probability(self, s: int, a: int, s_p: int, r: float) -> float:
        return self.probality[s, a, s_p, r]

    def view_state(self, s: int):
        pass



In [5]:

class GridWorld(MDPEnv):
    def __init__(self, lines: int, columns: int):
        self.lines = lines
        self.columns = columns
        self.cells_count = lines * columns
        self.negative_terminal = 4
        self.positive_terminal = self.cells_count - 1
        self.__states = np.arange(self.cells_count)
        self.__actions = np.array([0, 1, 2, 3])
        self.__rewards = np.array([-1, 0, 1])
        self.probality = self.probality_setup()

    def probality_setup(self):
        p = np.zeros((len(self.__states), len(self.__actions), len(self.__states), len(self.__rewards)))
        for line in range(0, self.lines):
            for column in range(0, self.columns - 1):
                s = line * self.columns + column
                if s != self.negative_terminal and s != self.positive_terminal:
                    if s + 1 == self.positive_terminal:
                        p[s, 1, s + 1, 2] = 1.0
                    elif s + 1 == self.negative_terminal:
                        p[s, 1, s + 1, 0] = 1.0
                    else:
                        p[s, 1, s + 1, 1] = 1.0

            for column in range(1, self.columns):
                s = line * self.columns + column
                if s != self.positive_terminal and s != self.negative_terminal:
                    if s - 1 == self.negative_terminal:
                        p[s, 0, s - 1, 0] = 1.0
                    elif s - 1 == self.positive_terminal:
                        p[s, 0, s - 1, 2] = 1.0
                    else:
                        p[s, 0, s - 1, 1] = 1.0

        for column in range(0, self.columns):
            for line in range(0, self.lines - 1):
                s = self.columns * line + column
                s2 = self.columns * (line + 1) + column
                # up
                if s2 != self.positive_terminal and s2 != self.negative_terminal:
                    if s == self.negative_terminal:
                        p[s2, 2, s, 0] = 1.0
                    elif s == self.positive_terminal:
                        p[s2, 2, s, 2] = 1.0
                    else:
                        p[s2, 2, s, 1] = 1.0

                # down
                if s != self.negative_terminal and s != self.positive_terminal:
                    if s2 == self.positive_terminal:
                        p[s, 3, s2, 2] = 1.0
                    elif s2 == self.negative_terminal:
                        p[s, 3, s2, 0] = 1.0
                    else:
                        p[s, 3, s2, 1] = 1.0
        return p

    def states(self) -> np.ndarray:
        return self.__states

    def actions(self) -> np.ndarray:
        return self.__actions

    def rewards(self) -> np.ndarray:
        return self.__rewards

    def is_state_terminal(self, s: int) -> bool:
        return s == self.positive_terminal or s == self.negative_terminal

    def transition_probability(self, s: int, a: int, s_p: int, r: float) -> float:
        return self.probality[s, a, s_p, r]

    def view_state(self, s: int):
        pass

In [6]:

class MDPEnv:
    def states(self) -> np.ndarray:
        pass

    def actions(self) -> np.ndarray:
        pass

    def rewards(self) -> np.ndarray:
        pass

    def is_state_terminal(self, s: int) -> bool:
        pass

    def transition_probability(self, s: int, a: int, s_p: int, r: float) -> float:
        pass

    def view_state(self, s: int):
        pass


class SingleAgentEnv:
    def state_id(self) -> int:
        pass

    def is_game_over(self) -> bool:
        pass

    def act_with_action_id(self, action_id: int):
        pass

    def score(self) -> float:
        pass

    def available_actions_ids(self) -> np.ndarray:
        pass

    def reset(self):
        pass

    def view(self):
        pass

    def reset_random(self):
        pass


class DeepSingleAgentWithDiscreteActionsEnv:
    def state_description(self) -> np.ndarray:
        pass

    def state_description_length(self) -> int:
        pass

    def max_actions_count(self) -> int:
        pass

    def is_game_over(self) -> bool:
        pass

    def act_with_action_id(self, action_id: int):
        pass

    def score(self) -> float:
        pass

    def available_actions_ids(self) -> np.ndarray:
        pass

    def reset(self):
        pass

# Dynamic programming 

In [7]:

def policy_evaluation_on_line_world() -> ValueFunction:
    """
    Creates a Line World of 7 cells (leftmost and rightmost are terminal, with -1 and 1 reward respectively)
    Launches a Policy Evaluation Algorithm in order to find the Value Function of a uniform random policy
    Returns the Value function (V(s)) of this policy
    """
    env = LineWorld(7)
    pi = np.ones((len(env.states()), len(env.actions())))
    pi /= len(env.actions())

    theta = 0.00001
    gamma = 1.0

    V = np.zeros((len(env.states()),))
    while True:
        delta = 0
        for s in env.states():
            old_v = V[s]
            V[s] = 0.0
            for a in env.actions():
                for s_next in env.states():
                    for r_idx, r in enumerate(env.rewards()):
                        V[s] += pi[s, a] * env.transition_probability(s, a, s_next, r_idx) * (r + gamma * V[s_next])
            delta = max(delta, abs(V[s] - old_v))

        if delta < theta:
            break

    return dict(enumerate(V.flatten(), 1))


def policy_iteration_on_line_world() -> PolicyAndValueFunction:
    """
    Creates a Line World of 7 cells (leftmost and rightmost are terminal, with -1 and 1 reward respectively)
    Launches a Policy Iteration Algorithm in order to find the Optimal Policy and its Value Function
    Returns the Policy (Pi(s,a)) and its Value Function (V(s))
    """
    env = LineWorld(7)
    V = np.zeros((len(env.states()),))
    pi = np.ones((len(env.states()), len(env.actions())))
    pi /= len(env.actions())

    theta = 0.00001
    gamma = 1.0

    while True:
        while True:
            delta = 0
            for s in env.states():
                old_v = V[s]
                V[s] = 0.0
                for a in env.actions():
                    for s_next in env.states():
                        for r_idx, r in enumerate(env.rewards()):
                            V[s] += pi[s, a] * env.transition_probability(s, a, s_next, r_idx) * (r + gamma * V[s_next])
                delta = max(delta, abs(V[s] - old_v))

            if delta < theta:
                break

        policy_stable = True
        for s in env.states():
            old_policy = pi[s, :]

            best_a = None
            best_a_value = None
            for a in env.actions():
                a_value = 0
                for s_p in env.states():
                    for r_idx, r in enumerate(env.rewards()):
                        a_value += env.transition_probability(s, a, s_p, r_idx) * (r + gamma * V[s_p])
                if best_a_value is None or best_a_value < a_value:
                    best_a_value = a_value
                    best_a = a

            pi[s, :] = 0.0
            pi[s, best_a] = 1.0
            if not np.array_equal(pi[s], old_policy):
                policy_stable = False

        if policy_stable:
            break

    final_pi = {}
    for indice, value in enumerate(pi):
        final_pi[indice] = dict(enumerate(value.flatten(), 1))

    return PolicyAndValueFunction(final_pi, dict(enumerate(V.flatten(), 1)))


def value_iteration_on_line_world() -> PolicyAndValueFunction:
    """
    Creates a Line World of 7 cells (leftmost and rightmost are terminal, with -1 and 1 reward respectively)
    Launches a Value Iteration Algorithm in order to find the Optimal Policy and its Value Function
    Returns the Policy (Pi(s,a)) and its Value Function (V(s))
    """
    env = LineWorld(7)
    V = np.zeros((len(env.states()),))
    pi = np.ones((len(env.states()), len(env.actions())))
    pi /= len(env.actions())
    pi2 = pi.copy()

    theta = 0.00001
    gamma = 1.0

    while True:
        delta = 0
        for s in env.states():
            old_v = V[s]
            V[s] = 0.0
            best_a_value = None
            best_a = None
            for a in env.actions():
                a_value = 0
                for s_p in env.states():
                    for r_idx, r in enumerate(env.rewards()):
                        pre_a_value = env.transition_probability(s, a, s_p, r_idx) * (r + gamma * V[s_p])
                        a_value += pre_a_value
                        V[s] += pi[s, a] * pre_a_value
                if best_a_value is None or best_a_value < a_value:
                    best_a_value = a_value
                    best_a = a

            delta = max(delta, abs(V[s] - old_v))
            pi2[s, :] = 0.0
            pi2[s, best_a] = 1.0

        if delta < theta:
            break

    final_pi = {}
    for indice, value in enumerate(pi2):
        final_pi[indice] = dict(enumerate(value.flatten(), 1))

    return PolicyAndValueFunction(final_pi, dict(enumerate(V.flatten(), 1)))


def policy_evaluation_on_grid_world() -> ValueFunction:
    """
    Creates a Grid World of 5x5 cells (upper rightmost and lower rightmost are terminal, with -1 and 1 reward respectively)
    Launches a Policy Evaluation Algorithm in order to find the Value Function of a uniform random policy
    Returns the Value function (V(s)) of this policy
    """
    env = GridWorld(5, 5)
    pi = np.ones((len(env.states()), len(env.actions())))
    pi /= len(env.actions())

    theta = 0.00001
    gamma = 1.0

    V = np.zeros((len(env.states()),))
    while True:
        delta = 0
        for s in env.states():
            old_v = V[s]
            V[s] = 0.0
            for a in env.actions():
                for s_next in env.states():
                    for r_idx, r in enumerate(env.rewards()):
                        V[s] += pi[s, a] * env.transition_probability(s, a, s_next, r_idx) * (r + gamma * V[s_next])
            delta = max(delta, abs(V[s] - old_v))

        if delta < theta:
            break

    return dict(enumerate(V.flatten(), 1))


def policy_iteration_on_grid_world() -> PolicyAndValueFunction:
    """
    Creates a Grid World of 5x5 cells (upper rightmost and lower rightmost are terminal, with -1 and 1 reward respectively)
    Launches a Policy Iteration Algorithm in order to find the Optimal Policy and its Value Function
    Returns the Policy (Pi(s,a)) and its Value Function (V(s))
    """
    env = GridWorld(5, 5)
    V = np.zeros((len(env.states()),))
    pi = np.ones((len(env.states()), len(env.actions())))
    pi /= len(env.actions())

    theta = 0.00001
    gamma = 1.0

    while True:
        while True:
            delta = 0
            for s in env.states():
                old_v = V[s]
                V[s] = 0.0
                for a in env.actions():
                    for s_next in env.states():
                        for r_idx, r in enumerate(env.rewards()):
                            V[s] += pi[s, a] * env.transition_probability(s, a, s_next, r_idx) * (r + gamma * V[s_next])
                delta = max(delta, abs(V[s] - old_v))

            if delta < theta:
                break

        policy_stable = True
        for s in env.states():
            old_policy = pi[s, :]

            best_a = None
            best_a_value = None
            for a in env.actions():
                a_value = 0
                for s_p in env.states():
                    for r_idx, r in enumerate(env.rewards()):
                        a_value += env.transition_probability(s, a, s_p, r_idx) * (r + gamma * V[s_p])
                if best_a_value is None or best_a_value < a_value:
                    best_a_value = a_value
                    best_a = a

            pi[s, :] = 0.0
            pi[s, best_a] = 1.0
            if not np.array_equal(pi[s], old_policy):
                policy_stable = False

        if policy_stable:
            break

    final_pi = {}
    for indice, value in enumerate(pi):
        final_pi[indice] = dict(enumerate(value.flatten(), 1))

    return PolicyAndValueFunction(final_pi, dict(enumerate(V.flatten(), 1)))


def value_iteration_on_grid_world() -> PolicyAndValueFunction:
    """
    Creates a Grid World of 5x5 cells (upper rightmost and lower rightmost are terminal, with -1 and 1 reward respectively)
    Launches a Value Iteration Algorithm in order to find the Optimal Policy and its Value Function
    Returns the Policy (Pi(s,a)) and its Value Function (V(s))
    """
    env = GridWorld(5, 5)
    V = np.zeros((len(env.states()),))
    pi = np.ones((len(env.states()), len(env.actions())))
    pi /= len(env.actions())
    pi2 = pi.copy()

    theta = 0.00001
    gamma = 1.0

    while True:
        delta = 0
        for s in env.states():
            old_v = V[s]
            V[s] = 0.0
            best_a_value = None
            best_a = None
            for a in env.actions():
                a_value = 0
                for s_p in env.states():
                    for r_idx, r in enumerate(env.rewards()):
                        pre_a_value = env.transition_probability(s, a, s_p, r_idx) * (r + gamma * V[s_p])
                        a_value += pre_a_value
                        V[s] += pi[s, a] * pre_a_value
                if best_a_value is None or best_a_value < a_value:
                    best_a_value = a_value
                    best_a = a

            delta = max(delta, abs(V[s] - old_v))
            pi2[s, :] = 0.0
            pi2[s, best_a] = 1.0

        if delta < theta:
            break

    final_pi = {}
    for indice, value in enumerate(pi2):
        final_pi[indice] = dict(enumerate(value.flatten(), 1))

    return PolicyAndValueFunction(final_pi, dict(enumerate(V.flatten(), 1)))


def policy_evaluation_on_secret_env1() -> ValueFunction:
    """
    Creates a Secret Env1
    Launches a Policy Evaluation Algorithm in order to find the Value Function of a uniform random policy
    Returns the Value function (V(s)) of this policy
    """
    env = Env1()
    pi = np.ones((len(env.states()), len(env.actions())))
    pi /= len(env.actions())

    theta = 0.00001
    gamma = 1.0

    V = np.zeros((len(env.states()),))
    while True:
        delta = 0
        for s in env.states():
            old_v = V[s]
            V[s] = 0.0
            for a in env.actions():
                for s_next in env.states():
                    for r_idx, r in enumerate(env.rewards()):
                        V[s] += pi[s, a] * env.transition_probability(s, a, s_next, r_idx) * (r + gamma * V[s_next])
            delta = max(delta, abs(V[s] - old_v))

        if delta < theta:
            break

    return dict(enumerate(V.flatten(), 1))


def policy_iteration_on_secret_env1() -> PolicyAndValueFunction:
    """
    Creates a Secret Env1
    Launches a Policy Iteration Algorithm in order to find the Optimal Policy and its Value Function
    Returns the Policy (Pi(s,a)) and its Value Function (V(s))
    """
    env = Env1()
    V = np.zeros((len(env.states()),))
    pi = np.ones((len(env.states()), len(env.actions())))
    pi /= len(env.actions())

    theta = 0.00001
    gamma = 1.0

    while True:
        while True:
            delta = 0
            for s in env.states():
                old_v = V[s]
                V[s] = 0.0
                for a in env.actions():
                    for s_next in env.states():
                        for r_idx, r in enumerate(env.rewards()):
                            V[s] += pi[s, a] * env.transition_probability(s, a, s_next, r_idx) * (r + gamma * V[s_next])
                delta = max(delta, abs(V[s] - old_v))

            if delta < theta:
                break

        policy_stable = True
        for s in env.states():
            old_policy = pi[s, :]

            best_a = None
            best_a_value = None
            for a in env.actions():
                a_value = 0
                for s_p in env.states():
                    for r_idx, r in enumerate(env.rewards()):
                        a_value += env.transition_probability(s, a, s_p, r_idx) * (r + gamma * V[s_p])
                if best_a_value is None or best_a_value < a_value:
                    best_a_value = a_value
                    best_a = a

            pi[s, :] = 0.0
            pi[s, best_a] = 1.0
            if not np.array_equal(pi[s], old_policy):
                policy_stable = False

        if policy_stable:
            break

    final_pi = {}
    for indice, value in enumerate(pi):
        final_pi[indice] = dict(enumerate(value.flatten(), 1))

    return PolicyAndValueFunction(final_pi, dict(enumerate(V.flatten(), 1)))


def value_iteration_on_secret_env1() -> PolicyAndValueFunction:
    """
    Creates a Secret Env1
    Launches a Value Iteration Algorithm in order to find the Optimal Policy and its Value Function
    Prints the Policy (Pi(s,a)) and its Value Function (V(s))
    """
    env = Env1()
    V = np.zeros((len(env.states()),))
    pi = np.ones((len(env.states()), len(env.actions())))
    pi /= len(env.actions())
    pi2 = pi.copy()

    theta = 0.00001
    gamma = 1.0

    while True:
        delta = 0
        for s in env.states():
            old_v = V[s]
            V[s] = 0.0
            best_a_value = None
            best_a = None
            for a in env.actions():
                a_value = 0
                for s_p in env.states():
                    for r_idx, r in enumerate(env.rewards()):
                        pre_a_value = env.transition_probability(s, a, s_p, r_idx) * (r + gamma * V[s_p])
                        a_value += pre_a_value
                        V[s] += pi[s, a] * pre_a_value
                if best_a_value is None or best_a_value < a_value:
                    best_a_value = a_value
                    best_a = a

            delta = max(delta, abs(V[s] - old_v))
            pi2[s, :] = 0.0
            pi2[s, best_a] = 1.0

        if delta < theta:
            break

    final_pi = {}
    for indice, value in enumerate(pi2):
        final_pi[indice] = dict(enumerate(value.flatten(), 1))

    return PolicyAndValueFunction(final_pi, dict(enumerate(V.flatten(), 1)))



## Result fort Dynamic programming 

In [8]:
    print(policy_evaluation_on_line_world())
    print(policy_iteration_on_line_world())
    print(value_iteration_on_line_world())

    print(policy_evaluation_on_grid_world())
    print(policy_iteration_on_grid_world())
    print(value_iteration_on_grid_world())

    print(policy_evaluation_on_secret_env1())
    print(policy_iteration_on_secret_env1())
    print(value_iteration_on_secret_env1())

{1: 0.0, 2: -0.666686199082779, 3: -0.3333626319575018, 4: -2.9298624168505594e-05, 5: 0.33331135936520695, 6: 0.6666556796826035, 7: 0.0}
PolicyAndValueFunction(pi={0: {1: 1.0, 2: 0.0}, 1: {1: 0.0, 2: 1.0}, 2: {1: 0.0, 2: 1.0}, 3: {1: 0.0, 2: 1.0}, 4: {1: 0.0, 2: 1.0}, 5: {1: 0.0, 2: 1.0}, 6: {1: 1.0, 2: 0.0}}, v={1: 0.0, 2: -0.666686199082779, 3: -0.3333626319575018, 4: -2.9298624168505594e-05, 5: 0.33331135936520695, 6: 0.6666556796826035, 7: 0.0})
PolicyAndValueFunction(pi={0: {1: 1.0, 2: 0.0}, 1: {1: 0.0, 2: 1.0}, 2: {1: 0.0, 2: 1.0}, 3: {1: 0.0, 2: 1.0}, 4: {1: 0.0, 2: 1.0}, 5: {1: 0.0, 2: 1.0}, 6: {1: 1.0, 2: 0.0}}, v={1: 0.0, 2: -0.666686199082779, 3: -0.3333626319575018, 4: -2.9298624168505594e-05, 5: 0.33331135936520695, 6: 0.6666556796826035, 7: 0.0})
{1: -0.012408241005487585, 2: -0.03849208733039875, 3: -0.10946152767210533, 4: -0.3206577559932595, 5: 0.0, 6: -0.011130448543747447, 7: -0.032086147618454595, 8: -0.07868611730403527, 9: -0.1731645441465791, 10: -0.2932932689

NameError: name 'Env1' is not defined

In [None]:
def max_dict(d):
  # returns the argmax (key) and max (value) from a dictionary
  max_key = None
  max_val = float('-inf')
  for k, v in d.items():
    if v > max_val:
      max_val = v
      max_key = k
  return max_key, max_val


# Monte Carlo 

## Monte Carlo Es

In [None]:
def algo_monte_carlo_es(env) -> PolicyAndActionValueFunction:
    max_episodes_count = 10000
    gamma = 0.85

    pi = {}
    q = {}
    returns = {}

    for ep in tqdm(range(max_episodes_count)):
        env.reset()
        S = []
        A = []
        R = []


        while not env.is_game_over():
            s = env.state_id()
            S.append(s)
            available_actions = env.available_actions_ids()

            if s not in pi:
                pi[s] = {}
                q[s] = {}
                returns[s] = {}
                for a in available_actions:
                    pi[s][a] = 1.0 / len(available_actions)
                    q[s][a] = 0.0
                    returns[s][a] = []
            chosen_action = available_actions[np.random.randint(len(available_actions))]
            A.append(chosen_action)

            old_score = env.score()
            env.act_with_action_id(chosen_action)
            r = env.score() - old_score
            R.append(r)

            G = 0
            for t in reversed(range(len(S))):
                G = gamma * G + R[t]

                found = False
                for prev_s, prev_a in zip(S[:t], A[:t]):
                    if prev_s == S[t] and prev_a == A[t]:
                        found = True
                        break
                if found:
                    continue

                if A[t] not in returns[S[t]]:
                    returns[S[t]][A[t]] = []

                returns[S[t]][A[t]].append(G)
                q[S[t]][A[t]] = np.mean(returns[S[t]][A[t]])
                pi[S[t]] = list(q[S[t]].keys())[np.argmax(list(q[S[t]].values()))]

                #max = max_dict(q[s])
                #pi[s][max[0]] = max[1]

                #optimal_a_t = list(q[S[t]].keys())[np.argmax(list(q[S[t]].values()))]
                # pi[S[t]][optimal_a_t] = np.argmax(q[S[t]][optimal_a_t])
                #for a_key in pi[S[t]].keys():
                    # pi[S[t]][a_key] = np.argmax(q[S[t]][a_key])
                    # pi[S[t]][a_key] = np.argmax(q[S[t]][optimal_a_t])

    #for s in pi.keys():
    #    probabilities = np.array(list(pi[s].values()))
    #    probabilities /= probabilities.sum()
    #    for i in range(len(probabilities)):
    #        pi[s][i] = probabilities[i]

    return PolicyAndActionValueFunction(pi, q)


## Monte Carlo On Policy

In [None]:
def algo_on_policy_monte_carlo(env) -> PolicyAndActionValueFunction:
    epsilon = 0.1
    max_episodes_count = 10000
    gamma = 0.9

    pi = {}
    q = {}
    returns = {}

    for it in tqdm(range(max_episodes_count)):
        env.reset()
        S = []
        A = []
        R = []
        while not env.is_game_over():
            s = env.state_id()
            S.append(s)
            available_actions = env.available_actions_ids()

            if s not in pi:
                pi[s] = {}
                q[s] = {}
                returns[s] = {}
                for a in available_actions:
                    pi[s][a] = 1.0 / len(available_actions)
                    q[s][a] = 0.0
                    returns[s][a] = []

            chosen_action = np.random.choice(
                list(pi[s].keys()),
                1,
                False,
                p=list(pi[s].values())
            )[0]
            A.append(chosen_action)
            old_score = env.score()
            env.act_with_action_id(chosen_action)
            r = env.score() - old_score
            R.append(r)

            G = 0

            for t in reversed(range(len(S))):
                G = gamma * G + R[t]
                s_t = S[t]
                a_t = A[t]
                found = False
                for p_s, p_a in zip(S[:t], A[:t]):
                    if s_t == p_s and a_t == p_a:
                        found = True
                        break
                if found:
                    continue

                if a_t not in returns[s_t]:
                    returns[s_t][a_t] = []

                returns[s_t][a_t].append(G)
                q[s_t][a_t] = np.mean(returns[s_t][a_t])
                optimal_a_t = list(q[s_t].keys())[np.argmax(list(q[s_t].values()))]
                available_actions_t_count = len(q[s_t])
                for a_key, q_s_a in q[s_t].items():
                    if a_key == optimal_a_t:
                        pi[s_t][a_key] = 1 - epsilon + epsilon / available_actions_t_count
                    else:
                        pi[s_t][a_key] = epsilon / available_actions_t_count

    return PolicyAndActionValueFunction(pi, q)



## Monte Carlo Off Policy

In [None]:
def algo_off_policy_monte_carlo(env) -> PolicyAndActionValueFunction:
    max_episodes_count = 10000
    gamma = 0.90

    Q = {}
    C = {}
    pi = {}

    for it in tqdm(range(max_episodes_count)):
        env.reset()
        S = []
        A = []
        R = []
        while not env.is_game_over():
            s = env.state_id()
            S.append(s)
            available_actions = env.available_actions_ids()

            if s not in pi:
                pi[s] = {}
                Q[s] = {}
                C[s] = {}
                for a in available_actions:
                    pi[s][a] = 1.0 / len(available_actions)
                    Q[s][a] = 0.0
                    C[s][a] = 0.0

            chosen_action = available_actions[np.random.randint(len(available_actions))]

            A.append(chosen_action)
            old_score = env.score()
            env.act_with_action_id(chosen_action)
            r = env.score() - old_score
            R.append(r)

            G = 0
            W = 1

            for t in reversed(range(len(S))):
                G = gamma * G + R[t]

                s_t = S[t]
                a_t = A[t]

                if a_t not in C[s_t]:
                    C[s_t][a_t] = 0.0

                if a_t not in Q[s_t]:
                    Q[s_t][a_t] = 0.0

                C[s_t][a_t] += W
                Q[s_t][a_t] += (W / (C[s_t][a_t])) * (G - Q[s_t][a_t])

                max = max_dict(Q[s])
                pi[s][max[0]] = max[1]
                # for a_key in pi[s_t].keys():
                #    pi[s_t][a_key] = np.argmax(Q[s_t][a_key])

                optimal_a_t = list(Q[s_t].keys())[np.argmax(list(Q[s_t].values()))]
                if chosen_action != optimal_a_t:
                    break

                W *= 1. / (available_actions[np.random.randint(len(available_actions))] + 1)

    for s in pi.keys():
        probabilities = np.array(list(pi[s].values()))
        probabilities /= probabilities.sum()
        for i in range(len(probabilities)):
            pi[s][i] = probabilities[i]

    return PolicyAndActionValueFunction(pi, Q)


## Result Monte Carlo ES

<img src="img/monte_carlo_es.png"> </img>

## Monte Carlo On Policy

<img src="img/on_policy_monte_carlo.png"> </img>

## Monte Carlo Off Policy

<img src="img/off_policy_monte_carlo.png"> </img>

# Temporal difference

In [None]:
def get_epsilon_best_action(epsilon, available_actions, Q, s):
    available_actions_len = len(available_actions)
    if available_actions_len == 1:
        return available_actions[0]
    elif available_actions_len == 0:
        action_values = list(Q[s].values())
        if len(action_values) > 0:
            best_action_value = np.sort(action_values)[len(action_values)-1]
            best_action = list(Q[s].keys())[list(Q[s].values()).index(best_action_value)]
            return best_action
        else:
            return np.random.randint(8)

    if np.random.uniform(0, 1) > epsilon:
        return available_actions[np.random.randint(available_actions_len)]
    else:
        for i in range(len(list(Q[s].keys())) - 1, 0, -1):
            best_action_value = np.sort(list(Q[s].values()))[i]
            best_action = list(Q[s].keys())[list(Q[s].values()).index(best_action_value)]
            if best_action in available_actions:
                return best_action
        return available_actions[np.random.randint(available_actions_len)]



## Q-Learning

In [None]:

def algo_q_learning(env) -> PolicyAndActionValueFunction:
    alpha = 0.1
    epsilon = 1.0
    gamma = 0.9
    max_iter = 10000

    pi = {}  # learned greedy policy
    b = {}  # behaviour epsilon-greedy policy
    q = {}  # action-value function of pi

    for it in tqdm(range(max_iter)):
        env.reset()

        while not env.is_game_over():
            s = env.state_id()
            available_actions = env.available_actions_ids()
            if s not in pi:
                pi[s] = {}
                q[s] = {}
                b[s] = {}
                for a in available_actions:
                    pi[s][a] = 1.0 / len(available_actions)
                    q[s][a] = 0.0
                    b[s][a] = 1.0 / len(available_actions)

            # actions disponibles differents selon les states
            available_actions_count = len(available_actions)
            optimal_a = list(q[s].keys())[np.argmax(list(q[s].values()))]
            for a_key, q_s_a in q[s].items():
                if a_key == optimal_a:
                    b[s][a_key] = 1 - epsilon + epsilon / available_actions_count
                else:
                    b[s][a_key] = epsilon / available_actions_count

            chosen_action = np.random.choice(
                list(b[s].keys()),
                1,
                False,
                p=list(b[s].values())
            )[0]
            old_score = env.score()
            env.act_with_action_id(chosen_action)
            r = env.score() - old_score
            s_p = env.state_id()
            next_available_actions = env.available_actions_ids()

            if env.is_game_over():
                q[s][chosen_action] += alpha * (r + 0.0 - q[s][chosen_action])
            else:
                if s_p not in pi:
                    pi[s_p] = {}
                    q[s_p] = {}
                    b[s_p] = {}
                    for a in next_available_actions:
                        pi[s_p][a] = 1.0 / len(next_available_actions)
                        q[s_p][a] = 0.0
                        b[s_p][a] = 1.0 / len(next_available_actions)
                q[s][chosen_action] += alpha * (r + gamma * np.max(list(q[s_p].values())) - q[s][chosen_action])

    for s in q.keys():
        optimal_a = list(q[s].keys())[np.argmax(list(q[s].values()))]
        for a_key, q_s_a in q[s].items():
            if a_key == optimal_a:
                pi[s][a_key] = 1.0
            else:
                pi[s][a_key] = 0.0

    return PolicyAndActionValueFunction(pi, q)



## Sarsa

In [None]:
def algo_sarsa(env) -> PolicyAndActionValueFunction:
    max_episodes_count = 10000
    alpha = 0.85
    gamma = 0.95
    epsilon = 0.1

    Q = {}
    pi = {}

    for ep in tqdm(range(max_episodes_count)):

        env.reset()
        S = []
        A = []
        R = []

        s_1 = env.state_id()
        available_actions = env.available_actions_ids()
        if s_1 not in Q:
            pi[s_1] = {}
            Q[s_1] = {}
            for a in available_actions:
                pi[s_1][a] = 1.0 / len(available_actions)
                Q[s_1][a] = 0.0
        action_1 = get_epsilon_best_action(epsilon, available_actions, Q, s_1)

        while not env.is_game_over():
            S.append(s_1)
            available_actions = env.available_actions_ids()

            if s_1 not in Q:
                pi[s_1] = {}
                Q[s_1] = {}
                for a in available_actions:
                    pi[s_1][a] = 1.0 / len(available_actions)
                    Q[s_1][a] = 0.0

            A.append(action_1)

            old_score = env.score()
            env.act_with_action_id(action_1)
            r = env.score() - old_score
            R.append(r)

            s_2 = env.state_id()
            available_actions = env.available_actions_ids()

            if s_2 not in Q:
                Q[s_2] = {}
                pi[s_2] = {}
                for a in available_actions:
                    Q[s_2][a] = 0.0
                    pi[s_2][a] = 1.0 / len(available_actions)

            action_2 = get_epsilon_best_action(epsilon, available_actions, Q, s_2)

            if action_2 not in Q[s_2]:
                Q[s_2][action_2] = 0.0

            target = r + gamma * Q[s_2][action_2]
            Q[s_1][action_1] += alpha * (target - Q[s_1][action_1])

            #for a_key in pi[s_1].keys():
            #    max = np.argmax(Q[s_1][a_key])
            #    pi[s_1][a_key] = max
            s_1 = s_2
            action_1 = action_2

    for s in Q.keys():
        max = max_dict(Q[s])
        pi[s][max[0]] = max[1]
        probabilities = np.array(list(pi[s].values()))
        probabilities /= probabilities.sum()
        for i in range(len(probabilities)):
            pi[s][i] = probabilities[i]

    return PolicyAndActionValueFunction(pi, Q)


## Expected Sarsa

In [None]:
def algo_expected_sarsa(env) -> PolicyAndActionValueFunction:
    alpha = 0.1
    epsilon = 1.0
    gamma = 0.9
    max_iter = 10000

    pi = {}  # learned greedy policy
    b = {}  # behaviour epsilon-greedy policy
    q = {}  # action-value function of pi

    for it in tqdm(range(max_iter)):
        env.reset()

        while not env.is_game_over():
            s = env.state_id()
            available_actions = env.available_actions_ids()
            if s not in pi:
                pi[s] = {}
                q[s] = {}
                b[s] = {}
                for a in available_actions:
                    pi[s][a] = 1.0 / len(available_actions)
                    q[s][a] = 0.0
                    b[s][a] = 1.0 / len(available_actions)

            # actions disponibles differents selon les states
            available_actions_count = len(available_actions)
            optimal_a = list(q[s].keys())[np.argmax(list(q[s].values()))]
            for a_key, q_s_a in q[s].items():
                if a_key == optimal_a:
                    b[s][a_key] = 1 - epsilon + epsilon / available_actions_count
                else:
                    b[s][a_key] = epsilon / available_actions_count

            chosen_action = np.random.choice(
                list(b[s].keys()),
                1,
                False,
                p=list(b[s].values())
            )[0]
            old_score = env.score()
            env.act_with_action_id(chosen_action)
            r = env.score() - old_score
            s_p = env.state_id()
            next_available_actions = env.available_actions_ids()

            if env.is_game_over():
                q[s][chosen_action] += alpha * (r + 0.0 - q[s][chosen_action])
            else:
                if s_p not in pi:
                    pi[s_p] = {}
                    q[s_p] = {}
                    b[s_p] = {}
                    for a in next_available_actions:
                        pi[s_p][a] = 1.0 / len(next_available_actions)
                        q[s_p][a] = 0.0
                        b[s_p][a] = 1.0 / len(next_available_actions)
                sum = 0
                for a in pi[s_p]:
                    sum += pi[s_p][a] * q[s_p][a]
                q[s][chosen_action] += alpha * (r + gamma * sum - q[s][chosen_action])

    for s in q.keys():
        optimal_a = list(q[s].keys())[np.argmax(list(q[s].values()))]
        for a_key, q_s_a in q[s].items():
            if a_key == optimal_a:
                pi[s][a_key] = 1.0
            else:
                pi[s][a_key] = 0.0

    return PolicyAndActionValueFunction(pi, q)



# Result for Q Learning

<img src="img/q_learning.png"> </img>

# Result for Sarsa

<img src="img/sarsa.png"> </img>

# Result for expected Sarsa

<img src="img/ex_sarsa.png"> </img>

# deep reinforcement learning

In [None]:
def episodic_semi_gradient_sarsa(env: DeepSingleAgentWithDiscreteActionsEnv):
    epsilon = 0.1
    gamma = 0.9
    max_episodes_count = 100 if not isinstance(env, PacMan) else 10
    pre_warm = (max_episodes_count / 10) if not isinstance(env, PacMan) else 3

    state_description_length = env.state_description_length()
    max_actions_count = env.max_actions_count()

    q = tf.keras.Sequential([
        tf.keras.layers.Dense(16, activation=tf.keras.activations.tanh,
                              input_dim=(state_description_length + max_actions_count)),
        tf.keras.layers.Dense(1, activation=tf.keras.activations.linear),
    ])

    q.compile(optimizer=tf.keras.optimizers.Adam(), loss=tf.keras.losses.mse)

    for episode_id in tqdm.tqdm(range(max_episodes_count)):
        env.reset()
        round_counter = 0

        while not env.is_game_over():
            round_counter += 1
            s = env.state_description()
            available_actions = env.available_actions_ids()

            if (episode_id < pre_warm) or np.random.uniform(0.0, 1.0) < epsilon:
                chosen_action = np.random.choice(available_actions)
            else:
                all_q_inputs = np.zeros((len(available_actions), state_description_length + max_actions_count))
                for i, a in enumerate(available_actions):
                    all_q_inputs[i] = np.hstack([s, tf.keras.utils.to_categorical(a, max_actions_count)])

                all_q_values = np.squeeze(q.predict(all_q_inputs))
                chosen_action = available_actions[np.argmax(all_q_values)]

            previous_score = env.score()
            env.act_with_action_id(chosen_action)
            r = env.score() - previous_score
            s_p = env.state_description()

            if env.is_game_over():
                target = r
                q_inputs = np.hstack([s, tf.keras.utils.to_categorical(chosen_action, max_actions_count)])
                q.train_on_batch(np.array([q_inputs]), np.array([target]))
                break

            next_available_actions = env.available_actions_ids()

            if episode_id < pre_warm or np.random.uniform(0.0, 1.0) < epsilon:
                next_chosen_action = np.random.choice(next_available_actions)
            else:
                next_chosen_action = None
                next_chosen_action_q_value = None
                for a in next_available_actions:
                    q_inputs = np.hstack([s_p, tf.keras.utils.to_categorical(a, max_actions_count)])
                    q_value = q.predict(np.array([q_inputs]))[0][0]
                    if next_chosen_action is None or next_chosen_action_q_value < q_value:
                        next_chosen_action = a
                        next_chosen_action_q_value = q_value

            next_q_inputs = np.hstack([s_p, tf.keras.utils.to_categorical(next_chosen_action, max_actions_count)])
            next_chosen_action_q_value = q.predict(np.array([next_q_inputs]))[0][0]

            target = r + gamma * next_chosen_action_q_value

            q_inputs = np.hstack([s, tf.keras.utils.to_categorical(chosen_action, max_actions_count)])
            q.train_on_batch(np.array([q_inputs]), np.array([target]))

    return q
