## Path related

In [1]:
// Base URI of the local data directory; dataset file paths are appended to it.
val DataHome = "file:///Users/hadoop/eda/data/"

In [2]:
DataHome

file:///Users/hadoop/eda/data/

# Fetch the valid attributes

In [3]:
// Load the raw ratings file as an RDD of tab-separated text lines.
// The directory is spelled "ml-100k" (lower-case k) so that it matches the path used
// when loading u.item; mixed casing only resolves on case-insensitive file systems.
val rawData = sc.textFile(DataHome + "ml-100k/u.data")

In [4]:
rawData

file:///Users/hadoop/eda/data/ml-100K/u.data MapPartitionsRDD[2] at textFile at <console>:22

In [5]:
rawData.count()

100000

In [6]:
rawData.take(10)

Array(196	242	3	881250949, 186	302	3	891717742, 22	377	1	878887116, 244	51	2	880606923, 166	346	1	886397596, 298	474	4	884182806, 115	265	2	881171488, 253	465	5	891628467, 305	451	3	886324817, 6	86	3	883603013)

In [7]:
rawData.first()

196	242	3	881250949

In [8]:
// Keep only the first three tab-separated fields of every line.
val rawRatings = rawData.map { line =>
  line.split("\t").take(3)
}

In [9]:
rawRatings.first()

Array(196, 242, 3)

### Do you know what the underscore _ is?

In [10]:
List(1, 2, 3) map (_ + 2)

List(3, 4, 5)

In [11]:
List(1, 2, 3) map {_ + 2}

List(3, 4, 5)

In [12]:
List(1, 2, 3) map (element => element + 2)

List(3, 4, 5)

In [13]:
rawRatings.first()

Array(196, 242, 3)

In [14]:
rawRatings.count()

100000

## Import MLlib package

In [15]:
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating

In [16]:
Rating()

Name: Compile Error
Message: <console>:21: error: not enough arguments for method apply: (user: Int, product: Int, rating: Double)org.apache.spark.mllib.recommendation.Rating in object Rating.
Unspecified value parameters user, product, rating.
              Rating()
                    ^
StackTrace: 

## Convert the data type

In [17]:
// Turn each three-element string array into an MLlib Rating(user, product, rating).
val ratings = rawRatings.map { fields =>
  val Array(user, movie, rating) = fields
  Rating(user.toInt, movie.toInt, rating.toDouble)
}

In [18]:
ratings.cache()

MapPartitionsRDD[4] at map at <console>:28

In [19]:
ratings.persist()

MapPartitionsRDD[4] at map at <console>:28

In [20]:
ratings.first()

Rating(196,242,3.0)

# Training recommendation model

In [21]:
// Train the ALS model; the hyper-parameters are passed by name so each literal is self-describing.
val model = ALS.train(ratings, rank = 50, iterations = 10, lambda = 0.01)

In [22]:
model

org.apache.spark.mllib.recommendation.MatrixFactorizationModel@484c4e90

In [23]:
model.userFeatures

users MapPartitionsRDD[210] at mapValues at ALS.scala:255

In [24]:
model.userFeatures.count

943

In [25]:
model

org.apache.spark.mllib.recommendation.MatrixFactorizationModel@484c4e90

# Use the recommendation model

## User recommendations

In [26]:
// Generate a single prediction: the rating the model expects for user 789 and product 123.
// NOTE(review): the name is misspelled ("predicte"), but later cells refer to it, so it is kept.
val predicteRating = model.predict(789, 123)

In [27]:
predicteRating

4.001840799366895

In [28]:
// Ask the model for the K best products for one user and print one Rating per line.
val userId = 789
val K = 10
val topKRecs = model.recommendProducts(userId, K)
topKRecs.foreach(println)

Rating(789,56,6.083941661580425)
Rating(789,156,5.615780727101129)
Rating(789,96,5.505746763033932)
Rating(789,135,5.43592194833331)
Rating(789,180,5.408899612616104)
Rating(789,211,5.374947809057756)
Rating(789,663,5.356187740727127)
Rating(789,856,5.342549917516576)
Rating(789,246,5.331845343765216)
Rating(789,187,5.30115823343019)


In [29]:
// Check the recommendation results: load the movie metadata so ids can be shown as titles.
val movies = sc.textFile(DataHome + "ml-100k/u.item")

In [30]:
movies.take(2)

