# Load Data

In [14]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
# Initialize Spark session
spark = SparkSession.builder.appName("ReadUserArtistData").getOrCreate()

# Read the file into a Spark DataFrame
file_path = "data/dataset-problemset5-ex7/user_artist_data_small.txt"
df = spark.read.csv(file_path, sep=' ', inferSchema=True, header=False)
df = df.toDF("user_id", "artist_id", "playcount")

df_alias = spark.read.csv("data/dataset-problemset5-ex7/artist_alias_small.txt", sep='\t', inferSchema=True, header=False)
df_alias = df_alias.toDF("bad_id", "good_id")

df_names = spark.read.csv("data/dataset-problemset5-ex7/artist_data_small.txt", sep='\t', inferSchema=True, header=False)
df_names = df_names.toDF("artist_id", "artist_name")

df.show()
print(f"Number of rows in df: {df.count()}")
df.printSchema()
df_alias.show()
df_names.show()


+-------+---------+---------+
|user_id|artist_id|playcount|
+-------+---------+---------+
|1059637|  1000010|      238|
|1059637|  1000049|        1|
|1059637|  1000056|        1|
|1059637|  1000062|       11|
|1059637|  1000094|        1|
|1059637|  1000112|      423|
|1059637|  1000113|        5|
|1059637|  1000114|        2|
|1059637|  1000123|        2|
|1059637|  1000130|    19129|
|1059637|  1000139|        4|
|1059637|  1000241|      188|
|1059637|  1000263|      180|
|1059637|  1000289|        2|
|1059637|  1000305|        1|
|1059637|  1000320|       21|
|1059637|  1000340|        1|
|1059637|  1000427|       20|
|1059637|  1000428|       12|
|1059637|  1000433|       10|
+-------+---------+---------+
only showing top 20 rows

Number of rows in df: 49481
root
 |-- user_id: integer (nullable = true)
 |-- artist_id: integer (nullable = true)
 |-- playcount: integer (nullable = true)

+-------+-------+
| bad_id|good_id|
+-------+-------+
|1027859|1252408|
|1017615|    668|
|67458

# Functions

In [None]:
def clean_df(df):
    df_cleaned = (df.join(df_alias, df.artist_id == df_alias.bad_id, "left")
                   .withColumn("actual_id", F.coalesce(df_alias.good_id, df.artist_id))
                   .drop("bad_id", "artist_id", "good_id")
                   .withColumnRenamed("actual_id", "artist_id")
        )
    return df_cleaned
# Test
df.filter(F.col("artist_id") == 1027859).show()
df.filter(F.col("artist_id") == 1252408).show()
df.groupBy().agg(
    F.count("playcount"),
    F.countDistinct("artist_id"),
    F.countDistinct("user_id")
).show()
cleaned_df = clean_df(df)
cleaned_df.filter(F.col("artist_id") == 1027859).show()
cleaned_df.filter(F.col("artist_id") == 1252408).show()
cleaned_df.groupBy().agg(
    F.count("playcount"),
    F.countDistinct("artist_id"),
    F.countDistinct("user_id")
).show()
num_distinct_artists = cleaned_df.select("artist_id").distinct().count()
print(f"Number of distinct artist_ids: {num_distinct_artists}")

+-------+---------+---------+
|user_id|artist_id|playcount|
+-------+---------+---------+
|1072684|  1027859|       31|
+-------+---------+---------+

+-------+---------+---------+
|user_id|artist_id|playcount|
+-------+---------+---------+
|1059334|  1252408|       10|
|1072684|  1252408|       12|
+-------+---------+---------+



In [None]:
def calc_pearson_sim(df):
    df_avg = df.groupBy("user_id").agg((F.sum("playcount") / num_distinct_artists).alias("avg_playcount"))

    # Calculate the deviation from the average for each user
    df_dev = df.join(df_avg, "user_id", "left").withColumn("deviation", F.col("playcount") - F.col("avg_playcount"))
    # Calculate the norm for each user
    df_norm = df_dev.groupBy("user_id").agg(F.sqrt(F.sum(F.pow(F.col("deviation"), 2))).alias("norm"))
    # Calculate the dot product between users
    # Step 1: Self-join on artist_id to find user pairs
    user_artist_df = (
        df_dev.alias("a")
        .join(df_dev.alias("b"), on="artist_id")
        .where(F.col("a.user_id") < F.col("b.user_id"))  # Ensure unique user pairs
        .select(
            F.col("a.user_id").alias("user_id_1"),
            F.col("b.user_id").alias("user_id_2"),
            (F.col("a.playcount") * F.col("b.playcount")).alias("dot_product")
        )
    )

    # Step 2: Aggregate dot product for each user pair
    user_dot_product = (
        user_artist_df.groupBy("user_id_1", "user_id_2")
        .agg(F.sum("dot_product").alias("dot"))
    )
    # Calculate the similarity between users
    df_similarity = user_dot_product.join(df_norm, F.col("user_id_1") == F.col("user_id")).drop("user_id").withColumnRenamed("norm", "norm_1")
    df_similarity = df_similarity.join(df_norm, F.col("user_id_2") == F.col("user_id")).drop("user_id")
    df_similarity = df_similarity.toDF("user_id_1", "user_id_2", "dot", "norm_1", "norm_2")

    df_similarity = df_similarity.withColumn("similarity", F.col("dot") / (F.col("norm_1") * F.col("norm_2")))

    return df_similarity

