In [None]:
# Funkcja odpalana jako pandas_udf




In [None]:
from abc import ABC, abstractmethod
from typing import List



class RetrainingStrategy(ABC):
    """Interface for choosing the data a model is re-fitted on."""

    @abstractmethod
    def get_retraining_data(self, x_history: List, y_history: List[int], prediction_history: List[int], drift_history: List[int]):
        """Select the data to use for the next model refit.

        Args:
            x_history: History of inputs seen so far.
            y_history: History of true labels.
            prediction_history: History of model predictions.
            drift_history: History of drift indicators.
                (Presumably all four hold one entry per processed sample —
                confirm against the caller.)

        Returns:
            Implementations are expected to return an ``(x_train, y_train)``
            pair; that is how ``ModelSklearnPipeline.adjust_model`` unpacks
            the result.
        """
        # `self` is required: the strategy is invoked on an instance
        # (``strategy.get_retraining_data(...)``), so the abstract signature
        # must match what concrete subclasses have to implement.

In [None]:
from abc import ABC, abstractmethod
from typing import List


class EvaluationStrategyManager(ABC):
    """Interface for splitting the recorded history into current/reference data."""

    @abstractmethod
    def get_curr_ref_data(
        self,
        x_history: List,
        y_history: List[int],
        prediction_history: List[int],
        drift_history: List[int],
    ):
        """Pick the current and reference datasets used for monitoring.

        Implementations are expected to return a ``(current, reference)``
        pair; ``EvidentlyMonitoringStep.monitor`` unpacks the result in that
        order.
        """

In [None]:
from abc import ABC, abstractmethod
from typing import List


class ModelEstimatorPipeline(ABC):
    """Common interface of the model wrappers used by the experiment pipeline."""

    @abstractmethod
    def handle(self, x, y):
        """Process one ``(x, y)`` step; concrete wrappers return the prediction."""

    @abstractmethod
    def adjust_model(
        self,
        x_history: List,
        y_history: List[int],
        prediction_history: List[int],
        drift_history: List[int],
    ):
        """Update the wrapped model using the recorded histories."""

    @abstractmethod
    def get_name(self):
        """Return an identifier for the wrapped model."""

In [None]:
from sklearn.pipeline import Pipeline
from typing import Dict, List



class ModelSklearnPipeline(ModelEstimatorPipeline):
    """Model wrapper around a scikit-learn ``Pipeline`` that is refit on demand."""

    def __init__(self, sklearn_pipeline: Pipeline, hyperparameter_space: Dict, retraining_strategy: RetrainingStrategy):
        """Store the estimator and its retraining configuration.

        Args:
            sklearn_pipeline: Pipeline used for prediction and refitting.
            hyperparameter_space: Stored as-is; not used by any method of this
                class yet.
            retraining_strategy: Strategy that picks the refit data. Annotated
                with ``RetrainingStrategy``, the strategy interface defined in
                this notebook (the previous annotation named a class that does
                not exist and broke the class definition).
        """
        self.estimator = sklearn_pipeline
        self.hyperparameter_space = hyperparameter_space
        self.retraining_strategy = retraining_strategy

    def handle(self, x, y):
        """Return the estimator's prediction for ``x``.

        ``y`` is accepted for interface compatibility and is not used here.
        """
        return self.estimator.predict(x)

    def adjust_model(self, x_history: List, y_history: List[int], prediction_history: List[int], drift_history: List[int]):
        """Refit the estimator on the data chosen by the retraining strategy."""
        x_train, y_train = self.retraining_strategy.get_retraining_data(x_history, y_history, prediction_history, drift_history)
        self.estimator.fit(x_train, y_train)

    def get_name(self):
        """Return the model name.

        TODO: not implemented yet — this delegates to the abstract base
        method, whose body is empty, so it currently yields ``None``.
        """
        return super().get_name()

In [None]:
from river import compose
from typing import List


