Author: Youssef Belyazid <br>
Email: belyazidyous@gmail.com <br>
Year: 2021

### Loading lastfm Dataset

In [0]:
dbutils.fs.mv("dbfs:/FileStore/tables/data_lastfm/data_lastfm.zip","file:/databricks/driver/data_lastfm.zip")

In [0]:
%%bash
unzip data_lastfm.zip

In [0]:
dbutils.fs.mv("file:/databricks/driver/data_lastfm.csv", "dbfs:/FileStore/tables/data_lastfm.csv")

In [0]:
from pyspark.sql.functions import col, explode, sum, count, expr
from pyspark.ml.feature import StringIndexer, IndexToString
from pyspark.ml import Transformer
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark import keyword_only
from pyspark.ml.evaluation import Evaluator
from pyspark.ml.param.shared import Param
from pyspark.mllib.evaluation import RankingMetrics
from pyspark.sql import Window
import pyspark.sql.functions as F
from pyspark.ml.recommendation import ALS, ALSModel
import pandas as pd

### Reading and preprocessing the data

In [0]:
lastfm_df = spark.read.format("csv").option("inferSchema", "true").option("header","true").\
load("dbfs:/FileStore/tables/data_lastfm.csv").\
withColumn('play_count', col('play_count').cast('integer'))

lastfm_df.cache()

In [0]:
class clean_data(Transformer):
  
    def _transform(self, df):
      clean_df = df.dropna()
      return clean_df
      

class preprocess_data(Transformer):
  
  @keyword_only
  def __init__(self,cutoff_items=5, cutoff_users=750):
      super(preprocess_data, self).__init__()
      self.cutoff_items = cutoff_items
      self.cutoff_users = cutoff_users

  def _transform(self, df):
    df1 = df \
            .groupBy('user_sha1') \
            .agg(count('artist_name').alias('artist_name_count')) \
            .where(f'artist_name_count >= {self.cutoff_items}') \
            .select('user_sha1')

    df1 = df.join(df1, 'user_sha1', 'inner')

    df2 = df1 \
            .groupBy('artist_name') \
            .agg(count('user_sha1').alias('user_count')) \
            .where(f'user_count >= {self.cutoff_users}') \
            .select('artist_name')

    preprocessed_data = df.join(df2, 'artist_name', 'inner')

    return preprocessed_data


In [0]:
clean_data = clean_data()
lastfm_df = clean_data.transform(lastfm_df)

preprocess_data = preprocess_data()
lastfm_df = preprocess_data.transform(lastfm_df)

In [0]:
display(lastfm_df.take(5))

artist_name,user_sha1,play_count
1349,0039f6a10a8afc639e621ec4a6601306bafd9adf,4
1349,008c354e33658fb3b68de133ec99e7d57f191bb0,190
1349,00b647282eee5f49a1f811f45c55f8878dae8383,120
1349,00ec5fa841a608ce9c1280771cd95b48466dd6d0,68
1349,012876befe9c49325fdd8e10f00f94b920b174c6,19


### Exploring the data

In [0]:
def get_sparsity(df, col_users, col_items, rating):
  num_users = df.select(col_users).distinct().count()
  num_artists = df.select(col_items).distinct().count()
  num_ratings = df.select(rating).count()
  sparsity = round((1 - (num_ratings)/(num_users*num_artists))*100,3)
  return sparsity

In [0]:
sparsity_rate = get_sparsity(lastfm_df, 'user_sha1', 'artist_name', 'play_count')
print(f'the data sparsity rate is:{sparsity_rate}')

In [0]:
# top 5 most played artists
display(lastfm_df.groupby('artist_name').agg(sum('play_count').alias("sum_play_count"))\
        .sort("sum_play_count", ascending=False).take(5))

artist_name,sum_play_count
the beatles,30466827
radiohead,27426234
coldplay,16686772
pink floyd,15943557
metallica,15481852


### Training ALS

In [0]:
#converting user_sha1 and artist_name to integer indexes to fit als requirements

indexer_user_id = StringIndexer(inputCol="user_sha1", outputCol="user_id",handleInvalid="skip")
lastfm_indexed = indexer_user_id.fit(lastfm_df).transform(lastfm_df).\
withColumn('user_id', col('user_id').cast('integer'))

indexer_artist_id = StringIndexer(inputCol="artist_name", outputCol="artist_id",handleInvalid="skip")
index_artist_id_model = indexer_artist_id.fit(lastfm_indexed) 
lastfm_indexed = index_artist_id_model.transform(lastfm_indexed).\
withColumn('artist_id', col('artist_id').cast('integer'))

lastfm_indexed = lastfm_indexed.select('user_id', 'artist_id', 'play_count')

display(lastfm_indexed.head(5))

user_id,artist_id,play_count
76877,2506,4
120115,2506,190
31011,2506,120
269515,2506,68
312995,2506,19


