Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: Policy Search

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The player **taking the last object wins**.

* Task3.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task3.2: An agent using evolved rules
* Task3.3: An agent using minmax
* Task3.4: An agent using reinforcement learning

## Instructions

* Create the directory `lab3` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in group is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.

## Deadlines ([AoE](https://en.wikipedia.org/wiki/Anywhere_on_Earth))

* Sunday, December 4th for Task3.1 and Task3.2
* Sunday, December 11th for Task3.3 and Task3.4
* Sunday, December 18th for all reviews

In [72]:
import logging
from collections import namedtuple
from functools import *
import random
from typing import Callable
from copy import deepcopy
from itertools import *
from operator import *
import math
import matplotlib.pyplot as plt

logging.basicConfig(format='%(message)s', level=logging.DEBUG)

## The *Nim* and *Nimply* classes

In [73]:
Nimply = namedtuple("Nimply", "row, num_objects")

In [74]:
class Nim:
    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    def __hash__(self):
        return hash(tuple(self._rows))

    def __eq__(self, other):
        return self.__hash__() == hash(other)

    @property
    def rows(self) -> tuple:
        return tuple(self._rows)

    @property
    def k(self) -> int:
        return self._k

    def nimming(self, ply: Nimply) -> None:
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects

In [75]:
def evaluate(strategyA: Callable, strategyB: Callable, num_matches = 1, nim_size = 3, k = None) -> float:
    players = (strategyA, strategyB)
    won = 0

    for _ in range(num_matches):
        nim = Nim(nim_size, k)
        player = 1
        while nim:
            ply = players[player](nim)
            nim.nimming(ply)
            player = 1 - player
        if player == 1:
            won += 1
    return won / num_matches

## Optimal strategy

In [76]:
def nim_sum(state: Nim) -> int:
    *_, result = accumulate(state.rows, xor)
    return result


def cook_status(state: Nim) -> dict:
    cooked = dict()
    cooked["possible_moves"] = [
        (r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1) if state.k is None or o <= state.k
    ]
    cooked["active_rows_number"] = sum(o > 0 for o in state.rows)
    cooked["shortest_row"] = min((x for x in enumerate(state.rows) if x[1] > 0), key=lambda y: y[1])[0]
    cooked["longest_row"] = max((x for x in enumerate(state.rows)), key=lambda y: y[1])[0]
    cooked["nim_sum"] = nim_sum(state)

    brute_force = list()
    for m in cooked["possible_moves"]:
        tmp = deepcopy(state)
        tmp.nimming(m)
        brute_force.append((m, nim_sum(tmp)))
    cooked["brute_force"] = brute_force

    return cooked

In [77]:
def optimal_strategy(state: Nim) -> Nimply:
    data = cook_status(state)
    return next((bf for bf in data["brute_force"] if bf[1] == 0), random.choice(data["brute_force"]))[0]

## Random strategy

In [78]:
# Choose a random non empty row and remove a random number of objects smaller than min(k, row_objects)

def random_strategy(state: Nim):
    r = random.choice([idx for idx, r in enumerate(state.rows) if r > 0])
    num_objects = random.randint(1, min(state.rows[r], state.k) if state.k != None else state.rows[r])

    return (r, num_objects)

## Task 3.1: Fixed-Rule Strategy

In [79]:
# Among all possible moves, simply do this:
# - if there is a winning move, choose it
# - if there is not a winning move but the move puts the opponent in a winning situation, discard it
# - otherwise choose the first move possible, even if not optimal (obliged to do a move)

def fixed_strategy(state: Nim):
    possible_moves = ((r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1) if state.k is None or o <= state.k)
    move = None
    firstMove = None

    for m in possible_moves:
        if firstMove == None:
            firstMove = Nimply(m[0], m[1])
        tmp = deepcopy(state)
        tmp.nimming(m)
        if not tmp:
            return Nimply(m[0], m[1])
        else:
            active_rows = len([r for r in state.rows if r > 0])
            eliminable_rows = len([r for r in state.rows if r > 0 and (state.k == None or r < state.k)])
            if active_rows == eliminable_rows and eliminable_rows == 1:
                continue
            elif move == None:
                move = Nimply(m[0], m[1])

    return move if move != None else firstMove


