In [1]:
import numpy as np
import random
import torch
from torch import nn
import copy

In [2]:
## Game Engine

class Mancala:
    def __init__(self, disp = False, seed = 0):
        import random
        # set seed 
        random.seed(seed)
        
        # Initialize Mancala game board
        self.board = [[4]*6 for i in range(2)]
        
        # Choose which player goes first (probably change to random initialization)
        self.turn = random.randint(0, 1)    # 0 for TOP player
                                            # 1 for BOTTOM player
        
        # Game End indicator
        self.game_end = False
        
        # Should prints be displayed?
        self.disp = disp
        
    
    ## def output
    def __str__(self):
        return str(self.board[0]) + '\n' + str(self.board[1]) +  'End = ' +  str(self.game_end)
        
    def seed(self, seed=0):
        random.seed(seed)
        
    def valid_action(self, action, turn):
        return self.board[turn][abs(5*(1 - turn) - action%6)] != 0
    
    ## act on the game
    def action(self, action):
        # Action meanings:
        # 0-5:  distribute beans from bowl 0-5 WITHOUT placing one in the Score
        # 6-11: distribute beans from bowl 0-5 AND place one in the Score
        # 12: End game
        
        if action == 12:
            self.game_end = True
            return 0
        
        # interpret input
        bowl = action % 6      # take beans from bowl
        toScore = action // 6  # 0 -> skip Score, 1 -> do not skip Score
        
        # position
        x = bowl if self.turn else 5 - bowl
        y = self.turn
        
        # Init reward
        reward = 0
        
        # How many beans are in chosen bowl
        beans = self.board[y][x]
        
        # Who will be next?
        next_turn = 1 - self.turn
        
        # If chosen bowl is empty
        if beans == 0:
            if self.disp: print("Error! You chose an empty bowl! -10 Points Penalty")
            reward -= 2
            next_turn = 1 - next_turn
            
        # Take beans out of bowl
        self.board[y][x] = 0
        
        # Beans are distributed counterclock-wise
        direction = 2*self.turn - 1   # +1 -> right, -1 -> left
        
        # While there are beans left, distribute them counterclock-wise
        while beans != 0:
            
            # If beans < 0 then there has been some error
            if beans < 0:
                if self.disp: print("Error nr of beans negative!")
                
            # move to next bowl
            x += direction  
            
            # if end is reached check if toScore is true and continue on other side
            if x < 0 or x > 5:
                # if toScore is true, put one bean in score (only if it is the correct Score)
                if toScore and y == self.turn:
                    reward += 1
                    beans -= 1
                    
                    # if this was the last bean the same player will play again
                    if beans == 0:
                        next_turn = self.turn
                        break
                # change row and direction
                y = 1 - y
                direction = -direction
                x += direction
            
            # if this was the last bean, check opposite bowl
            if beans == 1 and self.board[y][x] == 0 and y == self.turn:
                reward += 1 + self.board[1 - y][x]
                self.board[1 - y][x] = 0
            else:
                self.board[y][x] += 1
            beans -= 1
            
        # set next player
        self.turn = next_turn
        
        return reward
       
    def end_check(self):
        if self.game_end or self.board[self.turn] == [0]*6:
            if self.disp: print(f'No more moves for {"TOP" if self.turn else "BOTTOM"}!')
            self.__str__()
            self.game_end = True
            return True
        else:
            return False
    
    def final_reward(self):
        if not self.game_end:
            if self.disp: print("The game is not over?!")
            return 0
        
        if self.disp: print(f'No more moves for {"TOP" if self.turn else "BOTTOM"} Player! \nGame End\n ==============================')
        
        return sum(self.board[1 - self.turn])
        
            
    def copy(self):
        cop = Mancala()
        cop.board = np.array(self.board)
        cop.game_end = bool(self.game_end)
        cop.turn = int(self.turn)
        return cop
        
    def reset(self, disp = False):
        # Initialize Mancala game board
        self.board = [[4]*6 for i in range(2)]
        
        # Choose which player goes first (probably change to random initialization)
        self.turn = random.randint(0, 1)    # 0 for TOP player
                                         # 1 for BOTTOM player
            
        # Game End indicator
        self.game_end = False
        
        # Should prints be displayed?
        self.disp = disp
        
    def flip(self):
        self.board = [self.board[1][::-1], self.board[0][::-1]]

