In [None]:
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.2.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m12.4 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.2-py2.py3-none-any.whl size=281824025 sha256=806d288a9cff00792d752c04195f34b7dd365a0cf9fc22f1e495bae8e037489c
  Stored in directory: /root/.cache/pip/wheels/6c/e3/9b/0525ce8a69478916513509d43693511463c6468db0de237c86
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
# GDRIVE: location of the play-count file (user ID, artist ID, play count per line).
data_dir = "/content/drive/MyDrive/adv_analytics/profiledata_06-May-2005"
filepath = data_dir + "/user_artist_data.txt"

In [None]:
import pyspark
from pyspark.sql import SparkSession
# Reuse the active Spark session if there is one; otherwise start a new one
# for this notebook.
spark = (
    SparkSession.builder
    .appName('Adv_Analytics')
    .getOrCreate()
)

In [None]:
# The main dataset is the user_artist_data.txt file. It contains about
# 141,000 unique users and 1.6 million unique artists. About 24.2 million
# user-artist play records are stored, each with its play count.

# Read the file as plain text: one row per line, in a single `value` column.
raw_user_artist_data = spark.read.text(filepath)

In [None]:
# Peek at the first five raw lines.
raw_user_artist_data.show(5)

+-------------------+
|              value|
+-------------------+
|       1000002 1 55|
| 1000002 1000006 33|
|  1000002 1000007 8|
|1000002 1000009 144|
|1000002 1000010 314|
+-------------------+
only showing top 5 rows



In [None]:
# The dataset also gives the name of each artist by ID in the artist_data.txt file.
# The names may contain misspellings and similar inconsistencies.
raw_artist_data = spark.read.text("/content/drive/MyDrive/adv_analytics/profiledata_06-May-2005/artist_data.txt")
raw_artist_data.show(5)

+--------------------+
|               value|
+--------------------+
|1134999\t06Crazy ...|
|6821360\tPang Nak...|
|10113088\tTerfel,...|
|10151459\tThe Fla...|
|6826647\tBodensta...|
+--------------------+
only showing top 5 rows



In [None]:
# artist_alias.txt maps an artist ID that appears under a misspelled or variant
# name to the ID of the canonical artist entry (two tab-separated IDs per line).
raw_artist_alias = spark.read.text("/content/drive/MyDrive/adv_analytics/profiledata_06-May-2005/artist_alias.txt")
raw_artist_alias.show(5)

+-----------------+
|            value|
+-----------------+
| 1092764\t1000311|
| 1095122\t1000557|
| 6708070\t1007267|
|10088054\t1042317|
| 1195917\t1042317|
+-----------------+
only showing top 5 rows



In [None]:
# Preparing the Data
# Look at a larger sample of raw lines before parsing them.
raw_user_artist_data.show(10)

+-------------------+
|              value|
+-------------------+
|       1000002 1 55|
| 1000002 1000006 33|
|  1000002 1000007 8|
|1000002 1000009 144|
|1000002 1000010 314|
|  1000002 1000013 8|
| 1000002 1000014 42|
| 1000002 1000017 69|
|1000002 1000024 329|
|  1000002 1000025 1|
+-------------------+
only showing top 10 rows



In [None]:
# Each line of the file contains a user ID, an artist ID, 
# and a play count, separated by spaces.

# We split these three fields into separate columns to make statistics easy to compute.

In [None]:
from pyspark.sql.functions import split, min, max
from pyspark.sql.types import IntegerType, StringType

In [None]:
# Column 1 of 3: the user ID is the first space-separated token of each raw line.
user_token = split(raw_user_artist_data['value'], ' ').getItem(0)
user_artist_df = raw_user_artist_data.withColumn(
    'user', user_token.cast(IntegerType())
)

In [None]:
# Column 2 of 3: the artist ID is the second space-separated token.
artist_token = split(raw_user_artist_data['value'], ' ').getItem(1)
user_artist_df = user_artist_df.withColumn(
    'artist', artist_token.cast(IntegerType())
)

In [None]:
# Column 3 of 3: the play count is the third token. The raw text column is
# dropped once all three fields have been parsed out of it.
count_token = split(raw_user_artist_data['value'], ' ').getItem(2)
user_artist_df = user_artist_df.withColumn(
    'count', count_token.cast(IntegerType())
).drop('value')

