In [208]:
import os
import sys
from typing import NamedTuple

import kfp
import pandas as pd
from kfp import dsl
from kfp.components import (
    InputPath,
    InputTextFile,
    OutputPath,
    OutputTextFile,
    func_to_container_op,
)

sys.path.insert(0, "..")
from constants import NAMESPACE, HOST
from utils import get_session_cookie, get_or_create_experiment, get_or_create_pipeline

In [209]:
# Name of the experiment under which all runs of this pipeline are grouped
EXPERIMENT_NAME = "tts-serving"

# Define pipeline components

In [210]:
# The first component: download the sentences file, parse it into a list
# of sentences and dump that list for downstream components to use
def load_data(
    data_url: str,
    data_serving_path: OutputPath("PKL"),
):
    """Download a text file of sentences and dump them for downstream steps.

    Args:
        data_url: URL of a plain-text file containing one sentence per line.
        data_serving_path: Output path where the list of sentences is
            written with ``joblib.dump``.
    """
    # Imported here rather than at module level so the function stays
    # self-contained when it is packaged as a component.
    import wget
    import joblib

    # wget.download returns the name of the file it actually wrote. Read that
    # file instead of assuming it is called "sentences.txt": the name depends
    # on the URL and on whether a file with that name already exists.
    local_path = wget.download(data_url)

    # Load data file; decode explicitly so non-ASCII sentences do not depend
    # on the container's default locale.
    with open(local_path, "r", encoding="utf-8") as f:
        stripped_lines = (line.strip() for line in f)
        # Skip blank lines so empty strings are never passed downstream
        sentences = [line for line in stripped_lines if line]

    # Dump data to pkl for downstream components to use
    joblib.dump(sentences, data_serving_path)

    print(sentences)
# Turn load_data into a pipeline component. func_to_container_op is
# used here as an alternative to create_component_from_func.
load_data_op = func_to_container_op(
    func=load_data,
    # Pinned versions of the packages that load_data imports
    packages_to_install=[
        "joblib==1.1.0",
        "wget==3.2"
    ],
)

In [211]:
def serving(
    data_serving_path: InputPath("PKL"),
):
    """Send each sentence to the TTS model and forward its prediction.

    For every sentence in the dumped list, post it to the prediction
    endpoint, then post the JSON prediction to the processing endpoint and
    print that second response.

    Args:
        data_serving_path: Path to the joblib dump holding the list of
            sentences produced by the upstream component.

    Raises:
        requests.HTTPError: If the prediction endpoint answers with an
            error status.
    """
    # Imported here rather than at module level so the function stays
    # self-contained when it is packaged as a component.
    import joblib
    import requests

    predict_url = "http://tts.kserve-deployment.34.170.87.225.sslip.io/predict"
    process_url = "https://f189-27-77-246-74.ngrok-free.app/process"
    headers = {"Content-Type": "application/json"}

    # Define our data for prediction
    sentences = joblib.load(data_serving_path)
    for sentence in sentences:
        # Send the sentence of this iteration, not a fixed element of the list
        json_data = {"Text": sentence}
        response = requests.post(predict_url, json=json_data)
        # Stop on a failed prediction instead of forwarding an error body
        response.raise_for_status()
        status = requests.post(process_url, json=response.json(), headers=headers)
        print(status)
    
# Turn serving into a pipeline component
kserve_op = func_to_container_op(
    func=serving,
    # Pinned versions of the packages that serving imports
    packages_to_install=[
        "joblib==1.1.0",
        "requests==2.31.0",
    ],
)

# Define some pipelines

In [212]:
@dsl.pipeline(
    name="text to speech serving",
    description="send requests to tts models and get response audio.",
)
def tts_serving_pipeline(data_url):
    """Load the sentences found at data_url, then send them to the serving step."""
    # Step 1: fetch and dump the sentences
    sentences_step = load_data_op(data_url=data_url)

    # Step 2: feed the "data_serving" output of step 1 into the serving
    # component; passing the output is what links the two steps
    kserve_op(data_serving=sentences_step.outputs["data_serving"])
    

# Run the pipelines

In [213]:
# Get the session cookie used to authenticate to the `ml-pipeline` service
session_cookie = get_session_cookie()

# Initialize the client against the `/pipeline` endpoint of HOST, sending
# the cookie as `authservice_session` and scoping requests to NAMESPACE
client = kfp.Client(
    host=f"{HOST}/pipeline",
    cookies=f"authservice_session={session_cookie}",
    namespace=NAMESPACE,
)

