In [1]:
import sys
import time
import random
import torch
from tqdm import tqdm

import numpy as np
import gym
import gym_chess
import math
import pandas as pd
import matplotlib.pyplot as plt

from typing import cast, List, Tuple, Deque, Optional, Callable

env = gym.make("ChessVsSelf-v1", log=False)

In [2]:
def sigmoid(x: np.ndarray) -> np.ndarray:
    """
    Compute the sigmoid function for the input array.

    Parameters
    ----------
    x : np.ndarray
        The input array for which to compute the sigmoid function.

    Returns
    -------
    np.ndarray
        The output array with the sigmoid function applied element-wise.
    """
    return 1.0 / (1.0 + np.exp(-x))


# Logistic Regression ############################################


class LogisticRegression:
    """
    Logistic Regression model for binary classification.

    Parameters
    ----------
    observations_size : int
        The size of the observation space.

    Attributes
    ----------
    observation_size : int
        The size of the observation space.
    params : np.ndarray
        The parameters of the logistic regression model.
    """

    def __init__(self, observations_size: int):
        """
        Initialize the LogisticRegression model with random parameters.

        Parameters
        ----------
        observations_size : int
            The size of the observation space.
        """
        self.observation_size = observations_size
        self.params = np.random.rand(observations_size)

    def __call__(self, observation: np.ndarray) -> int:
        """
        Predict the class label for a given observation.

        Parameters
        ----------
        observation : np.ndarray
            The input observation.

        Returns
        -------
        int
            The predicted class label (0 or 1).
        """
        prob_push_right = sigmoid(np.dot(observation, np.transpose(self.params)))
        
        return 1 if np.random.rand() < prob_push_right else 0

    def get_params(self) -> np.ndarray:
        """
        Get the parameters of the logistic regression model.

        Returns
        -------
        np.ndarray
            The parameters of the logistic regression model.
        """
        return self.params.copy()

    def set_params(self, params: np.ndarray) -> None:
        """
        Set the parameters of the logistic regression model.

        Parameters
        ----------
        params : np.ndarray
            The parameters of the logistic regression model.
        """
        self.params = params.copy()

In [3]:
def test_agent(
    env: gym.Env, policy: torch.nn.Module, num_episode: int = 1, num_steps: int = 100
) -> List[float]:
    """
    Test a naive agent in the given environment using the provided Q-network.

    Parameters
    ----------
    env : gym.Env
        The environment in which to test the agent.
    policy : torch.nn.Module
        The neural network to use for decision making.
    num_episode : int, optional
        The number of episodes to run, by default 1.

    Returns
    -------
    List[float]
        A list of rewards per episode.
    """
    collected_rewards = []

    for episode_id in range(num_episode):
        observation = env.reset()
        print("\n", "=" * 10, "NEW GAME", "=" * 10)
        env.render()
        total_rewards = {"WHITE": 0, "BLACK": 0}

        
        for j in range(num_steps):
            
            observation = observation.flatten()
            
            action = policy(observation)

            state, reward, done, _ = env.step(action)
            total_rewards["WHITE"] += reward
            if done:
                break

            # black moves
            moves = env.possible_moves
            m = random.choice(moves)
            a = env.move_to_action(m)
            # perform action
            state, reward, done, _ = env.step(a)
            total_rewards["BLACK"] += reward
            if done:
                break

            observation = state

        print(">" * 5, "GAME", i, "REWARD:", total_rewards)
        collected_rewards.append(total_rewards)


    return collected_rewards

In [4]:
nn_policy = LogisticRegression(64)

test_agent(env, nn_policy, num_episode=1, num_steps=100)


    -------------------------
 8 |  ♖  ♘  ♗  ♕  ♔  ♗  ♘  ♖ |
 7 |  ♙  ♙  ♙  ♙  ♙  ♙  ♙  ♙ |
 6 |  .  .  .  .  .  .  .  . |
 5 |  .  .  .  .  .  .  .  . |
 4 |  .  .  .  .  .  .  .  . |
 3 |  .  .  .  .  .  .  .  . |
 2 |  ♟  ♟  ♟  ♟  ♟  ♟  ♟  ♟ |
 1 |  ♜  ♞  ♝  ♛  ♚  ♝  ♞  ♜ |
    -------------------------
      a  b  c  d  e  f  g  h 