In [0]:
# model hyperparameters fine-tuning using cross-validation
als = ALS(userCol="user_id",
          itemCol="artist_id",
          ratingCol="play_count",
          nonnegative = True,
          implicitPrefs = True,
          coldStartStrategy="drop",
          checkpointInterval=1)

rankingEvaluator = RankingEvaluator()

param_grid = ParamGridBuilder()\
            .addGrid(als.rank, [5,10,20])\
            .addGrid(als.alpha,[1,10,40,60])\
            .addGrid(als.regParam, [.01, .1, 1, 10,100]) \
            .addGrid(als.maxIter, [10, 12, 15])\
            .build()

cv = CrossValidator(estimator=als,
                    estimatorParamMaps=param_grid ,
                    evaluator=rankingEvaluator,
                    numFolds=5)

cvModel = cv.fit(lastfm_indexed)

In [0]:
(train, test) = lastfm_indexed.randomSplit([0.8, 0.2], seed = 1)

In [0]:
# model with the best hyper-paramters combination

als = ALS(userCol="user_id",
          itemCol="artist_id",
          ratingCol="play_count",
          nonnegative = False,
          implicitPrefs = True,
          coldStartStrategy="drop",
         alpha=40)
model = als.fit(train)

In [0]:
#saving the model
def save_model(model, path):
  model.save(path)
  
def load_model(path):
  model = ALSModel.load(path)
  return model

save_model(model, 'dbfs:/FileStore/als_model')

In [0]:
#utility function for evaluating the model

class RankingEvaluator(Evaluator):

    @keyword_only
    def __init__(self, k=10):
        super(RankingEvaluator, self).__init__()
        self.k = k

    def _evaluate(self, predictedDF):
      
        windowSpec = Window.partitionBy('user_id').orderBy(col('prediction').desc())
        perUserPredictedItemsDF = predictedDF \
            .select('user_id', 'artist_id', 'prediction', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(self.k)) \
            .groupBy('user_id') \
            .agg(expr('collect_list(artist_id) as items'))

        windowSpec = Window.partitionBy('user_id').orderBy(col('play_count').desc())
        perUserActualItemsDF = predictedDF \
            .select('user_id', 'artist_id', 'play_count', F.rank().over(windowSpec).alias('rank')) \
            .where('rank <= {0}'.format(self.k)) \
            .groupBy('user_id') \
            .agg(expr('collect_list(artist_id) as items'))

        perUserItemsRDD = perUserPredictedItemsDF.join(F.broadcast(perUserActualItemsDF), 'user_id', 'inner') \
            .rdd \
            .map(lambda row: (row[1], row[2]))

        if perUserItemsRDD.isEmpty():
            return 0.0

        rankingMetrics = RankingMetrics(perUserItemsRDD)
        metric = rankingMetrics.meanAveragePrecisionAt(self.k)
        return metric

In [0]:
#evaluating the model on the test set 

predictions_df = model.transform(test)
re = RankingEvaluator()
re.evaluate(predictions_df)

### Saving a dataframe with user_id, actual artists listened to, and recommended artists

In [0]:
# generating a recommendation of 10 artists for each user
rec = model.recommendForAllUsers(10)\
.selectExpr("user_id", "explode(recommendations) as recom")\
.select("user_id", "recom.artist_id")

In [0]:
# converting artist_id back to artist_name and grouping them in a list
labelReverse = IndexToString(inputCol="artist_id", outputCol='artist_name', labels = index_artist_id_model.labels)
rec_transformed = labelReverse.transform(rec)
final_rec = rec_transformed.groupBy('user_id').\
agg(expr("collect_list(artist_name) as artists"))

In [0]:
# joining the dataframe with artists recommended and artists actually listned to on user_id
predvsactual = labelReverse.transform(train).\
orderBy(col("user_id"), expr("play_count DESC")).groupBy('user_id').\
agg(expr("collect_list(artist_name) as artists_actual")).join(final_rec, ["user_id"]).rdd\
.map(lambda row: (row[0],row[1][:10],row[2]))

In [0]:
# exporting the previous dataframe via pickle
predvsactual_df = predvsactual.toDF().toPandas().rename(columns={"_1": "user_id", "_2": "Listened to", "_3":"Recommendations"})
predvsactual_df.to_pickle('recommendations-artists.pkl',protocol=4)

In [0]:
predvsactual_df.head()

Unnamed: 0,user_id,Listened to,Recommendations
0,31,"[daft punk, café del mar, leonard cohen, röyks...","[wir sind helden, schiller, sasha, acda en de ..."
1,34,"[grizzly bear, arvo pärt, digitalism, neutral ...","[alina orlova, sopor aeternus & the ensemble o..."
2,53,"[mogwai, hadouken!, the presets, elliott smith...","[canned heat, emir kusturica & the no smoking ..."
3,65,"[sum 41, hoobastank, the offspring, nirvana, a...","[bad company, frank klepacki, kotoko, raappana..."
4,78,"[3 doors down, the strokes, r.e.m., peaches, +...","[angerfist, drive-by truckers, black lips, the..."
