In [8]:
#General imports
import numpy as np
import pandas as pd
from scipy import odr
from scipy.stats import sem, t, ttest_ind, iqr, gamma, ks_2samp, linregress, normaltest, f_oneway

import matplotlib.pyplot as plt
%matplotlib inline

from snakeGame.screen import Screen

#Reinforcement Learning
from learningModels.typesSnakeGame import SnakeAI, SnakeAIBorders
from learningModels.process import LearningProcess, compute_avg_return

#Auxiliary functions
from utils import read_data, sampler

In [2]:
def paths_to_data(snake_games, rewards, iterations=3):
    """Returns a list of lists with the paths to the losses and returns .csv files of each iteration.

    One ``[losses_path, returns_path]`` pair is built for every (game, reward, iteration)
    combination, ordered by game, then by reward position, then by iteration. The folder
    name is ``./trained_agents/<ClassName>_<len(state)>_<reward position>_<iteration>``.
    Only the strings are built here, nothing is read from disk.

    snake_games (list): Its a list of objects (SnakeAI or SnakeAIBorders).
    rewards (list): Its a list of dictionaries with the rewards used in the training.
    iterations (int): Amount of training iterations per reward (3 by default).

    returns a list of two-element lists of str.
    """

    losses_and_returns = []

    for snake in snake_games:
        # Class name and state length only depend on the game, so the prefix is built once.
        prefix = './trained_agents/' + snake.__class__.__name__ + '_{}'.format(len(snake.state()))
        for num, _ in enumerate(rewards):
            for i in range(iterations):
                folder = prefix + '_{}_{}'.format(num, i)
                losses_and_returns.append([folder + '/losses.csv', folder + '/returns.csv'])

    return losses_and_returns

In [3]:
def visual_return_losses(df_returns, df_losses, titles=True, reward=0):
    """Plot, side by side, the evolution of the returns and of the losses of one training run.

    Left panel: every column of df_returns except the last one is drawn in light gray and
    the 'average' column is drawn in red. Right panel: the losses.
    The x axes are fixed: the returns are plotted against range(0, 75001, 5000) and the
    losses against range(0, 75000, 3000), so the frames must have matching lengths.

    df_returns (DataFrame): returns of the network, with an 'average' column in the last position.
    df_losses (DataFrame): losses of the network.
    titles (Bool): When true, adds the figure title (showing REWARDS[reward]) and the panel titles.
    reward (int): Position of the reward in the rewards list (only read when titles is true).
    """

    fig, (ax_returns, ax_losses) = plt.subplots(1, 2, figsize=(12,7))

    if titles:
        fig.suptitle('Training process performance \n {}'.format(REWARDS[reward]), fontsize=16)
        ax_returns.set_title('Evolution of the Average Return ')
        ax_losses.set_title('Evolution of the loss')

    # Fixed x axes of both logs
    x_returns = range(0, 75000 + 1, 5000)
    x_losses = range(0, 75000, 3000)

    # Returns: individual samples in the background, their average on top
    runs = df_returns[df_returns.columns[:-1]]
    for column in runs.columns:
        ax_returns.plot(x_returns, runs[column], alpha=0.15, color='gray')
    ax_returns.plot(x_returns, df_returns['average'], color='red', label='average')
    ax_returns.set_ylabel('Average Return')
    ax_returns.set_xlabel('Iterations')
    ax_returns.legend()

    # Losses
    ax_losses.plot(x_losses, df_losses)
    ax_losses.set_ylabel('Loss')
    ax_losses.set_xlabel('Iterations')

    plt.show()

In [4]:
def general_sample_visualization(losses_and_returns, reward):
    """Draw the returns/losses figure of each iteration trained with one reward.

    The three consecutive entries of losses_and_returns that start at position reward*3
    are read from disk (losses first, returns second). The returns log is read as 10
    unnamed columns, their row-wise mean is appended as 'average', and the result is
    passed to visual_return_losses. Only the first figure gets titles.

    losses_and_returns (list): Pairs [losses_path, returns_path].
    reward (int): Position of the reward in the rewards list.
    """

    start = reward * 3 if reward > 0 else reward
    sample_names = [str(k) for k in range(10)]

    for position, entry in enumerate(losses_and_returns[start:start + 3]):
        losses_frame = pd.read_csv(entry[0], names=['losses'])
        returns_frame = pd.read_csv(entry[1], names=sample_names)
        returns_frame['average'] = returns_frame.mean(numeric_only=True, axis=1)

        visual_return_losses(returns_frame, losses_frame, position == 0, int(start / 3))

