In [77]:
import asyncio
import csv
import os
from typing import Any, Dict, List, Optional, Tuple

import aiohttp
import pandas as pd

In [78]:
async def extract(url: str) -> Dict[str, Any]:
    """Issue an HTTP GET for ``url`` and return the decoded JSON body.

    A fresh ``aiohttp.ClientSession`` is opened for every call and closed
    again when the request finishes. Error statuses surface through
    ``response.raise_for_status()``.
    """
    async with aiohttp.ClientSession() as http, http.get(url) as reply:
        reply.raise_for_status()
        payload = await reply.json()
        return payload

def get_patients_url(zipcode: str) -> str:
    """Build the HAPI FHIR R5 Patient search URL filtered by postal code."""
    template = "https://hapi.fhir.org/baseR5/Patient?address-postalcode={}"
    return template.format(zipcode)

def get_observation_url(patient_id: str) -> str:
    """Build the HAPI FHIR R5 Observation search URL for one patient.

    The patient is passed as the ``subject`` search parameter in the form
    ``patient/<id>`` with the slash percent-encoded as ``%2F``.

    NOTE(review): the reference uses a lowercase ``patient``; FHIR resource
    type names are normally capitalised (``Patient``) - confirm the server
    matches this form before relying on empty results.
    """
    return f"https://hapi.fhir.org/baseR5/Observation?subject=patient%2F{patient_id}"

def write_to_csv(filename: str, field_names: List[str], data: List[Dict[str, Any]]) -> None:
    """Write ``data`` to ``filename`` as CSV, preceded by a header row.

    Args:
        filename: Path of the file to create or overwrite.
        field_names: Column names, in output order; used for the header row.
        data: One dict per output row, keyed by the names in ``field_names``.
            An empty sequence produces a header-only file.
    """
    # newline set to '' to handle situation described in https://stackoverflow.com/a/77161460
    with open(filename, 'w', newline='') as csvfile:
        writer = csv.DictWriter(csvfile, fieldnames=field_names)
        writer.writeheader()
        writer.writerows(data)

def create_folder_if_not_exists(folder_path: str) -> None:
    """Create ``folder_path`` (and any missing parents) unless it already exists.

    ``exist_ok=True`` lets ``os.makedirs`` do the existence check itself, so
    there is no gap between checking and creating. If the path exists but is
    not a directory, ``os.makedirs`` raises ``FileExistsError``.
    """
    os.makedirs(folder_path, exist_ok=True)

