# Lab 2: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside your personal course repository for the course 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in group is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [2]:
import logging
from pprint import pformat
from collections import namedtuple
import random
from copy import deepcopy
from dataclasses import dataclass
from tqdm.notebook import trange
from scipy.special import softmax

## The *Nim* and *Nimply* classes

In [3]:
# move definition, which row and number of object to remove
Nimply = namedtuple("Nimply", "row, num_objects")

In [4]:
class Nim:
    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        if k == None:
            self._k = self._rows[-1]
        else:
            self._k = k

    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        return tuple(self._rows)

    @property
    def k(self) -> int:
        return self._k

    # action of removing
    def nimming(self, ply: Nimply) -> None:
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        assert num_objects <= self._k
        self._rows[row] -= num_objects

## Sample (and silly) startegies 

In [5]:
def pure_random(state: Nim) -> Nimply:
    """A completely random move"""
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = random.randint(1, min(state.rows[row], state.k))
    return Nimply(row, num_objects)

In [6]:
def gabriele(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, min(c + 1, state.k))]
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))

In [7]:
import numpy as np


def nim_sum(state: Nim) -> int:
    """Compute the bitwise xor of the heaps. In the combinatorial game theory usually called the nim-sum"""
    tmp = np.array([tuple(int(x) for x in f"{c:032b}") for c in state.rows])
    xor = tmp.sum(axis=0) % 2
    return int("".join(str(_) for _ in xor), base=2)


# compute for each possibile move the nim_sum and return a dictionary of possible_moves : { Numply: nim_sum , ... ,}
def analize(raw: Nim) -> dict:
    """compute for each possibile move the nim_sum and return a dictionary of possible_moves : { Numply: nim_sum , ... ,}"""
    cooked = dict()
    cooked["possible_moves"] = dict()
    for ply in (Nimply(r, o) for r, c in enumerate(raw.rows) for o in range(1, min(c + 1, raw.k))):
        tmp = deepcopy(raw)
        tmp.nimming(ply)
        cooked["possible_moves"][ply] = nim_sum(tmp)
    return cooked


def optimal(state: Nim) -> Nimply:
    """The professor optimal solution"""
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")

    # take only the moves that have nim_sum != 0
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0 and ply.num_objects <= state.k]
    if not spicy_moves:
        spicy_moves = [move for move in list(analysis["possible_moves"].keys()) if move.num_objects <= state.k]
    ply = random.choice(spicy_moves)
    return ply

## Task 2.1
My implementation of Rule-Based agent based on Nim-Sum:

Idea:
We always play a move in order to have a nim-sum value of 0. 
At some point, we end up in a position that has only one row of size 2 or more.
In such a position the nim-sum value is not equal to 0. To win we must reduce this to size 0 or 1, leaving an odd number of rows with size 1. From that point on, all moves are forced.

Reference: https://en.wikipedia.org/wiki/Nim#Proof_of_the_winning_formula.

The following function implements this winning strategy:

In [8]:
def expert_system(state: Nim) -> Nimply:
    """
    Implementation of an expert system which beats the strategies defined above.
    Details on how to win are available at https://en.wikipedia.org/wiki/Nim#Proof_of_the_winning_formula.

    Args:
        state: Nim game instance.

    Returns:
        The play to do.
    """
    analysis = analize(state)
    rows_not_zero = len(state.rows) - state.rows.count(0)
    # case of one single row with lenght >= 2
    if state.rows.count(1) == (rows_not_zero - 1):
        row, num_objects = [(row, num_objects) for row, num_objects in enumerate(state.rows) if num_objects > 1][0]
        if (rows_not_zero % 2) == 1:
            num_objects = num_objects if (num_objects - 1) <= state.k else state.k
            return Nimply(row, num_objects - 1)
        else:
            num_objects = num_objects if num_objects <= state.k else state.k
            return Nimply(row, num_objects)
    # case with more row with length >= 2
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns == 0]
    if not spicy_moves:
        spicy_moves = [move for move in list(analysis["possible_moves"].keys())]
    ply = random.choice(spicy_moves)
    return ply

In [9]:
def match(nim: Nim, strategy: tuple[Nimply]) -> int:
    """
    Function to do a match.

    Args:
        nim: Nim game instance.
        strategy: tuple of (player_strategy,opponent_strategy)
    Returns:
        The winner player.
    """

    logging.getLogger().setLevel(logging.WARN)
    logging.info(f"init : {nim}")
    player = 0
    while nim:
        ply = strategy[player](nim)
        logging.info(f"ply: player {player} plays {ply}")
        nim.nimming(ply)
        logging.info(f"status: {nim}")
        player = 1 - player
    logging.info(f"status: Player {player} won!")
    return player

