Copyright (c) 2023 Matteo Pietro Pillitteri  
<s314404@studenti.polito.it>
<br>
https://github.com/Matteo-Pietro-Pillitteri/Computational-Intelligence

# Lab 2: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside the course repo 
* Put a `README.md` and your solution (all the files, code and auxiliary data if needed)

## Notes

* Working in groups is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [None]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
import math
from copy import deepcopy
import numpy as np
import matplotlib.pyplot as plt 

## The *Nim* and *Nimply* classes

In [None]:
# Number of epochs (generations) run by the ES training loop.
NUM_EPOCHS = 100 # value chosen for testing
# Number of games played by every fitness evaluation.
NUM_GAMES = 60 # value chosen for testing
# Number of rows of the Nim board used for every game.
GAME_ROWS = 5

In [None]:
# A move: take `num_objects` objects from the row with index `row`.
Nimply = namedtuple("Nimply", ["row", "num_objects"])

In [None]:
class Nim:
    """Board of a Nim game: a list of rows, each holding a number of objects.

    Row ``i`` starts with ``2 * i + 1`` objects. ``k``, when given, is the
    largest number of objects a single move is allowed to remove.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        # Odd numbers 1, 3, 5, ... one per row.
        self._rows = list(range(1, 2 * num_rows, 2))
        self._k = k

    def __bool__(self):
        """Return True while there are still objects on the board."""
        remaining = sum(self._rows)
        return remaining > 0

    def __str__(self):
        """Render the row sizes as ``<a b c ...>``."""
        return "<{}>".format(" ".join(map(str, self._rows)))

    @property
    def rows(self) -> tuple:
        """Read-only snapshot of the row sizes."""
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        """Apply the move ``ply`` (row index, number of objects) to the board.

        The move is checked with assertions: the row must hold at least the
        requested objects and, when ``k`` is set, the request must not exceed it.
        """
        row, num_objects = ply
        available = self._rows[row]
        assert available >= num_objects
        limit = self._k
        if limit is not None:
            assert num_objects <= limit
        self._rows[row] = available - num_objects

## Sample (and silly) strategies

In [None]:
def pure_random(state: Nim) -> Nimply:
    """Return a random move: a random non-empty row and a random amount from it."""
    rows = state.rows
    candidates = [index for index, count in enumerate(rows) if count > 0]
    chosen = random.choice(candidates)
    amount = random.randint(1, rows[chosen])
    return Nimply(chosen, amount)

In [None]:
def gabriele(state: Nim) -> Nimply:
    """Take every object from the non-empty row with the smallest index."""
    non_empty = [(row, count) for row, count in enumerate(state.rows) if count > 0]
    # min() raises ValueError on an empty board, where no move exists.
    return Nimply(*min(non_empty))

In [None]:
import numpy as np


def nim_sum(state: Nim) -> int:
    """Return the nim-sum of the board: the bitwise XOR of all row sizes.

    Args:
        state: object exposing ``rows``, an iterable of non-negative integers.

    Returns:
        The XOR of every row size; 0 for a board without rows.
    """
    # XOR the integers directly rather than going through fixed-width
    # (32-character) bit strings: no hard-coded width limit, and an empty
    # board simply yields 0.
    result = 0
    for count in state.rows:
        result ^= count
    return result


def analize(raw: Nim) -> dict:
    """Map every move available from ``raw`` to the nim-sum of the board it produces.

    Moves are all the ways of taking 1..count objects from each row. The
    result has the shape ``{"possible_moves": {Nimply: int}}``; ``raw`` itself
    is not modified.
    """
    outcomes = {}
    for row, count in enumerate(raw.rows):
        for taken in range(1, count + 1):
            move = Nimply(row, taken)
            # Play the move on a private copy so the caller's board is untouched.
            board = deepcopy(raw)
            board.nimming(move)
            outcomes[move] = nim_sum(board)
    return {"possible_moves": outcomes}


def optimal(state: Nim) -> Nimply:
    """Return a random move among those that leave a non-zero nim-sum.

    When every available move leaves a nim-sum of zero, the choice is made
    among all the available moves instead.
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    outcomes = analysis["possible_moves"]
    candidates = [move for move, value in outcomes.items() if value != 0]
    return random.choice(candidates or list(outcomes))

