# Data Processing With PySpark

In this notebook, we refactor the data processing script to leverage the usage of big data tool (pysark)


In [1]:
import sys, time
sys.path.append('../')
import datasets
from dataproc import extract_wvs
from dataproc import get_discharge_summaries
from dataproc import concat_and_split
from dataproc import build_vocab
from dataproc import vocab_index_descriptions
from dataproc import word_embeddings
from constants import MIMIC_3_DIR, DATA_DIR
from collections import Counter, defaultdict
import pandas as pd
import operator

import pyspark.sql.functions as F
from pyspark.sql.types import StringType, IntegerType

Some contants for later use. PySpark configuration is also set here.

In [2]:
from pyspark.sql import SparkSession

Y = 'full' #use all available labels in the dataset for prediction
notes_file = '%s/NOTEEVENTS.csv' % MIMIC_3_DIR # raw note events downloaded from MIMIC-III
vocab_size = 'full' #don't limit the vocab size to a specific number
vocab_min = 3 #discard tokens appearing in fewer than this many documents

spark = SparkSession.builder.master("local[*]").appName("NLP").getOrCreate()
conf = spark.sparkContext._conf.setAll([('spark.executor.memory', '8g'), ('spark.executor.cores', '3'), 
                                        ('spark.cores.max', '3'), ('spark.driver.memory','8g')])

spark.catalog.clearCache()
spark.sparkContext.getConf().getAll()

