Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: Policy Search

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The player **taking the last object wins**.

* Task3.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task3.2: An agent using evolved rules
* Task3.3: An agent using minmax
* Task3.4: An agent using reinforcement learning

## Instructions

* Create the directory `lab3` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in group is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.

## Deadlines ([AoE](https://en.wikipedia.org/wiki/Anywhere_on_Earth))

* Sunday, December 4th for Task3.1 and Task3.2
* Sunday, December 11th for Task3.3 and Task3.4
* Sunday, December 18th for all reviews

In [1]:
import logging
from collections import namedtuple
import random
from typing import Callable
from copy import deepcopy
from itertools import accumulate, product
from operator import xor


## The *Nim* and *Nimply* classes

In [2]:
Nimply = namedtuple("Nimply", "row, num_objects")


In [3]:
class Nim:
    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    def assign_rows(self, rows):
        self._rows = list(rows)
        return self

    @property
    def rows(self) -> tuple:
        return tuple(self._rows)

    @property
    def k(self) -> int:
        return self._k

    def nimming(self, ply: Nimply) -> None:
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects

    def game_over(self) -> bool:
        return sum(self._rows) == 0
        
def active_rows(state: Nim) -> int:
    return sum(o > 0 for o in state.rows)


In [4]:
def nim_sum(state: Nim) -> int:
    *_, result = accumulate(state.rows, xor)
    return result


def cook_status(state: Nim) -> dict:
    cooked = dict()
    cooked["possible_moves"] = [
        (r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1) if state.k is None or o <= state.k
    ]
    cooked["active_rows_number"] = sum(o > 0 for o in state.rows)
    cooked["shortest_row"] = min((x for x in enumerate(state.rows) if x[1] > 0), key=lambda y: y[1])[0]
    cooked["longest_row"] = max((x for x in enumerate(state.rows)), key=lambda y: y[1])[0]
    cooked["nim_sum"] = nim_sum(state)

    brute_force = list()
    for m in cooked["possible_moves"]:
        tmp = deepcopy(state)
        tmp.nimming(m)
        brute_force.append((m, nim_sum(tmp)))
    cooked["brute_force"] = brute_force

    return cooked

In [5]:
def optimal_strategy(state: Nim) -> Nimply:
    data = cook_status(state)
    return next((bf for bf in data["brute_force"] if bf[1] == 0), random.choice(data["brute_force"]))[0]

In [6]:
def pure_random(state: Nim) -> Nimply:
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = random.randint(1, state.rows[row])
    return Nimply(row, num_objects)

In [7]:
NUM_MATCHES = 100
NIM_SIZE = 4


def evaluate(strategy1: Callable, strategy2: Callable, nim_size=NIM_SIZE) -> float:
    opponent = (strategy1, strategy2)
    won = 0

    for m in range(NUM_MATCHES):
        nim = Nim(nim_size)
        player = 0
        while nim:
            ply = opponent[player](nim)
            nim.nimming(ply)
            player = 1 - player
        if player == 1:
            won += 1
    return won / NUM_MATCHES


# Task 3.1

## Fixed rules

In [226]:
def pick_maximum_from_highest_row(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the highest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows)
                      for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (m[0], m[1])))

evaluate(pick_maximum_from_highest_row, pure_random)


0.63

In [232]:
def pick_minimum_from_lowest_row(state: Nim) -> Nimply:
    """Pick always the minimum possible number of the lowest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows)
                      for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], -m[1])))

evaluate(pick_minimum_from_lowest_row, pure_random)


0.42

In [236]:
def pick_maximum_from_lowest_row(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows)
                      for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (m[0], -m[1])))

evaluate(pick_maximum_from_lowest_row, pure_random)


0.44

In [377]:
def pick_minimum_from_highest_row(state: Nim) -> Nimply:
    """Pick always the minimum possible number of the highest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows)
                      for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))

evaluate(pick_minimum_from_highest_row, pure_random)


0.8

In [372]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 == 0:
        return pick_maximum_from_highest_row(state)
    else:
        return pick_minimum_from_lowest_row(state)


