Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using rules evolved with an evolution strategy (ES)

## Instructions

* Create the directory `lab2` inside your personal repository for the course
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in groups is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [18]:
import logging
import sys
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
import math
import numpy as np
from tqdm import tqdm

## The *Nim* and *Nimply* classes

In [3]:
# A single move: take `num_objects` objects from the row with index `row`.
Nimply = namedtuple("Nimply", ["row", "num_objects"])

In [4]:
class Nim:
    """Board of a Nim game: a list of rows, each holding a number of objects.

    Row ``i`` starts with ``2 * i + 1`` objects (1, 3, 5, ...).  When ``k`` is
    given, it is the largest number of objects a single move may remove.
    An instance is truthy as long as at least one object is left.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        """Create a board with ``num_rows`` rows and an optional per-move cap ``k``."""
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        """Return True while the game is still running (some object remains)."""
        return sum(self._rows) > 0

    def __str__(self):
        """Render the row sizes as ``<1 3 5 ...>``."""
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        """Read-only snapshot of the current row sizes."""
        return tuple(self._rows)

    def nimming(self, ply: "Nimply") -> None:
        """Apply the move ``ply`` (a ``(row, num_objects)`` pair) to the board.

        Raises:
            AssertionError: if the row index is out of range, fewer than one
                object is requested, the row holds fewer objects than
                requested, or the request exceeds the cap ``k``.
        """
        row, num_objects = ply
        # The checks stay assertions so the exception type seen by existing
        # callers does not change.
        # A negative index would silently address a row from the end.
        assert 0 <= row < len(self._rows), f"row {row} is out of range"
        # Zero is not a move, and a negative count would add objects.
        assert num_objects >= 1, "a move must remove at least one object"
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects


## Sample (and silly) strategies

In [5]:
def pure_random(state: Nim, genome=None) -> Nimply:
    """Play a random legal-looking move; ``genome`` is accepted but unused.

    A non-empty row is chosen uniformly, then a count between 1 and the size
    of that row is chosen uniformly.
    """
    sizes = state.rows
    non_empty = [index for index, size in enumerate(sizes) if size > 0]
    chosen = random.choice(non_empty)
    amount = random.randint(1, sizes[chosen])
    return Nimply(chosen, amount)


In [6]:
def gabriele(state: Nim, genome=None) -> Nimply:
    """Empty the first non-empty row in one move; ``genome`` is accepted but unused."""
    # Only rows that still hold objects offer a move, and the largest move on
    # a row is to take everything in it.
    full_takes = [(index, size) for index, size in enumerate(state.rows) if size > 0]
    # Negating the index makes max() select the smallest row index.
    row, amount = max(full_takes, key=lambda take: -take[0])
    return Nimply(row, amount)


In [7]:
def adaptive(state: Nim, genome) -> Nimply:
    """A strategy whose behaviour is controlled by two evolvable parameters.

    Args:
        state: the current board.
        genome: mapping with two fractions:
            ``"row"`` selects a row among the non-empty ones (0 is the first
            non-empty row, values close to 1 the last one);
            ``"elements"`` is the fraction of that row to remove.

    Returns:
        The selected move; at least one object is always taken.

    Raises:
        ValueError: if no row has objects left.
    """
    rows = state.rows
    playable = [index for index, count in enumerate(rows) if count > 0]
    if not playable:
        raise ValueError("no objects left to take")

    # Clamp the position so that a fraction of exactly 1 (or one slightly
    # outside [0, 1)) still maps onto an existing non-empty row.
    position = math.floor(len(playable) * genome["row"])
    position = min(max(position, 0), len(playable) - 1)
    row = playable[position]

    # Take the requested share of the row, but never less than one object and
    # never more than the row holds.
    amount = math.floor(rows[row] * genome["elements"])
    amount = min(max(amount, 1), rows[row])
    return Nimply(row, amount)

In [8]:
def nim_sum(state: "Nim") -> int:
    """Return the nim-sum of ``state``: the bitwise XOR of all its row sizes.

    An empty board yields 0, and row sizes of any magnitude are supported.
    """
    total = 0
    for count in state.rows:
        # int() also accepts numpy integer scalars.
        total ^= int(count)
    return total


def analize(raw: Nim) -> dict:
    """Evaluate every move available on ``raw``.

    Returns a dict with a single key, ``"possible_moves"``, mapping each
    ``Nimply`` to the nim-sum of the board obtained by playing it.  ``raw``
    itself is left untouched: each move is played on a copy.
    """
    outcomes = {}
    for row_index, size in enumerate(raw.rows):
        for amount in range(1, size + 1):
            move = Nimply(row_index, amount)
            board = deepcopy(raw)
            board.nimming(move)
            outcomes[move] = nim_sum(board)
    return {"possible_moves": outcomes}


def optimal(state: Nim, genome=None) -> Nimply:
    """Play the nim-sum strategy for misère Nim (taking the last object loses).

    A move is considered winning when:

    * it leaves at least one row with two or more objects and a nim-sum of
      zero, or
    * it leaves only rows with at most one object and an odd number of them,
      so the opponent is forced to take the last object.

    If no winning move exists, a random move is played.  ``genome`` is
    accepted but unused.  Moves are enumerated by ``analize``.
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    rows = state.rows
    winning_moves = []
    for ply, ns in analysis["possible_moves"].items():
        remaining = list(rows)
        remaining[ply.row] -= ply.num_objects
        if max(remaining) <= 1:
            # Endgame: every row holds 0 or 1 objects, so the sum is the
            # number of single-object rows; leave an odd number of them.
            is_winning = sum(remaining) % 2 == 1
        else:
            is_winning = ns == 0
        if is_winning:
            winning_moves.append(ply)
    if not winning_moves:
        # Losing position: every move is equally bad for this rule set.
        winning_moves = list(analysis["possible_moves"])
    return random.choice(winning_moves)

