In [1]:
from src.core import *
from src import DSL
import inspect
import typing

import asyncio

from poke_env.player.random_player import RandomPlayer
from poke_env.player.utils import cross_evaluate
from poke_env.player_configuration import PlayerConfiguration
from poke_env.server_configuration import LocalhostServerConfiguration
from poke_env.utils import to_id_str
from tabulate import tabulate

import trueskill
import copy
from trueskill import Rating, rate_1vs1, quality_1vs1
from tqdm.notebook import tqdm
import itertools
import numpy as np
import pandas as pd
from pprint import pprint


import logging
# from aiologger.handlers.files import AsyncFileHandler
# from aiologger.handlers.files import AsyncFileHandler
from tempfile import NamedTemporaryFile

from pathlib import Path
import pickle
import dill
import gc
import math
import datetime
import sys, traceback

In [2]:
class PlayerWrapper(Player):
    """Battle client whose move selection is delegated to a swappable script.

    The wrapper is constructed once and reused: callers assign a script object
    to ``self.script`` before starting battles. The script must expose
    ``choose_move(battle)``; ``raw_script`` is printed for diagnostics when
    available.
    """

    def __init__(self, *args, **kwargs):
        """Create a player configured for local ``gen7randombattle`` games.

        NOTE(review): ``*args``/``**kwargs`` are accepted but not forwarded to
        the base class; every instance uses the fixed configuration below.
        """
        super().__init__(
            # First argument comes from get_node_id(); second is None
            # (presumably username / password -- confirm against poke-env).
            player_configuration=PlayerConfiguration(get_node_id(), None),
            battle_format="gen7randombattle",
            server_configuration=LocalhostServerConfiguration,
            # Effectively unlimited, so concurrency is never the bottleneck.
            max_concurrent_battles=100000000,
        )
        # Assigned externally before battles start; None until then.
        self.script = None

    def choose_move(self, battle):
        """Ask the attached script for a move, falling back to a random one.

        Any exception raised by the script (including the case where no script
        has been attached yet) is printed with its traceback, and a random move
        is returned so the battle can continue.
        """
        try:
            move = self.script.choose_move(battle)
            return self.create_order(move)
        except Exception as e:
            print(e)
            traceback.print_exc(file=sys.stdout)
            # self.script may be None (or lack raw_script); reading the
            # attribute directly here would raise inside the handler and the
            # random-move fallback below would never be reached.
            print(getattr(self.script, "raw_script", "<no script source available>"))
            return self.choose_random_move(battle)

In [3]:
# Maximum number of scripts evaluated together; also the size of the player pool.
chunk_size = 1024

# Pool of reusable players, created once and shared by every evaluation.
shared_players = [PlayerWrapper() for _slot in range(chunk_size)]

# Username -> player, used to map battle participants back to their wrappers.
player_lookup = dict((wrapper.username, wrapper) for wrapper in shared_players)

In [4]:
async def evaluate_population(population, num_games, verbose=True):
    """Rate every script in ``population`` from randomly paired battles.

    Each script's ``rating`` is reset to a fresh ``Rating()`` and then updated
    in place with ``rate_1vs1`` according to battle outcomes. The population is
    processed in chunks of at most ``chunk_size`` scripts, and scripts only
    face opponents from their own chunk.

    Args:
        population: sliceable sequence of script objects; each one is attached
            to a player from ``shared_players`` for the duration of its chunk.
        num_games: number of pairing rounds played per chunk. In every round
            the players are shuffled and paired off, so each script plays at
            most one battle per round (with an odd count one player sits out).
        verbose: when False the per-round progress bar is disabled.

    Returns:
        None. Results are written to ``script.rating``.
    """
    for script in population:
        script.rating = Rating()

    num_chunks = int(math.ceil(len(population) / chunk_size))
    chunks = [population[i * chunk_size : (i + 1) * chunk_size] for i in range(num_chunks)]

    for chunk_idx, chunk in enumerate(chunks):
        if num_chunks > 1:
            print(f"chunk {chunk_idx + 1} / {num_chunks}")

        # Use exactly one player per script of *this* chunk. Sizing the player
        # list once for the whole population let a shorter final chunk leave
        # players holding scripts from the previous chunk, which then fought
        # and were re-rated a second time.
        # The slice is a copy, so shuffling it does not reorder shared_players.
        players = shared_players[:len(chunk)]
        for player, script in zip(players, chunk):
            player.script = script

        for _ in tqdm(range(num_games), disable=not verbose):
            random.shuffle(players)
            awaitables = []
            # Pair neighbours after the shuffle: (0, 1), (2, 3), ...
            for p1, p2 in zip(players[0::2], players[1::2]):
                send = p1.send_challenges(
                    opponent=to_id_str(p2.username),
                    n_challenges=1,
                    to_wait=p2.logged_in,
                )
                accept = p2.accept_challenges(
                    opponent=to_id_str(p1.username),
                    n_challenges=1,
                )
                awaitables.append(send)
                awaitables.append(accept)
            await asyncio.gather(*awaitables)

        # Both participants hold their own record of the same battle, so
        # collecting every player's battles would apply each result twice.
        # Keep one record per battle key instead.
        # NOTE(review): assumes player.battles is keyed by a tag shared by both
        # participants; if keys differ, nothing is dropped and the behaviour
        # matches the previous implementation.
        unique_battles = {}
        for player in players:
            for battle_tag, battle in player.battles.items():
                unique_battles.setdefault(battle_tag, battle)

        for battle in unique_battles.values():
            player = player_lookup[battle.player_username]
            oppo = player_lookup[battle._opponent_username]

            if battle.won is None:
                # No winner recorded -- presumably a tie (confirm against the
                # poke-env version in use); score it as a draw rather than a
                # loss for whichever side's record was kept.
                player.script.rating, oppo.script.rating = rate_1vs1(
                    player.script.rating, oppo.script.rating, drawn=True
                )
                continue

            if battle.won:
                winner, loser = player, oppo
            else:
                winner, loser = oppo, player
            winner.script.rating, loser.script.rating = rate_1vs1(winner.script.rating, loser.script.rating)

        # Clear stored battles so the next chunk / call starts clean.
        for player in players:
            player.reset_battles()

    gc.collect()

