In [1]:
import random
from dstoolkit.core import Log
from dstoolkit.core import trace
from dstoolkit.dev import apply_test
from dstoolkit.dev import DataStep
from dstoolkit.dev import DataStepDataframe
from dstoolkit.engine import Engine
# from pathlib import Path
# from pydataset import data
from pyspark.sql import functions as F
from sklearn import svm
from sklearn import datasets
from sklearn.model_selection import cross_val_score
from sklearn.model_selection import ParameterSampler
from sklearn.utils.fixes import loguniform
import numpy as np
import pandas as pd

An error was encountered:
Invalid status code '401' from https://synapseaccleratorsws.dev.azuresynapse.net/livyApi/versions/2019-11-01-preview/sparkPools/sparkpool/sessions/-1 with error payload: {"code":"ExpiredAuthenticationToken","message":"Token Authentication failed with SecurityTokenExpiredException - IDX10223: Lifetime validation failed. The token is expired. ValidTo: '[PII is hidden]', Current time: '[PII is hidden]'."}


In [None]:

class Step_loadData(DataStep):
    """Load the scikit-learn Iris dataset into a Spark DataFrame.

    The output DataFrame has the feature columns ``sepallength``,
    ``sepalwidth``, ``petallength``, ``petalwidth`` plus a ``target`` column.
    ``target`` holds the class label stored as a float, because ``np.c_``
    stacks the float features and the labels into a single float array.
    """

    def test(self):
        """Apply data tests to the step's output DataFrame.

        Runs the base-class emptiness check, then checks for null values in
        every column used as model input (features and target).
        """
        model_columns = [
            'sepallength',
            'sepalwidth',
            'petallength',
            'petalwidth',
            'target',
        ]
        self.test_is_dataframe_empty(df=self.output_data.dataframe)
        self.test_null_values(
            cols=model_columns,
            dt=self.output_data.dataframe
        )

    @apply_test
    @trace
    def initialize(self):
        """
        Initialize the DataStep by loading the Iris dataset.

        Takes no parameters: the data always comes from
        ``sklearn.datasets.load_iris()``. Feature names such as
        ``'sepal length (cm)'`` are normalised to ``'sepallength'`` so they
        are space-free Spark column names. The resulting Spark DataFrame is
        stored with ``set_output_data``.
        """
        iris = datasets.load_iris()
        p_df = pd.DataFrame(
            data=np.c_[iris['data'], iris['target']],
            columns=iris['feature_names'] + ['target']
        )
        # e.g. 'sepal length (cm)' -> 'sepallength(cm)' -> 'sepallength'
        p_df.columns = [
            c.replace(' ', '').replace('(cm)', '') for c in p_df.columns
        ]
        dt = self.spark.createDataFrame(p_df)
        self.set_output_data(dt)



In [None]:
class RandomException(Exception):
    """Raised deliberately to simulate a random training failure."""

