Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .idea/copyright/libble.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion .idea/copyright/profiles_settings.xml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions data/testMF.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
1,1,3.0
1,2,4.0
1,3,2.8
1,4,4.0
1,5,3.7
1,6,4.7
2,1,2.0
2,2,5.0
2,3,4.8
2,4,2.6
2,5,4.2
2,6,3.0
3,1,4.3
3,2,3.2
3,3,5.0
3,4,4.9
3,5,3.2
3,6,4.0
4,1,3.0
4,2,4.3
4,3,4.3
4,4,1.0
4,5,3.2
4,6,2.3
5,1,4.0
5,2,4.3
5,3,4.5
5,4,2.3
5,5,2.0
5,6,1.0
57 changes: 37 additions & 20 deletions src/main/scala/collaborativeFiltering/MatrixFactorization.scala
Original file line number Diff line number Diff line change
@@ -1,3 +1,20 @@
/*
*
* Copyright (c) 2016 LIBBLE team supervised by Dr. Wu-Jun LI at Nanjing University.
* All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

/**
* Created by syh on 2016/12/9.
*/
Expand Down Expand Up @@ -90,26 +107,26 @@ class MatrixFactorization extends Serializable{
var testTime = 0L
var i = 0
while (i < numIters){
//loss
val testTimeStart = System.currentTimeMillis()
val bc_test_itemFactors = ratingsByRow.context.broadcast(itemFactors)
val loss = ratingsByRow.mapPartitionsWithIndex {(index,iter) =>
val localV = bc_test_itemFactors.value
val localU = MatrixFactorization.workerstore.get[Map[Int, Vector]](s"userFactors_$index")
val reguV = localV.mapValues(v => lambda_v * v.dot(v))
val reguU = localU.mapValues(u => lambda_u * u.dot(u))
val ls = iter.foldLeft(0.0) { (l, r) =>
val uh = localU.get(r.index_x).get
val vj = localV.get(r.index_y).get
val residual = r.rating - uh.dot(vj)
l + residual * residual + reguU.get(r.index_x).get + reguV.get(r.index_y).get
}
Iterator.single(ls)
}.reduce(_ + _) / numRatings
bc_test_itemFactors.unpersist()
print(s"$loss\t")
testTime += (System.currentTimeMillis() - testTimeStart)
println(s"${System.currentTimeMillis() - testTime - startTime}")
// //loss
// val testTimeStart = System.currentTimeMillis()
// val bc_test_itemFactors = ratingsByRow.context.broadcast(itemFactors)
// val loss = ratingsByRow.mapPartitionsWithIndex {(index,iter) =>
// val localV = bc_test_itemFactors.value
// val localU = MatrixFactorization.workerstore.get[Map[Int, Vector]](s"userFactors_$index")
// val reguV = localV.mapValues(v => lambda_v * v.dot(v))
// val reguU = localU.mapValues(u => lambda_u * u.dot(u))
// val ls = iter.foldLeft(0.0) { (l, r) =>
// val uh = localU.get(r.index_x).get
// val vj = localV.get(r.index_y).get
// val residual = r.rating - uh.dot(vj)
// l + residual * residual + reguU.get(r.index_x).get + reguV.get(r.index_y).get
// }
// Iterator.single(ls)
// }.reduce(_ + _) / numRatings
// bc_test_itemFactors.unpersist()
// print(s"$loss\t")
// testTime += (System.currentTimeMillis() - testTimeStart)
// println(s"${System.currentTimeMillis() - testTime - startTime}")
//broadcast V to p workers
val bc_itemFactors = ratingsByRow.context.broadcast(itemFactors)
//for each worker i, do in parallel
Expand Down
59 changes: 41 additions & 18 deletions src/main/scala/examples/testCF.scala
Original file line number Diff line number Diff line change
@@ -1,10 +1,27 @@
/*
*
* Copyright (c) 2016 LIBBLE team supervised by Dr. Wu-Jun LI at Nanjing University.
* All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

/**
* This file is licensed to you under the Apache License 2.0; you may obtain a copy
* of the license from http://www.apache.org/licenses/LICENSE-2.0.
*/
package libble.examples

import libble.collaborativeFiltering.{MatrixFactorization, Rating}
import libble.collaborativeFiltering.{MatrixFactorizationByScope, MatrixFactorization, Rating}
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

Expand All @@ -16,47 +33,53 @@ import scala.collection.mutable
*/
object testCF {
def main(args: Array[String]) {

if (args.length < 1) {
System.err.println("Usage: ~ path:String --numIters=Int --numParts=Int --rank=Int --regParam_u=Double --regParam_v=Double --stepsize=Double")
System.exit(1)
}

val optionsList = args.drop(1).map { arg =>
val optionsList = args.map { arg =>
arg.dropWhile(_ == '-').split('=') match {
case Array(opt, v) => (opt -> v)
case _ => throw new IllegalArgumentException("Invalid argument: " + arg)
}
}
val options = mutable.Map(optionsList: _*)
System.setProperty("hadoop.home.dir", "D:\\Program Files\\hadoop-2.6.0")
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

val conf = new SparkConf()
.setAppName("testMF")
val sc = new SparkContext(conf)

val trainsetPath = options.remove("trainset").map(_.toString).getOrElse("data\\testMF.txt")
val stepsize = options.remove("stepsize").map(_.toDouble).getOrElse(0.1)
val regParam_u = options.remove("regParam_u").map(_.toDouble).getOrElse(0.1)
val regParam_v = options.remove("regParam_v").map(_.toDouble).getOrElse(0.1)
val numIters = options.remove("numIters").map(_.toInt).getOrElse(50)
val numParts = options.remove("numParts").map(_.toInt).getOrElse(2)
val rank = options.remove("rank").map(_.toInt).getOrElse(10)
val testsetPath = options.remove("testset").map(_.toString).getOrElse("data\\testMF.txt")

val stepsize = options.remove("stepsize").map(_.toDouble).getOrElse(0.01)
val regParam_u = options.remove("regParam_u").map(_.toDouble).getOrElse(0.05)
val regParam_v = options.remove("regParam_v").map(_.toDouble).getOrElse(0.05)
val numIters = options.remove("numIters").map(_.toInt).getOrElse(200)
val numParts = options.remove("numParts").map(_.toInt).getOrElse(16)
val rank = options.remove("rank").map(_.toInt).getOrElse(40)

val trainSet = sc.textFile(args(0), numParts)
val trainSet = sc.textFile(trainsetPath, numParts)
.map(_.split(',') match { case Array(user, item, rate) =>
Rating(rate.toDouble, user.toInt, item.toInt)
})
val testSet = sc.textFile(testsetPath, numParts)
.map(_.split(',') match { case Array(user, item, rate) =>
Rating(rate.toDouble, user.toInt, item.toInt)
})

val model = new MatrixFactorization()
val model = new MatrixFactorizationByScope()
.train(trainSet,
numIters,
numParts,
rank,
regParam_u,
regParam_v,
stepsize)

val result = model.predict(testSet.map(r=>(r.index_x,r.index_y)))
val rmse = math.sqrt(result.map(r=>((r.index_x,r.index_y), r.rating))
.join(testSet.map(r=>((r.index_x,r.index_y), r.rating)))
.values
.map(i => math.pow(i._1 - i._2, 2))
.sum() / testSet.count())
println(s"rmse of test set: $rmse")
}
}
24 changes: 14 additions & 10 deletions src/main/scala/examples/testLR.scala
Original file line number Diff line number Diff line change
@@ -1,16 +1,18 @@
/*
* Copyright (c) 2016 LIBBLE team supervised by Dr. Wu-Jun LI at Nanjing University.
* All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* Copyright (c) 2016 LIBBLE team supervised by Dr. Wu-Jun LI at Nanjing University.
* All Rights Reserved.
* Licensed under the Apache License, Version 2.0 (the "License");
* You may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package libble.examples

Expand Down Expand Up @@ -55,5 +57,7 @@ object testLR {
val training = sc.loadLIBBLEFile(args(0), numPart)
val m = new LogisticRegression(stepSize, regParam, elasticF, numIter, numPart)
m.train(training)


}
}