In [None]:
''' 
Appendix to bachelor thesis 'USING DEEP REINFORCEMENT LEARNING METHODS TO PLAY THE GAME OF SKULL'

Copyright © 2022 Reimer Sjouke Theodoor Koopal. All rights reserved.
'''

!pip install ray==1.10.0

import gym
import tensorflow as tf
from ray.rllib.agents.a3c.a2c import A2CTrainer, A2C_DEFAULT_CONFIG
from ray.rllib.agents.ppo import PPOTrainer, DEFAULT_CONFIG
from gym.spaces import Discrete, Tuple, Dict
from ray.rllib.env.multi_agent_env import MultiAgentEnv
from ray.tune.logger import pretty_print, UnifiedLogger

from datetime import datetime
import tempfile
import os 
from os import path
from google.colab import drive

import random
import gym, ray
import pandas as pd
from gym.spaces import Discrete, Tuple, Dict
from ray.rllib.env.multi_agent_env import MultiAgentEnv

import matplotlib.pyplot as plt
import numpy as np

%load_ext tensorboard

In [None]:
class SkullEnv(MultiAgentEnv):
    """
    Class used to represent Skull environment SkullEnv (two players)

    Every call to ``step`` handles the move of ``current_player`` only. The phase ("fase")
    of the game is derived from the shared state at the start of each step:

    * fase 0 - placing : play the skull, play a rose, or open the betting
    * fase 1 - betting : raise the bet by 1, or pass
    * fase 2 - flipping: the player who did not pass flips cards until a skull is
      found or the bet is met

    An action is a pair ``(x, y)``. ``x`` selects the move in fases 0 and 1
    (0 = skull, 1 = rose, 2 = bet, 3 = pass); ``y`` selects the card to flip in fase 2
    (0-3 = own cards, 4-7 = cards of the opponent). An illegal move ends the episode
    with a reward of -100 for the offending player.


    Attributes
    ----------
    MultiAgentEnv : ray.rllib.env.multi_agent_env.MultiAgentEnv
        Base class of the environment


    Methods
    -------
    __init__(self, env_config=None):
        Creates instance of environment

    reset(self):
        Resets current environment
        Returns an observation

    step(self, action):
        Takes an action selected by an agent and changes environment accordingly
        Returns an observation, rewards, done and info

    get_obs(self):
        Creates observation for the current player

    illegal_move(self):
        Sets rewards for current player to -100 and ends the episode

    set_done(self):
        Sets all fields of self.done to True

    is_allowed(self, action, fase):
        Checks if action is allowed in current phase

    set_fase(self):
        Sets current phase

    roses_left(self):
        Counts the cards the current player has on the table

    play_card(self, i):
        Returns the cards of the current player with card i marked as played

    play_skull(self):
        Function for action (0, y), play skull card

    play_rose(self):
        Function for action (1, y), play rose card

    bet(self):
        Function for action (2, y), raise bet by 1

    _pass(self):
        Function for action (3, y), pass

    flip(self, card):
        Function for action (x, y), flip card y
    """

    metadata = {'render.modes': ['human']}

    def __init__(self, env_config=None):
        """
        Creates instance of environment

        Parameters
        ----------
        env_config : dict, optional
            Accepted (and ignored) so the class can be built both as ``SkullEnv()`` and as
            ``SkullEnv(env_config)``, which is how the rest of this notebook and RLlib
            instantiate environment classes.
        """
        self.n_players = 2
        self.n_roses = 3

        ## observations are: 4 possible played cards for each player
        ##                   The current highest bet
        ##                   The amount of cards left to turn
        ##                   The position of the own skull (4 = not played yet)
        self.observation_space = Dict({
            'self': Tuple([Discrete(2), Discrete(2), Discrete(2), Discrete(2)]),
            'other': Tuple([Discrete(2), Discrete(2), Discrete(2), Discrete(2)]),
            'currentBet': Discrete(7),
            'toTurn': Discrete(7),
            'skull': Discrete(5)
        })

        ## actions are ([play skull, play rose, bet, pass] and
        ##             [turn 1, turn 2, turn 3, turn 4, turn 5, turn 6, turn 7, turn 8])
        self.action_space = Tuple([Discrete(4), Discrete((self.n_roses + 1) * self.n_players)])

        self._reset_state()

    def _reset_state(self):
        """Puts every per-episode field back to its start-of-game value."""
        self.action = None
        self.current_bet = None
        self.fase = 0
        self.current_player = 0
        self.has_passed = None
        self.cards_copy = []          # filled by _pass(), only read in fase 2
        self.cards_played = 0

        self.rewards = {"player0": 0, "player1": 0}
        self.done = {"player0": False, "player1": False, "__all__": False}

        # slot index of each player's skull; 4 means "not played yet"
        self.skullLoc = {
            'skull0': 4,
            'skull1': 4
        }
        # shared table state; get_obs() turns it into the view of the current player
        self.obs = {
            'player0': tuple([0, 0, 0, 0]),
            'player1': tuple([0, 0, 0, 0]),
            'currentBet': 0,
            'toTurn': 0,
            'skull': 4
        }

    def _me(self):
        """Returns the key of the current player, e.g. 'player0'."""
        return 'player' + str(self.current_player)

    def _opponent(self):
        """Returns the key of the player who is not on turn."""
        return 'player' + str(abs(self.current_player - 1))

    def _reject(self):
        """Registers an illegal move and returns 0 (the 'not allowed' result)."""
        self.illegal_move()
        return 0

    def reset(self):
        """
        Resets current environment

        Returns
        -------
        dict
            Observation for the starting player (player0)
        """
        self.n_players = 2
        self.n_roses = 3
        self._reset_state()
        return self.get_obs()

    def step(self, action):
        """
        Takes an action selected by an agent and changes environment accordingly

        Parameters
        ----------
        action : tuple
            Pair (x, y), see the class description

        Returns
        -------
        tuple
            (observation of the next player to move, rewards, done, info)
        """
        info = {}
        self.set_fase()

        if self.fase == 0:
            if self.is_allowed(action, self.fase):
                if action[0] == 0:                                ##skull
                    self.play_skull()
                elif action[0] == 1:                              ##rose
                    self.play_rose()
                elif action[0] == 2:                              ##bet
                    self.bet()
            self.current_player = abs(self.current_player - 1)   ##switch turn

        elif self.fase == 1:
            if self.is_allowed(action, self.fase):
                if action[0] == 2:                                ##bet
                    self.bet()
                elif action[0] == 3:                              ##pass
                    self._pass()
            self.current_player = abs(self.current_player - 1)   ##switch turn

        elif self.fase == 2:                                      ##flipping fase
            # the flipping player keeps the turn until the episode ends
            if self.is_allowed(action, self.fase):
                if self.flip(action[1]):
                    # skull found: the flipping player loses
                    self.rewards[self._me()] = -1
                    self.rewards[self._opponent()] = 1
                    self.set_done()
                elif self.obs['toTurn'] == 0:
                    # bet met without finding a skull: the flipping player wins
                    self.rewards[self._me()] = 1
                    self.rewards[self._opponent()] = -1
                    self.set_done()

        return self.get_obs(), self.rewards, self.done, info

    def get_obs(self):
        """
        Creates observation for the current player

        Returns
        -------
        dict
            ``{'player<i>': {...}}`` with only the key of the current player
        """
        return {
            self._me(): {
                'self': self.obs[self._me()],
                'other': self.obs[self._opponent()],
                'currentBet': self.obs['currentBet'],
                'toTurn': self.obs['toTurn'],
                'skull': self.skullLoc['skull' + str(self.current_player)]
            }
        }

    def illegal_move(self):
        """Sets rewards for current player to -100 and ends the episode"""
        self.rewards[self._me()] = -100
        self.set_done()

    def set_done(self):
        """Sets all fields of self.done to True"""
        self.done = {"player0": True, "player1": True, "__all__": True}

    def is_allowed(self, action, fase):
        """
        Checks if action is allowed in current phase

        A rejected action is registered through illegal_move() before returning.

        Returns
        -------
        int
            1 if the action is allowed, 0 otherwise
        """
        if action[0] < 0 or action[1] < 0:
            print("Action non-existent")
            return self._reject()

        if fase == 0:
            own_cards = self.obs[self._me()]
            skull_in_hand = self.skullLoc['skull' + str(self.current_player)] == 4

            if action[0] > 2:
                # can only place cards or open the betting during this fase
                return self._reject()
            if action[0] < 2 and 0 not in own_cards:
                # no free slot, so no cards left to play
                return self._reject()
            if action[0] == 0 and not skull_in_hand:
                print("Skull has already been played")
                return self._reject()
            if action[0] == 1 and skull_in_hand and self.roses_left() == 3:
                # all three roses are on the table, only the skull is left
                return self._reject()
            if action[0] == 2:
                if 1 not in own_cards and 1 not in self.obs[self._opponent()]:
                    # cannot bet with 0 cards played
                    return self._reject()

        if fase == 1:
            if action[0] != 2 and action[0] != 3:
                # can only bet or pass at this fase
                return self._reject()

        if fase == 2:
            if self.cards_copy[action[1]] == 0:
                # no face-down card at this position
                return self._reject()

        return 1

    def set_fase(self):
        """Sets current phase: 2 while cards are left to turn, 1 once a bet exists, else 0"""
        if self.obs['toTurn'] > 0:
            self.fase = 2
        elif self.obs['currentBet'] > 0:
            self.fase = 1
        else:
            self.fase = 0

    def roses_left(self):
        """
        Counts the cards the current player has on the table

        Returns
        -------
        int
            Number of entries equal to 1 in the card tuple of the current player
        """
        return sum(1 for card in self.obs[self._me()] if card == 1)

    def play_card(self, i):
        """
        Marks slot i of the current player as played and counts the card

        The caller is responsible for storing the result in self.obs.

        Returns
        -------
        tuple
            Cards of the current player with slot i set to 1
        """
        updated = list(self.obs[self._me()])
        updated[i] = 1
        self.cards_played += 1
        return tuple(updated)

    def play_skull(self):
        """Function for action (0, y): plays the skull into the first free slot"""
        slot = self.obs[self._me()].index(0)
        self.obs[self._me()] = self.play_card(slot)
        self.obs['skull'] = slot
        self.skullLoc['skull' + str(self.current_player)] = slot

    def play_rose(self):
        """Function for action (1, y): plays a rose into the first free slot"""
        slot = self.obs[self._me()].index(0)
        self.obs[self._me()] = self.play_card(slot)

    def bet(self):
        """
        Function for action (2, y): raises bet by 1

        Betting above 6 is an illegal move.
        NOTE(review): the bet is not capped at the number of cards on the table.
        """
        if self.obs['currentBet'] >= 6:
            self.illegal_move()
        else:
            self.obs['currentBet'] += 1

    def _pass(self):
        """
        Function for action (3, y): pass

        Starts the flipping fase: the current bet becomes the number of cards to turn,
        and cards_copy lists the cards of the flipping player (0-3) followed by the
        cards of the player who passed (4-7).
        """
        self.has_passed = self.current_player
        self.obs['toTurn'] = self.obs['currentBet']
        self.cards_copy = list(self.obs[self._opponent()]) + list(self.obs[self._me()])

    def flip(self, card):
        """
        Function for action (x, y): flips card y

        Parameters
        ----------
        card : int
            0-3 flips a card of the current player, 4-7 a card of the opponent

        Returns
        -------
        int
            1 if the flipped card is a skull, 0 otherwise
        """
        own = card < 4
        owner = self.current_player if own else abs(self.current_player - 1)
        slot = card if own else card - 4

        if slot == self.skullLoc['skull' + str(owner)]:
            return 1

        self.obs['toTurn'] -= 1
        self.cards_copy[card] = 0
        remaining = self.cards_copy[:4] if own else self.cards_copy[4:]
        self.obs['player' + str(owner)] = tuple(remaining)
        return 0


