In [1]:
# Inclusive bounds on how many entries a randomly generated integer list may have.
MIN_LENGTH, MAX_LENGTH = 0, 8
# Inclusive bounds on each randomly generated integer in such a list.
MIN_NEURONS, MAX_NEURONS = 2, 8
# Seed value, presumably for making the stochastic search reproducible.
RANDOM_STATE = 42
# Directory the train/test CSV files are read from.
DATA_BASE_DIR = "../data/"

In [2]:
import os
import dataclasses as dc
import random
import warnings
from functools import cached_property, partial

import pandas as pd
import spacy
import numpy as np
from deap import algorithms, base, creator, tools
from scipy.stats import randint
from sklearn.metrics import classification_report
from sklearn.ensemble import RandomForestClassifier
from tqdm.contrib.concurrent import process_map

ModuleNotFoundError: No module named 'deap'

In [None]:
def float_field(start: float, end: float):
    """Create a dataclass field whose default is a freshly drawn random float.

    Each time a default is needed, ``random.random()`` (a value in ``[0, 1)``)
    is scaled by ``end - start`` and shifted by ``start``.

    Args:
        start: Lower end of the interval.
        end: Upper end of the interval.

    Returns:
        The ``dataclasses.field`` object carrying that ``default_factory``.
    """
    return dc.field(
        default_factory=lambda: start + random.random() * (end - start)
    )


def list_field() -> list[int]:
    """Create a dataclass field whose default is a random list of integers.

    The list length is drawn first, from ``MIN_LENGTH..MAX_LENGTH``; every
    entry is then drawn from ``MIN_NEURONS..MAX_NEURONS``. Both ranges are
    inclusive because ``random.randint`` is used.

    Returns:
        The ``dataclasses.field`` object carrying that ``default_factory``.
    """

    def random_int_list() -> list[int]:
        length = random.randint(MIN_LENGTH, MAX_LENGTH)
        values: list[int] = []
        for _ in range(length):
            values.append(random.randint(MIN_NEURONS, MAX_NEURONS))
        return values

    return dc.field(default_factory=random_int_list)


def int_field(start: int, end: int):
    """Create a dataclass field whose default is a freshly drawn random integer.

    The bounds are handed to ``random.randint``, so both ends are inclusive
    and both must be integers (hence the ``int`` annotations).

    Args:
        start: Smallest value that can be drawn.
        end: Largest value that can be drawn.

    Returns:
        The ``dataclasses.field`` object carrying that ``default_factory``.
    """

    def factory() -> int:
        return random.randint(start, end)

    return dc.field(default_factory=factory)


def str_field(*options: str) -> str:
    """Create a dataclass field whose default is picked at random from ``options``.

    Args:
        *options: Candidate values; ``random.choice`` selects one each time a
            default is needed.

    Returns:
        The ``dataclasses.field`` object carrying that ``default_factory``.
    """

    def pick_option() -> str:
        return random.choice(options)

    return dc.field(default_factory=pick_option)


@dc.dataclass(slots=True)
class Individual:
    n_estimators: int = randint(10, 200)
    max_depth: int = randint(1, 20)
    min_samples_split: int = randint(2, 20)
    min_samples_leaf: int = randint(1, 20)
    bootstrap: bool = [True, False]
    criterion: str = ["gini", "entropy"]

    @cached_property
    def model(self):
        return RandomForestClassifier(
            n_estimators=randint(10, 200),
            max_depth=randint(1, 20),
            min_samples_split=randint(2, 20),
            min_samples_leaf=randint(1, 20),
            bootstrap=[True, False],
            criterion=["gini", "entropy"],
        )

    def mutate(self, indpb: float = 0.2):
        "Take one field and re-generate i'ts value"
        fields = dc.fields(self)
        if random.random() < indpb:
            field = random.choice(fields)
            setattr(self, field.name, field.default_factory())
        return self

    def mate_onepoint(self, other):
        field_names = [field.name for field in dc.fields(self)]
        point = random.randint(0, len(field_names))
        child1 = type(self)(
            **{
                attr: getattr(self if i < point else other, attr)
                for i, attr in enumerate(field_names)
            }
        )
        child2 = type(self)(
            **{
                attr: getattr(other if i < point else self, attr)
                for i, attr in enumerate(field_names)
            }
        )
        return child1, child2

    def mate_oneattr(self, other):
        field_names = [field.name for field in dc.fields(self)]
        point = random.randint(0, len(field_names))
        child1 = type(self)(
            **{
                attr: getattr(self if i == point else other, attr)
                for i, attr in enumerate(field_names)
            }
        )
        child2 = type(self)(
            **{
                attr: getattr(other if i == point else self, attr)
                for i, attr in enumerate(field_names)
            }
        )
        return child1, child2

    def evaluate(self, x_train, x_test, y_train, y_test):
        with warnings.catch_warnings(action="ignore"):
            ypred = self.model.fit(x_train, y_train).predict(x_test)
        report = classification_report(y_test, ypred, output_dict=True)
        return report

