# Named Entity Recognition
In this notebook, we create a Named Entity Recognition (NER) pipeline that detects `CARDINAL`, `EVENT`, `WORK_OF_ART`, `ORG`, `DATE`, `GPE`, `PERSON`, `PRODUCT`, `NORP`, `ORDINAL`, `MONEY`, `LOC`, `FAC`, `LAW`, `TIME`, `PERCENT`, `QUANTITY`, and `LANGUAGE` entities in a given text.

We will use the [`onto_100`](https://sparknlp.org/2020/02/03/onto_100_en.html) pretrained model from the Spark NLP library.

Setting up packages and libraries

In [4]:
import sparknlp
import pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Spark NLP")\
    .master("local[*]")\
    .config("spark.driver.memory","16G")\
    .config("spark.driver.maxResultSize", "0") \
    .config("spark.kryoserializer.buffer.max", "2000M")\
    .config("spark.jars", "/Users/ahmetemintek/Desktop/mlops_study/experiment_nlp/extraction_model/sparknlp_jar/spark-nlp-assembly-5.3.2.jar")\
    .getOrCreate()

24/04/03 21:19:03 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/03 21:19:05 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [5]:
spark

Importing sub-packages

In [6]:
from sparknlp.annotator import *
from sparknlp.base import *
from pyspark.ml import Pipeline
from pyspark.sql import functions as F  
from pyspark.ml.feature import CountVectorizer, HashingTF, IDF, OneHotEncoder, StringIndexer, VectorAssembler, SQLTransformer
import pandas as pd

### Downloading pretrained models

We will download the pretrained models to the local disk. This way, the models are not downloaded again every time the pipeline is called.

In [13]:
# pretrained model's path
cache_folder_path= "/Users/ahmetemintek/cache_pretrained"

Downloading the pretrained models to the local `cache_pretrained` folder by using `ResourceDownloader` from the Spark NLP library.

In [13]:
from sparknlp.pretrained import ResourceDownloader

ResourceDownloader.downloadModel(ContextSpellCheckerModel, "spellcheck_dl", "en",remote_loc="public/models")
ResourceDownloader.downloadModel(WordEmbeddingsModel, "glove_100d", "en",remote_loc="public/models")
ResourceDownloader.downloadModel(NerDLModel, "onto_100", "en",remote_loc="public/models")

glove_100d download started this may take some time.
Approximate size to download 145.3 MB
[OK!]


WORD_EMBEDDINGS_MODEL_48cffc8b9a76

We downloaded a spell checker model, a word embeddings model, and an NER model for our pipeline.

In [7]:
import glob
# checking the cache folder path
spell_path=glob.glob("/Users/ahmetemintek/cache_pretrained/spell*" )
emb_path=glob.glob("/Users/ahmetemintek/cache_pretrained/glove_100d*" )
ner_path= glob.glob("/Users/ahmetemintek/cache_pretrained/onto_100*")

# use the same index as the spell checker loaded later in the pipeline
print("SpellChecker model path", spell_path[-1])
print("Embeddings model path", emb_path[0])
print("NER model path", ner_path[0])

SpellChecker model path /Users/ahmetemintek/cache_pretrained/spellcheck_dl_en_3.4.1_3.0_1648457196011
Embeddings model path /Users/ahmetemintek/cache_pretrained/glove_100d_en_2.4.0_2.4_1579690104032
NER model path /Users/ahmetemintek/cache_pretrained/onto_100_en_2.4.0_2.4_1579729071672
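
Indexing the `glob` results by position depends on how many folders match the pattern and on the unsorted order in which `glob` returns them. A minimal sketch of a more defensive lookup follows. It assumes that the cached folder names end in a numeric timestamp, as in the paths printed above. `latest_model_path` is a hypothetical helper written for this notebook, not part of Spark NLP.

```python
import glob
import os


def latest_model_path(cache_dir, prefix):
    """Return the matching model folder with the highest trailing timestamp.

    Hypothetical helper: assumes folder names end in `_<numeric timestamp>`.
    """
    matches = glob.glob(os.path.join(cache_dir, prefix + "*"))
    if not matches:
        raise FileNotFoundError(
            f"No cached model matching '{prefix}*' in {cache_dir}"
        )

    def timestamp(path):
        # The last underscore-separated field of the folder name is the timestamp.
        tail = os.path.basename(os.path.normpath(path)).rsplit("_", 1)[-1]
        return int(tail) if tail.isdigit() else -1

    return max(matches, key=timestamp)
```

With this helper, a call such as `latest_model_path(cache_folder_path, "spellcheck_dl")` returns a single path string (not a list) and fails loudly when the model has not been downloaded yet.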


In [7]:
# copy the pretrained models into the project folder
# create the target folder first so that every model lands inside it
! mkdir -p ner_pipeline/pretrained_models
! cp -r /Users/ahmetemintek/cache_pretrained/spellcheck_dl_en_3.4.1_3.0_1648457196011 ner_pipeline/pretrained_models
! cp -r /Users/ahmetemintek/cache_pretrained/glove_100d_en_2.4.0_2.4_1579690104032 ner_pipeline/pretrained_models
! cp -r /Users/ahmetemintek/cache_pretrained/onto_100_en_2.4.0_2.4_1579729071672 ner_pipeline/pretrained_models
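
The shell commands above only work where `cp` is available. A portable alternative could be sketched with the standard library as follows. `copy_models` is a hypothetical helper, and the destination folder name is taken from the commands above.

```python
import os
import shutil


def copy_models(model_dirs, dest_root):
    """Copy each model folder into dest_root, keeping its folder name."""
    os.makedirs(dest_root, exist_ok=True)
    copied = []
    for src in model_dirs:
        target = os.path.join(dest_root, os.path.basename(os.path.normpath(src)))
        # dirs_exist_ok (Python 3.8+) lets the copy be re-run without failing.
        shutil.copytree(src, target, dirs_exist_ok=True)
        copied.append(target)
    return copied
```

It could be called as `copy_models([spell_path[-1], emb_path[0], ner_path[0]], "ner_pipeline/pretrained_models")`, reusing the paths found in the previous cell.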

Now, we will create the annotators and models required for NER and put them into a pipeline.

In [9]:
documentAssembler= DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

tokenizer= Tokenizer()\
    .setInputCols(["document"])\
    .setOutputCol("token")

spell_checker= ContextSpellCheckerModel.load(spell_path[-1])\
    .setInputCols("token")\
    .setOutputCol("checked")

word_embedding= WordEmbeddingsModel.load(emb_path[0])\
    .setInputCols(["document", "checked"])\
    .setOutputCol("embeddings")

onto_ner = NerDLModel.load(ner_path[0]) \
          .setInputCols(["document", "checked", "embeddings"]) \
          .setOutputCol("ner")

ner_converter= NerConverter()\
    .setInputCols(["document", "checked", "ner"])\
    .setOutputCol("ner_chunk")

nlp_pipeline= Pipeline(stages=[ 
                               documentAssembler,
                               tokenizer,
                               spell_checker,
                               word_embedding,
                               onto_ner,
                               ner_converter
])

2024-04-03 14:01:52.989383: I external/org_tensorflow/tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2024-04-03 14:02:04.558364: W external/org_tensorflow/tensorflow/core/common_runtime/colocation_graph.cc:1218] Failed to place the graph without changing the devices of some resources. Some of the operations (that had to be colocated with resource generating operations) are not supported on the resources' devices. Current candidate devices are [
  /job:localhost/replica:0/task:0/device:CPU:0].
See below for details of this colocation group:
Colocation Debug Info:
Colocation group had the following types and supported devices: 
Root Member(assigned_device_name_index_=-1 requested_device_name_='/device:GPU:1' assigned_device_nam

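Fitting the pipeline on an empty DataFrame. All stages are pretrained, so `fit` does not train anything here; it only turns the pipeline into a `PipelineModel` that can be saved.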

In [20]:
empty_df= spark.createDataFrame([[" "]]).toDF("text")
pipelineModel= nlp_pipeline.fit(empty_df)

Saving the pipeline

In [27]:
pipelineModel.write().overwrite().save("pretrained_pipeline_for_ner")

24/04/02 11:17:20 WARN TaskSetManager: Stage 32 contains a task of very large size (1200 KiB). The maximum recommended task size is 1000 KiB.
24/04/02 11:17:21 WARN TaskSetManager: Stage 33 contains a task of very large size (23847 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

Testing the saved pipeline

In [10]:
from sparknlp.pretrained import PretrainedPipeline
loaded_pipe= PretrainedPipeline.from_disk("pretrained_pipeline_for_ner")

2024-04-03 14:02:36.275675: W external/org_tensorflow/tensorflow/core/common_runtime/colocation_graph.cc:1218] Failed to place the graph without changing the devices of some resources. Some of the operations (that had to be colocated with resource generating operations) are not supported on the resources' devices. Current candidate devices are [
  /job:localhost/replica:0/task:0/device:CPU:0].
See below for details of this colocation group:
Colocation Debug Info:
Colocation group had the following types and supported devices: 
Root Member(assigned_device_name_index_=-1 requested_device_name_='/device:GPU:1' assigned_device_name_='' resource_device_name_='/device:GPU:1' supported_device_types_=[CPU] possible_devices_=[]
ApplyAdam: CPU 
Cast: CPU 
Add: CPU 
GatherV2: CPU 
Identity: CPU 
Fill: CPU 
RandomUniform: CPU 
Mul: CPU 
Sub: CPU 
Assign: CPU 
VariableV2: CPU 
Const: CPU 

Colocation members, user-requested devices, and framework assigned devices, if any:
  char_repr_cnn/char_embed

Annotating an example text with the loaded pipeline.

In [12]:
loaded_pipe.annotate("Benjamin is a good guy and live is New York")

{'checked': ['Benjamin',
  'is',
  'a',
  'good',
  'guy',
  'and',
  'live',
  'is',
  'New',
  'York'],
 'document': ['Benjamin is a good guy and live is New York'],
 'ner_chunk': ['Benjamin', 'New York'],
 'token': ['Benjamin',
  'is',
  'a',
  'good',
  'guy',
  'and',
  'live',
  'is',
  'New',
  'York'],
 'ner': ['B-PERSON', 'O', 'O', 'O', 'O', 'O', 'O', 'O', 'B-GPE', 'I-GPE'],
 'embeddings': ['Benjamin',
  'is',
  'a',
  'good',
  'guy',
  'and',
  'live',
  'is',
  'New',
  'York']}

In [16]:
loaded_pipe.annotate("Benjamin is a good guy and live is New York")["ner_chunk"]

['Benjamin', 'New York']
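
The `ner` column uses IOB tags: `B-` opens an entity, `I-` continues it, and `O` marks a token outside any entity. The `ner_chunk` list returned by `annotate` contains the chunk text but not its label. A minimal pure-Python sketch of how tokens and tags could be grouped into labelled chunks is shown below. `group_entities` is a hypothetical helper written for illustration; it is not how `NerConverter` is implemented internally.

```python
def group_entities(tokens, tags):
    """Group IOB-tagged tokens into (chunk_text, label) pairs."""
    entities = []
    current_tokens, current_label = [], None

    for token, tag in zip(tokens, tags):
        prefix, _, label = tag.partition("-")
        if prefix == "B" or (prefix == "I" and label != current_label):
            # A new entity starts: flush the one in progress, if any.
            if current_tokens:
                entities.append((" ".join(current_tokens), current_label))
            current_tokens, current_label = [token], label
        elif prefix == "I":
            # Continuation of the current entity.
            current_tokens.append(token)
        else:
            # "O" tag: close any open entity.
            if current_tokens:
                entities.append((" ".join(current_tokens), current_label))
            current_tokens, current_label = [], None

    if current_tokens:
        entities.append((" ".join(current_tokens), current_label))
    return entities


# Tokens and tags copied from the annotate() output above.
tokens = ["Benjamin", "is", "a", "good", "guy", "and", "live", "is", "New", "York"]
tags = ["B-PERSON", "O", "O", "O", "O", "O", "O", "O", "B-GPE", "I-GPE"]
print(group_entities(tokens, tags))
# [('Benjamin', 'PERSON'), ('New York', 'GPE')]
```

On a live result, the same function could be applied as `group_entities(result["checked"], result["ner"])`, since the NER model in this pipeline reads the spell-checked tokens.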

As seen above, we successfully created an NER pipeline, saved it to the local disk, and loaded it back.