From 8fb4a82be98588795454127f4281363d105146f0 Mon Sep 17 00:00:00 2001
From: hqzizania
Date: Fri, 24 Jun 2016 14:26:31 +0800
Subject: [PATCH 01/12] add dsyrk to ALS

---
 .../apache/spark/ml/recommendation/ALS.scala  | 53 +++++++++++++++++--
 1 file changed, 50 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 5dc2433e55c39..c46304bfecc32 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -619,8 +619,10 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
 
     /** Number of entries in the upper triangular part of a k-by-k matrix. */
     val triK = k * (k + 1) / 2
-    /** A^T^ * A */
+    /** The upper triangular part of A^T^ * A */
     val ata = new Array[Double](triK)
+    /** A^T^ * A */
+    val ata2 = new Array[Double](k * k)
     /** A^T^ * b */
     val atb = new Array[Double](k)
 
@@ -635,6 +637,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
       }
     }
 
+    private def copyToTri(a: Array[Double]): Unit = {
+      var ii = 0
+      for(i <- 0 until k)
+        for(j <- 0 to i) {
+          ata(ii) += ata2(i * k + j)
+          ii += 1
+        }
+    }
+
     /** Adds an observation. */
     def add(a: Array[Float], b: Double, c: Double = 1.0): this.type = {
       require(c >= 0.0)
@@ -647,6 +658,15 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
       this
     }
 
+    /** Adds a stack of observations. */
+    def addStack(a: Array[Double], b: Array[Double], n: Int): this.type = {
+      require(a.length == n * k)
+      blas.dsyrk(upper, "N", k, n, 1.0, a, k, 1.0, ata2, k)
+      copyToTri(ata2)
+      blas.dgemv("N", k, n, 1.0, a, k, b, 1, 1.0, atb, 1)
+      this
+    }
+
     /** Merges another normal equation object. */
     def merge(other: NormalEquation): this.type = {
       require(other.k == k)
@@ -658,6 +678,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
     /** Resets everything to zero, which should be called after each solve. */
     def reset(): Unit = {
       ju.Arrays.fill(ata, 0.0)
+      ju.Arrays.fill(ata2, 0.0)
       ju.Arrays.fill(atb, 0.0)
     }
   }
@@ -1296,6 +1317,9 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
           }
           var i = srcPtrs(j)
           var numExplicits = 0
+          val doStack = if (srcPtrs(j + 1) - srcPtrs(j) > 10) true else false
+          val srcFactorBuffer = mutable.ArrayBuilder.make[Double]
+          val bBuffer = mutable.ArrayBuilder.make[Double]
           while (i < srcPtrs(j + 1)) {
             val encoded = srcEncodedIndices(i)
             val blockId = srcEncoder.blockId(encoded)
@@ -1310,14 +1334,37 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
               // for rating > 0. Because YtY is already added, we need to adjust the scaling here.
               if (rating > 0) {
                 numExplicits += 1
-                ls.add(srcFactor, (c1 + 1.0) / c1, c1)
+                if (doStack) {
+                  var ii = 0
+                  while(ii < srcFactor.length) {
+                    srcFactorBuffer += srcFactor(ii) * c1
+                    ii += 1
+                  }
+                  bBuffer += (c1 + 1.0) / c1
+                }
+                else {
+                  ls.add(srcFactor, (c1 + 1.0) / c1, c1)
+                }
               }
             } else {
-              ls.add(srcFactor, rating)
               numExplicits += 1
+              if (doStack) {
+                bBuffer += rating
+                var ii = 0
+                while(ii < srcFactor.length) {
+                  srcFactorBuffer += srcFactor(ii)
+                  ii += 1
+                }
+              }
+              else {
+                ls.add(srcFactor, rating)
+              }
             }
             i += 1
           }
+          if(numExplicits > 0 && doStack) {
+            ls.addStack(srcFactorBuffer.result(), bBuffer.result(), numExplicits)
+          }
           // Weight lambda by the number of explicit ratings based on the ALS-WR paper.
           dstFactors(j) = solver.solve(ls, numExplicits * regParam)
           j += 1
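
The idea behind PATCH 01: instead of folding each rating into the normal equations with a rank-1 update, the factors seen by one block row are laid out back to back as the columns of a column-major k-by-n matrix A, so one dsyrk (BLAS-3) call accumulates sum_i a_i * a_i^T (the A^T * A of the normal equations) and one dgemv (BLAS-2) call accumulates sum_i b_i * a_i (the A^T * b). Below is a minimal standalone sketch of that call pattern, assuming the netlib-java wrapper that mllib already imports; the object and value names are illustrative, not part of the patch:

    import com.github.fommil.netlib.BLAS.{getInstance => blas}

    // Illustrative stand-in for NormalEquation.addStack: n rank-k factors,
    // stored back to back, form the columns of a column-major k-by-n matrix.
    object DsyrkSketch {
      def main(args: Array[String]): Unit = {
        val k = 3 // rank
        val n = 4 // number of stacked observations
        val a = Array.tabulate(n * k)(i => (i + 1).toDouble) // factor i occupies a(i*k) .. a(i*k + k - 1)
        val b = Array.tabulate(n)(i => 0.5 * (i + 1))        // one target value per factor

        // One dsyrk accumulates the upper triangle of sum_i a_i * a_i^T.
        val ata = new Array[Double](k * k)
        blas.dsyrk("U", "N", k, n, 1.0, a, k, 1.0, ata, k)

        // One dgemv accumulates atb += sum_i b(i) * a_i.
        val atb = new Array[Double](k)
        blas.dgemv("N", k, n, 1.0, a, k, b, 1, 1.0, atb, 1)

        // Cross-check against the naive per-observation accumulation.
        val naive = new Array[Double](k * k)
        for (i <- 0 until n; r <- 0 until k; c <- 0 until k) {
          naive(c * k + r) += a(i * k + r) * a(i * k + c)
        }
        for (c <- 0 until k; r <- 0 to c) {
          assert(math.abs(ata(c * k + r) - naive(c * k + r)) < 1e-12)
        }
        println(atb.mkString(", "))
      }
    }

With uplo = "U" and trans = "N", dsyrk only touches the upper triangle of the dense k-by-k buffer, which is why the patch then needs copyToTri to fold it into the packed array the solver uses.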
From 7e3d238c62269f923832e7cba237f750082450a9 Mon Sep 17 00:00:00 2001
From: hqzizania
Date: Sat, 25 Jun 2016 00:28:57 +0800
Subject: [PATCH 02/12] fix implicit-prefs unit test failure

---
 .../apache/spark/ml/recommendation/ALS.scala | 26 +++++--------------
 1 file changed, 7 insertions(+), 19 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index c46304bfecc32..6c3cb9e7b8ac7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -619,14 +619,13 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
 
     /** Number of entries in the upper triangular part of a k-by-k matrix. */
     val triK = k * (k + 1) / 2
-    /** The upper triangular part of A^T^ * A */
-    val ata = new Array[Double](triK)
     /** A^T^ * A */
-    val ata2 = new Array[Double](k * k)
+    val ata = new Array[Double](triK)
     /** A^T^ * b */
     val atb = new Array[Double](k)
 
     private val da = new Array[Double](k)
+    private val ata2 = new Array[Double](k * k)
     private val upper = "U"
@@ -637,7 +636,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
       }
     }
 
-    private def copyToTri(a: Array[Double]): Unit = {
+    private def copyToTri(): Unit = {
       var ii = 0
       for(i <- 0 until k)
         for(j <- 0 to i) {
@@ -662,7 +661,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
     def addStack(a: Array[Double], b: Array[Double], n: Int): this.type = {
       require(a.length == n * k)
       blas.dsyrk(upper, "N", k, n, 1.0, a, k, 1.0, ata2, k)
-      copyToTri(ata2)
+      copyToTri()
       blas.dgemv("N", k, n, 1.0, a, k, b, 1, 1.0, atb, 1)
       this
     }
@@ -1334,17 +1333,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
               // for rating > 0. Because YtY is already added, we need to adjust the scaling here.
               if (rating > 0) {
                 numExplicits += 1
-                if (doStack) {
-                  var ii = 0
-                  while(ii < srcFactor.length) {
-                    srcFactorBuffer += srcFactor(ii) * c1
-                    ii += 1
-                  }
-                  bBuffer += (c1 + 1.0) / c1
-                }
-                else {
-                  ls.add(srcFactor, (c1 + 1.0) / c1, c1)
-                }
+                ls.add(srcFactor, (c1 + 1.0) / c1, c1)
               }
             } else {
               numExplicits += 1
@@ -1355,14 +1344,13 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
                   srcFactorBuffer += srcFactor(ii)
                   ii += 1
                 }
-              }
-              else {
+              } else {
                 ls.add(srcFactor, rating)
               }
             }
             i += 1
           }
-          if(numExplicits > 0 && doStack) {
+          if (!implicitPrefs && doStack) {
            ls.addStack(srcFactorBuffer.result(), bBuffer.result(), numExplicits)
           }
           // Weight lambda by the number of explicit ratings based on the ALS-WR paper.
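
Two things are going on in PATCH 02. First, copyToTri exists because dsyrk fills a dense k-by-k buffer while the Cholesky solver consumes packed upper-triangular storage of length k * (k + 1) / 2 (the column-by-column layout LAPACK's dppsv expects): entry (row j, column i) with j <= i lives at packed index i * (i + 1) / 2 + j. Second, stacking is now restricted to the explicit-feedback path: pre-scaling each stacked factor by c1 makes dsyrk contribute c1^2 * a * a^T where add() contributes c1 * a * a^T, which would explain the failing implicit-prefs test. A small sketch of the packed index mapping (names here are hypothetical):

    // Fold the upper triangle of a dense column-major k-by-k matrix into
    // packed storage: packed(i * (i + 1) / 2 + j) == full(i * k + j), j <= i.
    object PackedUpperSketch {
      def main(args: Array[String]): Unit = {
        val k = 4
        val full = Array.tabulate(k * k)(idx => (idx + 1).toDouble)
        val packed = new Array[Double](k * (k + 1) / 2)
        var ii = 0
        var i = 0
        while (i < k) {
          var j = 0
          while (j <= i) {
            packed(ii) = full(i * k + j) // column i, row j
            ii += 1
            j += 1
          }
          i += 1
        }
        assert(packed(0) == full(0))     // entry (0,0)
        assert(packed(2) == full(k + 1)) // entry (1,1) lands at packed index 2
        println(packed.mkString(" "))
      }
    }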
From 3607bdcd22cf03c068e777a1927ee3d5da49045a Mon Sep 17 00:00:00 2001
From: hqzizania
Date: Tue, 28 Jun 2016 21:51:55 +0800
Subject: [PATCH 03/12] use "while" loops instead of "for"; set the stack
 threshold > 128 and add comments

---
 .../apache/spark/ml/recommendation/ALS.scala | 17 +++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 6c3cb9e7b8ac7..8e9b3bdabf2a7 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -637,12 +637,19 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
     }
 
     private def copyToTri(): Unit = {
+      var i = 0
+      var j = 0
       var ii = 0
-      for(i <- 0 until k)
-        for(j <- 0 to i) {
-          ata(ii) += ata2(i * k + j)
+      while (i < k) {
+        val temp = i * k
+        j = 0
+        while (j <= i) {
+          ata(ii) += ata2(temp + j)
+          j += 1
          ii += 1
         }
+        i += 1
+      }
     }
@@ -1316,7 +1323,9 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
           }
           var i = srcPtrs(j)
           var numExplicits = 0
-          val doStack = if (srcPtrs(j + 1) - srcPtrs(j) > 10) true else false
+          // Stack factors (vectors) into matrices to speed up the computation
+          // when the number of factors and the rank are large enough.
+          val doStack = srcPtrs(j + 1) - srcPtrs(j) > 128 && rank > 128
           val srcFactorBuffer = mutable.ArrayBuilder.make[Double]
           val bBuffer = mutable.ArrayBuilder.make[Double]
           while (i < srcPtrs(j + 1)) {

From 56194eb8dc3ea8c233dbb362eeb83af5091abcba Mon Sep 17 00:00:00 2001
From: hqzizania
Date: Wed, 29 Jun 2016 18:57:57 +0800
Subject: [PATCH 04/12] add unit test for doStack ALS

---
 .../scala/org/apache/spark/ml/recommendation/ALSSuite.scala | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index e8ed50acf877c..ccd9020dcf869 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -389,6 +389,11 @@ class ALSSuite
       targetRMSE = 0.3)
   }
 
+  test("rank-129 matrix with stacking factors in matrices") {
+    val (training, test) = genExplicitTestData(numUsers = 200, numItems = 20, rank = 1)
+    testALS(training, test, maxIter = 1, rank = 129, regParam = 0.01, targetRMSE = 0.02)
+  }
+
   test("using generic ID types") {
     val (ratings, _) = genImplicitTestData(numUsers = 20, numItems = 40, rank = 2,
       noiseStd = 0.01)
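
PATCH 03's loop rewrite follows the usual Spark convention for hot paths: a for-comprehension over a Range desugars into foreach calls taking closures, which may not JIT down to a simple counted loop, while nested while loops compile to plain bytecode loops and also let the i * k multiply be hoisted out of the inner loop. A side-by-side sketch (illustrative only; both functions sum the packed upper triangle in the same order):

    object WhileVsFor {
      // for-comprehension version: Range.foreach plus closures.
      def sumTriFor(ata2: Array[Double], k: Int): Double = {
        var s = 0.0
        for (i <- 0 until k; j <- 0 to i) s += ata2(i * k + j)
        s
      }

      // while version, as in the patch: no closures, multiply hoisted.
      def sumTriWhile(ata2: Array[Double], k: Int): Double = {
        var s = 0.0
        var i = 0
        while (i < k) {
          val temp = i * k
          var j = 0
          while (j <= i) {
            s += ata2(temp + j)
            j += 1
          }
          i += 1
        }
        s
      }

      def main(args: Array[String]): Unit = {
        val k = 8
        val m = Array.tabulate(k * k)(_.toDouble)
        assert(sumTriFor(m, k) == sumTriWhile(m, k))
      }
    }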
From dc4f4badba26635aa95c6ade5a589d4bd50ae886 Mon Sep 17 00:00:00 2001
From: hqzizania
Date: Fri, 21 Oct 2016 13:34:51 +0800
Subject: [PATCH 05/12] reset threshold values for doStack and remove UT

---
 .../main/scala/org/apache/spark/ml/recommendation/ALS.scala  | 2 +-
 .../scala/org/apache/spark/ml/recommendation/ALSSuite.scala  | 5 -----
 2 files changed, 1 insertion(+), 6 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 8e9b3bdabf2a7..b5a05e43a4ad0 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -1325,7 +1325,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
           var numExplicits = 0
           // Stack factors (vectors) into matrices to speed up the computation
           // when the number of factors and the rank are large enough.
-          val doStack = srcPtrs(j + 1) - srcPtrs(j) > 128 && rank > 128
+          val doStack = srcPtrs(j + 1) - srcPtrs(j) > 1024 && rank > 1024
           val srcFactorBuffer = mutable.ArrayBuilder.make[Double]
           val bBuffer = mutable.ArrayBuilder.make[Double]
           while (i < srcPtrs(j + 1)) {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index ccd9020dcf869..e8ed50acf877c 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -389,11 +389,6 @@ class ALSSuite
       targetRMSE = 0.3)
   }
 
-  test("rank-129 matrix with stacking factors in matrices") {
-    val (training, test) = genExplicitTestData(numUsers = 200, numItems = 20, rank = 1)
-    testALS(training, test, maxIter = 1, rank = 129, regParam = 0.01, targetRMSE = 0.02)
-  }
-
   test("using generic ID types") {
     val (ratings, _) = genImplicitTestData(numUsers = 20, numItems = 40, rank = 2,
       noiseStd = 0.01)
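
Both accumulation paths perform the same O(n * k^2) multiply-adds; the stacked path wins only because dsyrk is a blocked, vectorizable BLAS-3 kernel while n separate dspr calls are BLAS-2, and stacking additionally pays an O(n * k) buffer copy. That trade-off is what the cutoff raised here (128 -> 1024) encodes. The following is a hypothetical micro-benchmark for locating the crossover on a given machine, not the measurement behind the 1024 default:

    import com.github.fommil.netlib.BLAS.{getInstance => blas}

    object StackCutoffBench {
      def time(body: => Unit): Double = {
        val t0 = System.nanoTime(); body; (System.nanoTime() - t0) / 1e6
      }

      def main(args: Array[String]): Unit = {
        val k = 1024 // rank
        val n = 2048 // ratings in one block row
        val rnd = new scala.util.Random(0)
        val stacked = Array.fill(n * k)(rnd.nextDouble())

        // BLAS-3 path: one dsyrk over the whole k-by-n stack.
        val ataDense = new Array[Double](k * k)
        val tSyrk = time(blas.dsyrk("U", "N", k, n, 1.0, stacked, k, 1.0, ataDense, k))

        // BLAS-2 path: n rank-1 dspr updates, one factor at a time.
        val ataPacked = new Array[Double](k * (k + 1) / 2)
        val xi = new Array[Double](k)
        val tSpr = time {
          var i = 0
          while (i < n) {
            System.arraycopy(stacked, i * k, xi, 0, k)
            blas.dspr("U", k, 1.0, xi, 1, ataPacked)
            i += 1
          }
        }
        println(f"dsyrk: $tSyrk%.1f ms, $n x dspr: $tSpr%.1f ms")
      }
    }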
From d29fd67a2a24675b7be2f7f51ba170fda11a85d7 Mon Sep 17 00:00:00 2001
From: hqzizania
Date: Sun, 23 Oct 2016 18:58:18 -0700
Subject: [PATCH 06/12] add threshold param to ALS

---
 .../apache/spark/ml/recommendation/ALS.scala  | 39 ++++++++++++++-----
 .../spark/ml/recommendation/ALSSuite.scala    | 10 ++++-
 2 files changed, 39 insertions(+), 10 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index b5a05e43a4ad0..74dc71a395ca2 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -200,10 +200,24 @@ private[recommendation] trait ALSParams extends ALSModelParams with HasMaxIter w
   /** @group expertGetParam */
   def getFinalStorageLevel: String = $(finalStorageLevel)
 
+  /**
+    * Param for the threshold used when computing dst factors, deciding
+    * whether to stack factors to speed up the computation (>= 1).
+    * Default: 1024
+    * @group expertParam
+    */
+  val threshold = new IntParam(this, "threshold", "threshold in computation of dst factors " +
+    "to decide whether to stack factors to speed up the computation.",
+    ParamValidators.gtEq(1))
+
+  /** @group expertGetParam */
+  def getThreshold: Int = $(threshold)
+
   setDefault(rank -> 10, maxIter -> 10, regParam -> 0.1, numUserBlocks -> 10,
     numItemBlocks -> 10, implicitPrefs -> false, alpha -> 1.0, userCol -> "user",
     itemCol -> "item", ratingCol -> "rating", nonnegative -> false, checkpointInterval -> 10,
-    intermediateStorageLevel -> "MEMORY_AND_DISK", finalStorageLevel -> "MEMORY_AND_DISK")
+    intermediateStorageLevel -> "MEMORY_AND_DISK", finalStorageLevel -> "MEMORY_AND_DISK",
+    threshold -> 1024)
 
   /**
   * Validates and transforms the input schema.
@@ -436,6 +450,10 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
   @Since("2.0.0")
   def setFinalStorageLevel(value: String): this.type = set(finalStorageLevel, value)
 
+  /** @group expertSetParam */
+  @Since("2.1.0")
+  def setThreshold(value: Int): this.type = set(threshold, value)
+
   /**
    * Sets both numUserBlocks and numItemBlocks to the specific value.
    *
@@ -464,14 +482,15 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
     val instrLog = Instrumentation.create(this, ratings)
     instrLog.logParams(rank, numUserBlocks, numItemBlocks, implicitPrefs, alpha,
       userCol, itemCol, ratingCol, predictionCol, maxIter,
-      regParam, nonnegative, checkpointInterval, seed)
+      regParam, nonnegative, threshold, checkpointInterval, seed)
     val (userFactors, itemFactors) = ALS.train(ratings, rank = $(rank),
       numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks),
       maxIter = $(maxIter), regParam = $(regParam), implicitPrefs = $(implicitPrefs),
       alpha = $(alpha), nonnegative = $(nonnegative),
       intermediateRDDStorageLevel = StorageLevel.fromString($(intermediateStorageLevel)),
       finalRDDStorageLevel = StorageLevel.fromString($(finalStorageLevel)),
-      checkpointInterval = $(checkpointInterval), seed = $(seed))
+      threshold = $(threshold), checkpointInterval = $(checkpointInterval),
+      seed = $(seed))
     val userDF = userFactors.toDF("id", "features")
     val itemDF = itemFactors.toDF("id", "features")
     val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
@@ -706,6 +725,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
       nonnegative: Boolean = false,
       intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
       finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
+      threshold: Int = 1024,
       checkpointInterval: Int = 10,
       seed: Long = 0L)(
       implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {
@@ -752,7 +772,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
         userFactors.setName(s"userFactors-$iter").persist(intermediateRDDStorageLevel)
         val previousItemFactors = itemFactors
         itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
-          userLocalIndexEncoder, implicitPrefs, alpha, solver)
+          userLocalIndexEncoder, implicitPrefs, alpha, solver, threshold)
         previousItemFactors.unpersist()
         itemFactors.setName(s"itemFactors-$iter").persist(intermediateRDDStorageLevel)
         // TODO: Generalize PeriodicGraphCheckpointer and use it here.
@@ -762,7 +782,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
         }
         val previousUserFactors = userFactors
         userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
-          itemLocalIndexEncoder, implicitPrefs, alpha, solver)
+          itemLocalIndexEncoder, implicitPrefs, alpha, solver, threshold)
         if (shouldCheckpoint(iter)) {
           ALS.cleanShuffleDependencies(sc, deps)
           deletePreviousCheckpointFile()
@@ -773,7 +793,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
     } else {
       for (iter <- 0 until maxIter) {
        itemFactors = computeFactors(userFactors, userOutBlocks, itemInBlocks, rank, regParam,
-          userLocalIndexEncoder, solver = solver)
+          userLocalIndexEncoder, solver = solver, threshold = threshold)
         if (shouldCheckpoint(iter)) {
           val deps = itemFactors.dependencies
           itemFactors.checkpoint()
@@ -783,7 +803,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
           previousCheckpointFile = itemFactors.getCheckpointFile
         }
         userFactors = computeFactors(itemFactors, itemOutBlocks, userInBlocks, rank, regParam,
-          itemLocalIndexEncoder, solver = solver)
+          itemLocalIndexEncoder, solver = solver, threshold = threshold)
       }
     }
     val userIdAndFactors = userInBlocks
@@ -1297,7 +1317,8 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
       srcEncoder: LocalIndexEncoder,
       implicitPrefs: Boolean = false,
       alpha: Double = 1.0,
-      solver: LeastSquaresNESolver): RDD[(Int, FactorBlock)] = {
+      solver: LeastSquaresNESolver,
+      threshold: Int): RDD[(Int, FactorBlock)] = {
     val numSrcBlocks = srcFactorBlocks.partitions.length
     val YtY = if (implicitPrefs) Some(computeYtY(srcFactorBlocks, rank)) else None
     val srcOut = srcOutBlocks.join(srcFactorBlocks).flatMap {
@@ -1325,7 +1346,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
           var numExplicits = 0
           // Stack factors (vectors) into matrices to speed up the computation
           // when the number of factors and the rank are large enough.
-          val doStack = srcPtrs(j + 1) - srcPtrs(j) > 1024 && rank > 1024
+          val doStack = srcPtrs(j + 1) - srcPtrs(j) > threshold && rank > threshold
           val srcFactorBuffer = mutable.ArrayBuilder.make[Double]
           val bBuffer = mutable.ArrayBuilder.make[Double]
           while (i < srcPtrs(j + 1)) {
diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index e8ed50acf877c..83291a9f4c80a 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -301,7 +301,8 @@ class ALSSuite
       implicitPrefs: Boolean = false,
       numUserBlocks: Int = 2,
       numItemBlocks: Int = 3,
-      targetRMSE: Double = 0.05): Unit = {
+      targetRMSE: Double = 0.05,
+      threshold: Int = 1024): Unit = {
     val spark = this.spark
     import spark.implicits._
     val als = new ALS()
@@ -311,6 +312,7 @@ class ALSSuite
       .setNumUserBlocks(numUserBlocks)
       .setNumItemBlocks(numItemBlocks)
       .setSeed(0)
+      .setThreshold(threshold)
     val alpha = als.getAlpha
     val model = als.fit(training.toDF())
     val predictions = model.transform(test.toDF()).select("rating", "prediction").rdd.map {
@@ -382,6 +384,12 @@ class ALSSuite
       numItemBlocks = 5, numUserBlocks = 5)
   }
 
+  test("do stacking factors in matrices") {
+    val (training, test) = genExplicitTestData(numUsers = 200, numItems = 20, rank = 1)
+    testALS(training, test, maxIter = 1, rank = 129, regParam = 0.01, targetRMSE = 0.02,
+      threshold = 128)
+  }
+
   test("implicit feedback") {
     val (training, test) = genImplicitTestData(numUsers = 20, numItems = 40, rank = 2,
       noiseStd = 0.01)
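
With PATCH 06 applied, the stacking cutoff becomes a tunable expert parameter on the estimator. A usage sketch against the Pipelines API (the session setup and toy DataFrame are illustrative; setThreshold is the setter this patch introduces, and the toy data never actually triggers stacking — it only demonstrates the API):

    import org.apache.spark.ml.recommendation.ALS
    import org.apache.spark.sql.SparkSession

    object ThresholdUsage {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .master("local[*]").appName("als-threshold").getOrCreate()
        import spark.implicits._

        val ratings = Seq((0, 0, 4.0f), (0, 1, 1.0f), (1, 0, 3.0f), (1, 1, 5.0f))
          .toDF("user", "item", "rating") // the ALS default column names

        val als = new ALS()
          .setRank(200)      // stacking requires rank > threshold ...
          .setMaxIter(5)
          .setRegParam(0.01)
          .setThreshold(128) // ... and > threshold ratings in a block row

        val model = als.fit(ratings)
        model.itemFactors.show()
        spark.stop()
      }
    }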
From 294164d839b0ce191fee341b0eb82b81d506d8c8 Mon Sep 17 00:00:00 2001
From: hqzizania
Date: Sun, 23 Oct 2016 19:47:50 -0700
Subject: [PATCH 07/12] nit fix

---
 .../scala/org/apache/spark/ml/recommendation/ALS.scala | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index 74dc71a395ca2..9c654997e5a19 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -201,11 +201,11 @@ private[recommendation] trait ALSParams extends ALSModelParams with HasMaxIter w
   def getFinalStorageLevel: String = $(finalStorageLevel)
 
   /**
-    * Param for the threshold used when computing dst factors, deciding
-    * whether to stack factors to speed up the computation (>= 1).
-    * Default: 1024
-    * @group expertParam
-    */
+   * Param for the threshold used when computing dst factors, deciding
+   * whether to stack factors to speed up the computation (>= 1).
+   * Default: 1024
+   * @group expertParam
+   */
   val threshold = new IntParam(this, "threshold", "threshold in computation of dst factors " +
     "to decide whether to stack factors to speed up the computation.",
     ParamValidators.gtEq(1))

From 513e7915ecb807bc04ed8a17fdaa121e9ac578b5 Mon Sep 17 00:00:00 2001
From: hqzizania
Date: Sun, 23 Oct 2016 19:56:39 -0700
Subject: [PATCH 08/12] nit

---
 .../scala/org/apache/spark/ml/recommendation/ALSSuite.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
index 83291a9f4c80a..3b4b108096677 100644
--- a/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/ml/recommendation/ALSSuite.scala
@@ -389,7 +389,7 @@ class ALSSuite
     testALS(training, test, maxIter = 1, rank = 129, regParam = 0.01, targetRMSE = 0.02,
       threshold = 128)
   }
-  
+
   test("implicit feedback") {
     val (training, test) = genImplicitTestData(numUsers = 20, numItems = 40, rank = 2,
       noiseStd = 0.01)

From 1081e64c3fbd31c3d35b987b3200eae8c8c688e2 Mon Sep 17 00:00:00 2001
From: hqzizania
Date: Sun, 23 Oct 2016 21:44:52 -0700
Subject: [PATCH 09/12] mima fix

---
 project/MimaExcludes.scala | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 350b144f8294b..53c0a268a6af2 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -864,6 +864,9 @@ object MimaExcludes {
       // [SPARK-12221] Add CPU time to metrics
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskMetrics.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskMetricDistributions.this")
+    ) ++ Seq(
+      // SPARK-6685
+      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.recommendation.ALS.train")
     )
   }

From a6b5a16cd78e4efe99fda40f92592c9712b04146 Mon Sep 17 00:00:00 2001
From: hqzizania
Date: Sun, 23 Oct 2016 22:04:39 -0700
Subject: [PATCH 10/12] oops

---
 project/MimaExcludes.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index 53c0a268a6af2..fb7b806fbfd8d 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -866,7 +866,7 @@ object MimaExcludes {
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskMetricDistributions.this")
     ) ++ Seq(
       // SPARK-6685
-      ProblemFilters.exclude[IncompatibleMethTypeProblem]("org.apache.spark.ml.recommendation.ALS.train")
+      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.recommendation.ALS.train")
     )
   }
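
PATCHES 09-10 paper over the binary-compatibility report with a MiMa exclusion; PATCH 11 below removes it again by appending threshold after the existing defaulted parameters instead of inserting it mid-list. One plausible reading of why position matters: the Scala compiler emits one synthetic accessor per default argument, numbered by parameter position, so inserting a parameter renumbers (and retypes) every later accessor, while appending only adds a new one. A toy illustration of the mechanism (hypothetical names; inspect the compiled class with javap -p):

    object DefaultArgsDemo {
      // Compiles to f plus the synthetic methods f$default$1 and f$default$2.
      // Inserting a new defaulted parameter before `b` would turn the String
      // accessor into f$default$3 and give f$default$2 a different type,
      // which is exactly the kind of change MiMa reports.
      def f(a: Int = 1, b: String = "x"): String = b * a
    }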
From 20774575ad51d2a10e47c8f8ee7581a1519ace6b Mon Sep 17 00:00:00 2001
From: hqzizania
Date: Tue, 25 Oct 2016 10:28:45 -0700
Subject: [PATCH 11/12] fix MiMa failure

---
 .../scala/org/apache/spark/ml/recommendation/ALS.scala | 8 ++++----
 project/MimaExcludes.scala                             | 3 ---
 2 files changed, 4 insertions(+), 7 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index d123c0b9d1973..4e0702a7c196f 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -485,8 +485,8 @@ class ALS(@Since("1.4.0") override val uid: String) extends Estimator[ALSModel]
       alpha = $(alpha), nonnegative = $(nonnegative),
       intermediateRDDStorageLevel = StorageLevel.fromString($(intermediateStorageLevel)),
       finalRDDStorageLevel = StorageLevel.fromString($(finalStorageLevel)),
-      threshold = $(threshold), checkpointInterval = $(checkpointInterval),
-      seed = $(seed))
+      checkpointInterval = $(checkpointInterval),
+      seed = $(seed), threshold = $(threshold))
     val userDF = userFactors.toDF("id", "features")
     val itemDF = itemFactors.toDF("id", "features")
     val model = new ALSModel(uid, $(rank), userDF, itemDF).setParent(this)
@@ -721,9 +721,9 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
       nonnegative: Boolean = false,
       intermediateRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
       finalRDDStorageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK,
-      threshold: Int = 1024,
       checkpointInterval: Int = 10,
-      seed: Long = 0L)(
+      seed: Long = 0L,
+      threshold: Int = 1024)(
       implicit ord: Ordering[ID]): (RDD[(ID, Array[Float])], RDD[(ID, Array[Float])]) = {
     require(intermediateRDDStorageLevel != StorageLevel.NONE,
       "ALS is not designed to run without persisting intermediate RDDs.")
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index fb7b806fbfd8d..350b144f8294b 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -864,9 +864,6 @@ object MimaExcludes {
       // [SPARK-12221] Add CPU time to metrics
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskMetrics.this"),
       ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.status.api.v1.TaskMetricDistributions.this")
-    ) ++ Seq(
-      // SPARK-6685
-      ProblemFilters.exclude[DirectMissingMethodProblem]("org.apache.spark.ml.recommendation.ALS.train")
     )
   }
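
PATCH 12 below exploits that the buffer sizes are known up front from srcPtrs, so preallocated arrays replace ArrayBuilder, avoiding its amortized grow-and-copy and the extra full copy made by result(). An isolated sketch of the difference (illustrative names, not the mllib code):

    import scala.collection.mutable

    object BufferVsArray {
      // ArrayBuilder path: repeated capacity doubling plus a final copy.
      def withBuilder(src: Array[Float], reps: Int): Array[Double] = {
        val b = mutable.ArrayBuilder.make[Double]
        var r = 0
        while (r < reps) {
          var i = 0
          while (i < src.length) { b += src(i).toDouble; i += 1 }
          r += 1
        }
        b.result() // one more full copy here
      }

      // Preallocated path: exact size computed once, single pass of writes.
      def withArray(src: Array[Float], reps: Int): Array[Double] = {
        val out = new Array[Double](reps * src.length)
        var idx = 0
        var r = 0
        while (r < reps) {
          var i = 0
          while (i < src.length) { out(idx) = src(i).toDouble; idx += 1; i += 1 }
          r += 1
        }
        out
      }

      def main(args: Array[String]): Unit = {
        val src = Array(1.0f, 2.0f, 3.0f)
        assert(withBuilder(src, 4).sameElements(withArray(src, 4)))
      }
    }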
From 4fdcbe01a1797800d4480a75e38942f5b2443ac4 Mon Sep 17 00:00:00 2001
From: Peng Meng
Date: Thu, 19 Oct 2017 23:18:30 +0800
Subject: [PATCH 12/12] use new Array to replace ArrayBuilder

---
 .../org/apache/spark/ml/recommendation/ALS.scala | 16 ++++++++++------
 1 file changed, 10 insertions(+), 6 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
index e4dcd21df7369..27cda5e95f7a6 100644
--- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala
@@ -1670,8 +1670,10 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
           // Stack factors (vectors) into matrices to speed up the computation
           // when the number of factors and the rank are large enough.
           val doStack = srcPtrs(j + 1) - srcPtrs(j) > threshold && rank > threshold
-          val srcFactorBuffer = mutable.ArrayBuilder.make[Double]
-          val bBuffer = mutable.ArrayBuilder.make[Double]
+          val srcFactorBuffer = new Array[Double]((srcPtrs(j + 1) - srcPtrs(j)) * rank)
+          val bBuffer = new Array[Double](srcPtrs(j + 1) - srcPtrs(j))
+          var srcIndex = 0
+          var bIndex = 0
           while (i < srcPtrs(j + 1)) {
             val encoded = srcEncodedIndices(i)
             val blockId = srcEncoder.blockId(encoded)
@@ -1691,10 +1693,12 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
             } else {
               numExplicits += 1
               if (doStack) {
-                bBuffer += rating
+                bBuffer(bIndex) = rating
+                bIndex += 1
                 var ii = 0
-                while(ii < srcFactor.length) {
-                  srcFactorBuffer += srcFactor(ii)
+                while(ii < rank) {
+                  srcFactorBuffer(srcIndex) = srcFactor(ii)
+                  srcIndex += 1
                   ii += 1
                 }
               } else {
@@ -1704,7 +1708,7 @@ object ALS extends DefaultParamsReadable[ALS] with Logging {
             i += 1
           }
           if (!implicitPrefs && doStack) {
-            ls.addStack(srcFactorBuffer.result(), bBuffer.result(), numExplicits)
+            ls.addStack(srcFactorBuffer, bBuffer, numExplicits)
           }
           // Weight lambda by the number of explicit ratings based on the ALS-WR paper.
           dstFactors(j) = solver.solve(ls, numExplicits * regParam)
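
Taken together, the series claims that for explicit feedback and large enough blocks the normal equations accumulated by one dsyrk/dgemv pair match those from per-rating dspr/daxpy updates. That algebra can be checked in isolation; the sketch below is a standalone re-implementation for verification (NormalEquation itself is private to mllib), again assuming the netlib-java wrapper:

    import com.github.fommil.netlib.BLAS.{getInstance => blas}

    object StackEquivalenceCheck {
      def main(args: Array[String]): Unit = {
        val k = 5 // rank
        val n = 7 // observations
        val rnd = new scala.util.Random(42)
        val factors = Array.fill(n, k)(rnd.nextDouble())
        val ratings = Array.fill(n)(rnd.nextDouble())
        val triK = k * (k + 1) / 2

        // Per-observation path: n rank-1 packed updates + n scaled adds.
        val ata1 = new Array[Double](triK)
        val atb1 = new Array[Double](k)
        for (i <- 0 until n) {
          blas.dspr("U", k, 1.0, factors(i), 1, ata1)
          blas.daxpy(k, ratings(i), factors(i), 1, atb1, 1)
        }

        // Stacked path: one dsyrk + triangular fold, then one dgemv.
        val stacked = new Array[Double](n * k)
        for (i <- 0 until n; r <- 0 until k) stacked(i * k + r) = factors(i)(r)
        val ata2 = new Array[Double](k * k)
        blas.dsyrk("U", "N", k, n, 1.0, stacked, k, 1.0, ata2, k)
        val ataStacked = new Array[Double](triK)
        var ii = 0
        for (i <- 0 until k; j <- 0 to i) { ataStacked(ii) = ata2(i * k + j); ii += 1 }
        val atbStacked = new Array[Double](k)
        blas.dgemv("N", k, n, 1.0, stacked, k, ratings, 1, 1.0, atbStacked, 1)

        for (t <- 0 until triK) assert(math.abs(ata1(t) - ataStacked(t)) < 1e-10)
        for (t <- 0 until k) assert(math.abs(atb1(t) - atbStacked(t)) < 1e-10)
        println("stacked and per-observation accumulations agree")
      }
    }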