# TP1 - Computação Natural

Trabalho realizado pela aluna Isadora Cardoso.

In [None]:
import os
import time
import pandas as pd
import numpy as np

import random as rnd
import matplotlib.pyplot as plt

# progress bar
from tqdm.notebook import tqdm

# implementação da programação genetica
from genetic_programming.GP import GeneticProgramming

# Seed Python's built-in `random` module (imported as `rnd`) so that draws
# made through it are repeatable. NumPy's random generator is not seeded here.
rnd.seed(333)

In [None]:
# Font sizes shared by every figure produced in this notebook.
SMALL_SIZE, MEDIUM_SIZE, BIGGER_SIZE = 10, 15, 20

# (rc group, setting, size) triples applied, in order, to matplotlib's defaults.
for _group, _setting, _size in (
    ('font',   'size',      MEDIUM_SIZE),   # default text size
    ('axes',   'titlesize', SMALL_SIZE),    # axes title
    ('axes',   'labelsize', MEDIUM_SIZE),   # x and y labels
    ('xtick',  'labelsize', SMALL_SIZE),    # x tick labels
    ('ytick',  'labelsize', SMALL_SIZE),    # y tick labels
    ('legend', 'fontsize',  SMALL_SIZE),    # legend entries
    ('figure', 'titlesize', BIGGER_SIZE),   # figure title
):
    plt.rc(_group, **{_setting: _size})

In [None]:
def where_to_save(filename):
    """Create the parent directory of ``filename`` if it does not exist yet.

    Parameters
    ----------
    filename : str
        Path of the file that is about to be written. Only its directory
        part is used; the file itself is not created.

    Returns
    -------
    None
    """
    directory = os.path.dirname(filename)

    # A bare file name has no directory component: os.makedirs("") would raise
    # FileNotFoundError, and the current directory already exists anyway.
    if directory:
        os.makedirs(directory, exist_ok = True)

def get_mating_report(global_children, global_repeated, evaluation_rep, max_generations, n_test, save):
    """Count, per repetition and generation, how often each child label occurs.

    Parameters
    ----------
    global_children : dict
        Maps the 1-based repetition number to a dict whose values are the
        per-generation lists of child labels ("worse", "better", "equal",
        "none").
    global_repeated : dict
        Maps the 1-based repetition number to a dict with one value per
        generation; the values become the "repeated" column.
    evaluation_rep : int
        Number of repetitions to process (keys ``1..evaluation_rep``).
    max_generations : int
        Number of generations to count for each repetition.
    n_test : str
        Name of the ``results/<n_test>/`` folder used when saving.
    save : bool
        When true, each repetition's table is also written as CSV.

    Returns
    -------
    list of pandas.DataFrame
        One table per repetition with one row per generation and the columns
        "worse", "better", "equal", "none" and "repeated".
    """
    labels = ("worse", "better", "equal", "none")
    all_reports = []

    for rep in range(1, evaluation_rep + 1):
        # Keep only the dict values: one list of labels per generation.
        per_generation = pd.DataFrame(global_children[rep].items()).iloc[:, 1].tolist()
        by_generation  = pd.DataFrame(per_generation).T   # column g = generation g

        # Collect plain dict rows and build the frame once; DataFrame.append
        # no longer exists in pandas 2.x and copied the whole frame per row.
        rows = []
        for g in range(max_generations):
            generation = by_generation[g]
            rows.append({label: generation.str.count(label).sum() for label in labels})

        df_mating = pd.DataFrame(rows, columns = list(labels))

        r = pd.DataFrame(global_repeated[rep].items())[1]

        df_mating         = pd.concat([df_mating, r], axis = 1)
        df_mating.columns = list(df_mating.columns[:-1]) + ["repeated"]

        all_reports.append(df_mating)

        if save:
            filename = "results/{}/population_mating_evaluation{}.csv".format(n_test, rep)
            where_to_save(filename)
            df_mating.to_csv(filename, index = False)

    return all_reports


