# ML HW5 sample code
TODO:
 - Complete the value iteration algorithm
 - Implement
    1. Greedy action selection
    2. Epsilon-greedy or UCB action selection

Report:
 - Make **2** modifications to encourage the agent to play faster
 - Analyze the public score with and without each modification

In [9]:
import numpy as np
import random
import matplotlib.pyplot as plt
from matplotlib.animation import PillowWriter, FuncAnimation
import os
from PIL import Image
from tqdm import tqdm

# Game constants
BOARD_SIZE = 8  # the board is BOARD_SIZE x BOARD_SIZE; valid coordinates are 0..BOARD_SIZE-1
MAX_ROUNDS = 50  # an episode ends once this many non-catching knight moves have been played
# Squares that neither piece may be placed on or move to, as (x, y) pairs.
OBSTACLES = {(3, 3), (3, 4), (4, 3), (4, 4), (2,2), (2,5), (5,2), (5,5)}
PAWN_MOVE_PROB = 0.2  # per-step probability that the pawn attempts to advance one square in +x

# Training hyperparameters
REWARD_CATCH = 1.0  # reward returned by ChessEnvironment.step when the knight lands on the pawn
REWARD_STEP = -0.01  # reward returned by ChessEnvironment.step for every non-catching move
GAMMA = 0.95  # discount factor applied to the bootstrapped next value in train_agent
# NOTE(review): TAU is not referenced by any code visible in this file;
# train_agent computes its own per-episode `tau` instead.
TAU = 0.05
num_episodes = 200000  # number of training episodes run by train_agent

In [10]:
class ChessEnvironment:
    """Training environment: a knight chases a pawn on a board with obstacles.

    A state is the 4-tuple ``(knight_x, knight_y, pawn_x, pawn_y)``.
    Positions are always stored as tuples so that the catch test in
    ``step`` is a plain tuple comparison, whatever sequence type the
    caller passed in.
    """

    def __init__(self, max_rounds=MAX_ROUNDS):
        """Create an environment whose episodes last at most ``max_rounds`` non-catching moves."""
        self.max_rounds = max_rounds
        self.reset()

    def reset(self):
        """Place both pieces on random non-obstacle squares and return the state.

        The two squares are drawn independently (knight first, then pawn),
        so they may coincide.
        """
        self.knight_pos = self._random_position()
        self.pawn_pos = self._random_position()
        self.rounds = 0
        return self._get_state()

    def set_state(self, state):
        """Load ``state`` (knight x, y followed by pawn x, y) and reset the round counter.

        The two halves are coerced to tuples: with a list (or array) state
        the original code kept list positions, and ``tuple == list`` is
        never true, so the knight could never register a catch.
        """
        self.knight_pos = tuple(state[:2])
        self.pawn_pos = tuple(state[2:])
        self.rounds = 0
        return self._get_state()

    def _random_position(self):
        """Return a uniformly drawn board square that is not an obstacle."""
        while True:
            pos = (random.randint(0, BOARD_SIZE - 1), random.randint(0, BOARD_SIZE - 1))
            if pos not in OBSTACLES:
                return pos

    def _get_state(self):
        """Return the current state as ``(knight_x, knight_y, pawn_x, pawn_y)``."""
        return (*self.knight_pos, *self.pawn_pos)

    def _knight_moves(self, pos):
        """Return the knight moves from ``pos`` that stay on the board and avoid obstacles.

        The candidate order is fixed; callers that break ties by position
        in this list (e.g. via ``argmax``) depend on it.
        """
        x, y = pos
        moves = [
            (x + 2, y + 1), (x + 2, y - 1), (x - 2, y + 1), (x - 2, y - 1),
            (x + 1, y + 2), (x + 1, y - 2), (x - 1, y + 2), (x - 1, y - 2)
        ]
        return [
            (nx, ny) for nx, ny in moves
            if 0 <= nx < BOARD_SIZE and 0 <= ny < BOARD_SIZE and (nx, ny) not in OBSTACLES
        ]

    def step(self, knight_action):
        """Move the knight to ``knight_action`` and let the pawn react.

        ``knight_action`` is the knight's new square; it is not checked
        against ``_knight_moves``.

        Returns ``(state, reward, done)``:
          * knight lands on the pawn -> ``REWARD_CATCH``, ``done=True``
            (the round counter is not advanced);
          * otherwise -> ``REWARD_STEP``, and ``done`` becomes True once
            ``max_rounds`` non-catching moves have been played.
        """
        # Stored as a tuple so the comparison below cannot silently fail on type.
        self.knight_pos = tuple(knight_action)
        if self.knight_pos == self.pawn_pos:
            return self._get_state(), REWARD_CATCH, True

        # The pawn only ever advances in +x, and only if the target square
        # is on the board and not an obstacle.
        if random.random() < PAWN_MOVE_PROB:
            px, py = self.pawn_pos
            if px < BOARD_SIZE - 1 and (px + 1, py) not in OBSTACLES:
                self.pawn_pos = (px + 1, py)

        self.rounds += 1
        done = self.rounds >= self.max_rounds
        return self._get_state(), REWARD_STEP, done


