# 3. What is the most common medication (in MedicationRequest), and what are the top 5 encounter types associated with these medications?

In [None]:
# %load helper.py
import requests
import jwt
import datetime
import json
import fhirpathpy
from flatten_json import flatten
from typing import Optional
from collections import defaultdict
import pandas as pd

from rich import print

# Status bars for long-running cels
from tqdm.notebook import trange, tqdm

class BulkDataFetcher:
    def __init__(
        self,
        base_url: str,
        client_id: str,
        private_key: str,
        key_id: str,
        endpoint: Optional[str] = None,
        session: Optional[str] = None
    ):
        self.base_url = base_url
        self.client_id = client_id
        self.private_key = private_key
        self.key_id = key_id

        self.token = None
        self.token_expire_time = None

        if endpoint is None:
            self.endpoint = "Patient"
        else:
            self.endpoint = endpoint


        if session is None:
            self.session = requests.Session()
        else:
            self.session = session

        r = self.session.get(f'{base_url}/.well-known/smart-configuration')
        smart_config = r.json()
        self.token_endpoint = smart_config['token_endpoint']

        self.resource_types = []
        self.fhir_paths = {}

        # Store raw FHIR resource instances; populated as part of get_dataframes()
        self.resources_by_type = {}


    def get_token(self):
        if self.token and datetime.datetime.now() < self.expire_time:
            # the existing token is still valid so use it
            return self.token

        assertion = jwt.encode({
                'iss': self.client_id,
                'sub': self.client_id,
                'aud': self.token_endpoint,
                'exp': int((datetime.datetime.now() + datetime.timedelta(minutes=5)).timestamp())
        }, self.private_key, algorithm='RS384',
        headers={"kid": key_id})

        r = self.session.post(self.token_endpoint, data={
            'scope': 'system/*.read',
            'grant_type': 'client_credentials',
            'client_assertion_type': 'urn:ietf:params:oauth:client-assertion-type:jwt-bearer',
            'client_assertion': assertion
        })

        token_response = r.json()
        self.token = token_response['access_token']
        self.expire_time = datetime.datetime.now() + datetime.timedelta(seconds=token_response['expires_in'])

        return self.token

    def add_resource_type(self, resource_type: str, fhir_paths = None):
        self.resource_types.append(resource_type)
        if fhir_paths:
            # fhir_paths=[
            #    ("id", "identifier[0].value"),
            #    ("marital_status", "maritalStatus.coding[0].code")
            # ]
            compiled_fhir_paths = [(f[0], fhirpathpy.compile(f[1])) for f in fhir_paths]
            self.fhir_paths[resource_type] = compiled_fhir_paths

    def _invoke_request(self):
        types = ','.join(self.resource_types)
        url = f'{self.base_url}/{self.endpoint}/$export?_type={types}'
        print(f'Fetching from {url}')
        r = self.session.get(url, headers={'Authorization': f'Bearer {self.get_token()}', 'Accept': 'application/fhir+json', 'Prefer': 'respond-async'})

        self.check_url = r.headers['Content-Location']
        return self.check_url

    def _wait_until_ready(self):
        while True:
            r = self.session.get(self.check_url, headers={'Authorization': f'Bearer {self.get_token()}', 'Accept': 'application/fhir+json'})

            # There are three possible options here: http://hl7.org/fhir/uv/bulkdata/export.html#bulk-data-status-request
            # Error = 4xx or 5xx status code
            # In-Progress = 202
            # Complete = 200

            if r.status_code == 200:
                # complete
                response = r.json()
                self.output_files = response['output']
                return self.output_files

            elif r.status_code == 202:
                # in progress
                delay = r.headers['Retry-After']

                sleep(int(delay))

            else:
                raise RuntimeError(r.text)

    def get_dataframes(self):
        self._invoke_request()
        self._wait_until_ready()

        resources_by_type = {}
        self.resources_by_type = {} # Reset store of raw FHIR resources each time this is run

        for output_file in self.output_files:
            download_url = output_file['url']
            resource_type = output_file['type']

            r = self.session.get(download_url, headers={'Authorization': f'Bearer {get_token()}', 'Accept': 'application/fhir+json'})

            ndjson = r.text.strip()

            if resource_type not in resources_by_type:
                resources_by_type[resource_type] = []
                self.resources_by_type[resource_type] = []

            for line in ndjson.split('\n'):
                resource = json.loads(line)

                # Make raw resource instances available for future use
                self.resources_by_type[resource_type].append(resource)

                if resource_type in self.fhir_paths:
                    fhir_paths = self.fhir_paths[resource_type]
                    filtered_resource = {}
                    for f in fhir_paths:
                        fieldname = f[0]
                        func = f[1]
                        filtered_resource[fieldname] = func(resource)

                        if isinstance(filtered_resource[fieldname], list) and len(filtered_resource[fieldname]) == 1:
                            filtered_resource[fieldname] = filtered_resource[fieldname][0]
                    resource = filtered_resource

                resources_by_type[resource_type].append(resource)

        dfs = {}

        for resource_type, resources in resources_by_type.items():
            dfs[resource_type] = pd.json_normalize(list(map(lambda r: flatten(r), resources)))

        return dfs

    def get_example_resource(self, resource_type: str, resource_id: Optional[str] = None):
        if self.resources_by_type is None:
            print("You need to run get_dataframes() first")
            return None

        if resource_type not in self.resources_by_type:
            print(f"{resource_type} not available. Try one of these: {', '.join(self.resources_by_type.keys())}")
            return None

        if resource_id is None:
            return self.resources_by_type[resource_type][0]

        resource = [r for r in self.resources_by_type[resource_type] if r['id'] == resource_id]

        if len(resource) > 0:
            return resource[0]

        print(f"No {resource_type} with id={resource_id} was found.")
        return None

    def reprocess_dataframes(self, fhir_paths):
        return BulkDataFetcher._reprocess_dataframes(self.resources_by_type, fhir_paths)

    @classmethod
    def _reprocess_dataframes(cls, obj_resources_by_type, user_fhir_paths):
        parsed_resources_by_type = defaultdict(list)

        for this_resource_type in obj_resources_by_type.keys():
            if this_resource_type in user_fhir_paths:
                user_fhir_paths[this_resource_type] = [(f[0], fhirpathpy.compile(f[1])) for f in user_fhir_paths[this_resource_type]]
            for resource in obj_resources_by_type[this_resource_type]:
                if this_resource_type in user_fhir_paths:
                    filtered_resource = {}
                    for f in user_fhir_paths[this_resource_type]:
                        fieldname = f[0]
                        func = f[1]
                        filtered_resource[fieldname] = func(resource)

                        if isinstance(filtered_resource[fieldname], list) and len(filtered_resource[fieldname]) == 1:
                            filtered_resource[fieldname] = filtered_resource[fieldname][0]
                    parsed_resources_by_type[this_resource_type].append(filtered_resource)
                else:
                    parsed_resources_by_type[this_resource_type].append(resource)

        dfs = {}

        for t, res in parsed_resources_by_type.items():
            dfs[t] = pd.json_normalize(list(map(lambda r: flatten(r), res)))

        return dfs