## Task 3.2: Evolved Strategy

In [80]:
from functools import *
from statistics import *

GENOME_LENGTH = 11 # NB: GENOME_LENGTH must be of the form 4*n + 3 with n > 0, if n > 1 it may converge to the xor solution
GAME_PARS = list((a, b) for a, b in product(range(2, 10, 3), list(range(1, 10, 3)) + [None]) if b == None or b < a)
GAMES = len(GAME_PARS)

def decode_genome(genome):
    assert (GENOME_LENGTH - 3) % 4 == 0 and GENOME_LENGTH > 0, 'GENOME_LENGTH must be of the form 4*n + 3 with n > 0'
    out = ""

    for op_start in range(0, GENOME_LENGTH, 4):
        tmpA = 'a' if genome[op_start] < 0.5 else '!a'
        tmpB = 'b' if genome[op_start + 2] < 0.5 else '!b'
        internal_op = '&' if genome[op_start + 1] < 0.5 else '|'
        op = ('&' if genome[op_start + 3] < 0.5 else '|') if op_start + 3 < GENOME_LENGTH else ''
        out += f'({tmpA} {internal_op} {tmpB}) {op} '

    return out[:-2]

def evolvable_strategy(genome):
    assert (GENOME_LENGTH - 3) % 4 == 0 and GENOME_LENGTH > 0, 'GENOME_LENGTH must be of the form 4*n + 3 with n > 0'

    def genetic_op(a, b):
        result = 0
        op = lambda _, b: b

        for op_start in range(0, GENOME_LENGTH, 4):
            tmpA = a if genome[op_start] < 0.5 else ~a
            tmpB = b if genome[op_start + 2] < 0.5 else ~b
            internal_op = and_ if genome[op_start + 1] < 0.5 else or_
            result = op(result, internal_op(tmpA, tmpB))
            op = (and_ if genome[op_start + 3] < 0.5 else or_) if op_start + 3 < GENOME_LENGTH else None

        return result

    def strategy(state: Nim):
            possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1) if state.k is None or o <= state.k]
            best = None

            for m in possible_moves:
                tmp = deepcopy(state)
                tmp.nimming(m)
                
                val = reduce(genetic_op, tmp.rows)

                if best == None or best[1] > val:
                    best = (m, val)
                             
            return best[0]
    
    return strategy

def mutation(genome):
    point = random.randint(0, len(genome) - 1)
    return genome[:point] + [1 - genome[point]] + genome[point + 1:]

def crossover(genomeA, genomeB):
    p = random.random()
    return [x if p < 0.5 else y for x, y in zip(genomeA, genomeB)]

def tournament(population, tournament_size):
    return max(random.choices(population, k=tournament_size), key=lambda i: i.fitness)

def fitness(genome):
    win_optimal = 0.0
    win_random = 0.0

    for nim_size, k in GAME_PARS:
        home = evaluate(evolvable_strategy(genome), random_strategy, nim_size=nim_size, k=k) 
        away = 1 - evaluate(random_strategy, evolvable_strategy(genome), nim_size=nim_size, k=k)
        win_random += home + away
        home = evaluate(evolvable_strategy(genome), optimal_strategy, nim_size=nim_size, k=k) 
        away = 1 - evaluate(optimal_strategy, evolvable_strategy(genome), nim_size=nim_size, k=k)
        win_optimal += home + away

    return (win_optimal/(2*GAMES), win_random/(2*GAMES))
        
