## 1. Load User Data from HDFS

In [None]:
%pyspark

# Load the raw user records from HDFS; each line is split on "|" below.
user_rdd = sc.textFile("hdfs:///user/maria_dev/movielens/u.user")

from pyspark.sql import Row


def _user_row(line):
    """Build a Row from one pipe-delimited u.user line.

    The first five fields are taken, in order, as
    userid, age, gender, occupation and zipcode; userid and age are cast to int.
    """
    fields = line.split("|")
    return Row(
        userid=int(fields[0]),
        age=int(fields[1]),
        gender=fields[2],
        occupation=fields[3],
        zipcode=fields[4],
    )


# One Row per input line, then let Spark infer the DataFrame schema.
user_df = user_rdd.map(_user_row).toDF()

# Preview the first five parsed users.
user_df.show(5)

## 2. Rename Columns and Write to Cassandra

In [None]:
%pyspark

# Rename userid -> user_id and zipcode -> zip
# (presumably to match the column names of the target Cassandra table - confirm).
renamed_df = (
    user_df
    .withColumnRenamed("userid", "user_id")
    .withColumnRenamed("zipcode", "zip")
)

# Append the rows to keyspace "movielens", table "users" via the
# Spark-Cassandra data source, connecting to the node at 127.0.0.1.
users_writer = renamed_df.write.format("org.apache.spark.sql.cassandra").mode("append")
users_writer = users_writer.option("keyspace", "movielens").option("table", "users")
users_writer.option("spark.cassandra.connection.host", "127.0.0.1").save()

## 3. Load User Data from Cassandra

In [None]:
%pyspark

# Read keyspace "movielens", table "users" back from the Cassandra node
# at 127.0.0.1 as a DataFrame.
users_reader = spark.read.format("org.apache.spark.sql.cassandra")
users_reader = users_reader.option("keyspace", "movielens").option("table", "users")
df_from_cassandra = users_reader.option("spark.cassandra.connection.host", "127.0.0.1").load()

# Preview five rows to confirm the read works.
df_from_cassandra.show(5)

## 4. Load and Parse Rating Data

In [None]:
%pyspark


# Load the raw rating records from HDFS; each line is split on tabs below.
rating_rdd = sc.textFile("hdfs:///user/maria_dev/movielens/u.data")

from pyspark.sql import Row


def _rating_row(line):
    """Build a Row from one tab-delimited u.data line.

    The first three fields are taken, in order, as user_id, item_id and
    rating, each cast to int. Any further fields on the line are ignored.
    """
    fields = line.split("\t")
    return Row(
        user_id=int(fields[0]),
        item_id=int(fields[1]),
        rating=int(fields[2]),
    )


# One Row per rating line, converted to a DataFrame.
rating_df = rating_rdd.map(_rating_row).toDF()

# Preview the first five ratings.
rating_df.show(5)


## 5. Average Rating per Movie

In [None]:
%pyspark

from pyspark.sql.functions import avg

# Mean of "rating" for each item_id, exposed as column "avg_rating".
mean_rating = avg("rating").alias("avg_rating")
avg_ratings_df = rating_df.groupBy("item_id").agg(mean_rating)

# Preview ten of the per-movie averages.
avg_ratings_df.show(10)


## 6. Load Movie Titles from u.item

In [None]:
%pyspark

# Load the raw movie records from HDFS; each line is split on "|" below.
item_rdd = sc.textFile("hdfs:///user/maria_dev/movielens/u.item")

from pyspark.sql import Row


def _item_row(line):
    """Build a Row(item_id, title) from one pipe-delimited u.item line.

    Only the first two fields are used: the id (cast to int) and the title.
    """
    fields = line.split("|")
    return Row(item_id=int(fields[0]), title=fields[1])


# One Row per movie line, converted to a DataFrame.
item_df = item_rdd.map(_item_row).toDF()

# Preview the first five movies.
item_df.show(5)

## 7. Join Movie Titles with Average Ratings

In [None]:
%pyspark

# Attach each movie's title to its average rating. join() defaults to an
# inner join, so only item_ids present in both frames are kept.
joined_df = avg_ratings_df.join(item_df, on="item_id")

# Preview ten (title, avg_rating) pairs without truncating long titles.
titled_averages = joined_df.select("title", "avg_rating")
titled_averages.show(10, truncate=False)


## 8. Top 10 Highest Rated Movies

