In [1]:
import logging
import os
import pickle
from typing import Dict, Tuple

import hyperopt.pyll.stochastic
import numpy as np
from fedot.api.main import Fedot
from fedot.core.pipelines.adapters import PipelineAdapter
from fedot.core.pipelines.pipeline import Pipeline
from fedot.core.pipelines.pipeline_composer_requirements import PipelineComposerRequirements
from fedot.core.pipelines.pipeline_node_factory import PipelineOptNodeFactory
from fedot.core.pipelines.random_pipeline_factory import RandomPipelineFactory
from fedot.core.pipelines.tuning.search_space import PipelineSearchSpace
from fedot.core.repository.operation_types_repository import get_operations_for_task
from fedot.core.repository.tasks import TaskTypesEnum, Task
from golem.core.dag.graph_verifier import GraphVerifier
from golem.core.dag.graph_verifier import VerificationError
from golem.core.dag.verification_rules import DEFAULT_DAG_RULES
from tqdm import tqdm
import sys

In [2]:
class HiddenPrints:
    """Context manager that silences everything written to ``sys.stdout``.

    On entry ``sys.stdout`` is redirected to ``os.devnull``; on exit the
    original stream is restored and the devnull handle is closed. Exceptions
    raised inside the ``with`` body are not suppressed.
    """

    def __enter__(self):
        self._original_stdout = sys.stdout
        self._devnull = open(os.devnull, 'w')
        sys.stdout = self._devnull
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Restore first, then close the handle *we* opened. The body may have
        # rebound sys.stdout; closing that stream instead would break it and
        # leak the devnull handle.
        sys.stdout = self._original_stdout
        self._devnull.close()

In [3]:
class PipelineGenerator:
    """Samples random FEDOT pipelines, optionally with random hyperparameters.

    Graphs are produced by a ``RandomPipelineFactory`` whose verifier uses
    GOLEM's ``DEFAULT_DAG_RULES``. Every operation available for the task may
    appear as both a primary and a secondary node. Hyperparameters are drawn
    from FEDOT's default ``PipelineSearchSpace``.
    """

    def __init__(self, task_type: TaskTypesEnum = TaskTypesEnum.classification):
        """
        Args:
            task_type: task whose operations may appear in generated
                pipelines. Defaults to classification.
        """
        task = Task(task_type)
        ops = get_operations_for_task(task)
        self.adapter = PipelineAdapter()
        verifier = GraphVerifier(tuple(DEFAULT_DAG_RULES), self.adapter)
        self.pipeline_requirements = PipelineComposerRequirements(primary=ops, secondary=ops)
        node_factory = PipelineOptNodeFactory(self.pipeline_requirements)
        self.random_pipeline_factory = RandomPipelineFactory(verifier, node_factory)
        # operation name -> {hyperparameter name: (hyperopt space function, args)}
        self.parameters_per_operation = PipelineSearchSpace().parameters_per_operation

    def get_random_pipeline(self, randomize_hyperparameters=True):
        """Build one random pipeline.

        Args:
            randomize_hyperparameters: if True, resample node parameters via
                ``randomize_hyperparameters``.

        Returns:
            The generated ``Pipeline``.
        """
        graph = self.random_pipeline_factory(self.pipeline_requirements)
        # Use the adapter's public API rather than the private ``_restore`` hook.
        pipeline = self.adapter.restore(graph)
        if randomize_hyperparameters:
            pipeline = self.randomize_hyperparameters(pipeline)
        return pipeline

    def randomize_hyperparameters(self, pipeline: Pipeline) -> Pipeline:
        """Replace each node's parameters with a random draw from the search space.

        Nodes whose operation has no entry in the search space keep their
        current parameters. The pipeline's nodes are modified in place and the
        same pipeline object is returned.
        """
        for node in pipeline.nodes:
            operation_space = self.parameters_per_operation.get(node.name)
            if operation_space is None:
                continue
            node.parameters = {
                hp_name: hyperopt.pyll.stochastic.sample(function(hp_name, *args))
                for hp_name, (function, args) in operation_space.items()
            }
        return pipeline

    def __call__(self):
        """Shorthand for ``get_random_pipeline()`` with randomized hyperparameters."""
        return self.get_random_pipeline()

