# RL

## Enviroment

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import copy
from matplotlib.animation import FuncAnimation
from matplotlib import rc
import random

rc('animation', html='jshtml')
class MazeGameEnv():
    def __init__(self, board=[['😊', ' ', '😺'],[' ', ' ', ' '],['😺', ' ', '😍']], actions=["up", "down", "left", "right"], actions_moves=[(0,-1),(0,1),(-1,0),(1,0)], values={'': -1, ' ': -1, '😍': 100, '😺': 20}, player="😊", goal="😍"):
        self.board = board
        self.actions = actions
        self.actions_moves = actions_moves
        self.values = values
        self.player = player
        self.goal = goal
        self.goal_position = self.get_pos(self.goal)
        self.board_history = []
        self.board_history.append(copy.deepcopy(self.board))
        self.original_board = [row.copy() for row in self.board]

    def get_pos(self, value):
        for i in range(len(self.board)):
            for j in range(len(self.board[0])):
                if value in self.board[i][j]:
                    return (i, j)

    def reset(self):
        self.board = [row.copy() for row in self.original_board]
        self.goal_position = self.get_pos(self.goal)
        self.board_history = []
        self.board_history.append(copy.deepcopy(self.board))

    def is_finish(self):
        pass

    def is_valid_move(self, state, action):
        x, y = state
        move_x, move_y = self.actions_moves[action]
        new_x = x + move_x
        new_y = y + move_y
        
        if new_x < 0 or new_x >= len(self.board) or new_y < 0 or new_y >= len(self.board):
            return False
        return True
    
    def move(self, state, new_state):
        board = [row.copy() for row in self.original_board]
        x, y = state
        new_x, new_y = new_state
        self.board[x][y] = ' '
        self.board[new_x][new_y] = self.player
    
        return board

    def calculate_reward(self, state):
        return self.values[self.board[state[0]][state[1]]]
    
    def step(self, action):
        state = self.get_pos(self.player)
        if not self.is_valid_move(state, action):
            print("Not valid move")
            return 
        
        x, y = state
        move_x, move_y = self.actions_moves[action]
        new_x = x + move_x
        new_y = y + move_y
        reward = self.calculate_reward((new_x, new_y))
        self.board[x][y] = ' '
        self.board[new_x][new_y] = self.player
        
        done = False
        if self.get_pos(self.player)==self.goal_position:
            done = True

        self.board_history.append(copy.deepcopy(self.board))
        return self.get_pos(self.player), reward, done

    def render(self):
        fig, ax = plt.subplots()
        ax.set_xticks(np.arange(0, len(self.board) + 1, 1))
        ax.set_yticks(np.arange(0, len(self.board) + 1, 1))
        ax.grid(True, color='black')

        # Set limits and reverse y-axis to have (0,0) in top-left
        ax.set_xlim(0, len(self.board))
        ax.set_ylim(0, len(self.board))
        ax.invert_yaxis()

        # Initialize a list of text objects for each cell
        text_objects = []
        for i in range(len(self.board)):
            row = []
            for j in range(len(self.board)):
                text = ax.text(j + 0.5, i + 0.5, '', ha='center', va='center', fontsize=50)
                row.append(text)
            text_objects.append(row)

        # Function to update the board for each frame of the animation
        def update(frame):
            board = self.board_history[frame]
            for i in range(len(self.board)):
                for j in range(len(self.board)):
                    text_objects[i][j].set_text(board[i][j])
            return [item for sublist in text_objects for item in sublist]

        # Create the animation
        ani = FuncAnimation(fig, update, frames=len(self.board_history), interval=500, blit=True)
        plt.close(fig)
        return ani

    def close(self):
        plt.close()

## DummyAgent

In [2]:
class DummyAgent():
    def __init__(self, env):
        self.env = env

    def act(self, observation):
        return random.randint(0, len(self.env.actions))
        
    def learn(self, observation, action, reward, next_observation, done):
        pass

    def predict(self, observation):
        a = random.choice([action for action in range(len(self.env.actions)) if self.env.is_valid_move(observation, action)])
        return a

In [3]:
env = MazeGameEnv()
agent = DummyAgent(env)

observation = env.get_pos(env.player)
done = False
env.reset()
total_reward = 0

while not done:
    action = agent.predict(observation)
    next_observation, reward, done = env.step(action)
    observation = next_observation
    total_reward += reward
        
env.render()

In [4]:
def evaluate_agent(agent, env, num_episodes=100):
    all_rewards = []
    for episode in range(num_episodes):
        env.reset()
        observation = env.get_pos(env.player)
        done = False
        
        total_reward = 0
        while not done:
            action = agent.predict(observation)
            next_observation, reward, done = env.step(action)
            observation = next_observation
            total_reward += reward
        all_rewards.append(total_reward)
    return np.mean(all_rewards)
        

        