def get_fitness_report(global_fitness, evaluation_rep, max_generations, n_test, save):
    """Turn the fitness log of every repetition into a table.

    ``global_fitness`` maps the 1-based repetition number to a dict holding
    one sequence of fitness values per generation. Each returned table has one
    column per generation, labelled "1".."max_generations". When ``save`` is
    true each table is also written to
    ``results/<n_test>/population_fitness_evaluation<rep>.csv``.

    Returns the list of tables, one per repetition.
    """
    tables = []

    for run in range(1, evaluation_rep + 1):
        # Drop the dict keys and keep the logged sequences only.
        logged = pd.DataFrame(global_fitness[run].items()).iloc[:, 1].tolist()

        table         = pd.DataFrame(logged).T
        table.columns = [str(number) for number in range(1, max_generations + 1)]
        tables.append(table)

        if save == True:
            filename = "results/{}/population_fitness_evaluation{}.csv".format(n_test, run)
            where_to_save(filename)
            table.to_csv(filename, index = False)

    return tables


def get_pop_size_report(global_pop_size, evaluation_rep, max_generations, n_test, save):
    """Turn the size log of every repetition into a table.

    ``global_pop_size`` maps the 1-based repetition number to a dict holding
    one sequence of sizes per generation. Each returned table has one column
    per generation, labelled "1".."max_generations". When ``save`` is true
    each table is also written to
    ``results/<n_test>/population_size_evaluation<rep>.csv``.

    Returns the list of tables, one per repetition.
    """
    tables = []

    for run in range(1, evaluation_rep + 1):
        # Drop the dict keys and keep the logged sequences only.
        logged = pd.DataFrame(global_pop_size[run].items()).iloc[:, 1].tolist()

        table         = pd.DataFrame(logged).T
        table.columns = [str(number) for number in range(1, max_generations + 1)]
        tables.append(table)

        if save == True:
            filename = "results/{}/population_size_evaluation{}.csv".format(n_test, run)
            where_to_save(filename)
            table.to_csv(filename, index = False)

    return tables

In [None]:
def plot_prediction(data, best_individuals, repetition, n_test, save):
    """Plot, for every repetition, the target values beside the best individual's prediction.

    NOTE: besides its parameters this function reads two notebook-level
    globals: ``gp`` (its ``get_prediction`` method is called) and ``target``.

    Parameters
    ----------
    data : pandas.DataFrame
        Input table; only its 'x' column is used, as the horizontal axis.
    best_individuals : dict
        Maps the 1-based repetition number to the individual handed to
        ``gp.get_prediction``.
    repetition : int
        Number of repetitions to plot (keys ``1..repetition``).
    n_test : str
        Name of the ``results/<n_test>/`` folder used when saving.
    save : bool
        When true, each figure is written as ``predicted_rep<i>.pdf``.
    """
    for i in range(1, repetition + 1):
        title = "REPETIÇÃO N {}".format(i)

        # prediction compared with the original values
        prediction = gp.get_prediction(best_individuals[i])
        results    = pd.DataFrame({'x': data['x'], 'pred': prediction, 'target': target})

        fig, axes = plt.subplots(nrows = 1, ncols = 2, figsize = (12, 4))
        results.plot(x = 'x', y = 'target', style = 'o', legend = False, ax = axes[0], title = 'Original')
        results.plot(x = 'x', y = 'pred', style = 'o', legend = False, ax = axes[1], title = 'Predição')

        fig.suptitle(title, fontsize = 20)

        if save:
            plt.tight_layout()

            filename = "results/{}/predicted_rep{}.pdf".format(n_test, i)
            # savefig does not create missing folders.
            where_to_save(filename)
            fig.savefig(filename, dpi = 600)
        
        
        
