<img src="https://github.com/pmservice/ai-openscale-tutorials/raw/master/notebooks/images/banner.png" align="left" alt="banner">

In [None]:
import warnings
# Ignore every Python warning raised for the rest of this kernel session.
warnings.filterwarnings('ignore')
# Sets pip's version-check switch in the kernel environment
# (presumably to keep the install cell's output short).
%env PIP_DISABLE_PIP_VERSION_CHECK=1

In [None]:
import sys

# Interpreter running this kernel; the pip cells below invoke it as $PYTHON.
PYTHON = sys.executable

In [None]:
# Install or upgrade the packages this notebook needs, using the kernel's interpreter.
# NOTE(review): versions are not pinned and test.pypi.org is added as an extra index;
# `| tail -n 1` keeps only the last line of pip's output, so an install error can be
# easy to miss. Confirm this is intended before reusing the cell elsewhere.
!$PYTHON -m pip install --no-warn-conflicts --extra-index-url https://test.pypi.org/simple/ --upgrade tabulate ibm-cloud-sdk-core ibm-watson-openscale ibm-wos-utils | tail -n 1

In [None]:
# Print the installed metadata (including version) of the three IBM packages.
!$PYTHON -m pip show ibm-wos-utils ibm-watson-openscale ibm-cloud-sdk-core

In [None]:
# Connection details for the OpenScale instance. Every "***" is a placeholder that
# must be replaced before running; avoid saving real credentials in the notebook.
WOS_CREDENTIALS = {
    "url": "***",
    "username": "***",
    "password": "***"
}

# Identifiers of the resources to analyse. SUBSCRIPTION_ID and MONITOR_INSTANCE_ID are
# read by the cells below; DATAMART_ID is not referenced again in this notebook.
DATAMART_ID = "***" # default is 00000000-0000-0000-0000-000000000000
SUBSCRIPTION_ID = "***"
MONITOR_INSTANCE_ID = "***"

# Passed to Spark as spark.hadoop.hive.metastore.uris when the session is created.
HIVE_METASTORE_URI = "***"

In [None]:
from ibm_cloud_sdk_core.authenticators import CloudPakForDataAuthenticator
from ibm_watson_openscale import APIClient

# Authenticate with the username/password stored in WOS_CREDENTIALS.
# NOTE(review): disable_ssl_verification=True asks the authenticator to skip SSL
# verification; confirm that is acceptable for the target cluster.
authenticator = CloudPakForDataAuthenticator(
        url=WOS_CREDENTIALS["url"],
        username=WOS_CREDENTIALS["username"],
        password=WOS_CREDENTIALS["password"],
        disable_ssl_verification=True
    )
# OpenScale client used by every later cell that talks to the service.
wos_client = APIClient(authenticator=authenticator, service_url=WOS_CREDENTIALS["url"])

In [None]:
# Display the monitor instances known to the client
# (presumably to look up the value for MONITOR_INSTANCE_ID).
wos_client.monitor_instances.show()

In [None]:
# Fetch the subscription and the monitor instance configured above.
subscription = wos_client.subscriptions.get(subscription_id=SUBSCRIPTION_ID).result
monitor_instance = wos_client.monitor_instances.get(monitor_instance_id=MONITOR_INSTANCE_ID).result

# Flags read from the monitor instance parameters; each falls back to False when the
# key is absent. They gate the model-drift and data-drift cells further down.
model_drift_enabled = monitor_instance.entity.parameters.get("model_drift_enabled", False)
data_drift_enabled = monitor_instance.entity.parameters.get("data_drift_enabled", False)

In [None]:
# Raw content of the drift model download; the next cell opens it as a zip archive.
drift_archive = wos_client.monitor_instances.download_drift_model(monitor_instance_id=MONITOR_INSTANCE_ID).result.content

In [None]:

import io
import json
import tarfile
import tempfile
import zipfile

from ibm_wos_utils.drift.batch.constraints.entity import DataConstraintSet
from ibm_wos_utils.drift.batch.constraints.schema import \
    DriftedTransactionsSchema

# Results of this cell. ddm_properties and constraints_set stay None when the
# corresponding drift type is disabled on the monitor instance.
ddm_properties = None
constraints_set = None
schema = None

