In [8]:
import numpy as np
from scipy.io import arff
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler

In [9]:
from frouros.detectors.concept_drift import DDM, DDMConfig

Unlike synthetic datasets, in real datasets it is not possible to know for sure if and when drift occurs.

In [10]:
# Location of the power-supply stream in ARFF format.
# NOTE(review): this is a machine-specific absolute path; point it at your own
# copy of the dataset before running.
DATA_PATH = '/Users/hirushau/Code/Model-Monitoring/datasets/concept_drift/powersupply.arff.txt'

# Load the .arff file; only the first element of the returned value (the records)
# is used below.
data = arff.loadarff(DATA_PATH)

# Copy the records into a standalone structured numpy array
data_array = np.array(data[0])

# Features: stack the two attribute fields column-wise into an (n_samples, 2)
# array. Vectorised, so no Python list is built per row.
X = np.column_stack((data_array['attribute0'], data_array['attribute1']))
# Labels: the 'class' field holds byte strings; decode them all in one call.
y = np.char.decode(data_array['class'], 'utf-8')

# Split the data into a reference set (used to fit the model) and a test set
# (replayed as a stream). Fail loudly if the split would leave either side empty.
split_idx = 20000
assert len(X) == len(y), "features and labels must have the same length"
assert 0 < split_idx < len(X), (
    f"split_idx={split_idx} must fall strictly inside the {len(X)} loaded samples"
)
X_ref, y_ref = X[:split_idx], y[:split_idx]
X_test, y_test = X[split_idx:], y[split_idx:]

In [11]:
# Standardise the features, then classify with logistic regression.
# Both steps are fitted on the reference window only.
pipeline = Pipeline(
    steps=[
        ("scaler", StandardScaler()),
        ("model", LogisticRegression()),
    ]
)
pipeline.fit(X_ref, y_ref)

In [12]:
# DDM detector settings.
# NOTE(review): warning_level / drift_level look like the thresholds at which DDM
# reports "warning" and "drift", and min_num_instances the number of updates seen
# before it starts checking -- confirm against the frouros documentation.
config = DDMConfig(
    warning_level=2.0,
    drift_level=3.0,
    min_num_instances=2000,
)

# Detector instance that is fed one error value per streamed sample
detector = DDM(config=config)

A stream of samples is simulated using the test dataset until drift is detected. In each iteration the model makes a prediction that is compared with the ground truth, resulting in an error value (0 if the prediction is correct, 1 otherwise). This error value is used to update the detector. To check whether drift is occurring, the detector's `status` attribute can be inspected.

In [13]:
# Replay the test set one sample at a time, as if it arrived as a stream.
# The loop variables are deliberately not named X / y, so the full feature
# matrix and label vector are not overwritten by the last streamed sample.
# Note that i is an index into the test set, not into the full dataset.
for i, (x_sample, y_true) in enumerate(zip(X_test, y_test)):
    # predict expects a 2-D array, so present the sample as a single row
    y_pred = pipeline.predict(x_sample.reshape(1, -1))
    # 0 when the prediction matches the ground truth, 1 otherwise
    error = int(y_pred.item() != y_true.item())
    detector.update(value=error)
    status = detector.status
    if status["drift"]:
        print(f"Drift detected at index {i}")
        break
else:
    # The stream was exhausted without the detector flagging drift
    print("No drift detected in the test set")

Drift detected at index 2967
