From 0d56475bf1df1cf24fc04f90e76c4f2739bf5134 Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Wed, 24 Jan 2018 14:24:22 -0300 Subject: [PATCH 1/3] - Iterators in RH --- ...lWithWordEmbeddings.scala => HasWordEmbeddings.scala} | 9 ++++----- .../nlp/annotators/ner/crf/NerCrfApproach.scala | 6 +++--- .../nlp/annotators/ner/crf/NerCrfModel.scala | 6 +++--- ...Embeddings.scala => ApproachWithWordEmbeddings.scala} | 4 ++-- .../johnsnowlabs/nlp/embeddings/EmbeddingsReadable.scala | 4 ++-- .../com/johnsnowlabs/nlp/util/io/ResourceHelper.scala | 2 +- 6 files changed, 15 insertions(+), 16 deletions(-) rename src/main/scala/com/johnsnowlabs/nlp/{embeddings/ModelWithWordEmbeddings.scala => HasWordEmbeddings.scala} (93%) rename src/main/scala/com/johnsnowlabs/nlp/embeddings/{AnnotatorWithWordEmbeddings.scala => ApproachWithWordEmbeddings.scala} (95%) diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/HasWordEmbeddings.scala similarity index 93% rename from src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala rename to src/main/scala/com/johnsnowlabs/nlp/HasWordEmbeddings.scala index 4319c5899e0a7a..98b5482de2699b 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/HasWordEmbeddings.scala @@ -1,14 +1,14 @@ -package com.johnsnowlabs.nlp.embeddings +package com.johnsnowlabs.nlp import java.io.File import java.nio.file.{Files, Paths} -import com.johnsnowlabs.nlp.AnnotatorModel +import com.johnsnowlabs.nlp.embeddings.{WordEmbeddings, WordEmbeddingsClusterHelper} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.ivy.util.FileUtil -import org.apache.spark.{SparkContext, SparkFiles} import org.apache.spark.ml.param.{IntParam, Param} import org.apache.spark.sql.SparkSession +import org.apache.spark.{SparkContext, SparkFiles} /** @@ -17,8 +17,7 @@ import org.apache.spark.sql.SparkSession * * Corresponding Approach have to implement AnnotatorWithWordEmbeddings */ -abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] - extends AnnotatorModel[M] with AutoCloseable { +trait HasWordEmbeddings extends AutoCloseable with ParamsAndFeaturesWritable { val nDims = new IntParam(this, "nDims", "Number of embedding dimensions") val indexPath = new Param[String](this, "indexPath", "File that stores Index") diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala index 453fc0cfd47396..72584b03ebb3ac 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala @@ -1,7 +1,7 @@ package com.johnsnowlabs.nlp.annotators.ner.crf import com.johnsnowlabs.ml.crf.{CrfParams, LinearChainCrf, TextSentenceLabels, Verbose} -import com.johnsnowlabs.nlp.{AnnotatorApproach, AnnotatorType, DocumentAssembler} +import com.johnsnowlabs.nlp.{AnnotatorType, DocumentAssembler} import com.johnsnowlabs.nlp.AnnotatorType.{DOCUMENT, NAMED_ENTITY, POS, TOKEN} import com.johnsnowlabs.nlp.annotators.RegexTokenizer import com.johnsnowlabs.nlp.annotators.common.Annotated.PosTaggedSentence @@ -9,7 +9,7 @@ import com.johnsnowlabs.nlp.annotators.common.NerTagged import com.johnsnowlabs.nlp.annotators.pos.perceptron.PerceptronApproach import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetectorModel import 
com.johnsnowlabs.nlp.datasets.CoNLL -import com.johnsnowlabs.nlp.embeddings.AnnotatorWithWordEmbeddings +import com.johnsnowlabs.nlp.embeddings.ApproachWithWordEmbeddings import org.apache.spark.ml.Pipeline import org.apache.spark.ml.param.{DoubleParam, IntParam, Param, StringArrayParam} import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} @@ -19,7 +19,7 @@ import org.apache.spark.sql.{DataFrame, Dataset} Algorithm for training Named Entity Recognition Model. */ class NerCrfApproach(override val uid: String) - extends AnnotatorWithWordEmbeddings[NerCrfApproach, NerCrfModel] { + extends ApproachWithWordEmbeddings[NerCrfApproach, NerCrfModel] { def this() = this(Identifiable.randomUID("NER")) diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala index a444b6de44b06d..1c1339293d70aa 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala @@ -5,8 +5,8 @@ import com.johnsnowlabs.nlp.AnnotatorType._ import com.johnsnowlabs.nlp.annotators.common.{IndexedTaggedWord, NerTagged, PosTagged, TaggedSentence} import com.johnsnowlabs.nlp.annotators.common.Annotated.{NerTaggedSentence, PosTaggedSentence} import com.johnsnowlabs.nlp.serialization.{MapFeature, StructFeature} -import com.johnsnowlabs.nlp.embeddings.{EmbeddingsReadable, ModelWithWordEmbeddings} -import com.johnsnowlabs.nlp.Annotation +import com.johnsnowlabs.nlp.embeddings.EmbeddingsReadable +import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel, HasWordEmbeddings} import org.apache.spark.ml.param.StringArrayParam import org.apache.spark.ml.util._ @@ -14,7 +14,7 @@ import org.apache.spark.ml.util._ /* Named Entity Recognition model */ -class NerCrfModel(override val uid: String) extends ModelWithWordEmbeddings[NerCrfModel]{ +class NerCrfModel(override val uid: String) extends AnnotatorModel[NerCrfModel] with HasWordEmbeddings { def this() = this(Identifiable.randomUID("NER")) diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ApproachWithWordEmbeddings.scala similarity index 95% rename from src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala rename to src/main/scala/com/johnsnowlabs/nlp/embeddings/ApproachWithWordEmbeddings.scala index 6b69f24aa62ea3..e4234d6e8f6f51 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/AnnotatorWithWordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ApproachWithWordEmbeddings.scala @@ -4,7 +4,7 @@ import java.io.File import java.nio.file.Files import java.util.UUID -import com.johnsnowlabs.nlp.AnnotatorApproach +import com.johnsnowlabs.nlp.{AnnotatorApproach, AnnotatorModel, HasWordEmbeddings} import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.SparkContext import org.apache.spark.ml.param.{IntParam, Param} @@ -20,7 +20,7 @@ import org.apache.spark.sql.SparkSession * 3. Than this index file is spread across the cluster. * 4. Every model 'ModelWithWordEmbeddings' uses local RocksDB as Word Embeddings lookup. 
*/ -abstract class AnnotatorWithWordEmbeddings[A <: AnnotatorWithWordEmbeddings[A, M], M <: ModelWithWordEmbeddings[M]] +abstract class ApproachWithWordEmbeddings[A <: ApproachWithWordEmbeddings[A, M], M <: AnnotatorModel[M] with HasWordEmbeddings] extends AnnotatorApproach[M] with AutoCloseable { val sourceEmbeddingsPath = new Param[String](this, "sourceEmbeddingsPath", "Word embeddings file") diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/EmbeddingsReadable.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/EmbeddingsReadable.scala index 3049abac384572..2ddf01f72c11ce 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/EmbeddingsReadable.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/EmbeddingsReadable.scala @@ -1,9 +1,9 @@ package com.johnsnowlabs.nlp.embeddings -import com.johnsnowlabs.nlp.ParamsAndFeaturesReadable +import com.johnsnowlabs.nlp.{HasWordEmbeddings, ParamsAndFeaturesReadable} import org.apache.spark.sql.SparkSession -trait EmbeddingsReadable[T <: ModelWithWordEmbeddings[_]] extends ParamsAndFeaturesReadable[T] { +trait EmbeddingsReadable[T <: HasWordEmbeddings] extends ParamsAndFeaturesReadable[T] { override def onRead(instance: T, path: String, spark: SparkSession): Unit = { instance.deserializeEmbeddings(path, spark.sparkContext) } diff --git a/src/main/scala/com/johnsnowlabs/nlp/util/io/ResourceHelper.scala b/src/main/scala/com/johnsnowlabs/nlp/util/io/ResourceHelper.scala index 68c2a5f032c16a..740b19881a9398 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/util/io/ResourceHelper.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/util/io/ResourceHelper.scala @@ -28,7 +28,7 @@ import scala.util.Random */ object ResourceHelper { - private val spark: SparkSession = SparkSession.builder().getOrCreate() + val spark: SparkSession = SparkSession.builder().getOrCreate() /** Structure for a SourceStream coming from compiled content */ case class SourceStream(resource: String) { From f68ed2f2a4e3ae268521dd407f0e6d7932994ab4 Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Wed, 24 Jan 2018 18:55:15 -0300 Subject: [PATCH 2/3] - Recursive Pipelines --- python/example/crf-ner/ner.ipynb | 126 ++++-------------- python/sparknlp/annotator.py | 5 +- python/sparknlp/base.py | 90 ++++++++++++- .../johnsnowlabs/nlp/AnnotatorApproach.scala | 12 +- .../johnsnowlabs/nlp/HasRecursiveFit.scala | 21 +++ .../johnsnowlabs/nlp/RecursivePipeline.scala | 58 ++++++++ .../annotators/ner/crf/NerCrfApproach.scala | 23 ++-- .../parser/dep/DependencyParser.scala | 3 +- .../pos/perceptron/PerceptronApproach.scala | 3 +- .../sda/vivekn/ViveknSentimentApproach.scala | 3 +- .../spell/norvig/NorvigSweetingApproach.scala | 3 +- .../nlp/annotators/LemmatizerTestSpec.scala | 19 +++ 12 files changed, 243 insertions(+), 123 deletions(-) create mode 100644 src/main/scala/com/johnsnowlabs/nlp/HasRecursiveFit.scala create mode 100644 src/main/scala/com/johnsnowlabs/nlp/RecursivePipeline.scala diff --git a/python/example/crf-ner/ner.ipynb b/python/example/crf-ner/ner.ipynb index bd89c8b00612db..a29bbbbb5b301c 100644 --- a/python/example/crf-ner/ner.ipynb +++ b/python/example/crf-ner/ner.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": { "collapsed": true }, @@ -16,14 +16,17 @@ "\n", "from sparknlp.annotator import *\n", "from sparknlp.common import *\n", - "from sparknlp.base import *" + "from sparknlp.base import *\n", + "\n", + "import time\n", + "import zipfile" ] }, { "cell_type": "code", - "execution_count": 2, + 
"execution_count": null, "metadata": { - "collapsed": false + "collapsed": true }, "outputs": [], "source": [ @@ -39,7 +42,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": { "collapsed": true }, @@ -61,7 +64,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": { "collapsed": true }, @@ -87,13 +90,12 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": { - "collapsed": false + "collapsed": true }, "outputs": [], "source": [ - "import time\n", "\n", "documentAssembler = DocumentAssembler()\\\n", " .setInputCol(\"text\")\\\n", @@ -113,6 +115,7 @@ " .setInputCols([\"token\", \"document\"])\\\n", " .setOutputCol(\"pos\")\n", "\n", + "#.setEmbeddingsSource(\"glove.6B.100d.txt\", 100, 2)\\\n", "nerTagger = NerCrfApproach()\\\n", " .setInputCols([\"sentence\", \"token\", \"pos\"])\\\n", " .setLabelColumn(\"label\")\\\n", @@ -121,7 +124,6 @@ " .setMaxEpochs(20)\\\n", " .setLossEps(1e-3)\\\n", " .setDicts([\"ner-corpus/dict.txt\"])\\\n", - " .setEmbeddingsSource(\"glove.6B.100d.txt\", 100, 2)\\\n", " .setDatasetPath(\"eng.train\")\\\n", " .setL2(1)\\\n", " .setC0(1250000)\\\n", @@ -145,44 +147,9 @@ }, { "cell_type": "code", - "execution_count": 6, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+------+---------+--------------------+\n", - "|itemid|sentiment| text|\n", - "+------+---------+--------------------+\n", - "| 1| 0| ...|\n", - "| 2| 0| ...|\n", - "| 3| 1| omg...|\n", - "| 4| 0| .. Omga...|\n", - "| 5| 0| i think ...|\n", - "| 6| 0| or i jus...|\n", - "| 7| 1| Juuuuuuuuu...|\n", - "| 8| 0| Sunny Agai...|\n", - "| 9| 1| handed in m...|\n", - "| 10| 1| hmmmm.... i...|\n", - "| 11| 0| I must thin...|\n", - "| 12| 1| thanks to a...|\n", - "| 13| 0| this weeken...|\n", - "| 14| 0| jb isnt show...|\n", - "| 15| 0| ok thats it ...|\n", - "| 16| 0| <-------- ...|\n", - "| 17| 0| awhhe man.......|\n", - "| 18| 1| Feeling stran...|\n", - "| 19| 0| HUGE roll of ...|\n", - "| 20| 0| I just cut my...|\n", - "+------+---------+--------------------+\n", - "only showing top 20 rows\n", - "\n" - ] - } - ], + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "#Load the input data to be annotated\n", "data = spark. \\\n", @@ -196,67 +163,24 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": null, "metadata": { - "collapsed": false, "scrolled": false }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Start fitting\n", - "Fitting is ended\n" - ] - } - ], + "outputs": [], "source": [ + "start = time.time()\n", "print(\"Start fitting\")\n", "model = pipeline.fit(data)\n", - "print(\"Fitting is ended\")" + "print(\"Fitting is ended\")\n", + "print (time.time() - start)" ] }, { "cell_type": "code", - "execution_count": 8, - "metadata": { - "collapsed": false - }, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+------+---------+--------------------+--------------------+\n", - "|itemid|sentiment| text| finished_ner|\n", - "+------+---------+--------------------+--------------------+\n", - "| 1| 0| ...|word->is#result->...|\n", - "| 2| 0| ...|word->I#result->O...|\n", - "| 3| 1| omg...|word->omg#result-...|\n", - "| 4| 0| .. 
Omga...|word->Omgaga.#res...|\n", - "| 5| 0| i think ...|word->i#result->O...|\n", - "| 6| 0| or i jus...|word->or#result->...|\n", - "| 7| 1| Juuuuuuuuu...|word->Juuuuuuuuuu...|\n", - "| 8| 0| Sunny Agai...|word->Sunny#resul...|\n", - "| 9| 1| handed in m...|word->handed#resu...|\n", - "| 10| 1| hmmmm.... i...|word->i#result->O...|\n", - "| 11| 0| I must thin...|word->I#result->O...|\n", - "| 12| 1| thanks to a...|word->thanks#resu...|\n", - "| 13| 0| this weeken...|word->this#result...|\n", - "| 14| 0| jb isnt show...|word->jb#result->...|\n", - "| 15| 0| ok thats it ...|word->ok#result->...|\n", - "| 16| 0| <-------- ...|word->This#result...|\n", - "| 17| 0| awhhe man.......|word->awhhe#resul...|\n", - "| 18| 1| Feeling stran...|word->Feeling#res...|\n", - "| 19| 0| HUGE roll of ...|word->HUGE#result...|\n", - "| 20| 0| I just cut my...|word->I#result->O...|\n", - "+------+---------+--------------------+--------------------+\n", - "only showing top 20 rows\n", - "\n" - ] - } - ], + "execution_count": null, + "metadata": {}, + "outputs": [], "source": [ "ner_data = model.transform(data)\n", "ner_data.show()" @@ -266,7 +190,7 @@ "cell_type": "code", "execution_count": null, "metadata": { - "collapsed": false + "collapsed": true }, "outputs": [], "source": [ @@ -293,7 +217,7 @@ "metadata": { "anaconda-cloud": {}, "kernelspec": { - "display_name": "Python [default]", + "display_name": "Python 3", "language": "python", "name": "python3" }, diff --git a/python/sparknlp/annotator.py b/python/sparknlp/annotator.py index 2cbac3f4a72b4b..dfecd5b8c6ca58 100755 --- a/python/sparknlp/annotator.py +++ b/python/sparknlp/annotator.py @@ -7,6 +7,7 @@ from pyspark.ml.util import JavaMLReadable, JavaMLWritable from pyspark.ml.wrapper import JavaTransformer, JavaModel, JavaEstimator from pyspark.ml.param.shared import Param, Params, TypeConverters +from sparknlp.base import JavaRecursiveEstimator if sys.version_info[0] == 2: #Needed. Delete once DA becomes an annotator in 1.1.x @@ -130,6 +131,7 @@ def setPattern(self, value): def setLowercase(self, value): return self._set(lowercase=value) + class RegexMatcher(AnnotatorTransformer): strategy = Param(Params._dummy(), @@ -523,8 +525,7 @@ class NorvigSweetingModel(JavaModel, JavaMLWritable, JavaMLReadable, AnnotatorPr name = "NorvigSweetingModel" - -class NerCrfApproach(JavaEstimator, JavaMLWritable, JavaMLReadable, AnnotatorProperties, AnnotatorWithEmbeddings): +class NerCrfApproach(JavaRecursiveEstimator, JavaMLWritable, JavaMLReadable, AnnotatorProperties, AnnotatorWithEmbeddings): labelColumn = Param(Params._dummy(), "labelColumn", "Column with label per each token", diff --git a/python/sparknlp/base.py b/python/sparknlp/base.py index 00ad982e8ae27c..5400d79a706c3e 100644 --- a/python/sparknlp/base.py +++ b/python/sparknlp/base.py @@ -1,7 +1,95 @@ from pyspark import keyword_only from pyspark.ml.util import JavaMLReadable, JavaMLWritable -from pyspark.ml.wrapper import JavaTransformer +from pyspark.ml.wrapper import JavaTransformer, JavaEstimator from pyspark.ml.param.shared import Param, Params, TypeConverters +from pyspark.ml.pipeline import Pipeline, PipelineModel, Estimator, Transformer + + +class JavaRecursiveEstimator(JavaEstimator): + + def _fit_java(self, dataset, pipeline=None): + """ + Fits a Java model to the input dataset. 
+ :param dataset: input dataset, which is an instance of + :py:class:`pyspark.sql.DataFrame` + :param pipeline: optional PipelineModel of the stages fitted so far (enables a recursive fit) + :return: fitted Java model + """ + self._transfer_params_to_java() + if pipeline: + return self._java_obj.recursiveFit(dataset._jdf, pipeline._to_java()) + else: + return self._java_obj.fit(dataset._jdf) + + def _fit(self, dataset, pipeline=None): + java_model = self._fit_java(dataset, pipeline) + model = self._create_model(java_model) + return self._copyValues(model) + + def fit(self, dataset, params=None, pipeline=None): + """ + Fits a model to the input dataset with optional parameters. + :param dataset: input dataset, which is an instance of :py:class:`pyspark.sql.DataFrame` + :param params: an optional param map that overrides embedded params. If a list/tuple of + param maps is given, this calls fit on each param map and returns a list of + models. + :returns: fitted model(s) + """ + if params is None: + params = dict() + if isinstance(params, (list, tuple)): + models = [None] * len(params) + for index, model in self.fitMultiple(dataset, params): + models[index] = model + return models + elif isinstance(params, dict): + if params: + return self.copy(params)._fit(dataset, pipeline=pipeline) + else: + return self._fit(dataset, pipeline=pipeline) + else: + raise ValueError("Params must be either a param map or a list/tuple of param maps, " + "but got %s." % type(params)) + + +class RecursivePipeline(Pipeline, JavaEstimator): + @keyword_only + def __init__(self, *args, **kwargs): + super(RecursivePipeline, self).__init__(*args, **kwargs) + self._java_obj = self._new_java_obj("com.johnsnowlabs.nlp.RecursivePipeline", self.uid) + kwargs = self._input_kwargs + self.setParams(**kwargs) + + def _fit(self, dataset): + stages = self.getStages() + for stage in stages: + if not (isinstance(stage, Estimator) or isinstance(stage, Transformer)): + raise TypeError( + "Cannot recognize a pipeline stage of type %s."
% type(stage)) + indexOfLastEstimator = -1 + for i, stage in enumerate(stages): + if isinstance(stage, Estimator): + indexOfLastEstimator = i + transformers = [] + for i, stage in enumerate(stages): + if isinstance(stage, Transformer): + transformers.append(stage) + dataset = stage.transform(dataset) + elif isinstance(stage, JavaRecursiveEstimator): + model = stage.fit(dataset, pipeline=PipelineModel(transformers)) + transformers.append(model) + if i < indexOfLastEstimator: + dataset = model.transform(dataset) + else: + model = stage.fit(dataset) + transformers.append(model) + if i < indexOfLastEstimator: + dataset = model.transform(dataset) + # NOTE: stages past the last estimator are plain Transformers and + # were already appended by the isinstance(stage, Transformer) branch + # above; a trailing else-append here would add them a second time and + # duplicate stages in the resulting PipelineModel + return PipelineModel(transformers) class DocumentAssembler(JavaTransformer, JavaMLReadable, JavaMLWritable): diff --git a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala index 0d83d6f6370da4..7d59a515eee2e8 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala @@ -1,7 +1,7 @@ package com.johnsnowlabs.nlp import org.apache.spark.ml.param.ParamMap -import org.apache.spark.ml.{Estimator, Model} +import org.apache.spark.ml.{Estimator, Model, PipelineModel} import org.apache.spark.sql.{Dataset, SparkSession} import org.apache.spark.sql.types.{ArrayType, MetadataBuilder, StructField, StructType} import org.apache.spark.ml.util.DefaultParamsWritable @@ -15,14 +15,14 @@ import org.apache.spark.ml.util.DefaultParamsWritable */ abstract class AnnotatorApproach[M <: Model[M]] extends Estimator[M] - with HasInputAnnotationCols - with HasOutputAnnotationCol - with HasAnnotatorType - with DefaultParamsWritable { + with HasInputAnnotationCols + with HasOutputAnnotationCol + with HasAnnotatorType + with DefaultParamsWritable { val description: String - def train(dataset: Dataset[_]): M + def train(dataset: Dataset[_], recursivePipeline: Option[PipelineModel] = None): M def beforeTraining(spark: SparkSession): Unit = {} diff --git a/src/main/scala/com/johnsnowlabs/nlp/HasRecursiveFit.scala b/src/main/scala/com/johnsnowlabs/nlp/HasRecursiveFit.scala new file mode 100644 index 00000000000000..a0c16de230ce95 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/HasRecursiveFit.scala @@ -0,0 +1,21 @@ +package com.johnsnowlabs.nlp + +import org.apache.spark.ml.{Model, PipelineModel} +import org.apache.spark.sql.Dataset + +/** AnnotatorApproaches may extend this trait in order to receive, + * while being fit inside a RecursivePipeline, the PipelineModel + * of all intermediate steps trained up to that point + * */ +trait HasRecursiveFit[M <: Model[M]] { + + this: AnnotatorApproach[M] => + + final def recursiveFit(dataset: Dataset[_], recursivePipeline: PipelineModel): M = { + beforeTraining(dataset.sparkSession) + val model = copyValues(train(dataset, Some(recursivePipeline)).setParent(this)) + onTrained(model, dataset.sparkSession) + model + } + +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/RecursivePipeline.scala b/src/main/scala/com/johnsnowlabs/nlp/RecursivePipeline.scala new file mode 100644 index 00000000000000..3ef5a4570ea5d7 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/RecursivePipeline.scala @@ -0,0 +1,58 @@ +package com.johnsnowlabs.nlp + +import org.apache.spark.ml.util.Identifiable +import org.apache.spark.ml.{Estimator, Pipeline, PipelineModel, Transformer} +import org.apache.spark.sql.Dataset + +import scala.collection.mutable.ListBuffer + +class
RecursivePipeline(override val uid: String) extends Pipeline { + + def this() = this(Identifiable.randomUID("RECURSIVE_PIPELINE")) + + /** Workaround for PipelineModel's constructor being private to Spark */ + private def createPipeline(dataset: Dataset[_], uid: String, transformers: Array[Transformer]) = { + new Pipeline().setStages(transformers).fit(dataset) + } + + + /** Must mirror the behavior of Pipeline.fit in Spark 2.x.x */ + override def fit(dataset: Dataset[_]): PipelineModel = { + transformSchema(dataset.schema, logging = true) + val theStages = $(stages) + var indexOfLastEstimator = -1 + theStages.view.zipWithIndex.foreach { case (stage, index) => + stage match { + case _: Estimator[_] => + indexOfLastEstimator = index + case _ => + } + } + var curDataset = dataset + val transformers = ListBuffer.empty[Transformer] + theStages.view.zipWithIndex.foreach { case (stage, index) => + if (index <= indexOfLastEstimator) { + val transformer = stage match { + case estimator: HasRecursiveFit[_] => + estimator.recursiveFit(curDataset, createPipeline(curDataset, uid, transformers.toArray)) + case estimator: Estimator[_] => + estimator.fit(curDataset) + case t: Transformer => + t + case _ => + throw new IllegalArgumentException( + s"Does not support stage $stage of type ${stage.getClass}") + } + if (index < indexOfLastEstimator) { + curDataset = transformer.transform(curDataset) + } + transformers += transformer + } else { + transformers += stage.asInstanceOf[Transformer] + } + } + + createPipeline(curDataset, uid, transformers.toArray).setParent(this) + } + +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala index 72584b03ebb3ac..7c320d38aaaa79 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala @@ -1,7 +1,7 @@ package com.johnsnowlabs.nlp.annotators.ner.crf import com.johnsnowlabs.ml.crf.{CrfParams, LinearChainCrf, TextSentenceLabels, Verbose} -import com.johnsnowlabs.nlp.{AnnotatorType, DocumentAssembler} +import com.johnsnowlabs.nlp.{AnnotatorType, DocumentAssembler, HasRecursiveFit, RecursivePipeline} import com.johnsnowlabs.nlp.AnnotatorType.{DOCUMENT, NAMED_ENTITY, POS, TOKEN} import com.johnsnowlabs.nlp.annotators.RegexTokenizer import com.johnsnowlabs.nlp.annotators.common.Annotated.PosTaggedSentence @@ -10,7 +10,7 @@ import com.johnsnowlabs.nlp.annotators.pos.perceptron.PerceptronApproach import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetectorModel import com.johnsnowlabs.nlp.datasets.CoNLL import com.johnsnowlabs.nlp.embeddings.ApproachWithWordEmbeddings -import org.apache.spark.ml.Pipeline +import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.param.{DoubleParam, IntParam, Param, StringArrayParam} import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} import org.apache.spark.sql.{DataFrame, Dataset} @@ -19,7 +19,7 @@ import org.apache.spark.sql.{DataFrame, Dataset} Algorithm for training Named Entity Recognition Model.
*/ class NerCrfApproach(override val uid: String) - extends ApproachWithWordEmbeddings[NerCrfApproach, NerCrfModel] { + extends ApproachWithWordEmbeddings[NerCrfApproach, NerCrfModel] with HasRecursiveFit[NerCrfModel] { def this() = this(Identifiable.randomUID("NER")) @@ -73,11 +73,18 @@ class NerCrfApproach(override val uid: String) ) - private def getTrainDataframe(dataset: Dataset[_]): DataFrame = { + private def getTrainDataframe(dataset: Dataset[_], recursivePipeline: Option[PipelineModel]): DataFrame = { if (!isDefined(datasetPath)) return dataset.toDF() + val reader = CoNLL(3, AnnotatorType.NAMED_ENTITY) + val dataframe = reader.readDataset($(datasetPath), dataset.sparkSession).toDF + + if (recursivePipeline.isDefined) { + return recursivePipeline.get.transform(dataframe) + } + val documentAssembler = new DocumentAssembler() .setInputCol("text") .setOutputCol("document") @@ -93,7 +100,7 @@ class NerCrfApproach(override val uid: String) val posTagger = new PerceptronApproach() .setCorpusPath("anc-pos-corpus/") - .setNIterations(10) + .setNIterations(5) .setInputCols("token", "document") .setOutputCol("pos") @@ -105,15 +112,13 @@ class NerCrfApproach(override val uid: String) posTagger) ) - val reader = CoNLL(3, AnnotatorType.NAMED_ENTITY) - val dataframe = reader.readDataset($(datasetPath), dataset.sparkSession).toDF pipeline.fit(dataframe).transform(dataframe) } - override def train(dataset: Dataset[_]): NerCrfModel = { + override def train(dataset: Dataset[_], recursivePipeline: Option[PipelineModel]): NerCrfModel = { - val rows = getTrainDataframe(dataset) + val rows = getTrainDataframe(dataset, recursivePipeline) val trainDataset: Array[(TextSentenceLabels, PosTaggedSentence)] = NerTagged.collectTrainingInstances(rows, getInputCols, $(labelColumn)) diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/parser/dep/DependencyParser.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/parser/dep/DependencyParser.scala index e44eef674fe2ba..29ef30f1fb03be 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/parser/dep/DependencyParser.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/parser/dep/DependencyParser.scala @@ -2,6 +2,7 @@ package com.johnsnowlabs.nlp.annotators.parser.dep import com.johnsnowlabs.nlp.AnnotatorApproach import com.johnsnowlabs.nlp.AnnotatorType._ +import org.apache.spark.ml.PipelineModel import org.apache.spark.ml.param.Param import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} import org.apache.spark.sql.Dataset @@ -19,7 +20,7 @@ class DependencyParser(override val uid: String) extends AnnotatorApproach[Depen override val requiredAnnotatorTypes = Array(DOCUMENT, POS, TOKEN) - override def train(dataset: Dataset[_]): DependencyParserModel = { + override def train(dataset: Dataset[_], recursivePipeline: Option[PipelineModel]): DependencyParserModel = { new DependencyParserModel() .setSourcePath($(sourcePath)) } diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronApproach.scala index 478ac71e7a1666..856ac87b967a5f 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronApproach.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronApproach.scala @@ -4,6 +4,7 @@ import com.johnsnowlabs.nlp.AnnotatorApproach import com.johnsnowlabs.nlp.annotators.common.{TaggedSentence, TaggedWord} import com.johnsnowlabs.nlp.util.io.ResourceHelper import 
com.typesafe.config.{Config, ConfigFactory} +import org.apache.spark.ml.PipelineModel import org.apache.spark.ml.param.{IntParam, Param} import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} import org.apache.spark.sql.Dataset @@ -87,7 +88,7 @@ class PerceptronApproach(override val uid: String) extends AnnotatorApproach[Per * * @return A trained averaged model */ - override def train(dataset: Dataset[_]): PerceptronModel = { + override def train(dataset: Dataset[_], recursivePipeline: Option[PipelineModel]): PerceptronModel = { /** * Generates TagBook, which holds all the word to tags mapping that are not ambiguous */ diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentApproach.scala index 1f1d6b484b13b4..d332f568d68d29 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentApproach.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentApproach.scala @@ -2,6 +2,7 @@ package com.johnsnowlabs.nlp.annotators.sda.vivekn import com.johnsnowlabs.nlp.AnnotatorApproach import com.johnsnowlabs.nlp.util.io.ResourceHelper +import org.apache.spark.ml.PipelineModel import org.apache.spark.ml.param.{IntParam, Param} import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} import org.apache.spark.sql.Dataset @@ -44,7 +45,7 @@ class ViveknSentimentApproach(override val uid: String) def setTokenPattern(value: String): this.type = set(tokenPattern, value) - override def train(dataset: Dataset[_]): ViveknSentimentModel = { + override def train(dataset: Dataset[_], recursivePipeline: Option[PipelineModel]): ViveknSentimentModel = { val fromPositive: (MMap[String, Int], MMap[String, Int]) = ResourceHelper.ViveknWordCount( source=$(positiveSourcePath), diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingApproach.scala index 5925349ebd0c6e..7bf1587861d77a 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingApproach.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingApproach.scala @@ -2,6 +2,7 @@ package com.johnsnowlabs.nlp.annotators.spell.norvig import com.johnsnowlabs.nlp.AnnotatorApproach import com.johnsnowlabs.nlp.util.io.ResourceHelper +import org.apache.spark.ml.PipelineModel import org.apache.spark.ml.param.Param import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} import org.apache.spark.sql.Dataset @@ -45,7 +46,7 @@ class NorvigSweetingApproach(override val uid: String) def setTokenPattern(value: String): this.type = set(tokenPattern, value) - override def train(dataset: Dataset[_]): NorvigSweetingModel = { + override def train(dataset: Dataset[_], recursivePipeline: Option[PipelineModel]): NorvigSweetingModel = { val loadWords = ResourceHelper.wordCount($(dictPath), $(corpusFormat).toUpperCase, $(tokenPattern)) val corpusWordCount = if (get(corpusPath).isDefined) { diff --git a/src/test/scala/com/johnsnowlabs/nlp/annotators/LemmatizerTestSpec.scala b/src/test/scala/com/johnsnowlabs/nlp/annotators/LemmatizerTestSpec.scala index 0ae442959f895c..0ed89436902aa4 100644 --- a/src/test/scala/com/johnsnowlabs/nlp/annotators/LemmatizerTestSpec.scala +++ b/src/test/scala/com/johnsnowlabs/nlp/annotators/LemmatizerTestSpec.scala @@ -12,6 +12,8 @@ import org.scalatest._ */ class 
LemmatizerTestSpec extends FlatSpec with LemmatizerBehaviors { + SparkAccessor.spark + val lemmatizer = new Lemmatizer "a lemmatizer" should s"be of type ${AnnotatorType.TOKEN}" in { assert(lemmatizer.annotatorType == AnnotatorType.TOKEN) @@ -66,14 +68,31 @@ class LemmatizerTestSpec extends FlatSpec with LemmatizerBehaviors { finisher )) + val recursivePipeline = new RecursivePipeline() + .setStages(Array( + documentAssembler, + sentenceDetector, + tokenizer, + lemmatizer, + finisher + )) + val model = pipeline.fit(data) model.transform(data).show() val PIPE_PATH = "./tmp_pipeline" + model.write.overwrite().save(PIPE_PATH) val loadedPipeline = PipelineModel.read.load(PIPE_PATH) loadedPipeline.transform(data).show + val recursiveModel = recursivePipeline.fit(data) + recursiveModel.transform(data).show() + + recursiveModel.write.overwrite().save(PIPE_PATH) + val loadedRecPipeline = PipelineModel.read.load(PIPE_PATH) + loadedRecPipeline.transform(data).show + succeed } From 14bd4e72cedfcee8ed174ccc11bb45e2d39040ec Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Thu, 25 Jan 2018 13:04:16 -0300 Subject: [PATCH 3/3] - Updated CRF tests to use recursive pipeline - Added warning logs when not using recursive pipeline in NER --- src/main/resources/log4j.properties | 8 ++++++-- .../nlp/annotators/ner/crf/NerCrfApproach.scala | 6 ++++++ .../com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala | 6 +++--- 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/main/resources/log4j.properties b/src/main/resources/log4j.properties index cea9c52085e453..26346385cc775b 100644 --- a/src/main/resources/log4j.properties +++ b/src/main/resources/log4j.properties @@ -1,7 +1,11 @@ -log4j.rootLogger=ERROR, STDOUT +log4j.rootLogger=WARN, STDOUT log4j.appender.STDOUT=org.apache.log4j.ConsoleAppender log4j.appender.STDOUT.layout=org.apache.log4j.PatternLayout log4j.appender.STDOUT.layout.ConversionPattern=[%5p] %m%n log4j.logger.AnnotatorLogger=WARNING -log4j.logger.CRF=INFO \ No newline at end of file +log4j.logger.RuleFactory=WARN +log4j.logger.PerceptronTraining=WARN +log4j.logger.PragmaticScorer=WARN +log4j.logger.NorvigApproach=WARN +log4j.logger.CRF=WARN \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala index 7c320d38aaaa79..795ccf0b4e76f3 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproach.scala @@ -14,6 +14,7 @@ import org.apache.spark.ml.{Pipeline, PipelineModel} import org.apache.spark.ml.param.{DoubleParam, IntParam, Param, StringArrayParam} import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} import org.apache.spark.sql.{DataFrame, Dataset} +import org.slf4j.LoggerFactory /* Algorithm for training Named Entity Recognition Model. @@ -23,6 +24,8 @@ class NerCrfApproach(override val uid: String) def this() = this(Identifiable.randomUID("NER")) + private val logger = LoggerFactory.getLogger("NerCrfApproach") + override val description = "CRF based Named Entity Recognition Tagger" override val requiredAnnotatorTypes = Array(DOCUMENT, TOKEN, POS) override val annotatorType = NAMED_ENTITY @@ -85,6 +88,9 @@ class NerCrfApproach(override val uid: String) return recursivePipeline.get.transform(dataframe) } + logger.warn("NER CRF not in a RecursivePipeline."
+ + "It is recommended to use a com.jonsnowlabs.nlp.RecursivePipeline for" + + "better performance during training") val documentAssembler = new DocumentAssembler() .setInputCol("text") .setOutputCol("document") diff --git a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala index 665d936b20405d..5d2c5a2fe0a561 100644 --- a/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala +++ b/src/test/scala/com/johnsnowlabs/ml/crf/CoNLL2003PipelineTest.scala @@ -9,7 +9,7 @@ import com.johnsnowlabs.nlp.annotators.pos.perceptron.PerceptronApproach import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetectorModel import com.johnsnowlabs.nlp.datasets.CoNLL import com.johnsnowlabs.nlp.embeddings.WordEmbeddingsFormat -import org.apache.spark.ml.{Pipeline, PipelineModel, PipelineStage} +import org.apache.spark.ml.{PipelineModel, PipelineStage} import org.apache.spark.sql.DataFrame import scala.collection.mutable @@ -76,7 +76,7 @@ object CoNLL2003PipelineTest extends App { val stages = getPosStages() - val pipeline = new Pipeline() + val pipeline = new RecursivePipeline() .setStages(stages) pipeline.fit(dataset) @@ -92,7 +92,7 @@ object CoNLL2003PipelineTest extends App { val stages = getNerStages() - val pipeline = new Pipeline() + val pipeline = new RecursivePipeline() .setStages(stages) pipeline.fit(dataset)