In [None]:
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from experiments.datasets import DATASETS
import numpy as np
import plotly.express as px

# One scaler object is reused; each dataset gets its own fit_transform call below.
scaler = MinMaxScaler()

DATASET_NAMES = ["truck_driving_data", "atmosphere_data"]

# Legend labels shown in the histogram for each dataset name.
DATASET_NAME_TO_DISPLAY_NAME = dict(
    truck_driving_data="Truck driving",
    atmosphere_data="Atmosphere",
)

# Pick the first entry of DATASETS matching each requested name
# (an IndexError is raised when a name has no match).
first_dataset, second_dataset = (
    [candidate for candidate in DATASETS if candidate.name == wanted_name][0]
    for wanted_name in DATASET_NAMES
)

# Flatten each dataset to a single column, min-max scale it on its own,
# then flatten the result back to one dimension.
first_dataset_scaled_data, second_dataset_scaled_data = (
    scaler.fit_transform(dataset.data.reshape(-1, 1)).reshape(-1)
    for dataset in (first_dataset, second_dataset)
)

# Long-format frame: one row per observation, labelled with its dataset's display name.
data = pd.DataFrame(
    {
        "values": np.concatenate(
            [first_dataset_scaled_data, second_dataset_scaled_data]
        ),
        "Dataset": [
            DATASET_NAME_TO_DISPLAY_NAME.get(dataset.name)
            for dataset in (first_dataset, second_dataset)
            for _ in range(len(dataset.data))
        ],
    }
)

# Overlaid, probability-normalised histograms of both scaled datasets.
fig = px.histogram(
    data_frame=data,
    x="values",
    color="Dataset",
    histnorm="probability",
    barmode="overlay",
    nbins=100,
    height=600,
    width=1000,
)
fig.show()

In [None]:
import pandas as pd
from pflacco.classical_ela_features import *
from pflacco.sampling import create_initial_sample
from umap import UMAP
from distribution_optimization_py.problem import (
    ScaledGaussianMixtureProblemForELA,
    LinearlyScaledGaussianMixtureProblem,
)
import random
import os
from sklearn.pipeline import make_pipeline
from sklearn.feature_selection import SelectKBest, f_classif
from sklearn.metrics import confusion_matrix
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import MinMaxScaler



# Seed used for the `random` and NumPy global RNGs and passed to the pflacco
# sampling / information-content calls in `get_ela_features`.
SEED = 42
# Forwarded to `create_initial_sample` as `sample_coefficient`
# (presumably the number of sample points per dimension — confirm against pflacco).
SAMPLE_COEFFICIENT = 250


def get_ela_features(problem):
    """Compute a single-row DataFrame of ELA features for ``problem``.

    The global ``random`` and NumPy RNGs are re-seeded with ``SEED``, an
    initial sample is drawn in ``[-5, 5]`` for each of the
    ``len(problem.lower)`` dimensions, and ``problem`` is evaluated on every
    sample row. The feature groups returned by the pflacco ``calculate_*``
    functions are then merged into one row, together with ``dim`` and ``seed``.

    Args:
        problem: Callable objective exposing a ``lower`` attribute whose
            length gives the search-space dimension. It is called with one
            sample row at a time.

    Returns:
        A ``pd.DataFrame`` with exactly one row (index ``0``) holding all
        computed features plus the ``dim`` and ``seed`` columns.
    """
    # Re-seed both global RNGs on every call.
    np.random.seed(SEED)
    random.seed(SEED)

    dim = len(problem.lower)
    sample = create_initial_sample(
        dim,
        sample_coefficient=SAMPLE_COEFFICIENT,
        lower_bound=-5,
        upper_bound=5,
        seed=SEED,
    )
    objective_values = sample.apply(lambda point: problem(point), axis=1)

    # The call order is kept as-is in case any of these consume global RNG state.
    meta_features = calculate_ela_meta(sample, objective_values)
    distribution_features = calculate_ela_distribution(sample, objective_values)
    pca_features = calculate_pca(sample, objective_values)
    level_features = calculate_ela_level(sample, objective_values)
    nbc_features = calculate_nbc(sample, objective_values)
    dispersion_features = calculate_dispersion(sample, objective_values)
    information_content_features = calculate_information_content(
        sample, objective_values, seed=SEED
    )

    # Merge the groups in a fixed order; this order defines the column order.
    feature_row = {}
    for feature_group in (
        information_content_features,
        meta_features,
        distribution_features,
        nbc_features,
        dispersion_features,
        pca_features,
        level_features,
        {"dim": dim},
        {"seed": SEED},
    ):
        feature_row.update(feature_group)

    return pd.DataFrame(feature_row, index=[0])


