Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in group is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [54]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
import numpy as np

## The *Nim* and *Nimply* classes

In [55]:
Nimply = namedtuple("Nimply", "row, num_objects")


In [56]:
class Nim:
    def __init__(self, num_rows: int) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]

    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        self._rows[row] -= num_objects


## Sample (and silly) startegies 

In [57]:
def pure_random(state: Nim) -> Nimply:
    """A completely random move"""
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = random.randint(1, state.rows[row])
    return Nimply(row, num_objects)


In [58]:
def take_from_last(state: Nim) -> Nimply:
    """Takes a random number from the last row"""
    row = max([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = random.randint(1, state.rows[row])
    return Nimply(row, num_objects)

In [59]:
def take_from_first(state: Nim) -> Nimply:
    """Takes a random number from the first row"""
    row = min([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = random.randint(1, state.rows[row])
    return Nimply(row, num_objects)

In [60]:
def gabriele(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (m[0], m[1])))


In [61]:
def eleirbag(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the highest row, the opposite of gabriele"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))

In [62]:
import numpy as np


def nim_sum(state: Nim) -> int:
    tmp = np.array([tuple(int(x) for x in f"{c:032b}") for c in state.rows])
    xor = tmp.sum(axis=0) % 2
    return int("".join(str(_) for _ in xor), base=2)


def analize(raw: Nim) -> dict:
    cooked = dict()
    cooked["possible_moves"] = dict()
    for ply in (Nimply(r, o) for r, c in enumerate(raw.rows) for o in range(1, c + 1)):
        tmp = deepcopy(raw)
        tmp.nimming(ply)
        cooked["possible_moves"][ply] = nim_sum(tmp)
    return cooked


def optimal(state: Nim) -> Nimply:
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0]
    if not spicy_moves:
        spicy_moves = list(analysis["possible_moves"].keys())
    ply = random.choice(spicy_moves)
    return ply


## Need to define a real optimal strategy
 This should be it, from [wikipedia](https://en.wikipedia.org/wiki/Nim)

 In other words, follow the common startegy (always try to leave the board in a state with nim-sum = 0), but when the move would leave only 1-sized heaps, in that case try to leave a odd number of such heaps

In [63]:
def only_size1(state: Nim) -> bool:
    return (sum(np.array(state.rows) == 1) + sum(np.array(state.rows) == 0)) == len(state.rows)

def n_size1(state: Nim) -> int:
    return sum(np.array(state.rows) == 1)

def real_optimal(state: Nim) -> Nimply:
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns == 0]
    if not spicy_moves or nim_sum(state) == 0:
        ply = random.choice(list(analysis["possible_moves"].keys()))
    else: #if only heaps of size 1, leave an odd number of them    
        ply = random.choice(spicy_moves)
        tmp = deepcopy(state)
        tmp.nimming(ply)
        if only_size1(tmp) and n_size1(tmp) % 2 == 0: #modify the play
            if ply[1] < state.rows[ply[0]]:
                return Nimply(ply[0], ply[1] + 1)
            if ply[1] > 1:
                return Nimply(ply[0], ply[1] - 1)
    return ply

## Oversimplified match

In [64]:
logging.getLogger().setLevel(logging.INFO)

strategy = (take_from_first, real_optimal)

nim = Nim(5)
logging.info(f"init : {nim}")
player = 0
while nim:
    ply = strategy[player](nim)
    logging.info(f"ply: player {player} plays {ply}")
    nim.nimming(ply)
    logging.info(f"status: {nim}")
    player = 1 - player
logging.info(f"status: Player {player} won!")

INFO:root:init : <1 3 5 7 9>
INFO:root:ply: player 0 plays Nimply(row=0, num_objects=1)
INFO:root:status: <0 3 5 7 9>
INFO:root:ply: player 1 plays Nimply(row=4, num_objects=8)
INFO:root:status: <0 3 5 7 1>
INFO:root:ply: player 0 plays Nimply(row=1, num_objects=2)
INFO:root:status: <0 1 5 7 1>
INFO:root:ply: player 1 plays Nimply(row=3, num_objects=2)
INFO:root:status: <0 1 5 5 1>
INFO:root:ply: player 0 plays Nimply(row=1, num_objects=1)
INFO:root:status: <0 0 5 5 1>
INFO:root:ply: player 1 plays Nimply(row=3, num_objects=1)
INFO:root:status: <0 0 5 4 1>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=2)
INFO:root:status: <0 0 3 4 1>
INFO:root:ply: player 1 plays Nimply(row=3, num_objects=2)
INFO:root:status: <0 0 3 2 1>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=2)
INFO:root:status: <0 0 1 2 1>
INFO:root:ply: player 1 plays Nimply(row=3, num_objects=1)
INFO:root:status: <0 0 1 1 1>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=1)
INFO:root:status: <0 0