NameError: name 'i' is not defined

In [5]:
class ObjectiveFunction:
    """
    Objective function for evaluating a policy in a given environment.

    Parameters
    ----------
    env : gym.Env
        The environment in which to evaluate the policy.
    policy : torch.nn.Module
        The policy to evaluate.
    num_episodes : int, optional
        The number of episodes to run for each evaluation, by default 1.
    max_time_steps : float, optional
        The maximum number of time steps per episode, by default float("inf").
    minimization_solver : bool, optional
        Whether the solver is a minimization solver, by default True.

    Attributes
    ----------
    env : gym.Env
        The environment in which to evaluate the policy.
    policy : torch.nn.Module
        The policy to evaluate.
    num_episodes : int
        The number of episodes to run for each evaluation.
    max_time_steps : float
        The maximum number of time steps per episode.
    minimization_solver : bool
        Whether the solver is a minimization solver.
    num_evals : int
        The number of evaluations performed.
    """

    def __init__(
        self,
        env: gym.Env,
        policy: torch.nn.Module,
        num_episodes: int = 1,
        max_time_steps: float = float("inf"),
        minimization_solver: bool = True,
    ):
        self.env = env
        self.policy = policy
        self.num_episodes = num_episodes
        self.max_time_steps = max_time_steps
        self.minimization_solver = minimization_solver

        self.num_evals = 0

    def eval(self, policy_params: np.ndarray, num_episodes: Optional[int] = None, max_time_steps: Optional[float] = None) -> float:
        """
        Evaluate a policy.

        Parameters
        ----------
        policy_params : np.ndarray
            The parameters of the policy to evaluate.
        num_episodes : int, optional
            The number of episodes to run for each evaluation, by default None.
        max_time_steps : float, optional
            The maximum number of time steps per episode, by default None.

        Returns
        -------
        float
            The average total rewards over the evaluation episodes.
        """
        self.policy.set_params(policy_params)

        self.num_evals += 1

        if num_episodes is None:
            num_episodes = self.num_episodes

        if max_time_steps is None:
            max_time_steps = self.max_time_steps

        average_total_rewards = 0

        for i_episode in range(num_episodes):
            total_rewards = {"WHITE": 0, "BLACK": 0}
            observation = self.env.reset()

            for t in range(max_time_steps):
                observation = observation.flatten()
            
                action = self.policy(observation)

                state, reward, done, _ = env.step(action)
                total_rewards["WHITE"] += reward
                if done:
                    break

                # black moves
                moves = env.possible_moves
                m = random.choice(moves)
                a = env.move_to_action(m)
                # perform action
                state, reward, done, _ = env.step(a)
                total_rewards["BLACK"] += reward
                if done:
                    break

                observation = state

            average_total_rewards += float(total_rewards["WHITE"]) / num_episodes

        if self.minimization_solver:
            average_total_rewards *= -1.0

        return average_total_rewards  # Optimizers do minimization by default...

    def __call__(self, policy_params: np.ndarray, num_episodes: Optional[int] = None, max_time_steps: Optional[float] = None) -> float:
        """
        Evaluate a policy.

        Parameters
        ----------
        policy_params : np.ndarray
            The parameters of the policy to evaluate.
        num_episodes : int, optional
            The number of episodes to run for each evaluation, by default None.
        max_time_steps : float, optional
            The maximum number of time steps per episode, by default None.

        Returns
        -------
        float
            The average total rewards over the evaluation episodes.
        """
        return self.eval(policy_params, num_episodes, max_time_steps)

