Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using rules evolved with an evolution strategy (ES)

## Instructions

* Create the directory `lab2` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed) in it

## Notes

* Working in groups is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [1]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
from tqdm import tqdm
from queue import PriorityQueue

## The *Nim* and *Nimply* classes

In [2]:
# A single move: take `num_objects` objects from the row at index `row`.
Nimply = namedtuple("Nimply", ["row", "num_objects"])


In [3]:
class Nim:
    """Board of a Nim game: a list of rows, each holding some objects.

    Row ``i`` starts with ``2 * i + 1`` objects, so ``Nim(3)`` prints as
    ``<1 3 5>``. An instance is truthy while at least one object is left.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        """Create a board with ``num_rows`` rows.

        ``k`` is the largest number of objects a single move may remove;
        ``None`` means that moves are not capped.
        """
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        """Snapshot (as a tuple) of the number of objects in each row."""
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        """Play ``ply`` in place: remove ``num_objects`` objects from ``row``.

        Illegal moves fail an assertion (``AssertionError``). Keep in mind
        that assertions are skipped when Python runs with ``-O``.
        """
        row, num_objects = ply
        # A move has to take at least one object: zero would let a player pass
        # forever, and a negative amount would put objects back on the board.
        assert num_objects >= 1
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects


## Sample (and silly) strategies

In [4]:
def pure_random(state: Nim) -> Nimply:
    """Return a random legal move.

    A non-empty row is drawn first, then the number of objects to take from
    it. The board must hold at least one object.
    """
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    # Respect the per-move cap: `Nim.nimming` rejects any move that takes more
    # than `k` objects, so never draw above it.
    cap = state._k
    most = state.rows[row] if cap is None else min(state.rows[row], cap)
    num_objects = random.randint(1, most)
    return Nimply(row, num_objects)


In [5]:
def gabriele(state: Nim) -> Nimply:
    """Take as many objects as allowed from the first non-empty row.

    "As many as allowed" is the whole row, or `k` objects when the board caps
    the size of a move and the row holds more than that.
    """
    cap = state._k
    possible_moves = [
        (r, o)
        for r, c in enumerate(state.rows)
        # Moves above the cap would be rejected by `Nim.nimming`.
        for o in range(1, (c if cap is None else min(c, cap)) + 1)
    ]
    # Smallest row index first (hence the minus sign), then the largest take.
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))


In [6]:
def adaptive(state: Nim) -> Nimply:
    """Placeholder for a strategy driven by tunable parameters.

    NOTE: unfinished. The parameters are created but never used and no move is
    returned, so a call yields ``None`` instead of a ``Nimply``.
    """
    genome = dict(love_small=0.5)


In [7]:
import numpy as np


def nim_sum(state: Nim) -> int:
    """Return the nim-sum of the board: the bitwise XOR of all row sizes.

    Plain integer XOR is used, so an empty board gives 0 and row sizes are not
    limited to a fixed number of bits.
    """
    result = 0
    for count in state.rows:
        result ^= count
    return result

def analize(raw: Nim) -> dict:
    """Evaluate every legal move available on ``raw``.

    Returns a dict with a single key, ``"possible_moves"``, mapping each legal
    ``Nimply`` to the nim-sum of the board obtained by playing it. ``raw`` is
    left untouched: every move is tried on a deep copy.

    Results are not cached. ``Nim`` defines neither ``__eq__`` nor
    ``__hash__``, so a dict keyed on the board object would only ever match
    that very same object; a cache would have to be keyed on ``raw.rows``.
    """
    cap = raw._k
    cooked = dict()
    cooked["possible_moves"] = dict()
    for r, c in enumerate(raw.rows):
        # Never enumerate takes above the per-move cap: `nimming` would reject
        # them and abort the whole analysis.
        most = c if cap is None else min(c, cap)
        for o in range(1, most + 1):
            ply = Nimply(r, o)
            tmp = deepcopy(raw)
            tmp.nimming(ply)
            cooked["possible_moves"][ply] = nim_sum(tmp)
    return cooked


def optimal(state: Nim) -> Nimply:
    """Pick at random among the moves that leave a non-zero nim-sum.

    When every legal move leaves a nim-sum of zero, the choice is made among
    all legal moves instead.

    NOTE(review): standard Nim theory favours moves that leave a nim-sum of 0
    to the opponent, while this filter keeps the opposite set. Confirm that
    this is intentional before relying on the name.
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    candidates = []
    for move, value in analysis["possible_moves"].items():
        if value != 0:
            candidates.append(move)
    if len(candidates) == 0:
        candidates = list(analysis["possible_moves"])
    return random.choice(candidates)


## Oversimplified match

