**Post-Processing Amazon Textract with Location-Aware Transformers**

# Optional Extras

> *This notebook works well with the `Data Science 3.0 (Python 3)` kernel on SageMaker Studio - use the same as for NB1*

This notebook discusses optional extra/alternative steps separate from the typical pipeline setup flow. You won't typically need to run these steps unless specifically instructed.

## Common setup

First, as usual, we'll set up and import required libraries. You should run these cells regardless of which optional section(s) you're using:

In [None]:
!pip install sagemaker-studio-image-build "sagemaker>=2.87,<3"

In [None]:
%load_ext autoreload
%autoreload 2

# Python Built-Ins:
import json
from logging import getLogger
import os
import time

# External Dependencies:
import boto3  # General-purpose AWS SDK for Python
import sagemaker  # High-level Python SDK for Amazon SageMaker

# Local Dependencies:
import util

logger = getLogger()

# Configuration:
bucket_name = sagemaker.Session().default_bucket()
bucket_prefix = "textract-transformers/"
print(f"Working in bucket s3://{bucket_name}/{bucket_prefix}")
config = util.project.init("ocr-transformers-demo")
print(config)

# AWS service clients:
smclient = boto3.client("sagemaker")
ssm = boto3.client("ssm")

## Contents

The sections of this notebook are independent:

- **[Manual thumbnail generator setup](#Manual-thumbnail-generator-setup)**: Customise online page thumbnail generation endpoint
- **[Optimise costs with endpoint auto-scaling](#Optimise-costs-with-endpoint-auto-scaling)**: Configure your SageMaker endpoint(s) to auto-scale based on incoming request volume
- **[Experimenting with alternative OCR engines](#Experimenting-with-alternative-OCR-engines)**: Substitute Amazon Textract with open-source OCR tools, for use with unsupported languages

---

## Manual thumbnail generator setup

> This section walks through manually building and configuring the endpoint to generate resized page thumbnail images in real time.
>
> You may find it useful if you want to customise the container image or script used by this process, or if you deployed your pipeline without thumbnailing support but want to experiment with image-based models from notebooks.
>
> ⚠️ **Note:** Deploying and registering a thumbnailing endpoint from the notebook will still not turn on thumbnail generation in a pipeline deployed without support for it. Instead, refer to your CDK app parameters to ensure the pipeline state machine gets updated to include a thumbnail generation step.

### Build and register custom container image

The tools we use to read PDF files aren't installed by default in the pre-built SageMaker containers and aren't `pip install`able, so the thumbnail generator will need a custom container image. Deriving this image from an existing AWS Deep Learning Container (DLC) for serving minimises boilerplate code, because a SageMaker-compatible serving stack is already included.

Because SageMaker Studio kernels are already containerized, you won't be able to run the typical `docker build` commands you may be used to. Instead, we'll use the [SageMaker Studio Image Build CLI](https://github.com/aws-samples/sagemaker-studio-image-build-cli) to build the image and store it in your account's [Amazon Elastic Container Registry (ECR)](https://aws.amazon.com/ecr/):

In [None]:
# Configurations:
preproc_ecr_repo_name = "sm-ocr-preproc"
preproc_ecr_image_tag = "pytorch-1.10-inf-cpu"

preproc_framework_version = "1.10"
preproc_py_version = "py38"

base_image_uri = sagemaker.image_uris.retrieve(
    framework="pytorch",
    region=os.environ["AWS_REGION"],
    instance_type="ml.c5.xlarge",  # (Just used to check whether GPUs/accelerators are used)
    py_version=preproc_py_version,
    image_scope="inference",  # Inference base because we'll also deploy as an endpoint later
    version=preproc_framework_version,
)

# Combine together into the final URI (not needed for the build, but used later in the notebook):
account_id = sagemaker.Session().account_id()
region = os.environ["AWS_REGION"]
preproc_ecr_image_uri = "{}.dkr.ecr.{}.amazonaws.com/{}:{}".format(
    account_id, region, preproc_ecr_repo_name, preproc_ecr_image_tag
)
print(f"Will build to {preproc_ecr_image_uri}")

In [None]:
%%time
# (No need to re-run this cell if your image is already in ECR)

# Actually build & push the container image:
!cd custom-containers/preproc && sm-docker build . \
    --repository {preproc_ecr_repo_name}:{preproc_ecr_image_tag} \
    --role {config.sm_image_build_role} \
    --build-arg BASE_IMAGE={base_image_uri}

In [None]:
# Check from notebook whether the image was successfully created:
ecr = boto3.client("ecr")
imgs_desc = ecr.describe_images(
    registryId=account_id,
    repositoryName=preproc_ecr_repo_name,
    imageIds=[{"imageTag": preproc_ecr_image_tag}],
)
assert len(imgs_desc["imageDetails"]) > 0, "Couldn't find ECR image {} after build".format(
    preproc_ecr_image_uri
)

### Deploy and test the thumbnailer endpoint

Because the custom image is based on the standard SageMaker PyTorch inference container, our [preproc/preproc.py](preproc/preproc.py) script can [work with the existing serving stack](https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/using_pytorch.html#id3) by exposing custom `model_fn`, `input_fn`, `predict_fn`, and/or `output_fn` functions.

We'll bundle the scripts into a `.tar.gz` file in the format the PyTorch container expects, with inference code in a `code/` subfolder.

Normally this process (and the setting of the `SAGEMAKER_PROGRAM` and `SAGEMAKER_SUBMIT_DIRECTORY` environment variables) is handled automatically by the `PyTorchModel` - which allows "re-packing" the tarball from a training job to create a new tarball with new `source_dir` and `entry_point` scripts. In this case though, we don't need such a two-step process because there's no training artifact to start from and no actual "model" in this tarball - PyTorch or otherwise. Our script just defines code to extract and resize page images, and a dummy `model_fn` so the endpoint won't crash from failing to find a model.
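
To illustrate the archive layout described above, here's a minimal sketch using only the Python standard library. The `tar_inference_code` function and the throwaway demo folder are hypothetical stand-ins for illustration: the notebook itself uses the `util.deployment.tar_as_inference_code` helper, whose implementation may differ.

```python
import tarfile
import tempfile
from pathlib import Path


def tar_inference_code(source_dir: str, output_path: str) -> str:
    """Bundle every file under `source_dir` into `output_path`, beneath a `code/` prefix"""
    source = Path(source_dir)
    Path(output_path).parent.mkdir(parents=True, exist_ok=True)
    with tarfile.open(output_path, "w:gz") as tar:
        for path in sorted(source.rglob("*")):
            if path.is_file():
                # Store e.g. preproc/preproc.py as code/preproc.py inside the archive:
                tar.add(path, arcname="code/" + path.relative_to(source).as_posix())
    return output_path


# Example with a throwaway folder standing in for the real `preproc/` scripts:
with tempfile.TemporaryDirectory() as tmp:
    demo_src = Path(tmp) / "preproc"
    demo_src.mkdir()
    (demo_src / "preproc.py").write_text("def model_fn(model_dir):\n    return None\n")
    demo_tar = tar_inference_code(str(demo_src), str(Path(tmp) / "out" / "model.tar.gz"))
    with tarfile.open(demo_tar) as tar:
        demo_members = tar.getnames()
print(demo_members)
```

Only files are added (no separate directory entries), so the archive listing shows each script directly under `code/`.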

In [None]:
# Compress the archive locally and list the compressed contents:
preproc_model_path = util.deployment.tar_as_inference_code("preproc", "data/preproc-model.tar.gz")
print(f"(Re)-created {preproc_model_path}")
!tar -ztvf {preproc_model_path}
print()

# Upload to S3:
preproc_model_key = "".join((
    bucket_prefix,
    "preproc-model/",
    util.uid.append_timestamp("model"),  # (Maintain history in S3)
    ".tar.gz"
))
preproc_model_s3uri = f"s3://{bucket_name}/{preproc_model_key}"
!aws s3 cp {preproc_model_path} {preproc_model_s3uri}

Once a `model.tar.gz` is available on S3, we're ready to create and deploy a SageMaker "Model" and Endpoint.

In [None]:
from sagemaker.pytorch import PyTorchModel

if config.thumbnails_callback_topic_arn.startswith("arn:"):
    async_notification_config = {
        "SuccessTopic": config.thumbnails_callback_topic_arn,
        "ErrorTopic": config.thumbnails_callback_topic_arn,
    }
else:
    logger.warning("Pipeline stack deployed without thumbnailing callback topic")
    async_notification_config = {}


class PatchedPyTorchModel(PyTorchModel):
    """Modified PyTorchModel to allow manually setting SM Script Mode environment vars

    See: https://github.com/aws/sagemaker-python-sdk/issues/3361
    """

    def prepare_container_def(self, *args, **kwargs):
        # Call the parent function:
        result = super().prepare_container_def(*args, **kwargs)
        # ...But allow our manual env vars configuration to override the internals:
        manual_env = dict(self.env)
        result["Environment"].update(manual_env)
        return result


preproc_model = PatchedPyTorchModel(
    name=util.uid.append_timestamp("ocr-thumbnail"),
    model_data=preproc_model_s3uri,
    entry_point=None,  # Set manually via tarball and SAGEMAKER_PROGRAM
    framework_version="1.10",
    py_version="py38",
    image_uri=preproc_ecr_image_uri,
    role=sagemaker.get_execution_role(),
    env={
        "PYTHONUNBUFFERED": "1",
        "MMS_MAX_REQUEST_SIZE": str(100*1024*1024),  # 100MiB instead of default ~6.2MiB
        "MMS_MAX_RESPONSE_SIZE": str(100*1024*1024),  # 100MiB instead of default ~6.2MiB
        "SAGEMAKER_PROGRAM": "preproc.py",
    },
)

preproc_predictor = preproc_model.deploy(
    initial_instance_count=1,
    instance_type="ml.m5.xlarge",
    async_inference_config=sagemaker.async_inference.AsyncInferenceConfig(
        output_path=f"s3://{config.model_results_bucket}/preproc",
        max_concurrent_invocations_per_instance=2,
        notification_config=async_notification_config,
    ),
)

This endpoint accepts images or documents and outputs resized page thumbnail images.

For multi-page documents the main output format is `application/x-npz`, which produces a [compressed numpy archive](https://numpy.org/doc/stable/reference/generated/numpy.savez_compressed.html#numpy.savez_compressed) in which `images` is an **array of images** each represented by **PNG bytes**. These formats require customizing the client (predictor) *serializer* and *deserializer* from the default for PyTorch. Since `Predictor` de/serializers set the `Content-Type` and `Accept` headers, we'll also need to re-configure the serializer whenever switching between input document types (for example PDF vs PNG).
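
As a rough illustration of why the serializer depends on the input file, the sketch below infers a `Content-Type` from a filename using Python's built-in `mimetypes` module. The `guess_content_type` function is hypothetical, and isn't necessarily how `util.deployment.FileSerializer.from_filename` works internally.

```python
import mimetypes


def guess_content_type(filename: str) -> str:
    """Return a MIME type for `filename` based on its extension"""
    content_type, _ = mimetypes.guess_type(filename)
    if content_type is None:
        raise ValueError(f"Couldn't infer a content type from filename: {filename}")
    return content_type


for name in ("Visa Credit Card Agreement.pdf", "Visa Credit Card Agreement-0001-1.png"):
    print(name, "->", guess_content_type(name))
```

Because a PDF and a PNG map to different content types, a serializer configured for one would send the wrong header for the other.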

To support potentially large documents, the preprocessor is deployed to an **asynchronous** endpoint which enables larger request and response payload sizes.

So how would it look to test the endpoint from Python? Let's see an example:

In [None]:
%%time

# Choose an input (document or image):
input_file = "data/raw/121 Financial Credit Union/Visa Credit Card Agreement.pdf"
#input_file = "data/imgs-clean/121 Financial Credit Union/Visa Credit Card Agreement-0001-1.png"

# Ensure de/serializers are correctly set up:
preproc_predictor.serializer = util.deployment.FileSerializer.from_filename(input_file)
preproc_predictor.deserializer = util.deployment.CompressedNumpyDeserializer()
# Duplication because of https://github.com/aws/sagemaker-python-sdk/issues/3100
preproc_predictor.predictor.serializer = preproc_predictor.serializer
preproc_predictor.predictor.deserializer = preproc_predictor.deserializer

# Run prediction:
print("Calling endpoint...")
resp = preproc_predictor.predict(input_file)
print(f"Got response of type {type(resp)}")

# Render result:
util.viz.draw_thumbnails_response(resp)

### Connect thumbnailer to the deployed processing pipeline

Once your thumbnailer endpoint is deployed and working, you can connect it into your document processing pipeline via SSM parameter configuration - just like the main enrichment model. This will only have an effect if your pipeline was already deployed with thumbnailing enabled, so the cell below will first check whether that seems to be the case.

In [None]:
if config.thumbnails_callback_topic_arn == "undefined":
    raise ValueError(
        "This pipeline CDK stack was deployed with thumbnailing disabled (by setting parameter "
        "use_thumbnails=False). Either redeploy the CDK stack with updated settings to enable "
        "thumbnailing, or continue without (and consider deleting the thumbnailing endpoint you "
        "created, to save unnecessary cost)."
    )

thumbnail_endpoint_name = preproc_predictor.endpoint_name
print(f"Configuring pipeline with thumbnailer: {thumbnail_endpoint_name}")

ssm.put_parameter(
    Name=config.thumbnail_endpoint_name_param,
    Overwrite=True,
    Value=thumbnail_endpoint_name,
)

### Clean up experimental models

Clean up any endpoints you created that are no longer required, to free up resources and avoid unnecessary ongoing costs. As well as the endpoint itself, remember to delete its associated endpoint configuration and model records. You may also like to clean up the `preproc-model/` S3 folder to remove any old draft versions.

> ⚠️ **Note:** If you delete the active endpoint/model your deployed pipeline is configured to use for thumbnailing, your pipeline will fail to process new documents.
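
One way to delete an endpoint together with its configuration and model records is sketched below, using the low-level SageMaker boto3 client. The `delete_endpoint_resources` helper is an illustrative assumption rather than part of this sample's `util` package, and it assumes every model referenced by the endpoint is dedicated to it (so safe to delete).

```python
def delete_endpoint_resources(sm_client, endpoint_name: str) -> dict:
    """Delete a SageMaker endpoint plus its endpoint config and model record(s)

    `sm_client` is expected to be a `boto3.client("sagemaker")`, like `smclient` above.
    """
    # Look up what the endpoint refers to *before* deleting anything:
    config_name = sm_client.describe_endpoint(EndpointName=endpoint_name)["EndpointConfigName"]
    variants = sm_client.describe_endpoint_config(EndpointConfigName=config_name)[
        "ProductionVariants"
    ]
    model_names = sorted({v["ModelName"] for v in variants if "ModelName" in v})

    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=config_name)
    for model_name in model_names:
        sm_client.delete_model(ModelName=model_name)
    return {"endpoint": endpoint_name, "endpoint_config": config_name, "models": model_names}


# Uncomment to delete the thumbnailer endpoint deployed earlier in this section:
# print(delete_endpoint_resources(smclient, preproc_predictor.endpoint_name))
```

The call is left commented out so the cell can't delete anything by accident when running the whole notebook.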

---

*[Back to contents](#Contents)*

## Optimise costs with endpoint auto-scaling

> This section demonstrates how you can enable and customise auto-scaling on your SageMaker endpoints to optimise resource use and cost.
>
> **Note:** For endpoints automatically deployed by the pipeline stack (such as the thumbnail generator), there are options available to configure this directly in CDK - which you may prefer.

SageMaker Async Inference endpoints support [auto-scaling down to zero instances](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference-autoscale.html) when not in use, which can provide significant cost-savings for use cases where document processing is occasional and the pipeline is often idle.

⏰ **However:** You should be aware that enabling scale-to-zero can introduce cold-start delays of **several minutes** if requests arrive when all instances backing your endpoint have been shut down.

### Setting up auto-scaling

You can configure auto-scaling for your endpoint(s) by first registering them with the [application auto-scaling service](https://docs.aws.amazon.com/autoscaling/application/userguide/what-is-application-auto-scaling.html) and then applying a scaling policy as shown in the following cells.

First, configure which SageMaker endpoint you want to auto-scale by name. SageMaker endpoints may be backed by multiple [variants](https://docs.aws.amazon.com/sagemaker/latest/dg/model-ab-testing.html) which can scale independently, but this sample typically only uses the default "AllTraffic" variant.

In [None]:
# For example, maybe you want to configure whichever enrichment model is currently in pipeline:
endpoint_name = ssm.get_parameter(
    Name=config.sagemaker_endpoint_name_param,
)["Parameter"]["Value"]

# Default variant name unless you know otherwise:
variant_name = "AllTraffic"

print(f"Configuring endpoint name:\n  {endpoint_name}")
print(f"Configuring variant name:\n  {variant_name}")

resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"
print(f"\nAuto-scaling resource ID:\n  {resource_id}")

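In [None]:
# Alternatively, set the endpoint manually - for example to target a thumbnailer endpoint.
# (Replace the example name below with your own, and keep `resource_id` in sync)
# endpoint_name = "ocr-thumbnail-2022-10-14-03-37-58-529"
# variant_name = "AllTraffic"
# resource_id = f"endpoint/{endpoint_name}/variant/{variant_name}"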

From your endpoint and variant name, register a scalable target to configure overall limits:

In [None]:
appscaling = boto3.client("application-autoscaling")

# Define and register your endpoint variant
appscaling.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=0,  # (MinCapacity 0 not supported with real-time endpoints)
    MaxCapacity=5,
)
print(f"Endpoint registered with auto-scaling service: {endpoint_name}")

We can also list any scaling policies that may already be active on this resource:

In [None]:
appscaling.describe_scaling_policies(ResourceId=resource_id, ServiceNamespace="sagemaker")
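
The raw response can be quite verbose. As an optional convenience, a small helper like the hypothetical `summarize_scaling_policies` below could reduce it to each policy's name, type, and attached CloudWatch alarms. The `sample_response` is made-up data for illustration, trimmed to just the fields the helper reads.

```python
def summarize_scaling_policies(response: dict) -> list:
    """Reduce a `describe_scaling_policies` response to (name, type, alarm names) tuples"""
    return [
        (
            policy["PolicyName"],
            policy["PolicyType"],
            [alarm["AlarmName"] for alarm in policy.get("Alarms", [])],
        )
        for policy in response.get("ScalingPolicies", [])
    ]


# Made-up example response (real responses contain many more fields):
sample_response = {
    "ScalingPolicies": [
        {
            "PolicyName": "BacklogTargetTracking",
            "PolicyType": "TargetTrackingScaling",
            "Alarms": [{"AlarmName": "example-alarm-high", "AlarmARN": "arn:example"}],
        },
        {"PolicyName": "BootstrapSingleRequests", "PolicyType": "StepScaling"},
    ]
}
policy_summary = summarize_scaling_policies(sample_response)
for row in policy_summary:
    print(row)
```

In the notebook, you could pass the result of the `appscaling.describe_scaling_policies(...)` call above straight into this function. A step scaling policy with no alarms listed has nothing to trigger it.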

As discussed in the [SageMaker Asynchronous Inference Developer Guide](https://docs.aws.amazon.com/sagemaker/latest/dg/async-inference-autoscale.html), the typical recommended scaling policy for asynchronous endpoints is to track a target on the number of queued requests per active instance - `ApproximateBacklogSizePerInstance`.

However, ⚠️ setting this target value `>=1.0` can yield **un-bounded latency** for single requests arriving when the endpoint has scaled down to 0 instances - because scale-out will not be triggered until a big enough queue has formed.

You can **combine multiple policies** to set up backlog target tracking but also ensure at least one instance gets started when any requests are in queue, using the alternative `HasBacklogWithoutCapacity` metric:

In [None]:
# Main backlog-per-instance target tracking policy:
scaling_policy_resp = appscaling.put_scaling_policy(
    PolicyName="BacklogTargetTracking",
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="TargetTrackingScaling",
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 4.0,
        "CustomizedMetricSpecification": {
            "MetricName": "ApproximateBacklogSizePerInstance",
            "Namespace": "AWS/SageMaker",
            "Dimensions": [
                {"Name": "EndpointName", "Value": endpoint_name},
            ],
            "Statistic": "Average",
        },
        "ScaleInCooldown": 5 * 60,  # (seconds)
        "ScaleOutCooldown": 4 * 60,  # (seconds)
    },
)
print(f"Created/updated scaling policy ARN:\n{scaling_policy_resp['PolicyARN']}")

In [None]:
# Extra policy to ensure one-off requests get processed promptly:
scaling_policy_resp = appscaling.put_scaling_policy(
    PolicyName="BootstrapSingleRequests",
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    PolicyType="StepScaling",
    StepScalingPolicyConfiguration={
        "AdjustmentType": "ChangeInCapacity",
        "StepAdjustments": [{"MetricIntervalLowerBound": 1.0, "ScalingAdjustment": +1}],
        "Cooldown": 150,  # (Seconds)
        "MetricAggregationType": "Average",
    },
)
print(f"Created/updated scaling policy ARN:\n{scaling_policy_resp['PolicyARN']}")
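
Unlike target tracking, a step scaling policy doesn't create its own CloudWatch alarm: it only takes effect once an alarm lists the policy ARN as an action. A minimal sketch of the alarm parameters is shown below, loosely following the example in the Asynchronous Inference guide linked above. The `backlog_alarm_kwargs` helper and the alarm naming scheme are assumptions for illustration, and the period and evaluation settings are starting points to tune.

```python
def backlog_alarm_kwargs(endpoint_name: str, policy_arn: str, threshold: float = 1.0) -> dict:
    """Build CloudWatch `put_metric_alarm` arguments to trigger a step scaling policy"""
    return {
        "AlarmName": f"{endpoint_name}-HasBacklogWithoutCapacity",  # (Hypothetical naming)
        "Namespace": "AWS/SageMaker",
        "MetricName": "HasBacklogWithoutCapacity",
        "Dimensions": [{"Name": "EndpointName", "Value": endpoint_name}],
        "Statistic": "Average",
        "Period": 60,  # (seconds)
        "EvaluationPeriods": 2,
        "DatapointsToAlarm": 2,
        "Threshold": threshold,
        "ComparisonOperator": "GreaterThanOrEqualToThreshold",
        "TreatMissingData": "missing",
        "AlarmActions": [policy_arn],  # Fire the step scaling policy when in alarm
    }


# Uncomment to create the alarm for the step scaling policy registered above:
# boto3.client("cloudwatch").put_metric_alarm(
#     **backlog_alarm_kwargs(endpoint_name, scaling_policy_resp["PolicyARN"])
# )
```

Step adjustment bounds are measured as offsets from the alarm threshold, so check that the `MetricIntervalLowerBound` in your step policy is consistent with the `Threshold` you choose: the AWS guide's example pairs a threshold of 1 with a lower bound of 0.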

Your endpoint should now be set up to auto-scale. To check, open the detail page for your target endpoint in the [Endpoints section of the SageMaker Console](https://console.aws.amazon.com/sagemaker/home?#/endpoints).

### Disabling auto-scaling

If you'd like to de-register an endpoint from auto-scaling, you can delete attached policies and de-register the target as shown below:

In [None]:
policies = appscaling.describe_scaling_policies(
    ResourceId=resource_id,
    ServiceNamespace="sagemaker",
)["ScalingPolicies"]

print(f"Deleting scaling policies for {resource_id}:")
time.sleep(3)

for policy in policies:
    appscaling.delete_scaling_policy(
        PolicyName=policy["PolicyName"],
        ServiceNamespace=policy["ServiceNamespace"],
        ResourceId=policy["ResourceId"],
        ScalableDimension=policy["ScalableDimension"],
    )
    print(f" - {policy['PolicyName']}")
print("\nDone")

In [None]:
print(f"De-registering from auto-scaling:\n  {resource_id}")
time.sleep(3)

appscaling.deregister_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
)
print("Done")

---

*[Back to contents](#Contents)*

## Experimenting with alternative OCR engines

> This section demonstrates how to process a batch of documents using alternative, open-source-based OCR engines on Amazon SageMaker - in case you have a use case requiring languages not yet supported by Amazon Textract.

As detailed further in the [Customization Guide](../CUSTOMIZATION_GUIDE.md), you can use alternative, open-source-based OCR engines with this solution if needed. To do so, package them to produce Amazon Textract-compatible result formats and integrate them with the pipeline. The pipeline integration uses Amazon SageMaker Asynchronous Inference, for consistency with other steps.

First, (re)-deploy your solution with the `BUILD_SM_OCRS` variable set, to create container image(s) and SageMaker model(s) for your chosen OCR engine(s).

Because resource tags are automatically added to these deployed models, you'll be able to look them up by engine name using the code below. For example, `ocr_engine_name=tesseract` in the notebook assumes `BUILD_SM_OCRS=...,tesseract,...` at CDK deploy time:

In [None]:
ocr_engine_name = "tesseract"
ocr_model_desc = util.ocr.describe_sagemaker_ocr_model(ocr_engine_name)

print(f"Found OCR engine {ocr_engine_name}:\n  {ocr_model_desc['ModelName']}")

ocr_image_uri = ocr_model_desc["PrimaryContainer"]["Image"]
print(f"\nImage: {ocr_image_uri}")
ocr_environment = ocr_model_desc["PrimaryContainer"]["Environment"]
print(f"Environment variables:\n{ocr_environment}")

Just like with batch page image generation in notebook 1, we'll use a SageMaker Processing Job to run the work on a scalable cluster of instances. The input document locations are specified the same way as for page image generation, so the code below takes the whole corpus (S3 prefix) for simplicity.

> ⏰ If you'd like to select **just a subset of documents**, you can instead set `ocr_inputs` using the same manifest-based "OPTION 2" approach shown to set `preproc_inputs` in the *Extract clean input images* section of [Notebook 1](1.%20Data%20Preparation.ipynb).

In [None]:
from sagemaker.processing import FrameworkProcessor, ProcessingInput, ProcessingOutput
from util.preproc import DummyFramework

# S3 input & output locations:
raw_s3uri = f"s3://{bucket_name}/{bucket_prefix}data/raw"
textract_s3uri = f"s3://{bucket_name}/{bucket_prefix}data/textracted"

# (Assuming whole corpus - see NB1 image pre-processing for manifest-based example)
ocr_inputs = [
    ProcessingInput(
        destination="/opt/ml/processing/input/raw",  # Expected input location, per our script
        input_name="raw",
        s3_data_distribution_type="ShardedByS3Key",  # Distribute between instances, if multiple
        source=raw_s3uri,  # S3 prefix for full raw document collection
    ),
]

After defining the input and output locations, and with our pre-prepared container image identified, we're ready to run the job.

> ⏰ In our tests, the provided Tesseract OCR integration took around 35 minutes on 5x `ml.c5.4xlarge` instances, to process the full ~2,500 document credit cards corpus for English and Thai.
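
For a rough idea of how these figures might translate to your own corpus, the sketch below derives a per-instance throughput from the test above and scales it linearly. This is a back-of-envelope assumption only: it ignores job start-up time, uneven document lengths, and differences between languages, and the `estimate_job_minutes` function is hypothetical.

```python
def estimate_job_minutes(n_docs: int, n_instances: int, docs_per_instance_minute: float) -> float:
    """Estimate job duration, assuming throughput scales linearly with instance count"""
    return n_docs / (n_instances * docs_per_instance_minute)


# Throughput implied by the test quoted above (~2,500 docs in ~35 mins on 5 instances):
observed_rate = 2500 / (35 * 5)  # Documents per instance, per minute
print(f"Observed rate: {observed_rate:.1f} docs per instance-minute")

# Hypothetical scenario: the same corpus on 10 instances
print(f"Estimated duration: {estimate_job_minutes(2500, 10, observed_rate):.1f} minutes")
```

If an estimate approaches the job's `max_runtime_in_seconds`, consider adding instances or raising the limit.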

In [None]:
%%time

processor = FrameworkProcessor(
    estimator_cls=DummyFramework,
    image_uri=ocr_image_uri,  # As created above
    framework_version=None,
    base_job_name="ocr-custom",
    role=sagemaker.get_execution_role(),
    instance_count=5,
    instance_type="ml.c5.4xlarge",
    volume_size_in_gb=16,
    max_runtime_in_seconds=60*60,
    env={
        "OMP_THREAD_LIMIT": "1",  # Optimize Tesseract parallelism for batch
        "PYTHONUNBUFFERED": "1",  # For debugging
        **ocr_environment,
        # Override defaults from the model env vars like this:
        "OCR_DEFAULT_LANGUAGES": "eng,tha",
    },
)

processor.run(
    code="ocr.py",  # OCR script
    source_dir="preproc",
    inputs=ocr_inputs[:],  # Either whole corpus or sample, as above
    outputs=[
        ProcessingOutput(
            destination=textract_s3uri,
            output_name="ocr",
            s3_upload_mode="Continuous",
            source="/opt/ml/processing/output/ocr",  # Output folder, per our script
        ),
    ],
    #logs=False,
    #wait=False,
)

Once the job is complete, you can crawl the results on Amazon S3 to build up an equivalent manifest ready for the next stage of data preparation:

In [None]:
# Given that raw docs live under some S3 prefix:
raw_s3uri_prefix = raw_s3uri

# ...And Amazon Textract results live under another:
textract_s3uri = f"s3://{bucket_name}/{bucket_prefix}data/textracted"


# ...And you can define a mapping from one to the other:
def doc_uri_to_textract_uri(doc_uri: str) -> str:
    if not doc_uri.startswith(raw_s3uri_prefix):
        raise ValueError(
            "Document S3 URI '%s' did not start with expected prefix: '%s'"
            % (doc_uri, raw_s3uri_prefix)
        )
    # Replace raw prefix with Textract prefix, and add "/consolidated.json" to filename:
    return textract_s3uri + doc_uri[len(raw_s3uri_prefix):] + "/consolidated.json"

# Then build up the combined manifest, checking existence for each result:
out_filename = "data/textracted-all-smocr.manifest.jsonl"
print(f"Building manifest: {out_filename} ...")
with open("data/raw-sample.manifest.jsonl") as fin:
    with open(out_filename, "w") as fout:
        for doc in (json.loads(line) for line in fin):
            textract_uri = doc_uri_to_textract_uri(doc["raw-ref"])
            if not util.s3.s3_object_exists(textract_uri):
                raise ValueError(
                    "Mapped OCR result URI does not exist in S3.\nFor: %s\nGot: %s"
                    % (doc["raw-ref"], textract_uri)
                )
            doc["textract-ref"] = textract_uri
            fout.write(json.dumps(doc) + "\n")
print("Done!")
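
To make the mapping concrete, here's a standalone version of the same prefix substitution applied to one document. The bucket name `example-bucket` is a placeholder, and `map_doc_uri` is just a parameterised restatement of `doc_uri_to_textract_uri` for illustration.

```python
def map_doc_uri(doc_uri: str, raw_prefix: str, ocr_prefix: str) -> str:
    """Map a raw document S3 URI to the expected location of its consolidated OCR result"""
    if not doc_uri.startswith(raw_prefix):
        raise ValueError(
            "Document S3 URI '%s' did not start with expected prefix: '%s'"
            % (doc_uri, raw_prefix)
        )
    # Swap the prefix, keep the relative path, and append the result filename:
    return ocr_prefix + doc_uri[len(raw_prefix):] + "/consolidated.json"


example_uri = map_doc_uri(
    "s3://example-bucket/data/raw/121 Financial Credit Union/Visa Credit Card Agreement.pdf",
    raw_prefix="s3://example-bucket/data/raw",
    ocr_prefix="s3://example-bucket/data/textracted",
)
print(example_uri)
```

Note that the original filename (including its `.pdf` extension) becomes a folder name in the result URI, with `consolidated.json` inside it.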

The above steps demonstrate how to process documents in batch with alternative, open-source OCR engines, to produce datasets ready for experimenting with multi-lingual model architectures like LayoutXLM. To actually deploy the alternative OCR into your document pipeline, use the `DEPLOY_SM_OCR` and `USE_SM_OCR` variables at CDK deployment. You'll likely want to update `OCR_DEFAULT_LANGUAGES` in [/pipeline/ocr/sagemaker_ocr.py](../pipeline/ocr/sagemaker_ocr.py) to align with your use case's language needs.

---

*[Back to contents](#Contents)*