In [1]:
import kfp
from kfp import dsl
from kfp.dsl import InputPath, OutputPath, pipeline, component
import logging
import os

# Working directories used by this notebook: "model_tracking" and "logs"
# (the latter must exist before the log file handler below is created).
for _required_dir in ("model_tracking", "logs"):
    os.makedirs(_required_dir, exist_ok=True)

# Send INFO-and-above records to a timestamped log file.
logging.basicConfig(
    format="%(asctime)s - %(levelname)s - %(message)s",
    level=logging.INFO,
    filename="logs/pipeline.log",
)



# DOWNLOAD DATA FROM APIs, SQL and CSV (via Minio)
@component(
    base_image="python:3.11",
    packages_to_install=["pandas==2.2.3", "requests==2.32.3", "minio==7.2.15", "sqlalchemy==2.0.38", "pymysql==1.1.1", "psycopg2==2.9.10"]
)
def data_ingestion(input_csv: str, api_endpoint: str, sql_details: dict, output_csv: OutputPath('Dataset')) -> None:
    """Collect data from an HTTP API, a PostgreSQL query and a CSV object in MinIO.

    Each source is optional (skipped when its argument is falsy) and best-effort:
    a failure in one source is logged and the remaining sources are still tried.
    The first non-empty result becomes the dataset; every later non-empty result
    is inner-joined onto it using the ``key`` column.

    Args:
        input_csv: Object name of the CSV inside the ``datasets`` MinIO bucket.
        api_endpoint: URL returning JSON that ``pandas.DataFrame`` can load.
        sql_details: Mapping with ``DB_HOST``, ``DB_PORT``, ``DB_NAME``,
            ``DB_USER``, ``DB_PASSWORD`` and ``query``.
        output_csv: Path where the merged dataset is written as CSV.

    Raises:
        RuntimeError: If no source produced any rows, so that the pipeline
            fails here rather than in a downstream step reading a missing file.
    """
    import logging
    import os
    import time

    import pandas as pd
    import psycopg2
    import requests
    from minio import Minio

    # The component runs in its own container: without this call the root
    # logger stays at WARNING and every logging.info() below is dropped.
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s")

    def _combine(merged, incoming):
        """Return `incoming` if nothing is merged yet, else inner-join on 'key'."""
        if merged.empty:
            return incoming
        return pd.merge(merged, incoming, on='key', how='inner')

    df = pd.DataFrame()

    # From API
    if api_endpoint:
        try:
            # A timeout keeps an unresponsive endpoint from hanging the step.
            response = requests.get(api_endpoint, timeout=30)
            # Treat HTTP errors as failures instead of parsing an error body as data.
            response.raise_for_status()
            df_api = pd.DataFrame(response.json())
            if not df_api.empty:
                logging.info("Data from API: %s", df_api.describe())
                df = _combine(df, df_api)
        except Exception as e:
            logging.error("Error while fetching API data: %s", e)

    # From SQL
    if sql_details:
        df_db = None
        max_attempts = 3
        for attempt in range(1, max_attempts + 1):
            conn = None
            try:
                conn = psycopg2.connect(
                    host=sql_details['DB_HOST'],
                    port=sql_details['DB_PORT'],
                    dbname=sql_details['DB_NAME'],
                    user=sql_details['DB_USER'],
                    password=sql_details['DB_PASSWORD']
                )
                with conn.cursor() as cursor:
                    cursor.execute(sql_details['query'])
                    rows = cursor.fetchall()
                    colnames = [desc[0] for desc in cursor.description]
                df_db = pd.DataFrame(rows, columns=colnames)
                break
            except Exception as e:
                logging.error("Error while querying Database: %s - Retries left: %d", e, max_attempts - attempt)
                if attempt < max_attempts:
                    time.sleep(1)  # brief pause before the next attempt
            finally:
                # Always release the connection, including on failed attempts.
                if conn is not None:
                    conn.close()

        # Merge outside the retry loop: a merge problem is not a database
        # problem and re-running the query would not fix it.
        if df_db is not None and not df_db.empty:
            try:
                logging.info("Data from Database: %s", df_db.describe())
                df = _combine(df, df_db)
            except Exception as e:
                logging.error("Error while merging Database data: %s", e)

    # From CSV via Minio
    if input_csv:
        try:
            # Connection settings can be overridden through the environment;
            # the defaults are the values this component has always used.
            minio_client = Minio(endpoint=os.environ.get("MINIO_ENDPOINT", "192.168.203.181:30900"),
                                 access_key=os.environ.get("MINIO_ACCESS_KEY", "minioadmin"),
                                 secret_key=os.environ.get("MINIO_SECRET_KEY", "minioadmin"),
                                 secure=False)
            minio_client.fget_object(bucket_name="datasets",
                                     object_name=input_csv,
                                     file_path='/tmp/dataset.csv')
            df_minio = pd.read_csv('/tmp/dataset.csv')
            if not df_minio.empty:
                logging.info("Data from Minio: %s", df_minio.describe())
                df = _combine(df, df_minio)
        except Exception as e:
            logging.error("Error downloading file from Minio: %s", e)

    if df.empty:
        raise RuntimeError("No data could be ingested from any of the configured sources.")

    df.reset_index(drop=True, inplace=True)
    df.to_csv(output_csv, index=False)
    logging.info("Downloaded and merged data: %s", df.describe())
        