class SyntheaDataFetcher:
    def __init__(self, ndjson_file_path):
        self.resources_by_type = {}

        num_lines = sum(1 for line in open(ndjson_file_path,'r'))
        with open(ndjson_file_path, 'r') as file:
            for line in tqdm(file, total=num_lines):
                json_obj = json.loads(line)
                this_resource_type = json_obj['resourceType']
                if this_resource_type not in self.resources_by_type:
                    self.resources_by_type[this_resource_type] = []
                self.resources_by_type[this_resource_type].append(json_obj)

        print("Resources available: ")
        print('\n'.join(['- '+ x for x in self.resources_by_type.keys()]))

    def get_example_resource(self, resource_type: str, resource_id: Optional[str] = None):
        if self.resources_by_type is None:
            print("You need to run get_dataframes() first")
            return None

        if resource_type not in self.resources_by_type:
            print(f"{resource_type} not available. Try one of these: {', '.join(self.resources_by_type.keys())}")
            return None

        if resource_id is None:
            return self.resources_by_type[resource_type][0]

        resource = [r for r in self.resources_by_type[resource_type] if r['id'] == resource_id]

        if len(resource) > 0:
            return resource[0]

        print(f"No {resource_type} with id={resource_id} was found.")
        return None

    def reprocess_dataframes(self, user_fhir_paths):
        return BulkDataFetcher._reprocess_dataframes(self.resources_by_type, user_fhir_paths)


In [4]:
synthea_fetcher = SyntheaDataFetcher('../synthea_40.ndjson')

  0%|          | 0/12498 [00:00<?, ?it/s]

In [5]:
print(synthea_fetcher.get_example_resource("MedicationRequest"))

We will need:

- `encounter.reference` to find the referenced encounter
- `medicationCodeableConcept.coding` for the medication

In [6]:
print(synthea_fetcher.get_example_resource("Encounter"))

We will need:

- `id` for joining with the MedicationRequest data
- `type.coding` to identify the type of encounter

In [8]:
dfs = synthea_fetcher.reprocess_dataframes({
    'MedicationRequest': [
        ('encounter', 'encounter.reference'),
        ('med_code', 'medicationCodeableConcept.coding[0].code'),
        ('med_code_display', 'medicationCodeableConcept.coding[0].display'),
        ('med_code_system', 'medicationCodeableConcept.coding[0].system'),
    ],
    'Encounter': [
        ('id', 'id'),
        ('type_code', 'type.coding[0].code'),
        ('type_code_display', 'type.coding[0].display'),
        ('type_code_system', 'type.coding[0].system'),
    ]
})

df_med = dfs['MedicationRequest']
df_encounter = dfs['Encounter']

