# Introduction

This notebook will walk you through creating and monitoring your HITs. 

It provides methods to create HITs, pretty-print HIT and assignment status, expire/edit HITs, create qualifications, and download collected data. 

Before continuing, make sure that you have read the README and set all config fields to their desired values.

## Requirements: 

This code requires Python 3 and the following packages: 
- boto3 
- Beautiful Soup 4 (`beautifulsoup4`, imported as `bs4`)

Before using, you will have to set up an authentication key to use the Amazon API and include it in a credentials file. See here: https://aws.amazon.com/developers/getting-started/python/

# Setup

Read the config file and establish a connection to MTurk.

A connection is made to production or to the sandbox based on values in the config. 

In [19]:
import datetime
import boto3
import json
import copy
import pprint
from bs4 import BeautifulSoup as bs 
from uuid import uuid4

In [20]:
# path to the config file (read by the setup cell that follows)
CONFIG_PATH = "configs/config.json"

# where to save downloaded results (passed as `download_path` in the download cell)
# NOTE(review): the file name says "prod" while USING_PROD below is False --
# confirm this is the file you intend to write before downloading sandbox data.
SAVE_PATH = "assignments-prod-1.json" 

# Sandbox or Production? You only spend money in Production.
# Selects which MTurk endpoint URL the setup cell connects to.
USING_PROD = False

In [21]:
# Safety flags that prevent you from accidentally messing up your HITs. 
# Set to False except when you are performing these specific tasks.
# A guarded action refuses to run (raises) while its flag is False.
ALLOW_HIT_CREATION = True  # gates the "launch your HIT" cell
ALLOW_ASSIGNMENT_ADDITION = False  # gates add_assignments()
ALLOW_CREATE_QUAL = False  # gates create_qual(), and therefore compensate_workers()
ALLOW_UPDATE_EXPIRATION = False  # gates update_expiration() and expire_hit()

In [22]:
# Load the JSON config and pull out the settings used throughout the notebook.
with open(CONFIG_PATH, 'r') as f:
    config = json.load(f)

hit_config = config['hitCreation']
external_submit = hit_config['externalSubmit']
hit_url = hit_config['taskUrl']

# Choose the MTurk endpoint and the matching `origin` label for the
# environment selected by USING_PROD.
if USING_PROD:
    print("USING PROD")
    endpoint_url, origin = 'https://mturk-requester.us-east-1.amazonaws.com', "production"
else:
    print("USING SANDBOX")
    endpoint_url, origin = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com', "sandbox"

if external_submit:
    # External-link tasks get an origin=sandbox / origin=production querystring
    # that the task's js logic may read. This is not done for MTurk (iframe)
    # submits because it breaks the submit link.
    hit_url = "%s?origin=%s" % (hit_url, origin)
    print("Configuring task as external link with data submitted to: %s" % config['advanced']['externalSubmitUrl'])
else:
    print("Configuring task as an iframe within Mturk")
print("TASK URL: " + hit_url)

# Open the MTurk client using the 'default' profile of the local AWS credentials.
session = boto3.session.Session(profile_name='default')
cl = session.client('mturk', region_name='us-east-1', endpoint_url=endpoint_url)

USING SANDBOX
Configuring task as an iframe within Mturk
TASK URL: https://human-agent-collaboration.su.domains/experiment


# Make new HIT

In [23]:
# List of qualifications that you will use to filter potential workers. 
# These require that workers come from the US and have an approval rating >= 95%
# Edit this list to specify different qualifications for workers 
# This list is the default `quals` argument of create_hit(), which passes it to
# MTurk as QualificationRequirements.
QUALS = [
    # Locale requirement: the worker's country must equal 'US'.
    {
        'QualificationTypeId': '00000000000000000071',
        'Comparator': 'EqualTo',
        'LocaleValues': [{
            'Country': 'US',
        }],
    },
    # Approval-rating requirement: the worker's value must be >= 95.
    {
        'QualificationTypeId': '000000000000000000L0',
        'Comparator': 'GreaterThanOrEqualTo',
        'IntegerValues': [
            95
        ],
    },
]

In [24]:
# Helpers for creating HITs. 