def plot_mating_report(all_reports, max_generations, vstd, grid, n_test, save):
    """Plot the across-repetition mean of the mating counters for each generation.

    Only the "worse", "better" and "repeated" counters are drawn; the returned
    tables contain every column of the input tables.

    Parameters
    ----------
    all_reports : list of pandas.DataFrame
        One table per repetition; row ``g`` holds the counters of generation
        ``g``.
    max_generations : int
        Number of leading rows (generations) of every table to aggregate.
    vstd : bool
        When true, the standard deviation is drawn as error bars.
    grid : bool
        Forwarded to ``ax.grid``.
    n_test : str
        Name of the ``results/<n_test>/`` folder used when saving.
    save : bool
        When true, the figure is written as PDF and both tables as CSV.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(aux_mean, aux_std)`` with one row per counter and one column per
        generation.
    """
    means = []
    stds  = []

    for gen in range(max_generations):
        # Row `gen` of every repetition stacked into one table, built once and
        # reused for both statistics.
        stacked = pd.DataFrame([report.iloc[gen, :] for report in all_reports])
        means.append(stacked.mean(axis = 0))
        stds.append(stacked.std(axis = 0))

    if means:
        aux_mean = pd.concat(means, axis = 1, ignore_index = True)
        aux_std  = pd.concat(stds, axis = 1, ignore_index = True)
    else:
        # pd.concat rejects an empty list; keep the empty-frame result.
        aux_mean = pd.DataFrame()
        aux_std  = pd.DataFrame()

    cols = ["worse", "better", "repeated"]
    leg  = ["piores", "melhores", "repetidos"]

    plot_kwargs = {"figsize": (12, 5)}
    if vstd:
        plot_kwargs["yerr"] = aux_std.T

    ax = aux_mean.T[cols].plot(**plot_kwargs)

    ax.set_xlabel("Geração")
    ax.set_ylabel("Média")

    ax.legend(leg)

    title = "EVOLUÇÃO DO CRUZAMENTO"
    ax.set_title(title, fontsize = 20)

    ax.grid(grid)

    if save:
        plt.tight_layout()

        filename = "results/{}/mating_report.pdf".format(n_test)
        # savefig / to_csv do not create missing folders.
        where_to_save(filename)
        plt.savefig(filename, dpi = 600)

        # index = False: the counter names (row labels) are not written.
        filename2 = "results/{}/mean_mating_report.csv".format(n_test)
        aux_mean.to_csv(filename2, index = False)

        filename3 = "results/{}/std_mating_report.csv".format(n_test)
        aux_std.to_csv(filename3, index = False)

    return aux_mean, aux_std


        
def plot_fitness_report(all_fitness, evaluation_rep, max_generations, vstd, grid, n_test, save):
    """Plot the mean, best (minimum) and worst (maximum) fitness per generation.

    For every repetition the column-wise mean, minimum and maximum of its
    fitness table are taken; those per-repetition values are then averaged
    across repetitions and drawn on a logarithmic y axis.

    Parameters
    ----------
    all_fitness : list of pandas.DataFrame
        One fitness table per repetition, one column per generation.
    evaluation_rep : int
        Number of leading entries of ``all_fitness`` to aggregate.
    max_generations : int
        Unused; kept so existing calls keep working.
    vstd : bool
        When true, the standard deviation across repetitions is drawn as
        error bars.
    grid : bool
        Forwarded to ``ax.grid``.
    n_test : str
        Name of the ``results/<n_test>/`` folder used when saving.
    save : bool
        When true, the figure is written as PDF and both tables as CSV.

    Returns
    -------
    tuple of pandas.DataFrame
        ``(mean_fitness, std_fitness)`` with one row per generation and the
        columns "mean", "best" and "worst".
    """
    cols     = ["mean", "best", "worst"]
    reducers = ["mean", "min", "max"]   # DataFrame method behind each column

    runs = [all_fitness[i] for i in range(evaluation_rep)]

    mean_parts = []
    std_parts  = []

    for how in reducers:
        # One row per repetition; built once and used for both statistics.
        per_run = pd.DataFrame([getattr(run, how)() for run in runs])
        mean_parts.append(per_run.mean(axis = 0))
        std_parts.append(per_run.std(axis = 0))

    mean_fitness = pd.concat(mean_parts, axis = 1)
    mean_fitness.columns = cols

    std_fitness = pd.concat(std_parts, axis = 1)
    std_fitness.columns = cols

    leg = ["média", "melhor", "pior"]

    plot_kwargs = {"logy": True, "figsize": (12, 5)}
    if vstd:
        plot_kwargs["yerr"] = std_fitness

    ax = mean_fitness[cols].plot(**plot_kwargs)

    ax.set_xlabel("Geração")
    ax.set_ylabel("Média")

    ax.legend(leg)

    title = "EVOLUÇÃO DO VALOR DA FITNESS"
    ax.set_title(title, fontsize = 20)

    ax.grid(grid)

    if save:
        plt.tight_layout()

        filename = "results/{}/fitness_report.pdf".format(n_test)
        # savefig / to_csv do not create missing folders.
        where_to_save(filename)
        plt.savefig(filename, dpi = 600)

        filename2 = "results/{}/mean_fitness_report.csv".format(n_test)
        mean_fitness.to_csv(filename2, index = False)

        filename3 = "results/{}/std_fitness_report.csv".format(n_test)
        std_fitness.to_csv(filename3, index = False)

    return mean_fitness, std_fitness


