In [1]:
import findspark
findspark.init()

import pyspark
import random
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext

In [2]:
# Spark configuration for this notebook's application.
# NOTE(review): the master URL is hard-coded; consider reading it from an
# environment variable or a config cell.
conf = SparkConf().setAppName("Vectorizer").setMaster("spark://10.102.2.122:7077")

# getOrCreate() returns the already-running context when there is one, so
# re-running this cell in a live kernel does not fail the way a second
# SparkContext(conf=conf) call does. When a context already exists, `conf`
# is not re-applied to it.
sc = SparkContext.getOrCreate(conf=conf)

sqlContext = SQLContext(sc)

print("Spark Version: " + sc.version)
print("PySpark Version: " + pyspark.__version__)

Spark Version: 2.3.2
PySpark Version: 2.3.2


<h3>Functions</h3>

In [3]:
from pyspark.sql.functions import collect_list, collect_set, split
from pyspark.sql.functions import upper,col,udf,concat, lit
from pyspark.sql.types import StructType, StructField, StringType, FloatType, IntegerType, DateType
def GetSchema():
    """Return the two-column schema used when reading the complaint CSV files.

    Both columns are declared nullable:
      * PATIENT_NOTE_KEY      -- IntegerType
      * complaint_tokens_str  -- StringType
    """
    fields = [
        StructField("PATIENT_NOTE_KEY", IntegerType(), True),
        StructField("complaint_tokens_str", StringType(), True),
    ]
    return StructType(fields)

import os
def Collect_Raw_Data(sqlContext, path="E:/TEMP/OUTPUT/ChiefComplaint_CleanedText/*.csv"):
    """Read the cleaned chief-complaint CSV files into a DataFrame.

    Args:
        sqlContext: context object whose ``read.csv`` is used to load the data.
        path: file path or glob to read. Defaults to the location this
            notebook originally hard-coded, so existing calls are unchanged.

    Returns:
        DataFrame with the columns defined by ``GetSchema()``. Files are read
        as header-less and pipe-delimited.
    """
    return sqlContext.read.csv(path, header=False, schema=GetSchema(), sep="|")

import os
def Collect_Raw_Data_Single(sqlContext, path="E:/TEMP/OUTPUT/ChiefComplaint_SingleFile/*.csv"):
    """Read the single-file chief-complaint CSV export into a DataFrame.

    Args:
        sqlContext: context object whose ``read.csv`` is used to load the data.
        path: file path or glob to read. Defaults to the location this
            notebook originally hard-coded, so existing calls are unchanged.

    Returns:
        DataFrame with the columns defined by ``GetSchema()``. Files are read
        as header-less and pipe-delimited.
    """
    return sqlContext.read.csv(path, header=False, schema=GetSchema(), sep="|")

from pyspark.ml.feature import CountVectorizer, CountVectorizerModel
from pyspark.sql.functions import col, udf
from pyspark.sql.types import *
import json
import os