In [3]:
## For Deep-Q-Learning we need a replay memory
# This memory is for the Top player!
class Memory:
    def __init__(self, maxlen = 1e5):
        import random
        import copy
        self.size = 0
        self.memory = []
        print("Memory Initialized")
        self.maxlen = int(maxlen)
        self.current_pos = int(0)
        
    
    def __getitem__(self, idx=-1):
        if idx < 0 or idx >= self.size:
            print(f'Index {idx} is too large for memory of length {self.size}. \nInstead return random entry')
            return self.memory[random.randint(0, self.size - 1)]
        else:
            return self.memory[idx]
    
    def draw(self):
        if self.size: # self.size should not be 0
            return self.__getitem__(random.randint(0, self.size - 1))
        else:
            print(f'Memory {turn} is not yet filled')
            return []
        
    def draw_batch(self, batch_size):
        batch_size = min(batch_size, self.size)
        return [self.draw() for i in range(batch_size)]
    
    def add(self, previousState, action, reward, state, game_end):
        if self.size < self.maxlen:
            # relevant quantities: state0, action, reward, state1, game_end
            # we need deepcopy the nested lists,
            # because otherwise we only reference the list 
            # which will change later!
            self.memory.append([copy.deepcopy(previousState), 
                                action, 
                                reward, 
                                copy.deepcopy(state),
                                game_end])
            self.size += 1
            self.current_pos += 1
        else:
            self.current_pos = int(self.current_pos % self.maxlen)
            self.memory[self.current_pos] = ([copy.deepcopy(previousState), 
                                              action, 
                                              reward, 
                                              copy.deepcopy(state),
                                              game_end])
            self.current_pos += 1
        
        
    def add_random_game(self):
        # Initialize game
        game = Mancala(False)  
        
        # Start game Loop
        while not game.game_end:
            
            # state t
            game0 = game.copy()
            
            # random action
            ac = random.randint(0, 11)
            
            while game.board[game.turn][abs(5*(1 - game.turn) - ac%6)] == 0:
                #print(game.turn, game.board)
                ac = random.randint(0, 11)
            reward = game.action(ac)
            
            # save state(t), action, state(t+1)
            self.add(game0.turn, game0, ac, reward, game.copy())  
            
    def __str__(self):
        return f'Size: {self.size} \nFirst Entry Player 0:\n {self.memory[0][0]} \n... \nLast Entry Player 1:\n {self.memory[1][self.size[1] - 1]}'

In [17]:
class Player():
    
    def __init__(self, pos, name):
        
        import numpy as np
        import random
        
        self.score = 0
        self.reward = 0
        self.name = name
        
        self.pos = pos
        self.position = ["TOP", "BOTTOM"][pos]
        
        self.previousState = [[4]*6, [4]*6]
        self.action = -1
        
        self.memory = Memory()
        
    def reset(self):
        self.score = 0
        self.reward = 0
        
        self.previousState = [[4]*6, [4]*6]
        self.action = -1
        
        
        
class RandomPlayer(Player):
    def __init__(self, pos=1, name="Random"):
        Player.__init__(self, pos, name)
        self.memory.maxlen = 2
    
    def think(self, game):
        action = random.randint(0, 11)
        while not game.valid_action(action, game.turn):
            action = random.randint(0, 11)
        self.action = action
        
        return action
        
