Import the necessary modules

In [1]:
import os
import sys
sys.path.append('../../')

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

import time
import zipfile

Initialize Spark, if not already in a pyspark environment

In [2]:
# Describe the session first, then create it (or reuse an already running one).
# "local[1]" runs Spark in-process on the driver with a single worker thread.
spark_builder = (
    SparkSession.builder
    .appName("ner")
    .master("local[1]")
    .config("spark.driver.memory", "6G")
    .config("spark.driver.maxResultSize", "2G")
    # Pulls the Spark NLP package onto the classpath when the session starts.
    .config("spark.jars.packages", "JohnSnowLabs:spark-nlp:1.6.2")
    .config("spark.kryoserializer.buffer.max", "500m")
)
spark = spark_builder.getOrCreate()

In [3]:
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

Download CoNLL 2003 data if not present

In [4]:
# Download CoNLL 2003 Dataset
import os
from pathlib import Path
import urllib.request

# https://github.com/patverga/torch-ner-nlp-from-scratch/tree/master/data/conll2003
# The NER training cell reads eng.train, eng.testa and eng.testb from the
# working directory, so fetch every split that is not already there.
conll_base_url = "https://github.com/patverga/torch-ner-nlp-from-scratch/raw/master/data/conll2003/"
for conll_file in ("eng.train", "eng.testa", "eng.testb"):
    if not Path(conll_file).is_file():
        url = conll_base_url + conll_file
        urllib.request.urlretrieve(url, conll_file)


Download the GloVe embeddings and unzip them, if not already present

In [5]:
# Download Glove Word Embeddings
file = "../../data/glove.6B.zip"
glove_dir = os.path.dirname(file)
glove_txt = os.path.join(glove_dir, "glove.6B.100d.txt")

# urlretrieve writes straight to the target path and fails when the parent
# directory is missing, so make sure it exists before doing anything else.
os.makedirs(glove_dir, exist_ok=True)

# Only the extracted 100d vectors are needed below; skip both the download and
# the extraction when they are already on disk.
if not Path(glove_txt).is_file():
    if not Path(file).is_file():
        url = "http://nlp.stanford.edu/data/glove.6B.zip"
        print("Start downoading Glove Word Embeddings. It will take some time, please wait...")
        # Download under a temporary name and rename on success, so that an
        # interrupted transfer is not mistaken for a complete archive later.
        partial_file = file + ".part"
        urllib.request.urlretrieve(url, partial_file)
        os.replace(partial_file, file)
        print("Downloading finished")

    # The context manager closes the archive even if extraction raises.
    with zipfile.ZipFile(file, 'r') as zip_ref:
        zip_ref.extractall(glove_dir)

Use a different file URI prefix depending on whether we are running on Windows or Linux

In [6]:
# Prefix placed in front of absolute local paths when they are handed to Spark.
# Windows gets three slashes, every other platform gets two.
if os.name == 'nt':
    fProtocol = 'file:///'
else:
    fProtocol = 'file://'

Create the annotator components with the appropriate parameters, in the right order. The finisher will output only the NER spans. Finally, put everything into a Pipeline.

In [7]:
def _local_uri(*parts):
    """Return a file URI for a path given relative to the working directory.

    The parts are joined onto os.getcwd() and normalised with os.path.abspath,
    so any ".." components are resolved here instead of being left for the
    consumer to interpret. Platform separators are converted to forward
    slashes after normalisation, then the fProtocol prefix is prepended.
    """
    absolute = os.path.abspath(os.path.join(os.getcwd(), *parts))
    return fProtocol + absolute.replace(os.sep, "/")


documentAssembler = DocumentAssembler()\
  .setInputCol("text")\
  .setOutputCol("document")

sentenceDetector = SentenceDetector()\
  .setInputCols(["document"])\
  .setOutputCol("sentence")

# NOTE(review): tokens are produced from the whole "document" column rather
# than from "sentence" - confirm this is intended.
tokenizer = Tokenizer()\
  .setInputCols(["document"])\
  .setOutputCol("token")

# The CoNLL splits are read from the working directory; the GloVe vectors live
# in the data folder two levels up (where the download cell extracts them).
# NOTE(review): 100 matches the 100d GloVe file; the trailing 2 presumably
# selects the plain-text embeddings format - confirm against the Spark NLP docs.
nerTagger = NerDLApproach()\
  .setInputCols(["sentence", "token"])\
  .setLabelColumn("label")\
  .setOutputCol("ner")\
  .setMaxEpochs(10)\
  .setExternalDataset(_local_uri("eng.train"))\
  .setValidationDataset(_local_uri("eng.testa"))\
  .setTestDataset(_local_uri("eng.testb"))\
  .setEmbeddingsSource(_local_uri("..", "..", "data", "glove.6B.100d.txt"), 100, 2)\
  .setRandomSeed(0)\
  .setVerbose(2)

