# Lab 2: ES

## Task

Write agents able to play [*Nim*](https://en.wikipedia.org/wiki/Nim), with an arbitrary number of rows and an upper bound $k$ on the number of objects that can be removed in a turn (a.k.a., *subtraction game*).

The goal of the game is to **avoid** taking the last object.

* Task2.1: An agent using fixed rules based on *nim-sum* (i.e., an *expert system*)
* Task2.2: An agent using evolved rules using ES


In [2]:
import logging
from pprint import pformat
from collections import namedtuple
import random
from copy import deepcopy
import numpy as np


## The *Nim* and *Nimply* classes

In [3]:
# A single move: remove `num_objects` objects from the row at index `row`.
Nimply = namedtuple("Nimply", ["row", "num_objects"])

In [4]:
class Nim:
    """Board of a Nim game.

    Row ``i`` starts with ``2 * i + 1`` objects (1, 3, 5, ...). When ``k`` is
    not ``None`` it is the largest number of objects one move may remove.
    """

    def __init__(self, num_rows: int, k: "int | None" = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self) -> bool:
        """Return True while at least one object is still on the board."""
        return sum(self._rows) > 0

    def __str__(self) -> str:
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        """Immutable snapshot of the number of objects in every row."""
        return tuple(self._rows)

    def nimming(self, ply: "Nimply") -> None:
        """Apply the move ``ply`` (a ``(row, num_objects)`` pair) in place.

        Raises:
            AssertionError: if the move removes fewer than one object, more
                objects than the row holds, or more than ``k`` objects.
        """
        row, num_objects = ply
        # Without this lower bound a player could pass (0) or even put
        # objects back on the board (negative count).
        assert num_objects >= 1, "a move must remove at least one object"
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects


## Sample (and silly) strategies

In [5]:
def pure_random(state: Nim) -> Nimply:
    """A completely random legal move.

    A non-empty row is chosen uniformly at random, then a number of objects
    between 1 and the row size. When the board carries a per-move bound, the
    number of objects is capped by it so the move is accepted by
    ``Nim.nimming``.

    Raises:
        IndexError: if every row is empty (``random.choice`` on an empty list).
    """
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    # `_k` is the board's per-move cap; None means "no cap".
    limit = getattr(state, "_k", None)
    most = state.rows[row] if limit is None else min(state.rows[row], limit)
    num_objects = random.randint(1, most)
    return Nimply(row, num_objects)


In [6]:
def gabriele(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row.

    "Lowest" is the non-empty row with the smallest index; "maximum possible"
    is the whole row, or the board's per-move bound when that is smaller.

    Raises:
        ValueError: if every row is empty, i.e. there is no legal move.
    """
    rows = state.rows
    non_empty = [r for r, c in enumerate(rows) if c > 0]
    if not non_empty:
        raise ValueError("no legal move: every row is empty")
    row = non_empty[0]
    # `_k` is the board's per-move cap; None means "no cap".
    limit = getattr(state, "_k", None)
    take = rows[row] if limit is None else min(rows[row], limit)
    return Nimply(row, take)


In [7]:
def nim_sum(state: "Nim") -> int:
    """Return the nim-sum of ``state``: the bitwise XOR of all row sizes.

    An empty board (no rows) has nim-sum 0. Row sizes of any magnitude are
    supported because the XOR is computed directly on Python integers.
    """
    total = 0
    for count in state.rows:
        total ^= count
    return total

def analize(raw: Nim) -> dict:
    """Evaluate every legal move from the position ``raw``.

    Returns:
        A dict with a single key ``"possible_moves"`` mapping each legal
        ``Nimply`` to the nim-sum of the position reached after playing it.
        ``raw`` itself is never modified: each move is tried on a deep copy.
    """
    # `_k` is the board's per-move cap; None means "no cap". Moves above the
    # cap are skipped because `nimming` would reject them.
    limit = getattr(raw, "_k", None)
    cooked = {"possible_moves": {}}
    for r, c in enumerate(raw.rows):
        most = c if limit is None else min(c, limit)
        for o in range(1, most + 1):
            ply = Nimply(r, o)
            tmp = deepcopy(raw)
            tmp.nimming(ply)
            cooked["possible_moves"][ply] = nim_sum(tmp)
    return cooked

def optimal(state: Nim) -> Nimply:
    """Choose at random among the moves that leave a non-zero nim-sum.

    When no such move exists, any legal move is chosen at random instead.
    The full analysis of the position is logged at DEBUG level.
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    outcomes = analysis["possible_moves"]
    candidates = [move for move in outcomes if outcomes[move] != 0]
    return random.choice(candidates or list(outcomes))

The strategy of choosing as "spicy_moves" the moves that leave nim_sum != 0 does not always work in misère play (we have to wait for our opponent to make a mistake).\
In normal play the strategy always works if we instead take as "spicy_moves" the moves that leave nim_sum == 0, since the final winning move is itself a move that brings the nim_sum to 0: this is always possible from an unbalanced position (nim_sum != 0), but never from a balanced position (nim_sum == 0), where the only thing a player can do is change the nim_sum to a value different from 0.\
That said, to win we always try to make moves that leave nim_sum == 0 (it is provable that such a move always exists from a position with nim_sum != 0), so that the next move will leave nim_sum != 0 again, and we can reach the end of the game as the winner!

## My strategy

In [8]:
# one parameter strategy: percentage of leave-one rows
def leave_one_strategy(params: list, state: Nim) -> Nimply:
    """Play so that a target share of the rows holds exactly one object.

    ``params[0]`` is the desired fraction of rows (counted over all rows,
    empty ones included) that should contain a single object.

    * Too few single-object rows: shrink a random larger row down to one.
    * Enough single-object rows: empty a random larger row.
    * No larger row left, or the target cannot be reached with the rows that
      are still non-empty: empty a random non-empty row.

    Rows are drawn uniformly among the candidates. When the board carries a
    per-move bound, the number of removed objects is capped by it.

    Raises:
        ValueError: if every row is empty, i.e. there is no legal move.
    """
    rows = state.rows
    playable = [r for r, c in enumerate(rows) if c >= 1]
    if not playable:
        raise ValueError("no legal move: every row is empty")
    shrinkable = [r for r, c in enumerate(rows) if c > 1]

    needed = round(params[0] * len(rows))
    ones = sum(1 for c in rows if c == 1)

    if shrinkable and needed <= ones + len(shrinkable):
        # Index the candidate list directly: every candidate is equally
        # likely and no rejection loop is needed.
        row = shrinkable[np.random.randint(len(shrinkable))]
        take = rows[row] - 1 if ones < needed else rows[row]
    else:
        row = playable[np.random.randint(len(playable))]
        take = rows[row]

    # `_k` is the board's per-move cap; None means "no cap".
    limit = getattr(state, "_k", None)
    if limit is not None:
        take = min(take, limit)
    return Nimply(row, take)

In [9]:
class es:
    """Common machinery for the evolution strategies defined in this file.

    Attributes:
        params: current parent parameter vector.
        l: number of offspring generated per era (lambda).
        s: standard deviation of the Gaussian mutation (sigma).
        eras: number of eras an optimisation run lasts.
        n_r: number of rows of the Nim boards used to measure fitness.
        player_strategy: callable ``(params, state) -> move`` being tuned.
        opponent_strategy: callable ``(state) -> move`` played against.
        current_fitness: fitness of ``params``; ``None`` until it is measured.
    """

    def __init__(self, params: list, l: int, s: float, eras: int, n_r: int, player_strategy: callable, opponent_strategy: callable):
        self.params = params
        self.l = l
        self.s = s
        self.eras = eras
        self.era = 0
        self.n_r = n_r
        self.player_strategy = player_strategy
        self.opponent_strategy = opponent_strategy
        # Defined up front so the attribute exists before any fitness has
        # been measured.
        self.current_fitness = None

    def fitness(self, params: list, rows: int, iterations: int):
        """Return the fraction of ``iterations`` games won by the player.

        Each game starts from a fresh ``Nim(rows)`` board with the tuned
        player moving first. The player who removes the last object loses.

        Raises:
            ValueError: if ``iterations`` is not positive.
        """
        if iterations <= 0:
            raise ValueError("iterations must be a positive integer")
        wins = 0
        for _ in range(iterations):
            nim = Nim(rows)
            player = 0
            while nim:
                if player == 0:
                    ply = self.player_strategy(params, nim)
                else:
                    ply = self.opponent_strategy(nim)
                nim.nimming(ply)
                player = 1 - player
            # `player` was flipped after the last move, so it now names the
            # side that did NOT take the last object, i.e. the winner.
            if player == 0:
                wins += 1
        return wins / iterations

    def generate_offspring(self) -> list:
        """Return ``l`` mutated copies of the parent parameters.

        Every gene is drawn from a normal distribution centred on the parent
        value with standard deviation ``s``. A gene that falls outside
        ``[0, 1]`` is redrawn, centred on its own current value, until it
        lands inside the interval.
        """
        offspring = []
        for _ in range(self.l):
            child = np.random.normal(loc=self.params, scale=self.s)
            for i in range(len(child)):
                while child[i] > 1 or child[i] < 0:
                    child[i] = np.random.normal(loc=child[i], scale=self.s)
            offspring.append(child)
        return offspring

# (1+1)-es
class opo_es(es):
    """(1+1) evolution strategy: one parent and one offspring per era.

    The offspring replaces the parent only when its fitness is strictly
    higher.
    """

    def __init__(self, params: list, s: float, eras: int, n_r: int, player_strategy: callable, opponent_strategy: callable):
        super().__init__(params, 1, s, eras, n_r, player_strategy, opponent_strategy)

    def optimize(self, iterations: int = 1000):
        """Run the search for ``self.eras`` eras.

        Era 0 draws the starting parameters uniformly in ``[0, 1)`` (only the
        length of the parameters given at construction is used) and measures
        their fitness. Every later era generates one offspring and keeps it
        if it is strictly better. A progress line is printed for the starting
        point and for every improvement.

        Args:
            iterations: number of games played for each fitness evaluation.
        """
        for era in range(self.eras):
            if era == 0:
                # Build a fresh list rather than writing into the one handed
                # to the constructor, which belongs to the caller.
                self.params = [np.random.rand() for _ in self.params]
                self.current_fitness = self.fitness(self.params, self.n_r, iterations)
                print(f"current_fitness: {self.current_fitness}, current_params: {self.params}")
            else:
                new_offspring = self.generate_offspring()[0]
                new_fitness = self.fitness(new_offspring, self.n_r, iterations)
                if new_fitness > self.current_fitness:
                    self.params = new_offspring
                    self.current_fitness = new_fitness
                    print(f"current_fitness: {self.current_fitness}, current_params: {self.params}")

    def __str__(self) -> str:
        # The fitness is only known once optimize() has run; show None until
        # then instead of failing.
        final_fit = getattr(self, "current_fitness", None)
        return f"(1+1)-es | l:1 | s:{self.s} | e:{self.eras} | r:{self.n_r} | final_p: {self.params} | final_fit: {final_fit}"
  
# (1+lambda)-es
class opl_es(es):
    """(1+lambda) evolution strategy: one parent, ``l`` offspring per era.

    The best offspring replaces the parent only when its fitness is strictly
    higher than the parent's.
    """

    def __init__(self, params: list, l: int, s: float, eras: int, n_r: int, player_strategy: callable, opponent_strategy: callable):
        super().__init__(params, l, s, eras, n_r, player_strategy, opponent_strategy)

    def optimize(self, iterations: int = 1000):
        """Run the search for ``self.eras`` eras.

        Era 0 draws the starting parameters uniformly in ``[0, 1)`` (only the
        length of the parameters given at construction is used) and measures
        their fitness. Every later era generates ``l`` offspring and adopts
        the best one if it beats the parent. A progress line is printed for
        the starting point and for every era that brought an improvement.

        Args:
            iterations: number of games played for each fitness evaluation.
        """
        for era in range(self.eras):
            if era == 0:
                # Build a fresh list rather than writing into the one handed
                # to the constructor, which belongs to the caller.
                self.params = [np.random.rand() for _ in self.params]
                self.current_fitness = self.fitness(self.params, self.n_r, iterations)
                print(f"current_fitness: {self.current_fitness}, current_params: {self.params}")
            else:
                new_offspring = self.generate_offspring()
                new_fitness = [self.fitness(n_o, self.n_r, iterations) for n_o in new_offspring]
                change = False
                # The running best is updated while scanning, so the parent
                # ends up replaced by the best of the improving offspring.
                for child, child_fitness in zip(new_offspring, new_fitness):
                    if child_fitness > self.current_fitness:
                        change = True
                        self.params = child
                        self.current_fitness = child_fitness
                if change:
                    print(f"current_fitness: {self.current_fitness}, current_params: {self.params}")

    def __str__(self) -> str:
        # The fitness is only known once optimize() has run; show None until
        # then instead of failing.
        final_fit = getattr(self, "current_fitness", None)
        return f"(1+l)-es | l:{self.l} | s:{self.s} | e:{self.eras} | r:{self.n_r} | final_p: {self.params} | final_fit: {final_fit}"
  
# (1,lambda)-es
class ocl_es(es):
    """(1,lambda) evolution strategy: one parent, ``l`` offspring per era.

    The best offspring always replaces the parent, even when its fitness is
    lower than the parent's.
    """

    def __init__(self, params: list, l: int, s: float, eras: int, n_r: int, player_strategy: callable, opponent_strategy: callable):
        super().__init__(params, l, s, eras, n_r, player_strategy, opponent_strategy)

    def optimize(self, iterations: int = 1000):
        """Run the search for ``self.eras`` eras.

        Era 0 draws the starting parameters uniformly in ``[0, 1)`` (only the
        length of the parameters given at construction is used) and measures
        their fitness. Every later era generates ``l`` offspring and adopts
        the fittest one unconditionally. A progress line is printed at the
        end of every era.

        Args:
            iterations: number of games played for each fitness evaluation.
        """
        for era in range(self.eras):
            if era == 0:
                # Build a fresh list rather than writing into the one handed
                # to the constructor, which belongs to the caller.
                self.params = [np.random.rand() for _ in self.params]
                self.current_fitness = self.fitness(self.params, self.n_r, iterations)
            else:
                new_offspring = self.generate_offspring()
                new_fitness = [self.fitness(n_o, self.n_r, iterations) for n_o in new_offspring]
                # On ties the earliest offspring wins, as max() returns the
                # first maximal index.
                id_max = max(range(len(new_fitness)), key=new_fitness.__getitem__)
                self.current_fitness = new_fitness[id_max]
                self.params = new_offspring[id_max]
            print(f"current_fitness: {self.current_fitness}, current_params: {self.params}")

    def __str__(self) -> str:
        # The fitness is only known once optimize() has run; show None until
        # then instead of failing.
        final_fit = getattr(self, "current_fitness", None)
        return f"(1,l)-es | l:{self.l} | s:{self.s} | e:{self.eras} | r:{self.n_r} | final_p: {self.params} | final_fit: {final_fit}"

In [11]:
# One optimiser per ES variant, all tuning `leave_one_strategy` against the
# random player with the same sigma, number of eras and board size.
opo = opo_es(
    params=[0],
    s=0.1,
    eras=100,
    n_r=20,
    player_strategy=leave_one_strategy,
    opponent_strategy=pure_random,
)
opl = opl_es(
    params=[0],
    l=10,
    s=0.1,
    eras=100,
    n_r=20,
    player_strategy=leave_one_strategy,
    opponent_strategy=pure_random,
)
ocl = ocl_es(
    params=[0],
    l=10,
    s=0.1,
    eras=100,
    n_r=20,
    player_strategy=leave_one_strategy,
    opponent_strategy=pure_random,
)

In [12]:
# Run the (1+1)-ES; a line is printed for the starting point and for every
# offspring that beats the current fitness.
opo.optimize()

current_fitness: 0.055, current_params: [0.4991534964541594]
current_fitness: 0.949, current_params: [0.4507188]
current_fitness: 0.957, current_params: [0.44176776]
current_fitness: 0.969, current_params: [0.37462422]


In [13]:
# Run the (1+lambda)-ES; a line is printed for the starting point and for
# every era in which some offspring beats the current fitness.
opl.optimize()

current_fitness: 0.961, current_params: [0.3632961380949693]
current_fitness: 0.963, current_params: [0.33846289]
current_fitness: 0.965, current_params: [0.45678131]
current_fitness: 0.968, current_params: [0.37479374]
current_fitness: 0.969, current_params: [0.36254104]


In [14]:
# Run the (1,lambda)-ES; a line is printed at the end of every era because
# the parent is always replaced by the best offspring.
ocl.optimize()

current_fitness: 0.276, current_params: [0.891212445964451]
current_fitness: 0.374, current_params: [0.75383131]
current_fitness: 0.822, current_params: [0.56737166]
current_fitness: 0.958, current_params: [0.45955745]
current_fitness: 0.957, current_params: [0.46638954]
current_fitness: 0.951, current_params: [0.43438162]
current_fitness: 0.95, current_params: [0.33122722]
current_fitness: 0.957, current_params: [0.44096]
current_fitness: 0.953, current_params: [0.35610216]
current_fitness: 0.958, current_params: [0.4546455]
current_fitness: 0.946, current_params: [0.43538427]
current_fitness: 0.951, current_params: [0.43893019]
current_fitness: 0.954, current_params: [0.4657754]
current_fitness: 0.958, current_params: [0.47093148]
current_fitness: 0.958, current_params: [0.46782512]
current_fitness: 0.951, current_params: [0.36461982]
current_fitness: 0.956, current_params: [0.36925698]
current_fitness: 0.964, current_params: [0.42967203]
current_fitness: 0.962, current_params: [0.35

In [15]:
# Print the one-line summary of each optimiser, in the order they were run.
for optimiser in (opo, opl, ocl):
    print(optimiser)

(1+1)-es | l:1 | s:0.1 | e:100 | r:20 | final_p: [0.37462422]
(1+l)-es | l:10 | s:0.1 | e:100 | r:20 | final_p: [0.36254104]
(1,l)-es | l:10 | s:0.1 | e:100 | r:20 | final_p: [0.24588965]