class GreedyPlayer(Player):
    def __init__(self, pos=1, name="Greedy"):
        Player.__init__(self, pos, name)
        self.reward_list = np.zeros(12, dtype=np.int64)
        self.memory.maxlen = 2
    
    def think(self, game):
        reward_list = np.zeros(12, dtype=np.int64)
        #if 0 not in game.board[self.pos]:
        #    if 12 not in game.board[self.pos] and 13 not in game.board[self.pos]:
        #        return random.randint(0, 11)
                
        if True: #0
            for idx, beans in enumerate(game.board[self.pos][::(game.turn*2-1)]):
                # If there are no beans, skip
                if beans == 0:
                    continue
                    
                # if beans reach 'score', and action >= 6, put one in 'score'
                if beans + idx >= 5:
                    reward_list[idx + 6] += 1
                    if beans + idx > 17:
                        reward_list[idx + 6] += 1
                
                # if beans 13 it cannot land in an empty bowl
                if beans > 13:
                    continue
                # if beans=13 then action idx+6 will be good
                elif beans == 13:
                    reward_list[idx + 6] += 2 + game.board[1 - self.pos][abs(5*(1-game.turn)-idx)]
                # if beans=12 then action idx will be good
                elif beans == 12:
                    reward_list[idx] += 2 + game.board[1 - self.pos][abs(5*(1-game.turn)-idx)]
                # if beans do not cross 'score'
                elif beans + idx <= 5:
                    if game.board[self.pos][abs(5*(1-game.turn)-idx-beans)] == 0:
                        reward_list[idx] += 1 + game.board[1 - self.pos][abs(5*(1-game.turn)-idx-beans)]
                        reward_list[idx + 6] += 1 + game.board[1 - self.pos][abs(5*(1-game.turn)-idx-beans)]
                # if beans cross 'score' of both players
                elif beans + idx == 12:
                    if game.board[self.pos][abs(5*game.turn)] == 0:
                        reward_list[idx] += 2 + game.board[1 - self.pos][abs(5*game.turn)]
                elif beans + idx > 12:
                    if game.board[self.pos][abs(5*(1-game.turn)-(beans + idx - 12))] == 0:
                        reward_list[idx] += 2 + game.board[1 - self.pos][abs(5*(1-game.turn)-(beans + idx - 12))]
                    if game.board[self.pos][abs(5*(1-game.turn)-(beans + idx - 13))] == 0:
                        reward_list[idx + 6] += 2 + game.board[1 - self.pos][abs(5*(1-game.turn)-(beans + idx - 13))]
            
        else:
            for idx, beans in enumerate(game.board[self.pos]):
                if beans == 0:
                    continue
                
                if beans > idx:
                    reward_list[idx + 6] += 1
                    if beans > 12 + idx:
                        reward_list[idx + 6] += 1
                        
                if beans > 13:
                    continue
                elif beans == 13:
                    reward_list[idx + 6] = 2 + game.board[1 - self.pos][idx]
                elif beans == 12:
                    reward_list[idx] = 2 + game.board[1 - self.pos][idx]
                elif beans + idx <= 5:
                    if game.board[self.pos][idx - beans] == 0:
                        reward_list[idx] = 1 + game.board[1 - self.pos][idx - beans]
                        reward_list[idx + 6] = 1 + game.board[1 - self.pos][idx - beans]
                elif beans + idx == 12:
                    if game.board[self.pos][beans + (5-idx) - 12] == 0:
                        reward_list[idx] = 2 + game.board[1 - self.pos][beans - idx - 7]
                elif beans+ idx > 12:
                    if game.board[self.pos][beans + (5-idx) - 12] == 0:
                        reward_list[idx] = 2 + game.board[1 - self.pos][beans - idx - 7]
                    if game.board[self.pos][beans + (5-idx) - 13] == 0:
                        reward_list[idx + 6] = 2 + game.board[1 - self.pos][beans - idx - 8]
                        
        #print(reward_list)
        action = np.random.choice(np.where(reward_list == np.max(reward_list))[0])
        while not game.valid_action(action, game.turn):
            #print(reward_list, "invalid action", action)
            reward_list[action] -= 1000
            action = np.random.choice(np.where(reward_list == np.max(reward_list))[0])
        #print(action)
        return action
    