In [None]:
# Range check on the parsed IDs. `min` and `max` here are the Spark aggregate
# functions imported from pyspark.sql.functions, not the Python builtins.
id_bounds = [min("user"), max("user"), min("artist"), max("artist")]
user_artist_df.select(id_bounds).show()

+---------+---------+-----------+-----------+
|min(user)|max(user)|min(artist)|max(artist)|
+---------+---------+-----------+-----------+
|       90|  2443548|          1|   10794401|
+---------+---------+-----------+-----------+



In [None]:
# Map numeric artist IDs to names. Each raw_artist_data line holds an artist
# ID and a name separated by a tab.
# PySpark's split function takes a regular expression as its pattern, so the
# whitespace class \s+ matches the tab. The limit of 2 stops splitting after
# the first separator, so spaces inside the artist name are left intact.

from pyspark.sql.functions import col

# Raw string: '\s' inside a normal string literal is an invalid escape
# sequence that newer Python versions warn about.
id_and_name = split(col('value'), r'\s+', 2)

artist_by_id = (
    raw_artist_data
    .withColumn('id', id_and_name.getItem(0).cast(IntegerType()))
    .withColumn('name', id_and_name.getItem(1).cast(StringType()))
    .drop('value')
)

artist_by_id.show(5)

+--------+--------------------+
|      id|                name|
+--------+--------------------+
| 1134999|        06Crazy Life|
| 6821360|        Pang Nakarin|
|10113088|Terfel, Bartoli- ...|
|10151459| The Flaming Sidebur|
| 6826647|   Bodenstandig 3000|
+--------+--------------------+
only showing top 5 rows



In [None]:
# Parse the alias dataset too: each line is "<artist id><whitespace><alias id>".

# Raw string: '\s' inside a normal string literal is an invalid escape
# sequence that newer Python versions warn about.
alias_fields = split(col('value'), r'\s+')

artist_alias = (
    raw_artist_alias
    .withColumn('artist', alias_fields.getItem(0).cast(IntegerType()))
    # 'alias' is kept as a string column.
    .withColumn('alias', alias_fields.getItem(1).cast(StringType()))
    .drop('value')
)

artist_alias.show(5)

+--------+-------+
|  artist|  alias|
+--------+-------+
| 1092764|1000311|
| 1095122|1000557|
| 6708070|1007267|
|10088054|1042317|
| 1195917|1042317|
+--------+-------+
only showing top 5 rows



In [None]:
# Sanity-check one alias pair: both IDs should turn out to be the same artist
# written in two different ways.
alias_pair = [1092764, 1000311]
artist_by_id.filter(col('id').isin(alias_pair)).show()

+-------+--------------+
|     id|          name|
+-------+--------------+
|1000311| Steve Winwood|
|1092764|Winwood, Steve|
+-------+--------------+



In [None]:
# One Transformation
# The aliases dataset should be applied to convert all artist IDs
# to a canonical ID, if a different canonical ID exists.

from pyspark.sql.functions import broadcast, when

# The left join keeps every play record; 'alias' is null where no alias exists.
# broadcast() hints Spark to ship the small alias table to every executor.
train_data = user_artist_df.join(
    broadcast(artist_alias), 'artist', how='left'
)

In [None]:
# Prefer the alias ID when the join found one, otherwise keep the original
# artist ID; then restore the integer type and discard the helper column.
canonical_artist = when(col('alias').isNull(), col('artist')).otherwise(col('alias'))

train_data = (
    train_data
    .withColumn('artist', canonical_artist)
    .withColumn('artist', col('artist').cast(IntegerType()))
    .drop('alias')
)

In [None]:
# Mark the DataFrame for caching; nothing is stored until an action runs.
train_data.cache()

DataFrame[artist: int, user: int, count: int]

In [None]:
# An action that goes through every record, which fills the cache.
train_data.count()

24296858

In [None]:
# We broadcast the artist_alias DataFrame created earlier. 
# This makes Spark send and hold in memory just one copy 
# for each executor in the cluster

# As a rule of thumb, it’s helpful to broadcast a significantly 
#smaller dataset when performing a join with a very big dataset

