In [0]:
import scala.collection.Map
import scala.collection.mutable.ArrayBuffer
import scala.util.Random
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions._

import java.nio.file.Paths
import spark.implicits._


In [1]:
val OS = System.getProperty("os.name").toLowerCase

// Set `hadoop.home.dir`: the absolute path of the local `winutils` directory on Windows,
// the filesystem root on every other OS.
val hadoopHome = if (OS.contains("win")) {
  Paths.get("winutils").toAbsolutePath.toString
} else {
  "/"
}
System.setProperty("hadoop.home.dir", hadoopHome)

// Get (or create) a session whose master is all local cores.
val spark = SparkSession.builder().config("spark.master", "local[*]").getOrCreate()
// Optional, but may help avoid errors due to long lineage
//spark.sparkContext.setCheckpointDir("./tmp")

// Root location of the Audioscrobbler files (local alternative kept for reference).
//val base = "./audioscrobbler_data/"
val base = "gs://dataproc-staging-europe-west6-872288405326-yxz32br1/audioscrobbler_data/"

// Each input is read as a Dataset[String] with one element per text line.
val rawUserArtistData = spark.read.textFile(s"${base}user_artist_data.txt")
val rawArtistData = spark.read.textFile(s"${base}artist_data.txt")
val rawArtistAlias = spark.read.textFile(s"${base}artist_alias.txt")



In [2]:
// Print the first five lines of the user/artist file.
rawUserArtistData.take(5).foreach(println)

// Turn the raw lines into a DataFrame with columns "user" and "artist": with named columns
// it is easy to compute simple statistics such as the minimum and maximum of each column.
// Only the first two space-separated fields are used; any further fields are ignored.
val userArtistDF = rawUserArtistData.map { line =>
  line.split(' ') match {
    case Array(user, artist, _*) => (user.toInt, artist.toInt)
  }
}.toDF("user", "artist")

// ALS works best with 32-bit Int IDs, so check that the largest ID in the data set is below
// the maximum Int value (2147483647). The author's run reported that it is, so Int is safe here.
userArtistDF.
  agg(min("user"), max("user"), min("artist"), max("artist")).
  show()

In [3]:
/*
To interpret later results it is useful to map each artist ID to a band name. The file
artist_data provides that ID -> name mapping.
flatMap is used instead of map because some lines in the file are corrupted: returning None
for such a line drops it from the result.
*/
val artistByID = rawArtistData.flatMap { line =>
  // Split at the first tab: the text before it is the ID, the remainder (tab included) the name.
  val (idPart, namePart) = line.span(_ != '\t')
  if (namePart.nonEmpty) {
    try {
      Some((idPart.toInt, namePart.trim))
    } catch {
      // The ID is not a valid Int: skip the line.
      case _: NumberFormatException => None
    }
  } else {
    // No tab on the line, hence no name: skip the line.
    None
  }
}.toDF("id", "name")

// Look at the first row to confirm that it maps an artist ID to the artist's name.
artistByID.head



In [4]:
/*
The artist_alias.txt file maps artist IDs that may be misspelled or nonstandard to the
ID of the artist's canonical name. Each line is "<alias id><tab><canonical id>"; lines whose
first field is empty are skipped. The result is collected to the driver as a Map.
*/
val artistAlias = rawArtistAlias.flatMap { line =>
  val Array(rawID, canonicalID) = line.split('\t')
  if (rawID.nonEmpty) Some((rawID.toInt, canonicalID.toInt)) else None
}.collect().toMap

artistAlias.head

// Check that two different artist IDs mapped to each other really denote the same artist:
// take one (alias ID, canonical ID) pair from the map and display the names recorded for both
// IDs. For example, the author observed 1208690 mapped to 1003926.
val (badID, goodID) = artistAlias.head
artistByID.filter($"id".isin(badID, goodID)).show()