class ModelRiverPipeline(ModelEstimatorPipeline):
    """Model wrapper around a river pipeline that learns one sample at a time."""

    def __init__(self, river_pipeline: compose.Pipeline):
        """Keep a reference to the river pipeline used for predicting and learning."""
        self.estimator = river_pipeline

    def handle(self, x, y):
        """Predict for ``x`` first, then let the estimator learn from ``(x, y)``.

        The prediction is taken before the learning call and is what gets
        returned.
        """
        y_hat = self.estimator.predict_one(x)
        self.estimator.learn_one(x, y)
        return y_hat

    def adjust_model(
        self,
        x_history: List,
        y_history: List[int],
        prediction_history: List[int],
        drift_history: List[int],
    ):
        """Do nothing: learning already happens inside ``handle``."""
        return None

    def get_name(self):
        """Return the model name.

        TODO: not implemented yet — delegates to the base-class method.
        """
        inherited_name = super().get_name()
        return inherited_name

In [None]:
class ModelEvaluationPipeline:
    """Feeds every (true, predicted) pair into a fixed set of named metrics."""

    def __init__(self, metric_steps):
        """Store the metrics to update.

        Args:
            metric_steps: Iterable of ``(metric_name, metric)`` pairs. Each
                metric must expose ``update(y_true, y_predict)``.
        """
        self.metric_steps = metric_steps

    def handle(self, y_true, y_predict):
        """Update every metric with one observation.

        Args:
            y_true: Ground-truth value.
            y_predict: Predicted value.

        Returns:
            Dict mapping each metric name to whatever that metric's ``update``
            call returned.
            NOTE(review): some metric libraries return the metric object or
            ``None`` from ``update`` rather than a score — confirm this is the
            value you want recorded.
        """
        # The steps are stored as `metric_steps` in __init__; reading the
        # non-existent `self.steps` raised AttributeError on every call.
        return {
            metric_name: metric.update(y_true, y_predict)
            for metric_name, metric in self.metric_steps
        }

In [None]:
from abc import ABC, abstractmethod
from typing import List



class MonitoringStep(ABC):
    """Interface of a single monitoring check run over the recorded history."""

    @abstractmethod
    def monitor(
        self,
        x_history: List,
        y_history: List[int],
        prediction_history: List[int],
        drift_history: List[int],
    ) -> bool:
        """Run the check over the histories and report its outcome as a bool."""

In [None]:
from evidently.test_suite import TestSuite



class EvidentlyMonitoringStep(MonitoringStep):
    """Monitoring step that runs an evidently ``TestSuite`` on current vs. reference data."""

    def __init__(self, evidently_test_suite: TestSuite, evaluation_strategy: EvaluationStrategyManager):
        """Store the test suite and the strategy that supplies its two datasets."""
        self.detector, self.eval_strategy = evidently_test_suite, evaluation_strategy

    def monitor(
        self,
        x_history: List,
        y_history: List[int],
        prediction_history: List[int],
        drift_history: List[int],
    ) -> bool:
        """Run the test suite on the data selected by the evaluation strategy.

        Returns:
            Always ``True`` for now; the suite result is fetched but not yet
            interpreted.
        """
        current_data, reference_data = self.eval_strategy.get_curr_ref_data(
            x_history, y_history, prediction_history, drift_history
        )
        self.detector.run(current_data=current_data, reference_data=reference_data)
        suite_summary = self.detector.as_dict()
        # TODO: derive the return value from `suite_summary`.
        return True
        

In [None]:

# Experiment -> runs on a given partition and defines the experiment pipeline.
#
# NOTE(review): `datastream`, `Logger` and `StreamClassificationPipeline` are
# not defined in any visible cell; they must already be in scope.

datastream_name = datastream['name'][0]
# `drop` takes `columns=`; the previous `col=` keyword is not accepted.
datastream = datastream.drop(columns=['name'])
# Use the name extracted above; the bare `name` was never defined.
logger = Logger(dataset_name=datastream_name)

# Build the pipeline once so whatever it learns carries over between samples;
# creating it inside the loop started from a fresh pipeline on every step.
pipe = StreamClassificationPipeline()

logger.start()
# NOTE(review): this assumes iterating `datastream` yields (x, y) pairs —
# confirm, since iterating a plain DataFrame yields column labels instead.
for x, y in datastream:
    logger.iter()
    pipe.handle(x, y)
    logger.iter_end()

