# Real-time Personalization with Latent Factor Models
-------------------------------------------------

## Overview
Latent factor models, a common approach to collaborative filtering, provide a powerful abstraction for building usable user-item preference data. Once the desired user-item affinity matrix is defined, matrix factorization techniques can find a low-rank approximation of it from the small known (measured) portion of the matrix that is usually all we have. The resulting user and item factor matrices (of rank R) have also proven useful in other, derivative personalization techniques.
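
In symbols, the low-rank approximation can be sketched as follows. The notation borrows $Y$ for the item factor matrix from the code later in this notebook; the names $P$, $X$, $m$ and $n$ are assumptions introduced here for illustration, with $m$ users, $n$ items and rank $R$:

$$
P \approx X Y^{\top}, \qquad P \in \mathbb{R}^{m \times n},\; X \in \mathbb{R}^{m \times R},\; Y \in \mathbb{R}^{n \times R},\; R \ll \min(m, n)
$$

Row $u$ of $X$ is the latent vector for user $u$, row $i$ of $Y$ is the latent vector for item $i$, and the predicted affinity is their dot product, $\hat{p}_{ui} = x_u y_i^{\top}$.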

## Notebook Overview
Below is a walkthrough from start to finish of a method for finding the top personalized recommendations for films similar to [Con Air](http://www.imdb.com/title/tt0118880/). We will use the [MovieLens](https://grouplens.org/datasets/movielens/) data set and find a very simple low-rank approximation of the user-movie affinity matrix. We will also show how this model can be tailored with real-time affinities that can personalize the recommendations even further without needing to re-compute the entire user-item affinity matrix.

1. Load and process MovieLens data
2. Determine best hyper-parameters for ALS model
3. Train the full model
4. Find similar movies to a target film based on latent user preferences
5. Derive real-time method for ranking films
6. Test examples of real-time recommendations

## Supporting Material
1. Slide [presentation](http://slides.com/dataexhaust/deck-c32b28f0-f834-45fb-9143-2362f74050ab#/)

In [1]:
/*
 *  Environment Setup
 *  ========================
 *  - Jupyter-Scala (https://github.com/alexarchambault/jupyter-scala)
 */
import $exclude.`org.slf4j:slf4j-log4j12`, $ivy.`org.slf4j:slf4j-nop:1.7.21` // for cleaner logs
import $profile.`hadoop-2.6`
import $ivy.`org.apache.spark::spark-sql:2.1.0` // adjust spark version - spark >= 2.0
import $ivy.`org.apache.hadoop:hadoop-aws:2.6.4`
import $ivy.`org.jupyter-scala::spark:0.4.0` // for JupyterSparkSession (SparkSession aware of the jupyter-scala kernel)

// General spark imports
import org.apache.spark._
import org.apache.spark.sql._
import jupyter.spark.session._

// Create sessions
val spark = JupyterSparkSession.builder() // important - call this rather than SparkSession.builder()
  .jupyter() // this method must be called straightaway after builder()
  // .yarn("/etc/hadoop/conf") // optional, for Spark on YARN - argument is the Hadoop conf directory
  // .emr("2.6.4") // on AWS ElasticMapReduce, this adds AWS-related jars to the spark jar list
  .master("local[*]") // change to "yarn-client" on YARN
  .config("spark.driver.memory", "8g")
  .config("spark.executor.memory", "8g")
  .appName("Real-time Personalization with Latent Factor Models")
  .getOrCreate()

// Access underlying spark context (for backwards compatibility)
val sc = spark.sparkContext
val sqlContext = spark.sqlContext


In [3]:
// Load other related dependencies
interp.load.ivy("org.scalanlp" %% "breeze" % "0.13")
interp.load.ivy("org.apache.spark" %% "spark-mllib" % "2.1.0") // keep in sync with the spark-sql version above
interp.load.ivy("com.databricks" %% "spark-csv" % "1.5.0")


In [4]:
// Spark libraries
import sqlContext._
import sqlContext.implicits._
import org.apache.spark._
import org.apache.spark.rdd._
import org.apache.spark.SparkContext._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// ML Imports
import breeze.linalg._
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

## Support methods
Two support methods are used in later derivations: a helper that builds a Breeze `DenseMatrix` from an array of arrays, and a `CosineSimilarity` object.

In [5]:
/*
 * Helper Matrix Generation
 * ====
 * Creates and populates a 2D Breeze matrix from an array of arrays
 */
def matrix(data: Array[Array[Double]], rows: Int, cols: Int): DenseMatrix[Double] = {
  // Allocate a zero-filled rows x cols DenseMatrix
  val matrix = DenseMatrix.zeros[Double](rows, cols)

  // Copy each position (i, j) from the source arrays
  for (i <- data.indices; j <- data(i).indices) {
    matrix(i, j) = data(i)(j)
  }

  // Return DenseMatrix
  matrix
}



/*
 * Object in scala for calculating cosine similarity
 * Reuben Sutton - 2012
 * More information: http://en.wikipedia.org/wiki/Cosine_similarity
 */

object CosineSimilarity {

  /*
   * This method takes 2 equal length arrays of doubles
   * It returns a double representing similarity of the 2 arrays
   * 0.9925 would be 99.25% similar
   * (x dot y)/(||x|| ||y||)
   * Note: the result is NaN if either array has zero magnitude
   */
  def cosineSimilarity(x: Array[Double], y: Array[Double]): Double = {
    require(x.length == y.length, "arrays must have the same length")
    dotProduct(x, y) / (magnitude(x) * magnitude(y))
  }

  /*
   * Return the dot product of the 2 arrays
   * e.g. (a[0]*b[0])+(a[1]*b[1])
   */
  def dotProduct(x: Array[Double], y: Array[Double]): Double = {
    x.zip(y).map { case (a, b) => a * b }.sum
  }

  /*
   * Return the magnitude of an array
   * We square each element, sum the squares, then square root the result.
   */
  def magnitude(x: Array[Double]): Double = {
    math.sqrt(x.map(i => i * i).sum)
  }

}


defined function matrix
defined object CosineSimilarity
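
To make the metric concrete, here is a small standalone sketch of the same formula on hand-picked vectors. The function name `cosine` and the sample vectors are made up for illustration; it does not depend on the `CosineSimilarity` object above.

```scala
// Standalone sketch: cosine similarity between two equal-length vectors
def cosine(x: Array[Double], y: Array[Double]): Double = {
  require(x.length == y.length, "vectors must have the same length")
  val dot = x.zip(y).map { case (a, b) => a * b }.sum
  val norms = math.sqrt(x.map(a => a * a).sum) * math.sqrt(y.map(b => b * b).sum)
  dot / norms
}

// Illustrative vectors (not taken from the MovieLens model)
val parallel   = cosine(Array(1.0, 2.0, 3.0), Array(2.0, 4.0, 6.0)) // same direction
val orthogonal = cosine(Array(1.0, 0.0), Array(0.0, 1.0))           // nothing in common
val opposite   = cosine(Array(1.0, 1.0), Array(-1.0, -1.0))         // opposite direction

println(f"parallel=$parallel%.3f orthogonal=$orthogonal%.3f opposite=$opposite%.3f")
```

The three values should come out close to 1, 0 and -1 respectively. Only the direction of the vectors matters, not their length, which is why the metric suits comparing latent factor vectors of different magnitudes.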

## Load movie data
MovieLens ratings come in a very simple form:

```
userId,movieId,rating,timestamp
1,169,2.5,1204927694
1,2471,3.0,1204927438
1,48516,5.0,1204927435
2,2571,3.5,1436165433
2,109487,4.0,1436165496
2,112552,5.0,1436165496
2,112556,4.0,1436165499
3,356,4.0,920587155
3,2394,4.0,920586920
3,2431,5.0,920586945
3,2445,4.0,920586945
```

In [None]:
// Load ratings data -- using Spark CSV (https://github.com/databricks/spark-csv)
// Keep `ratings` as an RDD[Rating], which is what randomSplit and ALS.train below expect
val ratings: RDD[Rating] = sqlContext.read.format("com.databricks.spark.csv")
                        .option("header", "true").option("inferSchema", "true")
                        .load("file:///.../movies/ml-latest/ratings.csv")
                        .rdd
                        .map(r => {
                            Rating(r(0).toString.toInt, r(1).toString.toInt, r(2).toString.toDouble)
                        }).cache()

// Register the ratings for Spark SQL as well
ratings.toDF().registerTempTable("ratings")


## Determine hyper-parameters
Since we are using the Alternating Least Squares (ALS) algorithm, which ships with Spark MLlib, we need to choose a few hyper-parameters: the rank, the regularization parameter (lambda), and the number of iterations.
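
As a rough guide to what these hyper-parameters control, the textbook regularized objective that ALS minimizes can be written as below. This is a sketch: $K$, $p_{ui}$, $x_u$ and $y_i$ are notation assumed here, and Spark's implementation additionally scales $\lambda$ by the number of ratings per user and per item.

$$
\min_{X, Y} \sum_{(u,i) \in K} \left( p_{ui} - x_u y_i^{\top} \right)^2 + \lambda \left( \sum_u \lVert x_u \rVert^2 + \sum_i \lVert y_i \rVert^2 \right)
$$

Here $K$ is the set of observed (user, item) ratings. The rank sets the length of each $x_u$ and $y_i$, $\lambda$ sets the strength of the penalty, and the iteration count sets how many times the solver alternates between solving for $X$ with $Y$ fixed and for $Y$ with $X$ fixed.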

In [None]:
// Create training and test datasets (80% / 20% random split)
val Array(training, test) = ratings.randomSplit(Array(0.8, 0.2))

// Iterate over models and train
def trainAndTest(rank: Int, lambda: Double, numIter: Int) = {
    // Build the recommendation model using ALS on the training data
    val model = ALS.train(training, rank, numIter, lambda)

    // Evaluate the model on test data
    val usersProducts = test.map { case Rating(user, product, rate) =>
        (user, product)
    }
    val predictions = model.predict(usersProducts).map { case Rating(user, product, rate) =>
        ((user, product), rate)
    }

    val ratesAndPreds = test.map { case Rating(user, product, rate) =>
        ((user, product), rate)
    }.join(predictions)

    val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
        val err = (r1 - r2)
        err * err
    }.mean()

    println("MSE (test) = " + MSE + " for the model trained with rank = " 
                                   + rank + ", lambda = " + lambda 
                                   + ", and numIter = " + numIter + ".")
}
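
The quantity computed in `trainAndTest` is the mean of the squared errors; the RMSE is its square root. The difference can be seen on a handful of made-up (actual, predicted) pairs in plain Scala. The sample values below are illustrative only and are not model output.

```scala
// (actual, predicted) rating pairs; values are made up for illustration
val ratesAndPreds = Seq((4.0, 3.5), (2.0, 2.5), (5.0, 4.0), (3.0, 3.0))

// Mean squared error: average of the squared differences
val mse = ratesAndPreds.map { case (r1, r2) =>
  val err = r1 - r2
  err * err
}.sum / ratesAndPreds.size

// Root mean squared error: back in the same units as the ratings
val rmse = math.sqrt(mse)

println(f"MSE = $mse%.4f, RMSE = $rmse%.4f")
```

Either metric ranks candidate models identically, since the square root is monotonic, so the choice mainly affects how the numbers compare with published results.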

## Train on full data-set (once hyper-parameters selected)

In [None]:
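/* ==== Test Results ====
    Note: as written, trainAndTest prints the mean squared error (no square root
    is taken), so the values labelled "RMSE" below are presumably MSE.
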
    [Baseline]
    RMSE (validation) = 0.6832349868066907 for the model
            trained with rank = 5, lambda = 0.1, and numIter = 5.

    [Pivot on Rank]
    RMSE (validation) = 0.6800594873467324 for the model 
            trained with rank = 10, lambda = 0.1, and numIter = 5.
    RMSE (validation) = 0.690144394236897 for the model 
            trained with rank = 20, lambda = 0.1, and numIter = 5.
    RMSE (validation) = 0.698129529945344 for the model 
            trained with rank = 50, lambda = 0.1, and numIter = 5.

    [Pivot on Lambda]
    RMSE (validation) = 0.6925651848679597 for the model 
            trained with rank = 5, lambda = 0.01, and numIter = 5.
    RMSE (validation) = 1.0692672408983346 for the model 
            trained with rank = 5, lambda = 0.5, and numIter = 5.
    RMSE (validation) = 1.7497220606946313 for the model 
            trained with rank = 5, lambda = 1.0, and numIter = 5.

    [Use Best Performing Parameters]
    RMSE (validation) = 0.6649058015762571 for the model 
            trained with rank = 10, lambda = 0.1, and numIter = 20.
 */

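// Train full model (rank = 10, iterations = 10, lambda = 0.1)
// Note: the best-performing run recorded above used 20 iterations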
val model = ALS.train(ratings, 10, 10, 0.1)

// Save model
model.save(sc, "file:///.../movies/models/ml/v1")


## Find similar movies to Con Air
With a trained latent-factor model, we can find movies similar to a target film (Con Air) using only the implicit preferences captured in the item factors: films count as similar when they appeal to the same kinds of people who like Con Air.

In [None]:
// Open raw movie data
val movies = sqlContext.read.format("com.databricks.spark.csv")
                       .option("header", "true").option("inferSchema", "true")
                       .load("file:///.../movies/ml-latest/movies.csv")
movies.registerTempTable("movies")

// Load item factors (written by model.save above)
case class ItemFactor(id: Int, features: Array[Double])
case class IndexedItemFactor(id: Int, features: Array[Double], index: Int)
val item_factors = sqlContext.read.parquet("file:///.../movies/models/ml/v1/data/product")
                             .as[ItemFactor].rdd.zipWithIndex.map(x => {
                                IndexedItemFactor(x._1.id, x._1.features, x._2.toInt)
                             })
item_factors.toDF().registerTempTable("item_factors")

// Get the feature vector for Con Air (movieId 1552)
val target_item_factors = item_factors.filter(i => i.id == 1552).take(1)(0).features

// Iterate over all item factors and compute the cosine similarity to the target
case class SimilarItem(movie_id: Int, similarity: Double)
val similar_items = item_factors.map(i => {
    // Calculate similarity
    val similarity = CosineSimilarity.cosineSimilarity(target_item_factors, i.features)

    (similarity, i)
}).takeOrdered(20)(Ordering[Double].reverse.on(x => x._1)).map(x => SimilarItem(x._2.id, x._1))

// Prep for Spark SQL
sc.parallelize(similar_items).toDF().registerTempTable("similar_items")

// Merge with movie lookup and show the top 20 similar movies
sqlContext.sql("""SELECT m.title, s.similarity
                  FROM movies m JOIN similar_items s ON m.movieId = s.movie_id
                  ORDER BY s.similarity DESC LIMIT 20""").collect().foreach(println)


```

[Con Air (1997),1.0]
[Bad Boys (1995),0.9885250995229632]
[Striking Distance (1993),0.9868059080297423]
[City Hunter (Sing si lip yan) (1993),0.9857155838604543]
[Rock, The (1996),0.9855154341766271]
[Another 48 Hrs. (1990),0.9842966650016308]
[Program, The (1993),0.9830120663020663]
[Young Guns II (1990),0.982577633206861]
[Face/Off (1997),0.9825008152957304]
[Navy Seals (1990),0.9823346682235141]
[Assassins (1995),0.9820721829300805]
[Lethal Weapon 3 (1992),0.9820630838696822]
[Sharpe's Challenge (2006),0.9820047546907581]
[My Avatar and Me (Min Avatar og mig) (2010),0.9817191629728049]
[Blue Streak (1999),0.9817106802763232]
[Days of Thunder (1990),0.9816860526998341]
[Outbreak (1995),0.9804433142553622]
[Whispers in the Dark (1992),0.9803792985641578]
[Rapid Fire (1992),0.9803632101298569]
[Cowboy Way, The (1994),0.9799338957065918]
```
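
When many similarity lookups are needed, one common variation is to normalize every item vector to unit length once, after which cosine similarity reduces to a plain dot product. The sketch below shows that idea in plain Scala; the item ids, the rank-3 factor values and the helper names (`normalize`, `dot`, `mostSimilar`) are all hypothetical and are not part of the notebook's pipeline.

```scala
// Hypothetical item factors keyed by a made-up movie id (rank 3 for readability)
val itemFactors: Map[Int, Array[Double]] = Map(
  1 -> Array(0.9, 0.1, 0.3),
  2 -> Array(0.8, 0.2, 0.4),
  3 -> Array(-0.5, 0.9, 0.1),
  4 -> Array(0.1, -0.7, 0.6)
)

// Scale a vector to unit length so that a dot product equals cosine similarity
def normalize(v: Array[Double]): Array[Double] = {
  val norm = math.sqrt(v.map(a => a * a).sum)
  v.map(_ / norm)
}

def dot(x: Array[Double], y: Array[Double]): Double =
  x.zip(y).map { case (a, b) => a * b }.sum

// Normalize once, up front
val unitFactors: Map[Int, Array[Double]] =
  itemFactors.map { case (id, features) => id -> normalize(features) }

// Return the k items most similar to targetId as (id, similarity), best first
def mostSimilar(targetId: Int, k: Int): Seq[(Int, Double)] = {
  val target = unitFactors(targetId)
  unitFactors.toSeq
    .map { case (id, features) => (id, dot(target, features)) }
    .sortBy { case (_, similarity) => -similarity }
    .take(k)
}

val top2 = mostSimilar(targetId = 1, k = 2)
println(top2.map { case (id, sim) => f"$id: $sim%.3f" }.mkString(", "))
```

The target itself comes back first with a similarity close to 1, just as Con Air heads the listing above.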

## Predicting new affinities (in real-time)
A simple way to get tailored recommendations for a target user is to measure their own affinities (movie ratings in our case), add them to the full dataset, and re-run the ALS derivation. This lets us use the user's specific preference vector, P(U), directly.

When either a) we cannot re-compute the entire latent factor matrix because recommendations are required in real time, or b) the latent factor matrix was built from a different dataset than the information available in the recommendation context, we need a cleverer way to approximate P(U) from some other measure of known affinity.

The derivation for this approach is covered in greater detail in my blog [post](http://dataexhaust.io/designing-product-recommendation-engines-for-the-new-age-of-digital-commerce/).
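
The core of that derivation can be sketched as a least-squares problem. It assumes the item factors $Y$ are held fixed and that $P_u$ is a $1 \times N$ row of the user's known affinities with zeros for every unobserved item (the zero-fill is an assumption of this formulation):

$$
X_u = \arg\min_{x} \lVert P_u - x Y^{\top} \rVert^2
\;\Longrightarrow\; X_u \, Y^{\top} Y = P_u Y
\;\Longrightarrow\; X_u = P_u \, Y (Y^{\top} Y)^{-1} = P_u G
$$

$$
Q_u = X_u Y^{\top}
$$

Because $P_u$ is sparse, $X_u$ reduces to a weighted sum of the rows of $G = Y (Y^{\top} Y)^{-1}$ for the films the user has expressed a preference for, and $G$ can be computed once and reused for every user.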

In [None]:
// Instantiate the item factor matrix Y (in-memory), one row per item
val factors = item_factors.collect()
val number_of_factors = factors.head.features.length
val number_of_items = factors.length
val Y = matrix(factors.map(i => i.features), number_of_items, number_of_factors)

// Set up preferred films as (movieId, row index in Y, preference weight)
val preferred_movies: Array[(Int, Int, Double)] = Array(
  // Provide captured implicit movie preference weights
)

// Create G = Y * (Y.t * Y) ^ -1
val G = Y * inv(Y.t * Y)

// Find predicted user factor vector X_u => P_u * Y * (Y.t * Y) ^ -1
// (only the rows of G for preferred films contribute, scaled by their weights)
val X_u = preferred_movies.map(x => G(x._2 to x._2, ::) * x._3)
                          .fold(DenseMatrix.zeros[Double](1, number_of_factors))((acc, v) => {
  acc + v
})

// Use X_u to get personalized movie scores Q_u = X_u * Y.t
val Q_u = X_u * Y.t

// ..... TEST .......

// Sort and pair with movie id (row i of Y belongs to movie factors(i).id)
case class ItemRating(id: Int, score: Double)
val rated_items = (0 until number_of_items).map(i => ItemRating(factors(i).id, Q_u(0, i)))
                                           .sortBy(ir => -1.0 * ir.score).toArray
sc.parallelize(rated_items).toDF().registerTempTable("rated_items")

// Find the top recommended movies among those similar to the target film
sqlContext.sql("""SELECT m.title, s.similarity, r.score
                  FROM rated_items r
                  JOIN movies m ON r.id = m.movieId
                  JOIN similar_items s ON s.movie_id = r.id
                  ORDER BY r.score DESC LIMIT 30""").collect().foreach(println)

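// Example taste profiles: substitute one for the placeholder array above and re-run.
// Each entry is (movieId, row index in Y, preference weight).

// Taste Profile: "Lord of War was my JAM!"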
val preferred_movies = Array(
  (3717, 403, 10.0),       // Gone in Sixty Seconds
  (1835, 30170, 50.0),     // City of Angels
  (36529, 27388, 100.0),   // Lord of War
  (733, 15060, 25.0)       // The Rock
)

// Taste Profile: "National Treasure was just the best..."
val preferred_movies = Array(
  (3717, 403, 10.0),       // Gone in Sixty Seconds
  (1835, 30170, 50.0),     // City of Angels
  (8972, 30893, 100.0),    // National Treasure
  (47810, 8756, 50.0)      // The Wicker Man
)
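
To see the fold-in arithmetic end to end without Spark or Breeze, here is a minimal plain-Scala sketch on a toy rank-2 problem. Everything in it is assumed for illustration: the 4 x 2 matrix `Y`, the two preference weights, and the closed-form 2 x 2 inverse (which only works because the rank is 2). It mirrors the steps G, X_u and Q_u above but is not the notebook's implementation.

```scala
// Toy item factor matrix Y (4 items x 2 factors); values are made up
val Y: Array[Array[Double]] = Array(
  Array(1.0, 0.0),
  Array(0.0, 1.0),
  Array(1.0, 1.0),
  Array(1.0, -1.0)
)

// Y^T Y is a symmetric 2 x 2 matrix [[a, b], [b, d]], so invert it in closed form
val a = Y.map(r => r(0) * r(0)).sum
val b = Y.map(r => r(0) * r(1)).sum
val d = Y.map(r => r(1) * r(1)).sum
val det = a * d - b * b
val invYtY = Array(Array(d / det, -b / det), Array(-b / det, a / det))

// G = Y * (Y^T Y)^-1, one row per item
val G = Y.map { r =>
  Array(r(0) * invYtY(0)(0) + r(1) * invYtY(1)(0),
        r(0) * invYtY(0)(1) + r(1) * invYtY(1)(1))
}

// (row index in Y, preference weight): strong preference for item 0, mild for item 2
val preferred = Seq((0, 3.0), (2, 1.0))

// X_u = sum over preferred items of weight * G(row, ::)
val X_u = preferred.foldLeft(Array(0.0, 0.0)) { case (acc, (row, w)) =>
  Array(acc(0) + w * G(row)(0), acc(1) + w * G(row)(1))
}

// Q_u = X_u * Y^T gives one personalized score per item
val Q_u = Y.map(r => X_u(0) * r(0) + X_u(1) * r(1))

// Item row indices ordered from highest to lowest score
val ranked = Q_u.zipWithIndex.sortBy { case (score, _) => -score }.map(_._2)
println(ranked.mkString(", "))
```

Only `preferred` changes per user; `G` depends on the item factors alone, which is what makes the approach cheap enough to evaluate at request time.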