In [1]:
val rawUserArtistData =
  spark.read.textFile("user_artist_data_small.txt")

rawUserArtistData.take(5).foreach(println)

Intitializing Scala interpreter ...

Spark Web UI available at http://tacios-mbp-3.br.ibm.com:4040
SparkContext available as 'sc' (version = 2.4.3, master = local[*], app id = local-1570050767246)
SparkSession available as 'spark'


1059637 1000010 238
1059637 1000049 1
1059637 1000056 1
1059637 1000062 11
1059637 1000094 1


rawUserArtistData: org.apache.spark.sql.Dataset[String] = [value: string]


In [2]:
val userArtistDF = rawUserArtistData.map { line =>
  val Array(user, artist, _*) = line.split(' ')
  (user.toInt, artist.toInt)
}.toDF("user", "artist")
userArtistDF.agg(
  min("user"), max("user"), min("artist"), max("artist")).show()

+---------+---------+-----------+-----------+
|min(user)|max(user)|min(artist)|max(artist)|
+---------+---------+-----------+-----------+
|  1000647|  2288164|          1|   10788218|
+---------+---------+-----------+-----------+



userArtistDF: org.apache.spark.sql.DataFrame = [user: int, artist: int]


In [3]:
val rawArtistData = spark.read.textFile("artist_data_small.txt")
val artistByID = rawArtistData.flatMap { line =>
  val (id, name) = line.span(_ != '\t')
  if (name.isEmpty) {
    None
  } else {
    try {
      Some((id.toInt, name.trim))
    } catch {
      case _: NumberFormatException => None
    }
  }
}.toDF("id", "name")

rawArtistData: org.apache.spark.sql.Dataset[String] = [value: string]
artistByID: org.apache.spark.sql.DataFrame = [id: int, name: string]


In [4]:
val rawArtistAlias = spark.read.textFile("artist_alias_small.txt")
val artistAlias = rawArtistAlias.flatMap { line =>
  val Array(artist, alias) = line.split('\t')
  if (artist.isEmpty) {
    None
  } else {
    Some((artist.toInt, alias.toInt))
  }
}.collect().toMap

