# Create a Watson Machine Learning model for term assignment

This notebook demonstrates how a model and scoring function deployed to Watson Machine Learning can be used for automatic term assignment in Metadata Enrichments in IBM Cloud Pak for Data.
The classification approach used by this notebook is based on assumptions that simplify some aspects of the classification task
to keep the code reasonably small and simple:
- ML-based automatic term assignment predicts terms for tables and columns.
- Because column metadata tends to be sparse (often just table and column names), term metadata is optionally treated as supplemental training input.
- Only textual metadata is considered. It is mapped to sequences of tokens supporting a bag-of-words text classification approach.
- The classifier used in this notebook runs in _multi-class_ mode while a _multi-label_ mode might be more appropriate.
- This notebook uses a simple approach to scale probabilities such that they can be compared with results from the other term assignment methods.

This is a quick overview of the steps in this notebook:
1. Define settings and parameters.
2. Create a custom library with logic for feature preparation and scoring.
3. Extract metadata from a Watson Knowledge Catalog project or catalog for training.
4. Train (and test) a model based on a scikit-learn pipeline involving the custom preprocessing library, a vectorizer, and a classifier.
5. Deploy the custom library and model to Watson Machine Learning.
6. Create and deploy a custom scoring function supporting multiple assignments per data asset.
7. Display the settings to enable the metadata enrichment of a project to assign terms based on the deployed ML artifacts.
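
To make step 4 more concrete, here is a minimal sketch of a multi-class bag-of-words classifier built as a scikit-learn `Pipeline`. It is a stand-in under stated assumptions, not the pipeline this notebook builds: the token sequences and term IDs are made up, the custom preprocessing step is omitted, and `MultinomialNB` is assumed as the Naive Bayes variant.

```python
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

# Hypothetical training data: one token sequence per term, labelled with a made-up term ID.
token_sequences = [
    "customer name customer full name",
    "postal address street city",
    "order date order timestamp",
]
term_ids = ["term_id_name", "term_id_address", "term_id_date"]

pipeline = Pipeline([
    ("vectorizer", CountVectorizer(ngram_range=(1, 2))),  # unigrams and bigrams
    ("classifier", MultinomialNB()),                      # assumed Naive Bayes variant
])
pipeline.fit(token_sequences, term_ids)

# In multi-class mode, the probabilities over all terms sum to 1 for each scored asset.
probabilities = pipeline.predict_proba(["customer street address"])[0]
for term_id, probability in zip(pipeline.classes_, probabilities):
    print(f"{term_id}: {probability:.3f}")
```

Because the probabilities of all classes sum to 1, every additional candidate term lowers the share available to the others, which is why the notebook rescales the results before returning them as confidences.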

All steps in this notebook must be run in sequence. Steps marked 'optional' are recommended but do not need to be run. A kernel restart requires rerunning the required steps from the top.

## Prerequisites

- Published business terms
- A catalog or project with data assets that have terms assigned. This serves as input for training. For projects, the notebook considers only assets reviewed with a Metadata Enrichment. For catalogs, it considers all term assignments.
- A Watson Machine Learning deployment space.
- If the notebook is not running inside IBM Watson Studio: User ID and password of a user with access to the catalog/project and the deployment space.

## Set up this notebook

Enter the required settings.

Recommendation: Consider replacing the calls to `getpass.getpass()` and `input()` with clear text values if appropriate to avoid having to enter them each time you run the notebook.
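
If you prefer not to store clear text values in the notebook, one possible alternative is to read the settings from environment variables and prompt only when a variable is not set. The following is a sketch only: the helper `read_setting` and the variable names `CP4D_URL` and `CP4D_PASSWORD` are hypothetical and are not used elsewhere in this notebook.

```python
import getpass
import os

def read_setting(env_name: str, prompt: str, secret: bool = False) -> str:
    """Return the value of the environment variable env_name, or prompt if it is not set."""
    value = os.environ.get(env_name)
    if value:
        return value
    # Fall back to interactive input; hide the typed value for secrets.
    return getpass.getpass(prompt) if secret else input(prompt)

# Possible usage (hypothetical variable names):
# cp4d_url = read_setting("CP4D_URL", "Enter your Cloud Pak for Data cluster URL: ")
# cp4d_password = read_setting("CP4D_PASSWORD", "Enter your Cloud Pak for Data password: ", secret=True)
```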

In [None]:
cp4d_url = input("Enter your Cloud Pak for Data cluster URL (example: https://cpd-wkc.apps.example.com): ")

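import importlib.util

# importlib.util.find_spec replaces pkgutil.find_loader, which is deprecated in recent Python versions
if importlib.util.find_spec('ibm_watson_studio_lib') is not None: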
    from ibm_watson_studio_lib import access_project_or_space
    wslib = access_project_or_space()
    wkc_token = wslib.auth.get_current_token()

else:
    import requests
    import getpass
    import os

    cp4d_username = input("Enter your Cloud Pak for Data user ID: ")
    cp4d_password = getpass.getpass("Enter your Cloud Pak for Data password: ")

    def get_authentication_token():
        print("Authenticating")
        auth_response = requests.get(cp4d_url + "/v1/preauth/validateAuth", headers={"username": cp4d_username, "password": cp4d_password}, verify=False)
        auth_response.raise_for_status()
        print("Authentication successful")
        wkc_token = auth_response.json()['accessToken']
        return wkc_token
    wkc_token = get_authentication_token()

HIGHLIGHT = "\033[1;31;43m" # Color coding to highlight important messages

from ibm_watson_machine_learning import APIClient

def initialize_wml_client(wkc_token):
    api_client_auth = { 'token': wkc_token, 'url': cp4d_url, "instance_id": "openshift", 'version': '4.5' }   
    print("Creating WML client")
    wml_client = APIClient(api_client_auth)
    return wml_client

wml_client = initialize_wml_client(wkc_token)
print("Notebook configured.")

#### Optional: List available deployment spaces

In [None]:
wml_client.spaces.list(limit=50)

#### Optional: List available projects

In [None]:
import requests

def list_projects():
    projects_response = requests.get(f"{cp4d_url}/v2/projects", headers={"Authorization": f"Bearer {wkc_token}"}, verify=False)
    projects_response.raise_for_status()
    projects_json = projects_response.json()
    if 'resources' in projects_json:
        print("------------------------------------  -------------------------")
        print("ID                                    NAME")
        for project_info in projects_json['resources']:
            if 'entity' in project_info and 'name' in project_info['entity'] and 'metadata' in project_info and 'guid' in project_info['metadata']:
                print(f"{project_info['metadata']['guid']}  {project_info['entity']['name']}")
        print("------------------------------------  -------------------------")

list_projects()    

#### Optional: List available catalogs

In [None]:
import requests

def list_catalogs():
    catalogs_response = requests.get(f"{cp4d_url}/v2/catalogs", headers={"Authorization": f"Bearer {wkc_token}"}, verify=False)
    catalogs_response.raise_for_status()
    catalogs_json = catalogs_response.json()
    if 'catalogs' in catalogs_json:
        print("------------------------------------  -------------------------")
        print("ID                                    NAME")
        for catalog_info in catalogs_json['catalogs']:
            if 'entity' in catalog_info and 'name' in catalog_info['entity'] and 'metadata' in catalog_info and 'guid' in catalog_info['metadata']:
                print(f"{catalog_info['metadata']['guid']}  {catalog_info['entity']['name']}")
        print("------------------------------------  -------------------------")

list_catalogs()   

### Set deployment space ID and ID of training source (catalog or project)

In [None]:
# ID of deployment space for artifacts created by this notebook
wml_space_id = input("Enter the ID of your Watson Machine Learning deployment space: ")
wml_client.set.default_space(wml_space_id)

# ID of catalog/project that will be used for training
training_source = input("Source for training: Enter c for catalog or p for project: ").strip().lower()
if training_source not in ("c", "p"):
    # Stop here instead of silently falling back to 'project'
    raise ValueError("Source for training must be 'c' or 'p'")
training_source_type = 'catalog' if training_source == "c" else 'project'
training_source_id = input(f"Enter the ID of the {training_source_type} to be used for training: ")

print(f"Artifacts will be deployed to space {wml_space_id}.")
print(f"Training will be based on assigned/rejected terms in {training_source_type} {training_source_id}.")

### Define ML parameters

Define the parameters for training and scoring. They can be adjusted as needed.

#### Parameters for training
- `metadata_scope`: 
  Defines the amount of term metadata that is retrieved as input for training. This parameter can have one of the following values:
  - `names_of_assigned_terms`:
    Only the names of terms that are assigned to assets are considered. The advantage of this option is that all required
    metadata is retrieved from Watson Knowledge Catalog with a single request. However, it disregards additional metadata such as categories or term descriptions
    that might help to improve the quality of the model. To include this additional metadata, choose one of the other two options.
  - `metadata_of_all_terms`: 
    The metadata of all business terms as relevant for training is considered, irrespective of whether these terms are
    assigned to assets or not. This is the option of choice if no terms are assigned or if the number of assigned terms is small.
  - `metadata_of_assigned_terms`: 
    The metadata of terms that are assigned to assets is considered. This is usually a good choice because it enhances the training
    data for assigned assets with additional term metadata such as categories and descriptions.
- `reviewed_only`: 
  If set to True, only manual assignments and assignments on tables or columns that have been reviewed are used for training. If set to False, all assignments are used for training. The default setting is True. This option is only valid if a project is used as the training source. 

#### Parameters for scoring
- `max_num_assignments`:
  Limits the number of assignments returned by scoring. Due to the multi-class approach, the sum of confidences for the returned assignments is
  always 1.0. Thus, a larger value leads to smaller confidences per assignment, which lowers the impact of ML-based term assignment compared to the other methods. 
- `assignment_threshold`:
  Limits the number of assignments returned by scoring to those with a confidence matching or exceeding this value. A higher `max_num_assignments` value
  usually demands a lower `assignment_threshold` value. Otherwise, reasonable assignments might be filtered out.

#### Parameters for feature selection (training and scoring)
- `asset_metadata`: 
  Defines the asset metadata used for training and scoring as well as the order in which metadata is mapped to features. The order is important because
  it can help to preserve important information in the form of n-grams. An example of an important bigram is the combination of a table name and a column name. See the parameter `ngram_range`
  under `feature_mapping` for details on how to enable bigrams.
  
  The parameter value can be one or more of these keys: `schema_name`, `table_name`, `column_name`, `asset_description`.
- `term_metadata`:
  Serves the same purpose for terms as `asset_metadata` does for assets. The parameter value can be one or more of these keys: `category`, `term_name`, `term_description`. If the training parameter `metadata_scope` is set to `names_of_assigned_terms`, only `[]` or `['term_name']` are allowed.

#### Parameters for feature mapping
These parameters guide the creation of features from asset and/or term metadata. They are used to configure the scikit-learn CountVectorizer. Defaults are used for parameters not explicitly defined here. See the scikit-learn documentation for details. The parameters are typically used to balance quality versus performance.
- `ngram_range`: Tuple `(min_n, max_n)` defining the lower and upper bound of the n-gram sizes that are extracted. For example, `(1, 2)` enables unigrams and bigrams.
- `min_df`: Absolute or proportional value enabling the vectorizer to disregard tokens that occur in the metadata of a low number of assets/terms.
- `max_df`: Absolute or proportional value enabling the vectorizer to disregard tokens that occur in the metadata of many assets/terms.
- `max_features`: Limits the number of features created from the input metadata.
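
The effect of `ngram_range` can be illustrated with a small `CountVectorizer` sketch. The two input strings are made-up token sequences in which a table name is followed by a column name; the other parameter values are chosen for illustration only.

```python
from sklearn.feature_extraction.text import CountVectorizer

# Hypothetical token sequences: table name followed by column name.
documents = ["customer address", "customer name"]

vectorizer = CountVectorizer(ngram_range=(1, 2), min_df=1, max_df=1.0, max_features=50000)
matrix = vectorizer.fit_transform(documents)

# The vocabulary holds unigrams plus the bigrams that combine table and column name.
features = sorted(vectorizer.vocabulary_)
print(features)
print(matrix.shape)
```

With `ngram_range=(1, 1)`, the bigrams `customer address` and `customer name` would not become features, and the combination of table and column name would be lost.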

In [None]:
import json

# Change values as appropriate (see section above)
parameters = {
    "training": {
        "metadata_scope": "metadata_of_assigned_terms",
        "reviewed_only": True
    },
    "scoring": {
        "max_num_assignments": 2,
        "assignment_threshold": 0.4
    },
    "feature_selection": {
        "term_metadata":  ["category",   "term_name",  "term_description"],
        "asset_metadata": ["table_name", "column_name"]
    },
    "feature_mapping": {
        "ngram_range": (1, 2),
        # Integer values must be >= 1 in recent scikit-learn versions; 1 keeps all tokens
        "min_df": 1,
        "max_df":  1.0,
        "max_features": 50000
    }
}

def parameter_values_are_valid():
    messages = []
    metadata_scope = parameters['training']['metadata_scope']
    if metadata_scope not in ['names_of_assigned_terms', 'metadata_of_assigned_terms', 'metadata_of_all_terms']:
        messages.append(f"Invalid training parameter value for 'metadata_scope': {parameters['training']['metadata_scope']}")
    if len(parameters['feature_selection']['asset_metadata']) == 0:
        messages.append("The feature selection parameter 'asset_metadata' must not be empty")
    if  not all( v in ['schema_name', 'table_name', 'column_name', 'asset_description'] for v in parameters['feature_selection']['asset_metadata'] ):
        messages.append(f"Invalid feature selection parameter value in 'asset_metadata': {parameters['feature_selection']['asset_metadata']}")
    term_metadata = parameters['feature_selection']['term_metadata']
    if not all(v in ['category', 'term_name', 'term_description'] for v in term_metadata ):
        messages.append(f"Invalid feature selection parameter value in 'term_metadata': {term_metadata}")
    if metadata_scope == 'names_of_assigned_terms' and ( len(term_metadata) > 1 or (len(term_metadata) == 1 and 'term_name' not in term_metadata) ):
        messages.append(f"The feature selection parameter 'term_metadata' must be [] or ['term_name'] if the training parameter 'metadata_scope' is set to 'names_of_assigned_terms'" )
    if parameters['scoring']['assignment_threshold'] < 0 or parameters['scoring']['assignment_threshold'] > 1:
        messages.append("The scoring parameter assignment_threshold must be in [0..1]")
    if messages:
        print("Found invalid parameters:")
        for msg in messages:
            print(f"\t {msg}")
        return False
    return True

if parameter_values_are_valid():
    if training_source_type == 'catalog':
        print("Setting the training parameter 'reviewed_only' to 'False' since the training source is a catalog.\n")
        parameters['training']['reviewed_only'] = False
    print("Parameters defined:")
    print(json.dumps(parameters, indent = 3))
else:
    print("Fix invalid parameter values and rerun this cell.")

## Create a library with custom conversion logic for preprocessing and scoring

In this section, we will create a custom Python library called `preprocessing_and_scoring_support`. We create a library because the code is used within this notebook and is also required for prediction in a Watson Machine Learning deployment.
The library will contain code for these two tasks:
1. Preprocess the input for model creation and scoring (`MetadataToTextConverter`)
2. Postprocess the result of the scoring function (`ProbabilityToConfidenceConverter`)

For details on how to use custom functions with the Watson Machine Learning Python SDK, see the example "Use scikit-learn and custom library to predict temperature with ibm-watson-machine-learning" at: https://github.com/IBM/watson-machine-learning-samples/blob/master/cpd4.5/notebooks/python_sdk/deployments/custom_library/Use%20scikit-learn%20and%20custom%20library%20to%20predict%20temperature.ipynb

#### The `MetadataToTextConverter` handles
- Specific characteristics of certain metadata (e.g. technical names versus business terms versus (free) text)
- Potential technical characteristics (e.g. name "density" due to length limitations, case sensitivity/insensitivity, code page support)
- Custom conventions and policies (naming conventions, use of abbreviations, prefixes, languages, etc.)
- Negative assignments (rejected terms)

#### The `ProbabilityToConfidenceConverter` handles
- Mapping of scoring results returned by the ML algorithm (probabilities) to confidences
- Limiting the result size based on the scoring parameters (`max_num_assignments` and `assignment_threshold`)

For technical reasons, the Python code for these classes is written to a file. On some environments, this might lead to the code not being properly displayed as Python source code. It can be helpful to temporarily comment out the operating-system-level commands to view the content as Python source code.

#### Steps:
1. Define Python modules.
2. Create and install the Python package.
3. Optional: Test the functions.

### The MetadataToTextConverter

The `MetadataToTextConverter` performs the preprocessing required to map metadata of assets and/or terms to sequences of tokens that serve as input to the `CountVectorizer`.
The `transform` function reads the input list and processes its content according to the types of metadata included (as defined by the `feature_selection` parameters).
Each type of metadata is mapped to a conversion function that applies dedicated preprocessing (tokenization, normalization, application of transformations such as
stemming, dictionary expansion, stop-word filtering). Metadata of similar type (such as categories and term_names) can be handled by the same converter function.
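
As an illustration of what such a converter might do, the following sketch splits technical names at separators and CamelCase boundaries and expands abbreviations. The function name, the abbreviation dictionary, and the sample names are hypothetical; the converters in this notebook return their input unchanged unless you add logic like this.

```python
import re

# Hypothetical abbreviation dictionary; real entries depend on your naming conventions.
ABBREVIATIONS = {"cust": "customer", "addr": "address"}

def convert_data_artifact_name(value: str) -> str:
    # Treat underscores and hyphens as word separators.
    value = re.sub(r"[_\-]+", " ", value)
    # Insert a blank at CamelCase boundaries (lowercase letter or digit followed by an uppercase letter).
    value = re.sub(r"(?<=[a-z0-9])(?=[A-Z])", " ", value)
    # Expand known abbreviations; keep all other words unchanged.
    words = [ABBREVIATIONS.get(word.lower(), word) for word in value.split()]
    return " ".join(words)

print(convert_data_artifact_name("CustAddr_Line1"))  # customer address Line1
print(convert_data_artifact_name("CUST_ADDR"))       # customer address
```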

The function `_create_sequence_of_tokens` handles negative assignments. It tokenizes the assets for which a term was rejected and excludes those tokens from the asset tokens that are added to the term's token sequence. This lowers or removes the association between these tokens and the term.
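
The handling of negative assignments can be sketched in a few lines. The helper name `combine_tokens` and the sample tokens are made up for illustration; the logic mirrors the filtering described above.

```python
def combine_tokens(term_tokens, positive_tokens, negative_tokens):
    """Append tokens of assigned assets unless they also occur in rejected assets."""
    rejected = set(negative_tokens)
    kept = [token for token in positive_tokens if token not in rejected]
    return term_tokens + kept

term_tokens = ["customer", "address"]          # from the term metadata
positive_tokens = ["cust", "addr", "billing"]  # from assets the term is assigned to
negative_tokens = ["billing", "code"]          # from assets the term was rejected for

print(" ".join(combine_tokens(term_tokens, positive_tokens, negative_tokens)))
# customer address cust addr
```

The token `billing` occurs in both a positive and a negative assignment, so it is not associated with the term.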

The conversion functions mentioned above contain only hints about the processing that might be of interest for a certain metadata type. `_convert_description_to_sequence_of_words`
includes a simple example of stop-word removal for illustration purposes. In general, the individual conversion steps are specific to the use case and the metadata at
hand. The `_tokenize` function converts the sequence of words resulting from the converters to a sequence of lowercase tokens, splitting complex words according to the commonly
used regular expression `'(?u)\b\w\w*\b'`.
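
The following standalone sketch shows the effect of this tokenization on a made-up input. It replaces underscores first because `\w` matches underscores, so a name such as `CUST_ADDR` would otherwise remain a single token.

```python
import re

def tokenize(sequence_of_words: str) -> str:
    # Replace underscores so that they act as separators rather than word characters.
    sequence_of_words = sequence_of_words.replace("_", "-")
    tokens = re.findall(r"(?u)\b\w\w*\b", sequence_of_words)
    return " ".join(token.lower() for token in tokens)

print(tokenize("CUST_ADDR Table-15"))  # cust addr table 15
```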

The input to the preprocessor is a list of metadata items that take one of two forms:
- Input schema for training:
  - The unique identifier of a term (`term_id`). Multiple subsequent metadata items can have the same term ID.
  - Values as defined by the feature selection parameter `term_metadata`.
  - The polarity: "+" for an assignment and "-" for a negative assignment.
  - Values as defined by the feature selection parameter `asset_metadata`.
  
  If the same term is assigned to multiple assets, the same term ID and term metadata occur multiple times in this list with different instances of the asset metadata.
  
- Input schema for scoring: Values defined by the feature selection parameter `asset_metadata`.

Null values represent metadata that is not present.

#### Example:

term_metadata = ['category', 'term_name', 'term_description'],
asset_metadata = ['table_name', 'column_name', 'asset_description']

len(term_metadata) = len(asset_metadata) = 3. Thus, the size of the input schema for training is 8 = 1 (term ID) + 3 (term metadata) + 1 (polarity) + 3 (asset metadata). The size of
the input schema for scoring is 3 (asset metadata).

1. Example training input representing `Term23` that is assigned to `Table15` and to `Column13` of `Table17`. `Term7` is assigned to `Column24` of `Table15` but
rejected for `Column13` of `Table17`. All terms have descriptions. The only asset that has a description is `Column24` of `Table15`. All terms belong to the same category `Category1`.

```
[ [ "term_id_of_term23", "Category1", "Term23", "Description of term23", "+", "Table15", null, null ],
  [ "term_id_of_term23", "Category1", "Term23", "Description of term23", "+", "Table17", "Column13", null ],
  [ "term_id_of_term7",  "Category1", "Term7", "Description of term7",   "+", "Table15", "Column24", "Description of column24" ],
  [ "term_id_of_term7",  "Category1", "Term7", "Description of term7",   "-", "Table17", "Column13", null ]
]
```

2. Example scoring input for `Table42` and `Column4` of `Table9`. Because the first entry represents a table, its second field, which represents the column name, is null. `Table42` has a description but the column `Column4` does not, which is why its third field, representing the description, is null.

```
[ [ "Table42", null, "Description of table42" ],
  [ "Table9", "Column4", null ]
]
```
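
The two input schemas from this example can be expressed as plain Python lists, with `None` taking the place of null. This is a sketch for illustration; the variable and function names are not part of the library created below, although the length-based distinction between training and scoring input follows the same idea.

```python
term_metadata = ["category", "term_name", "term_description"]
asset_metadata = ["table_name", "column_name", "asset_description"]

# Training rows carry the term ID, the term metadata, the polarity, and the asset metadata.
training_schema = ["term_id"] + term_metadata + ["polarity"] + asset_metadata
# Scoring rows carry the asset metadata only.
scoring_schema = asset_metadata

def is_scoring_input(metadata_item: list) -> bool:
    # A row with exactly as many fields as the scoring schema is treated as scoring input.
    return len(metadata_item) == len(scoring_schema)

training_row = ["term_id_of_term7", "Category1", "Term7", "Description of term7",
                "-", "Table17", "Column13", None]
scoring_row = ["Table9", "Column4", None]

print(len(training_schema), len(scoring_schema))             # 8 3
print(dict(zip(training_schema, training_row))["polarity"])  # -
```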

In [None]:
%%bash

#1. Create directory structure
mkdir -p term_prediction_model/preprocessing_and_scoring_support

#2. Create a file term_prediction_model/preprocessing_and_scoring_support/metadata_to_text_converter.py containing the following code
echo "import re
from sklearn.base import BaseEstimator, TransformerMixin

class MetadataToTextConverter(BaseEstimator, TransformerMixin):

    def __init__(self, term_metadata_for_training: list, asset_metadata_for_training: list, asset_metadata_for_scoring: list):
        self.asset_metadata_for_scoring = asset_metadata_for_scoring
        self.metadata_for_training = ['term_id'] + term_metadata_for_training + ['polarity'] + asset_metadata_for_training
        self.index_of_polarity_field = len(term_metadata_for_training) + 1
        self.train_on_terms_only = len(asset_metadata_for_training) == 0

    @staticmethod
    def _convert_data_artifact_name_to_sequence_of_words(value):
        '''
        Consider case style (CamelCase, special separators, abbreviations, prefixes, and so on)
        '''
        return value

    @staticmethod
    def _convert_glossary_artifact_name_to_sequence_of_words(value):
        '''
        Consider special conventions for term names
        '''
        return value

    @staticmethod
    def _convert_description_to_sequence_of_words(value):
        '''
        Consider stop word removal, stemming, abbreviations, and so on.
        Simple example of stop word removal.
        '''
        stopwords = [ 'a', 'an', 'the', 'in', 'of' ]
        result_value = ''
        word_list = value.split()
        for word in word_list:
            if word.lower() not in stopwords:
                result_value += word + ' '
        return result_value

    def _convert_to_sequence_of_words(self, metadata_item: list, metadata_keys: list, start_index: int = 0, end_index: int = 0):
        i = start_index
        sequence_of_words = ''
        if end_index == 0:
            end_index = len(metadata_item)
        while i < end_index:
            key = metadata_keys[i]
            value = metadata_item[i]
            if value is not None:
                if key == 'column_name' or key == 'schema_name' or key == 'table_name':
                    sow = self._convert_data_artifact_name_to_sequence_of_words(value)
                elif key == 'term_name' or key == 'category':
                    sow = self._convert_glossary_artifact_name_to_sequence_of_words(value)
                elif key == 'asset_description' or key == 'term_description':
                    sow = self._convert_description_to_sequence_of_words(value)
                else:
                    sow = value
                if sow:
                    sequence_of_words += sow + ' '
            i += 1
        return sequence_of_words

    # Move this logic to the respective converter if this tokenization does not apply to all types of metadata values.
    @staticmethod
    def _tokenize(sequence_of_words: str):
        sequence_of_tokens = ''
        # Make sure underscores are treated as separators rather than elements of a name.
        sequence_of_words = sequence_of_words.replace('_', '-')
        tokens = re.findall(r'(?u)\b\w\w*\b', sequence_of_words)
        is_first = True
        for token in tokens:
            if is_first:
                is_first = False
            else:
                sequence_of_tokens += ' '
            sequence_of_tokens += token.lower()
        return sequence_of_tokens

    def _create_new_training_record_with_term_metadata(self, metadata_item: list):
        training_record = {
            'T': self._convert_to_sequence_of_words(metadata_item, metadata_keys=self.metadata_for_training, start_index=1, end_index=self.index_of_polarity_field),
            '+': '',
            '-': ''
            }
        return training_record

    def _add_asset_metadata_to_training_record(self, polarity: str, metadata_item: list, training_record: dict):
        if polarity is not None:
            training_record[polarity] += self._convert_to_sequence_of_words(metadata_item, metadata_keys=self.metadata_for_training, start_index=self.index_of_polarity_field + 1)
        return training_record

    def _create_sequence_of_tokens(self, training_record: dict):
        result_sequence = self._tokenize(training_record['T'])
        # Don't add asset metadata if only negative assignments are present
        if not self.train_on_terms_only and len(training_record['+']) > 0:
            positive_sequence_of_tokens = self._tokenize(training_record['+'])
            negative_sequence_of_tokens = self._tokenize(training_record['-'])
            positive_list_of_tokens = positive_sequence_of_tokens.split()
            negative_list_of_tokens = negative_sequence_of_tokens.split()
            # Remove tokens from negative assignments from the list of tokens from (positive) assignments
            for token in positive_list_of_tokens:
                if token not in negative_list_of_tokens:
                    if len(result_sequence) > 0:
                        result_sequence += ' '
                    result_sequence += token
        return result_sequence

    def fit(self, X, y=None):
        # Optionally apply preparatory processing in support of the transform function.
        return self

    def transform(self, metadata_items, y=None):
        '''
        This function converts a list of metadata items to a list holding sequences of tokens representing terms. It calls the
        appropriate conversion function per metadata item depending on its metadata type, applies tokenization on the result, and
        creates a list with one entry (sequence of tokens separated with blank) per term that serves as input for the CountVectorizer.
        
        Example (training):
        [ [ term_id1, cat1, term1, tdesc1, "+", tab3, tab3col1, tab3col1desc ],
          [ term_id1, cat1, term1, tdesc1, "-", tab5, tab5col2, tab5col2desc ],
          [ term_id2, cat2, term2, tdesc2, "+", tab7, tab7col1, tab7col1desc ]
        ]
        
        becomes:
        [ 'ts(cat1) ts(term1) ts(tdesc1) (ts(tab3) + ts(tab3col1) + ts(tab3col1desc)) - (ts(tab5) + ts(tab5col2) + ts(tab5col2desc))',
          'ts(cat2) ts(term2) ts(tdesc2) ts(tab7) ts(tab7col1) ts(tab7col1desc)'
        ]
        where
        ts(mi) returns a sequence of tokens for metadata item mi
        ts(mi1) + ts(mi2) concatenates two lists of tokens
        ts(mi1) - ts(mi2) removes all occurrences of tokens from ts(mi1) that occur in ts(mi2)

        The training_data for a term_id is stored in a 'training record' which is a dictionary of the form:
        {
            'T': <sequence of words representing term>,
            '+': <sequence of words representing all assets to which this term is assigned>,
            '-': <sequence of words representing all assets for which this term is rejected>
        }
        '''
        list_of_token_sequences = []
        term_id = None
        training_record = None
        for metadata_item in metadata_items:
            # The input is for scoring if the metadata_item consists of only asset metadata for scoring
            is_scoring_input = len(metadata_item) == len(self.asset_metadata_for_scoring)
            if is_scoring_input:
                sequence_of_words = self._convert_to_sequence_of_words(metadata_item, metadata_keys=self.asset_metadata_for_scoring)
                list_of_token_sequences.append(self._tokenize(sequence_of_words))
            else:
                polarity = None if len(metadata_item) <= self.index_of_polarity_field else metadata_item[self.index_of_polarity_field]
                if metadata_item[0] == term_id:
                    # current metadata_item is training data for same term
                    training_record = self._add_asset_metadata_to_training_record(polarity, metadata_item, training_record)
                else:
                    if training_record is not None:
                        # Switch to a different term: Create and store sequence of tokens for previous term and start a new entry
                        sequence_of_tokens = self._create_sequence_of_tokens(training_record)
                        if len(sequence_of_tokens) > 0:
                            list_of_token_sequences.append(sequence_of_tokens)
                    term_id = metadata_item[0]
                    training_record = self._create_new_training_record_with_term_metadata(metadata_item)
                    training_record = self._add_asset_metadata_to_training_record(polarity, metadata_item, training_record)
        if training_record:
            sequence_of_tokens = self._create_sequence_of_tokens(training_record)
            if len(sequence_of_tokens) > 0:
                list_of_token_sequences.append(sequence_of_tokens)
        return list_of_token_sequences
" > term_prediction_model/preprocessing_and_scoring_support/metadata_to_text_converter.py
    
echo "Created metadata_to_text_converter.py."

### The ProbabilityToConfidenceConverter

The `ProbabilityToConfidenceConverter` is used by the custom scoring function defined later in this notebook. In this sample implementation, it filters
the probabilities returned by the ML algorithm's scoring function to a subset matching the filter criteria of the scoring parameters `max_num_assignments`
and `assignment_threshold`.

This serves two purposes:
 1. It reduces the size of the result by considering only the assignments with the highest probabilities.
 2. It rescales the retained results by distributing the sum of the disregarded probabilities (those beyond `max_num_assignments`) proportionally across the retained
    probabilities. The rescaled values sum to 1, although not all of them might appear in the result because of the `assignment_threshold`.

This is done only for the purpose of illustration because the sum of confidences for all assignments does not need to be 1. Thus, the confidences returned
by the scoring function cannot be easily compared to confidences returned by the other term assignment methods. To keep the example simple, this notebook
uses a Naive Bayes classifier that supports multi-class classification with probabilities. A more natural (and theoretically more sound) notion of a confidence value
would require a multi-label-classification approach such as one-vs-rest. However, this would make the example significantly more complex, and performance
can be a challenge when dealing with a large number of features.
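
A small worked example may help to follow the rescaling. The sketch below applies the same arithmetic in plain Python to made-up probabilities; the function name `rescale` is chosen for illustration only.

```python
def rescale(probabilities, max_num_assignments, assignment_threshold):
    """Keep the highest probabilities, rescale them to sum to 1, then apply the threshold."""
    ranked = sorted(probabilities, reverse=True)
    kept = ranked[:max_num_assignments]
    disregarded = sum(ranked[max_num_assignments:])
    # Distribute the disregarded probability mass proportionally across the kept values.
    factor = disregarded / sum(kept)
    confidences = [p + p * factor for p in kept]
    return [c for c in confidences if c >= assignment_threshold]

probabilities = [0.05, 0.5, 0.15, 0.3]

# Without a threshold, the two retained confidences sum to 1.
print([round(c, 3) for c in rescale(probabilities, 2, 0.0)])  # [0.625, 0.375]
# With a threshold of 0.4, only the first confidence is returned.
print([round(c, 3) for c in rescale(probabilities, 2, 0.4)])  # [0.625]
```

With `max_num_assignments` set to 3, the same mass is spread across three values, so each confidence becomes smaller and is more likely to fall below the threshold.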

In [None]:
%%bash

# 3. Create a file term_prediction_model/preprocessing_and_scoring_support/probability_to_confidence_converter.py containing the code in this cell

echo "import numpy as np

class ProbabilityToConfidenceConverter:

    def __init__(self, max_num_assignments: int, assignment_threshold: float):
        self.max_num_assignments = max_num_assignments
        self.assignment_threshold = assignment_threshold

    def compute_confidences(self, probabilities, classes):

        # Sort probabilities and classes (= term_ids) in descending probability order
        probabilities, matching_term_ids = (list(t) for t in zip(*sorted(zip(probabilities, classes), reverse=True, key=lambda x: x[0])))

        max_num_assignments = self.max_num_assignments
        assignment_threshold = self.assignment_threshold

        # Sum of probabilities for disregarded results (beyond max_num_assignments)
        sum_disregarded_probabilities = np.sum(probabilities[max_num_assignments:])
        # Keep results only up to max_num_assignments
        matching_probabilities = probabilities[:max_num_assignments]
        matching_term_ids = matching_term_ids[:max_num_assignments]
        sum_matching_probabilities = np.sum(matching_probabilities)
        if sum_matching_probabilities == 0:
            raise RuntimeError('The sum of matching probabilities may not be zero.')
        
        # The adjustment factor is the proportion of disregarded probabilities versus matching probabilities
        adjustment_factor = sum_disregarded_probabilities / sum_matching_probabilities

        # Multiplying the adjustment factor with the matching probabilities gives the value to be added to each matching probability such that
        # the proportion of the value to be added is the same as the proportions of the matching probabilities.
        adjustments = np.multiply(matching_probabilities, adjustment_factor)

        # Adding the adjustment to the matching probabilities gives values with the same proportion that sum up to 1.
        matching_confidences = np.add(matching_probabilities, adjustments)
        confidences = []
        term_ids = []
        for confidence, term_id in zip(matching_confidences, matching_term_ids):
            if confidence >= assignment_threshold:
                confidences.append(confidence)
                term_ids.append(term_id)
        return confidences, term_ids
" > term_prediction_model/preprocessing_and_scoring_support/probability_to_confidence_converter.py

echo "Created probability_to_confidence_converter.py."

### Create and install Python package `preprocessing_and_scoring_support`

The following commands create a Python package `preprocessing_and_scoring_support` and install it on the current Python environment.

Note: If you made changes to the custom functions, the Jupyter kernel might need to be restarted to pick up the changed package.

In [None]:
%%bash

echo 'Library containing classes in support of term prediction.' > term_prediction_model/README.md
echo '__version__ = "0.1"'  > term_prediction_model/preprocessing_and_scoring_support/__init__.py

echo "from setuptools import setup
VERSION='0.1'
setup(name='preprocessing_and_scoring_support',
      version=VERSION,
      url='...',
      author='...',
      author_email='...',
      license='...',
      packages=[
            'preprocessing_and_scoring_support'
      ],
      zip_safe=False
)" > term_prediction_model/setup.py

cd term_prediction_model/
python setup.py sdist --formats=zip
cd ..
mv term_prediction_model/dist/preprocessing_and_scoring_support-0.1.zip .

pip3 install --force-reinstall preprocessing_and_scoring_support-0.1.zip
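
To check whether the package is visible to the current Python environment after installation, a small lookup like the following can help. This is a minimal sketch that uses only the standard library; `installed_version` is a hypothetical helper name and not part of the notebook.

```python
from importlib import metadata

def installed_version(package_name):
    # Return the installed version string, or None if the package is not installed
    try:
        return metadata.version(package_name)
    except metadata.PackageNotFoundError:
        return None

print(installed_version("preprocessing_and_scoring_support"))
```

If the printed version is not the expected one after a change, restarting the kernel as noted above is the likely remedy.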

### Optional: Test the MetadataToTextConverter and ProbabilityToConfidenceConverter

This is a simple unit test that imports the two custom Python modules from the `preprocessing_and_scoring_support` package and verifies their `transform` and `compute_confidences` methods against some basic input data. It can be extended to cover additional cases such as different metadata sets, but it is kept simple for illustration purposes.

Tip: If you want to step through the custom modules using the debugger you will need to make a couple of modifications:
- Comment out the bash commands in the custom modules (`#%%bash`, `#echo "import...` and copy the import statement following the echo command to the next line).
- Comment out the line that writes the result to a file near the end of the cell (`#" > term_prediction_model/...`).
- Replace the environment variables `${WML_TAS_CPD_...}` in the `ProbabilityToConfidenceConverter` with dummy values of the respective type (for example, `${TAS_WML_CP4D_TOKEN_LIFETIME}` with 0 and `${TAS_WML_CP4D_URL}`, `${TAS_WML_CP4D_USER}`, `${TAS_WML_CP4D_PW}` with empty strings).
- Comment out the import statements `#from preprocessing_and_scoring_support....` in the cell being debugged such that it refers to the code in the notebook rather than in the package.

In [None]:
import unittest
import math

from preprocessing_and_scoring_support.metadata_to_text_converter import MetadataToTextConverter
from preprocessing_and_scoring_support.probability_to_confidence_converter import ProbabilityToConfidenceConverter

class TestMetadataToTextConverter(unittest.TestCase):
    def setUp(self):
        self.term_metadata = ['category', 'term_name', 'term_description']
        self.asset_metadata = ['table_name', 'column_name', 'asset_description']
        self.m2tc = MetadataToTextConverter(term_metadata_for_training=self.term_metadata, asset_metadata_for_training=self.asset_metadata, asset_metadata_for_scoring=self.asset_metadata)

    def test_training_transformation_term_metadata_only(self):
        m2tc_2 = MetadataToTextConverter(term_metadata_for_training=self.term_metadata, asset_metadata_for_training=[], asset_metadata_for_scoring=self.asset_metadata)
        #        term_id      category            term_name          term_description
        row1 = [ "term_1_id", "Banking Business", "Bank Account",    "Details of a bank account."           ]
        row2 = [ "term_2_id", "Banking Business", "Bank Statistics", "Key metrics of the banking business." ]
        transform_output = m2tc_2.transform([row1, row2])
        self.assertEqual(2, len(transform_output))
        self.assertEqual('banking business bank account details bank account', transform_output[0])
        self.assertEqual('banking business bank statistics key metrics banking business', transform_output[1])
    
    def test_training_transformation_asset_metadata_only(self):
        m2tc_2 = MetadataToTextConverter(term_metadata_for_training=[], asset_metadata_for_training=self.asset_metadata, asset_metadata_for_scoring=self.asset_metadata)
        row1 = [ "term_1_id", "+", "BANK_CUSTOMERS",  None,   None ]
        row2 = [ "term_1_id", "+", "BANK_CUSTOMERS",  "AGE",  None ]
        transform_output = m2tc_2.transform([row1, row2])
        self.assertEqual(1, len(transform_output))
        self.assertEqual('bank customers bank customers age', transform_output[0])

    def test_training_transformation(self):
        row1 = [ "term_1_id", "Banking Business", "Bank Account",    "Details of a bank account.",           "+",     "ACCOUNTS",   None,         None ]
        row2 = [ "term_1_id", "Banking Business", "Bank Account",    "Details of a bank account.",           "+",     "ACCOUNTS",   "ACCOUNT_ID", None ]
        row3 = [ "term_2_id", "Banking Business", "Bank Statistics", "Key metrics of the banking business.", "+",     None,         None,         None ]
        transform_output = self.m2tc.transform([row1, row2, row3])
        self.assertEqual(2, len(transform_output))
        self.assertEqual('banking business bank account details bank account accounts accounts account id', transform_output[0])
        self.assertEqual('banking business bank statistics key metrics banking business', transform_output[1])

    def test_training_transformation_with_negative_assignments(self):
        row1 = [ "term_1_id", "Other Business", "Data", "Any data.",  "+", "DATA1", None,   "Data item for any business not related to banking." ]
        row2 = [ "term_1_id", "Other Business", "Data", "Any data.",  "-", "DATA2", "COL1", "Data item for banking business."                    ]
        transform_output = self.m2tc.transform([row1, row2])
        self.assertEqual(1, len(transform_output))
        # Note that neither 'item' nor 'banking' occur in the training data
        self.assertEqual('other business data any data data1 any not related to', transform_output[0])

    def test_training_transformation_with_negative_assignments_negative_only(self):
        m2tc_2 = MetadataToTextConverter(term_metadata_for_training=[], asset_metadata_for_training=self.asset_metadata, asset_metadata_for_scoring=self.asset_metadata)
        row1 = [ "term_1_id", "+", "BANK_CUSTOMERS",  "surviving",   None ]
        row2 = [ "term_2_id", "-", "BANK_CUSTOMERS",  "AGE",  None        ]
        row3 = [ "term_2_id", "-", "BANK_CUSTOMERS",  "ADDRESS",  None        ]
        row4 = [ "term_3_id", "-", "BANK_CUSTOMERS",  "PROFESSION",  None ]
        # The training data will not include entries from term2 or term3 because they only consist of negative assignments
        transform_output = m2tc_2.transform([row1, row2, row3, row4])
        self.assertEqual(1, len(transform_output))
        self.assertEqual('bank customers surviving', transform_output[0])

    def test_training_transformation_with_negative_assignments_tokens_removed(self):
        m2tc_2 = MetadataToTextConverter(term_metadata_for_training=[], asset_metadata_for_training=self.asset_metadata, asset_metadata_for_scoring=self.asset_metadata)
        row1 = [ "term_1_id", "+", "BANK_CUSTOMERS",  None,   None ]
        row2 = [ "term_1_id", "-", "BANK_CUSTOMERS",  "AGE",  None ]
        # The negative assignment removes all tokens for term1 resulting in an empty training set
        transform_output = m2tc_2.transform([row1, row2])
        self.assertEqual(0, len(transform_output))

    def test_scoring_transformation(self):
        row1 = [ "ACCOUNTS",  None,          None ]
        row2 = [ "ACCOUNTS",  "ACCOUNT_ID",  None ]
        transform_output = self.m2tc.transform([row1, row2])
        self.assertEqual(2, len(transform_output))
        self.assertEqual('accounts', transform_output[0])
        self.assertEqual('accounts account id', transform_output[1])
    

class TestProbabilityToConfidenceConverter(unittest.TestCase):
    def setUp(self):
        max_num_term_assignments = 2
        assignment_threshold = 0.4
        self.p2cc = ProbabilityToConfidenceConverter(max_num_assignments=max_num_term_assignments, assignment_threshold=assignment_threshold)

    def test_probability_to_confidence_conversion(self):
        probabilities = [0.1, 0.4, 0.2, 0.3]
        classes = probabilities
        confidences, term_ids = self.p2cc.compute_confidences(probabilities, classes)
        self.assertAlmostEqual(1.0, math.fsum(confidences))
        i = 1
        is_correct = True
        while is_correct and i < len(confidences):
            if confidences[i - 1] < confidences[i] or term_ids[i - 1] < term_ids[i]:
                is_correct = False
            i += 1
        self.assertTrue(is_correct, "Result not in proper order.")


test_result = unittest.main(argv=[''], verbosity=3, exit=False)
assert len(test_result.result.failures) == 0

## Create model

Model creation involves the following steps:
1. Retrieve metadata of data assets, columns, and terms needed for model training from the catalog/project with the ID `training_source_id`.
2. Create the input for training in the format understood by the custom preprocessor (MetadataToTextConverter).
3. Train a classifier based on this metadata.

### Retrieve metadata of data assets and columns with assigned or removed terms

Store the retrieved metadata in dictionaries `terms`, `assets`, `term_id_to_assigned_asset_ids_map`, and `term_id_to_rejected_asset_ids_map`.
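
For orientation, the following sketch shows the shape these dictionaries take once they are populated. The keys mirror the code in the next cell, but all IDs, names, and the `example_` variables are made up for illustration.

```python
# Hypothetical content, for illustration only
example_assets = {
    # A table: column_name is None
    "asset-1": {"schema_name": "BANK", "table_name": "ACCOUNTS",
                "column_name": None, "asset_description": None},
    # A column: the key is "<asset_id>:<column_name>"
    "asset-1:ACCOUNT_ID": {"schema_name": "BANK", "table_name": "ACCOUNTS",
                           "column_name": "ACCOUNT_ID", "asset_description": ""},
}

# Term IDs have the form "<repository_id>_<artifact_id>"
example_terms = {
    "repo1_term-a": {"term_name": "Bank Account"},
    "repo1_term-b": {"term_name": "Bank Statistics"},
}

# Each map points from a term ID to the IDs of tables and columns
example_assigned = {"repo1_term-a": ["asset-1", "asset-1:ACCOUNT_ID"]}
example_rejected = {"repo1_term-b": ["asset-1:ACCOUNT_ID"]}

for term_id, asset_ids in example_assigned.items():
    tables = sorted({example_assets[a]["table_name"] for a in asset_ids})
    print(example_terms[term_id]["term_name"], "->", tables)
```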

In [None]:
import requests

# Metadata of data assets in the training source with assigned or removed terms
assets = {}

# Names of all assigned or removed terms stored in the metadata of the data assets in the training source
terms = {}

# The repository_id is used to create a global_id from the artifact_id of a term
repository_id = None

def to_map(list_of_named_objects, value_key = 'value'):
    # Utility function: Create a dictionary {"a":"x", "b":"y", ...} from a list [ {'name':"a", '<value_key>':"x"}, {'name':"b", '<value_key>':"y"}, ...]
    result_map = {}
    for named_object in list_of_named_objects:
        key = named_object['name']
        if value_key in named_object:
            result_map[key] = named_object[value_key]
    return result_map

def retrieve_assignments_and_rejections(ignore_reviewed: bool):
    global repository_id
    term_id_to_assigned_asset_ids_map = {}
    term_id_to_rejected_asset_ids_map = {}
    num_tables_with_assigned_terms = 0
    num_tables_with_rejected_terms = 0
    num_columns_with_assigned_terms = 0
    num_columns_with_rejected_terms = 0
    term_id = None  # Keep one term_id to retrieve repository_id

    def add_term_ids_for_asset_id(asset_id, term_list, term_id_to_asset_ids_map):
        # Add asset_id to all entries of term_id_to_asset_ids_map for all term_ids in term_list and add the corresponding term names to the global terms dictionary
        for term in term_list:
            nonlocal term_id
            term_id = term.get('term_id', None)
            if term_id:
                if term_id in term_id_to_asset_ids_map:
                    term_id_to_asset_ids_map[term_id].append(asset_id)
                else:
                    term_id_to_asset_ids_map[term_id] = [ asset_id ]
                terms[term_id] = { 'term_name': term['term_display_name'] }

    headers = {"Authorization": f"Bearer {wkc_token}", "Content-Type": "application/json"}
    params =  {f"{training_source_type}_id": training_source_id}
    payload = json.dumps({"query":f"asset.{training_source_type}_id:{training_source_id}", "limit":200, "include": "entity"})

    while payload:
        search_result_response = requests.post(f"{cp4d_url}/v2/asset_types/data_asset/search", headers=headers, params=params, data=payload, verify=False)
        search_result_response.raise_for_status()
        data_assets = search_result_response.json()['results']
        for data_asset in data_assets:
            if 'entity' not in data_asset or 'metadata' not in data_asset or 'name' not in data_asset['metadata']:
                raise RuntimeError(f'Metadata of data asset is malformed: {json.dumps(data_asset)}')
            entity = data_asset['entity']
            data_asset_enrichment_area_info = entity.get('metadata_enrichment_area_info',{})
            data_asset_has_required_review_status = ignore_reviewed or 'review_date' in data_asset_enrichment_area_info
            asset_terms = entity.get('asset_terms', {})
            assigned_terms = asset_terms.get('list', [])
            asset_id = data_asset['metadata']['asset_id']
            property_map = to_map(entity.get('data_asset', {}).get('properties', []))
            schema_name = property_map.get('schema_name', '')
            table_name = data_asset['metadata']['name']
            # Ignore data asset if it has no terms assigned even if terms might be rejected
            if data_asset_has_required_review_status and len(assigned_terms) > 0:
                num_tables_with_assigned_terms += 1
                assets[asset_id] = {
                    'schema_name': schema_name,
                    'column_name': None,         # column_name == None because this is a table
                    'asset_description': data_asset['metadata'].get('description', None),
                    'table_name':  table_name
                }
                add_term_ids_for_asset_id(asset_id, assigned_terms, term_id_to_assigned_asset_ids_map)
                rejected_terms = asset_terms.get('rejected_terms', [])
                if len(rejected_terms) > 0:
                    num_tables_with_rejected_terms += 1
                    add_term_ids_for_asset_id(asset_id, rejected_terms, term_id_to_rejected_asset_ids_map)
            if 'column_info' in entity:
                column_review_dates = None if ignore_reviewed else to_map(data_asset_enrichment_area_info.get('columns', []), value_key='review_date')
                for column_name, column in entity['column_info'].items():
                    column_has_required_review_status = column_review_dates is None or column_name in column_review_dates
                    assigned_column_terms = column.get('column_terms', [])
                    # Ignore column if it has no terms assigned even if terms might be rejected
                    if column_has_required_review_status and len(assigned_column_terms) > 0:
                        num_columns_with_assigned_terms += 1
                        column_id = asset_id + ':' + column_name
                        assets[column_id] = {
                            'schema_name': schema_name,
                            'column_name': column_name,
                            'asset_description': column.get('column_description', ''),
                            'table_name':  table_name
                        }
                        add_term_ids_for_asset_id(column_id, assigned_column_terms, term_id_to_assigned_asset_ids_map)
                        rejected_column_terms = column.get('rejected_terms', [])
                        if len(rejected_column_terms) > 0:
                            num_columns_with_rejected_terms += 1
                            add_term_ids_for_asset_id(column_id, rejected_column_terms, term_id_to_rejected_asset_ids_map)
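        # The 'next' entry describes the following page; serialize it if it is returned as an object
        next_page = search_result_response.json().get('next', None)
        payload = json.dumps(next_page) if isinstance(next_page, dict) else next_page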
        print(f"Found {str(num_tables_with_assigned_terms)} table(s) and {str(num_columns_with_assigned_terms)} column(s) with assignments.")
        print(f"Found {str(num_tables_with_rejected_terms)} table(s) and {str(num_columns_with_rejected_terms)} column(s) with assignments and rejected assignments.")
        repository_id = term_id.split("_")[0] if term_id else None
    return term_id_to_assigned_asset_ids_map, term_id_to_rejected_asset_ids_map


def retrieve_assignments():
    ignore_reviewed = not parameters['training']['reviewed_only']
    term_id_to_assigned_asset_ids_map, term_id_to_rejected_asset_ids_map = retrieve_assignments_and_rejections(ignore_reviewed=ignore_reviewed)
    if len(term_id_to_assigned_asset_ids_map) == 0:
        print(f"No terms are assigned in this {training_source_type}. The model can only be built based on term metadata.")
        print("Setting the training parameter 'metadata_scope' to 'metadata_of_all_terms' and the asset metadata for training to [].")
        parameters['training']['metadata_scope'] = 'metadata_of_all_terms'
        asset_metadata_for_training = []
    else:
        asset_metadata_for_training = parameters['feature_selection']['asset_metadata']
    return asset_metadata_for_training, term_id_to_assigned_asset_ids_map, term_id_to_rejected_asset_ids_map

asset_metadata_for_training, term_id_to_assigned_asset_ids_map, term_id_to_rejected_asset_ids_map = retrieve_assignments()

### Extract additional term metadata (if configured)

Extract additional metadata based on the setting of the training parameter `metadata_scope`. If the parameter is set to `metadata_of_all_terms` or `metadata_of_assigned_terms`, the value of the `terms` variable is replaced with a more elaborate dictionary that contains category and description metadata for terms. If the parameter is set to `names_of_assigned_terms`, no further API calls are made and the current value of the `terms` variable is used.
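
The following sketch illustrates how one row of an exported glossary CSV could be turned into an entry of the extended `terms` dictionary. Only the column names (`Artifact ID`, `Name`, `Description`, `Category`) are taken from the code in the next cell; the CSV content, the repository ID, and the `example_` variables are hypothetical, and a real export may contain additional columns.

```python
from csv import DictReader
from io import StringIO

# Hypothetical export content
example_csv = (
    "Artifact ID,Name,Description,Category\n"
    'term-a,Bank Account,"Details of a bank account.",cat-1\n'
)
example_repository_id = "repo1"
example_categories = {"cat-1": "Banking Business"}

example_extended_terms = {}
for row in DictReader(StringIO(example_csv), delimiter=",", quotechar='"'):
    # The global ID combines the repository ID and the artifact ID
    global_id = f"{example_repository_id}_{row['Artifact ID']}"
    example_extended_terms[global_id] = {
        "term_name": row["Name"],
        "term_description": row.get("Description", ""),
        # Replace the category ID with the category name
        "category": example_categories.get(row["Category"], ""),
    }

print(example_extended_terms)
```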

In [None]:
import os
from zipfile import ZipFile
from io import BytesIO, StringIO
from csv import DictReader

# Add artifact types and corresponding process_xxx_metadata functions to extract metadata from other glossary artifact types as needed
glossary_artifact_types = ["category", "glossary_term"]

def export_glossary_metadata():
    headers = {"Authorization": f"Bearer {wkc_token}"}
    params = {"artifact_types": ','.join(glossary_artifact_types)}
    search_result_response = requests.get(f"{cp4d_url}/v3/governance_artifact_types/export", headers=headers, params=params, verify=False)
    search_result_response.raise_for_status()
    in_memory_zip = BytesIO(search_result_response.content)
    artifact_exports = {}
    with ZipFile(in_memory_zip, 'r') as glossary_export:
        infolist = glossary_export.infolist()
        for info in infolist:
            if not info.is_dir() and info.filename:
                with glossary_export.open(info) as glossary_export_csv:
                    for glossary_artifact_type in glossary_artifact_types:
                        if info.filename.startswith(glossary_artifact_type):
                            artifact_exports[glossary_artifact_type] = glossary_export_csv.read().decode()
    return artifact_exports

def process_glossary_metadata(artifact_type, artifact_exports):
    term_export_csv = artifact_exports[artifact_type]
    with StringIO(term_export_csv) as terms_csv:
        for row in DictReader(terms_csv, delimiter=',', quotechar='"'):
            yield row

def retrieve_repository_id(term_id):
    headers = {"Authorization": f"Bearer {wkc_token}"}
    response = requests.get(f"{cp4d_url}/v3/glossary_terms/{term_id}/versions", headers=headers, verify=False)
    response.raise_for_status()
    repo_id = response.json().get('resources', [{}])[0].get('metadata', {}).get('source_repository_id', None)
    if repo_id is None:
        raise RuntimeError(f'Result of GET /v3/glossary_terms/<term_id>/versions is malformed: {json.dumps(response.json())}')
    return repo_id

def process_term_metadata(category_id_to_category_name_map, artifact_exports, all_terms):
    global repository_id
    # Read term metadata and extend terms dictionary in support of parameters['feature_selection']['term_metadata']
    term_export_csv = artifact_exports["glossary_term"]
    for row in DictReader(StringIO(term_export_csv), delimiter=',', quotechar='"'):
        artifact_id = row['Artifact ID']
        if artifact_id:
            if not repository_id:
                repository_id = retrieve_repository_id(artifact_id)
            global_id = f"{repository_id}_{artifact_id}"
            if all_terms or global_id in terms:
                category_id = row['Category']
                terms[global_id] = {
                    'term_name': row['Name'],
                    'term_description': row.get('Description', ''),
                    'category': category_id_to_category_name_map.get(category_id, '')
                }

def process_category_metadata(artifact_exports):
    category_id_to_category_name_map = {}
    category_export_csv = artifact_exports["category"]
    for row in DictReader(StringIO(category_export_csv), delimiter=',', quotechar='"'):
        category_id_to_category_name_map[row['Artifact ID']] = row['Name']
    return category_id_to_category_name_map

def extend_term_metadata(artifact_exports, all_terms):
    category_id_to_category_name_map = process_category_metadata(artifact_exports)
    # Read term metadata and replace category_id with name of category in each entry of the terms dictionary
    process_term_metadata(category_id_to_category_name_map, artifact_exports, all_terms=all_terms)

def optionally_extend_term_metadata():
    retrieval_option = parameters['training']['metadata_scope']
    if retrieval_option == "names_of_assigned_terms":
        print(f"Using names of the {str(len(terms))} terms that are assigned or rejected for training.")
    else:
        artifact_exports = export_glossary_metadata()
        if retrieval_option == "metadata_of_assigned_terms":
            extend_term_metadata(artifact_exports, all_terms=False)
            print(f"Extended metadata of the {str(len(terms))} terms that are assigned or rejected.")
        else:
            # Add/extend metadata of all terms
            extend_term_metadata(artifact_exports, all_terms=True)
            if len(terms) > 0:
                print(f"Added metadata of all {str(len(terms))} terms.")

optionally_extend_term_metadata()
if len(terms) == 0:
    print(f"{HIGHLIGHT} No terms are present in the glossary. A model cannot be trained. Re-run the notebook when terms have been created.")

### Prepare metadata for training

Create the training input from the term and asset metadata stored in `terms`, `term_id_to_assigned_asset_ids_map`, `term_id_to_rejected_asset_ids_map`, and `assets`.

Though not required by the `MetadataToTextConverter`, the training input is created in a compact 'canonical format' that includes the term metadata only in the first row for
a term ID to reduce the memory footprint:

```
[ 
  [ "term_id_1", "Category", "Term1", "Term1 description.", "+", ...asset metadata... ],
  [ "term_id_1", null,       null,    null,                 "+", ...asset metadata... ],
  [ "term_id_1", null,       null,    null,                 "-", ...asset metadata... ],
  [ "term_id_1", null,       null,    null,                 "-", ...asset metadata... ],
  [ "term_id_2", "Category", "Term2", "Term2 description.", "+", ...asset metadata... ],
  ...
]
```

The `MetadataToTextConverter` requires all entries of a term ID to be stored in adjacent rows. Term metadata is read from the first row with this term ID. Term metadata in other
rows with the same term ID is ignored and can thus be null.
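
As an illustration of this layout, the sketch below builds the rows for a single term. It is a simplified stand-in for the function in the next cell, not a copy of it: `canonical_rows` is a hypothetical helper, the metadata values are made up, and terms without any assigned asset are not covered.

```python
def canonical_rows(term_id, term_values, positive_assets, negative_assets):
    rows = []
    for polarity, asset_rows in (("+", positive_assets), ("-", negative_assets)):
        for asset_values in asset_rows:
            # Only the very first row of a term carries the term metadata
            term_part = term_values if not rows else [None] * len(term_values)
            rows.append([term_id] + term_part + [polarity] + asset_values)
    return rows

example_rows = canonical_rows(
    "term_id_1",
    ["Category", "Term1", "Term1 description."],
    positive_assets=[["ACCOUNTS", None, None], ["ACCOUNTS", "ACCOUNT_ID", None]],
    negative_assets=[["CUSTOMERS", "AGE", None]],
)
for row in example_rows:
    print(row)
```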

In [None]:
def create_training_data(terms, assets):
    train_data = []
    train_data_row_with_term_metadata = None

    def add_asset_metadata(polarity, asset_ids, train_data_row_with_term_metadata, subsequent_train_data_row):
        for asset_id in asset_ids:
            asset = assets[asset_id]
            term_metadata_row = train_data_row_with_term_metadata if train_data_row_with_term_metadata else subsequent_train_data_row
            if train_data_row_with_term_metadata:
                train_data_row_with_term_metadata = None
            asset_metadata_row = [ polarity ]
            for am in asset_metadata_for_training:
                asset_metadata_row.append(asset.get(am, ''))
            train_data.append(term_metadata_row + asset_metadata_row)

    train_term_ids = []
    term_metadata = parameters['feature_selection']['term_metadata']
    for term_id, term in terms.items():
        train_data_row_with_term_metadata = [ term_id ]
        subsequent_train_data_row = [ term_id ]
        for tm in term_metadata:
            train_data_row_with_term_metadata.append(term.get(tm, ''))
            subsequent_train_data_row.append(None)
        positive_asset_ids = term_id_to_assigned_asset_ids_map.get(term_id, None)
        if positive_asset_ids:
            add_asset_metadata("+", positive_asset_ids, train_data_row_with_term_metadata, subsequent_train_data_row)
            train_data_row_with_term_metadata = None
            negative_asset_ids = term_id_to_rejected_asset_ids_map.get(term_id, None)
            if negative_asset_ids:
                add_asset_metadata("-", negative_asset_ids, train_data_row_with_term_metadata, subsequent_train_data_row)
            train_term_ids.append(term_id)
        elif term_metadata:
            train_data.append(train_data_row_with_term_metadata)
            train_term_ids.append(term_id)
    return train_data, train_term_ids

train_data, train_term_ids = create_training_data(terms, assets)

print("Training data created.")

### Optional: Review preprocessed training data

The next cell lists up to the first 10 associations between a term (represented by its name) and the feature string ('bag of words') that represents it in the training input.

In [None]:
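# Import here as well so that this cell also works when the optional unit test cell was skipped
from preprocessing_and_scoring_support.metadata_to_text_converter import MetadataToTextConverter

def print_preprocessed_training_input(train_data, train_term_ids):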
    term_metadata_for_training = parameters['feature_selection']['term_metadata']
    m2tc = MetadataToTextConverter(term_metadata_for_training=term_metadata_for_training, asset_metadata_for_training=asset_metadata_for_training, asset_metadata_for_scoring=[])
    preprocessing_output = m2tc.transform(train_data)

    for i in range(min(len(train_term_ids), 10)):
        term_id = train_term_ids[i]
        print(f"{terms[term_id]['term_name']}\t<-\t{preprocessing_output[i]}")

print("Preprocessed training data:")
print_preprocessed_training_input(train_data, train_term_ids)

### Train model

Create a training pipeline that includes the following components:
- The `MetadataToTextConverter` as preprocessor
- A `CountVectorizer` configured with the values in the feature mapping parameters as vectorizer
- A Naive Bayes classifier `MultinomialNB` supporting multi-class classification and the computation of probabilities as classifier
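
To show how the vectorizer and classifier interact, here is a minimal sketch on toy data. It leaves out the custom `MetadataToTextConverter` and feeds ready-made feature strings to the pipeline instead. The texts, labels, `ngram_range` value, and `toy_` variable names are made up for illustration and are not the settings used by this notebook.

```python
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline

# Toy feature strings (bags of words) and the term IDs they belong to
toy_texts = ["bank account details", "customer address street city", "account balance bank"]
toy_labels = ["term_account", "term_address", "term_account"]

toy_pipeline = Pipeline([
    ("vectorize", CountVectorizer(analyzer="word", ngram_range=(1, 2))),
    ("classify", MultinomialNB()),
])
toy_pipeline.fit(toy_texts, toy_labels)

# One probability per known class, in the order given by classes_
toy_classes = toy_pipeline.named_steps["classify"].classes_.tolist()
toy_probabilities = toy_pipeline.predict_proba(["bank account"])[0]
for toy_class, toy_probability in zip(toy_classes, toy_probabilities):
    print(toy_class, round(float(toy_probability), 4))
```

Because the classifier runs in multi-class mode, the probabilities of all classes sum to 1 for each input, which is why the notebook rescales them before comparing them with a threshold.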

In [None]:
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.pipeline import Pipeline
from sklearn.naive_bayes import MultinomialNB
from preprocessing_and_scoring_support.metadata_to_text_converter import MetadataToTextConverter

def train(train_data, term_ids):
    model_parameters = parameters['feature_mapping']
    args = {
        "analyzer": "word",
        "preprocessor": None,
        "tokenizer": None
    }
    # Pass on only those vectorizer settings that are configured
    for name in ('ngram_range', 'min_df', 'max_df', 'max_features'):
        if name in model_parameters:
            args[name] = model_parameters[name]
    term_metadata_for_training = parameters['feature_selection']['term_metadata']
    asset_metadata_for_scoring = parameters['feature_selection']['asset_metadata']
    preprocessor = MetadataToTextConverter(asset_metadata_for_training=asset_metadata_for_training, term_metadata_for_training=term_metadata_for_training, asset_metadata_for_scoring=asset_metadata_for_scoring)
    vectorizer = CountVectorizer(**args)
    classifier = MultinomialNB()
    pipeline = Pipeline([('preprocess', preprocessor), ('vectorize', vectorizer), ('classify', classifier)])
    # fit() returns the fitted pipeline itself, so model and pipeline refer to the same object
    model = pipeline.fit(train_data, term_ids)
    feature_names = vectorizer.get_feature_names_out()
    num_features = len(feature_names)
    print(f"Number of features: {num_features}")
    print(f"Up to the first 10 features: {feature_names[0:min(num_features, 10)]}")
    # Return the term IDs in the order used by the classifier for its probabilities
    term_ids = classifier.classes_.tolist()
    return pipeline, model, term_ids

pipeline, model, term_ids = train(train_data, train_term_ids)

print("Model training completed.")

### Define test data

This data is used by different tests performed in steps that follow. Therefore, it is kept in the global variable `test_data`. To ensure the test provides useful results, the table names (`TAB1`, `TAB2`) and column names (`CLIENT`, `ADDRESS`) might need to be changed to values that are compatible with the training data.

In [None]:
test_data = [ [ "TAB1", "CLIENT" ], [ "TAB2", "ADDRESS" ] ]

scoring_payload = {"input_data": [{"values": test_data}]}

print("Test data defined.")

### Optional: Test model locally

In [None]:
from preprocessing_and_scoring_support.probability_to_confidence_converter import ProbabilityToConfidenceConverter

max_num_assignments = parameters['scoring']['max_num_assignments']
assignment_threshold = parameters['scoring']['assignment_threshold']
ptcc = ProbabilityToConfidenceConverter(max_num_assignments=max_num_assignments, assignment_threshold=assignment_threshold)

def limit_to_decimals(f: float, num_decimals: int = 4):
    return float(f'%.{num_decimals}f' % f)

def test_predict(test_data):
    predictions = pipeline.predict_proba(test_data)

    for i in range(len(test_data)):
        adjusted_confidences, final_term_ids = ptcc.compute_confidences(predictions[i], term_ids)
        print(f"Prediction for {test_data[i]}:")
        for j in range(len(adjusted_confidences)):
            final_term_id = final_term_ids[j]
            print(f"\t{terms[final_term_id]['term_name']} : {limit_to_decimals(adjusted_confidences[j])}")

test_predict(test_data)

## Deploy model artifacts in Watson Machine Learning

This notebook stores the model artifacts directly in a deployment space to simplify the development and test cycle. Alternatively, model artifacts can be stored in a project and promoted to a deployment space as a separate step.

Steps:
1. Define name prefix for all artifacts to be deployed.
2. Define names of model artifacts.
3. Delete existing deployments for this prefix (if present).
4. Store the custom library and add it to the software specification for term prediction.
5. Store the model in the default deployment space.
6. Deploy the model.
7. Optional: Test the model.

### Define name prefix for all artifacts to be deployed

All deployed artifacts created by this notebook share a common prefix. Change the prefix if you want to keep artifacts created by a previous run of the notebook.

In [None]:
prefix = "demo"

print(f"Names of deployed artifacts will have '{prefix}' as prefix.")

In [None]:
# Artifact names
package_extension_name = prefix + "_tp_library"
software_specification_name = prefix + "_ta_library_spec"
model_name = prefix + "_tp_model"

# Deployment name
model_deployment_name = model_name + "_deployment"

# Name to be used in URL for built-in scoring
model_serving_name = model_name

print("Model artifact names defined.")

### Delete deployed model artifacts (if present)

This step is optional when running the notebook for the first time with this prefix.
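
The cleanup cell below is written so that it can be rerun safely: it keeps asking for an artifact ID by name until none is left. The following sketch illustrates that loop against a hypothetical in-memory stand-in (`FakeResource` is not part of the Watson Machine Learning client). The `'Not Found'` sentinel is taken from the check in `delete_artifacts()`. That several artifacts can share one name is an assumption suggested by the loop itself.

```python
class FakeResource:
    """Hypothetical stand-in for a WML resource collection (illustration only)."""

    def __init__(self, names):
        # Map artifact ID -> artifact name; names are not required to be unique.
        self._artifacts = {f"id-{i}": name for i, name in enumerate(names)}

    def get_id_by_name(self, name):
        # Return the sentinel that delete_artifacts() checks for when nothing matches.
        for artifact_id, artifact_name in self._artifacts.items():
            if artifact_name == name:
                return artifact_id
        return "Not Found"

    def delete(self, artifact_id):
        del self._artifacts[artifact_id]


def delete_all_by_name(resource, name):
    """Delete every artifact called `name` and return how many were removed."""
    deleted = 0
    while (artifact_id := resource.get_id_by_name(name)) != "Not Found":
        resource.delete(artifact_id)
        deleted += 1
    return deleted


fake = FakeResource(["demo_tp_library", "other", "demo_tp_library"])
first_run = delete_all_by_name(fake, "demo_tp_library")
second_run = delete_all_by_name(fake, "demo_tp_library")
print(first_run, second_run)
```

The second call finds nothing to delete, which is why the step can be skipped on the first run with a new prefix.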

In [None]:
def delete_artifacts(name, wml_resource, wml_resource_name):
    print(f"Deleting {wml_resource_name} with name {name}")
    num_artifacts_deleted = 0
    while True:
        artifact_id = wml_resource.get_id_by_name(name)
        if artifact_id == 'Not Found':
            break
        wml_resource.delete(artifact_id)
        num_artifacts_deleted += 1
    print(f"\t{num_artifacts_deleted} deleted.")

def delete_deployments(serving_name):
    print(f"Deleting deployments with serving name '{serving_name}'")
    num_deployments_deleted = 0
    details = wml_client.deployments.get_details(serving_name=serving_name)
    resources = details.get('resources', [])
    for resource in resources:
        deployment_id = wml_client.deployments.get_id(resource)
        wml_client.deployments.delete(deployment_id)
        num_deployments_deleted += 1
    print(f"\t{num_deployments_deleted} deleted.")

def delete_models(name):
    print(f"Deleting models with name '{name}'")
    num_models_deleted = 0
    details = wml_client.repository.get_model_details()
    resources = details.get('resources', [])
    for resource in resources:
        if resource.get('metadata', {}).get('name', '') == name:
            model_id = wml_client.repository.get_model_id(resource)
            wml_client.repository.delete(model_id)
            num_models_deleted += 1
    print(f"\t{num_models_deleted} deleted.")


delete_artifacts(package_extension_name, wml_client.package_extensions, "package extensions")
delete_artifacts(software_specification_name, wml_client.software_specifications, "software specifications")
delete_deployments(model_serving_name)
delete_models(model_name)

### Store the custom library in the deployment space

In [None]:
def store_custom_library():
    meta_props_pkg_extn = {
        wml_client.package_extensions.ConfigurationMetaNames.NAME: package_extension_name,
        wml_client.package_extensions.ConfigurationMetaNames.DESCRIPTION: "Library supporting term prediction",
        wml_client.package_extensions.ConfigurationMetaNames.TYPE: "pip_zip"
    }

    pkg_extn_details = wml_client.package_extensions.store(meta_props=meta_props_pkg_extn, file_path="preprocessing_and_scoring_support-0.1.zip")
    pkg_extn_id = wml_client.package_extensions.get_id(pkg_extn_details)
    return pkg_extn_id

package_extension_id = store_custom_library()

### Extend the base software specification with the custom library

In [None]:
def extend_base_sw_specification(pkg_extn_uid):
    
    base_software_specification_id = wml_client.software_specifications.get_id_by_name("runtime-23.1-py3.10")

    meta_props_sw_spec = {
        wml_client.software_specifications.ConfigurationMetaNames.NAME: software_specification_name,
        wml_client.software_specifications.ConfigurationMetaNames.DESCRIPTION: "Software specification of library supporting term prediction",
        wml_client.software_specifications.ConfigurationMetaNames.BASE_SOFTWARE_SPECIFICATION: {"guid": base_software_specification_id}
    }

    sw_spec_details = wml_client.software_specifications.store(meta_props=meta_props_sw_spec)
    sw_spec_id = wml_client.software_specifications.get_id(sw_spec_details)

    print("Extending software specification with library supporting term prediction")
    wml_client.software_specifications.add_package_extension(sw_spec_id, pkg_extn_uid)
    return sw_spec_id

software_specification_id = extend_base_sw_specification(package_extension_id)

### Store and deploy model

In [None]:
from ibm_watson_machine_learning.wml_client_error import WMLClientError

def deploy_model():
    
    meta_props_model = {
        wml_client.repository.ModelMetaNames.NAME: model_name,
        wml_client.repository.ModelMetaNames.DESCRIPTION: "Term prediction model",
        wml_client.repository.ModelMetaNames.SOFTWARE_SPEC_UID: software_specification_id,
        wml_client.repository.ModelMetaNames.TYPE: "scikit-learn_1.1"
    }      

    # Store model in WML repository
    model_metadata = wml_client.repository.store_model(model=pipeline, meta_props=meta_props_model)
    model_id = wml_client.repository.get_model_id(model_metadata)

    metadata = {
        wml_client.deployments.ConfigurationMetaNames.NAME: model_deployment_name,
        wml_client.deployments.ConfigurationMetaNames.DESCRIPTION: "Deployment of term prediction model",
        wml_client.deployments.ConfigurationMetaNames.ONLINE: { "parameters": { "serving_name": model_serving_name } }
    }

    model_deployment_id = None
    try:
        model_deployment_details = wml_client.deployments.create(model_id, meta_props=metadata)
        model_deployment_id = wml_client.deployments.get_id(model_deployment_details)
    except WMLClientError as e:
        if "non_unique_serving_name" in e.error_msg:
            print(f"{HIGHLIGHT} A model with serving name '{model_serving_name}' is already deployed. Delete the existing deployment '{model_deployment_name}' or use a different prefix/name.")
        else:
            # Surface any other deployment error instead of silently returning None
            raise
    return model_deployment_id

model_deployment_id = deploy_model()

### Optional: Test model deployment with built-in scoring function

In [None]:
import json

deployment_prediction = wml_client.deployments.score(model_serving_name, scoring_payload)

print(json.dumps(deployment_prediction, indent=2))

## Custom scoring function

The built-in scoring function returns a single `term_id` per asset or column in the `prediction` field and the probabilities for all terms in the `probabilities` field. Automatic term assignment, however, has the following requirements:
- Pairs of term IDs and confidence values for each term assigned to an asset must exist.
- It must be possible to limit the result size by maximum number of terms assigned and by a confidence threshold.

To achieve this, a custom scoring function is defined:
- It computes confidences from probabilities using the `compute_confidences()` method of the custom `ProbabilityToConfidenceConverter` class.
- It returns term IDs and confidences in two lists of equal length: `term_ids` and `confidences`.
- It limits the size of the result and scales confidences according to the `scoring` parameters `max_num_assignments` and `assignment_threshold`.
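
The limiting behavior described above can be sketched as a top-k selection followed by a threshold filter. This is an illustration only, not the implementation of `ProbabilityToConfidenceConverter`: the function name `select_terms` is hypothetical, the scaling step is left out, and applying the threshold to raw probabilities is an assumption. The return order (confidences first, term IDs second) mirrors how `compute_confidences()` is unpacked elsewhere in this notebook.

```python
from typing import List, Sequence, Tuple


def select_terms(
    probabilities: Sequence[float],
    classes: Sequence[str],
    max_num_assignments: int,
    assignment_threshold: float,
) -> Tuple[List[float], List[str]]:
    """Keep the most probable terms, capped by count and by a minimum value."""
    # Rank (probability, term_id) pairs from most to least probable.
    ranked = sorted(zip(probabilities, classes), key=lambda pair: pair[0], reverse=True)
    # Apply the size limit first, then drop anything below the threshold.
    kept = [(p, c) for p, c in ranked[:max_num_assignments] if p >= assignment_threshold]
    confidences = [p for p, _ in kept]
    term_ids = [c for _, c in kept]
    return confidences, term_ids


confidences, term_ids = select_terms(
    probabilities=[0.05, 0.6, 0.3, 0.05],
    classes=["term-a", "term-b", "term-c", "term-d"],
    max_num_assignments=3,
    assignment_threshold=0.1,
)
print(term_ids, confidences)
```

Both lists always have the same length, and either can be empty when no term reaches the threshold.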

Steps:
1. Define names for the scoring function artifacts.
2. Delete the deployed scoring function (if present).
3. Define the custom scoring function.
4. Optional: Test the custom scoring function locally.
5. Deploy the custom scoring function.
6. Test the deployed scoring function.

In [None]:
scoring_function_name = prefix + "_tp_scoring"

# Deployment name
scoring_function_deployment_name = scoring_function_name + "_deployment"

# Name to be used in URL for custom scoring
scoring_function_serving_name = scoring_function_name

print("Scoring function names defined.")

### Delete the deployed scoring function (if present)

This step is optional when running the notebook for the first time with this prefix.

In [None]:
def delete_functions(name):
    print(f"Deleting functions with name '{name}'")
    num_functions_deleted = 0
    details = wml_client.repository.get_function_details()
    resources = details.get('resources', [])
    for resource in resources:
        if resource.get('metadata', {}).get('name', '') == name:
            function_id = wml_client.repository.get_function_id(resource)
            wml_client.repository.delete(function_id)
            num_functions_deleted += 1
    print(f"\t{num_functions_deleted} deleted.")

delete_deployments(scoring_function_serving_name)
delete_functions(scoring_function_name)

### Create the custom scoring function

In [None]:
max_num_assignments = parameters['scoring']['max_num_assignments']
assignment_threshold = parameters['scoring']['assignment_threshold']
classes = term_ids

def tp_score(wml_url=cp4d_url, model_deployment_id=model_deployment_id, wml_space_id=wml_space_id, classes=classes, max_num_assignments=max_num_assignments, assignment_threshold=assignment_threshold):
    from ibm_watson_studio_lib import access_project_or_space
    from ibm_watson_machine_learning import APIClient
    from preprocessing_and_scoring_support.probability_to_confidence_converter import ProbabilityToConfidenceConverter

    def convert_output(model_response):
        # Replace the built-in output rows with [term_ids, confidences] pairs
        predictions = model_response['predictions']
        prediction = predictions[0]
        prediction_values = prediction['values']
        new_prediction_values = []

        ptcc = ProbabilityToConfidenceConverter(max_num_assignments=max_num_assignments, assignment_threshold=assignment_threshold)

        # Iterate over prediction results per asset and replace them with values obtained from the ProbabilityToConfidenceConverter
        for prediction_value in prediction_values:
            probabilities = prediction_value[1]
            predicted_confidences, predicted_term_ids = ptcc.compute_confidences(probabilities, classes)
            new_prediction_values.append([predicted_term_ids, predicted_confidences])
        return {'predictions': [{'fields': ['term_ids', 'confidences'], 'values': new_prediction_values}]}  

    def score(payload):
        try:
            token = access_project_or_space().auth.get_current_token()
            client = APIClient({'instance_id': 'openshift', 'token': token, 'url': wml_url, 'version': '4.5'})
            client.set.default_space(wml_space_id)
            model_response = client.deployments.score(model_deployment_id, payload)
            return convert_output(model_response)
        except Exception as e:
            return {'error': repr(e)}
            
    return score

print("Scoring function defined")
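
To make the reshaping in `convert_output()` easier to follow, here is a standalone sketch that runs without a deployment. The mock response and the `pick_best` helper are hypothetical: `pick_best` stands in for `compute_confidences()` and simply keeps the most probable class. The field names in the mock follow the description of the built-in scoring output given earlier in this section.

```python
def convert_values(model_response, classes, pick):
    """Replace each [prediction, probabilities] row with [term_ids, confidences]."""
    rows = model_response["predictions"][0]["values"]
    new_rows = []
    for _prediction, probabilities in rows:
        confidences, term_ids = pick(probabilities, classes)
        new_rows.append([term_ids, confidences])
    return {"predictions": [{"fields": ["term_ids", "confidences"], "values": new_rows}]}


def pick_best(probabilities, classes):
    # Hypothetical stand-in for compute_confidences(): keep only the top class.
    best = max(range(len(probabilities)), key=probabilities.__getitem__)
    return [probabilities[best]], [classes[best]]


# Mock of the built-in scoring output: one row per asset or column.
mock_response = {
    "predictions": [
        {
            "fields": ["prediction", "probabilities"],
            "values": [["t1", [0.7, 0.3]], ["t2", [0.2, 0.8]]],
        }
    ]
}
converted = convert_values(mock_response, ["t1", "t2"], pick_best)
print(converted)
```

Each output row holds two parallel lists, so a row can carry several term assignments once a converter that returns more than one term is plugged in.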

### Optional: Test the custom scoring function locally

This test can only be run in IBM Watson Studio because the custom scoring function depends on `ibm_watson_studio_lib`.

In [None]:
import importlib.util

if importlib.util.find_spec('ibm_watson_studio_lib') is not None:
    import json

    test_prediction = tp_score()(scoring_payload)
    print(json.dumps(test_prediction, indent=2))
else:
    print("The local test for custom scoring is only available when running this notebook in IBM Watson Studio.")

### Store and deploy the custom scoring function

In [None]:
def deploy_custom_scoring_function():

    meta_props_scoring_function = {
        wml_client.repository.FunctionMetaNames.NAME: scoring_function_name,
        wml_client.repository.FunctionMetaNames.DESCRIPTION: "Scoring function for term prediction",
        wml_client.repository.FunctionMetaNames.SOFTWARE_SPEC_UID: software_specification_id
    }

    # Store function in WML space
    scoring_function_details = wml_client.repository.store_function(meta_props=meta_props_scoring_function, function=tp_score)
    scoring_function_id = wml_client.repository.get_function_id(scoring_function_details)

    # Deploy function
    meta_props_scoring_function_deployment = {
        wml_client.deployments.ConfigurationMetaNames.NAME: scoring_function_deployment_name,
        wml_client.deployments.ConfigurationMetaNames.DESCRIPTION: "Deployment of scoring function for term prediction",
        wml_client.deployments.ConfigurationMetaNames.ONLINE: { "parameters": { "serving_name": scoring_function_serving_name } }
    }
    scoring_function_deployment_id = None
    try:
        scoring_function_deployment_details = wml_client.deployments.create(scoring_function_id, meta_props=meta_props_scoring_function_deployment)
        scoring_function_deployment_id = wml_client.deployments.get_id(scoring_function_deployment_details)
    except WMLClientError as e:
        if "non_unique_serving_name" in e.error_msg:
            print(f"{HIGHLIGHT} A scoring function with serving name '{scoring_function_serving_name}' is already deployed. Delete the existing deployment '{scoring_function_deployment_name}' or use a different prefix/name.")
        else:
            # Surface any other deployment error instead of silently returning None
            raise
    return scoring_function_deployment_id

scoring_function_deployment_id = deploy_custom_scoring_function()

### Test the deployed scoring function

In [None]:
import json

deployment_prediction = wml_client.deployments.score(scoring_function_serving_name, scoring_payload)

print(json.dumps(deployment_prediction, indent=2))

## Enable MDE asset for term prediction

To enable a Metadata Enrichment (MDE) asset for term prediction based on the deployed scoring function:
- Open its Default Settings.
- Select `Custom service` under `Term assignment methods to use` > `Machine learning`.
- Click `Select service`.
- Run the cell below and enter the names of the deployment space and scoring function deployment.
- Click `Test deployment` to verify that the scoring function is deployed.
- Click `Next`.
- Enter the input transformation code and output transformation code.
- Review and apply by clicking `Select`.


In [None]:
def print_values():
    wml_space_details = wml_client.spaces.get_details(space_id=wml_space_id)
    print(f"Deployment space: {wml_space_details['entity']['name']}")
    print(f"Deployment:       {scoring_function_deployment_name}")

    input_transformation_code = "{\"input_data\":[{\"values\":$append([ [$$.metadata.name, \"\"] ], $$.entity.data_asset.columns.[[$$.metadata.name, name]])}]}"
    output_transformation_code = "{\"term_assignments\": predictions[0].values ~> $map(function($x){function($z){$count($z) > 1? $z : [$z]}($x[0] ~> $zip($x[1]) ~> $map(function($y){{\"term_id\": $y[0], \"confidence\": $y[1]}})) })}"

    print(f"Input transformation code:  {input_transformation_code}")
    print(f"Output transformation code: {output_transformation_code}")

print_values()
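
The two transformation expressions printed above are written in JSONata. As a reading aid, the following Python sketch shows what they appear to compute. The function names and the sample documents are hypothetical, and JSONata's special handling of empty and single-element sequences is not replicated exactly.

```python
def build_input(asset):
    """Input transformation: one row for the asset itself, then one row per column."""
    asset_name = asset["metadata"]["name"]
    columns = asset["entity"]["data_asset"].get("columns", [])
    rows = [[asset_name, ""]] + [[asset_name, column["name"]] for column in columns]
    return {"input_data": [{"values": rows}]}


def build_term_assignments(scoring_response):
    """Output transformation: pair each term ID with its confidence, row by row."""
    rows = scoring_response["predictions"][0]["values"]
    return {
        "term_assignments": [
            [{"term_id": t, "confidence": c} for t, c in zip(term_ids, confidences)]
            for term_ids, confidences in rows
        ]
    }


# Hypothetical asset document and scoring response, for illustration only.
asset = {
    "metadata": {"name": "CUSTOMERS"},
    "entity": {"data_asset": {"columns": [{"name": "ID"}, {"name": "EMAIL"}]}},
}
payload = build_input(asset)

response = {
    "predictions": [
        {
            "fields": ["term_ids", "confidences"],
            "values": [[["t1"], [0.9]], [[], []], [["t2", "t3"], [0.8, 0.5]]],
        }
    ]
}
assignments = build_term_assignments(response)
print(payload)
print(assignments)
```

The first input row represents the table and the remaining rows its columns, so the rows of the scoring output line up with the asset and its columns in the same order.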

### Optional: Remove source code for custom library

Delete the source files required to build the custom preprocessing and scoring library from the current directory of your build environment. These files can be re-created at any time by rerunning the cells in the section 'Create a library with custom conversion logic for preprocessing and scoring'.

In [None]:
%%bash

rm -rf term_prediction_model/
rm -f preprocessing_and_scoring_support-0.1.zip

echo "Source files for custom library deleted from workspace."