# In [5]:
import kfp
from kfp.dsl import pipeline, component
from datetime import datetime
from typing import NamedTuple

@component(
    base_image='bitnami/spark:3.5',  # Spark-compatible base image
    packages_to_install=['pandas', 'pyarrow', 's3fs', 'boto3', 'psycopg2-binary', 'trino'],
)
def fetch_data(minio_bucket: str) -> str:
    """Run the customer-insights query on Trino and upload the result to MinIO as CSV.

    The query aggregates deposit-account activity (fiscal year '2081/2082',
    scheme type 'SBA') and joins it with customer demographics, account counts,
    dormancy, account-age and recent-activity features.

    Connection settings default to the values previously hard-coded here and
    can be overridden with the environment variables MINIO_ENDPOINT,
    MINIO_ACCESS_KEY, MINIO_SECRET_KEY, TRINO_HOST, TRINO_PORT, TRINO_USER,
    TRINO_CATALOG and TRINO_SCHEMA.

    Args:
        minio_bucket: Bucket that receives the CSV.

    Returns:
        Object key of the uploaded CSV: ``data/AI360FDREC_raw_data_<YYYY-MM-DD>.csv``.

    Raises:
        Exception: Trino/file errors are logged and re-raised (instead of
            returning ``None`` from a ``str``-typed component); MinIO upload
            errors propagate unchanged.
    """
    import csv
    import os
    import tempfile
    from datetime import datetime

    import boto3
    from trino.dbapi import connect

    # NOTE(review): credentials should come from a Kubernetes secret; the literal
    # defaults are kept only so existing deployments keep working.
    minio_client = boto3.client(
        's3',
        endpoint_url=os.environ.get("MINIO_ENDPOINT", "http://192.168.80.155:32000"),
        aws_access_key_id=os.environ.get("MINIO_ACCESS_KEY", "admin"),
        aws_secret_access_key=os.environ.get("MINIO_SECRET_KEY", "dlyticaD123"),
    )

    # Trino connection details (no authentication / plain HTTP, as before).
    trino_host = os.environ.get("TRINO_HOST", "192.168.80.155")
    trino_port = int(os.environ.get("TRINO_PORT", "30686"))
    trino_user = os.environ.get("TRINO_USER", "admin")
    trino_catalog = os.environ.get("TRINO_CATALOG", "iceberg")
    trino_schema = os.environ.get("TRINO_SCHEMA", "gold")

    SQL_QUERY = """ 
         WITH debit_credit_summary AS (
    SELECT
        a.cif_id,
        a.nepali_fiscal_year,
        g.schm_type,
        SUM(total_debit_tran_vol) AS dr_amount_count,
        SUM(total_credit_tran_vol) AS cr_amount_count,
        COUNT(DISTINCT CASE WHEN total_debit_tran_vol > 0 THEN a.nepali_month END) AS dr_month_count,
        COUNT(DISTINCT CASE WHEN total_credit_tran_vol > 0 THEN a.nepali_month END) AS cr_month_count,
        SUM(total_debit_tran_count) AS total_debit_transaction,
        SUM(total_credit_tran_count) AS total_credit_transaction,
        CASE
            WHEN COUNT(DISTINCT CASE WHEN total_debit_tran_count > 0 THEN a.nepali_month END) > 0 THEN
                SUM(total_debit_tran_count) / COUNT(DISTINCT CASE WHEN total_debit_tran_count > 0 THEN a.nepali_month END)
        END AS average_debit_transaction_count,
        CASE
            WHEN COUNT(DISTINCT CASE WHEN total_credit_tran_count > 0 THEN a.nepali_month END) > 0 THEN
                SUM(total_credit_tran_count) / COUNT(DISTINCT CASE WHEN total_credit_tran_count > 0 THEN a.nepali_month END)
        END AS average_credit_transaction_count,
        CASE
            WHEN COUNT(DISTINCT CASE WHEN total_debit_tran_vol > 0 THEN a.nepali_month END) > 0 THEN
                SUM(total_debit_tran_vol) / COUNT(DISTINCT CASE WHEN total_debit_tran_vol > 0 THEN a.nepali_month END)
            ELSE 0
        END AS dr_average,
        CASE
            WHEN COUNT(DISTINCT CASE WHEN total_credit_tran_vol > 0 THEN a.nepali_month END) > 0 THEN
                SUM(total_credit_tran_vol) / COUNT(DISTINCT CASE WHEN total_credit_tran_vol > 0 THEN a.nepali_month END)
            ELSE 0
        END AS cr_average,
        (CASE
            WHEN COUNT(DISTINCT CASE WHEN total_credit_tran_vol > 0 THEN a.nepali_month END) > 0 THEN
                SUM(total_credit_tran_vol) / COUNT(DISTINCT CASE WHEN total_credit_tran_vol > 0 THEN a.nepali_month END)
            ELSE 0
        END) - (CASE
            WHEN COUNT(DISTINCT CASE WHEN total_debit_tran_vol > 0 THEN a.nepali_month END) > 0 THEN
                SUM(total_debit_tran_vol) / COUNT(DISTINCT CASE WHEN total_debit_tran_vol > 0 THEN a.nepali_month END)
            ELSE 0
        END) AS average_yearly_saving,
        SUM(total_credit_tran_vol) - SUM(total_debit_tran_vol) AS total_yearly_saving
 
    FROM
        gold.mv_fact_deposit_account_insights AS a
    JOIN
        gold.dim_gam AS g ON g.cif_id = a.cif_id
    WHERE
        g.schm_type = 'SBA' AND g.schm_type != 'TDA'
    GROUP BY
        a.cif_id, a.nepali_fiscal_year, g.schm_type
    HAVING
        a.nepali_fiscal_year = '2081/2082'
    ),
    
    customer_info AS (
    SELECT
        cif_id,
        CAST(SUBSTR(cust_dob, 1, 4) AS INTEGER) AS birth_year,
        EXTRACT(YEAR FROM CURRENT_DATE) - CAST(SUBSTR(cust_dob, 1, 4) AS INTEGER) AS age,
        CASE 
            WHEN CAST(SUBSTR(cust_dob, 1, 4) AS INTEGER) BETWEEN 1946 AND 1964 THEN 'Baby Boomers'
            WHEN CAST(SUBSTR(cust_dob, 1, 4) AS INTEGER) BETWEEN 1965 AND 1980 THEN 'Gen X'
            WHEN CAST(SUBSTR(cust_dob, 1, 4) AS INTEGER) BETWEEN 1981 AND 1996 THEN 'Gen Y'
            WHEN CAST(SUBSTR(cust_dob, 1, 4) AS INTEGER) BETWEEN 1997 AND 2012 THEN 'Gen Z'
            ELSE 'Minor' 
        END AS age_group,
        employment_status,
        marital_status,
        occupation,
        salary_per_month,
        gender
    FROM
        gold.dim_customers
    WHERE
        cust_type = 'INDIVIDUAL'
    ),
    
    salary_range AS (
    SELECT
        cif_id,
        CASE
            WHEN salary_per_month IS NULL THEN NULL
            WHEN salary_per_month < 10000 THEN 'low salary'
            WHEN salary_per_month >= 10000 AND salary_per_month < 30000 THEN 'moderate salary'
            ELSE 'high salary'
        END AS salary_range
    FROM
        gold.dim_customers
    ),
    
    withdrawal_amount_trends AS (
    SELECT
        cif_id,
        SUM(total_credit_tran_vol) AS total_credit_tran_vol,
        CASE
            WHEN SUM(total_credit_tran_vol) < 0 THEN 'Negative Withdrawal'
            WHEN SUM(total_credit_tran_vol) BETWEEN 0 AND 100000 THEN 'Low Withdrawal'
            WHEN SUM(total_credit_tran_vol) BETWEEN 100000 AND 1000000 THEN 'Moderate Withdrawal'
            ELSE 'High Withdrawal'
        END AS withdrawal_trends
    FROM
        gold.mv_fact_deposit_account_insights
    GROUP BY
        cif_id
    ),
    
    account_details AS (
    SELECT
        cif_id,
        COUNT(foracid) AS total_accounts,
        COUNT(CASE WHEN schm_type = 'SBA' THEN foracid END) AS total_saving_accounts,
        COUNT(CASE WHEN schm_type = 'TDA' THEN foracid END) AS total_fixed_accounts,
        COUNT(CASE WHEN schm_type = 'LAA' THEN foracid END) AS total_loan_accounts,
        COUNT(CASE WHEN schm_type = 'ODA' THEN foracid END) AS total_overdraft_accounts
    FROM
        gold.dim_gam
    GROUP BY
        cif_id
    ),
    
    dormant_table AS (
    SELECT 
        cif_id,
        MAX(CAST(SUBSTR(dormant_date, 1, 10) AS DATE)) AS last_dormant_date,
        SUM(DATE_DIFF('day', CAST(SUBSTR(dormant_date, 1, 10) AS DATE), CURRENT_DATE)) AS total_dormant_days,
        COUNT(CASE WHEN dormant_date IS NOT NULL THEN foracid END) AS total_dormant_account,
        COUNT(CASE WHEN dormant_date IS NULL THEN foracid END) AS total_active_account
    FROM 
        gold.dim_deposit_accounts 
    GROUP BY 
        cif_id
    ),
    
    first_ac_open_last_ac_open AS (
    SELECT
        cif_id,
        MAX(date_diff('day', acct_opn_date_date_part, CURRENT_DATE)) AS first_account_opened_days,
        MIN(date_diff('day', acct_opn_date_date_part, CURRENT_DATE)) AS last_account_opened_days,
        MIN(acct_opn_date_date_part) AS first_account_date,
        MAX(acct_opn_date_date_part) AS last_account_date
    FROM
        gold.dim_gam
    GROUP BY
        cif_id
    ),
    
    recently_active_status AS (
    SELECT
        cif_id,
        MAX(TRY_CAST(SUBSTR(last_customer_induced_transaction_date, 1, 10) AS DATE)) AS last_transaction_date,
        MAX(TRY_CAST(SUBSTR(dormant_date, 1, 10) AS DATE)) AS last_dormant_date,
        CASE
            WHEN MAX(TRY_CAST(SUBSTR(dormant_date, 1, 10) AS DATE)) < MAX(TRY_CAST(SUBSTR(last_customer_induced_transaction_date, 1, 10) AS DATE)) THEN 'Y'
            ELSE 'N'
        END AS Recently_Active
    FROM
        gold.dim_deposit_accounts
    GROUP BY
        cif_id
    )
    
    SELECT DISTINCT
    dcs.cif_id,
    a.total_accounts,
    a.total_saving_accounts,
    a.total_fixed_accounts,
    a.total_loan_accounts,
    a.total_overdraft_accounts,
    dcs.schm_type AS scheme_type,
    dcs.total_debit_transaction AS total_debit_transaction_count,
    dcs.total_credit_transaction AS total_credit_transaction_count,
    dcs.average_debit_transaction_count,
    dcs.average_credit_transaction_count,
    dcs.dr_amount_count AS total_debit_transaction_amount,
    dcs.cr_amount_count AS total_credit_transaction_amount,
    dcs.dr_average AS average_debit_amount,
    dcs.cr_average AS average_credit_amount,
    dcs.average_yearly_saving,
    dcs.total_yearly_saving,
    ci.employment_status,
    ci.marital_status,
    ci.occupation,
    ci.gender,
    ci.age_group,
    -- sr.salary_range,
    wat.withdrawal_trends,
    ad.total_dormant_days,
    ad.total_dormant_account,
    ad.total_active_account,
    fa.first_account_opened_days,
    fa.last_account_opened_days,
    fa.first_account_date,
    fa.last_account_date,
    ra.last_transaction_date,
    ra.last_dormant_date,
    ra.Recently_Active
    FROM
    debit_credit_summary dcs
    JOIN customer_info ci ON dcs.cif_id = ci.cif_id
    JOIN withdrawal_amount_trends wat ON dcs.cif_id = wat.cif_id
    JOIN salary_range sr ON dcs.cif_id = sr.cif_id
    JOIN account_details a ON dcs.cif_id = a.cif_id
    LEFT JOIN dormant_table ad ON dcs.cif_id = ad.cif_id
    LEFT JOIN first_ac_open_last_ac_open fa ON dcs.cif_id = fa.cif_id
    LEFT JOIN recently_active_status ra ON dcs.cif_id = ra.cif_id
    ORDER BY dcs.cif_id
    """

    # Write to the temp dir: the container's working directory may not be writable.
    output_file = os.path.join(tempfile.gettempdir(), "trino_data.csv")

    conn = None
    try:
        print("Connecting to Trino...")
        conn = connect(
            host=trino_host,
            port=trino_port,
            user=trino_user,
            catalog=trino_catalog,
            schema=trino_schema,
        )
        cursor = conn.cursor()

        print("Executing query...")
        cursor.execute(SQL_QUERY)

        # Fetch first: the Trino cursor's description is only reliable after results arrive.
        rows = cursor.fetchall()
        columns = [desc[0] for desc in cursor.description]
        print("Query executed successfully.")

        with open(output_file, "w", newline="", encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow(columns)  # Header
            writer.writerows(rows)    # Data rows
        print(f"Data saved to {output_file}")

    except Exception as e:
        print(f"Error: {str(e)}")
        # Re-raise: returning None from a str-typed component would only fail later
        # with a misleading type error and hide the real cause.
        raise
    finally:
        if conn is not None:
            conn.close()

    current_date = datetime.now().strftime("%Y-%m-%d")
    # An S3 object key, not a local path, so no local directory has to exist for it.
    saving_file_name = f"data/AI360FDREC_raw_data_{current_date}.csv"
    minio_client.upload_file(output_file, minio_bucket, saving_file_name)

    print("CSV file uploaded to MinIO successfully.")
    return saving_file_name
    
@component(
    base_image='quay.io/datanature_dev/jupyternotebook:java_home14',
    packages_to_install=[
        'pyspark==3.5.0',
        'pandas==2.0.3',
        'numpy==1.24.4',
        'boto3==1.28.57',
        'scikit-learn==1.3.0',
        'matplotlib==3.7.2',
        'pyarrow==12.0.1',
        'urllib3==2.0.4'
    ]
)
def feature_engineering_and_segmentation(
    file_path: str,
    minio_bucket: str
) -> NamedTuple('Outputs', [
    ('segmentation_path', str),
    ('model_path', str),
    ('cluster_plot_path', str),
    ('cluster_interpretations', str),
    ('final_output_path', str)
]):
    """Cluster customers from a raw CSV in MinIO and publish the results back to MinIO.

    Steps:
      1. Ensure folder markers exist and move previous ``results/csv/*.csv`` and
         ``results/json/*.json`` objects into ``archive/``.
      2. Download ``file_path`` from ``minio_bucket`` and load it with Spark.
      3. Derive ratio features, standardise, and project onto 5 PCA components.
      4. Fit BisectingKMeans for k in 6..14 and pick k by a weighted composite of
         min-max-normalised cost (0.3, lower is better), silhouette (0.4) and
         mean pairwise cluster-centre distance (0.3).
      5. Attach a rule-based interpretation per cluster and upload the
         per-customer CSV, the interpretation JSON, a diagnostics plot and the model.

    NOTE(review): the feature set (purchases, credit_limit, cash_advance, ...)
    looks like credit-card data. The ``fetch_data`` query in this file emits
    deposit-account columns instead. Confirm which upstream output feeds this step.

    Args:
        file_path: Object key of the raw CSV inside ``minio_bucket``.
        minio_bucket: Bucket used for both the input and all outputs.

    Returns:
        ``Outputs`` namedtuple of ``"<bucket>/<key>"`` strings.
        ``segmentation_path`` and ``final_output_path`` both point at the
        per-customer CSV. ``model_path`` ends in ``_partial`` if the model
        upload failed.

    Raises:
        ValueError: If the CSV lacks ``cif_id``/``custid`` or any raw feature column.
        Exception: Download, Spark and CSV/JSON/plot upload failures are logged and
            re-raised.
    """
    import itertools
    import json
    import os
    import time
    from collections import namedtuple
    from datetime import datetime

    import boto3
    import numpy as np
    import pandas as pd
    from botocore.config import Config

    import matplotlib
    matplotlib.use('Agg')  # Non-interactive backend: the container has no display.
    import matplotlib.pyplot as plt

    from pyspark.ml.clustering import BisectingKMeans
    from pyspark.ml.evaluation import ClusteringEvaluator
    from pyspark.ml.feature import PCA, StandardScaler, VectorAssembler
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import abs as spark_abs  # avoid shadowing builtin abs
    from pyspark.sql.functions import col, count, mean, stddev, udf, when
    from pyspark.sql.types import StringType

    # NOTE(review): credentials belong in a Kubernetes secret. Environment variables
    # override these literal defaults, which are kept for backward compatibility.
    minio_endpoint = os.environ.get("MINIO_ENDPOINT", "http://192.168.80.155:32000")
    minio_access_key = os.environ.get("MINIO_ACCESS_KEY", "admin")
    minio_secret_key = os.environ.get("MINIO_SECRET_KEY", "dlyticaD123")
    boto_config = Config(
        connect_timeout=30,
        read_timeout=60,
        retries={'max_attempts': 3}
    )

    # Raw columns the input CSV must provide.
    raw_feature_cols = [
        'purchases', 'oneoff_purchases', 'installments_purchases',
        'cash_advance', 'cash_advance_frequency',
        'balance', 'credit_limit',
        'payments', 'minimum_payments',
    ]
    # Clustering feature vector: the raw columns plus the ratios derived below.
    # Order matters, because it fixes the layout of the assembled vector.
    numeric_cols = raw_feature_cols + [
        'balance_utilization_ratio',
        'cash_advance_intensity',
        'payment_effort_ratio',
        'purchase_diversity',
    ]

    def make_minio_client():
        """Return a new boto3 S3 client for the MinIO endpoint."""
        return boto3.client(
            's3',
            endpoint_url=minio_endpoint,
            aws_access_key_id=minio_access_key,
            aws_secret_access_key=minio_secret_key,
            verify=False,
            config=boto_config
        )

    def interpret_clusters(cluster_profiles):
        """Map each cluster id to a '; '-joined list of rule-based labels.

        ``cluster_profiles`` is indexed by cluster id and has one column of
        per-cluster means per feature. Rules whose columns are absent are skipped.
        Clusters with no matching rule get "Undefined Customer Segment".
        """
        interpretations = {}
        features = cluster_profiles.columns.tolist()

        for cluster in cluster_profiles.index:
            profile_components = []

            if 'purchases' in features:
                spend_level = cluster_profiles.loc[cluster, 'purchases']
                mean_spend = cluster_profiles['purchases'].mean()
                std_spend = cluster_profiles['purchases'].std()

                if spend_level > mean_spend + 1.5 * std_spend:
                    profile_components.append("High Spender")
                elif spend_level < mean_spend - 1.5 * std_spend:
                    profile_components.append("Low Spender")
                else:
                    profile_components.append("Moderate Spender")

            if 'cash_advance' in features and 'cash_advance_frequency' in features:
                cash_advance = cluster_profiles.loc[cluster, 'cash_advance']
                cash_freq = cluster_profiles.loc[cluster, 'cash_advance_frequency']
                mean_cash = cluster_profiles['cash_advance'].mean()
                mean_freq = cluster_profiles['cash_advance_frequency'].mean()

                if cash_advance > mean_cash * 2 and cash_freq > mean_freq * 2:
                    profile_components.append("Intensive Cash Advance User")
                elif cash_advance < mean_cash * 0.5 and cash_freq < mean_freq * 0.5:
                    profile_components.append("Rare Cash Advance User")

            if 'balance' in features and 'credit_limit' in features:
                balance_ratio = cluster_profiles.loc[cluster, 'balance'] / (cluster_profiles.loc[cluster, 'credit_limit'] + 1e-10)

                if balance_ratio > 0.8:
                    profile_components.append("Very High Credit Utilization")
                elif balance_ratio > 0.5:
                    profile_components.append("High Credit Utilization")
                elif balance_ratio < 0.2:
                    profile_components.append("Low Credit Utilization")
                else:
                    profile_components.append("Moderate Credit Utilization")

            if 'payments' in features and 'minimum_payments' in features:
                payment_ratio = cluster_profiles.loc[cluster, 'payments'] / (cluster_profiles.loc[cluster, 'minimum_payments'] + 1e-10)

                if payment_ratio > 3:
                    profile_components.append("Aggressive Overpayer")
                elif payment_ratio > 2:
                    profile_components.append("Consistent Overpayer")
                elif payment_ratio < 1.2:
                    profile_components.append("Minimum Payment User")

            if 'balance' in features and 'credit_limit' in features and 'cash_advance' in features:
                # Weighted blend of utilisation (0.4) and cash-advance-to-limit (0.6).
                risk_score = (
                    cluster_profiles.loc[cluster, 'balance'] / (cluster_profiles.loc[cluster, 'credit_limit'] + 1e-10) * 0.4 +
                    cluster_profiles.loc[cluster, 'cash_advance'] / (cluster_profiles.loc[cluster, 'credit_limit'] + 1e-10) * 0.6
                )

                if risk_score > 0.7:
                    profile_components.append("Extremely High Financial Risk")
                elif risk_score > 0.5:
                    profile_components.append("High Financial Risk")
                elif risk_score < 0.2:
                    profile_components.append("Low Financial Risk")
                else:
                    profile_components.append("Moderate Financial Risk")

            interpretations[cluster] = (
                "; ".join(profile_components) if profile_components
                else "Undefined Customer Segment"
            )

        return interpretations

    def upload_to_minio(local_path, bucket, object_key, max_retries=5):
        """Upload ``local_path`` byte-for-byte to ``bucket/object_key``.

        Retries with exponential backoff (1, 2, 4, ... seconds) and uses a new
        client per attempt. CSV objects are tagged ``text/csv``.

        Returns:
            ``"<bucket>/<object_key>"`` on success.

        Raises:
            Exception: The last upload error once ``max_retries`` attempts fail.
        """
        extra_args = {'ContentType': 'text/csv'} if object_key.endswith('.csv') else {}
        for attempt in range(max_retries):
            try:
                client = make_minio_client()
                # Send the bytes as-is. Re-parsing the CSV with pandas could coerce
                # dtypes (e.g. ids). Passing a bytes body lets boto3 set the content length.
                with open(local_path, 'rb') as file_data:
                    body = file_data.read()
                client.put_object(Bucket=bucket, Key=object_key, Body=body, **extra_args)

                print(f"Successfully uploaded {local_path} to {bucket}/{object_key}")
                return f"{bucket}/{object_key}"

            except Exception as e:
                if attempt < max_retries - 1:
                    wait_time = 2 ** attempt  # Exponential backoff
                    print(f"Upload attempt {attempt+1} failed: {str(e)}. Retrying in {wait_time} seconds...")
                    time.sleep(wait_time)
                else:
                    print(f"Upload failed after {max_retries} attempts: {str(e)}")
                    raise

    def archive_prefix(client, bucket, prefix, suffix):
        """Move every object under ``prefix`` whose key ends in ``suffix`` to ``archive/<basename>``."""
        # Paginate: one list_objects_v2 call returns at most 1000 keys.
        paginator = client.get_paginator('list_objects_v2')
        for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
            for obj in page.get('Contents', []):
                key = obj['Key']
                if not key.endswith(suffix):
                    continue
                client.copy_object(
                    Bucket=bucket,
                    CopySource={'Bucket': bucket, 'Key': key},
                    Key=f'archive/{os.path.basename(key)}'
                )
                client.delete_object(Bucket=bucket, Key=key)

    def clean_directory_structure(minio_client, bucket):
        """Create folder markers and archive the previous run's CSV/JSON results.

        Best-effort: failures are printed as warnings and never raised.
        """
        try:
            for folder in ['results/csv/', 'results/json/', 'images/', 'models/', 'archive/']:
                minio_client.put_object(Bucket=bucket, Key=folder, Body='')

            for prefix, suffix, label in (('results/csv/', '.csv', 'CSV'),
                                          ('results/json/', '.json', 'JSON')):
                try:
                    archive_prefix(minio_client, bucket, prefix, suffix)
                except Exception as e:
                    print(f"Warning: Could not archive previous {label} files: {str(e)}")

        except Exception as e:
            print(f"Warning: Could not clean directory structure: {str(e)}")

    def min_max_normalize(series):
        """Scale ``series`` to [0, 1]. A constant or all-NaN series maps to 0.0.

        This avoids the 0/0 NaN that would otherwise poison the composite score.
        """
        lo, hi = series.min(), series.max()
        span = hi - lo
        if pd.isna(span) or span == 0:
            return pd.Series(0.0, index=series.index)
        return (series - lo) / span

    spark = (
        SparkSession.builder
        .appName("AdvancedCustomerSegmentation")
        .config("spark.driver.memory", "8g")
        .config("spark.executor.memory", "8g")
        .config("spark.sql.shuffle.partitions", "200")
        .config("spark.sql.execution.arrow.pyspark.enabled", "false")
        .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
        .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
        .getOrCreate()
    )

    minio_client = make_minio_client()

    try:
        clean_directory_structure(minio_client, minio_bucket)

        current_date = datetime.now().strftime('%m_%d')
        today_full = datetime.now().strftime('%Y%m%d_%H%M%S')
        local_path = "/tmp/raw_data.csv"

        # Download with up to 3 attempts, 5 s apart.
        max_retries = 3
        for attempt in range(max_retries):
            try:
                minio_client.download_file(minio_bucket, file_path, local_path)
                break
            except Exception:
                if attempt == max_retries - 1:
                    raise
                time.sleep(5)

        # Column types are inferred by Spark.
        # NOTE(review): inference turns numeric-looking cif_ids into integers,
        # which drops leading zeros. Supply an explicit schema if that matters.
        df = spark.read \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .csv(local_path)

        if 'custid' in df.columns:
            df = df.withColumnRenamed("custid", "cif_id")

        missing = [c for c in ['cif_id'] + raw_feature_cols if c not in df.columns]
        if missing:
            raise ValueError(
                f"Input file {file_path} is missing required columns {missing}; "
                f"available columns: {df.columns}"
            )

        # Derived ratio features (0 when the denominator is 0).
        df = df.withColumn("balance_utilization_ratio",
            when(col("credit_limit") != 0, col("balance") / col("credit_limit")).otherwise(0)
        )
        df = df.withColumn("cash_advance_intensity",
            when(col("credit_limit") != 0, col("cash_advance") / col("credit_limit")).otherwise(0)
        )
        df = df.withColumn("payment_effort_ratio",
            when(col("minimum_payments") != 0, col("payments") / col("minimum_payments")).otherwise(0)
        )
        df = df.withColumn("purchase_diversity",
            spark_abs(col("oneoff_purchases") - col("installments_purchases")) /
            (spark_abs(col("oneoff_purchases") + col("installments_purchases")) + 1e-10)
        )
        df = df.na.fill(0)

        # Keep cif_id as a pass-through column. The feature transformers only touch
        # their input/output columns, so every row keeps its own id. (Re-joining on
        # monotonically_increasing_id() is partition-dependent and can mis-assign rows.)
        assembler = VectorAssembler(inputCols=numeric_cols, outputCol="features")
        assembled = assembler.transform(df)
        scaler = StandardScaler(inputCol="features", outputCol="scaled_features", withMean=True, withStd=True)
        scaled_data = scaler.fit(assembled).transform(assembled)

        pca = PCA(k=5, inputCol="scaled_features", outputCol="pca_features")
        # Cached: reused by every candidate fit and by the final fit.
        pca_data = pca.fit(scaled_data).transform(scaled_data).cache()

        # Score each candidate k.
        evaluator = ClusteringEvaluator(featuresCol='pca_features')
        k_metrics = []
        for k in range(6, 15):
            candidate = BisectingKMeans(k=k, seed=42, featuresCol="pca_features").fit(pca_data)
            predictions = candidate.transform(pca_data)

            cost = candidate.summary.trainingCost if hasattr(candidate.summary, 'trainingCost') else 0
            silhouette = evaluator.evaluate(predictions)

            # Mean Euclidean distance over all pairs of cluster centres.
            pair_distances = [
                float(np.linalg.norm(a - b))
                for a, b in itertools.combinations(candidate.clusterCenters(), 2)
            ]
            separation = float(np.mean(pair_distances)) if pair_distances else 0.0

            k_metrics.append({
                'k': k,
                'cost': cost,
                'silhouette': silhouette,
                'separation': separation
            })

        k_metrics_df = pd.DataFrame(k_metrics)
        for metric in ('cost', 'silhouette', 'separation'):
            k_metrics_df[f'normalized_{metric}'] = min_max_normalize(k_metrics_df[metric])

        k_metrics_df['composite_score'] = (
            0.3 * (1 - k_metrics_df['normalized_cost']) +
            0.4 * k_metrics_df['normalized_silhouette'] +
            0.3 * k_metrics_df['normalized_separation']
        ).fillna(0.0)

        optimal_k = int(k_metrics_df.loc[k_metrics_df['composite_score'].idxmax(), 'k'])
        print(f"Selected optimal k = {optimal_k}")

        # Diagnostics plot: one panel per metric across k.
        fig, axes = plt.subplots(3, 1, figsize=(12, 15))
        panels = [
            ('cost', 'o', 'blue', 'Cost'),
            ('silhouette', 'x', 'green', 'Silhouette Score'),
            ('separation', '^', 'red', 'Cluster Separation'),
        ]
        for ax, (metric, marker, color, label) in zip(axes, panels):
            ax.plot(k_metrics_df['k'], k_metrics_df[metric], marker=marker, color=color)
            ax.set_xlabel('Number of Clusters (k)')
            ax.set_ylabel(label, color=color)
            ax.grid(True)

        plt.tight_layout()
        cluster_plot_path = f"/tmp/cluster_diagnostics_{today_full}.png"
        plt.savefig(cluster_plot_path)
        plt.close(fig)

        # Final clustering. cif_id is still present on every row.
        model = BisectingKMeans(k=optimal_k, seed=42, featuresCol="pca_features").fit(pca_data)
        clustered_data = model.transform(pca_data)

        # Per-cluster means, standard deviations and sizes.
        summary = clustered_data.groupBy("prediction").agg(
            *[mean(col(c)).alias(c) for c in numeric_cols],
            *[stddev(col(c)).alias(f"{c}_std") for c in numeric_cols],
            count("cif_id").alias("cluster_size")
        )
        cluster_profiles = pd.DataFrame(
            [row.asDict() for row in summary.collect()]
        ).set_index("prediction")

        interpretations = interpret_clusters(cluster_profiles)

        interpret_udf = udf(lambda x: interpretations.get(x, "Unknown"), StringType())
        clustered_data = clustered_data.withColumn("interpretation", interpret_udf(col("prediction")))

        final_output = clustered_data.select(
            col("cif_id").alias("cif_id"),
            col("prediction").alias("cluster"),
            col("interpretation").alias("interpretations")
        )
        # Explicit columns keep the CSV header even if there are no rows.
        final_pdf = pd.DataFrame(
            [row.asDict() for row in final_output.collect()],
            columns=["cif_id", "cluster", "interpretations"]
        )

        output_paths = {
            'final_output': f"/tmp/final_segmented_customers_{current_date}.csv",
            'interpretations': f"/tmp/cluster_interpretations_{current_date}.json",
            'plot': cluster_plot_path,
            'model': f"/tmp/segmentation_model_{today_full}"
        }

        # Save the final output (cif_id, cluster, interpretations only).
        final_pdf.to_csv(output_paths['final_output'], index=False)
        print(f"Final output sample:\n{final_pdf.head()}")

        # JSON converts the integer cluster ids to string keys.
        with open(output_paths['interpretations'], 'w') as f:
            json.dump(interpretations, f, indent=2)

        model.write().overwrite().save(output_paths['model'])

        minio_paths = {}

        csv_key = f"results/csv/customer_segments_{current_date}.csv"
        try:
            minio_paths['final_output'] = upload_to_minio(output_paths['final_output'], minio_bucket, csv_key)
        except Exception as e:
            print(f"Error uploading CSV: {str(e)}")
            raise

        json_key = f"results/json/cluster_interpretations_{current_date}.json"
        try:
            minio_paths['interpretations'] = upload_to_minio(output_paths['interpretations'], minio_bucket, json_key)
        except Exception as e:
            print(f"Error uploading JSON: {str(e)}")
            raise

        plot_key = f"images/cluster_diagnostics_{today_full}.png"
        try:
            minio_paths['plot'] = upload_to_minio(output_paths['plot'], minio_bucket, plot_key)
        except Exception as e:
            print(f"Error uploading plot: {str(e)}")
            raise

        # Model upload is best-effort. A failure is recorded with a "_partial" suffix.
        model_key_prefix = f"models/segmentation_model_{today_full}"
        try:
            for root, _, files in os.walk(output_paths['model']):
                for name in files:
                    local_file = os.path.join(root, name)
                    relative_path = os.path.relpath(local_file, output_paths['model'])
                    upload_to_minio(local_file, minio_bucket, f"{model_key_prefix}/{relative_path}")
            minio_paths['model'] = f"{minio_bucket}/{model_key_prefix}"
        except Exception as e:
            print(f"Error uploading model files: {str(e)}")
            minio_paths['model'] = f"{minio_bucket}/{model_key_prefix}_partial"

        output = namedtuple('Outputs', [
            'segmentation_path',
            'model_path',
            'cluster_plot_path',
            'cluster_interpretations',
            'final_output_path'
        ])

        return output(
            minio_paths['final_output'],     # segmentation_path
            minio_paths['model'],            # model_path
            minio_paths['plot'],             # cluster_plot_path
            minio_paths['interpretations'],  # cluster_interpretations
            minio_paths['final_output']      # final_output_path
        )

    except Exception as e:
        print(f"Error in feature_engineering_and_segmentation: {str(e)}")
        raise
    finally:
        spark.stop()
            

@component(
    base_image='quay.io/datanature_dev/jupyternotebook:java_home14',
    packages_to_install=[
        'pyspark==3.5.0',
        'pandas==2.0.3',
        'numpy==1.24.4',
        'boto3==1.28.57',
        'pyarrow==12.0.1',
        'urllib3==2.0.4'
    ]
)
def save_predictions_to_trino(
    segmentation_path: str,
    minio_bucket: str
) -> str:
    """Load segmentation results from MinIO into the Iceberg table
    ``iceberg_catalog.gold.customer_segments`` and archive the source files.

    The CSV at ``segmentation_path`` is read with Spark. Its ``cluster`` /
    ``interpretations`` columns are renamed to ``cluster_id`` /
    ``interpretation``, and only ``cif_id``, ``cluster_id`` and
    ``interpretation`` are written. The table is created on the first run and
    fully overwritten on later runs.

    After a successful write, the CSV is moved to ``archive/``. A JSON whose
    key is derived from the CSV key (``csv/`` -> ``json/``, ``.csv`` ->
    ``.json``) is also moved there, if it exists.

    Args:
        segmentation_path: ``"<minio_bucket>/<key>"`` or a bare object key of
            the CSV produced by the segmentation step.
        minio_bucket: Bucket that holds the CSV and its companion JSON.

    Returns:
        A status message. Errors are not raised. They are printed and returned
        as ``"Error: <message>"``, so the KFP task itself still succeeds;
        inspect the output string. If MinIO reports ``XMinioStorageFull``,
        only a random 10% sample is written and a warning string is returned.
    """
    # Imports stay inside the body: the component runs in its own container
    # without the notebook's module-level imports.
    import os
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lit
    import boto3
    from botocore.config import Config
    from datetime import datetime

    # Configuration
    # NOTE(review): credentials are hard-coded here and in the MinIO client
    # below; consider injecting them from a Kubernetes secret / env vars.
    access_key_id = 'admin'
    secret_access_key = 'dlyticaD123'
    minio_endpoint = 'dn-minio-tenant-hl.dn-minio-tenant.svc.cluster.local:9000'
    data_bucket = 'ai360fd-recommendation'
    hive_metastore_uri = "thrift://dn-hive-metastore.dn-hive-metastore.svc.cluster.local:9083"
    iceberg_warehouse_location = f"s3a://{data_bucket}/data/"
    custom_catalog = "iceberg_catalog"
    app_name = "Customer Segmentation Loader"
    additional_packages = "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2,org.apache.spark:spark-avro_2.12:3.5.0,com.crealytics:spark-excel_2.12:0.13.7,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262,org.apache.hadoop:hadoop-common:3.3.4"

    # Initialize MinIO client for file management (archiving after the load)
    minio_client = boto3.client(
        's3',
        endpoint_url="http://192.168.80.155:32000",
        aws_access_key_id="admin",
        aws_secret_access_key="dlyticaD123",
        verify=False,
        config=Config(
            connect_timeout=30,
            read_timeout=60,
            retries={'max_attempts': 3}
        )
    )

    def archive_results():
        """Move the loaded CSV (and its companion JSON, if found) to ``archive/``.

        Best-effort: any failure is printed as a warning and not raised.
        """
        # NOTE(review): the archive name only carries month/day, so runs on
        # the same date in different years overwrite each other.
        current_date = datetime.now().strftime('%m_%d')
        try:
            # Archive CSV file
            csv_key = segmentation_path.replace(f"{minio_bucket}/", "")
            archive_csv_key = f"archive/customer_segments_{current_date}.csv"

            # S3 has no rename: copy to the archive key, then delete the original
            minio_client.copy_object(
                Bucket=minio_bucket,
                CopySource={'Bucket': minio_bucket, 'Key': csv_key},
                Key=archive_csv_key
            )
            minio_client.delete_object(Bucket=minio_bucket, Key=csv_key)

            # Find and archive the corresponding JSON file (first prefix match)
            json_prefix = csv_key.replace("csv/", "json/").replace(".csv", ".json")
            json_objects = minio_client.list_objects_v2(Bucket=minio_bucket, Prefix=json_prefix)

            if 'Contents' in json_objects:
                json_key = json_objects['Contents'][0]['Key']
                archive_json_key = f"archive/{os.path.basename(json_key)}"

                minio_client.copy_object(
                    Bucket=minio_bucket,
                    CopySource={'Bucket': minio_bucket, 'Key': json_key},
                    Key=archive_json_key
                )
                minio_client.delete_object(Bucket=minio_bucket, Key=json_key)

            print("Results successfully archived")

        except Exception as e:
            print(f"Warning: Could not archive results: {str(e)}")

    # Initialize Spark Session with S3A (MinIO) access and an Iceberg catalog
    spark = SparkSession.builder \
        .appName(app_name) \
        .config("spark.driver.memory", "8g") \
        .config("spark.executor.memory", "8g") \
        .config("spark.executor.memoryOverhead", "2g") \
        .config("spark.driver.maxResultSize", "2g") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.endpoint", minio_endpoint) \
        .config("spark.hadoop.fs.s3a.access.key", access_key_id) \
        .config("spark.hadoop.fs.s3a.secret.key", secret_access_key) \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false") \
        .config("spark.hadoop.hive.metastore.uris", hive_metastore_uri) \
        .config("spark.sql.warehouse.dir", iceberg_warehouse_location) \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config(f"spark.sql.catalog.{custom_catalog}", "org.apache.iceberg.spark.SparkCatalog") \
        .config(f"spark.sql.catalog.{custom_catalog}.warehouse", iceberg_warehouse_location) \
        .config(f"spark.sql.catalog.{custom_catalog}.s3.endpoint", minio_endpoint) \
        .config(f"spark.sql.catalog.{custom_catalog}.s3.access-key", access_key_id) \
        .config(f"spark.sql.catalog.{custom_catalog}.s3.secret-key", secret_access_key) \
        .config(f"spark.sql.catalog.{custom_catalog}.s3.path-style-access", "true") \
        .config("spark.jars.packages", additional_packages) \
        .config("spark.sql.repl.eagerEval.enabled", True) \
        .config("spark.sql.debug.maxToStringFields", 1000) \
        .config("spark.sql.legacy.createHiveTableByDefault", "false") \
        .config("spark.sql.iceberg.handle-timestamp-without-timezone", "true") \
        .config("spark.sql.adaptive.enabled", "true") \
        .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
        .config("spark.sql.shuffle.partitions", "10") \
        .config("spark.hadoop.fs.s3a.connection.maximum", "100") \
        .config("spark.sql.parquet.compression.codec", "snappy") \
        .enableHiveSupport() \
        .getOrCreate()

    try:
        # Accept either "<bucket>/<key>" or a bare object key
        if segmentation_path.startswith(f"{minio_bucket}/"):
            object_key = segmentation_path[len(minio_bucket)+1:]
        else:
            object_key = segmentation_path

        input_s3_path = f"s3a://{minio_bucket}/{object_key}"
        print(f"Reading data from: {input_s3_path}")

        df = spark.read \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .csv(input_s3_path)

        print("Input DataFrame Schema:")
        df.printSchema()

        # Rename columns to match the target table schema
        column_mapping = {
            "cluster": "cluster_id",
            "interpretations": "interpretation"
        }

        for old_name, new_name in column_mapping.items():
            if old_name in df.columns:
                df = df.withColumnRenamed(old_name, new_name)

        # Ensure required columns exist
        required_columns = ["cif_id", "cluster_id", "interpretation"]
        missing_columns = [c for c in required_columns if c not in df.columns]
        if missing_columns:
            raise ValueError(f"Missing required columns: {missing_columns}")

        # Keep only the target columns; cache because df is reused by the
        # write and, on storage-full, by the fallback sample
        df = df.select(*required_columns).cache()

        table_name = f"{custom_catalog}.gold.customer_segments"

        # Decide between creating and overwriting the table
        try:
            print(f"Checking if {table_name} exists")

            tables = spark.catalog.listTables(f"{custom_catalog}.gold")
            table_exists = any(t.name.lower() == "customer_segments" for t in tables)

            if table_exists:
                print(f"Table {table_name} exists, will use 'overwrite' mode")
                # Log the existing schema to help diagnose write mismatches
                try:
                    existing_df = spark.table(table_name).limit(1)
                    print("Existing table schema:")
                    existing_df.printSchema()
                except Exception as e:
                    print(f"Could not read existing table: {str(e)}")
            else:
                print(f"Table {table_name} does not exist")

        except Exception as e:
            print(f"Error checking table existence: {str(e)}")
            print("Proceeding with assumption that table does not exist")
            table_exists = False

        print("Writing data to table...")
        try:
            if table_exists:
                print(f"Overwriting existing table {table_name}")
                # DataFrameWriterV2.overwrite() takes a mandatory filter
                # condition (a bare .overwrite() raises TypeError). lit(True)
                # matches every row, so the table's whole contents are replaced.
                df.writeTo(table_name).overwrite(lit(True))
            else:
                print(f"Creating new table {table_name}")
                df.writeTo(table_name).using("iceberg").create()

            # Verify the data was written
            verification_df = spark.table(table_name)
            count = verification_df.count()
            print(f"Successfully wrote {count} records to {table_name}")

            # Archive the source files only after a successful write
            archive_results()

            return f"Successfully loaded {count} records to {table_name}"

        except Exception as write_error:
            if "XMinioStorageFull" in str(write_error):
                print("Storage full error detected")
                print("Attempting to write with more compact format and reduced dataset")

                # Emergency fallback: persist a random 10% sample only.
                # NOTE(review): this silently replaces the table with partial data.
                small_df = df.sample(fraction=0.1).cache()

                small_df.coalesce(1).writeTo(table_name).using("iceberg").option("write-format", "parquet").option("compression", "snappy").createOrReplace()

                warn_msg = "WARNING: Storage was full - only wrote 10% sample of data"
                print(warn_msg)
                return warn_msg
            else:
                raise

    except Exception as e:
        # Deliberately not re-raised: the error is reported via the return value.
        print(f"Error in save_predictions_to_trino: {str(e)}")
        return f"Error: {str(e)}"
    finally:
        # spark is always bound here: it is created before the try block.
        spark.stop()
