# Proof of concept - Predict Top-5 diagnosis categories

<ul> <li> D1: Diseases of the circulatory system </ul> </li>
<ul> <li> D2: External causes of injury and supplemental classification </ul> </li>
<ul> <li> D3: Endocrine, nutritional and metabolic diseases, and immunity disorders </ul> </li>
<ul> <li> D4: Diseases of the respiratory system </ul> </li>
<ul> <li> D5: Injury and poisoning </ul> </li>

## 1. Pre process data tables

In [1]:
import re
import os
import ast
import time
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from pyspark.sql import SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import collect_list
import pyspark.sql.functions as F
import pyspark.sql.types as T
from pyspark.sql import SparkSession
import nltk
from nltk.corpus import stopwords
import string
nltk.download('wordnet')
from nltk.stem import WordNetLemmatizer
import matplotlib.pyplot as plt
from wordcloud import WordCloud
#from transformers import AutoTokenizer, AutoModel

[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\kfpj179\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [None]:
app_name = "ClickThrough"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()

In [None]:
app_name = "ClickThrough"
master = "local[*]"
spark = SparkSession\
        .builder\
        .appName(app_name)\
        .master(master)\
        .getOrCreate()
sc = spark.sparkContext
sqlContext = SQLContext(sc)

In [None]:
sc

In [4]:
filename_LIST = os.listdir("data")
for filename_LIST_ITEM in filename_LIST:
    filename,fileformat = filename_LIST_ITEM.split('.')
    exec(filename+'_DF = sqlContext.read.format("'+fileformat+'").option("header", "true").option("multiline",True).'+
         'option("escape",'+"'"+'"'+"')"+'.load("data/'+filename+'.csv")')
    exec(filename+'_DF.createOrReplaceTempView("'+filename+'")')

### 1.1 Build HADM level diagnosis flag tables

In [5]:
spark.sql("""
select A.*, B.SHORT_TITLE, B.LONG_TITLE, 
case when substr(A.ICD9_CODE,1,1) in ('E','V') then 'external causes of injury and supplemental classification'
when substr(A.ICD9_CODE,1,3) between 001 and 139 then 'infectious and parasitic diseases'
when substr(A.ICD9_CODE,1,3) between 140 and 239 then 'neoplasms'
when substr(A.ICD9_CODE,1,3) between 240 and 279 then 'endocrine, nutritional and metabolic diseases, and immunity disorders'
when substr(A.ICD9_CODE,1,3) between 280 and 289 then 'diseases of the blood and blood-forming organs'
when substr(A.ICD9_CODE,1,3) between 290 and 319 then 'mental disorders'
when substr(A.ICD9_CODE,1,3) between 320 and 389 then 'diseases of the nervous system and sense organs'
when substr(A.ICD9_CODE,1,3) between 390 and 459 then 'diseases of the circulatory system'
when substr(A.ICD9_CODE,1,3) between 460 and 519 then 'diseases of the respiratory system'
when substr(A.ICD9_CODE,1,3) between 520 and 579 then 'diseases of the digestive system'
when substr(A.ICD9_CODE,1,3) between 580 and 629 then 'diseases of the genitourinary system'
when substr(A.ICD9_CODE,1,3) between 630 and 679 then 'complications of pregnancy, childbirth, and the puerperium'
when substr(A.ICD9_CODE,1,3) between 680 and 709 then 'diseases of the skin and subcutaneous tissue'
when substr(A.ICD9_CODE,1,3) between 710 and 739 then 'diseases of the musculoskeletal system and connective tissue'
when substr(A.ICD9_CODE,1,3) between 740 and 759 then 'congenital anomalies'
when substr(A.ICD9_CODE,1,3) between 760 and 779 then 'certain conditions originating in the perinatal period'
when substr(A.ICD9_CODE,1,3) between 780 and 799 then 'symptoms, signs, and ill-defined conditions'
when substr(A.ICD9_CODE,1,3) between 800 and 999 then 'injury and poisoning' 
end as ICD_GROUP
from DIAGNOSES_ICD A
left join D_ICD_DIAGNOSES B
on A.ICD9_CODE = B.ICD9_CODE
""").createOrReplaceTempView('DIAGNOSES_ICD_WITH_GROUPING')

AnalysisException: 'java.lang.RuntimeException: java.io.IOException: (null) entry in command string: null chmod 0733 C:\\tmp\\hive;'

In [None]:
DIAGNOSIS_GROUPING_DF = spark.sql("""select distinct HADM_ID, 
case when ICD_GROUP = 'diseases of the circulatory system' then 'D1'
when ICD_GROUP = 'external causes of injury and supplemental classification' then 'D2'
when ICD_GROUP = 'endocrine, nutritional and metabolic diseases, and immunity disorders' then 'D3'
when ICD_GROUP = 'diseases of the respiratory system' then 'D4'
when ICD_GROUP = 'injury and poisoning' then 'D5' end as ICD_GROUP_ID
from DIAGNOSES_ICD_WITH_GROUPING 
where ICD_GROUP in ('diseases of the circulatory system',
'external causes of injury and supplemental classification',
'endocrine, nutritional and metabolic diseases, and immunity disorders',
'diseases of the respiratory system',
'injury and poisoning')""")
DIAGNOSIS_GROUPING_PIVOT_DF = DIAGNOSIS_GROUPING_DF.groupBy('HADM_ID').pivot('ICD_GROUP_ID').count().fillna(0)
DIAGNOSIS_GROUPING_PIVOT_DF.createOrReplaceTempView('DIAGNOSIS_GROUPING_PIVOT')

In [None]:
spark.sql("""select * from DIAGNOSIS_GROUPING_PIVOT limit 5""").show()

In [None]:
DIAGNOSIS_GROUPING_PIVOT_DF.show(5)

### 1.2 Combine all notes at HADM level

In [None]:
NOTEEVENTS_DF = spark.sql("""SELECT A.HADM_ID, B.D1, B.D2, B.D3, B.D4, B.D5, lower(A.TEXT) as TEXT_LOWER 
from NOTEEVENTS A inner join DIAGNOSIS_GROUPING_PIVOT B on A.HADM_ID = B.HADM_ID""")

#### 1.2.1 Group all Text at HADM level

In [None]:
NOTEEVENTS_GROUPED_DF = NOTEEVENTS_DF.groupby('HADM_ID','D1','D2','D3','D4','D5') \
                        .agg(F.concat_ws("", F.collect_list(NOTEEVENTS_DF.TEXT_LOWER)).alias('TEXT'))

In [None]:
NOTEEVENTS_GROUPED_DF.cache()
NOTEEVENTS_GROUPED_DF.show(5)

In [None]:
NOTEEVENTS_GROUPED_DF.printSchema()

In [None]:
NOTEEVENTS_GROUPED_DF.describe().toPandas().T

#### 1.2.2 Convert to lowercase and Tokenize

In [18]:
NOTES_RDD = NOTEEVENTS_GROUPED_DF.select('HADM_ID','TEXT').rdd
def TokenizeFunct(x):
    return (x[0], nltk.word_tokenize(re.sub('\n',' ',re.sub(r'\[\*\*.+\*\*\]','xxx',x[1]))))
NOTES_RDD = NOTES_RDD.map(TokenizeFunct)

#### 1.2.3 Remove stopwords, numbers and punctuations

In [19]:
def removeStopWordsFunct(x):
    stop_words=set((stopwords.words('english')))
    filteredSentence = [w for w in x[1] if (w not in stop_words)]
    return (x[0], filteredSentence)
NOTES_RDD = NOTES_RDD.map(removeStopWordsFunct)

In [20]:
def removePunctuationsFunct(x):
    list_punct=list(string.punctuation)
    filtered = [''.join(c for c in s if c not in list_punct) for s in x[1]] 
    filtered_space = [s for s in filtered if s] #remove empty space 
    #filtered_num = [s for s in filtered_space if not(s.isnumeric())]
    return (x[0], filtered_space)
NOTES_RDD = NOTES_RDD.map(removePunctuationsFunct)

#### 1.2.4 Lemmatization

In [21]:
def lemmatizationFunct(x):
    lemmatizer = WordNetLemmatizer()
    finalLem = [lemmatizer.lemmatize(s) for s in x[1] if s not in ['']]
    return (x[0], finalLem)
NOTES_RDD = NOTES_RDD.map(lemmatizationFunct)

In [23]:
NOTES_RDD.take(1)

[('100010',
  ['admission',
   'date',
   'xxx',
   'date',
   'birth',
   'xxx',
   'sex',
   'f',
   'service',
   'urology',
   'allergy',
   'penicillin',
   'attending',
   'xxx',
   'chief',
   'complaint',
   'gross',
   'hematuria',
   '50pound',
   'weight',
   'loss',
   'major',
   'surgical',
   'invasive',
   'procedure',
   'open',
   'left',
   'radical',
   'nephrectomy',
   'history',
   'present',
   'illness',
   '54',
   'yo',
   'female',
   'wlarge',
   'left',
   'renal',
   'mass',
   'sp',
   'mediastinoscopy',
   'showing',
   'metastatic',
   'rcc',
   'xxx',
   'previously',
   'healthy',
   'patientn',
   'two',
   'year',
   'ago',
   'noted',
   'one',
   'episode',
   'gross',
   'hematuria',
   'quickly',
   'resolved',
   'recently',
   '50pound',
   'weight',
   'loss',
   '210',
   '160',
   'pound',
   'past',
   'six',
   'month',
   'noted',
   'fullness',
   'left',
   'upper',
   'quadrant',
   'actual',
   'pain',
   'requires',
   'occasional'

In [24]:
NOTES_RDD.toDF(['HADM_ID', 'TEXT_TOKEN']).createOrReplaceTempView('NOTES_TOKEN')

#### 1.2.5 Combine diagnosis group flags

In [27]:
NOTES_TOKEN_DF = spark.sql("""SELECT A.HADM_ID, B.D1, B.D2, B.D3, B.D4, B.D5, A.TEXT_TOKEN
from NOTES_TOKEN A inner join DIAGNOSIS_GROUPING_PIVOT B on A.HADM_ID = B.HADM_ID""")
NOTES_TOKEN_DF.cache()
#NOTE_TOKEN_DF.write.parquet('C:/Users/kfpj179/Desktop/Final Project/data/mimic-train-1')
#NOTE_TOKEN_DF.summary().toPandas().T

DataFrame[HADM_ID: string, D1: bigint, D2: bigint, D3: bigint, D4: bigint, D5: bigint, TEXT_TOKEN: array<string>]

In [29]:
NOTES_TOKEN_DF.show(2)

+-------+---+---+---+---+---+--------------------+
|HADM_ID| D1| D2| D3| D4| D5|          TEXT_TOKEN|
+-------+---+---+---+---+---+--------------------+
| 100010|  0|  0|  1|  0|  0|[admission, date,...|
| 100140|  0|  1|  0|  1|  0|[xxx, 1228, pm, c...|
+-------+---+---+---+---+---+--------------------+
only showing top 2 rows



In [34]:
NOTES_TOKEN_DF.printSchema()

root
 |-- HADM_ID: string (nullable = true)
 |-- D1: long (nullable = true)
 |-- D2: long (nullable = true)
 |-- D3: long (nullable = true)
 |-- D4: long (nullable = true)
 |-- D5: long (nullable = true)
 |-- TEXT_TOKEN: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [36]:
NOTES_TOKEN_DF.describe().toPandas().T

Unnamed: 0,0,1,2,3,4
summary,count,mean,stddev,min,max
HADM_ID,57632,149959.28055594114,28882.103401634144,100001,199999
D1,57632,0.735181843420322,0.44124015992805676,0,1
D2,57632,0.7184550249861188,0.44975650293216807,0,1
D3,57632,0.6138950583009439,0.48685935191704993,0,1
D4,57632,0.435417823431427,0.49581590135958314,0,1
D5,57632,0.3905469183786785,0.4878771904822777,0,1


## 2. Load pre-trained BERT model

In [57]:
from pyspark.sql.types import IntegerType
NOTES_TOKEN_DF = NOTES_TOKEN_DF.withColumn('D1',NOTES_TOKEN_DF['D1'].cast(IntegerType()))
NOTES_TOKEN_DF = NOTES_TOKEN_DF.withColumn('D2',NOTES_TOKEN_DF['D2'].cast(IntegerType()))
NOTES_TOKEN_DF = NOTES_TOKEN_DF.withColumn('D3',NOTES_TOKEN_DF['D3'].cast(IntegerType()))
NOTES_TOKEN_DF = NOTES_TOKEN_DF.withColumn('D4',NOTES_TOKEN_DF['D4'].cast(IntegerType()))
NOTES_TOKEN_DF = NOTES_TOKEN_DF.withColumn('D5',NOTES_TOKEN_DF['D5'].cast(IntegerType())).cache()

In [220]:
NOTEEVENTS_GROUPED_DF.show(5)

+-------+---+---+---+---+---+--------------------+
|HADM_ID| D1| D2| D3| D4| D5|                TEXT|
+-------+---+---+---+---+---+--------------------+
| 100010|  0|  0|  1|  0|  0|admission date:  ...|
| 100140|  0|  1|  0|  1|  0|[**2117-6-17**] 1...|
| 100227|  1|  1|  0|  1|  1|admission date:  ...|
| 100263|  1|  1|  1|  1|  1|admission date:  ...|
| 100320|  1|  1|  1|  0|  0|admission date:  ...|
+-------+---+---+---+---+---+--------------------+
only showing top 5 rows



In [227]:
NOTES_TOKEN_DF.write.mode('overwrite').parquet('C:/tmp/train')
#NOTEEVENTS_GROUPED_DF.coalesce(1).write.format("com.databricks.spark.csv").mode('overwrite') \
#        .option("header", "true").option("nullValue", "0").save("train.csv")
#NOTEEVENTS_GROUPED_DF.write.csv('mycsv.csv')

Py4JJavaError: An error occurred while calling o3939.parquet.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
	at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
	at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:83)
	at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:81)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
	at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:80)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:127)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:75)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
	at sun.reflect.GeneratedMethodAccessor401.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
	at java.lang.reflect.Method.invoke(Unknown Source)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Unknown Source)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 411.0 failed 1 times, most recent failure: Lost task 5.0 in stage 411.0 (TID 3536, localhost, executor driver): java.io.IOException: (null) entry in command string: null chmod 0644 C:\tmp\train\_temporary\0\_temporary\attempt_20200604081317_0411_m_000005_3536\part-00005-198bcf14-46b8-4303-956e-1780fb27eb2f-c000.snappy.parquet
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1891)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1879)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1878)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1878)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:927)
	at scala.Option.foreach(Option.scala:257)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:927)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2112)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2061)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2050)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:738)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
	... 32 more
