In [None]:
from niapy.algorithms.basic import (
    BatAlgorithm,
    FireflyAlgorithm,
    ParticleSwarmAlgorithm
)
from niapy.runner import Runner
from niapy.problems.ackley import Ackley
from niapy.problems.sphere import Sphere
from niapy.problems.rastrigin import Rastrigin
from sklearn.decomposition import PCA
import torch
from torch import nn
from PIL import Image
from matplotlib import pyplot as plt
import os
import pygad
import numpy as np
import pandas as pd
from tools.ml_tools import get_data_loaders, nn_test, nn_train, LSTM
from util.optimization_data import SingleRunData
from tools.optimization_tools import optimization_runner
from tools.meta_ga import MetaGA

from util.constants import (
    RNG_SEED,
    BATCH_SIZE,
    DATASET_PATH,
    EPOCHS,
    POP_SIZE,
    MAX_ITERS,
    NUM_RUNS,
    OPTIMIZATION_PROBLEM,
    GENE_SPACES,
    POP_DIVERSITY_METRICS,
    INDIV_DIVERSITY_METRICS,
    N_PCA_COMPONENTS,
    LSTM_NUM_LAYERS,
    LSTM_HIDDEN_DIM,
    LSTM_DROPOUT,
    VAL_SIZE,
    TEST_SIZE,
)

# Switch for the LSTM section of this notebook: when True the model is trained
# with nn_train; when False a previously saved model is loaded from disk instead.
execute_training = True

### Optimization

In [None]:
# Restore the two archived meta-GA runs (paths suggest WVCPSO and FA on Trid).
# NOTE(review): pygad.load unpickles the file, so only load archives you trust.
ga_instance_0, ga_instance_1 = (
    pygad.load(archive_path)
    for archive_path in (
        "./archive/2024-06-10_06.55.36_WVCPSO_Trid/meta_ga_obj",
        "./archive/2024-06-10_07.08.39_FA_Trid/meta_ga_obj",
    )
)

In [None]:
# Gene plots for the first archived GA: best solutions first, then all solutions.
for solutions_mode in ("best", "all"):
    ga_instance_0.plot_genes(solutions=solutions_mode)
ga_instance_0.plot_new_solution_rate()

# Last recorded best solution and its fitness.
final_best_solution_0 = ga_instance_0.best_solutions[-1]
print(final_best_solution_0)
final_best_fitness_0 = ga_instance_0.best_solutions_fitness[-1]
print(final_best_fitness_0)

In [None]:
# Gene plots for the second archived GA: best solutions first, then all solutions.
for solutions_mode in ("best", "all"):
    ga_instance_1.plot_genes(solutions=solutions_mode)
ga_instance_1.plot_new_solution_rate()

# Last recorded best solution and its fitness.
final_best_solution_1 = ga_instance_1.best_solutions[-1]
print(final_best_solution_1)
final_best_fitness_1 = ga_instance_1.best_solutions_fitness[-1]
print(final_best_fitness_1)

In [None]:
# Join the last best solution of both GA runs into one gene vector and decode
# it into algorithm objects.
# NOTE(review): the concatenation order (instance 0, then instance 1) presumably
# has to match the layout of GENE_SPACES - confirm.
final_best_solutions = [
    ga_instance_0.best_solutions[-1],
    ga_instance_1.best_solutions[-1],
]
combined_solution = np.concatenate(final_best_solutions)
algorithms = MetaGA.solution_to_algorithm_attributes(combined_solution, GENE_SPACES, POP_SIZE)

In [None]:

# Hand-configured problem/algorithms kept for manual experiments; nothing in
# this cell passes them to optimization_runner.
test_problem = Ackley(dimension=20)
test_algorithms = [
    FireflyAlgorithm(
        population_size=POP_SIZE,
        alpha=1.0,
        beta0=1.0,
        gamma=0.0,
        theta=0.99,
    ),
    ParticleSwarmAlgorithm(
        population_size=POP_SIZE,
        c1=0.5,
        c2=0.5,
        w=0.75,
        min_velocity=-np.inf,
        max_velocity=np.inf,
    ),
]

problem = OPTIMIZATION_PROBLEM