# Pipeline definition
# Three steps: fetch source data -> feature engineering + clustering ->
# load the final segmentation CSV into the Iceberg table.
@pipeline(name="Customer Segmentation Pipeline")
def customer_segmentation_pipeline(
    minio_bucket: str = "ai360ctzn-customer-segmentation"
):
    # NOTE: comments rather than a docstring on purpose. KFP uses a pipeline
    # function's docstring as the compiled pipeline description.
    fetch = fetch_data_trino(minio_bucket=minio_bucket)

    segment = feature_engineering_and_segmentation(
        file_path=fetch.output,
        minio_bucket=minio_bucket,
    )

    persist = save_predictions_to_trino(
        segmentation_path=segment.outputs['final_output_path'],
        minio_bucket=minio_bucket,
    )

    # Disable caching on every step so each run recomputes (presumably because
    # the upstream data changes even when the arguments do not).
    for task in (fetch, segment, persist):
        task.set_caching_options(False)

# Compile the pipeline into the YAML package that is uploaded later in this notebook
if __name__ == "__main__":
    from kfp import compiler

    output_package = "customer_segmentation_pipeline.yaml"
    pipeline_compiler = compiler.Compiler()
    pipeline_compiler.compile(
        pipeline_func=customer_segmentation_pipeline,
        package_path=output_package,
    )
    print("Pipeline compiled successfully.")