## Oversimplified match

In [8]:
# Disabled demo match: the code is wrapped in a string literal, so running the
# cell only echoes the text.
# NOTE(review): as written, `strategy[player](nim)` passes no genome, while
# adaptive() has no default for `genome`; pass one before re-enabling this.
'''logging.getLogger().setLevel(logging.INFO)

strategy = (optimal, adaptive)

nim = Nim(5)
logging.info(f"init : {nim}")
player = 0
while nim:
    ply = strategy[player](nim)
    logging.info(f"ply: player {player} plays {ply}")
    nim.nimming(ply)
    logging.info(f"status: {nim}")
    player = 1 - player
logging.info(f"status: Player {player} won!")'''


'logging.getLogger().setLevel(logging.INFO)\n\nstrategy = (optimal, adaptive)\n\nnim = Nim(5)\nlogging.info(f"init : {nim}")\nplayer = 0\nwhile nim:\n    ply = strategy[player](nim)\n    logging.info(f"ply: player {player} plays {ply}")\n    nim.nimming(ply)\n    logging.info(f"status: {nim}")\n    player = 1 - player\nlogging.info(f"status: Player {player} won!")'

## Fitness and tweak functions

In [9]:
def fitness(current_genome, strategy, games=1000, num_rows=5):
    """Estimate how often the second strategy wins against the first one.

    Args:
        current_genome: genome handed to both strategies on every move.
        strategy: pair of callables ``(player_0, player_1)``, each called as
            ``f(state, genome)`` and returning a move; player 0 moves first.
        games: number of games to play; must be at least 1.
        num_rows: number of rows of the board used for every game.

    Returns:
        Fraction of games, in [0, 1], won by ``strategy[1]``.

    Raises:
        ValueError: if ``games`` is smaller than 1.
    """
    if games < 1:
        raise ValueError("games must be at least 1")
    wins = 0
    for _ in range(games):
        nim = Nim(num_rows)
        player = 0
        while nim:
            ply = strategy[player](nim, current_genome)
            nim.nimming(ply)
            player = 1 - player
        # The turn has already passed on, so `player` is the one who did NOT
        # take the last object, i.e. the winner.
        if player == 1:
            wins += 1
    return float(wins) / games


