## create pipeline

In [7]:
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the `License.
"""Sample pipeline for passing data in KFP."""
from typing import Dict, List
from kfp import compiler
from kfp import dsl
from kfp.dsl import component
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import InputPath
from kfp.dsl import Model
from kfp.dsl import Output
from kfp.dsl import OutputPath


@component
def preprocess(
    # An input parameter of type string.
    message: str,
    # An input parameter of type dict.
    input_dict_parameter: Dict[str, int],
    # An input parameter of type list.
    input_list_parameter: List[str],
    # Use Output[T] to get a metadata-rich handle to the output artifact
    # of type `Dataset`.
    output_dataset_one: Output[Dataset],
    # A locally accessible filepath for another output artifact of type
    # `Dataset`.
    output_dataset_two_path: OutputPath('Dataset'),
    # A locally accessible filepath for an output parameter of type string.
    output_parameter_path: OutputPath(str),
    # A locally accessible filepath for an output parameter of type bool.
    output_bool_parameter_path: OutputPath(bool),
    # A locally accessible filepath for an output parameter of type dict.
    output_dict_parameter_path: OutputPath(Dict[str, int]),
    # A locally accessible filepath for an output parameter of type list.
    output_list_parameter_path: OutputPath(List[str]),
):
    """Dummy preprocessing step."""

    # Use Dataset.path to access a local file path for writing.
    # One can also use Dataset.uri to access the actual URI file path.
    with open(output_dataset_one.path, 'w') as f:
        f.write(message)

    # OutputPath is used to just pass the local file path of the output artifact
    # to the function.
    with open(output_dataset_two_path, 'w') as f:
        f.write(message)

    with open(output_parameter_path, 'w') as f:
        f.write(message)

    with open(output_bool_parameter_path, 'w') as f:
        f.write(
            str(True))  # use either `str()` or `json.dumps()` for bool values.

    import json
    with open(output_dict_parameter_path, 'w') as f:
        f.write(json.dumps(input_dict_parameter))

    with open(output_list_parameter_path, 'w') as f:
        f.write(json.dumps(input_list_parameter))


@component
def train(
    # Use InputPath to get a locally accessible path for the input artifact
    # of type `Dataset`.
    dataset_one_path: InputPath('Dataset'),
    # Use Input[T] to get a metadata-rich handle to the input artifact
    # of type `Dataset`.
    dataset_two: Input[Dataset],
    # An input parameter of type string.
    message: str,
    # Use Output[T] to get a metadata-rich handle to the output artifact
    # of type `Dataset`.
    model: Output[Model],
    # An input parameter of type bool.
    input_bool: bool,
    # An input parameter of type dict.
    input_dict: Dict[str, int],
    # An input parameter of type List[str].
    input_list: List[str],
    # An input parameter of type int with a default value.
    num_steps: int = 100,
):
    """Dummy Training step."""
    with open(dataset_one_path) as input_file:
        dataset_one_contents = input_file.read()

    with open(dataset_two.path) as input_file:
        dataset_two_contents = input_file.read()

    line = (f'dataset_one_contents: {dataset_one_contents} || '
            f'dataset_two_contents: {dataset_two_contents} || '
            f'message: {message} || '
            f'input_bool: {input_bool}, type {type(input_bool)} || '
            f'input_dict: {input_dict}, type {type(input_dict)} || '
            f'input_list: {input_list}, type {type(input_list)} \n')
    with open(model.path, 'w') as output_file:
        for i in range(num_steps):
            output_file.write(f'Step {i}\n{line}\n=====\n')

    # model is an instance of Model artifact, which has a .metadata dictionary
    # to store arbitrary metadata for the output artifact.
    model.metadata['accuracy'] = 0.9


@dsl.pipeline(name='my-test-pipeline-beta')
def pipeline(message: str, input_dict: Dict[str, int] = {'A': 1, 'B': 2}):

    preprocess_task = preprocess(
        message=message,
        input_dict_parameter=input_dict,
        input_list_parameter=['a', 'b', 'c'],
    )
    train_task = train(
        dataset_one_path=preprocess_task.outputs['output_dataset_one'],
        dataset_two=preprocess_task.outputs['output_dataset_two_path'],
        message=preprocess_task.outputs['output_parameter_path'],
        input_bool=preprocess_task.outputs['output_bool_parameter_path'],
        input_dict=preprocess_task.outputs['output_dict_parameter_path'],
        input_list=preprocess_task.outputs['output_list_parameter_path'],
    )

import os
current_directory = os.getcwd()
yaml_file = os.path.join(current_directory, 'pipeline.yaml')
compiler.Compiler().compile(pipeline_func=pipeline, package_path=yaml_file)

# if __name__ == '__main__':
#     compiler.Compiler().compile(
#         pipeline_func=pipeline, package_path=__file__.replace('.py', '.yaml'))


In [5]:
import kfp
client = kfp.Client(host='http://98.64.205.69')
print(client.list_experiments())

{'experiments': None, 'next_page_token': None, 'total_size': None}


## upload pipeline with pipeline yaml file

In [17]:
 pipeline_file_path = 'pipeline.yaml' # extract it from your database
 pipeline_name = 'test ppl name'

 client = kfp.Client()
 pipeline = client.pipeline_uploads.upload_pipeline(
                                pipeline_file_path, name=pipeline_name)

ApiException: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'date': 'Thu, 20 Jun 2024 14:59:23 GMT', 'content-length': '483', 'content-type': 'text/plain; charset=utf-8', 'x-envoy-upstream-service-time': '24', 'server': 'envoy'})
HTTP response body: {"error_message":"Failed to create a pipeline and a pipeline version: Failed to create a pipeline and a pipeline version: Already exist error: Failed to create a new pipeline. The name test ppl name already exists. Please specify a new name","error_details":"Failed to create a pipeline and a pipeline version: Failed to create a pipeline and a pipeline version: Already exist error: Failed to create a new pipeline. The name test ppl name already exists. Please specify a new name"}


In [20]:
# Copyright 2021 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Sample pipeline for passing data in KFP."""
from typing import Dict, List
from kfp import compiler
from kfp import dsl
from kfp.dsl import component
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import InputPath
from kfp.dsl import Model
from kfp.dsl import Output
from kfp.dsl import OutputPath


@component
def preprocess(
    # An input parameter of type string.
    message: str,
    # An input parameter of type dict.
    input_dict_parameter: Dict[str, int],
    # An input parameter of type list.
    input_list_parameter: List[str],
    # Use Output[T] to get a metadata-rich handle to the output artifact
    # of type `Dataset`.
    output_dataset_one: Output[Dataset],
    # A locally accessible filepath for another output artifact of type
    # `Dataset`.
    output_dataset_two_path: OutputPath('Dataset'),
    # A locally accessible filepath for an output parameter of type string.
    output_parameter_path: OutputPath(str),
    # A locally accessible filepath for an output parameter of type bool.
    output_bool_parameter_path: OutputPath(bool),
    # A locally accessible filepath for an output parameter of type dict.
    output_dict_parameter_path: OutputPath(Dict[str, int]),
    # A locally accessible filepath for an output parameter of type list.
    output_list_parameter_path: OutputPath(List[str]),
):
    """Dummy preprocessing step."""

    # Use Dataset.path to access a local file path for writing.
    # One can also use Dataset.uri to access the actual URI file path.
    with open(output_dataset_one.path, 'w') as f:
        f.write(message)

    # OutputPath is used to just pass the local file path of the output artifact
    # to the function.
    with open(output_dataset_two_path, 'w') as f:
        f.write(message)

    with open(output_parameter_path, 'w') as f:
        f.write(message)

    with open(output_bool_parameter_path, 'w') as f:
        f.write(
            str(True))  # use either `str()` or `json.dumps()` for bool values.

    import json
    with open(output_dict_parameter_path, 'w') as f:
        f.write(json.dumps(input_dict_parameter))

    with open(output_list_parameter_path, 'w') as f:
        f.write(json.dumps(input_list_parameter))


@component
def train(
    # Use InputPath to get a locally accessible path for the input artifact
    # of type `Dataset`.
    dataset_one_path: InputPath('Dataset'),
    # Use Input[T] to get a metadata-rich handle to the input artifact
    # of type `Dataset`.
    dataset_two: Input[Dataset],
    # An input parameter of type string.
    message: str,
    # Use Output[T] to get a metadata-rich handle to the output artifact
    # of type `Dataset`.
    model: Output[Model],
    # An input parameter of type bool.
    input_bool: bool,
    # An input parameter of type dict.
    input_dict: Dict[str, int],
    # An input parameter of type List[str].
    input_list: List[str],
    # An input parameter of type int with a default value.
    num_steps: int = 100,
):
    """Dummy Training step."""
    with open(dataset_one_path) as input_file:
        dataset_one_contents = input_file.read()

    with open(dataset_two.path) as input_file:
        dataset_two_contents = input_file.read()

    line = (f'dataset_one_contents: {dataset_one_contents} || '
            f'dataset_two_contents: {dataset_two_contents} || '
            f'message: {message} || '
            f'input_bool: {input_bool}, type {type(input_bool)} || '
            f'input_dict: {input_dict}, type {type(input_dict)} || '
            f'input_list: {input_list}, type {type(input_list)} \n')
    with open(model.path, 'w') as output_file:
        for i in range(num_steps):
            output_file.write(f'Step {i}\n{line}\n=====\n')

    # model is an instance of Model artifact, which has a .metadata dictionary
    # to store arbitrary metadata for the output artifact.
    model.metadata['accuracy'] = 0.9


@dsl.pipeline(name='my-test-pipeline-beta')
def pipeline(message: str, input_dict: Dict[str, int] = {'A': 1, 'B': 2}):

    preprocess_task = preprocess(
        message=message,
        input_dict_parameter=input_dict,
        input_list_parameter=['a', 'b', 'c'],
    )
    train_task = train(
        dataset_one_path=preprocess_task.outputs['output_dataset_one'],
        dataset_two=preprocess_task.outputs['output_dataset_two_path'],
        message=preprocess_task.outputs['output_parameter_path'],
        input_bool=preprocess_task.outputs['output_bool_parameter_path'],
        input_dict=preprocess_task.outputs['output_dict_parameter_path'],
        input_list=preprocess_task.outputs['output_list_parameter_path'],
    )

import os
current_directory = os.getcwd()
yaml_file = os.path.join(current_directory, 'nnncc.yaml')
compiler.Compiler().compile(pipeline_func=pipeline, package_path=yaml_file)


pipeline_file_path = 'nnncc.yaml' # extract it from your database
pipeline_name = 'nnncc'

client = kfp.Client()
pipeline = client.pipeline_uploads.upload_pipeline(
                                pipeline_file_path, name=pipeline_name)

In [4]:
import kfp
print(kfp.__version__)


2.4.0


In [34]:
import kfp
from kfp import Client

client = kfp.Client()

# 列出所有的 pipelines
pipelines = client.list_pipelines()
for pipeline in pipelines.pipelines:
    print("Pipeline ID:", pipeline._pipeline_id)
    print("Pipeline Name:", pipeline._display_name)

    # 获取并打印该 pipeline 的所有版本信息
    versions = client.list_pipeline_versions(pipeline_id=pipeline._pipeline_id)

Pipeline ID: 97838453-7a91-42b8-a277-c4c550a9e2c4
Pipeline Name: [Tutorial] Data passing in python components
Pipeline ID: c74e3130-3806-4955-a299-c32f1e22bccd
Pipeline Name: [Tutorial] DSL - Control structures
Pipeline ID: d0427560-86fd-42f1-a42b-314195292746
Pipeline Name: test ppl name
Pipeline ID: cf2d56ea-d345-4f2a-bf71-95e935224ebe
Pipeline Name: test ppl name02
Pipeline ID: fcb42b6e-66f1-4e32-8a11-0faeed19d81b
Pipeline Name: nnn
Pipeline ID: 903f88fc-2255-488e-9668-d806808948a2
Pipeline Name: nnncc


In [1]:
import kfp
from kfp import Client

client = kfp.Client()

# 列出所有的 pipelines
pipelines = client.list_pipelines()
for pipeline in pipelines.pipelines:
    print("Pipeline ID:", pipeline._pipeline_id)
    print("Pipeline Name:", pipeline._display_name)

    # 获取该 pipeline 的版本信息
    versions_response = client.list_pipeline_versions(pipeline_id=pipeline._pipeline_id)
    print(dir(versions_response))  # 打印所有属性和方法，以便了解如何正确访问版本信息


Pipeline ID: 97838453-7a91-42b8-a277-c4c550a9e2c4
Pipeline Name: [Tutorial] Data passing in python components
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_next_page_token', '_pipeline_versions', '_total_size', 'attribute_map', 'discriminator', 'local_vars_configuration', 'next_page_token', 'openapi_types', 'pipeline_versions', 'to_dict', 'to_str', 'total_size']
Pipeline ID: c74e3130-3806-4955-a299-c32f1e22bccd
Pipeline Name: [Tutorial] DSL - Control structures
['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module



In [9]:
if hasattr(versions_response, 'pipeline_versions') and versions_response.pipeline_versions:
    version_example = versions_response.pipeline_versions[0]
    print(dir(version_example))
else:
    print("No versions or unable to access a version example.")


['__class__', '__delattr__', '__dict__', '__dir__', '__doc__', '__eq__', '__format__', '__ge__', '__getattribute__', '__getstate__', '__gt__', '__hash__', '__init__', '__init_subclass__', '__le__', '__lt__', '__module__', '__ne__', '__new__', '__reduce__', '__reduce_ex__', '__repr__', '__setattr__', '__sizeof__', '__str__', '__subclasshook__', '__weakref__', '_code_source_url', '_created_at', '_description', '_display_name', '_error', '_package_url', '_pipeline_id', '_pipeline_spec', '_pipeline_version_id', 'attribute_map', 'code_source_url', 'created_at', 'description', 'discriminator', 'display_name', 'error', 'local_vars_configuration', 'openapi_types', 'package_url', 'pipeline_id', 'pipeline_spec', 'pipeline_version_id', 'to_dict', 'to_str']


In [36]:
import kfp
from kfp import Client

# Connect to Kubeflow Pipelines
client = kfp.Client()

# Get the pipeline ID by name
pipeline_name = 'nnncc'
pipeline_id = client.get_pipeline_id(pipeline_name)
print(f"Pipeline ID for '{pipeline_name}': {pipeline_id}")

# Get all versions for the specified pipeline
versions_response = client.list_pipeline_versions(pipeline_id=pipeline_id)


# Print each version's ID and Display Name
if versions_response.pipeline_versions:
    for version in versions_response.pipeline_versions:
        # 使用 _pipeline_version_id 属性获取版本 ID
        print(f"Version ID: {version._pipeline_version_id}, Version Name: {version.display_name}")
else:
    print("No versions found for the given pipeline.")

print(versions_response.pipeline_versions[0]._pipeline_version_id)

Pipeline ID for 'nnncc': 903f88fc-2255-488e-9668-d806808948a2
Version ID: 5628dfd8-4e10-4afd-8007-87f1c1bd7e21, Version Name: nnncc
5628dfd8-4e10-4afd-8007-87f1c1bd7e21


In [1]:
import kfp
from kfp import Client
from random import randint, choice, sample
from string import ascii_letters


# connect to Kubeflow Pipelines
client = kfp.Client()

# check pipeline id
pipeline_id = client.get_pipeline_id('nnncc')
print(f"pipeline_id:{pipeline_id}")  # --> same
# print(f"pipeline._pipeline_id:{pipeline._pipeline_id}")
# Get all versions for the specified pipeline
versions_response = client.list_pipeline_versions(pipeline_id=pipeline_id)
version_id = versions_response.pipeline_versions[0]._pipeline_version_id

input_dict_parameter = {k: randint(1, 100) for k in sample(ascii_letters, 10)}
input_list_parameter = [''.join(choice(ascii_letters) for _ in range(10)) for _ in range(5)]


# set Pipeline params
params = {
    'message': 'Hello, this is a test. This is a complex test message with multiple characters and numbers 1234567890',
    # 'input_dict_parameter': input_dict_parameter,
    # 'input_list_parameter': input_list_parameter
}
print(f"params:{params}")   
# create an experiment
experiment = client.create_experiment(name='Test Experiment')
print(f"experiment.experiment_id:{experiment.experiment_id}") 

# run  Pipeline, 'My Pipeline Run' is run name
# run = client.run_pipeline(experiment.experiment_id, 'My Pipeline Run', pipeline_id=pipeline_id, params=params, version_id=version_id)
try:
    run = client.run_pipeline(experiment.experiment_id, 'My Pipeline Run', pipeline_id=pipeline_id, params=params, version_id=version_id)
    print(f"Run ID: {run.run_id}")
except Exception as e:
    print(f"Failed to start pipeline run: {e}")




pipeline_id:None


ApiValueError: Missing the required parameter `pipeline_id` when calling `list_pipeline_versions`

In [41]:
import kfp
from kfp import compiler
from kfp import dsl
from kfp.dsl import component
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import InputPath
from kfp.dsl import Model
from kfp.dsl import Output
from kfp.dsl import OutputPath
import os

import pandas as pd


    
@component(packages_to_install=["pandas"])
def getdata(url: str, prepared_csv: OutputPath()) -> None:
    import urllib.request
    import tarfile
    import pandas as pd
    import glob
    # get data from url and unzip
    with urllib.request.urlopen(url) as res:
        tarfile.open(fileobj=res, mode="r|gz").extractall('data')
    
    #read unzip data into dataframe
    df = pd.concat([pd.read_csv(csv_file, header=None)
                    for csv_file in glob.glob('data/*.csv')])
    with open(prepared_csv, "w") as f:
        df.to_csv(f, index=False)
    print('--- inside getdata ---')
    print(f"df: {df}")

    
@component(packages_to_install=["pandas"])
# def preprocess(csv_in_file: InputPath(), preprocessed_output_csv: OutputPath())-> None:
def preprocess(csv_in_file: InputPath(), preprocessed_output_csv: OutputPath()):
    import pandas as pd
    print(f"csv_in_file: {csv_in_file}")
    
    with open(csv_in_file, 'r') as f:
        df = pd.read_csv(f)
        
    # preprocess here
    with open(preprocessed_output_csv, 'w') as f:
        df.to_csv(f, index=False)
    
    print('--- inside preprocess ---')
    print(f"df: {df}")



@component(packages_to_install=["pandas"])
def train(csv_in_file: InputPath()):

    with open(csv_in_file) as f:
        df = pd.read_csv(f)
        
    print('--- inside train ---')
    print(f"df: {df}")
    


@dsl.pipeline(name='my-test-pipeline-beta')
def pipeline(url: str):
    prepare_data_task = getdata(url=url)
    print(prepare_data_task)
    preprocess_task = preprocess(
        csv_in_file=prepare_data_task.outputs['prepared_csv']
    )
#     prepare_data = getdata(url=url)
#     print(prepare_data)

#     preprocess_data = preprocess(
#         csv_in_file = prepare_data.outputs['prepared_csv']  
#     )
    

    
    
namespace='kubeflow-user-example-com'
project_name='hello-world-csv'

client = kfp.Client()
    
#Compile the pipeline to YAML
try:
    yaml_file = os.path.join(os.getcwd(), project_name+'.yaml')
    compiler.Compiler().compile(pipeline_func=pipeline, package_path=yaml_file)
    print(f"Successfully compiled to yaml...")
except Exception as e:
    print(f"Failed to compile to YAML: {e}")


# Create an experiment
try:
    experiment = client.create_experiment(name=project_name+'-expr', namespace=namespace)
    print(f"The experiment created with ID==>{experiment.experiment_id}") 
except Exception as e:
    print(f"Failed to Create the experiment: {e}")


#Upload the pipeline
try:
    
    #Create shared pipeline
    #pipeline = client.pipeline_uploads.upload_pipeline(yaml_file, name='sample-pipeline')
    
    #Create private pipeline
    pipeline = client.pipeline_uploads.upload_pipeline(yaml_file, name=project_name+'-pipeline', namespace=namespace)
    
    pipeline_id = pipeline.pipeline_id
    
    versions_response = client.list_pipeline_versions(pipeline_id=pipeline_id)
    version_id = versions_response.pipeline_versions[0]._pipeline_version_id
    
    print(f"The yaml file uploaded to a new pipeline with PipelineID: {pipeline_id}, PipelineVersion: {version_id}")
    
except Exception as e:
    print(f"Failed upload the pipeline: {e}")


params = {
    'url': 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'
}


#Create a RUN
try:
    run = client.run_pipeline(experiment.experiment_id, project_name+'-run', pipeline_id=pipeline_id, params=params, version_id=version_id)
    print(f"Run ID: {run.run_id}")
except Exception as e:
    print(f"Failed to run the pipeline: {e}")


<kfp.dsl.pipeline_task.PipelineTask object at 0x7f5096b09c50>
Successfully compiled to yaml...


The experiment created with ID==>e8e056b5-9d65-436d-8f97-2d628413b178
Failed upload the pipeline: (500)
Reason: Internal Server Error
HTTP response headers: HTTPHeaderDict({'date': 'Tue, 25 Jun 2024 10:12:00 GMT', 'content-length': '505', 'content-type': 'text/plain; charset=utf-8', 'x-envoy-upstream-service-time': '63', 'server': 'envoy'})
HTTP response body: {"error_message":"Failed to create a pipeline and a pipeline version: Failed to create a pipeline and a pipeline version: Already exist error: Failed to create a new pipeline. The name hello-world-csv-pipeline already exists. Please specify a new name","error_details":"Failed to create a pipeline and a pipeline version: Failed to create a pipeline and a pipeline version: Already exist error: Failed to create a new pipeline. The name hello-world-csv-pipeline already exists. Please specify a new name"}

Failed to run the pipeline: name 'version_id' is not defined


In [40]:
import kfp
from kfp import compiler
# from kfp import dsl
# from kfp.dsl import component
# from kfp.dsl import Input
# from kfp.dsl import Output
from kfp.dsl import Dataset

import io
import logging
from minio import Minio
from minio.error import S3Error
import pandas as pd
import requests
from typing import NamedTuple
from kfp.v2 import dsl
from kfp.v2.dsl import component, Output, Input


@component(packages_to_install=['minio==7.1.14'])
# def table_data_exists(bucket: str, table_code: str, year: int) -> bool:
def table_data_exists(bucket: str, table_code: str, year: int) -> Output[bool]:
    """
   
    Check for the existence of Census table data in MinIO.
    """
    object_name=f'{table_code}-{year}.csv'

    logger = logging.getLogger('kfp_logger')
    logger.setLevel(logging.INFO)
    logger.info(bucket)
    logger.info(table_code)
    logger.info(year)
    logger.info(object_name)
  
    try:
        # Create client with access and secret key.
        client = Minio('10.0.84.43:80',
                   'minio',
                   'minio123',
                   secure=False)

        bucket_found = client.bucket_exists(bucket)
        if not bucket_found:
            return False

        objects = client.list_objects(bucket)
        found = False
        for obj in objects:
            logger.info(obj.object_name)
            if object_name == obj.object_name: found = True

    except S3Error as s3_err:
        logger.error(f'S3 Error occurred: {s3_err}.')
    except Error as err:
        logger.error(f'Error occurred: {err}.')

    return found


@component(packages_to_install=['pandas==1.3.5', 'requests'])
def download_table_data(dataset: str, table_code: str, year: int, table_df: Output[Dataset]):
    """
    Returns all fields for the specified table. The output is a DataFrame saved to csv.
    """
    logger = logging.getLogger('kfp_logger')
    logger.setLevel(logging.INFO)

    census_endpoint = f'https://api.census.gov/data/{year}/{dataset}'
    census_key = 'Census key here.'
   
   # Setup a simple dictionary for the requests parameters.
    get_token = f'group({table_code})'
    params = {'key': census_key,
             'get': get_token,
             'for': 'county:*'
             }

    # sending get request and saving the response as response object
    response = requests.get(url=census_endpoint, params=params)
   
   # Extract the data in json format.
   # The first row of our matrix contains the column names. The remaining rows
   # are the data.
    survey_data = response.json()
    df = pd.DataFrame(survey_data[1:], columns = survey_data[0])
    df.to_csv(table_df.path, index=False)
    logger.info(f'Table {table_code} for {year} has been downloaded.')


@component(packages_to_install=['pandas==1.3.5', 'minio==7.1.14'])
def save_table_data(bucket: str, table_code: str, year: int, table_df: Input[Dataset]):
    

    object_name=f'{table_code}-{year}.csv'

    logger = logging.getLogger('kfp_logger')
    logger.setLevel(logging.INFO)
    logger.info(bucket)
    logger.info(table_code)
    logger.info(year)
    logger.info(object_name)

    df = pd.read_csv(table_df.path)

    try:
       # Create client with access and secret key
        client = Minio('10.0.84.43:80',
                   'minio',
                   'minio123',
                   secure=False)

       # Make the bucket if it does not exist.
        found = client.bucket_exists(bucket)
        if not found:
            logger.info(f'Creating bucket: {bucket}.')
            client.make_bucket(bucket)

       # Upload the dataframe as an object.
        encoded_df = df.to_csv(index=False).encode('utf-8')
        client.put_object(bucket, object_name, data=io.BytesIO(encoded_df), length=len(encoded_df), content_type='application/csv')
        logger.info(f'{object_name} successfully uploaded to bucket {bucket}.')
        logger.info(f'Object length: {len(df)}.')

    except S3Error as s3_err:
        logger.error(f'S3 Error occurred: {s3_err}.')
    except Error as err:
        logger.error(f'Error occurred: {err}.')


@component(packages_to_install=['pandas==1.3.5', 'minio==7.1.14'])
def get_table_data(bucket: str, table_code: str, year: int, table_df: Output[Dataset]):

    object_name=f'{table_code}-{year}.csv'

    logger = logging.getLogger('kfp_logger')
    logger.setLevel(logging.INFO)
    logger.info(bucket)
    logger.info(table_code)
    logger.info(year)
    logger.info(object_name)

    # Get data of an object.
    try:
        
        # Create client with access and secret key
        client = Minio('10.0.84.43:80',
                   'minio',
                   'minio123',
                   secure=False)

        response = client.get_object(bucket, object_name)
        df = pd.read_csv(io.BytesIO(response.data))
        df.to_csv(table_df.path, index=False)
        logger.info(f'Object: {object_name} has been retrieved from bucket: {bucket} in MinIO object storage.')
        logger.info(f'Object length: {len(df)}.')

    except S3Error as s3_err:
        logger.error(f'S3 Error occurred: {s3_err}.')
    except Error as err:
        logger.error(f'Error occurred: {err}.')

    finally:
        response.close()
        response.release_conn()

        

@dsl.pipeline(
   name='census-pipeline',
   description='Pipeline that will download Census data and save to MinIO.'
)
def census_pipeline(bucket: str, dataset: str, table_code: str, year: int):

    # initiate result param
    # result_table_data = None
    exists = table_data_exists(bucket=bucket, table_code=table_code, year=year)

    with dsl.Condition(exists.output == 'True', name='Data_Exists'): 
        table_data = download_table_data(dataset=dataset, table_code=table_code, year=year)
        save_table_data(bucket=bucket,
                      table_code=table_code,
                      year=year,
                      table_df=table_data.outputs['table_df'])
        result_table_data = table_data.outputs['table_df']
    with dsl.Condition(exists.output == 'False', name='Data_Not_Exists'):
        result_table_data = get_table_data(bucket=bucket, table_code=table_code, year=year)        

    return result_table_data




  with dsl.Condition(exists.output == 'True', name='Data_Exists'):
  with dsl.Condition(exists.output == 'False', name='Data_Not_Exists'):


ValueError: Got unknown pipeline output: <kfp.dsl.pipeline_task.PipelineTask object at 0x7f50966ee150>

In [4]:
import kfp
from kfp import compiler
from kfp import dsl
from kfp.dsl import component
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import InputPath
from kfp.dsl import Model
from kfp.dsl import Output
from kfp.dsl import OutputPath
import os

import pandas as pd


    
@component(packages_to_install=["pandas"])
def getdata(url: str, prepared_csv: OutputPath()) -> None:
    import urllib.request
    import tarfile
    import pandas as pd
    import glob
    # get data from url and unzip
    with urllib.request.urlopen(url) as res:
        tarfile.open(fileobj=res, mode="r|gz").extractall('data')
    
    #read unzip data into dataframe
    df = pd.concat([pd.read_csv(csv_file, header=None)
                    for csv_file in glob.glob('data/*.csv')])
    with open(prepared_csv, "w") as f:
        df.to_csv(f, index=False)
    print('--- inside getdata ---')
    print(f"df: {df}")

    
@component(packages_to_install=["pandas"])
# def preprocess(csv_in_file: InputPath(), preprocessed_output_csv: OutputPath())-> None:
def preprocess(csv_in_file: InputPath(), preprocessed_output_csv: OutputPath()):
    import pandas as pd
    print(f"csv_in_file: {csv_in_file}")
    
    with open(csv_in_file, 'r') as f:
        df = pd.read_csv(f)
        
    # preprocess here
    with open(preprocessed_output_csv, 'w') as f:
        df.to_csv(f, index=False)
    
    print('--- inside preprocess ---')
    print(f"df: {df}")



@component(packages_to_install=["pandas"])
def train(csv_in_file: InputPath()):
    import pandas as pd

    with open(csv_in_file) as f:
        df = pd.read_csv(f)
        
    with open('minio://mlpipeline/v2/artifacts/test-pipleine2/test/out_put.csv', 'w') as f:
        df.to_csv(f, index=False)
        
    print('--- inside train ---')
    print(f"df: {df}")
    


@dsl.pipeline(name='test_pipleine2')
def pipeline(url: str):
    data_from_source = getdata(url=url)
    print(data_from_source)
    preprocessed_data = preprocess(
        csv_in_file=data_from_source.outputs['prepared_csv']
    )
    trained_data = train(
        csv_in_file=preprocessed_data.outputs['preprocessed_output_csv']
    )
#     prepare_data = getdata(url=url)
#     print(prepare_data)

#     preprocess_data = preprocess(
#         csv_in_file = prepare_data.outputs['prepared_csv']  
#     )
    

    
    
namespace='kubeflow-user-example-com'
project_name='test project name gcc2'

client = kfp.Client()
    
#Compile the pipeline to YAML
try:
    yaml_file = os.path.join(os.getcwd(), project_name+'.yaml')
    compiler.Compiler().compile(pipeline_func=pipeline, package_path=yaml_file)
    print(f"Successfully compiled to yaml...")
except Exception as e:
    print(f"Failed to compile to YAML: {e}")


# Create an experiment
try:
    experiment = client.create_experiment(name=project_name+'-expr', namespace=namespace)
    print(f"The experiment created with ID==>{experiment.experiment_id}") 
except Exception as e:
    print(f"Failed to Create the experiment: {e}")


#Upload the pipeline
try:
    
    #Create shared pipeline
    #pipeline = client.pipeline_uploads.upload_pipeline(yaml_file, name='sample-pipeline')
    
    #Create private pipeline
    pipeline = client.pipeline_uploads.upload_pipeline(yaml_file, name=project_name+'-pipeline', namespace=namespace)
    
    pipeline_id = pipeline.pipeline_id
    
    versions_response = client.list_pipeline_versions(pipeline_id=pipeline_id)
    version_id = versions_response.pipeline_versions[0]._pipeline_version_id
    
    print(f"The yaml file uploaded to a new pipeline with PipelineID: {pipeline_id}, PipelineVersion: {version_id}")
    
except Exception as e:
    print(f"Failed upload the pipeline: {e}")


params = {
    'url': 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'
}


#Create a RUN
try:
    run = client.run_pipeline(experiment.experiment_id, project_name+'-run', pipeline_id=pipeline_id, params=params, version_id=version_id)
    print(f"Run ID: {run.run_id}")
except Exception as e:
    print(f"Failed to run the pipeline: {e}")


<kfp.dsl.pipeline_task.PipelineTask object at 0x7efeb163c3d0>
Successfully compiled to yaml...


The experiment created with ID==>65bf4d7d-abba-4fa1-ae48-30e056e089b1
The yaml file uploaded to a new pipeline with PipelineID: 119e60ec-05b4-4739-8009-87bd11a6765d, PipelineVersion: 655f494f-7c55-4a8a-84c1-0459b0ffd623


Run ID: ede78dc1-7a4b-4a83-8242-9bc866d0562e


In [1]:
import kfp
from kfp import compiler
from kfp import dsl
from kfp.dsl import component
from kfp.dsl import Dataset
from kfp.dsl import Input
from kfp.dsl import InputPath
from kfp.dsl import Model
from kfp.dsl import Output
from kfp.dsl import OutputPath
import os

import pandas as pd
import io


    
@component(packages_to_install=["pandas"])
def getdata(url: str, prepared_csv: OutputPath()) -> None:
    import urllib.request
    import tarfile
    import pandas as pd
    import glob
    # get data from url and unzip
    with urllib.request.urlopen(url) as res:
        tarfile.open(fileobj=res, mode="r|gz").extractall('data')
    
    #read unzip data into dataframe
    df = pd.concat([pd.read_csv(csv_file, header=None)
                    for csv_file in glob.glob('data/*.csv')])
    with open(prepared_csv, "w") as f:
        df.to_csv(f, index=False)
    print('--- inside getdata ---')
    print(f"df: {df}")

    
@component(packages_to_install=["pandas"])
# def preprocess(csv_in_file: InputPath(), preprocessed_output_csv: OutputPath())-> None:
def preprocess(csv_in_file: InputPath(), preprocessed_output_csv: OutputPath()):
    import pandas as pd
    print(f"csv_in_file: {csv_in_file}")
    
    with open(csv_in_file, 'r') as f:
        df = pd.read_csv(f)
        
    # preprocess here
    with open(preprocessed_output_csv, 'w') as f:
        df.to_csv(f, index=False)
    
    print('--- inside preprocess ---')
    print(f"df: {df}")



@component(packages_to_install=["pandas", "minio"])
def train(csv_in_file: InputPath(), bucket_name: str, object_name: str):
    import pandas as pd
    from minio import Minio
    from minio.error import S3Error
    import io
    client = Minio(
        "10.0.84.43:80",
        access_key='minio',
        secret_key='minio123',
        secure=False
    )
    

    with open(csv_in_file) as f:
        df = pd.read_csv(f)
        
    # csv_str = df.to_csv(index=False)
    csv_bytes = io.BytesIO(df.to_csv(index=False).encode('utf-8'))
    try:
        client.put_object(
            bucket_name,
            object_name,
            data=csv_bytes,
            length=csv_bytes.getbuffer().nbytes
        )
    except S3Error as exc:
        print("Error occurred while uploading to MinIO:", exc)
    
        
    print('--- inside train ---')
    print(f"df: {df}")
    


@dsl.pipeline(name='test_pipleine2')
def pipeline(url: str):
    data_from_source = getdata(url=url)
    print(data_from_source)
    preprocessed_data = preprocess(
        csv_in_file=data_from_source.outputs['prepared_csv']
    )
    trained_data = train(
        csv_in_file=preprocessed_data.outputs['preprocessed_output_csv'],
        bucket_name = 'mlpipeline',
        object_name=f'outputtest.csv'
    )


    
    
namespace='kubeflow-user-example-com'
project_name='test project name gcc2'

client = kfp.Client()
    
#Compile the pipeline to YAML
try:
    yaml_file = os.path.join(os.getcwd(), project_name+'.yaml')
    compiler.Compiler().compile(pipeline_func=pipeline, package_path=yaml_file)
    print(f"Successfully compiled to yaml...")
except Exception as e:
    print(f"Failed to compile to YAML: {e}")


# Create an experiment
try:
    experiment = client.create_experiment(name=project_name+'-expr', namespace=namespace)
    print(f"The experiment created with ID==>{experiment.experiment_id}") 
except Exception as e:
    print(f"Failed to Create the experiment: {e}")


#Upload the pipeline
try:
    
    #Create shared pipeline
    #pipeline = client.pipeline_uploads.upload_pipeline(yaml_file, name='sample-pipeline')
    
    #Create private pipeline
    pipeline = client.pipeline_uploads.upload_pipeline(yaml_file, name=project_name+'-pipeline', namespace=namespace)
    
    pipeline_id = pipeline.pipeline_id
    
    versions_response = client.list_pipeline_versions(pipeline_id=pipeline_id)
    version_id = versions_response.pipeline_versions[0]._pipeline_version_id
    
    print(f"The yaml file uploaded to a new pipeline with PipelineID: {pipeline_id}, PipelineVersion: {version_id}")
    
except Exception as e:
    print(f"Failed upload the pipeline: {e}")


params = {
    'url': 'https://storage.googleapis.com/ml-pipeline-playground/iris-csv-files.tar.gz'
}


#Create a RUN
try:
    run = client.run_pipeline(experiment.experiment_id, project_name+'-run', pipeline_id=pipeline_id, params=params, version_id=version_id)
    print(f"Run ID: {run.run_id}")
except Exception as e:
    print(f"Failed to run the pipeline: {e}")


<kfp.dsl.pipeline_task.PipelineTask object at 0x7f4f9d80bd50>
Successfully compiled to yaml...


  return component_factory.create_component_from_func(


The experiment created with ID==>8f207a51-74c6-4429-9fcb-94cf481081eb
The yaml file uploaded to a new pipeline with PipelineID: 78bae54c-c97f-4c40-80e9-e2b3d529bf9f, PipelineVersion: 3c3e725d-07aa-4c92-ac81-3b0a6157e3f6


Run ID: a1b0d166-b2c6-433c-9aa7-839583d26a62
