In [1]:
# USER_FLAG = "--user"
# !pip3 install {USER_FLAG} google-cloud-aiplatform
# !pip3 install {USER_FLAG} kfp==1.8.9

In [None]:
import matplotlib.pyplot as plt
import pandas as pd

from kfp.v2 import compiler, dsl
from kfp.v2.dsl import pipeline, component, Artifact, Dataset, Input, Metrics, Model, Output, InputPath, OutputPath

from google.cloud import aiplatform

from google.cloud import aiplatform_v1

In [None]:
PROJECT_ID = "vertex-tests-377722"
BUCKET_NAME="gs://" + PROJECT_ID + "-bucket"

PATH=%env PATH
%env PATH={PATH}:/home/jupyter/.local/bin
REGION="us-central1"

PIPELINE_ROOT = f"{BUCKET_NAME}/pipeline_root/"

env: PATH=/usr/local/cuda/bin:/opt/conda/bin:/opt/conda/condabin:/usr/local/bin:/usr/bin:/bin:/usr/local/games:/usr/games:/home/jupyter/.local/bin


In [None]:
@component(
    packages_to_install=["google-cloud-bigquery", "pandas", "pyarrow", "db-dtypes"],
    base_image="python:3.7",
    output_component_file="pipelines/create_dataset.yaml"
)
def get_dataframe(
    bq_table: str,
    bean_dataset_path: OutputPath("Dataset")
):
    from google.cloud import bigquery
    import pandas as pd
    import os

    project_number = os.environ["CLOUD_ML_PROJECT_ID"]
    bqclient = bigquery.Client(project=project_number)
    
    table = bigquery.TableReference.from_string(bq_table)
    rows = bqclient.list_rows(table)
    dataframe = rows.to_dataframe(create_bqstorage_client=True)

    dataframe = dataframe.sample(frac=1, random_state=2)
    print(f"This is the output data path: {bean_dataset_path}")
    dataframe.to_csv(bean_dataset_path)

In [None]:
@component(
    packages_to_install=["db-dtypes"],
    base_image="us-docker.pkg.dev/vertex-ai/training/scikit-learn-cpu.0-23:latest",
    output_component_file="pipelines/beans_model_component.yaml",
)
def sklearn_train(
    dataset: Input[Dataset],
    bean_metrics: Output[Metrics],
    model: Output[Model]
):
    from sklearn.tree import DecisionTreeClassifier
    from sklearn.metrics import roc_curve
    from sklearn.model_selection import train_test_split
    from joblib import dump

    import pandas as pd
    
    df = pd.read_csv(dataset.path)
    labels = df.pop("Class").tolist()
    data = df.values.tolist()
    x_train, x_test, y_train, y_test = train_test_split(data, labels)

    skmodel = DecisionTreeClassifier()
    skmodel.fit(x_train, y_train)
    score = skmodel.score(x_test, y_test)

    bean_metrics.log_metric("accuracy",(score * 100.0))
    bean_metrics.log_metric("framework", "Scikit Learn")
    bean_metrics.log_metric("dataset_size", len(df))
    dump(skmodel, model.path + ".joblib")

