In [49]:
import numpy as np
from sklearn.datasets import make_classification
from sklearn.pipeline import Pipeline
from sklearn.tree import DecisionTreeClassifier

from frouros.transformations import NumericalDetectors
from frouros.unsupervised.statistical_test import CVMTest, KSTest

## Unsupervised - Pipeline with multiple detectors in parallel

Frouros provides transformations that allow several unsupervised detectors to run in parallel. The following example applies one of these transformations, with two statistical tests, to a synthetic dataset composed of 3 informative features and 2 non-informative features that are useless to the model.

In [58]:
np.random.seed(seed=31)

X, y = make_classification(n_samples=10000,
                           n_features=5,
                           n_informative=3,
                           n_redundant=0,
                           n_repeated=0,
                           n_classes=2,
                           scale=[10, 0.1, 5, 15, 1],
                           shuffle=False,  # False because shuffling would also reorder the features, which we don't want
                           random_state=31,)
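
As a quick sanity check on the column layout, the sketch below regenerates the same dataset and inspects the per-feature standard deviations. It assumes scikit-learn's documented behavior that, with `shuffle=False`, the informative features come first and the noise features last. The name `noise_std` is introduced here for illustration only and is not part of the original notebook.

```python
import numpy as np
from sklearn.datasets import make_classification

# Same arguments as the cell above.
X, y = make_classification(n_samples=10000,
                           n_features=5,
                           n_informative=3,
                           n_redundant=0,
                           n_repeated=0,
                           n_classes=2,
                           scale=[10, 0.1, 5, 15, 1],
                           shuffle=False,
                           random_state=31)

# Assumption: with shuffle=False, columns 0-2 are the informative features
# and columns 3-4 are the non-informative (noise) features.
noise_std = X[:, 3:].std(axis=0)

print("Per-feature std:", X.std(axis=0).round(2))
print("Noise-feature std:", noise_std.round(2))
```

If the assumption holds, the two noise columns should show standard deviations close to their `scale` entries (15 and 1), since they are standard normal draws multiplied by the scale.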

Randomly shuffle the data rows and split the data into train (70%) and test (30%) sets.

In [61]:
idxs = np.arange(X.shape[0])
np.random.shuffle(idxs)
X, y = X[idxs], y[idxs]

idx_split = int(X.shape[0] * 0.7)
X_train, y_train, X_test, y_test = X[:idx_split], y[:idx_split], X[idx_split:], y[idx_split:]
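
The shuffle-and-split step can also be wrapped in a small helper. The sketch below is one possible way to do it: the function name `shuffle_split` and its parameters are hypothetical, and it uses a local NumPy `Generator` instead of the global seed set earlier, so it will not reproduce the exact row order of the cell above.

```python
import numpy as np


def shuffle_split(X, y, train_fraction=0.7, seed=31):
    """Shuffle the rows of X and y together, then split into train and test parts."""
    rng = np.random.default_rng(seed)
    idxs = rng.permutation(X.shape[0])  # one permutation keeps X and y aligned
    X, y = X[idxs], y[idxs]
    idx_split = int(X.shape[0] * train_fraction)
    return X[:idx_split], y[:idx_split], X[idx_split:], y[idx_split:]


# Toy data so the sketch runs on its own: row i of X_demo is [2i, 2i + 1].
X_demo = np.arange(20).reshape(10, 2)
y_demo = np.arange(10)

X_tr, y_tr, X_te, y_te = shuffle_split(X_demo, y_demo)
print(X_tr.shape, X_te.shape)
```

Indexing both arrays with the same permutation is what keeps each row paired with its label after shuffling.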

The significance level will be $\alpha = 0.01$.

In [60]:
alpha = 0.01

Two statistical tests are used in this case, defined in a list that is passed to the NumericalDetectors object.

In [54]:
detectors = [
    CVMTest(),
    KSTest(),
]

pipeline = Pipeline([
    ("detectors", NumericalDetectors(detectors=detectors, n_jobs=-1)),
    ("model", DecisionTreeClassifier(random_state=31))
])

pipeline.fit(X=X_train,
             y=y_train)

Finally, for each detector in the list defined above, every feature is checked for data drift: drift is flagged when the p-value falls below $\alpha$.

In [56]:
y_pred = pipeline.predict(X=X_test)

detectors_results = [
    (type(detector).__name__, detector.test)
    for detector in pipeline["detectors"].detectors
]

for i in range(X_test.shape[1]):
    print(f"\nFeature {i + 1}:")
    for detector, result in detectors_results:
        p_value = result[i].p_value
        status = "Data drift detected" if p_value < alpha else "No data drift detected"
        print(f"{detector} - p-value: {round(p_value, 4)}. {status}")


Feature 1:
CVMTest - p-value: 0.2978. No data drift detected
KSTest - p-value: 0.1606. No data drift detected

Feature 2:
CVMTest - p-value: 0.6225. No data drift detected
KSTest - p-value: 0.5984. No data drift detected

Feature 3:
CVMTest - p-value: 0.1022. No data drift detected
KSTest - p-value: 0.0637. No data drift detected

Feature 4:
CVMTest - p-value: 0.4774. No data drift detected
KSTest - p-value: 0.2359. No data drift detected

Feature 5:
CVMTest - p-value: 0.7636. No data drift detected
KSTest - p-value: 0.8064. No data drift detected
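
To see what a per-feature check of this kind looks like outside a pipeline, the sketch below runs two-sample Cramér-von Mises and Kolmogorov-Smirnov tests directly with SciPy. This is not Frouros's implementation: it assumes the detectors correspond to the classical two-sample tests, and the arrays `X_ref` and `X_new`, as well as the artificial shift, are made up for illustration.

```python
import numpy as np
from scipy.stats import cramervonmises_2samp, ks_2samp

rng = np.random.default_rng(31)
alpha = 0.01

# Hypothetical reference and new samples with 2 features each.
X_ref = rng.normal(size=(1000, 2))
X_new = rng.normal(size=(1000, 2))
X_new[:, 1] += 1.0  # shift the second feature to simulate drift

p_values = {}
for i in range(X_ref.shape[1]):
    # Each test compares one feature of the reference data with the same
    # feature of the new data.
    cvm = cramervonmises_2samp(X_ref[:, i], X_new[:, i])
    ks = ks_2samp(X_ref[:, i], X_new[:, i])
    p_values[i] = {"CVMTest": cvm.pvalue, "KSTest": ks.pvalue}

    print(f"\nFeature {i + 1}:")
    for name, p_value in p_values[i].items():
        status = "Data drift detected" if p_value < alpha else "No data drift detected"
        print(f"{name} - p-value: {round(p_value, 4)}. {status}")
```

With a shift of one standard deviation and 1000 samples per side, both tests should flag the second feature. The first feature is drawn from the same distribution in both samples, so it will usually pass at this significance level, although a false alarm is still possible.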