In [9]:
df_med

Unnamed: 0,encounter,med_code,med_code_display,med_code_system
0,urn:uuid:09febec4-11a0-41b4-a5ef-5dabf8ffaf3e,429503,Hydrochlorothiazide 12.5 MG,http://www.nlm.nih.gov/research/umls/rxnorm
1,urn:uuid:bff4fa9a-18fb-42d4-910b-60d9934a85c2,313782,Acetaminophen 325 MG Oral Tablet,http://www.nlm.nih.gov/research/umls/rxnorm
2,urn:uuid:05a0507d-34a2-4e72-82b1-43955561f7a1,313782,Acetaminophen 325 MG Oral Tablet,http://www.nlm.nih.gov/research/umls/rxnorm
3,urn:uuid:3879ad6c-0839-48f3-8e17-30d5637f874b,895994,120 ACTUAT Fluticasone propionate 0.044 MG/ACT...,http://www.nlm.nih.gov/research/umls/rxnorm
4,urn:uuid:3879ad6c-0839-48f3-8e17-30d5637f874b,745679,200 ACTUAT Albuterol 0.09 MG/ACTUAT Metered Do...,http://www.nlm.nih.gov/research/umls/rxnorm
...,...,...,...,...
296,urn:uuid:4dea002e-b200-4518-8b9b-c6490828cbfd,316672,Simvistatin 10 MG,http://www.nlm.nih.gov/research/umls/rxnorm
297,urn:uuid:2ee93d04-4d92-4465-b352-705ad9712e11,1803932,Leucovorin 100 MG Injection,http://www.nlm.nih.gov/research/umls/rxnorm
298,urn:uuid:2ee93d04-4d92-4465-b352-705ad9712e11,1736776,10 ML oxaliplatin 5 MG/ML Injection,http://www.nlm.nih.gov/research/umls/rxnorm
299,urn:uuid:1199e19a-5c24-4482-988e-10b07afcf604,316672,Simvistatin 10 MG,http://www.nlm.nih.gov/research/umls/rxnorm


In [10]:
df_encounter

Unnamed: 0,id,type_code,type_code_display,type_code_system
0,09febec4-11a0-41b4-a5ef-5dabf8ffaf3e,162673000,General examination of patient (procedure),http://snomed.info/sct
1,5db2fad0-8954-4619-a4a4-c393ed3cdca5,162673000,General examination of patient (procedure),http://snomed.info/sct
2,17c76e62-cedb-4fcd-b002-2024da136183,230690007,Stroke,http://snomed.info/sct
3,d33011f9-1420-41cd-8606-c63caddecedd,162673000,General examination of patient (procedure),http://snomed.info/sct
4,6c89c472-7eb0-41e8-80bb-81565ad5d976,50849002,Emergency room admission (procedure),http://snomed.info/sct
...,...,...,...,...
1131,3f005de1-d8ac-411c-b419-750a8dd24a41,162673000,General examination of patient (procedure),http://snomed.info/sct
1132,78dad556-5c30-43c3-85f0-52a68ffb8346,185349003,Encounter for 'check-up',http://snomed.info/sct
1133,4f219688-a713-4735-85e7-dde1ea49ac5c,162673000,General examination of patient (procedure),http://snomed.info/sct
1134,c2dd6942-bf2d-46e5-85b9-df63be038da6,162673000,General examination of patient (procedure),http://snomed.info/sct


Get the most frequent medication code:

In [15]:
df_med_count = df_med.groupby(['med_code']).size().reset_index(name='frequency') \
.sort_values(by='frequency', ascending=False).reset_index()

df_med_count

Unnamed: 0,index,med_code,frequency
0,39,316672,100
1,35,313782,20
2,61,849574,8
3,48,746030,7
4,44,562251,7
...,...,...,...
69,41,406022,1
70,26,200243,1
71,29,310436,1
72,31,311700,1


In [18]:
(code, display) = df_med[df_med['med_code'] == df_med_count.loc[0,'med_code']].reset_index().loc[0, ['med_code', 'med_code_display']]

In [19]:
code

'316672'

In [20]:
display

'Simvistatin 10 MG'

Now that we know the most frequent medication, find the encounters associated with this medication.

In [28]:
# First remove the `urn:uuid:` from the beginning of the `encounter` column for merging
df_med['encounter'] = df_med['encounter'].apply(lambda x: x.replace('urn:uuid:', ''))

# Inner join with encounters to get just the encounters associated with medication `316672`
df_encounters_simvistatin = df_encounter.merge(
    right = df_med.loc[df_med['med_code'] == code],
    how='inner',
    left_on='id',
    right_on='encounter'
)

df_encounters_simvistatin.groupby(['type_code', 'type_code_display']).size().reset_index(name='frequency') \
.sort_values(by='frequency', ascending=False).reset_index()

Unnamed: 0,index,type_code,type_code_display,frequency
0,0,390906007,Follow-up encounter,100