test = calc_pearson_sim(cleaned_df)
test.orderBy("similarity", ascending= True).show()
test.groupBy().agg(
    F.max(F.col("similarity")),
    F.min(F.col("similarity")),
    F.avg(F.col("similarity"))
).show


+---------+---------+----+------------------+------------------+--------------------+
|user_id_1|user_id_2| dot|            norm_1|            norm_2|          similarity|
+---------+---------+----+------------------+------------------+--------------------+
|  1021501|  2064012|  14|15172.362307216808|440754.73664322274|2.093523511368315...|
|  1021501|  1059637|  57|15172.362307216808| 460903.8186157603|8.151008471758954E-9|
|  1029563|  2017397|  41|10483.659436474723|  42752.9135384926|9.147559348816185E-8|
|  2023742|  2064012|3108| 19610.64330866105|440754.73664322274|3.595772290757283...|
|  1021501|  1029563|  63|15172.362307216808|10483.659436474723|3.960722725450514E-7|
|  1059334|  2010581|  25| 3355.023415553227|14522.475528588566| 5.13102109044555E-7|
|  2010581|  2014936| 525|14522.475528588566|41019.732199988044|8.813041787413062E-7|
|  1059334|  2017397| 130| 3355.023415553227|  42752.9135384926|9.063210777646851E-7|
|  1021501|  1059245| 247|15172.362307216808|14396.363

<bound method DataFrame.show of DataFrame[max(similarity): double, min(similarity): double, avg(similarity): double]>

In [None]:
def get_top_k_similar_users(df_similarity, user_id, k):
    # Filter the dataframe to get similarities for the given user
    df_user_similarity = df_similarity.filter((df_similarity.user_id_1 == user_id) | (df_similarity.user_id_2 == user_id))

    # Select the relevant columns and order by similarity in descending order
    df_user_similarity = df_user_similarity.withColumn("similar_user_id",
                                                       F.when(df_similarity.user_id_1 == user_id, df_similarity.user_id_2)
                                                       .otherwise(df_similarity.user_id_1))
    df_user_similarity = df_user_similarity.select("similar_user_id", "similarity").orderBy(F.desc("similarity"))

    top_k_similar_users = df_user_similarity.limit(k)

    return top_k_similar_users


In [None]:
def create_df_utility(df_clean):
    df_utility = df_clean.groupBy("artist_id").pivot("user_id").agg(F.first("playcount")).fillna(0)
    return df_utility

cleaned_df.show()
create_df_utility(cleaned_df).show()

+-------+---------+---------+
|user_id|playcount|artist_id|
+-------+---------+---------+
|1059637|      238|  1000010|
|1059637|        1|  1000049|
|1059637|        1|  1000056|
|1059637|       11|  1000062|
|1059637|        1|  1000094|
|1059637|      423|  1000112|
|1059637|        5|  1000113|
|1059637|        2|  1000114|
|1059637|        2|  1000123|
|1059637|    19129|  1000130|
|1059637|        4|  1000139|
|1059637|      188|  1000241|
|1059637|      180|  1000263|
|1059637|        2|  1000289|
|1059637|        1|  1000305|
|1059637|       21|  1000320|
|1059637|        1|  1000340|
|1059637|       20|  1000427|
|1059637|       12|  1000428|
|1059637|       10|  1000433|
+-------+---------+---------+
only showing top 20 rows



24/12/02 11:11:02 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|artist_id|1000647|1001440|1007308|1009943|1017610|1021501|1021940|1024631|1026084|1029563|1031009|1035511|1041919|1042223|1046559|1047812|1048402|1052054|1052461|1055449|1058890|1059245|1059334|1059637|1059765|1063644|1070641|1070932|1072684|1073421|1076906|2000668|2005710|2007381|2010008|2010581|2014936|2017397|2020513|2023686|2023742|2023977|2030069|2062243|2064012|2069337|2069889|2070757|2102019|2288164|
+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-----

In [None]:


