In [6]:
import requests 
import yaml
import json
import datetime as dt
import time
import os
import random
from azure.storage.blob import BlobServiceClient, BlobType
from email_validator import validate_email, EmailNotValidError


# Credentials/IDs are read from api-key.yaml so they stay out of the notebook.
# yaml.safe_load only builds plain YAML types (dicts, lists, strings, numbers), which is all
# this config needs; full_load can construct a wider range of Python objects than required.
with open("api-key.yaml", "r") as file:
    data = yaml.safe_load(file)

# SurveyMonkey Survey
SM_DATA = {
    "base_url":f"https://api.surveymonkey.com/v3/surveys/{data['sm']['survey-id']}", 
    "headers":{
        "Authorization": f"Bearer {data['sm']['access-token']}"
    }, 
    "survey-details-fp": "sm-survey-key.json" # path to saved question/answer key 
}


# CareerOneStop Skills Matcher 
COS_DATA = {
    "url":f"https://api.careeronestop.org/v1/skillsmatcher/{data['cs']['user-id']}",
    "headers":{
        "Authorization": f"Bearer {data['cs']['token-key']}"
    }, 
    "survey-details-fp": "cos-survey-key.json" # path to saved question/answer key
}

## -- Cloud storage and logging -- ## 
# Currently targets Azure, but the backend could change.
# Production logging would be more sophisticated, e.g. a dedicated logging service rather
# than uploading to a bucket, multiple logfiles with a naming scheme, separate logfiles per
# type of log data, alerts on certain kinds of log events and messages, etc.

# AZ_CONNECTION_STR = data['az']['connection-str']
# AZ_CONTAINER_NAME = data['az']['container-name']

# blob_service_client = BlobServiceClient.from_connection_string(AZ_CONNECTION_STR)
# container_client = blob_service_client.get_container_client(AZ_CONTAINER_NAME)
log_file = "logfile.txt" # default log blob name used by log_azure()

## Files currently read locally (from within the app environment) could be read from cloud storage instead.
## At minimum, a cloud copy of each such file should be maintained.

---

**Utility/wrapper Functions**

In [27]:
## Logging wrapper 
def log_azure(log_data, log_file=log_file):
    """Log a message: print it, and append it to an Azure append blob once storage is configured.

    Until the Azure client in the config cell (`container_client`) is enabled, this only
    prints `log_data`, so the notebook can run and be tested without cloud credentials.

    Args:
        log_data: Message to log (anything with a meaningful `str()`).
        log_file (str): Name of the blob to append to (defaults to the module-level `log_file`).

    Returns:
        None
    """
    print(log_data)

    # TO-DO: Logger should automatically add timestamp to log_data by default.

    # `container_client` only exists once the commented-out Azure setup is enabled;
    # until then stay in print-only (placeholder) mode.
    client = globals().get("container_client")
    if client is None:
        return None

    # For append blobs, upload_blob(..., overwrite=False) creates the blob when it is missing
    # and appends to it otherwise, so no exists()/offset bookkeeping is needed.
    # Newline-terminate each entry so consecutive log lines don't run together.
    blob_client = client.get_blob_client(log_file)
    blob_client.upload_blob(f"{log_data}\n", blob_type=BlobType.AppendBlob)
    return None

## HTTP request wrapper (GET/POST) with logging and retries
def request(method:str, url:str, json:dict=None, headers:dict=None, max_attempts=2):
    """Send a GET or POST request, logging the outcome and retrying on exceptions.

    Args:
        method (str): "GET" or "POST" (case-insensitive).
        url (str): Target URL.
        json (dict, optional): JSON body, only sent with POST. Defaults to None.
        headers (dict, optional): Request headers. Defaults to None.
        max_attempts (int): Total number of attempts before giving up (default 2).

    Returns:
        requests.Response: The first response received. Non-2xx status codes are returned
        to the caller as-is; only raised exceptions trigger a retry.

    Raises:
        ValueError: If `method` is not GET/POST or `max_attempts` is below 1.
        Exception: Wrapping the last error once all `max_attempts` attempts have failed.
    """
    method = method.upper()
    if method not in ("GET", "POST"):
        raise ValueError(f"Unsupported method {method!r}; expected 'GET' or 'POST'.")
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}.")

    for attempt in range(1, max_attempts + 1):
        try:
            start_time = time.perf_counter()
            if method == "GET":
                response = requests.get(url, headers=headers)
            else:
                response = requests.post(url, json=json, headers=headers)
            time_taken = time.perf_counter() - start_time

            # Log plain values (wrapping them in `{...}` would build sets and print e.g. `{200}`).
            log_azure(f"INFO: {method} {url} -- {response.status_code} -- {time_taken:.2f} -- {dt.datetime.now()}")
            return response

        except Exception as e:
            if attempt >= max_attempts:
                # Out of attempts: surface the failure instead of silently returning None.
                log_azure(f"ERROR: {method} {url} -- {e} -- {dt.datetime.now()} -- No attempts left ({max_attempts}).")
                raise Exception(e) from e

            wait_time = random.randint(1, 4)  # jittered back-off before the next attempt
            log_azure(f"ERROR: {method} {url} -- {e} -- {dt.datetime.now()} -- Re-try in {wait_time} seconds.")
            time.sleep(wait_time)
    
