# Imports

In [1]:
# Default needs
import dill
import numpy as np
import torch
import pandas as pd
from collections import Counter
from collections import defaultdict
from matplotlib import pyplot as plt

np.set_printoptions(precision=3)
np.set_printoptions(suppress=True)

# Importing Environments
from environments import square_room

from utils.agent_utils import calc_win_percentage

# Combat Handler
from combat_handler import CombatHandler

# agents
from agents import TIME_LIMIT

# Actions and Players
from actions import *
from players import dungeon_master
from players import hayden
from utils.dnd_utils import roll_dice
from creatures import Creature

In [2]:
import logging
from datetime import datetime
import time

# Timestamp identifying this run, safe to embed in file names (no ':'),
# e.g. 2024-01-31T12-34-56. strftime is used instead of slicing isoformat():
# isoformat() omits the fractional part when microsecond == 0, so a fixed
# [:-7] slice would then cut off real date/time digits.
start_time = datetime.now().strftime("%Y-%m-%dT%H-%M-%S")

EXPT_NAME = "Ranger_PPO_Dense"
# Forward slashes avoid invalid escape sequences (\A, \P, \m) in a normal string
# literal, match the other paths in this notebook, and work on Windows as well.
MODEL_FILE = "results/AnandakrishnanDumps/PPO_Dense/model_PPO_Rang_ITERS_10000.pickle"

# File logging is disabled; uncomment the two lines below to write a log file.
# log_file_name = "logs/sims/Simulations_"+EXPT_NAME+"_"+start_time+".log"
# logging.basicConfig(filename=log_file_name, filemode='w', level=logging.INFO)

# The logger itself is always defined so that later cells calling logger.info()
# work on a fresh kernel; without basicConfig its INFO records are simply dropped.
logger = logging.getLogger("RUNNER")

# print("GONNA LOG AT ",log_file_name)

# PPO Strategy and Helpers

In [3]:
# Helpers
def report_win_percentages(winner_list, num_games, combatants, total_rewards, last_states, num_actions_takens, details = True):
    """
    Print (and log) the win percentages over the most recent ``num_games`` games.

    :param winner_list: winner of every game played so far, in play order
    :param num_games: size of the trailing window to report on
    :param combatants: combatants forwarded to ``calc_win_percentage``
    :param total_rewards: per-game total rewards, aligned with ``winner_list``
    :param last_states: per-game final state tensors, aligned with ``winner_list``
        (assumes each tensor contributes exactly one row to ``torch.cat`` -- TODO confirm)
    :param num_actions_takens: per-game action counts, aligned with ``winner_list``
    :param details: when True, also print one line per game, highest reward first
    :return: None
    """
    # Resolve the logger locally so the helper does not depend on a global
    # `logger` having been created by some earlier (possibly skipped) cell.
    import logging
    run_logger = logging.getLogger("RUNNER")

    recent_winners = winner_list[-num_games:]
    win_percentages = calc_win_percentage(recent_winners, combatants)

    # Slice every per-game series to the same trailing window; otherwise the
    # details below would pair the latest winners with the earliest games'
    # final states and action counts.
    recent_states = torch.cat(last_states[-num_games:]).data.numpy()
    recent_action_counts = num_actions_takens[-num_games:]

    print("Win percentages: {}\t".format(win_percentages))
    run_logger.info(("Win percentages: {}\t".format(win_percentages)))

    results = list(zip(recent_winners, total_rewards[-num_games:], recent_states, recent_action_counts))
    results = sorted(results, key=lambda x: -x[1])
    if details:
        for winner, avg_reward, last_state, num_actions_taken in results:
            print(" {}: {} ({}) \t\t{}".format(winner, avg_reward, last_state, num_actions_taken))
    print("----------------------\n")


def intialize_combatants(combatants, combat_handler):
    """
    Call ``initialize(combat_handler)`` on every combatant, in iteration order.

    :param combatants: iterable of objects exposing an ``initialize`` method
    :param combat_handler: handler handed to each ``initialize`` call
    :return: None
    """
    for fighter in combatants:
        fighter.initialize(combat_handler)


In [4]:
from utils.agent_utils import EGreedyPolicy
from utils.agent_utils import Experience
from utils.agent_utils import filter_illegal_actions
from utils.agent_utils import filter_out_final_states
from utils.agent_utils import mean_sq_error
from utils.agent_utils import PrioritizedMemory
from utils.agent_utils import SARSAExperience
from utils.agent_utils import DuelingNet
from utils.agent_utils import ActorCritic
from agents import RandomStrategy
from agents import Strategy