In [65]:
class adaptive_class:
    def __init__(self, stdev = 0.2, stdev_step = 0.1) -> None:
        self.strategy = real_optimal
        self.stdev = stdev
        self.stdev_step = stdev_step
        self.genome = {real_optimal: 0, pure_random: 0, take_from_last: 0, take_from_first: 0, eleirbag: 0}
        for k in self.genome.keys():
            self.genome[k] = 1

    def tweak(self) -> None:
        """randomly tweaks the genome of the object"""
        for k in self.genome.keys():
            self.genome[k] += np.random.normal(0, self.stdev) 
            self.genome[k] = 0 if self.genome[k] < 0 else self.genome[k] 

    def apply_rule(self, ratio) -> None:
        """modifies exploration/exploitation based on 1/5 rule"""
        #ratio > 1/5 -> increase exploration -> increase stdev
        self.stdev += self.stdev * self.stdev_step * (ratio - 0.2)

    def reset_strategy(self) -> None:
        """resets the strategy to be used, call this before every match"""
        a_random = random.random() * sum(self.genome.values())
        k = real_optimal
        for k in self.genome.keys():
            a_random -= self.genome[k]
            if a_random <= 0:
                self.strategy = k
                break 

    def adaptive(self, state: Nim) -> Nimply:
        """play a move, this strategy that can adapt its parameters"""
        return self.strategy(state)  

adaptive_obj = adaptive_class()
adaptive_obj.adaptive(Nim(5))
print(adaptive_obj.genome)
adaptive_obj.tweak()
print(adaptive_obj.genome)

{<function real_optimal at 0x7f2930135900>: 1, <function pure_random at 0x7f2930136440>: 1, <function take_from_last at 0x7f29301364d0>: 1, <function take_from_first at 0x7f293019cdc0>: 1, <function eleirbag at 0x7f29301375b0>: 1}
{<function real_optimal at 0x7f2930135900>: 0.921803718205494, <function pure_random at 0x7f2930136440>: 0.947778613975927, <function take_from_last at 0x7f29301364d0>: 1.1727273360976462, <function take_from_first at 0x7f293019cdc0>: 1.2463864394899824, <function eleirbag at 0x7f29301375b0>: 1.0594586833335222}


In [66]:
def play_game(strategy: tuple) -> int:
    """plays a game of nim(5) and returns the winning player index"""
    nim = Nim(5)
    player = 0
    while nim:
        ply = strategy[player](nim)
        nim.nimming(ply)
        player = 1 - player
    return player
    

In [67]:
def getratio(adaptive_obj: adaptive_class, m_half) -> float: 
    """return winning ratio of an adaptive strategy playing 100 games vs gabriele"""
    wins = [0, 0]
    strategy = (adaptive_obj.adaptive, gabriele)
    for _ in range(m_half):
        adaptive_obj.reset_strategy()
        wins[play_game(strategy)] += 1
    strategy = (gabriele, adaptive_obj.adaptive) 
    for _ in range(m_half):
        adaptive_obj.reset_strategy()
        wins[1 - play_game(strategy)] += 1   
    return wins[0] / (wins[1] + 1)

## Adapting the strategy
 how does it work:
  - we create $N$ copies of the original adaptive object
  - for each copy, we play $M$ games against gabriele ($M/2$ going first, $M/2$ going second)
  - we measure the winning rate = wins / total games
  - we measure how many are better to apply the 1/5 rule
  - we take the best among the $N/2$ and tweak the stdev based on 1/5 rule
  - repeat many times and see the result

In [68]:
adaptive_obj = adaptive_class()
ratio_prev = getratio(adaptive_obj, 50)
print(ratio_prev)

1.525


You can run the cell below multiple times to check where is the genome going

In [69]:
N = 10
M_HALF = 50
USE_PLUS = True             #put this to False if a comma startegy is needed (maybe increase N if using comma strat)
ITERATIONS = 100

for _ in range(ITERATIONS):
    offspring = []
    ratios = []
    for i in range(N):
        offspring.append(deepcopy(adaptive_obj))
        offspring[i].tweak()
        ratios.append(getratio(offspring[i], M_HALF))

    ratios = np.array(ratios)
    best_index = ratios.argmax()
    if USE_PLUS:
        if ratios[best_index] > ratio_prev:
            adaptive_obj = offspring[best_index]  
            ratio_prev = ratios[best_index]
    else:    
        adaptive_obj = offspring[best_index]  
    adaptive_obj.apply_rule((np.array(ratios) > ratio_prev).sum() / N)
    ratio_prev = ratios.max()

