<a href="https://colab.research.google.com/github/AhmedBaari/Big-Data-Analytics/blob/main/10%20-%20Collaborative%20Filtering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Experiment 10: Collaborative Filtering using Jaccard and Cosine Similarity with PySpark


## AIM
To implement collaborative filtering using Jaccard and Cosine similarity measures with PySpark for parallel computation.

## 1. Import Libraries and Start Spark

In [None]:
# Import required libraries
import math
import os
import urllib.request
import zipfile

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_set, size, array_intersect, array_union
from pyspark.sql.types import *
from pyspark.sql.functions import udf

# Obtain the Spark session used by every cell below
spark = (
    SparkSession.builder
    .appName("CollaborativeFiltering")
    .getOrCreate()
)

print("Spark started successfully!")

Spark started successfully!


## 2. Download and Load MovieLens Dataset
In the lab exam, the dataset will be provided locally, so this download cell can be skipped.

In [None]:
# Download MovieLens dataset.
# The download is skipped when the ratings file is already on disk, so
# re-running the notebook does not fetch and unpack the archive again.
url = "https://files.grouplens.org/datasets/movielens/ml-100k.zip"
archive_path = "ml-100k.zip"
ratings_file = os.path.join("ml-100k", "u.data")

if os.path.exists(ratings_file):
    print("Dataset already present, skipping download.")
else:
    print("Downloading MovieLens dataset...")
    urllib.request.urlretrieve(url, archive_path)

    # Extract the zip file into the current working directory
    with zipfile.ZipFile(archive_path, 'r') as zip_ref:
        zip_ref.extractall()

    print("Dataset downloaded and extracted!")

Downloading MovieLens dataset...
Dataset downloaded and extracted!


In the exam, we will load the dataset using the method below.

In [None]:
# Describe the four tab-separated columns of u.data
schema = (
    StructType()
    .add("userId", IntegerType(), True)
    .add("movieId", IntegerType(), True)
    .add("rating", FloatType(), True)
    .add("timestamp", LongType(), True)
)

# Read the ratings and drop the timestamp column, which is not needed
ratings = spark.read.csv("ml-100k/u.data", sep="\t", schema=schema).drop("timestamp")

print(f"Loaded {ratings.count()} movie ratings")
ratings.show(5)

Loaded 100000 movie ratings
+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|   196|    242|   3.0|
|   186|    302|   3.0|
|    22|    377|   1.0|
|   244|     51|   2.0|
|   166|    346|   1.0|
+------+-------+------+
only showing top 5 rows



## 3. Prepare Data for Similarity Calculation

In [None]:
# A user "likes" a movie when its rating is 3 or higher; collect each
# user's liked movie ids into one de-duplicated array column
user_preferences = (
    ratings
    .where(col("rating") >= 3)
    .groupBy("userId")
    .agg(collect_set("movieId").alias("liked_movies"))
)

print("Created user preference sets for similarity calculation")
user_preferences.show(5)

