In [None]:
# Parameters
# bucket: str, name of the S3 bucket that holds the license, input and output files
# get_tokens: documented as "if True, gets NER tokens; if False, gets assertions"
#   NOTE(review): no cell visible in this notebook reads get_tokens; both the assertion and
#   the token outputs appear to be written regardless of its value — confirm the intent
# use_clean: if True, uses the cleaned_notes column; if False, uses the raw notes column
# import_name: str, key of the input CSV file inside the S3 bucket
# export_name: str, key prefix for the exported files (do not include '.csv')
bucket = ""
get_tokens = True
use_clean = True
import_name = "data/note_set_cleaned.csv"
export_name = "data/processed_notes"

In [None]:
import ast
import json
import os
import sys

import boto3
import pandas as pd

# Download the Spark NLP for Healthcare license file from S3 and parse it as JSON.
s3_client = boto3.client('s3')
s3_response = s3_client.get_object(Bucket=bucket, Key='license/sparknlp_for_healthcare.5.5.1.json')
s3_object_body = s3_response.get('Body')
license_keys = json.load(s3_object_body)

# Expose every license entry as a notebook-level variable (at the top level of a
# cell, locals() is the global namespace). The %pip cell references
# $PUBLIC_VERSION, $JSL_VERSION and $SECRET.
locals().update(license_keys)

# Copy only the entries listed here into the process environment.
license_env_keys = ("SECRET", "PUBLIC_VERSION", "JSL_VERSION", 'SPARK_NLP_LICENSE')
os.environ.update({env_key: license_keys[env_key] for env_key in license_env_keys})

In [None]:
# Install a pinned pyspark and the public spark-nlp release.
# $PUBLIC_VERSION, $JSL_VERSION and $SECRET are expanded from notebook variables;
# presumably these are the license entries injected by the license-loading cell — confirm.
%pip install --upgrade -q pyspark==3.4.1 spark-nlp==$PUBLIC_VERSION 

# Install Spark NLP for Healthcare from the John Snow Labs package index
# (the index URL embeds the license secret).
%pip install --upgrade -q spark-nlp-jsl==$JSL_VERSION  --extra-index-url https://pypi.johnsnowlabs.com/$SECRET

# Install the Spark NLP Display library for visualization.
# NOTE(review): unlike the two installs above, this one is not version-pinned — consider pinning.
%pip install -q spark-nlp-display 

# Version pins noted by the original author; nothing in this cell installs them:
# svgwrite==1.4
# py4j==0.10.9.5

In [None]:
import pyspark
import sparknlp_jsl
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.sql import Row, SparkSession
from sparknlp.annotator import *
from sparknlp.base import *
from sparknlp.common import *
from sparknlp_jsl.annotator import *

In [None]:
# Spark settings shared by every supported machine size.
# NOTE(review): the original comment labelled the 600s network timeout "default";
# Spark's documented default is believed to be 120s — confirm.
_BASE_SPARK_PARAMS = {
    "spark.network.timeout": "600s",
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryoserializer.buffer.max": "2000M",
    "spark.driver.maxResultSize": "4G",
    # "spark.memory.offHeap.enabled": "true",   # left disabled by the original author
}

# Memory settings keyed by the machine's CPU count.
# Original author's budget notes (kept for reference, not re-verified):
#   96 CPUs: 200G driver + ~172GB executors <= 384 GB. That executor figure was
#            computed for a 1800M setting (96 * 1.8GB) that is commented out in
#            the original; the active value is 2200M.
#   48 CPUs: 100G driver + ~87GB executors (48 * 1.8GB) <= 192 GB.
#   32 CPUs: same values as 48 CPUs; no budget note was given.
_MEMORY_BY_CPU_COUNT = {
    96: {"spark.driver.memory": "200G", "spark.executor.memory": "2200M"},
    48: {"spark.driver.memory": "100G", "spark.executor.memory": "1800M"},
    32: {"spark.driver.memory": "100G", "spark.executor.memory": "1800M"},
}

# Full Spark parameter set per supported CPU count.
params_dict = {
    cpu_count: {**_BASE_SPARK_PARAMS, **memory_params}
    for cpu_count, memory_params in _MEMORY_BY_CPU_COUNT.items()
}

# os.cpu_count() may return None; None and any unlisted count end up in the
# explicit error below instead of a bare KeyError.
num_cpus = os.cpu_count()
if num_cpus not in params_dict:
    raise ValueError(
        f"No Spark memory profile for a machine with {num_cpus} CPUs; "
        f"supported CPU counts: {sorted(params_dict)}. "
        "Add an entry sized for this machine's RAM before starting Spark."
    )
params = params_dict[num_cpus]

