In [3]:
# Data Ingestion, Processing, and MLflow Model Logging
import io, os
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pandas.plotting import scatter_matrix
import mlflow
from mlflow.models import infer_signature

# from domino.data_sources import DataSourceClient
from domino_data.data_sources import DataSourceClient
from domino_data.datasets import DatasetClient

from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.pipeline import Pipeline

from ydata_profiling import ProfileReport

import time
import yaml

%matplotlib inline

# Resolve Domino locations and the project name from the environment,
# falling back to local defaults when the variables are not set.
domino_working_dir = os.getenv("DOMINO_WORKING_DIR", ".")
# The data and artifact directories are derived by swapping 'code' in the
# working-directory path for the corresponding sibling name.
domino_datasource_dir, domino_artifact_dir = (
    domino_working_dir.replace('code', sibling) for sibling in ('data', 'artifacts')
)
domino_project_name = os.getenv("DOMINO_PROJECT_NAME", "my-local-project")

## Data Ingestion, Processing, and MLflow Model Logging

# Select the MLflow experiment that the runs started below are recorded under.
mlflow.set_experiment('CC Fraud Preprocessing [no PCA]')

def run_data_ingestion_and_processing(raw_filename: str, clean_filename: str):
    """Download raw transactions, preprocess them, and log everything to MLflow.

    Steps:
      1. Download ``raw_filename`` from the ``credit_card_fraud_detection``
         data source and load it as a DataFrame.
      2. Drop rows containing missing values.
      3. Split off the ``Class`` target and drop ``Class``/``Time``/``Hour``
         from the features (columns that are absent are ignored).
      4. Fit a ColumnTransformer that standard-scales numeric columns and
         one-hot encodes object/category columns.
      5. Save ``X_processed.npy``, ``y.csv`` and the scaled CSV
         (``clean_filename``) under ``<datasource_dir>/<project_name>/``.
      6. In a new MLflow run, log parameters, metrics, the fitted pipeline
         (registered as "CC Fraud Preprocessing"), a correlation heatmap,
         a scatter matrix and a ydata-profiling HTML report.

    Args:
        raw_filename: Object name of the raw CSV inside the data source.
        clean_filename: File name (relative to the project dataset directory)
            for the scaled/encoded CSV.

    Returns:
        A tuple ``(df, X_processed, y)``: the DataFrame after ``dropna``, the
        output of ``pipeline.fit_transform``, and the ``Class`` Series.

    Raises:
        KeyError: If the loaded data has no ``Class`` column.
    """
    # 1) Download the raw file
    ds = DataSourceClient().get_datasource("credit_card_fraud_detection")
    buf = io.BytesIO()
    ds.download_fileobj(raw_filename, buf)
    buf.seek(0)
    df = pd.read_csv(buf)
    print(f"🔍 Loaded {len(df):,} rows from {raw_filename}")

    # 2) Drop missing rows
    before = len(df)
    df = df.dropna()
    after = len(df)
    pct_removed = 100 * (before - after) / before if before > 0 else 0
    print(f"🧹 Dropped {before - after:,} rows with missing data")

    # 3) Match run_all: drop Class, Time, Hour from X
    X = df.drop(columns=["Class", "Time", "Hour"], errors="ignore")
    y = df["Class"]

    # 4) Detect numeric and categorical columns as in run_all
    numeric_features = X.select_dtypes(include=[np.number]).columns.tolist()
    categorical_features = X.select_dtypes(include=[object, "category"]).columns.tolist()

    preprocessor = ColumnTransformer(transformers=[
        ('num', StandardScaler(), numeric_features),
        ('cat', OneHotEncoder(handle_unknown='ignore', sparse_output=False), categorical_features,)
    ])
    pipeline = Pipeline([
        ("preproc", preprocessor)
    ])
    start_time = time.time()
    X_processed = pipeline.fit_transform(X)
    fit_time = time.time() - start_time

    # Create every output directory BEFORE the first write; previously the
    # dataset directory was only created after np.save / to_csv had already
    # tried to write into it, and the artifact directory was never created.
    dataset_dir = f"{domino_datasource_dir}/{domino_project_name}"
    clean_path = f"{dataset_dir}/{clean_filename}"
    for out_dir in (dataset_dir, os.path.dirname(clean_path), domino_artifact_dir):
        os.makedirs(out_dir, exist_ok=True)

    # 5) Save processed data exactly as in run_all
    np.save(f"{dataset_dir}/X_processed.npy", X_processed)
    y.to_csv(f"{dataset_dir}/y.csv", index=False)
    print("✅ Saved X_processed.npy and y.csv for downstream modeling")

    # Densify if the transformer returned a sparse matrix.
    if hasattr(X_processed, "toarray"):
        X_arr = X_processed.toarray()
    else:
        X_arr = X_processed

    # Column names: numeric names followed by the one-hot names (same order
    # as the transformers are listed in the ColumnTransformer).
    # With no categorical columns, ColumnTransformer leaves the 'cat' encoder
    # unfitted, so asking it for feature names would raise; skip it.
    cat_cols = []
    if categorical_features:
        encoder = pipeline.named_steps["preproc"].named_transformers_["cat"]
        cat_cols = encoder.get_feature_names_out(categorical_features).tolist()
    all_cols = numeric_features + cat_cols

    df_scaled = pd.DataFrame(X_arr, columns=all_cols)
    df_scaled["Class"] = y.values  # add back target (positional, index-agnostic)

    # Save the scaled/encoded table under clean_filename
    df_scaled.to_csv(clean_path, index=False)

    # Close any run left open (e.g. by a previous, interrupted cell execution)
    # so start_run below opens a fresh run.
    if mlflow.active_run():
        mlflow.end_run()

    # 6) Start MLflow run and log everything
    with mlflow.start_run(run_name="Preprocessing Pipeline") as run:
        # Log the cleaned data file and the run parameters
        mlflow.log_artifact(clean_path, artifact_path="data")
        mlflow.log_param("raw_filename", raw_filename)
        mlflow.log_param("clean_filename", clean_filename)
        mlflow.log_param("num_rows_loaded", before)
        mlflow.log_param("num_rows_after_dropna", after)
        mlflow.log_param("num_cat_features", len(categorical_features))
        mlflow.log_param("num_num_features", len(numeric_features))

        # Log human-readable pipeline parameters as YAML
        pipeline_params = {
            "raw_filename": raw_filename,
            "clean_filename": clean_filename,
            "num_rows_loaded": before,
            "num_rows_after_dropna": after,
            "num_cat_features": len(categorical_features),
            "num_num_features": len(numeric_features),
            "categorical_columns": categorical_features,
            "numerical_columns": numeric_features,
        }
        params_yaml_path = f"{domino_artifact_dir}/pipeline_params.yaml"
        with open(params_yaml_path, "w", encoding="utf-8") as f:
            yaml.dump(pipeline_params, f, default_flow_style=False)
        mlflow.log_artifact(params_yaml_path, artifact_path="params")

        # Log the pipeline as a single model. Integer columns are cast to
        # float64 on a copy before the signature is inferred from 5 rows.
        X_sig = X.copy()
        for col in numeric_features:
            if np.issubdtype(X_sig[col].dtype, np.integer):
                X_sig[col] = X_sig[col].astype("float64")
        signature = infer_signature(X_sig.iloc[:5], pipeline.transform(X_sig.iloc[:5]))
        mlflow.sklearn.log_model(
            pipeline,
            artifact_path="preprocessing_pipeline",
            registered_model_name="CC Fraud Preprocessing",
            signature=signature
        )
        mlflow.set_tag("pipeline", "full_preproc_no_pca")

        # Log metrics
        mlflow.log_metric("pct_data_removed", pct_removed)
        mlflow.log_metric("num_rows_removed", before - after)
        mlflow.log_metric("preproc_fit_time_sec", fit_time)

        # 7) Generate and log artifacts (corr, scatter, etc.)
        num_df = df.select_dtypes(include="number").drop(columns=["Time", "Class"], errors="ignore")
        # Correlation heatmap
        plt.figure(figsize=(14,12))
        sns.heatmap(num_df.corr(), annot=True, fmt=".2f", cmap="vlag")
        plt.title("Correlation Matrix")
        corr_path = f"{domino_artifact_dir}/raw_correlation_matrix.png"
        plt.savefig(corr_path); plt.close()
        mlflow.log_artifact(corr_path, artifact_path="plots")
        # Scatter matrix on at most 500 rows; capping n avoids the ValueError
        # DataFrame.sample raises when asked for more rows than exist.
        sample_df = num_df.sample(n=min(500, len(num_df)), random_state=0)
        scatter_matrix(sample_df, alpha=0.2, diagonal="hist", figsize=(15,15))
        scatter_path = f"{domino_artifact_dir}/raw_scatter_plots.png"
        plt.savefig(scatter_path); plt.close()
        mlflow.log_artifact(scatter_path, artifact_path="plots")

        # 8) EDA HTML
        profile = ProfileReport(df, title="EDA Report", explorative=True, minimal=True)
        eda_path = f"{domino_artifact_dir}/eda_report.html"
        profile.to_file(eda_path)
        mlflow.log_artifact(eda_path, artifact_path="eda")

    return df, X_processed, y

