In [1]:
# MLflow run id this notebook attaches to; it is passed to mlflow.start_run and to run_experiment in later cells.
EXPERIMENT_INSTANCE_ID = "fa1926761b4f47dbab4152a405a6df91"


In [2]:
import sys
import warnings
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from copy import deepcopy

from distributed import Client
from mlflow import MlflowClient
from mlflow.entities import RunStatus
from distributed import get_client

import ray
from tqdm.auto import tqdm
import mlflow
import numpy as np
from loguru import logger
from mlxtend.classifier import EnsembleVoteClassifier
from pymoo.algorithms.soo.nonconvex.ga import GA
from pymoo.optimize import minimize
from sklearn.ensemble import BaggingClassifier
from sklearn.metrics import accuracy_score
from pymoo.core.problem import DaskParallelization

from scalarizing.scalarizing import FindingBestExpressionSingleDatasetProblem, FindingBestExpressionProblemMutation, \
    FindingBestExpressionProblemCrossover, FindingBestExpressionProblemSampling, scorer_creator
from scalarizing.scoring_functions import default_scoring_function, diversity_metric_scoring_function
from scalarizing.utils import top_n_indicies
from glob import glob
import pandas as pd
from box import Box
from sklearn.linear_model import Perceptron


  from .autonotebook import tqdm as notebook_tqdm
ujson module not found, using json


In [3]:
# Attach to the existing MLflow run with the given id; its stored params are read from `run.data.params` in the next cell.
run = mlflow.start_run(run_id=EXPERIMENT_INSTANCE_ID)

In [4]:
# Wrap the run's params in a Box for attribute-style access. `box_recast` converts
# the listed keys to int (presumably because MLflow hands params back as strings — confirm).
params = Box(run.data.params, box_recast={
                'bagging_size': int,
                'ensemble_size': int,
                'n_gen': int,
                'pop_size' : int
            })

In [5]:
params

<Box: {'bagging_size': 500, 'ensemble_size': 20, 'n_gen': 100, 'pop_size': 100, 'scoring_method': 'normal', 'train_path': '../../datasets/processed/nursery-train-3-s1.csv', 'dataset': 'nursery-3-s1.csv'}>

In [6]:
def read_dataset(path):
    """Load a CSV file and split it into features and target.

    Args:
        path: Location of a CSV file that contains a ``TARGET`` column.

    Returns:
        A dict with two entries: ``"x"`` holds the values of every column
        except ``TARGET`` and ``"y"`` holds the values of the ``TARGET`` column.
    """
    frame = pd.read_csv(path)
    features = frame.drop(columns=['TARGET'])
    target = frame['TARGET']

    return dict(x=features.values, y=target.values)

In [7]:
# Take the train CSV path from the run params and derive the matching test CSV path from it.
train_path = params.train_path
# NOTE(review): str.replace swaps every 'train' occurrence, including any in directory names — confirm paths never contain it outside the file name.
test_path = train_path.replace('train','test')

In [8]:
train_path

'../../datasets/processed/nursery-train-3-s1.csv'

In [9]:
test_path

'../../datasets/processed/nursery-test-3-s1.csv'

In [10]:
# Drop the path entry from params (mutates the Box in place; the path is kept in `train_path`).
# NOTE(review): re-running this cell, or the earlier cell that reads params.train_path, presumably fails once the key is gone.
del params['train_path']

In [11]:
# Bundle both splits plus a display name. The name is the train file's base name
# (text after the last "/") with the "-train" marker removed.
dataset = Box({
        'train': read_dataset(train_path),
        'test': read_dataset(test_path),
        'name': train_path.split("/")[-1].replace("-train", '')
    })


In [12]:
class predict_wrapper:
    """Callable that maps a predictor's output onto a label array.

    The wrapped ``predict_func`` is called with whatever arguments the caller
    supplies; its return value is then used as an index into ``labels``.
    """

    def __init__(self, predict_func, labels):
        self.labels = labels
        self.predict_func = predict_func

    def __call__(self, *args, **kwargs):
        raw_output = self.predict_func(*args, **kwargs)
        return self.labels[raw_output]

