## Dependencies

In [None]:
%%capture

%pip install ray[all] requests

In [None]:
import os
import ray
import datetime
import requests

# IPython tools
from IPython.display import clear_output

In [None]:
# Initialise Ray for the parallel API pull below; the return value is discarded.
# NOTE(review): 7.8**9 evaluates to roughly 1.07e8 (a float, ~107 MB), not
# 7.8e9 -- confirm the intended object store size (7.8 * 10**9 bytes?).
_ = ray.init(redis_max_memory=10**9, object_store_memory=7.8**9)

## Parameters

In [None]:
# Base URL of the API; every endpoint URL is derived from it with urljoin().
api_url = "https://apimapa.sicor.com.co"
# Credentials posted to the login endpoint (empty in this file).
api_username = ""
api_password = ""

# ABPM test ID used by the smoke-test cells that call each endpoint once.
test_patient_id = 5331

# Cut-off date in YYYY-MM-DD form; it is parsed into a datetime further down.
pull_data_end_date = "2017-11-07"

# Number of test IDs requested per batch by pull_api_data.
concurrent_workers = 100
# pull_api_data stops once its running count of empty/failed responses exceeds this.
max_consecutive_error = 150
# Directory that receives one "<id>.json" file per pulled test.
api_data_save_path = "./api-data/"
# File (opened in append mode, not a directory) that HTTP errors are written to.
api_error_save_path = "./ERROR"

## Build URLs

In [None]:
import urllib.parse


# Endpoint URLs derived from the API base URL. The per-test endpoints keep
# their trailing slash so that a later urljoin() with "<id>/" appends the test
# ID instead of replacing the last path segment.
auth_url = urllib.parse.urljoin(api_url, "login")
map_meta_data_url = urllib.parse.urljoin(api_url, "get_mapa/")
map_data_url = urllib.parse.urljoin(api_url, "tabla_mediciones/")
map_measure_url = urllib.parse.urljoin(api_url, "MAPA/")
map_drug_url = urllib.parse.urljoin(api_url, "medicamentos/")

# Parse the cut-off date only while it is still a string, so that re-running
# this cell does not fail with a TypeError from strptime().
if isinstance(pull_data_end_date, str):
    pull_data_end_date = datetime.datetime.strptime(pull_data_end_date, "%Y-%m-%d")

## Get latest patient Id

In [None]:
import glob


def get_max_patient_id(api_data_save_path):
    """Return the highest patient ID already saved in a directory.

    Saved files are expected to be named ``<patient_id>.json``; the ID is the
    part of the file name before the first dot. JSON files whose name does
    not start with an integer are ignored.

    Parameters:
        api_data_save_path (str): Directory that holds the saved JSON files

    Returns:
        int: The largest patient ID found, or 0 when there is none
    """

    patient_ids = []

    for file in glob.glob(os.path.join(api_data_save_path, "*.json")):
        stem = os.path.basename(file).split('.')[0]

        try:
            patient_ids.append(int(stem))
        except ValueError:
            # Not a "<patient_id>.json" file; skip it instead of aborting
            continue

    return max(patient_ids, default=0)


# Resume from the highest patient ID already saved on disk (0 when nothing has
# been saved yet); the bare name on the last line shows it as the cell output.
start_patient_id = get_max_patient_id(api_data_save_path)
start_patient_id

## Get data from API

In [None]:
def get_api_token(url, username, password):
    """ Log in to the API and return the authentication token

    Parameters:
        url (str): Login URL the credentials are POSTed to
        username (str): API username
        password (str): API password

    Returns:
        The value stored under the ``res`` key of the JSON response, which
        the other requests send as their bearer token

    Raises:
        requests.HTTPError: If the login response carries an error status
    """

    login_response = requests.post(
        url,
        data={"user": username, "password": password},
    )
    login_response.raise_for_status()

    token = login_response.json()['res']
    return token

In [None]:
# Log in once with the notebook credentials; the token is reused by the
# single-endpoint smoke tests below.
api_token = get_api_token(auth_url, api_username, api_password)

In [None]:
import urllib.parse


