# DATA 612 Project 5 | Implementing a Recommender System on Spark 

In [0]:
#imports
from pyspark.sql import SparkSession
from pyspark.sql import Row

from pyspark.ml.recommendation import ALS 
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import TrainValidationSplit, ParamGridBuilder

from pyspark.sql.functions import col
import numpy as np

import pyspark.sql.functions as sf

from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.sql.functions import col, udf
from pyspark.ml.linalg import VectorUDT
from pyspark.sql.types import ArrayType, DoubleType

## Jester Joke Data
2.5 million anonymous ratings of jokes by users of the Jester Joke Recommender System (Ken Goldberg, AUTOLab, UC Berkeley). Values from (-10.00 to +10.00) of 100 jokes collected between April 1999 - May 2003. Data from 24,983 users who have rated 36 or more jokes, a matrix with dimensions 24983 X 101.

In [0]:
df = spark.read.table("workspace.default.jester_data_1")
df = df.drop("Number of Jokes Rated")
display(df)

User,1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33,34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50,51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66,67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83,84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100
1,-7.82,8.79,-9.66,-8.16,-7.52,-8.5,-9.85,4.17,-8.98,-4.76,-8.5,-6.75,-7.18,8.45,-7.18,-7.52,-7.43,-9.81,-9.85,-9.85,-9.37,1.5,-4.37,-9.81,-8.5,1.12,7.82,2.86,9.13,-7.43,2.14,-4.08,-9.08,7.82,5.05,4.95,-9.17,-8.4,-8.4,-8.4,-8.11,-9.13,-9.03,-9.08,-7.14,-6.26,3.79,-0.1,3.93,4.13,-8.69,-7.14,3.2,8.3,-4.56,0.92,-9.13,-9.42,2.82,-8.64,8.59,3.59,-6.84,-9.03,2.82,-1.36,-9.08,8.3,5.68,-4.81,,,,,,,,-9.42,,,,-7.72,,,,,,,,,2.82,,,,,,-5.63,,,
2,4.08,-0.29,6.36,4.37,-2.38,-9.66,-0.73,-5.34,8.88,9.22,6.75,8.64,4.42,7.43,4.56,-0.97,4.66,-0.68,3.3,-1.21,0.87,8.64,8.35,9.17,0.05,7.57,4.71,0.87,-0.39,6.0,6.5,-0.92,7.14,9.03,-1.8,0.73,7.09,3.4,-0.87,7.91,7.82,-3.83,8.64,8.98,-4.32,-3.2,-4.9,-0.92,-1.46,3.64,3.16,9.03,0.97,-1.31,-6.5,-3.2,8.64,-2.14,0.1,9.03,-6.7,-3.35,-9.03,4.47,4.08,-3.83,8.74,1.12,0.78,7.52,-5.0,2.77,8.3,7.77,7.33,6.21,7.72,8.98,8.64,8.2,3.93,4.85,4.85,6.07,8.98,4.51,-0.05,3.69,4.56,0.58,2.82,-4.95,-0.29,7.86,-0.19,-2.14,3.06,0.34,-4.32,1.07
3,,,,,9.03,9.27,9.03,9.27,,,7.33,7.57,9.37,6.17,-6.36,-6.89,-7.86,9.03,9.03,9.03,7.28,,8.25,,,7.48,7.28,7.28,8.93,,6.17,7.28,,,8.98,7.33,,6.17,9.08,7.33,7.52,9.27,9.27,,9.27,,6.17,7.33,9.08,7.28,,,7.28,7.33,,7.23,,,,9.27,6.46,7.28,,,7.04,7.28,,7.28,8.25,,,,,,,8.93,,,,9.08,,,,,,,,,,9.03,,,,9.08,,,,,,
4,,8.35,,,1.8,8.16,-2.82,6.21,,1.84,7.33,6.6,6.31,8.11,-7.23,-6.65,1.17,-6.6,-3.64,-2.09,5.34,,,,,2.91,3.93,6.75,6.6,,6.65,-6.12,,7.57,6.21,6.65,,-8.3,7.18,2.82,,1.55,,,,6.84,6.84,-3.98,6.0,3.45,,6.94,1.55,7.67,,6.55,,,,,0.0,-3.69,,,7.82,0.24,,7.28,-2.33,,,,,,,,,,,,,,,,0.63,,,-2.33,,,,,,0.53,,,,,,
5,8.5,4.61,-4.17,-5.39,1.36,1.6,7.04,4.61,-0.44,5.73,8.25,6.84,-3.93,7.23,-2.33,-9.66,2.72,-1.36,2.57,4.51,8.2,6.12,8.3,-1.26,7.77,1.89,-1.17,5.68,8.45,4.61,8.06,-9.47,7.28,5.68,2.48,3.2,-1.26,6.8,4.51,2.48,0.34,6.84,0.19,-8.74,5.24,6.31,8.06,1.26,4.51,-0.05,4.42,3.06,8.93,7.82,1.75,8.11,-8.06,-9.17,4.95,3.35,7.38,6.17,4.71,-2.28,7.38,4.56,7.14,4.22,3.01,3.83,,,,,,,4.13,,,,5.24,5.92,0.87,7.28,3.93,-0.63,6.31,4.71,2.82,2.96,5.19,5.58,4.27,5.19,5.73,1.55,3.11,6.55,1.8,1.6
6,-6.17,-3.54,0.44,-8.5,-7.09,-4.32,-8.69,-0.87,-6.65,-1.8,-6.8,-5.73,-5.0,-8.59,0.49,-8.93,-3.69,-2.18,-2.28,-6.12,-3.01,-0.58,-2.38,-7.77,-2.23,-2.28,-5.24,-5.53,-0.68,-6.94,-3.93,-0.34,-6.6,-5.49,-1.55,-0.49,-6.21,-8.06,-5.39,-7.04,-8.83,-7.91,1.07,-1.89,-6.36,-4.22,-9.27,-9.81,-7.82,-0.63,-6.31,-7.48,-6.07,-3.45,-3.2,0.53,-9.27,-9.56,-5.49,-1.12,-6.65,-2.86,-4.61,-9.42,-7.91,-9.81,-9.42,-8.98,-1.94,-6.0,-9.66,-5.19,-5.0,-4.42,-7.28,-6.5,-4.32,-6.94,-9.85,0.73,-6.21,-2.86,-7.62,-6.12,-0.29,-1.41,-3.93,-1.94,-5.68,-4.71,-3.54,-6.89,-0.68,-2.96,-2.18,-3.35,0.05,-9.08,-5.05,-3.45
7,,,,,8.59,-9.85,7.72,8.79,,,4.27,7.62,-6.26,2.96,6.07,-3.5,-2.09,6.17,5.15,4.42,5.63,2.43,,,4.13,7.09,7.33,7.18,0.92,,5.15,5.87,,,7.96,3.79,,1.55,,3.11,6.26,,,,6.0,3.2,,7.33,4.71,3.54,,2.28,8.5,9.22,,8.16,,,,,-1.5,5.78,,,8.93,8.5,,2.43,7.48,,,5.97,6.36,,,,,,,,,,3.83,,,,,,,,,,,,,2.33,,,,
8,6.84,3.16,9.17,-6.21,-8.16,-1.7,9.27,1.41,-5.19,-4.42,8.2,-7.86,-6.94,-7.96,0.29,-9.9,-7.09,-7.18,1.02,-0.29,-4.71,-7.43,1.12,-8.25,3.79,0.1,3.45,-3.45,7.57,-4.17,-0.44,9.27,5.83,-3.4,-5.44,1.6,-7.14,3.01,-6.07,5.68,-6.46,-4.42,-8.98,0.53,-1.26,-4.42,-8.93,8.93,-0.1,7.43,-4.51,-7.38,4.17,-9.03,-2.14,1.31,-5.68,-9.08,-6.21,2.48,-3.2,-6.12,-2.91,-9.22,-2.62,-7.33,-3.25,-4.22,-9.81,-2.09,-9.9,-3.79,0.49,-7.86,-9.85,1.8,-4.17,-2.38,-9.13,-8.88,3.54,-4.56,-5.44,-5.97,-9.9,-0.34,-4.13,-0.1,-0.24,-7.96,7.23,-1.12,-0.1,-5.68,-3.16,-3.35,2.14,-0.05,1.31,0.0
9,-3.79,-3.54,-9.42,-6.89,-8.74,-0.29,-5.29,-8.93,-7.86,-1.6,-2.91,-0.29,-4.85,-0.49,-8.74,-6.0,-8.74,-2.91,-3.35,-0.29,3.98,-1.6,-0.29,1.21,3.79,2.91,4.85,2.28,0.97,-0.29,6.36,0.53,-0.29,4.66,3.35,0.97,-3.11,-1.84,4.42,-4.42,-1.41,1.41,-5.34,-6.0,1.41,5.1,1.84,-0.29,0.53,-0.29,-6.0,-3.54,6.36,1.84,3.35,-0.29,-0.29,-5.29,-6.89,-5.29,1.21,5.53,1.41,-6.5,4.85,4.85,2.62,1.41,6.36,1.41,-4.76,4.17,-2.04,-6.31,-0.29,-0.29,0.78,-0.29,-0.29,-0.29,-0.29,-3.2,-0.29,-3.2,-3.4,-0.29,-0.29,5.73,4.56,-2.23,4.37,-0.29,4.17,-0.29,-0.29,-0.29,-0.29,-0.29,-3.4,-4.95
10,3.01,5.15,5.15,3.01,6.41,5.15,8.93,2.52,3.01,8.16,5.53,6.02,4.47,5.44,-4.66,-0.97,-0.44,1.55,0.49,4.37,5.44,5.24,6.6,3.01,6.02,6.8,2.91,5.15,3.5,5.15,6.6,-0.1,3.11,6.5,9.22,7.86,3.01,6.5,3.01,7.67,6.6,5.15,3.01,4.27,6.8,6.8,9.22,5.73,5.44,4.85,3.11,5.15,6.0,2.52,3.11,3.2,2.43,4.85,3.11,3.01,4.08,6.7,6.8,3.11,4.08,4.37,3.01,9.13,4.66,3.01,,,,,,,,,,,,,,,4.56,,,,,,,4.47,,,,,,,,


