From 52e2bda37a2cc6b8a20ca0ccf561cd17fe1f7fd1 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 8 May 2015 11:35:47 -0700 Subject: [PATCH 1/4] added ALS --- .../apache/spark/ml/recommendation/ALS.scala | 9 +- .../ml/param/_shared_params_code_gen.py | 2 +- python/pyspark/ml/param/shared.py | 132 ++++++++- python/pyspark/ml/recommendation.py | 271 ++++++++++++++++++ 4 files changed, 396 insertions(+), 18 deletions(-) create mode 100644 python/pyspark/ml/recommendation.py diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 6cf4b40075281..fd37ba1b0ac52 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -49,7 +49,7 @@ import org.apache.spark.util.random.XORShiftRandom * Common params for ALS. */ private[recommendation] trait ALSParams extends Params with HasMaxIter with HasRegParam - with HasPredictionCol with HasCheckpointInterval { + with HasPredictionCol with HasCheckpointInterval with HasSeed { /** * Param for rank of the matrix factorization (>= 1). @@ -147,7 +147,7 @@ private[recommendation] trait ALSParams extends Params with HasMaxIter with HasR setDefault(rank -> 10, maxIter -> 10, regParam -> 0.1, numUserBlocks -> 10, numItemBlocks -> 10, implicitPrefs -> false, alpha -> 1.0, userCol -> "user", itemCol -> "item", - ratingCol -> "rating", nonnegative -> false, checkpointInterval -> 10) + ratingCol -> "rating", nonnegative -> false, checkpointInterval -> 10, seed -> 42L) /** * Validates and transforms the input schema. @@ -290,7 +290,8 @@ class ALS extends Estimator[ALSModel] with ALSParams { override def fit(dataset: DataFrame): ALSModel = { val ratings = dataset - .select(col($(userCol)), col($(itemCol)), col($(ratingCol)).cast(FloatType)) + .select(col($(userCol)).cast(IntegerType), col($(itemCol)).cast(IntegerType), + col($(ratingCol)).cast(FloatType)) .map { row => Rating(row.getInt(0), row.getInt(1), row.getFloat(2)) } @@ -298,7 +299,7 @@ class ALS extends Estimator[ALSModel] with ALSParams { numUserBlocks = $(numUserBlocks), numItemBlocks = $(numItemBlocks), maxIter = $(maxIter), regParam = $(regParam), implicitPrefs = $(implicitPrefs), alpha = $(alpha), nonnegative = $(nonnegative), - checkpointInterval = $(checkpointInterval)) + checkpointInterval = $(checkpointInterval), seed = $(seed)) copyValues(new ALSModel(this, $(rank), userFactors, itemFactors)) } diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index c1c8e921dda87..3c4fd83cb4b8d 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -97,7 +97,7 @@ def get$Name(self): ("inputCol", "input column name", None), ("inputCols", "input column names", None), ("outputCol", "output column name", None), - ("numFeatures", "number of features", None)] + ("checkpointInterval", "checkpoint interval (>= 1)", None)] code = [] for name, doc, defaultValueStr in shared: code.append(_gen_param_code(name, doc, defaultValueStr)) diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index aaf80f00085bf..8af6efe794a4b 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -281,30 +281,136 @@ def getOutputCol(self): return self.getOrDefault(self.outputCol) -class HasNumFeatures(Params): + +class HasNumItemBlocks(Params): + """ + Mixin for param 
numItemBlocks: number of item blocks.
+    """
+
+    # a placeholder to make it appear in the generated doc
+    numItemBlocks = Param(Params._dummy(), "numItemBlocks", "number of item blocks")
+
+    def __init__(self):
+        super(HasNumItemBlocks, self).__init__()
+        #: param for number of item blocks
+        self.numItemBlocks = Param(self, "numItemBlocks", "number of item blocks")
+        if None is not None:
+            self._setDefault(numItemBlocks=None)
+
+
+class HasImplicitPrefs(Params):
+    """
+    Mixin for param implicitPrefs: whether to use implicit preference.
+    """
+
+    # a placeholder to make it appear in the generated doc
+    implicitPrefs = Param(Params._dummy(), "implicitPrefs", "whether to use implicit preference")
+
+    def __init__(self):
+        super(HasImplicitPrefs, self).__init__()
+        #: param for whether to use implicit preference
+        self.implicitPrefs = Param(self, "implicitPrefs", "whether to use implicit preference")
+        if None is not None:
+            self._setDefault(implicitPrefs=None)
+
+
+class HasAlpha(Params):
+    """
+    Mixin for param alpha: alpha for implicit preference.
+    """
+
+    # a placeholder to make it appear in the generated doc
+    alpha = Param(Params._dummy(), "alpha", "alpha for implicit preference")
+
+    def __init__(self):
+        super(HasAlpha, self).__init__()
+        #: param for alpha for implicit preference
+        self.alpha = Param(self, "alpha", "alpha for implicit preference")
+        if None is not None:
+            self._setDefault(alpha=None)
+
+
+class HasUserCol(Params):
+    """
+    Mixin for param userCol: column name for user ids.
+    """
+
+    # a placeholder to make it appear in the generated doc
+    userCol = Param(Params._dummy(), "userCol", "column name for user ids")
+
+    def __init__(self):
+        super(HasUserCol, self).__init__()
+        #: param for column name for user ids
+        self.userCol = Param(self, "userCol", "column name for user ids")
+        if None is not None:
+            self._setDefault(userCol=None)
+
+
+class HasItemCol(Params):
+    """
+    Mixin for param itemCol: column name for item ids.
+    """
+
+    # a placeholder to make it appear in the generated doc
+    itemCol = Param(Params._dummy(), "itemCol", "column name for item ids")
+
+    def __init__(self):
+        super(HasItemCol, self).__init__()
+        #: param for column name for item ids
+        self.itemCol = Param(self, "itemCol", "column name for item ids")
+        if None is not None:
+            self._setDefault(itemCol=None)
+
+
+class HasRatingCol(Params):
+    """
+    Mixin for param ratingCol: column name for ratings.
+    """
+
+    # a placeholder to make it appear in the generated doc
+    ratingCol = Param(Params._dummy(), "ratingCol", "column name for ratings")
+
+    def __init__(self):
+        super(HasRatingCol, self).__init__()
+        #: param for column name for ratings
+        self.ratingCol = Param(self, "ratingCol", "column name for ratings")
+        if None is not None:
+            self._setDefault(ratingCol=None)
+
+
+class HasCheckpointInterval(Params):
     """
-    Mixin for param numFeatures: number of features.
+    Mixin for param checkpointInterval: checkpoint interval (>= 1).
     """

     # a placeholder to make it appear in the generated doc
-    numFeatures = Param(Params._dummy(), "numFeatures", "number of features")
+    checkpointInterval = Param(Params._dummy(), "checkpointInterval", "checkpoint interval (>= 1)")

     def __init__(self):
-        super(HasNumFeatures, self).__init__()
-        #: param for number of features
-        self.numFeatures = Param(self, "numFeatures", "number of features")
+        super(HasCheckpointInterval, self).__init__()
+        #: param for checkpoint interval (>= 1)
+        self.checkpointInterval = Param(self, "checkpointInterval", "checkpoint interval (>= 1)")
         if None is not None:
-            self._setDefault(numFeatures=None)
+            self._setDefault(checkpointInterval=None)

-    def setNumFeatures(self, value):
+    def setCheckpointInterval(self, value):
         """
-        Sets the value of :py:attr:`numFeatures`.
+        Sets the value of :py:attr:`checkpointInterval`.
         """
-        self.paramMap[self.numFeatures] = value
+        self.paramMap[self.checkpointInterval] = value
         return self

-    def getNumFeatures(self):
+    def getCheckpointInterval(self):
         """
-        Gets the value of numFeatures or its default value.
+        Gets the value of checkpointInterval or its default value.
""" - return self.getOrDefault(self.numFeatures) + return self.getOrDefault(self.checkpointInterval) diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py new file mode 100644 index 0000000000000..6be0f85b49ba2 --- /dev/null +++ b/python/pyspark/ml/recommendation.py @@ -0,0 +1,271 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from pyspark.ml.util import keyword_only +from pyspark.ml.wrapper import JavaEstimator, JavaModel +from pyspark.ml.param.shared import * +from pyspark.mllib.common import inherit_doc + + +__all__ = ['ALS', 'ALSModel'] + + +@inherit_doc +class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, HasRegParam, HasSeed): + """ + Alternating Least Squares (ALS) matrix factorization. + + ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices, + `X` and `Y`, i.e. `X * Yt = R`. Typically these approximations are called 'factor' matrices. + The general approach is iterative. During each iteration, one of the factor matrices is held + constant, while the other is solved for using least squares. The newly-solved factor matrix is + then held constant while solving for the other factor matrix. + + This is a blocked implementation of the ALS factorization algorithm that groups the two sets + of factors (referred to as "users" and "products") into blocks and reduces communication by only + sending one copy of each user vector to each product block on each iteration, and only for the + product blocks that need that user's feature vector. This is achieved by pre-computing some + information about the ratings matrix to determine the "out-links" of each user (which blocks of + products it will contribute to) and "in-link" information for each product (which of the feature + vectors it receives from each user block it will depend on). This allows us to send only an + array of feature vectors between each user block and product block, and have the product block + find the users' ratings and update the products based on these messages. + + For implicit preference data, the algorithm used is based on + "Collaborative Filtering for Implicit Feedback Datasets", available at + `http://dx.doi.org/10.1109/ICDM.2008.22`, adapted for the blocked approach used here. + + Essentially instead of finding the low-rank approximations to the rating matrix `R`, + this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if + r > 0 and 0 if r <= 0. The ratings then act as 'confidence' values related to strength of + indicated user preferences rather than explicit ratings given to items. 
+ + >>> als = ALS(rank=10, maxIter=5) + >>> model = als.fit(df) + >>> test = sqlContext.createDataFrame([(0, 2), (1, 0), (2, 0)], ["user", "item"]) + >>> predictions = sorted(model.transform(test).collect(), key=lambda r: r[0]) + >>> predictions[0] + Row(user=0, item=2, prediction=0.39...) + >>> predictions[1] + Row(user=1, item=0, prediction=3.19...) + >>> predictions[2] + Row(user=2, item=0, prediction=-1.15...) + """ + _java_class = "org.apache.spark.ml.recommendation.ALS" + # a placeholder to make it appear in the generated doc + rank = Param(Params._dummy(), "rank", "rank of the factorization") + numUserBlocks = Param(Params._dummy(), "numUserBlocks", "number of user blocks") + numItemBlocks = Param(Params._dummy(), "numItemBlocks", "number of item blocks") + implicitPrefs = Param(Params._dummy(), "implicitPrefs", "whether to use implicit preference") + alpha = Param(Params._dummy(), "alpha", "alpha for implicit preference") + userCol = Param(Params._dummy(), "userCol", "column name for user ids") + itemCol = Param(Params._dummy(), "itemCol", "column name for item ids") + ratingCol = Param(Params._dummy(), "ratingCol", "column name for ratings") + nonnegative = Param(Params._dummy(), "nonnegative", + "whether to use nonnegative constraint for least squares") + + @keyword_only + def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, + implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=42L, + ratingCol="rating", nonnegative=False, checkpointInterval=10): + """ + __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, + implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=42L, + ratingCol="rating", nonnegative=false, checkpointInterval=10) + """ + super(ALS, self).__init__() + self.rank = Param(self, "rank", "rank of the factorization") + self.numUserBlocks = Param(self, "numUserBlocks", "number of user blocks") + self.numItemBlocks = Param(self, "numItemBlocks", "number of item blocks") + self.implicitPrefs = Param(self, "implicitPrefs", "whether to use implicit preference") + self.alpha = Param(self, "alpha", "alpha for implicit preference") + self.userCol = Param(self, "userCol", "column name for user ids") + self.itemCol = Param(self, "itemCol", "column name for item ids") + self.ratingCol = Param(self, "ratingCol", "column name for ratings") + self.nonnegative = Param(self, "nonnegative", + "whether to use nonnegative constraint for least squares") + self._setDefault(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, + implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=42, + ratingCol="rating", nonnegative=False, checkpointInterval=10) + kwargs = self.__init__._input_kwargs + self.setParams(**kwargs) + + @keyword_only + def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, + implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=42, + ratingCol="rating", nonnegative=False, checkpointInterval=10): + """ + setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, + implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=42, + ratingCol="rating", nonnegative=False, checkpointInterval=10) + Sets params for ALS. + """ + kwargs = self.setParams._input_kwargs + return self._set(**kwargs) + + def _create_model(self, java_model): + return ALSModel(java_model) + + def setRank(self, value): + """ + Sets the value of :py:attr:`rank`. 
+ """ + self.paramMap[self.rank] = value + return self + + def getRank(self): + """ + Gets the value of rank or its default value. + """ + return self.getOrDefault(self.rank) + + def setNumUserBlocks(self, value): + """ + Sets the value of :py:attr:`numUserBlocks`. + """ + self.paramMap[self.numUserBlocks] = value + return self + + def getNumUserBlocks(self): + """ + Gets the value of numUserBlocks or its default value. + """ + return self.getOrDefault(self.numUserBlocks) + + def setNumItemBlocks(self, value): + """ + Sets the value of :py:attr:`numItemBlocks`. + """ + self.paramMap[self.numItemBlocks] = value + return self + + def getNumItemBlocks(self): + """ + Gets the value of numItemBlocks or its default value. + """ + return self.getOrDefault(self.numItemBlocks) + + def setNumBlocks(self, value): + """ + Sets both :py:attr:`numUserBlocks` and :py:attr:`numItemBlocks` to the specific value. + """ + self.paramMap[self.numUserBlocks] = value + self.paramMap[self.numItemBlocks] = value + + + def setImplicitPrefs(self, value): + """ + Sets the value of :py:attr:`implicitPrefs`. + """ + self.paramMap[self.implicitPrefs] = value + return self + + def getImplicitPrefs(self): + """ + Gets the value of implicitPrefs or its default value. + """ + return self.getOrDefault(self.implicitPrefs) + + def setAlpha(self, value): + """ + Sets the value of :py:attr:`alpha`. + """ + self.paramMap[self.alpha] = value + return self + + def getAlpha(self): + """ + Gets the value of alpha or its default value. + """ + return self.getOrDefault(self.alpha) + + def setUserCol(self, value): + """ + Sets the value of :py:attr:`userCol`. + """ + self.paramMap[self.userCol] = value + return self + + def getUserCol(self): + """ + Gets the value of userCol or its default value. + """ + return self.getOrDefault(self.userCol) + + def setItemCol(self, value): + """ + Sets the value of :py:attr:`itemCol`. + """ + self.paramMap[self.itemCol] = value + return self + + def getItemCol(self): + """ + Gets the value of itemCol or its default value. + """ + return self.getOrDefault(self.itemCol) + + def setRatingCol(self, value): + """ + Sets the value of :py:attr:`ratingCol`. + """ + self.paramMap[self.ratingCol] = value + return self + + def getRatingCol(self): + """ + Gets the value of ratingCol or its default value. + """ + return self.getOrDefault(self.ratingCol) + + def setNonnegative(self, value): + """ + Sets the value of :py:attr:`nonnegative`. + """ + self.paramMap[self.nonnegative] = value + return self + + def getNonnegative(self): + """ + Gets the value of nonnegative or its default value. + """ + return self.getOrDefault(self.nonnegative) + + +class ALSModel(JavaModel): + """ + Model fitted by ALS. 
+ """ + + +if __name__ == "__main__": + import doctest + from pyspark.context import SparkContext + from pyspark.sql import SQLContext + globs = globals().copy() + # The small batch size here ensures that we see multiple batches, + # even in these small test examples: + sc = SparkContext("local[2]", "ml.recommendation tests") + sqlContext = SQLContext(sc) + globs['sc'] = sc + globs['sqlContext'] = sqlContext + globs['df'] = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), + (2, 1, 1.0), (2, 2, 5.0)], ["user", "item", "rating"]) + (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) + sc.stop() + if failure_count: + exit(-1) From 0bd66b19ee3b7a6cd18cc8ff2b8cfcd20121aa30 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 8 May 2015 11:59:57 -0700 Subject: [PATCH 2/4] fixed seed --- .../org/apache/spark/ml/recommendation/ALS.scala | 2 +- python/pyspark/ml/recommendation.py | 13 ++++++------- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index fd37ba1b0ac52..3d72a8717b323 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -147,7 +147,7 @@ private[recommendation] trait ALSParams extends Params with HasMaxIter with HasR setDefault(rank -> 10, maxIter -> 10, regParam -> 0.1, numUserBlocks -> 10, numItemBlocks -> 10, implicitPrefs -> false, alpha -> 1.0, userCol -> "user", itemCol -> "item", - ratingCol -> "rating", nonnegative -> false, checkpointInterval -> 10, seed -> 42L) + ratingCol -> "rating", nonnegative -> false, checkpointInterval -> 10, seed -> 0L) /** * Validates and transforms the input schema. 
diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py index 6be0f85b49ba2..193dd0de444a1 100644 --- a/python/pyspark/ml/recommendation.py +++ b/python/pyspark/ml/recommendation.py @@ -80,11 +80,11 @@ class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, Ha @keyword_only def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, - implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=42L, + implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0, ratingCol="rating", nonnegative=False, checkpointInterval=10): """ __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, - implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=42L, + implicitPrefs=false, alpha=1.0, userCol="user", itemCol="item", seed=0, ratingCol="rating", nonnegative=false, checkpointInterval=10) """ super(ALS, self).__init__() @@ -99,18 +99,18 @@ def __init__(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemB self.nonnegative = Param(self, "nonnegative", "whether to use nonnegative constraint for least squares") self._setDefault(rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, - implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=42, + implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0, ratingCol="rating", nonnegative=False, checkpointInterval=10) kwargs = self.__init__._input_kwargs self.setParams(**kwargs) @keyword_only def setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, - implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=42, + implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0, ratingCol="rating", nonnegative=False, checkpointInterval=10): """ setParams(self, rank=10, maxIter=10, regParam=0.1, numUserBlocks=10, numItemBlocks=10, - implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=42, + implicitPrefs=False, alpha=1.0, userCol="user", itemCol="item", seed=0, ratingCol="rating", nonnegative=False, checkpointInterval=10) Sets params for ALS. """ @@ -166,7 +166,6 @@ def setNumBlocks(self, value): self.paramMap[self.numUserBlocks] = value self.paramMap[self.numItemBlocks] = value - def setImplicitPrefs(self, value): """ Sets the value of :py:attr:`implicitPrefs`. 
@@ -264,7 +263,7 @@ class ALSModel(JavaModel): globs['sc'] = sc globs['sqlContext'] = sqlContext globs['df'] = sqlContext.createDataFrame([(0, 0, 4.0), (0, 1, 2.0), (1, 1, 3.0), (1, 2, 4.0), - (2, 1, 1.0), (2, 2, 5.0)], ["user", "item", "rating"]) + (2, 1, 1.0), (2, 2, 5.0)], ["user", "item", "rating"]) (failure_count, test_count) = doctest.testmod(globs=globs, optionflags=doctest.ELLIPSIS) sc.stop() if failure_count: From eaed879e1f0ef340eb6a832cc7d5210cdb80489f Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 8 May 2015 12:06:02 -0700 Subject: [PATCH 3/4] readd numFeatures --- .../ml/param/_shared_params_code_gen.py | 1 + python/pyspark/ml/param/shared.py | 29 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/python/pyspark/ml/param/_shared_params_code_gen.py b/python/pyspark/ml/param/_shared_params_code_gen.py index 94eced2d21963..ed3171b6976d3 100644 --- a/python/pyspark/ml/param/_shared_params_code_gen.py +++ b/python/pyspark/ml/param/_shared_params_code_gen.py @@ -97,6 +97,7 @@ def get$Name(self): ("inputCol", "input column name", None), ("inputCols", "input column names", None), ("outputCol", "output column name", None), + ("numFeatures", "number of features", None), ("checkpointInterval", "checkpoint interval (>= 1)", None), ("seed", "random seed", None), ("tol", "the convergence tolerance for iterative algorithms", None), diff --git a/python/pyspark/ml/param/shared.py b/python/pyspark/ml/param/shared.py index 8bbed3e98602e..d0bcadee22347 100644 --- a/python/pyspark/ml/param/shared.py +++ b/python/pyspark/ml/param/shared.py @@ -281,6 +281,35 @@ def getOutputCol(self): return self.getOrDefault(self.outputCol) +class HasNumFeatures(Params): + """ + Mixin for param numFeatures: number of features. + """ + + # a placeholder to make it appear in the generated doc + numFeatures = Param(Params._dummy(), "numFeatures", "number of features") + + def __init__(self): + super(HasNumFeatures, self).__init__() + #: param for number of features + self.numFeatures = Param(self, "numFeatures", "number of features") + if None is not None: + self._setDefault(numFeatures=None) + + def setNumFeatures(self, value): + """ + Sets the value of :py:attr:`numFeatures`. + """ + self.paramMap[self.numFeatures] = value + return self + + def getNumFeatures(self): + """ + Gets the value of numFeatures or its default value. + """ + return self.getOrDefault(self.numFeatures) + + class HasCheckpointInterval(Params): """ Mixin for param checkpointInterval: checkpoint interval (>= 1). From be6e9317df073b4e9d123ac64b88f106e6dd6536 Mon Sep 17 00:00:00 2001 From: Burak Yavuz Date: Fri, 8 May 2015 14:10:20 -0700 Subject: [PATCH 4/4] addressed comments --- .../apache/spark/ml/recommendation/ALS.scala | 3 ++ python/pyspark/ml/recommendation.py | 53 +++++++++++-------- 2 files changed, 34 insertions(+), 22 deletions(-) diff --git a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala index 3d72a8717b323..d7cbffc3be26f 100644 --- a/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala +++ b/mllib/src/main/scala/org/apache/spark/ml/recommendation/ALS.scala @@ -278,6 +278,9 @@ class ALS extends Estimator[ALSModel] with ALSParams { /** @group setParam */ def setCheckpointInterval(value: Int): this.type = set(checkpointInterval, value) + /** @group setParam */ + def setSeed(value: Long): this.type = set(seed, value) + /** * Sets both numUserBlocks and numItemBlocks to the specific value. 
* @group setParam diff --git a/python/pyspark/ml/recommendation.py b/python/pyspark/ml/recommendation.py index 193dd0de444a1..4846b907e85ec 100644 --- a/python/pyspark/ml/recommendation.py +++ b/python/pyspark/ml/recommendation.py @@ -29,30 +29,39 @@ class ALS(JavaEstimator, HasCheckpointInterval, HasMaxIter, HasPredictionCol, Ha """ Alternating Least Squares (ALS) matrix factorization. - ALS attempts to estimate the ratings matrix `R` as the product of two lower-rank matrices, - `X` and `Y`, i.e. `X * Yt = R`. Typically these approximations are called 'factor' matrices. - The general approach is iterative. During each iteration, one of the factor matrices is held - constant, while the other is solved for using least squares. The newly-solved factor matrix is - then held constant while solving for the other factor matrix. - - This is a blocked implementation of the ALS factorization algorithm that groups the two sets - of factors (referred to as "users" and "products") into blocks and reduces communication by only - sending one copy of each user vector to each product block on each iteration, and only for the - product blocks that need that user's feature vector. This is achieved by pre-computing some - information about the ratings matrix to determine the "out-links" of each user (which blocks of - products it will contribute to) and "in-link" information for each product (which of the feature - vectors it receives from each user block it will depend on). This allows us to send only an - array of feature vectors between each user block and product block, and have the product block - find the users' ratings and update the products based on these messages. + ALS attempts to estimate the ratings matrix `R` as the product of + two lower-rank matrices, `X` and `Y`, i.e. `X * Yt = R`. Typically + these approximations are called 'factor' matrices. The general + approach is iterative. During each iteration, one of the factor + matrices is held constant, while the other is solved for using least + squares. The newly-solved factor matrix is then held constant while + solving for the other factor matrix. + + This is a blocked implementation of the ALS factorization algorithm + that groups the two sets of factors (referred to as "users" and + "products") into blocks and reduces communication by only sending + one copy of each user vector to each product block on each + iteration, and only for the product blocks that need that user's + feature vector. This is achieved by pre-computing some information + about the ratings matrix to determine the "out-links" of each user + (which blocks of products it will contribute to) and "in-link" + information for each product (which of the feature vectors it + receives from each user block it will depend on). This allows us to + send only an array of feature vectors between each user block and + product block, and have the product block find the users' ratings + and update the products based on these messages. For implicit preference data, the algorithm used is based on - "Collaborative Filtering for Implicit Feedback Datasets", available at - `http://dx.doi.org/10.1109/ICDM.2008.22`, adapted for the blocked approach used here. - - Essentially instead of finding the low-rank approximations to the rating matrix `R`, - this finds the approximations for a preference matrix `P` where the elements of `P` are 1 if - r > 0 and 0 if r <= 0. 
The ratings then act as 'confidence' values related to strength of - indicated user preferences rather than explicit ratings given to items. + "Collaborative Filtering for Implicit Feedback Datasets", available + at `http://dx.doi.org/10.1109/ICDM.2008.22`, adapted for the blocked + approach used here. + + Essentially instead of finding the low-rank approximations to the + rating matrix `R`, this finds the approximations for a preference + matrix `P` where the elements of `P` are 1 if r > 0 and 0 if r <= 0. + The ratings then act as 'confidence' values related to strength of + indicated user preferences rather than explicit ratings given to + items. >>> als = ALS(rank=10, maxIter=5) >>> model = als.fit(df)
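+    >>> # a minimal sketch of the implicit-preference mode described above
+    >>> # (alpha=40.0 follows the cited paper, not the library default of 1.0)
+    >>> alsImplicit = ALS(rank=10, maxIter=5, implicitPrefs=True, alpha=40.0)
+    >>> modelImplicit = alsImplicit.fit(df)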