## Content-Based Filtering with PySpark

In this section, we explore content-based filtering using the built in PySpark. We use a dataset which includes information about music artists, their associated tags, and how users interacted with them. We aim to generate artist recommendations for users based on the tags associated with artists they have interacted with. This involves loading the data, creating profiles for the artists, vectorising the tags, calculating similarities and finally generating recommendations based on those similarities.

# Chapter 0: Importing Required Libraries

The key libraries used are from PySpark, which is ideal for handling large datasets efficiently.

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.ml.feature import CountVectorizer, IDF
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.stat import Correlation
from pyspark.mllib.linalg.distributed import RowMatrix

# Chapter 1: Loading Our Data

First we initialise our Spark session. Initialising a Spark session is required for any operation in PySpark. This session allows us to leverage PySpark's machine learning capabalities. 

In [2]:
spark = SparkSession.builder \
    .appName("Content-Based Filtering") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/11/22 12:32:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Then, we define the Base Directory for our dataset files. This allows us to load the data for further processing. 

In [3]:
base_directory = "/Users/harrywilson/Desktop/DataScienceToolbox/Assessment2Data"

Finally, we load our datasets. This includes information about artists, their tags and user interactions. In this step, we also read our data into Spark DataFrames.

In [4]:
# Function to load a .dat file as a DataFrame
def load_data(filename):
    file_path = f"{base_directory}/{filename}"
    return spark.read.csv(file_path, sep="\t", header=True, inferSchema=True)

# Load datasets
user_artists_df = load_data("user_artists.dat")
artists_df = load_data("artists.dat")
tags_df = load_data("tags.dat")
user_taggedartists_df = load_data("user_taggedartists.dat")

# Display a few rows from the datasets to ensure everything is working
print("User-Artists Dataset:")
user_artists_df.show(5)

print("Artists Dataset:")
artists_df.show(5)

print("Tags Dataset:")
tags_df.show(5)

print("User-Tagged Artists Dataset:")
user_taggedartists_df.show(5)


                                                                                

User-Artists Dataset:
+------+--------+------+
|userID|artistID|weight|
+------+--------+------+
|     2|      51| 13883|
|     2|      52| 11690|
|     2|      53| 11351|
|     2|      54| 10300|
|     2|      55|  8983|
+------+--------+------+
only showing top 5 rows

Artists Dataset:
+---+-----------------+--------------------+--------------------+
| id|             name|                 url|          pictureURL|
+---+-----------------+--------------------+--------------------+
|  1|     MALICE MIZER|http://www.last.f...|http://userserve-...|
|  2|  Diary of Dreams|http://www.last.f...|http://userserve-...|
|  3|Carpathian Forest|http://www.last.f...|http://userserve-...|
|  4|     Moi dix Mois|http://www.last.f...|http://userserve-...|
|  5|      Bella Morte|http://www.last.f...|http://userserve-...|
+---+-----------------+--------------------+--------------------+
only showing top 5 rows

Tags Dataset:
+-----+-----------------+
|tagID|         tagValue|
+-----+-----------------+


# Chapter 2: Creating Artist Profiles

This first step involves creating profiles, based on the tags associated with each artist. We aggregate the tags to form this profile.

In [5]:
# Join user_taggedartists with tags for tag information
artist_tags_df = user_taggedartists_df.join(tags_df, on="tagID", how="inner")

# Join artist tags with artists to get artist details and tag names
artist_tags_info_df = artist_tags_df.join(
    artists_df, artist_tags_df.artistID == artists_df.id
).select(
    artist_tags_df["artistID"],
    artists_df["name"].alias("artist_name"),
    artist_tags_df["tagValue"].alias("tag")
)

# Aggregate tags for each artist into a list and remove duplicates
artist_profiles_df = artist_tags_info_df.groupBy("artistID", "artist_name") \
    .agg(F.collect_set("tag").alias("tags"))

# Display artist profiles
artist_profiles_df.show(5, truncate=False)

[Stage 14:>                                                         (0 + 1) / 1]

+--------+-----------------+---------------------------------------------------------------------------------------------------------------+
|artistID|artist_name      |tags                                                                                                           |
+--------+-----------------+---------------------------------------------------------------------------------------------------------------+
|1       |MALICE MIZER     |[j-rock, weeabo, jrock, visual kei, better than lady gaga, gothic, japanese]                                   |
|2       |Diary of Dreams  |[darkwave, ambient, true goth emo, gothic rock, gothic, industrial, dark, german, seen live, electronic, vocal]|
|3       |Carpathian Forest|[norsk arysk metal, very kvlt, black metal, saxophones, true norwegian black metal, norwegian black metal]     |
|4       |Moi dix Mois     |[j-rock, rock, visual kei, bazarov, gothic, metal, gothic japanese, gothic metal, japanese]                    |
|5       |Bel

                                                                                

