In [1]:
# USE NUMPY 1.21
#!pip list

In [2]:
#!pip uninstall --yes tensorflow
#!pip uninstall --yes keras
#!pip uninstall --yes keras-rl
#!pip uninstall --yes keras-rl2
#!pip install tensorflow==2.11.0
#!pip install keras-rl2==1.0.4
#!pip install --upgrade pip

In [40]:
from poke_env.environment.abstract_battle import AbstractBattle
from poke_env.player import (
    background_evaluate_player,
    background_cross_evaluate,
    Gen8EnvSinglePlayer,
    RandomPlayer,
    MaxBasePowerPlayer,
    ObservationType,
)  
from poke_env.player.baselines import SimpleHeuristicsPlayer
from poke_env import PlayerConfiguration
from poke_env import ServerConfiguration
from poke_env.player_configuration import PlayerConfiguration
from poke_env.player.env_player import Gen8EnvSinglePlayer
from poke_env.player.random_player import RandomPlayer
from poke_env.player.player import Player
from poke_env.server_configuration import LocalhostServerConfiguration

# Placeholder connection settings; nothing in the cells shown here reads them.
# presumably (server URL, authentication URL) — confirm against poke_env.ServerConfiguration
myServerConfig = ServerConfiguration("my.custom.host:5432", "authentication-endpoint.com/action.php?")

# presumably (username, password), with None meaning "no password" — TODO confirm
myPlayerConfig = PlayerConfiguration("cr0nch", None)

In [46]:
import asyncio
import numpy as np
import tensorflow as tf
import nest_asyncio
nest_asyncio.apply()
from gym.spaces import Space, Box
from gym.utils.env_checker import check_env
from rl.agents.dqn import DQNAgent
from rl.memory import SequentialMemory
from rl.policy import LinearAnnealedPolicy, EpsGreedyQPolicy
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.models import Sequential
from tensorflow.keras.optimizers import Adam
import time

# myServerConfig = ServerConfiguration("my.custom.host:5432", "authentication-endpoint.com/action.php?")

# myPlayerConfig = PlayerConfiguration("cr0nch", None)

In [47]:
class SimpleRLPlayer(Gen8EnvSinglePlayer):
    """Environment player that exposes a battle as a 10-value observation.

    Observation layout (see ``embed_battle``):
      * indices 0-3: base power of each available move divided by 100
        (-1 for an empty slot);
      * indices 4-7: type damage multiplier of each available move against
        the opponent's active Pokemon (1 for an empty or typeless slot);
      * index 8: fraction of the own team that has fainted (count / 6);
      * index 9: fraction of the opponent team that has fainted (count / 6).
    """

    def calc_reward(self, last_battle, current_battle) -> float:
        """Return the reward for the current battle state.

        Delegates to ``reward_computing_helper`` with a fainted value of 2,
        an HP value of 1 and a victory value of 30. ``last_battle`` is not
        used.
        """
        return self.reward_computing_helper(
            current_battle, fainted_value=2.0, hp_value=1.0, victory_value=30.0
        )

    def embed_battle(self, battle: AbstractBattle) -> ObservationType:
        """Encode ``battle`` as a float32 vector of length 10.

        Only the first four available moves are encoded: the vector has
        exactly four move slots, so any additional move is ignored instead of
        raising an IndexError.
        """
        n_slots = 4
        # -1 marks a slot with no available move.
        moves_base_power = -np.ones(n_slots)
        multiplier = np.ones(n_slots)

        # zip() against range(n_slots) caps the loop at the number of slots.
        for slot, move in zip(range(n_slots), battle.available_moves):
            # Rescale for easier learning.
            moves_base_power[slot] = move.base_power / 100
            if move.type:
                multiplier[slot] = move.type.damage_multiplier(
                    battle.opponent_active_pokemon.type_1,
                    battle.opponent_active_pokemon.type_2,
                )

        # Fainted counts are normalised by a team size of 6.
        fainted_mon_team = sum(1 for mon in battle.team.values() if mon.fainted) / 6
        fainted_mon_opponent = (
            sum(1 for mon in battle.opponent_team.values() if mon.fainted) / 6
        )

        final_vector = np.concatenate(
            [
                moves_base_power,
                multiplier,
                [fainted_mon_team, fainted_mon_opponent],
            ]
        )
        return np.float32(final_vector)

    def describe_embedding(self) -> Space:
        """Return the ``Box`` bounding the vector built by ``embed_battle``."""
        low = [-1, -1, -1, -1, 0, 0, 0, 0, 0, 0]
        high = [3, 3, 3, 3, 4, 4, 4, 4, 1, 1]
        return Box(
            np.array(low, dtype=np.float32),
            np.array(high, dtype=np.float32),
            dtype=np.float32,
        )

