# Running ALS on Outfit Recommendation (PySpark)

In [46]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

import sys
import pyspark
from pyspark.ml.recommendation import ALS
from pyspark.ml.feature import StringIndexer
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import StringType, FloatType, IntegerType, LongType

from recommenders.utils.timer import Timer
from recommenders.utils.notebook_utils import is_jupyter
from recommenders.datasets.spark_splitters import spark_random_split
from recommenders.evaluation.spark_evaluation import SparkRatingEvaluation, SparkRankingEvaluation
from recommenders.utils.spark_utils import start_or_get_spark
from recommenders.utils.notebook_utils import store_metadata

# Dataset
from datasets import outfits

print(f"System version: {sys.version}")
print("Spark version: {}".format(pyspark.__version__))


System version: 3.9.23 (main, Jun  5 2025, 13:40:20) 
[GCC 11.2.0]
Spark version: 4.0.0


In [47]:
# top k items to recommend
TOP_K = 1

OUTFITS_DATA_SIZE = '100'

# Column names for the dataset
COL_USER = "UserId"
COL_ITEM = "Clothing"
COL_RATING = "Rating"
COL_WEATHER = "Weather"
COL_ITEM_ID = "ClothingId"

In [48]:
# Start Spark session
spark = start_or_get_spark("ALS PySpark", memory="16g")
spark.conf.set("spark.sql.analyzer.failAmbiguousSelfJoin", "false")

### 1. Download Dataset

In [49]:
schema = StructType(
    (
        StructField(COL_USER, IntegerType()),
        StructField(COL_WEATHER, StringType()),
        StructField(COL_ITEM, StringType()),
        StructField(COL_RATING, FloatType()),
    )
)

data = outfits.load_spark_df(spark, size=None, schema=schema, filepath="./datasets/csv/feature1.csv")
data.show()

indexer = StringIndexer(inputCol=COL_ITEM, outputCol=COL_ITEM_ID)
indexed_data = indexer.fit(data).transform(data)
print("Data after indexing the 'Clothing' column:")
indexed_data.show()


+------+-------+--------------------+------+
|UserId|Weather|            Clothing|Rating|
+------+-------+--------------------+------+
|    17|   Mild|    Crewneck Sweater|   4.2|
|     1|   Cool|              Chinos|   4.1|
|     2|   Cool|    Pleated Trousers|   3.1|
|    24|   Cold|    Crewneck Sweater|   3.5|
|    20|    Hot|    Pleated Trousers|   2.5|
|    18|  Rainy|    Pleated Trousers|   2.8|
|    25|   Cold|Oxford Cloth Butt...|   2.6|
|     1|   Cold|              Chinos|   2.9|
|    12|   Cold|      Linen Trousers|   1.0|
|     9|  Rainy|Oxford Cloth Butt...|   3.3|
|     3|   Cold|       Wool Trousers|   5.0|
|     2|  Sunny|         Linen Shirt|   4.1|
|    19|   Cool|    Pleated Trousers|   3.0|
|    15|   Cool|Oxford Cloth Butt...|   3.6|
|     7|  Rainy|       Wool Trousers|   3.6|
|    23|   Cool|    Pleated Trousers|   3.2|
|    14|    Hot|      V-Neck Sweater|   3.1|
|     2|  Sunny|       Wool Trousers|   1.2|
|    11|   Cool|      V-Neck Sweater|   3.9|
|    14|  

### 2. Splitting the Data

In [50]:
train, test = spark_random_split(indexed_data, ratio=0.75, seed=123)
print ("N train", train.cache().count())
print ("N test", test.cache().count())

N train 154
N test 46


### 3. Training the Model and Getting Our Predictions

In [51]:
header = {
    "userCol": COL_USER,
    "itemCol": COL_ITEM_ID,
    "ratingCol": COL_RATING,
}


als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)

In [52]:
with Timer() as train_time:
    model = als.fit(train)

print("Took {} seconds for training.".format(train_time.interval))

Took 3.5284303050002563 seconds for training.


In [53]:
# with Timer() as test_time:

#     # Get the cross join of all user-item pairs and score them.
#     users = train.select(COL_USER).distinct()
#     items = train.select(COL_ITEM).distinct()
#     user_item = users.crossJoin(items)
#     dfs_pred = model.transform(user_item)

#     # Remove seen items.
#     dfs_pred_exclude_train = dfs_pred.alias("pred").join(
#         train.alias("train"),
#         (dfs_pred[COL_USER] == train[COL_USER]) & (dfs_pred[COL_ITEM] == train[COL_ITEM]),
#         how='outer'
#     )

#     top_all = dfs_pred_exclude_train.filter(dfs_pred_exclude_train[f"train.{COL_RATING}"].isNull()) \
#         .select('pred.' + COL_USER, 'pred.' + COL_ITEM, 'pred.' + "prediction")

#     # In Spark, transformations are lazy evaluation
#     # Use an action to force execute and measure the test time 
#     top_all.cache().count()

# print("Took {} seconds for prediction.".format(test_time.interval))

with Timer() as test_time:
    users = train.select(COL_USER).distinct()

    items = train.select(COL_ITEM_ID).distinct()

    user_item = users.crossJoin(items)
    dfs_pred = model.transform(user_item)

    top_all = dfs_pred.join(
        indexed_data.select(COL_USER, COL_ITEM_ID),
        on=[COL_USER, COL_ITEM_ID],
        how='left_anti'
    )

    # Force execution to measure the time
    top_all.cache().count()