rawArtistAlias: org.apache.spark.sql.Dataset[String] = [value: string]
artistAlias: scala.collection.immutable.Map[Int,Int] = Map(1039896 -> 1277013, 1199139 -> 166, 1047491 -> 1003342, 9929763 -> 1003778, 2025676 -> 1001141, 9929753 -> 1007347, 2103190 -> 1002909, 1005489 -> 2003588, 2009180 -> 6751847, 1261152 -> 1007206, 6801236 -> 1013362, 6843530 -> 1260159, 1038051 -> 6684730, 10107676 -> 118, 1008455 -> 1020, 1351048 -> 71, 6606757 -> 1003888, 2061602 -> 6748393, 1289246 -> 1023527, 2036732 -> 71, 6614668 -> 7006467, 1014175 -> 1014175, 1197558 -> 1001943, 1012315 -> 1238836, 9928967 -> 15, 1055562 -> 1276662, 1037848 -> 1007201, 6923988 -> 2140107, 6634844 -> 1018408, 1244994 -> 1028445, 1042508 -> 1008824, 2126687 -> 1023928, 6806131 -> 1002061, 1017671 -> 1015311, 1275359 -> 1...

In [5]:
artistAlias.head

res2: (Int, Int) = (1039896,1277013)


In [6]:
artistByID.filter($"id" isin (1208690, 1003926)).show()

+-------+---------------+
|     id|           name|
+-------+---------------+
|1003926|Collective Soul|
+-------+---------------+



In [7]:
import org.apache.spark.sql._
import org.apache.spark.broadcast._

def buildCounts(
    rawUserArtistData: Dataset[String],
    bArtistAlias: Broadcast[Map[Int,Int]]): DataFrame = {
  rawUserArtistData.map { line =>
    val Array(userID, artistID, count) = line.split(' ').map(_.toInt)
    val finalArtistID =
      bArtistAlias.value.getOrElse(artistID, artistID)
    (userID, finalArtistID, count)
  }.toDF("user", "artist", "count")
}

val bArtistAlias = spark.sparkContext.broadcast(artistAlias)

val trainData = buildCounts(rawUserArtistData, bArtistAlias)
trainData.cache()

import org.apache.spark.sql._
import org.apache.spark.broadcast._
buildCounts: (rawUserArtistData: org.apache.spark.sql.Dataset[String], bArtistAlias: org.apache.spark.broadcast.Broadcast[Map[Int,Int]])org.apache.spark.sql.DataFrame
bArtistAlias: org.apache.spark.broadcast.Broadcast[scala.collection.immutable.Map[Int,Int]] = Broadcast(9)
trainData: org.apache.spark.sql.DataFrame = [user: int, artist: int ... 1 more field]
res4: trainData.type = [user: int, artist: int ... 1 more field]


In [8]:
import org.apache.spark.ml.recommendation._
import scala.util.Random

val model = new ALS().
    setSeed(Random.nextLong()).
    setImplicitPrefs(true).
    setRank(10).
    setRegParam(0.01).
    setAlpha(1.0).
    setMaxIter(5).
    setUserCol("user").
    setItemCol("artist").
    setRatingCol("count").
    setPredictionCol("prediction").
    fit(trainData)

import org.apache.spark.ml.recommendation._
import scala.util.Random
model: org.apache.spark.ml.recommendation.ALSModel = als_8483252df3ef


In [36]:
//2288164
//1000647
val userID = 2288164

val existingArtistIDs = trainData.
  filter($"user" === userID).
  select("artist").as[Int].collect()

artistByID.filter($"id" isin (existingArtistIDs:_*)).show()

+--------+--------------------+
|      id|                name|
+--------+--------------------+
| 1244627|    Hymie's Basement|
| 1099116|    Central Services|
|10592278|Frankie Lymond & ...|
|     926|   Souls of Mischief|
| 1278727|Slug, Aesop Rock ...|
| 1001339|       House of Pain|
|10606449|            ¸çüûêÎ¤¨|
|    2020|         Lauryn Hill|
| 1004028|    Notorious B.I.G.|
| 1014826|       Junior Senior|
| 1000584|            Al Green|
|10494100|Crypt The Warchil...|
|10494116|Jedi Mind Tricks;...|
| 1308603|    2Pac/Trick Daddy|
| 1009156|                 Mae|
| 1008335|Earth, Wind and Fire|
| 1009256|     Minnie Riperton|
| 1002896|               Mirah|
|    1087|          Biz Markie|
|10450217|          Nikkifurie|
+--------+--------------------+
only showing top 20 rows



userID: Int = 2288164
existingArtistIDs: Array[Int] = Array(1, 1000024, 1000033, 1000054, 1000113, 1000177, 1000241, 1000263, 1000433, 1000481, 1000487, 1000584, 1000591, 1000693, 1000737, 1000764, 1000781, 1001107, 1001277, 1001339, 1001534, 1001779, 1001819, 1001828, 1001855, 1002254, 1002325, 1002470, 1002513, 1002649, 1002726, 1002896, 1003328, 1003400, 1003430, 1003585, 1003665, 1003673, 1003681, 1003686, 1003752, 1003892, 1003928, 1004028, 1004317, 1004456, 1004496, 1004535, 1004841, 1006029, 1006034, 1006076, 1006672, 1007064, 1007260, 1007307, 1007416, 1007476, 1012243, 1007528, 1007543, 10078504, 1007883, 1008164, 1008286, 1008335, 1008337, 1008463, 1008515, 1008583, 1008663, 1009156, 1009226, 1009256, 1009393, 1009488, 1009544, 1009576, 1009893, 1010583, 1010650, 1010814, 1010...

In [10]:
def makeRecommendations(
    model: ALSModel,
    userID: Int,
    howMany: Int): DataFrame = {

  val toRecommend = model.itemFactors.
    select($"id".as("artist")).
    withColumn("user", lit(userID))

  model.transform(toRecommend).
    select("artist", "prediction").
    orderBy($"prediction".desc).
    limit(howMany)
}

makeRecommendations: (model: org.apache.spark.ml.recommendation.ALSModel, userID: Int, howMany: Int)org.apache.spark.sql.DataFrame


In [37]:
val topRecommendations = makeRecommendations(model, userID, 5)

topRecommendations: org.apache.spark.sql.DataFrame = [artist: int, prediction: float]


In [38]:
spark.conf.set("spark.sql.crossJoin.enabled", "true")

In [39]:
topRecommendations.show()

+-------+----------+
| artist|prediction|
+-------+----------+
|   2231| 1.7739185|
|   3292| 1.7348462|
|1037970| 1.6296855|
|   2085| 1.6195577|
|   3170| 1.5850995|
+-------+----------+



In [40]:
val recommendedArtistIDs =
  topRecommendations.select("artist").as[Int].collect()

artistByID.filter($"id" isin (recommendedArtistIDs:_*)).show()


+-------+-------------+
|     id|         name|
+-------+-------------+
|   2231|         Live|
|1037970|   Kanye West|
|   3292|       Eagles|
|   3170|Tracy Chapman|
|   2085|      Santana|
+-------+-------------+



recommendedArtistIDs: Array[Int] = Array(2231, 3292, 1037970, 2085, 3170)


In [67]:
def areaUnderCurve(
    positiveData: DataFrame,
    bAllArtistIDs: Broadcast[Array[Int]],
    predictFunction: (DataFrame => DataFrame)): Double = {
    ...
}

<console>: 6: error: illegal start of statement

In [51]:
val allData = buildCounts(rawUserArtistData, bArtistAlias)
val Array(trainData, cvData) = allData.randomSplit(Array(0.9, 0.1))
trainData.cache()
cvData.cache()

allData: org.apache.spark.sql.DataFrame = [user: int, artist: int ... 1 more field]
trainData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user: int, artist: int ... 1 more field]
cvData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [user: int, artist: int ... 1 more field]
res33: cvData.type = [user: int, artist: int ... 1 more field]


In [52]:
val allArtistIDs = allData.select("artist").as[Int].distinct().collect()
val bAllArtistIDs = spark.sparkContext.broadcast(allArtistIDs)

allArtistIDs: Array[Int] = Array(1048726, 463, 1281854, 1008081, 1014690, 1087384, 1091250, 1233083, 1346305, 6623644, 833, 1316951, 1245054, 6642786, 1001129, 10130219, 1245208, 1004552, 1010281, 1036659, 1062730, 1291109, 6663903, 6668762, 6814190, 1279698, 6723762, 1004021, 1007972, 1009031, 1012617, 1013212, 1014191, 1023660, 1028228, 1040057, 1041189, 1053084, 10729995, 1189991, 1203598, 1230694, 1239654, 1259455, 1261703, 1262404, 1266726, 1276692, 1277913, 2281411, 3175, 4935, 6642933, 6911438, 6696725, 1038390, 2099635, 6649067, 1029443, 2022896, 1012261, 1034510, 1019303, 1015250, 1016546, 1023841, 1084951, 1260023, 1829, 496, 1007334, 10402275, 1059283, 1123104, 1126726, 1160165, 1171406, 2025147, 2061170, 2079446, 2139904, 2143912, 2146392, 6604291, 6615149, 6630663, 6636337,...

In [54]:
val model = new ALS().
    setSeed(Random.nextLong()).
    setImplicitPrefs(true).
    setRank(10).setRegParam(0.01).setAlpha(1.0).setMaxIter(5).
    setUserCol("user").setItemCol("artist").
    setRatingCol("count").setPredictionCol("prediction").
    fit(trainData)

model: org.apache.spark.ml.recommendation.ALSModel = als_baa76971fd8d


In [55]:
val topRecommendations = makeRecommendations(model, userID, 5)

topRecommendations: org.apache.spark.sql.DataFrame = [artist: int, prediction: float]


In [56]:
topRecommendations.show()

+-------+----------+
| artist|prediction|
+-------+----------+
|1002840| 2.0156026|
|    478| 1.6738842|
|    250| 1.6192838|
|1000848| 1.5848703|
|   1403|  1.560207|
+-------+----------+



In [57]:
val recommendedArtistIDs =
  topRecommendations.select("artist").as[Int].collect()

artistByID.filter($"id" isin (recommendedArtistIDs:_*)).show()

+-------+------------------+
|     id|              name|
+-------+------------------+
|1000848|        Roxy Music|
|   1403|       Marvin Gaye|
|    478|  Boards of Canada|
|1002840|The Blood Brothers|
|    250|           Outkast|
+-------+------------------+



recommendedArtistIDs: Array[Int] = Array(1002840, 478, 250, 1000848, 1403)
