In [13]:
#!/usr/bin/env python3
"""
Data acquisition script for Credit Card Fraud Detection MLOps Pipeline.

This script:
1. Downloads the Credit Card Fraud Detection dataset
2. Initializes DVC
3. Adds the raw data to DVC tracking
4. Pushes to the DVC remote
"""

import os
import sys
import logging
import requests
import hashlib
import subprocess
from pathlib import Path

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger('data-acquisition')

# Constants
DATA_URL = "https://nextcloud.scopicsoftware.com/s/bo5PTKgpngWymGE/download/creditcard-data.csv"
DATA_DIR = Path("data")
RAW_DATA_DIR = DATA_DIR / "raw"
RAW_DATA_FILE = RAW_DATA_DIR / "creditcard-data.csv"
GIT_IGNORE = "data/raw/.gitignore"
# Expected SHA256 checksum of the file (optional for validation)
EXPECTED_SHA256 = None  # Replace with actual SHA256 if known

def setup_directories():
    """Create necessary directories if they don't exist."""
    logger.info(f"Creating directory {RAW_DATA_DIR}")
    RAW_DATA_DIR.mkdir(parents=True, exist_ok=True) 

def download_data():
    """Download the dataset from the source URL."""
    if RAW_DATA_FILE.exists():
        logger.info(f"Data file already exists at {RAW_DATA_FILE}, skipping download.")
        return

    logger.info(f"Downloading data from {DATA_URL}")
    response = requests.get(DATA_URL, stream=True)
    response.raise_for_status()

    with open(RAW_DATA_FILE, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)

    logger.info(f"Download complete: {RAW_DATA_FILE}")

def compute_sha256(filepath):
    """Compute SHA256 checksum of a file."""
    sha256 = hashlib.sha256()
    with open(filepath, 'rb') as f:
        for chunk in iter(lambda: f.read(8192), b''):
            sha256.update(chunk)
    return sha256.hexdigest()

def validate_data():
    """Validate the downloaded data file integrity."""
    if not RAW_DATA_FILE.exists():
        logger.error("Data file does not exist.")
        sys.exit(1)

    if EXPECTED_SHA256:
        logger.info("Validating data file checksum...")
        checksum = compute_sha256(RAW_DATA_FILE)
        if checksum != EXPECTED_SHA256:
            logger.error("Checksum does not match. File may be corrupted.")
            sys.exit(1)
        logger.info("Checksum validated.")
    else:
        logger.warning("No expected checksum provided. Skipping validation.")

def has_staged_changes():
    result = subprocess.run(
        ["git", "diff", "--cached", "--quiet"]
    )
    return result.returncode != 0  # True if there are staged changes

def initialize_dvc():
    """Initialize DVC and add data to tracking."""
    if not Path(".dvc").exists():
        logger.info("Initializing DVC...")
        subprocess.run(["dvc", "init"], check=True)

    logger.info(f"Adding {RAW_DATA_FILE} to DVC tracking...")
    subprocess.run(["dvc", "add", str(RAW_DATA_FILE)], check=True)

    logger.info("Committing DVC changes to Git...")
    subprocess.run(["git", "add", str(RAW_DATA_FILE) + ".dvc"], check=True)
    subprocess.run(["git", "add", GIT_IGNORE], check=True)
    
    if has_staged_changes():
        subprocess.run(["git", "commit", "-m", "Add raw dataset to DVC"], check=True)
    else:
        print("Nothing to commit — working tree clean.")

    logger.info("Pushing data to DVC remote...")
    subprocess.run(["dvc", "push"], check=True)

def main():
    """Main function to orchestrate the data acquisition process."""
    logger.info("Starting data acquisition process")

    setup_directories()
    download_data()
    validate_data()
    initialize_dvc()

    logger.info("Data acquisition completed successfully")

if __name__ == "__main__":
    main()