In [5]:
def sort_population(population):
    """Return a new list of scripts ordered from highest to lowest ``rating.mu``.

    The input is left untouched; scripts with equal ``mu`` keep their original
    relative order.
    """
    def _skill(script):
        return script.rating.mu

    ranked = list(population)
    ranked.sort(key=_skill, reverse=True)
    return ranked

In [6]:
def get_elites(population, num_elites):
    """Return the ``num_elites`` best-rated scripts of ``population``, best first."""
    ranked = sort_population(population)
    return ranked[:num_elites]

In [7]:
def get_tournament_elites(population, tournament_size, num_elites):
    """Tournament selection: sample contenders, then keep the best of them.

    ``tournament_size`` scripts are drawn without replacement from
    ``population`` and the ``num_elites`` best-rated among them are returned.
    """
    contenders = random.sample(population, tournament_size)
    return get_elites(population=contenders, num_elites=num_elites)

In [8]:
def mutate(tree, filter_=None):
    """Return a mutated deep copy of ``tree``; the input tree is not modified.

    One node accepted by ``filter_`` is picked at random, its children are
    discarded, and ``generate_tree`` is called on it to rebuild the subtree.
    If no node is accepted, the unmodified copy is returned.

    Args:
        tree: root node of the tree to mutate.
        filter_: optional predicate handed to ``anytree.search.findall`` to
            choose mutation candidates. Any falsy value selects the default.

    NOTE(review): the default predicate tests the type of the node object
    itself, not ``node.name``; if findall only yields node objects this accepts
    every node -- confirm that is the intent.
    """
    def _not_a_str(candidate):
        return type(candidate) is not str

    node_filter = filter_ if filter_ else _not_a_str
    mutant = copy.deepcopy(tree)
    mutable_nodes = anytree.search.findall(mutant, filter_=node_filter)
    if not mutable_nodes:
        return mutant

    chosen = random.choice(mutable_nodes)
    chosen.children = []
    generate_tree(chosen)
    return mutant

In [9]:
def crossover(left, right):
    """Produce two offspring by swapping subtrees between copies of the parents.

    Both parents are deep-copied first, so the inputs are never modified. A
    node name present in both copies is chosen at random, one node carrying
    that name is picked in each copy, and the two nodes exchange their
    children. When the copies share no node name they are returned unchanged.

    Returns:
        Tuple ``(left_offspring, right_offspring)``.
    """
    def _may_head_subtree(node):
        # Every node is currently eligible (a stricter variant limited to
        # RULE.IF_BLOCK was sketched here previously).
        return True

    def _head_names(root):
        heads = anytree.search.findall(root, filter_=_may_head_subtree)
        return [head.name for head in heads]

    def _random_node_named(root, wanted):
        matches = anytree.search.findall(root, filter_=lambda node: node.name == wanted)
        return random.choice(matches)

    offspring_a = copy.deepcopy(left)
    offspring_b = copy.deepcopy(right)

    names_a = _head_names(offspring_a)
    names_b = _head_names(offspring_b)
    shared_names = list(set(names_a) & set(names_b))
    if not shared_names:
        return offspring_a, offspring_b

    swap_name = random.choice(shared_names)
    head_a = _random_node_named(offspring_a, swap_name)
    head_b = _random_node_named(offspring_b, swap_name)
    # Tuple assignment reads both child tuples before either is reassigned.
    head_a.children, head_b.children = head_b.children, head_a.children
    return offspring_a, offspring_b