# Settings shared by every optimization run started below.
runner_settings = dict(
    problem=problem,
    runs=1,
    dataset_path=DATASET_PATH,
    pop_diversity_metrics=POP_DIVERSITY_METRICS,
    indiv_diversity_metrics=INDIV_DIVERSITY_METRICS,
    max_iters=MAX_ITERS,
    rng_seed=RNG_SEED,
    keep_pop_data=True,
    parallel_processing=False,
)

# One single-run optimization per algorithm decoded from the GA solutions.
for algorithm in algorithms:
    optimization_runner(algorithm=algorithm, **runner_settings)

### Population diversity metrics comparison

In [None]:
# Plot the normalized population diversity metrics of the first stored run
# (alphabetically) for every algorithm/problem directory under DATASET_PATH.
for algorithm in os.listdir(DATASET_PATH):
    for problem in os.listdir(os.path.join(DATASET_PATH, algorithm)):
        runs = os.listdir(os.path.join(DATASET_PATH, algorithm, problem))
        runs.sort()
        if not runs:
            # Nothing stored for this pair; runs[0] would raise IndexError.
            continue
        run_path = os.path.join(DATASET_PATH, algorithm, problem, runs[0])
        # Load the run once and reuse the object instead of parsing the same
        # JSON file a second time just to read the metrics.
        srd = SingleRunData.import_from_json(run_path)
        pop_metrics = srd.get_pop_diversity_metrics_values(normalize=True)
        ax = pop_metrics.plot(title=" ".join([algorithm, problem]), figsize=(20,5), fontsize=13, logy=True)
        ax.set_xlabel(xlabel="Iterations", fontdict={'fontsize':13})
        ax.set_ylabel(ylabel="Value", fontdict={'fontsize':13})

In [None]:
# Dataset sub-directory names of the algorithms compared in this section.
# NOTE(review): this rebinds `algorithms`, which the optimization cell iterates
# over as algorithm objects; re-run the cell that decodes the GA solutions
# before re-running the optimization cell.
algorithms = ['FA', 'WVCPSO']
line_styles = ['-g', ':g', '--g', '-.g', '-b', ':b', '--b', '-.b']
_line_styles = ['-g', '-b', '-r', '-k', ':g', ':b', ':r', ':k']

# Give each "<algorithm>_<metric>" column its own line style. Only the first
# four metrics per algorithm receive one, so the eight styles are enough for
# the two selected algorithms.
style = {}
style_idx = 0
for algorithm in os.listdir(DATASET_PATH):
    if algorithm in algorithms:
        for _, metric in zip(range(4), POP_DIVERSITY_METRICS):
            column_name = '_'.join([algorithm, metric.value])
            style[column_name] = line_styles[style_idx]
            style_idx += 1

# Collect, per problem, the raw (non-normalized) metric series of the first
# stored run of every selected algorithm.
metrics_by_problem = {}
for algorithm in os.listdir(DATASET_PATH):
    if algorithm in algorithms:
        algorithm_dir = os.path.join(DATASET_PATH, algorithm)
        for problem in os.listdir(algorithm_dir):
            problem_dir = os.path.join(algorithm_dir, problem)
            runs = sorted(os.listdir(problem_dir))
            run_path = os.path.join(problem_dir, runs[0])
            run = SingleRunData.import_from_json(run_path)
            pop_metrics = run.get_pop_diversity_metrics_values(normalize=False)
            for metric in POP_DIVERSITY_METRICS:
                key = '_'.join([algorithm, metric.value])
                series = pop_metrics.get(metric.value).to_list()
                metrics_by_problem.setdefault(problem, {})[key] = series

# One log-scale line chart per problem with all collected columns.
for problem, metrics in metrics_by_problem.items():
    df_metrics = pd.DataFrame.from_dict(metrics)
    df_metrics.plot(style=style, figsize=(25, 7), logy=True, title=problem, xlabel="Iterations", fontsize=13)

### Best fitness value convergence comparison

