Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: Policy Search

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The player **taking the last object wins**.

* Task3.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task3.2: An agent using evolved rules
* Task3.3: An agent using minmax
* Task3.4: An agent using reinforcement learning

## Instructions

* Create the directory `lab3` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed) in it

## Notes

* Working in groups is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.

## Deadlines ([AoE](https://en.wikipedia.org/wiki/Anywhere_on_Earth))

* Sunday, December 4th for Task3.1 and Task3.2
* Sunday, December 11th for Task3.3 and Task3.4
* Sunday, December 18th for all reviews

In [81]:
import logging
from collections import namedtuple
import random
from typing import Callable
from copy import deepcopy
from itertools import accumulate
from operator import xor
from matplotlib import pyplot as plt
import numpy as np
import math
import json
import os


# Notebook-wide logging setup: messages are prefixed with an HH:MM:SS timestamp
# and the level name. The threshold is INFO, so logging.debug() calls are
# suppressed unless this level is lowered.
logging.basicConfig(
    format="[%(asctime)s] %(levelname)s: %(message)s",
    datefmt="%H:%M:%S",
    level=logging.INFO,
)

## The *Nim* and *Nimply* classes

In [82]:
# A single move: take `num_objects` objects from the row with index `row`.
Nimply = namedtuple("Nimply", ["row", "num_objects"])

