<h1>Recommending Music and the Audioscrobbler Dataset<span class="tocSkip"></span></h1>
<div class="toc"><ul class="toc-item"></ul></div>

In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import split, min, max
from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import col
from pyspark.sql.functions import broadcast, when
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.sql.functions import sum as _sum
from pprint import pprint
from itertools import product
from pyspark.sql.functions import col, lit, count, mean, coalesce
from pyspark.sql import DataFrame
from typing import List
import random

In [None]:
spark = (SparkSession
  .builder
  .appName('App')
  .config('spark.driver.memory', '4g')
  .getOrCreate())

In [6]:
raw_user_artist_path = '/Users/alexfil/Desktop/git_hub/spark/data/user_artist_data.txt'

In [7]:
raw_user_artist_data = spark.read.text(raw_user_artist_path)

In [8]:
raw_user_artist_data.show(5)

+-------------------+
|              value|
+-------------------+
|       1000002 1 55|
| 1000002 1000006 33|
|  1000002 1000007 8|
|1000002 1000009 144|
|1000002 1000010 314|
+-------------------+
only showing top 5 rows



In [9]:
raw_artist_data = spark.read.text('/Users/alexfil/Desktop/git_hub/spark/data/artist_data.txt')

In [10]:
raw_artist_data.show(5)

+--------------------+
|               value|
+--------------------+
|1134999\t06Crazy ...|
|6821360\tPang Nak...|
|10113088\tTerfel,...|
|10151459\tThe Fla...|
|6826647\tBodensta...|
+--------------------+
only showing top 5 rows



In [11]:
raw_artist_alias = spark.read.text('/Users/alexfil/Desktop/git_hub/spark/data/artist_alias.txt')

In [12]:
raw_artist_alias.show(5)

+-----------------+
|            value|
+-----------------+
| 1092764\t1000311|
| 1095122\t1000557|
| 6708070\t1007267|
|10088054\t1042317|
| 1195917\t1042317|
+-----------------+
only showing top 5 rows



# Preparing the Data

In [13]:
raw_user_artist_data.show(10)

+-------------------+
|              value|
+-------------------+
|       1000002 1 55|
| 1000002 1000006 33|
|  1000002 1000007 8|
|1000002 1000009 144|
|1000002 1000010 314|
|  1000002 1000013 8|
| 1000002 1000014 42|
| 1000002 1000017 69|
|1000002 1000024 329|
|  1000002 1000025 1|
+-------------------+
only showing top 10 rows



In [14]:
user_artist_df = raw_user_artist_data.withColumn('user',
                                                 split(raw_user_artist_data['value'], ' ').\
                                                 getItem(0).\
                                                 cast(IntegerType()))

In [15]:
user_artist_df = user_artist_df.withColumn('artist',
                                                 split(raw_user_artist_data['value'], ' ').\
                                                 getItem(1).\
                                                 cast(IntegerType()))
user_artist_df = user_artist_df.withColumn('count',
                                                 split(raw_user_artist_data['value'], ' ').\
                                                 getItem(2).\
                                                 cast(IntegerType())).drop('value')

In [16]:
user_artist_df.select([min('user'), max('user'), min('artist'), max('artist')]).show()

[Stage 4:>                                                          (0 + 4) / 4]

+---------+---------+-----------+-----------+
|min(user)|max(user)|min(artist)|max(artist)|
+---------+---------+-----------+-----------+
|       90|  2443548|          1|   10794401|
+---------+---------+-----------+-----------+



                                                                                

In [17]:
artist_by_id = raw_artist_data.withColumn('id', split(col('value'),
                                                     '\s+', 2).\
                                         getItem(0).\
                                         cast(IntegerType()))
artist_by_id = artist_by_id.withColumn('name', split(col('value'),
                                                     '\s+', 2).\
                                         getItem(1).\
                                         cast(StringType())).drop('value')
                                        

In [18]:
artist_by_id.show(5)

+--------+--------------------+
|      id|                name|
+--------+--------------------+
| 1134999|        06Crazy Life|
| 6821360|        Pang Nakarin|
|10113088|Terfel, Bartoli- ...|
|10151459| The Flaming Sidebur|
| 6826647|   Bodenstandig 3000|
+--------+--------------------+
only showing top 5 rows



In [19]:
artist_alias = raw_artist_alias.withColumn('artist',
                                           split(col('value'), '\s+').\
                                           getItem(0).\
                                           cast(IntegerType())).\
                                withColumn('alias',
                                           split(col('value'), '\s+').\
                                           getItem(1).\
                                           cast(StringType())).drop('value')