In [8]:
# Lower the root logger threshold to INFO so the `logging.info` calls below
# are not filtered out.
logging.getLogger().setLevel(logging.INFO)

# strategy[0] makes the first move, strategy[1] the second, and so on.
strategy = (pure_random, optimal)

nim = Nim(5)
logging.info(f"init : {nim}")
player = 0
# Keep playing while objects are left (a `Nim` is falsy once it is empty).
while nim:
    ply = strategy[player](nim)
    logging.info(f"ply: player {player} plays {ply}")
    nim.nimming(ply)
    logging.info(f"status: {nim}")
    player = 1 - player
# The turn was handed over after the final move, so `player` now names the one
# who did NOT take the last object: that player is reported as the winner.
logging.info(f"status: Player {player} won!")


INFO:root:init : <1 3 5 7 9>
INFO:root:ply: player 0 plays Nimply(row=1, num_objects=3)
INFO:root:status: <1 0 5 7 9>
INFO:root:ply: player 1 plays Nimply(row=4, num_objects=9)
INFO:root:status: <1 0 5 7 0>
INFO:root:ply: player 0 plays Nimply(row=0, num_objects=1)
INFO:root:status: <0 0 5 7 0>
INFO:root:ply: player 1 plays Nimply(row=3, num_objects=4)
INFO:root:status: <0 0 5 3 0>
INFO:root:ply: player 0 plays Nimply(row=3, num_objects=3)
INFO:root:status: <0 0 5 0 0>
INFO:root:ply: player 1 plays Nimply(row=2, num_objects=4)
INFO:root:status: <0 0 1 0 0>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=1)
INFO:root:status: <0 0 0 0 0>
INFO:root:status: Player 1 won!


## Automatic comparison of strategies

In [51]:
def compare_strategies(plyr1_strat, plyr2_strat, nim_size, num_games=1000):
    """Play ``num_games`` games between two strategies and count the wins.

    Args:
        plyr1_strat: callable that receives the board and returns the move to
            play; this player makes the first move of every game.
        plyr2_strat: same kind of callable, for the player moving second.
        nim_size: number of rows of the board created for each game.
        num_games: how many games to play; defaults to 1000, the value that
            used to be hard-coded.

    Returns:
        The tuple ``(plyr1_victories, plyr2_victories)``. The same figures
        are also printed.
    """
    strategy = (plyr1_strat, plyr2_strat)
    victories = [0, 0]
    for _ in tqdm(range(num_games)):
        nim = Nim(nim_size)
        player = 0
        while nim:
            ply = strategy[player](nim)
            nim.nimming(ply)
            player = 1 - player
        # The turn was handed over after the final move, so `player` is the
        # one who did not take the last object: the winner of this game.
        victories[player] += 1

    plyr1_victories, plyr2_victories = victories
    results = (plyr1_victories, plyr2_victories)
    print("Victories -> Player 1 :", plyr1_victories, ", Player 2 :", plyr2_victories)
    return results

# Head-to-head between two of the rule-based agents on a 3-row board.
# Both agents are defined in later cells, so those cells must be run first.
compare_strategies(optimal_rule_based1, optimal_rule_based3, 3)

100%|██████████| 1000/1000 [00:07<00:00, 131.53it/s]

Victories -> Player 1 : 507 , Player 2 : 493





(507, 493)

## First task : Improve rule-based agent

In [11]:
def optimal_rule_based1(state: Nim) -> Nimply:
    """Prefer the moves that leave the opponent without any "good" reply.

    A move counts as "good" when the board it produces has a non-zero nim-sum.
    The choice is made at random, from the first non-empty group among:

    1. good moves after which none of the opponent's replies is good;
    2. good moves;
    3. all legal moves (used only when no good move exists).
    """
    outcomes = analize(state)["possible_moves"]
    good_moves = [move for move, value in outcomes.items() if value != 0]

    # Nothing good on offer: settle for a random legal move
    if not good_moves:
        return random.choice(list(outcomes.keys()))

    def opponent_is_stuck(move) -> bool:
        """Tell whether every reply to `move` leaves a nim-sum of zero."""
        board = deepcopy(state)
        board.nimming(move)
        replies = analize(board)["possible_moves"]
        return all(value == 0 for value in replies.values())

    best_moves = [move for move in good_moves if opponent_is_stuck(move)]

    # Fall back on the plain good moves when no move blocks the opponent
    return random.choice(best_moves if best_moves else good_moves)

