In [None]:
%store -r
import os,json, time, operator
from dkube.sdk import *
from dkube.sdk.api import DkubeApi
from dkube.sdk.rsrcs import DkubeModelmonitor
from dkube.sdk.rsrcs.modelmonitor import DatasetClass,ModelType,DriftAlgo, TimeZone
from dkube.sdk.rsrcs.modelmonitor import DatasetFormat,DkubeModelmonitorAlert, DataType

In [None]:
# Unpack the notebook settings from `titanic_d3_config` into module-level constants.
# `titanic_d3_config` is not defined in this notebook; presumably it is restored by
# the `%store -r` magic in the first cell — confirm it was stored by an earlier notebook.
MONITOR_NAME = titanic_d3_config['MONITOR_NAME']
DATA_SOURCE = titanic_d3_config['DATA_SOURCE']  # compared against "local", "aws-s3" and "sql" below
INPUT_TRAIN_TYPE = titanic_d3_config['INPUT_TRAIN_TYPE']  # 'retraining' enables the retraining cell
DKUBEUSERNAME = titanic_d3_config['DKUBEUSERNAME']
TOKEN = titanic_d3_config['TOKEN']  # fallback when DKUBE_USER_ACCESS_TOKEN is not set in the environment
DKUBE_URL = titanic_d3_config['DKUBE_URL']  # fallback when DKUBE_URL is not set in the environment
DKUBE_BASE_DATASET = titanic_d3_config['DKUBE_BASE_DATASET']
MODEL_NAME = titanic_d3_config['MODEL_NAME']
RETRAINING_DATASET = titanic_d3_config['RETRAINING_DATASET']
RUN_FREQUENCY = titanic_d3_config['RUN_FREQUENCY']  # used as the performance-monitoring frequency
USE_REMOTE_DEPLOYMENT = titanic_d3_config["USE_REMOTE_DEPLOYMENT"]  # truthy => the deployment is imported

In [None]:
# Build the DKube API client. The DKUBE_URL / DKUBE_USER_ACCESS_TOKEN environment
# variables are used when set; otherwise the values from the stored config apply.
api = DkubeApi(URL=os.getenv('DKUBE_URL',DKUBE_URL),token=os.getenv("DKUBE_USER_ACCESS_TOKEN",TOKEN))

In [None]:
def wait_for_deployment_running(deployment_id, timeout=None):
    """Poll a deployment until it reports the "RUNNING" state.

    Args:
        deployment_id: ID passed to ``api.get_deployment``.
        timeout: Optional maximum number of seconds to keep polling.
            ``None`` (the default) polls indefinitely.

    Returns:
        Tuple ``(inference, inference_url)`` read from the deployment's
        parameters once its state is "RUNNING".

    Raises:
        TimeoutError: If ``timeout`` is given and the deployment has not
            reached "RUNNING" within that many seconds.
    """
    deadline = None if timeout is None else time.monotonic() + timeout
    while True:
        params = api.get_deployment(deployment_id).data.inferenceservice_deployment.parameters
        state = params.generated.status.state
        if state == "RUNNING":
            # Serving details are only read once the deployment is up, so a
            # not-yet-populated `details` object cannot break the polling loop.
            return params.inference, params.generated.details.serving.servingurl
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"deployment {deployment_id} not RUNNING after {timeout} seconds "
                f"(last state: {state})"
            )
        print("waiting for deployment to be running")
        time.sleep(api.wait_interval)

def get_dataset_version(username, dataset_name, version):
    """Resolve a dataset version name to its ``"<name>:<uuid>"`` identifier.

    Args:
        username: Owner of the dataset, passed to ``api.get_dataset_versions``.
        dataset_name: Name of the dataset to inspect.
        version: Version name to look for (compared with each entry's
            ``["version"]["name"]``).

    Returns:
        The string ``f"{version}:{uuid}"`` for the first matching version.

    Raises:
        ValueError: If no version with that name exists. Previously the error
            text was returned as if it were a version string, which callers
            would then pass on as a version.
    """
    available = []
    for entry in api.get_dataset_versions(username, dataset_name):
        info = entry["version"]
        if info["name"] == version:
            return f"{version}:{info['uuid']}"
        available.append(info["name"])
    raise ValueError(
        f"dataset version {version} not found, available version are {available}"
    )