@component(
    base_image="python:3.11",
    packages_to_install=[
        "pandas==2.2.3",
        "matplotlib==3.10.0",
        "seaborn==0.13.2",
        "ydata-profiling==4.12.2"
    ]
)
def data_eda(input_csv: InputPath('Dataset'), eda_report: OutputPath('HTML'), plot_path: OutputPath('Plot')) -> None:
    """Produce an HTML profiling report and a count plot of the 'Churn' column.

    Args:
        input_csv: Path of the CSV dataset to analyse.
        eda_report: Path where the HTML profiling report is written.
        plot_path: Path where the 'Churn' count plot is written as a PNG image.
    """
    import pandas as pd
    import matplotlib.pyplot as plt
    import seaborn as sns
    from ydata_profiling import ProfileReport

    df = pd.read_csv(input_csv)

    # Generate EDA report.
    # The HTML is written directly instead of via profile.to_file(): to_file()
    # redirects the output to "<path>.html" when the target has no ".html"
    # suffix, which would leave the artifact path itself without a file.
    profile = ProfileReport(df, title='EDA Report')
    with open(eda_report, "w", encoding="utf-8") as report_file:
        report_file.write(profile.to_html())

    # Churn distribution plot
    plt.figure(figsize=(10, 6))
    sns.countplot(data=df, x='Churn')
    plt.title('Churn Distribution')
    # An explicit format stops savefig from appending a default extension to
    # an extension-less file name, so the image lands exactly at plot_path.
    plt.savefig(plot_path, format='png')
    plt.close()

# Note: "ydata-profiling" (formerly "pandas-profiling") must be compatible with the installed pydantic version; adjust the pinned versions if installation or import errors persist.
    
@component(
    base_image="python:3.11",
    # "imblearn" on PyPI is only a stub that pulls in an unpinned
    # imbalanced-learn; pin the real package for reproducible builds.
    packages_to_install=["pandas==2.2.3", "scikit-learn==1.6.1", "imbalanced-learn==0.13.0"]
)
def data_processing(input_csv: InputPath('Dataset'), processed_X: OutputPath('Dataset'), processed_y: OutputPath('Dataset')) -> None:
    """Clean the dataset, encode it numerically and balance the classes with SMOTE.

    Steps: fill missing 'Churn' labels with the most frequent label, map
    'Yes'/'No' labels to 1/0 (numeric labels are kept as they are), drop the
    'customer_id' column if present, impute numeric features with the median
    and text features with the most frequent value, one-hot encode text
    features, then oversample with SMOTE.

    Args:
        input_csv: Path of the CSV dataset; must contain a 'Churn' column.
        processed_X: Path where the resampled feature matrix is written as CSV.
        processed_y: Path where the resampled target column is written as CSV.

    Raises:
        ValueError: If 'Churn' is missing, has no usable values, or contains
            labels other than 'Yes'/'No' when it is a text column.
    """
    import logging

    import pandas as pd
    from imblearn.over_sampling import SMOTE
    from sklearn.impute import SimpleImputer

    # Make INFO records visible in the component's container logs.
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s")

    df = pd.read_csv(input_csv)

    if 'Churn' not in df.columns:
        raise ValueError("Input dataset has no 'Churn' column.")

    # --- Target ---------------------------------------------------------
    y = df['Churn']
    if not y.notna().any():
        raise ValueError("Target column 'Churn' contains no values.")
    if y.isnull().any():
        y = y.fillna(y.mode()[0])
    # Only text labels need mapping; applying the Yes/No map to an already
    # numeric column would turn every label into NaN.
    if y.dtype == object:
        y = y.map({'Yes': 1, 'No': 0})
    if y.isnull().any():
        # Fail loudly: returning without writing the outputs would only move
        # the failure to the next pipeline step.
        raise ValueError("Target variable y contains NaN values after imputation.")

    # --- Features -------------------------------------------------------
    X = df.drop(columns=['Churn'])
    X = X.drop(columns=['customer_id'], errors='ignore')
    # Columns without a single value carry no information, and SimpleImputer
    # would silently drop them and break the column assignment below.
    X = X.dropna(axis=1, how='all')

    num_cols = X.select_dtypes(include='number').columns
    cat_cols = X.select_dtypes(include='object').columns

    # Guard against empty selections: SimpleImputer rejects zero-column input.
    if len(num_cols) > 0:
        X[num_cols] = SimpleImputer(strategy='median').fit_transform(X[num_cols])
    if len(cat_cols) > 0:
        X[cat_cols] = SimpleImputer(strategy='most_frequent').fit_transform(X[cat_cols])
        # SMOTE interpolates between samples and therefore needs numeric input.
        X = pd.get_dummies(X, columns=list(cat_cols), dtype=int)

    # Apply SMOTE for balancing
    smote = SMOTE(random_state=42)
    X_resampled, y_resampled = smote.fit_resample(X, y)

    pd.DataFrame(X_resampled).to_csv(processed_X, index=False)
    pd.Series(y_resampled).to_csv(processed_y, index=False)
    logging.info("Processed data saved with shapes: X: %s, y: %s", X_resampled.shape, y_resampled.shape)
    
