Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed) in it

## Notes

* Working in groups is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [10]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
import numpy as np


## The *Nim* and *Nimply* classes

In [11]:
Nimply = namedtuple("Nimply", "row, num_objects")


In [12]:
class Nim:
    """State of a Nim game: a list of rows, each holding some objects.

    Row ``i`` starts with ``2 * i + 1`` objects. ``k``, when given, is the
    largest number of objects that may be removed in a single move.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        """Return True while at least one object is left on the board."""
        return sum(self._rows) > 0

    def __str__(self):
        """Return the row sizes formatted like ``<1 3 5>``."""
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        """Current number of objects in each row, as a tuple (a copy)."""
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        """Apply a move: remove ``ply.num_objects`` objects from row ``ply.row``.

        Raises:
            AssertionError: if the move removes fewer than one object, more
                objects than the row holds, or more than ``k`` objects.
        """
        row, num_objects = ply
        # A move must take at least one object; without this check a zero or
        # negative count would pass the checks below and leave the row
        # unchanged or even add objects to it.
        assert num_objects >= 1
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects


## Sample (and silly) strategies

In [13]:
#random move

def pure_random(state: Nim) -> Nimply:
    """Pick a random non-empty row and remove a random number of its objects."""
    # Indices of the rows that still contain at least one object.
    non_empty = []
    for index, count in enumerate(state.rows):
        if count > 0:
            non_empty.append(index)
    row = random.choice(non_empty)
    # Anything from a single object up to the whole row.
    return Nimply(row, random.randint(1, state.rows[row]))


In [14]:

#maximum possible in the lowest row

def gabriele(state: Nim) -> Nimply:
    """Always take as many objects as possible from the lowest-indexed row that has any."""
    # Enumerate every (row, count) pair that can be played on the current board.
    candidates = []
    for row, count in enumerate(state.rows):
        for taken in range(1, count + 1):
            candidates.append((row, taken))

    def preference(move):
        # Lower row index first (hence the negation), then the larger count.
        return (-move[0], move[1])

    best_row, best_count = max(candidates, key=preference)
    return Nimply(best_row, best_count)


In [15]:
def adaptive(state: Nim, genome: dict = None) -> Nimply:
    """A strategy whose behaviour is steered by a small parameter set.

    Args:
        state: the current game; only its ``rows`` are read.
        genome: optional mapping holding the ``"love_small"`` parameter.
            When omitted, ``{"love_small": 0.5}`` is used.

    Returns:
        If ``love_small`` is greater than 0.5, a move taking one object from
        the shortest non-empty row; otherwise a move taking the whole
        longest row.

    Raises:
        ValueError: if every row is empty (there is no shortest row).
    """
    if genome is None:
        genome = {"love_small": 0.5}
    love_small_param = genome["love_small"]

    # Shortest row among those that still hold objects (first one on ties).
    non_empty = [(row, count) for row, count in enumerate(state.rows) if count > 0]
    shortest_row = min(non_empty, key=lambda rc: rc[1])[0]

    # Longest row overall (first one on ties).
    longest_row = max(enumerate(state.rows), key=lambda rc: rc[1])[0]

    if love_small_param > 0.5:
        # Smallest possible move, played on the shortest row. The row must be
        # an index of a non-empty row, not the number of active rows.
        return Nimply(shortest_row, 1)
    # Largest possible move: empty the longest row.
    return Nimply(longest_row, state.rows[longest_row])

# Instead of explicitly computing the shortest and longest rows, this variant randomly selects a non-empty row
# and makes its move on the chosen row.
# This simplifies the strategy while still incorporating the adaptive parameter.

def adaptive_alternativa(state: Nim) -> Nimply:
    """Adaptive strategy variant that acts on a randomly chosen non-empty row.

    Takes a single object when the ``love_small`` parameter is above 0.5,
    otherwise takes every object of the chosen row.
    """
    genome = {"love_small": 0.5}

    # Read the "love_small" parameter from the genome.
    prefers_small = genome["love_small"] > 0.5

    # Indices of the rows that still hold objects.
    candidates = []
    for index, count in enumerate(state.rows):
        if count > 0:
            candidates.append(index)

    # The row is drawn at random in either case; only the amount differs.
    chosen = random.choice(candidates)
    taken = 1 if prefers_small else state.rows[chosen]
    return Nimply(chosen, taken)

def adaptive_dynamic(state: Nim) -> Nimply:
    """Adaptive strategy whose parameter is scaled by the share of empty rows.

    ``love_small`` is multiplied by (empty rows / total rows); a single
    object is taken from a random non-empty row when the scaled value is
    above 0.5, otherwise the whole row is taken.
    """
    genome = {"love_small": 0.5}
    board = state.rows

    # Indices of the rows that still hold objects.
    candidates = [index for index, count in enumerate(board) if count > 0]

    # Scale the parameter by the fraction of rows that are already empty.
    # NOTE(review): the fraction is at most 1, so with love_small fixed at 0.5
    # the scaled value never exceeds 0.5 and the "small move" branch below is
    # never taken -- confirm whether this is intended.
    empty_share = board.count(0) / len(board)
    love_small_param = genome["love_small"] * empty_share

    chosen = random.choice(candidates)
    if love_small_param > 0.5:
        # Smallest move on the randomly chosen row.
        return Nimply(chosen, 1)
    # Largest move on the randomly chosen row.
    return Nimply(chosen, state.rows[chosen])





In [16]:
import numpy as np


def nim_sum(state: Nim) -> int:
    """Return the nim-sum of *state*: the bitwise XOR of all its row sizes.

    Works for any number of rows (0 for an empty board) and for row sizes of
    any magnitude, since it XORs the integers directly instead of going
    through a fixed-width binary string.
    """
    result = 0
    for count in state.rows:
        result ^= int(count)
    return result

def analize(raw: Nim) -> dict:
    """Evaluate every move available in *raw*.

    Returns a dict with a single key, ``"possible_moves"``, mapping each
    move to the nim-sum of the board obtained by playing that move on a
    copy of *raw* (the game passed in is left untouched).
    """
    outcomes = {}
    for row_index, count in enumerate(raw.rows):
        for taken in range(1, count + 1):
            candidate = Nimply(row_index, taken)
            # Play the move on a scratch copy so the real game is not altered.
            board = deepcopy(raw)
            board.nimming(candidate)
            outcomes[candidate] = nim_sum(board)
    return {"possible_moves": outcomes}


def optimal(state: Nim) -> Nimply:
    """Pick a move using the nim-sum rule for the "avoid the last object" game.

    A move is considered good when it leaves the opponent in a losing
    position:

    * if some row still holds two or more objects after the move, the
      resulting nim-sum must be 0;
    * if only rows of size 0 or 1 remain, an odd number of single-object
      rows must be left, so that the opponent is the one forced to take
      the last object.

    When no such move exists, any legal move is returned. The choice among
    equally good moves is random.
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    moves = analysis["possible_moves"]

    def leaves_losing_position(ply, nim_sum_after):
        row, num_objects = ply
        remaining = list(state.rows)
        remaining[row] -= num_objects
        if any(count > 1 for count in remaining):
            return nim_sum_after == 0
        # Only 0/1 rows are left: the sum equals the number of 1-rows.
        return sum(remaining) % 2 == 1

    spicy_moves = [ply for ply, ns in moves.items() if leaves_losing_position(ply, ns)]
    if not spicy_moves:
        # No winning move available: fall back to any legal move.
        spicy_moves = list(moves)
    return random.choice(spicy_moves)


