In [1]:
from poke_env.player.env_player import Gen8EnvSinglePlayer
import numpy as np

class SimpleRLPlayer(Gen8EnvSinglePlayer):
    """Environment player that describes each battle with a 10-component vector.

    ``embed_battle`` builds the observation and ``compute_reward`` delegates
    the reward computation to ``reward_computing_helper``.
    """

    def embed_battle(self, battle):
        """Encode ``battle`` as a 1-D numpy array with 10 components.

        Layout of the returned vector:
            * [0:4]  base power of each available move divided by 100
                     (-1 when the slot has no available move),
            * [4:8]  damage multiplier of each available move's type against
                     the opponent's active pokemon (1 when unknown / no move),
            * [8]    fraction of our team that has not fainted,
            * [9]    fraction of the opponent's team that has not fainted.
        """
        # -1 indicates that the move does not have a base power
        # or is not available
        moves_base_power = -np.ones(4)
        moves_dmg_multiplier = np.ones(4)
        for i, move in enumerate(battle.available_moves):
            moves_base_power[i] = move.base_power / 100  # Simple rescaling to facilitate learning
            if move.type:
                moves_dmg_multiplier[i] = move.type.damage_multiplier(
                    battle.opponent_active_pokemon.type_1,
                    battle.opponent_active_pokemon.type_2,
                )

        # We count how many pokemons have not fainted in each team.
        # The original filter (`if mon.fainted`) counted the fainted ones, which
        # contradicted the `remaining_*` names. The remaining count is derived as
        # "team size minus fainted" so that team members absent from the dict
        # (presumably opponent pokemon not seen yet -- confirm) still count as
        # remaining. Teams of 6 are assumed, as in the original `/ 6`.
        team_size = 6
        fainted_team = sum(1 for mon in battle.team.values() if mon.fainted)
        fainted_opponent = sum(
            1 for mon in battle.opponent_team.values() if mon.fainted
        )
        remaining_mon_team = (team_size - fainted_team) / team_size
        remaining_mon_opponent = (team_size - fainted_opponent) / team_size

        # Final vector with 10 components
        return np.concatenate(
            [moves_base_power, moves_dmg_multiplier, [remaining_mon_team, remaining_mon_opponent]]
        )

    def compute_reward(self, battle) -> float:
        """Return the reward for ``battle`` via ``reward_computing_helper``.

        The helper is called with ``fainted_value=2``, ``hp_value=1`` and
        ``victory_value=30``.
        """
        return self.reward_computing_helper(
            battle,
            fainted_value=2,
            hp_value=1,
            victory_value=30,
        )


In [2]:
# Environment-style player for the gen8randombattle format; the following
# cells use it to size the model and to run training and evaluation.
env_player = SimpleRLPlayer(battle_format="gen8randombattle")

In [3]:
# Number of entries in the environment's action space (this is the value used
# as the output dimension of the Q-network below).
len(env_player.action_space)


22

In [4]:
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.models import Sequential

# The network outputs one value per entry of the action space
n_action = len(env_player.action_space)

# The input has shape (1, 10): a leading axis of size 1 in front of the 10
# embedding components. The first Dense layer keeps that axis, so Flatten()
# removes it before the remaining layers.
model = Sequential(
    [
        Dense(128, activation="elu", input_shape=(1, 10)),
        Flatten(),
        Dense(64, activation="elu"),
        Dense(n_action, activation="linear"),
    ]
)

2021-09-15 16:08:07.098935: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-09-15 16:08:07.104168: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-09-15 16:08:07.104880: I tensorflow/stream_executor/cuda/cuda_gpu_executor.cc:937] successful NUMA node read from SysFS had negative value (-1), but there must be at least one NUMA node, so returning NUMA node zero
2021-09-15 16:08:07.106229: I tensorflow/core/platform/cpu_feature_guard.cc:142] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags

In [5]:
import tensorflow as tf
from tf_agents.networks import q_network
from tf_agents.agents.dqn import dqn_agent


In [8]:
from rl.agents.dqn import DQNAgent
from rl.memory import SequentialMemory
from rl.policy import LinearAnnealedPolicy, EpsGreedyQPolicy
from tensorflow.keras.optimizers import Adam

# Replay memory holding at most 10000 entries, with a window of one observation
memory = SequentialMemory(limit=10000, window_length=1)

# Simple epsilon greedy: "eps" is annealed linearly from 1.0 down to 0.05 over
# 10000 steps; 0 is used when testing
policy = LinearAnnealedPolicy(
    EpsGreedyQPolicy(),
    attr="eps",
    value_max=1.0,
    value_min=0.05,
    value_test=0,
    nb_steps=10000,
)

# Defining our DQN
dqn = DQNAgent(
    model=model,
    # Use the size computed from the environment instead of a hard-coded 22 so
    # the agent always agrees with the model's output layer.
    nb_actions=n_action,
    policy=policy,
    memory=memory,
    nb_steps_warmup=1000,
    gamma=0.5,
    target_model_update=1,
    delta_clip=0.01,
    enable_double_dqn=True,
)

