In [1]:
#
# ! Select the evidently_no_docker virtual env before running this notebook.
# Just in case, the dependencies can be installed with:
#   conda install numpy pandas evidently -c conda-forge -y
#

# In demo mode, a biased (drifted) production dataset is generated
k_DEMO_MODE = True

# In debug mode, the number of samples used to generate the report is meant to be limited to speed up the process
# NOTE(review): k_DEBUG_MODE is not read anywhere in the cells reviewed here - confirm it is still needed
k_DEBUG_MODE = True


# ! RuntimeWarning messages are silenced for the whole notebook
import warnings
warnings.filterwarnings("ignore", category=RuntimeWarning)



In [2]:
# -----------------------------------------------------------------------------
# prelude


import random
import numpy as np
import pandas as pd
from pathlib import Path
from datetime import datetime
pd.options.display.max_columns = None
# pd.reset_option('display.max_columns')

# import evidently
# print(evidently.__version__)

from evidently.report import Report
from evidently.test_suite import TestSuite
from evidently.metric_preset import DataDriftPreset
from evidently.test_preset import DataStabilityTestPreset
from evidently.pipeline.column_mapping import ColumnMapping




# -----------------------------------------------------------------------------
# Total number of rows drawn from the dataset (half for the reference sample, half for the production sample)
k_Number_of_samples     = 512  # ! Must be an even number >= 60 (enforced by the assert in generate_datasets)

# k_Current_dir   = Path.cwd()
# print(k_Current_dir)
# Directory (relative to the notebook) holding the input CSV and the generated samples
k_AssetsDir             = Path("../../04_data")
k_Local_Fraud_Test_csv  = "fraud_test.csv"
# Remote copy of the dataset; only referenced by a commented-out alternative load
k_URL_Fraud_Test_csv    = "https://lead-program-assets.s3.eu-west-3.amazonaws.com/M05-Projects/fraudTest.csv"

# File names (inside k_AssetsDir) of the generated samples
k_Reference_Dataset_Dataset     = "reference_sample.csv"
k_Production_Dataset_Dataset    = 'production_sample.csv'
k_Production_Drifted_Dataset    = 'production_drifted_sample.csv'




In [3]:
# -----------------------------------------------------------------------------
# Load the full fraud dataset from the local assets directory.
# Alternative (AWS S3 bucket):
# df = pd.read_csv(k_URL_Fraud_Test_csv)
df = pd.read_csv(k_AssetsDir / k_Local_Fraud_Test_csv)

# Name the first column 'id', then put every column name in lower case
df = df.rename(columns={df.columns[0]: 'id'}).rename(columns=str.lower)

df.head(3)

Unnamed: 0,id,trans_date_trans_time,cc_num,merchant,category,amt,first,last,gender,street,city,state,zip,lat,long,city_pop,job,dob,trans_num,unix_time,merch_lat,merch_long,is_fraud
0,0,2020-06-21 12:14:25,2291163933867244,fraud_Kirlin and Sons,personal_care,2.86,Jeff,Elliott,M,351 Darlene Green,Columbia,SC,29209,33.9659,-80.9355,333497,Mechanical engineer,1968-03-19,2da90c7d74bd46a0caf3777415b3ebd3,1371816865,33.986391,-81.200714,0
1,1,2020-06-21 12:14:33,3573030041201292,fraud_Sporer-Keebler,personal_care,29.84,Joanne,Williams,F,3638 Marsh Union,Altonah,UT,84002,40.3207,-110.436,302,"Sales professional, IT",1990-01-17,324cc204407e99f51b0d6ca0055005e7,1371816873,39.450498,-109.960431,0
2,2,2020-06-21 12:14:53,3598215285024754,"fraud_Swaniawski, Nitzsche and Welch",health_fitness,41.28,Ashley,Lopez,F,9333 Valentine Point,Bellmore,NY,11710,40.6729,-73.5365,34496,"Librarian, public",1970-10-21,c81755dbbbea9d5c77f094348a7579be,1371816893,40.49581,-74.196111,0