In [20]:
artist_alias.show(5)

+--------+-------+
|  artist|  alias|
+--------+-------+
| 1092764|1000311|
| 1095122|1000557|
| 6708070|1007267|
|10088054|1042317|
| 1195917|1042317|
+--------+-------+
only showing top 5 rows



In [21]:
artist_by_id.filter(artist_by_id.id.isin(1092764, 1000311)).show()

[Stage 10:>                                                         (0 + 3) / 3]

+-------+--------------+
|     id|          name|
+-------+--------------+
|1000311| Steve Winwood|
|1092764|Winwood, Steve|
+-------+--------------+





In [22]:
train_data = user_artist_df.join(broadcast(artist_alias),
                                'artist', how='left')

In [23]:
train_data = train_data.withColumn('artist',
                                   when(col('alias').isNull(), col('artist')).\
                                   otherwise(col('alias')))

In [24]:
train_data = train_data.withColumn('artist', col('artist').cast(IntegerType())).drop('alias')

In [25]:
train_data.cache()

DataFrame[artist: int, user: int, count: int]

In [27]:
train_data.count()

                                                                                

24296858

# Alternating Least Squares Algorithm

In [28]:
model = ALS(rank=10, seed=0, maxIter=5, regParam=0.1,
            implicitPrefs=True, alpha=1.0, userCol='user',
            itemCol='artist', ratingCol='count').fit(train_data)

24/07/02 22:12:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/07/02 22:12:41 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/07/02 22:12:42 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [29]:
model.userFactors.show(1, truncate=False)

+---+-----------------------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                                     |
+---+-----------------------------------------------------------------------------------------------------------------------------+
|90 |[0.16020626, 0.20717518, -0.17194685, 0.060384676, 0.0627277, 0.54658705, -0.40481892, 0.43657345, -0.10396776, -0.042728294]|
+---+-----------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



# Spot Checking Recommendations

In [30]:
user_id = 2093760

In [31]:
existind_artist_ids = train_data.filter(train_data.user == user_id).select('artist').collect()

In [32]:
existind_artist_ids = [i[0] for i in existind_artist_ids]

In [33]:
artist_by_id.filter(col('id').isin(existind_artist_ids)).show()



+-------+---------------+
|     id|           name|
+-------+---------------+
|   1180|     David Gray|
|    378|  Blackalicious|
|    813|     Jurassic 5|
|1255340|The Saw Doctors|
|    942|         Xzibit|
+-------+---------------+



                                                                                

In [34]:
user_subset = train_data.select('user').where(col('user') == user_id).distinct()

In [35]:
top_predictions = model.recommendForUserSubset(user_subset, 5)

In [36]:
top_predictions.show()