# The agent has to be compiled before dqn.fit() / dqn.test() are called in the
# training and evaluation cells. `learning_rate` replaces the deprecated `lr`.
# NOTE(review): this call was commented out in the original notebook -- confirm
# it runs with the installed keras-rl / TensorFlow versions.
dqn.compile(Adam(learning_rate=0.00025), metrics=["mae"])

In [11]:
# NOTE(review): this call raised "OSError: User SimpleRLPlayer 1 has no active
# battle." (see the recorded output of this cell). The training and evaluation
# cells only drive the environment from inside play_against(), which is
# presumably what provides the active battle -- consider dropping this cell.
env_player.reset()

OSError: User SimpleRLPlayer 1 has no active battle.

In [None]:
# `common` provides the TD-error loss used below; it was referenced without
# being imported.
from tf_agents.utils import common

# NOTE(review): `train_env` is not defined anywhere in the visible notebook. It
# must be created (a TF-Agents environment exposing observation_spec(),
# action_spec() and time_step_spec()) before this cell can run.

# The optimizer was referenced but never defined. The learning rate mirrors the
# value used for the keras-rl agent -- adjust as needed.
optimizer = tf.keras.optimizers.Adam(learning_rate=0.00025)

# Q-network with a single hidden layer of 100 units. QNetwork expects the
# action *spec* (the same object given to DqnAgent), not `action_space`.
q_net = q_network.QNetwork(
  train_env.observation_spec(),
  train_env.action_spec(),
  fc_layer_params=(100,))

# DQN agent built on top of the Q-network, trained with an element-wise
# squared TD-error loss and its own step counter.
agent = dqn_agent.DqnAgent(
  train_env.time_step_spec(),
  train_env.action_spec(),
  q_network=q_net,
  optimizer=optimizer,
  td_errors_loss_fn=common.element_wise_squared_loss,
  train_step_counter=tf.Variable(0))

In [None]:
from poke_env.player.random_player import RandomPlayer

def dqn_training(player, dqn, nb_steps):
    """Train ``dqn`` on ``player`` and close the battle left in progress.

    Args:
        player: environment object handed to ``dqn.fit``; must also provide
            ``complete_current_battle()``.
        dqn: agent exposing ``fit(env, nb_steps=...)``.
        nb_steps: number of training steps forwarded to ``dqn.fit``.
    """
    dqn.fit(player, nb_steps=nb_steps)

    # Finish any battle that is still unfinished before returning
    player.complete_current_battle()

# Opponent used during training: a RandomPlayer in the same battle format
opponent = RandomPlayer(battle_format="gen8randombattle")

# Training: play_against() is given dqn_training as the algorithm to run,
# together with the arguments it needs (the agent and the number of steps).
env_player.play_against(
    env_algorithm=dqn_training,
    opponent=opponent,
    env_algorithm_kwargs={"dqn": dqn, "nb_steps": 100000},
)

In [None]:
def dqn_evaluation(player, dqn, nb_episodes):
    """Run ``dqn`` in test mode on ``player`` and print the number of wins.

    Args:
        player: environment object handed to ``dqn.test``; must provide
            ``reset_battles()`` and ``n_won_battles``.
        dqn: agent exposing ``test(env, nb_episodes=..., visualize=..., verbose=...)``.
        nb_episodes: number of evaluation episodes.
    """
    # Clear the battle statistics accumulated so far
    player.reset_battles()
    dqn.test(player, nb_episodes=nb_episodes, visualize=False, verbose=False)

    summary = "DQN Evaluation: %d victories out of %d episodes" % (
        player.n_won_battles,
        nb_episodes,
    )
    print(summary)

# MaxDamagePlayer was used below without being defined in this notebook, which
# raises a NameError on a fresh kernel; it is defined here so the cell runs.
class MaxDamagePlayer(RandomPlayer):
    """Baseline opponent that picks the available move with the highest base power."""

    def choose_move(self, battle):
        # If at least one move can be used, order the one with the highest base power
        if battle.available_moves:
            best_move = max(battle.available_moves, key=lambda move: move.base_power)
            return self.create_order(best_move)
        # No move is available: fall back to a random choice
        return self.choose_random_move(battle)


second_opponent = MaxDamagePlayer(battle_format="gen8randombattle")

# Evaluation
print("Results against random player:")
env_player.play_against(
    env_algorithm=dqn_evaluation,
    opponent=opponent,
    env_algorithm_kwargs={"dqn": dqn, "nb_episodes": 100},
)

print("\nResults against max player:")
env_player.play_against(
    env_algorithm=dqn_evaluation,
    opponent=second_opponent,
    env_algorithm_kwargs={"dqn": dqn, "nb_episodes": 100},
)