In [1]:
from time import time
import numpy as np
import logging
from pprint import pprint, pformat
import random
from collections import namedtuple
from copy import deepcopy
from multiprocessing import Pool
from matplotlib import pyplot as plt
from tqdm.autonotebook import tqdm

Nimply = namedtuple("Nimply", "row, num_objects")

  from tqdm.autonotebook import tqdm


In [2]:
class Nim:
    def __init__(self, num_rows: int, k: int = None) -> None:
        self._rows = [i * 2 + 1 for i in range(num_rows)]
        self._k = k

    def __bool__(self):
        return sum(self._rows) > 0

    def __str__(self):
        return "<" + " ".join(str(_) for _ in self._rows) + ">"

    @property
    def rows(self) -> tuple:
        return tuple(self._rows)

    def nimming(self, ply: Nimply) -> None:
        row, num_objects = ply
        assert self._rows[row] >= num_objects
        assert self._k is None or num_objects <= self._k
        self._rows[row] -= num_objects

In [3]:
def pure_random(state: Nim) -> Nimply:
    """A completely random move"""
    row = random.choice([r for r, c in enumerate(state.rows) if c > 0])
    num_objects = random.randint(1, state.rows[row])
    return Nimply(row, num_objects)


def gabriele(state: Nim) -> Nimply:
    """Pick always the maximum possible number of the lowest row"""
    possible_moves = [(r, o) for r, c in enumerate(state.rows) for o in range(1, c + 1)]
    return Nimply(*max(possible_moves, key=lambda m: (-m[0], m[1])))

In [4]:
def nim_sum(state: Nim) -> int:
    tmp = np.array([tuple(int(x) for x in f"{c:032b}") for c in state.rows])

    xor = tmp.sum(axis=0) % 2
    return int("".join(str(_) for _ in xor), base=2)


def analize(raw: Nim) -> dict:
    candidate = dict()
    for ply in (Nimply(r, o) for r, c in enumerate(raw.rows) for o in range(1, c + 1)):
        tmp = deepcopy(raw)
        tmp.nimming(ply)
        candidate[ply] = nim_sum(tmp)
    return candidate


def optimal(state: Nim) -> Nimply:
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    spicy_moves = [ply for ply, ns in analysis.items() if ns != 0]
    if not spicy_moves:
        spicy_moves = list(analysis.keys())
    ply = random.choice(spicy_moves)
    return ply


def new_optimal(state: Nim) -> Nimply:
    analysis = analize(state)
    logging.debug(f"analysis:\n{pformat(analysis)}")
    spicy_moves = [ply for ply, ns in analysis.items() if ns == 1]
    if not spicy_moves:
        spicy_moves = list(analysis.keys())
    ply = random.choice(spicy_moves)
    return ply

In [5]:
def nim_game(num_rows: int, strategy1, strategy2) -> bool:
    """
    Simulate a game against the optimal strategie and returns True if your strategy 1 won
    """
    strategies = [strategy1, strategy2]
    nim = Nim(num_rows)
    player = 0
    while nim:
        move = strategies[player](nim)
        nim.nimming(move)
        player = 1 - player

    return player == 0


def simulate_games(num_games: int, num_rows: int, strategy1, strategy2) -> float:
    wins = 0
    for _ in range(num_games):
        if nim_game(num_rows, strategy1, strategy2):
            wins += 1
        if not nim_game(num_rows, strategy2, strategy1):
            wins += 1

    return wins / (2 * num_games)


def simulate_games_all(num_games: int, num_rows: int, strategy1) -> float:
    wins = 0
    for _ in range(num_games):
        for j in range(3):
            if nim_game(num_rows, strategy1, pure_random):
                wins += 1
            if not nim_game(num_rows, pure_random, strategy1):
                wins += 1
        if nim_game(num_rows, strategy1, optimal):
            wins += 1
        if not nim_game(num_rows, optimal, strategy1):
            wins += 1
        if nim_game(num_rows, strategy1, new_optimal):
            wins += 1
        if not nim_game(num_rows, new_optimal, strategy1):
            wins += 1

    return wins / (10 * num_games)

In [6]:
def policy(args):
    def strategy(nim: Nim):
        n_possible_rows = 0
        # n_possible_rows = sum([1 for i in nim._rows if i > 0]) - 1
        analysis = analize(nim)
        population = list()
        weights = list()
        for ply, ns in analysis.items():
            population.append(ply)
            weights.append(args[ns + n_possible_rows])

        # ply = random.choices(population=population, weights=weights, k=1)[0]
        return population[np.argmax(weights)]

    return strategy


def fitness1(n_rows: int, n_games: int, strategy2):
    def fitness_func(args) -> float:
        strategy1 = policy(args)
        return simulate_games(n_games, n_rows, strategy1, strategy2)

    return fitness_func


def fitness2(n_rows: int, n_games: int):
    def fitness_func(args) -> float:
        strategy1 = policy(args)
        return simulate_games_all(n_games, n_rows, strategy1)

    return fitness_func