tmp = list(adaptive_obj.genome.values())
print('probabilities: ', np.array(tmp) / sum(tmp))
print('standard deviation: ', adaptive_obj.stdev)
print(adaptive_obj.genome)

probabilities:  [0.53210103 0.01782441 0.00839372 0.22886896 0.21281188]
standard deviation:  0.026523911178950636
{<function real_optimal at 0x7f2930135900>: 2.6350346280426233, <function pure_random at 0x7f2930136440>: 0.08826885626976655, <function take_from_last at 0x7f29301364d0>: 0.04156679693405979, <function take_from_first at 0x7f293019cdc0>: 1.1333893406020343, <function eleirbag at 0x7f29301375b0>: 1.0538725409013103}


## Self-adaptive class
 the simulation is ran again, this time sigma is a learnable parameter

In [76]:
class self_adaptive_class:
    def __init__(self, lr, stdev = 1) -> None:
        """this strategy that can adapt its parameters"""
        self.strategy = real_optimal
        self.stdev = stdev
        self.lr = lr
        self.genome = {real_optimal: 0, pure_random: 0, take_from_last: 0, take_from_first: 0, eleirbag: 0}
        for k in self.genome.keys():
            self.genome[k] = 1

    def tweak(self) -> None:
        """randomly tweaks the genome and stdev of the object"""
        self.stdev *= np.exp((self.lr * np.random.normal(0, 1))) 
        for k in self.genome.keys():
            self.genome[k] += np.random.normal(0, self.stdev ** 2) 
            self.genome[k] = 0 if self.genome[k] < 0 else self.genome[k] 

    def reset_strategy(self) -> None:
        """resets the strategy to be used, call this before every match"""
        a_random = random.random() * sum(self.genome.values())
        k = real_optimal
        for k in self.genome.keys():
            a_random -= self.genome[k]
            if a_random <= 0:
                self.strategy = k
                break 

    def adaptive(self, state: Nim) -> Nimply:
        """play a move using the chosen strategy"""
        return self.strategy(state)  

adaptive_obj = self_adaptive_class(5)
adaptive_obj.adaptive(Nim(5))
print(adaptive_obj.genome, adaptive_obj.stdev)
adaptive_obj.tweak()
print(adaptive_obj.genome, adaptive_obj.stdev)

{<function real_optimal at 0x7f2930135900>: 1, <function pure_random at 0x7f2930136440>: 1, <function take_from_last at 0x7f29301364d0>: 1, <function take_from_first at 0x7f293019cdc0>: 1, <function eleirbag at 0x7f29301375b0>: 1} 1
{<function real_optimal at 0x7f2930135900>: 0, <function pure_random at 0x7f2930136440>: 0, <function take_from_last at 0x7f29301364d0>: 49353.208204618284, <function take_from_first at 0x7f293019cdc0>: 0, <function eleirbag at 0x7f29301375b0>: 45317.21671139393} 526.9556597152076


In [77]:
adaptive_obj = self_adaptive_class(lr = 1 / np.sqrt(5))
ratio_prev = getratio(adaptive_obj, 50)
print(ratio_prev)

2.0606060606060606


In [78]:
for _ in range(ITERATIONS):
    offspring = []
    ratios = []
    for i in range(N):
        offspring.append(deepcopy(adaptive_obj))
        offspring[i].tweak()
        ratios.append(getratio(offspring[i], M_HALF))

    ratios = np.array(ratios)
    best_index = ratios.argmax()
    if USE_PLUS:
        if ratios[best_index] > ratio_prev:
            adaptive_obj = offspring[best_index]  
            ratio_prev = ratios[best_index]
    else:    
        adaptive_obj = offspring[best_index]  
    ratio_prev = ratios.max()

tmp = list(adaptive_obj.genome.values())
print('probabilities: ', np.array(tmp) / sum(tmp))
print('standard deviation: ', adaptive_obj.stdev)
print(adaptive_obj.genome)

probabilities:  [0.98586821 0.00195945 0.         0.00471851 0.00745383]
standard deviation:  0.21987279648407201
{<function real_optimal at 0x7f2930135900>: 29.299210723286535, <function pure_random at 0x7f2930136440>: 0.0582331859255496, <function take_from_last at 0x7f29301364d0>: 0, <function take_from_first at 0x7f293019cdc0>: 0.14023033801818463, <function eleirbag at 0x7f29301375b0>: 0.22152195624907278}
