In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import os
from datetime import datetime
pd.options.display.float_format = '{:20.2f}'.format
import warnings
warnings.filterwarnings("ignore")
import joblib as jb
import numpy as np
pd.set_option('display.max_columns', 999)
from collections import Counter

from typing import NamedTuple
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import RobustScaler
from sklearn.metrics import accuracy_score,classification_report,roc_auc_score
from imblearn.combine import SMOTEENN
from pathlib import Path
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import (
    roc_auc_score, precision_score, recall_score, f1_score,
    classification_report, roc_curve, auc,precision_recall_curve, average_precision_score,matthews_corrcoef
)
from sklearn.model_selection import RandomizedSearchCV
import json
from google.cloud.storage import Client, transfer_manager
import glob
from kfp.compiler import Compiler
from kfp import dsl
from kfp.dsl import (
    Input,
    Output,
    Artifact,
    Model,
    Metrics,
    component
)

In [2]:
class VersionConfiguration:
    """Local artifact layout plus a per-instance timestamp used to version outputs.

    Args:
        root_folder: base directory for every artifact sub-folder (default
            ``"artifacts"``, resolved against the current working directory).
        create_dirs: when True (default), create the output directories
            ``data_version``, ``model_version`` and ``evaluation`` if they are missing,
            because ``to_csv`` / ``joblib.dump`` / ``savefig`` do not create parents.

    Attributes:
        datetime_suffix: ``YYYYMMDDTHHMMSS`` captured at construction time, so two
            instances created in different seconds carry different suffixes.
        credential: service-account key path; the ``GOOGLE_APPLICATION_CREDENTIALS``
            environment variable takes precedence over the legacy default below.
    """

    # Legacy machine-specific default, kept only for backward compatibility.
    # NOTE(review): set GOOGLE_APPLICATION_CREDENTIALS instead of relying on this path.
    DEFAULT_CREDENTIAL = "C:/Users/Admin/Downloads/llmops-460406-f379299f4261.json"

    def __init__(self, root_folder="artifacts", create_dirs=True):
        self.root_folder = root_folder
        self.data_version_dir = os.path.join(self.root_folder, "data_version")
        self.data_ingestion_dir = os.path.join(self.root_folder, "data_ingestion")
        self.model_version_dir = os.path.join(self.root_folder, "model_version")
        self.evaluation_dir = os.path.join(self.root_folder, "evaluation")
        if create_dirs:
            # data_ingestion_dir is an input location, so it is deliberately not created.
            for output_dir in (self.data_version_dir, self.model_version_dir, self.evaluation_dir):
                os.makedirs(output_dir, exist_ok=True)
        self.datetime_suffix = datetime.now().strftime('%Y%m%dT%H%M%S')
        self.credential = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", self.DEFAULT_CREDENTIAL)

class gcpConfig:
    """Google Cloud Storage settings used by the upload stage.

    Args:
        bucket: name of the target GCS bucket.
        workers: maximum number of parallel upload workers.

    Attributes:
        credential: service-account key path; the ``GOOGLE_APPLICATION_CREDENTIALS``
            environment variable takes precedence over the legacy default below.
    """

    # Legacy machine-specific default, kept only for backward compatibility.
    # NOTE(review): set GOOGLE_APPLICATION_CREDENTIALS instead of committing this path.
    DEFAULT_CREDENTIAL = "C:/Users/Admin/Downloads/llmops-460406-f379299f4261.json"

    def __init__(self, bucket="churn_data_version", workers=8):
        self.credential = os.environ.get("GOOGLE_APPLICATION_CREDENTIALS", self.DEFAULT_CREDENTIAL)
        self.bucket = bucket
        self.workers = workers
class SupportModel:
    """Stateless helpers used by the feature-engineering step."""

    @staticmethod
    def most_common(lst):
        """Return the most frequent element of ``lst`` or None when it is empty.

        Ties go to the element encountered first (``Counter.most_common`` ordering).
        """
        counts = Counter(lst)
        if not counts:
            return None
        return counts.most_common(1)[0][0]

    @staticmethod
    def get_dummies(df):
        """Encode the object-dtype columns of ``df`` and return a new frame.

        * A column whose non-null values are *all* in {'yes', 'no', 'True', 'False'}
          is mapped to 1/0 (nulls stay NaN).
        * Every other object column is one-hot encoded with ``pd.get_dummies``
          (dummy columns are prefixed with the original column name).

        The input frame is not modified.
        """
        boolean_map = {'yes': 1, 'True': 1, 'no': 0, 'False': 0}
        encoded = df.copy()
        for col in df.select_dtypes(include=['object']).columns:
            non_null = encoded[col].dropna()
            # Require every value to be boolean-like; mapping a partially boolean
            # column would silently turn the remaining values into NaN.
            if len(non_null) > 0 and non_null.isin(list(boolean_map)).all():
                encoded[col] = encoded[col].map(boolean_map)
            else:
                encoded = pd.get_dummies(encoded, columns=[col])
        return encoded
class ModelConfiguration:
    """Holds the split settings and the base RandomForestClassifier hyper-parameters.

    Every setting is exposed as a plain instance attribute: ``random_state``,
    ``test_size``, ``n_estimators``, ``criterion``, ``max_depth``, ``max_features``
    and ``min_samples_leaf``.
    """

    def __init__(self):
        defaults = {
            # Reproducibility / data-split settings.
            "random_state": 42,
            "test_size": 0.2,
            # RandomForestClassifier keyword arguments.
            "n_estimators": 500,
            "criterion": "entropy",
            "max_depth": 39,
            "max_features": "log2",
            "min_samples_leaf": 2,
        }
        for attr_name, attr_value in defaults.items():
            setattr(self, attr_name, attr_value)


## **DATA PREPROCESSING PIPELINE**

In [3]:
def get_dataset():
    """Load raw transactions, version them, and return an encoded per-customer feature table.

    Steps:
        1. Read ``<data_ingestion_dir>/input_raw.csv`` and save an untouched copy as
           ``input_raw_data_version_<suffix>.csv`` in ``data_version_dir``.
        2. Clean the transactions (``_clean_transactions``).
        3. Aggregate to one row per customer and derive recency/frequency/spend
           features (``_engineer_customer_features``).
        4. Save the un-encoded features as ``model_data_version_<suffix>.csv``.
        5. Encode categorical columns with ``SupportModel.get_dummies``.

    Returns:
        pandas.DataFrame: encoded feature table, including the ``Churn`` column.

    Raises:
        KeyError: if neither ``Price`` nor ``Product Price`` is present, or if another
            column used by the aggregation is missing.
    """
    version_config = VersionConfiguration()
    data_version_dir = Path(version_config.data_version_dir)
    # to_csv does not create missing parent directories.
    data_version_dir.mkdir(parents=True, exist_ok=True)

    df_input = pd.read_csv(Path(version_config.data_ingestion_dir) / "input_raw.csv")
    print(f"Loaded dataset with {len(df_input)} rows")
    print(f"Columns found: {list(df_input.columns)}")
    input_data_versioned_path = data_version_dir / f"input_raw_data_version_{version_config.datetime_suffix}.csv"
    df_input.to_csv(input_data_versioned_path, index=False)
    print(f"Created versioned input data file: {input_data_versioned_path}")

    df_transactions = _clean_transactions(df_input)
    df_features = _engineer_customer_features(df_transactions)

    print(f"processed dataset with {len(df_features)} rows")
    print(f"After feature engineering columns: {list(df_features.columns)}")
    print(f"Data shape after feature engineering: {df_features.shape}")
    print(f"First few rows of feature engineered data: \n{df_features.head()}")
    model_data_versioned_path = data_version_dir / f"model_data_version_{version_config.datetime_suffix}.csv"
    df_features.to_csv(model_data_versioned_path, index=False)
    print(f"Created versioned model data file: {model_data_versioned_path}")

    df_encode = SupportModel.get_dummies(df_features)
    print("Successfully processed data")
    return df_encode