### Fetching deployment

In [None]:
# For a remote deployment, first import it under MONITOR_NAME.
if USE_REMOTE_DEPLOYMENT:
    api.import_deployment(name=MONITOR_NAME)
# Poll until a deployment ID can be resolved for MONITOR_NAME.
# NOTE(review): this loop has no timeout; it never ends if the name never resolves.
while True:
    DEPLOYMENT_ID = api.get_deployment_id(name=MONITOR_NAME)
    if DEPLOYMENT_ID:
        break
    print("waiting for deployment to come up")
    time.sleep(api.wait_interval)
# Only non-remote deployments are waited on for the RUNNING state; INFERENCE_URL
# (and `inference`) are therefore undefined when USE_REMOTE_DEPLOYMENT is truthy.
if not USE_REMOTE_DEPLOYMENT:
    inference, INFERENCE_URL = wait_for_deployment_running(DEPLOYMENT_ID)
    print("Inference is up at URL: ", INFERENCE_URL)

### Model Monitor

In [None]:
# Read the whole transformer script into a string; it is passed to
# `mm.add_datasources(..., transformer_script=script)` for the training data.
# The context manager closes the file even if `read()` raises.
with open("transform_data.py", "r") as text_file:
    script = text_file.read()

# Load the thresholds that are later handed to `mm.add_thresholds`.
with open('thresholds.json') as f:
    thresholds = json.load(f)

In [None]:
# Create the model-monitor definition for the deployment resolved earlier.
# NOTE(review): the keyword is spelled `deployemnt_id`; confirm it matches the
# installed SDK's DkubeModelmonitor signature before changing the spelling.
mm=DkubeModelmonitor(deployemnt_id = DEPLOYMENT_ID)

In [None]:
# Basic monitor settings: classification model, tabular input data, UTC data timezone.
mm.update_modelmonitor_basics(model_type=ModelType.Classification.value, 
                               input_data_type=DataType.Tabular.value,
                               data_timezone=TimeZone.UTC.value)

In [None]:
# Attach the thresholds loaded from thresholds.json to the monitor definition.
mm.add_thresholds(thresholds=thresholds)

## Health Monitoring

In [None]:
# Enable deployment health monitoring when the deployment is local (not imported),
# or when it is an imported deployment that has a cluster attached.
deployment = api.get_deployment(DEPLOYMENT_ID)
if (not deployment.data.imported_deployment
        or deployment.data.imported_deployment.cluster):
    mm.update_deployment_monitoring_details(frequency=1, enabled=True)

## Add Drift monitoring details

In [None]:
# Enable drift monitoring with frequency=5 and the drift algorithm set to 'auto'.
# NOTE(review): the unit of `frequency` is not visible in this notebook — confirm in the SDK docs.
mm.update_drift_monitoring_details(enabled=True,frequency=5,algorithm='auto')

#### TRAINING Details

In [None]:
# Resolve the dataset names ("<user>:<dataset>") and version identifiers
# ("v1:<uuid>") that the datasource registration cell consumes.
# NOTE(review): each version is built from the literal "v1" plus the uuid of the
# FIRST entry returned by get_dataset_versions; this assumes entry 0 is v1.
# The get_dataset_version() helper defined earlier looks a version up by name.
if DATA_SOURCE in ("local", "aws-s3"):
    training_data = f'{DKUBEUSERNAME}:{DKUBE_BASE_DATASET}'
    train_data_version = 'v1:{}'.format(
        api.get_dataset_versions(DKUBEUSERNAME, DKUBE_BASE_DATASET)[0]['version']['uuid']
    )
    prediction_data = f"{DKUBEUSERNAME}:{MONITOR_NAME}-predict"
    labelled_data = f"{DKUBEUSERNAME}:{MONITOR_NAME}-groundtruth"

    # Only the local data source pins versions for predict / groundtruth data.
    if DATA_SOURCE == 'local':
        predict_data_version = 'v1:{}'.format(
            api.get_dataset_versions(DKUBEUSERNAME, MONITOR_NAME + '-predict')[0]['version']['uuid']
        )
        labelled_data_version = 'v1:{}'.format(
            api.get_dataset_versions(DKUBEUSERNAME, MONITOR_NAME + '-groundtruth')[0]['version']['uuid']
        )

