Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside your personal repository for the course
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed) in it

## Notes

* Working in groups is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [72]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
from random import random, choice, randint, seed, gauss
from copy import deepcopy, copy
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt
from functools import reduce

## The *Nim* and *Nimply* classes

In [73]:
# A single move: remove `num_objects` objects from the row at index `row`.
Nimply = namedtuple("Nimply", "row, num_objects")


In [74]:
class Nim:
    """Mutable Nim board made of rows of objects.

    Row ``i`` (0-based) starts with ``2 * i + 1`` objects. When ``k`` is not
    ``None`` it is the maximum number of objects a single move may remove.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        """Create a board with ``num_rows`` rows and optional per-move bound ``k``."""
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        """Return True while at least one object is left on the board."""
        return sum(self._rows) > 0

    def __str__(self):
        """Return the row sizes as ``<a b c ...>``."""
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        """Read-only snapshot of the current row sizes."""
        return tuple(self._rows)

    def nimming(self, ply: "Nimply") -> None:
        """Apply the move ``ply = (row, num_objects)`` to the board in place.

        Raises:
            AssertionError: if the move removes more objects than the row
                holds, removes fewer than one object, or exceeds ``k``.
            IndexError: if ``row`` is not a valid row index.
        """
        row, num_objects = ply
        assert self._rows[row] >= num_objects, "not enough objects in the row"
        # A move that removes nothing (or a negative amount, which would add
        # objects to the row) is not a legal Nim move.
        assert num_objects >= 1, "a move must remove at least one object"
        assert self._k is None or num_objects <= self._k, "move exceeds k"
        self._rows[row] -= num_objects


In [75]:
# NOTE(review): NUM_PROB, MAX_FIT and TOURNAMENT_SIZE are not referenced by
# the code visible in this notebook.
NUM_PROB = 0.3
MAX_FIT = 50000
TOURNAMENT_SIZE = 2
# Number of chromosomes built by create_population()/dummy_population();
# run() discards NUM_POPULATIONS/4 chromosomes per step.
NUM_POPULATIONS = 12
# Probability that generate_offspring() mutates each crossover result.
MUTATION_PROBABILITY = .9
# Chromosome.create_random() appends operation genes for positions up to
# DIM_CHROM - 2.
DIM_CHROM = 10
# dummy_population() keeps only chromosomes whose fitness exceeds this value.
FITNESS_TRESHOLD = 868
# Number of run() steps performed by evolve().
GENERATIONS = 20
# Operator symbols an operation gene can carry.
OPERATIONS = ["+","-","*","&","|","^"]

from random import choice, randint, shuffle

from random import choice, randint, random

#### Generates a random operation gene with some constraints.
def generate_operation(index, op_indexes: list, elements_to_add: list) -> tuple:
    """Build a random operation gene ``(operator, left, right)`` for position ``index``.

    ``left`` and ``right`` are positions of genes the new operation reads.

    Args:
        index: Position the new gene will occupy in the chromosome; operands
            drawn with ``randint`` are taken from ``0 .. index - 1``.
        op_indexes: Positions of the operation genes generated so far. When it
            is non-empty one operand is always one of these positions, so the
            new gene chains onto an earlier operation. ``index`` is appended
            to it before returning.
        elements_to_add: Pile positions not yet used by any operation. Operands
            are taken from here first and removed from the list as they are
            used.

    Returns:
        A tuple ``(operator, left, right)`` with ``operator`` drawn from
        ``OPERATIONS``.
    """
    op = choice(OPERATIONS)

    def other_than(taken):
        # Any position below `index` except `taken`; when no other position
        # exists the operand is reused instead of looping forever.
        candidates = [i for i in range(index) if i != taken]
        return choice(candidates) if candidates else taken

    if op_indexes:
        # Chain onto a previous operation; the coin flip decides on which
        # side of the operator the previous operation goes.
        previous_first = random() < 0.5
        previous = choice(op_indexes)
        if elements_to_add:
            fresh = choice(elements_to_add)
            elements_to_add.remove(fresh)
        else:
            fresh = randint(0, index - 1)
        num1, num2 = (previous, fresh) if previous_first else (fresh, previous)
    elif elements_to_add:
        # First operation: combine two distinct unused piles. Both operands
        # are drawn from the list itself so that removing them cannot fail.
        num1 = choice(elements_to_add)
        elements_to_add.remove(num1)
        if elements_to_add:
            num2 = choice(elements_to_add)
            elements_to_add.remove(num2)
        else:
            num2 = other_than(num1)
    else:
        num1 = randint(0, index - 1)
        num2 = other_than(num1)

    op_indexes.append(index)
    return (op, num1, num2)

### Randomly changes the operator of the given operation
def mutate_operation(operation) -> tuple:
    """Return a copy of ``operation`` whose operator differs from the current one.

    The two operand positions are kept; the operator is redrawn from
    ``OPERATIONS`` until it is different from ``operation[0]``.
    """
    current = operation[0]
    while True:
        candidate = choice(OPERATIONS)
        if candidate != current:
            return (candidate, operation[1], operation[2])

### Checks if a position is safe
def check_rules(father_NP: bool, children: list) -> int:
    """Return 1 when a position breaks the N/P rules, 0 otherwise.

    Args:
        father_NP: Truthy when the position itself is labelled P.
        children: Labels of the positions reachable in one move (truthy = P).

    A P position must have no P child; any other position must have at
    least one P child.
    """
    has_p_child = any(children)
    if father_NP:
        return 1 if has_p_child else 0
    return 0 if has_p_child else 1

### Determines the state of a position (N or P)
def setNP(node: tuple, chromosome) -> int:
    """Evaluate ``chromosome``'s expression on the board ``node`` and return its value."""
    value = chromosome.get_expression(node)
    return value
    
    
