In [0]:
import tarfile, gzip
import os as os
import json, glob
import re
import shutil
from collections import Counter
from collections import defaultdict
from itertools import chain
from datetime import datetime

import numpy as np
import pandas as pd
from dateutil.relativedelta import relativedelta

# Cap DataFrame previews at 30 rows so displayed frames stay short.
pd.options.display.max_rows = 30

In [0]:
if (os.path.exists("synthea-data-chf.tar.gz") == False):
    !wget http://public.gi.ucsc.edu/~rcurrie/synthea-data-chf.tar.gz
if (os.path.exists("synthea-data-myocardial-infarction.tar.gz") == False):
    !wget http://public.gi.ucsc.edu/~rcurrie/synthea-data-myocardial-infarction.tar.gz

In [0]:
class CHFDataset:
    """Per-patient timelines for congestive-heart-failure rehospitalisation.

    Attributes:
        dataset: ``None`` until ``set_dataset`` runs; afterwards a dict mapping
            patient id to ``{'codes', 'chf_first_discharge', 'chf_rehosp'}``.
        trainingset: ``None`` until ``set_trainingset`` runs; afterwards the
            value returned by ``DFUtil.generate_trainingset``.
    """

    def __init__(self):
        self.trainingset = None
        self.dataset = None

    def set_dataset(self, path, sample_size=None):
        """Rebuild ``self.dataset`` from the patient bundles found at ``path``.

        For every patient yielded by ``ExtractJsonFile(path, sample_size)`` the
        events produced by ``ExtractCodes`` are scanned in order:

        * the first Encounter with class code "IMP" or "EMER" whose reason is
          "Chronic congestive heart failure (disorder)" stores its end date in
          ``chf_first_discharge``;
        * a later such Encounter is recorded in ``chf_rehosp`` (its start date)
          when the daily date range from the first discharge to that start has
          more than 29 entries; scanning of that patient then stops and the
          rehospitalisation event itself is not added to ``codes``;
        * every other scanned event is appended to ``codes`` as
          ``[start_date, reason_code]``.
        """
        chf_reason = "Chronic congestive heart failure (disorder)"
        hospital_classes = ("IMP", "EMER")

        records = {}
        self.dataset = records

        for json_patient in ExtractJsonFile(path, sample_size):
            patient_id = json_patient['entry'][0]['resource']['id']
            record = {'codes': [], 'chf_first_discharge': None, 'chf_rehosp': None}
            records[patient_id] = record

            for bundle in ExtractCodes(json_patient):
                is_chf_stay = (
                    bundle['resource_type'] == "Encounter"
                    and bundle['encounter_code'] in hospital_classes
                    and bundle['reason_code'] == chf_reason
                )
                if is_chf_stay:
                    first_discharge = record['chf_first_discharge']
                    if first_discharge is None:
                        record['chf_first_discharge'] = bundle['end_date']
                    elif record['chf_rehosp'] is None:
                        days_spanned = len(pd.date_range(first_discharge, bundle['start_date']))
                        if days_spanned > 29:
                            record['chf_rehosp'] = bundle['start_date']
                            break

                record['codes'].append([bundle['start_date'], bundle['reason_code']])

    def set_trainingset(self, label=None, filter=None, window_range=24, step_size=0, num_datasets=1):
        """Store the result of ``DFUtil.generate_trainingset`` for ``self.dataset``.

        All arguments are forwarded unchanged.
        """
        self.trainingset = DFUtil.generate_trainingset(
            self.dataset,
            label=label,
            filter=filter,
            window_range=window_range,
            step_size=step_size,
            num_datasets=num_datasets,
        )


