<a href="https://colab.research.google.com/github/AnshSharma16/ML-Engineer-Roadmap/blob/main/data_drift.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd

# Sample sales data
data = {
    "date": [
        "2024-01-01",
        "2024-01-01",
        "2024-01-02",
        "2024-01-03",
        "2024-01-03"
    ],
    "product": ["A", "B", "A", "C", "A"],
    "quantity": [2, 1, 1, 3, 2],
    "price": [100, 200, 100, 150, 100]
}

# Create DataFrame
df = pd.DataFrame(data)

# Save to CSV
df.to_csv("sales.csv", index=False)

print("sales.csv file created successfully")
print(df)

sales.csv file created successfully
         date product  quantity  price
0  2024-01-01       A         2    100
1  2024-01-01       B         1    200
2  2024-01-02       A         1    100
3  2024-01-03       C         3    150
4  2024-01-03       A         2    100


In [2]:
df = pd.DataFrame(data)
df["date"] = pd.to_datetime(df["date"])
df

Unnamed: 0,date,product,quantity,price
0,2024-01-01,A,2,100
1,2024-01-01,B,1,200
2,2024-01-02,A,1,100
3,2024-01-03,C,3,150
4,2024-01-03,A,2,100


In [3]:
!pip install pandera

Collecting pandera
  Downloading pandera-0.29.0-py3-none-any.whl.metadata (10 kB)
Collecting typing_inspect>=0.6.0 (from pandera)
  Downloading typing_inspect-0.9.0-py3-none-any.whl.metadata (1.5 kB)
Collecting mypy-extensions>=0.3.0 (from typing_inspect>=0.6.0->pandera)
  Downloading mypy_extensions-1.1.0-py3-none-any.whl.metadata (1.1 kB)
Downloading pandera-0.29.0-py3-none-any.whl (295 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m295.9/295.9 kB[0m [31m3.1 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading typing_inspect-0.9.0-py3-none-any.whl (8.8 kB)
Downloading mypy_extensions-1.1.0-py3-none-any.whl (5.0 kB)
Installing collected packages: mypy-extensions, typing_inspect, pandera
Successfully installed mypy-extensions-1.1.0 pandera-0.29.0 typing_inspect-0.9.0


In [4]:
import numpy as np
import pandera as pa
from pandera import Column, Check
from sklearn.linear_model import LinearRegression
import joblib

schema = pa.DataFrameSchema({
    "date": Column(
        pa.DateTime,
        nullable=False,
        description="Transaction date"
    ),
    "product": Column(
        str,
        nullable=False,
        description="Product ID"
    ),
    "quantity": Column(
        int,
        Check.ge(1),
        description="Quantity must be >= 1"
    ),
    "price": Column(
        int,
        Check.ge(0),
        description="Price must be non-negative"
    )
})

top-level pandera module will be **removed in a future version of pandera**.
If you're using pandera to validate pandas objects, we highly recommend updating
your import:

```
# old import
import pandera as pa

# new import
import pandera.pandas as pa
```

If you're using pandera to validate objects from other compatible libraries
like pyspark or polars, see the supported libraries section of the documentation
for more information on how to import pandera:

https://pandera.readthedocs.io/en/stable/supported_libraries.html


```
```



In [5]:
try:
    schema.validate(df)
    print("✅ Schema validation passed")
except pa.errors.SchemaError as e:
    print("❌ Schema validation failed")
    print(e)


✅ Schema validation passed


In [6]:
def detect_drift(
    baseline_df: pd.DataFrame,
    new_df: pd.DataFrame,
    column: str,
    mean_threshold: float = 0.2,
    std_threshold: float = 0.2
):
    baseline_mean = baseline_df[column].mean()
    baseline_std = baseline_df[column].std()

    new_mean = new_df[column].mean()
    new_std = new_df[column].std()

    mean_shift = abs(new_mean - baseline_mean) / baseline_mean
    std_shift = abs(new_std - baseline_std) / baseline_std

    drift_detected = (
        mean_shift > mean_threshold or
        std_shift > std_threshold
    )

    return {
        "column": column,
        "baseline_mean": baseline_mean,
        "new_mean": new_mean,
        "mean_shift": round(mean_shift, 3),
        "baseline_std": baseline_std,
        "new_std": new_std,
        "std_shift": round(std_shift, 3),
        "drift_detected": drift_detected
    }


In [7]:
# Baseline = first 2 days
baseline_df = df[df["date"] <= "2024-01-02"]

# New data = later day (simulating production)
new_df = df[df["date"] > "2024-01-02"]

baseline_df



Unnamed: 0,date,product,quantity,price
0,2024-01-01,A,2,100
1,2024-01-01,B,1,200
2,2024-01-02,A,1,100


In [8]:
new_df

Unnamed: 0,date,product,quantity,price
3,2024-01-03,C,3,150
4,2024-01-03,A,2,100


In [9]:
drift_report = detect_drift(
    baseline_df=baseline_df,
    new_df=new_df,
    column="price",
    mean_threshold=0.2,
    std_threshold=0.2
)

drift_report


{'column': 'price',
 'baseline_mean': np.float64(133.33333333333334),
 'new_mean': np.float64(125.0),
 'mean_shift': np.float64(0.063),
 'baseline_std': 57.735026918962575,
 'new_std': 35.35533905932738,
 'std_shift': 0.388,
 'drift_detected': True}

In [10]:
# Simulate price inflation (drift)
new_df_drifted = new_df.copy()
new_df_drifted["price"] = new_df_drifted["price"] * 2

detect_drift(
    baseline_df=baseline_df,
    new_df=new_df_drifted,
    column="price"
)


{'column': 'price',
 'baseline_mean': np.float64(133.33333333333334),
 'new_mean': np.float64(250.0),
 'mean_shift': np.float64(0.875),
 'baseline_std': 57.735026918962575,
 'new_std': 70.71067811865476,
 'std_shift': 0.225,
 'drift_detected': np.True_}