In [None]:
import json
import os
import posixpath
import sys
import time
import warnings

import boto3
import numpy as np
import pandas as pd
import sagemaker
from sagemaker import get_execution_role
# Suppress all Python warnings for the rest of the notebook session.
warnings.filterwarnings('ignore')

# SageMaker session; the region is taken from its underlying boto3 session.
sess = sagemaker.Session()
region = sess.boto_session.region_name

# IAM role sent as the labelling job's RoleArn -- replace the placeholder before running.
role = '<IAM_ROLE_HERE>'

print(f"Sagemaker Role : {role}", f"Region : {region}", sep="\n")

In [None]:
# ARN of the private work team (created in the SageMaker UI) that will label the data
PRIVATE_WORKTEAM_ARN = "arn:aws:sagemaker:ap-southeast-1:050381676378:workteam/private-crowd/test-labelling-team-1"
# Pre-annotation and annotation-consolidation Lambda ARNs used for the bounding box task
BOUNDING_BOX_PREHUMAN_ARN = "arn:aws:lambda:ap-southeast-1:377565633583:function:PRE-BoundingBox"
BOUNDING_BOX_ACS_ARN = "arn:aws:lambda:ap-southeast-1:377565633583:function:ACS-BoundingBox"

# S3 location of the dataset: bucket name and the key prefix where the images are located
DATASET_PATH = 'ayush/labeling_job_test/dataset-small/'
BUCKET_NAME = "sixsense-organization-assets"

# Where the generated manifest file is uploaded
MANIFEST_UPLOAD_DIR = 'ayush/labeling_job_test/labelling_test/'
MANIFEST_FILE_NAME = 'sample.manifest'

# Key prefix under which the labelling job writes its output
OUTPUT_DIR = 'ayush/labeling_job_test/labelling_test/'

# Labels offered to the annotators and where the generated label file is uploaded
LABEL_LIST = ["MASK"]
LABEL_FILE_UPLOAD_DIR = "ayush/labeling_job_test/labelling_test/"
LABEL_FILE_NAME = "labels.json"

# Text fields of the labelling task
JOB_NAME_PREFIX = "automated-labelling-1"
TASK_DESCRIPTION = "Draw boxes on the faces wearing masks"
TASK_KEYWORDS = ["BoundingBox"]
TASK_TITLE = "MASK Detection Labelling"

# Human task limits (names mirror the HumanTaskConfig keys they are passed as)
MaxConcurrentTaskCount=200
NumberOfHumanWorkersPerDataObject=1
TaskAvailabilityLifetimeInSeconds=3600 # 1 hour
TaskTimeLimitInSeconds=300 # 5 minutes
TEMPLATE_FILE_URI=f"s3://{BUCKET_NAME}/ayush/labeling_job_test/labelling_test/instructions.template"

# S3 URIs always use "/" as separator, so they are joined with posixpath instead of
# os.path (which would insert the local OS separator).
S3_BUCKET_URI = f"s3://{BUCKET_NAME}"
MANIFEST_FILE_URI = posixpath.join(S3_BUCKET_URI, MANIFEST_UPLOAD_DIR, MANIFEST_FILE_NAME)
OUTPUT_PATH_URI = posixpath.join(S3_BUCKET_URI, OUTPUT_DIR)
LABEL_FILE_URI = posixpath.join(S3_BUCKET_URI, LABEL_FILE_UPLOAD_DIR, LABEL_FILE_NAME)
print(MANIFEST_FILE_URI)
print(OUTPUT_PATH_URI)
print(LABEL_FILE_URI)

In [None]:
def generate_manifest_file(bucket_name, dataset_path, manifest_upload_dir, manifest_file_name):
    """
    Generates a manifest file listing the images used in the labelling job and uploads it to S3.

    Every object under `dataset_path` whose key ends in .png/.jpg/.jpeg (case-insensitive)
    is written as one JSON line of the form {"source-ref": "s3://<bucket>/<object key>"}.
    The manifest is written to the current working directory, uploaded, and then removed
    locally whether or not the upload succeeded.
    params:
        bucket_name : s3 bucket name
        dataset_path : s3 key prefix of the image dataset
        manifest_upload_dir : s3 directory to upload the manifest file
        manifest_file_name : name of the manifest file
    raises:
        ValueError : no image object was found under dataset_path
        Exception : the manifest file could not be uploaded (original error is chained)
    """
    image_extensions = ('.png', '.jpg', '.jpeg')
    local_manifest_file_path = os.path.join(os.getcwd(), manifest_file_name)
    # S3 keys always use "/" regardless of the local OS separator
    manifest_key = posixpath.join(manifest_upload_dir, manifest_file_name)
    s3 = boto3.resource('s3')
    dataset_bucket = s3.Bucket(bucket_name)
    image_count = 0
    try:
        with open(local_manifest_file_path, 'w') as outfile:
            for object_summary in dataset_bucket.objects.filter(Prefix=dataset_path):
                object_key = object_summary.key
                if not object_key.lower().endswith(image_extensions):
                    continue
                # Use the full object key so images in nested prefixes keep a valid URI
                data_dict = {"source-ref": f"s3://{bucket_name}/{object_key}"}
                outfile.write(json.dumps(data_dict) + "\n")
                image_count += 1
        if image_count == 0:
            raise ValueError(f"No images found under s3://{bucket_name}/{dataset_path}")
        print(f"Manifest File Creation Done. Uploading Manifest file to : {manifest_upload_dir}")
        try:
            dataset_bucket.upload_file(local_manifest_file_path, manifest_key)
        except Exception as e:
            raise Exception(f"Failed to Upload {local_manifest_file_path} to {manifest_upload_dir}\nError : {e}") from e
        print("Upload Successful")
    finally:
        # Never leave the temporary manifest behind, even when listing or uploading fails
        if os.path.exists(local_manifest_file_path):
            os.remove(local_manifest_file_path)

