Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task 2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task 2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in group is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [1]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
from random import random, choice, randint
from copy import deepcopy
from typing import Callable, Literal
from dataclasses import dataclass, field
from tqdm.notebook import trange
import numpy as np

## The *Nim* and *Nimply* classes

In [2]:
Nimply = namedtuple("Nimply", "row, num_objects")

In [3]:
class Nim:
    """
    Class implementing the Nim game.

    num_rows: number of rows (piles)
    k: maximum number of objects you can nim from a row
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    def __repr__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        return tuple(self._rows)

    @property
    def k(self) -> int:
        return self._k

    def nimming(self, ply: Nimply) -> None:
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k, f"{num_objects=}, {self._k=}"
        self._rows[row] -= num_objects

## Rule-based startegies 

In [4]:
def pure_random(state: Nim) -> Nimply:
    """A completely random move"""
    row = choice([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = randint(1, state.rows[row] if state.k is None else min(state.rows[row], state.k))
    return Nimply(row, num_objects)

In [5]:
def gabriele(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row"""
    possible_moves = [
        (r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1 if state.k is None else min(c + 1, state.k))
    ]
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))

In [6]:
def nim_sum(state: Nim) -> int:
    """Compute nim-sum value on a Nim game instance"""
    tmp = np.array([tuple(int(x) for x in f"{c:032b}") for c in state.rows])
    xor = tmp.sum(axis=0) % 2
    return int("".join(str(_) for _ in xor), base=2)


def generate_all_plys(raw: Nim) -> list[Nimply]:
    return [
        Nimply(r, o) for r, c in enumerate(raw.rows) for o in range(1, c + 1 if raw.k is None else min(c + 1, raw.k))
    ]


def analize(raw: Nim) -> dict:
    """Given a Nim game instance, this function compute all the possible moves we can do"""
    cooked = dict()
    cooked["possible_moves"] = dict()
    for ply in generate_all_plys(raw):
        tmp = deepcopy(raw)
        tmp.nimming(ply)
        cooked["possible_moves"][ply] = nim_sum(tmp)
    return cooked


def optimal(state: Nim) -> Nimply:
    """
    If possible, this function returns a move which leads to a nim-sum value not equal to zero,
    otherwise a random move among all the possible moves
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0]
    if not spicy_moves:
        spicy_moves = list(analysis["possible_moves"].keys())
    logging.debug(pformat(f"{analysis['possible_moves']}"))
    ply = choice(spicy_moves)
    return ply

In [7]:
def play_expert_system(state: Nim) -> Nimply:
    """
    This function implement an expert systems which beats the strategies defined above
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    not_zero_rows = len(state.rows) - state.rows.count(0)
    one_count_rows = state.rows.count(1)
    if one_count_rows == not_zero_rows - 1:
        is_odd = (one_count_rows % 2) == 1
        row, objects = [(row, objects) for row, objects in enumerate(state.rows) if objects > 1][0]
        if is_odd:
            return Nimply(row, objects if state.k is None else min(objects, state.k))
        return Nimply(row, objects - 1 if state.k is None else min(objects - 1, state.k))
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns == 0]
    if not spicy_moves:
        spicy_moves = list(analysis["possible_moves"].keys())
    logging.debug(pformat(f"{analysis['possible_moves']}"))
    ply = choice(spicy_moves)
    return ply

## Oversimplified match

In [13]:
def play_game(nim: Nim, strategy1: Callable[[Nim], Nimply], strategy2: Callable[[Nim], Nimply]) -> Literal[0, 1]:
    """
    Play a Nim game using the given strategies.

    Args:
        nim: Nim game instance;
        strategy1: Player 0 strategy;
        strategy2: Player 1 strategy.

    Returns:
        player: the winning player.
    """
    logging.getLogger().setLevel(logging.WARN)

    strategy = (strategy1, strategy2)

    logging.info(f"init : {nim}")
    player = 0
    while nim:
        ply = strategy[player](nim)
        logging.info(f"ply: player {player} plays {ply}, {nim_sum(nim)}")
        nim.nimming(ply)
        logging.info(f"status: {nim}")
        player = 1 - player
    logging.info(f"status: Player {player} won!")

    return player


def play_games(
    nim: Nim,
    player: int,
    player_strategy: Callable[[Nim], Nimply],
    opponent_strategy: Callable[[Nim], Nimply],
    n_matches: int,
) -> None:
    return [
        play_game(deepcopy(nim), player_strategy, opponent_strategy)
        if player == 0
        else play_game(deepcopy(nim), opponent_strategy, player_strategy)
        for _ in range(n_matches)
    ]

## Adaptive Strategy

In [8]:
def get_phase_ratio(state: Nim) -> Nimply:
    all_plys_new_game = len(generate_all_plys(Nim(len(state.rows), state.k)))
    all_plys_current_game = len(generate_all_plys(state))
    return all_plys_current_game / all_plys_new_game

In [62]:
mutation_rate: tuple[float] = (0.1, 0.01)