def tweak(genome, mu=0, sigma=np.array([0.1, 0.1])):
    """Return a mutated copy of ``genome``; the input mapping is not modified.

    Gaussian noise with mean ``mu`` is added to the ``"row"`` gene (standard
    deviation ``sigma[0]``) and to the ``"elements"`` gene (standard deviation
    ``sigma[1]``).  Results of 1 or more are replaced by 0.999 and negative
    results by 0.
    """
    def _bounded(value):
        # Keep a gene usable as a fraction: >= 1 snaps to 0.999, < 0 to 0.
        if value >= 1:
            return 0.999
        if value < 0:
            return 0
        return value

    # Draw order matters for reproducibility: "row" first, then "elements".
    row_noise = np.random.normal(mu, sigma[0], 1)[0]
    elements_noise = np.random.normal(mu, sigma[1], 1)[0]

    mutated = dict(genome)
    mutated["row"] = _bounded(mutated["row"] + row_noise)
    mutated["elements"] = _bounded(mutated["elements"] + elements_noise)
    return mutated

## Training utility function

In [39]:
def train(strategy, generations=100, population_size=100, mu=0, sigma=0.1, sigma2=0.01, lr=1/math.sqrt(2), self_adaptive=0, plus=True):
    """Evolve a genome with a single-parent evolution strategy.

    Starting from a random genome, every generation creates
    ``population_size`` children by mutating the parent with ``tweak`` and
    scores each one with ``fitness``.  The best child then competes with the
    parent according to ``plus``.  Every accepted parent is printed.

    Args:
        strategy: pair of strategy callables forwarded to ``fitness``.
        generations: number of generations to run.
        population_size: number of children generated per generation.
        mu: mean of the Gaussian noise added to the genes.
        sigma: mutation step size used for both genes when ``self_adaptive``
            is 0.  In the self-adaptive modes it is ignored and the initial
            step size is drawn at random instead.
        sigma2: standard deviation of the Gaussian sample that perturbs the
            step size in the self-adaptive modes.
        lr: learning rate multiplying that sample inside the exponential.
        self_adaptive: 0 keeps the step size fixed; 1 evolves one step size
            shared by both genes; 2 evolves one step size per gene.
        plus: if true, the best child replaces the parent only when it is
            strictly fitter (plus selection); otherwise the best child always
            replaces the parent (comma selection).

    Returns:
        The final parent as a ``(genome, fitness)`` tuple.

    Raises:
        ValueError: if ``self_adaptive`` is not 0, 1 or 2.
    """
    if self_adaptive not in (0, 1, 2):
        raise ValueError(f"self_adaptive must be 0, 1 or 2, got {self_adaptive!r}")

    row = np.random.rand()
    elements = np.random.rand()
    if self_adaptive:
        sigma = np.random.rand()
    initial = {"row": row, "elements": elements}
    parent_genome = (initial, fitness(initial, strategy))

    for _ in tqdm(range(generations), file=sys.stdout):
        best_genome = None
        best_sigma = None
        for _child in range(population_size):
            # NOTE(review): the perturbation is sampled around the current
            # sigma rather than around zero, so exp(lr * sample) exceeds 1
            # whenever the sample is positive -- confirm this is intended.
            if self_adaptive == 2:
                child_sigma = sigma * np.exp(lr * np.random.normal(sigma, sigma2, 2))
            elif self_adaptive == 1:
                # sigma is a scalar before the first update and a 2-element
                # array with equal entries afterwards; ravel covers both.
                shared = np.random.normal(np.ravel(sigma)[0], sigma2, 1)[0]
                child_sigma = sigma * np.exp(lr * np.array([shared, shared]))
            else:
                child_sigma = np.array([sigma, sigma])

            child_genome = tweak(parent_genome[0], mu, child_sigma)
            fit = fitness(child_genome, strategy)
            if best_genome is None or fit > best_genome[1]:
                best_genome = (child_genome, fit)
                best_sigma = child_sigma

        if best_genome is None:
            # No children were generated, so there is nothing to select.
            continue

        if not plus or best_genome[1] > parent_genome[1]:
            parent_genome = best_genome
            if self_adaptive:
                # The step size that produced the winner is inherited too.
                sigma = best_sigma
            print("  ", parent_genome)

    return parent_genome

