Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 2: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES

## Instructions

* Create the directory `lab2` inside your personal repository for the course
* Put a `README.md` and your solution (all the files, code, and auxiliary data if needed) in it

## Notes

* Working in groups is not only allowed, but recommended (see: [Ubuntu](https://en.wikipedia.org/wiki/Ubuntu_philosophy) and [Cooperative Learning](https://files.eric.ed.gov/fulltext/EJ1096789.pdf)). Collaborations must be explicitly declared in the `README.md`.
* [Yanking](https://www.emacswiki.org/emacs/KillingAndYanking) from the internet is allowed, but sources must be explicitly declared in the `README.md`.



In [1]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
import numpy as np

## The *Nim* class

In [2]:
# A ply: remove `num_objects` objects from the row with index `row`
Nimply = namedtuple("Nimply", "row, num_objects")


class Nim:
    """Board of a Nim game: a list of rows, each holding a number of objects.

    Row ``i`` (0-based) starts with ``2 * i + 1`` objects.  When ``k`` is not
    ``None`` it is the largest number of objects a single ply may remove.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        """Return True while at least one object is left on the board."""
        return sum(self._rows) > 0

    def __str__(self):
        """Return the row sizes separated by spaces and wrapped in ``<>``."""
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        """Snapshot (tuple) of the number of objects currently in each row."""
        return tuple(self._rows)

    @property
    def k(self) -> int:
        """Upper bound on objects removed per ply, or None when unbounded."""
        return self._k

    def non_empty_rows(self) -> list:
        """Return the indices of the rows that still hold at least one object."""
        return [i for i, row in enumerate(self._rows) if row > 0]

    def nimming(self, ply: Nimply) -> None:
        """Apply ``ply`` to the board in place.

        Raises:
            AssertionError: if the ply removes fewer than one object, more
                objects than the row holds, or more than ``k`` objects.
        """
        row, num_objects = ply
        # A ply has to remove something: 0 would be a free "pass" and a
        # negative amount would put objects back on the board.
        assert num_objects >= 1
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects

def nim_sum(state: Nim) -> int:
    """Return the nim-sum of ``state``: the bitwise XOR of all row sizes.

    An empty board (no rows) has nim-sum 0.  Plain integer XOR is used so the
    result is exact for row sizes of any magnitude.
    """
    result = 0
    for count in state.rows:
        result ^= count
    return result

# Task 2.1 - Fixed Rules Agents

In [26]:
def random_agent(state: Nim) -> Nimply:
    """A completely random legal move.

    A non-empty row is chosen uniformly at random, then a number of objects
    between 1 and the row size.  When the game has an upper bound ``state.k``
    the amount is also capped at ``k`` so the ply is accepted by
    ``Nim.nimming``.
    """
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    limit = state.rows[row]
    if state.k is not None:
        limit = min(limit, state.k)
    num_objects = random.randint(1, limit)
    return Nimply(row, num_objects)

def silly(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row.

    "Lowest" means the non-empty row with the smallest index.  The whole row
    is taken, or ``state.k`` objects when the game has an upper bound smaller
    than the row size.

    Raises:
        ValueError: if every row is empty, so no move exists.
    """
    for row, count in enumerate(state.rows):
        if count > 0:
            num_objects = count if state.k is None else min(count, state.k)
            return Nimply(row, num_objects)
    raise ValueError("no legal move: every row is empty")

def analize(raw: Nim) -> dict:
    """Evaluate every legal ply available in ``raw``.

    Returns a dict with a single key, ``"possible_moves"``, mapping each legal
    ``Nimply`` to the nim-sum of the board obtained by playing it.  ``raw`` is
    not modified: each ply is tried on a deep copy.  Plies removing more than
    ``raw.k`` objects are skipped because ``Nim.nimming`` would reject them.
    """
    limit = raw.k
    possible_moves = dict()
    for r, c in enumerate(raw.rows):
        most = c if limit is None else min(c, limit)
        for o in range(1, most + 1):
            ply = Nimply(r, o)
            tmp = deepcopy(raw)
            tmp.nimming(ply)
            possible_moves[ply] = nim_sum(tmp)
    return {"possible_moves": possible_moves}

def optimal_move(state: Nim) -> Nimply:
    """Choose a ply that leaves a nim-sum of 0, or any ply if none does.

    The candidate plies come from ``analize``; one of them is drawn uniformly
    at random.
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    outcomes = analysis["possible_moves"]
    # Prefer plies whose resulting nim-sum is 0; otherwise every ply is a candidate
    candidates = [move for move in outcomes if outcomes[move] == 0] or list(outcomes)
    return random.choice(candidates)

def check_misere(state: Nim) -> int:
    """Return the index of the only row holding more than one object.

    Returns -1 when no row, or more than one row, holds more than one object.
    """
    abundant = [index for index, count in enumerate(state.rows) if count > 1]
    return abundant[-1] if len(abundant) == 1 else -1

def optimal_agent(state: Nim) -> Nimply:
    """Play the nim-sum strategy, switching to the endgame rule when it applies.

    The endgame rule applies when exactly one row holds more than one object
    (see ``check_misere``): the ply then leaves an odd number of single-object
    rows.  In every other position the move is delegated to ``optimal_move``.
    NOTE(review): ``state.k`` is not consulted here.
    """
    heaps = state.rows
    biggest = max(heaps)
    biggest_row = heaps.index(biggest)
    active = len([count for count in heaps if count > 0])

    if check_misere(state) == -1:
        # Zero or several big rows: continue with the default strategy
        return optimal_move(state)

    if active % 2 == 0:
        # Even number of active rows: removing the big row leaves an odd number
        return Nimply(biggest_row, biggest)

    # Odd number of active rows: shrink the big row to a single object.
    # The big row holds more than one object here, so at least one is removed.
    return Nimply(biggest_row, biggest - 1)

## Test match

In [None]:
# Lower the root logger's threshold to INFO so the logging.info calls below
# are not filtered out by level.
logging.getLogger().setLevel(logging.INFO)

# strategy[i] is the callable that chooses the ply for player i
strategy = (random_agent, optimal_agent)

nim = Nim(3)
logging.info(f"init : {nim}")
player = 0
# Nim.__bool__ is True while objects remain, so the loop stops right after
# the last object has been taken.
while nim:
    ply = strategy[player](nim)
    logging.info(f"ply: player {player} plays {ply}")
    nim.nimming(ply)
    logging.info(f"status: {nim} nim_sum: {nim_sum(nim)}\n")
    player = 1 - player  # hand the turn to the other player
# `player` was flipped after the final ply, so it now names the player who did
# NOT take the last object.
logging.info(f"status: Player {player} won!")


# Task 2.2 - ES Agents

## Rule = condition + position + action

In [94]:
from collections import namedtuple

# 'Rule' bundles the three callables that make up one rule for the Nim game
Rule = namedtuple("Rule", ["condition", "position", "action"])


def _describe_rule(rule) -> str:
    """Render a rule as the names of its three callables joined by ' + '."""
    parts = (rule.condition, rule.position, rule.action)
    return " + ".join(part.__name__ for part in parts)


# str(rule) uses the readable description; repr(rule) keeps the namedtuple default
Rule.__str__ = _describe_rule

# Conditions for rules
def odd_rows(state: Nim) -> bool:
    """Tell whether an odd number of rows still hold objects."""
    remaining = state.non_empty_rows()
    return len(remaining) % 2 != 0

def even_rows(state: Nim) -> bool:
    """Tell whether an even number of rows (possibly zero) still hold objects."""
    remaining = state.non_empty_rows()
    return len(remaining) % 2 != 1

def misere_state(state: Nim) -> bool:
    """Tell whether ``check_misere`` finds a row, i.e. does not return -1.

    That is the case when exactly one row holds more than one object.
    """
    abundant_row = check_misere(state)
    return not abundant_row == -1

# Positions for rules
def min_position(state: Nim) -> int:
    """Return the index of the non-empty row holding the fewest objects.

    Among equally small rows the first one listed by ``non_empty_rows`` wins.
    """
    heaps = state.rows
    candidates = state.non_empty_rows()
    return min(candidates, key=heaps.__getitem__)

def max_position(state: Nim) -> int:
    """Return the index of the non-empty row holding the most objects.

    Among equally large rows the first one listed by ``non_empty_rows`` wins.
    """
    heaps = state.rows
    candidates = state.non_empty_rows()
    return max(candidates, key=heaps.__getitem__)

def random_position(state: Nim) -> int:
    """Return the index of a non-empty row drawn uniformly at random."""
    candidates = state.non_empty_rows()
    return random.choice(candidates)

# Actions for rules
def get_one(state: Nim, row: int) -> Nimply:
    """Build the ply that removes a single object from ``row``.

    ``state`` is accepted for a uniform action signature but is not read.
    """
    return Nimply(row=row, num_objects=1)

def leave_one(state: Nim, row: int) -> Nimply:
    """Build the ply that removes all but one object from ``row``.

    A row holding a single object gives that object up.  When ``state.k`` is
    set, the amount removed is capped at ``k``, so more than one object may
    remain in the row.
    """
    available = state.rows[row]
    if available == 1:
        return Nimply(row, 1)
    surplus = available - 1
    amount = surplus if state.k is None else min(state.k, surplus)
    return Nimply(row, amount)

def get_all(state: Nim, row: int) -> Nimply:
    """Build the ply that empties ``row``, or removes ``state.k`` objects if that is fewer."""
    available = state.rows[row]
    amount = available if state.k is None else min(state.k, available)
    return Nimply(row, amount)

def get_random(state: Nim, row: int) -> Nimply:
    """Build a ply removing a random number of objects from ``row``.

    The amount is drawn uniformly between 1 and the row size, or between 1 and
    ``state.k`` when ``k`` is set and smaller than the row size.
    """
    upper = state.rows[row]
    if state.k is not None:
        upper = min(state.k, upper)
    return Nimply(row, random.randint(1, upper))

# Lists of conditions, positions, and actions
# generate_all_possible_rules combines one entry from each list into a Rule.
# Conditions take the state and return a bool
conditions = [odd_rows, even_rows, misere_state]
# Positions take the state and return a row index
positions = [min_position, max_position, random_position]
# Actions take the state and a row index and return a Nimply
actions = [get_one, leave_one, get_all, get_random]

def generate_all_possible_rules() -> list:
    """Build one Rule for every (condition, position, action) triple.

    The triples are taken from the module-level ``conditions``, ``positions``
    and ``actions`` lists; conditions vary slowest and actions fastest.
    """
    return [
        Rule(cond, pos, act)
        for cond in conditions
        for pos in positions
        for act in actions
    ]


### ES Agent

In [96]:
class ESAgent:
    def __init__(self) -> None:
        # Initialize the agent's rules, weights, sigmas, and fitness
        self.rules = generate_all_possible_rules()  # Generate all possible rules
        self.weights = np.random.uniform(0, 1, len(self.rules))  # Initialize rule weights randomly
        self.sigmas = np.random.uniform(0, 3, len(self.rules))  # Initialize sigma values for mutation
        self.fitness = 0  # Initialize fitness to evaluate performance

    def __str__(self) -> str:
        # Return a string representation of the agent (displaying fitness and top 5 rules with weights)
        description = f"Fitness: {self.fitness}\n"
        top_5_indices = np.argsort(self.weights)[-5:]
        for i in top_5_indices:
            description += f"\t WEIGHT: {self.weights[i]:.2f} \t {self.rules[i]}\t \n"
        return description

    def pick_rule(self, state: Nim) -> Rule:

        # 1. Select the rules in which the condition is true for the current state
        indices = [i for i, rule in enumerate(self.rules) if rule.condition(state)]
        valid_rules = [self.rules[i] for i in indices]

        # 2. Compute the total fitness (sum of weights) of valid rules
        total_fitness = sum(self.weights[i] for i in indices)

        # 3. Calculate the interval size
        interval_size = total_fitness / len(valid_rules)

        # 4. Generate a random starting point in the interval
        start = random.uniform(0, interval_size)

        # 5. Select rules using Stochastic Universal Sampling
        chosen_rules = []
        pointer = start
        while len(chosen_rules) < 1:  # Modify this number to select multiple rules if needed
            for i in indices:
                pointer -= self.weights[i]
                if pointer <= 0:
                    chosen_rules.append(self.rules[i])
                    pointer += interval_size

        # Return a randomly chosen rule from the selected ones
        return random.choice(chosen_rules)

    def play(self, state: Nim) -> Nimply:
        # Pick a rule and execute its action on the game state
        rule = self.pick_rule(state)
        return rule.action(state, rule.position(state))

    def evaluate(self, num_games: int = 60) -> None:
        wins = 0

        # Play against the optimal agent
        players = [self.play, optimal_agent]
        for _ in range(num_games//2):
            nim = Nim(4)  # Initialize the Nim game
            player = 0
            while nim:
                ply = players[player](nim)
                nim.nimming(ply)
                player = 1 - player
            if player == 0:
                wins += 1  # Increment wins if the agent wins against optimal_agent

        # Play against the random agent
        players = [random_agent, self.play]
        for _ in range(num_games//2):
            nim = Nim(4)  # Initialize the Nim game
            player = 0
            while nim:
                ply = players[player](nim)
                nim.nimming(ply)
                player = 1 - player
            wins += player

        # Update the fitness based on the win rate
        self.fitness = wins / num_games

    def mutate(self, mutation_rate: float = 0.1) -> None:
        for i in range(len(self.weights)):
            if random.random() < mutation_rate:
                # Add a random value to the weight with Gaussian noise and ensure it's non-negative
                self.weights[i] += random.gauss(0, self.sigmas[i])
                self.weights[i] = max(0, self.weights[i])

    def adaptation(self) -> None:
       win_rate = self.fitness
       for i in range(len(self.sigmas)):
            if random.random() > win_rate:
                # Modify sigmas based on Gaussian noise and ensure they remain non-negative
                self.sigmas[i] += random.gauss(0, 3 - win_rate)
                self.sigmas[i] = max(0, self.sigmas[i])


In [59]:
num_parents = 15  # individuals kept after each generation and used as the mating pool
num_children = 50  # size of the initial population and number of children bred per generation
num_generations = 20  # iterations of the evolution loop
mutation_rate = 0.5  # per-weight mutation probability passed to ESAgent.mutate
crossover_rate = 0.5  # per-gene probability that a child takes parent2's weight and sigma

In [97]:
# Evolution loop: breed num_children children from the best num_parents
# individuals, then keep the best num_parents of parents and children together.

# Initialize population with ESAgent instances
population = [ESAgent() for _ in range(num_children)]

# Evaluate fitness for each individual in the population
for individual in population:
    individual.evaluate()

# Sort the population based on fitness in descending order
population.sort(key=lambda x: x.fitness, reverse=True)

# Iterate through generations
for generation in range(num_generations):
    print(f"Generation {generation+1}")

    # Select parents based on the top individuals
    parents = population[:num_parents]

    # Generate children through crossover and mutation
    children = []
    for _ in range(num_children):
        # Select two parents randomly
        # NOTE: random.choices samples with replacement, so parent1 and
        # parent2 can be the same individual (crossover is then a no-op).
        parent1, parent2 = random.choices(parents, k=2)

        # Create a child by copying from parent1 and applying crossover
        # (the copy also carries parent1's fitness until evaluate() overwrites it)
        child = deepcopy(parent1)
        for i in range(len(child.weights)):
            if random.random() < crossover_rate:
                # Weight and sigma of a gene are inherited together
                child.weights[i] = parent2.weights[i]
                child.sigmas[i] = parent2.sigmas[i]

        # Mutate the child and evaluate its fitness
        child.mutate(mutation_rate)
        child.evaluate()
        children.append(child)

    # Combine parents and children, and select the top individuals
    # NOTE: parents compete with the fitness measured when they were created;
    # they are not re-evaluated here.
    population = parents + children
    population.sort(key=lambda x: x.fitness, reverse=True)
    population = population[:num_parents]

    # Apply adaptation to the individuals in the population
    # (changes the survivors' sigmas in place; their weights are untouched)
    for individual in population:
        individual.adaptation()

    # Display the top individual's information for the current generation
    print(population[0])


Generation 1
Fitness: 0.38333333333333336
	 WEIGHT: 1.20 	 even_rows + random_position + get_one	 
	 WEIGHT: 1.28 	 misere_state + random_position + leave_one	 
	 WEIGHT: 1.67 	 misere_state + max_position + get_all	 
	 WEIGHT: 1.84 	 misere_state + min_position + get_random	 
	 WEIGHT: 4.15 	 misere_state + random_position + get_random	 

Generation 2
Fitness: 0.38333333333333336
	 WEIGHT: 1.20 	 even_rows + random_position + get_one	 
	 WEIGHT: 1.28 	 misere_state + random_position + leave_one	 
	 WEIGHT: 1.67 	 misere_state + max_position + get_all	 
	 WEIGHT: 1.84 	 misere_state + min_position + get_random	 
	 WEIGHT: 4.15 	 misere_state + random_position + get_random	 

Generation 3
Fitness: 0.38333333333333336
	 WEIGHT: 1.20 	 even_rows + random_position + get_one	 
	 WEIGHT: 1.28 	 misere_state + random_position + leave_one	 
	 WEIGHT: 1.67 	 misere_state + max_position + get_all	 
	 WEIGHT: 1.84 	 misere_state + min_position + get_random	 
	 WEIGHT: 4.15 	 misere_state + random