From 2a71796bbf43e93750d81531c50b84bac6856df3 Mon Sep 17 00:00:00 2001
From: Asher Krim
Date: Fri, 13 Jan 2017 18:13:14 -0500
Subject: [PATCH 1/7] new word2vec save format of sequence of wordVectors

backwards compatibility maintained by checking the spark version. The new
format should make it more feasible to save large models
---
 .../apache/spark/ml/feature/Word2Vec.scala    | 37 +++++++++++++------
 1 file changed, 25 insertions(+), 12 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index 3ed08c983d56..174d92e6e23a 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -18,10 +18,9 @@
 package org.apache.spark.ml.feature
 
 import org.apache.hadoop.fs.Path
-
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.{Estimator, Model}
-import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
+import org.apache.spark.ml.linalg.{BLAS, Vector, VectorUDT, Vectors}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -302,16 +301,18 @@ class Word2VecModel private[ml] (
 
 @Since("1.6.0")
 object Word2VecModel extends MLReadable[Word2VecModel] {
 
+  private case class Data(word: String, vector: Seq[Float])
+
   private[Word2VecModel] class Word2VecModelWriter(instance: Word2VecModel) extends MLWriter {
 
-    private case class Data(wordIndex: Map[String, Int], wordVectors: Seq[Float])
-
     override protected def saveImpl(path: String): Unit = {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
-      val data = Data(instance.wordVectors.wordIndex, instance.wordVectors.wordVectors.toSeq)
+
+      val wordVectors = instance.wordVectors.getVectors
+      val dataArray = wordVectors.toSeq.map { case (word, vector) => Data(word, vector) }
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(Seq(data)).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(dataArray).repartition(1).write.parquet(dataPath)
     }
   }
 
@@ -321,13 +322,25 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
 
     override def load(path: String): Word2VecModel = {
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+
       val dataPath = new Path(path, "data").toString
-      val data = sparkSession.read.parquet(dataPath)
-        .select("wordIndex", "wordVectors")
-        .head()
-      val wordIndex = data.getAs[Map[String, Int]](0)
-      val wordVectors = data.getAs[Seq[Float]](1).toArray
-      val oldModel = new feature.Word2VecModel(wordIndex, wordVectors)
+      val rawData = sparkSession.read.parquet(dataPath)
+
+      val oldModel = if (rawData.columns.contains("wordIndex")) {
+        val data = rawData
+          .select("wordIndex", "wordVectors")
+          .head()
+        val wordIndex = data.getAs[Map[String, Int]](0)
+        val wordVectors = data.getAs[Seq[Float]](1).toArray
+        new feature.Word2VecModel(wordIndex, wordVectors)
+      } else {
+        val wordVectorsMap: Map[String, Array[Float]] = rawData.as[Data]
+          .collect()
+          .map(wordVector => (wordVector.word, wordVector.vector.toArray))
+          .toMap
+        new feature.Word2VecModel(wordVectorsMap)
+      }
+
       val model = new Word2VecModel(metadata.uid, oldModel)
       DefaultParamsReader.getAndSetParams(model, metadata)
       model

From ac935731f2dc8316e17257fbbca2d10932ed853f Mon Sep 17 00:00:00 2001
From: Asher Krim
Date: Sat, 14 Jan 2017 14:40:41 -0500
Subject: [PATCH 2/7] reintroduce SPARK-11994 to keep partition sizes
 reasonably small

The fix was never migrated to the ml version of Word2Vec
---
 .../apache/spark/ml/feature/Word2Vec.scala    | 29 +++++++++++++++++--
 1 file changed, 26 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index 174d92e6e23a..8b4cb1a0f894 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -18,9 +18,10 @@
 package org.apache.spark.ml.feature
 
 import org.apache.hadoop.fs.Path
+
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.{Estimator, Model}
-import org.apache.spark.ml.linalg.{BLAS, Vector, VectorUDT, Vectors}
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -29,6 +30,7 @@ import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
+import org.apache.spark.util.Utils
 
 /**
  * Params for [[Word2Vec]] and [[Word2VecModel]].
@@ -312,7 +314,25 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
       val wordVectors = instance.wordVectors.getVectors
       val dataArray = wordVectors.toSeq.map { case (word, vector) => Data(word, vector) }
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(dataArray).repartition(1).write.parquet(dataPath)
+      sparkSession.createDataFrame(dataArray)
+        .repartition(calculateNumberOfPartitions)
+        .write
+        .parquet(dataPath)
+    }
+
+    val FloatSize = 4
+    val AverageWordSize = 15
+    def calculateNumberOfPartitions(): Int = {
+      // [SPARK-11994] - We want to partition the model in partitions smaller than
+      // spark.kryoserializer.buffer.max
+      val bufferSizeInBytes = Utils.byteStringAsBytes(
+        sc.conf.get("spark.kryoserializer.buffer.max", "64m"))
+      // Calculate the approximate size of the model.
+      // Assuming an average word size of 15 bytes, the formula is:
+      // (floatSize * vectorSize + 15) * numWords
+      val numWords = instance.wordVectors.wordIndex.size
+      val approximateSizeInBytes = (FloatSize * instance.getVectorSize + AverageWordSize) * numWords
+      ((approximateSizeInBytes / bufferSizeInBytes) + 1).toInt
     }
   }
 
@@ -321,10 +341,13 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
     private val className = classOf[Word2VecModel].getName
 
     override def load(path: String): Word2VecModel = {
+      val spark = sparkSession
+      import spark.implicits._
+
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
 
       val dataPath = new Path(path, "data").toString
-      val rawData = sparkSession.read.parquet(dataPath)
+      val rawData = spark.read.parquet(dataPath)
 
       val oldModel = if (rawData.columns.contains("wordIndex")) {
         val data = rawData

From 6752ccb66960dbe50d3ad8c897634071c49abce8 Mon Sep 17 00:00:00 2001
From: Asher Krim
Date: Tue, 17 Jan 2017 10:32:15 -0500
Subject: [PATCH 3/7] use array type instead of seq; remove unneeded type from local val

---
 .../main/scala/org/apache/spark/ml/feature/Word2Vec.scala | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index 8b4cb1a0f894..84eb20e25d51 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -303,7 +303,7 @@ class Word2VecModel private[ml] (
 @Since("1.6.0")
 object Word2VecModel extends MLReadable[Word2VecModel] {
 
-  private case class Data(word: String, vector: Seq[Float])
+  private case class Data(word: String, vector: Array[Float])
 
   private[Word2VecModel] class Word2VecModelWriter(instance: Word2VecModel) extends MLWriter {
 
@@ -312,7 +312,7 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
 
       val wordVectors = instance.wordVectors.getVectors
-      val dataArray = wordVectors.toSeq.map { case (word, vector) => Data(word, vector) }
+      val dataArray = wordVectors.toSeq.map { case (word, vector) => Data(word, vector) }.toArray
       val dataPath = new Path(path, "data").toString
       sparkSession.createDataFrame(dataArray)
         .repartition(calculateNumberOfPartitions)
@@ -357,9 +357,9 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
         val wordVectors = data.getAs[Seq[Float]](1).toArray
         new feature.Word2VecModel(wordIndex, wordVectors)
       } else {
-        val wordVectorsMap: Map[String, Array[Float]] = rawData.as[Data]
+        val wordVectorsMap = rawData.as[Data]
           .collect()
-          .map(wordVector => (wordVector.word, wordVector.vector.toArray))
+          .map(wordVector => (wordVector.word, wordVector.vector))
           .toMap
         new feature.Word2VecModel(wordVectorsMap)
       }

From 218bb0d753ef800c508e14580001b4d9efdb26a7 Mon Sep 17 00:00:00 2001
From: Asher Krim
Date: Thu, 19 Jan 2017 13:53:43 -0500
Subject: [PATCH 4/7] make these local vals

---
 .../main/scala/org/apache/spark/ml/feature/Word2Vec.scala | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index 84eb20e25d51..417d37f87dab 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -320,9 +320,9 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
         .parquet(dataPath)
     }
 
-    val FloatSize = 4
-    val AverageWordSize = 15
     def calculateNumberOfPartitions(): Int = {
+      val floatSize = 4
+      val averageWordSize = 15
       // [SPARK-11994] - We want to partition the model in partitions smaller than
       // spark.kryoserializer.buffer.max
       val bufferSizeInBytes = Utils.byteStringAsBytes(
         sc.conf.get("spark.kryoserializer.buffer.max", "64m"))
@@ -331,7 +331,7 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
       // Assuming an average word size of 15 bytes, the formula is:
      // (floatSize * vectorSize + 15) * numWords
       val numWords = instance.wordVectors.wordIndex.size
-      val approximateSizeInBytes = (FloatSize * instance.getVectorSize + AverageWordSize) * numWords
+      val approximateSizeInBytes = (floatSize * instance.getVectorSize + averageWordSize) * numWords
       ((approximateSizeInBytes / bufferSizeInBytes) + 1).toInt
     }
   }

From 9b5e9288699012b2e5d9b347191fd3d141b31d7d Mon Sep 17 00:00:00 2001
From: Asher Krim
Date: Thu, 19 Jan 2017 14:02:50 -0500
Subject: [PATCH 5/7] rely on spark version less than 2.2 as an indicator for the old format

---
 .../org/apache/spark/ml/feature/Word2Vec.scala | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index 417d37f87dab..eb4eb3cc79a0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -18,10 +18,9 @@
 package org.apache.spark.ml.feature
 
 import org.apache.hadoop.fs.Path
-
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.{Estimator, Model}
-import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
+import org.apache.spark.ml.linalg.{BLAS, Vector, VectorUDT, Vectors}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
@@ -30,7 +29,7 @@ import org.apache.spark.mllib.linalg.VectorImplicits._
 import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
 import org.apache.spark.sql.functions._
 import org.apache.spark.sql.types._
-import org.apache.spark.util.Utils
+import org.apache.spark.util.{Utils, VersionUtils}
 
 /**
  * Params for [[Word2Vec]] and [[Word2VecModel]].
@@ -345,19 +344,19 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
       import spark.implicits._
 
       val metadata = DefaultParamsReader.loadMetadata(path, sc, className)
+      val (major, minor) = VersionUtils.majorMinorVersion(metadata.sparkVersion)
 
       val dataPath = new Path(path, "data").toString
-      val rawData = spark.read.parquet(dataPath)
 
-      val oldModel = if (rawData.columns.contains("wordIndex")) {
-        val data = rawData
+      val oldModel = if (major.toInt < 2 || (major.toInt == 2 && minor.toInt < 2)) {
+        val data = spark.read.parquet(dataPath)
           .select("wordIndex", "wordVectors")
           .head()
         val wordIndex = data.getAs[Map[String, Int]](0)
         val wordVectors = data.getAs[Seq[Float]](1).toArray
         new feature.Word2VecModel(wordIndex, wordVectors)
       } else {
-        val wordVectorsMap = rawData.as[Data]
+        val wordVectorsMap = spark.read.parquet(dataPath).as[Data]
           .collect()
           .map(wordVector => (wordVector.word, wordVector.vector))
           .toMap

From 08f2c77f46bd3cf0eb0e66072fe62aa3d734fa45 Mon Sep 17 00:00:00 2001
From: Asher Krim
Date: Sun, 5 Feb 2017 13:26:17 -0500
Subject: [PATCH 6/7] small corrections

---
 .../main/scala/org/apache/spark/ml/feature/Word2Vec.scala | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index eb4eb3cc79a0..a5b66a9e645c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.ml.feature
 
 import org.apache.hadoop.fs.Path
+
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.linalg.{BLAS, Vector, VectorUDT, Vectors}
@@ -311,9 +312,9 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
       DefaultParamsWriter.saveMetadata(instance, path, sc)
 
       val wordVectors = instance.wordVectors.getVectors
-      val dataArray = wordVectors.toSeq.map { case (word, vector) => Data(word, vector) }.toArray
+      val dataSeq = wordVectors.toSeq.map { case (word, vector) => Data(word, vector) }
       val dataPath = new Path(path, "data").toString
-      sparkSession.createDataFrame(dataArray)
+      sparkSession.createDataFrame(dataSeq)
         .repartition(calculateNumberOfPartitions)
         .write
         .parquet(dataPath)
@@ -348,7 +349,7 @@ object Word2VecModel extends MLReadable[Word2VecModel] {
 
       val dataPath = new Path(path, "data").toString
 
-      val oldModel = if (major.toInt < 2 || (major.toInt == 2 && minor.toInt < 2)) {
+      val oldModel = if (major < 2 || (major == 2 && minor < 2)) {
         val data = spark.read.parquet(dataPath)
           .select("wordIndex", "wordVectors")
           .head()

From 28f013baf051e1bc55d51002847a31a6113eb5fd Mon Sep 17 00:00:00 2001
From: Asher Krim
Date: Sun, 5 Feb 2017 14:08:45 -0500
Subject: [PATCH 7/7] fix selector ordering

---
 mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index a5b66a9e645c..42e8a66a62b6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -21,7 +21,7 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
 import org.apache.spark.ml.{Estimator, Model}
-import org.apache.spark.ml.linalg.{BLAS, Vector, VectorUDT, Vectors}
+import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
 import org.apache.spark.ml.param._
 import org.apache.spark.ml.param.shared._
 import org.apache.spark.ml.util._
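
Reviewer note: the partition sizing introduced in PATCH 2/7 and made local in PATCH 4/7 is
plain arithmetic, so it can be sanity-checked outside Spark. Below is a minimal standalone
sketch of the same formula; the names PartitionMath and estimatePartitions are ours for
illustration only, the 64m default mirrors spark.kryoserializer.buffer.max, and unlike the
patch the byte count is done in Long so very large vocabularies cannot overflow an Int.

// Sketch of the SPARK-11994 sizing rule used by Word2VecModelWriter:
// keep each saved partition under spark.kryoserializer.buffer.max.
object PartitionMath {
  def estimatePartitions(
      numWords: Int,
      vectorSize: Int,
      bufferSizeInBytes: Long = 64L * 1024 * 1024): Int = {
    val floatSize = 4L        // bytes per Float, as in the patch
    val averageWordSize = 15L // assumed average word length, as in the patch
    val approximateSizeInBytes = (floatSize * vectorSize + averageWordSize) * numWords
    // Round up so the average partition stays below the buffer cap.
    ((approximateSizeInBytes / bufferSizeInBytes) + 1).toInt
  }

  def main(args: Array[String]): Unit = {
    // 1M words x 300 dims: (4 * 300 + 15) * 1e6 = ~1.215 GB, i.e. 19 partitions at 64m.
    println(PartitionMath.estimatePartitions(numWords = 1000000, vectorSize = 300))
  }
}

The "+ 1" also guarantees the writer never asks for zero partitions on tiny models.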
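
PATCH 5/7 switches format detection from sniffing for a wordIndex column to parsing the
Spark version recorded in the saved metadata, and PATCH 6/7 then drops the redundant .toInt
calls since the major/minor parts are already integers. A small sketch of that gate,
assuming a "major.minor.patch" version string; majorMinor below is a hypothetical stand-in
for Spark's internal org.apache.spark.util.VersionUtils.majorMinorVersion:

// Sketch of the "old format if saved before 2.2" gate used by the loader.
object FormatGate {
  // Hypothetical stand-in for VersionUtils.majorMinorVersion.
  def majorMinor(sparkVersion: String): (Int, Int) = {
    val Array(major, minor) = sparkVersion.split('.').take(2).map(_.toInt)
    (major, minor)
  }

  // Models written by Spark < 2.2 use the single-row (wordIndex, wordVectors)
  // layout; 2.2+ models store one (word, vector) row per vocabulary word.
  def isOldFormat(sparkVersion: String): Boolean = {
    val (major, minor) = majorMinor(sparkVersion)
    major < 2 || (major == 2 && minor < 2)
  }

  def main(args: Array[String]): Unit = {
    assert(isOldFormat("2.1.0"))
    assert(!isOldFormat("2.2.0"))
  }
}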
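
On the design choice: the column-sniffing check from PATCH 1/7 would also have worked, but
the version gate makes the intent explicit — the sparkVersion stored by
DefaultParamsWriter.saveMetadata states which writer produced the files, so the loader no
longer has to infer the layout from the Parquet schema.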