# The download is a zip holding "archive.tar.gz"; that tarball holds the JSON files.
# The tarball is extracted into a temporary directory that is removed on exit.
with tempfile.TemporaryDirectory() as tmp:
    member = "archive.tar.gz"
    with zipfile.ZipFile(io.BytesIO(drift_archive)) as zf:
        zf.extract(member, tmp)
    with tarfile.open(tmp + "/" + member, mode="r:gz") as tar:
        # Schema of the drifted transactions table; read regardless of the flags.
        schema_json = json.load(tar.extractfile("drifted_transactions_schema.json"))
        schema = DriftedTransactionsSchema()
        schema.from_json(schema_json)
        
        # Only read when model drift is enabled.
        if model_drift_enabled:
            ddm_properties = json.load(tar.extractfile("ddm_properties.json"))
        
        # Only read when data drift is enabled.
        if data_drift_enabled:
            constraints_json = json.load(tar.extractfile("data_drift_constraints.json"))
            constraints_set = DataConstraintSet()
            constraints_set.from_json(constraints_json)


In [None]:
import pandas as pd

def get_last_n_drift_measurements(n, client, subscription_id):
    """Return the most recent drift measurements of a subscription as a table.

    Args:
        n: Value passed as ``recent_count`` to the measurements query.
        client: OpenScale client exposing ``monitor_instances.measurements.query``.
        subscription_id: Subscription used as the query's ``target_id``.

    Returns:
        A pandas DataFrame with the columns "Measurement ID", "Monitor Run ID" and
        "Timestamp", ordered by "Timestamp" descending.
    """
    response = client.monitor_instances.measurements.query(
        target_id=subscription_id,
        monitor_definition_id="drift",
        recent_count=n,
    )
    rows = [
        [item.metadata.id, item.entity.run_id, item.entity.timestamp]
        for item in response.result.measurements
    ]
    table = pd.DataFrame(rows, columns=["Measurement ID", "Monitor Run ID", "Timestamp"])
    return table.sort_values(by="Timestamp", ascending=False)

# Query with recent_count=15; the bare `results` line renders the table so a
# measurement ID can be picked for the next cell.
results = get_last_n_drift_measurements(15, wos_client, SUBSCRIPTION_ID)
results

In [None]:
# Placeholder: replace "***" with the ID of the measurement to analyse
# (for example a "Measurement ID" value from the table above).
MEASUREMENT_ID = "***"

In [None]:
# Fetch the chosen measurement of the configured monitor instance.
measurement = wos_client.monitor_instances.measurements.get(measurement_id=MEASUREMENT_ID, monitor_instance_id=MONITOR_INSTANCE_ID).result
# Data of the measurement's first source; the summary cells below index into it.
measurement_data = measurement.entity.sources[0].data
# Run ID of the measurement; used later to filter the drifted transactions table.
MONITOR_RUN_ID = measurement.entity.run_id
MONITOR_RUN_ID

In [None]:
# Headline: number of analysed transactions and the window reported by the measurement.
print("IBM Watson OpenScale analyzed {} transactions between {} and {} for drift. Here's a summary.".format(measurement_data["transactions_count"], measurement_data["start"], measurement_data["end"]))

# Each count below is only printed when the matching drift type is enabled.
if model_drift_enabled:
    print("  - Total {} transactions out of {} transactions are causing drop in accuracy.".format(measurement_data["drifted_transactions"]["count"], measurement_data["transactions_count"]))

if data_drift_enabled:
    print("  - Total {} transactions out of {} transactions are causing drop in data consistency.".format(measurement_data["data_drifted_transactions"]["count"], measurement_data["transactions_count"]))
    
# The combined count is only printed when both drift types are enabled.
if model_drift_enabled and data_drift_enabled:
    print("  - Total {} transactions out of {} transactions are causing both drop in accuracy and drop in data consistency.".format(measurement_data["model_data_drifted_transactions"]["count"], measurement_data["transactions_count"]))

In [None]:
# Show the "drift_model_confidence_count" entry of the measurement as a table.
if model_drift_enabled:
    display(pd.DataFrame(measurement_data["drifted_transactions"]["drift_model_confidence_count"]))