evaluate(count_and_decide, pure_random)


0.27

In [407]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 != 0:
        return pick_maximum_from_highest_row(state)
    else:
        return pick_minimum_from_lowest_row(state)

evaluate(count_and_decide, pure_random)


0.85

In [273]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 == 0:
        return pick_maximum_from_highest_row(state)
    else:
        return pick_maximum_from_lowest_row(state)


evaluate(count_and_decide, pure_random)


0.31

In [417]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 != 0:
        return pick_maximum_from_highest_row(state)
    else:
        return pick_maximum_from_lowest_row(state)

evaluate(count_and_decide, pure_random)


0.85

In [364]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 == 0:
        return pick_maximum_from_highest_row(state)
    else:
        return pick_minimum_from_highest_row(state)


evaluate(count_and_decide, pure_random)


0.53

In [422]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 != 0:
        return pick_maximum_from_highest_row(state)
    else:
        return pick_minimum_from_highest_row(state)


evaluate(count_and_decide, pure_random)


0.73

In [380]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 == 0:
        return pick_minimum_from_lowest_row(state)
    else:
        return pick_maximum_from_lowest_row(state)


evaluate(count_and_decide, pure_random)


0.37

In [382]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 != 0:
        return pick_minimum_from_lowest_row(state)
    else:
        return pick_maximum_from_lowest_row(state)


evaluate(count_and_decide, pure_random)


0.37

In [446]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 == 0:
        return pick_minimum_from_lowest_row(state)
    else:
        return pick_minimum_from_highest_row(state)


evaluate(count_and_decide, pure_random)


0.87

In [401]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 != 0:
        return pick_minimum_from_lowest_row(state)
    else:
        return pick_minimum_from_highest_row(state)


evaluate(count_and_decide, pure_random)


0.3

In [470]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 == 0:
        return pick_maximum_from_lowest_row(state)
    else:
        return pick_minimum_from_highest_row(state)


evaluate(count_and_decide, pure_random)


0.77

In [474]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 != 0:
        return pick_maximum_from_lowest_row(state)
    else:
        return pick_minimum_from_highest_row(state)


evaluate(count_and_decide, pure_random)


0.31

In [475]:
def pick_all_elements(state: Nim) -> Nimply:
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    return Nimply(row, state.rows[row])


def pick_all_but_one_elements(state: Nim) -> Nimply:
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    return Nimply(row, state.rows[row] - 1)


In [491]:
def count_and_decide(state: Nim) -> Nimply:
    if active_rows(state) % 2 == 0:
        return pick_all_but_one_elements(state)
    else:
        return pick_all_elements(state)

print(evaluate(count_and_decide, pure_random))
print(evaluate(count_and_decide, optimal_strategy))


1.0
1.0


## Task 3.2

In [543]:
Individual = namedtuple("Individual", ["genome", "fitness"])
POPULATION_SIZE = 50
NUM_GENERATIONS = 100
OFFSPRING_SIZE = 300


In [548]:
def compute_fitness(genome, strategy):
    return evaluate(strategy(genome), optimal_strategy)


def tournament(population, tournament_size=2):
    return max(random.choices(population, k=tournament_size), key=lambda i: i.fitness)


def mutation():
    return str(random.random())


def crossover(g_1, g_2):
    g_x = (float(g_1) + float(g_2))/2
    return str(g_x)


In [549]:
def my_genetic_algorithm(population, strategy):
    for generation in range(NUM_GENERATIONS):
        offspring = list()
        for i in range(OFFSPRING_SIZE):
            if random.random() < 0.2:
                p = tournament(population)
                o = mutation()
            else:
                # promising genome 1
                p1 = tournament(population)
                # promising genome 2
                p2 = tournament(population)
                o = crossover(p1.genome, p2.genome)
            f = compute_fitness(o, strategy)
            offspring.append(Individual(o, f))

        population += offspring
        population = sorted(population, key=lambda i: i.fitness, reverse=True)[
            :POPULATION_SIZE]

        best_so_far = population[0]
        if (generation % 5 == 0):
            print(
                f"Generation #{generation}\t\tGENOME (Probability): {best_so_far.genome}\tFITNESS: {best_so_far.fitness}")


