# Amazon SageMaker Ground Truth Demonstration for Generating Question and Answer pairs


1. [Introduction](#Introduction)
2. [Prerequisites](#Prerequisites)
    1. [Create a Work Team](#Create-Workteam)
    2. [IAM Role Set up](#IAM-role-set-up)
    3. [Prepare the data](#Prepare-the-data)
3. [Run a Ground Truth labeling job](#Run-a-Ground-Truth-labeling-job)
    1. [Create the instruction template](#Create-the-instruction-template)
    2. [Specify Parameters for Labeling Job](#Specify-Labeling-Parameters)
    3. [Create a Labeling Job](#Create-a-Labeling-Job)
    4. [Gather Human feedback through labeling portal](#Gather-human-feedback-through-labeling-portal)
    5. [Monitoring Labeling Job Status](#Monitoring-Labeling-Job-Status)
4. [Post Processing and Analysis of SageMaker Ground Truth Labeling Job Results](#Post-Processing-and-Analysis-of-SageMaker-Ground-Truth-Labeling-Job-Results)
    1. [Merging Worker Responses](#Merging-Worker-Responses-into-a-Consolidated-JSON-File)
    2. [Analyzing Time Spent on Labeling Tasks](#Analyzing-Time-Spent-on-Labeling-Tasks)
    3. [Consolidating Worker Responses and Output Manifest](#Consolidating-Worker-Responses-and-Output-Manifest)

## Introduction

This notebook guides you through setting up a Human-in-the-Loop (HITL) workflow for Reinforcement Learning from Human Feedback (RLHF) using the Amazon SageMaker Ground Truth Question and Answer template. This labeling UI helps human annotators read a text passage and write questions and answers to build a Q&A demonstration dataset. The UI supports customizable text input and flexible question-answer generation: annotators can write questions that refer to the entire passage, or highlight specific portions of the text for targeted Q&A. Color-coded matching visually links each question to the relevant section of text, which makes annotation easier.

The primary objectives of this interface are to produce diverse, context-rich Q&A datasets that capture the nuances of human text comprehension, ultimately supporting the development of more sophisticated NLP models.

Some common use cases:

1. Improving chatbot and virtual assistant capabilities, so they understand and respond to customer queries accurately
2. Enhancing search engine question-answering features
3. Creating educational content, where models help generate questions and answers for students based on reading materials
4. Helping content creators generate relevant questions and answers for articles, blogs, and other textual content
5. Supporting research in natural language understanding and generation

The steps include setting up the necessary Ground Truth prerequisites, uploading an input manifest JSON file, creating a worker task template, creating and monitoring a labeling job, and post-processing the results into a consolidated dataset of worker responses.

## Prerequisites

You will create some of the resources you need to launch a Ground Truth labeling job in this notebook.

Let's get the latest version of the SDK, restart the kernel, and import some essential libraries to set up the environment for downloading data, handling JSON files, and using AWS services for machine learning workflows.

In [None]:
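!pip install -q --upgrade pip
!pip install -q --upgrade awscli
!pip install -q --upgrade botocore
!pip install -q --upgrade sagemaker
!pip install -q py7zr
!pip install -q datasets
!pip install -q colorama  # used by the labeling-time summary in the post-processing section

# NOTE: Restart the kernel after running the commands above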

In [None]:
import boto3
import datetime
import json
import sagemaker

### Create-Workteam 
A work team is a group of workers who complete labeling tasks. To preview the worker UI and complete the labeling task yourself, create a private work team, add yourself as a worker to this team, and enter the work team ARN below.

In [None]:
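# Private work team ARN, e.g. arn:aws:sagemaker:<region>:<account-id>:workteam/private-crowd/<team-name>
WORKTEAM_ARN = ""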

print(f"This notebook will use the work team ARN: {WORKTEAM_ARN}")
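An empty or mistyped ARN only surfaces as an error when `create_labeling_job` runs, so a quick local check can catch it earlier. The sketch below is a hypothetical helper (`looks_like_workteam_arn` is not part of any AWS SDK), and its regular expression is an assumption based on the usual `arn:aws:sagemaker:<region>:<account-id>:workteam/<crowd-type>/<team-name>` shape. Treat a failed match as a prompt to double-check the value, not as proof that the ARN is invalid.

```python
import re

# Hypothetical helper: a loose sanity check on the shape of a SageMaker work team ARN.
# Assumed shape: arn:<partition>:sagemaker:<region>:<12-digit account>:workteam/<crowd-type>/<name>
_WORKTEAM_ARN_RE = re.compile(
    r"^arn:aws[a-z-]*:sagemaker:[a-z0-9-]+:\d{12}:workteam/"
    r"(private-crowd|public-crowd|vendor-crowd)/[A-Za-z0-9-]+$"
)


def looks_like_workteam_arn(arn: str) -> bool:
    """Return True if `arn` matches the assumed work team ARN shape."""
    return bool(_WORKTEAM_ARN_RE.match(arn or ""))


# Example values only; the account ID and team name are made up
print(looks_like_workteam_arn("arn:aws:sagemaker:us-east-1:123456789012:workteam/private-crowd/my-team"))
print(looks_like_workteam_arn(""))
```

In the notebook you would call `looks_like_workteam_arn(WORKTEAM_ARN)` right after setting the variable above.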

### IAM-role-set-up
The IAM execution role you used to create this notebook instance must have the AWS managed policy AmazonSageMakerGroundTruthExecution attached. This GIF demonstrates how to add this policy to an IAM role in the IAM console. You can also find instructions in the IAM User Guide under "Adding and removing IAM identity permissions". Run the following code block to see your IAM execution role name.

When you create your role, you specify Amazon S3 permissions. Make sure that your IAM role can access the S3 bucket you plan to use in this example. If you do not specify an S3 bucket in this notebook, the default SageMaker bucket in the AWS Region where this notebook instance runs is used. If you do not require granular permissions, you can attach AmazonS3FullAccess to your role.

In [None]:
role = sagemaker.get_execution_role()
role_name = role.split("/")[-1]
print("********************************************************************************")
print("The IAM execution role name:", role_name)
print("The IAM execution role ARN:", role)
print("********************************************************************************")
print(
    "IMPORTANT: Make sure this execution role has the AWS managed policy AmazonSageMakerGroundTruthExecution attached."
)
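If you prefer granular permissions to AmazonS3FullAccess, a bucket-scoped policy is one option. The following is a minimal sketch, assuming a single bucket holds the input manifest, the UI template, and the job output (as in this notebook). `scoped_s3_policy` is a hypothetical helper that only builds the policy document; you would still attach it to the role yourself, for example in the IAM console. Review the actions against your own security requirements before using it.

```python
import json


def scoped_s3_policy(bucket_name: str, prefix: str = "") -> dict:
    """Build an IAM policy document limiting S3 access to one bucket (and an optional key prefix).

    Hypothetical helper: it returns a dict and does not call AWS.
    """
    object_arn = f"arn:aws:s3:::{bucket_name}/{prefix}*"
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Object-level reads and writes: input manifest, UI template, job output
                "Effect": "Allow",
                "Action": ["s3:GetObject", "s3:PutObject"],
                "Resource": [object_arn],
            },
            {
                # Bucket-level permissions, e.g. for list_objects_v2 calls
                "Effect": "Allow",
                "Action": ["s3:ListBucket", "s3:GetBucketLocation"],
                "Resource": [f"arn:aws:s3:::{bucket_name}"],
            },
        ],
    }


# Example bucket name only
print(json.dumps(scoped_s3_policy("my-example-bucket"), indent=2))
```

The default empty prefix covers the whole bucket, because this notebook writes inputs under `genai/` and outputs under `output/`.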

### Prepare-the-data
Before we create the labeling job, we need to make sure the input data is in the format Ground Truth expects.
<b>gt_input_manifest_qna.json</b> contains sample text passages in the Ground Truth input manifest format (one JSON object per line), with each passage under the "source" field. We upload this file to an Amazon S3 bucket.

In [None]:
# Initialize SageMaker session and S3 resource
sess = sagemaker.Session()
bucket = sess.default_bucket()
s3 = boto3.resource('s3')
s3_client = boto3.client('s3')
region = boto3.session.Session().region_name

# Define the S3 bucket and key prefix
bucket_name = bucket
prefix = 'genai'
output_file = 'data/gt_input_manifest_qna.json'  # local input manifest; the same relative path is reused in the S3 key
s3_path = f"{prefix}/{output_file}"

# Upload the file to S3
s3.Bucket(bucket_name).upload_file(output_file, s3_path)
input_manifest_uri = f"s3://{bucket_name}/{s3_path}"

print(f"File uploaded to s3://{bucket_name}/{s3_path}")
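To label your own passages instead of the sample file, you need to produce the same structure. Ground Truth input manifests are JSON Lines files: one JSON object per line, here with the passage under the `source` key. The sketch below assumes plain-text passages held in a Python list; `write_input_manifest`, the sample passages, and the output file name are hypothetical.

```python
import json
import os
import tempfile
from pathlib import Path


def write_input_manifest(passages, path):
    """Write text passages as a JSON Lines manifest, one {"source": ...} object per line."""
    path = Path(path)
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", encoding="utf-8") as f:
        for text in passages:
            # json.dumps escapes quotes and newlines, so each record stays on one line
            f.write(json.dumps({"source": text}) + "\n")
    return path


# Hypothetical sample passages, written to a temporary location
sample = ["Amazon S3 stores objects in buckets.", 'A passage with "quotes"\nand a line break.']
manifest_path = write_input_manifest(
    sample, os.path.join(tempfile.gettempdir(), "example_input_manifest.json")
)
print(manifest_path.read_text(encoding="utf-8"))
```

You could then upload the resulting file with `s3.Bucket(bucket_name).upload_file(...)` exactly as in the cell above. Serializing each record with `json.dumps` keeps embedded quotes and newlines escaped, so every passage occupies a single line.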

## Run-a-Ground-Truth-labeling-job

###  Create-the-instruction-template

The instruction template is designed to help annotators create high-quality question-answer datasets for natural language processing and machine learning applications. It gives annotators instructions that help them perform their task accurately, so they can engage deeply with each text passage and write thoughtful questions and answers that reflect human-like comprehension and reasoning.

In [None]:
from IPython.display import display, HTML

def make_template(save_fname="instructions_qna.liquid"):
    template = """
<html>
  <head>
    <link rel="stylesheet" href="https://assets.crowd.aws/css/crowd-html-elements-v2.css" />
    <title>Question-Answer Generation Tool</title>
    <script src="https://assets.crowd.aws/crowd-html-elements-v2.js"></script>
  </head>
  <body>
    <div>
      <crowd-form id="crowd-form-qa" style="display: none"></crowd-form>
      <crowd-question-answer-generation
        crowd-form-element-id="crowd-form-qa"
        text='{{ task.input.source }}'
        min-questions="1"
        max-questions="5"
        question-min-words="1"
        question-max-words="20"
        answer-min-words="1"
        answer-max-words="20"
        question-tags='["What", "Why", "When", "Where", "Which", "Who", "How"]'
        allow-custom-question-tags="true"
        instructions='- Create questions and answers based on the provided text passage.
                      - Select at least 1 text reference for a Q&A pair (unless it references the entire text passage)
                      - Question tags are "Who", "What", "When", "Where", "Why", "How", "Which" and "Whose". Select the appropriate tag based on your question.
                      - Other types of questions are permitted (example- yes/no, completion statements etc). If applicable, add a custom tag for such Q&A pairs.'></crowd-question-answer-generation>
    </div>
    <script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
  </body>
</html>
    """
    with open(save_fname, "w") as f:
        f.write(template)

# Create the template file locally
make_template(save_fname="./instructions_qna.liquid")

# Define the S3 path
file_name = 'instructions_qna.liquid'
ui_s3_path = f"{prefix}/{file_name}"
UITEMPLATES3URI = f"s3://{bucket}/{ui_s3_path}"

# Upload the file to S3 using the s3 client
s3_client.upload_file("./instructions_qna.liquid", bucket, ui_s3_path)

print(f"File uploaded to {UITEMPLATES3URI}")
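Before launching a job, it can help to see how a passage looks once `{{ task.input.source }}` is filled in, in particular whether an apostrophe in the text could break the single-quoted `text='...'` attribute. SageMaker offers a service-side `RenderUiTemplate` API (`render_ui_template` in boto3) for rendering a real template; the sketch below is only an offline approximation. `preview_template` is a hypothetical helper that handles that single placeholder and assumes the value should be HTML-escaped; it is not a Liquid engine.

```python
import html
import re

# Matches {{ task.input.source }} with or without inner spaces
_SOURCE_PLACEHOLDER = re.compile(r"\{\{\s*task\.input\.source\s*\}\}")


def preview_template(template: str, source_text: str) -> str:
    """Approximate the Liquid substitution of {{ task.input.source }} for a local preview.

    The value is HTML-escaped (quotes included) so an apostrophe cannot terminate
    a single-quoted attribute. A function is used as the replacement so that
    backslashes in the text are not interpreted as regex escapes.
    """
    escaped = html.escape(source_text, quote=True)
    return _SOURCE_PLACEHOLDER.sub(lambda _match: escaped, template)


snippet = "<crowd-question-answer-generation text='{{ task.input.source }}'>"
print(preview_template(snippet, "It's a test & more"))
```

In the notebook you might pass the contents of `instructions_qna.liquid` and one passage from the manifest, then inspect the printed string.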

### Specify-Labeling-Parameters
Specify a labeling job name. The remaining labeling parameters, such as TaskTitle, TaskDescription, and TaskKeywords, are passed to the CreateLabelingJob API in the next step, which launches the Ground Truth labeling job.

In [None]:
now = datetime.datetime.now()
timestamp_str = now.strftime("%Y%m%d-%H%M%S")
labeling_job_name = "immersionday-genai-qna-" + timestamp_str
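Labeling job names are restricted in length and character set, and must be unique within your AWS account and Region. The check below is a sketch encoding our reading of the `LabelingJobName` constraints in the CreateLabelingJob API reference (at most 63 characters; letters, digits, and hyphens; starting and ending with a letter or digit). `is_valid_labeling_job_name` is a hypothetical helper, so confirm the rules against the current documentation.

```python
import re

# Assumed pattern and length limit for LabelingJobName, per our reading of the API reference
_LABELING_JOB_NAME_RE = re.compile(r"^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,62}$")
_MAX_NAME_LENGTH = 63


def is_valid_labeling_job_name(name: str) -> bool:
    """Return True if `name` satisfies the assumed length and character rules."""
    # The regex alone does not bound hyphens, so the length is checked separately
    return len(name) <= _MAX_NAME_LENGTH and bool(_LABELING_JOB_NAME_RE.match(name))


print(is_valid_labeling_job_name("immersionday-genai-qna-20240101-120000"))
print(is_valid_labeling_job_name("bad_name_with_underscores"))
```

A timestamp suffix like the one above keeps names unique across reruns, and `assert is_valid_labeling_job_name(labeling_job_name)` catches a malformed prefix before the API call does.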

### Create-a-Labeling-Job

In [None]:
client = boto3.client('sagemaker')

response = client.create_labeling_job(
    LabelingJobName=labeling_job_name,
    LabelAttributeName='label',
    InputConfig={
        'DataSource': {
            'S3DataSource': {
                'ManifestS3Uri': input_manifest_uri  # S3 URI of the input manifest
            }
        }
    },
    OutputConfig={
        'S3OutputPath': f's3://{bucket}/output/'  # S3 URI of the output folder
    },
    RoleArn=role,  # IAM SageMaker execution role ARN
    HumanTaskConfig={
        'WorkteamArn': WORKTEAM_ARN,  # work team ARN
        'UiConfig': {
            'UiTemplateS3Uri': UITEMPLATES3URI  # S3 URI of the UI template
        },
        'TaskKeywords': [
            'QnA',
        ],
        'TaskTitle': 'Generative AI - Question and Answer pairs',
        'TaskDescription': "Create questions and answers based on the provided text passage.",
        'NumberOfHumanWorkersPerDataObject': 1,
        'TaskTimeLimitInSeconds': 60*30,  # 30 minutes per task
        'TaskAvailabilityLifetimeInSeconds': 60*60*24*10,  # tasks stay available for 10 days
        'MaxConcurrentTaskCount': 100
    })

print("Labeling job ARN:", response["LabelingJobArn"])
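To review the request before submitting it, or to reuse it with different settings, you can assemble the same arguments as a plain dictionary. The sketch below mirrors the call above; `build_labeling_job_request`, its defaults, and the example bucket, role, and work team values are all hypothetical. It does not contact AWS.

```python
import json


def build_labeling_job_request(job_name, manifest_uri, output_uri, role_arn,
                               workteam_arn, ui_template_uri,
                               workers_per_object=1, task_minutes=30, lifetime_days=10):
    """Assemble keyword arguments for create_labeling_job without calling AWS."""
    return {
        "LabelingJobName": job_name,
        "LabelAttributeName": "label",
        "InputConfig": {"DataSource": {"S3DataSource": {"ManifestS3Uri": manifest_uri}}},
        "OutputConfig": {"S3OutputPath": output_uri},
        "RoleArn": role_arn,
        "HumanTaskConfig": {
            "WorkteamArn": workteam_arn,
            "UiConfig": {"UiTemplateS3Uri": ui_template_uri},
            "TaskKeywords": ["QnA"],
            "TaskTitle": "Generative AI - Question and Answer pairs",
            "TaskDescription": "Create questions and answers based on the provided text passage.",
            "NumberOfHumanWorkersPerDataObject": workers_per_object,
            "TaskTimeLimitInSeconds": task_minutes * 60,
            "TaskAvailabilityLifetimeInSeconds": lifetime_days * 24 * 60 * 60,
            "MaxConcurrentTaskCount": 100,
        },
    }


# Example values only
request = build_labeling_job_request(
    "example-job",
    "s3://example-bucket/genai/data/gt_input_manifest_qna.json",
    "s3://example-bucket/output/",
    "arn:aws:iam::123456789012:role/ExampleRole",
    "arn:aws:sagemaker:us-east-1:123456789012:workteam/private-crowd/example-team",
    "s3://example-bucket/genai/instructions_qna.liquid",
)
print(json.dumps(request, indent=2))
# To submit it: client.create_labeling_job(**request)
```

Expressing the limits in minutes and days makes the 1,800-second task limit and the 864,000-second (10-day) availability window easier to read.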

### Gather-human-feedback-through-labeling-portal

In [None]:
from IPython.display import display, HTML

workforce = client.describe_workforce(WorkforceName="default")
worker_portal_url = 'https://' + workforce["Workforce"]["SubDomain"]

# Display the URL and instructions
display(HTML(f"""
<body>
<h4>Gather human feedback</h4>
<p>Please complete the labeling tasks available in the labeling portal.</p>
<p><a href="{worker_portal_url}">{worker_portal_url}</a></p>
<p><b>Ensure all tasks are completed before proceeding to the next steps in this notebook.</b></p>
</body>
"""))

### Monitoring-Labeling-Job-Status
We track the status of the labeling job and wait for the annotators to complete it. Run the cell below to check the job's status; once it reports `Completed`, proceed to collect the annotators' responses. Collecting results only after the entire job has finished ensures that the dataset is complete and reliable.

In [None]:
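job_description = client.describe_labeling_job(LabelingJobName=labeling_job_name)
print("Status:", job_description["LabelingJobStatus"])
print("Label counters:", job_description.get("LabelCounters"))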
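Rather than re-running the status check by hand, you can poll until the job reaches a terminal state. The following is a minimal sketch; `wait_for_labeling_job` is a hypothetical helper, and it assumes `Completed`, `Failed`, and `Stopped` are the terminal values of `LabelingJobStatus`. Passing the describe function in, instead of creating a client inside, keeps the helper easy to test.

```python
import time

# Assumed terminal values of LabelingJobStatus
TERMINAL_STATUSES = {"Completed", "Failed", "Stopped"}


def wait_for_labeling_job(describe, job_name, poll_seconds=60, timeout_seconds=None, sleep=time.sleep):
    """Poll until the labeling job reaches a terminal status; return the last describe response.

    `describe` is any callable accepting LabelingJobName=..., for example
    boto3.client("sagemaker").describe_labeling_job.
    """
    waited = 0
    while True:
        response = describe(LabelingJobName=job_name)
        status = response["LabelingJobStatus"]
        counters = response.get("LabelCounters", {})
        print(f"{status}: {counters.get('TotalLabeled', 0)} labeled, "
              f"{counters.get('Unlabeled', 0)} unlabeled")
        if status in TERMINAL_STATUSES:
            return response
        if timeout_seconds is not None and waited >= timeout_seconds:
            raise TimeoutError(f"{job_name} still {status} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds
```

In this notebook you would call `wait_for_labeling_job(client.describe_labeling_job, labeling_job_name)`. Because the job only finishes after annotators submit their work in the portal, consider setting `timeout_seconds` so the cell does not block indefinitely.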

## Post-Processing-and-Analysis-of-SageMaker-Ground-Truth-Labeling-Job-Results
<b>[OPTIONAL]</b>

### Merging-Worker-Responses-into-a-Consolidated-JSON-File

This section focuses on combining all individual worker responses into a single, comprehensive JSON file. We gather the individual JSON files from the "worker-response" folder, each representing a worker's answers to labeling tasks, and merge them into one consolidated file. The resulting consolidated file makes it easier to analyze worker performance, compare responses across different tasks, and prepare the data for further processing or quality checks.

In [None]:
import boto3
import json

s3_client = boto3.client('s3')

def merge_json_files(bucket, prefix, output_bucket, output_key):
    merged_data = []

    # Page through every object under the worker-response prefix;
    # list_objects_v2 returns at most 1,000 keys per call, so use a paginator
    paginator = s3_client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for obj in page.get('Contents', []):
            key = obj['Key']
            if not key.endswith('.json'):
                continue  # skip anything that is not a worker-response JSON file
            # Read each JSON file and record the S3 URI it came from
            json_obj = s3_client.get_object(Bucket=bucket, Key=key)
            json_data = json.loads(json_obj['Body'].read().decode('utf-8'))
            json_data['workerjsonUri'] = f"s3://{bucket}/{key}"  # Add the source S3 URI to the JSON object
            merged_data.append(json_data)  # Append the modified JSON object

    # Write merged data to a new JSON file in S3
    merged_json = json.dumps(merged_data)
    s3_client.put_object(Body=merged_json, Bucket=output_bucket, Key=output_key)
    print(f"Merged JSON file created at https://s3.console.aws.amazon.com/s3/object/{output_bucket}?region={region}&prefix={output_key}")

# Replace with your actual bucket names and prefixes
source_bucket_name = bucket
iteration_prefix = f'output/{labeling_job_name}/annotations/worker-response/iteration-1/'
output_bucket_name = bucket
output_key = f'output/{labeling_job_name}/merged-worker-data.json'

merge_json_files(source_bucket_name, iteration_prefix, output_bucket_name, output_key)
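Once the merged file exists, a flat table with one row per worker answer is often easier to inspect than nested JSON. The sketch below assumes each record has an `answers` list whose items carry `workerId`, `timeSpentInSeconds`, and `answerContent`, plus the `workerjsonUri` added by `merge_json_files`. `flatten_worker_answers` and the sample records are hypothetical; `answerContent` is passed through unchanged because its exact structure depends on the UI template.

```python
def flatten_worker_answers(merged_data):
    """Turn merged worker-response records into one row (dict) per worker answer."""
    rows = []
    for record in merged_data:
        for answer in record.get("answers", []):
            rows.append({
                "workerjsonUri": record.get("workerjsonUri"),
                "workerId": answer.get("workerId"),
                "timeSpentInSeconds": answer.get("timeSpentInSeconds"),
                "answerContent": answer.get("answerContent"),  # left as-is
            })
    return rows


# Made-up records for illustration
example_records = [
    {"workerjsonUri": "s3://example-bucket/r1.json",
     "answers": [{"workerId": "w1", "timeSpentInSeconds": 12.5, "answerContent": {}}]},
    {"workerjsonUri": "s3://example-bucket/r2.json", "answers": []},
]
for row in flatten_worker_answers(example_records):
    print(row)
```

In the notebook you would load the merged file with `s3_client.get_object(Bucket=output_bucket_name, Key=output_key)`, parse it with `json.loads`, and pass the list in; if pandas is installed, `pandas.DataFrame(rows)` turns the result into a table.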

### Analyzing-Time-Spent-on-Labeling-Tasks
This section analyzes the time spent on labeling tasks in our SageMaker Ground Truth job, using the merged worker-response JSON file. It calculates the total time spent across all tasks, counts the completed tasks, and computes the average time per task. These metrics show the overall effort behind the labeling project and support resource allocation and realistic time estimates for future labeling jobs.

In [None]:
from colorama import Fore, Back, Style, init

init(autoreset=True)  # Initialize colorama

def calculate_and_display_average_time(bucket, key):
    json_obj = s3_client.get_object(Bucket=bucket, Key=key)
    merged_json_data = json.loads(json_obj['Body'].read().decode('utf-8'))
    total_time = 0
    count = 0
    for json_data in merged_json_data:
        if 'answers' in json_data:
            for answer in json_data['answers']:
                if 'timeSpentInSeconds' in answer:
                    total_time += answer['timeSpentInSeconds']
                    count += 1
    
    average_time = total_time / count if count > 0 else 0
    
    # Create a fancy display
    print("\n" + "="*50)
    print(Fore.CYAN + Style.BRIGHT + "     LABELING TASK SUMMARY     ")
    print("="*50 + "\n")
    
    print(Fore.YELLOW + "📊 " + Style.BRIGHT + "Count of Tasks:")
    print(Fore.WHITE + f"   {count:,} tasks")
    
    print(Fore.YELLOW + "\n⏱️ " + Style.BRIGHT + "Total Time Spent:")
    print(Fore.WHITE + f"   {total_time:.2f} seconds")
    print(Fore.WHITE + f"   ({total_time / 3600:.2f} hours)")
    
    print(Fore.YELLOW + "\n⌛ " + Style.BRIGHT + "Average Time per Task:")
    print(Fore.WHITE + f"   {average_time:.2f} seconds")
    
    print("\n" + "="*50)

calculate_and_display_average_time(output_bucket_name, output_key)
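Separating the arithmetic from the S3 read makes it easy to check on small inputs. The sketch below is a hypothetical refactor of the calculation above (`summarize_task_times` is not part of the original notebook); it also reports the median, which is less sensitive than the mean to a task left open for a long time. The example data is made up for illustration.

```python
from statistics import median


def summarize_task_times(merged_json_data):
    """Compute count, total, mean and median of timeSpentInSeconds across all answers."""
    times = [
        answer["timeSpentInSeconds"]
        for record in merged_json_data
        for answer in record.get("answers", [])
        if "timeSpentInSeconds" in answer
    ]
    if not times:
        return {"count": 0, "total": 0.0, "mean": 0.0, "median": 0.0}
    total = float(sum(times))
    return {
        "count": len(times),
        "total": total,
        "mean": total / len(times),
        "median": float(median(times)),
    }


example = [
    {"answers": [{"timeSpentInSeconds": 30}, {"timeSpentInSeconds": 90}]},
    {"answers": [{"timeSpentInSeconds": 600}]},
]
print(summarize_task_times(example))
```

For this sample, three answers take 720 seconds in total, so the mean is 240 seconds while the median is 90 seconds; the single 600-second answer pulls the mean up.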

### Consolidating-Worker-Responses-and-Output-Manifest

This section consolidates our labeling project data into a comprehensive single file. We merge information from two key sources: the worker responses and the output manifest. By combining the "source" data from the output manifest with the "answers" provided by workers, we create a unified view of all labeling tasks. This consolidation streamlines our data analysis process, allowing us to easily connect each task's original source with its corresponding worker annotations.

In [None]:
def merge_data_with_manifest(output_manifest_bucket, output_manifest_key, merged_data_bucket, merged_data_key):
    try:
        # Fetch and load the output manifest file
        manifest_obj = s3_client.get_object(Bucket=output_manifest_bucket, Key=output_manifest_key)
        manifest_data = manifest_obj['Body'].read().decode('utf-8').splitlines()
        
        # Load the merged JSON data
        merged_data_obj = s3_client.get_object(Bucket=merged_data_bucket, Key=merged_data_key)
        merged_data = json.loads(merged_data_obj['Body'].read().decode('utf-8'))
        
        # Convert merged data to a dictionary for easier lookup based on workerjsonUri
        merged_data_dict = {item['workerjsonUri']: item for item in merged_data}
        
        final_data = []
        
        for line in manifest_data:
            if not line.strip():
                continue  # skip blank lines in the manifest
            manifest_entry = json.loads(line)
            worker_response_ref = manifest_entry.get("label-metadata", {}).get("worker-response-ref")
            
            # Find the matching entry in the merged data
            if worker_response_ref in merged_data_dict:
                # Merge the manifest data with the corresponding worker response data
                combined_data = {**manifest_entry, **merged_data_dict[worker_response_ref]}
                final_data.append(combined_data)
        
        # Write the final merged data to a new file in S3
        final_json = json.dumps(final_data)
        final_output_key = f'output/{labeling_job_name}/final-merged-data.json'  # Customize this as needed
        s3_client.put_object(Body=final_json, Bucket=output_manifest_bucket, Key=final_output_key)
        print(f"Final merged JSON file created at https://s3.console.aws.amazon.com/s3/object/{output_manifest_bucket}?region={region}&prefix={final_output_key}")
    
    except s3_client.exceptions.NoSuchKey as e:
        print(f"Error: {e}")

# Replace with your actual bucket names and keys
output_manifest_bucket = bucket
output_manifest_key = f'output/{labeling_job_name}/manifests/output/output.manifest'
merged_data_bucket = bucket
merged_data_key = f'output/{labeling_job_name}/merged-worker-data.json'

merge_data_with_manifest(output_manifest_bucket, output_manifest_key, merged_data_bucket, merged_data_key)
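`merge_data_with_manifest` silently skips manifest entries that have no matching worker response. The sketch below performs the same join on in-memory data but also returns the unmatched entries, which makes gaps easy to spot. `join_manifest_with_responses` and the sample data are hypothetical; the `metadata_key` default assumes the label attribute name `label` used in this notebook, since Ground Truth names the metadata field `<LabelAttributeName>-metadata`.

```python
import json


def join_manifest_with_responses(manifest_lines, merged_data, metadata_key="label-metadata"):
    """Join output-manifest lines to worker responses via worker-response-ref.

    Returns (joined, unmatched): joined entries combine manifest and response fields;
    unmatched entries are manifest records with no corresponding worker response.
    """
    by_uri = {item["workerjsonUri"]: item for item in merged_data}
    joined, unmatched = [], []
    for line in manifest_lines:
        if not line.strip():
            continue  # tolerate blank lines
        entry = json.loads(line)
        ref = entry.get(metadata_key, {}).get("worker-response-ref")
        if ref in by_uri:
            joined.append({**entry, **by_uri[ref]})
        else:
            unmatched.append(entry)
    return joined, unmatched


# Made-up manifest lines and worker responses
manifest_lines = [
    json.dumps({"source": "p1", "label-metadata": {"worker-response-ref": "s3://b/r1.json"}}),
    "",
    json.dumps({"source": "p2", "label-metadata": {}}),
]
responses = [{"workerjsonUri": "s3://b/r1.json", "answers": []}]
joined, unmatched = join_manifest_with_responses(manifest_lines, responses)
print(len(joined), "joined;", len(unmatched), "unmatched")
```

In the notebook you would pass the manifest's `splitlines()` output and the parsed merged file, then review `unmatched` before relying on the final dataset.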