# Run Spark ML LDA on MIMIC3

## 0. Import libraries.

In [1]:
from pyspark.ml.feature import IDF
from pyspark.sql import functions as f

from pyspark.ml.feature import CountVectorizer,StringIndexer
from pyspark.ml.clustering import LDA

In [4]:
spark

## 1. Load data to DataFrame.

In [3]:
charts_df = (sqlContext.read
         .format("com.databricks.spark.csv")
         .option("header", "true")
         .load("../../mimic3/data/CHARTEVENTS.csv"))

adm_df = (sqlContext.read
         .format("com.databricks.spark.csv")
         .option("header", "true")
         .load("../../mimic3/data/ADMISSIONS.csv"))

dias_df = spark.read.csv("../../mimic3/data/DIAGNOSES_ICD.csv", header=True, mode="DROPMALFORMED")
pats_df = spark.read.csv("../../mimic3/data/PATIENTS.csv", header=True, mode="DROPMALFORMED")
icu_df = spark.read.csv("../../mimic3/data/ICUSTAYS.csv", header=True, mode="DROPMALFORMED")
dic_ICD_df = spark.read.csv("../../mimic3/data/D_ICD_DIAGNOSES.csv", header=True, mode="DROPMALFORMED")

## 2. Check if ICD9_code has description

In [3]:
# dic_ICD_df.printSchema()
# dic_ICD_df.show(truncate = False)
dic_ICD_df.where(dic_ICD_df["ICD9_CODE"] == "7793").count()

0

## 3. Aggregate diagnosis codes list through grouping by "SUBJECT_ID".

### 3.1 Filtering noisy disease codes

There are two ways to filter noisy disease codes. One is to filter noisy codes in data pre-processing stage, which means filtering the noisy codes before topic modelling being executed. Another is to filter noisy codes in display stage, which means filtering the noisy codes during display after topic modelling being executed.

The Pros and Cons:

   The first way reduces the data size, and improves running performance. However, the hidden relationships among diagnosis codes are removed. The codes show as follow:
   
```python
filteredDiags = spark.sql("SELECT ICD9_CODE,SUBJECT_ID FROM diagnosis WHERE ICD9_CODE not in ('', '4019','7793','2724','2449')")
```

   The second way keeps all dataset runnning topic modelling and does not distroy the hidden relationships. However, this way only masks the noisy diagnosis codes not to display. The codes show as follow:
   
```python
noisyWordList = ['4019','7793','2724','2449','0389','5849','2875','4280','30560']
...
if ... and vocabs[terms[k]] not in noisyWordList:
```


In [7]:
dias_df.createOrReplaceTempView("diagnosis")
# pats_dias = dias_df.where(dias_df["ICD9_CODE"] != "").groupBy("SUBJECT_ID").agg(f.concat_ws(",", f.collect_list("ICD9_CODE")))
filteredDiags = spark.sql("SELECT SUBJECT_ID,ICD9_CODE FROM diagnosis WHERE ICD9_CODE not in ('', '4019','7793','2724','2449')")
# filteredDiags = spark.sql("SELECT ICD9_CODE,SUBJECT_ID FROM diagnosis WHERE ICD9_CODE != ''")
pats_dias = filteredDiags.groupBy("SUBJECT_ID").agg(f.collect_list("ICD9_CODE"))
pats_dias = pats_dias.select(f.col("SUBJECT_ID"),f.col("collect_list(ICD9_CODE)").alias("codes"))

In [11]:
filteredDiags.show()
# filteredDiags.write.format("com.databricks.spark.csv").option("header", "false").save("../DT2B/usa_healthcare.csv")
filteredDiags.toPandas().to_csv("../DT2B/usa_data.csv", header=False, index=False)



+----------+---------+
|SUBJECT_ID|ICD9_CODE|
+----------+---------+
|       109|    40301|
|       109|      486|
|       109|    58281|
|       109|     5855|
|       109|     4254|
|       109|     2762|
|       109|     7100|
|       109|     2767|
|       109|     7243|
|       109|    45829|
|       109|     2875|
|       109|    28521|
|       109|    28529|
|       109|    27541|
|       109|    40301|
|       109|     5856|
|       109|    58381|
|       109|     7100|
|       109|     5589|
|       109|     2875|
+----------+---------+
only showing top 20 rows



