In [1]:
# ========================================================
# 03 - Data Cleaning
# Automatic Data Cleaning & Analysis Agent
# ========================================================

import pandas as pd
import numpy as np


In [2]:
DEFAULT_CLEANING_CONFIG = {
    "missing_threshold_drop_column": 0.6,   # si > 60% NaN -> drop colonne
    "impute_numeric": "median",            # "mean" ou "median"
    "impute_categorical": "mode",          # "mode" ou "constant"
    "impute_categorical_constant": "Unknown",
    "remove_duplicates": True,
    "outlier_method": "iqr",               # pour l'instant: "iqr" seulement
    "outlier_iqr_multiplier": 1.5,
    "remove_outliers": False               # False = seulement les signaler
}


In [3]:
def merge_config(user_config: dict | None, default: dict = DEFAULT_CLEANING_CONFIG) -> dict:
    """Merge user provided config with default config."""
    if user_config is None:
        return default.copy()
    merged = default.copy()
    merged.update(user_config)
    return merged


In [4]:
def drop_high_missing_columns(df: pd.DataFrame, threshold: float):
    """
    Drop columns with fraction of missing values > threshold.
    Returns: df_reduced, dropped_columns (list)
    """
    missing_fraction = df.isna().mean()
    cols_to_drop = missing_fraction[missing_fraction > threshold].index.tolist()
    df_reduced = df.drop(columns=cols_to_drop) if cols_to_drop else df.copy()
    return df_reduced, cols_to_drop


In [5]:
def impute_missing_values(df: pd.DataFrame, config: dict):
    """
    Impute missing values according to config.
    Returns: df_imputed, imputation_report (dict)
    """
    df_imputed = df.copy()
    imputation_report = {}

    numeric_cols = df_imputed.select_dtypes(include=np.number).columns.tolist()
    categorical_cols = df_imputed.select_dtypes(exclude=np.number).columns.tolist()

    # Numeric imputation
    for col in numeric_cols:
        n_missing = df_imputed[col].isna().sum()
        if n_missing == 0:
            continue

        strategy = config["impute_numeric"]
        if strategy == "median":
            value = df_imputed[col].median()
        elif strategy == "mean":
            value = df_imputed[col].mean()
        else:
            raise ValueError(f"Unknown numeric imputation strategy: {strategy}")

        df_imputed[col].fillna(value, inplace=True)
        imputation_report[col] = {
            "type": "numeric",
            "strategy": strategy,
            "value_used": float(value),
            "n_imputed": int(n_missing),
        }

    # Categorical imputation
    for col in categorical_cols:
        n_missing = df_imputed[col].isna().sum()
        if n_missing == 0:
            continue

        strategy = config["impute_categorical"]
        if strategy == "mode":
            if df_imputed[col].dropna().empty:
                value = config["impute_categorical_constant"]
            else:
                value = df_imputed[col].mode(dropna=True)[0]
        elif strategy == "constant":
            value = config["impute_categorical_constant"]
        else:
            raise ValueError(f"Unknown categorical imputation strategy: {strategy}")

        df_imputed[col].fillna(value, inplace=True)
        imputation_report[col] = {
            "type": "categorical",
            "strategy": strategy,
            "value_used": str(value),
            "n_imputed": int(n_missing),
        }

    return df_imputed, imputation_report


