In [1]:
from datetime import datetime
import pandas
from typing import List, Any
import pyspark.sql.functions as F

import query_lib

In [2]:
# Local directory holding the input resource files read by the query.
BASE_DIR = './test_files/'
# Base URL of Patient resources on the FHIR server; passed to
# `find_patient_aggregates` below.
BASE_PATIENT_URL = 'http://localhost:8099/openmrs/ws/fhir2/R4/Patient/'
# Code system (SNOMED CT) handed to the query factory.
CODE_SYSTEM = 'http://snomed.info/sct'
patient_query = query_lib.patient_query_factory(query_lib.Runner.SPARK,
                                                BASE_DIR, CODE_SYSTEM)

In [3]:
# NOTE(review): presumably widens the aggregation to every observation code
# rather than a selected subset -- confirm against query_lib.
patient_query.include_all_other_codes()
# The first call to `find_patient_aggregates` starts a local Spark cluster,
# loads the input files, and flattens observations; subsequent calls of this
# function on the same instance skip these steps.
agg_df = patient_query.find_patient_aggregates(BASE_PATIENT_URL)
agg_df.head(n=20)

[INDICATORS_LOG 2021-07-08 19:12:10.859752] Number of Patient resources= 1000
[INDICATORS_LOG 2021-07-08 19:12:11.971989] Number of Observation resources= 72869
[INDICATORS_LOG 2021-07-08 19:12:18.358948] Number of flattened obs rows = 55866
[INDICATORS_LOG 2021-07-08 19:12:23.177770] Number of aggregated obs= 11524
[INDICATORS_LOG 2021-07-08 19:12:26.420513] Number of joined patient_agg_obs= 11524


Unnamed: 0,patientId,birthDate,gender,code,valueCode,num_obs,min_value,max_value,min_date,max_date,min_date_value,max_date_value,min_date_value_code,max_date_value_code
0,0524d713-4d85-44d8-8e32-b042b44cf7f1,1979-12-16,female,106230009,395098000,4,,,2019-05-10T09:13:47+00:00,2019-12-10T09:36:46+00:00,2019-05-10T09:13:47+00:00_SeP_None,2019-12-10T09:36:46+00:00_SeP_None,2019-05-10T09:13:47+00:00_SeP_395098000,2019-12-10T09:36:46+00:00_SeP_395098000
1,0a35552c-f4f7-4930-81a4-60d85ffcd6ba,1972-03-09,female,271649006,,4,23.0,167.0,2019-03-29T08:57:19+00:00,2020-10-29T08:22:19+00:00,2019-03-29T08:57:19+00:00_SeP_167.0000,2020-10-29T08:22:19+00:00_SeP_166.0000,2019-03-29T08:57:19+00:00_SeP_None,2020-10-29T08:22:19+00:00_SeP_None
2,1255ef83-96cc-4921-9f6c-b75a48b5ada6,1989-03-10,female,271650006,,1,116.0,116.0,2020-04-08T07:58:54+00:00,2020-04-08T07:58:54+00:00,2020-04-08T07:58:54+00:00_SeP_116.0000,2020-04-08T07:58:54+00:00_SeP_116.0000,2020-04-08T07:58:54+00:00_SeP_None,2020-04-08T07:58:54+00:00_SeP_None
3,1a5e83c9-f421-442d-b567-bec1a0beef8e,1945-11-20,male,431314004,,2,38.0,56.0,2019-11-13T08:38:56+00:00,2020-01-29T08:28:56+00:00,2019-11-13T08:38:56+00:00_SeP_56.0000,2020-01-29T08:28:56+00:00_SeP_38.0000,2019-11-13T08:38:56+00:00_SeP_None,2020-01-29T08:28:56+00:00_SeP_None
4,1c69247f-cfc8-401d-8698-5cc21b55d727,1998-02-22,female,106230009,159392AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA,3,,,2019-02-08T08:55:41+00:00,2020-07-12T09:19:41+00:00,2019-02-08T08:55:41+00:00_SeP_None,2020-07-12T09:19:41+00:00_SeP_None,2019-02-08T08:55:41+00:00_SeP_159392AAAAAAAAAA...,2020-07-12T09:19:41+00:00_SeP_159392AAAAAAAAAA...
5,1f419440-399c-4c15-a106-b6a2d5d92f7b,1993-11-04,male,431314004,,6,13.0,85.0,2019-04-15T08:20:41+00:00,2020-11-20T08:39:42+00:00,2019-04-15T08:20:41+00:00_SeP_29.0000,2020-11-20T08:39:42+00:00_SeP_13.0000,2019-04-15T08:20:41+00:00_SeP_None,2020-11-20T08:39:42+00:00_SeP_None
6,1f47fa55-4ff3-4c4f-996e-85fd57885e4c,1936-10-17,male,78564009,,6,2.0,228.0,2019-04-04T08:03:41+00:00,2020-06-27T08:27:42+00:00,2019-04-04T08:03:41+00:00_SeP_228.0000,2020-06-27T08:27:42+00:00_SeP_2.0000,2019-04-04T08:03:41+00:00_SeP_None,2020-06-27T08:27:42+00:00_SeP_None
7,21e17e5e-f3d2-4771-8799-ad5f41724e1f,1974-09-15,male,106230009,159392,4,,,2019-04-27T08:54:39+00:00,2020-01-27T09:13:39+00:00,2019-04-27T08:54:39+00:00_SeP_None,2020-01-27T09:13:39+00:00_SeP_None,2019-04-27T08:54:39+00:00_SeP_159392,2020-01-27T09:13:39+00:00_SeP_159392
8,2228d8b0-f6de-447f-872f-098050d479f3,1934-06-15,female,106230009,159393AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA,5,,,2019-08-23T09:48:49+00:00,2020-10-19T09:41:49+00:00,2019-08-23T09:48:49+00:00_SeP_None,2020-10-19T09:41:49+00:00_SeP_None,2019-08-23T09:48:49+00:00_SeP_159393AAAAAAAAAA...,2020-10-19T09:41:49+00:00_SeP_159393AAAAAAAAAA...
9,275df0f0-5fe4-42eb-9d5d-e88ee558b6fb,1936-10-03,male,106230009,395098000,5,,,2019-07-22T09:41:43+00:00,2020-10-03T09:08:42+00:00,2019-07-22T09:41:43+00:00_SeP_None,2020-10-03T09:08:42+00:00_SeP_None,2019-07-22T09:41:43+00:00_SeP_395098000,2020-10-03T09:08:42+00:00_SeP_395098000


