In [2]:
import gymnasium as gym
from gymnasium import Env
from gymnasium.spaces import Discrete, MultiDiscrete
import numpy as np
from enum import Enum
from dataclasses import dataclass
from typing import List

# docs and experiment results can be found at https://docs.cleanrl.dev/rl-algorithms/dqn/#dqnpy
import os
import random
import time
from dataclasses import dataclass

import gymnasium as gym
import numpy as np
import torch
from torch import tensor
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from stable_baselines3.common.buffers import ReplayBuffer
from statistics import mean
from tqdm import tqdm
from math import log10
from more_itertools import chunked

from plotly.graph_objects import Figure, Scatter
from IPython.display import clear_output

  import distutils.spawn


In [3]:
# x is the horizontal axis, it is bigger on the right
# y is the vertical axis, it is bigger on the top
# (the convention is like in math)

Color = Enum("Player", ["white", "black"])
Cell = Enum("Cell", ["inaccessible", "empty", "white", "black"])

Side = Enum("Side", ["left", "right"])

def opposite_color(color):
    return {Color.white: Color.black, Color.black: Color.white}[color]

def piece_of_player(color):
    return {Color.white: Cell.white, Color.black: Cell.black}[color]

def x_direction(side):
    return {Side.left: -1, Side.right: +1}[side]

def y_direction(player_color):
    return {Color.white: +1, Color.black: -1}[player_color]

@dataclass
class Move:
    from_x: int
    from_y: int
    side: int
    player: int
    board_width: int
    board_height: int

    def __post_init__(self):
        assert (self.from_x + self.from_y) % 2 == 0 

        self.to_x           = (self.from_x +     x_direction(self.side)) % self.board_width
        self.to_x_if_eating = (self.from_x + 2 * x_direction(self.side)) % self.board_width

        self.to_y           = (self.from_y +     y_direction(self.player)) % self.board_height
        self.to_y_if_eating = (self.from_y + 2 * y_direction(self.player)) % self.board_height

def initial_board(board_width, board_height, num_rows_with_pieces_initially):
    assert board_height % 2 == 0
    assert board_width % 2 == 0
    assert 2 * num_rows_with_pieces_initially <= board_width

    board = np.full((board_width, board_height), Cell.empty)
    
    for x in range(board_width):
        for y in range(board_height):
            if (x + y) % 2 == 1:
                board[x, y] = Cell.inaccessible
            elif y < num_rows_with_pieces_initially:
                board[x, y] = Cell.white
            elif y >= board_height - num_rows_with_pieces_initially:
                board[x, y] = Cell.black
            else:
                board[x, y] = Cell.empty

    return board

@dataclass
class MoveResult:
    legal: bool
    eaten: bool = False

def play_move(board, move, modify_board=True) -> MoveResult:
    if board[move.from_x, move.from_y] != piece_of_player(move.player):
        return MoveResult(legal=False)
        
    non_eating_move_possible = board[move.to_x, move.to_y] == Cell.empty
    if non_eating_move_possible:
        if modify_board:
            board[move.from_x, move.from_y] = Cell.empty
            board[move.to_x, move.to_y] = piece_of_player(move.player)
        return MoveResult(legal=True, eaten=False)
        
    eating_move_possible = board[move.to_x, move.to_y] == piece_of_player(opposite_color(move.player)) \
                            and board[move.to_x_if_eating, move.to_y_if_eating] == Cell.empty
    if eating_move_possible:
        if modify_board:
            board[move.from_x, move.from_y] = Cell.empty
            board[move.to_x, move.to_y] = Cell.empty
            board[move.to_x_if_eating, move.to_y_if_eating] = piece_of_player(move.player)
        return MoveResult(legal=True, eaten=True)

    return MoveResult(legal=False)

def is_legal(board, move):
    return play_move(board, move, modify_board=False).legal