In [11]:
def greedy_action_selection(env, state, value_table):
    """Pick the knight move whose resulting square has the highest table value.

    ``state`` is ``(knight_x, knight_y, pawn_x, pawn_y)``; each candidate
    square is scored by ``value_table[new_x, new_y, pawn_x, pawn_y]``.
    Ties go to the earliest candidate returned by ``env._knight_moves``.
    If the knight has no legal move, its current square is returned.
    """
    knight_xy = state[:2]
    pawn_x, pawn_y = state[2], state[3]
    candidates = env._knight_moves(knight_xy)
    if candidates:
        scores = [value_table[kx, ky, pawn_x, pawn_y] for kx, ky in candidates]
        return candidates[np.argmax(scores)]
    return knight_xy


def epsilon_greedy_action_selection(env, state, value_table, epsilon):
    """Explore with probability ``epsilon``, otherwise act greedily.

    With probability ``epsilon`` a legal knight move is drawn uniformly at
    random; otherwise the choice is delegated to
    ``greedy_action_selection``. If the knight has no legal move, its
    current square is returned without consuming any random numbers.
    """
    legal_moves = env._knight_moves(state[:2])
    if not legal_moves:
        return state[:2]

    explore = random.random() < epsilon
    if explore:
        return random.choice(legal_moves)
    return greedy_action_selection(env, state, value_table)


def ucb_action_selection(env, state, value_table, count_table, c=2):
    """Pick the knight move with the highest value-plus-exploration-bonus score.

    Each candidate square is scored as
    ``value + c * sqrt(log(total + 1) / (count + 1e-5))`` where ``value`` and
    ``count`` are read from the tables at ``[new_x, new_y, pawn_x, pawn_y]``
    and ``total`` is the sum of the candidates' counts. When every count is
    zero the bonus vanishes (``log(1) == 0``) and the choice is purely
    greedy. If the knight has no legal move, its current square is returned.
    """
    candidates = env._knight_moves(state[:2])
    if not candidates:
        return state[:2]

    pawn_x, pawn_y = state[2], state[3]
    visits = [count_table[kx, ky, pawn_x, pawn_y] for kx, ky in candidates]
    log_total = np.log(sum(visits) + 1)

    scores = []
    for (kx, ky), seen in zip(candidates, visits):
        # 1e-5 keeps the division finite for squares that were never visited.
        bonus = c * np.sqrt(log_total / (seen + 1e-5))
        scores.append(value_table[kx, ky, pawn_x, pawn_y] + bonus)
    return candidates[np.argmax(scores)]



def train_agent():
    """Learn a state-value table for the knight by temporal-difference updates.

    Runs ``num_episodes`` episodes in a fresh ``ChessEnvironment``. Every
    tenth episode selects actions with UCB, the others with epsilon-greedy.
    After each move the value of the state that was just left is nudged
    towards ``reward + GAMMA * max_next_value``.

    Returns:
        The learned table, indexed ``[knight_x, knight_y, pawn_x, pawn_y]``.
    """
    env = ChessEnvironment()
    value_table = np.zeros((BOARD_SIZE, BOARD_SIZE, BOARD_SIZE, BOARD_SIZE))
    count_table = np.zeros_like(value_table, dtype=int)

    for episode in tqdm(range(num_episodes)):
        # Step size. NOTE(review): this reaches its 0.05 floor once
        # episode >= 0.025 * num_episodes, so it is constant for almost the
        # whole run -- confirm that this schedule is intended.
        tau = max(0.05, 0.1 - episode / (num_episodes / 2))
        # Exploration rate for the epsilon-greedy episodes. It depends only
        # on the episode index, so it is computed once per episode. (An
        # earlier, different schedule assigned here was always overwritten
        # before use and has been removed.)
        epsilon = max(0.1, 1 - episode / num_episodes)
        use_ucb = episode % 10 == 0

        state = env.reset()
        done = False

        while not done:
            if use_ucb:
                action = ucb_action_selection(env, state, value_table, count_table)
            else:
                action = epsilon_greedy_action_selection(env, state, value_table, epsilon)

            next_state, reward, done = env.step(action)

            # The environment reports a catch exactly when the knight moves
            # onto the square the pawn occupied before the step.
            caught = done and tuple(action) == tuple(state[2:])
            if caught:
                # A catch ends the game: nothing follows a terminal state,
                # so its bootstrapped value must be zero.
                max_next_value = 0
            else:
                # Also used when the episode stops on the round limit: that
                # is a truncation, not a true terminal state.
                max_next_value = max(
                    (
                        value_table[nx, ny, next_state[2], next_state[3]]
                        for nx, ny in env._knight_moves(next_state[:2])
                    ),
                    default=0,
                )

            current_value = value_table[state[0], state[1], state[2], state[3]]
            value_table[state[0], state[1], state[2], state[3]] = current_value + tau * (
                reward + GAMMA * max_next_value - current_value
            )

            count_table[state[0], state[1], state[2], state[3]] += 1
            state = next_state

    return value_table

