Copyright **`(c)`** 2022 Giovanni Squillero `<squillero@polito.it>`  
[`https://github.com/squillero/computational-intelligence`](https://github.com/squillero/computational-intelligence)  
Free for personal or classroom use; see [`LICENSE.md`](https://github.com/squillero/computational-intelligence/blob/master/LICENSE.md) for details.  


# Lab 3: ES

## Authors
- Andrea Panuccio s294603
- Giovanni Violo s317617

## Summary
We created 5 strategies, but in order to figure out the best mix, we call them with a meta-strategy.

This meta-strategy first checks whether the board is in a terminal (endgame) case. If so, it simply chooses the best action using explicit rules.

Otherwise, it calls one of a set of given sub-strategies.

The probability of calling each sub-strategy is tuned at training time, in order to find the best mix of probabilities while taking the composition of the whole set into account.

In [75]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
from numpy import random as rnd
import scipy
from queue import PriorityQueue


## The *Nim* and *Nimply* classes

In [76]:
# A single move: remove `num_objects` objects from the heap at index `row`.
Nimply = namedtuple("Nimply", "row, num_objects")


In [77]:
class Nim:
    """Board of a Nim game: a list of heaps that moves remove objects from.

    Row ``i`` starts with ``2 * i + 1`` objects. When ``k`` is given, a single
    move may remove at most ``k`` objects.
    """

    def __init__(self, num_rows: int, k: int = None) -> None:
        # Odd heap sizes 1, 3, 5, ... — one per row.
        self._rows = list(range(1, 2 * num_rows, 2))
        self._k = k

    def __bool__(self):
        """True while the heap sizes add up to more than zero."""
        return 0 < sum(self._rows)

    def __str__(self):
        return f"<{' '.join(map(str, self._rows))}>"

    @property
    def rows(self) -> tuple:
        """Immutable snapshot of the current heap sizes."""
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        """Apply ``ply`` in place: subtract its object count from its row."""
        target_row, taken = ply
        # The heap must hold enough objects, and the move must respect ``k``.
        assert self._rows[target_row] >= taken
        assert self._k is None or taken <= self._k
        self._rows[target_row] -= taken


## Sample (and silly) strategies

In [78]:
def pure_random(state: Nim) -> Nimply:
    """Pick a random non-empty row, then a random number of objects from it."""
    non_empty = []
    for index, size in enumerate(state.rows):
        if size > 0:
            non_empty.append(index)
    chosen_row = random.choice(non_empty)
    amount = random.randint(1, state.rows[chosen_row])
    return Nimply(row=chosen_row, num_objects=amount)


In [79]:
def gabriele(state: Nim) -> Nimply:
    """Always take the largest possible amount from the lowest-indexed row."""
    candidates = []
    for row, size in enumerate(state.rows):
        candidates.extend((row, taken) for taken in range(1, size + 1))
    # Rank by smallest row index first, then by largest number of objects.
    best_row, best_take = max(candidates, key=lambda move: (-move[0], move[1]))
    return Nimply(best_row, best_take)


# Sub-Strategies
- N_stg: completely removes a random heap
- NO_stg: removes all but one of the elements of a random heap
- NM_stg: removes m elements from a heap with n elements, where m is the size of another heap and m ≤ n
- NMO_stg: removes m − 1 elements from a heap with n elements, where m is the size of another heap and 1 < m ≤ n
- O_stg: removes just one element from a random heap

In [81]:
#OUR STRATEGIES

def N_stg(state: Nim) -> Nimply:
    """Remove every object of one randomly chosen non-empty heap."""
    whole_heap_moves = []
    for row, size in enumerate(state.rows):
        if size != 0:
            whole_heap_moves.append(Nimply(row, size))
    return random.choice(whole_heap_moves)

def NO_stg(state: Nim) -> Nimply:
    """Leave exactly one object in a randomly chosen heap of size > 1.

    Falls back to ``pure_random`` when no heap holds more than one object.
    """
    candidates = [Nimply(row, size - 1) for row, size in enumerate(state.rows) if size > 1]
    # No heap with more than one object: nothing can be shrunk down to one.
    return random.choice(candidates) if candidates else pure_random(state)

def NM_stg(state: Nim) -> Nimply:
    """Remove from one heap as many objects as another heap holds.

    Two non-empty heaps are drawn independently. The smaller size is removed
    from the larger heap (on a tie, the first heap drawn is emptied). If the
    same heap is drawn twice, the move is delegated to ``pure_random``.
    """
    heaps = [Nimply(row, size) for row, size in enumerate(state.rows) if size != 0]
    first = random.choice(heaps)
    second = random.choice(heaps)

    # The same heap was drawn twice by chance.
    if first == second:
        return pure_random(state)

    if second.num_objects > first.num_objects:
        return Nimply(second.row, first.num_objects)
    return Nimply(first.row, second.num_objects)

def NMO_stg(state: Nim) -> Nimply:
    """Remove from one heap one object fewer than another heap holds.

    Only heaps with more than one object are considered. Two of them are drawn
    independently and ``smaller size - 1`` objects are removed from the larger
    one (on a tie, from the first heap drawn). Falls back to ``pure_random``
    when no such heap exists or when the same heap is drawn twice.
    """
    big_heaps = [Nimply(row, size) for row, size in enumerate(state.rows) if size > 1]

    # No heap with more than one object was found.
    if not big_heaps:
        return pure_random(state)

    first = random.choice(big_heaps)
    second = random.choice(big_heaps)

    # The same heap was drawn twice by chance.
    if first == second:
        return pure_random(state)

    if second.num_objects > first.num_objects:
        return Nimply(second.row, first.num_objects - 1)
    return Nimply(first.row, second.num_objects - 1)

def O_stg(state: Nim) -> Nimply:
    """Take a single object from a randomly chosen non-empty heap."""
    non_empty_rows = [row for row, size in enumerate(state.rows) if size != 0]
    return Nimply(random.choice(non_empty_rows), 1)

def metaStrategy(state: Nim) -> Nimply:
    """Choose a move with explicit endgame rules, else a weighted sub-strategy.

    The board is scanned once to collect a few counters. If they match one of
    the hard-coded endgame patterns below, the corresponding move is returned.
    Otherwise one sub-strategy is drawn from the module-level ``fun_list``
    (a list of ``(weight, strategy)`` pairs) with probability proportional to
    its weight, and that sub-strategy picks the move.

    NOTE(review): branches 1-2 hand the opponent an even number of
    single-object heaps (so this player would take the last object), whereas
    branch 3 hands over a single object (so the opponent takes the last one).
    These aim at opposite win conventions — confirm which one the match loop
    is meant to score.
    """
    global fun_list

    ones = 0          # number of heaps holding exactly one object
    nones = 0         # number of heaps holding more than one object
    nones_idx = 0     # index of the last heap holding more than one object
    first = 0         # size of the first non-empty heap (0 until one is seen)
    target_idx = -1   # first non-empty heap, or the last heap whose size differs from `first`
    nozeros = 0       # number of non-empty heaps
    count = 0         # number of non-empty heaps whose size differs from `first`

    for r, h in enumerate(state.rows): # gather the counters used to recognise the endgame patterns handled symbolically below
        if h == 1: ones += 1
        if h > 1: nones += 1
        if h > 1: nones_idx = r

        # Order matters: the "differs from first" test must run before `first` is set for this heap.
        if first != 0 and h != 0 and first != h: count, target_idx = (count + 1, r)
        if first == 0 and h != 0: target_idx, first = (r, h)
        if h != 0: nozeros += 1

    if ones % 2 != 0 and nones == 1:
        # Branch 1: odd number of 1-heaps plus one bigger heap -> shrink the bigger heap to 1.
        ply = Nimply(nones_idx, state.rows[nones_idx]-1)
    elif ones % 2 == 0 and ones > 0 and nones == 1:
        # Branch 2: even (non-zero) number of 1-heaps plus one bigger heap -> remove the bigger heap.
        ply = Nimply(nones_idx, state.rows[nones_idx])
    elif ones % 2 == 0 and ones == 0 and nones == 1:
        # Branch 3: a single heap bigger than 1 and no 1-heaps -> leave one object in it.
        ply = Nimply(nones_idx, state.rows[nones_idx]-1)
    elif nozeros % 2 != 0 and count == 0 and ones == 0:
        # Branch 4: odd number of non-empty heaps, all of the same size -> remove the first one.
        ply = Nimply(target_idx, state.rows[target_idx])
    elif nozeros % 2 != 0 and count == 1:
        # Branch 5: odd number of non-empty heaps, exactly one differing from the first -> remove that one.
        ply = Nimply(target_idx, state.rows[target_idx])
    else:
        # No pattern matched: draw one sub-strategy by weight and let it choose the move.
        ply = random.choices( [ st for _, st in fun_list], weights = [w for w, _ in fun_list])[0](state)
    return ply

In [107]:
import numpy as np


def nim_sum(state: "Nim") -> int:
    """Return the nim-sum (bitwise XOR) of all heap sizes in ``state``.

    The XOR is computed directly on the integers, so heaps of any size are
    supported and a board without rows yields 0.

    Args:
        state: any object exposing the heap sizes as an iterable ``rows``.

    Returns:
        The XOR of every heap size, as a plain ``int``.
    """
    total = 0
    for heap in state.rows:
        total ^= heap
    return int(total)


def analize(raw: Nim) -> dict:
    """Evaluate every move of 1..size objects from each row of ``raw``.

    Returns a dict with the single key ``"possible_moves"``, mapping each
    ``Nimply`` to the nim-sum of the board obtained after playing it.
    """
    outcomes = {}
    for row, size in enumerate(raw.rows):
        for taken in range(1, size + 1):
            move = Nimply(row, taken)
            # Play the move on a copy so that ``raw`` is left untouched.
            board = deepcopy(raw)
            board.nimming(move)
            outcomes[move] = nim_sum(board)
    return {"possible_moves": outcomes}


def optimal(state: Nim) -> Nimply:
    """Pick a random move among those leaving a non-zero nim-sum.

    When no such move exists, any possible move is picked at random.

    NOTE(review): the textbook Nim strategy moves to a position whose
    nim-sum is 0; this function keeps the moves whose nim-sum is != 0 —
    confirm that this is intended.
    """
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    moves = analysis["possible_moves"]
    preferred = [move for move, resulting_sum in moves.items() if resulting_sum != 0]
    candidates = preferred or list(moves)
    return random.choice(candidates)


## Code Testing Match

In [104]:
# Play one logged match between `optimal` (player 0) and `metaStrategy` (player 1).
logging.getLogger().setLevel(logging.INFO)

# Global read by metaStrategy: (weight, sub-strategy) pairs, uniform here.
fun_list = [(0.2, N_stg), (0.2, NO_stg),(0.2, NM_stg), (0.2, NMO_stg),(0.2, O_stg)]
strategy = (optimal, metaStrategy)

nim = Nim(5)
logging.info(f"init : {nim}")
player = 0
while nim:
    # The player to move picks a ply, which is then applied to the board.
    ply = strategy[player](nim)
    logging.info(f"ply: player {player} plays {ply}")
    nim.nimming(ply)
    logging.info(f"status: {nim}")
    player = 1 - player  # hand the turn to the other player
# NOTE(review): `player` was flipped after the final ply, so the player named
# here is the one who did NOT take the last object — confirm this matches the
# intended win rule.
logging.info(f"status: Player {player} won!")


INFO:root:init : <1 3 5 7 9>
INFO:root:ply: player 0 plays Nimply(row=2, num_objects=3)
INFO:root:status: <1 3 2 7 9>
INFO:root:ply: player 1 plays Nimply(row=2, num_objects=2)
INFO:root:status: <1 3 0 7 9>
INFO:root:ply: player 0 plays Nimply(row=4, num_objects=7)
INFO:root:status: <1 3 0 7 2>
INFO:root:ply: player 1 plays Nimply(row=1, num_objects=2)
INFO:root:status: <1 1 0 7 2>
INFO:root:ply: player 0 plays Nimply(row=3, num_objects=7)
INFO:root:status: <1 1 0 0 2>
INFO:root:ply: player 1 plays Nimply(row=4, num_objects=2)
INFO:root:status: <1 1 0 0 0>
INFO:root:ply: player 0 plays Nimply(row=1, num_objects=1)
INFO:root:status: <1 0 0 0 0>
INFO:root:ply: player 1 plays Nimply(row=0, num_objects=1)
INFO:root:status: <0 0 0 0 0>
INFO:root:status: Player 0 won!


strategia x (1, 3, 2, 7, 9) Nimply(row=2, num_objects=2)
strategia x (1, 3, 0, 7, 2) Nimply(row=1, num_objects=2)
tutti 1 (pari) tranne 1 (1, 1, 0, 0, 2) Nimply(row=4, num_objects=2)
strategia x (1, 0, 0, 0, 0) Nimply(row=0, num_objects=1)


## Training strategy
We implemented an ES algorithm with the plus strategy, so the parents are always kept alongside the offspring they generate.

We noticed that, due to the randomness of the opponent's plays, the average score has a high variance, so we chose a sample size of 500 matches to be representative enough of the population, i.e. the real unknown score.

We chose mu=5 so the number of survivor/parents is 5.
The number of offspring is lambda=20 so we have a high selective pressure.

We ran only 10 generations because of the limited time available: the more, the better. So we have generations=10.

We also use a linearly decreasing mutation factor, called tweak_factor, which goes from an initial value of 1 down to a final value of 0.4.
The mutation standard deviation is therefore sigma = tweak_factor / number_of_strategies.

We also tried applying a second round of mutations to each new offspring and then keeping the best variant of each, but this was not computationally efficient due to the curse of dimensionality, so we decided to remove it.

## Results and comments
We found that the N_stg sub-strategy was one of the most frequently selected, but we also got relatively high scores from a combination of N_stg and NMO_stg, and from a combination of NO_stg and NMO_stg.

NM_stg and O_stg were always discarded.

In [123]:
# SETUP
# Uniform starting weights, one (weight, sub-strategy) pair per sub-strategy.
fun_list_orig = [
    (0.2, N_stg),
    (0.2, NO_stg),
    (0.2, NM_stg),
    (0.2, NMO_stg),
    (0.2, O_stg),
]

# Working copy read by metaStrategy, and the two players of every match.
fun_list = list(fun_list_orig)
strategy = (optimal, metaStrategy)

# Best genotype found so far and its win rate.
best_list = list(fun_list)
best_win_perc = 0

In [124]:
# TUNING & TRAINING
# Evolves the sub-strategy weights read by metaStrategy: every generation
# builds NUM_RUNS genotypes, scores each one over NUM_MATCHES matches played
# through `strategy`, and keeps the ELITES_MAX_NUM best-scoring genotypes seen
# so far in a bounded priority queue.
NUM_MATCHES = 500       # fewer than 500 matches are not representative enough, mainly because of the high randomness of the opponent's plays
NUM_LOCAL_RUNS = 1      # leftover of an abandoned experiment (mutating each genotype NUM_LOCAL_RUNS more times and keeping the local best); not used by the loop below
NUM_RUNS = 20           # the number of offspring per generation
NUM_GENS = 10           # the number of generations

ELITES_MAX_NUM = 5      # the number of survivors of each generation; crossing them over was tried but was not computationally efficient
elites_fun_list = []

tweak_factor_orig = 1
tweak_factor = tweak_factor_orig
tweak_factor_final = 0.4
# Linear decay of the mutation factor between the two thresholds. Mutation
# happens in generations 1 .. NUM_GENS - 1, so the step is spread over
# NUM_GENS - 1 decrements and the last generation mutates with exactly
# tweak_factor_final. (Dividing by NUM_LOCAL_RUNS made the factor negative
# after two generations, which is an invalid scale for rnd.normal.)
twaek_factor_loss = (tweak_factor_orig - tweak_factor_final) / max(NUM_GENS - 1, 1)
population_fun_queue = PriorityQueue(ELITES_MAX_NUM)

for gen in range(NUM_GENS):  # for each generation
    print("gen #", gen)
    print("best:", best_win_perc)

    population_fun_list = []
    if population_fun_queue.qsize() > 0:  # not the first generation: mutate the survivors
        eli_fun_list = list(population_fun_queue.queue)
        print(eli_fun_list)
        while len(population_fun_list) < NUM_RUNS:
            # Draw the parent for each offspring from the actual survivors, so
            # every survivor can reproduce and a partially filled queue is safe.
            eli = random.choice(eli_fun_list)[1]
            offspring = eli.copy()

            # Gaussian perturbation of one randomly chosen weight.
            tweak = rnd.normal(loc=0, scale=tweak_factor / len(fun_list_orig))
            index = random.choice(range(len(fun_list_orig)))

            old_weights = [w for w, _ in offspring]
            old_weights[index] += tweak

            # NOTE(review): softmax is applied to weights that are already
            # probabilities, which pulls every genotype toward the uniform
            # distribution at each mutation — consider mutating logits instead.
            new_weights = scipy.special.softmax(old_weights, axis=None)
            offspring = [(new_weights[idx], st) for idx, (_, st) in enumerate(offspring)]
            population_fun_list.append(offspring)
    else:  # first generation: start from random genotypes
        while len(population_fun_list) < NUM_RUNS:
            rand_weights = scipy.special.softmax([random.choice(range(10)) for _ in range(len(fun_list_orig))], axis=None)
            population_fun_list.append([(rand_weights[idx], st) for idx, (_, st) in enumerate(fun_list_orig)])

    # Decrease the tweak factor, never dropping below the final threshold.
    tweak_factor = max(tweak_factor_final, tweak_factor - twaek_factor_loss)

    # The loop variable deliberately rebinds the global `fun_list`, because
    # that is where metaStrategy reads its weights from.
    for fun_list in population_fun_list:
        num_win = 0
        for _ in range(NUM_MATCHES):
            nim = Nim(5)
            player = 0
            while nim:
                ply = strategy[player](nim)
                nim.nimming(ply)
                player = 1 - player

            if player:  # counted as a win for strategy[1]
                num_win += 1

        perc_run = num_win / NUM_MATCHES  # win rate of this genotype
        if perc_run > best_win_perc:  # remember the best genotype seen so far
            best_list = fun_list.copy()
            best_win_perc = perc_run

        if population_fun_queue.full():
            # Queue is full: pop the weakest survivor and keep whichever of
            # the two (weakest survivor, new genotype) scored higher.
            old_eli_val, old_eli = population_fun_queue.get_nowait()
            if perc_run > old_eli_val:
                population_fun_queue.put((perc_run, fun_list.copy()))
            else:
                population_fun_queue.put((old_eli_val, old_eli))
        else:
            population_fun_queue.put((perc_run, fun_list.copy()))




gen # 0
best: 0
gen # 1
best: 0.27
[(0.146, [(0.00023355741468343382, <function N_stg at 0x7d135c8cedd0>), (0.012751802768239404, <function NO_stg at 0x7d135c8cf6d0>), (0.25612680533867926, <function NM_stg at 0x7d135c87feb0>), (0.6962248407333991, <function NMO_stg at 0x7d135c87c430>), (0.03466299374499893, <function O_stg at 0x7d135c87c0d0>)]), (0.152, [(0.00015689390672160133, <function N_stg at 0x7d135c8cedd0>), (0.06329551949505735, <function NO_stg at 0x7d135c8cf6d0>), (0.0011592978783463049, <function NM_stg at 0x7d135c87feb0>), (0.4676941443599374, <function NMO_stg at 0x7d135c87c430>), (0.4676941443599374, <function O_stg at 0x7d135c87c0d0>)]), (0.176, [(0.0870931799085484, <function N_stg at 0x7d135c8cedd0>), (0.03203979035460067, <function NO_stg at 0x7d135c8cf6d0>), (0.2367438083281215, <function NM_stg at 0x7d135c87feb0>), (0.0005868292302056143, <function NMO_stg at 0x7d135c87c430>), (0.6435363921785239, <function O_stg at 0x7d135c87c0d0>)]), (0.182, [(0.10108902372958828

KeyboardInterrupt: ignored

In [125]:
best_list # what I got from the training

[(0.8803455729499796,
  <function __main__.N_stg(state: __main__.Nim) -> __main__.Nimply>),
 (0.00010864327468633266,
  <function __main__.NO_stg(state: __main__.Nim) -> __main__.Nimply>),
 (0.11914181746128359,
  <function __main__.NM_stg(state: __main__.Nim) -> __main__.Nimply>),
 (0.00010864327468633266,
  <function __main__.NMO_stg(state: __main__.Nim) -> __main__.Nimply>),
 (0.00029532303936414265,
  <function __main__.O_stg(state: __main__.Nim) -> __main__.Nimply>)]

In [126]:
best_win_perc # the best winrate registered

0.27

# Test

Here we test the obtained solution. As can be seen, the average score is highly variable, due to the randomness of the opponent's plays.

In [134]:
# TESTING: the win rate will never match the training one exactly, because of the inherent variability of the plays
fun_list = best_list  # metaStrategy reads its weights from this global
strategy = (optimal, metaStrategy)
print(fun_list)

num_win = 0
for _ in range(NUM_MATCHES):
    nim = Nim(5)
    player = 0
    while nim:
        ply = strategy[player](nim)
        nim.nimming(ply)
        player = 1 - player

    # `player` is 1 here exactly when strategy[0] made the final move;
    # that outcome is what gets counted as a win.
    num_win += player

# win rate over all the matches
perc_run = num_win / NUM_MATCHES
print(perc_run)

[(0.8803455729499796, <function N_stg at 0x7d135c8cedd0>), (0.00010864327468633266, <function NO_stg at 0x7d135c8cf6d0>), (0.11914181746128359, <function NM_stg at 0x7d135c87feb0>), (0.00010864327468633266, <function NMO_stg at 0x7d135c87c430>), (0.00029532303936414265, <function O_stg at 0x7d135c87c0d0>)]
0.29