print("Took {} seconds for prediction.".format(test_time.interval))



Took 2.3500981329998467 seconds for prediction.


                                                                                

In [54]:
top_all.show()

+------+----------+----------+
|UserId|ClothingId|prediction|
+------+----------+----------+
|    22|         8| 2.4058814|
|     1|         8| 3.2283807|
|    13|         8| 2.8466918|
|     3|         8| 3.4562228|
|    20|         8| 3.2722473|
|    19|         8|  2.844372|
|    15|         8| 2.9901443|
|     9|         8| 3.4880786|
|     4|         8| 3.7692726|
|    25|         8| 3.1036925|
|     2|         8| 2.9269228|
|    18|         8|  3.415548|
|    16|         0| 2.7907774|
|    17|         0| 3.2832756|
|     4|         0| 3.5647602|
|     7|         0| 2.7452872|
|    25|         0|  2.718147|
|    21|         0| 2.8855975|
|    22|         7| 1.0010359|
|     1|         7| 1.9375099|
+------+----------+----------+
only showing top 20 rows


### 5. Evaluation

In [55]:
rank_eval = SparkRankingEvaluation(test, top_all, k = TOP_K, col_user=COL_USER, col_item=COL_ITEM_ID, 
                                    col_rating=COL_RATING, col_prediction="prediction", 
                                    relevancy_method="top_k")

                                                                                

In [56]:
print("Model:\tALS",
      "Top K:\t%d" % rank_eval.k,
      "MAP:\t%f" % rank_eval.map_at_k(),
      "NDCG:\t%f" % rank_eval.ndcg_at_k(),
      "Precision@K:\t%f" % rank_eval.precision_at_k(),
      "Recall@K:\t%f" % rank_eval.recall_at_k(), sep='\n')

Model:	ALS
Top K:	1
MAP:	0.000000
NDCG:	0.000000
Precision@K:	0.000000
Recall@K:	0.000000


### 5. Evaluate Rating Prediction

In [57]:
# Generate predicted ratings.
prediction = model.transform(test)
prediction.cache().show()


+------+-------+--------------------+------+----------+----------+
|UserId|Weather|            Clothing|Rating|ClothingId|prediction|
+------+-------+--------------------+------+----------+----------+
|    12|  Rainy|    Cashmere Sweater|   2.5|         2|  3.190533|
|    12|  Sunny|      Linen Trousers|   4.6|         7|  1.259581|
|    22|    Hot|       Wool Trousers|   1.0|         9| 2.8805444|
|    22|   Mild|              Chinos|   4.0|         1|  3.383572|
|     1|   Cold|          Polo Shirt|   1.0|         4| 3.4242387|
|     1|   Mild|              Chinos|   4.3|         1| 3.4575682|
|    13|   Cool|    Crewneck Sweater|   3.7|         5| 3.2912312|
|    13|  Rainy|              Chinos|   2.9|         1| 4.1585917|
|    16|   Cool|Oxford Cloth Butt...|   3.7|         6|  2.697968|
|    16|   Mild|       Wool Trousers|   2.9|         9| 2.6002858|
|     3|  Sunny|          Polo Shirt|   4.6|         4| 3.0810018|
|    20|   Mild|    Crewneck Sweater|   3.6|         5| 3.2652

In [58]:
rating_eval = SparkRatingEvaluation(test, prediction, col_user=COL_USER, col_item=COL_ITEM, 
                                    col_rating=COL_RATING, col_prediction="prediction")

print("Model:\tALS rating prediction",
      "RMSE:\t%f" % rating_eval.rmse(),
      "MAE:\t%f" % rating_eval.mae(),
      "Explained variance:\t%f" % rating_eval.exp_var(),
      "R squared:\t%f" % rating_eval.rsquared(), sep='\n')

Model:	ALS rating prediction
RMSE:	1.472280
MAE:	1.176934
Explained variance:	-0.819469
R squared:	-1.130056


In [70]:
from pyspark.ml.feature import IndexToString
from pyspark.sql.functions import explode, col

# Recommend items for all users
user_recs = model.recommendForAllUsers(5)

# explode recommendations into one row per (user, rec)
recs_exploded = user_recs.select(COL_USER, explode(col("recommendations")).alias("rec"))

# pull out the indexed item id (ClothingId) and rating into top-level columns
recs_flat = recs_exploded.select(
    COL_USER,
    col("rec." + COL_ITEM_ID).alias(COL_ITEM_ID),
    col("rec.rating").alias("rating"),
)

# Convert back to original item name
inverter = IndexToString(inputCol=COL_ITEM_ID, outputCol=COL_ITEM, labels=indexer.fit(data).labels)
itd = inverter.transform(recs_flat)

# Show recommendations for a user
itd.where(itd[COL_USER] == 1).show(truncate = 0)

+------+----------+---------+----------------+
|UserId|ClothingId|rating   |Clothing        |
+------+----------+---------+----------------+
|1     |5         |3.8331847|Crewneck Sweater|
|1     |2         |3.7567925|Cashmere Sweater|
|1     |1         |3.4575682|Chinos          |
|1     |4         |3.4242387|Polo Shirt      |
|1     |0         |3.230017 |Pleated Trousers|
+------+----------+---------+----------------+



                                                                                

In [59]:
# cleanup spark instance
spark.stop()