In [None]:
import collections
import numpy as np
import gymnasium as gym
import plotly.graph_objects as go

from IPython.display import Video

In [None]:
# Training environment: 8x8 FrozenLake with slipping disabled, default map layout.
env = gym.make(
    'FrozenLake-v1',
    desc=None,
    map_name="8x8",
    is_slippery=False,
)

In [None]:
def play_env(env, agent, record=False):
    """Run one episode of ``env`` with ``agent`` and let the agent learn from it.

    Args:
        env: Environment following the Gymnasium API used below:
            ``reset()`` returns ``(observation, info)`` and ``step(action)``
            returns ``(observation, reward, terminated, truncated, info)``.
        agent: Object exposing ``action(observation)``,
            ``observe(state, next_state, action, reward, terminated)`` and
            ``estimating()``.
        record: When true, ``env.start_video_recorder()`` is called once after
            reset and ``env.render()`` is called after reset and after every
            step; ``env`` must provide both methods.

    Returns:
        ``(reward, nb_step)``: the reward of the last step taken and the
        number of steps in the episode.
    """
    nb_step = 0
    observation, info = env.reset()

    if record:
        env.start_video_recorder()
        env.render()

    episode_over = False
    while not episode_over:
        action = agent.action(observation)

        new_observation, reward, terminated, truncated, info = env.step(action)

        if record:
            env.render()

        # Only `terminated` is forwarded: a truncated episode was cut short,
        # it did not reach a terminal state, so the agent may still bootstrap.
        agent.observe(observation, new_observation, action, reward, terminated)

        observation = new_observation
        nb_step += 1

        # Stop on truncation as well; otherwise the loop keeps stepping an
        # environment whose episode has already been ended by a time limit.
        episode_over = terminated or truncated

    agent.estimating()

    return reward, nb_step

In [None]:
class Dyna():
    """Tabular Dyna-Q agent: one-step Q-learning plus planning from a learned model.

    Each real transition (1) updates the action-value table, (2) is stored in
    a model that remembers the last observed ``(reward, next_state)`` for every
    visited state/action pair, and (3) is followed by
    ``number_of_model_repetition`` simulated updates replayed from that model.

    Attributes:
        state_action_values: ``defaultdict`` mapping a state to a mutable
            sequence of action values, created on first access by the
            ``action_space`` factory.
        model: ``defaultdict`` mapping ``state -> {action: (reward, next_state)}``.
        next_action: Only ever set to ``None`` by this class; not read here.
    """

    def __init__(self, action_space, gamma, alpha, policy, number_of_model_repetition):
        """Store the hyper-parameters and create empty value table and model.

        Args:
            action_space: Zero-argument callable returning the initial action
                values (indexable by action) for a state seen for the first time.
            gamma: Discount factor applied to the bootstrapped next-state value.
            alpha: Step size of every value update.
            policy: Callable mapping a state's action values to an action.
            number_of_model_repetition: Number of planning updates performed
                after each real transition.
        """
        self.gamma  = gamma
        self.alpha = alpha
        self.policy = policy
        self.number_of_model_repetition = number_of_model_repetition

        self.state_action_values = collections.defaultdict(action_space)
        self.model = collections.defaultdict(dict)
        # (state, action) pairs whose stored transition ended the episode.
        # Kept beside the model so the model keeps its (reward, next_state) format.
        self._terminal_transitions = set()
        self.next_action = None

    def action(self, state):
        """Return the action chosen by the policy for ``state``."""
        state_action_value = self.state_action_values[state]
        return self.policy(state_action_value)

    def observe(self, state, next_state, action, reward, terminated):
        """Learn from one real transition, then run the planning updates.

        Args:
            state: State in which ``action`` was taken (must be hashable).
            next_state: State reached after the action.
            action: Index of the action taken.
            reward: Reward received for the transition.
            terminated: True when ``next_state`` is terminal; the update then
                uses the reward alone as target.
        """
        self._update_value(state, action, reward, next_state, terminated)

        # Terminal transitions are recorded too, so planning can replay them.
        self.model[state][action] = (reward, next_state)
        if terminated:
            self._terminal_transitions.add((state, action))
            self.next_action = None
        else:
            self._terminal_transitions.discard((state, action))

        self._plan()

    def _update_value(self, state, action, reward, next_state, terminated):
        """Move Q(state, action) towards the one-step Q-learning target."""
        action_values = self.state_action_values[state]
        if terminated:
            # No bootstrapping from a terminal state: its default-initialised
            # values would otherwise leak into the target.
            target = reward
        else:
            target = reward + self.gamma * max(self.state_action_values[next_state])
        action_values[action] += self.alpha * (target - action_values[action])

    def _plan(self):
        """Replay ``number_of_model_repetition`` random transitions from the model."""
        # The model does not change during planning, so list its states once.
        # States without any recorded action are skipped.
        known_states = [state for state, actions in self.model.items() if actions]
        if not known_states:
            return

        for _ in range(self.number_of_model_repetition):
            # Index-based sampling keeps the original key objects; passing the
            # keys to np.random.choice would coerce them into a NumPy array,
            # which fails for tuple states.
            random_state = known_states[np.random.randint(len(known_states))]
            known_actions = list(self.model[random_state])
            random_action = known_actions[np.random.randint(len(known_actions))]

            random_next_reward, random_next_state = self.model[random_state][random_action]
            was_terminal = (random_state, random_action) in self._terminal_transitions

            self._update_value(
                random_state, random_action, random_next_reward, random_next_state, was_terminal
            )

    def estimating(self):
        """End-of-episode hook; this agent has nothing to do here."""
        return