def all_moves(player, board_width, board_height):
    return ( Move(from_x=from_x, from_y=from_y, side=side, player=player, board_width=board_width, board_height=board_height)
             for from_x in range(board_width)
             for from_y in range(board_height)
             if (from_x + from_y) % 2 == 0
             for side in [Side.left, Side.right] )

def legal_moves(player, board, board_width, board_height):
    moves = all_moves(player=player, board_width=board_width, board_height=board_height)
    return [move for move in moves if is_legal(board, move)]

def side_to_id(side):
    return {Side.left: 0, Side.right: 1}[side]

def side_from_id(side_id):
    return {0: Side.left, 1: Side.right}[side_id]

def combine_ids(ids: List[int], bounds: List[int]) -> int:
    assert len(ids) == len(bounds)
    combined = 0
    for id, bound in zip(ids, bounds):
        combined = bound * combined + id
    return combined

def extract_ids(combined_ids: int, bounds: List[int]) -> List[int]:
    ids = []
    for bound in bounds[::-1]:
        ids = [combined_ids % bound] + ids
        combined_ids //= bound
    assert combined_ids == 0
    return ids

def move_to_id(move, board_width, board_height):
    nsides = 2
    return combine_ids( ids    = [move.from_x // 2, move.from_y,  side_to_id(move.side)],
                        bounds = [board_width // 2, board_height, nsides] )

def move_from_id(move_id, player, board_width, board_height):
    nsides = 2
    from_x, from_y, side = extract_ids(move_id, bounds=[board_width // 2, board_height, nsides])
    from_x *= 2
    side = side_from_id(side)

    if (from_x + from_y) % 2 != 0:
        from_x = from_x + 1

    return Move(from_x=from_x, from_y=from_y, side=side, player=player, board_width=board_width, board_height=board_height)

def cell_to_id(cell):
    return {Cell.empty: 0, Cell.white: 1, Cell.black: 2}[cell]

def board_to_ids(board):
    return np.array([cell_to_id(cell) for cell in board.flatten() if cell != Cell.inaccessible])

def num_accessible_cells(board_width, board_height):
    num_cells = board_width * board_height
    return num_cells // 2

def num_possible_moves(board_width, board_height):
    num_sides = 2
    return num_sides * num_accessible_cells(board_width=board_width, board_height=board_height)

def board_to_string(board, board_width, board_height):
    cell_to_string = {Cell.inaccessible: " ", Cell.empty: ".", Cell.white: "w", Cell.black: "b"}

    return "\n".join( "".join(cell_to_string[board[x, y]] for x in range(board_width))
                      for y in range(board_height-1, -1, -1) )

class SimplifiedCheckersAgainstRandomPlayer(Env):
    metadata = {"render.modes": ["human"]}

    def __init__(self, board_width=8, board_height=8, num_rows_with_pieces_initially=3, agent_color=Color.white):
        self.board_width = board_width
        self.board_height = board_height
        self.num_rows_with_pieces_initially = num_rows_with_pieces_initially
        self.agent_color = agent_color
        
        self.action_space = Discrete(num_possible_moves(board_width=board_width, board_height=board_height))
        
        self.observation_space = MultiDiscrete([3] * num_accessible_cells(board_width=board_width, board_height=board_height))

    def reset(self, seed=None, options=None):
        # TO DO: use seed

        self.board = initial_board( board_width=self.board_width,
                                    board_height=self.board_height,
                                    num_rows_with_pieces_initially=self.num_rows_with_pieces_initially )

        self.reward_before_first_step = None

        if self.agent_color == Color.black:
            adversary_move = np.random.choice(legal_moves( player=Color.black,
                                                           board=self.board,
                                                           board_height=self.board_height,
                                                           board_width=self.board_width))
            result = play_move(self.board, adversary_move)
            if result.eaten:
                self.reward_before_first_step = -1.

        observation = board_to_ids(self.board)
        assert self.observation_space.contains(observation)
        return observation, {}

    def step(self, action):
        assert self.action_space.contains(action)

        reward = 0.

        if self.reward_before_first_step is not None:
            reward += self.reward_before_first_step
            self.reward_before_first_step = None

        agent_move = move_from_id( move_id=action,
                                   player=self.agent_color,
                                   board_width=self.board_width,
                                   board_height=self.board_height )
        if not is_legal(self.board, agent_move):
            reward -= 1.
            agent_legal_moves = legal_moves(self.agent_color, self.board, board_width=self.board_width, board_height=self.board_height)
            if agent_legal_moves == []:
                observation = self.action_space.sample()
                done = True
                truncated = False
                return observation, reward, done, truncated, {}
            agent_move = np.random.choice(agent_legal_moves)
        agent_move_result = play_move(self.board, agent_move)
        assert agent_move_result.legal
        if agent_move_result.eaten:
            reward += 1.

        adversary_legal_moves = legal_moves(opposite_color(self.agent_color), self.board, board_width=self.board_width, board_height=self.board_height)
        if adversary_legal_moves == []:
            observation = self.action_space.sample()
            done = True
            truncated = False
            return observation, reward, done, truncated, {}
        adversary_move = np.random.choice(adversary_legal_moves)
        adversary_move_result = play_move(self.board, adversary_move)
        assert adversary_move_result.legal
        if adversary_move_result.eaten:
            reward -= 1

        observation = board_to_ids(self.board)
        assert self.observation_space.contains(observation)
        done = False
        truncated = False
        return observation, reward, done, truncated, {}

    def render(self, mode="human", close=False):
        print(board_to_string(self.board, board_width=self.board_width, board_height=self.board_height))

# we don't register the environment because chatgpt said not to do it if all the code is in one notebook

In [4]:
np.random.seed(42)

env = SimplifiedCheckersAgainstRandomPlayer()

observation, info = env.reset()
print(observation, info)
env.render()

for action in [ move_to_id(Move(from_x=0, from_y=2, side=Side.right, player=Color.white, board_width=8, board_height=8), board_width=8, board_height=8),
                move_to_id(Move(from_x=6, from_y=2, side=Side.left,  player=Color.white, board_width=8, board_height=8), board_width=8, board_height=8),
                move_to_id(Move(from_x=5, from_y=3, side=Side.right,  player=Color.white, board_width=8, board_height=8), board_width=8, board_height=8),
                move_to_id(Move(from_x=1, from_y=3, side=Side.right,  player=Color.white, board_width=8, board_height=8), board_width=8, board_height=8) ]:
    print()
    observation, reward, done, truncated, info = env.step(action)
    print()
    print(observation, reward, done, truncated, info)
    env.render()

[1 1 0 2 1 0 2 2 1 1 0 2 1 0 2 2 1 1 0 2 1 0 2 2 1 1 0 2 1 0 2 2] {}
 b b b b
b b b b 
 b b b b
. . . . 
 . . . .
w w w w 
 w w w w
w w w w 


[1 0 0 2 1 1 2 2 1 1 0 2 1 0 2 2 1 1 0 2 1 0 2 2 1 1 2 2 1 0 0 2] 0.0 False False {}
 b b b b
b b b b 
 b b b .
. . . b 
 w . . .
. w w w 
 w w w w
w w w w 


[1 0 0 2 1 1 2 2 1 1 2 2 1 0 0 2 1 1 0 2 1 1 2 2 1 0 2 2 1 0 0 2] 0.0 False False {}
 b b b b
b b b b 
 b . b .
. b . b 
 w . w .
. w w . 
 w w w w
w w w w 


[1 0 0 2 1 1 2 2 1 1 2 2 1 0 0 2 1 1 0 2 1 0 0 2 1 0 2 2 1 0 1 2] 1.0 False False {}
 b b b b
b b b b 
 b . . w
. b . b 
 w . . .
. w w . 
 w w w w
w w w w 


[1 0 0 2 1 0 2 2 1 1 0 2 1 0 1 2 1 1 0 0 1 0 2 2 1 0 2 2 1 0 1 2] 1.0 False False {}
 b b b b
b b . b 
 b w b w
. . . b 
 . . . .
. w w . 
 w w w w
w w w w 


In [28]:
def clamp(x, min=None, max=None):
    if min is not None and x < min:
        x = min
    if max is not None and x > max:
        x = max
    return x

def set_weight_decay(optimizer, weight_decay):
    for param_group in optimizer.param_groups:
        param_group['weight_decay'] = weight_decay

@dataclass
class SimpleWeightDecayScheduler:
    aspiration_min: float
    aspiration_max: float
    weight_decay_step_up: float
    weight_decay_step_down: float
    min_weight_decay: float
    max_weight_decay: float
    smoothness: int

    def __post_init__(self):
        self.weight_decay = self.min_weight_decay

    def update(self, episodic_return_history):
        smooth_episodic_return = mean(list(episodic_return_history.values())[max(0, len(episodic_return_history) - self.smoothness):])
        if smooth_episodic_return > self.aspiration_min:
            self.weight_decay *= self.weight_decay_step_up
        if smooth_episodic_return < self.aspiration_max:
            self.weight_decay /= self.weight_decay_step_down
        self.weight_decay = clamp(self.weight_decay, min=self.min_weight_decay, max=self.max_weight_decay)

In [32]:
@dataclass
class Args:
    exp_name: str = "weight_decay_schedule_2"
    """the name of this experiment"""
    seed: int = 1
    """seed of the experiment"""
    torch_deterministic: bool = True
    """if toggled, `torch.backends.cudnn.deterministic=False`"""
    cuda: bool = True
    """if toggled, cuda will be enabled by default"""
    track: bool = False
    """if toggled, this experiment will be tracked with Weights and Biases"""
    wandb_project_name: str = "cleanRL"
    """the wandb's project name"""
    wandb_entity: str = None
    """the entity (team) of wandb's project"""
    capture_video: bool = False
    """whether to capture videos of the agent performances (check out `videos` folder)"""
    save_model: bool = False
    """whether to save model into the `runs/{run_name}` folder"""
    upload_model: bool = False
    """whether to upload the saved model to huggingface"""
    hf_entity: str = ""
    """the user or org name of the model repository from the Hugging Face Hub"""

    aspiration: float = 150.
    min_weight_decay: float = 0.05
    max_weight_decay: float = 2.0
    weight_decay_dichotomy_eps: float = 0.025

    test_total_timesteps: int = 50_000

    # Algorithm specific arguments
    env_id: str = "CartPole-v1"
    """the id of the environment"""
    total_timesteps: int = 500000
    """total timesteps of the experiments"""
    learning_rate: float = 2.5e-4
    """the learning rate of the optimizer"""
    num_envs: int = 1
    """the number of parallel game environments"""
    buffer_size: int = 10000
    """the replay memory buffer size"""
    gamma: float = 0.99
    """the discount factor gamma"""
    tau: float = 1.0
    """the target network update rate"""
    target_network_frequency: int = 500
    """the timesteps it takes to update the target network"""
    batch_size: int = 128
    """the batch size of sample from the reply memory"""
    start_e: float = 1
    """the starting epsilon for exploration"""
    end_e: float = 0.05
    """the ending epsilon for exploration"""
    exploration_fraction: float = 0.5
    """the fraction of `total-timesteps` it takes from start-e to go end-e"""
    learning_starts: int = 10000
    """timestep to start learning"""
    train_frequency: int = 10
    """the frequency of training"""

def make_env(env_id, seed, idx, capture_video):
    def thunk():
        env = SimplifiedCheckersAgainstRandomPlayer(board_height=4, board_width=4, num_rows_with_pieces_initially=1)
        env = gym.wrappers.RecordEpisodeStatistics(env)
        env.action_space.seed(seed)

        return env

    return thunk

# ALGO LOGIC: initialize agent here:
class QNetwork(nn.Module):
    def __init__(self, env):
        super().__init__()
        self.network = nn.Sequential(
            nn.Linear(np.array(env.single_observation_space.shape).prod(), 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, 256),
            nn.ReLU(),
            nn.Linear(256, env.single_action_space.n),
        )

    def forward(self, x):
        return self.network(x)

def sum_weights_squared(model):
    return sum((param.data**2).sum() for param in model.parameters())

def linear_schedule(start_e: float, end_e: float, duration: int, t: int):
    slope = (end_e - start_e) / duration
    return max(slope * t + start_e, end_e)

def train(args: Args, weight_decay_scheduler):
    random.seed(args.seed)
    np.random.seed(args.seed)
    torch.manual_seed(args.seed)
    torch.backends.cudnn.deterministic = args.torch_deterministic

    device = torch.device("cuda" if torch.cuda.is_available() and args.cuda else "cpu")

    # env setup
    envs = gym.vector.SyncVectorEnv(
        [make_env(args.env_id, args.seed + i, i, args.capture_video) for i in range(args.num_envs)]
    )
    assert isinstance(envs.single_action_space, gym.spaces.Discrete), "only discrete action space is supported"

    q_network = QNetwork(envs).to(device)
    optimizer = optim.AdamW(q_network.parameters(), lr=args.learning_rate, weight_decay=weight_decay_scheduler.weight_decay)
    target_network = QNetwork(envs).to(device)
    target_network.load_state_dict(q_network.state_dict())

    rb = ReplayBuffer(
        args.buffer_size,
        envs.single_observation_space,
        envs.single_action_space,
        device,
        handle_timeout_termination=False,
    )
    
    episodic_return_history = dict()
    episodic_length_history = dict()
    sum_weights_squared_history = dict()
    td_loss_history = dict()
    weight_decay_history = dict()

    # TRY NOT TO MODIFY: start the game
    obs, _ = envs.reset(seed=args.seed)
    # obs = obs.astype(np.float32)
    for global_step in tqdm(range(args.total_timesteps)):
        # ALGO LOGIC: put action logic here
        epsilon = linear_schedule(args.start_e, args.end_e, args.exploration_fraction * args.total_timesteps, global_step)
        if random.random() < epsilon:
            actions = np.array([envs.single_action_space.sample() for _ in range(envs.num_envs)])
        else:
            q_values = q_network(torch.Tensor(obs).to(device))
            actions = torch.argmax(q_values, dim=1).cpu().numpy()

        # TRY NOT TO MODIFY: execute the game and log data.
        next_obs, rewards, terminations, truncations, infos = envs.step(actions)
        # next_obs = next_obs.astype(np.float32)

        # TRY NOT TO MODIFY: record rewards for plotting purposes
        if "final_info" in infos:
            for info in infos["final_info"]:
                if info and "episode" in info:
                    episodic_return = info["episode"]["r"].item()
                    episodic_length = info["episode"]["l"].item()
                    episodic_return_history[global_step] = episodic_return
                    episodic_length_history[global_step] = episodic_length

                    weight_decay_scheduler.update(episodic_return_history)

        # TRY NOT TO MODIFY: save data to reply buffer; handle `final_observation`
        real_next_obs = next_obs.copy()
        for idx, trunc in enumerate(truncations):
            if trunc:
                real_next_obs[idx] = infos["final_observation"][idx]
        rb.add(obs, real_next_obs, actions, rewards, terminations, infos)

        # TRY NOT TO MODIFY: CRUCIAL step easy to overlook
        obs = next_obs

        # ALGO LOGIC: training.
        if global_step > args.learning_starts:
            if global_step % args.train_frequency == 0:
                data = rb.sample(args.batch_size)
                with torch.no_grad():
                    target_max, _ = target_network(data.next_observations.float()).max(dim=1)
                    td_target = data.rewards.flatten() + args.gamma * target_max * (1 - data.dones.flatten())
                old_val = q_network(data.observations.float()).gather(1, data.actions).squeeze()
                loss = F.mse_loss(td_target, old_val)

                set_weight_decay(optimizer, weight_decay_scheduler.weight_decay)

                td_loss_history[global_step] = loss.item()
                sum_weights_squared_history[global_step] = sum_weights_squared(q_network).item()
                weight_decay_history[global_step] = weight_decay_scheduler.weight_decay

                # optimize the model
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            # update target network
            if global_step % args.target_network_frequency == 0:
                target_network.load_state_dict(q_network.state_dict())
                
    fig = Figure()
    fig.add_trace(Scatter( x      = list(mean(slice) for slice in chunked(weight_decay_history.keys(), 100)),
                           y      = list(mean(slice) for slice in chunked(weight_decay_history.values(), 100)),
                           name   = "weight decay",
                           yaxis = "y2" ))
    fig.add_trace(Scatter( x      = list(mean(slice) for slice in chunked(sum_weights_squared_history.keys(), 100)),
                           y      = list(mean(slice) for slice in chunked(sum_weights_squared_history.values(), 100)),
                           name   = "sum weight squared",
                           yaxis = "y3" ))
    fig.add_trace(Scatter( x      = list(mean(slice) for slice in chunked(td_loss_history.keys(), 100)),
                           y      = list(mean(slice) for slice in chunked(td_loss_history.values(), 100)),
                           name   = "TD loss",
                           yaxis  = "y4",
                           mode   = "markers",
                           marker = {"opacity": 0.5} ))
    fig.add_trace(Scatter( x      = list(mean(slice) for slice in chunked(episodic_return_history.keys(), 100)),
                           y      = list(x for x in (mean(slice) for slice in chunked(episodic_return_history.values(), 100))),
                           name   = "episodic return",
                           mode   = "markers",
                           marker = {"opacity": 0.5}))
    
    fig.add_hline( y=weight_decay_scheduler.aspiration_min,
                   # annotation_y=log10(-weight_decay_scheduler.aspiration_min+3), # we need annotation_y=log10(y) because of a bug in plotly
                   annotation_text="aspiration min",
                   annotation_position="bottom left",
                   line_color="seagreen" )
    fig.add_hline( y=weight_decay_scheduler.aspiration_max,
                   # annotation_y=log10(-weight_decay_scheduler.aspiration_max+3), # we need annotation_y=log10(y) because of a bug in plotly
                   annotation_text="aspiration max",
                   annotation_position="top left",
                   line_color="seagreen" )

    fig.update_layout(
        # title=f"{weight_decay=}",
        xaxis_title="Global step",
        xaxis=dict(domain=[0.10, 0.95]),
        yaxis=dict(title="episodic return"),
        yaxis2=dict(title="weight decay", overlaying="y", side="right", type="log"),
        yaxis3=dict(title="sum weights squared", anchor="free", overlaying="y", autoshift=True),
        yaxis4=dict(title="TD loss", anchor="free", overlaying="y", autoshift=True),
    )

    # clear_output(wait=True)
    display(fig)

    envs.close()

    return target_network

def test(model, args):
    episodic_rewards = []
    episodic_lengths = []
    env = SimplifiedCheckersAgainstRandomPlayer(board_height=4, board_width=4, num_rows_with_pieces_initially=1)
    for _ in tqdm(range(1_000)):
        observation, info = env.reset()
        episodic_reward = 0.
        episodic_length = 0
        while True:
            action = model(tensor(observation).float()).argmax().item()
            observation, reward, done, terminated, info = env.step(action)
            episodic_reward += reward
            episodic_length += 1
            if done:
                break
        episodic_rewards.append(episodic_reward)
        episodic_lengths.append(episodic_length)
    
    return {"episodic_reward": mean(episodic_rewards), "episodic_length": mean(episodic_lengths)}
        

In [33]:
weight_decay_scheduler = SimpleWeightDecayScheduler( aspiration_min = 0.4,
                                                     aspiration_max = 0.6,
                                                     weight_decay_step_up = 1.01,
                                                     weight_decay_step_down = 1.01,
                                                     min_weight_decay = 0.01,
                                                     max_weight_decay = 100.,
                                                     smoothness=100 )
model = train(Args(), weight_decay_scheduler)
print("test_result", test(model, Args()))

  0%|          | 0/500000 [00:00<?, ?it/s]

100%|██████████| 500000/500000 [11:27<00:00, 727.49it/s]


100%|██████████| 1000/1000 [00:03<00:00, 320.57it/s]

test_result {'episodic_reward': 0.82, 'episodic_length': 5.125}





In [34]:
weight_decay_scheduler = SimpleWeightDecayScheduler( aspiration_min = 0.4,
                                                     aspiration_max = 0.6,
                                                     weight_decay_step_up = 1.01,
                                                     weight_decay_step_down = 1.01,
                                                     min_weight_decay = 0.01,
                                                     max_weight_decay = 0.01,
                                                     smoothness=100 )
model = train(Args(), weight_decay_scheduler)
print("test_result", test(model, Args()))

100%|██████████| 500000/500000 [11:06<00:00, 750.47it/s]


100%|██████████| 1000/1000 [00:04<00:00, 240.48it/s]

test_result {'episodic_reward': 1.552, 'episodic_length': 5.605}





In [30]:
weight_decay_scheduler = SimpleWeightDecayScheduler( aspiration_min = 0.4,
                                                     aspiration_max = 0.6,
                                                     weight_decay_step_up = 1.01,
                                                     weight_decay_step_down = 1.01,
                                                     min_weight_decay = 0.01,
                                                     max_weight_decay = 100.,
                                                     smoothness=100 )
model = train(Args(), weight_decay_scheduler)
print("test_result", test(model, Args()))

  0%|          | 0/500000 [00:00<?, ?it/s]

100%|██████████| 500000/500000 [12:25<00:00, 670.90it/s]


100%|██████████| 1000/1000 [00:02<00:00, 368.90it/s]

test_result {'episodic_reward': 0.903, 'episodic_length': 4.976}





In [29]:
weight_decays = [0., 0.05, 0.1, 0.2, 0.5, 1.]
test_rewards = []

for weight_decay in weight_decays:
    weight_decay_scheduler = SimpleWeightDecayScheduler( aspiration_min = 0.9,
                                                        aspiration_max = 1.1,
                                                        weight_decay_step = 1.01,
                                                        min_weight_decay = weight_decay,
                                                        max_weight_decay = weight_decay )
    model = train(Args(), weight_decay_scheduler)
    test_result = test(model, Args())
    print("test_result", test_result)
    test_rewards.append(test_result["episodic_reward"])

display(Figure(Scatter(x=weight_decays, y=test_rewards)))

TypeError: SimpleWeightDecayScheduler.__init__() got an unexpected keyword argument 'weight_decay_step'

In [19]:
for weight_decay in [0.01, 0.0125, 0.015, 0.175, 0.20]:
    weight_decay_scheduler = SimpleWeightDecayScheduler( aspiration_min = 0,
                                                         aspiration_max = 0,
                                                         weight_decay_step = 1,
                                                         min_weight_decay = weight_decay,
                                                         max_weight_decay = weight_decay )
    model = train(Args(), weight_decay_scheduler=weight_decay_scheduler)

  0%|          | 0/500000 [00:00<?, ?it/s]

  8%|▊         | 41924/500000 [01:10<12:45, 598.44it/s] 


KeyboardInterrupt: 

In [6]:
assert False

model = train(Args(), weight_decay=0.)
print(test(model, Args()))

AssertionError: 

In [None]:
from torch import tensor

In [None]:
env = SimplifiedCheckersAgainstRandomPlayer(board_height=4, board_width=4, num_rows_with_pieces_initially=1)
total_rewards = []
for _ in tqdm(range(1_000)):
    observation, info = env.reset()
    total_reward = 0.
    while True:
        action = model(tensor(observation).float()).argmax().item()
        observation, reward, done, terminated, info = env.step(action)
        total_reward += reward
        if done:
            break
    total_rewards.append(total_reward)
print(total_rewards)
print(mean(total_rewards))

100%|██████████| 1000/1000 [00:03<00:00, 264.80it/s]

[2.0, 1.0, 2.0, 2.0, 2.0, 2.0, 1.0, 0.0, 2.0, 2.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 0.0, 2.0, 2.0, 2.0, 2.0, 2.0, -3.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 2.0, 2.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 2.0, 2.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 1.0, 2.0, 2.0, 2.0, 1.0, 2.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 1.0, 2.0, 1.0, 2.0, 2.0, 2.0, 2.0, -3.0, 2.0, 2.0, 1.0, 2.0, 2.0, -4.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 1.0, 2.0, 2.0, 2.0, 2.0, -3.0, 2.0, 2.0, 2.0, -3.0, 2.0, 2.0, 2.0, -1.0, 2.0, 2.0, -3.0, 2.0, 2.0, 2.0, 2.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 2.0, 2.0, 1.0, 1.0, 2.0, 1.0, -3.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 1.0, 2.0, 1.0, 2.0, 2.0, -3.0, 2.0, 2.0, 2.0, 1.0, 2.0, 1.0, 1.0, -3.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, -3.0, 2.0, 2.0, 0.0, 2.0, -3.0, 2.0, 2.0, 1.0, 1.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, 2.0, -2.0, 2.0, 2




In [None]:
np.random.seed(42)

env = SimplifiedCheckersAgainstRandomPlayer(board_height=4, board_width=4, num_rows_with_pieces_initially=1)
observation, info = env.reset()
print(observation)
env.render()
episode_length = 0
while True:
    episode_length += 1
    output = model(tensor(observation).float())
    print(output)
    action = output.argmax().item()
    print(f"{action=} corresponds to move {move_from_id(action, Color.white, board_width=4, board_height=4)}")
    observation, reward, done, terminated, info = env.step(action)
    print(f"{observation=} {reward=}")
    env.render()
    if done:
        break
print(f"{episode_length=}")

[1 0 0 2 1 0 0 2]
 b b
. . 
 . .
w w 
tensor([1.4820, 1.4657, 0.3766, 0.7868, 0.1444, 0.5787, 0.5354, 0.3854, 1.2775,
        1.4983, 0.0377, 0.2502, 0.4890, 0.2585, 0.2950, 0.7561],
       grad_fn=<ViewBackward0>)
action=9 corresponds to move Move(from_x=2, from_y=0, side=<Side.right: 2>, player=<Player.white: 1>, board_width=4, board_height=4)
observation=array([1, 0, 0, 2, 0, 2, 1, 0]) reward=0.0
 b .
. b 
 . w
w . 
tensor([-1.1828, -0.8191, -0.7516, -0.7926, -0.5825, -1.0097, -0.8869, -1.1647,
        -0.0374, -1.1025, -0.4912,  0.3656, -0.3645, -0.8153,  0.0974, -1.3549],
       grad_fn=<ViewBackward0>)
action=11 corresponds to move Move(from_x=3, from_y=1, side=<Side.right: 2>, player=<Player.white: 1>, board_width=4, board_height=4)
observation=array([1, 0, 0, 0, 0, 2, 2, 0]) reward=-1.0
 . .
. b 
 . b
w . 
tensor([-0.8178, -0.2418, -0.8380, -1.0261, -0.9333, -0.3355, -0.8142, -0.9573,
        -0.5782, -1.7586, -1.3056, -1.1799, -0.8129, -0.5749, -0.6571, -2.0393],
       grad_f