def genetic_algorithm():
    Individual = namedtuple('Individual', ('genome', 'fitness'))

    NUM_GENS = 100    
    POPULATION_SIZE = 10
    OFFSPRING_SIZE = 20
    TOURNAMENT_SIZE = 2
    USELESS_GENS = 0
    STEADY_STATE_LIMIT = 5

    population = [Individual(i, fitness(i)) for i in ([round(random.random(), 2) for _ in range(GENOME_LENGTH)] for _ in range(POPULATION_SIZE))]
    best = None
    
    for g in range(NUM_GENS):
        offspring = list()
        for i in range(OFFSPRING_SIZE):
            if random.random() < 0.3:
                p = tournament(population, tournament_size=TOURNAMENT_SIZE)
                o = mutation(p.genome)
            else:
                p1 = tournament(population, tournament_size=TOURNAMENT_SIZE)
                p2 = tournament(population, tournament_size=TOURNAMENT_SIZE)
                o = crossover(p1.genome, p2.genome)
            f = fitness(o)
            offspring.append(Individual(o, f))
        population += offspring
        population = sorted(population, key=lambda i: i.fitness, reverse=True)[:POPULATION_SIZE]
        newBest = max(population, key=lambda i: i.fitness)

        if best != None and newBest <= best:
            logging.info(f'Gen {g+1} skipped because useless')
            USELESS_GENS += 1
        else:
            logging.info(f'Gen {g+1}, found new best individual: {decode_genome(newBest.genome)} with fitness = {newBest.fitness}')
            best = newBest
            USELESS_GENS = 0
        
        if USELESS_GENS == STEADY_STATE_LIMIT:
            logging.info(f'Gen {g+1}, no improvements after {USELESS_GENS} gens, terminating...')
            break

    logging.info(f'Best individual: {decode_genome(best.genome)} with genome {best.genome} fitness = {best.fitness}')

    return evolvable_strategy(best.genome)

## Task 3.3: Min-Max Strategy

In [81]:
def minmax_strategy_no_pruning(state: Nim):
    def minmax(state: Nim, current_player, level = 0):
        possible_moves = ((r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1) if state.k is None or o <= state.k)
        current_func = min if current_player == 1 else max
        tabs = '\t' * level
        if not state:
            #logging.debug(f'{tabs} Level: {level}, Winner: {1 if level % 2 == 1 else -1}')
            return None, current_player

        evaluations = list()
        for m in possible_moves:
            tmp = deepcopy(state)
            tmp.nimming(m)
            #logging.debug(f"{tabs} Level: {level}, Move: {m}, State: {tmp}, Player: {-current_player}")
            _, val = minmax(tmp, -current_player, level + 1)
            evaluations.append((m, val))
            
        #logging.debug(f"{tabs} Level: {level}, Evals: {evaluations}, Player: {-current_player}")
        return current_func(evaluations, key=lambda k: k[1])
      

    move, _ = minmax(state, 1)

    return move

In [241]:
def minmax(max_depth=math.inf, print_stats = False):
    max_depth_reached = 0
    cache = {}
    hits = 0
    misses = 0

    def minmax_with_pruning(state: Nim, alpha, beta, current_player, depth=0):
        nonlocal max_depth_reached, cache, hits, misses
        max_depth_reached = max(max_depth_reached, depth)
                
        if not state or depth >= max_depth:
            return current_player, None # i.e. the loser
        
        possible_moves = ((r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1) if state.k is None or o <= state.k)
        value = current_player * math.inf, None

        if current_player == 1: # my turn, minimize
            for m in possible_moves:
                tmp = deepcopy(state)
                tmp.nimming(m)
                
                if (tmp, -current_player) not in cache:
                    val, _ = minmax_with_pruning(tmp, alpha, beta, -current_player, depth + 1)
                    cache.update({(tmp, -current_player): (val, None)}) 
                    misses += 1
                else:
                    val, _ = cache[(tmp, -current_player)]
                    hits += 1

                #logging.info(f'Move: {(val, m)} on {state} by player {current_player} at depth {depth}')
                
                value = min(value, (val, m))

                if value <= alpha:
                    break

                beta = min(beta, value)   
            return value
        else: # its turn, maximize
            for m in possible_moves:
                tmp = deepcopy(state)
                tmp.nimming(m)
                
                if (tmp, -current_player) not in cache:
                    val, _ = minmax_with_pruning(tmp, alpha, beta, -current_player, depth + 1)
                    cache.update({(tmp, -current_player): (val, None)}) 
                    misses += 1
                else:
                    val, _ = cache[(tmp, -current_player)]
                    hits += 1
                
                value = max(value, (val, m))
                
                if value >= beta:
                    break

                alpha = max(alpha, value)    
            return value

    def minmax_strategy_with_pruning(state: Nim):
        _, move = minmax_with_pruning(state, (-math.inf, None), (math.inf, None), 1)
        nonlocal max_depth_reached, hits, misses
        
        if print_stats:        
            logging.debug(f'Max depth reached: {max_depth_reached}, Cache hit ration: {round(hits/(hits+misses), 3)*100}% ({hits}/{hits+misses})')

        return move

    return minmax_strategy_with_pruning