In [None]:
%pyspark

# All movies sorted by average rating, highest first.
# Note: despite its name, top10_movies holds the full sorted frame;
# only the show() call below restricts the output to ten rows.
top10_movies = (
    joined_df
    .select("title", "avg_rating")
    .orderBy("avg_rating", ascending=False)
)

top10_movies.show(10, truncate=False)


## 9. Movies with ≥ 50 Ratings

In [None]:
%pyspark

from pyspark.sql.functions import avg, count

# Per-movie aggregates: mean rating and how many ratings the movie received.
mean_rating = avg("rating").alias("avg_rating")
rating_total = count("rating").alias("num_ratings")
movie_stats_df = rating_df.groupBy("item_id").agg(mean_rating, rating_total)

# Bring in the movie title for display.
movie_stats_with_title = movie_stats_df.join(item_df, on="item_id")

# Keep movies with at least 50 ratings, sort by average rating (highest
# first) and print the first ten.
popular_movies = movie_stats_with_title.filter("num_ratings >= 50")
popular_sorted = popular_movies.orderBy("avg_rating", ascending=False)
popular_sorted.select("title", "avg_rating", "num_ratings").show(10, truncate=False)

## 10. Active Users with ≥ 50 Ratings

In [None]:
%pyspark

from pyspark.sql.functions import count

# How many ratings each user has submitted.
ratings_per_user = count("item_id").alias("num_rated")
user_rating_count = rating_df.groupBy("user_id").agg(ratings_per_user)

# "Active" users are those who submitted at least 50 ratings
# (the threshold is on the number of ratings, not on rating values).
active_users = user_rating_count.filter("num_rated >= 50")

# Preview five active users.
active_users.show(5)

## 11. Extract Movie Genres and Expand

In [None]:
%pyspark

# The 19 genre names, in the order this cell maps them onto the flag
# fields of u.item (fields 5 through 23 of each line).
genre_list = [
    "unknown", "Action", "Adventure", "Animation", "Children's", "Comedy", "Crime",
    "Documentary", "Drama", "Fantasy", "Film-Noir", "Horror", "Musical", "Mystery",
    "Romance", "Sci-Fi", "Thriller", "War", "Western"
]

# Re-read u.item; this time the genre flag fields are needed.
item_rdd = sc.textFile("hdfs:///user/maria_dev/movielens/u.item")


def _genre_pairs(line):
    """Expand one u.item line into (item_id, genre) pairs.

    A pair is produced for every genre whose flag field equals "1";
    a movie carrying several genres therefore yields several pairs.
    """
    fields = line.split("|")
    return [
        (int(fields[0]), genre_name)
        for offset, genre_name in enumerate(genre_list)
        if fields[5 + offset] == "1"
    ]


# Flatten the per-movie lists into a single RDD of (item_id, genre) tuples.
genre_expanded_rdd = item_rdd.flatMap(_genre_pairs)

# Name the two tuple positions item_id and genre.
genre_df = genre_expanded_rdd.toDF(["item_id", "genre"])

genre_df.show(5)

## 12. Most Preferred Genre per User

In [None]:
%pyspark

from pyspark.sql.functions import count, row_number
from pyspark.sql.window import Window

# Tag every rating with the genre(s) of the rated movie. A movie with
# several genres contributes one row per genre.
user_genre_df = rating_df.join(genre_df, on="item_id")

# Restrict to active users: the inner join with active_users drops every
# user that is not in that frame (and carries its num_rated column along).
filtered_user_genre = user_genre_df.join(active_users, on="user_id")

# Number of ratings each user gave within each genre.
user_genre_count = filtered_user_genre.groupBy("user_id", "genre") \
    .agg(count("*").alias("genre_count"))

# Rank each user's genres by rating count, highest first. The genre name is
# a secondary sort key so that row_number() is deterministic when two genres
# have the same count; without it the winner among tied genres is arbitrary
# and may differ between runs.
window_spec = Window.partitionBy("user_id").orderBy(
    user_genre_count["genre_count"].desc(),
    user_genre_count["genre"].asc(),
)

# Keep only the top-ranked genre per user.
top_genre_per_user = user_genre_count \
    .withColumn("rank", row_number().over(window_spec)) \
    .filter("rank = 1") \
    .select("user_id", "genre", "genre_count")

top_genre_per_user.show(10, truncate=False)

In [None]:
## 13. Users Younger than 20