# Optimizing Collaborative Filtering with Spark ALS


This notebook implements a Collaborative Filtering recommendation system using Apache Spark’s ALS (Alternating Least Squares) algorithm. Given the inefficiency of SVD (Singular Value Decomposition) for large datasets, we leverage Spark to optimize performance and scalability. The workflow includes:
* Data loading & preprocessing with PySpark.
* Exploratory Data Analysis (EDA) to understand user-item interactions.
* Training & evaluating an ALS model to predict user preferences.
* Generating top-3 product recommendations for each user.

Import the necessary libraries. PySpark provides the distributed DataFrame and machine-learning APIs used throughout this notebook.

In [1]:
# Install PySpark
!pip install pyspark

# Spark session and DataFrame functions
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, explode

# ALS model and evaluation metric
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator



## Spark Session Initialization
The first step in PySpark programming is to create a Spark session. The `SparkSession` is the entry point to the DataFrame and ML APIs and provides a single context for all the distributed computations in this notebook.

In [2]:
# Initialize session
spark = SparkSession.builder.appName("Collaborative Filtering with ALS").getOrCreate()

## Data Loading and Integration


In [3]:
# Load Dataset
df = spark.read.csv("transactions.csv", header=True, inferSchema=True)

In [4]:
# Display schema
df.printSchema()

root
 |-- User: integer (nullable = true)
 |-- Product: integer (nullable = true)
 |-- Interaction: integer (nullable = true)
 |-- ProductName: string (nullable = true)



## Exploratory Data Analysis (EDA)


In [5]:
# Count total records
print("Total records:", df.count())

Total records: 1000000


In [6]:
# Summary statistics
print("\nSummary Statistics:")
df.describe().show()


Summary Statistics:
+-------+------------------+------------------+-----------------+--------------------+
|summary|              User|           Product|      Interaction|         ProductName|
+-------+------------------+------------------+-----------------+--------------------+
|  count|           1000000|           1000000|          1000000|             1000000|
|   mean|      50014.595087|          5.498436|         2.500161|                NULL|
| stddev|28864.277886609754|2.8710903498608187|1.117490591845948|                NULL|
|    min|                 1|                 1|                1|            Car Loan|
|    max|            100000|                10|                4|Travel Rewards Cr...|
+-------+------------------+------------------+-----------------+--------------------+



In [7]:
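# Count NULL values in each column: when() yields NULL for non-null rows,
# so count() only tallies the rows where the column is NULL
from pyspark.sql.functions import when
df.select([count(when(col(c).isNull(), c)).alias(f"missing_{c}") for c in df.columns]).show()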

+------------+---------------+-------------------+-------------------+
|missing_User|missing_Product|missing_Interaction|missing_ProductName|
+------------+---------------+-------------------+-------------------+
|           0|              0|                  0|                  0|
+------------+---------------+-------------------+-------------------+



In [8]:
# Count non-null values in each column
df.select([count(col(c)).alias(c) for c in df.columns]).show()

+-------+-------+-----------+-----------+
|   User|Product|Interaction|ProductName|
+-------+-------+-----------+-----------+
|1000000|1000000|    1000000|    1000000|
+-------+-------+-----------+-----------+




Every column has 1,000,000 non-null values, matching the total record count, so the transactions data contains no missing values.
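Subtracting `count(col(c))` from itself is always 0, so that pattern cannot detect nulls; a null check has to count the null rows themselves (equivalently, total rows minus non-null rows). A plain-Python sketch of both computations on a few hypothetical rows, where `None` stands in for SQL `NULL` (this illustrates the logic only and does not touch the real dataset):

```python
# Hypothetical sample rows; None plays the role of a SQL NULL.
rows = [
    {"User": 1, "Product": 2, "Interaction": 3, "ProductName": "Car Loan"},
    {"User": 2, "Product": None, "Interaction": 4, "ProductName": None},
    {"User": 3, "Product": 5, "Interaction": None, "ProductName": "Car Loan"},
]
columns = ["User", "Product", "Interaction", "ProductName"]

def non_null_count(rows, c):
    # Mirrors Spark's count(col(c)): NULL values are skipped.
    return sum(1 for r in rows if r[c] is not None)

# A value minus itself: always 0, whatever the data contains.
self_difference = {c: non_null_count(rows, c) - non_null_count(rows, c) for c in columns}

# Total rows minus non-null rows: the number of NULLs per column.
missing = {c: len(rows) - non_null_count(rows, c) for c in columns}

print(self_difference)  # {'User': 0, 'Product': 0, 'Interaction': 0, 'ProductName': 0}
print(missing)          # {'User': 0, 'Product': 1, 'Interaction': 1, 'ProductName': 1}
```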

In [9]:
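# Unique users and products (countDistinct deduplicates; count() only counts non-null rows)
from pyspark.sql.functions import countDistinct
df.select(countDistinct("User").alias("Unique Users"), countDistinct("Product").alias("Unique Products")).show()

The summary statistics bound these counts: user IDs range from 1 to 100,000 and product IDs from 1 to 10, so there are at most 100,000 distinct users and 10 distinct products.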
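`count` and `countDistinct` differ in the same way a list's length differs from the size of its set: `count` tallies every non-null row, while `countDistinct` tallies each value once. A small plain-Python illustration with made-up (user, product) pairs:

```python
# Hypothetical interaction log: (user, product) pairs with repeats.
interactions = [(1, 2), (1, 6), (2, 2), (3, 7), (3, 2)]

users = [u for u, _ in interactions]
products = [p for _, p in interactions]

# Like count("User"): every row counts, so repeated users are counted again.
print(len(users), len(products))            # 5 5
# Like countDistinct("User"): each value counts once.
print(len(set(users)), len(set(products)))  # 3 3
```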



In [10]:
# Distribution of Interactions (PySpark Aggregation)
df.groupBy("Interaction").count().orderBy("Interaction").show()

+-----------+------+
|Interaction| count|
+-----------+------+
|          1|249792|
|          2|249855|
|          3|250753|
|          4|249600|
+-----------+------+



In [11]:
# Most popular product
print("\nTop 5 Products by Interaction Count:")
df.groupBy("Product", "ProductName") \
    .count() \
    .orderBy(desc("count")) \
    .show(5)


Top 5 Products by Interaction Count:
+-------+--------------------+------+
|Product|         ProductName| count|
+-------+--------------------+------+
|      2|Cashback Credit Card|100789|
|      6|       Personal Loan|100307|
|      7|       Fixed Deposit|100080|
|      3| Student Credit Card|100039|
|      9|Mutual Fund - Hig...| 99917|
+-------+--------------------+------+
only showing top 5 rows



In [12]:
# Detecting Outliers using IQR Method
summary = df.selectExpr("percentile_approx(Interaction, 0.25) as Q1", "percentile_approx(Interaction, 0.75) as Q3").collect()
Q1 = summary[0]['Q1']
Q3 = summary[0]['Q3']
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR

print(f"IQR Method: Lower Bound = {lower_bound}, Upper Bound = {upper_bound}")

outliers_iqr = df.filter((col("Interaction") < lower_bound) | (col("Interaction") > upper_bound))
print("Outliers detected using IQR method:")
outliers_iqr.show(10)

IQR Method: Lower Bound = 0.5, Upper Bound = 4.5
Outliers detected using IQR method:
+----+-------+-----------+-----------+
|User|Product|Interaction|ProductName|
+----+-------+-----------+-----------+
+----+-------+-----------+-----------+
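The bounds can be checked by hand from the interaction frequencies in the EDA section. The sketch below uses a nearest-rank percentile on those exact counts; `percentile_approx` is an approximate algorithm, so treating the two as equal is an assumption, though here they agree with the printed bounds:

```python
import math

# Interaction frequencies from the groupBy("Interaction") output above.
counts = {1: 249792, 2: 249855, 3: 250753, 4: 249600}

def percentile_from_counts(counts, p):
    """Nearest-rank percentile: smallest value whose cumulative count reaches p * total."""
    total = sum(counts.values())
    target = math.ceil(p * total)
    running = 0
    for value in sorted(counts):
        running += counts[value]
        if running >= target:
            return value
    return max(counts)

q1 = percentile_from_counts(counts, 0.25)
q3 = percentile_from_counts(counts, 0.75)
iqr = q3 - q1
lower, upper = q1 - 1.5 * iqr, q3 + 1.5 * iqr
print(q1, q3, lower, upper)  # 2 3 0.5 4.5
```

Because every interaction is an integer from 1 to 4, all values fall inside [0.5, 4.5], which is why the outlier filter returns no rows.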



## Proceed with ALS Processing
Spark's ALS expects integer user and item IDs and a numeric rating column, so we rename the columns to those roles and cast them to the expected types.
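The IDs in this dataset are already integers, so no encoding is needed here. If they were strings, they would first have to be mapped to integers; in PySpark, `StringIndexer` from `pyspark.ml.feature` does this (by default it orders labels by descending frequency). The plain-Python sketch below shows the idea with hypothetical string IDs, assigning indices in first-seen order rather than StringIndexer's ordering:

```python
# Hypothetical raw IDs that are strings rather than integers.
raw_users = ["u-9f2", "u-01a", "u-9f2", "u-7c3"]

def build_index(values):
    """Map each distinct raw ID to a dense integer index (0, 1, 2, ...)."""
    index = {}
    for v in values:
        if v not in index:
            index[v] = len(index)
    return index

user_index = build_index(raw_users)
encoded = [user_index[u] for u in raw_users]
print(user_index)  # {'u-9f2': 0, 'u-01a': 1, 'u-7c3': 2}
print(encoded)     # [0, 1, 0, 2]
```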

In [13]:
# Rename columns
df = df.withColumnRenamed("User", "userId") \
       .withColumnRenamed("Product", "itemId") \
       .withColumnRenamed("Interaction", "rating")

In [14]:
# Drop rows with missing values (none were found above; kept as a safeguard)
df = df.dropna()

In [15]:
# Ensure correct data types: integer IDs for ALS, numeric ratings
df = df.withColumn("userId", col("userId").cast("integer"))
df = df.withColumn("itemId", col("itemId").cast("integer"))
df = df.withColumn("rating", col("rating").cast("integer"))

In [16]:
# Train-test split (80-20)
(train, test) = df.randomSplit([0.8, 0.2], seed=42)

print("\nTraining Data Count:", train.count())
print("Test Data Count:", test.count())


Training Data Count: 799963
Test Data Count: 200037
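`randomSplit` assigns rows by random sampling rather than by cutting at an exact 80/20 boundary, which is why the counts are 799,963 and 200,037 instead of exactly 800,000 and 200,000. A simplified plain-Python sketch using an independent Bernoulli draw per row (Spark samples within each partition using the given seed, so its exact counts will differ from this sketch):

```python
import random

def bernoulli_split(n_rows, train_fraction, seed):
    """Assign each row to train independently with probability train_fraction."""
    rng = random.Random(seed)
    train = sum(1 for _ in range(n_rows) if rng.random() < train_fraction)
    return train, n_rows - train

train_n, test_n = bernoulli_split(1_000_000, 0.8, seed=42)
print(train_n, test_n)  # close to, but generally not exactly, 800000 and 200000
```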


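## ALS (Alternating Least Squares) Model Implementation
ALS factorizes the sparse user-item matrix into low-rank user and item factor matrices. It alternates between two steps: holding the item factors fixed and solving a regularized least-squares problem for each user, then holding the user factors fixed and solving for each item. Because each user's (or item's) problem is independent of the others, both steps parallelize naturally across a cluster, which makes ALS well suited to large-scale recommendation.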
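A minimal NumPy sketch of explicit-feedback ALS follows, on a hypothetical 4x4 rating matrix in which 0 marks an unobserved entry. Each user update is a small ridge regression over that user's observed ratings only, and likewise for each item. Following the ALS-WR approach described in the Spark MLlib guide, the penalty is scaled by each user's or item's rating count. This illustrates the technique; it is not Spark's implementation, which also partitions the factors into blocks for distributed solves.

```python
import numpy as np

def als(R, mask, rank=2, reg=0.1, iters=10, seed=0):
    """Explicit-feedback ALS: R ~ U @ V.T fitted only where mask is True.

    The L2 penalty is scaled by each user's/item's number of ratings (ALS-WR style).
    """
    rng = np.random.default_rng(seed)
    n_users, n_items = R.shape
    U = rng.normal(scale=0.1, size=(n_users, rank))
    V = rng.normal(scale=0.1, size=(n_items, rank))
    eye = np.eye(rank)
    for _ in range(iters):
        # Fix V: each user's factors solve an independent ridge regression.
        for u in range(n_users):
            obs = mask[u]
            if not obs.any():
                continue
            Vo = V[obs]
            A = Vo.T @ Vo + reg * obs.sum() * eye
            U[u] = np.linalg.solve(A, Vo.T @ R[u, obs])
        # Fix U: each item's factors solve an independent ridge regression.
        for i in range(n_items):
            obs = mask[:, i]
            if not obs.any():
                continue
            Uo = U[obs]
            A = Uo.T @ Uo + reg * obs.sum() * eye
            V[i] = np.linalg.solve(A, Uo.T @ R[obs, i])
    return U, V

# Hypothetical ratings on a 1-4 scale; 0 means "not observed".
R = np.array([
    [4.0, 3.0, 0.0, 1.0],
    [4.0, 0.0, 0.0, 1.0],
    [1.0, 1.0, 0.0, 4.0],
    [0.0, 1.0, 4.0, 4.0],
])
mask = R > 0
U, V = als(R, mask, rank=2, reg=0.05, iters=20)
pred = U @ V.T  # predicted rating = dot product of user and item factors
train_rmse = np.sqrt(np.mean((pred[mask] - R[mask]) ** 2))
print(round(float(train_rmse), 3))
```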

In [17]:
# Configure the ALS model
als = ALS(
    userCol="userId", itemCol="itemId", ratingCol="rating",
    maxIter=10,               # number of iterations
    regParam=0.1,             # regularization parameter
    rank=10,                  # number of latent factors
    coldStartStrategy="drop"  # drop NaN predictions for users/items unseen in training
)

In [18]:
# Train model
model = als.fit(train)

### Model Evaluation
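We assess the model with Root Mean Square Error (RMSE) on the held-out test set. RMSE is the square root of the mean squared difference between actual and predicted interaction values; it is expressed in the same units as the interactions (1 to 4 here), and lower is better.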
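The metric is short to write down. The plain-Python sketch below computes RMSE over hypothetical (actual, predicted) pairs and skips NaN predictions, which mimics the effect of `coldStartStrategy="drop"`: without it, users or items that appear only in the test split receive NaN predictions and the RMSE itself evaluates to NaN.

```python
import math

def rmse(pairs):
    """Root mean square error over (actual, predicted) pairs.

    Pairs whose prediction is NaN are skipped, mirroring coldStartStrategy="drop".
    """
    errors = [(a - p) ** 2 for a, p in pairs if not math.isnan(p)]
    return math.sqrt(sum(errors) / len(errors))

# Hypothetical (actual, predicted) pairs; NaN marks a cold-start prediction.
pairs = [(4, 3.5), (1, 2.0), (3, 3.0), (2, float("nan"))]
print(rmse(pairs))  # sqrt((0.25 + 1.0 + 0.0) / 3) ≈ 0.6455
```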

In [19]:
# Evaluate model
# Generate predictions
predictions = model.transform(test)

# Calculate RMSE
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print(f"\nRoot Mean Square Error (RMSE): {rmse}")


Root Mean Square Error (RMSE): 1.3136498946620652
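To judge whether 1.31 is good, it helps to compare it with a trivial baseline that always predicts the global mean interaction. That baseline's RMSE equals the standard deviation of the interactions, which can be computed from the frequency table in the EDA section (a rough check over the full dataset rather than a baseline fitted on the training split and scored on the test split):

```python
import math

# Interaction frequencies from the groupBy("Interaction") output (full dataset).
counts = {1: 249792, 2: 249855, 3: 250753, 4: 249600}
n = sum(counts.values())

# Constant baseline: always predict the global mean interaction.
mean = sum(v * c for v, c in counts.items()) / n
baseline_rmse = math.sqrt(sum(c * (v - mean) ** 2 for v, c in counts.items()) / n)

print(round(mean, 6), round(baseline_rmse, 4))  # 2.500161 1.1175
```

The result matches the mean and standard deviation in the summary statistics. Since the model's RMSE of about 1.31 is above this roughly 1.12 baseline, the current ALS configuration does not yet beat a constant prediction; tuning `rank`, `regParam`, and `maxIter` (for example with `CrossValidator` from `pyspark.ml.tuning`) would be a reasonable next step. The near-uniform interaction counts also suggest the data may carry little user-item structure.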


## Generate Top-3 Recommendations for Each User as a DataFrame


In [20]:
# Generate top-3 recommendations
userRecs = model.recommendForAllUsers(3)

# Format recommendations
recommendations = userRecs.select(
    col("userId"),
    explode(col("recommendations")).alias("rec")
).select(
    "userId",
    col("rec.itemId").alias("recommended_item"),
    col("rec.rating").alias("predicted_interaction")
)

# Add product names (join against a de-duplicated product lookup table)
products = df.select("itemId", "ProductName").distinct()
final_recommendations = recommendations.join(
    products,
    recommendations.recommended_item == products.itemId
).select(
    "userId",
    "recommended_item",
    "ProductName",
    "predicted_interaction"
)

# Display sample recommendations
print("\nSample Recommendations:")
final_recommendations.show(5)

# Save recommendations
final_recommendations.coalesce(1).write \
    .mode("overwrite") \
    .option("header", "true") \
    .csv("recommendations")



Sample Recommendations:
+------+----------------+--------------------+---------------------+
|userId|recommended_item|         ProductName|predicted_interaction|
+------+----------------+--------------------+---------------------+
|     3|               1|Travel Rewards Cr...|             2.777319|
|     9|               1|Travel Rewards Cr...|            1.8132145|
|    15|               1|Travel Rewards Cr...|             2.421748|
|    26|               1|Travel Rewards Cr...|            2.8983932|
|    41|               1|Travel Rewards Cr...|            2.8494582|
+------+----------------+--------------------+---------------------+
only showing top 5 rows
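`recommendForAllUsers(k)` scores every item for a user by the dot product of the user's and item's latent factors (the same quantity `transform` predicts) and keeps the k highest. In Spark the learned factors are exposed as `model.userFactors` and `model.itemFactors`; the plain-Python sketch below uses small hypothetical factor values instead:

```python
import heapq

# Hypothetical learned factors (rank 2) for 2 users and 4 items.
user_factors = {3: [1.2, 0.4], 9: [0.3, 1.1]}
item_factors = {1: [1.5, 0.6], 2: [0.2, 1.4], 6: [0.9, 0.9], 7: [0.1, 0.2]}

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def recommend_for_all_users(user_factors, item_factors, k):
    """Top-k items per user, ranked by predicted score = user factors . item factors."""
    recs = {}
    for user, uf in user_factors.items():
        scored = ((dot(uf, vf), item) for item, vf in item_factors.items())
        recs[user] = [(item, round(score, 2)) for score, item in heapq.nlargest(k, scored)]
    return recs

recs = recommend_for_all_users(user_factors, item_factors, 3)
print(recs)  # {3: [(1, 2.04), (6, 1.44), (2, 0.8)], 9: [(2, 1.6), (6, 1.26), (1, 1.11)]}
```

Note that the join with product names does not preserve each user's ranking order; sorting by `userId` and descending `predicted_interaction` restores it for display.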



## Cleanup and Session Termination
Clean up resources by stopping the Spark session.

In [21]:
# Stop Spark session
spark.stop()

## Conclusion
In this project, we successfully optimized a Collaborative Filtering recommendation system using Apache Spark’s ALS (Alternating Least Squares) algorithm, overcoming the limitations of SVD (Singular Value Decomposition) for large-scale datasets.

### Key Achievements:
* Efficient Data Processing:
  * Instead of Pandas, we used PySpark to load and preprocess 1 million transactions, ensuring scalability.
  * Basic Exploratory Data Analysis (EDA) was performed to understand user-item interactions.
* Performance Improvement with ALS:
  * SVD was slow and memory-intensive, making it impractical for our large dataset.
  * ALS, designed for distributed computing, improved scalability and efficiency on sparse user-item matrices because each alternating step parallelizes across users and items.
* Model Evaluation & Recommendations:
  * Root Mean Square Error (RMSE) on the test set: 1.31. This is higher than the standard deviation of the interaction values (about 1.12 in the summary statistics), which is roughly the RMSE of always predicting the global mean, so the current model does not yet outperform that simple baseline and hyperparameter tuning is a natural next step.
  * Generated personalized recommendations for users, with predictions like:

| userId | recommended_item | ProductName                | predicted_interaction |
|--------|------------------|----------------------------|-----------------------|
| 3      | 1                | Travel Rewards Credit Card | 2.78                  |
| 9      | 1                | Travel Rewards Credit Card | 1.81                  |

* Scalability & Real-World Application:
  * Spark ALS enabled parallel processing, making it practical to precompute recommendations in batch for large user bases; the stored top-3 lists can then be served by real-time applications.
  * The final recommendations were saved as CSV (a `recommendations` directory holding a single part file, because of `coalesce(1)`), allowing straightforward integration into business applications.

By leveraging Apache Spark’s ALS, we successfully optimized collaborative filtering for large-scale recommendation systems. This approach can be extended to real-world applications, such as personalized product recommendations, customer retention strategies, and targeted marketing campaigns.