def _clean_transactions(df_raw):
    """Return a cleaned copy of the raw transactions with parsed dates and a ``TotalSpent`` column."""
    df_clean = df_raw.copy()
    df_clean.columns = df_clean.columns.map(lambda name: str(name).strip())
    cols_to_drop = {"Returns", "Age", "Total Purchase Amount"}
    # Dropping these first means NaNs in them do not cause rows to be discarded by dropna().
    df_clean = df_clean.drop(columns=[col for col in cols_to_drop if col in df_clean.columns]).dropna()

    if 'Price' not in df_clean.columns:
        if 'Product Price' not in df_clean.columns:
            raise KeyError("Required column 'Price' or 'Product Price' is missing from the dataset.")
        df_clean['Price'] = df_clean['Product Price']

    # Parse before aggregating so the per-customer "max" is chronological; a string
    # max is only correct for ISO-formatted dates.
    df_clean['Purchase Date'] = pd.to_datetime(df_clean['Purchase Date'])
    df_clean['TotalSpent'] = df_clean['Quantity'] * df_clean['Price']
    return df_clean


def _engineer_customer_features(df_transactions):
    """Aggregate transactions to one row per customer and add recency/spend features."""
    df_features = df_transactions.groupby("customer_id", as_index=False, sort=False).agg(
        LastPurchaseDate=("Purchase Date", "max"),
        Favoured_Product_Categories=("Product Category", lambda x: SupportModel.most_common(list(x))),
        Frequency=("Purchase Date", "count"),
        TotalSpent=("TotalSpent", "sum"),
        Favoured_Payment_Methods=("Payment Method", lambda x: SupportModel.most_common(list(x))),
        Customer_Name=("Customer Name", "first"),
        Customer_Label=("Customer_Labels", "first"),
        Churn=("Churn", "first"),
    )

    # NOTE(review): this removes distinct customer_ids that merely share a name;
    # confirm this is intended rather than de-duplicating on customer_id.
    df_features = df_features.drop_duplicates(subset=['Customer_Name'], keep='first')

    # Truncate timestamps to midnight so Recency counts whole calendar days.
    last_purchase_day = df_features['LastPurchaseDate'].dt.normalize()
    recency = (last_purchase_day.max() - last_purchase_day).dt.days
    # A groupby count is never 0; replace() only guards the divisions below.
    frequency = df_features['Frequency'].replace(0, 1)
    df_features = df_features.assign(
        Recency=recency,
        Avg_Spend_Per_Purchase=df_features['TotalSpent'] / frequency,
        Purchase_Consistency=recency / frequency,
    )
    return df_features.drop(columns=["customer_id", "LastPurchaseDate", "Customer_Name"])


## **SPLIT DATA**

In [4]:
def split_data(df):
    """Split the encoded feature table into train/test sets and save them as versioned CSVs.

    The hold-out split is made first, stratified on ``Churn`` when every class has at
    least two rows. Only afterwards is SMOTEENN applied, and only to the training
    portion, when the minority class share is below 40%. The test set therefore
    contains only real, unresampled customers.

    Args:
        df: encoded feature table containing a ``Churn`` target column (not modified).

    Returns:
        tuple[str, str, str, str]: paths of the train-feature, test-feature,
        train-target and test-target CSV files, in that order.
    """
    version_config = VersionConfiguration()
    model_config = ModelConfiguration()
    # to_csv does not create missing parent directories.
    os.makedirs(version_config.data_version_dir, exist_ok=True)

    X = df.drop('Churn', axis=1)
    y = df['Churn']
    print(f"X shape (features): {X.shape}")
    print(f"y shape (target): {y.shape}")

    n_missing_target = int(y.isna().sum())
    print(f"NaN count in target: {n_missing_target}")
    if n_missing_target > 0:
        print(f"Found {n_missing_target} NaN values in target variable, filling with 0")
        y = y.fillna(0)

    nan_cols = X.columns[X.isna().any()].tolist()
    if nan_cols:
        print(f"Found NaN values in feature columns: {nan_cols}")
        X = X.fillna(0)

    class_counts = y.value_counts()
    print(f"Target variable distribution (normalized): \n{y.value_counts(normalize=True)}")

    print("Splitting data into train and test sets")
    # Stratify so train and test keep the real class ratio; train_test_split raises
    # when a class has fewer than two rows, so fall back to a plain split then.
    stratify = y if len(class_counts) > 1 and class_counts.min() >= 2 else None
    X_train, X_test, y_train, y_test = train_test_split(
        X,
        y,
        test_size=model_config.test_size,
        random_state=model_config.random_state,
        stratify=stratify,
    )

    imbalance_threshold = 0.4
    if y_train.value_counts(normalize=True).min() < imbalance_threshold:
        # Resample the training set only: resampling before the split would put
        # synthetic / ENN-filtered rows into the test set and inflate every metric.
        print("Target variable is imbalanced. Applying SMOTEENN to the training set...")
        smote = SMOTEENN(random_state=model_config.random_state)
        X_train, y_train = smote.fit_resample(X_train, y_train)
        print(f"Resampled training feature matrix shape: {X_train.shape}")
        print(f"Resampled training target distribution: \n{pd.Series(np.asarray(y_train)).value_counts()}")
    else:
        print("Target variable is balanced. Skipping SMOTEENN.")
    print(f"Train data: {X_train.shape}, Test data: {X_test.shape}")

    # Normalise container types so every CSV has a header and the target column is 'Churn'.
    X_train = pd.DataFrame(X_train, columns=X.columns)
    X_test = pd.DataFrame(X_test, columns=X.columns)
    y_train = pd.DataFrame({'Churn': np.asarray(y_train)})
    y_test = pd.DataFrame({'Churn': np.asarray(y_test)})

    print("Final DataFrame shapes:")
    print(f"X_train: {X_train.shape}, type: {type(X_train)}")
    print(f"X_test: {X_test.shape}, type: {type(X_test)}")
    print(f"y_train: {y_train.shape}, type: {type(y_train)}")
    print(f"y_test: {y_test.shape}, type: {type(y_test)}")

    suffix = version_config.datetime_suffix
    train_feature_path = os.path.join(version_config.data_version_dir, f"train_feature_version_{suffix}.csv")
    test_feature_path = os.path.join(version_config.data_version_dir, f"test_feature_version_{suffix}.csv")
    train_target_path = os.path.join(version_config.data_version_dir, f"train_target_version_{suffix}.csv")
    test_target_path = os.path.join(version_config.data_version_dir, f"test_target_version_{suffix}.csv")

    X_train.to_csv(train_feature_path, index=False)
    X_test.to_csv(test_feature_path, index=False)
    y_train.to_csv(train_target_path, index=False)
    y_test.to_csv(test_target_path, index=False)

    print("Data successfully saved to CSV files")
    return train_feature_path, test_feature_path, train_target_path, test_target_path

## **PREPARE BASE MODEL AND SCALED FEATURES**