## Configuration of the ES
* Set of rules: each rule pairs a condition with an action and carries a weight.
* All rules start with the same weight. During training, the rules that perform better receive a higher weight.
   

In [None]:
def check_all_rows(state: Nim):
    """Tell whether every row of the board still holds at least one object.

    The number of rows is taken from the board itself rather than from the
    global ``GAME_ROWS``, so the check is also correct for boards created with
    a different number of rows.

    Returns:
        True if no row is empty, False otherwise.
    """
    return all(count != 0 for count in state.rows)


def check_num_of_rows_with_element_is_odd(state: Nim):
    """Return True when an odd number of rows still hold objects."""
    non_empty = sum(1 for count in state.rows if count != 0)
    return non_empty % 2 == 1


def check_num_of_rows_with_element_is_even(state: Nim):
    """Return True when an even number of rows (zero included) still hold objects."""
    non_empty = sum(1 for count in state.rows if count != 0)
    return non_empty % 2 == 0


def check_row_with_one_element(state: Nim):
    """Return True when at least one row holds exactly one object."""
    return any(count == 1 for count in state.rows)


def check_row_with_many_elements(state: Nim):
    """Return True if some row holds at least 90% of ``2 * index + 1`` objects.

    ``2 * index + 1`` is the size ``Nim`` gives to row ``index`` when a game starts.
    """
    return any(
        float(count) >= float(2 * index + 1) * 0.90
        for index, count in enumerate(state.rows)
    )


def check_only_two_row_available(state: Nim):
    """Tell whether exactly two rows of the board still hold objects.

    The non-empty rows are counted directly instead of comparing the number
    of empty rows against the global ``GAME_ROWS``, so the check does not
    depend on the board having exactly ``GAME_ROWS`` rows.

    Returns:
        True if exactly two rows are non-empty, False otherwise.
    """
    return sum(1 for count in state.rows if count != 0) == 2

def check_only_one_row_available2(state: Nim):
    """Tell whether exactly one row of the board still holds objects.

    Per the original author's note, this duplicate exists so that a second
    dictionary key can follow the same rule.

    The non-empty rows are counted directly instead of comparing the number
    of empty rows against the global ``GAME_ROWS``, so the check does not
    depend on the board having exactly ``GAME_ROWS`` rows.

    Returns:
        True if exactly one row is non-empty, False otherwise.
    """
    return sum(1 for count in state.rows if count != 0) == 1

def check_spicy_move_for_notoptimal(state: Nim):
    """Return True if at least one available move leaves a nim-sum of zero."""
    outcomes = analize(state)["possible_moves"]
    return any(value == 0 for value in outcomes.values())
   


def action1(state: Nim) -> Nimply:
    """Take one object from a randomly chosen non-empty row.

    Only rows that still hold objects are candidates, so the returned move is
    always playable; the row indices come from the board itself rather than
    from the global ``GAME_ROWS``.

    Raises:
        IndexError: if the board holds no objects (``random.choice`` on an
            empty list).
    """
    candidates = [row for row, count in enumerate(state.rows) if count > 0]
    return Nimply(random.choice(candidates), 1)


def action2(state: Nim) -> Nimply:
    """Take two objects from the last row that holds at least two objects.

    If no row holds two objects (e.g. every remaining row holds a single
    object), one object is taken from the last non-empty row. Returning a
    zero-object move in that case would be accepted by ``Nim.nimming`` without
    changing the board, letting the player skip its turn.

    Row indices are derived from the board itself rather than from the global
    ``GAME_ROWS``.

    Returns:
        The chosen move; ``Nimply(0, 0)`` only when the board is empty and no
        move exists.
    """
    rows = state.rows
    last_non_empty = None
    for index in range(len(rows) - 1, -1, -1):
        if rows[index] >= 2:
            return Nimply(index, 2)
        if last_non_empty is None and rows[index] > 0:
            last_non_empty = index

    if last_non_empty is not None:
        return Nimply(last_non_empty, 1)
    return Nimply(0, 0)  # empty board: there is nothing to take