logger.end()

        


In [33]:
list([{'a': 4}][0].values())[0]

4

In [55]:
from river.drift import ADWIN

a = ADWIN()

In [56]:
type(a)

river.drift.adwin.ADWIN

In [92]:
a.clock

32

In [58]:
'_helper'.startswith('_')

True

In [60]:
vars(a).items()

dict_items([('_drift_detected', False), ('delta', 0.002), ('clock', 32), ('max_buckets', 5), ('min_window_length', 5), ('grace_period', 10), ('_helper', <river.drift.adwin_c.AdaptiveWindowing object at 0x000001BA0C9EA5C0>)])

In [65]:
{item for item in vars(a).items() if not item[0].startswith('_')}

{('clock', 32),
 ('delta', 0.002),
 ('grace_period', 10),
 ('max_buckets', 5),
 ('min_window_length', 5)}

In [48]:
import pandas as pd
import numpy as np

from sklearn import datasets, ensemble, model_selection

from evidently import ColumnMapping
from evidently.test_suite import TestSuite

from evidently.test_preset import NoTargetPerformanceTestPreset
from evidently.test_preset import DataQualityTestPreset
from evidently.test_preset import DataStabilityTestPreset
from evidently.test_preset import DataDriftTestPreset
from evidently.test_preset import RegressionTestPreset
from evidently.test_preset import MulticlassClassificationTestPreset
from evidently.test_preset import BinaryClassificationTopKTestPreset
from evidently.test_preset import BinaryClassificationTestPreset

from evidently.tests import TestNumberOfEmptyRows, TestNumberOfEmptyColumns, TestNumberOfDuplicatedRows, TestNumberOfDuplicatedColumns, TestNumberOfDriftedColumns, TestShareOfDriftedColumns

In [36]:
# Dataset for data quality and integrity checks
adult_data = datasets.fetch_openml(name='adult', version=2, as_frame='auto')
adult = adult_data.frame

# Rows with one of these education levels form the "current" set; all other
# rows form the "reference" set.
adult_is_current = adult.education.isin(['Some-college', 'HS-grad', 'Bachelors'])

# Take explicit copies so the write below modifies an independent frame rather
# than a boolean-filtered slice of `adult`.
adult_ref = adult[~adult_is_current].copy()
adult_cur = adult[adult_is_current].copy()