In [48]:
class MaxDamagePlayer(RandomPlayer):
    """Baseline player that always uses its highest base-power move."""

    def choose_move(self, battle):
        """Return an order for the strongest available move.

        Falls back to a random move when no move is available.
        """
        if not battle.available_moves:
            return self.choose_random_move(battle)

        strongest = max(
            battle.available_moves,
            key=lambda candidate: candidate.base_power,
        )
        return self.create_order(strongest)

In [49]:
# Step / episode budgets read by main(); both are set to 1 here.
# NOTE(review): main() builds its DQNAgent with nb_steps_warmup = 1000, so a
# one-step run presumably never gets past warm-up — raise these for a real run.
NB_TRAINING_STEPS = 1
NB_EVALUATION_EPISODES = 1

# Fix the TensorFlow and NumPy global seeds.
tf.random.set_seed(0)
np.random.seed(0)

# Train DQN

def dqn_training(player, dqn, nb_steps):
    """Fit ``dqn`` on ``player`` for ``nb_steps`` steps.

    Once fitting returns, the player is asked to complete its current battle.
    """
    step_budget = nb_steps
    dqn.fit(player, nb_steps=step_budget)

    # Fitting is bounded by steps, so wrap up the battle via the player.
    player.complete_current_battle()
    
def dqn_evaluation(player, dqn, nb_episodes):
    """Run ``dqn`` in test mode on ``player`` and print the victory count.

    The player's battle records are reset first, so the printed number of
    victories covers this evaluation only.
    """
    player.reset_battles()
    dqn.test(player, nb_episodes=nb_episodes, visualize=False, verbose=False)

    summary = "DQN Evaluation: %d victories out of %d episodes" % (
        player.n_won_battles,
        nb_episodes,
    )
    print(summary)

async def final_tests(player=None, opponent="cr0nch", n_challenges=100):
    """Send ``n_challenges`` challenges from ``player`` to ``opponent``.

    Args:
        player: Object exposing an awaitable
            ``send_challenges(opponent, n_challenges)``. When omitted, a
            module-level ``emb_player`` is used, which keeps the original
            zero-argument call working if such a global exists.
        opponent: Name of the account to challenge.
        n_challenges: Number of challenges to send.

    Raises:
        NameError: If no player is passed and no module-level ``emb_player``
            is defined.
    """
    if player is None:
        # The original body read a global `emb_player` directly; look it up
        # explicitly so a missing player yields an actionable message.
        player = globals().get("emb_player")
        if player is None:
            raise NameError(
                "final_tests() needs a player: pass one explicitly or define a "
                "module-level `emb_player` first"
            )
    await player.send_challenges(opponent, n_challenges)

