Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in group is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [50]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy


## The *Nim* and *Nimply* classes

In [51]:
Nimply = namedtuple("Nimply", "row, num_objects")


In [52]:
class Nim:
    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects


## Sample (and silly) startegies 

In [53]:
def pure_random(state: Nim) -> Nimply:
    """A completely random move"""
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = random.randint(1, state.rows[row])
    return Nimply(row, num_objects)


In [54]:
def gabriele(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))


In [55]:
def adaptive(state: Nim) -> Nimply:
    """A strategy that can adapt its parameters"""
    genome = {"love_small": 0.5}


In [56]:
import numpy as np


def nim_sum(state: Nim) -> int:
    tmp = np.array([tuple(int(x) for x in f"{c:032b}") for c in state.rows])
    xor = tmp.sum(axis=0) % 2
    return int("".join(str(_) for _ in xor), base=2)


def analize(raw: Nim) -> dict:
    cooked = dict()
    cooked["possible_moves"] = dict()
    for ply in (Nimply(r, o) for r, c in enumerate(raw.rows) for o in range(1, c + 1)):
        tmp = deepcopy(raw)
        tmp.nimming(ply)
        cooked["possible_moves"][ply] = nim_sum(tmp)
    return cooked


def optimal(state: Nim) -> Nimply:
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0]
    if not spicy_moves:
        spicy_moves = list(analysis["possible_moves"].keys())
    ply = random.choice(spicy_moves)
    return ply


## Oversimplified match

In [57]:
logging.getLogger().setLevel(logging.INFO)

strategy = (optimal, pure_random)

nim = Nim(5)
logging.info(f"init : {nim}")
player = 0
while nim:
    ply = strategy[player](nim)
    logging.info(f"ply: player {player} plays {ply}")
    nim.nimming(ply)
    logging.info(f"status: {nim}")
    player = 1 - player
logging.info(f"status: Player {player} won!")


INFO:root:init : <1 3 5 7 9>
INFO:root:ply: player 0 plays Nimply(row=1, num_objects=2)
INFO:root:status: <1 1 5 7 9>
INFO:root:ply: player 1 plays Nimply(row=1, num_objects=1)
INFO:root:status: <1 0 5 7 9>
INFO:root:ply: player 0 plays Nimply(row=4, num_objects=5)
INFO:root:status: <1 0 5 7 4>
INFO:root:ply: player 1 plays Nimply(row=4, num_objects=4)
INFO:root:status: <1 0 5 7 0>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=5)
INFO:root:status: <1 0 0 7 0>
INFO:root:ply: player 1 plays Nimply(row=0, num_objects=1)
INFO:root:status: <0 0 0 7 0>
INFO:root:ply: player 0 plays Nimply(row=3, num_objects=4)
INFO:root:status: <0 0 0 3 0>
INFO:root:ply: player 1 plays Nimply(row=3, num_objects=1)
INFO:root:status: <0 0 0 2 0>
INFO:root:ply: player 0 plays Nimply(row=3, num_objects=1)
INFO:root:status: <0 0 0 1 0>
INFO:root:ply: player 1 plays Nimply(row=3, num_objects=1)
INFO:root:status: <0 0 0 0 0>
INFO:root:status: Player 0 won!


In [58]:
import numpy as np
import numpy.typing as npt
from typing import List, Tuple, Optional, Callable, Union
from itertools import chain, combinations
from functools import reduce
from tqdm import tqdm