In [None]:
# Build the manifest for the images under DATASET_PATH and upload it to MANIFEST_UPLOAD_DIR.
generate_manifest_file(
    bucket_name=BUCKET_NAME,
    dataset_path=DATASET_PATH,
    manifest_upload_dir=MANIFEST_UPLOAD_DIR,
    manifest_file_name=MANIFEST_FILE_NAME,
)

In [None]:
def generate_label_file(bucket_name, label_list, label_file_name, label_file_upload_dir):
    """
    Generate a json file containing information of labels to be annotated and upload on S3.

    The file has the shape {"document-version": ..., "labels": [{"label": <name>}, ...]}
    with one entry per item of `label_list`. It is written to the current working
    directory, uploaded, and then removed locally whether or not the upload succeeded.
    params:
        bucket_name : S3 Bucket Name
        label_list : list of labels to be annotated
        label_file_name : name of label file
        label_file_upload_dir : s3 directory to upload label file
    raises:
        ValueError : label_list is empty
        Exception : the label file could not be uploaded (original error is chained)
    """
    if not label_list:
        raise ValueError("label_list must contain at least one label")
    label_dict = {
        # NOTE(review): AWS documentation examples use "2018-11-28" for image label
        # category files -- confirm this version string is accepted.
        "document-version" : "2023-02-09",
        # One {"label": name} entry per label, kept as a list
        "labels" : [{"label" : label} for label in label_list]
    }
    local_label_file_path = os.path.join(os.getcwd(), label_file_name)
    # S3 keys always use "/" regardless of the local OS separator
    label_file_key = posixpath.join(label_file_upload_dir, label_file_name)
    try:
        with open(local_label_file_path, 'w') as f:
            json.dump(label_dict, f)

        s3 = boto3.resource('s3')
        s3_bucket = s3.Bucket(bucket_name)
        print(f"Uploading Label File {label_file_name} to {label_file_upload_dir}")
        try:
            s3_bucket.upload_file(local_label_file_path, label_file_key)
        except Exception as e:
            raise Exception(f"Failed to upload {local_label_file_path} to {label_file_upload_dir}\nError : {e}") from e
        print("Uploaded Successfully")
    finally:
        # Never leave the temporary label file behind, even when the upload fails
        if os.path.exists(local_label_file_path):
            os.remove(local_label_file_path)

In [None]:
# Write the label file for LABEL_LIST and upload it to LABEL_FILE_UPLOAD_DIR.
generate_label_file(
    bucket_name=BUCKET_NAME,
    label_list=LABEL_LIST,
    label_file_name=LABEL_FILE_NAME,
    label_file_upload_dir=LABEL_FILE_UPLOAD_DIR,
)

In [None]:
def create_human_task_config(acs_arn,
                             pre_human_arn,
                             MaxConcurrentTaskCount,
                             NumberOfHumanWorkersPerDataObject,
                             TaskAvailabilityLifetimeInSeconds,
                             TaskTimeLimitInSeconds,
                             TaskDescription,
                             TaskKeywords,
                             TaskTitle,
                             template_file_uri,
                             work_team_arn
                            ):
    """
    Build the human task configuration dictionary used in the labelling job request.

    Each argument is copied unchanged into the key noted below.
    params:
        acs_arn : stored as AnnotationConsolidationConfig.AnnotationConsolidationLambdaArn
        pre_human_arn : stored as PreHumanTaskLambdaArn
        MaxConcurrentTaskCount : stored under the key of the same name
        NumberOfHumanWorkersPerDataObject : stored under the key of the same name
        TaskAvailabilityLifetimeInSeconds : stored under the key of the same name
        TaskTimeLimitInSeconds : stored under the key of the same name
        TaskDescription : brief description of the task
        TaskKeywords : keywords related to the task
        TaskTitle : title of the task
        template_file_uri : stored as UiConfig.UiTemplateS3Uri
        work_team_arn : stored as WorkteamArn
    returns:
        human_task_config : dict holding the values above
    """
    # Nested sections are built first, then placed into the top-level config.
    consolidation_section = {"AnnotationConsolidationLambdaArn": acs_arn}
    ui_section = {"UiTemplateS3Uri": template_file_uri}

    human_task_config = {}
    human_task_config["AnnotationConsolidationConfig"] = consolidation_section
    human_task_config["PreHumanTaskLambdaArn"] = pre_human_arn
    human_task_config["MaxConcurrentTaskCount"] = MaxConcurrentTaskCount
    human_task_config["NumberOfHumanWorkersPerDataObject"] = NumberOfHumanWorkersPerDataObject
    human_task_config["TaskAvailabilityLifetimeInSeconds"] = TaskAvailabilityLifetimeInSeconds
    human_task_config["TaskDescription"] = TaskDescription
    human_task_config["TaskKeywords"] = TaskKeywords
    human_task_config["TaskTimeLimitInSeconds"] = TaskTimeLimitInSeconds
    human_task_config["TaskTitle"] = TaskTitle
    human_task_config["UiConfig"] = ui_section
    human_task_config["WorkteamArn"] = work_team_arn
    return human_task_config