class FunctionApproximation_no_train(Strategy):
    """
    Function-approximation strategy that runs with an optional, externally
    supplied ``policy_net``.

    The class stores the usual hyper-parameters and bookkeeping attributes and
    wires up the action <-> index lookup tables in ``initialize``. It relies on
    ``get_current_state``, ``initialize_weights`` and ``activate_weights`` being
    provided by ``Strategy`` or a subclass (none of them is defined here).
    """

    def __init__(self, max_training_steps=5e6, epsilon_start=0.3, epsilon_end=0.05, alpha=1e-4,
                 gamma=0.999, update_frequency=5e4, memory_length=1024, batch_size=128, policy_net = None):
        """
        :param max_training_steps: number of steps given to the epsilon-greedy schedule
        :param epsilon_start: initial epsilon of the epsilon-greedy policy
        :param epsilon_end: final epsilon of the epsilon-greedy policy
        :param alpha: learning rate
        :param gamma: discount factor
        :param update_frequency: stored as-is; not used within this class
        :param memory_length: capacity passed to ``PrioritizedMemory``
        :param batch_size: stored as-is; not used within this class
        :param policy_net: network to use; when None, ``initialize`` falls back
            to ``initialize_weights``
        """
        self.max_training_steps = max_training_steps
        self.epsilon_start = epsilon_start
        self.epsilon_end = epsilon_end
        self.alpha = alpha
        self.gamma = gamma
        self.update_frequency = update_frequency
        self.training_iteration = 0
        self.t = 0
        self.policy = EGreedyPolicy(n_steps=max_training_steps, epsilon_start=epsilon_start, epsilon_end=epsilon_end)
        self.w = None
        self.w_stored = None
        self.action_to_index = None
        self.index_to_action = None
        self.n_states = None
        self.n_actions = None

        self.policy_net = policy_net
        self.target_net = None
        self.optimizer = None
        self.memory = PrioritizedMemory(memory_length)
        self.name = "DQN"
        self.batch_size = batch_size

        self.learning_rate_decay_freq = TIME_LIMIT * 100
        self.n_learning_rate_decays = 0
        self.n_weight_updates = 0

    @staticmethod
    def determine_enemy(creature, combat_handler):
        """
        Return the opponent of ``creature``.

        :param creature: the creature whose enemy is wanted
        :param combat_handler: object exposing a ``combatants`` sequence
        :return enemy: the last combatant that is not ``creature``, or None
            when there is no other combatant
        """
        enemy = None
        combatants = combat_handler.combatants
        for combatant in combatants:
            if combatant != creature:
                enemy = combatant
        return enemy

    def initialize(self, creature, combat_handler):
        """
        Prepare the strategy for a combat: set up the network-related state and
        build the action <-> index dictionaries from ``creature.actions``.

        :param creature: the creature this strategy controls
        :param combat_handler: the handler running the combat
        :return: None
        """
        # Initialize weights if needed
        if self.policy_net is None:
            print("NO NET")
            state = self.get_current_state(creature=creature, combat_handler=combat_handler)
            self.initialize_weights(creature, state)
        else:
            # Reading the state is best effort. `state` must be bound before the
            # try block: if get_current_state raises, the fallback below would
            # otherwise hit an UnboundLocalError instead of using the zero state.
            state = None
            try:
                state = self.get_current_state(creature=creature, combat_handler=combat_handler)
            except Exception:
                pass
            if state is None:
                # Placeholder state; the 9 columns are presumably the state
                # width the loaded network expects -- TODO confirm.
                state = torch.zeros([1,9])
            self.activate_weights(creature, state)

        # Obtain dictionaries translating index to actions and vice versa
        action_indicies = zip(creature.actions, range(self.n_actions))
        self.action_to_index = {action: index for action, index in action_indicies}
        self.index_to_action = {index: action for action, index in self.action_to_index.items()}