def action3(state: Nim) -> Nimply:
    """Take two objects from the first row that holds at least two objects.

    If no row holds two objects (e.g. every remaining row holds a single
    object), one object is taken from the first non-empty row. Returning a
    zero-object move in that case would be accepted by ``Nim.nimming`` without
    changing the board, letting the player skip its turn.

    Returns:
        The chosen move; ``Nimply(0, 0)`` only when the board is empty and no
        move exists.
    """
    first_non_empty = None
    for index, num_el in enumerate(state.rows):
        if num_el >= 2:
            return Nimply(index, 2)
        if first_non_empty is None and num_el > 0:
            first_non_empty = index

    if first_non_empty is not None:
        return Nimply(first_non_empty, 1)
    return Nimply(0, 0)  # empty board: there is nothing to take


def action4(state: Nim) -> Nimply:
    """Take the single object of the first row that holds exactly one object.

    ``tuple.index`` raises ValueError when no row holds exactly one object.
    """
    target_row = state.rows.index(1)
    return Nimply(row=target_row, num_objects=1)


def action5(state: Nim) -> Nimply:
    """Take a random amount from the first row holding at least 90% of ``2 * index + 1`` objects.

    The amount is drawn uniformly between 1 and the number of objects in that
    row. When no row qualifies, the zero-object move ``Nimply(0, 0)`` is
    returned.
    """
    for row, count in enumerate(state.rows):
        threshold = float(2 * row + 1) * 0.90
        if float(count) < threshold:
            continue
        return Nimply(row, random.randint(1, count))

    # No row qualifies; not expected when the matching condition selected this action.
    return Nimply(0, 0)


def action7(state: Nim) -> Nimply:
    """Take one object from the row holding the most objects.

    Ties go to the row with the smallest index; on a board without objects
    the move refers to row 0.
    """
    fullest_row, most = 0, 0
    for row, count in enumerate(state.rows):
        if count > most:
            fullest_row, most = row, count
    return Nimply(fullest_row, 1)

def action8(state: Nim) -> Nimply:
    """Leave a single object in the first non-empty row.

    All the objects but one are taken from that row. When the row holds
    exactly one object, that object is taken: "all but one" would be a
    zero-object move, which ``Nim.nimming`` accepts without changing the
    board, so the player would skip its turn.

    NOTE: the original author rated this action as performing very badly
    compared to an ``action6`` that is not part of this file.

    Returns:
        The chosen move; ``Nimply(0, 0)`` only when the board is empty and no
        move exists.
    """
    for index, num_el in enumerate(state.rows):
        if num_el != 0:
            return Nimply(index, max(1, num_el - 1))
    return Nimply(0, 0)  # empty board: there is nothing to take


def notoptimal_move(state: Nim) -> Nimply:
    """Return a random move among those that leave a nim-sum of zero.

    ``random.choice`` raises IndexError when no such move exists.
    """
    outcomes = analize(state)["possible_moves"]
    zero_sum_moves = [move for move in outcomes if outcomes[move] == 0]
    return random.choice(zero_sum_moves)
            


In [None]:
# Conditions tested on the board. CONDITIONS[i] is paired with ACTIONS[i]
# by position when the rule dictionary is built, so the order matters.
CONDITIONS = [
    check_all_rows,
    check_num_of_rows_with_element_is_even,
    check_num_of_rows_with_element_is_odd,
    check_row_with_one_element,
    check_row_with_many_elements,
    check_only_two_row_available,
    check_spicy_move_for_notoptimal,
]

