### Extract ICD-10 codes and drug entities from CORD-19 research articles

#### Group Project: Group 4 (Brad, Divya, Rameez, Reshma)

#### Dataset link : https://www.kaggle.com/allen-institute-for-ai/CORD-19-research-challenge

### SparkNLP References:
   * https://github.com/JohnSnowLabs/spark-nlp-workshop/tree/master/tutorials

### Software versions to install

In [2]:
import os

In [3]:
# Refresh the apt package index quietly (-qq) before installing the JDK in the next cell.
! apt-get update -qq

In [4]:
# Install a headless Java 8 JDK for the pinned pyspark 2.4.4 below; apt's output is discarded.
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null

In [5]:
# Point this process (and the JVM Spark launches) at the Java 8 JDK installed above,
# and put its `bin` directory first on PATH so `java` resolves to it.
os.environ.update(JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64")
os.environ["PATH"] = ":".join((os.path.join(os.environ["JAVA_HOME"], "bin"), os.environ["PATH"]))

In [6]:
# Sanity check: the shell should now resolve Java 1.8.
! java -version

openjdk version "1.8.0_265"
OpenJDK Runtime Environment (build 1.8.0_265-8u265-b01-0ubuntu2~18.04-b01)
OpenJDK 64-Bit Server VM (build 25.265-b01, mixed mode)


In [7]:
# Credentials are read from the environment instead of being hard-coded in the notebook,
# so they never end up in version control or in shared copies of this file.
# Export these before starting the kernel:
#   JSL_SECRET, SPARK_NLP_LICENSE, AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY
# A variable that is not set yields an empty string.
license_keys = {
    key: os.environ.get(env_var, '')
    for key, env_var in (
        ('SECRET', 'JSL_SECRET'),
        ('SPARK_NLP_LICENSE', 'SPARK_NLP_LICENSE'),
        ('AWS_ACCESS_KEY_ID', 'AWS_ACCESS_KEY_ID'),
        ('AWS_SECRET_ACCESS_KEY', 'AWS_SECRET_ACCESS_KEY'),
    )
}

In [8]:
# The JSL secret authenticates the private PyPI index and the Spark session below;
# the licence and AWS keys are exported as environment variables for Spark NLP.
secret = license_keys['SECRET']
os.environ.update({
    env_name: license_keys[env_name]
    for env_name in ('SPARK_NLP_LICENSE', 'AWS_ACCESS_KEY_ID', 'AWS_SECRET_ACCESS_KEY')
})

# Versions installed below: licensed spark-nlp-jsl and open-source spark-nlp.
jsl_version = '2.5.5'
version = '2.5.5'

In [9]:
# Pin pyspark 2.4.4, reinstalling it even if some other version is already present.
!pip install --ignore-installed -q pyspark==2.4.4

In [10]:
!python -m pip install --upgrade spark-nlp-jsl==$jsl_version  --extra-index-url https://pypi.johnsnowlabs.com/$secret

Looking in indexes: https://pypi.org/simple, https://pypi.johnsnowlabs.com/<redacted-secret>
Requirement already up-to-date: spark-nlp-jsl==2.5.5 in /opt/conda/anaconda/lib/python3.6/site-packages (2.5.5)


In [11]:
# Install the open-source spark-nlp at the same version as spark-nlp-jsl (`version` set above).
!pip install --ignore-installed -q spark-nlp==$version

### Load pySpark and SparkNLP packages

In [12]:
import sys, os, time
from sparknlp.base import *
from sparknlp.annotator import *
from sparknlp.util import *
from sparknlp_jsl.annotator import *

from sparknlp.pretrained import ResourceDownloader
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.ml import Pipeline, PipelineModel
import sparknlp_jsl

# Start the Spark session through the licensed Spark NLP library, passing the JSL secret.
spark = sparknlp_jsl.start(secret)

### Define SparkNLP Clinical Pipelines

#### Common Pipeline - Clinical NER Pipeline creation

In [14]:
# Annotator that transforms a text column from dataframe into an Annotation ready for NLP

documentAssembler = DocumentAssembler()\
  .setInputCol("text")\
  .setOutputCol("document")

# Sentence Detector annotator; the custom bound also treats "," as a sentence boundary

sentenceDetector = SentenceDetector()\
  .setInputCols(["document"])\
  .setOutputCol("sentence")\
  .setCustomBounds([","])

# Tokenizer splits each sentence into tokens, additionally splitting on spaces, commas and
# parentheses. Raw strings keep the backslashes without relying on invalid escape sequences
# (r"\(" is the same string as the original "\(").

tokenizer = Tokenizer()\
  .setInputCols(["sentence"])\
  .setOutputCol("raw_token")\
  .setSplitChars([" ", ",", r"\(", r"\)"])

# StopWordsCleaner filters the raw tokens into `token`, which all downstream stages read

stopwords = StopWordsCleaner()\
  .setInputCols(["raw_token"])\
  .setOutputCol("token")

# WordEmbeddingsModel pretrained "embeddings_clinical" includes a model of 1.6Gb that needs to be downloaded

word_embeddings = WordEmbeddingsModel.pretrained("embeddings_clinical", "en", "clinical/models")\
  .setInputCols(["sentence", "token"])\
  .setOutputCol("embeddings")

# NOTE(review): `ner_converter` and `chunk_embeddings` are not stages of `pipelineicdrx` or
# `pipelinepcsrx`; they read `ner` / `greedy_chunk`, which only the unused posology stages produce.
ner_converter = NerConverterInternal() \
  .setInputCols(["sentence", "token", "ner"]) \
  .setOutputCol("greedy_chunk")\
  .setWhiteList(['TREATMENT'])

chunk_embeddings = ChunkEmbeddings()\
    .setInputCols("greedy_chunk", "embeddings")\
    .setOutputCol("chunk_embeddings")

embeddings_clinical download started this may take some time.
Approximate size to download 1.6 GB
[OK!]


#### Pipeline 1 - ICD 10 CM

In [None]:
# ICD-10-CM entity resolver (pretrained). It resolves the `Diagnosis` chunk embeddings
# (`chunk_embs_jsl`) into ICD-10-CM codes, written to `icd10cm_resolution`.
icd10cmResolver2 = (
    ChunkEntityResolverModel
    .pretrained('chunkresolve_icd10cm_diseases_clinical', 'en', "clinical/models")
    .setEnableLevenshtein(True)
    .setNeighbours(200)
    .setAlternatives(5)
    .setDistanceWeights([3, 3, 2, 0, 0, 7])
    .setInputCols('token', 'chunk_embs_jsl')
    .setOutputCol('icd10cm_resolution')
)

#### Pipeline 2 - ICD 10 PCS

In [None]:
# ICD-10-PCS entity resolver (pretrained). It reads the same `chunk_embs_jsl` chunk
# embeddings as the ICD-10-CM resolver and writes to `icd10pcs_resolution`.
icd10pcsResolver2 = (
    ChunkEntityResolverModel
    .pretrained('chunkresolve_icd10pcs_clinical', 'en', "clinical/models")
    .setEnableLevenshtein(True)
    .setNeighbours(200)
    .setAlternatives(5)
    .setDistanceWeights([3, 3, 2, 0, 0, 7])
    .setInputCols('token', 'chunk_embs_jsl')
    .setOutputCol('icd10pcs_resolution')
)

#### Pipeline 3 - Drug (RxNorm) resolver, plus the NER and chunk stages shared by all pipelines

In [15]:
# NOTE(review): the stages from `rxnorm_resolver1` through `ner_converter2` are not used by
# `pipelineicdrx` / `pipelinepcsrx`. They still download their pretrained models when this cell runs.

# RxNorm resolver (SBD model) over `chunk_embeddings`, max-pooled -- unused, see note above.
# `.pretrained` is called on the class itself, as the ICD-10 resolvers do.
rxnorm_resolver1 = ChunkEntityResolverModel\
    .pretrained('chunkresolve_rxnorm_sbd_clinical', 'en', "clinical/models")\
    .setEnableLevenshtein(True)\
    .setNeighbours(200).setAlternatives(5).setDistanceWeights([3,11,0,0,0,9])\
    .setInputCols('token', 'chunk_embeddings')\
    .setOutputCol('rxnorm_resolution')\
    .setPoolingStrategy("MAX")

# Posology NER tags written to `ner` -- unused
posology_ner = NerDLModel.pretrained("ner_posology", "en", "clinical/models") \
  .setInputCols(["sentence", "token", "embeddings"]) \
  .setOutputCol("ner")

ner_converter1 = NerConverterInternal() \
  .setInputCols(["sentence", "token", "ner"]) \
  .setOutputCol("ner_chunk")

# Merges `ner_chunk` with itself using the replacement dictionary replace_dict.csv -- unused
chunk_merge = ChunkMergeApproach().setInputCols("ner_chunk","ner_chunk").setOutputCol("merged_chunk")\
  .setReplaceDictResource("replace_dict.csv","TEXT", {"delimiter":","})

iob_tagger = IOBTagger().setInputCols("token","merged_chunk").setOutputCol("merged_ner")

ner_converter2 = NerConverterInternal() \
  .setInputCols(["sentence", "token", "merged_ner"]) \
  .setOutputCol("greedy_chunk")\
  .setPreservePosition(False)\
  .setGreedyMode(True)\
  .setWhiteList(['DRUG'])

# Pretrained clinical NER (`ner_jsl`) and drug NER (`ner_drugs`) taggers used by both assembled pipelines
jslNer = NerDLModel.pretrained('ner_jsl', 'en', "clinical/models")\
    .setInputCols('sentence', 'token', 'embeddings')\
    .setOutputCol('ner_jsl')

drugNer = NerDLModel.pretrained('ner_drugs', 'en', "clinical/models")\
    .setInputCols('sentence', 'token', 'embeddings')\
    .setOutputCol('ner_drug')

# Converter annotators transform IOB tags into full chunks (sequence set of tokens) tagged with `entity` metadata.
# Only `Diagnosis` chunks are kept from ner_jsl; these feed the ICD-10 resolvers.
jslConverter = NerConverter()\
    .setInputCols('sentence', 'token', 'ner_jsl')\
    .setOutputCol('chunk_jsl')\
    .setWhiteList(["Diagnosis"])

drugConverter = NerConverter()\
    .setInputCols('sentence', 'token', 'ner_drug')\
    .setOutputCol('chunk_drug')

# ChunkEmbeddings annotators aggregate embeddings for each token in the chunk
jslChunkEmbeddings = ChunkEmbeddings()\
  .setInputCols('chunk_jsl', 'embeddings')\
  .setOutputCol('chunk_embs_jsl')

drugChunkEmbeddings = ChunkEmbeddings()\
  .setInputCols('chunk_drug', 'embeddings')\
  .setOutputCol('chunk_embs_drug')

# RxNorm resolver (SCD model) over the drug chunk embeddings; this is the RxNorm stage of both
# assembled pipelines. The trailing line continuation is removed so that code added below is not
# silently joined onto this statement.
rxnormResolver2 = ChunkEntityResolverModel\
    .pretrained('chunkresolve_rxnorm_scd_clinical', 'en', "clinical/models")\
    .setEnableLevenshtein(True)\
    .setNeighbours(200).setAlternatives(5).setDistanceWeights([3,3,2,0,0,7])\
    .setInputCols('token', 'chunk_embs_drug')\
    .setOutputCol('rxnorm_resolution')

chunkresolve_rxnorm_sbd_clinical download started this may take some time.
Approximate size to download 17.9 MB
[OK!]
ner_posology download started this may take some time.
Approximate size to download 13.7 MB
[OK!]


### Assembled Pipeline

#### Assemble pipeline 1: ICD10 CM + RxNorm

In [20]:
# ICD-10-CM + RxNorm: text preprocessing, diagnosis/drug NER and chunking, then both resolvers.
pipelineicdrx = Pipeline(stages=[
    # text preprocessing and token embeddings
    documentAssembler,
    sentenceDetector,
    tokenizer,
    stopwords,
    word_embeddings,
    # NER, chunking and chunk embeddings
    jslNer,
    drugNer,
    jslConverter,
    drugConverter,
    jslChunkEmbeddings,
    drugChunkEmbeddings,
    # code resolution
    icd10cmResolver2,
    rxnormResolver2,
])

#### Assemble pipeline 2: ICD10 PCS + RxNorm

In [None]:
# ICD-10-PCS + RxNorm: the same stages as pipelineicdrx, with the ICD-10-PCS resolver
# in place of the ICD-10-CM one.
pipelinepcsrx = Pipeline(stages=[
    # text preprocessing and token embeddings
    documentAssembler,
    sentenceDetector,
    tokenizer,
    stopwords,
    word_embeddings,
    # NER, chunking and chunk embeddings
    jslNer,
    drugNer,
    jslConverter,
    drugConverter,
    jslChunkEmbeddings,
    drugChunkEmbeddings,
    # code resolution
    icd10pcsResolver2,
    rxnormResolver2,
])

### Functions 

In [21]:
def quick_metadata_analysis(df, doc_field, chunk_field, code_fields):
    """Flatten resolved chunks into one row per chunk, with resolver confidence and options.

    The chunk annotations in ``chunk_field`` are zipped position-by-position with the
    ``metadata`` of every resolver column in ``code_fields``, and the result is exploded.

    Args:
        df: Spark DataFrame produced by a fitted pipeline.
        doc_field: Name of the document-id column (e.g. ``'doc_id'``). It is used in
            ``coords`` and as the primary sort key.
        chunk_field: Chunk annotation column (e.g. ``'chunk_jsl'``).
        code_fields: Resolver output columns (e.g. ``['icd10cm_resolution']``). This may be
            empty, in which case only the chunk columns are returned.

    Returns:
        A Spark DataFrame ordered by document id, chunk begin and chunk end, with columns:
        ``coords`` (``"<doc>::<begin>::<end>"``), ``chunk`` (chunk text), ``entity`` (NER
        label from the chunk metadata), and, for each code field, a float
        ``<prefix>_conf`` confidence and a ``<prefix>_opts`` array of
        (code, description) pairs. ``<prefix>`` is the field name up to its first
        underscore (``icd10cm_resolution`` -> ``icd10cm``).
    """
    chunk_attrs = ("begin", "end", "result", "metadata")
    zipped = [f"{chunk_field}.{attr}" for attr in chunk_attrs]
    zipped += [f"{cf}.metadata" for cf in code_fields]
    # Joining the list (rather than appending a pre-joined string) avoids a trailing comma
    # when code_fields is empty.
    expression = f"explode(arrays_zip({', '.join(zipped)})) as a"

    # Zipped struct fields are addressed by position: 0-3 are the chunk attributes above,
    # and the resolver metadata fields follow right after them.
    top_n_rest_args = []
    for pos, cf in enumerate(code_fields, start=len(chunk_attrs)):
        prefix = cf.split('_')[0]
        top_n_rest_args.append(f"float(a['{pos}'].confidence) as {prefix}_conf")
        top_n_rest_args.append(
            f"arrays_zip(split(a['{pos}'].all_k_results,':::'),"
            f"split(a['{pos}'].all_k_resolutions,':::')) as {prefix}_opts")

    return (df.selectExpr(doc_field, expression)
            # Sort by the caller's id column (previously hard-coded to 'doc_id').
            .orderBy(doc_field, F.expr("a['0']"), F.expr("a['1']"))
            .selectExpr(f"concat_ws('::',{doc_field},a['0'],a['1']) as coords",
                        "a['2'] as chunk", "a['3'].entity as entity", *top_n_rest_args))

## Load Data

In [26]:
# Load the CORD-19 CSV (first row is the header) from GCS and spread it over 4 partitions.
data = (
    spark.read
    .option("header", True)
    .csv('gs://cord-19-group-4/google-cloud-dataproc-metainfo/cord19_V2.csv')
    .repartition(4)
)

### Filter out null abstracts and body texts, then take a random sample of each

In [None]:
# Keep only rows with an abstract, then take a ~0.5% random sample.
# The fixed seed makes the sample, and every downstream result, reproducible across runs.
data_abstract = data.filter(data.abstract.isNotNull()).sample(fraction=0.005, seed=42)

In [27]:
# Keep only rows with a body text, then take a ~5% random sample.
# The fixed seed makes the sample, and every downstream result, reproducible across runs.
data_body_text = data.filter(data.body_text.isNotNull()).sample(fraction=0.05, seed=42)

In [28]:
# Abstract corpus, renamed to the (doc_id, text) schema the pipelines expect (DocumentAssembler reads `text`).
data_abstract_part = data_abstract.selectExpr('cord_uid AS doc_id', 'abstract AS text')

In [29]:
# Body-text corpus, renamed to the (doc_id, text) schema the pipelines expect.
data_body_part = data_body_text.selectExpr('cord_uid AS doc_id', 'body_text AS text')

### Call Pipelines

In [30]:
# ICD-10-CM + RxNorm pipeline applied to the sampled abstracts.
icdPipeline = pipelineicdrx.fit(dataset=data_abstract_part)
output_icd = icdPipeline.transform(dataset=data_abstract_part)

In [31]:
# The same ICD-10-CM + RxNorm pipeline applied to the sampled body texts (feeds the RxNorm analysis).
drugPipeline = pipelineicdrx.fit(dataset=data_body_part)
output_drug = drugPipeline.transform(dataset=data_body_part)

In [None]:
# ICD-10-PCS + RxNorm pipeline applied to the sampled abstracts.
pcsPipeline = pipelinepcsrx.fit(dataset=data_abstract_part)
output_pcs = pcsPipeline.transform(dataset=data_abstract_part)

In [None]:
# ICD-10-PCS + RxNorm pipeline applied to the sampled body texts.
drugpcsPipeline = pipelinepcsrx.fit(data_body_part)
# Transform with the PCS model fitted on the line above; `icdPipeline` is the ICD-10-CM model.
output_drug_pcs = drugpcsPipeline.transform(data_body_part)

### ICD10 CM + rxNorm Analysis

In [32]:
# Cache the abstracts' pipeline output; the ICD-10-CM analysis below reuses it.
output_icd.cache()

DataFrame[doc_id: string, text: string, document: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, sentence: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, raw_token: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, token: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, embeddings: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, ner_jsl: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, ner_drug: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, chunk_jsl: array<struct<annotatorType:string

In [33]:
# Cache the body texts' pipeline output; the RxNorm analysis below reuses it.
output_drug.cache()

DataFrame[doc_id: string, text: string, document: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, sentence: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, raw_token: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, token: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, embeddings: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, ner_jsl: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, ner_drug: array<struct<annotatorType:string,begin:int,end:int,result:string,metadata:map<string,string>,embeddings:array<float>>>, chunk_jsl: array<struct<annotatorType:string

In [None]:
# output.write.mode("overwrite").save("temp1")

In [None]:
## Optionally save the transformed data to disk and reload it, so later cells do not recompute the pipeline

In [None]:
# output = spark.read.load("temp1")

### Analysis of outputs

In [34]:
# One row per Diagnosis chunk in the abstracts, with its ICD-10-CM confidence and candidate codes.
icd10cm_analysis = quick_metadata_analysis(
    output_icd,
    doc_field='doc_id',
    chunk_field='chunk_jsl',
    code_fields=['icd10cm_resolution'],
)

In [35]:
# One row per drug chunk in the body texts, with its RxNorm confidence and candidate codes.
rxnorm_analysis = quick_metadata_analysis(
    output_drug,
    doc_field='doc_id',
    chunk_field='chunk_drug',
    code_fields=['rxnorm_resolution'],
)

#### Select chunks whose resolver confidence score is > 0.6 for ICD-10 and RxNorm

In [36]:
# Keep only chunks whose ICD-10-CM resolution confidence exceeds 0.6.
icd_sel = icd10cm_analysis.where(F.col('icd10cm_conf') > 0.6)

In [37]:
# Cache the confident ICD-10-CM chunks before converting a sample of them to pandas below.
icd_sel.cache()

DataFrame[coords: string, chunk: string, entity: string, icd10cm_conf: float, icd10cm_opts: array<struct<0:string,1:string>>]

In [38]:
# Keep only chunks whose RxNorm resolution confidence exceeds 0.6.
rxnorm_sel = rxnorm_analysis.where(F.col('rxnorm_conf') > 0.6)

In [39]:
# Cache the confident RxNorm chunks before converting a sample of them to pandas below.
rxnorm_sel.cache()

DataFrame[coords: string, chunk: string, entity: string, rxnorm_conf: float, rxnorm_opts: array<struct<0:string,1:string>>]

In [40]:
# rxnorm_sel_c = rxnorm_sel.collect()

In [41]:
# icd_sel_c = icd_sel.collect()

### Convert to Pandas Dataframe

In [42]:
import pandas as pd

In [43]:
# Widen cells so the candidate-code lists stay readable, and allow up to 500 rows per display.
pd.set_option('display.max_colwidth', 250,
              'display.max_rows', 500)

#### Unable to load all rows of the Spark DataFrame into a pandas DataFrame (tried parquet, collect(), cache(), and increasing Spark driver and executor memory), so only the first 100 rows are converted below

In [52]:
# Collecting every selected row into pandas did not fit in memory, so only the first 100 are
# converted. The limit is applied inline instead of rebinding `icd_sel`, so the cached full
# selection stays available and re-running this cell changes nothing else.
icd_sel_pdf = icd_sel.limit(100).toPandas()

In [50]:
# Collecting every selected row into pandas did not fit in memory, so only the first 100 are
# converted. The limit is applied inline instead of rebinding `rxnorm_sel`, so the cached full
# selection stays available and re-running this cell changes nothing else.
rxnorm_sel_pdf = rxnorm_sel.limit(100).toPandas()

### Visualize the output

In [53]:
# Preview the first 10 confident ICD-10-CM chunks with their confidence and candidate codes.
icd_sel_pdf.head(10)

Unnamed: 0,coords,chunk,entity,icd10cm_conf,icd10cm_opts
0,0jxuedd8::343::370,DCDA twin gestations and HDP,Diagnosis,0.9644,"[(F842, Rett's syndrome), (L0882, Omphalitis not of newborn), (E031, Congenital hypothyroidism without goiter), (N6489, Other specified disorders of breast), (N471, Phimosis)]"
1,0jxuedd8::524::526,HDP,Diagnosis,0.6274,"[(E7141, Primary carnitine deficiency), (D593, Hemolytic-uremic syndrome), (E876, Hypokalemia), (E222, Syndrome of inappropriate secretion of antidiuretic hormone), (F40210, Arachnophobia)]"
2,0jxuedd8::587::589,HDP,Diagnosis,0.6274,"[(E7141, Primary carnitine deficiency), (D593, Hemolytic-uremic syndrome), (E876, Hypokalemia), (E222, Syndrome of inappropriate secretion of antidiuretic hormone), (F40210, Arachnophobia)]"
3,0jxuedd8::790::792,HDP,Diagnosis,0.6274,"[(E7141, Primary carnitine deficiency), (D593, Hemolytic-uremic syndrome), (E876, Hypokalemia), (E222, Syndrome of inappropriate secretion of antidiuretic hormone), (F40210, Arachnophobia)]"
4,0jxuedd8::985::993,early HDP,Diagnosis,0.7483,"[(G300, Alzheimer's disease with early onset), (E301, Precocious puberty), (F840, Autistic disorder), (G301, Alzheimer's disease with late onset), (G650, Sequelae of Guillain-Barre syndrome)]"
5,28ci6cfx::1767::1782,renal impairment,Diagnosis,0.8962,"[(N289, Disorder of kidney and ureter, unspecified), (N189, Chronic kidney disease, unspecified), (N059, Unspecified nephritic syndrome with unspecified morphologic changes), (N250, Renal osteodystrophy), (N186, End stage renal disease)]"
6,2ivzvkw8::258::265,aneurysm,Diagnosis,0.8604,"[(I729, Aneurysm of unspecified site), (I723, Aneurysm of iliac artery), (I253, Aneurysm of heart), (I719, Aortic aneurysm of unspecified site, without rupture), (I2541, Coronary artery aneurysm)]"
7,3g2u0i71::50::76,Peripheral Arterial Disease,Diagnosis,0.7399,"[(I739, Peripheral vascular disease, unspecified), (I7389, Other specified peripheral vascular diseases), (I798, Other disorders of arteries, arterioles and capillaries in diseases classified elsewhere), (I7789, Other specified disorders of arter..."
8,3g2u0i71::408::410,PAD,Diagnosis,1.0,"[(I739, Peripheral vascular disease, unspecified), (I2510, Atherosclerotic heart disease of native coronary artery without angina pectoris), (I25790, Atherosclerosis of other coronary artery bypass graft(s) with unstable angina pectoris), (I25799..."
9,3vthgenz::1167::1186,clotting were sepsis,Diagnosis,0.9031,"[(A4189, Other specified sepsis), (A267, Erysipelothrix sepsis), (A327, Listerial sepsis), (A227, Anthrax sepsis), (A427, Actinomycotic sepsis)]"


In [51]:
# Preview up to 12 confident RxNorm chunks with their confidence and candidate codes.
rxnorm_sel_pdf.head(12)

Unnamed: 0,coords,chunk,entity,rxnorm_conf,rxnorm_opts
0,09c5hw3j::320::325,serine,DrugChem,0.9898,"[(198931, Serine 600 MG Oral Capsule), (198925, Threonine 500 MG Oral Capsule), (199236, Threonine 500 MG Oral Tablet), (198923, Aspartic Acid 600 MG Oral Capsule), (432917, ethyl cysteine 150 MG Oral Tablet)]"
1,0hdi3izd::129::135,acetone,DrugChem,0.6342,"[(308569, Acetone 46 MG/ML Topical Solution), (616795, Acetone 60 MG/ML / Ethanol 0.15 ML/ML Medicated Shampoo), (310418, Formaldehyde 77 MG/ML Topical Solution), (244646, Formaldehyde 40 MG/ML Topical Solution), (200297, Ethanol 7 ML/ML Topical ..."
2,0hdi3izd::356::366,isopropanol,DrugChem,0.7249,"[(308569, Acetone 46 MG/ML Topical Solution), (616795, Acetone 60 MG/ML / Ethanol 0.15 ML/ML Medicated Shampoo), (307802, Ethanol 7 ML/ML Topical Solution), (250678, Ethanol 7 ML/ML Topical Lotion), (248180, Ethanol 1 ML/ML Topical Solution)]"
3,0hdi3izd::379::385,acetone,DrugChem,0.6342,"[(308569, Acetone 46 MG/ML Topical Solution), (616795, Acetone 60 MG/ML / Ethanol 0.15 ML/ML Medicated Shampoo), (310418, Formaldehyde 77 MG/ML Topical Solution), (244646, Formaldehyde 40 MG/ML Topical Solution), (200297, Ethanol 7 ML/ML Topical ..."
4,8j7wjixc::952::977,clomiphene and toremiphene,DrugChem,0.7092,"[(1093060, Clomiphene Citrate 50 MG Oral Tablet), (432632, Lynestrenol 5 MG Oral Tablet), (247495, Buserelin 1 MG/ML Injectable Solution), (250627, nomegestrol 5 MG Oral Tablet), (246108, Chlormadinone 5 MG Oral Tablet)]"
5,8speml4j::594::599,serine,DrugChem,0.9898,"[(198931, Serine 600 MG Oral Capsule), (198925, Threonine 500 MG Oral Capsule), (199236, Threonine 500 MG Oral Tablet), (198923, Aspartic Acid 600 MG Oral Capsule), (432917, ethyl cysteine 150 MG Oral Tablet)]"
6,9fbufpxz::1158::1167,phosphorus,DrugChem,0.995,"[(410881, calcium phosphate 28.7 MG / Phosphorus 53.3 MG Oral Capsule), (309003, Carbon Dioxide 9 % / Nitrogen 91 % Gas for Inhalation), (309001, Carbon Dioxide 5 % / Nitrogen 95 % Gas for Inhalation), (348459, Nitrogen 70 % / Oxygen 30 % Gas for..."
7,9fbufpxz::1260::1269,phosphorus,DrugChem,0.995,"[(410881, calcium phosphate 28.7 MG / Phosphorus 53.3 MG Oral Capsule), (309003, Carbon Dioxide 9 % / Nitrogen 91 % Gas for Inhalation), (309001, Carbon Dioxide 5 % / Nitrogen 95 % Gas for Inhalation), (348459, Nitrogen 70 % / Oxygen 30 % Gas for..."
8,bpw2m5fb::698::707,creatinine,DrugChem,0.9996,"[(849628, Creatinine 800 MG Oral Capsule), (252180, Urea 10 MG/ML Topical Lotion), (424168, Urea 30 MG/ML Topical Lotion), (251705, Urea 20 MG/ML Topical Lotion), (245052, Urea 200 MG/ML Oral Solution)]"
9,dj5ia9l8::811::834,benzene and formaldehyde,DrugChem,0.956,"[(315104, Formaldehyde 10 MG/ML / Isopropyl Alcohol 8.7 ML/ML Topical Solution), (348346, dichlorodifluoromethane 100 % Gas for Inhalation), (1249574, chlorine dioxide 1 MG/ML Topical Solution), (1249581, chlorine dioxide 10 MG/ML Topical Solutio..."