+-------+--------------------+
|   user|     recommendations|
+-------+--------------------+
|2093760|[{2814, 0.0294106...|
+-------+--------------------+



                                                                                

In [37]:
top_predictions_pandas = top_predictions.toPandas()

                                                                                

In [38]:
top_predictions_pandas

Unnamed: 0,user,recommendations
0,2093760,"[(2814, 0.029410677030682564), (1300642, 0.028..."


In [39]:
recommended_artist_ids = [i[0] for i in top_predictions_pandas.recommendations[0]]

In [40]:
artist_by_id.filter(col('id').isin(recommended_artist_ids)).show()

[Stage 274:>                                                        (0 + 3) / 3]

+-------+----------+
|     id|      name|
+-------+----------+
|   2814|   50 Cent|
|   4605|Snoop Dogg|
|1007614|     Jay-Z|
|1001819|      2Pac|
|1300642|  The Game|
+-------+----------+





# Evaluating Recommendation Quality

In [41]:
def area_under_curve(positive_data: DataFrame, b_all_artist_ids: List[int], predict_function) -> float:
    positive_predictions = predict_function(positive_data.select("user", "artist")).withColumnRenamed("prediction", "positivePrediction")

    def negative_data_generation(user_artist_tuples):
        user_negative_artists = []
        for user, pos_artist_ids in user_artist_tuples:
            pos_artist_id_set = set(pos_artist_ids)
            negative_artists = set()
            while len(negative_artists) < len(pos_artist_id_set):
                artist_id = b_all_artist_ids[random.randint(0, len(b_all_artist_ids) - 1)]
                if artist_id not in pos_artist_id_set:
                    negative_artists.add(artist_id)
            user_negative_artists.extend([(user, artist_id) for artist_id in negative_artists])
        return user_negative_artists

    user_artist_rdd = positive_data.select("user", "artist").rdd.groupByKey().mapValues(list).collect()
    negative_data = spark.createDataFrame(negative_data_generation(user_artist_rdd), schema=["user", "artist"])

    negative_predictions = predict_function(negative_data).withColumnRenamed("prediction", "negativePrediction")

    joined_predictions = positive_predictions.join(negative_predictions, "user").select("user", "positivePrediction", "negativePrediction").cache()

    all_counts = joined_predictions.groupBy("user").agg(count(lit(1)).alias("total")).select("user", "total")
    correct_counts = joined_predictions.filter(col("positivePrediction") > col("negativePrediction")).groupBy("user").agg(count("user").alias("correct")).select("user", "correct")

    mean_auc = all_counts.join(correct_counts, ["user"], "left_outer").select(col("user"), (coalesce(col("correct"), lit(0)) / col("total")).alias("auc")).agg(mean("auc")).collect()[0][0]

    joined_predictions.unpersist()

    return mean_auc

In [42]:
all_data = user_artist_df.join(broadcast(artist_alias), 'artist', how='left') \
        .withColumn('artist', when(col('alias').isNull(), col('artist'))\
        .otherwise(col('alias'))) \
        .withColumn('artist', col('artist').cast(IntegerType())).drop('alias')

In [43]:
train_data, cv_data = all_data.randomSplit([0.9, 0.1], seed=54321)
train_data.cache()
cv_data.cache()

DataFrame[artist: int, user: int, count: int]

In [45]:
all_artist_ids = all_data.select("artist").distinct()
all_artist_ids = [i[0] for i in all_artist_ids.collect()]

                                                                                

In [46]:
model = ALS(rank=10, seed=0, maxIter=5, regParam=0.1,
            implicitPrefs=True, alpha=1.0, userCol='user',
            itemCol='artist', ratingCol='count').fit(train_data)

                                                                                

In [47]:
area_under_curve(cv_data, all_artist_ids, model.transform)

24/07/02 22:22:57 WARN TaskSetManager: Stage 421 contains a task of very large size (7120 KiB). The maximum recommended task size is 1000 KiB.
                                                                                

0.9036149652905634

In [58]:
def predict_most_listened(train):
    listen_counts = train.groupBy("artist")\
                        .agg(_sum("count").alias("prediction"))\
                        .select("artist", "prediction")
    return all_data.join(listen_counts, "artist", "left_outer").select("user", "artist", "prediction")

In [62]:
predict_most_listened(train_data)

In [None]:
area_under_curve(cv_data, all_artist_ids, predict_most_listened(train_data))

# Hyperparameter Selection

In [None]:
ranks = [5, 30]
reg_params = [4.0, 0.0001]
alphas = [1.0, 40.0]
hyperparam_combinations = list(product(*[ranks, reg_params, alphas]))

evaluations = []

for c in hyperparam_combinations:
    rank = c[0]
    reg_param = c[1]
    alpha = c[2]
    model = ALS().setSeed(0).setImplicitPrefs(True).setRank(rank).\
    setRegParam(reg_param).setAlpha(alpha).\
    setMaxIter(20).setUserCol("user").\
    setItemCol("artist").setRatingCol("count").\
    setPredictionCol("prediction").fit(train_data)

    auc = area_under_curve(cv_data, all_artist_ids, model.transform)

    model.userFactors.unpersist()
    model.itemFactors.unpersist()

    evaluations.append((auc, (rank, regParam, alpha)))

evaluations.sort(key=lambda x: x[0], reverse=True)
pprint(evaluations)

# Making Recommendations

In [None]:
some_users = all_data.select("user").distinct().limit(100)

def make_recommendations(model, user_id, num_recs):
    user_subset = train_data.select('user').where(col('user') == user_id).distinct()
    recommendations = model.recommendForUserSubset(user_subset, num_recs)
    return recommendations

some_recommendations = [(user_id[0], make_recommendations(model, user_id[0], 5)) for user_id in some_users.collect()]

for user_id, recs_df in some_recommendations:
    recs_df = recs_df.select("recommendations")
    recommended_artists = [row.asDict()["artist"] for row in recs_df.collect()[0][0]]
    print(f"{user_id} -> {', '.join(map(str, recommended_artists))}")