In [1]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import col

In [3]:
# Step 1: Start a Spark Session
spark = SparkSession.builder \
    .appName("ALS Recommender") \
    .getOrCreate()

In [4]:
file_path = 'interactions.csv'
data = spark.read.csv(file_path, header=True, inferSchema=True)

In [5]:
data.limit(10).toPandas()

Unnamed: 0,User_A,User_B,Interaction_Intensity
0,0,1,1.0
1,0,2,1.0
2,0,3,1.0
3,0,7,4.0
4,0,8,1.0
5,0,9,2.0
6,0,10,5.0
7,0,14,5.0
8,0,23,1.0
9,0,28,3.0


In [7]:
# Check the dataset schema
data.printSchema()

root
 |-- User_A: integer (nullable = true)
 |-- User_B: integer (nullable = true)
 |-- Interaction_Intensity: double (nullable = true)



In [8]:
data = data.select(
    col("User_A").alias("user_id"),
    col("User_B").alias("item_id"),
    col("Interaction_Intensity").alias("rating")
)

In [9]:
from pyspark.sql.functions import col, count, when

# Count null values in each column
data.select([count(when(col(c).isNull(), c)).alias(c) for c in data.columns]).show()

+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|      0|      0|     0|
+-------+-------+------+



In [10]:
# Ensure there are no nulls
data = data.dropna()

In [11]:
data.printSchema()

root
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- rating: double (nullable = true)



In [12]:
# Total rows in the dataset
total_rows = data.count()
print(f"Total rows: {total_rows}")

# Count the number of distinct rows
distinct_rows = data.dropDuplicates().count()
print(f"Distinct rows: {distinct_rows}")

# Calculate duplicates
duplicates = total_rows - distinct_rows
print(f"Number of duplicate rows: {duplicates}")


Total rows: 632211


[Stage 9:>                                                          (0 + 2) / 2]

Distinct rows: 632211
Number of duplicate rows: 0


                                                                                

In [13]:
from pyspark.sql.functions import rand

data = data.orderBy(rand())


In [14]:
data.limit(10).show()

+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|   1250|    605|   1.0|
|    292|    693|   5.0|
|    453|   1272|   3.0|
|      7|      4|   4.0|
|    868|    415|   5.0|
|    865|    189|   1.0|
|    747|   1092|   2.0|
|   1207|    231|   3.0|
|    336|   1274|   4.0|
|    393|    435|   2.0|
+-------+-------+------+



In [15]:
train_data, test_data = data.randomSplit([0.9, 0.1], seed=42)
# Verify split sizes
print(f"Training data count: {train_data.count()}")
print(f"Test data count: {test_data.count()}")


                                                                                

Training data count: 569082
Test data count: 63157


In [16]:
from pyspark.ml.recommendation import ALS

# Initialize ALS model
als = ALS(
    implicitPrefs=True,
    maxIter=20,              # Number of iterations
    regParam=0.1,            # Regularization parameter
    rank=20,                 # Number of latent factors
    userCol="user_id",       # Column for user IDs
    itemCol="item_id",       # Column for item IDs
    ratingCol="rating",      # Column for ratings
    coldStartStrategy="drop" # Handle unseen users/items during predictions
)

# Fit the ALS model on the training data
als_model = als.fit(train_data)


24/12/16 11:53:12 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/12/16 11:53:12 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/12/16 11:53:12 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK


In [25]:
# Generate predictions on the test set
predictions = als_model.transform(test_data)

# Show the predictions
predictions.show()


+-------+-------+------+----------+
|user_id|item_id|rating|prediction|
+-------+-------+------+----------+
|      0|    151|   5.0|  3.127446|
|      0|    364|   1.0|  2.845566|
|      1|    329|   2.0| 2.7982461|
|      1|    508|   3.0| 2.8757863|
|      1|   1003|   1.0| 2.8427062|
|      2|    194|   1.0| 2.9449258|
|      2|    412|   2.0| 2.9658844|
|      2|    431|   3.0|  2.731868|
|      2|    655|   2.0| 3.0503056|
|      3|    140|   1.0|  2.862554|
|      3|    266|   4.0| 2.7611225|
|      3|    577|   1.0| 2.9654324|
|      3|    867|   4.0| 2.6659179|
|      3|   1244|   2.0| 2.7762034|
|      4|    838|   5.0| 2.8890398|
|      4|   1005|   5.0| 2.8038964|
|      5|    732|   4.0| 2.8919232|
|      6|    506|   3.0| 2.7655413|
|      6|    766|   4.0| 2.5914567|
|      6|   1066|   3.0| 2.5186863|
+-------+-------+------+----------+
only showing top 20 rows