# Inject missing values: first 2000 current rows, columns at positions 3 and 4.
adult_cur.iloc[:2000, 3:5] = np.nan

  warn(


In [37]:
# Dataset for regression
housing_data = datasets.fetch_california_housing(as_frame='auto')

# Rename on a new frame instead of mutating the fetched one in place.
housing = housing_data.frame.rename(columns={'MedHouseVal': 'target'})

# Synthetic predictions: true target plus Gaussian noise (mean 0, std 3).
# A seeded generator keeps the noise identical across re-runs.
housing_rng = np.random.default_rng(0)
housing['prediction'] = housing_data['target'].values + housing_rng.normal(0, 3, housing.shape[0])

# Distinct fixed seeds: reproducible, but reference and current still differ.
housing_ref = housing.sample(n=5000, replace=False, random_state=0)
housing_cur = housing.sample(n=5000, replace=False, random_state=1)

In [38]:
# Dataset for binary probabilistic classification
bcancer_data = datasets.load_breast_cancer(as_frame='auto')
bcancer = bcancer_data.frame
# Feature column names, computed once and reused for every fit/predict call.
bcancer_features = bcancer_data.feature_names.tolist()

# Distinct fixed seeds: reproducible, but reference and current still differ.
bcancer_ref = bcancer.sample(n=300, replace=False, random_state=0)
bcancer_cur = bcancer.sample(n=200, replace=False, random_state=1)

# Separate copies that will carry hard labels instead of probabilities.
bcancer_label_ref = bcancer_ref.copy(deep=True)
bcancer_label_cur = bcancer_cur.copy(deep=True)

model = ensemble.RandomForestClassifier(random_state=1, n_estimators=10)
model.fit(bcancer_ref[bcancer_features], bcancer_ref.target)

# Probabilistic variant: the second column of predict_proba.
bcancer_ref['prediction'] = model.predict_proba(bcancer_ref[bcancer_features])[:, 1]
bcancer_cur['prediction'] = model.predict_proba(bcancer_cur[bcancer_features])[:, 1]

# Label variant: hard class predictions.
bcancer_label_ref['prediction'] = model.predict(bcancer_label_ref[bcancer_features])
bcancer_label_cur['prediction'] = model.predict(bcancer_label_cur[bcancer_features])

In [39]:
# Dataset for multiclass classification
iris_data = datasets.load_iris(as_frame='auto')
iris = iris_data.frame

# Distinct fixed seeds: reproducible, but reference and current still differ.
iris_ref = iris.sample(n=75, replace=False, random_state=0)
iris_cur = iris.sample(n=75, replace=False, random_state=1)

# NOTE: this rebinds `model`, replacing any model bound to that name earlier.
model = ensemble.RandomForestClassifier(random_state=1, n_estimators=3)
model.fit(iris_ref[iris_data.feature_names], iris_ref.target)

iris_ref['prediction'] = model.predict(iris_ref[iris_data.feature_names])
iris_cur['prediction'] = model.predict(iris_cur[iris_data.feature_names])

In [49]:
# Individual checks, in the order they are handed to the suite.
drift_checks = [
    TestNumberOfEmptyRows(),
    TestNumberOfEmptyColumns(),
    TestNumberOfDuplicatedRows(),
    TestNumberOfDuplicatedColumns(),
    TestNumberOfDriftedColumns(),
    TestShareOfDriftedColumns(),
]
data_drift_dataset_tests = TestSuite(tests=drift_checks)

# Compare the current adult split against the reference split.
data_drift_dataset_tests.run(current_data=adult_cur, reference_data=adult_ref)

In [50]:
# Export the results of the test suite run above as a plain Python dict
res = data_drift_dataset_tests.as_dict()

In [72]:
res['tests']

[{'name': 'Number of Empty Rows',
  'description': 'Number of Empty Rows is 0. The test threshold is eq=0 ± 1e-12.',
  'status': 'SUCCESS',
  'group': 'data_integrity',
  'parameters': {}},
 {'name': 'Number of Empty Columns',
  'description': 'Number of Empty Columns is 0. The test threshold is lte=0.',
  'status': 'SUCCESS',
  'group': 'data_integrity',
  'parameters': {}},
 {'name': 'Number of Duplicate Rows',
  'description': 'The number of duplicate rows is 38. The test threshold is eq=27 ± 2.7.',
  'status': 'FAIL',
  'group': 'data_integrity',
  'parameters': {'condition': {'eq': 26.955634051571884 ± 2.6955634051571886},
   'number_of_duplicated_rows': 38}},
 {'name': 'Number of Duplicate Columns',
  'description': 'The number of duplicate columns is 0. The test threshold is lte=0.',
  'status': 'SUCCESS',
  'group': 'data_integrity',
  'parameters': {'condition': {'lte': 0}, 'number_of_duplicated_columns': 0}},
 {'name': 'Number of Drifted Features',
  'description': 'The drift

In [90]:
is_drift_detected = False
detection_idx = 3



In [91]:
desc

'Drift not detected.'

In [70]:
from collections import Counter


x = Counter({'SUCCESS': 3, 'FAIL': 3})

In [71]:
x.elements()

<itertools.chain at 0x1ba0f7e5060>

In [91]:
def foo():
    """Return two positional values followed by an empty keyword-argument dict."""
    positional = (1, 2)
    keyword_args = {}
    return (*positional, keyword_args)

In [92]:
a, b, kwargs = foo()

In [93]:
class Foo:
    """Plain holder of two required values and two optional dimensions."""

    def __init__(self, a: int, b: int, weight: int=4, height: int=3):
        """Store every argument on the instance under the same name."""
        self.a, self.b = a, b
        self.weight, self.height = weight, height


In [94]:
f= Foo(a, b, **kwargs)

In [95]:
f.a

1

In [96]:
f.weight

4

In [97]:
f.c

AttributeError: 'Foo' object has no attribute 'c'

In [83]:
args