## Load JSON -- any failure is logged and reported as None rather than raised:
def load_json(file_path:str):
    """Read and parse a JSON file; log and return None if that fails.

    Args:
        file_path (str): Path of the JSON file to read.

    Returns:
        The parsed JSON value, or None when the file is missing, unreadable, or not valid JSON.
    """
    try:
        with open(file_path, "r") as handle:
            parsed = json.load(handle)
    except Exception as err:
        log_azure(f"ERROR loading {file_path}: {str(err)}.")
        return None
    return parsed

    
## Load to DB (TO-DO: not implemented yet)
def load_to_db():
    """Placeholder for loading SM responses into the database; currently a no-op."""
    return None

## Query DB (TO-DO: not implemented yet)
def query_db():
    """Placeholder wrapper for querying the database; currently a no-op."""
    return None


---

**Webhook Setup** (TO-DO)


While the SM API webhook triggers every time a response is completed, it does not include the response_id of this new response.

So while the webhook can trigger our application to run every time a survey is submitted, we can't just retrieve the new survey response by its id -- we will have to retrieve all responses from SM in bulk, and filter for new responses.

In [17]:
## POST a webhook: subscribe the test URL to `response_completed` events for our survey
subscription_test_url = "https://webhook.site/7778cda8-d526-4c18-96e3-da7cf1676b11"
sm_webhooks_url = "https://api.surveymonkey.com/v3/webhooks"
# Build a new headers dict instead of mutating SM_DATA['headers'] in place -- aliasing it
# would silently add Content-Type to every later SM request (hidden notebook state).
header = {**SM_DATA['headers'], 'Content-Type': 'application/json'}
json_data = {
    "name": "Test Webhook",
    "subscription_url": subscription_test_url,
    "event_type": "response_completed",
    "object_type": "survey",
    "object_ids": [str(data['sm']['survey-id'])] # survey id, from api-key.yaml
}

response = requests.post(sm_webhooks_url, headers=header, json=json_data)
response.json()

{'id': '4661653',
 'name': 'Test Webhook',
 'event_type': 'response_completed',
 'subscription_url': 'https://webhook.site/7778cda8-d526-4c18-96e3-da7cf1676b11',
 'object_type': 'survey',
 'object_ids': ['409346397'],
 'href': 'https://api.surveymonkey.com/v3/webhooks/4661653'}

*SurveyMonkey sends a HEAD request (with no content) to the test subscription URL:*
![sm_webhook_head.png](./img/sm_webhook_head.png)

In [25]:
%%bash 
# The access token is read from the environment rather than hardcoded in the notebook
# (e.g. set it from Python first: os.environ["SM_ACCESS_TOKEN"] = data['sm']['access-token']).
# NOTE: a token was previously pasted here in plain text -- it should be revoked/rotated.
curl --request GET \
  --url https://api.surveymonkey.com/v3/webhooks \
  --header 'Accept: application/json' \
  --header "Authorization: Bearer ${SM_ACCESS_TOKEN:?SM_ACCESS_TOKEN is not set}"

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100   234  100   234    0     0    639      0 --:--:-- --:--:-- --:--:--   644


{"data": [{"id": "4661653", "name": "Test Webhook", "href": "https://api.surveymonkey.com/v3/webhooks/4661653"}], "per_page": 50, "page": 1, "total": 1, "links": {"self": "https://api.surveymonkey.com/v3/webhooks?per_page=50&page=1"}}