# Start the licensed Spark NLP for Healthcare session on CPU with the chosen settings.
spark = sparknlp_jsl.start(license_keys['SECRET'], gpu=False, params=params)

In [None]:
# Report the installed library versions.
# NOTE(review): `sparknlp` is not imported explicitly in the visible cells; this
# presumably relies on one of the star imports binding the name — confirm.
version_report = {
    "Spark NLP Version :": sparknlp.version(),
    "Spark NLP_JSL Version :": sparknlp_jsl.version(),
}
for lib_label, lib_version in version_report.items():
    print(lib_label, lib_version)

# Last expression of the cell: display the active Spark session.
spark

In [None]:
# Turns the raw "text" column into a "document" annotation for the later stages.
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Splits each document into sentences; a double space and ".'" are registered as
# extra boundaries. setSplitLength(150) presumably force-splits sentences longer
# than 150 characters — confirm against the SentenceDetector documentation.
sentenceDetector = (
    SentenceDetector()
    .setInputCols(["document"])
    .setOutputCol("sentence")
    .setCustomBounds(["  ", ".'"])
    .setSplitLength(150)
)

# Breaks each sentence into tokens.
tokenizer = (
    Tokenizer()
    .setInputCols(["sentence"])
    .setOutputCol("token")
)

In [None]:
# Entity labels kept by every NER converter below (previously repeated three times).
NER_WHITELIST = ["SYMPTOM", "VS_FINDING", "DISEASE_SYNDROME_DISORDER", "ADMISSION_DISCHARGE", "PROCEDURE"]

# Word embeddings stage. The pipeline cell lists `word_embeddings` and the two
# MedicalNerModel stages plus the assertion model read an "embeddings" column,
# but no visible cell defined the stage, so a fresh kernel failed with NameError.
# NOTE(review): "embeddings_clinical" is the embeddings model these pretrained
# models are understood to require — confirm against their model cards.
word_embeddings = WordEmbeddingsModel.pretrained("embeddings_clinical", "en", "clinical/models") \
    .setInputCols(["sentence", "token"]) \
    .setOutputCol("embeddings")

# Our three NER models
clinical_ner = MedicalNerModel.pretrained("ner_jsl", "en", "clinical/models") \
    .setInputCols(["sentence", "token", "embeddings"]) \
    .setOutputCol("clinical_ner")

wip_ner = MedicalNerModel.pretrained("jsl_ner_wip_clinical", "en", "clinical/models") \
    .setInputCols(["sentence", "token", "embeddings"]) \
    .setOutputCol("wip_ner")

bert_ner = MedicalBertForTokenClassifier.pretrained("bert_token_classifier_ner_jsl",
                                                        "en",
                                                        "clinical/models")\
    .setInputCols("token", "sentence")\
    .setOutputCol("bert_ner")

# One converter per NER model: turns token-level tags into chunks and keeps only
# the whitelisted entity labels.
clinical_ner_converter = NerConverterInternal() \
    .setInputCols(["sentence", "token", "clinical_ner"]) \
    .setOutputCol("clinical_ner_chunk")\
    .setWhiteList(NER_WHITELIST)

wip_ner_converter = NerConverterInternal() \
    .setInputCols(["sentence", "token", "wip_ner"]) \
    .setOutputCol("wip_ner_chunk")\
    .setWhiteList(NER_WHITELIST)

bert_ner_converter = NerConverterInternal() \
    .setInputCols(["sentence", "token", "bert_ner"]) \
    .setOutputCol("bert_ner_chunk")\
    .setWhiteList(NER_WHITELIST)

# Merger model - combines the three chunk columns into "ner_chunk"; per the
# original author, when chunks conflict the one with the highest confidence
# score is used (ordering feature "ChunkConfidence").
chunk_merger = ChunkMergeApproach()\
    .setInputCols(["clinical_ner_chunk", "wip_ner_chunk", "bert_ner_chunk"])\
    .setOutputCol("ner_chunk")\
    .setOrderingFeatures(["ChunkConfidence"])\
    .setSelectionStrategy("Sequential")

# Assertion status model applied to the merged chunks.
# Original author's note (not verified here): trained on the i2b2 dataset
# (sampled from MIMIC).
clinical_assertion = AssertionDLModel.pretrained("assertion_jsl_augmented", "en", "clinical/models") \
    .setInputCols(["sentence", "ner_chunk", "embeddings"]) \
    .setOutputCol("assertion")

In [None]:
# Assemble the pipeline. Each NER model is immediately followed by its chunk
# converter; the merger and the assertion model come last because they read the
# converters' chunk columns.
pipeline_stages = [
    documentAssembler,
    sentenceDetector,
    tokenizer,
    word_embeddings,
    clinical_ner,
    clinical_ner_converter,
    wip_ner,
    wip_ner_converter,
    bert_ner,
    bert_ner_converter,
    chunk_merger,
    clinical_assertion,
]
nlpPipeline_difdetect = Pipeline(stages=pipeline_stages)