## Task 3.4: Reinforcement Learning Strategy

In [209]:
def reinforcement_learning(initial_state: Nim, training_epochs = 100, randomness = 0.3, learning_rate = 0.15, max_depth = math.inf, plot_stats = False):
    def rl_strategy(state: Nim):
        return choose_next_move(state, 0)

    def choose_next_move(state: Nim, randomness):
        allowed_moves = ((r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1) if state.k is None or o <= state.k)
        maxQ = -10e15
        next_move = None
        randomN = random.random()
        if randomN < randomness:
            next_move = random.choice(list(allowed_moves))
            new_state = deepcopy(state)
            new_state.nimming(next_move)
            Q.update({ new_state: random.random() }) if new_state not in Q else None
        else:
            for move in allowed_moves:
                new_state = deepcopy(state)
                new_state.nimming(move)
                Q.update({ new_state: random.random() }) if new_state not in Q else None

                if Q[new_state] >= maxQ:
                    next_move = move
                    maxQ = Q[new_state]

        return next_move

    def eval_state(state: Nim):
        return 0 if state else 1

    def learn(states_history, randomness, learning_rate):
        target = 0

        for prev, reward in reversed(states_history):
            Q[prev] += learning_rate * (target - Q[prev])
            target += reward

        states_history = []
        randomness -= 10e-5  
    
    current_state = deepcopy(initial_state)
    Q = {}
    states_history = []
    moveHistory = []
    indices = []
    depth = 0    

    for i in range(training_epochs):
        while current_state:
            move = choose_next_move(deepcopy(current_state), randomness)
            current_state.nimming(move)  
            reward = eval_state(deepcopy(current_state))
            depth += 1
            states_history.append((deepcopy(current_state), reward))
            if depth >= max_depth:
                break
        learn(states_history, randomness, learning_rate)  
        if plot_stats and i % 50 == 0:
            moveHistory.append(depth)
            indices.append(i)
        current_state = deepcopy(initial_state)
        depth = 0

    if plot_stats:
        logging.getLogger().setLevel(logging.ERROR)
        plt.semilogy(indices, moveHistory, "b")
        logging.getLogger().setLevel(logging.DEBUG)

    return rl_strategy

## Oversimplified match

In [242]:
nim = Nim(6, None)
logging.debug(f"status: Initial board  -> {nim}")
player = 0
strategy = (minmax(), optimal_strategy)

while nim:
    ply = strategy[player](nim) 
    nim.nimming(ply)
    logging.debug(f"status: After player {player} -> {nim}")
    player = 1 - player
winner = 1 - player
logging.info(f"status: Player {winner} won!")


