In [4]:
# GCP project that hosts the Model, Endpoint, monitoring job and BigQuery logs.
PROJECT_ID = "dev-amer-analyt-vertex-svc-aa"
# Vertex AI region used for every resource created below.
LOCATION = "us-east1"
# Recipient of model-monitoring alert e-mails.
USER_EMAIL = "abhishek.patil3@fractal.ai"
# GCS folder that receives the analysis-instance schema (schema.yaml).
# Derived from PROJECT_ID so the two cannot drift apart.
# NOTE(review): this staging bucket is named for us-central1 while LOCATION is
# us-east1 — confirm the cross-region bucket is intentional.
BUCKET_URI = f"gs://{PROJECT_ID}-vertex-staging-us-central1/data_drift"


In [5]:
import os
import random
import time

import google.cloud.aiplatform as aiplatform
from google.cloud import bigquery
from google.cloud.aiplatform import model_monitoring

# BigQuery client used later to poll the endpoint's prediction-logging table.
bqclient = bigquery.Client(project=PROJECT_ID)

# Make PROJECT_ID / LOCATION the defaults for subsequent Vertex AI SDK calls.
aiplatform.init(project=PROJECT_ID, location=LOCATION)

In [6]:

DEPLOY_VERSION = "xgboost-cpu.1-1"

# Prebuilt serving container. The registry host prefix is the part of
# LOCATION before the first dash (e.g. "us" for "us-east1").
DEPLOY_IMAGE = (
    f"{LOCATION.partition('-')[0]}-docker.pkg.dev/vertex-ai/prediction/"
    f"{DEPLOY_VERSION}:latest"
)

print("Deployment:", DEPLOY_IMAGE)

Deployment: us-docker.pkg.dev/vertex-ai/prediction/xgboost-cpu.1-1:latest


In [7]:
# Machine shape shared by training and deployment: an n1-standard family VM
# with VCPU vCPUs. Defined once so changing it cannot leave the two out of sync
# (the previous copy-pasted cell reassigned both values before DEPLOY_COMPUTE).
MACHINE_TYPE = "n1-standard"
VCPU = "4"

# NOTE(review): TRAIN_COMPUTE is not used by any visible cell (no training here).
TRAIN_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"
print("Train machine type", TRAIN_COMPUTE)

DEPLOY_COMPUTE = f"{MACHINE_TYPE}-{VCPU}"
print("Deploy machine type", DEPLOY_COMPUTE)

Train machine type n1-standard-4
Deploy machine type n1-standard-4


In [8]:

# Model artifact from the public cloud-samples-data bucket (an XGBoost iris
# model, judging by the path).
MODEL_ARTIFACT_URI = "gs://cloud-samples-data/vertex-ai/model-deployment/models/xgboost_iris"

# Register the artifact as a Vertex AI Model served by DEPLOY_IMAGE; sync=True
# waits for the upload to finish before returning.
# NOTE(review): "fracta_iris_dataddift_test" looks like a typo for
# "fractal_iris_datadrift_test"; kept so existing resources still match.
model = aiplatform.Model.upload(
    display_name="fracta_iris_dataddift_test",
    serving_container_image_uri=DEPLOY_IMAGE,
    artifact_uri=MODEL_ARTIFACT_URI,
    sync=True,
)
print(model)

Creating Model
Create Model backing LRO: projects/650487766623/locations/us-east1/models/8563493536375242752/operations/8193045666428092416
Model created. Resource name: projects/650487766623/locations/us-east1/models/8563493536375242752@1
To use this Model in another session:
model = aiplatform.Model('projects/650487766623/locations/us-east1/models/8563493536375242752@1')
<google.cloud.aiplatform.models.Model object at 0x7fce8c41c880> 
resource name: projects/650487766623/locations/us-east1/models/8563493536375242752


In [9]:

# Pin the deployment to exactly one replica (no autoscaling headroom).
MIN_NODES = MAX_NODES = 1

# No endpoint argument is passed, so deploy() creates a new Endpoint and
# returns it (the recorded output shows "Creating Endpoint").
endpoint = model.deploy(
    deployed_model_display_name="fracta_iris_dataddift_test",
    machine_type=DEPLOY_COMPUTE,
    max_replica_count=MAX_NODES,
    min_replica_count=MIN_NODES,
)