In [5]:
def prepare_base_model(X_train_path, X_test_path):
    """Save an unfitted RandomForest and RobustScaler-scaled copies of the train/test features.

    The scaler is fitted on the training features only and then applied to the test
    features, so test-set statistics do not influence the scaling.

    Args:
        X_train_path: path to the training-feature CSV (header row, no index column).
        X_test_path: path to the test-feature CSV with the same columns.

    Returns:
        tuple: ``(base_model_version_path, scaled_test_data_path, scaled_train_data_path)``.
        Note that the test path comes before the train path.

    Side effects:
        Writes ``base_model_version_<suffix>.pkl`` and ``scaler_churn_version_<suffix>.pkl``
        to ``model_version_dir``, and the two scaled CSVs to ``data_version_dir``.
    """
    model_config = ModelConfiguration()
    version_config = VersionConfiguration()
    suffix = version_config.datetime_suffix
    # joblib.dump and to_csv fail when the parent directory does not exist.
    os.makedirs(version_config.model_version_dir, exist_ok=True)
    os.makedirs(version_config.data_version_dir, exist_ok=True)

    model = RandomForestClassifier(
        n_estimators=model_config.n_estimators,
        random_state=model_config.random_state,
        criterion=model_config.criterion,
        max_depth=model_config.max_depth,
        max_features=model_config.max_features,
        min_samples_leaf=model_config.min_samples_leaf,
    )
    base_model_version_path = Path(version_config.model_version_dir) / f"base_model_version_{suffix}.pkl"
    jb.dump(model, base_model_version_path)
    print("Created versioned base model file:", base_model_version_path)

    X_train = pd.read_csv(X_train_path)
    X_test = pd.read_csv(X_test_path)

    scaler = RobustScaler()
    X_train_scaled_df = pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns)
    X_test_scaled_df = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns)

    scaler_path = os.path.join(version_config.model_version_dir, f"scaler_churn_version_{suffix}.pkl")
    scaled_train_data_path = os.path.join(version_config.data_version_dir, f"train_feature_scaled_version_{suffix}.csv")
    scaled_test_data_path = os.path.join(version_config.data_version_dir, f"test_feature_scaled_version_{suffix}.csv")
    jb.dump(scaler, scaler_path)
    X_train_scaled_df.to_csv(scaled_train_data_path, index=False)
    X_test_scaled_df.to_csv(scaled_test_data_path, index=False)

    print("Scalers and scaled data saved:")
    print(f"  Scaler: {scaler_path}")
    print(f"  X_train_scaled: {scaled_train_data_path}")
    print(f"  X_test_scaled: {scaled_test_data_path}")
    return base_model_version_path, scaled_test_data_path, scaled_train_data_path

## **TRAINING PIPELINE**

In [6]:
def trainning(base_model_path, scaled_train_path, scaled_test_path, y_train_path, y_test_path):
    """Fit the base model; if its test accuracy is below 0.85, tune it with RandomizedSearchCV.

    Args:
        base_model_path: joblib file holding the unfitted base classifier.
        scaled_train_path: CSV of scaled training features.
        scaled_test_path: CSV of scaled test features.
        y_train_path: single-column CSV of training labels.
        y_test_path: single-column CSV of test labels.

    Returns:
        tuple: ``(best_model, X_test_scaled, y_test)``. ``best_model`` is fitted,
        ``X_test_scaled`` is a DataFrame, and ``y_test`` is the single-column
        DataFrame read from ``y_test_path``.

    Side effects:
        Saves the final fitted model into ``model_version_dir``, either as
        ``fine_tuned_model_version_<suffix>.pkl`` (after tuning) or as
        ``trained_model_version_<suffix>.pkl`` (no tuning needed).
    """
    version_config = VersionConfiguration()
    os.makedirs(version_config.model_version_dir, exist_ok=True)

    model = jb.load(base_model_path)
    X_train_scaled = pd.read_csv(scaled_train_path)
    X_test_scaled = pd.read_csv(scaled_test_path)
    y_train = pd.read_csv(y_train_path)
    y_test = pd.read_csv(y_test_path)
    # scikit-learn expects a 1-D target; a (n, 1) frame only works via an implicit
    # ravel and raises a DataConversionWarning.
    y_train_1d = y_train.iloc[:, 0]
    y_test_1d = y_test.iloc[:, 0]

    model.fit(X_train_scaled, y_train_1d)
    accuracy = accuracy_score(y_test_1d, model.predict(X_test_scaled))
    print(f"Initial model accuracy: {accuracy:.4f}")

    # NOTE(review): this gate looks at test-set accuracy, so the test set influences
    # model selection; consider a validation split or cross-validation instead.
    if accuracy < 0.85:
        print("Trigger fine-tuning!")
        rf_params = {
            'n_estimators': [100, 200, 300, 400, 500, 700, 1000],
            'criterion': ['gini', 'entropy', 'log_loss'],
            'max_depth': [None, 10, 20, 30, 50, 70],
            'min_samples_split': [2, 5, 10, 15],
            'min_samples_leaf': [1, 2, 4, 6],
            'max_features': ['sqrt', 'log2', None],
            'bootstrap': [True, False],
            'class_weight': [None, 'balanced', 'balanced_subsample'],
        }
        random_search = RandomizedSearchCV(model, rf_params, cv=5, n_jobs=1, n_iter=20, random_state=42, scoring='accuracy')
        random_search.fit(X_train_scaled, y_train_1d)
        # With refit=True (the default) the best candidate is already refitted on the
        # whole training set, keeping the base model's other settings (incl. random_state).
        best_model = random_search.best_estimator_
        print(f"Best parameters found: {random_search.best_params_}")
        print(f"Best cross-validation score: {random_search.best_score_:.4f}")

        best_accuracy = accuracy_score(y_test_1d, best_model.predict(X_test_scaled))
        print(f"Best model accuracy on test set: {best_accuracy:.4f}")
        model_version_name = f"fine_tuned_model_version_{version_config.datetime_suffix}.pkl"
    else:
        print("No fine-tuning needed!")
        best_model = model
        model_version_name = f"trained_model_version_{version_config.datetime_suffix}.pkl"

    # Persist the fitted model in both branches; the base-model file is unfitted.
    best_model_version_path = Path(version_config.model_version_dir) / model_version_name
    jb.dump(best_model, best_model_version_path)
    print(f"Best model saved to: {best_model_version_path}")

    return best_model, X_test_scaled, y_test


## **EVALUATING PIPELINE**