In [57]:
# Aggregated rows that carry a coded value, with their earliest date/value.
agg_df.loc[
    agg_df['valueCode'].notna(),
    ['patientId', 'code', 'valueCode', 'min_date', 'min_date_value_code'],
].head()

Unnamed: 0,patientId,code,valueCode,min_date,min_date_value_code
0,0524d713-4d85-44d8-8e32-b042b44cf7f1,106230009,395098000,2019-05-10T09:13:47+00:00,2019-05-10T09:13:47+00:00_SeP_395098000
4,1c69247f-cfc8-401d-8698-5cc21b55d727,106230009,159392AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA,2019-02-08T08:55:41+00:00,2019-02-08T08:55:41+00:00_SeP_159392AAAAAAAAAA...
7,21e17e5e-f3d2-4771-8799-ad5f41724e1f,106230009,159392,2019-04-27T08:54:39+00:00,2019-04-27T08:54:39+00:00_SeP_159392
8,2228d8b0-f6de-447f-872f-098050d479f3,106230009,159393AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA,2019-08-23T09:48:49+00:00,2019-08-23T09:48:49+00:00_SeP_159393AAAAAAAAAA...
9,275df0f0-5fe4-42eb-9d5d-e88ee558b6fb,106230009,395098000,2019-07-22T09:41:43+00:00,2019-07-22T09:41:43+00:00_SeP_395098000


In [4]:
# Raw observations held by the query object (a private attribute, read here
# only for exploration).
obs = patient_query._obs_df
# First (observation, coding) row whose coding has no `system`.
obs.withColumn('coding', F.explode('code.coding')) \
    .where('coding.system IS NULL') \
    .head()

