From 7f809029309ef615bbc50165be961565d0a089ac Mon Sep 17 00:00:00 2001 From: aleksei Date: Tue, 26 Dec 2017 14:39:27 +0300 Subject: [PATCH 01/12] Kryo serialization for ViveknSentimentModel --- .../nlp/annotators/ner/crf/NerCrfModel.scala | 49 ++++---------- .../sda/vivekn/ViveknSentimentModel.scala | 67 +++++++++++++++---- .../serialization/SerializationHelper.scala | 63 +++++++++++++++++ 3 files changed, 129 insertions(+), 50 deletions(-) create mode 100644 src/main/scala/com/johnsnowlabs/nlp/serialization/SerializationHelper.scala diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala index e8462ebdfe3f2c..e82b7c0864e48f 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala @@ -4,11 +4,10 @@ import com.johnsnowlabs.ml.crf.{LinearChainCrfModel, SerializedLinearChainCrfMod import com.johnsnowlabs.nlp.AnnotatorType._ import com.johnsnowlabs.nlp.annotators.common.{IndexedTaggedWord, NerTagged, PosTagged, TaggedSentence} import com.johnsnowlabs.nlp.annotators.common.Annotated.{NerTaggedSentence, PosTaggedSentence} +import com.johnsnowlabs.nlp.serialization.SerializationHelper import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} -import org.apache.hadoop.fs.Path import org.apache.spark.ml.param.StringArrayParam import org.apache.spark.ml.util._ -import org.apache.spark.sql.{Encoders, Row} /* @@ -84,38 +83,21 @@ class NerCrfModel(override val uid: String) } object NerCrfModel extends DefaultParamsReadable[NerCrfModel] { - implicit val crfEncoder = Encoders.kryo[SerializedLinearChainCrfModel] - override def read: MLReader[NerCrfModel] = new NerCrfModelReader(super.read) + private val crfModelKey = "crfModel" + private val dictionaryFeaturesKey = "dictionaryFeatures" + class NerCrfModelReader(baseReader: MLReader[NerCrfModel]) extends MLReader[NerCrfModel] { override def load(path: String): NerCrfModel = { + val helper = SerializationHelper(sparkSession, path) val instance = baseReader.load(path) - val dataPath = new Path(path, "data").toString - val loaded = sparkSession.sqlContext.read.format("parquet").load(dataPath) - val crfModel = loaded.as[SerializedLinearChainCrfModel].head - - val dictPath = new Path(path, "dict").toString - val dictLoaded = sparkSession.sqlContext.read.format("parquet") - .load(dictPath) - .collect - .head + val crfModel = helper.deserializeScalar[SerializedLinearChainCrfModel](crfModelKey) + instance.model = crfModel.map(m => m.deserialize) - val lines = dictLoaded.asInstanceOf[Row].getAs[Seq[String]](0) - - val dict = lines - .map {line => - val items = line.split(":") - (items(0), items(1)) - } - .toMap - - val dictFeatures = new DictionaryFeatures(dict) - - instance - .setModel(crfModel.deserialize) - .setDictionaryFeatures(dictFeatures) + val map = helper.deserializeMap[String, String](dictionaryFeaturesKey) + instance.setDictionaryFeatures(new DictionaryFeatures(map)) } } @@ -125,17 +107,10 @@ object NerCrfModel extends DefaultParamsReadable[NerCrfModel] { require(model.model.isDefined, "Crf Model must be defined before serialization") baseWriter.save(path) + val helper = SerializationHelper(sparkSession, path) - val spark = sparkSession - import spark.sqlContext.implicits._ - - val toStore = model.model.get.serialize - val dataPath = new Path(path, "data").toString - Seq(toStore).toDS.write.mode("overwrite").parquet(dataPath) - - val dictPath = 
new Path(path, "dict").toString - val dictLines = model.dictionaryFeatures.dict.toSeq.map(p => p._1 + ":" + p._2) - Seq(dictLines).toDS.write.mode("overwrite").parquet(dictPath) + helper.serializeScalar(crfModelKey, model.model.get.serialize) + helper.serializeMap(dictionaryFeaturesKey, model.dictionaryFeatures.dict) } } } diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala index 213f8e9017a1e0..4cdbd0b47d0d5e 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala @@ -1,10 +1,11 @@ package com.johnsnowlabs.nlp.annotators.sda.vivekn -import com.johnsnowlabs.nlp.annotators.common.{IntStringMapParam, Tokenized, TokenizedSentence} +import com.johnsnowlabs.nlp.annotators.common.{Tokenized, TokenizedSentence} +import com.johnsnowlabs.nlp.serialization.SerializationHelper import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} import com.typesafe.config.{Config, ConfigFactory} -import org.apache.spark.ml.param.{IntParam, StringArrayParam} -import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} +import org.apache.spark.ml.param.IntParam +import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable, MLReader, MLWriter} class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[ViveknSentimentModel] { @@ -19,16 +20,17 @@ class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[Vive override val requiredAnnotatorTypes: Array[AnnotatorType] = Array(TOKEN, DOCUMENT) - protected val positive: IntStringMapParam = new IntStringMapParam(this, "positive_sentences", "positive sentences trained") - protected val negative: IntStringMapParam = new IntStringMapParam(this, "negative_sentences", "negative sentences trained") - protected val features: StringArrayParam = new StringArrayParam(this, "words", "unique words trained") + protected var positive = Map[String, Int]() + protected var negative = Map[String, Int]() + protected var features = Array[String]() + protected val positiveTotals: IntParam = new IntParam(this, "positive_totals", "count of positive words") protected val negativeTotals: IntParam = new IntParam(this, "negative_totals", "count of negative words") def this() = this(Identifiable.randomUID("VIVEKN")) - private[vivekn] def setPositive(value: Map[String, Int]) = set(positive, value) - private[vivekn] def setNegative(value: Map[String, Int]) = set(negative, value) + private[vivekn] def setPositive(value: Map[String, Int]) = {positive = value; this} + private[vivekn] def setNegative(value: Map[String, Int]) = {negative = value; this} private[vivekn] def setPositiveTotals(value: Int) = set(positiveTotals, value) private[vivekn] def setNegativeTotals(value: Int) = set(negativeTotals, value) private[vivekn] def setWords(value: Array[String]) = { @@ -44,14 +46,16 @@ class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[Vive Range(start, afterStart, step).foreach(k => { value.slice(k, k+step).foreach(currentFeatures.add) }) - set(features, currentFeatures.toArray) + + features = currentFeatures.toArray + this } def classify(sentence: TokenizedSentence): Boolean = { - val words = ViveknSentimentApproach.negateSequence(sentence.tokens.toList).intersect($(features)).distinct + val words = 
ViveknSentimentApproach.negateSequence(sentence.tokens.toList).intersect(features).distinct if (words.isEmpty) return true - val positiveProbability = words.map(word => scala.math.log(($(positive).getOrElse(word, 0) + 1.0) / (2.0 * $(positiveTotals)))).sum - val negativeProbability = words.map(word => scala.math.log(($(negative).getOrElse(word, 0) + 1.0) / (2.0 * $(negativeTotals)))).sum + val positiveProbability = words.map(word => scala.math.log((positive.getOrElse(word, 0) + 1.0) / (2.0 * $(positiveTotals)))).sum + val negativeProbability = words.map(word => scala.math.log((negative.getOrElse(word, 0) + 1.0) / (2.0 * $(negativeTotals)))).sum positiveProbability > negativeProbability } @@ -76,6 +80,43 @@ class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[Vive ) }) } + + override def write: MLWriter = new ViveknSentimentModel.Writer(this, super.write) } -object ViveknSentimentModel extends DefaultParamsReadable[ViveknSentimentModel] \ No newline at end of file +object ViveknSentimentModel extends DefaultParamsReadable[ViveknSentimentModel] { + override def read = new Reader(super.read) + + private val positiveKey = "positive" + private val negativeKey = "negative" + private val featuresKey = "features" + + class Reader(baseReader: MLReader[ViveknSentimentModel]) extends MLReader[ViveknSentimentModel] { + + override def load(path: String): ViveknSentimentModel = { + val helper = SerializationHelper(sparkSession, path) + val instance = baseReader.load(path) + + val positive = helper.deserializeMap[String, Int](positiveKey) + val negative = helper.deserializeMap[String, Int](negativeKey) + val features = helper.deserializeArray[String](featuresKey) + + instance.features = features + instance + .setNegative(negative) + .setPositive(positive) + } + } + + class Writer(model: ViveknSentimentModel, baseWriter: MLWriter) extends MLWriter { + + override protected def saveImpl(path: String): Unit = { + baseWriter.save(path) + val helper = SerializationHelper(sparkSession, path) + + helper.serializeMap[String, Int](positiveKey, model.positive) + helper.serializeMap[String, Int](negativeKey, model.negative) + helper.serializeArray[String](featuresKey, model.features) + } + } +} \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/SerializationHelper.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/SerializationHelper.scala new file mode 100644 index 00000000000000..b1d08bb4426fac --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/SerializationHelper.scala @@ -0,0 +1,63 @@ +package com.johnsnowlabs.nlp.serialization + +import org.apache.hadoop.fs.Path +import org.apache.spark.sql.{Encoders, SparkSession} + +import scala.reflect.ClassTag + + +case class SerializationHelper(spark: SparkSession, path: String) { + import spark.sqlContext.implicits._ + + private def getFieldPath(field: String) = + Path.mergePaths(new Path(path), new Path("/fields/" + field)).toString + + + def serializeScalar[TValue: ClassTag](field: String, value: TValue): Unit = { + implicit val encoder = Encoders.kryo[TValue] + + val dataPath = getFieldPath(field) + Seq(value).toDS.write.mode("overwrite").parquet(dataPath) + } + + def deserializeScalar[TValue: ClassTag](field: String): Option[TValue] = { + implicit val encoder = Encoders.kryo[TValue] + + val dataPath = getFieldPath(field) + val loaded = spark.sqlContext.read.format("parquet").load(dataPath) + loaded.as[TValue].collect.headOption + } + + def serializeArray[TValue: ClassTag](field: 
String, value: Array[TValue]): Unit = { + implicit val encoder = Encoders.kryo[TValue] + + val dataPath = getFieldPath(field) + value.toSeq.toDS.write.mode("overwrite").parquet(dataPath) + } + + def deserializeArray[TValue: ClassTag](field: String): Array[TValue] = { + implicit val encoder = Encoders.kryo[TValue] + + val dataPath = getFieldPath(field) + val loaded = spark.sqlContext.read.format("parquet").load(dataPath) + loaded.as[TValue].collect + } + + def serializeMap[TKey: ClassTag, TValue: ClassTag](field: String, value: Map[TKey, TValue]): Unit = { + implicit val valueEncoder = Encoders.kryo[Map[TKey, TValue]] + + val dataPath = getFieldPath(field) + Seq(value).toDF().write.mode("overwrite").parquet(dataPath) + } + + def deserializeMap[TKey: ClassTag, TValue: ClassTag](field: String): Map[TKey, TValue] = { + implicit val valueEncoder = Encoders.kryo[Map[TKey, TValue]] + + val dataPath = getFieldPath(field) + val loaded = spark.sqlContext.read.format("parquet").load(dataPath) + loaded.as[Map[TKey, TValue]] + .collect + .headOption + .getOrElse(Map[TKey, TValue]()) + } +} From f1ef6fbffb2e9eda10665d7c8ed911a1126eb67a Mon Sep 17 00:00:00 2001 From: aleksei Date: Thu, 28 Dec 2017 13:41:53 +0300 Subject: [PATCH 02/12] Check if file exists before reading --- .../serialization/SerializationHelper.scala | 44 ++++++++++++------- 1 file changed, 29 insertions(+), 15 deletions(-) diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/SerializationHelper.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/SerializationHelper.scala index b1d08bb4426fac..0c04b783fcb558 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/serialization/SerializationHelper.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/SerializationHelper.scala @@ -1,6 +1,6 @@ package com.johnsnowlabs.nlp.serialization -import org.apache.hadoop.fs.Path +import org.apache.hadoop.fs.{FileSystem, Path} import org.apache.spark.sql.{Encoders, SparkSession} import scala.reflect.ClassTag @@ -8,56 +8,70 @@ import scala.reflect.ClassTag case class SerializationHelper(spark: SparkSession, path: String) { import spark.sqlContext.implicits._ + val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) - private def getFieldPath(field: String) = - Path.mergePaths(new Path(path), new Path("/fields/" + field)).toString + private def getFieldPath(field: String): Path = + Path.mergePaths(new Path(path), new Path("/fields/" + field)) def serializeScalar[TValue: ClassTag](field: String, value: TValue): Unit = { implicit val encoder = Encoders.kryo[TValue] val dataPath = getFieldPath(field) - Seq(value).toDS.write.mode("overwrite").parquet(dataPath) + Seq(value).toDS.write.mode("overwrite").parquet(dataPath.toString) } def deserializeScalar[TValue: ClassTag](field: String): Option[TValue] = { implicit val encoder = Encoders.kryo[TValue] val dataPath = getFieldPath(field) - val loaded = spark.sqlContext.read.format("parquet").load(dataPath) - loaded.as[TValue].collect.headOption + if (fs.exists(dataPath)) { + val loaded = spark.sqlContext.read.format("parquet").load(dataPath.toString) + loaded.as[TValue].collect.headOption + } else { + None + } } def serializeArray[TValue: ClassTag](field: String, value: Array[TValue]): Unit = { implicit val encoder = Encoders.kryo[TValue] val dataPath = getFieldPath(field) - value.toSeq.toDS.write.mode("overwrite").parquet(dataPath) + value.toSeq.toDS.write.mode("overwrite").parquet(dataPath.toString) } def deserializeArray[TValue: ClassTag](field: String): Array[TValue] = { implicit val 
encoder = Encoders.kryo[TValue] val dataPath = getFieldPath(field) - val loaded = spark.sqlContext.read.format("parquet").load(dataPath) - loaded.as[TValue].collect + if (fs.exists(dataPath)) { + val loaded = spark.sqlContext.read.format("parquet").load(dataPath.toString) + loaded.as[TValue].collect + } else { + Array.empty + } } def serializeMap[TKey: ClassTag, TValue: ClassTag](field: String, value: Map[TKey, TValue]): Unit = { implicit val valueEncoder = Encoders.kryo[Map[TKey, TValue]] val dataPath = getFieldPath(field) - Seq(value).toDF().write.mode("overwrite").parquet(dataPath) + Seq(value).toDF().write.mode("overwrite").parquet(dataPath.toString) } def deserializeMap[TKey: ClassTag, TValue: ClassTag](field: String): Map[TKey, TValue] = { implicit val valueEncoder = Encoders.kryo[Map[TKey, TValue]] val dataPath = getFieldPath(field) - val loaded = spark.sqlContext.read.format("parquet").load(dataPath) - loaded.as[Map[TKey, TValue]] - .collect - .headOption - .getOrElse(Map[TKey, TValue]()) + + if (fs.exists(dataPath)) { + val loaded = spark.sqlContext.read.format("parquet").load(dataPath.toString) + loaded.as[Map[TKey, TValue]] + .collect + .headOption + .getOrElse(Map[TKey, TValue]()) + } else { + Map() + } } } From 96db778fbd64d4339a8b7b209c7688acaec59144 Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Sat, 6 Jan 2018 00:30:33 -0300 Subject: [PATCH 03/12] - Features container for improved serialization --- .../example/vivekn-sentiment/sentiment.ipynb | 2 +- .../johnsnowlabs/nlp/AnnotatorApproach.scala | 2 +- .../com/johnsnowlabs/nlp/AnnotatorModel.scala | 2 +- .../com/johnsnowlabs/nlp/HasFeatures.scala | 35 ++++++ .../nlp/ParamsAndFeaturesReadable.scala | 21 ++++ .../nlp/ParamsAndFeaturesWritable.scala | 21 ++++ .../nlp/annotators/Lemmatizer.scala | 65 ++++++++++- .../nlp/annotators/ner/crf/NerCrfModel.scala | 67 +++-------- .../sda/vivekn/ViveknSentimentModel.scala | 80 ++++--------- .../spell/norvig/NorvigSweetingModel.scala | 6 +- .../nlp/serialization/Feature.scala | 110 ++++++++++++++++++ .../serialization/SerializationHelper.scala | 77 ------------ .../ner/crf/NerCrfApproachSpec.scala | 10 +- 13 files changed, 299 insertions(+), 199 deletions(-) create mode 100644 src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala create mode 100644 src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala create mode 100644 src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala create mode 100644 src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala delete mode 100644 src/main/scala/com/johnsnowlabs/nlp/serialization/SerializationHelper.scala diff --git a/python/example/vivekn-sentiment/sentiment.ipynb b/python/example/vivekn-sentiment/sentiment.ipynb index 22653401696249..502b7c8fe91fca 100644 --- a/python/example/vivekn-sentiment/sentiment.ipynb +++ b/python/example/vivekn-sentiment/sentiment.ipynb @@ -12,7 +12,7 @@ "import sys\n", "sys.path.append('../../')\n", "\n", - "from pyspark.ml import Pipeline\n", + "from pyspark.ml import Pipeline, PipelineModel\n", "from sparknlp.annotator import *\n", "from sparknlp.base import DocumentAssembler, Finisher\n" ] diff --git a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala index c1df9007d55434..9d6448cb03ff92 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala @@ -18,7 +18,7 @@ abstract class AnnotatorApproach[M <: Model[M]] with HasInputAnnotationCols 
with HasOutputAnnotationCol with HasAnnotatorType - with DefaultParamsWritable { + with ParamsAndFeaturesWritable { val description: String diff --git a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala index 51b288451bf50c..7dad564e48bd3c 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala @@ -15,7 +15,7 @@ import org.apache.spark.sql.functions.{array, udf} */ abstract class AnnotatorModel[M <: Model[M]] extends Model[M] - with DefaultParamsWritable + with ParamsAndFeaturesWritable with HasAnnotatorType with HasInputAnnotationCols with HasOutputAnnotationCol { diff --git a/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala b/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala new file mode 100644 index 00000000000000..e4606de9dd3705 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala @@ -0,0 +1,35 @@ +package com.johnsnowlabs.nlp + +import com.johnsnowlabs.nlp.serialization.{ArrayFeature, Feature, MapFeature, StructFeature} + +import scala.collection.mutable.ArrayBuffer + +trait HasFeatures { + + val features: ArrayBuffer[Feature[_, _, _]] = ArrayBuffer.empty + + protected def set[T](feature: ArrayFeature[T], value: Array[T]): this.type = {feature.setValue(Some(value)); this} + + protected def set[K, V](feature: MapFeature[K, V], value: Map[K, V]): this.type = {feature.setValue(Some(value)); this} + + protected def set[T](feature: StructFeature[T], value: T): this.type = {feature.setValue(Some(value)); this} + + protected def setDefault[T](feature: ArrayFeature[T], value: Array[T]): this.type = {feature.setDefault(Some(value)); this} + + protected def setDefault[K, V](feature: MapFeature[K, V], value: Map[K, V]): this.type = {feature.setDefault(Some(value)); this} + + protected def setDefault[T](feature: StructFeature[T], value: T): this.type = {feature.setDefault(Some(value)); this} + + protected def get[T](feature: ArrayFeature[T]): Option[Array[T]] = feature.get + + protected def get[K, V](feature: MapFeature[K, V]): Option[Map[K, V]] = feature.get + + protected def get[T](feature: StructFeature[T]): Option[T] = feature.get + + protected def $$[T](feature: ArrayFeature[T]): Array[T] = feature.getValue + + protected def $$[K, V](feature: MapFeature[K, V]): Map[K, V] = feature.getValue + + protected def $$[T](feature: StructFeature[T]): T = feature.getValue + +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala new file mode 100644 index 00000000000000..55fce96668c8c7 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala @@ -0,0 +1,21 @@ +package com.johnsnowlabs.nlp + +import org.apache.spark.ml.util.{DefaultParamsReadable, MLReader} + +class FeaturesReader[T <: HasFeatures](baseReader: MLReader[T]) extends MLReader[T] { + + override def load(path: String): T = { + + val instance = baseReader.load(path) + + for (feature <- instance.features) { + val value = feature.deserialize(sparkSession, path, feature.name) + feature.setValue(value) + } + instance + } +} + +trait ParamsAndFeaturesReadable[T <: HasFeatures] extends DefaultParamsReadable[T] { + override def read: MLReader[T] = new FeaturesReader(super.read) +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala new file mode 100644 
index 00000000000000..976935c8e02774 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala @@ -0,0 +1,21 @@ +package com.johnsnowlabs.nlp + +import org.apache.spark.ml.param.Params +import org.apache.spark.ml.util.{DefaultParamsWritable, MLWriter} + +class FeaturesWriter[T](annotatorWithFeatures: HasFeatures, baseWriter: MLWriter) extends MLWriter with HasFeatures { + + override protected def saveImpl(path: String): Unit = { + baseWriter.save(path) + + for (feature <- annotatorWithFeatures.features) { + feature.serializeInfer(sparkSession, path, feature.name, feature.getValue) + } + } +} + +trait ParamsAndFeaturesWritable extends DefaultParamsWritable with Params with HasFeatures { + + override def write: MLWriter = new FeaturesWriter(this, super.write) + +} diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala index 3673e5564e25e5..fc9c08040498e9 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala @@ -6,7 +6,7 @@ import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} import com.typesafe.config.Config import com.johnsnowlabs.nlp.util.ConfigHelper import org.apache.spark.ml.param.Param -import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} +import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable, MLReader, MLWriter} import scala.collection.JavaConverters._ @@ -53,14 +53,22 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] { def this() = this(Identifiable.randomUID("LEMMATIZER")) def getLemmaDict: Map[String, String] = $(lemmaDict) + protected def getLemmaFormat: String = $(lemmaFormat) + protected def getLemmaKeySep: String = $(lemmaKeySep) + protected def getLemmaValSep: String = $(lemmaValSep) def setLemmaDict(dictionary: String): this.type = { set(lemmaDict, Lemmatizer.retrieveLemmaDict(dictionary, $(lemmaFormat), $(lemmaKeySep), $(lemmaValSep))) } - def setLemmaDictHMap(dictionary: java.util.HashMap[String, String]): this.type = { set(lemmaDict, dictionary.asScala.toMap) } + def setLemmaDictMap(dictionary: Map[String, String]): this.type = { + set(lemmaDict, dictionary) + } + def setLemmaFormat(value: String): this.type = set(lemmaFormat, value) + def setLemmaKeySep(value: String): this.type = set(lemmaKeySep, value) + def setLemmaValSep(value: String): this.type = set(lemmaValSep, value) /** * @return one to one annotation from token to a lemmatized word, if found on dictionary or leave the word as is @@ -80,11 +88,27 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] { } object Lemmatizer extends DefaultParamsReadable[Lemmatizer] { +/* + private val config: Config = ConfigHelper.retrieve + private val legacyRead = config.getBoolean("settings.legacyRead") + + private val lemmaDictKey = "lemmaDict" + private val lemmaFormatKey = "lemmaFormat" + private val lemmaKeySepKey = "lemmmaKeySep" + private val lemmaValSepKey = "lemmaValSep" + + override def read: MLReader[Lemmatizer] = { + if (legacyRead) + super.read + else + new Reader(super.read) + } /** * Retrieves Lemma dictionary from configured compiled source set in configuration * @return a Dictionary for lemmas */ +*/ protected def retrieveLemmaDict( lemmaFilePath: String, lemmaFormat: String, @@ -93,4 +117,41 @@ object Lemmatizer extends DefaultParamsReadable[Lemmatizer] { ): Map[String, String] = { 
ResourceHelper.flattenRevertValuesAsKeys(lemmaFilePath, lemmaFormat.toUpperCase, lemmaKeySep, lemmaValSep) } +/* + class Reader(baseReader: MLReader[Lemmatizer]) extends MLReader[Lemmatizer] { + + override def load(path: String): Lemmatizer = { + val helper = SerializationHelper(sparkSession, path) + val instance = baseReader.load(path) + + val lemmaDict = helper.deserializeMap[String, String](lemmaDictKey) + val lemmaFormat = helper.deserializeScalar[String](lemmaFormatKey) + .getOrElse(config.getString("nlp.lemmaDict.format")) + val lemmaKeySep = helper.deserializeScalar[String](lemmaKeySepKey) + .getOrElse(config.getString("nlp.lemmaDict.kvSeparator")) + val lemmaValSep = helper.deserializeScalar[String](lemmaValSepKey) + .getOrElse(config.getString("nlp.lemmaDict.vSeparator")) + + instance + .setLemmaDictMap(lemmaDict) + .setLemmaFormat(lemmaFormat) + .setLemmaKeySep(lemmaKeySep) + .setLemmaValSep(lemmaValSep) + } + } + + class Writer(model: Lemmatizer, baseWriter: MLWriter) extends MLWriter { + + override protected def saveImpl(path: String): Unit = { + baseWriter.save(path) + val helper = SerializationHelper(sparkSession, path) + + helper.serializeMap[String, String](lemmaDictKey, model.getLemmaDict) + helper.serializeScalar[String](lemmaFormatKey, model.getLemmaFormat) + helper.serializeScalar[String](lemmaKeySepKey, model.getLemmaKeySep) + helper.serializeScalar[String](lemmaValSepKey, model.getLemmaValSep) + + } + } +*/ } diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala index e82b7c0864e48f..aae32f444ef9a8 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala @@ -1,11 +1,11 @@ package com.johnsnowlabs.nlp.annotators.ner.crf -import com.johnsnowlabs.ml.crf.{LinearChainCrfModel, SerializedLinearChainCrfModel} +import com.johnsnowlabs.ml.crf.LinearChainCrfModel import com.johnsnowlabs.nlp.AnnotatorType._ import com.johnsnowlabs.nlp.annotators.common.{IndexedTaggedWord, NerTagged, PosTagged, TaggedSentence} import com.johnsnowlabs.nlp.annotators.common.Annotated.{NerTaggedSentence, PosTaggedSentence} -import com.johnsnowlabs.nlp.serialization.SerializationHelper -import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} +import com.johnsnowlabs.nlp.serialization.{MapFeature, StructFeature} +import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel, ParamsAndFeaturesReadable} import org.apache.spark.ml.param.StringArrayParam import org.apache.spark.ml.util._ @@ -19,18 +19,13 @@ class NerCrfModel(override val uid: String) def this() = this(Identifiable.randomUID("NER")) val entities = new StringArrayParam(this, "entities", "List of Entities to recognize") - var model: Option[LinearChainCrfModel] = None - var dictionaryFeatures = DictionaryFeatures(Seq.empty) + val model: StructFeature[LinearChainCrfModel] = new StructFeature[LinearChainCrfModel](this, "crfModel", "CRF Model") + val dictionaryFeatures: MapFeature[String, String] = new MapFeature[String, String](this, "dictionaryFeatures", "CRF Features dictionary") - def setModel(crf: LinearChainCrfModel): NerCrfModel = { - model = Some(crf) - this - } + def setModel(crf: LinearChainCrfModel): NerCrfModel = set(model, crf) - def setDictionaryFeatures(dictFeatures: DictionaryFeatures) = { - dictionaryFeatures = dictFeatures - this - } + def setDictionaryFeatures(dictFeatures: DictionaryFeatures): this.type = 
set(dictionaryFeatures, dictFeatures.dict) + setDefault(dictionaryFeatures, Map.empty[String, String]) def setEntities(toExtract: Array[String]): NerCrfModel = set(entities, toExtract) @@ -40,12 +35,12 @@ class NerCrfModel(override val uid: String) * @return sentences with recognized Named Entities */ def tag(sentences: Seq[PosTaggedSentence]): Seq[NerTaggedSentence] = { - require(model.isDefined, "model must be set before tagging") + require(model.isSet, "model must be set before tagging") - val crf = model.get + val crf = $$(model) sentences.map{sentence => - val instance = FeatureGenerator(dictionaryFeatures).generate(sentence, crf.metadata) + val instance = FeatureGenerator(new DictionaryFeatures($$(dictionaryFeatures))).generate(sentence, crf.metadata) val labelIds = crf.predict(instance) val words = sentence.indexedTaggedWords .zip(labelIds.labels) @@ -70,48 +65,12 @@ class NerCrfModel(override val uid: String) NerTagged.pack(taggedSentences) } - def shrink(minW: Float): NerCrfModel = { - model = model.map(m => m.shrink(minW)) - this - } + def shrink(minW: Float): NerCrfModel = set(model, $$(model).shrink(minW)) override val requiredAnnotatorTypes = Array(DOCUMENT, TOKEN, POS) override val annotatorType: AnnotatorType = NAMED_ENTITY - override def write: MLWriter = new NerCrfModel.NerCrfModelWriter(this, super.write) -} - -object NerCrfModel extends DefaultParamsReadable[NerCrfModel] { - override def read: MLReader[NerCrfModel] = new NerCrfModelReader(super.read) - - private val crfModelKey = "crfModel" - private val dictionaryFeaturesKey = "dictionaryFeatures" - - class NerCrfModelReader(baseReader: MLReader[NerCrfModel]) extends MLReader[NerCrfModel] { - override def load(path: String): NerCrfModel = { - val helper = SerializationHelper(sparkSession, path) - val instance = baseReader.load(path) - - val crfModel = helper.deserializeScalar[SerializedLinearChainCrfModel](crfModelKey) - instance.model = crfModel.map(m => m.deserialize) - - val map = helper.deserializeMap[String, String](dictionaryFeaturesKey) - instance.setDictionaryFeatures(new DictionaryFeatures(map)) - } - } - - class NerCrfModelWriter(model: NerCrfModel, baseWriter: MLWriter) extends MLWriter { - - override protected def saveImpl(path: String): Unit = { - require(model.model.isDefined, "Crf Model must be defined before serialization") - - baseWriter.save(path) - val helper = SerializationHelper(sparkSession, path) - - helper.serializeScalar(crfModelKey, model.model.get.serialize) - helper.serializeMap(dictionaryFeaturesKey, model.dictionaryFeatures.dict) - } - } } +object NerCrfModel extends ParamsAndFeaturesReadable[NerCrfModel] diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala index 4cdbd0b47d0d5e..ea16c0900f1b58 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala @@ -1,17 +1,18 @@ package com.johnsnowlabs.nlp.annotators.sda.vivekn import com.johnsnowlabs.nlp.annotators.common.{Tokenized, TokenizedSentence} -import com.johnsnowlabs.nlp.serialization.SerializationHelper -import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} -import com.typesafe.config.{Config, ConfigFactory} +import com.johnsnowlabs.nlp.serialization.{ArrayFeature, MapFeature} +import com.johnsnowlabs.nlp.util.ConfigHelper +import com.johnsnowlabs.nlp.{Annotation, 
AnnotatorModel, ParamsAndFeaturesReadable} +import com.typesafe.config.Config import org.apache.spark.ml.param.IntParam -import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable, MLReader, MLWriter} +import org.apache.spark.ml.util.Identifiable class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[ViveknSentimentModel] { import com.johnsnowlabs.nlp.AnnotatorType._ - private val config: Config = ConfigFactory.load + private val config: Config = ConfigHelper.retrieve private val importantFeatureRatio = config.getDouble("nlp.viveknSentiment.importantFeaturesRatio") private val unimportantFeatureStep = config.getDouble("nlp.viveknSentiment.unimportantFeaturesStepRatio") private val featureLimit = config.getInt("nlp.viveknSentiment.featuresLimit") @@ -20,20 +21,24 @@ class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[Vive override val requiredAnnotatorTypes: Array[AnnotatorType] = Array(TOKEN, DOCUMENT) - protected var positive = Map[String, Int]() - protected var negative = Map[String, Int]() - protected var features = Array[String]() + protected val positive: MapFeature[String, Int] = MapFeature(this, "positive_sentences", "positive sentences trained") + protected val negative: MapFeature[String, Int] = MapFeature(this, "negative_sentences", "negative sentences trained") + protected val words: ArrayFeature[String] = ArrayFeature(this, "words", "unique words trained") protected val positiveTotals: IntParam = new IntParam(this, "positive_totals", "count of positive words") protected val negativeTotals: IntParam = new IntParam(this, "negative_totals", "count of negative words") def this() = this(Identifiable.randomUID("VIVEKN")) - private[vivekn] def setPositive(value: Map[String, Int]) = {positive = value; this} - private[vivekn] def setNegative(value: Map[String, Int]) = {negative = value; this} - private[vivekn] def setPositiveTotals(value: Int) = set(positiveTotals, value) - private[vivekn] def setNegativeTotals(value: Int) = set(negativeTotals, value) - private[vivekn] def setWords(value: Array[String]) = { + private[vivekn] def getPositive: Map[String, Int] = $$(positive) + private[vivekn] def getNegative: Map[String, Int] = $$(negative) + private[vivekn] def getFeatures: Array[String] = $$(words) + + private[vivekn] def setPositive(value: Map[String, Int]): this.type = set(positive, value) + private[vivekn] def setNegative(value: Map[String, Int]): this.type = set(negative, value) + private[vivekn] def setPositiveTotals(value: Int): this.type = set(positiveTotals, value) + private[vivekn] def setNegativeTotals(value: Int): this.type = set(negativeTotals, value) + private[vivekn] def setWords(value: Array[String]): this.type = { require(value.nonEmpty, "Word analysis for features cannot be empty. 
Set prune to false if training is small") val currentFeatures = scala.collection.mutable.Set.empty[String] val start = (value.length * importantFeatureRatio).ceil.toInt @@ -47,15 +52,14 @@ class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[Vive value.slice(k, k+step).foreach(currentFeatures.add) }) - features = currentFeatures.toArray - this + set(words, currentFeatures.toArray) } def classify(sentence: TokenizedSentence): Boolean = { - val words = ViveknSentimentApproach.negateSequence(sentence.tokens.toList).intersect(features).distinct - if (words.isEmpty) return true - val positiveProbability = words.map(word => scala.math.log((positive.getOrElse(word, 0) + 1.0) / (2.0 * $(positiveTotals)))).sum - val negativeProbability = words.map(word => scala.math.log((negative.getOrElse(word, 0) + 1.0) / (2.0 * $(negativeTotals)))).sum + val wordFeatures = ViveknSentimentApproach.negateSequence(sentence.tokens.toList).intersect($$(words)).distinct + if (wordFeatures.isEmpty) return true + val positiveProbability = wordFeatures.map(word => scala.math.log(($$(positive).getOrElse(word, 0) + 1.0) / (2.0 * $(positiveTotals)))).sum + val negativeProbability = wordFeatures.map(word => scala.math.log(($$(negative).getOrElse(word, 0) + 1.0) / (2.0 * $(negativeTotals)))).sum positiveProbability > negativeProbability } @@ -81,42 +85,6 @@ class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[Vive }) } - override def write: MLWriter = new ViveknSentimentModel.Writer(this, super.write) } -object ViveknSentimentModel extends DefaultParamsReadable[ViveknSentimentModel] { - override def read = new Reader(super.read) - - private val positiveKey = "positive" - private val negativeKey = "negative" - private val featuresKey = "features" - - class Reader(baseReader: MLReader[ViveknSentimentModel]) extends MLReader[ViveknSentimentModel] { - - override def load(path: String): ViveknSentimentModel = { - val helper = SerializationHelper(sparkSession, path) - val instance = baseReader.load(path) - - val positive = helper.deserializeMap[String, Int](positiveKey) - val negative = helper.deserializeMap[String, Int](negativeKey) - val features = helper.deserializeArray[String](featuresKey) - - instance.features = features - instance - .setNegative(negative) - .setPositive(positive) - } - } - - class Writer(model: ViveknSentimentModel, baseWriter: MLWriter) extends MLWriter { - - override protected def saveImpl(path: String): Unit = { - baseWriter.save(path) - val helper = SerializationHelper(sparkSession, path) - - helper.serializeMap[String, Int](positiveKey, model.positive) - helper.serializeMap[String, Int](negativeKey, model.negative) - helper.serializeArray[String](featuresKey, model.features) - } - } -} \ No newline at end of file +object ViveknSentimentModel extends ParamsAndFeaturesReadable[ViveknSentimentModel] \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala index 4767a67bc55cd0..2cc1cf57b332ae 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala @@ -42,9 +42,11 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi def this() = this(Identifiable.randomUID("SPELL")) def setWordCount(value: Map[String, Int]): this.type = set(wordCount, value) - 
def setCustomDict(value: Map[String, String]): this.type = set(customDict, value) + protected def getWordCount: Map[String, Int] = $(wordCount) + protected def getCustomDict: Map[String, String] = $(customDict) + /** Utilities */ /** number of items duplicated in some text */ def cartesianProduct[T](xss: List[List[_]]): List[List[_]] = xss match { @@ -244,4 +246,4 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi } } -object NorvigSweetingModel extends DefaultParamsReadable[NorvigSweetingModel] +object NorvigSweetingModel extends DefaultParamsReadable[NorvigSweetingModel] \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala new file mode 100644 index 00000000000000..e574404ef0fdac --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala @@ -0,0 +1,110 @@ +package com.johnsnowlabs.nlp.serialization + +import com.johnsnowlabs.nlp.HasFeatures +import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.sql.types.{ArrayType, StringType} +import org.apache.spark.sql.{Encoder, Encoders, SparkSession} + +import scala.reflect.ClassTag + +abstract class Feature[Serializable1, Serializable2, TComplete](model: HasFeatures, val name: String, val description: String) extends Serializable { + model.features.append(this) + + final protected var value: Option[TComplete] = None + final protected var defaultValue: Option[TComplete] = None + + def serialize(spark: SparkSession, path: String, field: String, value: TComplete): Unit + + final def serializeInfer(spark: SparkSession, path: String, field: String, value: Any): Unit = + serialize(spark, path, field, value.asInstanceOf[TComplete]) + + def deserialize(spark: SparkSession, path: String, field: String): Option[_] + + final protected def getFieldPath(path: String, field: String): Path = + Path.mergePaths(new Path(path), new Path("/fields/" + field)) + + final def get: Option[TComplete] = value + final def getValue: TComplete = value.getOrElse(getDefault) + final def getDefault: TComplete = defaultValue.getOrElse(throw new Exception(s"Feature $name has no default value")) + final def setValue(v: Option[Any]): HasFeatures = {value = Some(v.get.asInstanceOf[TComplete]); model} + final def setDefault(v: Option[Any]): HasFeatures = {defaultValue = Some(v.get.asInstanceOf[TComplete]); model} + final def isSet: Boolean = value.isDefined + +} + +case class StructFeature[TValue: ClassTag](model: HasFeatures, override val name: String, override val description: String) + extends Feature[TValue, TValue, TValue](model, name, description) { + + implicit val encoder: Encoder[TValue] = Encoders.kryo[TValue] + + override def serialize(spark: SparkSession, path: String, field: String, value: TValue): Unit = { + import spark.sqlContext.implicits._ + val dataPath = getFieldPath(path, field) + Seq(value.asInstanceOf[TValue]).toDS.write.mode("overwrite").parquet(dataPath.toString) + } + + override def deserialize(spark: SparkSession, path: String, field: String): Option[TValue] = { + val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) + val dataPath = getFieldPath(path, field) + if (fs.exists(dataPath)) { + val loaded = spark.read.parquet(dataPath.toString) + import spark.implicits._ + loaded.schema.head.dataType match { + case ArrayType(StringType, _) => loaded.as[String].collect.headOption.map(_.asInstanceOf[TValue]) + case _ => 
loaded.as[TValue].collect.headOption + } + } else { + None + } + } + +} + +case class MapFeature[TKey: ClassTag, TValue: ClassTag](model: HasFeatures, override val name: String, override val description: String) + extends Feature[TKey, TValue, Map[TKey, TValue]](model, name, description) { + + implicit val encoder: Encoder[Map[TKey, TValue]] = Encoders.kryo[Map[TKey, TValue]] + + override def serialize(spark: SparkSession, path: String, field: String, value: Map[TKey, TValue]): Unit = { + import spark.sqlContext.implicits._ + val dataPath = getFieldPath(path, field) + Seq(value).toDS.write.mode("overwrite").parquet(dataPath.toString) + } + + override def deserialize(spark: SparkSession, path: String, field: String): Option[Map[TKey, TValue]] = { + val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) + val dataPath = getFieldPath(path, field) + if (fs.exists(dataPath)) { + val loaded = spark.read.parquet(dataPath.toString) + loaded.as[Map[TKey, TValue]].collect.headOption + } else { + None + } + } + +} + +case class ArrayFeature[TValue: ClassTag](model: HasFeatures, override val name: String, override val description: String) + extends Feature[TValue, TValue, Array[TValue]](model, name, description) { + + implicit val encoder: Encoder[TValue] = Encoders.kryo[TValue] + + override def serialize(spark: SparkSession, path: String, field: String, value: Array[TValue]): Unit = { + import spark.sqlContext.implicits._ + val dataPath = getFieldPath(path, field) + Seq(value.toSeq.toDS.write.mode("overwrite").parquet(dataPath.toString)) + } + + override def deserialize(spark: SparkSession, path: String, field: String): Option[Array[TValue]] = { + val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) + val dataPath = getFieldPath(path, field) + if (fs.exists(dataPath)) { + val loaded = spark.read.parquet(dataPath.toString) + Some(loaded.as[TValue].collect) + } else { + None + } + } + +} + diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/SerializationHelper.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/SerializationHelper.scala deleted file mode 100644 index 0c04b783fcb558..00000000000000 --- a/src/main/scala/com/johnsnowlabs/nlp/serialization/SerializationHelper.scala +++ /dev/null @@ -1,77 +0,0 @@ -package com.johnsnowlabs.nlp.serialization - -import org.apache.hadoop.fs.{FileSystem, Path} -import org.apache.spark.sql.{Encoders, SparkSession} - -import scala.reflect.ClassTag - - -case class SerializationHelper(spark: SparkSession, path: String) { - import spark.sqlContext.implicits._ - val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration) - - private def getFieldPath(field: String): Path = - Path.mergePaths(new Path(path), new Path("/fields/" + field)) - - - def serializeScalar[TValue: ClassTag](field: String, value: TValue): Unit = { - implicit val encoder = Encoders.kryo[TValue] - - val dataPath = getFieldPath(field) - Seq(value).toDS.write.mode("overwrite").parquet(dataPath.toString) - } - - def deserializeScalar[TValue: ClassTag](field: String): Option[TValue] = { - implicit val encoder = Encoders.kryo[TValue] - - val dataPath = getFieldPath(field) - if (fs.exists(dataPath)) { - val loaded = spark.sqlContext.read.format("parquet").load(dataPath.toString) - loaded.as[TValue].collect.headOption - } else { - None - } - } - - def serializeArray[TValue: ClassTag](field: String, value: Array[TValue]): Unit = { - implicit val encoder = Encoders.kryo[TValue] - - val dataPath = getFieldPath(field) - 
value.toSeq.toDS.write.mode("overwrite").parquet(dataPath.toString) - } - - def deserializeArray[TValue: ClassTag](field: String): Array[TValue] = { - implicit val encoder = Encoders.kryo[TValue] - - val dataPath = getFieldPath(field) - if (fs.exists(dataPath)) { - val loaded = spark.sqlContext.read.format("parquet").load(dataPath.toString) - loaded.as[TValue].collect - } else { - Array.empty - } - } - - def serializeMap[TKey: ClassTag, TValue: ClassTag](field: String, value: Map[TKey, TValue]): Unit = { - implicit val valueEncoder = Encoders.kryo[Map[TKey, TValue]] - - val dataPath = getFieldPath(field) - Seq(value).toDF().write.mode("overwrite").parquet(dataPath.toString) - } - - def deserializeMap[TKey: ClassTag, TValue: ClassTag](field: String): Map[TKey, TValue] = { - implicit val valueEncoder = Encoders.kryo[Map[TKey, TValue]] - - val dataPath = getFieldPath(field) - - if (fs.exists(dataPath)) { - val loaded = spark.sqlContext.read.format("parquet").load(dataPath.toString) - loaded.as[Map[TKey, TValue]] - .collect - .headOption - .getOrElse(Map[TKey, TValue]()) - } else { - Map() - } - } -} diff --git a/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala b/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala index 8739ec15c5ff7c..e57eb4b8548a7d 100644 --- a/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala +++ b/src/test/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfApproachSpec.scala @@ -16,14 +16,14 @@ class NerCrfApproachSpec extends FlatSpec { nerModel.write.overwrite.save("./test_crf_pipeline") val loadedNer = NerCrfModel.read.load("./test_crf_pipeline") - assert(nerModel.model.get.serialize == loadedNer.model.get.serialize) - assert(nerModel.dictionaryFeatures == loadedNer.dictionaryFeatures) + assert(nerModel.model.getValue.serialize == loadedNer.model.getValue.serialize) + assert(nerModel.dictionaryFeatures.getValue == loadedNer.dictionaryFeatures.getValue) } "NerCrfApproach" should "have correct set of labels" in { - assert(nerModel.model.isDefined) - val metadata = nerModel.model.get.metadata + assert(nerModel.model.isSet) + val metadata = nerModel.model.getValue.metadata assert(metadata.labels.toSeq == Seq("@#Start", "PER", "O", "ORG", "LOC")) } @@ -65,7 +65,7 @@ class NerCrfApproachSpec extends FlatSpec { "NerCrfModel" should "correctly handle entities param" in { val restrictedModel = new NerCrfModel() .setEntities(Array("PER", "LOC")) - .setModel(nerModel.model.get) + .setModel(nerModel.model.getValue) .setOutputCol(nerModel.getOutputCol) .setInputCols(nerModel.getInputCols) From 60cff1d9916fbd5a9197536d099896b0789fda7f Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Sat, 6 Jan 2018 00:45:26 -0300 Subject: [PATCH 04/12] - Removed bad comments --- .../nlp/annotators/Lemmatizer.scala | 60 +------------------ 1 file changed, 1 insertion(+), 59 deletions(-) diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala index fc9c08040498e9..657c27ac5f19ea 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala @@ -6,7 +6,7 @@ import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} import com.typesafe.config.Config import com.johnsnowlabs.nlp.util.ConfigHelper import org.apache.spark.ml.param.Param -import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable, MLReader, MLWriter} +import 
org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} import scala.collection.JavaConverters._ @@ -88,27 +88,6 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] { } object Lemmatizer extends DefaultParamsReadable[Lemmatizer] { -/* - private val config: Config = ConfigHelper.retrieve - private val legacyRead = config.getBoolean("settings.legacyRead") - - private val lemmaDictKey = "lemmaDict" - private val lemmaFormatKey = "lemmaFormat" - private val lemmaKeySepKey = "lemmmaKeySep" - private val lemmaValSepKey = "lemmaValSep" - - override def read: MLReader[Lemmatizer] = { - if (legacyRead) - super.read - else - new Reader(super.read) - } - - /** - * Retrieves Lemma dictionary from configured compiled source set in configuration - * @return a Dictionary for lemmas - */ -*/ protected def retrieveLemmaDict( lemmaFilePath: String, lemmaFormat: String, @@ -117,41 +96,4 @@ object Lemmatizer extends DefaultParamsReadable[Lemmatizer] { ): Map[String, String] = { ResourceHelper.flattenRevertValuesAsKeys(lemmaFilePath, lemmaFormat.toUpperCase, lemmaKeySep, lemmaValSep) } -/* - class Reader(baseReader: MLReader[Lemmatizer]) extends MLReader[Lemmatizer] { - - override def load(path: String): Lemmatizer = { - val helper = SerializationHelper(sparkSession, path) - val instance = baseReader.load(path) - - val lemmaDict = helper.deserializeMap[String, String](lemmaDictKey) - val lemmaFormat = helper.deserializeScalar[String](lemmaFormatKey) - .getOrElse(config.getString("nlp.lemmaDict.format")) - val lemmaKeySep = helper.deserializeScalar[String](lemmaKeySepKey) - .getOrElse(config.getString("nlp.lemmaDict.kvSeparator")) - val lemmaValSep = helper.deserializeScalar[String](lemmaValSepKey) - .getOrElse(config.getString("nlp.lemmaDict.vSeparator")) - - instance - .setLemmaDictMap(lemmaDict) - .setLemmaFormat(lemmaFormat) - .setLemmaKeySep(lemmaKeySep) - .setLemmaValSep(lemmaValSep) - } - } - - class Writer(model: Lemmatizer, baseWriter: MLWriter) extends MLWriter { - - override protected def saveImpl(path: String): Unit = { - baseWriter.save(path) - val helper = SerializationHelper(sparkSession, path) - - helper.serializeMap[String, String](lemmaDictKey, model.getLemmaDict) - helper.serializeScalar[String](lemmaFormatKey, model.getLemmaFormat) - helper.serializeScalar[String](lemmaKeySepKey, model.getLemmaKeySep) - helper.serializeScalar[String](lemmaValSepKey, model.getLemmaValSep) - - } - } -*/ } From 1bb498d34dae432f0648912c951bd87fd9440ce8 Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Sun, 7 Jan 2018 17:14:29 -0300 Subject: [PATCH 05/12] - Features as classes - Extended features to POS, Spell Checker and Lemmatizer --- .../nlp/annotators/Lemmatizer.scala | 14 +++++------ .../pos/perceptron/PerceptronModel.scala | 20 ++++++++-------- .../sda/vivekn/ViveknSentimentModel.scala | 6 ++--- .../spell/norvig/NorvigSweetingModel.scala | 24 +++++++++---------- .../nlp/serialization/Feature.scala | 6 ++--- 5 files changed, 35 insertions(+), 35 deletions(-) diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala index 657c27ac5f19ea..bcacba7d0addbf 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala @@ -1,12 +1,12 @@ package com.johnsnowlabs.nlp.annotators -import com.johnsnowlabs.nlp.annotators.common.StringMapParam +import com.johnsnowlabs.nlp.serialization.MapFeature import 
com.johnsnowlabs.nlp.util.io.ResourceHelper -import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} +import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel, ParamsAndFeaturesReadable} import com.typesafe.config.Config import com.johnsnowlabs.nlp.util.ConfigHelper import org.apache.spark.ml.param.Param -import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} +import org.apache.spark.ml.util.Identifiable import scala.collection.JavaConverters._ @@ -25,7 +25,7 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] { private val config: Config = ConfigHelper.retrieve - val lemmaDict: StringMapParam = new StringMapParam(this, "lemmaDict", "provide a lemma dictionary") + val lemmaDict: MapFeature[String, String] = new MapFeature(this, "lemmaDict", "provide a lemma dictionary") val lemmaFormat: Param[String] = new Param[String](this, "lemmaFormat", "TXT or TXTDS for reading dictionary as dataset") @@ -52,7 +52,7 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] { def this() = this(Identifiable.randomUID("LEMMATIZER")) - def getLemmaDict: Map[String, String] = $(lemmaDict) + def getLemmaDict: Map[String, String] = $$(lemmaDict) protected def getLemmaFormat: String = $(lemmaFormat) protected def getLemmaKeySep: String = $(lemmaKeySep) protected def getLemmaValSep: String = $(lemmaValSep) @@ -80,14 +80,14 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] { annotatorType, tokenAnnotation.begin, tokenAnnotation.end, - $(lemmaDict).getOrElse(token, token), + $$(lemmaDict).getOrElse(token, token), tokenAnnotation.metadata ) } } } -object Lemmatizer extends DefaultParamsReadable[Lemmatizer] { +object Lemmatizer extends ParamsAndFeaturesReadable[Lemmatizer] { protected def retrieveLemmaDict( lemmaFilePath: String, lemmaFormat: String, diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronModel.scala index 8b614548fbce84..f66daa3011a0da 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronModel.scala @@ -1,9 +1,9 @@ package com.johnsnowlabs.nlp.annotators.pos.perceptron import com.johnsnowlabs.nlp.annotators.common._ -import com.johnsnowlabs.nlp.annotators.param.AnnotatorParam -import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} -import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} +import com.johnsnowlabs.nlp.serialization.StructFeature +import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel, ParamsAndFeaturesReadable} +import org.apache.spark.ml.util.Identifiable /** * Part of speech tagger that might use different approaches @@ -18,8 +18,8 @@ class PerceptronModel(override val uid: String) extends AnnotatorModel[Perceptro /** Internal structure for target sentences holding their range information which is used for annotation */ private case class SentenceToBeTagged(tokenizedSentence: TokenizedSentence, start: Int, end: Int) - val model: AnnotatorParam[AveragedPerceptron, SerializedPerceptronModel] = - new AnnotatorParam[AveragedPerceptron, SerializedPerceptronModel](this, "POS Model", "POS Tagging approach") + val model: StructFeature[AveragedPerceptron] = + new StructFeature[AveragedPerceptron](this, "POS Model", "POS Tagging approach") override val annotatorType: AnnotatorType = POS @@ -35,16 +35,16 @@ class PerceptronModel(override val uid: 
String) extends AnnotatorModel[Perceptro */ def tag(tokenizedSentences: Array[TokenizedSentence]): Array[TaggedSentence] = { logger.debug(s"PREDICTION: Tagging:\nSENT: <<${tokenizedSentences.map(_.condense).mkString(">>\nSENT<<")}>> model weight properties in 'bias' " + - s"feature:\nPREDICTION: ${$(model).getWeights("bias").mkString("\nPREDICTION: ")}") + s"feature:\nPREDICTION: ${$$(model).getWeights("bias").mkString("\nPREDICTION: ")}") var prev = START(0) var prev2 = START(1) tokenizedSentences.map(sentence => { val context: Array[String] = START ++: sentence.tokens.map(normalized) ++: END sentence.indexedTokens.zipWithIndex.map { case (IndexedToken(word, begin, end), i) => - val tag = $(model).getTagBook.find(_.word == word.toLowerCase).map(_.tag).getOrElse( + val tag = $$(model).getTagBook.find(_.word == word.toLowerCase).map(_.tag).getOrElse( { val features = getFeatures(i, word, context, prev, prev2) - $(model).predict(features) + $$(model).predict(features) } ) prev2 = prev @@ -56,7 +56,7 @@ class PerceptronModel(override val uid: String) extends AnnotatorModel[Perceptro def this() = this(Identifiable.randomUID("POS")) - def getModel: AveragedPerceptron = $(model) + def getModel: AveragedPerceptron = $$(model) def setModel(targetModel: AveragedPerceptron): this.type = set(model, targetModel) @@ -68,4 +68,4 @@ class PerceptronModel(override val uid: String) extends AnnotatorModel[Perceptro } } -object PerceptronModel extends DefaultParamsReadable[PerceptronModel] \ No newline at end of file +object PerceptronModel extends ParamsAndFeaturesReadable[PerceptronModel] \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala index ea16c0900f1b58..b6bed97b1b56c0 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala @@ -21,9 +21,9 @@ class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[Vive override val requiredAnnotatorTypes: Array[AnnotatorType] = Array(TOKEN, DOCUMENT) - protected val positive: MapFeature[String, Int] = MapFeature(this, "positive_sentences", "positive sentences trained") - protected val negative: MapFeature[String, Int] = MapFeature(this, "negative_sentences", "negative sentences trained") - protected val words: ArrayFeature[String] = ArrayFeature(this, "words", "unique words trained") + protected val positive: MapFeature[String, Int] = new MapFeature(this, "positive_sentences", "positive sentences trained") + protected val negative: MapFeature[String, Int] = new MapFeature(this, "negative_sentences", "negative sentences trained") + protected val words: ArrayFeature[String] = new ArrayFeature(this, "words", "unique words trained") protected val positiveTotals: IntParam = new IntParam(this, "positive_totals", "count of positive words") protected val negativeTotals: IntParam = new IntParam(this, "negative_totals", "count of negative words") diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala index 2cc1cf57b332ae..c3bbb009d10b44 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala @@ -1,9 +1,9 @@ 
package com.johnsnowlabs.nlp.annotators.spell.norvig -import com.johnsnowlabs.nlp.annotators.common.{IntStringMapParam, StringMapParam} -import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} +import com.johnsnowlabs.nlp.serialization.MapFeature +import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel, ParamsAndFeaturesReadable} import com.typesafe.config.{Config, ConfigFactory} -import org.apache.spark.ml.util.{DefaultParamsReadable, Identifiable} +import org.apache.spark.ml.util.Identifiable import org.slf4j.LoggerFactory import scala.collection.immutable.HashSet @@ -22,8 +22,8 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi private val alphabet = "abcdefghijjklmnopqrstuvwxyz".toCharArray private val vowels = "aeiouy".toCharArray - protected val wordCount: IntStringMapParam = new IntStringMapParam(this, "word_count", "word count") - protected val customDict: StringMapParam = new StringMapParam(this, "custom_dict", "custom dict") + protected val wordCount: MapFeature[String, Int] = new MapFeature(this, "wordCount", "words frequency in training") + protected val customDict: MapFeature[String, String] = new MapFeature(this, "customDict", "custom words in dictionary") private val logger = LoggerFactory.getLogger("NorvigApproach") private val config: Config = ConfigFactory.load @@ -36,7 +36,7 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi private val vowelSwapLimit = config.getInt("nlp.norvigChecker.vowelSwapLimit") private lazy val allWords: HashSet[String] = { - if ($(caseSensitive)) HashSet($(wordCount).keys.toSeq:_*) else HashSet($(wordCount).keys.toSeq.map(_.toLowerCase):_*) + if ($(caseSensitive)) HashSet($$(wordCount).keys.toSeq:_*) else HashSet($$(wordCount).keys.toSeq.map(_.toLowerCase):_*) } def this() = this(Identifiable.randomUID("SPELL")) @@ -44,8 +44,8 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi def setWordCount(value: Map[String, Int]): this.type = set(wordCount, value) def setCustomDict(value: Map[String, String]): this.type = set(customDict, value) - protected def getWordCount: Map[String, Int] = $(wordCount) - protected def getCustomDict: Map[String, String] = $(customDict) + protected def getWordCount: Map[String, Int] = $$(wordCount) + protected def getCustomDict: Map[String, String] = $$(customDict) /** Utilities */ /** number of items duplicated in some text */ @@ -100,7 +100,7 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi wordCount.getOrElse(word, 0) } - private def compareFrequencies(value: String): Int = frequency(value, $(wordCount)) + private def compareFrequencies(value: String): Int = frequency(value, $$(wordCount)) private def compareHammers(input: String)(value: String): Int = hammingDistance(input, value) /** Posibilities analysis */ @@ -169,9 +169,9 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi if (allWords.contains(word)) { logger.debug("Word found in dictionary. No spell change") Some(word) - } else if ($(customDict).contains(word)) { + } else if ($$(customDict).contains(word)) { logger.debug("Word custom dictionary found. 
Replacing") - Some($(customDict)(word)) + Some($$(customDict)(word)) } else if (allWords.contains(word.distinct)) { logger.debug("Word as distinct found in dictionary") Some(word.distinct) @@ -246,4 +246,4 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi } } -object NorvigSweetingModel extends DefaultParamsReadable[NorvigSweetingModel] \ No newline at end of file +object NorvigSweetingModel extends ParamsAndFeaturesReadable[NorvigSweetingModel] \ No newline at end of file diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala index e574404ef0fdac..a1903d063d7b1c 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala @@ -32,7 +32,7 @@ abstract class Feature[Serializable1, Serializable2, TComplete](model: HasFeatur } -case class StructFeature[TValue: ClassTag](model: HasFeatures, override val name: String, override val description: String) +class StructFeature[TValue: ClassTag](model: HasFeatures, override val name: String, override val description: String) extends Feature[TValue, TValue, TValue](model, name, description) { implicit val encoder: Encoder[TValue] = Encoders.kryo[TValue] @@ -60,7 +60,7 @@ case class StructFeature[TValue: ClassTag](model: HasFeatures, override val name } -case class MapFeature[TKey: ClassTag, TValue: ClassTag](model: HasFeatures, override val name: String, override val description: String) +class MapFeature[TKey: ClassTag, TValue: ClassTag](model: HasFeatures, override val name: String, override val description: String) extends Feature[TKey, TValue, Map[TKey, TValue]](model, name, description) { implicit val encoder: Encoder[Map[TKey, TValue]] = Encoders.kryo[Map[TKey, TValue]] @@ -84,7 +84,7 @@ case class MapFeature[TKey: ClassTag, TValue: ClassTag](model: HasFeatures, over } -case class ArrayFeature[TValue: ClassTag](model: HasFeatures, override val name: String, override val description: String) +class ArrayFeature[TValue: ClassTag](model: HasFeatures, override val name: String, override val description: String) extends Feature[TValue, TValue, Array[TValue]](model, name, description) { implicit val encoder: Encoder[TValue] = Encoders.kryo[TValue] From b0c2d77ca2ca1b0ff62a9e8e63b01d26c746b7ac Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Sun, 7 Jan 2018 19:17:07 -0300 Subject: [PATCH 06/12] - Testing broadcast --- .../scala/com/johnsnowlabs/nlp/HasFeatures.scala | 6 ------ .../johnsnowlabs/nlp/annotators/Lemmatizer.scala | 2 +- .../nlp/annotators/ner/crf/NerCrfModel.scala | 2 +- .../johnsnowlabs/nlp/serialization/Feature.scala | 16 +++++++--------- 4 files changed, 9 insertions(+), 17 deletions(-) diff --git a/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala b/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala index e4606de9dd3705..81ba7a9063b929 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala @@ -14,12 +14,6 @@ trait HasFeatures { protected def set[T](feature: StructFeature[T], value: T): this.type = {feature.setValue(Some(value)); this} - protected def setDefault[T](feature: ArrayFeature[T], value: Array[T]): this.type = {feature.setDefault(Some(value)); this} - - protected def setDefault[K, V](feature: MapFeature[K, V], value: Map[K, V]): this.type = {feature.setDefault(Some(value)); this} - - protected def setDefault[T](feature: StructFeature[T], value: T): 
this.type = {feature.setDefault(Some(value)); this} - protected def get[T](feature: ArrayFeature[T]): Option[Array[T]] = feature.get protected def get[K, V](feature: MapFeature[K, V]): Option[Map[K, V]] = feature.get diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala index bcacba7d0addbf..6fd8979e9503e3 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala @@ -40,7 +40,7 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] { setDefault(lemmaValSep, config.getString("nlp.lemmaDict.vSeparator")) if (config.getString("nlp.lemmaDict.file").nonEmpty) - setDefault(lemmaDict, Lemmatizer.retrieveLemmaDict( + set(lemmaDict, Lemmatizer.retrieveLemmaDict( config.getString("nlp.lemmaDict.file"), config.getString("nlp.lemmaDict.format"), config.getString("nlp.lemmaDict.kvSeparator"), diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala index aae32f444ef9a8..35f260af3bfad4 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala @@ -25,7 +25,7 @@ class NerCrfModel(override val uid: String) def setModel(crf: LinearChainCrfModel): NerCrfModel = set(model, crf) def setDictionaryFeatures(dictFeatures: DictionaryFeatures): this.type = set(dictionaryFeatures, dictFeatures.dict) - setDefault(dictionaryFeatures, Map.empty[String, String]) + set(dictionaryFeatures, Map.empty[String, String]) def setEntities(toExtract: Array[String]): NerCrfModel = set(entities, toExtract) diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala index a1903d063d7b1c..51085abda8fb9b 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala @@ -2,16 +2,16 @@ package com.johnsnowlabs.nlp.serialization import com.johnsnowlabs.nlp.HasFeatures import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.broadcast.Broadcast import org.apache.spark.sql.types.{ArrayType, StringType} import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import scala.reflect.ClassTag -abstract class Feature[Serializable1, Serializable2, TComplete](model: HasFeatures, val name: String, val description: String) extends Serializable { +abstract class Feature[Serializable1, Serializable2, TComplete](model: HasFeatures, val name: String, val description: String)(implicit val sparkSession: SparkSession = SparkSession.builder().getOrCreate()) extends Serializable { model.features.append(this) - final protected var value: Option[TComplete] = None - final protected var defaultValue: Option[TComplete] = None + final protected var value: Broadcast[Option[TComplete]] = sparkSession.sparkContext.broadcast[Option[TComplete]](None) def serialize(spark: SparkSession, path: String, field: String, value: TComplete): Unit @@ -23,12 +23,10 @@ abstract class Feature[Serializable1, Serializable2, TComplete](model: HasFeatur final protected def getFieldPath(path: String, field: String): Path = Path.mergePaths(new Path(path), new Path("/fields/" + field)) - final def get: Option[TComplete] = value - final def getValue: TComplete = value.getOrElse(getDefault) - final def 
getDefault: TComplete = defaultValue.getOrElse(throw new Exception(s"Feature $name has no default value")) - final def setValue(v: Option[Any]): HasFeatures = {value = Some(v.get.asInstanceOf[TComplete]); model} - final def setDefault(v: Option[Any]): HasFeatures = {defaultValue = Some(v.get.asInstanceOf[TComplete]); model} - final def isSet: Boolean = value.isDefined + final def get: Option[TComplete] = value.value + final def getValue: TComplete = value.value.getOrElse(throw new Exception(s"feature $name is not set")) + final def setValue(v: Option[Any]): HasFeatures = {value.unpersist(false); value = sparkSession.sparkContext.broadcast(Some(v.get.asInstanceOf[TComplete])); model} + final def isSet: Boolean = value.value.isDefined } From b52f518da1aab013df3f02325307a40551137df2 Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Sun, 7 Jan 2018 20:11:49 -0300 Subject: [PATCH 07/12] - Switch option order --- .../example/vivekn-sentiment/sentiment.ipynb | 248 ++++++++++++++---- .../com/johnsnowlabs/nlp/HasFeatures.scala | 6 + .../nlp/annotators/Lemmatizer.scala | 2 +- .../nlp/annotators/ner/crf/NerCrfModel.scala | 2 +- .../nlp/serialization/Feature.scala | 16 +- 5 files changed, 213 insertions(+), 61 deletions(-) diff --git a/python/example/vivekn-sentiment/sentiment.ipynb b/python/example/vivekn-sentiment/sentiment.ipynb index 502b7c8fe91fca..d5d5fdd80f60d6 100644 --- a/python/example/vivekn-sentiment/sentiment.ipynb +++ b/python/example/vivekn-sentiment/sentiment.ipynb @@ -2,13 +2,14 @@ "cells": [ { "cell_type": "code", - "execution_count": null, + "execution_count": 1, "metadata": { "collapsed": true }, "outputs": [], "source": [ "#Imports\n", + "import time\n", "import sys\n", "sys.path.append('../../')\n", "\n", @@ -19,27 +20,42 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], - "source": [ - "from pyspark.sql import SparkSession\n", - "\n", - "spark = SparkSession.builder \\\n", - " .master(\"local[2]\") \\\n", - " .config(\"spark.jar\", \"lib/sparknlp.jar\") \\\n", - " .config(\"spark.driver.memory\", \"5g\")\\\n", - " .config(\"spark.dirver.maxResultSize\", \"2g\")\\\n", - " .getOrCreate()" - ] - }, - { - "cell_type": "code", - "execution_count": null, + "execution_count": 2, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+------+---------+--------------------+\n", + "|itemid|sentiment| text|\n", + "+------+---------+--------------------+\n", + "| 1| 0| ...|\n", + "| 2| 0| ...|\n", + "| 3| 1| omg...|\n", + "| 4| 0| .. Omga...|\n", + "| 5| 0| i think ...|\n", + "| 6| 0| or i jus...|\n", + "| 7| 1| Juuuuuuuuu...|\n", + "| 8| 0| Sunny Agai...|\n", + "| 9| 1| handed in m...|\n", + "| 10| 1| hmmmm.... i...|\n", + "| 11| 0| I must thin...|\n", + "| 12| 1| thanks to a...|\n", + "| 13| 0| this weeken...|\n", + "| 14| 0| jb isnt show...|\n", + "| 15| 0| ok thats it ...|\n", + "| 16| 0| <-------- ...|\n", + "| 17| 0| awhhe man.......|\n", + "| 18| 1| Feeling stran...|\n", + "| 19| 0| HUGE roll of ...|\n", + "| 20| 0| I just cut my...|\n", + "+------+---------+--------------------+\n", + "only showing top 20 rows\n", + "\n" + ] + } + ], "source": [ "#Load the input data to be annotated\n", "data = spark. 
\\\n", @@ -53,7 +69,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 3, "metadata": { "collapsed": true }, @@ -69,7 +85,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 4, "metadata": { "collapsed": true }, @@ -84,7 +100,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "metadata": { "collapsed": true }, @@ -99,7 +115,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 6, "metadata": { "collapsed": true }, @@ -112,7 +128,7 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 7, "metadata": { "collapsed": true }, @@ -121,14 +137,17 @@ "### Spell Checker\n", "spell_checker = NorvigSweetingApproach() \\\n", " .setInputCols([\"normal\"]) \\\n", - " .setOutputCol(\"spell\")\n", + " .setOutputCol(\"spell\") \\\n", + " .setCorpusPath(\"./corpus/pos\")\n", + " #.setDictPath(\"/home/saif/IdeaProjects/spark-nlp/src/main/resources/spell/words.txt\") \\\n", + " #.setCorpusFormat(\"TXTDS\")\n", "\n", "#checked = spell_checker.fit(tokenized).transform(tokenized)" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 8, "metadata": { "collapsed": true }, @@ -137,14 +156,29 @@ "sentiment_detector = ViveknSentimentApproach() \\\n", " .setInputCols([\"spell\", \"sentence\"]) \\\n", " .setOutputCol(\"sentiment\") \\\n", - " .setPositiveSource(\"../../../src/test/resources/vivekn/positive\") \\\n", - " .setNegativeSource(\"../../../src/test/resources/vivekn/negative\") \\\n", - " .setPruneCorpus(False)\n" + " .setPruneCorpus(False) \\\n", + " .setPositiveSource(\"./corpus/pos\") \\\n", + " .setNegativeSource(\"./corpus/neg\")\n", + " #.setPositiveSource(\"../../../src/test/resources/vivekn/positive\") \\\n", + " #.setNegativeSource(\"../../../src/test/resources/vivekn/negative\") \\\n" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 9, + "metadata": { + "collapsed": true + }, + "outputs": [], + "source": [ + "pos = PerceptronApproach() \\\n", + " .setInputCols([\"sentence\", \"spell\"]) \\\n", + " .setOutputCol(\"pos\")" + ] + }, + { + "cell_type": "code", + "execution_count": 10, "metadata": { "collapsed": true }, @@ -157,9 +191,43 @@ }, { "cell_type": "code", - "execution_count": null, + "execution_count": 11, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+------+--------------------+--------------------+\n", + "|itemid| text| finished_sentiment|\n", + "+------+--------------------+--------------------+\n", + "| 1| ...| result->negative|\n", + "| 2| ...| result->negative|\n", + "| 3| omg...| result->negative|\n", + "| 4| .. Omga...|result->positive@...|\n", + "| 5| i think ...|result->negative@...|\n", + "| 6| or i jus...| result->negative|\n", + "| 7| Juuuuuuuuu...| result->negative|\n", + "| 8| Sunny Agai...| result->negative|\n", + "| 9| handed in m...|result->negative@...|\n", + "| 10| hmmmm.... 
i...|result->negative@...|\n", + "| 11| I must thin...| result->negative|\n", + "| 12| thanks to a...| result->negative|\n", + "| 13| this weeken...| result->negative|\n", + "| 14| jb isnt show...| result->negative|\n", + "| 15| ok thats it ...| result->negative|\n", + "| 16| <-------- ...|result->positive@...|\n", + "| 17| awhhe man.......|result->negative@...|\n", + "| 18| Feeling stran...|result->negative@...|\n", + "| 19| HUGE roll of ...|result->negative@...|\n", + "| 20| I just cut my...|result->negative@...|\n", + "+------+--------------------+--------------------+\n", + "only showing top 20 rows\n", + "\n", + "Time elapsed pipeline process: 34.804439544677734\n" + ] + } + ], "source": [ "pipeline = Pipeline(stages=[\n", " document_assembler,\n", @@ -168,18 +236,34 @@ " normalizer,\n", " spell_checker,\n", " sentiment_detector,\n", + " pos,\n", " finisher\n", "])\n", "\n", + "start = time.time()\n", "sentiment_data = pipeline.fit(data).transform(data)\n", - "sentiment_data.show()" + "sentiment_data.show()\n", + "end = time.time()\n", + "print(\"Time elapsed pipeline process: \" + str(end - start))" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 12, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Row(itemid=1, text=' is so sad for my APL friend.............', finished_sentiment='result->negative')\n", + "Row(itemid=2, text=' I missed the New Moon trailer...', finished_sentiment='result->negative')\n", + "Row(itemid=3, text=' omg its already 7:30 :O', finished_sentiment='result->negative')\n", + "Row(itemid=4, text=\" .. Omgaga. Im sooo im gunna CRy. I've been at this dentist since 11.. I was suposed 2 just get a crown put on (30mins)...\", finished_sentiment='result->positive@result->negative@result->negative@result->negative')\n", + "Row(itemid=5, text=' i think mi bf is cheating on me!!! 
T_T', finished_sentiment='result->negative@result->negative')\n" + ] + } + ], "source": [ "for r in sentiment_data.take(5):\n", " print(r)" @@ -187,35 +271,93 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Time elapsed in write pipelines: 40.02541160583496\n" + ] + } + ], "source": [ + "start = time.time()\n", "pipeline.write().overwrite().save(\"./ps\")\n", - "pipeline.fit(data).write().overwrite().save(\"./ms\")" + "pipeline.fit(data).write().overwrite().save(\"./ms\")\n", + "end = time.time()\n", + "print(\"Time elapsed in write pipelines: \" + str(end - start))" ] }, { "cell_type": "code", - "execution_count": null, - "metadata": { - "collapsed": true - }, - "outputs": [], + "execution_count": 16, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Time elapsed in read pipelines: 3.8598501682281494\n" + ] + } + ], "source": [ - "from pyspark.ml import Pipeline,PipelineModel" + "start = time.time()\n", + "p = Pipeline.read().load(\"./ps\")\n", + "pm = PipelineModel.read().load(\"./ms\")\n", + "end = time.time()\n", + "print(\"Time elapsed in read pipelines: \" + str(end - start))" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 17, "metadata": {}, - "outputs": [], + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "+------+--------------------+--------------------+\n", + "|itemid| text| finished_sentiment|\n", + "+------+--------------------+--------------------+\n", + "| 1| ...| result->negative|\n", + "| 2| ...| result->negative|\n", + "| 3| omg...| result->negative|\n", + "| 4| .. Omga...|result->positive@...|\n", + "| 5| i think ...|result->negative@...|\n", + "| 6| or i jus...| result->negative|\n", + "| 7| Juuuuuuuuu...| result->negative|\n", + "| 8| Sunny Agai...| result->negative|\n", + "| 9| handed in m...|result->negative@...|\n", + "| 10| hmmmm.... 
i...|result->negative@...|\n", + "| 11| I must thin...| result->negative|\n", + "| 12| thanks to a...| result->negative|\n", + "| 13| this weeken...| result->negative|\n", + "| 14| jb isnt show...| result->negative|\n", + "| 15| ok thats it ...| result->negative|\n", + "| 16| <-------- ...|result->positive@...|\n", + "| 17| awhhe man.......|result->negative@...|\n", + "| 18| Feeling stran...|result->negative@...|\n", + "| 19| HUGE roll of ...|result->negative@...|\n", + "| 20| I just cut my...|result->negative@...|\n", + "+------+--------------------+--------------------+\n", + "only showing top 20 rows\n", + "\n", + "1000\n", + "Time elapsed in using loaded pipelines: 5.094742298126221\n" + ] + } + ], "source": [ - "Pipeline.read().load(\"./ps\")\n", - "PipelineModel.read().load(\"./ms\")" + "start = time.time()\n", + "#pm.transform(data).where(\"finished_sentiment not like '%negative%'\").show()\n", + "pm.transform(data).show()\n", + "print(pm.transform(data).count())\n", + "end = time.time()\n", + "print(\"Time elapsed in using loaded pipelines: \" + str(end - start))" ] }, { diff --git a/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala b/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala index 81ba7a9063b929..b8738116d7a385 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala @@ -14,6 +14,12 @@ trait HasFeatures { protected def set[T](feature: StructFeature[T], value: T): this.type = {feature.setValue(Some(value)); this} + protected def setDefault[T](feature: ArrayFeature[T], value: Array[T]): this.type = {feature.setValue(Some(value)); this} + + protected def setDefault[K, V](feature: MapFeature[K, V], value: Map[K, V]): this.type = {feature.setValue(Some(value)); this} + + protected def setDefault[T](feature: StructFeature[T], value: T): this.type = {feature.setValue(Some(value)); this} + protected def get[T](feature: ArrayFeature[T]): Option[Array[T]] = feature.get protected def get[K, V](feature: MapFeature[K, V]): Option[Map[K, V]] = feature.get diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala index 6fd8979e9503e3..bcacba7d0addbf 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala @@ -40,7 +40,7 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] { setDefault(lemmaValSep, config.getString("nlp.lemmaDict.vSeparator")) if (config.getString("nlp.lemmaDict.file").nonEmpty) - set(lemmaDict, Lemmatizer.retrieveLemmaDict( + setDefault(lemmaDict, Lemmatizer.retrieveLemmaDict( config.getString("nlp.lemmaDict.file"), config.getString("nlp.lemmaDict.format"), config.getString("nlp.lemmaDict.kvSeparator"), diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala index 35f260af3bfad4..aae32f444ef9a8 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala @@ -25,7 +25,7 @@ class NerCrfModel(override val uid: String) def setModel(crf: LinearChainCrfModel): NerCrfModel = set(model, crf) def setDictionaryFeatures(dictFeatures: DictionaryFeatures): this.type = set(dictionaryFeatures, dictFeatures.dict) - set(dictionaryFeatures, Map.empty[String, String]) + setDefault(dictionaryFeatures, 
Map.empty[String, String]) def setEntities(toExtract: Array[String]): NerCrfModel = set(entities, toExtract) diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala index 51085abda8fb9b..2d4c1d6f03a578 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala @@ -8,10 +8,10 @@ import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import scala.reflect.ClassTag -abstract class Feature[Serializable1, Serializable2, TComplete](model: HasFeatures, val name: String, val description: String)(implicit val sparkSession: SparkSession = SparkSession.builder().getOrCreate()) extends Serializable { +abstract class Feature[Serializable1, Serializable2, TComplete: ClassTag](model: HasFeatures, val name: String, val description: String)(implicit val sparkSession: SparkSession = SparkSession.builder().getOrCreate()) extends Serializable { model.features.append(this) - final protected var value: Broadcast[Option[TComplete]] = sparkSession.sparkContext.broadcast[Option[TComplete]](None) + final protected var value: Option[Broadcast[TComplete]] = None def serialize(spark: SparkSession, path: String, field: String, value: TComplete): Unit @@ -23,10 +23,14 @@ abstract class Feature[Serializable1, Serializable2, TComplete](model: HasFeatur final protected def getFieldPath(path: String, field: String): Path = Path.mergePaths(new Path(path), new Path("/fields/" + field)) - final def get: Option[TComplete] = value.value - final def getValue: TComplete = value.value.getOrElse(throw new Exception(s"feature $name is not set")) - final def setValue(v: Option[Any]): HasFeatures = {value.unpersist(false); value = sparkSession.sparkContext.broadcast(Some(v.get.asInstanceOf[TComplete])); model} - final def isSet: Boolean = value.value.isDefined + final def get: Option[TComplete] = value.map(_.value) + final def getValue: TComplete = value.map(_.value).getOrElse(throw new Exception(s"feature $name is not set")) + final def setValue(v: Option[Any]): HasFeatures = { + if (isSet) value.get.destroy() + value = Some(sparkSession.sparkContext.broadcast[TComplete](v.get.asInstanceOf[TComplete])) + model + } + final def isSet: Boolean = value.isDefined } From de5d4a069d0fce778bee572fb7e9c61019023b58 Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Mon, 8 Jan 2018 12:43:04 -0300 Subject: [PATCH 08/12] - Resource helper uses broadcast --- .../johnsnowlabs/nlp/AnnotatorApproach.scala | 2 +- .../nlp/annotators/Lemmatizer.scala | 2 +- .../nlp/annotators/ner/crf/NerCrfModel.scala | 4 +- .../pos/perceptron/PerceptronModel.scala | 2 +- .../sda/vivekn/ViveknSentimentModel.scala | 6 +-- .../spell/norvig/NorvigSweetingModel.scala | 4 +- .../nlp/serialization/Feature.scala | 48 ++++++++++++------- 7 files changed, 42 insertions(+), 26 deletions(-) diff --git a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala index 9d6448cb03ff92..c1df9007d55434 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorApproach.scala @@ -18,7 +18,7 @@ abstract class AnnotatorApproach[M <: Model[M]] with HasInputAnnotationCols with HasOutputAnnotationCol with HasAnnotatorType - with ParamsAndFeaturesWritable { + with DefaultParamsWritable { val description: String diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala 
b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala index bcacba7d0addbf..efdb6d6adc0961 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala @@ -25,7 +25,7 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] { private val config: Config = ConfigHelper.retrieve - val lemmaDict: MapFeature[String, String] = new MapFeature(this, "lemmaDict", "provide a lemma dictionary") + val lemmaDict: MapFeature[String, String] = new MapFeature(this, "lemmaDict") val lemmaFormat: Param[String] = new Param[String](this, "lemmaFormat", "TXT or TXTDS for reading dictionary as dataset") diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala index aae32f444ef9a8..8a3ab46b0ec32e 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala @@ -19,8 +19,8 @@ class NerCrfModel(override val uid: String) def this() = this(Identifiable.randomUID("NER")) val entities = new StringArrayParam(this, "entities", "List of Entities to recognize") - val model: StructFeature[LinearChainCrfModel] = new StructFeature[LinearChainCrfModel](this, "crfModel", "CRF Model") - val dictionaryFeatures: MapFeature[String, String] = new MapFeature[String, String](this, "dictionaryFeatures", "CRF Features dictionary") + val model: StructFeature[LinearChainCrfModel] = new StructFeature[LinearChainCrfModel](this, "crfModel") + val dictionaryFeatures: MapFeature[String, String] = new MapFeature[String, String](this, "dictionaryFeatures") def setModel(crf: LinearChainCrfModel): NerCrfModel = set(model, crf) diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronModel.scala index f66daa3011a0da..2a90cb45e6cd14 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/pos/perceptron/PerceptronModel.scala @@ -19,7 +19,7 @@ class PerceptronModel(override val uid: String) extends AnnotatorModel[Perceptro private case class SentenceToBeTagged(tokenizedSentence: TokenizedSentence, start: Int, end: Int) val model: StructFeature[AveragedPerceptron] = - new StructFeature[AveragedPerceptron](this, "POS Model", "POS Tagging approach") + new StructFeature[AveragedPerceptron](this, "POS Model") override val annotatorType: AnnotatorType = POS diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala index b6bed97b1b56c0..cf60d22f167d74 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/sda/vivekn/ViveknSentimentModel.scala @@ -21,9 +21,9 @@ class ViveknSentimentModel(override val uid: String) extends AnnotatorModel[Vive override val requiredAnnotatorTypes: Array[AnnotatorType] = Array(TOKEN, DOCUMENT) - protected val positive: MapFeature[String, Int] = new MapFeature(this, "positive_sentences", "positive sentences trained") - protected val negative: MapFeature[String, Int] = new MapFeature(this, "negative_sentences", "negative sentences trained") - protected val words: ArrayFeature[String] = 
new ArrayFeature(this, "words", "unique words trained") + protected val positive: MapFeature[String, Int] = new MapFeature(this, "positive_sentences") + protected val negative: MapFeature[String, Int] = new MapFeature(this, "negative_sentences") + protected val words: ArrayFeature[String] = new ArrayFeature(this, "words") protected val positiveTotals: IntParam = new IntParam(this, "positive_totals", "count of positive words") protected val negativeTotals: IntParam = new IntParam(this, "negative_totals", "count of negative words") diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala index c3bbb009d10b44..e12e0899eedb64 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/spell/norvig/NorvigSweetingModel.scala @@ -22,8 +22,8 @@ class NorvigSweetingModel(override val uid: String) extends AnnotatorModel[Norvi private val alphabet = "abcdefghijjklmnopqrstuvwxyz".toCharArray private val vowels = "aeiouy".toCharArray - protected val wordCount: MapFeature[String, Int] = new MapFeature(this, "wordCount", "words frequency in training") - protected val customDict: MapFeature[String, String] = new MapFeature(this, "customDict", "custom words in dictionary") + protected val wordCount: MapFeature[String, Int] = new MapFeature(this, "wordCount") + protected val customDict: MapFeature[String, String] = new MapFeature(this, "customDict") private val logger = LoggerFactory.getLogger("NorvigApproach") private val config: Config = ConfigFactory.load diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala index 2d4c1d6f03a578..84389af85c7821 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala @@ -8,7 +8,7 @@ import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import scala.reflect.ClassTag -abstract class Feature[Serializable1, Serializable2, TComplete: ClassTag](model: HasFeatures, val name: String, val description: String)(implicit val sparkSession: SparkSession = SparkSession.builder().getOrCreate()) extends Serializable { +abstract class Feature[Serializable1, Serializable2, TComplete: ClassTag](model: HasFeatures, val name: String)(implicit val sparkSession: SparkSession = SparkSession.builder().getOrCreate()) extends Serializable { model.features.append(this) final protected var value: Option[Broadcast[TComplete]] = None @@ -34,13 +34,13 @@ abstract class Feature[Serializable1, Serializable2, TComplete: ClassTag](model: } -class StructFeature[TValue: ClassTag](model: HasFeatures, override val name: String, override val description: String) - extends Feature[TValue, TValue, TValue](model, name, description) { +class StructFeature[TValue: ClassTag](model: HasFeatures, override val name: String) + extends Feature[TValue, TValue, TValue](model, name) { implicit val encoder: Encoder[TValue] = Encoders.kryo[TValue] override def serialize(spark: SparkSession, path: String, field: String, value: TValue): Unit = { - import spark.sqlContext.implicits._ + import spark.implicits._ val dataPath = getFieldPath(path, field) Seq(value.asInstanceOf[TValue]).toDS.write.mode("overwrite").parquet(dataPath.toString) } @@ -62,39 +62,55 @@ class StructFeature[TValue: ClassTag](model: HasFeatures, override val 
name: Str } -class MapFeature[TKey: ClassTag, TValue: ClassTag](model: HasFeatures, override val name: String, override val description: String) - extends Feature[TKey, TValue, Map[TKey, TValue]](model, name, description) { - - implicit val encoder: Encoder[Map[TKey, TValue]] = Encoders.kryo[Map[TKey, TValue]] +class MapFeature[TKey: ClassTag, TValue: ClassTag](model: HasFeatures, override val name: String) + extends Feature[TKey, TValue, Map[TKey, TValue]](model, name) { override def serialize(spark: SparkSession, path: String, field: String, value: Map[TKey, TValue]): Unit = { - import spark.sqlContext.implicits._ + import spark.implicits._ + //implicit val encoder: Encoder[(TKey, TValue)] = Encoders.tuple(Encoders.kryo[TKey], Encoders.kryo[TValue]) val dataPath = getFieldPath(path, field) - Seq(value).toDS.write.mode("overwrite").parquet(dataPath.toString) + //value.toSeq.toDS.as[(TKey, TValue)].write.mode("overwrite").parquet(dataPath.toString) + spark.sparkContext.parallelize(value.toSeq).saveAsObjectFile(dataPath.toString) } + + override def deserialize(spark: SparkSession, path: String, field: String): Option[Map[TKey, TValue]] = { + //implicit val encoder: Encoder[(TKey, TValue)] = Encoders.tuple(Encoders.kryo[TKey], Encoders.kryo[TValue]) val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) val dataPath = getFieldPath(path, field) if (fs.exists(dataPath)) { - val loaded = spark.read.parquet(dataPath.toString) - loaded.as[Map[TKey, TValue]].collect.headOption + //val loaded = spark.read.parquet(dataPath.toString) + //Some(loaded.as[(TKey, TValue)].collect.toMap) + Some(spark.sparkContext.objectFile[(TKey, TValue)](dataPath.toString).collect.toMap) } else { None } } + + + /* + override def deserialize(spark: SparkSession, path: String, field: String): Option[Map[TKey, TValue]] = { + val k = new ArrayFeature[TKey](model, name+"_k") + val v = new ArrayFeature[TValue](model, name+"_v") + val ks = k.deserialize(spark, path+"_k", field+"_k") + val vs = v.deserialize(spark, path+"_v", field+"_v") + ks.map(kk => kk.zip(vs.get).toMap[TKey, TValue]) + } + */ + } -class ArrayFeature[TValue: ClassTag](model: HasFeatures, override val name: String, override val description: String) - extends Feature[TValue, TValue, Array[TValue]](model, name, description) { +class ArrayFeature[TValue: ClassTag](model: HasFeatures, override val name: String) + extends Feature[TValue, TValue, Array[TValue]](model, name) { implicit val encoder: Encoder[TValue] = Encoders.kryo[TValue] override def serialize(spark: SparkSession, path: String, field: String, value: Array[TValue]): Unit = { - import spark.sqlContext.implicits._ + import spark.implicits._ val dataPath = getFieldPath(path, field) - Seq(value.toSeq.toDS.write.mode("overwrite").parquet(dataPath.toString)) + value.toSeq.toDS.write.mode("overwrite").parquet(dataPath.toString) } override def deserialize(spark: SparkSession, path: String, field: String): Option[Array[TValue]] = { From 221e5138fd9f0ea9b5ca50738b1a998cccaaa130 Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Mon, 8 Jan 2018 13:59:00 -0300 Subject: [PATCH 09/12] - Map optimizations - Dataset conversion improvements --- .../nlp/serialization/Feature.scala | 39 ++++--------------- 1 file changed, 8 insertions(+), 31 deletions(-) diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala index 84389af85c7821..c42dac3eff1a50 100644 --- 
a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala @@ -40,21 +40,15 @@ class StructFeature[TValue: ClassTag](model: HasFeatures, override val name: Str implicit val encoder: Encoder[TValue] = Encoders.kryo[TValue] override def serialize(spark: SparkSession, path: String, field: String, value: TValue): Unit = { - import spark.implicits._ val dataPath = getFieldPath(path, field) - Seq(value.asInstanceOf[TValue]).toDS.write.mode("overwrite").parquet(dataPath.toString) + spark.createDataset(Seq(value)).write.mode("overwrite").parquet(dataPath.toString) } override def deserialize(spark: SparkSession, path: String, field: String): Option[TValue] = { val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) val dataPath = getFieldPath(path, field) if (fs.exists(dataPath)) { - val loaded = spark.read.parquet(dataPath.toString) - import spark.implicits._ - loaded.schema.head.dataType match { - case ArrayType(StringType, _) => loaded.as[String].collect.headOption.map(_.asInstanceOf[TValue]) - case _ => loaded.as[TValue].collect.headOption - } + Some(spark.read.parquet(dataPath.toString).as[TValue].first) } else { None } @@ -65,41 +59,26 @@ class StructFeature[TValue: ClassTag](model: HasFeatures, override val name: Str class MapFeature[TKey: ClassTag, TValue: ClassTag](model: HasFeatures, override val name: String) extends Feature[TKey, TValue, Map[TKey, TValue]](model, name) { + implicit val encoder: Encoder[(TKey, TValue)] = Encoders.kryo[(TKey, TValue)] + override def serialize(spark: SparkSession, path: String, field: String, value: Map[TKey, TValue]): Unit = { import spark.implicits._ - //implicit val encoder: Encoder[(TKey, TValue)] = Encoders.tuple(Encoders.kryo[TKey], Encoders.kryo[TValue]) val dataPath = getFieldPath(path, field) - //value.toSeq.toDS.as[(TKey, TValue)].write.mode("overwrite").parquet(dataPath.toString) - spark.sparkContext.parallelize(value.toSeq).saveAsObjectFile(dataPath.toString) + value.toSeq.toDS.write.mode("overwrite").parquet(dataPath.toString) } override def deserialize(spark: SparkSession, path: String, field: String): Option[Map[TKey, TValue]] = { - //implicit val encoder: Encoder[(TKey, TValue)] = Encoders.tuple(Encoders.kryo[TKey], Encoders.kryo[TValue]) val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) val dataPath = getFieldPath(path, field) if (fs.exists(dataPath)) { - //val loaded = spark.read.parquet(dataPath.toString) - //Some(loaded.as[(TKey, TValue)].collect.toMap) - Some(spark.sparkContext.objectFile[(TKey, TValue)](dataPath.toString).collect.toMap) + Some(spark.read.parquet(dataPath.toString).as[(TKey, TValue)].collect.toMap) } else { None } } - - - /* - override def deserialize(spark: SparkSession, path: String, field: String): Option[Map[TKey, TValue]] = { - val k = new ArrayFeature[TKey](model, name+"_k") - val v = new ArrayFeature[TValue](model, name+"_v") - val ks = k.deserialize(spark, path+"_k", field+"_k") - val vs = v.deserialize(spark, path+"_v", field+"_v") - ks.map(kk => kk.zip(vs.get).toMap[TKey, TValue]) - } - */ - } class ArrayFeature[TValue: ClassTag](model: HasFeatures, override val name: String) @@ -108,17 +87,15 @@ class ArrayFeature[TValue: ClassTag](model: HasFeatures, override val name: Stri implicit val encoder: Encoder[TValue] = Encoders.kryo[TValue] override def serialize(spark: SparkSession, path: String, field: String, value: Array[TValue]): Unit = { - import spark.implicits._ val dataPath = 
getFieldPath(path, field) - value.toSeq.toDS.write.mode("overwrite").parquet(dataPath.toString) + spark.createDataset(value).write.mode("overwrite").parquet(dataPath.toString) } override def deserialize(spark: SparkSession, path: String, field: String): Option[Array[TValue]] = { val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration) val dataPath = getFieldPath(path, field) if (fs.exists(dataPath)) { - val loaded = spark.read.parquet(dataPath.toString) - Some(loaded.as[TValue].collect) + Some(spark.read.parquet(dataPath.toString).as[TValue].collect) } else { None } From 150474db0a5f363396049c256c1abc428318a5b0 Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Mon, 8 Jan 2018 14:03:08 -0300 Subject: [PATCH 10/12] - Clean notebook --- .../example/vivekn-sentiment/sentiment.ipynb | 186 +++--------------- 1 file changed, 25 insertions(+), 161 deletions(-) diff --git a/python/example/vivekn-sentiment/sentiment.ipynb b/python/example/vivekn-sentiment/sentiment.ipynb index d5d5fdd80f60d6..3aa72328ccce70 100644 --- a/python/example/vivekn-sentiment/sentiment.ipynb +++ b/python/example/vivekn-sentiment/sentiment.ipynb @@ -2,7 +2,7 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": null, "metadata": { "collapsed": true }, @@ -20,42 +20,9 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+------+---------+--------------------+\n", - "|itemid|sentiment| text|\n", - "+------+---------+--------------------+\n", - "| 1| 0| ...|\n", - "| 2| 0| ...|\n", - "| 3| 1| omg...|\n", - "| 4| 0| .. Omga...|\n", - "| 5| 0| i think ...|\n", - "| 6| 0| or i jus...|\n", - "| 7| 1| Juuuuuuuuu...|\n", - "| 8| 0| Sunny Agai...|\n", - "| 9| 1| handed in m...|\n", - "| 10| 1| hmmmm.... i...|\n", - "| 11| 0| I must thin...|\n", - "| 12| 1| thanks to a...|\n", - "| 13| 0| this weeken...|\n", - "| 14| 0| jb isnt show...|\n", - "| 15| 0| ok thats it ...|\n", - "| 16| 0| <-------- ...|\n", - "| 17| 0| awhhe man.......|\n", - "| 18| 1| Feeling stran...|\n", - "| 19| 0| HUGE roll of ...|\n", - "| 20| 0| I just cut my...|\n", - "+------+---------+--------------------+\n", - "only showing top 20 rows\n", - "\n" - ] - } - ], + "outputs": [], "source": [ "#Load the input data to be annotated\n", "data = spark. 
\\\n", @@ -69,7 +36,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": null, "metadata": { "collapsed": true }, @@ -85,7 +52,7 @@ }, { "cell_type": "code", - "execution_count": 4, + "execution_count": null, "metadata": { "collapsed": true }, @@ -100,7 +67,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": null, "metadata": { "collapsed": true }, @@ -115,7 +82,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "metadata": { "collapsed": true }, @@ -128,7 +95,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": null, "metadata": { "collapsed": true }, @@ -137,17 +104,14 @@ "### Spell Checker\n", "spell_checker = NorvigSweetingApproach() \\\n", " .setInputCols([\"normal\"]) \\\n", - " .setOutputCol(\"spell\") \\\n", - " .setCorpusPath(\"./corpus/pos\")\n", - " #.setDictPath(\"/home/saif/IdeaProjects/spark-nlp/src/main/resources/spell/words.txt\") \\\n", - " #.setCorpusFormat(\"TXTDS\")\n", + " .setOutputCol(\"spell\")\n", "\n", "#checked = spell_checker.fit(tokenized).transform(tokenized)" ] }, { "cell_type": "code", - "execution_count": 8, + "execution_count": null, "metadata": { "collapsed": true }, @@ -157,15 +121,13 @@ " .setInputCols([\"spell\", \"sentence\"]) \\\n", " .setOutputCol(\"sentiment\") \\\n", " .setPruneCorpus(False) \\\n", - " .setPositiveSource(\"./corpus/pos\") \\\n", - " .setNegativeSource(\"./corpus/neg\")\n", - " #.setPositiveSource(\"../../../src/test/resources/vivekn/positive\") \\\n", - " #.setNegativeSource(\"../../../src/test/resources/vivekn/negative\") \\\n" + " .setPositiveSource(\"../../../src/test/resources/vivekn/positive\") \\\n", + " .setNegativeSource(\"../../../src/test/resources/vivekn/negative\") \\\n" ] }, { "cell_type": "code", - "execution_count": 9, + "execution_count": null, "metadata": { "collapsed": true }, @@ -178,7 +140,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": null, "metadata": { "collapsed": true }, @@ -191,43 +153,9 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+------+--------------------+--------------------+\n", - "|itemid| text| finished_sentiment|\n", - "+------+--------------------+--------------------+\n", - "| 1| ...| result->negative|\n", - "| 2| ...| result->negative|\n", - "| 3| omg...| result->negative|\n", - "| 4| .. Omga...|result->positive@...|\n", - "| 5| i think ...|result->negative@...|\n", - "| 6| or i jus...| result->negative|\n", - "| 7| Juuuuuuuuu...| result->negative|\n", - "| 8| Sunny Agai...| result->negative|\n", - "| 9| handed in m...|result->negative@...|\n", - "| 10| hmmmm.... 
i...|result->negative@...|\n", - "| 11| I must thin...| result->negative|\n", - "| 12| thanks to a...| result->negative|\n", - "| 13| this weeken...| result->negative|\n", - "| 14| jb isnt show...| result->negative|\n", - "| 15| ok thats it ...| result->negative|\n", - "| 16| <-------- ...|result->positive@...|\n", - "| 17| awhhe man.......|result->negative@...|\n", - "| 18| Feeling stran...|result->negative@...|\n", - "| 19| HUGE roll of ...|result->negative@...|\n", - "| 20| I just cut my...|result->negative@...|\n", - "+------+--------------------+--------------------+\n", - "only showing top 20 rows\n", - "\n", - "Time elapsed pipeline process: 34.804439544677734\n" - ] - } - ], + "outputs": [], "source": [ "pipeline = Pipeline(stages=[\n", " document_assembler,\n", @@ -249,21 +177,9 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Row(itemid=1, text=' is so sad for my APL friend.............', finished_sentiment='result->negative')\n", - "Row(itemid=2, text=' I missed the New Moon trailer...', finished_sentiment='result->negative')\n", - "Row(itemid=3, text=' omg its already 7:30 :O', finished_sentiment='result->negative')\n", - "Row(itemid=4, text=\" .. Omgaga. Im sooo im gunna CRy. I've been at this dentist since 11.. I was suposed 2 just get a crown put on (30mins)...\", finished_sentiment='result->positive@result->negative@result->negative@result->negative')\n", - "Row(itemid=5, text=' i think mi bf is cheating on me!!! T_T', finished_sentiment='result->negative@result->negative')\n" - ] - } - ], + "outputs": [], "source": [ "for r in sentiment_data.take(5):\n", " print(r)" @@ -271,17 +187,9 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Time elapsed in write pipelines: 40.02541160583496\n" - ] - } - ], + "outputs": [], "source": [ "start = time.time()\n", "pipeline.write().overwrite().save(\"./ps\")\n", @@ -292,17 +200,9 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "Time elapsed in read pipelines: 3.8598501682281494\n" - ] - } - ], + "outputs": [], "source": [ "start = time.time()\n", "p = Pipeline.read().load(\"./ps\")\n", @@ -313,48 +213,12 @@ }, { "cell_type": "code", - "execution_count": 17, + "execution_count": null, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "+------+--------------------+--------------------+\n", - "|itemid| text| finished_sentiment|\n", - "+------+--------------------+--------------------+\n", - "| 1| ...| result->negative|\n", - "| 2| ...| result->negative|\n", - "| 3| omg...| result->negative|\n", - "| 4| .. Omga...|result->positive@...|\n", - "| 5| i think ...|result->negative@...|\n", - "| 6| or i jus...| result->negative|\n", - "| 7| Juuuuuuuuu...| result->negative|\n", - "| 8| Sunny Agai...| result->negative|\n", - "| 9| handed in m...|result->negative@...|\n", - "| 10| hmmmm.... 
i...|result->negative@...|\n", - "| 11| I must thin...| result->negative|\n", - "| 12| thanks to a...| result->negative|\n", - "| 13| this weeken...| result->negative|\n", - "| 14| jb isnt show...| result->negative|\n", - "| 15| ok thats it ...| result->negative|\n", - "| 16| <-------- ...|result->positive@...|\n", - "| 17| awhhe man.......|result->negative@...|\n", - "| 18| Feeling stran...|result->negative@...|\n", - "| 19| HUGE roll of ...|result->negative@...|\n", - "| 20| I just cut my...|result->negative@...|\n", - "+------+--------------------+--------------------+\n", - "only showing top 20 rows\n", - "\n", - "1000\n", - "Time elapsed in using loaded pipelines: 5.094742298126221\n" - ] - } - ], + "outputs": [], "source": [ "start = time.time()\n", - "#pm.transform(data).where(\"finished_sentiment not like '%negative%'\").show()\n", - "pm.transform(data).show()\n", + "pm.transform(data).where(\"finished_sentiment not like '%negative%'\").show()\n", "print(pm.transform(data).count())\n", "end = time.time()\n", "print(\"Time elapsed in using loaded pipelines: \" + str(end - start))" From 247ec17a600c19709e207f3d9f54f51ca506f6be Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Mon, 8 Jan 2018 15:43:16 -0300 Subject: [PATCH 11/12] Merge branch 'master' into serialization_features_bc # Conflicts: # src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala --- src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala | 1 - .../com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala | 7 +++++++ .../com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala | 7 +++++++ .../nlp/annotators/ner/crf/NerCrfModel.scala | 7 +++---- .../nlp/embeddings/ModelWithWordEmbeddings.scala | 9 +++++---- .../com/johnsnowlabs/nlp/serialization/Feature.scala | 7 ++++++- 6 files changed, 28 insertions(+), 10 deletions(-) diff --git a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala index 7dad564e48bd3c..d3ed36a2daff82 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/AnnotatorModel.scala @@ -2,7 +2,6 @@ package com.johnsnowlabs.nlp import org.apache.spark.ml.Model import org.apache.spark.ml.param.ParamMap -import org.apache.spark.ml.util.DefaultParamsWritable import org.apache.spark.sql.expressions.UserDefinedFunction import org.apache.spark.sql.{DataFrame, Dataset, Row} import org.apache.spark.sql.types._ diff --git a/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala index 55fce96668c8c7..a0e239d217e29a 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala @@ -1,5 +1,6 @@ package com.johnsnowlabs.nlp +import com.johnsnowlabs.nlp.embeddings.ModelWithWordEmbeddings import org.apache.spark.ml.util.{DefaultParamsReadable, MLReader} class FeaturesReader[T <: HasFeatures](baseReader: MLReader[T]) extends MLReader[T] { @@ -12,6 +13,12 @@ class FeaturesReader[T <: HasFeatures](baseReader: MLReader[T]) extends MLReader val value = feature.deserialize(sparkSession, path, feature.name) feature.setValue(value) } + + instance match { + case m: ModelWithWordEmbeddings[_] => m.deserializeEmbeddings(path, sparkSession.sparkContext) + case _ => + } + instance } } diff --git a/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala 
index 976935c8e02774..e591bbd64a8b50 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala @@ -1,5 +1,6 @@ package com.johnsnowlabs.nlp +import com.johnsnowlabs.nlp.embeddings.ModelWithWordEmbeddings import org.apache.spark.ml.param.Params import org.apache.spark.ml.util.{DefaultParamsWritable, MLWriter} @@ -11,6 +12,12 @@ class FeaturesWriter[T](annotatorWithFeatures: HasFeatures, baseWriter: MLWriter for (feature <- annotatorWithFeatures.features) { feature.serializeInfer(sparkSession, path, feature.name, feature.getValue) } + + annotatorWithFeatures match { + case m: ModelWithWordEmbeddings[_] => m.serializeEmbeddings(path, sparkSession.sparkContext) + case _ => + } + } } diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala index 08985608cfebe9..a6f776ee6ea0de 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala @@ -5,10 +5,9 @@ import com.johnsnowlabs.nlp.AnnotatorType._ import com.johnsnowlabs.nlp.annotators.common.{IndexedTaggedWord, NerTagged, PosTagged, TaggedSentence} import com.johnsnowlabs.nlp.annotators.common.Annotated.{NerTaggedSentence, PosTaggedSentence} import com.johnsnowlabs.nlp.serialization.{MapFeature, StructFeature} -import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel, ParamsAndFeaturesReadable} +import com.johnsnowlabs.nlp.ParamsAndFeaturesReadable import com.johnsnowlabs.nlp.embeddings.ModelWithWordEmbeddings -import com.johnsnowlabs.nlp.{Annotation, AnnotatorModel} -import org.apache.hadoop.fs.Path +import com.johnsnowlabs.nlp.Annotation import org.apache.spark.ml.param.StringArrayParam import org.apache.spark.ml.util._ @@ -41,7 +40,7 @@ class NerCrfModel(override val uid: String) extends ModelWithWordEmbeddings[NerC val crf = $$(model) - val fg = FeatureGenerator(dictionaryFeatures, embeddings) + val fg = FeatureGenerator(new DictionaryFeatures($$(dictionaryFeatures)), embeddings) sentences.map{sentence => val instance = fg.generate(sentence, crf.metadata) val labelIds = crf.predict(instance) diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala index 8fe438d8a5d93f..cfc3e04649e236 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala @@ -22,8 +22,8 @@ abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] val nDims = new IntParam(this, "nDims", "Number of embedding dimensions") val indexPath = new Param[String](this, "indexPath", "File that stores Index") - def setDims(nDims: Int) = set(this.nDims, nDims).asInstanceOf[M] - def setIndexPath(path: String) = set(this.indexPath, path).asInstanceOf[M] + def setDims(nDims: Int): this.type = set(this.nDims, nDims) + def setIndexPath(path: String): this.type = set(this.indexPath, path) lazy val embeddings: Option[WordEmbeddings] = get(indexPath).map { path => // Have to copy file because RockDB changes it and Spark rises Exception @@ -54,7 +54,7 @@ abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] val src = getEmbeddingsSerializedPath(path) // 1. 
Copy to local file - val localPath = WordEmbeddingsClusterHelper.createLocalPath + val localPath = WordEmbeddingsClusterHelper.createLocalPath() if (fs.exists(src)) { fs.copyToLocalFile(src, new Path(localPath)) @@ -80,5 +80,6 @@ abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]] } } - def getEmbeddingsSerializedPath(path: String) = Path.mergePaths(new Path(path), new Path("/embeddings")) + def getEmbeddingsSerializedPath(path: String): Path = Path.mergePaths(new Path(path), new Path("/embeddings")) + } diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala index c42dac3eff1a50..da916d90f31bf5 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala @@ -1,9 +1,14 @@ package com.johnsnowlabs.nlp.serialization +import java.io.File +import java.nio.file.{Files, Paths} + import com.johnsnowlabs.nlp.HasFeatures +import com.johnsnowlabs.nlp.embeddings.{ModelWithWordEmbeddings, WordEmbeddings, WordEmbeddingsClusterHelper} import org.apache.hadoop.fs.{FileSystem, Path} +import org.apache.spark.{SparkContext, SparkFiles} import org.apache.spark.broadcast.Broadcast -import org.apache.spark.sql.types.{ArrayType, StringType} +import org.apache.spark.ml.param.{ParamMap, Params} import org.apache.spark.sql.{Encoder, Encoders, SparkSession} import scala.reflect.ClassTag From 8208245ed3ba5248b50ffdecc1158efa8de8637e Mon Sep 17 00:00:00 2001 From: Saif Addin Date: Mon, 8 Jan 2018 18:05:30 -0300 Subject: [PATCH 12/12] Merge branch 'master' into serialization_features_bc # Conflicts: # src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala --- .../nlp/ParamsAndFeaturesReadable.scala | 17 ++++++++++------- .../nlp/ParamsAndFeaturesWritable.scala | 18 +++++++++++------- .../nlp/annotators/ner/crf/NerCrfModel.scala | 5 ++--- .../nlp/embeddings/EmbeddingsReadable.scala | 10 ++++++++++ .../embeddings/ModelWithWordEmbeddings.scala | 5 +++++ 5 files changed, 38 insertions(+), 17 deletions(-) create mode 100644 src/main/scala/com/johnsnowlabs/nlp/embeddings/EmbeddingsReadable.scala diff --git a/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala index a0e239d217e29a..6194d743ad7ff1 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesReadable.scala @@ -1,9 +1,9 @@ package com.johnsnowlabs.nlp -import com.johnsnowlabs.nlp.embeddings.ModelWithWordEmbeddings import org.apache.spark.ml.util.{DefaultParamsReadable, MLReader} +import org.apache.spark.sql.SparkSession -class FeaturesReader[T <: HasFeatures](baseReader: MLReader[T]) extends MLReader[T] { +class FeaturesReader[T <: HasFeatures](baseReader: MLReader[T], onRead: (T, String, SparkSession) => Unit) extends MLReader[T] { override def load(path: String): T = { @@ -14,15 +14,18 @@ class FeaturesReader[T <: HasFeatures](baseReader: MLReader[T]) extends MLReader feature.setValue(value) } - instance match { - case m: ModelWithWordEmbeddings[_] => m.deserializeEmbeddings(path, sparkSession.sparkContext) - case _ => - } + onRead(instance, path, sparkSession) instance } } trait ParamsAndFeaturesReadable[T <: HasFeatures] extends DefaultParamsReadable[T] { - override def read: MLReader[T] = new FeaturesReader(super.read) + + def onRead(instance: T, path: String, spark: SparkSession): Unit 
= {} + + override def read: MLReader[T] = new FeaturesReader( + super.read, + (instance: T, path: String, spark: SparkSession) => onRead(instance, path, spark) + ) } diff --git a/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala index e591bbd64a8b50..aac623b487c02d 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/ParamsAndFeaturesWritable.scala @@ -1,10 +1,11 @@ package com.johnsnowlabs.nlp -import com.johnsnowlabs.nlp.embeddings.ModelWithWordEmbeddings import org.apache.spark.ml.param.Params import org.apache.spark.ml.util.{DefaultParamsWritable, MLWriter} +import org.apache.spark.sql.SparkSession -class FeaturesWriter[T](annotatorWithFeatures: HasFeatures, baseWriter: MLWriter) extends MLWriter with HasFeatures { +class FeaturesWriter[T](annotatorWithFeatures: HasFeatures, baseWriter: MLWriter, onWritten: (String, SparkSession) => Unit) + extends MLWriter with HasFeatures { override protected def saveImpl(path: String): Unit = { baseWriter.save(path) @@ -13,16 +14,19 @@ class FeaturesWriter[T](annotatorWithFeatures: HasFeatures, baseWriter: MLWriter feature.serializeInfer(sparkSession, path, feature.name, feature.getValue) } - annotatorWithFeatures match { - case m: ModelWithWordEmbeddings[_] => m.serializeEmbeddings(path, sparkSession.sparkContext) - case _ => - } + onWritten(path, sparkSession) } } trait ParamsAndFeaturesWritable extends DefaultParamsWritable with Params with HasFeatures { - override def write: MLWriter = new FeaturesWriter(this, super.write) + def onWritten(path: String, spark: SparkSession): Unit = {} + + override def write: MLWriter = new FeaturesWriter( + this, + super.write, + (path: String, spark: SparkSession) => onWritten(path, spark) + ) } diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala index a6f776ee6ea0de..d940f6b96fedb1 100644 --- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala +++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala @@ -5,8 +5,7 @@ import com.johnsnowlabs.nlp.AnnotatorType._ import com.johnsnowlabs.nlp.annotators.common.{IndexedTaggedWord, NerTagged, PosTagged, TaggedSentence} import com.johnsnowlabs.nlp.annotators.common.Annotated.{NerTaggedSentence, PosTaggedSentence} import com.johnsnowlabs.nlp.serialization.{MapFeature, StructFeature} -import com.johnsnowlabs.nlp.ParamsAndFeaturesReadable -import com.johnsnowlabs.nlp.embeddings.ModelWithWordEmbeddings +import com.johnsnowlabs.nlp.embeddings.{EmbeddingsReadable, ModelWithWordEmbeddings} import com.johnsnowlabs.nlp.Annotation import org.apache.spark.ml.param.StringArrayParam import org.apache.spark.ml.util._ @@ -75,4 +74,4 @@ class NerCrfModel(override val uid: String) extends ModelWithWordEmbeddings[NerC } -object NerCrfModel extends ParamsAndFeaturesReadable[NerCrfModel] +object NerCrfModel extends EmbeddingsReadable[NerCrfModel] diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/EmbeddingsReadable.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/EmbeddingsReadable.scala new file mode 100644 index 00000000000000..3049abac384572 --- /dev/null +++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/EmbeddingsReadable.scala @@ -0,0 +1,10 @@ +package com.johnsnowlabs.nlp.embeddings + +import com.johnsnowlabs.nlp.ParamsAndFeaturesReadable +import 
org.apache.spark.sql.SparkSession
+
+trait EmbeddingsReadable[T <: ModelWithWordEmbeddings[_]] extends ParamsAndFeaturesReadable[T] {
+  override def onRead(instance: T, path: String, spark: SparkSession): Unit = {
+    instance.deserializeEmbeddings(path, spark.sparkContext)
+  }
+}
diff --git a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala
index cfc3e04649e236..4319c5899e0a7a 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/embeddings/ModelWithWordEmbeddings.scala
@@ -8,6 +8,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.ivy.util.FileUtil
 import org.apache.spark.{SparkContext, SparkFiles}
 import org.apache.spark.ml.param.{IntParam, Param}
+import org.apache.spark.sql.SparkSession
 
 
 /**
@@ -82,4 +83,8 @@ abstract class ModelWithWordEmbeddings[M <: ModelWithWordEmbeddings[M]]
 
   def getEmbeddingsSerializedPath(path: String): Path = Path.mergePaths(new Path(path), new Path("/embeddings"))
 
+  override def onWritten(path: String, spark: SparkSession): Unit = {
+    serializeEmbeddings(path, spark.sparkContext)
+  }
+
 }
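
The last two patches replace the hard-coded embeddings handling in FeaturesReader/FeaturesWriter with the generic onRead/onWritten hooks, and EmbeddingsReadable wires the read side back up for any ModelWithWordEmbeddings companion, as NerCrfModel now does. A minimal round-trip sketch of where the hooks fire, assuming a trained nerModel: NerCrfModel, an active SparkSession, and an arbitrary output path (these names are illustrative and not part of the patches):

import com.johnsnowlabs.nlp.annotators.ner.crf.NerCrfModel

// Write path: FeaturesWriter saves the params, serializes every Feature
// (the CRF model and the dictionary features) and then invokes the
// onWritten(path, spark) hook, where ModelWithWordEmbeddings handles its index.
nerModel.write.overwrite().save("/tmp/ner_crf_model")

// Read path: FeaturesReader loads the params, restores every Feature and then
// invokes onRead(path, spark); EmbeddingsReadable's override calls
// deserializeEmbeddings so the loaded model points at a fresh local copy of
// the embeddings index.
val restored: NerCrfModel = NerCrfModel.load("/tmp/ner_crf_model")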