# generic helper that sets metadata fields based on the config file.
def create_hit(task, questionText, quals=QUALS, auto_approval_delay=604800):
    """Create one HIT on MTurk and return the API response.

    Args:
        task: dict providing 'numAssignments', 'lifetime', 'duration',
            'rewardAmount', 'title', 'keywords' and 'description'.
        questionText: the question XML sent as the HIT's `Question`.
        quals: list of qualification requirements; defaults to QUALS.
        auto_approval_delay: seconds after submission before an assignment is
            auto-approved. Defaults to 604800 (7 days), the value that was
            previously hard-coded.

    Returns:
        The response dict from `cl.create_hit`, so callers can read the new
        HIT's id. The response is also printed.
    """
    response = cl.create_hit(
        MaxAssignments=task['numAssignments'],
        AutoApprovalDelayInSeconds=auto_approval_delay,
        LifetimeInSeconds=task['lifetime'],
        AssignmentDurationInSeconds=task['duration'],
        Reward=task['rewardAmount'],
        Title=task['title'],
        Keywords=task['keywords'],
        Description=task['description'],
        Question=questionText,
        QualificationRequirements=quals,
    )
    print(response)
    print("\n")
    return response

# creates a HIT in the form of an External Question inside an iFrame
def create_hit_iframe(task):
    """Wrap task['taskUrl'] in ExternalQuestion XML and create the HIT.

    The URL is passed through _encode_qs first so it can be embedded in XML.
    The frame height is fixed at 700.
    """
    encoded_url = _encode_qs(task['taskUrl'])
    question_xml = (
        "<ExternalQuestion xmlns=\"http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/"
        "2006-07-14/ExternalQuestion.xsd\">\n<ExternalURL>"
        + encoded_url
        + "</ExternalURL>\n  <FrameHeight>700</FrameHeight>\n</ExternalQuestion>"
    )
    create_hit(task, question_xml)
    
# Helper to create a HIT in the form of a simple UI with a link to an external page and an
# input box for a completion code 
def create_hit_external(task):
    """Create a HIT whose question form links out to the external task page.

    The form is built from 'questionform_template.xml', which is filled with
    three %-style values in order: title, description and task URL.

    The title and description come from `task`, not from the global
    `hit_config`, so the form shows the same text the HIT is created with --
    including any per-variant overrides. The URL is the notebook-level
    `hit_url`, which already carries the origin querystring.
    """
    with open('questionform_template.xml', 'r') as myfile:
        template = myfile.read()
    question_xml = template % (task["title"], task["description"], hit_url)
    create_hit(task, question_xml)

# helper to encode the querystring as required by MTurk
def _encode_qs(url):
    return url.replace("&", "&amp;")

In [25]:
# Use this cell to launch your HIT! 
# Creates either `numTasks` identical HITs or one HIT per entry of `variants`,
# both read from the hitCreation section of the config (`hit_config`).
if ALLOW_HIT_CREATION:
    num_tasks = hit_config.get('numTasks', False)
    variants = hit_config.get('variants', False)
    if not (variants or num_tasks):
        raise RuntimeError("You must specify either hitCreation.numTasks or hitCreation.variants in your config.json file")

    hit_creation_function = create_hit_external if external_submit else create_hit_iframe

    if num_tasks:
        print("creating " + str(num_tasks) + " tasks")
        for i in range(num_tasks):
            hit_creation_function(hit_config)
    else:
        print("creating " + str(len(variants)) + " variants")
        for var in variants:
            # Each variant starts from the base HIT settings (hit_config, not the
            # whole config file) and overrides only the fields it specifies.
            task = copy.deepcopy(hit_config)
            task.update(var)
            hit_creation_function(task)
else:
    raise RuntimeError("This action is not currently enabled; set `ALLOW_HIT_CREATION` to true to proceed with this action")

