In [None]:
#| hide
# Set up autoreload so that changes made in other modules are picked up without restarting the kernel
%load_ext autoreload
%autoreload 2

## Data Drift Function
> Utilities to detect whether there is drift between consecutive windows of a dataset, using Evidently test suites.

In [None]:
#| default_exp Data_Drift_Function

In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import json
import pandas as pd
import numpy as np
import requests
import zipfile
from io import BytesIO

import plotly.offline as py 
import plotly.graph_objs as go

from evidently.pipeline.column_mapping import ColumnMapping
from evidently.report import Report
from evidently.metric_preset import DataDriftPreset

import mlflow
import mlflow.sklearn
from mlflow.tracking import MlflowClient

from datetime import datetime, timedelta

from evidently import ColumnMapping
from evidently.test_suite import TestSuite
from evidently.dashboard import Dashboard
from evidently.test_suite import TestSuite
from evidently.tests.base_test import generate_column_tests
from evidently.test_preset import DataStabilityTestPreset, NoTargetPerformanceTestPreset
from evidently.tests import *
from evidently.test_preset import NoTargetPerformanceTestPreset
from evidently.test_preset import DataQualityTestPreset
from evidently.test_preset import DataStabilityTestPreset
from evidently.test_preset import DataDriftTestPreset
from evidently.test_preset import RegressionTestPreset
from evidently.test_preset import MulticlassClassificationTestPreset
from evidently.test_preset import BinaryClassificationTopKTestPreset
from evidently.test_preset import BinaryClassificationTestPreset
from datetime import datetime, timedelta
from pandas.api.types import is_datetime64_any_dtype as is_datetime
from dateutil import parser

import warnings
warnings.filterwarnings('ignore')
warnings.simplefilter('ignore')


analyzers are deprecated, use metrics instead


dashboards are deprecated, use metrics instead



In [None]:
#| export
def data_drift_test_selection(user_input, ref_data, cur_data, stattest='psi'):
    """
    Selects the appropriate data drift test suite based on user input and runs the tests.

    Parameters:
        user_input (int): Which suite to run:
            1 - basic checks (missing values, constant/duplicated rows and columns,
                column types, number of drifted columns),
            2 - DataStabilityTestPreset,
            3 - DataQualityTestPreset,
            4 - DataDriftTestPreset using ``stattest``.
        ref_data (pd.DataFrame): Reference dataset.
        cur_data (pd.DataFrame): Current dataset.
        stattest (str): Statistical test passed to ``DataDriftTestPreset`` for
            option 4. Defaults to ``'psi'``, the previously hard-coded value.

    Returns:
        TestSuite: The executed test suite, or None (after printing a message)
        when ``user_input`` is not one of the options above.
    """
    # Factories (not instances) so that only the selected suite is constructed.
    suite_factories = {
        1: lambda: [
            TestNumberOfColumnsWithMissingValues(),
            TestNumberOfRowsWithMissingValues(),
            TestNumberOfConstantColumns(),
            TestNumberOfDuplicatedRows(),
            TestNumberOfDuplicatedColumns(),
            TestColumnsType(),
            TestNumberOfDriftedColumns(),
        ],
        2: lambda: [DataStabilityTestPreset()],
        3: lambda: [DataQualityTestPreset()],
        4: lambda: [DataDriftTestPreset(stattest=stattest)],
    }

    make_tests = suite_factories.get(user_input)
    if make_tests is None:
        print("Invalid user input!!!")
        return None

    suite = TestSuite(tests=make_tests())
    suite.run(reference_data=ref_data, current_data=cur_data)
    return suite

In [None]:
#| export
def _is_parseable_datetime(value):
    """
    Return True if ``str(value)`` can be parsed by ``dateutil.parser.parse``.

    Only parse failures count as "not a datetime": dateutil raises ``ParserError``
    (a ``ValueError`` subclass) for unrecognised strings and ``OverflowError`` for
    out-of-range values. Unlike a bare ``except``, this does not swallow
    ``KeyboardInterrupt`` or unrelated programming errors.
    """
    try:
        parser.parse(str(value), fuzzy=False, default=None)
        return True
    except (ValueError, OverflowError):
        return False


def _mean_entries_per_window(times, span):
    """
    Mean, over every timestamp ``t`` in ``times``, of how many timestamps lie in ``[t, t + span)``.

    Uses binary search on a sorted copy instead of rescanning the whole column for
    every row, so the cost is O(n log n) instead of O(n^2). ``times`` does not need
    to be sorted.

    Raises:
        ValueError: if ``times`` is empty (the mean would be undefined).
    """
    if len(times) == 0:
        raise ValueError("cannot infer a window size from an empty time column")
    ordered = np.sort(times.values)
    # Count of timestamps strictly before each window start / each window end.
    before_start = np.searchsorted(ordered, times.values, side="left")
    before_end = np.searchsorted(ordered, (times + span).values, side="left")
    counts = before_end - before_start
    return counts.sum() / len(counts)