In [5]:
/*
Parses the raw "user artist count" lines into a DataFrame with columns "user", "artist" and
"count". Every artist ID that has an entry in the broadcast alias map is replaced by the ID it
maps to; all other IDs are kept unchanged. The play count is taken from the line as is.
A line that does not consist of exactly three space-separated integers makes the job fail.
*/
def buildCounts(rawUserArtistData: Dataset[String], bArtistAlias: Broadcast[Map[Int,Int]]): DataFrame = {
  rawUserArtistData.map { line =>
    line.split(' ').map(_.toInt) match {
      case Array(userID, artistID, count) =>
        // Fall back to the original ID when no alias is registered for it.
        val canonicalArtistID = bArtistAlias.value.getOrElse(artistID, artistID)
        (userID, canonicalArtistID, count)
    }
  }.toDF("user", "artist", "count")
}

/*
Builds the alias map (nonstandard artist ID -> canonical artist ID) from the lines of
artist_alias.txt and collects it to the driver.

Each usable line has the form "<alias id><tab><canonical id>". Malformed lines are skipped
instead of failing the whole Spark job:
  - lines whose first field is empty (already skipped before),
  - lines that do not split into exactly two fields; note that split('\t') drops trailing
    empty fields, so a line with a missing canonical ID yields a single-element array,
  - lines whose fields are not valid Ints.
*/
def buildArtistAlias(rawArtistAlias: Dataset[String]): Map[Int,Int] = {
  rawArtistAlias.flatMap { line =>
    line.split('\t') match {
      case Array(artist, alias) if artist.nonEmpty =>
        try {
          Some((artist.toInt, alias.toInt))
        } catch {
          // Non-numeric ID: treat the line as corrupted, as done for artist_data.
          case _: NumberFormatException => None
        }
      case _ =>
        // Empty first field or wrong number of fields.
        None
    }
  }.collect().toMap
}

/*
Broadcasting the alias map is an optimisation: it ensures that Spark sends, and holds in
memory, just one copy of the map for each executor in the cluster.
*/
val artistAliasMap = buildArtistAlias(rawArtistAlias)
val bArtistAlias = spark.sparkContext.broadcast(artistAliasMap)

// Parsed (user, artist, count) rows with canonical artist IDs.
val trainData = buildCounts(rawUserArtistData, bArtistAlias)

// Mark the training data for caching so later actions can reuse it.
trainData.cache()

In [6]:
/*
Define the ALS estimator with basic parameters (implicit feedback, rank 10, 5 iterations,
random seed), then train a model on trainData.
*/
val als = new ALS().
    setSeed(Random.nextLong()).
    setImplicitPrefs(true).
    setRank(10).
    setRegParam(0.01).
    setAlpha(1.0).
    setMaxIter(5).
    setUserCol("user").
    setItemCol("artist").
    setRatingCol("count").
    setPredictionCol("prediction")

val model = als.fit(trainData)

In [7]:
/*
Display one row of the model's user factors, without truncating the output.
*/
model.userFactors.show(numRows = 1, truncate = false)

In [8]:
/*
Sanity-check the model: are its recommendations sensible? To judge that, first look at what
the test user (2093760) actually listens to.
*/
val userID = 2093760

// IDs of every artist that appears in the training data for this user.
val existingArtistIDs = trainData.
    filter($"user" === userID).
    select("artist").
    as[Int].
    collect()

// Having extracted the artist IDs the user listens to, show the matching names.
artistByID.filter($"id".isin(existingArtistIDs: _*)).show()

In [9]:
/*
Returns the `howMany` artists with the highest predicted score for `userID`, as a DataFrame
with columns "artist" and "prediction" sorted by descending prediction.

It works by pairing every artist ID known to the model (taken from its item factors) with the
target user, scoring all of those pairs with model.transform, and keeping the top rows.

NOTE(review): newer Spark releases appear to offer built-in top-N methods on ALSModel
(recommendForAllUsers / recommendForUserSubset) that could replace this helper; confirm
against the Spark version in use.
*/
def makeRecommendations(model: ALSModel, userID: Int, howMany: Int): DataFrame = {
  // One (artist, user) row per artist the model has factors for.
  val candidates = model.itemFactors.
    select($"id".as("artist")).
    withColumn("user", lit(userID))

  // Score every candidate pair.
  val scored = model.transform(candidates).select("artist", "prediction")

  // Best-scored artists first, truncated to the requested number.
  scored.orderBy($"prediction".desc).limit(howMany)
}