In [7]:
def evaluating(model, X_test_scaled, y_test):
    """Score a fitted binary classifier on the test set and save metrics and curves.

    Args:
        model: fitted classifier exposing ``predict`` and ``predict_proba``.
        X_test_scaled: test features laid out as the model was fitted on.
        y_test: true test labels (Series, 1-D array or single-column DataFrame).

    Returns:
        tuple: ``(pr_path, roc_path, metrics)``. The first two are paths of the saved
        precision-recall and ROC PNGs. ``metrics`` is the dict written to
        ``metrics_<suffix>.json``: scalar scores plus ``classification_report``.
    """
    version_config = VersionConfiguration()
    evaluation_dir = version_config.evaluation_dir
    suffix = version_config.datetime_suffix
    # open()/savefig do not create missing parent directories.
    os.makedirs(evaluation_dir, exist_ok=True)

    # Flatten a single-column frame so the metrics receive a plain label vector.
    y_true = np.asarray(y_test).ravel()
    y_pred = model.predict(X_test_scaled)
    # Column 1 is the probability of model.classes_[1] (the positive class for 0/1 labels).
    y_pred_prob = model.predict_proba(X_test_scaled)[:, 1]

    avg_precision = float(average_precision_score(y_true, y_pred_prob))
    roc_auc = float(roc_auc_score(y_true, y_pred_prob))
    metrics = {
        "precision": float(precision_score(y_true, y_pred)),
        "recall": float(recall_score(y_true, y_pred)),
        "f1_score": float(f1_score(y_true, y_pred)),
        "roc_auc": roc_auc,
        "mcc": float(matthews_corrcoef(y_true, y_pred)),
        "avg_precision": avg_precision,
    }
    metrics["classification_report"] = classification_report(y_true, y_pred, output_dict=True)

    metrics_file_versioned = Path(evaluation_dir) / f"metrics_{suffix}.json"
    with open(metrics_file_versioned, "w") as f:
        json.dump(metrics, f, indent=2)
    print(f"Metrics saved to: {metrics_file_versioned}")

    precision_vals, recall_vals, _ = precision_recall_curve(y_true, y_pred_prob)
    fig, ax = plt.subplots(figsize=(8, 6))
    ax.plot(recall_vals, precision_vals, color='purple', lw=2,
            label=f'Precision-Recall curve (AP = {avg_precision:.3f})')
    ax.set_xlabel('Recall')
    ax.set_ylabel('Precision')
    ax.set_title('Precision-Recall Curve')
    ax.legend(loc='lower left')
    pr_path = os.path.join(evaluation_dir, f"precision_recall_curve_{suffix}.png")
    fig.savefig(pr_path)
    plt.close(fig)
    print(f"Precision-Recall curve saved to: {pr_path}")

    print("Creating ROC curve plot")
    fpr, tpr, _ = roc_curve(y_true, y_pred_prob)
    fig, ax = plt.subplots(figsize=(8, 6))
    # For binary labels roc_auc_score is the area under this same curve, so reuse it.
    ax.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (AUC = {roc_auc:.3f})')
    ax.plot([0, 1], [0, 1], color='gray', linestyle='--')
    ax.set_xlim([0.0, 1.0])
    ax.set_ylim([0.0, 1.05])
    ax.set_xlabel('False Positive Rate')
    ax.set_ylabel('True Positive Rate')
    ax.set_title('Receiver Operating Characteristic (ROC) Curve')
    ax.legend(loc="lower right")
    roc_path = os.path.join(evaluation_dir, f"roc_curve_{suffix}.png")
    fig.savefig(roc_path)
    plt.close(fig)
    print(f"ROC curve saved to: {roc_path}")

    return pr_path, roc_path, metrics

## **CLOUD STORAGE UPLOAD PIPELINE**

In [8]:
def upload_to_GCP(filenames=None):
    """Upload versioned data and evaluation files to the configured GCS bucket.

    Args:
        filenames: optional file name, path, or iterable of them. Each entry may be
            relative to ``data_version_dir`` / ``evaluation_dir`` (searched in that
            order) or a path that already points inside one of those directories,
            such as the paths returned by ``evaluating``. When omitted, every file
            under both directories (recursively) is uploaded.

    Returns:
        tuple[int, int]: ``(successful_uploads, failed_uploads)``.

    Raises:
        Exception: any error outside the per-batch upload (configuration, import,
            client creation) is printed and re-raised.
    """
    try:
        # Imported locally: the Kubeflow cell later runs `from kfp.client import Client`,
        # which rebinds the notebook-global `Client` to the KFP client.
        from google.cloud.storage import Client as StorageClient
        from google.cloud.storage import transfer_manager as gcs_transfer_manager

        version_config = VersionConfiguration()
        gcp_config = gcpConfig()
        source_dirs = [version_config.data_version_dir, version_config.evaluation_dir]

        # Respect credentials the caller already configured; otherwise use the config value.
        if gcp_config.credential:
            os.environ.setdefault("GOOGLE_APPLICATION_CREDENTIALS", gcp_config.credential)

        if filenames:
            files_to_upload = _resolve_requested_files(filenames, source_dirs)
        else:
            files_to_upload = _collect_all_files(source_dirs)

        if not files_to_upload:
            print("No files found to upload")
            return 0, 0
        print(f"Found {len(files_to_upload)} files to upload")

        storage_client = StorageClient()
        bucket = storage_client.bucket(gcp_config.bucket)

        # upload_many_from_filenames takes a single source_directory per call.
        uploads_by_source = {}
        for rel_path, source_dir in files_to_upload:
            uploads_by_source.setdefault(source_dir, []).append(rel_path)

        total_success = 0
        total_errors = 0
        for source_dir, file_list in uploads_by_source.items():
            print(f"Uploading {len(file_list)} files from {source_dir}")
            try:
                results = gcs_transfer_manager.upload_many_from_filenames(
                    bucket,
                    file_list,
                    source_directory=source_dir,
                    max_workers=gcp_config.workers,
                    blob_name_prefix="churn_data_store/",
                )
            except Exception as e:
                print(f"Failed to upload batch from {source_dir}: {e}")
                total_errors += len(file_list)
                continue

            # Results come back in input order; failures are returned as exception objects.
            for name, result in zip(file_list, results):
                if isinstance(result, Exception):
                    print(f"Failed to upload {name}: {result}")
                    total_errors += 1
                else:
                    print(f"Uploaded {name} to bucket {bucket.name}")
                    total_success += 1

        print(f"Upload completed: {total_success} successful, {total_errors} failed")
        return total_success, total_errors

    except Exception as e:
        print(f"Cloud storage upload failed: {e}")
        raise


def _collect_all_files(source_dirs):
    """List every regular file under each existing directory as ``(posix_relative_path, source_dir)``."""
    found = []
    for source_dir in source_dirs:
        if not os.path.exists(source_dir):
            print(f"Directory does not exist: {source_dir}")
            continue
        print(f"Scanning directory: {source_dir}")
        for file_path in glob.glob(os.path.join(source_dir, "**", "*"), recursive=True):
            if os.path.isfile(file_path):
                # POSIX separators keep blob names consistent on Windows too.
                found.append((Path(os.path.relpath(file_path, source_dir)).as_posix(), source_dir))
    return found


def _resolve_requested_files(filenames, source_dirs):
    """Map requested names/paths to ``(posix_relative_path, source_dir)`` pairs, reporting misses."""
    if isinstance(filenames, (Path, str)):
        filenames = [filenames]
    resolved = []
    for filename in map(str, filenames):
        match = _locate_in_source_dirs(filename, source_dirs)
        if match is None:
            print(f"File does not exist in any source directory: {filename}")
        else:
            resolved.append(match)
    return resolved


def _locate_in_source_dirs(filename, source_dirs):
    """Return ``(posix_relative_path, source_dir)`` for the first source dir containing ``filename``.

    ``filename`` is tried both relative to each directory and as given, so paths such as
    ``artifacts/evaluation/roc_curve_<suffix>.png`` resolve as well. Anything that lands
    outside every source directory is rejected, which keeps ``..`` out of blob names.
    """
    for source_dir in source_dirs:
        root = os.path.abspath(source_dir)
        for candidate in (os.path.join(source_dir, filename), filename):
            if not os.path.isfile(candidate):
                continue
            try:
                rel_path = os.path.relpath(os.path.abspath(candidate), root)
            except ValueError:  # e.g. a different drive on Windows
                continue
            if rel_path == os.pardir or rel_path.startswith(os.pardir + os.sep):
                continue
            return Path(rel_path).as_posix(), source_dir
    return None

## **CLEANUP PIPELINE**

