In [109]:
import json
import pprint
import boto3
from datetime import datetime
%matplotlib inline
from urllib.parse import urlparse
import pandas as pd
import numpy as np

In [110]:
s3_client = boto3.client('s3')
s3 = boto3.resource('s3')

In [111]:
def get_bucket_keys(bucket_name, prefix):

    
    keys = []
    paginator = s3_client.get_paginator('list_objects')
    operation_parameters = {'Bucket': bucket_name, 'Prefix': prefix}
    page_iterator = paginator.paginate(**operation_parameters)
    for page in page_iterator:
        for obj in page['Contents']:
            keys.append(obj['Key'])
    return keys

In [112]:
def get_bucket_key_from_s3_uri(s3_path):
    parsed_url = urlparse(s3_path, allow_fragments=False)
    bucket = parsed_url.netloc
    key = parsed_url.path.lstrip('/')
    return bucket, key

In [113]:
def get_worker_accuracy(worker_annotation, true_annotation):
    if true_annotation is None:
        return -1
    if worker_annotation == true_annotation:
        return 1
    else:
        return 0

In [114]:
def process_raw_annotation(worker_response, label_category_name='label'):
    raw_annotation = worker_response['answerContent']['crowd-classifier'][label_category_name]
    return raw_annotation

In [115]:
def process_worker_responses(bucket_name, all_keys_worker_response, dict_manifest_index_gt, dict_manifest_index_consolidated, label_category_name):
    dict_worker_response = {}
    for worker_response_key in all_keys_worker_response:
        if worker_response_key.endswith('.json'):
            response_content = json.loads(s3_client.get_object(Bucket = bucket_name, Key = worker_response_key)['Body'].read().decode('utf-8'))
            manifest_index = worker_response_key.split("/")[-2]
            list_worker_response = response_content['answers']
            if label_category_name is None:
                label_category_name = 'label'
            for worker_response in list_worker_response:
               processed_annotation = process_raw_annotation(worker_response, label_category_name)
               ground_truth_annotation = None
               consolidated_annotation = None
               if manifest_index in dict_manifest_index_gt:
                   ground_truth_annotation = dict_manifest_index_gt[manifest_index]
               consolidated_annotation = dict_manifest_index_consolidated[manifest_index]
               worker_accuracy_gt = get_worker_accuracy(processed_annotation, ground_truth_annotation)
               worker_accuracy_consolidated = get_worker_accuracy(processed_annotation, consolidated_annotation)
               worker_id = worker_response['workerId']
               if worker_id not in dict_worker_response:
                    dict_worker_response[worker_id] = []
               dict_worker_response[worker_id].append({'ManifestIndex': manifest_index, 'GTAccuracy': worker_accuracy_gt, 'ConsolidatedAccuracy':worker_accuracy_consolidated})

    return  dict_worker_response
        
    

In [116]:
def extract_ground_truth_file(ground_truth_file):
    dict_manifest_index_gt = {}
    if ground_truth_file is not None:
        bucket_name, key = get_bucket_key_from_s3_uri(ground_truth_file)
        content = (s3_client.get_object(Bucket = bucket_name, Key = key)['Body'].read().decode('utf-8'))
        content_list = content.split('\n')
        for x in content_list:
            if x:
                dict_ = json.loads(x)
                for key_, value_ in dict_.items():
                    dict_manifest_index_gt[key_] = value_
    return dict_manifest_index_gt

In [117]:
def extract_consolidated_file(consolidated_file, labeling_job_name):
    dict_manifest_index_consolidated = {}
    if consolidated_file is not None:
        i=0
        bucket_name, key = get_bucket_key_from_s3_uri(consolidated_file)
        content = (s3_client.get_object(Bucket = bucket_name, Key = key)['Body'].read().decode('utf-8'))
        content_list = content.split('\n')
        for x in content_list:
          if x:
              dict_manifest_index_consolidated[str(i)] = (json.loads(x)[labeling_job_name+'-metadata']['class-name'])
              i+=1
    return dict_manifest_index_consolidated
    