env = MazeGameEnv()
average_reward = evaluate_agent(agent, env)
print(f"Average reward over 100 episodes: {average_reward}")

Average reward over 100 episodes: 105.89


## QLearningAgent

In [5]:
class QLearningAgent():
    def __init__(self, env, learning_rate, discount_factor, exploration_rate):
        self.env = env
        self.lr = learning_rate
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.positions = self.create_positions()
        self.q = self.create_q_table(self.positions)

    def create_positions(self):
        positions = []
        for i in range(len(self.env.board)):
            for j in range(len(self.env.board[0])):
                positions.append((i, j))
        return positions
    
    def create_q_table(self, positions):
        q = []
        for i in range(len(positions)):
            row = []
            for j in range(len(self.env.actions)):
                row.append(0)
            q.append(row)
        return q

    def act(self, observation):
        valid_actions = [action for action in range(len(self.env.actions)) if self.env.is_valid_move(observation, action)]
        if random.uniform(0, 1) < self.exploration_rate:
            #print("Exploration")
            return random.choice(valid_actions)
        else:
            #print("Exploitation")
            posible_actions = self.q[self.obs_to_position(observation)]
            posible_valid_actions = []

            for i in range(len(posible_actions)):
                if i in valid_actions:
                    posible_valid_actions.append((posible_actions[i], i))

            action_t = max(posible_valid_actions)
            action = action_t[1]
            if action is None:
                return None
            return action
        
    def obs_to_position(self, observation):
        position = None
        for i in range(len(self.positions)):
            if self.positions[i] == observation:
                position = i
                break
        return position

    def learn(self, observation, action, reward, next_observation, done):
        if done:
            return
        
        best_next_action = self.predict(observation)
        if best_next_action is None:
            print("No best action")
            return
        td_target = reward + self.discount_factor*self.q[self.obs_to_position(next_observation)][best_next_action]
        td_error = td_target - self.q[self.obs_to_position(observation)][action]
        self.q[self.obs_to_position(observation)][action] += self.lr*td_error

        self.exploration_rate = max(0.01, self.exploration_rate*0.999)
        

    def predict(self, observation):
        valid_actions = [action for action in range(len(self.env.actions)) if self.env.is_valid_move(observation, action)]
        posible_actions = self.q[self.obs_to_position(observation)]
        posible_valid_actions = []
        for i in range(len(posible_actions)):
            if i in valid_actions:
                posible_valid_actions.append((posible_actions[i], i))

        action_t = max(posible_valid_actions)
        action = action_t[1]
        if action is None:
            return None
        return action
    


In [6]:
def evaluate_agent(agent, env, num_episodes=100):
    rewards = []
    for episode in range(num_episodes):
        env.reset()
        observation = env.get_pos(env.player)
        done = False
        total_reward = 0
        
        steps = 0
        while not done:
            action = agent.act(observation)
            next_observation, reward, done = env.step(action)
            agent.learn(observation, action, reward, next_observation, done)
            observation = next_observation
            total_reward += reward
            steps += 1
            if steps > 1000:
                break
        print(f"End episode {episode} with {total_reward} points")
        rewards.append(total_reward)
    return rewards

learning_rate = 0.1
discount_factor = 0.9
exploration_rate = 0.1
num_episodes = 1000
env = MazeGameEnv()
agent = QLearningAgent(env, learning_rate, discount_factor, exploration_rate)
average_reward = evaluate_agent(agent, env, num_episodes)
print(f"Average reward over {num_episodes} episodes: {average_reward}")

End episode 0 with 112 points
End episode 1 with 118 points
End episode 2 with 116 points
End episode 3 with 116 points
End episode 4 with 116 points
End episode 5 with 114 points
End episode 6 with 118 points
End episode 7 with 118 points
End episode 8 with 116 points
End episode 9 with 116 points
End episode 10 with 116 points
End episode 11 with 116 points
End episode 12 with 116 points
End episode 13 with 116 points
End episode 14 with 116 points
End episode 15 with 116 points
End episode 16 with 116 points
End episode 17 with 116 points
End episode 18 with 116 points
End episode 19 with 116 points
End episode 20 with 116 points
End episode 21 with 116 points
End episode 22 with 116 points
End episode 23 with 116 points
End episode 24 with 116 points
End episode 25 with 114 points
End episode 26 with 118 points
End episode 27 with 118 points
End episode 28 with 118 points
End episode 29 with 116 points
End episode 30 with 116 points
End episode 31 with 116 points
End episode 32 wit

In [7]:
def try_agent(agent, env):
    env.reset()
    observation = env.get_pos(env.player)
    done = False
    total_reward = 0
    
    steps = 0
    while not done:
        action = agent.predict(observation)
        next_observation, reward, done = env.step(action)
        observation = next_observation
        total_reward += reward
        steps += 1
        if steps > 2000:
            print("finish")
            break
    return total_reward


total_reward = try_agent(agent, env)
env.render()