# Fit on a one-row frame holding an empty string to obtain the PipelineModel
# used for transforming the notes.
empty_data = spark.createDataFrame([[""]]).toDF("text")
model_difdetect = nlpPipeline_difdetect.fit(empty_data)

In [None]:
# Choose the notes column requested by `use_clean`; everything else is shared.
notes_column = "cleaned_notes" if use_clean else "notes"

# Read only the id and the chosen notes column, make ids strings, keep the first
# row per notes_id, and expose the notes under the column name "text".
text = (
    pd.read_csv(f"s3://{bucket}/{import_name}", usecols=["notes_id", notes_column])
    .assign(notes_id=lambda frame: frame["notes_id"].astype(str))
    .drop_duplicates(subset=["notes_id"])
    .reset_index(drop=True)
    .rename(columns={notes_column: "text"})
)

# Plain Python list of the note texts.
ltext = list(text["text"])

In [None]:
divide = 700   # number of notes sent through the pipeline per batch
start = 0      # position of the first note of the current batch
itn = 0        # batch counter; also the numeric suffix of the per-batch files

# Column order of the per-batch assertion files.
ASSERTION_COLUMNS = ['chunks', 'chunk_begin', 'chunk_end', 'chunk_len',
                     'entities', 'assertion', 'confidence', 'notes_id']


def _assertion_rows(row):
    """Return one tuple (in ASSERTION_COLUMNS order) per chunk/assertion pair of a note.

    `row` must expose `ner_chunk`, `assertion` and `notes_id`. Chunks and
    assertions are paired positionally.

    NOTE(review): chunk_len is `end - begin`, while the token export later in the
    notebook computes token_len as `end - start + 1`. If `end` is an inclusive
    offset, chunk_len is one short. It is left unchanged here because the value
    is exported — confirm with downstream consumers before aligning the two.
    """
    return [
        (
            ner_chunk.result,
            ner_chunk.begin,
            ner_chunk.end,
            ner_chunk.end - ner_chunk.begin,
            ner_chunk.metadata["entity"],
            assertion.result,
            assertion.metadata["confidence"],
            row.notes_id,
        )
        for ner_chunk, assertion in zip(row.ner_chunk, row.assertion)
    ]


# Display options are loop-invariant, so they are set once.
pd.set_option('max_colwidth', None)
pd.set_option('display.max_rows', 100)

while start < len(text):
    text_df = text.iloc[start:start + divide].copy()
    # Presumably a shim for pandas versions without DataFrame.iteritems, which
    # spark.createDataFrame may still call — confirm before removing.
    text_df.iteritems = text_df.items
    text_df = spark.createDataFrame(data=text_df)

    # Two separate actions read this result below. Caching keeps the NLP
    # pipeline from being executed twice for every batch.
    light_result_dif = model_difdetect.transform(text_df).cache()

    # Collect only the columns the assertion export needs, so the other
    # annotation columns (e.g. embeddings) are not copied into pandas.
    lr_df = light_result_dif.select("notes_id", "ner_chunk", "assertion").toPandas()

    assertion_rows = []
    for note_row in lr_df.itertuples(index=False):
        assertion_rows.extend(_assertion_rows(note_row))

    # Explicit columns keep the header in the CSV even when a batch has no chunks.
    df = pd.DataFrame(assertion_rows, columns=ASSERTION_COLUMNS)
    df.to_csv(f"s3://{bucket}/{export_name}_{itn}.csv")
    del df

    # Token-level export: per note, the token strings, their offsets and the
    # tag sequence from each of the three NER models.
    lrdn = light_result_dif.select("token.result", "token.begin", "token.end", "clinical_ner.result",
                                   "wip_ner.result", "bert_ner.result", "notes_id")
    lrdn = lrdn.toDF("token", "token.begin", "token.end", "clinical_ner",
                     "wip_ner", "bert_ner", "notes_id")

    pd_df = lrdn.toPandas()
    pd_df.to_csv(f"s3://{bucket}/{export_name}_{itn}_tok.csv")

    # Release the cached batch before moving on to the next one.
    light_result_dif.unpersist()
    itn += 1
    start += divide

In [None]:
# Merge the per-batch assertion files into a single CSV, then delete the batches.
assertion_columns = ['chunks', 'chunk_begin', 'chunk_end', 'chunk_len',
                     'entities', 'assertion', 'confidence', 'notes_id']