In [4]:
def generate_datasets(seed=None) -> None:
    """
    Draw two disjoint random samples from the fraud dataset and save them as CSV files.

    The file `k_AssetsDir/k_Local_Fraud_Test_csv` is read and `k_Number_of_samples`
    distinct rows are drawn at random. The first half is written to
    `k_Reference_Dataset_Dataset` and the second half to
    `k_Production_Dataset_Dataset`, both inside `k_AssetsDir` and without the
    pandas index.

    Nothing is written (a message is printed instead) when the input file is
    missing or when it holds fewer than `k_Number_of_samples` rows.

    Args:
        seed: Optional seed making the draw reproducible. When None (the
            default) the module-level `random` generator is used, exactly as
            before this parameter existed.

    Raises:
        AssertionError: if `k_Number_of_samples` is odd or smaller than 60.
    """
    assert k_Number_of_samples%2==0 and k_Number_of_samples>=60

    fraud_test_file = Path(k_AssetsDir)/k_Local_Fraud_Test_csv

    # Check if the input file exists
    if not fraud_test_file.exists():
        print(f"File not found: {fraud_test_file}")
        return

    # Load the dataset
    fraud_df = pd.read_csv(fraud_test_file)

    # Determine the number of rows
    num_rows: int = len(fraud_df)
    print(f"Number of rows in the dataset: {num_rows}")

    # Stop if there are not enough rows to fill both samples
    if num_rows < k_Number_of_samples:
        print("Not enough rows in the dataset. Exiting.")
        return

    # Draw all the row positions in a single call: random.sample returns distinct
    # values in selection order, so each half is itself a valid random sample and
    # the two halves cannot overlap. This avoids building a set of every row index.
    rng = random if seed is None else random.Random(seed)
    half_size = k_Number_of_samples // 2
    sampled_indices = rng.sample(range(num_rows), k_Number_of_samples)
    reference_df = fraud_df.iloc[sampled_indices[:half_size]]
    production_df = fraud_df.iloc[sampled_indices[half_size:]]

    # Save the samples to their respective files
    reference_sample_file = Path(k_AssetsDir) / k_Reference_Dataset_Dataset
    production_sample_file = Path(k_AssetsDir) / k_Production_Dataset_Dataset
    reference_df.to_csv(reference_sample_file, index=False)
    production_df.to_csv(production_sample_file, index=False)

    print(f"Saved reference sample to: {reference_sample_file}")
    print(f"Saved production sample to: {production_sample_file}")

generate_datasets()


Number of rows in the dataset: 555719
Saved reference sample to: ..\..\04_data\reference_sample.csv
Saved production sample to: ..\..\04_data\production_sample.csv


In [5]:
def _load_sample(csv_name):
    """
    Load one sample CSV from `k_AssetsDir` and convert its columns.

    The first column is renamed to 'id', `trans_date_trans_time` and `dob` are
    parsed as datetimes and `is_fraud` is cast to bool.

    Args:
        csv_name: File name of the sample, relative to `k_AssetsDir`.

    Returns:
        The converted pandas DataFrame.
    """
    sample_df = pd.read_csv(Path(k_AssetsDir) / csv_name)
    sample_df = sample_df.rename(columns={sample_df.columns[0]: 'id'})
    sample_df["trans_date_trans_time"] = pd.to_datetime(sample_df["trans_date_trans_time"], format='%Y-%m-%d %H:%M:%S')
    sample_df["dob"] = pd.to_datetime(sample_df["dob"], format='%Y-%m-%d')
    sample_df["is_fraud"] = sample_df["is_fraud"].astype(bool)
    return sample_df


# Both samples go through exactly the same loading and conversion steps
reference_df = _load_sample(k_Reference_Dataset_Dataset)
production_df = _load_sample(k_Production_Dataset_Dataset)


