Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: Policy Search

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The player **taking the last object wins**.

* Task3.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task3.2: An agent using evolved rules
* Task3.3: An agent using minmax
* Task3.4: An agent using reinforcement learning

## Instructions

* Create the directory `lab3` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in group is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.

## Deadlines ([AoE](https://en.wikipedia.org/wiki/Anywhere_on_Earth))

* Sunday, December 4th for Task3.1 and Task3.2
* Sunday, December 11th for Task3.3 and Task3.4
* Sunday, December 18th for all reviews

In [34]:
import logging
from collections import namedtuple
import random
from typing import Callable
from copy import deepcopy
from itertools import accumulate
from operator import xor

## The *Nim* and *Nimply* classes

In [35]:
Nimply = namedtuple("Nimply", "row, num_objects")

In [36]:
class Nim:
    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    def __hash__(self) -> int:
        rowList=list(self._rows)
        rowList.sort()
        return hash(" ".join(str(_) for _ in self._rows))

    def __eq__(self, __o: object) -> bool:
        return (self.__hash__()==__o.__hash__())

    @property
    def rows(self) -> tuple:
        return tuple(self._rows)

    @property
    def k(self) -> int:
        return self._k

    def nimming(self, ply: Nimply) -> None:
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects
    

## Sample Strategies 

In [37]:
def pure_random(state: Nim) -> Nimply:
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = random.randint(1, state.rows[row])
    return Nimply(row, num_objects)

In [38]:
def gabriele(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))

## P1: Expert Player (same as Professor's)

In [39]:
def nim_sum(state: Nim) -> int:
    *_, result = accumulate(state.rows, xor)
    return result


def cook_status(state: Nim,nimSum=False) -> dict:
    cooked = dict()
    cooked["possible_moves"] = [
        (r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1) if state.k is None or o <= state.k
    ]
    cooked["active_rows_number"] = sum(o > 0 for o in state.rows)
    cooked["shortest_row"] = min((x for x in enumerate(state.rows) if x[1] > 0), key=lambda y: y[1])[0]
    cooked["longest_row"] = max((x for x in enumerate(state.rows)), key=lambda y: y[1])[0]
    if nimSum:
        cooked["nim_sum"] = nim_sum(state)

        brute_force = list()
        for m in cooked["possible_moves"]:
            tmp = deepcopy(state)
            tmp.nimming(m)
            brute_force.append((m, nim_sum(tmp)))
        cooked["brute_force"] = brute_force
    return cooked

In [40]:
def optimal_startegy(state: Nim) -> Nimply:
    data = cook_status(state,True)
    return next((bf for bf in data["brute_force"] if bf[1] == 0), random.choice(data["brute_force"]))[0]

In [41]:
NUM_MATCHES = 100
NIM_SIZE = 20


def evaluate(strategy: Callable,opponent) -> float:
    opponent = (strategy, opponent)
    won = 0

    for m in range(NUM_MATCHES):
        nim = Nim(NIM_SIZE)
        player = 0
        while nim:
            ply = opponent[player](nim)
            nim.nimming(ply)
            player = 1 - player
        if player == 1:
            won += 1
    return won / NUM_MATCHES

## P2 -> Evolutionary Alg 

In [42]:
def make_evolutionary_strategy(genome: dict) -> Callable:
    def evolvable(state: Nim) -> Nimply:
        data = cook_status(state)

        if data["active_rows_number"]==1:
            ply=Nimply(data["shortest_row"],state.rows[data["shortest_row"]])
            return ply

        if random.random() < genome["p"]:
            #Take Everything
            ply=Nimply(data["shortest_row"],state.rows[data["shortest_row"]])
        else:
            #Take Everything but 1
            rowChoice=random.choice([x[0] for x in enumerate(state.rows) if x[1]!=0])
            ply=Nimply(rowChoice,state.rows[rowChoice] if state.rows[rowChoice]==1 else state.rows[rowChoice]-1)
        return ply

    return evolvable

In [43]:
NUM_GEN=100