Pipeline compiled successfully.


In [6]:
import re
from urllib.parse import urlsplit, urlencode

import kfp
import requests
import urllib3


class KFPClientManager:
    """
    A class that creates `kfp.Client` instances with Dex authentication.

    It walks the Dex login flow with `requests` and hands the resulting session
    cookies to `kfp.Client`.
    """

    def __init__(
        self,
        api_url: str,
        dex_username: str,
        dex_password: str,
        dex_auth_type: str = "local",
        skip_tls_verify: bool = False,
    ):
        """
        Initialize the KFPClientManager.

        :param api_url: the Kubeflow Pipelines API URL
        :param dex_username: the Dex username
        :param dex_password: the Dex password
        :param dex_auth_type: the auth type to use if Dex has multiple enabled, one of: ['ldap', 'local']
        :param skip_tls_verify: if True, skip TLS verification
        :raises ValueError: if `dex_auth_type` is not 'ldap' or 'local'
        """
        self._api_url = api_url
        self._skip_tls_verify = skip_tls_verify
        self._dex_username = dex_username
        self._dex_password = dex_password
        self._dex_auth_type = dex_auth_type
        self._client = None

        # silence urllib3's InsecureRequestWarning when TLS verification is off
        if self._skip_tls_verify:
            urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

        # ensure `dex_auth_type` is valid
        if self._dex_auth_type not in ["ldap", "local"]:
            raise ValueError(
                f"Invalid `dex_auth_type` '{self._dex_auth_type}', must be one of: ['ldap', 'local']"
            )

    def _get_session_cookies(self) -> str:
        """
        Get the session cookies by authenticating against Dex.

        :return: a string of session cookies in the form "key1=value1; key2=value2",
            or "" if the API URL did not redirect (the endpoint is unsecured)
        :raises RuntimeError: on an unexpected HTTP status code, or when the
            login POST is not redirected (credentials probably invalid)
        """
        verify = not self._skip_tls_verify

        # use a persistent session (for cookies); the context manager releases
        # its connection pool once the cookies have been extracted
        with requests.Session() as s:
            # GET the api_url, which should redirect to Dex
            resp = s.get(self._api_url, allow_redirects=True, verify=verify)
            if resp.status_code == 200:
                pass
            elif resp.status_code == 403:
                # if we get 403, we might be at the oauth2-proxy sign-in page
                # the default path to start the sign-in flow is `/oauth2/start?rd=<url>`
                url_obj = urlsplit(resp.url)
                url_obj = url_obj._replace(
                    path="/oauth2/start", query=urlencode({"rd": url_obj.path})
                )
                resp = s.get(url_obj.geturl(), allow_redirects=True, verify=verify)
            else:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {self._api_url}"
                )

            # if we were NOT redirected, then the endpoint is unsecured
            if len(resp.history) == 0:
                # no cookies are needed
                return ""

            # if we are at `../auth` path, we need to select an auth type
            url_obj = urlsplit(resp.url)
            if re.search(r"/auth$", url_obj.path):
                url_obj = url_obj._replace(
                    path=re.sub(r"/auth$", f"/auth/{self._dex_auth_type}", url_obj.path)
                )

            # if we are at `../auth/xxxx/login` path, then we are at the login page
            if re.search(r"/auth/.*/login$", url_obj.path):
                dex_login_url = url_obj.geturl()
            else:
                # otherwise, we need to follow a redirect to the login page
                resp = s.get(url_obj.geturl(), allow_redirects=True, verify=verify)
                if resp.status_code != 200:
                    raise RuntimeError(
                        f"HTTP status code '{resp.status_code}' for GET against: {url_obj.geturl()}"
                    )
                dex_login_url = resp.url

            # attempt Dex login
            resp = s.post(
                dex_login_url,
                data={"login": self._dex_username, "password": self._dex_password},
                allow_redirects=True,
                verify=verify,
            )
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for POST against: {dex_login_url}"
                )

            # if we were NOT redirected, then the login credentials were probably invalid
            if len(resp.history) == 0:
                raise RuntimeError(
                    f"Login credentials are probably invalid - "
                    f"No redirect after POST to: {dex_login_url}"
                )

            # if we are at `../approval` path, we need to approve the login
            url_obj = urlsplit(resp.url)
            if re.search(r"/approval$", url_obj.path):
                dex_approval_url = url_obj.geturl()

                # approve the login
                resp = s.post(
                    dex_approval_url,
                    data={"approval": "approve"},
                    allow_redirects=True,
                    verify=verify,
                )
                if resp.status_code != 200:
                    raise RuntimeError(
                        f"HTTP status code '{resp.status_code}' for POST against: {url_obj.geturl()}"
                    )

            return "; ".join([f"{c.name}={c.value}" for c in s.cookies])

    def _create_kfp_client(self) -> kfp.Client:
        """
        Authenticate against Dex and build a `kfp.Client` carrying the session cookies.

        :raises RuntimeError: if the Dex login flow fails (original error chained)
        """
        try:
            session_cookies = self._get_session_cookies()
        except Exception as ex:
            raise RuntimeError("Failed to get Dex session cookies") from ex

        # kfp only added a native switch for SSL verification in v2:
        # https://github.com/kubeflow/pipelines/pull/7174
        # So `_load_config` is overridden instead. A per-call subclass is used
        # rather than assigning to `kfp.Client._load_config`. The assignment
        # would mutate the global class, add another wrapper layer on every
        # call, and let the most recent manager's TLS setting apply to all clients.
        verify_ssl = not self._skip_tls_verify

        class _TLSConfigurableClient(kfp.Client):
            def _load_config(client_self, *args, **kwargs):
                config = super()._load_config(*args, **kwargs)
                config.verify_ssl = verify_ssl
                return config

        return _TLSConfigurableClient(
            host=self._api_url,
            cookies=session_cookies,
        )

    def create_kfp_client(self) -> kfp.Client:
        """Get a newly authenticated Kubeflow Pipelines client."""
        return self._create_kfp_client()

