# Generation

In [10]:
# import libraries

import sys
import os
import json
import math

import pandas as pd

# Put the folder two levels above the current working directory on the import
# path (presumably the project source root that holds `questionnaires` --
# assumes the kernel was started in this notebook's own folder).
notebook_dir = os.path.abspath("..")
src_path = os.path.abspath(os.path.join(notebook_dir, ".."))
# Guarded so that re-running this cell does not pile up duplicate entries.
if src_path not in sys.path:
    sys.path.append(src_path)

from questionnaires.answering.personal import personal_answer
from questionnaires.answering.lifestyle import lifestyle_answer
from questionnaires.answering.health import health_answer
from questionnaires.answering.family import family_answer

In [11]:
# read the applicants JSON document into memory

data_folder = '../../../data/applicants'
applicants_path = f"{data_folder}/applicants.json"

with open(applicants_path, 'r', encoding='utf-8') as applicants_file:
    applicants = json.load(applicants_file)

In [12]:
# names of the regional indicator groups; each one is used as the base name
# of a CSV file when the geographic indicators are collected

indicators = [
    'mortality', 'morbidity', 'healthcare',
    'lifestyle', 'education', 'socioeconomic',
    'environment', 'infrastructure', 'security',
]

In [13]:
def collect_geo_indicators(municipality : str) -> list[str]:
    '''
    Fetches the main risk indicators from a municipality
    '''
    demographics = {}
    for indicator in indicators:
        content = pd.read_csv(f"../../../data/demographics/processed/{indicator}.csv", encoding='utf-8')
        if municipality in content['municipality'].values:
            municipality_data = content[content['municipality'] == municipality].iloc[0]
            for column in municipality_data.index[1:]:
                value = municipality_data[column]
                if value in ['very low', 'low', 'high', 'very high']:
                    demographics[column] = value
    return demographics

In [14]:
# read every electronic health record table into its own DataFrame

data_folder = '../../../data/health_records'

patients = pd.read_csv(f"{data_folder}/patients.csv")

# one CSV per record type; the targets on the left line up, in order, with
# the file base names on the right
(
    allergies, careplans, conditions, devices, encounters,
    imagings, immunizations, medications, observations, procedures,
) = (
    pd.read_csv(f"{data_folder}/{table_name}.csv")
    for table_name in (
        'allergies', 'careplans', 'conditions', 'devices', 'encounters',
        'imagings', 'immunizations', 'medications', 'observations', 'procedures',
    )
)

In [15]:
def clean_nans(obj : dict | list | float) -> dict | list | str:
    '''
    Replaces all NaN values with an empty string
    '''
    if isinstance(obj, dict):
        return {k: clean_nans(v) for k, v in obj.items()}
    elif isinstance(obj, list):
        return [clean_nans(item) for item in obj]
    elif isinstance(obj, float) and math.isnan(obj):
        return ''
    else:
        return obj

In [16]:
def get_health_records(patient_id : str) -> dict:
    '''
    Builds one dictionary with everything the loaded tables hold for a patient

    Keys, in order: 'patient' (selected demographic columns of the first
    matching row), one list of descriptions per record table, 'records' (all
    of those lists concatenated) and 'observations' (rows of
    [DESCRIPTION, VALUE, UNITS]). NaN floats are replaced by '' on the way out.
    An IndexError is raised when no row in `patients` matches `patient_id`.
    '''
    def rows_for(table):
        # rows of a record table that belong to this patient
        return table[table['PATIENT'] == patient_id]

    demographic_columns = ['Id', 'BIRTHDATE', 'MARITAL', 'RACE', 'ETHNICITY', 'GENDER']
    patient_rows = patients[patients['Id'] == patient_id][demographic_columns]
    record = {'patient': patient_rows.to_dict(orient='records')[0]}

    # these tables are reduced to their distinct descriptions
    deduplicated_tables = (
        ('allergies', allergies),
        ('careplans', careplans),
        ('conditions', conditions),
        ('devices', devices),
        ('encounters', encounters),
    )
    for section_name, table in deduplicated_tables:
        record[section_name] = rows_for(table)['DESCRIPTION'].unique().tolist()

    scans = rows_for(imagings)[['BODYSITE_DESCRIPTION', 'MODALITY_DESCRIPTION']]
    record['imagings'] = [
        f"{modality}: {bodysite}"
        for modality, bodysite in zip(scans['MODALITY_DESCRIPTION'], scans['BODYSITE_DESCRIPTION'])
    ]

    # these tables keep repeated descriptions
    repeated_tables = (
        ('immunizations', immunizations),
        ('medications', medications),
        ('procedures', procedures),
    )
    for section_name, table in repeated_tables:
        record[section_name] = rows_for(table)['DESCRIPTION'].tolist()

    # flat list with the entries of every section above, in section order
    section_order = (
        'allergies', 'careplans', 'conditions', 'devices', 'encounters',
        'imagings', 'immunizations', 'medications', 'procedures',
    )
    record['records'] = [entry for section_name in section_order for entry in record[section_name]]

    observation_rows = rows_for(observations)[['DESCRIPTION', 'VALUE', 'UNITS']]
    record['observations'] = observation_rows.values.tolist()
    return clean_nans(record)