class Chromosome:
    """Expression over the rows of a Nim board, stored as a flat list of genes.

    Each gene in ``expr`` is either an ``int`` (a leaf: an index into the
    board tuple) or a tuple ``(operator, left, right)`` where ``left`` and
    ``right`` are positions of other genes in ``expr``. The value of the
    chromosome on a board is the value of its last gene.
    """

    # Binary operators an operation gene may carry. Note that "-" is the
    # absolute difference, so results never become negative through it.
    _BINARY_OPS = {
        "+": lambda a, b: a + b,
        "*": lambda a, b: a * b,
        "-": lambda a, b: abs(a - b),
        "&": lambda a, b: a & b,
        "|": lambda a, b: a | b,
        "^": lambda a, b: a ^ b,
    }

    def __init__(self, expression) -> None:
        """Create a chromosome from an iterable of genes (copied into a list)."""
        self.expr = list(expression)
        self.fitness = 0
        # Positions of the operation genes produced by create_random().
        self.operations_indexes = []

    ### Generates a chromosome with random expression
    def create_random(self, nim_rows):
        """Append a random expression for a board with ``len(nim_rows)`` rows.

        First one leaf gene per row is appended in shuffled order, then
        operation genes are appended for positions ``len(nim_rows)`` up to
        ``DIM_CHROM - 2``. Intended to be called on a chromosome whose
        ``expr`` is still empty, since gene positions are counted from 0.
        """
        num_rows = len(nim_rows)
        leaves = list(range(num_rows))
        shuffle(leaves)
        self.expr.extend(leaves)

        elements_to_add = list(range(num_rows))
        for i in range(num_rows, DIM_CHROM - 1):
            self.expr.append(generate_operation(i, self.operations_indexes, elements_to_add))

    ### Decodes the gene
    def recursive_decoder(self, index, position_tuple):
        """Return the value of the gene at ``index`` on the board ``position_tuple``.

        Raises:
            ValueError: if an operation gene carries an unknown operator.
        """
        gene = self.expr[index]
        if isinstance(gene, int):
            # Leaf: the gene is an index into the board.
            return position_tuple[gene]

        operator = gene[0]
        try:
            apply = self._BINARY_OPS[operator]
        except (KeyError, TypeError):
            # Handle unknown operator
            raise ValueError(f"Unknown operator: {operator}") from None

        op1 = self.recursive_decoder(gene[1], position_tuple)
        op2 = self.recursive_decoder(gene[2], position_tuple)
        return apply(op1, op2)

    ### Returns the value of the expression for a certain board
    def get_expression(self, position_tuple) -> int:
        """Return the value of the last gene on the board ``position_tuple``."""
        return self.recursive_decoder(len(self.expr) - 1, position_tuple)

    ### Print expression encoded in the gene
    def print_expression(self, index, position_tuple):
        """Print the sub-expression rooted at ``index`` with board values filled in.

        Returns the board value for a leaf gene and the string
        ``"(left operator right)"`` for an operation gene.
        """
        gene = self.expr[index]
        if isinstance(gene, int):
            print(f"{position_tuple[gene]}", end="")
            return position_tuple[gene]

        operator = gene[0]
        print("(", end="")
        op1 = self.print_expression(gene[1], position_tuple)
        print(f" {operator} ", end="")
        op2 = self.print_expression(gene[2], position_tuple)
        print(")", end="")
        return f"({op1} {operator} {op2})"

    ### Computes fitness of the chromosome
    def fitness_function(self, state: "Nim") -> int:
        """Compute, store in ``self.fitness`` and return the fitness on ``state``.

        A position is labelled P when the expression evaluates to 0 on it.
        The fitness of a position is its own ``check_rules`` result plus the
        fitness of every position reachable from it by lowering one row; the
        all-zero position has fitness 0. Lower values mean fewer violations.
        """
        # node -> (fitness of the sub-tree, True if the node is labelled P)
        visited = {}

        def node_fitness(node: tuple) -> int:
            # If sub-tree fitness is already known, just return it
            if node in visited:
                return visited[node][0]

            # Determine state of the current node (P or N)
            is_p = setNP(node, self) == 0

            # Terminal case: the empty board contributes no fitness
            if all(value == 0 for value in node):
                visited[node] = (0, is_p)
                return 0

            fit = 0
            children_state = []

            # Compute or retrieve fitness for all children sub-trees
            for i, value in enumerate(node):
                for new_value in range(value - 1, -1, -1):
                    child = node[:i] + (new_value,) + node[i + 1:]
                    fit += node_fitness(child)
                    children_state.append(visited[child][1])

            # Now add current fitness
            fit += check_rules(is_p, children_state)
            # Add node to visited
            visited[node] = (fit, is_p)
            return fit

        self.fitness = node_fitness(state.rows)
        return self.fitness

    ### Changes a random gene in the chromosome
    def mutate(self, state: "Nim") -> 'Chromosome':
        """Return a deep copy of this chromosome with one random gene changed.

        A leaf gene is replaced by a random row index of ``state``; an
        operation gene gets a different operator. The copy's ``fitness`` is
        reset to ``None`` and must be recomputed before it is compared.
        """
        offspring = deepcopy(self)
        pos = randint(0, len(self.expr) - 1)
        gene = self.expr[pos]

        if isinstance(gene, int):
            # If it's an integer, change it to a random row index
            offspring.expr[pos] = randint(0, len(state.rows) - 1)
        else:
            # If it's an operation, change it to a new operation
            offspring.expr[pos] = mutate_operation(gene)

        offspring.fitness = None
        return offspring


