In [None]:
import sys
sys.path.append('../../')

from pyspark.sql import SparkSession
from pyspark.ml import Pipeline

from sparknlp.annotator import *
from sparknlp.common import *
from sparknlp.base import *

In [None]:
# Local, single-threaded Spark session used by every cell below.
# Both configuration keys were previously misspelled ("spark.jar" and
# "spark.dirver.maxResultSize"); Spark ignores unknown keys silently, so the
# jar and the result-size limit were never actually applied.
spark = SparkSession.builder \
    .master("local[1]") \
    .config("spark.jars", "lib/sparknlp.jar") \
    .config("spark.driver.memory", "8g")\
    .config("spark.driver.maxResultSize", "2g")\
    .getOrCreate()

1. Download the CoNLL-2003 dataset.
2. Save the three files `eng.train`, `eng.testa`, and `eng.testb` into the working directory `./`.

In [None]:
from pyspark.sql.types import *

class Annotation:
    """Simple record describing one annotated span.

    The four constructor arguments are stored unchanged as instance
    attributes of the same name: ``annotatorType``, ``begin``, ``end`` and
    ``metadata``.
    """

    def __init__(self, annotatorType, begin, end, metadata):
        # What kind of annotation this is, plus its key/value details.
        self.annotatorType, self.metadata = annotatorType, metadata
        # Span boundaries exactly as supplied by the caller.
        self.begin, self.end = begin, end

        
# Spark SQL struct with one field per Annotation attribute.
# "begin" and "end" are declared non-nullable; the other two fields keep the
# default nullability.
annotation_schema = StructType(fields=[
    StructField("annotatorType", StringType(), nullable=True),
    StructField("begin", IntegerType(), nullable=False),
    StructField("end", IntegerType(), nullable=False),
    StructField(
        "metadata",
        MapType(keyType=StringType(), valueType=StringType()),
        nullable=True,
    ),
])
    


def readDataset(file, spark, doc_column = "text", label_column = "label"):
    """Load a CoNLL-2003 style file into a Spark DataFrame of documents.

    Every data line is split on single spaces; the first column is the token
    and the fourth column is the NER tag. A line whose first column is
    "-DOCSTART-" starts a new document, and a line with a single column
    (normally an empty line) is recorded as a newline in the document text.

    Args:
        file: Path of the file to read.
        spark: Object providing ``createDataFrame`` (the SparkSession in this
            notebook).
        doc_column: Name of the output column holding the document text.
        label_column: Name of the output column holding the annotations.

    Returns:
        A DataFrame with one row per document that contains at least one
        token: the rebuilt text and an array of "named_entity" annotations.
        ``begin``/``end`` are the inclusive character offsets of each token
        in the rebuilt text and ``metadata["tag"]`` is the NER tag.

    Raises:
        IndexError: If a data line has more than one but fewer than four
            columns.
    """
    result = []
    doc = ""
    labels = []

    with open(file) as f:
        for line in f:
            # Remove the line terminator (and trailing blanks) before
            # splitting; otherwise the last column - the NER tag - is stored
            # with a trailing "\n".
            items = line.rstrip().split(' ')
            word = items[0]
            if word == "-DOCSTART-":
                # Flush the previous document. Nothing has been collected
                # when the marker is the first line of the file, so skip the
                # flush instead of emitting an empty document.
                if labels:
                    result.append((doc, labels))
                doc = ""
                labels = []
            elif len(items) == 1:
                # Sentence boundary.
                doc = doc + "\n"
            else:
                # Separate the token from whatever text precedes it.
                if doc:
                    doc = doc + " "

                begin = len(doc)
                doc = doc + word
                end = len(doc) - 1  # inclusive offset of the last character
                ner = items[3]
                labels.append(Annotation("named_entity", begin, end, {"tag": ner}))

    # The last document is not followed by another "-DOCSTART-" marker.
    if labels:
        result.append((doc, labels))

    schema = StructType([
        StructField(doc_column, StringType()),
        StructField(label_column, ArrayType(annotation_schema))
    ])

    return spark.createDataFrame(result, schema = schema)

In [None]:
import time

def train_model(file, spark):
    """Read the dataset in `file` and fit the NER pipeline on it.

    Progress messages and the time spent reading the dataset are printed.

    Args:
        file: Path of the training file, passed to readDataset.
        spark: Spark session, passed to readDataset.

    Returns:
        The result of ``Pipeline.fit`` on the loaded dataset.
    """
    print("Dataset Reading")

    read_started = time.time()
    training_data = readDataset(file, spark)
    print("Done, {}\n".format(time.time() - read_started))

    print("Start fitting")
    crf_settings = CrfParams(
        minEpochs=10,
        l2=1.0,
        verbose=2,
        randomSeed=0,
        lossEps=1e-3,
        c0=2250000,
    )

    # Stage 1: raw "text" column -> "document".
    document_stage = (
        DocumentAssembler()
        .setInputCol("text")
        .setOutputCol("document")
    )

    # Stage 2: "document" -> "sentence".
    sentence_stage = (
        SentenceDetectorModel()
        .setInputCols(["document"])
        .setOutputCol("sentence")
    )

    # Stage 3: "document" -> "token" (reads the document, not the sentences).
    token_stage = (
        RegexTokenizer()
        .setInputCols(["document"])
        .setOutputCol("token")
    )

    # Stage 4: part-of-speech tagger trained from the corpus path below.
    pos_stage = (
        PerceptronApproach()
        .setCorpusPath("../../../src/test/resources/anc-pos-corpus/")
        .setIterations(5)
        .setInputCols(["token", "document"])
        .setOutputCol("pos")
    )

    # Stage 5: CRF tagger fitted against the "label" column.
    ner_stage = (
        CrfBasedNer()
        .setCrfParams(crf_settings)
        .setInputCols(["sentence", "token", "pos"])
        .setLabelColumn("label")
        .setOutputCol("ner")
    )

    stages = [document_stage, sentence_stage, token_stage, pos_stage, ner_stage]
    return Pipeline().setStages(stages).fit(training_data)