In [83]:
class Nim:
    """State of a Nim board.

    Row ``i`` (0-based) starts with ``2 * i + 1`` objects. ``k`` is the optional
    upper bound on how many objects a single move may remove (``None`` means
    unbounded). ``sticks`` is the total number of objects at construction time;
    it is not updated by moves.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [2 * index + 1 for index in range(num_rows)]
        self._k = k
        # Remember the initial total so callers can measure game progress.
        self._sticks = sum(self._rows)

    def __bool__(self):
        """Return True while at least one object is left on the board."""
        remaining = sum(self._rows)
        return remaining > 0

    def __str__(self):
        """Render the board as ``<c0 c1 ...>`` with the current row counts."""
        return f"<{' '.join(map(str, self._rows))}>"

    @property
    def rows(self) -> tuple:
        """Current row counts as an immutable snapshot."""
        return tuple(self._rows)

    @property
    def k(self) -> int:
        """Maximum number of objects removable per move, or None."""
        return self._k

    @property
    def sticks(self) -> int:
        """Total number of objects the board started with."""
        return self._sticks

    def nimming(self, ply: Nimply) -> None:
        """Apply the move ``ply`` (row index, number of objects) in place.

        Raises:
            AssertionError: if the row holds fewer objects than requested, or
                if the request exceeds ``k`` when ``k`` is set.
        """
        row, num_objects = ply
        assert num_objects <= self._rows[row]
        if self._k is not None:
            assert num_objects <= self._k
        self._rows[row] -= num_objects

In [84]:
def save_strategy(strategy: dict) -> None:
    """Dump *strategy* as JSON into ``strategies/<n>.json``.

    ``<n>`` is one more than the highest integer file stem already present in
    the directory, or 0 when there is none. The directory is created if it
    does not exist, and files whose stem is not an integer are ignored when
    choosing the number.

    Args:
        strategy: JSON-serialisable parameter dictionary.
    """
    DATA_DIR = "strategies/"
    # First use: make sure the target directory exists instead of crashing.
    os.makedirs(DATA_DIR, exist_ok=True)

    used_numbers = []
    for filename in os.listdir(DATA_DIR):
        try:
            used_numbers.append(int(filename.split(".")[0]))
        except ValueError:
            # Unrelated file (e.g. a README or a hidden file): skip it.
            continue

    current_s = max(used_numbers) + 1 if used_numbers else 0
    with open(os.path.join(DATA_DIR, f"{current_s}.json"), "w") as fp:
        json.dump(strategy, fp)

## Sample (and silly) strategies

In [85]:
def pure_random(state: Nim) -> Nimply:
    """Pick a random non-empty row and remove a random legal number of objects.

    The amount is drawn uniformly from 1 up to the row's size, capped at
    ``state.k`` when a per-move bound is set so the move is always accepted
    by ``Nim.nimming``.
    """
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    upper = state.rows[row]
    if state.k is not None:
        upper = min(upper, state.k)
    num_objects = random.randint(1, upper)
    return Nimply(row, num_objects)

In [86]:
def gabriele(state: Nim) -> Nimply:
    """Take as many objects as allowed from the non-empty row with the lowest index.

    Without a per-move bound the whole row is taken; when ``state.k`` is set
    the amount is capped at ``k`` so the move stays legal.

    Raises:
        ValueError: if every row is empty.
    """
    row = min(r for r, c in enumerate(state.rows) if c > 0)
    num_objects = state.rows[row]
    if state.k is not None:
        num_objects = min(num_objects, state.k)
    return Nimply(row, num_objects)

In [87]:
def aggressive(state: Nim) -> Nimply:
    """Empty the longest row when the number of non-empty rows is odd.

    With an even number of non-empty rows a uniformly random legal move is
    played instead. When ``state.k`` is set, the aggressive move is capped at
    ``k`` objects so that it remains legal.
    """
    data = cook_status(state)
    if data['active_rows_number'] % 2 == 0:
        # random move (possible_moves already honours the k bound)
        row, num_objects = random.choice(data['possible_moves'])
    else:
        # aggressive move: as much as possible from the longest row
        row = data['longest_row']
        num_objects = state.rows[row]
        if state.k is not None:
            num_objects = min(num_objects, state.k)
    return Nimply(row, num_objects)

In [88]:
def nim_sum(state: Nim) -> int:
    """Return the nim-sum of the board: the bitwise XOR of all row counts.

    A board with no rows has nim-sum 0 (the XOR identity) instead of raising.
    """
    result = 0
    for count in state.rows:
        result ^= count
    return result


def cook_status(state: Nim, complete=False) -> dict:
    """Summarise a board into the features used by the strategies.

    Returned keys:
        possible_moves: every legal ``(row, amount)`` pair, honouring ``state.k``.
        active_rows_number: how many rows still hold objects.
        shortest_row: index of the first non-empty row with the fewest objects.
        longest_row: index of the first row with the most objects.
        completation: objects left divided by ``state.sticks``.
        nim_sum, brute_force: only when ``complete`` is true; the current
            nim-sum and, for each possible move, ``(move, nim-sum after it)``.

    Raises:
        ValueError: if no row holds any object.
    """
    rows = state.rows
    limit = state.k

    moves = []
    for row_index, count in enumerate(rows):
        for amount in range(1, count + 1):
            if limit is None or amount <= limit:
                moves.append((row_index, amount))

    non_empty = [(row_index, count) for row_index, count in enumerate(rows) if count > 0]

    summary = {
        "possible_moves": moves,
        "active_rows_number": len(non_empty),
        "shortest_row": min(non_empty, key=lambda pair: pair[1])[0],
        "longest_row": rows.index(max(rows)),
        "completation": sum(rows) / state.sticks,
    }

    if complete:
        summary["nim_sum"] = nim_sum(state)
        outcomes = []
        for move in moves:
            # Try the move on a throw-away copy so the real board is untouched.
            scratch = deepcopy(state)
            scratch.nimming(move)
            outcomes.append((move, nim_sum(scratch)))
        summary["brute_force"] = outcomes
    return summary

In [89]:
def optimal_strategy(state: Nim) -> Nimply:
    """Play a move that leaves a nim-sum of zero, or a random move if none exists.

    The returned move is the plain ``(row, amount)`` tuple produced by
    ``cook_status``.
    """
    candidates = cook_status(state, complete=True)["brute_force"]
    # The fallback is drawn up front, whether or not it ends up being used.
    fallback = random.choice(candidates)
    for candidate in candidates:
        move, resulting_nim_sum = candidate[0], candidate[1]
        if resulting_nim_sum == 0:
            return move
    return fallback[0]

def human(state: Nim) -> Nimply:
    """Show the board and read a move (row index, then amount) from standard input.

    Raises:
        ValueError: if either answer is not an integer.
    """
    print(f"Current board: {state}")
    chosen_row = int(input("Row: "))
    amount = int(input("Num objects: "))
    return Nimply(row=chosen_row, num_objects=amount)

In [90]:
# Example parameter set: {'max_k': 1, 'turn_strategy': .6}
# Concept of aggressiveness -> take everything from the row.

def evolvable_strategy(parameter: dict) -> Callable:
    """Build a deterministic two-phase strategy driven by *parameter*.

    Keys read from *parameter*:
        turn_strategy: fraction of objects still on the board at or above
            which the early-game rule applies.
        max_k: the early-game rule removes at most ``max_k + 1`` objects.
            Presumably non-negative (``tweak`` keeps it so); ``-1`` would
            divide by zero.

    Returns:
        A callable mapping a ``Nim`` state to a ``Nimply`` move. The move
        always removes at least one object and, when ``state.k`` is set,
        never more than ``k``.
    """
    def evolvable(state: Nim) -> Nimply:
        data = cook_status(state)
        if data['completation'] >= parameter['turn_strategy']:
            # Early strategy: nibble the longest row, taking between 1 and
            # max_k + 1 objects and never more than the row holds.
            row = data['longest_row']
            num_objects = (state.rows[row] - 1) % (parameter['max_k'] + 1) + 1
        else:
            # Late strategy: take the maximum amount of the shortest row.
            row = data['shortest_row']
            num_objects = state.rows[row]
        if state.k is not None:
            # Cap at the per-move bound; a modulo here could yield 0 objects,
            # which is not a legal move.
            num_objects = min(num_objects, state.k)
        return Nimply(row, num_objects)
    return evolvable

def evolvable_random_strategy(parameter: dict) -> Callable:
    """Build a two-phase strategy that plays randomly early and greedily late.

    Only ``parameter['turn_strategy']`` is read: while the fraction of objects
    still on the board is at or above it, a random amount is taken from the
    longest row; afterwards the shortest row is emptied.

    Returns:
        A callable mapping a ``Nim`` state to a ``Nimply`` move. The move
        always removes at least one object and, when ``state.k`` is set,
        never more than ``k``.
    """
    def evolvable(state: Nim) -> Nimply:
        data = cook_status(state)
        early_game = data['completation'] >= parameter['turn_strategy']
        row = data['longest_row'] if early_game else data['shortest_row']

        most = state.rows[row]
        if state.k is not None:
            most = min(most, state.k)

        if early_game:
            # At first, just play random; start at 1 because removing zero
            # objects is not a move.
            num_objects = random.randint(1, most)
        else:
            num_objects = most
        return Nimply(row, num_objects)
    return evolvable

In [91]:
NUM_MATCHES = 100  # games played per evaluation
NIM_SIZE = 11  # number of rows of each evaluation board


def evaluate_against(strategy: Callable, against: Callable) -> float:
    """Return the fraction of ``NUM_MATCHES`` games won by *strategy*.

    Every game is played on a fresh ``Nim(NIM_SIZE)`` board, *strategy* always
    moves first, and whoever removes the last object wins.
    """
    contenders = (strategy, against)
    victories = 0
    for _ in range(NUM_MATCHES):
        board = Nim(NIM_SIZE)
        turn = 0
        while board:
            board.nimming(contenders[turn](board))
            turn = 1 - turn
        # `turn` was flipped after the final move, so turn == 1 means that
        # contender 0 (i.e. `strategy`) took the last object.
        if turn == 1:
            victories += 1
    return victories / NUM_MATCHES

## Task 3.1 - Expert system
To solve this task, we introduced a number of hard-coded strategies and evaluated them against each other.

In [126]:
# Players
players = [gabriele, pure_random, aggressive]
scores = [[evaluate_against(player, opponent) for opponent in players] for player in players]

for i, x in enumerate(scores):
    for j, y in enumerate(x):
        print(f" {players[i].__name__} against {players[j].__name__}: {y}")


 gabriele against gabriele: 1.0
 gabriele against pure_random: 0.84
 gabriele against aggressive: 0.0
 pure_random against gabriele: 0.24
 pure_random against pure_random: 0.53
 pure_random against aggressive: 0.25
 aggressive against gabriele: 1.0
 aggressive against pure_random: 0.75
 aggressive against aggressive: 0.4


## Task 3.2
Here, we evolve some strategies


### Evolve the strategy

In [92]:
def tweak(sol: dict) -> dict:
    """Return a randomly perturbed copy of the parameter set *sol*.

    ``turn_strategy`` receives Gaussian noise (sigma 0.05), is reflected to be
    non-negative and clamped to at most 1. ``max_k`` receives truncated
    Gaussian noise (sigma 0.5) and is reflected to be non-negative.

    The input dictionary is left untouched, so a caller can still discard the
    candidate and keep its current solution.
    """
    tweaked = dict(sol)

    turn_strategy = abs(tweaked['turn_strategy'] + np.random.normal(0, .05))
    if turn_strategy > 1:
        turn_strategy = 1
    tweaked['turn_strategy'] = turn_strategy

    tweaked['max_k'] = abs(tweaked['max_k'] + math.trunc(np.random.normal(0, .5)))

    return tweaked

def hill_climber(to_be_evolved: Callable):
    """Tune the parameters of a strategy factory by hill climbing.

    Starting from ``{'max_k': 1, 'turn_strategy': .6}``, each step tweaks the
    current solution, scores the resulting strategy against ``pure_random``
    and keeps the candidate when it scores at least as well as the best score
    so far. The search stops after 100 consecutive rejected candidates or
    once the iteration counter exceeds 1000.

    Args:
        to_be_evolved: factory that maps a parameter dict to a strategy.

    Returns:
        A ``(strategy, parameters)`` tuple for the best solution found.
    """
    unuseful_steps = 0
    solution = {'max_k': 1, 'turn_strategy': .6}
    iteration = 0
    last_score = 0.0
    while unuseful_steps < 100 and iteration <= 1000:
        unuseful_steps += 1
        # Tweak a copy: the candidate must not alias the current solution,
        # otherwise a rejected candidate would still overwrite it.
        new_sol = tweak(dict(solution))
        new_strategy = to_be_evolved(new_sol)
        score = evaluate_against(new_strategy, pure_random)
        if score >= last_score:
            last_score = score
            solution = new_sol
            unuseful_steps = 0
        iteration += 1
    return to_be_evolved(solution), solution

# Evolve both strategy families; each call returns the tuned strategy
# callable together with its parameter dictionary.
f1, sol1 = hill_climber(evolvable_strategy)
f2, sol2 = hill_climber(evolvable_random_strategy)

# plt.plot(range(len(fitness_log)), fitness_log)
# plt.show()

In [107]:
# Re-evaluate each evolved strategy against the random player.
score1 = evaluate_against(f1, pure_random)
score2 = evaluate_against(f2, pure_random)

# Log the evolved parameters followed by the win rate as a percentage.
logging.info(f"{sol1}")
logging.info(f"S1: score =  {score1:.1%}")
logging.info(f"{sol2}")
logging.info(f"S2 score = : {score2:.1%}")

# Head-to-head: win rate of f1 (moving first) against f2, shown as the cell's value.
evaluate_against(f1, f2)

[18:13:47] INFO: {'max_k': 6, 'turn_strategy': 0.5053187055568902}
[18:13:47] INFO: S1: score =  91.0%
[18:13:47] INFO: {'max_k': 1, 'turn_strategy': 0.946127914495746}
[18:13:47] INFO: S2 score = : 97.0%


1.0

## Oversimplified match

In [94]:
# Single demonstration game: f1 is player 0 and moves first, pure_random is player 1.
strategy = (f1, pure_random)

nim = Nim(11)
logging.debug(f"status: Initial board  -> {nim}")
player = 0
# Alternate moves until the board is empty.
while nim:
    ply = strategy[player](nim)
    nim.nimming(ply)
    logging.debug(f"status: After player {player} -> {nim}")
    player = 1 - player
# `player` was flipped after the final move, so the one who took the last
# object (the winner) is the other player.
winner = 1 - player
logging.info(f"status: Player {winner} won!")

[18:12:49] INFO: status: Player 0 won!
