<a href="https://colab.research.google.com/github/GuysBarash/Genetic-programing-with-DEAP/blob/master/evolutionarty_GP.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [85]:
!apt install libgraphviz-dev
!pip install pygraphviz
import pygraphviz as pgv

!pip install DEAP

Reading package lists... Done
Building dependency tree       
Reading state information... Done
libgraphviz-dev is already the newest version (2.40.1-2).
The following package was automatically installed and is no longer required:
  libnvidia-common-440
Use 'apt autoremove' to remove it.
0 upgraded, 0 newly installed, 0 to remove and 43 not upgraded.


In [0]:
import random
import operator
import os
import numpy as np
import pandas as pd

from deap import algorithms
from deap import base
from deap import creator
from deap import tools
from deap import gp
from datetime import datetime
from deap.tools import History

from IPython.display import display
import matplotlib.pyplot as plt

from sklearn.metrics import accuracy_score, recall_score, precision_score, fbeta_score
from sklearn import preprocessing

# Load and prepare data

In [0]:
import sys
# True when the `google.colab` package is already loaded in this interpreter,
# which is used below to pick between the Colab and the local data paths.
colab_mode = 'google.colab' in sys.modules

Fetch data

In [0]:
# Resolve where the CSV files live for the current environment; the actual
# loading is identical in both cases and is therefore done once, afterwards.
if colab_mode:
  from google.colab import drive

  drive.mount('/content/drive')
  base_folder = r'/content/drive/My Drive/colab_storage'
  train_set = os.path.join(base_folder, 'train.csv')
  vld_set = os.path.join(base_folder, 'validate.csv')
else:
  train_set = r"C:\school\evolutionary\ex2\train.csv"
  vld_set = r"C:\school\evolutionary\ex2\validate.csv"

# Both files are read without a header row, so columns are numbered 0..N-1.
print("loading trains set.")
rawdatadf = pd.read_csv(train_set, header=None)
print("loading VLD set.")
rawvlddf = pd.read_csv(vld_set, header=None)



Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
loading trains set.


Normalize data

In [0]:
def get_last_n_tuples_from_cols(n, cols, tuple_size=4):
    """Return the trailing ``n`` groups of ``tuple_size`` entries of ``cols``.

    Args:
        n: Number of trailing groups to keep. Must be non-negative.
        cols: Any sliceable sequence with a length (list, ``pandas.Index``, ...).
        tuple_size: Number of entries per group; defaults to 4.

    Returns:
        A slice of ``cols`` (same type as ``cols[a:]``) holding the last
        ``n * tuple_size`` entries, or all of ``cols`` when it is shorter
        than that. ``n == 0`` yields an empty slice.

    Raises:
        ValueError: If ``n`` is negative.
    """
    if n < 0:
        raise ValueError("n must be non-negative, got {}".format(n))
    # Compute an explicit start offset: the naive ``cols[-tuple_size * n:]``
    # degenerates to ``cols[0:]`` (everything) when n == 0.
    start = max(len(cols) - tuple_size * n, 0)
    return cols[start:]


# Column 0 of the raw frames is the label; of the remaining columns only the
# trailing 7 groups of 4 are used as features.
datacols = get_last_n_tuples_from_cols(7, rawdatadf.columns[1:])
datadf = pd.DataFrame(data=rawdatadf[datacols],
                      index=range(len(rawdatadf)),
                      columns=datacols)
vld_df = pd.DataFrame(data=rawvlddf[datacols],
                      index=range(len(rawvlddf)),
                      columns=datacols)

# Standardise every feature column with statistics taken from the TRAIN set
# only; the validation set is transformed with those same statistics.
mean_of_df = datadf.mean(axis=0)
std_of_df = datadf.std(axis=0)
datadf = (datadf - mean_of_df) / std_of_df
vld_df = (vld_df - mean_of_df) / std_of_df

# Divide each column by the largest absolute standardised train value so the
# train features end up inside [-1, 1].
normal = pd.Series(data=np.fmax(-datadf.min(axis=0), datadf.max(axis=0)),
                   index=datadf.columns)
datadf = datadf / normal
vld_df = vld_df / normal

# Re-attach the labels (first raw column) under a dedicated column name.
labelcol = 'LABEL'
labels = rawdatadf[rawdatadf.columns[0]]
vld_labels = rawvlddf[rawvlddf.columns[0]]
datadf[labelcol] = labels
vld_df[labelcol] = vld_labels