In [16]:
import numpy
data = numpy.genfromtxt("../DT2B/usa_data.csv",delimiter=',',dtype=str)

In [6]:
ts = ['V053','3051','4280','41401','41071','4111','2720','9971','412','V4582','40390','4139','43310']

for t in ts:
    dic = dic_ICD_df.where(dic_ICD_df["ICD9_CODE"] == t)
    print t+" : "+dic.collect()[0][3]

V053 : Need for prophylactic vaccination and inoculation against viral hepatitis
3051 : Tobacco use disorder
4280 : Congestive heart failure, unspecified
41401 : Coronary atherosclerosis of native coronary artery
41071 : Subendocardial infarction, initial episode of care
4111 : Intermediate coronary syndrome
2720 : Pure hypercholesterolemia
9971 : Cardiac complications, not elsewhere classified
412 : Old myocardial infarction
V4582 : Percutaneous transluminal coronary angioplasty status
40390 : Hypertensive chronic kidney disease, unspecified, with chronic kidney disease stage I through stage IV, or unspecified
4139 : Other and unspecified angina pectoris
43310 : Occlusion and stenosis of carotid artery without mention of cerebral infarction


## 4. Data Pre-processing for LDA

### 4.1 LDA on Freqency of words

#### 4.1.1 Frequency for words's weight

In [18]:
#index subject_id to label
indexer = StringIndexer(inputCol="SUBJECT_ID", outputCol="label")
indexed = indexer.fit(pats_dias).transform(pats_dias)

#terms' count vector
vector = CountVectorizer(inputCol="codes", outputCol="features")
countVect = vector.fit(indexed)

#get vocaulary
vocabs = countVect.vocabulary
result = countVect.transform(indexed)

#### 4.1.2 Run on Real World data MIMIC3

In [6]:
dataset = result.select("label","features")

#train LDA model
lda_mimic = LDA(k=10, maxIter=10)
model_mimic = lda_mimic.fit(dataset)

# #get metric standard.
# ll = model_mimic.logLikelihood(dataset)
# lp = model_mimic.logPerplexity(dataset)
# print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
# print("The upper bound on perplexity: " + str(lp))

In [7]:
# Describe topics.
tf_topics = model_mimic.describeTopics(10)

In [8]:
#List topics and each code in one topic
tf_list_topics = tf_topics.select("termIndices").collect()
for i in range(len(tf_list_topics)):
    print "topic {}:".format(i+1)
    terms = tf_list_topics[i][0]
    for k in range(len(terms)):
        dic = dic_ICD_df.where(dic_ICD_df["ICD9_CODE"] == vocabs[terms[k]])
        if dic.count()==0:
            print vocabs[terms[k]]+" : NULL"
        else:
            print vocabs[terms[k]]+" : "+dic.collect()[0][3]
    

topic 1:
V290 : Observation for suspected infectious condition
V3000 : Single liveborn, born in hospital, delivered without mention of cesarean section
V053 : Need for prophylactic vaccination and inoculation against viral hepatitis
7742 : Neonatal jaundice associated with preterm delivery
769 : Respiratory distress syndrome in newborn
77081 : Primary apnea of newborn
7470 : Patent ductus arteriosus
76515 : Other preterm infants, 1,250-1,499 grams
5849 : Acute kidney failure, unspecified
V502 : Routine or ritual circumcision
topic 2:
4019 : Unspecified essential hypertension
41401 : Coronary atherosclerosis of native coronary artery
431 : Intracerebral hemorrhage
53081 : Esophageal reflux
5849 : Acute kidney failure, unspecified
2724 : Other and unspecified hyperlipidemia
25000 : Diabetes mellitus without mention of complication, type II or unspecified type, not stated as uncontrolled
2851 : Acute posthemorrhagic anemia
2720 : Pure hypercholesterolemia
E8810 : Accidental fall from ladd

In [10]:
#get distribution matrix of documents to topics
docTopics = model_mimic.transform(dataset)

## 4.2 LDA on TF-IDF of words

#### 4.2.1 FT-IDF for words' weight

In [9]:
dataset = result.select(f.col("label"),f.col("features").alias("rawFeatures"))
idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(dataset)
rescaledData = idfModel.transform(dataset)