Row(id='http://localhost:8099/openmrs/ws/fhir2/R4/Observation/cabe475f-b72e-4d75-bccc-2f0b01594f51', meta=None, implicitRules=None, language=None, text=None, contained=None, identifier=None, basedOn=None, status='final', category=[Row(id=None, coding=[Row(id=None, system='http://terminology.hl7.org/CodeSystem/observation-category', version=None, code='exam', display='Exam', userSelected=None)], text=None)], code=Row(id=None, coding=[Row(id=None, system=None, version=None, code='5088AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA', display='Temperature (C)', userSelected=None), Row(id=None, system='http://loinc.org', version=None, code='8310-5', display='Temperature (C)', userSelected=None), Row(id=None, system='https://openconceptlab.org/orgs/CIEL/sources/CIEL', version=None, code='5088', display='Temperature (C)', userSelected=None)], text=None), subject=Row(DeviceId=None, GroupId=None, LocationId=None, PatientId='b1d8fbb7-1a31-46f0-863d-8e171071849c', id=None, reference='Patient/b1d8fbb7-1a31-46f0-

In [29]:
# Look up a row by the system-less code displayed as 'Height (cm)', to see the
# other codings (LOINC, SNOMED, CIEL) attached to the same observation.
obs.withColumn('coding', F.explode('code.coding')) \
    .filter('coding.code = "5090AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"') \
    .head()

Row(id='http://localhost:8099/openmrs/ws/fhir2/R4/Observation/cac76831-2e26-4ffb-9af1-4fec365c677f', meta=None, implicitRules=None, language=None, text=None, contained=None, identifier=None, basedOn=None, status='final', category=[Row(id=None, coding=[Row(id=None, system='http://terminology.hl7.org/CodeSystem/observation-category', version=None, code='exam', display='Exam', userSelected=None)], text=None)], code=Row(id=None, coding=[Row(id=None, system=None, version=None, code='5090AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA', display='Height (cm)', userSelected=None), Row(id=None, system='http://loinc.org', version=None, code='8302-2', display='Height (cm)', userSelected=None), Row(id=None, system='http://snomed.info/sct', version=None, code='50373000', display='Height (cm)', userSelected=None), Row(id=None, system='https://openconceptlab.org/orgs/CIEL/sources/CIEL', version=None, code='5090', display='Height (cm)', userSelected=None)], text=None), subject=Row(DeviceId=None, GroupId=None, Locati

In [5]:
# Peek at one flattened observation row (private attribute, exploration only).
patient_query._flat_obs.first()

Row(coding=Row(id=None, system='http://snomed.info/sct', version=None, code='106230009', display='Diagnosis certainty', userSelected=None), valueCoding=Row(id=None, system=None, version=None, code='159392AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA', display='Confirmed diagnosis', userSelected=None), value=Row(quantity=None, codeableConcept=Row(id=None, coding=[Row(id=None, system=None, version=None, code='159392AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA', display='Confirmed diagnosis', userSelected=None), Row(id=None, system='https://openconceptlab.org/orgs/CIEL/sources/CIEL', version=None, code='159392', display='Confirmed diagnosis', userSelected=None), Row(id=None, system='http://snomed.info/sct', version=None, code='395098000', display='Confirmed diagnosis', userSelected=None)], text=None), string=None, boolean=None, range=None, ratio=None, sampledData=None, attachment=None, time=None, dateTime=None, period=None), patientId='7498171c-27f3-44f3-a833-adebec21b719', dateTime='2019-02-27T09:35:23+00:00', dateA

In [32]:
# NOTE: in this data SNOMED 50373000 is displayed as 'Height (cm)' (see the
# codings looked up above); presumably used as a stand-in for a viral-load
# code -- confirm before using real indicators.
VL_CODE = '50373000'
agg_df.loc[agg_df['code'].eq(VL_CODE)]

Unnamed: 0,patientId,birthDate,gender,code,valueCode,num_obs,min_value,max_value,min_date,max_date,min_date_value,max_date_value,min_date_value_code,max_date_value_code
11,2ffbc136-5a1c-4314-90e0-3bfffe9b9e5e,1938-12-18,female,50373000,,9,33.0000,206.0000,2019-01-09T08:08:44+00:00,2020-10-10T08:27:45+00:00,2019-01-09T08:08:44+00:00_SeP_33.0000,2020-10-10T08:27:45+00:00_SeP_67.0000,2019-01-09T08:08:44+00:00_SeP_None,2020-10-10T08:27:45+00:00_SeP_None
47,ba4a9540-11b0-43ec-bcfe-5988b1dd7965,1971-01-18,male,50373000,,9,12.0000,209.0000,2019-09-13T08:06:23+00:00,2020-12-08T08:11:22+00:00,2019-09-13T08:06:23+00:00_SeP_74.0000,2020-12-08T08:11:22+00:00_SeP_39.0000,2019-09-13T08:06:23+00:00_SeP_None,2020-12-08T08:11:22+00:00_SeP_None
54,e094f2c1-893d-457b-b098-dd1ab151bad6,1941-12-04,female,50373000,,10,36.0000,227.0000,2018-12-29T08:01:00+00:00,2020-05-29T07:52:58+00:00,2018-12-29T08:01:00+00:00_SeP_88.0000,2020-05-29T07:52:58+00:00_SeP_103.0000,2018-12-29T08:01:00+00:00_SeP_None,2020-05-29T07:52:58+00:00_SeP_None
55,e1dddb6f-a90e-407e-bcb2-6a200ac57f94,1997-11-13,female,50373000,,6,16.0000,218.0000,2019-08-03T07:51:31+00:00,2020-09-03T08:06:31+00:00,2019-08-03T07:51:31+00:00_SeP_213.0000,2020-09-03T08:06:31+00:00_SeP_218.0000,2019-08-03T07:51:31+00:00_SeP_None,2020-09-03T08:06:31+00:00_SeP_None
56,e2557413-13ef-476e-a858-5fcce5fe55c3,1983-12-07,male,50373000,,10,26.0000,191.0000,2019-01-17T08:44:29+00:00,2020-12-09T08:18:28+00:00,2019-01-17T08:44:29+00:00_SeP_80.0000,2020-12-09T08:18:28+00:00_SeP_67.0000,2019-01-17T08:44:29+00:00_SeP_None,2020-12-09T08:18:28+00:00_SeP_None
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
11441,9a362c60-e6ce-450b-b000-68ceba94250a,1949-08-14,female,50373000,,6,36.0000,190.0000,2019-02-01T09:02:20+00:00,2020-08-20T08:18:21+00:00,2019-02-01T09:02:20+00:00_SeP_45.0000,2020-08-20T08:18:21+00:00_SeP_36.0000,2019-02-01T09:02:20+00:00_SeP_None,2020-08-20T08:18:21+00:00_SeP_None
11445,a8b875cc-fd9b-4205-b502-b6a0e63c0d75,1952-07-18,male,50373000,,8,34.0000,188.0000,2018-12-25T08:11:52+00:00,2020-12-01T08:38:52+00:00,2018-12-25T08:11:52+00:00_SeP_187.0000,2020-12-01T08:38:52+00:00_SeP_139.0000,2018-12-25T08:11:52+00:00_SeP_None,2020-12-01T08:38:52+00:00_SeP_None
11460,dbced016-af16-4e5f-8200-ac69925bb48e,1945-05-04,female,50373000,,6,31.0000,205.0000,2019-03-02T07:46:51+00:00,2020-08-04T08:08:50+00:00,2019-03-02T07:46:51+00:00_SeP_205.0000,2020-08-04T08:08:50+00:00_SeP_46.0000,2019-03-02T07:46:51+00:00_SeP_None,2020-08-04T08:08:50+00:00_SeP_None
11469,07e1ed01-200e-4bb5-8a2c-073e2b4a9e34,1961-05-24,female,50373000,,5,153.0000,216.0000,2018-12-24T08:56:18+00:00,2020-03-26T08:32:18+00:00,2018-12-24T08:56:18+00:00_SeP_181.0000,2020-03-26T08:32:18+00:00_SeP_182.0000,2018-12-24T08:56:18+00:00_SeP_None,2020-03-26T08:32:18+00:00_SeP_None


In [44]:
def _find_age_band(birth_date: str, end_date: datetime) -> str:
  """Given the birth date, finds the age_band for PEPFAR disaggregation."""
  age = None
  try:
    # TODO handle all different formats (issues #174)
    birth = datetime.strptime(birth_date, '%Y-%m-%d')
    age = int((end_date - birth).days / 365.25)
  except Exception as e:
    common.custom_log('Invalid birth_date format: {}'.format(e))
    age = 999999

  if age == 999999:
    return 'ERROR'
  if age < 1:
    return '0-1'
  if age <= 4:
    return '1-4'
  if age <= 9:
    return '5-9'
  if age <= 14:
    return '10-14'
  if age <= 19:
    return '15-19'
  if age <= 24:
    return '20-24'
  if age <= 49:
    return '25-49'
  return '50+'


def _agg_buckets(birth_date: str, gender: str, end_date: datetime) -> List[str]:
  """Returns the four PEPFAR disaggregation bucket labels for one patient.

  The labels are, in order: age-band x gender, all-ages x gender,
  age-band x all-genders, and all-ages x all-genders.
  """
  band = _find_age_band(birth_date, end_date)
  per_gender = [band + '_' + gender, 'ALL-AGES_' + gender]
  all_genders = [band + '_ALL-GENDERS', 'ALL-AGES_ALL-GENDERS']
  return per_gender + all_genders

def calc_TX_PVLS(patient_agg_obs: pandas.DataFrame, VL_code: str,
    failure_threshold: int, end_date_str: str = None) -> pandas.DataFrame:
  """Calculates TX_PVLS indicator with its corresponding disaggregations.

  Args:
    patient_agg_obs: An output from `patient_query.find_patient_aggregates()`;
      must have `patientId`, `birthDate`, `gender`, `code` and `max_value`
      columns.
    VL_code: The code for viral load values.
    failure_threshold: VL count threshold of failure; a row whose `max_value`
      is strictly below it is counted as suppressed (`sup_VL` True).
    end_date_str: The string representation of the last date as 'YYYY-MM-DD',
      used to compute ages; defaults to today.
  Returns:
    The aggregated DataFrame with columns `sup_VL`, `buckets`, `count` and
    `ratio`, one row per (suppression status, bucket) pair. `ratio` is
    `count` divided by the number of `VL_code` rows in `patient_agg_obs`.
    Empty (with those columns) when no row has `VL_code`.
  """
  end_date = datetime.today()
  if end_date_str:
    end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
  result_columns = ['sup_VL', 'buckets', 'count', 'ratio']
  temp_df = patient_agg_obs[patient_agg_obs['code'] == VL_code].copy()
  if temp_df.empty:
    # Nothing to disaggregate; also avoids a row-wise `apply` on an empty
    # frame and a ratio over zero patients.
    return pandas.DataFrame(columns=result_columns)
  # Note the above copy is used to avoid setting a new column on a slice next.
  # NOTE(review): TX_PVLS is commonly defined on each patient's most recent VL
  # result; this uses the maximum over all observations -- confirm intent.
  # `to_numeric` makes the comparison independent of how the values are
  # stored (e.g. object dtype); missing values count as not suppressed.
  temp_df['sup_VL'] = (
      pandas.to_numeric(temp_df['max_value'], errors='coerce')
      < failure_threshold)
  temp_df['buckets'] = temp_df.apply(
      lambda x: _agg_buckets(x.birthDate, x.gender, end_date), axis=1)
  # One row per (patient row, bucket); count patients in each group.
  bucket_counts = (
      temp_df.explode('buckets')
      .groupby(['sup_VL', 'buckets'])['patientId']
      .count()
      .reset_index(name='count'))
  # calculate ratio
  num_patients = len(temp_df.index)
  bucket_counts['ratio'] = bucket_counts['count'] / num_patients
  return bucket_counts[result_columns]
    
# Same code as the earlier VL_CODE cell, repeated so this cell runs on its own.
# NOTE(review): 150 is presumably chosen for the height stand-in values
# (cm), not a real viral-load threshold -- confirm.
VL_CODE = '50373000'
calc_TX_PVLS(agg_df, VL_CODE, failure_threshold=150,
             end_date_str='2020-12-30')

Unnamed: 0,sup_VL,buckets,count,ratio
0,False,1-4_ALL-GENDERS,28,0.030668
1,False,1-4_female,17,0.01862
2,False,1-4_male,11,0.012048
3,False,10-14_ALL-GENDERS,35,0.038335
4,False,10-14_female,16,0.017525
5,False,10-14_male,19,0.020811
6,False,15-19_ALL-GENDERS,36,0.03943
7,False,15-19_female,21,0.023001
8,False,15-19_male,15,0.016429
9,False,20-24_ALL-GENDERS,45,0.049288