In [None]:
def build_action_space_exploring_start(env):
    """Build a factory producing the initial action values of an unseen state.

    The returned callable takes no argument and returns a new list holding
    0.5 for each of the ``env.action_space.n`` actions; ``n`` is read at call
    time, not when the factory is built.
    """
    def initial_action_values():
        return [0.5] * env.action_space.n

    return initial_action_values

def epsilon_greedy_policy(state_action_value, epsilon=0.1):
    """Pick an action epsilon-greedily from one state's action values.

    A uniform draw in [0, 1) is compared with ``epsilon``: below it, a
    uniformly random action index is returned; otherwise the index of the
    largest action value (``np.argmax``) is returned.
    """
    explore = np.random.uniform(0, 1) < epsilon
    if not explore:
        return np.argmax(state_action_value)

    return np.random.randint(0, len(state_action_value))

# Dyna agent with epsilon-greedy behaviour and 50 planning updates per real step.
agent = Dyna(
    action_space=build_action_space_exploring_start(env),
    gamma=0.99,
    alpha=0.1,
    policy=epsilon_greedy_policy,
    number_of_model_repetition=50,
)

In [None]:
buffer_size = 10  # not referenced by the code in this cell

# Per-episode history. `epsilons` stays empty: nothing in this loop records
# an exploration rate.
rewards, steps, epsilons = [], [], []

# Train for 150 episodes, logging each episode's last reward and its length.
for episode in range(150):
    reward, nb_step = play_env(env, agent)
    rewards.append(reward)
    steps.append(nb_step)

In [None]:
# Number of states that currently have an entry in the agent's action-value
# table (entries are created on first lookup by the defaultdict).
len(agent.state_action_values)

In [None]:
# Reward returned by each training episode (the reward of its last step).
fig = go.Figure()

fig.add_trace(go.Scatter(
    x=list(range(len(rewards))),
    y=rewards,
))

# Title and axis labels so the figure can be read on its own.
fig.update_layout(
    title="Reward per episode",
    xaxis_title="Episode",
    yaxis_title="Reward of the last step",
)

fig.show()

In [None]:
# Length of each training episode, in environment steps.
fig = go.Figure()

fig.add_trace(go.Scatter(
    x=list(range(len(steps))),
    y=steps,
))

# Title and axis labels so the figure can be read on its own.
fig.update_layout(
    title="Steps per episode",
    xaxis_title="Episode",
    yaxis_title="Number of steps",
)

print("Total nb steps:", sum(steps))

fig.show()

In [None]:
# Exploration rate per episode, taken from the `epsilons` list.
fig = go.Figure()

fig.add_trace(go.Scatter(
    x=list(range(len(epsilons))),
    y=epsilons,
))

# Title and axis labels so the figure can be read on its own.
fig.update_layout(
    title="Exploration rate per episode",
    xaxis_title="Episode",
    yaxis_title="Epsilon",
)

fig.show()

In [None]:
# Same FrozenLake configuration, created with render_mode="rgb_array" and
# wrapped in RecordVideo, which writes to ../videos with the "test-video" prefix.
env = gym.make('FrozenLake-v1', render_mode="rgb_array", desc=None, map_name="8x8", is_slippery=False)
video_env = gym.wrappers.RecordVideo(env=env, video_folder="../videos", name_prefix="test-video")

try:
    # play_env returns (last reward, number of steps); unpack both so `reward`
    # holds the reward itself rather than the whole tuple.
    reward, nb_step = play_env(video_env, agent)
finally:
    # Close the recorder even if the episode raises.
    video_env.close()

print("Reward:", reward, "| nb steps:", nb_step)