In [None]:
# Gather the best-fitness convergence of the first stored run for every
# algorithm/problem pair, tracking the longest series seen overall.
convergences = {}
max_len = 0
for algorithm in os.listdir(DATASET_PATH):
    algorithm_dir = os.path.join(DATASET_PATH, algorithm)
    for problem in os.listdir(algorithm_dir):
        problem_dir = os.path.join(algorithm_dir, problem)
        runs = sorted(os.listdir(problem_dir))
        run_path = os.path.join(problem_dir, runs[0])
        run = SingleRunData.import_from_json(run_path)
        print(f"best fitness {algorithm} - {problem}: {run.best_fitness}")
        #print(f"best solution {algorithm} - {problem}: {run.best_solution}")
        convergence = run.get_best_fitness_values(normalize=False)
        max_len = max(max_len, len(convergence))
        convergences.setdefault(problem, {})[algorithm] = convergence

# Pad every series with its final value up to max_len (the maximum over all
# problems, not per problem) so the columns line up, then plot on a log scale.
for problem, convergence_dict in convergences.items():
    for key, convergence in convergence_dict.items():
        missing = max_len - len(convergence)
        convergence_dict[key] = np.append(convergence, [convergence[-1]] * missing)

    convergence_dict = pd.DataFrame.from_dict(convergence_dict)
    convergence_dict.plot(title=problem, figsize=(25, 7), logy=True)

### Individual diversity metrics comparison

In [None]:
# Collect, per problem, the raw individual diversity metric values of the first
# stored run of every selected algorithm. `algorithms` is the list of dataset
# directory names defined in the population-metrics comparison cell.
metrics_by_problem = {}
for algorithm in os.listdir(DATASET_PATH):
    if algorithm not in algorithms:
        continue
    for problem in os.listdir(os.path.join(DATASET_PATH, algorithm)):
        runs = os.listdir(os.path.join(DATASET_PATH, algorithm, problem))
        runs.sort()
        run_path = os.path.join(DATASET_PATH, algorithm, problem, runs[0])
        run = SingleRunData.import_from_json(run_path)
        indiv_metrics = run.get_indiv_diversity_metrics_values(normalize=False)
        for metric in INDIV_DIVERSITY_METRICS:
            key = '_'.join([algorithm, metric.value])
            metrics_by_problem.setdefault(problem, {})[key] = indiv_metrics.get(metric.value).to_list()


# One figure per problem with one box-plot panel per metric, comparing the
# selected algorithms side by side.
for problem in metrics_by_problem:
    metrics = metrics_by_problem[problem]
    df_metrics = pd.DataFrame.from_dict(metrics)

    fig, axes = plt.subplots(1, len(INDIV_DIVERSITY_METRICS), figsize=(25, 6))
    # plt.subplots returns a bare Axes (not an array) when only one panel is
    # requested; normalise so axes[idx] works for any number of metrics.
    axes = np.atleast_1d(axes)
    fig.suptitle(problem, fontsize=23)

    for idx, metric in enumerate(INDIV_DIVERSITY_METRICS):
        # Columns are named "<algorithm>_<metric value>". Match that exact
        # suffix rather than an unanchored regex search, which would also pick
        # up columns whose algorithm or metric name merely contains the value.
        suffix = '_' + metric.value
        metric_columns = [column for column in df_metrics.columns if column.endswith(suffix)]
        # Strip the suffix so each box is labelled with the algorithm name only.
        df_selected_metric = df_metrics[metric_columns].rename(
            columns=lambda column: column[:-len(suffix)]
        )
        df_selected_metric.plot(ax=axes[idx], kind="box", logy=False, title=metric.name, fontsize=13)

In [None]:
# For the first stored run (alphabetically) of every algorithm/problem pair:
# bar-plot the normalized individual diversity metrics and fit a PCA on them.
for algorithm in os.listdir(DATASET_PATH):
    for problem in os.listdir(os.path.join(DATASET_PATH, algorithm)):
        runs = os.listdir(os.path.join(DATASET_PATH, algorithm, problem))
        runs.sort()
        if not runs:
            # Nothing stored for this pair; runs[0] would raise IndexError.
            continue
        run_path = os.path.join(DATASET_PATH, algorithm, problem, runs[0])
        # Load the run once and reuse the object instead of parsing the same
        # JSON file a second time just to read the metrics.
        srd = SingleRunData.import_from_json(run_path)
        indiv_metrics = srd.get_indiv_diversity_metrics_values(normalize=True)
        indiv_metrics.plot(title=" ".join([algorithm, problem]), figsize=(25, 7), kind="bar", logy=True)

        # PCA over the metric matrix. The results are overwritten on every
        # iteration, so after the loop they describe the last pair only.
        indiv_metrics = indiv_metrics.to_numpy()
        pca = PCA(n_components=N_PCA_COMPONENTS)
        principal_components = pca.fit_transform(indiv_metrics)
        variance = pca.explained_variance_ratio_

