# Setup

In [None]:
# path to the config file (see example). 
# will work with a config file for https://github.com/kimberli/mturk-template, 
# but a minimal example is included.
# Also supports an additional feature, the "variants" key, which is a list of dictionaries. 
# If "variants" is specified, for each dictionary it contains, those keys will be meshed with the "hitCreation"
# key and one task will be made per variant. Else, config["hitCreation"]["numTasks"] versions of the same
# task will be launched. 
CONFIG_PATH = "./config.json"

# where to save downloaded results 
SAVE_PATH = "." 

In [None]:
from boto3 import client
import json
import copy

_USING_PROD = None

with open(CONFIG_PATH, 'r') as f:
    config = json.loads(f.read())
    hit_config = config['hitCreation']

if hit_config['production']:
    print("USING PROD")
    _USING_PROD = True
    endpoint_url = 'https://mturk-requester.us-east-1.amazonaws.com'
else:
    print("USING SANDBOX")
    _USING_PROD = False
    endpoint_url = 'https://mturk-requester-sandbox.us-east-1.amazonaws.com'
        
cl = client('mturk', region_name='us-east-1', endpoint_url=endpoint_url)

print("TASK URL: " + hit_config['taskUrl'])


# Make new HIT

In [None]:
# Safety flags that prevent you from accidentally messing up your HITs. 
# Set to False except when you are performing these specific tasks. 
ALLOW_HIT_CREATION = False
ALLOW_ASSIGNMENT_ADDITION = False
ALLOW_CREATE_QUAL = False
ALLOW_UPDATE_EXPIRATION = False

In [None]:
# List of qualifications that you will use to filter potential workers. 
# These require that workers come from the US and have an approval rating >= 95%
QUALS = [
       {
           'QualificationTypeId': '00000000000000000071',
           'Comparator': 'EqualTo',
           'LocaleValues': [{
               'Country': 'US',
           }],
       },
        
       {
           'QualificationTypeId': '000000000000000000L0',
           'Comparator': 'GreaterThanOrEqualTo',
           'IntegerValues': [
               95
           ],
       },
    ]

In [None]:
# creates a HIT in the form of an External Question inside an iFrame
def create_hit(task):
    questionText = "<ExternalQuestion xmlns=\"http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/"
    questionText += "2006-07-14/ExternalQuestion.xsd\">\n<ExternalURL>" + task['taskUrl']
    questionText += "</ExternalURL>\n  <FrameHeight>700</FrameHeight>\n</ExternalQuestion>"

    response = cl.create_hit(
        MaxAssignments=task['numAssignments'],
        AutoApprovalDelayInSeconds=604800,
        LifetimeInSeconds=task['lifetime'],
        AssignmentDurationInSeconds=task['duration'],
        Reward=task['rewardAmount'],
        Title=task['title'],
        Keywords=task['keywords'],
        Description=task['description'],
        Question=questionText,
        QualificationRequirements=QUALS,
    )

    print(response)
    print("\n")

In [None]:
if ALLOW_HIT_CREATION: 
    if config.get('variants', None): 
        print("creating " + str(len(config['variants'])) + " variants")
        for var in config['variants']: 
            task = copy.deepcopy(config)
            task.update(var)
            create_hit(task)
    else: 
      print("creating " + str(hit_config['numTasks']) + " tasks")
      for i in range(hit_config['numTasks']):
          create_hit(hit_config)

# HIT monitoring helpers

In [None]:
MAX_RESULTS = 1 # set equal to the number of outstanding hits you have 

hits = cl.list_hits(MaxResults=MAX_RESULTS)['HITs']

In [None]:
# Gets all assignments created for a HIT
def get_all_assignments(hitid): 
    assignments = []
    should_continue = True
    next_token = False
    while (should_continue): 
        args = {
            'HITId': hitid, 
            'MaxResults': 100
        }
        if (next_token): 
            args['NextToken'] = next_token
        r = cl.list_assignments_for_hit(**args)
        next_token = r.get('NextToken', False)
        assignments.extend(r["Assignments"])
        should_continue = len(r["Assignments"]) > 0
    return assignments

In [None]:
import datetime

# Summarizes all hits in `hits` in a human-readable way 
def summarize_hits(hits): 
    print(len(hits))
    ret = ""
    for hit in hits: 
        expiration = hit['Expiration'].replace(tzinfo=None)
        is_expired = expiration < datetime.datetime.now()
        description = ("Title: {title}\n" 
        "ID: {hid}\n"
        "\tAssignments left: {left}\n"
        "\tAssignments completed: {complete}\n"
        "\tAssignments pending: {pending}\n"
        "\tExpired: {exp}\n\n").format(
            title=hit['Title'], 
            hid=hit['HITId'], 
            left=hit['NumberOfAssignmentsAvailable'], 
            complete=hit['NumberOfAssignmentsCompleted'], 
            pending=hit['NumberOfAssignmentsPending'],
            exp=str(is_expired)
        )
        ret += description
    print(ret)

In [None]:
# Summarizes all pending/submitted/approved assignments for all hits in `hits`
def summarize_assignments(hits):
    ret = ""
    for hit in hits: 
        hid = hit['HITId']
        title =  hit['Title']
        name = "HIT %s: %s" % (hid, title)
        ret += name + "\n"
        assignments = get_all_assignments(hid)
        for a in assignments: 
            desc = "\tAssignment {aid}\n\t\tStatus: {status}\n".format(aid=a['AssignmentId'], status=a['AssignmentStatus'])
            ret += desc
    print(ret)

