# NER with BERT in Spark NLP

In [None]:
!python -V

## Installation

In [None]:
import os

# Install java
! apt-get update -qq
! apt-get install -y openjdk-8-jdk-headless -qq > /dev/null

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]
! java -version

# Install pyspark
! pip install --ignore-installed pyspark==2.4.4

# Install Spark NLP
! pip install --ignore-installed spark-nlp

## Import libraries and download datasets

In [None]:
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

import sparknlp
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

In [None]:
spark = sparknlp.start()
#spark = sparknlp.start(gpu=True)

In [None]:
print("Spark NLP version: ", sparknlp.version())
print("Apache Spark version: ", spark.version)

In [None]:
def start(gpu=False):
    builder = SparkSession.builder \
        .appName("Spark NLP") \
        .master("local[*]") \
        .config("spark.driver.memory", "8G") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")\
        .config("spark.kryoserializer.buffer.max", "1000M")
    if gpu:
        builder.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp-gpu_2.11:2.5.1")
    else:
        builder.config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.5.1")

    return builder.getOrCreate()

  
spark = start(gpu=False)

In [None]:
from urllib.request import urlretrieve

urlretrieve('https://github.com/JohnSnowLabs/spark-nlp/raw/master/src/test/resources/conll2003/eng.train',
           'eng.train')

urlretrieve('https://github.com/JohnSnowLabs/spark-nlp/raw/master/src/test/resources/conll2003/eng.testa',
           'eng.testa')


In [None]:
with open("eng.train") as f:
    c=f.read()

print (c[:500])

## Building NER pipeline

In [None]:
from sparknlp.training import CoNLL

training_data = CoNLL().readDataset(spark, './eng.train')
training_data.show(3)

In [None]:
training_data.count()

### Loading Bert

In Spark NLP, we have four pre-trained variants of BERT: bert_base_uncased , bert_base_cased , bert_large_uncased , bert_large_cased . Which one to use depends on your use case, train set, and the complexity of the task you are trying to model.

In the code snippet above, we basically load the bert_base_cased version from Spark NLP public resources and point thesentenceand token columns in   setInputCols(). In short, BertEmbeddings() annotator will take sentence and token columns and populate Bert embeddings in bert column. In general, each word is translated to a 768-dimensional vector. The parametersetPoolingLayer() can be set to 0 as the first layer and fastest, -1 as the last layer and -2 as the second-to-last-hidden layer.

As explained by the authors of official BERT paper, different BERT layers capture different information. The last layer is too closed to the target functions (i.e. masked language model and next sentence prediction) during pre-training, therefore it may be biased to those targets. If you want to use the last hidden layer anyway, please feel free to set pooling_layer=-1. Intuitively, pooling_layer=-1 is close to the training output, so it may be biased to the training targets. If you don't fine-tune the model, then this could lead to a bad representation. That said, it is a matter of trade-off between model accuracy and computational resources you have.

In [None]:
bert_annotator = BertEmbeddings.pretrained('bert_base_cased', 'en') \
 .setInputCols(["sentence",'token'])\
 .setOutputCol("bert")\
 .setCaseSensitive(False)\
 .setPoolingLayer(0)

In [None]:
# BertEmbeddings.load("local/path/")

In [None]:
from sparknlp.training import CoNLL

test_data = CoNLL().readDataset(spark, './eng.testa')

test_data = bert_annotator.transform(test_data)

test_data.show(3)

In [None]:
#test_data.limit(1000).write.parquet("test_withEmbeds.parquet")

In [None]:
test_data.select("bert.result","bert.embeddings",'label.result').show()

In [None]:
import numpy as np

emb_vector = np.array(test_data.select("bert.embeddings").take(1))

emb_vector

In [None]:
nerTagger = NerDLApproach()\
  .setInputCols(["sentence", "token", "bert"])\
  .setLabelColumn("label")\
  .setOutputCol("ner")\
  .setMaxEpochs(1)\
  .setLr(0.001)\
  .setPo(0.005)\
  .setBatchSize(8)\
  .setRandomSeed(0)\
  .setVerbose(1)\
  .setValidationSplit(0.2)\
  .setEvaluationLogExtended(True) \
  .setEnableOutputLogs(True)\
  .setIncludeConfidence(True)\
  .setTestDataset("test_withEmbeds.parquet")


pipeline = Pipeline(
    stages = [
    bert_annotator,
    nerTagger
  ])

You can also set learning rate ( setLr ), learning rate decay coefficient ( setPo ), setBatchSize and setDropout rate. Please see the official repo for the entire list.

In [None]:
%%time

ner_model = pipeline.fit(training_data.limit(1000))

In [None]:
ner_model

In [None]:
# on COLAB, it takes 30 min to train entire trainset (conll2003) for 10 epochs on GPU with layer = 0

In [None]:
predictions = ner_model.transform(test_data)
predictions.show(3)

In [None]:
predictions.select('token.result','label.result','ner.result').show(truncate=40)

In [None]:
predictions.printSchema()

In [None]:
import pyspark.sql.functions as F