In [5]:
def average_comparative(losses_and_returns, reward):
    """Compare, in a single plot, the average return of each iteration trained with one reward.

    The three consecutive entries of losses_and_returns that start at position reward*3
    are used. Each returns log is read as 10 unnamed columns and its row-wise mean is
    plotted against range(0, 75001, 5000), labelled with the iteration number.

    losses_and_returns (list): Pairs [losses_path, returns_path].
    reward (int): Position of the reward in the rewards list.
    """

    if reward > 0:
        reward = reward*3

    fig, ax = plt.subplots(figsize=(10,7))
    iterations = range(0, 75000 + 1, 5000)
    fig.suptitle('Average Returns in each Iteration', fontsize=18)
    ax.set_xlabel('Iterations', fontsize=12)
    ax.set_ylabel('Returns', fontsize=12)

    for i, val in enumerate(losses_and_returns[reward:reward+3]):
        # Only the returns log (second path of the pair) is needed; the losses file is not read.
        df_returns = pd.read_csv(val[1], names=[str(num) for num in range(10)])
        average = df_returns.mean(numeric_only=True, axis=1)
        ax.plot(iterations, average, label=str(i))
    ax.legend()
    plt.show()

In [6]:
def samples_per_iteration(reward, samples, iterations=3):
    """Calculate a specific number of samples for the reward for each iteration.

    For every iteration the result of sampler(SNAKE_GAMES[0], REWARDS[reward], reward, i, samples)
    is wrapped in a one-column DataFrame named after the iteration number; the columns
    are then joined side by side in a single concat instead of growing the frame in the loop.

    reward (int): Position of the reward in the rewards list.
    samples (int): Amount of samples that will be taken.
    iterations (int): Amount of iterations to sample (3 by default).

    returns a pandas DataFrame with one column per iteration ('0', '1', ...).
    """

    columns = [
        pd.DataFrame(sampler(SNAKE_GAMES[0], REWARDS[reward], reward, i, samples), columns=[str(i)])
        for i in range(iterations)
    ]

    # pd.concat refuses an empty list, keep the "no iterations -> empty frame" result.
    if not columns:
        return pd.DataFrame()

    return pd.concat(columns, axis=1)

In [6]:
def histogram(data, bins=30):
    """Histogram of the first three columns of data, one panel per column.

    Each panel shows a red vertical line on the mean and a text box with the mean
    and the standard deviation (np.std) of the column.

    data (DataFrame): Data of the samples (at least 3 columns).
    bins (int): Number of bins of each histogram.
    """

    fig, axes = plt.subplots(1, 3, figsize=(16,8))

    for position, axis in enumerate(axes):
        values = data.iloc[:, position]
        avg = values.mean()
        s = np.std(values)

        axis.hist(values, bins=bins)
        axis.set_title('iteration {}'.format(position))
        axis.set_xlabel('Total Reward')
        axis.set_ylabel('Frequency')
        axis.axvline(avg, color='red')

        mean_line = r'$\bar{x}=%.2f$' % (avg, )
        std_line = r'$\mathrm{s}=%.2f$' % (s, )
        axis.text(0.6,0.85, '\n'.join((mean_line, std_line)), fontsize=13, transform=axis.transAxes)

    plt.show()

In [8]:
def delete_outliers_df(data):
    """Delete the outliers using the inter quantile range (the function its fixed to 3 iterations).

    For each of the first three columns, the values outside the limits given by
    outliers_limits are overwritten with None (shown as NaN in numeric columns).
    The frame is modified in place and nothing is returned.

    data (DataFrame): Data of the samples.
    """

    for i in range(3):
        column = data.iloc[:, i]
        left, right = outliers_limits(column)
        mask = (column < left) | (column > right)
        # Write by position, the same way the column was read. Writing through the
        # label str(i) silently added a new column when the labels were not '0', '1', '2'.
        data.iloc[mask.to_numpy(), i] = None
    
    

In [9]:
def delete_outliers(data):
    """Return a new list with the values of data that lie inside the limits
    given by outliers_limits (both limits included).

    data (list): Values to filter.
    """
    lower, upper = outliers_limits(data)

    kept = []
    for value in data:
        if value >= lower and value <= upper:
            kept.append(value)
    return kept

In [10]:
def outliers_limits(data):
    """Limits used to flag outliers: 1.5 times the inter quantile range below the
    first quartile and above the third quartile.

    data (array-like): Values to evaluate.

    returns a tuple (left, right).
    """
    first_quartile, third_quartile = np.quantile(data, [0.25, 0.75])
    whisker = 1.5 * iqr(data)
    return first_quartile - whisker, third_quartile + whisker