class PPO_No_Train(FunctionApproximation_no_train):
    """
    PPO strategy used for evaluation only: it samples actions from a supplied
    actor-critic ``policy_net`` while both update hooks (``update_step`` and
    ``update_step_trajectory``) are no-ops.
    """

    def __init__(self, max_training_steps=1e5, epsilon_start=0.5, epsilon_end=0.05, alpha=1e-5,
                 gamma=0.99, update_frequency=30000, memory_length=16834, batch_size=128, 
                 win_reward=5,lose_reward=0,attack_dealt_reward=0,attack_recieved_reward=0, policy_net=None):
        """
        The first eight parameters and ``policy_net`` are forwarded unchanged to
        ``FunctionApproximation_no_train.__init__``.

        :param win_reward: reward added when combat ends and the enemy is dead
        :param lose_reward: reward added when combat ends and the enemy is alive
        :param attack_dealt_reward: reward added when damage was dealt this step
        :param attack_recieved_reward: reward added when damage was taken this step
        """
        super().__init__(
            max_training_steps, epsilon_start, epsilon_end, alpha, gamma, update_frequency, memory_length, batch_size, policy_net
        )
        self.name = "PPO"
        self.optimizer = None
        self.win_reward             = win_reward
        self.lose_reward            = lose_reward
        self.attack_dealt_reward    = attack_dealt_reward
        self.attack_recieved_reward = attack_recieved_reward

    def activate_weights(self, creature, state=torch.zeros([1,9])):
        """
        Record the state/action dimensions and attach an Adam optimizer to the
        already-loaded ``policy_net``.

        :param creature: creature whose ``actions`` define the action count
        :param state: example state; only ``state.shape[1]`` is read
        :return: None
        """
        self.n_states = state.shape[1]
        self.n_actions = len(creature.actions)
        self.optimizer = torch.optim.Adam(self.policy_net.parameters(), lr=self.alpha)


    def update_step(self, action, creature, current_state, next_state, combat_handler):
        """No-op: this strategy does not learn from single steps."""
        pass

    def sample_action(self, creature, combat_handler, increment_counter=True, state=None):
        """
        Sample an action from the policy network's distribution.

        :param creature: the acting creature
        :param combat_handler: the handler running the combat
        :param increment_counter: accepted for interface compatibility; unused here
        :param state: state to act on; read via ``get_current_state`` when None
        :return: tuple ``(action, log_prob, value)``
        """
        # Obtain state / actions:
        if state is None:
            state = self.get_current_state(creature=creature, combat_handler=combat_handler)

        dist, value = self.policy_net(state)
        action_index = dist.sample()
        log_prob = dist.log_prob(action_index)
        action = self.index_to_action[action_index.data.numpy()[0]]

        # Return action
        return action, log_prob, value

    def evaluate_state_and_action(self, creature, combat_handler, state, action):
        """
        Obtain:
           - the log-probability of selecting `action` when in input state `state`
           - the value of being in input state `state`

        :param creature: the acting creature
        :param combat_handler: the handler running the combat
        :param state: state to evaluate; read via ``get_current_state`` when None
        :param action: action to score
        :return: ``(log_prob, value)``, or None when ``action`` is not one of
            the known actions
        """
        # Obtain state and action index:
        action_index = self.action_to_index.get(action)

        # Check if creature hadn't taken any actions.
        if action_index is None:
            return None

        # Convert to tensor
        action_index = torch.tensor(action_index)

        # Check if end of combat state
        if state is None:
            state = self.get_current_state(creature=creature, combat_handler=combat_handler)

        dist, value = self.policy_net(state)
        log_prob = dist.log_prob(action_index)
        return log_prob, value

    def get_gae(self, trajectory, lmbda=0.95):
        """
        Compute generalized-advantage-estimation returns for a trajectory.

        Each trajectory entry is indexed positionally: ``t[2]`` is the reward,
        ``t[3]`` the next state (None marks a terminal step) and ``t[-1]`` the
        value estimate (these must be tensors for ``torch.cat`` to succeed).

        :param trajectory: sequence of step tuples as described above
        :param lmbda: GAE smoothing factor
        :return: tensor of returns (advantage + value), one entry per step
        """
        # Todo: replace this codeblock
        rewards = [t[2] for t in trajectory]
        values = [t[-1] for t in trajectory]
        dummy_next_value = 0  # should get masked out
        values = values + [dummy_next_value]
        masks = [t[3] is not None for t in trajectory]

        gae = 0
        returns = []

        for step in reversed(range(len(rewards))):
            delta = rewards[step] + self.gamma * values[step + 1] * masks[step] - values[step]
            gae = delta + self.gamma * lmbda * masks[step] * gae
            returns.insert(0, gae + values[step])

        returns = torch.cat(returns)
        return returns

    def get_returns(self, trajectory):
        """
        Compute discounted returns, restarting the running sum at every
        terminal step (``t[3] is None``).

        :param trajectory: sequence of step tuples; ``t[2]`` is the reward and
            ``t[3]`` the next state
        :return: tensor with one single-element row per step
        """
        rewards = [t[2] for t in trajectory]
        is_terminals = [t[3] is None for t in trajectory]
        discounted_rewards = list()

        # Start from zero so a trajectory whose final step is not terminal
        # (e.g. cut off by a time limit) does not read an unbound variable.
        discounted_reward = 0
        for reward, is_terminal in reversed(list(zip(rewards, is_terminals))):
            if is_terminal:
                discounted_reward = 0
            discounted_reward = reward + self.gamma * discounted_reward
            discounted_rewards.insert(0, [discounted_reward])

        discounted_rewards = torch.tensor(discounted_rewards)

        return discounted_rewards

    @staticmethod
    def select_random_batch(current_states, actions, log_probs,returns, advantages, mini_batch_size):
        """
        Draw ``mini_batch_size`` random indices (with replacement, via
        ``np.random.randint``) and index every input with them.

        :return: tuple of the five inputs restricted to the sampled indices,
            in the same order as the parameters
        """
        random_indicies = np.random.randint(0, len(current_states), mini_batch_size)

        batch_current_states = current_states[random_indicies]
        batch_actions = actions[random_indicies]
        batch_log_probs = log_probs[random_indicies]
        batch_returns = returns[random_indicies]
        batch_advantages = advantages[random_indicies]

        return batch_current_states, batch_actions, batch_log_probs, batch_returns, batch_advantages

    def update_step_trajectory(self, trajectory, clip_val=0.2):
        """No-op: this strategy does not learn from trajectories."""
        pass

    def determine_reward(self, creature, current_state, next_state, combat_handler):
        """
        Compute the reward for one transition.

        :param creature: the acting creature
        :param current_state: state before the action
        :param next_state: state after the action; None marks end of combat
        :param combat_handler: the handler running the combat
        :return: sum of the applicable win/lose and damage rewards
        """
        reward = 0

        enemy = self.determine_enemy(creature, combat_handler)


        # Winner
        if next_state is None:
            if not enemy.is_alive():
                reward += self.win_reward
            else:
                reward += self.lose_reward 

        # Get raw state
        raw_next_state = self.get_raw_state(creature, enemy, combat_handler)

        # Damage done: column 1 decreased between the two states
        # (presumably the enemy's hit points -- TODO confirm).
        damage_done = (current_state - raw_next_state)[0][1]
        if float(damage_done) > 0:
            reward += self.attack_dealt_reward

        # Damage taken: column 0 decreased between the two states
        # (presumably this creature's hit points -- TODO confirm).
        damage_taken = (raw_next_state - current_state)[0][0]
        if float(damage_taken) < 0:
            reward += self.attack_recieved_reward

        return reward