def raise_not_implemented(*args, **kwargs):
    """Stand-in for ``predict_proba`` that always signals the call is unsupported.

    Any positional or keyword arguments are accepted and ignored, because the
    function is installed as an instance attribute and is therefore invoked
    with the caller's arguments (e.g. the feature matrix).

    Raises:
        NotImplementedError: always.
    """
    # `NotImplemented` is a non-callable singleton; the exception type is NotImplementedError.
    raise NotImplementedError("Predict proba is not supported")
def extract_classifiers_from_bagging(bagging):
    """Return patched deep copies of the estimators held by a fitted bagging model.

    For every entry of ``bagging.estimators_`` a deep copy is made, then:
      * ``predict`` is replaced by a ``predict_wrapper`` that routes the copy's
        own predictions through ``bagging.classes_``;
      * ``predict_proba`` is replaced by ``raise_not_implemented``.

    The estimators stored on ``bagging`` itself are left untouched.

    Args:
        bagging: Object exposing ``estimators_`` and ``classes_``.

    Returns:
        list of the patched copies, in the same order as ``bagging.estimators_``.
    """
    def _patched_copy(fitted):
        clone = deepcopy(fitted)
        clone.predict = predict_wrapper(clone.predict, bagging.classes_)
        clone.predict_proba = raise_not_implemented
        return clone

    return [_patched_copy(member) for member in bagging.estimators_]

In [13]:
@ray.remote
def execute_in_ray(f, x):
    """Ray remote task that applies ``f`` to ``x`` and returns the result."""
    return f(x)

class RayParallelization:
    """Runner that evaluates ``f`` over a collection by submitting Ray tasks.

    Calling the instance with ``(f, X)`` schedules one ``execute_in_ray`` task
    per element of ``X`` and blocks until all results are available.
    """

    def __init__(self) -> None:
        super().__init__()

    def __call__(self, f, X):
        # Schedule every task first, then wait for all of them in one ray.get call.
        pending = []
        for item in X:
            pending.append(execute_in_ray.remote(f, item))

        return ray.get(pending)

    def __getstate__(self):
        # Pickle a shallow copy of the instance attributes.
        return dict(self.__dict__)



In [14]:
class ExecutorParallelization:
    """Runner that evaluates ``f`` over a collection through an executor.

    Args:
        executor: Object with a ``submit(fn, arg)`` method returning a handle
            that exposes ``result()`` (e.g. a ``concurrent.futures`` executor).
    """

    def __init__(self, executor) -> None:
        super().__init__()
        self.executor = executor

    def __call__(self, f, X):
        # Submit everything before collecting, so the jobs can overlap.
        pending = []
        for item in X:
            pending.append(self.executor.submit(f, item))
        return [future.result() for future in pending]

    def __getstate__(self):
        # The executor is left out of the pickled state because it is not serializable.
        return {key: value for key, value in self.__dict__.items() if key != "executor"}

In [15]:
# Silence all Python warnings for the rest of the session.
warnings.filterwarnings("ignore")

# Drop loguru's existing handlers and log to stdout at INFO level instead.
logger.remove()
logger.add(sys.stdout, level='INFO')

1

