In [1]:
import os
import time
import datetime
import warnings

from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp_client_manager import KFPClientManager
import kfp.components as comps
import kfp.dsl as dsl
from kfp import compiler
from kfp import kubernetes

In [2]:
warnings.simplefilter(action="ignore", category=FutureWarning)

In [3]:
# Initialize a KFPClientManager.
#
# Connection settings and Dex credentials are read from the environment so
# real secrets never have to be committed with the notebook. Each fallback is
# the value that used to be hardcoded here, so the cell behaves exactly as
# before when none of the variables are set.
kfp_client_manager = KFPClientManager(
    api_url=os.environ.get("KFP_API_URL", "http://localhost:8080/pipeline"),
    skip_tls_verify=True,
    dex_username=os.environ.get("DEX_USERNAME", "user@example.com"),
    dex_password=os.environ.get("DEX_PASSWORD", "12341234"),
    # can be 'ldap' or 'local' depending on your Dex configuration
    dex_auth_type=os.environ.get("DEX_AUTH_TYPE", "local"),
    namespace=os.environ.get("KFP_NAMESPACE", "kubeflow-user-example-com"),
)

In [4]:
# Build the KFP client from the manager configured above; every later cell
# that talks to the Pipelines API goes through this object.
kfp_client = kfp_client_manager.create_kfp_client()

In [5]:
# Connectivity check: list the experiments returned by the API for this client
# and print them.
experiments = kfp_client.list_experiments()
print(experiments)

{'experiments': [{'created_at': datetime.datetime(2025, 2, 19, 18, 38, 17, tzinfo=tzutc()),
                  'description': None,
                  'display_name': 'csv_cleaner',
                  'experiment_id': '93b6988a-cc6c-4735-967f-68b01fac8dc6',
                  'namespace': 'kubeflow-user-example-com',
                  'storage_state': 'AVAILABLE'},
                 {'created_at': datetime.datetime(2025, 2, 19, 19, 10, 37, tzinfo=tzutc()),
                  'description': None,
                  'display_name': 'show_dataframe',
                  'experiment_id': 'b89a6cc8-bb0e-4efd-9b62-5e922a5c7e38',
                  'namespace': 'kubeflow-user-example-com',
                  'storage_state': 'AVAILABLE'},
                 {'created_at': datetime.datetime(2025, 2, 19, 22, 51, 52, tzinfo=tzutc()),
                  'description': None,
                  'display_name': 'dataframe',
                  'experiment_id': '4f449500-90af-4de8-a90b-ee7239cb20cb',
                

In [7]:
# Component specification files, given relative to the notebook's working
# directory. One constant per pipeline step.
DOWNLOAD_COMP_YML = "./components/download/component.yml"  # CSV download step
WRITE_S3_COMP_YML = "./components/persist/component.yml"   # S3 persist step
TRAIN_COMP_YML = "./components/train/component.yml"        # training step
PREDICT_COMP_YML = "./components/predict/component.yml"    # prediction step

In [8]:
# Turn each component spec file into a callable component factory. The specs
# are loaded in the same order as the names on the left-hand side.
(
    csv_download_op,
    write_s3_op,
    xgboost_train_on_csv_op,
    xgboost_predict_on_csv_op,
) = (
    comps.load_component_from_file(spec_path)
    for spec_path in (
        DOWNLOAD_COMP_YML,
        WRITE_S3_COMP_YML,
        TRAIN_COMP_YML,
        PREDICT_COMP_YML,
    )
)

In [12]:
@dsl.pipeline(name="iris-classification", description="Train and predict pipeline")
def csv_cleaner_pipeline(persist_s3: bool):
    """Download a CSV, optionally persist copies of it to S3, then train and predict.

    Steps:
      1. Run the download component (caching disabled) and take its
         ``csv_file`` output.
      2. If ``persist_s3`` is true, fan out over five object keys
         (``ml_platform/iris_0.csv`` .. ``ml_platform/iris_4.csv``), at most
         three at a time, writing the CSV to each key.
      3. Train an XGBoost model on the CSV and run prediction on the same CSV
         with the trained model.

    Args:
        persist_s3: Whether the S3 persistence branch should run.
    """
    download_task = (
        csv_download_op()
        .set_caching_options(enable_caching=False)
        # Example of injecting an env variable into the comp container
        .set_env_variable(name="ENV", value="anything")
    )
    training_data_csv = download_task.outputs["csv_file"]

    # The explicit `== True` is intentional: `persist_s3` is a pipeline
    # parameter placeholder here, and the condition is built from the
    # comparison expression rather than from Python truthiness.
    with dsl.If(persist_s3 == True):  # noqa: E712
        keys = [f"ml_platform/iris_{i}.csv" for i in range(5)]

        with dsl.ParallelFor(keys, parallelism=3) as key:
            save_csv = (
                write_s3_op(csv_file=training_data_csv, key=key)
                .set_caching_options(enable_caching=False)
            )

            # Inject S3 credentials into the comp container as env variables.
            # All four entries come from the same secret, so a single call
            # with one mapping replaces four near-identical calls.
            # NOTE(review): per the kfp-kubernetes docs the mapping is
            # {secret data key: env var name}; as written the container would
            # see env vars named `bucket`, `access_key`, ... -- confirm this
            # matches what the persist component reads.
            kubernetes.use_secret_as_env(
                save_csv,
                secret_name="s3-credentials",
                secret_key_to_env={
                    'S3_BUCKET': 'bucket',
                    'AWS_ACCESS_KEY_ID': 'access_key',
                    'AWS_SECRET_ACCESS_KEY': 'secret_key',
                    'S3_ENDPOINT_URL': 'endpoint_url',
                },
            )

    train_task = xgboost_train_on_csv_op(
        training_data=training_data_csv,
        label_column=4,
        objective='multi:softmax',
        num_iterations=200,
    ).set_memory_limit('1Gi')
    model_trained_on_csv = train_task.outputs['model']

    # Prediction runs on the same CSV that was used for training.
    xgboost_predict_on_csv_op(
        data=training_data_csv,
        model=model_trained_on_csv,
        label_column=4,
    ).set_memory_limit('1Gi')
    

In [13]:
# Compile the pipeline function into a package file that can be submitted
# as a run.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    pipeline_func=csv_cleaner_pipeline,
    package_path="iris_s3_pipeline.yml",
)

In [11]:
# A local-time timestamp gives every submission a distinct run name.
ts = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
experiment_name = "csv_to_s3"
run_name = f"_run_{ts}"

# Submit the compiled package; the S3 persistence branch is switched off
# for this run.
run_result = kfp_client.create_run_from_pipeline_package(
    pipeline_file="iris_s3_pipeline.yml",
    experiment_name=experiment_name,
    run_name=run_name,
    arguments={"persist_s3": False},
)