# Demo KFP pipeline

Install requirements:

In [None]:
%%bash

pip install kfp~=1.8.14

Imports:

In [1]:
import warnings
warnings.filterwarnings("ignore")

import kfp
import kfp.dsl as dsl
from kfp.aws import use_aws_secret
from kfp.v2.dsl import (
    component,
    Input,
    Output,
    Dataset,
)

## 1. Connect to client

The default way of accessing Kubeflow is via port-forward. This enables you to get started quickly without imposing any requirements on your environment. Run the following to port-forward Istio's Ingress-Gateway to local port `8080`:

```sh
kubectl port-forward svc/istio-ingressgateway -n istio-system 8080:80
```

In [2]:
import re
import requests
from urllib.parse import urlsplit

def get_istio_auth_session(
    url: str,
    username: str,
    password: str,
    auth_type: str = "local",
    timeout: float = 30.0,
) -> dict:
    """
    Determine if the specified URL is secured by Dex and try to obtain a session cookie.
    WARNING: only Dex `staticPasswords` and `LDAP` authentication are currently supported
             (we default to using `staticPasswords` if both are enabled)

    :param url: Kubeflow server URL, including protocol
    :param username: Dex `staticPasswords` or `LDAP` username
    :param password: Dex `staticPasswords` or `LDAP` password
    :param auth_type: path segment appended to `/auth` when Dex asks us to pick an auth type:
                      "local" selects `staticPasswords` (default), "ldap" selects `LDAP`
    :param timeout: seconds to wait on each HTTP request before giving up, so that a stalled
                    connection cannot block forever (pass `None` to wait indefinitely)
    :return: auth session information
    :raises RuntimeError: if a GET does not end with HTTP 200, or if the login POST is not
                          followed by a redirect (credentials were probably rejected)
    """
    # define the default return object
    auth_session = {
        "endpoint_url": url,    # KF endpoint URL
        "redirect_url": None,   # KF redirect URL, if applicable
        "dex_login_url": None,  # Dex login URL (for POST of credentials)
        "is_secured": None,     # True if KF endpoint is secured
        "session_cookie": None  # Resulting session cookies in the form "key1=value1; key2=value2"
    }

    # use a persistent session (for cookies)
    with requests.Session() as s:

        ################
        # Determine if Endpoint is Secured
        ################
        resp = s.get(url, allow_redirects=True, timeout=timeout)
        if resp.status_code != 200:
            raise RuntimeError(
                f"HTTP status code '{resp.status_code}' for GET against: {url}"
            )

        auth_session["redirect_url"] = resp.url

        # if we were NOT redirected, then the endpoint is UNSECURED
        if len(resp.history) == 0:
            auth_session["is_secured"] = False
            return auth_session

        auth_session["is_secured"] = True

        ################
        # Get Dex Login URL
        ################
        redirect_url_obj = urlsplit(auth_session["redirect_url"])

        # if we are at `/auth?=xxxx` path, we need to select an auth type
        if re.search(r"/auth$", redirect_url_obj.path):
            # a callable replacement keeps `auth_type` literal (no backslash/group expansion)
            redirect_url_obj = redirect_url_obj._replace(
                path=re.sub(
                    r"/auth$",
                    lambda _match: f"/auth/{auth_type}",
                    redirect_url_obj.path,
                )
            )

        # if we are at `/auth/xxxx/login` path, then no further action is needed (we can use it for login POST)
        if re.search(r"/auth/.*/login$", redirect_url_obj.path):
            auth_session["dex_login_url"] = redirect_url_obj.geturl()

        # else, we need to be redirected to the actual login page
        else:
            # this GET should redirect us to the `/auth/xxxx/login` path
            resp = s.get(redirect_url_obj.geturl(), allow_redirects=True, timeout=timeout)
            if resp.status_code != 200:
                raise RuntimeError(
                    f"HTTP status code '{resp.status_code}' for GET against: {redirect_url_obj.geturl()}"
                )

            # set the login url
            auth_session["dex_login_url"] = resp.url

        ################
        # Attempt Dex Login
        ################
        resp = s.post(
            auth_session["dex_login_url"],
            data={"login": username, "password": password},
            allow_redirects=True,
            timeout=timeout,
        )
        # a successful login redirects away from the login page; no redirect means we stayed on it
        if len(resp.history) == 0:
            raise RuntimeError(
                f"Login credentials were probably invalid - "
                f"No redirect after POST to: {auth_session['dex_login_url']}"
            )

        # store the session cookies in a "key1=value1; key2=value2" string
        auth_session["session_cookie"] = "; ".join(f"{c.name}={c.value}" for c in s.cookies)

    return auth_session

In [3]:
import os

import kfp

# Connection settings come from the environment so that real credentials never have to be
# written into the notebook. The fallbacks are the port-forwarded endpoint and the default
# user's credentials, so the notebook still runs unchanged against a local demo install.
KUBEFLOW_ENDPOINT = os.environ.get("KUBEFLOW_ENDPOINT", "http://localhost:8080")
KUBEFLOW_USERNAME = os.environ.get("KUBEFLOW_USERNAME", "user@example.com")
KUBEFLOW_PASSWORD = os.environ.get("KUBEFLOW_PASSWORD", "12341234")

# Log in (if the endpoint is secured) and collect the resulting session cookies.
auth_session = get_istio_auth_session(
    url=KUBEFLOW_ENDPOINT,
    username=KUBEFLOW_USERNAME,
    password=KUBEFLOW_PASSWORD
)

# Strip a trailing slash so an endpoint such as "http://host/" does not yield "//pipeline".
client = kfp.Client(
    host=f"{KUBEFLOW_ENDPOINT.rstrip('/')}/pipeline",
    cookies=auth_session["session_cookie"],
)
# Uncomment to check that the client can reach the Pipelines API:
# print(client.list_experiments())

## 2. Components

There are different ways to define components in KFP. Here, we use the **@component** decorator to define the components as Python function-based components.

The **@component** annotation converts the function into a factory function that creates pipeline steps that execute this function. This example also specifies the base container image to run your component in.

Pull data component:

In [5]:
@component(
    base_image="python:3.10",
    packages_to_install=["pandas~=1.4.2"],
)
def pull_data(url: str, data: Output[Dataset]):
    """
    Pull data component.

    Reads the semicolon-separated CSV found at `url` and writes it, without the
    row index, as a comma-separated CSV to the path of the `data` output artifact.
    """
    import pandas as pd

    df = pd.read_csv(url, sep=";")
    # `index` is documented as a bool; pass False explicitly instead of relying on
    # None happening to be falsy.
    df.to_csv(data.path, index=False)

Distributed processing component:

In [36]:
@component(
    base_image="python:3.8.13",
    packages_to_install=["pandas~=1.4.2", "scikit-learn~=1.0.2", "ray==2.2.0", "pydantic<2"],
)
def distributed_processing(
    data: Input[Dataset],
    processed_data: Output[Dataset],
):
    """
    Distributed processing component.

    Reads the CSV behind the `data` input artifact, standardizes it chunk by chunk
    as parallel tasks on a remote Ray cluster, and writes the combined result as CSV
    (without the row index) to the path of the `processed_data` output artifact.
    """
    import ray
    import pandas as pd
    from sklearn.preprocessing import StandardScaler
    import numpy as np

    # Load dataset
    df = pd.read_csv(data.path)

    # Initialize Ray; the runtime environment asks for scikit-learn on the Ray side
    runtime_env = {
        "pip": ["scikit-learn~=1.0.2"],
    }
    ray.init(
        address="ray://raycluster-kuberay-head-svc.default.svc.cluster.local:10001",
        runtime_env=runtime_env,
    )
    try:
        print(ray.cluster_resources())

        # Define a Ray task for normalization
        # NOTE: every chunk is fitted with its own StandardScaler, so values are
        # standardized relative to their chunk, not relative to the full dataset.
        @ray.remote
        def normalize_data(chunk):
            scaler = StandardScaler()
            return scaler.fit_transform(chunk)

        # One chunk per currently available CPU. `.get` covers the case where no CPU
        # is reported as available, and the min/max clamp keeps the chunk count between
        # 1 and the number of rows, so no chunk is empty when CPUs outnumber rows.
        available_cpus = int(ray.available_resources().get("CPU", 1))
        num_chunks = max(1, min(available_cpus, len(df)))
        df_chunks = np.array_split(df.to_numpy(), num_chunks)

        # Distribute chunks across Ray cluster
        future_results = [normalize_data.remote(chunk) for chunk in df_chunks]

        # Gather results (blocks until every task has finished)
        normalized_chunks = ray.get(future_results)
    finally:
        # Shut down Ray even if a task failed, so the client connection is not leaked
        ray.shutdown()

    # Combine the chunks in their original order and restore the column names
    processed_df = pd.DataFrame(np.concatenate(normalized_chunks), columns=df.columns)
    processed_df.to_csv(processed_data.path, index=False)

## 3. Pipeline

Pipeline definition:

In [37]:
@dsl.pipeline(
    name="demo-ray-kfp-pipeline",
    description="An example pipeline with Ray for distributed processing.",
)
def pipeline(url: str):
    # Step 1: pull the dataset located at `url`.
    pulled = pull_data(url=url)
    # Step 2: feed the "data" output of step 1 into the Ray-backed processing step.
    distributed_processing(data=pulled.outputs["data"])

Pipeline arguments:

In [38]:
# Specify pipeline argument values, keyed by the pipeline function's parameter names.
# `url` is passed to `pull_data`, which reads it as a semicolon-separated CSV.
arguments = {
    "url": "http://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv",
}

## 4. Submit run

In [39]:
# Names under which this submission is filed.
experiment_name = "demo-ray-kfp-experiment"
run_name = "demo-run-ray-kfp"

# Submit the pipeline function with the argument values defined above.
# Left as the last expression so that the notebook displays the returned run result.
client.create_run_from_pipeline_func(
    pipeline_func=pipeline,
    arguments=arguments,
    experiment_name=experiment_name,
    run_name=run_name,
    namespace="kubeflow-user-example-com",
    mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE,
    enable_caching=True,
)

RunPipelineResult(run_id=43d75315-e3dc-448c-ae8a-59c916a88e71)

## 5. Check run

### Kubeflow Pipelines UI

The default way of accessing Kubeflow is via port-forward. This enables you to get started quickly without imposing any requirements on your environment. Run the following to port-forward Istio's Ingress-Gateway to local port `8080`:

```sh
kubectl port-forward svc/istio-ingressgateway -n istio-system 8080:80
```

After running the command, you can access the Kubeflow Central Dashboard by doing the following:

1. Open your browser and visit [http://localhost:8080/](http://localhost:8080/). You should get the Dex login screen.
2. Log in with the default user's credentials. The default email address is `user@example.com` and the default password is `12341234`.

### Access Ray dashboard

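Run a port-forward from your local computer to the Ray dashboard on the Ray head pod (the suffix of the pod name differs between clusters, so substitute your own, e.g. as listed by `kubectl get pods -n default`):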

`kubectl port-forward raycluster-kuberay-head-862db 8265:8265 -n default`

Go to http://localhost:8265/ and you should see the Ray dashboard.