<img src="https://github.com/pmservice/ai-openscale-tutorials/raw/master/notebooks/images/banner.png" align="left" alt="banner">

# Notebook for generating configuration for batch subscriptions in IBM Watson OpenScale in IBM Cloud Pak for Data v3.5

This notebook shows how to generate the following artefacts:
1. Common configuration JSON needed to configure an IBM Watson OpenScale subscription.
2. Drift Configuration Archive
3. DDLs for creating Scored Training, Feedback, Payload and Drifted Transactions tables

The user needs to provide the necessary inputs (where marked) and download the generated artefacts. These artefacts must then be uploaded to the IBM Watson OpenScale UI.

PS: This notebook can generate artefacts for only one model at a time. For multiple models, run this notebook separately for each model.

**Contents:**
1. [Installing Dependencies](#Installing-Dependencies)
2. [Select IBM Watson OpenScale Services](#Select-IBM-Watson-OpenScale-Services)
3. [Read sample scoring data](#Read-sample-scoring-data)
4. [Specify Model Inputs](#Specify-Model-Inputs)
5. [Generate Common Configuration JSON](#Generate-Common-Configuration-JSON)
6. [Generate DDL for creating Scored Training data table](#Generate-DDL-for-creating-Scored-Training-data-table)
7. [Generate DDL for creating Feedback table](#Generate-DDL-for-creating-Feedback-table)
8. [Generate DDL for creating Payload table](#Generate-DDL-for-creating-Payload-table)
9. [Provide Spark Connection Details](#Provide-Spark-Connection-Details)
10. [Provide Storage Inputs](#Provide-Storage-Inputs)
11. [Provide Spark Resource Settings [Optional]](#Provide-Spark-Resource-Settings-[Optional])
12. [Provide Additional Spark Settings [Optional]](#Provide-Additional-Spark-Settings-[Optional])
13. [Provide Drift Parameters [Optional]](#Provide-Drift-Parameters-[Optional])
14. [Run Drift Configuration Job](#Run-Drift-Configuration-Job)
15. [Download Drift Archive](#Download-Drift-Archive)
16. [Generate DDL for creating Drifted Transactions Table](#Generate-DDL-for-creating-Drifted-Transactions-table)

### Installing Dependencies

In [None]:
# Note: Restart the kernel after the dependencies are installed
!pip install pyspark
!pip install ibm-cos-sdk-core==2.4.4 requests
!pip install ibm-wos-utils~=2.1.1

### Select IBM Watson OpenScale Services

Currently, this notebook supports generating configuration information for the quality and drift services.

Details of the service-specific flags available:

- ENABLE_QUALITY: Flag to allow generation of common configuration details needed if quality alone is selected
<!-- - ENABLE_FAIRNESS : Flag to allow generation of fairness specific data distribution needed for configuration -->
<!-- - ENABLE_EXPLAIN : Flag to allow generation of explainability specific information -->
- ENABLE_MODEL_DRIFT: Flag to allow generation of Drift Archive containing relevant information for Model Drift.
- ENABLE_DATA_DRIFT: Flag to allow generation of Drift Archive containing relevant information for Data Drift.



In [None]:
# ----------------------------------------------------------------------------------------------------
# IBM Confidential
# OCO Source Materials
# 5900-A3Q, 5737-J33
# Copyright IBM Corp. 2020
# The source code for this Notebook is not published or other-wise divested of its trade
# secrets, irrespective of what has been deposited with the U.S.Copyright Office.
# ----------------------------------------------------------------------------------------------------

VERSION = 1.0


# Optional Input: Keep an identifiable name. This ID is appended as a suffix to the table names
# in the generated CREATE DDLs and is passed to the drift job as the monitoring run ID.
# A random UUID is used if this is not set.
# NOTEBOOK_RUN_ID = "some_identifiable_name"
NOTEBOOK_RUN_ID = None


# Service Configuration Flags
ENABLE_QUALITY = True
ENABLE_MODEL_DRIFT = True
ENABLE_DATA_DRIFT = True

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName(
    "Common Configuration Generation").getOrCreate()

### Read sample scoring data

A sample of the scoring data is required to infer the schema of the complete data, so choose a sample large enough to be representative of the full dataset.

Additionally, the sample scoring data should have the following fields:
1. Feature Columns
2. Label/Target Column
3. Prediction Column (with same data type as the label column)
4. Probability Column (an array of model probabilities for all the class labels. Not required for regression models)
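
As an optional pre-flight check, the sketch below tests these four requirements against a schema given as a dictionary of column names to Spark type strings, which is what `dict(spark_df.dtypes)` produces (for example `"double"` or `"array<double>"`). The helper `check_sample_schema` is hypothetical and not part of `ibm_wos_utils`; the notebook's own `validate_config_info` may check different things.

```python
from typing import Dict, List, Optional


def check_sample_schema(
    schema: Dict[str, str],
    label_column: str,
    prediction_column: str,
    probability_column: Optional[str],
    model_type: str,
) -> List[str]:
    """Return a list of problems found in the sample schema (empty if none).

    `schema` maps column names to Spark type strings, e.g. dict(spark_df.dtypes).
    Illustrative only; this is not the validation OpenScale itself performs.
    """
    problems = []
    required = [label_column, prediction_column]
    if model_type != "regression":
        required.append(probability_column)

    # Label, prediction and (for classifiers) probability columns must exist.
    for column in required:
        if column not in schema:
            problems.append(f"missing column: {column}")

    # The prediction column should have the same data type as the label column.
    if label_column in schema and prediction_column in schema:
        if schema[label_column] != schema[prediction_column]:
            problems.append(
                f"type mismatch: {label_column} is {schema[label_column]}, "
                f"{prediction_column} is {schema[prediction_column]}"
            )

    # For classifiers, the probability column should be an array column.
    if model_type != "regression" and probability_column in schema:
        if not schema[probability_column].startswith("array<"):
            problems.append(f"{probability_column} is not an array column")

    # Every remaining column is treated as a feature column.
    if not set(schema) - set(required):
        problems.append("no feature columns found")
    return problems


# Illustrative column names and types only.
sample = {"age": "int", "risk": "string", "predicted_risk": "string",
          "probability": "array<double>"}
print(check_sample_schema(sample, "risk", "predicted_risk", "probability", "binary"))  # → []
```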

**STORAGE_FORMAT** : One of ["csv", "parquet", "orc"]

Note: Please select the format in which your training data is stored in Hive. The same format will be used to generate the various CREATE DDLs in this notebook.

In [None]:
STORAGE_FORMAT = "csv"
# STORAGE_FORMAT = "parquet"
# STORAGE_FORMAT = "orc"


The sample data should be of type `pyspark.sql.dataframe.DataFrame`. The cell below shows how to:
- read a CSV file, or a directory of CSV files, from the local system into a PySpark DataFrame.
- read Parquet files in a directory from the local system into a PySpark DataFrame.
- read ORC files in a directory from the local system into a PySpark DataFrame.

Choose the same storage format as the training data; otherwise, there could be schema mismatches.

In [None]:
# spark_df must be assigned by uncommenting and editing one of the branches below
spark_df = None

if STORAGE_FORMAT == "csv":
    # Load a csv file or a directory containing csv files as a PySpark DataFrame
    # spark_df = spark.read.csv("/path/to/dir/containing/csv/files", header=True, inferSchema=True)
    pass

elif STORAGE_FORMAT == "parquet":
    # Load a directory containing parquet files as a PySpark DataFrame
    # spark_df = spark.read.parquet("/path/to/dir/containing/parquet/files")
    pass

elif STORAGE_FORMAT == "orc":
    # Load a directory containing orc files as a PySpark DataFrame
    # spark_df = spark.read.orc("/path/to/dir/containing/orc/files")
    pass

else:
    # Load data from any source that matches the schema of the training data
    pass

if spark_df is None:
    raise ValueError("Load the sample scoring data into `spark_df` before continuing.")

spark_df.printSchema()

### Specify Model Inputs

#### Specify the Model Type

- Specify **binary** if the model is a binary classifier.
- Specify **multiclass** if the model is a multi-class classifier.
- Specify **regression** if the model is a regressor.

In [None]:
# MODEL_TYPE = "binary"
MODEL_TYPE = "multiclass"
# MODEL_TYPE = "regression"

#### Provide Column Details 

To proceed with this notebook, the following information is required:

- **LABEL_COLUMN**: The column which contains the target field (also known as label column or the class label).
- **PREDICTION_COLUMN**: The column containing the model output. This should be of the same data type as the label column.
- **PROBABILITY_COLUMN**: The column (of type array) containing the model probabilities for all the possible prediction outcomes. This is not required for regression models.
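
For classifiers, each value in the probability column holds the model's probabilities for all the class labels. The sketch below checks a single row under two assumptions that OpenScale does not state here: there is exactly one entry per class, and the entries lie in [0, 1] and sum to 1 within a small tolerance. The function name and the tolerance are illustrative.

```python
import math
from typing import Sequence


def check_probability_row(probabilities: Sequence[float], n_classes: int,
                          tol: float = 1e-6) -> bool:
    """Return True if a probability array looks well formed for `n_classes` labels.

    Assumed (hypothetical) rules: one entry per class, every entry in [0, 1],
    and the entries sum to 1 within `tol`.
    """
    if len(probabilities) != n_classes:
        return False
    if any(p < 0.0 or p > 1.0 for p in probabilities):
        return False
    return math.isclose(sum(probabilities), 1.0, abs_tol=tol)


print(check_probability_row([0.7, 0.2, 0.1], n_classes=3))  # → True
print(check_probability_row([0.7, 0.3], n_classes=3))       # → False
```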

In [None]:
LABEL_COLUMN = "<label_column>"
PREDICTION_COLUMN = "<model prediction column>"
PROBABILITY_COLUMN = "<model probability column. ignored in case of regression models>"

Based on the sample data and key columns provided above, the notebook deduces the feature columns and the categorical columns (string and boolean feature columns) and prints them in the output of the next cell. If you wish to change them, you can do so in the cell after it.

In [None]:
from pyspark.sql.types import BooleanType, StringType

feature_columns = spark_df.columns.copy()
feature_columns.remove(LABEL_COLUMN)
feature_columns.remove(PREDICTION_COLUMN)

if MODEL_TYPE != "regression":
    feature_columns.remove(PROBABILITY_COLUMN)

print("Feature Columns : {}".format(feature_columns))

categorical_columns = [f.name for f in spark_df.schema.fields if isinstance(f.dataType, (BooleanType, StringType)) and f.name in feature_columns]
print("Categorical Columns : {}".format(categorical_columns))

In [None]:
config_info = {
    "problem_type": MODEL_TYPE,
    "label_column": LABEL_COLUMN,
    "prediction": PREDICTION_COLUMN,
    "probability": PROBABILITY_COLUMN
}

config_info["feature_columns"] = feature_columns
config_info["categorical_columns"] = categorical_columns

In [None]:
from ibm_wos_utils.joblib.utils.notebook_utils import validate_config_info
validate_config_info(config_info)

### Generate Common Configuration JSON

IBM Watson OpenScale requires two additional fields: a unique identifier for each record in your feedback/payload tables ("scoring_id") and a timestamp field ("scoring_timestamp") denoting when that record entered the table. These fields are automatically added to the common configuration JSON.

Please make sure that these fields are present in the respective tables.
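
When you load records into the feedback or payload tables, each row needs values for these two fields. A minimal sketch, assuming a UUID4 string is an acceptable unique identifier and a UTC timestamp in the `YYYY-MM-DD HH:MM:SS.ffffff` text form that Hive accepts for TIMESTAMP values; check the generated DDLs for the actual column types, which may differ. The helper name is hypothetical.

```python
import uuid
from datetime import datetime, timezone
from typing import Any, Dict


def add_scoring_fields(record: Dict[str, Any]) -> Dict[str, Any]:
    """Return a copy of `record` with scoring_id and scoring_timestamp added.

    Assumptions (not taken from OpenScale documentation): a UUID4 string is a
    suitable unique identifier, and the timestamp is written in UTC using the
    "YYYY-MM-DD HH:MM:SS.ffffff" text form.
    """
    enriched = dict(record)  # leave the caller's dict untouched
    enriched["scoring_id"] = str(uuid.uuid4())
    enriched["scoring_timestamp"] = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S.%f")
    return enriched


# Illustrative record only.
row = add_scoring_fields({"age": 42, "risk": "No Risk"})
print(row)
```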

In [None]:
from ibm_wos_utils.joblib.utils.notebook_utils import generate_schemas
from ibm_wos_utils.joblib.utils.notebook_utils import create_download_link

common_config = config_info.copy()
common_configuration = generate_schemas(spark_df, common_config)

config_json = {}
config_json["common_configuration"] = common_configuration
config_json["batch_notebook_version"] = VERSION

create_download_link(config_json, "config")

### Generate DDL for creating Scored Training data table

In [None]:
from ibm_wos_utils.joblib.utils.ddl_utils import generate_scored_training_table_ddl

# Database Name where Scored Training Table should be created. If None or "", the default database is used.
SCORED_TRAINING_DATABASE_NAME = None

# Path to the Scored Training Data in HDFS. Leave as None if you wish to load data later.
path_to_hdfs_directory = None

# Additional Table Properties that are required for table creation.
# Please set the table property `skip.header.line.count` as shown 
# if the scored training data is stored as CSV and it contains the header row.
# Leave as None if no additional properties are required.
# table_properties = {
#     "skip.header.line.count": 1
# }
table_properties = None

create_ddl = generate_scored_training_table_ddl(config_json, database_name=SCORED_TRAINING_DATABASE_NAME,
                                                table_suffix=NOTEBOOK_RUN_ID, stored_as=STORAGE_FORMAT,
                                                path_to_hdfs_directory=path_to_hdfs_directory,
                                                table_properties=table_properties)
print(create_ddl)
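
`table_properties` maps Hive table property names to values. As a rough illustration of how such a dictionary corresponds to Hive DDL, the hypothetical helper below renders it as a `TBLPROPERTIES` clause; the exact DDL emitted by `generate_scored_training_table_ddl` may be formatted differently.

```python
from typing import Any, Dict, Optional


def render_tblproperties(table_properties: Optional[Dict[str, Any]]) -> str:
    """Render a dict of Hive table properties as a TBLPROPERTIES clause.

    Returns an empty string for None or {}, matching the notebook's convention
    that None means "no additional properties".
    """
    if not table_properties:
        return ""
    pairs = ", ".join(f'"{key}"="{value}"' for key, value in table_properties.items())
    return f"TBLPROPERTIES ({pairs})"


print(render_tblproperties({"skip.header.line.count": 1}))
# → TBLPROPERTIES ("skip.header.line.count"="1")
```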

### Generate DDL for creating Feedback table


In [None]:
from ibm_wos_utils.joblib.utils.ddl_utils import generate_feedback_table_ddl

# Database Name where Feedback Table should be created. If None or "", the default database is used.
FEEDBACK_DATABASE_NAME = None

# Path to the Feedback Data in HDFS. Leave as None if you wish to load data later.
path_to_hdfs_directory = None

# Additional Table Properties that are required for table creation.
# Please set the table property `skip.header.line.count` as shown 
# if the feedback data is stored as CSV and it contains the header row.
# Leave as None if no additional properties are required.
# table_properties = {
#     "skip.header.line.count": 1
# }
table_properties = None

if ENABLE_QUALITY:
    create_ddl = generate_feedback_table_ddl(config_json, database_name=FEEDBACK_DATABASE_NAME,
                                             table_suffix=NOTEBOOK_RUN_ID, stored_as=STORAGE_FORMAT,
                                             path_to_hdfs_directory=path_to_hdfs_directory,
                                             table_properties=table_properties)
    print(create_ddl)

### Generate DDL for creating Payload table


In [None]:
from ibm_wos_utils.joblib.utils.ddl_utils import generate_payload_table_ddl

# Database Name where Payload Table should be created. If None or "", the default database is used.
PAYLOAD_DATABASE_NAME = ""

# Path to the Payload Data in HDFS. Leave as None if you wish to load data later.
path_to_hdfs_directory = None

# Additional Table Properties that are required for table creation.
# Please set the table property `skip.header.line.count` as shown 
# if the payload data is stored as CSV and it contains the header row.
# Leave as None if no additional properties are required.
# table_properties = {
#     "skip.header.line.count": 1
# }
table_properties = None

if ENABLE_MODEL_DRIFT or ENABLE_DATA_DRIFT:
    create_ddl = generate_payload_table_ddl(config_json, database_name=PAYLOAD_DATABASE_NAME,
                                            table_suffix=NOTEBOOK_RUN_ID, stored_as=STORAGE_FORMAT,
                                            path_to_hdfs_directory=path_to_hdfs_directory,
                                            table_properties=table_properties)
    print(create_ddl)

### Provide Spark Connection Details

1. If your job is going to run on a Spark cluster that is part of an IBM Analytics Engine instance on IBM Cloud Pak for Data, enter the following details:
    
    - **IAE_SPARK_DISPLAY_NAME**: Display name of the Spark instance in IBM Analytics Engine
    - **IAE_SPARK_JOBS_ENDPOINT**: Spark jobs endpoint for IBM Analytics Engine
    - **IBM_CPD_VOLUME**: IBM Cloud Pak for Data storage volume name
    - **IBM_CPD_USERNAME**: IBM Cloud Pak for Data username
    - **IBM_CPD_APIKEY**: IBM Cloud Pak for Data API key


2. If your job is going to run on a Spark cluster that is part of a remote Hadoop ecosystem, enter the following details:

    - **SPARK_MANAGER_ENDPOINT**: Endpoint URL where the Spark Manager Application is running
    - **SPARK_MANAGER_USERNAME**: Username to connect to the Spark Manager Application
    - **SPARK_MANAGER_PASSWORD**: Password to connect to the Spark Manager Application

Run only the credentials cell that matches your environment. Both cells assign the `credentials` variable, so running both keeps only the block from the cell that ran last.
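
The input cells ship with placeholder strings in angle brackets, such as `"<IBM Cloud Pak for Data API key>"`. A small recursive check, run on `credentials` (or on `tables`) before submitting the job, catches any placeholder that was not replaced. The helper is hypothetical and only looks for string values of the form `<...>`.

```python
from typing import Any, List


def find_placeholders(value: Any, path: str = "") -> List[str]:
    """Return the key paths of string values still shaped like "<...>" placeholders."""
    found = []
    if isinstance(value, dict):
        for key, item in value.items():
            found.extend(find_placeholders(item, f"{path}.{key}" if path else str(key)))
    elif isinstance(value, (list, tuple)):
        for index, item in enumerate(value):
            found.extend(find_placeholders(item, f"{path}[{index}]"))
    elif isinstance(value, str) and value.startswith("<") and value.endswith(">"):
        found.append(path)
    return found


# Illustrative values only; "remote" stands in for SparkType.REMOTE_SPARK.value.
example = {
    "connection": {"endpoint": "https://spark.example.com", "location_type": "remote"},
    "credentials": {"username": "admin",
                    "password": "<Password to connect to Spark Manager Application>"},
}
print(find_placeholders(example))  # → ['credentials.password']
```

In the notebook, `find_placeholders(credentials)` should return an empty list once every input has been filled in.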

#### Credentials Block for Spark in IAE

In [None]:
from ibm_wos_utils.joblib.utils.constants import SparkType

IAE_SPARK_DISPLAY_NAME = "<Display Name of the Spark instance in IBM Analytics Engine>"
IAE_SPARK_JOBS_ENDPOINT = "<Spark Jobs Endpoint for IBM Analytics Engine>"
IBM_CPD_VOLUME = "<IBM Cloud Pak for Data storage volume name>"
IBM_CPD_USERNAME = "<IBM Cloud Pak for Data username>"
IBM_CPD_APIKEY = "<IBM Cloud Pak for Data API key>"

# Credentials Block for Spark in IAE
credentials = {
    "connection": {
        "display_name": IAE_SPARK_DISPLAY_NAME,
        "endpoint": IAE_SPARK_JOBS_ENDPOINT,
        "location_type": SparkType.IAE_SPARK.value,
        "volume": IBM_CPD_VOLUME
    },
    "credentials": {
        "username": IBM_CPD_USERNAME,
        "apikey": IBM_CPD_APIKEY
    }
}

#### Credentials Block for Spark in Remote Hadoop Ecosystem

In [None]:
from ibm_wos_utils.joblib.utils.constants import SparkType

SPARK_MANAGER_ENDPOINT = "<Endpoint URL where Spark Manager Application is running>"
SPARK_MANAGER_USERNAME = "<Username to connect to Spark Manager Application>"
SPARK_MANAGER_PASSWORD = "<Password to connect to Spark Manager Application>"

# Credentials Block for Spark in Remote Hadoop Ecosystem
credentials = {
    "connection": {
        "endpoint": SPARK_MANAGER_ENDPOINT,
        "location_type": SparkType.REMOTE_SPARK.value
    },
    "credentials": {
        "username": SPARK_MANAGER_USERNAME,
        "password": SPARK_MANAGER_PASSWORD
    }
}

### Provide Storage Inputs

Enter the Hive details:
 - **HIVE_METASTORE_URI**: Thrift URI of the Hive Metastore to connect to
 - **TRAINING_DATABASE_NAME**: Name of the database in Hive that contains the training table/view
 - **TRAINING_TABLE_NAME**: Name of the table in Hive that contains the scored training data
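
The Hive Metastore Thrift URI normally has the form `thrift://<host>:<port>`, and 9083 is the metastore's default port. A standard-library sanity check for a single URI that catches common typos such as a missing scheme or port; the helper is hypothetical and does not test connectivity.

```python
from urllib.parse import urlparse


def is_valid_metastore_uri(uri: str) -> bool:
    """Return True if `uri` looks like thrift://host:port (syntax check only)."""
    parsed = urlparse(uri)
    try:
        port = parsed.port  # raises ValueError for a non-numeric or out-of-range port
    except ValueError:
        return False
    return parsed.scheme == "thrift" and bool(parsed.hostname) and port is not None


print(is_valid_metastore_uri("thrift://metastore.example.com:9083"))  # → True
print(is_valid_metastore_uri("metastore.example.com:9083"))           # → False
```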


In [None]:
HIVE_METASTORE_URI = "<Thrift URI of the Hive Metastore to connect to>"
TRAINING_DATABASE_NAME = "<Name of the database in Hive that contains the training table/view>"
TRAINING_TABLE_NAME = "<Name of the table in Hive that contains the scored training data>"

In [None]:
storage_details = {
    "type": "hive",
    "connection": {
        "metastore_url": HIVE_METASTORE_URI,
    }
}

tables = [
    {
        "database": TRAINING_DATABASE_NAME,
        "table": TRAINING_TABLE_NAME,
        "type": "training"
    }
]

### Provide Spark Resource Settings [Optional]

Configure how much of your Spark cluster's resources this job can consume. Leave the variable `spark_settings` set to `None` or `{}` if no customisation is required.

In [None]:
"""
spark_settings = {
    # max_num_executors: Maximum Number of executors to launch for this session
    "max_num_executors": 2,
    
    # min_executors: Minimum Number of executors to launch for this session
    "min_executors": 1,
    
    # executor_cores: Number of cores to use for each executor
    "executor_cores": 2,
    
    # executor_memory: Amount of memory (in GBs) to use per executor process
    "executor_memory": 1,
    
    # driver_cores: Number of cores to use for the driver process
    "driver_cores": 2,
    
    # driver_memory: Amount of memory (in GBs) to use for the driver process 
    "driver_memory": 1
}
"""
spark_settings = None
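
To estimate how much of the cluster a given `spark_settings` allows the job to use, multiply the per-executor values by the maximum number of executors and add the driver. A back-of-the-envelope sketch, assuming memory values are in GB as stated above; it ignores Spark's memory overhead, so the real footprint is somewhat larger. The helper name is hypothetical.

```python
from typing import Dict, Optional


def max_resource_footprint(settings: Optional[Dict[str, int]]) -> Optional[Dict[str, int]]:
    """Upper bound on cores and memory (GB) requested by spark_settings.

    Expects all six keys shown in the commented example. Returns None for
    None or {}, in which case the cluster defaults apply.
    Memory overhead is not included.
    """
    if not settings:
        return None
    if settings["min_executors"] > settings["max_num_executors"]:
        raise ValueError("min_executors must not exceed max_num_executors")
    executors = settings["max_num_executors"]
    return {
        "cores": executors * settings["executor_cores"] + settings["driver_cores"],
        "memory_gb": executors * settings["executor_memory"] + settings["driver_memory"],
    }


example = {"max_num_executors": 2, "min_executors": 1, "executor_cores": 2,
           "executor_memory": 1, "driver_cores": 2, "driver_memory": 1}
print(max_resource_footprint(example))  # → {'cores': 6, 'memory_gb': 3}
```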

### Provide Additional Spark Settings [Optional]

Provide any other Spark properties that can be set via **SparkConf** in the next cell. These properties are sent to the Spark cluster verbatim. Leave the variable `conf` set to `None` or `{}` if no additional property is required.

- [A list of available properties for Spark 2.4.6](https://spark.apache.org/docs/2.4.6/configuration.html#available-properties)

In [None]:
"""
conf = {
    "spark.yarn.maxAppAttempts": 1
}
"""

conf = None

### Provide Drift Parameters [Optional]

Provide the optional drift parameters in the next cell. Leave the variable `drift_parameters` set to `None` or `{}` if no additional parameter is required.

In [None]:
"""
drift_parameters = {
    "model_drift": {
        # enable_drift_model_tuning - Controls whether there will be Hyper-Parameter 
        # Optimisation in the Drift Detection Model. Default: False
        "enable_drift_model_tuning": True,
        
        # max_bins - Specify the maximum number of categories in categorical columns.
        # Default: OpenScale will determine an approximate value. Use this only in cases
        # where OpenScale approximation fails.
        "max_bins": 10,
    },
    "data_drift": {
        # enable_two_col_learner - Enable learning of data constraints on two column 
        # combinations. Default: True
        "enable_two_col_learner": True,
        
        # categorical_unique_threshold - Used to discard categorical columns with a
        # large number of unique values relative to total rows in the column.
        # Should be between 0 and 1. Default: 0.8
        "categorical_unique_threshold": 0.7,
        
        # max_distinct_categories - Used to discard categorical columns with a large
        # absolute number of unique categories. Also used to skip learning a
        # categorical-categorical constraint if the number of potential combinations
        # of two columns exceeds this number. Default: 100000
        "max_distinct_categories": 10000 
    }
}
"""


drift_parameters = None
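
The defaults documented in the comments of the cell above can be made explicit by merging them with any overrides. The sketch below does that and also applies the two discard rules described for categorical columns: a unique-to-row ratio above `categorical_unique_threshold`, or more than `max_distinct_categories` categories. It only illustrates those comments; OpenScale applies its own defaults and rules, `max_bins` is omitted because it has no fixed default, and the helper names are hypothetical.

```python
import copy
from typing import Any, Dict, Optional

# Defaults as documented in the comments of the drift-parameters cell.
DOCUMENTED_DEFAULTS = {
    "model_drift": {"enable_drift_model_tuning": False},
    "data_drift": {
        "enable_two_col_learner": True,
        "categorical_unique_threshold": 0.8,
        "max_distinct_categories": 100000,
    },
}


def effective_drift_parameters(overrides: Optional[Dict[str, Dict[str, Any]]]) -> Dict:
    """Merge user overrides (None or {} allowed) over the documented defaults."""
    merged = copy.deepcopy(DOCUMENTED_DEFAULTS)  # never mutate the defaults
    for section, values in (overrides or {}).items():
        merged.setdefault(section, {}).update(values)
    threshold = merged["data_drift"]["categorical_unique_threshold"]
    if not 0 <= threshold <= 1:
        raise ValueError("categorical_unique_threshold should be between 0 and 1")
    return merged


def is_discarded(unique_values: int, total_rows: int, params: Dict) -> bool:
    """Illustrative reading of the two documented discard rules for a categorical column."""
    if total_rows <= 0:
        raise ValueError("total_rows must be positive")
    data_drift = params["data_drift"]
    # Rule 1: too many unique values relative to the number of rows.
    too_unique = unique_values / total_rows > data_drift["categorical_unique_threshold"]
    # Rule 2: too many distinct categories in absolute terms.
    too_many = unique_values > data_drift["max_distinct_categories"]
    return too_unique or too_many


params = effective_drift_parameters({"data_drift": {"categorical_unique_threshold": 0.7}})
print(is_discarded(750, 1000, params))  # 0.75 > 0.7, so True
print(is_discarded(50, 1000, params))   # 0.05 <= 0.7 and 50 <= 100000, so False
```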

### Run Drift Configuration Job

In [None]:
SHOW_PROGRESS = True

arguments = {
    "common_configuration": common_configuration,
    "enable_data_drift": ENABLE_DATA_DRIFT,
    "enable_model_drift": ENABLE_MODEL_DRIFT,
    "drift_parameters": drift_parameters,
    "monitoring_run_id": NOTEBOOK_RUN_ID,
    "storage": storage_details,
    "tables": tables,
    "show_progress": SHOW_PROGRESS
}

job_params = {
    "arguments": arguments,
    "spark_settings": spark_settings,
    "dependency_zip": [],
    "conf": conf
}

The following cell runs the drift configuration job. If `SHOW_PROGRESS` is `True`, it also prints the status of the job in the output section. Please wait for the status to be **FINISHED**.

A successful job status goes through the following values:
1. STARTED
2. Model Drift Configuration STARTED
3. Data Drift Configuration STARTED
    - Data Drift: Summary Stats Calculated
    - Data Drift: Column Stats calculated.
    - Data Drift: (number/total) CategoricalDistributionConstraint columns processed
    - Data Drift: (number/total) NumericRangeConstraint columns processed
    - Data Drift: (number/total) CategoricalNumericRangeConstraint columns processed
    - Data Drift: (number/total) CatCatDistributionConstraint columns processed
4. FINISHED

If there is a failure at any point, you will see a **FAILED** status with an exception trace.

In [None]:
from ibm_wos_utils.joblib.clients.engine_client import EngineClient
from ibm_wos_utils.drift.batch.jobs.drift_configuration import DriftConfiguration
from ibm_wos_utils.joblib.utils.notebook_utils import JobStatus

client = EngineClient(credentials=credentials)
job_response = client.engine.run_job(job_name="Drift_Configuration_Job", job_class=DriftConfiguration,
                                     job_args=job_params, background=True)

# Print Job Status.
if SHOW_PROGRESS:
    JobStatus(client, job_response).print_status()

If `SHOW_PROGRESS` is `False`, you can run the cell below to check the job status manually at any point.

In [None]:
if not SHOW_PROGRESS:
    job_id = job_response.get("id")
    print(client.engine.get_job_status(job_id))
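
With `SHOW_PROGRESS` set to `False`, you may prefer to poll until the job reaches a terminal state instead of re-running the cell by hand. The sketch below polls any zero-argument callable that returns a status string and stops at **FINISHED** or **FAILED**, the two terminal states listed above. It assumes you can extract such a string from the response of `client.engine.get_job_status(job_id)`; the shape of that response is not shown in this notebook, so the extraction step is left to you. The helper name is hypothetical.

```python
import time
from typing import Callable


def wait_for_job(get_status: Callable[[], str], poll_seconds: float = 30.0,
                 timeout_seconds: float = 3600.0) -> str:
    """Poll `get_status` until it returns FINISHED or FAILED, or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        status = get_status()
        print(status)
        if status in ("FINISHED", "FAILED"):
            return status
        if time.monotonic() >= deadline:
            raise TimeoutError(f"job still in status {status!r} after {timeout_seconds}s")
        time.sleep(poll_seconds)
```

To use it, pass a function that calls `client.engine.get_job_status(job_id)` and returns the status string from its response.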

### Download Drift Archive


In [None]:
from tempfile import NamedTemporaryFile
from ibm_wos_utils.joblib.utils.notebook_utils import create_download_link
    
if ENABLE_MODEL_DRIFT or ENABLE_DATA_DRIFT:
    drift_archive = client.engine.get_file(job_response.get(
            "output_file_path") + "/drift_configuration")

    with NamedTemporaryFile() as tf:
        tf.write(drift_archive)
        tf.flush()
        drift_archive = spark.sparkContext.sequenceFile(tf.name).collect()[0][1]

    display(create_download_link(drift_archive, "drift"))

### Generate DDL for creating Drifted Transactions table


In [None]:
from ibm_wos_utils.joblib.utils.ddl_utils import generate_drift_table_ddl

# Database Name where Drifted Transactions Table should be created. If None or "", the default database is used.
DRIFT_DATABASE_NAME = None

if ENABLE_MODEL_DRIFT or ENABLE_DATA_DRIFT:
    print(generate_drift_table_ddl(drift_archive, database_name=DRIFT_DATABASE_NAME, table_suffix=NOTEBOOK_RUN_ID))