## Some trials

In [32]:
# Player 0 (moves first) plays random plies; player 1 is the adaptive agent,
# whose win rate is what fitness() reports.
strategy = (pure_random, adaptive)

In [23]:
# basic: fixed mutation step (sigma=0.1 for both genes), plus selection

train(strategy, generations=100, population_size=100, mu=0, sigma=0.1, plus=True)

  0%|          | 0/100 [00:00<?, ?it/s]   ({'row': 0.40753953997917386, 'elements': 0.9052833915795618}, 0.805)
  6%|▌         | 6/100 [00:35<09:43,  6.21s/it]   ({'row': 0.4274663940012736, 'elements': 0.9894326288110677}, 0.806)
 19%|█▉        | 19/100 [01:50<07:19,  5.43s/it]   ({'row': 0.40775311656425467, 'elements': 0.9309163060066133}, 0.809)
 24%|██▍       | 24/100 [02:18<06:59,  5.52s/it]   ({'row': 0.3582286355372122, 'elements': 0.999}, 0.819)
100%|██████████| 100/100 [09:25<00:00,  5.65s/it]


In [40]:
# self-adaptive (shared sigma): one step size for both genes, evolved together
# with the genome. train() draws the starting step size at random in this
# mode, so the sigma=0.1 argument has no effect.

train(strategy, generations=100, population_size=100, mu=0, sigma=0.1, sigma2=0.01, plus=True, self_adaptive=1)

  0%|          | 0/100 [00:00<?, ?it/s]   ({'row': 0.4872150846337658, 'elements': 0.9021528546355535}, 0.768)
  1%|          | 1/100 [00:06<11:07,  6.74s/it]   ({'row': 0.4799438933249305, 'elements': 0.999}, 0.784)
  2%|▏         | 2/100 [00:12<09:36,  5.88s/it]   ({'row': 0.4853228306529377, 'elements': 0.999}, 0.795)
  3%|▎         | 3/100 [00:17<08:58,  5.56s/it]   ({'row': 0.396919694621348, 'elements': 0.999}, 0.808)
 78%|███████▊  | 78/100 [07:22<01:59,  5.44s/it]   ({'row': 0.37691782820114317, 'elements': 0.9485641164840724}, 0.815)
100%|██████████| 100/100 [09:21<00:00,  5.62s/it]


In [31]:
# self-adaptive (different sigmas): one step size per gene, evolved together
# with the genome. train() draws the starting step size at random in this
# mode, so the sigma=0.1 argument has no effect.

train(strategy, generations=100, population_size=100, mu=0, sigma=0.1, sigma2=0.01, plus=True, self_adaptive=2)

  0%|          | 0/100 [00:00<?, ?it/s]   ({'row': 0.2534725359715397, 'elements': 0.8989403929903849}, 0.778)
  1%|          | 1/100 [00:06<10:01,  6.08s/it]   ({'row': 0.34414663874436535, 'elements': 0.999}, 0.798)
  3%|▎         | 3/100 [00:16<08:53,  5.50s/it]   ({'row': 0.32083773034447194, 'elements': 0.999}, 0.8)
  5%|▌         | 5/100 [00:27<08:32,  5.39s/it]   ({'row': 0.3757317733915228, 'elements': 0.999}, 0.801)
  9%|▉         | 9/100 [00:48<07:55,  5.22s/it]   ({'row': 0.34848653770890253, 'elements': 0.9167249241163516}, 0.802)
 12%|█▏        | 12/100 [01:03<07:41,  5.24s/it]   ({'row': 0.39942431964823244, 'elements': 0.999}, 0.81)
 60%|██████    | 60/100 [05:28<03:27,  5.18s/it]   ({'row': 0.37296434443186366, 'elements': 0.999}, 0.815)
 80%|████████  | 80/100 [07:32<02:04,  6.23s/it]   ({'row': 0.38546181299056315, 'elements': 0.999}, 0.819)
100%|██████████| 100/100 [09:26<00:00,  5.66s/it]