In [11]:
def confidence_interval(data, confidence=0.05):
    """Evaluates the confidence intervals of the mean for each iteration (column).

    The interval is mean +/- t * standard error, where t is the two-sided Student's t
    quantile. Missing values (NaN) are ignored per column: both the number of
    observations and the standard error are taken from the non-missing values, and
    the t quantile uses n - 1 degrees of freedom.

    data (DataFrame): Samples data, one column per iteration.
    confidence (float): Significance level of the interval (0.05 gives a 95% interval).

    returns a DataFrame with the columns 'left' and 'right' and one row per column
    of data, indexed by the column position (0, 1, 2, ...).
    """

    n = data.count()          # non-missing observations of each column
    mean = data.mean()
    std_error = data.sem()    # sample standard error (ddof=1), NaN skipped

    # Degrees of freedom of the t distribution for a mean are n - 1.
    t_critical = t.ppf(1 - confidence/2, n - 1)
    margin = std_error * t_critical

    intervals = pd.DataFrame({'left': (mean - margin).to_numpy(),
                              'right': (mean + margin).to_numpy()},
                             index=list(range(data.shape[1])))

    return intervals

In [12]:
def test_means(best, data):
    """Evaluate a two side hypothesis t-test considering distinc variances.
    
    best (int): Number of the best iteration.
    data (DataFrame): Data of the samples.
    """
    results = {}
    for i in range(len(data.columns)):   
        if i == best:
            continue
        else:
            results['{}_vs_{}'.format(best, i)] = (list(ttest_ind(data.iloc[:,best], data.iloc[:,i], equal_var=False, nan_policy='omit')))
    
    df_results = pd.DataFrame(results, index=['Statistic', 'p_value']).transpose()
    
    return df_results

In [13]:
def time_diff(games):
    """Evaluate the time (in steps) between consecutive punctuation changes for a list of games.

    For each game, the steps where the score grows exactly by 1.0 with respect to the
    previous step are located, and the distance between each pair of consecutive
    locations is collected. The distances of all the games share one flat list.

    games (list): List of lists with n number of games of certain length
                  (the games are get it with the sampler function).

    returns a list with the step differences.
    """

    duration_diff = []

    for game in games:
        # Score increment between every pair of consecutive steps
        increments = [game[step + 1] - game[step] for step in range(len(game) - 1)]

        # Steps where the punctuation changed
        scoring_steps = [step for step, delta in enumerate(increments) if delta == 1.0]

        # Distance between consecutive punctuation changes
        duration_diff.extend(
            later - earlier for earlier, later in zip(scoring_steps, scoring_steps[1:]))

    return duration_diff
            

In [4]:
def gamma_aproximation(data, test=True):
    """Plot the data samples and the fitted gamma distribution.

    A gamma distribution is fitted to data and its density is evaluated on a grid
    that goes from 0 to the maximum of data. The figure shows the normalized histogram
    of the data (15 bins), the fitted density, a dashed line on the mean and a text box
    with the mean and the standard deviation.

    data (list): list with the time between punctuation (result of the function 'time_diff')

    test (bool): Bool value to execute or not the kolmogorov test. When true, the
                 p value (rounded to 2 decimals) is added to the title.

    returns the values of the fitted density on the grid.
    """

    #Parameters of gamma distribution
    fit_alpha, fit_loc, fit_beta = gamma.fit(data)
    #Probability density function
    max_duration = np.max(data)
    # linspace requires an integer amount of points, int() keeps it valid for float durations.
    density = np.linspace(0, max_duration, int(max_duration) + 1)
    distribution = gamma.pdf(density, fit_alpha, loc=fit_loc, scale=fit_beta)

    #Kolmogorov-smirnov test
    if test:
        # NOTE(review): this compares the histogram bin heights against the density values,
        # not the raw samples against the fitted distribution - confirm this is the intended test.
        samples = np.histogram(data,bins=15, density=True)[0]
        result = round(ks_2samp(samples, distribution)[1], 2)
        title = r'Time between punctuation (fit $p$ = {})'.format(result)
    else:
        # The title must be defined in both branches, it is used by set_title below.
        title = r'Time between punctuation'

    avg = np.mean(data)
    s = np.std(data)

    fig, ax = plt.subplots()
    ax.hist(data, bins=15, density=True, label='Samples')
    # Plot against the grid where the density was evaluated.
    ax.plot(density, distribution, label='Gamma pdf')
    ax.set_title(title , fontsize='14')
    ax.set_xlabel('Time (steps)')
    data_str = '\n'.join((r'$\bar{x}=%.2f$' % (avg, ),
                             r'$\mathrm{s}=%.2f$' % (s, )))

    ax.text(0.6,0.6, data_str, fontsize=13, transform=ax.transAxes )
    ax.axvline(avg, color='red', linestyle='--')
    ax.legend()
    plt.show()

    return distribution
    