2025-05-15 10:34:06,221 - data-acquisition - INFO - Starting data acquisition process
2025-05-15 10:34:06,221 - data-acquisition - INFO - Creating directory data\raw
2025-05-15 10:34:06,222 - data-acquisition - INFO - Data file already exists at data\raw\creditcard-data.csv, skipping download.
2025-05-15 10:34:06,224 - data-acquisition - INFO - Adding data\raw\creditcard-data.csv to DVC tracking...
2025-05-15 10:34:07,567 - data-acquisition - INFO - Committing DVC changes to Git...
Nothing to commit — working tree clean.
2025-05-15 10:34:07,661 - data-acquisition - INFO - Pushing data to DVC remote...
2025-05-15 10:34:08,998 - data-acquisition - INFO - Data acquisition completed successfully


In [None]:
#!/usr/bin/env python3
"""
Data preprocessing script for Credit Card Fraud Detection MLOps Pipeline.

This script:
1. Loads a specific version of raw data from DVC
2. Handles class imbalance
3. Normalizes features
4. Splits data into train/validation/test sets
5. Saves processed datasets back to DVC
6. Logs preprocessing steps to MLflow

Usage:
    python preprocess.py --data-rev <DVC_REVISION>
"""

import os
import sys
import logging
import argparse
import subprocess
import numpy as np
import pandas as pd
from pathlib import Path
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from imblearn.under_sampling import RandomUnderSampler
import mlflow

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger('data-preprocessing')

# Constants
DATA_DIR = Path("data")
RAW_DATA_DIR = DATA_DIR / "raw"
PROCESSED_DATA_DIR = DATA_DIR / "processed"
RAW_DATA_FILE = RAW_DATA_DIR / "creditcard-data.csv"

def parse_args():
    logger.info(f"Check parser")
    parser = argparse.ArgumentParser(description='Data preprocessing script')
    logger.info(f"Check parser 1")
    parser.add_argument('--data-rev', type=str, required=False, default="HEAD", help='(Optional) DVC revision/version of the raw data to use. Defaults to HEAD.')
    logger.info(f"Check parser 2")
    return parser.parse_known_args()[0]

def setup_directories():
    logger.info(f"Creating processed data directory: {PROCESSED_DATA_DIR}")
    PROCESSED_DATA_DIR.mkdir(parents=True, exist_ok=True)

def setup_mlflow():
    mlflow.set_tracking_uri("http://localhost:5000")
    mlflow.set_experiment("Preprocessing")

def load_data(data_rev):
    logger.info(f"Checking out raw data at DVC revision: {data_rev}")
    # subprocess.run(["dvc", "checkout", RAW_DATA_FILE.as_posix(), "--rev", data_rev], check=True)
    subprocess.run(["git", "checkout", data_rev], check=True)
    subprocess.run(["dvc", "pull", RAW_DATA_FILE.as_posix()], check=True)
    logger.info(f"Loading dataset from {RAW_DATA_FILE}")
    return pd.read_csv(RAW_DATA_FILE)

def analyze_data(df):
    stats = {
        "num_rows": len(df),
        "num_features": df.shape[1],
        "num_fraud": df[df["Class"] == 1].shape[0],
        "num_normal": df[df["Class"] == 0].shape[0],
    }
    mlflow.log_metrics(stats)
    logger.info(f"Data summary: {stats}")
    return stats

def preprocess_data(df):
    logger.info("Splitting features and labels...")
    X = df.drop(columns=["Class"])
    y = df["Class"]

    logger.info("Splitting into train/validation/test...")
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)
    X_train, X_val, y_train, y_val = train_test_split(X_train, y_train, test_size=0.22, random_state=42)

    logger.info("Normalizing features...")
    scaler = StandardScaler()
    X_train = scaler.fit_transform(X_train)
    X_val =  scaler.transform(X_val)
    X_test = scaler.transform(X_test)

    logger.info("Applying SMOTE to balance the dataset...")
    undersampler = RandomUnderSampler(sampling_strategy=0.1, random_state=42)
    X_resampled, y_resampled = undersampler.fit_resample(X_train, y_train)

    train_df = pd.DataFrame(X_resampled)
    train_df["Class"] = y_resampled.values

    val_df = pd.DataFrame(X_val)
    val_df["Class"] = y_val.values

    test_df = pd.DataFrame(X_test)
    test_df["Class"] = y_test.values

    return train_df, val_df, test_df

