In [None]:
import os
# GCP project used for the GCS staging bucket and the container registry below.
PROJECT_ID = "qwiklabs-gcp-00-59947c43422f"
BUCKET_NAME = f"gs://{PROJECT_ID}-bucket"

from typing import NamedTuple

import kfp
from google.cloud import aiplatform
from google_cloud_pipeline_components import aiplatform as gcc_aip
from kfp.v2 import compiler, dsl
from kfp.v2.dsl import (
    Artifact,
    ClassificationMetrics,
    Dataset,
    Input,
    Metrics,
    Model,
    Output,
    component,
    pipeline,
)

In [None]:
REGION = "us-central1"

# GCS prefix passed as `pipeline_root` to the pipelines defined below.
PIPELINE_ROOT = BUCKET_NAME + "/pipeline_root/"
PIPELINE_ROOT

In [None]:
# Make user-level installs in ~/.local/bin visible to shell commands run from
# this kernel. The guard keeps the cell idempotent: re-running it no longer
# appends the same directory to PATH again.
LOCAL_BIN = "/home/jupyter/.local/bin"
PATH = os.environ.get("PATH", "")
if LOCAL_BIN not in PATH.split(os.pathsep):
    os.environ["PATH"] = f"{PATH}{os.pathsep}{LOCAL_BIN}" if PATH else LOCAL_BIN

In [None]:
# Fully-qualified gcr.io URI for the training container image.
IMAGE_NAME = "exodus_kfp"
TAG = "latest"
TRAINING_CONTAINER_IMAGE_URI = "gcr.io/{}/{}:{}".format(PROJECT_ID, IMAGE_NAME, TAG)
TRAINING_CONTAINER_IMAGE_URI

In [None]:
# Build the training image from the local `trainer_image` source with Cloud Build
# and tag it as TRAINING_CONTAINER_IMAGE_URI; the build fails if it exceeds 15 minutes.
!gcloud builds submit --timeout 15m --tag $TRAINING_CONTAINER_IMAGE_URI trainer_image

In [None]:
## Custom component that loads BigQuery query results into pandas DataFrames
from typing import NamedTuple

In [None]:
import pandas as pd
#def bq_pd_df_creater() -> NamedTuple("Outputs",[("train_set",pd.DataFrame),("eval_set",pd.DataFrame)]):

In [None]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    output_component_file="bq_pandas_reader.yaml",
    packages_to_install=[
        "google-cloud-aiplatform",
        "google-cloud-bigquery[bqstorage,pandas]",
        "pandas",
    ],
)
def bq_pd_df_creater(
    train_set: Output[Dataset],
    eval_set: Output[Dataset],
    source_table: str = "qwiklabs-gcp-00-59947c43422f.kfp_test.full_data",
):
    """Split a BigQuery table into train/eval sets and save each as a CSV Dataset.

    Each row is assigned to one of 10 buckets by hashing the whole row
    (FARM_FINGERPRINT of its JSON form), so the split is deterministic.
    Buckets 1-4 form the training set and bucket 5 the evaluation set. The two
    bucket lists must stay disjoint, otherwise the same rows land in both splits.

    KFP v2 lightweight components cannot pass pandas DataFrames between steps
    as return values, so each split is written as CSV to its output artifact's
    ``path``. The output names (``train_set``/``eval_set``) are unchanged, so
    ``task.outputs["train_set"]`` keeps working in pipelines.

    Args:
        train_set: Output artifact receiving the training split (CSV, with header).
        eval_set: Output artifact receiving the evaluation split (CSV, with header).
        source_table: Fully-qualified ``project.dataset.table`` to read from.
    """
    # Imports live inside the function because KFP ships only this function's
    # source into the component container.
    from google.cloud import bigquery

    client = bigquery.Client()

    def _fetch_buckets(buckets):
        # Return the rows whose hash bucket is listed in `buckets` as a DataFrame.
        query = f"""
        SELECT *
        FROM `{source_table}` AS src
        WHERE MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(src))), 10) IN ({buckets})
        """
        return (
            client.query(query)
            .result()
            # Explicitly request the BigQuery Storage API (already the default
            # from google-cloud-bigquery 1.26.0 onwards).
            .to_dataframe(create_bqstorage_client=True)
        )

    _fetch_buckets("1, 2, 3, 4").to_csv(train_set.path, index=False)
    # Must not overlap the training buckets, or evaluation rows leak into training.
    _fetch_buckets("5").to_csv(eval_set.path, index=False)