Creating Endpoint
Create Endpoint backing LRO: projects/650487766623/locations/us-east1/endpoints/6847243846347128832/operations/3172657981816832000
Endpoint created. Resource name: projects/650487766623/locations/us-east1/endpoints/6847243846347128832
To use this Endpoint in another session:
endpoint = aiplatform.Endpoint('projects/650487766623/locations/us-east1/endpoints/6847243846347128832')
Deploying model to Endpoint : projects/650487766623/locations/us-east1/endpoints/6847243846347128832
Deploy Endpoint model backing LRO: projects/650487766623/locations/us-east1/endpoints/6847243846347128832/operations/6257623726565621760
Endpoint model deployed. Resource name: projects/650487766623/locations/us-east1/endpoints/6847243846347128832


In [10]:
MONITOR_INTERVAL = 1  # hours between monitoring analysis runs
# Fraction of prediction requests sampled for logging (optional, default=.8).
SAMPLE_RATE = 0.8  # @param {type:"number"}

schedule_config = model_monitoring.ScheduleConfig(monitor_interval=MONITOR_INTERVAL)

# E-mail alerts go to USER_EMAIL; enable_logging also streams anomalies to
# Cloud Logging.
alerting_config = model_monitoring.EmailAlertConfig(
    user_emails=[USER_EMAIL],
    enable_logging=True,
)

logging_sampling_strategy = model_monitoring.RandomSampleConfig(
    sample_rate=SAMPLE_RATE
)

In [11]:
DRIFT_THRESHOLD_VALUE = 0.05
# NOTE(review): not referenced by any visible cell; attribution drift would need
# an explanation config, and ObjectiveConfig below is given explanation_config=None.
ATTRIBUTION_DRIFT_THRESHOLD_VALUE = 0.05

# Per-feature prediction-drift alert thresholds; only these two features are monitored.
DRIFT_THRESHOLDS = {
    feature: DRIFT_THRESHOLD_VALUE for feature in ("sepal_length", "petal_length")
}

drift_config = model_monitoring.DriftDetectionConfig(drift_thresholds=DRIFT_THRESHOLDS)

In [12]:
# Training data in BigQuery that serving traffic is compared against for skew.
DATASET_BQ_URI = "bq://dev-amer-analyt-vertex-svc-aa.dy_vertex_ai_test.fractal_iris_table_reference"  # @param {type:"string"}
# Prediction target (label) column in the training table.
TARGET = "species"

SKEW_THRESHOLD_VALUE = 0.05
# Per-feature training/serving skew alert thresholds.
SKEW_THRESHOLDS = {
    feature: SKEW_THRESHOLD_VALUE for feature in ("sepal_length", "petal_length")
}

skew_config = model_monitoring.SkewDetectionConfig(
    data_source=DATASET_BQ_URI,
    target_field=TARGET,
    skew_thresholds=SKEW_THRESHOLDS,
)

In [13]:
# Built but intentionally NOT passed below: ObjectiveConfig gets
# explanation_config=None, so feature-attribution monitoring stays off.
explanation_config = model_monitoring.ExplanationConfig()

# Combine skew (vs. training data) and drift (vs. earlier serving traffic) checks.
objective_config = model_monitoring.ObjectiveConfig(
    drift_detection_config=drift_config,
    skew_detection_config=skew_config,
    explanation_config=None,
)
     

In [14]:
yaml = """type: array
properties:
  sepal_length:
    type: number
  sepal_width:
    type: number
  petal_length:
    type: number
  petal_width:
    type: number
required:
  - sepal_length
  - sepal_width
  - petal_length
  - petal_width
"""

print(yaml)

with open("schema.yaml", "w") as f:
    f.write(yaml)

! gsutil cp schema.yaml {BUCKET_URI}/schema.yaml

type: array
properties:
  sepal_length:
    type: number
  sepal_width:
    type: number
  petal_length:
    type: number
  petal_width:
    type: number
required:
  - sepal_length
  - sepal_width
  - petal_length
  - petal_width

Copying file://schema.yaml [Content-Type=application/octet-stream]...
/ [1 files][  230.0 B/  230.0 B]                                                
Operation completed over 1 objects/230.0 B.                                      