In [28]:
def test(nim: Nim, player_strategy: Nimply, opponent_strategies: tuple[Nimply], number_of_games: int) -> None:
    """
    Function to test a strategy.

    Args:
        nim: Nim game instance.
        player_strategy: the player strategy.
        opponent_strategy: a tupla of the opponent strategies.
        number_of_games: number of games to play.
    Returns:
        A print of percentage of win of the player strategy vs all the opponent strategies.
    """
    for strategy in opponent_strategies:
        wins = 0
        for _ in range(number_of_games):
            tmp = deepcopy(nim)

            winner = match(tmp, (player_strategy, strategy))

            if winner == 0:
                wins += 1

        print(f"Win-rate of {player_strategy.__qualname__} vs {strategy.__qualname__}: {(wins / number_of_games):.2%}")

In [12]:
nim = Nim(5, 3)
opponent_strategys = (optimal, pure_random, gabriele)
test(nim, expert_system, opponent_strategys, 100)

Player 0 with strategy expert_system vs Player 1 with strategy optimal: 95.00% win rate
Player 0 with strategy expert_system vs Player 1 with strategy pure_random: 88.00% win rate
Player 0 with strategy expert_system vs Player 1 with strategy gabriele: 93.00% win rate


## Task 2.2
My implementation of Evolved Rules Agent with adaptive strategies

Idea:
The idea was to divide the game into three phases: initial, intermediate and final. Trying to choose the most appropriate moves to make in each phase from the previous strategies. To do this we define a method to understand the phases of the game. And we use a probability to use a specific strategy for each phase. In this way we are able to implement an evolutionary agent with these parameters to be optimized:
 - **Phase Thresholds**: the thresholds for the phases of the game (early, mid, end)  
      We measure the phase of the game by the number of theoretical moves left to end the game over the total number of moves (it thus depends on both size of the board and k-limit): $p\in [0,1]$  
      The thresholds are thus $t_{1}, t_{2} \in [0,1]$ and the phase is defined as follows:  
      - Early phase: $0 \leq\ p \leq t_{1}$
      - Mid  phase: $t_{1} < p \leq t_{2}$
      - Late phase: $t_{2} < p\ < 1$
 - **Strategy Weights**: values between [1,10] to compute the probabilities of the strategies to be used in each phase, we then normalize them with the softmax function so that we have the actual probability values for each strategy.

The fitness of an state is the average of the accuracy of the games played against the *expert* rule-based agent. In this way, we think that probably the Adpative Strategy would collapse to always use the expert system, thus getting to a 50% win rate against him. But, perhaps, the Adpative Strategy could learn that at some stages of the game, playing with other strategies, the expert can be beaten.

In [13]:
def moves_ratio(nim: Nim):
    """
    Compute a value to understand the phase of game.

    Args:
        state: Nim game instance.

    Returns:
        A value between 0 and 1 (0 early game -> 1 end game).
    """
    # total possible moves
    possible_moves = sum([1 for _, c in enumerate(nim.rows) for _ in range(1, min(c + 1, nim.k))])

    inital_nim = Nim(len(nim.rows), nim.k)
    # total moves in the start game
    total_moves = sum([1 for _, c in enumerate(inital_nim.rows) for _ in range(1, min(c + 1, nim.k))])

    return possible_moves / total_moves

In [54]:
STRATEGIES = [expert_system, pure_random, gabriele, optimal]
mutation_rate: float = (0.1, 3)


@dataclass(init=False)
class State:
    """
    Class that define the state for the evolution strategies
    """

    n_strategy: int
    phase_thresholds: list[float]
    strategy_weights: list[list[float]]

    def __init__(self, n_strategy: int = None, strategy_weights=None, phase_thresholds=None) -> None:
        if n_strategy is None:
            n_strategy = len(STRATEGIES)
        if phase_thresholds is None:
            phase_thresholds = sorted([random.random(), random.random()])
        else:
            phase_thresholds = sorted([max(0, phase_thresholds[0]), min(1, phase_thresholds[1])])
        if strategy_weights is None:
            strategy_weights = np.random.uniform(1, 10, (len(phase_thresholds) + 1, n_strategy))

        self.n_strategy = n_strategy
        self.strategy_weights = strategy_weights
        self.phase_thresholds = phase_thresholds

    def mutate(state: "State") -> "State":
        global mutation_rate
        state = deepcopy(state)
        phase_thresholds = np.clip(np.random.normal(state.phase_thresholds, mutation_rate[0]), 0, 1).tolist()
        strategy_weights = np.clip(np.random.normal(state.strategy_weights, mutation_rate[1]), 1, 10).tolist()
        return State(strategy_weights=strategy_weights, phase_thresholds=phase_thresholds, n_strategy=state.n_strategy)

    def __call__(self: "State", state: Nim) -> Nimply:
        phase_ratio = moves_ratio(state)
        phase_index = (
            0
            if phase_ratio < self.phase_thresholds[0]
            else (1 if self.phase_thresholds[0] <= phase_ratio <= self.phase_thresholds[1] else 2)
        )
        probs = softmax(self.strategy_weights[phase_index])
        strategy = np.random.choice(STRATEGIES, p=probs)
        return strategy(state)

    def __str__(self):
        return f"Phase thresholds: {self.phase_thresholds}\nStrategies: {[s.__qualname__ for s in STRATEGIES]}\nStrategies probs in Early game: {softmax(self.strategy_weights[0])}\nStrategies probs in Mid game: {softmax(self.strategy_weights[1])}\nStrategies probs in End game: {softmax(self.strategy_weights[0])}"