In [12]:
# Helper functions
def finish_game(value_table, env):
    """Play the game greedily from ``env``'s current state and record every position.

    Returns a list of ``(knight_x, knight_y, pawn_x, pawn_y, step)`` tuples:
    one per position before each move, plus the final position.
    """
    trajectory = []
    current = env._get_state()
    move_count = 0
    finished = False
    while not finished:
        trajectory.append((*current, move_count))
        chosen = greedy_action_selection(env, current, value_table)
        current, _, finished = env.step(chosen)
        move_count += 1
    # Record the position the game ended in.
    trajectory.append((*current, move_count))
    return trajectory

def render_frame(kx, ky, px, py, step, max_steps):
    """Draw one game position with a progress bar and return it as a PIL image.

    Args:
        kx, ky: knight position.
        px, py: pawn position.
        step: index of this frame (0-based).
        max_steps: total number of frames; the progress bar is full at
            ``step == max_steps - 1``.

    The first coordinate of each position is drawn on the vertical axis
    (flipped, so 0 is at the top) and the second on the horizontal axis.
    """
    fig, ax = plt.subplots(figsize=(6, 7))  # Add extra space for the progress bar

    # Plot the chessboard
    ax.set_xlim(0, BOARD_SIZE-1)
    ax.set_ylim(0, BOARD_SIZE-1)
    ax.set_xticks(range(BOARD_SIZE))
    ax.set_yticks(range(BOARD_SIZE))
    ax.grid(True)

    # Draw the obstacles
    for x, y in OBSTACLES:
        ax.text(y, BOARD_SIZE - 1 - x, "O", ha="center", va="center", fontsize=20, color="black")

    # Draw the knight
    ax.text(ky, BOARD_SIZE - 1 - kx, "K", ha="center", va="center", fontsize=20, color="blue")

    # Draw the pawn
    ax.text(py, BOARD_SIZE - 1 - px, "P", ha="center", va="center", fontsize=20, color="red")

    # Add a title
    ax.set_title("Catch the Pawn")

    # Add a progress bar below the chessboard
    last_step = max_steps - 1
    # A single-frame game would otherwise divide by zero; show a full bar.
    progress = step / last_step if last_step > 0 else 1.0
    bar_ax = fig.add_axes([0.1, 0.05, 0.8, 0.05])  # [left, bottom, width, height]
    bar_ax.barh(0, width=progress, height=1, color="green")
    bar_ax.set_xlim(0, 1)
    bar_ax.axis("off")
    bar_ax.text(0.5, 0, f"Step: {step}/{max_steps-1}", ha="center", va="center", fontsize=12, color="white")

    # Convert the figure to a PIL image. `tostring_rgb` was deprecated in
    # Matplotlib 3.8 and removed in 3.10; `buffer_rgba` already has shape
    # (height, width, 4). Drop the alpha channel and copy, because the
    # buffer is only a view into the canvas that is closed below.
    fig.canvas.draw()
    rgb = np.asarray(fig.canvas.buffer_rgba())[..., :3].copy()
    pil_img = Image.fromarray(rgb)

    plt.close(fig)
    return pil_img

def fig_to_array(fig):
    """Render ``fig`` and return its pixels as a ``(height, width, 3)`` uint8 RGB array.

    Uses ``buffer_rgba`` because ``tostring_rgb`` was deprecated in
    Matplotlib 3.8 and removed in 3.10. The buffer already has shape
    (height, width, 4), so no reshape from ``get_width_height`` is needed.
    """
    fig.canvas.draw()
    rgba = np.asarray(fig.canvas.buffer_rgba())
    # Drop the alpha channel; copy so the result does not alias canvas memory.
    return rgba[..., :3].copy()

