In [1]:
from datetime import datetime
import logging
from typing import Any, List, Optional

import pandas
import pyspark.sql.functions as F

import query_lib
import indicator_lib

In [48]:
# Location passed to the query factory; presumably the directory holding the
# input test files -- TODO confirm against `query_lib`.
BASE_DIR='./test_files/'
# Code system URL passed to the query factory.
CODE_SYSTEM='http://snomed.info/sct'

# Create the query object on the Spark runner; it is reused by the cells below.
patient_query = query_lib.patient_query_factory(
    query_lib.Runner.SPARK, BASE_DIR, CODE_SYSTEM)

In [49]:
# Codes used by the indicator examples. Per the trailing comments these are
# stand-ins picked from the test data (e.g. a height code plays the role of
# the viral-load code).
_VL_CODE = '50373000'  # Height
_ARV_PLAN = '106230009'  # Diagnosis certainty; uuid 159394AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
_DRUG1 = '410596003'  # From "diagnosis certainty" values
_DRUG2 = '395098000'  # From "diagnosis certainty" values
# Date bounds as 'YYYY-MM-DD' strings: `old_start_date` is the lower bound for
# the two codes above, `start_date` is the lower bound for all other codes.
end_date='2020-12-30'
start_date='2020-12-01'
old_start_date='2020-10-01'
# URL passed to `find_patient_aggregates` below.
BASE_PATIENT_URL='http://localhost:8099/openmrs/ws/fhir2/R4/Patient/'

patient_query.include_obs_values_in_time_range(
    _VL_CODE, min_time=old_start_date, max_time=end_date)
patient_query.include_obs_values_in_time_range(
    _ARV_PLAN, min_time=old_start_date, max_time=end_date)
patient_query.include_all_other_codes(min_time=start_date, max_time=end_date)
# Note the first call to `find_patient_aggregates` starts a local Spark
# cluster, loads input files, and flattens observations. These steps are not
# repeated in subsequent calls of this function on the same instance.
# The same cluster is also reused for other instances of `PatientQuery`.
agg_df = patient_query.find_patient_aggregates(BASE_PATIENT_URL)
agg_df.head(20)

[INDICATORS_LOG 2021-07-13 20:06:39.308425] Number of Patient resources= 1000
[INDICATORS_LOG 2021-07-13 20:06:39.583153] Number of Observation resources= 72869
[INDICATORS_LOG 2021-07-13 20:06:40.521911] Number of flattened obs rows = 41288
[INDICATORS_LOG 2021-07-13 20:06:41.687035] Number of aggregated obs= 988
[INDICATORS_LOG 2021-07-13 20:06:42.235419] Number of joined patient_agg_obs= 988


Unnamed: 0,patientId,birthDate,gender,code,num_obs,min_value,max_value,min_date,max_date,first_value,last_value,first_value_code,last_value_code
0,0ade630a-f6f2-43e5-9020-8be3a4f26068,1970-10-18,male,106230009,1,,,2020-11-19T08:54:07+00:00,2020-11-19T08:54:07+00:00,,,395098000.0,395098000.0
1,2ffbc136-5a1c-4314-90e0-3bfffe9b9e5e,1938-12-18,female,50373000,1,67.0,67.0,2020-10-10T08:27:45+00:00,2020-10-10T08:27:45+00:00,67.0,67.0,,
2,6a97783c-0eec-480f-8202-b1bfea0f3b6d,1995-11-17,male,78564009,1,122.0,122.0,2020-12-07T08:43:26+00:00,2020-12-07T08:43:26+00:00,122.0,122.0,,
3,9b3bad0a-d650-4667-bb2c-78b599212e32,2008-04-19,female,27113001,1,70.0,70.0,2020-12-05T08:36:19+00:00,2020-12-05T08:36:19+00:00,70.0,70.0,,
4,ba4a9540-11b0-43ec-bcfe-5988b1dd7965,1971-01-18,male,50373000,1,39.0,39.0,2020-12-08T08:11:22+00:00,2020-12-08T08:11:22+00:00,39.0,39.0,,
5,e2557413-13ef-476e-a858-5fcce5fe55c3,1983-12-07,male,50373000,1,67.0,67.0,2020-12-09T08:18:28+00:00,2020-12-09T08:18:28+00:00,67.0,67.0,,
6,e5d31544-d92f-42f0-9cb4-8dfeb1bc367c,1961-04-27,female,106230009,2,,,2020-12-07T09:55:01+00:00,2020-12-07T09:55:01+00:00,,,395098000.0,395098000.0
7,35144ce7-0656-4f8f-9464-e265ed946da6,1966-03-25,female,50373000,2,187.0,201.0,2020-10-18T08:59:00+00:00,2020-11-18T08:49:59+00:00,201.0,187.0,,
8,59f04a88-84c7-4abc-8eb9-4e7e61bb0d08,2013-06-01,female,50373000,1,103.0,103.0,2020-10-11T08:27:44+00:00,2020-10-11T08:27:44+00:00,103.0,103.0,,
9,8a7ae068-35ce-429d-af3e-79e25a7ccf6a,1998-09-15,female,50373000,1,42.0,42.0,2020-11-18T08:07:59+00:00,2020-11-18T08:07:59+00:00,42.0,42.0,,


