From 62fc43c7f91704f23d448d58872755a0e1743274 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Thu, 5 Feb 2015 23:34:33 -0800
Subject: [PATCH 1/4] implement save/load for MFM

---
 .../spark/mllib/recommendation/ALS.scala        |  2 +-
 .../MatrixFactorizationModel.scala              | 64 ++++++++++++++++++-
 2 files changed, 63 insertions(+), 3 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
index 4bb28d1b1e071..caacab943030b 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/ALS.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.mllib.recommendation
 
 import org.apache.spark.Logging
-import org.apache.spark.annotation.{DeveloperApi, Experimental}
+import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.api.java.JavaRDD
 import org.apache.spark.ml.recommendation.{ALS => NewALS}
 import org.apache.spark.rdd.RDD
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index ed2f8b41bcae5..78f4acba2d15e 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -19,11 +19,15 @@ package org.apache.spark.mllib.recommendation
 
 import java.lang.{Integer => JavaInteger}
 
+import org.apache.hadoop.fs.Path
 import org.jblas.DoubleMatrix
 
-import org.apache.spark.Logging
+import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
+import org.apache.spark.mllib.recommendation.MatrixFactorizationModel.SaveLoadV1_0
+import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
 import org.apache.spark.storage.StorageLevel
 
 /**
@@ -41,7 +45,8 @@ import org.apache.spark.storage.StorageLevel
 class MatrixFactorizationModel(
     val rank: Int,
     val userFeatures: RDD[(Int, Array[Double])],
-    val productFeatures: RDD[(Int, Array[Double])]) extends Serializable with Logging {
+    val productFeatures: RDD[(Int, Array[Double])])
+  extends Saveable with Serializable with Logging {
 
   require(rank > 0)
   validateFeatures("User", userFeatures)
@@ -125,6 +130,11 @@ class MatrixFactorizationModel(
     recommend(productFeatures.lookup(product).head, userFeatures, num)
       .map(t => Rating(t._1, product, t._2))
 
+
+  override def save(sc: SparkContext, path: String): Unit = {
+    SaveLoadV1_0.save(this, path)
+  }
+
   private def recommend(
       recommendToFeatures: Array[Double],
       recommendableFeatures: RDD[(Int, Array[Double])],
@@ -136,3 +146,53 @@ class MatrixFactorizationModel(
     scored.top(num)(Ordering.by(_._2))
   }
 }
+
+private object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
+
+  import org.apache.spark.mllib.util.Loader._
+
+  private object SaveLoadV1_0 {
+
+    private val thisFormatVersion = "1.0"
+
+    private val thisClassName = "org.apache.spark.mllib.recommendation.MatrixFactorizationModel"
+
+    def save(model: MatrixFactorizationModel, path: String): Unit = {
+      val sc = model.userFeatures.sparkContext
+      val sqlContext = new SQLContext(sc)
+      import sqlContext.implicits.createDataFrame
+      val metadata = (thisClassName, thisFormatVersion, model.rank)
+      val metadataRDD = sc.parallelize(Seq(metadata), 1).toDataFrame("class", "version", "rank")
+      metadataRDD.toJSON.saveAsTextFile(metadataPath(path))
+      model.userFeatures.toDataFrame("id", "features").saveAsParquetFile(userPath(path))
+      model.productFeatures.toDataFrame("id", "features").saveAsParquetFile(productPath(path))
+    }
+
+    override def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
+      val sqlContext = new SQLContext(sc)
+      val (className, formatVersion, metadata) = loadMetadata(sc, path)
+      assert(className == thisClassName)
+      assert(formatVersion == thisFormatVersion)
+      val rank = metadata.select("rank").map { case Row(r: Int) =>
+        r
+      }.first()
+      val userFeatures = sqlContext.parquetFile(userPath(path))
+        .map { case Row(id: Int, features: Seq[Double]) =>
+          (id, features.toArray)
+        }
+      val productFeatures = sqlContext.parquetFile(productPath(path))
+        .map { case Row(id: Int, features: Seq[Double]) =>
+          (id, features.toArray)
+        }
+      new MatrixFactorizationModel(r, userFeatures, productFeatures)
+    }
+
+    private def userPath(path: String): String = {
+      new Path(dataPath(path), "user").toUri.toString
+    }
+
+    private def productPath(path: String): String = {
+      new Path(dataPath(path), "product").toUri.toString
+    }
+  }
+}

From f487cb216fdfddbd12772456f277ccbb1bb7a116 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Fri, 6 Feb 2015 00:22:48 -0800
Subject: [PATCH 2/4] add unit tests

---
 .../MatrixFactorizationModel.scala              | 33 ++++++++++++++-----
 .../MatrixFactorizationModelSuite.scala         | 19 +++++++++++
 2 files changed, 44 insertions(+), 8 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 78f4acba2d15e..08ff68a1b3d6c 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.mllib.recommendation
 
+import java.io.IOException
 import java.lang.{Integer => JavaInteger}
 
 import org.apache.hadoop.fs.Path
@@ -24,7 +25,6 @@ import org.jblas.DoubleMatrix
 
 import org.apache.spark.{Logging, SparkContext}
 import org.apache.spark.api.java.{JavaPairRDD, JavaRDD}
-import org.apache.spark.mllib.recommendation.MatrixFactorizationModel.SaveLoadV1_0
 import org.apache.spark.mllib.util.{Loader, Saveable}
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.{Row, SQLContext}
@@ -130,9 +130,10 @@ class MatrixFactorizationModel(
     recommend(productFeatures.lookup(product).head, userFeatures, num)
       .map(t => Rating(t._1, product, t._2))
 
+  override val formatVersion: String = "1.0"
 
   override def save(sc: SparkContext, path: String): Unit = {
-    SaveLoadV1_0.save(this, path)
+    MatrixFactorizationModel.SaveLoadV1_0.save(this, path)
   }
 
   private def recommend(
@@ -151,12 +152,30 @@ private object MatrixFactorizationModel extends Loader[MatrixFactorizationModel]
 
   import org.apache.spark.mllib.util.Loader._
 
-  private object SaveLoadV1_0 {
+  override def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
+    val (loadedClassName, formatVersion, metadata) = loadMetadata(sc, path)
+    val classNameV1_0 = SaveLoadV1_0.thisClassName
+    (loadedClassName, formatVersion) match {
+      case (className, "1.0") if className == classNameV1_0 =>
+        SaveLoadV1_0.load(sc, path)
+      case _ =>
+        throw new IOException("" +
+          "MatrixFactorizationModel.load did not recognize model with" +
+          s"(class: $loadedClassName, version: $formatVersion). Supported:\n" +
+          s"  ($classNameV1_0, 1.0)")
+    }
+  }
+
+  private object SaveLoadV1_0 extends Loader[MatrixFactorizationModel] {
 
     private val thisFormatVersion = "1.0"
 
-    private val thisClassName = "org.apache.spark.mllib.recommendation.MatrixFactorizationModel"
+    val thisClassName = "org.apache.spark.mllib.recommendation.MatrixFactorizationModel"
 
+    /**
+     * Saves a [[MatrixFactorizationModel]], where user features are saved under `data/users` and
+     * product features are saved under `data/products`.
+     */
     def save(model: MatrixFactorizationModel, path: String): Unit = {
       val sc = model.userFeatures.sparkContext
       val sqlContext = new SQLContext(sc)
@@ -173,9 +192,7 @@ private object MatrixFactorizationModel extends Loader[MatrixFactorizationModel]
       val (className, formatVersion, metadata) = loadMetadata(sc, path)
       assert(className == thisClassName)
      assert(formatVersion == thisFormatVersion)
-      val rank = metadata.select("rank").map { case Row(r: Int) =>
-        r
-      }.first()
+      val rank = metadata.select("rank").first().getInt(0)
       val userFeatures = sqlContext.parquetFile(userPath(path))
         .map { case Row(id: Int, features: Seq[Double]) =>
           (id, features.toArray)
@@ -184,7 +201,7 @@ private object MatrixFactorizationModel extends Loader[MatrixFactorizationModel]
         .map { case Row(id: Int, features: Seq[Double]) =>
           (id, features.toArray)
         }
-      new MatrixFactorizationModel(r, userFeatures, productFeatures)
+      new MatrixFactorizationModel(rank, userFeatures, productFeatures)
     }
 
     private def userPath(path: String): String = {
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
index b9caecc904a23..9801e87576744 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModelSuite.scala
@@ -22,6 +22,7 @@ import org.scalatest.FunSuite
 
 import org.apache.spark.mllib.util.MLlibTestSparkContext
 import org.apache.spark.mllib.util.TestingUtils._
 import org.apache.spark.rdd.RDD
+import org.apache.spark.util.Utils
 
 class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext {
 
@@ -53,4 +54,22 @@ class MatrixFactorizationModelSuite extends FunSuite with MLlibTestSparkContext
       new MatrixFactorizationModel(rank, userFeatures, prodFeatures1)
     }
   }
+
+  test("save/load") {
+    val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
+    val tempDir = Utils.createTempDir()
+    val path = tempDir.toURI.toString
+    def collect(features: RDD[(Int, Array[Double])]): Set[(Int, Seq[Double])] = {
+      features.mapValues(_.toSeq).collect().toSet
+    }
+    try {
+      model.save(sc, path)
+      val newModel = MatrixFactorizationModel.load(sc, path)
+      assert(newModel.rank === rank)
+      assert(collect(newModel.userFeatures) === collect(userFeatures))
+      assert(collect(newModel.productFeatures) === collect(prodFeatures))
+    } finally {
+      Utils.deleteRecursively(tempDir)
+    }
+  }
 }

From 14b7ea65addd05d166a835280c9a6827c4c33788 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Fri, 6 Feb 2015 14:21:57 -0800
Subject: [PATCH 3/4] address comments

---
 .../recommendation/MatrixFactorizationModel.scala   | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index 08ff68a1b3d6c..e1373b5e4c7ef 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -130,7 +130,7 @@ class MatrixFactorizationModel(
     recommend(productFeatures.lookup(product).head, userFeatures, num)
       .map(t => Rating(t._1, product, t._2))
 
-  override val formatVersion: String = "1.0"
+  protected override val formatVersion: String = "1.0"
 
   override def save(sc: SparkContext, path: String): Unit = {
     MatrixFactorizationModel.SaveLoadV1_0.save(this, path)
@@ -148,7 +148,7 @@ class MatrixFactorizationModel(
   }
 }
 
-private object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
+object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
 
   import org.apache.spark.mllib.util.Loader._
 
@@ -159,17 +159,18 @@ private object MatrixFactorizationModel extends Loader[MatrixFactorizationModel]
       case (className, "1.0") if className == classNameV1_0 =>
         SaveLoadV1_0.load(sc, path)
       case _ =>
-        throw new IOException("" +
-          "MatrixFactorizationModel.load did not recognize model with" +
+        throw new IOException("MatrixFactorizationModel.load did not recognize model with" +
          s"(class: $loadedClassName, version: $formatVersion). Supported:\n" +
          s"  ($classNameV1_0, 1.0)")
    }
  }
 
-  private object SaveLoadV1_0 extends Loader[MatrixFactorizationModel] {
+  private[recommendation]
+  object SaveLoadV1_0 extends Loader[MatrixFactorizationModel] {
 
     private val thisFormatVersion = "1.0"
 
+    private[recommendation]
     val thisClassName = "org.apache.spark.mllib.recommendation.MatrixFactorizationModel"
 
     /**

From a0593947822ed3294881c0b23d8b8a83ff2aee62 Mon Sep 17 00:00:00 2001
From: Xiangrui Meng
Date: Sat, 7 Feb 2015 10:44:52 -0800
Subject: [PATCH 4/4] SaveLoad not extending Loader

---
 .../spark/mllib/recommendation/MatrixFactorizationModel.scala | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
index e1373b5e4c7ef..9ff06ac362a31 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/recommendation/MatrixFactorizationModel.scala
@@ -166,7 +166,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
   }
 
   private[recommendation]
-  object SaveLoadV1_0 extends Loader[MatrixFactorizationModel] {
+  object SaveLoadV1_0 {
 
     private val thisFormatVersion = "1.0"
 
@@ -188,7 +188,7 @@ object MatrixFactorizationModel extends Loader[MatrixFactorizationModel] {
     model.productFeatures.toDataFrame("id", "features").saveAsParquetFile(productPath(path))
   }
 
-  override def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
+  def load(sc: SparkContext, path: String): MatrixFactorizationModel = {
     val sqlContext = new SQLContext(sc)
     val (className, formatVersion, metadata) = loadMetadata(sc, path)
     assert(className == thisClassName)
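Taken together, the four patches give MatrixFactorizationModel a versioned export/import: save(sc, path) writes the rank as JSON metadata plus the user/product factor RDDs as Parquet files under the given path, and MatrixFactorizationModel.load(sc, path) checks the recorded class name and format version before rebuilding the model. Below is a minimal round-trip sketch in Scala, modeled on the save/load unit test added in PATCH 2; the SparkContext `sc` and the rank/userFeatures/prodFeatures values are assumed to be defined as they are in that suite, and the temp-directory handling mirrors the test rather than production usage.

  import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
  import org.apache.spark.util.Utils

  // Build a model from existing factor RDDs, as the test suite does.
  val model = new MatrixFactorizationModel(rank, userFeatures, prodFeatures)
  val tempDir = Utils.createTempDir()
  val path = tempDir.toURI.toString
  try {
    model.save(sc, path)                                     // JSON metadata + Parquet factor files
    val sameModel = MatrixFactorizationModel.load(sc, path)  // verifies class name and "1.0" format
    assert(sameModel.rank == model.rank)                     // rank survives the round trip
  } finally {
    Utils.deleteRecursively(tempDir)                         // clean up, as the suite does
  }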