In [0]:
# Number of leading rows to preview from each set.
top_n = 14
print("TRAIN")
display(datadf.head(top_n))
print("")
print(f"TRAIN: Displaying top {top_n} out of {len(datadf)}")
print("\n\n")
print("VLD")
display(vld_df.head(top_n))
print("")
# This summary line describes the validation frame, so it is labelled VLD.
print(f"VLD: Displaying top {top_n} out of {len(vld_df)}")



Check with SVM as baseline

In [0]:
# Solve with SVM
from sklearn import svm

# print("Calculating SVM")
# clf = svm.LinearSVC()
# clf.fit(datadf[datacols], datadf[labelcol])
# predictions = clf.predict(datadf[datacols])
# true_results = datadf[labelcol]
# scoring_sr = pd.DataFrame(dtype=np.float, columns=['value'])
# scoring_sr.loc['Accuracy', 'value'] = accuracy_score(true_results, predictions)
# scoring_sr.loc['Recall', 'value'] = recall_score(true_results, predictions)
# scoring_sr.loc['precision', 'value'] = precision_score(true_results, predictions)
# scoring_sr.loc['F0.25', 'value'] = fbeta_score(true_results, predictions, beta=0.25)

# display(scoring_sr)
# print("DONE WITH SVM")

# Defining evolution parameters

Register primitives (operators) and terminals

In [0]:
def activation(a):
    """Squash ``a`` with NumPy's hyperbolic tangent and return the result."""
    squashed = np.tanh(a)
    return squashed


def neg(a):
    """Return the arithmetic negation of ``a``."""
    negated = -a
    return negated


def double(a):
    """Return ``a`` scaled by a factor of two."""
    factor = 2.0
    return factor * a


def half(a):
    """Return ``a`` scaled by a factor of one half."""
    factor = 0.5
    return factor * a

In [0]:
# Primitive set for the evolved expressions: 5 inputs named IN0..IN4.
pset = gp.PrimitiveSet("MAIN", 5, "IN")

# (callable, arity) pairs. Registration order is kept fixed so that a seeded
# run keeps drawing primitives in the same order.
for primitive, arity in (
        (operator.add, 2),
        (operator.sub, 2),
        (operator.mul, 2),
        (np.fmax, 2),
        (np.square, 1),
        (neg, 1),
        (double, 1),
        (half, 1),
        (activation, 1),
):
    pset.addPrimitive(primitive, arity)

# Constant terminals are registered as plain Python floats rather than
# np.float64: DEAP writes constants into the expression string via repr(),
# and NumPy >= 2 renders np.float64(1.0) as "np.float64(1.0)", which cannot be
# evaluated when the tree is compiled. A plain float always reprs as "1.0".
for constant in (1.0, 0.25, 0.5, 2.0, 3.0):
    pset.addTerminal(constant)

Support functions

In [0]:
def individual_to_function(individual):
    """Compile ``individual`` into a callable using the module-level ``toolbox``."""
    compiled = toolbox.compile(expr=individual)
    return compiled


def apply_function_to_df(func, dataset):
    """Fold ``func`` over ``dataset`` four columns at a time and threshold the result.

    Starting from a zero vector, ``func(residual, c1, c2, c3, c4)`` is called
    once per consecutive group of four columns (left to right); each call's
    return value becomes the ``residual`` passed to the next call. A trailing
    group with fewer than four columns is ignored.

    Args:
        func: Callable taking five positional arguments (running residual
            followed by four column Series).
        dataset: ``pandas.DataFrame`` holding only feature columns.

    Returns:
        An integer array/Series with one entry per row of ``dataset``:
        1 where the final residual is strictly positive, 0 otherwise.
    """
    n_rows, n_cols = dataset.shape
    residual = np.zeros(n_rows)
    # Columns are taken positionally from the frame that was passed in, so the
    # function does not depend on any module-level column list.
    for start in range(0, n_cols - 3, 4):
        group = [dataset.iloc[:, start + offset] for offset in range(4)]
        residual = func(residual, *group)
    # A constant expression yields a scalar (or 0-d array); broadcast it so the
    # output always has one entry per row.
    if np.ndim(residual) == 0:
        residual = np.full(n_rows, residual)

    results = (residual > 0.0).astype(int)
    return results


