![stream](https://image.slidesharecdn.com/hl7v2messagingconformancejan2011-110303095217-phpapp02/95/hl7-v2-messaging-conformance-jan-2011-15-728.jpg?cb=1299146151)

#### Author: Vedant Jain

##### Medical Coding
A coder’s job is to code to the highest level of specificity. This means abstracting the most information out of the medical reports from the provider and taking accurate notes. It also means knowing the medical terminology for both procedures and diagnoses. Coding to a general level, or undercoding (which we’ll discuss in a moment) can lead to a rejected or denied claim.

##### LOINC – Logical Observation Identifiers Names and Codes
Logical Observation Identifiers Names and Codes (LOINC) was created in 1994 by the Regenstrief Institute as a free, universal standard for laboratory and clinical observations, and to enable exchange of health information across different systems. Where ICD records diagnoses and CPT services, LOINC is a code system used to identify test observations. LOINC codes are often more specific than CPT, and one CPT code can have multiple LOINC codes associated with it.

Currently, more than 26,000 people in 157 countries are using LOINC, and it has been recognized as the preferred standard for coding testing and observations in HL7.


###### With the following solution built on top of Spark, a healthcare provider benefitted in the following ways:
* ###### Timely and accurate billing process resulting in 1M+ additional annual revenue from documented secondary diagnoses and care
* ###### Reduced costs with improved surgery suite/staff utilization
* ###### Average of 50,000 documents passed through annotators per day versus 5,000 historically
* ###### Free-text search use cases completing in milliseconds versus 30+ minutes previously
* ###### NLP, Categorization & Text Search for HL7/EMR Data

In [2]:
%fs ls /mnt/vedant-demo/healthcare/hl7

In [3]:
# Directory that holds the raw HL7 files used in this demo.
path = "/mnt/vedant-demo/healthcare/hl7/"
# Read every file under `path` as a whole; later cells treat element [1] of
# each record as the file's text.
hl7RDD = sc.wholeTextFiles(path)
# Convert to a DataFrame only so the raw input can be inspected in the notebook.
hl7DF = hl7RDD.toDF()
display(hl7DF)

### Step 1: Parse every HL7 message into JSON
###### Convert the HL7 payload into JSON. The hidden cell below contains a Python function that does the conversion.

Hidden Cell Below

In [6]:
import sys, hl7, json

import sys, hl7, json
from datetime import datetime
import pandas as pd
import time
import re



def to_date(dt):
    """Parse a ``YYYYMMDD`` string into a ``datetime``.

    Args:
        dt: Text expected to hold exactly eight date digits.

    Returns:
        The parsed ``datetime``, or ``None`` when *dt* is not a string or
        does not match the ``%Y%m%d`` format.
    """
    try:
        return datetime.strptime(dt, '%Y%m%d')
    except (ValueError, TypeError):
        # ValueError: text does not match the format.
        # TypeError: a non-string (e.g. None) was passed in.
        return None

def get_pname(seg):
    """Return element 5 of *seg* as text with ``^`` replaced by spaces.

    Args:
        seg: An indexable segment; the caller in this file passes the PID
            segment of a parsed message.

    Returns:
        The name text, or ``None`` when it cannot be extracted (best-effort).
    """
    try:
        return str(seg[5]).replace("^", " ")
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still propagate.
        return None

def get_paddress(seg):
    """Return the first four address parts of element 11 of *seg* as text.

    ``^`` is treated like ``,``; the value is split on ``,`` and the first
    four parts are rendered through ``str(list)`` with the square brackets
    removed, so every part stays wrapped in quotes
    (e.g. ``"'1 MAIN ST', '', 'CITY', 'ST'"``).

    Args:
        seg: An indexable segment; the caller in this file passes the PID
            segment of a parsed message.

    Returns:
        The rendered address text, or ``None`` when it cannot be extracted
        (best-effort).
    """
    try:
        parts = str(seg[11]).replace("^", ",").split(',')
        return str(parts[:4]).replace("[", "").replace("]", "")
    except Exception:
        # Deliberately best-effort, but no longer a bare ``except`` so that
        # KeyboardInterrupt / SystemExit still propagate.
        return None
    
def get_segments(message):
  """Return the ``_id`` of every entry in ``message['segments']``, in order.

  Args:
      message: Mapping with a ``'segments'`` sequence whose items each have
          an ``'_id'`` key.

  Returns:
      List of segment ids; empty when there are no segments.
  """
  # Iterate the segment list itself: counting with len(message) measures the
  # outer mapping rather than the segments, and 1-based indexing both skips
  # the first segment and runs past the last one.
  return [segment['_id'] for segment in message['segments']]

def cleanup_json(thisjson):
    """Return *thisjson* with every single quote and backslash removed.

    Args:
        thisjson: JSON-like text to clean.

    Returns:
        The text without ``'`` and ``\\`` characters; nothing else is changed.

    NOTE(review): no whitespace or trailing-comma normalisation is applied.
    Such regex rewrites were computed here before but their result was never
    used; they stay out because they would also rewrite matching characters
    inside field values - confirm before enabling any.
    """
    return thisjson.replace("'", "").replace("\\", "")

  
def readHL7(infile):
    """Convert one HL7 v2 payload into a JSON document string.

    The payload is parsed with ``hl7.parse`` and flattened into a document
    of the form ``{"id", "date", "name", "address", "segments": [...]}``.
    Each segment entry carries:

    * ``_id``    - segment name (e.g. ``MSH``, ``PID``, ``OBX``)
    * ``Rep``    - how many segments with that name have been seen so far
    * ``Seq``    - 1-based position of the segment in the message
    * ``Val``    - raw segment text
    * ``FC``     - number of fields visited
    * ``VF``     - number of non-empty fields
    * ``Fields`` - one entry per non-empty field, plus one per component;
      fields containing ``~`` get a nested ``Repetitions`` list instead.

    Args:
        infile: Text of an HL7 payload. It is handed to ``hl7.parse`` in a
            single call; no per-message splitting is performed.

    Returns:
        The document as JSON-formatted text (a string, not a parsed object).

    Raises:
        Whatever ``hl7.parse`` raises for input it cannot parse.

    NOTE: uses the Python 2 ``unicode`` builtin.
    """
    # Parse exactly once. `infile` is a string, so looping over it visits
    # every character; parsing inside such a loop repeats identical work.
    h = hl7.parse(infile)

    _segments = []
    segIndex = 1
    segDic = {}

    # Header values are best-effort: a missing or short MSH yields None.
    try:
        msh_10 = h.segment('MSH')[10]
        msgdt = to_date(str(h.segment('MSH')[7])[:8])
    except Exception:
        msh_10 = None
        msgdt = None

    # Patient details are best-effort too (e.g. message without a PID).
    try:
        pname = get_pname(h.segment('PID'))
    except Exception:
        pname = None

    try:
        paddress = get_paddress(h.segment('PID'))
    except Exception:
        paddress = None

    for seg in h:
        segName = unicode(seg[0])
        segVal = unicode(seg)
        fieldIndex = 1
        fieldCount = 1
        _fields = []
        # Drop the segment-name element so the loop below starts at field 1.
        seg.pop(0)
        if segName == 'MSH':
            # MSH_1 is emitted from the segment's separator attribute, so the
            # remaining MSH fields are numbered from 2.
            _fields.append({'_id': 'MSH_1', 'Val': seg.separator})
            fieldCount += 1
            fieldIndex += 1

        for field in seg:
            fieldName = segName + '_' + unicode(fieldIndex)
            fieldVal = unicode(field)
            hasRepetitions = False
            if fieldVal:
                fieldDoc = {'_id': fieldName, 'Val': fieldVal}

                # MSH_2 holds the encoding characters themselves, so the '~'
                # in it must not be treated as a repetition separator.
                if '~' in fieldVal and fieldName != 'MSH_2':
                    hasRepetitions = True
                    _repfields = []
                    repIndex = 1
                    for repField in fieldVal.split('~'):
                        if repField:
                            repFieldVal = unicode(repField)
                            _repfields.append({'_id': fieldName, 'Val': repFieldVal, 'Rep': repIndex})

                            if '^' in repFieldVal:
                                comIndex = 1
                                for repFieldComp in repFieldVal.split('^'):
                                    repFieldCompVal = unicode(repFieldComp)
                                    comName = segName + '_' + unicode(fieldIndex) + '_' + unicode(comIndex)
                                    if repFieldCompVal:
                                        _repfields.append({'_id': comName, 'Val': repFieldCompVal, 'Rep': repIndex})
                                    comIndex += 1
                        # Empty repetitions still consume a repetition number.
                        repIndex += 1

                    fieldDoc = {'_id': fieldName, 'Val': fieldVal, 'Repetitions': _repfields}

                _fields.append(fieldDoc)
                fieldCount += 1

                # Non-repeating fields with several parts also get one entry
                # per non-empty component, appended after the field itself.
                if not hasRepetitions and len(field) > 1 and fieldName != 'MSH_2':
                    comIndex = 1
                    for component in field:
                        comName = segName + '_' + unicode(fieldIndex) + '_' + unicode(comIndex)
                        comVal = unicode(component)
                        if comVal:
                            _fields.append({'_id': comName, 'Val': comVal})
                        comIndex += 1
            fieldIndex += 1

        segDic[segName] = segDic.get(segName, 0) + 1

        segDoc = {'_id': segName, 'Rep': segDic[segName], 'Seq': segIndex, 'Val': segVal, 'FC': fieldIndex-1, 'VF': fieldCount-1, 'Fields': _fields}
        _segments.append(segDoc)
        segIndex += 1

    json_segments = [json.dumps(x) for x in _segments]

    # `json_segments` is a list of strings, so `%s` renders it as a Python
    # list repr with quoted, escaped items; cleanup_json() then strips the
    # quotes and backslashes from the assembled text.
    hl7doc = ('{ "id": "%s", "date": "%s", "name": "%s", "address": "%s", "segments": %s }') % (msh_10, msgdt, pname, paddress, json_segments)
    hl7doc = cleanup_json(hl7doc)
    # Round-trip through json: dumps() of a string followed by loads() hands
    # back the same text, so the return value is the document text itself.
    hl7js = json.dumps(hl7doc)
    newjson = json.loads(hl7js)
    return newjson
#         doc = hl72JSON(h)     
#         return h
#         newjson = json.loads(doc)
#         return newjson
#         return message

# hl7RDD.collect()
# display(hl7DF)
# hl7RDD.map(lambda x: x[1].split(","))
# hl7DF = hl7RDD.toDF()
# display(hl7DF)



In [7]:
# Re-read the raw HL7 files and convert each one with readHL7().
path = "/mnt/vedant-demo/healthcare/hl7/"
hl7RDD = sc.wholeTextFiles(path)
# x[1] is the text of the file; x[0] (not used) is the other half of the pair.
hl7RDD = hl7RDD.map(lambda x: readHL7(x[1]))
# hl7RDD.collect()

### Step 2: Create a JSON Dataframe

In [9]:
# Each RDD element is one JSON document string produced by readHL7();
# let Spark infer the schema from them.
hl7DF = spark.read.json(hl7RDD)
display(hl7DF)

In [10]:
# Patient-level projection: message id, patient name and address.
patientDF  = hl7DF.selectExpr("id","name","address")

In [11]:
# Render the patient projection in the notebook.
display(patientDF)

### Step 3: Get the data from individual segments and apply structure

In [13]:
from pyspark.sql.functions import explode

# One output row per element of the `segments` array; the message-level
# columns (address, date, id, name) are repeated on every row.
dfsSegments = hl7DF.select("address", "date", "id", "name", explode("segments").alias("segmentsData"))
# display(dfsSegments)

##### Chart Patient Treatment Timeline

By creating a structured dataset keyed on patient information and organized by timestamp we can create a view of the patient's treatment timeline. Similarly, by keying on the condition across all patients, we can create a view of outcomes of treatments.

In [15]:
# Flatten the `segmentsData` struct: keep the message-level columns and pull
# each struct member out as its own column. Only `Val` is renamed (to
# `values`); the other extracted columns keep their default names.
dfsSegmentsData = dfsSegments.select(
    dfsSegments["address"],
    dfsSegments["date"],
    dfsSegments["id"],
    dfsSegments["name"],
    dfsSegments["segmentsData"].getField("FC"),
    dfsSegments["segmentsData"].getField("Rep"),
    dfsSegments["segmentsData"].getField("Seq"),
    dfsSegments["segmentsData"].getField("VF"),
    dfsSegments["segmentsData"].getField("Val").alias("values"),
    dfsSegments["segmentsData"].getField("_id"))

# `date` is text here, so a message without a usable date carries the
# literal string 'None'; leave those rows out of the timeline.
display(dfsSegmentsData.filter(dfsSegmentsData.date != 'None'))

In [16]:
# dfsSegmentsData.write.mode("overwrite").parquet("/mnt/vedant-demo/healthcare/hl7-parsed")

### Step 4: Extract Lab Results

The Observation/Result segment is used to transmit the observations of the LAB. OBX segments have great flexibility to report information. When properly coded, OBX segments report a large amount of information in a small amount of space. OBX segments within the ORU message are widely used to report laboratory and other clinical information.

There can be many OBX segments identified like OBX|1|, OBX|2|, OBX|3|, OBX|4|, OBX|5|, and OBX|6|, etc.

An Example of information contained in the OBX segment

![stream](http://lh6.ggpht.com/-fWCUtlqIvBg/U_gzX7cyonI/AAAAAAAAFH8/rEgY-kghFzU/image_thumb%25255B1%25255D.png?imgmax=800)

For details on the OBX Segment and other HL7 segments, please refer to the following guide: https://www.hcup-us.ahrq.gov/datainnovations/clinicalcontentenhancementtoolkit/hi6.jsp

#### Filter the dataframe to retrieve only the OBX data

In [21]:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# NOTE(review): this reads the parquet output of the In [16] cell, whose
# write statement is commented out - the data must already exist at this path.
df = spark.read.parquet("/mnt/vedant-demo/healthcare/hl7-parsed/")
# Keep only OBX segments. Backticks are needed because the column name
# itself contains a dot.
df = df.select("*").filter(df["`segmentsData._id`"] == "OBX")

In [22]:
from pyspark.sql.functions import split
# Split the raw segment text on the '|' field separator (escaped because the
# pattern is a regular expression).
split_col = split(df['values'], '\\|')
# Element 0 of the split is the segment name, so element 5 is the fifth field.
# The previous stand-alone `df.withColumn("values", ...)` call was dropped:
# its result was never assigned, so it had no effect.
df = df.withColumn("reason_for_visit", split_col.getItem(5))

In [23]:
# Render the OBX rows together with the extracted `reason_for_visit` column.
display(df)

### Step 5: Perform data cleansing - remove stop words, special characters, etc.

In [25]:
from pyspark.sql.functions import udf

def cleaning(value):
  """Replace every underscore in *value* with a space.

  Args:
      value: Text to clean, or None.

  Returns:
      The cleaned text; None is passed through unchanged so that a null
      column value does not raise inside the UDF.
  """
  if value is None:
    return None
  return value.replace("_", " ")

# Wrap cleaning() as a Spark UDF (no return type given, so udf()'s default applies).
clean_udf = udf(cleaning)

# Overwrite `reason_for_visit` with its underscore-free version.
newdf = df.withColumn("reason_for_visit", clean_udf(df.reason_for_visit))

# display(newdf)

In [26]:
from pyspark.sql.types import ArrayType
import struct
import re

# Default list of stopwords (contractions appear without their apostrophe,
# e.g. "dont", because punctuation is stripped before the comparison).
_STOPWORDS_CORE = (
    'a', 'about', 'above', 'after', 'again', 'against', 'all', 'am', 'an', 'and', 'any', 'are', 'arent', 'as', 'at',
    'be', 'because', 'been', 'before', 'being', 'below', 'between', 'both', 'but', 'by',
    'can', 'cant', 'come', 'could', 'couldnt',
    'd', 'did', 'didn', 'do', 'does', 'doesnt', 'doing', 'dont', 'down', 'during',
    'each',
    'few', 'finally', 'for', 'from', 'further',
    'had', 'hadnt', 'has', 'hasnt', 'have', 'havent', 'having', 'he', 'her', 'here', 'hers', 'herself', 'him', 'himself', 'his', 'how',
    'i', 'if', 'in', 'into', 'is', 'isnt', 'it', 'its', 'itself',
    'just',
    'll',
    'm', 'me', 'might', 'more', 'most', 'must', 'my', 'myself',
    'no', 'nor', 'not', 'now',
    'o', 'of', 'off', 'on', 'once', 'only', 'or', 'other', 'our', 'ours', 'ourselves', 'out', 'over', 'own',
    'r', 're',
    's', 'said', 'same', 'she', 'should', 'shouldnt', 'so', 'some', 'such',
    't', 'than', 'that', 'thats', 'the', 'their', 'theirs', 'them', 'themselves', 'then', 'there', 'these', 'they', 'this', 'those', 'through', 'to', 'too',
    'under', 'until', 'up',
    'very',
    'was', 'wasnt', 'we', 'were', 'werent', 'what', 'when', 'where', 'which', 'while', 'who', 'whom', 'why', 'will', 'with', 'wont', 'would',
    'y', 'you', 'your', 'yours', 'yourself', 'yourselves',
)

# Custom list of stopwords - add your own here.
_STOPWORDS_CUSTOM = ('patient', 'complained', 'reason', 'visit')

# Built once instead of on every call; a set gives constant-time lookups.
_STOPWORDS = frozenset(_STOPWORDS_CORE + _STOPWORDS_CUSTOM)

# Matches every character that is not an ASCII letter or digit.
_NON_ALNUM = re.compile('[^a-zA-Z0-9]')


def cleanup_text(record):
    """Strip special characters and stopwords from free text.

    The text is split on whitespace, every non-alphanumeric character is
    removed from each word, and words whose lower-case form is a stopword
    are dropped.

    Args:
        record: Free text, or None.

    Returns:
        The surviving lower-cased words rendered through ``str(list)`` with
        the square brackets removed, i.e. each word stays quoted and words
        are separated by ``", "``. Empty or None input gives ``""``.

    NOTE(review): the quotes and commas remain part of the returned text -
    confirm this is the format the downstream tokenizer expects.
    """
    if record is None:
        # A null column value would otherwise raise inside the UDF.
        return ""
    words = [_NON_ALNUM.sub('', word) for word in record.split()]      # Remove special characters
    final = [word.lower() for word in words if word.lower() not in _STOPWORDS]     # Remove stopwords
    return str(final).replace("[", "").replace("]", "")
 
# Expose cleanup_text() to Spark as a string-returning UDF.
udf_cleantext = udf(cleanup_text , StringType())
# `items` holds the cleaned text; it is the column the ML pipeline reads.
finaldf = newdf.withColumn("items", udf_cleantext(newdf.reason_for_visit))
display(finaldf)

###### Now we have our test data properly cleaned and ready for machine learning.

### Step 6: Prepare the training dataset. We will use the LOINC data to train our LDA Model.

In [29]:
import re
def cleaned_names(record):
  """Turn a ``;``-separated list of names into a compact quoted list.

  The record is UTF-8 encoded, converted with ``str()``, split on ``;``,
  and every non-alphanumeric character is removed from each piece. The
  pieces are then rendered through ``str(list)`` with square brackets and
  all spaces removed.

  Args:
      record: Text with names separated by ``;``, or None.

  Returns:
      The rendered text; ``""`` when *record* is None.

  NOTE(review): the result never contains a space, so a whitespace-based
  tokenizer will treat it as a single token - confirm this is intended.
  """
  if record is None:
    pieces = []
  else:
    pieces = str(record.encode('utf-8')).split(";")
  stripped = [re.sub('[^a-zA-Z0-9]','',piece) for piece in pieces]     # Remove special characters
  rendered = str(stripped)
  for unwanted in ("[", "]", " "):
    rendered = rendered.replace(unwanted, "")
  return rendered
# Wrap cleaned_names() as a Spark UDF (no return type given, so udf()'s default applies).
cleaned_names_udf = udf(cleaned_names)
# LOINC table exported as CSV; the first line of each file is the header.
df = spark.read.option("header", "true").csv("/mnt/vedant-demo/healthcare/loinc-csv/")
# `items` is the cleaned form of RELATEDNAMES2 and is what the model trains on.
df = df.withColumn("items", cleaned_names_udf(df.RELATEDNAMES2))
display(df)

#### Let's look up all the terms that are related to 'Alcohol' and their status

In [31]:
# Rows whose related names contain the lower-case substring "alcohol".
# NOTE(review): the STATUS filter presumably drops rows where a fragment of
# description text ended up in the STATUS column - confirm against the CSV.
display(df.where(df.RELATEDNAMES2.like("%alcohol%")).filter(df.STATUS != "which are found in whole blood"))

##### Similarly, for hygiene:

In [33]:
# Rows whose related names contain the lower-case substring "hygiene".
# NOTE(review): the STATUS filter presumably drops rows where a fragment of
# description text ended up in the STATUS column - confirm against the CSV.
display(df.where(df.RELATEDNAMES2.like("%hygiene%")).filter(df.STATUS != "which are found in whole blood"))

### Step 7: Create a ML pipeline with Tokenizer, CountVectorizer and LDA
###### In this step, we will train an algorithm to automatically generate/recommend a list of LOINC codes based on the conditions described in the observational results

#### Topic Modeling Latent Dirichlet Allocation

![stream](https://littleml.files.wordpress.com/2016/11/topic_graph.png?w=497&h=185)

In LDA, each document may be viewed as a mixture of various topics where each document is considered to have a set of topics that are assigned to it via LDA. This is identical to probabilistic latent semantic analysis (pLSA), except that in LDA the topic distribution is assumed to have a sparse Dirichlet prior. The sparse Dirichlet priors encode the intuition that documents cover only a small set of topics and that topics use only a small set of words frequently. In practice, this results in a better disambiguation of words and a more precise assignment of documents to topics. LDA is a generalisation of the pLSA model, which is equivalent to LDA under a uniform Dirichlet prior distribution.[5]

For example, an LDA model might have topics that can be classified as CAT_related and DOG_related. A topic has probabilities of generating various words, such as milk, meow, and kitten, which can be classified and interpreted by the viewer as "CAT_related". Naturally, the word cat itself will have high probability given this topic. The DOG_related topic likewise has probabilities of generating each word: puppy, bark, and bone might have high probability. Words without special relevance, such as the (see function word), will have roughly even probability between classes (or can be placed into a separate category). A topic is not strongly defined, neither semantically nor epistemologically. It is identified on the basis of automatic detection of the likelihood of term co-occurrence. A lexical word may occur in several topics with a different probability, however, with a different typical set of neighboring words in each topic.

Each document is assumed to be characterized by a particular set of topics. This is akin to the standard bag of words model assumption, and makes the individual words exchangeable.
[SOURCE: https://en.wikipedia.org/wiki/Latent_Dirichlet_allocation]

In [37]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer, CountVectorizer
from pyspark.ml import Pipeline
from pyspark.ml.clustering import LDA
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.classification import NaiveBayes


# Stage definitions: `items` text -> `words` tokens -> `rawFeatures` term
# counts -> 4-topic LDA.
tokenizer = Tokenizer(inputCol="items", outputCol="words")
# # Option 1 - HashingTF
# hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
# featurizedData = hashingTF.transform(loincData)
# alternatively, CountVectorizer can also be used to get term frequency vectors
# Option 2 (CountVectorizer) - Term Frequency Vectorization    :
cv = CountVectorizer(inputCol="words", outputCol="rawFeatures", vocabSize = 100000)

# idf = IDF(inputCol="rawFeatures", outputCol="features")
# idf = IDF(inputCol="ngramFeatures", outputCol="features") # for n-gram features
# idfModel = idf.fit(featurizedData)

lda = LDA(k = 4, featuresCol="rawFeatures")

# Fit everything exactly once. Fitting a stand-alone LDA and a pipeline LDA
# separately (with no seed) produces two unrelated models whose topic numbers
# do not correspond, yet later cells compare topic labels across both.
pipeline = Pipeline(stages=[tokenizer, cv, lda])
model = pipeline.fit(df)

# Reuse the fitted stages so that every later cell works with the same
# vocabulary and the same topics as `model`. Stage order matches `stages` above.
cvmodel = model.stages[1]
ldamodel = model.stages[2]

# Intermediate frames, kept under their usual names for the cells below.
loincData = tokenizer.transform(df)
# display(loincData)
featurizedData = cvmodel.transform(loincData)
vocab = cvmodel.vocabulary
# Broadcast the vocabulary so UDFs can map term indices back to words.
vocab_broadcast = sc.broadcast(vocab)

rescaledData = ldamodel.transform(featurizedData)

display(rescaledData)

### Step 8: Use pipeline to transform the test and the training dataset

In [39]:
# Run the fitted pipeline over the cleaned visit data (reads the `items` column).
ldaResults = model.transform(finaldf)
# display(ldaResults)

In [40]:
from pyspark.ml.clustering import LDA
# Per-topic summary; the cell below reads its `topic` and `termIndices` columns.
ldatopics = ldamodel.describeTopics()

def map_termID_to_Word(termIndices):
    """Translate vocabulary indices into the words they stand for.

    Args:
        termIndices: Iterable of integer positions into the broadcast
            vocabulary (``vocab_broadcast``).

    Returns:
        List of words in the same order as *termIndices*.
    """
    return [vocab_broadcast.value[termID] for termID in termIndices]

# UDF form of map_termID_to_Word(); it returns an array of strings.
udf_map_termID_to_Word = udf(map_termID_to_Word , ArrayType(StringType()))
# Add a human-readable `description` (the words behind each term index).
ldatopics_mapped = ldatopics.withColumn("description", udf_map_termID_to_Word(ldatopics.termIndices))

display(ldatopics_mapped.select(ldatopics_mapped.topic, ldatopics_mapped.description))


### Step 9: Evaluate the Model

In [42]:
# Evaluate on the LOINC (training) data.
# Lower perplexity and higher log-likelihood indicate a better model.
ll = ldamodel.logLikelihood(rescaledData)
lp = ldamodel.logPerplexity(rescaledData)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

In [43]:
# Same metrics, this time on the transformed visit data (ldaResults).
# Lower perplexity and higher log-likelihood indicate a better model.
ll = ldamodel.logLikelihood(ldaResults)
lp = ldamodel.logPerplexity(ldaResults)
print("The lower bound on the log likelihood of the entire corpus: " + str(ll))
print("The upper bound on perplexity: " + str(lp))

In [44]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import lit
def breakout_array(index_number, record):
    """Return one element of a vector-like value.

    Args:
        index_number: Position of the element to return.
        record: Object offering ``tolist()``.

    Returns:
        ``record.tolist()[index_number]``.
    """
    return record.tolist()[index_number]

# UDF that picks a single entry out of the topic distribution.
udf_breakout_array = udf(breakout_array, FloatType())
# One column per topic: topic_1..topic_4 hold entries 0..3 of
# `topicDistribution` for every visit row.
enrichedData = ldaResults                                                                   \
        .withColumn("topic_1", udf_breakout_array(lit(0), ldaResults.topicDistribution))  \
        .withColumn("topic_2", udf_breakout_array(lit(1), ldaResults.topicDistribution))  \
        .withColumn("topic_3", udf_breakout_array(lit(2), ldaResults.topicDistribution))  \
        .withColumn("topic_4", udf_breakout_array(lit(3), ldaResults.topicDistribution)) 
# Make the visit rows queryable from SQL.
enrichedData.createOrReplaceTempView("ldaResultsFromVisits")

In [45]:
from pyspark.sql.types import FloatType
from pyspark.sql.functions import lit
def breakout_array(index_number, record):
    """Return one element of a vector-like value.

    Args:
        index_number: Position of the element to return.
        record: Object offering ``tolist()``.

    Returns:
        ``record.tolist()[index_number]``.
    """
    return record.tolist()[index_number]

# UDF that picks a single entry out of the topic distribution.
udf_breakout_array = udf(breakout_array, FloatType())
# One column per topic: topic_1..topic_4 hold entries 0..3 of
# `topicDistribution` for every LOINC row. This rebinds `enrichedData`.
enrichedData = rescaledData                                                                   \
        .withColumn("topic_1", udf_breakout_array(lit(0), rescaledData.topicDistribution))  \
        .withColumn("topic_2", udf_breakout_array(lit(1), rescaledData.topicDistribution))  \
        .withColumn("topic_3", udf_breakout_array(lit(2), rescaledData.topicDistribution))  \
        .withColumn("topic_4", udf_breakout_array(lit(3), rescaledData.topicDistribution)) 
# Make the LOINC rows queryable from SQL.
enrichedData.createOrReplaceTempView("ldaResults")

In [46]:
%sql select * from ldaresults

In [47]:
%sql 

-- Label every LOINC row with its highest-weighted topic. The branches are
-- checked in order and use >=, so a tie goes to the lowest-numbered topic.
SELECT LOINC_NUM, RELATEDNAMES2, COMPONENT,
    CASE
        WHEN Topic_1 >= Topic_2 AND Topic_1 >= Topic_3 AND Topic_1 >= Topic_4 THEN "Topic_1" 
        WHEN Topic_2 >= Topic_1 AND Topic_2 >= Topic_3 AND Topic_2 >= Topic_4 THEN "Topic_2"
        WHEN Topic_3 >= Topic_1 AND Topic_3 >= Topic_2 AND Topic_3 >= Topic_4 THEN "Topic_3"
        WHEN Topic_4 >= Topic_1 AND Topic_4 >= Topic_2 AND Topic_4 >= Topic_3 THEN "Topic_4"
        ELSE NULL
    END AS ISSUE
    FROM LDARESULTS

In [48]:
%sql SELECT A.ID, A.NAME, A.DATE, A.REASON_FOR_VISIT, B.LOINC_NUM FROM VISITS A JOIN LOINC B ON A.ISSUE == B.ISSUE

### Step 10: Get recommendations

In [50]:
from pyspark.sql.functions import *

# Label every visit with its highest-weighted topic (ISSUE). The CASE branches
# are checked in order and use >=, so a tie goes to the lowest-numbered topic.
visits_lda = spark.sql('SELECT ID, NAME, DATE, REASON_FOR_VISIT, CASE \
        WHEN Topic_1 >= Topic_2 AND Topic_1 >= Topic_3 AND Topic_1 >= Topic_4 THEN "Topic_1"  \
        WHEN Topic_2 >= Topic_1 AND Topic_2 >= Topic_3 AND Topic_2 >= Topic_4 THEN "Topic_2" \
        WHEN Topic_3 >= Topic_1 AND Topic_3 >= Topic_2 AND Topic_3 >= Topic_4 THEN "Topic_3" \
        WHEN Topic_4 >= Topic_1 AND Topic_4 >= Topic_2 AND Topic_4 >= Topic_3 THEN "Topic_4" \
        ELSE NULL \
    END AS ISSUE \
    FROM LDARESULTSFROMVISITS \
    ORDER BY NAME')

# Same labelling for the LOINC rows.
loinc_lda = spark.sql('SELECT LOINC_NUM, RELATEDNAMES2, COMPONENT, \
    CASE \
        WHEN Topic_1 >= Topic_2 AND Topic_1 >= Topic_3 AND Topic_1 >= Topic_4 THEN "Topic_1" \
        WHEN Topic_2 >= Topic_1 AND Topic_2 >= Topic_3 AND Topic_2 >= Topic_4 THEN "Topic_2" \
        WHEN Topic_3 >= Topic_1 AND Topic_3 >= Topic_2 AND Topic_3 >= Topic_4 THEN "Topic_3" \
        WHEN Topic_4 >= Topic_1 AND Topic_4 >= Topic_2 AND Topic_4 >= Topic_3 THEN "Topic_4" \
        ELSE NULL \
    END AS ISSUE \
    FROM LDARESULTS')

# Register both labelled frames so they can be joined in SQL.
visits_lda.createOrReplaceTempView("VISITS")
loinc_lda.createOrReplaceTempView("LOINC")

# For every visit, collect the LOINC codes (and their components) that share
# the visit's topic label. Every LOINC row with the same label matches.
recom_loinc = spark.sql("SELECT A.ID, A.NAME, A.DATE, A.REASON_FOR_VISIT, COLLECT_LIST(B.LOINC_NUM) AS POSSIBLE_CODES, COLLECT_LIST(B.COMPONENT) AS RELATED_TERMS FROM VISITS A JOIN LOINC B ON A.ISSUE == B.ISSUE GROUP BY ID, NAME, DATE, REASON_FOR_VISIT")
# recom_loinc.groupBy("name", "id", "date", "reason_for_visit").agg(collect_list("LOINC_NUM").alias("loinc recommendations"))

display(recom_loinc)
