Third iteration of the project: building a training setup based on temporal difference learning.

Also renaming "player" to "agent", because that is the conventional term.

This relies heavily on the following repository:
https://github.com/ltbringer/tic_tac_toe

In [1]:
import numpy as np

## Main Functions

In [277]:
def main(number_of_trainings=1000000, number_of_dicerolls=5, strategy="TD"):
    """
    Creates an agent and runs the training loop on it.

    Parameters
    ----------
    number_of_trainings : int
        How many games are played during training.
    number_of_dicerolls : int
        The type of dice-rolling game being played, passed on to ``train``.
    strategy : str
        Decision strategy handed to the ``Agent`` constructor.

    Calling ``main()`` without arguments reproduces the original hard-coded
    configuration (1,000,000 games, 5 dice rolls, "TD" strategy).
    """
    agent = Agent(strategy=strategy)
    train(agent, number_of_trainings, number_of_dicerolls)

In [201]:
def train(agent, number_of_trainings, number_of_dicerolls):
    """
    Plays ``number_of_trainings`` games with the agent, then prints what it learned.

    For every game:
        1. a fresh ``Game(number_of_dicerolls)`` is created,
        2. the agent is asked for a move until the game reports it is over,
        3. the die showing at the end is converted to a reward and handed to
           ``agent.on_reward``.

    After training, one line is printed per state in ``agent.states`` (sorted by key):
    the key, the stored 1 x 2 value array, and whether the second entry is larger
    than the first.
    """
    for _ in range(number_of_trainings):
        game = Game(number_of_dicerolls)

        while not game.over:
            # first, the agent looks at the state of the game and makes a decision
            player_move = agent.select_move(game)

            # then, we insert that decision into the game
            game.play_one_round(player_move)

        # once the game is over, optimize the agent based on the final die roll
        reward = game.die_roll_to_reward(game.current_die_roll)
        agent.on_reward(reward)

    for key in sorted(agent.states):
        values = agent.states[key]
        print(key, values, values[0, 0] < values[0, 1])

## Game Class

In [4]:
import numpy as np

class Game:
    """
    Single-player dice game.

    The game opens with one die roll. Each round the player either stops
    (the game ends with the current die) or continues (a new die is rolled,
    replacing the current one). The game also ends when no rounds remain.
    """

    # reward assigned to each face of the die, spanning from -1 to +1
    _REWARD_BY_DIE_ROLL = {1: -1, 2: -0.6, 3: -0.2, 4: 0.2, 5: 0.6, 6: 1}

    def __init__(self, number_of_dicerolls):
        """
        number_of_dicerolls: total number of dice that can be rolled in one game,
        including the opening roll.
        """
        self.number_of_dicerolls = number_of_dicerolls

        self.current_player_decision = True
        self.current_round = 1

        # initiate game with one opening roll
        self.current_die_roll = self.die_roll()

        # a game whose only roll is the opening one leaves nothing to decide,
        # so it has to start out as already over
        self.over = self.is_game_over()

    def die_roll(self, side = 6):
        """
        Returns a random integer i, 1 <= i <= side
        uniform distribution
        side is 6 by default, so it simulates 1d6
        """
        return np.random.randint(1, side + 1)

    def is_game_over(self):
        """
        Checks if the game is over, which can happen two ways:
        - either we rolled the last dice, and there is no more player decision
        - or the latest player decision was False
        """
        return self.remaining_rounds() <= 0 or not self.current_player_decision

    def advance_one_round(self):
        """Moves the round counter forward by one."""
        self.current_round += 1

    def remaining_rounds(self):
        """Returns how many more dice can still be rolled after the current one."""
        return self.number_of_dicerolls - self.current_round

    def play_one_round(self, player_move):
        """
        Applies the player move to the game.

        player_move is coded as [0, 1] = True (continue), [1, 0] = False (stop).
        if True, advances the round and rolls another die
        if False, keeps the current die
        Afterwards, updates self.over.
        """
        # anything that is not exactly [1, 0] counts as "continue"
        self.current_player_decision = not np.array_equal(player_move, np.array([1, 0]))

        if self.current_player_decision:
            self.advance_one_round()
            self.current_die_roll = self.die_roll()

        # also checks if game is over at this point
        self.over = self.is_game_over()

    def die_roll_to_reward(self, die_roll_value):
        """
        Recodes results to rewards spanning from -1 to +1

        Raises ValueError if die_roll_value is not one of 1..6, since no reward
        is defined for it.
        """
        try:
            return self._REWARD_BY_DIE_ROLL[die_roll_value]
        except (KeyError, TypeError):
            raise ValueError(
                f"no reward defined for die roll value {die_roll_value!r}"
            ) from None

