In [None]:
# Copyright 2023 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Fraudfinder - ML Pipeline

<table align="left">
  <td>
    <a href="https://console.cloud.google.com/ai-platform/notebooks/deploy-notebook?download_url=https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/vertex_ai/06_formalization.ipynb">
       <img src="https://www.gstatic.com/cloud/images/navigation/vertex-ai.svg" alt="Google Cloud Notebooks">Open in Cloud Notebook
    </a>
  </td> 
  <td>
    <a href="https://colab.research.google.com/github/GoogleCloudPlatform/fraudfinder/blob/main/vertex_ai/06_formalization.ipynb">
      <img src="https://cloud.google.com/ml-engine/images/colab-logo-32px.png" alt="Colab logo"> Open in Colab
    </a>
  </td>
  <td>
    <a href="https://github.com/GoogleCloudPlatform/fraudfinder/blob/main/vertex_ai/06_formalization.ipynb">
        <img src="https://cloud.google.com/ml-engine/images/github-logo-32px.png" alt="GitHub logo">
      View on GitHub
    </a>
  </td>
</table>

## Overview

[Fraudfinder](https://github.com/googlecloudplatform/fraudfinder) is a series of labs on how to build a real-time fraud detection system on Google Cloud. Throughout the Fraudfinder labs, you will learn how to read historical bank transaction data stored in a data warehouse, read from a live stream of new transactions, perform exploratory data analysis (EDA), do feature engineering, ingest features into a feature store, train a model using the feature store, register your model in a model registry, evaluate your model, deploy your model to an endpoint, do real-time inference on your model with the feature store, and monitor your model.

### Objective

This notebook shows how to use Feature Store, Pipelines and Model Monitoring for building an end-to-end demo using both components defined in `google_cloud_pipeline_components` and custom components. 

This lab uses the following Google Cloud services and resources:

- [Vertex AI](https://cloud.google.com/vertex-ai/)
- [BigQuery](https://cloud.google.com/bigquery/)

Steps performed in this notebook:

* Create a Vertex AI Pipeline to orchestrate and automate the ML workflow

### Costs

This tutorial uses billable components of Google Cloud:

* Vertex AI
* BigQuery

Learn about [Vertex AI
pricing](https://cloud.google.com/vertex-ai/pricing), [BigQuery pricing](https://cloud.google.com/bigquery/pricing) and use the [Pricing
Calculator](https://cloud.google.com/products/calculator/)
to generate a cost estimate based on your projected usage.

### Load configuration settings from the setup notebook

Set the constants used in this notebook and load the config settings from the `00_environment_setup.ipynb` notebook.

In [1]:
import json

# Pointer file read by this notebook; it holds the "bucket" and "conf_uri"
# keys that the next cell uses to fetch the full configuration.
CONFIG_PATH_FILE = "../config_path.json"

try:
    with open(CONFIG_PATH_FILE, "r") as f:
        config_path = json.load(f)
except FileNotFoundError as err:
    # Fail fast: without config_path every later cell would die with a
    # confusing NameError, so stop here with an actionable message instead.
    raise FileNotFoundError(
        "config_path.json not found. Please make sure the file exists "
        "(run the `00_environment_setup.ipynb` notebook first)."
    ) from err

In [2]:
from utils import read_from_bucket, VertexConfig


config = read_from_bucket(config_path["bucket"], config_path["conf_uri"])
config = VertexConfig(**config)

PROJECT_NUM = !gcloud projects list --filter="$config.PROJECT_ID" --format="value(PROJECT_NUMBER)"
PROJECT_NUM = PROJECT_NUM[0]

### Import libraries and define constants

#### Libraries
Next you will import the libraries needed for this notebook. 

Note that this notebook is written against KFP SDK v2 (the cell after the imports prints the installed version). If you are porting pipeline code that was written for KFP SDK v1, see the [KFP migration guide](https://www.kubeflow.org/docs/components/pipelines/v2/migration/) for more details on moving from v1 to v2. 

In [3]:
# General
import os
from typing import List

# Vertex Pipelines
import kfp
from kfp import dsl, compiler

from google.cloud import aiplatform as vertex_ai
from google_cloud_pipeline_components.v1 import dataset, custom_job, endpoint
from google_cloud_pipeline_components.types import artifact_types
from google_cloud_pipeline_components._placeholders import PERSISTENT_RESOURCE_ID_PLACEHOLDER

In [4]:
# Report the installed KFP SDK version so the reader can check it against the code below.
print(f"kfp version: {kfp.__version__}")

kfp version: 2.7.0


#### Variables

In [5]:
# Pipeline variables: root output folder and the compiled pipeline package
PIPELINE_DIR = os.path.join(os.curdir, "pipelines")
PIPELINE_PACKAGE_PATH = f"{PIPELINE_DIR}/pipeline_{config.ID}.json"

# Components variables: one compiled YAML spec per custom component,
# all stored in a "components" sub-folder of the pipeline folder
COMPONENTS_DIR = os.path.join(PIPELINE_DIR, "components")
INGEST_FEATURE_STORE = f"{COMPONENTS_DIR}/ingest_feature_store_{config.ID}.yaml"
TRAIN_MODEL = f"{COMPONENTS_DIR}/train_model_{config.ID}.yaml"
EVALUATE = f"{COMPONENTS_DIR}/evaluate_{config.ID}.yaml"

#### Initialize the Vertex AI SDK
Initialize the Vertex AI SDK for Python for your project and corresponding bucket.

In [6]:
# Vertex AI SDK
# config.BUCKET_NAME is a bare bucket name (other cells prepend "gs://" or
# "/gcs/" to it), while staging_bucket is documented as a "gs://..." URI,
# so build the URI explicitly here.
vertex_ai.init(
    project=config.PROJECT_ID,
    location=config.REGION,
    staging_bucket=f"gs://{config.BUCKET_NAME}",
)

In [None]:
!gsutil ubla set on gs://{vertex_config.BUCKET_NAME}

Enabling Uniform bucket-level access for gs://fraud-finder-lab-fraudfinder...


#### Create directories 
Create a directory for your pipeline and pipeline components. 

In [8]:
!mkdir -p -m 777 $PIPELINE_DIR $COMPONENTS_DIR

### Create an end-to-end Pipeline and execute it on Vertex AI Pipelines.

We will build a pipeline that you will execute using [Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction). Vertex AI Pipelines helps you to automate, monitor, and govern your ML systems by orchestrating your ML workflow in a serverless manner, and storing your workflow's artifacts using Vertex ML Metadata. Authoring ML Pipelines that run on Vertex AI pipelines can be done in two different ways:

* [Tensorflow Extended](https://www.tensorflow.org/tfx/guide)
* [Kubeflow Pipelines SDK](https://kubeflow-pipelines.readthedocs.io/en/1.8.13/)

Based on your preference you can choose between the two options. This notebook will only focus on Kubeflow Pipelines.

If you don't have familiarity in authoring pipelines in Vertex AI Pipelines, we suggest the following resources:
* [Introduction to Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/introduction)
* [Build a Pipeline in Vertex AI Pipelines](https://cloud.google.com/vertex-ai/docs/pipelines/build-pipeline)

### Define Custom Components for your pipeline

We will use a mix of prebuilt (Google Cloud Pipeline Components) and custom components in this notebook. The difference is:

* Prebuilt components are official [Google Cloud Pipeline Components](https://cloud.google.com/vertex-ai/docs/pipelines/components-introduction)(GCPC). The GCPC Library provides a set of prebuilt components that are production quality, consistent, performant, and easy to use in Vertex AI Pipelines.
* Custom components, like the ones you will build in the cells below, are typically authored by a data scientist or ML engineer. This means you have more control over the component (container) code. In this case, they are Python-function-based components. You also have the option to build a component yourself by packaging code into a container.

In the following cells, you will build three custom components:

* Feature Store component.
* Training component.
* Evaluation component.

### Define feature store component

Notice that the component assumes that the table containing the entities and timestamps to read (the "read instances" table that the component queries) has already been created.

#### Feature Store
Next you will build a custom component using the [KFP SDK](https://kubeflow-pipelines.readthedocs.io/en/1.8.13/). Here you will take a Python function and create a component out of it. This component will take features from the Vertex AI Feature Store and output them on Google Cloud Storage (GCS). 

In [30]:
@dsl.component(
    base_image=config.BASE_IMAGE,
    packages_to_install=[
        "gcsfs==2025.3.2",
        "google-cloud-aiplatform==1.88.0",
        "google-cloud-bigquery==3.26.0",
        "bigframes==1.42.0",
        "pandas==2.2.3",
        "Jinja2==3.1.6",
    ],
)
def ingest_features_gcs(
    project_id: str,
    location: str,
    bucket_name: str,
    read_instances_table: str,
) -> str:
    """Export a training snapshot from the Feature Store to Cloud Storage.

    Reads the ground-truth rows from `{project_id}.tx.{read_instances_table}`
    in BigQuery, fetches the historical values of the customer and terminal
    features for those rows, and writes the joined result as a single CSV
    under `/gcs/{bucket_name}/data/snapshots/<timestamp>/`.

    Args:
        project_id: Project that owns the BigQuery table and the feature groups.
        location: Region used for the BigQuery client and the Vertex AI SDK.
        bucket_name: Bare bucket name (no `gs://` prefix) that receives the snapshot.
        read_instances_table: Table in the `tx` dataset holding the rows to enrich.

    Returns:
        A JSON-encoded list containing the `gs://` URI of the exported CSV.
    """
    # Libraries --------------------------------------------------------------------------------------------------------------------------
    from datetime import datetime
    from pathlib import Path
    import json
    # bigframes is not referenced directly below; presumably imported eagerly so a
    # missing install fails before the BigQuery query runs -- TODO confirm.
    import bigframes
    import bigframes.pandas
    from google.cloud import aiplatform, bigquery
    from vertexai.resources.preview.feature_store import FeatureGroup, offline_store


    # Variables --------------------------------------------------------------------------------------------------------------------------
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    export_data_dir = f"/gcs/{bucket_name}/data/snapshots/{timestamp}"
    export_data_path = f"{export_data_dir}/000000.csv"
    customer_entity = "customer"
    terminal_entity = "terminal"
    customer_features_str = [
        "customer_id_nb_tx_1day_window",
        "customer_id_nb_tx_7day_window",
        "customer_id_nb_tx_14day_window",
        "customer_id_avg_amount_1day_window",
        "customer_id_avg_amount_7day_window",
        "customer_id_avg_amount_14day_window",
        "customer_id_nb_tx_15min_window",
        "customer_id_nb_tx_30min_window",
        "customer_id_nb_tx_60min_window",
        "customer_id_avg_amount_15min_window",
        "customer_id_avg_amount_30min_window",
        "customer_id_avg_amount_60min_window",
    ]
    terminal_features_str = [
        "terminal_id_nb_tx_1day_window",
        "terminal_id_nb_tx_7day_window",
        "terminal_id_nb_tx_14day_window",
        "terminal_id_risk_1day_window",
        "terminal_id_risk_7day_window",
        "terminal_id_risk_14day_window",
        "terminal_id_nb_tx_15min_window",
        "terminal_id_nb_tx_30min_window",
        "terminal_id_nb_tx_60min_window",
        "terminal_id_avg_amount_15min_window",
        "terminal_id_avg_amount_30min_window",
        "terminal_id_avg_amount_60min_window",
    ]
    read_instances_query = f"""
        SELECT
            gt.timestamp,
            gt.customer_id,
            gt.terminal_id,
            gt.tx_amount,
            gt.tx_fraud,
        FROM 
            `{project_id}.tx.{read_instances_table}` as gt;
    """

    # Main -------------------------------------------------------------------------------------------------------------------------------

    # Point the Vertex AI SDK at the same project/region as the BigQuery client.
    # Without this the feature-group lookups below fall back to the SDK defaults
    # and ignore the `location` argument of this component.
    aiplatform.init(project=project_id, location=location)

    ## Run batch job request
    # get instances to fetch features
    print("ingest_features_gcs: query", read_instances_query)
    bq_client = bigquery.Client(project=project_id, location=location)
    job_config = bigquery.QueryJobConfig()
    client_result = bq_client.query(read_instances_query, job_config=job_config)
    # Wait for query/job to finish running. then get & return data frame
    instances_df = client_result.result().to_arrow().to_pandas()
    print("ingest_features_gcs - instances_df", instances_df.shape)

    # read from feature store
    customer_fg = FeatureGroup(name=customer_entity)
    customer_features = [customer_fg.get_feature(c_feat) for c_feat in customer_features_str]
    terminal_fg = FeatureGroup(name=terminal_entity)
    terminal_features = [terminal_fg.get_feature(t_feat) for t_feat in terminal_features_str]
    sample_df = offline_store.fetch_historical_feature_values(
        entity_df=instances_df,
        features=customer_features + terminal_features,
    )
    sample_df = sample_df.to_pandas()
    print("ingest_features_gcs - sample_df", sample_df.shape)

    # The bucket is reached through the /gcs/ Cloud Storage FUSE mount, so plain
    # filesystem calls work; exist_ok makes a separate existence check unnecessary.
    Path(export_data_dir).mkdir(parents=True, exist_ok=True)
    sample_df.to_csv(export_data_path, index=False)

    # Store metadata: report the snapshot as gs:// URIs rather than FUSE paths
    snapshot_files_fmt = [export_data_path.replace("/gcs/", "gs://")]
    snapshot_files_string = json.dumps(snapshot_files_fmt)
    print("ingest_features_gcs - snapshot_files_string", snapshot_files_string)

    return snapshot_files_string


compiler.Compiler().compile(ingest_features_gcs, INGEST_FEATURE_STORE)

#### Custom training
You will package the training job, as in the previous module (05), as a custom training job component.

In [31]:
@dsl.component(
    base_image=config.BASE_IMAGE,
    packages_to_install=[
        "gcsfs==2025.3.2",
        "tqdm==4.67.1",
        "numpy==2.2.4",
        "pandas==2.2.3",
        "torch==2.6.0",
        "dask==2025.3.0",
        "dask-ml==2025.1.0",
        "distributed==2025.3.0",
        "google-cloud-pipeline-components==2.17.0",
        "google-cloud-aiplatform==1.88.0",
    ]
)
def train_model(
    project: str,
    location: str,
    bucket: str,
    dataset: dsl.Input[artifact_types.VertexDataset],
    dtype: dict,
    drop_cols: List[str],
    target_col: str,
    feat_cols: List[str],
    model_reg: str,
    model_serving_image_uri: str,
    trained_model: dsl.Output[artifact_types.VertexModel],
    test_ds: dsl.Output[dsl.Dataset],
):
    """Train a small feed-forward fraud classifier and upload it to Vertex AI.

    Reads the CSV files behind the Vertex AI tabular dataset, splits them into
    train/validation/test sets, down-samples the non-fraud class, trains the
    network, saves it as a TorchScript file and uploads it as a Vertex AI Model.

    Args:
        project: Project used to initialise the Vertex AI SDK.
        location: Region used to initialise the Vertex AI SDK.
        bucket: Bare bucket name (no `gs://` prefix) for training outputs.
        dataset: Vertex AI dataset artifact whose GCS source files are read.
        dtype: Column-name to dtype mapping passed to the CSV reader.
        drop_cols: Columns removed before training (missing ones are ignored).
        target_col: Name of the label column (1 = fraud, 0 = not fraud).
        feat_cols: Names of the feature columns fed to the network.
        model_reg: Display name of the uploaded model.
        model_serving_image_uri: Serving container image for the uploaded model.
        trained_model: Output artifact; receives the model URI, its resource
            name and the local path of the saved TorchScript file.
        test_ds: Output artifact; the preprocessed test split is written under its path.
    """
    import os
    from typing import List
    from pathlib import Path
    from datetime import datetime, timezone

    import numpy as np
    import pandas as pd
    import tqdm
    import torch
    import torch.nn as nn
    import torch.nn.functional as F
    import dask.dataframe as dask_df
    from dask_ml.model_selection import train_test_split

    from google.cloud import aiplatform as vertex_ai
    from google.cloud.aiplatform import explain

    ## Helpers
    def gcs_path_to_local_path(old_path: str) -> str:
        """Map a gs:// URI to its /gcs/ (Cloud Storage FUSE) path."""
        new_path = old_path.replace("gs://", "/gcs/")
        return new_path

    def resample(df: pd.DataFrame, replace: bool, frac: float = 1, random_state: int = 8) -> pd.DataFrame:
        """Return a seeded random sample of `frac` of the rows of `df`."""
        shuffled_df = df.sample(frac=frac, replace=replace, random_state=random_state)
        return shuffled_df

    def preprocess(df: pd.DataFrame, drop_cols: List[str] = None) -> pd.DataFrame:
        """Drop unwanted columns and NaN rows, and store float columns as float32."""
        if drop_cols:
            df = df.drop(columns=drop_cols, errors="ignore")

        # Drop rows with NaN"s
        df = df.dropna()

        # Cast the floating-point columns (float32/float64) to float32
        numeric_columns = df.select_dtypes(["float32", "float64"]).columns
        numeric_format = {col:"float32" for col in numeric_columns}
        df = df.astype(numeric_format)

        return df

    class Model(nn.Module):
        """Three-layer feed-forward network that outputs two class probabilities."""

        def __init__(self, input_dim):
            super().__init__()
            self.l1 = nn.Linear(input_dim, 100)
            self.l2 = nn.Linear(100, 30)
            self.l3 = nn.Linear(30, 2)

        def forward(self, x):
            x = F.relu(self.l1(x))
            x = F.relu(self.l2(x))
            x = F.relu(self.l3(x))
            # The forward pass returns probabilities (not logits); the loss below accounts for that.
            x = F.softmax(x, dim=1)
            return x

    ## Training variables
    N_PARTITIONS = 4
    EPOCHS = 5
    vertex_ai.init(project=project, location=location, staging_bucket=f"gs://{bucket}")

    # manually extract and split
    dataset_resource_name = dataset.metadata['resourceName']
    dataset_id = dataset_resource_name.split("/")[-1]
    tabular_dataset = vertex_ai.TabularDataset(dataset_resource_name)
    dataset_uris = tabular_dataset.gca_resource.metadata['inputConfig']['gcsSource']['uri']
    dataset_uris = [gcs_path_to_local_path(dataset_uri) for dataset_uri in dataset_uris]
    print("train_model - dataset_uris", dataset_uris)
    ds_df = dask_df.read_csv(dataset_uris, dtype=dtype)
    # 80% train; the remaining 20% is halved into validation and test
    train_df, test_df = train_test_split(ds_df, test_size=0.2, shuffle=True)
    eval_df, test_df = train_test_split(test_df, test_size=0.5)
    TRAINING_DIR = (
        f"/gcs/{bucket}/aiplatform-custom-training-"
        f"{datetime.now(timezone.utc).strftime('%Y-%m-%d-%H:%M:%S.%f')}"
    )
    TRAINING_DATA_DIR = (
        f"{TRAINING_DIR}/dataset-{dataset_id}-tables-"
        f"{datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')}"
    )
    # dask replaces the "*" with the partition number when writing
    TRAINING_DATA_PATH = f"{TRAINING_DATA_DIR}/training-0000*-of-0000{N_PARTITIONS}.csv"
    EVAL_DATA_PATH = f"{TRAINING_DATA_DIR}/validation-0000*-of-0000{N_PARTITIONS}.csv"
    TEST_DATA_PATH = f"{TRAINING_DATA_DIR}/test-0000*-of-0000{N_PARTITIONS}.csv"
    train_df.repartition(npartitions=N_PARTITIONS).to_csv(TRAINING_DATA_PATH)
    eval_df.repartition(npartitions=N_PARTITIONS).to_csv(EVAL_DATA_PATH)
    test_df.repartition(npartitions=N_PARTITIONS).to_csv(TEST_DATA_PATH)
    print("train_model - dataset prepared")
    MODEL_DIR = f"{TRAINING_DIR}/model"
    MODEL_PATH = f"{MODEL_DIR}/model.pt"

    # preprocessing
    train_df = train_df.compute()
    test_df = test_df.compute()
    preprocessed_train_df = preprocess(train_df, drop_cols)
    preprocessed_test_df = preprocess(test_df, drop_cols)

    # downsampling: keep every fraud row and a fraction of the non-fraud rows.
    # The fraction is (#fraud / #raw training rows), so the non-fraud sample is
    # approximately (not exactly) as large as the fraud set when fraud is rare.
    train_nfraud_df = preprocessed_train_df[preprocessed_train_df[target_col]==0]
    train_fraud_df = preprocessed_train_df[preprocessed_train_df[target_col]==1]
    train_nfraud_downsample = resample(
        train_nfraud_df,
        replace=False,
        frac=len(train_fraud_df)/len(train_df)
    )
    ds_preprocessed_train_df = pd.concat([train_nfraud_downsample, train_fraud_df])
    print("train_model - ds_preprocessed_train_df", ds_preprocessed_train_df.shape)

    # Train torch model
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # target, features split
    x_train = ds_preprocessed_train_df[feat_cols].astype(np.float32).values
    x_train = torch.from_numpy(x_train).float().to(device)
    y_train = ds_preprocessed_train_df.loc[:, target_col].astype(int).values
    y_train = torch.from_numpy(y_train).long().to(device)

    # Hand the preprocessed test split to the evaluation step.
    # The pattern must be an f-string, otherwise "{N_PARTITIONS}" ends up literally in the file names.
    preprocessed_test_dask_df = dask_df.from_pandas(preprocessed_test_df, npartitions=N_PARTITIONS)
    preprocessed_test_dask_df.to_csv(os.path.join(test_ds.path, f"test-0000*-of-0000{N_PARTITIONS}.csv"))
    print("train_model - preprocessed_test_df", preprocessed_test_df.shape)

    # start training; for demo purpose, no validation/early stopping is implemented
    model = Model(x_train.shape[1]).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    # The model already outputs softmax probabilities, and CrossEntropyLoss expects
    # unnormalised logits (it applies log-softmax itself). Use the negative
    # log-likelihood of the predicted probabilities instead, so softmax is applied once.
    loss_fn = nn.NLLLoss()
    for _ in tqdm.trange(EPOCHS):
        y_prob = model(x_train)
        # clamp avoids log(0) for saturated probabilities
        loss = loss_fn(torch.log(y_prob.clamp_min(1e-12)), y_train)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    Path(MODEL_DIR).mkdir(parents=True, exist_ok=True)
    # Trace on CPU: the example input lives on CPU, so a model left on a CUDA
    # device would raise a device-mismatch error during tracing.
    model = model.to("cpu").eval()
    example_input = torch.rand(10, x_train.shape[1], dtype=torch.float32)
    model_jit = torch.jit.trace(model, example_kwarg_inputs={"x": example_input})
    torch.jit.save(model_jit, MODEL_PATH)
    print("train_model - jit model saved")

    # upload model
    explanation_params = explain.ExplanationParameters(
        sampled_shapley_attribution=explain.SampledShapleyAttribution(
            path_count=10,
        ),
    )
    explanation_metadata = explain.ExplanationMetadata(
        inputs={
            feat: {} for feat in feat_cols
        },
        outputs={
            "probability": {}
        }
    )
    vertex_ai_model = vertex_ai.Model.upload(
        serving_container_image_uri=model_serving_image_uri,
        artifact_uri=MODEL_DIR.replace("/gcs/", "gs://"),
        display_name=model_reg,
        description="Vertex AI Pipeline Custom Model",
        explanation_metadata=explanation_metadata,
        explanation_parameters=explanation_params,
    )
    trained_model.uri = vertex_ai_model.uri
    trained_model.metadata["resourceName"] = vertex_ai_model.resource_name
    # The evaluation step loads the TorchScript file from this path
    trained_model.metadata["path"] = MODEL_PATH
    print(trained_model.metadata)
    print(trained_model.path, trained_model.uri)


compiler.Compiler().compile(train_model, TRAIN_MODEL)

#### Define an evaluate custom component
Next you will build a custom component that will evaluate our pytorch model. This component will output `avg_precision_score` so that it can be used downstream for validating the model before deployment. 

In [32]:
from typing import NamedTuple


@dsl.component(
    base_image=config.BASE_IMAGE,
    packages_to_install=[
        "gcsfs==2025.3.2",
        "numpy==2.2.4",
        "pandas==2.2.3",
        "torch==2.6.0",
        "dask[dataframe]==2025.3.0",
        "pyarrow==19.0.1",
        "distributed==2025.3.0",
        "google-cloud-pipeline-components==2.17.0",
        "google-cloud-aiplatform==1.88.0",
        "scikit-learn==1.6.1",
    ],
)
def evaluate_model(
    threshold: float,
    model_in: dsl.Input[artifact_types.VertexModel],
    test_ds: dsl.Input[dsl.Dataset],
    dtype: dict,
    target_col: str,
    feat_cols: List[str],
    metrics_uri: str,
) -> NamedTuple(
    "outputs",
    meta_metrics=dsl.Metrics,
    graph_metrics=dsl.ClassificationMetrics,
    avg_prec=float,
):
    """Score the trained TorchScript model on the held-out test split.

    Args:
        threshold: Probability at or above which a row is classified as fraud.
        model_in: Model artifact; its `path` metadata entry locates the TorchScript file.
        test_ds: Dataset artifact whose path holds the test CSV partitions.
        dtype: Column-name to dtype mapping passed to the CSV reader.
        target_col: Name of the label column.
        feat_cols: Names of the feature columns fed to the model.
        metrics_uri: `gs://` URI of the JSON file that receives the metrics.

    Returns:
        A named tuple with the scalar metrics artifact, the confusion-matrix
        artifact, and the average precision score as a plain float.
    """
    # Libraries --------------------------------------------------------------------------------------------------------------------------
    import json
    from pathlib import Path
    import dask.dataframe as dask_df
    import numpy as np
    import torch
    from sklearn.metrics import (confusion_matrix, average_precision_score, f1_score,
                                log_loss, precision_score, recall_score)

    # Variables --------------------------------------------------------------------------------------------------------------------------
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    metrics_path = metrics_uri.replace("gs://", "/gcs/")
    labels = ["not fraud", "fraud"]
    class_ids = [0, 1]  # same order as `labels`

    def compute_metrics(model, x_true: np.ndarray, y_true: np.ndarray, thresh: float) -> dict:
        """Return the confusion matrix and scalar metrics for one scored dataset."""
        features = torch.from_numpy(x_true.astype(np.float32)).to(device)
        y_true = y_true.astype(int)
        with torch.no_grad():
            # Column 1 is the score of class 1; move it to the CPU before
            # converting, because .numpy() fails on CUDA tensors.
            y_score = model(features)[:, 1].cpu().numpy().astype(np.float64)
        y_pred = np.where(y_score >= thresh, 1, 0)

        return {
            # Fixed labels keep the matrix 2x2 even if only one class is present
            "confusion_matrix": confusion_matrix(y_true, y_pred, labels=class_ids).tolist(),
            "avg_precision_score": round(average_precision_score(y_true, y_score), 3),
            "f1_score": round(f1_score(y_true, y_pred), 3),
            # Log loss is defined on predicted probabilities, not on thresholded labels
            "log_loss": round(log_loss(y_true, y_score, labels=class_ids), 3),
            "precision_score": round(precision_score(y_true, y_pred), 3),
            "recall_score": round(recall_score(y_true, y_pred), 3),
        }

    # Main -------------------------------------------------------------------------------------------------------------------------------
    # load the dataframe, dask save to path as folder, need to put wildcard
    print("eval", test_ds.path)
    print("eval", model_in.path)
    test_df = dask_df.read_csv(f"{test_ds.path}/*", dtype=dtype)
    test_df = test_df.compute()
    # map_location keeps the model on the same device as the inputs built above
    model = torch.jit.load(model_in.metadata["path"], map_location=device)
    eval_metrics = compute_metrics(
        model, test_df[feat_cols].values, test_df[target_col].values, thresh=threshold
    )

    # Persist the raw metrics as JSON at the requested location
    metrics_path_dir = Path(metrics_path).parent
    metrics_path_dir.mkdir(parents=True, exist_ok=True)
    with open(metrics_path, mode="w") as metrics_file:
        json.dump(eval_metrics, metrics_file, indent=2)

    ## metrics
    avg_precision_score = eval_metrics["avg_precision_score"]

    meta_metrics = dsl.Metrics()
    for metric_name in ("avg_precision_score", "f1_score", "log_loss", "precision_score", "recall_score"):
        meta_metrics.log_metric(metric_name, eval_metrics[metric_name])
    graph_metrics = dsl.ClassificationMetrics()
    graph_metrics.log_confusion_matrix(labels, eval_metrics["confusion_matrix"])

    print("metadata metrics", meta_metrics.metadata)
    print("graph metrics", graph_metrics.metadata)

    eval_output = NamedTuple(
        "outputs",
        meta_metrics=dsl.Metrics,
        graph_metrics=dsl.ClassificationMetrics,
        avg_prec=float,
    )
    return eval_output(
        meta_metrics=meta_metrics,
        graph_metrics=graph_metrics,
        avg_prec=avg_precision_score,
    )


compiler.Compiler().compile(evaluate_model, EVALUATE)

#### Author your pipeline
Next you will author the pipeline using the KFP SDK. This pipeline consists of the following steps:

* Ingest features
* Create Vertex AI Dataset
* Train Pytorch model
* Evaluate model
* Condition
* Create endpoint
* Deploy model into endpoint

In [33]:
@dsl.pipeline(
    pipeline_root=config.PIPELINE_ROOT,
    name=config.PIPELINE_NAME,
)
def pipeline(
    project_id: str = config.PROJECT_ID,
    location: str = config.REGION,
    bucket_name: str = config.BUCKET_NAME,
    deploy_machine_type: str = config.DEPLOY_COMPUTE,
    metrics_uri: str = config.METRICS_URI,
    model_threshold: float = config.MODEL_THRESHOLD,
    thold: float = config.AVG_PR_THRESHOLD,
):
    """End-to-end pipeline: ingest features, train, evaluate, conditionally deploy.

    Args:
        project_id: Project in which every step runs.
        location: Region used by the ingest, training and endpoint steps.
        bucket_name: Bare bucket name used for snapshots and training outputs.
        deploy_machine_type: Machine type of the deployed model's dedicated resources.
        metrics_uri: `gs://` URI where the evaluation step writes its metrics JSON.
        model_threshold: Probability threshold used to classify a row as fraud.
        thold: Minimum average precision score required to deploy the model.
    """
    # Ingest data from featurestore
    ingest_features_op = ingest_features_gcs(
        project_id=project_id,
        location=location,
        bucket_name=bucket_name,
        read_instances_table=config.READ_INSTANCES_TABLE,
    )

    # create dataset
    dataset_create_op = dataset.TabularDatasetCreateOp(
        display_name=config.DATASET_NAME,
        project=project_id,
        gcs_source=ingest_features_op.output,
    ).after(ingest_features_op)

    # custom training job component - script
    # These values are baked into the component when it is built, so they come
    # from `config` rather than from pipeline parameters.
    persistence_resource_id = config.PERSISTENT_RESOURCE_ID or PERSISTENT_RESOURCE_ID_PLACEHOLDER
    service_account = config.SERVICE_ACCOUNT or ""
    train_model_component = custom_job.create_custom_training_job_from_component(
        train_model,
        display_name=config.JOB_NAME,
        replica_count=config.REPLICA_COUNT,
        machine_type=config.TRAIN_COMPUTE,
        base_output_directory=f"gs://{config.BUCKET_NAME}",
        service_account=service_account,
        persistent_resource_id=persistence_resource_id,
    )
    # Pass the pipeline parameters (not the config defaults) so that overriding
    # `location` or `bucket_name` at run time affects every step consistently.
    train_model_op = train_model_component(
        project=project_id,
        location=location,
        bucket=bucket_name,
        dataset=dataset_create_op.outputs["dataset"],
        dtype=config.DATA_SCHEMA,
        drop_cols=config.DROP_COLUMNS,
        target_col=config.TARGET_COLUMN,
        feat_cols=config.FEAT_COLUMNS,
        model_reg=config.MODEL_REGISTRY,
        model_serving_image_uri=config.MODEL_SERVING_IMAGE_URI,
    ).after(dataset_create_op)

    # evaluate component
    evaluate_model_op = evaluate_model(
        threshold=model_threshold,
        model_in=train_model_op.outputs["trained_model"],
        test_ds=train_model_op.outputs["test_ds"],
        dtype=config.DATA_SCHEMA,
        target_col=config.TARGET_COLUMN,
        feat_cols=config.FEAT_COLUMNS,
        metrics_uri=metrics_uri,
    ).after(train_model_op)

    # Deploy only when the average precision score beats the threshold
    with dsl.If(
        evaluate_model_op.outputs["avg_prec"] > thold, name=config.AVG_PR_CONDITION
    ):
        # create endpoint
        create_endpoint_op = endpoint.EndpointCreateOp(
            display_name=f"{config.ENDPOINT_NAME}_torch_pipeline_{config.ID}",
            project=project_id,
            location=location,
        ).after(evaluate_model_op)

        # deploy the model
        custom_model_deploy_op = endpoint.ModelDeployOp(
            model=train_model_op.outputs["trained_model"],
            endpoint=create_endpoint_op.outputs["endpoint"],
            deployed_model_display_name=f"{config.MODEL_NAME}_torch_pipeline_{config.ID}",
            dedicated_resources_machine_type=deploy_machine_type,
            dedicated_resources_min_replica_count=config.REPLICA_COUNT,
        ).after(create_endpoint_op)

#### Compile and run the pipeline
After authoring the pipeline you can use the compiler to compile the pipeline. 

In [34]:
# compile the pipeline
pipeline_compiler = compiler.Compiler()
pipeline_compiler.compile(pipeline_func=pipeline, package_path=PIPELINE_PACKAGE_PATH)

Next you can use the Vertex AI SDK to create a job on Vertex AI Pipelines. 

In [35]:
# instantiate pipeline representation
pipeline_job = vertex_ai.PipelineJob(
    display_name=config.PIPELINE_NAME,
    template_path=PIPELINE_PACKAGE_PATH,
    pipeline_root=config.PIPELINE_ROOT,
    enable_caching=False,
    project=config.PROJECT_ID,
    location=config.REGION,
)

In [36]:
# Submit the job under the configured service account.
# sync=False requests asynchronous execution, so this cell does not wait for the run to finish.
pipeline_job.run(
    sync=False,
    service_account=config.SERVICE_ACCOUNT,
)

Now you can test the endpoint when the model has completed deployment.

In [None]:
import numpy as np

# Fill in the ID of the endpoint created by the pipeline run before executing this cell.
ENDPOINT_ID = "<endpoint-id>"

# Named `deployed_endpoint` on purpose: calling it `endpoint` would shadow the
# `endpoint` module imported from google_cloud_pipeline_components, and
# re-running the pipeline cell afterwards would fail.
deployed_endpoint = vertex_ai.Endpoint(
    f"projects/{PROJECT_NUM}/locations/{config.REGION}/endpoints/{ENDPOINT_ID}"
)

# Two random instances, each mapping every feature column to a value in [0, 1)
payload = {
    "instances": [
        dict(zip(config.FEAT_COLUMNS, vals))
        for vals in np.random.random((2, len(config.FEAT_COLUMNS))).tolist()
    ]
}
resp = deployed_endpoint.explain(payload["instances"])

In [39]:
# Show the response of the explain call made in the previous cell
print(resp)

Prediction(predictions=[{'class': 'is_fraud', 'probability': 0.5399022102355957}, {'class': 'is_fraud', 'probability': 0.5536338686943054}], deployed_model_id='774763171930963968', metadata=None, model_version_id=None, model_resource_name=None, explanations=[attributions {
  baseline_output_value: 0.54401206970214844
  instance_output_value: 0.5399022102355957
  feature_attributions {
    struct_value {
      fields {
        key: "tx_amount"
        value {
          number_value: 0.00057162642478942867
        }
      }
      fields {
        key: "terminal_id_risk_7day_window"
        value {
          number_value: 0.003065508604049682
        }
      }
      fields {
        key: "terminal_id_risk_1day_window"
        value {
          number_value: -0.0004806816577911377
        }
      }
      fields {
        key: "terminal_id_risk_14day_window"
        value {
          number_value: -0.00096499323844909666
        }
      }
      fields {
        key: "terminal_id_nb_tx_7day_

You have successfully launched an e2e ML pipeline using Vertex AI Pipeline.

Congratulations! You have completed the Vertex AI 101! Before you go, you may want to clean up the resources you created using the notebook `07_resource_cleanup.ipynb`.