## First Time Installation

Install pinned versions of the Vertex AI SDK for Python and the other pipeline dependencies. This is only needed the first time you use a new notebook instance.

In [None]:
# ! pip3 install $USER google-cloud-aiplatform==1.15.1 --upgrade
# ! pip3 install $USER google-cloud-bigquery==2.34.4 --upgrade
# ! pip3 install $USER google-cloud-bigquery-storage==2.13.2 --upgrade
# ! pip3 install $USER google-cloud-storage==1.44.0 --upgrade
# ! pip3 install $USER kfp==1.8.13 --upgrade
# ! pip3 install $USER kfp-pipeline-spec==0.1.16 --upgrade
# ! pip3 install $USER kfp-server-api==1.8.2 --upgrade
# ! pip3 install $USER google-cloud-pipeline-components==1.0.14 --upgrade
# ! pip3 install $USER icecream==2.1.1 --upgrade
# ! pip3 install $USER pandas-gbq==0.15.0 --upgrade
# ! pip3 install $USER google-cloud-secret-manager --upgrade
# ! pip3 install $USER google-cloud-pubsub==2.13.4 --upgrade

# import os
# if not os.getenv("IS_TESTING"):
#     # Automatically restart kernel after installs
#     import IPython

#     app = IPython.Application.instance()
#     app.kernel.do_shutdown(True)

# # Check versions
# ! python3 -c "import kfp; print('KFP SDK version: {}'.format(kfp.__version__))"
# ! python3 -c "import google_cloud_pipeline_components; print('google_cloud_pipeline_components version: {}'.format(google_cloud_pipeline_components.__version__))"

## Load your pipeline components and configs

In [None]:
%load_ext autoreload
%autoreload 2

# Related Python packages
from icecream import ic
from kfp.v2 import dsl

# Components
from ml_components.dataimport import (
    get_rundates,
    get_period,
    get_import_query,
    bq_query_no_return,
    bq_query_to_dataframe,
    get_previous_dataset_from_model_league,
    
)
from ml_components.datacheck import (
#     printing,
    grand_drift_check,
)
from ml_components.datapreproc import (
    data_preprocess,
    data_interpolate,
)
from ml_components.modelling import (
    modelling,
)

from ml_components.prediction import (
    prediction,
)

from ml_components.outbound import (
    data_collection,
    html_collection,
    generate_bq_table_from_gsc,
    update_model_league,
)
from ml_components.alert import (
    push_slack_notification,
)
from ml_components.pipelinehelper import (
    func_op,
    save_pipeline,
    run_pipeline,
)

# Configs
from config import (
    SERVICE_ACCOUNT,
    PROJECT_ID,
    REGION,
    RUNNER,
    PIPELINE_NAME,
    BUCKET_NAME,
    E2E_PIPELINE_ROOT as PIPELINE_ROOT,
    USE_VAIEXP,
    PARAMETER_VALUES,
    parameter_checks
)
# Run the config module's own checks before the pipeline is defined.
# NOTE(review): presumably validates PARAMETER_VALUES and the other config
# constants -- confirm against config.py.
parameter_checks()

## Define your pipeline
```
@dsl.pipeline(
    name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
)
def your_pipeline(
    parameter_value: data_type,
):
    ops_1 = func_op(
        func=function,
        _component_human_name='your ops 1 label',
        base_image='python:3.7', #optional
        function_arg=parameter_value,
    )
    
    ops_2 = func_op(
        func=function,
        _component_human_name='your ops 2 label',
        packages_to_install=[
            'pandas==1.3.3'
        ],
        function_arg=ops_1.outputs['output'],
    )
```

#### Choose your desired GCP pre-built image for your node(s)
- https://cloud.google.com/deep-learning-containers/docs/choosing-container
- `! gcloud container images list --repository="gcr.io/deeplearning-platform-release"`
- You can also simply use a plain `python:3.x` image if you prefer.