@dataclass(init=False)
class Individual:
    n_strategies: int
    strategy_probs: list[list[float]]
    phase_thresholds: list[float]

    def __init__(
        self, n_strategies: int = None, strategy_probs: list[list[float]] = None, phase_thresholds: list[float] = None
    ):
        if n_strategies is None:
            n_strategies = 3
        if strategy_probs is None:
            strategy_probs = self._softmax(np.random.randn(3, n_strategies))
        else:
            strategy_probs = self._softmax(strategy_probs)
        if phase_thresholds is None:
            phase_thresholds = sorted([random(), random()])
        else:
            phase_thresholds = sorted(phase_thresholds)

        self.n_strategies = n_strategies
        self.strategy_probs = strategy_probs
        self.phase_thresholds = phase_thresholds

    def _softmax(self, values) -> list[list[float]]:
        tmp = np.exp(values)
        return (tmp / np.sum(tmp, axis=1).reshape(3, 1)).tolist()

    def mutate(ind: "Individual") -> "Individual":
        global mutation_rate
        strategy_probs = np.random.normal(loc=ind.strategy_probs, scale=mutation_rate[0]).tolist()
        phase_thresholds = np.clip(np.random.normal(loc=ind.phase_thresholds, scale=mutation_rate[1]), 0, 1).tolist()
        return Individual(
            n_strategies=ind.n_strategies, strategy_probs=strategy_probs, phase_thresholds=phase_thresholds
        )

    def __call__(self, state: Nim) -> Nimply:
        phase_ratio = get_phase_ratio(state)
        phase_index = (
            0 if phase_ratio < self.phase_thresholds[0] else (2 if phase_ratio > self.phase_thresholds[1] else 1)
        )
        probs = self.strategy_probs[phase_index]
        STRATEGIES = [pure_random, gabriele, play_expert_system]
        strategy = np.random.choice(STRATEGIES, p=probs)
        return strategy(state)

In [74]:
LAMBDA = 40
N_MATCHES = 16

In [75]:
def streak(player_strategy, n_matches, opponent_strategy):
    wins = 0
    for _ in range(n_matches):
        random_size = randint(4, 10)
        random_k = choice([None, None, *[randint(2, random_size * 2 + 1) for _ in range(2)]])
        nim = Nim(random_size, random_k)
        player = choice([0, 1])
        wins += 1 if play_games(nim, player, player_strategy, opponent_strategy, 1)[0] == player else 0
    return wins / n_matches

In [76]:
parent = Individual()
parent_result = streak(parent, N_MATCHES, optimal)
pprint(parent)

pbar = trange(0, 10_000 // LAMBDA)
for _ in pbar:
    pbar.set_description(f'Parent Accuracy: {parent_result:.2%}')
    offspring = [parent.mutate() for _ in range(LAMBDA)]
    results = [streak(ind, N_MATCHES, optimal) for ind in offspring]

    if np.sum(np.array(results) > parent_result) / LAMBDA > 1 / 5:
        mutation_rate = (mutation_rate[0] * 1.1, mutation_rate[1] * 1.1)
    else:
        mutation_rate = (mutation_rate[0] / 1.1, mutation_rate[1] / 1.1)

    solution_index = np.argmax(results)
    if parent_result < results[solution_index]:
        parent = offspring[solution_index]
        parent_result = results[solution_index]

    if parent_result >= 0.999:
        break

streak(parent, 100, optimal)

Individual(n_strategies=3,
           strategy_probs=[[0.21007028333534858,
                            0.2236933223494059,
                            0.5662363943152455],
                           [0.038730389201127625,
                            0.5978504121955865,
                            0.36341919860328575],
                           [0.0730552767802505,
                            0.7932164214088794,
                            0.13372830181087011]],
           phase_thresholds=[0.07520953305765299, 0.7505734706772872])


  0%|          | 0/250 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [77]:
pprint(parent)

Individual(n_strategies=3,
           strategy_probs=[[0.32845477343242346,
                            0.328878596314839,
                            0.3426666302527375],
                           [0.3234416252266368,
                            0.34346122649512173,
                            0.3330971482782415],
                           [0.32293213780298813,
                            0.3527240623526987,
                            0.32434379984431305]],
           phase_thresholds=[0.07525985080352998, 0.7505655173702628])


In [78]:
streak(parent, 100, optimal)

0.56

In [16]:
def assess_strategy(
    nim: Nim,
    player: int,
    player_strategy: Callable[[Nim], Nimply],
    opponent_strategies: list[Callable[[Nim], Nimply]],
    n_matches: int,
) -> None:
    """
    This function prints how many times the given player strategy
    wins against the opponent strategies.

    Args:
        nim: Nim game instance;
        player: which player to play;
        player_strategy: which strategy to play;
        opponent_strategies: which strategies to play against;
        n_matches: number of matches to assess the quality of the strategy.

    Returns:
        None.
    """
    for opponent_strategy in opponent_strategies:
        games = play_games(nim, player, player_strategy, opponent_strategy, n_matches)
        accuracy = games.count(player) / len(games)
        print(
            f"-- Player {player} ({player_strategy.__qualname__}) against {opponent_strategy.__qualname__}: {accuracy:.2%} wins"
        )

In [17]:
nim = Nim(5, 3)

In [18]:
assess_strategy(
    nim=nim,
    player=0,
    player_strategy=play_expert_system,
    opponent_strategies=[pure_random, gabriele, optimal],
    n_matches=1000,
)

-- Player 0 (play_expert_system) against pure_random: 82.10% wins
-- Player 0 (play_expert_system) against gabriele: 94.60% wins
-- Player 0 (play_expert_system) against optimal: 93.80% wins


In [19]:
assess_strategy(
    nim=nim,
    player=1,
    player_strategy=play_expert_system,
    opponent_strategies=[pure_random, gabriele, optimal],
    n_matches=1000,
)

-- Player 1 (play_expert_system) against pure_random: 82.90% wins
-- Player 1 (play_expert_system) against gabriele: 93.20% wins
-- Player 1 (play_expert_system) against optimal: 92.80% wins
