[SPARK-3541][MLLIB] New ALS implementation with improved storage #3720

Closed
wants to merge 21 commits

Conversation

@mengxr (Contributor) commented Dec 17, 2014

This PR adds a new ALS implementation to spark.ml using the pipeline API, which should be able to scale to billions of ratings. Compared with the ALS under spark.mllib, the new implementation

  1. uses the same algorithm,
  2. uses float type for ratings,
  3. uses primitive arrays to avoid GC,
  4. sorts and compresses ratings on each block so that we can solve least squares subproblems one by one using only one normal equation instance.
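
To illustrate points 3 and 4, here is a minimal, hypothetical sketch of the per-block normal-equation idea, using illustrative names (`NormalEquationSketch`, `ata`, `atb`) rather than the actual `spark.ml` classes; a Cholesky solve of the accumulated system, omitted here, would finish each least-squares subproblem:

```scala
// Hypothetical sketch (not the actual spark.ml code): one reusable normal-equation
// accumulator per block, backed by primitive arrays, so every least-squares
// subproblem min_x ||A x - b||^2 reuses the same buffers.
class NormalEquationSketch(val k: Int) {
  // Packed lower-triangular part of A^T A (row-major), plus A^T b.
  val ata = new Array[Double](k * (k + 1) / 2)
  val atb = new Array[Double](k)

  /** Adds one rating b with the other side's factor vector a (ratings kept as Float). */
  def add(a: Array[Float], b: Float): this.type = {
    require(a.length == k)
    var i = 0
    var pos = 0
    while (i < k) {
      var j = 0
      while (j <= i) {
        ata(pos) += a(i).toDouble * a(j)
        pos += 1
        j += 1
      }
      atb(i) += a(i).toDouble * b
      i += 1
    }
    this
  }

  /** Clears the buffers so the same instance can be used for the next subproblem. */
  def reset(): Unit = {
    java.util.Arrays.fill(ata, 0.0)
    java.util.Arrays.fill(atb, 0.0)
  }
}
```

Because the buffers are flat primitive arrays that are reset between subproblems, solving an entire block allocates essentially no per-rating objects, which is what keeps GC pressure low (point 3).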

The following figure shows performance comparison on copies of the Amazon Reviews dataset using a 16-node (m3.2xlarge) EC2 cluster (the same setup as in http://databricks.com/blog/2014/07/23/scalable-collaborative-filtering-with-spark-mllib.html):
![als-wip](https://cloud.githubusercontent.com/assets/829644/5659447/4c4ff8e0-96c7-11e4-87a9-73c1c63d07f3.png)

I keep the spark.mllib's ALS untouched for easy comparison. If the new implementation works well, I'm going to match the features of the ALS under spark.mllib and then make it a wrapper of the new implementation, in a separate PR.

TODO:

  • Add unit tests for implicit preferences.

@SparkQA commented Dec 17, 2014

Test build #24537 has finished for PR 3720 at commit 3f2d81a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Rating(user: Int, product: Int, rating: Float)
    • case class Params(
    • class ALS extends Estimator[ALSModel] with ALSParams
    • trait ParquetTest

@SparkQA commented Dec 19, 2014

Test build #24653 has finished for PR 3720 at commit 4937fd4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
    • case class Movie(movieId: Int, title: String, genres: Seq[String])
    • case class Params(
    • class ALS extends Estimator[ALSModel] with ALSParams

@SparkQA commented Dec 30, 2014

Test build #24911 has finished for PR 3720 at commit 213d163.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
    • case class Movie(movieId: Int, title: String, genres: Seq[String])
    • case class Params(
    • class ALS extends Estimator[ALSModel] with ALSParams

@SparkQA commented Jan 8, 2015

Test build #25195 has finished for PR 3720 at commit 2a8deb3.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
    • case class Movie(movieId: Int, title: String, genres: Seq[String])
    • case class Params(
    • class ALS extends Estimator[ALSModel] with ALSParams
    • case class RatingBlock(srcIds: Array[Int], dstIds: Array[Int], ratings: Array[Float])

@mengxr changed the title from "[WIP][SPARK-3541][MLLIB] New ALS implementation with improved storage" to "[SPARK-3541][MLLIB] New ALS implementation with improved storage" on Jan 8, 2015

@mengxr (Contributor, Author) commented Jan 8, 2015

@srowen @coderxiang This PR is almost ready, pending a few unit tests. Would you be interested in reviewing the code?

@SparkQA commented Jan 8, 2015

Test build #25207 has finished for PR 3720 at commit a76da7b.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
    • case class Movie(movieId: Int, title: String, genres: Seq[String])
    • case class Params(
    • class ALS extends Estimator[ALSModel] with ALSParams
    • case class RatingBlock(srcIds: Array[Int], dstIds: Array[Int], ratings: Array[Float])

    val err = rating.toDouble - prediction
    val err2 = err * err
    if (err2.isNaN) {
      Iterator.empty

Member:

Tiny: would it be clearer to return Some and None? This works too of course.

Contributor Author:

Done.
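
For context, a hypothetical, self-contained version of the Some/None variant discussed above (`ratesAndPreds`, `rating`, and `prediction` are illustrative names, not the PR's actual test code):

```scala
// Collect squared errors while skipping NaN predictions, then compute the MSE.
val ratesAndPreds: Seq[(Float, Double)] = Seq((3.0f, 2.5), (4.0f, Double.NaN))
val errors = ratesAndPreds.flatMap { case (rating, prediction) =>
  val err = rating.toDouble - prediction
  val err2 = err * err
  if (err2.isNaN) None else Some(err2) // drop NaN entries instead of emitting them
}
val mse = errors.sum / errors.size
```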

@SparkQA commented Jan 9, 2015

Test build #25337 has finished for PR 3720 at commit b84f41c.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
    • case class Movie(movieId: Int, title: String, genres: Seq[String])
    • case class Params(
    • class ALS extends Estimator[ALSModel] with ALSParams
    • case class RatingBlock(srcIds: Array[Int], dstIds: Array[Int], ratings: Array[Float])

@SparkQA commented Jan 10, 2015

Test build #25353 has finished for PR 3720 at commit dd0d0e8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
    • case class Movie(movieId: Int, title: String, genres: Seq[String])
    • case class Params(
    • class ALS extends Estimator[ALSModel] with ALSParams
    • case class RatingBlock(srcIds: Array[Int], dstIds: Array[Int], ratings: Array[Float])

@mengxr (Contributor, Author) commented Jan 13, 2015

test this please

@SparkQA commented Jan 13, 2015

Test build #25485 has finished for PR 3720 at commit dd0d0e8.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
    • case class Movie(movieId: Int, title: String, genres: Seq[String])
    • case class Params(
    • class ALS extends Estimator[ALSModel] with ALSParams
    • case class RatingBlock(srcIds: Array[Int], dstIds: Array[Int], ratings: Array[Float])

@mengxr (Contributor, Author) commented Jan 19, 2015

test this please

@mengxr (Contributor, Author) commented Jan 19, 2015

@srowen @coderxiang Any thoughts on the implementation?

@SparkQA commented Jan 19, 2015

Test build #25765 has finished for PR 3720 at commit dd0d0e8.

  • This patch fails to build.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
    • case class Movie(movieId: Int, title: String, genres: Seq[String])
    • case class Params(
    • class ALS extends Estimator[ALSModel] with ALSParams
    • case class RatingBlock(srcIds: Array[Int], dstIds: Array[Int], ratings: Array[Float])

@SparkQA commented Jan 19, 2015

Test build #25767 has finished for PR 3720 at commit 1b9e852.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds the following public classes (experimental):
    • case class Rating(userId: Int, movieId: Int, rating: Float, timestamp: Long)
    • case class Movie(movieId: Int, title: String, genres: Seq[String])
    • case class Params(
    • class ALS extends Estimator[ALSModel] with ALSParams
    • case class RatingBlock(srcIds: Array[Int], dstIds: Array[Int], ratings: Array[Float])

@mengxr (Contributor, Author) commented Jan 22, 2015

@srowen @coderxiang Do you have more comments? I'm thinking about merging this and then porting nonnegative support. After that, we can replace the ALS implementation under "spark.mllib".

@srowen (Member) commented Jan 22, 2015

@mengxr Given how familiar you are with this implementation and the tests, I can only be pretty sure it works. I didn't see any style issues, and I thought through some of the loops for speed/correctness, but every spot check was fine. It is looking good to me.

@coderxiang (Contributor) commented:

@mengxr The logic and style also look good to me.

@mengxr (Contributor, Author) commented Jan 23, 2015

Thanks! I've merged this into master. I'll send follow-up PRs soon.

@asfgit closed this in ea74365 on Jan 23, 2015
scwf pushed a commit to scwf/spark that referenced this pull request Jan 25, 2015
commit ea74365
Author: Xiangrui Meng <meng@databricks.com>
Date:   Thu Jan 22 22:09:13 2015 -0800

    [SPARK-3541][MLLIB] New ALS implementation with improved storage

    This PR adds a new ALS implementation to `spark.ml` using the pipeline API, which should be able to scale to billions of ratings. Compared with the ALS under `spark.mllib`, the new implementation

    1. uses the same algorithm,
    2. uses float type for ratings,
    3. uses primitive arrays to avoid GC,
    4. sorts and compresses ratings on each block so that we can solve least squares subproblems one by one using only one normal equation instance.

    The following figure shows performance comparison on copies of the Amazon Reviews dataset using a 16-node (m3.2xlarge) EC2 cluster (the same setup as in http://databricks.com/blog/2014/07/23/scalable-collaborative-filtering-with-spark-mllib.html):
    ![als-wip](https://cloud.githubusercontent.com/assets/829644/5659447/4c4ff8e0-96c7-11e4-87a9-73c1c63d07f3.png)

    I keep the `spark.mllib`'s ALS untouched for easy comparison. If the new implementation works well, I'm going to match the features of the ALS under `spark.mllib` and then make it a wrapper of the new implementation, in a separate PR.

    TODO:
    - [X] Add unit tests for implicit preferences.

    Author: Xiangrui Meng <meng@databricks.com>

    Closes apache#3720 from mengxr/SPARK-3541 and squashes the following commits:

    1b9e852 [Xiangrui Meng] fix compile
    5129be9 [Xiangrui Meng] Merge remote-tracking branch 'apache/master' into SPARK-3541
    dd0d0e8 [Xiangrui Meng] simplify test code
    c627de3 [Xiangrui Meng] add tests for implicit feedback
    b84f41c [Xiangrui Meng] address comments
    a76da7b [Xiangrui Meng] update ALS tests
    2a8deb3 [Xiangrui Meng] add some ALS tests
    857e876 [Xiangrui Meng] add tests for rating block and encoded block
    d3c1ac4 [Xiangrui Meng] rename some classes for better code readability add more doc and comments
    213d163 [Xiangrui Meng] org imports
    771baf3 [Xiangrui Meng] chol doc update
    ca9ad9d [Xiangrui Meng] add unit tests for chol
    b4fd17c [Xiangrui Meng] add unit tests for NormalEquation
    d0f99d3 [Xiangrui Meng] add tests for LocalIndexEncoder
    80b8e61 [Xiangrui Meng] fix imports
    4937fd4 [Xiangrui Meng] update ALS example
    56c253c [Xiangrui Meng] rename product to item
    bce8692 [Xiangrui Meng] doc for parameters and project the output columns
    3f2d81a [Xiangrui Meng] add doc
    1efaecf [Xiangrui Meng] add example code
    8ae86b5 [Xiangrui Meng] add a working copy of the new ALS implementation

@hy2014 commented Apr 15, 2015

Hi, we ran the ALS example with the same data and the same input arguments but got different results: Spark 1.2.0 returns the data we expect, while Spark 1.3.0 does not return the right data; the userFeature matrix comes back all zeros.