In [16]:
def do_run_experiment(dataset, params, run_id, scoring_function=default_scoring_function, parallelization=RayParallelization(), mlflow_client = MlflowClient()):
    """Fit a bagging pool, search for a scoring expression and log ensemble accuracies.

    Steps, in order:
      1. Fit a ``BaggingClassifier`` of ``Perceptron`` models on the train split.
      2. Run a pymoo ``GA`` on ``FindingBestExpressionSingleDatasetProblem`` built
         from the train split and the extracted base classifiers.
      3. Score every base classifier on the train split twice: with plain
         accuracy and with the scorer created from ``result.X[0]``.
      4. For each of the two rankings, build an equal-weight voting ensemble
         from the ``params.ensemble_size`` entries picked by ``top_n_indicies``
         and measure its train and test accuracy.
      5. Log the resulting metrics to MLflow under ``run_id``.

    Args:
        dataset: Object with ``train`` and ``test`` splits (each exposing ``x``
            and ``y``) and a ``name`` used in the log line.
        params: Object with ``bagging_size``, ``ensemble_size``, ``pop_size``
            and ``n_gen`` attributes.
        run_id: MLflow run id that receives the metrics.
        scoring_function: Forwarded to the optimisation problem.
        parallelization: Forwarded to the problem as ``elementwise_runner``.
        mlflow_client: Client used for ``log_metric`` calls.

    Note:
        The default ``parallelization`` and ``mlflow_client`` objects are
        created once, when this function is defined, not on each call.

    Returns:
        None. Results are reported through MLflow metrics and one log line.
    """
    from loguru import logger

    bagging = BaggingClassifier(base_estimator=Perceptron(), n_estimators=params.bagging_size, max_samples=0.3)
    bagging.fit(dataset.train.x, dataset.train.y)
    problem = FindingBestExpressionSingleDatasetProblem(dataset.train, extract_classifiers_from_bagging(bagging), ensemble_size=params.ensemble_size, scoring_function=scoring_function, elementwise_runner=parallelization)
    result = minimize(problem,
                      GA(
                          pop_size=params.pop_size,
                          verbose=True,
                          seed=42,
                          eliminate_duplicates=False,
                          mutation=FindingBestExpressionProblemMutation(),
                          crossover=FindingBestExpressionProblemCrossover(),
                          sampling=FindingBestExpressionProblemSampling()
                      ),
                      ("n_gen", params.n_gen),
                      verbose=False,
                      save_history=False,
                      seed=42)

    # A numpy array is used so the estimators can be fancy-indexed by the selected indices.
    bagging_estimators = np.array(extract_classifiers_from_bagging(bagging))

    scorer = scorer_creator(result.X[0], labels=np.unique(dataset.train.y))
    estimator_accuracies = []
    estimator_scores = []

    for estimator in bagging_estimators:
        predictions = estimator.predict(dataset.train.x)

        estimator_accuracies.append(accuracy_score(dataset.train.y, predictions))
        estimator_scores.append(scorer(dataset.train.y, predictions))

    def _evaluate_selection(ranking_values):
        """Build a voting ensemble from the top-ranked estimators; return (train_acc, test_acc)."""
        chosen = bagging_estimators[top_n_indicies(ranking_values, params.ensemble_size)]
        ensemble = EnsembleVoteClassifier(clfs=chosen,
                                          weights=[1 for _ in range(params.ensemble_size)],
                                          fit_base_estimators=False)

        ensemble.fit(dataset.train.x, dataset.train.y) # Required by design, but does nothing apart from checking labels
        train_accuracy = accuracy_score(dataset.train.y, ensemble.predict(dataset.train.x))
        test_accuracy = accuracy_score(dataset.test.y, ensemble.predict(dataset.test.x))
        return train_accuracy, test_accuracy

    accuracy_ensemble_train_accuracy, accuracy_ensemble_accuracy = _evaluate_selection(estimator_accuracies)
    score_ensemble_train_accuracy, score_ensemble_accuracy = _evaluate_selection(estimator_scores)

    # The accuracy-ranked ensemble is preferred only when it is strictly better on the train split.
    accuracy_ensemble_wins = accuracy_ensemble_train_accuracy > score_ensemble_train_accuracy

    if accuracy_ensemble_wins:
        mlflow_client.log_metric(run_id, "accuracy_ensemble_selected", True)
        selected_ensemble_accuracy = accuracy_ensemble_accuracy
    else:
        selected_ensemble_accuracy = score_ensemble_accuracy

    mlflow_client.log_metric(run_id, "selected_ensemble_accuracy", selected_ensemble_accuracy)
    mlflow_client.log_metric(run_id, "method_selection_accuracy", score_ensemble_accuracy)
    mlflow_client.log_metric(run_id, "accuracy_selection_accuracy", accuracy_ensemble_accuracy)

    # The original message interpolated an undefined name `idx`, which raised NameError
    # after all the work was done; the dataset name identifies the run instead.
    logger.info(f"{dataset.name} method={selected_ensemble_accuracy} normal={accuracy_ensemble_accuracy}, discarded={accuracy_ensemble_wins}, diff={score_ensemble_accuracy-accuracy_ensemble_accuracy}")

