Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a._k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in group is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [181]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
import math
import numpy as np
from tqdm.notebook import tqdm


## The *Nim* and *Nimply* classes

In [182]:
Nimply = namedtuple("Nimply", "row, num_objects")


In [183]:
class Nim:
    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        if k is not None:
            self._k=k
        else:
            self._k=sum(self.rows)

    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:#controlla se si può fare quella mossa e la fa fare
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects


## Sample (and silly) startegies 

In [184]:
def pure_random(state: Nim) -> Nimply:
    """A completely random move"""
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = random.randint(1, state.rows[row])
    return Nimply(row, num_objects)


In [185]:
def gabriele(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))


In [186]:
def adaptive(state: Nim) -> Nimply:
    """A strategy that can adapt its parameters"""
    genome = {"love_small": 0.5}


In [187]:
import numpy as np


def nim_sum(state: Nim) -> int:
    tmp = np.array([tuple(int(x) for x in f"{c:032b}") for c in state.rows])
    xor = tmp.sum(axis=0) % 2
    return int("".join(str(_) for _ in xor), base=2)


def analize(raw: Nim) -> dict:
    cooked = dict()
    cooked["possible_moves"] = dict()
    for ply in (Nimply(r, o) for r, c in enumerate(raw.rows) for o in range(1, c + 1)):
        tmp = deepcopy(raw)
        tmp.nimming(ply)
        cooked["possible_moves"][ply] = nim_sum(tmp)
    return cooked


def optimal(state: Nim) -> Nimply:
    analysis = analize(state)
    #logging.debug(f"analysis:\n{pformat(analysis)}")
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0]
    if not spicy_moves:
        spicy_moves = list(analysis["possible_moves"].keys())
    ply = random.choice(spicy_moves)
    return ply

In [188]:
def strategy_4(initial_state):
    actual_nimsum=nim_sum(initial_state)
    new_nimsum=-1
    possible_solutions=generate_unique_array_from_function(sum(initial_state.rows),initial_state)
    min=1000
    min_sol=initial_state
    for sol in possible_solutions:
        tmp=deepcopy(initial_state)
        tmp.nimming(sol)
        if nim_sum(tmp)==actual_nimsum:
            min_sol=sol
            break
        if abs(actual_nimsum-nim_sum(tmp))<min:
            min=abs(actual_nimsum-nim_sum(tmp))
            min_sol=sol
        
    return min_sol
def strategy_2(initial_state):
    return optimal(initial_state)
def strategy_3(initial_state):
    analysis = analize(initial_state)
    #logging.debug(f"analysis:\n{pformat(analysis)}")
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns == 0]
    if not spicy_moves:
        spicy_moves = list(analysis["possible_moves"].keys())
    ply = random.choice(spicy_moves)
    return ply
def strategy_1(initial_state):
    new_state = deepcopy(initial_state)

    righe_non_zero = [(indice, numero) for indice, numero in enumerate(new_state.rows) if numero != 0]

    numero_righe_non_zero = len(righe_non_zero)
    indice,bstn=righe_non_zero[0]

    if numero_righe_non_zero==1 and bstn!=1:
        #if there is 1 row left I make a different move, so i don't use evolutaionary alghoritm
        new_state.nimming(Nimply(indice,min(new_state._k,bstn-1)))
        return Nimply(indice,bstn-1)
    else:
        return optimal(initial_state)

def strategy_6(initial_state):
    row=initial_state.rows.index(max(initial_state.rows))
    return Nimply(row,random.randint(1,initial_state.rows[row]))

In [189]:
def evaluate_strategy(strategy):
    win_counter=0

    for _ in range(100):
        nim = Nim(5)
        #print(nim.rows)
        #add gabriele
        player = 0
        while nim:
            if player==0:
                ply = optimal(nim)
                nim.nimming(ply)
            else:
                ply = strategy(nim)
                nim.nimming(ply)
            player = 1 - player
        if player==1:
            win_counter+=1
        
    return win_counter