In [2]:
def stepsScoreValues(games):
    """Process the games information to take the final score and the duration of each game
    (the result is sorted by duration, ties are ordered by final score).

    games (list): List of lists with n number of games of certain length
                  (the games are get it with the sampler function).

    returns a tuple (durations, scores) of lists; both are empty when games is empty.
    """
    # (duration in steps, last score) of every game, sorted on duration
    pairs = sorted((len(game), game[-1]) for game in games)

    # zip(*[]) yields nothing to unpack, so the empty case is answered explicitly.
    if not pairs:
        return [], []

    x, y = [list(column) for column in zip(*pairs)]

    return x, y
    

In [3]:
def deleteStuckGames(games, limit_steps=30000):
    """Return the games whose length is different from limit_steps.

    games (list): List of lists with the games.
    limit_steps (int): Length of the games that are discarded (30000 by default).
                       NOTE(review): presumably the step cap at which a stuck game is
                       cut by the sampler - confirm against the sampler function.

    returns a new list, the input is not modified.
    """
    return [game for game in games if len(game) != limit_steps]

In [4]:
def plotPathGames(games):
    """Draw the score of every game against its step number, all in the same axes.

    games (list): List of lists with the score of each game at every step.
    """
    fig, ax = plt.subplots(figsize=(8,8))

    ax.set_title('Games Evolution (Steps vs Score)', fontsize=20)
    ax.set_xlabel('Steps', fontsize=16)
    ax.set_ylabel('Score', fontsize=16)

    for scores in games:
        ax.plot(range(len(scores)), scores, alpha=0.35)

In [5]:
def transformRegression(x, y):
    """Regression between x and y through the transformation y = x / (b0 + b1*x).

    A simple linear regression is fitted on the reciprocals (1/x, 1/y); its slope and
    intercept are then used to evaluate x / (slope + intercept*x) on the original data.
    A 2x2 figure shows the transformed fit, its residuals, the fit on the original
    data and its residuals.

    x, y (lists): Values of the explanatory and response variables (none can be 0).

    returns (transform_regression, real_regression): the fitted values on the
    transformed scale (array) and on the original scale (list).
    """
    inv_x = [1/value for value in x]
    inv_y = [1/value for value in y]
    reg = linregress(inv_x, inv_y)

    fig, ax = plt.subplots(2,2, figsize=(12,12))
    (ax_fit, ax_fit_res), (ax_real, ax_real_res) = ax

    # Fit on the transformed scale
    transform_regression = reg.intercept + reg.slope*(np.array(inv_x))
    ax_fit.scatter(inv_x, inv_y)
    ax_fit.plot(inv_x, transform_regression, color="orange")
    ax_fit.set_title('Transform Regression', fontsize=16)

    # Residuals on the transformed scale
    ax_fit_res.scatter(inv_x, inv_y - transform_regression)
    ax_fit_res.axhline(0, color="red", linestyle='--')
    ax_fit_res.set_title('Residuals', fontsize=16)

    # Fit taken back to the original scale
    real_regression = [value/(reg.slope + reg.intercept*value) for value in np.array(x)]
    ax_real.scatter(x,y)
    ax_real.plot(x, real_regression, color="orange")
    ax_real.set_title("Real Regression", fontsize=16)

    # Residuals on the original scale
    ax_real_res.scatter(x, np.array(y) - np.array(real_regression))
    ax_real_res.axhline(0, color="red", linestyle='--')
    ax_real_res.set_title("Residuals", fontsize=16)

    fig.suptitle('Regression Duration vs Score', fontsize=20)
    return transform_regression, real_regression
    