In [None]:
@component(
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    # Own spec file: reusing "bq_pandas_reader.yaml" overwrote the reader
    # component's saved spec.
    output_component_file="head_printer.yaml",
    packages_to_install=["pandas"],
)
def the_head_printer(
    test_df: Input[Dataset],
    eval_df: Input[Dataset],
    n_rows: int = 5,
) -> str:
    """Log the first rows of both splits and return the first split's head.

    NOTE(review): assumes both datasets were written by the upstream component
    as CSV files with a header row — confirm against the producer.

    Args:
        test_df: Dataset artifact for the first split (wired to the upstream
            ``train_set`` output in this notebook's pipelines).
        eval_df: Dataset artifact for the evaluation split.
        n_rows: Number of leading rows to show from each split.

    Returns:
        The first ``n_rows`` rows of ``test_df`` rendered as text.
    """
    # Imported here because KFP ships only this function's source into the container.
    import pandas as pd

    train_head = pd.read_csv(test_df.path).head(n_rows)
    eval_head = pd.read_csv(eval_df.path).head(n_rows)
    print(f"test_df head:\n{train_head}")
    print(f"eval_df head:\n{eval_head}")
    return train_head.to_string()
    

In [None]:
@pipeline(
    name="hello-pandas",
    description="experiment with KF",
    pipeline_root=PIPELINE_ROOT,
)
def intro_pipeline(text: str = "Vertex Pipelines", emoji_str: str = "sparkles"):
    """Read the train/eval splits from BigQuery and print the head of each.

    Args:
        text: Unused; kept so existing callers / run configs that pass it still work.
        emoji_str: Unused; kept for the same reason.
    """
    train_eval_df_task = bq_pd_df_creater()
    # Both outputs must be fetched through `.outputs[...]`; indexing the task
    # object directly (as the eval_set argument previously did) fails.
    the_head_printer(
        test_df=train_eval_df_task.outputs["train_set"],
        eval_df=train_eval_df_task.outputs["eval_set"],
    )


In [None]:
# Compile the pipeline into the JSON package `kf_bq_test.json`.
compiler.Compiler().compile(
    package_path="kf_bq_test.json",
    pipeline_func=intro_pipeline,
)

In [None]:
def bq_pd_df_creater():
    """Query the train and eval splits from BigQuery and return them as DataFrames.

    Each row is assigned to one of 10 buckets by hashing the whole row
    (FARM_FINGERPRINT of its JSON form), making the split deterministic.
    Buckets 1-4 form the training set and bucket 5 the evaluation set. The two
    bucket lists must stay disjoint; previously the eval bucket (2) was also a
    training bucket, so every eval row also appeared in training.

    The import lives inside the function because KFP's
    ``create_component_from_func`` serializes only this function's source.

    NOTE(review): without output annotations (e.g. ``OutputPath``) the component
    generated from this function may not declare ``train_set``/``eval_set``
    outputs — confirm before wiring it into a pipeline.

    Returns:
        tuple: ``(train_set, eval_set)`` DataFrames.
    """
    from google.cloud import bigquery

    source_table = "qwiklabs-gcp-00-59947c43422f.kfp_test.full_data"
    client = bigquery.Client()

    def _query_buckets(buckets):
        # Fetch the rows whose hash bucket is listed in `buckets`.
        query = f"""
    SELECT
    *
    FROM `{source_table}` AS src WHERE MOD(ABS(FARM_FINGERPRINT(TO_JSON_STRING(src))), 10) IN ({buckets})
    """
        return (
            client.query(query)
            .result()
            # Explicitly request the BigQuery Storage API (already the default
            # from google-cloud-bigquery 1.26.0 onwards).
            .to_dataframe(create_bqstorage_client=True)
        )

    train_set = _query_buckets("1, 2, 3, 4")
    eval_set = _query_buckets("5")
    return (train_set, eval_set)

In [None]:
# Wrap the plain `bq_pd_df_creater` function as a KFP v1-style component.
# The spec is also saved (optionally) to bq_pandas_reader.yaml for reuse;
# NOTE(review): that is the same file the @component reader cell writes.
create_pandas_df = kfp.components.create_component_from_func(
    func=bq_pd_df_creater,
    base_image="gcr.io/deeplearning-platform-release/tf2-cpu.2-3:latest",
    packages_to_install=["pandas", "google-cloud-aiplatform"],
    output_component_file="bq_pandas_reader.yaml",
)

In [None]:
@pipeline(
    name="hello-pandas",
    description="experiment with KF",
    pipeline_root=PIPELINE_ROOT,
)
def intro_pipeline():
    """Same flow as the earlier pipeline, using the `create_pandas_df` component.

    NOTE(review): this redefines `intro_pipeline` and reuses the pipeline name
    "hello-pandas", silently shadowing the earlier definition in this notebook.
    Consider a distinct name if both variants are needed.
    NOTE(review): `create_pandas_df` is generated from a plain function; confirm
    it actually declares `train_set`/`eval_set` outputs.
    """
    # The component factory must be *called* to create a task; the bare
    # factory has no `.outputs`.
    train_eval_df_task = create_pandas_df()
    the_head_printer(
        test_df=train_eval_df_task.outputs["train_set"],
        eval_df=train_eval_df_task.outputs["eval_set"],
    )


In [None]:
# Compile the (redefined) pipeline; this writes the same kf_bq_test.json path
# as the earlier compile cell, replacing that package.
compiler.Compiler().compile(
    package_path="kf_bq_test.json",
    pipeline_func=intro_pipeline,
)