In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Start a Spark session
spark = SparkSession.builder.appName("UserRecommendationSystem").getOrCreate()

# Load the dataset
ratings = spark.read.csv("dataset_music.csv", header=True, inferSchema=True)

# Drop unnecessary or null columns
ratings_cleaned = ratings.na.drop(subset=["user_id", "popularity"])

# Ensure correct data types
ratings_cleaned = ratings_cleaned.withColumn("user_id", col("user_id").cast("integer")) \
                                 .withColumn("popularity", col("popularity").cast("float"))


In [2]:
train_data, test_data = ratings_cleaned.randomSplit([0.8, 0.2], seed=42)


In [3]:
from pyspark.ml.recommendation import ALS

als = ALS(
    userCol="user_id",
    itemCol="user_id",  # Using user-to-user recommendations
    ratingCol="popularity",
    nonnegative=True,
    coldStartStrategy="drop"
)

model = als.fit(train_data)


In [4]:
predictions = model.transform(test_data)
predictions.show()

# Generate recommendations for all users
user_recommendations = model.recommendForAllUsers(10)
user_recommendations.show(truncate=False)


+-------+--------+-------+----------+----------+----------+-----------------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+-----------+----------+
|user_id|track_id|artists|album_name|track_name|popularity|duration_listened|explicit|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo|time_signature|track_genre|prediction|
+-------+--------+-------+----------+----------+----------+-----------------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+-----------+----------+
+-------+--------+-------+----------+----------+----------+-----------------+--------+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-----+--------------+-----------+----------+



AnalysisException: [AMBIGUOUS_REFERENCE] Reference `user_id` is ambiguous, could be: [`user_id`, `user_id`].

In [5]:
from pyspark.sql.functions import monotonically_increasing_id

# Generate a unique item_id
ratings_cleaned = ratings_cleaned.withColumn("item_id", monotonically_increasing_id())


In [6]:
ratings_final = ratings_cleaned.select("user_id", "item_id", "popularity")


In [7]:
train_data, test_data = ratings_final.randomSplit([0.8, 0.2], seed=42)


In [8]:
als = ALS(
    userCol="user_id",
    itemCol="item_id",
    ratingCol="popularity",
    nonnegative=True,
    coldStartStrategy="drop"
)
model = als.fit(train_data)


In [9]:
predictions = model.transform(test_data)
predictions.show()


+-------+-------+----------+----------+
|user_id|item_id|popularity|prediction|
+-------+-------+----------+----------+
+-------+-------+----------+----------+



In [10]:
user_recommendations = model.recommendForAllUsers(10)
user_recommendations.show(truncate=False)


+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                                        |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|28     |[{149, 0.0}, {139, 0.0}, {127, 0.0}, {126, 0.0}, {125, 0.0}, {122, 0.0}, {119, 0.0}, {115, 0.0}, {113, 0.0}, {109, 0.0}]                                                               |
|31     |[{149, 0.0}, {139, 0.0}, {127, 0.0}, {126, 0.0}, {125, 0.0}, {122, 0.0}, {119, 0.0}, {115, 0.0}, {113, 0.0}, {109, 0.0}]                                                               |
|34     |[{149, 0.0}, {139, 0.

In [13]:
# Save user recommendations to a CSV file
user_recommendations.write.csv(r"C:\BigData\user_recommendations", header=True, mode="overwrite")


AnalysisException: [UNSUPPORTED_DATA_TYPE_FOR_DATASOURCE] The CSV datasource doesn't support the column `recommendations` of the type "ARRAY<STRUCT<item_id: INT, rating: FLOAT>>".

In [14]:
from pyspark.sql.functions import explode, col

# Flatten the recommendations
flattened_recommendations = user_recommendations.withColumn("recommendation", explode(col("recommendations"))) \
                                                .select(
                                                    col("user_id"),
                                                    col("recommendation.item_id").alias("item_id"),
                                                    col("recommendation.rating").alias("rating")
                                                )


In [15]:
flattened_recommendations.write.csv(r"C:\BigData\user_recommendations", header=True, mode="overwrite")