# One (action, weight) pair per condition; every weight starts at 0.
ACTIONS = [
    (action, 0)
    for action in (
        action1,
        action2,
        action3,
        action4,
        action5,
        action7,
        notoptimal_move,
    )
]

print('actions: ', ACTIONS)
    

In [None]:
def set_dictionary(list_of_weights):
    """Store the given weights in ACTIONS and return the rule dictionary.

    ``list_of_weights[i]`` becomes the weight of ``ACTIONS[i]`` (the global
    list is updated in place). The returned dictionary maps each entry of
    CONDITIONS to the ``(action, weight)`` pair found at the same position of
    ACTIONS.
    """
    for position, entry in enumerate(ACTIONS):
        ACTIONS[position] = (entry[0], list_of_weights[position])

    # condition -> (action, weight), pairing the two lists by position
    return dict(zip(CONDITIONS, ACTIONS))
    

Set the initial solution (individual) from which the ES starts

In [None]:

# The weights act as probabilities: at the start every rule gets the same share.
initial_weight = 1 / len(CONDITIONS)
print("initial_weight: ", initial_weight)

current_solution = [initial_weight] * len(CONDITIONS)
best_solution = current_solution  # both names refer to the same list
print('current_solution/best_solution: ', current_solution)


In [None]:


def test_conditions(state: Nim, dict_rules):
    """Return the ``(action, weight)`` pairs whose condition holds for ``state``.

    ``dict_rules`` maps a condition (a callable taking the board) to an
    ``(action, weight)`` pair; the result keeps the dictionary order.
    """
    return [
        (action, weight)
        for condition, (action, weight) in dict_rules.items()
        if condition(state)
    ]


def voting(list_of_actions):
    """Pick one action at random with probability proportional to its weight.

    This is a roulette-wheel selection over the given candidates.

    Args:
        list_of_actions: sequence of ``(action, weight)`` pairs with
            non-negative weights.

    Returns:
        The ``action`` member of the selected pair.

    Raises:
        ValueError: if ``list_of_actions`` is empty or the weights do not sum
            to a positive value.
    """
    if not list_of_actions:
        raise ValueError("voting() needs at least one (action, weight) pair")

    actions = [action for action, _ in list_of_actions]
    weights = [weight for _, weight in list_of_actions]
    if sum(weights) <= 0:
        raise ValueError("the weights must sum to a positive value")

    # random.choices performs the weighted draw and always returns one of the
    # candidates. A hand-written cumulative sum of normalized weights compared
    # against uniform(0, 1) can end slightly below the drawn value because of
    # floating-point rounding, in which case nothing would be returned.
    return random.choices(actions, weights=weights, k=1)[0]
        



def adaptive(state: Nim, dict) -> Nimply:
    """Choose a move by letting the rules whose condition holds vote.

    ``dict`` maps each condition to an ``(action, weight)`` pair: the actions
    whose condition is true for ``state`` are collected, one of them is drawn
    by ``voting`` and then applied to ``state`` to obtain the move.
    """
    # NOTE: the parameter name shadows the builtin `dict`; it is kept because
    # it is part of the function's signature.
    return voting(test_conditions(state, dict))(state)

## Oversimplified match

# Write log records of level INFO and above to the file "nim.log", each line
# carrying timestamp, level name and message.
# force=True replaces handlers already attached to the root logger, so
# re-running this cell reconfigures logging instead of being ignored.
logging.basicConfig(
    filename="nim.log",
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    force=True,
)


In [None]:
# Set the threshold of the root logger to INFO.
logging.getLogger().setLevel(logging.INFO)

In [None]:
    