In [27]:
%%bash
# Delete the test webhook (id taken from the POST response); override with SM_WEBHOOK_ID.
# The access token is read from the environment rather than hardcoded in the notebook.
WEBHOOK_ID="${SM_WEBHOOK_ID:-4661653}"
curl --request DELETE \
  --url "https://api.surveymonkey.com/v3/webhooks/${WEBHOOK_ID}" \
  --header 'Accept: application/json' \
  --header "Authorization: Bearer ${SM_ACCESS_TOKEN:?SM_ACCESS_TOKEN is not set}"

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed


100   272  100   272    0     0    764      0 --:--:-- --:--:-- --:--:--   770


{"id": "4661653", "name": "Test Webhook", "event_type": "response_completed", "subscription_url": "https://webhook.site/7778cda8-d526-4c18-96e3-da7cf1676b11", "object_type": "survey", "object_ids": ["409346397"], "href": "https://api.surveymonkey.com/v3/webhooks/4661653"}

In [4]:
# ## Get Triggered by Webhook (TO-DO)
# def webhook_trigger() -> dict: 
#     """Placeholder for SurveyMonkey webhook endpoint/trigger and reading payload."""


#     ## Example webhook callback response object: https://developer.surveymonkey.com/api/v3/#api-endpoints--webhook-callbacks
#     placeholder_response = {
#     "name": "My Webhook",
#     "filter_type": "collector",
#     "filter_id": "123456789",
#     "event_type": "response_completed",
#     "event_id": "123456789",
#     "object_type": "response",
#     "object_id": "123456",
#     "event_datetime": "2016-01-01T21:56:31.182613+00:00",
#     "resources": {
#         "respondent_id": "114409718452", # Replaced this with Kamran's response 
#         "recipient_id": "123456789",
#         "collector_id": "123456789",
#         "survey_id": "123456789",
#         "user_id": "123456789"
#         }
#     }
    
#     return placeholder_response

In [None]:
# url = "https://api.surveymonkey.com/v3/webhooks"

# # Define the headers
# headers = {
#     "Accept": "application/json",
#     "Authorization": f"Bearer {data['sm']['access-token']}",
#     "Content-Type": "application/json",
# }

# # Define the data payload
# data = {
#     "name": "Response Complete Webhook",
#     "subscription_url": "https://surveymonkey.com/webhook_receiver",
#     "authorization": "xyz", 
#     "verify_ssl": False, # not safe for production
#     "event_type": "response_completed",
#     "object_type": "survey",
#     "object_ids": [str(data['sm']['survey-id'])]
# }

# # Send the POST request
# response = requests.post(url, headers=headers, json=data)



---

##### **Main Functions** 

* `get_qa_key()` - GET (or load cached copy) of question/answer key from SM or COS 

<br>

* `combine_qa_keys()` - Combine the SM and COS question/answer keys into one combined key/translation map between the APIs
    - Generates a refreshed map if a change is detected in the SM survey or the COS survey. 
        - The COS survey should not change at all. 
        - Changes in the SM survey should not affect its ability to match the answer keys in the COS survey.

<br>

* `get_sm_api_response()` - GET SM survey responses (bulk endpoint, or a cached copy in test mode)

<br>

* `process_sm_responses()` - filter and process new SM responses from `get_sm_api_response()`
    - Checks against DB for already processed responses 
    - Checks if response includes valid email address (`has_valid_email()`)
    - Checks for unexpected question ids vs. the combined Q/A key.
    - Loads new responses into database (into 'processing' table) until they are finished

<br>

* `translate_post_cos()` - Use the combined Q/A key to translate the processed SM responses to COS JSON format and POST them to COS
    - Loads responses from the processing table
    - Adds matching COS information to response questions/answers 
    - If response is missing a required skills-survey answer, fills with "beginner" 

<br>

* `compose_email()` - Extract list of recommended jobs from COS response and compose an email.