## Oversimplified match

In [17]:
logging.getLogger().setLevel(logging.INFO)
# One strategy function per player: strategy[player] is called to pick that player's move.
strategy = (adaptive_dynamic, gabriele)

nim = Nim(7)
logging.info(f"init : {nim}")
player = 0
# Alternate turns until no objects are left on the board.
while nim:
    ply = strategy[player](nim)
    logging.info(f"ply: player {player} plays {ply}")
    nim.nimming(ply)
    logging.info(f"status: {nim}")
    # Hand the turn to the other player (0 <-> 1).
    player = 1 - player
# `player` was switched after the final move, so it names the player who did NOT take the last object.
logging.info(f"status: Player {player} won!")


INFO:root:init : <1 3 5 7 9 11 13>
INFO:root:ply: player 0 plays Nimply(row=3, num_objects=7)
INFO:root:status: <1 3 5 0 9 11 13>
INFO:root:ply: player 1 plays Nimply(row=5, num_objects=7)
INFO:root:status: <1 3 5 0 9 4 13>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=5)
INFO:root:status: <1 3 0 0 9 4 13>
INFO:root:ply: player 1 plays Nimply(row=6, num_objects=11)
INFO:root:status: <1 3 0 0 9 4 2>
INFO:root:ply: player 0 plays Nimply(row=5, num_objects=4)
INFO:root:status: <1 3 0 0 9 0 2>
INFO:root:ply: player 1 plays Nimply(row=4, num_objects=9)
INFO:root:status: <1 3 0 0 0 0 2>
INFO:root:ply: player 0 plays Nimply(row=1, num_objects=3)
INFO:root:status: <1 0 0 0 0 0 2>
INFO:root:ply: player 1 plays Nimply(row=0, num_objects=1)
INFO:root:status: <0 0 0 0 0 0 2>
INFO:root:ply: player 0 plays Nimply(row=6, num_objects=2)
INFO:root:status: <0 0 0 0 0 0 0>
INFO:root:status: Player 1 won!