def plot_best_fitness(mean_fitness, max_generations, vstd, grid, n_test, save, std_fitness = None):
    """Plot the "best" column of ``mean_fitness`` across generations.

    Parameters
    ----------
    mean_fitness : pandas.DataFrame
        Table with a "best" column, one row per generation.
    max_generations : int
        Unused; kept so existing calls keep working.
    vstd : bool
        When true and ``std_fitness`` is given, error bars are drawn.
    grid : bool
        Forwarded to ``ax.grid``.
    n_test : str
        Name of the ``results/<n_test>/`` folder used when saving.
    save : bool
        When true, the figure is written as ``best_fitness_report.pdf``.
    std_fitness : pandas.DataFrame, optional
        Table with a "best" column holding the error-bar magnitudes. Without
        it no error bars are drawn, whatever ``vstd`` is.
    """
    plot_kwargs = {"figsize": (12, 5), "legend": False, "color": "orange"}

    # The error bars need the actual deviations: handing the boolean flag to
    # `yerr` makes pandas treat True as a constant error of 1.
    if vstd and std_fitness is not None:
        plot_kwargs["yerr"] = std_fitness["best"]

    ax = mean_fitness["best"].plot(**plot_kwargs)

    ax.set_xlabel("Geração")
    ax.set_ylabel("Média")

    title = "EVOLUÇÃO DO VALOR DA FITNESS DO MELHOR INDIVIDUO"
    ax.set_title(title, fontsize = 20)

    ax.grid(grid)

    if save:
        plt.tight_layout()

        filename = "results/{}/best_fitness_report.pdf".format(n_test)
        # savefig does not create missing folders.
        where_to_save(filename)
        plt.savefig(filename, dpi = 600)
        
        
def plot_population_size(all_pop_size, max_generations, vstd, grid, n_test, save = True):
    """Plot the mean size per generation, averaged over all repetitions.

    Parameters
    ----------
    all_pop_size : list of pandas.DataFrame
        One size table per repetition, one column per generation. Every entry
        of the list is used.
    max_generations : int
        Unused; kept so existing calls keep working.
    vstd : bool
        When true, the standard deviation across repetitions is drawn as
        error bars.
    grid : bool
        Forwarded to ``ax.grid``.
    n_test : str
        Name of the ``results/<n_test>/`` folder used when saving.
    save : bool, default True
        When true, the figure is written as PDF and the mean / standard
        deviation series as CSV.
    """
    # One row per repetition with that repetition's mean size per generation.
    # Iterating the list itself removes the dependence on the notebook-level
    # `evaluation_rep` variable.
    per_run = pd.DataFrame([sizes.mean() for sizes in all_pop_size])

    mean_size = per_run.mean(axis = 0)
    std_size  = per_run.std(axis = 0)

    plot_kwargs = {"figsize": (12, 5)}

    # `yerr` needs the deviations themselves, not the boolean flag.
    if vstd:
        plot_kwargs["yerr"] = std_size

    ax = mean_size.plot(**plot_kwargs)

    ax.set_xlabel("Geração")
    ax.set_ylabel("Média")

    title = "EVOLUÇÃO DO TAMANHO MEDIO DA POPULAÇÃO"
    ax.set_title(title, fontsize = 20)

    ax.grid(grid)

    if save:
        plt.tight_layout()

        filename = "results/{}/pop_size_report.pdf".format(n_test)
        # savefig / to_csv do not create missing folders.
        where_to_save(filename)
        plt.savefig(filename, dpi = 600)

        filename2 = "results/{}/mean_pop_size.csv".format(n_test)
        mean_size.to_csv(filename2, index = False)

        filename3 = "results/{}/std_pop_size.csv".format(n_test)
        std_size.to_csv(filename3, index = False)