In [28]:
## GET/Load question-answer keys for SurveyMonkey Survey and CareerOneStop Skills Matcher
def get_qa_key(api=None, fetch=False) -> dict:
    """
    Load the question/answer key from the SurveyMonkey `/details` endpoint or from CareerOneStop.

    Args: 

    api (str):   Must be one of 'sm' (SurveyMonkey survey) or 'cos' (CareerOneStop Skills Matcher).

    fetch (bool):   Whether to GET a new question/answer key from the API or just use the locally cached copy (default False). 

          If loading the cached copy fails, this is automatically set to True.

          If fetch == True:
            If the GET request fails (exception, no response, or non-200 status), the cached copy is used.
            If the fetched key differs from the cached copy, the cached file is overwritten. Any such change may break combine_qa_keys() and this app as a whole.

        Note the 500 requests/month limit on SM -- if using the fetch option, consider doing so only periodically.

    Returns:
        dict: The fetched key when one was retrieved, otherwise the cached key.

    Raises:
        ValueError: If `api` is not 'sm' or 'cos'.
        Exception: If neither a fetched nor a cached key is available.
    """

    # Set SM vs. COS variables
    if api == "sm": 
        url = f"{SM_DATA['base_url']}/details"
        headers = SM_DATA['headers']
        cached_fp = SM_DATA['survey-details-fp']
    elif api == "cos":
        url = COS_DATA['url']
        headers = COS_DATA['headers']
        cached_fp = COS_DATA['survey-details-fp']
    else:
        raise ValueError("`api` must be one of `sm` (SurveyMonkey) or `cos` (CareerOneStop)")
    
    # Load cached details 
    cached_key = load_json(cached_fp)
    if cached_key is None: 
        log_azure(f"WARNING: Loading {cached_fp} failed. Fetching new {api.upper()} key.")
        fetch = True

    ## Attempt Request (if fetch == True)
    fetched_key = None
    if fetch: 
        try: 
            # Pass every argument by keyword, including json=None, so the call matches the
            # request() wrapper's (method, url, json, headers) signature.
            response = request(method="GET", url=url, json=None, headers=headers)
            if response is None:
                log_azure(f"WARNING: GET {api.upper()} survey details -- No response -- Proceeding with cached file: {cached_fp}")
            elif response.status_code != 200:
                log_azure(f"WARNING: GET {api.upper()} survey details -- Response Code: {response.status_code} -- Proceeding with cached file: {cached_fp}")
            else: 
                fetched_key = response.json()
        except Exception as e: 
            log_azure(f"ERROR: GET {api.upper()} survey details -- Error: {str(e)} -- Proceeding with cached file: {cached_fp}")

    ## Return Block
    if fetched_key is None and cached_key is None:
        raise Exception(f"ERROR: Failed both to fetch new copy and to load cached copy of {api.upper()} Q/A key.")  
    if fetched_key is None: 
        return cached_key
    if fetched_key != cached_key:
        # Fetch succeeded and either no cache existed or the key changed: refresh the cache.
        log_azure(f"WARNING: GET {api.upper()} survey details -- Fetched Q/A key conflicts with cached copy {cached_fp} -- Updating.")           
        with open(cached_fp, "w") as file: 
            json.dump(fetched_key, file)
        return fetched_key
    log_azure(f"INFO: GET {api.upper()} survey details -- Fetched Q/A key matches cached copy {cached_fp}")
    return fetched_key

