In [None]:
from evoframe.reward_builders import RewardBuilderGame
from evoframe.population_update_builders import PopulationUpdateBuilderStatic
from evoframe.selector_function import SelectorFunctionFactory
from evoframe import PopulationManager
from evoframe.models import FeedForwardNetwork
from evoframe.models import ActivationFunctions
from evoframe.games import Game
from evoframe import get_agent_wrapper_func
import evoframe.func_with_context as fwc

import numpy as np

import matplotlib.pyplot as plt
%matplotlib inline

In [None]:
import plotly.express as px

In [None]:
import collections

def recursively_default_dict():
    """Create an arbitrarily deep auto-vivifying dictionary.

    Every missing key yields another dictionary built by this same function,
    so ``d["a"]["b"]["c"] = 1`` works without creating the intermediate
    levels first.
    """
    nested = collections.defaultdict(recursively_default_dict)
    return nested

In [None]:
def get_distinct_operators(context):
    """Return every operator name recorded in ``context`` exactly once.

    Epochs ``1 .. context["num_epochs"]`` (inclusive) are scanned and the
    entries of ``context["epochs"][epoch]["operators"]`` are collected.
    The result is a list built from a set, so its order is unspecified.
    """
    distinct = set()
    for epoch in range(1, context["num_epochs"] + 1):
        distinct.update(context["epochs"][epoch]["operators"])
    return list(distinct)

In [None]:
import pandas as pd

In [None]:
def plot_rewards(context):
    """Build the reward figures for a finished run.

    Parameters
    ----------
    context : mapping
        Run context. Reads ``context["num_epochs"]``, ``context["pop_size"]``
        and, for every epoch ``1 .. num_epochs``,
        ``context["epochs"][epoch]["rewards"]`` and
        ``context["epochs"][epoch]["operators"]``. Each epoch must hold
        exactly ``pop_size`` rewards and operators, otherwise the scatter
        frame cannot be built (its columns would differ in length).

    Returns
    -------
    tuple
        ``(fig_line, fig_scatter)``: a line chart of the max and mean reward
        per epoch, and a scatter of every individual's reward coloured by
        the operator recorded for it.
    """
    num_epochs = context["num_epochs"]
    pop_size = context["pop_size"]
    epochs = list(range(1, num_epochs + 1))
    rewards_by_epoch = [context["epochs"][epoch]["rewards"] for epoch in epochs]

    # Max-Mean
    ys_max = [max(rewards) for rewards in rewards_by_epoch]
    ys_mean = [sum(rewards) / pop_size for rewards in rewards_by_epoch]
    ys_category = ["max"] * len(epochs) + ["mean"] * len(epochs)
    df_line = pd.DataFrame({"epochs": epochs * 2, "rewards": ys_max + ys_mean, "category": ys_category})
    fig_line = px.line(df_line, x="epochs", y="rewards", color="category")

    # Scatter
    # One epoch label per individual, epoch-major (1,1,..,2,2,..), so that it
    # lines up with the flattened rewards/operators below. The previous
    # ``epochs*pop_size`` produced 1,2,..,n,1,2,.. and mislabelled the rows.
    epoch_of_individual = [epoch for epoch in epochs for _ in range(pop_size)]
    # Small horizontal jitter in [-0.2, 0.2) so points of one epoch do not overlap
    xs = [epoch + ((np.random.rand() - 0.5) * 0.4) for epoch in epoch_of_individual]
    ys = [r for rewards in rewards_by_epoch for r in rewards]
    operators = [op for epoch in epochs for op in context["epochs"][epoch]["operators"]]
    df_scatter = pd.DataFrame({
        "epochs": epoch_of_individual,
        "epochs_noise": xs,
        "rewards": ys,
        "operators": operators,
    })
    fig_scatter = px.scatter(df_scatter, x="epochs_noise", y="rewards", color="operators", marginal_y="rug")
    return fig_line, fig_scatter