class Agent:

    def __init__(self, num_rows: int, params: Optional[np.array] = None, k: Optional[int] = None) -> None:
        """
        num_rows: number of rows of the game
        params: initial parameters
        k: largest number of piles it can take
        """
        if params is None:
            self._params = np.random.normal(size=num_rows)
        else:
            self._params = params
        if k is None:
            self.k = (2*num_rows)-1
        else:
            self.k = k
        self.fitness_scores: List[float] = []
        self.num_rows = num_rows

    def powerset(self, iterable): # utility for an attempt of a more sophistacated but unsuccessful strategy
        s = list(iterable)
        return list(chain.from_iterable(combinations(s, r) for r in range(len(s)+1)))
    
    def binary_matrix_from_array(self, arr):
        binary_matrix = [list(map(lambda x: -1 if x == '0' else 1, bin(num)[2:].zfill(self.num_rows))) for num in arr]
        return np.array(binary_matrix)
    
    def powerset_matrix(self,arr): # utility for an attempt of a more sophistacated but unsuccessful strategy
        binary_matrix = list(self.binary_matrix_from_array(arr).T)
        res = []
        for row in binary_matrix:
            pset = self.powerset(row)
            pset.pop(0)
            res.append(list(map(lambda x: reduce(lambda v1, v2: int(v1*v2),x), pset)))
        return np.array(res)
    
    def __lt__(self, other: 'Agent'):
        return self.fitness < other.fitness

    def strategy(self, nim: Nim) -> Nimply:
        possible_moves: List[Tuple[float, Nimply]] = self.generate_states(nim)
        best: Nimply = max(possible_moves, key = lambda v: v[0])[1]
        return best

    def generate_states(self, nim: Nim) -> List[Tuple[float, Nimply]]:
        ply_list: List[float, Nimply] = []
        for ply in (Nimply(r, o) for r, c in enumerate(nim.rows) for o in range(1, c + 1)):
            tmp: Nim = deepcopy(nim)
            tmp.nimming(ply)
            n_more_than_one: int = 0
            for r in tmp.rows:
                if r>1:
                    n_more_than_one += 1
            score: float = np.average(self._params @ self.binary_matrix_from_array(tmp.rows))    
            ply_list.append((score, ply))
        return ply_list
            

    def analize(self, raw: Nim) -> dict:
        cooked = dict()
        cooked["possible_moves"] = dict()
        for ply in (Nimply(r, o) for r, c in enumerate(raw.rows) for o in range(1, min(self.k, c + 1))):
            tmp = deepcopy(raw)
            tmp.nimming(ply)
            cooked["possible_moves"][ply] = nim_sum(tmp)
        return cooked
    
    @property
    def params(self):
        return self._params

    @params.setter
    def params(self, p):
        self._params = p
    
    @property
    def fitness(self):
        if len(self.fitness_scores) > 0:
            return np.mean(self.fitness_scores)
        else:
            return 0

    @fitness.setter
    def fitness(self, fitness: float):
        self.fitness_scores.append(fitness)
    
    def reset(self):
        self.fitness_scores = []
    
    def __iadd__(self, other) -> None:
        self._params += other


