In [None]:
# default_exp preprocessing.clean

In [None]:
#all_slow

# Clean

> Functions to split the raw EHR dataset into train, valid and test sets, clean each split, and save the results for further processing & vocab creation.

In [None]:
#hide
# IPython.core.display.display is deprecated since IPython 7.14; IPython.display is the public location
from IPython.display import display, HTML
display(HTML("<style>.container { width:85% !important; }</style>"))

In [None]:
#hide
%reload_ext autoreload
%autoreload 2

In [None]:
#export
from lemonpie.basics import *
from fastai.imports import *
import ray

In [None]:
#hide
from nbdev.showdoc import *

In [None]:
# ignore_reinit_error: re-running this cell in the same kernel no longer raises
ray.init(ignore_reinit_error=True)

2022-09-21 21:35:00,624	INFO services.py:1245 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8265[39m[22m


{'node_ip_address': '192.168.86.91',
 'raylet_ip_address': '192.168.86.91',
 'redis_address': '192.168.86.91:6379',
 'object_store_address': '/tmp/ray/session_2022-09-21_21-34-59_137609_42784/sockets/plasma_store',
 'raylet_socket_name': '/tmp/ray/session_2022-09-21_21-34-59_137609_42784/sockets/raylet',
 'webui_url': '127.0.0.1:8265',
 'session_dir': '/tmp/ray/session_2022-09-21_21-34-59_137609_42784',
 'metrics_export_port': 54481,
 'node_id': '72f5673008eb220ca05c4b61e91c8adc25c8cccb4a14200f97edb1cf'}

## Split 

Once the dataset is assembled, the folders look as follows:

In [None]:
DATA_STORE

'/home/vinod/.lemonpie/datasets'

In [None]:
PATH_1K

'/home/vinod/.lemonpie/datasets/synthea/1K'

In [None]:
os.listdir(f'{PATH_1K}/raw_original')

['patients.csv',
 'observations.csv',
 'allergies.csv',
 'payers.csv',
 'careplans.csv',
 'medications.csv',
 'devices.csv',
 'organizations.csv',
 'imaging_studies.csv',
 'procedures.csv',
 'payer_transitions.csv',
 'supplies.csv',
 'conditions.csv',
 'providers.csv',
 'encounters.csv',
 'immunizations.csv']

In [None]:
#export
def read_raw_ehrdata(path, csv_names=FILENAMES):
    '''Read the raw EHR csv files found in `path`.

    Loads `{path}/{name}.csv` for every `name` in `csv_names` and returns the
    DataFrames as a list, in the same order as `csv_names`.
    '''
    frames = []
    for name in csv_names:
        # low_memory=False: pandas infers each column's dtype from the whole file
        frames.append(pd.read_csv(f'{path}/{name}.csv', low_memory=False))
    return frames

In [None]:
dfs = read_raw_ehrdata(f'{PATH_1K}/raw_original')

In [None]:
patients, observations, allergies, careplans, medications, imaging_studies, procedures, conditions, immunizations = dfs

In [None]:
#export
def split_patients(patients, valid_pct=0.2, test_pct=0.2, random_state=1234):
    '''Shuffle the `patients` df and split it into train, valid and test dfs.

    Args:
        patients: df with one row per patient.
        valid_pct: fraction of patients that go to the valid split.
        test_pct: fraction of patients that go to the test split; the train split
            gets the remainder, `1 - (valid_pct + test_pct)`.
        random_state: seed for the shuffle, so the split is reproducible.

    Returns:
        A list `[train, valid, test]` of dfs. Split boundaries are truncated to whole
        rows, so the test split absorbs any rounding remainder.

    Raises:
        ValueError: if a fraction is negative or `valid_pct + test_pct` exceeds 1.
    '''
    if valid_pct < 0 or test_pct < 0 or valid_pct + test_pct > 1:
        raise ValueError(f'Invalid split fractions: valid_pct={valid_pct}, test_pct={test_pct}')
    train_pct = 1 - (valid_pct + test_pct)
    print(f'Splits:: train: {train_pct}, valid: {valid_pct}, test: {test_pct}')
    shuffled = patients.sample(frac=1, random_state=random_state).reset_index(drop=True)
    n = len(shuffled)
    train_end = int(train_pct * n)
    valid_end = int((train_pct + valid_pct) * n)
    # positional slicing instead of np.split, which goes through the deprecated
    # DataFrame.swapaxes on recent pandas versions
    return [shuffled.iloc[:train_end], shuffled.iloc[train_end:valid_end], shuffled.iloc[valid_end:]]

In [None]:
train_pts, valid_pts, test_pts = split_patients(patients, .2,.1)

Splits:: train: 0.7, valid: 0.2, test: 0.1


In [None]:
len(patients), len(train_pts), len(valid_pts), len(test_pts)

(1171, 819, 234, 118)

In [None]:
assert len(patients) == len(train_pts)+len(valid_pts)+len(test_pts)

In [None]:
#export
def split_ehr_dataset(path, valid_pct=0.2, test_pct=0.2, random_state=1234):
    '''Split the raw EHR dataset into train, valid and test sets and save them.

    Reads every file in `FILENAMES` from `{path}/raw_original`. The patients table
    (the first file) is split with `split_patients`. Every other table is then
    partitioned by its `PATIENT` column, so all records of a patient land in the
    same split as that patient. The results are written to
    `{path}/raw_split/{train,valid,test}/{name}.csv`.

    Raises:
        AssertionError: if a table has rows whose `PATIENT` is not in any split,
            i.e. the rows would be lost.
    '''
    dfs = read_raw_ehrdata(f'{path}/raw_original')
    all_pts = dfs[0].rename(str.lower, axis='columns')
    pt_splits = split_patients(all_pts, valid_pct, test_pct, random_state)
    train_pt, valid_pt, test_pt = pt_splits
    print(f'Split {FILENAMES[0]} into:: Train: {len(train_pt)}, Valid: {len(valid_pt)}, Test: {len(test_pt)} -- Total before split: {len(all_pts)}')

    # one list of tables per split (train, valid, test), each starting with its patients
    split_dfs = [[pt] for pt in pt_splits]
    for df, name in zip(dfs[1:], FILENAMES[1:]):
        df = df.set_index('PATIENT')
        parts = [df.loc[df.index.intersection(pt['id']).unique()] for pt in pt_splits]
        assert len(df) == sum(len(p) for p in parts), f'Split failed {name}: {len(df)} != {len(parts[0])}+{len(parts[1])}+{len(parts[2])}'
        for tables, part in zip(split_dfs, parts):
            tables.append(part.reset_index())

    for split, tables in zip(['train', 'valid', 'test'], split_dfs):
        d = Path(f'{path}/raw_split/{split}')
        d.mkdir(parents=True, exist_ok=True)
        for df, name in zip(tables, FILENAMES):
            df.to_csv(d / f'{name}.csv', index=False)
        print(f'Saved {split} data to {d}')

In [None]:
split_ehr_dataset(PATH_1K) #will use default values for split percents

Splits:: train: 0.6, valid: 0.2, test: 0.2
Split patients into:: Train: 702, Valid: 234, Test: 235 -- Total before split: 1171
Saved train data to /home/vinod/.lemonpie/datasets/synthea/1K/raw_split/train
Saved valid data to /home/vinod/.lemonpie/datasets/synthea/1K/raw_split/valid
Saved test data to /home/vinod/.lemonpie/datasets/synthea/1K/raw_split/test


In [None]:
def load_split_data(path):
    '''Read back the train, valid and test splits saved under `{path}/raw_split`.

    Returns a list `[train_dfs, valid_dfs, test_dfs]`. Each entry is the list of
    DataFrames that `read_raw_ehrdata` returns for that split.
    '''
    splits = ('train', 'valid', 'test')
    return [read_raw_ehrdata(f'{path}/raw_split/{split}') for split in splits]

In [None]:
train_dfs, valid_dfs, test_dfs = load_split_data(PATH_1K)

## Clean

In [None]:
#export
@ray.remote(num_returns=3)
def cleanup_pts(pts, is_train, today=None):
    '''Clean the `patients` df.

    Args:
        pts: raw patients df. Column names may be in any case. It must include `id`,
            `birthdate`, `marital`, `race`, `ethnicity`, `gender`, `birthplace`,
            `city`, `state` and `zip`.
        is_train: if True, also build `pt_codes`, the demographics without the
            patient id. These codes are only taken from training data.
        today: reference date for `age_now_days`; anything `pd.to_datetime` accepts.
            Defaults to `pd.Timestamp.today()`, which makes the ages depend on when
            the code runs. Pass the data generation date for reproducible results.

    Returns:
        `[patients, pt_demographics, pt_codes]`:
        - `patients` holds only `birthdate`, indexed by `patient`.
        - `pt_demographics` holds all kept columns plus `age_now_days`, indexed by
          `patient`.
        - `pt_codes` is None when `is_train` is False.

    Missing `zip` becomes 0. Any other missing value becomes the `xxxnan` token.
    The input df is not modified.
    '''
    keep = ['id', 'birthdate', 'marital', 'race', 'ethnicity', 'gender', 'birthplace', 'city', 'state', 'zip']
    pts = (pts.rename(str.lower, axis='columns')
              .loc[:, keep]
              .rename(columns={'id': 'patient'}))
    # parse as UTC, then drop the tz to get naive datetime64[ns];
    # a unit-less astype('datetime64') is rejected by pandas >= 2
    birthdate = pd.to_datetime(pts['birthdate'], utc=True).dt.tz_localize(None)
    today = pd.Timestamp.today() if today is None else pd.to_datetime(today)
    pts = pts.assign(birthdate=birthdate,
                     zip=pts['zip'].fillna(0.0).astype(int),
                     age_now_days=(today - birthdate).dt.days)
    pts = pts.fillna('xxxnan')

    pt_codes = pts.drop(columns=['patient']) if is_train else None
    pt_demographics = pts.set_index('patient')
    patients = pt_demographics.loc[:, ['birthdate']]
    return [patients, pt_demographics, pt_codes]

The `patients` dataframe looks like this before cleanup:

In [None]:
patients.head()

Unnamed: 0,Id,BIRTHDATE,DEATHDATE,SSN,DRIVERS,PASSPORT,PREFIX,FIRST,LAST,SUFFIX,...,BIRTHPLACE,ADDRESS,CITY,STATE,COUNTY,ZIP,LAT,LON,HEALTHCARE_EXPENSES,HEALTHCARE_COVERAGE
0,1d604da9-9a81-4ba9-80c2-de3375d59b40,1989-05-25,,999-76-6866,S99984236,X19277260X,Mr.,José Eduardo181,Gómez206,,...,Marigot Saint Andrew Parish DM,427 Balistreri Way Unit 19,Chicopee,Massachusetts,Hampden County,1013.0,42.228354,-72.562951,271227.08,1334.88
1,034e9e3b-2def-4559-bb2a-7850888ae060,1983-11-14,,999-73-5361,S99962402,X88275464X,Mr.,Milo271,Feil794,,...,Danvers Massachusetts US,422 Farrell Path Unit 69,Somerville,Massachusetts,Middlesex County,2143.0,42.360697,-71.126531,793946.01,3204.49
2,10339b10-3cd1-4ac3-ac13-ec26728cb592,1992-06-02,,999-27-3385,S99972682,X73754411X,Mr.,Jayson808,Fadel536,,...,Springfield Massachusetts US,1056 Harris Lane Suite 70,Chicopee,Massachusetts,Hampden County,1020.0,42.181642,-72.608842,574111.9,2606.4
3,8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,1978-05-27,,999-85-4926,S99974448,X40915583X,Mrs.,Mariana775,Rutherford999,,...,Yarmouth Massachusetts US,999 Kuhn Forge,Lowell,Massachusetts,Middlesex County,1851.0,42.636143,-71.343255,935630.3,8756.19
4,f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,1996-10-18,,999-60-7372,S99915787,X86772962X,Mr.,Gregorio366,Auer97,,...,Patras Achaea GR,1050 Lindgren Extension Apt 38,Boston,Massachusetts,Suffolk County,2135.0,42.352434,-71.02861,598763.07,3772.2


In [None]:
train_pts_cleaned = cleanup_pts.remote(train_dfs[0], is_train=True, today=SYNTHEA_DATAGEN_DATES['1K'])
train_pts_cleaned = ray.get(train_pts_cleaned) #train_pts_data[0], train_pts_data[1], train_pts_data[2]

In [None]:
valid_pts_cleaned = ray.get(cleanup_pts.remote(valid_dfs[0], is_train=False, today=SYNTHEA_DATAGEN_DATES['1K']))

The cleanup function produces the following 3 dfs - `patients`, `pt_demographics`, `pt_codes` **for training data**

In [None]:
for df in train_pts_cleaned:
    display(df.head(3))

Unnamed: 0_level_0,birthdate
patient,Unnamed: 1_level_1
b1d50391-79c5-403c-919f-3ded66c9d77a,1959-09-01
e52a1bbc-7b12-4d01-82cc-1196da05e399,2016-12-29
88587157-4de8-4459-b6d4-fd571b847575,1980-05-31


Unnamed: 0_level_0,birthdate,marital,race,ethnicity,gender,birthplace,city,state,zip,age_now_days
patient,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
b1d50391-79c5-403c-919f-3ded66c9d77a,1959-09-01,M,black,hispanic,F,Westborough Massachusetts US,Springfield,Massachusetts,0,22493
e52a1bbc-7b12-4d01-82cc-1196da05e399,2016-12-29,xxxnan,white,nonhispanic,F,Methuen Massachusetts US,Boston,Massachusetts,2134,1554
88587157-4de8-4459-b6d4-fd571b847575,1980-05-31,M,white,nonhispanic,F,Boston Massachusetts US,South Hadley,Massachusetts,0,14915


Unnamed: 0,birthdate,marital,race,ethnicity,gender,birthplace,city,state,zip,age_now_days
0,1959-09-01,M,black,hispanic,F,Westborough Massachusetts US,Springfield,Massachusetts,0,22493
1,2016-12-29,xxxnan,white,nonhispanic,F,Methuen Massachusetts US,Boston,Massachusetts,2134,1554
2,1980-05-31,M,white,nonhispanic,F,Boston Massachusetts US,South Hadley,Massachusetts,0,14915


For **validation and test data**, the cleanup function produces only 2 dfs - `patients` and `pt_demographics` (the third returned value is `None`). There are no `pt_codes`, because those only come from training data.

In [None]:
for df in valid_pts_cleaned[:2]:
    display(df.head(3))

Unnamed: 0_level_0,birthdate
patient,Unnamed: 1_level_1
372a8506-f31a-45fa-b563-c50305d983c3,2013-05-15
b16ca449-bc14-4619-a936-07f6f3db7119,2013-05-03
29265f8e-2df1-46cd-8689-e9cbbced50f4,2007-10-05


Unnamed: 0_level_0,birthdate,marital,race,ethnicity,gender,birthplace,city,state,zip,age_now_days
patient,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1
372a8506-f31a-45fa-b563-c50305d983c3,2013-05-15,xxxnan,asian,nonhispanic,F,Haverhill Massachusetts US,Woburn,Massachusetts,1890,2878
b16ca449-bc14-4619-a936-07f6f3db7119,2013-05-03,xxxnan,black,nonhispanic,M,Athol Massachusetts US,Chelsea,Massachusetts,2149,2890
29265f8e-2df1-46cd-8689-e9cbbced50f4,2007-10-05,xxxnan,white,nonhispanic,M,Boston Massachusetts US,Plymouth,Massachusetts,0,4927


**The case for keeping a record of the data generation date**

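Note the difference in `age_now_days` when it is calculated from the default reference date (`pd.Timestamp.today()`) instead of `SYNTHEA_DATAGEN_DATES['1K']`, the data generation date of this 1K dataset. Using the generation date keeps the ages reproducible no matter when the notebook is run.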

In [None]:
# age_now_days of the 3rd training patient -- measured from today vs from the data generation date
bday = train_pts_cleaned[0]['birthdate'].iloc[2]
(pd.Timestamp.today() - bday).days, (pd.to_datetime(SYNTHEA_DATAGEN_DATES['1K']) - bday).days

(15444, 14915)

In [None]:
SYNTHEA_DATAGEN_DATES['1K'], pd.Timestamp.today()

('04-01-2021', Timestamp('2022-09-12 15:44:11.182717'))

In [None]:
#export
@ray.remote(num_returns=2)
def cleanup_obs(obs, is_train):
    '''Clean the `observations` df.

    - Drops rows with a missing `value`.
    - Replaces missing `units` with the `xxxnan` token.
    - Builds a new `code` column as `orig_code||value||units||type`
      (e.g. `8302-2||193.3||cm||numeric`), intended for vocab creation.

    Args:
        obs: raw observations df. Column names may be in any case. It must include
            `patient`, `date`, `code`, `description`, `value`, `units` and `type`.
        is_train: if True, also return the `obs_codes` df.

    Returns:
        `[obs, obs_codes]`:
        - `obs` has the `date` and combined `code` columns, indexed by `patient`.
        - `obs_codes` has `orig_code`, `desc`, `value`, `units` and `type`; it is
          None when `is_train` is False.

    The input df is not modified.
    '''
    obs = (obs.rename(str.lower, axis='columns')
              .rename(columns={'code': 'orig_code', 'description': 'desc'})
              .dropna(subset=['value']))
    # assign instead of the chained `obs.units.fillna(..., inplace=True)`,
    # which silently does nothing under pandas Copy-on-Write
    obs = obs.assign(units=obs['units'].fillna('xxxnan'))
    obs = obs.assign(code=obs['orig_code'].str.cat(obs[['value', 'units', 'type']].astype(str), sep='||'))

    obs_codes = obs.loc[:, ['orig_code', 'desc', 'value', 'units', 'type']] if is_train else None

    obs = obs.loc[:, ['patient', 'date', 'code']]
    # timestamps such as '2012-01-23T17:45:28Z' carry a UTC suffix: parse as UTC, then
    # drop the tz (a unit-less astype('datetime64') is rejected by pandas >= 2)
    obs = obs.assign(date=pd.to_datetime(obs['date'], utc=True).dt.tz_localize(None))
    return [obs.set_index('patient'), obs_codes]

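- Drops rows with null in the `VALUE` column and replaces null `UNITS` with the `xxxnan` token
- Creates a new `code` column by joining the original `CODE` (kept as `orig_code`), `VALUE`, `UNITS` and `TYPE` with `||`, e.g. `8302-2||193.3||cm||numeric`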
    - so that we can use the following logic during vocab creation for observations (further detailed in the vocab documentation)

**For `numeric`**
```
for 'numeric'
    get unique 'codes'
    for each unique code
        get unique 'units'
            for each unique unit
                bucketize 'values'
                create vocab entry for each 'bucket' -- code||value_bucket||units
```
**For `text`**
```
for 'text'
    get unique 'codes'
    for each unique code
        get unique 'units' #this will be null
            for each unique unit
                get unique 'values'
                create vocab entry for each -- code||value||units
```

The `observations` df before cleanup:

In [None]:
observations.head()

Unnamed: 0,DATE,PATIENT,ENCOUNTER,CODE,DESCRIPTION,VALUE,UNITS,TYPE
0,2012-01-23T17:45:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e88bc3a9-007c-405e-aabc-792a38f4aa2b,8302-2,Body Height,193.3,cm,numeric
1,2012-01-23T17:45:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e88bc3a9-007c-405e-aabc-792a38f4aa2b,72514-3,Pain severity - 0-10 verbal numeric rating [Sc...,2.0,{score},numeric
2,2012-01-23T17:45:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e88bc3a9-007c-405e-aabc-792a38f4aa2b,29463-7,Body Weight,87.8,kg,numeric
3,2012-01-23T17:45:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e88bc3a9-007c-405e-aabc-792a38f4aa2b,39156-5,Body Mass Index,23.5,kg/m2,numeric
4,2012-01-23T17:45:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e88bc3a9-007c-405e-aabc-792a38f4aa2b,8462-4,Diastolic Blood Pressure,82.0,mm[Hg],numeric


In [None]:
train_obs_cleaned = ray.get(cleanup_obs.remote(train_dfs[1], is_train=True))

After cleanup, training data yields the cleaned df and the `obs_codes` df:

In [None]:
for df in train_obs_cleaned:
    display(df.head())

Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,2010-11-20 03:04:34,8302-2||169.6||cm||numeric
f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,2010-11-20 03:04:34,72514-3||4.0||{score}||numeric
f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,2010-11-20 03:04:34,29463-7||63.8||kg||numeric
f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,2010-11-20 03:04:34,39156-5||22.2||kg/m2||numeric
f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,2010-11-20 03:04:34,59576-9||81.9||%||numeric


Unnamed: 0,orig_code,desc,value,units,type
0,8302-2,Body Height,169.6,cm,numeric
1,72514-3,Pain severity - 0-10 verbal numeric rating [Sc...,4.0,{score},numeric
2,29463-7,Body Weight,63.8,kg,numeric
3,39156-5,Body Mass Index,22.2,kg/m2,numeric
4,59576-9,Body mass index (BMI) [Percentile] Per age and...,81.9,%,numeric


In [None]:
test_obs_cleaned = ray.get(cleanup_obs.remote(test_dfs[1], is_train=False))

After cleanup, test data yields only the cleaned df (the second returned value is `None`):

In [None]:
for df in test_obs_cleaned[:1]:
    display(df.head())

Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
034e9e3b-2def-4559-bb2a-7850888ae060,2012-01-23 17:45:28,8302-2||193.3||cm||numeric
034e9e3b-2def-4559-bb2a-7850888ae060,2012-01-23 17:45:28,72514-3||2.0||{score}||numeric
034e9e3b-2def-4559-bb2a-7850888ae060,2012-01-23 17:45:28,29463-7||87.8||kg||numeric
034e9e3b-2def-4559-bb2a-7850888ae060,2012-01-23 17:45:28,39156-5||23.5||kg/m2||numeric
034e9e3b-2def-4559-bb2a-7850888ae060,2012-01-23 17:45:28,8462-4||82.0||mm[Hg]||numeric


In [None]:
#export
@ray.remote(num_returns=2)
def cleanup_algs(allergies, is_train):
    '''Clean the `allergies` df.

    Each input row has a `start` date and an optional `stop` date. The df is
    flattened as follows:
    - every row becomes a `<code>||START` row dated `start`;
    - every row with a non-null `stop` also gets a `<code>||STOP` row dated `stop`.
    The START rows come first, then the STOP rows.

    Args:
        allergies: raw allergies df. Column names may be in any case. It must
            include `start`, `stop`, `patient`, `encounter`, `code` and
            `description`.
        is_train: if True, also return the `alg_codes` df (`code`, `desc`).

    Returns:
        `[allergies, alg_codes]`:
        - `allergies` is indexed by `patient`; it has `date` plus `code` (and any
          other input columns not dropped here).
        - `alg_codes` is None when `is_train` is False.

    The input df is not modified.
    '''
    allergies = allergies.rename(str.lower, axis='columns').drop(columns=['encounter'])

    starts = allergies.drop(columns=['stop']).rename(columns={'start': 'date', 'description': 'desc'})
    stops = (allergies.loc[allergies['stop'].notnull()]
                      .drop(columns=['start'])
                      .rename(columns={'stop': 'date', 'description': 'desc'}))
    starts = starts.assign(code=starts['code'].map(lambda c: f'{c}||START'))
    stops = stops.assign(code=stops['code'].map(lambda c: f'{c}||STOP'))
    # pd.concat replaces DataFrame.append, which was removed in pandas 2.0
    allergies = pd.concat([starts, stops], ignore_index=True)

    alg_codes = allergies.loc[:, ['code', 'desc']] if is_train else None

    allergies = allergies.drop(columns=['desc'])
    # parse as UTC, then drop the tz (a unit-less astype('datetime64') is rejected by pandas >= 2)
    allergies = allergies.assign(date=pd.to_datetime(allergies['date'], utc=True).dt.tz_localize(None))
    return [allergies.set_index('patient'), alg_codes]

Each `allergies` row has a start date and an optional stop date. Together they indicate when an allergy (identified by its code) started for a patient and, if it did, when it stopped. <br>
The cleanup flattens this out: each start becomes a `code||START` row, and each non-null stop becomes an additional `code||STOP` row. <br>
Before cleanup, the dataframe looks as follows:

In [None]:
allergies.head()

Unnamed: 0,START,STOP,PATIENT,ENCOUNTER,CODE,DESCRIPTION
0,1982-10-25,,76982e06-f8b8-4509-9ca3-65a99c8650fe,b896bf40-8b72-42b7-b205-142ee3a56b55,300916003,Latex allergy
1,1982-10-25,,76982e06-f8b8-4509-9ca3-65a99c8650fe,b896bf40-8b72-42b7-b205-142ee3a56b55,300913006,Shellfish allergy
2,2002-01-25,,71ba0469-f0cc-4177-ac70-ea07cb01c8b8,7be1a590-4239-4826-9872-031327f3c368,419474003,Allergy to mould
3,2002-01-25,,71ba0469-f0cc-4177-ac70-ea07cb01c8b8,7be1a590-4239-4826-9872-031327f3c368,232347008,Dander (animal) allergy
4,2002-01-25,,71ba0469-f0cc-4177-ac70-ea07cb01c8b8,7be1a590-4239-4826-9872-031327f3c368,418689008,Allergy to grass pollen


In [None]:
train_alg_cleaned = ray.get(cleanup_algs.remote(train_dfs[2], is_train=True))

After cleanup, the output looks as follows:

In [None]:
for df in train_alg_cleaned:
    display(df.head(3))
    display(df.tail(3))

Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
76982e06-f8b8-4509-9ca3-65a99c8650fe,1982-10-25,300916003||START
76982e06-f8b8-4509-9ca3-65a99c8650fe,1982-10-25,300913006||START
71ba0469-f0cc-4177-ac70-ea07cb01c8b8,2002-01-25,419474003||START


Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
96942a16-75bc-4026-bd63-e985b0ca1d6d,2016-09-18,418689008||STOP
96942a16-75bc-4026-bd63-e985b0ca1d6d,2016-09-18,419263009||STOP
e6ff4bf9-09c2-4976-aa84-cca142207cf8,2016-06-25,300916003||STOP


Unnamed: 0,code,desc
0,300916003||START,Latex allergy
1,300913006||START,Shellfish allergy
2,419474003||START,Allergy to mould


Unnamed: 0,code,desc
368,418689008||STOP,Allergy to grass pollen
369,419263009||STOP,Allergy to tree pollen
370,300916003||STOP,Latex allergy


In [None]:
#export
@ray.remote(num_returns=2)
def cleanup_crpls(careplans, is_train):
    '''Clean the `careplans` df.

    Keeps only `start`, `stop`, `patient`, `code` and `description`, then flattens
    the start/stop pair:
    - every row becomes a `<code>||START` row dated `start`;
    - every row with a non-null `stop` also gets a `<code>||STOP` row dated `stop`.
    The START rows come first, then the STOP rows.

    Args:
        careplans: raw careplans df. Column names may be in any case.
        is_train: if True, also return the `crpl_codes` df (`code`, `desc`).

    Returns:
        `[careplans, crpl_codes]`:
        - `careplans` has `date` and `code`, indexed by `patient`.
        - `crpl_codes` is None when `is_train` is False.

    The input df is not modified.
    '''
    careplans = (careplans.rename(str.lower, axis='columns')
                          .loc[:, ['start', 'stop', 'patient', 'code', 'description']])

    starts = careplans.drop(columns=['stop']).rename(columns={'start': 'date', 'description': 'desc'})
    stops = (careplans.loc[careplans['stop'].notnull()]
                      .drop(columns=['start'])
                      .rename(columns={'stop': 'date', 'description': 'desc'}))
    starts = starts.assign(code=starts['code'].map(lambda c: f'{c}||START'))
    stops = stops.assign(code=stops['code'].map(lambda c: f'{c}||STOP'))
    # pd.concat replaces DataFrame.append, which was removed in pandas 2.0
    careplans = pd.concat([starts, stops], ignore_index=True)

    crpl_codes = careplans.loc[:, ['code', 'desc']] if is_train else None

    careplans = careplans.drop(columns=['desc'])
    # parse as UTC, then drop the tz (a unit-less astype('datetime64') is rejected by pandas >= 2)
    careplans = careplans.assign(date=pd.to_datetime(careplans['date'], utc=True).dt.tz_localize(None))
    return [careplans.set_index('patient'), crpl_codes]

In [None]:
careplans.head()

Unnamed: 0,Id,START,STOP,PATIENT,ENCOUNTER,CODE,DESCRIPTION,REASONCODE,REASONDESCRIPTION
0,d2500b8c-e830-433a-8b9d-368d30741520,2010-01-23,2012-01-23,034e9e3b-2def-4559-bb2a-7850888ae060,d0c40d10-8d87-447e-836e-99d26ad52ea5,53950000,Respiratory therapy,10509002.0,Acute bronchitis (disorder)
1,07d9ddd8-dfa1-4e43-9bfe-39f63f4ace15,2011-05-13,2011-08-02,10339b10-3cd1-4ac3-ac13-ec26728cb592,e1ab4933-07a1-49f0-b4bd-05500919061d,53950000,Respiratory therapy,10509002.0,Acute bronchitis (disorder)
2,a3bb6e99-3b99-44b3-974c-e230b4511b5c,2011-12-31,2012-11-30,f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,16300c56-a035-4126-a656-68c093da6dfc,53950000,Respiratory therapy,10509002.0,Acute bronchitis (disorder)
3,9f5284b7-425a-486a-b36e-ab818c018f2f,2016-12-29,2017-01-05,034e9e3b-2def-4559-bb2a-7850888ae060,3b639086-5fbc-4720-8c31-e8c8c0f1d660,53950000,Respiratory therapy,10509002.0,Acute bronchitis (disorder)
4,47ede16c-c216-4f81-a16b-0e858de9cdc3,2017-01-22,2017-02-12,10339b10-3cd1-4ac3-ac13-ec26728cb592,4ec8d55b-05fc-42a5-bfa3-1e233874a362,225358003,Wound care,284551006.0,Laceration of foot


In [None]:
train_crpl_cleaned = ray.get(cleanup_crpls.remote(careplans, is_train=True))

In [None]:
for df in train_crpl_cleaned:
    display(df.head(3))
    display(df.tail(3))

Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
034e9e3b-2def-4559-bb2a-7850888ae060,2010-01-23,53950000||START
10339b10-3cd1-4ac3-ac13-ec26728cb592,2011-05-13,53950000||START
f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,2011-12-31,53950000||START


Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
6d048a56-edb8-4f29-891d-7a84d75a8e78,2002-11-30,53950000||STOP
fca3178e-fb68-41c3-8598-702d3ca68b96,1983-09-29,91251008||STOP
fca3178e-fb68-41c3-8598-702d3ca68b96,1984-11-22,385691007||STOP


Unnamed: 0,code,desc
0,53950000||START,Respiratory therapy
1,53950000||START,Respiratory therapy
2,53950000||START,Respiratory therapy


Unnamed: 0,code,desc
5431,53950000||STOP,Respiratory therapy
5432,91251008||STOP,Physical therapy procedure
5433,385691007||STOP,Fracture care


In [None]:
#export
@ray.remote(num_returns=2)
def cleanup_meds(medications, is_train):
    '''Clean the `medications` df.

    Keeps only `start`, `stop`, `patient`, `code` and `description`, then flattens
    the start/stop pair:
    - every row becomes a `<code>||START` row dated `start`;
    - every row with a non-null `stop` also gets a `<code>||STOP` row dated `stop`.
    The START rows come first, then the STOP rows.

    Args:
        medications: raw medications df. Column names may be in any case.
        is_train: if True, also return the `med_codes` df (`code`, `desc`).

    Returns:
        `[medications, med_codes]`:
        - `medications` has `date` and `code`, indexed by `patient`.
        - `med_codes` is None when `is_train` is False.

    The input df is not modified.
    '''
    medications = (medications.rename(str.lower, axis='columns')
                              .loc[:, ['start', 'stop', 'patient', 'code', 'description']])

    starts = medications.drop(columns=['stop']).rename(columns={'start': 'date', 'description': 'desc'})
    stops = (medications.loc[medications['stop'].notnull()]
                        .drop(columns=['start'])
                        .rename(columns={'stop': 'date', 'description': 'desc'}))
    starts = starts.assign(code=starts['code'].map(lambda c: f'{c}||START'))
    stops = stops.assign(code=stops['code'].map(lambda c: f'{c}||STOP'))
    # pd.concat replaces DataFrame.append, which was removed in pandas 2.0
    medications = pd.concat([starts, stops], ignore_index=True)

    med_codes = medications.loc[:, ['code', 'desc']] if is_train else None

    medications = medications.drop(columns=['desc'])
    # timestamps such as '2010-05-05T00:26:23Z' carry a UTC suffix: parse as UTC, then
    # drop the tz (a unit-less astype('datetime64') is rejected by pandas >= 2)
    medications = medications.assign(date=pd.to_datetime(medications['date'], utc=True).dt.tz_localize(None))
    return [medications.set_index('patient'), med_codes]

In [None]:
medications.head()

Unnamed: 0,START,STOP,PATIENT,PAYER,ENCOUNTER,CODE,DESCRIPTION,BASE_COST,PAYER_COVERAGE,DISPENSES,TOTALCOST,REASONCODE,REASONDESCRIPTION
0,2010-05-05T00:26:23Z,2011-04-30T00:26:23Z,8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,b1c428d6-4f07-31e0-90f0-68ffa6ff8c76,1e0d6b0e-1711-4a25-99f9-b1c700c9b260,389221,Etonogestrel 68 MG Drug Implant,677.08,0.0,12,8124.96,,
1,2011-04-30T00:26:23Z,2012-04-24T00:26:23Z,8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,b1c428d6-4f07-31e0-90f0-68ffa6ff8c76,6aa37300-d1b4-48e7-a2f8-5e0f70f48f38,389221,Etonogestrel 68 MG Drug Implant,624.09,0.0,12,7489.08,,
2,2012-04-24T00:26:23Z,2013-04-19T00:26:23Z,8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,b1c428d6-4f07-31e0-90f0-68ffa6ff8c76,7253a9f9-6f6d-429a-926a-7b1d424eae3f,748856,Yaz 28 Day Pack,43.32,0.0,12,519.84,,
3,2011-05-13T12:58:08Z,2011-05-27T12:58:08Z,10339b10-3cd1-4ac3-ac13-ec26728cb592,d47b3510-2895-3b70-9897-342d681c769d,e1ab4933-07a1-49f0-b4bd-05500919061d,313782,Acetaminophen 325 MG Oral Tablet,8.14,0.0,1,8.14,10509002.0,Acute bronchitis (disorder)
4,2011-12-08T15:02:18Z,2011-12-22T15:02:18Z,1d604da9-9a81-4ba9-80c2-de3375d59b40,b1c428d6-4f07-31e0-90f0-68ffa6ff8c76,792fae81-a007-44b0-8221-46953737b089,562251,Amoxicillin 250 MG / Clavulanate 125 MG Oral T...,11.91,0.0,1,11.91,444814009.0,Viral sinusitis (disorder)


In [None]:
train_med_cleaned = ray.get(cleanup_meds.remote(medications, is_train=True))

In [None]:
for df in train_med_cleaned:
    display(df.head(3))
    display(df.tail(3))

Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,2010-05-05 00:26:23,389221||START
8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,2011-04-30 00:26:23,389221||START
8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,2012-04-24 00:26:23,748856||START


Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
6d048a56-edb8-4f29-891d-7a84d75a8e78,2005-12-31 17:27:52,2123111||STOP
fca3178e-fb68-41c3-8598-702d3ca68b96,1983-09-29 17:27:52,243670||STOP
fca3178e-fb68-41c3-8598-702d3ca68b96,1984-11-22 17:27:52,313782||STOP


Unnamed: 0,code,desc
0,389221||START,Etonogestrel 68 MG Drug Implant
1,389221||START,Etonogestrel 68 MG Drug Implant
2,748856||START,Yaz 28 Day Pack


Unnamed: 0,code,desc
84080,2123111||STOP,NDA020503 200 ACTUAT Albuterol 0.09 MG/ACTUAT ...
84081,243670||STOP,Aspirin 81 MG Oral Tablet
84082,313782||STOP,Acetaminophen 325 MG Oral Tablet


In [None]:
#export
@ray.remote(num_returns=2)
def cleanup_img(imaging_studies, is_train):
    '''Clean the `imaging_studies` df.

    The body site becomes the event code: `bodysite_code` is renamed to `code`
    and `bodysite_description` to `desc`.

    Args:
        imaging_studies: raw imaging studies df. Column names may be in any case. It
            must include `patient`, `date`, `bodysite_code` and
            `bodysite_description`.
        is_train: if True, also return the `img_codes` df (`code`, `desc`).

    Returns:
        `[imaging_studies, img_codes]`:
        - `imaging_studies` has `date` and `code`, indexed by `patient`.
        - `img_codes` is None when `is_train` is False.

    The input df is not modified.
    '''
    imaging_studies = (imaging_studies.rename(str.lower, axis='columns')
                                      .rename(columns={'bodysite_code': 'code', 'bodysite_description': 'desc'}))
    img_codes = imaging_studies.loc[:, ['code', 'desc']] if is_train else None

    imaging_studies = imaging_studies.loc[:, ['patient', 'date', 'code']]
    # parse as UTC, then drop the tz (a unit-less astype('datetime64') is rejected by pandas >= 2)
    imaging_studies = imaging_studies.assign(date=pd.to_datetime(imaging_studies['date'], utc=True).dt.tz_localize(None))
    return [imaging_studies.set_index('patient'), img_codes]

In [None]:
imaging_studies.head()

Unnamed: 0,Id,DATE,PATIENT,ENCOUNTER,BODYSITE_CODE,BODYSITE_DESCRIPTION,MODALITY_CODE,MODALITY_DESCRIPTION,SOP_CODE,SOP_DESCRIPTION
0,d3e49b38-7634-4416-879d-7bc68bf3e7df,2014-07-08T15:35:36Z,b58731cc-2d8b-4c2d-b327-4cab771af3ef,3a36836d-da25-4e73-808b-972b669b7e4e,40983000,Arm,DX,Digital Radiography,1.2.840.10008.5.1.4.1.1.1.1,Digital X-Ray Image Storage
1,46baf530-4941-40ab-8219-685a08fd9086,2014-01-22T18:58:37Z,2ffe9369-24e4-414b-8973-258fad09313a,33b71e4b-0690-4fe9-897a-dc3b2ff9215c,40983000,Arm,DX,Digital Radiography,1.2.840.10008.5.1.4.1.1.1.1,Digital X-Ray Image Storage
2,b8fb8a6e-a2f5-46c9-8b3f-a35aa982efcd,2001-12-01T02:08:27Z,86b97fc7-ae8f-4e0d-8e66-db68f36e7a76,e42d1046-568d-46c2-b0a5-d910b2f3bd1d,8205005,Wrist,DX,Digital Radiography,1.2.840.10008.5.1.4.1.1.1.1,Digital X-Ray Image Storage
3,10c8a016-4504-4653-bddf-2dd3610886c8,2004-07-03T20:46:46Z,71ba0469-f0cc-4177-ac70-ea07cb01c8b8,323fca87-817f-4d58-8486-ba92ea739399,51299004,Clavicle,DX,Digital Radiography,1.2.840.10008.5.1.4.1.1.1.1,Digital X-Ray Image Storage
4,4221534c-d379-4c6b-a22e-d7eae3fa2609,2017-02-08T08:42:44Z,d49f748f-928d-40e8-92c8-73e4c5679711,cfef48b3-b769-4794-a3e7-f57f7ba8d387,344001,Ankle,DX,Digital Radiography,1.2.840.10008.5.1.4.1.1.1.1,Digital X-Ray Image Storage


In [None]:
train_img_cleaned = ray.get(cleanup_img.remote(imaging_studies, is_train=True))

In [None]:
for df in train_img_cleaned:
    display(df.head(3))

Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
b58731cc-2d8b-4c2d-b327-4cab771af3ef,2014-07-08 15:35:36,40983000
2ffe9369-24e4-414b-8973-258fad09313a,2014-01-22 18:58:37,40983000
86b97fc7-ae8f-4e0d-8e66-db68f36e7a76,2001-12-01 02:08:27,8205005


Unnamed: 0,code,desc
0,40983000,Arm
1,40983000,Arm
2,8205005,Wrist


In [None]:
#export
@ray.remote(num_returns=2)
def cleanup_procs(procedures, is_train):
    '''Clean the `procedures` df.

    Args:
        procedures: raw procedures df. Column names may be in any case. It must
            include `patient`, `date`, `code` and `description`.
        is_train: if True, also return the `proc_codes` df (`code`, `desc`).

    Returns:
        `[procedures, proc_codes]`:
        - `procedures` has `date` and `code`, indexed by `patient`.
        - `proc_codes` is None when `is_train` is False.

    The input df is not modified.
    '''
    procedures = (procedures.rename(str.lower, axis='columns')
                            .rename(columns={'description': 'desc'}))
    proc_codes = procedures.loc[:, ['code', 'desc']] if is_train else None

    procedures = procedures.loc[:, ['patient', 'date', 'code']]
    # parse as UTC, then drop the tz (a unit-less astype('datetime64') is rejected by pandas >= 2)
    procedures = procedures.assign(date=pd.to_datetime(procedures['date'], utc=True).dt.tz_localize(None))
    return [procedures.set_index('patient'), proc_codes]

In [None]:
procedures.head()

Unnamed: 0,DATE,PATIENT,ENCOUNTER,CODE,DESCRIPTION,BASE_COST,REASONCODE,REASONDESCRIPTION
0,2011-04-30T00:26:23Z,8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,6aa37300-d1b4-48e7-a2f8-5e0f70f48f38,169553002,Insertion of subcutaneous contraceptive,14896.56,,
1,2010-07-27T12:58:08Z,10339b10-3cd1-4ac3-ac13-ec26728cb592,dae2b7cb-1316-4b78-954f-fa610a6c6d0e,430193006,Medication Reconciliation (procedure),726.51,,
2,2010-11-20T03:04:34Z,f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,7ff86631-0378-4bfc-92ce-1edd697eb18e,430193006,Medication Reconciliation (procedure),788.5,,
3,2011-02-07T03:04:34Z,f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,b8f76eba-7795-4dcd-a544-f27ac2ef3d46,117015009,Throat culture (procedure),2070.44,195662009.0,Acute viral pharyngitis (disorder)
4,2011-04-19T03:04:34Z,f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,640837d9-845a-433c-9fad-47426664a69d,117015009,Throat culture (procedure),2479.39,195662009.0,Acute viral pharyngitis (disorder)


In [None]:
train_proc_cleaned = ray.get(cleanup_procs.remote(procedures, is_train=True))

In [None]:
for df in train_proc_cleaned:
    display(df.head(3))

Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,2011-04-30 00:26:23,169553002
10339b10-3cd1-4ac3-ac13-ec26728cb592,2010-07-27 12:58:08,430193006
f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,2010-11-20 03:04:34,430193006


Unnamed: 0,code,desc
0,169553002,Insertion of subcutaneous contraceptive
1,430193006,Medication Reconciliation (procedure)
2,430193006,Medication Reconciliation (procedure)


In [None]:
#export
@ray.remote(num_returns=2)
def cleanup_cnds(conditions, is_train):
    '''Clean `conditions` df into dated START/STOP events.

    Every condition row becomes a `<code>||START` event dated at its `start`;
    rows that also have a non-null `stop` add a `<code>||STOP` event dated at `stop`.
    The `encounter` column is dropped.

    Returns `[events, cnd_codes]`:
    - `events`: remaining columns (for the Synthea schema: `date` as datetime64 and
      `code`), indexed by `patient`
    - `cnd_codes`: the `code`/`desc` pair of every event when `is_train`, else None

    The input frame is not modified.
    '''
    cnds = conditions.rename(str.lower, axis='columns').drop(columns=['encounter'])

    starts = cnds.drop(columns=['stop']).rename(columns={"start": "date", "description": "desc"})
    starts['code'] = starts['code'].apply(lambda x: f'{str(x)}||START')

    # only conditions that actually ended produce a STOP event
    stops = (cnds.loc[cnds['stop'].notnull()]
             .drop(columns=['start'])
             .rename(columns={"stop": "date", "description": "desc"}))
    stops['code'] = stops['code'].apply(lambda x: f'{str(x)}||STOP')

    # DataFrame.append was removed in pandas 2.0; pd.concat is the supported equivalent
    events = pd.concat([starts, stops], ignore_index=True)

    cnd_codes = events.loc[:, ['code', 'desc']] if is_train else None

    events = events.drop(columns=['desc'])
    # explicit parse instead of the unit-less astype('datetime64') that pandas 2.0 rejects;
    # utc=True + tz_localize(None) yields naive timestamps, as the old cast did
    events['date'] = pd.to_datetime(events['date'], utc=True).dt.tz_localize(None)
    events = events.set_index('patient')
    return [events, cnd_codes]

In [None]:
conditions.head()

Unnamed: 0,START,STOP,PATIENT,ENCOUNTER,CODE,DESCRIPTION
0,2001-05-01,,1d604da9-9a81-4ba9-80c2-de3375d59b40,8f104aa7-4ca9-4473-885a-bba2437df588,40055000,Chronic sinusitis (disorder)
1,2011-08-09,2011-08-16,8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,9d35ec9f-352a-4629-92ef-38eae38437e7,444814009,Viral sinusitis (disorder)
2,2011-11-16,2011-11-26,8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,ae7555a9-eaff-4c09-98a7-21bc6ed1b1fd,195662009,Acute viral pharyngitis (disorder)
3,2011-05-13,2011-05-27,10339b10-3cd1-4ac3-ac13-ec26728cb592,e1ab4933-07a1-49f0-b4bd-05500919061d,10509002,Acute bronchitis (disorder)
4,2011-02-06,2011-02-14,f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,b8f76eba-7795-4dcd-a544-f27ac2ef3d46,195662009,Acute viral pharyngitis (disorder)


In [None]:
# Returns [START/STOP condition events indexed by patient, code/desc table] since is_train=True
train_cnd_cleaned = ray.get(cleanup_cnds.remote(conditions, is_train=True))

In [None]:
for df in train_cnd_cleaned:
    display(df.head(3))
    display(df.tail(3))

Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
1d604da9-9a81-4ba9-80c2-de3375d59b40,2001-05-01,40055000||START
8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,2011-08-09,444814009||START
8d4c4326-e9de-4f45-9a4c-f8c36bff89ae,2011-11-16,195662009||START


Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
fca3178e-fb68-41c3-8598-702d3ca68b96,1986-03-02,43878008||STOP
fc817953-cc8b-45db-9c85-7c0ced8fa90d,2010-11-25,444814009||STOP
fc817953-cc8b-45db-9c85-7c0ced8fa90d,2012-05-14,444814009||STOP


Unnamed: 0,code,desc
0,40055000||START,Chronic sinusitis (disorder)
1,444814009||START,Viral sinusitis (disorder)
2,195662009||START,Acute viral pharyngitis (disorder)


Unnamed: 0,code,desc
12938,43878008||STOP,Streptococcal sore throat (disorder)
12939,444814009||STOP,Viral sinusitis (disorder)
12940,444814009||STOP,Viral sinusitis (disorder)


In [None]:
#export
@ray.remote(num_returns=2)
def cleanup_immns(immunizations, is_train):
    '''Clean `immunizations` df'''
    
    immunizations.rename(str.lower, axis='columns', inplace=True)
    immunizations.rename(columns={"description":"desc"}, inplace=True)
    if is_train: imm_codes = immunizations.loc[:, ['code', 'desc']]
        
    immunizations = immunizations.loc[:, ['patient', 'date', 'code']]
    immunizations = immunizations.astype({'date':'datetime64'})
    immunizations.set_index('patient', inplace=True)
    return [immunizations, imm_codes] if is_train else [immunizations, None]

In [None]:
immunizations.head()

Unnamed: 0,DATE,PATIENT,ENCOUNTER,CODE,DESCRIPTION,BASE_COST
0,2010-07-27T12:58:08Z,10339b10-3cd1-4ac3-ac13-ec26728cb592,dae2b7cb-1316-4b78-954f-fa610a6c6d0e,140,Influenza seasonal injectable preservative ...,140.52
1,2010-11-20T03:04:34Z,f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,7ff86631-0378-4bfc-92ce-1edd697eb18e,140,Influenza seasonal injectable preservative ...,140.52
2,2012-01-23T17:45:28Z,034e9e3b-2def-4559-bb2a-7850888ae060,e88bc3a9-007c-405e-aabc-792a38f4aa2b,140,Influenza seasonal injectable preservative ...,140.52
3,2011-11-26T03:04:34Z,f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,1923c698-accd-4d70-ba09-e1938f6e96d1,140,Influenza seasonal injectable preservative ...,140.52
4,2011-07-28T15:02:18Z,1d604da9-9a81-4ba9-80c2-de3375d59b40,b85c339a-6076-43ed-b9d0-9cf013dec49d,140,Influenza seasonal injectable preservative ...,140.52


In [None]:
# Returns [immunization records indexed by patient, code/desc table] since is_train=True
train_imm_cleaned = ray.get(cleanup_immns.remote(immunizations, is_train=True))

In [None]:
for df in train_imm_cleaned:
    display(df.head(3))

Unnamed: 0_level_0,date,code
patient,Unnamed: 1_level_1,Unnamed: 2_level_1
10339b10-3cd1-4ac3-ac13-ec26728cb592,2010-07-27 12:58:08,140
f5dcd418-09fe-4a2f-baa0-3da800bd8c3a,2010-11-20 03:04:34,140
034e9e3b-2def-4559-bb2a-7850888ae060,2012-01-23 17:45:28,140


Unnamed: 0,code,desc
0,140,Influenza seasonal injectable preservative ...
1,140,Influenza seasonal injectable preservative ...
2,140,Influenza seasonal injectable preservative ...


## Extract Labels (y)

The labels we intend to predict are conditions, and each one must be listed in the `CONDITIONS` dict. For each condition we:
- add a boolean label column to the `patients` df
- add the patient's age (in years) when that condition was recorded

In [None]:
for key in CONDITIONS.keys():
    print(key,"::",f'{CONDITIONS[key]}||START')

diabetes :: 44054006||START
stroke :: 230690007||START
alzheimers :: 26929004||START
coronary_heart :: 53741008||START
lung_cancer :: 254637007||START
breast_cancer :: 254837009||START
rheumatoid_arthritis :: 69896004||START
epilepsy :: 84757009||START


In [None]:
#export
@ray.remote
def extract_ys(patients, conditions, cnd_dict):
    '''Extract labels from conditions df and add them to patients df with age.

    For every `key -> code` in `cnd_dict`, two columns are appended to a copy of `patients`:
    - `key`: True if the patient has a `<code>||START` record in `conditions`
    - `key_age`: floor of (earliest such record's `date` - `birthdate`) in numpy
      year units; NaN when the patient has no such record

    `patients` and `conditions` are matched on their index (patient id), so both
    must be indexed by patient and `patients` must have a `birthdate` column.

    A patient with several START records for the same code still yields exactly one
    row (the earliest onset is used). A left merge would repeat the patient row
    once per matching record.
    '''
    labelled = patients.copy()  # never mutate the caller's frame
    for key, code in cnd_dict.items():
        starts = conditions.loc[conditions['code'] == f'{code}||START', 'date']
        # earliest START per patient, aligned onto the patients index (NaT if absent)
        onset = starts.groupby(level=0).min().reindex(labelled.index)
        labelled[f'{key}'] = onset.notna().to_numpy()
        labelled[f'{key}_age'] = ((onset - labelled['birthdate']) // np.timedelta64(1, 'Y')).to_numpy()
    return labelled

In [None]:
# Try extract_ys on the train-split patients and the cleaned conditions from above.
# NOTE(review): train_pts_cleaned is defined in an earlier cell (presumably cleanup_pts output) not visible here.
tmp_pts = ray.get(extract_ys.remote(train_pts_cleaned[0], train_cnd_cleaned[0], cnd_dict=CONDITIONS))

In [None]:
tmp_pts.count()

birthdate                   702
diabetes                    702
diabetes_age                 43
stroke                      702
stroke_age                   30
alzheimers                  702
alzheimers_age               12
coronary_heart              702
coronary_heart_age           39
lung_cancer                 702
lung_cancer_age              12
breast_cancer               702
breast_cancer_age            11
rheumatoid_arthritis        702
rheumatoid_arthritis_age      2
epilepsy                    702
epilepsy_age                 15
dtype: int64

## Insert Age

Insert the patient's age (in years and in months) into each record df
- this can be modified to record the patient's age in **days** or even **hours**, which might be more relevant for datasets involving hospitalizations or ER admissions

In [None]:
#export
@ray.remote
def insert_age(df, pts_df):
    '''Insert age in years and months into each of the rec dfs.

    `df` is inner-joined to `pts_df` on their index (patient id), so records of
    patients absent from `pts_df` are dropped. Two columns are added:
    - `age`: floor of (`date` - `birthdate`) in numpy year units
    - `age_months`: floor of (`date` - `birthdate`) in numpy month units

    Only `birthdate` is taken from `pts_df`, so any other patient-level columns
    are not copied into the record table. `birthdate` is dropped from the result.
    '''
    merged = df.merge(pts_df[['birthdate']], left_index=True, right_index=True)
    elapsed = merged['date'] - merged['birthdate']  # computed once, used for both units
    merged['age'] = elapsed // np.timedelta64(1, 'Y')
    merged['age_months'] = elapsed // np.timedelta64(1, 'M')
    return merged.drop(columns=['birthdate'])

### Clean all

In [None]:
#export
@ray.remote(num_returns=2)
def clean_preprocess_dataset(path, is_train, conditions_dict, today=None):
    '''Clean and preprocess every table of one split, fanning the work out as Ray tasks.

    Returns a 2-tuple (Ray splits it into two ObjectRefs):
    - `data_tables`: ObjectRefs for [patients with extracted labels, patient demographics,
      observations, allergies, careplans, medications, imaging studies, procedures,
      conditions, immunizations]; every record table has ages inserted
    - `code_tables`: ObjectRefs for the per-table code tables when `is_train`, else None
    '''
    raw = read_raw_ehrdata(path)

    # patients are handled separately: their cleaner also takes `today` and has 3 outputs
    pt_refs = cleanup_pts.remote(raw[0], is_train, today)

    # record tables in raw[1:9] order; each cleaner yields [table, codes]
    rec_cleaners = (cleanup_obs, cleanup_algs, cleanup_crpls, cleanup_meds,
                    cleanup_img, cleanup_procs, cleanup_cnds, cleanup_immns)
    rec_refs = [cleaner.remote(raw[pos], is_train)
                for pos, cleaner in enumerate(rec_cleaners, start=1)]

    patients_ref, demographics_ref = pt_refs[0], pt_refs[1]
    conditions_ref = rec_refs[6][0]

    aged_refs = [insert_age.remote(refs[0], patients_ref) for refs in rec_refs]
    labelled_ref = extract_ys.remote(patients_ref, conditions_ref, conditions_dict)

    data_tables = [labelled_ref, demographics_ref, *aged_refs]
    if not is_train:
        return (data_tables, None)

    code_tables = [pt_refs[2], *(refs[1] for refs in rec_refs)]
    return (data_tables, code_tables)

In [None]:
# Clean the validation split only (is_train=False, so the second return value is None)
data_tables, _ = ray.get(clean_preprocess_dataset.remote(f'{PATH_1K}/raw_split/valid', is_train=False, conditions_dict=CONDITIONS))

# data_tables is a list of ObjectRefs in the order built by clean_preprocess_dataset
patients, pt_demographics, observations, allergies, \
careplans, medications, imaging_studies, procedures, conditions, immunizations = ray.get(data_tables)

In [None]:
conditions.count()

code          2525
age           2525
age_months    2525
dtype: int64

In [None]:
# Clean the train split; is_train=True also returns the code tables used for vocab creation
data_tables, code_tables = ray.get(clean_preprocess_dataset.remote(f'{PATH_1K}/raw_split/train', is_train=True, conditions_dict=CONDITIONS))

# same table order as clean_preprocess_dataset's data_tables
patients, pt_demographics, observations, allergies, \
careplans, medications, imaging_studies, procedures, conditions, immunizations = ray.get(data_tables)

# same order as clean_preprocess_dataset's code_tables
pt_codes, obs_codes, alg_codes, crpl_codes, med_codes, img_codes, proc_codes, cnd_codes, imm_codes = ray.get(code_tables)

In [None]:
conditions.count()

code          7666
age           7666
age_months    7666
dtype: int64

In [None]:
obs_codes.count()

orig_code    173312
desc         173312
value        173312
units        173312
type         173312
dtype: int64

## Do-All Functions
The actual functions that will be called from other modules

In [None]:
#export
@ray.remote
def persist_cleaned(path, split_name, cleaned_dfs, code_tables=None):
    '''Save cleaned EHR data to disk

    Writes one CSV per table of `cleaned_dfs` into `{path}/cleaned/{split_name}/`:
    `patients.csv` (index reset, new index labelled `indx`), then one file per
    remaining table named after `FILENAMES` with `patient_demographics` inserted
    at position 1. For the 'train' split the `code_tables` are also written to
    `codes/code_<name>.csv`. Returns `split_name`.
    '''
    table_names = [*FILENAMES[:1], 'patient_demographics', *FILENAMES[1:]]

    out_dir = Path(f'{path}/cleaned/{split_name}')
    out_dir.mkdir(parents=True, exist_ok=True)

    tables = ray.get(cleaned_dfs)
    tables[0].reset_index().to_csv(f'{out_dir}/patients.csv', index_label='indx')
    for name, table in zip(table_names[1:], tables[1:]):
        table.to_csv(f'{out_dir}/{name}.csv')
    print(f'Saved cleaned "{split_name}" data to {out_dir}')

    if split_name != 'train':
        return split_name

    # vocab code tables only exist for the training split
    vocab_dir = Path(f'{out_dir}/codes')
    vocab_dir.mkdir(parents=True, exist_ok=True)
    for name, codes in zip(FILENAMES, ray.get(code_tables)):
        codes.to_csv(f'{vocab_dir}/code_{name}.csv', index_label='indx')
    print(f'Saved vocab code tables to {vocab_dir}')
    return split_name

In [None]:
#export
def clean_raw_ehrdata(path, valid_pct, test_pct, conditions_dict, today=None):
    '''Split, clean, preprocess raw EHR data & save cleaned data to disk

    Splits the raw data, launches `clean_preprocess_dataset` for the train, valid
    and test splits, then persists each one with `persist_cleaned` (only the train
    split carries code tables). Prints `Completed - <split>` as each save finishes.
    '''
    # 1. split raw data into raw_split/{train,valid,test}
    split_ehr_dataset(path, valid_pct, test_pct)

    # 2. launch cleaning per split; each call yields [data_tables_ref, code_tables_ref]
    split_refs = {
        split: clean_preprocess_dataset.remote(f'{path}/raw_split/{split}', split == 'train', conditions_dict, today)
        for split in ('train', 'valid', 'test')
    }

    # 3. persist every split in parallel
    pending = [
        persist_cleaned.remote(path, 'train', split_refs['train'][0], split_refs['train'][1]),
        persist_cleaned.remote(path, 'valid', split_refs['valid'][0]),
        persist_cleaned.remote(path, 'test', split_refs['test'][0]),
    ]

    # report splits in completion order
    while pending:
        done, pending = ray.wait(pending)
        for ref in done:
            print(f'Completed - {ray.get(ref)}')

In [None]:
# valid_pct=0.2, test_pct=0.2 -> train keeps the remaining 0.6 (see the printed split summary);
# the last argument is passed through as `today` to cleanup_pts
clean_raw_ehrdata(PATH_1K, 0.2, 0.2, CONDITIONS, SYNTHEA_DATAGEN_DATES['1K'])

Splits:: train: 0.6, valid: 0.2, test: 0.2
Split patients into:: Train: 702, Valid: 234, Test: 235 -- Total before split: 1171
Saved train data to /home/vinod/.lemonpie/datasets/synthea/1K/raw_split/train
Saved valid data to /home/vinod/.lemonpie/datasets/synthea/1K/raw_split/valid
Saved test data to /home/vinod/.lemonpie/datasets/synthea/1K/raw_split/test
Completed - valid
[2m[36m(pid=45737)[0m Saved cleaned "valid" data to /home/vinod/.lemonpie/datasets/synthea/1K/cleaned/valid
Completed - test
[2m[36m(pid=45735)[0m Saved cleaned "test" data to /home/vinod/.lemonpie/datasets/synthea/1K/cleaned/test
[2m[36m(pid=45728)[0m Saved cleaned "train" data to /home/vinod/.lemonpie/datasets/synthea/1K/cleaned/train
Completed - train


[2m[36m(pid=45728)[0m Saved vocab code tables to /home/vinod/.lemonpie/datasets/synthea/1K/cleaned/train/codes


In [None]:
#export
def load_cleaned_ehrdata(path):
    '''Load the cleaned EHR tables for the train, valid and test splits.

    Returns a `(train_dfs, valid_dfs, test_dfs)` tuple. Each element is a list of
    dataframes read from `{path}/cleaned/<split>/<name>.csv`, ordered like
    `FILENAMES` with `patient_demographics` inserted at position 1. The first CSV
    column becomes each dataframe's index.
    '''
    table_names = FILENAMES.copy()
    table_names.insert(1, 'patient_demographics')

    def _read_split(split):
        # one dataframe per table name, in table_names order
        return [pd.read_csv(f'{path}/cleaned/{split}/{name}.csv', low_memory=False, index_col=0)
                for name in table_names]

    return tuple(_read_split(split) for split in ('train', 'valid', 'test'))

In [None]:
#export
def load_ehr_vocabcodes(path):
    '''Load the vocab code tables saved for the train split.

    Reads `{path}/cleaned/train/codes/code_<name>.csv` for each name in `FILENAMES`
    and returns the dataframes as a list in that order. `na_filter=False` keeps empty
    cells and strings such as "NA" as literal strings instead of NaN.
    '''
    code_dfs = []
    for name in FILENAMES:
        csv_path = f'{path}/cleaned/train/codes/code_{name}.csv'
        code_dfs.append(pd.read_csv(csv_path, low_memory=False, na_filter=False, index_col=0))
    return code_dfs

In [None]:
# Reload the cleaned splits and the train vocab code tables written by clean_raw_ehrdata above
train_dfs, valid_dfs, test_dfs = load_cleaned_ehrdata(PATH_1K)
code_dfs = load_ehr_vocabcodes(PATH_1K)

In [None]:
# for df in train_dfs:
#     display(df.head())

In [None]:
thispt = train_dfs[0].iloc[20]

In [None]:
thispt

patient                     10134dbf-72d1-4381-b8f3-9530cca6622a
birthdate                                             1958-09-08
diabetes                                                    True
diabetes_age                                                52.0
stroke                                                      True
stroke_age                                                  60.0
alzheimers                                                 False
alzheimers_age                                               NaN
coronary_heart                                             False
coronary_heart_age                                           NaN
lung_cancer                                                False
lung_cancer_age                                              NaN
breast_cancer                                              False
breast_cancer_age                                            NaN
rheumatoid_arthritis                                       False
rheumatoid_arthritis_age 

In [None]:
# for df in code_dfs:
#     display(df.head())

Making sure condition counts match - after extracting `y` for each patient

In [None]:
CONDITIONS

{'diabetes': '44054006',
 'stroke': '230690007',
 'alzheimers': '26929004',
 'coronary_heart': '53741008',
 'lung_cancer': '254637007',
 'breast_cancer': '254837009',
 'rheumatoid_arthritis': '69896004',
 'epilepsy': '84757009'}

`patients` dfs after cleaning, with `y` extracted

In [None]:
# Table 0 of each split is `patients` (first entry of csv_names in load_cleaned_ehrdata)
pts_train, pts_valid, pts_test = train_dfs[0], valid_dfs[0], test_dfs[0]

`conditions` dfs

In [None]:
# Table 8 is `conditions`: index 7 in FILENAMES, shifted by one for the inserted `patient_demographics`
cnd_train, cnd_valid, cnd_test = train_dfs[8], valid_dfs[8], test_dfs[8]

Tests to ensure counts match

In [None]:
# export

def test_extract_ys(pt_dfs, cnd_dfs, conditions_dict=CONDITIONS):
    """Test for extract_ys function.

    For each split (train, valid, test) and each condition in `conditions_dict`,
    asserts that the number of patient rows flagged with the condition equals the
    number of *distinct* patients having a `<code>||START` record in the
    conditions df. Counting distinct patients (rather than raw START records)
    means a patient row duplicated once per repeated START record fails the
    check, instead of passing because both counts are inflated equally.

    Patient ids for each conditions df come from a `patient` column if present,
    otherwise from its index (as read back by `load_cleaned_ehrdata`).

    Raises AssertionError on the first mismatch.
    """
    for pts_df, cnds_df, split in zip(pt_dfs, cnd_dfs, ['train','valid','test']):
        print(f"Checking {split} dfs...")
        if 'patient' in cnds_df.columns:
            cnd_patient_ids = cnds_df['patient'].to_numpy()
        else:
            cnd_patient_ids = cnds_df.index.to_numpy()
        for this_cnd in conditions_dict.keys():
            code = f"{conditions_dict[this_cnd]}||START"
            # positional mask avoids label alignment on the (duplicated) patient index
            is_start = (cnds_df['code'] == code).to_numpy()
            cnds_df_counts = len(set(cnd_patient_ids[is_start]))
            pts_df_counts = len(pts_df[pts_df[this_cnd] == 1])
            assert cnds_df_counts == pts_df_counts, f"Error in {split} for {this_cnd} -- {cnds_df_counts} != {pts_df_counts}"

        print(f"Tests passed for {split} - all condition counts match")
    return

In [None]:
test_extract_ys([pts_train, pts_valid, pts_test],[cnd_train, cnd_valid, cnd_test])

Checking train dfs...
Tests passed for train - all condition counts match
Checking valid dfs...
Tests passed for valid - all condition counts match
Checking test dfs...
Tests passed for test - all condition counts match


In [None]:
# export

def get_label_counts(pt_dfs, conditions_dict=CONDITIONS):
    """Count, per split, how many patient rows are flagged with each condition.

    `pt_dfs` is expected in train/valid/test order, and at most three dataframes
    are considered. Returns a list holding one `{condition: count}` dict per split.
    """
    split_names = ['train', 'valid', 'test']
    return [
        {cnd: len(pts_df[pts_df[cnd] == 1]) for cnd in conditions_dict.keys()}
        for pts_df, _split in zip(pt_dfs, split_names)
    ]

In [None]:
get_label_counts([pts_train, pts_valid, pts_test])

[{'diabetes': 43,
  'stroke': 30,
  'alzheimers': 12,
  'coronary_heart': 39,
  'lung_cancer': 12,
  'breast_cancer': 11,
  'rheumatoid_arthritis': 2,
  'epilepsy': 15},
 {'diabetes': 14,
  'stroke': 7,
  'alzheimers': 7,
  'coronary_heart': 11,
  'lung_cancer': 0,
  'breast_cancer': 8,
  'rheumatoid_arthritis': 0,
  'epilepsy': 5},
 {'diabetes': 19,
  'stroke': 11,
  'alzheimers': 6,
  'coronary_heart': 11,
  'lung_cancer': 2,
  'breast_cancer': 2,
  'rheumatoid_arthritis': 0,
  'epilepsy': 2}]

# Ray Testing

In [None]:
SYNTHEA_DATAGEN_DATES

{'1K': '04-01-2021',
 '10K': '04-01-2021',
 '20K': '04-01-2021',
 '100K': '04-01-2021',
 '250K': '04-01-2021'}

## Before

**1K**
```
CPU times: user 4.41 s, sys: 218 ms, total: 4.63 s
Wall time: 4.66 s
```
**10K**
```
CPU times: user 1min 23s, sys: 5.58 s, total: 1min 28s
Wall time: 1min 29s
```

## After

### 1K

In [None]:
%time clean_raw_ehrdata(PATH_1K, 0.2, 0.2, CONDITIONS, SYNTHEA_DATAGEN_DATES['1K'])

Splits:: train: 0.6, valid: 0.2, test: 0.2
Split patients into:: Train: 702, Valid: 234, Test: 235 -- Total before split: 1171
Saved train data to /home/vinod/.lemonpie/datasets/synthea/1K/raw_split/train
Saved valid data to /home/vinod/.lemonpie/datasets/synthea/1K/raw_split/valid
Saved test data to /home/vinod/.lemonpie/datasets/synthea/1K/raw_split/test
Completed - valid
[2m[36m(pid=16347)[0m Saved cleaned "valid" data to /home/vinod/.lemonpie/datasets/synthea/1K/cleaned/valid
Completed - test
[2m[36m(pid=16346)[0m Saved cleaned "test" data to /home/vinod/.lemonpie/datasets/synthea/1K/cleaned/test
[2m[36m(pid=16349)[0m Saved cleaned "train" data to /home/vinod/.lemonpie/datasets/synthea/1K/cleaned/train
Completed - train
CPU times: user 2.58 s, sys: 165 ms, total: 2.74 s
Wall time: 4.57 s


[2m[36m(pid=16349)[0m Saved vocab code tables to /home/vinod/.lemonpie/datasets/synthea/1K/cleaned/train/codes


### 10K

In [None]:
%time clean_raw_ehrdata(PATH_10K, 0.2, 0.2, CONDITIONS, SYNTHEA_DATAGEN_DATES['10K'])

Splits:: train: 0.6, valid: 0.2, test: 0.2
Split patients into:: Train: 6645, Valid: 2215, Test: 2216 -- Total before split: 11076
Saved train data to /home/vinod/.lemonpie/datasets/synthea/10K/raw_split/train
Saved valid data to /home/vinod/.lemonpie/datasets/synthea/10K/raw_split/valid
Saved test data to /home/vinod/.lemonpie/datasets/synthea/10K/raw_split/test
Completed - valid
[2m[36m(pid=16353)[0m Saved cleaned "valid" data to /home/vinod/.lemonpie/datasets/synthea/10K/cleaned/valid
Completed - test
[2m[36m(pid=16346)[0m Saved cleaned "test" data to /home/vinod/.lemonpie/datasets/synthea/10K/cleaned/test
[2m[36m(pid=16349)[0m Saved cleaned "train" data to /home/vinod/.lemonpie/datasets/synthea/10K/cleaned/train
[2m[36m(pid=16349)[0m Saved vocab code tables to /home/vinod/.lemonpie/datasets/synthea/10K/cleaned/train/codes
Completed - train
CPU times: user 49.1 s, sys: 4.39 s, total: 53.5 s
Wall time: 1min 31s


### 20K

In [None]:
%time clean_raw_ehrdata(PATH_20K, 0.2, 0.2, CONDITIONS, SYNTHEA_DATAGEN_DATES['20K'])

Splits:: train: 0.6, valid: 0.2, test: 0.2
Split patients into:: Train: 13317, Valid: 4439, Test: 4439 -- Total before split: 22195
Saved train data to /home/vinod/.lemonpie/datasets/synthea/20K/raw_split/train
Saved valid data to /home/vinod/.lemonpie/datasets/synthea/20K/raw_split/valid
Saved test data to /home/vinod/.lemonpie/datasets/synthea/20K/raw_split/test
Completed - valid
[2m[36m(pid=16339)[0m Saved cleaned "valid" data to /home/vinod/.lemonpie/datasets/synthea/20K/cleaned/valid
Completed - test
[2m[36m(pid=16346)[0m Saved cleaned "test" data to /home/vinod/.lemonpie/datasets/synthea/20K/cleaned/test
[2m[36m(pid=16349)[0m Saved cleaned "train" data to /home/vinod/.lemonpie/datasets/synthea/20K/cleaned/train
[2m[36m(pid=16349)[0m Saved vocab code tables to /home/vinod/.lemonpie/datasets/synthea/20K/cleaned/train/codes
Completed - train
CPU times: user 1min 42s, sys: 12.3 s, total: 1min 54s
Wall time: 3min 10s


### 100K

In [None]:
%time clean_raw_ehrdata(PATH_100K, 0.2, 0.2, CONDITIONS, SYNTHEA_DATAGEN_DATES['100K'])

Splits:: train: 0.6, valid: 0.2, test: 0.2
Split patients into:: Train: 66523, Valid: 22174, Test: 22175 -- Total before split: 110872
Saved train data to /home/vinod/.lemonpie/datasets/synthea/100K/raw_split/train
Saved valid data to /home/vinod/.lemonpie/datasets/synthea/100K/raw_split/valid
Saved test data to /home/vinod/.lemonpie/datasets/synthea/100K/raw_split/test
[2m[36m(pid=16350)[0m Saved cleaned "test" data to /home/vinod/.lemonpie/datasets/synthea/100K/cleaned/test
Completed - test
[2m[36m(pid=16353)[0m Saved cleaned "valid" data to /home/vinod/.lemonpie/datasets/synthea/100K/cleaned/valid
Completed - valid
[2m[36m(pid=16349)[0m Saved cleaned "train" data to /home/vinod/.lemonpie/datasets/synthea/100K/cleaned/train
[2m[36m(pid=16349)[0m Saved vocab code tables to /home/vinod/.lemonpie/datasets/synthea/100K/cleaned/train/codes
Completed - train
CPU times: user 10min 21s, sys: 3min 6s, total: 13min 28s
Wall time: 19min 46s


### Execution Times

**1K**
```
CPU times: user 2.41 s, sys: 93.8 ms, total: 2.5 s
Wall time: 4.28 s
```
**10K**
```
CPU times: user 46.6 s, sys: 3.89 s, total: 50.5 s
Wall time: 1min 27s
```
**20K**
```
CPU times: user 1min 35s, sys: 11 s, total: 1min 46s
Wall time: 2min 59s
```
**100K**
```
CPU times: user 10min 21s, sys: 3min 6s, total: 13min 28s
Wall time: 19min 46s
```

## Tests

In [None]:
# export

def test_cleaned_ehrdata(dataset_paths = [PATH_1K, PATH_10K, PATH_20K, PATH_100K]):
    """Load each cleaned dataset and sanity-check it.

    For every path: loads the cleaned train/valid/test tables and the vocab code
    tables, runs `test_extract_ys` on each split's patients (table 0) and
    conditions (table 8), then prints the per-split label counts.
    The default list is only read, never mutated.
    """
    for dataset_path in dataset_paths:
        train, valid, test = load_cleaned_ehrdata(dataset_path)
        load_ehr_vocabcodes(dataset_path)  # loaded only to confirm the code tables are readable
        pt_dfs = [train[0], valid[0], test[0]]
        cnd_dfs = [train[8], valid[8], test[8]]

        print("\n----------")
        print(f"Running tests for {dataset_path}")
        test_extract_ys(pt_dfs, cnd_dfs)
        print("Label counts")
        for counts in get_label_counts(pt_dfs):
            print(counts)

In [None]:
test_cleaned_ehrdata()


----------
Running tests for /home/vinod/.lemonpie/datasets/synthea/1K
Checking train dfs...
Tests passed for train - all condition counts match
Checking valid dfs...
Tests passed for valid - all condition counts match
Checking test dfs...
Tests passed for test - all condition counts match
Label counts
{'diabetes': 43, 'stroke': 30, 'alzheimers': 12, 'coronary_heart': 39, 'lung_cancer': 12, 'breast_cancer': 11, 'rheumatoid_arthritis': 2, 'epilepsy': 15}
{'diabetes': 14, 'stroke': 7, 'alzheimers': 7, 'coronary_heart': 11, 'lung_cancer': 0, 'breast_cancer': 8, 'rheumatoid_arthritis': 0, 'epilepsy': 5}
{'diabetes': 19, 'stroke': 11, 'alzheimers': 6, 'coronary_heart': 11, 'lung_cancer': 2, 'breast_cancer': 2, 'rheumatoid_arthritis': 0, 'epilepsy': 2}

----------
Running tests for /home/vinod/.lemonpie/datasets/synthea/10K
Checking train dfs...
Tests passed for train - all condition counts match
Checking valid dfs...
Tests passed for valid - all condition counts match
Checking test dfs...


## Export -

In [None]:
#hide
from nbdev.export import *
notebook2script()

Converted 00_basics.ipynb.
Converted 01_preprocessing_clean.ipynb.
Converted 02_preprocessing_vocab.ipynb.
Converted 03_preprocessing_transform.ipynb.
Converted 04_data.ipynb.
Converted 05_metrics.ipynb.
Converted 06_learn.ipynb.
Converted 07_models.ipynb.
Converted 08_experiment.ipynb.
Converted 998_coherent_clean.ipynb.
Converted 999_amp_testing.ipynb.
Converted 99_quick_walkthru.ipynb.
Converted 99_running_exps.ipynb.
Converted index.ipynb.