In [9]:
def cleanup_temp_files():
    """Delete the timestamp-versioned files a pipeline run leaves behind.

    Removes ``*_version_<YYYYMMDDTHHMMSS>.csv`` from the data-version directory and
    ``*_<YYYYMMDDTHHMMSS>.json`` / ``.png`` from the evaluation directory, then reports
    how many files remain. Individual deletion failures are printed and skipped. Any
    other error is printed rather than raised.
    """

    def _delete_all(paths):
        # Best-effort removal: report each failure and keep going.
        for path in paths:
            try:
                os.remove(path)
                print(f"Deleted temporary file: {os.path.basename(path)}")
            except Exception as e:
                print(f"Failed to delete file {path}: {e}")

    config = VersionConfiguration()
    data_dir = config.data_version_dir
    eval_dir = config.evaluation_dir
    stamp = "????????T??????"  # glob form of the YYYYMMDDTHHMMSS suffix
    try:
        versioned_data = glob.glob(os.path.join(data_dir, f"*_version_{stamp}.csv"))
        print(f"Found {len(versioned_data)} timestamp-versioned data files to clean")
        _delete_all(versioned_data)

        versioned_eval = [
            match
            for extension in ("json", "png")
            for match in glob.glob(os.path.join(eval_dir, f"*_{stamp}.{extension}"))
        ]
        print(f"Found {len(versioned_eval)} timestamp-versioned evaluation files to clean")
        _delete_all(versioned_eval)

        kept_data = glob.glob(os.path.join(data_dir, "*.csv"))
        kept_eval = glob.glob(os.path.join(eval_dir, "*"))
        print(f"Kept {len(kept_data)} essential data files for DVC tracking")
        print(f"Kept {len(kept_eval)} essential evaluation files for DVC tracking")
    except Exception as e:
        print(f"Error during cleanup: {e}")

In [10]:
# try:
#     print(f">>>>>> Pipeline Started <<<<<<")
#     df_encode = get_dataset()
#     print(f">>>>>> stage Data Ingestion Stage completed <<<<<<\n\nx==========x")
#     train_feature_path, test_feature_path, train_target_path, test_target_path = split_data(df_encode)
#     print(f">>>>>> stage Data Split Stage completed <<<<<<\n\nx==========x")
#     base_model_version_path, scaled_test_data_path, scaled_train_data_path = prepare_base_model(train_feature_path, test_feature_path)
#     print(f">>>>>> stage Prepare Base Model Stage completed <<<<<<\n\nx==========x")
#     best_model, X_test_scaled, y_test = trainning(base_model_path=base_model_version_path, scaled_train_path=scaled_train_data_path, scaled_test_path=scaled_test_data_path, y_train_path=train_target_path, y_test_path=test_target_path)
#     print(f">>>>>> stage Model Training Stage completed <<<<<<\n\nx==========x")
#     pr_path, roc_path, metrics = evaluating(best_model, X_test_scaled, y_test)
#     print(f">>>>>> stage Model Evaluation Stage completed <<<<<<\n\nx==========x")
#     upload_to_GCP()
#     print(f">>>>>> stage Cloud Storage Upload Stage completed <<<<<<\n\nx==========x")
#     cleanup_temp_files()
#     print(f">>>>>> Pipeline Completed <<<<<<")
# except Exception as e:
#     print(e)
#     raise e

## **KUBEFLOW PIPELINE**

In [None]:
from kfp import dsl
from kfp.dsl import Input, Output, Model, Dataset, Metrics, Artifact
from kfp import compiler
from kfp.client import Client
from typing import NamedTuple
import os
import time