predictions.select(F.explode(F.arrays_zip('token.result','label.result','ner.result')).alias("cols")) \
.select(F.expr("cols['0']").alias("token"),
        F.expr("cols['1']").alias("ground_truth"),
        F.expr("cols['2']").alias("prediction")).show(truncate=False)

### Loading from local

In [None]:
# loading the one trained 10 epochs on GPU with entire train set

loaded_ner_model = NerDLModel.load("NER_bert_20200226")\
 .setInputCols(["sentence", "token", "bert"])\
 .setOutputCol("ner")

In [None]:
predictions_loaded = loaded_ner_model.transform(test_data)

predictions_loaded.select(F.explode(F.arrays_zip('token.result','label.result','ner.result')).alias("cols")) \
.select(F.expr("cols['0']").alias("token"),
        F.expr("cols['1']").alias("ground_truth"),
        F.expr("cols['2']").alias("prediction")).show(30, truncate=False)

In [None]:
import pandas as pd

df = predictions_loaded.select('token.result','label.result','ner.result').toPandas()

df

### Bert with poolingLayer -2

In [None]:
bert_annotator.setPoolingLayer(-2)

In [None]:
pipeline = Pipeline(
    stages = [
    bert_annotator,
    nerTagger
  ])

In [None]:
ner_model_v2 = pipeline.fit(training_data.limit(1000))

In [None]:
predictions_v2 = ner_model_v2.transform(test_data.limit(10))

predictions_v2.select(F.explode(F.arrays_zip('token.result','label.result','ner.result')).alias("cols")) \
.select(F.expr("cols['0']").alias("token"),
        F.expr("cols['1']").alias("ground_truth"),
        F.expr("cols['2']").alias("prediction")).show(truncate=False)

### with Glove Embeddings

In [None]:
glove = WordEmbeddingsModel().pretrained() \
 .setInputCols(["sentence",'token'])\
 .setOutputCol("glove")\
 .setCaseSensitive(False)

test_data = CoNLL().readDataset(spark, './eng.testa')

test_data = glove.transform(test_data.limit(1000))

test_data.write.parquet("test_withGloveEmbeds.parquet")


In [None]:
nerTagger.setInputCols(["sentence", "token", "glove"])
nerTagger.setTestDataset("test_withGloveEmbeds.parquet")

glove_pipeline = Pipeline(
    stages = [
    glove,
    nerTagger
  ])

In [None]:
%%time

ner_model_v3 = glove_pipeline.fit(training_data.limit(1000))

In [None]:
predictions_v3 = ner_model_v3.transform(test_data.limit(10))

# test_data.sample(False,0.1,0)

predictions_v3.select(F.explode(F.arrays_zip('token.result','label.result','ner.result')).alias("cols")) \
.select(F.expr("cols['0']").alias("token"),
        F.expr("cols['1']").alias("ground_truth"),
        F.expr("cols['2']").alias("prediction")).show(truncate=False)

In [None]:
np.array (predictions.select('token.result').take(1))[0][0]

In [None]:
import pandas as pd

tokens = np.array (predictions.select('token.result').take(1))[0][0]
ground = np.array (predictions.select('label.result').take(1))[0][0]
label_bert_0 = np.array (predictions.select('ner.result').take(1))[0][0]
label_bert_2 = np.array (predictions_v2.select('ner.result').take(1))[0][0]
label_glove = np.array (predictions_v3.select('ner.result').take(1))[0][0]

pd.DataFrame({'token':tokens,
              'ground':ground,
              'label_bert_0':label_bert_0,
              'label_bert_2':label_bert_2,
              'label_glove':label_glove})

### Saving the trained model

In [None]:
ner_model_v3.stages

In [None]:
ner_model_v3.stages[1].write().overwrite().save('NER_bert_20200219')

## Prediction Pipeline

In [None]:
document = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

sentence = SentenceDetector()\
    .setInputCols(['document'])\
    .setOutputCol('sentence')

token = Tokenizer()\
    .setInputCols(['sentence'])\
    .setOutputCol('token')

bert = BertEmbeddings.pretrained('bert_base_cased', 'en') \
 .setInputCols(["sentence",'token'])\
 .setOutputCol("bert")\
 .setCaseSensitive(False)

loaded_ner_model = NerDLModel.load("NER_bert_20200219")\
 .setInputCols(["sentence", "token", "bert"])\
 .setOutputCol("ner")

converter = NerConverter()\
  .setInputCols(["document", "token", "ner"])\
  .setOutputCol("ner_span")

ner_prediction_pipeline = Pipeline(
    stages = [
        document,
        sentence,
        token,
        bert,
        loaded_ner_model,
        converter])

In [None]:
empty_data = spark.createDataFrame([['']]).toDF("text")

empty_data.show()

In [None]:
prediction_model = ner_prediction_pipeline.fit(empty_data)


In [None]:
text = "Peter Parker is a nice guy and lives in New York."
sample_data = spark.createDataFrame([[text]]).toDF("text")
sample_data.show()

In [None]:

preds = prediction_model.transform(sample_data)

preds.show()

In [None]:
preds.select('ner_span.result').take(1)

