#    Assignment 3 (20%) 
# STQD6324 Data Management 
#  SEMESTER 2 2024/2025 

This notebook answers all parts of the assignment using Spark2 and MongoDB.  
We inserted the MovieLens 100k data into MongoDB, and used Spark to compute the following:

1. Calculate average ratings per movie.
2. Identify the top 10 highest rated movies.
3. Find the users who have rated at least 50 movies and identify their favourite movie genres.
4. Find all the users who are less than 20 years old. 
5. Find all the users whose occupation is “scientist” and whose age is between 30 and 40 years old.

> **⚠️ Note (limitation):**
> This notebook contains PySpark and MongoDB integration code. Because of environment limitations, the code cells cannot be executed directly on GitHub or in Jupyter Notebook. However, all scripts were tested with `spark-submit` inside the HDP Sandbox, and the results are shown below the cells for reference.

## Question i : Calculate the average rating for each movie

**Section 1: Insert Data into MongoDB**

We first inserted the u.user and u.data files into MongoDB using the script below:

In [None]:
vi mongo_insert.py

"
# -*- coding: utf-8 -*-
from pymongo import MongoClient

# Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["movielens"]

# --- Insert u.user ---
db.users.drop()
with open("/home/maria_dev/ml-100k/u.user") as f:
    for line in f:
        uid, age, gender, occupation, zipc = line.strip().split("|")
        db.users.insert_one({
            "user_id": int(uid),
            "age": int(age),
            "gender": gender,
            "occupation": occupation,
            "zip_code": zipc
        })

# --- Insert u.data (ratings) ---
db.ratings.drop()
ratings_batch = []
with open("/home/maria_dev/ml-100k/u.data") as f:
    for line in f:
        uid, mid, rate, ts = line.strip().split("\t")
        ratings_batch.append({
            "user_id": int(uid),
            "movie_id": int(mid),
            "rating": int(rate),
            "timestamp": int(ts)
        })
db.ratings.insert_many(ratings_batch)

print("Data inserted into MongoDB.")
"
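
The per-line parsing in the script can be checked on its own, without a MongoDB server. The sketch below assumes the same field order as `u.user` and `u.data`; the function names `parse_user_line` and `parse_rating_line` are illustrative and do not appear in the original script, and the two input lines are small samples in the files' format.

```python
def parse_user_line(line):
    """Turn one pipe-delimited u.user line into a MongoDB-style document."""
    uid, age, gender, occupation, zipc = line.strip().split("|")
    return {
        "user_id": int(uid),
        "age": int(age),
        "gender": gender,
        "occupation": occupation,
        "zip_code": zipc,  # kept as a string so leading zeros survive
    }


def parse_rating_line(line):
    """Turn one tab-delimited u.data line into a MongoDB-style document."""
    uid, mid, rate, ts = line.strip().split("\t")
    return {
        "user_id": int(uid),
        "movie_id": int(mid),
        "rating": int(rate),
        "timestamp": int(ts),
    }


# Sample lines in the same format as the data files
user_doc = parse_user_line("1|24|M|technician|85711\n")
rating_doc = parse_rating_line("196\t242\t3\t881250949\n")
print(user_doc)
print(rating_doc)
```

Collecting the parsed users in a list and calling `insert_many` once, as the script already does for ratings, would avoid one round trip to MongoDB per user.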

Data insertion was verified in the Mongo shell:

In [None]:
> use movielens
switched to db movielens
> db.ratings.count()
100000
> db.users.findOne()
{
        "_id" : ObjectId("686280494707626e58a81ceb"),
        "user_id" : 1,
        "occupation" : "technician",
        "gender" : "M",
        "age" : 24,
        "zip_code" : "85711"
}
>

**Section 2: Spark Query for Average Ratings**

The script below joins the `ratings` and `movies` collections and averages the ratings per movie. It reads the `movies` collection, which is created by `mongo_insert_item.py` under Question ii, so that script must be run first.

In [None]:
# -*- coding: utf-8 -*-
# spark_query_q1.py
from pyspark.sql import SparkSession

# Step 1: Create SparkSession with MongoDB config
spark = SparkSession.builder \
    .appName("Average Rating Per Movie") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/movielens.ratings") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movielens.output") \
    .getOrCreate()

# Step 2: Load 'ratings' collection
ratings_df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb://127.0.0.1/movielens.ratings") \
    .load()

# Step 3: Load 'movies' collection (for movie titles)
movies_df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb://127.0.0.1/movielens.movies") \
    .load()

# Step 4: Join ratings with movies on movie_id
joined_df = ratings_df.join(movies_df, on="movie_id", how="inner")

# Step 5: Calculate average rating per movie
avg_ratings_df = joined_df.groupBy("movie_id", "title") \
    .avg("rating") \
    .withColumnRenamed("avg(rating)", "avg_rating")

# Step 6: Round the average ratings to 2 decimal places
# (imported under an alias so Python's built-in round() is not shadowed)
from pyspark.sql.functions import round as spark_round
avg_ratings_df = avg_ratings_df.withColumn("avg_rating", spark_round(avg_ratings_df.avg_rating, 2))

# Step 7: Show result
avg_ratings_df.orderBy("movie_id").show(truncate=False)

# Step 8: Stop Spark
spark.stop()

-----------------------------------
spark-submit \
  --jars /home/maria_dev/mongo-spark-connector_2.11-2.3.2.jar,/home/maria_dev/.ivy2/jars/org.mongodb_mongo-java-driver-3.8.2.jar \
  spark_query_q1.py
-----------------------------------

+--------+----------------------------------------------------+----------+  
|movie_id|title                                               |avg_rating|  
+--------+----------------------------------------------------+----------+  
|1       |Toy Story (1995)                                    |3.88      |  
|2       |GoldenEye (1995)                                    |3.21      |  
|3       |Four Rooms (1995)                                   |3.03      |  
|4       |Get Shorty (1995)                                   |3.55      |    
|5       |Copycat (1995)                                      |3.3       |  
|6       |Shanghai Triad (Yao a yao yao dao waipo qiao) (1995)|3.58      |  
|7       |Twelve Monkeys (1995)                               |3.8       |    
|8       |Babe (1995)                                         |4.0       |  
|9       |Dead Man Walking (1995)                             |3.9       |  
|10      |Richard III (1995)                                  |3.83      |  
|11      |Seven (Se7en) (1995)                                |3.85      |  
|12      |Usual Suspects, The (1995)                          |4.39      |  
|13      |Mighty Aphrodite (1995)                             |3.42      |  
|14      |Postino, Il (1994)                                  |3.97      |  
|15      |Mr. Holland's Opus (1995)                           |3.78      |  
|16      |French Twist (Gazon maudit) (1995)                  |3.21      |  
|17      |From Dusk Till Dawn (1996)                          |3.12      |  
|18      |White Balloon, The (1995)                           |2.8       |  
|19      |Antonia's Line (1995)                               |3.96      |  
|20      |Angels and Insects (1995)                           |3.42      |  
+--------+----------------------------------------------------+----------+  
only showing top 20 rows  
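
Since the Spark job cannot be run inside this notebook, the aggregation logic can be sanity-checked in plain Python. This is a minimal sketch on made-up `(movie_id, rating)` pairs, not the MovieLens data; the function name `average_ratings` is hypothetical.

```python
from collections import defaultdict


def average_ratings(ratings):
    """Map each movie_id to its mean rating, rounded to 2 decimal places.

    ratings: iterable of (movie_id, rating) pairs.
    """
    totals = defaultdict(lambda: [0, 0])  # movie_id -> [sum of ratings, count]
    for movie_id, rating in ratings:
        totals[movie_id][0] += rating
        totals[movie_id][1] += 1
    return {m: round(s / float(n), 2) for m, (s, n) in totals.items()}


# Made-up sample: movie 1 has three ratings, movie 2 has two
sample = [(1, 5), (1, 4), (1, 3), (2, 2), (2, 3)]
averages = average_ratings(sample)
print(averages)
```

This mirrors the `groupBy(...).avg("rating")` step followed by rounding, with a dictionary standing in for the grouped DataFrame.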

## Question ii : Identify the top ten movies with the highest average ratings. 

**Section 1: Insert Movie Metadata**  
We inserted the u.item file containing movie metadata into MongoDB as follows:

In [None]:
✅ vi mongo_insert_item.py 

from pymongo import MongoClient
import codecs

client = MongoClient("mongodb://localhost:27017/")
db = client["movielens"]
collection = db["movies"]
collection.drop()  # start from an empty collection so re-running does not duplicate movies

with codecs.open("/home/maria_dev/ml-100k/u.item", "r", "ISO-8859-1") as file:
    for line in file:
        fields = line.strip().split('|')
        if len(fields) >= 3:
            doc = {
                "movie_id": int(fields[0]),
                "title": fields[1],
                "release_date": fields[2]
            }
            collection.insert_one(doc)

print("Done inserting u.item into MongoDB.")

**Section 2: Spark Query for Top Rated Movies**  
Then, we performed a join and sorted by descending average ratings using this script:

In [None]:
✅ vi spark_query_q2.py (Spark + MongoDB join)

from pyspark.sql import SparkSession
from pyspark.sql.functions import avg

# Spark session with correct MongoDB config
spark = SparkSession.builder \
    .appName("Q2_TopTenMovies") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/movielens.ratings") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movielens.output") \
    .getOrCreate()

# Load ratings
ratings_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

# Load movies
movies_df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb://127.0.0.1/movielens.movies") \
    .load()

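# Check schemas (debug)
# Plain ASCII labels: this script has no encoding declaration, which Python 2 requires for non-ASCII text
print("Ratings schema:")
ratings_df.printSchema()
print("Movies schema:")
movies_df.printSchema()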

# Join on movie_id
joined_df = ratings_df.join(movies_df, on="movie_id")

# Compute average rating and sort
avg_ratings = joined_df.groupBy("movie_id", "title") \
    .agg(avg("rating").alias("avg_rating")) \
    .orderBy("avg_rating", ascending=False) \
    .limit(10)

# Show results
avg_ratings.show(truncate=False)

# Stop session
spark.stop()

+--------+-------------------------------------------------+----------+  
|movie_id|title                                            |avg_rating|  
+--------+-------------------------------------------------+----------+  
|1599    |Someone Else's America (1995)                    |5.0       |   
|1293    |Star Kid (1997)                                  |5.0       |  
|1653    |Entertaining Angels: The Dorothy Day Story (1996)|5.0       |  
|1201    |Marlene Dietrich: Shadow and Light (1996)        |5.0       |  
|1189    |Prefontaine (1997)                               |5.0       |  
|1467    |Saint of Fort Washington, The (1993)             |5.0       |  
|1122    |They Made Me a Criminal (1939)                   |5.0       |  
|1500    |Santa with Muscles (1996)                        |5.0       |  
|1536    |Aiqing wansui (1994)                             |5.0       |  
|814     |Great Day in Harlem, A (1994)                    |5.0       |  
+--------+-------------------------------------------------+----------+   
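
All ten movies above have an average of exactly 5.0, which usually indicates that a movie has very few ratings rather than broad acclaim. A common refinement, which is not part of the original script, is to require a minimum number of ratings before ranking. The sketch below shows the idea in plain Python on made-up data; `top_movies` and its `min_ratings` threshold are hypothetical names.

```python
from collections import defaultdict


def top_movies(ratings, n=10, min_ratings=1):
    """Return up to n (movie_id, average, count) rows, best average first.

    ratings: iterable of (movie_id, rating) pairs.
    Movies with fewer than min_ratings ratings are ignored.
    """
    by_movie = defaultdict(list)
    for movie_id, rating in ratings:
        by_movie[movie_id].append(rating)
    rows = [
        (movie_id, sum(rs) / float(len(rs)), len(rs))
        for movie_id, rs in by_movie.items()
        if len(rs) >= min_ratings
    ]
    # Highest average first; movie_id breaks ties so the order is reproducible
    rows.sort(key=lambda row: (-row[1], row[0]))
    return rows[:n]


# Made-up sample: movie 1 has a single 5-star rating
sample = [(1, 5), (2, 4), (2, 5), (2, 4), (3, 3), (3, 4), (3, 5)]
unfiltered = top_movies(sample, n=2)
filtered = top_movies(sample, n=2, min_ratings=3)
print([row[0] for row in unfiltered])
print([row[0] for row in filtered])
```

Without the threshold, the single-rating movie ranks first; with `min_ratings=3` it drops out. In Spark, the same idea would amount to adding a count aggregate next to the average and filtering on it before sorting.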

## Question iii : Find the users who rated ≥ 50 movies and their favourite movie genres.

This question finds users who have rated at least 50 movies and identifies their most frequent movie genre.  
To do this, we joined the ratings and movies collections from MongoDB, exploded genre lists, counted them per user,  
and selected the top genre per user.
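
The steps described above (join, explode, count, pick the top genre) can be sketched in plain Python to make the logic easier to follow. This is an illustration on made-up data, not the Spark implementation; `favourite_genres` and its parameters are hypothetical, and the threshold is lowered to 3 in the example so that a tiny sample qualifies.

```python
from collections import Counter, defaultdict


def favourite_genres(ratings, movie_genres, min_ratings=50):
    """Return {user_id: (rating_count, favourite_genre)} for active users.

    ratings: list of (user_id, movie_id) pairs.
    movie_genres: dict mapping movie_id to a list of genre names.
    """
    rating_counts = Counter(user_id for user_id, _ in ratings)
    genre_counts = defaultdict(Counter)
    for user_id, movie_id in ratings:
        # "Explode": one count for every genre attached to the rated movie
        genre_counts[user_id].update(movie_genres.get(movie_id, []))
    result = {}
    for user_id, n in rating_counts.items():
        if n >= min_ratings and genre_counts[user_id]:
            # Most frequent genre; alphabetical order breaks ties
            genre, _ = min(
                genre_counts[user_id].items(), key=lambda kv: (-kv[1], kv[0])
            )
            result[user_id] = (n, genre)
    return result


# Made-up sample data
movie_genres = {10: ["Comedy"], 11: ["Comedy", "Drama"], 12: ["Drama"], 13: ["Action"]}
ratings = [(1, 10), (1, 11), (1, 13), (2, 12)]
favourites = favourite_genres(ratings, movie_genres, min_ratings=3)
print(favourites)
```

User 1 has three ratings and two of the rated movies are comedies, so "Comedy" is selected; user 2 has only one rating and is excluded.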

**Section 1: Insert Genre-Mapped Movies into MongoDB**

In [None]:
🐍 PySpark + Mongo

vi mongo_q3.py
"
# -*- coding: utf-8 -*-
from pymongo import MongoClient
import codecs 

# Connect to MongoDB
client = MongoClient("mongodb://localhost:27017/")
db = client["movielens"]

# --- Load genre index to name mapping from u.genre ---
genre_map = {}
with open("/home/maria_dev/ml-100k/u.genre") as f:
    for line in f:
        parts = line.strip().split("|")
        if len(parts) == 2:
            genre_name, genre_id = parts
            genre_map[int(genre_id)] = genre_name
        else:
            print("⚠️ Skipping malformed genre line:", repr(line.strip()))

# --- Insert u.item (movies) into MongoDB ---
db.movies.drop()
with codecs.open("/home/maria_dev/ml-100k/u.item", encoding="ISO-8859-1") as f:
    for line in f:
        parts = line.strip().split("|")
        if len(parts) >= 24:
            movie_id = int(parts[0])
            title = parts[1]
            release_date = parts[2]
            genre_flags = list(map(int, parts[5:24]))  # binary genre indicators
            genres = [genre_map[i] for i, flag in enumerate(genre_flags) if flag == 1]

            db.movies.insert_one({
                "movie_id": movie_id,
                "title": title,
                "release_date": release_date,
                "genres": genres
            })
        else:
            print("⚠️ Skipping malformed movie line:", repr(line.strip()))

print("✅ Movie data inserted with genre names.")
"
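
The genre decoding in the script can be tried on a single line without MongoDB. The sketch below hard-codes the genre order as an assumption about the contents of `u.genre` (the original script reads it from the file), and the sample line is constructed for illustration with a placeholder URL; `parse_movie_line` is a hypothetical helper name.

```python
# Assumed genre order (index -> name); the original script loads this from u.genre
GENRES = [
    "unknown", "Action", "Adventure", "Animation", "Children's", "Comedy",
    "Crime", "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror",
    "Musical", "Mystery", "Romance", "Sci-Fi", "Thriller", "War", "Western",
]


def parse_movie_line(line, genre_names=GENRES):
    """Parse one pipe-delimited u.item line; return None if it is too short."""
    parts = line.strip().split("|")
    if len(parts) < 24:
        return None
    flags = parts[5:24]  # 19 binary genre indicators
    return {
        "movie_id": int(parts[0]),
        "title": parts[1],
        "release_date": parts[2],
        "genres": [name for name, flag in zip(genre_names, flags) if flag == "1"],
    }


# Build an illustrative line: 5 metadata fields followed by 19 genre flags
flags = ["0"] * 19
for index in (3, 4, 5):  # Animation, Children's, Comedy
    flags[index] = "1"
sample_line = "|".join(
    ["1", "Toy Story (1995)", "01-Jan-1995", "", "http://example.invalid/toy-story"] + flags
)
movie = parse_movie_line(sample_line)
print(movie["genres"])
```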

**Section 2: PySpark Query to Identify Favourite Genres**

In [None]:
✅ vi spark_query_q3.py
"
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col, count, row_number
from pyspark.sql.window import Window

# Create Spark session
spark = SparkSession.builder \
    .appName("Q3_UserFavoriteGenres") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/movielens.ratings") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/movielens.output") \
    .getOrCreate()

# Load ratings and movies
ratings_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()
movies_df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("uri", "mongodb://127.0.0.1/movielens.movies") \
    .load()

# Join on movie_id
joined_df = ratings_df.join(movies_df, on="movie_id")

# Explode genres (each row becomes one genre)
exploded_df = joined_df.select("user_id", explode("genres").alias("genre"))

# Count how many times each user rated each genre
user_genre_counts = exploded_df.groupBy("user_id", "genre").agg(count("*").alias("genre_count"))

# Window function to rank genres per user
# (genre name is a secondary sort key so that ties are broken deterministically)
window_spec = Window.partitionBy("user_id").orderBy(col("genre_count").desc(), col("genre"))

# Number the genres per user; row_number() yields exactly one rank-1 row per user
ranked_genres = user_genre_counts.withColumn("rank", row_number().over(window_spec))

# Keep only the top genre per user
top_genres_per_user = ranked_genres.filter(col("rank") == 1)

# Now get users who rated at least 50 movies
user_rating_counts = ratings_df.groupBy("user_id").agg(count("*").alias("rating_count"))
active_users = user_rating_counts.filter(col("rating_count") >= 50)

# Join with top genres to get only active users' favorite genres
result = active_users.join(top_genres_per_user, on="user_id") \
    .select("user_id", "rating_count", "genre")

# Show result
result.orderBy("user_id").show(truncate=False)

# Stop Spark
spark.stop()
"

Here are the top genres per active user:

## Question iv : Find all the users who are less than 20 years old.  

- Filters users from the MongoDB `users` collection whose age is below 20.
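
The condition is a strict inequality, so a user aged exactly 20 is not included. A minimal plain-Python sketch of the same predicate, on made-up user documents, makes the boundary explicit; `users_under` is a hypothetical helper name.

```python
def users_under(users, age_limit=20):
    """Return the users whose age is strictly below age_limit."""
    return [u for u in users if u["age"] < age_limit]


# Made-up documents shaped like the `users` collection
sample_users = [
    {"user_id": 1, "age": 24},
    {"user_id": 2, "age": 19},
    {"user_id": 3, "age": 20},  # boundary case: not "less than 20"
    {"user_id": 4, "age": 7},
]
young = users_under(sample_users)
print([u["user_id"] for u in young])
```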

**PySpark Query**

In [None]:
vi spark_query_q4.py

----------------------------------------------------------------------------------
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Find Users Under 20") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/movielens.users") \
    .getOrCreate()

# Load users collection from MongoDB
users_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

# Filter users under 20
young_users_df = users_df.filter(users_df.age < 20)

# Show result
young_users_df.show()

# Stop the session
spark.stop()


----------------------------------------------------------------------------------------

spark-submit \
  --jars mongo-spark-connector_2.11-2.3.2.jar,mongo-java-driver-3.12.10.jar \
  spark_query_q4.py

----------------------------------------------------------------------------------------


Result shows users below 20 years old:

## Question v : Find Scientists Aged Between 30 and 40   

- Uses Spark to filter users whose occupation is "scientist" and whose age is between 30 and 40 (inclusive).
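
Both age bounds are inclusive here. The sketch below restates the combined condition in plain Python on made-up user documents so the edge cases at 30 and 40 can be checked; `scientists_between` is a hypothetical helper name.

```python
def scientists_between(users, low=30, high=40):
    """Return scientists whose age lies in the closed interval [low, high]."""
    return [
        u for u in users
        if u["occupation"] == "scientist" and low <= u["age"] <= high
    ]


# Made-up documents shaped like the `users` collection
sample_users = [
    {"user_id": 1, "age": 29, "occupation": "scientist"},  # too young
    {"user_id": 2, "age": 30, "occupation": "scientist"},  # lower bound, kept
    {"user_id": 3, "age": 35, "occupation": "engineer"},   # wrong occupation
    {"user_id": 4, "age": 40, "occupation": "scientist"},  # upper bound, kept
    {"user_id": 5, "age": 41, "occupation": "scientist"},  # too old
]
matches = scientists_between(sample_users)
print([u["user_id"] for u in matches])
```

In PySpark, `Column.between(30, 40)` is likewise inclusive at both ends, so it could replace the two explicit comparisons.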

**PySpark Query**

In [None]:
from pyspark.sql import SparkSession

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Find Scientists Between 30 and 40") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/movielens.users") \
    .getOrCreate()

# Load users collection from MongoDB
users_df = spark.read.format("com.mongodb.spark.sql.DefaultSource").load()

# Filter for scientists between age 30 and 40
scientist_users_df = users_df.filter(
    (users_df.occupation == "scientist") & 
    (users_df.age >= 30) & 
    (users_df.age <= 40)
)

# Show result
scientist_users_df.show()

# Stop Spark session
spark.stop()


Resulting scientists between age 30–40 : 

**Assignment Checklist**

| Requirement                               | Done?                                                          |
| ----------------------------------------- | -------------------------------------------------------------- |
| 1. Python libraries for Spark2 + NoSQL    | ✅ Yes (`pymongo`, `SparkSession`)                             |
| 2. Parse `u.user` into DB (simulate HDFS) | ✅ Yes (`mongo_insert.py`)                                     |
| 3. Load + create RDDs (if needed)         | ✅ Optional (Spark loads DataFrames directly)                  |
| 4. Convert RDD → DataFrame                | ✅ Yes (or direct from Mongo)                                  |
| 5. Write DF into DB (used in Q1)          | ✅ Data written with pymongo `insert_many()`                   |
| 6. Read back from DB into Spark DF        | ✅ `spark.read.format("com.mongodb.spark.sql.DefaultSource")`  |