status: Initial board  -> <1 3 5 7 9 11>
status: After player 0 -> <0 3 5 7 9 11>
status: After player 1 -> <0 0 5 7 9 11>
status: After player 0 -> <0 0 4 7 9 11>
status: After player 1 -> <0 0 4 6 9 11>
status: After player 0 -> <0 0 3 6 9 11>
status: After player 1 -> <0 0 3 1 9 11>
status: After player 0 -> <0 0 3 1 8 11>
status: After player 1 -> <0 0 2 1 8 11>
status: After player 0 -> <0 0 2 0 8 11>
status: After player 1 -> <0 0 2 0 8 10>
status: After player 0 -> <0 0 2 0 7 10>
status: After player 1 -> <0 0 2 0 7 5>
status: After player 0 -> <0 0 1 0 7 5>
status: After player 1 -> <0 0 1 0 4 5>
status: After player 0 -> <0 0 0 0 4 5>
status: After player 1 -> <0 0 0 0 4 4>
status: After player 0 -> <0 0 0 0 3 4>
status: After player 1 -> <0 0 0 0 3 3>
status: After player 0 -> <0 0 0 0 2 3>
status: After player 1 -> <0 0 0 0 2 2>
status: After player 0 -> <0 0 0 0 1 2>
status: After player 1 -> <0 0 0 0 1 1>
status: After player 0 -> <0 0 0 0 0 1>
status: After player 1 -> <0

## Benchmarks

In [215]:
games = list((a, b) for a, b in product(range(2, 10), list(range(1, 10)) + [None]) if b == None or b < a)
opponents = [('Optimal', optimal_strategy), ('Random', random_strategy)]
strategies = [('MinMax', minmax(5))] # [('Fixed', fixed_strategy), ('Evolved', evolvable_strategy([0, 0, 1, 0, 1, 1, 0, 1, 0, 1, 0])), ('MinMax', minmax(5)), ('RL', reinforcement_learning)]
scores = {a[0]: [0.0 for _ in strategies] for a in opponents}

for idx, pars in enumerate(games):
    nim_size, k = pars
    
    logging.debug(f'Game {idx}: Nim({nim_size}, {k})')
    
    for idx, s in enumerate(strategies):
        strategy_name, strategy = s
        strategy = strategy if strategy_name !='RL'else strategy(Nim(nim_size, k), max_depth=20) # Necessary to train on new type of nim
        for opponent_name, opponent in opponents:
            strategy = strategy if strategy_name !='MinMax'else minmax(5) # Necessary to reset cache
            scores[opponent_name][idx] += evaluate(strategy, opponent, nim_size=nim_size, k=k) 
            strategy = strategy if strategy_name !='MinMax'else minmax(5) # Necessary to reset cache
            scores[opponent_name][idx] += 1 - evaluate(opponent, strategy, nim_size=nim_size, k=k) 

for idx, strategy in enumerate(strategies):
    for opponent_name, _ in opponents:
        logging.info(f'{strategy[0]} strategy win rate against {opponent_name} strategy was {round(scores[opponent_name][idx] * 100 / (2 * len(games)), 2)} % ({scores[opponent_name][idx]}/{2 * len(games)})')

Game 0: Nim(2, 1)
Game 1: Nim(2, None)
Game 2: Nim(3, 1)
Game 3: Nim(3, 2)
Game 4: Nim(3, None)
Game 5: Nim(4, 1)
Game 6: Nim(4, 2)
Game 7: Nim(4, 3)
Game 8: Nim(4, None)
Game 9: Nim(5, 1)
Game 10: Nim(5, 2)
Game 11: Nim(5, 3)
Game 12: Nim(5, 4)
Game 13: Nim(5, None)
Game 14: Nim(6, 1)
Game 15: Nim(6, 2)
Game 16: Nim(6, 3)
Game 17: Nim(6, 4)
Game 18: Nim(6, 5)
Game 19: Nim(6, None)
Game 20: Nim(7, 1)
Game 21: Nim(7, 2)
Game 22: Nim(7, 3)
Game 23: Nim(7, 4)
Game 24: Nim(7, 5)
Game 25: Nim(7, 6)
Game 26: Nim(7, None)
Game 27: Nim(8, 1)
Game 28: Nim(8, 2)
Game 29: Nim(8, 3)
Game 30: Nim(8, 4)
Game 31: Nim(8, 5)
Game 32: Nim(8, 6)
Game 33: Nim(8, 7)
Game 34: Nim(8, None)
Game 35: Nim(9, 1)


KeyboardInterrupt: 