## utils

---

> Internship neural networks
>
> Group 4: Reinforcement learning
>
> Deadline 28.02.23 23:59

---

In [7]:
import numpy as np
import matplotlib.pyplot as plt
from collections import namedtuple

# Create replay buffer memory

This is the prioritized experience replay (PER) buffer.
One transition is a tuple (s, a, r, s').

- Samples transitions according to their priority
- Automatically replaces the lowest-priority transition once the buffer is full
- Sample batch_size many uncorrelated and not repeated transitions from the memory

In [8]:
# One stored experience: (s, a, r, s').
Transition = namedtuple('Transition',
                        ('state', 'action', 'reward', 'next_state'))


class ReplayMemory(object):
    '''
    The prioritized replay buffer of an agent. The replay buffer enables uncorrelated training of the
    dqn and helps therefore to stabilize the training and avoid catastrophic forgetting.

    Every stored transition has one priority; transitions are sampled with a
    probability proportional to ``priority ** alpha``.

    capacity (int): the maximum number of transitions that fit into the replay buffer
    alpha (float): how strongly the prioritization affects the sampling
                   (0 makes every transition equally likely)
    '''

    def __init__(self, capacity, alpha=0.8) -> None:
        self.memory = []
        self.priorities = np.array([])
        # Uniform placeholder; overwritten by every call to sample().
        self.prioritized_sampling_prob = np.ones(capacity)/capacity
        self.alpha = alpha
        self.capacity = capacity

    def push(self, *args) -> None:
        """
        Pushes a transition into the replay buffer

        The new transition receives the median of the current priorities
        (1.0 if the buffer is empty). Once the buffer holds ``capacity``
        transitions, the new one overwrites the entry with the lowest priority.

        args: The fields (s, a, r, s') of one transition
        """
        # Computed before any overwrite so the evicted entry still counts.
        priority = 1.0 if not self.memory else np.median(self.priorities)
        if len(self.priorities) == self.capacity:
            idx = self.priorities.argmin()
            self.priorities[idx] = priority
            self.memory[idx] = Transition(*args)
        else:
            self.priorities = np.append(self.priorities, priority)
            self.memory.append(Transition(*args))

        # Internal invariant: exactly one priority per stored transition.
        assert len(self.memory) == len(self.priorities)

    def sample(self, batch_size):
        '''
        Samples a random batch of transitions from the replay buffer

        Transitions are drawn with probability proportional to
        ``priority ** alpha``. The batch contains no repeated transitions as
        long as at least ``batch_size`` transitions have a non-zero sampling
        probability; otherwise it falls back to drawing with replacement.
        If all priorities are zero the batch is drawn uniformly.

        batch_size: The size of the batch we sample

        returns: sampled_batch (list of Transition), indices (int64 array)
        raises: ValueError if the replay buffer is empty
        '''
        size = len(self.memory)
        if size == 0:
            raise ValueError("cannot sample from an empty replay buffer")

        scaled = self.priorities**self.alpha
        total = scaled.sum()
        if total == 0:
            # All priorities are zero: dividing would produce NaN
            # probabilities, so every transition is made equally likely.
            sampling_prob = np.full(size, 1.0 / size)
        else:
            sampling_prob = scaled / total
        self.prioritized_sampling_prob = sampling_prob

        # Drawing without replacement needs at least batch_size entries with
        # non-zero probability; otherwise keep the permissive behaviour.
        candidates = np.count_nonzero(sampling_prob)
        indices = np.random.choice(
            np.arange(size, dtype=np.int64),
            batch_size,
            replace=candidates < batch_size,
            p=sampling_prob,
        )

        return [self.memory[i] for i in indices], indices

    def update_priorities(self, indices, priorities) -> None:
        '''
        Updates the priorities by the difference between expected and actual q values(called in the optimizer)

        indices (int[]): indices of the batch
        priorities (float[]): new priorities for the transitions
        '''
        for idx, priority in zip(indices, priorities):
            self.priorities[idx] = priority

    def pop(self) -> None:
        '''
        Pops the last sample from the replay buffer

        Every call also multiplies ``alpha`` by 0.99995.
        '''
        self.priorities = self.priorities[:-1]
        self.memory = self.memory[:-1]
        self.alpha *= 0.99995

    def __len__(self):
        '''
        Returns the number of transitions that are currently in the replay buffer
        '''
        return len(self.memory)

# Creating plot for win_rate and average steps taken

In [9]:
def _plot_training_metric(th, column, average_label, ylabel, filename):
    '''
    Plots one column of the training history and its moving average on the
    current figure, saves the figure to ``filename`` and then shows it.
    '''
    window = 8
    episodes = th[:, 0]
    values = th[:, column]

    plt.plot(episodes, values, c='c')
    moving_average = np.convolve(values, np.ones(window)/window, mode='valid')
    # Spread the (shorter) moving average over the same episode range as the
    # raw curve so both share one x-axis.
    plt.plot(np.linspace(episodes[0], episodes[-1], len(moving_average)), moving_average, c='b', label=average_label)
    plt.legend()
    plt.title('Playing against random agent')
    plt.xlabel('Episode no.')
    plt.ylabel(ylabel)
    # Save before showing: show() may close the figure, after which savefig()
    # would write an empty image.
    plt.savefig(filename)
    plt.show()


def create_plots(th):
    '''
    Generates plots for the training history

    - Plot for the winrate with respect to the episode number
    - Plot for the average steps to win wrt. the episode number

    th: 2-d array indexed as th[:, 0] (episode number), th[:, 1] (win rate)
        and th[:, 2] (average steps taken for a win)

    The figures are written to "winrate.png" and "avg_steps.png".
    '''
    _plot_training_metric(th, 1, 'moving average of win rate', 'Win rate', "winrate.png")

    # Start a fresh figure so the second plot never lands on the first one's
    # axes when show() leaves the figure open.
    plt.figure()
    _plot_training_metric(th, 2, 'moving average of win steps taken', 'Average steps taken for a win', "avg_steps.png")

### Testing the win rate

In [10]:
# win rate test
def win_rate_test(player1, player2, number_of_games):
    '''
    Tests the win rate of the agents

    Plays ``number_of_games`` games on the module-level ``env`` (not defined
    in this function; it must exist in the surrounding module). Player 1
    always moves first. A move that yields a reward of 10 counts as a win
    for the player who made it.

    player1: The first player
    player2: The second player
    number_of_games: number of games that are played (at least 1)

    returns: (win rate p1, average win moves p1, win rate p2, average win moves p2);
             an average is -1 if that player never won
    raises: ValueError if number_of_games is smaller than 1
    '''
    if number_of_games < 1:
        # The relative statistics below divide by the number of games.
        raise ValueError("number_of_games must be at least 1")

    win_reward = 10
    win_moves_taken_list_p1 = []
    win_moves_taken_list_p2 = []

    # play games and collect data who won
    for _ in range(number_of_games):
        env.reset()
        win_moves_taken = 0

        while not env.isDone:
            state = env.board_state.copy()
            available_actions = env.get_available_actions()
            action = player1.select_action(state, available_actions, training=False)
            state, reward = env.make_move(action, 'p1')
            # Counts full rounds: player 2 has made as many moves as player 1
            # whenever it wins.
            win_moves_taken += 1

            if reward == win_reward:
                win_moves_taken_list_p1.append(win_moves_taken)
                break

            # Player 1's move may have finished the game without a winner;
            # player 2 must not move on a finished game.
            if env.isDone:
                break

            available_actions = env.get_available_actions()
            action = player2.select_action(state, available_actions, training=False)
            state, reward = env.make_move(action, 'p2')

            if reward == win_reward:
                win_moves_taken_list_p2.append(win_moves_taken)
                break

    wins_p1 = len(win_moves_taken_list_p1)
    wins_p2 = len(win_moves_taken_list_p2)

    average_win_moves_taken_p1 = -1
    average_win_moves_taken_p2 = -1

    if win_moves_taken_list_p1:
        average_win_moves_taken_p1 = sum(win_moves_taken_list_p1)/wins_p1
    if win_moves_taken_list_p2:
        average_win_moves_taken_p2 = sum(win_moves_taken_list_p2)/wins_p2

    # Every game without a winner is counted as a draw.
    statistic = (wins_p1, wins_p2, number_of_games - (wins_p1 + wins_p2))

    print("Absolute wins_p1, wins_p2, draws: ", statistic)
    print("Relative wins_p1, wins_p2, draws ", [s/number_of_games for s in statistic])

    return wins_p1/number_of_games, average_win_moves_taken_p1, wins_p2/number_of_games, average_win_moves_taken_p2