In [1]:
import sys
# Extend the module search path with the directory two levels up before the
# sparknlp imports below run -- presumably that is where the local sparknlp
# Python package lives (TODO confirm against the repository layout).
# NOTE(review): the output saved with this cell is
# "ImportError: No module named 'sparknlp'", i.e. the import did not resolve
# in the environment where the notebook was last executed.
sys.path.append('../../')

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

# Wildcard imports: the annotator/transformer classes used by later cells
# (DocumentAssembler, NerCrfApproach, Finisher, ...) are expected to come from
# these three modules; which module provides which name is not visible here.
from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

ImportError: No module named 'sparknlp'

In [2]:
# Download the CoNLL 2003 English training split unless a local copy exists.
import os
from pathlib import Path
import urllib.request

if not Path("eng.train").is_file():
    url = "https://github.com/patverga/torch-ner-nlp-from-scratch/raw/master/data/conll2003/eng.train"
    # Download under a temporary name and only move it to its final name once
    # the transfer has finished. Writing straight to 'eng.train' would leave a
    # truncated file behind if the download is interrupted, and the is_file()
    # guard above would then treat that partial file as a complete dataset.
    partial_download = 'eng.train.part'
    try:
        urllib.request.urlretrieve(url, partial_download)
        os.replace(partial_download, 'eng.train')
    finally:
        # After a successful os.replace the temporary file no longer exists;
        # this only cleans up the leftovers of a failed download.
        if os.path.exists(partial_download):
            os.remove(partial_download)


In [3]:
# Create (or reuse) the Spark session for this notebook: application name
# "ner", local master restricted to one worker thread.
spark = (
    SparkSession.builder
    .appName("ner")
    .master("local[1]")
    .config("spark.driver.memory", "4G")
    .config("spark.driver.maxResultSize", "2G")
    # The Spark property that lists extra jars is "spark.jars" (plural).
    # The previous key "spark.jar" is not a Spark setting, so the Spark NLP
    # jar was never registered with the session.
    .config("spark.jars", "lib/sparknlp.jar")
    .config("spark.kryoserializer.buffer.max", "500m")
    .getOrCreate()
)

1. Download CoNLL2003 dataset
2. Save the 3 files eng.train, eng.testa and eng.testb into the working directory ./

In [4]:
import time

# Stage 1: reads the raw "text" column and writes the "document" column.
documentAssembler = (
    DocumentAssembler()
    .setInputCol("text")
    .setOutputCol("document")
)

# Stage 2: consumes "document" and writes "sentence".
sentenceDetector = (
    SentenceDetectorModel()
    .setInputCols(["document"])
    .setOutputCol("sentence")
)

# Stage 3: the tokenizer is wired to "document" (not "sentence") and writes "token".
tokenizer = (
    RegexTokenizer()
    .setInputCols(["document"])
    .setOutputCol("token")
)

# Stage 4: part-of-speech tagger, trained during fit() from the corpus under
# anc-pos-corpus/ with 5 iterations; writes "pos".
posTagger = (
    PerceptronApproach()
    .setCorpusPath("anc-pos-corpus/")
    .setIterations(5)
    .setInputCols(["token", "document"])
    .setOutputCol("pos")
)

# Stage 5: CRF-based NER tagger. It is pointed at the eng.train file in the
# working directory, runs between 1 and 10 epochs, and writes "ner".
nerTagger = (
    NerCrfApproach()
    .setInputCols(["sentence", "token", "pos"])
    .setLabelColumn("label")
    .setOutputCol("ner")
    .setMinEpochs(1)
    .setMaxEpochs(10)
    .setLossEps(1e-3)
    .setDicts(["ner-corpus/dict.txt"])
    .setDatasetPath("eng.train")
    .setL2(1)
    .setC0(1250000)
    .setRandomSeed(0)
    .setVerbose(2)
)

# Stage 6: post-processes the "ner" column, keeping keys in the output.
finisher = (
    Finisher()
    .setInputCols(["ner"])
    .setIncludeKeys(True)
)

# Assemble the stages in the order they must run.
pipeline = Pipeline(stages=[
    documentAssembler,
    sentenceDetector,
    tokenizer,
    posTagger,
    nerTagger,
    finisher,
])


In [5]:
# Read the sample parquet file to be annotated, capped at 1000 rows.
data = (
    spark.read
    .parquet("../../../src/test/resources/sentiment.parquet")
    .limit(1000)
)
# Mark the frame for caching, run a count over it, then print a preview.
data.cache()
data.count()
data.show()

+------+---------+--------------------+
|itemid|sentiment|                text|
+------+---------+--------------------+
|     1|        0|                 ...|
|     2|        0|                 ...|
|     3|        1|              omg...|
|     4|        0|          .. Omga...|
|     5|        0|         i think ...|
|     6|        0|         or i jus...|
|     7|        1|       Juuuuuuuuu...|
|     8|        0|       Sunny Agai...|
|     9|        1|      handed in m...|
|    10|        1|      hmmmm.... i...|
|    11|        0|      I must thin...|
|    12|        1|      thanks to a...|
|    13|        0|      this weeken...|
|    14|        0|     jb isnt show...|
|    15|        0|     ok thats it ...|
|    16|        0|    &lt;-------- ...|
|    17|        0|    awhhe man.......|
|    18|        1|    Feeling stran...|
|    19|        0|    HUGE roll of ...|
|    20|        0|    I just cut my...|
+------+---------+--------------------+
only showing top 20 rows



In [6]:
# Fit the whole pipeline on the loaded data. The two prints bracket the call
# so the notebook output shows when fitting started and when it finished.
print("Start fitting")
model = pipeline.fit(data)  # fitted pipeline, reused by the cells below
print("Fitting is ended")

Start fitting
Fitting is ended


In [7]:
# Apply the fitted pipeline to the same frame it was fitted on and print a
# preview; the saved output shows the original columns plus "finished_ner".
ner_data = model.transform(data)
ner_data.show()

+------+---------+--------------------+--------------------+
|itemid|sentiment|                text|        finished_ner|
+------+---------+--------------------+--------------------+
|     1|        0|                 ...|word->is#result->...|
|     2|        0|                 ...|word->I#result->O...|
|     3|        1|              omg...|word->omg#result-...|
|     4|        0|          .. Omga...|word->..#result->...|
|     5|        0|         i think ...|word->i#result->O...|
|     6|        0|         or i jus...|word->or#result->...|
|     7|        1|       Juuuuuuuuu...|word->Juuuuuuuuuu...|
|     8|        0|       Sunny Agai...|word->Sunny#resul...|
|     9|        1|      handed in m...|word->handed#resu...|
|    10|        1|      hmmmm.... i...|word->i#result->O...|
|    11|        0|      I must thin...|word->I#result->O...|
|    12|        1|      thanks to a...|word->thanks#resu...|
|    13|        0|      this weeken...|word->this#result...|
|    14|        0|     j

In [8]:
# Persist both the pipeline definition and the fitted model, replacing
# whatever a previous run left at the same location.
for _artifact, _target in ((pipeline, "./ner_pipeline"), (model, "./ner_model")):
    _artifact.write().overwrite().save(_target)

In [9]:
from pyspark.ml import Pipeline, PipelineModel

# Round-trip check of the artifacts saved above: the pipeline definition is
# loaded only to confirm that it can be read back (the result is discarded),
# while the fitted model is kept as `sameModel`.
Pipeline.load("./ner_pipeline")
sameModel = PipelineModel.load("./ner_model")