In [12]:
def optimal_rule_based2(state: Nim) -> Nimply:
    """Play the good move that leaves the opponent the fewest good replies.

    A move counts as "good" when the board it produces has a non-zero nim-sum.
    Ties on the number of good replies are broken by the smallest
    ``(row, num_objects)``. Without any good move, a random legal move is
    returned instead.
    """
    outcomes = analize(state)["possible_moves"]
    good_moves = [move for move, value in outcomes.items() if value != 0]

    # Nothing good on offer: settle for a random legal move
    if not good_moves:
        return random.choice(list(outcomes.keys()))

    def ranked(move):
        """Pair `move` with the number of good replies it leaves."""
        board = deepcopy(state)
        board.nimming(move)
        replies = analize(board)["possible_moves"]
        good_replies = sum(1 for value in replies.values() if value != 0)
        return (good_replies, move)

    # The smallest pair is the move with the fewest good replies
    _, chosen = min(ranked(move) for move in good_moves)
    return chosen

In [13]:
def optimal_rule_based3(state: Nim) -> Nimply:
    """Play the move that leaves the opponent the fewest good replies.

    A move counts as "good" when the board it produces has a non-zero nim-sum.
    Good moves are considered first; when there is none, the same ranking is
    applied to all legal moves. Ties on the number of good replies are broken
    by the smallest ``(row, num_objects)``.

    Raises:
        ValueError: if the board is empty, i.e. there is no legal move.
    """
    outcomes = analize(state)["possible_moves"]
    candidates = [move for move, value in outcomes.items() if value != 0]

    # Without any good move, still rank whatever legal moves exist
    if not candidates:
        candidates = list(outcomes)
    if not candidates:
        # Fail fast: there is nothing to rank, and waiting on an empty queue
        # (as a blocking `PriorityQueue.get()` does) would never return.
        raise ValueError("no legal move: the board is empty")

    def ranked(move):
        """Pair `move` with the number of good replies it leaves."""
        board = deepcopy(state)
        board.nimming(move)
        replies = analize(board)["possible_moves"]
        good_replies = sum(1 for value in replies.values() if value != 0)
        return (good_replies, move)

    # The smallest pair is the move with the fewest good replies
    _, chosen = min(ranked(move) for move in candidates)
    return chosen

## Second task : create an evolutionary agent

In [143]:
class Evolutionary_Agent1 :
    """Rule-based player whose rule parameters are meant to be evolved.

    ``rules`` is an object array with one line per rule, laid out as
    ``[condition, threshold, row, num_objects]``:

    * ``condition`` is the text of a Python expression over ``nim`` that ends
      with ``<=``; the rule fires when ``condition threshold`` is true;
    * ``row`` and ``num_objects`` describe the move proposed by the rule.

    ``limit`` is the largest take considered when the rules are drawn: the
    board cap ``k`` when there is one, the size of the largest row otherwise.
    """

    def __init__(self, nim : Nim) -> None:
        self.limit = nim._k if nim._k is not None else np.max(nim.rows)
        last_row = len(nim.rows) - 1
        conditions = (
            'np.sum(nim.rows) <=',
            'np.sum([row for i,row in enumerate(nim.rows) if i % 2 == 1]) <=',
            'np.sum([row for i,row in enumerate(nim.rows) if i % 2 == 0]) <=',
        )
        # Every rule gets a random threshold, a random existing row and a
        # random take within `limit`.
        self.rules = np.array(
            [
                [
                    condition,
                    random.randint(1, 50),
                    random.randint(0, last_row),
                    random.randint(1, self.limit),
                ]
                for condition in conditions
            ],
            dtype=object,
        )

    @staticmethod
    def _is_legal(nim, ply) -> bool:
        """Tell whether `ply` can be played on `nim` without being rejected."""
        if not 0 <= ply.row < len(nim.rows):
            return False
        if not 1 <= ply.num_objects <= nim.rows[ply.row]:
            return False
        return nim._k is None or ply.num_objects <= nim._k

    def default_move(self, nim : Nim) :
        """Take one object from the first non-empty row."""
        # `argmax` over the boolean mask gives the index of the first True
        ply = Nimply(np.argmax(np.array(nim.rows) != 0), 1)
        return ply

    def strategy(self, nim : Nim) :
        """Return the move of the first rule that fires with a legal move.

        Rules are tried in order; when none of them applies, the default move
        is played.
        """
        for condition, threshold, target_row, amount in self.rules :
            # NOTE: `eval` reads the local `nim`. It is only ever fed the
            # condition strings written in `__init__`; never route untrusted
            # text through `rules`.
            if eval(condition + str(threshold)) :
                ply = Nimply(target_row, amount)
                # If the move is valid, make it
                if self._is_legal(nim, ply) :
                    return ply
        # If no valid move was found, make the default move
        return self.default_move(nim)


