In [1]:
%store -r
import time
import os,json
import requests, operator
from dkube.sdk import *
from dkube.sdk.api import DkubeApi
from dkube.sdk.rsrcs import DkubeModelmonitor
from dkube.sdk.rsrcs.operator import DkubeCluster
from dkube.sdk.rsrcs.modelmonitor import DatasetClass,ModelType,DriftAlgo, DataType
from dkube.sdk.rsrcs.modelmonitor import DatasetFormat,DkubeModelmonitorAlert, TimeZone

In [2]:
# Notebook configuration, unpacked from the `cld_config` mapping.
# `cld_config` is not defined in this notebook; presumably it is restored by
# the `%store -r` in the first cell — confirm it is stored before running.

# Settings for the serving side (where the deployment lives) plus names of the
# datasets and monitor.
# NOTE(review): INPUT_TRAIN_TYPE, MODEL_NAME and RUN_FREQUENCY are not
# referenced by any cell visible in this notebook.
MONITOR_NAME = cld_config['MONITOR_NAME']
INPUT_TRAIN_TYPE = cld_config['INPUT_TRAIN_TYPE']
SERVING_DKUBE_USERNAME = cld_config['SERVING_DKUBE_USERNAME']
SERVING_DKUBE_TOKEN = cld_config['SERVING_DKUBE_TOKEN']
SERVING_DKUBE_URL = cld_config['SERVING_DKUBE_URL']
DKUBE_BASE_DATASET = cld_config['DKUBE_BASE_DATASET']
MODEL_NAME = cld_config['MODEL_NAME']
RUN_FREQUENCY = cld_config['RUN_FREQUENCY']
LIVE_DATASET = cld_config['LIVE_DATASET']

# Settings for the monitoring side. A truthy MONITORING_DKUBE_URL later selects
# the "separate monitoring cluster" code path; a falsy one reuses the serving API.
MONITORING_DKUBE_USERNAME = cld_config["MONITORING_DKUBE_USERNAME"]
MONITORING_DKUBE_TOKEN = cld_config["MONITORING_DKUBE_TOKEN"]
MONITORING_DKUBE_URL = cld_config["MONITORING_DKUBE_URL"]
SERVING_DKUBE_CLUSTER_NAME = cld_config["SERVING_DKUBE_CLUSTER_NAME"]
SERVING_DEPLOYMENT_ID = cld_config["SERVING_DEPLOYMENT_ID"]

In [3]:
# API client bound to the serving DKube URL and token. It is reused as the
# monitoring client further down when no separate monitoring URL is configured.
serving_api = DkubeApi(URL=SERVING_DKUBE_URL,token=SERVING_DKUBE_TOKEN)

In [4]:
def get_dataset_version(username, dataset_name, version):
    """Resolve a dataset version name to the "<name>:<uuid>" reference string.

    Looks the dataset up through the notebook-level `monitoring_api` client,
    which must be assigned before this function is called.

    Args:
        username: Owner of the dataset.
        dataset_name: Name of the dataset whose versions are listed.
        version: Version name to look for (e.g. "v1").

    Returns:
        The string "<version>:<uuid>" for the first entry whose name matches.

    Raises:
        ValueError: If no entry matches. The message lists the version names
            that were found. Previously this message was *returned*, so a miss
            was silently passed on as if it were a version reference.
    """
    available = []
    for entry in monitoring_api.get_dataset_versions(username, dataset_name):
        details = entry["version"]
        if details["name"] == version:
            return f"{version}:{details['uuid']}"
        available.append(details["name"])
    raise ValueError(
        f"dataset version {version} not found, available version are {available}"
    )

## Checking for a separate monitoring cluster, and adding the serving cluster and deployment to it