#The call to cache suggests to Spark that this DataFrame should be 
# temporarily stored after being computed and, furthermore, kept in 
# memory in the cluster. This is helpful because the ALS algorithm is 
# iterative and will typically need to access this data 10 times or more.

# When you use cache or persist, the DataFrame is not fully cached 
# until you trigger an action that goes through every record (e.g., count).

In [None]:
# Building a First Model

In [None]:
from pyspark.ml.recommendation import ALS

# implicitPrefs=True selects the implicit-feedback variant of ALS, with the
# play count used as the rating column.
als = ALS(
    userCol='user',
    itemCol='artist',
    ratingCol='count',
    implicitPrefs=True,
    rank=10,
    maxIter=5,
    regParam=0.1,
    alpha=1.0,
    seed=0,
)
model = als.fit(train_data)

In [None]:
# To inspect the learned feature vectors, display a single row of the user
# factors without truncating the wide vector column.
model.userFactors.show(1, truncate = False)

In [None]:
# Spot Checking Recommendations

In [None]:
# Take, for example, user 2093760. First, look at this user's plays to get a
# sense of their tastes: extract the IDs of the artists the user has listened
# to and print the artists' names.

user_id = 2093760

# Pull this user's artist IDs back to the driver as a plain Python list.
played_rows = (
    train_data
    .filter(train_data.user == user_id)
    .select("artist")
    .collect()
)
existing_artist_ids = [row[0] for row in played_rows]

# Show the names of those artists.
artist_by_id.filter(col('id').isin(existing_artist_ids)).show()

In [None]:
# Now make recommendations: the top 5 artists for the chosen user.

user_subset = (
    train_data
    .select('user')
    .where(col('user') == user_id)
    .distinct()
)
top_predictions = model.recommendForUserSubset(user_subset, 5)
top_predictions.show()

In [None]:
# The resulting recommendations contain lists made up of an artist ID and,
# of course, a "prediction". For this type of ALS algorithm, the prediction
# is an opaque value normally between 0 and 1, where higher values mean a
# better recommendation. It is not a probability but can be thought of as
# an estimate of a 0/1 value indicating whether the user won't or will
# interact with the artist, respectively.

top_predictions_pandas = top_predictions.toPandas()
# Print the variable defined on the line above (the name was previously
# misspelled, which raised a NameError).
print(top_predictions_pandas)

In [None]:
# Row 0 holds the recommendation list; the first field of each entry is the
# artist ID.
first_user_recs = top_predictions_pandas.recommendations[0]
recommended_artist_ids = [rec[0] for rec in first_user_recs]

In [None]:
# Look up the names of the recommended artists.
artist_by_id.filter(col('id').isin(recommended_artist_ids)).show()

In [None]:
# Evaluating Recommendation Quality
# Train Test Split 

In [None]:
# area under the curve AUC may be viewed as the probability that a 
# randomly chosen good recommendation ranks above a randomly chosen
# bad recommendation.

In [None]:
# Computing AUC