In [None]:
def get_best_model_of_epoch(context, epoch):
    """Return the model whose reward is highest in the given epoch.

    The position of the maximum of ``context["epochs"][epoch]["rewards"]``
    is used to index ``context["epochs"][epoch]["models"]``; on ties the
    first maximum wins.
    """
    epoch_data = context["epochs"][epoch]
    best_index = np.argmax(np.array(epoch_data["rewards"]))
    return epoch_data["models"][best_index]

# Guess the point

In [None]:
from evoframe.games import GuessPoint

In [None]:
# Run context shared by every component of this experiment
context = recursively_default_dict()

# Game: a GuessPoint instance is built on demand from two fixed 3-element arrays
game_creation_func = fwc.func_with_context(
    lambda context: GuessPoint(np.array([0.2,0.8,0.5]), np.array([0.4, 0.5, 10])),
    context=context,
)

# Model: feed-forward network with layer sizes 3 -> 5 -> 3
layer_sizes = [3, 5, 3]
get_model_func = fwc.func_with_context(
    lambda context: FeedForwardNetwork(
        layer_sizes,
        ActivationFunctions.get_sigmoid(),
        ActivationFunctions.get_id(),
    ),
    context=context,
)

# Game-Model interface: the game's inputs are handed to the model unchanged
predict_func = lambda model, inputs: model.predict(inputs)
agent_wrapper_func = get_agent_wrapper_func(predict_func)

# Reward function
reward_function = (
    RewardBuilderGame()
    .with_game_creation_function(game_creation_func)
    .with_agent_wrapper_func(agent_wrapper_func)
    .with_context(context)
    .get()
)

# Population update function (operator arguments are interpreted by evoframe)
get_new_pop_f = (
    PopulationUpdateBuilderStatic()
    .add_operator("es_1_mutation", 1, 0.3, 0.3)
    .add_operator("es_2_crossover", 0.01, 0.8)
    .add_operator("es_1_copy", 0.01)
    .add_operator("es_n_rewards_gradient", 0.01)
    .add_selector_f(SelectorFunctionFactory.get_geometric_selector_function(0.3))
    .with_context(context)
    .get()
)

# Population manager
pop_size = 500
pm = PopulationManager(pop_size, get_model_func, reward_function, get_new_pop_f, context)

# Run the evolution
num_epochs = 10
pm.run(num_epochs)

In [None]:
# Build the line (max/mean) and scatter (per-individual) reward figures from the run context.
fig_line, fig_scatter = plot_rewards(context)

In [None]:
# Display the max/mean reward-per-epoch line chart.
fig_line

In [None]:
# Display the per-individual reward scatter, coloured by operator.
fig_scatter

# Guess the function

In [None]:
from evoframe.games import GuessFunction

In [None]:
# Run context shared by every component of this experiment
context = recursively_default_dict()

# Game: the target function maps a 3-element input to a 2-element output
game_func = lambda i: np.array([2*i[0]-3*i[1]+4, i[1]-8*i[2]-5])
input_dim = 3
input_domains = [(-1,1),(-1,1),(3,7)]
sample_every = [0.1, 0.1, 0.3]
game_creation_function = fwc.func_with_context(
    lambda context: GuessFunction(game_func, input_dim, input_domains, sample_every),
    context=context,
)

# Model: feed-forward network with layer sizes 3 -> 5 -> 2
layer_sizes = [3, 5, 2]
get_model_func = fwc.func_with_context(
    lambda context: FeedForwardNetwork(
        layer_sizes,
        ActivationFunctions.get_arctan(),
        ActivationFunctions.get_id(),
    ),
    context=context,
)

# Game-Model interface: the game's inputs are handed to the model unchanged
predict_func = lambda model, inputs: model.predict(inputs)
agent_wrapper_func = get_agent_wrapper_func(predict_func)

# Reward function
reward_function = (
    RewardBuilderGame()
    .with_game_creation_function(game_creation_function)
    .with_agent_wrapper_func(agent_wrapper_func)
    .with_context(context)
    .get()
)

