# Setting up a Spark session and importing the necessary libraries, including ALS from pyspark.ml.recommendation.

In [3]:
from pyspark.ml.recommendation import ALS
from pyspark.sql import SparkSession
appName="Collaborative Filtering with PySpark"

In [4]:
spark = SparkSession.builder.appName(appName).getOrCreate()

In [5]:
sc = spark.sparkContext

In [6]:
from pyspark.sql.types import StructType,StructField,IntegerType,StringType,LongType
from pyspark.sql.functions import col

# Defining schemas for the datasets: one for artists, one for user-artist interactions, and one for artist aliases.

In [10]:
schema_artist = StructType([StructField("artistId",StringType(),True),StructField("artistName",StringType(),True)])
schema_user_artist = StructType([StructField("userId",StringType(),True),StructField("artistId",StringType(),True),StructField("playCount",StringType(),True)])
schema_alias = StructType([StructField("badId",StringType(),True),StructField("goodId",StringType(),True)])


# Loading datasets from text files into DataFrames. Applying a parser function to handle delimiters and convert IDs to integers where needed. Mapping any alias IDs to canonical artist IDs to ensure data consistency.

In [13]:
def parser(s, delimeters=" ", to_int=None):
    s = s.split(delimeters)
    if to_int:
        return tuple([int(s[i]) if i in to_int else s[i] for i in range(len(s))])
    return tuple(s)

artistData = sc.textFile("artist_data_small.txt").map(lambda x: parser(x,'\t',[0]))

artistAlias = sc.textFile("artist_alias_small.txt").map(lambda x: parser(x,'\t', [0,1]))

artistAliasMap = artistAlias.collectAsMap()
userArtistData = sc.textFile("user_artist_data_small.txt").map(lambda x: parser(x,' ',[0,1,2]))
userArtistData = userArtistData.map(lambda x: (x[0], artistAliasMap.get(x[1], x[1]), x[2]))

In [14]:
print (artistData)

PythonRDD[29] at RDD at PythonRDD.scala:53


# Creating DataFrames for artists, user-artist interactions, and alias mappings. Establishing aliases for convenience in later joins.

In [15]:
artist_df = spark.read.option("sep","\t").csv("artist_data_small.txt", schema=schema_artist)
user_artist_df = spark.read.option("sep"," ").csv("user_artist_data_small.txt", schema=schema_user_artist)
alias_df = spark.read.option("sep","\t").csv("artist_alias_small.txt", schema=schema_alias)


In [16]:
ua = user_artist_df.alias('ua')
ub = artist_df.alias('ub')

# Splitting the user-artist interactions into training and testing sets using an 80/20 ratio. Casting userId and artistId to integers and playCount to floats for ALS compatibility.

In [17]:
(training, test) = ua.randomSplit([0.8, 0.2])

In [19]:
training.printSchema()

root
 |-- userId: string (nullable = true)
 |-- artistId: string (nullable = true)
 |-- playCount: string (nullable = true)



In [20]:
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, FloatType

training = training.select(
    col("userId").cast(IntegerType()),
    col("artistId").cast(IntegerType()),
    col("playCount").cast(FloatType())
)

test = test.select(
    col("userId").cast(IntegerType()),
    col("artistId").cast(IntegerType()),
    col("playCount").cast(FloatType())
)


# Initializing the ALS model with implicit feedback, specifying the user, item, and rating columns. Setting the cold start strategy to drop unknown users or items during prediction.

In [21]:
als = ALS(
    maxIter=5,
    implicitPrefs=True,
    userCol="userId",
    itemCol="artistId",
    ratingCol="playCount",
    coldStartStrategy="drop"
)

model = als.fit(training)
predictions = model.transform(test)
predictions.show()


+-------+--------+---------+-----------+
| userId|artistId|playCount| prediction|
+-------+--------+---------+-----------+
|2007381| 1000010|    805.0| -0.5076301|
|2007381| 1000130|      3.0|-0.37181824|
|2007381| 1000183|    133.0|   0.647008|
|2007381| 1000199|    175.0|  0.5723726|
|2007381| 1000243|     16.0| -0.7427173|
|2007381| 1000602|     96.0| 0.24970129|
|2007381| 1000698|    216.0|-0.07490238|
|2007381| 1000982|   1133.0| -0.4148789|
|2007381| 1001048|   1796.0|  0.7960715|
|2007381| 1001130|     52.0|0.008230289|
|2007381| 1001478|     97.0|  0.3162734|
|2007381| 1001597|     77.0|  1.2141029|
|2007381| 1002178|    109.0| 0.22540614|
|2007381| 1002576|    378.0|-0.20048225|
|2007381| 1002862|    964.0| 0.18182719|
|2007381| 1003447|     96.0| 0.48031384|
|2007381| 1003469|    271.0|0.074024215|
|2007381| 1003552|   1965.0| -0.1754109|
|2007381| 1003729|    535.0| 0.81503373|
|2007381| 1004055|     66.0| 0.15560623|
+-------+--------+---------+-----------+
only showing top

# Demonstrating the final recommendations for a sample user, showing artists the model predicts they might enjoy, which may differ from their current top plays

In [22]:
def currentLikes(ua,ub,userId,limit):
 df = ua.join(ub,ua.artistId==ub.artistId)\
 .filter(ua.userId==userId)\
 .sort(ua.playCount.desc())\
 .select(ua.userId,ua.playCount,ub.artistName)\
 .limit(limit)
 return df

# display top 10 liked artists for user 2062243
currentLikes(ua,ub,2062243,10).show(truncate=False)

+-------+---------+----------------------------+
|userId |playCount|artistName                  |
+-------+---------+----------------------------+
|2062243|99       |morgan heritage             |
|2062243|98       |Mr C The Slide Man          |
|2062243|98       |Music 205lub                |
|2062243|98       |The Moody Blues             |
|2062243|98       |Mountain                    |
|2062243|98       |La Bouche                   |
|2062243|98       |Moxy Fr√ºvous                |
|2062243|98       |Music 205olf                |
|2062243|98       |Music 205n                  |
|2062243|98       |Music 205tills, Nash & Young|
+-------+---------+----------------------------+



In [39]:
from pyspark.sql import Row
from pyspark.sql.functions import col

def recommendedArtists(userId, limit):
    # Getting  top recommended artists for the user
    test = model.recommendForAllUsers(limit) \
                .filter(col('userId') == userId) \
                .select("recommendations") \
                .collect()

    if not test:  # handle case where user has no recommendations
        return None

    # Extracting artistid
    topArtists = [item.artistId for item in test[0][0]]  # list of ints

    # Create dataframe from list
    artists_df = spark.createDataFrame([Row(artistId=a) for a in topArtists])

    # Joining artist datafrmae to get names
    final = artists_df.join(ub, artists_df.artistId == ub.artistId) \
                      .select(ub.artistId, ub.artistName)

    return final


In [40]:
recommendedArtists(2010581, 10).show(truncate=False)


+--------+----------------------+
|artistId|artistName            |
+--------+----------------------+
|1002840 |The Blood Brothers    |
|1010373 |Explosions in the Sky |
|2178    |The Velvet Underground|
|1000416 |Ramones               |
|2745    |Ladytron              |
|1010646 |!!!                   |
|1233610 |The Rapture           |
|5771    |Stiff Little Fingers  |
|1991    |Cake                  |
|1252    |Travis                |
+--------+----------------------+