# notes_id was cast to str when the notes were loaded; reading it back as str
# stops pandas from re-inferring a numeric dtype (which would drop leading zeros).
assertion_batches = [
    pd.read_csv(f"s3://{bucket}/{export_name}_{n}.csv", dtype={"notes_id": str})
    for n in range(itn)
]
if not assertion_batches:
    # Previously this surfaced as an opaque IndexError on conc[0].
    raise ValueError(
        "No per-batch assertion files to merge (itn == 0); "
        "run the batch-processing cell on a non-empty input first."
    )

# pd.concat handles a single-element list, so no special case is needed.
conc = pd.concat(assertion_batches, ignore_index=True)
conc = conc[assertion_columns]
conc.to_csv(f"s3://{bucket}/{export_name}_assertions.csv", index=False)
del conc, assertion_batches

for n in range(0, itn):
    # clean up the per-batch files now that the merged file is written
    s3_client.delete_object(Bucket=bucket, Key=f"{export_name}_{n}.csv")

In [None]:
def _parse_list_cell(cell):
    """Turn the CSV text of a Python list (e.g. "['a', 'b']" or "[0, 5]") back into a list.

    The per-batch files store each list column as the list's Python repr.
    ast.literal_eval reverses that exactly. The previous manual parsing
    (strip brackets, drop every "'", split on ", ") corrupted items containing
    apostrophes, double quotes, backslashes or ", ".
    """
    return list(ast.literal_eval(cell))


# notes_id is read back as str so ids are not re-inferred as numbers.
token_batches = [
    pd.read_csv(f"s3://{bucket}/{export_name}_{n}_tok.csv", dtype={"notes_id": str})
    for n in range(itn)
]
if not token_batches:
    # Previously this surfaced as an opaque IndexError on pd_df[0].
    raise ValueError(
        "No per-batch token files to merge (itn == 0); "
        "run the batch-processing cell on a non-empty input first."
    )

# pd.concat handles a single-element list, so no special case is needed.
pd_df = pd.concat(token_batches, ignore_index=True)
del token_batches

# Clean up/re-listify various columns
# Drop notes whose offset lists are empty; .copy() makes the column assignments
# below act on an independent frame rather than on a slice.
has_offsets = (pd_df["token.begin"] != '[]') & (pd_df["token.end"] != '[]')
pd_df = pd_df[has_offsets].copy()

list_columns = ["token.begin", "token.end", "token", "clinical_ner", "wip_ner", "bert_ner"]
for list_column in list_columns:
    pd_df[list_column] = pd_df[list_column].apply(_parse_list_cell)

pd_df = pd_df[(pd_df["token.begin"].map(len) > 0) & (pd_df["token.end"].map(len) > 0)]

# Finalize columns
# reset_index() exposes each note's row position in the merged frame as "index",
# which becomes note_no.
pd_df = pd_df.reset_index()
pd_df = pd_df[["index", "token", "token.begin", "token.end",
               "clinical_ner", "wip_ner", "bert_ner", "notes_id"]]
pd_df = pd_df.set_axis(["note_no", "tokens", "token_start", "token_end",
                        "clinical_ner", "wip_ner", "BERT_ner", "notes_id"], axis=1)
pd_df["note_no"] = pd_df["note_no"].astype(int)

In [None]:
# Explode the per-note lists into one row per token.
token_fields = ["tokens", "token_start", "token_end",
                "clinical_ner", "wip_ner", "BERT_ner"]
cons = []


def match_tokens(row):
    """Append to `cons` a frame with one row per token of the given note.

    The note's list columns are zipped position by position (zip stops at the
    shortest list); note_no and notes_id are repeated on every token row.
    """
    aligned = list(zip(*(row[field] for field in token_fields)))
    per_token = pd.DataFrame(aligned, columns=token_fields)
    per_token["note_no"] = row["note_no"]
    per_token["notes_id"] = row["notes_id"]
    cons.append(per_token)


# apply() is used only for its side effect of filling `cons`; the note-level
# frame is not needed afterwards.
pd_df.apply(match_tokens, axis=1)
del pd_df

exploded_df = pd.concat(cons)
exploded_df["token_len"] = exploded_df["token_end"] - exploded_df["token_start"] + 1
exploded_df = exploded_df[
    ["tokens", "token_start", "token_end", "token_len",
     "clinical_ner", "wip_ner", "BERT_ner", "notes_id", "note_no"]
].reset_index(drop=True)

In [None]:
# Write the token-level table, then remove the per-batch token files.
tokens_uri = f"s3://{bucket}/{export_name}_tokens.csv"
exploded_df.to_csv(tokens_uri, index=False)

for batch_no in range(itn):
    # clean up
    s3_client.delete_object(Bucket=bucket, Key=f"{export_name}_{batch_no}_tok.csv")