In [4]:
class FittedPipelineGenerator:
    """Fits random FEDOT pipelines on a fixed dataset and saves them with metrics.

    Each sample is written under ``save_dir``:

    * the fitted pipeline via ``Pipeline.save`` into ``<save_dir>/pipelines``;
    * its metrics dict, pickled to ``<save_dir>/metrics/<sample_name>/metrics.pickle``.
    """

    def __init__(
            self,
            save_dir: str = "../pipeline_dataset",
            features_path: str = "../synthetic_dataset/features.npy",
            target_path: str = "../synthetic_dataset/target.npy",
    ):
        """
        Args:
            save_dir: root directory for saved pipelines and metrics.
            features_path: ``.npy`` file with the training features.
            target_path: ``.npy`` file with the training target.
        """
        self.save_dir = save_dir
        self.features = np.load(features_path)
        self.target = np.load(target_path)
        self.api = Fedot("classification", logging_level=logging.CRITICAL)
        self.pipeline_generator = PipelineGenerator()
        # Exceptions swallowed during prediction/metric evaluation, kept for inspection.
        self.caught_errors = []

    def _fit_random_pipeline(self) -> Pipeline:
        """Draw random pipelines until one fits without a ``VerificationError``."""
        while True:
            pipeline = self.pipeline_generator()
            try:
                with HiddenPrints():
                    return self.api.fit(self.features, self.target, pipeline)
            except VerificationError:
                continue

    def get_fitted_pipeline(self) -> Tuple[Pipeline, Dict[str, float]]:
        """Return a fitted random pipeline together with its training metrics.

        Candidates whose prediction or metric computation raises are discarded
        (the exception is appended to ``caught_errors``) and a new one is drawn.
        This is a loop rather than recursion, so many consecutive failures cannot
        hit Python's recursion limit.
        """
        while True:
            fitted_pipeline = self._fit_random_pipeline()
            try:
                self.api.predict(self.features)
                metrics = self.api.get_metrics(self.target)
            except Exception as e:  # TODO: specify error
                self.caught_errors.append(e)
                continue
            return fitted_pipeline, metrics

    @staticmethod
    def _sample_name_from_path(operation_path: str) -> str:
        """Return the third-from-last component of ``operation_path``.

        This is the directory two levels above the saved operation file. Both
        forward and backward slashes are treated as separators, so paths from
        Windows and POSIX are handled alike.
        """
        parts = [part for part in operation_path.replace("\\", "/").split("/") if part]
        return parts[-3]

    def save_sample(self, pipeline: Pipeline, metrics: Dict[str, float]):
        """Save ``pipeline`` under ``<save_dir>/pipelines`` and pickle ``metrics`` next to it."""
        _, dict_fitted_operations = pipeline.save(os.path.join(self.save_dir, "pipelines"))
        sample_name = self._sample_name_from_path(dict_fitted_operations["operation_0"])
        metrics_dir = os.path.join(self.save_dir, "metrics", sample_name)
        # Also creates <save_dir>/metrics on first use; no-op if it already exists.
        os.makedirs(metrics_dir, exist_ok=True)
        with open(os.path.join(metrics_dir, "metrics.pickle"), "wb") as f:
            pickle.dump(metrics, f)

    def generate(self, n_samples: int = 10000):
        """Generate, fit and save ``n_samples`` pipelines."""
        for _ in tqdm(range(n_samples)):
            fitted_pipeline, metrics = self.get_fitted_pipeline()
            self.save_sample(fitted_pipeline, metrics)

In [5]:
# Loads features/target from ../synthetic_dataset, creates the FEDOT API object
# and the random pipeline generator (see FittedPipelineGenerator.__init__).
fpg = FittedPipelineGenerator()

In [None]:
# Long-running: fits and saves 10000 random pipelines with their metrics
# (see FittedPipelineGenerator.generate).
fpg.generate()