Caused by: java.io.IOException: (null) entry in command string: null chmod 0644 C:\tmp\train\_temporary\0\_temporary\attempt_20200604081317_0411_m_000005_3536\part-00005-198bcf14-46b8-4303-956e-1780fb27eb2f-c000.snappy.parquet
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:770)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:866)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:849)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:733)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:225)
	at org.apache.hadoop.fs.RawLocalFileSystem$LocalFSFileOutputStream.<init>(RawLocalFileSystem.java:209)
	at org.apache.hadoop.fs.RawLocalFileSystem.createOutputStreamWithMode(RawLocalFileSystem.java:307)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:296)
	at org.apache.hadoop.fs.RawLocalFileSystem.create(RawLocalFileSystem.java:328)
	at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSOutputSummer.<init>(ChecksumFileSystem.java:398)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:461)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:892)
	at org.apache.parquet.hadoop.util.HadoopOutputFile.create(HadoopOutputFile.java:74)
	at org.apache.parquet.hadoop.ParquetFileWriter.<init>(ParquetFileWriter.java:248)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:390)
	at org.apache.parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:349)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.<init>(ParquetOutputWriter.scala:37)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anon$1.newInstance(ParquetFileFormat.scala:151)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:120)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:108)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:236)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:123)
	at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	... 1 more


In [81]:
NOTEEVENTS_GROUPED_DF.take(1)

[Row(HADM_ID='100010', D1=0, D2=0, D3=1, D4=0, D5=0, TEXT='admission date:  [**2109-12-10**]              discharge date:   [**2109-12-14**]\n\ndate of birth:  [**2055-6-3**]             sex:   f\n\nservice: urology\n\nallergies:\npenicillins\n\nattending:[**first name3 (lf) 11304**]\nchief complaint:\ngross hematuria, 50-pound weight loss\n\nmajor surgical or invasive procedure:\nopen left radical nephrectomy\n\n\nhistory of present illness:\n54 y/o female w/large left renal mass, s/p mediastinoscopy\nshowing metastatic rcc.  [**known firstname **] is a previously healthy\npatientn who, two years ago, noted one episode of gross\nhematuria, which quickly resolved.  more recently, she has had a\n50-pound weight loss from 210 to 160 pounds over the past six\nmonths.  she has noted some\nfullness in the left upper quadrant, but no actual pain and only\nrequires very occasional tylenol for this.  imaging was\nperformed, which revealed a very large left renal mass\nconsistent with renal cel