In [144]:
def evaluate_agent_fitness(agent : Evolutionary_Agent1) :
    """Fitness of ``agent``: its victories against ``optimal_rule_based3``.

    The match is delegated to ``compare_strategies`` on a 3-row board with the
    agent as player 1; the first of the two counts it returns is the fitness.

    NOTE(review): neither player appears to use randomness, so all the games
    of a match may well be identical. Confirm that the score can take values
    other than "all" or "none".
    """
    player1_wins = compare_strategies(agent.strategy, optimal_rule_based3, 3)[0]
    return player1_wins

def one_plus_lambda_evolve(agent : Evolutionary_Agent1, lamb : int, nim : Nim) :
    """Run one generation of a (1 + lambda) evolution strategy.

    ``lamb`` mutated copies of ``agent`` are created by adding truncated
    Gaussian noise to the numeric fields of every rule (threshold, row,
    number of objects). The parent and its offspring are then evaluated with
    ``evaluate_agent_fitness`` and the fittest individual is returned. On a
    tie the earliest one wins, so the parent is kept unless an offspring is
    strictly better.

    ``nim`` is only read to derive the valid range of each field.
    """
    num_rules = len(agent.rules)
    # TODO : Self-adaptive sigmas
    sigmas = np.ones((num_rules, 3))

    # Valid range of (threshold, row, num_objects).
    # The row gene indexes the rows of the board, so its bound comes from the
    # board and not from the number of rules. The take is bounded by the cap
    # `k` when there is one, otherwise by the size of the largest row.
    max_take = nim._k if nim._k is not None else max(nim.rows)
    lower = np.array([0, 0, 1])
    upper = np.array([sum(nim.rows), len(nim.rows) - 1, max_take])

    agents = [agent]
    for _ in range(lamb) :
        new_agent = deepcopy(agent)
        # Truncation towards zero keeps the genes integral
        mutations = np.random.normal(0.0, sigmas).astype(int)
        genes = new_agent.rules[:, 1:].astype(int) + mutations
        # Prevent invalid mutations (negative thresholds, missing rows,
        # empty or oversized takes)
        new_agent.rules[:, 1:] = np.clip(genes, lower, upper)
        agents.append(new_agent)

    fitness = [evaluate_agent_fitness(a) for a in agents]

    return agents[np.argmax(fitness)]

In [146]:
# Build an agent with random rules for a 3-row board, then run one evolution
# step that produces 10 offspring.
nim = Nim(3)
test = Evolutionary_Agent1(nim)
new_test = one_plus_lambda_evolve(test, 10, nim)

  0%|          | 0/1000 [00:00<?, ?it/s]

100%|██████████| 1000/1000 [00:03<00:00, 326.55it/s]


Victories -> Player 1 : 0 , Player 2 : 1000


100%|██████████| 1000/1000 [00:02<00:00, 353.65it/s]


Victories -> Player 1 : 0 , Player 2 : 1000


100%|██████████| 1000/1000 [00:02<00:00, 355.06it/s]


Victories -> Player 1 : 0 , Player 2 : 1000


100%|██████████| 1000/1000 [00:04<00:00, 213.94it/s]


Victories -> Player 1 : 0 , Player 2 : 1000


100%|██████████| 1000/1000 [00:04<00:00, 234.13it/s]


Victories -> Player 1 : 0 , Player 2 : 1000


100%|██████████| 1000/1000 [00:02<00:00, 412.12it/s]


Victories -> Player 1 : 0 , Player 2 : 1000


100%|██████████| 1000/1000 [00:02<00:00, 414.05it/s]


Victories -> Player 1 : 0 , Player 2 : 1000


100%|██████████| 1000/1000 [00:02<00:00, 407.45it/s]


Victories -> Player 1 : 0 , Player 2 : 1000


100%|██████████| 1000/1000 [00:02<00:00, 394.43it/s]


Victories -> Player 1 : 0 , Player 2 : 1000


100%|██████████| 1000/1000 [00:02<00:00, 385.72it/s]


Victories -> Player 1 : 0 , Player 2 : 1000


100%|██████████| 1000/1000 [00:02<00:00, 395.27it/s]

Victories -> Player 1 : 0 , Player 2 : 1000





In [109]:
# Scratch check: one N(0, 1) draw per numeric rule field, as a
# `len(test.rules)` x 3 array.
np.random.normal(np.zeros((len(test.rules),3)),np.ones((len(test.rules),3)))

array([[-0.36645553,  1.15063815, -1.38711325],
       [-1.03914014, -1.5501803 , -1.75795627],
       [ 1.10735406,  0.14152366, -0.49839743]])

In [97]:
# Scratch check: `argmax` over a boolean mask gives the index of the first
# non-zero entry.
tab = (0,3,0)
np.argmax(np.array(tab) != 0)

1