In [1]:
def survival_analysis(games_max_steps):
    """Survival analysis using the Kaplan-Meier approach for the set of games, also include the confidence
    intervals.

    For every step from 1 to the longest duration, the survival probability is the
    previous probability times (alive - deads) / alive. The band drawn around it is
    1.96 * probability * sqrt(accumulated sum of deads / (alive * (alive - deads))).

    games_max_steps (list): a list with the duration in steps for the set of games.

    returns (up, down, probabilities, steps): upper and lower limits of the band (arrays),
    the survival probabilities (list) and the evaluated steps (range).
    """
    # Amount of games that end on each step
    deads_per_step = {}
    for val in games_max_steps:
        deads_per_step[val] = deads_per_step.get(val, 0) + 1

    #Kaplan-Meier approach
    max_step = int(np.max(games_max_steps))
    steps = range(1, max_step + 1)
    probabilities = []
    boundaries = []
    num_alive = len(games_max_steps)
    prev_result = 1
    acum_estimate = 0

    for i in steps:
        num_deads = deads_per_step.get(i, 0)
        result = prev_result * ((num_alive - num_deads) / num_alive)
        probabilities.append(result)

        #Confidence Intervals
        # When every remaining game ends on this step the term is undefined (division by
        # zero). The probability is 0 there, so the band is 0 whatever the term is: use 0
        # instead of reusing the value of the previous step (or none at all on step 1).
        denominator = num_alive * (num_alive - num_deads)
        estimate = num_deads / denominator if denominator else 0

        acum_estimate = acum_estimate + estimate
        boundaries.append(1.96 * result * np.sqrt(acum_estimate))

        #Update values
        prev_result = result
        num_alive = num_alive - num_deads

    fig, ax = plt.subplots(figsize=(8,8))

    up = np.array(probabilities) + np.array(boundaries)
    down = np.array(probabilities) - np.array(boundaries)
    ax.plot(steps, up, color="blue", linestyle='--', alpha=0.35)
    ax.plot(steps, down, color="blue", linestyle='--', alpha=0.35)
    ax.plot(steps, probabilities, color="red")
    ax.set_title('Kaplan_Meier Survival Curve with Confidence Intervals', fontsize=18)
    ax.set_xlabel('Duration', fontsize=16)
    ax.set_ylabel('Survival Probability', fontsize=16)

    return up, down, probabilities, steps
    

In [18]:
def response_histogram(x,y):
    """Histograms of x ('Duration') and y ('Score') in one figure, followed by their
    2D histogram in a second figure.

    Each histogram (20 bins) shows a dashed red line on the mean and a text box with
    the mean and the standard deviation. The 2D histogram uses 60 bins.

    x, y (lists): Duration in steps and score of a set of games.
    """
    fig, ax = plt.subplots(1,2, figsize=(12,6))

    # Same layout for both variables: first the steps, then the score
    for panel, values, heading in ((ax[0], x, 'Duration'), (ax[1], y, 'Score')):
        panel.hist(values, bins=20)
        panel.set_title(heading, fontsize=16)

        avg = np.mean(values)
        s = np.std(values)
        mean_line = r'$\bar{x}=%.2f$' % (avg, )
        std_line = r'$\mathrm{s}=%.2f$' % (s, )
        panel.text(0.6,0.85, '\n'.join((mean_line, std_line)), fontsize=13, transform=panel.transAxes )
        panel.axvline(avg, color='red', linestyle='--')

    #Hist 2d
    fig, ax = plt.subplots(figsize=(12,8))
    ax.hist2d(x,y, bins=60)
    ax.set_xlabel('Steps', fontsize=14)
    ax.set_ylabel('Score', fontsize=14)

In [15]:
def checkNormalityModels(data):
    """Print the p value of the normality test (scipy normaltest) for each of the
    first three columns of data. Missing values are dropped before the test.

    The test is applied to the samples themselves. Applying it to the heights of a
    histogram would evaluate the distribution of the 20 bin heights, not of the data.

    data (DataFrame): Data of the samples, one column per iteration.
    """

    for i in range(3):
        iteration = data.iloc[:, i].dropna()
        # The printed number is the p value of the test (small values reject normality).
        p_val = normaltest(iteration)[1]

        print('The iteration {} is normal with {} of confidence'.format(i, p_val) )
        

In [3]:
def checkNormalitySamples(dataList):
    """Print the p value of the normality test (scipy normaltest) for each sample of the list.

    The test is applied to the samples themselves. Applying it to the counts of a
    histogram would evaluate the distribution of the 20 bin counts, not of the data.

    dataList (list): List with one set of samples per reward, in the order of the rewards.
    """
    for rewardNum, sample in enumerate(dataList):
        p_val = normaltest(sample)[1]

        print('The reward {} have a p value of {} to check its normality'.format(rewardNum, p_val))

In [5]:
def anovaTestModels(data):
    """Print the result of the one way ANOVA (scipy f_oneway) between the first three
    columns of data. The missing values of each column are dropped before the test.

    data (DataFrame): Data of the samples, one column per iteration.
    """
    groups = [data.iloc[:, position].dropna() for position in range(3)]
    print(f_oneway(*groups))