class HumanPlayer(Player):
    def __init__(self, pos=1, name='Human'):
        Player.__init__(self, pos, name)
        self.memory.maxlen = 2
        
    def think(self, game):
        #print(game)
        
        inp = int(input(f'It is {self.name}s turn. Whats your next move? (input 0-11, end: 12)'))
        if inp in range(13):
            action = inp
        else:
            print(f'{inp} is an invalid input! Valid inputs are numbers 0-11 for actions and 12 to end the game.')
            action = self.think(game)
            
        self.action = action
        
        return action
        
class DQN(nn.Module):

    def __init__(self, state_space_dim, action_space_dim):
        from torch import nn
        
        super().__init__()

        self.linear = nn.Sequential(
                nn.Linear(state_space_dim, 256),
                nn.ReLU(),
                nn.Linear(256, 128),
                nn.ReLU(),
                nn.Linear(128, 128),
                nn.ReLU(),
                nn.Linear(128, action_space_dim)
                )

    def forward(self, x):
        return self.linear(x)
    
        
class DQNPlayer(Player):
    def __init__(self, 
                 gamma=0.97, 
                 learning_rate = 1e-2,
                 batch_size = 256,
                 target_net_update_steps = 1000,
                 replay_memory_capacity = 1e5, 
                 epsilon_start = 100,
                 epsilon_stop = 3e3,
                 epsilon_min = 0.05,
                 weight_decay = 1e-5,
                 policy = 'softmax',
                 pos=0, 
                 name='AI'):
        import torch
        import random
        import numpy as np
        from torch import nn
        import math
        
        Player.__init__(self, pos, name)
        
        ### Set Memory length
        self.memory.maxlen = replay_memory_capacity
        
        ### Initialize the policy network
        self.net = DQN(state_space_dim=12, action_space_dim=12)
        
        ### Initialize the target network with the same weights of the policy network
        self.target_net = DQN(state_space_dim=12, action_space_dim=12)
        self.target_net_update_steps = target_net_update_steps
        
        ### Initialize the optimizer
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=learning_rate, weight_decay=1e-4) # The optimizer will update ONLY the parameters of the policy network
        
        ### Initialize the loss function (Huber loss)
        self.loss_fn = nn.SmoothL1Loss()
        self.loss = 0
        
        self.idx = 0
        
        self.final = False
        
        self.policy = policy
        
        if False:
            torch.manual_seed(0)
            np.random.seed(0)
            random.seed(0)
        
        self.gamma = gamma
        self.memory.maxlen = replay_memory_capacity
        
        self.learning_rate = learning_rate
        self.batch_size = batch_size
        
        self.epsilon_start = epsilon_start
        self.epsilon_stop = epsilon_stop
        self.epsilon_min = epsilon_min
        
    def epsilon(self):
        return max(self.epsilon_min, 
                   1 if self.idx < self.epsilon_start 
                   else 1 - (1 - self.epsilon_min) * (self.idx - self.epsilon_start)  / (self.epsilon_stop - self.epsilon_start))
        
    def update_step(self):
        
        if self.idx < self.epsilon_start:
            self.idx += 1
            pass
        
        #replay_mem = self.memory
        #policy_net = self.net
        #target_net = self.target_net
        #gamma = self.gamma
        #optimizer = self.optimizer
        #loss_fn = self.loss_fn
        #batch_size = self.batch_size
        
        # Sample the data from the replay memory
        batch = self.memory.draw_batch(self.batch_size)
        batch_size = len(batch)

        # Create tensors for each element of the batch
        states      = torch.tensor([s[0][0] + s[0][1] for s in batch], dtype=torch.float32)
        actions     = torch.tensor([s[1] for s in batch], dtype=torch.int64)
        rewards     = torch.tensor([s[2] for s in batch], dtype=torch.int64)
        next_states = torch.tensor([s[3][0] + s[3][1] if not s[4] else [0]*12 for s in batch], dtype=torch.float32)
        game_end    = torch.tensor([s[4] for s in batch], dtype=torch.bool)


        # Compute a mask of non-final states (all the elements where the next state is not None)
        non_final_next_states = torch.tensor([s[2] for s in batch if s[2] is not None], dtype=torch.int64) # the next state can be None if the game has ended
        non_final_mask = torch.tensor([s[2] is not None for s in batch], dtype=torch.bool)


        # Compute all the Q values (forward pass)
        self.net.train()
        q_values = self.net(states)
        # Select the proper Q value for the corresponding action taken Q(s_t, a)
        state_action_values = q_values.gather(1, actions.unsqueeze(1))

        # Compute the value function of the next states using the target network V(s_{t+1}) = max_a( Q_target(s_{t+1}, a)) )
        with torch.no_grad():
            self.target_net.eval()
            q_values_target = self.target_net(next_states)
        next_state_max_q_values = torch.zeros(batch_size)
        next_state_max_q_values[game_end] = q_values_target.max(dim=1)[0][game_end]

        # Compute the expected Q values
        expected_state_action_values = rewards + (next_state_max_q_values * self.gamma)
        expected_state_action_values = expected_state_action_values.unsqueeze(1) # Set the required tensor shape

        # Compute the Huber loss
        self.loss = self.loss_fn(state_action_values, expected_state_action_values)

        # Optimize the model
        self.optimizer.zero_grad()
        self.loss.backward()
        # Apply gradient clipping (clip all the gradients greater than 2 for training stability)
        nn.utils.clip_grad_norm_(self.net.parameters(), 2)
        self.optimizer.step()
        
        self.idx += 1
        
        # Update the target network every target_net_update_steps episodes
        if self.idx == self.epsilon_start + 100 or self.idx % self.target_net_update_steps == 0:
            print('Updating target network...')
            self.target_net.load_state_dict(self.net.state_dict()) # This will copy the weights of the policy network to the target network
    
    def think(self, game, invalid = []):
        if self.policy == 'softmax':
            return self.think_softmax(game, invalid)
        else:
            return self.think_epsilon(game, invalid)
    
    def think_epsilon(self, game, invalid = []):
        
        if not self.final and random.random() < self.epsilon():
            action = random.randint(0, 11)
            while action in invalid:
                action = random.randint(0, 11)
            
        else:
            with torch.no_grad():
                self.net.eval()
                net_out = self.net(torch.tensor([game.board[0] + game.board[1]], dtype=torch.float32))
                action = int(torch.argmax(net_out))
                
            
            while action in invalid:
                #print(reward_list, "invalid action", action)
                net_out[0][action] -= 1000
                action = int(torch.argmax(net_out))
        
        
        return int(action)
        
    def think_softmax(self, game, invalid=[]):  #ignore softmax temperature option
        
        # ignore softmax temperature option
        # instead use epsilon and softmax combined.
        # Lets see if it works.
        
        if not self.final and random.random() < self.epsilon():
            action = random.randint(0, 11)
            while action in invalid:
                action = random.randint(0, 11)
        
        with torch.no_grad():
            self.net.eval()
            net_out = self.net(torch.tensor([game.board[0] + game.board[1]], dtype=torch.float32))
            net_softmax = torch.softmax(net_out, 1, dtype=torch.float64)[0]
            net_softmax /= net_softmax.sum()
            if self.final:
                action = int(torch.argmax(net_softmax))
            else:
                action = int(np.random.choice(list(range(12)), p=net_softmax))
            
        while action in invalid:
            net_out[0][action] -= 1000
            net_softmax = torch.softmax(net_out, 1, dtype=torch.float64)[0]
            net_softmax /= net_softmax.sum()
            if self.final:
                action = int(torch.argmax(net_softmax))
            else:
                action = int(np.random.choice(list(range(12)), p=net_softmax))
        
        return int(action)
            
    
        
        

