# Ch 4. Building a Recommendation Engine with Spark

* The idea behind recommendation engines is to predict what people might like and to uncover relationships between items to aid in the discovery process.

* For example, we could use a recommendation engine to show our users movies that they might enjoy. If we do this well, we keep users engaged with our service, which is good for both our users and us.

## Types of recommendation models
* Recommendation engines are most effective in two general scenarios (which are not mutually exclusive):

* Large number of available options for users
* A significant degree of personal taste involved

### Content-based filtering
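* Based on item attributes (title, tags, and so on); attributes extracted from audio or video content can also be included
* User recommendations can likewise match similar users based on user attributes or user profiles
* A user is represented by the combination of attributes of the items they have interacted with
* This combination becomes the user profile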

### Collaborative filtering
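* Based on the preferences of many people
* Two approaches: user-based recommendation and item-based recommendation
* Both user-based and item-based approaches are generally classed as nearest-neighbor models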


## Matrix Factorization
* Spark's recommendation model currently supports only matrix factorization
* Models of this kind have consistently performed well in collaborative filtering

### Explicit matrix factorization
* Based on explicit preference data such as ratings, thumbs up, and likes
* Represented as a two-dimensional user-by-item matrix (a very sparse matrix)
***
```
Tom, Star Wars, 5
Jane, Titanic, 4
Bill, Batman, 3
Jane, Star Wars, 2
Bill, Titanic, 3
```
***
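The triples above can be pivoted into a sparse user-by-item matrix. The following is a minimal sketch in plain Scala (no Spark); `RatingTriple`, `toSparseMatrix`, and `density` are illustrative names that do not appear in the original notes.

```scala
// Hypothetical helper type for illustration; not part of Spark or the original notes.
case class RatingTriple(user: String, item: String, rating: Double)

val sampleTriples = Seq(
  RatingTriple("Tom", "Star Wars", 5.0),
  RatingTriple("Jane", "Titanic", 4.0),
  RatingTriple("Bill", "Batman", 3.0),
  RatingTriple("Jane", "Star Wars", 2.0),
  RatingTriple("Bill", "Titanic", 3.0)
)

// Store only the observed cells, keyed by (user, item); missing cells are simply absent.
def toSparseMatrix(triples: Seq[RatingTriple]): Map[(String, String), Double] =
  triples.map(t => (t.user, t.item) -> t.rating).toMap

// Fraction of the full users x items grid that actually holds a rating.
def density(triples: Seq[RatingTriple]): Double = {
  val numUsers = triples.map(_.user).distinct.size
  val numItems = triples.map(_.item).distinct.size
  triples.map(t => (t.user, t.item)).distinct.size.toDouble / (numUsers * numItems)
}

val sampleMatrix = toSparseMatrix(sampleTriples)
```

Here only 5 of the 9 cells are filled. For MovieLens 100k, 100,000 ratings over a 943 x 1682 grid fill roughly 6.3% of the cells, which is why the matrix is described as very sparse.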
<img src=image4notebook/4_1.png width=500 height=250 />
* matrix factorization, matrix completion
<img src=image4notebook/4_2.png width=500 height=500 />
* Two factor matrices: U x k (users) and I x k (items)
* The original ratings matrix is very sparse, while each factor matrix is dense
<img src=image4notebook/4_3.png width=500 height=500 />
* latent feature models
* To predict a user's rating for an item, compute the vector dot product of the relevant row of the user-factor matrix and the relevant row of the item-factor matrix (see the image below)
<img src=image4notebook/4_4.png width=500 height=500 />
* To find the similarity between two items, use the same similarity measures as in nearest-neighbor models, but compute them directly between the two item-factor vectors
<img src=image4notebook/4_5.png width=500 height=500 />
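* An advantage of factorization models is that, once the model is built, computing recommendations is relatively easy
* With very large user and item sets, however, storing the factors and computing with them becomes difficult
* A disadvantage of factorization models is that they are harder to interpret and understand than nearest-neighbor models
* The model training stage is also computationally intensive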
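The dot-product prediction described above can be sketched in plain Scala. The factor values below are invented for illustration with k = 2, and `toyUserFactors`, `toyItemFactors`, `dot`, and `predictRatingFor` are hypothetical names rather than anything from Spark.

```scala
// Toy factor matrices with k = 2 latent features; all values are invented for illustration.
val toyUserFactors: Map[String, Array[Double]] = Map(
  "Tom"  -> Array(1.0, 2.0),
  "Jane" -> Array(0.5, 1.0)
)
val toyItemFactors: Map[String, Array[Double]] = Map(
  "Star Wars" -> Array(1.0, 2.0),
  "Titanic"   -> Array(2.0, 0.5)
)

// Dot product of two equal-length vectors.
def dot(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length, "vectors must have the same length")
  a.zip(b).map { case (x, y) => x * y }.sum
}

// Predicted rating = (user-factor row) . (item-factor row).
def predictRatingFor(user: String, item: String): Double =
  dot(toyUserFactors(user), toyItemFactors(item))

val tomStarWars = predictRatingFor("Tom", "Star Wars")  // 1*1 + 2*2 = 5.0
val janeTitanic = predictRatingFor("Jane", "Titanic")   // 0.5*2 + 1*0.5 = 1.5
```

Once the factors exist, a prediction costs only k multiplications and additions, which is what makes scoring cheap after training.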

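### Implicit matrix factorization
* Uses implicit feedback rather than direct ratings
* For example, watching or purchasing a movie, or the number of times a movie was watched
* P: binary preference matrix, C: confidence weights matrix
* P: which movies a user has watched, C: how many times the user watched each movie
* When recommendations are computed with the vector dot product, the score reflects the user's preference for an item rather than an estimated rating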
<img src=image4notebook/4_6.png width=500 height=200 />
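As a rough illustration of how P and C might be derived from raw view counts, here is a minimal sketch in plain Scala. `ViewCount`, `preference`, and `confidence` are hypothetical names, and the confidence is taken to be the raw count as in the notes above; implementations often apply a scaling such as 1 + alpha * count instead.

```scala
// Hypothetical record: how many times a user watched a movie.
case class ViewCount(user: Int, movie: Int, views: Int)

// P: binary preference, 1.0 if the user watched the movie at all, otherwise 0.0.
def preference(v: ViewCount): Double = if (v.views > 0) 1.0 else 0.0

// C: confidence weight; following the notes, simply the view count (an assumption).
def confidence(v: ViewCount): Double = v.views.toDouble

val toyViews = Seq(ViewCount(1, 10, 3), ViewCount(1, 11, 0), ViewCount(2, 10, 1))

// (user, movie) -> (preference, confidence)
val prefAndConf: Map[(Int, Int), (Double, Double)] =
  toyViews.map(v => ((v.user, v.movie), (preference(v), confidence(v)))).toMap
```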

### Alternating least squares
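* ALS is an optimization technique for solving matrix factorization problems
* It performs well and is relatively easy to implement in a parallel fashion
* From 1.0.0 through 1.4.1, the current version, ALS is the only collaborative filtering algorithm that Spark provides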
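To make the "alternating" idea concrete, here is a toy sketch of ALS restricted to rank 1 in plain Scala. It is not MLlib's implementation, which solves a k x k linear system per user or item and distributes the work; `solveSide` and `alsRank1` are hypothetical names, and the all-ones starting point is an assumption chosen to keep the example deterministic.

```scala
// One ALS half-step for rank 1: solve each row's factor while the other side is held fixed.
// For rank 1 the regularized least-squares solution is a scalar:
//   x = sum(r * y) / (lambda + sum(y * y)), summed over the observed entries of that row.
def solveSide(n: Int, fixed: Array[Double], obs: Seq[(Int, Int, Double)], lambda: Double): Array[Double] = {
  val num = Array.fill(n)(0.0)
  val den = Array.fill(n)(lambda)
  for ((row, col, r) <- obs) {
    num(row) += r * fixed(col)
    den(row) += fixed(col) * fixed(col)
  }
  // Rows with no observations (and lambda = 0) fall back to 0.0 instead of dividing by zero.
  Array.tabulate(n)(i => if (den(i) == 0.0) 0.0 else num(i) / den(i))
}

// ratings: observed cells keyed by (userIndex, itemIndex).
def alsRank1(ratings: Map[(Int, Int), Double], numUsers: Int, numItems: Int,
             iterations: Int, lambda: Double): (Array[Double], Array[Double]) = {
  val byUser = ratings.toSeq.map { case ((u, i), r) => (u, i, r) }
  val byItem = ratings.toSeq.map { case ((u, i), r) => (i, u, r) }
  var itemF = Array.fill(numItems)(1.0) // deterministic start; real implementations initialize randomly
  var userF = Array.fill(numUsers)(0.0)
  for (_ <- 1 to iterations) {
    userF = solveSide(numUsers, itemF, byUser, lambda) // fix items, solve for users
    itemF = solveSide(numItems, userF, byItem, lambda) // fix users, solve for items
  }
  (userF, itemF)
}
```

Each half-step is an ordinary least-squares problem because one side is frozen, and every row can be solved independently of the others, which is why the method parallelizes well.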


## Extracting the right features from your data

### Extracting features from the MovieLens 100k dataset

In [27]:
val rawData = sc.textFile("/Users/Limsangbae/ml-100k/u.data")
rawData.first()

196	242	3	881250949

In [28]:
val rawRatings = rawData.map(_.split("\t").take(3))
rawRatings.first()

Array(196, 242, 3)

In [32]:
import org.apache.spark.mllib.recommendation.ALS

In [33]:
ALS.train

Name: Compile Error
Message: <console>:24: error: ambiguous reference to overloaded definition,
both method train in object ALS of type (ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int)org.apache.spark.mllib.recommendation.MatrixFactorizationModel
and  method train in object ALS of type (ratings: org.apache.spark.rdd.RDD[org.apache.spark.mllib.recommendation.Rating], rank: Int, iterations: Int, lambda: Double)org.apache.spark.mllib.recommendation.MatrixFactorizationModel
match expected type ?
              ALS.train
                  ^
StackTrace: 

In [34]:
import org.apache.spark.mllib.recommendation.Rating
Rating()

Name: Compile Error
Message: <console>:26: error: not enough arguments for method apply: (user: Int, product: Int, rating: Double)org.apache.spark.mllib.recommendation.Rating in object Rating.
Unspecified value parameters user, product, rating.
              Rating()
                    ^
StackTrace: 

In [35]:
val ratings = rawRatings.map { 
    case Array(user,movie, rating) => Rating(user.toInt, movie.toInt, rating.toDouble)
}

In [36]:
ratings.first()

Rating(196,242,3.0)

## Training the recommendation model

### Training a model on the MovieLens 100k  dataset

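* We are now ready to train the model!
    * rank: the number of factors in the ALS model, that is, the number of hidden features in the low-rank approximation matrices. A larger number is generally better but directly affects memory usage (both computation and storage), so it is a trade-off. A rank of 10 to 200 is usually a reasonable range
    * iterations: the number of iterations to run. ALS reaches good performance after relatively few iterations; about 10 is usually a good default
    * lambda: controls the regularization of the model and therefore overfitting. It needs to be tuned to the characteristics of the data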
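To see how these parameters interact, a common form of the regularized objective for explicit matrix factorization is sketched below. The symbols are assumptions not found in the original notes: $x_u$ and $y_i$ are the $k$-dimensional user and item factor vectors ($k$ is the rank), and the first sum runs over observed ratings only. MLlib's exact scaling of the regularization term may differ.

$$
\min_{X,\,Y} \sum_{(u,i)\,\in\,\text{observed}} \left( r_{ui} - x_u^{\top} y_i \right)^2 + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)
$$

A larger rank gives the first term more freedom to fit the observed ratings, while a larger lambda penalizes large factor values and pulls the model away from overfitting.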

In [37]:
val model= ALS.train(ratings, 50,10, 0.01)

In [38]:
model.userFeatures.count // ALS uses lazy transformations, so this action forces the computation

943

In [39]:
model.productFeatures.count

1682

```
$ wc -l u.user
     943 u.user
$ wc -l u.item
    1682 u.item
```

## Using the recommendation model

### User recommendations

In [40]:
val predictedRating = model.predict(789,123)

In [41]:
predictedRating

3.911786142405169

In [42]:
val userId=789
val K=10
val topKRecs = model.recommendProducts(userId,K)
println(topKRecs.mkString("\n"))
println(topKRecs.toString())

Rating(789,195,5.697909233600918)
Rating(789,156,5.640288372299024)
Rating(789,177,5.549697775269709)
Rating(789,96,5.458112117882252)
Rating(789,223,5.446679308973188)
Rating(789,433,5.393072357021246)
Rating(789,192,5.374565041036448)
Rating(789,429,5.360461561295375)
Rating(789,180,5.353779019521968)
Rating(789,653,5.346538771637403)
[Lorg.apache.spark.mllib.recommendation.Rating;@4c20df2a
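Conceptually, a top-K recommendation scores every item against the user's factor vector and keeps the K highest scores. The sketch below shows that idea in plain Scala with invented factor values; `toyItems` and `topKItems` are hypothetical names, and this is not a claim about how `recommendProducts` is implemented internally.

```scala
// Toy item-factor vectors keyed by item id; values are invented for illustration.
val toyItems: Map[Int, Array[Double]] = Map(
  1 -> Array(1.0, 0.0),
  2 -> Array(0.0, 1.0),
  3 -> Array(1.0, 1.0),
  4 -> Array(-1.0, 0.5)
)

// Score every item with a dot product and keep the k highest-scoring (itemId, score) pairs.
def topKItems(userVector: Array[Double], items: Map[Int, Array[Double]], k: Int): Seq[(Int, Double)] =
  items.toSeq
    .map { case (id, f) => (id, f.zip(userVector).map { case (a, b) => a * b }.sum) }
    .sortBy { case (_, score) => -score }
    .take(k)

// Scores for user vector (2, 1): item 1 -> 2.0, item 2 -> 1.0, item 3 -> 3.0, item 4 -> -1.5
val toyRecs = topKItems(Array(2.0, 1.0), toyItems, 2)
```

Because the scores are dot products rather than clipped ratings, they can fall outside the 1 to 5 rating scale, as the values above 5 in the output show.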


cat u.data | grep "^789"| sort -n -k2

#### Inspecting the recommendations

In [44]:
val movies = sc.textFile("/Users/Limsangbae/ml-100k/u.item")
val titles = movies.map(line => line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collectAsMap()
titles(123)

Frighteners, The (1996)

In [45]:
titles.toString()

Map(137 -> Big Night (1996), 891 -> Bent (1997), 550 -> Die Hard: With a Vengeance (1995), 1205 -> Secret Agent, The (1996), 146 -> Unhook the Stars (1996), 864 -> My Fellow Americans (1996), 559 -> Interview with the Vampire (1994), 218 -> Cape Fear (1991), 568 -> Speed (1994), 227 -> Star Trek VI: The Undiscovered Country (1991), 765 -> Boomerang (1992), 1115 -> Twelfth Night (1996), 774 -> Prophecy, The (1995), 433 -> Heathers (1989), 92 -> True Romance (1993), 1528 -> Nowhere (1997), 846 -> To Gillian on Her 37th Birthday (1996), 1187 -> Switchblade Sisters (1975), 1501 -> Prisoner of the Mountains (Kavkazsky Plennik) (1996), 442 -> Amityville Curse, The (1990), 1160 -> Love! Valour! Compassion! (1997), 101 -> Heavy Metal (1981), 1196 -> Savage Nights (Nuits fauves, ...

In [46]:
val moviesForUser=ratings.keyBy(_.user).lookup(789)
println(moviesForUser.size)

33


In [50]:
moviesForUser.sortBy(-_.rating).take(10).map(rating=>(titles(rating.product),rating.rating)).foreach(println)
// sortBy with a negated key (-_.rating) sorts in descending order

(Godfather, The (1972),5.0)
(Trainspotting (1996),5.0)
(Dead Man Walking (1995),5.0)
(Star Wars (1977),5.0)
(Swingers (1996),5.0)
(Leaving Las Vegas (1995),5.0)
(Bound (1996),5.0)
(Fargo (1996),5.0)
(Last Supper, The (1995),5.0)
(Private Parts (1997),4.0)


In [52]:
topKRecs.map(rating=>(titles(rating.product),rating.rating)).foreach(println)

(Terminator, The (1984),5.697909233600918)
(Reservoir Dogs (1992),5.640288372299024)
(Good, The Bad and The Ugly, The (1966),5.549697775269709)
(Terminator 2: Judgment Day (1991),5.458112117882252)
(Sling Blade (1996),5.446679308973188)
(Heathers (1989),5.393072357021246)
(Raging Bull (1980),5.374565041036448)
(Day the Earth Stood Still, The (1951),5.360461561295375)
(Apocalypse Now (1979),5.353779019521968)
(Touch of Evil (1958),5.346538771637403)


### Item recommendations

* Which items are most similar to a given item?
* Similarity measures
    * Pearson correlation
    * cosine similarity for real-valued vectors
    * Jaccard similarity for binary vectors
    
### Generating similar movies for the MovieLens 100k dataset    
* The MatrixFactorizationModel API does not currently support item-to-item similarity computation directly
* We use cosine similarity, which ranges from -1 to 1
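Before turning to the jblas version, the two simplest measures can be sketched in plain Scala to show their ranges. `cosineSim` and `jaccardSim` are hypothetical names chosen so that they do not clash with the `cosineSimilarity` function defined in this notebook.

```scala
// Cosine similarity for real-valued vectors: dot(a, b) / (|a| * |b|), in [-1, 1].
def cosineSim(a: Array[Double], b: Array[Double]): Double = {
  require(a.length == b.length, "vectors must have the same length")
  val dotAB = a.zip(b).map { case (x, y) => x * y }.sum
  val normA = math.sqrt(a.map(x => x * x).sum)
  val normB = math.sqrt(b.map(x => x * x).sum)
  dotAB / (normA * normB) // NaN if either vector is all zeros
}

// Jaccard similarity for binary vectors, each given as the set of its non-zero positions.
def jaccardSim(a: Set[Int], b: Set[Int]): Double =
  if (a.isEmpty && b.isEmpty) 0.0
  else a.intersect(b).size.toDouble / a.union(b).size

val sameDirection     = cosineSim(Array(1.0, 0.0), Array(1.0, 0.0))   // 1.0
val oppositeDirection = cosineSim(Array(1.0, 0.0), Array(-2.0, 0.0))  // -1.0
val orthogonal        = cosineSim(Array(1.0, 0.0), Array(0.0, 3.0))   // 0.0
val overlap           = jaccardSim(Set(1, 2, 3), Set(2, 3, 4))        // 2 shared / 4 total = 0.5
```

Cosine similarity ignores vector length and compares direction only, which suits factor vectors whose magnitudes vary from item to item.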

In [53]:
import org.jblas.DoubleMatrix
val aMatrix = new DoubleMatrix(Array(1,0,2,0,3.0))

In [54]:
println(aMatrix.toString())

[1.000000; 0.000000; 2.000000; 0.000000; 3.000000]


In [55]:
def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
  vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}

In [56]:
// The cosine similarity of a vector with itself is 1
val itemId = 567
val itemFactor = model.productFeatures.lookup(itemId).head
val itemVector = new DoubleMatrix(itemFactor)
cosineSimilarity(itemVector, itemVector) 


1.0

In [57]:
val sims = model.productFeatures.map{ case (id,factor)=>
val factorVector = new DoubleMatrix(factor)
val sim = cosineSimilarity(factorVector, itemVector)
(id,sim)
}

In [59]:
// K is 10, as defined earlier
val sortedSims = sims.top(K)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })
// top() is a Spark function that computes the top K in a distributed fashion

In [60]:
println(sortedSims.take(10).mkString("\n"))

(567,1.0)
(16,0.7021405671989809)
(433,0.6965832620396708)
(109,0.6871238986088308)
(403,0.6805367352655004)
(741,0.6791489354428673)
(1007,0.6766634661747077)
(248,0.6721551340936744)
(219,0.6718872224527191)
(413,0.668481869082473)


In [61]:
println(titles(itemId))

Wes Craven's New Nightmare (1994)


In [62]:
val sortedSims2 = sims.top(K + 1)(Ordering.by[(Int, Double), Double] { case (id, similarity) => similarity })

In [63]:
// The book omits this println
println(sortedSims2.slice(1, 11).map{ case (id, sim) => (titles(id), sim) }.mkString("\n"))

(French Twist (Gazon maudit) (1995),0.7021405671989809)
(Heathers (1989),0.6965832620396708)
(Mystery Science Theater 3000: The Movie (1996),0.6871238986088308)
(Batman (1989),0.6805367352655004)
(Last Supper, The (1995),0.6791489354428673)
(Waiting for Guffman (1996),0.6766634661747077)
(Grosse Pointe Blank (1997),0.6721551340936744)
(Nightmare on Elm Street, A (1984),0.6718872224527191)
(Tales from the Crypt Presents: Bordello of Blood (1996),0.668481869082473)
(Some Kind of Wonderful (1987),0.6677884697880286)


## Evaluating the performance of recommendation models
* How do we know whether the trained model is a good one?
* We need to evaluate its predictive performance
* Two common metrics: Mean Squared Error and Mean average precision at K

### Mean Squared Error (MSE)
* The sum of the squared errors divided by the number of observations (rows)
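The definition can be sketched in plain Scala over a handful of (actual, predicted) pairs. `meanSquaredError`, `rootMeanSquaredError`, and `toyPairs` are hypothetical names and the values are invented.

```scala
// Mean squared error over (actual, predicted) pairs.
def meanSquaredError(pairs: Seq[(Double, Double)]): Double = {
  require(pairs.nonEmpty, "need at least one observation")
  pairs.map { case (actual, predicted) => math.pow(actual - predicted, 2) }.sum / pairs.size
}

// RMSE is the square root of MSE, which puts the error back on the rating scale.
def rootMeanSquaredError(pairs: Seq[(Double, Double)]): Double =
  math.sqrt(meanSquaredError(pairs))

// Squared errors: 1, 0, 4, 1, so MSE = 6 / 4 = 1.5
val toyPairs = Seq((3.0, 2.0), (4.0, 4.0), (5.0, 3.0), (1.0, 2.0))
val toyMse = meanSquaredError(toyPairs)
val toyRmse = rootMeanSquaredError(toyPairs)
```

Squaring penalizes large misses more heavily than small ones: the single error of 2 contributes more than the two errors of 1 combined.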


In [64]:
val actualRating = moviesForUser.take(1)(0)
println(actualRating)

Rating(789,1012,4.0)


In [65]:
val predictRating = model.predict(789, actualRating.product)
//println(actualRating.product)
//val predictRating = model.predict(789, 1024)
println(predictRating)

4.010584985135074


In [66]:
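// Use predictRating from In [65] (the prediction for this movie), not predictedRating from In [40]
val squareError = math.pow(predictRating - actualRating.rating, 2.0)
println(squareError) // about 1.12E-4, that is (4.0106 - 4.0)^2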


In [67]:
val usersProducts = ratings.map{ case Rating(user, product, rating) => (user, product) }
val predictions =model.predict(usersProducts).map{
case Rating(user,product,rating) => ((user,product),rating) }

In [68]:
val ratingsAndPredictions = ratings.map{
case Rating(user, product, rating) => ((user,product), rating) }.join(predictions)
println(ratingsAndPredictions.take(1).mkString("\n"))

((92,386),(3.0,2.4415577029127324))


In [69]:
val MSE = ratingsAndPredictions.map{
case((user,product),(actual,predicted)) =>
math.pow((actual-predicted),2)
}.reduce(_+_) / ratingsAndPredictions.count
println("MSE = " + MSE)

MSE = 0.0850860348619457


In [70]:
val RMSE = math.sqrt(MSE)
println("RMSE = " + RMSE)

RMSE = 0.2916951059958766


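### Mean average precision at K
* Mean average precision at K (MAPK) is the mean of the average precision at K (APK) across all users
* APK is a metric commonly used in information retrieval
* Precision = relevant retrieved / retrieved
* Average precision = the sum of the precision values at each position where a relevant document is retrieved, divided by the total number of relevant documents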
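Written as a formula, one way to express APK is shown below. The notation is an assumption introduced here: $A$ is the set of relevant (actual) items, $p_i$ is the item at rank $i$ in the predicted list, and $h_i$ is the number of relevant items among the first $i$ predictions. The denominator $\min(|A|, K)$ follows the `avgPrecisionK` function in this notebook.

$$
\mathrm{AP@}K = \frac{1}{\min(|A|,\, K)} \sum_{i=1}^{K} \mathbb{1}\left[ p_i \in A \right] \cdot \frac{h_i}{i}
$$

As a worked example, take predictions $(1, 2, 3, 4, 5)$, relevant items $A = \{1, 3, 6\}$, and $K = 5$. Hits occur at ranks 1 and 3, so

$$
\mathrm{AP@}5 = \frac{1}{3} \left( \frac{1}{1} + \frac{2}{3} \right) = \frac{5}{9} \approx 0.556
$$

MAPK is then the average of this value over all users.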

In [71]:
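def avgPrecisionK(actual: Seq[Int], predicted: Seq[Int], k: Int): Double = {
  val predK = predicted.take(k)
  var score = 0.0
  // Start at 0.0. A start value of 0.9 inflates every hit; the MAPK values printed
  // in In [79] and In [89] came from a run with 0.9 and therefore overstate the score.
  var numHits = 0.0
  for ((p, i) <- predK.zipWithIndex) {
    if (actual.contains(p)) {
      numHits += 1.0
      score += numHits / (i.toDouble + 1.0)
    }
  }
  if (actual.isEmpty) {
    1.0
  } else {
    score / scala.math.min(actual.size, k).toDouble
  }
}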


In [72]:
val actualMovies = moviesForUser.map(_.product)
println(actualMovies)

ArrayBuffer(1012, 127, 475, 93, 1161, 286, 293, 9, 50, 294, 181, 1, 1008, 508, 284, 1017, 137, 111, 742, 248, 249, 1007, 591, 150, 276, 151, 129, 100, 741, 288, 762, 628, 124)


In [73]:
val predictedMovies = topKRecs.map(_.product)
println(predictedMovies.mkString("\n"))

195
156
177
96
223
433
192
429
180
653


In [74]:
val apk10 = avgPrecisionK(actualMovies, predictedMovies, 10)
println(apk10)

0.0


In order to compute the APK for each user and average them to compute the overall MAPK, we will need to generate the list of recommendations for each user in our dataset. While this can be fairly intensive on a large scale, we can distribute the computation using our Spark functionality. However, one limitation is that each worker must have the full item-factor matrix available so that it can compute the dot product between the relevant user vector and all item vectors. This can be a problem when the number of items is extremely high as the item matrix must fit in the memory of one machine.
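A back-of-the-envelope estimate shows when the memory limit starts to matter. The sketch assumes a dense matrix of 8-byte doubles and ignores JVM object overhead; `factorMatrixBytes` is a hypothetical helper, and the 100 million item count is an invented example.

```scala
// Rough size of a dense factor matrix stored as 8-byte doubles; ignores JVM object overhead.
def factorMatrixBytes(numRows: Long, rank: Int): Long = numRows * rank * 8L

// MovieLens 100k: 1682 items x 50 factors = 672,800 bytes, well under 1 MB.
val movieLensItemBytes = factorMatrixBytes(1682L, 50)

// Hypothetical catalogue of 100 million items: 40,000,000,000 bytes, about 40 GB.
val hugeCatalogueBytes = factorMatrixBytes(100000000L, 50)
```

For this dataset, broadcasting the item matrix is trivial; the limitation only bites for catalogues several orders of magnitude larger.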

In [75]:
val itemFactors = model.productFeatures.map { case (id, factor) => factor }.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
println(itemMatrix.rows, itemMatrix.columns)

(1682,50)


In [76]:
val imBroadcast = sc.broadcast(itemMatrix)

In [77]:
val allRecs = model.userFeatures.map{ case (userId, array) => 
  val userVector = new DoubleMatrix(array)
  val scores = imBroadcast.value.mmul(userVector)
  val sortedWithId = scores.data.zipWithIndex.sortBy(-_._1)
  val recommendedIds = sortedWithId.map(_._2 + 1).toSeq
  (userId, recommendedIds)
}

println(allRecs)

MapPartitionsRDD[465] at map at <console>:44


In [78]:
val userMovies = ratings.map{ case Rating(user, product, rating) => (user, product) }.groupBy(_._1)

In [79]:
val K = 10
val MAPK = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => 
  val actual = actualWithIds.map(_._2).toSeq
  avgPrecisionK(actual, predicted, K)
  }.reduce(_ + _) / allRecs.count
println("Mean Average Precision at K = " + MAPK)

Mean Average Precision at K = 0.05180138952010632


## Using MLlib's built-in evaluation functions

* MSE, RMSE, and MAPK
* In fact, there is no need to compute these by hand; MLlib provides all of them

In [80]:
import org.apache.spark.mllib.evaluation.RegressionMetrics
val predictedAndTrue = ratingsAndPredictions.map {
  // ratingsAndPredictions holds (actual, predicted); RegressionMetrics expects (predicted, actual)
  case ((user, product), (actual, predicted)) => (predicted, actual) }
val regressionMetrics = new RegressionMetrics(predictedAndTrue)

println("Mean Squared Error = " + regressionMetrics.meanSquaredError)
println("Root Mean Squared Error = " + regressionMetrics.rootMeanSquaredError)

Mean Squared Error = 0.08508603486194566
Root Mean Squared Error = 0.29169510599587656


In [86]:
import org.apache.spark.mllib.evaluation.RankingMetrics
val predictedAndTrueForRanking = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => 
  val actual = actualWithIds.map(_._2)
  (predicted.toArray, actual.toArray)
}
val rankingMetrics = new RankingMetrics(predictedAndTrueForRanking)
println("Mean Average Precision = " + rankingMetrics.meanAveragePrecision)

Mean Average Precision = 0.0725445180994263


In [89]:
val MAPK2000 = allRecs.join(userMovies).map{ case (userId, (predicted, actualWithIds)) => 
  val actual = actualWithIds.map(_._2).toSeq
  avgPrecisionK(actual, predicted, 2000)
}.reduce(_ + _) / allRecs.count
println("Mean Average Precision = " + MAPK2000)

Mean Average Precision = 0.07768046214275937