In [None]:
def area_under_curve(positive_data, b_all_artist_IDs, predict_function):
    """Return the mean per-user AUC of a recommender on held-out data.

    For every user, each held-out ("positive") artist is paired with randomly
    sampled artists the user did not play ("negatives"). The per-user AUC is
    the fraction of pairs in which the positive artist scores higher, and the
    result is the average of that fraction over all users.

    Args:
        positive_data: DataFrame with at least `user` and `artist` columns.
        b_all_artist_IDs: SparkContext broadcast variable whose `.value` is a
            list of all known artist IDs.
        predict_function: callable taking a DataFrame with `user` and `artist`
            columns and returning it with an added `prediction` column
            (for example `model.transform`).

    Returns:
        The mean AUC as a float, or None when no positive/negative pair
        could be formed.
    """
    import random

    from pyspark.sql import functions as F

    positives = positive_data.select("user", "artist")
    positive_predictions = predict_function(positives) \
        .withColumnRenamed("prediction", "positivePrediction")

    def sample_negatives(user_and_artists):
        """Pick as many non-played artists for a user as the user has positives."""
        user_id, played = user_and_artists
        played_ids = set(played)
        all_ids = b_all_artist_IDs.value
        # Seed per user so repeated evaluations draw the same negatives.
        rng = random.Random(user_id)
        negatives = []
        attempts = 0
        # Bound the attempts by the catalogue size to rule out an infinite
        # loop when a user has played (nearly) every artist.
        while attempts < len(all_ids) and len(negatives) < len(played_ids):
            candidate = all_ids[rng.randrange(len(all_ids))]
            if candidate not in played_ids:
                negatives.append(candidate)
            attempts += 1
        return [(user_id, artist_id) for artist_id in negatives]

    # An explicit schema keeps the IDs as ints and avoids inferring a schema
    # from the data.
    negative_data = positives.rdd \
        .map(lambda row: (row[0], row[1])) \
        .groupByKey() \
        .flatMap(sample_negatives) \
        .toDF("user int, artist int")

    negative_predictions = predict_function(negative_data) \
        .withColumnRenamed("prediction", "negativePrediction")

    # Joining on user alone yields every positive/negative pairing per user.
    # NOTE(review): with ALS's default cold-start strategy, unseen users or
    # artists get NaN predictions, which take part in the comparison below.
    # Confirm whether they should be dropped first.
    joined_predictions = positive_predictions \
        .join(negative_predictions, "user") \
        .select("user", "positivePrediction", "negativePrediction") \
        .cache()

    all_counts = joined_predictions \
        .groupBy("user") \
        .agg(F.count(F.lit(1)).alias("total"))

    correct_counts = joined_predictions \
        .filter(F.col("positivePrediction") > F.col("negativePrediction")) \
        .groupBy("user") \
        .agg(F.count("user").alias("correct"))

    # Users with no correctly ordered pair are missing from correct_counts,
    # so the left join plus coalesce counts them as zero.
    mean_auc = all_counts \
        .join(correct_counts, ["user"], "left_outer") \
        .select((F.coalesce(F.col("correct"), F.lit(0)) / F.col("total")).alias("auc")) \
        .agg(F.mean("auc")) \
        .first()[0]

    joined_predictions.unpersist()

    return mean_auc

In [None]:
# Rebuild the canonicalised play data from the parsed records: left-join the
# alias table, take the alias ID where one exists, cast back to int, and drop
# the helper column.
alias_or_artist = when(col('alias').isNull(), col('artist')).otherwise(col('alias'))

all_data = (
    user_artist_df
    .join(broadcast(artist_alias), 'artist', how='left')
    .withColumn('artist', alias_or_artist)
    .withColumn('artist', col('artist').cast(IntegerType()))
    .drop('alias')
)

In [None]:
# Split the records roughly 90/10 into a training part and a held-out part;
# the fixed seed keeps the split reproducible. Both parts are marked for caching.
train_data, cv_data = all_data.randomSplit([0.9, 0.1], seed=54321)
train_data.cache()
cv_data.cache()

In [None]:
# Collect the distinct artist IDs to the driver and share them with the
# executors as a SparkContext broadcast variable.
# Note: pyspark.sql.functions.broadcast is a DataFrame join hint and cannot
# wrap a plain Python value, so SparkContext.broadcast is used here.
all_artist_ids = [row[0] for row in all_data.select("artist").distinct().collect()]
b_all_artist_ids = spark.sparkContext.broadcast(all_artist_ids)

In [None]:
# Refit the recommender with the same hyperparameters, this time on the
# training part of the split only.
als = ALS(
    userCol='user',
    itemCol='artist',
    ratingCol='count',
    implicitPrefs=True,
    rank=10,
    maxIter=5,
    regParam=0.1,
    alpha=1.0,
    seed=0,
)
model = als.fit(train_data)

In [None]:
# Score the model on the held-out split.
area_under_curve(cv_data, b_all_artist_ids, model.transform)

In [None]:
# Pick up to 100 distinct users.
some_users = all_data.select("user").distinct().limit(100)

In [None]:
# Top-5 recommendations for each sampled user, printed as
# "<user id> -> <artist id>, <artist id>, ...".
# This cell previously held Scala code, which is a syntax error in a Python kernel.
some_recommendations = model.recommendForUserSubset(some_users, 5)
for rec_row in some_recommendations.collect():
    # Each recommendation entry is a struct whose item field is named after
    # the model's itemCol ('artist').
    recommended_artists = [rec['artist'] for rec in rec_row['recommendations']]
    print(f"{rec_row['user']} -> {', '.join(str(a) for a in recommended_artists)}")