In [None]:
@dsl.pipeline(
    name=PIPELINE_NAME,
    pipeline_root=PIPELINE_ROOT,
)
def grand_pipeline(
    bucket_name: str,
    project_id: str,
    project_name: str,
    region: str,
    job_id: str,
    run_date: str,
    train_size: float,
    training_months: int,
    sku_column: str,
    ts_column: str,
    val_column: str,
    metrics_name: str,
    freq: str,
    allowed_models: str,
    feature_importance_dict_str: str,
    numerical_drift_partition_threshold: float,
    numerical_importance_partition_threshold: float,
    categorical_drift_partition_threshold: float,
    categorical_importance_partition_threshold: float,
    category_threshold: int,
    delta: int,
    forecasting_horizon_days: int,
    model_params: str,
    runner: str,
    commit_short_sha: str,
    tracking_cutoff: int,
):
    """Wire the end-to-end pipeline graph: import, per-SKU modelling, outbound.

    The graph is built in this order:

    1. ``get_rundates`` derives ``usage_run_date`` and ``src_run_dt``.
    2. ``get_period`` -> ``get_import_query`` -> ``bq_query_to_dataframe`` ->
       ``data_preprocess`` import and preprocess the training data.
    3. A ``dsl.ParallelFor`` over the preprocess output ``train_dataset_path``
       runs, for each item's ``sku_name``: ``data_interpolate``,
       ``get_previous_dataset_from_model_league``, ``grand_drift_check``,
       ``modelling`` and ``prediction``.
    4. After the loop, the collection ops (``html_collection`` and four
       ``data_collection`` modes) run, their ``grand_dataset`` outputs are
       sent to ``generate_bq_table_from_gsc``, and ``update_model_league``
       runs last.
    5. Caching is disabled on a fixed subset of the nodes.

    Args:
        bucket_name: Forwarded to the drift check, modelling, prediction and
            collection components.
        project_id: Forwarded to the BigQuery import, previous-dataset,
            modelling, BigQuery export and model-league components.
        project_name: Forwarded to the previous-dataset, modelling, BigQuery
            export and model-league components.
        region: Forwarded as ``location`` to the previous-dataset, BigQuery
            export and model-league components.
        job_id: Forwarded to ``update_model_league``.
        run_date: Forwarded to ``get_rundates``.
        train_size: Forwarded to ``modelling``.
        training_months: Forwarded to ``get_period``.
        sku_column: Forwarded to the preprocess, interpolate,
            previous-dataset, drift check, modelling, prediction and
            model-league components.
        ts_column: Forwarded to the preprocess, interpolate,
            previous-dataset, modelling and prediction components.
        val_column: Forwarded to the same components as ``ts_column``.
        metrics_name: Forwarded to ``prediction`` and ``update_model_league``.
        freq: Forwarded to ``data_interpolate`` and ``prediction``.
        allowed_models: Forwarded to ``modelling``.
        feature_importance_dict_str: Forwarded to ``grand_drift_check``.
        numerical_drift_partition_threshold: Forwarded to
            ``grand_drift_check``.
        numerical_importance_partition_threshold: Forwarded to
            ``grand_drift_check``.
        categorical_drift_partition_threshold: Forwarded to
            ``grand_drift_check``.
        categorical_importance_partition_threshold: Forwarded to
            ``grand_drift_check``.
        category_threshold: Forwarded to ``grand_drift_check``.
        delta: Forwarded to ``grand_drift_check``.
        forecasting_horizon_days: Forwarded to ``prediction``.
        model_params: Forwarded to ``modelling`` and ``prediction``.
        runner: Forwarded to the previous-dataset, modelling, BigQuery export
            and model-league components.
        commit_short_sha: Forwarded to ``update_model_league``.
        tracking_cutoff: Forwarded to ``update_model_league``.
    """
    # Container images shared by several nodes.
    python_image = 'python:3.7'
    sklearn_image = 'gcr.io/deeplearning-platform-release/sklearn-cpu'

    ###########################################################
    ######################### Rundate #########################
    ###########################################################

    rundates_op = func_op(
        func=get_rundates,
        _component_human_name='get_rundates',
        base_image=python_image,
        packages_to_install=['pandas==1.3.3'],
        run_date=run_date,
    )

    usage_run_date = rundates_op.outputs['usage_run_date']
    src_run_dt = rundates_op.outputs['src_run_dt']

    ###########################################################
    ####################### Import Data #######################
    ###########################################################

    period_op = func_op(
        func=get_period,
        _component_human_name='get_period',
        base_image=python_image,
        packages_to_install=['pandas==1.3.3'],
        run_date=usage_run_date,
        training_months=training_months,
    )

    # Build the import query for the training window.
    import_train_data_query_op = func_op(
        func=get_import_query,
        _component_human_name='get_train_import_query',
        base_image=python_image,
        start_date=period_op.outputs['train_start_date'],
        end_date=period_op.outputs['train_end_date'],
    )

    # Run the query and materialise the training dataset.
    train_dataset_op = func_op(
        func=bq_query_to_dataframe,
        _component_human_name='get_train_dataset',
        base_image=python_image,
        project_id=project_id,
        cpu_limit='2',
        memory_limit='8G',
        query=import_train_data_query_op.outputs['query'],
    )

    ###########################################################
    #################### Data Preprocessing ###################
    ###########################################################

    train_data_preprocess_op = func_op(
        func=data_preprocess,
        _component_human_name='train_data_preprocess',
        base_image=python_image,
        packages_to_install=['pandas==1.3.3'],
        cpu_limit='2',
        memory_limit='8G',
        sku_column=sku_column,
        ts_column=ts_column,
        val_column=val_column,
        dataset=train_dataset_op.outputs['dataset'],
    )

    # Fan out over the items of the preprocess output; each item exposes a
    # `sku_name` field that is passed to the per-item components below.
    # NOTE(review): the very large `parallelism` looks like "no practical
    # limit" -- confirm how the target backend treats this value.
    parallel_op = dsl.ParallelFor(
        train_data_preprocess_op.outputs['train_dataset_path'],
        parallelism=1000000,
    )
    with parallel_op as table:
        ###########################################################
        #################### Data Interpolation ###################
        ###########################################################

        interpolate_op = func_op(
            func=data_interpolate,
            _component_human_name='interpolate',
            base_image=sklearn_image,
            packages_to_install=[
                'darts==0.19.0',
            ],
            cpu_limit='2',
            memory_limit='8G',
            sku_name=table.sku_name,
            train_start_date=period_op.outputs['train_start_date'],
            train_end_date=period_op.outputs['train_end_date'],
            freq=freq,
            sku_column=sku_column,
            ts_column=ts_column,
            val_column=val_column,
            processed_dataset=train_data_preprocess_op.outputs['processed_dataset'],
        )

        ###########################################################
        #################### Data Drift Check #####################
        ###########################################################

        # Fetch the previous datasets via the model league.
        prev_train_dataset_op = func_op(
            func=get_previous_dataset_from_model_league,
            _component_human_name='prev_train_dataset',
            base_image=python_image,
            packages_to_install=['pandas-gbq==0.15.0'],
            cpu_limit='2',
            memory_limit='4G',
            retry=3,
            project_id=project_id,
            project_name=project_name,
            runner=runner,
            sku_column=sku_column,
            ts_column=ts_column,
            val_column=val_column,
            sku_name=table.sku_name,
            location=region,
        )

        # NOTE(review): this reads the 'previous_dataset' output while the
        # modelling op reads 'previous_train_dataset'/'previous_val_dataset'
        # from the same component -- confirm all three outputs exist.
        drift_check_op = func_op(
            func=grand_drift_check,
            _component_human_name='drift_check',
            base_image=sklearn_image,
            cpu_limit='2',
            memory_limit='4G',
            bucket_name=bucket_name,
            src_run_dt=src_run_dt,
            sku_column=sku_column,
            sku_name=table.sku_name,
            dataset_p=interpolate_op.outputs['interpolated_dataset'],
            dataset_q=prev_train_dataset_op.outputs['previous_dataset'],
            feature_importance_dict_str=feature_importance_dict_str,
            numerical_drift_partition_threshold=numerical_drift_partition_threshold,
            numerical_importance_partition_threshold=numerical_importance_partition_threshold,
            categorical_drift_partition_threshold=categorical_drift_partition_threshold,
            categorical_importance_partition_threshold=categorical_importance_partition_threshold,
            category_threshold=category_threshold,
            delta=delta,
        )

        ###########################################################
        ######################## Modelling ########################
        ###########################################################

        modelling_op = func_op(
            func=modelling,
            _component_human_name='modelling',
            base_image=sklearn_image,
            packages_to_install=[
                'darts==0.19.0',
                'pandas-gbq==0.15.0',
            ],
            cpu_limit='4',
            memory_limit='4G',
            drift_status=drift_check_op.outputs['drift_status'],
            project_id=project_id,
            project_name=project_name,
            runner=runner,
            bucket_name=bucket_name,
            src_run_dt=src_run_dt,
            sku_name=table.sku_name,
            train_size=train_size,
            sku_column=sku_column,
            ts_column=ts_column,
            val_column=val_column,
            allowed_models=allowed_models,
            model_params=model_params,
            previous_train_dataset=prev_train_dataset_op.outputs['previous_train_dataset'],
            previous_val_dataset=prev_train_dataset_op.outputs['previous_val_dataset'],
            interpolated_dataset=interpolate_op.outputs['interpolated_dataset'],
        )

        ###########################################################
        ######################## Prediction #######################
        ###########################################################

        # The task handle is not needed afterwards: nothing downstream reads
        # this op's outputs through the graph.
        func_op(
            func=prediction,
            _component_human_name='prediction',
            base_image=sklearn_image,
            packages_to_install=[
                'darts==0.19.0',
            ],
            cpu_limit='4',
            memory_limit='4G',
            bucket_name=bucket_name,
            src_run_dt=src_run_dt,
            sku_name=table.sku_name,
            freq=freq,
            sku_column=sku_column,
            ts_column=ts_column,
            val_column=val_column,
            metrics_name=metrics_name,
            model_params=model_params,
            forecasting_horizon_days=forecasting_horizon_days,
            interpolated_dataset=interpolate_op.outputs['interpolated_dataset'],
            accuracy_board_dataset=modelling_op.outputs['accuracy_board_dataset'],
        )

    ###########################################################
    ######################### Outbound ########################
    ###########################################################

    def _collection_op(func, human_name, **extra_kwargs):
        """Create a collection node that is ordered after the parallel loop."""
        op = func_op(
            func=func,
            _component_human_name=human_name,
            base_image=sklearn_image,
            cpu_limit='2',
            memory_limit='8G',
            bucket_name=bucket_name,
            src_run_dt=src_run_dt,
            **extra_kwargs,
        )
        # The collection ops take no loop outputs as inputs, so the ordering
        # after the fan-out has to be declared explicitly.
        op.after(parallel_op)
        return op

    def _bq_export_op(human_name, dataset_id, table_name, run_dt, collection_op):
        """Create a node sending a collection's 'grand_dataset' to BigQuery."""
        return func_op(
            func=generate_bq_table_from_gsc,
            _component_human_name=human_name,
            base_image=python_image,
            cpu_limit='2',
            memory_limit='8G',
            retry=3,
            project_id=project_id,
            project_name=project_name,
            dataset_id=dataset_id,
            runner=runner,
            table_name=table_name,
            src_run_dt=run_dt,
            dataset_format='PARQUET',
            location=region,
            dataset_to_save=collection_op.outputs['grand_dataset'],
        )

    ################################
    ########## Collection ##########
    ################################

    html_collection_op = _collection_op(html_collection, 'html_collection')
    accuracy_board_collection_op = _collection_op(
        data_collection, 'accuracy_board_collection', mode='board',
    )
    train_collection_op = _collection_op(
        data_collection, 'train_collection', mode='train',
    )
    val_collection_op = _collection_op(
        data_collection, 'val_collection', mode='val',
    )
    prediction_collection_op = _collection_op(
        data_collection, 'prediction_collection', mode='prediction',
    )

    ################################
    ########## BQ Tracking #########
    ################################

    # Two flavours are created per dataset: '*_dataset_to_bq' passes the run's
    # src_run_dt, '*_persist_to_bq' passes an empty string.
    # NOTE(review): presumably '' targets a cumulative table and src_run_dt a
    # per-run table -- confirm in generate_bq_table_from_gsc.
    board_persist_to_bq_op = _bq_export_op(
        'board_persist_to_bq', 'MLOPS_TRACKING', 'accuracy_board',
        '', accuracy_board_collection_op,
    )
    train_dataset_to_bq_op = _bq_export_op(
        'train_dataset_to_bq', 'MLOPS_TRAIN_DATASET', 'train',
        src_run_dt, train_collection_op,
    )
    train_persist_to_bq_op = _bq_export_op(
        'train_persist_to_bq', 'MLOPS_TRAIN_DATASET', 'train',
        '', train_collection_op,
    )
    val_dataset_to_bq_op = _bq_export_op(
        'val_dataset_to_bq', 'MLOPS_VAL_DATASET', 'val',
        src_run_dt, val_collection_op,
    )
    val_persist_to_bq_op = _bq_export_op(
        'val_persist_to_bq', 'MLOPS_VAL_DATASET', 'val',
        '', val_collection_op,
    )
    prediction_dataset_to_bq_op = _bq_export_op(
        'prediction_dataset_to_bq', 'MLOPS_PREDICTION_DATASET', 'prediction',
        src_run_dt, prediction_collection_op,
    )
    prediction_persist_to_bq_op = _bq_export_op(
        'prediction_persist_to_bq', 'MLOPS_PREDICTION_DATASET', 'prediction',
        '', prediction_collection_op,
    )

    #########################################
    ########## Update Model League ##########
    #########################################

    update_model_league_op = func_op(
        func=update_model_league,
        _component_human_name='update_model_league',
        base_image=python_image,
        packages_to_install=['pandas-gbq==0.15.0'],
        cpu_limit='2',
        memory_limit='8G',
        project_id=project_id,
        project_name=project_name,
        location=region,
        job_id=job_id,
        sku_column=sku_column,
        metrics_name=metrics_name,
        src_run_dt=src_run_dt,
        train_bq_path=train_dataset_to_bq_op.outputs['bq_path'],
        val_bq_path=val_dataset_to_bq_op.outputs['bq_path'],
        prediction_bq_path=prediction_dataset_to_bq_op.outputs['bq_path'],
        runner=runner,
        commit_short_sha=commit_short_sha,
        tracking_cutoff=tracking_cutoff,
        accuracy_board_dataset=accuracy_board_collection_op.outputs['grand_dataset'],
    )

    ###########################################################
    ####################### Ops Caching #######################
    ###########################################################

    # Nodes that must re-execute on every run. "pr" marks persist_run nodes.
    # Every op not listed here (period, import query, train dataset,
    # preprocess, drift check, the '*_dataset_to_bq' exports, ...) keeps the
    # default caching behaviour.
    nodes_no_cache = [
        rundates_op,  # pr
        prev_train_dataset_op,  # pr
        html_collection_op,
        accuracy_board_collection_op,
        train_collection_op,
        val_collection_op,
        prediction_collection_op,
        board_persist_to_bq_op,  # pr
        train_persist_to_bq_op,  # pr
        val_persist_to_bq_op,  # pr
        prediction_persist_to_bq_op,  # pr
        update_model_league_op,  # pr
    ]

    for node in nodes_no_cache:
        node.set_caching_options(enable_caching=False)