In [0]:
class MYINFDataset:
    """Per-patient timelines leading up to a myocardial-infarction hospitalisation.

    Attributes:
        dataset: ``None`` until ``set_dataset`` runs; afterwards a dict mapping
            patient id to ``{'codes', 'myinf_hosp'}``.
        trainingset: ``None`` until ``set_trainingset`` runs; afterwards the
            value returned by ``DFUtil.generate_trainingset``.
    """

    def __init__(self):
        self.trainingset = None
        self.dataset = None

    def set_dataset(self, path, sample_size=None):
        """Rebuild ``self.dataset`` from the patient bundles found at ``path``.

        For every patient yielded by ``ExtractJsonFile(path, sample_size)`` the
        events produced by ``ExtractCodes`` are scanned in order. The first
        Encounter with class code "IMP" or "EMER" and reason
        "Myocardial Infarction" stores its start date in ``myinf_hosp`` and ends
        the scan for that patient (the event itself is not added to ``codes``).
        Every event scanned before that is appended to ``codes`` as
        ``[start_date, reason_code]``.
        """
        mi_reason = "Myocardial Infarction"
        hospital_classes = ("IMP", "EMER")

        records = {}
        self.dataset = records

        for json_patient in ExtractJsonFile(path, sample_size):
            patient_id = json_patient['entry'][0]['resource']['id']
            record = {'codes': [], 'myinf_hosp': None}
            records[patient_id] = record

            for bundle in ExtractCodes(json_patient):
                is_mi_stay = (
                    bundle['resource_type'] == "Encounter"
                    and bundle['encounter_code'] in hospital_classes
                    and bundle['reason_code'] == mi_reason
                )
                if is_mi_stay and record['myinf_hosp'] is None:
                    record['myinf_hosp'] = bundle['start_date']
                    break

                record['codes'].append([bundle['start_date'], bundle['reason_code']])

    def set_trainingset(self, label=None, filter=None, window_range=24, step_size=0, num_datasets=1):
        """Store the result of ``DFUtil.generate_trainingset`` for ``self.dataset``.

        All arguments are forwarded unchanged.
        """
        self.trainingset = DFUtil.generate_trainingset(
            self.dataset,
            label=label,
            filter=filter,
            window_range=window_range,
            step_size=step_size,
            num_datasets=num_datasets,
        )


In [0]:
class ExtractJsonFile:
    """Iterable over the JSON documents stored in a gzip-compressed tar archive.

    Each iteration opens the archive at ``path`` and yields one
    ``pandas.DataFrame`` (the result of ``pd.read_json``) per archive member.

    Args:
        path: Location of the ``.tar.gz`` archive.
        sample_size: Stop after this many documents have been yielded;
            ``None`` (the default) reads the whole archive.
    """

    def __init__(self, path, sample_size=None):
        self.path = path
        self.sample_size = sample_size

    def __iter__(self):
        processed = 0
        next_report = 1  # progress is printed after 1, 2, 4, 8, ... documents

        with tarfile.open(self.path, "r:gz") as archive:
            for member in archive:
                if member.isdir():
                    continue

                payload = archive.extractfile(member)
                if payload is None:
                    # extractfile() returns None for members that are neither
                    # regular files nor links (FIFOs, devices, ...); there is
                    # nothing to parse, so skip them instead of crashing.
                    continue

                # Close the extracted stream as soon as it has been parsed.
                with payload:
                    document = pd.read_json(payload)
                yield document

                processed += 1
                if processed == next_report:
                    print("Processed " + str(processed) + " files")
                    next_report *= 2
                if self.sample_size == processed:
                    break
                    

