In [1]:
import json
import kfp
from kfp.components import InputPath, OutputPath
import kfp.dsl as dsl
from kfp.dsl import PipelineConf, data_passing_methods
from kubernetes.client.models import V1Volume, V1PersistentVolumeClaimVolumeSource
import os
from pydoc import importfile
import requests
from tensorflow import keras
from typing import List


%load_ext lab_black

In [2]:
# One shared Kubeflow Pipelines client for every submission in this notebook.
KFP_CLIENT = kfp.Client()

# In-cluster, Kubernetes mounts the pod's namespace at this path. strip()
# guards against a trailing newline ending up inside the namespace string.
with open(
    "/var/run/secrets/kubernetes.io/serviceaccount/namespace", encoding="utf-8"
) as f:
    NAMESPACE = f.read().strip()
NAMESPACE

'user-example-com'

In [3]:
# Path to the Trino component spec, relative to the notebook's working directory.
component = os.path.join("trino", "component.yaml")
# Component factory built from the YAML spec; the pipeline functions below
# call it to add a Trino load step.
load_dataframe_via_trino_comp = kfp.components.load_component_from_file(component)

In [5]:
@dsl.pipeline(
    name="Component Test - Load Dataframe via Jtopen Trino",
    description="A simple component test",
)
def train_pipeline():
    """Single-step pipeline exercising the Trino component on the fraud demo table."""
    fraud_query = "SELECT * FROM jtopen.demo.fraud limit 3"
    fraud_columns_query = "show columns from jtopen.demo.fraud"
    # The column names come from columns_query, so no explicit column list is passed.
    load_dataframe_via_trino_comp(
        columns_query=fraud_columns_query,
        columns=None,
        query=fraud_query,
    )

In [6]:
# Submit the component test as a one-off run in the user's namespace;
# train_pipeline declares no parameters, so the argument map is empty.
KFP_CLIENT.create_run_from_pipeline_func(
    pipeline_func=train_pipeline,
    namespace=NAMESPACE,
    arguments={},
)

RunPipelineResult(run_id=0f97d5ab-0148-4a17-bdfb-d9fc0a6134da)

## Test Simple Fraud Pipeline

In [4]:
# Pipeline arguments for the debug run below; the keys match the parameters of
# debug_pipeline. Cluster-specific values are taken from environment variables,
# and every value is a string (literal or os.getenv result).
ARGUMENTS = {
    # Name of the "blackboard" volume; also used to build the PVC claim name.
    "blackboard": "artefacts",
    "model_name": "fraud-detection",
    "cluster_configuration_secret": os.getenv(
        "CLUSTER_CONFIGURATION_SECRET", default=""
    ),
    "training_gpus": os.getenv("TRAINING_GPUS", default="1"),
    "training_node_selector": os.getenv("TRAINING_NODE_SELECTOR", default=""),
}

# NAMESPACE is defined once, by the client-setup cell at the top of the notebook.

In [7]:
def disable_cache_transformer(op):
    """Op transformer that sets a zero max cache staleness ("P0D") on each op.

    A ``dsl.ContainerOp`` gets the setting through its execution options.
    Any other op type receives the equivalent
    ``pipelines.kubeflow.org/max_cache_staleness`` pod annotation instead.

    Args:
        op: the pipeline op being transformed.

    Returns:
        The same op object, modified in place.
    """
    staleness = "P0D"
    if not isinstance(op, dsl.ContainerOp):
        op.add_pod_annotation(
            name="pipelines.kubeflow.org/max_cache_staleness", value=staleness
        )
        return op
    op.execution_options.caching_strategy.max_cache_staleness = staleness
    return op


pipeline_conf = PipelineConf()
# Every op compiled with this config passes through the cache-disabling transformer.
pipeline_conf.add_op_transformer(disable_cache_transformer)

# PVC claim name "<workflow name>-<blackboard>". "{{workflow.name}}" is an Argo
# template variable and is deliberately left unresolved here.
vol_claim = V1PersistentVolumeClaimVolumeSource(
    claim_name="{{workflow.name}}-%s" % ARGUMENTS["blackboard"]
)
print(vol_claim)

# Disabled alternative: pass data between steps through a Kubernetes volume
# backed by the claim above. Uncomment to enable.
# pipeline_conf.data_passing_method = data_passing_methods.KubernetesVolume(
#     volume=V1Volume(
#         name=ARGUMENTS["blackboard"],
#         persistent_volume_claim=vol_claim,
#     ),
#     path_prefix=f'{ARGUMENTS["blackboard"]}/',
# )

{'claim_name': '{{workflow.name}}-artefacts', 'read_only': None}


In [8]:
@dsl.pipeline(
    name="Debug Pipeline",
    description="A debugging pipeline",
)
def debug_pipeline(
    blackboard: str,
    model_name: str,
    cluster_configuration_secret: str,
    training_gpus: int,
    training_node_selector: str,
):
    """Debug pipeline: a single Trino load step over the fraud demo table.

    The parameters mirror the keys of ``ARGUMENTS`` so that dict can be
    submitted as-is; none of them is consumed by the step yet.
    """
    trino_kwargs = {
        "query": "SELECT * FROM  jtopen.demo.fraud LIMIT 3",
        "columns": None,
        "columns_query": "SHOW COLUMNS FROM jtopen.demo.fraud",
    }
    load_dataframe_via_trino_comp(**trino_kwargs)


# Submit the debug pipeline with the shared client created at the top of the
# notebook, rather than constructing a second kfp.Client. pipeline_conf applies
# the cache-disabling transformer.
KFP_CLIENT.create_run_from_pipeline_func(
    debug_pipeline,
    arguments=ARGUMENTS,
    namespace=NAMESPACE,
    pipeline_conf=pipeline_conf,
)

RunPipelineResult(run_id=4e398caa-e149-45b0-95eb-592eb77e252e)