In [214]:
# Submit a one-off run straight from the Python pipeline function,
# passing the sentences file URL as the `data_url` pipeline parameter
client.create_run_from_pipeline_func(
    tts_serving_pipeline,
    arguments={
        "data_url": "https://raw.githubusercontent.com/dunghoang369/test/main/sentences.txt"
    },
    experiment_name=EXPERIMENT_NAME,
    namespace=NAMESPACE,
)

RunPipelineResult(run_id=a5af680c-11b5-40dd-a302-c017a4c33ca6)

# Compile the pipelines

In [217]:
# Define the compiled pipeline version; each time
# you change the pipeline, change the version as well
PIPELINE_VERSION = "0.0.1"
PIPELINE_NAME = "tts_serving_pipeline"
PIPELINE_DESCRIPTION = "A pipeline to serve text to speech service"

# Define the path of the compiled pipeline package
pipeline_package_path = (
    f"../compiled_pipelines/{PIPELINE_NAME}_{PIPELINE_VERSION}.yaml"
)

# Create the output directory if it is missing, so that compiling
# also works on a fresh checkout
os.makedirs(os.path.dirname(pipeline_package_path), exist_ok=True)

# Compile the pipeline into a YAML file, you will see it
# is an Argo Workflow object
kfp.compiler.Compiler().compile(
    pipeline_func=tts_serving_pipeline,
    package_path=pipeline_package_path,
)

# Run the pre-compiled pipelines

In [219]:
# Get the existing experiment, or create it if it does not exist yet
experiment = get_or_create_experiment(client, name=EXPERIMENT_NAME, namespace=NAMESPACE)

# Get the pipeline registered under PIPELINE_NAME / PIPELINE_VERSION, or
# create it from the compiled package if it does not exist yet
pipeline = get_or_create_pipeline(
    client,
    pipeline_name=PIPELINE_NAME,
    pipeline_package_path=pipeline_package_path,
    version=PIPELINE_VERSION,
    pipeline_description=PIPELINE_DESCRIPTION,
)

In [220]:
from datetime import datetime

# Run from the compiled pipeline
# Timestamp (YYYYmmddHHMMSS, local time) appended to the job name below
now = datetime.now().strftime("%Y%m%d%H%M%S")  # Get the current time to version the job

# Read the docs here for all possible args
# https://github.com/kubeflow/pipelines/blob/1.8.0/sdk/python/kfp/_client.py
# NOTE(review): `pipeline` comes from get_or_create_pipeline; confirm that its
# `id` is a pipeline *version* id, which is what `version_id` expects.
client.run_pipeline(
    experiment_id=experiment.id,
    job_name=f"{PIPELINE_NAME}-{PIPELINE_VERSION}-{now}",
    version_id=pipeline.id,
    params={
        "data_url": "https://raw.githubusercontent.com/dunghoang369/test/main/sentences.txt"
    },
)

{'created_at': datetime.datetime(2024, 3, 29, 8, 51, 52, tzinfo=tzutc()),
 'description': None,
 'error': None,
 'finished_at': datetime.datetime(1970, 1, 1, 0, 0, tzinfo=tzutc()),
 'id': '6d9cc945-77e6-474d-9ec6-51ef8df64beb',
 'metrics': None,
 'name': 'tts_serving_pipeline-0.0.1-20240329155154',
 'pipeline_spec': {'parameters': [{'name': 'data_url',
                                   'value': 'https://raw.githubusercontent.com/dunghoang369/test/main/sentences.txt'}],
                   'pipeline_id': None,
                   'pipeline_manifest': None,
                   'pipeline_name': None,
                   'runtime_config': None,
                   'workflow_manifest': '{"kind":"Workflow","apiVersion":"argoproj.io/v1alpha1","metadata":{"generateName":"text-to-speech-serving-","creationTimestamp":null,"labels":{"pipelines.kubeflow.org/kfp_sdk_version":"1.8.3"},"annotations":{"pipelines.kubeflow.org/kfp_sdk_version":"1.8.3","pipelines.kubeflow.org/pipeline_compilation_time":"2024

# Create a recurring run

In [None]:
# Don't forget to disable the recurring run once you no longer need it
client.create_recurring_run(
    experiment_id=experiment.id,
    job_name=f"{PIPELINE_NAME}-{PIPELINE_VERSION}-{now}",
    cron_expression="0 0 * * * *",  # hourly
    version_id=pipeline.id,
    # The key must match the pipeline's only parameter, `data_url`, and the
    # value must point at a sentences text file like the one-off runs use
    params={
        "data_url": "https://raw.githubusercontent.com/dunghoang369/test/main/sentences.txt"
    },
)