class Vocabulary(object):
    """Pairs the complaint and ICD vectorizer models with their token lookup tables.

    Attributes (all ``None`` until set or loaded):
        comp_cv, icd_cv: model objects; they must expose ``vocabulary`` (read by
            Save_Model_Vocab) and ``transform`` (used by fitVocab).
        comp_dict, icd_dict: ``{"int": ..., "token": ...}`` lookup tables.
    """

    def __init__(self, comp_cv = None, icd_cv = None):
        """Create the holder; the models are stored only when BOTH are supplied."""
        # Define every attribute up front so getVocab()/getDict() work on an
        # empty instance instead of raising AttributeError.
        self.comp_cv = None
        self.icd_cv = None
        self.comp_dict = None
        self.icd_dict = None
        if comp_cv is not None and icd_cv is not None:
            self.setComp_CV(comp_cv)
            self.setICD_CV(icd_cv)

    def setComp_CV(self, comp_cv):
        """Store the complaint model and write/refresh its lookup files."""
        self.comp_cv = comp_cv
        self.comp_dict = Save_Model_Vocab(self.comp_cv, 'comp_cv')

    def setICD_CV(self, icd_cv):
        """Store the ICD model and write/refresh its lookup files."""
        self.icd_cv = icd_cv
        self.icd_dict = Save_Model_Vocab(self.icd_cv, 'icd_cv')

    def saveVocab(self):
        """Write the lookup files and persist both models (overwriting) under ``vocab``."""
        Save_Model_Vocab(self.comp_cv, "comp_cv")
        self.comp_cv.write().overwrite().save('vocab\\complaint_vocab')
        Save_Model_Vocab(self.icd_cv, "icd_cv")
        self.icd_cv.write().overwrite().save('vocab\\icd_vocab')

    @staticmethod
    def _read_json(path):
        """Load one JSON file, closing the handle even if parsing fails."""
        with open(path) as handle:
            return json.load(handle)

    def loadVocab(self):
        """Load the persisted models (best effort) and the JSON lookup tables.

        A failure while loading the models is reported and skipped; the lookup
        tables are still read afterwards, and a missing or invalid lookup file
        raises as usual.
        """
        try:
            self.comp_cv = CountVectorizerModel.load("vocab\\complaint_vocab")
            self.icd_cv = CountVectorizerModel.load("vocab\\icd_vocab")
        except Exception:
            # Narrowed from a bare ``except:`` so KeyboardInterrupt/SystemExit
            # are no longer swallowed.
            print('No spark context initialized so not loading vocabulary models')
        # NOTE: JSON object keys are always strings, so the "int" tables loaded
        # here are keyed by "0", "1", ... rather than by int.
        self.comp_dict = {
            'int': self._read_json("vocab\\comp_cv_model_int-token.txt"),
            'token': self._read_json("vocab\\comp_cv_model_token-int.txt"),
        }
        self.icd_dict = {
            'int': self._read_json("vocab\\icd_cv_model_int-token.txt"),
            'token': self._read_json("vocab\\icd_cv_model_token-int.txt"),
        }

    def setVocab(self, comp_cv = None, icd_cv = None):
        """Set whichever models are supplied; an omitted (``None``) model is left as is."""
        # The None defaults used to be forwarded to the setters, which crashed
        # when reading ``None.vocabulary``.
        if comp_cv is not None:
            self.setComp_CV(comp_cv)
        if icd_cv is not None:
            self.setICD_CV(icd_cv)

    def getVocab(self):
        """Return ``[comp_cv, icd_cv]``."""
        return [self.comp_cv, self.icd_cv]

    def getDict(self):
        """Return both lookup tables as ``{"comp": comp_dict, "icd": icd_dict}``."""
        return {"comp": self.comp_dict, "icd":self.icd_dict}

    def fitVocab(self, df):
        """Apply each stored model's ``transform`` to ``df`` in turn and return the result."""
        for mod in self.getVocab():
            df = mod.transform(df)
        return df

    # Disabled draft of a per-model transform, kept for reference.
    #def transform(self, df, mod):
    #    df_cols = df.columns
        
    #    input_col = mod.getOrDefault(mod.getParam('inputCol'))
    #    output_col = mod.getOrDefault(mod.getParam('outputCol'))
    #    df = mod.transform(df)
    #    convert_vector = udf(lambda x: x.toArray().tolist(), ArrayType(DoubleType()))
    #    df = df.withColumn(input_col + '_idx', convert_vector(col(output_col)))
    #    df_cols.extend([input_col + '_idx'])

    #    self.df = df

    #    return df.select(df_cols)

def Save_Model_Vocab(model, output):
    """Write the model's vocabulary as two JSON lookup files and return both tables.

    Args:
        model: object exposing a ``vocabulary`` sequence of tokens.
        output: file-name prefix for the two files written under ``vocab``.

    Returns:
        ``{"int": {index: token}, "token": {token: index}}``.
    """
    tokens = model.vocabulary

    index_to_token = dict(enumerate(tokens))
    token_to_index = {}
    for index, token in index_to_token.items():
        token_to_index[token] = index

    make_directory("vocab")
    prefix = "vocab\\" + output
    tables = (
        ("_model_int-token.txt", index_to_token),
        ("_model_token-int.txt", token_to_index),
    )
    for suffix, table in tables:
        with open(prefix + suffix, 'w') as handle:
            handle.write(json.dumps(table))

    return {"int": index_to_token, "token": token_to_index}

def make_directory(directory):
    """Create ``directory`` (and any missing parents) unless the path already exists."""
    if os.path.exists(directory):
        return
    os.makedirs(directory)

<h3>Program</h3>

