From 978b225d6815aa6a7ad97e5ccc62344581dd4902 Mon Sep 17 00:00:00 2001
From: zhengruifeng
Date: Wed, 28 Oct 2020 19:34:47 +0800
Subject: [PATCH 1/2] Optimize Word2VecModel.transform with a broadcast model
 and flat-array averaging

---
 .../apache/spark/ml/feature/Word2Vec.scala    | 38 ++++++++++++-------
 .../apache/spark/mllib/feature/Word2Vec.scala | 26 +++++++------
 2 files changed, 40 insertions(+), 24 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index 9b5f5a619e02c..e0e275e98719c 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -20,6 +20,7 @@ package org.apache.spark.ml.feature
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.config.Kryo.KRYO_SERIALIZER_MAX_BUFFER_SIZE
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
@@ -278,6 +279,8 @@ class Word2VecModel private[ml] (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
+  private var bcModel: Broadcast[Word2VecModel] = _
+
   /**
    * Transform a sentence column to a vector column to represent the whole sentence. The transform
    * is performed by averaging all word vectors it contains.
@@ -285,27 +288,36 @@ class Word2VecModel private[ml] (
   @Since("2.0.0")
   override def transform(dataset: Dataset[_]): DataFrame = {
     val outputSchema = transformSchema(dataset.schema, logging = true)
-    val vectors = wordVectors.getVectors
-      .mapValues(vv => Vectors.dense(vv.map(_.toDouble)))
-      .map(identity).toMap // mapValues doesn't return a serializable map (SI-7005)
-    val bVectors = dataset.sparkSession.sparkContext.broadcast(vectors)
-    val d = $(vectorSize)
-    val emptyVec = Vectors.sparse(d, Array.emptyIntArray, Array.emptyDoubleArray)
-    val word2Vec = udf { sentence: Seq[String] =>
+
+    if (bcModel == null) {
+      bcModel = dataset.sparkSession.sparkContext.broadcast(this)
+    }
+
+    val size = $(vectorSize)
+    val emptyVec = Vectors.sparse(size, Array.emptyIntArray, Array.emptyDoubleArray)
+    val transformer = udf { sentence: Seq[String] =>
       if (sentence.isEmpty) {
         emptyVec
       } else {
-        val sum = Vectors.zeros(d)
+        val wordIndices = bcModel.value.wordVectors.wordIndex
+        val wordVectors = bcModel.value.wordVectors.wordVectors
+        val array = Array.ofDim[Double](size)
+        var count = 0L
         sentence.foreach { word =>
-          bVectors.value.get(word).foreach { v =>
-            BLAS.axpy(1.0, v, sum)
+          wordIndices.get(word).foreach { index =>
+            val offset = index * size
+            var i = 0
+            while (i < size) { array(i) += wordVectors(offset + i); i += 1 }
           }
+          count += 1
         }
-        BLAS.scal(1.0 / sentence.size, sum)
-        sum
+        val vec = Vectors.dense(array)
+        BLAS.scal(1.0 / count, vec)
+        vec
       }
     }
-    dataset.withColumn($(outputCol), word2Vec(col($(inputCol))),
+
+    dataset.withColumn($(outputCol), transformer(col($(inputCol))),
       outputSchema($(outputCol)).metadata)
   }
 
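The first commit replaces the old transform path, which copied the whole vocabulary into a Map[String, Vector] (every Float vector boxed into a dense Double vector) and broadcast that map, with a broadcast of the model itself: executors resolve each word through wordIndex and read its vector straight out of the flat Float array, so the per-sentence work is one Double accumulator array and a tight, slice-free while loop. A minimal self-contained sketch of that kernel (object, method, and demo values are hypothetical, not part of the patch):

```scala
// Standalone sketch of the averaging kernel used by the new UDF: all word
// vectors live back to back in one flat Float array, and a word -> index map
// locates each vector at offset index * size.
object AverageSketch {
  def averageWords(
      sentence: Seq[String],
      wordIndex: Map[String, Int],
      wordVectors: Array[Float],
      size: Int): Array[Double] = {
    val sum = Array.ofDim[Double](size)
    sentence.foreach { word =>
      wordIndex.get(word).foreach { index =>
        val offset = index * size // vector i occupies [i * size, (i + 1) * size)
        var i = 0
        while (i < size) { sum(i) += wordVectors(offset + i); i += 1 }
      }
    }
    // Divide by the full sentence length, out-of-vocabulary words included,
    // matching the patch: `count` is incremented for every word whether or not
    // a vector was found, preserving the old 1.0 / sentence.size semantics.
    var i = 0
    while (i < size) { sum(i) /= sentence.length; i += 1 }
    sum
  }

  def main(args: Array[String]): Unit = {
    val wordIndex = Map("hello" -> 0, "world" -> 1)
    val wordVectors = Array(1.0f, 2.0f, 3.0f, 4.0f) // two 2-dimensional vectors
    val avg = averageWords(Seq("hello", "world"), wordIndex, wordVectors, size = 2)
    println(avg.mkString(", ")) // 2.0, 3.0
  }
}
```

Keeping the vectors as one Float array also means the broadcast payload is roughly half the size of the old Double-based map, on top of skipping the map-building pass on the driver.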
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index eeb583f84ca8b..895d85a5841ed 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -502,19 +502,19 @@ class Word2VecModel private[spark] (
   private val vectorSize = wordVectors.length / numWords
 
   // wordList: Ordered list of words obtained from wordIndex.
-  private val wordList: Array[String] = {
-    val (wl, _) = wordIndex.toSeq.sortBy(_._2).unzip
-    wl.toArray
+  private lazy val wordList: Array[String] = {
+    wordIndex.toSeq.sortBy(_._2).iterator.map(_._1).toArray
   }
 
   // wordVecNorms: Array of length numWords, each value being the Euclidean norm
   // of the wordVector.
-  private val wordVecNorms: Array[Float] = {
-    val wordVecNorms = new Array[Float](numWords)
+  private lazy val wordVecNorms: Array[Float] = {
+    val num = numWords
+    val size = vectorSize
+    val wordVecNorms = new Array[Float](num)
     var i = 0
-    while (i < numWords) {
-      val vec = wordVectors.slice(i * vectorSize, i * vectorSize + vectorSize)
-      wordVecNorms(i) = blas.snrm2(vectorSize, vec, 1)
+    while (i < num) {
+      wordVecNorms(i) = blas.snrm2(size, wordVectors, i * size, 1)
       i += 1
     }
     wordVecNorms
@@ -538,9 +538,13 @@ class Word2VecModel private[spark] (
   @Since("1.1.0")
   def transform(word: String): Vector = {
     wordIndex.get(word) match {
-      case Some(ind) =>
-        val vec = wordVectors.slice(ind * vectorSize, ind * vectorSize + vectorSize)
-        Vectors.dense(vec.map(_.toDouble))
+      case Some(index) =>
+        val size = vectorSize
+        val offset = index * size
+        val array = Array.ofDim[Double](size)
+        var i = 0
+        while (i < size) { array(i) = wordVectors(offset + i); i += 1 }
+        Vectors.dense(array)
       case None =>
         throw new IllegalStateException(s"$word not in vocabulary")
     }
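On the mllib side the slice-based loops are the target: wordVectors.slice(...) allocated a fresh vectorSize-long array for every vocabulary word when computing norms, and again for every transform(word) call, where offset arithmetic into the shared array suffices. wordList and wordVecNorms also become lazy, since only the synonym search needs them; a model used purely for transform never builds them. A plain-Scala stand-in for the offset-based blas.snrm2 call (hypothetical helper, runnable without a BLAS dependency):

```scala
// Plain-Scala stand-in for the offset-based norm computation: each word's
// vector starts at w * vectorSize in the flat array, so no temporary slice
// is ever allocated.
object NormSketch {
  def wordNorms(wordVectors: Array[Float], numWords: Int, vectorSize: Int): Array[Float] =
    Array.tabulate(numWords) { w =>
      val offset = w * vectorSize
      var sumSq = 0.0
      var i = 0
      while (i < vectorSize) {
        val v = wordVectors(offset + i)
        sumSq += v * v
        i += 1
      }
      math.sqrt(sumSq).toFloat
    }

  def main(args: Array[String]): Unit = {
    // Two 2-dimensional vectors, (3, 4) and (0, 5): both have norm 5.
    println(wordNorms(Array(3f, 4f, 0f, 5f), numWords = 2, vectorSize = 2).mkString(", "))
  }
}
```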
From 3ba4fda47dd7627014b0aacfcbdc1b0a93fd924a Mon Sep 17 00:00:00 2001
From: Ruifeng Zheng
Date: Thu, 3 Dec 2020 10:34:02 +0800
Subject: [PATCH 2/2] address comments

---
 .../org/apache/spark/ml/feature/Word2Vec.scala    | 14 ++++----------
 .../org/apache/spark/mllib/feature/Word2Vec.scala |  9 +--------
 2 files changed, 5 insertions(+), 18 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
index e0e275e98719c..0b9c1b570d943 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/feature/Word2Vec.scala
@@ -20,7 +20,6 @@ package org.apache.spark.ml.feature
 import org.apache.hadoop.fs.Path
 
 import org.apache.spark.annotation.Since
-import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.internal.config.Kryo.KRYO_SERIALIZER_MAX_BUFFER_SIZE
 import org.apache.spark.ml.{Estimator, Model}
 import org.apache.spark.ml.linalg.{BLAS, Vector, Vectors, VectorUDT}
@@ -279,8 +278,6 @@ class Word2VecModel private[ml] (
   @Since("1.4.0")
   def setOutputCol(value: String): this.type = set(outputCol, value)
 
-  private var bcModel: Broadcast[Word2VecModel] = _
-
   /**
    * Transform a sentence column to a vector column to represent the whole sentence. The transform
    * is performed by averaging all word vectors it contains.
@@ -289,20 +286,17 @@ class Word2VecModel private[ml] (
   override def transform(dataset: Dataset[_]): DataFrame = {
     val outputSchema = transformSchema(dataset.schema, logging = true)
 
-    if (bcModel == null) {
-      bcModel = dataset.sparkSession.sparkContext.broadcast(this)
-    }
-
+    val bcModel = dataset.sparkSession.sparkContext.broadcast(this.wordVectors)
     val size = $(vectorSize)
     val emptyVec = Vectors.sparse(size, Array.emptyIntArray, Array.emptyDoubleArray)
     val transformer = udf { sentence: Seq[String] =>
       if (sentence.isEmpty) {
         emptyVec
       } else {
-        val wordIndices = bcModel.value.wordVectors.wordIndex
-        val wordVectors = bcModel.value.wordVectors.wordVectors
+        val wordIndices = bcModel.value.wordIndex
+        val wordVectors = bcModel.value.wordVectors
         val array = Array.ofDim[Double](size)
-        var count = 0L
+        var count = 0
         sentence.foreach { word =>
           wordIndices.get(word).foreach { index =>
             val offset = index * size
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
index 895d85a5841ed..8a6317a910146 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala
@@ -509,15 +509,8 @@ class Word2VecModel private[spark] (
   // wordVecNorms: Array of length numWords, each value being the Euclidean norm
   // of the wordVector.
   private lazy val wordVecNorms: Array[Float] = {
-    val num = numWords
     val size = vectorSize
-    val wordVecNorms = new Array[Float](num)
-    var i = 0
-    while (i < num) {
-      wordVecNorms(i) = blas.snrm2(size, wordVectors, i * size, 1)
-      i += 1
-    }
-    wordVecNorms
+    Array.tabulate(numWords)(i => blas.snrm2(size, wordVectors, i * size, 1))
   }
 
   @Since("1.5.0")
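The review follow-up narrows the broadcast to this.wordVectors, the underlying mllib model, so executors deserialize only the word index and the flat vector array rather than the whole ml wrapper, and it drops the cached bcModel field, which made transform stateful and held a broadcast that was never destroyed. count goes back to Int (a sentence length cannot overflow it), and the norm loop collapses into Array.tabulate. Observable behavior is unchanged, which a short driver can confirm (toy data and a local master; the snippet mirrors the standard Word2Vec usage example, not code from the patch):

```scala
// Minimal end-to-end check of the rewritten transform path.
import org.apache.spark.ml.feature.Word2Vec
import org.apache.spark.sql.SparkSession

object Word2VecTransformDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("Word2VecTransformDemo")
      .getOrCreate()

    // Input: each row is a sentence split into words.
    val documentDF = spark.createDataFrame(Seq(
      "Hi I heard about Spark".split(" "),
      "I wish Java could use case classes".split(" "),
      "Logistic regression models are neat".split(" ")
    ).map(Tuple1.apply)).toDF("text")

    val model = new Word2Vec()
      .setInputCol("text")
      .setOutputCol("result")
      .setVectorSize(3)
      .setMinCount(0)
      .fit(documentDF)

    // Each output row is the average of the sentence's word vectors; an empty
    // sentence would map to an all-zero sparse vector of the configured size.
    model.transform(documentDF).show(false)

    spark.stop()
  }
}
```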