In [1]:
import logging
from pprint import pprint, pformat
from collections import namedtuple
import random
from copy import deepcopy
import heapq

logging.basicConfig(level=logging.DEBUG)

logging.debug("hello")

DEBUG:root:hello


## The *Nim* and *Nimply* classes

In [2]:
Nimply = namedtuple("Nimply", "row, num_objects")

In [3]:
class Nim:
    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        return tuple(self._rows)

    @property
    def cooked(self) -> dict:
        '''to generate the possible moves for each game'''
        cooky = dict()
        cooky["possible_moves"] = [Nimply(index, obj) for index, item in enumerate(self._rows) for obj in range(1, item + 1)]
        #[Nimply(row=0, num_objects=1), Nimply(row=1, num_objects=1), Nimply(row=1, num_objects=2), ...
        return cooky

    
    def nimming(self, ply: Nimply) -> None:
        row, num_objects = ply
        assert self._rows[row] >= num_objects, "Number of matches to remove more than what available!"
        assert self._k is None or num_objects <= self._k, f"max number(k) you can remove is {self._k}"
        self._rows[row] -= num_objects

## Sample (and silly) startegies 

In [4]:
def pure_random(state: Nim) -> Nimply:
    """A completely random move"""
    #selects a random (non empty) row
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0]) 
    #a random number from 1 to the # objects in the above row
    num_objects = random.randint(1, state.rows[row])
    # returns the move tuple
    return Nimply(row, num_objects)