def fitness_function(individual, sample_size=10000):
    """Score ``individual`` by its F0.25 on a random sample of the training data.

    Increments the module-level ``evaluation_counter`` on every call.

    Args:
        individual: GP individual to compile and evaluate.
        sample_size: Maximum number of training rows to sample for this
            evaluation; defaults to 10000.

    Returns:
        A one-element tuple ``(fitness_score,)``.
    """
    global evaluation_counter
    evaluation_counter += 1

    # Sampling without replacement raises ValueError when more rows are
    # requested than exist, so cap the request at the size of the train set.
    n_rows = min(sample_size, len(datadf))
    dataset = datadf.sample(n=n_rows)
    dataset_X = dataset[datacols]
    dataset_y = dataset[labelcol]

    func = individual_to_function(individual)
    results = apply_function_to_df(func, dataset_X)

    # beta=0.25 weights precision more heavily than recall.
    fitness_score = fbeta_score(dataset_y, results, beta=0.25)
    return (fitness_score,)

In [0]:
# Timestamp of the most recent calc_time() call (or of this cell's execution).
calculations_time = datetime.now()


def calc_time(*args):
    """Return the time elapsed since the previous call and restart the clock.

    Any positional arguments are accepted and ignored. The module-level
    ``calculations_time`` is moved forward to the current time on every call.
    """
    global calculations_time
    previous_mark = calculations_time
    calculations_time = datetime.now()
    return calculations_time - previous_mark

Register hyper parameters

In [0]:
# Seeding is currently disabled, so consecutive runs are not reproducible.
# random.seed(10)
# Number of individuals created for the initial population.
population_size = 50
# Number of generations passed to the evolutionary algorithm (ngen).
number_of_generations = 1000

In [0]:
# History object; its decorator is attached to mate/mutate at the end of this cell.
history = History()

# Single-objective fitness with weight +1.0, attached to tree-shaped individuals.
creator.create("FitnessMax", base.Fitness, weights=(1.0,))
creator.create("Individual", gp.PrimitiveTree, fitness=creator.FitnessMax)

toolbox = base.Toolbox()
# Initial trees: generated with genFull using depth bounds 2..8.
toolbox.register("expr", gp.genFull, pset=pset, min_=2, max_=8)
toolbox.register("individual", tools.initIterate, creator.Individual, toolbox.expr)
toolbox.register("population", tools.initRepeat, list, toolbox.individual)
toolbox.register("compile", gp.compile, pset=pset)
toolbox.register("evaluate", fitness_function)
# Tournament selection with 10 contestants per tournament.
toolbox.register("select", tools.selTournament, tournsize=10)
toolbox.register("mate", gp.cxOnePoint)
# Replacement subtrees for mutation: generated with genGrow, depth bounds 0..8.
toolbox.register("expr_mut", gp.genGrow, min_=0, max_=8)
toolbox.register("mutate", gp.mutUniform, expr=toolbox.expr_mut, pset=pset)

# Bloat control: static limit of 10 on tree height for both variation operators.
toolbox.decorate("mate", gp.staticLimit(key=operator.attrgetter("height"), max_value=10))
toolbox.decorate("mutate", gp.staticLimit(key=operator.attrgetter("height"), max_value=10))

pop = toolbox.population(n=population_size)
# Hall of fame of size 1.
hof = tools.HallOfFame(1)
# Statistics are computed over the individuals' fitness values.
stats = tools.Statistics(lambda ind: ind.fitness.values)
stats.register("avg", np.mean)
stats.register("std", np.std)
stats.register("min", np.min)
stats.register("max", np.max)
# calc_time ignores its arguments and returns the time since its previous call.
stats.register("TIME", calc_time)

# Decorate the variation operators with the history decorator (applied after,
# i.e. outside of, the height limit above), then register the initial population.
toolbox.decorate("mate", history.decorator)
toolbox.decorate("mutate", history.decorator)
history.update(pop)

# Evolve

In [0]:
print("Starting evolution.")
# Reset the counter that fitness_function increments on every evaluation.
evaluation_counter = 0
start_time = datetime.now()
# Restart the clock used by calc_time so the first TIME entry starts from here.
calculations_time = datetime.now()
# eaSimple with crossover probability 0.8 and mutation probability 0.15; the
# first returned value is discarded, the logbook is kept for plotting.
_, logbook = algorithms.eaSimple(pop, toolbox, cxpb=0.8, mutpb=0.15, ngen=number_of_generations, stats=stats,
                                 halloffame=hof)


# Results