In [None]:
# Show the "features_count" entry of the measurement, largest values first.
if data_drift_enabled:
    display(pd.Series(measurement_data["data_drifted_transactions"]["features_count"]).sort_values(ascending=False))

In [None]:
# Show the "constraints_count" entry of the measurement, largest values first.
if data_drift_enabled:
    display(pd.Series(measurement_data["data_drifted_transactions"]["constraints_count"]).sort_values(ascending=False))

In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Spark configuration: application name plus the Hive metastore URI configured above.
conf = SparkConf()\
        .setAppName("Analyze Drifted Transactions")\
        .set("spark.hadoop.hive.metastore.uris", HIVE_METASTORE_URI)
# Create the Hive-enabled session, or reuse one that already exists.
spark = SparkSession.builder.config(conf=conf).enableHiveSupport().getOrCreate()


In [None]:
def get_table_details_from_subscription(subscription, table_type):
    """Look up the table location of one data source type in a subscription.

    Args:
        subscription: Object exposing ``entity.data_sources``.
        table_type: Value compared against each data source's ``type``.

    Returns:
        A ``(database_name, schema_name, table_name)`` tuple taken from the first
        data source whose type matches.

    Raises:
        Exception: If no data source of the requested type exists.
    """
    matches = []
    for candidate in subscription.entity.data_sources:
        if candidate.type == table_type:
            matches.append(candidate)

    if not matches:
        raise Exception("Details not found for data source type: {} in subscription.".format(table_type))

    first = matches[0]
    return (first.database_name, first.schema_name, first.table_name)
    

In [None]:
# Database and table names of the payload and drift data sources; the schema name
# returned in the middle position is discarded.
payload_database_name, _, payload_table_name = get_table_details_from_subscription(subscription, "payload")
drift_database_name, _, drift_table_name = get_table_details_from_subscription(subscription, "drift")


In [None]:
# Rows of the drift table that belong to the selected monitor run.
# NOTE(review): the SQL text is assembled with str.format from values read from the
# subscription and measurement; confirm those are trusted before reusing this pattern.
drifted_txns_df = spark.sql("select * from {}.{} where run_id = '{}'".format(drift_database_name, drift_table_name, MONITOR_RUN_ID))
drifted_txns_df.printSchema()

In [None]:
# Entire payload table (no filter); later cells join it against drifted scoring IDs.
payload_txns_df = spark.sql("select * from {}.{}".format(payload_database_name, payload_table_name))
payload_txns_df.printSchema()

In [None]:
%%time
# Row counts of the drifted transactions for this run: overall, flagged is_model_drift,
# flagged is_data_drift, and flagged with both. Each count() triggers a Spark job.
print("Total number of drifted transactions: {}".format(drifted_txns_df.count()))
print("Total number of model drift transactions: {}".format(drifted_txns_df.where("is_model_drift").count()))
print("Total number of data drift transactions: {}".format(drifted_txns_df.where("is_data_drift").count()))
print("Total number of model + data drift transactions: {}".format(drifted_txns_df.where("is_model_drift").where("is_data_drift").count()))
print()

In [None]:
from tabulate import tabulate
from IPython.display import HTML

# TODO move this to joblib

def show_dataframe(spark_df, num_rows = 10, priority_columns = None):
    """Render the first rows of a Spark DataFrame as an HTML table.

    Columns named in ``priority_columns`` come first (in the given order), followed
    by "scoring_id" and "scoring_timestamp", followed by every remaining column in
    its original order. Names that are not columns of the DataFrame are ignored.

    Args:
        spark_df: Spark DataFrame to display.
        num_rows: Maximum number of rows collected to the driver and shown.
        priority_columns: Optional iterable of column names to show first. It is
            never modified.

    Returns:
        An ``IPython.display.HTML`` object wrapping the table built by ``tabulate``.
    """
    show_df = spark_df.limit(num_rows).toPandas()
    remaining = list(show_df.columns)

    # Work on a fresh list: appending to the argument itself would grow a shared
    # default across calls and would also modify the caller's list.
    wanted = list(priority_columns or []) + ["scoring_id", "scoring_timestamp"]

    ordered = []
    for column in wanted:
        if column in remaining:
            ordered.append(column)
            remaining.remove(column)
    ordered += remaining
    return HTML(tabulate(show_df[ordered], headers=ordered, tablefmt="html"))