To apply similarity calculation, which is vital for content based filtering, we need to convert the tags into numerical interpretations. Therefore, we use CountVectorizer, this creates a vector of word counts for each artist's tags.

In [6]:
# Ensure tag_text is an array of strings
tags_df = artist_profiles_df.select(
    F.col("artistID"),
    F.col("artist_name"),
    F.col("tags").alias("tag_text")
)

# Vectorise tags
vectoriser = CountVectorizer(inputCol="tag_text", outputCol="raw_features")
vectorised_model = vectoriser.fit(tags_df)
vectorised_df = vectorised_model.transform(tags_df)

# Display vectorized features
vectorised_df.show(5, truncate=False)


[Stage 28:>                                                         (0 + 1) / 1]

+--------+-----------------+---------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+
|artistID|artist_name      |tag_text                                                                                                       |raw_features                                                                               |
+--------+-----------------+---------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+
|1       |MALICE MIZER     |[j-rock, weeabo, jrock, visual kei, better than lady gaga, gothic, japanese]                                   |(9718,[92,95,220,255,427,760,2520],[1.0,1.0,1.0,1.0,1.0,1.0,1.0])                          |
|2       |Diary of Dreams  |[darkwave, ambient, true goth emo, gothi

                                                                                

For the raw_features column, we have the format (total_features, [indices], [values]). Where:
total_features: total number of unique tags in the dataset
indices: Indices of non-zero features (tags present for the artist)
values: Corresponding count of each tag (as we got rid of duplicates, these values are all 1)


Next, we compute the Term Frequency - Inverse Document Frequency (TF-IDF). This assesses the importance of each tag within an artist's profile.

In [7]:
# Compute TF-IDF
idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(vectorised_df)
tfidf_df = idf_model.transform(vectorised_df)

# Display TF-IDF features
tfidf_df.show(5, truncate=False)

[Stage 38:>                                                         (0 + 1) / 1]

+--------+-----------------+---------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|artistID|artist_name      |tag_text                                                                                                       |raw_features                                                                               |features                                                                                                                                                                                                                                                |
+--------+-----------------+--------

                                                                                

Now, we normalise the feature vectors. Here, we are ensuring all features are on the same scale. We use MinMaxScaler to normalise the tag vectors. This is vital for similarity calculation.

In [8]:

# Normalise the feature vectors
scaler = MinMaxScaler(inputCol="features", outputCol="scaled_features")
scaler_model = scaler.fit(tfidf_df)
scaled_tfidf_df = scaler_model.transform(tfidf_df)

# Display scaled features
scaled_tfidf_df.show(5, truncate=False)


[Stage 51:>                                                         (0 + 1) / 1]

+--------+-----------------+---------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------+
|artistID|artist_name      |tag_text                                                                                                       |raw_features                                                                               |features                                                                                                                                                            

                                                                                

# Chapter : Similarities Between Artists

We can compute cosine similarity between artists. This is done using RowMatrix and columnSimilarities.

In [9]:

# Convert the scaled features column to RDD of dense vectors
row_matrix_rdd = scaled_tfidf_df.select("scaled_features").rdd.map(lambda row: row.scaled_features.toArray())

# Create a RowMatrix from the RDD
row_matrix = RowMatrix(row_matrix_rdd)

# Compute pairwise cosine similarities
similarities = row_matrix.columnSimilarities()

# Convert the similarities result back to a DataFrame for better readability
similarities_df = similarities.entries.toDF(["artistID_1", "artistID_2", "similarity"])

# Show the top 5 results
similarities_df.show(5, truncate=False)


24/11/22 12:34:14 WARN Executor: Managed memory leak detected; size = 79008692 bytes, task 0.0 in stage 63.0 (TID 49)
[Stage 66:>                                                         (0 + 1) / 1]

+----------+----------+--------------------+
|artistID_1|artistID_2|similarity          |
+----------+----------+--------------------+
|261       |1529      |0.1270001270001905  |
|65        |673       |0.02369561801910072 |
|742       |758       |0.1739130434782609  |
|233       |1265      |0.03603749850782236 |
|65        |2431      |0.030206104666508846|
+----------+----------+--------------------+
only showing top 5 rows



24/11/22 12:34:19 WARN Executor: Managed memory leak detected; size = 79008692 bytes, task 0.0 in stage 66.0 (TID 50)
                                                                                

In [10]:
# Join user interactions with artist similarity data
user_artist_df = user_artists_df.join(artists_df, user_artists_df.artistID == artists_df.id).select(
    user_artists_df["userID"], 
    user_artists_df["artistID"]
)

# Join user interactions with artist similarity data
user_recommendations = user_artist_df.join(
    similarities_df,
    user_artist_df.artistID == similarities_df.artistID_1
).groupBy("userID", "artistID_2") \
 .agg(F.mean("similarity").alias("avg_similarity")) \
 .orderBy("userID", "avg_similarity", ascending=False)

# Display recommendations
user_recommendations.show(50, truncate=False)

24/11/22 12:34:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 12:34:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 12:34:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 12:34:37 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 12:34:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 12:34:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 12:34:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 12:34:41 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
24/11/22 12:34:45 WARN RowBasedKeyValueBatch: Calling spill() on

+------+----------+------------------+
|userID|artistID_2|avg_similarity    |
+------+----------+------------------+
|2100  |6333      |1.0               |
|2100  |6864      |1.0               |
|2100  |8703      |1.0               |
|2100  |4932      |0.7071067811865475|
|2100  |6826      |0.7071067811865475|
|2100  |6511      |0.7071067811865475|
|2100  |5856      |0.7071067811865475|
|2100  |7962      |0.7071067811865475|
|2100  |4889      |0.7071067811865475|
|2100  |9109      |0.7071067811865475|
|2100  |4307      |0.7071067811865475|
|2100  |9166      |0.7071067811865475|
|2100  |5020      |0.7071067811865475|
|2100  |6874      |0.7071067811865475|
|2100  |5142      |0.7071067811865475|
|2100  |4993      |0.7071067811865475|
|2100  |5227      |0.7071067811865475|
|2100  |7001      |0.7071067811865475|
|2100  |5691      |0.7071067811865475|
|2100  |5531      |0.7071067811865475|
|2100  |5780      |0.7071067811865475|
|2100  |7663      |0.7071067811865475|
|2100  |6506      |0.7071

                                                                                

In [11]:
# Input a user ID
input_user_id = 5

# Get artists the user has already interacted with
interacted_artists = user_artists_df.filter(F.col("userID") == input_user_id).select("artistID")

# Collect artistIDs as a list for filtering
interacted_artist_ids = [row["artistID"] for row in interacted_artists.collect()]

# Generate recommendations for the user
user_recommendations_filtered = (
    user_artist_df.filter(F.col("userID") == input_user_id)
    .join(similarities_df, user_artist_df.artistID == similarities_df.artistID_1)
    .filter(~F.col("artistID_2").isin(interacted_artist_ids))  # Exclude already interacted artists
    .groupBy("userID", "artistID_2")
    .agg(F.mean("similarity").alias("avg_similarity"))
    .orderBy(F.col("avg_similarity").desc())  # Ensure descending order
)

# Add artist names to the recommendations
user_recommendations_with_names = (
    user_recommendations_filtered
    .join(artists_df, user_recommendations_filtered.artistID_2 == artists_df.id, how="inner")
    .select(
        "userID",
        "artistID_2",
        F.col("name").alias("artist_name"),
        "avg_similarity"
    )
    .orderBy(F.col("avg_similarity").desc())  # Reconfirm descending order for display
)

# Display the recommendations with artist names
user_recommendations_with_names.show(10, truncate=False)


[Stage 86:>                                                         (0 + 1) / 1]

+------+----------+-----------------------+-------------------+
|userID|artistID_2|artist_name            |avg_similarity     |
+------+----------+-----------------------+-------------------+
|5     |995       |China Crisis           |0.43643578047198495|
|5     |93        |Jean-Michel Jarre      |0.4220981192619431 |
|5     |1129      |Villa cariño           |0.4163331998932266 |
|5     |1130      |Ludwig van Beethoven   |0.4163331998932266 |
|5     |908       |Vanessa Hudgens        |0.3920784235278428 |
|5     |1283      |Blue Planet Corporation|0.3829708431025352 |
|5     |1324      |A7ie                   |0.3829708431025352 |
|5     |1196      |Suicide Silence        |0.36666666666666664|
|5     |1396      |Digitalism             |0.3651483716701107 |
|5     |1331      |Accessory              |0.34815531191139565|
+------+----------+-----------------------+-------------------+
only showing top 10 rows



                                                                                