# Model Training Component
@component(
    base_image="python:3.11",
    packages_to_install=["pandas==2.2.3", "scikit-learn==1.6.1", "joblib==1.4.2", "mlflow==2.11.0"]
)
def model_training(processed_X: InputPath('Dataset'), processed_y: InputPath('Dataset'), 
                   knn_model: OutputPath('Dataset'), lg_model: OutputPath('Dataset'), svm_model: OutputPath('Dataset')) -> None:
    """Train KNN, logistic-regression and SVM classifiers and record them.

    The data is split 80/20 (random_state=55). For each model the test
    accuracy is logged, the fitted estimator is dumped with joblib to its
    output path, and both the metric and the model are logged to MLflow.

    Args:
        processed_X: Path of the CSV holding the feature matrix.
        processed_y: Path of the CSV holding the target in its first column.
        knn_model: Path where the fitted KNN model is dumped.
        lg_model: Path where the fitted logistic-regression model is dumped.
        svm_model: Path where the fitted SVM model is dumped.
    """
    # KFP executes only this function's source inside the container, so
    # module-level imports of the notebook (including logging) are not
    # available here and must be repeated locally.
    import logging

    import joblib
    import mlflow
    import mlflow.sklearn
    import pandas as pd
    from sklearn.linear_model import LogisticRegression
    from sklearn.metrics import accuracy_score
    from sklearn.model_selection import train_test_split
    from sklearn.neighbors import KNeighborsClassifier
    from sklearn.svm import SVC

    # Make INFO records visible in the component's container logs.
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s - %(levelname)s - %(message)s")

    X_processed = pd.read_csv(processed_X)
    # Take the single target column as a 1-D Series; passing the one-column
    # DataFrame makes scikit-learn warn about a column-vector y.
    y_processed = pd.read_csv(processed_y).iloc[:, 0]

    X_train, X_test, y_train, y_test = train_test_split(X_processed, y_processed, test_size=0.2, random_state=55)

    # (metric name, estimator, joblib output path, MLflow artifact path)
    candidates = [
        ("KNN Accuracy", KNeighborsClassifier(n_neighbors=3), knn_model, "knn_model"),
        ("Logistic Regression Accuracy", LogisticRegression(max_iter=1000, random_state=42), lg_model, "lg_model"),
        ("SVM Accuracy", SVC(), svm_model, "svm_model"),
    ]

    # Close any run left open, then use a context manager so the new run is
    # always ended, even when training fails part-way through.
    if mlflow.active_run():
        mlflow.end_run()

    with mlflow.start_run():
        for metric_name, estimator, model_path, artifact_name in candidates:
            estimator.fit(X_train, y_train)
            accuracy = accuracy_score(y_test, estimator.predict(X_test))
            logging.info("%s: %.2f", metric_name, accuracy * 100)
            joblib.dump(estimator, model_path)
            mlflow.log_metric(metric_name, accuracy)
            mlflow.sklearn.log_model(estimator, artifact_name)

