In [6]:
import pyspark

from pyspark.sql import SparkSession

In [5]:
import os
import tarfile
import urllib.request

# Location of the Audioscrobbler profile-data archive
url = "https://storage.googleapis.com/aas-data-sets/profiledata_06-May-2005.tar.gz"

# Folder (relative to the working directory) that receives the archive and its contents
destination_folder = "data"
os.makedirs(destination_folder, exist_ok=True)

# Local path of the downloaded archive
filename = os.path.join(destination_folder, "profiledata_06-May-2005.tar.gz")

# Download only when the archive is not already on disk, so re-running the
# notebook does not fetch it again. The download goes to a temporary name and
# is renamed afterwards so an interrupted transfer is never mistaken for a
# complete archive.
if not os.path.exists(filename):
    partial_filename = filename + ".part"
    urllib.request.urlretrieve(url, partial_filename)
    os.replace(partial_filename, filename)

# Extract the files
with tarfile.open(filename, "r:gz") as tar:
    members = tar.getmembers()
    # Reject member names that would resolve outside the destination folder
    # (e.g. "../x" or an absolute path) before anything is written.
    destination_root = os.path.realpath(destination_folder)
    for member in members:
        target = os.path.realpath(os.path.join(destination_root, member.name))
        if os.path.commonpath([destination_root, target]) != destination_root:
            raise RuntimeError(f"Unsafe path in archive: {member.name}")
    tar.extractall(destination_folder)

# Print the extracted filenames
extracted_files = [os.path.basename(member.name) for member in members if member.isfile()]
print("Extracted files:")
for file in extracted_files:
    print(file)

print("Files downloaded and extracted successfully.")


Extracted files:
artist_data.txt
README.txt
user_artist_data.txt
artist_alias.txt
Files downloaded and extracted successfully.


In [7]:
# Create (or reuse) the Spark session used by every later cell.
spark = (
    SparkSession.builder
    .appName('music-recommender')
    .config("spark.driver.memory", "12g")
    .getOrCreate()
)

In [8]:
# Build the path from the folder the archive was extracted into instead of
# hard-coding Colab's /content working directory; abspath keeps the result
# absolute so it is unambiguous to Spark.
raw_user_artist_path = os.path.abspath(
    os.path.join(destination_folder, "profiledata_06-May-2005", "user_artist_data.txt")
)
raw_user_artist_data = spark.read.text(raw_user_artist_path)

raw_user_artist_data.show(5)

+-------------------+
|              value|
+-------------------+
|       1000002 1 55|
| 1000002 1000006 33|
|  1000002 1000007 8|
|1000002 1000009 144|
|1000002 1000010 314|
+-------------------+
only showing top 5 rows



In [9]:
# Path is derived from the extraction folder rather than a hard-coded
# /content/... location, so the cell works outside Colab as well.
raw_artist_data = spark.read.text(
    os.path.abspath(
        os.path.join(destination_folder, "profiledata_06-May-2005", "artist_data.txt")
    )
)

raw_artist_data.show(5)

+--------------------+
|               value|
+--------------------+
|1134999	06Crazy Life|
|6821360	Pang Nakarin|
|10113088	Terfel, ...|
|10151459	The Flam...|
|6826647	Bodenstan...|
+--------------------+
only showing top 5 rows



In [10]:
# Path is derived from the extraction folder rather than a hard-coded
# /content/... location, so the cell works outside Colab as well.
raw_artist_alias = spark.read.text(
    os.path.abspath(
        os.path.join(destination_folder, "profiledata_06-May-2005", "artist_alias.txt")
    )
)

raw_artist_alias.show(5)

+----------------+
|           value|
+----------------+
| 1092764	1000311|
| 1095122	1000557|
| 6708070	1007267|
|10088054	1042317|
| 1195917	1042317|
+----------------+
only showing top 5 rows



**Preparing the Data**

In [11]:
# Peek at the first ten raw lines before parsing them.
raw_user_artist_data.show(n=10)

+-------------------+
|              value|
+-------------------+
|       1000002 1 55|
| 1000002 1000006 33|
|  1000002 1000007 8|
|1000002 1000009 144|
|1000002 1000010 314|
|  1000002 1000013 8|
| 1000002 1000014 42|
| 1000002 1000017 69|
|1000002 1000024 329|
|  1000002 1000025 1|
+-------------------+
only showing top 10 rows



Each line of the file contains a user ID, an artist ID, and a play count, separated by spaces. To compute statistics on these IDs, we split each line on the space character and parse the values as integers. The result is conceptually three “columns”: a user ID, an artist ID, and a count, all as ints. It makes sense to transform this into a DataFrame with columns named “user”, “artist”, and “count”, because it then becomes easy to compute simple statistics like the maximum and minimum:

In [12]:
from pyspark.sql.functions import split, min, max
from pyspark.sql.types import IntegerType, StringType

# Split each "user artist count" line on spaces once, then project the three
# pieces as integer columns (the raw 'value' column is not carried over).
fields = split(raw_user_artist_data['value'], ' ')
user_artist_df = raw_user_artist_data.select(
    fields.getItem(0).cast(IntegerType()).alias('user'),
    fields.getItem(1).cast(IntegerType()).alias('artist'),
    fields.getItem(2).cast(IntegerType()).alias('count'),
)

# Range of the IDs (min/max here are the pyspark.sql.functions aggregates)
user_artist_df.select(
    min("user"), max("user"), min("artist"), max("artist")
).show()

+---------+---------+-----------+-----------+
|min(user)|max(user)|min(artist)|max(artist)|
+---------+---------+-----------+-----------+
|       90|  2443548|          1|   10794401|
+---------+---------+-----------+-----------+



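The maximum user and artist IDs are 2443548 and 10794401, respectively (and their minimums are 90 and 1; no negative values). These are comfortably smaller than 2147483647, the largest value of a 32-bit signed integer, which is the range Spark MLlib's ALS implementation requires for user and item IDs. No additional transformation will be necessary to use these IDs.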

In [13]:
from pyspark.sql.functions import col

# Split each line into at most two parts on the first run of whitespace:
# the numeric artist ID and the (possibly space-containing) artist name.
# The pattern is a raw string: '\s' in a normal string literal is an invalid
# escape sequence that Python warns about.
artist_by_id = raw_artist_data.withColumn(
    'id',
    split(col('value'), r'\s+', 2).getItem(0).cast(IntegerType()),
)
artist_by_id = artist_by_id.withColumn(
    'name',
    split(col('value'), r'\s+', 2).getItem(1).cast(StringType()),
).drop('value')

artist_by_id.show(5)

+--------+--------------------+
|      id|                name|
+--------+--------------------+
| 1134999|        06Crazy Life|
| 6821360|        Pang Nakarin|
|10113088|Terfel, Bartoli- ...|
|10151459| The Flaming Sidebur|
| 6826647|   Bodenstandig 3000|
+--------+--------------------+
only showing top 5 rows



In [14]:
# Each line maps an artist ID to the alias ID that replaces it, separated by
# whitespace. The pattern is a raw string: '\s' in a normal string literal is
# an invalid escape sequence that Python warns about.
# 'alias' is kept as a string column here; the join cell casts the resolved
# artist ID back to an integer.
artist_alias = (
    raw_artist_alias
    .withColumn('artist',
                split(col('value'), r'\s+').getItem(0).cast(IntegerType()))
    .withColumn('alias',
                split(col('value'), r'\s+').getItem(1).cast(StringType()))
    .drop('value')
)

artist_alias.show(5)

+--------+-------+
|  artist|  alias|
+--------+-------+
| 1092764|1000311|
| 1095122|1000557|
| 6708070|1007267|
|10088054|1042317|
| 1195917|1042317|
+--------+-------+
only showing top 5 rows



In [15]:
# Look up both sides of the first alias pair shown above.
artist_by_id.where(col('id').isin([1092764, 1000311])).show()

+-------+--------------+
|     id|          name|
+-------+--------------+
|1000311| Steve Winwood|
|1092764|Winwood, Steve|
+-------+--------------+



**Building Model**

In [16]:
from pyspark.sql.functions import broadcast, when

# Left-join the alias table (with a broadcast hint) onto the play counts, then
# replace each artist ID with its alias where one exists and cast the result
# back to an integer column.
train_data = (
    user_artist_df
    .join(broadcast(artist_alias), on='artist', how='left')
    .withColumn(
        'artist',
        when(col('alias').isNotNull(), col('alias')).otherwise(col('artist')),
    )
    .withColumn('artist', col('artist').cast(IntegerType()))
    .drop('alias')
)

# Mark for caching; the count below is the first action that runs on it.
train_data.cache()

train_data.count()

24296858

We will use the Alternating Least Squares algorithm to compute latent factors from our dataset. This type of approach was popularized around the time of the Netflix Prize competition by papers like “Collaborative Filtering for Implicit Feedback Datasets” and “Large-Scale Parallel Collaborative Filtering for the Netflix Prize”. PySpark MLlib’s ALS implementation draws on ideas from both of these papers and is the only recommender algorithm currently implemented in Spark MLlib.

In [17]:
from pyspark.ml.recommendation import ALS

# Fit an implicit-feedback ALS model on the (user, artist, count) rows.
model = ALS(
    userCol='user',
    itemCol='artist',
    ratingCol='count',
    implicitPrefs=True,
    rank=10,
    maxIter=5,
    regParam=0.1,
    alpha=1.0,
    seed=0,
).fit(train_data)

In [18]:
# Show one untruncated user feature vector (length equals the ALS rank).
model.userFactors.show(n=1, truncate=False)

+---+---------------------------------------------------------------------------------------------------------------------------+
|id |features                                                                                                                   |
+---+---------------------------------------------------------------------------------------------------------------------------+
|90 |[0.16020626, 0.20717518, -0.1719469, 0.06038466, 0.06272771, 0.54658705, -0.4048189, 0.43657345, -0.10396772, -0.042728323]|
+---+---------------------------------------------------------------------------------------------------------------------------+
only showing top 1 row



**Spot Checking Recommendations**

In [19]:
user_id = 2093760

# Artist IDs this user has play counts for, collected to the driver as a list
existing_artist_ids = [
    row[0]
    for row in train_data.where(col('user') == user_id).select("artist").collect()
]

# Translate those IDs into artist names
artist_by_id.where(col('id').isin(existing_artist_ids)).show()

+-------+---------------+
|     id|           name|
+-------+---------------+
|   1180|     David Gray|
|    378|  Blackalicious|
|    813|     Jurassic 5|
|1255340|The Saw Doctors|
|    942|         Xzibit|
+-------+---------------+



In [20]:
# One-row DataFrame holding just the user of interest
user_subset = train_data.filter(col('user') == user_id).select('user').distinct()

# Top five artist recommendations for that user
top_predictions = model.recommendForUserSubset(user_subset, numItems=5)

top_predictions.show()

+-------+--------------------+
|   user|     recommendations|
+-------+--------------------+
|2093760|[[2814, 0.0294106...|
+-------+--------------------+



In [21]:
# Convert the recommendations to a pandas DataFrame on the driver so the nested
# 'recommendations' entries can be indexed from plain Python in the next cell.
top_predictions_pandas = top_predictions.toPandas()
print(top_predictions_pandas)

      user                                    recommendations
0  2093760  [(2814, 0.029410675168037415), (1300642, 0.028...


In [22]:
# The first (index 0) row holds the recommendation entries for the user; the
# first element of each entry is the artist ID.
first_user_recommendations = top_predictions_pandas["recommendations"][0]
recommended_artist_ids = [entry[0] for entry in first_user_recommendations]

# Translate the recommended IDs into artist names
artist_by_id.where(col('id').isin(recommended_artist_ids)).show()

+-------+----------+
|     id|      name|
+-------+----------+
|   2814|   50 Cent|
|   4605|Snoop Dogg|
|1007614|     Jay-Z|
|1001819|      2Pac|
|1300642|  The Game|
+-------+----------+



In [23]:
from pyspark.sql.functions import col, lit, count, mean, coalesce
from pyspark.sql import DataFrame
from typing import List
import random


def _sample_negative_pairs(user_artist_tuples, all_artist_ids, rng):
    """Draw, for each user, random artists that are not among that user's positives.

    For every ``(user, positive_artist_ids)`` tuple, artists are sampled from
    ``all_artist_ids`` until as many distinct negatives as distinct positives
    have been found, or until ``len(all_artist_ids)`` draws have been made.
    The cap guarantees termination when a user's positives cover most or all
    of the candidates, so a user may end up with fewer negatives than positives.

    Returns a list of ``(user, artist_id)`` tuples.
    """
    num_candidates = len(all_artist_ids)
    user_negative_artists = []
    for user, pos_artist_ids in user_artist_tuples:
        pos_artist_id_set = set(pos_artist_ids)
        negative_artists = set()
        attempts = 0
        # Bounded number of draws: never loop forever on an exhausted candidate pool.
        while attempts < num_candidates and len(negative_artists) < len(pos_artist_id_set):
            artist_id = all_artist_ids[rng.randrange(num_candidates)]
            if artist_id not in pos_artist_id_set:
                negative_artists.add(artist_id)
            attempts += 1
        user_negative_artists.extend((user, artist_id) for artist_id in negative_artists)
    return user_negative_artists


def area_under_curve(positive_data: DataFrame, b_all_artist_ids: List[int], predict_function, seed=None) -> float:
    """Mean per-user AUC of ``predict_function`` on held-out positive examples.

    For each user, every positive prediction is paired with every prediction
    for a randomly sampled "negative" artist of the same user. The per-user
    score is the fraction of pairs in which the positive prediction is strictly
    greater than the negative one; the return value is the mean of those scores
    over all users that have at least one pair.

    Args:
        positive_data: DataFrame with at least ``user`` and ``artist`` columns.
        b_all_artist_ids: candidate artist IDs to sample negatives from.
        predict_function: callable taking a (user, artist) DataFrame and
            returning it with a ``prediction`` column (e.g. ``model.transform``).
        seed: optional seed for the Python-side negative sampling. ``None``
            (the default) keeps the sampling unseeded. Note that the order in
            which users arrive from ``collect()`` is not controlled here, so a
            seed alone may not make results bit-for-bit repeatable.

    Returns:
        The mean AUC as returned by the Spark aggregation.

    Raises:
        ValueError: if no negative example could be sampled (for instance when
            ``positive_data`` or ``b_all_artist_ids`` is empty).

    Relies on the notebook-level ``spark`` session to build the negatives DataFrame.
    """
    rng = random.Random(seed)

    positive_predictions = predict_function(positive_data.select("user", "artist")) \
        .withColumnRenamed("prediction", "positivePrediction")

    # Group the positive artists per user on the driver: [(user, [artist, ...]), ...]
    user_artist_tuples = positive_data.select("user", "artist").rdd \
        .groupByKey().mapValues(list).collect()

    negative_pairs = _sample_negative_pairs(user_artist_tuples, b_all_artist_ids, rng)
    if not negative_pairs:
        raise ValueError("no negative examples could be sampled; "
                         "check that positive_data and b_all_artist_ids are non-empty")
    negative_data = spark.createDataFrame(negative_pairs, schema=["user", "artist"])

    negative_predictions = predict_function(negative_data) \
        .withColumnRenamed("prediction", "negativePrediction")

    # Every positive of a user paired with every negative of the same user
    joined_predictions = positive_predictions.join(negative_predictions, "user") \
        .select("user", "positivePrediction", "negativePrediction").cache()

    try:
        all_counts = joined_predictions.groupBy("user") \
            .agg(count(lit(1)).alias("total")).select("user", "total")
        correct_counts = joined_predictions \
            .filter(col("positivePrediction") > col("negativePrediction")) \
            .groupBy("user").agg(count("user").alias("correct")).select("user", "correct")

        # Users with no correctly ordered pair are absent from correct_counts;
        # the left join plus coalesce scores them as 0.
        mean_auc = all_counts.join(correct_counts, ["user"], "left_outer") \
            .select(col("user"), (coalesce(col("correct"), lit(0)) / col("total")).alias("auc")) \
            .agg(mean("auc")).collect()[0][0]
    finally:
        # Release the cached join even if the evaluation above fails.
        joined_predictions.unpersist()

    return mean_auc



# Rebuild the alias-resolved play counts: left-join the alias table, take the
# alias where one exists, and cast the artist column back to an integer.
all_data = (
    user_artist_df
    .join(broadcast(artist_alias), on='artist', how='left')
    .withColumn(
        'artist',
        when(col('alias').isNotNull(), col('alias')).otherwise(col('artist')),
    )
    .withColumn('artist', col('artist').cast(IntegerType()))
    .drop('alias')
)

# 90% for training, 10% held out for evaluation
train_data, cv_data = all_data.randomSplit([0.9, 0.1], seed=54321)
train_data.cache()
cv_data.cache()

# Distinct artist IDs, collected to the driver as a plain Python list
all_artist_ids = [row[0] for row in all_data.select("artist").distinct().collect()]

# Refit on the training split only
model = ALS(
    userCol='user',
    itemCol='artist',
    ratingCol='count',
    implicitPrefs=True,
    rank=10,
    maxIter=5,
    regParam=0.1,
    alpha=1.0,
    seed=0,
).fit(train_data)

area_under_curve(cv_data, all_artist_ids, model.transform)

0.9039777085450373