def calculate_gaussian_ela_features(
    calculate_linearly_scaled: bool = False,
) -> pd.DataFrame:
    """Compute ELA features for the Gaussian mixture problem of every dataset.

    For each entry of ``DATASETS`` a ``ScaledGaussianMixtureProblemForELA`` is
    built and passed to ``get_ela_features``; its row is labelled with
    ``fid = "<dataset name> *"``. When ``calculate_linearly_scaled`` is true, a
    ``LinearlyScaledGaussianMixtureProblem`` on ``[-5, 5]`` is evaluated as
    well and labelled with the plain dataset name. Every row gets ``iid = 1``.

    Args:
        calculate_linearly_scaled: Also compute features for the linearly
            scaled variant of each problem.

    Returns:
        One DataFrame with a row per evaluated problem, indexed ``0..n-1``.
    """
    ela_features_dfs = []

    for dataset in DATASETS:
        scaled_problem = ScaledGaussianMixtureProblemForELA(
            dataset.data, dataset.nr_of_modes
        )
        ela_features_dfs.append(
            get_ela_features(scaled_problem).assign(fid=f"{dataset.name} *", iid=1)
        )
        if calculate_linearly_scaled:
            problem = LinearlyScaledGaussianMixtureProblem(
                dataset.data, dataset.nr_of_modes, lower=-5, upper=5
            )
            ela_features_dfs.append(
                get_ela_features(problem).assign(fid=dataset.name, iid=1)
            )

    # Each per-problem frame has index 0. Renumbering gives the fresh result the
    # same default integer index as the copy that is written to CSV without its
    # index and read back later.
    return pd.concat(ela_features_dfs, ignore_index=True)


# Cache file for the Gaussian mixture ELA features, relative to the working directory.
GAUSSIAN_ELA_FEATURES_CSV = "gaussian_ela_features.csv"

# Reuse the cached features when the CSV exists as a regular file; otherwise
# compute them once and store them for later runs.
if os.path.isfile(GAUSSIAN_ELA_FEATURES_CSV):
    gaussian_features_df = pd.read_csv(GAUSSIAN_ELA_FEATURES_CSV)
else:
    gaussian_features_df = calculate_gaussian_ela_features()
    gaussian_features_df.to_csv(GAUSSIAN_ELA_FEATURES_CSV, index=False)

In [None]:
# The BBOB feature table must already be present in the working directory.
assert "ela_features_improved.csv" in os.listdir(), "Please run `save_ela_features.py` first."

# The first CSV column is used as the index.
bbob_features_df = pd.read_csv("ela_features_improved.csv", index_col=0)

In [None]:
PROBLEM_CLASSES = (
    ["separable"] * 5
    + ["low-conditioning"] * 4
    + ["unimodal"] * 5
    + ["multimodal-adequate"] * 5
    + ["multimodal-weak"] * 5
)


def get_problem_class(fid: str | int) -> str:
    if isinstance(fid, int):
        return PROBLEM_CLASSES[fid - 1]
    return "distribution-optimization"


# Stack the BBOB rows on top of the Gaussian mixture rows, then label every
# row with its problem class derived from the `fid` column.
all_features_df = pd.concat([bbob_features_df, gaussian_features_df])
all_features_df["class"] = list(map(get_problem_class, all_features_df["fid"]))

UMAP visualization:

In [None]:
# ELA feature columns fed to the UMAP projection and the random forest
# classifiers in the cells below.
FEATURES = [
    "disp.ratio_mean_02",
    "ela_distr.skewness",
    "ela_meta.lin_simple.adj_r2",
    "ela_meta.lin_simple.intercept",
    "ela_meta.lin_simple.coef.max",
    "ela_meta.quad_simple.adj_r2",
    "ic.eps_ratio",
    "ic.eps_s",
    "nbc.nb_fitness.cor",
    "pca.expl_var_PC1.cov_init",
]

# Non-feature columns: removed before scaling and re-attached afterwards.
METADATA_COLUMNS = ["dim", "iid", "fid", "seed", "class"]

In [None]:
scaler = MinMaxScaler()

# Row masks for the two problem dimensions handled here.
dim_8_index = all_features_df["dim"].eq(8)
dim_14_index = all_features_df["dim"].eq(14)

# Split each dimension's rows into feature columns and metadata columns.
dim_8_features = all_features_df.loc[dim_8_index].drop(columns=METADATA_COLUMNS)
dim_14_features = all_features_df.loc[dim_14_index].drop(columns=METADATA_COLUMNS)
dim_8_metadata = all_features_df.loc[dim_8_index, METADATA_COLUMNS]
dim_14_metadata = all_features_df.loc[dim_14_index, METADATA_COLUMNS]