In [7]:
## Create translation map from Survey Monkey key to COS key 
def combine_qa_keys(fetch=False) -> dict: 
    """Create the translation map from SM to COS using the question/answer keys of each.

    Args: 

    fetch (bool): Passed to get_qa_key() -- whether to only load the local cache of the
        question/answer keys or to fetch new copies.

    Returns:
        dict: {'skills-matcher': {...}, 'non-skills-matcher': {...}}, each keyed by SM question id.
        Every entry holds 'question_id', 'question_number', 'question_text' (dicts keyed by
        'sm', plus 'cos' for skills-matcher questions) and 'answer_ids'. For skills-matcher
        questions, 'answer_ids' maps SM choice ids to COS DataPoint values.

    Raises:
        ValueError: If SM has more skills-matcher questions than COS has Skills, or a
            skills-matcher question has no answer choices to map.
    """

    ## Get question/answer keys 
    sm_key = get_qa_key("sm", fetch=fetch)
    cos_key = get_qa_key("cos", fetch=fetch)

    ## Prepare translation map
    combined_map = {
        'non-skills-matcher':[], # not to send to COS (background questions)
        'skills-matcher':[], # to send to COS skills matcher 
    }

    ## Adding relevant SM information to map
    for p in sm_key['pages']:
        # Questions are classified by their page's title, which is constant for the page.
        question_type = 'skills-matcher' if "skills matcher" in p['title'].lower() else "non-skills-matcher"
        for q in p['questions']:
            answer_ids = [d['id'] for d in q['answers']['choices']] if 'answers' in q else None
            combined_map[question_type].append({
                'question_id':{'sm':q['id']},
                'question_number':{'sm':q['position']},
                'question_text':{'sm':[h['heading'] for h in q['headings']]},
                'answer_ids':answer_ids
            })

    # Demo workaround: the demo survey has a single page, so the page-title filter finds no
    # skills-matcher questions. In that case treat the first question as background and the
    # rest as skills-matcher questions. Only applied when the filter found nothing, so a
    # properly paged survey is left untouched. (TO-DO: remove when using official survey)
    if not combined_map['skills-matcher'] and combined_map['non-skills-matcher']:
        combined_map['skills-matcher'] = combined_map['non-skills-matcher'][1:]
        combined_map['non-skills-matcher'] = [combined_map['non-skills-matcher'][0]]

    ## Adding relevant COS information 
    # SM and COS questions are paired by position. Fewer SM questions than COS Skills is
    # tolerated (the demo survey has fewer); more would leave SM questions without a COS match.
    sm_skills = combined_map['skills-matcher']
    cos_skills = cos_key['Skills']
    if len(sm_skills) > len(cos_skills):
        message = (f"ERROR: No. of skills-matcher questions retrieved from SM {len(sm_skills)} "
                   f"exceeds number in COS {len(cos_skills)}")
        log_azure(message)
        raise ValueError(message)

    for n, (sm_q, cos_q) in enumerate(zip(sm_skills, cos_skills)):
        if sm_q['answer_ids'] is None:
            raise ValueError(f"SM skills-matcher question {sm_q['question_id']['sm']} has no answer choices to map to COS.")
        # COS answer values, in order: DataPoint20, 35, 50, 65, 80
        cos_answer_ids = [cos_q[f"DataPoint{level}"] for level in (20, 35, 50, 65, 80)]

        sm_q['question_id']['cos'] = cos_q['ElementId']
        sm_q['question_number']['cos'] = n + 1 # correcting for 0 index
        sm_q['question_text']['cos'] = cos_q['Question']
        sm_q['answer_ids'] = dict(zip(sm_q['answer_ids'], cos_answer_ids))

    ## Cast question lists to dicts keyed by SM question id for lookup during translation
    combined_map['skills-matcher'] = {q['question_id']['sm']:q for q in combined_map['skills-matcher']}
    combined_map['non-skills-matcher'] = {q['question_id']['sm']:q for q in combined_map['non-skills-matcher']}

    return combined_map

In [29]:
# sm_key = get_qa_key(api="sm", fetch=False) 
# cos_key = get_qa_key(api="cos", fetch=False) 
# combined_map = combine_qa_keys(fetch=False)


# print("Survey Monkey Q/A Key:".upper())
# display(sm_key)
# print("\nCareerOneStop Q/A Key:".upper())
# display(cos_key)
# print("\nCombined SM/COS Q/A Key".upper())
# display(combined_map)

In [20]:
def get_sm_api_response(per_page=10000, test_mode=True):
    """GET the survey responses from /surveys/{id}/responses/bulk?per_page={per_page}. 

    Args: 

    per_page (int): The number of responses to retrieve per page. 

        - The SM API returns survey responses in pages, up to `per_page` responses per page. 
        - The SM API response begins with the *earliest* survey responses, NOT the most recent ones.
            - If all survey responses do not fit on one page, the response object includes a link to the page 
              of the most recent responses in `links.last` (see `example_multiple_pages.json`).
        - Once total responses exceed `per_page`, a second API call to that last page is needed
          for the most recent responses.
            - `per_page=10000` is a hacky way around this; the function makes the second request if it is exceeded.
            - NOTE(review): SM may cap `per_page` below this value -- confirm against the SM API docs.

    test_mode (bool): If True (default), skip the API and load the cached copy in
        `sm_responses.json` to save API requests.

    Returns:
        dict: Parsed JSON of the first page, or of the `links.last` page when one is present.

    Raises:
        Exception: If a request returns no response.
    """
    if test_mode:
        with open("sm_responses.json", "r") as file:
            return json.load(file) 

    def fetch_page(page_url):
        # All arguments by keyword so the URL is never bound to request()'s `method` parameter.
        response = request(method="GET", url=page_url, json=None, headers=SM_DATA['headers'])
        if response is None:
            raise Exception(f"ERROR: GET {page_url} returned no response.")
        return response.json()

    url = SM_DATA['base_url'] + f"/responses/bulk?per_page={per_page}"
    response_json = fetch_page(url)

    last_url = response_json.get('links', {}).get('last')
    if last_url:
        log_azure(f"WARNING: We've miraculously exceeded {per_page} survey responses. Making second request to {last_url}.")
        response_json = fetch_page(last_url)

    return response_json