@dsl.pipeline(
    name='churn-prediction-pipeline-gcs',
    description='A pipeline for churn prediction with GCS data source'
)
def churn_prediction_pipeline():

    @dsl.component(
        base_image='python:3.11.9',
        packages_to_install=[
            'pandas==2.0.3',
            'numpy==1.24.3',
            'scikit-learn==1.3.0',
            'imbalanced-learn==0.11.0',
            'joblib==1.3.2',
            'matplotlib==3.7.2',
        ]
        
    )
    def data_ingestion(processed_data: Output[Dataset]):
        import pandas as pd
        import os
        from datetime import datetime
        from pathlib import Path
        from collections import Counter

        class VersionConfiguration:
            def __init__(self):
                self.root_folder = "/tmp/artifacts"
                self.data_version_dir = os.path.join(self.root_folder, "data_version")
                os.makedirs(self.data_version_dir, exist_ok=True)
                self.datetime_suffix = datetime.now().strftime('%Y%m%dT%H%M%S')



        class SupportModel:
            @staticmethod
            def most_common(lst):
                counts = Counter(lst)
                if not counts:
                    return None 
                return counts.most_common(1)[0][0]
            
            @staticmethod
            def get_dummies(df):
                categorical_cols = df.select_dtypes(include=['object']).columns
                if len(categorical_cols) > 0:
                    for col in categorical_cols:
                        if df[col].isin(['yes', 'no', 'True', 'False']).any():
                            df[col] = df[col].map({'yes': 1, 'True': 1, 'no': 0, 'False': 0})
                        else:
                            df = pd.get_dummies(df, columns=[col], prefix=col)
                return df

        def get_dataset():
            version_config = VersionConfiguration()

            try:

                df_input = pd.read_csv("https://raw.githubusercontent.com/Teungtran/churn_mlops/main/artifacts/data_ingestion/input_raw.csv")
                print(f"Loaded dataset with {len(df_input)} rows and columns {list(df_input.columns)}")

            except Exception as e:
                print(f"Error loading data from github: {str(e)}")
                raise
            
            if df_input is None or df_input.empty:
                raise ValueError("Failed to load data from github or data is empty")
            
            input_data_versioned_name = f"input_raw_data_version_{version_config.datetime_suffix}.csv"
            input_data_versioned_path = Path(version_config.data_version_dir) / input_data_versioned_name
            df_input.to_csv(input_data_versioned_path, index=False)
            print(f"Created versioned input data file: {input_data_versioned_path}")
            
            df_input.columns = df_input.columns.map(lambda x: str(x).strip())
            
            cols_to_drop = {"Returns", "Age", "Total Purchase Amount"}
            df_input.drop(columns=[col for col in cols_to_drop if col in df_input.columns], inplace=True)
            df_input.dropna(inplace=True)
            
            if 'Price' not in df_input.columns and 'Product Price' in df_input.columns:
                df_input['Price'] = df_input['Product Price']
            
            required_columns = ['customer_id', 'Purchase Date', 'Product Category', 'Quantity', 'Price', 'Payment Method', 'Customer Name', 'Customer_Labels', 'Churn']
            missing_columns = [col for col in required_columns if col not in df_input.columns]
            
            if missing_columns:
                print(f"Warning: Missing required columns: {missing_columns}")
                print(f"Available columns: {list(df_input.columns)}")
                
                # Try to map common column variations
                column_mappings = {
                    'customer_id': ['Customer ID', 'CustomerId', 'ID'],
                    'Purchase Date': ['PurchaseDate', 'Date', 'purchase_date'],
                    'Product Category': ['ProductCategory', 'Category', 'product_category'],
                    'Customer Name': ['CustomerName', 'Name', 'customer_name'],
                    'Customer_Labels': ['CustomerLabels', 'Labels', 'customer_labels'],
                    'Payment Method': ['PaymentMethod', 'payment_method']
                }
                
                for standard_col, alternatives in column_mappings.items():
                    if standard_col not in df_input.columns:
                        for alt in alternatives:
                            if alt in df_input.columns:
                                df_input[standard_col] = df_input[alt]
                                print(f"Mapped {alt} -> {standard_col}")
                                break
            
            if all(col in df_input.columns for col in ['Quantity', 'Price']):
                df_input['TotalSpent'] = df_input['Quantity'] * df_input['Price']
                
                df_features = df_input.groupby("customer_id", as_index=False, sort=False).agg(
                    LastPurchaseDate=("Purchase Date", "max"),
                    Favoured_Product_Categories=("Product Category", lambda x: SupportModel.most_common(list(x))),
                    Frequency=("Purchase Date", "count"),
                    TotalSpent=("TotalSpent", "sum"),
                    Favoured_Payment_Methods=("Payment Method", lambda x: SupportModel.most_common(list(x))),
                    Customer_Name=("Customer Name", "first"),
                    Customer_Label=("Customer_Labels", "first"),
                    Churn=("Churn", "first"),
                )

                df_features = df_features.drop_duplicates(subset=['Customer_Name'], keep='first')
                df_features['LastPurchaseDate'] = pd.to_datetime(df_features['LastPurchaseDate'])
                max_LastBuyingDate = df_features["LastPurchaseDate"].max()
                df_features['Recency'] = (max_LastBuyingDate - df_features['LastPurchaseDate']).dt.days
                df_features['Avg_Spend_Per_Purchase'] = df_features['TotalSpent'] / df_features['Frequency'].replace(0, 1)
                df_features['Purchase_Consistency'] = df_features['Recency'] / df_features['Frequency'].replace(0, 1)
                
                # Drop unnecessary columns
                df_features.drop(columns=["customer_id", "LastPurchaseDate", 'Customer_Name'], inplace=True)
                df_features.dropna(inplace=True)
                
                print(f"Processed dataset with {len(df_features)} rows")
                print(f"After feature engineering columns: {list(df_features.columns)}")
                
                # Encode categorical variables
                df_encode = SupportModel.get_dummies(df_features)
            else:
                print("Warning: Required columns for feature engineering not found. Using original data.")
                df_encode = SupportModel.get_dummies(df_input)
            
            # Save processed data
            df_encode.to_csv(processed_data.path, index=False)
            print(f"Processed data saved to: {processed_data.path}")
            print(f"Final dataset shape: {df_encode.shape}")
            print(f"Final columns: {list(df_encode.columns)}")
            
            return df_encode

        get_dataset()

    @dsl.component(
        base_image='python:3.11.9',
        packages_to_install=[
            'pandas==2.0.3',
            'numpy==1.24.3',
            'scikit-learn==1.3.0',
            'imbalanced-learn==0.11.0'
        ]
    )
    def data_split(
        processed_data: Input[Dataset],
        X_train: Output[Dataset],
        X_test: Output[Dataset],
        y_train: Output[Dataset],
        y_test: Output[Dataset]
    ):
        """Split the processed dataset into train/test sets, rebalancing only the train set.

        The target is the ``Churn`` column. If it is absent, the first column whose
        name contains "churn" or "target" is used, and every such label-like column
        is removed from the features. The split happens *before* SMOTEENN, so
        synthetic or under-sampled rows never reach the held-out test set.

        Args:
            processed_data: CSV produced by the ingestion step.
            X_train: Output CSV of training features.
            X_test: Output CSV of test features.
            y_train: Output CSV with a single ``Churn`` column (training labels).
            y_test: Output CSV with a single ``Churn`` column (test labels).

        Raises:
            ValueError: If no suitable target column can be found.
        """
        import numpy as np
        import pandas as pd
        from sklearn.model_selection import train_test_split
        from imblearn.combine import SMOTEENN
        from collections import Counter

        def split_data():
            # Load processed data
            df_model = pd.read_csv(processed_data.path)
            print(f"Loaded data with shape: {df_model.shape}")
            print(f"Columns: {list(df_model.columns)}")

            # Resolve the target column
            if 'Churn' in df_model.columns:
                target_col = 'Churn'
                label_like_cols = ['Churn']
            else:
                print("Warning: 'Churn' column not found. Available columns:")
                print(list(df_model.columns))
                label_like_cols = [col for col in df_model.columns if 'churn' in col.lower() or 'target' in col.lower()]
                if not label_like_cols:
                    raise ValueError("No suitable target column found")
                target_col = label_like_cols[0]
                print(f"Using '{target_col}' as target column")

            # Remove every label-like column from the features: leaving the original
            # target (or one-hot siblings such as Churn_Yes/Churn_No) in X leaks the label.
            X = df_model.drop(columns=label_like_cols)
            y = df_model[target_col].rename('Churn')

            print(f"X shape (features): {X.shape}")
            print(f"y shape (target): {y.shape}")
            print(f"Target distribution: \n{y.value_counts(normalize=True)}")

            # Rows without a label cannot be used for supervised learning; imputing a
            # class would fabricate ground truth, so drop them instead.
            missing_labels = int(y.isna().sum())
            if missing_labels > 0:
                print(f"Found {missing_labels} NaN values in target variable, dropping those rows")
                has_label = y.notna()
                X, y = X.loc[has_label], y.loc[has_label]

            if X.isna().any().any():
                print("Found NaN values in features, filling with 0")
                X = X.fillna(0)

            # Split FIRST so the test set only contains real, untouched rows.
            X_train_data, X_test_data, y_train_data, y_test_data = train_test_split(
                X, y, test_size=0.2, random_state=42,
                stratify=y if y.nunique() > 1 else None
            )
            print(f"Train data: {X_train_data.shape}, Test data: {X_test_data.shape}")

            # Rebalance the training portion only.
            class_distribution = y_train_data.value_counts(normalize=True)
            imbalance_threshold = 0.4

            if len(class_distribution) > 1 and class_distribution.min() < imbalance_threshold:
                print("Target variable is imbalanced. Applying SMOTEENN to the training set...")
                try:
                    smote = SMOTEENN(random_state=42)
                    # Tuple assignment only happens if fit_resample succeeds, so the
                    # original training data survives a failure.
                    X_train_data, y_train_data = smote.fit_resample(X_train_data, y_train_data)
                    print(f"Resampled training matrix shape: {X_train_data.shape}")
                    print(f"Resampled training target distribution: \n{Counter(y_train_data)}")
                except Exception as e:
                    print(f"SMOTEENN failed: {e}. Using original training data.")
            else:
                print("Target variable is balanced or single class. Skipping SMOTEENN.")

            # Normalise outputs to DataFrames (fit_resample may return arrays).
            columns = X.columns
            X_train_df = pd.DataFrame(X_train_data, columns=columns)
            X_test_df = pd.DataFrame(X_test_data, columns=columns)
            y_train_df = pd.DataFrame({'Churn': np.asarray(y_train_data)})
            y_test_df = pd.DataFrame({'Churn': np.asarray(y_test_data)})

            # Save split data
            X_train_df.to_csv(X_train.path, index=False)
            X_test_df.to_csv(X_test.path, index=False)
            y_train_df.to_csv(y_train.path, index=False)
            y_test_df.to_csv(y_test.path, index=False)

            print("Data split completed and saved")

        split_data()

    @dsl.component(
        base_image='python:3.11.9',
        packages_to_install=[
            'pandas==2.0.3',
            'numpy==1.24.3',
            'scikit-learn==1.3.0',
            'joblib==1.3.2'
        ]
    )
    def prepare_model(
        X_train: Input[Dataset],
        X_test: Input[Dataset],
        base_model: Output[Model],
        X_train_scaled: Output[Dataset],
        X_test_scaled: Output[Dataset],
        scaler_artifact: Output[Artifact]
    ):
        """Persist an unfitted RandomForest plus RobustScaler-transformed features.

        The scaler is fitted on the training features only and then applied to the
        test features. Both the scaler and the (untrained) classifier are dumped
        with joblib for the downstream training step.

        Args:
            X_train: Training feature CSV.
            X_test: Test feature CSV.
            base_model: Output path for the unfitted RandomForestClassifier.
            X_train_scaled: Output CSV of scaled training features.
            X_test_scaled: Output CSV of scaled test features.
            scaler_artifact: Output path for the fitted RobustScaler.
        """
        import pandas as pd
        import joblib as jb
        from sklearn.preprocessing import RobustScaler
        from sklearn.ensemble import RandomForestClassifier

        rf_hyperparams = dict(
            n_estimators=500,
            random_state=42,
            criterion="entropy",
            max_depth=39,
            max_features="log2",
            min_samples_leaf=2,
        )
        untrained_rf = RandomForestClassifier(**rf_hyperparams)
        jb.dump(untrained_rf, base_model.path)
        print(f"Base model saved to: {base_model.path}")

        train_frame = pd.read_csv(X_train.path)
        test_frame = pd.read_csv(X_test.path)

        # Fit on train only, reuse the same statistics for test.
        robust_scaler = RobustScaler()
        train_matrix = robust_scaler.fit_transform(train_frame)
        test_matrix = robust_scaler.transform(test_frame)

        jb.dump(robust_scaler, scaler_artifact.path)
        print(f"Scaler saved to: {scaler_artifact.path}")

        scaled_outputs = (
            (train_matrix, train_frame, X_train_scaled),
            (test_matrix, test_frame, X_test_scaled),
        )
        for matrix, source_frame, destination in scaled_outputs:
            pd.DataFrame(matrix, columns=source_frame.columns).to_csv(destination.path, index=False)

        print("Model preparation completed")

    @dsl.component(
        base_image='python:3.11.9',
        packages_to_install=[
            'pandas==2.0.3',
            'numpy==1.24.3',
            'scikit-learn==1.3.0',
            'joblib==1.3.2'
        ]
    )
    def model_training(
        base_model: Input[Model],
        X_train_scaled: Input[Dataset],
        X_test_scaled: Input[Dataset],
        y_train: Input[Dataset],
        y_test: Input[Dataset],
        trained_model: Output[Model]
    ):
        """Fit the base model, optionally tune it, and save the trained estimator.

        The unfitted estimator from ``base_model`` is fitted on the scaled training
        data. If its test accuracy is below 0.85, a RandomizedSearchCV over
        RandomForest hyperparameters is run, and the search's refit best estimator
        is saved instead.

        NOTE(review): the tuning decision is based on test-set accuracy, so the
        test set indirectly influences model selection.

        Args:
            base_model: joblib dump of an unfitted estimator.
            X_train_scaled: Scaled training features CSV.
            X_test_scaled: Scaled test features CSV.
            y_train: Training labels CSV with a ``Churn`` column.
            y_test: Test labels CSV with a ``Churn`` column.
            trained_model: Output path for the fitted estimator (joblib).
        """
        import pandas as pd
        import joblib as jb
        from sklearn.metrics import accuracy_score
        from sklearn.model_selection import RandomizedSearchCV

        def train_model():
            # Load model and data
            model = jb.load(base_model.path)
            X_train_data = pd.read_csv(X_train_scaled.path)
            X_test_data = pd.read_csv(X_test_scaled.path)
            y_train_data = pd.read_csv(y_train.path)['Churn'].astype('int32')
            y_test_data = pd.read_csv(y_test.path)['Churn'].astype('int32')

            print(f"Training data shape: {X_train_data.shape}")
            print(f"Test data shape: {X_test_data.shape}")

            # Initial training
            model.fit(X_train_data, y_train_data)
            prediction = model.predict(X_test_data)
            accuracy = accuracy_score(y_test_data, prediction)
            print(f"Initial model accuracy: {accuracy:.4f}")

            # Fine-tuning if accuracy is low
            if accuracy < 0.85:
                print("Triggering fine-tuning...")
                rf_params = {
                    'n_estimators': [100, 200, 300, 500],
                    'criterion': ['gini', 'entropy'],
                    'max_depth': [None, 10, 20, 30],
                    'min_samples_split': [2, 5, 10],
                    'min_samples_leaf': [1, 2, 4],
                    'max_features': ['sqrt', 'log2'],
                    'bootstrap': [True, False]
                }

                random_search = RandomizedSearchCV(
                    model, rf_params, cv=5, n_jobs=-1, n_iter=10,
                    random_state=42, scoring='accuracy'
                )
                random_search.fit(X_train_data, y_train_data)

                # With refit=True (the default), best_estimator_ is already refit on the
                # whole training set with the best parameters. The base model's other
                # params, including random_state=42, are kept. Building a fresh
                # RandomForestClassifier here would leave it unfitted, and predict()
                # would raise NotFittedError.
                best_model = random_search.best_estimator_
                print(f"Best parameters: {random_search.best_params_}")
                print(f"Best CV score: {random_search.best_score_:.4f}")

                # Evaluate best model
                best_prediction = best_model.predict(X_test_data)
                best_accuracy = accuracy_score(y_test_data, best_prediction)
                print(f"Best model test accuracy: {best_accuracy:.4f}")
            else:
                print("No fine-tuning needed!")
                best_model = model

            # Save trained model
            jb.dump(best_model, trained_model.path)
            print(f"Trained model saved to: {trained_model.path}")

        train_model()

    @dsl.component(
        base_image='python:3.11.9',
        packages_to_install=[
            'pandas==2.0.3',
            'numpy==1.24.3',
            'scikit-learn==1.3.0',
            'joblib==1.3.2',
            'matplotlib==3.7.2'
        ]
    )
    def model_evaluation(
        trained_model: Input[Model],
        X_test_scaled: Input[Dataset],
        y_test: Input[Dataset],
        metrics_output: Output[Metrics],
        pr_curve: Output[Artifact],
        roc_curve: Output[Artifact]
    ):
        """Evaluate the trained classifier on the test split and emit metrics and curves.

        Scalar metrics are logged on the KFP ``Metrics`` artifact so they appear in
        the UI. They are also written, together with the text classification report,
        as JSON to ``metrics_output.path``. Precision-recall and ROC plots are saved
        as images.

        Args:
            trained_model: joblib dump of a fitted classifier exposing
                ``predict_proba``. Assumes a binary target (column 1 = positive
                class).
            X_test_scaled: Scaled test features CSV.
            y_test: Test labels CSV with a ``Churn`` column.
            metrics_output: KFP Metrics artifact.
            pr_curve: Output path for the precision-recall plot.
            roc_curve: Output path for the ROC plot. The name shadows sklearn's
                ``roc_curve``, hence the ``roc_curve_func`` alias below.
        """
        import pandas as pd
        import joblib as jb
        import matplotlib.pyplot as plt
        import json
        from sklearn.metrics import (
            accuracy_score, precision_score, recall_score, f1_score,
            classification_report, roc_curve as roc_curve_func, auc,
            precision_recall_curve, average_precision_score,
            matthews_corrcoef, roc_auc_score
        )

        def evaluate_model():
            # Load model and data
            model = jb.load(trained_model.path)
            X_test_data = pd.read_csv(X_test_scaled.path)
            y_test_data = pd.read_csv(y_test.path)['Churn'].astype('int32')

            # Make predictions
            y_pred = model.predict(X_test_data)
            y_pred_prob = model.predict_proba(X_test_data)[:, 1]

            # Calculate metrics
            accuracy = accuracy_score(y_test_data, y_pred)
            precision = precision_score(y_test_data, y_pred, average='weighted')
            recall = recall_score(y_test_data, y_pred, average='weighted')
            f1 = f1_score(y_test_data, y_pred, average='weighted')
            roc_auc = roc_auc_score(y_test_data, y_pred_prob)
            mcc = matthews_corrcoef(y_test_data, y_pred)
            avg_precision = average_precision_score(y_test_data, y_pred_prob)

            scalar_metrics = {
                "accuracy": float(accuracy),
                "precision": float(precision),
                "recall": float(recall),
                "f1_score": float(f1),
                "roc_auc": float(roc_auc),
                "mcc": float(mcc),
                "avg_precision": float(avg_precision)
            }
            report = classification_report(y_test_data, y_pred)

            # Only numeric values go through the ':.4f' format. The report is a
            # string, and formatting it that way raised ValueError.
            print("Model Evaluation Results:")
            for metric, value in scalar_metrics.items():
                print(f"  {metric}: {value:.4f}")
                metrics_output.log_metric(metric, value)
            print(f"Classification report:\n{report}")

            # Save metrics (scalars + text report) as JSON in the artifact file.
            metrics = {**scalar_metrics, 'classification_report': report}
            with open(metrics_output.path, "w") as f:
                json.dump(metrics, f, indent=2)

            # Generate Precision-Recall curve
            precision_vals, recall_vals, _ = precision_recall_curve(y_test_data, y_pred_prob)

            plt.figure(figsize=(8, 6))
            plt.plot(recall_vals, precision_vals, color='purple', lw=2,
                     label=f'PR curve (AP = {avg_precision:.3f})')
            plt.xlabel('Recall')
            plt.ylabel('Precision')
            plt.title('Precision-Recall Curve')
            plt.legend()
            plt.grid(True)
            plt.savefig(pr_curve.path)
            plt.close()

            # Generate ROC curve
            fpr, tpr, _ = roc_curve_func(y_test_data, y_pred_prob)

            plt.figure(figsize=(8, 6))
            plt.plot(fpr, tpr, color='blue', lw=2, label=f'ROC curve (AUC = {roc_auc:.3f})')
            plt.plot([0, 1], [0, 1], color='gray', linestyle='--')
            plt.xlim([0.0, 1.0])
            plt.ylim([0.0, 1.05])
            plt.xlabel('False Positive Rate')
            plt.ylabel('True Positive Rate')
            plt.title('ROC Curve')
            plt.legend()
            plt.grid(True)
            plt.savefig(roc_curve.path)
            plt.close()

            print("Model evaluation completed")

        evaluate_model()

    # Create tasks
    data_ingestion_task = data_ingestion()
    

    data_split_task = data_split(processed_data=data_ingestion_task.outputs['processed_data'])
    
    model_prep_task = prepare_model(
        X_train=data_split_task.outputs['X_train'],
        X_test=data_split_task.outputs['X_test']
    )
    
    training_task = model_training(
        base_model=model_prep_task.outputs['base_model'],
        X_train_scaled=model_prep_task.outputs['X_train_scaled'],
        X_test_scaled=model_prep_task.outputs['X_test_scaled'],
        y_train=data_split_task.outputs['y_train'],
        y_test=data_split_task.outputs['y_test']
    )
    
    evaluation_task = model_evaluation(
        trained_model=training_task.outputs['trained_model'],
        X_test_scaled=model_prep_task.outputs['X_test_scaled'],
        y_test=data_split_task.outputs['y_test']
    )
    data_split_task.after(data_ingestion_task)
    model_prep_task.after(data_split_task)
    training_task.after(model_prep_task)
    evaluation_task.after(training_task)
    
    
    