In [15]:
# Attach monitoring to the endpoint: sampled requests are logged, analysed on
# schedule_config's interval against objective_config, and alerts go out via
# alerting_config. The schema uploaded above describes each instance.
monitoring_job = aiplatform.ModelDeploymentMonitoringJob.create(
    display_name="fractal_test_datadrift_iris",
    project=PROJECT_ID,
    location=LOCATION,
    endpoint=endpoint,
    objective_configs=objective_config,
    logging_sampling_strategy=logging_sampling_strategy,
    schedule_config=schedule_config,
    alert_config=alerting_config,
    analysis_instance_schema_uri=f"{BUCKET_URI}/schema.yaml",
)

print(monitoring_job)


Creating ModelDeploymentMonitoringJob
ModelDeploymentMonitoringJob created. Resource name: projects/650487766623/locations/us-east1/modelDeploymentMonitoringJobs/8135730324295385088
To use this ModelDeploymentMonitoringJob in another session:
mdm_job = aiplatform.ModelDeploymentMonitoringJob('projects/650487766623/locations/us-east1/modelDeploymentMonitoringJobs/8135730324295385088')
View Model Deployment Monitoring Job:
https://console.cloud.google.com/ai/platform/locations/us-east1/model-deployment-monitoring/8135730324295385088?project=650487766623
<google.cloud.aiplatform.jobs.ModelDeploymentMonitoringJob object at 0x7fce8d7ed810> 
resource name: projects/650487766623/locations/us-east1/modelDeploymentMonitoringJobs/8135730324295385088


In [4]:
# List only monitoring jobs sharing this notebook's job display name. list() is
# a class-level query over the whole project/location, so an unfiltered listing
# can put an unrelated monitoring job first.
jobs = aiplatform.ModelDeploymentMonitoringJob.list(
    filter=f'display_name="{monitoring_job.display_name}"'
)
jobs

In [5]:
# Select the job created above (matched by resource name) rather than whatever
# the listing returns first; an empty or unrelated listing raises a clear error
# instead of an IndexError or a silently wrong job.
job = next(
    (
        candidate
        for candidate in jobs
        if candidate.resource_name == monitoring_job.resource_name
    ),
    None,
)
if job is None:
    raise LookupError(
        f"{monitoring_job.resource_name} is not among the listed monitoring jobs."
    )
print(job.state)

In [10]:
# Baseline traffic: random 4-feature instances ordered
# [sepal_length, sepal_width, petal_length, petal_width]. A dedicated seeded
# generator makes the batch reproducible across kernel restarts without
# touching the global `random` state.
BASELINE_SEED = 0
N_BASELINE_INSTANCES = 1000

baseline_rng = random.Random(BASELINE_SEED)
instances = [
    [
        baseline_rng.uniform(0.5, 3.5),  # sepal_length
        baseline_rng.uniform(0.2, 2.0),  # sepal_width
        baseline_rng.uniform(0.5, 2.0),  # petal_length
        baseline_rng.uniform(0.2, 1.5),  # petal_width
    ]
    for _ in range(N_BASELINE_INSTANCES)
]

In [12]:
# Send the instances one per online-prediction request, reporting progress
# every 100 requests.
responses = []
for row_idx, row in enumerate(instances):
    responses.append(endpoint.predict(instances=[row]))
    if not row_idx % 100:
        print(f"Completed {row_idx} rows.")

# print the prediction for the first instance
print(responses[0])

Completed 0 rows.
Completed 100 rows.
Completed 200 rows.
Completed 300 rows.
Completed 400 rows.
Completed 500 rows.
Completed 600 rows.
Completed 700 rows.
Completed 800 rows.
Completed 900 rows.
Prediction(predictions=[0.0], deployed_model_id='331819415123263488', metadata=None, model_version_id='1', model_resource_name='projects/650487766623/locations/us-east1/models/7023262463814533120', explanations=None)


In [3]:
# Block until the endpoint's prediction-logging table holds at least one row.
# The table name is loop-invariant, so it is built once; the loop gives up
# with a clear error after MAX_WAIT_SECONDS instead of polling forever.
ENDPOINT_ID = endpoint.resource_name.split("/")[-1]
SERVING_TABLE = f"{PROJECT_ID}.model_deployment_monitoring_{ENDPOINT_ID}.serving_predict"
POLL_SECONDS = 180
MAX_WAIT_SECONDS = 60 * 60