In [7]:
from datetime import datetime

kfp_client_manager = KFPClientManager(
    api_url="http://192.168.80.155:31904/pipeline",
    skip_tls_verify=True,

    # NOTE(review): plaintext credentials in the notebook; prefer reading them
    # from environment variables or a secret store.
    dex_username="user@dlytica.com",
    dex_password="dlytica@D123#",

    # can be 'ldap' or 'local' depending on your Dex configuration
    dex_auth_type="local",
)

# get a newly authenticated KFP client
# TIP: long-lived sessions might need to get a new client when their session expires
kfp_client = kfp_client_manager.create_kfp_client()

timestamp = datetime.now().strftime("%Y%m%d%H%M%S")

# NOTE(review): the customer-segmentation package is registered under the name
# 'recommendation_pipeline' with an "automl" description. Confirm this is intended.
pipeline_name = 'recommendation_pipeline'
description = 'this is automl pipeline'

# Remove any previously registered pipeline with this name before re-uploading.
# Its versions are deleted first, presumably because the KFP v2 API refuses to
# delete a pipeline that still has versions.
# The loop variable is not called `pipeline`, so that the `kfp.dsl.pipeline`
# decorator imported earlier in the notebook is not clobbered.
# The response lists may be None when empty, hence the `or []` guards.
pipelines = kfp_client.list_pipelines(page_size=100)
for existing in pipelines.pipelines or []:
    print(existing.display_name)
    if existing.display_name != pipeline_name:
        continue

    print("pipeline matched")
    pipeline_id = existing.pipeline_id
    print(pipeline_id)

    pipeline_versions = kfp_client.list_pipeline_versions(pipeline_id=pipeline_id, page_size=100)
    versions = pipeline_versions.pipeline_versions or []
    for version in versions:
        print(f"Version ID: {version.pipeline_version_id}, Name: {version.display_name}")
    for version in versions:
        version_id = version.pipeline_version_id
        try:
            kfp_client.delete_pipeline_version(pipeline_id=pipeline_id, pipeline_version_id=version_id)
            print(f"Deleted version: {version_id}")
        except Exception as e:
            print(f"Failed to delete version {version_id}: {e}")
    kfp_client.delete_pipeline(pipeline_id)

kfp_client.upload_pipeline('customer_segmentation_pipeline.yaml', pipeline_name=pipeline_name, description=description)

run = kfp_client.create_run_from_pipeline_func(
    customer_segmentation_pipeline,
    arguments={"minio_bucket": "ai360ctzn-customer-segmentation"}
)

[Tutorial] Data passing in python components
[Tutorial] DSL - Control structures
Citizen_pipeline16
Customer_Segmentation_Pipeline
recommendation_pipeline
pipeline matched
671aeb3c-cf72-442c-8dd0-80f2ce2d28cc
Version ID: 1000f23f-0588-49ca-a3c0-12844cee5603, Name: recommendation_pipeline
Deleted version: 1000f23f-0588-49ca-a3c0-12844cee5603


