# Document Classification Demo

This notebook demonstrates the ease of use of the SAP AI Business Service Document Classification for classification tasks. In this demo, we train a classification model and evaluate its performance.

For the demo, we use a Jupyter Notebook together with this client library to invoke the most important functions of the Document Classification (DC) REST API.

## Dataset

This notebook requires a dataset to train a model on. Provide this dataset in the folder defined as `dataset_folder` in the second code cell of this notebook (under Settings).

The Document Classification Service Python Client accepts datasets as a folder with a pair of files ending in .pdf and .json for each document to be used for training. The .pdf file is the original document while the .json file specifies the ground truth annotation.

An example of the format of the .json ground truth file expected by the service is:
```json
{
    "classification": [
        {
            "characteristic": "color",
            "value": "red"
        },
        {
            "characteristic": "size",
            "value": "big"
        }
    ]
}
```
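
Before uploading, it can help to check that every document in the folder follows the format described above. The following is a minimal sketch, not part of the client library: the helper name `find_invalid_documents` and its messages are made up for illustration, and it only checks for the `classification`, `characteristic` and `value` keys shown in the example.

```python
import json
from pathlib import Path


def find_invalid_documents(dataset_folder):
    """Return (pdf_name, problem) pairs for documents that look malformed.

    Hypothetical helper: it only checks the structure shown in the example
    ground truth file and matches the lowercase `.pdf` extension.
    """
    problems = []
    for pdf_path in sorted(Path(dataset_folder).glob("*.pdf")):
        json_path = pdf_path.with_suffix(".json")
        if not json_path.exists():
            problems.append((pdf_path.name, "missing .json ground truth file"))
            continue
        try:
            with open(json_path) as gt_file:
                ground_truth = json.load(gt_file)
        except json.JSONDecodeError:
            problems.append((pdf_path.name, "ground truth is not valid JSON"))
            continue
        entries = ground_truth.get("classification") if isinstance(ground_truth, dict) else None
        if not isinstance(entries, list) or not entries:
            problems.append((pdf_path.name, "no 'classification' list"))
            continue
        for entry in entries:
            # Every entry needs both a characteristic and its value
            if not isinstance(entry, dict) or not {"characteristic", "value"} <= entry.keys():
                problems.append((pdf_path.name, "entry without 'characteristic' and 'value'"))
                break
    return problems
```

Calling `find_invalid_documents(dataset_folder)` once the settings cell has been filled in should return an empty list for a well-formed dataset.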

## Settings

In [None]:
!pip install sap-document-classification-client

In [None]:
# Environment specific configuration
api_url = ""
uaa_server = ""
client_id = ""
client_secret = ""

# Model specific configuration
model_name = ""
dataset_folder = ""

## Initialize Demo

In [None]:
# import DC client library
from sap_document_classification_client import dc_api_client

In [None]:
# Obtain the dc client api handler 
my_dc_client = dc_api_client.DCApiClient(api_url, client_id, client_secret, uaa_server)

## Display access token

In [None]:
# Token can be used to interact with e.g. swagger UI to explore DC API
print(my_dc_client.session.headers)

## Create Dataset for training of a new model

In [None]:
# Create Training dataset
response = my_dc_client.create_dataset()
training_dataset_id = response["datasetId"]
print("Dataset created with datasetId: {}".format(training_dataset_id))

In [None]:
# Upload training documents to the dataset from training directory
print("Uploading training documents to the dataset")
my_dc_client.upload_documents_directory_to_dataset(training_dataset_id, dataset_folder)
print("Finished uploading training documents to the dataset")

In [None]:
# Pretty print the dataset statistics
from pprint import pprint
print("Dataset statistics")
dataset_stats = my_dc_client.get_dataset_info(training_dataset_id)
pprint(dataset_stats)

In [None]:
# Visualization of label distribution
%matplotlib inline
import matplotlib.pyplot as plt
import numpy as np  # needed below to wrap a single Axes object in an array

nrCharacteristics = len(dataset_stats["groundTruths"])
fig, (ax) = plt.subplots(nrCharacteristics,1, figsize=(10, 15), dpi=80, facecolor='w', edgecolor='k')
if nrCharacteristics==1:
    ax = np.array((ax,)) 
for i in range(nrCharacteristics):
    keys = [element["value"] for element in  dataset_stats["groundTruths"][i]["classes"]]
    total = [element["total"] for element in  dataset_stats["groundTruths"][i]["classes"]]
    ax[i].set_ylabel("Absolute")
    ax[i].bar(keys, total)
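
The bar charts show absolute counts. To judge class imbalance, relative shares per characteristic can be easier to read. The sketch below assumes only the keys that the cell above reads from the statistics (`groundTruths`, `characteristic`, `classes`, `value`, `total`). The helper `class_shares` and the sample numbers are made up for illustration and are not output of the service.

```python
def class_shares(dataset_stats):
    """Map each characteristic to the relative share of each of its classes."""
    shares = {}
    for ground_truth in dataset_stats["groundTruths"]:
        totals = {c["value"]: c["total"] for c in ground_truth["classes"]}
        n_documents = sum(totals.values())
        shares[ground_truth["characteristic"]] = {
            # Guard against an empty characteristic to avoid dividing by zero
            value: (total / n_documents if n_documents else 0.0)
            for value, total in totals.items()
        }
    return shares


# Made-up statistics that mirror the keys used by the plotting cell
sample_stats = {
    "groundTruths": [
        {
            "characteristic": "color",
            "classes": [
                {"value": "red", "total": 30},
                {"value": "blue", "total": 10},
            ],
        }
    ]
}
print(class_shares(sample_stats))
```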

## Training

In [None]:
# Train the model
print("Start training job for model with modelName {}".format(model_name))
response = my_dc_client.train_model(model_name, training_dataset_id)
pprint(response)
print("Model training finished with status: {}".format(response.get("status")))
if response.get("status") == "SUCCEEDED":
    model_version = response.get("modelVersion")
    print("Trained model: {}".format(model_name))
    print("Trained model version: {}".format(model_version))

In [None]:
# Check training statistics
response = my_dc_client.get_trained_model_info(model_name, model_version)
training_details = response.pop("details")
pprint(response)

## Deployment

In [None]:
# Deploy model
response = my_dc_client.deploy_model(model_name, model_version)
pprint(response)

## Classification

In [None]:
# Test usage of the model by classifying a few documents and collecting results and ground truth
import binascii
import time
import json
import numpy as np
from collections import defaultdict

filenames = my_dc_client._find_files(dataset_folder, "*.PDF")
test_filenames = []
for filename in filenames:
    # Check whether it is a test document
    with open(filename, 'rb') as pdf_file:
        # Documents whose CRC32 checksum modulo 100 is 90..99 are treated as test documents
        is_test_document = binascii.crc32(pdf_file.read()) % 100 >= 90
    if is_test_document:
        test_filenames.append(filename)

# Classify all test documents
responses = my_dc_client.classify_documents(test_filenames, model_name, model_version)

# Iterate over responses and store results in convenient format
test_prediction = defaultdict(list)
test_probability = defaultdict(lambda: defaultdict(list))
test_ground_truth = defaultdict(list)
for response, filename in zip(responses, test_filenames):
    pprint(response)
    try:
        # Parse response from DC service
        prediction = response["predictions"]
        for element in prediction:
            labels = []
            scores = []
            for subelement in element["results"]:
                labels.append(subelement["label"])
                scores.append(subelement["score"])
                test_probability[element["characteristic"]][subelement["label"]].append(subelement["score"])
            test_prediction[element["characteristic"]].append(labels[np.argmax(np.asarray(scores))])
        # Collect ground truth of all test documents
        # Swap the file extension for .json regardless of its case (.pdf or .PDF)
        with open(filename.rsplit(".", 1)[0] + ".json") as gt_file:
            gt = json.load(gt_file)
        for element in gt["classification"]:
            test_ground_truth[element["characteristic"]].append(element["value"])
    except KeyError:
        print("Document not used")
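
The classification cell picks its test documents from the CRC32 checksum of each file's content, so the same file lands in the same split on every run without storing a list of test files. A minimal standalone sketch of that idea follows. The function name and the `test_percent` parameter are illustrative assumptions, not part of the client library.

```python
import binascii


def is_test_document(content, test_percent=10):
    """Return True if the content's CRC32 bucket (0..99) is in the top `test_percent` buckets."""
    bucket = binascii.crc32(content) % 100
    return bucket >= 100 - test_percent


# Synthetic byte strings stand in for PDF file contents
documents = ["document {}".format(i).encode() for i in range(2000)]
n_test = sum(is_test_document(content) for content in documents)
print("{} of {} synthetic documents fall into the test split".format(n_test, len(documents)))
```

Because the decision depends only on the file content, renaming or moving a file does not change its split, while editing the file may.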

In [None]:
# display the ground truth and classification result for a certain document with index idx
idx = 0

for i in range(nrCharacteristics):
    characteristic = dataset_stats["groundTruths"][i]["characteristic"]
    print("Ground truth for characteristic '{}': '{}'".format(characteristic, test_ground_truth[str(characteristic)][idx]))

print("Model predictions:")
pprint(responses[idx])
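
To read a single response without scanning the full output, the top-scoring label per characteristic can be extracted. This sketch assumes only the keys that the classification cell parses (`predictions`, `characteristic`, `results`, `label`, `score`). The helper `top_labels` and the sample response are made up for illustration.

```python
def top_labels(response):
    """Map each characteristic to its (label, score) pair with the highest score."""
    top = {}
    for element in response["predictions"]:
        best = max(element["results"], key=lambda result: result["score"])
        top[element["characteristic"]] = (best["label"], best["score"])
    return top


# Made-up response that mirrors the keys parsed in the classification cell
sample_response = {
    "predictions": [
        {
            "characteristic": "color",
            "results": [
                {"label": "red", "score": 0.9},
                {"label": "blue", "score": 0.1},
            ],
        },
        {
            "characteristic": "size",
            "results": [
                {"label": "big", "score": 0.35},
                {"label": "small", "score": 0.65},
            ],
        },
    ]
}
print(top_labels(sample_response))
```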

## Find thresholds to avoid all false classifications in test set

In [None]:
# These thresholds can be used for example to define when manual annotation (and ideally feedback into training processes) takes place
thresholds = defaultdict(lambda : defaultdict(lambda : 0))
is_wrong_classification = defaultdict(lambda : [])

for characteristic in test_ground_truth.keys():
    unique_labels = np.unique(np.asarray(test_ground_truth[characteristic]))
    for label in unique_labels:
        # This loop is only necessary if all documents are classified correctly to set thresholds to 1
        thresholds[characteristic][label] = 1
    for idx in range(len(test_ground_truth[characteristic])):
        predicted_label = test_prediction[characteristic][idx]
        is_wrong_classification[characteristic].append(test_prediction[characteristic][idx] != test_ground_truth[characteristic][idx])
        if is_wrong_classification[characteristic][idx]:
            if(thresholds[characteristic][predicted_label] > test_probability[characteristic][predicted_label][idx]):
                thresholds[characteristic][predicted_label] = test_probability[characteristic][predicted_label][idx]
                
for characteristic in test_ground_truth.keys():
    print(characteristic)
    print("{} of {} documents classified wrong".format(sum(is_wrong_classification[characteristic]), len(test_ground_truth[characteristic])))
    pprint({ k:v for k,v in thresholds[characteristic].items() })
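
One possible way to use such per-label thresholds is to route low-confidence predictions to manual annotation. The decision rule below (accept automatically only when the score is strictly above the threshold of the predicted label) is an assumption made for this sketch, not something the service or this notebook prescribes. The helper name and the threshold values are made up.

```python
def needs_manual_review(characteristic, label, score, thresholds):
    """Return True if a prediction should be sent to manual annotation.

    Assumed rule: auto-accept only if the score is strictly above the
    threshold of the predicted label. Labels without a threshold are
    always reviewed.
    """
    threshold = thresholds.get(characteristic, {}).get(label)
    return threshold is None or score <= threshold


# Hypothetical thresholds, not values computed by the cell above
example_thresholds = {"color": {"red": 0.8, "blue": 0.6}}

print(needs_manual_review("color", "red", 0.95, example_thresholds))
print(needs_manual_review("color", "red", 0.80, example_thresholds))
print(needs_manual_review("color", "green", 0.99, example_thresholds))
```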

## Confusion Matrix

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import confusion_matrix

font = {'size'   : 22}
plt.rc('font', **font)

def plot_confusion_matrix(ax, char, y_true, y_pred, classes,
                          normalize=True,
                          title=None,
                          cmap=plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    if not title:
        if normalize:
            title = "{}: Normalized confusion matrix".format(char)
        else:
            title = "{}: Confusion matrix without normalization".format(char)

    # Compute confusion matrix
    cm = confusion_matrix(y_true, y_pred)
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    im = ax.imshow(cm, interpolation='nearest', cmap=cmap)

    # We want to show all ticks...
    ax.set(xticks=np.arange(cm.shape[1]),
           yticks=np.arange(cm.shape[0]),
           # ... and label them with the respective list entries
           xticklabels=classes, yticklabels=classes,
           title=title,
           ylabel="True label",
           xlabel="Predicted label",
           xlim=(-0.5,len(classes)-0.5),
           ylim=(-0.5,len(classes)-0.5))


    # Rotate the tick labels and set their alignment.
    plt.setp(ax.get_xticklabels(), rotation=45, ha='right',
             rotation_mode='anchor')

    # Loop over data dimensions and create text annotations.
    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i in range(cm.shape[0]):
        for j in range(cm.shape[1]):
            ax.text(j, i, format(cm[i, j], fmt),
                    ha='center', va='center', color='white' if cm[i, j] > thresh else 'black')
    fig.show()

fig, ax = plt.subplots(len(test_ground_truth.keys()), 1, figsize=(14,28))
if len(test_ground_truth.keys())==1:
    ax = np.array((ax,)) 
for idx, characteristic in enumerate(test_ground_truth.keys()):
    plot_confusion_matrix(ax[idx], characteristic,
                          test_ground_truth[characteristic], 
                          test_prediction[characteristic], 
                          np.unique(np.asarray(test_ground_truth[characteristic])), 
                          normalize=False)
fig.subplots_adjust(hspace=0.5)
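
To make the numbers in the plot easier to interpret, here is a small pure-Python sketch of what a confusion matrix counts and what row normalization does. It is independent of `sklearn`. The helper names and the toy labels are made up for illustration.

```python
from collections import Counter


def confusion_counts(y_true, y_pred, classes):
    """Rows are true labels, columns are predicted labels, in the order of `classes`."""
    pair_counts = Counter(zip(y_true, y_pred))
    return [[pair_counts[(true, pred)] for pred in classes] for true in classes]


def normalize_rows(matrix):
    """Divide each row by its sum, so each row shows shares of one true class."""
    normalized = []
    for row in matrix:
        row_total = sum(row)
        normalized.append([count / row_total if row_total else 0.0 for count in row])
    return normalized


# Toy labels: one of three "red" documents is misclassified as "blue"
y_true = ["red", "red", "red", "blue", "blue"]
y_pred = ["red", "red", "blue", "blue", "blue"]
classes = sorted(set(y_true))

counts = confusion_counts(y_true, y_pred, classes)
print(classes)
print(counts)
print(normalize_rows(counts))
```

In the normalized matrix, the diagonal entry of a row is the recall of that class.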

## Precision-Recall curves

In [None]:
# Visualize the PR curve for each characteristic
# (NOTE: this is a bit boring in this example; consider creating a more challenging dataset)
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import precision_recall_curve

def plot_f_score(ax):
    f_scores = np.linspace(0.2, 0.8, num=4)
    for f_score in f_scores:
        x = np.linspace(0.01, 1)
        y = f_score * x / (2 * x - f_score)
        l, = ax.plot(x[y >= 0], y[y >= 0], color='gray', alpha=0.2)
        ax.annotate('f1={0:0.1f}'.format(f_score), xy=(0.9, y[45] + 0.02))

fig, ax = plt.subplots(len(test_ground_truth.keys()), 1, figsize=(12, 24), dpi=80, facecolor='w', edgecolor='k')
if len(test_ground_truth.keys())==1:
    ax = np.array((ax,)) 

for idx, characteristic in enumerate(test_ground_truth.keys()):
    for label in np.unique(np.asarray(test_ground_truth[characteristic])):
        gt = [subelement == label for subelement in test_ground_truth[characteristic]]
        prediction = test_probability[characteristic][label]
        # The returned thresholds are unused; `_` avoids overwriting the `thresholds` dict computed earlier
        precision, recall, _ = precision_recall_curve(gt, prediction)
        ax[idx].plot(recall, precision, label=label)
    ax[idx].set_xlabel('Recall')
    ax[idx].set_ylabel('Precision')
    ax[idx].set_xlim(-0.1,1.1)
    ax[idx].set_ylim(-0.1,1.1)
    ax[idx].set_title('{}: Precision-Recall curves'.format(characteristic))   
    ax[idx].spines["top"].set_visible(False)
    ax[idx].spines["right"].set_visible(False)
    ax[idx].get_xaxis().tick_bottom()
    ax[idx].get_yaxis().tick_left()
    ax[idx].legend()
    ax[idx].grid()
    plot_f_score(ax[idx])

fig.show()
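
The gray curves drawn by `plot_f_score` are lines of constant F1 score. Solving F1 = 2PR / (P + R) for precision gives P = F1 · R / (2R − F1), which is the expression used in that function. The sketch below checks the relationship numerically. The helper names are made up for illustration.

```python
def precision_on_iso_f1(recall, f_score):
    """Precision that yields the given F1 score at the given recall."""
    denominator = 2 * recall - f_score
    if denominator <= 0:
        # No non-negative precision can reach this F1 score at such a low recall
        raise ValueError("recall must be greater than f_score / 2")
    return f_score * recall / denominator


def f1(precision, recall):
    """Harmonic mean of precision and recall."""
    return 2 * precision * recall / (precision + recall)


precision = precision_on_iso_f1(recall=0.8, f_score=0.6)
print(precision, f1(precision, 0.8))
```

For recall values only slightly above F1 / 2 the formula returns a precision greater than 1. This is why the curves leave the plotted range on the left-hand side.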