In [None]:
''' 
Code block to mount notebook to drive folder
'''

# Mount Google Drive at /content/drive (`drive` comes from google.colab)
drive.mount('/content/drive')

# Log directory handed to custom_log_creator() by ppo() and a2c();
# empty placeholder - set it to a real path before training
results_dir = ""

def custom_log_creator(custom_path, custom_str):
    """
    Build a logger_creator that logs model checkpoints to specified directory

    Parameters
    ----------
    custom_path : str
        Path to log directory, created on first use if it does not exist

    custom_str : str
        Name folder; used as prefix of the run directory, followed by a timestamp

    Returns
    -------
    callable
        Function taking a config and returning a UnifiedLogger that writes to a
        freshly created sub-directory of custom_path
    """
    # the timestamp is fixed when the creator is built, not when the logger is created
    timestamp = datetime.today().strftime("%Y-%m-%d_%H-%M-%S")
    run_prefix = "{}_{}".format(custom_str, timestamp)

    def logger_creator(config):
        # exist_ok avoids the check-then-create race of a separate os.path.exists() test
        os.makedirs(custom_path, exist_ok=True)
        logdir = tempfile.mkdtemp(prefix=run_prefix, dir=custom_path)
        return UnifiedLogger(config, logdir, loggers=None)

    return logger_creator