In [None]:
%%time

# Inclusive bounds used to filter on drift_model_confidence.
dm_conf_lower = 0.5
dm_conf_upper = 0.6

# Scoring IDs (with their confidence) of model-drift rows inside the bounds.
result = drifted_txns_df\
    .where("is_model_drift")\
    .where(drifted_txns_df.drift_model_confidence.between(dm_conf_lower,dm_conf_upper))\
    .select(["scoring_id","drift_model_confidence"])

count = result.count()

print("Total {} transactions are causing drop in accuracy where drift model confidence is between {} and {}".format(count, dm_conf_lower, dm_conf_upper))

if count:
    print("Showing 10 such transactions in the order of drift_model_confidence")

    # The left-semi join keeps only payload rows with a matching scoring_id; the
    # following left join attaches drift_model_confidence, then rows are sorted by it
    # in descending order.
    result = payload_txns_df\
        .join(result, ["scoring_id"], "leftsemi")\
        .join(result, ["scoring_id"], "left")\
        .sort(["drift_model_confidence"], ascending=False)

    display(show_dataframe(result, priority_columns=["drift_model_confidence"]))


In [None]:
from ibm_wos_utils.drift.batch.util.constants import ConstraintName
# from ibm_wos_utils
import pyspark.sql.functions as F
from itertools import product
import hashlib

# Scenarios:
# SELECT * FROM drifted_annotations WHERE categorical LIKE '%1%' 


# def get_constraint_id(constraint_name, columns):
#     return hashlib.sha224(
#             bytes(",".join([constraint_name.value] + sorted(map(lambda x: x.lower(), columns))), "utf-8")).hexdigest()


def get_constraint_id(constraint_name, columns):
    """Compute the identifier of a constraint from its name and columns.

    Args:
        constraint_name: Object whose ``value`` attribute is the constraint name string.
        columns: Column names the constraint applies to; their order does not matter
            because they are sorted first (case-sensitively).

    Returns:
        The SHA-224 hex digest of the name and the sorted columns joined by commas.
    """
    parts = [constraint_name.value]
    parts.extend(sorted(columns))
    digest_input = ",".join(parts).encode("utf-8")
    return hashlib.sha224(digest_input).hexdigest()

def get_constraints_for_column(column, constraints_set=None):
    """Return the learnt constraints that involve a column.

    Args:
        column: Column name; compared case-insensitively with each constraint's columns.
        constraints_set: Object exposing a ``constraints`` mapping of constraint ID to
            constraint. When omitted, the notebook-level ``constraints_set`` is used,
            which keeps the original one-argument call working.

    Returns:
        A dict of constraint ID to constraint for every constraint whose ``columns``
        contain the given column.

    Raises:
        Exception: If no constraints set is passed and none is loaded in the notebook.
    """
    if constraints_set is None:
        # Fall back to the global loaded from the drift archive.
        constraints_set = globals().get("constraints_set")
    if constraints_set is None:
        raise Exception("No data drift constraints are available to search.")

    wanted = column.lower()
    return {cid: constraint for cid, constraint in constraints_set.constraints.items()
            if wanted in (name.lower() for name in constraint.columns)}