### Exchanges genes of two chromosomes
def one_cut_xover(c1: Chromosome, c2: Chromosome) -> list:
    """Single-point crossover: return two new chromosomes with swapped tails.

    The cut position is drawn uniformly from the gene positions of ``c1``.
    The first child takes ``c1``'s genes before the cut and ``c2``'s from the
    cut on; the second child is the mirror image.
    """
    cut_point = randint(0, len(c1.expr) - 1)
    head_1, tail_1 = c1.expr[:cut_point], c1.expr[cut_point:]
    head_2, tail_2 = c2.expr[:cut_point], c2.expr[cut_point:]
    return [Chromosome(head_1 + tail_2), Chromosome(head_2 + tail_1)]


### Evolves dna one step
def run(dna: list, state: Nim) -> list[Chromosome]: 
    """Perform one evolution step and return the surviving population.

    For every group of four consecutive chromosomes, two distinct members
    are picked at random and one offspring is appended to ``dna`` (the input
    list is extended in place). The enlarged population is then sorted by
    ascending fitness, the last ``NUM_POPULATIONS/4`` entries are dropped
    and the survivors are returned in shuffled order.
    """
    for start in range(0, len(dna), 4):
        first = randint(0, 3)
        second = randint(0, 3)
        while first == second:
            first = randint(0, 3)
        child = generate_offspring(dna[start + first], dna[start + second], state)
        dna.append(child)

    ranked = sorted(dna, key=lambda chromo: chromo.fitness)
    to_drop = int(NUM_POPULATIONS/4)
    if to_drop > 0:
        survivors = ranked[:-to_drop]
    else:
        survivors = ranked[:]
    shuffle(survivors)
    return survivors