In [None]:
# https://www.kubeflow.org/docs/components/pipelines/v2/author-a-pipeline/component-io/
@component(
    packages_to_install=["google-cloud-aiplatform"],
    base_image="python:3.7",
    output_component_file="pipelines/beans_deploy_component.yaml",
)
def deploy_model(
    model: Input[Model],
    bean_metrics: Input[Metrics],
    project: str,
    region: str,
    prev_accuracy: float,
    endpoint_display_name: str,
    bean_vertex_endpoint: Output[Artifact],
    bean_vertex_model: Output[Model]
):
    from google.cloud import aiplatform
    import logging

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    aiplatform.init(project=project, location=region)
    score = bean_metrics.metadata['accuracy']
    logger.info(f"current score: {score}")
    logger.info(f"previous_bean_metrics: {prev_accuracy}")
    
    if prev_accuracy > score:
        logger.info("previous model better than current one. Do something!")
    else:
        logger.info("all good")

    
    logger.info(f"Uploading model in {model.uri} to registry")
    deployed_model = aiplatform.Model.upload(
        display_name="beans-vertex-model",
        artifact_uri = model.uri.replace("model", ""),
        serving_container_image_uri="us-docker.pkg.dev/vertex-ai/prediction/sklearn-cpu.0-23:latest"
    )
    deployed_model.wait()
    logger.info(f"Finished uploading model")

    target_endpoint = None
    for endpoint in aiplatform.Endpoint.list(order_by="update_time desc"):
        if endpoint.display_name == endpoint_display_name:
            logger.info(f"Endpoint already created")
            target_endpoint = endpoint
    
    if target_endpoint is None:
        logger.info(f"Creating endpoint")
        target_endpoint = aiplatform.Endpoint.create(
            project=project,
            display_name=endpoint_display_name,
        )
        logger.info(f"Finished creating endpoint")

    logger.info(f"Deploying model to endpoint")
    target_endpoint.deploy(
        model=deployed_model,
        min_replica_count=1,
        max_replica_count=1,
        machine_type='n1-standard-4',
        traffic_percentage=100,
    )
    logger.info(f"Finished deploying model to endpoint")

    # Save data to the output params
    bean_vertex_endpoint.uri = target_endpoint.resource_name
    bean_vertex_model.uri = deployed_model.resource_name

In [None]:
@component(
    packages_to_install=["google-cloud-aiplatform"],
    base_image="python:3.7",
    output_component_file="pipelines/beans_monitoring_component.yaml",
)
def create_monitoring(
    dataset: Input[Dataset],
    project: str,
    region: str,
    endpoint_display_name: str,
    user_emails: list,
):
    from google.cloud import aiplatform
    import logging
    

    logger = logging.getLogger()
    logger.setLevel(logging.INFO)
    
    DEFAULT_THRESHOLD_VALUE = 0.001

    SKEW_THRESHOLDS = {
        "Area": DEFAULT_THRESHOLD_VALUE,
    }
    DRIFT_THRESHOLDS = {
        "Area": DEFAULT_THRESHOLD_VALUE,
    }
    ATTRIB_SKEW_THRESHOLDS = {
        "Area": DEFAULT_THRESHOLD_VALUE,
    }
    ATTRIB_DRIFT_THRESHOLDS = {
        "Area": DEFAULT_THRESHOLD_VALUE,
    }

    aiplatform.init(project=project, location=region)

    monitor_display_name = 'bean_monitoring'

    # Create sampling configuration
    random_sampling = aiplatform.model_monitoring.RandomSampleConfig(sample_rate=0.8)

    # Create schedule configuration
    schedule_config = aiplatform.model_monitoring.ScheduleConfig(monitor_interval=1)

    # Create alerting configuration.
    alerting_config = aiplatform.model_monitoring.EmailAlertConfig(
        user_emails=user_emails,
        enable_logging=True
    )

    skew_detection_config = aiplatform.model_monitoring.SkewDetectionConfig(
        data_source=dataset.path,
        skew_thresholds=SKEW_THRESHOLDS,
        attribute_skew_thresholds=ATTRIB_SKEW_THRESHOLDS,
        target_field="Class",
    )
    drift_detection_config = aiplatform.model_monitoring.DriftDetectionConfig(
        drift_thresholds=DRIFT_THRESHOLDS,
        attribute_drift_thresholds=ATTRIB_DRIFT_THRESHOLDS,
    )

    objective_config = aiplatform.model_monitoring.ObjectiveConfig(
        drift_detection_config=aiplatform.model_monitoring.DriftDetectionConfig(
            drift_thresholds=DRIFT_THRESHOLDS,
            attribute_drift_thresholds=ATTRIB_DRIFT_THRESHOLDS,
        )
    )
    
    for endpoint in aiplatform.Endpoint.list(order_by="update_time desc"):
        if endpoint.display_name == endpoint_display_name:
            target_endpoint = endpoint


    # Finding current model_id (considering split is always 100% in this case)
    model_ids = []
    creation_datetimes = []

    for model in target_endpoint.list_models():
        model_ids.append(model.id)
        creation_datetimes.append(model.create_time)

    current_model_idx = creation_datetimes.index(max(creation_datetimes))
    model_id = model_ids[current_model_idx]
    
    logger.info(f"updating {model_id}")
    target_monitor = None
    for monitor in aiplatform.ModelDeploymentMonitoringJob.list():
        if monitor.display_name == monitor_display_name:
            target_monitor = monitor
    
    if target_monitor:
        logger.info("updating monitoring job")
        job = target_monitor.update(
            display_name=monitor_display_name,
            logging_sampling_strategy=random_sampling,
            schedule_config=schedule_config,
            alert_config=alerting_config,
            objective_configs=objective_config,
            deployed_model_ids=[model_id],
        )
    else:
        logger.info("creating monitoring job")
        job = aiplatform.ModelDeploymentMonitoringJob.create(
            display_name=monitor_display_name,
            logging_sampling_strategy=random_sampling,
            schedule_config=schedule_config,
            alert_config=alerting_config,
            objective_configs=objective_config,
            project=project,
            location=region,
            deployed_model_ids=[model_id],
            endpoint=target_endpoint
        )