Mounted at /content/drive


In [None]:
def ppo():
  """
  Run Proximal Policy Optimization algortihm on SkullEnv

  """
  config1 = DEFAULT_CONFIG.copy()
  config1['num_gpus'] = 0
  config1['framework'] = 'torch'
  config1['num_workers'] = 0
  config1['num_envs_per_worker'] = 1
  agent2 = PPOTrainer(config1, SkullEnv, logger_creator=custom_log_creator(os.path.expanduser(results_dir), 'PPO'))
  ##uncomment and pass checkpoint path to use
  # agent2.restore("")

  %tensorboard --logdir=""

  for i in range(50):
      for j in range(50):
        print("Training iteration " + str(i) + ", " + str(j))
        result = agent2.train()
        print(pretty_print(result))
      agent2.save()


In [None]:
def a2c():
    """
    Run Advantage Actor-Critic algorithm on SkullEnv
    """
    
  config1 = A2C_DEFAULT_CONFIG.copy()
  config1['num_gpus'] = 0
  config1['framework'] = 'torch'
  config1['num_workers'] = 0
  config1['num_envs_per_worker'] = 1
  agent1 = A2CTrainer(config1, SkullEnv, logger_creator=custom_log_creator(os.path.expanduser(results_dir), 'A2C_3.1'))
  agent1 = A2CTrainer(config1, SkullEnv)
  ##uncomment and pass checkpoint path to use
  # agent1.restore("")

  %tensorboard --logdir=""

  for i in range(30):
      for j in range(20):
        print("Training iteration " + str(i) + ", " + str(j))
        result = agent1.train()
        print(pretty_print(result))
      agent1.save()