In [5]:
# Choose the API client, user name and deployment id used by the rest of the
# notebook. A truthy MONITORING_DKUBE_URL means monitoring runs on a separate
# DKube from serving; otherwise the serving client and ids are reused as-is.
if MONITORING_DKUBE_URL:
    monitoring_api = DkubeApi(URL=MONITORING_DKUBE_URL,token=MONITORING_DKUBE_TOKEN)
    DKUBEUSERNAME = MONITORING_DKUBE_USERNAME
    # Describe the serving cluster (remote kind, monitoring class, JWT auth
    # with the serving URL and token) so it can be registered on the
    # monitoring side.
    pcluster = DkubeCluster(name=SERVING_DKUBE_CLUSTER_NAME)
    pcluster.update_kind("dkube-remote")
    pcluster.update_class("monitoring")
    pcluster.update_authtype("jwt")
    pcluster.update_url(url=SERVING_DKUBE_URL)
    pcluster.update_jwt_details(jwt_token=SERVING_DKUBE_TOKEN)
    # Best-effort registration: any failure is printed and execution continues.
    # Presumably this tolerates a cluster that is already registered — confirm.
    try:
        monitoring_api.configure_clusters(pcluster.cluster)
        print("Cluster added")
    except Exception as e:
        print(e)
    # Import the deployment from the serving cluster; the value returned by
    # import_deployment becomes DEPLOYMENT_ID.
    # NOTE(review): if import_deployment raises, the error is only printed and
    # this cell does not assign DEPLOYMENT_ID. Later cells that read it then
    # depend on whatever (if anything) is already in the kernel namespace.
    try:
        DEPLOYMENT_ID = monitoring_api.import_deployment(name=MONITOR_NAME,
                                                         cluster=SERVING_DKUBE_CLUSTER_NAME,
                                                         namespace=SERVING_DKUBE_USERNAME)
        print("Deployment Imported")
    except Exception as e:
        print(e)
        
else:
    # Single-cluster setup: monitoring talks to the serving DKube directly.
    monitoring_api = serving_api
    DKUBEUSERNAME = SERVING_DKUBE_USERNAME
    DEPLOYMENT_ID = SERVING_DEPLOYMENT_ID

In [6]:
# Read the whole transformer script into one string. The context manager
# closes the file even if the read fails (the earlier open/read/close sequence
# leaked the handle in that case).
with open("mm-transformer.py", "r") as text_file:
    script = text_file.read()

# Parse the thresholds from their JSON file.
with open('thresholds.json') as f:
    thresholds = json.load(f)

In [7]:
# Dataset references are written as "<user>:<dataset name>".
training_data = f'{DKUBEUSERNAME}:{DKUBE_BASE_DATASET}'
# Look up the training dataset's version reference; the version name "v1" is
# hard-coded here.
train_data_version = get_dataset_version(DKUBEUSERNAME,
                                            DKUBE_BASE_DATASET, "v1")
# The live dataset reference; later used as the name of the labelled data
# source and, in the separate-monitoring-cluster case, of the predict source.
labelled_data = f"{DKUBEUSERNAME}:{LIVE_DATASET}"
# Data format string passed for the predict data source.
predict_data_format = "cloudeventlogs"

### Model Monitor

In [8]:
# Model monitor definition for the selected deployment; the following cells
# configure it before it is submitted.
# NOTE(review): the keyword `deployemnt_id` looks misspelled, but the outputs
# recorded in this notebook show the cells ran with it. Confirm against the
# SDK signature before renaming.
mm=DkubeModelmonitor(deployemnt_id = DEPLOYMENT_ID)

In [9]:
# Basic monitor settings: a regression model over tabular input, with data
# timestamps in UTC.
mm.update_modelmonitor_basics(model_type=ModelType.Regression.value, 
                               input_data_type=DataType.Tabular.value,
                               data_timezone=TimeZone.UTC.value)

In [10]:
# Attach the thresholds loaded from thresholds.json.
mm.add_thresholds(thresholds=thresholds)

## Health Monitoring

