# In [4]:
import kfp
from kfp.v2 import dsl
from kfp.v2.dsl import (
    component,
    Input,
    Output,
    Artifact,
    Dataset,
    ClassificationMetrics,
    Metrics,
    HTML,
    Markdown
)

from typing import NamedTuple
@component(
    packages_to_install=[],
    base_image="python:3.8"
)
def get_data_batch() -> NamedTuple('MyFunctionOutputs', [('output_name_1', str), ('output_name_2', str)]):
    """Placeholder data-fetching step that emits two fixed string outputs.

    Returns:
        A ``MyFunctionOutputs`` named tuple with ``output_name_1`` set to
        ``"test"`` and ``output_name_2`` set to ``"test2"``.
    """
    # Kept as a function-local import: KFP lightweight components are expected
    # to carry their imports inside the function body (see the KFP SDK docs).
    from collections import namedtuple

    print("getting data")

    # Field names must match the NamedTuple declared in the return annotation.
    outputs = namedtuple('MyFunctionOutputs', ['output_name_1', 'output_name_2'])
    return outputs("test", "test2")

@component(
    # "scikit-learn" is the real PyPI distribution; the "sklearn" alias is
    # deprecated and its installation fails.
    packages_to_install=["scikit-learn"],
    base_image="python:3.8"
)
def get_latest_data(metrics: Output[ClassificationMetrics]):
    """Log a sample confusion matrix to the ``metrics`` output artifact.

    The matrix is computed from two hard-coded label lists (ground truth and
    predictions); no external data is read.

    Args:
        metrics: Output ClassificationMetrics artifact that receives the
            confusion matrix.
    """
    print("Getting latest data")
    from sklearn.metrics import confusion_matrix

    # Hard-coded binary labels used purely as sample data.
    train_y = [1, 0, 0, 1, 0, 1, 0, 0, 1, 0, 1, 0]
    predictions = [1, 1, 0, 1, 0, 1, 1, 0, 1, 0, 0, 0]

    metrics.log_confusion_matrix(
        ['Setosa', 'Versicolour'],
        confusion_matrix(train_y, predictions).tolist()  # .tolist() converts the numpy array to a plain list.
    )


@component(
    base_image="public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow-full:v1.4",
    packages_to_install=[],
)
def reshape_data():
    """Placeholder reshape step; it only prints a progress message."""
    status = "reshaping data"
    print(status)


@component(
    base_image="public.ecr.aws/j1r0q0g6/notebooks/notebook-servers/jupyter-tensorflow-full:v1.4",
    packages_to_install=[],
)
def model_building(no_epochs: int):
    """Placeholder model-building step.

    Prints a progress message, then the received ``no_epochs`` value and its
    runtime type, so the parameter passing can be inspected in the step logs.

    Args:
        no_epochs: Number of epochs passed in from the pipeline.
    """
    print("model building")
    # Show the value first, then the type it arrived as.
    for shown in (no_epochs, type(no_epochs)):
        print(shown)
    

@dsl.pipeline(
    description='test outputs',
    name='output-test',
)
def output_test(no_epochs: int):
    """Wire the four steps into a pipeline.

    ``get_data_batch`` and ``get_latest_data`` have no dependencies on each
    other; ``reshape_data`` is ordered after both of them, and
    ``model_building`` is ordered after ``reshape_data``.

    Args:
        no_epochs: Forwarded to the ``model_building`` step.
    """
    batch_task = get_data_batch()
    latest_task = get_latest_data()

    # No data is passed between steps, so ordering is declared explicitly.
    reshape_task = reshape_data()
    reshape_task.after(batch_task, latest_task)

    build_task = model_building(no_epochs)
    build_task.after(reshape_task)


if __name__ == "__main__":
    # No host or credentials are passed explicitly to the client.
    client = kfp.Client()

    arguments = {"no_epochs": 3}

    # 1 -> submit a run right away; any other value -> compile the pipeline
    # to YAML and upload it as a new pipeline version.
    run_directly = 1

    # Both paths use the same execution mode.
    v2_mode = kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE

    if run_directly != 1:
        package_path = 'output_test.yaml'
        kfp.compiler.Compiler(mode=v2_mode).compile(
            pipeline_func=output_test,
            package_path=package_path,
        )
        client.upload_pipeline_version(
            pipeline_package_path=package_path,
            pipeline_version_name="0.3",
            pipeline_name="pipeline test",
            description="just for testing",
        )
    else:
        client.create_run_from_pipeline_func(
            output_test,
            arguments=arguments,
            experiment_name="test",
            mode=v2_mode,
        )