In [None]:
# Two-objective fitness type; in DEAP a negative weight marks an objective
# that is to be minimised, so both objectives are minimised here.
creator.create("FitnessMin", base.Fitness, weights=(-1.0, -1.0))
# DEAP individual type derived from Individual; passing the FitnessMin class
# as `fitness` gives every created instance its own fitness object.
creator.create("Individual", Individual, fitness=creator.FitnessMin)

In [None]:
def _encode_texts(nlp, frame):
    """Return a float32 matrix with one spaCy document vector per ``frame["input"]`` entry."""
    return np.array(
        [doc.vector for doc in nlp.pipe(frame["input"])], dtype=np.float32
    )


def _fitness_from_report(individual, x_train, x_test, y_train, y_test):
    """Evaluate ``individual`` and reduce its report to the two minimised objectives.

    Kept at module level so the ``partial`` built from it can be sent to the
    worker processes used by ``process_map``.
    """
    report = individual.evaluate(x_train, x_test, y_train, y_test)
    # NOTE(review): the second objective ("Size") is taken to be the number of
    # trees — confirm this is the intended model-size measure.
    return 1.0 - report["accuracy"], float(individual.n_estimators)


def _mutate_individual(individual, indpb):
    """Adapt ``Individual.mutate`` to DEAP, which unpacks a one-element tuple."""
    return (individual.mutate(indpb),)


def run(ngen: int, population: int, halloffame: int = 5):
    """Run the genetic search and print the log and the best individuals.

    Loads the train/test CSV files from ``DATA_BASE_DIR``, encodes the
    ``input`` column with spaCy document vectors, evolves ``creator.Individual``
    instances with ``algorithms.eaSimple`` and prints the results.

    Args:
        ngen: Number of generations passed to ``eaSimple``.
        population: Number of individuals in the initial population.
        halloffame: Number of best individuals kept in the hall of fame.
    """
    print("Carregando Dataset")

    nlp = spacy.load("en_core_web_lg")
    X_train = pd.read_csv(os.path.join(DATA_BASE_DIR, "X_train.csv"))
    X_test = pd.read_csv(os.path.join(DATA_BASE_DIR, "X_test.csv"))
    y_train = pd.read_csv(os.path.join(DATA_BASE_DIR, "y_train.csv"))
    y_test = pd.read_csv(os.path.join(DATA_BASE_DIR, "y_test.csv"))

    X_train_encoded = _encode_texts(nlp, X_train)
    X_test_encoded = _encode_texts(nlp, X_test)

    # The fitness has two weights, so evaluation must yield a 2-tuple rather
    # than the raw classification report.
    evaluate = partial(
        _fitness_from_report,
        x_train=X_train_encoded,
        x_test=X_test_encoded,
        y_train=y_train,
        y_test=y_test,
    )

    print("Construindo Toolbox")
    toolbox = base.Toolbox()
    toolbox.register("individual", creator.Individual)
    toolbox.register("population", tools.initRepeat, list, toolbox.individual)
    # Individuals are dataclass instances, not sequences, so DEAP's list-based
    # operators cannot be used; the class's own operators are registered.
    toolbox.register("mate", Individual.mate_onepoint)
    toolbox.register("mutate", _mutate_individual, indpb=0.2)
    toolbox.register("select", tools.selTournament, tournsize=5)
    toolbox.register("evaluate", evaluate)
    toolbox.register("map", process_map, max_workers=6, chunksize=8, ncols=80)

    pop = toolbox.population(n=population)
    hof = tools.HallOfFame(halloffame)
    stats = tools.Statistics()
    stats.register("min", lambda pop: min(ind.fitness.values for ind in pop))

    print("Iniciando Algoritmo")
    pop, log = algorithms.eaSimple(
        pop, toolbox, cxpb=0.2, mutpb=0.2, ngen=ngen, halloffame=hof, stats=stats
    )
    print(log)
    for ind in hof:
        error, size = ind.fitness.values
        print("Error: %s, Size: %s" % (error, size))
    best = hof[0]
    # The first objective is the error, so accuracy is its complement.
    print(
        "Best model parameters: %s - accuracy: %f"
        % (best, 1.0 - best.fitness.values[0])
    )

In [None]:
# Seed Python's `random` module so the random draws made through it (initial
# population, selection, crossover, mutation) repeat between runs.
# NOTE: the forests are built without a `random_state`, so the fitness values
# themselves can still vary from run to run.
random.seed(RANDOM_STATE)
run(ngen=48, population=24)