In [0]:
df = df.na.fill(value=0)

In [0]:
# List of numeric column headers (as strings)
joke_cols = [str(i) for i in range(1, 101)]  # '1' to '100'

# Build stack expression
stack_expr = "stack({}, {}) as (JokeID, Rating)".format(
    len(joke_cols),
    ", ".join([f"{j}, `{j}`" for j in joke_cols])
)

# Melt
melted_df = df.selectExpr("User", stack_expr)


melted_df = melted_df.withColumn("JokeID", col("JokeID").cast("int")) \
                     .withColumn("Rating", col("Rating").cast("double")) \
                    .withColumn("User", col("User").cast("int"))

melted_df.take(10)

[Row(User=1, JokeID=1, Rating=-7.82),
 Row(User=2, JokeID=1, Rating=4.08),
 Row(User=3, JokeID=1, Rating=None),
 Row(User=4, JokeID=1, Rating=None),
 Row(User=5, JokeID=1, Rating=8.5),
 Row(User=6, JokeID=1, Rating=-6.17),
 Row(User=7, JokeID=1, Rating=None),
 Row(User=8, JokeID=1, Rating=6.84),
 Row(User=9, JokeID=1, Rating=-3.79),
 Row(User=10, JokeID=1, Rating=3.01)]

### Alternating Least Squares
Alternating Least Squares (ALS) is a matrix factorization algorithm used primarily in recommendation systems to uncover hidden (latent) patterns between users and items, even when many ratings are missing. In this case, ALS works by fixing the user matrix, then solving for the joke matrix, then fixing the other, solving again, and repeating until both sides converge. This minimizes the regularized squared error on the set of known rating.