creating 1 tasks
{'HIT': {'HITId': '3HJ1EVZS3XSR473JXQI48L53C9X3RG', 'HITTypeId': '3RDTAMZS7C1WL692VW0X0D9RV5ZXGS', 'HITGroupId': '3GV45MIM46S3XKVWVUR7062QBUVZM0', 'CreationTime': datetime.datetime(2021, 8, 23, 22, 55, 21, tzinfo=tzlocal()), 'Title': 'Play Overcooked with an AI agent', 'Description': 'Try to deliver as many soups as possible in the kitchen-themed video game Overcooked with an AI agent as your partner. Then, answer some questions about your experience.', 'Question': '<ExternalQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2006-07-14/ExternalQuestion.xsd">\n<ExternalURL>https://human-agent-collaboration.su.domains/experiment</ExternalURL>\n  <FrameHeight>700</FrameHeight>\n</ExternalQuestion>', 'Keywords': 'overcooked, game, survey', 'HITStatus': 'Assignable', 'MaxAssignments': 9, 'Reward': '2.00', 'AutoApprovalDelayInSeconds': 604800, 'Expiration': datetime.datetime(2021, 8, 24, 22, 55, 21, tzinfo=tzlocal()), 'AssignmentDurationInSecond

# HIT monitoring helpers

Helper functions that will be useful for monitoring the status of your HIT. See next section for how to use them.

In [26]:
# Contacts MTurk API to get all assignments for a HIT
# Returns them in a list. 
def get_all_assignments(hitid):
    """Fetch every assignment of the HIT `hitid`, following pagination.

    Pages of up to 100 assignments are requested until a page is empty or the
    response carries no NextToken.

    Args:
        hitid: id of the HIT to query.

    Returns:
        A list with the assignment dicts of all pages, in the order returned.
    """
    assignments = []
    next_token = None
    while True:
        args = {
            'HITId': hitid,
            'MaxResults': 100
        }
        if next_token:
            args['NextToken'] = next_token
        r = cl.list_assignments_for_hit(**args)
        page = r["Assignments"]
        assignments.extend(page)
        next_token = r.get('NextToken')
        # Without a token the next request would start over at the first page,
        # so a missing token must end the loop just like an empty page does.
        if not page or not next_token:
            break
    return assignments

# Summarizes all hits in `hits` in a human-readable way. 
# Prints out the HIT Title, id, if it is expired, and how many assignments it has
# completed, pending, and left for work. 
def summarize_hits(hits):
    """Print the number of HITs followed by one status block per HIT.

    A HIT counts as expired when its 'Expiration', with any timezone info
    dropped, is earlier than the current local time.
    """
    print(len(hits))
    template = (
        "Title: {title}\n"
        "ID: {hid}\n"
        "\tAssignments left: {left}\n"
        "\tAssignments completed: {complete}\n"
        "\tAssignments pending: {pending}\n"
        "\tExpired: {exp}\n\n"
    )
    blocks = []
    for hit in hits:
        naive_expiration = hit['Expiration'].replace(tzinfo=None)
        expired = naive_expiration < datetime.datetime.now()
        blocks.append(template.format(
            title=hit['Title'],
            hid=hit['HITId'],
            left=hit['NumberOfAssignmentsAvailable'],
            complete=hit['NumberOfAssignmentsCompleted'],
            pending=hit['NumberOfAssignmentsPending'],
            exp=str(expired),
        ))
    print("".join(blocks))
    
# Prints a human-readable summary of all pending/submitted/approved assignments for all hits in `hits`
def summarize_assignments(hits):
    """Print, for each HIT, a header line and the id and status of each assignment.

    Assignments are fetched per HIT with get_all_assignments; a HIT with none
    gets a single placeholder line instead.
    """
    pieces = []
    for hit in hits:
        pieces.append("HIT %s: %s" % (hit['HITId'], hit['Title']) + "\n")
        hit_assignments = get_all_assignments(hit['HITId'])
        if not len(hit_assignments):
            pieces.append("\tNo pending/submitted/approved assignments for this HIT\n")
        pieces.extend(
            "\tAssignment {aid}\n\t\tStatus: {status}\n".format(
                aid=entry['AssignmentId'], status=entry['AssignmentStatus'])
            for entry in hit_assignments
        )
    print("".join(pieces))
    
# Refreshes data about the requested hits
def refresh_hits():
    """Re-query MTurk and rebind the notebook-level `hits` list.

    Requests at most MAX_RESULTS HITs and stores the 'HITs' entry of the
    response in the global `hits`. Returns nothing.
    """
    global hits
    listing = cl.list_hits(MaxResults=MAX_RESULTS)
    hits = listing['HITs']

# HIT monitoring

In [31]:
# Upper bound on the number of HITs requested from MTurk by list_hits, both
# here and in refresh_hits(). With a value of 1 at most one HIT is fetched, so
# the summaries below cover a single HIT; raise it to monitor more at once.
MAX_RESULTS = 1
# API call to grab HIT data from MTurk 
hits = cl.list_hits(MaxResults=MAX_RESULTS)['HITs']

In [32]:
# Summarizes all outstanding HITs
refresh_hits()
summarize_hits(hits)

1
Title: Play Overcooked with an AI agent
ID: 3HJ1EVZS3XSR473JXQI48L53C9X3RG
	Assignments left: 8
	Assignments completed: 0
	Assignments pending: 0
	Expired: False




In [33]:
# Summarizes assignments for all outstanding HITs 
refresh_hits()
summarize_assignments(hits)

HIT 3HJ1EVZS3XSR473JXQI48L53C9X3RG: Play Overcooked with an AI agent
	Assignment 3TUI152ZZLWY5N7YN9KN3MBHE5I1QN
		Status: Submitted



# Approve HITs

Approves all outstanding assignments for the HITs displayed above. 

In [5]:
def approve_all(hits):
    """Approve every assignment of `hits` that is still awaiting review.

    Only assignments whose status is 'Submitted' are approved. Assignments
    that are already 'Approved' need nothing, and 'Rejected' ones are skipped
    because `approve_assignment` is called without OverrideRejection.

    Args:
        hits: iterable of HIT dicts, each with a 'HITId'.

    Prints one line per approval and a final count.
    """
    num_approved = 0
    for hit in hits:
        # get_all_assignments follows pagination, so no assignment is missed
        for a in get_all_assignments(hit["HITId"]):
            if a['AssignmentStatus'] == 'Submitted':
                print("Approving assignment")
                cl.approve_assignment(AssignmentId=a['AssignmentId'])
                num_approved += 1
    print("Approved %d assignments" % num_approved)

In [12]:
refresh_hits()
approve_all(hits)

Approving assignment
Approving assignment
Approving assignment
Approving assignment
Approving assignment
Approving assignment
Approving assignment
Approving assignment
Approving assignment
Approved 9 assignments


# Update expiration or num tasks

In [None]:
# changes the expiration date on a HIT to days_from_now days in the future
def update_expiration(hitid, days_from_now):
    """Move the expiration of HIT `hitid` to `days_from_now` days from now.

    The new expiration is built as a timezone-aware UTC datetime. A naive
    local timestamp would be interpreted as UTC when the request is sent,
    shifting the expiration by the local UTC offset.

    Args:
        hitid: id of the HIT to update.
        days_from_now: number of days relative to now; negative values put the
            expiration in the past.

    Returns:
        The response from `cl.update_expiration_for_hit` (also printed).

    Raises:
        RuntimeError: if ALLOW_UPDATE_EXPIRATION is not enabled.
    """
    if not ALLOW_UPDATE_EXPIRATION:
        raise RuntimeError("This action is not currently enabled; set `ALLOW_UPDATE_EXPIRATION` to true to proceed with this action")

    now_utc = datetime.datetime.now(datetime.timezone.utc)
    expire_time = now_utc + days_from_now * datetime.timedelta(days=1)

    response = cl.update_expiration_for_hit(HITId=hitid, ExpireAt=expire_time)
    print(response)
    return response
    
def expire_hit(hit):
    """Expire a HIT by setting its expiration 10 days into the past.

    `hit` is passed straight to update_expiration as the HIT id, and that
    function's result is returned.
    """
    days_from_now = -10
    return update_expiration(hit, days_from_now)

In [None]:
def add_assignments(hitid, num_assignments):
    """Request `num_assignments` additional assignments for HIT `hitid`.

    Prints and returns the API response. Raises RuntimeError unless
    ALLOW_ASSIGNMENT_ADDITION is enabled.
    """
    if not ALLOW_ASSIGNMENT_ADDITION:
        raise RuntimeError("This action is not currently enabled; set `ALLOW_ASSIGNMENT_ADDITION` to true to proceed with this action")
    result = cl.create_additional_assignments_for_hit(
        NumberOfAdditionalAssignments=num_assignments,
        HITId=hitid,
    )
    print(result)
    return result

In [None]:
# Use this cell to expire a HIT 
HIT_id_to_expire = "FILL THIS IN" 
expire_hit(HIT_id_to_expire)

In [None]:
# Use this cell to add assignments to a HIT 
HIT_id_to_add_assignments = "FILL THIS IN"
num_assignments_to_add = 0
add_assignments(HIT_id_to_add_assignments, num_assignments_to_add)

# Add custom qualifications 

## Add a qualification to disqualify workers who have done work before

- uses "negative qualification" method from https://github.com/cloudyr/MturkR/wiki/qualifications-as-blocks

#### NOTE: quals are kept separate for the sandbox and prod. Make sure you are creating and assigning your quals in prod. 

### Structure of a new qualification

In [None]:
# Template showing the fields of a custom qualification definition. A dict of
# this shape is what create_qual() unpacks as keyword arguments into
# cl.create_qualification_type. Replace the placeholder strings before use.
NEW_QUAL = {
    'Name': 'qualName',  # compensate_workers notes that a qual must have a unique name
    'Keywords': 'Keywords for qual',
    'Description': 'What is this qual, and why are you assigning it?',
    'QualificationTypeStatus': 'Active',
    'AutoGranted': False
}

### Helpers for creating, viewing, and assigning qualifications

In [20]:
# Registers a custom qualification with MTurk 
def create_qual(new_qual):
    """Create a qualification type on MTurk and return its id.

    Args:
        new_qual: dict of keyword arguments for
            `cl.create_qualification_type` (see NEW_QUAL for the shape).

    Returns:
        The QualificationTypeId of the new qualification. The full response
        and the id are also printed.

    Raises:
        RuntimeError: if ALLOW_CREATE_QUAL is not enabled.
    """
    if not ALLOW_CREATE_QUAL:
        raise RuntimeError("This action is not currently enabled; set `ALLOW_CREATE_QUAL` to true to proceed with this action")
    response = cl.create_qualification_type(**new_qual)
    print(response)
    Id = response['QualificationType']['QualificationTypeId']
    print("id", Id)
    return Id
        
# Searches MTurk qualification types and prints the response
def list_quals(query='hasCompletedVisualGraphRecallTask'):
    """Print the qualification types matching `query`.

    Args:
        query: text sent as the `Query` search filter. The default is the
            string that was previously hard-coded; pass your own qualification
            name to search for it, or None to send no query filter at all.

    The request always sets MustBeRequestable=False.
    """
    args = {'MustBeRequestable': False}
    if query is not None:
        args['Query'] = query
    response = cl.list_qualification_types(**args)
    print(response)
    
# Assigns a qualification to a worker 
def assign_qual(qual_id, worker_ids):
    """Grant qualification `qual_id`, with value 1, to each id in `worker_ids`.

    No notification is sent to the workers. Each API response is printed and
    asserted to be truthy.
    """
    for worker_id in worker_ids:
        result = cl.associate_qualification_with_worker(
            WorkerId=worker_id,
            QualificationTypeId=qual_id,
            IntegerValue=1,
            SendNotification=False,
        )
        print(result)
        assert result
        
# Gets the ids of all workers who worked on a particular hit 
def get_workers_for_hit(hitid):
    """Return the 'WorkerId' of every assignment of HIT `hitid`, in order.

    A worker id appears once per assignment returned by get_all_assignments.
    """
    worker_ids = []
    for assignment in get_all_assignments(hitid):
        worker_ids.append(assignment['WorkerId'])
    return worker_ids
    
# Confirms that every worker in worker_ids has qual with qual_id
def confirm_quals(qual_id, worker_ids):
    """Assert that each worker holds qualification `qual_id` with value 1.

    Looks up each worker's qualification score and raises AssertionError at
    the first one whose status is not 'Granted' or whose value is not 1.
    """
    for worker_id in worker_ids:
        qualification = cl.get_qualification_score(
            WorkerId=worker_id,
            QualificationTypeId=qual_id,
        )['Qualification']
        assert qualification['Status'] == 'Granted'
        assert qualification['IntegerValue'] == 1
        
# Assigns qual with `qual_id` to every worker who has completed an assignment for the hit with `hitid`
def assign_qual_for_hit(hitid, qual_id):
    """Grant `qual_id` to every worker of HIT `hitid`, then verify the grant.

    Runs three steps in order -- collect the worker ids, assign the
    qualification, confirm it -- and prints a progress line after each.
    """
    worker_ids = get_workers_for_hit(hitid)
    print("got workers")
    for step, done_message in ((assign_qual, "assigned qual"), (confirm_quals, "confirmed qual")):
        step(qual_id, worker_ids)
        print(done_message)

### Use the following cells to manipulate qualifications

In [None]:
# Use this cell to view the custom qualifications you have created
list_quals()

In [None]:
# Use this cell to create a new qual 
qual_to_create = {}
create_qual(qual_to_create)

In [None]:
# Use this cell to assign a custom qual to every worker who has done a specific HIT
hit_id = "FILL THIS IN"
qual_id_to_assign = "FILL THIS IN"
assign_qual_for_hit(hit_id, qual_id_to_assign)

# Create Compensation HIT

Mistakes happen, and sometimes they can lead to a worker who put in an honest effort being unable to complete a task and get paid. It's a good idea to compensate these workers when they reach out because it helps maintain relations with workers and is the right thing to do.

However, workers can only be paid upon completing a task. The workaround is to create a custom qualification, assign it to the worker you want to compensate, and create a no-work HIT requiring the custom qualification. This code does that.

In [18]:
# worker_ids is str[]
# compensation is str but should match the regex ^\d*\.\d\d$ (e.g. "1.00")
# for_hit_id is str -- optional, but helpful for records
def compensate_workers(worker_ids, compensation, for_hit_id=""):
    """Pay `worker_ids` through a no-work HIT that only they can take.

    Steps, in order: read the question XML from 'compensation.xml', create a
    uniquely named qualification, assign it to the workers, then create a HIT
    that requires that qualification and pays `compensation`.

    When `for_hit_id` is given it is appended to the keywords and to the
    description, which is also used as the HIT title.
    """
    with open('compensation.xml', 'r') as xml_file:
        question_xml = xml_file.read()

    keywords = 'compensation' + (', ' + for_hit_id if for_hit_id else '')
    description = 'Compensation for HIT' + (' ' + for_hit_id if for_hit_id else '')

    # Register the gating qualification (a qual must have a unique name, hence
    # the random UUID) and hand it to the workers being compensated.
    qual_id = create_qual(dict(
        Name=str(uuid4()),
        Keywords=keywords,
        Description=description,
        QualificationTypeStatus='Active',
        AutoGranted=False,
    ))
    assign_qual(qual_id, worker_ids)

    # One assignment per worker; the HIT lives 3 days and allows 5 minutes each.
    three_days = 3 * 24 * 60 * 60
    five_minutes = 5 * 60
    task = dict(
        numAssignments=len(worker_ids),
        lifetime=three_days,
        duration=five_minutes,
        rewardAmount=compensation,
        title=description,
        keywords=keywords,
        description=description,
    )
    required_qual = dict(
        QualificationTypeId=qual_id,
        Comparator='Exists',
        ActionsGuarded='DiscoverPreviewAndAccept',
    )
    create_hit(task, question_xml, [required_qual])

In [25]:
worker_ids = [] # worker_id strings in a list
compensation = "0.00" # change to the amount of dollars you want to give
for_hit_id = "" # hit_id string (what you are compensating for)
compensate_workers(worker_ids, compensation, for_hit_id)

{'QualificationType': {'QualificationTypeId': '3ZKUQXDZ4GWPO7ECLTL0G0F7KLK828', 'CreationTime': datetime.datetime(2021, 8, 20, 17, 38, 47, tzinfo=tzlocal()), 'Name': '9042defd-5698-4c88-aab5-36fb63c64ae9', 'Description': 'Compensation for HIT Compensation hit for playing Overcooked, thanks for your time!', 'Keywords': 'compensation, Compensation hit for playing Overcooked, thanks for your time!', 'QualificationTypeStatus': 'Active', 'IsRequestable': True, 'AutoGranted': False}, 'ResponseMetadata': {'RequestId': '9f127279-adf9-4628-a740-f40fcc5d7ad2', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '9f127279-adf9-4628-a740-f40fcc5d7ad2', 'content-type': 'application/x-amz-json-1.1', 'content-length': '419', 'date': 'Fri, 20 Aug 2021 22:38:46 GMT'}, 'RetryAttempts': 0}}
id 3ZKUQXDZ4GWPO7ECLTL0G0F7KLK828
{'ResponseMetadata': {'RequestId': 'f1bf95ea-154d-4e2f-a2b8-539a7870cb51', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': 'f1bf95ea-154d-4e2f-a2b8-539a7870cb51', 'c

# Download data

Helper to download data from MTurk 

In [34]:
def pretty_print(obj):
    """Pretty-print `obj` to standard output with an indent of 4."""
    pprint.pprint(obj, indent=4)

# Downloads all the assignments completed for `hits` as a list of dictionaries. 
# If a download_path is given, also saves that data as json 
def get_assignment_content(hits, download_path="", should_print=False):
    """Collect the submitted answers of every assignment of `hits`.

    Each assignment's 'Answer' XML is parsed and every <answer> element is
    stored under its question identifier. A free-text value that is valid JSON
    is decoded; anything else (plain text, or an empty answer) is kept as-is.

    Args:
        hits: iterable of HIT dicts, each with a 'HITId'.
        download_path: if non-empty, the result is also written there as JSON.
        should_print: if true, the result is pretty-printed.

    Returns:
        A list with one dict per assignment, holding 'HITId', 'AssignmentId',
        'WorkerId' and one entry per answered question.
    """
    all_responses = []
    for hit in hits:
        for a in get_all_assignments(hit['HITId']):
            soup = bs(a['Answer'], "html.parser")
            results = {'HITId': a['HITId'], 'AssignmentId': a['AssignmentId'], 'WorkerId': a['WorkerId']}
            for ans in soup.find_all("answer"):
                identifier = ans.find('questionidentifier').string
                answer = ans.find('freetext').string
                try:
                    results[identifier] = json.loads(answer)
                except (TypeError, ValueError):
                    # TypeError: the answer is None (empty free text).
                    # ValueError: the answer is not JSON (covers JSONDecodeError).
                    # Anything else, e.g. KeyboardInterrupt, is allowed to propagate.
                    results[identifier] = answer
            all_responses.append(results)
    if should_print:
        pretty_print(all_responses)
    if download_path:
        with open(download_path, 'w') as outfile:
            json.dump(all_responses, outfile)
    return all_responses
            

In [35]:
# Use this cell to download data
# responses = get_assignment_content(hits, download_path=SAVE_PATH, should_print=True)
responses = get_assignment_content(hits, download_path=SAVE_PATH, should_print=False)

In [47]:
game_id = responses[0]['results']['data'][10]['trial_id']

In [46]:
responses[0]['results']['data'][10]

{'trial_id': '1629777679.4360483',
 'params': {'playerZero': 'human',
  'playerOne': 'sac_self_play_simple_0',
  'layouts': ['simple'],
  'gameTitle': 'Partner 4, Game 1',
  'gameType': 'overcooked_recorder',
  'gameTime': 60,
  'gameTotalPartners': 4,
  'gamePartnerNum': 4}}

In [39]:
import json
import pandas as pd
import matplotlib.pyplot as plt

In [42]:
# Load the JSON file named after `game_id` from the server's data directory.
# The context manager closes the file even if parsing fails.
with open('../server/data/server/' + game_id + '.json') as f:
    data = json.load(f)

In [48]:
data['trajectory'][0]

{'state': {'players': [{'position': [1, 2],
    'orientation': [0, -1],
    'held_object': None},
   {'position': [3, 1], 'orientation': [0, -1], 'held_object': None}],
  'objects': [],
  'order_list': []},
 'joint_action': [[0, 0], [1, 0]],
 'reward': 0,
 'time_left': 59.99704599380493,
 'score': 0,
 'time_elapsed': 0.0029556751251220703,
 'cur_gameloop': 1,
 'layout': [['X', 'X', 'P', 'X', 'X'],
  ['O', ' ', ' ', ' ', 'O'],
  ['X', ' ', ' ', ' ', 'X'],
  ['X', 'D', 'X', 'S', 'X']],
 'layout_name': 'simple',
 'trial_id': '1629777679.4360483',
 'player_0_id': '8f7067cd561f476abaaedbbb14b2a97d',
 'player_1_id': 'sac_self_play_simple_0_1',
 'player_0_is_human': True,
 'player_1_is_human': False,
 'info': {'curr_game_reward': 0}}