# Population update function: five mutation variants plus crossover, copy
# and rewards-gradient operators (arguments are interpreted by evoframe)
get_new_pop_f = (
    PopulationUpdateBuilderStatic()
    .add_operator("es_1_mutation", 0.2, 0.1, 0.1)
    .add_operator("es_1_mutation", 0.2, 0.1, 0.5)
    .add_operator("es_1_mutation", 0.2, 0.3, 0.3)
    .add_operator("es_1_mutation", 0.2, 0.5, 0.5)
    .add_operator("es_1_mutation", 0.2, 0.5, 0.1)
    .add_operator("es_2_crossover", 0.1, 0.8)
    .add_operator("es_1_copy", 0.1)
    .add_operator("es_n_rewards_gradient", 0.1)
    .add_selector_f(SelectorFunctionFactory.get_geometric_selector_function(0.3))
    .with_context(context)
    .get()
)

# Population manager
pop_size = 50
pm = PopulationManager(pop_size, get_model_func, reward_function, get_new_pop_f, context)

# Run the evolution
num_epochs = 30
pm.run(num_epochs)

In [None]:
# Build the line (max/mean) and scatter (per-individual) reward figures from the run context.
fig_line, fig_scatter = plot_rewards(context)

In [None]:
# Display the max/mean reward-per-epoch line chart.
fig_line

In [None]:
# Display the per-individual reward scatter, coloured by operator.
fig_scatter

In [None]:
# Spot check: compare the prediction of the first stored model of the last
# epoch with the target function on one sample input.
# NOTE(review): index 0 is presumably the best individual — confirm that the
# stored models are sorted by reward (get_best_model_of_epoch uses argmax instead).
test_array = np.array([0.5,0.5,5])
context["epochs"][num_epochs]["models"][0].predict(test_array), game_func(test_array)

In [None]:
from __future__ import print_function
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets

In [None]:
def show_best_fnn_weights(context, epoch):
    """Draw the weights and biases of an epoch's first stored model as heat maps.

    The figure has two rows: the top row shows one image per entry of the
    model's ``weights``, the bottom row one image per entry of ``biases``
    (each reshaped to a single column).
    """
    # NOTE(review): index 0 is presumably the best individual of the epoch —
    # confirm that the stored models are sorted by reward.
    model = context["epochs"][epoch]["models"][0]
    n_cols = len(model.weights)
    n_rows = 2
    heatmap_style = dict(cmap='hot', interpolation='nearest')

    plt.figure(figsize=(15,10))
    # Top row: subplot positions 1 .. n_cols
    for position, weight_matrix in enumerate(model.weights, start=1):
        plt.subplot(n_rows, n_cols, position)
        plt.imshow(weight_matrix, **heatmap_style)
    # Bottom row: subplot positions n_cols+1 .. 2*n_cols
    for position, bias_vector in enumerate(model.biases, start=n_cols + 1):
        plt.subplot(n_rows, n_cols, position)
        bias_column = bias_vector.reshape(bias_vector.shape[0], 1)
        plt.imshow(bias_column, **heatmap_style)
    plt.show()
    
# Browse the epochs with a slider. A bare integer (``epoch=1``) makes
# ipywidgets build a slider over [-1, 3]: that offers epochs 0 and -1, which
# were never run, and hides every epoch after the third. The valid range
# 1..num_epochs is therefore given explicitly.
interact(
    show_best_fnn_weights,
    context=fixed(context),
    epoch=widgets.IntSlider(value=1, min=1, max=num_epochs, step=1),
)

# Play Tris, 9 inputs

In [None]:
from evoframe.reward_builders.reward_builder_game import TournamentMode

