In [1]:
"""Run this notebook from `<repository_root>/notebooks`"""

'Run this notebook from `<repository_root>/notebooks`'

In [2]:
import sys
sys.path.append("..")

import pickle
import shutil
from tqdm import tqdm
from typing import List, Union, Tuple, Dict, Optional, Any
import json
import pickle
import os
import openml
import pandas as pd
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from meta_automl.data_preparation.models_loaders import KnowledgeBaseModelsLoader
from meta_automl.data_preparation.pipeline_features_extractors import FEDOTPipelineFeaturesExtractor
from meta_automl.data_preparation.meta_features_extractors import OpenMLDatasetMetaFeaturesExtractor
from meta_automl.data_preparation.feature_preprocessors import FeaturesPreprocessor
from fedot.core.pipelines.pipeline import Pipeline
from torch_geometric.data import Data
from meta_automl.data_preparation.model import Model

In [3]:
class KnowledgeBaseToDataset:
    """Convert a knowledge base of fitted pipelines into flat dataset files.

    ``convert()`` writes four artefacts into ``<dataset_directory>/<split>``:

    * ``split.json`` - positional indexes of the train / test datasets;
    * ``pipelines.pickle`` - pickled list of extracted pipeline features;
    * ``datasets.csv`` - one row of meta-features per dataset;
    * ``task_pipe_comb.csv`` - ``task_id`` / ``pipeline_id`` / ``y`` records.

    Args:
        knowledge_base_directory: Path handed to ``KnowledgeBaseModelsLoader``.
        dataset_directory: Root directory for the generated files.
        meta_features_data_columns: Meta-feature names handed to
            ``OpenMLDatasetMetaFeaturesExtractor``.
        split: Passed to ``parse_datasets`` and used as the output
            sub-directory name (original comment: train, test or all).
        train_test_split_name: Stored on the instance; not used by this class.
        task_type: Only ``"classification"`` is accepted.
        fitness_metric: Metric name passed to the models loader. It also
            selects the sign applied to fitness values (``1`` for
            ``"log_loss"``, ``-1`` otherwise).
        exclude_datasets: Dataset names to drop. ``None`` means "exclude nothing".
        meta_features_preprocessors: Optional object exposing
            ``fit(data, path)`` and ``transform(data, single=False)``.

    Raises:
        NotImplementedError: If ``task_type`` is not ``"classification"``.
        AssertionError: If some ``dataset_id`` occurs more than once.
    """

    def __init__(
        self,
        knowledge_base_directory: str,
        dataset_directory: str,
        meta_features_data_columns: List[str],
        split: Optional[str] = "all", # Can be train, test, all
        train_test_split_name: Optional[str] = "train_test_datasets_classification.csv",
        task_type: Optional[str] = "classification",
        fitness_metric: Optional[str] = "f1",
        exclude_datasets: Optional[List[str]] = None,  # In accordance to @MorrisNein message in chat
        meta_features_preprocessors: Optional[Any] = None,
    ) -> None:
        if task_type != "classification":
            raise NotImplementedError("Current version is for `'classification'` `task_type`")
        # Sign applied to every raw fitness value before ranking / export.
        # NOTE(review): with coefficient 1 for "log_loss" the `idxmax` in
        # `_get_best_pipelines_unique_indexes` keeps the largest raw value -
        # confirm this matches the sign convention of the stored fitness.
        if fitness_metric == "log_loss":
            self.fitness_coef = 1
        else:
            self.fitness_coef = -1

        self.knowledge_base_directory = knowledge_base_directory
        self.dataset_directory = dataset_directory
        self.meta_features_data_columns = meta_features_data_columns
        self.train_test_split_name = train_test_split_name
        self.task_type = task_type
        self.split = split
        self.fitness_metric = fitness_metric
        # The default used to stay `None` and crashed the membership test below.
        self.exclude_datasets = list(exclude_datasets) if exclude_datasets is not None else []
        self.meta_features_preprocessors = meta_features_preprocessors

        self._maybe_create_dataset_directory(os.path.join(self.dataset_directory, self.split))

        self.pipeline_extractor = FEDOTPipelineFeaturesExtractor()
        self.meta_features_extractor = OpenMLDatasetMetaFeaturesExtractor(meta_features_data_columns=self.meta_features_data_columns)

        self.models_loader = KnowledgeBaseModelsLoader(self.knowledge_base_directory)
        df_datasets = self.models_loader.parse_datasets(self.split, self.task_type)
        self.df_datasets = self._filter_excluded_datasets(df_datasets)
        self._check_for_duplicated_datasets()

    def _filter_excluded_datasets(self, df_datasets: pd.DataFrame) -> pd.DataFrame:
        """Return the rows whose ``dataset_name`` is not in ``self.exclude_datasets``."""
        excluded = set(self.exclude_datasets or ())
        return df_datasets[~df_datasets["dataset_name"].isin(excluded)]

    def _check_for_duplicated_datasets(self) -> None:
        """Raise ``AssertionError`` when any ``dataset_id`` occurs more than once.

        An empty table contains no duplicates and therefore passes.
        """
        occurences = self.df_datasets.dataset_id.value_counts()
        if (occurences > 1).any():
            raise AssertionError(f"Duplicated datasets detected. Check datasets: \n{occurences}")

    def _maybe_create_dataset_directory(self, directory: str) -> None:
        """Create ``directory`` (including missing parents) if it does not exist."""
        os.makedirs(directory, exist_ok=True)

    def _get_pipeline_features(self, pipeline: "Pipeline") -> "Data":
        """Run the pipeline feature extractor on the serialized ``pipeline``."""
        # First element returned by `save()`, encoded to bytes - presumably the
        # JSON description of the pipeline (TODO confirm against FEDOT).
        pipeline_json_string = pipeline.save()[0].encode()
        return self.pipeline_extractor(pipeline_json_string)

    def _get_best_pipelines_unique_indexes(self, dataset_models: List["Model"]) -> List[int]:
        """Return, per distinct predictor, the index of its best-scoring model.

        Models are grouped by ``str(model.predictor)``; inside each group the
        index with the largest ``fitness_coef * fitness.value`` is kept.
        """
        temp_df = pd.DataFrame(columns=["predictor", "fitness"])
        temp_df["predictor"] = [str(x.predictor) for x in dataset_models]
        temp_df["fitness"] = [self.fitness_coef * x.fitness.value for x in dataset_models]
        # Select top-1 pipeline
        best_pipelines_unique_indexes = temp_df.groupby('predictor')['fitness'].idxmax().to_list()
        return best_pipelines_unique_indexes

    def _process(self) -> Tuple[List[Dict[str, Union[float, int]]], List[Dict[str, float]], List["Data"], List[int]]:
        """Collect meta-features, pipeline features and targets for every dataset.

        Returns:
            A tuple ``(task_pipe_comb, datasets_meta_features, pipelines,
            is_train_flags)``. ``pipelines[i]`` belongs to the record whose
            ``pipeline_id`` equals ``i``; the other two lists hold one entry
            per dataset, in table order.
        """
        pipeline_id = 0

        task_pipe_comb = []
        datasets_meta_features = []
        pipelines = []
        is_train_flags = []

        # NOTE(review): `task_id` is the index *label* of the datasets table,
        # while `datasets.csv` and `split.json` are positional. After excluding
        # rows the labels may have gaps - confirm what the consumer expects.
        for task_id in tqdm(self.df_datasets.index):
            dataset = self.df_datasets.loc[task_id]
            datasets_meta_features.append(self.meta_features_extractor(dataset.dataset_id))
            is_train_flags.append(dataset.is_train)

            dataset_models = self.models_loader.load(
                dataset_names=[dataset.dataset_name],
                fitness_metric=self.fitness_metric,
            )
            best_pipelines_unique_indexes = self._get_best_pipelines_unique_indexes(dataset_models)

            for index in best_pipelines_unique_indexes:
                model = dataset_models[index]
                pipelines.append(self._get_pipeline_features(model.predictor))
                y = self.fitness_coef * model.fitness.value
                task_pipe_comb.append({"task_id": task_id, "pipeline_id": pipeline_id, "y": y})
                pipeline_id += 1

        return task_pipe_comb, datasets_meta_features, pipelines, is_train_flags

    def _save_task_pipe_comb(self, task_pipe_comb: List[Dict[str, Union[float, int]]]) -> None:
        """Write the task/pipeline/target records to ``task_pipe_comb.csv``."""
        task_pipe_comb_df = pd.DataFrame.from_records(task_pipe_comb)
        task_pipe_comb_df.to_csv(
            os.path.join(self.dataset_directory, self.split, "task_pipe_comb.csv"),
            header=True,
            index=True,
        )

    def _save_datasets_meta_features(self, datasets_meta_features: List[Dict[str, float]]) -> None:
        """Write per-dataset meta-features to ``datasets.csv``.

        When a preprocessor was supplied it is fitted on the collected values
        (it also receives the path of ``meta_features_preprocessors.pickle``)
        and the transformed values are written instead of the raw ones.
        """
        df = pd.DataFrame.from_records(datasets_meta_features)
        if self.meta_features_preprocessors is not None:
            # Column name -> list of that column's values, in row order.
            df_as_dict = df.to_dict(orient="list")
            self.meta_features_preprocessors.fit(
                df_as_dict,
                os.path.join(self.dataset_directory, self.split, "meta_features_preprocessors.pickle"),
            )
            transformed = self.meta_features_preprocessors.transform(df_as_dict, single=False)
            # Flatten every transformed column back to one dimension.
            df = pd.DataFrame.from_dict({k: v.reshape(-1) for k, v in transformed.items()})

        df.to_csv(
            os.path.join(self.dataset_directory, self.split, "datasets.csv"),
            header=True,
            index=False,
        )

    def _save_pipelines(self, pipelines: List["Data"]) -> None:
        """Pickle the list of pipeline features to ``pipelines.pickle``."""
        with open(os.path.join(self.dataset_directory, self.split, "pipelines.pickle"), "wb") as f:
            pickle.dump(pipelines, f)

    def _save_split(self, is_train_flags: List[int]) -> None:
        """Write ``split.json`` with positional indexes of train / test datasets.

        Position ``i`` goes to ``"train"`` when ``is_train_flags[i] == 1`` and
        to ``"test"`` otherwise.
        """
        split = {
            "train": [],
            "test": [],
        }
        for i, flag in enumerate(is_train_flags):
            if flag == 1:
                split["train"].append(i)
            else:
                split["test"].append(i)
        with open(os.path.join(self.dataset_directory, self.split, "split.json"), "w") as f:
            json.dump(split, f)

    def convert(self) -> None:
        """Process the knowledge base and write all output files."""
        task_pipe_comb, datasets_meta_features, pipelines, is_train_flags = self._process()
        self._save_split(is_train_flags)
        self._save_pipelines(pipelines)
        self._save_datasets_meta_features(datasets_meta_features)
        self._save_task_pipe_comb(task_pipe_comb)


