In [None]:
# Install pinned versions of PySpark and Spark NLP into the notebook runtime.
!pip install pyspark==3.1.3 spark-nlp==3.4.2

Collecting pyspark==3.1.3
  Downloading pyspark-3.1.3.tar.gz (214.0 MB)
[K     |████████████████████████████████| 214.0 MB 7.8 kB/s 
[?25hCollecting spark-nlp==3.4.2
  Downloading spark_nlp-3.4.2-py2.py3-none-any.whl (142 kB)
[K     |████████████████████████████████| 142 kB 17.5 MB/s 
[?25hCollecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
[K     |████████████████████████████████| 198 kB 47.2 MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.3-py2.py3-none-any.whl size=214463484 sha256=d6cbc18181103717a932ad46fe1078ca2d33a418a8cf7dc6c04c3a60075d2f76
  Stored in directory: /root/.cache/pip/wheels/ad/8e/49/44c110bb8e008d0778c6577d600d46047c6478ecca3f8f1517
Successfully built pyspark
Installing collected packages: py4j, spark-nlp, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.3 spark-nlp-3.4.2


In [None]:
import sparknlp

# Start the Spark session through Spark NLP, asking for the GPU variant.
# NOTE(review): presumably needs a GPU runtime — confirm before running on CPU-only machines.
spark = sparknlp.start(gpu=True)

# Echo the library versions so a run can be traced back to its environment.
print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

# Bare expression: shows the session object as the cell output.
spark

Spark NLP version:  3.4.2
Apache Spark version:  3.1.3


In [None]:
import sys

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

In [None]:
from pyspark.ml.feature import SQLTransformer
from pyspark.ml.feature import StringIndexer

In [None]:
import pandas as pd
import numpy as np

In [None]:
# Mount Google Drive at /content/gdrive (Colab only).
# NOTE(review): later cells use relative 'gdrive/MyDrive/...' paths, which presumably
# resolve only while the working directory is /content — confirm.
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


Slicing each document into parts of at most 1000 tokens each

In [None]:
from sklearn.model_selection import train_test_split

# NOTE(review): `readmission` is not defined in any visible cell of this notebook.
# It is presumably a pandas DataFrame with (at least) the columns HADM_ID, TEXT_FILE
# and READMISSION_STATUS, and it has to be loaded before this cell runs — confirm.
df_train, df_test=train_test_split(readmission,test_size=0.5, random_state=49)

# Sub-sample the negatives (non-readmitted) on the training set so that both classes
# have the same number of admissions.
df_train_readm=df_train[df_train.READMISSION_STATUS=='Readmitted']
df_train_non_readm=df_train[df_train.READMISSION_STATUS=='Non-readmitted']
# axis=0 stacks the two groups row-wise. The previous value (1.5) is not a valid axis
# and makes pd.concat raise.
df_train_sub = pd.concat([df_train_readm, df_train_non_readm.sample(n = len(df_train_readm), random_state = 50)],axis = 0)

In [None]:
# Give both frames a fresh 0..n-1 index: the slicing loops below look rows up with
# df[...][i] for i in range(len(df)), which relies on that index.
df_train_sub=df_train_sub.reset_index(drop=True)
df_test=df_test.reset_index(drop=True)

In [None]:
import math
def thousand_split(text, chunk_size=1000):
  """Split `text` into consecutive chunks of at most `chunk_size` tokens.

  Tokens are obtained by splitting on a single space. A text with at most
  `chunk_size` tokens is returned unchanged as a one-element list; otherwise
  each chunk is re-joined with single spaces, so joining the chunks with a
  space reproduces the input.

  Args:
    text: document to slice.
    chunk_size: maximum number of tokens per chunk (default 1000).

  Returns:
    List of chunk strings, in document order.

  Raises:
    ValueError: if `chunk_size` is smaller than 1.
  """
  if chunk_size < 1:
    raise ValueError("chunk_size must be a positive integer")
  token_list=text.split(" ")
  if len(token_list)<=chunk_size:
    return [text]
  # Ceiling division: the former "integer part + 1" produced an extra, empty chunk
  # whenever the token count was an exact multiple of the chunk size.
  num_sublist=math.ceil(len(token_list)/chunk_size)
  return [" ".join(token_list[chunk_size*i:chunk_size*(i+1)]) for i in range(num_sublist)]

In [None]:
# Accumulators for the sliced training set: one entry per (admission, chunk).
# The text list must be spelled TEXT_FILE_thousand — that is the name appended to
# below and read when the sliced DataFrame is built. The former TEXT_FILE_THOUSAND
# initialiser left that name undefined on a fresh kernel.
TEXT_FILE_thousand=[]
HADM_ID=[]
READMISSION=[]

from tqdm import tqdm
# Walk the three columns in parallel; every chunk of a document inherits the
# admission id and readmission status of that document.
for text, admission_id, readmission_status in tqdm(zip(df_train_sub['TEXT_FILE'], df_train_sub['HADM_ID'], df_train_sub['READMISSION_STATUS']), total=len(df_train_sub)):
  for sublist in thousand_split(text):
    TEXT_FILE_thousand.append(sublist)
    HADM_ID.append(admission_id)
    READMISSION.append(readmission_status)

In [None]:
# Assemble the sliced training frame: one row per chunk, tagged with its admission
# id and readmission status.
df_train_sliced = pd.DataFrame({
    'HADM_ID': HADM_ID,
    'TEXT_FILE': TEXT_FILE_thousand,
    'readmission_status': READMISSION,
})

In [None]:
# Fresh accumulators for the sliced test set (the names are reused from the training pass).
TEXT_FILE_thousand=[]
HADM_ID=[]
READMISSION=[]

from tqdm import tqdm
# Iterate the three columns side by side; each chunk carries its document's
# admission id and readmission status.
test_rows=zip(df_test['TEXT_FILE'], df_test['HADM_ID'], df_test['READMISSION_STATUS'])
for text, admission_id, readmission_status in tqdm(test_rows, total=len(df_test)):
  sublist_list=thousand_split(text)
  for sublist in sublist_list:
    TEXT_FILE_thousand.append(sublist)
    HADM_ID.append(admission_id)
    READMISSION.append(readmission_status)

In [None]:
# Assemble the sliced test frame: one row per chunk, tagged with its admission id
# and readmission status.
df_test_sliced = pd.DataFrame({
    'HADM_ID': HADM_ID,
    'TEXT_FILE': TEXT_FILE_thousand,
    'readmission_status': READMISSION,
})

In [None]:
# Turn on Arrow-based transfer for the pandas -> Spark conversions below.
# "arrow.enabling" is not a Spark setting; the Spark 3.x key is
# spark.sql.execution.arrow.pyspark.enabled.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")
train = spark.createDataFrame(df_train_sliced)
test= spark.createDataFrame(df_test_sliced)

PIPELINES

In [None]:
%%time
# Build the Spark ML pipeline that turns each text chunk (TEXT_FILE) into a single
# averaged BERT embedding vector plus a numeric label.
# NOTE: `Pipeline` must be pyspark.ml.Pipeline here. The modeling section later
# imports sklearn.pipeline.Pipeline under the same name, so re-running this cell after
# that import will pick up the wrong class.

# Wrap the raw text column into Spark NLP's "document" annotation.
document_assembler = DocumentAssembler() \
      .setInputCol("TEXT_FILE") \
      .setOutputCol("document")

# Split each document into tokens.
tokenizer = Tokenizer() \
      .setInputCols(["document"]) \
      .setOutputCol("token")

# Dictionary-based lemmatisation; the dictionary file uses "->" between key and
# values and a tab between values.
lemmatizer = Lemmatizer() \
    .setInputCols(["token"]) \
    .setOutputCol("lemma") \
    .setDictionary("gdrive/MyDrive/Colab_notebook/lemma.txt", value_delimiter ="\t", key_delimiter = "->")

# Token-level BERT embeddings computed on the lemmas, case-insensitive.
# NOTE(review): pretrained() is called without a model name, so the library's default
# model is downloaded — confirm which model that is for the pinned version.
bert_embeddings = BertEmbeddings.pretrained()\
  .setInputCols(["document","lemma"])\
  .setOutputCol("bert_embeddings")\
  .setCaseSensitive(False)

# Pool the token embeddings of a document by averaging.
embeddingsSentence = SentenceEmbeddings() \
      .setInputCols(["document", "bert_embeddings"]) \
      .setOutputCol("sentence_embeddings") \
      .setPoolingStrategy("AVERAGE")

# Convert the annotation into Spark ML vectors; the intermediate annotation columns
# are kept (setCleanAnnotations(False)).
embeddings_finisher = EmbeddingsFinisher() \
      .setInputCols(["sentence_embeddings"]) \
      .setOutputCols(["finished_sentence_embeddings"]) \
      .setOutputAsVector(True)\
      .setCleanAnnotations(False)

# finished_sentence_embeddings is an array column: explode it into one `features`
# row per element while keeping every other column.
explodeVectors = SQLTransformer(statement=
      "SELECT EXPLODE(finished_sentence_embeddings) AS features, * FROM __THIS__")

# Encode the string readmission status as a numeric `label` column.
label_stringIdx = StringIndexer(inputCol = "readmission_status", outputCol = "label")

# Stage order matters: each stage consumes the output column of an earlier one.
nlp_pipeline_Bert = Pipeline(
stages=[document_assembler, 
          tokenizer,
          lemmatizer,
          bert_embeddings,
          embeddingsSentence,
          embeddings_finisher,
          explodeVectors,
          label_stringIdx])

In [None]:
# Fit the pipeline on the sliced training set (this is where the lemma dictionary and
# the label index are learned).
nlp_bert_thousand=nlp_pipeline_Bert.fit(train)

In [None]:
# Persist the fitted pipeline to Drive, replacing any previous copy at that path.
nlp_bert_thousand.write().overwrite().save('gdrive/MyDrive/Colab_notebook/Models_Pipelines/bert_1000_readmission')

In [None]:
from pyspark.ml.pipeline import PipelineModel
# Reload the fitted pipeline saved above. It is bound to `nlp_bert_thousand`, the name
# the transform cell uses, so that a session which skips the (slow) fit cell still works.
# The previous target name `nlp_bert_five_hund` was never read anywhere.
nlp_bert_thousand= PipelineModel.load("gdrive/MyDrive/Colab_notebook/Models_Pipelines/bert_1000_readmission/")

In [None]:
# Apply the fitted pipeline to both splits; each output row carries a `features`
# vector and a numeric `label` next to the original columns.
processed_train=nlp_bert_thousand.transform(train)
processed_test=nlp_bert_thousand.transform(test)

In [None]:
# Combine rows: gather all chunk embeddings of one admission (HADM_ID) into a list.
# The label is aggregated with min(); every chunk of an admission was given the same
# readmission status when the documents were sliced, so min() presumably just picks
# that shared value. The aggregated column ends up named "min(label)", which is the
# name the pandas cells below read.
# NOTE(review): collect_list gives no ordering guarantee after a groupby, so chunk
# order inside `features` should not be relied upon — confirm if order matters.
from pyspark.sql.functions import collect_list
from pyspark.sql import functions as F
processed_train_combined = processed_train.groupby('HADM_ID').agg(collect_list('features').alias("features"),F.min(processed_train.label))
processed_test_combined= processed_test.groupby('HADM_ID').agg(collect_list('features').alias("features"),F.min(processed_test.label))

In [None]:
# Save the combined frames as ORC. mode("overwrite") makes the cell re-runnable:
# with the default mode the write fails as soon as the target path already exists.
# This matches how the fitted pipeline is saved (write().overwrite()).
processed_train_combined.write.mode("overwrite").orc("gdrive/MyDrive/Colab_notebook/transformed_data/bert_1000_train")
processed_test_combined.write.mode("overwrite").orc("gdrive/MyDrive/Colab_notebook/transformed_data/bert_1000_test")

In [None]:
#Computing average embedding vector
def average_emb(df):
  """Replace each row's list of embeddings by their element-wise mean.

  Args:
    df: DataFrame with a 'features' column whose cells are sequences of
      equal-length numeric sequences (one embedding per chunk).

  Returns:
    The same DataFrame object, with 'features' overwritten in place by one
    averaged list per row. A row with no embeddings becomes an empty list.
  """
  averaged=[]
  for embedding_list in df['features']:
    # zip(*...) transposes the embeddings so each tuple holds one dimension's values.
    averaged.append([sum(dim_values) / len(dim_values) for dim_values in zip(*embedding_list)])
  # Assign the whole column at once. The former df['features'][i] = ... was a chained
  # assignment (not guaranteed to write through to df) and required a 0..n-1 index.
  df['features']=pd.Series(averaged, index=df.index, dtype=object)
  return(df)

In [None]:
# Reload the combined frames from the ORC files written above, so the cells below can
# start from disk instead of re-running the embedding pipeline.
processed_train_combined=spark.read.orc("gdrive/MyDrive/Colab_notebook/transformed_data/bert_1000_train")
processed_test_combined=spark.read.orc("gdrive/MyDrive/Colab_notebook/transformed_data/bert_1000_test")

In [None]:
# Collect the combined training frame to the driver as a pandas DataFrame.
pd_train=processed_train_combined.toPandas()

In [None]:
# Collect the combined test frame to the driver as a pandas DataFrame.
pd_test=processed_test_combined.toPandas()

In [None]:
pd_train.head()

In [None]:
# Add an integer `label` column derived from the aggregated "min(label)" column.
# Bracket assignment is required: `df.label = ...` on a frame that has no such column
# only sets a Python attribute and does not create the column.
pd_train['label']=pd_train['min(label)'].astype("int")
pd_test['label']=pd_test['min(label)'].astype("int")

In [None]:
def combined_to_sep(df):
  """Expand a per-admission frame back into one row per embedding.

  For every row of `df`, each entry of its 'features' list yields one output
  row carrying that row's 'HADM_ID', element [3] of the entry, and the row's
  'min(label)' value.

  Returns:
    A new DataFrame with columns 'HADM_ID', 'features' and 'label'.
  """
  records=[]
  for row in range(len(df)):
    for chunk_embedding in df['features'][row]:
      # NOTE(review): index 3 presumably selects the dense values of the stored
      # vector struct — confirm against the schema of the ORC data.
      records.append((df['HADM_ID'][row], chunk_embedding[3], df['min(label)'][row]))
  return pd.DataFrame(records, columns=['HADM_ID','features','label'])

In [None]:
# Back to one row per chunk embedding (HADM_ID, features, label) for both splits.
pd_train_sep=combined_to_sep(pd_train)
pd_test_sep=combined_to_sep(pd_test)

In [None]:
# Cache the chunk-level frames as CSV. The `features` lists are written out as text,
# which is why the modeling cells parse them back from strings.
pd_train_sep.to_csv('gdrive/MyDrive/Colab_notebook/transformed_data/bert_1000_train_v2.csv')
pd_test_sep.to_csv('gdrive/MyDrive/Colab_notebook/transformed_data/bert_1000_test_v2.csv')

In [None]:
# Reload the cached chunk-level frames; after this, `features` holds strings.
pd_train_sep=pd.read_csv('gdrive/MyDrive/Colab_notebook/transformed_data/bert_1000_train_v2.csv')
pd_test_sep=pd.read_csv('gdrive/MyDrive/Colab_notebook/transformed_data/bert_1000_test_v2.csv')

In [None]:
# Remove rows whose embedding is empty (features == '[]'), then renumber the index.
train_invalid_index=pd_train_sep.index[pd_train_sep['features']=='[]'].tolist()
pd_train_sep=pd_train_sep.drop(train_invalid_index).reset_index(drop=True)

test_invalid_index=pd_test_sep.index[pd_test_sep['features']=='[]'].tolist()
pd_test_sep=pd_test_sep.drop(test_invalid_index).reset_index(drop=True)

Modeling

In [None]:
from sklearn.linear_model import LogisticRegression
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.pipeline import Pipeline
from sklearn.model_selection import cross_val_predict
from sklearn.metrics import roc_curve
from sklearn.metrics import roc_auc_score
from sklearn.naive_bayes import MultinomialNB
from sklearn.neighbors import KNeighborsClassifier
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import RepeatedStratifiedKFold
from sklearn.svm import LinearSVC
from sklearn.feature_extraction.text import CountVectorizer
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import cross_val_score
from sklearn.metrics import make_scorer
from sklearn.ensemble import RandomForestClassifier

In [None]:
# Full training set for the final models. Every `features` cell is the text form of
# a list ("[v1, v2, ...]"): drop the surrounding brackets and parse the
# comma-separated values as floats.
X_train=pd_train_sep.features
y_train=pd_train_sep.label
X_train_trans=[[float(num_str) for num_str in doc[1:-1].split(",")] for doc in X_train]

In [None]:
# Test set. Every `features` cell is the text form of a list ("[v1, v2, ...]"):
# drop the surrounding brackets and parse the comma-separated values as floats.
X_test=pd_test_sep.features
y_test=pd_test_sep.label
X_test_trans=[[float(num_str) for num_str in doc[1:-1].split(",")] for doc in X_test]

In [None]:
# Manual division
subset_1=pd_train_sep.iloc[0:2201]
subset_2=pd_train_sep.iloc[2201:4401]
subset_3=pd_train_sep.iloc[4401:6602]
subset_4=pd_train_sep.iloc[6602:8802]
subset_5=pd_train_sep.iloc[8802:]

In [None]:
# Logistic Regression
# Hand-written 5-fold grid search so that the chunk-aggregation parameter k can be
# tuned together with the classifier's own hyper-parameters.
subset_list=[subset_1,subset_2,subset_3,subset_4,subset_5]
solvers_list=['newton-cg', 'lbfgs', 'liblinear']
c_values_list = [1.0, 0.1, 0.01]
k_list=[0.01,0.05,0.1,0.15]
solver_result=[]
c_values_result=[]
k_list_result=[]
auc_score=[]
for i in range(len(subset_list)):
  # Fold i is held out for validation; the remaining folds form the training split.
  val_set=subset_list[i]
  training_set=pd.concat(subset_list[:i]+subset_list[i+1:])
  # Fold-local names on purpose: X_train_trans / y_train hold the full training set
  # and are used by the final fit after this search, so they must not be rebound here
  # (otherwise the final model is trained on the last fold's split only).
  y_fold_train=training_set.label
  y_val=val_set.label
  # Each feature cell is the text form of a list ("[v1, v2, ...]"): strip the
  # brackets and parse the comma-separated floats.
  X_fold_train_trans=[[float(num_str) for num_str in doc[1:-1].split(",")] for doc in training_set.features]
  X_val_trans=[[float(num_str) for num_str in doc[1:-1].split(",")] for doc in val_set.features]
  for s in solvers_list:
    for c in c_values_list:
      # Neither the fitted model nor the per-admission aggregates depend on k, so
      # they are computed once per (solver, C) instead of once per k.
      logistic_clf_bert=LogisticRegression(solver=s,C=c)
      logistic_clf_bert.fit(X_fold_train_trans,y_fold_train)
      y_pred=logistic_clf_bert.predict_proba(X_val_trans)
      ID_PRED=pd.DataFrame()
      ID_PRED['HADM_ID']=val_set['HADM_ID']
      ID_PRED['pred']=y_pred[:,1]
      ID_PRED['actual']=y_val
      # One row per admission: mean / max / number of its chunk-level scores.
      re_combined=ID_PRED.groupby('HADM_ID',as_index=False,sort=False).agg({'pred':['mean','max','count'],'actual':'mean'})
      re_combined.columns=['HADM_ID','pred_mean','pred_max','pred_count','actual']
      for k in k_list:
        # Admission score = (max + mean*n/k) / (1 + n/k), n = number of chunks.
        re_combined['final_prediction']=(re_combined['pred_max']+re_combined['pred_mean']*re_combined['pred_count']/k)/(1+re_combined['pred_count']/k)
        new_auc=roc_auc_score(re_combined['actual'],re_combined['final_prediction'])
        solver_result.append(s)
        c_values_result.append(c)
        k_list_result.append(k)
        auc_score.append(new_auc)
# Average the AUC of every (solver, C, k) combination over the folds, best first.
# This is done once, after all folds have been evaluated.
df_results=pd.DataFrame(list(zip(solver_result, c_values_result,k_list_result,auc_score)),columns=['solver','c','k','auc'])
result_groupby=df_results.groupby(['solver','c','k'],as_index=False,sort=False).agg({'auc':'mean'})
result_groupby.columns=['solver','c','k','auc_mean']
result_groupby=result_groupby.sort_values(by='auc_mean',ascending=False)
result_groupby

In [None]:
# Fit the final logistic regression and score every test row.
# NOTE(review): C=0.5 is not one of the values covered by the grid search
# (c_values_list = [1.0, 0.1, 0.01]) — confirm this choice is intentional.
logistic_clf_bert=LogisticRegression(solver='newton-cg',C=0.5)
logistic_clf_bert.fit(X_train_trans,y_train)
y_pred_logistic_clf_bert=logistic_clf_bert.predict_proba(X_test_trans)

In [None]:
# Collapse the chunk-level probabilities into one score per admission and compute
# the test AUC of the logistic regression model.
ID_PRED=pd.DataFrame({
    'HADM_ID': pd_test_sep['HADM_ID'],
    'pred': y_pred_logistic_clf_bert[:,1],
    'actual': y_test,
})
re_combined=ID_PRED.groupby('HADM_ID',as_index=False,sort=False).agg(
    pred_mean=('pred','mean'),
    pred_max=('pred','max'),
    pred_count=('pred','count'),
    actual=('actual','mean'),
)
# Admission score = (max + mean*n/k) / (1 + n/k) with k = 0.01, n = number of chunks.
re_combined=re_combined.assign(
    final_prediction=lambda agg: (agg['pred_max']+agg['pred_mean']*agg['pred_count']/0.01)/(1+agg['pred_count']/0.01)
)
auc_logistic_clf_bert=roc_auc_score(re_combined['actual'],re_combined['final_prediction'])

In [None]:
# Random Forest
# Hand-written 5-fold grid search so that the chunk-aggregation parameter k can be
# tuned together with the classifier's own hyper-parameters.
subset_list=[subset_1,subset_2,subset_3,subset_4,subset_5]
max_depth=[5, 10, 15]
min_samples_leaf=[20, 50, 100]
criterion=["gini", "entropy"]
n_estimators=[10,50,100]
k_list=[0.01,0.05,0.1]
max_depth_result=[]
min_samples_leaf_result=[]
criterion_result=[]
n_estimators_result=[]
k_list_result=[]
auc_score=[]
for i in range(len(subset_list)):
  # Fold i is held out for validation; the remaining folds form the training split.
  val_set=subset_list[i]
  training_set=pd.concat(subset_list[:i]+subset_list[i+1:])
  # Fold-local names on purpose: X_train_trans / y_train hold the full training set
  # and are used by the final fit after this search, so they must not be rebound here
  # (otherwise the final model is trained on the last fold's split only).
  y_fold_train=training_set.label
  y_val=val_set.label
  # Each feature cell is the text form of a list ("[v1, v2, ...]"): strip the
  # brackets and parse the comma-separated floats.
  X_fold_train_trans=[[float(num_str) for num_str in doc[1:-1].split(",")] for doc in training_set.features]
  X_val_trans=[[float(num_str) for num_str in doc[1:-1].split(",")] for doc in val_set.features]
  for depth in max_depth:
    for leaf in min_samples_leaf:
      for criter in criterion:
        for estimator in n_estimators:
          # The forest does not depend on k: fit it once per parameter combination.
          # Besides saving the repeated fits, this makes every k be compared on the
          # same fitted (unseeded) forest rather than on different random ones.
          rb_clf_bert=RandomForestClassifier(max_depth=depth,min_samples_leaf=leaf,criterion=criter,n_estimators=estimator)
          rb_clf_bert.fit(X_fold_train_trans,y_fold_train)
          y_pred=rb_clf_bert.predict_proba(X_val_trans)
          ID_PRED=pd.DataFrame()
          ID_PRED['HADM_ID']=val_set['HADM_ID']
          ID_PRED['pred']=y_pred[:,1]
          ID_PRED['actual']=y_val
          # One row per admission: mean / max / number of its chunk-level scores.
          re_combined=ID_PRED.groupby('HADM_ID',as_index=False,sort=False).agg({'pred':['mean','max','count'],'actual':'mean'})
          re_combined.columns=['HADM_ID','pred_mean','pred_max','pred_count','actual']
          for k in k_list:
            # Admission score = (max + mean*n/k) / (1 + n/k), n = number of chunks.
            re_combined['final_prediction']=(re_combined['pred_max']+re_combined['pred_mean']*re_combined['pred_count']/k)/(1+re_combined['pred_count']/k)
            new_auc=roc_auc_score(re_combined['actual'],re_combined['final_prediction'])
            max_depth_result.append(depth)
            min_samples_leaf_result.append(leaf)
            criterion_result.append(criter)
            n_estimators_result.append(estimator)
            k_list_result.append(k)
            auc_score.append(new_auc)
# Average the AUC of every parameter combination over the folds, best first.
# This is done once, after all folds have been evaluated.
df_results=pd.DataFrame(list(zip(max_depth_result, min_samples_leaf_result,criterion_result,n_estimators_result,k_list_result,auc_score)),columns=['max_depth','min_samples_leaf','criterion','n_estimator','k','auc'])
result_groupby=df_results.groupby(['max_depth','min_samples_leaf','criterion','n_estimator','k'],as_index=False,sort=False).agg({'auc':'mean'})
result_groupby.columns=['max_depth','min_samples_leaf','criterion','n_estimator','k','auc_mean']
result_groupby=result_groupby.sort_values(by='auc_mean',ascending=False)
result_groupby

In [None]:
# Fit the final random forest and score every test row.
# NOTE(review): max_depth=7 and min_samples_leaf=7 are not among the values covered
# by the grid search (max_depth in [5, 10, 15], min_samples_leaf in [20, 50, 100]) —
# confirm this choice is intentional.
# NOTE(review): no random_state is passed, so repeated runs can give different forests.
rb_clf_bert=RandomForestClassifier(max_depth=7,min_samples_leaf=7,criterion='gini',n_estimators=10)
rb_clf_bert.fit(X_train_trans,y_train)
y_pred_rb_clf_bert=rb_clf_bert.predict_proba(X_test_trans)

In [None]:
# Collapse the chunk-level probabilities into one score per admission and compute
# the test AUC of the random forest model.
ID_PRED=pd.DataFrame({
    'HADM_ID': pd_test_sep['HADM_ID'],
    'pred': y_pred_rb_clf_bert[:,1],
    'actual': y_test,
})
re_combined=ID_PRED.groupby('HADM_ID',as_index=False,sort=False).agg(
    pred_mean=('pred','mean'),
    pred_max=('pred','max'),
    pred_count=('pred','count'),
    actual=('actual','mean'),
)
# Admission score = (max + mean*n/k) / (1 + n/k) with k = 0.05, n = number of chunks.
re_combined=re_combined.assign(
    final_prediction=lambda agg: (agg['pred_max']+agg['pred_mean']*agg['pred_count']/0.05)/(1+agg['pred_count']/0.05)
)
auc_rb_clf_bert=roc_auc_score(re_combined['actual'],re_combined['final_prediction'])

In [None]:
# SVM-linear
# Hand-written 5-fold grid search over C and the chunk-aggregation parameter k.
subset_list=[subset_1,subset_2,subset_3,subset_4,subset_5]
c_values_list = [100,10,1.0,0.1,0.01]
k_list=[0.01,0.05,0.1,0.15]
c_values_result=[]
k_list_result=[]
auc_score=[]
for i in range(len(subset_list)):
  # Fold i is held out for validation; the remaining folds form the training split.
  val_set=subset_list[i]
  training_set=pd.concat(subset_list[:i]+subset_list[i+1:])
  # Fold-local names on purpose: X_train_trans / y_train hold the full training set
  # and are used by the final fit after this search, so they must not be rebound here
  # (otherwise the final model is trained on the last fold's split only).
  y_fold_train=training_set.label
  y_val=val_set.label
  # Each feature cell is the text form of a list ("[v1, v2, ...]"): strip the
  # brackets and parse the comma-separated floats.
  X_fold_train_trans=[[float(num_str) for num_str in doc[1:-1].split(",")] for doc in training_set.features]
  X_val_trans=[[float(num_str) for num_str in doc[1:-1].split(",")] for doc in val_set.features]
  for c in c_values_list:
    # Neither the fitted model nor the per-admission aggregates depend on k, so
    # they are computed once per C instead of once per k.
    lsvm_clf_bert=LinearSVC(C=c)
    lsvm_clf_bert.fit(X_fold_train_trans,y_fold_train)
    # LinearSVC has no predict_proba; the signed decision value is used as the score.
    y_pred=lsvm_clf_bert.decision_function(X_val_trans)
    ID_PRED=pd.DataFrame()
    ID_PRED['HADM_ID']=val_set['HADM_ID']
    ID_PRED['pred']=y_pred
    ID_PRED['actual']=y_val
    # One row per admission: mean / max / number of its chunk-level scores.
    re_combined=ID_PRED.groupby('HADM_ID',as_index=False,sort=False).agg({'pred':['mean','max','count'],'actual':'mean'})
    re_combined.columns=['HADM_ID','pred_mean','pred_max','pred_count','actual']
    for k in k_list:
      # Admission score = (max + mean*n/k) / (1 + n/k), n = number of chunks.
      re_combined['final_prediction']=(re_combined['pred_max']+re_combined['pred_mean']*re_combined['pred_count']/k)/(1+re_combined['pred_count']/k)
      new_auc=roc_auc_score(re_combined['actual'],re_combined['final_prediction'])
      c_values_result.append(c)
      k_list_result.append(k)
      auc_score.append(new_auc)
# Average the AUC of every (C, k) combination over the folds, best first.
# This is done once, after all folds have been evaluated.
df_results=pd.DataFrame(list(zip(c_values_result,k_list_result,auc_score)),columns=['c','k','auc'])
result_groupby=df_results.groupby(['c','k'],as_index=False,sort=False).agg({'auc':'mean'})
result_groupby.columns=['c','k','auc_mean']
result_groupby=result_groupby.sort_values(by='auc_mean',ascending=False)
result_groupby

In [None]:
# Fit the final linear SVM and score every test row with its decision value.
# NOTE(review): C=0.05 is not one of the values covered by the grid search
# (c_values_list = [100, 10, 1.0, 0.1, 0.01]) — confirm this choice is intentional.
lsvm_clf_bert=LinearSVC(C=0.05)
lsvm_clf_bert.fit(X_train_trans,y_train)
y_pred_lsvm_clf_bert=lsvm_clf_bert.decision_function(X_test_trans)

In [None]:
# Collapse the chunk-level decision values into one score per admission and compute
# the test AUC of the linear SVM.
ID_PRED=pd.DataFrame({
    'HADM_ID': pd_test_sep['HADM_ID'],
    'pred': y_pred_lsvm_clf_bert,
    'actual': y_test,
})
re_combined=ID_PRED.groupby('HADM_ID',as_index=False,sort=False).agg(
    pred_mean=('pred','mean'),
    pred_max=('pred','max'),
    pred_count=('pred','count'),
    actual=('actual','mean'),
)
# Admission score = (max + mean*n/k) / (1 + n/k) with k = 0.05, n = number of chunks.
re_combined=re_combined.assign(
    final_prediction=lambda agg: (agg['pred_max']+agg['pred_mean']*agg['pred_count']/0.05)/(1+agg['pred_count']/0.05)
)
auc_lsvm_clf_bert=roc_auc_score(re_combined['actual'],re_combined['final_prediction'])