In [1]:
# Execute distrt_pipeline.py in a separate Python process through the shell.
# This runs whichever `python` is first on the shell's PATH, not the notebook kernel itself.
# NOTE(review): the script's effect when run directly is not visible from this notebook —
# presumably it validates or compiles the pipeline definition; confirm.
!python distrt_pipeline.py

In [2]:
from kfp.compiler import Compiler
from kfp.client import Client
from distrt_pipeline import madrigal_pipeline
import time

# --- Configuration -----------------------------------------------------------
# The API endpoint is reached through a local port-forward:
#   kubectl -n kubeflow port-forward svc/ml-pipeline 8888 &
KUBEFLOW_PIPELINES_URL = "http://localhost:8888"
PIPELINE_NAME = "madrigal_pipeline"
PIPELINE_FILE = "madrigal_pipeline.yaml"
EXPERIMENT_NAME = "madrigal_experiment"

client = Client(host=KUBEFLOW_PIPELINES_URL)

# --- Compile -----------------------------------------------------------------
# Serialize the pipeline function into the package file that gets uploaded below.
Compiler().compile(pipeline_func=madrigal_pipeline, package_path=PIPELINE_FILE)

# --- Upload (new pipeline or new version) ------------------------------------
# Look the pipeline up by name on the server. Scanning list_pipelines() only
# sees a single page of results, so an existing pipeline beyond that page would
# be missed and the "create" branch would then fail on a duplicate name.
pipeline_id = client.get_pipeline_id(PIPELINE_NAME)

if pipeline_id:
    # The pipeline already exists: add a new version instead of re-creating it.
    print(f"Updating existing pipeline: {PIPELINE_NAME}")
    # Timestamp suffix keeps version names unique across uploads.
    pipeline_version_name = f"{PIPELINE_NAME}_v{int(time.time())}"
    pipeline_version = client.upload_pipeline_version(
        pipeline_package_path=PIPELINE_FILE,
        pipeline_version_name=pipeline_version_name,
        pipeline_id=pipeline_id
    )
    version_id = pipeline_version.pipeline_version_id
else:
    # First upload: create the pipeline, then look up the version it produced.
    print(f"Creating new pipeline: {PIPELINE_NAME}")
    pipeline = client.upload_pipeline(PIPELINE_FILE, pipeline_name=PIPELINE_NAME)
    pipeline_id = pipeline.pipeline_id
    # The v2 pipeline object (the one exposing `pipeline_id` / `display_name`)
    # has no `default_version` attribute, so ask the server for the newest
    # version of the pipeline that was just created.
    pipeline_versions = client.list_pipeline_versions(
        pipeline_id=pipeline_id,
        sort_by="created_at desc",
    ).pipeline_versions or []
    if not pipeline_versions:
        raise RuntimeError(
            f"Pipeline {PIPELINE_NAME!r} was created (id={pipeline_id}) but no version was found"
        )
    version_id = pipeline_versions[0].pipeline_version_id

# --- Experiment --------------------------------------------------------------
# create_experiment() returns the existing experiment when one with this name
# is already present, so no manual (and page-limited) list scan is needed.
experiment = client.create_experiment(name=EXPERIMENT_NAME)

Updating existing pipeline: madrigal_pipeline




In [4]:
# Start a run of the uploaded pipeline, pinned to the exact version obtained above.
print(f"Launching pipeline run for pipeline_id={pipeline_id}, version_id={version_id}")

# Gather the call arguments first so the request is easy to inspect or tweak.
run_request = {
    "experiment_id": experiment.experiment_id,
    "job_name": f"{PIPELINE_NAME}_run",
    "pipeline_id": pipeline_id,
    "version_id": version_id,
    # Caching is switched off for this run.
    "enable_caching": False,
}
run = client.run_pipeline(**run_request)

print(f"✅ Pipeline run started: {run.run_id}")

Launching pipeline run for pipeline_id=e9e47a65-41af-4733-b5f5-a459bc5b478e, version_id=cb6e77a3-2141-4c11-9042-dd5b1760e3f9


✅ Pipeline run started: 5fbf1eb2-77c2-4169-8181-617d0fd72c18