def save_processed_data(train_df, val_df, test_df):
    logger.info("Saving processed datasets...")
    train_path = PROCESSED_DATA_DIR / "train.csv"
    val_path = PROCESSED_DATA_DIR / "val.csv"
    test_path = PROCESSED_DATA_DIR / "test.csv"

    train_df.to_csv(train_path, index=False)
    val_df.to_csv(val_path, index=False)
    test_df.to_csv(test_path, index=False)

    logger.info("Tracking processed data with DVC...")
    # subprocess.run(["dvc", "commit", train_path.as_posix()], check=True)
    # subprocess.run(["dvc", "commit", str(val_path)], check=True)
    # subprocess.run(["dvc", "commit", str(test_path)], check=True)
    subprocess.run(["dvc", "commit", "preprocess", "--force"], check=True)
    subprocess.run(["git", "add", "."], check=True)
    subprocess.run(["git", "commit", "-m", "Add processed datasets"], check=True)
    subprocess.run(["dvc", "push"], check=True)

def log_to_mlflow(stats, train_df, val_df, test_df):
    mlflow.log_param("train_size", len(train_df))
    mlflow.log_param("val_size", len(val_df))
    mlflow.log_param("test_size", len(test_df))
    mlflow.log_metrics({
        "class_ratio_train": train_df["Class"].mean(),
        "class_ratio_val": val_df["Class"].mean(),
        "class_ratio_test": test_df["Class"].mean()
    })

def main():
    args = parse_args()
    logger.info(f"Starting data preprocessing pipeline with data revision: {args.data_rev}")
    setup_directories()
    setup_mlflow()
    with mlflow.start_run():
        df = load_data(args.data_rev)
        stats = analyze_data(df)
        train_df, val_df, test_df = preprocess_data(df)
        save_processed_data(train_df, val_df, test_df)
        log_to_mlflow(stats, train_df, val_df, test_df)
    logger.info("Data preprocessing completed successfully")

if __name__ == "__main__":
    main()

2025-05-15 10:47:45,272 - data-preprocessing - INFO - Check parser
2025-05-15 10:47:45,273 - data-preprocessing - INFO - Check parser 1
2025-05-15 10:47:45,273 - data-preprocessing - INFO - Check parser 2
2025-05-15 10:47:45,273 - data-preprocessing - INFO - Starting data preprocessing pipeline with data revision: HEAD
2025-05-15 10:47:45,274 - data-preprocessing - INFO - Creating processed data directory: data\processed
2025-05-15 10:47:45,338 - data-preprocessing - INFO - Checking out raw data at DVC revision: HEAD
2025-05-15 10:47:47,047 - data-preprocessing - INFO - Loading dataset from data\raw\creditcard-data.csv
2025-05-15 10:47:48,377 - data-preprocessing - INFO - Data summary: {'num_rows': 284807, 'num_features': 31, 'num_fraud': 492, 'num_normal': 284315}
2025-05-15 10:47:48,381 - data-preprocessing - INFO - Splitting features and labels...
2025-05-15 10:47:48,393 - data-preprocessing - INFO - Splitting into train/validation/test...
2025-05-15 10:47:48,505 - data-preprocessin

CalledProcessError: Command '['git', 'commit', '-m', 'Add processed datasets']' returned non-zero exit status 1.

In [29]:
#!/usr/bin/env python3
"""
Model training script for Credit Card Fraud Detection MLOps Pipeline.

This script:
1. Loads preprocessed data from a specific DVC version
2. Trains a Gradient Boosting model (XGBoost)
3. Performs hyperparameter tuning
4. Tracks experiments with MLflow
5. Registers the best model

Usage:
    python train.py --data-rev <DVC_REVISION>
"""

import os
import sys
import logging
import argparse
import numpy as np
import pandas as pd
from pathlib import Path
import joblib
from xgboost import XGBClassifier
from sklearn.model_selection import RandomizedSearchCV, train_test_split
from sklearn.metrics import (
    accuracy_score, precision_score, recall_score, f1_score,
    roc_auc_score, average_precision_score, confusion_matrix
)
import mlflow
import mlflow.xgboost
import subprocess

# Setup logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[logging.StreamHandler(sys.stdout)]
)
logger = logging.getLogger('model-training')

