Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 2: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in group is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [1]:
import logging
import sys
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
import numpy as np
from tqdm import tqdm


## The *Nim* and *Nimply* classes

In [2]:
Nimply = namedtuple("Nimply", "row, num_objects")


In [3]:
class Nim:
    def __init__(self, num_rows: int, k: int = None) -> None: 
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self): 
        return sum(self._rows) > 0

    def __str__(self): 
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property 
    def rows(self) -> tuple: 
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects


## Sample (and silly) startegies 

In [4]:
def pure_random(state: Nim, genomes = None) -> Nimply:
    """A completely random move"""
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = random.randint(1, state.rows[row])
    return Nimply(row, num_objects)


In [5]:
def gabriele(state: Nim, genomes = None) -> Nimply:
    """Pick always the maximum possible number of the lowest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))


In [6]:

def nim_sum(state: Nim) -> int: 
    tmp = np.array([tuple(int(x) for x in f"{c:032b}") for c in state.rows])
    xor = tmp.sum(axis=0) % 2
    return int("".join(str(_) for _ in xor), base=2)


def analize(raw: Nim) -> dict: 
    cooked = dict()
    cooked["possible_moves"] = dict()
    for ply in (Nimply(r, o) for r, c in enumerate(raw.rows) for o in range(1, c + 1)):
        tmp = deepcopy(raw)
        tmp.nimming(ply)
        cooked["possible_moves"][ply] = nim_sum(tmp)
    return cooked


def optimal(state: Nim, genomes = None) -> Nimply: 
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0]
    if not spicy_moves:
        spicy_moves = list(analysis["possible_moves"].keys())
    ply = random.choice(spicy_moves)
    return ply

## Task 2.1
We always play a move in order to have a nim-sum value of 0. 

This strategy "optimalgabriele" is stronger than the "optimal" because in the last step, when there is only a row with some matches, it tries to leave only one match to the opponent (and in this way winning). Instead with the "optimal" strategy the player makes a move only to go to another safe configuration, without thinking about closing the game at the end.

The following function implements this winning strategy

In [7]:
def optimal_gabriele(state: Nim, genomes = None) -> Nimply: 
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0]
    if not spicy_moves:
        spicy_moves = list(analysis["possible_moves"].keys())
    return Nimply(*max(spicy_moves, key=lambda m: (-m[0], m[1])))

## Oversimplified match

In [8]:
logging.getLogger().setLevel(logging.INFO)

strategy = (pure_random, optimal_gabriele)

nim = Nim(5)
logging.info(f"init : {nim}")
player = 0
while nim:
    ply = strategy[player](nim)
    logging.info(f"ply: player {player} plays {ply}")
    nim.nimming(ply)
    logging.info(f"status: {nim}")
    player = 1 - player
logging.info(f"status: Player {player} won!")

INFO:root:init : <1 3 5 7 9>
INFO:root:ply: player 0 plays Nimply(row=1, num_objects=2)
INFO:root:status: <1 1 5 7 9>
INFO:root:ply: player 1 plays Nimply(row=0, num_objects=1)
INFO:root:status: <0 1 5 7 9>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=1)
INFO:root:status: <0 1 4 7 9>
INFO:root:ply: player 1 plays Nimply(row=1, num_objects=1)
INFO:root:status: <0 0 4 7 9>
INFO:root:ply: player 0 plays Nimply(row=4, num_objects=9)
INFO:root:status: <0 0 4 7 0>
INFO:root:ply: player 1 plays Nimply(row=2, num_objects=4)
INFO:root:status: <0 0 0 7 0>
INFO:root:ply: player 0 plays Nimply(row=3, num_objects=4)
INFO:root:status: <0 0 0 3 0>
INFO:root:ply: player 1 plays Nimply(row=3, num_objects=2)
INFO:root:status: <0 0 0 1 0>
INFO:root:ply: player 0 plays Nimply(row=3, num_objects=1)
INFO:root:status: <0 0 0 0 0>
INFO:root:status: Player 1 won!


## Several matches to collect results

In [9]:
counter=0
NUM_GAMES=2000
player0_win=0
while counter<NUM_GAMES:
    strategy = (optimal, optimal_gabriele)
    nim = Nim(5)
    player = 0
    while nim:
        ply = strategy[player](nim)
        nim.nimming(ply)
        player = 1 - player
    if player==0:
        player0_win+=1
    counter+=1

    
print(f"player_0 won:{player0_win}  , player_1 won: {NUM_GAMES-player0_win} in {counter} iterations" )
print(f"player_0 win_rate: {round((player0_win/counter)*100,2)}%  , player_1 win_rate: {round(((NUM_GAMES-player0_win)/counter)*100,2)}% " )


player_0 won:599  , player_1 won: 1401 in 2000 iterations
player_0 win_rate: 29.95%  , player_1 win_rate: 70.05% 


## Task 2.2
Implementation of ES strategies

In [10]:
def find_genomes():
    pre = np.random.normal(loc=0, scale=1, size=2) 
    val = 0.5 * (1 + np.tanh(pre / np.sqrt(2)))
    print(val)
    return val

def choose_element(vector, alpha):
    alpha = max(0, min(alpha, 1)) #Ensure that alpha is between 0 and 1
    index = int(alpha * (len(vector)))# Calculate the index based on the value of alpha
    if index >= len(vector)-1:
        index = len(vector) - 1
    if index < 0:
        index = 0
    chosen_element = vector[index]# Return the corresponding element
    return chosen_element


In [11]:
nim = Nim(5)
genomes = find_genomes()

def adaptive(state: Nim, genomes) -> Nimply:
    """Is it a random move?"""
    indices = [i for i, elemento in enumerate(state.rows) if elemento != 0] 
    row = choose_element(indices, genomes[0]) 
    closest_genome = genomes[0] 
    while state.rows[row] == 0:
        closest_genome += random.uniform(-0.25, 0.25)
        row = choose_element(indices, closest_genome)
    if (state.rows[row] == 1):
        num_objects = 1
    else:
        num_objects = choose_element(range(1, state.rows[row]), genomes[1])
    return Nimply(row, num_objects) 

[0.21480928 0.76110614]


In [12]:
def play_game(strat, genomes):

    strategy = (strat, pure_random)

    nim = Nim(3)
    player = 0
    while nim:
        ply = strategy[player](nim, genomes)
        nim.nimming(ply)
        player = 1 - player
    return player

def fitness(strategy, genomes, num_games=100):
    wins = 0
    for _ in range(num_games):
        if play_game(strategy, genomes):
            wins += 1
    return (num_games - wins) / num_games

In [13]:
def mutation(genome, mu=0, sigma=np.array([0.1, 0.1])):
    gaussian = np.array([np.random.normal(mu, sigma[0], 1), np.random.normal(mu, sigma[1], 1)])
    child = np.array([0,0])

    child[0] = genome[0] + gaussian[0][0]
    child[1] = genome[1] + gaussian[1][0]

    if child[0] >= 1:
        child[0] = 0.999
    elif child[0] < 0:
        child[0] = 0

    if child[1] >= 1:
        child[1] = 0.999
    elif child[1] < 0:
        child[1] = 0
    
    return child

## (1 + $\lambda$)-ES

In [14]:
def develop(generations=100, population=100, mu=0, sigma=0.1):
    genomes = find_genomes()
    parent_gen = (genomes, fitness(adaptive, genomes))

    for _ in tqdm(range(generations), file=sys.stdout):
        best_gen = ({"row": 0, "elements": 0}, 0)
        for i in range(population):
            
            child_gen = mutation(parent_gen[0], mu, np.array([sigma, sigma]))
            evaluation = fitness(adaptive, child_gen)
            if evaluation > best_gen[1] or i == 0:
                best_gen = (child_gen, evaluation)

        if best_gen[1] > parent_gen[1]:
            parent_gen = best_gen
            print(parent_gen)

In [15]:
develop(generations=100, population=100, mu=0.1, sigma=0.2) 

[0.83005921 0.04646298]
  0%|          | 0/100 [00:00<?, ?it/s]

(array([0, 0]), 0.73)
  3%|▎         | 3/100 [00:02<01:03,  1.52it/s](array([0, 0]), 0.74)
  7%|▋         | 7/100 [00:04<00:57,  1.61it/s](array([0, 0]), 0.75)
 14%|█▍        | 14/100 [00:08<00:46,  1.84it/s](array([0, 0]), 0.77)
 27%|██▋       | 27/100 [00:16<00:45,  1.62it/s](array([0, 0]), 0.78)
 42%|████▏     | 42/100 [00:26<00:36,  1.60it/s](array([0, 0]), 0.79)
100%|██████████| 100/100 [00:46<00:00,  2.16it/s]


## (1,$\lambda$)-ES

In [16]:
def develop(generations=100, population=100, mu=0, sigma=0.1):
    genomes = find_genomes()
    parent_gen = (genomes, fitness(adaptive, genomes))

    for _ in tqdm(range(generations), file=sys.stdout):
        best_gen = ({"row": 0, "elements": 0}, 0)
        for i in range(population):
            
            child_gen = mutation(parent_gen[0], mu, np.array([sigma, sigma]))
            evaluation = fitness(adaptive, child_gen)
            if evaluation > best_gen[1] or i == 0:
                best_gen = (child_gen, evaluation)

        parent_gen = best_gen
        print(parent_gen)

In [17]:
develop(generations=100, population=100, mu=0.1, sigma=0.2)

[0.2234094  0.54099761]
  0%|          | 0/100 [00:00<?, ?it/s](array([0, 0]), 0.72)
  1%|          | 1/100 [00:00<00:34,  2.83it/s](array([0, 0]), 0.74)
  2%|▏         | 2/100 [00:00<00:27,  3.50it/s](array([0, 0]), 0.74)
  3%|▎         | 3/100 [00:00<00:24,  3.92it/s](array([0, 0]), 0.72)
  4%|▍         | 4/100 [00:01<00:23,  4.14it/s](array([0, 0]), 0.7)
  5%|▌         | 5/100 [00:01<00:23,  4.09it/s](array([0, 0]), 0.73)
  6%|▌         | 6/100 [00:01<00:24,  3.85it/s](array([0, 0]), 0.7)
  7%|▋         | 7/100 [00:01<00:25,  3.58it/s](array([0, 0]), 0.74)
  8%|▊         | 8/100 [00:02<00:26,  3.43it/s](array([0, 0]), 0.74)
  9%|▉         | 9/100 [00:02<00:27,  3.31it/s](array([0, 0]), 0.72)
 10%|█         | 10/100 [00:02<00:27,  3.31it/s](array([0, 0]), 0.73)
 11%|█         | 11/100 [00:03<00:25,  3.46it/s](array([0, 0]), 0.72)
 12%|█▏        | 12/100 [00:03<00:24,  3.65it/s](array([0, 0]), 0.72)
 13%|█▎        | 13/100 [00:03<00:22,  3.82it/s](array([0, 0]), 0.74)
 14%|█▍        |