In [40]:
OPPONENT = expert_system
N_MATCHES = 10


def streak(player_strategy, opponent_strategy=OPPONENT, number_of_matches: int = N_MATCHES) -> float:
    """
    Play a given number of random matches between two strategies.

    Args:
        player_strategy: the player strategy;
        opponent_strategy: the opponent strategy;
        n_matches: number of matchers to play.

    Returns:
        Percentage of wins.
    """
    wins = 0
    for _ in range(number_of_matches):
        random_size = random.randint(4, 10)
        random_k = random.randint(2, 10)
        nim = Nim(random_size, random_k)
        wins += 1 if match(nim, (player_strategy, opponent_strategy)) == 0 else 0
    return wins / number_of_matches

### (1 + $\lambda$)-ES

In [55]:
LAMBDA = 40
N_MATCHES = 10

solution = State()
solution_winrate = streak(solution)
mutation_rate: float = (0.1, 3)
pbar = trange(0, 1_000 // LAMBDA)


for i in pbar:
    pbar.set_description(f"Parent Win-rate: {solution_winrate:.2%}")
    offspring = [solution.mutate() for _ in range(LAMBDA)]
    results = [streak(i) for i in offspring]
    incr_rate = np.sum([res > solution_winrate for res in results]) / LAMBDA

    if incr_rate > 1 / 5:
        mutation_rate = (mutation_rate[0] * 1.2, mutation_rate[1] * 1.2)

    elif incr_rate < 1 / 5:
        mutation_rate = (mutation_rate[0] / 1.2, mutation_rate[1] / 1.2)

    solution_ind = np.argmax(results)
    # update the solution only if it is better than the parent solution

    if solution_winrate < results[solution_ind]:
        solution = offspring[solution_ind]

        solution_winrate = results[solution_ind]

    if solution_winrate >= 0.99:
        break

print(f'Parameters of solution:\n{solution}')

  0%|          | 0/25 [00:00<?, ?it/s]

Parameters of solution:
Phase thresholds: [0, 0.9832965109861335]
Strategies: ['expert_system', 'pure_random', 'gabriele', 'optimal']
Strategies probs in Early game: [0.00380678 0.00420187 0.0364653  0.95552606]
Strategies probs in Mid game: [9.98506418e-01 4.79051348e-04 5.35479119e-04 4.79051348e-04]
Strategies probs in End game: [0.00380678 0.00420187 0.0364653  0.95552606]


In [63]:
nim = Nim(5, 3)
opponent_strategies = (optimal, pure_random, gabriele, expert_system)
solution.__qualname__ = 'adaptive strategy (1 + 𝜆)-ES'
test(nim, solution, opponent_strategys, 100)

Win-rate of adaptive strategy (1 + 𝜆)-ES vs optimal: 95.00%
Win-rate of adaptive strategy (1 + 𝜆)-ES vs pure_random: 83.00%
Win-rate of adaptive strategy (1 + 𝜆)-ES vs gabriele: 90.00%
Win-rate of adaptive strategy (1 + 𝜆)-ES vs expert_system: 63.00%


### (1,$\lambda$)-ES

In [35]:
LAMBDA = 40
N_MATCHES = 10

solution = State()
solution_winrate = streak(solution)
mutation_rate: float = (0.1, 3)

pbar = trange(0, 1_000 // LAMBDA)
for i in pbar:
    pbar.set_description(f"Parent Win-rate: {solution_winrate:.2%}")
    offspring = [solution.mutate() for _ in range(LAMBDA)]
    results = [streak(i) for i in offspring]

    incr_rate = np.sum([res > solution_winrate for res in results]) / LAMBDA

    if incr_rate > 1 / 5:
        mutation_rate = (mutation_rate[0] * 1.2, mutation_rate[1] * 1.2)
    elif incr_rate < 1 / 5:
        mutation_rate = (mutation_rate[0] / 1.2, mutation_rate[1] / 1.2)

    # always update the solution
    solution_ind = np.argmax(results)
    solution = offspring[solution_ind]
    solution_winrate = results[solution_ind]

    if solution_winrate >= 0.99:
        break

print(f'Parameters of solution:\n{solution}')

  0%|          | 0/25 [00:00<?, ?it/s]

In [36]:
nim = Nim(5, 3)
opponent_strategys = (optimal, pure_random, gabriele, expert_system)
solution.__qualname__ = 'adaptive strategy (1 , 𝜆)-ES'
test(nim, solution, opponent_strategys, 100)

Win-rate of adaptive strategy (1 , 𝜆)-ES vs optimal: 88.00%
Win-rate of adaptive strategy (1 , 𝜆)-ES vs pure_random: 81.00%
Win-rate of adaptive strategy (1 , 𝜆)-ES vs gabriele: 94.00%
Win-rate of adaptive strategy (1 , 𝜆)-ES vs expert_system: 64.00%
