## Amazon Comprehend Custom

[Amazon Comprehend](https://aws.amazon.com/comprehend/) provides pre-trained and user-trainable tools for discovering insights and relationships from text.

Pre-trained [features](https://aws.amazon.com/comprehend/features/) include:

- Sentiment analysis
- Topic analysis
- Keyphrase extraction
- Syntax analysis
- Entity recognition
- Medical entity recognition and ontology linking
- Language detection
- PII detection and masking
- Event data extraction

With [Comprehend Custom](https://docs.aws.amazon.com/comprehend/latest/dg/auto-ml.html), users can train custom **classification** or **entity detection** models on domain-specific datasets.

In this example, we'll demonstrate the basics by training a **custom topic classifier** on the Yahoo answers corpus cited in the paper [Text Understanding from Scratch](https://arxiv.org/abs/1502.01710) by Xiang Zhang and Yann LeCun.

## Fetching the data

The raw dataset is available on the [AWS Open Data Registry](https://registry.opendata.aws/fast-ai-nlp/), but we'll use a pre-processed version published with the AWS Machine Learning blog post ["Building a custom classifier using Amazon Comprehend"](https://aws.amazon.com/blogs/machine-learning/building-a-custom-classifier-using-amazon-comprehend/).

In [None]:
!aws s3 cp s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-test.csv ./data/comprehend/
!aws s3 cp s3://aws-ml-blog/artifacts/comprehend-custom-classification/comprehend-train.csv ./data/comprehend/
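
Before uploading, it can be worth sanity-checking the training file. The sketch below assumes the multi-class Comprehend CSV layout (no header row, label in the first column, document text in the second). The helper `summarize_training_csv` is hypothetical, not part of any AWS SDK, and the sample rows are made up for illustration.

```python
import csv
import io
from collections import Counter


def summarize_training_csv(lines):
    """Count documents per label in a Comprehend-style training CSV.

    `lines` is any iterable of CSV text lines (e.g. an open file).
    Assumes two columns per row: label, then document text, no header.
    Returns (label_counts, bad_row_numbers).
    """
    label_counts = Counter()
    bad_rows = []
    for row_num, row in enumerate(csv.reader(lines), start=1):
        # A valid row has exactly two fields: a non-empty label and non-empty text.
        if len(row) != 2 or not row[0].strip() or not row[1].strip():
            bad_rows.append(row_num)
            continue
        label_counts[row[0].strip()] += 1
    return label_counts, bad_rows


# Tiny made-up sample, for illustration only (not taken from the real dataset):
sample = io.StringIO(
    'SPORTS,"Who won the match last night?"\n'
    'HEALTH,"How much water should I drink, per day?"\n'
    "SPORTS,What is the offside rule?\n"
    "BROKEN_ROW_WITH_NO_TEXT\n"
)
counts, bad = summarize_training_csv(sample)
print(dict(counts), bad)
```

To check the downloaded file instead, pass an open file handle, for example `open("data/comprehend/comprehend-train.csv", newline="")`.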

## Setting up

For this workshop, our S3 bucket and IAM roles have already been set up. We just need to look up their names, and then upload the prepared data to S3.

In [None]:
# Python Built-Ins:
import json
import os
import time

# External Dependencies:
import boto3

# The S3 bucket has already been created & named for us:
account_id = os.environ["AWS_ACCOUNT_ID"]
region = os.environ["AWS_DEFAULT_REGION"]
bucket_name = f"comprehend-custom-{account_id}-{region}"

comprehend = boto3.client("comprehend")
s3 = boto3.resource("s3")
bucket = s3.Bucket(bucket_name)

print(f"Using S3 bucket: {bucket_name}")

In [None]:
bucket.Object("data/comprehend-train.csv").upload_file("data/comprehend/comprehend-train.csv")
bucket.Object("data/comprehend-test.csv").upload_file("data/comprehend/comprehend-test.csv")

In [None]:
# The Comprehend Execution Role has also been created already, but we need to look it up:
ssm = boto3.client("ssm")
ssm_response = ssm.get_parameters(Names=["/workshop/ComprehendRoleArn"])

try:
    comprehend_param = next(
        param for param in ssm_response["Parameters"]
        if param["Name"] == "/workshop/ComprehendRoleArn"
    )
    comprehend_role_arn = comprehend_param["Value"]
    print(comprehend_role_arn)
except StopIteration as e:
    print(ssm_response)
    raise ValueError("Couldn't retrieve Comprehend Execution Role from SSM") from e

## Create the Comprehend Model

With the data uploaded to S3, we're ready to create our Comprehend Custom model.

You could follow this process either through the [Comprehend Console](https://console.aws.amazon.com/comprehend/v2/home?#classification) or by running the code cells below.

If you create your classifier in the console instead, just be sure to declare the `cls_arn` in this notebook (available on the classifier details screen in the console).

In [None]:
create_cls_resp = comprehend.create_document_classifier(
    DocumentClassifierName="yahoo-answers",
    DataAccessRoleArn=comprehend_role_arn,
    InputDataConfig={
        "DataFormat": "COMPREHEND_CSV", # (Or "AUGMENTED_MANIFEST" for SM Ground Truth)
        "S3Uri": f"s3://{bucket_name}/data/comprehend-train.csv",
    },
    OutputDataConfig={
        'S3Uri': f"s3://{bucket_name}/training-output",
    },
    LanguageCode="en", # See also 'es'|'fr'|'de'|'it'|'pt'|'ar'|'hi'|'ja'|'ko'|'zh'|'zh-TW'
)

print(create_cls_resp)
cls_arn = create_cls_resp["DocumentClassifierArn"]

Classifier training can take time, depending on the size and complexity of your dataset, so the above operation is asynchronous. Below, we set up a polling loop to wait for the classifier to become ready.

In [None]:
status = None
pending_statuses = { "SUBMITTED", "TRAINING", "DELETING", "STOP_REQUESTED" }
success_statuses = { "TRAINED" }
fail_statuses = { "STOPPED", "IN_ERROR" }

while True:
    cls_desc = comprehend.describe_document_classifier(
        DocumentClassifierArn=cls_arn,
    )
    status = cls_desc["DocumentClassifierProperties"]["Status"]
    if status in success_statuses:
        print(f"\nReady: {status}")
        break  # Stop polling once training has succeeded
    elif status in fail_statuses:
        raise ValueError(f"Entered fail state {status}")
    elif status not in pending_statuses:
        raise ValueError(f"Entered unexpected state {status}")
    print(".", end="")
    time.sleep(60)
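
A polling loop of this shape can also be factored into a reusable helper with a timeout, so a stuck job does not block the notebook indefinitely. This is a minimal sketch: `wait_for_status` and its parameters are hypothetical names, and the demo feeds it a canned sequence of states rather than calling the Comprehend API.

```python
import time


def wait_for_status(get_status, success, failure, pending,
                    poll_seconds=60, timeout_seconds=4 * 60 * 60,
                    sleep=time.sleep):
    """Poll `get_status()` until it returns a success state.

    Raises ValueError on a failure or unknown state, and TimeoutError
    if no terminal state is reached within `timeout_seconds`.
    """
    waited = 0
    while True:
        status = get_status()
        if status in success:
            return status
        if status in failure:
            raise ValueError(f"Entered fail state {status}")
        if status not in pending:
            raise ValueError(f"Entered unexpected state {status}")
        if waited >= timeout_seconds:
            raise TimeoutError(f"Still {status} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# Demonstration with a canned sequence of states instead of a real API call:
fake_states = iter(["SUBMITTED", "TRAINING", "TRAINING", "TRAINED"])
final = wait_for_status(
    get_status=lambda: next(fake_states),
    success={"TRAINED"},
    failure={"STOPPED", "IN_ERROR"},
    pending={"SUBMITTED", "TRAINING", "STOP_REQUESTED"},
    sleep=lambda seconds: None,  # skip real waiting in the demo
)
print(final)
```

In this notebook, `get_status` could be a lambda that calls `comprehend.describe_document_classifier(DocumentClassifierArn=cls_arn)` and returns `["DocumentClassifierProperties"]["Status"]` from the response.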

Once the classifier is trained, you'll be able to see a [range of metrics](https://docs.aws.amazon.com/comprehend/latest/dg/cer-doc-class.html) describing its performance - visible through the Comprehend Console or via APIs (see the `cls_desc` variable from above).
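
As a rough sketch of reading those metrics in code, the hypothetical helper below pulls the `EvaluationMetrics` block out of a `describe_document_classifier` response. The example response is a hand-written stand-in with invented numbers, and only a few of the fields the API may return are shown.

```python
def summarize_classifier_metrics(cls_desc):
    """Return the evaluation metrics from a describe_document_classifier response.

    Returns an empty dict if the metrics are absent (e.g. training not finished).
    """
    props = cls_desc.get("DocumentClassifierProperties", {})
    metadata = props.get("ClassifierMetadata", {})
    return dict(metadata.get("EvaluationMetrics", {}))


# Hand-written stand-in for an API response; the numbers are invented for illustration:
example_desc = {
    "DocumentClassifierProperties": {
        "Status": "TRAINED",
        "ClassifierMetadata": {
            "NumberOfLabels": 10,
            "NumberOfTrainedDocuments": 900,
            "NumberOfTestDocuments": 100,
            "EvaluationMetrics": {
                "Accuracy": 0.7,
                "Precision": 0.71,
                "Recall": 0.69,
                "F1Score": 0.7,
            },
        },
    }
}
for name, value in summarize_classifier_metrics(example_desc).items():
    print(f"{name:>10}: {value:.2f}")
```

With a trained classifier, passing the notebook's own `cls_desc` variable in place of `example_desc` should print the real figures.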

## Running batch jobs

Once the model is trained (regardless of whether it was a classifier or entity detector), we can either:

- Deploy it to an endpoint for real-time inference, or
- Run batch jobs to process defined datasets

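The two options suit different workloads: an endpoint is a good fit when documents arrive on demand and need a low-latency response, while a batch job fits a known dataset that can be processed in one go. Matching the deployment type to your use case is generally more economical and simpler to orchestrate. See the [Comprehend pricing page](https://aws.amazon.com/comprehend/pricing/) for details on how each option is charged.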

In this example, we already have a test dataset and would like to review model performance on it - so we'll set up a batch job:

In [None]:
create_job_resp = comprehend.start_document_classification_job(
    #JobName='string',
    DocumentClassifierArn=cls_arn,
    InputDataConfig={
        "S3Uri": f"s3://{bucket_name}/data/comprehend-test.csv",
        "InputFormat": "ONE_DOC_PER_LINE",
    },
    OutputDataConfig={
        "S3Uri": f"s3://{bucket_name}/results",
    },
    DataAccessRoleArn=comprehend_role_arn,
)

print(create_job_resp)
job_id = create_job_resp["JobId"]
# Also JobStatus: 'SUBMITTED'|'IN_PROGRESS'|'COMPLETED'|'FAILED'|'STOP_REQUESTED'|'STOPPED'

As with training, this is an asynchronous process so we'll have to wait for the job to finish before the results are available:

In [None]:
status = None
pending_statuses = { "SUBMITTED", "IN_PROGRESS", "STOP_REQUESTED" }
success_statuses = { "COMPLETED" }
fail_statuses = { "STOPPED", "FAILED" }

while True:
    job_desc = comprehend.describe_document_classification_job(
        JobId=job_id,
    )
    status = job_desc["DocumentClassificationJobProperties"]["JobStatus"]
    if status in success_statuses:
        print(f"\nReady: {status}")
        break  # Stop polling once the job has completed
    elif status in fail_statuses:
        raise ValueError(f"Entered fail state {status}")
    elif status not in pending_statuses:
        raise ValueError(f"Entered unexpected state {status}")
    print(".", end="")
    time.sleep(30)

## Analysing batch results

Per [the Comprehend Developer Guide](https://docs.aws.amazon.com/comprehend/latest/dg/how-class-run.html), output from classification jobs is a **compressed archive** containing a **single JSON-lines result file**.

We can download the result from S3 to this notebook, and extract the tarball:

In [None]:
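# The job description records the exact S3 location of the output archive:
output_uri = job_desc["DocumentClassificationJobProperties"]["OutputDataConfig"]["S3Uri"]
!aws s3 cp $output_uri ./data/comprehend/results/output.tar.gz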

!cd data/comprehend/results && tar -xvzf output.tar.gz
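
If shell commands are not available, the archive can be unpacked with Python's standard `tarfile` module instead. The sketch below uses a hypothetical helper, `extract_archive`, and demonstrates it on a small archive built in a temporary folder; the member name `example.jsonl` is made up, so list the real archive's contents rather than relying on it.

```python
import io
import os
import tarfile
import tempfile


def extract_archive(archive_path, dest_dir):
    """Extract a .tar.gz archive into dest_dir and return the member names."""
    os.makedirs(dest_dir, exist_ok=True)
    with tarfile.open(archive_path, "r:gz") as tar:
        names = tar.getnames()
        # Only extract archives you trust: tar members can contain unsafe paths.
        tar.extractall(dest_dir)
    return names


# Self-contained demo: build a small archive in a temp folder, then extract it.
workdir = tempfile.mkdtemp()
demo_archive = os.path.join(workdir, "output.tar.gz")
payload = b'{"Line": "0"}\n'
with tarfile.open(demo_archive, "w:gz") as tar:
    info = tarfile.TarInfo(name="example.jsonl")
    info.size = len(payload)
    tar.addfile(info, io.BytesIO(payload))

extracted = extract_archive(demo_archive, os.path.join(workdir, "results"))
print(extracted)
```

For the downloaded results, the call would be along the lines of `extract_archive("data/comprehend/results/output.tar.gz", "data/comprehend/results")`.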

Each line of this file is a JSON object corresponding to one of our input test documents, so for example we can read the results as follows:

In [None]:
with open("data/comprehend/results/output/output.jsonl", "r") as f:
    for ix, line in enumerate(f):
        res = json.loads(line)
        print(f"Doc result:\n{json.dumps(res, indent=2)}")

        if ix >= 1:
            break
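
To go beyond eyeballing the first couple of records, one option is to take the highest-scoring class for each document and tally the predicted labels. The sketch below assumes each record carries a `Classes` list of `{"Name", "Score"}` entries, as in multi-class mode; the helper names and the sample records are invented for illustration.

```python
import json
from collections import Counter


def top_class(result):
    """Return (name, score) of the highest-scoring class in one result record."""
    best = max(result["Classes"], key=lambda c: c["Score"])
    return best["Name"], best["Score"]


def count_predictions(jsonl_lines):
    """Count the top predicted label across an iterable of JSON-lines strings."""
    counts = Counter()
    for line in jsonl_lines:
        if not line.strip():
            continue  # tolerate blank lines
        name, _score = top_class(json.loads(line))
        counts[name] += 1
    return counts


# Invented records in the assumed shape, for illustration only:
sample_lines = [
    '{"File": "comprehend-test.csv", "Line": "0", "Classes": '
    '[{"Name": "SPORTS", "Score": 0.8}, {"Name": "HEALTH", "Score": 0.2}]}',
    '{"File": "comprehend-test.csv", "Line": "1", "Classes": '
    '[{"Name": "HEALTH", "Score": 0.6}, {"Name": "SPORTS", "Score": 0.4}]}',
    '{"File": "comprehend-test.csv", "Line": "2", "Classes": '
    '[{"Name": "SPORTS", "Score": 0.9}, {"Name": "HEALTH", "Score": 0.1}]}',
]
print(count_predictions(sample_lines))
```

Since an open file iterates line by line, the same function can be given the file handle from the cell above.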