Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 2: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside your personal course repository for the course 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in group is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [129]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
import numpy as np
from tqdm.notebook import tqdm

n_rules=3


## The *Nim* and *Nimply* classes

In [130]:
Nimply = namedtuple("Nimply", "row, num_objects")

In [131]:
class Nim:
    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects


## Sample (and silly) startegies 

In [132]:
def pure_random(state: Nim) -> Nimply:
    """A completely random move"""
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = random.randint(1, state.rows[row])
    return Nimply(row, num_objects)


In [133]:
def gabriele(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))


In [134]:
def adaptive(state: Nim) -> Nimply:
    """A strategy that can adapt its parameters"""
    genome = {"love_small": 0.5}


In [135]:
import numpy as np


def nim_sum(state: Nim) -> int:
    tmp = np.array([tuple(int(x) for x in f"{c:032b}") for c in state.rows])
    xor = tmp.sum(axis=0) % 2
    return int("".join(str(_) for _ in xor), base=2)


def analize(raw: Nim) -> dict:
    cooked = dict()
    cooked["possible_moves"] = dict()
    for ply in (Nimply(r, o) for r, c in enumerate(raw.rows) for o in range(1, c + 1)):
        tmp = deepcopy(raw)
        tmp.nimming(ply)
        cooked["possible_moves"][ply] = nim_sum(tmp)
    return cooked


def optimal(state: Nim) -> Nimply:
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0]
    if not spicy_moves:
        spicy_moves = list(analysis["possible_moves"].keys())
    ply = random.choice(spicy_moves)
    return ply


## Self-Adaptive (μ+λ) ES-Training

In [136]:

def mutate(ind):
    new_prob=np.abs(np.random.normal(loc=ind[0],size=n_rules,scale=ind[2]))
    return [new_prob,0,ind[2]]
    
def update_sigma(ind):
    new_sigma=np.abs(np.random.normal(loc=ind[2],scale=0.2))
    return [ind[0],ind[1],new_sigma]

def select_best(pop,mu):
   return sorted(pop,key=lambda e:e[1],reverse=True)[0:mu]

def reset_fitness(pop):
    for i,p in enumerate(pop):
        pop[i][1]=0
    return pop

def get_max(pop):
    fitness_val=[]
    for i,el in enumerate(pop):
        fitness_val.append(pop[i][1])
    best_fit=np.argmax(fitness_val)
    return pop[best_fit]



In [137]:
def make_move1(ind,state)->Nimply:
    # normalizing to transform in probabilities
    ind[0]=ind[0]/sum(ind[0])
    move=np.random.choice(len(ind[0]),p=ind[0])

    if move==0:
        # remove a random number of matches from the lowest row available
        row=np.nonzero(state.rows)[0]
        index=row[len(row)-1]
        max_obj=state.rows[index]
        if max_obj>1:
            num_obj=np.random.randint(1,max_obj+1)
        else:
            num_obj=1
        return Nimply(index,num_obj)
    elif move==1:
        # removes one match from the higher row available
        row=np.nonzero(state.rows)
        return Nimply(row[0][0],1)
    elif move==2:
        # removes the maximum number of matches from a random row
        index=np.nonzero(state.rows)[0]
        row= np.random.choice(index)
        return Nimply(row,state.rows[row])
    elif move==3:
        # removes an odd number of matches from a random row 
        index=np.nonzero(state.rows)[0]
        row=np.random.choice(index)
        if state.rows[row]>1:
            num_obj=np.random.randint(1,state.rows[row]+1)
        else:
            num_obj=1
        return Nimply(row,num_obj)
    


In [138]:
def make_move2(ind,state)->Nimply:
    # normalizing to transform in probabilities
    ind[0]=ind[0]/sum(ind[0])
    move=np.random.choice(3,p=ind[0])
    
    if move==0:
        # remove a random number of matches from the lowest row available
        row=np.nonzero(state.rows)[0]
        index=row[len(row)-1]
        max_obj=state.rows[index]
        if max_obj>1:
            num_obj=np.random.randint(1,max_obj+1)
        else:
            num_obj=1
        return Nimply(index,num_obj)
    elif move==1:
        # removes one match from the higher row available
        row=np.nonzero(state.rows)
        return Nimply(row[0][0],1)
    elif move==2:
        # golden rule
        analysis = analize(state)
        spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0]
        if not spicy_moves:
            spicy_moves = list(analysis["possible_moves"].keys())
        ply = random.choice(spicy_moves)
        return ply
        
    

In [139]:
def play_game(ind,my_strategy):
    # logging.getLogger().setLevel(logging.INFO)
    
    strategy = (my_strategy, optimal)

    nim = Nim(5)
    # logging.info(f"init : {nim}")
    player = 0
    while nim:
        if player==0:
            ply = strategy[player](ind,nim)
        else:
            ply = strategy[player](nim)
        # logging.info(f"ply: player {player} plays {ply}")
        nim.nimming(ply)
        # logging.info(f"status: {nim}")
        player = 1 - player
    if player==0:
        ind[1]+=1
    # logging.info(f"status: Player {player} won!")


In [140]:
n_gen=5
n_games=100
mu=10
λ=40
best=0

#  initial probabilities,fitness,sigma
pop=[[[ np.float64(1/n_rules) for _ in range(n_rules)],0,1] for m in range(mu)]

for gen in tqdm(range(n_gen)):
    pop=reset_fitness(pop)
    
    for p in range(len(pop)):
        pop[p]=update_sigma(pop[p])
        for i in range(λ//mu):
            pop.append(mutate(pop[p]))

    for ind in pop:
        for game in range(n_games):
            # play a game
            play_game(ind,make_move2)
    
    pop=select_best(pop,mu)
    gen_best=get_max(pop)
    best=gen_best if best==0 or gen_best[1]>best[1] else best
pprint(best)


  0%|          | 0/5 [00:00<?, ?it/s]

[array([0.05875954, 0.19680397, 0.7444365 ]), 51, 1.0338522370398953]
