From 52d95db8e4cf6d9ecd15632b44215c333a0b8108 Mon Sep 17 00:00:00 2001
From: Saif Addin
Date: Thu, 11 Jan 2018 18:32:13 -0300
Subject: [PATCH 1/3] spark-packages published

---
 .../nlp/serialization/Feature.scala           | 20 +++++++++++++-------
 1 file changed, 13 insertions(+), 7 deletions(-)

diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala
index da916d90f31bf5..505b408114047e 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala
@@ -46,14 +46,16 @@ class StructFeature[TValue: ClassTag](model: HasFeatures, override val name: Str
 
   override def serialize(spark: SparkSession, path: String, field: String, value: TValue): Unit = {
     val dataPath = getFieldPath(path, field)
-    spark.createDataset(Seq(value)).write.mode("overwrite").parquet(dataPath.toString)
+    //spark.createDataset(Seq(value)).write.mode("overwrite").parquet(dataPath.toString)
+    spark.sparkContext.parallelize(Seq(value)).saveAsObjectFile(dataPath.toString)
   }
 
   override def deserialize(spark: SparkSession, path: String, field: String): Option[TValue] = {
     val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
     val dataPath = getFieldPath(path, field)
     if (fs.exists(dataPath)) {
-      Some(spark.read.parquet(dataPath.toString).as[TValue].first)
+      //Some(spark.read.parquet(dataPath.toString).as[TValue].first)
+      Some(spark.sparkContext.objectFile[TValue](dataPath.toString).first)
     } else {
       None
     }
@@ -67,9 +69,10 @@ class MapFeature[TKey: ClassTag, TValue: ClassTag](model: HasFeatures, override
 
   implicit val encoder: Encoder[(TKey, TValue)] = Encoders.kryo[(TKey, TValue)]
 
   override def serialize(spark: SparkSession, path: String, field: String, value: Map[TKey, TValue]): Unit = {
-    import spark.implicits._
+    //import spark.implicits._
     val dataPath = getFieldPath(path, field)
-    value.toSeq.toDS.write.mode("overwrite").parquet(dataPath.toString)
+    //value.toSeq.toDS.write.mode("overwrite").parquet(dataPath.toString)
+    spark.sparkContext.parallelize(value.toSeq).saveAsObjectFile(dataPath.toString)
   }
 
@@ -78,7 +81,8 @@ class MapFeature[TKey: ClassTag, TValue: ClassTag](model: HasFeatures, override
     val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
     val dataPath = getFieldPath(path, field)
     if (fs.exists(dataPath)) {
-      Some(spark.read.parquet(dataPath.toString).as[(TKey, TValue)].collect.toMap)
+      //Some(spark.read.parquet(dataPath.toString).as[(TKey, TValue)].collect.toMap)
+      Some(spark.sparkContext.objectFile[(TKey, TValue)](dataPath.toString).collect.toMap)
     } else {
       None
     }
@@ -93,14 +97,16 @@ class ArrayFeature[TValue: ClassTag](model: HasFeatures, override val name: Stri
 
   override def serialize(spark: SparkSession, path: String, field: String, value: Array[TValue]): Unit = {
     val dataPath = getFieldPath(path, field)
-    spark.createDataset(value).write.mode("overwrite").parquet(dataPath.toString)
+    //spark.createDataset(value).write.mode("overwrite").parquet(dataPath.toString)
+    spark.sparkContext.parallelize(value).saveAsObjectFile(dataPath.toString)
   }
 
   override def deserialize(spark: SparkSession, path: String, field: String): Option[Array[TValue]] = {
     val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
     val dataPath = getFieldPath(path, field)
     if (fs.exists(dataPath)) {
-      Some(spark.read.parquet(dataPath.toString).as[TValue].collect)
+      //Some(spark.read.parquet(dataPath.toString).as[TValue].collect)
+      Some(spark.sparkContext.objectFile[TValue](dataPath.toString).collect())
     } else {
       None
     }
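Before the second commit makes it configurable, it is worth seeing what this first commit actually buys: every per-field parquet write/read becomes an RDD object-file round trip. Below is a minimal, self-contained sketch of that round trip; the output path and the sample Map are made up for illustration, while the two Spark calls are exactly the ones the patch switches to.

    import org.apache.spark.sql.SparkSession

    object ObjectFileRoundTrip extends App {
      val spark = SparkSession.builder()
        .appName("object-file-round-trip")
        .master("local[*]")
        .getOrCreate()

      // Stand-in for a serialized feature; any java.io.Serializable value works.
      val lemmas: Map[String, String] = Map("was" -> "be", "mice" -> "mouse")
      val path = "/tmp/fields/lemmaDict" // hypothetical field path

      // Write: a one-element RDD dumped as a SequenceFile of Java-serialized objects.
      spark.sparkContext.parallelize(Seq(lemmas)).saveAsObjectFile(path)

      // Read: load the object file back and take its single element.
      val restored = spark.sparkContext.objectFile[Map[String, String]](path).first()
      assert(restored == lemmas)

      spark.stop()
    }

Two trade-offs come with the switch: saveAsObjectFile has no overwrite mode (it fails if the target path already exists, unlike the replaced write.mode("overwrite")), and stored values must be Java-serializable rather than merely having a Dataset encoder. The next commit keeps the parquet path available behind a config switch.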
From 54fbe59df5533bd3ff4111f1491ccaba57cc43fe Mon Sep 17 00:00:00 2001
From: Saif Addin
Date: Thu, 11 Jan 2018 19:20:13 -0300
Subject: [PATCH 2/3] spark-packages published

---
 .../nlp/serialization/Feature.scala           | 139 ++++++++++++++----
 src/test/resources/application.conf           |   3 +-
 2 files changed, 114 insertions(+), 28 deletions(-)

diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala
index 505b408114047e..2b90cda96d9d32 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala
@@ -1,14 +1,9 @@
 package com.johnsnowlabs.nlp.serialization
 
-import java.io.File
-import java.nio.file.{Files, Paths}
-
 import com.johnsnowlabs.nlp.HasFeatures
-import com.johnsnowlabs.nlp.embeddings.{ModelWithWordEmbeddings, WordEmbeddings, WordEmbeddingsClusterHelper}
+import com.johnsnowlabs.nlp.util.ConfigHelper
 import org.apache.hadoop.fs.{FileSystem, Path}
-import org.apache.spark.{SparkContext, SparkFiles}
 import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.ml.param.{ParamMap, Params}
 import org.apache.spark.sql.{Encoder, Encoders, SparkSession}
 
 import scala.reflect.ClassTag
@@ -16,26 +11,75 @@ import scala.reflect.ClassTag
 
 abstract class Feature[Serializable1, Serializable2, TComplete: ClassTag](model: HasFeatures, val name: String)(implicit val sparkSession: SparkSession = SparkSession.builder().getOrCreate()) extends Serializable {
   model.features.append(this)
 
-  final protected var value: Option[Broadcast[TComplete]] = None
+  private val config = ConfigHelper.retrieve
+
+  val serializationMode: String = config.getString("performance.serialization")
+  val useBroadcast: Boolean = config.getBoolean("performance.useBroadcast")
 
-  def serialize(spark: SparkSession, path: String, field: String, value: TComplete): Unit
+  final protected var broadcastValue: Option[Broadcast[TComplete]] = None
+  final protected var rawValue: Option[TComplete] = None
+
+  final def serialize(spark: SparkSession, path: String, field: String, value: TComplete): Unit = {
+    serializationMode match {
+      case "dataset" => serializeDataset(spark, path, field, value)
+      case "object" => serializeObject(spark, path, field, value)
+      case _ => throw new IllegalArgumentException("Illegal performance.serialization setting. Can be 'dataset' or 'object'")
+    }
+  }
 
   final def serializeInfer(spark: SparkSession, path: String, field: String, value: Any): Unit =
     serialize(spark, path, field, value.asInstanceOf[TComplete])
 
-  def deserialize(spark: SparkSession, path: String, field: String): Option[_]
+  final def deserialize(spark: SparkSession, path: String, field: String): Option[_] = {
+    if (broadcastValue.isDefined || rawValue.isDefined)
+      throw new Exception(s"Trying to deserialize an already set value for ${this.name}")
+    serializationMode match {
+      case "dataset" => deserializeDataset(spark, path, field)
+      case "object" => deserializeObject(spark, path, field)
+      case _ => throw new IllegalArgumentException("Illegal performance.serialization setting. Can be 'dataset' or 'object'")
+    }
+  }
+
+  protected def serializeDataset(spark: SparkSession, path: String, field: String, value: TComplete): Unit
+
+  protected def deserializeDataset(spark: SparkSession, path: String, field: String): Option[_]
+
+  protected def serializeObject(spark: SparkSession, path: String, field: String, value: TComplete): Unit
+
+  protected def deserializeObject(spark: SparkSession, path: String, field: String): Option[_]
 
   final protected def getFieldPath(path: String, field: String): Path =
     Path.mergePaths(new Path(path), new Path("/fields/" + field))
 
-  final def get: Option[TComplete] = value.map(_.value)
-  final def getValue: TComplete = value.map(_.value).getOrElse(throw new Exception(s"feature $name is not set"))
+  final def get: Option[TComplete] = {
+    if (useBroadcast)
+      broadcastValue.map(_.value)
+    else
+      rawValue
+  }
+
+  final def getValue: TComplete = {
+    if (useBroadcast)
+      broadcastValue.map(_.value).getOrElse(throw new Exception(s"feature $name is not set"))
+    else
+      rawValue.getOrElse(throw new Exception(s"feature $name is not set"))
+  }
+
   final def setValue(v: Option[Any]): HasFeatures = {
-    if (isSet) value.get.destroy()
-    value = Some(sparkSession.sparkContext.broadcast[TComplete](v.get.asInstanceOf[TComplete]))
+    if (useBroadcast) {
+      if (isSet) broadcastValue.get.destroy()
+      broadcastValue = Some(sparkSession.sparkContext.broadcast[TComplete](v.get.asInstanceOf[TComplete]))
+    } else {
+      rawValue = Some(v.get.asInstanceOf[TComplete])
+    }
     model
   }
-  final def isSet: Boolean = value.isDefined
+
+  final def isSet: Boolean = {
+    if (useBroadcast)
+      broadcastValue.isDefined
+    else
+      rawValue.isDefined
+  }
 
 }
@@ -44,23 +88,36 @@ class StructFeature[TValue: ClassTag](model: HasFeatures, override val name: Str
 
   implicit val encoder: Encoder[TValue] = Encoders.kryo[TValue]
 
-  override def serialize(spark: SparkSession, path: String, field: String, value: TValue): Unit = {
+  override def serializeObject(spark: SparkSession, path: String, field: String, value: TValue): Unit = {
     val dataPath = getFieldPath(path, field)
-    //spark.createDataset(Seq(value)).write.mode("overwrite").parquet(dataPath.toString)
     spark.sparkContext.parallelize(Seq(value)).saveAsObjectFile(dataPath.toString)
   }
 
-  override def deserialize(spark: SparkSession, path: String, field: String): Option[TValue] = {
+  override def deserializeObject(spark: SparkSession, path: String, field: String): Option[TValue] = {
     val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
     val dataPath = getFieldPath(path, field)
     if (fs.exists(dataPath)) {
-      //Some(spark.read.parquet(dataPath.toString).as[TValue].first)
      Some(spark.sparkContext.objectFile[TValue](dataPath.toString).first)
     } else {
       None
     }
   }
 
+  override def serializeDataset(spark: SparkSession, path: String, field: String, value: TValue): Unit = {
+    val dataPath = getFieldPath(path, field)
+    spark.createDataset(Seq(value)).write.mode("overwrite").parquet(dataPath.toString)
+  }
+
+  override def deserializeDataset(spark: SparkSession, path: String, field: String): Option[TValue] = {
+    val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+    val dataPath = getFieldPath(path, field)
+    if (fs.exists(dataPath)) {
+      Some(spark.read.parquet(dataPath.toString).as[TValue].first)
+    } else {
+      None
+    }
+  }
+
 }
 
 class MapFeature[TKey: ClassTag, TValue: ClassTag](model: HasFeatures, override val name: String)
@@ -68,26 +125,41 @@ class MapFeature[TKey: ClassTag, TValue: ClassTag](model: HasFeatures, override
 
   implicit val encoder: Encoder[(TKey, TValue)] = Encoders.kryo[(TKey, TValue)]
 
-  override def serialize(spark: SparkSession, path: String, field: String, value: Map[TKey, TValue]): Unit = {
-    //import spark.implicits._
+  override def serializeObject(spark: SparkSession, path: String, field: String, value: Map[TKey, TValue]): Unit = {
     val dataPath = getFieldPath(path, field)
-    //value.toSeq.toDS.write.mode("overwrite").parquet(dataPath.toString)
     spark.sparkContext.parallelize(value.toSeq).saveAsObjectFile(dataPath.toString)
   }
 
-  override def deserialize(spark: SparkSession, path: String, field: String): Option[Map[TKey, TValue]] = {
+  override def deserializeObject(spark: SparkSession, path: String, field: String): Option[Map[TKey, TValue]] = {
     val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
     val dataPath = getFieldPath(path, field)
     if (fs.exists(dataPath)) {
-      //Some(spark.read.parquet(dataPath.toString).as[(TKey, TValue)].collect.toMap)
      Some(spark.sparkContext.objectFile[(TKey, TValue)](dataPath.toString).collect.toMap)
     } else {
       None
     }
   }
 
+  override def serializeDataset(spark: SparkSession, path: String, field: String, value: Map[TKey, TValue]): Unit = {
+    import spark.implicits._
+    val dataPath = getFieldPath(path, field)
+    value.toSeq.toDS.write.mode("overwrite").parquet(dataPath.toString)
+  }
+
+
+
+  override def deserializeDataset(spark: SparkSession, path: String, field: String): Option[Map[TKey, TValue]] = {
+    val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+    val dataPath = getFieldPath(path, field)
+    if (fs.exists(dataPath)) {
+      Some(spark.read.parquet(dataPath.toString).as[(TKey, TValue)].collect.toMap)
+    } else {
+      None
+    }
+  }
+
 }
 
 class ArrayFeature[TValue: ClassTag](model: HasFeatures, override val name: String)
@@ -95,22 +167,35 @@ class ArrayFeature[TValue: ClassTag](model: HasFeatures, override val name: Stri
 
   implicit val encoder: Encoder[TValue] = Encoders.kryo[TValue]
 
-  override def serialize(spark: SparkSession, path: String, field: String, value: Array[TValue]): Unit = {
+  override def serializeObject(spark: SparkSession, path: String, field: String, value: Array[TValue]): Unit = {
     val dataPath = getFieldPath(path, field)
-    //spark.createDataset(value).write.mode("overwrite").parquet(dataPath.toString)
     spark.sparkContext.parallelize(value).saveAsObjectFile(dataPath.toString)
   }
 
-  override def deserialize(spark: SparkSession, path: String, field: String): Option[Array[TValue]] = {
+  override def deserializeObject(spark: SparkSession, path: String, field: String): Option[Array[TValue]] = {
     val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
     val dataPath = getFieldPath(path, field)
     if (fs.exists(dataPath)) {
-      //Some(spark.read.parquet(dataPath.toString).as[TValue].collect)
      Some(spark.sparkContext.objectFile[TValue](dataPath.toString).collect())
     } else {
       None
     }
   }
 
+  override def serializeDataset(spark: SparkSession, path: String, field: String, value: Array[TValue]): Unit = {
+    val dataPath = getFieldPath(path, field)
+    spark.createDataset(value).write.mode("overwrite").parquet(dataPath.toString)
+  }
+
+  override def deserializeDataset(spark: SparkSession, path: String, field: String): Option[Array[TValue]] = {
+    val fs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)
+    val dataPath = getFieldPath(path, field)
+    if (fs.exists(dataPath)) {
+      Some(spark.read.parquet(dataPath.toString).as[TValue].collect)
+    } else {
+      None
+    }
+  }
+
 }
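The diff above replaces the unconditional object-file code with a strategy split: serialize/deserialize in the base class now dispatch on performance.serialization, and get/setValue honor performance.useBroadcast. For context, a stand-alone sketch of those two toggles follows; it reads the keys with Typesafe Config's ConfigFactory instead of the project's ConfigHelper wrapper, the Seq[String] feature value is made up, and it assumes a performance block like the one in the next diff is on the classpath.

    import com.typesafe.config.ConfigFactory
    import org.apache.spark.broadcast.Broadcast
    import org.apache.spark.sql.SparkSession

    object PerformanceToggles extends App {
      val spark = SparkSession.builder()
        .appName("performance-toggles")
        .master("local[*]")
        .getOrCreate()

      val config = ConfigFactory.load() // reads application.conf from the classpath
      val serializationMode = config.getString("performance.serialization") // "dataset" or "object"
      val useBroadcast = config.getBoolean("performance.useBroadcast")

      // The same either/or storage the patch gives Feature: a broadcast handle
      // when useBroadcast is on, a plain driver-side reference otherwise.
      var broadcastValue: Option[Broadcast[Seq[String]]] = None
      var rawValue: Option[Seq[String]] = None

      def setValue(v: Seq[String]): Unit =
        if (useBroadcast) broadcastValue = Some(spark.sparkContext.broadcast(v))
        else rawValue = Some(v)

      def getValue: Seq[String] =
        broadcastValue.map(_.value).orElse(rawValue)
          .getOrElse(throw new Exception("feature is not set"))

      setValue(Seq("token", "lemma"))
      println(s"mode=$serializationMode, broadcast=$useBroadcast, value=$getValue")
      spark.stop()
    }

The conf diff below supplies the two keys this sketch, like Feature itself, expects to find.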
diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf
index c65ea165f0ffb8..06d7e40dc04932 100644
--- a/src/test/resources/application.conf
+++ b/src/test/resources/application.conf
@@ -38,5 +38,6 @@ nlp {
 }
 
 performance {
-  useBroadcast = false
+  serialization = "object"
+  useBroadcast = true
 }

From a8a92adc0db092874abca802521b89e53582626d Mon Sep 17 00:00:00 2001
From: Saif Addin
Date: Fri, 12 Jan 2018 01:13:27 -0300
Subject: [PATCH 3/3] - Added fallback option for lazy defaults
 - Added defaults to application.conf

---
 src/main/resources/application.conf           |  3 ++-
 .../com/johnsnowlabs/nlp/HasFeatures.scala    |  6 +++---
 .../nlp/annotators/Lemmatizer.scala           |  2 +-
 .../nlp/annotators/ner/crf/NerCrfModel.scala  |  2 +-
 .../nlp/serialization/Feature.scala           | 24 +++++++++++-------------
 src/test/resources/application.conf           |  3 +--
 6 files changed, 19 insertions(+), 21 deletions(-)

diff --git a/src/main/resources/application.conf b/src/main/resources/application.conf
index 4e497d0bb46fbb..abf43b92886f32 100644
--- a/src/main/resources/application.conf
+++ b/src/main/resources/application.conf
@@ -52,5 +52,6 @@ settings {
   overrideConfPath = "./application.conf"
 }
 performance {
-
+  serialization = "object"
+  useBroadcast = true
 }
\ No newline at end of file
diff --git a/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala b/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala
index b8738116d7a385..d713366ae16eac 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/HasFeatures.scala
@@ -14,11 +14,11 @@ trait HasFeatures {
 
   protected def set[T](feature: StructFeature[T], value: T): this.type = {feature.setValue(Some(value)); this}
 
-  protected def setDefault[T](feature: ArrayFeature[T], value: Array[T]): this.type = {feature.setValue(Some(value)); this}
+  protected def setDefault[T](feature: ArrayFeature[T], value: () => Array[T]): this.type = {feature.setFallback(Some(value)); this}
 
-  protected def setDefault[K, V](feature: MapFeature[K, V], value: Map[K, V]): this.type = {feature.setValue(Some(value)); this}
+  protected def setDefault[K, V](feature: MapFeature[K, V], value: () => Map[K, V]): this.type = {feature.setFallback(Some(value)); this}
 
-  protected def setDefault[T](feature: StructFeature[T], value: T): this.type = {feature.setValue(Some(value)); this}
+  protected def setDefault[T](feature: StructFeature[T], value: () => T): this.type = {feature.setFallback(Some(value)); this}
 
   protected def get[T](feature: ArrayFeature[T]): Option[Array[T]] = feature.get
 
diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala
index efdb6d6adc0961..0c1863b0d33fa6 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/Lemmatizer.scala
@@ -40,7 +40,7 @@ class Lemmatizer(override val uid: String) extends AnnotatorModel[Lemmatizer] {
   setDefault(lemmaValSep, config.getString("nlp.lemmaDict.vSeparator"))
 
   if (config.getString("nlp.lemmaDict.file").nonEmpty)
-    setDefault(lemmaDict, Lemmatizer.retrieveLemmaDict(
+    setDefault(lemmaDict, () => Lemmatizer.retrieveLemmaDict(
       config.getString("nlp.lemmaDict.file"),
       config.getString("nlp.lemmaDict.format"),
      config.getString("nlp.lemmaDict.kvSeparator"),
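The Lemmatizer hunk above is where the new thunk-taking setDefault pays off: Lemmatizer.retrieveLemmaDict reads and parses an entire dictionary file, and with the old eager signature that work ran while the annotator was constructed, even when a trained model immediately overwrote the default. A minimal illustration of the difference follows; every name in it is hypothetical, standing in for the HasFeatures overloads.

    object ThunkDefaultDemo extends App {
      def expensiveLoad(): Map[String, String] = {
        println("loading dictionary...") // stands in for retrieveLemmaDict
        Map("was" -> "be")
      }

      // Old shape: the argument is evaluated at the call site, so the load
      // always runs, needed or not.
      def setDefaultEager(value: Map[String, String]): Unit = ()

      // New shape: only the function value is stored; nothing runs until
      // something invokes it.
      def setDefaultLazy(value: () => Map[String, String]): Unit = ()

      setDefaultEager(expensiveLoad())      // prints "loading dictionary..."
      setDefaultLazy(() => expensiveLoad()) // prints nothing
    }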
diff --git a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala
index d940f6b96fedb1..a444b6de44b06d 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/annotators/ner/crf/NerCrfModel.scala
@@ -25,7 +25,7 @@ class NerCrfModel(override val uid: String) extends ModelWithWordEmbeddings[NerC
   def setModel(crf: LinearChainCrfModel): NerCrfModel = set(model, crf)
 
   def setDictionaryFeatures(dictFeatures: DictionaryFeatures): this.type = set(dictionaryFeatures, dictFeatures.dict)
-  setDefault(dictionaryFeatures, Map.empty[String, String])
+  setDefault(dictionaryFeatures, () => Map.empty[String, String])
 
   def setEntities(toExtract: Array[String]): NerCrfModel = set(entities, toExtract)
 
diff --git a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala
index 2b90cda96d9d32..f3a251f533423b 100644
--- a/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala
+++ b/src/main/scala/com/johnsnowlabs/nlp/serialization/Feature.scala
@@ -18,6 +18,7 @@ abstract class Feature[Serializable1, Serializable2, TComplete: ClassTag](model:
 
   final protected var broadcastValue: Option[Broadcast[TComplete]] = None
   final protected var rawValue: Option[TComplete] = None
+  final protected var fallback: Option[() => TComplete] = None
 
   final def serialize(spark: SparkSession, path: String, field: String, value: TComplete): Unit = {
     serializationMode match {
@@ -32,7 +33,7 @@ abstract class Feature[Serializable1, Serializable2, TComplete: ClassTag](model:
 
   final def deserialize(spark: SparkSession, path: String, field: String): Option[_] = {
     if (broadcastValue.isDefined || rawValue.isDefined)
-      throw new Exception(s"Trying to deserialize an already set value for ${this.name}")
+      throw new Exception(s"Trying to deserialize an already set value for ${this.name}. This should not happen.")
     serializationMode match {
       case "dataset" => deserializeDataset(spark, path, field)
       case "object" => deserializeObject(spark, path, field)
@@ -52,17 +53,11 @@ abstract class Feature[Serializable1, Serializable2, TComplete: ClassTag](model:
     Path.mergePaths(new Path(path), new Path("/fields/" + field))
 
   final def get: Option[TComplete] = {
-    if (useBroadcast)
-      broadcastValue.map(_.value)
-    else
-      rawValue
+    broadcastValue.map(_.value).orElse(rawValue)
   }
 
   final def getValue: TComplete = {
-    if (useBroadcast)
-      broadcastValue.map(_.value).getOrElse(throw new Exception(s"feature $name is not set"))
-    else
-      rawValue.getOrElse(throw new Exception(s"feature $name is not set"))
+    broadcastValue.map(_.value).orElse(rawValue).orElse(fallback.map(_())).getOrElse(throw new Exception(s"feature $name is not set"))
   }
 
   final def setValue(v: Option[Any]): HasFeatures = {
@@ -74,11 +69,14 @@ abstract class Feature[Serializable1, Serializable2, TComplete: ClassTag](model:
     }
     model
   }
+
+  def setFallback(v: Option[() => TComplete]): HasFeatures = {
+    fallback = v
+    model
+  }
+
   final def isSet: Boolean = {
-    if (useBroadcast)
-      broadcastValue.isDefined
-    else
-      rawValue.isDefined
+    broadcastValue.isDefined || rawValue.isDefined
   }
 
 }
diff --git a/src/test/resources/application.conf b/src/test/resources/application.conf
index 06d7e40dc04932..f5872cfcf6efb3 100644
--- a/src/test/resources/application.conf
+++ b/src/test/resources/application.conf
@@ -36,8 +36,7 @@ nlp {
     revert = -1.0
   }
 }
-
 performance {
   serialization = "object"
   useBroadcast = true
-}
+}
\ No newline at end of file
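Taken together, patch 3 makes getValue resolve in a fixed order: broadcast value, then raw value, then the fallback thunk, and only that last step can trigger the expensive default. The sketch below condenses that resolution chain into one small class, dropping the Spark plumbing and collapsing Feature's get/getValue pair into a single accessor; LazyDefault and its members are illustrative names, not the project's API.

    object LazyDefaultDemo extends App {
      class LazyDefault[T] {
        private var value: Option[T] = None
        private var fallback: Option[() => T] = None

        def set(v: T): Unit = value = Some(v)
        def setFallback(f: () => T): Unit = fallback = Some(f)

        // Option.orElse takes its alternative by name, so the thunk is only
        // evaluated when no concrete value was ever set.
        def get: T = value
          .orElse(fallback.map(_.apply()))
          .getOrElse(throw new Exception("value is not set"))
      }

      def loadHugeDictionary(): Map[String, String] = {
        println("loading...") // a real fallback would read a large file here
        Map("was" -> "be")
      }

      val dict = new LazyDefault[Map[String, String]]
      dict.setFallback(() => loadHugeDictionary())
      dict.set(Map("mice" -> "mouse")) // fallback stays unused from here on
      println(dict.get)                // Map(mice -> mouse); "loading..." never printed
    }

The same by-name orElse chain is what keeps the patched getValue from paying for the fallback when a model is loaded with real values already set.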