In [None]:
class Tris(Game):
    """Tic-tac-toe ("Tris") on a 3x3 board.

    Player1 starts. Rewards of both players are returned by ``play``.
    Cells are addressed by a flat index 0..8 (row = index // 3,
    column = index % 3).
    """
    # Cell markers; the player markers double as "this player won" results.
    PLAYER_1 = 1
    PLAYER_2 = -1
    EMPTY = 0
    # Remaining game-state codes returned by check_win.
    DRAW = 0
    CONTINUE = 2
    PLAYERS = [PLAYER_1, PLAYER_2]

    def __init__(self):
        """Start with an empty 3x3 board."""
        self.board = np.full((3, 3), self.EMPTY)

    def check_win(self):
        """Return the current game state.

        Returns the marker of the player owning a complete row, column or
        diagonal; otherwise ``DRAW`` when no cell is free, otherwise
        ``CONTINUE``.
        """
        board = self.board
        # Same scan order as before: rows, then columns, then both diagonals.
        lines = list(board) + list(board.T)
        lines.append(board.diagonal())
        lines.append(np.fliplr(board).diagonal())
        for line in lines:
            for player in self.PLAYERS:
                if np.all(line == player):
                    return player

        if not np.any(board == self.EMPTY):
            return self.DRAW

        return self.CONTINUE

    def extract_move(self, prediction):
        """Pick the free cell with the highest predicted score.

        ``prediction`` is iterated in cell order; occupied cells are ignored
        and, on ties, the first cell wins. Returns the flat cell index, or -1
        only when no cell covered by ``prediction`` is free.

        The first free cell is always accepted as the initial candidate, so
        scores that never compare greater than a sentinel (NaN, very negative
        values) can no longer produce -1, which ``do_move`` would have written
        into the bottom-right cell.
        """
        best_index = -1
        best_value = None
        for i, pred in enumerate(prediction):
            if self.board[i // 3][i % 3] != self.EMPTY:
                continue
            if best_index == -1 or pred > best_value:
                best_value = pred
                best_index = i
        return best_index

    def do_move(self, move, player):
        """Write ``player``'s marker into the cell with flat index ``move``."""
        self.board[move // 3][move % 3] = player

    def opposite_board(self):
        """Return a copy of the board with the two players' markers swapped.

        Used so that the second agent sees the position as if it were
        player 1.
        """
        board = self.board
        return np.where(
            board == self.PLAYER_1,
            self.PLAYER_2,
            np.where(board == self.PLAYER_2, self.PLAYER_1, self.EMPTY),
        )

    def play(self, agent_1, agent_2, interactive=False):
        """Play the game to the end, alternating moves starting with ``agent_1``.

        Each agent must expose ``predict(board)`` returning one score per
        cell; ``agent_2`` receives the board with the markers swapped.
        With ``interactive=True`` the board is printed before the first move
        and after every move.

        Returns ``(result, opponent_result)``: the outcome seen by player 1
        and by player 2 (winner marker mirrored, ``DRAW`` for both on a draw).
        """
        player_turn = self.PLAYER_1

        if interactive:
            self.print_board()

        result = self.check_win()
        while result == self.CONTINUE:
            if player_turn == self.PLAYER_1:
                prediction = agent_1.predict(self.board)
            else:
                prediction = agent_2.predict(self.opposite_board())

            move = self.extract_move(prediction)
            self.do_move(move, player_turn)

            # The board is shown after every move, so the final position is
            # already on screen when the loop ends (it used to be printed twice).
            if interactive:
                self.print_board()

            player_turn = self.PLAYER_2 if player_turn == self.PLAYER_1 else self.PLAYER_1
            result = self.check_win()

        if result == self.PLAYER_1:
            opponent_result = self.PLAYER_2
        elif result == self.PLAYER_2:
            opponent_result = self.PLAYER_1
        else:
            opponent_result = self.DRAW
        return result, opponent_result

    def print_board(self):
        """Print the board row by row, followed by a separator line."""
        for row in self.board:
            for cell in row:
                print(cell, end=" ")
            print("")
        print("-"*30)

In [None]:
# Run context shared by every component of this experiment
context = recursively_default_dict()

# Game: every call builds a new Tris instance
game_creation_function = fwc.func_with_context(lambda context: Tris(), context=context)

# Model: feed-forward network with layer sizes 9 -> 5 -> 5 -> 9
layer_sizes = [9, 5, 5, 9]
get_model_func = fwc.func_with_context(
    lambda context: FeedForwardNetwork(
        layer_sizes,
        ActivationFunctions.get_sigmoid(),
        ActivationFunctions.get_sigmoid(),
    ),
    context=context,
)

# Game-Model interface
def predict_func(model, inputs):
    """Adapt a Tris board to the model's input and return its cell scores.

    ``inputs`` is the 3x3 board; it is flattened to 9 values before being
    passed to ``model.predict``. The first element of the model's result is
    returned; Tris then plays the free cell with the highest score.
    """
    # NOTE(review): the [0] presumably unwraps a batch of one — confirm
    # against the shape FeedForwardNetwork.predict returns.
    flat_board = inputs.flatten()
    scores = model.predict(flat_board)
    return scores[0]
agent_wrapper_func = get_agent_wrapper_func(predict_func)

# Reward function
# Tris is a two-player game, so rewards come from a competitive tournament.
# NOTE(review): VS_BEST_OF_EACH_GEN presumably pits individuals against the
# best of each generation, with_keep_only(20) limiting how many are kept —
# confirm against evoframe.
reward_function = (
    RewardBuilderGame()
    .with_game_creation_function(game_creation_function)
    .with_agent_wrapper_func(agent_wrapper_func)
    .with_competitive_tournament(TournamentMode.VS_BEST_OF_EACH_GEN)
    .with_keep_only(20)
    .with_context(context)
    .get()
)

# Population update function: five mutation variants plus crossover, copy
# and rewards-gradient operators (arguments are interpreted by evoframe)
get_new_pop_f = (
    PopulationUpdateBuilderStatic()
    .add_operator("es_1_mutation", 0.2, 0.1, 0.1)
    .add_operator("es_1_mutation", 0.2, 0.1, 0.5)
    .add_operator("es_1_mutation", 0.2, 0.3, 0.3)
    .add_operator("es_1_mutation", 0.2, 0.5, 0.5)
    .add_operator("es_1_mutation", 0.2, 0.5, 0.1)
    .add_operator("es_2_crossover", 0.1, 0.8)
    .add_operator("es_1_copy", 0.1)
    .add_operator("es_n_rewards_gradient", 0.1)
    .add_selector_f(SelectorFunctionFactory.get_geometric_selector_function(0.3))
    .with_context(context)
    .get()
)

# Population manager
pop_size = 50
pm = PopulationManager(pop_size, get_model_func, reward_function, get_new_pop_f, context)

# Run the evolution
num_epochs = 300
pm.run(num_epochs)

In [None]:
# Build the line (max/mean) and scatter (per-individual) reward figures from the run context.
fig_line, fig_scatter = plot_rewards(context)

In [None]:
# Display the max/mean reward-per-epoch line chart.
fig_line

In [None]:
# Display the per-individual reward scatter, coloured by operator.
fig_scatter

In [None]:
class AgentHuman:
    """Agent whose moves are typed in by a person."""

    def __init__(self):
        pass

    def predict(self, board):
        """Ask for a cell index and return it as a one-hot list of 9 scores.

        The prompt is repeated until an integer in 0..8 is entered, so a typo
        no longer aborts the game with ``ValueError`` and an out-of-range
        number no longer yields an all-zero prediction.

        ``board`` is accepted for interface compatibility and is not
        inspected, so an occupied cell is not rejected here.
        """
        while True:
            answer = input("Select move: ")
            try:
                move = int(answer)
            except ValueError:
                print("Please enter an integer between 0 and 8.")
                continue
            if 0 <= move < 9:
                break
            print("Please enter an integer between 0 and 8.")
        prediction = [1 if i == move else 0 for i in range(9)]
        return prediction

In [None]:
# Interactive game: the human is player 1 (moves first), the highest-reward
# model of the last epoch is player 2.
game = Tris()
game.play(AgentHuman(), agent_wrapper_func(get_best_model_of_epoch(context, num_epochs)), interactive=True)

# Play Tris, 18 inputs

In [None]:
# This cell follows the same context-based call pattern as the other
# experiments in this notebook. It previously used a different one (no
# context, a two-value builder result, a different PopulationManager argument
# order), which cannot run against the same evoframe version as those cells.
# NOTE(review): confirm against the installed evoframe version.

# Run context shared by every component of this experiment
context = recursively_default_dict()

# Game
game_creation_function = lambda context: Tris()
game_creation_function = fwc.func_with_context(game_creation_function, context=context)

# Model: feed-forward network with layer sizes 18 -> 9 -> 9
layer_sizes = [18, 9, 9]
get_model_func = lambda context: FeedForwardNetwork(layer_sizes, ActivationFunctions.get_sigmoid(), ActivationFunctions.get_sigmoid())
get_model_func = fwc.func_with_context(get_model_func, context=context)

# Game-Model interface
def predict_func(model, inputs):
    """Encode a Tris board as 18 values and return the model's cell scores.

    ``inputs`` is the 3x3 board. Each cell becomes two values: ``[1, 0]`` for
    player 1, ``[0, 1]`` for player 2 and ``[0, 0]`` for anything else. The
    first element of the model's result is returned; Tris then plays the free
    cell with the highest score.
    """
    encoded = []
    for row in inputs:
        for cell in row:
            if cell == Tris.PLAYER_1:
                encoded.extend([1, 0])
            elif cell == Tris.PLAYER_2:
                encoded.extend([0, 1])
            else:
                encoded.extend([0, 0])
    return model.predict(np.array(encoded))[0]
agent_wrapper_func = get_agent_wrapper_func(predict_func)

# Reward function
# Tris is a two-player game, so rewards come from a competitive tournament
reward_function = (
    RewardBuilderGame()
    .with_game_creation_function(game_creation_function)
    .with_agent_wrapper_func(agent_wrapper_func)
    .with_competitive_tournament(TournamentMode.VS_BEST_OF_EACH_GEN)
    .with_keep_only(30)
    .with_context(context)
    .get()
)

# Population update function
get_new_pop_f = (
    PopulationUpdateBuilderStatic()
    .add_operator("es_1_mutation", 0.6, 0.3, 0.5)
    .add_operator("es_2_crossover", 0.3, 0.1)
    .add_operator("es_1_copy", 0.1)
    .add_selector_f(SelectorFunctionFactory.get_geometric_selector_function(0.3))
    .with_context(context)
    .get()
)

# Population manager
pop_size = 100
pm = PopulationManager(pop_size, get_model_func, reward_function, get_new_pop_f, context)

# Run population manager
num_epochs = 300
pm.run(num_epochs)

# Names the following cells still refer to: the run state and the models of
# the last epoch.
env = context
last_pop = context["epochs"][num_epochs]["models"]

In [None]:
# Plot the rewards of the 18-input run; the returned (line, scatter) figure
# pair is the cell output.
# NOTE(review): plot_rewards reads "num_epochs", "pop_size" and "epochs" from
# its argument — confirm that `env` from the training cell has that layout.
plot_rewards(env)

In [None]:
class AgentHuman:
    """Agent whose moves are typed in by a person."""

    def __init__(self):
        pass

    def predict(self, board):
        """Ask for a cell index and return it as a one-hot list of 9 scores.

        The prompt is repeated until an integer in 0..8 is entered, so a typo
        no longer aborts the game with ``ValueError`` and an out-of-range
        number no longer yields an all-zero prediction.

        ``board`` is accepted for interface compatibility and is not
        inspected, so an occupied cell is not rejected here.
        """
        while True:
            answer = input("Select move: ")
            try:
                move = int(answer)
            except ValueError:
                print("Please enter an integer between 0 and 8.")
                continue
            if 0 <= move < 9:
                break
            print("Please enter an integer between 0 and 8.")
        prediction = [1 if i == move else 0 for i in range(9)]
        return prediction

In [None]:
# Interactive game: the human is player 1 (moves first), the first entry of
# `last_pop` is player 2.
# NOTE(review): assumes last_pop[0] is the strongest individual — confirm
# that the population is sorted by reward.
game = Tris()
game.play(AgentHuman(), agent_wrapper_func(last_pop[0]), interactive=True)