In [0]:
class ExtractCodes:
    """Iterable over the dated clinical events of one patient bundle.

    ``json_patient['entry']`` is walked in order. Entries whose
    ``resource['resourceType']`` is Encounter, Observation, Procedure,
    Condition, Immunization or MedicationRequest each yield a dict; every
    other resource type is ignored.

    Yields:
        For Encounters:
            ``{'resource_type', 'start_date', 'end_date', 'reason_code', 'encounter_code'}``
        For the other handled types:
            ``{'resource_type', 'start_date', 'reason_code'}``
        Dates are the first 10 characters of the source timestamp and
        ``reason_code`` is the ``display`` text of the first coding.
    """

    # resource type -> (key path to the timestamp, key of the concept whose
    # first coding supplies the display text)
    _DATED_RESOURCES = {
        "Observation": (("effectiveDateTime",), "code"),
        "Procedure": (("performedPeriod", "start"), "code"),
        "Condition": (("onsetDateTime",), "code"),
        "Immunization": (("occurrenceDateTime",), "vaccineCode"),
        "MedicationRequest": (("authoredOn",), "medicationCodeableConcept"),
    }

    def __init__(self, json_patient):
        self.json_patient = json_patient

    def __iter__(self):
        for entry in self.json_patient['entry']:
            resource = entry['resource']
            resource_type = resource['resourceType']

            if resource_type == "Encounter":
                start_date = resource['period']['start'][0:10]
                end_date = resource['period']['end'][0:10]
                encounter_code = resource['class']['code']
                try:
                    reason_code = resource['reasonCode'][0]['coding'][0]['display']
                except (KeyError, IndexError, TypeError):
                    # No usable reasonCode: fall back to the encounter type.
                    # Only lookup failures are handled so that unrelated errors
                    # (and KeyboardInterrupt) are no longer swallowed.
                    reason_code = resource['type'][0]['coding'][0]['display']

                yield {'resource_type': "Encounter", 'start_date': start_date, 'end_date': end_date,
                       'reason_code': reason_code, 'encounter_code': encounter_code}

            elif resource_type in self._DATED_RESOURCES:
                date_path, concept_key = self._DATED_RESOURCES[resource_type]
                timestamp = resource
                for key in date_path:
                    timestamp = timestamp[key]
                start_date = timestamp[0:10]
                reason_code = resource[concept_key]['coding'][0]['display']
                yield {'resource_type': resource_type, 'start_date': start_date, 'reason_code': reason_code}

