<img src="https://www.ibm.com/watson/health/ai-stories/assets/images/ibm-watson-health-logo.png" style="float: left; width: 40%; margin-bottom: 0.5em;">

## Building Longitudinal Patient Record with Spark on FHIR

Author: **Nick Kadochnikov** <nkadochn@us.ibm.com>

[Section 1: Set-up cloud object storage function and credentials](#section_1)  
[Section 2: Condition resource](#section_2)  
[Section 3: Observation resource](#section_3)  
[Section 4: Patient resource](#section_4)  
[Section 5: Linking Conditions with Patients](#section_5)  
[Section 6: Creating LPR](#section_6)  
[Section 7: Saving LPR into COS](#section_7)  

In [1]:
import glob
import json
import os
import shutil
import types
from pprint import pprint

import ibm_boto3
import ibmos2spark
import pandas as pd
import requests
from ibm_botocore.client import Config
from ibm_botocore.exceptions import ClientError

# Star imports are kept last and in their original order: later cells rely on
# the names they bring in (e.g. col, to_date, count, min, max, first, last).
from pyspark.sql.functions import *
from pyspark.sql.types import *

pd.set_option('display.max_columns', None)
pd.set_option('display.max_colwidth', 500)

Waiting for a Spark session to start...
Spark Initialization Done! ApplicationId = app-20201114154030-0001
KERNEL_ID = f666a35b-6594-48f3-8bb1-c04efe258091


In [2]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

<a id='section_1'></a>
## Section 1. Set-up COS Functions and Credentials

#### Define COS credentials

In [3]:
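# Temporary, read-only credentials provided for DevDays only.
# Do not commit real credentials to a notebook: load them from environment variables or a secrets manager instead.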

synthetic_mass_read_only = \
{
  "apikey": "HNJj8lVRmT-wX-n3ns2d8A8_iLFITob7ibC6aH66GZQX",
  "endpoints": "https://control.cloud-object-storage.cloud.ibm.com/v2/endpoints",
  "iam_apikey_description": "Auto-generated for key 418c8c60-5c31-4ed0-8a08-0f6641a01d46",
  "iam_apikey_name": "dev_days",
  "iam_role_crn": "crn:v1:bluemix:public:iam::::serviceRole:Reader",
  "iam_serviceid_crn": "crn:v1:bluemix:public:iam-identity::a/f0dfe396162db060e2e2a53ff465dfa0::serviceid:ServiceId-e13864d8-8b73-4901-8060-b84123e5ca1c",
  "resource_instance_id": "crn:v1:bluemix:public:cloud-object-storage:global:a/f0dfe396162db060e2e2a53ff465dfa0:3067bed7-8108-4d6e-ba32-5d5f643700e5::"
}

In [4]:
cos_api_key = synthetic_mass_read_only
input_bucket = 'syntheticmass-south'

# https://s3.private.dal.us.cloud-object-storage.appdomain.cloud
# https://s3.private.us-south.cloud-object-storage.appdomain.cloud

credentials = {
    'service_id': cos_api_key['iam_serviceid_crn'],
    'api_key': cos_api_key['apikey'],
    'endpoint': 'https://s3.private.us-south.cloud-object-storage.appdomain.cloud',
    'iam_service_endpoint': 'https://iam.ng.bluemix.net/oidc/token'
}

configuration_name = 'syntheticmass-read' #Must be unique for each bucket / configuration!
spark_cos = ibmos2spark.CloudObjectStorage(sc, credentials, configuration_name, 'bluemix_cos')

In [5]:
# ibm_boto3 resource handle used by the COS helper functions defined below
# (download, upload, listing, deletion).
boto_cos = ibm_boto3.resource('s3',
                         ibm_api_key_id=cos_api_key['apikey'],
                         # ibm_boto3 expects the COS resource-instance CRN here,
                         # not the IAM service-ID CRN that was passed originally.
                         ibm_service_instance_id=cos_api_key['resource_instance_id'],
                         ibm_auth_endpoint=credentials['iam_service_endpoint'],
                         config=Config(signature_version='oauth'),
                         endpoint_url=credentials['endpoint'])

#### Set-up COS Functions

In [6]:
def copy_to_local(cos, bucket_name, file_name, local_file_name):
    """Download one object from a COS bucket to a local path.

    Args:
        cos: Object exposing ``Bucket(name).download_file(Key=..., Filename=...)``.
        bucket_name: Name of the bucket that holds the object.
        file_name: Key of the object inside the bucket.
        local_file_name: Local path the object is written to.

    Progress is printed to stdout. Any exception is caught and printed as a
    "GENERAL ERROR" line instead of being raised; the function returns None.
    """
    try:
        print("Downloading {0} from Bucket {1}".format(file_name, bucket_name))
        source_bucket = cos.Bucket(bucket_name)
        source_bucket.download_file(Key=file_name, Filename=local_file_name)
        print("{0} downloaded".format(local_file_name))
    except Exception as err:
        print("GENERAL ERROR: {0}".format(err))
        

def copy_from_local(local_file_name, cos, bucket_name, file_name):
    """Upload a local file to a COS bucket under the given key.

    Args:
        local_file_name: Path of the local file to upload.
        cos: Object exposing ``Bucket(name).upload_file(Key=..., Filename=...)``.
        bucket_name: Name of the destination bucket.
        file_name: Key the object is stored under in the bucket.

    Progress is printed to stdout. Any exception is caught and printed as a
    "GENERAL ERROR" line instead of being raised; the function returns None.
    """
    try:
        print("Uploading {0}".format(local_file_name))
        target_bucket = cos.Bucket(bucket_name)
        target_bucket.upload_file(Key=file_name, Filename=local_file_name)
        print("{0} uploaded to Bucket {1}".format(file_name, bucket_name))
    except Exception as err:
        print("GENERAL ERROR: {0}".format(err))
        
def get_bucket_contents(cos, bucket_name):
    """Print the key and size of every object in a COS bucket.

    Args:
        cos: Object exposing ``Bucket(name).objects.all()``; each returned item
            must have ``key`` and ``size`` attributes.
        bucket_name: Name of the bucket to list.

    Errors are printed to stdout rather than raised, and the function returns
    None. ``ClientError`` is the ibm_botocore exception class imported in the
    notebook's import cell.
    """
    print("Retrieving bucket contents from: {0}".format(bucket_name))
    try:
        for item in cos.Bucket(bucket_name).objects.all():
            print("Item: {0} ({1} bytes).".format(item.key, item.size))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to retrieve bucket contents: {0}".format(e))
        
def create_text_file(cos, bucket_name, item_name, file_text):
    """Create (or overwrite) an object in a COS bucket from in-memory content.

    Args:
        cos: Object exposing ``Object(bucket, key).put(Body=...)``.
        bucket_name: Name of the destination bucket.
        item_name: Key of the object to write.
        file_text: Content passed as the ``Body`` of the put request.

    Errors are printed to stdout rather than raised, and the function returns
    None. ``ClientError`` is the ibm_botocore exception class imported in the
    notebook's import cell.
    """
    print("Creating new item: {0}".format(item_name))
    try:
        cos.Object(bucket_name, item_name).put(Body=file_text)
        print("Item: {0} created!".format(item_name))
    except ClientError as be:
        print("CLIENT ERROR: {0}\n".format(be))
    except Exception as e:
        print("Unable to create text file: {0}".format(e))
        
def copy_from_local_with_time(df, resource, bucket):
    """Upload a local file to COS under a key prefixed with today's date.

    Args:
        df: Name of the local file, looked up as ``./<df>``. Despite the
            parameter name this is a file-name string, not a DataFrame.
        resource: COS resource handle, passed through to ``copy_from_local``.
        bucket: Name of the destination bucket.

    The object key has the form ``<dd/mm/YYYY>_<df>``.
    """
    # `date` was never imported by the notebook, so the original body raised
    # NameError on every call; import it locally to keep this helper self-contained.
    from datetime import date

    current_date = date.today().strftime("%d/%m/%Y")

    # NOTE(review): the "/" separators in the date end up inside the object key;
    # confirm that is intended before relying on the resulting key layout.
    file_name = "{date}_{df_name}".format(date=current_date, df_name=df)

    # Push to COS
    copy_from_local('./{}'.format(df), resource, bucket, file_name)
    
def delete_all_files(cos, bucket_name, should_contain=''):
    """Delete every object in a bucket whose key contains a substring.

    Args:
        cos: Object exposing ``Bucket(name).objects.all()``; each returned item
            must have a ``key`` attribute and a ``delete()`` method.
        bucket_name: Name of the bucket to clean up.
        should_contain: Substring a key must contain to be deleted. The default
            empty string is contained in every key, so everything is deleted.

    One "DELETED: <key>" line is printed per removed object.
    """
    for obj in cos.Bucket(bucket_name).objects.all():
        if should_contain not in obj.key:
            continue
        obj.delete()
        print("DELETED: {0}".format(obj.key))

#### COS List Files

In [7]:
cos_fhir_read_bucket = input_bucket

# get_bucket_contents(boto_cos, cos_fhir_read_bucket)

<a id='section_2'></a>
## Section 2. Explore "Condition" resource

In [8]:
input_file = 'r4_Condition.ndjson'


%time condition = spark.read\
    .option('multiline','false')\
    .format('json')\
  .load(spark_cos.url(input_file, input_bucket))

condition.count()

CPU times: user 12 ms, sys: 4 ms, total: 16 ms
Wall time: 22 s


880043

In [9]:
# condition = condition.limit(100000).cache()
condition.limit(5)

abatementDateTime,clinicalStatus,code,encounter,id,onsetDateTime,recordedDate,resourceType,subject,verificationStatus
,"[[[active, http:/...","[[[38341003, Hype...",[urn:uuid:a1de588...,97af11ac-672f-4ee...,1998-03-07T21:14:...,1998-03-07T21:14:...,Condition,[urn:uuid:9e0ed03...,"[[[confirmed, htt..."
2009-10-29T21:14:...,"[[[resolved, http...","[[[10509002, Acut...",[urn:uuid:c516e14...,f90d73a9-00fc-49a...,2009-10-22T21:14:...,2009-10-22T21:14:...,Condition,[urn:uuid:9e0ed03...,"[[[confirmed, htt..."
2009-11-04T21:14:...,"[[[resolved, http...","[[[444814009, Vir...",[urn:uuid:54d8f6e...,f21db262-d11f-42f...,2009-10-28T21:14:...,2009-10-28T21:14:...,Condition,[urn:uuid:9e0ed03...,"[[[confirmed, htt..."
2010-06-28T21:14:...,"[[[resolved, http...","[[[62106007, Conc...",[urn:uuid:fddf27f...,de112595-e70d-416...,2010-04-29T21:14:...,2010-04-29T21:14:...,Condition,[urn:uuid:9e0ed03...,"[[[confirmed, htt..."
2017-07-20T07:18:...,"[[[resolved, http...","[[[444814009, Vir...",[urn:uuid:e3dd3e3...,b677ae13-cba5-4c5...,2017-07-06T07:18:...,2017-07-06T07:18:...,Condition,[urn:uuid:6fe098b...,"[[[confirmed, htt..."


In [10]:
condition.printSchema()

root
 |-- abatementDateTime: string (nullable = true)
 |-- clinicalStatus: struct (nullable = true)
 |    |-- coding: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
 |-- code: struct (nullable = true)
 |    |-- coding: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- display: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
 |    |-- text: string (nullable = true)
 |-- encounter: struct (nullable = true)
 |    |-- reference: string (nullable = true)
 |-- id: string (nullable = true)
 |-- onsetDateTime: string (nullable = true)
 |-- recordedDate: string (nullable = true)
 |-- resourceType: string (nullable = true)
 |-- subject: struct (nullable = true)
 |    |-- reference: string (nullable = true)
 |-- verificationStatus: struct 

### Identifying all patients with Diabetes in Confirmed status in the last X years

#### Record selection {Option-1} preserving names

In [11]:
condition_diabetes = condition.\
where(array_contains('clinicalStatus.coding.code', 'active')).\
where(array_contains('verificationStatus.coding.code', 'confirmed')).\
where(array_contains('code.coding.code', '44054006')).\
where(condition.onsetDateTime > '1900-01-01').\
select(\
       (condition['subject.reference'].substr(10, 40).alias('patient_id')),
       'clinicalStatus.coding.code',\
       'verificationStatus.coding.code',\
       'code.coding.code', \
       'code.coding.display',\
       to_date(condition['onsetDateTime']).alias('first_observation_date'))

condition_diabetes.limit(5)

patient_id,code,code.1,code.2,display,first_observation_date
22bb9254-8abb-467...,[active],[confirmed],[44054006],[Diabetes],1980-06-19
7c485227-4e00-42d...,[active],[confirmed],[44054006],[Diabetes],1993-06-24
4c65f823-e939-417...,[active],[confirmed],[44054006],[Diabetes],2013-06-08
a851e37f-f93e-421...,[active],[confirmed],[44054006],[Diabetes],1985-07-20
40c9e21b-9a15-475...,[active],[confirmed],[44054006],[Diabetes],2011-09-01


In [12]:
condition_diabetes.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- code: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- code: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- code: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- display: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- first_observation_date: date (nullable = true)



#### Record selection {Option-2} with renaming

In [13]:
condition_diabetes = condition.\
where(array_contains('clinicalStatus.coding.code', 'active')).\
where(array_contains('verificationStatus.coding.code', 'confirmed')).\
where(array_contains('code.coding.code', '44054006')).\
where(condition.onsetDateTime > '1900-01-01').\
select(\
       (condition['subject.reference'].substr(10, 40).alias('patient_id')),
       col('clinicalStatus.coding.code').alias('condition_status'),\
       col('verificationStatus.coding.code').alias('verification_status'),\
       col('code.coding.code').alias('snomed_code'), \
       col('code.coding.display').alias('snomed_name'),\
       to_date(condition['onsetDateTime']).alias('first_observation_date'))

condition_diabetes.limit(5)

patient_id,condition_status,verification_status,snomed_code,snomed_name,first_observation_date
9683a6fb-c277-418...,[active],[confirmed],[44054006],[Diabetes],2017-07-06
f4cc08a7-15e3-47a...,[active],[confirmed],[44054006],[Diabetes],1957-07-09
d9c0c67c-ad91-4c1...,[active],[confirmed],[44054006],[Diabetes],2019-03-30
8ea4b567-9912-46c...,[active],[confirmed],[44054006],[Diabetes],2011-06-16
d3186727-4f34-4a6...,[active],[confirmed],[44054006],[Diabetes],1966-03-01


In [14]:
condition_diabetes.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- condition_status: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- verification_status: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- snomed_code: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- snomed_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- first_observation_date: date (nullable = true)



#### Record selection {Option-3} with renaming and flattening

In [15]:
condition_diabetes = condition.\
where(array_contains('clinicalStatus.coding.code', 'active')).\
where(array_contains('verificationStatus.coding.code', 'confirmed')).\
where(array_contains('code.coding.code', '44054006')).\
where(condition.onsetDateTime > '1900-01-01').\
withColumn('condition_status', condition['clinicalStatus.coding.code'].getItem(0)).\
withColumn('verification_status', condition['verificationStatus.coding.code'].getItem(0)).\
withColumn('snomed_code', condition['code.coding.code'].getItem(0)).\
withColumn('snomed_name', condition['code.coding.display'].getItem(0)).\
select(\
       (condition['subject.reference'].substr(10, 40).alias('patient_id')),
       'condition_status',\
       'verification_status',\
       'snomed_code', \
       'snomed_name',\
       to_date(condition['onsetDateTime']).alias('first_observation_date'))

condition_diabetes.limit(5)

patient_id,condition_status,verification_status,snomed_code,snomed_name,first_observation_date
4557c356-5cd3-483...,active,confirmed,44054006,Diabetes,2018-10-25
84ec9e91-2bc9-4be...,active,confirmed,44054006,Diabetes,1986-11-04
6671cdcf-ea78-46f...,active,confirmed,44054006,Diabetes,2015-05-17
ed4edac7-daf7-4db...,active,confirmed,44054006,Diabetes,2010-10-01
68eaf7e6-05c9-4a2...,active,confirmed,44054006,Diabetes,2018-11-14


In [16]:
condition_diabetes.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- condition_status: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- snomed_code: string (nullable = true)
 |-- snomed_name: string (nullable = true)
 |-- first_observation_date: date (nullable = true)



<a id='section_3'></a>
## Section 3. Explore "Observation" resource

In [17]:
input_file = 'r4_Observation.ndjson'


%time observation = spark.read\
    .option('multiline','false')\
    .format('json')\
  .load(spark_cos.url(input_file, input_bucket))

observation.count()

CPU times: user 92 ms, sys: 20 ms, total: 112 ms
Wall time: 1min 10s


20515269

In [18]:
# observation = observation.limit(100000).cache()
observation.limit(5)

category,code,component,effectiveDateTime,encounter,id,issued,resourceType,status,subject,valueCodeableConcept,valueQuantity,valueString
"[[[[laboratory, l...","[[[785-6, MCH [En...",,2018-11-20T13:31:...,[urn:uuid:ca8c667...,17fba280-0835-46e...,2018-11-20T13:31:...,Observation,final,[urn:uuid:f3650b2...,,"[pg, http://units...",
"[[[[laboratory, l...","[[[786-4, MCHC [M...",,2018-11-20T13:31:...,[urn:uuid:ca8c667...,61ef0100-4f06-4c0...,2018-11-20T13:31:...,Observation,final,[urn:uuid:f3650b2...,,"[g/dL, http://uni...",
"[[[[laboratory, l...","[[[21000-5, Eryth...",,2018-11-20T13:31:...,[urn:uuid:ca8c667...,35e723ef-afa3-4a7...,2018-11-20T13:31:...,Observation,final,[urn:uuid:f3650b2...,,"[fL, http://units...",
"[[[[laboratory, l...","[[[777-3, Platele...",,2018-11-20T13:31:...,[urn:uuid:ca8c667...,4b87b127-352e-419...,2018-11-20T13:31:...,Observation,final,[urn:uuid:f3650b2...,,"[10*3/uL, http://...",
"[[[[laboratory, l...","[[[32207-3, Plate...",,2018-11-20T13:31:...,[urn:uuid:ca8c667...,e90ae5c1-54fe-417...,2018-11-20T13:31:...,Observation,final,[urn:uuid:f3650b2...,,"[fL, http://units...",


In [19]:
observation.printSchema()

root
 |-- category: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- coding: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- code: string (nullable = true)
 |    |    |    |    |-- display: string (nullable = true)
 |    |    |    |    |-- system: string (nullable = true)
 |-- code: struct (nullable = true)
 |    |-- coding: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- code: string (nullable = true)
 |    |    |    |-- display: string (nullable = true)
 |    |    |    |-- system: string (nullable = true)
 |    |-- text: string (nullable = true)
 |-- component: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- code: struct (nullable = true)
 |    |    |    |-- coding: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- code: string (nullabl

#### Record selection with renaming and flattening

In [20]:
observation_diabetes = observation.\
where(array_contains('code.coding.code', '4548-4')).\
withColumn('loinc_code', observation['code.coding.code'].getItem(0)).\
withColumn('loinc_name', observation['code.coding.display'].getItem(0)).\
select(\
        (observation['subject.reference'].substr(10, 40).alias('patient_id')),
        'loinc_code',\
        'loinc_name',\
        to_date(observation['effectiveDateTime']).alias('effective_date'))

observation_diabetes.limit(5)

patient_id,loinc_code,loinc_name,effective_date
d3d39284-0811-4c6...,4548-4,Hemoglobin A1c/He...,1995-03-25
d3d39284-0811-4c6...,4548-4,Hemoglobin A1c/He...,1997-03-29
d3d39284-0811-4c6...,4548-4,Hemoglobin A1c/He...,1999-04-03
d3d39284-0811-4c6...,4548-4,Hemoglobin A1c/He...,2001-03-10
d3d39284-0811-4c6...,4548-4,Hemoglobin A1c/He...,2002-03-16


####  Check the frequency of HbA1c tests

In [21]:
%time loinc_code_freqency = observation_diabetes .groupby('patient_id', 'loinc_code', 'loinc_name').\
    agg(\
        count('effective_date').alias('records_count')\
       )

loinc_code_freqency.where(loinc_code_freqency['records_count'] > 1).limit(5)

CPU times: user 4 ms, sys: 4 ms, total: 8 ms
Wall time: 34.6 ms


patient_id,loinc_code,loinc_name,records_count
506694f3-f741-466...,4548-4,Hemoglobin A1c/He...,10
6ad12ea3-279d-436...,4548-4,Hemoglobin A1c/He...,7
2cc1789c-4617-4f0...,4548-4,Hemoglobin A1c/He...,10
bd2f2e15-8fb6-4f5...,4548-4,Hemoglobin A1c/He...,8
62c4a7fa-105a-480...,4548-4,Hemoglobin A1c/He...,10


In [22]:
loinc_code_freqency.describe('records_count')

summary,records_count
count,38637.0
mean,11.279861272873152
stddev,14.05664672738714
min,1.0
max,113.0


<a id='section_4'></a>
## Section 4.  Explore "Patient" resource

In [23]:
input_file = 'r4_Patient.ndjson'


%time patient = spark.read\
    .option('multiline','false')\
    .format('json')\
  .load(spark_cos.url(input_file, input_bucket))

patient.count()

CPU times: user 8 ms, sys: 0 ns, total: 8 ms
Wall time: 3.02 s


117311

In [24]:
# patient = patient.limit(100000).cache()
patient.limit(5)

address,birthDate,communication,deceasedDateTime,extension,gender,id,identifier,maritalStatus,multipleBirthBoolean,multipleBirthInteger,name,resourceType,telecom,text
"[[Lowell, US, [[[...",1940-11-26,"[[[[[en-US, Engli...",,"[[[[ombCategory, ...",female,6c5332a6-e49d-4df...,[[https://github....,"[[[M, M, http://t...",False,,"[[Osinski784, [Ho...",Patient,"[[phone, home, 55...","[<div xmlns=""http..."
"[[Danvers, US, [[...",2013-09-06,"[[[[[en-US, Engli...",,"[[[[ombCategory, ...",male,89d90dc7-1776-444...,[[https://github....,"[[[S, Never Marri...",False,,"[[Fay398, [Hollis...",Patient,"[[phone, home, 55...","[<div xmlns=""http..."
"[[Westford, US, [...",1996-07-01,"[[[[[en-US, Engli...",,"[[[[ombCategory, ...",female,aacee13c-aea8-45c...,[[https://github....,"[[[S, Never Marri...",False,,"[[Klein929, [Anto...",Patient,"[[phone, home, 55...","[<div xmlns=""http..."
"[[Worcester, US, ...",1979-02-11,"[[[[[es, Spanish,...",,"[[[[ombCategory, ...",male,e093f51c-562b-4d1...,[[https://github....,"[[[M, M, http://t...",False,,"[[Aguayo104, [Dav...",Patient,"[[phone, home, 55...","[<div xmlns=""http..."
"[[Mansfield, US, ...",1984-11-28,"[[[[[en-US, Engli...",,"[[[[ombCategory, ...",female,aacdab09-c4cd-406...,[[https://github....,"[[[M, M, http://t...",False,,"[[Gulgowski816, [...",Patient,"[[phone, home, 55...","[<div xmlns=""http..."


In [25]:
patient.printSchema()

root
 |-- address: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- country: string (nullable = true)
 |    |    |-- extension: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- extension: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |    |-- url: string (nullable = true)
 |    |    |    |    |    |    |-- valueDecimal: double (nullable = true)
 |    |    |    |    |-- url: string (nullable = true)
 |    |    |-- line: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- postalCode: string (nullable = true)
 |    |    |-- state: string (nullable = true)
 |-- birthDate: string (nullable = true)
 |-- communication: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- language: struct (nullable = true)

<a id='section_5'></a>
## Section 5.  Link Condition with Patients (diabetes)

In [26]:
patient_all = patient.select(col('id').alias('patient_id'), col('birthDate').alias('birth_date'), 'gender')

In [27]:
patient_all.limit(5)

patient_id,birth_date,gender
2d913c00-48ad-4af...,2004-05-23,male
2ecfa822-e49b-420...,1998-11-22,male
4a9668b1-ff30-495...,2015-02-20,male
39b933da-1358-429...,1971-04-27,male
a60f42cd-76c3-42b...,1982-04-19,male


In [28]:
# Number of records with Diabetes
condition_diabetes.count()

8081

In [29]:
# Number of patients
patient_all.count()

117311

In [30]:
# patient_diabetes = patient_all.join(condition_diabetes, (patient_all.patient_id == condition_diabetes.patient_id), how='inner')

# To prevent duplicate column
%time patient_diabetes = patient_all.join(condition_diabetes, ['patient_id'], how='inner')

# patient_diabetes = patient_diabetes.limit(100000).cache()
patient_diabetes.count()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 22.3 ms


8081

In [31]:
patient_diabetes.limit(5)

patient_id,birth_date,gender,condition_status,verification_status,snomed_code,snomed_name,first_observation_date
08b1c9ae-9d60-4f5...,1966-03-10,female,active,confirmed,44054006,Diabetes,1989-07-13
09c21f37-89b6-41e...,1948-11-01,female,active,confirmed,44054006,Diabetes,1992-05-11
10209746-e589-450...,1982-02-02,male,active,confirmed,44054006,Diabetes,2010-04-13
10b99cb0-5a8e-478...,1948-08-10,male,active,confirmed,44054006,Diabetes,1977-06-14
1170f625-d812-438...,1964-05-23,male,active,confirmed,44054006,Diabetes,1992-09-26


In [32]:
patient_diabetes.printSchema()

root
 |-- patient_id: string (nullable = true)
 |-- birth_date: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- condition_status: string (nullable = true)
 |-- verification_status: string (nullable = true)
 |-- snomed_code: string (nullable = true)
 |-- snomed_name: string (nullable = true)
 |-- first_observation_date: date (nullable = true)



<a id='section_6'></a>
## Section 6.  Create Longitudinal Patient Record

#### Prepare condition resource

In [33]:
condition_all = condition.\
withColumn('condition_status', condition['clinicalStatus.coding.code'].getItem(0)).\
withColumn('verification_status', condition['verificationStatus.coding.code'].getItem(0)).\
withColumn('snomed_code', condition['code.coding.code'].getItem(0)).\
withColumn('snomed_name', condition['code.coding.display'].getItem(0)).\
select(\
       (condition['subject.reference'].substr(10, 40).alias('patient_id')),
       'condition_status',\
       'verification_status',\
       'snomed_code', \
       'snomed_name',\
       to_date(condition['onsetDateTime']).alias('first_observation_date'))

condition_all.limit(5)

patient_id,condition_status,verification_status,snomed_code,snomed_name,first_observation_date
ebac0cc1-07a5-449...,resolved,confirmed,43878008,Streptococcal sor...,2009-06-30
ecd26f59-fe71-4a8...,resolved,confirmed,10509002,Acute bronchitis ...,2009-08-20
ebac0cc1-07a5-449...,resolved,confirmed,195662009,Acute viral phary...,2010-06-09
ebac0cc1-07a5-449...,resolved,confirmed,444814009,Viral sinusitis (...,2011-05-12
ecd26f59-fe71-4a8...,resolved,confirmed,58150001,Fracture of clavicle,2010-03-05


In [34]:
condition_target = condition_diabetes.\
    withColumnRenamed('snomed_code', 'target_code').\
    withColumnRenamed('snomed_name', 'target_name').\
    withColumnRenamed('first_observation_date', 'target_first_date').\
    drop('condition_status', 'verification_status')

condition_target.limit(5)

patient_id,target_code,target_name,target_first_date
89c668fd-0e66-494...,44054006,Diabetes,2013-11-26
71d5b64e-4ea0-451...,44054006,Diabetes,1994-12-31
82e4efba-ab48-428...,44054006,Diabetes,2015-12-26
2a9048fc-56d9-489...,44054006,Diabetes,1982-03-29
2c945863-0afc-46b...,44054006,Diabetes,1982-09-08


#### Get all other conditions for patients with diabetes

In [35]:
%time patient_condition = condition_all.join(condition_target, ['patient_id'], how='inner')

patient_condition.count()

CPU times: user 4 ms, sys: 0 ns, total: 4 ms
Wall time: 10.7 ms


120835

In [36]:
# patient_condition = patient_condition.limit(100000).cache()
patient_condition.limit(5)

patient_id,condition_status,verification_status,snomed_code,snomed_name,first_observation_date,target_code,target_name,target_first_date
08b1c9ae-9d60-4f5...,active,confirmed,38341003,Hypertension,1989-07-13,44054006,Diabetes,1989-07-13
08b1c9ae-9d60-4f5...,active,confirmed,44054006,Diabetes,1989-07-13,44054006,Diabetes,1989-07-13
08b1c9ae-9d60-4f5...,active,confirmed,302870006,Hypertriglyceride...,1992-07-16,44054006,Diabetes,1989-07-13
08b1c9ae-9d60-4f5...,active,confirmed,431855005,Chronic kidney di...,1992-07-16,44054006,Diabetes,1989-07-13
08b1c9ae-9d60-4f5...,active,confirmed,127013003,Diabetic renal di...,1992-07-16,44054006,Diabetes,1989-07-13


#### Restrict to include only relevant comorbidities and complications

In [37]:
# Limit to the relevant status
# patient_condition = patient_condition.filter((patient_condition['condition_status'] == 'active') & (patient_condition['verification_status'] == 'confirmed'))

# Limit to the relevant comorbidities and complications
# SNOMED codes of the comorbidities / complications to keep.
# Codes must match `snomed_code` exactly: the original list contained
# '81723002 ' with a trailing space (which can never match) and listed
# '230690007' twice.
relevant_snomed_codes = [
    '53741008',
    '709044004',
    '49436004',
    '230690007',
    '38341003',
    '84114007',
    '400047006',
    '69896004',
    '86049000',
    '363346000',
    '64859006',
    '35489007',
    '195967001',
    '13645005',
    '52448006',
    '391193001',
    '84757009',
    '40930008',
    '1855002',
    '56265001',
    '65956007',
    '60951000119105',
    '105597003',
    '97331000119101',
    '422034002',
    '1501000119109',
    '1551000119108',
    '90708001',
    '127013003',
    '46177005',
    '57182000',
    '386033004',
    '368581000119106',
    '81723002',
    '161622006',
    '429280009',
    '161621004',
    '698423002',
    '90781000119102',
    '157141000119108',
]

# Keep only the condition rows whose SNOMED code is in the list above.
patient_relevant = patient_condition.where(col('snomed_code').isin(relevant_snomed_codes))

patient_relevant.limit(5)

patient_id,condition_status,verification_status,snomed_code,snomed_name,first_observation_date,target_code,target_name,target_first_date
08b1c9ae-9d60-4f5...,active,confirmed,38341003,Hypertension,1989-07-13,44054006,Diabetes,1989-07-13
08b1c9ae-9d60-4f5...,active,confirmed,127013003,Diabetic renal di...,1992-07-16,44054006,Diabetes,1989-07-13
08b1c9ae-9d60-4f5...,active,confirmed,90781000119102,Microalbuminuria ...,2004-08-05,44054006,Diabetes,1989-07-13
09c21f37-89b6-41e...,active,confirmed,38341003,Hypertension,1966-12-26,44054006,Diabetes,1992-05-11
09c21f37-89b6-41e...,active,confirmed,127013003,Diabetic renal di...,1996-05-20,44054006,Diabetes,1992-05-11


In [38]:
# Identify the first / last date of each condition
%time lpr_group = patient_relevant.groupby('patient_id', 'target_code', 'target_name', 'snomed_code', 'snomed_name').\
    agg(\
        min('target_first_date').alias('target_first_date'),\
        min('first_observation_date').alias('first_observation_date'),\
        max('first_observation_date').alias('last_observation_date'),\
       )

lpr_group.limit(5)

CPU times: user 8 ms, sys: 4 ms, total: 12 ms
Wall time: 39.2 ms


patient_id,target_code,target_name,snomed_code,snomed_name,target_first_date,first_observation_date,last_observation_date
123a02a9-5d5a-402...,44054006,Diabetes,38341003,Hypertension,2009-10-17,1997-10-18,1997-10-18
123a02a9-5d5a-402...,44054006,Diabetes,368581000119106,Neuropathy due to...,2009-10-17,2019-01-05,2019-01-05
1cf894aa-e622-4be...,44054006,Diabetes,53741008,Coronary Heart Di...,1993-12-28,2013-03-26,2013-03-26
1cf894aa-e622-4be...,44054006,Diabetes,422034002,Diabetic retinopa...,1993-12-28,2014-11-18,2014-11-18
22b284c3-5200-493...,44054006,Diabetes,422034002,Diabetic retinopa...,1940-03-08,1940-03-08,1940-03-08


#### Add patient birth date and gender. We now have a row-based LPR

In [39]:
lpr_row = patient_all.join(lpr_group, ['patient_id'], how='inner')

lpr_row.limit(5)

patient_id,birth_date,gender,target_code,target_name,snomed_code,snomed_name,target_first_date,first_observation_date,last_observation_date
08b1c9ae-9d60-4f5...,1966-03-10,female,44054006,Diabetes,38341003,Hypertension,1989-07-13,1989-07-13,1989-07-13
08b1c9ae-9d60-4f5...,1966-03-10,female,44054006,Diabetes,127013003,Diabetic renal di...,1989-07-13,1992-07-16,1992-07-16
08b1c9ae-9d60-4f5...,1966-03-10,female,44054006,Diabetes,90781000119102,Microalbuminuria ...,1989-07-13,2004-08-05,2004-08-05
09c21f37-89b6-41e...,1948-11-01,female,44054006,Diabetes,38341003,Hypertension,1992-05-11,1966-12-26,1966-12-26
09c21f37-89b6-41e...,1948-11-01,female,44054006,Diabetes,127013003,Diabetic renal di...,1992-05-11,1996-05-20,1996-05-20


In [40]:
lpr_row.count()

21982

In [41]:
# lpr_row.limit(100000).cache()
lpr_row.count()

21982

#### Convert row-based LPR into column-based LPR

In [42]:
%time lpr_col = lpr_row.groupby('patient_id', 'birth_date', 'gender', 'target_code', 'target_name', 'target_first_date').\
    pivot('snomed_code').\
    agg(\
        first('first_observation_date').alias('first_dt'),\
        last('first_observation_date').alias('last_dt')\
           )

lpr_col.limit(5)

CPU times: user 84 ms, sys: 52 ms, total: 136 ms
Wall time: 11.5 s


patient_id,birth_date,gender,target_code,target_name,target_first_date,127013003_first_dt,127013003_last_dt,1501000119109_first_dt,1501000119109_last_dt,1551000119108_first_dt,1551000119108_last_dt,157141000119108_first_dt,157141000119108_last_dt,161621004_first_dt,161621004_last_dt,161622006_first_dt,161622006_last_dt,195967001_first_dt,195967001_last_dt,230690007_first_dt,230690007_last_dt,368581000119106_first_dt,368581000119106_last_dt,38341003_first_dt,38341003_last_dt,422034002_first_dt,422034002_last_dt,429280009_first_dt,429280009_last_dt,46177005_first_dt,46177005_last_dt,49436004_first_dt,49436004_last_dt,53741008_first_dt,53741008_last_dt,60951000119105_first_dt,60951000119105_last_dt,64859006_first_dt,64859006_last_dt,698423002_first_dt,698423002_last_dt,69896004_first_dt,69896004_last_dt,84757009_first_dt,84757009_last_dt,90781000119102_first_dt,90781000119102_last_dt,97331000119101_first_dt,97331000119101_last_dt
08b1c9ae-9d60-4f5...,1966-03-10,female,44054006,Diabetes,1989-07-13,1992-07-16,1992-07-16,,,,,,,,,,,,,,,,,1989-07-13,1989-07-13,,,,,,,,,,,,,,,,,,,,,2004-08-05,2004-08-05,,
09c21f37-89b6-41e...,1948-11-01,female,44054006,Diabetes,1992-05-11,1996-05-20,1996-05-20,,,,,,,,,,,,,,,1996-05-20,1996-05-20,1966-12-26,1966-12-26,,,,,,,,,,,,,2008-11-03,2008-11-03,,,,,,,,,,
10209746-e589-450...,1982-02-02,male,44054006,Diabetes,2010-04-13,,,,,,,,,,,,,,,,,,,2000-03-28,2000-03-28,2010-04-13,2010-04-13,,,,,,,,,,,,,,,,,,,,,,
10b99cb0-5a8e-478...,1948-08-10,male,44054006,Diabetes,1977-06-14,1995-05-16,1995-05-16,,,1996-07-09,1996-07-09,,,,,,,,,,,,,1977-06-14,1977-06-14,1992-06-23,1992-06-23,,,,,,,,,,,,,,,,,,,,,,
1170f625-d812-438...,1964-05-23,male,44054006,Diabetes,1992-09-26,,,,,,,,,,,,,,,,,,,1992-09-26,1992-09-26,,,,,,,,,,,,,,,,,,,1980-05-19,1980-05-19,,,,


In [43]:
lpr_col.count()

7393

<a id='section_7'></a>
## Section 7.  Save LPR into COS

#### Set-up COS credentials with write permissions  
Participants must provide their own COS credentials with write permissions

#### Save patients with diabetes

In [44]:
# output_file = '/lpr/patient_diabetes'
# mode = 'overwrite'

# %time patient_diabetes.write.mode("overwrite").parquet(spark_cos.url(output_file, output_bucket))

#### Save observations with diabetes

In [45]:
# output_file = '/lpr/observation_diabetes'
# mode = 'overwrite'

# %time observation_diabetes.write.mode("overwrite").parquet(spark_cos.url(output_file, output_bucket))

#### Save row-based LPR

In [46]:
# output_file = '/lpr/lpr_row'
# mode = 'overwrite'

# %time lpr_row.write.mode("overwrite").parquet(spark_cos.url(output_file, output_bucket))

#### Save column-based LPR

In [47]:
# output_file = '/lpr/lpr_col'
# mode = 'overwrite'

# %time lpr_col.write.mode("overwrite").parquet(spark_cos.url(output_file, output_bucket))

In [48]:
import datetime
import pytz

datetime.datetime.now(pytz.timezone('US/Central')).strftime("%a, %d %B %Y %H:%M:%S")

'Sat, 14 November 2020 09:50:23'