# Multimodal Batch Inference on Amazon Bedrock

This notebook is based on the original amazon-bedrock-samples notebook: <https://github.com/aws-samples/amazon-bedrock-samples/blob/main/introduction-to-bedrock/batch_api/batch-inference-transcript-summarization.ipynb>


## Install required packages


In [None]:
!python -m pip install pip -Uq
!python -m pip install boto3 botocore -Uq

In [None]:
# Restart kernel
from IPython.display import HTML

HTML("<script>Jupyter.notebook.kernel.restart()</script>")

## Configuration


In [None]:
import base64
import json
import logging
import os
import time
from datetime import datetime

import boto3

In [None]:
# Setup logging
logger = logging.getLogger(__name__)
logging.basicConfig(
    level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)

# Create Bedrock client for batch inference job
bedrock = boto3.client(service_name="bedrock")

# Create S3 client
s3 = boto3.client("s3")

# Constants (CHANGE ME!)
ROLE_ARN = "<your_role_arn>"
BUCKET_NAME = "<your_bucket_name>"

MODEL_ID = "anthropic.claude-3-5-sonnet-20240620-v1:0"

INPUT_PREFIX = "batch_inference_data"
OUTPUT_PREFIX = "job_outputs"
BATCH_FILE_PREFIX = "batch-image-descriptions"
LOCAL_IMAGE_DIRECTORY = "images"
LOCAL_BATCH_DATA = "batch_inference_data"

# Prompts
SYSTEM_PROMPT = """You are an expert food writer and culinary storyteller specializing in vivid, sensory-rich descriptions of food imagery. 
With your deep knowledge of global cuisines, cooking techniques, and food photography, you craft compelling narratives that bring dishes to life through precise, evocative language. 
You balance technical accuracy with artistic flair, incorporating details about texture, color, composition, and presentation while making each description uniquely engaging. 
Your writing style is warm and accessible while maintaining professional expertise."""

USER_PROMPT_TITLE = (
    "Create a compelling title of 7-10 words for this image."
)

USER_PROMPT_DESCRIPTION = "Write a brief description for this image."

USER_PROMPT_KEYWORDS = """Generate a list of 15-20 descriptive tags or short phrases that capture key visual elements of this image. 
Consider all aspects of the image, including visual content, colors, mood, style, and composition. Your output should be a comma-delimited list.

Format your response as a single line of comma-separated tags, ordered from most to least prominent. Do not use numbering or bullet points. Do not end the list with a period.

Example output:
sunlit forest, vibrant green foliage, misty atmosphere, dappled light, towering trees, forest floor, earthy tones, morning dew, 
tranquil mood, nature photography, depth of field, vertical composition, organic patterns, woodland creatures, biodiversity, 
environmental theme, soft focus background, wide-angle shot, seasonal change, ethereal quality"""

## Prepare data for the batch inference


### Formatting input data

The input data should be in JSONL format, with each line representing a single image-description request. Each line in your JSONL file should follow this structure:

```json
{"recordId": "11 character alphanumeric string", "modelInput": {JSON body}}
```

Here, `recordId` is an 11-character alphanumeric string that uniquely identifies each entry. If you omit this field, the batch inference job adds it automatically in the output.

The format of the `modelInput` JSON object should match the `body` field of the `InvokeModel` request for the model you are using. For example, if you're using the Anthropic Claude 3.5 Sonnet model on Amazon Bedrock, you should use the Messages API, and your model input might look like the following:

```json
{
  "recordId": "IMG00000001",
  "modelInput": {
    "anthropic_version": "bedrock-2023-05-31",
    "system": "You are an expert at writing descriptions of images.",
    "max_tokens": 1024,
    "temperature": 0.3,
    "top_p": 0.1,
    "top_k": 100,
    "messages": [
      {
        "role": "user",
        "content": [
          {
            "type": "text",
            "text": "Write a description for this image."
          },
          {
            "type": "image",
            "source": {
              "type": "base64",
              "media_type": "image/jpeg",
              "data": "/9j/4AAQSkZJRgABAQEASABIAAD/4gIcSUNDX1BST0Z..."
            }
          }
        ]
      }
    ]
  }
}
```
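
Before uploading a batch file, it can help to check each line against the record structure described above. The following is a minimal sketch, not part of the original notebook: `validate_record` and `RECORD_ID_PATTERN` are hypothetical names, and the checks cover only the two top-level fields, not the model-specific body.

```python
import json
import re

# Assumption: record IDs are exactly 11 alphanumeric characters, as described above.
RECORD_ID_PATTERN = re.compile(r"[A-Za-z0-9]{11}")


def validate_record(line: str) -> list[str]:
    """Return a list of problems found in one JSONL line (empty if none)."""
    problems = []
    try:
        record = json.loads(line)
    except json.JSONDecodeError as exc:
        return [f"not valid JSON: {exc.msg}"]

    if not isinstance(record, dict):
        return ["top-level value is not a JSON object"]

    record_id = record.get("recordId")
    # recordId is optional, so only check its shape when it is present.
    if record_id is not None and not (
        isinstance(record_id, str) and RECORD_ID_PATTERN.fullmatch(record_id)
    ):
        problems.append("recordId is not an 11-character alphanumeric string")

    if not isinstance(record.get("modelInput"), dict):
        problems.append("modelInput is missing or not a JSON object")

    return problems
```

An empty list means the line passed these basic checks; it does not guarantee that the model accepts the request body.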


### Generating model inputs

The `prepare_model_input` function reads an image file from the local image directory, encodes it as base64, generates a unique record ID, and prepares the model input according to the Anthropic Claude 3.5 Messages API format.


In [None]:
class IDGenerator:
    def __init__(self, prefix="IMG", padding=8, start=1):
        """
        Initialize the ID Generator

        Args:
            prefix (str): Prefix for the ID (default: "IMG")
            padding (int): Number of digits to pad with zeros (default: 8)
            start (int): Starting number for the counter (default: 1)
        """
        self.prefix = prefix
        self.padding = padding
        self.counter = start

    def get_next_id(self) -> str:
        """Generate the next ID in sequence"""
        # Format the number with leading zeros
        number = str(self.counter).zfill(self.padding)
        # Create the ID by combining prefix and padded number
        id_string = f"{self.prefix}{number}"
        # Increment the counter
        self.counter += 1
        return id_string

    def reset_counter(self, value=1) -> None:
        """Reset the counter to a specific value"""
        self.counter = value

    def get_current_counter(self) -> int:
        """Get the current counter value"""
        return self.counter

In [None]:
def encode_image(image_path) -> str:
    """
    Encodes an image from the given file path to a base64 string.
    Args:
        image_path (str): The file path to the image to be encoded.
    Returns:
        str: The base64 encoded string representation of the image.
    """

    with open(image_path, "rb") as image_file:
        image_data = image_file.read()
        encoded_string = base64.b64encode(image_data)
        return encoded_string.decode("utf-8")

In [None]:
# Instantiate an instance of the class with a prefix of "IMG" and padding of 8 digits
id_gen = IDGenerator(prefix="IMG", padding=8, start=1)

In [None]:
def prepare_model_input(image_path) -> dict:
    """
    Prepares the input for the Anthropic Claude 3.5 model by creating a request body
    that includes a text prompt and an image encoded in base64 format.
    Args:
        image_path (str): The file path to the image that needs to be described.
    Returns:
        dict: A dictionary containing the record ID and the model input formatted
              for the Anthropic Claude 3.5 model.
    """

    body = {
        "anthropic_version": "bedrock-2023-05-31",
        "system": SYSTEM_PROMPT,
        "max_tokens": 512,
        "temperature": 0.3,
        "top_p": 0.1,
        "top_k": 100,
        "messages": [
            {
                "role": "user",
                "content": [
                    {
                        "type": "text",
                        "text": USER_PROMPT_DESCRIPTION,
                    },
                    {
                        "type": "image",
                        "source": {
                            "type": "base64",
                            "media_type": "image/jpeg",
                            "data": encode_image(image_path),
                        },
                    },
                ],
            }
        ],
    }

    # Prepare the model input
    model_input = {"recordId": id_gen.get_next_id(), "modelInput": body}

    return model_input
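
The request body in this notebook hard-codes `"media_type": "image/jpeg"`, which matches the `.jpeg` filter used when listing images. If the image directory held other formats, the media type could be derived from the file extension instead. The sketch below uses the standard-library `mimetypes` module; `guess_media_type` and `SUPPORTED_MEDIA_TYPES` are hypothetical names, and the set of supported types is an assumption to verify against the model's documentation.

```python
import mimetypes

# Assumption: the image formats accepted by the target model; check the
# model's documentation before relying on this set.
SUPPORTED_MEDIA_TYPES = {"image/jpeg", "image/png", "image/gif", "image/webp"}


def guess_media_type(image_path: str) -> str:
    """Guess the media type from the file extension; raise if unsupported."""
    media_type, _ = mimetypes.guess_type(image_path)
    if media_type not in SUPPORTED_MEDIA_TYPES:
        raise ValueError(f"Unsupported or unknown image type: {image_path}")
    return media_type
```

The returned string could then replace the literal `"image/jpeg"` in the `source` block of the request body.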

In [None]:
# Initialize the model_inputs list
model_inputs = []

# Get sorted list of only the .jpeg images in the directory
images = [
    image
    for image in sorted(os.listdir(LOCAL_IMAGE_DIRECTORY))
    if image.endswith(".jpeg")
]

# Iterate over each image in the list up to the batch size
batch_size = 100
for image in images[:batch_size]:
    image_path = os.path.join(LOCAL_IMAGE_DIRECTORY, image)
    model_input = prepare_model_input(image_path)

    # Append the model input to the list
    model_inputs.append(model_input)

### Writing to JSONL file

The `write_jsonl` function takes a list of data (in this case, the list of model inputs) and a file path, and writes the data to a local JSONL file.

For each item in the data list, the function converts the item to a JSON string using `json.dumps` and writes it to the file, followed by a newline character.


In [None]:
def write_jsonl(data, file_path) -> None:
    """
    Writes a list of dictionaries to a file in JSON Lines format.
    Args:
        data (list): A list of dictionaries to be written to the file.
        file_path (str): The path to the file where the data will be written.
    Returns:
        None
    """

    with open(file_path, "w") as file:
        for item in data:
            json_str = json.dumps(item)
            file.write(json_str + "\n")
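
To confirm that a batch file was written correctly, the file can be read back and compared with the in-memory records. The following round-trip sketch is illustrative only: `read_jsonl` is a hypothetical helper, and the two sample records are made up and far smaller than real model inputs.

```python
import json
import os
import tempfile


def read_jsonl(file_path: str) -> list[dict]:
    """Read a JSON Lines file into a list of dictionaries, skipping blank lines."""
    records = []
    with open(file_path, "r", encoding="utf-8") as file:
        for line in file:
            if line.strip():
                records.append(json.loads(line))
    return records


# Round trip: write two small records one JSON object per line, then read them back.
sample = [
    {"recordId": "IMG00000001", "modelInput": {"max_tokens": 512}},
    {"recordId": "IMG00000002", "modelInput": {"max_tokens": 512}},
]
with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "sample.jsonl")
    with open(path, "w", encoding="utf-8") as file:
        for item in sample:
            file.write(json.dumps(item) + "\n")
    round_tripped = read_jsonl(path)
```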

In [None]:
# Write model inputs to the .jsonl batch inference data file
batch_file_name = (
    f"{BATCH_FILE_PREFIX}-{batch_size}-{int(datetime.now().timestamp())}.jsonl"
)
logger.info(batch_file_name)

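# Make sure the local output directory exists before writing
os.makedirs(LOCAL_BATCH_DATA, exist_ok=True)
write_jsonl(model_inputs, f"./{LOCAL_BATCH_DATA}/{batch_file_name}")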

### Uploading to Amazon S3


In [None]:
def upload_to_s3(path, bucket_name, bucket_subfolder=None) -> bool:
    """
    Uploads a file or directory to an S3 bucket.

    Args:
        path (str): The local path to the file or directory to upload.
        bucket_name (str): The name of the S3 bucket to upload to.
        bucket_subfolder (str, optional): The subfolder within the S3 bucket to upload to. Defaults to None.

    Returns:
        bool: True if the upload was successful, False if there was an error uploading a file,
              or None if the path is not a file or directory.
    """

    # Check if the path is a file
    if os.path.isfile(path):
        # If the path is a file, upload it directly
        object_name = (
            os.path.basename(path)
            if bucket_subfolder is None
            else f"{bucket_subfolder}/{os.path.basename(path)}"
        )
        try:
            s3.upload_file(path, bucket_name, object_name)
            logger.info(f"Successfully uploaded {path} to {bucket_name}/{object_name}")
            return True
        except Exception as e:
            logger.error(f"Error uploading {path} to S3: {e}")
            return False
    elif os.path.isdir(path):
        # If the path is a directory, recursively upload all files within it
        success = True
        for root, dirs, files in os.walk(path):
            for file in files:
                file_path = os.path.join(root, file)
                relative_path = os.path.relpath(file_path, path)
                object_name = (
                    relative_path
                    if bucket_subfolder is None
                    else f"{bucket_subfolder}/{relative_path}"
                )
                try:
                    s3.upload_file(file_path, bucket_name, object_name)
                    logger.info(
                        f"Successfully uploaded {file_path} to {bucket_name}/{object_name}"
                    )
                except Exception as e:
                    logger.error(f"Error uploading {file_path} to S3: {e}")
                    success = False
        # True only if every file in the directory uploaded without error
        return success
    else:
        logger.warning(f"{path} is not a file or directory.")
        return None

In [None]:
# Upload the data from local to S3 bucket for batch inference
upload_to_s3(
    path=f"./{LOCAL_BATCH_DATA}/{batch_file_name}",
    bucket_name=BUCKET_NAME,
    bucket_subfolder=INPUT_PREFIX,
)

## Creating the Batch Inference Job

Once the data is prepared and uploaded to Amazon S3, you can create the batch inference job.

### Configuring input and output data

Before submitting the batch inference job, you need to configure the input and output data locations in Amazon S3. This is done using the `inputDataConfig` and `outputDataConfig` parameters.

The `inputDataConfig` specifies the Amazon S3 URI where the prepared input data (JSONL file) is stored, and the `outputDataConfig` specifies the Amazon S3 URI where the batch inference job will store the processed output data.


In [None]:
inputDataConfig = {
    "s3InputDataConfig": {
        "s3Uri": f"s3://{BUCKET_NAME}/{INPUT_PREFIX}/{batch_file_name}"
    }
}

outputDataConfig = {
    "s3OutputDataConfig": {
        "s3Uri": f"s3://{BUCKET_NAME}/{INPUT_PREFIX}/{OUTPUT_PREFIX}/"
    }
}

logger.info(inputDataConfig)
logger.info(outputDataConfig)

### Submitting the Batch Inference Job

To submit the batch inference job, you use the `create_model_invocation_job` API from the Amazon Bedrock client.


In [None]:
job_name = f"batch-job-{batch_size}-{int(datetime.now().timestamp())}"
logger.info(job_name)

response = bedrock.create_model_invocation_job(
    roleArn=ROLE_ARN,
    modelId=MODEL_ID,
    jobName=job_name,
    inputDataConfig=inputDataConfig,
    outputDataConfig=outputDataConfig,
)

### Monitoring job status

After submitting the batch inference job, you can monitor its status using the `get_model_invocation_job` API from the Amazon Bedrock client. This API requires the `jobIdentifier` parameter, which is the ARN of the submitted job.


In [None]:
job_arn = response.get("jobArn")
job_id = job_arn.split("/")[1]

logger.info(f"Job ARN: {job_arn}")

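# Statuses after which the job will not change again
terminal_statuses = ["Completed", "PartiallyCompleted", "Failed", "Stopped", "Expired"]

status = ""
while status not in terminal_statuses:
    job_response = bedrock.get_model_invocation_job(jobIdentifier=job_arn)
    status = job_response["status"]
    if status == "Completed":
        logger.info(f"Status: {status}")
    elif status in terminal_statuses:
        # Log the full response so the failure details are visible
        logger.error(job_response)
    else:
        logger.info(f"Status: {status}")
        time.sleep(120)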
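
The polling loop above has no upper bound on how long it waits. A bounded variant might look like the sketch below. `wait_for_job`, its parameters, and the default timeout are hypothetical choices, and the status lookup is passed in as a function so the helper can be exercised without calling AWS.

```python
import time
from typing import Callable

# Statuses after which the job will not change again.
TERMINAL_STATUSES = {"Completed", "PartiallyCompleted", "Failed", "Stopped", "Expired"}


def wait_for_job(
    get_status: Callable[[], str],
    poll_seconds: float = 120,
    timeout_seconds: float = 24 * 60 * 60,
    sleep: Callable[[float], None] = time.sleep,
) -> str:
    """Poll get_status until it returns a terminal status or the timeout elapses."""
    waited = 0.0
    while True:
        status = get_status()
        if status in TERMINAL_STATUSES:
            return status
        if waited >= timeout_seconds:
            raise TimeoutError(f"Job still {status} after {waited:.0f} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
```

In this notebook, the status function could be `lambda: bedrock.get_model_invocation_job(jobIdentifier=job_arn)["status"]`.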

## Retrieving and analyzing output

When your batch inference job is complete, Amazon Bedrock creates a dedicated folder in the specified S3 bucket, using the job ID as the folder name. This folder contains a summary of the batch inference job, along with the processed inference data in JSONL format.

### Accessing and understanding output format

The output files contain the processed text, observability data, and the parameters used for inference. The format of the output data will depend on the model you used for batch inference. The notebook provides an example of how to access and process this information from the output JSONL file for Anthropic Claude 3.5 models.

Additionally, in the output location specified for your batch inference job, you'll find a `manifest.json.out` file that provides a summary of the processed records. This file includes information such as the total number of records processed, the number of successfully processed records, the number of records with errors, and the total input and output token counts.
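
The manifest can be condensed into a few counters once it has been downloaded. The sketch below is an assumption-laden illustration: the key names in `MANIFEST_FIELDS` are assumed to mirror the fields described above and should be verified against an actual `manifest.json.out`, and `summarize_manifest` and `errorRate` are hypothetical names.

```python
import json

# Assumption: these key names mirror the fields described above; verify them
# against an actual manifest.json.out before relying on them.
MANIFEST_FIELDS = (
    "totalRecordCount",
    "processedRecordCount",
    "successRecordCount",
    "errorRecordCount",
    "inputTokenCount",
    "outputTokenCount",
)


def summarize_manifest(manifest_text: str) -> dict:
    """Parse manifest JSON and return the known counters, defaulting to 0."""
    manifest = json.loads(manifest_text)
    summary = {field: int(manifest.get(field, 0)) for field in MANIFEST_FIELDS}
    total = summary["totalRecordCount"]
    # Derived value: share of records that ended in an error.
    summary["errorRate"] = summary["errorRecordCount"] / total if total else 0.0
    return summary
```

The manifest text could be fetched with `s3.get_object` from the job's output folder, in the same way the output JSONL file is read.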


In [None]:
# Set the S3 prefix for the batch job's output files.
# The last part of the path is the batch job's job ID
prefix = f"{INPUT_PREFIX}/{OUTPUT_PREFIX}/{job_id}/"

# Initialize the list
output_data = []

# Read the JSON file from S3
try:
    object_key = f"{prefix}{batch_file_name}.out"
    response = s3.get_object(
        Bucket=BUCKET_NAME,
        Key=object_key,
    )
    json_data = response["Body"].read().decode("utf-8")
    output_entry = None

    # Process the JSON data and output the first entry
    lines = json_data.splitlines()
    for line in lines[0:1]:
        data = json.loads(line)
        output_entry = {
            "request_id": data["modelOutput"]["id"],
            "output_text": data["modelOutput"]["content"][0]["text"],
            "observability": {
                "input_tokens": data["modelOutput"]["usage"]["input_tokens"],
                "output_tokens": data["modelOutput"]["usage"]["output_tokens"],
                "model": data["modelOutput"]["model"],
                "stop_reason": data["modelOutput"]["stop_reason"],
                "recordId": data["recordId"],
                "max_tokens": data["modelInput"]["max_tokens"],
                "temperature": data["modelInput"]["temperature"],
                "top_p": data["modelInput"]["top_p"],
                "top_k": data["modelInput"]["top_k"],
            },
        }
        output_data.append(output_entry)
    logger.info(f"Successfully read {len(output_data)} JSON objects from S3.")
    logger.info(json.dumps(output_entry, indent=4))
except Exception as e:
    logger.error(f"Error reading JSON file from S3: {e}")
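
The cell above reads only the first output record. To process every record, and to avoid a `KeyError` on records that did not produce a model response, the lines could be split into successes and failures. This is a sketch under stated assumptions: `split_output_records` is a hypothetical helper, and treating a record without a usable `modelOutput` as failed (and keeping any `error` field as-is) is an assumption about the output format.

```python
import json


def split_output_records(jsonl_text: str) -> tuple[list[dict], list[dict]]:
    """Split batch output lines into successful and failed records."""
    successes, failures = [], []
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)
        model_output = record.get("modelOutput")
        # Assumption: a record without a usable modelOutput did not succeed;
        # its "error" field, if any, is kept as-is for inspection.
        if not isinstance(model_output, dict) or not model_output.get("content"):
            failures.append(
                {"recordId": record.get("recordId"), "error": record.get("error")}
            )
            continue
        successes.append(
            {
                "recordId": record.get("recordId"),
                "output_text": model_output["content"][0]["text"],
                "stop_reason": model_output.get("stop_reason"),
            }
        )
    return successes, failures
```

Here `jsonl_text` corresponds to the decoded body read from S3 (`json_data` in the cell above).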