#### 4.2.2  Run on Real World data MIMIC3

In [10]:
tfidf_df = rescaledData.select("label","features")
#train LDA model
tfidf_lda_mimic = LDA(k=10, maxIter=10)
tfidf_model_mimic = lda_mimic.fit(tfidf_df)
tfidf_topics = model_mimic.describeTopics(30)
tfidf_topics_terms = tfidf_topics.select("termIndices").collect()

Show the diagnosis with description and not to filter noisy disease codes.

In [12]:
#List topics and each code in one topic
for i in range(len(tfidf_topics_terms)):
    print "topic {}:".format(i+1)
    terms = tfidf_topics_terms[i][0]
    displayNumOfTerms = 10
    n = 0
    for k in range(len(terms)):
        dic = dic_ICD_df.where(dic_ICD_df["ICD9_CODE"] == vocabs[terms[k]])
        if dic.count()==0:
            print vocabs[terms[k]]+" : NULL"
        else:
            print vocabs[terms[k]]+" : "+dic.collect()[0][3]
        n = n + 1
        if n%10 ==0:
                break

topic 1:
V290 : Observation for suspected infectious condition
V3000 : Single liveborn, born in hospital, delivered without mention of cesarean section
V053 : Need for prophylactic vaccination and inoculation against viral hepatitis
7742 : Neonatal jaundice associated with preterm delivery
769 : Respiratory distress syndrome in newborn
77081 : Primary apnea of newborn
7470 : Patent ductus arteriosus
76515 : Other preterm infants, 1,250-1,499 grams
5849 : Acute kidney failure, unspecified
V502 : Routine or ritual circumcision
topic 2:
4019 : Unspecified essential hypertension
41401 : Coronary atherosclerosis of native coronary artery
431 : Intracerebral hemorrhage
53081 : Esophageal reflux
5849 : Acute kidney failure, unspecified
2724 : Other and unspecified hyperlipidemia
25000 : Diabetes mellitus without mention of complication, type II or unspecified type, not stated as uncontrolled
2851 : Acute posthemorrhagic anemia
2720 : Pure hypercholesterolemia
E8810 : Accidental fall from ladd

Show the diagnosis with description and to filter noisy disease codes.

In [13]:
noisyWordList = ['4019','7793','2724','2449','0389','5849','2875','4280','30560']
#List topics and each code in one topic
for i in range(len(tfidf_topics_terms)):
    print "topic {}:".format(i+1)
    terms = tfidf_topics_terms[i][0]
    displayNumOfTerms = 10
    n = 0
    for k in range(len(terms)):
        dic = dic_ICD_df.where(dic_ICD_df["ICD9_CODE"] == vocabs[terms[k]])
        if dic.count()!=0 and vocabs[terms[k]] not in noisyWordList:
            print vocabs[terms[k]]+" : "+dic.collect()[0][3]
            n = n + 1
            if n%10 ==0:
                break

topic 1:
V290 : Observation for suspected infectious condition
V3000 : Single liveborn, born in hospital, delivered without mention of cesarean section
V053 : Need for prophylactic vaccination and inoculation against viral hepatitis
7742 : Neonatal jaundice associated with preterm delivery
769 : Respiratory distress syndrome in newborn
77081 : Primary apnea of newborn
7470 : Patent ductus arteriosus
76515 : Other preterm infants, 1,250-1,499 grams
V502 : Routine or ritual circumcision
4589 : Hypotension, unspecified
topic 2:
41401 : Coronary atherosclerosis of native coronary artery
431 : Intracerebral hemorrhage
53081 : Esophageal reflux
25000 : Diabetes mellitus without mention of complication, type II or unspecified type, not stated as uncontrolled
2851 : Acute posthemorrhagic anemia
2720 : Pure hypercholesterolemia
E8810 : Accidental fall from ladder
42731 : Atrial fibrillation
311 : Depressive disorder, not elsewhere classified
2859 : Anemia, unspecified
topic 3:
42731 : Atrial fi

## Python Syntax Testing

In [4]:
ls = ['sdf','sf','66','67']
ls = [x for x in ls if not x.startswith("6") and not x in ['sf']]
ls

['sdf']

In [23]:
noisyWordList = ['4019','7793','2724','2449']
'4019' not in noisyWordList

False