In [None]:
@pipeline(
    pipeline_root=PIPELINE_ROOT,
    name="ml-pipeline",
)
def pipeline(
    prev_accuracy: float,
    user_emails: list,
    bq_table: str = "",
    project: str = PROJECT_ID,
    region: str = REGION,
    endpoint_display_name: str = "bean-model-endpoint",
):
    dataset_task = get_dataframe(bq_table)

    model_task = sklearn_train(dataset_task.output)

    deploy_task = deploy_model(
        model=model_task.outputs["model"],
        bean_metrics=model_task.outputs["bean_metrics"],
        prev_accuracy=prev_accuracy,
        project=project,
        region=region,
        endpoint_display_name=endpoint_display_name
    )
    
    monitoring_task = create_monitoring(
        dataset=dataset_task.output,
        project=project,
        region=region,
        endpoint_display_name=endpoint_display_name
    ).after(deploy_task)

In [None]:
compiler.Compiler().compile(
    pipeline_func=pipeline,
    package_path="pipelines/ml_pipeline.json"
)



TypeError: Create monitoring() missing 1 required positional argument: 'user_emails'

# Running the pipeline

In [29]:
from datetime import datetime

TIMESTAMP = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")

In [30]:
prev_accuracy = aiplatform.get_pipeline_df(pipeline="ml-pipeline").iloc[0]['metric.accuracy']
run1 = aiplatform.PipelineJob(
    display_name="bean-pipeline",
    template_path="pipelines/ml_pipeline.json",
    job_id=f"bean-pipeline-{TIMESTAMP}",
    parameter_values={"bq_table": "sara-vertex-demos.beans_demo.small_dataset",
                      "prev_accuracy": prev_accuracy,
                      "user_emails": [os.environ("user_email")]},
    enable_caching=False,
)
# parameter_values={"bq_table": "sara-vertex-demos.beans_demo.large_dataset"},

In [31]:
run1.submit()

Creating PipelineJob
PipelineJob created. Resource name: projects/785901084017/locations/us-central1/pipelineJobs/bean-pipeline-2023-02-16-16-52-35
To use this PipelineJob in another session:
pipeline_job = aiplatform.PipelineJob.get('projects/785901084017/locations/us-central1/pipelineJobs/bean-pipeline-2023-02-16-16-52-35')
View Pipeline Job:
https://console.cloud.google.com/vertex-ai/locations/us-central1/pipelines/runs/bean-pipeline-2023-02-16-16-52-35?project=785901084017


# Sending data for prediction

In [47]:
df = pd.read_csv("gs://vertex-tests-377722-bucket/pipeline_root/785901084017/bean-pipeline-2023-02-15-18-15-24/get-dataframe_3540178127177121792/bean_dataset_path")