def predict_artists(df_clean, user_id, k):
    df_similarity = calc_pearson_sim(df_clean)
    df_similarity.show()
    top_k_similar_users = get_top_k_similar_users(df_similarity, user_id, k)
    top_k_similar_users_dict = {row['similar_user_id']: row['similarity'] for row in top_k_similar_users.collect()}
    top_k_similar_users.show()
    df_utility = create_df_utility(df_clean)
    similar_user_ids = [row.similar_user_id for row in top_k_similar_users.collect()]
    selected_cols = [col for col in df_utility.columns if col == "artist_id" or int(col) in similar_user_ids]
    df_utility_selected = df_utility.select(selected_cols)
    df_utility_selected = df_utility_selected.withColumn("prediction", sum(df_utility_selected[col] * top_k_similar_users_dict[int(col)] for col in df_utility_selected.columns if col != "artist_id"))
    df_prediction = df_utility_selected.orderBy("prediction", ascending=False).limit(20)
    return df_prediction
df_cleaned = clean_df(df)
test = predict_artists(df_cleaned, 1059637, 5)

test.show()

+---------+---------+---------+------------------+------------------+--------------------+
|user_id_1|user_id_2|      dot|            norm_1|            norm_2|          similarity|
+---------+---------+---------+------------------+------------------+--------------------+
|  1059765|  2007381|   293918|10106.283090327554|10255.900178571626|0.002835704284334...|
|  1031009|  2007381|111411000|27630.654150418977|10255.900178571626|   0.393154381781434|
|  1046559|  2007381|   603012|14581.818850390428|10255.900178571626|0.004032185196518931|
|  1052461|  2007381| 35605044| 35869.42508749899|10255.900178571626| 0.09678617475449397|
|  1017610|  2007381|   692258| 4960.309336950912|10255.900178571626|0.013607722279122686|
|  1063644|  2007381|   345799|  8440.68773156796|10255.900178571626| 0.00399458911523962|
|  1026084|  2007381|   703003|  4176.01578173489|10255.900178571626| 0.01641425824583619|
|  1009943|  2007381|   993857|11378.845571471757|10255.900178571626| 0.00851631883669535|

In [None]:
#a) Populate a utility matrix from the data with user cols and artist rows
df_cleaned = clean_df(df)
df_cleaned.show()

df_utility = create_df_utility(df_cleaned)
df_utility.show()

+-------+---------+---------+
|user_id|playcount|artist_id|
+-------+---------+---------+
|1059637|      238|  1000010|
|1059637|        1|  1000049|
|1059637|        1|  1000056|
|1059637|       11|  1000062|
|1059637|        1|  1000094|
|1059637|      423|  1000112|
|1059637|        5|  1000113|
|1059637|        2|  1000114|
|1059637|        2|  1000123|
|1059637|    19129|  1000130|
|1059637|        4|  1000139|
|1059637|      188|  1000241|
|1059637|      180|  1000263|
|1059637|        2|  1000289|
|1059637|        1|  1000305|
|1059637|       21|  1000320|
|1059637|        1|  1000340|
|1059637|       20|  1000427|
|1059637|       12|  1000428|
|1059637|       10|  1000433|
+-------+---------+---------+
only showing top 20 rows

+---------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+-------+---

In [None]:
#b) Calculate the Pearson similarity between users
df_similarity = calc_pearson_sim(df_cleaned)
df_similarity.show()

df_pivot = df_similarity.groupBy("user_id_1").pivot("user_id_2").agg(F.first("similarity")).fillna(0)
df_pivot.show()

+---------+---------+---------+------------------+------------------+--------------------+
|user_id_1|user_id_2|      dot|            norm_1|            norm_2|          similarity|
+---------+---------+---------+------------------+------------------+--------------------+
|  1059765|  2007381|   293918|10106.283090327554|10255.900178571626|0.002835704284334...|
|  1031009|  2007381|111411000|27630.654150418977|10255.900178571626|   0.393154381781434|
|  1046559|  2007381|   603012|14581.818850390428|10255.900178571626|0.004032185196518931|
|  1052461|  2007381| 35605044| 35869.42508749899|10255.900178571626| 0.09678617475449397|
|  1017610|  2007381|   692258| 4960.309336950912|10255.900178571626|0.013607722279122686|
|  1063644|  2007381|   345799|  8440.68773156796|10255.900178571626| 0.00399458911523962|
|  1026084|  2007381|   703003|  4176.01578173489|10255.900178571626| 0.01641425824583619|
|  1009943|  2007381|   993857|11378.845571471757|10255.900178571626| 0.00851631883669535|

In [None]:
# c)
user_id = 1055449
k = 5
df_similarity.show()
top_k_similar_users = get_top_k_similar_users(df_similarity, user_id, k)
top_k_similar_users.show()