def get_api_data(url, token, patient_id):
    """ Fetch the JSON document of one ABPM test from an API endpoint

    The request goes to ``<url><patient_id>/`` with the token sent as a
    bearer ``authorization`` header.

    Parameters:
        url (str): Endpoint URL the patient ID is joined onto
        token (str): Authentication token needed for the request
        patient_id (int): The patient ABPM test ID

    Returns:
        The decoded JSON body of the response

    Raises:
        requests.HTTPError: If the response carries an error status
    """

    request_headers = {"authorization": f"Bearer {token}"}
    patient_url = urllib.parse.urljoin(url, f"{patient_id}/")

    api_response = requests.get(patient_url, headers=request_headers)
    api_response.raise_for_status()

    return api_response.json()

In [None]:
# Smoke test: request the test patient once from each endpoint. The responses
# are discarded; an error status raises from inside get_api_data.
_ = get_api_data(map_data_url, api_token, test_patient_id)
_ = get_api_data(map_measure_url, api_token, test_patient_id)
_ = get_api_data(map_drug_url, api_token, test_patient_id)
_ = get_api_data(map_meta_data_url, api_token, test_patient_id)

In [None]:
import urllib.parse


def get_complete_api_data(username, password, patient_id):
    """ Get data, meta data, measurements and drugs for one ABPM test

    A fresh token is requested on every call and used for all four
    endpoint requests of that call.

    Parameters:
        username (str): API user name used to get API token
        password (str): API password used to get API token
        patient_id (int): Patient ABPM test ID to pull data from

    Returns:
        dict: ``id`` holds the ABPM test ID and the remaining keys hold the
        JSON returned by one endpoint each: ``data`` (tabla_mediciones),
        ``meta_data`` (get_mapa), ``measure`` (MAPA) and ``drugs``
        (medicamentos).

    Raises:
        requests.HTTPError: If the login or any endpoint request fails
    """

    token = get_api_token(auth_url, username, password)

    map_data = get_api_data(map_data_url, token, patient_id)
    map_measure = get_api_data(map_measure_url, token, patient_id)
    map_drugs = get_api_data(map_drug_url, token, patient_id)
    # Use this call's own token and patient ID, not the notebook-level
    # api_token / test_patient_id, so every patient gets its own meta data
    map_meta_data = get_api_data(map_meta_data_url, token, patient_id)

    return {
        "id": patient_id,
        "data": map_data,
        "meta_data": map_meta_data,
        "measure": map_measure,
        "drugs": map_drugs,
    }

In [None]:
# Smoke test: pull the complete record of the test patient; the result is discarded.
_ = get_complete_api_data(api_username, api_password, test_patient_id)

## Parallelized API data collection

In [None]:
@ray.remote
def get_complete_api_data_async(username, password, patient_id):
    """ Ray remote wrapper around get_complete_api_data

    Lets many ABPM tests be pulled in parallel. An HTTP error does not
    propagate: one line with the patient ID and status code is appended to
    the file at api_error_save_path and None is returned instead.

    Parameters:
        username (str): API user name used to get API token
        password (str): API password used to get API token
        patient_id (int): Patient ABPM test ID to pull data from

    Returns:
        dict or None: Whatever get_complete_api_data returns for the test,
        or None when a request raised requests.HTTPError.
    """

    try:
        return get_complete_api_data(username, password, patient_id)
    except requests.HTTPError as http_error:
        # Record the failure and fall through to the None result
        with open(api_error_save_path, "a+") as error_log:
            error_log.write(
                f"ERROR -- Patient ID: {patient_id} -- {http_error.response.status_code} \n"
            )

    return None

In [None]:
# Smoke test: run the remote wrapper once and block until its result arrives.
_ = ray.get(get_complete_api_data_async.remote(api_username, api_password, test_patient_id))

## Stop condition for API data pull

