In [None]:
# Extra flag interpolated into every `pip3 install` call below. "--user" installs
# into the per-user location (~/.local), which is why a later cell extends PATH.
USER_FLAG: str = "--user"

In [None]:
!pip3 install {USER_FLAG} google-cloud-aiplatform --upgrade
!pip3 install {USER_FLAG} kfp google-cloud-pipeline-components --upgrade

In [None]:
# Report the installed SDK versions from a separate `python3` process, so the check
# reflects what `pip3` installed rather than modules already loaded in this kernel.
# Keep the `'{}'.format(...)` form: IPython tries to expand `{expr}` in `!` lines from
# the kernel namespace, so an f-string here could be substituted before the shell runs it.
!python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
!python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

In [1]:
import os
# Google Cloud project that Vertex AI pipeline runs are created in.
PROJECT_ID: str = "kubeflow-on-gcp-123"
# GCS bucket (with gs:// scheme) used as the base for the pipeline root.
BUCKET_NAME: str = "gs://aiplatform-custom"

In [2]:
from typing import NamedTuple

import kfp
from kfp import dsl # contains the domain-specific language (DSL) that you can use to define and interact with pipelines and components
from kfp.v2 import compiler # includes classes and methods for compiling pipeline Python DSL into a workflow JSON spec
from kfp.v2.dsl import (Artifact, Dataset, Input, InputPath, Model, Output,
                        OutputPath, ClassificationMetrics, Metrics, component) #import a number of features from the v2 DSL
from kfp.v2.google.client import AIPlatformClient # client used to interface with the Vertex AI APIs - to be deprecated

from google.cloud import aiplatform # Vertex AI SDK - new interface to interface with APIS programmatically
from google_cloud_pipeline_components import aiplatform as gcc_aip # pre-built components for Vertex AI

In [3]:
# `pip install --user` places console scripts in ~/.local/bin (/home/jupyter/.local/bin
# in this environment), which may not be on PATH. Append it only when missing so
# re-running this cell does not keep adding duplicate entries. `PATH` holds the
# value from before the update.
PATH = os.environ.get("PATH", "")
USER_BIN_DIR = os.path.expanduser("~/.local/bin")
if USER_BIN_DIR not in PATH.split(os.pathsep):
    os.environ["PATH"] = os.pathsep.join(filter(None, [PATH, USER_BIN_DIR]))

REGION = "us-central1"

# GCS prefix passed as `pipeline_root` to the pipeline definition.
PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"
PIPELINE_ROOT

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


'gs://aiplatform-custom/pipeline_root/'

In [4]:
@component(packages_to_install=["pandas","google-cloud-aiplatform", "google-cloud-bigquery-storage","google-cloud-bigquery","pyarrow"], output_component_file="preprocess.yaml")
def preprocess(query: str, results: Output[Dataset], project_id: str = "kubeflow-on-gcp-123") -> str:
    """Run a BigQuery query and persist the full result as a CSV dataset artifact.

    Args:
        query: Standard SQL to execute. An empty or whitespace-only string falls
            back to the built-in Iowa liquor sales aggregation defined below.
        results: Output dataset artifact; the complete query result is written
            to ``results.path`` as CSV without the pandas index column.
        project_id: Project used for the BigQuery client (query job). Defaults
            to the project this component previously hard-coded.

    Returns:
        A plain-text preview (``DataFrame.head()``, i.e. the first five rows)
        of the query result.
    """
    # Imports live inside the body because KFP packages only this function's
    # source into the component container; notebook-level names are not available.
    from google.cloud import bigquery
    import google.auth

    default_query = """
            SELECT 
            item_description,
            date,
            SUM(bottles_sold) as total_bottles_sold
            FROM    
            `bigquery-public-data.iowa_liquor_sales.sales`
            WHERE date BETWEEN '2020-01-01' and current_date()
            GROUP BY item_description, date;
    """
    # Honour the caller's SQL; previously the `query` input was always discarded.
    sql = (query or "").strip() or default_query

    creds, _ = google.auth.default()
    client = bigquery.Client(project=project_id, credentials=creds)

    dataframe = client.query(sql).to_dataframe()
    # Materialise the output artifact; previously `results` was rebound to a
    # string, so nothing was ever written to the Dataset.
    dataframe.to_csv(results.path, index=False)
    print("done")

    return dataframe.head().to_string()

In [5]:
@dsl.pipeline(
    pipeline_root=PIPELINE_ROOT,
    name="bq-pipeline",
    description="BQ pipeline",
)
def bq_pipeline(query: str):
    """Single-step pipeline that forwards ``query`` to the ``preprocess`` component.

    Args:
        query: SQL text handed unchanged to ``preprocess``.
    """
    # No downstream step consumes the task's outputs, so the handle is not bound.
    preprocess(query=query)

In [6]:
# Compile the pipeline into the job-spec JSON file that the next cell submits.
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(
    package_path="bq_pipeline_spec.json",
    pipeline_func=bq_pipeline,
)

In [7]:
# Submit the compiled job spec to Vertex AI Pipelines via the KFP v2 compatibility client.
# NOTE(review): AIPlatformClient is deprecated; `aiplatform.PipelineJob` from the Vertex AI
# SDK (`from google.cloud import aiplatform`) is the replacement. It is kept here so that
# `api_client` and `response` retain their current types for any later cells.
api_client = AIPlatformClient(region=REGION, project_id=PROJECT_ID)

response = api_client.create_run_from_job_spec(
    job_spec_path="bq_pipeline_spec.json",
    enable_caching=True,
    # `query` is sent as an empty string; its effect depends on how the
    # `preprocess` component treats its `query` input.
    parameter_values={"query": ""},
)