# Defined for every data source; only the aws-s3 branch reads it later.
predict_data_format = str(DatasetFormat.Tabular)

# The SQL source uses one dataset name for train, predict and labelled data.
if DATA_SOURCE == "sql":
    training_data = f'{DKUBEUSERNAME}:{DKUBE_BASE_DATASET}'

### Add Datasources (Training, Prediction and Labelled)

In [None]:
# Register the train / predict / labelled datasources for the configured
# DATA_SOURCE. Exactly one branch applies; the training datasource always
# carries the transformer script.
if DATA_SOURCE == 'sql':
    # SQL: all three classes read from the same dataset, via different queries.
    mm.add_datasources(
        data_class=str(DatasetClass.Train),
        name=training_data,
        data_format=str(DatasetFormat.Tabular),
        sql_query="select * from titanic",
        transformer_script=script,
    )
    mm.add_datasources(
        data_class=str(DatasetClass.Predict),
        name=training_data,
        data_format=str(DatasetFormat.Tabular),
        sql_query="select * from titanic_predict",
        date_suffix="yyyy/mm/dd/hh",
    )
    mm.add_datasources(
        data_class=str(DatasetClass.Labelled),
        name=training_data,
        data_format=str(DatasetFormat.Tabular),
        sql_query="select * from titanic_gt",
        predict_col="Survived",
        groundtruth_col="GT_target",
        timestamp_col="timestamp",
    )
elif DATA_SOURCE == 'local':
    # Local: every datasource is pinned to an explicit dataset version.
    mm.add_datasources(
        data_class=str(DatasetClass.Train),
        name=training_data,
        data_format=str(DatasetFormat.Tabular),
        version=train_data_version,
        transformer_script=script,
    )
    mm.add_datasources(
        data_class=str(DatasetClass.Predict),
        name=prediction_data,
        data_format=str(DatasetFormat.Tabular),
        version=predict_data_version,
        date_suffix="none",
    )
    mm.add_datasources(
        data_class=str(DatasetClass.Labelled),
        name=labelled_data,
        data_format=str(DatasetFormat.Tabular),
        version=labelled_data_version,
        predict_col="Survived",
        groundtruth_col="GT_target",
        timestamp_col="timestamp",
    )
elif DATA_SOURCE == 'aws-s3':
    # S3: only the training data is versioned; predict data uses a date suffix.
    mm.add_datasources(
        data_class=str(DatasetClass.Train),
        name=training_data,
        data_format=str(DatasetFormat.Tabular),
        version=train_data_version,
        transformer_script=script,
    )
    mm.add_datasources(
        data_class=str(DatasetClass.Predict),
        name=prediction_data,
        data_format=predict_data_format,
        date_suffix="yyyy/mm/dd/hh",
    )
    mm.add_datasources(
        data_class=str(DatasetClass.Labelled),
        name=labelled_data,
        data_format=str(DatasetFormat.Tabular),
        predict_col="Survived",
        groundtruth_col="GT_target",
        timestamp_col="timestamp",
    )

### Add Performance monitoring details

In [None]:
# Enable performance monitoring with labelled data as the source, at the configured RUN_FREQUENCY.
mm.update_performance_monitoring_details(enabled=True,source_type="labelled_data",frequency=RUN_FREQUENCY)

### Create Model Monitor

In [None]:
# Create the model monitor from the `mm` definition built above.
# wait_for_completion=True presumably blocks until creation finishes — confirm in the SDK docs.
api.modelmonitor_create(mm,wait_for_completion=True)

#### Extracting ID of the model monitor

In [None]:
# The monitor ID is the same as the deployment ID, so it is reused directly.
# Alternatively, the ID can be fetched by monitor name:
# id = api.modelmonitor_get_id(MONITOR_NAME)
# NOTE(review): `id` shadows the Python builtin; later cells rely on this name.
id = DEPLOYMENT_ID
print(id)

### Update Schema

In [None]:
# Bookkeeping columns: prediction output, row identifier and timestamp.
for _label, _schema_class, _schema_type in (
    ('Survived', 'categorical', "prediction_output"),
    ('PassengerId', 'continuous', "row_id"),
    ('timestamp', 'continuous', "timestamp"),
):
    api.modelmonitor_update_schema(
        id,
        label=_label,
        schema_class=_schema_class,
        schema_type=_schema_type,
    )