In [6]:
def impute_missing_values(df: pd.DataFrame, config: dict):
    """
    Impute missing values according to config.
    Returns: df_imputed, imputation_report (dict)
    """
    df_imputed = df.copy()
    imputation_report = {}

    numeric_cols = df_imputed.select_dtypes(include=np.number).columns.tolist()
    categorical_cols = df_imputed.select_dtypes(exclude=np.number).columns.tolist()

    # Numeric imputation
    for col in numeric_cols:
        n_missing = df_imputed[col].isna().sum()
        if n_missing == 0:
            continue

        strategy = config["impute_numeric"]
        if strategy == "median":
            value = df_imputed[col].median()
        elif strategy == "mean":
            value = df_imputed[col].mean()
        else:
            raise ValueError(f"Unknown numeric imputation strategy: {strategy}")

        df_imputed[col].fillna(value, inplace=True)
        imputation_report[col] = {
            "type": "numeric",
            "strategy": strategy,
            "value_used": float(value),
            "n_imputed": int(n_missing),
        }

    # Categorical imputation
    for col in categorical_cols:
        n_missing = df_imputed[col].isna().sum()
        if n_missing == 0:
            continue

        strategy = config["impute_categorical"]
        if strategy == "mode":
            if df_imputed[col].dropna().empty:
                value = config["impute_categorical_constant"]
            else:
                value = df_imputed[col].mode(dropna=True)[0]
        elif strategy == "constant":
            value = config["impute_categorical_constant"]
        else:
            raise ValueError(f"Unknown categorical imputation strategy: {strategy}")

        df_imputed[col].fillna(value, inplace=True)
        imputation_report[col] = {
            "type": "categorical",
            "strategy": strategy,
            "value_used": str(value),
            "n_imputed": int(n_missing),
        }

    return df_imputed, imputation_report


In [7]:
def handle_duplicates(df: pd.DataFrame, remove: bool = True):
    """
    Detect (and optionally remove) duplicate rows.
    Returns: df_dedup, n_duplicates
    """
    duplicated_mask = df.duplicated()
    n_duplicates = int(duplicated_mask.sum())

    if remove and n_duplicates > 0:
        df_dedup = df[~duplicated_mask].copy()
    else:
        df_dedup = df.copy()

    return df_dedup, n_duplicates


In [8]:
def detect_outliers_iqr(df: pd.DataFrame, multiplier: float = 1.5):
    """
    Detect outliers per numeric column using IQR rule.
    Returns:
        outlier_indices_per_column: dict[col -> list of indices]
    """
    outlier_indices = {}
    numeric_df = df.select_dtypes(include=np.number)

    for col in numeric_df.columns:
        series = numeric_df[col].dropna()
        if series.empty:
            continue

        q1 = series.quantile(0.25)
        q3 = series.quantile(0.75)
        iqr = q3 - q1
        lower = q1 - multiplier * iqr
        upper = q3 + multiplier * iqr

        mask = (numeric_df[col] < lower) | (numeric_df[col] > upper)
        idx = numeric_df[mask].index.tolist()

        if idx:
            outlier_indices[col] = {
                "lower_bound": float(lower),
                "upper_bound": float(upper),
                "indices": idx,
                "n_outliers": len(idx),
            }

    return outlier_indices


In [9]:
def clean_dataset(df: pd.DataFrame, user_config: dict | None = None):
    """
    Full cleaning pipeline:
    - Drop columns with too many missing values
    - Impute remaining missing values
    - Remove duplicate rows
    - Detect (and optionally remove) outliers
    Returns: df_clean, cleaning_report (dict)
    """
    config = merge_config(user_config, DEFAULT_CLEANING_CONFIG)
    report = {
        "dropped_columns": [],
        "imputations": {},
        "duplicates": {
            "n_duplicates_before": 0,
            "n_removed": 0
        },
        "outliers": {
            "method": config["outlier_method"],
            "details": {},
            "n_rows_removed": 0
        },
        "shape_before": df.shape,
        "shape_after": None
    }

    # 1) Drop high-missing columns
    df_step, cols_dropped = drop_high_missing_columns(
        df,
        threshold=config["missing_threshold_drop_column"]
    )
    report["dropped_columns"] = cols_dropped

    # 2) Impute missing values
    df_step, imputation_report = impute_missing_values(df_step, config)
    report["imputations"] = imputation_report

    # 3) Handle duplicates
    n_duplicates_before = int(df_step.duplicated().sum())
    df_step, n_removed_dup = handle_duplicates(
        df_step,
        remove=config["remove_duplicates"]
    )
    report["duplicates"]["n_duplicates_before"] = n_duplicates_before
    report["duplicates"]["n_removed"] = n_removed_dup

    # 4) Detect outliers
    outlier_details = {}
    if config["outlier_method"] == "iqr":
        outlier_details = detect_outliers_iqr(
            df_step,
            multiplier=config["outlier_iqr_multiplier"]
        )
    else:
        raise ValueError(f"Unknown outlier method: {config['outlier_method']}")

    report["outliers"]["details"] = outlier_details

    # Optionally remove outliers
    if config["remove_outliers"] and outlier_details:
        # build a mask of all outlier indices
        all_indices = set()
        for col, info in outlier_details.items():
            all_indices.update(info["indices"])
        n_rows_outliers = len(all_indices)
        report["outliers"]["n_rows_removed"] = n_rows_outliers

        df_step = df_step.drop(index=list(all_indices))

    # Final shape
    report["shape_after"] = df_step.shape

    return df_step, report