def compile_pipeline():
    """Compile ``churn_prediction_pipeline`` into a Kubeflow YAML package.

    Compilation errors are reported on stdout rather than raised.

    Returns:
        tuple: ``(True, "churn_pipeline_gcs.yaml")`` on success, ``(False, None)``
        if compilation raised any exception.
    """
    package_path = "churn_pipeline_gcs.yaml"
    try:
        print("Compiling Kubeflow pipeline with GCS support...")
        compiler.Compiler().compile(
            pipeline_func=churn_prediction_pipeline,
            package_path=package_path,
        )
        print(f"Pipeline compiled successfully: {package_path}")
    except Exception as err:
        print(f"Error compiling pipeline: {err}")
        return False, None
    return True, package_path

def deploy_pipeline(
    host='http://localhost:8080',
    experiment_name="churn-prediction-gcs-experiments",
    max_retries=3,
    retry_delay=5,
):
    """Compile the pipeline and submit a run to a Kubeflow Pipelines endpoint.

    Submission is retried with exponential backoff. The delay doubles after each
    failed attempt.

    Args:
        host: KFP API endpoint.
        experiment_name: Experiment the run is filed under. It is created if
            ``get_experiment`` reports it does not exist.
        max_retries: Number of submission attempts.
        retry_delay: Seconds to wait before the first retry.

    Returns:
        bool: True once a run has been created. False if compilation failed or
        every attempt raised.
    """
    import time
    # Import the KFP client explicitly: the notebook also imports
    # google.cloud.storage.Client under the bare name ``Client``, so relying on
    # that global is ambiguous.
    from kfp import Client as KfpClient

    success, pipeline_file = compile_pipeline()
    if not success:
        return False

    for attempt in range(1, max_retries + 1):
        try:
            print(f"Deploying pipeline to Kubeflow (attempt {attempt}/{max_retries})...")

            client = KfpClient(host=host)
            run_name = f"churn-prediction-gcs-{int(time.time())}"

            try:
                client.get_experiment(experiment_name=experiment_name)
            except ValueError:
                # kfp signals "no experiment with this name" via ValueError. Any other
                # error (e.g. HTTP 504) propagates to the retry handler below instead
                # of being swallowed.
                client.create_experiment(name=experiment_name)

            # Submit pipeline run
            run = client.create_run_from_pipeline_package(
                pipeline_file=pipeline_file,
                run_name=run_name,
                experiment_name=experiment_name
            )

            print(f"✅ Pipeline run created successfully: {run.run_id}")
            print(f"📊 Data source: gs://churn_data_version/input_raw.csv")
            print(f"🔗 Monitor progress at: {host}/#/runs/details/{run.run_id}")
            return True

        except Exception as e:
            print(f"Error deploying pipeline (attempt {attempt}): {e}")
            if attempt < max_retries:
                print(f"Retrying in {retry_delay} seconds...")
                time.sleep(retry_delay)
                retry_delay *= 2
            else:
                print("All retry attempts failed")

    return False