In [None]:

preds.select(F.explode(F.arrays_zip("ner_span.result","ner_span.metadata")).alias("entities")) \
.select(F.expr("entities['0']").alias("chunk"),
        F.expr("entities['1'].entity").alias("entity")).show(truncate=False)

In [None]:
document = DocumentAssembler()\
    .setInputCol("text")\
    .setOutputCol("document")

sentence = SentenceDetector()\
    .setInputCols(['document'])\
    .setOutputCol('sentence')

token = Tokenizer()\
    .setInputCols(['sentence'])\
    .setOutputCol('token')

loaded_ner_model = NerDLModel.load("NER_bert_20200219")\
 .setInputCols(["sentence", "token", "glove"])\
 .setOutputCol("ner")

converter = NerConverter()\
  .setInputCols(["document", "token", "ner"])\
  .setOutputCol("ner_span")

glove_ner_prediction_pipeline = Pipeline(
    stages = [
        document,
        sentence,
        token,
        glove,
        loaded_ner_model,
        converter])

In [None]:
glove_prediction_model = glove_ner_prediction_pipeline.fit(empty_data)

In [None]:

preds = glove_prediction_model.transform(sample_data)

preds.show()

In [None]:

preds.select(F.explode(F.arrays_zip("ner_span.result","ner_span.metadata")).alias("entities")) \
.select(F.expr("entities['0']").alias("chunk"),
        F.expr("entities['1'].entity").alias("entity")).show(truncate=False)

### Pretrained Pipelines

In [None]:
from sparknlp.pretrained import PretrainedPipeline

pretrained_pipeline = PretrainedPipeline('recognize_entities_dl', lang='en')

#onto_recognize_entities_sm
#explain_document_dl

In [None]:
text = "The Mona Lisa is a 16th century oil painting created by Leonardo. It's held at the Louvre in Paris."

result = pretrained_pipeline.annotate(text)

list(zip(result['token'], result['ner']))

In [None]:
pretrained_pipeline2 = PretrainedPipeline('explain_document_dl', lang='en')


In [None]:
text = "The Mona Lisa is a 16th centry oil painting created by Leonrdo. It's held at the Louvre in Paris."

result2 = pretrained_pipeline2.annotate(text)

result2
list(zip(result2['token'],  result2['checked'], result2['pos'], result2['ner'],  result2['lemma'],  result2['stem']))

In [None]:
xx= pretrained_pipeline2.fullAnnotate(text)

[(n.result, n.metadata['entity']) for n in xx['ner_span']]

## Using your own custom Word Embedding

In [None]:
custom_embeddings = WordEmbeddings()\
  .setInputCols(["sentence", "token"])\
  .setOutputCol("glove")\
  .setStoragePath('/Users/vkocaman/cache_pretrained/PubMed-shuffle-win-2.bin', "BINARY")\
.setDimension(200)

In [None]:
custom_embeddings.fit(training_data.limit(10)).transform(training_data.limit(10)).show()

## creating your own CoNLL dataset

In [None]:
import json
import os
from pyspark.ml import Pipeline
from sparknlp.base import *
from sparknlp.annotator import *
import sparknlp

spark = sparknlp.start()

def get_ann_pipeline ():
    
    document_assembler = DocumentAssembler() \
        .setInputCol("text")\
        .setOutputCol('document')

    sentence = SentenceDetector()\
        .setInputCols(['document'])\
        .setOutputCol('sentence')\
        .setCustomBounds(['\n'])

    tokenizer = Tokenizer() \
        .setInputCols(["sentence"]) \
        .setOutputCol("token")

    pos = PerceptronModel.pretrained() \
              .setInputCols(["sentence", "token"]) \
              .setOutputCol("pos")
    
    embeddings = WordEmbeddingsModel.pretrained()\
          .setInputCols(["sentence", "token"])\
          .setOutputCol("embeddings")

    ner_model = NerDLModel.pretrained() \
          .setInputCols(["sentence", "token", "embeddings"]) \
          .setOutputCol("ner")

    ner_converter = NerConverter()\
      .setInputCols(["sentence", "token", "ner"])\
      .setOutputCol("ner_chunk")

    ner_pipeline = Pipeline(
        stages = [
            document_assembler,
            sentence,
            tokenizer,
            pos,
            embeddings,
            ner_model,
            ner_converter
        ]
    )

    empty_data = spark.createDataFrame([[""]]).toDF("text")

    ner_pipelineFit = ner_pipeline.fit(empty_data)

    ner_lp_pipeline = LightPipeline(ner_pipelineFit)

    print ("Spark NLP NER lightpipeline is created")

    return ner_lp_pipeline


In [None]:
conll_pipeline = get_ann_pipeline ()

In [None]:
parsed = conll_pipeline.annotate ("Peter Parker is a nice guy and lives in New York.")
parsed

In [None]:
conll_lines=''

for token, pos, ner in zip(parsed['token'],parsed['pos'],parsed['ner']):

    conll_lines += "{} {} {} {}\n".format(token, pos, pos, ner)


print(conll_lines)