In [None]:
### define functions
def add(a, b):
    """Binary primitive: return the sum of ``a`` and ``b``."""
    total = a + b
    return total

def sub(a, b):
    """Binary primitive: return ``a`` minus ``b``."""
    difference = a - b
    return difference

def mul(a, b):
    """Binary primitive: return the product of ``a`` and ``b``."""
    product = a * b
    return product

def div(a, b):
    """Binary primitive: return ``a / b``, or 1 when ``b`` equals zero."""
    return a / b if b != 0 else 1
    
def neg(a):
    """Unary primitive: return ``a`` with its sign flipped."""
    flipped = -a
    return flipped

def sqrt(a):
    """Unary primitive: square root of ``a``, taken on ``|a|`` when ``a`` is negative."""
    magnitude = a if a >= 0 else np.abs(a)
    return np.sqrt(magnitude)

def invt(a):
    """Unary primitive: return ``1 / a``; a zero input is returned unchanged."""
    if a == 0:
        return a

    return 1 / a

    
# Function set: for each primitive, the callable, how many children it takes
# and the template used to render it as text.
genotype = (
    {"function": add,  "children": 2, "phenotype": "({} + {})"},
    {"function": sub,  "children": 2, "phenotype": "({} - {})"},
    {"function": mul,  "children": 2, "phenotype": "({} * {})"},
    {"function": div,  "children": 2, "phenotype": "({} / {})"},
    {"function": neg,  "children": 1, "phenotype": "-({})"},
    {"function": sqrt, "children": 1, "phenotype": "sqrt({})"},
    # Parenthesised so nested inversions stay unambiguous: "1/1/x" would read
    # as (1/1)/x, while invt(invt(x)) is 1/(1/x).
    {"function": invt, "children": 1, "phenotype": "(1/{})"}
)

In [None]:
# Folder that holds the dataset files read below.
path = "datasets-TP1"

In [None]:
# bases de dados de um problema real
### read file
# file   = pd.read_csv(os.path.join(path, "concrete.txt"), header = None, sep = "\t")

# target       = file.iloc[:, -1] ## ultima coluna

# fdata        = file.iloc[:, :-1]
# randlist     = rnd.choices(np.linspace(0, 1000), k = len(file))
# data         = pd.concat([fdata, pd.Series(randlist)], axis = 1)
# data.columns = ["col{}".format(i) for i in range(1, len(data.columns) + 1)]

In [None]:
# Datasets generated from a known function.
datasets = ['SR_div.txt', 'SR_circle.txt', 'SR_div_noise.txt', 'SR_ellipse_noise.txt']

# The column separator is not the same in every file (one space in some, two
# in others), so it was replaced by commas directly in the files. Each line
# also started with a space, which leaves column 0 as NaN; it is discarded by
# reading columns 1 and 2 only.

### read file
file = pd.read_csv(
    os.path.join(path, datasets[0]),
    header  = None,
    usecols = [1, 2],
)
file.columns = ['x', 'y']