def get_features_and_labels(df):
    target = "Class"
    return df.drop(target, axis=1).values, df[target].values

test_dataset, test_labels = get_features_and_labels(df)

for endpoint in aiplatform.Endpoint.list(order_by="update_time desc"):
    if endpoint.display_name == "bean-model-endpoint":
        target_endpoint = endpoint

for i in range(50):
    prediction = target_endpoint.predict(test_dataset.tolist()[0:50])


# Getting pipeline metrics

In [49]:
df = aiplatform.get_pipeline_df(pipeline="ml-pipeline")
df

Unnamed: 0,pipeline_name,run_name,param.input:region,param.input:bq_table,param.input:prev_accuracy,param.input:endpoint_display_name,param.input:project,metric.dataset_size,metric.accuracy,metric.framework
0,ml-pipeline,bean-pipeline-2023-02-16-16-52-35,us-central1,sara-vertex-demos.beans_demo.small_dataset,98.857143,bean-model-endpoint,vertex-tests-377722,700.0,100.0,Scikit Learn
1,ml-pipeline,bean-pipeline-2023-02-15-22-38-10,us-central1,sara-vertex-demos.beans_demo.small_dataset,98.857143,bean-model-endpoint,vertex-tests-377722,700.0,98.857143,Scikit Learn
2,ml-pipeline,bean-pipeline-2023-02-15-22-26-24,us-central1,sara-vertex-demos.beans_demo.small_dataset,98.857143,bean-model-endpoint,vertex-tests-377722,700.0,98.857143,Scikit Learn
3,ml-pipeline,bean-pipeline-2023-02-15-22-24-13,us-central1,sara-vertex-demos.beans_demo.small_dataset,98.857143,bean-model-endpoint,vertex-tests-377722,700.0,98.857143,Scikit Learn
4,ml-pipeline,bean-pipeline-2023-02-15-22-20-06,us-central1,sara-vertex-demos.beans_demo.small_dataset,98.857143,bean-model-endpoint,vertex-tests-377722,700.0,98.857143,Scikit Learn


# Querying artifacts

In [50]:
API_ENDPOINT = "{}-aiplatform.googleapis.com".format(REGION)
metadata_client = aiplatform_v1.MetadataServiceClient(
  client_options={
      "api_endpoint": API_ENDPOINT
  }
)

In [51]:

MODEL_FILTER="schema_title = \"system.Model\""
artifact_request = aiplatform_v1.ListArtifactsRequest(
    parent="projects/{0}/locations/{1}/metadataStores/default".format(PROJECT_ID, REGION),
    filter=MODEL_FILTER
)
model_artifacts = metadata_client.list_artifacts(artifact_request)

In [52]:
LIVE_FILTER = "create_time > \"2023-02-10T00:00:00-00:00\" AND state = LIVE"
artifact_req = {
    "parent": "projects/{0}/locations/{1}/metadataStores/default".format(PROJECT_ID, REGION),
    "filter": LIVE_FILTER
}
live_artifacts = metadata_client.list_artifacts(artifact_req)

In [None]:
data = {'uri': [], 'createTime': [], 'type': []}

for i in live_artifacts:
    data['uri'].append(i.uri)
    data['createTime'].append(i.create_time)
    data['type'].append(i.schema_title)

df = pd.DataFrame.from_dict(data)
df

Unnamed: 0,uri,createTime,type
0,gs://vertex-tests-377722-bucket/pipeline_root/...,2023-02-16 16:55:15.940000+00:00,system.Metrics
1,projects/785901084017/locations/us-central1/mo...,2023-02-16 16:58:00.051000+00:00,system.Model
2,projects/785901084017/locations/us-central1/en...,2023-02-16 16:58:00.084000+00:00,system.Artifact
3,gs://vertex-tests-377722-bucket/pipeline_root/...,2023-02-16 16:55:15.910000+00:00,system.Model
4,gs://vertex-tests-377722-bucket/pipeline_root/...,2023-02-16 16:52:52.420000+00:00,system.Dataset
...,...,...,...
92,gs://vertex-tests-377722-bucket/pipeline_root/...,2023-02-14 18:33:54.089000+00:00,system.Model
93,gs://vertex-tests-377722-bucket/pipeline_root/...,2023-02-14 18:32:41.543000+00:00,system.Dataset
94,gs://vertex-tests-377722-bucket/pipeline_root/...,2023-02-14 17:45:50.190000+00:00,system.Metrics
95,gs://vertex-tests-377722-bucket/pipeline_root/...,2023-02-14 17:45:50.138000+00:00,system.Model