In [10]:
/*
Show the top recommendations for the test user. Compared with the user's taste shown earlier,
the author found that these predictions do not really match.
*/
val topRecommendations = makeRecommendations(model, userID, howMany = 5)
//topRecommendations.show()

// Resolve the recommended artist IDs to names.
val recommendedArtistIDs = topRecommendations.
  select("artist").
  as[Int].
  collect()
artistByID.filter($"id".isin(recommendedArtistIDs: _*)).show()

In [11]:
/*
Computes a "mean AUC" for a prediction function on held-out data.

For each user in positiveData, the held-out (user, artist) pairs are the positives and a
random sample of other artists are the negatives. The per-user score is the fraction of
(positive, negative) pairs in which the positive received the strictly higher prediction;
the result is the average of that fraction over users.

positiveData    - held-out data; only its "user" and "artist" columns are used (read as Int).
bAllArtistIDs   - broadcast array of artist IDs from which negatives are drawn.
predictFunction - takes a DataFrame with "user" and "artist" columns and must return one
                  that contains "user" and a numeric "prediction" column.

Notes:
  - Negatives are drawn with an unseeded Random, so the result can differ between runs.
  - Users that end up with no (positive, negative) pair do not contribute to the mean.
*/
def areaUnderCurve(positiveData: DataFrame, bAllArtistIDs: Broadcast[Array[Int]], predictFunction: (DataFrame => DataFrame)): Double = {

    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive".
    // Make predictions for each of them, including a numeric score
    val positivePredictions = predictFunction(positiveData.select("user", "artist")).
      withColumnRenamed("prediction", "positivePrediction")

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a list of "negative" artists for each user. These are randomly chosen
    // from among all of the other artists, excluding those that are "positive" for the user.
    val negativeData = positiveData.select("user", "artist").as[(Int,Int)].
      groupByKey { case (user, _) => user }.
      flatMapGroups{ (userID:Int, userIDAndPosArtistIDs:Iterator[(Int,Int)]) =>
        val random = new Random()
        // Distinct artists that are positive for this user
        val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
        val negative = new ArrayBuffer[Int]()
        val allArtistIDs = bAllArtistIDs.value
        var i = 0
        // Make at most allArtistIDs.length random draws to avoid an infinite loop.
        // Also stop when the number of negatives equals the positive set size
        while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
          val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
          // Keep the draw only if it is not a positive for this user.
          // Note: the same negative artist can be drawn and kept more than once.
          if (!posItemIDSet.contains(artistID)) {
            negative += artistID
          }
          i += 1
        }
        // Return the negatives with user ID added back
        negative.map(artistID => (userID, artistID))
      }.toDF("user", "artist")

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeData).
      withColumnRenamed("prediction", "negativePrediction")

    // Join positive predictions to negative predictions by user, only.
    // This will result in a row for every possible pairing of positive and negative
    // predictions within each user. Cached because it is aggregated twice below.
    val joinedPredictions = positivePredictions.join(negativePredictions, "user").
      select("user", "positivePrediction", "negativePrediction").cache()

    // Count the number of pairs per user
    val allCounts = joinedPredictions.
      groupBy("user").agg(count(lit("1")).as("total")).
      select("user", "total")
    // Count the number of correctly ordered pairs per user
    val correctCounts = joinedPredictions.
      filter($"positivePrediction" > $"negativePrediction").
      groupBy("user").agg(count("user").as("correct")).
      select("user", "correct")

    // Combine these, compute their ratio, and average over all users.
    // The left outer join plus coalesce gives 0 correct pairs to users absent from correctCounts.
    val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
      select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
      agg(mean("auc")).
      as[Double].first()

    // Release the cached pairings now that both aggregations are done
    joinedPredictions.unpersist()

    meanAUC
  }