# Count with a query rather than list_rows().total_rows, which reflects table
# metadata that may not yet include recently streamed rows.
count_sql = f"SELECT COUNT(*) FROM `{SERVING_TABLE}`"

deadline = time.monotonic() + MAX_WAIT_SECONDS
while True:
    logged_rows = list(bqclient.query(count_sql).result())[0][0]
    print(logged_rows)
    if logged_rows > 0:
        break
    if time.monotonic() >= deadline:
        raise TimeoutError(
            f"No prediction rows in {SERVING_TABLE} after {MAX_WAIT_SECONDS} s."
        )
    time.sleep(POLL_SECONDS)

In [15]:
# Drifted traffic: same generator shape as the baseline batch, but
# sepal_length and petal_length are scaled far outside their baseline ranges.
# These are the two features listed in DRIFT_THRESHOLDS / SKEW_THRESHOLDS.
# A dedicated seeded generator keeps the batch reproducible.
DRIFT_SEED = 1
N_DRIFT_INSTANCES = 1000
SEPAL_LENGTH_DRIFT_SCALE = 80000.0
PETAL_LENGTH_DRIFT_SCALE = 2000

drift_rng = random.Random(DRIFT_SEED)
instances = [
    [
        drift_rng.uniform(0.5, 3.5) * SEPAL_LENGTH_DRIFT_SCALE,  # sepal_length
        drift_rng.uniform(0.2, 2.0),  # sepal_width
        drift_rng.uniform(0.5, 2.0) * PETAL_LENGTH_DRIFT_SCALE,  # petal_length
        drift_rng.uniform(0.2, 1.5),  # petal_width
    ]
    for _ in range(N_DRIFT_INSTANCES)
]

In [16]:
# Send the drifted instances one per online-prediction request, reporting
# progress every 100 requests.
responses = []
for row_idx, row in enumerate(instances):
    responses.append(endpoint.predict(instances=[row]))
    if not row_idx % 100:
        print(f"Completed {row_idx} rows.")

# print the prediction for the first instance
print(responses[0])

Completed 0 rows.
Completed 100 rows.
Completed 200 rows.
Completed 300 rows.
Completed 400 rows.
Completed 500 rows.
Completed 600 rows.
Completed 700 rows.
Completed 800 rows.
Completed 900 rows.
Prediction(predictions=[2.0], deployed_model_id='7293258539131207680', metadata=None, model_version_id='1', model_resource_name='projects/650487766623/locations/us-east1/models/400719211766218752', explanations=None)


In [None]:

# Block until the logging table holds at least MIN_LOGGED_ROWS rows, i.e. the
# drifted batch has landed. The table name is built once outside the loop.
ENDPOINT_ID = endpoint.resource_name.split("/")[-1]
SERVING_TABLE = f"{PROJECT_ID}.model_deployment_monitoring_{ENDPOINT_ID}.serving_predict"
# Equivalent to the previous `> 2399` check.
# NOTE(review): an absolute count depends on traffic from earlier sessions and
# on SAMPLE_RATE (only a sample of requests is logged), so it may never be
# reached — a previous run sat at 2394 until interrupted. The deadline below
# turns that case into an explicit error instead of an endless loop.
MIN_LOGGED_ROWS = 2400
POLL_SECONDS = 180
MAX_WAIT_SECONDS = 60 * 60

# Count with a query rather than list_rows().total_rows, which reflects table
# metadata that may not yet include recently streamed rows.
count_sql = f"SELECT COUNT(*) FROM `{SERVING_TABLE}`"

deadline = time.monotonic() + MAX_WAIT_SECONDS
while True:
    logged_rows = list(bqclient.query(count_sql).result())[0][0]
    print(logged_rows)
    if logged_rows >= MIN_LOGGED_ROWS:
        break
    if time.monotonic() >= deadline:
        raise TimeoutError(
            f"Only {logged_rows} rows in {SERVING_TABLE} after {MAX_WAIT_SECONDS} s; "
            f"expected at least {MIN_LOGGED_ROWS}."
        )
    time.sleep(POLL_SECONDS)

2394
2394
2394
2394
2394
2394
2394


KeyboardInterrupt: 

In [6]:
4

4