In [10]:
from pprint import pprint

def display_cleaning_report(report: dict):
    print("=== Cleaning Report ===")
    print(f"Shape before: {report['shape_before']}")
    print(f"Shape after : {report['shape_after']}")
    print("\nDropped columns:", report["dropped_columns"])

    print("\nImputations:")
    if report["imputations"]:
        for col, info in report["imputations"].items():
            print(f"  - {col}: {info}")
    else:
        print("  None")

    print("\nDuplicates:")
    print(f"  Duplicates before: {report['duplicates']['n_duplicates_before']}")
    print(f"  Duplicates removed: {report['duplicates']['n_removed']}")

    print("\nOutliers:")
    print(f"  Method: {report['outliers']['method']}")
    print(f"  Rows removed (if configured): {report['outliers']['n_rows_removed']}")
    print("  Details per column:")
    if report["outliers"]["details"]:
        for col, info in report["outliers"]["details"].items():
            print(f"    - {col}: {info['n_outliers']} outliers")
    else:
        print("    None")


In [11]:
# Test cleaning on messy dataset
path_messy = "../data/test_messy.csv"
df_messy = pd.read_csv(path_messy)

df_messy_clean, messy_report = clean_dataset(df_messy)

display_cleaning_report(messy_report)
df_messy_clean.head()


The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_imputed[col].fillna(value, inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values always behaves as a copy.

For example, when doing 'df[col].method(value, inplace=True)', try using 'df.method({col: value}, inplace=True)' or df[col] = df[col].method(value) instead, to perform the operation inplace on the original object.


  df_imputed[col].fillna(value, inplace=True)
The behavior will change in pandas 3.0. This inplace method will never work because the intermediate object on which we are setting values alway

=== Cleaning Report ===
Shape before: (20, 4)
Shape after : (20, 4)

Dropped columns: []

Imputations:
  - age: {'type': 'numeric', 'strategy': 'median', 'value_used': 34.0, 'n_imputed': 5}
  - income: {'type': 'numeric', 'strategy': 'median', 'value_used': 4050.0, 'n_imputed': 6}
  - city: {'type': 'categorical', 'strategy': 'mode', 'value_used': 'Paris', 'n_imputed': 6}

Duplicates:
  Duplicates before: 0
  Duplicates removed: 0

Outliers:
  Method: iqr
  Rows removed (if configured): 0
  Details per column:
    - age: 1 outliers
    - income: 2 outliers


Unnamed: 0,id,age,income,city
0,1,25.0,3000.0,Paris
1,2,26.0,3200.0,Lyon
2,3,27.0,4050.0,Paris
3,4,34.0,4000.0,Paris
4,5,29.0,3500.0,Marseille


In [12]:
path_outliers = "../data/test_outliers_duplicates.csv"
df_outliers = pd.read_csv(path_outliers)

# Exemple: ici on d√©cide de supprimer les outliers
config_remove_outliers = {
    "remove_outliers": True
}

df_outliers_clean, outliers_report = clean_dataset(df_outliers, config_remove_outliers)

display_cleaning_report(outliers_report)
df_outliers_clean.head()


=== Cleaning Report ===
Shape before: (22, 3)
Shape after : (19, 3)

Dropped columns: []

Imputations:
  None

Duplicates:
  Duplicates before: 0
  Duplicates removed: 0

Outliers:
  Method: iqr
  Rows removed (if configured): 3
  Details per column:
    - temperature: 3 outliers
    - humidity: 2 outliers


Unnamed: 0,id,temperature,humidity
0,1,21.779629,49.387901
1,2,23.639454,52.927804
2,3,23.618778,42.930079
3,4,21.434908,64.801538
4,5,23.133212,62.89197