In [51]:
# Main function 
async def main():
    """Build the players and the DQN agent, then train and evaluate it.

    Steps, in order:
      1. create the RL environment player and the two scripted opponents;
      2. build a small dense Keras network and wrap it in a keras-rl DQNAgent;
      3. train against the random opponent for NB_TRAINING_STEPS steps and
         save the network as "model_<NB_TRAINING_STEPS>";
      4. evaluate against the random and the max-damage opponents for
         NB_EVALUATION_EPISODES episodes each;
      5. have the trained agent send 100 challenges to the "cr0nch" account.

    Every player is created with LocalhostServerConfiguration, so presumably
    a local battle server must be running — confirm before executing.

    NOTE(review): the last recorded run of this cell ended with
    "TypeError: __init__() missing 1 required positional argument: 'opponent'".
    The raising call is not shown; if it is the SimpleRLPlayer constructor,
    the installed poke-env expects the opponent at construction time and the
    `play_against` flow below needs porting — confirm against the installed
    version.
    """
    # --- Players -----------------------------------------------------------
    env_player = SimpleRLPlayer(
        player_configuration=PlayerConfiguration("RL Player", None),
        battle_format="gen8randombattle",
        server_configuration=LocalhostServerConfiguration,
    )

    opponent = RandomPlayer(
        player_configuration=PlayerConfiguration("RandomPlayer", None),
        battle_format="gen8randombattle",
        server_configuration=LocalhostServerConfiguration,
    )

    second_opponent = MaxDamagePlayer(
        player_configuration=PlayerConfiguration("Max damage player", None),
        battle_format="gen8randombattle",
        server_configuration=LocalhostServerConfiguration,
    )

    # --- Dimensions --------------------------------------------------------
    n_action = len(env_player.action_space)
    # (window_length, embedding size); the embedding has 10 values.
    input_shape = (1, 10)

    # --- Model -------------------------------------------------------------
    model = Sequential()
    model.add(Dense(128, activation="elu", input_shape=input_shape))

    # The embedding has shape (1, 10), which affects the hidden layer and
    # output dimensions. Flattening resolves the issues that would otherwise
    # arise from the extra leading dimension.
    model.add(Flatten())
    model.add(Dense(64, activation="elu"))
    model.add(Dense(n_action, activation="linear"))

    # --- DQN agent ---------------------------------------------------------
    memory = SequentialMemory(limit=10000, window_length=1)

    # Epsilon-greedy exploration annealed from 1.0 to 0.05 over 10000 steps;
    # epsilon is 0 in test mode.
    policy = LinearAnnealedPolicy(
        EpsGreedyQPolicy(),
        attr="eps",
        value_max=1.0,
        value_min=0.05,
        value_test=0.0,
        nb_steps=10000,
    )

    dqn = DQNAgent(
        model=model,
        nb_actions=n_action,
        policy=policy,
        memory=memory,
        nb_steps_warmup=1000,
        gamma=0.5,
        target_model_update=1,
        delta_clip=0.01,
        enable_double_dqn=True,
    )

    dqn.compile(Adam(learning_rate=0.0025), metrics=["mae"])

    class EmbeddedRLPlayer(Player):
        """Plain player that chooses its moves with the DQN built above."""

        def choose_move(self, battle):
            # Play a random move 1% of the time.
            if np.random.rand() < 0.01:
                return self.choose_random_move(battle)
            # Reuse the environment player's embedding and action mapping.
            # NOTE(review): `_action_to_move` is a private helper of the base
            # environment class — confirm it exists in the installed version.
            embedding = SimpleRLPlayer.embed_battle(self, battle)
            action = dqn.forward(embedding)
            return SimpleRLPlayer._action_to_move(self, action, battle)

    emb_player = EmbeddedRLPlayer(
        player_configuration=PlayerConfiguration("Embedded RL Player", None),
        battle_format="gen8randombattle",
        server_configuration=LocalhostServerConfiguration,
    )

    # --- Training ----------------------------------------------------------
    env_player.play_against(
        env_algorithm=dqn_training,
        opponent=opponent,
        env_algorithm_kwargs={"dqn": dqn, "nb_steps": NB_TRAINING_STEPS},
    )

    model.save("model_%d" % NB_TRAINING_STEPS)

    # --- Evaluation --------------------------------------------------------
    print("Results against random player:")
    env_player.play_against(
        env_algorithm=dqn_evaluation,
        opponent=opponent,
        # dqn_evaluation takes `nb_episodes`, not `nb_steps`.
        env_algorithm_kwargs={"dqn": dqn, "nb_episodes": NB_EVALUATION_EPISODES},
    )

    print("\nResults against max player:")
    env_player.play_against(
        env_algorithm=dqn_evaluation,
        opponent=second_opponent,
        env_algorithm_kwargs={"dqn": dqn, "nb_episodes": NB_EVALUATION_EPISODES},
    )

    # Already inside a coroutine: await the challenges directly instead of
    # starting a nested event loop, and use the local player rather than a
    # global that is never defined.
    await emb_player.send_challenges("cr0nch", 100)

TypeError: __init__() missing 1 required positional argument: 'opponent'

In [None]:
#if __name__ == "__main__":
 #   asyncio.get_event_loop().run_until_complete(main())