def gabriele(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row"""
    #generating all possible moves
    possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1)]
    print(possible_moves)
    # select the move with highest objects from lowest row 
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))

def adaptive(state: Nim) -> Nimply:
    """A strategy that can adapt its parameters"""
    genome = {"love_small": 0.5}

import numpy as np


def nim_sum(state: Nim) -> int:
    tmp = np.array([tuple(int(x) for x in f"{c:032b}") for c in state.rows])
    xor = tmp.sum(axis=0) % 2
    return int("".join(str(_) for _ in xor), base=2)


def analize(raw: Nim) -> dict:
    cooked = dict()
    # The key is each possible move, value is nimsum (xor) of game board after playing this move.
    cooked["possible_moves"] = dict()
    # for all plays
    for ply in (Nimply(r, o) for r, c in enumerate(raw.rows) for o in range(1, c + 1)):
        tmp = deepcopy(raw)
        tmp.nimming(ply)
        cooked["possible_moves"][ply] = nim_sum(tmp)
    return cooked


def optimal(state: Nim) -> Nimply:
    '''a random move among those with non-zero nim-sum - for normal play (the one that last taker wins) optimal play is to finish every move with nimsum of zero'''
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    #take the moves with nimsum not zero
    spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0]
    # if there are no optimal, take the random one
    if not spicy_moves:
        spicy_moves = list(analysis["possible_moves"].keys())
    ply = random.choice(spicy_moves)
    return ply

### The Evolutionary Strategy Class

In [5]:
class Evolution:
    def __init__(self,state: Nim) -> None:

        # mutation step size
        self._sigma = 0.15
        self._gamesize = 5
        self._k = 200
        self._game = deepcopy(state)
        self._lambda = 50
        # create a random inditial individual
        self._individual = self.initialize_individual()
        # set of tupels (individual,fitness)
        self.pool = list()
        self.pool.append((-self.fitness(self._individual),self._individual))


    def initialize_individual(self):
        '''CHECKED creates a dictionary that values are possible moves and values are random values with the length of possible moves '''
        random_values = [random.random() for _ in range(len(self._game.cooked["possible_moves"]))]
        individual = {key: value for key, value in zip(self._game.cooked["possible_moves"], random_values)}
        individual = self.renormalize(individual)
        ####temporary check
        self.check_individual(individual)
        return individual

    def update_individual(self, game, indiv):
        '''update the individual dict after the game board and possible moves have changed'''
        keys_to_remove = [key for key in indiv.keys() if key not in game.cooked["possible_moves"]]
        for key in keys_to_remove:
            del indiv[key]    
        upd_indiv = self.renormalize(indiv)
        return upd_indiv




    def check_individual(self,indiv):
        '''CHECKED check whether _individual is a probablility distribution'''
        tolerance = 0.00001
        #print(f"{indiv} and the sum is {sum(indiv.values())}")
        assert abs(sum(indiv.values()) - 1.0) <= tolerance or sum(indiv.values()) == 0, f"the sum of probabilities of this individual is not 1 - it is not a distribution with tolerance of {tolerance}"
        

    def renormalize(self,indiv):
        '''CHECKED normalizing the indiv dict'''
        total_sum = sum(indiv.values())
        scaling_factor = 1.0 / total_sum if total_sum != 0 else 0
        for key in indiv:
            indiv[key] *= scaling_factor 
        return indiv

    def tweak(self,indiv):
        '''CHECKED'''
        #random select a move among possible moves
        keylist = list(indiv.keys())
        key = np.random.choice(len(keylist),1)
        chosen_key = keylist[key[0]]
        #add/minus a guassian random with sigma variance
        indiv[chosen_key] += np.random.normal(0, self._sigma)
        #make sure changed prob. is not negative
        indiv[chosen_key] = max(indiv[chosen_key], 0)
        #renormalize
        indiv = self.renormalize(indiv)
        self.check_individual(indiv)
        return indiv

    def play(self,game,indiv):
        '''CHECKED select and return a move based on prob. dist. of the individual passed'''
        game_ply = Nimply(0,0)
        # loop until a valid move is made
        while not game_ply.num_objects:
            keylist = list(indiv.keys())
            #print(list(indiv.values()))
            #select a key based on prob. dist. in values
            if sum(indiv.values()) == 0 :
                # logging.info(f"the sum of prob.s are zero.and the length of it {len(indiv)}. playing the first available move")
                return list(indiv.keys())[0] 
            key = np.random.choice(len(keylist),1,p=list(indiv.values()))
            chosen_key = keylist[key[0]]
            #print(chosen_key)
            game_ply = chosen_key
        #print(f"the move by PLAY is: {game_ply}")   
        return game_ply


    def pure_random(self, state: Nim,individual):
        """A completely random move"""
        #selects a random (non empty) row
        row = random.choice([r for r, c in enumerate(state.rows) if c > 0]) 
        #a random number from 1 to the # objects in the above row
        num_objects = random.randint(1, state.rows[row])
        # returns the move tuple
        return Nimply(row, num_objects)

    def optimal(self, state: Nim,individual) -> Nimply:
        '''a random move among those with non-zero nim-sum - for normal play (the one that last taker wins) optimal play is to finish every move with nimsum of zero'''
        analysis = analize(state)
        #logging.debug(f"analysis:\n{pformat(analysis)}")
        #take the moves with nimsum not zero
        spicy_moves = [ply for ply, ns in analysis["possible_moves"].items() if ns != 0]
        # if there are no optimal, take the random one
        if not spicy_moves:
            spicy_moves = list(analysis["possible_moves"].keys())
        ply = random.choice(spicy_moves)
        return ply




    def fitness(self,indiv):
        '''the fitness of an individual is the number of wins in 100 plays against a benchmark strategy'''
        number_of_plays = 100
       
        benchmark_strategy = self.optimal
        opponents = {'0':0,'1':0}
        # Player '0' is the ES agent
        strategy = (self.play,benchmark_strategy)
        wins = 0
        total_moves = 0

        for i in range(number_of_plays):
            local_game = Nim(self._gamesize,self._k)
            local_indiv = deepcopy(indiv)
            moves = 0
            players = [0 if p < (number_of_plays/2) else 1 for p in range(number_of_plays)]
            player = players[i]

            while local_game:
                ply = strategy[player](local_game,local_indiv)
                local_game.nimming(ply)
                #update individual
                local_indiv = self.update_individual(local_game,local_indiv)
                #counting the number of moves
                if player == 0:
                    moves += 1
                
                player = 1 - player
            
            opponents[str(player)]+=1
            if player == 0:
                total_moves += moves
        #print(opponents)
        wins = opponents['0']
        #the average moves the agent made in the games it won
        avg_moves = total_moves / wins
        #print(avg_moves)
        # factor to calibrate fitness 
        x = 0.6
        #return  x * wins + (1-x) * (number_of_plays/avg_moves)
        return wins



    def evolve_one_plus_lambda(self,pool):
        '''to implement the 1 + lambda strategy
        correct the navie logic later!'''
        heapq.heapify(pool)
        keep = []
        
        for i in range(10):

            indiv = pool[0]
            print(indiv)

            # if len(keep) >2: 
            #     keep.sort(key=lambda x:-x[0])
            #     print("inside keep")
            #     #print(keep[-2])
            #     if keep[-2][0] == keep[-1][0]:
            #         rst_indiv = self.initialize_individual()
            #         pool = []
            #         pool.append((-self.fitness(rst_indiv),rst_indiv))



            for i in range(self._lambda):
                new_indiv = self.tweak(indiv[1])
                self.pool.append((-self.fitness(new_indiv),new_indiv))
            top = heapq.nlargest(1, pool, key=lambda x: -x[0])
            pool = []
            pool.append(top[0])
            keep.append(top[0])
    
        print(pool[0][0])

        

logging.basicConfig(level=logging.ERROR)

test = Nim(5,200)
evol = Evolution(test)

evol.evolve_one_plus_lambda(evol.pool)

(-32, {Nimply(row=0, num_objects=1): 0.044737618910709445, Nimply(row=1, num_objects=1): 0.00048792779552303604, Nimply(row=1, num_objects=2): 0.04192172231148807, Nimply(row=1, num_objects=3): 0.06101477448298825, Nimply(row=2, num_objects=1): 0.010661690123805773, Nimply(row=2, num_objects=2): 0.01642462337311521, Nimply(row=2, num_objects=3): 0.059845035464944796, Nimply(row=2, num_objects=4): 0.07141993769793424, Nimply(row=2, num_objects=5): 0.03161070118474643, Nimply(row=3, num_objects=1): 0.077721750041826, Nimply(row=3, num_objects=2): 0.06527973673209811, Nimply(row=3, num_objects=3): 0.005112608983085732, Nimply(row=3, num_objects=4): 0.05551064716485987, Nimply(row=3, num_objects=5): 0.05193349518367576, Nimply(row=3, num_objects=6): 0.07486103887704267, Nimply(row=3, num_objects=7): 0.04158460970459838, Nimply(row=4, num_objects=1): 0.0004428391005783098, Nimply(row=4, num_objects=2): 0.024813272195872916, Nimply(row=4, num_objects=3): 0.07604781741612768, Nimply(row=4, nu

KeyboardInterrupt: 