In [50]:
# Rows for the `_ARV_PLAN` code whose earliest and latest observation dates
# differ, showing only the columns needed to compare first and last coded value.
agg_df[(agg_df['code'] == _ARV_PLAN) & (agg_df['min_date'] != agg_df['max_date'])][
    ['patientId', 'code', 'min_date', 'max_date', 'first_value_code', 'last_value_code']].head()

Unnamed: 0,patientId,code,min_date,max_date,first_value_code,last_value_code
18,a9d24c86-aea3-45a9-b15b-7072a84a88db,106230009,2020-10-12T09:13:01+00:00,2020-10-27T09:54:00+00:00,410596003,410596003
20,32c55c0e-de13-437d-bee4-8e99b0671847,106230009,2020-10-11T09:18:35+00:00,2020-12-08T09:23:34+00:00,410596003,410596003
58,8b41e217-65de-416c-ac7d-47eeff9c9860,106230009,2020-10-14T09:29:20+00:00,2020-11-08T09:23:20+00:00,410596003,395098000
71,13362029-6165-4f50-b80b-790d91ec0b17,106230009,2020-10-01T09:31:42+00:00,2020-11-04T09:13:41+00:00,395098000,395098000
73,6d610c28-44c7-40fb-b9b0-d84e52f425db,106230009,2020-11-05T08:58:55+00:00,2020-11-08T09:51:54+00:00,395098000,395098000


# Inspecting underlying Spark data-frames
The _user_ of the library does not need to deal with the underlying distributed query processing system. However, the _developer_ of the library needs an easy way to inspect the internal data of these systems. Here is how:

In [53]:
# `_obs_df` is a private attribute of the query object; it is read here only
# to inspect the underlying Spark DataFrame.
obs = patient_query._obs_df
# Alternative inspections, kept for reference:
#obs.withColumn('coding', F.explode('code.coding')).head()
#obs.withColumn('coding', F.explode('code.coding')).filter(F.col('coding.system').isNull()).head()
# Explode `code.coding` into one row per coding and show the first row whose
# coding has a null `system`.
obs.withColumn('coding', F.explode('code.coding')).filter('coding.system IS NULL').head()