In [0]:
class DFUtil:
    """Static helpers that turn per-patient code timelines into monthly indicator frames."""

    @staticmethod
    def generate_trainingset(dataset, label=None, filter=None, window_range=24, step_size=0, num_datasets=1):
        """Build one normalised monthly frame per patient and per window offset.

        Args:
            dataset: Dict mapping patient id to a record holding ``'codes'``
                (a list of ``[date, code]`` pairs) and the ``label`` key.
            label: Record key used as the training label. Every record must
                contain it (the value may be ``None``); the ``None`` default is
                kept for signature compatibility but is not a usable value.
            filter: Optional record key; patients whose value for it is
                missing are dropped. ``None`` keeps every patient.
            window_range: Length of each window in months.
            step_size: Months by which each successive window is moved back.
            num_datasets: Number of windows generated per patient
                (offsets ``0, step_size, 2 * step_size, ...``).

        Returns:
            A flat list of DataFrames sharing the same columns, with the label
            column last. An empty ``dataset`` yields an empty list.
        """
        if not dataset:
            return []

        patients = pd.DataFrame.from_dict(dataset).T.reset_index().rename(columns={'index': 'id'})
        # The drop depends on `filter` being supplied, not on `label`:
        # dropna(subset=[None]) raises KeyError.
        if filter is not None:
            patients = patients.dropna(subset=[filter])

        frames = []
        for i in range(num_datasets):
            frames.extend(DFUtil.df_to_buckets(patients, dataset, label, window_range, i * step_size))

        return DFUtil.normalize(frames, label)

    # Converts a DataFrame of codes into monthly buckets
    @staticmethod
    def df_to_buckets(df, dict_patients, label, window_range=24, offset=0):
        """Convert each patient's codes into a frame with one row per month-end.

        The window ends one month after the patient's label date (or, when the
        label value is missing or not a ``YYYY-MM-DD`` string, one month after
        the date of the last code), moved back by ``offset`` months, and spans
        ``window_range`` months. Rows correspond to the month-ends that fall
        inside the window; each code column is 1.0 in the months where the code
        occurs and 0 elsewhere. Codes dated after the last month-end inside the
        window are not represented.

        The label column is 1.0 only when the patient's label value is not
        ``None`` and ``offset`` is 0; otherwise it is 0.0.

        Patients with no codes, or whose code dates cannot be parsed, are
        skipped.

        Args:
            df: Frame with at least the columns ``'id'`` and ``'codes'``.
            dict_patients: Dict mapping patient id to its record.
            label: Record key of the label; must exist in every record.
            window_range: Window length in months.
            offset: Months to move the window back.

        Returns:
            List of DataFrames, one per retained patient, in row order.
        """
        # A MonthEnd offset object is what the 'M' alias stands for; passing the
        # object avoids relying on an alias that newer pandas releases deprecate.
        month_end = pd.offsets.MonthEnd()
        frames = []

        for _, row in df.iterrows():
            patient_id = row['id']
            patient_codes = row['codes']
            if len(patient_codes) == 0:
                # Nothing to bucket and no fallback date to anchor the window on.
                continue

            label_value = dict_patients[patient_id][label]

            # Anchor the window on the label date, else on the latest code date.
            try:
                end_range = pd.to_datetime(datetime.strptime(label_value, '%Y-%m-%d').date())
            except (TypeError, ValueError):
                end_range = pd.to_datetime(datetime.strptime(patient_codes[-1][0], '%Y-%m-%d').date())
            # Applied one after the other on purpose: month arithmetic clips to
            # the last day of the month, so the two steps cannot be merged.
            end_range = end_range + pd.DateOffset(months=1) - pd.DateOffset(months=offset)
            start_range = end_range - pd.DateOffset(months=window_range)

            timeline = pd.DataFrame(patient_codes).rename(columns={0: 'date', 1: 'codes'})
            try:
                timeline['date'] = pd.to_datetime(timeline['date'])
            except (KeyError, TypeError, ValueError):
                continue

            in_window = timeline[timeline['date'].between(start_range, end_range)].set_index('date')
            months = pd.date_range(start_range, end_range, freq=month_end)

            if in_window.empty:
                # No codes inside the window: one row per month, no code columns.
                bucket = pd.DataFrame(index=range(len(months)))
            else:
                # Group codes by month
                monthly = (
                    in_window.groupby(pd.Grouper(freq=month_end))
                    .aggregate(lambda x: tuple(x))
                    .reset_index()
                )
                # One indicator column per code, 1.0 where the code occurs
                indicators = pd.DataFrame(
                    [dict.fromkeys(codes, 1.0) for codes in monthly['codes']]
                ).fillna(0)
                bucket = (
                    monthly.join(indicators)
                    .drop(columns=['codes'])
                    .set_index('date')
                    .reindex(months, fill_value=0)  # fill in missing months
                    .reset_index(drop=True)         # remove date as index
                )

            # Add label
            bucket[label] = 1.0 if (label_value is not None and offset == 0) else 0.0
            frames.append(bucket)

        return frames

    @staticmethod
    def normalize(bucket_frames, training_label):
        """Give every frame the same columns: sorted features, then the label.

        Columns absent from a frame are added and filled with 0. The input
        frames are not modified.

        Raises:
            KeyError: If frames are given but none has ``training_label``.
        """
        all_columns = set()
        for frame in bucket_frames:
            all_columns.update(frame.columns.tolist())

        if bucket_frames and training_label not in all_columns:
            raise KeyError(training_label)

        ordered = sorted(col for col in all_columns if col != training_label)
        ordered.append(training_label)  # training label goes last

        return [frame.reindex(columns=ordered, fill_value=0) for frame in bucket_frames]

    @staticmethod
    def shuffleColumns(dfs, training_label, num_shuffled):
        """Return ``num_shuffled`` copies of ``dfs`` with randomly ordered columns.

        For each copy a random order of the first frame's columns is drawn with
        ``np.random.permutation``, the label is moved to the end, and every
        frame is reindexed to that order. Only the column order changes: values
        and dtypes are left as they are. The input list and its frames are not
        modified.

        Args:
            dfs: List of DataFrames; the first one defines the column set.
            training_label: Name of the label column, kept last.
            num_shuffled: Number of shuffled copies to produce.

        Returns:
            List of ``num_shuffled`` lists of DataFrames. When ``dfs`` is empty
            each inner list is empty.

        Raises:
            KeyError: If the first frame has no ``training_label`` column.
        """
        if len(dfs) == 0:
            return [[] for _ in range(num_shuffled)]

        reference_columns = dfs[0].columns.tolist()
        if training_label not in reference_columns:
            raise KeyError(training_label)

        dfs_shuffled = []
        for _ in range(num_shuffled):
            order = [reference_columns[i] for i in np.random.permutation(len(reference_columns))]
            # move training_label to end of dataframe
            order.remove(training_label)
            order.append(training_label)
            dfs_shuffled.append([frame.reindex(columns=order) for frame in dfs])

        return dfs_shuffled