In [None]:
def one_agent(agent1, numGames):
    """
    Play number of games in SkullEnv, print obs and result

    The same agent chooses the action for whichever player is on turn. A game is cut
    off after 21 moves if it has not finished by then.

    Parameters
    ----------
    agent1 :
        Instance of algorithm; must provide compute_single_action(obs)

    numGames : int
        Number of games to be played
    """
    for _ in range(numGames):
        # SkullEnv() instead of SkullEnv(None): the constructor needs no argument
        sk = SkullEnv()
        obs = sk.reset()
        for i in range(21):
            player = 'player' + str(sk.current_player)
            action = agent1.compute_single_action(obs[player])
            print("obs " + player + ":", obs)
            print("action " + player + ":", action)
            obs, reward, done, info = sk.step(action)
            if done['__all__']:
                break
            print('\n')
        print(reward)
        print('rounds ' + str(i))
        print('\n')


In [1]:
def two_agents(agent1, agent2, numGames):
    """
    Two agents play number of games in SkullEnv, print obs and result

    agent1 acts as player0 and agent2 as player1. A game is cut off after 21 moves
    if it has not finished by then.

    Parameters
    ----------
    agent1 :
        Instance of an algorithm

    agent2 :
        Instance of another algorithm

    numGames : int
        Number of games to be played
    """
    for j in range(numGames):
        # SkullEnv() instead of SkullEnv(None): the constructor needs no argument
        sk = SkullEnv()
        obs = sk.reset()
        transcript = ["game: " + str(j) + "\n"]
        for i in range(21):
            player = 'player' + str(sk.current_player)
            acting_agent = agent1 if sk.current_player == 0 else agent2
            action = acting_agent.compute_single_action(obs[player])
            transcript.append(str(obs) + "\n")
            transcript.append("action " + player + ": " + str(action) + "\n\n")
            obs, reward, done, info = sk.step(action)
            if done['__all__']:
                break
        transcript.append(str(reward) + "\n")
        transcript.append('rounds ' + str(i) + "\n\n")
        print("".join(transcript))
  

