In [17]:
from sqlalchemy import create_engine
import pandas as pd

#  Test Connection Function
def test_db_connection():
    try:
        engine = create_engine(
            "postgresql://pgadmin:MyPass06@ml-pipeline-pg-server.postgres.database.azure.com:5432/ml_pipeline_db"
        )
        connection = engine.connect()
        print(" Connection Successful!")
        
        #  Test Query
        # query1 = "SELECT table_name FROM information_schema.tables WHERE table_schema='public';"
        query = "SELECT * FROM heart_disease_data;"
        tables = pd.read_sql(query, connection)
        print("📝 Available Tables:", tables)
        
        connection.close()
    except Exception as e:
        print(" Connection Failed:", e)

# 🔥 Run the Test
test_db_connection()

 Connection Successful!
📝 Available Tables:      age  sex  cp  trestbps  chol  fbs  restecg  thalach  exang  oldpeak  \
0     63    1   3       145   233    1        0      150      0      2.3   
1     37    1   2       130   250    0        1      187      0      3.5   
2     41    0   1       130   204    0        0      172      0      1.4   
3     56    1   1       120   236    0        1      178      0      0.8   
4     57    0   0       120   354    0        1      163      1      0.6   
..   ...  ...  ..       ...   ...  ...      ...      ...    ...      ...   
298   57    0   0       140   241    0        1      123      1      0.2   
299   45    1   3       110   264    0        1      132      0      1.2   
300   68    1   0       144   193    1        1      141      0      3.4   
301   57    1   0       130   131    0        1      115      1      1.2   
302   57    0   1       130   236    0        0      174      0      0.0   

     slope  ca  thal  target  
0        0  

In [18]:
from kfp.dsl import component, pipeline, InputPath, OutputPath, Condition
import kfp.compiler as compiler

In [19]:
# -------------------------------------------------
# COMPONENT 1: Fetch Heart Disease Data from Azure and Split
# -------------------------------------------------
from kfp.v2.dsl import component, OutputPath

@component(
    base_image="python:3.9",
    packages_to_install=["pandas", "sqlalchemy", "psycopg2"]
)
def fetch_heart_data_from_azure(eval_path: OutputPath("Dataset"), train_path: OutputPath("Dataset")):
    import pandas as pd
    import numpy as np
    from sqlalchemy import create_engine

    try:
        engine = create_engine(
            "postgresql://pgadmin:MyPass06@ml-pipeline-pg-server.postgres.database.azure.com:5432/ml_pipeline_db"
        )
        df = pd.read_sql("SELECT * FROM heart_disease_data", engine)
        print("Fetched data from Azure PostgreSQL")
    except Exception as e:
        print("Azure DB fetch failed, generating synthetic fallback data.", e)
        np.random.seed(42)
        df = pd.DataFrame({
            "age": np.random.randint(29, 77, 303),
            "sex": np.random.choice([0, 1], 303),
            "cp": np.random.randint(0, 4, 303),
            "trestbps": np.random.randint(94, 200, 303),
            "chol": np.random.randint(126, 564, 303),
            "fbs": np.random.choice([0, 1], 303),
            "restecg": np.random.randint(0, 2, 303),
            "thalach": np.random.randint(71, 202, 303),
            "exang": np.random.choice([0, 1], 303),
            "oldpeak": np.round(np.random.uniform(0.0, 6.2, 303), 1),
            "slope": np.random.randint(0, 3, 303),
            "ca": np.random.randint(0, 5, 303),
            "thal": np.random.randint(0, 4, 303),
            "target": np.random.choice([0, 1], 303)
        })

    # Save first 100 rows as eval, full as train
    df.head(100).to_csv(eval_path, index=False)
    df.to_csv(train_path, index=False)
    print("Saved 100-row eval set and full train set")


In [20]:
# -------------------------------------------------
# COMPONENT 2: Check if Heart Disease Model Already Exists in MinIO
# -------------------------------------------------
from kfp.v2.dsl import component, OutputPath

@component(
    base_image="python:3.9",
    packages_to_install=["boto3"]
)
def check_heart_model_exists(status: OutputPath(str)):
    import boto3

    s3 = boto3.client(
        "s3",
        endpoint_url="http://10.97.217.252:9000",
        aws_access_key_id="minio",
        aws_secret_access_key="minio123"
    )

    response = s3.list_objects_v2(Bucket="mlpipeline", Prefix="models/")
    print("Files in MinIO 'models/' bucket:")
    for obj in response.get("Contents", []):
        print(" -", obj["Key"])

    try:
        s3.head_object(Bucket="mlpipeline", Key="models/heart_model.pkl")
        result = "exists"
        print("Model already exists in MinIO.")
    except Exception as e:
        result = "first_run"
        print("Model not found. First run detected.")

    with open(status, "w") as f:
        f.write(result)