In [None]:
class Step_crossValidate(DataStep):
    """Cross-validate many SVM hyper-parameter settings in parallel on Spark.

    Sampled parameter combinations are cross-joined with the full input data.
    One pandas UDF call per combination (grouped by ``id``) fits and scores an
    ``svm.SVC``. The output DataFrame has one row per combination with
    columns ``id`` and ``score``. ``score`` is the mean macro-F1 over 5
    folds, or ``-1.0`` when training for that combination failed.
    """

    def test(self):
        """No data tests are defined for this step."""
        pass

    @trace(attrs_refact=['ai_cs'])
    def initialize(
        self,
        dt: DataStepDataframe,
        pipeline_name: str,
        ai_cs: str,
        n_iter: int
    ):
        """
        Sample hyper-parameters and cross-validate one SVC per sample.

        Parameters
        ----------
        dt : DataStepDataframe
            Input data; ``dt.dataframe`` must contain the four feature
            columns used below plus ``target``.
        pipeline_name : str
            Pipeline name passed to ``Log`` inside each UDF call.
        ai_cs : str
            Passed to ``Log`` inside each UDF call (presumably an Application
            Insights connection string, redacted from traces via
            ``attrs_refact`` -- confirm).
        n_iter : int
            Number of hyper-parameter combinations to sample.
        """
        # Plain local rather than a class attribute, so the UDF closure below
        # never captures ``self`` (which carries ``self.spark``; Spark session
        # objects are driver-only and cannot be shipped to executors).
        feature_cols = [
            'sepallength',
            'sepalwidth',
            'petallength',
            'petalwidth'
        ]
        param_grid = {
            'C': loguniform(1e0, 1e3),
            'kernel': ['linear', 'rbf'],
            'class_weight': ['balanced', None]
        }
        # Fixed seed so the sampled parameter list is reproducible.
        rng = np.random.RandomState(0)
        param_list = list(
            ParameterSampler(
                param_grid,
                n_iter=n_iter,
                random_state=rng
            )
        )
        p_dt = (
            self.spark.createDataFrame(pd.DataFrame(param_list))
            .withColumn('id', F.monotonically_increasing_id())
        )
        # Every parameter row is paired with the full data set, so each
        # ``id`` group below holds one complete copy of the training data.
        dt_train = dt.dataframe.crossJoin(p_dt)

        udf_schema = dt_train.select(
            'id',
            F.lit(0.0).alias('score')
        ).schema

        def pudf_train(dt_model):
            """Fit and score one SVC for a single ``id`` group.

            The parameter columns are constant within a group, so the first
            row's values are used. Returns a one-row pandas DataFrame matching
            ``udf_schema`` (``id``, float ``score``).
            """
            param_id = dt_model['id'].iloc[0]
            param_c = dt_model['C'].iloc[0]
            param_class_weight = dt_model['class_weight'].iloc[0]
            param_kernel = dt_model['kernel'].iloc[0]

            # Stringify every value so the custom dimensions are uniformly
            # str (class_weight may be None).
            logging_custom_dimensions = {
                'id': str(param_id),
                'C': str(param_c),
                'class_weight': str(param_class_weight),
                'kernel': str(param_kernel)
            }

            Log(pipeline_name, ai_cs)

            try:
                # Deliberate, unseeded failure injection: randint(0, 20) > 15
                # is true for 5 of 21 outcomes (~24%), to exercise the
                # error-logging path.
                if random.randint(0, 20) > 15:
                    raise RandomException('Random exception')

                dt_x = dt_model[feature_cols]
                y = dt_model['target']
                clf = svm.SVC(
                    kernel=param_kernel,
                    C=param_c,
                    class_weight=param_class_weight,
                    random_state=42
                )
                scores = cross_val_score(clf, dt_x, y, cv=5, scoring='f1_macro')
                score = float(scores.mean())
                Log.get_instance().log_info("Training:success", custom_dimension=logging_custom_dimensions)
            except Exception as exc:
                # Best-effort: record the failure together with its cause and
                # emit a sentinel score instead of failing the whole Spark job.
                Log.get_instance().log_error(
                    "Training:failed",
                    custom_dimension={**logging_custom_dimensions, 'error': repr(exc)}
                )
                # Float sentinel: the schema declares ``score`` as double.
                score = -1.0
            return pd.DataFrame(
                {
                    'id': [param_id],
                    'score': [score]
                }
            )

        dt_cross_evals = (
            dt_train
            .groupBy(['id'])
            .applyInPandas(pudf_train, schema=udf_schema)
            .cache()
        )
        # count() is an action: it runs the training now and fills the cache,
        # so later reads of the output do not retrain.
        dt_cross_evals.count()
        self.set_output_data(dt_cross_evals)


In [None]:
PIPELINE_NAME = "Remote Testing"
ENVVAR_AI_CS = 'AI_CS'
KEY_VAULT_SECRET = 'aics'
KEY_VAULT_NAME = 'synapseaccleratorkv'
RUN_ID = 'test_run_id'
# Local .env file holding ENVVAR_AI_CS; override with the DOTENV_PATH env var.
DEFAULT_DOTENV_PATH = '/workspaces/dstoolkit-ml-ops-for-synapse-dev/.env'

# Construct the Engine once (presumably this registers the shared instance),
# then always go through Engine.get_instance() instead of re-constructing.
Engine()
engine = Engine.get_instance()
engine.initialize_env()

if engine.is_ide_local():
    import os
    from dotenv import load_dotenv
    load_dotenv(dotenv_path=os.environ.get('DOTENV_PATH', DEFAULT_DOTENV_PATH))
    engine.initialize_logger(
        pipeline_name=PIPELINE_NAME,
        aics=os.environ[ENVVAR_AI_CS]
    )
elif engine.is_ide_remote():
    engine.initialize_logger(
        pipeline_name=PIPELINE_NAME,
        aics_kv_secret=KEY_VAULT_SECRET,
        aics_kv_name=KEY_VAULT_NAME
    )
else:
    # Previously a silent no-op: make the missing logger setup visible.
    import warnings
    warnings.warn(
        'Neither a local nor a remote IDE was detected; '
        'the logger was not initialized.'
    )

In [None]:
# Build the data-loading step on the shared Spark session and run it.
step_loadData = Step_loadData(spark=Engine.get_instance().spark(), run_id=RUN_ID)
step_loadData.initialize()


In [None]:
N_ITER = 1000        # number of hyper-parameter combinations to evaluate
N_TOP_RESULTS = 10   # rows to display from the evaluation results

step_crossValidate = Step_crossValidate(
    spark=Engine.get_instance().spark(),
    run_id=RUN_ID
)

step_crossValidate.initialize(
    dt=step_loadData.output_data,
    pipeline_name=PIPELINE_NAME,
    ai_cs=Engine.get_instance().ai_cs,
    n_iter=N_ITER
)

# Show the best-scoring combinations instead of dumping all N_ITER rows;
# failed runs carry the sentinel score -1 and therefore sort last.
step_crossValidate.output_data.dataframe.orderBy(F.desc('score')).limit(N_TOP_RESULTS).toPandas()