def fitness(weights_array):
    """Return the fraction of NUM_GAMES games won by the adaptive player.

    The weights are installed with ``set_dictionary`` and the resulting rules
    drive ``adaptive``, which always moves second (player 1). Player 0 is an
    opponent taken from a fixed list of strategies according to the game
    number.
    """
    opponent_strategies = [gabriele, pure_random, optimal]
    set_of_rules = set_dictionary(weights_array)
    index_strategy = 0
    player1_win = 0

    for game in range(NUM_GAMES):
        # Opponent schedule: while game <= NUM_GAMES/2 the index moves to the
        # next strategy (cyclically) every 20 games; after that point the
        # strategy at index 2 is always used.
        # NOTE(review): the index is advanced already at game 0, so the first
        # games are played against opponent_strategies[1] - confirm intended.
        if game % 20 == 0 and game <= NUM_GAMES / 2:
            index_strategy = (index_strategy + 1) % len(opponent_strategies)
        elif game > NUM_GAMES / 2:
            index_strategy = 2
        opponent_strategy = opponent_strategies[index_strategy]

        nim = Nim(GAME_ROWS)
        player = 0
        while nim:
            if player == 1:
                ply = adaptive(nim, set_of_rules)
            else:
                ply = opponent_strategy(nim)
            nim.nimming(ply)
            player = 1 - player

        # The turn has already passed on, so `player` is the one who did NOT
        # make the last move: that player is counted as the winner.
        if player != 0:
            player1_win += 1

    return player1_win / NUM_GAMES


## Adaptive (1+λ)-ES

In [None]:
# (1+λ)-ES settings: λ children per epoch; the mutation step σ shrinks
# linearly from initial_σ towards final_σ as the epochs go by.
λ = 30
initial_σ = 0.012
final_σ = 0.001
σ = initial_σ

# (epoch, fitness) pairs, recorded only when an epoch improves the best fitness
history = []

score_best_attual_solution = fitness(best_solution)
logging.info(f"Initial situation:\nfitness {score_best_attual_solution}:\ninitial set of weight: {best_solution}")

for epoch in range(NUM_EPOCHS):
    # Each child is the best solution plus Gaussian noise, clipped from below
    # at 0.01 so that no weight becomes zero or negative
    # (see https://numpy.org/doc/stable/reference/generated/numpy.clip.html).
    genome_size = len(current_solution)
    offspring = [
        np.clip(np.random.normal(loc=0, scale=σ, size=genome_size) + best_solution, 0.01, None)
        for _ in range(λ)
    ]

    # One fitness value per child, in the same order as `offspring`.
    evals = [fitness(weights_array) for weights_array in offspring]

    # The fittest child of this epoch (first one on ties).
    top = int(np.argmax(evals))
    current_solution = offspring[top]
    score_current_solution = evals[top]

    # The child replaces the best solution only if it is strictly better.
    if score_current_solution > score_best_attual_solution:
        best_solution = np.copy(current_solution)
        score_best_attual_solution = score_current_solution
        history.append((epoch, score_best_attual_solution))

    σ = max(final_σ, initial_σ - (initial_σ - final_σ) * (epoch / NUM_EPOCHS))

    logging.info(f"Result epoch {epoch +1}:\nbest_set_of_weights: {best_solution}, best_fitness: {score_best_attual_solution}")




## FITNESS HISTORY
* reference for plotting the graph: https://www.geeksforgeeks.org/graph-plotting-in-python-set-1/
* unzipping values: https://www.programiz.com/python-programming/methods/built-in/zip

In [None]:
# `history` gets an entry only when an epoch improves the best fitness, so it
# may be empty; unpacking zip(*history) would then fail, hence the fallback
# to two empty sequences (which produces an empty plot).
x, y = zip(*history) if history else ((), ())

plt.scatter(x, y)

plt.xlabel('epoch')
plt.ylabel('fitness')

plt.title('fitness over epochs')

plt.show()

## Test the fitness of the initial configuration of weights

In [None]:
# Rebuild the uniform starting weights (same probability for every rule) and
# evaluate their fitness.
initial_weight = 1 / len(CONDITIONS)
current_solution = [initial_weight] * len(CONDITIONS)

logging.info(f"current_solution: {current_solution}")
fitness(current_solution)