Rating prediction metrics: RMSE, MAE, explained variance, R-squared
Ranking-based evaluation: MAP, NDCG, precision@K, recall@K


In [27]:
# Evaluate with RMSE and MAE

from pyspark.ml.evaluation import RegressionEvaluator

evaluator_rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
evaluator_mae = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction")

rmse = evaluator_rmse.evaluate(predictions)
mae = evaluator_mae.evaluate(predictions)
print(f"rmse : {rmse} \n mae: {mae}")

rmse : 1.4015036480446443 
 mae: 1.2079919460116295


In [44]:
top_k_recommendations = als_model.recommendForAllUsers(10)


In [50]:
top_k_recommendations.filter(col("user_id").isin([0, 2, 4, 7])).show(truncate=False)


+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                                      |
+-------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|0      |[{557, 3.4198232}, {145, 3.3708172}, {902, 3.3346689}, {563, 3.2411604}, {804, 3.234417}, {701, 3.2271514}, {426, 3.2252626}, {1072, 3.224292}, {974, 3.2189589}, {934, 3.21756}]    |
|2      |[{407, 3.3153088}, {1190, 3.3107648}, {515, 3.2818205}, {202, 3.265611}, {1067, 3.2291625}, {1273, 3.2221503}, {335, 3.209711}, {1126, 3.203563}, {417, 3.1885412}, {201, 3.1879504}]|
|4      |[{902, 3.3007252}, {347, 3.2395

In [51]:
# Flatten Recommendations: Explode the nested recommendations to make it easier to evaluate: 
from pyspark.sql.functions import explode, col

top_k_recommendations = top_k_recommendations.withColumn("recommendation", explode(col("recommendations"))) \
    .select("user_id", col("recommendation.item_id").alias("item_id"), col("recommendation.rating").alias("prediction"))


In [58]:
top_k_recommendations.filter(col("user_id").isin([0, 1, 2])).show(n=30, truncate=False)


+-------+-------+----------+
|user_id|item_id|prediction|
+-------+-------+----------+
|1      |902    |3.25111   |
|1      |407    |3.20571   |
|1      |1047   |3.1858816 |
|1      |833    |3.1620946 |
|1      |525    |3.1469517 |
|1      |515    |3.1460884 |
|1      |449    |3.1346653 |
|1      |56     |3.1315854 |
|1      |718    |3.1201718 |
|1      |291    |3.1190429 |
|0      |557    |3.4198232 |
|0      |145    |3.3708172 |
|0      |902    |3.3346689 |
|0      |563    |3.2411604 |
|0      |804    |3.234417  |
|0      |701    |3.2271514 |
|0      |426    |3.2252626 |
|0      |1072   |3.224292  |
|0      |974    |3.2189589 |
|0      |934    |3.21756   |
|2      |407    |3.3153088 |
|2      |1190   |3.3107648 |
|2      |515    |3.2818205 |
|2      |202    |3.265611  |
|2      |1067   |3.2291625 |
|2      |1273   |3.2221503 |
|2      |335    |3.209711  |
|2      |1126   |3.203563  |
|2      |417    |3.1885412 |
|2      |201    |3.1879504 |
+-------+-------+----------+



In [59]:
# Join with Ground Truth
joined = top_k_recommendations.join(test_data, on=["user_id", "item_id"], how="inner")

# Precision@K and Recall@K
precision_at_k = joined.count() / (top_k_recommendations.count() * 10)
recall_at_k = joined.count() / test_data.count()

print(f"Ranking Evaluation - Precision@K: {precision_at_k}, Recall@K: {recall_at_k}")

                                                                                

Ranking Evaluation - Precision@K: 0.002076923076923077, Recall@K: 0.00424857324032974
