In [1]:
# !pip install findspark
# import findspark
# findspark.init()

# Named Entity Recognition for Healthcare with SparkNLP NerDL and NerCRF - Data Preparation and Model Evaluation

## Why is Data Preparation Important?

It's easy to use a pretrained named entity recognition (NER) model, but sometimes you need to train your own model to get the best results. This tutorial shows you how to prepare healthcare training data and train your own NER model using Python and SparkNLP. SparkNLP NerDL achieves cutting-edge scores on the BC2GM dataset (micro-average F1: 0.87) and other benchmark datasets. You need licensed SparkNLP Clinical embeddings to get those cutting-edge scores on healthcare data, but GloVe embeddings still do well. I'll show you how to train and evaluate NerCRF and NerDL models on the BC5CDR-Chem dataset using GloVe embeddings.

### Preparing the Training Data

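To train a NerCRF or NerDL model, you need to put your tokens and entity labels into a space-separated format called CoNLL. A CoNLL file puts each token of a sentence on its own line and separates sentences with an empty line. Each token line has four columns: the token, its part-of-speech tag, a syntactic chunk tag, and the entity label. In this tutorial the part-of-speech tag is repeated as a placeholder for the chunk column. In the following Python example I will annotate one sentence and save it in CoNLL format.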

In [1]:
#Create some tokens
tokens=['An', 'apple', 'a', 'day', 'keeps', 'the', 'doctor', 'away', '.']

#Create part-of-speech labels or use a placeholder value like "NN".
pos_labels=['DT', 'NN', 'DT', 'NN', 'VBZ', 'DT', 'NN', 'RB', '.']

#Create some named entity labels. 'O' labels mean no named entity was found.
entity_labels=['B-Treatment','I-Treatment','I-Treatment','I-Treatment','O','O','O','O','O']

Please notice the entity labels above. When an entity has more than one word, the label for the first word should begin with "B-" and the label for the following words should begin with "I-". Now let's save the tokens, parts-of-speech, and entity labels in CoNLL format.

In [2]:
conll_lines=''

for token,pos,label in zip(tokens,pos_labels,entity_labels):
    
    # Columns: token, POS tag, POS tag again (placeholder for the chunk column), entity label.
    conll_lines+="{} {} {} {}\n".format(token, pos, pos, label)

#Add another line break at the end of the sentence in order to create an empty line.
conll_lines+='\n'

#For this example I will print the lines instead of writing a .txt file.
print(conll_lines)


An DT DT B-Treatment
apple NN NN I-Treatment
a DT DT I-Treatment
day NN NN I-Treatment
keeps VBZ VBZ O
the DT DT O
doctor NN NN O
away RB RB O
. . . O




Please see the printed CoNLL output above. "An" is the first word in "An apple a day", so it is labelled "B-Treatment", while "apple", "a", and "day" are all labelled "I-Treatment". Words that are not part of a Treatment entity are labelled with a capital "O".
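A stray "I-" label that does not follow a "B-" or "I-" label of the same type is an easy annotation mistake to make, so it can help to check your labels before writing the file. The function below, find_bio_errors, is a hypothetical helper (not part of SparkNLP); it is a minimal sketch that assumes the strict BIO scheme described above, where every entity starts with "B-". Some datasets use a looser variant that allows an entity to start with "I-", and this check would flag those labels.

```python
def find_bio_errors(labels):
    """Return (index, label) pairs where an I- label does not continue an entity of the same type."""
    errors = []
    prev = "O"
    for i, label in enumerate(labels):
        if label.startswith("I-"):
            entity_type = label[2:]
            # Under strict BIO, an I- label is only valid right after a B- or I- label of the same type.
            if prev not in ("B-" + entity_type, "I-" + entity_type):
                errors.append((i, label))
        prev = label
    return errors


good = ['B-Treatment', 'I-Treatment', 'I-Treatment', 'I-Treatment', 'O', 'O', 'O', 'O', 'O']
bad = ['I-Treatment', 'I-Treatment', 'O', 'I-Test']
print(find_bio_errors(good))  # []
print(find_bio_errors(bad))   # [(0, 'I-Treatment'), (3, 'I-Test')]
```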

Here's another example of a sentence annotated in CoNLL format. The entity is "blood pressure".

In [3]:
#Create some tokens
tokens=['I','checked','my','blood','pressure','this','morning','.']

#Create part-of-speech labels or use a placeholder value like 'NN'.
pos_labels=['PRP', 'VBD', 'PRP', 'NN', 'NN', 'DT', 'NN', '.']

#Create some named entity labels. 'O' labels mean no named entity was found.
entity_labels=['O','O','O','B-Test','I-Test','O','O','O']

In [4]:
conll_lines=''

for token,pos,label in zip(tokens,pos_labels,entity_labels):
    
    conll_lines+="{} {} {} {}\n".format(token, pos, pos, label)

#Add another line break at the end of the sentence in order to create an empty line.
conll_lines+='\n'

#For this example I will print the lines instead of writing a .txt file.
print(conll_lines)

I PRP PRP O
checked VBD VBD O
my PRP PRP O
blood NN NN B-Test
pressure NN NN I-Test
this DT DT O
morning NN NN O
. . . O




As you can see above, "blood" is the first word in the entity, so it is labelled "B-Test", while "pressure" is the second word in the entity, so it is labelled "I-Test". We do this so the model can tell that "blood pressure" is one whole entity, rather than the two separate entities "blood" and "pressure".
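A short sketch makes the point concrete. The hypothetical helper bio_to_spans (not a SparkNLP function) groups BIO labels back into entities: an "I-" label extends the current entity of the same type, and a "B-" label starts a new one. An "I-" label that cannot continue an entity is treated here as the start of a new one, which is one lenient convention among several.

```python
def bio_to_spans(tokens, labels):
    """Group BIO labels into (entity_type, entity_text) pairs."""
    spans = []
    current_type, current_tokens = None, []
    for token, label in zip(tokens, labels):
        if label.startswith("I-") and label[2:] == current_type:
            # An I- label of the same type continues the current entity.
            current_tokens.append(token)
            continue
        # Any other label closes the current entity, if there is one.
        if current_type is not None:
            spans.append((current_type, " ".join(current_tokens)))
            current_type, current_tokens = None, []
        if label != "O":
            # A B- label (or an I- label that cannot continue an entity) starts a new entity.
            current_type, current_tokens = label[2:], [token]
    if current_type is not None:
        spans.append((current_type, " ".join(current_tokens)))
    return spans


tokens = ['I', 'checked', 'my', 'blood', 'pressure', 'this', 'morning', '.']
labels = ['O', 'O', 'O', 'B-Test', 'I-Test', 'O', 'O', 'O']
print(bio_to_spans(tokens, labels))  # [('Test', 'blood pressure')]

# Labelling both words "B-Test" would instead describe two separate entities.
print(bio_to_spans(['blood', 'pressure'], ['B-Test', 'B-Test']))  # [('Test', 'blood'), ('Test', 'pressure')]
```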

Now let's work with some real datasets. First we have to load the data.

In [35]:
# import os
# ! wget -O ncbi.tsv https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/ner/conll-2003/NCBIdisease.tsv
# ! wget -O BC5CDRtrain.txt https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/ner/conll-2003/CRFtrain_dev.txt
# ! wget -O BC5CDRtest.txt https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/ner/conll-2003/CRFtest.txt

--2023-12-05 17:08:06--  https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/ner/conll-2003/NCBIdisease.tsv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.13.70, 52.217.107.182, 54.231.225.48, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.13.70|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 1060328 (1.0M) [text/tab-separated-values]
Saving to: ‘ncbi.tsv’


2023-12-05 17:08:07 (1.84 MB/s) - ‘ncbi.tsv’ saved [1060328/1060328]

--2023-12-05 17:08:07--  https://s3.amazonaws.com/auxdata.johnsnowlabs.com/public/resources/en/ner/conll-2003/CRFtrain_dev.txt
Resolving s3.amazonaws.com (s3.amazonaws.com)... 54.231.225.48, 52.217.122.64, 52.217.101.174, ...
Connecting to s3.amazonaws.com (s3.amazonaws.com)|54.231.225.48|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 3347168 (3.2M) [text/plain]
Saving to: ‘BC5CDRtrain.txt’


2023-12-05 17:08:10 (1.59 MB/s) - ‘BC5CDRtrain.txt’ saved [3347168/3347168]

--

### How to Convert a Pandas Dataframe to CoNLL Format

In the next example I'll read from a Pandas dataframe and write a CoNLL file for NerDL. I'll use the sentence ID (sent_id) column to determine if I need to leave an empty line before a new sentence. Here are the first 5 lines of the dataframe:

In [6]:
# !pip install pandas

In [5]:
import pandas as pd
ncbi=pd.read_csv('ncbi.tsv',sep='\t')

In [6]:
ncbi.head()

| Unnamed: 0 | sent_id | token | entity_label |
|---|---|---|---|
| 0 | 1 | Identification | O |
| 1 | 1 | of | O |
| 2 | 1 | APC2 | O |
| 3 | 1 | , | O |
| 4 | 1 | a | O |


NerDL does not use the part-of-speech column, but a CoNLL file must still have one. Add a part-of-speech column with 'NN' or some other placeholder as the only value. If you already have a part-of-speech column, you can skip this step.

In [7]:
ncbi['pos']='NN'

My Pandas dataframe is called 'ncbi' and I've added a part-of-speech column which I've called 'pos'. Now write a CoNLL file using the columns of the Pandas dataframe as input.

In [8]:
file_loc = "train_CoNLL_data"

In [9]:
# Start with a -DOCSTART- header line followed by an empty line.
conll_lines="-DOCSTART- -X- -X- -O-\n\n"
prev_sent=0

for sent, token, pos, label in zip(ncbi['sent_id'],ncbi['token'],ncbi['pos'],ncbi['entity_label']):

    # If the sentence ID has changed, we are starting a new sentence, so add an empty line.
    if prev_sent!=sent:
        conll_lines+='\n'

    # Save the CoNLL line: token, POS tag, POS tag again as a placeholder chunk tag, entity label.
    conll_lines += "{} {} {} {}\n".format(token, pos, pos, label)

    prev_sent=sent

# Now write all of the lines to a text file. The with-block closes the file automatically.
with open(file_loc,'w') as txtfile:
    txtfile.write(conll_lines)
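If you prefer to build the file one sentence at a time, or cannot be sure that all rows of a sentence are contiguous, pandas groupby is an alternative. This is a sketch under assumptions: the function name dataframe_to_conll and its column-name defaults are hypothetical, chosen to match the ncbi dataframe above. Because groupby collects every row with the same sentence ID, rows of one sentence are merged even if they are not adjacent, and it writes exactly one empty line between sentences (the loop above also leaves an extra empty line after the -DOCSTART- header, which you can see in the output below).

```python
import pandas as pd


def dataframe_to_conll(df, path, sent_col="sent_id", token_col="token",
                       pos_col="pos", label_col="entity_label"):
    """Write one CoNLL block per sentence, with sentences separated by an empty line."""
    blocks = []
    # sort=False keeps the sentences in the order in which they first appear.
    for _, sent_df in df.groupby(sent_col, sort=False):
        lines = [
            f"{tok} {pos} {pos} {lab}"  # the POS tag doubles as a placeholder chunk column
            for tok, pos, lab in zip(sent_df[token_col], sent_df[pos_col], sent_df[label_col])
        ]
        blocks.append("\n".join(lines))
    with open(path, "w") as f:
        f.write("-DOCSTART- -X- -X- -O-\n\n")
        f.write("\n\n".join(blocks) + "\n")
```

With the dataframe above, the call would be dataframe_to_conll(ncbi, file_loc).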
    

If you look at the first 25 lines of the final CoNLL file, you'll see that rows containing only line breaks signal the beginning of a new sentence.

In [10]:
# Read the first 25 lines; the with-block closes the file automatically.
with open(file_loc,'r') as f:
    lines=f.readlines()[0:25]
lines

['-DOCSTART- -X- -X- -O-\n',
 '\n',
 '\n',
 'Identification NN NN O\n',
 'of NN NN O\n',
 'APC2 NN NN O\n',
 ', NN NN O\n',
 'a NN NN O\n',
 'homologue NN NN O\n',
 'of NN NN O\n',
 'the NN NN O\n',
 'adenomatous NN NN B-Disease\n',
 'polyposis NN NN I-Disease\n',
 'coli NN NN I-Disease\n',
 'tumour NN NN I-Disease\n',
 'suppressor NN NN O\n',
 '. NN NN O\n',
 '\n',
 'The NN NN O\n',
 'adenomatous NN NN B-Disease\n',
 'polyposis NN NN I-Disease\n',
 'coli NN NN I-Disease\n',
 '( NN NN I-Disease\n',
 'APC NN NN I-Disease\n',
 ') NN NN I-Disease\n']
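To sanity-check a file like this, you can read it back into sentences. The read_conll function below is a hypothetical helper, not SparkNLP's CoNLL() reader; it is a minimal sketch that assumes space-separated lines with exactly four columns, skips the -DOCSTART- header, and treats a run of empty lines as a single sentence break. The four-column check catches tokens that contain spaces, which would otherwise silently shift the columns.

```python
def read_conll(path):
    """Parse a four-column CoNLL file into sentences of (token, label) pairs."""
    sentences, current = [], []
    with open(path) as f:
        for line_no, line in enumerate(f, start=1):
            line = line.rstrip("\r\n")
            if line.startswith("-DOCSTART-"):
                continue
            if not line.strip():
                # An empty line ends the current sentence; repeated empty lines are ignored.
                if current:
                    sentences.append(current)
                    current = []
                continue
            parts = line.split(" ")
            if len(parts) != 4:
                raise ValueError(f"line {line_no}: expected 4 columns, got {len(parts)}: {line!r}")
            # The token is the first column and the entity label is the last.
            current.append((parts[0], parts[3]))
    if current:
        sentences.append(current)
    return sentences
```

For the file written above, read_conll(file_loc) should return one list per sentence, starting with the "Identification of APC2 ..." sentence.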

Now let's see SparkNLP's cutting-edge results! We'll train NerCRF and NerDL models on the BC5CDR-Chem benchmark dataset.

### Training and Evaluating NerCRF

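NerCRF is a named entity recognition model in the SparkNLP library based on Conditional Random Fields (CRF). Unlike NerDL, it requires part-of-speech tags as an input column for model training. To train a model with NerCRF, first import SparkNLP and start your Spark session, then load the CoNLL file with SparkNLP's CoNLL() reader, which creates the document, sentence, token, part-of-speech and label columns used below.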
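NerCrfApproach's internals aren't covered in this tutorial, but the key idea of a linear-chain CRF fits in a few lines: instead of choosing the best label for each token independently, it chooses the label sequence with the highest combined score of per-token (emission) scores and label-to-label (transition) scores, typically with the Viterbi algorithm. The sketch below is a generic illustration with made-up scores; it is not SparkNLP's implementation and does not use its learned weights.

```python
def viterbi(emissions, transitions, labels):
    """Return the label sequence with the highest total score.

    emissions:   one dict per token, mapping label -> score for that token.
    transitions: dict mapping (previous_label, label) -> score; missing pairs score 0.
    """
    # best[label] = (score of the best path ending in `label`, that path)
    best = {lab: (emissions[0][lab], [lab]) for lab in labels}
    for token_scores in emissions[1:]:
        new_best = {}
        for lab in labels:
            # Choose the previous label that maximizes path score + transition score.
            score, path = max(
                ((best[prev][0] + transitions.get((prev, lab), 0.0), best[prev][1]) for prev in labels),
                key=lambda item: item[0],
            )
            new_best[lab] = (score + token_scores[lab], path + [lab])
        best = new_best
    return max(best.values(), key=lambda item: item[0])[1]


# Toy scores for the tokens "blood pressure" (made up for illustration).
labels = ["O", "B-Test", "I-Test"]
emissions = [
    {"O": 1.0, "B-Test": 0.9, "I-Test": 0.2},  # "blood"
    {"O": 0.5, "B-Test": 0.3, "I-Test": 1.0},  # "pressure"
]
transitions = {("O", "I-Test"): -5.0, ("B-Test", "I-Test"): 1.0}

print(viterbi(emissions, {}, labels))           # ['O', 'I-Test'] (no transition scores)
print(viterbi(emissions, transitions, labels))  # ['B-Test', 'I-Test']
```

With a negative score for an "O" to "I-Test" transition, the decoder prefers the valid sequence "B-Test I-Test" over the invalid "O I-Test" that the per-token scores alone would pick. In a trained CRF these scores come from features learned on the training data.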

In [11]:
import sparknlp
from pyspark.ml import Pipeline
from pyspark.sql import SparkSession
from sparknlp.annotator import *
# from sparknlp_jsl.annotator import *
from sparknlp.base import *
# import sparknlp_jsl
import pyspark.sql.functions as F

spark = sparknlp.start()

spark



:: loading settings :: url = jar:file:/home/ubuntu/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/ubuntu/.ivy2/cache
The jars for the packages stored in: /home/ubuntu/.ivy2/jars
com.johnsnowlabs.nlp#spark-nlp_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-d4b9d8fa-6a3e-4577-b81a-b55f67a55a7a;1.0
	confs: [default]
	found com.johnsnowlabs.nlp#spark-nlp_2.12;4.3.1 in central
	found com.typesafe#config;1.4.2 in central
	found org.rocksdb#rocksdbjni;6.29.5 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.828 in central
	found com.github.universal-automata#liblevenshtein;3.0.0 in central
	found com.google.protobuf#protobuf-java-util;3.0.0-beta-3 in central
	found com.google.protobuf#protobuf-java;3.0.0-beta-3 in central
	found com.google.code.gson#gson;2.3 in central
	found it.unimi.dsi#fastutil;7.0.12 in central
	found org.projectlombok#lombok;1.16.8 in central
	found com.google.cloud#google-cloud-storage;2.16.0 in central
	found com.google.guava#guava;31.1-jre in central
	found com.google.guava#failur

23/12/05 19:01:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/12/05 19:01:52 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [12]:
from sparknlp.training import CoNLL

file_loc='BC5CDRtrain.txt'
train = CoNLL().readDataset(spark, file_loc)

In [13]:
train = train.repartition(1000)

In [14]:
train.rdd.getNumPartitions()

[Stage 0:>                                                          (0 + 0) / 8]

23/12/05 19:02:18 WARN TaskSetManager: Stage 0 contains a task of very large size (17818 KiB). The maximum recommended task size is 1000 KiB.


[Stage 0:>                                                          (0 + 8) / 8]

1000

In [15]:
train.select(
    F.explode(
        F.arrays_zip(
            F.col('token.result').alias("token"),
            F.col('label.result').alias("ground_truth"))).alias("cols")) \
    .select(
        F.col("cols").getItem("token").alias("token"),
        F.col("cols").getItem("ground_truth").alias("ground_truth")) \
    .groupBy('ground_truth').count().show()


23/12/05 19:02:25 WARN TaskSetManager: Stage 1 contains a task of very large size (17818 KiB). The maximum recommended task size is 1000 KiB.




+------------+------+
|ground_truth| count|
+------------+------+
|      I-CHEM|  3648|
|      B-CHEM| 10550|
|           O|221425|
+------------+------+



                                                                                

I will add GloVe embeddings to the dataset before NER training, but if you want better results on your healthcare projects, use SparkNLP Clinical embeddings. First, set up your pipeline and fit your model to your training dataset. The fitting process can take some time.

In [16]:
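# 100-dimensional GloVe word embeddings computed from the document and token columns.
word_embeddings = WordEmbeddingsModel.pretrained('glove_100d')\
    .setInputCols(["document", "token"])\
    .setOutputCol("embeddings")

# The CRF uses the sentence, token and part-of-speech columns from the CoNLL reader, plus the embeddings.
nerTagger = NerCrfApproach()\
    .setInputCols(["sentence", "token", "pos", "embeddings"])\
    .setLabelColumn("label")\
    .setOutputCol("ner")\
    .setMaxEpochs(9)

ner_pipeline = Pipeline(stages=[
    word_embeddings,
    nerTagger
])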

glove_100d download started this may take some time.
Approximate size to download 145.3 MB
Download done! Loading the resource.
[OK!]


In [17]:
ner_model = ner_pipeline.fit(train)

23/12/05 19:02:44 WARN TaskSetManager: Stage 8 contains a task of very large size (17818 KiB). The maximum recommended task size is 1000 KiB.


                                                                                

Next add word embeddings to your test dataset and make your predictions.

In [18]:
from sparknlp.training import CoNLL

file_loc='BC5CDRtest.txt'
test = CoNLL().readDataset(spark, file_loc)

test_data = word_embeddings.transform(test)


In [22]:
predictions = ner_model.transform(test_data)

You can see all of your input and output columns in the final "predictions" dataframe, but I'll focus on the 'ner' column, which contains the predictions, and the 'label' column, which contains the ground truth. You can use sklearn.metrics classification_report on these 2 columns to get token-level precision, recall and F1 scores for each label.

In [26]:
# !pip install scikit-learn

In [72]:
# predictions.show()


In [23]:
from sklearn.metrics import classification_report
import pyspark.sql.functions as F

preds = predictions.select(
    F.explode(
        F.arrays_zip(
            F.col('token.result').alias("token"),
            F.col('label.result').alias("label"),
            F.col('ner.result').alias("ner")
        )
    ).alias("cols")).select(
        F.col("cols").getItem("token").alias("token"),
        F.col("cols").getItem("label").alias("label"),
        F.col("cols").getItem("ner").alias("ner")
    )

In [25]:
preds.filter("ner!='O'").show()

23/12/05 17:42:47 WARN TaskSetManager: Stage 19 contains a task of very large size (9303 KiB). The maximum recommended task size is 1000 KiB.


+------------+------+------+
|       token| label|   ner|
+------------+------+------+
|  dobutamine|B-CHEM|B-CHEM|
|  dobutamine|B-CHEM|B-CHEM|
|  dobutamine|B-CHEM|B-CHEM|
|  Dubutamine|B-CHEM|B-CHEM|
|           5|B-CHEM|B-CHEM|
|           -|I-CHEM|I-CHEM|
|fluorouracil|I-CHEM|I-CHEM|
|           5|B-CHEM|B-CHEM|
|           -|I-CHEM|I-CHEM|
|fluorouracil|I-CHEM|I-CHEM|
|           5|B-CHEM|B-CHEM|
|           -|I-CHEM|I-CHEM|
|          FU|I-CHEM|I-CHEM|
|    ammonium|B-CHEM|B-CHEM|
|    ammonium|B-CHEM|B-CHEM|
|           5|B-CHEM|B-CHEM|
|           -|I-CHEM|I-CHEM|
|          FU|I-CHEM|I-CHEM|
|    ammonium|B-CHEM|B-CHEM|
|           5|B-CHEM|B-CHEM|
+------------+------+------+
only showing top 20 rows



In [26]:
#Convert the Spark dataframe to a Pandas dataframe.
import pandas as pd
preds_df=preds.toPandas()

23/12/05 17:42:58 WARN TaskSetManager: Stage 20 contains a task of very large size (9303 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

In [27]:
print(classification_report(preds_df['label'], preds_df['ner']))

              precision    recall  f1-score   support

      B-CHEM       0.92      0.82      0.87      5385
      I-CHEM       0.77      0.80      0.78      1628
           O       0.99      0.99      0.99    117737

    accuracy                           0.98    124750
   macro avg       0.89      0.87      0.88    124750
weighted avg       0.98      0.98      0.98    124750
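The report above is token-level: every B-CHEM or I-CHEM token is scored on its own, and the many 'O' tokens dominate the accuracy and the weighted average. NER results are also commonly reported at the entity level, where a prediction only counts if its whole span and its type match the ground truth. Below is a minimal sketch of strict entity-level precision, recall and F1 in plain Python; extract_entities and entity_prf are hypothetical helpers, not SparkNLP or sklearn functions. It assumes BIO labels that are aligned token by token, as in preds_df. Because explode drops sentence boundaries, an I- label at the start of a sentence could, in rare cases, be merged with an entity that ended the previous sentence.

```python
def extract_entities(labels):
    """Return the set of (entity_type, start, end) spans in a BIO label sequence; end is exclusive."""
    entities, start, etype = set(), None, None
    # The trailing "O" closes an entity that runs to the end of the sequence.
    for i, label in enumerate(list(labels) + ["O"]):
        continues = label.startswith("I-") and label[2:] == etype
        if start is not None and not continues:
            entities.add((etype, start, i))
            start, etype = None, None
        # A B- label, or an I- label that cannot continue the current entity, starts a new one.
        if label.startswith("B-") or (label.startswith("I-") and not continues):
            start, etype = i, label[2:]
    return entities


def entity_prf(true_labels, pred_labels):
    """Strict entity-level precision, recall and F1: span and type must both match."""
    true_ents = extract_entities(true_labels)
    pred_ents = extract_entities(pred_labels)
    tp = len(true_ents & pred_ents)
    precision = tp / len(pred_ents) if pred_ents else 0.0
    recall = tp / len(true_ents) if true_ents else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return precision, recall, f1


# Toy example: 4 of 5 tokens are right, but the second predicted entity runs one token too far.
true = ["B-CHEM", "I-CHEM", "O", "B-CHEM", "O"]
pred = ["B-CHEM", "I-CHEM", "O", "B-CHEM", "I-CHEM"]
print(entity_prf(true, pred))  # (0.5, 0.5, 0.5)
```

On the real predictions this would be entity_prf(preds_df['label'], preds_df['ner']). The third-party seqeval package offers a fuller entity-level report if you prefer a library.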



### Training and Evaluating NerDL

NerDL is a deep learning named entity recognition model in the SparkNLP library that does not require part-of-speech tags in the training data. Its architecture combines character-level CNNs, a bidirectional LSTM and a CRF layer. For a more detailed overview of training a model using NerDL, you can check out this [post](https://medium.com/r/?url=https%3A%2F%2Ftowardsdatascience.com%2Fnamed-entity-recognition-ner-with-bert-in-spark-nlp-874df20d1d77). We've already loaded the BC5CDR-Chem test and train datasets. Now I'll show you how to add GloVe embeddings to the test data and save it as a parquet file before NerDL model training, so that the test set NerDL evaluates during training already contains the same embeddings column as the training data.

In [28]:
word_embeddings = WordEmbeddingsModel.pretrained('glove_100d')\
          .setInputCols(["document", "token"])\
          .setOutputCol("embeddings")

test_data = word_embeddings.transform(test)

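# Relative paths are resolved against the working directory. Here that was /home/ubuntu, so
# '../test.parquet' pointed to /home/test.parquet, which could not be created (see the error below).
# Choose a writable location and pass the same path to setTestDataset() in the NerDL cell.
# mode("overwrite") lets you re-run this cell after the file already exists.
test_data.write.mode("overwrite").parquet('../test.parquet')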


glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]


23/12/05 17:43:40 ERROR FileOutputCommitter: Mkdirs failed to create file:/home/test.parquet/_temporary/0
23/12/05 17:43:40 WARN TaskSetManager: Stage 21 contains a task of very large size (9303 KiB). The maximum recommended task size is 1000 KiB.
23/12/05 17:43:41 ERROR Executor: Exception in task 4.0 in stage 21.0 (TID 2157)
java.io.IOException: Mkdirs failed to create file:/home/test.parquet/_temporary/0/_temporary/attempt_20231205174340527253840669164587_0021_m_000004_2157 (exists=false, cwd=file:/home/ubuntu)

Py4JJavaError: An error occurred while calling o377.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 21.0 failed 1 times, most recent failure: Lost task 0.0 in stage 21.0 (TID 2153) (ip-172-31-22-112.eu-central-1.compute.internal executor driver): java.io.IOException: Mkdirs failed to create file:/home/test.parquet/_temporary/0/_temporary/attempt_20231205174340527253840669164587_0021_m_000000_2153 (exists=false, cwd=file:/home/ubuntu)


Next, set up the rest of the pipeline by adding the location of the test data parquet file and the folder where your TensorFlow graphs are located. Using ".setEvaluationLogExtended(True)" will output a more detailed model evaluation log. When you run the training, if you get an error about an incompatible TF graph, use NerDL_Graph.ipynb located [here](https://github.com/JohnSnowLabs/spark-nlp-workshop/blob/master/tutorials/Certification_Trainings/Public/4.1_NerDL_Graph.ipynb) to create a graph with the parameters given in the error message. If you're having trouble with this part of NerDL model training, you should read this [post](https://medium.com/r/?url=https%3A%2F%2Ftowardsdatascience.com%2Fnamed-entity-recognition-ner-with-bert-in-spark-nlp-874df20d1d77).
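It can help to know in advance which graph your data needs. The graph file name in the training log further down, blstm_3_100_128_82.pb, appears to follow a blstm_{labels}_{embedding dim}_{LSTM size}_{chars}.pb pattern; that reading, and the fit rule in graph_fits, are assumptions based on that file name and the log's "labels: 3 chars: 81" line, not on the SparkNLP source. All three functions are hypothetical helpers. The sentences argument is a list of sentences, each a list of (token, label) pairs from your training data.

```python
import re


def required_graph_params(sentences, embedding_dim):
    """Count the distinct labels and characters in sentences of (token, label) pairs."""
    labels = {label for sentence in sentences for _, label in sentence}
    chars = {ch for sentence in sentences for token, _ in sentence for ch in token}
    return {"labels": len(labels), "embedding_dim": embedding_dim, "chars": len(chars)}


def parse_graph_name(filename):
    """Split a graph file name such as 'blstm_3_100_128_82.pb' into its numbers.

    Assumed pattern: blstm_{labels}_{embedding_dim}_{lstm_size}_{chars}.pb
    """
    match = re.fullmatch(r"blstm_(\d+)_(\d+)_(\d+)_(\d+)\.pb", filename)
    if match is None:
        raise ValueError(f"unexpected graph file name: {filename}")
    labels, dim, lstm_size, chars = (int(g) for g in match.groups())
    return {"labels": labels, "embedding_dim": dim, "lstm_size": lstm_size, "chars": chars}


def graph_fits(graph, needed):
    """Assumption: the embedding size must match, and the graph needs at least as many
    label and character slots as the data (the log pairs 81 chars with an 82-char graph)."""
    return (graph["embedding_dim"] == needed["embedding_dim"]
            and graph["labels"] >= needed["labels"]
            and graph["chars"] >= needed["chars"])


sentences = [[("Aspirin", "B-CHEM"), ("dose", "O")], [("safe", "O")]]
needed = required_graph_params(sentences, embedding_dim=100)
graph = parse_graph_name("blstm_3_100_128_82.pb")
print(needed)
print(graph_fits(graph, needed))  # True
```

SparkNLP's own character count may differ slightly from this one, so treat the result as a rough pre-check; the error message from a failed fit remains the authoritative source of the required parameters.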

In [29]:
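nerTagger = (
    NerDLApproach()
    .setInputCols(["sentence", "token", "embeddings"])
    .setLabelColumn("label")
    .setOutputCol("ner")
    .setMaxEpochs(15)
    .setLr(0.001)                       # initial learning rate
    .setPo(0.005)                       # learning-rate decay coefficient
    .setBatchSize(32)
    .setRandomSeed(0)                   # fixed seed for more reproducible runs
    .setVerbose(1)
    .setValidationSplit(0.2)            # hold out 20% of the training data for per-epoch validation
    .setEvaluationLogExtended(True)     # per-label tp/fp/fn, precision, recall and F1 in the log
    .setEnableOutputLogs(True)          # write the training log to a file (~/annotator_logs by default)
    .setIncludeConfidence(True)         # add confidence scores to the annotation metadata
    .setGraphFolder('../tfgraphs')      # folder with the TensorFlow graph (.pb) files
    .setTestDataset('../test.parquet')  # embedded test set saved as parquet above
)

ner_pipeline = Pipeline(stages=[
    word_embeddings,
    nerTagger
])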

Even though the word_embeddings stage is defined in a previous cell, it is still part of this pipeline. In the next cell I'll fit the model to the training set. This can take some time.

In [30]:
%%time

ner_model = ner_pipeline.fit(train)



AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/home/test.parquet.

The training logs are saved in ~/annotator_logs. Because ls -lt sorts files by modification time, the newest log is at the top of the list:

In [29]:
! cd ~/annotator_logs && ls -lt

total 280
-rw-rw-r-- 1 myilmaz myilmaz 13529 Jun 13 18:07 NerDLApproach_004e6626b3bb.log
-rw-rw-r-- 1 myilmaz myilmaz 17997 Jun 12 17:51 NerDLApproach_15b6d84b808b.log
-rw-rw-r-- 1 myilmaz myilmaz  9819 Jun 11 20:00 NerDLApproach_31530d63198f.log
-rw-rw-r-- 1 myilmaz myilmaz  1092 Jun 11 19:52 NerDLApproach_e5b8f13159eb.log
-rw-rw-r-- 1 myilmaz myilmaz  1100 Jun 11 19:45 NerDLApproach_9e3eb5e1c0e9.log
-rw-rw-r-- 1 myilmaz myilmaz  2007 Jun 11 10:11 NerDLApproach_7e555f3bb935.log
-rw-rw-r-- 1 myilmaz myilmaz  2009 Jun 11 10:00 NerDLApproach_b61689d542fe.log
-rw-rw-r-- 1 myilmaz myilmaz 12650 Jun 10 17:38 NerDLApproach_3a19217cbe4b.log
-rw-rw-r-- 1 myilmaz myilmaz  1049 Jun  7 21:16 NerDLApproach_0560e20a620b.log
-rw-rw-r-- 1 myilmaz myilmaz 55543 May 16 09:57 NerDLApproach_70a344517ff7.log
-rw-rw-r-- 1 myilmaz myilmaz 55501 May 16 08:47 NerDLApproach_769f9d4b74d3.log
-rw-rw-r-- 1 myilmaz myilmaz 27771 May 16 08:15 NerDLApproach_acc76debb989.log
-rw-rw-r-- 1 myilmaz myilmaz 

For each training epoch, the extended log prints 2 sets of metrics: one for the validation dataset (at the top) and one for the test dataset. For each dataset there's a table showing true positives (tp), false positives (fp), false negatives (fn), precision, recall and F1 scores for each entity label (except 'O'). Beneath this table you'll find the macro-average and micro-average precision, recall and F1 scores for the dataset. So if you're looking for the micro-average F1 score on the test data, you'll find it on the last line of each epoch's block.
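Scrolling through 15 epochs to find these lines is tedious, so a small parser can collect them. This is a minimal sketch that assumes the log layout shown below ("Epoch: N started", "Quality on validation dataset", "Quality on test dataset" and "Micro-average ... f1: ..." lines); micro_f1_by_epoch is a hypothetical helper, and other SparkNLP versions may format their logs differently. The sample string is copied from the first epoch of the log below.

```python
import re


def micro_f1_by_epoch(log_text):
    """Collect the micro-average F1 per epoch from a NerDL extended training log.

    Returns {epoch: {"validation": f1, "test": f1}}.
    """
    results, epoch, split = {}, None, None
    for line in log_text.splitlines():
        epoch_match = re.match(r"Epoch: (\d+) started", line)
        micro_match = re.match(r"Micro-average\s+prec: ([^,\s]+), rec: ([^,\s]+), f1: ([^,\s]+)", line)
        if epoch_match:
            epoch = int(epoch_match.group(1))
            results[epoch] = {}
            split = None
        elif line.startswith("Quality on validation dataset"):
            split = "validation"
        elif line.startswith("Quality on test dataset"):
            split = "test"
        elif micro_match and epoch is not None and split is not None:
            # The third number on the Micro-average line is the F1 score.
            results[epoch][split] = float(micro_match.group(3))
    return results


sample = """Epoch: 0 started, learning rate: 0.001, dataset size: 7313
Quality on validation dataset (20.0%), valExamples = 1828
Micro-average\t prec: 0.8352174, rec: 0.6556314, f1: 0.7346081
"""
print(micro_f1_by_epoch(sample))  # {0: {'validation': 0.7346081}}
```

To use it on a real log, read the file first, for example with open(os.path.expanduser('~/annotator_logs/NerDLApproach_15b6d84b808b.log')) as f, then pass f.read() to micro_f1_by_epoch; each epoch's test-set score is then under the "test" key.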

In [1]:
!cat ~/annotator_logs/NerDLApproach_15b6d84b808b.log

Name of the selected graph: /home/myilmaz/devel/mag/datasets/make_CoNLL/tutorial/tfgraphs/blstm_3_100_128_82.pb
Training started, trainExamples: 7313, labels: 3 chars: 81, 


Epoch: 0 started, learning rate: 0.001, dataset size: 7313
Done, 33.119487882 loss: 722.0155, batches: 231
Quality on validation dataset (20.0%), valExamples = 1828
time to finish evaluation: 3.284676026
label	 tp	 fp	 fn	 prec	 rec	 f1
B-CHEM	 1572	 294	 508	 0.8424437	 0.75576925	 0.7967562
I-CHEM	 349	 85	 501	 0.8041475	 0.41058823	 0.5436137
tp: 1921 fp: 379 fn: 1009 labels: 2
Macro-average	 prec: 0.8232956, rec: 0.58317876, f1: 0.6827405
Micro-average	 prec: 0.8352174, rec: 0.6556314, f1: 0.7346081
Quality on test dataset: 
time to finish evaluation: 8.096200166
label	 tp	 fp	 fn	 prec	 rec	 f1
B-CHEM	 3946	 743	 1439	 0.84154403	 0.7327762	 0.7834028
I-CHEM	 586	 232	 1042	 0.71638143	 0.35995087	 0.47914964
tp: 4532 fp: 975 fn: 2481 labels: 2
Macro-average	 prec: 0.77896273, rec: 0.5463

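Overall, our NerDL and NerCRF models performed reasonably well on the BC5CDR-Chem benchmark dataset with GloVe embeddings. In the 11th epoch the NerDL model's macro-average F1 score on the test set was 0.86, and after 9 epochs the NerCRF model had a macro-average F1 score of 0.88 on the test set. These two numbers are not directly comparable, though: sklearn's macro average includes the 'O' label, and excluding it gives (0.87 + 0.78) / 2 ≈ 0.83 for NerCRF, while the NerDL log averages over B-CHEM and I-CHEM only and derives its macro F1 from the macro-averaged precision and recall. However, using Clinical embeddings instead of GloVe brings the NerDL micro-average F1 score from 0.887 up to 0.915, much closer to the best published score for this dataset.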
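To compare models on the same footing, the token-level macro F1 can be recomputed over the entity labels only. This plain-Python sketch (per_label_f1 and macro_f1 are hypothetical helper names) averages per-label F1 scores the way sklearn's macro average does, with an option to skip 'O'. With sklearn itself, passing labels=['B-CHEM', 'I-CHEM'] to classification_report restricts the report and its averages to those labels.

```python
def per_label_f1(true_labels, pred_labels):
    """Token-level F1 for every label that appears in either sequence."""
    pairs = list(zip(true_labels, pred_labels))
    scores = {}
    for label in set(true_labels) | set(pred_labels):
        tp = sum(1 for t, p in pairs if t == label and p == label)
        fp = sum(1 for t, p in pairs if t != label and p == label)
        fn = sum(1 for t, p in pairs if t == label and p != label)
        precision = tp / (tp + fp) if tp + fp else 0.0
        recall = tp / (tp + fn) if tp + fn else 0.0
        scores[label] = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return scores


def macro_f1(scores, exclude=("O",)):
    """Unweighted mean of per-label F1 scores, skipping the labels in `exclude`."""
    kept = [f1 for label, f1 in scores.items() if label not in exclude]
    return sum(kept) / len(kept) if kept else 0.0


true = ["B-CHEM", "I-CHEM", "O", "O", "B-CHEM"]
pred = ["B-CHEM", "O", "O", "O", "B-CHEM"]
scores = per_label_f1(true, pred)
print(macro_f1(scores))               # 0.5 (entity labels only)
print(macro_f1(scores, exclude=()))   # about 0.6 (including 'O')
```

On the NerCRF predictions this would be macro_f1(per_label_f1(preds_df['label'], preds_df['ner'])).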