# Bedrock Batch inference with Amazon Nova FMs to summarize call transcripts

With [Batch Inference](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference.html), you can provide a set of prompts as a single input file and receive the responses as a single output file, which lets you run large-scale predictions in one job. The responses are processed and stored in your Amazon S3 bucket so you can access them later. Amazon Bedrock supports Amazon Nova FMs for batch inference at a 50% lower price than on-demand inference. For the list of supported models, see [here](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference-supported.html).

## Introduction
Call center transcript summarization is a crucial task for businesses seeking to extract valuable insights from customer interactions. As the volume of call data grows, traditional analysis methods struggle to keep pace, creating a demand for scalable solutions. Batch Inference for Amazon Bedrock provides a powerful tool to address this challenge by enabling organizations to process large volumes of data efficiently.

This notebook demonstrates how to use batch inference to summarize call center transcripts at scale by processing large volumes of text transcripts in batches. Although call transcript summarization is the example here, you can apply the same approach to any use case that does not need real-time output.

## Prerequisites

Before you begin, ensure that you have the following prerequisites in place:
1. boto3 version 1.35.1 or later.
2. Permission to call the `create_model_invocation_job` API. Refer to the documentation to learn about the [required permissions for batch inference jobs](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference-prereq.html#batch-inference-permissions).
3. Permission to read and write data in your Amazon S3 bucket.
4. A service role for Bedrock Batch Inference, so that it can read from and write to S3 on your behalf. You can create the service role manually ([see here](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-iam-sr.html)) or use the AWS Console workflow, which can create one for you ([here](https://us-east-1.console.aws.amazon.com/bedrock/home?region=us-east-1#/batch-inference/create)). This notebook also provides a quick way to create a service role in the code below.
5. Call transcript dataset:
    * This notebook was built using synthetic call transcripts in `.txt` files. If you want to try it with your own dataset, upload your call transcripts to an Amazon S3 bucket in `.txt` format. Each text file in the S3 bucket should contain only one call transcript.
6. Ensure that you are in an AWS Region that supports batch inference. Refer to the documentation [here](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference-supported.html).
7. The default maximum size of a single file submitted for batch inference with Nova models is 1 GB. You can request an increase [here](https://us-east-1.console.aws.amazon.com/servicequotas/home/services/bedrock/quotas/L-68FC8D47) as needed.
 

In [None]:
import boto3 
print(boto3.__version__) 
# If the version is older than 1.35.1, uncomment the lines below to upgrade
# %pip install --upgrade pip
# %pip install boto3 --upgrade

In [None]:
# Restart the kernel if you installed or upgraded packages in the previous cell
from IPython.core.display import HTML
HTML("<script>Jupyter.notebook.kernel.restart()</script>")

In [None]:
import boto3  # re-import, since a kernel restart clears earlier imports
from datetime import datetime
import time

# Bedrock client for batch inference job
bedrock = boto3.client(service_name="bedrock")

# Create an S3 client
s3 = boto3.client('s3')
region = boto3.session.Session().region_name

# Set the S3 bucket name and prefix for the text files
bucket_name = '<>' #Update your bucket name here
raw_data_prefix = 'novabatchinf/raw'
output_prefix = 'output'
 
# Batch API parameters:
jobName = 'novabatchinf-job' + str(int(datetime.now().timestamp()))
job_s3_prefix = 'novabatchinf/' + jobName
modelId = "amazon.nova-lite-v1:0"  # or use other model from here: https://docs.aws.amazon.com/nova/latest/userguide/what-is-nova.html#w78aab7c19

## Data Preparation

Before initiating a batch inference job for call center transcript summarization, you need to format your data correctly and upload it to an S3 bucket. Learn more about the data format requirements in our [documentation](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-inference-data.html).

### Formatting Input Data

The input data should be in JSONL format, with each line representing a single transcript for summarization. Each line in your JSONL file should follow this structure:

```json
{"recordId": "11 character alphanumeric string", "modelInput": {JSON body}}
```

Here, `recordId` is an 11-character alphanumeric string, working as a unique identifier for each entry. If you omit this field, the batch inference job will automatically add it in the output.
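If your source files do not already carry suitable identifiers, you can generate them. The following is a minimal sketch, not part of the original notebook: the helper name `make_record_id` and the `CALL` prefix are hypothetical, chosen only to match the `CALL0000001` style used in the example below.

```python
def make_record_id(index: int, prefix: str = "CALL") -> str:
    """Build an 11-character alphanumeric ID such as CALL0000001.

    Hypothetical helper for illustration; the prefix is an assumption.
    """
    if not (prefix.isascii() and prefix.isalnum()) or len(prefix) >= 11:
        raise ValueError("prefix must be alphanumeric and shorter than 11 characters")
    # Zero-pad the index so that prefix + digits is exactly 11 characters
    width = 11 - len(prefix)
    record_id = f"{prefix}{index:0{width}d}"
    # Reject negative indexes (the '-' sign) and indexes with too many digits
    if len(record_id) != 11 or not record_id.isalnum():
        raise ValueError(f"cannot build an 11-character ID from index {index}")
    return record_id


print(make_record_id(1))
```

Raising an error instead of truncating keeps two different transcripts from silently ending up with the same identifier.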

The format of the `modelInput` JSON object should match the body field for the model you are using in the `InvokeModel` request. Since this notebook uses an Amazon Nova model on Amazon Bedrock (`amazon.nova-lite-v1:0` by default), your model input might look like the following:

```python
message_list = [{"role": "user", "content": [{"text": original_text}]}]
inf_params = {"max_new_tokens": 500, "top_p": 0.9, "top_k": 20, "temperature": 0.7}
system_list = [
            {
                "text": "Write an accurate 250 word gender-neutral summary of the following text without adding preamble or additional information not present in the original text"
            }
]

{"recordId": "CALL0000001", 
 "modelInput": {
      "schemaVersion": "messages-v1",
      "system": system_list,
      "messages": message_list,
      "inferenceConfig": inf_params,
      }
}
```

#### Download the sample synthetic dataset
If you do not have a dataset but want to try out Batch Inference for Amazon Bedrock, you can use the synthetic call data available in the dataset directory.


In [None]:
import os
import zipfile

# Set the path to the zip file
zip_file_path = './dataset/synthetic_call_transcript_data.zip'

# Set the path to the destination folder
dest_folder_path = './unzipped_transcripts'

# Create the destination folder if it doesn't exist
if not os.path.exists(dest_folder_path):
    os.makedirs(dest_folder_path)

# Open the zip file
with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
    # Extract all files to the destination folder
    zip_ref.extractall(dest_folder_path)

print(f"Files extracted to {dest_folder_path}")

Let's now construct the JSONL file from the synthetic dataset, in the format that Nova models expect.

In [7]:
import os
import json
from tqdm import tqdm

def create_jsonl_for_batch_inference(input_folder, output_file):
    inf_params = {"max_new_tokens": 500, "top_p": 0.9, "top_k": 20, "temperature": 0.7}
    system_list = [
        {
            "text": "Write an accurate 250 word gender-neutral summary of the following text without adding preamble or additional information not present in the original text"
        }
    ]

    with open(output_file, 'w', encoding='utf-8') as outfile:
        for filename in tqdm(os.listdir(input_folder)):
            if filename.endswith('.txt'):
                file_path = os.path.join(input_folder, filename)
                record_id = os.path.splitext(filename)[0]

                try:
                    with open(file_path, 'r', encoding='utf-8') as infile:
                        content = infile.read().strip()

                    message_list = [
                        {
                            "role": "user",
                            "content": [
                                {
                                    "text": content
                                }
                            ]
                        }
                    ]

                    # https://docs.aws.amazon.com/nova/latest/userguide/complete-request-schema.html
                    json_object = {
                        "recordId": record_id,
                        "modelInput": {
                            "schemaVersion": "messages-v1", 
                            "system": system_list,
                            "messages": message_list,
                            "inferenceConfig": inf_params,
                        }
                    }

                    json.dump(json_object, outfile, ensure_ascii=False)
                    outfile.write('\n')
                except Exception as e:
                    print(f"Error processing file {filename}: {str(e)}")

In [None]:
input_folder = './unzipped_transcripts/textual' # the folder that contains the extracted .txt transcripts
jsonl_file = "raw.jsonl"
create_jsonl_for_batch_inference(input_folder, jsonl_file)
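Before uploading, it can be worth sanity-checking the generated file. The sketch below is an optional addition, not part of the original workflow: `validate_jsonl` is a hypothetical helper, and the size limit assumes the 1 GB default quota from the prerequisites, interpreted here as 1024**3 bytes.

```python
import json
import os

# Assumption: the 1 GB default quota from the prerequisites, taken as 1024**3 bytes
MAX_FILE_BYTES = 1024 ** 3


def validate_jsonl(path, max_bytes=MAX_FILE_BYTES):
    """Return (valid_record_count, problems) for a batch inference input file."""
    problems = []
    if os.path.getsize(path) > max_bytes:
        problems.append(f"file is larger than {max_bytes} bytes")

    count = 0
    with open(path, "r", encoding="utf-8") as f:
        for line_no, line in enumerate(f, start=1):
            if not line.strip():
                continue  # ignore blank lines
            try:
                record = json.loads(line)
            except json.JSONDecodeError as e:
                problems.append(f"line {line_no}: invalid JSON ({e.msg})")
                continue
            # Every record needs a modelInput object; recordId is optional
            if not isinstance(record, dict) or "modelInput" not in record:
                problems.append(f"line {line_no}: missing modelInput")
                continue
            count += 1
    return count, problems
```

For example, `count, problems = validate_jsonl(jsonl_file)` returns the number of usable records and a list of human-readable issues that you can review before starting the upload.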

### Uploading to Amazon S3

The `upload_to_s3` function uploads a file or directory to an Amazon S3 bucket. It takes three arguments:

1. `path`: The path to the file or directory to be uploaded.
2. `bucket_name`: The name of the S3 bucket.
3. `bucket_subfolder` (optional): The name of the subfolder within the S3 bucket where the prepared data should be uploaded.

In [9]:
def upload_to_s3(path, bucket_name, bucket_subfolder=None):
    # check if the path is a file
    if os.path.isfile(path):
        # If the path is a file, upload it directly
        object_name = os.path.basename(path) if bucket_subfolder is None else f"{bucket_subfolder}/{os.path.basename(path)}"
        try:
            s3.upload_file(path, bucket_name, object_name)
            print(f"Successfully uploaded {path} to {bucket_name}/{object_name}")
            return True
        except Exception as e:
            print(f"Error uploading {path} to S3: {e}")
            return False
    elif os.path.isdir(path):
        # If the path is a directory, recursively upload all files within it
        for root, dirs, files in os.walk(path):
            for file in files:
                file_path = os.path.join(root, file)
                relative_path = os.path.relpath(file_path, path)
                object_name = relative_path if bucket_subfolder is None else f"{bucket_subfolder}/{relative_path}"
                try:
                    s3.upload_file(file_path, bucket_name, object_name)
                    # print(f"Successfully uploaded {file_path} to {bucket_name}/{object_name}")
                except Exception as e:
                    print(f"Error uploading {file_path} to S3: {e}")
        return None
    else:
        print(f"{path} is not a file or directory.")
        return None

In [None]:
# uploads the data from local to S3 bucket for batch inference
upload_to_s3(path=f"./{jsonl_file}", 
             bucket_name=bucket_name, 
             bucket_subfolder=job_s3_prefix)

## Creating the Batch Inference Job

Once the data is prepared and uploaded to Amazon S3, you can create the batch inference job.

### Configuring Input and Output Data

Before submitting the batch inference job, you need to configure the input and output data locations in Amazon S3. This is done using the `inputDataConfig` and `outputDataConfig` parameters.

The `inputDataConfig` specifies the Amazon S3 URI where the prepared input data (JSONL file) is stored, and the `outputDataConfig` specifies the Amazon S3 URI where the batch inference job stores the processed output data.

In [11]:
inputDataConfig=({
    "s3InputDataConfig": {
        "s3Uri": f"s3://{bucket_name}/{job_s3_prefix}/{jsonl_file}"
    }
})

outputDataConfig=({
    "s3OutputDataConfig": {
        "s3Uri": f"s3://{bucket_name}/{job_s3_prefix}/{output_prefix}/"
    }
})

Sanity-check the input and output configs to ensure that the S3 URI paths are correct.

In [None]:
print(inputDataConfig)
print(outputDataConfig)

### Create the Amazon Bedrock Batch Service Role
Bedrock Batch Inference requires a service role so that it can read from and write to S3 on your behalf. You can create the service role manually ([see here](https://docs.aws.amazon.com/bedrock/latest/userguide/batch-iam-sr.html)), use the code below, or use the AWS Console workflow, which can create one for you ([here](https://us-east-1.console.aws.amazon.com/bedrock/home?region=us-east-1#/batch-inference/create)).

In [None]:
# Get the AWS account ID
sts_client = boto3.client('sts')
account_id = sts_client.get_caller_identity()['Account']

# Create IAM client
iam_client = boto3.client('iam')

# Define the role name
role_name = "AmazonNovaBedrockBatchServiceRole"

# Define the policy document
policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                f"arn:aws:s3:::{bucket_name}",
                f"arn:aws:s3:::{bucket_name}/*"
                # Add another 2 lines if your output bucket is different from input bucket see here for reference: https://docs.aws.amazon.com/bedrock/latest/userguide/batch-iam-sr.html
            ],
            "Condition": {
                "StringEquals": {
                    "aws:ResourceAccount": account_id
                }
            }
        }
    ]
}

# Define the trust relationship
trust_relationship = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "Service": "bedrock.amazonaws.com"
            },
            "Action": "sts:AssumeRole",
            "Condition": {
                "StringEquals": {
                    "aws:SourceAccount": account_id
                },
                "ArnEquals": {
                    "aws:SourceArn": f"arn:aws:bedrock:{region}:{account_id}:model-invocation-job/*"
                }
            }
        }
    ]
}

# Check if the role already exists
try:
    existing_role = iam_client.get_role(RoleName=role_name)
    print(f"Role '{role_name}' already exists. Skipping creation.")
    roleArn = existing_role['Role']['Arn']
except iam_client.exceptions.NoSuchEntityException:
    # Create the role
    try:
        create_role_response = iam_client.create_role(
            RoleName=role_name,
            AssumeRolePolicyDocument=json.dumps(trust_relationship),
            Description="Service role for Amazon Nova Bedrock Batch inference"
        )
        
        # Attach the inline policy to the role
        iam_client.put_role_policy(
            RoleName=role_name,
            PolicyName="NovaBedrockBatchServiceRolePolicy",
            PolicyDocument=json.dumps(policy_document)
        )
        
        print(f"Successfully created role: {create_role_response['Role']['Arn']}")
        roleArn = create_role_response['Role']['Arn']
    except Exception as e:
        print(f"Error creating role: {str(e)}")


### Submitting the Batch Inference Job

To submit the batch inference job, you use the `create_model_invocation_job` API from the Amazon Bedrock client. This API requires the following parameters:

- `roleArn`: The Amazon Resource Name (ARN) of the service role that Amazon Bedrock assumes to read the input from and write the output to Amazon S3 on your behalf (created in the previous step).
- `modelId`: The ID of the model you want to use for batch inference (e.g., `amazon.nova-lite-v1:0` or other modelIds from [here](https://docs.aws.amazon.com/nova/latest/userguide/what-is-nova.html)).
- `jobName`: A name for your batch inference job.
- `inputDataConfig`: The configuration for the input data, as defined in the previous step.
- `outputDataConfig`: The configuration for the output data, as defined in the previous step.

The API call returns a response containing the ARN of the submitted batch inference job.

In [14]:
response=bedrock.create_model_invocation_job(
    roleArn=roleArn,
    modelId=modelId,
    jobName=jobName,
    inputDataConfig=inputDataConfig,
    outputDataConfig=outputDataConfig
)

### Monitoring Job Status

After submitting the batch inference job, you can monitor its status using the `get_model_invocation_job` API from the Amazon Bedrock client. This API requires the `jobIdentifier` parameter, which is the ARN of the submitted job.

In [None]:
jobArn = response.get('jobArn')
job_id = jobArn.split('/')[1]

print(jobName)

status = ''
while status not in ['Completed', 'Failed']:
    job_response = bedrock.get_model_invocation_job(jobIdentifier=jobArn)
    status = job_response['status']
    if status == 'Failed':
        print(job_response)
    elif status == 'Completed':
        print(datetime.now(), ": ", status)
        break
    else: 
        print(datetime.now(), ": ", status)
        time.sleep(300)

## Retrieving and Analyzing Output

When your batch inference job is complete, Amazon Bedrock creates a dedicated folder in the specified S3 bucket, using the job ID as the folder name. This folder contains a summary of the batch inference job, along with the processed inference data in JSONL format.

### Accessing and Understanding Output Format

The output files contain the processed text, observability data, and the parameters used for inference. The format of the output data will depend on the model you used for batch inference. The notebook provides an example of how to access and process this information from the output JSONL file for Amazon Nova models.

Additionally, in the output location specified for your batch inference job, you'll find a `manifest.json.out` file that provides a summary of the processed records. This file includes information such as the total number of records processed, the number of successfully processed records, the number of records with errors, and the total input and output token counts. For example:

```json
{
  "totalRecordCount": 1100,
  "processedRecordCount": 1100,
  "successRecordCount": 1100,
  "errorRecordCount": 0,
  "inputTokenCount": 1362960,
  "outputTokenCount": 142037
}
```
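As a quick illustration of how these fields can be used, the sketch below derives a success rate and average token counts per record from the manifest. The helper name `summarize_manifest` is hypothetical, and the input is the example manifest shown above rather than output from a real job.

```python
import json


def summarize_manifest(manifest_text):
    """Derive a few simple ratios from the contents of manifest.json.out."""
    manifest = json.loads(manifest_text)
    total = manifest["totalRecordCount"]
    if total == 0:
        return {"success_rate": 0.0, "avg_input_tokens": 0.0, "avg_output_tokens": 0.0}
    return {
        "success_rate": manifest["successRecordCount"] / total,
        "avg_input_tokens": manifest["inputTokenCount"] / total,
        "avg_output_tokens": manifest["outputTokenCount"] / total,
    }


# The example manifest from above, used here as sample input
example_manifest = """
{
  "totalRecordCount": 1100,
  "processedRecordCount": 1100,
  "successRecordCount": 1100,
  "errorRecordCount": 0,
  "inputTokenCount": 1362960,
  "outputTokenCount": 142037
}
"""

stats = summarize_manifest(example_manifest)
print(f"Success rate: {stats['success_rate']:.0%}")
print(f"Average input tokens per record: {stats['avg_input_tokens']:.1f}")
print(f"Average output tokens per record: {stats['avg_output_tokens']:.1f}")
```

Average token counts per record are a convenient input for estimating the cost of a larger run before you submit it.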

Let's download the final output from the batch job and read the first line as a sample output.

In [None]:
import json
# Set the S3 bucket name and prefix for the text files. 
# Last part in the path is the batch job's job id
prefix = f"{job_s3_prefix}/{output_prefix}/{job_id}/"

# Set local file path
local_file_path = f"./{jsonl_file}.out"

# Download the JSON file from S3
try:
    object_key = f"{prefix}{jsonl_file}.out"
    s3.download_file(bucket_name, object_key, local_file_path)
    print(f"Successfully downloaded file from S3 to {local_file_path}")

    # Read and print only the first line of the file
    with open(local_file_path, 'r', encoding='utf-8') as file:
        first_line = file.readline().strip()
        try:
            data = json.loads(first_line)
            model_output = data.get('modelOutput', {})

            print("\nSample modelOutput (from first line):")
            print(json.dumps(model_output, indent=2))

        except json.JSONDecodeError as e:
            print(f"Error decoding JSON in the first line: {e}")

except Exception as e:
    print(f"Error processing file: {e}")
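To go beyond the first line, you can collect the summary text for every record. The following is a minimal sketch under two assumptions: that each output line carries the Nova response under `modelOutput` with the text at `output.message.content[0].text`, and that records which failed have no usable `modelOutput`. The helper name `extract_summaries` and the sample lines are made up for illustration.

```python
import json


def extract_summaries(lines):
    """Map recordId -> summary text; also return the IDs with no usable output."""
    summaries = {}
    failed = []
    for line in lines:
        line = line.strip()
        if not line:
            continue  # ignore blank lines
        record = json.loads(line)
        record_id = record.get("recordId")
        try:
            # Assumed Nova response layout: output -> message -> content[0] -> text
            text = record["modelOutput"]["output"]["message"]["content"][0]["text"]
        except (KeyError, IndexError, TypeError):
            failed.append(record_id)
            continue
        summaries[record_id] = text
    return summaries, failed


# Made-up sample lines that mimic the assumed output layout
sample_lines = [
    json.dumps({
        "recordId": "CALL0000001",
        "modelOutput": {"output": {"message": {"role": "assistant",
                                               "content": [{"text": "Sample summary."}]}}},
    }),
    json.dumps({"recordId": "CALL0000002", "error": {"errorMessage": "sample error"}}),
]

summaries, failed = extract_summaries(sample_lines)
print(summaries)
print(failed)
```

With the downloaded file, you could pass the open file handle directly, for example `extract_summaries(open(local_file_path, encoding="utf-8"))`, since iterating over a file yields one line at a time.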


### Integrating with Existing Workflows

After retrieving the processed output data, you can integrate it into your existing workflows or analytics systems for further analysis or downstream processing. For example, you could:

- Store the summarized transcripts in a database for easy access and querying.
- Perform sentiment analysis or topic modeling on the summarized transcripts to gain additional insights.
- Categorize the summaries into actionable business buckets and develop anomaly detection.
- Develop dashboards or reports to visualize and analyze the summarized data.

The specific integration steps will depend on your existing workflows and systems, but the processed output data from the batch inference job can be easily incorporated into various data pipelines and analytics processes.
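As one possible illustration of the database option, the sketch below stores summaries in a local SQLite database using Python's standard library. It is an assumption-laden example rather than a recommended architecture: the `store_summaries` helper, the `call_summaries` table, and the sample summary are all hypothetical.

```python
import sqlite3


def store_summaries(summaries, db_path=":memory:"):
    """Insert or update recordId -> summary pairs and return the open connection."""
    conn = sqlite3.connect(db_path)
    conn.execute(
        "CREATE TABLE IF NOT EXISTS call_summaries ("
        "record_id TEXT PRIMARY KEY, summary TEXT NOT NULL)"
    )
    # INSERT OR REPLACE keeps reruns idempotent for the same record_id
    conn.executemany(
        "INSERT OR REPLACE INTO call_summaries (record_id, summary) VALUES (?, ?)",
        summaries.items(),
    )
    conn.commit()
    return conn


# Made-up sample data; in practice this would come from the batch job output
conn = store_summaries({"CALL0000001": "Customer asked about a refund."})
rows = conn.execute("SELECT record_id, summary FROM call_summaries").fetchall()
print(rows)
```

Using the record ID as the primary key means that reprocessing a transcript overwrites its earlier summary instead of creating a duplicate row.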

## Conclusion

The notebook covers the entire process, from data preparation and formatting to job submission, output retrieval, and integration with existing workflows. By implementing batch inference for call transcript summarization, you can streamline your analysis processes and gain a competitive edge in understanding customer needs and improving your call center operations.

Feel free to adapt and extend this notebook to suit your specific requirements, and explore other use cases where batch inference can be applied to optimize your interactions with foundation models at scale.