# Constants
DATA_DIR = Path("data")
PROCESSED_DATA_DIR = DATA_DIR / "processed"
PROCESSED_DATA_FILE_TRAIN = PROCESSED_DATA_DIR / "train.csv"
PROCESSED_DATA_FILE_VAL = PROCESSED_DATA_DIR / "val.csv"
MODELS_DIR = Path("models")


def parse_args():
    parser = argparse.ArgumentParser(description='Model training script')
    parser.add_argument('--data-rev', type=str, required=False, default="HEAD",
                        help='(Optional) DVC revision/version of the processed data to use')
    return parser.parse_known_args()[0]


def setup_directories():
    MODELS_DIR.mkdir(parents=True, exist_ok=True)


def setup_mlflow():
    mlflow.set_tracking_uri("http://localhost:5000")
    mlflow.set_experiment("credit-card-fraud-detection")


def load_data(data_rev):
    logger.info(f"Pulling data from DVC revision: {data_rev}")
    subprocess.run(["dvc", "pull", "--force", PROCESSED_DATA_FILE_TRAIN.as_posix()], check=True)
    subprocess.run(["dvc", "pull", "--force", PROCESSED_DATA_FILE_VAL.as_posix()], check=True)
    train_df = pd.read_csv(PROCESSED_DATA_DIR / "train.csv")
    val_df = pd.read_csv(PROCESSED_DATA_DIR / "val.csv")
    X_train = train_df.drop(columns=["Class"])
    y_train = train_df["Class"]
    X_val = val_df.drop(columns=["Class"])
    y_val = val_df["Class"]
    return X_train, y_train, X_val, y_val


def train_model(X_train, y_train, X_val, y_val):
    model = XGBClassifier(use_label_encoder=False, eval_metric='logloss', random_state=42)

    param_dist = {
        "n_estimators": [100, 200, 300],
        "max_depth": [3, 5, 7],
        "learning_rate": [0.01, 0.1, 0.2],
        "subsample": [0.6, 0.8, 1.0]
    }

    logger.info("Starting hyperparameter tuning with RandomizedSearchCV...")
    search = RandomizedSearchCV(model, param_distributions=param_dist, n_iter=10,
                                scoring='roc_auc', cv=3, verbose=1, n_jobs=-1, random_state=42)
    search.fit(X_train, y_train)
    best_model = search.best_estimator_
    logger.info(f"Best hyperparameters: {search.best_params_}")
    return best_model, search.best_params_


def evaluate_model(model, X_val, y_val):
    logger.info("Evaluating model on validation data...")
    y_pred = model.predict(X_val)
    y_proba = model.predict_proba(X_val)[:, 1]
    metrics = {
        "accuracy": accuracy_score(y_val, y_pred),
        "precision": precision_score(y_val, y_pred),
        "recall": recall_score(y_val, y_pred),
        "f1": f1_score(y_val, y_pred),
        "roc_auc": roc_auc_score(y_val, y_proba),
        "avg_precision": average_precision_score(y_val, y_proba)
    }
    logger.info(f"Evaluation metrics: {metrics}")
    return metrics


def log_to_mlflow(model, params, metrics):
    with mlflow.start_run():
        mlflow.log_params(params)
        mlflow.log_metrics(metrics)
        mlflow.xgboost.log_model(model, "model")
        mlflow.register_model(f"runs:/{mlflow.active_run().info.run_id}/model", "fraud-detection-model")


def save_model(model):
    logger.info("Saving model to disk...")
    joblib.dump(model, MODELS_DIR / "model.joblib")


def main():
    args = parse_args()
    logger.info(f"Starting model training pipeline with data revision: {args.data_rev}")

    setup_directories()
    setup_mlflow()
    X_train, y_train, X_val, y_val = load_data(args.data_rev)
    model, best_params = train_model(X_train, y_train, X_val, y_val)
    metrics = evaluate_model(model, X_val, y_val)
    log_to_mlflow(model, best_params, metrics)
    save_model(model)

    logger.info("Model training completed successfully")


if __name__ == "__main__":
    main()

2025-05-14 17:25:54,562 - model-training - INFO - Starting model training pipeline with data revision: HEAD
2025-05-14 17:25:54,609 - model-training - INFO - Pulling data from DVC revision: HEAD


CalledProcessError: Command '['dvc', 'pull', '--force', 'data/processed/train.csv']' returned non-zero exit status 1.