This is my regular python code from Project 3. 

In [0]:
def matrix_factorization_als(R, k=10, max_iter=100, reg=0.1, tol=1e-6):
        """
        Perform matrix factorization using Alternating Least Squares
        R: ratings matrix (sparse, 0 means missing)
        k: number of latent factors
        """
        n_users, n_jokes = R.shape
        
        # Initialize factors randomly
        U = np.random.normal(0, 0.1, (n_users, k))
        V = np.random.normal(0, 0.1, (n_jokes, k))
        
        # Get positions of known ratings
        known_ratings = R != 0
        
        prev_error = float('inf')
    
        for iteration in range(max_iter):
            # Update U (user factors)
            for i in range(n_users):
                rated_jokes = known_ratings[i, :]
                if np.any(rated_jokes):
                    V_rated = V[rated_jokes, :]
                    ratings = R[i, rated_jokes]
                    
                    # Solve: U[i] = argmin ||ratings - U[i] @ V_rated.T||^2 + reg * ||U[i]||^2
                    A = V_rated.T @ V_rated + reg * np.eye(k)
                    b = V_rated.T @ ratings
                    U[i, :] = np.linalg.solve(A, b)
            
            # Update V (movie factors)
            for j in range(n_jokes):
                rating_users = known_ratings[:, j]
                if np.any(rating_users):
                    U_rating = U[rating_users, :]
                    ratings = R[rating_users, j]
                    
                    # Solve: V[j] = argmin ||ratings - U_rating @ V[j]||^2 + reg * ||V[j]||^2
                    A = U_rating.T @ U_rating + reg * np.eye(k)
                    b = U_rating.T @ ratings
                    V[j, :] = np.linalg.solve(A, b)
            
            # Calculate error on known ratings
            predicted = U @ V.T
            error = np.mean((R[known_ratings] - predicted[known_ratings])**2)
            
            if iteration % 20 == 0:
                print(f"  Iteration {iteration}: RMSE = {np.sqrt(error):.4f}")
            
            if abs(prev_error - error) < tol:
                print(f"  Converged after {iteration} iterations")
                break
            
            prev_error = error
        
        return U, V, predicted, prev_error