## Agent Class

In [275]:
import numpy as np

class Agent:
    """
    Agent playing the game.
    Keeps track of the prior experiments, makes a decision which can be exploration or exploitation.

    Decisions are booleans (True = keep rolling, False = stop) and are encoded
    as actions: True -> [0, 1], False -> [1, 0].
    """

    def __init__(
        self, exploration_rate=1, decay=0.01, learning_rate=0.005, discount_factor=1, min_exploration_rate = 0.1,
        strategy = "random"
    ):
        """
        exploration_rate: probability of exploring instead of exploiting ("TD" strategy)
        decay: amount subtracted from exploration_rate after every rewarded game
        learning_rate: step size of the value update
        discount_factor: multiplier applied to the reward for each step walked back
        min_exploration_rate: lower bound of exploration_rate
        strategy: one of "external_strategy", "random", "devmode", "TD"
        """
        self.states = {}
        # dictionary of states, each state key is built from two integer:
        # (current_die_value, remaining_dice_rolls)
        # handled by staticmethod serialize_game
        # each value is a 1 x 2 array: [value of False, value of True]

        self.state_order = []
        # order of the states agent went through, empty the first time an agent is created
        # will be emptied after each game too, when we handle the rewards

        self.learning_rate = learning_rate
        self.decay = decay
        self.discount_factor = discount_factor
        self.exploration_rate = exploration_rate
        self.min_exploration_rate = min_exploration_rate

        self.strategy = strategy

    def select_move(self, game):
        """
        Takes in state of the game, which consists of two integers:
            - current_die_roll: the value showing on current die
            - remaining_rolls: number of potential dice rolls after the current one
        If player decides FALSE, the game stops, and the current_die_roll is the outcome
        If player decides TRUE, the game continues with new die roll and -1 remaining_rolls

        Returns the action as a numpy array ([0, 1] for True, [1, 0] for False)
        and records the (state, action) pair for the later reward update.

        Raises ValueError if self.strategy is not a known strategy.
        """
        if self.strategy == "external_strategy":
            # fixed rule: stop only on a 5 or a 6
            decision = not (game.current_die_roll > 4)
        elif self.strategy == "random":
            decision = np.random.choice([True, False])
        elif self.strategy == "devmode":
            # testing out different functions
            # right now: will it pick up the 4-2 optimal move is True, if it plays correctly in last round
            if game.remaining_rounds() == 1:
                decision = not (game.current_die_roll > 4)
            else:
                decision = np.random.choice([True, False])
        elif self.strategy == "TD":
            p = np.random.uniform(0, 1)
            if p < self.exploration_rate:
                decision = self.explore()
            else:
                decision = self.exploit(game)
        else:
            # previously an unknown strategy left `decision` unassigned and
            # crashed further down with an unrelated UnboundLocalError
            raise ValueError(f"unknown strategy: {self.strategy!r}")

        # action is True or False, coded as:
        # True: [0, 1]
        # False: [1, 0]
        action = np.array([0, 1]) if decision else np.array([1, 0])

        # stores the state - action pair the player made for this step
        self.set_state(game, action)

        return action

    def set_state(self, game, action):
        """
        store the action performed in a given state of the game
        """
        state_key = Agent.serialize_game(game)
        self.state_order.append((state_key, action))

    def on_reward(self, reward_value):
        """
        Reward is the die roll that we had at the end of the game.
        The higher the better.

        Walks back through the (state, action) pairs of the finished game, most
        recent first, updates the stored value of each, and empties the list.
        Finally lowers the exploration rate by `decay`, but never below
        `min_exploration_rate`.
        """
        while self.state_order:
            # get the state and action pairs, latest first
            state_key, action = self.state_order.pop()

            # old state is either whatever was in that state previously, or an empty 1*2 matrix
            old_reward = self.states.get(state_key, np.zeros((1, 2)))

            # reward increment and old_reward are a 1 by 2 np array,
            # first element: the value for cases when the chosen action was False
            # second: the value for cases when action was True
            reward_increment = self.learn_by_temporal_difference(old_reward, reward_value, state_key, action)

            self.states[state_key] = old_reward + reward_increment

            # scale the reward by the discount factor after each step
            reward_value *= self.discount_factor

        # we decrease future exploration rate in this step
        self.exploration_rate = max(self.exploration_rate - self.decay, self.min_exploration_rate)

    def learn_by_temporal_difference(self, old_reward, reward_value, state_key, action):
        """
        Returns the 1 x 2 increment to add to the stored values of a state.

        reward value * action: makes a 1 x 2 matrix from the reward value
        minus old_reward: subtract the old saved rewards
        * action again: we are only interested in the part of the matrix that was
        impacted by the action; everything else is zeroed out, because we have no
        new information regarding the other state - action pair
        """
        return self.learning_rate * (((reward_value * action) - old_reward) * action)

    def explore(self):
        """
        Randomly explores the possible actions, without considering the history.
        In our case, this is a random True or False.
        """
        return np.random.choice([True, False])

    def exploit(self, game):
        """
        Checks if we have any prior information regarding this particular state.
        If not, reverts back to exploration.
        If yes, picks the action with the highest stored value assigned to it
        (a tie resolves to True).
        """
        state_key = Agent.serialize_game(game)
        if state_key not in self.states:
            # even if we wanted to exploit, there is nothing to move on, revert to explore
            return self.explore()

        historical_reward = self.states[state_key]
        # TODO find a way to make this more robust
        return not (historical_reward[0, 0] > historical_reward[0, 1])

    @staticmethod
    def serialize_game(game):
        """
        For now, it is very simple, game state is defined by two integers:
        current_die_roll, remaining rounds
        Returns these in a string with a "-" in the middle
        current_die_roll = 3 and remaining_rounds = 12 returns "3-12"
        """
        current_die_roll = game.current_die_roll
        remaining_rounds = game.remaining_rounds()
        return str(current_die_roll) + "-" + str(remaining_rounds)

    def log_analysis(self, state_key, action, reward_value, old_reward, reward_increment):
        """
        Created to track how one key gets the final value.
        Expects self.states[state_key] to be already updated when called.
        """
        print(f"{state_key} key was encountered in this round")
        print(f"action chosen: {action}")
        print(f"reward to give to all states was : {reward_value}")
        print(f"old cumulative reward was: {old_reward}")
        print(f"reward is going to be increased by: {reward_increment}")
        print(f"so now the cumulative reward is: {self.states[state_key]}")
        print("")

        if (state_key[-1] == "2") or (action[1] == 1):
            print("--------------------------")
            print("")
        