def detect_drift(X, window_size="NULL", time_col="Time", target_col="Electricity_load"):
    """
    Split ``X`` into consecutive row windows and yield (reference, next) pairs,
    advancing the reference window only when drift is detected.

    The first pair is windows 0 and 1. Before each later pair is yielded, the
    previously yielded pair is tested with option 4 of ``data_drift_test_selection``
    (``DataDriftTestPreset``). If ``target_col`` is reported as drifted, the next
    window becomes the new reference; otherwise the reference stays fixed. The
    "next" window always advances by one. The test runs lazily when the generator
    is advanced, so the last yielded pair is never tested.

    Parameters:
        X (pd.DataFrame): Input dataset. It is not modified.
        window_size (str, int or None): Rows per window. ``"NULL"`` (default) or
            ``None`` means "infer": if every value of ``time_col`` parses as a
            date, use the mean number of rows falling within the 24 hours that
            start at each row; otherwise use 100.
        time_col (str): Column holding timestamps. Default ``"Time"``.
        target_col (str): Column whose drift verdict drives the reference update.
            Default ``"Electricity_load"``.

    Yields:
        Tuple[pd.DataFrame, pd.DataFrame]: Current (reference) chunk and next chunk.

    Raises:
        ValueError: if the window size is not positive, or ``X`` has too few rows
            for two windows.
    """
    infer_window = window_size is None or (isinstance(window_size, str) and window_size == "NULL")

    if infer_window:
        # Determine window size based on data type
        if X[time_col].apply(_is_parseable_datetime).all():
            # Convert on a copy so the caller's DataFrame is left untouched; the
            # yielded chunks still carry the parsed datetime column.
            X = X.copy()
            X[time_col] = pd.to_datetime(X[time_col])
            mean_entries = _mean_entries_per_window(X[time_col], pd.Timedelta(hours=24))
            window_size = int(mean_entries)
            print("Using mean number of entries per 24-hour window as window size:", window_size)
        else:
            window_size = 100
            print("window size=100")
    else:
        window_size = int(window_size)

    if window_size <= 0:
        raise ValueError(f"window_size must be a positive integer, got {window_size}")

    chunk_starts = np.arange(0, len(X), window_size)
    num_chunks = len(chunk_starts)
    if num_chunks < 2:
        raise ValueError(
            f"detect_drift needs at least two windows of {window_size} rows; X has {len(X)} rows"
        )

    def window(k):
        # Positional row slice for window k; the final window may be shorter.
        start = chunk_starts[k]
        return X.iloc[start:start + window_size]

    curr_chunk, next_chunk = window(0), window(1)
    yield curr_chunk, next_chunk

    for i in range(num_chunks - 2):
        # Detect drift between curr_chunk and next_chunk
        res = data_drift_test_selection(4, ref_data=curr_chunk, cur_data=next_chunk)
        report_dict = res.as_dict()
        drift_detected = (
            report_dict['tests'][0]['parameters']['features'][target_col]['data_drift'] == 'Detected'
        )
        if drift_detected:
            # Distribution moved: compare upcoming windows against the newer one.
            curr_chunk = next_chunk
        # i + 2 <= num_chunks - 1 always holds inside this loop.
        next_chunk = window(i + 2)
        yield curr_chunk, next_chunk

In [None]:
#| hide
# Load the electricity-load time series that the demo cells below operate on.
data = pd.read_csv(filepath_or_buffer='time_series_data_el.csv')

In [None]:
#| export
def run_data_drift_detection(data=None, csv_path='time_series_data_el.csv'):
    """
    Runs the data drift detection on the dataset and prints every window pair.

    Parameters:
        data (pd.DataFrame, optional): Dataset to scan. When omitted, it is read
            from ``csv_path``. The exported module then does not depend on a
            notebook-only global ``data`` defined in a hidden, non-exported cell.
        csv_path (str): CSV file read when ``data`` is not given.
    """
    if data is None:
        data = pd.read_csv(csv_path)

    # detect_drift yields (current_chunk, next_chunk) tuples.
    for curr_chunk, next_chunk in detect_drift(data):
        # Add assertions to check for expected results
        assert isinstance(curr_chunk, pd.DataFrame), "Current chunk should be a DataFrame"
        assert isinstance(next_chunk, pd.DataFrame), "Next chunk should be a DataFrame"
        print("Current Chunk:\n", curr_chunk)
        print("Next Chunk:\n", next_chunk)
        print("------------------")

In [None]:
#| export
def main():
    """
    Entry point for the data drift demo.

    Simply calls ``run_data_drift_detection()`` with no arguments.
    """
    run_data_drift_detection()


if __name__ == "__main__":
    # A notebook kernel also runs as "__main__", so executing this cell runs the demo.
    main()

<generator object detect_drift>
Using mean number of entries per 24-hour window as window size: 88
Current Chunk:
                   Time  Electricity_load
0  2011-01-01 00:15:00              5.98
1  2011-01-01 00:30:00              8.45
2  2011-01-01 00:45:00              9.93
3  2011-01-01 01:00:00              6.92
4  2011-01-01 01:15:00              5.90
..                 ...               ...
83 2011-01-01 21:00:00             22.13
84 2011-01-01 21:15:00             23.69
85 2011-01-01 21:30:00             23.77
86 2011-01-01 21:45:00             24.64
87 2011-01-01 22:00:00             18.34

[88 rows x 2 columns]
Next Chunk:
                    Time  Electricity_load
88  2011-01-01 22:15:00             22.41
89  2011-01-01 22:30:00             25.48
90  2011-01-01 22:45:00             26.27
91  2011-01-01 23:00:00             28.29
92  2011-01-01 23:15:00             31.66
..                  ...               ...
171 2011-01-02 19:00:00             29.46
172 2011-01-02 19:15:

In [None]:
#| hide
import nbdev

# Write this notebook's `#| export` cells out to the library module.
nbdev.nbdev_export()