## Save your pipeline

In [None]:
# Hand the pipeline function to the project helper `save_pipeline`; the three
# returned values are reused by the `run_pipeline` call further down.
# NOTE(review): presumably this compiles the pipeline and stores the template
# under BUCKET_NAME, and `mode=None` selects the helper's default behaviour --
# confirm in ml_components.pipelinehelper.
TEMPLATE_PATH, JOB_ID, DISPLAY_NAME = save_pipeline(
    pipeline=grand_pipeline,
    pipeline_name=PIPELINE_NAME,
    bucket_name=BUCKET_NAME,
    mode=None,
)

## Run your pipeline locally

In [None]:
# Start a run of the saved template through the project helper `run_pipeline`,
# using the config constants and the values returned by `save_pipeline`.
# NOTE(review): given the service account, staging bucket and location
# arguments this presumably submits the job to Vertex AI rather than executing
# it in this kernel -- confirm in ml_components.pipelinehelper.
run_pipeline(
    project_id=PROJECT_ID,
    staging_bucket=BUCKET_NAME,
    location=REGION,
    display_name=DISPLAY_NAME,
    template_path=TEMPLATE_PATH,
    job_id=JOB_ID,
    pipeline_root=PIPELINE_ROOT,
    service_account=SERVICE_ACCOUNT,
    parameter_values=PARAMETER_VALUES,
    # enable_caching=False,
    use_vaiexp=USE_VAIEXP,
)