In [7]:
def ES_1_plus_lambda(_function, N_DIM, n_offspring=1_000):
    λ = 25
    σ = 0.1

    solution = np.random.normal(loc=0, scale=σ, size=N_DIM)
    history = list()
    best_so_far = np.copy(solution)
    best_eval = _function(solution)

    stats = [0, 0]
    for step in tqdm(range(n_offspring // λ)):
        samples = np.random.normal(loc=0, scale=σ, size=(λ, N_DIM)) + solution
        samples_evals = np.array([_function(samples[i]) for i in range(λ)])

        stats[0] += λ
        stats[1] += sum(samples_evals > best_eval)

        best_offspring = np.argmax(samples_evals)
        solution = samples[best_offspring]

        if best_eval < samples_evals[best_offspring]:
            best_eval = samples_evals[best_offspring]
            best_so_far = np.copy(solution) / best_eval
            history.append((step, samples_evals[best_offspring]))

        if (step + 1) % 10 == 0:
            if stats[1] == 0 or stats[0] / stats[1] > 1 / 5:
                σ *= 1.1
            elif stats[0] / stats[1] < 1 / 5:
                σ *= 0.9
            stats = [0, 0]
    print(σ)
    return best_so_far, history

In [8]:
def ES_mu_plus_lambda(_function, N_DIM, n_offspring=1_000):
    μ = 10
    λ = 25
    σ = 0.01

    population = np.random.normal(loc=0, scale=σ, size=(µ, N_DIM + 1))

    population[:, -1] = σ

    best_fitness = None
    history = list()
    for step in tqdm(range(n_offspring // λ)):
        # offspring <- select λ random points from the population of μ
        offspring = population[np.random.randint(0, μ, size=(λ,))]
        # mutate all σ (last column) and replace negative values with a small number
        offspring[:, -1] = np.random.normal(loc=offspring[:, -1], scale=0.01)
        offspring[offspring[:, -1] < 1e-5, -1] = 1e-5
        # mutate all v (all columns but the last), using the σ in the last column
        offspring[:, 0:-1] = np.random.normal(
            loc=offspring[:, 0:-1], scale=offspring[:, -1].reshape(-1, 1)
        )

        fitness = np.array([_function(offspring[i, 0:-1]) for i in range(λ)])
        offspring = offspring[(-fitness).argsort()]
        # save best (just for the plot)
        if best_fitness is None or best_fitness < np.max(fitness):
            best_fitness = np.max(fitness)
            history.append((step, best_fitness))
        # select the μ with max fitness and discard fitness
        population = np.copy(offspring[:µ])
        population[:, 0:-1] = population[:, 0:-1] * (1 / best_fitness)

    return population[0, 0:-1], history

In [9]:
N_ROWS = 5
N_GAMES = 10
ARG_DIM = 2 ** ((2 * N_ROWS - 1).bit_length())
best_solution1, history1 = ES_1_plus_lambda(
    fitness2(N_ROWS, N_GAMES), ARG_DIM, n_offspring=200
)

best_solution2, history2 = ES_mu_plus_lambda(
    fitness2(N_ROWS, N_GAMES), ARG_DIM, n_offspring=200
)

100%|██████████| 8/8 [00:45<00:00,  5.72s/it]


0.1


100%|██████████| 8/8 [00:46<00:00,  5.78s/it]


In [14]:
print(best_solution1)
print(best_solution2)

[-0.18136772  0.14256158 -0.46158634 -0.72226175 -0.49728055 -0.22249477
 -0.13322381 -0.01904861  0.01518551  0.06614646  0.23081439 -0.40927297
 -0.18541313  0.09186227  0.06707827 -0.14643235]
[ 0.01625084  0.10242458 -0.16076846 -0.06659419 -0.10794734  0.01639619
  0.04379532 -0.05681257 -0.23602905  0.03750791  0.02522296 -0.0009659
 -0.04148736  0.00532015  0.11084885  0.08715261]


In [16]:
abdel = [
    0.18580199,
    1.17770388,
    0.60390788,
    0.77325736,
    0.01668597,
    0.67694702,
    0.60994287,
    0.09265408,
    0.10901224,
    0.29563519,
    0.99970367,
    0.67504219,
    0.73287389,
    0.09459263,
    0.61431108,
    0.23061273,
]
pprint(np.argsort(best_solution1) % ARG_DIM)
pprint(np.argsort(best_solution2) % ARG_DIM)
pprint(np.argsort(abdel) % ARG_DIM)
print(
    fitness1(
        N_ROWS,
        n_games=50,
        strategy2=policy(best_solution2),
    )(best_solution1)
    * 100,
    "% of wins",
)

print(
    fitness1(
        N_ROWS,
        n_games=50,
        strategy2=policy(abdel),
    )(best_solution1)
    * 100,
    "% of wins",
)

print(
    fitness1(
        N_ROWS,
        n_games=50,
        strategy2=policy(abdel),
    )(best_solution2)
    * 100,
    "% of wins",
)

array([ 3,  4,  2, 11,  5, 12,  0, 15,  6,  7,  8,  9, 14, 13,  1, 10])
array([ 8,  2,  4,  3,  7, 12, 11, 13,  0,  5, 10,  9,  6, 15,  1, 14])
array([ 4,  7, 13,  8,  0, 15,  9,  2,  6, 14, 11,  5, 12,  3, 10,  1])
50.0 % of wins
100.0 % of wins
100.0 % of wins


In [12]:
pprint(history1)
pprint(history2)

[(0, 0.92), (1, 0.99), (3, 1.0)]
[(0, 0.99)]


In [13]:
history = np.array(history)
plt.figure(figsize=(14, 4))
plt.plot(history[:, 0], history[:, 1], marker=".")
plt.show()

NameError: name 'history' is not defined