target = file['y']

# Second input column: random constants sampled from np.linspace(-1.0, 1.0).
randlist1 = rnd.choices(np.linspace(-1.0, 1.0), k = len(file))
data      = file[['x']].assign(R1 = pd.Series(randlist1))

In [None]:
### parameters
# Settings handed positionally, in this order, to GeneticProgramming in the
# cell that builds `gp` (after `data`, `functions` and `target`).

functions           = genotype    # primitive table defined in an earlier cell
population_size     = 150
individual_size     = 7
fitness_method      = "mse"
fitness_penalty     = 0.5
tournament_size     = 2
crossover_prob      = 0.6
mutation_prob       = 0.3
mutation_nodes      = 5
elitism_size        = 1
max_generations     = 150
populate_method     = "ramped"

# Number of times gp.run_GP() is executed by the run loop; also the number of
# repetitions aggregated by the report functions.
evaluation_rep  = 30

In [None]:
# Build the GP engine from the dataset and the parameters set above.
# All arguments are positional, so their order must match the signature of
# GeneticProgramming (defined in genetic_programming.GP, not shown here).
gp = GeneticProgramming(data, functions, target, population_size, individual_size, fitness_method, fitness_penalty,
                        tournament_size, crossover_prob, mutation_prob, mutation_nodes, elitism_size, 
                        max_generations, populate_method)

In [None]:
# One dictionary per value returned by gp.run_GP(), keyed by the 1-based
# repetition number; the report functions read them with keys 1..evaluation_rep.
global_fitness  = {}
global_repeated = {}
global_best     = {}
global_children = {}
global_pop_size = {}

# Wall-clock time of the whole batch of repetitions.
start = time.time()

for i in tqdm(range(evaluation_rep)):
    # run_GP() returns five values, unpacked here in the order it provides them.
    this_fitness, this_repeated, this_best, crossover_fitness, this_pop_size = gp.run_GP()
    
    global_fitness[i + 1]  = this_fitness
    global_repeated[i + 1] = this_repeated
    global_best[i + 1]     = this_best
    global_children[i + 1] = crossover_fitness
    global_pop_size[i + 1] = this_pop_size

    
end   = time.time()
total = end - start
print("\n")
#print("Tempo de execução desse teste: {:.03f} segundos".format(total))
# Report the average duration of one repetition, in seconds.
print("Um tempo médio de {:.03f} segundos".format(total / evaluation_rep))

In [None]:
# Options shared by the report and plot calls below.
n_test = "018"   # name of the results/<n_test>/ folder the outputs go to
save   = False   # when True, tables and figures are written to disk
vstd   = False   # when True, the plots are asked to draw error bars
grid   = True    # forwarded to ax.grid() by the plot functions

In [None]:
# Build the per-repetition mating tables, then plot their mean per generation.
all_reports = get_mating_report(global_children, global_repeated, evaluation_rep, max_generations, n_test, save)
mean_values, std_values = plot_mating_report(all_reports, max_generations, vstd, grid, n_test, save)

In [None]:
# Build the per-repetition fitness tables, then plot mean / best / worst fitness.
all_fitness = get_fitness_report(global_fitness, evaluation_rep, max_generations, n_test, save)
mean_fitness, std_fitness = plot_fitness_report(all_fitness, evaluation_rep, max_generations, vstd, grid, n_test, save)

In [None]:
# Plot only the "best" column of the mean fitness table computed above.
plot_best_fitness(mean_fitness, max_generations, vstd, grid, n_test, save)

In [None]:
# Build the per-repetition size tables, then plot their mean per generation.
all_pop_size = get_pop_size_report(global_pop_size, evaluation_rep, max_generations, n_test, save)
plot_population_size(all_pop_size, max_generations, vstd, grid, n_test, save)

In [None]:
# Compare target and prediction for the best individual of every repetition.
if len(data.columns) == 2: ## plot only for the first problems, which are 2D
    plot_prediction(data, global_best, evaluation_rep, n_test, save)

##### 