In [550]:
def evolution(evolvable_strategy):
    population = list()
    for _ in range(POPULATION_SIZE):
        genome = str(random.random())
        population.append(Individual(
            genome, compute_fitness(genome, evolvable_strategy)))

    my_genetic_algorithm(population, evolvable_strategy)


In [551]:
def make_strategy(genome: str) -> Callable:
    def evolvable(state: Nim) -> Nimply:

        if random.random() < float(genome):
            ply = pick_all_elements(state)
        else:
            ply = pick_all_but_one_elements(state)
        return ply

    return evolvable

evolution(make_strategy)


Generation #0		GENOME (Probability): 0.3916160795171203	FITNESS: 0.33
Generation #5		GENOME (Probability): 0.40491509329877645	FITNESS: 0.38
Generation #10		GENOME (Probability): 0.5178968736576	FITNESS: 0.4
Generation #15		GENOME (Probability): 0.5178968736576	FITNESS: 0.4
Generation #20		GENOME (Probability): 0.5178968736576	FITNESS: 0.4
Generation #25		GENOME (Probability): 0.4614059834781882	FITNESS: 0.41
Generation #30		GENOME (Probability): 0.4614059834781882	FITNESS: 0.41
Generation #35		GENOME (Probability): 0.4614059834781882	FITNESS: 0.41
Generation #40		GENOME (Probability): 0.4614059834781882	FITNESS: 0.41
Generation #45		GENOME (Probability): 0.4614059834781882	FITNESS: 0.41
Generation #50		GENOME (Probability): 0.4614059834781882	FITNESS: 0.41
Generation #55		GENOME (Probability): 0.4614059834781882	FITNESS: 0.41
Generation #60		GENOME (Probability): 0.4614059834781882	FITNESS: 0.41
Generation #65		GENOME (Probability): 0.4614059834781882	FITNESS: 0.41
Generation #70		GEN

KeyboardInterrupt: 

# Task 3.3
MiniMax agent

In [580]:
import math

def minmax(state: Nim, maximizing_player: bool, alpha = -1, beta = 1):
    if not state:
        return -1 if maximizing_player else 1
    
    data = cook_status(state)
    possible_next_states = []

    for ply in data['possible_moves']:
        tmp_state = deepcopy(state)
        tmp_state.nimming(ply)
        possible_next_states.append(tmp_state)
    
    if maximizing_player:
        best_val = -math.inf

        for next_state in possible_next_states:
            val = minmax(next_state, not maximizing_player, alpha, beta)
            best_val = max(best_val, val)
            alpha = max(alpha, best_val)

            if beta <= alpha:
                break
        return best_val
    else:
        best_val = math.inf
        next_state = deepcopy(state)
        ply = optimal_strategy(next_state)
        next_state.nimming(ply)
        
        val = minmax(next_state, not maximizing_player, alpha, beta)
        best_val = min(best_val, val)
        beta = min(beta, best_val)

        return best_val

def minmax_strategy(state: Nim) -> Nimply:
    data = cook_status(state)

    for ply in data['possible_moves']:
        tmp_state = deepcopy(state)
        tmp_state.nimming(ply)

        score = minmax(tmp_state, maximizing_player = True)
        if score > 0:
            break
    return ply


In [581]:
evaluate(minmax_strategy, pure_random, 3)

0.33

In [582]:
evaluate(minmax_strategy, pure_random, 4)

0.34

In [583]:
evaluate(minmax_strategy, optimal_strategy, 3)

0.0

In [584]:
evaluate(minmax_strategy, optimal_strategy, 5)

KeyboardInterrupt: 

# Task 3.4
RL Agent

In [308]:
import numpy as np