def save_gif(frames, filename="game.gif"):
    """Write ``frames`` (PIL images) to ``filename`` as a looping GIF, 500 ms per frame.

    The last frame is appended three extra times so the final position
    lingers before the animation restarts.
    """
    first_frame = frames[0]
    hold_last = frames[-1:] * 3
    remaining = frames[1:] + hold_last
    first_frame.save(
        filename,
        save_all=True,
        append_images=remaining,
        duration=500,
        loop=0,
    )


In [25]:
# Train the agent
# value_table = train_agent()

### Demo

In [26]:
# import IPython
# # Create game frames and save as gif
# env = ChessEnvironment()
# frames = finish_game(value_table, env)

# # save_gif(frames)
# save_gif([render_frame(*f, len(frames)) for f in frames])
# IPython.display.Image(filename="game.gif")

# Testing
**Submit your value_table.npy to Cool**

In [22]:
# Save the value table (disabled: the table is loaded from disk below instead)
# np.save("value_table.npy", value_table)

# Load the value table
# NOTE(review): `os` is already imported in the first cell; this import is
# only needed by the commented-out alternative path below.
import os
#value_table = np.load(os.path.join("96.1", "value_table_test_0.npy"))

# NOTE(review): absolute, machine-specific path -- this cell only runs on the
# machine where this file exists. Point it at your own value_table.npy.
value_table = np.load('C:\\Users\\cdpss\\Downloads\\988\\value_table.npy')

### Evaluate on the Public cases

In [23]:
# 100 public states. Each tuple is (knight_x, knight_y, pawn_x, pawn_y), the
# layout that EvalEnvironment.set_state splits into knight and pawn positions.
public_states = [(1, 7, 7, 5), (6, 7, 7, 2), (6, 2, 6, 2), (1, 1, 1, 5), (2, 1, 1, 1), (4, 2, 1, 6), (5, 7, 5, 3), (5, 4, 1, 0), (6, 5, 2, 3), (2, 4, 1, 0), (3, 6, 5, 7), (0, 3, 1, 3), (0, 7, 5, 3), (3, 5, 1, 5), (2, 1, 7, 5), (1, 5, 1, 6), (1, 2, 6, 6), (7, 5, 4, 2), (6, 7, 7, 4), (2, 6, 7, 6), (7, 3, 7, 5), (0, 6, 7, 6), (6, 5, 1, 4), (2, 6, 2, 1), (3, 2, 6, 4), (0, 3, 2, 4), (7, 2, 5, 1), (3, 6, 1, 0), (6, 4, 0, 3), (6, 1, 5, 3), (5, 4, 4, 1), (7, 7, 1, 6), (4, 7, 5, 4), (5, 6, 0, 4), (2, 6, 0, 3), (7, 0, 1, 4), (6, 4, 1, 4), (0, 2, 3, 0), (4, 6, 1, 0), (1, 1, 7, 4), (1, 4, 6, 2), (1, 2, 4, 1), (4, 7, 0, 1), (5, 4, 2, 6), (6, 4, 6, 0), (2, 1, 1, 5), (5, 3, 2, 6), (6, 7, 2, 0), (6, 3, 0, 6), (6, 1, 3, 7), (5, 7, 1, 3), (0, 6, 1, 6), (0, 6, 6, 2), (4, 1, 1, 5), (7, 1, 3, 2), (7, 6, 1, 3), (1, 7, 1, 4), (5, 6, 1, 1), (5, 1, 6, 2), (0, 4, 2, 1), (0, 2, 1, 4), (6, 1, 7, 4), (7, 3, 3, 5), (3, 5, 6, 7), (0, 4, 6, 1), (2, 1, 7, 2), (0, 1, 5, 3), (4, 7, 3, 1), (7, 7, 4, 0), (4, 7, 2, 3), (1, 4, 1, 1), (1, 2, 1, 0), (6, 4, 0, 0), (7, 3, 1, 1), (2, 4, 1, 7), (2, 0, 4, 5), (7, 1, 5, 4), (1, 5, 1, 5), (1, 7, 2, 1), (7, 4, 3, 7), (6, 4, 2, 0), (4, 2, 6, 1), (3, 0, 0, 6), (4, 2, 0, 6), (2, 6, 6, 7), (2, 6, 7, 5), (2, 3, 3, 2), (7, 1, 3, 6), (2, 1, 6, 5), (2, 7, 7, 5), (7, 4, 7, 4), (4, 6, 6, 5), (2, 1, 2, 1), (2, 1, 5, 0), (1, 0, 0, 1), (5, 0, 3, 5), (0, 0, 3, 5), (6, 3, 0, 3), (4, 5, 6, 4), (1, 7, 3, 1)]

