In [51]:
import sys
import os
import time
import logging
import argparse
import numpy as np
from datetime import datetime
from joblib import Parallel, delayed

# Set the parent directory. This assumes your notebook is in the project root or a subdirectory.
# Adjust the path as necessary.
parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))
sys.path.append(parent_dir)

from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import train_test_split

# Custom imports from your project
from estimators.statistical_descriptor import Nagler_WS
from utils.dataset_management import parse_pipeline
from utils.dataset_load import shuffle_data, DatasetLoader
from utils.fold_management import FoldManagement
from utils.label_management import LabelManagement
from utils.balance_management import BalanceManagement
from utils.figures import *
from utils.files_management import *


In [7]:
rng = np.random.RandomState(seed=442)
print(rng)

RandomState(MT19937)


In [13]:
with open('/home/listic/Bureau/cortes_stage/ML-WetSnowSAR_pipeline_stage/pipeline/parameter/config_param.yml', 'r') as file:
    tmp = yaml.safe_load(file)

tmp

{'fixed_args': {'options': {'--data_path': '/home/listic/Bureau/cortes_stage/ML-WetSnowSAR_pipeline_stage/pipeline/data/dataset/dataset_AD_08200821_14Mas3Top3Phy_W15_corrected_V2.h5',
   '--fold_method': 'combinationFold',
   '--labeling_method': 'crocus',
   '--balancing_method': 'undersample',
   '--request': '(date.dt.month == 3 and date.dt.day == 1) and ((elevation > 1000) and (elevation < 2000))',
   '--shuffle_data': True,
   '--balance_data': False,
   '--import_list': ['from sklearn.svm import SVC',
    'from sklearn.neighbors import KNeighborsClassifier',
    'from sklearn.ensemble import AdaBoostClassifier, RandomForestClassifier',
    'from sklearn.linear_model import LogisticRegression',
    'from sklearn.neural_network import MLPClassifier',
    'from estimators.statistical_descriptor import *',
    'from estimators.band_transform import *'],
   '--pipeline': [[['KNN_direct'],
     ['BandSelector', {'bands': [0, 1, 2, 3, 4, 5, 6, 7, 8]}],
     ['BandTransformer', {'bands':

In [37]:
import_list = tmp["fixed_args"]["options"]["--import_list"]
pipeline = tmp["fixed_args"]["options"]["--pipeline"]
pipeline

[[['KNN_direct'],
  ['BandSelector', {'bands': [0, 1, 2, 3, 4, 5, 6, 7, 8]}],
  ['BandTransformer', {'bands': [0, 1, 2, 3]}, {'transformations': []}],
  ['Hist_SAR'],
  ['KNeighborsClassifier', {'n_neighbors': 50}]],
 [['RandomForest_direct'],
  ['BandSelector', {'bands': [0, 1, 2, 3, 4, 5, 6, 7, 8]}],
  ['BandTransformer', {'bands': [0, 1, 2, 3]}, {'transformations': []}],
  ['Hist_SAR'],
  ['RandomForestClassifier', {'n_estimators': 200}, {'criterion': 'entropy'}]],
 [['MLP_direct'],
  ['BandSelector', {'bands': [0, 1, 2, 3, 4, 5, 6, 7, 8]}],
  ['BandTransformer', {'bands': [0, 1, 2, 3]}, {'transformations': []}],
  ['Hist_SAR'],
  ['MLPClassifier', {'alpha': 0.01}]],
 [['SVMrbf_direct'],
  ['BandSelector', {'bands': [0, 1, 2, 3, 4, 5, 6, 7, 8]}],
  ['BandTransformer', {'bands': [0, 1, 2, 3]}, {'transformations': []}],
  ['Hist_SAR'],
  ['SVC', {'kernel': 'rbf'}, {'probability': True}]],
 [['LogisticR_direct'],
  ['BandSelector', {'bands': [0, 1, 2, 3, 4, 5, 6, 7, 8]}],
  ['BandTrans

In [55]:
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder
from imblearn.under_sampling import RandomUnderSampler
from imblearn.over_sampling import RandomOverSampler
from imblearn.combine import SMOTEENN
from inspect import signature
import os, ast

from utils.files_management import load_h5

pipe = [['AdaBoost_direct'],['AdaBoostClassifier', {'n_estimators': 200}]]
steps = []

for step in pipe[1:]:
    name_method = step[0]
    params = step[1] if len(step) > 1 else {}
    estimator = globals()[name_method](**params)

    # Set random_state if it is a parameter for the estimator and rng is provided
    if rng is not None and 'random_state' in signature(estimator.__init__).parameters:
        setattr(estimator, 'random_state', rng)

    steps.append((name_method, estimator))
Pipeline(steps, verbose=True, memory=".cache")


KeyError: 'Hist_SAR'

In [48]:
import ast
from sklearn.pipeline import Pipeline
from inspect import signature

def parse_pipeline(args, idx, rng=None):
    """Parse a dictionary to create a pipeline of estimators
    The dictionary must have the following structure::
    {
        "import": [
            "from sklearn.preprocessing import StandardScaler",
            "from sklearn.decomposition import PCA",
            "from sklearn.svm import SVC",
        ],
        "pipeline": [
            [
                ["StandardScaler", {"with_mean": False, "with_std": False}],
                ["PCA", {"n_components": 0.95}],
                ["SVC", {"kernel": "rbf", "C": 10, "gamma": 0.01}],
            ]
        ],
    }

    Parameters
    ----------
    args : Namespace
        Namespace object containing the pipeline

    idx : int
        Index of the pipeline to use in case of multiple pipelines

    rng : int, optional
        Random state to set for estimators that support it.

    Returns
    -------
    sklearn.pipeline.Pipeline
        Pipeline of estimators
    """
    for import_lib in args.import:
        exec(import_lib, globals())
    
    pipe = ast.literal_eval(str(args.pipeline[idx]))
    steps = []

    for step in pipe:
        name_method = step[0]
        params = step[1] if len(step) > 1 else {}
        estimator = globals()[name_method](**params)

        # Set random_state if it is a parameter for the estimator and rng is provided
        if rng is not None and 'random_state' in signature(estimator.__init__).parameters:
            setattr(estimator, 'random_state', rng)

        steps.append((name_method, estimator))
    return Pipeline(steps, verbose=True, memory=".cache")

# Example usage
from argparse import Namespace

args = Namespace(
    import=[
        "from sklearn.preprocessing import StandardScaler",
        "from sklearn.decomposition import PCA",
        "from sklearn.svm import SVC",
    ],
    pipeline=[
        [
            ["StandardScaler", {"with_mean": False, "with_std": False}],
            ["PCA", {"n_components": 0.95}],
            ["SVC", {"kernel": "rbf", "C": 10, "gamma": 0.01}],
        ]
    ])   

pipeline = parse_pipeline(args, idx=0, rng=42)
print(pipeline)


SyntaxError: invalid syntax (1875665354.py, line 39)

In [24]:
parse_pipeline(tmp["fixed_args"]["options"], 0)

AttributeError: 'dict' object has no attribute 'import_list'