In [0]:
# NOTE(review): `record` is not read anywhere in the visible cells; compiling the
# stats presumably also invokes the registered calc_time — confirm it is needed.
record = stats.compile(pop)
# Wall-clock time since start_time was taken in the evolution cell.
duration = datetime.now() - start_time
print("Run concluded.")
print("Evaluations commited: {}".format(evaluation_counter))
print("Run time: {}".format(duration))
# The single entry of the hall of fame, and its compiled callable.
winner_creature = hof.items[0]
winner_function = individual_to_function(winner_creature)

In [0]:
# Graph description of the winning tree, consumed by the drawing cell below.
# NOTE: this rebinds `labels`, which earlier held the training label column.
nodes, edges, labels = gp.graph(winner_creature)

In [0]:
import matplotlib.pyplot as plt
import networkx as nx
from networkx.drawing.nx_agraph import graphviz_layout
# NOTE: this changes matplotlib's default figure size globally, so it also
# applies to later figures that do not set their own size.
plt.rcParams["figure.figsize"] = (50, 40)

# Build a graph from the winner's nodes and edges and lay it out with
# Graphviz's "dot" program.
g = nx.Graph()
g.add_nodes_from(nodes)
g.add_edges_from(edges)
pos = graphviz_layout(g, prog="dot")


# Draw nodes, edges and the per-node labels at the computed positions.
nx.draw_networkx_nodes(g, pos, node_size=1600)
nx.draw_networkx_edges(g, pos)
nx.draw_networkx_labels(g, pos, labels, font_size=25)
plt.show()

Analyze and display the per-generation statistics

In [0]:
# Per-generation series recorded in the logbook during evolution.
generations_idx = logbook.select('gen')
generations_avg = logbook.select('avg')
generations_std = logbook.select('std')
generations_max = logbook.select('max')
generations_min = logbook.select('min')

plt.figure(figsize=(20, 10))
# Average fitness with one standard deviation as error bars.
plt.errorbar(generations_idx, generations_avg, yerr=generations_std, fmt='-o', label='AVG')
# All three series share the same explicit x values so they stay aligned.
plt.plot(generations_idx, generations_max, '-o', label='BEST', color='black')
plt.plot(generations_idx, generations_min, '-o', label='WORST', color='red')
plt.grid()
plt.legend()
plt.xlabel('Generations')
# The fitness being plotted is the F0.25 score, which lies in [0, 1].
plt.ylabel('Fitness (F0.25 score)')
plt.ylim(0, 1)
plt.show()

Evaluate the winner on the entire training set

In [0]:
# Score the winning function on the full (unsampled) training set.
predictions = apply_function_to_df(winner_function, datadf[datacols])
true_results = datadf[labelcol]

# The builtin `float` is used as dtype: the `np.float` alias no longer exists
# in current NumPy releases and raises AttributeError there.
scoring_sr = pd.DataFrame(dtype=float, columns=['value'])
scoring_sr.loc['Accuracy', 'value'] = accuracy_score(true_results, predictions)
scoring_sr.loc['Recall', 'value'] = recall_score(true_results, predictions)
scoring_sr.loc['precision', 'value'] = precision_score(true_results, predictions)
scoring_sr.loc['F0.25', 'value'] = fbeta_score(true_results, predictions, beta=0.25)

# Share of rows predicted as class 0 / class 1.
print("Labels balance:")
print("0: {:>.3f}".format(1 - predictions.mean()))
print("1: {:>.3f}".format(predictions.mean()))
display(scoring_sr)

Evaluate the winner on the validation (VLD) set

In [0]:
# Score the winning function on the held-out validation set.
predictions = apply_function_to_df(winner_function, vld_df[datacols])
true_results = vld_df[labelcol]

# The builtin `float` is used as dtype: the `np.float` alias no longer exists
# in current NumPy releases and raises AttributeError there.
scoring_sr = pd.DataFrame(dtype=float, columns=['value'])
scoring_sr.loc['Accuracy', 'value'] = accuracy_score(true_results, predictions)
scoring_sr.loc['Recall', 'value'] = recall_score(true_results, predictions)
scoring_sr.loc['precision', 'value'] = precision_score(true_results, predictions)
scoring_sr.loc['F0.25', 'value'] = fbeta_score(true_results, predictions, beta=0.25)

# Share of rows predicted as class 0 / class 1.
print("Labels balance:")
print("0: {:>.3f}".format(1 - predictions.mean()))
print("1: {:>.3f}".format(predictions.mean()))

display(scoring_sr)