converter = NerConverter()\
  .setInputCols(["document", "token", "ner"])\
  .setOutputCol("ner_span")

# Only the converted NER spans are passed to the finisher.
finisher = Finisher() \
    .setInputCols(["ner_span"]) \
    .setIncludeKeys(True)

# Stage order matters: each stage consumes columns produced by earlier ones.
pipeline = Pipeline(
    stages = [
    documentAssembler,
    sentenceDetector,
    tokenizer,
    nerTagger,
    converter,
    finisher
  ])


Load parquet dataset and cache into memory

In [8]:
#Load the input data to be annotated

# Resolve the dataset (two levels above the working directory) to a normalised
# absolute path first, and only then switch to forward slashes: abspath would
# reintroduce backslashes on Windows if the replacement were done beforehand.
path = os.path.abspath(os.path.join(os.getcwd(), "..", "..", "data", "sentiment.parquet"))
path = path.replace(os.sep, "/")
print(path)

data = spark. \
        read. \
        parquet(fProtocol + path). \
        limit(1000)
data.cache()
# count() is an action, so it forces the 1000 cached rows to be materialised.
data.count()
data.show()

/home/jovyan/work/data/sentiment.parquet
+------+---------+--------------------+
|itemid|sentiment|                text|
+------+---------+--------------------+
|     1|        0|                 ...|
|     2|        0|                 ...|
|     3|        1|              omg...|
|     4|        0|          .. Omga...|
|     5|        0|         i think ...|
|     6|        0|         or i jus...|
|     7|        1|       Juuuuuuuuu...|
|     8|        0|       Sunny Agai...|
|     9|        1|      handed in m...|
|    10|        1|      hmmmm.... i...|
|    11|        0|      I must thin...|
|    12|        1|      thanks to a...|
|    13|        0|      this weeken...|
|    14|        0|     jb isnt show...|
|    15|        0|     ok thats it ...|
|    16|        0|    &lt;-------- ...|
|    17|        0|    awhhe man.......|
|    18|        1|    Feeling stran...|
|    19|        0|    HUGE roll of ...|
|    20|        0|    I just cut my...|
+------+---------+--------------------+

Train the pipeline

In [9]:
# Fit every stage of the pipeline on the cached rows and report how many
# wall-clock seconds the fit took.
start = time.time()
print("Start fitting")
model = pipeline.fit(data)
print("Fitting is ended")
elapsed_seconds = time.time() - start
print(elapsed_seconds)

Start fitting
Fitting is ended
3853.542244195938


Let's predict with the model

In [10]:
# Annotate the same rows with the fitted pipeline and preview the result.
predictions = model.transform(data)
predictions.show()

+------+---------+--------------------+--------------------+
|itemid|sentiment|                text|   finished_ner_span|
+------+---------+--------------------+--------------------+
|     1|        0|                 ...|entity->ORG#resul...|
|     2|        0|                 ...|entity->LOC#resul...|
|     3|        1|              omg...|                    |
|     4|        0|          .. Omga...|entity->PER#resul...|
|     5|        0|         i think ...|                    |
|     6|        0|         or i jus...|                    |
|     7|        1|       Juuuuuuuuu...|entity->PER#resul...|
|     8|        0|       Sunny Agai...|entity->ORG#resul...|
|     9|        1|      handed in m...|                    |
|    10|        1|      hmmmm.... i...|                    |
|    11|        0|      I must thin...|                    |
|    12|        1|      thanks to a...|                    |
|    13|        0|      this weeken...|                    |
|    14|        0|     j

Once trained, save both the pipeline definition and the fitted model to disk

In [11]:
# Persist the unfitted pipeline definition first, then the fitted model,
# replacing whatever a previous run left at the same location.
for writable, target_dir in ((pipeline, "./ner_dl_pipeline"), (model, "./ner_dl_model")):
    writable.write().overwrite().save(target_dir)

Load both again, deserializing them from disk

In [12]:
from pyspark.ml import PipelineModel, Pipeline

# Bind both reloaded objects to names so that each remains usable afterwards:
# samePipeline is the unfitted pipeline definition, sameModel the fitted model.
samePipeline = Pipeline.read().load("./ner_dl_pipeline")
sameModel = PipelineModel.read().load("./ner_dl_model")