### Diversity metrics Euclidean distance

In [None]:
# Load the first stored run (alphabetically) of every algorithm/problem pair.
# os.listdir gives no ordering guarantee, so which runs land at index 0 and 1
# depends on the directory listing order.
srd = []
for algorithm in os.listdir(DATASET_PATH):
    algorithm_dir = os.path.join(DATASET_PATH, algorithm)
    for problem in os.listdir(algorithm_dir):
        problem_dir = os.path.join(algorithm_dir, problem)
        runs = sorted(os.listdir(problem_dir))
        run_path = os.path.join(problem_dir, runs[0])
        srd.append(SingleRunData.import_from_json(run_path))

# Distance between the first two loaded runs, without and then with the
# fitness convergence included.
first_run, second_run = srd[0], srd[1]
print(first_run.diversity_metrics_euclidean_distance(second_run))
print(first_run.diversity_metrics_euclidean_distance(second_run, include_fitness_convergence=True))

### LSTM training and test

In [None]:
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device_name = "cuda" if torch.cuda.is_available() else "cpu"
device = torch.device(device_name)
print(device)
# Number of CPUs reported by the operating system.
cpu_count = os.cpu_count()
print("CPUs: ", cpu_count)

In [None]:
# Build the train/validation/test loaders for the configured optimization
# problem; the fourth returned item is the collection of class labels.
data_loaders = get_data_loaders(
    dataset_path=DATASET_PATH,
    batch_size=BATCH_SIZE,
    val_size=VAL_SIZE,
    test_size=TEST_SIZE,
    n_pca_components=N_PCA_COMPONENTS,
    problems=[OPTIMIZATION_PROBLEM.name()],
    random_state=RNG_SEED,
)
train_data_loader, val_data_loader, test_data_loader, labels = data_loaders

# Peek at one training batch to size the network inputs.
# NOTE(review): assumes pop_features has at least 3 dims and indiv_features at
# least 2 (indices 2 and 1 are read below) - confirm the exact layout.
first_batch = next(iter(train_data_loader))
pop_features, indiv_features, target = first_batch
lstm_input_dim = np.shape(pop_features)[2]
lstm_aux_input_dim = np.shape(indiv_features)[1]

model = LSTM(
    input_dim=lstm_input_dim,
    aux_input_dim=lstm_aux_input_dim,
    num_labels=len(labels),
    hidden_dim=LSTM_HIDDEN_DIM,
    num_layers=LSTM_NUM_LAYERS,
    dropout=LSTM_DROPOUT
)

optimizer = torch.optim.Adam(model.parameters(), lr=1e-5)
loss_fn = nn.CrossEntropyLoss()
model_filename = "./lstm_model.pt"

if not execute_training:
    # Reuse a previously saved model instead of training.
    # NOTE(review): torch.load unpickles the file (only load trusted files) and
    # the result replaces `model` entirely - presumably nn_train saved the whole
    # module to model_filename; confirm.
    model = torch.load(model_filename, map_location=torch.device(device))
    model.to(device)
    # Show the stored loss curve when one is present in the working directory.
    if os.path.exists('loss_plot.png'):
        loss_plot = np.asarray(Image.open('loss_plot.png'))
        plt.axis("off")
        plt.imshow(loss_plot)
else:
    model.to(device)
    nn_train(
        model=model,
        train_data_loader=train_data_loader,
        val_data_loader=val_data_loader,
        epochs=EPOCHS,
        loss_fn=loss_fn,
        optimizer=optimizer,
        device=device,
        model_filename=model_filename,
        verbal=True,
    )

In [None]:
# Run nn_test for the model on the test data loader, passing the class labels
# and requesting a classification report.
nn_test(model, test_data_loader, device, labels=labels, show_classification_report=True)