class EvolutionTask:

    def __init__(self, num_rows: int, k: Optional[int] = None, scale: float = 1.0, loc: float = 0.0, strategy='comma', mu=25, population_size=50) -> None:
        if k is None:
            self.k = 2*num_rows-1
        else:
            self.k = k
        self.scale = scale
        self.loc = loc
        self.num_rows = num_rows
        self.mu = mu
        self.strategy = strategy
        self.population_size = population_size
    
    def mutate(self, agent: Agent) -> None:
        """
        Mutates according to Gaussian mutation
        """
        agent.params += np.random.normal(size = agent.params.shape, loc=self.loc, scale=self.scale)

    def play_match(self, a1: Agent, a2: Agent) -> Tuple[float, float]:
        """
        Play match between two agents
        The fitness of the game for one agent is given by the percentage of optimal moves it performs
        An optimal move is decided as written in https://en.wikipedia.org/wiki/Nim
        """
        correct_moves_count: List[int, int] = [0, 0]
        optimal_is_possible_count: List[int, int] = [0, 0]

        strategy: Tuple[Callable, Callable] = (a1.strategy, a2.strategy)

        nim: Nim = Nim(self.num_rows)
        optimal_nimsum: Optional[int] = 0
        played_optimal: bool = True
        player = 0
        while nim:
            ply = strategy[player](nim)
            if self.is_late_state(nim):
                if self.nim_sum(nim) == 0:
                    optimal_nimsum = 1
                    optimal_is_possible_count[player] += 1
                else:
                    optimal_nimsum = None
            else:
                if self.nim_sum(nim) > 0:
                    optimal_nimsum = 0
                    optimal_is_possible_count[player] += 1
                else:
                    optimal_nimsum = None
            nim.nimming(ply)
            if optimal_nimsum is not None:
                played_optimal = self.nim_sum(nim) == optimal_nimsum
            if played_optimal:
                correct_moves_count[player] += 1
            player = 1 - player
        
        fitness: List[float, float] = [0,0]

        for player in [0,1]:
            if optimal_is_possible_count[player] == 0:
                fitness[player] = 0
            else:
                fitness[player] = correct_moves_count[player]/optimal_is_possible_count[player]

        return fitness
    
    def is_late_state(self, nim: Nim):
        """
        Verifies whether we are in the late stages of the game, that is where we have at most one row with two or more piles
        """
        n_rows_with_multiple_piles: int = 0
        for row in nim.rows:
            if row>=2:
                n_rows_with_multiple_piles += 1
        return n_rows_with_multiple_piles < 2
    
    def tournament(self, size: Union[int, float]):
        """
        Do random matches to calculate fitness
        """
        if isinstance(size, float):
            size = int(size*self.population_size)
        matches = np.random.choice(self.agents, size=(size,2))
        for a1, a2 in matches:
            f1, f2 = self.play_match(a1, a2)
            a1.fitness = f1
            a2.fitness = f2

    def nim_sum(self, state: Nim) -> int:
        """
        Calculate the nim sum of a state
        """
        tmp = np.array([tuple(int(x) for x in f"{c:032b}") for c in state.rows])
        xor = tmp.sum(axis=0) % 2
        return int("".join(str(_) for _ in xor), base=2)
    
    def exploration(self, tournament_size = 10.0) -> None:
        """
        Performs tournament to calculate fitness and select parents
        """
        self.tournament(tournament_size)
        mu = self.mu
        parents = np.partition(self.agents, self.population_size-mu)[self.population_size-mu:] # takes the top half parents in terms of fitness
        children = []
        if self.strategy == 'comma':
            num_children = self.population_size
        else:
            num_children = self.population_size-mu
        for _ in range(num_children):
            children.append(self.crossover(np.random.choice(parents), np.random.choice(parents)))
        """for _ in range(2):
            random_indices = np.random.choice(mu, size=mu, replace=False) # randomly select two parents 
            for i in range(len(random_indices)//2):
                children.append(self.crossover(parents[random_indices[i]], parents[random_indices[(len(random_indices)//2+i)%(len(random_indices))]]))"""
        if self.strategy == 'comma':
            self.agents = np.array(children)
        else:
            num_children = self.population_size-mu
        self.agents = np.concatenate((parents,children))
    
    def crossover(self, a1: Agent, a2: Agent) -> Agent:
        """
        Given two agents it randomly selects the parameters between the two
        """
        new_params: List[float] = []
        for i in range(len(a1.params)):
            if np.random.normal() > 0:
                new_params.append(a1.params[i])
            else:
                new_params.append(a2.params[i])
        return Agent(self.num_rows, np.array(new_params), self.k)        


    def exploitation(self, tournament_size = 10.0) -> None:
        """
        Mutates parameters of the agents with Gaussian mutation
        """
        self.tournament(tournament_size)
        mu = self.mu
        parents = np.partition(self.agents, self.population_size-mu)[self.population_size-mu:] # takes the top half parents in terms of fitness
        children = []
        if self.strategy == 'comma':
            num_children = self.population_size
        else:
            num_children = self.population_size-mu
        for _ in range(num_children):
            children.append(self.mutate(np.random.choice(parents)))

        """for a in self.agents:
            self.mutate(a)"""
    
    def reset_fitness(self) -> None:
        for agent in self.agents:
            agent.reset()

    def train(self, n_generations=100, tournament_size=10.0, temperature = 0.5):
        """
        Training loop, the temperature defines the transition from an exploration prevalent strategy to an exploitation prevalent strategy.
        In particular exploitation is performed with probability P[X>(generation/tot_generations)^t], while exploitation is performed with P[X<(generation/tot_generations)^t], where X is uniformly distributed between 0 and 1
        """
        self.agents: List[Agent] = [Agent(self.num_rows, k=self.k) for _ in range(self.population_size)]
        self.best_agent = None
        for gen in tqdm(range(n_generations)):
            if np.random.uniform(0,1) < np.power(gen/n_generations, temperature):
                #exploitation
                self.exploitation()
            if np.random.uniform(0,1) > np.power(gen/n_generations, temperature):
                #exploration
                self.exploration(tournament_size=tournament_size)
            self.best_agent = max(self.agents, key = lambda x: x.fitness)
            self.reset_fitness()
    
    @property
    def best_agent(self) -> Agent:
        return self._best_agent

    @best_agent.setter
    def best_agent(self, a: Optional[Agent]) -> Agent:
        self._best_agent = a
    
    def test_best_agent(self, expert_strategy: Callable[[Nim, Nimply], None], num_matches=100):
        strategy = (self.best_agent.strategy, expert_strategy)
        wins = 0
        for _ in range(num_matches):
            nim = Nim(self.num_rows)
            player = 0
            while nim:
                ply = strategy[player](nim)
                nim.nimming(ply)
                player = 1 - player
            if player==0:
                wins += 1
        logging.info(f"status: best agent won {100*wins/num_matches}% of times")