def get_bitmap(constraints_set, schema, constraint_names=[], columns=[]):
    """Build per-constraint-name LIKE pattern pieces for the given columns.

    For every key of ``schema.bitmap`` the result holds a list with one entry per
    constraint ID listed there. An entry is "1" when that constraint involves one of
    ``columns`` and its name is among ``constraint_names`` (or any name when
    ``constraint_names`` is empty); otherwise it is "_", the single-character
    wildcard of SQL LIKE.

    Args:
        constraints_set: Constraints to search; passed on to
            ``get_constraints_for_column`` so the argument is actually honoured.
        schema: Object exposing ``bitmap``, a mapping of constraint name value to the
            ordered list of constraint IDs.
        constraint_names: Constraint names to restrict the search to.
        columns: Column names whose constraints should be marked.

    Returns:
        A dict of constraint name value to a list of "1"/"_" strings.

    Raises:
        Exception: If neither ``constraint_names`` nor ``columns`` is given.
        ValueError: If a matching constraint ID is missing from ``schema.bitmap``.
    """
    if not constraint_names and not columns:
        raise Exception("Need either constraint_names or columns to create a bitmap.")

    # list() accepts any iterable (the previous .copy() required a list); an empty
    # selection means "every constraint name".
    valid_constraints = list(constraint_names) or list(ConstraintName)

    bitmap = {key: ["_"] * len(value) for key, value in schema.bitmap.items()}
    for column in columns:
        learnt_constraints = get_constraints_for_column(column, constraints_set)
        for ctr_id, ctr in learnt_constraints.items():
            if ctr.name in valid_constraints:
                idx = schema.bitmap[ctr.name.value].index(ctr_id)
                bitmap[ctr.name.value][idx] = "1"

    return bitmap


def get_query(constraints_set, schema, constraint_names=[], columns=[], operation = "or"):
    """Build a Spark Column expression that filters the drifted transactions table.

    Without ``columns``, one condition is created per constraint name: the column
    named after the constraint must be LIKE "%1%", i.e. contain a "1" anywhere.
    With ``columns``, the patterns come from ``get_bitmap``: for each constraint name
    whose bitmap has at least one "1", the column must be LIKE the joined bitmap, in
    which "_" matches any single character. The conditions are combined with
    ``operation``.

    Args:
        constraints_set: Learnt constraints, forwarded to ``get_bitmap``.
        schema: Drifted transactions schema, forwarded to ``get_bitmap``.
        constraint_names: Constraint names to query (objects with a ``value``).
        columns: Column names to query.
        operation: "or" or "and"; how the per-constraint conditions are combined.

    Returns:
        The combined Column expression, or None when no condition could be built.

    Raises:
        Exception: If neither ``constraint_names`` nor ``columns`` is given, or if
            ``operation`` is not "or"/"and".
    """
    if not constraint_names and not columns:
        raise Exception("Need either constraint_names or columns to create a query.")

    if operation not in ("or", "and"):
        raise Exception("Unsupported operation '{}' passed as an argument.".format(operation))

    if not columns:
        subqueries = [F.col(constraint_name.value).like("%1%")
                      for constraint_name in constraint_names]
    else:
        bitmap = get_bitmap(constraints_set, schema, constraint_names, columns)
        subqueries = [F.col(name).like("".join(values))
                      for name, values in bitmap.items() if "1" in values]

    if not subqueries:
        return None

    # Start from the last condition and fold the others in, in their original order.
    result = subqueries.pop()
    for subquery in subqueries:
        result = (result | subquery) if operation == "or" else (result & subquery)

    return result

In [None]:
%%time

filter_query = get_query(constraints_set, schema, constraints=[ConstraintName.CATEGORICAL_DISTRIBUTION_CONSTRAINT], operation="or")
print(filter_query)

result = drifted_txns_df\
    .where("is_data_drift")\
    .where(filter_query)\
    .select(["scoring_id"])
print(result.explain())

count = result.count()

print("Total {} transactions are satisfying the given query.".format(count))

if count:
    print("Showing 10 such transactions.")

    result = payload_txns_df\
        .join(result, ["scoring_id"], "leftsemi")\
        .join(result, ["scoring_id"], "left")\

    display(show_dataframe(result, priority_columns=["drift_model_confidence"]))


In [None]:
%%time

filter_query = get_query(constraints_set, schema, constraints=[ConstraintName.CATEGORICAL_DISTRIBUTION_CONSTRAINT], columns=["checkingstatus"], operation="or")

result = drifted_txns_df\
    .where("is_data_drift")\
    .where(filter_query)\
    .select(["scoring_id"])
count = result.count()

print("Total {} transactions are satisfying the given query.".format(count))

if count:
    print("Showing 10 such transactions.")

    result = payload_txns_df\
        .join(result, ["scoring_id"], "leftsemi")\
        .join(result, ["scoring_id"], "left")\

    display(show_dataframe(result, priority_columns=["drift_model_confidence"]))


In [None]:
# Display the Column expression built by the most recently run query cell.
filter_query