
[SPARK-33609][ML] word2vec reduce broadcast size #30548

Closed

Conversation


@zhengruifeng zhengruifeng commented Nov 30, 2020

What changes were proposed in this pull request?

1. Directly use float vectors instead of converting them to double vectors; this is about 2x faster than using vec.axpy.
2. Mark wordList and wordVecNorms lazy.
3. Avoid slicing in the computation of wordVecNorms.
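The first point can be sketched in plain Scala (a simplified, hypothetical helper, not the PR's actual code): word vectors stay as Array[Float] and are accumulated straight into a double buffer, instead of each being converted to a double Vector and added via BLAS.axpy.

```scala
object FloatSum {
  // Accumulate a float word vector into a double sum buffer in place,
  // avoiding a per-word Float -> double Vector conversion plus axpy call.
  def addInPlace(sum: Array[Double], vec: Array[Float]): Unit = {
    var i = 0
    while (i < vec.length) { sum(i) += vec(i); i += 1 }
  }

  // Average a sentence's word vectors into one double array,
  // as transform does when embedding a sentence.
  def average(vectors: Seq[Array[Float]], size: Int): Array[Double] = {
    val sum = Array.ofDim[Double](size)
    vectors.foreach(addInPlace(sum, _))
    if (vectors.nonEmpty) {
      var i = 0
      while (i < size) { sum(i) /= vectors.size; i += 1 }
    }
    sum
  }
}
```

Keeping the vectors as floats end to end is also what halves the broadcast size, since no double-precision copy is ever materialized.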

Why are the changes needed?

Halve the broadcast size.

Does this PR introduce any user-facing change?

No

How was this patch tested?

Existing test suites.

var i = 0
while (i < numWords) {
val vec = wordVectors.slice(i * vectorSize, i * vectorSize + vectorSize)
Contributor Author

avoid this slicing

@@ -538,9 +538,13 @@ class Word2VecModel private[spark] (
@Since("1.1.0")
def transform(word: String): Vector = {
wordIndex.get(word) match {
case Some(ind) =>
val vec = wordVectors.slice(ind * vectorSize, ind * vectorSize + vectorSize)
Contributor Author

avoid this slicing

@SparkQA

SparkQA commented Nov 30, 2020

Test build #131984 has finished for PR 30548 at commit 978b225.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@@ -278,34 +279,45 @@ class Word2VecModel private[ml] (
@Since("1.4.0")
def setOutputCol(value: String): this.type = set(outputCol, value)

private var bcModel: Broadcast[Word2VecModel] = _
Member

I don't suppose we have a way to clean this up after use? It will just have to get GCed?

Contributor Author

Yes. I followed the implementation of CountVectorizer here.
Since other .ml implementations do not use a mutable var for a broadcast variable like this, I will remove this var.

Contributor Author

As for CountVectorizer, should we also remove its broadcastDict var? It looks like other .mllib implementations do not use a mutable broadcast variable like that.

val offset = index * size
val array = Array.ofDim[Double](size)
var i = 0
while (i < size) { array(i) = wordVectors(offset + i); i += 1 }
Member

Is this actually more efficient than slice? Likewise above.

Contributor Author

I guess so; I will do a simple test.

}

// wordVecNorms: Array of length numWords, each value being the Euclidean norm
// of the wordVector.
private val wordVecNorms: Array[Float] = {
val wordVecNorms = new Array[Float](numWords)
private lazy val wordVecNorms: Array[Float] = {
Member

How much does this save, if it only happens once and has to happen to use the model?

Contributor Author

This var wordVecNorms is only used in the findSynonyms method on the .mllib side; however, findSynonyms is never called from the .ml side, so I think we can make it lazy.
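The effect of the lazy marker can be sketched with a simplified, hypothetical model class (the normsComputed flag is instrumentation for the sketch only): the norms array is computed once on first access, so a caller that only uses transform never pays for it.

```scala
class TinyModel(numWords: Int, vectorSize: Int, wordVectors: Array[Float]) {
  var normsComputed = false  // instrumentation for this sketch only

  // Euclidean norm of each word vector, computed once on first access.
  // Note: no per-word slice of wordVectors is allocated here.
  lazy val wordVecNorms: Array[Float] = {
    normsComputed = true
    val norms = new Array[Float](numWords)
    var i = 0
    while (i < numWords) {
      val offset = i * vectorSize
      var sum = 0.0
      var j = 0
      while (j < vectorSize) {
        val v = wordVectors(offset + j)
        sum += v * v
        j += 1
      }
      norms(i) = math.sqrt(sum).toFloat
      i += 1
    }
    norms
  }
}
```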

Member

OK fair enough. There are use cases here that would never need this calculated?

Contributor Author

> however, this findSynonyms is never used in the .ml side. So I think we can make it lazy.

I was wrong: this var wordVecNorms is used in the findSynonyms and findSynonymsArray methods on the .ml side. Since it is not used in transform, we can still mark it lazy.

val word2Vec = udf { sentence: Seq[String] =>

if (bcModel == null) {
bcModel = dataset.sparkSession.sparkContext.broadcast(this)
Member

Looks like you only use this.wordVectors below? Maybe just broadcast that.

Contributor Author

Both wordVectors and wordIndex are used.

Contributor Author

yes, there are two wordVectors...

Member

Oops, right. I think I meant to say that you only use those two. Is there any savings from just broadcasting those rather than the whole model? If not, that's fine.

@zhengruifeng
Contributor Author

zhengruifeng commented Dec 3, 2020

Performance test:

  test("add float to array") {
    val floatArrays = Array.tabulate(10000, 100)((i, j) => i.toFloat / (j + 1))
    val vectors = floatArrays.map(array => Vectors.dense(array.map(_.toDouble)))

    val tic0 = System.nanoTime()
    Seq.range(0, 1000).foreach { i =>
      val sum = Array.ofDim[Double](100)
      floatArrays.foreach { array =>
        var j = 0
        while (j < 100) { sum(j) += array(j); j += 1 }
      }
    }
    val toc0 = System.nanoTime()


    val tic1 = System.nanoTime()
    Seq.range(0, 1000).foreach { i =>
      val sum = Vectors.zeros(100)
      vectors.foreach { vec =>
        org.apache.spark.ml.linalg.BLAS.axpy(1.0, vec, sum)
      }
    }
    val toc1 = System.nanoTime()

    println(s"array sum: ${toc0 - tic0}, vector axpy: ${toc1 - tic1}")
  }

Result: [screenshot of benchmark output]

@srowen It seems that directly adding float values is nearly 2x faster than using axpy, while halving the broadcast size.

@srowen
Member

srowen commented Dec 3, 2020

I buy that. If this is in response to the slice comment above: I am looking at a different part of the change, where you unrolled the slice. Not a big deal, but I'd be surprised if it makes a difference, and if not, then slice is simpler.

val emptyVec = Vectors.sparse(d, Array.emptyIntArray, Array.emptyDoubleArray)
val word2Vec = udf { sentence: Seq[String] =>

val bcModel = dataset.sparkSession.sparkContext.broadcast(this.wordVectors)
Member

At first glance this makes more sense. But we can't call bcModel.destroy() at the end here anyway, so we have a broadcast we can't explicitly close no matter what. And now, I guess, this would re-broadcast every time? That could be bad. What do you think? I know the code is not consistent either way.

Contributor Author

> And now I guess, this would re-broadcast every time? that could be bad. What do you think?

I agree. I prefer not using broadcasting in transform, but this may need more discussion; we can keep the current behavior for now.

GBT models are also broadcast in this way for performance, since SPARK-7127.
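The trade-off discussed above can be sketched without Spark (hypothetical names; the broadcast is replaced by a counter so its cost is observable): broadcasting inside transform pays on every call, while caching in a mutable var pays once but leaves a handle that can never be destroyed explicitly.

```scala
// Hypothetical sketch contrasting the two patterns discussed.
object BroadcastPatterns {
  var broadcastCount = 0
  private def fakeBroadcast[T](value: T): T = { broadcastCount += 1; value }

  // Pattern A: broadcast inside transform -> one broadcast per call.
  def transformEachTime(model: Array[Float]): Array[Float] =
    fakeBroadcast(model)

  // Pattern B: cache in a mutable var -> broadcast once, but the handle
  // lives until the enclosing model itself is garbage-collected.
  private var cached: Array[Float] = _
  def transformCached(model: Array[Float]): Array[Float] = {
    if (cached == null) cached = fakeBroadcast(model)
    cached
  }
}
```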

Member

Looks good, but I'd back out this part of the change.

@zhengruifeng
Contributor Author

Slicing benchmark:

test("slicing vs non-slicing") {
    val n = 10000
    val size = 100
    val floatArray = Array.tabulate(n * size)(i => i.toFloat)

    val tic0 = System.nanoTime()
    Seq.range(0, 1000).foreach { i =>
      Seq.range(0, n).foreach { j =>
        floatArray.slice(j * size, j * size + size).map(_.toDouble)
      }
    }
    val toc0 = System.nanoTime()


    val tic1 = System.nanoTime()
    Seq.range(0, 1000).foreach { i =>
      Seq.range(0, n).foreach { j =>
        val doubles = Array.ofDim[Double](size)
        val offset = j * size
        var k = 0
        while (k < size) { doubles(k) = floatArray(offset + k); k += 1 }
      }
    }
    val toc1 = System.nanoTime()

    println(s"slicing: ${toc0 - tic0}, non-slicing: ${toc1 - tic1}")
  }

[screenshot of benchmark output]

@srowen Slicing and then mapping to double is about 10x slower than the new implementation. That is somewhat surprising to me.

@SparkQA

SparkQA commented Dec 3, 2020

Kubernetes integration test starting
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36679/

@SparkQA

SparkQA commented Dec 3, 2020

Kubernetes integration test status failure
URL: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder-K8s/36679/

@SparkQA

SparkQA commented Dec 3, 2020

Test build #132082 has finished for PR 30548 at commit 3ba4fda.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@zhengruifeng
Contributor Author

Merged to master, thanks @srowen for reviewing!

@zhengruifeng zhengruifeng deleted the w2v_float32_transform branch December 8, 2020 03:05