In [118]:
# labeling_job_output_location: Labeling job output file provided to the input of CreateLabelingJob. Requires the trailing
# slash to be present
# labeling_job_name: Labeling job name
# ground_truth_file: golden dataset in similar format to labeling job input manifest. Each line needs to be valid JSON
# s3_output_worker_metrics: Output s3 location which would get created to store worker metrics. 
# label_category_name: Optional arguement. Needs to be provided if overriding default label category name

In [119]:
def write_worker_metrics(labeling_job_output_location, labeling_job_name, ground_truth_file, s3_output_worker_metrics, label_category_name=None):
    # read the ground truth file and form a dict
    dict_manifest_index_gt = extract_ground_truth_file(ground_truth_file)
    output_manifest = labeling_job_output_location + '/' + labeling_job_name + '/manifests/output/output.manifest'
    
    # read the output manifest and form a dict
    dict_manifest_index_consolidated = extract_consolidated_file(output_manifest, labeling_job_name)
    
    # get the raw worker annotations for each data object id
    worker_response_root = labeling_job_output_location + '/' + labeling_job_name + '/annotations/worker-response'
    bucket_name, key = get_bucket_key_from_s3_uri(worker_response_root)
    
    
    all_keys_worker_response = get_bucket_keys(bucket_name, key)
    
    worker_response_dict = process_worker_responses(bucket_name, all_keys_worker_response, dict_manifest_index_gt, dict_manifest_index_consolidated, label_category_name)
    output_dict = {}
    
    for worker_id, accuracy_list in worker_response_dict.items():
        list_gt_accuracy = []
        list_consolidated_accuracy = []
        for y_ in accuracy_list:
            list_gt_accuracy.append(y_['GTAccuracy'])
            list_consolidated_accuracy.append(y_['ConsolidatedAccuracy'])
        num_data_objects_golden = len(list_gt_accuracy)-list_gt_accuracy.count(-1)
        list_gt_accuracy_only_annotated = [x for x in list_gt_accuracy if x != -1]

        if len(list_gt_accuracy_only_annotated) == 0:
            avg_gt_accuracy = 0
        else:
            avg_gt_accuracy = np.mean(list_gt_accuracy_only_annotated)
        
        avg_consolidated_accuracy = np.mean(list_consolidated_accuracy)
        num_data_objects = len(accuracy_list)
        output_dict[worker_id] = {'Total Number Of Objects Annotated': num_data_objects, \
                                  'Number Of Golden Standard Objects Annotated': num_data_objects_golden, \
                                  'Average Accuracy Compared To Other Workers': avg_consolidated_accuracy,
                                  'Average Golden Standard Accuracy': avg_gt_accuracy}
    
    pp = pprint.PrettyPrinter(indent=4)
    pp.pprint(output_dict)
    bucket_name, key = get_bucket_key_from_s3_uri(s3_output_worker_metrics)
    s3_client.put_object(Body=json.dumps(output_dict), Bucket=bucket_name, Key=key)
       
  

In [120]:
# Sample to invoke the script. Please replace with own inputs
bucket_root_url = 's3://gec-ml-blog'
golden_answers = bucket_root_url + '/' + 'golden.manifest'
worker_metrics_output = bucket_root_url + '/' + 'worker_metrics.json'
labeling_job_name = 'person-animal-plant'
labeling_job_output_location = bucket_root_url + '/' + 'output'
write_worker_metrics(labeling_job_output_location, labeling_job_name, golden_answers, worker_metrics_output)

{   'public.us-east-1.A17N3LOP5HGSQ8': {   'Average Accuracy Compared To Other Workers': 1.0,
                                           'Average Golden Standard Accuracy': 1.0,
                                           'Number Of Golden Standard Objects Annotated': 2,
                                           'Total Number Of Objects Annotated': 3},
    'public.us-east-1.A1C2T79XTDHE39': {   'Average Accuracy Compared To Other Workers': 1.0,
                                           'Average Golden Standard Accuracy': 0,
                                           'Number Of Golden Standard Objects Annotated': 0,
                                           'Total Number Of Objects Annotated': 1},
    'public.us-east-1.A1FNXSAI2PJ024': {   'Average Accuracy Compared To Other Workers': 1.0,
                                           'Average Golden Standard Accuracy': 1.0,
                                           'Number Of Golden Standard Objects Annotated': 1,
                     