
# MovieLens Baseline (PySpark ALS) – .ipynb

This notebook is a drop-in PySpark alternative to a Surprise SVD baseline.  
It loads MovieLens ratings, trains an **explicit-feedback ALS** model with simple hyperparameter tuning, evaluates **RMSE**, and exports **Top‑N recommendations** and a saved model.

**Sections**
1. Environment & Spark Bootstrap
2. Load Ratings
3. Basic Stats
4. Train/Validation Split
5. Train/Tune ALS (TrainValidationSplit)
6. Evaluate RMSE
7. Top‑N Recommendations
8. Save/Load Model
9. (Optional) Write validation predictions snapshot

> Tested with: Python 3.10, PySpark 3.5.x, OpenJDK 17.


## 1) Environment & Spark Bootstrap

In [3]:
from pyspark.sql import SparkSession

def build_spark(app_name: str = "MovieLens-ALS"):
    spark = (
        SparkSession.builder
        .appName(app_name)
        .config("spark.sql.execution.arrow.pyspark.enabled", "true")
        .config("spark.sql.shuffle.partitions", "200")
        .getOrCreate()
    )
    spark.sparkContext.setLogLevel("WARN")
    return spark

spark = build_spark()
spark


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/10/28 23:24:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## 2) Load Ratings

In [4]:
from pathlib import Path
import os

from pyspark.sql import functions as F, types as T

BASE_DIR = Path.cwd().parent
DATA_DIR = BASE_DIR / 'data'
RAW_DIR = DATA_DIR / 'raw'

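# BASE_DIR is the parent of the working directory, so the notebook is expected to run
# from a subfolder of the repo. Check DATA_DIR (not RAW_DIR) and skip the check when
# MOVIELENS_RATINGS_PATH is set, so both fallbacks in resolve_ratings_path() stay reachable.
if not DATA_DIR.exists() and not os.environ.get('MOVIELENS_RATINGS_PATH'):
    raise FileNotFoundError(f"Expected data under {DATA_DIR}. Run the notebook from a subfolder of the repo root, place ratings.csv there, or set MOVIELENS_RATINGS_PATH.")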

def load_ratings(spark, ratings_path, fmt: str = 'csv', sep: str = ',', header: bool = True):
    """Load MovieLens ratings with schema."""
    path = Path(ratings_path).expanduser()
    schema = T.StructType([
        T.StructField('userId', T.IntegerType(), False),
        T.StructField('movieId', T.IntegerType(), False),
        T.StructField('rating', T.FloatType(), False),
        T.StructField('timestamp', T.TimestampType(), True),
    ])
    reader = spark.read.schema(schema)
    if fmt.lower() == 'csv':
        reader = (
            reader
            .option('header', str(header).lower())
            .option('sep', sep)
            .option('timestampFormat', 'yyyy-MM-dd HH:mm:ss')
        )
        df = reader.csv(str(path))
    elif fmt.lower() in {'parquet', 'pq'}:
        df = reader.parquet(str(path))
    else:
        raise ValueError("Unsupported fmt. Use 'csv' or 'parquet'.")
    df = (
        df.dropna(subset=['userId', 'movieId', 'rating'])
          .filter((F.col('rating') >= 0.5) & (F.col('rating') <= 5.0))
    )
    return df

def resolve_ratings_path():
    env_path = os.environ.get('MOVIELENS_RATINGS_PATH')
    if env_path:
        candidate = Path(env_path).expanduser()
        if candidate.exists():
            return candidate
        raise FileNotFoundError(f"MOVIELENS_RATINGS_PATH={env_path!r} does not exist.")
    candidate = RAW_DIR / 'ratings.csv'
    if candidate.exists():
        return candidate
    candidate = DATA_DIR / 'ratings.csv'
    if candidate.exists():
        return candidate
    raise FileNotFoundError(
        f"Could not find ratings.csv under {RAW_DIR} or {DATA_DIR}. Set MOVIELENS_RATINGS_PATH if stored elsewhere."
    )

RATINGS_PATH = resolve_ratings_path()
FMT = 'csv'  # or 'parquet'
ratings = load_ratings(spark, RATINGS_PATH, fmt=FMT, sep=',', header=True)
print(f"Loaded ratings from: {RATINGS_PATH}")
ratings.printSchema()
ratings.show(5, truncate=False)



Loaded ratings from: /Users/alanyu/Documents/IIT/ITM/ITMD-524-Applied AI and Deep Learning/finalproject/MovieLens-MCRS/data/raw/ratings.csv
root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- timestamp: timestamp (nullable = true)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|1     |1      |4.0   |NULL     |
|1     |3      |4.0   |NULL     |
|1     |6      |4.0   |NULL     |
|1     |47     |5.0   |NULL     |
|1     |50     |5.0   |NULL     |
+------+-------+------+---------+
only showing top 5 rows
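
The NULL values in the `timestamp` column above are consistent with `ratings.csv` storing timestamps as Unix epoch seconds (the usual MovieLens layout), which a `yyyy-MM-dd HH:mm:ss` pattern cannot parse. The following is a minimal pure-Python sketch of that mismatch and of the conversion. The sample value and the helper names `parse_formatted` and `parse_epoch_seconds` are illustrative and not part of the notebook. In Spark, a comparable approach would be to read the column as `LongType` and cast it to a timestamp afterwards.

```python
from datetime import datetime, timezone


def parse_formatted(value: str):
    """Parse with the strptime equivalent of 'yyyy-MM-dd HH:mm:ss'; return None on failure."""
    try:
        return datetime.strptime(value, "%Y-%m-%d %H:%M:%S")
    except ValueError:
        return None


def parse_epoch_seconds(value: str) -> datetime:
    """Interpret the value as seconds since 1970-01-01 UTC."""
    return datetime.fromtimestamp(int(value), tz=timezone.utc)


raw = "964982703"  # illustrative epoch-seconds value, not taken from the output above
print(parse_formatted(raw))      # the pattern does not match an integer string
print(parse_epoch_seconds(raw))  # a timezone-aware datetime in UTC
```

The timestamp is not used by the random split later in the notebook, so the NULLs do not affect the reported RMSE; they would matter for a time-aware split.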



## 3) Basic Stats

In [5]:

def basic_stats(ratings):
    n_ratings = ratings.count()
    n_users = ratings.select("userId").distinct().count()
    n_items = ratings.select("movieId").distinct().count()
    density = n_ratings / (n_users * n_items)
    print("===== Dataset Stats =====")
    print(f"#Ratings: {n_ratings:,}")
    print(f"#Users:   {n_users:,}")
    print(f"#Items:   {n_items:,}")
    print(f"Density:  {density:.8f}")
    ratings.groupBy().agg(F.mean("rating").alias("global_mean")).show(truncate=False)

basic_stats(ratings)


===== Dataset Stats =====
#Ratings: 100,836
#Users:   610
#Items:   9,724
Density:  0.01699968
+-----------------+
|global_mean      |
+-----------------+
|3.501556983616962|
+-----------------+
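
As a quick check on the figures above, density is the number of observed ratings divided by the number of cells in the user×item matrix. This small sketch recomputes it from the printed counts; the function name `density` is only for illustration.

```python
def density(n_ratings: int, n_users: int, n_items: int) -> float:
    """Fraction of the user x item matrix that holds an observed rating."""
    if n_users <= 0 or n_items <= 0:
        raise ValueError("n_users and n_items must be positive")
    return n_ratings / (n_users * n_items)


# Counts copied from the dataset stats printed above.
d = density(100_836, 610, 9_724)
print(f"Density:  {d:.8f}")
print(f"Sparsity: {1 - d:.2%}")
```

Roughly 98% of the matrix is empty, which is the regime matrix factorization is meant for.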



## 4) Train/Validation Split

In [6]:

def split_train_val(ratings, seed: int = 42):
    # For time-aware split, sort per user by timestamp and split; here we use random split.
    train, val = ratings.randomSplit([0.8, 0.2], seed=seed)
    return train.cache(), val.cache()

train, val = split_train_val(ratings)
print(f"Train: {train.count():,}, Val: {val.count():,}")


Train: 80,578, Val: 20,258
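
The code comment in `split_train_val` mentions a time-aware alternative. A minimal pure-Python sketch of that idea follows, assuming rows are `(user_id, item_id, rating, ts)` tuples with usable (non-NULL) timestamps; `per_user_time_split` is a hypothetical helper, not part of the notebook. In Spark, the same effect is usually achieved with a window function partitioned by user and ordered by timestamp.

```python
from collections import defaultdict


def per_user_time_split(rows, train_ratio: float = 0.8):
    """Split each user's ratings chronologically: earliest part to train, latest to val."""
    by_user = defaultdict(list)
    for row in rows:
        by_user[row[0]].append(row)

    train, val = [], []
    for user_rows in by_user.values():
        user_rows.sort(key=lambda r: r[3])  # oldest first
        # Keep at least one rating per user in train so the user is not cold at validation.
        cut = max(1, int(len(user_rows) * train_ratio))
        train.extend(user_rows[:cut])
        val.extend(user_rows[cut:])
    return train, val


# Toy data: user 1 has five ratings, user 2 has one.
rows = [(1, 10 + t, 4.0, t) for t in (5, 3, 1, 4, 2)] + [(2, 99, 3.0, 7)]
train_rows, val_rows = per_user_time_split(rows)
print(len(train_rows), len(val_rows))
```

Unlike `randomSplit`, this never validates on a rating that is older than the ratings the model trained on for the same user.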


## 5) Train/Tune ALS (TrainValidationSplit)

In [7]:

from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

def tune_and_fit(train, implicit: bool = False):
    als = ALS(
        userCol="userId",
        itemCol="movieId",
        ratingCol="rating",
        implicitPrefs=implicit,
        coldStartStrategy="drop",
        nonnegative=True,
        seed=42,
    )
    param_grid = (
        ParamGridBuilder()
        .addGrid(als.rank, [16, 32, 64])
        .addGrid(als.regParam, [0.05, 0.1, 0.2])
        .addGrid(als.maxIter, [10, 15])
        .build()
    )
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    tvs = TrainValidationSplit(
        estimator=als,
        estimatorParamMaps=param_grid,
        evaluator=evaluator,
        trainRatio=0.8,
        parallelism=2,
        seed=42,
    )
    return tvs.fit(train)

tvs_model = tune_and_fit(train, implicit=False)
tvs_model


25/10/28 23:25:06 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
25/10/28 23:25:06 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS


TrainValidationSplitModel_dc8d6a98d7a3
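
To make the cost of the search concrete, the sketch below enumerates the same grid in plain Python. `expand_grid` is an illustrative stand-in for what `ParamGridBuilder.build()` produces, not its implementation.

```python
from itertools import product

# Same values as the ParamGridBuilder call above.
grid = {
    "rank": [16, 32, 64],
    "regParam": [0.05, 0.1, 0.2],
    "maxIter": [10, 15],
}


def expand_grid(grid: dict) -> list:
    """Return one dict per combination of parameter values (Cartesian product)."""
    keys = list(grid)
    return [dict(zip(keys, values)) for values in product(*(grid[k] for k in keys))]


candidates = expand_grid(grid)
print(f"{len(candidates)} candidate settings")
print(candidates[0])
```

With `trainRatio=0.8`, each candidate is fitted once on about 80% of `train` and scored on the remaining 20%, and the best setting should then be refitted on all of `train`. That is cheaper than k-fold cross-validation, but the choice rests on a single inner split.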

## 6) Evaluate RMSE

In [8]:

def evaluate(model, val):
    best_model = model.bestModel
    preds = best_model.transform(val)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
    rmse = evaluator.evaluate(preds)
    # The tuned values live on the parent ALS estimator; reach it once through the
    # underlying Java object (a private PySpark attribute, so this may change).
    parent = best_model._java_obj.parent()
    print("===== Best ALS Params =====")
    print(f"rank={parent.getRank()}, "
          f"regParam={parent.getRegParam()}, "
          f"maxIter={parent.getMaxIter()}, "
          f"implicitPrefs={parent.getImplicitPrefs()}")
    print(f"Validation RMSE = {rmse:.4f}")
    return rmse

rmse = evaluate(tvs_model, val)
rmse


===== Best ALS Params =====
rank=64, regParam=0.2, maxIter=15, implicitPrefs=False
Validation RMSE = 0.8749


0.8748588135300418
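
For reference, RMSE is the square root of the mean squared difference between rating and prediction. The sketch below is a plain-Python version that also mimics the effect of `coldStartStrategy="drop"` by skipping NaN predictions. The function `rmse_pairs` and its `drop_nan` flag are illustrative, and the toy pairs are made up.

```python
import math


def rmse_pairs(pairs, drop_nan: bool = True) -> float:
    """Root mean squared error over (rating, prediction) pairs."""
    kept = [(r, p) for r, p in pairs if not (drop_nan and math.isnan(p))]
    if not kept:
        raise ValueError("no predictions left to evaluate")
    return math.sqrt(sum((r - p) ** 2 for r, p in kept) / len(kept))


# Toy pairs; the last one stands for a user or item unseen during training.
pairs = [(4.0, 3.0), (2.0, 3.0), (5.0, float("nan"))]
print(rmse_pairs(pairs))                  # NaN row skipped
print(rmse_pairs(pairs, drop_nan=False))  # a single NaN makes the metric NaN
```

Because dropped rows do not count, the reported RMSE covers only user/item pairs the model can score.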

## 7) Top‑N Recommendations

In [9]:

from pyspark.sql import functions as F

best = tvs_model.bestModel
TOPN = 10

user_recs = best.recommendForAllUsers(TOPN)
item_recs = best.recommendForAllItems(TOPN)

# Pretty print a small sample
user_recs.select("userId", F.explode("recommendations").alias("rec")) \
    .select("userId", F.col("rec.movieId").alias("movieId"), F.col("rec.rating").alias("score")) \
    .orderBy("userId", F.desc("score")).show(30, truncate=False)

item_recs.select("movieId", F.explode("recommendations").alias("rec")) \
    .select("movieId", F.col("rec.userId").alias("userId"), F.col("rec.rating").alias("score")) \
    .orderBy("movieId", F.desc("score")).show(30, truncate=False)


+------+-------+---------+
|userId|movieId|score    |
+------+-------+---------+
|1     |132333 |5.928354 |
|1     |96004  |5.9241643|
|1     |3379   |5.9241643|
|1     |33649  |5.6812305|
|1     |60943  |5.6718407|
|1     |59018  |5.6718407|
|1     |5915   |5.556229 |
|1     |102217 |5.548975 |
|1     |93008  |5.5297456|
|1     |77846  |5.5297456|
|2     |67618  |4.811521 |
|2     |96004  |4.767877 |
|2     |3379   |4.767877 |
|2     |131724 |4.741022 |
|2     |33649  |4.71986  |
|2     |184245 |4.6064463|
|2     |134796 |4.6064463|
|2     |117531 |4.6064463|
|2     |86237  |4.6064463|
|2     |84273  |4.6064463|
|3     |6835   |4.77144  |
|3     |5746   |4.77144  |
|3     |5181   |4.6396904|
|3     |4518   |4.482605 |
|3     |2851   |4.3266068|
|3     |7899   |4.294297 |
|3     |26409  |4.074115 |
|3     |3024   |3.6480796|
|3     |3703   |3.3276978|
|3     |4821   |3.2661479|
+------+-------+---------+
only showing top 30 rows





+-------+------+---------+
|movieId|userId|score    |
+-------+------+---------+
|1      |53    |5.1805596|
|1      |276   |4.7790174|
|1      |43    |4.7342415|
|1      |452   |4.690034 |
|1      |93    |4.5877824|
|1      |12    |4.5567875|
|1      |543   |4.5472484|
|1      |99    |4.529876 |
|1      |171   |4.518492 |
|1      |169   |4.5059223|
|2      |53    |4.718187 |
|2      |43    |4.407033 |
|2      |543   |4.353454 |
|2      |276   |4.3002896|
|2      |452   |4.239373 |
|2      |12    |4.229362 |
|2      |93    |4.2253966|
|2      |584   |4.1333857|
|2      |337   |4.111156 |
|2      |578   |4.1066585|
|3      |53    |4.446891 |
|3      |43    |4.2145166|
|3      |543   |3.9929395|
|3      |276   |3.930692 |
|3      |93    |3.9045453|
|3      |452   |3.8745174|
|3      |337   |3.854468 |
|3      |171   |3.8544395|
|3      |243   |3.7752533|
|3      |12    |3.7746556|
+-------+------+---------+
only showing top 30 rows
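
Several scores above exceed 5.0, the top of the rating scale. A factorization score is a dot product of user and item factors, and nothing bounds it to the rating range. The sketch below shows the scoring and ranking idea in plain Python with made-up two-dimensional factors; `top_n`, `clip_rating` and the `seen` filter are hypothetical helpers, not Spark APIs.

```python
import heapq


def top_n(user_vec, item_factors, n: int = 10, seen=frozenset()):
    """Rank items by dot product with the user vector, skipping items in `seen`."""
    scored = (
        (sum(u * v for u, v in zip(user_vec, vec)), item_id)
        for item_id, vec in item_factors.items()
        if item_id not in seen
    )
    return [(item_id, score) for score, item_id in heapq.nlargest(n, scored)]


def clip_rating(score: float, low: float = 0.5, high: float = 5.0) -> float:
    """Clamp a raw score to the rating scale for display."""
    return max(low, min(high, score))


# Made-up factors for illustration only.
user_vec = [1.0, 2.0]
item_factors = {10: [1.0, 1.0], 20: [2.0, 2.0], 30: [0.0, 1.0]}
recs = top_n(user_vec, item_factors, n=2)
print(recs)
print([(item_id, clip_rating(score)) for item_id, score in recs])
```

Clipping changes only the displayed value, not the ranking order among unclipped items, so it is best applied after the Top‑N selection.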


## 8) Save/Load Model

In [10]:
MODEL_DIR = str((BASE_DIR / 'models' / 'als_movielens').resolve())

# Save
best.write().overwrite().save(MODEL_DIR)
print(f"Model saved to: {MODEL_DIR}")

# Load (if needed)
from pyspark.ml.recommendation import ALSModel
loaded = ALSModel.load(MODEL_DIR)
loaded


25/10/28 23:25:34 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers

Model saved to: /Users/alanyu/Documents/IIT/ITM/ITMD-524-Applied AI and Deep Learning/finalproject/MovieLens-MCRS/models/als_movielens




ALSModel: uid=ALS_f5812b623bb5, rank=64

## 9) (Optional) Write validation predictions snapshot

In [11]:
preds = best.transform(val)
output_dir = (BASE_DIR / 'outputs' / 'als_val_predictions').resolve()
(preds
 .select('userId', 'movieId', 'rating', F.round('prediction', 3).alias('prediction'))
 .coalesce(1)
 .write.mode('overwrite')
 .option('header', True)
 .csv(str(output_dir)))
print(f"Validation predictions written to {output_dir}")


Validation predictions written to /Users/alanyu/Documents/IIT/ITM/ITMD-524-Applied AI and Deep Learning/finalproject/MovieLens-MCRS/outputs/als_val_predictions



---

### SVD (Surprise) vs ALS (Spark) quick mapping
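- **Goal**: both factorize the user×item rating matrix into low‑rank user and item factors; compare them on held‑out RMSE/MAE.
- **Optimization**: Surprise's SVD fits factors and bias terms with SGD; Spark's ALS alternates least‑squares solves for user and item factors and has no bias terms. With `nonnegative=True` (as used above), each solve is constrained to non‑negative factors.
- **Cold start**: `coldStartStrategy="drop"` removes rows whose prediction is NaN (users or items unseen during training), so RMSE is computed only over pairs the model can score.
- **Implicit feedback**: set `implicitPrefs=True` **and** feed interaction strengths (e.g. view or click counts) that ALS treats as confidence weights; the outputs are then preference scores, so RMSE against star ratings is no longer comparable.
- **Time‑aware split**: for production‑like evaluation, split each user's ratings by timestamp (train on earlier, validate on later) rather than at random.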
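
To illustrate the "alternating solves" idea from the optimization bullet, here is a toy rank‑1 ALS in plain Python. With one factor per user and item, each least‑squares solve reduces to a scalar ridge formula. This is a teaching sketch under simplifying assumptions (rank 1, dense loops, a plain `reg` term), not Spark's implementation, and `als_rank1` is a hypothetical name.

```python
def als_rank1(ratings, n_users: int, n_items: int, reg: float = 0.1, iters: int = 10):
    """ratings: list of (user_index, item_index, rating). Returns (u, v) factor lists."""
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(iters):
        # Fix item factors; each user's factor is a 1-D ridge regression solved in closed form.
        for i in range(n_users):
            obs = [(j, r) for a, j, r in ratings if a == i]
            u[i] = sum(v[j] * r for j, r in obs) / (sum(v[j] ** 2 for j, _ in obs) + reg)
        # Fix user factors; solve for each item's factor the same way.
        for j in range(n_items):
            obs = [(i, r) for i, b, r in ratings if b == j]
            v[j] = sum(u[i] * r for i, r in obs) / (sum(u[i] ** 2 for i, _ in obs) + reg)
    return u, v


# Toy data that is exactly rank 1: rating[i][j] = a[i] * b[j].
a, b = [1.0, 2.0], [1.0, 2.0, 3.0]
toy = [(i, j, a[i] * b[j]) for i in range(2) for j in range(3)]
u, v = als_rank1(toy, n_users=2, n_items=3, reg=1e-6)
print([[round(u[i] * v[j], 3) for j in range(3)] for i in range(2)])
```

Each half‑step has an exact solution because the other side is held fixed, which is what distinguishes ALS from the SGD updates used by Surprise's SVD.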