In [None]:
path = "/gcs/vertex-tests-377722-bucket/pipeline_root/785901084017/bean-pipeline-2023-02-15-18-15-24/get-dataframe_3540178127177121792/bean_dataset_path/"
from google.cloud import aiplatform
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

DEFAULT_THRESHOLD_VALUE = 0.001

SKEW_THRESHOLDS = {
    "Area": DEFAULT_THRESHOLD_VALUE,
}
DRIFT_THRESHOLDS = {
    "Area": DEFAULT_THRESHOLD_VALUE,
}
ATTRIB_SKEW_THRESHOLDS = {
    "Area": DEFAULT_THRESHOLD_VALUE,
}
ATTRIB_DRIFT_THRESHOLDS = {
    "Area": DEFAULT_THRESHOLD_VALUE,
}

endpoint_display_name = "bean-model-endpoint"
aiplatform.init(project=PROJECT_ID, location=REGION)

# Create sampling configuration
random_sampling = aiplatform.model_monitoring.RandomSampleConfig(sample_rate=0.8)

# Create schedule configuration
schedule_config = aiplatform.model_monitoring.ScheduleConfig(monitor_interval=1)

# Create alerting configuration.
alerting_config = aiplatform.model_monitoring.EmailAlertConfig(
    user_emails=["diego2.rdrigues@gmail.com"],
    enable_logging=True
)


        
skew_detection_config = aiplatform.model_monitoring.SkewDetectionConfig(
    data_source=path,
    skew_thresholds=SKEW_THRESHOLDS,
    attribute_skew_thresholds=ATTRIB_SKEW_THRESHOLDS,
    target_field="Class",
)
drift_detection_config = aiplatform.model_monitoring.DriftDetectionConfig(
    drift_thresholds=DRIFT_THRESHOLDS,
    attribute_drift_thresholds=ATTRIB_DRIFT_THRESHOLDS,
)

logger.info("creating objective_config")
objective_config = aiplatform.model_monitoring.ObjectiveConfig(
    drift_detection_config=aiplatform.model_monitoring.DriftDetectionConfig(
        drift_thresholds=DRIFT_THRESHOLDS,
        attribute_drift_thresholds=ATTRIB_DRIFT_THRESHOLDS,
    )
)

# Finding model_id
model_ids = []
creation_datetimes = []
endpoint_display_name = "bean-model-endpoint"
monitor_display_name = 'bean_monitoring'

# TODO: receber como parâmetro
for endpoint in aiplatform.Endpoint.list(order_by="update_time desc"):
    if endpoint.display_name == endpoint_display_name:
        target_endpoint = endpoint

for model in target_endpoint.list_models():
    model_ids.append(model.id)
    creation_datetimes.append(model.create_time)

idx = creation_datetimes.index(max(creation_datetimes))
model_id = model_ids[idx]

target_monitor = None
for monitor in aiplatform.ModelDeploymentMonitoringJob.list():
    if monitor.display_name == monitor_display_name:
        target_monitor = monitor

print(target_monitor)

job = target_monitor.update(
    display_name=monitor_display_name,
    logging_sampling_strategy=random_sampling,
    schedule_config=schedule_config,
    alert_config=alerting_config,
    objective_configs=objective_config,
    deployed_model_ids=[model_id],
)

print("done!")

<google.cloud.aiplatform.jobs.ModelDeploymentMonitoringJob object at 0x7fe38117a750> 
resource name: projects/785901084017/locations/us-central1/modelDeploymentMonitoringJobs/7473684686397505536
done!