In [0]:
class OSUtil:
    """Filesystem helpers for saving, reloading, archiving and removing training sets.

    Implemented with ``os``/``glob``/``shutil`` rather than shell commands, so
    paths containing spaces or shell metacharacters are handled literally and
    no external ``zip``/``rm``/``mkdir`` binaries are needed.
    """

    @staticmethod
    def trainingset_from_csv(path):
        """Load every readable CSV in ``path`` into a list of DataFrames.

        Files are visited in natural name order (``patient2.csv`` before
        ``patient10.csv``), which is the order ``export_csv`` wrote them in.
        Each file is read with its ``'Unnamed: 0'`` column as the index. A file
        that cannot be read this way is reported with ``print`` and skipped.

        Args:
            path: Directory containing the CSV files.

        Returns:
            List of DataFrames, one per successfully read file.
        """
        def natural_key(name):
            # re.split with a capturing group alternates text and digit runs,
            # so odd positions are always the numeric parts.
            return [int(part) if i % 2 else part
                    for i, part in enumerate(re.split(r'(\d+)', name))]

        dfs_trainingset = []
        for file in sorted(os.listdir(path), key=natural_key):
            try:
                # listdir() returns bare names; they must be joined with `path`
                # or the read would be resolved against the working directory.
                dfs_trainingset.append(pd.read_csv(os.path.join(path, file), index_col='Unnamed: 0'))
            except Exception as e:
                print(e)
                continue
        return dfs_trainingset

    @staticmethod
    def export_csv(path, dfs, overwrite=False):
        """Write each frame in ``dfs`` to ``<path>/patient<i>.csv``.

        Args:
            path: Target directory; created (with parents) when missing.
            dfs: Sequence of DataFrames to export.
            overwrite: When true, delete the ``*.csv`` files already in
                ``path`` before exporting.
        """
        os.makedirs(path, exist_ok=True)
        if overwrite:
            # Expand the pattern in Python; `path` itself is escaped so that
            # only the trailing "*.csv" acts as a wildcard.
            for stale in glob.glob(os.path.join(glob.escape(path), '*.csv')):
                os.remove(stale)
        print('exporting ' + path)
        for i, frame in enumerate(dfs):
            frame.to_csv(os.path.join(path, 'patient' + str(i) + '.csv'))
        print('done')

    @staticmethod
    def zip_folder(input_path, output_path):
        """Archive the directory ``input_path`` into ``<output_path>.zip``.

        An existing archive of that name is replaced.
        """
        print('compressing ' + input_path)
        shutil.make_archive(output_path, 'zip', base_dir=input_path)
        print('finished')

    @staticmethod
    def delete_folder(path):
        """Remove ``path`` and everything below it; a missing path is not an error."""
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path, ignore_errors=True)
        elif os.path.lexists(path):
            try:
                os.remove(path)
            except OSError:
                # Best effort: removal failures are ignored.
                pass

### Create training data from congestive heart failure patients

In [0]:
# Empty container: chf.dataset and chf.trainingset stay None until the setters are called.
chf = CHFDataset()

In [0]:
# Read patient bundles from the CHF archive; sample_size=16 stops after 16 files.
chf.set_dataset("synthea-data-chf.tar.gz", sample_size=16)

In [0]:
# One 24-month window per patient, labelled by 'chf_rehosp'; patients with no
# 'chf_first_discharge' value are dropped before the windows are built.
chf.set_trainingset(label="chf_rehosp", filter="chf_first_discharge", window_range=24)

### Create training data from myocardial infarction patients



In [0]:
# Empty container: myinf.dataset and myinf.trainingset stay None until the setters are called.
myinf = MYINFDataset()

In [0]:
# Read patient bundles from the myocardial-infarction archive; sample_size=16 stops after 16 files.
myinf.set_dataset("synthea-data-myocardial-infarction.tar.gz", sample_size=16)

In [0]:
# Six 6-month windows per patient, each moved one month further back (offsets 0-5).
# Patients with no 'myinf_hosp' value are dropped; only the offset-0 window gets label 1.0.
myinf.set_trainingset(label="myinf_hosp", filter="myinf_hosp", window_range=6, step_size=1, num_datasets=6)

### Export dataframes to csv files and zip

In [0]:
#OSUtil.export_csv("csv-chf", chf.trainingset)

In [0]:
#OSUtil.zip_folder('csv-chf', 'csv-chf')