In [2]:
import sys
# Add the parent directory to the import search path before importing `markov`;
# presumably the local `markov` package lives one directory up (TODO confirm).
sys.path.append('..')
import markov

# print(markov.Agents.Agent())

TypeError: Agent.__init__() missing 1 required positional argument: 'params'

In [3]:
import gym 
from gym import Space 
import pygame
from gym.utils.play import play
import json
import time
import numpy as np

from functools import reduce
import networkx as nx
import matplotlib.pyplot as plt


def dump(mixed, indent = None):
    """Print ``mixed`` serialized as JSON.

    Args:
        mixed: Any JSON-serializable value.
        indent: Forwarded unchanged to ``json.dumps`` (``None`` keeps the
            compact single-line form).
    """
    serialized = json.dumps(mixed, indent=indent)
    print(serialized)


In [4]:
class FrozenLakeAgent:
    """Base class for agents acting in a Gym environment.

    The default ``GetAction`` returns a random action sampled from the
    environment's action space. ``ApplyReward`` and ``Render`` are abstract
    and must be overridden by subclasses.

    Args:
        env: The environment the agent acts in; only ``env.action_space`` is
            used by this base class.
        log: Verbosity flag stored on the instance for subclasses to use.
    """
    def __init__(
            self,
            env : "gym.Env",
            log : bool = False,
    ):
        self.env = env
        self.log = log

    def GetAction(self, observation):
        """Return a random action; ``observation`` is ignored by the base class."""
        # An environment has no `sample()` of its own: sampling is provided by
        # its action space.
        return self.env.action_space.sample()

    def ApplyReward(self, reward, action, old_state, new_state):
        """Learn from one transition. Must be implemented by subclasses.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError("Method not implemented : ApplyReward(self, reward, action, old_state, new_state)")

    def Render(self):
        """Visualize the agent's internal state. Must be implemented by subclasses.

        Raises:
            NotImplementedError: always, in the base class.
        """
        raise NotImplementedError("Method not implemented : Render(self)")


class QTableAgent(FrozenLakeAgent):
    """Tabular, epsilon-greedy Q-learning agent for a discrete grid world.

    The Q-table has one row per state and one column per action. States are
    assumed to be laid out row by row on a square grid (4x4 for the default
    16-state FrozenLake map); this layout is only used by ``Render``.

    Args:
        env: Environment with discrete observation and action spaces
            (both must expose ``.n``).
        log: When true, ``debug`` prints its messages.
        alpha: Learning rate.
        gamma: Discount factor.
        epsilon: Probability of taking a random action in ``GetAction``.
    """
    def __init__(
            self,
            env : gym.Env,
            log : bool = False,
            alpha   = 0.1,      # Learning rate
            gamma   = 0.99,     # Discount factor
            epsilon = 0.1,      # Exploration rate
    ):
        super().__init__(env, log)
        n_states = env.observation_space.n
        self.Q = np.zeros([n_states, env.action_space.n])
        self.alpha      = alpha
        self.gamma      = gamma
        self.epsilon    = epsilon
        self.fig, self.ax = plt.subplots()  # Initialize a figure and axis for plotting
        self.layout = None                  # Node layout, computed on the first Render()

        # Side length of the grid, derived from the state count instead of
        # being hard-coded to 4. Assumes a square map.
        side = int(round(np.sqrt(n_states)))
        self.grid_size = side

        # state index -> (column, row)
        self.states_position_mapping = {
            s: (s % side, s // side) for s in range(n_states)
        }
        # action -> function giving the neighbouring state reached by a
        # deterministic move; moves off the grid leave the state unchanged.
        self.action_position_functions = {
            0: lambda s: s if (s % side == 0) else s - 1,            # left
            1: lambda s: s if (s + side >= n_states) else s + side,  # down
            2: lambda s: s if (s % side == side - 1) else s + 1,     # right
            3: lambda s: s if (s - side < 0) else s - side,          # up
        }

    def debug(self, msg):
        """Print ``msg`` when logging is enabled."""
        if self.log:
            print(msg)

    def GetAction(self, observation):
        """Pick an action epsilon-greedily for the given state."""
        if np.random.uniform(0, 1) < self.epsilon:
            action = self.env.action_space.sample()  # Explore action space
        else:
            action = np.argmax(self.Q[observation])  # Exploit learned values
        return action

    def ApplyReward(self, reward, action, old_state, new_state):
        """Apply one Q-learning update for the observed transition."""
        next_max = np.max(self.Q[new_state])
        td_error = reward + self.gamma * next_max - self.Q[old_state, action]
        self.Q[old_state, action] = self.Q[old_state, action] + self.alpha * td_error
        return

    def Render(self):
        """Draw the grid as a directed graph whose edges carry the Q-values.

        Each state is a node labelled with its (column, row) position; each
        move that changes the state is an edge labelled with ``Q[state, action]``.
        """
        self.ax.clear()
        G = nx.DiGraph()

        for pos in self.states_position_mapping.values():
            G.add_node(str(pos), style='dashed')

        for state, pos in self.states_position_mapping.items():
            u = str(pos)
            for action in range(self.env.action_space.n):
                v = str(
                    self.states_position_mapping[
                        self.action_position_functions[action](state)
                    ]
                )
                # Moves blocked by the grid border map a state onto itself; skip them.
                if v == u:
                    continue

                G.add_edge(u, v, label=f"{self.Q[state, action]:.2f}")

        # Compute the layout once so that successive renders are comparable.
        if self.layout is None:
            self.layout = nx.spring_layout(G)
        # NOTE(review): these draw calls target pyplot's current axes, which is
        # not necessarily self.ax - confirm before wiring them to self.ax.
        nx.draw(G, self.layout, with_labels=True, node_size=2000, node_color='lightblue', font_size=10, font_weight='bold', edge_color='gray')
        edge_labels = nx.get_edge_attributes(G, 'label')
        nx.draw_networkx_edge_labels(G, self.layout, edge_labels=edge_labels)
        self.fig.canvas.draw()
        plt.show()

In [5]:
import math

class FrozenLakeRewardWrapper(gym.Wrapper):
    """Reward-shaping wrapper for a grid environment with integer states.

    Every reward returned by ``step`` is recomputed by ``compute_reward`` and
    divided by the number of steps taken in the current episode.

    Args:
        env: The environment to wrap.
        distance_penalty_rate: Scale of the distance-based shaping term.
        immobility_penalty: Penalty subtracted when a step leaves the state unchanged.
        fall_penalty: Magnitude of the negative reward given when the episode is truncated.
        win_reward: Multiplier applied to the wrapped environment's own reward.
        map_size: ``(columns, rows)`` of the grid.
        logs: When true, ``log`` prints its messages.
    """
    def __init__(
        self,
        env,
        distance_penalty_rate   = 1,
        immobility_penalty      = 1,
        fall_penalty            = 1,
        win_reward              = 1,
        map_size                = (4,4),
        logs = False,
    ):
        super().__init__(env)
        self.immobility_penalty     = immobility_penalty
        self.distance_penalty_rate  = distance_penalty_rate
        self.fall_penalty           = fall_penalty
        self.win_reward             = win_reward
        self.map_size               = map_size
        self.logs           = logs
        self.current_state  = None
        self.steps_count    = 0

    def log(self, msg):
        """Print ``msg`` when logging was enabled through ``logs``."""
        # The flag is `self.logs`; `self.log` is this bound method and would
        # always be truthy.
        if self.logs:
            print(msg)

    def reset(self, **kwargs):
        """Reset the wrapped environment and the per-episode bookkeeping.

        Returns whatever the wrapped environment's ``reset`` returns.
        """
        result = self.env.reset(**kwargs)
        # reset() may return (observation, info); keep only the observation so
        # it can be compared with the integer states seen in step().
        self.current_state = result[0] if isinstance(result, tuple) else result
        # The step counter scales rewards per episode, so it restarts here.
        self.steps_count = 0
        return result

    def step(self, action):
        """Step the wrapped environment and replace its reward with the shaped one."""
        self.steps_count += 1

        next_state, reward, terminated, truncated, info = self.env.step(action)

        reward              = self.compute_reward(next_state, reward, terminated, truncated, info)

        self.current_state  = next_state
        return next_state, reward, terminated, truncated, info

    def compute_reward(self, next_state, reward, terminated, truncated, info):
        """Compute the shaped reward for the transition into ``next_state``.

        Returns:
            ``(reward - penalty) / steps`` where ``steps`` is the number of
            steps taken so far in the episode (at least 1).
        """
        reward = reward * self.win_reward
        penalty = 0

        if truncated:
            # NOTE(review): the fall penalty is applied on truncation; a fall
            # into a hole presumably arrives as `terminated` instead - confirm.
            self.log("Truncated")
            reward = - self.fall_penalty

        elif terminated:
            self.log("Terminated")

        elif next_state == self.current_state:
            self.log("Immobility penalty")
            penalty = self.immobility_penalty

        else:
            n_cols, n_rows = self.map_size[0], self.map_size[1]
            max_distance = math.sqrt( n_cols**2 + n_rows**2 )
            # States are numbered row by row, so both coordinates are derived
            # from the number of columns.
            x, y = next_state % n_cols, next_state // n_cols
            # Same reference point as the former hard-coded (4, 4), now taken
            # from map_size.
            distance_to_arrival = math.sqrt( (n_cols-x)**2 + (n_rows-y)**2 )
            self.log(f"({x},{y}) => distance is {distance_to_arrival} ")
            # NOTE(review): this term grows with the distance and is used as a
            # positive reward - confirm the intended sign.
            reward = self.distance_penalty_rate * distance_to_arrival / max_distance

        # Guard against a direct call before any step() has been taken.
        return (reward - penalty) / max(self.steps_count, 1)
  

In [18]:
# Training configuration.
seed = 42
episodes = 10_000  # Total episodes to train on

# Seed NumPy's global generator.
np.random.seed(seed)

# Training environment.
env = gym.make('FrozenLake-v1', is_slippery=True)
# env = FrozenLakeRewardWrapper(
#     env=env,
#     fall_penalty=0,
#     distance_penalty_rate=0.5
# )


{"action_space": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9], "observation_space": [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]}


  if not isinstance(terminated, (bool, np.bool8)):


In [4]:
import sys, gym
sys.path.append('..')
from markov.Policies import ExplorationExploitationPolicy, RandomPolicy, QPolicy
from markov.Training import Agent

# Environment used for training (no render mode requested).
training_env = gym.make('FrozenLake-v1', is_slippery=True)
# Combine a random policy and a Q policy under a single epsilon parameter.
# NOTE(review): the exact semantics live in markov.Policies, which is not
# visible here - presumably epsilon is the exploration probability; confirm.
policy = ExplorationExploitationPolicy(
    epsilon             = 0.4,
    exploration_policy  = RandomPolicy(),
    exploitation_policy = QPolicy(alpha = 0.1, gamma = 0.9)
)
# The same policy object is passed both as the experience policy and as the
# learning policy registered under the key "Q".
agent = Agent(
    training_env        = training_env,
    experience_policy   = policy,
    learning_policies   = { 
        "Q":  policy
    },
    experience_buffer_size = 1
)
agent.Train(epochs = 1000)

# Second environment, created with render_mode="human", handed to the agent
# after training.
inference_env = gym.make('FrozenLake-v1', render_mode="human", is_slippery=True)
agent.Infere(
    env = inference_env
)


In [5]:
# Release both environments created for the markov training/inference run.
training_env.close()
inference_env.close()

In [None]:

# Seed the action space's own sampler.
env.action_space.seed(seed)

# Show how both spaces serialize the values 0..9.
space_preview = {
    "action_space": env.action_space.to_jsonable(sample_n=range(10)),
    "observation_space": env.observation_space.to_jsonable(range(10)),
}
dump(space_preview)

# Q-learning agent with a 40% exploration rate (pass log=True for debug output).
agent = QTableAgent(env=env, epsilon=0.4)

def epoch(agent : FrozenLakeAgent, env : gym.Env, seed = None):
    """Run a single episode, feeding every transition to the agent.

    Args:
        agent: Provides ``GetAction(state)`` and
            ``ApplyReward(reward, action, old_state, new_state)``.
        env: Environment exposing ``reset`` returning ``(observation, info)``
            and ``step`` returning a 5-tuple.
        seed: Optional seed forwarded to ``env.reset``. Leave it as ``None``
            to keep the environment's random generator running; passing the
            same integer on every call would replay identical randomness in
            each episode.
    """
    old_state, info = env.reset(seed=seed)

    while True:
        action = agent.GetAction(old_state)

        new_state, reward, terminated, truncated, info = env.step(action)

        agent.ApplyReward(
            reward      = reward,
            old_state   = old_state,
            action      = action,
            new_state   = new_state,
        )

        old_state = new_state

        # The episode ends on either termination or truncation.
        if terminated or truncated:
            break

for i in range(episodes):
    # Seed the environment once, on the first episode only: re-seeding on
    # every reset would restart its random generator identically each time.
    epoch(agent, env, seed=seed if i == 0 else None)
    


In [19]:
# Rebind `env` to a new environment created with render_mode="human".
# NOTE(review): the environment previously bound to `env` is not closed
# before being replaced - confirm whether that matters.
env = gym.make('FrozenLake-v1', render_mode="human", is_slippery=True)
# Assuming `agent` is the QTableAgent built earlier: with epsilon = 0 its
# GetAction never takes the random branch and always uses argmax over Q.
agent.epsilon = 0
# NOTE(review): epoch() also calls agent.ApplyReward, so the agent keeps
# learning during these 10 episodes - confirm this is intended.
for i in range(10):
    epoch(agent, env)
agent.Render()

In [2]:
# Close the environment bound to `env`. The name must already exist in this
# kernel; on a fresh kernel this line raises NameError.
env.close()

NameError: name 'env' is not defined