def has_valid_email(sm_survey_response:dict, combined_map:dict=None) -> bool: 
    """Check whether an SM survey response includes a valid email address in the email question.

    Used by process_sm_responses() to skip responses without a usable email.

    Args:
        sm_survey_response (dict): One entry of the SM bulk-responses `data` list
            (reads its `id` and `pages[*].questions[*]`).
        combined_map (dict, optional): Translation map from combine_qa_keys(). Pass it when
            checking many responses to avoid rebuilding it on every call; built via
            combine_qa_keys() when omitted.

    Returns:
        bool: True if the email answer passes `validate_email(..., check_deliverability=True)`;
        False if the question was left blank, has no text answer, or the address is invalid.

    Raises:
        ValueError: If no non-skills-matcher question's first SM heading mentions "email".
    """
    # TO-DO: The Survey Monkey Survey should at least validate the text format of the email address (though it doesn't support deliverability validation).
    # Link: https://help.surveymonkey.com/en/surveymonkey/create/validating-text-fields/

    # Load translation map (unless the caller already has one)
    if combined_map is None:
        combined_map = combine_qa_keys()

    # Identify the question_id of the email question in the answer key  
    email_question_id = next(
        (q['question_id']['sm'] for q in combined_map['non-skills-matcher'].values()
         if 'email' in q['question_text']['sm'][0].lower()),
        None,
    )
    if email_question_id is None:
        raise ValueError("No email question found in the non-skills-matcher Q/A key.")

    # Find the email question in the response (if any question is omitted, the respondent left it blank)
    email_question = next(
        (q for p in sm_survey_response['pages'] for q in p['questions'] if q['id'] == email_question_id),
        None,
    )
    if email_question is None:
        return False

    # A present question without a text answer is treated the same as a blank one
    answers = email_question.get('answers') or []
    email_address = answers[0].get('text') if answers else None
    if not email_address:
        return False

    # Validate email if one was provided 
    try:
        validate_email(email_address, check_deliverability=True)
        return True
    except EmailNotValidError as e:
        log_azure(f"WARNING: {sm_survey_response['id']} contains invalid email address: {email_address} -- {str(e)}. Skipping.")
        return False 