In [17]:
# sections of the questionnaire, in the order they are processed

categories = ['personal', 'lifestyle', 'family', 'health']

In [18]:
# load the risk definitions of every questionnaire category

data_folder = '../../../data/questionnaires'

risks = {}
for category in categories:
    # explicit UTF-8 so the result does not depend on the platform's default
    # encoding (same choice as the applicants file)
    with open(f"{data_folder}/questions/{category}.json", 'r', encoding='utf-8') as f:
        risks[category] = json.load(f)

In [19]:
# retrieve geographical indicators and health records for each applicant

# The indicators are looked up by municipality only, so each municipality is
# resolved once instead of re-reading the indicator files for every applicant.
demographics_by_municipality = {}
# `applicant_id` rather than `id`, so the builtin `id` is not shadowed
for applicant_id, applicant in applicants.items():
    municipality = applicant['municipality']
    if municipality not in demographics_by_municipality:
        demographics_by_municipality[municipality] = collect_geo_indicators(municipality)
    # copy, so applicants of the same municipality do not share one dict object
    applicant['demographics'] = dict(demographics_by_municipality[municipality])
    applicant['health_records'] = get_health_records(applicant_id)

In [20]:
def category_answers(category : str, applicant : dict) -> dict:
    '''
    Obtains the answers from the applicant to the questions of a certain category

    Parameters:
        category: questionnaire section; must be a key of `risks`
        applicant: applicant data handed to the category's answering function

    Returns:
        Mapping of each factor listed in `risks[category]` to its answer. A
        category without an answering function yields 0 for every factor.

    Raises:
        KeyError: if `category` is not a key of `risks`
    '''
    # the answering function depends only on the category, so it is picked
    # once here rather than on every factor
    answer_functions = {
        'personal': personal_answer,
        'lifestyle': lifestyle_answer,
        'health': health_answer,
        'family': family_answer,
    }
    answer_for = answer_functions.get(category)
    answers = {}
    for factor in risks[category]:
        answers[factor] = answer_for(applicant, factor) if answer_for is not None else 0
    return answers

In [21]:
# generate all answers for each category

# dataset maps applicant id -> {factor: answer}, merged over every category;
# a factor name repeated in a later category overwrites the earlier answer
dataset = {}
# `applicant_id` rather than `id`, so the builtin `id` is not shadowed
for applicant_id, applicant in applicants.items():
    applicant_answers = {}
    for category in categories:
        applicant_answers.update(category_answers(category, applicant))
    dataset[applicant_id] = applicant_answers

In [22]:
# save all applicant answers

data_folder = '../../../data/applicants'

# ensure_ascii=False writes non-ASCII characters as-is, so the file is opened
# as UTF-8 explicitly instead of relying on the platform's default encoding
with open(f"{data_folder}/answers.json", 'w', encoding='utf-8') as file:
    json.dump(dataset, file, indent=4, ensure_ascii=False)