In [None]:
def refresh_hits(): 
    global hits 
    global MAX_RESULTS
    hits = cl.list_hits(MaxResults=MAX_RESULTS)['HITs']

# HIT monitoring

In [None]:
refresh_hits()
hits

In [None]:
refresh_hits()

summarize_hits(hits)

In [None]:
refresh_hits()
summarize_assignments(hits)
pass

# Approve HITs

In [None]:
# Approves all outstanding hits created for the HITs in hits 
def approve_all(hits): 
    num_approved = 0
    for hit in hits: 
        # make sure you keep getting assignments 
        assignments = get_all_assignments(hit["HITId"])
        #print(assignments)
        for a in assignments: 
            if a['AssignmentStatus'] != 'Approved':
                print("Approving assignment")
                num_approved += 1
                cl.approve_assignment(AssignmentId=a['AssignmentId'])
    print("Approved %d assignments" % num_approved)

In [None]:
refresh_hits()
approve_all(hits)

# Update expiration or num tasks

In [None]:
import datetime 

# changes the expiration date on a HIT to days_from_now days in the future
def update_expiration(hitid, days_from_now): 
    if ALLOW_UPDATE_EXPIRATION: 
        days = days_from_now*datetime.timedelta(days=1)
        expire_time = datetime.datetime.now() + days

        response = cl.update_expiration_for_hit(HITId=hitid, ExpireAt=expire_time)
        print(response)
        return response
    else: 
        raise RuntimeException("This action is not currently enabled; set `ALLOW_UPDATE_EXPIRATION` to true to proceed with this action")
    
def expire_hit(hit): 
    return update_expiration(hit, -10)

In [None]:
def add_assignments(hitid, num_assignments): 
    if ALLOW_ASSIGNMENT_ADDITION: 
        response = cl.create_additional_assignments_for_hit(
            HITId=hitid,
            NumberOfAdditionalAssignments=num_assignments
        )
        print(response)
        return response
    else: 
        raise RuntimException("This action is not currently enabled; set `ALLOW_ASSIGNMENT_ADDITION` to true to proceed with this action")

# Add custom qualifications 

## Add a qualification to disqualify workers who have done work before

- uses "negative qualification" method from https://github.com/cloudyr/MturkR/wiki/qualifications-as-blocks

### NOTE: quals are kept separate for the sandbox and prod. Make sure you are creating and assigning your quals in prod. 

In [None]:
# structure of a new qualification 
NEW_QUAL = {
    'Name': 'qualName',
    'Keywords': 'Keywords for qual',
    'Description': 'What is this qual, and why are you assigning it?',
    'QualificationTypeStatus': 'Active',
    'AutoGranted': False
}

In [None]:
def create_qual(new_qual):
    if ALLOW_CREATE_QUAL: 
        response = cl.create_qualification_type(**new_qual)
        print(response)
        Id = response['QualificationTypeId']
        print("id", Id)
        return Id
    else: 
        raise RuntimException("This action is not currently enabled; set `ALLOW_CREATE_QUAL` to true to proceed with this action")

In [None]:
# Gets all the custom quals you have created. 
def list_quals(): 
    response = cl.list_qualification_types(
            Query='hasCompletedVisualGraphRecallTask',
            MustBeRequestable=False
    )
    print(response)

list_quals()

In [None]:
def assign_qual(qual_id, worker_ids): 
    for worker in worker_ids: 
        response = cl.associate_qualification_with_worker(
                QualificationTypeId=qual_id, 
                WorkerId=worker,
                IntegerValue=1,
                SendNotification=False
        )
        print(response)
        assert response
        
def get_workers_for_hit(hitid): 
    a = get_all_assignments(hitid)
    workers = [a_['WorkerId'] for a_ in a]
    return workers
    
def confirm_quals(qual_id, worker_ids): 
    for w in worker_ids: 
        response = cl.get_qualification_score(
                QualificationTypeId=qual_id,
                WorkerId=w
        )
        response = response['Qualification']
        assert response['Status'] == 'Granted'
        assert response['IntegerValue'] == 1
        
# Assigns qual with `qual_id` to every worker who has completed an assignment for the hit with `hitid`
def assign_qual_for_hit(hitid, qual_id): 
    workers = get_workers_for_hit(hitid)
    print("got workers")
    assign_qual(qual_id, workers)
    print("assigned qual")
    confirm_quals(qual_id, workers)
    print("confirmed qual")

# Download data

In [None]:
from bs4 import BeautifulSoup as bs 
import pprint

def pretty_print(obj):
    pp = pprint.PrettyPrinter(indent=4)
    pp.pprint(obj)
    pp = None

# Downloads all the assignments completed for `hits` as a list of dictionaries. 
# If a download_path is given, also saves that data as json 
def get_assignment_content(hits, download_path="", should_print=False): 
    all_responses = []
    for hit in hits: 
        hitid = hit['HITId']
        assignments = get_all_assignments(hitid)
        for a in assignments:
            a_xml = a['Answer']
            #print(a_xml)
            soup = bs(a_xml, "lxml")
            answers = soup.find_all("answer")
            #print(answers)
            results = {'HITId': hitid}
            for ans in answers: 
                identifier = ans.find('questionidentifier').string
                answer = ans.find('freetext').string
                try: 
                    results[identifier] = json.loads(answer)
                except:
                    results[identifier] = answer
            all_responses.append(results)
    if should_print: 
        pretty_print(all_responses)
    if download_path: 
        with open(download_path, 'w') as outfile: 
            json.dump(all_responses, outfile)
    return all_responses
            

In [None]:
responses = get_assignment_content(hits, download_path=SAVE_PATH, should_print=False)
len(responses)