In [0]:
(training, test) = df.randomSplit([0.8, 0.2])

In [0]:
U_als, V_als, predicted_als, prev_error = matrix_factorization_als(training.toPandas().values, k=5, max_iter=10)


  Iteration 0: RMSE = 414.3974


In [0]:
R = training.toPandas().values
mask = ~np.isnan(R)

# List of k values to try
k_values = [2, 5, 10, 20, 50, 100]
rmses = []
results = []


for k in k_values:
    U, V, predicted, prev_error = matrix_factorization_als(R, k=k, max_iter=101)
    rmses.append(prev_error)
    results.append((k, rmses))



  Iteration 0: RMSE = 257.7504
  Converged after 7 iterations
  Iteration 0: RMSE = 345.1388
  Iteration 20: RMSE = 3.8602
  Converged after 31 iterations
  Iteration 0: RMSE = 127.1116
  Iteration 20: RMSE = 3.5386
  Iteration 40: RMSE = 3.5353
  Iteration 60: RMSE = 3.5343
  Iteration 80: RMSE = 3.5338
  Iteration 100: RMSE = 3.5332
  Iteration 0: RMSE = 143.6021
  Iteration 20: RMSE = 3.0930
  Iteration 40: RMSE = 3.0875
  Iteration 60: RMSE = 3.0857
  Iteration 80: RMSE = 3.0850
  Iteration 100: RMSE = 3.0845
  Iteration 0: RMSE = 28.0838
  Iteration 20: RMSE = 2.0858
  Iteration 40: RMSE = 2.0276
  Iteration 60: RMSE = 1.9971
  Iteration 80: RMSE = 1.9785
  Iteration 100: RMSE = 1.9660
  Iteration 0: RMSE = 5.2439
  Iteration 20: RMSE = 0.2398
  Iteration 40: RMSE = 0.2116
  Iteration 60: RMSE = 0.2037
  Iteration 80: RMSE = 0.1990
  Iteration 100: RMSE = 0.1936