In [6]:
def cem_uncorrelated(
    objective_function: Callable[[np.ndarray], float],
    mean_array: np.ndarray,
    var_array: np.ndarray,
    max_iterations: int = 500,
    sample_size: int = 50,
    elite_frac: float = 0.2,
    print_every: int = 10,
    success_score: float = float("inf"),
    num_evals_for_stop: Optional[int] = None,
    hist_dict: Optional[dict] = None,
) -> np.ndarray:
    """
    Cross-entropy method.

    Parameters
    ----------
    objective_function : Callable[[np.ndarray], float]
        The function to maximize.
    mean_array : np.ndarray
        The initial proposal distribution (mean vector).
    var_array : np.ndarray
        The initial proposal distribution (variance vector).
    max_iterations : int, optional
        Number of training iterations, by default 500.
    sample_size : int, optional
        Size of population at each iteration, by default 50.
    elite_frac : float, optional
        Rate of top performers to use in update with elite_frac ∈ ]0;1], by default 0.2.
    print_every : int, optional
        How often to print average score, by default 10.
    success_score : float, optional
        The score at which to stop the optimization, by default float("inf").
    num_evals_for_stop : Optional[int], optional
        Number of evaluations for stopping criteria, by default None.
    hist_dict : Optional[dict], optional
        Dictionary to log the history, by default None.

    Returns
    -------
    np.ndarray
        The optimized mean vector.
    """
    assert 0.0 < elite_frac <= 1.0

    n_elite = math.ceil(sample_size * elite_frac)

    for iteration_index in range(0, max_iterations):

        # SAMPLE A NEW POPULATION OF SOLUTIONS (X VECTORS) ####################

        # SAMPLE A NEW POPULATION OF SOLUTIONS (X VECTORS)
        x_array = torch.tensor(mean_array) + torch.sqrt(torch.tensor(var_array)) * torch.randn((sample_size, len(mean_array)))


        # EVALUATE SAMPLES AND EXTRACT THE BEST ONES ("ELITE") ################

        score_array = torch.tensor([objective_function(x.numpy(), 1, 100) for x in tqdm(x_array)])

        sorted_indices_array = score_array.argsort()       # Sort from the lower score to the higher one
        elite_indices_array = sorted_indices_array[:n_elite]              # Recall: we *minimize* the objective function thus we take the samples that are at the begining of the sorted_indices

        elite_x_array = x_array[elite_indices_array]

        # FIT THE NORMAL DISTRIBUTION ON THE ELITE POPULATION #################

        mean_array = elite_x_array.mean(dim=0)
        var_array = elite_x_array.var(dim=0)
        score = objective_function(mean_array.numpy(), 1, 100)

        # PRINT STATUS ########################################################

        if iteration_index % print_every == 0:
            print("Iteration {}\tScore {}".format(iteration_index, score))

        if hist_dict is not None:
            hist_dict[iteration_index] = score

        # STOPPING CRITERIA ####################################################

        if num_evals_for_stop is not None:
            score = objective_function(mean_array.numpy(), num_evals_for_stop)

        # `num_evals_for_stop = None` may be used to fasten computations but it introduces bias...
        if score <= success_score:
            break

    return mean_array.numpy()

In [7]:
env = gym.make("ChessVsSelf-v1", log=False)

nn_policy = LogisticRegression(64)

objective_function = ObjectiveFunction(
    env=env, policy=nn_policy, num_episodes=10, max_time_steps=1000
)

In [8]:
hist_dict = {}

num_params = len(nn_policy.get_params())

init_mean_array = np.random.random(num_params)
init_var_array = np.ones(num_params) * 100.0

optimized_policy_params = cem_uncorrelated(
    objective_function=objective_function,
    mean_array=init_mean_array,
    var_array=init_var_array,
    max_iterations=5,
    sample_size=10,
    elite_frac=0.2,
    print_every=1,
    success_score=-500,
    num_evals_for_stop=None,
    hist_dict=hist_dict,
)

env.close()

100%|██████████| 10/10 [00:25<00:00,  2.56s/it]
  x_array = torch.tensor(mean_array) + torch.sqrt(torch.tensor(var_array)) * torch.randn((sample_size, len(mean_array)))


Iteration 0	Score 1000.0


  return 1.0 / (1.0 + np.exp(-x))
100%|██████████| 10/10 [00:26<00:00,  2.62s/it]


Iteration 1	Score 980.0


100%|██████████| 10/10 [00:25<00:00,  2.50s/it]


Iteration 2	Score 980.0


100%|██████████| 10/10 [00:27<00:00,  2.72s/it]


Iteration 3	Score 980.0


100%|██████████| 10/10 [00:22<00:00,  2.29s/it]


Iteration 4	Score 980.0


In [None]:
score = objective_function(optimized_policy_params, 1, 100)

In [None]:
print(score)

960.0


In [None]:
df = pd.DataFrame.from_dict(
    hist_dict,
    orient="index",
    columns=["score", "mu1", "mu2", "mu3", "mu4", "var1", "var2", "var3", "var4"],
)