def evolve(strat:Callable,opponent):
    genome={"p":0.5}
    lostMatchesCounter=0
    prevWR=evaluate(strat(genome),opponent)  
    direction=+0.05
    for i in range(NUM_GEN):
        sign=(direction/abs(direction)) #current sign of direction + or -

        if lostMatchesCounter==0: #if you just made a mistake, double check
            genome["p"]+=direction
        newStrat=strat(genome)
        wr=evaluate(newStrat,opponent)
        if(prevWR<=wr):
            lostMatchesCounter=0
            direction=min(direction*1.2,0.1*sign) #we slowly gain confidence in our direction,
            # up to a maximum accelleration of .1
        else :
            lostMatchesCounter+=1
            direction=0.05*sign#reset step to 0.05
            if lostMatchesCounter>=2:
                direction=-direction #set to flipped sign
                lostMatchesCounter=0
        
        prevWR=wr
        print(f"Round {i+1}: WR:{wr} with P:{genome['p']}")
    return genome

### Example match

In [44]:
logging.getLogger().setLevel(logging.DEBUG)
#Evolve him for the matchup
strategy = (make_evolutionary_strategy(
    evolve(make_evolutionary_strategy,pure_random)
),pure_random)
#play the "real" game
nim = Nim(7)
logging.debug(f"status: Initial board  -> {nim}")
player = 0
while nim:
    ply = strategy[player](nim)
    nim.nimming(ply)
    logging.debug(f"status: After player {player} -> {nim}")
    player = 1 - player
winner = 1 - player
logging.info(f"status: Player {winner} won!")

Round 1: WR:0.89 with P:0.55
Round 2: WR:0.9 with P:0.6100000000000001
Round 3: WR:0.93 with P:0.682
Round 4: WR:0.94 with P:0.7684000000000001
Round 5: WR:0.93 with P:0.8684000000000001
Round 6: WR:0.99 with P:0.8684000000000001
Round 7: WR:0.97 with P:0.9284000000000001
Round 8: WR:1.0 with P:0.9284000000000001
Round 9: WR:0.97 with P:0.9884000000000002
Round 10: WR:0.99 with P:0.9884000000000002
Round 11: WR:0.91 with P:1.0484000000000002
Round 12: WR:0.97 with P:1.0484000000000002
Round 13: WR:0.98 with P:1.1084000000000003
Round 14: WR:0.98 with P:1.1804000000000003
Round 15: WR:0.97 with P:1.2668000000000004
Round 16: WR:0.97 with P:1.2668000000000004
Round 17: WR:0.95 with P:1.3268000000000004
Round 18: WR:0.94 with P:1.3268000000000004
Round 19: WR:0.98 with P:1.2768000000000004
Round 20: WR:0.97 with P:1.1768000000000003
Round 21: WR:0.99 with P:1.1768000000000003
Round 22: WR:0.99 with P:1.0768000000000002
Round 23: WR:0.94 with P:0.9568000000000002
Round 24: WR:0.97 with P:0

DEBUG:root:status: Initial board  -> <1 3 5 7 9 11 13>
DEBUG:root:status: After player 0 -> <0 3 5 7 9 11 13>
DEBUG:root:status: After player 1 -> <0 3 5 7 2 11 13>
DEBUG:root:status: After player 0 -> <0 3 5 7 0 11 13>
DEBUG:root:status: After player 1 -> <0 3 5 7 0 11 11>
DEBUG:root:status: After player 0 -> <0 3 1 7 0 11 11>
DEBUG:root:status: After player 1 -> <0 3 0 7 0 11 11>
DEBUG:root:status: After player 0 -> <0 1 0 7 0 11 11>
DEBUG:root:status: After player 1 -> <0 1 0 7 0 3 11>
DEBUG:root:status: After player 0 -> <0 0 0 7 0 3 11>
DEBUG:root:status: After player 1 -> <0 0 0 2 0 3 11>
DEBUG:root:status: After player 0 -> <0 0 0 0 0 3 11>
DEBUG:root:status: After player 1 -> <0 0 0 0 0 2 11>
DEBUG:root:status: After player 0 -> <0 0 0 0 0 0 11>
DEBUG:root:status: After player 1 -> <0 0 0 0 0 0 0>
INFO:root:status: Player 1 won!