In [None]:
def arena_p1(agent1, path, name, len, gameLen=100):
    """
    Play number of games in SkullEnv, save statistics to .csv, save transcriptions to .txt

    The same agent chooses the action for whichever player is on turn. A game is cut
    off after 21 moves if it has not finished by then.

    Parameters
    ----------
    agent1 :
        Instance of an algorithm

    path: str
        Path to directory; file names are appended directly, so include the trailing separator

    name: str
        Custom name

    len: int
        Number of .txt files

    gameLen=100 : int
        Number of games per .txt file, default: 100
    """
    cols = ['id_f', 'id', 'start', 'winner', 'cB', 'passed', 'skull0', 'skull1', 'cards', 'ruleBreak']
    df = pd.DataFrame(columns=cols)
    row = 0
    for k in range(len):
        with open(path + name + "-" + str(k) + '.txt', 'w') as file:
            for j in range(gameLen):
                file.write("game: " + str(j) + "\n")
                # SkullEnv() instead of SkullEnv(None): the constructor needs no argument
                sk = SkullEnv()
                obs = sk.reset()
                reward = {}
                winner = 'NA'
                ruleBreak = 'NA'
                start = sk.current_player
                for i in range(21):
                    player = 'player' + str(sk.current_player)
                    action = agent1.compute_single_action(obs[player])
                    file.write(str(obs) + "\n")
                    file.write("action " + player + ": " + str(action) + "\n\n")
                    obs, reward, done, info = sk.step(action)
                    if done['__all__']:
                        break
                file.write(str(reward) + "\n")
                file.write('rounds ' + str(i) + "\n\n")

                # a reward of 1 marks the winner, -100 marks the player who broke a rule
                if reward['player0'] == 1:
                    winner = 'player0'
                if reward['player0'] == -100:
                    ruleBreak = 'player0'
                if reward['player1'] == 1:
                    winner = 'player1'
                if reward['player1'] == -100:
                    ruleBreak = 'player1'
                df.loc[row] = [k, j, start, winner, sk.obs['currentBet'], sk.has_passed, sk.skullLoc['skull0'], sk.skullLoc['skull1'], sk.cards_played, ruleBreak]
                row += 1
    df.to_csv(path + name + '.csv')

In [None]:
def arena_p2(agent1, agent2, path, name, len, gameLen=100):
    """
    Two agents play number of games in SkullEnv, save statistics to .csv, save transcriptions to .txt

    For every game a coin flip decides which agent plays as player0 (the starting
    player). agent1 is labelled 'A2C' and agent2 'PPO' in the transcript and in the
    'player0' column of the .csv file. A game is cut off after 21 moves if it has
    not finished by then.

    Parameters
    ----------
    agent1 :
        Instance of an algorithm (labelled 'A2C')

    agent2 :
        Instance of another algorithm (labelled 'PPO')

    path: str
        Path to directory; file names are appended directly, so include the trailing separator

    name: str
        Custom name

    len: int
        Number of .txt files

    gameLen : int
        Number of games per .txt file, default: 100
    """
    cols = ['id_f', 'id', 'player0', 'winner', 'cB', 'passed', 'skull0', 'skull1', 'cards', 'ruleBreak']
    df = pd.DataFrame(columns=cols)
    row = 0
    for k in range(len):
        with open(path + name + "-" + str(k) + '.txt', 'w') as file:
            for j in range(gameLen):
                file.write("game: " + str(j) + "\n")
                # SkullEnv() instead of SkullEnv(None): the constructor needs no argument
                sk = SkullEnv()
                obs = sk.reset()
                reward = {}
                winner = 'NA'
                ruleBreak = 'NA'

                # rnum is the seat of agent1; the game always starts with player0,
                # so agent1 (A2C) starts exactly when rnum == 0
                rnum = random.randint(0, 1)
                start = 'A2C' if rnum == 0 else 'PPO'
                # one label for transcript and csv, so the two can no longer disagree
                file.write("started: " + start + " \n")

                for i in range(21):
                    player = 'player' + str(sk.current_player)
                    acting_agent = agent1 if sk.current_player == rnum else agent2
                    action = acting_agent.compute_single_action(obs[player])
                    file.write(str(obs) + "\n")
                    file.write("action " + player + ": " + str(action) + "\n\n")
                    obs, reward, done, info = sk.step(action)
                    if done['__all__']:
                        break
                file.write(str(reward) + "\n")
                file.write('rounds ' + str(i) + "\n\n")

                # a reward of 1 marks the winner, -100 marks the player who broke a rule
                if reward['player0'] == 1:
                    winner = 'player0'
                if reward['player0'] == -100:
                    ruleBreak = 'player0'
                if reward['player1'] == 1:
                    winner = 'player1'
                if reward['player1'] == -100:
                    ruleBreak = 'player1'

                df.loc[row] = [k, j, start, winner, sk.obs['currentBet'], sk.has_passed, sk.skullLoc['skull0'], sk.skullLoc['skull1'], sk.cards_played, ruleBreak]
                row += 1
    df.to_csv(path + name + '.csv')
 