In [4]:
# Meta-feature names used for this dataset; both the scalers and the
# extractor columns below are derived from this single list.
META_FEATURE_NAMES = [
    "MajorityClassSize",
    "MaxNominalAttDistinctValues",
    "MinorityClassSize",
    "NumberOfClasses",
    "NumberOfFeatures",
    "NumberOfInstances",
    "NumberOfInstancesWithMissingValues",
    "NumberOfMissingValues",
    "NumberOfNumericFeatures",
    "NumberOfSymbolicFeatures",
]

# Every meta-feature gets its own StandardScaler instance.
preprocessors = {feature_name: StandardScaler() for feature_name in META_FEATURE_NAMES}
meta_features_preprocessor = FeaturesPreprocessor(preprocessors)

# Build the converter and write the dataset files to disk.
converter = KnowledgeBaseToDataset(
    knowledge_base_directory="../data/knowledge_base_0",
    dataset_directory="../data/openml_meta_features_and_fedot_pipelines",
    meta_features_data_columns=list(META_FEATURE_NAMES),
    train_test_split_name="train_test_datasets_classification.csv",
    task_type="classification",
    fitness_metric="f1",
    exclude_datasets=["connect-4", "higgs"],
    meta_features_preprocessors=meta_features_preprocessor,
)
converter.convert()

  0%|          | 0/41 [00:00<?, ?it/s]

2023-07-15 00:14:44.407228: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-07-15 00:14:45.140209: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/cherniak/.local/share/virtualenvs/MetaFEDOT-wsJWSqtd/lib/python3.10/site-packages/cv2/../../lib64:
2023-07-15 00:14:45.140280: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory; LD_LIBRARY_PATH: /home/cherniak/.local/sha