Round 100: WR:0.88 with P:0.5112000000000008


## P3 -> MinMax Approach

In [45]:

def get_val(board:Nim):
    #if there is only one row left you have won
    if sum(o > 0 for o in board.rows)==1:
        return 1
    else:
        return 0

boardCache=dict()

#wrapper for the recursive minmax_rec, used to discard val and only return ply
def minmax(board:Nim):
    return minmax_rec(board)[0]

def minmax_rec(board:Nim):
    #check if the state is already evaluated to avoid recomputation
    if board in boardCache.keys() :
        return boardCache[board]
    val = get_val(board)
    possible = [(r, o) for r, c in enumerate(board.rows) for o in range(1,c+1)] 
    #we sort the possible moves in decreasing order based on the number of elements removed
    possible.sort(reverse=True,key=lambda k: k[1]) 
    if val != 0:
        #if someone has won return the move and its val
        return possible[0],val
    evaluations = list()
    for ply in possible:
        newBoard=deepcopy(board) #return new board
        newBoard.nimming(ply)
        _, val = minmax_rec(newBoard)
        evaluations.append((ply, -val))
        if(val==-1):
            #as we are returning only the max we stop as soon as we get -1 (because it is saved as -val)
            break
    #we cache the evaluation of the current state
    boardCache[board]=max(evaluations, key=lambda k: k[1])
    return max(evaluations, key=lambda k: k[1])


### Example match

In [46]:
logging.getLogger().setLevel(logging.DEBUG)

NUM_ROWS = 6

strategy = ()
nim = Nim(NUM_ROWS)

#We check which player should start in order not to lose automatically against the optimal strategy
minmax_player = int(nim_sum(nim) == 0)
logging.debug(f"We are player {minmax_player}")
if minmax_player == 1:
    strategy = (optimal_startegy,minmax)
else:
    strategy = (minmax,optimal_startegy)

logging.debug(f"status: Initial board  -> {nim}")
player = 0
while nim:
    ply = strategy[player](nim)
    nim.nimming(ply)
    logging.debug(f"status: After player {player} -> {nim}")
    player = 1 - player
winner = 1 - player

logging.info(f"status: Player {winner} won!")

DEBUG:root:We are player 0
DEBUG:root:status: Initial board  -> <1 3 5 7 9 11>
DEBUG:root:status: After player 0 -> <1 1 5 7 9 11>
DEBUG:root:status: After player 1 -> <1 1 5 7 9 8>
DEBUG:root:status: After player 0 -> <1 1 5 4 9 8>
DEBUG:root:status: After player 1 -> <1 1 5 4 5 8>
DEBUG:root:status: After player 0 -> <1 1 5 4 5 4>
DEBUG:root:status: After player 1 -> <1 1 5 3 5 4>
DEBUG:root:status: After player 0 -> <1 1 2 3 5 4>
DEBUG:root:status: After player 1 -> <1 1 2 3 5 2>
DEBUG:root:status: After player 0 -> <1 1 2 3 3 2>
DEBUG:root:status: After player 1 -> <1 1 2 3 3 0>
DEBUG:root:status: After player 0 -> <1 1 0 3 3 0>
DEBUG:root:status: After player 1 -> <1 1 0 2 3 0>
DEBUG:root:status: After player 0 -> <0 1 0 2 3 0>
DEBUG:root:status: After player 1 -> <0 0 0 2 3 0>
DEBUG:root:status: After player 0 -> <0 0 0 2 2 0>
DEBUG:root:status: After player 1 -> <0 0 0 2 1 0>
DEBUG:root:status: After player 0 -> <0 0 0 1 1 0>
DEBUG:root:status: After player 1 -> <0 0 0 1 0 0>
DE