In [278]:
# runs the training with the configuration set inside main(); the table printed below is its output
main()

1-1 [[-0.99997397 -0.01910152]] True
1-2 [[-0.99999998  0.25753547]] True
1-3 [[-1.          0.40410508]] True
1-4 [[-1.          0.53263981]] True
2-1 [[-0.59999153 -0.01650078]] True
2-2 [[-0.59999999  0.2838495 ]] True
2-3 [[-0.6         0.38229775]] True
2-4 [[-0.6         0.49330816]] True
3-1 [[-0.19999737 -0.00587907]] True
3-2 [[-0.19999999  0.22689457]] True
3-3 [[-0.2         0.37763388]] True
3-4 [[-0.2        0.4962943]] True
4-1 [[0.2        0.03144677]] False
4-2 [[0.2        0.23823287]] True
4-3 [[0.2        0.47342095]] True
4-4 [[0.2       0.5217909]] True
5-1 [[0.6        0.02504011]] False
5-2 [[0.6        0.23787624]] False
5-3 [[0.6        0.42414841]] False
5-4 [[0.6        0.49288091]] False
6-1 [[ 1.         -0.02222511]] False
6-2 [[1.         0.28468875]] False
6-3 [[1.         0.37943137]] False
6-4 [[1.        0.5282986]] False


In [251]:
# a minimum exploration rate of 0.2 means that with probability 0.2 the agent keeps
# moving randomly and stays at an expected value of 3.5; the remaining 0.8 is weighted with 4.25
0.2 * 3.5 + 0.8 * 4.25

4.1000000000000005

In [None]:
# still more than 4, so it should converge