Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in groups is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [32]:
import sys
import random
import numpy as np

from collections import namedtuple
from copy import deepcopy

## The *Nim* and *Nimply* classes

In [33]:
# A single move: remove `num_objects` objects from the row at index `row`.
Nimply = namedtuple("Nimply", ["row", "num_objects"])

In [34]:
class Nim:
    """Board of a Nim game: a list of rows, each holding some objects.

    Row ``i`` starts with ``2 * i + 1`` objects.  ``k`` is the largest number
    of objects that may be removed in one move; when it is omitted the bound
    is set to the total number of objects, so it never restricts a move.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        # With no explicit bound, fall back to the total number of objects.
        self._k = k if k is not None else sum(self._rows)

    def __bool__(self):
        """Return True while at least one object is still on the board."""
        return sum(self._rows) > 0

    def __str__(self):
        """Return the row sizes formatted as ``<a b c ...>``."""
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        """Immutable snapshot of the current row sizes."""
        return tuple(self._rows)

    def nimming(self, ply: "Nimply") -> None:
        """Check that the move ``ply`` is legal and apply it to the board.

        Args:
            ply: a ``(row, num_objects)`` pair; ``num_objects`` objects are
                removed from the row at index ``row``.

        Raises:
            ValueError: if ``num_objects`` is smaller than 1, larger than the
                content of the row, or larger than the per-move bound ``k``.
            IndexError: if ``row`` is not a valid index into the rows.
        """
        row, num_objects = ply
        # Explicit checks instead of `assert`, so they survive `python -O`.
        if num_objects < 1:
            # A zero or negative count would leave the board unchanged or grow it.
            raise ValueError(f"must remove at least one object, got {num_objects}")
        if num_objects > self._rows[row]:
            raise ValueError(
                f"cannot remove {num_objects} objects from row {row} "
                f"holding {self._rows[row]}"
            )
        if self._k is not None and num_objects > self._k:
            raise ValueError(
                f"cannot remove {num_objects} objects: at most {self._k} per move"
            )
        self._rows[row] -= num_objects

## Sample (and silly) strategies

In [35]:
def pure_random(state: Nim) -> Nimply:
    """Return a random move: a random non-empty row, then a random amount.

    The amount is drawn between 1 and the size of the chosen row; the
    per-move bound of the game is not consulted here.
    """
    sizes = state.rows
    non_empty = [index for index, size in enumerate(sizes) if size > 0]
    chosen_row = random.choice(non_empty)
    amount = random.randint(1, sizes[chosen_row])
    return Nimply(chosen_row, amount)

In [36]:
def gabriele(state: Nim) -> Nimply:
    """Empty the first (lowest-index) row that still contains objects."""
    non_empty = [(index, size) for index, size in enumerate(state.rows) if size > 0]
    # Largest negated index == smallest index; taking `size` clears that row.
    row, size = max(non_empty, key=lambda pair: -pair[0])
    return Nimply(row, size)

In [37]:
def nim_sum(state: "Nim") -> int:
    """Return the nim-sum of the board: the bitwise XOR of all row sizes.

    Args:
        state: any object exposing the row sizes as ``state.rows``.

    Returns:
        The XOR of all row sizes as a plain ``int``; 0 for a board with no rows.
    """
    # A plain integer fold works for any number of rows (including none) and
    # for sizes of any magnitude, with no fixed-width bit-string round trip.
    result = 0
    for count in state.rows:
        result ^= count
    return int(result)

def analize(raw: Nim) -> dict:
    """Map every legal move on `raw` to the nim-sum of the resulting board.

    Returns:
        A dict with a single key ``"possible_moves"`` whose value is a dict
        ``{Nimply: nim_sum_after_move}``, filled row by row in increasing
        order of removed objects.
    """
    # Per-move bound, if the state exposes one; moves above it would be
    # rejected by `nimming`, so they must not be enumerated.
    k = getattr(raw, "_k", None)
    cooked = {"possible_moves": {}}
    for r, c in enumerate(raw.rows):
        limit = c if k is None else min(c, k)
        for o in range(1, limit + 1):
            ply = Nimply(r, o)
            # Try the move on a copy so the caller's board is left untouched.
            tmp = deepcopy(raw)
            tmp.nimming(ply)
            cooked["possible_moves"][ply] = nim_sum(tmp)
    return cooked

def optimal(state: Nim) -> Nimply:
    """Pick at random among the moves that leave a non-zero nim-sum.

    When no such move exists, pick at random among all analysed moves.
    """
    outcomes = analize(state)["possible_moves"]
    candidates = [move for move, value in outcomes.items() if value != 0]
    return random.choice(candidates or list(outcomes))

## Tested Strategies

In [38]:
def strategy_1(initial_state):
    """Endgame-aware wrapper around `optimal`.

    With a single non-empty row left, play so that the opponent is the one
    forced to take the last object; in every other situation defer to
    `optimal`.

    Returns:
        The chosen move as a `Nimply`.
    """
    # The board is only read, so no copy of the state is needed.
    remaining = [(index, size) for index, size in enumerate(initial_state.rows) if size != 0]

    if len(remaining) == 1:
        row, size = remaining[0]
        k = getattr(initial_state, "_k", None)
        bound = size if k is None else max(k, 0)
        # With one row and at most `bound` objects per move, the player to
        # move loses exactly when size % (bound + 1) == 1.  Leave such a size
        # behind; when the bound is not binding this is simply `size - 1`.
        take = (size - 1) % (bound + 1)
        if take > 0:
            return Nimply(row, take)

    return optimal(initial_state)

In [39]:
def strategy_2(initial_state):
    """Play exactly the move selected by `optimal`."""
    chosen = optimal(initial_state)
    return chosen

In [40]:
def strategy_3(initial_state):
    """Pick at random among the moves that bring the nim-sum to zero.

    When no such move exists, pick at random among all analysed moves.
    """
    outcomes = analize(initial_state)["possible_moves"]
    zeroing = [move for move, value in outcomes.items() if value == 0]
    pool = zeroing if zeroing else list(outcomes)
    return random.choice(pool)

In [41]:
def generate_unique_array_from_function(n, initial_state):
    """Sample up to `n` distinct legal moves for `initial_state`.

    Moves are drawn with `pure_random`; duplicates and moves removing more
    than the per-move bound are discarded.

    Args:
        n: requested number of distinct moves.
        initial_state: the board to draw moves for.

    Returns:
        A list of distinct `Nimply` moves in the order they were first drawn.
        It holds fewer than `n` moves when the board does not offer that many
        distinct legal moves.
    """
    k = initial_state._k
    # Number of distinct legal moves: each row offers 1..min(size, k) objects.
    available = sum(c if k is None else min(c, max(k, 0)) for c in initial_state.rows)
    # Never wait for more moves than exist, otherwise the loop cannot finish.
    target = min(n, available)

    unique_set = set()
    unique_array = []
    while len(unique_array) < target:
        value = pure_random(initial_state)
        _, m = value
        if value in unique_set or (k is not None and m > k):
            continue
        unique_set.add(value)
        unique_array.append(value)

    return unique_array

def strategy_4(initial_state):
    """Choose the sampled move whose resulting nim-sum is closest to the current one.

    A move that leaves the nim-sum unchanged is returned as soon as it is
    found.  If no candidate move is produced, `initial_state` itself is
    returned.
    """
    current = nim_sum(initial_state)
    candidates = generate_unique_array_from_function(sum(initial_state.rows), initial_state)
    smallest_gap = sys.maxsize
    closest = initial_state
    for move in candidates:
        after = deepcopy(initial_state)
        after.nimming(move)
        resulting = nim_sum(after)
        if resulting == current:
            return move
        gap = abs(current - resulting)
        if gap < smallest_gap:
            smallest_gap, closest = gap, move
    return closest

In [42]:
def strategy_5(initial_state):
    """Play exactly the move selected by `gabriele`."""
    chosen = gabriele(initial_state)
    return chosen

In [43]:
def strategy_6(initial_state):
    """Remove a random number of objects from the first largest row."""
    sizes = initial_state.rows
    largest = max(sizes)
    # `index` returns the first row holding the maximum size.
    target_row = sizes.index(largest)
    amount = random.randint(1, largest)
    return Nimply(target_row, amount)

## Fitness Function

In [44]:
def play_nim(nim, p1, p2):
    """Play a full game on `nim`, alternating `p1` and `p2`, with `p1` first.

    Each player is a callable that receives the board and returns a move,
    which is applied with `nim.nimming`.  The board is modified in place.

    Returns:
        0 or 1: the index of the player whose turn it would be once the board
        is empty, i.e. the player who did NOT make the last move
        (0 for `p1`, 1 for `p2`).
    """
    players = (p1, p2)
    turn = 0
    while nim:
        move = players[turn](nim)
        nim.nimming(move)
        turn = 1 - turn

    return turn

def evaluate_strategy(strategy):
    """Count the games `strategy` wins against `optimal` and `pure_random`.

    For each opponent, 25 games on a fresh 5-row board are played with
    `strategy` moving first, then 25 more with `strategy` moving second
    (100 games overall).  A game counts as a win when `play_nim` returns the
    seat occupied by `strategy`.

    Returns:
        The number of games won.
    """
    opponents = (optimal, pure_random)
    wins = 0

    # `strategy` sits in seat 0 (moves first).
    for opponent in opponents:
        wins += sum(play_nim(Nim(5), strategy, opponent) == 0 for _ in range(25))

    # `strategy` sits in seat 1 (moves second).
    for opponent in opponents:
        wins += sum(play_nim(Nim(5), opponent, strategy) == 1 for _ in range(25))

    return wins

In [45]:
def weighted_random_choice(weights):
    """Draw an index at random with probability proportional to its shifted weight.

    The smallest weight is subtracted from every weight before normalising,
    so an index holding the minimum weight is never drawn unless all weights
    are equal.

    Args:
        weights: non-empty 1-D sequence or array of numeric weights.

    Returns:
        The selected index, as returned by ``np.random.choice``.
    """
    weights = np.asarray(weights, dtype=float)
    shifted = weights - np.min(weights)
    total = np.sum(shifted)
    if total == 0:
        # All weights are equal (or there is a single one): the shift removes
        # every preference and 0 / 0 would yield NaN probabilities, so draw
        # uniformly instead.
        probabilities = np.full(len(weights), 1.0 / len(weights))
    else:
        probabilities = shifted / total
    return np.random.choice(len(weights), p=probabilities)

def weight_mean(weights, fitness):
    """Return the weighted mean of `fitness` using `weights` as the weights.

    Computed as ``sum(weights * fitness) / sum(weights)``.
    """
    weighted_total = np.sum(weights * fitness)
    # Weighted mean: weighted total divided by the total weight.
    return weighted_total / np.sum(weights)

def fitness(x, tmp_array):
    """Score one weight vector, or each row of a matrix of weight vectors.

    The score of a weight vector is `weight_mean(vector, tmp_array)`.

    Returns:
        A 1-D float array: one element when `x` is 1-D, otherwise one element
        per entry along the first axis of `x`.
    """
    if x.ndim == 1:
        return np.array([weight_mean(x, tmp_array)], dtype=float)

    scores = [weight_mean(x[i], tmp_array) for i in range(x.shape[0])]
    return np.array(scores, dtype=float)

## (1 + λ) - ES

In [46]:
def es_plus_strategy(strategies, σ=None, λ=20):
    """Evolve one weight per strategy with a (1 + λ) evolution strategy.

    Every strategy is scored once with `evaluate_strategy`; the fitness of a
    weight vector is the weighted mean of those scores.  At each generation
    λ offspring are produced by Gaussian mutation of the parent (absolute
    value taken, so weights stay non-negative).  The parent is replaced only
    by an offspring that is strictly fitter, so it always holds the best
    solution found so far.

    Args:
        strategies: sequence of strategy callables.
        σ: fixed mutation step.  When None the step is self-adapted: it is
            stored as an extra, last gene of the solution and mutated along
            with the weights.
        λ: number of offspring per generation; 100000 // λ generations run.

    Returns:
        With a fixed σ, the array of evolved weights.  With σ=None, a tuple
        ``(weights, σ)`` holding the evolved weights and the evolved step.
    """
    strat_evals = [evaluate_strategy(s) for s in strategies]

    adaptive = σ is None
    n_weights = len(strategies)
    # In adaptive mode the last gene carries the mutation step.
    sol_dim = n_weights + 1 if adaptive else n_weights
    solution = abs(np.random.random(sol_dim))
    solution_fitness = fitness(solution[:n_weights], strat_evals)[0]

    for _ in range(100000 // λ):
        scale = solution[sol_dim - 1] if adaptive else σ
        # offspring <- λ random points obtained by mutating the current parent
        offspring = abs(np.random.normal(loc=0, scale=scale, size=(λ, sol_dim)) + solution)
        # only the weight genes take part in the evaluation
        evals = fitness(offspring[:, :n_weights], strat_evals)
        best = np.argmax(evals)
        # "plus" selection: the parent competes with its offspring and
        # survives unless one of them is strictly better.
        if evals[best] > solution_fitness:
            solution = offspring[best]
            solution_fitness = evals[best]

    best_so_far = np.copy(solution)
    if adaptive:
        return best_so_far[:n_weights], best_so_far[n_weights]

    return best_so_far

## (1, λ) - ES

In [47]:
def es_comma_strategy(strategies, σ=None, λ=20):
    """Evolve one weight per strategy with a (1, λ) evolution strategy.

    Every strategy is scored once with `evaluate_strategy`; the fitness of a
    weight vector is the weighted mean of those scores.  At each generation
    λ offspring are produced by Gaussian mutation of the current solution
    (absolute value taken), and the fittest offspring always becomes the next
    solution.  100000 // λ generations are run.

    When σ is None the mutation step is self-adapted: it is stored as an
    extra, last gene of the solution and mutated along with the weights.

    Returns:
        With a fixed σ, the final weight array.  With σ=None, a tuple
        ``(weights, σ)`` holding the final weights and the final step.
    """
    strat_evals = [evaluate_strategy(s) for s in strategies]

    self_adaptive = σ is None
    n_weights = len(strategies)
    # In self-adaptive mode the last gene carries the mutation step.
    sol_dim = n_weights + 1 if self_adaptive else n_weights
    solution = abs(np.random.random(sol_dim))

    for _ in range(100000 // λ):
        step = solution[sol_dim - 1] if self_adaptive else σ
        # offspring <- λ random points obtained by mutating the current solution
        offspring = abs(np.random.normal(loc=0, scale=step, size=(λ, sol_dim)) + solution)
        # only the weight genes take part in the evaluation
        evals = fitness(offspring[:, :n_weights], strat_evals)
        # "comma" selection: the best offspring replaces the parent unconditionally
        solution = offspring[np.argmax(evals)]

    best_so_far = np.copy(solution)
    if self_adaptive:
        return best_so_far[:n_weights], best_so_far[n_weights]

    return best_so_far

## Final Test

In [48]:
def test(strategies, weights):
    """Draw one strategy according to `weights` and return its win count.

    The index is drawn with `weighted_random_choice` and the selected
    strategy is scored with `evaluate_strategy`.
    """
    chosen_index = weighted_random_choice(weights)
    return evaluate_strategy(strategies[chosen_index])

In [49]:
sigmas = [1, 0.1, 0.01, 0.001, 0.0001]
strategies = [strategy_1, strategy_2, strategy_3, strategy_4, strategy_5, strategy_6]

# Fixed-σ runs: (1 + λ) first, then (1, λ); one evolution and one test per σ.
for title, evolve in (
    ('- (1 + λ) - ES:', es_plus_strategy),
    ('- (1, λ) - ES:', es_comma_strategy),
):
    print(title)
    for s in sigmas:
        weights = evolve(strategies, σ=s)
        wins = test(strategies, weights)
        print(f'\tσ = {s:.3e}\t{wins} wins')

# Self-adaptive runs: σ is evolved together with the weights.
for title, evolve in (
    ('- (1 + λ) - ES (adaptive):', es_plus_strategy),
    ('- (1, λ) - ES (adaptive):', es_comma_strategy),
):
    weights, σ = evolve(strategies, λ=100)
    print(title)
    wins = test(strategies, weights)
    print(f'\tσ = {σ:.3e}\t{wins} wins')

- (1 + λ) - ES:


	σ = 1.000e+00	59 wins
	σ = 1.000e-01	69 wins
	σ = 1.000e-02	75 wins
	σ = 1.000e-03	62 wins
	σ = 1.000e-04	46 wins
- (1, λ) - ES:
	σ = 1.000e+00	67 wins
	σ = 1.000e-01	64 wins
	σ = 1.000e-02	65 wins
	σ = 1.000e-03	62 wins
	σ = 1.000e-04	62 wins
- (1 + λ) - ES (adaptive):
	σ = 7.441e-02	69 wins
- (1, λ) - ES (adaptive):
	σ = 2.339e-67	69 wins