In [190]:
def tweak(state,nmax):
    #to the tweak function i pass the state that contain the row that it should modify
    row,num_object=state     
#it call posnormal that generate random gaussian number with mean the number of object that the parent want remove
    return Nimply(row,PosNormal(num_object,nmax))
 

In [191]:
def PosNormal(mean,nbastoncini):
    x = math.trunc(abs(np.random.normal(mean,nbastoncini/2)))
    return(x if x <=nbastoncini else PosNormal(mean,nbastoncini))#i call it until i find a number that has less sticks of actual state

In [192]:
from queue import PriorityQueue
def generate_unique_array_from_function(n,initial_state):
    #here i generate the population, so it is composed by dstinct member
    unique_set = set()
    unique_array = []

    while len(unique_set) < n:
        value=pure_random(initial_state)
        _,m=value
        if  value not in unique_set and m <= initial_state._k:
            unique_set.add(value)
            unique_array.append(value)

    return unique_array
def extract_ith_element(pq, i):
    if not isinstance(pq, PriorityQueue):
        raise ValueError("L'oggetto fornito non è una PriorityQueue.")

    if i < 0:
        raise ValueError("L'indice deve essere non negativo.")

    
    for _ in range(i):
        pq.get()
    ith_element = pq.get()
    for _ in range(i):
        pq.put(pq.get())
    
    return ith_element

def evolutionary_strategy(initial_state, generations, population_size):
    best_move = None
    best_fitness = float('-inf')
    new_state = deepcopy(initial_state)

    righe_non_zero = [(indice, numero) for indice, numero in enumerate(new_state.rows) if numero != 0]

    numero_righe_non_zero = len(righe_non_zero)
    indice,bstn=righe_non_zero[0]

    if numero_righe_non_zero==1 and bstn!=1:
        #if there is 1 row left I make a different move, so i don't use evolutaionary alghoritm
        new_state.nimming(Nimply(indice,min(new_state._k,bstn-1)))
        return Nimply(indice,bstn-1)
    else:
        population = generate_unique_array_from_function(min(sum(initial_state.rows),population_size),initial_state)

        for _ in range(generations):
            prior=[]

            for move in population:

                new_state = deepcopy(initial_state)
                new_state.nimming(move)
                fitness = nim_sum(new_state)

                prior.append((-fitness,move))

                extracted_element=0
                if fitness > best_fitness:
                    best_move = move
                    best_fitness = fitness 
                #i evaluate each parent and i select the bests parent
            prior_ordered= sorted(prior, key=lambda x: x[0])
            while extracted_element<=len(prior_ordered)/2:
                #from here i extract the parent and i mutate him
                # and i search the best also in the mutation of the parent 

                new_state = deepcopy(initial_state)

                cnt_=0
                while random.random()<0.7:
                     cnt_+=1
                     if(cnt_>=len(prior_ordered)):
                        cnt_=0

                el=prior_ordered.pop(cnt_)
                extracted_element+=1

                _,tmp=el
                i,v1=tmp
                counter=0
                while counter<3:
                    new_state = deepcopy(initial_state)
                    move=tweak(tmp,min(new_state._k,new_state.rows[i]))
                    new_state.nimming(move)
                    population.append(move)
                    fitness=nim_sum(new_state)


                    if fitness > best_fitness:
                        best_move = tmp
                        best_fitness = fitness
                    counter+=1                 

    return best_move


In [193]:
def weighted_random_choice( weights):
  
    pesi_normalizzati = weights - np.min(weights)
    total_peso = np.sum(pesi_normalizzati)
    probabilita_normalizzate = pesi_normalizzati / total_peso
    
    indice_selezionato = np.random.choice(len(weights), p=probabilita_normalizzate)

    return indice_selezionato