class QL_Agent():
    q = {}
    previous_state = previous_action = None
    WIN_REWARD, LOSS_REWARD = 1, -1

    def __init__(self, state, k = None, epsilon = 1, learning_rate = 1, discount_factor = 1) -> None:
        #q is a function f: State x Action -> R and is internally represented as a Map.

        #alpha is the learning rate and determines to what extent the newly acquired 
        #information will override the old information

        #gamma is the discount rate and determines the importance of future rewards

        #epsilon serves as the exploration rate and determines the probability 
        #that the agent, in the learning process, will randomly select an action

        self.epsilon = epsilon                      # epsilon   -> the higher epsilon,  the more random I act
        self.learning_rate = learning_rate          # alpha     -> the higher alpha,    the more I replace "q"
        self.discount_factor = discount_factor      # gamma     -> the higher gamma,    the more I favor long-term reward
        # as I get closer and closer to the deadline, my preference for near-term reward should increase, 
        # which means my gamma should decrease.

    def makeKey(self, state):
        possActions = list(self.getActions(state))
        someAction = possActions[0]

        # generating Q Table
        if (tuple(state), someAction) not in self.q:
            for i in possActions:
                self.q[(tuple(state), i)] = np.random.uniform(0.0, 0.01)

    def is_terminal(self, state):
        '''returns True if the state is terminal'''
        return sum(state) == 0

    def getActions(self, state):
        '''returns a list of possible actions for a given state'''
        if self.is_terminal(state):
            return [None]

        possible_actions = []
        for row, num_objects in enumerate(state):
            for remaining in range(num_objects):
                possible_actions.append((row, num_objects - remaining))
        return possible_actions


    def policy(self, state):
        '''Policy
        This function takes a state and chooses the action for that state that will lead to the maximum reward'''
        possActions = list(self.getActions(state))

        if np.random.random() < self.epsilon:
            # Highest reward -> Low exploration rate
            q_values = [self.q[(tuple(state),i)] for i in possActions]
            return possActions[np.argmax(q_values)]
        else:
            # Random -> High exploration rate
            chosen_action_idx = np.random.randint(0, len(possActions))
            return possActions[chosen_action_idx]

    # Updates the Q-table as specified by the standard Q-learning algorithm
    def update_q(self, state):
        if self.is_terminal(state):
            self.q[(tuple(self.previous_state), self.previous_action)] += \
                self.learning_rate * (self.LOSS_REWARD - self.q[(tuple(self.previous_state), self.previous_action)])
            
            current_action = self.previous_state = self.previous_action = None
        else:
            self.makeKey(state)
            current_action = self.policy(state)

            if self.previous_action is not None:
                next_state = state.copy()
                next_state[current_action[0]] -= current_action[1]

                reward = self.WIN_REWARD if self.is_terminal(next_state) else 0
                maxQ = max(self.q[(tuple(state), a)] for a in self.getActions(state))
                self.q[(tuple(self.previous_state), self.previous_action)] += \
                    self.learning_rate * (reward + self.discount_factor * maxQ - \
                        self.q[(tuple(self.previous_state), self.previous_action)])

            self.previous_state, self.previous_action = tuple(state),current_action
        return current_action

In [309]:
def Q_play(opponent_strategy: Callable, nim_dim = 4):
    losses = 0
    wins = 0
    nGames = 10000

    for i in np.arange(nGames):
        currState = Nim(nim_dim)                        # Reset game
        agent = QL_Agent(currState)                     # Reset Agent 

        while True:
            # Opponent plays
            opponent_play = opponent_strategy(currState)
            currState.nimming(opponent_play) 

            action_p1 = agent.update_q(currState._rows) 

            if(action_p1 is not None):
                currState.nimming(Nimply(action_p1[0], action_p1[1]))

            if action_p1 is None:
                # Player can't do any actions -> LOSS
                losses += 1
                break
            elif currState.game_over():
                # Player reached gameover state -> WIN
                wins += 1
                break  

    print(f"Games: {nGames} Wins: {wins} Losses: {losses} => winrate: {wins/(wins+losses)}")

In [310]:
Q_play(pure_random)

Games: 10000 Wins: 9465 Losses: 535 => winrate: 0.9465


In [311]:
Q_play(optimal_strategy)

Games: 10000 Wins: 9317 Losses: 683 => winrate: 0.9317