# Input features, each marked as selected.
for _label, _schema_class in (
    ('Age', 'continuous'),
    ('Fare', 'continuous'),
    ('SibSp', 'continuous'),
    ('Parch', 'continuous'),
    ('Pclass', 'categorical'),
    ('Sex_male', 'categorical'),
    ('Sex_female', 'categorical'),
):
    api.modelmonitor_update_schema(
        id,
        label=_label,
        schema_class=_schema_class,
        schema_type='input_feature',
        selected=True,
    )


## Adding Alerts

#### Deployment Health Alert

In [None]:
# Same gate as for health monitoring: a local (not imported) deployment, or an
# imported deployment with a cluster attached. `deployment` comes from the
# Health Monitoring cell.
if (not deployment.data.imported_deployment
        or deployment.data.imported_deployment.cluster):
    # Condition: average latency compared against 300 with operator.gt.
    alert = DkubeModelmonitorAlert(alert_class='deployment_health', name='latency_alert')
    alert.add_alert_condition(metric='latency_avg', op=operator.gt, threshold=300)
    api.modelmonitor_add_alert(id, alert)

#### Feature Alert

In [None]:
# Feature-drift alert on 'Age' with threshold 0.22 and operator.gt
# (presumably fires when the drift value exceeds the threshold — confirm in the SDK docs).
alert = DkubeModelmonitorAlert(name='age_alert', alert_class = 'feature_drift')
alert.add_alert_condition(feature='Age',threshold=0.22, op=operator.gt)
api.modelmonitor_add_alert(id,alert)

#### Performance Alert

In [None]:
# Performance-decay alert on the 'accuracy' metric with threshold 0.9 and operator.lt
# (presumably fires when accuracy drops below the threshold — confirm in the SDK docs).
alert = DkubeModelmonitorAlert(name='accuracy_alert', alert_class = 'performance_decay')
alert.add_alert_condition(metric='accuracy',threshold=0.9, op=operator.lt)
api.modelmonitor_add_alert(id,alert)

## Start the Model Monitor

In [None]:
# Start the model monitor identified by `id` (the deployment ID).
api.modelmonitor_start(id)

## Retraining

In [None]:
%store -r
# Look the monitor ID up by name so this cell does not depend on earlier cells.
id = api.modelmonitor_get_id(MONITOR_NAME)

# Retraining flow: stop the monitor, point its training datasource at the
# retraining dataset, push the update, then start the monitor again.
if INPUT_TRAIN_TYPE == 'retraining':
    api.modelmonitor_stop(id)

    # Dataset names use the "<user>:<dataset>" form, like every other dataset
    # name in this notebook (the two parts were previously swapped here).
    training_data = f'{DKUBEUSERNAME}:{RETRAINING_DATASET}'
    # Use the first version entry returned for the retraining dataset.
    data_dict = api.get_dataset_versions(DKUBEUSERNAME,RETRAINING_DATASET)[0]['version']
    train_data_version = data_dict['name']+":"+data_dict['uuid']

    # NOTE(review): the monitor is constructed with `name=` here but with
    # `deployemnt_id=` earlier in the notebook — confirm which keyword the
    # installed SDK's DkubeModelmonitor accepts.
    mm=DkubeModelmonitor(name=MONITOR_NAME)
    mm.update_datasources(name=training_data,data_class=str(DatasetClass.Train),version=train_data_version)
    api.modelmonitor_update(id,mm)

    ### Start the model monitor
    api.modelmonitor_start(id)



#### 8. Cleanup

In [None]:
# Opt-in teardown: set CLEANUP to True to stop and then delete the model monitor.
CLEANUP = False
if CLEANUP:
    from time import sleep
    RETRIES = 4
    # Each attempt re-reads the monitor; an "active" monitor is asked to stop,
    # and the loop exits as soon as a non-active state is reported.
    while RETRIES:
        mm = api.modelmonitor_get(id)
        if mm["status"]:
            if mm["status"]["state"].lower() != "active":
                break
            api.modelmonitor_stop(id)
        RETRIES -= 1
        sleep(5)
    else:
        # All retries were used without observing a non-active state.
        raise TimeoutError("modelmonitor failed to stopped")
    api.modelmonitor_delete(id)