### Generates a chromosome starting from two parents    
def generate_offspring(parent1 : Chromosome, parent2 : Chromosome, state: Nim) -> Chromosome:
    """Create the better of two offspring obtained from ``parent1`` and ``parent2``.

    The parents are recombined with ``one_cut_xover``; each of the two
    results is mutated with probability ``MUTATION_PROBABILITY`` and then
    evaluated on ``state``.

    Returns:
        The offspring with the lower fitness (the first one on ties). Lower
        is better: ``run`` discards the highest-fitness chromosomes and the
        lowest-fitness chromosome is the one used to play.
    """
    off = one_cut_xover(parent1, parent2)    # crossover
    for i in range(2):
        if random() < MUTATION_PROBABILITY:    # offspring mutation
            off[i] = off[i].mutate(state)
    fit1 = off[0].fitness_function(state)
    fit2 = off[1].fitness_function(state)
    return off[0] if fit1 <= fit2 else off[1]
    
### Evolves dna
def evolve(dna: list, state: Nim) -> list:
    """Evolve ``dna`` for ``GENERATIONS`` steps and return it sorted by ascending fitness.

    If the highest fitness in ``dna`` is 0 the population is treated as not
    yet evaluated and every chromosome's fitness is computed first.
    """
    highest = max(dna, key=lambda chromo: chromo.fitness)
    if highest.fitness == 0:
        for chromo in dna:
            chromo.fitness = chromo.fitness_function(state)

    generation = 0
    while generation < GENERATIONS:
        dna = run(dna, state)
        generation += 1

    return sorted(dna, key=lambda chromo: chromo.fitness)

### Prints dna
def print_dna(dna):
    """Print the fitness of every chromosome in ``dna``, one per line, then a blank line."""
    print(*(f"{chromo.fitness}" for chromo in dna), "", sep="\n")

### Creates a number of random chromosomes
def create_population(state: Nim) -> list:
    """Return ``NUM_POPULATIONS`` random chromosomes, each already evaluated on ``state``."""
    def spawn():
        chromo = Chromosome([])
        chromo.create_random(state.rows)
        chromo.fitness_function(state)
        return chromo

    return [spawn() for _ in range(NUM_POPULATIONS)]

### Creates a number of bad chromosomes (fitness higher than a threshold)
def dummy_population(state : Nim) -> list:
    """Return ``NUM_POPULATIONS`` random chromosomes whose fitness exceeds ``FITNESS_TRESHOLD``.

    Random chromosomes are generated until enough of them pass the
    threshold, so the call does not return if none ever does.
    """
    accepted = []
    while len(accepted) < NUM_POPULATIONS:
        candidate = Chromosome([])
        candidate.create_random(state.rows)
        if candidate.fitness_function(state) > FITNESS_TRESHOLD:
            accepted.append(candidate)
    return accepted

### Prints Nim board
def print_nim_board(board):
    """Print ``board`` as vertical stacks of ``*``, one column per entry, tallest level first."""
    level = max(board)
    while level > 0:
        print(' '.join('*' if pile >= level else ' ' for pile in board))
        level -= 1


## Sample (and silly) strategies

In [76]:

def pure_random(state: Nim, nothing) -> Nimply:
    """Pick a random non-empty row and remove a random number of its objects.

    ``nothing`` is ignored; it only keeps the signature uniform across
    strategies.
    """
    non_empty = [idx for idx, count in enumerate(state.rows) if count > 0]
    row = choice(non_empty)
    return Nimply(row, randint(1, state.rows[row]))


In [77]:
def gabriele(state: Nim, nothing) -> Nimply:
    """Take the maximum possible number of objects from the lowest-indexed non-empty row.

    ``nothing`` is ignored; it only keeps the signature uniform across
    strategies.
    """
    candidates = []
    for row, count in enumerate(state.rows):
        for taken in range(1, count + 1):
            candidates.append((row, taken))
    # Smallest row index first, then the largest amount within that row.
    best_row, best_taken = max(candidates, key=lambda move: (-move[0], move[1]))
    return Nimply(best_row, best_taken)


In [78]:
def adaptive(state: Nim, nothing) -> Nimply:
    """A strategy that can adapt its parameters.

    NOTE(review): unfinished stub. It only builds a parameter dict and then
    falls off the end, so it returns ``None`` instead of a ``Nimply``.
    """
    # Parameters of the strategy; not used yet.
    genome = {"love_small": 0.5}


In [79]:
import numpy as np