In [17]:
def run_experiment(dataset, params, run_id, parallelization=RayParallelization()):
    """Run one experiment and record its outcome on the given MLflow run.

    The scoring function is ``diversity_metric_scoring_function`` when
    ``params.scoring_method`` equals ``'diversity'`` and
    ``default_scoring_function`` otherwise. The dataset name is logged as the
    ``dataset`` param, then ``do_run_experiment`` is executed. On success the
    run is terminated normally; if any ``Exception`` is raised it is logged and
    the run is terminated with status ``FAILED`` (the exception is not re-raised).

    Args:
        dataset: Object exposing ``name`` plus whatever ``do_run_experiment`` needs.
        params: Object exposing ``scoring_method`` plus whatever ``do_run_experiment`` needs.
        run_id: MLflow run id to log to and terminate.
        parallelization: Runner forwarded to ``do_run_experiment``.
    """
    use_diversity = params.scoring_method == 'diversity'
    scoring_function = diversity_metric_scoring_function if use_diversity else default_scoring_function

    from loguru import logger
    tracking = MlflowClient()

    tracking.log_param(run_id, "dataset", dataset.name)

    try:
        do_run_experiment(
            dataset,
            params,
            run_id=run_id,
            scoring_function=scoring_function,
            parallelization=parallelization,
        )
        tracking.set_terminated(run_id=run_id)
    except Exception as ex:
        logger.exception(ex)
        tracking.set_terminated(run_id=run_id, status="FAILED")



In [18]:
params

<Box: {'bagging_size': 500, 'ensemble_size': 20, 'n_gen': 100, 'pop_size': 100, 'scoring_method': 'normal', 'dataset': 'nursery-3-s1.csv'}>

In [19]:
import dill
from scalarizing.utils import np_cache
def custom_serializer(a):
    """Serialize ``a`` with ``dill.dumps`` and return the result."""
    return dill.dumps(a)

def custom_deserializer(b):
    """Rebuild an object from ``b`` with ``dill.loads``.

    NOTE(review): like pickle, this should only be fed data produced by a
    trusted source — confirm the payloads never come from outside the cluster.
    """
    return dill.loads(b)

In [20]:
# Register the dill-based helpers as Ray's (de)serializer for np_cache objects.
ray.util.register_serializer(
  np_cache, serializer=custom_serializer, deserializer=custom_deserializer)

In [21]:
# Sanity check: ask Ray whether run_experiment can be serialized (the output below reports True).
ray.util.inspect_serializability(run_experiment)

Checking Serializability of <function run_experiment at 0x2af47bd07d90>


(True, set())

In [22]:
from concurrent.futures import ProcessPoolExecutor

In [23]:
from aiocache import cached, Cache

In [24]:
def some_x(a):
    """Return ``a`` increased by two (small helper used to try out caching)."""
    offset = 2
    return a + offset

In [25]:
# Wrap some_x with aiocache's in-memory cache decorator.
# NOTE(review): aiocache's `cached` is presumably meant for coroutine functions while some_x is synchronous — confirm this scratch wrapper is intended; it is not used in the visible cells.
cached_some_x = cached(cache=Cache.MEMORY)(some_x)

In [26]:
# Two-worker process pool handed to ExecutorParallelization in the next cell.
# NOTE(review): no shutdown() is visible in this chunk — confirm the pool is released later.
executor = ProcessPoolExecutor(2)

In [27]:

%%time
# Smoke run: same params with n_gen overridden to 1, evaluated through the process pool.
run_experiment(dataset, Box({**params, 'n_gen': 1}), EXPERIMENT_INSTANCE_ID, parallelization=ExecutorParallelization(executor))



Compiled modules for significant speedup can not be used!
https://pymoo.org/installation.html#installation

from pymoo.config import Config

2022-11-26 13:57:31.339 | ERROR    | __main__:run_experiment:17 - y contains previously unseen labels: [2]
concurrent.futures.process._RemoteTraceback: 
"""
Traceback (most recent call last):
  File "/home/bogul/mambaforge/lib/python3.10/concurrent/futures/process.py", line 246, in _process_worker
    r = call_item.fn(*call_item.args, **call_item.kwargs)
  File "/home/bogul/scalarizing/.venv/lib/python3.10/site-packages/pymoo/core/problem.py", line 20, in __call__
    self.problem._evaluate(x, out, *self.args, **self.kwargs)
  File "/home/bogul/scalarizing/scalarizing/scalarizing.py", line 162, in _evaluate
    out["F"] = self.scoring_function(scorer, self.folds_iterator(), self.classifiers, self.ensemble_size)
  File "/home/bogul/scalarizing/scalarizing/scoring_functions.py", line 28, in default_scoring_function
    'by_score': accuracy_score(y_