# Automator

In [5]:
n_iters = 1000

# NOTE(review): dill/pickle deserialization can execute arbitrary code; only
# load model files that come from a trusted source.
# The `with` block closes the file handle once the model has been read.
with open(MODEL_FILE, "rb") as model_file:
    policy_net_trained = dill.load(model_file)

# Per-game results, appended in play order.
winner_list = []
total_rewards = []
last_states = []
num_actions_takens = []

# The same two Creature objects are reused for every game; presumably
# `initialize` resets their per-combat state -- TODO confirm.
ranger = Creature(
    player=hayden,
    name="Leotris",
    hit_points=28,
    armor_class=14,
    resistance = 0,
    actions=[MoveLeft(), MoveRight(), MoveUp(), MoveDown(), DoNotMove(), shortsword_slash, handcrossbow_shot],
    location=np.array([5, 10]),
    symbol="x",
    strategy=PPO_No_Train(win_reward=50,lose_reward=-50,
                          attack_dealt_reward=1,attack_recieved_reward=-1,
                          policy_net=policy_net_trained)
)

manticore = Creature(
    player=dungeon_master,
    name="Strahd",
    hit_points=95,
    armor_class=16,
    actions=[MoveLeft(), MoveRight(), MoveUp(), MoveDown(), DoNotMove(), bite, tail_spike],
    level_1_spell_slots = 10,
    location=np.array([5, 5]),
    symbol="@",
    strategy=RandomStrategy()
)

# Alternative spell-caster build of "Leotris", kept for reference:
#     ranger = Creature(
#             player=hayden,
#             name="Leotris",
#             hit_points=16,
#             armor_class=11,
#             resistance = 0,
#             actions=[MoveLeft(), MoveRight(), MoveUp(), MoveDown(), DoNotMove(), fire_bolt_cantrip, ray_of_frost_cantrip, chromatic_orb_level_1, magic_missile_level_1, scorching_ray_level_2, aganazzars_scorcher_level_2],
#             location=np.array([5, 10]),
#             level_1_spell_slots = 3,
#             level_2_spell_slots = 1,
#             symbol="x",
#             strategy=PPO_No_Train(win_reward=50,lose_reward=-50,
#                                   attack_dealt_reward=1,attack_recieved_reward=-1,
#                                   policy_net=policy_net_trained)
#         )