# print(reference_df.dtypes)
# display(reference_df.head(5))
# Display the first 5 rows of the filtered DataFrame
# Just as a sanity check
# filtered_df = df[df['is_fraud'] == 1] 
# filtered_df.head(5)

In [6]:
# reference_df.describe(include='all')


In [7]:
# print("Summary statistics for 'amt' in reference data:")
# print(reference_df['amt'].describe())
# print("\nSummary statistics for 'amt' in production drifted data:")
# print(production_df['amt'].describe())

In [8]:
# Drift injected into the production sample in demo mode:
# - Amounts (amt): about 80 % of the transactions get their amount doubled.
# - Merchant coordinates (merch_lat, merch_long): left untouched (the shift is disabled, see below).
# - Categories (category): about 30 % of the transactions get a category redrawn from a biased distribution.
def bias_production_dataset(production_df, seed=None):
    """
    Return a drifted copy of `production_df` and save it as a CSV file.

    Two drifts are applied to the copy:
      * `amt` is multiplied by 1.0 (probability 0.2) or 2.0 (probability 0.8);
      * `category` is replaced, for each row with probability 0.3, by a value
        drawn from a biased distribution in which the first category returned
        by `unique()` has weight 0.5 and the others share the remaining 0.5.

    The result is written to `k_AssetsDir/k_Production_Drifted_Dataset`
    (without the pandas index). The input DataFrame is not modified.

    Args:
        production_df: DataFrame holding at least the `amt` and `category` columns.
        seed: Optional seed making the drift reproducible. When None (the
            default) the global `np.random` state is used, exactly as before
            this parameter existed.

    Returns:
        The drifted DataFrame.
    """
    # Same API (choice, rand) whether the global state or a seeded generator is used
    rng = np.random if seed is None else np.random.RandomState(seed)
    n_rows = len(production_df)

    # Work on a copy so the caller's data is left intact
    production_drifted_df = production_df.copy()

    # Drift 1: amounts.
    # Each row gets a factor of 1.0 (20 % of the rows) or 2.0 (80 % of the rows),
    # so most amounts are doubled, which should be detected as drift.
    # (An earlier, milder setting used p=[0.7, 0.3].)
    amt_factors = rng.choice([1.0, 2.0], size=n_rows, p=[0.2, 0.8])
    production_drifted_df['amt'] = production_drifted_df['amt'] * amt_factors

    # Drift 2: merchant latitudes and longitudes ('merch_lat', 'merch_long')
    # ! DISABLED: useless because the transactions are spread over the whole country
    # production_drifted_df['merch_lat'] += np.random.choice([0, 0.1], size=len(production_df), p=[0.6, 0.4])
    # production_drifted_df['merch_long'] += np.random.choice([0, -0.1], size=len(production_df), p=[0.6, 0.4])

    # Drift 3: distribution of the categories ('category').
    # With fewer than two distinct categories there is nothing to bias; the
    # weights below would also divide by zero, so the step is skipped.
    categories = production_drifted_df['category'].unique()
    n_categories = len(categories)
    if n_categories > 1:
        # The first category is favoured with weight 0.5; the others share the rest equally
        other_weight = 0.5 / (n_categories - 1)
        category_weights = [0.5] + [other_weight] * (n_categories - 1)
        biased_categories = rng.choice(categories, size=n_rows, p=category_weights)

        # About 30 % of the rows take the biased category, the others keep their current one
        production_drifted_df['category'] = np.where(
            rng.rand(n_rows) < 0.3,
            biased_categories,
            production_drifted_df['category']
        )

    # Save the drifted dataset for the tests
    production_drifted_df.to_csv(Path(k_AssetsDir)/k_Production_Drifted_Dataset, index=False)

    return production_drifted_df