Row(id='http://localhost:8099/openmrs/ws/fhir2/R4/Observation/cabe475f-b72e-4d75-bccc-2f0b01594f51', meta=None, implicitRules=None, language=None, text=None, contained=None, identifier=None, basedOn=None, status='final', category=[Row(id=None, coding=[Row(id=None, system='http://terminology.hl7.org/CodeSystem/observation-category', version=None, code='exam', display='Exam', userSelected=None)], text=None)], code=Row(id=None, coding=[Row(id=None, system=None, version=None, code='5088AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA', display='Temperature (C)', userSelected=None), Row(id=None, system='http://loinc.org', version=None, code='8310-5', display='Temperature (C)', userSelected=None), Row(id=None, system='https://openconceptlab.org/orgs/CIEL/sources/CIEL', version=None, code='5088', display='Temperature (C)', userSelected=None)], text=None), subject=Row(DeviceId=None, GroupId=None, LocationId=None, PatientId='b1d8fbb7-1a31-46f0-863d-8e171071849c', id=None, reference='Patient/b1d8fbb7-1a31-46f0-

In [54]:
# Finding two different coded values for our fake _ARV_PLAN observation code.
# The filter matches on the uuid-style code listed next to `_ARV_PLAN`, after
# exploding `code.coding` into one row per coding; `head(2)` returns two rows.
obs.withColumn('coding', F.explode('code.coding')).filter(
    'coding.code = "159394AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"').head(2)

[Row(id='http://localhost:8099/openmrs/ws/fhir2/R4/Observation/cabf1f3f-0fda-4a39-83ac-ed6269cb6e44', meta=None, implicitRules=None, language=None, text=None, contained=None, identifier=None, basedOn=None, status='amended', category=[Row(id=None, coding=[Row(id=None, system='http://terminology.hl7.org/CodeSystem/observation-category', version=None, code='exam', display='Exam', userSelected=None)], text=None)], code=Row(id=None, coding=[Row(id=None, system=None, version=None, code='159394AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA', display='Diagnosis certainty', userSelected=None), Row(id=None, system='https://openconceptlab.org/orgs/CIEL/sources/CIEL', version=None, code='159394', display='Diagnosis certainty', userSelected=None), Row(id=None, system='http://snomed.info/sct', version=None, code='106230009', display='Diagnosis certainty', userSelected=None)], text=None), subject=Row(DeviceId=None, GroupId=None, LocationId=None, PatientId='7498171c-27f3-44f3-a833-adebec21b719', id=None, reference='P

In [55]:
# Rows for the `_ARV_PLAN` code whose last coded value is `_DRUG1` or `_DRUG2`.
agg_df[(agg_df['code'] == _ARV_PLAN) & agg_df['last_value_code'].isin([_DRUG1, _DRUG2])].head()

Unnamed: 0,patientId,birthDate,gender,code,num_obs,min_value,max_value,min_date,max_date,first_value,last_value,first_value_code,last_value_code
0,0ade630a-f6f2-43e5-9020-8be3a4f26068,1970-10-18,male,106230009,1,,,2020-11-19T08:54:07+00:00,2020-11-19T08:54:07+00:00,,,395098000,395098000
6,e5d31544-d92f-42f0-9cb4-8dfeb1bc367c,1961-04-27,female,106230009,2,,,2020-12-07T09:55:01+00:00,2020-12-07T09:55:01+00:00,,,395098000,395098000
18,a9d24c86-aea3-45a9-b15b-7072a84a88db,1976-02-20,female,106230009,3,,,2020-10-12T09:13:01+00:00,2020-10-27T09:54:00+00:00,,,410596003,410596003
19,3172ae5e-b167-443a-91e6-58da33bfb00b,1990-03-09,male,106230009,2,,,2020-11-19T09:14:46+00:00,2020-11-19T09:14:46+00:00,,,395098000,395098000
20,32c55c0e-de13-437d-bee4-8e99b0671847,1964-04-07,male,106230009,3,,,2020-10-11T09:18:35+00:00,2020-12-08T09:23:34+00:00,,,410596003,410596003


In [56]:
# Number of rows for the `_ARV_PLAN` code whose last coded value is `_DRUG1`
# or `_DRUG2`.
agg_df[(agg_df['code'] == _ARV_PLAN) & agg_df['last_value_code'].isin([_DRUG1, _DRUG2])].index.size

341

In [57]:
# Number of `patientId` groups among the rows for the `_ARV_PLAN` code whose
# last coded value is `_DRUG1` or `_DRUG2`. If this equals the plain row count,
# no patient contributes more than one such row.
agg_df[(agg_df['code'] == _ARV_PLAN) & agg_df['last_value_code'].isin([_DRUG1, _DRUG2])].groupby(
    'patientId').count().index.size

341

In [58]:
# TX_NEW from the library implementation; only `_DRUG1` is passed as a start
# drug and `end_date` is passed as the end-date string.
indicator_lib.calc_TX_NEW(agg_df, ARV_plan=_ARV_PLAN, start_drug=[_DRUG1], end_date_str=end_date)

Unnamed: 0,TX_NEW,buckets,TX_NEW_count,TX_NEW_ratio
0,False,1-4_ALL-GENDERS,7,0.020528
1,False,1-4_female,3,0.008798
2,False,1-4_male,4,0.01173
3,False,10-14_ALL-GENDERS,8,0.02346
4,False,10-14_female,2,0.005865
5,False,10-14_male,6,0.017595
6,False,15-19_ALL-GENDERS,6,0.017595
7,False,15-19_female,3,0.008798
8,False,15-19_male,3,0.008798
9,False,20-24_ALL-GENDERS,6,0.017595


In [61]:
# TX_PVLS from the library implementation, using `_VL_CODE` as the viral-load
# code and 150 as the failure threshold.
indicator_lib.calc_TX_PVLS(
    agg_df, VL_code=_VL_CODE, failure_threshold=150,
    end_date_str=end_date)

Unnamed: 0,sup_VL,buckets,sup_VL_count,sup_VL_ratio
0,False,1-4_ALL-GENDERS,6,0.017595
1,False,1-4_female,3,0.008798
2,False,1-4_male,3,0.008798
3,False,10-14_ALL-GENDERS,9,0.026393
4,False,10-14_female,5,0.014663
5,False,10-14_male,4,0.01173
6,False,15-19_ALL-GENDERS,5,0.014663
7,False,15-19_female,4,0.01173
8,False,15-19_male,1,0.002933
9,False,20-24_ALL-GENDERS,7,0.020528


# Indicator library development
This is an example to show how the `indicator_lib.py` functions can be incrementally developed based on the query library DataFrames.

In [45]:
# `_flat_obs` is a private attribute of the query object; show its first row
# to see the fields available for indicator development.
patient_query._flat_obs.head()

Row(coding=Row(id=None, system='http://snomed.info/sct', version=None, code='106230009', display='Diagnosis certainty', userSelected=None), valueCoding=Row(id=None, system=None, version=None, code='159392AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA', display='Confirmed diagnosis', userSelected=None), value=Row(quantity=None, codeableConcept=Row(id=None, coding=[Row(id=None, system=None, version=None, code='159392AAAAAAAAAAAAAAAAAAAAAAAAAAAAAA', display='Confirmed diagnosis', userSelected=None), Row(id=None, system='https://openconceptlab.org/orgs/CIEL/sources/CIEL', version=None, code='159392', display='Confirmed diagnosis', userSelected=None), Row(id=None, system='http://snomed.info/sct', version=None, code='395098000', display='Confirmed diagnosis', userSelected=None)], text=None), string=None, boolean=None, range=None, ratio=None, sampledData=None, attachment=None, time=None, dateTime=None, period=None), patientId='7498171c-27f3-44f3-a833-adebec21b719', dateTime='2019-02-27T09:35:23+00:00', dateA

In [47]:
# Same code value as `_VL_CODE` defined earlier in the notebook.
VL_CODE = '50373000'
# First rows of the aggregates for that code.
agg_df[(agg_df['code'] == VL_CODE)].head()

Unnamed: 0,patientId,birthDate,gender,code,valueCode,num_obs,min_value,max_value,min_date,max_date,min_date_value,max_date_value,min_date_value_code,max_date_value_code
2,ba4a9540-11b0-43ec-bcfe-5988b1dd7965,1971-01-18,male,50373000,,1,39.0,39.0,2020-12-08T08:11:22+00:00,2020-12-08T08:11:22+00:00,2020-12-08T08:11:22+00:00_SeP_39.0000,2020-12-08T08:11:22+00:00_SeP_39.0000,2020-12-08T08:11:22+00:00_SeP_None,2020-12-08T08:11:22+00:00_SeP_None
3,e2557413-13ef-476e-a858-5fcce5fe55c3,1983-12-07,male,50373000,,1,67.0,67.0,2020-12-09T08:18:28+00:00,2020-12-09T08:18:28+00:00,2020-12-09T08:18:28+00:00_SeP_67.0000,2020-12-09T08:18:28+00:00_SeP_67.0000,2020-12-09T08:18:28+00:00_SeP_None,2020-12-09T08:18:28+00:00_SeP_None
4,fa624acb-c26b-427e-9bc4-c9dc38efa586,1979-08-24,female,50373000,,1,121.0,121.0,2020-12-02T08:34:40+00:00,2020-12-02T08:34:40+00:00,2020-12-02T08:34:40+00:00_SeP_121.0000,2020-12-02T08:34:40+00:00_SeP_121.0000,2020-12-02T08:34:40+00:00_SeP_None,2020-12-02T08:34:40+00:00_SeP_None
10,4d99f41d-afb7-46c6-a919-c856279b303a,1958-09-19,male,50373000,,1,15.0,15.0,2020-12-04T08:45:17+00:00,2020-12-04T08:45:17+00:00,2020-12-04T08:45:17+00:00_SeP_15.0000,2020-12-04T08:45:17+00:00_SeP_15.0000,2020-12-04T08:45:17+00:00_SeP_None,2020-12-04T08:45:17+00:00_SeP_None
12,9cc14c03-7547-4f61-b795-8c03b2813920,2010-08-04,male,50373000,,1,179.0,179.0,2020-12-02T07:59:36+00:00,2020-12-02T07:59:36+00:00,2020-12-02T07:59:36+00:00_SeP_179.0000,2020-12-02T07:59:36+00:00_SeP_179.0000,2020-12-02T07:59:36+00:00_SeP_None,2020-12-02T07:59:36+00:00_SeP_None


In [44]:
def _find_age_band(birth_date: str, end_date: datetime) -> str:
  """Given the birth date, finds the age_band for PEPFAR disaggregation."""
  age = None
  try:
    # TODO handle all different formats (issues #174)
    birth = datetime.strptime(birth_date, '%Y-%m-%d')
    age = int((end_date - birth).days / 365.25)
  except Exception as e:
    common.custom_log('Invalid birth_date format: {}'.format(e))
    age = 999999

  if age == 999999:
    return 'ERROR'
  if age < 1:
    return '0-1'
  if age <= 4:
    return '1-4'
  if age <= 9:
    return '5-9'
  if age <= 14:
    return '10-14'
  if age <= 19:
    return '15-19'
  if age <= 24:
    return '20-24'
  if age <= 49:
    return '25-49'
  return '50+'


def _agg_buckets(birth_date: str, gender: str, end_date: datetime) -> List[str]:
  """Builds the four PEPFAR disaggregation bucket labels for one patient.

  The labels combine the patient's age band (or 'ALL-AGES') with the patient's
  gender (or 'ALL-GENDERS'), joined by an underscore.
  """
  band = _find_age_band(birth_date, end_date)
  # Gender varies in the outer loop so the order is: band_gender,
  # ALL-AGES_gender, band_ALL-GENDERS, ALL-AGES_ALL-GENDERS.
  return ['_'.join([age_part, gender_part])
          for gender_part in (gender, 'ALL-GENDERS')
          for age_part in (band, 'ALL-AGES')]

def calc_TX_PVLS(patient_agg_obs: pandas.DataFrame, VL_code: str,
    failure_threshold: int,
    end_date_str: Optional[str] = None) -> pandas.DataFrame:
  """Calculates TX_PVLS indicator with its corresponding disaggregations.

  Args:
    patient_agg_obs: An output from `patient_query.find_patient_aggregates()`.
    VL_code: The code for viral load values.
    failure_threshold: VL count threshold of failure.
    end_date_str: The string representation of the last date as 'YYYY-MM-DD';
      today's date is used when it is not given.
  Returns:
    The aggregated DataFrame with columns 'sup_VL', 'buckets', 'count' and
    'ratio', where 'ratio' is 'count' divided by the number of rows of
    `patient_agg_obs` that match `VL_code`. The frame is empty when no row
    matches `VL_code`.
  """
  end_date = datetime.today()
  if end_date_str:
    end_date = datetime.strptime(end_date_str, '%Y-%m-%d')
  temp_df = patient_agg_obs[(patient_agg_obs['code'] == VL_code)].copy()
  # Note the above copy is used to avoid setting a new column on a slice next.
  if temp_df.empty:
    # A row-wise `apply` on an empty frame yields a DataFrame, not a Series,
    # which cannot be assigned to the single 'buckets' column; there is also
    # nothing to aggregate, so return an empty result with the usual columns.
    return pandas.DataFrame(columns=['sup_VL', 'buckets', 'count', 'ratio'])
  temp_df['sup_VL'] = (temp_df['max_value'] < failure_threshold)
  temp_df['buckets'] = temp_df.apply(
      lambda x: _agg_buckets(x.birthDate, x.gender, end_date), axis=1)
  # One row per (patient row, bucket) pair, so each bucket can be counted.
  temp_df_exp = temp_df.explode('buckets')
  temp_df_exp = temp_df_exp.groupby(['sup_VL', 'buckets'], as_index=False)\
      .count()[['sup_VL', 'buckets', 'patientId']]\
      .rename(columns={'patientId': 'count'})
  # calculate ratio
  num_patients = len(temp_df.index)
  temp_df_exp['ratio'] = temp_df_exp['count']/num_patients
  return temp_df_exp
    
# Try the notebook-local `calc_TX_PVLS` defined above on the aggregates, with
# the same code value as `_VL_CODE` and a failure threshold of 150.
VL_CODE = '50373000'
calc_TX_PVLS(agg_df, VL_CODE, 150, end_date_str='2020-12-30')

Unnamed: 0,sup_VL,buckets,count,ratio
0,False,1-4_ALL-GENDERS,28,0.030668
1,False,1-4_female,17,0.01862
2,False,1-4_male,11,0.012048
3,False,10-14_ALL-GENDERS,35,0.038335
4,False,10-14_female,16,0.017525
5,False,10-14_male,19,0.020811
6,False,15-19_ALL-GENDERS,36,0.03943
7,False,15-19_female,21,0.023001
8,False,15-19_male,15,0.016429
9,False,20-24_ALL-GENDERS,45,0.049288
