# 1. LAST-FM: Time Weighted ALS Recommender

This notebook implements a simple ALS recommender based on the LastFM user listening dataset. It uses spark and is written in Scala. Minimal data cleaning/pre-processing is performed to provide a baseline model. 

## 1.1 Imports and set up 

Key libraries are imported, the spark session is initialised and the listening data is loaded in. 

In [1]:
import $ivy.`org.apache.spark::spark-sql:3.0.0` // Or use any other 2.x version here

[32mimport [39m[36m$ivy.$                                   // Or use any other 2.x version here[39m

In [2]:
import $ivy.`org.apache.spark::spark-mllib:3.0.0`

[32mimport [39m[36m$ivy.$                                    [39m

In [3]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.ml.feature.{StringIndexer,IndexToString}
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.recommendation.ALS
import org.apache.spark.ml.evaluation.RegressionEvaluator
import org.apache.spark.mllib.evaluation.{RankingMetrics, RegressionMetrics}
import org.apache.spark.ml.feature.RobustScaler
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.functions.vector_to_array
import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
import org.apache.spark.sql._

[32mimport [39m[36morg.apache.spark.sql.SparkSession
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.{StringIndexer,IndexToString}
[39m
[32mimport [39m[36morg.apache.spark.ml.Pipeline
[39m
[32mimport [39m[36morg.apache.spark.ml.recommendation.ALS
[39m
[32mimport [39m[36morg.apache.spark.ml.evaluation.RegressionEvaluator
[39m
[32mimport [39m[36morg.apache.spark.mllib.evaluation.{RankingMetrics, RegressionMetrics}
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.RobustScaler
[39m
[32mimport [39m[36morg.apache.spark.ml.feature.VectorAssembler
[39m
[32mimport [39m[36morg.apache.spark.ml.functions.vector_to_array
[39m
[32mimport [39m[36morg.apache.spark.sql.types._
[39m
[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36morg.apache.spark.sql._[39m

In [5]:
import org.apache.log4j.{Level, Logger}
Logger.getLogger("org").setLevel(Level.OFF)



val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    .appName("lastfm")
    .getOrCreate()
}

Loading spark-stubs
Getting spark JARs
Creating SparkSession


Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties


[32mimport [39m[36morg.apache.log4j.{Level, Logger}
[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@4391f09a

In [6]:
// path to last-fm dataset. Can be downloaded here: http://millionsongdataset.com/lastfm/
var data_path:String = "../resources/lastfm-dataset-1K/userid-timestamp-artid-artname-traid-traname.tsv"

In [7]:
// schema defined below to set column names and types. 
val schema = new StructType()
            .add("user_id", StringType, true)
            .add("timestamp", DateType, true)
            .add("artist_id", StringType, true)
            .add("artist_name", StringType, true)
            .add("track_id", StringType, true)
            .add("track_name", StringType, true)

[36mschema[39m: [32mStructType[39m = [33mStructType[39m(
  [33mStructField[39m([32m"user_id"[39m, StringType, true, {}),
  [33mStructField[39m([32m"timestamp"[39m, DateType, true, {}),
  [33mStructField[39m([32m"artist_id"[39m, StringType, true, {}),
  [33mStructField[39m([32m"artist_name"[39m, StringType, true, {}),
  [33mStructField[39m([32m"track_id"[39m, StringType, true, {}),
  [33mStructField[39m([32m"track_name"[39m, StringType, true, {})
)

In [8]:
// read in data
val listener_data = spark
                    .read
                    .options(Map(
                      "header" -> "false",
                      "dateFormat" -> "yyyy-MM-ddTHH:mm:ssZ"
                    ))
                    .schema(schema)
                    .format("org.apache.spark.sql.execution.datasources.csv.CSVFileFormat")
                    .option("sep", "\t")
                    .load(data_path)
listener_data.showHTML(20, 200)

user_id,timestamp,artist_id,artist_name,track_id,track_name
user_000001,2009-05-04,f1b1cf71-bd35-4e99-8624-24a6e15f133a,Deep Dish,,Fuck Me Im Famous (Pacha Ibiza)-09-28-2007
user_000001,2009-05-04,a7f7df4a-77d8-4f12-8acd-5c60c93f4de8,坂本龍一,,Composition 0919 (Live_2009_4_15)
user_000001,2009-05-04,a7f7df4a-77d8-4f12-8acd-5c60c93f4de8,坂本龍一,,Mc2 (Live_2009_4_15)
user_000001,2009-05-04,a7f7df4a-77d8-4f12-8acd-5c60c93f4de8,坂本龍一,,Hibari (Live_2009_4_15)
user_000001,2009-05-04,a7f7df4a-77d8-4f12-8acd-5c60c93f4de8,坂本龍一,,Mc1 (Live_2009_4_15)
user_000001,2009-05-04,a7f7df4a-77d8-4f12-8acd-5c60c93f4de8,坂本龍一,,To Stanford (Live_2009_4_15)
user_000001,2009-05-04,a7f7df4a-77d8-4f12-8acd-5c60c93f4de8,坂本龍一,,Improvisation (Live_2009_4_15)
user_000001,2009-05-04,a7f7df4a-77d8-4f12-8acd-5c60c93f4de8,坂本龍一,,Glacier (Live_2009_4_15)
user_000001,2009-05-04,a7f7df4a-77d8-4f12-8acd-5c60c93f4de8,坂本龍一,,Parolibre (Live_2009_4_15)
user_000001,2009-05-04,a7f7df4a-77d8-4f12-8acd-5c60c93f4de8,坂本龍一,,Bibo No Aozora (Live_2009_4_15)


[36mlistener_data[39m: [32mDataFrame[39m = [user_id: string, timestamp: date ... 4 more fields]

In [9]:
import spark.implicits._

val df = listener_data.na.drop()
val max_date = df.agg(max("timestamp")).map(_.getDate(0)).collectAsList().toArray()(0)

[32mimport [39m[36mspark.implicits._

[39m
[36mdf[39m: [32mDataFrame[39m = [user_id: string, timestamp: date ... 4 more fields]
[36mmax_date[39m: [32mObject[39m = 2013-09-29

In [13]:
val date_time_df = df.select("user_id", "track_id", "timestamp")
        .groupBy("user_id", "track_id")
        .agg(count("*").alias("count"), max("timestamp").alias("last_heard")).withColumn("max_date", lit(max_date))
val df_agg = date_time_df
             .withColumn("datediff", datediff(col("max_date"), col("last_heard")))
             .select("user_id", "track_id", "count", "datediff")
             .orderBy("datediff")
val df_agg_filtered = df_agg.limit(10000)
df_agg_filtered.showHTML(20, 200)

user_id,track_id,count,datediff
user_000762,b5b40605-5a81-46b4-a51e-2b1ec7964c1a,1,0
user_000405,223f9324-3546-446b-96a2-5f663192dd42,1,1143
user_000651,d9f41e96-e7c5-401b-ae84-a2032a631409,19,1563
user_000960,fe1e042b-e443-4f36-97eb-8955ad6fcca0,32,1563
user_000960,f30a68e0-3598-4b46-b52d-d3ea3b447615,5,1563
user_000612,b0990649-efb6-469f-96e9-2376d43a8cca,198,1563
user_000651,3ffed2aa-435d-41f9-a30d-58d28ad2949c,22,1563
user_000612,459b467a-81f1-4a0d-afdd-1dae11797405,26,1563
user_000310,29d88e0a-6114-47fd-9764-e25072a721fe,1,1563
user_000651,d02aa861-ac38-418c-8755-85ed47fbd2a5,26,1563


[36mdate_time_df[39m: [32mDataFrame[39m = [user_id: string, track_id: string ... 3 more fields]
[36mdf_agg[39m: [32mDataset[39m[[32mRow[39m] = [user_id: string, track_id: string ... 2 more fields]
[36mdf_agg_filtered[39m: [32mDataset[39m[[32mRow[39m] = [user_id: string, track_id: string ... 2 more fields]

In [14]:
df_agg_filtered.cache() // recommended to prevent repeating the calculation

val condition = col("datediff") > 2000
val train = df_agg_filtered.filter(condition)
val test = df_agg_filtered.filter(not(condition))

[36mres13_0[39m: [32mDataset[39m[[32mRow[39m] = [user_id: string, track_id: string ... 2 more fields]
[36mcondition[39m: [32mColumn[39m = (datediff > 2000)
[36mtrain[39m: [32mDataset[39m[[32mRow[39m] = [user_id: string, track_id: string ... 2 more fields]
[36mtest[39m: [32mDataset[39m[[32mRow[39m] = [user_id: string, track_id: string ... 2 more fields]
[36mres13_4[39m: [32mLong[39m = [32m0L[39m

In [16]:
println(train.count)

0


The dataframe is then aggregated by user and track, to get the number of times a user has heard a particular track. 

In [None]:
val Array(training, test) = df_agg_filtered.randomSplit(Array[Double](0.8, 0.2), 18)

//revisit to make more efficient

val feat = df_agg_filtered.columns.filter(_ .contains("id"))
val inds = feat.map { colName =>
   new StringIndexer()
    .setInputCol(colName)
    .setOutputCol(colName.replace("id", "index"))
    .fit(df_agg_filtered)
    .setHandleInvalid("keep")
}

val va = new VectorAssembler()
    .setInputCols(Array("count"))
    .setOutputCol("count_assembled")

val scaler = new RobustScaler()
  .setInputCol("count_assembled")
  .setOutputCol("rating")

val pipeline = new Pipeline()
  .setStages(inds.toArray ++ Array(va, scaler))
  
val tr_s = pipeline.fit(training).transform(training)
val ts_s = pipeline.fit(training).transform(test)

val tr_full = tr_s.withColumn("rating_as_array", vector_to_array(tr_s("rating")).getItem(0))
val ts_full = ts_s.withColumn("rating_as_array", vector_to_array(ts_s("rating")).getItem(0))

val tr_final = tr_full.select("user_index", "track_index", "count","rating_as_array").orderBy("user_index")
val ts_final = ts_full.select("user_index", "track_index", "count", "rating_as_array").orderBy("user_index")

In [None]:
tr_final.show()

In [None]:
val als = new ALS()
  .setRank(5)
  .setUserCol("user_index")
  .setItemCol("track_index")
  .setRatingCol("rating_as_array")

val model = als.fit(tr_final)
model.setColdStartStrategy("drop")

val predictions = model.transform(ts_final)

val evaluator = new RegressionEvaluator()
  .setMetricName("rmse")
  .setLabelCol("rating_as_array")
  .setPredictionCol("prediction")

val rmse = evaluator.evaluate(predictions)
println(s"Root-mean-square error = $rmse")


In [None]:
val userRecs = model.recommendForAllUsers(10)
val movieRecs = model.recommendForAllItems(10)

In [None]:
val firsttworecs = userRecs
    .withColumn("columns",expr("struct(recommendations[0] as rec1, recommendations[0] as rec2) as columns"))
    .select("user_index","columns.*")
firsttworecs.show()

In [None]:
val firstRec = firsttworecs.select("user_index", "rec1.*")
firstRec.show()

In [None]:
// TODO make more efficient 
import spark.implicits._
val users = df_agg_filtered.select("user_id").map(_.getString(0)).distinct().collectAsList().toArray().map(_.asInstanceOf[String])
val usermap = new IndexToString()
    .setInputCol("user_index")
    .setOutputCol("user_id")
    .setLabels(users)

val userout = usermap.transform(firstRec)

val tracks = df_agg_filtered.select("track_id").map(_.getString(0)).distinct().collectAsList().toArray().map(_.asInstanceOf[String])
val trackmap = new IndexToString()
    .setInputCol("track_index")
    .setOutputCol("track_id")
    .setLabels(tracks)

val trackout = trackmap.transform(userout)

In [None]:
trackout.show()

In [None]:
val final_result = trackout.as("results")
        .join(df.as("in"), $"results.track_id" === $"in.track_id")
        .select("results.user_id", "in.track_name", "results.rating").distinct()
final_result.show()