In [4]:
# Load the single-file export, split each complaint string on spaces into a
# token array, and keep only rows whose token array is not null.
df = (
    Collect_Raw_Data_Single(sqlContext)
    .withColumn("complaint_tokens_array", split(col("complaint_tokens_str"), ' '))
    .where(col("complaint_tokens_array").isNotNull())
)
#df_Final = df.select('complaint_tokens_array').limit(5000)
##df.select('PATIENT_NOTE_KEY', 'complaint_tokens_str').coalesce(1).write.format('csv').mode("overwrite").options(delimiter='|').save('E:/TEMP/OUTPUT/ChiefComplaint_SingleFile')

In [5]:
# Preview the first five rows of the tokenised complaints.
df.show(5)

+----------------+--------------------+----------------------+
|PATIENT_NOTE_KEY|complaint_tokens_str|complaint_tokens_array|
+----------------+--------------------+----------------------+
|        14595210|persist congest p...|  [persist, congest...|
|        15098556|symptom much impr...|  [symptom, much, i...|
|        14611743|week histori pain...|  [week, histori, p...|
|        17626919|left ear pierc mo...|  [left, ear, pierc...|
|        14873956|uri sx cough whee...|  [uri, sx, cough, ...|
+----------------+--------------------+----------------------+
only showing top 5 rows



In [6]:
from pyspark.ml.feature import CountVectorizer

# Vectoriser over the token arrays; vocabSize=5000 matches the 5000-wide
# vectors shown in the output of df_vec.show().
cv = CountVectorizer(
    inputCol="complaint_tokens_array",
    outputCol="complaint_tokens_vectors",
    vocabSize=5000,
    minDF=1.0,
)
#comp_cv = cv.fit(df)

# Fit on the full DataFrame, then add the vector column to that same data.
model = cv.fit(df)
df_vec = model.transform(df)

In [7]:
# Number of rows in the vectorised DataFrame.
df_vec.count()

667643

In [8]:
import numpy as np
# Scratch check: display a length-70 array of ones.
np.ones(70)

array([1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1., 1.,
       1., 1.])

In [9]:
# Preview the first five rows, now including the vector column.
df_vec.show(5)

+----------------+--------------------+----------------------+------------------------+
|PATIENT_NOTE_KEY|complaint_tokens_str|complaint_tokens_array|complaint_tokens_vectors|
+----------------+--------------------+----------------------+------------------------+
|        14595210|persist congest p...|  [persist, congest...|    (5000,[0,6,15,19,...|
|        15098556|symptom much impr...|  [symptom, much, i...|    (5000,[1,44,61,75...|
|        14611743|week histori pain...|  [week, histori, p...|    (5000,[0,2,9,10,3...|
|        17626919|left ear pierc mo...|  [left, ear, pierc...|    (5000,[25,42,43,5...|
|        14873956|uri sx cough whee...|  [uri, sx, cough, ...|    (5000,[3,9,30,34,...|
+----------------+--------------------+----------------------+------------------------+
only showing top 5 rows



In [10]:
#Xs = np.array(df_vec.select('complaint_tokens_vectors').limit(5000).collect())#convert to 1d array
#rows, d, features = Xs.shape

# Number of equally weighted chunks the vectorised data is cut into.
N_SPLITS = 70
# Fixed seed so a re-run draws the same split instead of a fresh random one.
# NOTE(review): this assumes df_vec is recomputed identically on each run --
# confirm, or cache df_vec before splitting.
SPLIT_SEED = 42
splits = df_vec.randomSplit([1.0] * N_SPLITS, seed=SPLIT_SEED)

In [11]:
# Row count of the first split.
splits[0].count()

9501

In [14]:
# Convert to a dense vector only on a single split to avoid running out of memory.
# Explicit names instead of `import *`, so it is clear where they come from.
from pyspark.sql.types import ArrayType, DoubleType

# UDF that turns each vector into a plain Python list of doubles.
array_udf = udf(lambda vector: vector.toArray().tolist(), ArrayType(DoubleType()))

split_vec = splits[0].withColumn("vectors", array_udf('complaint_tokens_vectors'))
# collect() brings every row of this split back to the driver as a list.
values = split_vec.select('vectors').collect()

In [15]:
# Stack the collected rows into one array; the shape unpacks as
# (rows, d, features).
Xs = np.array(values)
rows, d, features = Xs.shape

# Flatten to 2-D; the reshape only succeeds when the middle axis d has length 1.
X = Xs.reshape((rows, features))
# Cap every entry above 1 at 1, in place.
np.putmask(X, X > 1, 1)

In [16]:
# Inspect the first row of the capped matrix.
X[0]

array([1., 1., 1., ..., 0., 0., 0.])

In [None]:
# TODO: save X to disk (still pending).