# Eval environment (score = 100 - rounds)
# Eval environment (score = 100 - rounds)
class EvalEnvironment:
    """Scoring environment: same game rules as training, different rewards.

    A state is the 4-tuple ``(knight_x, knight_y, pawn_x, pawn_y)``.
    Catching the pawn yields ``100 - rounds``, where ``rounds`` counts the
    non-catching moves played so far; every other step yields 0. Positions
    are always stored as tuples so the catch test in ``step`` is a plain
    tuple comparison.
    """

    def __init__(self):
        self.reset()

    def reset(self):
        """Place both pieces on random non-obstacle squares and return the state."""
        self.knight_pos = self._random_position()
        self.pawn_pos = self._random_position()
        self.rounds = 0
        return self._get_state()

    def set_state(self, state):
        """Set the state of the game.

        ``state`` holds knight x, y followed by pawn x, y. Both halves are
        coerced to tuples: with a list (or array) state the original code
        kept list positions, and ``tuple == list`` is never true, so a
        catch could never be detected.
        """
        self.knight_pos = tuple(state[:2])
        self.pawn_pos = tuple(state[2:])
        self.rounds = 0
        return self._get_state()

    def _random_position(self):
        """Return a uniformly drawn board square that is not an obstacle."""
        while True:
            pos = (random.randint(0, BOARD_SIZE - 1), random.randint(0, BOARD_SIZE - 1))
            if pos not in OBSTACLES:
                return pos

    def _get_state(self):
        """Return the current state as ``(knight_x, knight_y, pawn_x, pawn_y)``."""
        return (*self.knight_pos, *self.pawn_pos)

    def _knight_moves(self, pos):
        """Return the knight moves from ``pos`` that stay on the board and avoid obstacles.

        The candidate order is fixed; callers that break ties by position
        in this list (e.g. via ``argmax``) depend on it.
        """
        x, y = pos
        moves = [
            (x + 2, y + 1), (x + 2, y - 1), (x - 2, y + 1), (x - 2, y - 1),
            (x + 1, y + 2), (x + 1, y - 2), (x - 1, y + 2), (x - 1, y - 2)
        ]
        return [
            (nx, ny) for nx, ny in moves
            if 0 <= nx < BOARD_SIZE and 0 <= ny < BOARD_SIZE and (nx, ny) not in OBSTACLES
        ]

    def step(self, knight_action):
        """Move the knight to ``knight_action`` and let the pawn react.

        Returns ``(state, score, done)``: on a catch the score is
        ``100 - rounds`` and ``done`` is True; otherwise the score is 0 and
        ``done`` becomes True once ``MAX_ROUNDS`` non-catching moves have
        been played.
        """
        # Update knight position (as a tuple, so the catch test cannot
        # silently fail on a type mismatch).
        self.knight_pos = tuple(knight_action)

        # Check for termination
        if self.knight_pos == self.pawn_pos:
            return self._get_state(), 100-self.rounds, True

        # Pawn's random movement: it only ever advances in +x, and only if
        # the target square is on the board and not an obstacle.
        if random.random() < PAWN_MOVE_PROB:
            px, py = self.pawn_pos
            if px < BOARD_SIZE - 1 and (px + 1, py) not in OBSTACLES:
                self.pawn_pos = (px + 1, py)

        self.rounds += 1
        done = self.rounds >= MAX_ROUNDS
        return (self._get_state()), 0, done

def finish_game_eval(value_table, env):
    """Play greedily from ``env``'s current state until the game ends.

    Returns the score reported by the final ``env.step`` call.
    """
    current = env._get_state()
    while True:
        move = greedy_action_selection(env, current, value_table)
        current, score, finished = env.step(move)
        if finished:
            return score

In [24]:
# Evaluate the value table
# NOTE: the constructor calls reset(), which draws random positions. It runs
# before the seed is set, so those draws do not consume the seeded stream;
# keep this statement order to reproduce the reported score.
env = EvalEnvironment()

# fix the random seed (TA will use this seed)
random.seed(42)

# run the public cases: every game starts from a fixed state and is played
# greedily; the public score is the mean of the per-game scores.
scores = []
for state in public_states:
    env.set_state(state)
    score = finish_game_eval(value_table, env)
    scores.append(score)
print(f"Public score: {sum(scores)/len(scores)}")

Public score: 95.37