In [5]:
 class Arena():
    def __init__(self, game, aPlayer, bPlayer, disp = False):
        
        import random
        import numpy as np
        
        self.game = game
        
        if aPlayer.pos == bPlayer.pos:
            print("Both Players have the same position!")
            self.player = [aPlayer, bPlayer]
            self.game.game_end = True
        else:
            self.player = [aPlayer, bPlayer][::bPlayer.pos - aPlayer.pos]
        
        
        self.disp = disp
        
        if self.disp:
            print("Game initialized!")
            print(self.__str__())
            
    def __str__(self):
        out = f'Top Player:    {self.player[0].name}. Score: {self.player[0].score}\n'
        out += f'Bottom Player: {self.player[1].name}. Score: {self.player[1].score}\n'
        out += str(self.game)
        out += f'\nNext Player: {[self.player[0].name, self.player[1].name][self.game.turn]}'
        return out if self.disp else ''
        
    
    def reset(self):
        self.game.reset()
        for p in self.player:
            p.reset()
        
        
        
    def play(self):
        
        # Whose turn is it?
        player = self.player[self.game.turn]
        
        # Put what happened since the last move into Player's Memory
        if player.action != -1:
            player.memory.add(player.previousState, 
                              player.action, 
                              player.reward, 
                              self.game.board,
                              self.game.game_end)
        # Reset reward
        player.reward = 0
        
        # Save current board for future memory
        player.previousState = self.game.board
        
        # Let player choose action
        player.action = player.think(self.game)
        
        # Check if this was a valid move
        invalid = []
        while not self.game.valid_action(player.action, player.pos):
            # print('AI chose invalid action')
            # Save this wrong move with a penalty of -10!
            player.memory.add(self.game.board,
                             player.action,
                             -10,
                             self.game.board,
                             False)
            invalid.append(player.action)
            player.action = player.think(self.game, invalid)
        
        
        # If AI, print net output
        if self.disp and player.name == 'AI':
            print(list(player.net(torch.tensor([self.game.board[0] + self.game.board[0]], dtype=torch.float32))))
        
        # Display choice
        if self.disp: print(f'{player.name} chooses {player.action}')
        
            
        # Perform action and get reward from Game
        reward = self.game.action(player.action)
        
        # Display reward
        if self.disp: print(f'Reward: {reward}')
        
        # player gets positive reward
        player.reward += reward
        player.score += reward
        
        # opposite player gets negative reward
        self.player[1 - player.pos].reward -= reward
        
        # check if game is over: 
        if self.game.end_check():
            reward = self.game.final_reward()
            self.player[self.game.turn].reward -= reward
            self.player[1 - self.game.turn].reward += reward
            self.player[1 - self.game.turn].score += reward
            
            self.player[self.game.turn].memory.add(self.player[self.game.turn].previousState,
                                                   self.player[self.game.turn].action,
                                                   self.player[self.game.turn].reward,
                                                   None,
                                                   self.game.game_end)
            
            self.player[1 - self.game.turn].memory.add(self.player[1 - self.game.turn].previousState,
                                                   self.player[1 - self.game.turn].action,
                                                   self.player[1 - self.game.turn].reward,
                                                   None,
                                                   self.game.game_end)
            if self.disp: print('Game Over')
            
        else:
            if self.disp: print(self.__str__())
            self.play()
            
    def test(self, player0, player1, num_games=500, disp=False):
        testarena = Arena(self.game, player0, player1, disp=disp)
        
        player0.final = True
        player1.final = True
        
        res = [[],[]]
        
        ## Play some games
        for i in range(num_games):
            testarena.reset()
            testarena.play()
            res[0].append(player0.score)
            res[1].append(player1.score)
            
        print(f'Average Score: Top {player0.name}: {sum(res[0])/len(res[0])} - Bottom {player1.name}: {sum(res[1])/len(res[1])}')
        
        
        
        player0.final=False
        player1.final=False
    