def process_sm_responses(sm_api_response) -> list[dict]: 
    """Filter and process new SM survey responses from get_sm_api_response().

        - Checks for unexpected question ids vs. the combined Q/A key (refreshing the key once).
        - Checks against DB for already processed responses (TO-DO: placeholder list for now)
        - Checks if response includes valid email address (`has_valid_email()`)
        - Loads (raw) new responses into database (into 'processing' table) until they are finished (TO-DO)
           - When these responses are successfully sent to COS and then emailed to user, they will be moved to main DB table.

    Args:
        sm_api_response (dict): Parsed SM bulk-responses JSON; its `data` list is processed.

    Returns:
        list[dict]: One dict per new response with a valid email, shaped as
            {'response_id', 'collector_id', 'questions': [{'question_id', 'question_number',
             'question_type', 'answers'}, ...]}.

    Raises:
        Exception: If unexpected question ids remain after refreshing the Q/A key.
    """

    ## -- Read list of already processed responses from database (TO-DO) -- ##
    # Database useful to check against repeated user email or IP + answers (avoid redundant emails), for retries of failures 

    placeholder_processed_response_ids = []
    ## -------------------------------------------------------------------- ## 

    # Only responses not yet processed are considered -- filtered by *response* id.
    new_responses = [resp for resp in sm_api_response['data']
                     if resp['id'] not in placeholder_processed_response_ids]

    # Create/load question answer/key map 
    combined_map = combine_qa_keys()    

    ## 1.) Check for unexpected question ids in new responses vs. those in COS translation map.
    # Refresh the keys once on a mismatch; if ids are still unexpected, combine_qa_keys() is probably wrong.
    new_resp_question_ids = {q['id'] for resp in new_responses for p in resp['pages'] for q in p['questions']}

    def find_unexpected(qa_map):
        # Question ids present in the responses but in neither section of the map.
        return new_resp_question_ids - set(qa_map['skills-matcher']) - set(qa_map['non-skills-matcher'])

    unexpected_ids = find_unexpected(combined_map)
    if unexpected_ids:
        log_azure(f"WARNING: Unexpected question ids in SM responses: {unexpected_ids}. Refreshing question/answer key map.")
        combined_map = combine_qa_keys(fetch=True)
        unexpected_ids = find_unexpected(combined_map)
        if unexpected_ids:
            log_azure(f"ERROR: Unexpected question ids remain after refresh: {unexpected_ids}.") 
            raise Exception(f"ERROR: Unexpected question ids remain after refresh: {unexpected_ids}.")

    non_skills_matcher_ids = set(combined_map['non-skills-matcher'])

    ## 2.) Keep only new survey responses that have valid email addresses
    processed_responses = []
    for resp in new_responses: 
        if not has_valid_email(resp):
            continue

        resp_dict = {
            'response_id':resp['id'],
            'collector_id':resp['collector_id'], 
            'questions':[] 
        }

        # Add questions information
        for p in resp['pages']:
            for q in p['questions']:

                # Get matching question dictionary from combined_map, based on question type and SM question_id
                question_type = 'non-skills-matcher' if q['id'] in non_skills_matcher_ids else 'skills-matcher'
                q_map = combined_map[question_type][q['id']] 

                # Copy SM (and, when mapped, COS) numbers/ids from q_map
                question_number = {'sm':q_map['question_number']['sm']}
                if 'cos' in q_map['question_number']:
                    question_number['cos'] = q_map['question_number']['cos']
                
                question_id = {'sm':q_map['question_id']['sm']}
                if 'cos' in q_map['question_id']:
                    question_id['cos'] = q_map['question_id']['cos']

                if question_type == 'skills-matcher': 
                    # Translate each chosen SM choice id into its COS value
                    answers = [{'sm':a['choice_id'],
                                'cos':q_map['answer_ids'][a['choice_id']]} for a in q['answers']]
                else: 
                    answers = [{'sm':a} for a in q['answers']]

                resp_dict['questions'].append({'question_id':question_id, 
                                               'question_number':question_number, 
                                               'question_type':question_type, 
                                               'answers':answers})
                                            
        processed_responses.append(resp_dict)

    ## -- 3. Write (raw) new responses to processing table in DB (TO-DO)  -- ## 
    # load_to_db(...)
    ## -------------------------------------------------------------------- ## 

    return processed_responses

In [23]:
def translate_post_cos(processed_sm_responses): 
    """Build a COS Skills Matcher request for each processed SM response and POST it.

    Args:
        processed_sm_responses (list[dict]): Output of process_sm_responses().

    Returns:
        list[dict]: One entry per input response, in input order, with keys
        'sm_response_id', 'cos_request' (the JSON body sent) and 'cos_response'
        (the value returned by request()).
    """
    def build_cos_request(resp):
        # Only skills-matcher questions go to COS; each contributes its first answer's COS value.
        ska_values = []
        for question in resp['questions']:
            if question['question_type'] != 'skills-matcher':
                continue
            ska_values.append({'ElementId': question['question_id']['cos'],
                               'DataValue': str(question['answers'][0]['cos'])})
        return {'SKAValueList': ska_values}

    results = []
    for resp in processed_sm_responses:
        body = build_cos_request(resp)
        reply = request(method="POST",
                        url=COS_DATA['url'],
                        json=body,
                        headers=COS_DATA['headers'])
        results.append({'sm_response_id': resp['response_id'],
                        'cos_request': body,
                        'cos_response': reply})
    return results

In [30]:
# Run the pipeline end-to-end: load SM responses (cached copy, since test_mode=True),
# filter/translate them against the combined Q/A key, then POST each one to COS.
sm_api_response = get_sm_api_response(test_mode=True)
processed_sm_responses = process_sm_responses(sm_api_response)
# NOTE: despite the name, each entry holds both the COS request body and the COS response.
cos_requests = translate_post_cos(processed_sm_responses)

INFO: POST {'https://api.careeronestop.org/v1/skillsmatcher/XjV8e71wBCteYXb'} -- {200} -- 0.49 -- {datetime.datetime(2023, 9, 29, 23, 9, 24, 377236)}