In [10]:
def get_random_population(population_size):
    """Build ``population_size`` scripts, each from a freshly generated random tree."""
    scripts = []
    for _ in range(population_size):
        random_tree = get_random_tree()
        scripts.append(exec_tree(random_tree))
    return scripts

In [11]:
# def get_population_stats(population):
#     ratings = pd.Series([player.rating.mu for player in population])
#     return ratings.describe()

In [12]:
def save_population(population, fname):
    """Serialize the trees of ``population`` to ``temp_scripts/<fname>`` with dill.

    Only each member's ``tree`` attribute is stored; ``load_population``
    rebuilds the script objects from those trees.

    Args:
        population: iterable of objects exposing a ``tree`` attribute.
        fname: file name (relative to ``temp_scripts``) to write.
    """
    path = Path('temp_scripts').joinpath(fname)
    # The output directory does not exist on a fresh checkout; create it so the
    # first save does not fail with FileNotFoundError.
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, 'wb') as f:
        dill.dump([player.tree for player in population], f)
            
def load_population(fname):
    """Load trees from ``temp_scripts/<fname>`` and rebuild a script from each.

    The file is read with dill, which can execute arbitrary code while
    unpickling -- only load files produced by this notebook.
    """
    source = Path('temp_scripts') / fname
    with source.open('rb') as handle:
        stored_trees = dill.load(handle)
    return [exec_tree(stored_tree) for stored_tree in stored_trees]

In [13]:
def save_stats(stats, fname):
    """Serialize ``stats`` to ``temp_stats/<fname>`` with dill.

    Args:
        stats: any dill-serializable object.
        fname: file name (relative to ``temp_stats``) to write.
    """
    path = Path('temp_stats').joinpath(fname)
    # The output directory does not exist on a fresh checkout; create it so the
    # first save does not fail with FileNotFoundError.
    path.parent.mkdir(parents=True, exist_ok=True)
    with open(path, 'wb') as f:
        dill.dump(stats, f)
            
def load_stats(fname):
    """Return the object stored in ``temp_stats/<fname>``.

    The file is read with dill, which can execute arbitrary code while
    unpickling -- only load files produced by this notebook.
    """
    source = Path('temp_stats') / fname
    with source.open('rb') as handle:
        stats = dill.load(handle)
    return stats

# Evolve

In [14]:
def get_fname(num_elites, tournament_size, population_cap, epochs, num_games, index):
    """Build the ``.dill`` file name identifying one artifact of a run.

    The name encodes the run's hyper-parameters followed by ``index``, which
    callers set to either an epoch number or a label such as ``'stats'``.
    """
    fname = f"e_{num_elites}_t_{tournament_size}_k_{population_cap}_g_{epochs}_n_{num_games}_{index}.dill"
    return fname

In [15]:
async def run(
    num_elites,
    tournament_size,
    population_cap,
    epochs,
    num_games
):
    """Evolve a random population for ``epochs`` generations.

    Every epoch the current population is saved to disk (under the name given
    by ``get_fname`` with the epoch index), rated with ``evaluate_population``,
    and replaced by a new population made of the ``num_elites`` best scripts
    plus mutated crossover offspring of tournament winners.

    Args:
        num_elites: number of top-rated scripts copied unchanged into the next
            generation.
        tournament_size: number of scripts sampled per tournament when picking
            the two parents of a crossover.
        population_cap: population size of every generation.
        epochs: number of generations to evaluate.
        num_games: pairing rounds used to rate each generation.

    Returns:
        A list with one entry per epoch, each a list of deep-copied, rated
        scripts of that generation. Previously these snapshots were built and
        then discarded; callers that ignore the return value are unaffected.
    """
    population = get_random_population(population_size=population_cap)
    generations = []
    for i in tqdm(range(epochs)):
        fname = get_fname(num_elites, tournament_size, population_cap, epochs, num_games, i)
        save_population(population, fname)
        await evaluate_population(population, num_games=num_games, verbose=False)
        # Snapshot via deepcopy: elites are carried over as the same objects
        # and evaluate_population resets their rating on the next epoch.
        generations.append([copy.deepcopy(script) for script in population])

        next_population = list(get_elites(population, num_elites))
        while len(next_population) < population_cap:
            left, right = get_tournament_elites(population, tournament_size, 2)
            children = crossover(left.tree, right.tree)
            for child_tree in children:
                child_tree = mutate(child_tree)
                next_population.append(exec_tree(child_tree))
        # Offspring arrive in pairs, so trim any overshoot past the cap.
        population = next_population[:population_cap]
        gc.collect()
    return generations

In [21]:
# Settings shared by every run below.
epochs = 30
tournament_size = 5