class AgentDict:

    """
    Class for storing rule based agents
    """

    def __init__(self, k: Optional[int] = None) -> None:
        self.k = k
    
    def pure_random(self, state: Nim) -> Nimply:
        """A completely random move"""
        if self.k is None:
            self.k = len(state.rows)*2-1
        row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
        num_objects = random.randint(1, min(self.k, state.rows[row]))
        return Nimply(row, num_objects)

    def gabriele(self, state: Nim) -> Nimply:
        """Pick always the maximum possible number of the lowest row"""
        if self.k is None:
            self.k = len(state.rows)*2-1
        possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1)]
        return Nimply(*max(possible_moves, key=lambda m: (-m[0], min(self.k, m[1]))))
    

    def nim_sum(self, state: Nim) -> int:
        tmp = np.array([tuple(int(x) for x in f"{c:032b}") for c in state.rows])
        xor = tmp.sum(axis=0) % 2
        return int("".join(str(_) for _ in xor), base=2)


    def analize(self, raw: Nim) -> dict:
        cooked = dict()
        cooked["possible_moves"] = dict()
        for ply in (Nimply(r, o) for r, c in enumerate(raw.rows) for o in range(1, min(self.k, c + 1))):
            tmp = deepcopy(raw)
            tmp.nimming(ply)
            cooked["possible_moves"][ply] = nim_sum(tmp)
        return cooked


    def optimal(self, state: Nim) -> Nimply:
        analysis = analize(state)
        logging.debug(f"analysis:\n{pformat(analysis)}")
        spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0]
        if not spicy_moves:
            spicy_moves = list(analysis["possible_moves"].keys())
        ply = random.choice(spicy_moves)
        return ply

In [59]:
et = EvolutionTask(3, mu=25, population_size=50, strategy='comma')
et.train(n_generations=100, temperature=0.5, tournament_size=2.0)
a: Agent = et.best_agent

100%|██████████| 100/100 [05:35<00:00,  3.36s/it]


In [61]:
agent_dict = AgentDict()

et.test_best_agent(agent_dict.pure_random, num_matches=1000)

INFO:root:status: best agent won 57.6% of times
