# Running ALS on Outfit Recommendation (PySpark)

In [4]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import sys
import pyspark
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType

from recommenders.utils.timer import Timer
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.utils.notebook_utils import store_metadata

# Dataset
from datasets import outfits

print(f"System version: {sys.version}")
print("Spark version: {}".format(pyspark.__version__))


System version: 3.10.13 | packaged by Anaconda, Inc. | (main, Sep 11 2023, 13:24:38) [MSC v.1916 64 bit (AMD64)]
Spark version: 4.0.0


In [5]:
# top k items to recommend
TOP_K = 1

OUTFITS_DATA_SIZE = '100'

# Column names for the dataset
COL_USER = "UserId"
COL_ITEM = "Clothing"
COL_RATING = "Rating"
COL_WEATHER = "Weather"
COL_ITEM_ID = "ClothingId"

In [6]:
# Start Spark session
spark = start_or_get_spark("ALS PySpark", memory="16g")
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

KeyboardInterrupt: 

### 1. Download Dataset

In [None]:
schema = StructType(
    (
        StructField(COL_USER, IntegerType()),
        StructField(COL_WEATHER, StringType()),
        StructField(COL_ITEM, StringType()),
        StructField(COL_RATING, FloatType()),
    )
)

data = outfits.load_spark_df(spark, size=None, schema=schema, filepath="./datasets/csv/own_feature.csv")
data.show()

indexer = StringIndexer(inputCol=COL_ITEM, outputCol=COL_ITEM_ID)
indexed_data = indexer.fit(data).transform(data)
print("Data after indexing the 'Clothing' column:")
indexed_data.show()


+------+-------+-----------+------+
|UserId|Weather|   Clothing|Rating|
+------+-------+-----------+------+
|     1|  Sunny|    T-shirt|   5.0|
|     1|    Hot|     Shorts|   4.0|
|     1|  Rainy|     Hoodie|   3.0|
|     2|   Cold|Long Sleeve|   5.0|
|     2|  Windy|      Pants|   4.0|
|     2|  Sunny|     Jacket|   3.0|
|     3|    Hot|      Jeans|   5.0|
|     3|  Rainy|    Sweater|   4.0|
|     3|   Cold|       Polo|   3.0|
|     4|  Sunny|     Blazer|   5.0|
|     4|  Windy|      Skirt|   4.0|
|     4|  Rainy|   Cardigan|   3.0|
|     5|    Hot|     Hoodie|   5.0|
|     5|   Cold|    T-shirt|   4.0|
|     5|  Sunny|     Shorts|   3.0|
|     6|  Windy|    Sweater|   4.0|
|     6|  Sunny|     Jacket|   5.0|
|     6|   Cold|      Pants|   3.0|
|     7|    Hot|    T-shirt|   5.0|
|     7|  Rainy|     Hoodie|   4.0|
+------+-------+-----------+------+
only showing top 20 rows
Data after indexing the 'Clothing' column:
+------+-------+-----------+------+----------+
|UserId|Weather|   Cl

### 2. Splitting the Data

In [None]:
train, test = spark_random_split(indexed_data, ratio=0.75, seed=123)
print ("N train", train.cache().count())
print ("N test", test.cache().count())

N train 92
N test 28


### 3. Training the Model and Getting Our Predictions

In [None]:
header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM_ID,
    "ratingCol": COL_RATING,
}



als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

In [None]:
with Timer() as train_time:
    model = als.fit(train)

print("Took {} seconds for training.".format(train_time.interval))

Took 7.807712599998922 seconds for training.


In [None]:
# with Timer() as test_time:

#     # Get the cross join of all user-item pairs and score them.
#     users = train.select(COL_USER).distinct()
#     items = train.select(COL_ITEM).distinct()
#     user_item = users.crossJoin(items)
#     dfs_pred = model.transform(user_item)

#     # Remove seen items.
#     dfs_pred_exclude_train = dfs_pred.alias("pred").join(
#         train.alias("train"),
#         (dfs_pred[COL_USER] == train[COL_USER]) & (dfs_pred[COL_ITEM] == train[COL_ITEM]),
#         how='outer'
#     )

#     top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[f"train.{COL_RATING}"].isNull()) \
#         .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")

#     # In Spark, transformations are lazy evaluation
#     # Use an action to force execute and measure the test time 
#     top_all.cache().count()

# print("Took {} seconds for prediction.".format(test_time.interval))

with Timer() as test_time:
    users = train.select(COL_USER).distinct()

    items = train.select(COL_ITEM_ID).distinct()

    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    top_all = dfs_pred.join(
        indexed_data.select(COL_USER, COL_ITEM_ID),
        on=[COL_USER, COL_ITEM_ID],
        how='left_anti'
    )

    # Force execution to measure the time
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))

Took 4.616216800000984 seconds for prediction.


In [None]:
top_all.orderBy(COL_USER, F.desc("prediction")).show()


+------+----------+----------+
|UserId|ClothingId|prediction|
+------+----------+----------+
|     1|         7|  5.023626|
|     1|         8|   4.85732|
|     1|         6| 3.7927144|
|     1|         5| 3.5528836|
|     1|        11|  3.391508|
|     1|         9| 3.1464128|
|     1|        10| 2.9616942|
|     2|         7| 5.0620155|
|     2|         6|  4.709888|
|     2|        11| 4.1010056|
|     2|         4| 3.3333082|
|     2|         9| 3.1165466|
|     2|         8| 2.9009604|
|     3|         2|  4.277485|
|     3|         6| 4.1963844|
|     3|         5|  4.064257|
|     3|        10| 3.6585684|
|     3|         3|  3.625513|
|     3|        11| 3.5879092|
|     3|         9| 3.0859103|
+------+----------+----------+
only showing top 20 rows