def run_kubeflow_pipeline():
    """Entry point: deploy the churn pipeline and print a status banner.

    Returns:
        bool: True when ``deploy_pipeline()`` reports success, False otherwise.
    """
    banner = "=" * 70
    print(banner)
    print("STARTING KUBEFLOW PIPELINE WITH GOOGLE CLOUD STORAGE")
    print(banner)

    deployed = deploy_pipeline()
    if deployed:
        for line in (
            banner,
            "KUBEFLOW PIPELINE DEPLOYMENT COMPLETED",
            "Check Kubeflow UI at http://localhost:8080 for execution status",
            banner,
        ):
            print(line)
        return True

    print("Pipeline deployment failed. Exiting.")
    return False



In [21]:
# Compile the pipeline and submit a run to the Kubeflow endpoint; the cell displays True on success, False otherwise.
run_kubeflow_pipeline()

STARTING KUBEFLOW PIPELINE WITH GOOGLE CLOUD STORAGE
Compiling Kubeflow pipeline with GCS support...
Pipeline compiled successfully: churn_pipeline_gcs.yaml
Deploying pipeline to Kubeflow (attempt 1/3)...
Error deploying pipeline (attempt 1): (504)
Reason: Gateway Timeout
HTTP response headers: HTTPHeaderDict({'X-Powered-By': 'Express', 'Date': 'Thu, 05 Jun 2025 08:44:29 GMT', 'Connection': 'keep-alive', 'Keep-Alive': 'timeout=5', 'Transfer-Encoding': 'chunked'})
HTTP response body: Error occured while trying to proxy to: localhost:8080/apis/v2beta1/experiments?filter=%7B%22predicates%22%3A+%5B%7B%22operation%22%3A+1%2C+%22key%22%3A+%22display_name%22%2C+%22stringValue%22%3A+%22churn-prediction-gcs-experiments%22%7D%5D%7D&namespace=

Retrying in 5 seconds...
Deploying pipeline to Kubeflow (attempt 2/3)...
Error deploying pipeline (attempt 2): (504)
Reason: Gateway Timeout
HTTP response headers: HTTPHeaderDict({'X-Powered-By': 'Express', 'Date': 'Thu, 05 Jun 2025 08:44:51 GMT', 'Connect

False

  import pkg_resources