Created user preference sets for similarity calculation
+------+--------------------+
|userId|        liked_movies|
+------+--------------------+
|     1|[212, 183, 52, 13...|
|     2|[306, 285, 277, 3...|
|     3|[328, 307, 329, 3...|
|     4|[356, 357, 300, 3...|
|     5|[233, 102, 204, 1...|
+------+--------------------+
only showing top 5 rows



## 4. Implement Jaccard Similarity with PySpark

In [None]:
# Define Jaccard similarity function
def jaccard_similarity(set1, set2):
    """Return the Jaccard similarity of two collections.

    The score is the size of the intersection divided by the size of the
    union, computed over the distinct items of each input.

    Args:
        set1: Iterable of hashable items (duplicates are ignored), or None.
        set2: Iterable of hashable items (duplicates are ignored), or None.

    Returns:
        A float between 0.0 and 1.0; 0.0 when either input is None or empty.
    """
    if not set1 or not set2:
        return 0.0

    # Build each set once and reuse it for both the intersection and the union
    items1, items2 = set(set1), set(set2)

    # Both inputs are non-empty here, so the union cannot be empty
    return len(items1 & items2) / len(items1 | items2)

# Wrap the Python function as a Spark UDF whose result column is FloatType
jaccard_udf = udf(f=jaccard_similarity, returnType=FloatType())

print("Jaccard similarity UDF created successfully!")

Jaccard similarity UDF created successfully!


## 5. Calculate User Similarities using Parallel Processing

In [None]:
# Pair every user with every other user. Keeping only rows where
# u1.userId < u2.userId leaves each unordered pair once and drops self-pairs.
user_pairs = (
    user_preferences.alias("u1")
    .crossJoin(user_preferences.alias("u2"))
    .where(col("u1.userId") < col("u2.userId"))
)

# Score each pair with the Jaccard UDF
similarities = user_pairs.select(
    col("u1.userId").alias("user1"),
    col("u2.userId").alias("user2"),
    jaccard_udf(col("u1.liked_movies"), col("u2.liked_movies")).alias("jaccard_similarity"),
)

# Keep only pairs whose Jaccard score exceeds 0.2
significant_similarities = similarities.where(col("jaccard_similarity") > 0.2)

print("Calculated similarities using parallel processing:")
significant_similarities.orderBy(col("jaccard_similarity").desc()).show(5)

Calculated similarities using parallel processing:
+-----+-----+------------------+
|user1|user2|jaccard_similarity|
+-----+-----+------------------+
|  408|  898|         0.6896552|
|  328|  788|         0.6548043|
|  554|  764|         0.5217391|
|  674|  879|         0.5121951|
|  600|  826|               0.5|
+-----+-----+------------------+
only showing top 5 rows



## 6. Implement Cosine Similarity with PySpark

In [None]:
# Define Cosine similarity function
def cosine_similarity(set1, set2):
    """Return the cosine similarity of two collections treated as sets.

    Each input is viewed as a binary membership vector, so the score is
    the number of shared distinct items divided by the square root of the
    product of the two distinct-item counts.

    Args:
        set1: Iterable of hashable items (duplicates are ignored), or None.
        set2: Iterable of hashable items (duplicates are ignored), or None.

    Returns:
        A float between 0.0 and 1.0; 0.0 when either input is None or empty.
    """
    if not set1 or not set2:
        return 0.0

    items1, items2 = set(set1), set(set2)

    # Use the de-duplicated sizes so the magnitudes agree with the
    # de-duplicated intersection; raw lengths would understate the score
    # for inputs that contain repeated items.
    # Both inputs are non-empty here, so the product is at least 1.
    magnitude = math.sqrt(len(items1) * len(items2))

    return len(items1 & items2) / magnitude

# Wrap the Python function as a Spark UDF whose result column is FloatType
cosine_udf = udf(f=cosine_similarity, returnType=FloatType())

# Score each user pair with the cosine UDF
cosine_similarities = user_pairs.select(
    col("u1.userId").alias("user1"),
    col("u2.userId").alias("user2"),
    cosine_udf(col("u1.liked_movies"), col("u2.liked_movies")).alias("cosine_similarity"),
)

# Keep only pairs whose cosine score exceeds 0.3
significant_cosine = cosine_similarities.where(col("cosine_similarity") > 0.3)

print("Cosine similarity UDF created successfully!")
significant_cosine.orderBy(col("cosine_similarity").desc()).show(5)

Cosine similarity UDF created successfully!
+-----+-----+-----------------+
|user1|user2|cosine_similarity|
+-----+-----+-----------------+
|  408|  898|        0.8164966|
|  328|  788|        0.7936492|
|  674|  879|       0.69047576|
|  554|  764|        0.6859943|
|  600|  826|       0.67951584|
+-----+-----+-----------------+
only showing top 5 rows



## 7. Summarize Results: Most Similar User Pairs

In [None]:
# Summary banner: a single print call, one output line per argument
print(
    "=== COLLABORATIVE FILTERING RESULTS ===",
    "✓ Implemented Jaccard and Cosine similarity measures",
    "✓ Used PySpark for parallel computation of similarities",
    "✓ Processed user preferences efficiently across multiple cores",
    sep="\n",
)

# Three highest-scoring pairs that passed the Jaccard filter
print("\nTop similar user pairs (Jaccard):")
significant_similarities.orderBy(col("jaccard_similarity").desc()).show(n=3)

# Three highest-scoring pairs that passed the cosine filter
print("\nTop similar user pairs (Cosine):")
significant_cosine.orderBy(col("cosine_similarity").desc()).show(n=3)

=== COLLABORATIVE FILTERING RESULTS ===
✓ Implemented Jaccard and Cosine similarity measures
✓ Used PySpark for parallel computation of similarities
✓ Processed user preferences efficiently across multiple cores

Top similar user pairs (Jaccard):
+-----+-----+------------------+
|user1|user2|jaccard_similarity|
+-----+-----+------------------+
|  408|  898|         0.6896552|
|  328|  788|         0.6548043|
|  554|  764|         0.5217391|
+-----+-----+------------------+
only showing top 3 rows


Top similar user pairs (Cosine):
+-----+-----+-----------------+
|user1|user2|cosine_similarity|
+-----+-----+-----------------+
|  408|  898|        0.8164966|
|  328|  788|        0.7936492|
|  674|  879|       0.69047576|
+-----+-----+-----------------+
only showing top 3 rows



## 8. Clean Up

In [None]:
# Stop Spark: shuts down the session bound to `spark`.
# Cells that use `spark` need a new session before they can be run again.
spark.stop()
print("Experiment completed successfully!")

Experiment completed successfully!


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_set, udf
from pyspark.sql.types import FloatType
import math

# Start Spark
spark = SparkSession.builder.appName("CollaborativeFiltering").getOrCreate()

# Load ratings data (ml-100k/u.data must already exist in the working directory)
ratings = spark.read.csv("ml-100k/u.data", sep="\t",
                         schema="userId INT, movieId INT, rating FLOAT, timestamp LONG") \
                  .drop("timestamp")

# Create user preference sets (movies with rating >= 3)
user_prefs = ratings.filter(col("rating") >= 3) \
                   .groupBy("userId") \
                   .agg(collect_set("movieId").alias("liked_movies"))


def _jaccard(s1, s2):
    """Jaccard similarity: shared distinct items / all distinct items (0.0 if either input is empty or None)."""
    if not s1 or not s2:
        return 0.0
    a, b = set(s1), set(s2)
    return len(a & b) / len(a | b)


def _cosine(s1, s2):
    """Cosine similarity over distinct items (0.0 if either input is empty or None)."""
    if not s1 or not s2:
        return 0.0
    a, b = set(s1), set(s2)
    # De-duplicated sizes keep the magnitudes consistent with the intersection
    return len(a & b) / math.sqrt(len(a) * len(b))


# Jaccard Similarity UDF
jaccard_udf = udf(_jaccard, FloatType())

# Cosine Similarity UDF
cosine_udf = udf(_cosine, FloatType())

# Calculate similarities in parallel
# u1.userId < u2.userId keeps each unordered pair once and drops self-pairs
u1, u2 = user_prefs.alias("u1"), user_prefs.alias("u2")
pairs = u1.crossJoin(u2).filter(col("u1.userId") < col("u2.userId"))

# Cached because the result is sorted twice below; without it the cross
# join and both Python UDFs would be evaluated once per sort
similarities = pairs.select(
    col("u1.userId").alias("user1"),
    col("u2.userId").alias("user2"),
    jaccard_udf(col("u1.liked_movies"), col("u2.liked_movies")).alias("jaccard_similarity"),
    cosine_udf(col("u1.liked_movies"), col("u2.liked_movies")).alias("cosine_similarity")
).cache()

# Show top similarities
print("Top Jaccard similarities:")
similarities.orderBy(col("jaccard_similarity").desc()).show(5)

print("Top Cosine similarities:")
similarities.orderBy(col("cosine_similarity").desc()).show(5)

spark.stop()

AnalysisException: [PATH_NOT_FOUND] Path does not exist: file:/content/ml-100k/u.data.