In [1]:
!pip install -r requirements.txt -q

[0m

In [2]:
import kfp
from kfp import dsl
from kfp.components import (
    InputPath,
    InputTextFile,
    OutputPath,
    OutputTextFile,
    func_to_container_op,
)

import pandas as pd
from typing import NamedTuple

import sys

sys.path.insert(0, "..")
from constants import NAMESPACE, HOST, NAMESPACE
from utils import get_session_cookie, get_or_create_experiment, get_or_create_pipeline

In [3]:
# Name of the KFP experiment under which every run of this pipeline is grouped.
EXPERIMENT_NAME: str = "underwrite-model-boosting"

In [None]:
# The first component: download the dataset, split it into train/validation/test
# sets, and dump every split for downstream components to use.
def prepare_data(
    url: str,
    X_train_path: OutputPath("PKL"),
    y_train_path: OutputPath("PKL"),
    X_val_path: OutputPath("PKL"),
    y_val_path: OutputPath("PKL"),
    X_test_path: OutputPath("PKL"),
    y_test_path: OutputPath("PKL"),
):
    """Download a CSV dataset and split it into train, validation and test sets.

    The function body runs inside its own container (see ``prepare_data_op``),
    which is why every import is local to the function. Only packages listed in
    ``prepare_data_op``'s ``packages_to_install`` are available there.

    Args:
        url: URL of a CSV file containing a ``price`` column. ``price`` becomes
            the target ``y``; every other column is a feature in ``X``.
        X_train_path, y_train_path, X_val_path, y_val_path, X_test_path,
        y_test_path: Output file paths supplied by KFP. Each split is
            serialized to its path with ``joblib.dump``.

    Split sizes: 10% of the rows go to the test set, then 10% of the remaining
    rows go to the validation set (roughly 81% / 9% / 10% train / val / test).
    ``random_state=42`` makes both splits reproducible.
    """
    import joblib
    import pandas as pd
    import wget
    from sklearn.model_selection import train_test_split

    # wget.download returns the name of the file it actually wrote, so read that
    # instead of assuming the URL always points at "housing.csv".
    csv_file = wget.download(url)

    # Create X and y
    df = pd.read_csv(csv_file)
    X = df.drop(columns=["price"])
    y = df["price"]

    # Hold out the test set first ...
    X_train, X_test, y_train, y_test = train_test_split(
        X, y, test_size=0.1, random_state=42
    )

    # ... then carve the validation set out of what remains for training.
    X_train, X_val, y_train, y_val = train_test_split(
        X_train, y_train, test_size=0.1, random_state=42
    )

    # Dump each split to the path KFP allocated for it.
    splits = (
        (X_train, X_train_path),
        (y_train, y_train_path),
        (X_val, X_val_path),
        (y_val, y_val_path),
        (X_test, X_test_path),
        (y_test, y_test_path),
    )
    for data, path in splits:
        joblib.dump(data, path)


# Wrap prepare_data as a KFP component that runs in its own container.
# create_component_from_func accepts the same arguments and could be used here
# instead of func_to_container_op.
prepare_data_op = func_to_container_op(
    func=prepare_data,
    # The pinned pandas 2.1 / numpy 1.26 require Python >= 3.9, so choose the
    # image explicitly instead of relying on the SDK's default base image.
    base_image="python:3.10",
    # Installed into the container before prepare_data runs. Keep the trailing
    # commas: adjacent string literals would otherwise be silently concatenated
    # into one bogus requirement.
    packages_to_install=[
        "scikit-learn==1.3.2",
        "joblib==1.3.2",
        "pandas==2.1.3",
        "numpy==1.26.2",
        "wget==3.2",
    ],
)

In [2]:
@kfp.dsl.component
def download_csv_op(pvc_path: kfp.dsl.OutputPath(str)):
    """Copy ``data/application_train.csv`` from the ``sample-data`` bucket to ``pvc_path``.

    The object is fetched through the S3 API of the MinIO service, using boto3.

    Connection settings are read from the component container's environment,
    so credentials can be injected (e.g. from a Kubernetes secret) instead of
    living in source control:

    - ``MINIO_ENDPOINT``: defaults to the in-cluster ``minio-service`` URL.
    - ``MINIO_ACCESS_KEY`` / ``MINIO_SECRET_KEY``: default to ``minio`` /
      ``minio123``, the values previously hardcoded here.

    TODO(review): drop the credential fallbacks once the env vars are wired in.
    NOTE(review): ``boto3`` is imported inside the component, so the image it
    runs in must provide it. Confirm the base image, or declare it via
    ``packages_to_install``.
    """
    import boto3
    import os

    s3 = boto3.client(
        's3',
        endpoint_url=os.environ.get(
            'MINIO_ENDPOINT',
            'http://minio-service.kubeflow.svc.cluster.local:9000',
        ),
        aws_access_key_id=os.environ.get('MINIO_ACCESS_KEY', 'minio'),
        aws_secret_access_key=os.environ.get('MINIO_SECRET_KEY', 'minio123'),
    )

    # Make sure the parent directory of the output file exists before writing.
    # os.makedirs('') raises, so skip this when pvc_path has no directory part.
    out_dir = os.path.dirname(pvc_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    s3.download_file('sample-data', 'data/application_train.csv', pvc_path)