def nim_sum(state: "Nim") -> int:
    """Return the nim-sum of ``state``: the bitwise XOR of all its row sizes.

    Works for row sizes of any magnitude and returns 0 for a board with no
    rows.
    """
    total = 0
    for count in state.rows:
        total ^= int(count)
    return total

def analize(raw: Nim) -> dict:
    """Map every move available on ``raw`` to the nim-sum of the board it leads to.

    Returns a dict with the single key ``"possible_moves"``, whose value maps
    each ``Nimply`` to the nim-sum after that move. ``raw`` itself is not
    modified; every move is tried on a deep copy.
    """
    outcomes = {}
    for row, count in enumerate(raw.rows):
        for taken in range(1, count + 1):
            move = Nimply(row, taken)
            board = deepcopy(raw)
            board.nimming(move)
            outcomes[move] = nim_sum(board)
    return {"possible_moves": outcomes}

def optimal(state: Nim, nothing) -> Nimply:
    """Play a random move that leaves nim-sum 0, or any random move if none does.

    ``nothing`` is ignored; it only keeps the signature uniform across
    strategies.
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    moves = analysis["possible_moves"]
    zeroing = [ply for ply, ns in moves.items() if ns == 0]
    return choice(zeroing or list(moves.keys()))




# Evolutionary Strategy


In [80]:
def evo_nim_sum(state:Nim, ch: Chromosome ) ->int:
    """Return the value of ``ch``'s expression on the current rows of ``state``."""
    board = state.rows
    return ch.get_expression(board)

def evo_analize(raw: Nim, ch: Chromosome) -> dict:
    """Map every move available on ``raw`` to ``ch``'s value on the resulting board.

    Returns a dict with the single key ``"possible_moves"``, whose value maps
    each ``Nimply`` to ``evo_nim_sum`` of the board after that move. ``raw``
    itself is not modified; every move is tried on a deep copy.
    """
    outcomes = {}
    for row, count in enumerate(raw.rows):
        for taken in range(1, count + 1):
            move = Nimply(row, taken)
            board = deepcopy(raw)
            board.nimming(move)
            outcomes[move] = evo_nim_sum(board, ch)
    return {"possible_moves": outcomes}

def evolutionary(state :Nim, ch: Chromosome) -> Nimply:
    """Play a random move after which ``ch`` evaluates to 0, or any random move if none does."""
    moves = evo_analize(state, ch)["possible_moves"]
    zeroing = [ply for ply, ns in moves.items() if ns == 0]
    return choice(zeroing or list(moves.keys()))


## Match Simulator

In [81]:
logging.getLogger().setLevel(logging.INFO)

##################################
# SET YOUR STRATEGIES HERE #######
#   pure_random
#   gabriele
#   adaptive
#   optimal
#   evolutionary
##################################
strategy = (evolutionary, optimal)

nim = Nim(3)
winners = []

# Extra argument handed to each player's strategy, indexed by player number.
# Every strategy is called as strategy(state, extra): `evolutionary` needs its
# evolved chromosome there, the other strategies ignore the value. Keeping one
# slot per player lets any combination of strategies be selected above.
args = [None, None]

if strategy[0] == evolutionary:
    dna1 = create_population(nim)
    dna1 = evolve(dna1, nim)
    ns1 = dna1[0]  # evolve() returns the population sorted by ascending fitness
    args[0] = ns1

if strategy[1] == evolutionary:
    dna2 = create_population(nim)
    dna2 = evolve(dna2, nim)
    ns2 = dna2[0]
    args[1] = ns2

logging.info(f"init : {nim}")
player = 0

# Players alternate until the board is empty.
while nim:
    ply = strategy[player](nim, args[player])
    logging.info(f"ply: player {player} plays {ply}")
    nim.nimming(ply)
    logging.info(f"status: {nim.rows}")
    player = 1 - player
# `player` was flipped after the last move, so `1 - player` made that move.
logging.info(f"status: Player {1- player} won!")






INFO:root:init : <1 3 5>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=2)
INFO:root:status: (1, 3, 3)
INFO:root:ply: player 1 plays Nimply(row=0, num_objects=1)
INFO:root:status: (0, 3, 3)
INFO:root:ply: player 0 plays Nimply(row=1, num_objects=3)
INFO:root:status: (0, 0, 3)
INFO:root:ply: player 1 plays Nimply(row=2, num_objects=3)
INFO:root:status: (0, 0, 0)
INFO:root:status: Player 1 won!