# Define the pipeline
@pipeline(
    name='Customer Churn Prediction Pipeline',
    description='A pipeline to perform customer churn prediction.'
)
def churn_prediction_pipeline(input_csv: str, api_endpoint: str, sql_details: dict):
    """Wire ingestion, EDA, processing and training into one pipeline.

    EDA and data processing both consume the ingested dataset; training
    consumes the two outputs of data processing. The print calls show the
    output placeholders of each task when the pipeline function is executed.

    Args:
        input_csv: Forwarded to ``data_ingestion`` as ``input_csv``.
        api_endpoint: Forwarded to ``data_ingestion`` as ``api_endpoint``.
        sql_details: Forwarded to ``data_ingestion`` as ``sql_details``.
    """
    # Step 1 - Data Ingestion
    ingest = data_ingestion(input_csv=input_csv, api_endpoint=api_endpoint, sql_details=sql_details)
    print("Step1 => Ingested: ", ingest.outputs['output_csv'])

    # Step 2 - EDA
    eda = data_eda(input_csv=ingest.outputs['output_csv'])
    print("Step2 => EDA Report : ", eda.outputs['eda_report'], "\n\t Plot Path: ", eda.outputs['plot_path'])

    # Step 3 - Data Processing
    process = data_processing(input_csv=ingest.outputs['output_csv'])
    print("Step3 => Processed X: ", process.outputs['processed_X'], "\n\t Processed y: ",  process.outputs['processed_y'])

    # Step 4 - Model Training
    train = model_training(processed_X=process.outputs['processed_X'], processed_y=process.outputs['processed_y'])
    print("Step4 => Trained Model KNN: ", train.outputs['knn_model'], "\n\t Trained Model LG: ",
          train.outputs['lg_model'], "\n\t Trained Model SVM: ",  train.outputs['svm_model'])
    
# Compile and run the pipeline
if __name__ == '__main__':
    # Import the compiler explicitly instead of relying on it being reachable
    # as an attribute of the kfp package through some other import.
    from kfp import compiler

    # Compile the pipeline into a package
    compiler.Compiler().compile(churn_prediction_pipeline, 'churn_prediction_pipeline.yaml')

    # Connect to Kubeflow Pipelines and execute the pipeline
    client = kfp.Client()

    # Define API endpoint (empty string: the API source is skipped)
    api_endpoint = ''

    # CSV file
    input_csv = 'customer_churn_dataset-testing-copy.csv'

    # SQL connection details.
    # The database password must never live in source control: it is read
    # from the environment. The credential that used to be hard-coded here
    # should be treated as leaked and rotated.
    # NOTE(review): the password is still sent as a plain pipeline argument;
    # consider mounting it into the component from a Kubernetes secret.
    db_password = os.environ.get('DB_PASSWORD')
    if not db_password:
        raise RuntimeError("Set the DB_PASSWORD environment variable before submitting the pipeline.")

    sql_params = {'DB_HOST': os.environ.get('DB_HOST', '192.168.203.181'),
                  'DB_PORT': os.environ.get('DB_PORT', '30543'),
                  'DB_NAME': os.environ.get('DB_NAME', 'fin-db'),
                  'DB_USER': os.environ.get('DB_USER', 'app'),
                  'DB_PASSWORD': db_password,
                  'query': 'SELECT * FROM accounts LIMIT 100'}
    client.create_run_from_pipeline_func(churn_prediction_pipeline,
                                         arguments={'input_csv': input_csv,
                                                    'api_endpoint': api_endpoint,
                                                    'sql_details': sql_params},
                                         enable_caching=False)

Step1 => Ingested:  {{channel:task=data-ingestion;name=output_csv;type=system.Dataset@0.0.1;}}
Step2 => EDA Report :  {{channel:task=data-eda;name=eda_report;type=system.HTML@0.0.1;}} 
	 Plot Path:  {{channel:task=data-eda;name=plot_path;type=system.Artifact@0.0.1;}}
Step2 => Processed X:  {{channel:task=data-processing;name=processed_X;type=system.Dataset@0.0.1;}} 
	 Processed y:  {{channel:task=data-processing;name=processed_y;type=system.Dataset@0.0.1;}}
Step3 => Trained Model KNN:  {{channel:task=model-training;name=knn_model;type=system.Dataset@0.0.1;}} 
	 Trained Model LG:  {{channel:task=model-training;name=lg_model;type=system.Dataset@0.0.1;}} 
	 Trained Model SVM:  {{channel:task=model-training;name=svm_model;type=system.Dataset@0.0.1;}}


