## 🧪 Analyst Toolkit Tutorial: Full Data Pipeline

This interactive notebook demonstrates the complete analyst pipeline using a synthetic **Palmer Penguins** dataset.

Each step in the pipeline is modular, YAML-configurable, and produces exports, plots, and certification-ready reports.

### 🧰 Toolkit Architecture: 3-Way Modular Design

This pipeline is built around a flexible ETL framework with three usage modes:

- 📓 **Notebook Mode**: Run individual modules or the full pipeline interactively. Ideal for exploration and QA.
- 🧵 **CLI Mode**: Execute the full pipeline using `run_toolkit_pipeline.py`, controlled via a master YAML config.
- 🧪 **Hybrid Mode**: Develop in notebooks, deploy via scripts, reusing the same configs.
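
All three modes are driven by the same master YAML. As a rough sketch, its shape can be pictured as the Python dict below, inferred only from the keys this notebook reads (`run_id`, `notebook`, `pipeline_entry_path`, and `modules.<name>.run` / `config_path`). The values, the data path, and the `module_is_enabled` helper are illustrative assumptions, not part of the toolkit.

```python
# Hypothetical master config mirroring the keys this notebook reads.
# The values are placeholders, not the toolkit's shipped defaults.
master_config = {
    "run_id": "demo_run",
    "notebook": True,
    "pipeline_entry_path": "data/raw/penguins.csv",  # assumed path
    "modules": {
        "diagnostics": {"run": True, "config_path": "config/diag_config_template.yaml"},
        "validation": {"run": False},
    },
}


def module_is_enabled(config: dict, name: str) -> bool:
    """Return True only if modules.<name>.run is explicitly truthy."""
    return bool(config.get("modules", {}).get(name, {}).get("run", False))


# A module that is missing from the config is treated as disabled.
enabled = [
    name
    for name in ("diagnostics", "validation", "imputation")
    if module_is_enabled(master_config, name)
]
print(enabled)
```

Defaulting `run` to `False` means a module only executes when the config opts in, which matches the skip checks used in the cells of this notebook.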

In [None]:
# 📁 1. Load Configuration and Set Execution Context

import logging
import os
from pathlib import Path
from analyst_toolkit.m00_utils.config_loader import load_config
from analyst_toolkit.m00_utils.load_data import load_csv
from analyst_toolkit.m01_diagnostics.run_diag_pipeline import run_diag_pipeline
from analyst_toolkit.m02_validation.run_validation_pipeline import run_validation_pipeline
from analyst_toolkit.m03_normalization.run_normalization_pipeline import run_normalization_pipeline
from analyst_toolkit.m04_duplicates.run_dupes_pipeline import run_duplicates_pipeline
from analyst_toolkit.m05_detect_outliers.run_detection_pipeline import run_outlier_detection_pipeline
from analyst_toolkit.m06_outlier_handling.run_handling_pipeline import run_outlier_handling_pipeline
from analyst_toolkit.m07_imputation.run_imputation_pipeline import run_imputation_pipeline
from analyst_toolkit.m10_final_audit.final_audit_pipeline import run_final_audit_pipeline

# --- Find Project Root ---
# This helper function makes the notebook runnable from any subdirectory
# by locating the project root based on a set of marker directories.
def find_project_root(markers=("config", "notebooks", "data")):
    """Searches upward from the current directory for marker directories to find the project root."""
    current_path = Path.cwd().resolve()
    for parent in [current_path, *current_path.parents]:
        if all((parent / marker).is_dir() for marker in markers):
            return parent
    # Fallback to current working directory if no marker is found
    print(f"⚠️ Could not find project root with markers {markers}. Using current directory.")
    return Path.cwd()

PROJECT_ROOT = find_project_root()
print(f"📂 Project Root detected: {PROJECT_ROOT}")

# --- Load Master Config ---
# Path to master config, resolved from the project root for robustness
master_config_path = PROJECT_ROOT / "config" / "run_toolkit_config.yaml"

# Load master configuration dictionary
master_config = load_config(master_config_path)

# Extract run-level settings for use in all modules
run_id = master_config.get("run_id", "default_run")
notebook_mode = master_config.get("notebook", True)

print(f"🔧 Config loaded | Run ID: {run_id} | Notebook Mode: {notebook_mode}")



In [None]:
# 📥 2. Load Raw Data

# Load input path from the master config
input_path = master_config.get("pipeline_entry_path")
if not input_path:
    raise ValueError("❌ 'pipeline_entry_path' not found in master config.")

print(f"📂 Loading data from: {input_path}")
df_raw = load_csv(input_path)

## 📊 M01 — Diagnostics

This module generates a profile of the raw data: shape, types, nulls, skewness, and sample rows. It's the first step in understanding your dataset's structure and quality.
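
To make the profile concrete, here is a minimal, dependency-free sketch of the kind of summary described above (shape, types, nulls, skewness). It is not the toolkit's implementation: `profile`, `skewness`, and the sample rows are hypothetical, and the real module works on a DataFrame rather than a list of dicts.

```python
from statistics import mean, pstdev

# Made-up sample rows; None marks a missing value.
rows = [
    {"species": "Adelie", "body_mass_g": 3700.0},
    {"species": "Gentoo", "body_mass_g": 5000.0},
    {"species": None, "body_mass_g": 3900.0},
    {"species": "Adelie", "body_mass_g": None},
]


def profile(rows):
    """Summarize shape, observed value types, and null counts per column."""
    columns = list(rows[0].keys())
    summary = {"shape": (len(rows), len(columns)), "columns": {}}
    for col in columns:
        values = [row[col] for row in rows]
        present = [v for v in values if v is not None]
        summary["columns"][col] = {
            "types": sorted({type(v).__name__ for v in present}),
            "nulls": len(values) - len(present),
        }
    return summary


def skewness(values):
    """Population skewness: the mean of cubed z-scores (0.0 if no spread)."""
    mu, sigma = mean(values), pstdev(values)
    if sigma == 0:
        return 0.0
    return mean(((v - mu) / sigma) ** 3 for v in values)


report = profile(rows)
masses = [row["body_mass_g"] for row in rows if row["body_mass_g"] is not None]
print(report["shape"])
print(report["columns"])
print(round(skewness(masses), 3))
```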

In [None]:
# --- Run Diagnostics Module ---

# Check if module is enabled in the master config
module_settings = master_config.get("modules", {}).get("diagnostics", {})
if not module_settings.get("run", False):
    print("⏩ Skipping Diagnostics module as per master config.")
else:
    # Load module-specific config
    diag_config_path = module_settings.get("config_path", "config/diag_config_template.yaml")
    diag_config = load_config(diag_config_path)
    
    print(f"🚀 Running Diagnostics from '{diag_config_path}'...")
    df_profiled = run_diag_pipeline(
        config=diag_config,
        df=df_raw,
        notebook=notebook_mode,
        run_id=run_id
    )

## 🛡️ M02 — Validation (Audit Mode)

This module audits the dataset against a defined schema to catch issues early and guide cleaning steps:

- **Expected Columns & Dtypes**
- **Allowed Categorical Values**
- **Numeric Range Checks**

In this first pass, `fail_on_error` is `false`, so it reports all issues without halting the pipeline.
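
The difference between audit mode and a hard stop can be sketched in a few lines. This is an illustration under assumptions, not the module's code: the `validate` function and the schema layout (`dtypes`, `allowed_values`, `ranges`) are hypothetical, and only the `fail_on_error` flag name comes from the toolkit config.

```python
def validate(rows, schema, fail_on_error=False):
    """Collect schema violations; raise instead when fail_on_error is True."""
    issues = []
    for i, row in enumerate(rows):
        # Expected columns and dtypes
        for col, expected_type in schema["dtypes"].items():
            if col not in row:
                issues.append(f"row {i}: missing column {col!r}")
            elif row[col] is not None and not isinstance(row[col], expected_type):
                issues.append(f"row {i}: {col!r} is not {expected_type.__name__}")
        # Allowed categorical values
        for col, allowed in schema.get("allowed_values", {}).items():
            if row.get(col) is not None and row[col] not in allowed:
                issues.append(f"row {i}: {col}={row[col]!r} not in allowed set")
        # Numeric range checks
        for col, (low, high) in schema.get("ranges", {}).items():
            value = row.get(col)
            if isinstance(value, (int, float)) and not low <= value <= high:
                issues.append(f"row {i}: {col}={value} outside [{low}, {high}]")
    if issues and fail_on_error:
        raise ValueError(f"{len(issues)} validation issue(s) found: {issues}")
    return issues


schema = {
    "dtypes": {"species": str, "body_mass_g": float},
    "allowed_values": {"species": {"Adelie", "Chinstrap", "Gentoo"}},
    "ranges": {"body_mass_g": (2000.0, 7000.0)},
}
rows = [
    {"species": "Adelie", "body_mass_g": 3700.0},
    {"species": "adelie", "body_mass_g": 37000.0},  # bad category, bad range
]

# Audit mode: every issue is reported and execution continues.
issues = validate(rows, schema, fail_on_error=False)
for issue in issues:
    print(issue)
```

Calling the same function with `fail_on_error=True` on these rows raises a `ValueError`, which is the behavior a strict quality gate relies on.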

In [None]:
# --- Run Validation Module ---

# Check if module is enabled in the master config
module_settings = master_config.get("modules", {}).get("validation", {})
if not module_settings.get("run", False):
    print("⏩ Skipping Validation module as per master config.")
else:
    # Load module-specific config
    config_path = module_settings.get("config_path", "config/validation_config_template.yaml")
    module_config = load_config(config_path)
    
    print(f"🚀 Running Validation from '{config_path}'...")
    # The result is bound to a single name here; if run_validation_pipeline returns (df, results_dict) in your install, unpack both values instead.
    df_valid = run_validation_pipeline(
        config=module_config,
        df=df_profiled,
        notebook=notebook_mode,
        run_id=run_id
    )

## 🧹 M03 — Normalization

This module performs rule-based cleaning and standardization to prepare the dataset for certification:

- **Column Renaming & Type Coercion**
- **Value Mapping & Text Cleaning**
- **Fuzzy Matching & Datetime Parsing**

All rules and output paths are controlled via the YAML config (`normalization_config_template.yaml`).
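
A minimal sketch of what rule-based normalization can look like, using only the standard library. The helpers `normalize_row` and `fuzzy_fix`, the rule dictionaries, and the sample row are all hypothetical; the toolkit's actual rule format lives in its YAML config and may differ. Datetime parsing is left out to keep the example short.

```python
from difflib import get_close_matches


def normalize_row(row, rename, value_map, dtypes):
    """Rename columns, clean text, map values, then coerce types."""
    cleaned = {}
    for old_name, value in row.items():
        col = rename.get(old_name, old_name)
        if isinstance(value, str):
            value = " ".join(value.split())  # trim and collapse whitespace
            value = value_map.get(col, {}).get(value, value)
        if col in dtypes and value is not None:
            value = dtypes[col](value)
        cleaned[col] = value
    return cleaned


def fuzzy_fix(value, choices, cutoff=0.8):
    """Snap a near-miss spelling to the closest canonical choice, if any."""
    matches = get_close_matches(value, choices, n=1, cutoff=cutoff)
    return matches[0] if matches else value


raw = {"Species": "  Chinstrp ", "Body Mass (g)": "3700", "Sex": "M"}
row = normalize_row(
    raw,
    rename={"Species": "species", "Body Mass (g)": "body_mass_g", "Sex": "sex"},
    value_map={"sex": {"M": "male", "F": "female"}},
    dtypes={"body_mass_g": float},
)
row["species"] = fuzzy_fix(row["species"], ["Adelie", "Chinstrap", "Gentoo"])
print(row)
```

Values with no sufficiently close match are returned unchanged, so a later validation pass can still report them.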

In [None]:
# --- Run Normalization Module ---

# Check if module is enabled in the master config
module_settings = master_config.get("modules", {}).get("normalization", {})
if not module_settings.get("run", False):
    print("⏩ Skipping Normalization module as per master config.")
else:
    # Load module-specific config
    config_path = module_settings.get("config_path", "config/normalization_config_template.yaml")
    module_config = load_config(config_path)
    
    print(f"🚀 Running Normalization from '{config_path}'...")
    df_norm = run_normalization_pipeline(
        config=module_config,
        df=df_valid,
        notebook=notebook_mode,
        run_id=run_id
    )

## 🛡️ M02 — Certification Gate (Strict Mode)

This step re-uses the **Validation Module (M02)**, but with a stricter configuration to act as a quality gate. It is designed to **halt the pipeline** if violations are found:

- ✅ All column names, data types, categorical values, and numeric ranges must pass
- 🛑 **`fail_on_error: true`** triggers a hard stop on validation failure

This step certifies the cleaned dataset before the pipeline moves on to deduplication, outlier handling, and imputation.

In [None]:
# --- Run Certification Gate (Strict Validation) ---

# Check if module is enabled in the master config
module_settings = master_config.get("modules", {}).get("validation_gatekeeper", {})
if not module_settings.get("run", False):
    print("⏩ Skipping Certification Gate as per master config.")
else:
    # Load module-specific config
    config_path = module_settings.get("config_path", "config/certification_config_template.yaml")
    module_config = load_config(config_path)
    
    print(f"🚀 Running Certification Gate from '{config_path}'...")
    df_cert = run_validation_pipeline(
        config=module_config,
        df=df_norm,
        notebook=notebook_mode,
        run_id=run_id
    )

## ♻️ M04 — Deduplication

This module identifies and handles **duplicate rows** in the dataset, using the logic from `m04_duplicates`.

You can choose to:
- 🔍 **Flag duplicates** for review (`mode: "flag"`)
- ✂️ **Remove duplicates** directly (`mode: "remove"`)

The logic is configurable via `dups_config_template.yaml`, allowing you to specify which columns to check for duplication.
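
The two modes can be sketched as follows. This is an assumption-laden illustration rather than the module's code: `handle_duplicates`, the `subset` argument, and the `is_duplicate` flag column are hypothetical names, and the first occurrence of each key is treated as the original.

```python
def handle_duplicates(rows, subset, mode="flag"):
    """Flag or remove rows whose values in `subset` repeat an earlier row."""
    if mode not in ("flag", "remove"):
        raise ValueError(f"unknown mode: {mode!r}")
    seen = set()
    result = []
    for row in rows:
        key = tuple(row[col] for col in subset)
        is_dup = key in seen
        seen.add(key)
        if mode == "flag":
            result.append({**row, "is_duplicate": is_dup})
        elif not is_dup:
            result.append(row)
    return result


rows = [
    {"id": 1, "species": "Adelie", "island": "Dream"},
    {"id": 2, "species": "Gentoo", "island": "Biscoe"},
    {"id": 3, "species": "Adelie", "island": "Dream"},  # repeats row 1 on the subset
]
subset = ["species", "island"]

flagged = handle_duplicates(rows, subset, mode="flag")
removed = handle_duplicates(rows, subset, mode="remove")
print([row["is_duplicate"] for row in flagged])
print([row["id"] for row in removed])
```

Flagging keeps the row count unchanged so the duplicates can be reviewed first; removing is destructive, so it is usually worth running in flag mode once before switching.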

In [None]:
# --- Run Duplicates Module ---

# Check if module is enabled in the master config
module_settings = master_config.get("modules", {}).get("duplicates", {})
if not module_settings.get("run", False):
    print("⏩ Skipping Duplicates module as per master config.")
else:
    # Load module-specific config
    config_path = module_settings.get("config_path", "config/dups_config_template.yaml")
    module_config = load_config(config_path)
    
    print(f"🚀 Running Duplicates from '{config_path}'...")
    # The result is bound to a single name here; if the function returns (df, results_dict) in your install, unpack both values instead.
    df_duped = run_duplicates_pipeline(
        config=module_config,
        df=df_cert,
        notebook=notebook_mode,
        run_id=run_id
    )

## 📏 M05 — Detect Outliers

This module (`m05_detect_outliers`) scans numeric columns for outliers using configurable logic:

- **Z-Score** or **IQR** methods (per column or global default)
- Adds binary flags (e.g., `*_outlier`) to the dataset if `append_flags: true`
- Skips non-numeric or excluded fields via `exclude_columns`

📊 If enabled, an interactive **PlotViewer** renders boxplots, histograms, and violin plots inline, giving a fast visual summary of where anomalies occur.
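
For reference, the two detection methods can be sketched with the standard library. The function names, the default thresholds (3.0 for z-score, 1.5 for the IQR multiplier), and the sample values are assumptions for illustration; the toolkit's own defaults are set in its YAML config.

```python
from statistics import mean, pstdev, quantiles


def zscore_bounds(values, threshold=3.0):
    """Bounds at mean +/- threshold * population standard deviation."""
    mu, sigma = mean(values), pstdev(values)
    return mu - threshold * sigma, mu + threshold * sigma


def iqr_bounds(values, k=1.5):
    """Bounds at Q1 - k*IQR and Q3 + k*IQR (Tukey fences)."""
    q1, _, q3 = quantiles(values, n=4, method="inclusive")
    spread = q3 - q1
    return q1 - k * spread, q3 + k * spread


def flag_outliers(values, bounds):
    """Return one boolean per value: True if it falls outside the bounds."""
    low, high = bounds
    return [not (low <= v <= high) for v in values]


body_mass_g = [3700.0, 3800.0, 3900.0, 4000.0, 4100.0, 9000.0]

# Equivalent of a `body_mass_g_outlier` flag column, one entry per row.
iqr_flags = flag_outliers(body_mass_g, iqr_bounds(body_mass_g))
z_flags = flag_outliers(body_mass_g, zscore_bounds(body_mass_g))
print(iqr_flags)
print(z_flags)
```

With only six values, the z-score of 9000 is about 2.2, so a threshold of 3 does not flag it while the IQR fences do. Small samples are one reason to choose the method per column.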

In [None]:
# --- Detect Outliers ---

# Initialize detection_results to ensure it exists for the next step
detection_results = None

# Check if module is enabled in the master config
module_settings = master_config.get("modules", {}).get("outlier_detection", {})
if not module_settings.get("run", False):
    print("⏩ Skipping Outlier Detection module as per master config.")
elif df_duped is None:
    print("⏩ Skipping Outlier Detection because input dataframe is None.")
else:
    # Load module-specific config
    config_path = module_settings.get("config_path", "config/outlier_config_template.yaml")
    module_config = load_config(config_path)
    
    print(f"🚀 Running Outlier Detection from '{config_path}'...")
    # This function returns (df_with_flags, detection_results_dict)
    df_detect, detection_results = run_outlier_detection_pipeline(
        config=module_config,
        df=df_duped,
        notebook=notebook_mode,
        run_id=run_id
    )

## 🧼 M06 — Handle Outliers

This module (`m06_outlier_handling`) applies cleanup strategies to outliers flagged in the detection step:

- **Strategies**: `clip` (cap to bounds), `median` (impute), `constant` (fill with a fixed value), or `none`.
- **Configuration**: Apply rules globally (`__default__`) or per-column.

This step is purely for remediation and relies on the `detection_results` from the previous module.
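
The strategies listed above can be sketched as a single function. This is a simplified illustration, not the module's code: `handle_outliers` and its arguments are hypothetical, the flags and bounds are hard-coded stand-ins for `detection_results`, and the `median` strategy here uses the median of the non-flagged values, which is an assumption.

```python
from statistics import median


def handle_outliers(values, flags, bounds, strategy="clip", fill_value=None):
    """Apply one remediation strategy to the values flagged as outliers."""
    low, high = bounds
    if strategy == "none":
        return list(values)
    if strategy == "clip":
        # Cap flagged values to the nearest bound.
        return [min(max(v, low), high) if f else v for v, f in zip(values, flags)]
    if strategy == "median":
        # Assumption: the replacement is the median of the non-flagged values.
        replacement = median(v for v, f in zip(values, flags) if not f)
    elif strategy == "constant":
        replacement = fill_value
    else:
        raise ValueError(f"unknown strategy: {strategy!r}")
    return [replacement if f else v for v, f in zip(values, flags)]


values = [3700.0, 3800.0, 3900.0, 4000.0, 4100.0, 9000.0]
flags = [False, False, False, False, False, True]  # stand-in for detection output
bounds = (3450.0, 4450.0)                          # stand-in for detection bounds

clipped = handle_outliers(values, flags, bounds, strategy="clip")
imputed = handle_outliers(values, flags, bounds, strategy="median")
filled = handle_outliers(values, flags, bounds, strategy="constant", fill_value=0.0)
print(clipped[-1], imputed[-1], filled[-1])
```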

In [None]:
# --- Handle Outliers ---

# Check if module is enabled in the master config
module_settings = master_config.get("modules", {}).get("outlier_handling", {})
if not module_settings.get("run", False):
    print("⏩ Skipping Outlier Handling module as per master config.")
elif detection_results is None:
    print("⏩ Skipping Outlier Handling because no detection results were provided from the previous step.")
elif df_detect is None:
    print("⏩ Skipping Outlier Handling because input dataframe is None.")
else:
    # Load module-specific config
    config_path = module_settings.get("config_path", "config/handling_config_template.yaml")
    module_config = load_config(config_path)
    
    print(f"🚀 Running Outlier Handling from '{config_path}'...")
    # This function returns the dataframe with outliers handled
    df_handled = run_outlier_handling_pipeline(
        config=module_config,
        df=df_detect,
        detection_results=detection_results,  # Pass results from M05
        notebook=notebook_mode,
        run_id=run_id
    )

## 🔧 M07 — Impute Missing Values

This module (`m07_imputation`) fills missing (`NaN`) values using a column-specific strategy:

- **Strategies**: `mean`, `median`, `mode`, or `constant`.
- **Configuration**: Apply rules per column via `rules.strategies` in the YAML.

📊 If enabled, comparison plots show how categorical columns changed post-imputation.
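
A per-column strategy map can be pictured with the sketch below. It assumes a column-oriented dict instead of a DataFrame, and `impute_column` plus the `strategies` mapping are hypothetical stand-ins for whatever `rules.strategies` contains in the YAML.

```python
from statistics import mean, median, mode

AGGREGATES = {"mean": mean, "median": median, "mode": mode}


def impute_column(values, strategy, fill_value=None):
    """Replace None entries using the chosen strategy."""
    present = [v for v in values if v is not None]
    if strategy == "constant":
        replacement = fill_value
    elif strategy in AGGREGATES:
        replacement = AGGREGATES[strategy](present)
    else:
        raise ValueError(f"unknown strategy: {strategy!r}")
    return [replacement if v is None else v for v in values]


columns = {
    "body_mass_g": [3700.0, None, 3900.0, 5000.0],
    "sex": ["male", None, "male", "female"],
}
# Hypothetical per-column rules.
strategies = {"body_mass_g": "median", "sex": "mode"}

imputed = {
    col: impute_column(values, strategies[col]) for col, values in columns.items()
}
print(imputed)
```

Numeric columns usually take `mean` or `median`, while `mode` and `constant` also work for categorical columns.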

In [None]:
# --- Run Imputation Module ---

# Check if module is enabled in the master config
module_settings = master_config.get("modules", {}).get("imputation", {})
if not module_settings.get("run", False):
    print("⏩ Skipping Imputation module as per master config.")
elif df_handled is None:
    print("⏩ Skipping Imputation because input dataframe is None.")
else:
    # Load module-specific config
    config_path = module_settings.get("config_path", "config/imputation_config_template.yaml")
    module_config = load_config(config_path)
    
    print(f"🚀 Running Imputation from '{config_path}'...")
    df_imput = run_imputation_pipeline(
        config=module_config,
        df=df_handled,
        notebook=notebook_mode,
        run_id=run_id
    )

## Alternative: Full Pipeline Runner

For non-interactive runs, or to execute the entire pipeline in one go, you can use the `run_toolkit_pipeline` function from the `analyst_toolkit` library. This is particularly useful for automated scripts where step-by-step inspection is not required.

To use it, import `run_toolkit_pipeline` and call it with the path to your master configuration file.

```python
# from analyst_toolkit.run_toolkit_pipeline import run_toolkit_pipeline
#
# This function runs all modules enabled in your 'run_toolkit_config.yaml' sequentially.
# df_final, all_results = run_toolkit_pipeline(config_path=master_config_path)
```

> **Note:** This template notebook is designed for step-by-step execution and inspection. Using the full pipeline runner will execute all steps at once and bypass the individual cell outputs in this notebook.

## 🎬 M10 — Final Audit & Certification

This final module (`m10_final_audit`) serves as the ultimate quality gate before exporting the cleaned dataset. It performs a comprehensive audit and applies strict certification checks.

- ✅ **Final Edits**: Drops or renames columns and coerces dtypes as needed.
- ✅ **Certification Check**: Re-runs validation rules with `fail_on_error: true` to enforce schema, dtypes, and content requirements.
- ✅ **Lifecycle Comparison**: Compares the raw vs. final dataset's structure, nulls, and column presence.
- ✅ **Capstone Report**: Renders a complete dashboard summarizing the pipeline's impact and status.

🛡️ If any rule is violated, the system halts and logs failure details for debugging. Once this step passes, your dataset is certified and ready for production use.
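
The lifecycle comparison can be thought of as a before/after diff of the raw and final data. The sketch below is illustrative only: `lifecycle_comparison`, `null_counts`, and the sample rows are hypothetical, and the real module reports far more than this.

```python
def null_counts(rows):
    """Count None values per column across all rows."""
    counts = {}
    for row in rows:
        for col, value in row.items():
            counts[col] = counts.get(col, 0) + (value is None)
    return counts


def lifecycle_comparison(raw_rows, final_rows):
    """Compare row counts, column presence, and nulls between two stages."""
    raw_cols = set().union(*(row.keys() for row in raw_rows))
    final_cols = set().union(*(row.keys() for row in final_rows))
    return {
        "rows": (len(raw_rows), len(final_rows)),
        "columns_dropped": sorted(raw_cols - final_cols),
        "columns_added": sorted(final_cols - raw_cols),
        "nulls_before": null_counts(raw_rows),
        "nulls_after": null_counts(final_rows),
    }


raw = [
    {"species": "Adelie", "body_mass_g": None, "notes": "tagged"},
    {"species": None, "body_mass_g": 3900.0, "notes": None},
    {"species": "Gentoo", "body_mass_g": 5000.0, "notes": None},
]
final = [
    {"species": "Adelie", "body_mass_g": 3900.0, "body_mass_g_outlier": False},
    {"species": "Gentoo", "body_mass_g": 5000.0, "body_mass_g_outlier": False},
]

comparison = lifecycle_comparison(raw, final)
for key, value in comparison.items():
    print(f"{key}: {value}")
```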

In [None]:
# --- Run Final Audit Module ---

# Check if module is enabled in the master config
module_settings = master_config.get("modules", {}).get("final_audit", {})
if not module_settings.get("run", False):
    print("⏩ Skipping Final Audit module as per master config.")
elif df_imput is None:
    print("⏩ Skipping Final Audit because input dataframe is None.")
else:
    # Load module-specific config
    config_path = module_settings.get("config_path", "config/final_audit_config_template.yaml")
    module_config = load_config(config_path)
    
    print(f"🚀 Running Final Audit from '{config_path}'...")
    # This function returns the final, certified dataframe
    df_final = run_final_audit_pipeline(
        config=module_config,
        df=df_imput,
        notebook=notebook_mode,
        run_id=run_id
    )

In [None]:
# 🎉 Final Certified Data Preview
if 'df_final' in locals() and df_final is not None:
    print("✅ Pipeline complete. Displaying the first 5 rows of the final certified dataset:")
    display(df_final.head())
else:
    print("⏹️ Pipeline finished, but no final dataframe was produced (likely skipped or failed).")