[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-8996589371763644>, line 16[0m
[1;32m     13[0m     results[38;5;241m.[39mappend((k, rmses))
[1;32m     15[0m [38;5;28;01mfor[39;00m k, rmse [38;5;129;01min[39;00m results:
[0;32m---> 16[0m     [38;5;28mprint[39m([38;5;124mf[39m[38;5;124m"[39m[38;5;124mRank (k): [39m[38;5;132;01m{[39;00mk[38;5;132;01m}[39;00m[38;5;124m, RMSE: [39m[38;5;132;01m{[39;00mrmse[38;5;132;01m:[39;00m[38;5;124m.4f[39m[38;5;132;01m}[39;00m[38;5;124m"[39m)

[0;31mTypeError[0m: unsupported format string passed to list.__format__

In [0]:
i=0
for k, rmses in results:
    print(f"Rank (k): {k}, RMSE: {rmses[i]}")
    i+=1

Rank (k): 2, RMSE: 18.336145315825316
Rank (k): 5, RMSE: 14.901233438147964
Rank (k): 10, RMSE: 12.483706657623891
Rank (k): 20, RMSE: 9.514431391925017
Rank (k): 50, RMSE: 3.865035547753751
Rank (k): 100, RMSE: 0.03749579665492752


Error: _NotImplementedError: sc is not supported on serverless compute, consider using `spark` instead._

In [0]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

from pyspark.sql import SparkSession

# Start Spark session
spark = SparkSession.builder.getOrCreate()

# Load the data using DataFrame API
# Adjust delimiter if needed (e.g., '\t' for tab)
#df = spark.read.option("header", "false").schema("user INT, item INT, rating FLOAT").csv("dbfs:/data/mllib/als/test.data")

# Show a few rows
#df.take(10)

# Load and parse the data
#data = sc.textFile("data/mllib/als/test.data")

# Build the recommendation model using Alternating Least Squares
rank = 10
numIterations = 10
model = ALS.train(df, rank, numIterations)

# Evaluate the model on training data
testdata = df.map(lambda p: (p[0], p[1]))
predictions = model.predictAll(testdata).map(lambda r: ((r[0], r[1]), r[2]))
ratesAndPreds = df.map(lambda r: ((r[0], r[1]), r[2])).join(predictions)
MSE = ratesAndPreds.map(lambda r: (r[1][0] - r[1][1])**2).mean()
print("Mean Squared Error = " + str(MSE))

# Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")

[0;31m---------------------------------------------------------------------------[0m
[0;31mTypeError[0m                                 Traceback (most recent call last)
File [0;32m<command-8996589371763653>, line 21[0m
[1;32m     19[0m rank [38;5;241m=[39m [38;5;241m10[39m
[1;32m     20[0m numIterations [38;5;241m=[39m [38;5;241m10[39m
[0;32m---> 21[0m model [38;5;241m=[39m ALS[38;5;241m.[39mtrain(df, rank, numIterations)
[1;32m     23[0m [38;5;66;03m# Evaluate the model on training data[39;00m
[1;32m     24[0m testdata [38;5;241m=[39m df[38;5;241m.[39mmap([38;5;28;01mlambda[39;00m p: (p[[38;5;241m0[39m], p[[38;5;241m1[39m]))

File [0;32m/databricks/python/lib/python3.11/site-packages/pyspark/mllib/recommendation.py:300[0m, in [0;36mALS.train[0;34m(cls, ratings, rank, iterations, lambda_, blocks, nonnegative, seed)[0m
[1;32m    253[0m [38;5;129m@classmethod[39m
[1;32m    254[0m [38;5;28;01mdef[39;00m [38;5;21mtrain[39m(
[1;32m   

### Unable to use the PySpark ALS
I tried using the pyspark.ml.recommendation.ALS but everytime it gave below error. The advice online is to switch to a non High Concurrency cluster? But i think with the Databricks free edition I cant choose that, im just on their default. Im unsure if if im implementing something wrong, or if its Databricks Free limitations, hopefully I will get some more clarity in next class

_P_y4JError: An error occurred while calling None.org.apache.spark.ml.recommendation.ALS. Trace:
py4j.security.Py4JSecurityException: Constructor public org.apache.spark.ml.recommendation.ALS(java.lang.String) is not whitelisted.
	at py4j.security.WhitelistingPy4JSecurityManager.checkConstructor(WhitelistingPy4JSecurityManager.java:451)
	at py4j.Gateway.invoke(Gateway.java:256)__

In [0]:
#als = ALS(userCol="User", itemCol="JokeID", ratingCol="Rating", rank=10, seed=0, coldStartStrategy="drop")
#model = als.fit(training)
#predictions = model.transform(test)
#display(predictions)

In [0]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
import numpy as np

def matrix_factorization_als_pyspark(df_ratings, k=10, max_iter=100, reg=0.1, tol=1e-6):
    """
    Perform matrix factorization using PySpark ALS
    df_ratings: Spark DataFrame with columns (user, item, rating)
    k: number of latent factors (rank)
    """
    spark = SparkSession.builder.getOrCreate()

    # ALS setup
    als = ALS(
        userCol="User",
        itemCol="JokeID",
        ratingCol="Rating",
        rank=k,
        maxIter=max_iter,
        regParam=reg,
        coldStartStrategy="drop",  # drops NaNs from unseen items/users
        nonnegative=True
    )

    # Train ALS model
    model = als.fit(df_ratings)

    # Predict on known ratings to evaluate reconstruction
    predictions = model.transform(df_ratings)

    # Evaluate using RMSE
    evaluator = RegressionEvaluator(
        metricName="rmse",
        labelCol="rating",
        predictionCol="prediction"
    )
    rmse = evaluator.evaluate(predictions)
    print(f"Final RMSE on known ratings: {rmse:.4f}")

    # Return user and item factors as DataFrames
    user_factors = model.userFactors
    item_factors = model.itemFactors

    return model, user_factors, item_factors, rmse


In [0]:
# Run ALS
model, U, V, error = matrix_factorization_als_pyspark(melted_df, k=10, max_iter=20, reg=0.1)

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JError[0m                                 Traceback (most recent call last)
File [0;32m<command-8996589371763650>, line 2[0m
[1;32m      1[0m [38;5;66;03m# Run ALS[39;00m
[0;32m----> 2[0m model, U, V, error [38;5;241m=[39m matrix_factorization_als_pyspark(melted_df, k[38;5;241m=[39m[38;5;241m10[39m, max_iter[38;5;241m=[39m[38;5;241m20[39m, reg[38;5;241m=[39m[38;5;241m0.1[39m)

File [0;32m<command-8996589371763649>, line 16[0m, in [0;36mmatrix_factorization_als_pyspark[0;34m(df_ratings, k, max_iter, reg, tol)[0m
[1;32m     13[0m spark [38;5;241m=[39m SparkSession[38;5;241m.[39mbuilder[38;5;241m.[39mgetOrCreate()
[1;32m     15[0m [38;5;66;03m# ALS setup[39;00m
[0;32m---> 16[0m als [38;5;241m=[39m ALS(
[1;32m     17[0m     userCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mUser[39m[38;5;124m"[39m,
[1;32m     18[0m     itemCol[38;5;241m=[39m

## Comparing Original and Spark 

The original (NumPy ALS)  is Single-node. In this type of cluster, all operations, such as data storage, processing, and system management, take place on the single node

The new one (PySpark ALS) is Distributed and runs on a cluster of machines. This cluster typically consists of a driver node and multiple worker nodes (or executor nodes). Each worker node represents a distributed node. The data and computations are distributed across these worker nodes. Each node processes a portion of the data in parallel, significantly accelerating the processing of large datasets.


**Efficiency Improvements with Spark**

- Horizontal Scalability: can process datasets with millions of users and items using multiple nodes.
- Sparse Matrix Handling: Spark’s ALS is optimized for sparse data—memory and computation are efficient.
- Built-in Parallelism: Matrix updates for users and items are distributed across nodes, speeding up each iteration.
- Data Pipeline Integration: Spark makes it easy to integrate recommender training with ETL and streaming pipelines.
- Batch Prediction Efficiency: Predictions over large user-item pairs are distributed automatically.
 
**Challenges with using Spark**

- Overhead for Small/Mid-size Data: For datasets that fit comfortably in memory, Spark introduces unnecessary complexity and slower execution due to job scheduling and I/O overhead.
- Increased Implementation Complexity: Debugging, tuning, and managing Spark jobs require specialized knowledge of distributed systems and cluster configuration.
- Higher Resource Cost: Running distributed workloads involves cluster provisioning, compute costs, and potentially higher DevOps overhead compared to single-node environments.

### Comparing Speed:
The NumPY ALS should be faster for smaller dataset due to its low overhead, but as datasets increase in size it may slow down or even crash with memory overflow.
The PySpark ALS should be able to handle large sparse matrices (millions of users/items) efficiently in ways the single node could not.

Our Jokes Data from 24,983 users who have rated 36 or more jokes, a matrix with dimensions 24983 X 101, has ~2.5 million ratings. Previously i had to select a small subset for the single node to be able to run, the PySpark should be able to handle the entire df.

##### Benchmarking the NumPy ALS (~20 secs)

In [0]:
import time
start = time.time()
U, V, pred, error = matrix_factorization_als(R, k=10, max_iter=20, reg=0.1)
end = time.time()
print(f"NumPy ALS RMSE: {np.sqrt(error):.4f}, Training time: {end - start:.2f} sec")


  Iteration 0: RMSE = 200.2050
NumPy ALS RMSE: 3.5377, Training time: 21.09 sec


##### Benchmarking the PySpark ALS (~x secs)

In [0]:
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
import time

# Start timer
start = time.time()

# Train Spark ALS
als = ALS(userCol="User", itemCol="JokeID", ratingCol="Rating",
          rank=10, maxIter=20, regParam=0.1, coldStartStrategy="drop")
model = als.fit(training)

# Evaluate
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="Rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)

end = time.time()
print(f"Spark ALS RMSE: {rmse:.4f}, Training time: {end - start:.2f} sec")

[0;31m---------------------------------------------------------------------------[0m
[0;31mPy4JError[0m                                 Traceback (most recent call last)
File [0;32m<command-6939895423805157>, line 9[0m
[1;32m      6[0m start [38;5;241m=[39m time[38;5;241m.[39mtime()
[1;32m      8[0m [38;5;66;03m# Train Spark ALS[39;00m
[0;32m----> 9[0m als [38;5;241m=[39m ALS(userCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mUser[39m[38;5;124m"[39m, itemCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mJokeID[39m[38;5;124m"[39m, ratingCol[38;5;241m=[39m[38;5;124m"[39m[38;5;124mRating[39m[38;5;124m"[39m,
[1;32m     10[0m           rank[38;5;241m=[39m[38;5;241m10[39m, maxIter[38;5;241m=[39m[38;5;241m20[39m, regParam[38;5;241m=[39m[38;5;241m0.1[39m, coldStartStrategy[38;5;241m=[39m[38;5;124m"[39m[38;5;124mdrop[39m[38;5;124m"[39m)
[1;32m     11[0m model [38;5;241m=[39m als[38;5;241m.[39mfit(training)
[1;32m     13[0m [38;5;66

#####Size in memory (19.25 MB)

In [0]:
row_count = df.count()
row_size_bytes = 8 * len(df.columns)  # Approx. 8 bytes per column
estimated_size_mb = (row_count * row_size_bytes) / (1024 ** 2)
print(f"Estimated size: {estimated_size_mb:.2f} MB")

Estimated size: 19.25 MB


#### Our current Data
2.5M ratings × 3 values (24,983 users, 100 jokes, rating) = ~20 MB (could be more wiht different data types). Which is really comparitivity small. It will easily fits in memory on a single machine still.

ALS is iterative and can be computationally intensive, but with only 101 items (jokes), the item-factor matrix is still small. NumPy should be able to handle this efficiently without distributed computing.

On this scale, matrix factorization using NumPy completes in seconds (seen above) to a maybe a few minutes (if we increased the training set size), depending on iterations and regularization. There's currently no major training-time bottleneck that justifies moving to Spark.

#### What point would you see moving to a distributed platform such as Spark becoming necessary? 

If we were to scale this up considerablely, we may need the distributed platform like Spark. We could expand the dataset to 10–50 million users. include more jokes (100 up to maybe 10,000). If we were adding additional timestamps, contexts, feedback types (making it multidimensional).