# Usage: run ingestion + preprocessing on the raw transactions file and keep
# the three returned objects (cleaned frame, transformed features, target).
raw_df, X_processed, y = run_data_ingestion_and_processing(
    raw_filename="raw_cc_transactions.csv",
    clean_filename="cleaned_cc_transactions.csv"
)


🔍 Loaded 78,324 rows from raw_cc_transactions.csv
🧹 Dropped 1,114 rows with missing data
✅ Saved X_processed.npy and y.csv for downstream modeling


Registered model 'CC Fraud Preprocessing' already exists. Creating a new version of this model...
2025/06/30 21:14:18 INFO mlflow.store.model_registry.abstract_store: Waiting up to 300 seconds for model version to finish creation. Model name: CC Fraud Preprocessing, version 21
Created version '21' of model 'CC Fraud Preprocessing'.


Summarize dataset:   0%|          | 0/5 [00:00<?, ?it/s]


  0%|          | 0/19 [00:00<?, ?it/s][A
  5%|▌         | 1/19 [00:00<00:02,  7.21it/s][A
 11%|█         | 2/19 [00:00<00:02,  8.40it/s][A
 16%|█▌        | 3/19 [00:00<00:01,  8.95it/s][A
100%|██████████| 19/19 [00:00<00:00, 34.51it/s][A


Generate report structure:   0%|          | 0/1 [00:00<?, ?it/s]

Render HTML:   0%|          | 0/1 [00:00<?, ?it/s]

Export report to file:   0%|          | 0/1 [00:00<?, ?it/s]

🏃 View run Preprocessing Pipeline at: http://127.0.0.1:8768/#/experiments/1541/runs/6d98ac9891b7416eb6d3632cab576054
🧪 View experiment at: http://127.0.0.1:8768/#/experiments/1541