# One tuple per run, unpacked by the driver loops as
# (num_elites, population_cap, num_games).
parameter_sets = [
    (4, 32, 5),
    (8, 32, 5),
]

In [22]:
for num_elites, population_cap, num_games in parameter_sets:
    await run(
        num_elites,
        tournament_size,
        population_cap,
        epochs,
        num_games
    )

HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=30.0), HTML(value='')))




# Evaluate

In [None]:
for num_elites, population_cap, num_games in parameter_sets:
    generations = []
    for i in range(epochs):
        fname = get_fname(num_elites, tournament_size, population_cap, epochs, num_games, i)
        generations.append(load_population(fname))
    all_scripts = sum(generations, [])
    print(len(all_scripts))
    
    await evaluate_population(all_scripts, num_games=6, verbose=True)

    script_to_gen = {}
    for i, gen in enumerate(generations):
        for script in gen:
            script_to_gen[script] = i

    gen_ratings = [[] for _ in range(len(generations))]
    for script in all_scripts:
        gen_idx = script_to_gen[script]
        gen_ratings[gen_idx].append(script.rating.mu)
    gen_ratings = np.asarray(gen_ratings)
    
    best_scripts = sort_population(all_scripts)[:10]

    xs = np.arange(len(gen_ratings))
    ys_high = np.percentile(gen_ratings, 75, axis=-1)
    ys_median = np.median(gen_ratings, axis=-1)
    ys_low = np.percentile(gen_ratings, 25, axis=-1)
    stats = (xs, ys_high, ys_median, ys_low)
    
    fname = get_fname(num_elites, tournament_size, population_cap, epochs, num_games, 'stats')
    
    save_stats(stats, fname)
    save_population(best_scripts, fname)

960


HBox(children=(FloatProgress(value=0.0, max=6.0), HTML(value='')))

In [20]:
from bokeh.plotting import figure, output_file, show, reset_output, output_notebook

# Render Bokeh figures inline in the notebook.
reset_output()
output_notebook()

for num_elites, population_cap, num_games in parameter_sets:
    fname = get_fname(num_elites, tournament_size, population_cap, epochs, num_games, 'stats')
    print(fname)

    xs, ys_high, ys_median, ys_low = load_stats(fname)
    # Title and axis labels make each figure identifiable on its own; without
    # them the plots for different parameter sets look interchangeable.
    p = figure(
        plot_width=800,
        plot_height=400,
        y_range=(15, 40),
        title=fname,
        x_axis_label="generation",
        y_axis_label="TrueSkill mu",
    )
    # Faint lines: 75th / 25th percentile; solid line: median.
    p.line(xs, ys_high, line_width=2, alpha=0.3)
    p.line(xs, ys_median, line_width=2)
    p.line(xs, ys_low, line_width=2, alpha=0.3)

    show(p)

    # Print the source of the three best-rated scripts saved for this run.
    best_scripts = load_population(fname)
    for script in best_scripts[:3]:
        print(script.raw_script)

e_2_t_5_k_32_g_30_n_5_stats.dill



class Script_4dddcfdac3ac41658761b5ff322ee562(Script):
    def choose_move(self, battle: Battle):
    
        available_moves = battle.available_moves
        if not available_moves:
            return random.choice(battle.available_switches)
            
        dsl = DSL(battle)
        move_scores = []
        for move in battle.available_moves:
            score = 0
            if ((dsl.player_has_status_effect(Status.TOX) or dsl.move_is_status(move))):
                score += -8
            if (dsl.move_is_physical(move)):
                if (((dsl.move_accuracy(move) >= 50) or (dsl.move_accuracy(move) >= 90))):
                    score += 3
                if (dsl.check_move_sds_if_hits_opp(move)):
                    score += 6
                if ((dsl.check_move_rainy(move) or dsl.opp_has_status_effect(Status.SLP))):
                    score += -1
            if ((dsl.gets_stab(move) and (dsl.move_base_power(move) <= 233))):
                score += 4
            if ((dsl.


class Script_3be2859ec419469e814da3ab69bdb68f(Script):
    def choose_move(self, battle: Battle):
    
        available_moves = battle.available_moves
        if not available_moves:
            return random.choice(battle.available_switches)
            
        dsl = DSL(battle)
        move_scores = []
        for move in battle.available_moves:
            score = 0
            if (((dsl.opponent_battle_stat_modifier('def') <= 1) or (dsl.player_base_stat('def') >= 53))):
                score += 4
            if ((dsl.gets_stab(move) or dsl.check_move_rainy(move))):
                score += 3
            move_scores.append(score)
            
        best_move = available_moves[move_scores.index(max(move_scores))]
        return best_move


class Script_9a6eadeeaa1544e182727c593fbf95a3(Script):
    def choose_move(self, battle: Battle):
    
        available_moves = battle.available_moves
        if not available_moves:
            return random.choice(battle.available_switches)