+---------+---------+---------+------------------+------------------+--------------------+
|user_id_1|user_id_2|      dot|            norm_1|            norm_2|          similarity|
+---------+---------+---------+------------------+------------------+--------------------+
|  1059765|  2007381|   293918|10106.283090327554|10255.900178571626|0.002835704284334...|
|  1031009|  2007381|111411000|27630.654150418977|10255.900178571626|   0.393154381781434|
|  1046559|  2007381|   603012|14581.818850390428|10255.900178571626|0.004032185196518931|
|  1052461|  2007381| 35605044| 35869.42508749899|10255.900178571626| 0.09678617475449397|
|  1017610|  2007381|   692258| 4960.309336950912|10255.900178571626|0.013607722279122686|
|  1063644|  2007381|   345799|  8440.68773156796|10255.900178571626| 0.00399458911523962|
|  1026084|  2007381|   703003|  4176.01578173489|10255.900178571626| 0.01641425824583619|
|  1009943|  2007381|   993857|11378.845571471757|10255.900178571626| 0.00851631883669535|

In [None]:
# d)
new_user_id = 1
#Check if there is no user with id 1 in df
#df.filter(df.user_id == new_user_id).show()

favorite_artists = df_names.filter(df_names.artist_id.isin([1240105, 6671632, 1002584, 1241016, 1244581]))

#add a rows to df and name the new df df_with_user the user_id is new_user_id and the playcount is 20 for all artists in favorite_artists, artist_id is the artist_id from favorite_artists
new_user_entries = favorite_artists.withColumn("user_id", F.lit(new_user_id)).withColumn("playcount", F.lit(20)).drop("artist_name").select("user_id", "artist_id", "playcount")
new_user_entries.show()
new_user_entries.filter(F.col("user_id") == 1).show()
df.show()
df_with_user = df.unionByName(new_user_entries)
df_with_user.filter(F.col("user_id") == 1).show()
df_wu_clean = clean_df(df_with_user)
df_wu_clean.show()
df_wu_clean.filter(F.col("user_id") == 1).show()
#Recommendation new user
k = 20
rec_new_artists = predict_artists(df_wu_clean, new_user_id, 20)
rec_new_artists.show()

+-------+---------+---------+
|user_id|artist_id|playcount|
+-------+---------+---------+
|      1|  1240105|       20|
|      1|  6671632|       20|
|      1|  1002584|       20|
|      1|  1241016|       20|
|      1|  1244581|       20|
+-------+---------+---------+

+-------+---------+---------+
|user_id|artist_id|playcount|
+-------+---------+---------+
|      1|  1240105|       20|
|      1|  6671632|       20|
|      1|  1002584|       20|
|      1|  1241016|       20|
|      1|  1244581|       20|
+-------+---------+---------+

+-------+---------+---------+
|user_id|artist_id|playcount|
+-------+---------+---------+
|1059637|  1000010|      238|
|1059637|  1000049|        1|
|1059637|  1000056|        1|
|1059637|  1000062|       11|
|1059637|  1000094|        1|
|1059637|  1000112|      423|
|1059637|  1000113|        5|
|1059637|  1000114|        2|
|1059637|  1000123|        2|
|1059637|  1000130|    19129|
|1059637|  1000139|        4|
|1059637|  1000241|      188|
|1059637

In [None]:
# d)
user_id = 1029563

df.show()
df_cleaned = clean_df(df_cleaned)
df_cleaned.show()
df_cleaned.filter(F.col("user_id") == user_id).show()
#Recommendation new user
k = 5
rec_new_artists = predict_artists(df_cleaned, user_id, k)
rec_new_artists.show()

+-------+---------+---------+
|user_id|artist_id|playcount|
+-------+---------+---------+
|1059637|  1000010|      238|
|1059637|  1000049|        1|
|1059637|  1000056|        1|
|1059637|  1000062|       11|
|1059637|  1000094|        1|
|1059637|  1000112|      423|
|1059637|  1000113|        5|
|1059637|  1000114|        2|
|1059637|  1000123|        2|
|1059637|  1000130|    19129|
|1059637|  1000139|        4|
|1059637|  1000241|      188|
|1059637|  1000263|      180|
|1059637|  1000289|        2|
|1059637|  1000305|        1|
|1059637|  1000320|       21|
|1059637|  1000340|        1|
|1059637|  1000427|       20|
|1059637|  1000428|       12|
|1059637|  1000433|       10|
+-------+---------+---------+
only showing top 20 rows

+-------+---------+---------+
|user_id|playcount|artist_id|
+-------+---------+---------+
|1059637|      238|  1000010|
|1059637|        1|  1000049|
|1059637|        1|  1000056|
|1059637|       11|  1000062|
|1059637|        1|  1000094|
|1059637|     