# Min-max scale the features of each dimension separately (dim 8 first, then
# dim 14) and put the metadata columns back in front, aligned by row position.
scaled_dim_8_features = pd.concat(
    [
        dim_8_metadata.reset_index(drop=True),
        pd.DataFrame(
            scaler.fit_transform(dim_8_features), columns=dim_8_features.columns
        ),
    ],
    axis=1,
)
scaled_dim_14_features = pd.concat(
    [
        dim_14_metadata.reset_index(drop=True),
        pd.DataFrame(
            scaler.fit_transform(dim_14_features), columns=dim_14_features.columns
        ),
    ],
    axis=1,
)

scaled_all_features = pd.concat([scaled_dim_8_features, scaled_dim_14_features])

In [None]:
# Fit the scaler on the BBOB feature columns only, then apply it to all rows.
bbob_without_metadata_features_df = bbob_features_df.drop(
    columns=METADATA_COLUMNS, errors="ignore"
)
scaler = MinMaxScaler().fit(bbob_without_metadata_features_df)
scaled_all_features_df = pd.DataFrame(
    scaler.transform(all_features_df[bbob_without_metadata_features_df.columns]),
    columns=bbob_without_metadata_features_df.columns,
)

# Two-dimensional UMAP embedding of the selected features.
umap = UMAP(n_components=2, random_state=42)
umap_all = umap.fit_transform(scaled_all_features_df[FEATURES])
umap_df = pd.DataFrame(umap_all, columns=["UMAP-1", "UMAP-2"]).assign(
    **{"class": all_features_df["class"].values}
)

# Fixed colour per problem class.
color_map = {
    "distribution-optimization": "red",
    "low-conditioning": "green",
    "separable": "pink",
    "unimodal": "orange",
    "multimodal-adequate": "blue",
    "multimodal-weak": "purple",
}

fig = px.scatter(
    data_frame=umap_df,
    x="UMAP-1",
    y="UMAP-2",
    color="class",
    color_discrete_map=color_map,
    # title="UMAP Projection of ELA Features with Problem Class Labels",
    height=1200,  # pixels
    width=1200,  # pixels
)

fig.show()

Sanity check: 10-fold cross-validated accuracy of a random forest that predicts the BBOB problem class from the selected ELA features:

In [None]:
from sklearn.model_selection import cross_val_score

# Inputs are the selected feature columns of the BBOB rows; targets are the
# problem classes derived from each row's fid.
X = bbob_features_df.loc[:, FEATURES]
y = list(map(get_problem_class, bbob_features_df["fid"]))

assert len(y) == len(X)
assert "class" not in X.columns

# Scaling sits inside the pipeline, so it is applied as part of the estimator
# that cross_val_score evaluates.
pipeline = make_pipeline(MinMaxScaler(), RandomForestClassifier(random_state=42))

cross_val_score(pipeline, X, y, scoring="accuracy", cv=10).mean()

Predicting the BBOB problem class of the distribution-optimization problems:

In [None]:
# Train on every row with a BBOB class; the distribution-optimization rows are
# the ones to be classified.
test_index = scaled_all_features["class"].eq("distribution-optimization")
train_index = scaled_all_features["class"].ne("distribution-optimization")

X_train = scaled_all_features.loc[train_index, FEATURES]
y_train = scaled_all_features.loc[train_index, "class"]
X_test = scaled_all_features.loc[test_index, FEATURES]

assert len(y_train) == len(X_train)
assert all("class" not in frame.columns for frame in (X_train, X_test))

classifier = RandomForestClassifier(random_state=42).fit(X_train, y_train)
classifier.predict(X_test)

In [None]:
# Per-class probabilities for the distribution-optimization rows; the columns
# follow the order of `classifier.classes_`.
classifier.predict_proba(X_test)

In [None]:
# Class labels known to the classifier, in the column order of `predict_proba`.
classifier.classes_

Predicting the BBOB function id (fid) that each distribution-optimization problem most resembles:

In [None]:
# Same split as for the class prediction, but the target is the integer fid.
test_index = scaled_all_features["class"].eq("distribution-optimization")
train_index = scaled_all_features["class"].ne("distribution-optimization")

X_train = scaled_all_features.loc[train_index, FEATURES]
y_train = scaled_all_features.loc[train_index, "fid"].values.astype(int)
X_test = scaled_all_features.loc[test_index, FEATURES]

assert len(y_train) == len(X_train)
assert all("class" not in frame.columns for frame in (X_train, X_test))

classifier = RandomForestClassifier(random_state=42).fit(X_train, y_train)
classifier.predict(X_test)

In [None]:
# Per-fid probabilities for the distribution-optimization rows; the columns
# follow the order of `classifier.classes_`.
classifier.predict_proba(X_test)

In [None]:
# fid labels known to the classifier, in the column order of `predict_proba`.
classifier.classes_