[('spark.executor.id', 'driver'),
 ('spark.app.id', 'local-1573964629429'),
 ('spark.cores.max', '3'),
 ('spark.driver.port', '44139'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.memory', '8g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.memory', '8g'),
 ('spark.app.name', 'NLP'),
 ('spark.executor.cores', '3'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.host', '192.168.1.109'),
 ('spark.ui.showConsoleProgress', 'true')]

## Combine diagnosis and procedure codes and reformat them

The codes in MIMIC-III are given in separate files for procedures and diagnoses, and the codes are given without periods, which might lead to collisions if we naively combine them. So we have to add the periods back in the right place.



In [3]:
df_procedure = spark.read.csv('%s/PROCEDURES_ICD.csv' % MIMIC_3_DIR, header=True)
df_diagnoses = spark.read.csv('%s/DIAGNOSES_ICD.csv' % MIMIC_3_DIR, header=True)

Get the information of dataframes

In [4]:
print(df_procedure.columns)
print(df_diagnoses.columns)
print(df_procedure.count())
print(df_diagnoses.count())

['ROW_ID', 'SUBJECT_ID', 'HADM_ID', 'SEQ_NUM', 'ICD9_CODE']
['ROW_ID', 'SUBJECT_ID', 'HADM_ID', 'SEQ_NUM', 'ICD9_CODE']
240095
651047


Print top 5 rows of each dataframe

In [5]:
df_procedure.show(5)
df_diagnoses.show(5)

+------+----------+-------+-------+---------+
|ROW_ID|SUBJECT_ID|HADM_ID|SEQ_NUM|ICD9_CODE|
+------+----------+-------+-------+---------+
|   944|     62641| 154460|      3|     3404|
|   945|      2592| 130856|      1|     9671|
|   946|      2592| 130856|      2|     3893|
|   947|     55357| 119355|      1|     9672|
|   948|     55357| 119355|      2|     0331|
+------+----------+-------+-------+---------+
only showing top 5 rows

+------+----------+-------+-------+---------+
|ROW_ID|SUBJECT_ID|HADM_ID|SEQ_NUM|ICD9_CODE|
+------+----------+-------+-------+---------+
|  1297|       109| 172335|      1|    40301|
|  1298|       109| 172335|      2|      486|
|  1299|       109| 172335|      3|    58281|
|  1300|       109| 172335|      4|     5855|
|  1301|       109| 172335|      5|     4254|
+------+----------+-------+-------+---------+
only showing top 5 rows



Define UDF to process the column data and parse the ICD9 code to regular format (with period)

In [6]:
def reformat (code, file_type):
    """
        Put a period in the right place because the MIMIC-3 data files exclude them.
        Generally, procedure codes have dots after the first two digits, 
        while diagnosis codes have dots after the first three digits.
    """
    if code is None:
        return code
    code = ''.join(code.split('.'))
    if file_type == "diagnoses":
        if code.startswith('E'):
            if len(code) > 4:
                code = code[:4] + '.' + code[4:]
        else:
            if len(code) > 3:
                code = code[:3] + '.' + code[3:]
    else:
        code = str(int(code))
        code = code[:2] + '.' + code[2:]
    return code 

code_reformat_udf = F.udf(reformat, StringType())

In [7]:
df_diagnoses = df_diagnoses.withColumn("absolute_code", code_reformat_udf(df_diagnoses["ICD9_CODE"], F.lit("diagnoses")))
df_procedure = df_procedure.withColumn("absolute_code", code_reformat_udf(df_procedure["ICD9_CODE"], F.lit("procedure")))
df_codes = df_diagnoses.union(df_procedure)
df_codes.show(5)

+------+----------+-------+-------+---------+-------------+
|ROW_ID|SUBJECT_ID|HADM_ID|SEQ_NUM|ICD9_CODE|absolute_code|
+------+----------+-------+-------+---------+-------------+
|  1297|       109| 172335|      1|    40301|       403.01|
|  1298|       109| 172335|      2|      486|          486|
|  1299|       109| 172335|      3|    58281|       582.81|
|  1300|       109| 172335|      4|     5855|        585.5|
|  1301|       109| 172335|      5|     4254|        425.4|
+------+----------+-------+-------+---------+-------------+
only showing top 5 rows



Export all codes to a local file

In [8]:
df_codes.toPandas().to_csv('%s/pyspark/ALL_CODES.csv' % MIMIC_3_DIR, index=False,
               columns=['ROW_ID', 'SUBJECT_ID', 'HADM_ID', 'SEQ_NUM', 'absolute_code'],
               header=['ROW_ID', 'SUBJECT_ID', 'HADM_ID', 'SEQ_NUM', 'ICD9_CODE'])

Print the number of all codes after using the pyspark method. And compare with the outputs from Pandas ETL methods

In [9]:
import pandas as pd
#In the full dataset (not just discharge summaries)
df = spark.read.csv('%s/pyspark/ALL_CODES.csv' % MIMIC_3_DIR, header=True)
df_code_unique = set(df.select('ICD9_CODE').distinct().rdd.map(lambda x: x[0]).collect())
df_reference = pd.read_csv('%s/ALL_CODES.csv' % MIMIC_3_DIR, dtype={"ICD9_CODE": str})
all_code = set(df_reference['ICD9_CODE'].unique())
print("Unique code number from Pandas ETL Script: ", len(all_code))
print("Unique code number from PySpark ETL Script:", len(df_code_unique))
print("Number of different code: ", len(df_code_unique.difference(all_code)))
print("Different code: ", df_code_unique.difference(all_code))

Unique code number from Pandas ETL Script:  8994
Unique code number from PySpark ETL Script: 8994
Number of different code:  1
Different code:  {None}


## Tokenize and preprocess raw text

Preprocessing time!

In the original notebook, the discharge summaries were processed line by line without any dataset level processing. Here, we will use pyspark to process the data.

First, we created a UDF to tokenize the discharge text information

In [10]:
from nltk.tokenize import RegexpTokenizer
#retain only alphanumeric
tokenizer = RegexpTokenizer(r'\w+')
def tokenize_discharge(note):
    """
    This function will tokenize discharge summaries in the note column
    It will tokenize discharge summarizes
    :param note: text format note
    :return: tokenized strings
    """
    tokens = [t.lower() for t in tokenizer.tokenize(note) if not t.isnumeric()]
    text = ' '.join(tokens)
    return text
tokenize_discharge_udf = F.udf(tokenize_discharge, StringType())

Next, we will process the discharge summary. 
This will:
- Select only discharge summaries and their addenda
- remove punctuation and numeric-only tokens, removing 500 but keeping 250mg
- lowercase all tokens
Then, we will save a copy of tokenized discharge file in outoput.
To compare the performance to PySpark processing against

In [11]:
df_discharge = spark.read.option("multiLine", True).option("escape", "\"").\
                csv("%s/NOTEEVENTS.csv" %MIMIC_3_DIR, header=True)
start = time.time()
df_discharge = df_discharge.filter(df_discharge['CATEGORY'] == "Discharge summary")\
    .withColumn("tokens", tokenize_discharge_udf(df_discharge['TEXT']))
print("Saving discharge summary to a file...")
df_discharge = df_discharge.select("SUBJECT_ID","HADM_ID","CHARTTIME","tokens")
df_discharge = df_discharge.withColumnRenamed("tokens", "TEXT")
df_discharge.write.csv('%s/pyspark/hdfs/disch_full.csv' % MIMIC_3_DIR, header=True, mode="overwrite")
end = time.time()
print("Total Processing time: ", end-start, " seconds")
df_discharge.show(5)

Dischange dataframe schema:  StructType(List(StructField(ROW_ID,StringType,true),StructField(SUBJECT_ID,StringType,true),StructField(HADM_ID,StringType,true),StructField(CHARTDATE,StringType,true),StructField(CHARTTIME,StringType,true),StructField(STORETIME,StringType,true),StructField(CATEGORY,StringType,true),StructField(DESCRIPTION,StringType,true),StructField(CGID,StringType,true),StructField(ISERROR,StringType,true),StructField(TEXT,StringType,true)))
Saving discharge summary to a file...
Total Processing time:  55.37991976737976  seconds
+----------+-------+---------+--------------------+
|SUBJECT_ID|HADM_ID|CHARTTIME|                TEXT|
+----------+-------+---------+--------------------+
|     22532| 167853|     null|admission date di...|
|     13702| 107527|     null|admission date di...|
|     13702| 167118|     null|admission date di...|
|     13702| 196489|     null|admission date di...|
|     26880| 135453|     null|admission date di...|
+----------+-------+---------+----

Following is the comparison of data processing time for the discharge summary file. Currently, the spark configuration is `('spark.executor.memory', '8g'), ('spark.executor.cores', '3'), ('spark.cores.max', '3'), ('spark.driver.memory','8g')`

| Original Script Processing time (sec) | PySpark Script Processing Time including  (sec) |
| --- | ---|
|66.5|55.4|

Next, we will check the data processing quality by comparing with the original script output.

Since the output from last step was written to hdfs, the file name has been changed. So in this step, we need to manual change the file name and file location.

In [12]:
#How many admissions?
df_discharge = spark.read.csv('%s/pyspark/disch_full.csv' % MIMIC_3_DIR, header=True)
admission_set = set(df_discharge.select('HADM_ID').distinct().rdd.map(lambda x: x[0]).collect())

df_original = pd.read_csv('%s/disch_full.csv' % MIMIC_3_DIR, dtype={"HADM_ID": str})
original_admission_set = set(df_original['HADM_ID'].unique())

print("Unique code number from original ETL Script: ", len(original_admission_set))
print("Unique code number from PySpark ETL Script:", len(admission_set))
print("Number of different code: ", len(admission_set.difference(original_admission_set)))
print("Different code: ", admission_set.difference(original_admission_set))

Unique code number from original ETL Script:  52726
Unique code number from PySpark ETL Script: 52726
Number of different code:  0
Different code:  set()


Validate the data processing with count of tokens and unique token types. Output from original script was `num types 150853 num tokens 79801402`

In [13]:
#Tokens and types
df_discharge_explode = df_discharge.withColumn("TOKEN", F.explode(F.split(df_discharge['TEXT'], " ")) )
print("Number of tokens: ", df_discharge_explode.count())
print("Number of types: ", df_discharge_explode.select('TOKEN').distinct().count())

Number of tokens:  79801402
Number of types:  150853


## Consolidate labels with set of discharge summaries

Then, we will filter the code based on `HADM_ID` with discharge summaries, so we will exclude the Code records which doesn't have any discharge summaries asccociated with.

In [14]:
df_codes = spark.read.csv('%s/pyspark/ALL_CODES.csv' % MIMIC_3_DIR, header=True)
df_codes_filtered = df_codes.filter(df_codes['HADM_ID'].isin(admission_set))
df_codes_filtered.toPandas().to_csv('%s/pyspark/ALL_CODES_filtered.csv' % MIMIC_3_DIR, index=False,
               columns=['SUBJECT_ID', 'HADM_ID', 'ICD9_CODE'],
               header=['SUBJECT_ID', 'HADM_ID', 'ICD9_CODE'])

Now, we can validate the data processing by comparing the number of unique HADM_IDs (original script has 52726 unqiue IDs)

In [15]:
df_codes_filtered = spark.read.csv('%s/pyspark/ALL_CODES_filtered.csv' % MIMIC_3_DIR, header=True)
print("The number of HADM_IDs with both codes and discharge summary: ", 
      df_codes_filtered.select("HADM_ID").distinct().count())

The number of HADM_IDs with both codes and discharge summary:  52726


## Append labels to notes in a single file

In this step, we will append the ICD code label to another column in discharge summary table.

In [16]:
df_label_agg = df_codes_filtered.groupby("SUBJECT_ID", "HADM_ID").\
               agg(F.concat_ws(";", F.collect_list(df_codes_filtered["ICD9_CODE"])).alias("LABELS"))
df_label_agg.show(5)

+----------+-------+--------------------+
|SUBJECT_ID|HADM_ID|              LABELS|
+----------+-------+--------------------+
|     10661| 139315|13.9;11.4;38.93;3...|
|      1086| 114240|431;584.9;425.4;5...|
|     11604| 178435|431;401.9;433.10;...|
|     11657| 103198|34.91;96.71;96.04...|
|     11691| 138190|96.6;38.93;577.0;...|
+----------+-------+--------------------+
only showing top 5 rows



In [17]:
df_discharge = spark.read.csv('%s/pyspark/disch_full.csv' % MIMIC_3_DIR, header=True)
df_discharge_labeled = df_discharge.join(df_label_agg, (df_discharge['SUBJECT_ID'] == df_label_agg['SUBJECT_ID']) & \
                                        ((df_discharge['HADM_ID'] == df_label_agg['HADM_ID'])), how="left").\
                                         select(df_discharge.SUBJECT_ID, df_discharge.HADM_ID, "TEXT", "LABELS")
print(df_discharge_labeled.select('HADM_ID').distinct().count())
df_discharge_labeled.show(5)

52726
+----------+-------+--------------------+--------------------+
|SUBJECT_ID|HADM_ID|                TEXT|              LABELS|
+----------+-------+--------------------+--------------------+
|     10661| 139315|admission date di...|13.9;11.4;38.93;3...|
|      1086| 114240|admission date di...|431;584.9;425.4;5...|
|     11604| 178435|admission date di...|431;401.9;433.10;...|
|     11657| 103198|admission date di...|34.91;96.71;96.04...|
|     11691| 138190|admission date di...|96.6;38.93;577.0;...|
+----------+-------+--------------------+--------------------+
only showing top 5 rows



In [18]:
df_discharge_labeled.coalesce(1).\
            write.csv('%s/pyspark/hdfs/notes_labeled.csv' % MIMIC_3_DIR, header=True, mode="overwrite")

## Create train/dev/test splits

In [19]:
fname = '%s/notes_labeled.csv' % MIMIC_3_DIR
base_name = "%s/disch" % MIMIC_3_DIR #for output
tr, dv, te = concat_and_split.split_data(fname, base_name=base_name)

SPLITTING
0 read
10000 read
20000 read
30000 read
40000 read
50000 read


## Build vocabulary from training data

In [36]:
vocab_min = 3
vname = '%s/vocab.csv' % MIMIC_3_DIR
build_vocab.build_vocab(vocab_min, tr, vname)

reading in data...
removing rare terms
51917 terms qualify out of 140794 total
writing output


## Sort each data split by length for batching

In [3]:
for splt in ['train', 'dev', 'test']:
    filename = '%s/disch_%s_split.csv' % (MIMIC_3_DIR, splt)
    df = pd.read_csv(filename)
    df['length'] = df.apply(lambda row: len(str(row['TEXT']).split()), axis=1)
    df = df.sort_values(['length'])
    df.to_csv('%s/%s_full.csv' % (MIMIC_3_DIR, splt), index=False)

## Pre-train word embeddings

Let's train word embeddings on all words

In [35]:
##https://spark.apache.org/docs/2.2.0/mllib-feature-extraction.html

w2v_file = word_embeddings.word_embeddings('full', '%s/disch_full.csv' % MIMIC_3_DIR, 100, 0, 5)

building word2vec vocab on /home/chaopu/CS6250_BD4H/Final_Projects/caml-mimic/caml-mimic/mimicdata/mimic3/disch_full.csv...
training...
writing embeddings to /home/chaopu/CS6250_BD4H/Final_Projects/caml-mimic/caml-mimic/mimicdata/mimic3/processed_full.w2v


## Write pre-trained word embeddings with new vocab

In [36]:
extract_wvs.gensim_to_embeddings('%s/processed_full.w2v' % MIMIC_3_DIR, '%s/vocab.csv' % MIMIC_3_DIR, Y)

100%|██████████| 51917/51917 [00:00<00:00, 207286.27it/s]


## Pre-process code descriptions using the vocab

In [37]:
vocab_index_descriptions.vocab_index_descriptions('%s/vocab.csv' % MIMIC_3_DIR,
                                                  '%s/description_vectors.vocab' % MIMIC_3_DIR)

100%|██████████| 22267/22267 [00:00<00:00, 130775.23it/s]


## Filter each split to the top 50 diagnosis/procedure codes

In [43]:
topK = 50

In [41]:
#first calculate the top k
df_nl = spark.read.csv('%s/notes_labeled.csv' % MIMIC_3_DIR, header=True)
df_nl_explode = df_nl.select("LABELS").withColumn("LABEL_SINGLE", F.explode(F.split(df_nl["LABELS"], ";"))).\
                select("LABEL_SINGLE")
df_nl_explode = df_nl_explode.groupBy("LABEL_SINGLE").count().orderBy("count", ascending=False)
df_nl_explode.show(5)

+------------+-----+
|LABEL_SINGLE|count|
+------------+-----+
|       401.9|20053|
|       38.93|14444|
|       428.0|12842|
|      427.31|12594|
|      414.01|12179|
+------------+-----+
only showing top 5 rows



In [47]:
code_topK = df_nl_explode.limit(topK).select("LABEL_SINGLE").rdd.map(lambda x: x[0]).collect()

In [49]:
import csv
with open('%s/pyspark/TOP_%s_CODES.csv' % (MIMIC_3_DIR, str(topK)), 'w') as of:
    w = csv.writer(of)
    for code in code_topK:
        w.writerow([code])

In [53]:
for splt in ['train', 'dev', 'test']:
    print(splt)
    hadm_ids = set()
    with open('%s/%s_50_hadm_ids.csv' % (MIMIC_3_DIR, splt), 'r') as f:
        for line in f:
            hadm_ids.add(line.rstrip())
    
    with open('%s/notes_labeled.csv' % MIMIC_3_DIR, 'r') as f:
        with open('%s/%s_%s.csv' % (MIMIC_3_DIR, splt, str(topK)), 'w') as of:
            r = csv.reader(f)
            w = csv.writer(of)
            #header
            w.writerow(next(r))
            i = 0
            for row in r:
                hadm_id = row[1]
                if hadm_id not in hadm_ids:
                    continue
                codes = set(str(row[3]).split(';'))
                filtered_codes = codes.intersection(set(code_topK))
                if len(filtered_codes) > 0:
                    w.writerow(row[:3] + [';'.join(filtered_codes)])
                    i += 1

train
dev
test


Next, we will calculate the add a column `length` with the length of tokens in each record, and then order by this column.

In [None]:
def count_token(text):
    word_list = text.split(" ")
    return len(word_list)
count_token_udf = F.udf(count_token, IntegerType())

In [57]:
for splt in ['train', 'dev', 'test']:
    filename = '%s/%s_%s.csv' % (MIMIC_3_DIR, splt, str(topK))
    df = spark.read.csv(filename, header=True)
    df = df.withColumn("length", count_token_udf(df['TEXT'])).orderBy("length")
    df.toPandas().to_csv('%s/pyspark/%s_%s.csv' % (MIMIC_3_DIR, splt, str(topK)), index=False)