In [11]:
# Enable deployment (health) monitoring with frequency=1.
# The unit of `frequency` is not shown in this notebook — TODO confirm in the SDK.
mm.update_deployment_monitoring_details(enabled=True, frequency=1)

## Add Drift monitoring details

In [12]:
# Enable drift monitoring with frequency=5 and algorithm 'auto'.
# The unit of `frequency` and what 'auto' selects are not shown here —
# TODO confirm in the SDK (DriftAlgo is imported above but not used).
mm.update_drift_monitoring_details(enabled=True,frequency=5,algorithm='auto')

### Add Train, Prediction, and Labelled Datasets

In [13]:
# Training source: the base dataset at the resolved version, together with the
# transformer script text read from mm-transformer.py.
train_source = dict(
    data_class=str(DatasetClass.Train),
    name=training_data,
    data_format="tabular",
    version=train_data_version,
    transformer_script=script,
)
mm.add_datasources(**train_source)

# Predict source: always carries the class and format. With a separate
# monitoring cluster it additionally names the live dataset and uses the
# serving deployment id as the S3 sub-path.
predict_source = dict(
    data_class=str(DatasetClass.Predict),
    data_format=predict_data_format,
)
if MONITORING_DKUBE_URL:
    predict_source.update(name=labelled_data, s3_subpath=SERVING_DEPLOYMENT_ID)
mm.add_datasources(**predict_source)

# Labelled source: the live dataset under "<deployment id>/livedata", with the
# prediction, ground-truth and timestamp column names.
labelled_source = dict(
    data_class=str(DatasetClass.Labelled),
    name=labelled_data,
    data_format="tabular",
    s3_subpath=SERVING_DEPLOYMENT_ID + "/livedata",
    predict_col="charges",
    groundtruth_col="GT_target",
    timestamp_col="timestamp",
)
mm.add_datasources(**labelled_source)

### Performance Monitoring

In [14]:
# Enable performance monitoring with the labelled data as its source and
# frequency=5 (unit not shown in this notebook — TODO confirm in the SDK).
mm.update_performance_monitoring_details(enabled=True,source_type="labelled_data",frequency=5)

### Create Model monitor

In [15]:
# Submit the configured monitor and wait for creation to finish. In the
# recorded run below the wait ended with the monitor in a pending state that
# asks for the schema to be confirmed, which the schema-update cell then does.
monitoring_api.modelmonitor_create(mm,wait_for_completion=True)

ModelMonitor larryc1200-res2 - waiting for completion, current state baselining
ModelMonitor larryc1200-res2 - waiting for completion, current state baselining
ModelMonitor larryc1200-res2 - waiting for completion, current state baselining
ModelMonitor larryc1200-res2 - is in pending state and reason please confirm input output and row id in schema


'dkube-insurance-pl-vz48l-100953785'

### Extracting id of the Model Monitor

In [16]:
# The monitor id is the same as the deployment id, so it is taken from
# DEPLOYMENT_ID directly. To look it up by monitor name instead, use:
# id = monitoring_api.modelmonitor_get_id(MONITOR_NAME)
# NOTE(review): the name `id` shadows Python's builtin `id()` for the rest of
# the notebook; the later cells all read it under this name.
id = DEPLOYMENT_ID
print(id)

dkube-insurance-pl-vz48l-100953785


### Schema update

In [17]:
# One row per schema column: (label, schema_class, schema_type, selected).
# The rows are applied in the listed order.
SCHEMA_UPDATES = [
    # Prediction output, row id and timestamp columns (not selected).
    ("charges", "continuous", "prediction_output", False),
    ("unique_id", "continuous", "row_id", False),
    ("timestamp", "continuous", "timestamp", False),
    # Continuous input features.
    ("age", "continuous", "input_feature", True),
    ("bmi", "continuous", "input_feature", True),
    # Categorical input features.
    ("sex", "categorical", "input_feature", True),
    ("children", "categorical", "input_feature", True),
    ("smoker", "categorical", "input_feature", True),
    ("region", "categorical", "input_feature", True),
]