def weight_mean(weights, fitness):
    weight_summed = np.sum(weights * fitness)
    sum_weight = np.sum(weights)
    
    # Calcola la media pesata
    mean = weight_summed / sum_weight
    return mean

def rastrigin(x,strategies,tmp_array,A=10):
   
   if x.ndim==1:
    result_array = np.zeros((1,))
   

    result_array[0]=weight_mean(x,tmp_array)
    return result_array
   else:
    result_array = np.zeros((x.shape[0],))
    for i in range(x.shape[0]): 
        result_array[i]= weight_mean(x[i],tmp_array)
    
   return result_array

      
    
def generate_weights(strategies,tmp_array):
    λ = 20
    σ = 0.001

    solution = abs(np.random.random(6)) 
    history = list()
    best_so_far = np.copy(solution)

    for n in range(100000 // λ):
    # offspring <- select λ random points mutating the current solution
        offspring =abs( (
         np.random.normal(loc=0, scale=σ, size=(λ, 6))) + solution
        )
    # evaluarte and select best
        evals = rastrigin(offspring,strategies,tmp_array)
        solution = offspring[np.argmax(evals)]
    if rastrigin(best_so_far,strategies,tmp_array) < rastrigin(solution,strategies,tmp_array):
         best_so_far = np.copy(solution)
         history.append((n, rastrigin(solution,strategies,tmp_array)))
    return best_so_far




## Oversimplified match

In [194]:
logging.getLogger().setLevel(logging.INFO)

strategy = (optimal, pure_random)

nim = Nim(5)
print(nim.rows)
logging.info(f"init : {nim}")
player = 0
while nim:
    if player==0:
        ply = optimal(nim)
        logging.info(f"ply: player {player} plays {ply}")
        nim.nimming(ply)
        logging.info(f"status: {nim}")
    else:
        ply = strategy_6(nim)
        logging.info(f"ply: player {player} plays {ply}")
        nim.nimming(ply)
        logging.info(f"status: {nim}")
    player = 1 - player
logging.info(f"status: Player {player} won!")


INFO:root:init : <1 3 5 7 9>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=1)
INFO:root:status: <1 3 4 7 9>
INFO:root:ply: player 1 plays Nimply(row=4, num_objects=7)
INFO:root:status: <1 3 4 7 2>
INFO:root:ply: player 0 plays Nimply(row=3, num_objects=5)
INFO:root:status: <1 3 4 2 2>
INFO:root:ply: player 1 plays Nimply(row=2, num_objects=1)
INFO:root:status: <1 3 3 2 2>
INFO:root:ply: player 0 plays Nimply(row=3, num_objects=1)
INFO:root:status: <1 3 3 1 2>
INFO:root:ply: player 1 plays Nimply(row=1, num_objects=2)
INFO:root:status: <1 1 3 1 2>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=1)
INFO:root:status: <1 1 2 1 2>
INFO:root:ply: player 1 plays Nimply(row=2, num_objects=1)
INFO:root:status: <1 1 1 1 2>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=1)
INFO:root:status: <1 1 0 1 2>
INFO:root:ply: player 1 plays Nimply(row=4, num_objects=2)
INFO:root:status: <1 1 0 1 0>
INFO:root:ply: player 0 plays Nimply(row=1, num_objects=1)
INFO:root:status: <1 0

(1, 3, 5, 7, 9)


In [195]:
#evaluate_strategy(strategy_2)
tmp_array = np.zeros((6,))
vett_strategies=[strategy_1,strategy_2,strategy_3,strategy_4,gabriele,strategy_6]

for j in range(6) :
    tmp_array[j]=evaluate_strategy(vett_strategies[j])
print(generate_weights(vett_strategies,tmp_array))
print(tmp_array)


[1.46711882e+00 1.52372198e-03 1.42764697e-04 1.54661923e-03
 1.87937150e-04 8.90338806e-04]
[54. 45.  0. 27. 14. 43.]