for i in range(n_iters):
    # A fresh handler per game; the combatants are (re)initialized against it.
    combat_handler = CombatHandler(
        environment=square_room,
        combatants=[ranger, manticore],
        time_limit=TIME_LIMIT
    )
    intialize_combatants([ranger, manticore], combat_handler=combat_handler)
    winner, total_reward, last_state, num_actions_taken = combat_handler.run_no_train()

    winner_list.append(winner)
    total_rewards.append(total_reward)
    last_states.append(last_state)
    num_actions_takens.append(num_actions_taken)

    # Progress report over the trailing 10 games, every 10 games.
    if (i + 1) % 10 == 0:
        print("GAMES DONE : ",i+1)
        report_win_percentages(
                    winner_list=winner_list,
                    num_games=10,
                    combatants=[ranger, manticore],
                    total_rewards=total_rewards,
                    last_states=last_states,
                    num_actions_takens=num_actions_takens,
                    details = False
        )

    # dill.dump(winner_list, open("results/sims/Simulation_winner_list_{}_EXPT_{}_STARTED_{}_NITERS{}.pickle".format(ranger.strategy.name, EXPT_NAME, start_time, n_iters), "wb"))
    # dill.dump(total_rewards, open('results/sims/Simulation_reward_list_{}_EXPT_{}_STARTED_{}_NITERS{}.pickle'.format(ranger.strategy.name, EXPT_NAME, start_time, n_iters), "wb"))
    # dill.dump(num_actions_takens, open('results/sims/Simulation_num_actions_taken_{}_EXPT_{}_STARTED_{}_NITERS{}.pickle'.format(ranger.strategy.name, EXPT_NAME, start_time, n_iters), "wb"))
    # dill.dump(last_states, open('results/sims/Simulation_last_states_{}_EXPT_{}_STARTED_{}_NITERS{}.pickle'.format(ranger.strategy.name, EXPT_NAME, start_time, n_iters), "wb"))

GAMES DONE :  10
Win percentages: [('Leotris', 1.0), ('Strahd', 0)]	
----------------------

GAMES DONE :  20
Win percentages: [('Leotris', 1.0), ('Strahd', 0)]	
----------------------

GAMES DONE :  30
Win percentages: [('Leotris', 0.9), ('Strahd', 0.1)]	
----------------------

GAMES DONE :  40
Win percentages: [('Leotris', 1.0), ('Strahd', 0)]	
----------------------

GAMES DONE :  50
Win percentages: [('Leotris', 1.0), ('Strahd', 0)]	
----------------------

GAMES DONE :  60
Win percentages: [('Leotris', 1.0), ('Strahd', 0)]	
----------------------

GAMES DONE :  70
Win percentages: [('Leotris', 1.0), ('Strahd', 0)]	
----------------------

GAMES DONE :  80
Win percentages: [('Leotris', 0.9), ('Strahd', 0.1)]	
----------------------

GAMES DONE :  90
Win percentages: [('Leotris', 1.0), ('Strahd', 0)]	
----------------------

GAMES DONE :  100
Win percentages: [('Leotris', 1.0), ('Strahd', 0)]	
----------------------

GAMES DONE :  110
Win percentages: [('Leotris', 0.9), ('Strahd', 

# Results

In [6]:
# Build one [winner, reward, num_actions, last_state] row per simulated game,
# then write the table to a CSV named after the strategy, experiment and run.
Result_Arr = [
    [winner_list[game], total_rewards[game], num_actions_takens[game], last_states[game]]
    for game in range(len(winner_list))
]
df = pd.DataFrame(
    Result_Arr,
    columns=['winner','reward','num_actions','last_states'],
)
df.to_csv(
    'results/sims/csvs/Simulation_{}_EXPT_{}_STARTED_{}_NSIMS{}.csv'.format(
        ranger.strategy.name, EXPT_NAME, start_time, n_iters
    )
)

In [7]:
import numpy as np
# Distinct winner names together with how many games each one won.
np.unique(np.asarray(winner_list), return_counts=True)

(array(['Leotris', 'Strahd'], dtype='<U7'), array([972,  28], dtype=int64))

In [8]:
"DONE"

'DONE'