### 5. Evaluation

In [None]:
print(test.columns)
print(top_all.columns)


['UserId', 'Weather', 'Clothing', 'Rating', 'ClothingId']
['UserId', 'ClothingId', 'prediction']


In [None]:
rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user=COL_USER, col_item=COL_ITEM_ID, 
                                    col_rating=COL_RATING, col_prediction="prediction", 
                                    relevancy_method="top_k")

In [None]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

Model:	ALS
Top K:	1
MAP:	0.000000
NDCG:	0.000000
Precision@K:	0.000000
Recall@K:	0.000000


### 5. Evaluate Rating Prediction

In [None]:
# Generate predicted ratings.
prediction = model.transform(test)
prediction.cache().show()


+------+-------+--------+------+----------+----------+
|UserId|Weather|Clothing|Rating|ClothingId|prediction|
+------+-------+--------+------+----------+----------+
|     1|    Hot|  Shorts|   5.0|         2| 3.9349766|
|    13|    Hot|    Polo|   5.0|         8|   4.20761|
|    13|  Sunny|   Pants|   5.0|         5| 3.3469734|
|     6|   Cold|   Pants|   3.0|         5| 4.2251115|
|     6|  Sunny| T-shirt|   4.0|         1| 4.5145974|
|     6|  Windy| Sweater|   4.0|         4| 4.7043977|
|     6|  Windy| Sweater|   4.0|         4| 4.7043977|
|    16|  Sunny|  Hoodie|   4.0|         0| 4.8954053|
|     3|   Cold|  Hoodie|   5.0|         0|  4.239459|
|     3|   Cold|    Polo|   3.0|         8| 3.8425083|
|    20|  Windy|  Blazer|   4.0|         6| 3.9430902|
|     5|  Windy|  Blazer|   5.0|         6|  4.475267|
|    19|  Windy|  Hoodie|   5.0|         0|  4.898927|
|    15|   Cold| T-shirt|   5.0|         1| 4.0074635|
|    17|  Windy|   Jeans|   4.0|         7|  4.431289|
|     4|  

In [None]:
rating_eval = SparkRatingEvaluation(test, prediction, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')

Model:	ALS rating prediction
RMSE:	0.923172
MAE:	0.756428
Explained variance:	-0.725397
R squared:	-0.727528


In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# --- 1. SETUP PARAMETERS AND VARIABLES ---
# Assuming these variables are defined from your notebook's code:
# COL_USER, COL_ITEM_ID, COL_RATING, train, test

# Use the better-performing hyperparameters
FINAL_RANK = 10 # 10 in original
REG_PARAM = 0.05 # 0.05 in original
MAX_ITER_RANGE = 25 # Number of training steps to observe

error_data = []

# --- 2. ITERATIVE TRAINING AND EVALUATION ---
print(f"Starting iterative training (Rank={FINAL_RANK}, RegParam={REG_PARAM})...")

for max_iter in range(1, MAX_ITER_RANGE + 1):
    # a. Create a new ALS instance for the current iteration count
    als_iter = ALS(
        rank=FINAL_RANK,
        maxIter=max_iter, # <--- The key difference: incrementing maxIter
        implicitPrefs=False,
        regParam=REG_PARAM,
        coldStartStrategy='drop',
        seed=42,
        userCol=COL_USER,
        itemCol=COL_ITEM_ID,
        ratingCol=COL_RATING
    )
    
    # b. Fit the model and generate predictions on the TEST set
    model_iter = als_iter.fit(train)
    prediction = model_iter.transform(test)
    
    # c. Evaluate the model (calculate RMSE)
    rating_eval = SparkRatingEvaluation(
        test, prediction, 
        col_user=COL_USER, 
        col_item=COL_ITEM_ID, 
        col_rating=COL_RATING, 
        col_prediction="prediction"
    )
    
    rmse_value = rating_eval.rmse()
    error_data.append((max_iter, rmse_value))
    
    if max_iter % 5 == 0:
        print(f"Iteration {max_iter}: RMSE = {rmse_value:.4f}")

# --- 3. PLOT THE CONVERGENCE CURVE ---

# Convert the results to a Pandas DataFrame for plotting
df_errors = pd.DataFrame(error_data, columns=['Iterations', 'RMSE'])

# Plotting the curve
plt.figure(figsize=(10, 6))
plt.plot(df_errors['Iterations'], df_errors['RMSE'], marker='o', linestyle='-', color='b')
plt.title('ALS Model Convergence (Test Set RMSE vs. Iterations)')
plt.xlabel('Training Steps (maxIter)')
plt.ylabel('Test Set RMSE')
plt.grid(True)
plt.savefig("als_convergence_plot.png")
plt.close()

print("\nConvergence plot saved to als_convergence_plot.png")

Starting iterative training (Rank=10, RegParam=0.05)...
Iteration 5: RMSE = 0.8682


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "c:\Users\zivil\anaconda3\envs\outfit\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\zivil\anaconda3\envs\outfit\lib\site-packages\py4j\clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\Users\zivil\anaconda3\envs\outfit\lib\socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt
ERROR:py4j.clientserver:Exception occurred while shutting down connection
Traceback (most recent call last):
  File "c:\Users\zivil\anaconda3\envs\outfit\lib\site-packages\py4j\java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "c:\Users\zivil\anaconda3\envs\outfit\lib\site-packages\py4j\clientserver.py", line 535, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "c:\U

KeyboardInterrupt: 

In [None]:
# cleanup spark instance
spark.stop()