In [103]:
def patient_to_csv(patient: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """Flatten one Patient search entry into a CSV row.

    Args:
        patient: A bundle entry; the patient data is read from its
            ``"resource"`` key.

    Returns:
        ``(True, row)`` where ``row`` has the keys ``id``, ``first_name``,
        ``gender`` and ``birth_date`` when every one of those values is
        present and truthy; ``(False, None)`` otherwise. ``first_name`` is the
        first ``given`` name of the first ``name`` element.
    """
    resource = patient.get("resource")
    # A missing, null or non-object resource is reported as invalid, not raised.
    if not isinstance(resource, dict):
        return (False, None)

    patient_id = resource.get("id")
    gender = resource.get("gender")
    birth_date = resource.get("birthDate")
    if not (patient_id and gender and birth_date):
        return (False, None)

    names = resource.get("name")
    if not names or not isinstance(names[0], dict):
        return (False, None)
    given = names[0].get("given")
    if not given or not given[0]:
        return (False, None)

    return True, {
        "id": patient_id,
        "first_name": given[0],
        "gender": gender,
        "birth_date": birth_date,
    }

async def generate_patients(zipcode: str) -> None:
    """Download the patients for ``zipcode`` and save them as ``<zipcode>/patients.csv``.

    Only the entries of the single search response are processed. Entries that
    fail ``patient_to_csv`` validation are printed and skipped. Nothing is
    written when the response has no entries or none of them are valid.
    """
    bundle = await extract(get_patients_url(zipcode))
    if not ("entry" in bundle and bundle["entry"]):
        print("No patients found")
        return

    rows = []
    for patient in bundle["entry"]:
        ok, row = patient_to_csv(patient)
        if ok:
            rows.append(row)
        else:
            print(f"invalid patient: {patient}")

    if len(rows) == 0:
        print("No valid patients found")
        return

    columns = ["id", "first_name", "gender", "birth_date"]
    write_to_csv(f"{zipcode}/patients.csv", columns, rows)

def observation_to_csv(observation: Dict[str, Any]) -> Tuple[bool, Optional[Dict[str, Any]]]:
    """Flatten one Observation search entry into a CSV row.

    Args:
        observation: A bundle entry; the observation data is read from its
            ``"resource"`` key.

    Returns:
        ``(True, row)`` where ``row`` has the keys ``id``, ``resource_type``
        and ``status`` when all three source values are present and truthy;
        ``(False, None)`` otherwise.
    """
    resource = observation.get("resource")
    # A missing, null or non-object resource is reported as invalid, not raised.
    if not isinstance(resource, dict):
        return (False, None)

    observation_id = resource.get("id")
    resource_type = resource.get("resourceType")
    status = resource.get("status")
    if not (observation_id and resource_type and status):
        return (False, None)

    return True, {
        "id": observation_id,
        "resource_type": resource_type,
        "status": status,
    }

async def generate_observation(zipcode: str, patient_id: str) -> None:
    """Download one patient's observations and save the first as a CSV file.

    The output goes to ``<zipcode>/observation-<patient_id>.csv`` with the
    columns ``id``, ``resource_type``, ``status`` and ``patient_id``.

    Only the first entry of the search response is exported. Nothing is
    written when the response has no entries or that first entry fails
    ``observation_to_csv`` validation; a message is printed instead.
    """
    raw_observation = await extract(get_observation_url(patient_id))
    if "entry" not in raw_observation or not raw_observation["entry"]:
        print("No observations found")
        return
    field_names = ["id","resource_type","status","patient_id"]
    entry = raw_observation["entry"][0]
    is_valid, record = observation_to_csv(entry)
    if not is_valid:
        # Report the entry that failed validation (the previous code named an
        # undefined variable here and raised NameError instead of printing).
        print(f"invalid observation: {entry}")
        return
    record["patient_id"] = patient_id
    write_to_csv(f"{zipcode}/observation-{patient_id}.csv", field_names, [record])

async def generate_observations(zipcode: str, rate_limit: int = 5) -> None:
    """Export observations for every patient listed in ``<zipcode>/patients.csv``.

    Args:
        zipcode: Folder that holds ``patients.csv`` and receives the output.
        rate_limit: Maximum number of patients fetched concurrently. Patients
            are handled in batches of this size, and each batch must finish
            before the next one starts.

    Raises:
        ValueError: If ``rate_limit`` is smaller than 1.

    A missing ``patients.csv`` is reported with a message and treated as
    "nothing to do".
    """
    if rate_limit < 1:
        raise ValueError("rate_limit must be at least 1")

    patients_path = f"{zipcode}/patients.csv"
    try:
        # newline="" is what the csv module expects for files it parses.
        with open(patients_path, mode="r", newline="") as file:
            # DictReader consumes the header row, so the literal column name
            # "id" is never mistaken for a patient id.
            patient_ids = [row["id"] for row in csv.DictReader(file) if row.get("id")]
    except FileNotFoundError:
        print(f"No patients file found at {patients_path}")
        return

    for start in range(0, len(patient_ids), rate_limit):
        batch = patient_ids[start:start + rate_limit]
        await asyncio.gather(
            *(generate_observation(zipcode, patient_id) for patient_id in batch)
        )
"""
    import asyncio

async def some_async_function(i):
    await asyncio.sleep(1)
    print(f"Task {i} done")

async def main():
    tasks = []
    for i in range(5):
        tasks.append(asyncio.create_task(some_async_function(i)))

    # Wait for all tasks to complete
    await asyncio.gather(*tasks)

"""

async def generate_for_zipcode(zipcode: str = "02718") -> None:
    """Run the full export for one zipcode.

    Ensures the ``zipcode`` output folder exists, then writes the patients
    file, then the per-patient observation files, in that order.
    """
    create_folder_if_not_exists(zipcode)
    await generate_patients(zipcode)
    await generate_observations(zipcode)

In [104]:
# Run the whole export for the default zipcode ("02718"); uses the notebook's top-level await.
await generate_for_zipcode()

No observations found
No observations found
No observations found
No observations found
No observations found
No observations found
No observations found
No observations found
No observations found
No observations found
No observations found
No observations found
No observations found
No observations found
No observations found
No observations found


In [None]:
# Fetch the raw Observation search response for each id in `patient_ids`, one request at a time,
# keyed by patient id.
# NOTE(review): `patient_ids` is not defined in any cell visible here - confirm where it comes from.
raw_observations = { patient_id: await extract(get_observation_url(patient_id)) for patient_id in patient_ids }
raw_observations

In [74]:
# Save the collected raw observation responses to observations.json.
# NOTE(review): `json` is not imported in the import cell visible above - confirm it is imported elsewhere.
with open("observations.json", "w") as outfile:
    json.dump(raw_observations, outfile)

In [38]:
# NOTE(review): `newline` is a bare name that no visible cell defines; this looks like leftover
# scratch and does not match the table shown as this cell's output - confirm or remove.
newline
#raw_patients = pd.json_normalize(parsed_data)
#pd.json_normalize(parsed_data["link"])

Unnamed: 0,relation,url
0,self,https://hapi.fhir.org/baseR5/Patient?address-p...


In [25]:
# Print the column / dtype summary of `raw_data`.
# NOTE(review): `raw_data` is not defined in any cell visible here - confirm where it is built.
raw_data.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1 entries, 0 to 0
Data columns (total 7 columns):
 #   Column            Non-Null Count  Dtype 
---  ------            --------------  ----- 
 0   resourceType      1 non-null      object
 1   id                1 non-null      object
 2   type              1 non-null      object
 3   total             1 non-null      int64 
 4   link              1 non-null      object
 5   entry             1 non-null      object
 6   meta.lastUpdated  1 non-null      object
dtypes: int64(1), object(6)
memory usage: 188.0+ bytes


In [34]:
# Check the Python type of the value at row 0, column 5 of `raw_data`
# (column 5 is `entry` according to the info() output shown above).
type(raw_data.iloc[0, 5]) 

list

In [51]:

# Display `ids`. NOTE(review): `ids` is not defined in any cell visible here - confirm its source.
ids


['425',
 '1002',
 '1003',
 '1052',
 '1053',
 '1554',
 '1555',
 '4965',
 '751247',
 '751252',
 '109eee94-76b6-428d-bc08-8be8a58eb166',
 '758488',
 '758489',
 '758493',
 '759495',
 '762136']