In [18]:
from functools import partial
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy, copy
from typing import Callable, List, Set, Tuple
import numpy as np

# A move in the Nim game: the index of the row to take from and the number of objects to remove from it.
NimMove = namedtuple("NimMove", "row, num_objects")

class NimGame:
    """State of a Nim game: a list of rows, each holding some objects.

    Row ``i`` starts with ``2 * i + 1`` objects. ``k``, when given, is the
    largest number of objects that may be removed in a single move.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        # Rows are initialised with consecutive odd numbers: 1, 3, 5, ...
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        """Return True while at least one object is left on the board."""
        return sum(self._rows) > 0

    def __str__(self):
        """Return the row sizes formatted like ``<1 3 5>``."""
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        """Current number of objects in each row, as a tuple (a copy)."""
        return tuple(self._rows)

    def make_move(self, move: NimMove) -> None:
        """Apply a move: remove ``move.num_objects`` objects from row ``move.row``.

        Raises:
            AssertionError: if the move removes fewer than one object, more
                objects than the row holds, or more than ``k`` objects.
        """
        row, num_objects = move
        # A move must take at least one object; without this check a zero or
        # negative count would pass the checks below and leave the row
        # unchanged or even add objects to it.
        assert num_objects >= 1
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects

# Constants for the genetic algorithm
POPULATION_SIZE = 1000  # individuals created initially and kept after each generation
GENERATIONS = 50  # number of iterations of the training loop
TOURNAMENT_SIZE = 5  # individuals sampled (with replacement) for each parent selection
OFFSPRING_SIZE = 100  # new individuals produced in each generation
MUTATION_PROBABILITY = 0.25  # chance that an offspring is made by mutation rather than crossover

class Individual:
    """A candidate solution: a list of game states, each paired with a move.

    Attributes are parallel lists: ``scores[i]`` is the nim-sum of
    ``states[i]`` after ``moves[i]`` has been applied, and ``fit`` is the
    sum of all scores.
    """

    def __init__(self, s=None, m=None, f=None):
        """Wrap the given states/moves/scores, or generate random ones.

        When ``s`` is empty or omitted, a random list of states is generated
        and a random move is drawn and scored for each of them.
        """
        if s:
            self.states = s
            self.moves = m
            self.scores = f
        else:
            self.states = generate_state_list()
            self.moves = []
            self.scores = []
            self.create_moveset()
            self.init_fitness()
        self.fit = None
        self.score()

    def __str__(self):
        return f"Individual with total fitness of {self.fit}:\nstates: {self.states},\nmoves: {self.moves},\nscores: {self.scores}"

    def __repr__(self):
        return f"\nIndividual with total fitness of {self.fit}:\nstates: {self.states},\nmoves: {self.moves},\nscores: {self.scores}"

    def _nim_sum_after_move(self, idx):
        """Return the XOR of the rows of ``states[idx]`` after ``moves[idx]``."""
        remaining = list(self.states[idx])
        move = self.moves[idx]
        remaining[move.row] -= move.num_objects
        result = 0
        for count in remaining:
            result ^= count
        return result

    def create_moveset(self):
        """Append one random legal move for every state."""
        for state in self.states:
            rows = [i for i, row in enumerate(state) if row != 0]
            row = random.choice(rows)
            num_objects = random.randint(1, state[row])
            self.moves.append(NimMove(row, num_objects))

    def init_fitness(self):
        """Append the score of every (state, move) pair to ``scores``."""
        for idx, _ in enumerate(self.states):
            self.scores.append(self._nim_sum_after_move(idx))

    def fitness(self, idx):
        """Recompute the score of the pair at ``idx`` and refresh ``fit``."""
        self.scores[idx] = self._nim_sum_after_move(idx)
        # Keep the total in sync, otherwise selection would use a stale value.
        self.score()

    def score(self):
        """Store the sum of all scores in ``fit``."""
        self.fit = sum(self.scores)
    

# Type alias: a population is a plain list of Individual objects.
Population = List[Individual]

def generate_move(state):
    """Return a random legal move for *state* (a sequence of row sizes)."""
    # Indices of the rows that are not empty.
    candidates = []
    for index, count in enumerate(state):
        if count != 0:
            candidates.append(index)
    chosen = random.choice(candidates)
    # Remove anything from one object up to the whole row.
    return NimMove(chosen, random.randint(1, state[chosen]))

def generate_state(prev=None):
    """Return a game state as a tuple of row sizes.

    Without *prev*, the result is the board ``(1, 3, 5, 7, 9)`` in which,
    about half of the time, one random row is replaced by a random size
    between 1 and its initial size. With *prev*, the result is *prev*
    after a random number of objects has been removed from one random
    non-empty row.
    """
    if not prev:
        board = [1, 3, 5, 7, 9]
        if random.random() > 0.5:
            slot = random.randint(0, 4)
            board[slot] = random.randint(1, slot * 2 + 1)
        return tuple(board)

    board = list(prev)
    candidates = [index for index, count in enumerate(board) if count != 0]
    chosen = random.choice(candidates)
    board[chosen] -= random.randint(1, board[chosen])
    return tuple(board)

def generate_state_list():
    """Return a random sequence of states ending with exactly one non-empty row.

    The first state comes from ``generate_state()``; each following state is
    derived from the previous one until only a single row still holds objects.
    """
    trajectory = [generate_state()]
    while True:
        latest = trajectory[-1]
        non_empty = len([count for count in latest if count > 0])
        if non_empty == 1:
            return trajectory
        trajectory.append(generate_state(latest))

def generate_population() -> Population:
    """Create ``POPULATION_SIZE`` randomly initialised individuals."""
    population = []
    for _ in range(POPULATION_SIZE):
        population.append(Individual())
    return population

def select_parent(population):
    """Tournament selection: sample ``TOURNAMENT_SIZE`` individuals, return the one with the lowest ``fit``."""
    # Contenders are drawn independently, so the same individual may appear twice.
    contenders = []
    for _ in range(TOURNAMENT_SIZE):
        contenders.append(random.choice(population))

    def by_fitness(candidate):
        return candidate.fit

    return min(contenders, key=by_fitness)

def mutation(ind: Individual) -> Individual:
    """Return a copy of *ind* in which one randomly chosen move is re-drawn.

    The parent is left unchanged; the offspring's score for the mutated
    position and its total fitness are recomputed.
    """
    offspring = copy(ind)
    # copy() is shallow: give the offspring its own move/score lists so that
    # editing them below does not also modify the parent.
    offspring.moves = list(ind.moves)
    offspring.scores = list(ind.scores)

    idx = random.randint(0, len(offspring.states) - 1)
    offspring.moves[idx] = generate_move(offspring.states[idx])
    offspring.fitness(idx)
    # Refresh the total so selection does not rank the offspring by the
    # parent's fitness.
    offspring.score()
    return offspring

def crossover(inds: List[Individual]):
    """Combine the first two individuals of *inds* into a new one.

    The child takes the first half of the first parent (states, moves and
    scores) followed by the second half of the second parent; each half is
    measured on that parent's own number of states.
    """
    first, second = inds[0], inds[1]
    head = len(first.states) // 2
    tail = len(second.states) // 2

    states = first.states[:head] + second.states[tail:]
    moves = first.moves[:head] + second.moves[tail:]
    scores = first.scores[:head] + second.scores[tail:]
    return Individual(states, moves, scores)

class NimAgent:
    """Nim player whose move table ("brain") is evolved by a genetic algorithm."""

    def __init__(self):
        # Filled by training() with the final population of individuals.
        self.brain = None

    def training(self):
        """Evolve a population for ``GENERATIONS`` rounds and keep it as the brain.

        Each round creates ``OFFSPRING_SIZE`` offspring (by mutation with
        probability ``MUTATION_PROBABILITY``, otherwise by crossover), adds
        them to the population and keeps the ``POPULATION_SIZE`` individuals
        with the lowest fitness. The best individual is printed every round.
        """
        population = generate_population()

        for generation in range(GENERATIONS):
            offsprings = []
            for _ in range(OFFSPRING_SIZE):
                if random.random() < MUTATION_PROBABILITY:
                    p = select_parent(population)
                    o = mutation(p)
                else:
                    o = crossover([select_parent(population) for _ in range(2)])
                offsprings.append(o)
            population.extend(offsprings)
            # Lower fitness is better: sort ascending and drop the tail.
            population.sort(key=lambda i: i.fit, reverse=False)
            population = population[:POPULATION_SIZE]

            best = min(population, key=lambda o: o.fit)
            print()
            print(f"Generation {generation}, minimum fitness offspring of this gen: {best}")

        self.brain = population

    def play(self, nim_game: NimGame):
        """Return a move for the current board of *nim_game*.

        Looks up the board among the states stored in the brain and returns
        the stored move with the lowest score (first one on ties), matching
        the direction in which training optimises. Falls back to a random
        legal move when the board is unknown or the agent is untrained.
        """
        # Stored states are tuples of row sizes, so compare against the
        # board's rows rather than against the game object itself.
        board = tuple(nim_game.rows)
        best_move = None
        best_score = None
        for neuron in self.brain or []:
            for i, state in enumerate(neuron.states):
                if tuple(state) != board:
                    continue
                if best_score is None or neuron.scores[i] < best_score:
                    best_score = neuron.scores[i]
                    best_move = neuron.moves[i]
        if best_move is not None:
            return best_move
        return generate_move(nim_game.rows)

# Create the agent and evolve its move table before the game starts.
nim_agent = NimAgent()
nim_agent.training()

# Initialize a Nim game with 5 rows
nim_game = NimGame(5)
# Random starting side; the value is flipped at the top of every turn.
player_turn = random.randint(0, 1)
print(f"Initial Nim Game State: {nim_game}")
while nim_game:
    player_turn = 1 - player_turn
    if player_turn:
        # Human turn: keep asking until a legal move is entered, instead of
        # letting a typo crash the game inside make_move().
        while True:
            try:
                pile = int(input("Choose Pile: "))
                count = int(input("Choose Count: "))
            except ValueError:
                print("Please enter whole numbers.")
                continue
            if 0 <= pile < len(nim_game.rows) and 1 <= count <= nim_game.rows[pile]:
                break
            print("Illegal move, try again.")
        player_move = NimMove(pile, count)
        print(f"You chose to take {player_move.num_objects} from pile {player_move.row}.")
    else:
        # Agent turn.
        agent_move = nim_agent.play(nim_game)
        print(f"Player chose to take {agent_move.num_objects} from pile {agent_move.row}.")

    nim_game.make_move(player_move if player_turn else agent_move)
    print(f"Current Nim Game State: {nim_game}")

# player_turn still identifies the side that made the final move; that side
# is the one reported as the winner here.
if player_turn:
    print("Game Over. You won!")
else:
    print("Game Over. Player won!")



Generation 0, minimum fitness offspring of this gen: Individual with total fitness of 8:
states: [(1, 3, 5, 7, 2), (1, 3, 1, 7, 2), (1, 1, 1, 7, 2), (1, 1, 1, 2, 2), (1, 1, 1, 2, 0), (1, 0, 1, 2, 0), (1, 0, 0, 2, 0), (1, 0, 0, 0, 0)],
moves: [NimMove(row=3, num_objects=1), NimMove(row=3, num_objects=7), NimMove(row=3, num_objects=5), NimMove(row=0, num_objects=1), NimMove(row=3, num_objects=2), NimMove(row=3, num_objects=1), NimMove(row=3, num_objects=2), NimMove(row=0, num_objects=1)],
scores: [3, 1, 1, 0, 1, 1, 1, 0]

Generation 1, minimum fitness offspring of this gen: Individual with total fitness of 8:
states: [(1, 3, 5, 7, 2), (1, 3, 1, 7, 2), (1, 1, 1, 7, 2), (1, 1, 1, 2, 2), (1, 1, 1, 2, 0), (1, 0, 1, 2, 0), (1, 0, 0, 2, 0), (1, 0, 0, 0, 0)],
moves: [NimMove(row=3, num_objects=1), NimMove(row=3, num_objects=7), NimMove(row=3, num_objects=5), NimMove(row=0, num_objects=1), NimMove(row=3, num_objects=2), NimMove(row=3, num_objects=1), NimMove(row=3, num_objects=2), NimMove(row=0