Array(1|Toy Story (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?Toy%20Story%20(1995)|0|0|0|1|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0, 2|GoldenEye (1995)|01-Jan-1995||http://us.imdb.com/M/title-exact?GoldenEye%20(1995)|0|1|1|0|0|0|0|0|0|0|0|0|0|0|0|0|1|0|0)

In [31]:
// Build an in-memory map from movie id (first '|'-separated field) to title (second field).
val titles = movies.map { line =>
  val fields = line.split("\\|")
  (fields(0).toInt, fields(1))
}.collectAsMap()

In [32]:
titles(123)

Frighteners, The (1996)

In [33]:
// Collect every rating given by the user under inspection. Uses userId (the value the
// top-K recommendations were computed for) instead of repeating the literal 789, so the
// comparison between rated and recommended movies always concerns the same user.
val moviesForUser = ratings.keyBy(_.user).lookup(userId)

In [34]:
moviesForUser

WrappedArray(Rating(789,1012,4.0), Rating(789,127,5.0), Rating(789,475,5.0), Rating(789,93,4.0), Rating(789,1161,3.0), Rating(789,286,1.0), Rating(789,293,4.0), Rating(789,9,5.0), Rating(789,50,5.0), Rating(789,294,3.0), Rating(789,181,4.0), Rating(789,1,3.0), Rating(789,1008,4.0), Rating(789,508,4.0), Rating(789,284,3.0), Rating(789,1017,3.0), Rating(789,137,2.0), Rating(789,111,3.0), Rating(789,742,3.0), Rating(789,248,3.0), Rating(789,249,3.0), Rating(789,1007,4.0), Rating(789,591,3.0), Rating(789,150,5.0), Rating(789,276,5.0), Rating(789,151,2.0), Rating(789,129,5.0), Rating(789,100,5.0), Rating(789,741,5.0), Rating(789,288,3.0), Rating(789,762,3.0), Rating(789,628,3.0), Rating(789,124,4.0))

In [35]:
moviesForUser.size

33

In [36]:
// Print the user's ten highest-rated movies as (title, rating) pairs.
moviesForUser.sortBy(r => -r.rating).take(10).map { r =>
  (titles(r.product), r.rating)
}.foreach(println)

(Godfather, The (1972),5.0)
(Trainspotting (1996),5.0)
(Dead Man Walking (1995),5.0)
(Star Wars (1977),5.0)
(Swingers (1996),5.0)
(Leaving Las Vegas (1995),5.0)
(Bound (1996),5.0)
(Fargo (1996),5.0)
(Last Supper, The (1995),5.0)
(Private Parts (1997),4.0)


In [37]:
// Print the recommended movies as (title, predicted rating) pairs.
for (entry <- topKRecs.map(rec => (titles(rec.product), rec.rating))) println(entry)

(Pulp Fiction (1994),6.083941661580425)
(Reservoir Dogs (1992),5.615780727101129)
(Terminator 2: Judgment Day (1991),5.505746763033932)
(2001: A Space Odyssey (1968),5.43592194833331)
(Apocalypse Now (1979),5.408899612616104)
(M*A*S*H (1970),5.374947809057756)
(Being There (1979),5.356187740727127)
(Night on Earth (1991),5.342549917516576)
(Chasing Amy (1997),5.331845343765216)
(Godfather: Part II, The (1974),5.30115823343019)


## Item recommendations

In [39]:
%lsmagic

Available line magics:
%lsmagic %showtypes %adddeps %truncation %addjar

Available cell magics:
%%sql %%html %%javascript %%dataframe %%pyspark %%scala %%sparkr

Type %<magic_name> for usage info.
         


In [40]:
%addjar

Usage: %AddJar <jar_url>

Option   Description                        
------   -----------                        
-f       forces re-download of specified jar
--magic  loads jar as a magic extension     


In [44]:
%addjar file:/Users/hadoop/Downloads/jblas-1.2.4.jar

Starting download from file:/Users/hadoop/Downloads/jblas-1.2.4.jar
Finished download of jblas-1.2.4.jar


In [46]:
// This step may fail with an error if the jblas jar has not been loaded
import org.jblas.DoubleMatrix

In [47]:
val aMatrix = new DoubleMatrix(Array(1.0, 2.0, 3.0))

In [49]:
aMatrix

[1.000000; 2.000000; 3.000000]

In [50]:
/**
 * Cosine similarity between two vectors: dot(vec1, vec2) / (norm2(vec1) * norm2(vec2)).
 *
 * @param vec1 first vector
 * @param vec2 second vector
 * @return the similarity score; 0.0 when either vector has a zero Euclidean norm,
 *         where the ratio would otherwise evaluate to 0.0 / 0.0 = NaN
 */
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double =
{
    val denominator = vec1.norm2() * vec2.norm2()
    // NaN scores cannot be ordered reliably when similarities are ranked, so the
    // undefined zero-vector case is reported as "no similarity" instead.
    if (denominator == 0.0) 0.0 else vec1.dot(vec2) / denominator
}

In [54]:
// Choose a reference item and fetch its feature array from the trained model.
val itemId = 567
// lookup returns the values stored under the key; the first one is used.
val itemFactor = model.productFeatures.lookup(itemId).head
// Wrap the raw array so it can be passed to cosineSimilarity.
val itemVector = new DoubleMatrix(itemFactor)
// Sanity check: an item compared with itself should score approximately 1.0.
cosineSimilarity(itemVector, itemVector)

1.0000000000000002

In [55]:
// Score every item by the cosine similarity of its feature vector to the reference item.
val sims = model.productFeatures.map { entry =>
  val (id, factor) = entry
  (id, cosineSimilarity(new DoubleMatrix(factor), itemVector))
}

In [56]:
// Take the K (id, similarity) pairs with the highest similarity.
val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double](_._2))