In [None]:
def get_stats(path, save_path=None):
    """
    Print out statistics on generated .csv file and plot them as bar charts

    Reads the columns 'player0', 'winner', 'ruleBreak', 'id_f' and 'id', i.e. the
    file written by arena_p2.

    Parameters
    ----------
    path: str
        Path to .csv file

    save_path: str, optional
        If given, the summary chart (second figure) is written to this file before
        it is shown. Default: None, nothing is saved.
    """
    df = pd.read_csv(path)
    # counters without suffix: games where A2C was player0
    A2C_won = 0
    PPO_won = 0
    A2C_rb = 0
    PPO_rb = 0
    # counters with suffix _1: games where PPO was player0
    A2C_won_1 = 0
    PPO_won_1 = 0
    A2C_rb_1 = 0
    PPO_rb_1 = 0

    for index, row in df.iterrows():
        if row['player0'] == 'A2C':
            if row['winner'] == 'player0':
                A2C_won += 1
            elif row['winner'] == 'player1':
                PPO_won += 1
            if row['ruleBreak'] == 'player0':
                A2C_rb += 1
                print("a2c: " + str(row['id_f']) + str(row['id']))
            elif row['ruleBreak'] == 'player1':
                PPO_rb += 1
        elif row['player0'] == 'PPO':
            if row['winner'] == 'player0':
                PPO_won_1 += 1
            elif row['winner'] == 'player1':
                A2C_won_1 += 1
            if row['ruleBreak'] == 'player0':
                PPO_rb_1 += 1
                print("ppo: " + str(row['id_f']) + str(row['id']))
            elif row['ruleBreak'] == 'player1':
                A2C_rb_1 += 1

    ##player0: A2C
    print('A2C_won: ' + str(A2C_won))
    print('PPO_won: ' + str(PPO_won))
    print('PPO rule break: ' + str(PPO_rb))
    print('A2C rule break: ' + str(A2C_rb))
    ##player0: PPO
    print('A2C_won_1: ' + str(A2C_won_1))
    print('PPO_won_1: ' + str(PPO_won_1))
    print('PPO rule break 1: ' + str(PPO_rb_1))
    print('A2C rule break 1: ' + str(A2C_rb_1))

    # first chart: results split by the agent that played as player0
    plotdata = pd.DataFrame({
        "A2C wins": [A2C_won, A2C_won_1],
        "PPO wins ": [PPO_won, PPO_won_1],
        "A2C rule violations": [A2C_rb, A2C_rb_1],
        "PPO rule violations": [PPO_rb, PPO_rb_1],
        }, index=["A2C", "PPO"])

    plotdata.plot(kind="bar")
    plt.title("A2C vs PPO")
    plt.subplots_adjust(bottom=0.15)
    plt.xlabel("Starting player")
    plt.ylabel("Games won")
    plt.show()

    # second chart: totals per agent
    plotdata1 = pd.DataFrame({
        "wins": [A2C_won + A2C_won_1, PPO_won + PPO_won_1],
        "Rule violations": [A2C_rb + A2C_rb_1, PPO_rb + PPO_rb_1]
        }, index=["A2C", "PPO"])
    plotdata1.plot(kind="bar")
    plt.title("A2C vs PPO")
    plt.subplots_adjust(bottom=0.15)
    plt.ylabel("Games won")
    # save before show(): once the figure has been shown there is nothing left to save
    if save_path:
        plt.savefig(save_path)
    plt.show()

In [None]:
''' 
Load TensorBoard interface
Provide path to event files
'''

# Load the TensorBoard notebook extension
%load_ext tensorboard
# Start TensorBoard; --logdir is an empty placeholder, fill in the path to the event files
%tensorboard --logdir=""

In [None]:
''' 
Call functions here
'''
def main():
    """
    Entry point of the notebook

    Does nothing by default: uncomment the calls below and pass their parameters
    to train agents, let them play, or print statistics.
    """
    ## uncomment and pass parameters to use
    # ppo()
    # a2c()
    # one_agent()
    # two_agents()
    # arena_p1()
    # arena_p2()
    # get_stats()

if __name__ == "__main__":
    main()
