# Collaborative Filtering Recommendation Engine with Spark MLlib on EMR Studio

This notebook builds a simple collaborative filtering movie recommender with the Alternating Least Squares (ALS) algorithm in Spark MLlib. It is designed for Amazon EMR Studio, but it should work in any Spark-enabled Jupyter environment.


## 1. Initialize Spark Session

Make sure your notebook is attached to a running kernel with PySpark.

In [1]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("Collaborative Filtering Recommendation ALS").getOrCreate()
spark

Starting Spark application


ID,Kind,State,Spark UI,Driver log,User,Current session?
0,pyspark,idle,Link,,,✔


SparkSession available as 'spark'.


<pyspark.sql.session.SparkSession object at 0x7fa039795f90>

## 2. Load Ratings Data from S3

Replace the S3 path below with your dataset location. The CSV must have a header row and three integer columns: `user_id`, `item_id`, and `rating`.

In [2]:
from pyspark.sql.types import StructType, StructField, IntegerType

s3_path = "s3://emr-studio-blog/data/rating.csv"  # <-- Change this to the CSV file in your S3 bucket

schema = StructType(
    [
        StructField("user_id", IntegerType(), True),
        StructField("item_id", IntegerType(), True),
        StructField("rating", IntegerType(), True),
    ]
)

# Load data from S3
ratings = (
    spark.read.option("header", "true")
    .schema(schema)
    .csv(s3_path)
)

print(f"Total ratings: {ratings.count()}")

ratings.show(10)

Total ratings: 1000
+-------+-------+------+
|user_id|item_id|rating|
+-------+-------+------+
|      3|      3|     4|
|      3|     19|     5|
|      3|     15|     2|
|      6|      3|     4|
|      5|     10|     4|
|      4|     11|     4|
|      5|     16|     2|
|      9|      3|     5|
|      9|     11|     1|
|      6|     10|     1|
+-------+-------+------+
only showing top 10 rows
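
If you do not have a ratings file yet, a synthetic one with the same three columns is enough to exercise the notebook end to end. The sketch below is only an illustration: the function name `write_synthetic_ratings` and the value ranges (10 users, 20 items, ratings 1 to 5) are assumptions, not properties of the dataset used above. Random ratings carry no real preference signal, so any model trained on them will score poorly.

```python
import csv
import io
import random


def write_synthetic_ratings(out, n_rows=1000, n_users=10, n_items=20, seed=42):
    """Write a header line plus n_rows random (user_id, item_id, rating) rows."""
    rng = random.Random(seed)  # fixed seed so the file is reproducible
    writer = csv.writer(out)
    writer.writerow(["user_id", "item_id", "rating"])
    for _ in range(n_rows):
        writer.writerow([
            rng.randint(1, n_users),   # hypothetical user id range
            rng.randint(1, n_items),   # hypothetical item id range
            rng.randint(1, 5),         # integer rating on a 1-5 scale
        ])


# Preview a few rows in memory; pass an open file object to write to disk instead.
buffer = io.StringIO()
write_synthetic_ratings(buffer, n_rows=5)
print(buffer.getvalue())
```

Upload the resulting file to your own bucket and point `s3_path` at it.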

## 3. Split Data into Training and Test Sets

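Randomly split the ratings into roughly 80% training data and 20% test data. `randomSplit` samples each row independently, so the resulting set sizes are approximate.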

In [3]:
train, test = ratings.randomSplit([0.8, 0.2])

print(f"Training set size: {train.count()}\nTest set size: {test.count()}")

train.cache()
test.cache()

Training set size: 818
Test set size: 182
DataFrame[user_id: int, item_id: int, rating: int]
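
The split above produced 818 and 182 rows rather than exactly 800 and 200 because each row is assigned by an independent random draw. The following plain-Python sketch mimics that idea on a list; `bernoulli_split` is a hypothetical helper, not Spark's implementation. In Spark itself, `randomSplit` accepts an optional `seed` argument (for example `ratings.randomSplit([0.8, 0.2], seed=42)`), which helps make runs repeatable as long as the input data and its partitioning stay the same.

```python
import random


def bernoulli_split(rows, train_fraction=0.8, seed=42):
    """Assign each row to train or test with one independent random draw."""
    rng = random.Random(seed)
    train_rows, test_rows = [], []
    for row in rows:
        # Each row lands in the training set with probability train_fraction.
        if rng.random() < train_fraction:
            train_rows.append(row)
        else:
            test_rows.append(row)
    return train_rows, test_rows


train_rows, test_rows = bernoulli_split(list(range(1000)))
# Sizes are close to 800/200 but rarely exact.
print(len(train_rows), len(test_rows))
```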

## 4. Train ALS Model

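Fit an ALS model on the training set. Setting `coldStartStrategy="drop"` removes predictions for users or items that were not seen during training, so the evaluation metrics later on are not NaN.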

In [4]:
from pyspark.ml.recommendation import ALS

als = ALS(
    maxIter=10,
    regParam=0.1,
    userCol="user_id",
    itemCol="item_id",
    ratingCol="rating",
    coldStartStrategy="drop"
)

print("Training ALS model...")
model = als.fit(train)
print("ALS model trained!")
model

Training ALS model...
ALS model trained!
ALSModel: uid=ALS_156a9151b32c, rank=10
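
To make the "alternating" part of ALS concrete, here is a minimal pure-Python sketch for the simplest possible case, rank 1, where every user and item gets a single number and a rating is approximated by their product. The function `als_rank1` and the toy data are assumptions made for illustration. Spark's implementation differs in important ways: it uses factor vectors (rank 10 in the model above), solves the least squares problems in distributed blocks, and scales the regularization term by the number of ratings per user or item.

```python
def als_rank1(ratings, n_users, n_items, reg=0.1, iters=10):
    """Fit rating ~= u[user] * v[item] by alternating closed-form updates.

    ratings is a list of (user_index, item_index, rating) tuples.
    """
    u = [1.0] * n_users
    v = [1.0] * n_items
    for _ in range(iters):
        # Hold item factors fixed; each user has a one-variable ridge solution.
        for i in range(n_users):
            num = sum(r * v[j] for (a, j, r) in ratings if a == i)
            den = reg + sum(v[j] ** 2 for (a, j, r) in ratings if a == i)
            u[i] = num / den
        # Hold user factors fixed; solve for each item the same way.
        for j in range(n_items):
            num = sum(r * u[i] for (i, b, r) in ratings if b == j)
            den = reg + sum(u[i] ** 2 for (i, b, r) in ratings if b == j)
            v[j] = num / den
    return u, v


# Toy data with zero-based indices (hypothetical values).
toy = [(0, 0, 5.0), (0, 1, 3.0), (1, 0, 4.0), (1, 2, 1.0), (2, 1, 2.0), (2, 2, 1.0)]
u, v = als_rank1(toy, n_users=3, n_items=3)
print("predicted rating for user 0, item 2:", u[0] * v[2])
```

Each half-step minimizes the regularized squared error exactly for one side while the other side is frozen, which is why the objective never increases from one iteration to the next.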

## 5. Predict on the Test Set

In [5]:
predictions = model.transform(test)
predictions.show(10)

+-------+-------+------+----------+
|user_id|item_id|rating|prediction|
+-------+-------+------+----------+
|      1|      2|     5| 2.9376748|
|      1|      4|     3| 2.5711138|
|      1|      5|     4| 2.3773007|
|      1|      6|     2| 3.2031214|
|      1|      8|     2| 2.4138818|
|      1|      8|     4| 2.4138818|
|      1|      9|     2|  2.848854|
|      1|     10|     2| 3.4588642|
|      1|     11|     4| 2.8927827|
|      1|     12|     2| 2.9755352|
+-------+-------+------+----------+
only showing top 10 rows

## 6. Evaluate the Model

In [6]:
from pyspark.ml.evaluation import RegressionEvaluator

# Root mean square error on the test predictions
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction"
)

rmse = evaluator.evaluate(predictions)
print(f"Root Mean Square Error (RMSE): {rmse:.4f}")

# Mean absolute error on the same predictions
mae_evaluator = RegressionEvaluator(
    metricName="mae",
    labelCol="rating",
    predictionCol="prediction"
)

mae = mae_evaluator.evaluate(predictions)
print(f"Mean Absolute Error (MAE): {mae:.4f}")

Root Mean Square Error (RMSE): 1.5291
Mean Absolute Error (MAE): 1.3266
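
For reference, both metrics are simple averages over the prediction errors. The sketch below recomputes them in plain Python for the first three rows of the predictions table in section 5; the helper names `rmse_manual` and `mae_manual` are made up for this example. RMSE squares the errors before averaging, so it is never smaller than MAE and it penalizes large misses more heavily.

```python
import math


def rmse_manual(labels, preds):
    """Square root of the mean squared difference between labels and predictions."""
    return math.sqrt(sum((y - p) ** 2 for y, p in zip(labels, preds)) / len(labels))


def mae_manual(labels, preds):
    """Mean of the absolute differences between labels and predictions."""
    return sum(abs(y - p) for y, p in zip(labels, preds)) / len(labels)


# First three (rating, prediction) pairs from the table in section 5.
labels = [5, 3, 4]
preds = [2.9376748, 2.5711138, 2.3773007]

print(f"RMSE on sample: {rmse_manual(labels, preds):.4f}")
print(f"MAE on sample:  {mae_manual(labels, preds):.4f}")
```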

## 7. Generate Top-N Recommendations for Each User

In [7]:
userRecs = model.recommendForAllUsers(10)
userRecs.show(5, truncate=False)

+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|user_id|recommendations                                                                                                                                                      |
+-------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|10     |[{15, 3.5726023}, {10, 3.52049}, {6, 3.317073}, {3, 3.232316}, {11, 3.2248077}, {19, 3.2028217}, {8, 3.1894932}, {17, 3.1809297}, {9, 3.1416864}, {2, 3.1215622}]    |
|1      |[{14, 3.8033876}, {10, 3.4588642}, {15, 3.242338}, {6, 3.2031214}, {18, 3.0111866}, {12, 2.9755352}, {2, 2.9376748}, {19, 2.8976045}, {11, 2.8927827}, {13, 2.87652}]|
|2      |[{4, 3.7576337}, {13, 3.655106}, {5, 3.6065657}, {10, 3.559215}, {12, 3.4828944}, {19, 3.372758}, {8, 3.345009}
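
Conceptually, each score in the table is the dot product of a user's factor vector and an item's factor vector, and the top-N list is the N items with the highest scores. The scores match the predictions from section 5 (for example, user 1 and item 10 both show 3.4588642), so items that a user has already rated can appear in the list. The sketch below illustrates the ranking step with made-up two-dimensional factors; `top_n` and the factor values are hypothetical.

```python
import heapq


def top_n(user_vector, item_factors, n=3):
    """Return the n (item_id, score) pairs with the highest dot-product score."""
    scores = {
        item_id: sum(a * b for a, b in zip(user_vector, item_vector))
        for item_id, item_vector in item_factors.items()
    }
    return heapq.nlargest(n, scores.items(), key=lambda pair: pair[1])


# Hypothetical factors for one user and four items.
user_vector = [1.2, 0.4]
item_factors = {
    3: [2.0, 0.5],
    10: [1.5, 2.0],
    15: [2.5, 1.0],
    19: [0.5, 0.5],
}

for item_id, score in top_n(user_vector, item_factors, n=3):
    print(item_id, round(score, 3))
```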

## 8. (Optional) Save Recommendations to S3

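You can save the recommendations as Parquet or JSON files for downstream use. The cell below writes JSON; call `.parquet(output_path)` instead of `.json(output_path)` to write Parquet.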

In [8]:
output_path = "s3://emr-studio-blog/result/"  # <-- Change this
userRecs.write.mode("overwrite").json(output_path)
print(f"Recommendations saved to: {output_path}")

Recommendations saved to: s3://emr-studio-blog/result/
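
Spark writes the JSON output as a directory of part files with one JSON object per line. A downstream consumer could read such a line with the standard `json` module, as sketched below. The sample line is reconstructed from the schema and the first two recommendations shown for user 10 above; the helper name `parse_recommendation_line` is an assumption.

```python
import json

# One line as it would appear in a part file (values taken from the table above).
sample_line = (
    '{"user_id":10,"recommendations":'
    '[{"item_id":15,"rating":3.5726023},{"item_id":10,"rating":3.52049}]}'
)


def parse_recommendation_line(line):
    """Turn one JSON line into (user_id, [(item_id, predicted_rating), ...])."""
    record = json.loads(line)
    recs = [(rec["item_id"], rec["rating"]) for rec in record["recommendations"]]
    return record["user_id"], recs


user_id, recs = parse_recommendation_line(sample_line)
print(user_id, recs)
```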

## 9. Save the Model to S3


In [11]:
model_output_path = "s3://emr-studio-blog/model/"
model.write().overwrite().save(model_output_path)
print(f"Model saved to: {model_output_path}")

Model saved to: s3://emr-studio-blog/model/

## 10. (Optional) Hyperparameter Tuning

In [None]:
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder

paramGrid = ParamGridBuilder() \
    .addGrid(als.regParam, [0.01, 0.1, 1.0]) \
    .addGrid(als.rank, [10, 50, 100]) \
    .addGrid(als.maxIter, [5, 10, 15]) \
    .build()

crossval = CrossValidator(
    estimator=als,
    estimatorParamMaps=paramGrid,
    evaluator=evaluator,
    numFolds=3
)

# Fit cross validator (this step may take some time)
print("Performing hyperparameter tuning...")
cvModel = crossval.fit(train)

# Get best model
best_model = cvModel.bestModel

print("Best parameters:")
print(f"Rank: {best_model.rank}")
print(f"RegParam: {best_model._java_obj.parent().getRegParam()}")
print(f"MaxIter: {best_model._java_obj.parent().getMaxIter()}")
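
Before running the search, it helps to estimate how many models it will train. The grid above has three values for each of three parameters, and `CrossValidator` fits every combination once per fold, then refits the best combination on the full training set. The short calculation below spells this out; the variable names are chosen for this example only.

```python
from itertools import product

# Same candidate values as the ParamGridBuilder call above.
reg_params = [0.01, 0.1, 1.0]
ranks = [10, 50, 100]
max_iters = [5, 10, 15]
num_folds = 3

grid = list(product(reg_params, ranks, max_iters))
cv_fits = len(grid) * num_folds   # one fit per combination per fold
total_fits = cv_fits + 1          # plus the final refit of the best combination

print(f"Parameter combinations: {len(grid)}")
print(f"Model fits during cross-validation: {cv_fits}")
print(f"Total model fits: {total_fits}")
```

If that is too slow for your cluster, shrinking the grid or the number of folds reduces the cost proportionally.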

## 11. (Optional) Model Persistence and Deployment

In [None]:
from pyspark.ml.recommendation import ALSModel

model_path = "s3://emr-studio-blog/model/" # <-- Change this
loaded_model = ALSModel.load(model_path)

# Create a recommendation service function
def batch_recommendations(user_ids, num_recommendations=10):
    """Generate recommendations for a batch of users"""
    users_df = spark.createDataFrame([(uid,) for uid in user_ids], ["user_id"])
    recommendations = loaded_model.recommendForUserSubset(users_df, num_recommendations)

    result = {}
    for row in recommendations.collect():
        user_id = row['user_id']
        recs = [(rec['item_id'], rec['rating']) for rec in row['recommendations']]
        result[user_id] = recs

    return result

## 12. (Optional) Monitoring and Evaluation Pipeline

In [None]:
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import explode

def evaluate_model_performance(model, test_data):
    """Compute RMSE, MAE, and catalog coverage for a fitted ALS model."""
    predictions = model.transform(test_data)

    rmse = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction").evaluate(predictions)
    mae = RegressionEvaluator(metricName="mae", labelCol="rating", predictionCol="prediction").evaluate(predictions)

    # Coverage: share of distinct items that appear in at least one user's top-10 list
    total_items = ratings.select("item_id").distinct().count()
    recommended_items = model.recommendForAllUsers(10).select(explode("recommendations.item_id")).distinct().count()
    coverage = recommended_items / total_items

    metrics = {
        "rmse": rmse,
        "mae": mae,
        "coverage": coverage,
        "timestamp": spark.sql("SELECT current_timestamp()").collect()[0][0]
    }

    return metrics

# Save metrics for tracking
# best_model comes from section 10; pass `model` instead if you skipped tuning
metrics = evaluate_model_performance(best_model, test)
metrics_df = spark.createDataFrame([metrics])
metrics_df.write.mode("append").parquet("s3://emr-studio-blog/model-metrics/") # <-- Change this
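
The coverage metric above is the fraction of catalog items that show up in at least one user's recommendation list. A plain-Python version on small in-memory data makes the definition explicit; `catalog_coverage` and the sample values are hypothetical.

```python
def catalog_coverage(recommendations, catalog_items):
    """Fraction of catalog items recommended to at least one user.

    recommendations maps user_id -> list of recommended item_ids.
    """
    catalog = set(catalog_items)
    recommended = {item for items in recommendations.values() for item in items}
    # Only count recommended items that actually exist in the catalog.
    return len(recommended & catalog) / len(catalog)


# Two users, a five-item catalog, three distinct items recommended.
sample_recs = {1: [1, 2], 2: [2, 3]}
print(catalog_coverage(sample_recs, catalog_items=range(1, 6)))
```

A value close to 1 means almost every item is recommended to someone, while a low value suggests the model concentrates on a small set of items.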

## 13. Clean Up

When you are done, stop the Spark session to release cluster resources.

In [12]:
spark.stop()

---
You’ve built a collaborative filtering recommender system using Spark MLlib on EMR Studio.