In [None]:
# Assemble the human task settings from the configuration constants; the resulting
# dictionary is shown as the cell output.
human_task_config = create_human_task_config(
    acs_arn=BOUNDING_BOX_ACS_ARN,
    pre_human_arn=BOUNDING_BOX_PREHUMAN_ARN,
    MaxConcurrentTaskCount=MaxConcurrentTaskCount,
    NumberOfHumanWorkersPerDataObject=NumberOfHumanWorkersPerDataObject,
    TaskAvailabilityLifetimeInSeconds=TaskAvailabilityLifetimeInSeconds,
    TaskTimeLimitInSeconds=TaskTimeLimitInSeconds,
    TaskDescription=TASK_DESCRIPTION,
    TaskKeywords=TASK_KEYWORDS,
    TaskTitle=TASK_TITLE,
    template_file_uri=TEMPLATE_FILE_URI,
    work_team_arn=PRIVATE_WORKTEAM_ARN,
)
human_task_config

In [None]:
def create_ground_truth_request(manifest_file_uri,
                                output_path_uri,
                                human_task_config,
                                job_name,
                                iam_role,
                                label_file_uri,
                                content_classifiers=None,
                                label_attribute_name="category"
                               ):
    """
    Generates a ground truth request dictionary to create a labelling job
    params:
        manifest_file_uri : S3 URI of the input manifest (InputConfig.DataSource.S3DataSource.ManifestS3Uri)
        output_path_uri : S3 URI where the job output is written (OutputConfig.S3OutputPath)
        human_task_config : human task configuration dictionary (HumanTaskConfig)
        job_name : name of the labelling job (LabelingJobName)
        iam_role : IAM role used by the job (RoleArn)
        label_file_uri : S3 URI of the label file (LabelCategoryConfigS3Uri)
        content_classifiers : optional list of content classifier strings for
            InputConfig.DataAttributes.ContentClassifiers; when None, defaults to
            ["FreeOfPersonallyIdentifiableInformation", "FreeOfAdultContent"]
        label_attribute_name : value of LabelAttributeName; defaults to "category"
    returns:
        ground_truth_request : dict that can be expanded into create_labeling_job(**request)
    """
    if content_classifiers is None:
        content_classifiers = ["FreeOfPersonallyIdentifiableInformation", "FreeOfAdultContent"]
    ground_truth_request = {
        "InputConfig": {
            "DataSource": {
                "S3DataSource": {
                    "ManifestS3Uri": manifest_file_uri,
                }
            },
            "DataAttributes": {
                # Copied so that later edits to the request never alter the caller's list
                "ContentClassifiers": list(content_classifiers)
            },
        },
        "OutputConfig": {
            "S3OutputPath": output_path_uri,
        },
        "HumanTaskConfig": human_task_config,
        "LabelingJobName": job_name,
        "RoleArn": iam_role,
        "LabelAttributeName": label_attribute_name,
        "LabelCategoryConfigS3Uri": label_file_uri,
    }
    return ground_truth_request

In [None]:
# JOB_NAME_PREFIX alone would be reused on every run, and a labelling job name that
# already exists is rejected, so a UTC timestamp suffix makes each run's name unique.
job_name = f"{JOB_NAME_PREFIX}-{time.strftime('%Y%m%d-%H%M%S', time.gmtime())}"

ground_truth_request = create_ground_truth_request(
    manifest_file_uri=MANIFEST_FILE_URI,
    output_path_uri=OUTPUT_PATH_URI,
    human_task_config=human_task_config,
    job_name=job_name,
    iam_role=role,
    label_file_uri=LABEL_FILE_URI,
)
ground_truth_request

In [None]:
# Submit the labelling job; the API response is shown as the cell output.
sagemaker_client = boto3.client(service_name="sagemaker")
sagemaker_client.create_labeling_job(**ground_truth_request)