# NOTE: production_df is overwritten, so re-running this cell applies the bias a second time
if(k_DEMO_MODE):
    production_df = bias_production_dataset(production_df)


In [9]:
# print("Summary statistics for 'amt' in reference data:")
# print(reference_df['amt'].describe())
# print("\nSummary statistics for 'amt' in production drifted data:")
# print(production_drifted_df['amt'].describe())

In [10]:
# Keep only the numeric columns of both DataFrames: according to the author the
# model only uses numeric data, and in demo mode the `amt` bias is applied to a numeric column.
# NOTE(review): the `category` bias applied in demo mode is presumably dropped by
# this filter (string column) - confirm this is intended.
reference_numeric_df  = reference_df.select_dtypes(include="number")
production_numeric_df = production_df.select_dtypes(include="number")
# production_drifted_numeric_df = production_drifted_df.select_dtypes(include="number")

numeric_columns = reference_numeric_df.columns
object_columns = reference_df.select_dtypes(include="object").columns

# NOTE(review): in the cells reviewed here, column_mapping and object_columns are
# not used afterwards (the report is run without a mapping or with column_mapping=None)
column_mapping = ColumnMapping(
    id="id",
    target="is_fraud",
    datetime_features=["trans_date_trans_time", "dob"],
    # categorical_features=["gender", "category"],
    numerical_features=numeric_columns,
)

# Report used by the following cells to run and analyze the data drift
data_drift_report = Report(metrics=[DataDriftPreset()])








In [11]:
# Compute the drift metrics between the numeric reference and production data
data_drift_report.run(reference_data=reference_numeric_df, current_data=production_numeric_df)

# The per-column drift details are read from the second metric of the report dictionary
results = data_drift_report.as_dict()
drift_by_columns = results['metrics'][1]['result']['drift_by_columns']

# Keep the columns flagged as drifted; a missing 'drift_detected' key counts as "no drift"
drifted_features = [
    column_name
    for column_name in drift_by_columns
    if drift_by_columns[column_name].get('drift_detected', False)
]
print("Features with drift:", drifted_features)



Features with drift: ['amt']


In [12]:
# Disabled cell: the body never runs because of the `if False:` guard.
# NOTE(review): it reads production_drifted_numeric_df, whose only assignment in the
# cells reviewed here is commented out - define it before enabling this block.
if False:
    # Run the report and analyze drift
    data_drift_report.run(reference_data=reference_numeric_df, current_data=production_drifted_numeric_df)

    # Extract and display drifted features
    results = data_drift_report.as_dict()
    drift_by_columns = results['metrics'][1]['result']['drift_by_columns']

    # Display features with drift
    drifted_features = [
        col for col, details in drift_by_columns.items()
        # If the 'drift_detected' key exists in the dictionary its value is used, otherwise False
        if details.get('drift_detected', False)  
    ]
    print("Features with drift:", drifted_features)



In [14]:
# Run the drift report again on the numeric data and display it inline.
# column_mapping=None => automatic inference
data_drift_report.run(reference_data=reference_numeric_df, current_data=production_numeric_df, column_mapping=None)
data_drift_report.show("inline")

# Save the report as an HTML file whose name carries the current date and time,
# so that successive runs do not overwrite each other
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
output_file = f"data_drift_report_{timestamp}.html"
data_drift_report.save_html(output_file)



In [None]:
# Disabled cell: the body never runs because of the `if False:` guard.
if False:
    data_stability= TestSuite(tests=[
        DataStabilityTestPreset(), # a single stability test preset is run here
    ])

    # column_mapping=None => automatic inference
    data_stability.run(reference_data=reference_numeric_df, current_data=production_numeric_df, column_mapping=None)
    data_stability.show("inline")

    # Save the stability test suite itself: the original code saved data_drift_report
    # here, which wrote the drift report into the stability report file
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    output_file = f"data_stability_report_{timestamp}.html"
    data_stability.save_html(output_file)