In [21]:
# -------------------------------------------------
# COMPONENT 3: Evaluate Heart Disease Model from MinIO
# -------------------------------------------------
from kfp.v2.dsl import component, InputPath, OutputPath

@component(
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn", "joblib", "boto3"]
)
def evaluate_heart_model(eval_path: InputPath("Dataset"), result: OutputPath(str)):
    import pandas as pd
    import joblib
    import boto3
    from sklearn.metrics import accuracy_score

    # MinIO client setup
    s3 = boto3.client(
        "s3",
        endpoint_url="http://10.97.217.252:9000",
        aws_access_key_id="minio",
        aws_secret_access_key="minio123"
    )

    # Download model from MinIO
    s3.download_file("mlpipeline", "models/heart_model.pkl", "heart_model.pkl")
    print("Model downloaded from MinIO")

    # Load eval dataset and model
    df = pd.read_csv(eval_path)
    X = df.drop("target", axis=1)  # For heart disease, label column is 'target'
    y = df["target"]

    model = joblib.load("heart_model.pkl")
    preds = model.predict(X)
    acc = accuracy_score(y, preds)

    print(f"Eval Accuracy: {acc:.2f}")

    # Write evaluation result
    with open(result, "w") as f:
        f.write("good" if acc >= 0.85 else "bad")


In [22]:
from kfp.v2.dsl import component, InputPath
@component(
    base_image="python:3.9",
    packages_to_install=["pandas", "scikit-learn", "joblib", "boto3", "prometheus_client"]
)
def train_heart_model(train_path: InputPath("Dataset")):
    import pandas as pd
    import joblib
    import boto3
    from sklearn.ensemble import RandomForestClassifier
    from sklearn.model_selection import train_test_split
    from sklearn.metrics import accuracy_score
    from prometheus_client import Gauge, start_http_server
    import threading
    import time

    # Start Prometheus server in a thread (non-blocking)
    def start_prometheus():
        start_http_server(8000)
        while True:
            time.sleep(1000)  # keep it alive

    threading.Thread(target=start_prometheus, daemon=True).start()

    # Train the model
    df = pd.read_csv(train_path)
    X = df.drop("target", axis=1)
    y = df["target"]
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2)

    model = RandomForestClassifier(random_state=42)
    model.fit(X_train, y_train)
    acc = accuracy_score(y_test, model.predict(X_test))
    print(f"Training complete - Accuracy: {acc:.2f}")

    # Save model locally
    joblib.dump(model, "model.pkl")

    # Export metric to Prometheus
    acc_metric = Gauge('model_accuracy', 'Accuracy of the trained model')
    acc_metric.set(acc)

    # Upload model to MinIO
    s3 = boto3.client(
        "s3",
        endpoint_url="http://10.97.217.252:9000",
        aws_access_key_id="minio",
        aws_secret_access_key="minio123"
    )
    s3.upload_file("model.pkl", "mlpipeline", "models/model.pkl")
    print("Model uploaded to MinIO")
    # Save to local
    with open("accuracy.txt", "w") as f:
        f.write(str(acc))

    # Upload to MinIO
    s3.upload_file("accuracy.txt", "mlpipeline", "metrics/accuracy.txt")
    print("Uploaded accuracy.txt to MinIO (mlpipeline/metrics/accuracy.txt)")

In [23]:
from kfp import dsl

@dsl.pipeline(
    name="heart-disease-auto-retraining-pipeline",
    description="Retrains the heart disease model if accuracy drops on new data"
)
def retraining_pipeline():
    # Step 1: Fetch data from Azure PostgreSQL and split into eval/train
    data = fetch_heart_data_from_azure()

    # Step 2: Check if model already exists in MinIO
    model_status_op = check_heart_model_exists()

    # Step 3a: If no model exists, train the model
    with dsl.If(model_status_op.outputs["status"] == "first_run"):
        train_heart_model(train_path=data.outputs["train_path"])

    # Step 3b: If model exists, evaluate it on new data
    with dsl.If(model_status_op.outputs["status"] == "exists"):
        evaluation_op = evaluate_heart_model(eval_path=data.outputs["eval_path"])

        # Step 4: Retrain if evaluation result is bad (accuracy < threshold)
        with dsl.If(evaluation_op.outputs["result"] == "bad"):
            train_heart_model(train_path=data.outputs["train_path"])


In [24]:
# -------------------------------------------------
# COMPILE THE PIPELINE
# -------------------------------------------------
if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=retraining_pipeline,
        package_path="project_pipeline_v6.yaml"
    )