In [12]:
/*
Baseline "predictor": the prediction for a (user, artist) pair is the artist's total play
count in `train`, regardless of the user. Rows of `allData` whose artist does not occur in
`train` are kept (left outer join) with a null prediction.
Returns a DataFrame with columns "user", "artist" and "prediction".
*/
def predictMostListened(train: DataFrame)(allData: DataFrame): DataFrame = {
  // Total plays per artist, exposed under the column name "prediction".
  val playsPerArtist = train.
    groupBy("artist").
    agg(sum("count").as("prediction")).
    select("artist", "prediction")

  // Attach the artist-level total to every requested (user, artist) row.
  val scored = allData.join(playsPerArtist, Seq("artist"), "left_outer")
  scored.select("user", "artist", "prediction")
}

In [13]:

// Rebuild the full (user, artist, count) data set with canonical artist IDs.
val bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))
val allData = buildCounts(rawUserArtistData, bArtistAlias)

// Random 90% / 10% split into training and cross-validation data; both are cached because
// every evaluation below reuses them.
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()

// All distinct artist IDs, broadcast so that areaUnderCurve can sample negatives from them.
val allArtistIDs = allData.select("artist").as[Int].distinct().collect()
val bAllArtistIDs = spark.sparkContext.broadcast(allArtistIDs)

// Baseline: score every artist by its total play count in the training data.
val mostListenedAUC = areaUnderCurve(cvData, bAllArtistIDs, predictMostListened(trainData))
println(mostListenedAUC)

// Grid search: train one ALS model per (rank, regParam, alpha) combination and record its
// mean AUC on the cross-validation data.
val evaluations =
  for {
    rank     <- Seq(5,  30)
    regParam <- Seq(1.0, 0.0001)
    alpha    <- Seq(1.0, 40.0)
  } yield {
    val candidate = new ALS().
      setSeed(Random.nextLong()).
      setImplicitPrefs(true).
      setRank(rank).
      setRegParam(regParam).
      setAlpha(alpha).
      setMaxIter(20).
      setUserCol("user").
      setItemCol("artist").
      setRatingCol("count").
      setPredictionCol("prediction").
      fit(trainData)

    val auc = areaUnderCurve(cvData, bAllArtistIDs, candidate.transform)

    // Release this model's factor DataFrames before training the next one.
    candidate.userFactors.unpersist()
    candidate.itemFactors.unpersist()

    (auc, (rank, regParam, alpha))
  }

// Print the results, best AUC first.
evaluations.sorted.reverse.foreach(println)

trainData.unpersist()
cvData.unpersist()



In [14]:
/*
Run the recommendation for the test user once more, this time with an ALS model trained on
all the data with the chosen hyperparameters (rank 10, regParam 1.0, alpha 40.0, 20 iterations).
The author found the recommendations to be more accurate this time.
*/

val bArtistAlias = spark.sparkContext.broadcast(buildArtistAlias(rawArtistAlias))
val allData = buildCounts(rawUserArtistData, bArtistAlias).cache()

val model = new ALS().
  setSeed(Random.nextLong()).
  setImplicitPrefs(true).
  setRank(10).
  setRegParam(1.0).
  setAlpha(40.0).
  setMaxIter(20).
  setUserCol("user").
  setItemCol("artist").
  setRatingCol("count").
  setPredictionCol("prediction").
  fit(allData)
// The cached input is only needed for training.
allData.unpersist()

val userID = 2093760
val topRecommendations = makeRecommendations(model, userID, howMany = 5)

// Resolve the recommended artist IDs to names by joining them with the ID -> name table.
val recommendedArtistIDs = topRecommendations.select("artist").as[Int].collect()
val recommendedIDsDF = spark.createDataset(recommendedArtistIDs).toDF("id")
artistByID.
  join(recommendedIDsDF, "id").
  select("name").
  show()

model.userFactors.unpersist()
model.itemFactors.unpersist()