for label, schema_class, schema_type, selected in SCHEMA_UPDATES:
    schema_response = monitoring_api.modelmonitor_update_schema(
        id,
        label=label,
        schema_class=schema_class,
        schema_type=schema_type,
        selected=selected,
    )

# Leave the response of the final update as the cell's displayed value.
schema_response

{'code': 200,
 'message': 'Modelmonitor update successful',
 'uuid': None,
 'name': None}

### Add alerts

#### Deployment Health Alert

In [18]:
# Deployment-health alert on the 'latency_avg' metric, using a greater-than
# comparison against 300 (latency unit not shown here — TODO confirm).
alert = DkubeModelmonitorAlert(name='latency_alert', alert_class = 'deployment_health')
alert.add_alert_condition(metric='latency_avg',threshold=300, op=operator.gt)
monitoring_api.modelmonitor_add_alert(id,alert)

{'response': {'code': 200,
  'message': 'Modelmonitor alert creation successful',
  'uuid': None,
  'name': None}}

#### Feature Alert

In [19]:
# Feature-drift alert on the 'age' feature, using a less-than comparison
# against 0.02.
# NOTE(review): a less-than condition suggests the drift value is p-value-like
# (smaller means more drift) — confirm against the drift algorithm in use.
alert = DkubeModelmonitorAlert(name='age_alert', alert_class = 'feature_drift')
alert.add_alert_condition(feature='age',threshold=0.02, op=operator.lt)
monitoring_api.modelmonitor_add_alert(id,alert)

{'response': {'code': 200,
  'message': 'Modelmonitor alert creation successful',
  'uuid': None,
  'name': None}}

#### Performance Alert

In [20]:
# Performance-decay alert on the 'mae' metric, using a greater-than comparison
# against 2000.
alert = DkubeModelmonitorAlert(name='mae_alert', alert_class = 'performance_decay')
alert.add_alert_condition(metric='mae',threshold=2000, op=operator.gt)
monitoring_api.modelmonitor_add_alert(id,alert)

{'response': {'code': 200,
  'message': 'Modelmonitor alert creation successful',
  'uuid': None,
  'name': None}}

### Start the model monitor

In [21]:
# Start the monitor. The recorded output below shows it reporting the
# "starting" state several times before the success response is returned.
monitoring_api.modelmonitor_start(id)

ModelMonitor larryc1200-res2 - is in starting state
ModelMonitor larryc1200-res2 - is in starting state
ModelMonitor larryc1200-res2 - is in starting state
ModelMonitor larryc1200-res2 - is in starting state


{'response': {'code': 200,
  'message': 'Modelmonitor update successful',
  'uuid': 'dkube-insurance-pl-vz48l-100953785',
  'name': 'larryc1200-res2'}}

### Retraining / Rebaselining Model Monitor

### Cleanup

In [22]:
# Opt-in teardown: set CLEANUP to True to stop and then delete the monitor.
CLEANUP = False
if CLEANUP:
    MAX_STOP_ATTEMPTS = 4   # number of status checks before giving up
    STOP_POLL_SECONDS = 5   # pause after each check

    for _attempt in range(MAX_STOP_ATTEMPTS):
        # Kept in its own name so the `mm` monitor definition built earlier in
        # the notebook is not overwritten by this status dictionary.
        monitor_details = monitoring_api.modelmonitor_get(id)
        status = monitor_details["status"]
        if status:
            if status["state"].lower() != "active":
                # No longer active, so it is safe to delete.
                break
            # Still active: request a stop and check again on the next attempt.
            monitoring_api.modelmonitor_stop(id)
        time.sleep(STOP_POLL_SECONDS)
    else:
        # Every attempt was used without seeing a non-active state.
        raise TimeoutError("modelmonitor failed to stop")
    monitoring_api.modelmonitor_delete(id)