In [None]:
def list_contains_date_grater_than(patient_data_list, end_date):
    """ Tell whether any ABPM response in a batch starts after end_date

    The start date of a response is the date part (text before the first
    space) of ``fecha_dt`` in the first entry of its ``data`` list. A
    response that is empty or has no ``data`` counts as 1900-01-01.

    Parameters:
        patient_data_list (list): List of ABPM request responses
        end_date (datetime): The newest date of the data we intend to get

    Returns:
        bool: True as soon as one start date is later than end_date, so
        that pulling data ends. Otherwise False, so that pulling continues.
    """

    placeholder_date = datetime.datetime.strptime("1900-1-1", '%Y-%m-%d')

    def first_measure_date(response):
        # Responses without usable data fall back to the placeholder date
        if not response or not response.get('data'):
            return placeholder_date

        day_text, *_ = response['data'][0]['fecha_dt'].split(' ')
        return datetime.datetime.strptime(day_text, '%Y-%m-%d')

    return any(
        end_date < first_measure_date(response)
        for response in patient_data_list
    )

In [None]:
# A test that starts in 2015 is later than a 2011 cut-off, so pulling must stop.
assert list_contains_date_grater_than(
    [{'data': [{'fecha_dt': "2015-1-1 03:55:21"}]}],
    datetime.datetime.strptime("2011-1-1", '%Y-%m-%d'),
) is True

In [None]:
# A test that starts in 2011 is earlier than a 2015 cut-off, so pulling continues.
assert list_contains_date_grater_than(
    [{'data': [{'fecha_dt': "2011-1-1 03:55:21"}]}],
    datetime.datetime.strptime("2015-1-1", '%Y-%m-%d'),
) is False

## Save the data from the API

In [None]:
def pull_api_data(start_patient_id, pull_data_end_date, concurrent_workers, max_consecutive_error):
    """ Pull ABPM tests from the API in batches, starting at start_patient_id.

    Each batch requests concurrent_workers consecutive test IDs in parallel
    with the module-level api_username / api_password. Pulling ends when a
    batch contains a test that starts after pull_data_end_date (that whole
    batch is dropped without yielding anything), or when the running count
    of empty/failed responses, checked after each batch, exceeds
    max_consecutive_error.

    Parameters:
        start_patient_id (int): The first ABPM test ID to pull
        pull_data_end_date (datetime): The upper date cap for tests to be pulled
        concurrent_workers (int): The number of requests made per batch
        max_consecutive_error (int): The number of consecutive empty/failed responses tolerated

    Returns:
        generator: Yields the API response of every test that came back with data
    """

    consecutive_failures = 0
    batch_start = start_patient_id

    while True:
        batch_end = batch_start + concurrent_workers

        pending = []
        for patient_id in range(batch_start, batch_end):
            pending.append(
                get_complete_api_data_async.remote(api_username, api_password, patient_id)
            )

        batch = ray.get(pending)

        # A test newer than the cut-off ends the pull before anything is yielded
        if list_contains_date_grater_than(batch, pull_data_end_date):
            return

        for patient_data in batch:
            if not (patient_data and patient_data.get("data")):
                consecutive_failures += 1
                continue

            # A usable response resets the failure streak
            consecutive_failures = 0
            yield patient_data

        if consecutive_failures > max_consecutive_error:
            return

        batch_start = batch_end

In [None]:
import json
import time


start_time = time.time()
os.makedirs(api_data_save_path, exist_ok=True)

# Write every pulled test to "<id>.json" and show a one-line progress report
for index, patient_data in enumerate(pull_api_data(start_patient_id, pull_data_end_date, concurrent_workers, max_consecutive_error)):
    elapsed_time = time.time() - start_time

    with open(os.path.join(api_data_save_path, f"{patient_data.get('id')}.json"), "w") as file:
        json.dump(patient_data, file)

    # index is zero-based, so index + 1 records have been handled so far;
    # guard the division in case the clock has not advanced yet
    speed = (index + 1) / elapsed_time if elapsed_time > 0 else 0.0

    clear_output(wait=True)
    print(f"Speed: {speed}r/s -- Elapse Time: {elapsed_time}s -- Patient Id: {patient_data.get('id')}")

## API Errors

In [None]:
# The error log is only created when a worker appends its first error, so a
# missing file means no HTTP error was logged rather than a failure here.
try:
    with open(api_error_save_path, "r") as file:
        print(file.read())
except FileNotFoundError:
    print("No API errors were logged.")