In [None]:
from pyspark.sql.functions import col, udf, explode


def get_dataset_for_analysis(file, model, spark):
    """Pair every predicted annotation with the label at the same position.

    Reads `file` with readDataset, runs `model.transform` on it and zips the
    "ner" and "label" array columns element by element.

    Args:
        file: Path of the dataset file, passed to readDataset.
        model: Object whose ``transform`` adds a "ner" column to the dataset.
        spark: Spark session, passed to readDataset.

    Returns:
        A DataFrame with one row per pair and two struct columns,
        "predicted" and "label".
    """
    print("Dataset Reading")

    read_started = time.time()
    labelled = readDataset(file, spark)
    print("Done, {}\n".format(time.time() - read_started))

    with_predictions = model.transform(labelled)

    pair_type = StructType([
        StructField("predicted", annotation_schema),
        StructField("label", annotation_schema)
    ])

    def _pair_up(predicted_items, label_items):
        # Positional pairing; zip stops at the shorter of the two lists.
        return list(zip(predicted_items, label_items))

    zip_annotations = udf(_pair_up, ArrayType(pair_type))

    paired = with_predictions.withColumn("result", zip_annotations("ner", "label"))
    exploded = paired.select(explode("result").alias("result"))
    return exploded.select(
        col("result.predicted").alias("predicted"),
        col("result.label").alias("label")
    )
        
def printStat(label, correct, predicted, predictedCorrect):
    """Print one tab-separated row: label, precision, recall and F1.

    Args:
        label: Text printed in the first column.
        correct: Number of items that truly carry the label.
        predicted: Number of items predicted with the label.
        predictedCorrect: Number of items predicted with the label correctly.

    Any ratio whose denominator is not positive is reported as 0.
    """
    def _ratio(numerator, denominator):
        # Guarded division shared by all three metrics.
        if denominator > 0:
            return numerator / denominator
        return 0

    precision = _ratio(predictedCorrect, predicted)
    recall = _ratio(predictedCorrect, correct)
    f_score = _ratio(2 * precision * recall, precision + recall)

    print("{}\t{}\t{}\t{}".format(label, precision, recall, f_score))
        

def test_dataset(file, model, spark):
    """Print precision, recall and F1 of `model` on the dataset in `file`.

    The (predicted, label) pairs from get_dataset_for_analysis are collected
    to the driver and counted per tag. The 'O' tag is excluded from both the
    "Total" row and the per-label rows. Per-label rows are printed in sorted
    label order so repeated runs produce the same report.

    Args:
        file: Path of the dataset file to evaluate on.
        model: Fitted pipeline model used for prediction.
        spark: Spark session used to load the dataset.

    Returns:
        None. The elapsed time and the metrics table are printed.
    """
    started = time.time()

    df = get_dataset_for_analysis(file, model, spark)
    # strip() guards against tags that carry surrounding whitespace, such as
    # a trailing newline left over from the source file.
    pairs = [
        (
            r["predicted"]["metadata"]["tag"].strip(),
            r["label"]["metadata"]["tag"].strip()
        ) for r in df.collect()]

    correct = {}
    predicted = {}
    predictedCorrect = {}

    for (lPredicted, lCorrect) in pairs:
        correct[lCorrect] = correct.get(lCorrect, 0) + 1
        predicted[lPredicted] = predicted.get(lPredicted, 0) + 1

        if lCorrect == lPredicted:
            predictedCorrect[lPredicted] = predictedCorrect.get(lPredicted, 0) + 1

    print("time: {}".format(time.time() - started))

    # 'O' is left out of every count that feeds the report.
    correct = {key: count for key, count in correct.items() if key != 'O'}
    predicted = {key: count for key, count in predicted.items() if key != 'O'}
    predictedCorrect = {key: count for key, count in predictedCorrect.items() if key != 'O'}

    # Sorted so the row order does not depend on set iteration order.
    labels = sorted(set(correct) | set(predicted))

    print("label\tprec\trec\tf1")
    totalCorrect = sum(correct.values())
    totalPredicted = sum(predicted.values())
    totalPredictedCorrect = sum(predictedCorrect.values())

    printStat("Total", totalCorrect, totalPredicted, totalPredictedCorrect)

    for label in labels:
        printStat(label, correct.get(label, 0), predicted.get(label, 0), predictedCorrect.get(label, 0))
    

In [None]:
import os.path

# Directory holding the three dataset files.
folder = '.'

# Training, validation and test splits, all resolved against `folder`.
train_file, test_file_a, test_file_b = (
    os.path.join(folder, file_name)
    for file_name in ("eng.train", "eng.testa", "eng.testb")
)

# Fit the full pipeline on the training split.
model = train_model(train_file, spark)

In [None]:
# Report quality on each split in turn: training, validation, test.
for _heading, _split_file in [
    ("\nQuality on training data", train_file),
    ("\n\nQuality on validation data", test_file_a),
    ("\n\nQuality on test data", test_file_b),
]:
    print(_heading)
    test_dataset(_split_file, model, spark)

In [None]:
# Inspect the (predicted, label) pairs for the validation split.
# Uses the path built from `folder` instead of a hard-coded file name, so
# this cell keeps working when the dataset directory is changed.
df = get_dataset_for_analysis(test_file_a, model, spark)
df.show()