In [1]:
!pip install kfp


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.0[0m[39;49m -> [0m[32;49m25.1.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [1]:
from kfp import dsl
import kfp


In [8]:
@dsl.component
def add(a: float, b: float) -> float:
    """Calculates sum of two arguments.

    Args:
        a: First addend.
        b: Second addend.

    Returns:
        The value of ``a + b``.
    """
    total = a + b
    return total

In [10]:
@dsl.pipeline(
    name='kubeflow 101 pipeline',
    description='An example pipeline that performs addition calculations.',
)
def add_pipeline(a: float = 1.0, b: float = 7.0):
    # Chain three `add` tasks: (a + 4.0), then (+ b), then (+ 1.0).
    # Each task consumes the single output of the previous one.
    a_plus_four = add(a=a, b=4.0)
    running_total = add(a=a_plus_four.output, b=b)
    # The last task's output is not consumed by anything in this pipeline.
    add(a=running_total.output, b=1.0)

In [11]:
# Client bound to the Kubeflow Pipelines endpoint at localhost:8080.
client = kfp.Client(
    host='http://localhost:8080',
)

In [12]:
# Submit a one-off run of the addition pipeline, overriding both default inputs.
client.create_run_from_pipeline_func(
    add_pipeline,
    arguments={'a': 7.0, 'b': 8.0},
)

RunPipelineResult(run_id=e25e27ad-99fa-40c0-82dc-71241f6ab4ed)

## Iris dataset

In [47]:
from kfp import dsl
from kfp import compiler
from typing import NamedTuple
import kfp

@dsl.component(
    base_image='python:3.9-slim',
    packages_to_install=['scikit-learn'],
)
def preprocess_data() -> NamedTuple('Outputs', [
    ('X_train', list),
    ('X_test', list),
    ('y_train', list),
    ('y_test', list),
]):
    """Load the Iris dataset and split it into train and test partitions.

    scikit-learn is declared through ``packages_to_install`` on the decorator
    so the dependency is part of the component definition instead of being
    installed by shelling out to pip from inside the function body.

    Returns:
        A 4-tuple ``(X_train, X_test, y_train, y_test)`` of plain Python
        lists (each converted with ``.tolist()``), in the same order as the
        fields of the declared ``Outputs`` NamedTuple. 20% of the samples are
        held out for testing and the split is seeded with ``random_state=42``.
    """
    # Imports live inside the body, as in the original component definition.
    from sklearn.datasets import load_iris
    from sklearn.model_selection import train_test_split

    iris = load_iris()
    X_train, X_test, y_train, y_test = train_test_split(
        iris.data, iris.target, test_size=0.2, random_state=42
    )
    return X_train.tolist(), X_test.tolist(), y_train.tolist(), y_test.tolist()


@dsl.component(
    base_image='python:3.9-slim',
    packages_to_install=['scikit-learn'],
)
def train_model(X_train: list, y_train: list) -> str:
    """Fit a random-forest classifier on the training data and pickle it.

    scikit-learn is declared through ``packages_to_install`` on the decorator
    rather than installed with ``subprocess`` from inside the body. pandas is
    no longer installed: the original only used it to build two DataFrames
    that were never read.

    Args:
        X_train: Training feature rows, passed straight to ``clf.fit``.
        y_train: Training labels, passed straight to ``clf.fit``.

    Returns:
        The constant status string ``"Pipeline successful"``.
    """
    import pickle

    from sklearn.ensemble import RandomForestClassifier

    # Seed the forest so repeated runs build the same model, using the same
    # seed value as the train/test split in preprocess_data.
    clf = RandomForestClassifier(random_state=42)
    clf.fit(X_train, y_train)

    # NOTE(review): this path is local to wherever the component executes and
    # nothing here publishes the file as a pipeline output, so downstream
    # steps presumably cannot read it. Consider exposing the model as a KFP
    # output artifact if an evaluation step is added.
    model_path = "/tmp/model.pkl"
    with open(model_path, "wb") as f:
        pickle.dump(clf, f)
    print("Training done.....!")
    return "Pipeline successful"



In [48]:
@dsl.pipeline(
    name="iris-ds-pipeline",
    description="Data Science Pipeline for Iris Dataset",
)
def iris_pipeline():
    # Stage 1: load and split the dataset.
    split_task = preprocess_data()
    # Stage 2: train on the training partition produced by stage 1.
    train_model(
        X_train=split_task.outputs["X_train"],
        y_train=split_task.outputs["y_train"],
    )
    # TODO: add an evaluation step that consumes the "X_test" and "y_test"
    # outputs of the split task once an evaluate_model component exists.

In [49]:
from kfp import compiler

# Compile the Iris pipeline function into the package file iris_pipeline.yaml.
compiler.Compiler().compile(
    pipeline_func=iris_pipeline,
    package_path='iris_pipeline.yaml',
)


In [50]:
import kfp

# Connect to the Kubeflow Pipelines API; adjust the host if it is served elsewhere.
client = kfp.Client(host='http://localhost:8080')

# Register the compiled package with the server under a fixed pipeline name.
pipeline = client.upload_pipeline(
    pipeline_package_path='iris_pipeline.yaml',
    pipeline_name='Iris-Pipeline-6',
)