In [57]:
sortedSims

Array((567,1.0000000000000002), (288,0.7320649932052218), (184,0.7304292520729745), (219,0.7231391148544101), (152,0.7152935249739909), (550,0.707878874102939), (201,0.6983712756508331), (195,0.693884979685754), (1222,0.686234788032258), (156,0.6856838164645929))

In [58]:
println(sortedSims.take(10).mkString("\n"))

(567,1.0000000000000002)
(288,0.7320649932052218)
(184,0.7304292520729745)
(219,0.7231391148544101)
(152,0.7152935249739909)
(550,0.707878874102939)
(201,0.6983712756508331)
(195,0.693884979685754)
(1222,0.686234788032258)
(156,0.6856838164645929)


# Check the similar items from the recommendation

In [59]:
println(titles(itemId))

Wes Craven's New Nightmare (1994)


In [60]:
itemId

567

In [61]:
// Fetch one extra entry (K + 1) ordered by similarity, highest first.
val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double](pair => pair._2))

In [62]:
sortedSims2

Array((567,1.0000000000000002), (288,0.7320649932052218), (184,0.7304292520729745), (219,0.7231391148544101), (152,0.7152935249739909), (550,0.707878874102939), (201,0.6983712756508331), (195,0.693884979685754), (1222,0.686234788032258), (156,0.6856838164645929), (24,0.6820257519173245))

In [63]:
// Skip the first entry (the reference item itself, similarity ~1.0) and show the
// remaining neighbours by title. The upper bound is K + 1, matching the number of
// entries requested for sortedSims2, instead of the hard-coded literal 11.
sortedSims2.slice(1, K + 1).map {
case (id, sim) => (titles(id), sim)
}.mkString("\n")

(Scream (1996),0.7320649932052218)
(Army of Darkness (1993),0.7304292520729745)
(Nightmare on Elm Street, A (1984),0.7231391148544101)
(Sleeper (1973),0.7152935249739909)
(Die Hard: With a Vengeance (1995),0.707878874102939)
(Evil Dead II (1987),0.6983712756508331)
(Terminator, The (1984),0.693884979685754)
(Judgment Night (1993),0.686234788032258)
(Reservoir Dogs (1992),0.6856838164645929)
(Rumble in the Bronx (1995),0.6820257519173245)

# Recommendation model test

## Mean Squared Error, MSE

In [64]:
// Use the user's first rating as the ground-truth sample; head reads it directly
// instead of building a one-element collection with take(1) and indexing into it.
val actualRating = moviesForUser.head

In [65]:
actualRating

Rating(789,1012,4.0)

In [66]:
// Predict the rating for exactly the (user, product) pair of the sampled rating.
// The user id is taken from actualRating rather than repeated as the literal 789,
// so the prediction and the ground truth cannot drift apart.
val predictedRating = model.predict(actualRating.user, actualRating.product)

In [68]:
predictedRating

3.946482427063494

In [69]:
predicteRating

4.001840799366895

In [70]:
val squaredError = math.pow(predictedRating - actualRating.rating, 2.0)

In [71]:
squaredError

0.002864130613014245

In [73]:
// Reduce every rating to its (user, product) key.
val usersProducts = ratings.map(r => (r.user, r.product))

In [74]:
usersProducts

MapPartitionsRDD[225] at map at <console>:26

In [81]:
// Predict a rating for every (user, product) pair and key each result by that pair.
val predictions = model.predict(usersProducts).map { p =>
  ((p.user, p.product), p.rating)
}

In [83]:
predictions

MapPartitionsRDD[243] at map at <console>:30

In [84]:
// Key the actual ratings by (user, product) and join them with the predictions,
// giving ((user, product), (actual, predicted)).
val ratingsAndPredictions = ratings.map { r =>
  ((r.user, r.product), r.rating)
}.join(predictions)

In [85]:
ratingsAndPredictions

MapPartitionsRDD[247] at join at <console>:35

In [86]:
// Mean of the squared differences between actual and predicted ratings.
// mean() aggregates in one job; the earlier reduce(_ + _) / count form launched two
// separate jobs over the same RDD and threw on an empty RDD (mean() yields NaN there).
// The unused (user, product) key is matched with a wildcard.
val MSE = ratingsAndPredictions.map {
case (_, (actual, predicted)) =>
math.pow(actual - predicted, 2)
}.mean()

In [88]:
// Report the mean squared error.
println(s"Mean Squared Error = $MSE")

Mean Squared Error = 0.0850486436804848


In [89]:
val RMSE = math.sqrt(MSE)

In [90]:
// Report the root mean squared error.
println(s"Root Mean Squared Error = $RMSE")

Root Mean Squared Error = 0.2916310060341403


# K-means correct rating