In [None]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.functions import col
import pandas as pd

In [None]:
# Create (or reuse, via getOrCreate) the SparkSession shared by the notebook.
spark = (
    SparkSession.builder
    .appName("Anime Recommendation System")
    .getOrCreate()
)

In [None]:
from pyspark.ml.feature import MinMaxScaler, StringIndexer
from pyspark.sql.functions import col, min, max

from pyspark.sql import functions as F  # explicit alias; the `min`/`max` imported above shadow the builtins

# NOTE(review): `df` is not created anywhere before this cell; it is assumed to
# be the ratings DataFrame with "user_id", "anime_id" and "rating" columns.

# Scaling our "rating" column to [0, 1].
# MinMaxScaler only accepts a Vector-typed input column, so fitting it directly
# on the numeric "rating" column fails. Apply the same formula with column
# expressions instead: (rating - min) / (max - min).
min_rating, max_rating = df.select(F.min("rating"), F.max("rating")).first()
if min_rating is None or max_rating is None or max_rating == min_rating:
    # Empty/all-null or constant column: MinMaxScaler maps this case to the
    # midpoint of its output range, i.e. 0.5.
    scaled_score = F.lit(0.5)
else:
    scaled_score = (F.col("rating") - F.lit(min_rating)) / F.lit(max_rating - min_rating)
df = df.withColumn("scaled_score", scaled_score.cast("double"))

# Encoding categorical data
## Encoding user IDs
user_indexer = StringIndexer(inputCol="user_id", outputCol="user_encoded")
df = user_indexer.fit(df).transform(df)
num_users = df.select("user_id").distinct().count()

## Encoding anime IDs
anime_indexer = StringIndexer(inputCol="anime_id", outputCol="anime_encoded")
df = anime_indexer.fit(df).transform(df)
num_animes = df.select("anime_id").distinct().count()

# Printing dataset information (min/max were computed above in a single pass)
print("Number of unique users: {}, Number of unique anime: {}".format(num_users, num_animes))
print("Minimum rating: {}, Maximum rating: {}".format(min_rating, max_rating))

In [None]:
# Load the anime dataset
def load_anime_data(filepath):
    """Load a ratings CSV and derive the ALS-ready ratings view.

    Args:
        filepath: Path to a CSV file with a header row that contains at least
            ``user_id``, ``anime_id`` and ``rating`` columns.

    Returns:
        A ``(df, ratings_df)`` tuple. ``df`` is the full DataFrame as read
        (schema inferred). ``ratings_df`` keeps only ``user_id``, ``anime_id``
        and ``rating`` (cast to float), without rows missing any of the three.
    """
    df = spark.read.csv(filepath, header=True, inferSchema=True)

    ratings_df = df.select("user_id", "anime_id", "rating")
    ratings_df = ratings_df.withColumn("rating", col("rating").cast("float"))

    # Spark's ALS rejects null user/item ids and null ratings when fitting, so
    # drop incomplete rows here (this also removes ratings that failed the cast).
    ratings_df = ratings_df.dropna(subset=["user_id", "anime_id", "rating"])

    return df, ratings_df

Split the data into training and test sets

In [None]:
def split_data(ratings_df, train_ratio=0.8):
    """Randomly split ``ratings_df`` into ``(train, test)`` DataFrames.

    ``train_ratio`` of the rows go to the training set and the remainder to the
    test set; the fixed seed (42) makes the split reproducible.
    """
    test_ratio = 1 - train_ratio
    split_weights = [train_ratio, test_ratio]
    return ratings_df.randomSplit(split_weights, seed=42)

Create & train ALS Model

We chose the ALS (Alternating Least Squares) model for the following reasons:

ALS is a matrix factorization algorithm designed for large-scale recommendation systems. It’s particularly useful when:

- The dataset is sparse (i.e., most users have rated only a small fraction of all available items).
- There are implicit or explicit ratings (ALS can handle both).
- Scalability is a concern (Spark’s ALS is optimized for distributed computing).
Given that the Anime Dataset (2023) consists of user ratings for anime titles, ALS is a strong choice because:

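1. It generalizes to user–anime pairs that were never rated by learning latent factors for users and items. (It cannot score users or items that are absent from the training data — the cold-start problem — which is why the model uses `coldStartStrategy="drop"`.)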
2. It works well with sparse data, which is common in recommendation problems.
3. It’s optimized for large datasets, making it a good fit for Spark.

Compared to other methods:
- User-based or item-based collaborative filtering (kNN-based methods) don’t scale well for large datasets.
- Content-based filtering doesn’t generalize well if metadata is missing or inconsistent.
- ALS balances scalability and predictive performance better than most traditional models.

In [None]:
def train_als_model(train_df, max_iter=10, reg_param=0.1, rank=10):
    """Fit an ALS matrix-factorisation model on user/anime ratings.

    Args:
        train_df: DataFrame with ``user_id``, ``anime_id`` and ``rating`` columns.
        max_iter: Number of ALS iterations.
        reg_param: Regularisation strength.
        rank: Number of latent factors.

    Returns:
        The fitted ALS model.
    """
    als_params = dict(
        maxIter=max_iter,
        regParam=reg_param,
        rank=rank,
        userCol="user_id",
        itemCol="anime_id",
        ratingCol="rating",
        # Drop predictions for users/items unseen during training (they would
        # otherwise be NaN and poison metrics such as RMSE).
        coldStartStrategy="drop",
        # Keep the learned latent factors non-negative.
        nonnegative=True,
    )
    estimator = ALS(**als_params)
    return estimator.fit(train_df)

Model evaluation

In [None]:
def evaluate_model(model, test_df):
    """Score a fitted ALS model on held-out ratings.

    Args:
        model: Fitted model whose ``transform`` adds a ``prediction`` column.
        test_df: DataFrame with a ``rating`` label column plus the user/item
            columns the model was trained on.

    Returns:
        ``(rmse, predictions)``: the root-mean-squared error between
        ``rating`` and ``prediction``, and the predictions DataFrame it was
        computed on.
    """
    predictions = model.transform(test_df)

    # Only the label and the prediction feed the metric, so only rows missing
    # one of those are dropped; a null in some unrelated column must not
    # silently remove the row from the evaluation.
    predictions = predictions.na.drop(subset=["rating", "prediction"])

    evaluator = RegressionEvaluator(
        metricName="rmse",
        labelCol="rating",
        predictionCol="prediction"
    )
    rmse = evaluator.evaluate(predictions)

    return rmse, predictions

Generate recommendations for a user

Read anime-dataset

In [None]:
from pyspark.sql import SparkSession

# Initialize SparkSession if you haven't already (getOrCreate reuses an
# existing session, so this app name only applies to a brand-new session).
spark = SparkSession.builder.appName("AnimeRecommendation").getOrCreate()

# Both files come from the same dataset directory; keep it in one place so
# the two reads cannot point at different locations.
DATASET_DIR = "/kaggle/input/myanimelist-dataset"

# User ratings. Later cells use spark_df as the ratings table (user_id,
# anime_id, rating), so it must not be the anime-metadata file.
# NOTE(review): ratings filename assumed from the MyAnimeList 2023 dataset
# layout — confirm it matches your input directory.
spark_df = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .csv(f"{DATASET_DIR}/users-score-2023.csv")
)

# Anime metadata (Name, English name, Genres, Synopsis, Members, ...).
spark_df_anime = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .csv(f"{DATASET_DIR}/anime-dataset-2023.csv")
)

I want the model to recommend only reasonably popular anime, so the anime table is filtered to titles whose `Members` count is at least a fixed threshold (`popularity_threshold`). This keeps titles with very few members out of the recommendations.

In [None]:
# Keep only anime whose "Members" count reaches `popularity_threshold`.
popularity_threshold = 50
spark_df_anime = spark_df_anime.where(spark_df_anime["Members"] >= popularity_threshold)

# Report the filtered shape as (rows, columns), pandas-style.
row_count, column_count = spark_df_anime.count(), len(spark_df_anime.columns)
print(f"DataFrame shape: ({row_count}, {column_count})")

spark_df_anime.show(3, truncate=False)

In [1]:
def get_user_recommendations(model, user_id, num_recommendations=10):
    """Return the model's top ``num_recommendations`` anime for one user.

    The result is the DataFrame produced by ALS ``recommendForUserSubset``:
    one row for ``user_id`` with a ``recommendations`` array column.
    """
    target_users = spark.createDataFrame([(user_id,)], ["user_id"])
    return model.recommendForUserSubset(target_users, num_recommendations)


UBCF (user-based collaborative filtering)

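Finding Similar Users: The `find_similar_users` function scores every user against the selected user by taking the dot product of their rows in the `user_weights` matrix. It returns a Spark DataFrame of the most similar users together with their similarity scores.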

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import numpy as np

def _to_python_scalar(value):
    """Convert a numpy scalar to the equivalent Python value; pass others through."""
    return value.item() if isinstance(value, np.generic) else value


def find_similar_users(item_input, n=10, return_dist=False, neg=False):
    """Find the users whose weight vectors are most (or least) similar to a user.

    Similarity is the dot product between rows of the global ``user_weights``
    matrix. Ids are mapped to/from row indices with the global ``user_encoder``.

    Args:
        item_input: Original (un-encoded) id of the query user.
        n: Number of neighbours wanted. ``n + 1`` rows are taken because the
            query user is normally its own best match.
        return_dist: If True, return the raw ``(dists, closest)`` numpy arrays
            instead of a DataFrame (same convention as the IBCF helper).
        neg: If True, take the lowest-scoring users instead of the highest.

    Returns:
        A Spark DataFrame with columns ``similar_users`` and ``similarity``,
        ordered by descending similarity; ``(dists, closest)`` when
        ``return_dist`` is True; or None if the lookup fails.
    """
    try:
        encoded_index = user_encoder.transform([item_input])[0]
        weights = user_weights
        dists = np.dot(weights, weights[encoded_index])
        sorted_dists = np.argsort(dists)
        n = n + 1

        closest = sorted_dists[:n] if neg else sorted_dists[-n:]

        if return_dist:
            return dists, closest

        # Decode every candidate (regardless of the type of `item_input`) and
        # convert numpy scalars to plain Python values: Spark cannot infer a
        # schema from numpy types such as int64/float32.
        similarity_list = [
            (
                _to_python_scalar(user_encoder.inverse_transform([close])[0]),
                float(dists[close]),
            )
            for close in closest
        ]

        # Create a Spark DataFrame
        spark = SparkSession.builder.getOrCreate()
        schema = ["similar_users", "similarity"]
        df = spark.createDataFrame(similarity_list, schema=schema)

        return df.orderBy(col("similarity"), ascending=False)
    except Exception as e:
        # Notebook-level boundary: report and return None instead of raising.
        print(f'\033[1m{item_input}\033[0m, Not Found in User list: {e}')
        return None

In [None]:
from pyspark.sql.functions import col, rand  # `rand` is not imported anywhere else

# Select a random user with fewer than 500 ratings
ratings_per_user = spark_df.groupBy("user_id").count()  # Get number of ratings per user
candidate = (
    ratings_per_user.filter(col("count") < 500)
    .orderBy(rand())
    .select("user_id")
    .first()
)
if candidate is None:
    raise ValueError("No user with fewer than 500 ratings was found.")
random_user = candidate[0]

# Find similar users to the random user
similar_users_df = find_similar_users(random_user, n=10, neg=False)
if similar_users_df is None:
    raise ValueError(f"Could not compute similar users for user {random_user}.")

# Filter users with similarity > 0.4 and exclude the original user
similar_users_filtered = similar_users_df.filter(
    (col("similarity") > 0.4) & (col("similar_users") != random_user)
)

# Show results
similar_users_filtered.show()


User preferences: The `get_user_preferences` function takes a user ID and retrieves that user's anime preferences. It keeps the animes the user rated at or above their own 75th-percentile rating and looks up their titles and genres. Optionally, it plots a word cloud of the preferred genres. The output is a DataFrame containing the anime titles (`Name`) and their genres (`Genres`).

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from wordcloud import WordCloud
import matplotlib.pyplot as plt
from collections import Counter
import numpy as np

# Function to display a word cloud of preferred genres
def showWordCloud(all_genres):
    """Draw a word cloud in which each genre is sized by its frequency.

    Args:
        all_genres: Mapping of genre name -> count.

    Returns:
        None. Renders the figure with matplotlib. If ``all_genres`` is empty,
        prints a message and draws nothing, because WordCloud refuses to
        render zero words.
    """
    if not all_genres:
        print("No genres to plot.")
        return None

    genres_cloud = WordCloud(
        width=700, height=400, background_color='white', colormap='gnuplot'
    ).generate_from_frequencies(all_genres)
    plt.figure(figsize=(10, 8))
    plt.imshow(genres_cloud, interpolation='bilinear')
    plt.axis('off')
    plt.show()
    return None

def get_user_preferences(user_id, spark_df, spark_df_anime, plot=False, verbose=0):
    """Return the anime a user rated highly, together with their genres.

    "Highly" means at or above the user's own (approximate) 75th-percentile
    rating.

    Args:
        user_id: Id matched against ``spark_df.user_id``.
        spark_df: Ratings DataFrame with ``user_id``, ``anime_id`` and ``rating``.
        spark_df_anime: Anime metadata with ``anime_id``, ``Name`` and ``Genres``.
        plot: If True, draw a word cloud of the preferred genres.
        verbose: If non-zero, print a short summary of the user's ratings.

    Returns:
        A Spark DataFrame with columns ``Name`` and ``Genres``. It is empty
        (with that schema) if the user has no ratings.
    """
    # Imported here because this function can be called before the notebook
    # cell that imports these types has run.
    from pyspark.sql.types import StructType, StructField, StringType

    # Filter for animes watched by user
    animes_watched_by_user = spark_df.filter(F.col('user_id') == user_id)
    # Count once; the value is reused in the verbose summary below.
    n_watched = animes_watched_by_user.count()

    # Check if user has watched any animes
    if n_watched == 0:
        print("User #{} has not watched any animes.".format(user_id))
        return spark.createDataFrame([], schema=StructType([
            StructField("Name", StringType(), True),
            StructField("Genres", StringType(), True)
        ]))

    # 75th percentile of the user's ratings. approxQuantile returns [] when
    # every rating is null/NaN; then there is nothing to threshold on, so all
    # of the user's anime are kept.
    quantiles = animes_watched_by_user.stat.approxQuantile("rating", [0.75], 0.01)
    if quantiles:
        animes_watched_by_user_top = animes_watched_by_user.filter(F.col("rating") >= quantiles[0])
    else:
        animes_watched_by_user_top = animes_watched_by_user

    # Only the set of ids matters (the isin() filter below ignores order), so
    # no ranking is needed. An unpartitioned Window would also pull every row
    # into a single partition.
    top_animes_user = [
        row.anime_id
        for row in animes_watched_by_user_top.select("anime_id").distinct().collect()
    ]

    anime_df_rows = spark_df_anime.filter(F.col("anime_id").isin(top_animes_user)) \
                                 .select("Name", "Genres")

    if verbose != 0:
        avg_rating = animes_watched_by_user.agg(F.mean("rating").alias("avg_rating")).first()["avg_rating"]
        # The mean is None when all ratings are null; avoid a format error.
        avg_text = "{:.1f}".format(avg_rating) if avg_rating is not None else "n/a"

        print("User \033[1m{}\033[0m has watched {} anime(s) with an average rating of {}/10\n".format(
            user_id, n_watched, avg_text
        ))
        print('\033[1m----- Preferred genres----- \033[0m\n')

    if plot:
        # Genres is a comma-separated string per anime; count each genre.
        genres_list = [
            genre.strip()
            for row in anime_df_rows.select("Genres").collect()
            if row["Genres"] and isinstance(row["Genres"], str)
            for genre in row["Genres"].split(',')
        ]
        showWordCloud(dict(Counter(genres_list)))

    return anime_df_rows

In [None]:
# Reuse the user picked in the UBCF cell so that preferences and similar users
# describe the same person; fall back to a fixed id if that cell was not run.
random_user = globals().get("random_user", 12345)
user_pref = get_user_preferences(random_user, spark_df, spark_df_anime, plot=True, verbose=1)

user_pref.limit(5).show(truncate=False)

Generate recommendations for an anime

In [None]:
def get_anime_recommendations(model, anime_id, num_recommendations=10):
    """Return the top ``num_recommendations`` users predicted to like one anime.

    The result is the DataFrame produced by ALS ``recommendForItemSubset``:
    one row for ``anime_id`` with a ``recommendations`` array column.
    """
    target_items = spark.createDataFrame([(anime_id,)], ["anime_id"])
    return model.recommendForItemSubset(target_items, num_recommendations)

Recommend animes to the selected user based on the preferences of similar users. The `get_recommended_animes` function iterates through the list of similar users, retrieves each one's preferences, and keeps the animes that do not appear in the selected user's preferences. It ranks these by how many similar users prefer them and returns the top titles along with their genres and synopsis.

In [None]:
def get_recommended_animes(similar_users, user_pref, spark_df, spark_df_anime, n=10):
    """Recommend anime that similar users like but the target user does not.

    Args:
        similar_users: DataFrame with a ``similar_users`` column of user ids,
            one per row (e.g. the filtered output of ``find_similar_users``).
        user_pref: The target user's preferences (DataFrame with ``Name``).
        spark_df: Ratings DataFrame with ``user_id``, ``anime_id`` and ``rating``.
        spark_df_anime: Anime metadata with ``anime_id``, ``Name``, ``Genres``
            and ``Synopsis``.
        n: Maximum number of recommendations.

    Returns:
        A Spark DataFrame with columns ``n`` (number of ratings the anime has
        in ``spark_df``), ``anime_name``, ``Genres`` and ``Synopsis``. Rows are
        ordered by how many similar users prefer each anime; the DataFrame is
        empty when nothing can be recommended.
    """
    # Local import: IntegerType is not imported anywhere else in the notebook,
    # and the other types are only imported in a later cell.
    from pyspark.sql.types import StructType, StructField, StringType, IntegerType

    schema = StructType([
        StructField("n", IntegerType(), True),
        StructField("anime_name", StringType(), True),
        StructField("Genres", StringType(), True),
        StructField("Synopsis", StringType(), True)
    ])

    # One similar-user id per row.
    similar_user_ids = [
        int(row["similar_users"])
        for row in similar_users.select("similar_users").collect()
    ]

    # Anime the target user already prefers are excluded from recommendations.
    user_pref_names = [row["Name"] for row in user_pref.select("Name").collect()]

    anime_list = []
    for user_id in similar_user_ids:
        pref_list = get_user_preferences(user_id, spark_df, spark_df_anime)
        pref_list = pref_list.filter(~F.col("Name").isin(user_pref_names))
        anime_names = [row["Name"] for row in pref_list.select("Name").collect()]
        if anime_names:
            anime_list.append(anime_names)

    if not anime_list:
        print("No anime recommendations available for the given users.")
        return spark.createDataFrame([], schema)

    # Rank names by how many similar users prefer them (driver-side).
    anime_list_flat = [anime for sublist in anime_list for anime in sublist]
    sorted_list = pd.Series(anime_list_flat).value_counts().head(n)
    top_names = [name for name in sorted_list.index if isinstance(name, str)]

    # Fetch details for all top names in one query; keep the first row per Name.
    details = {}
    for row in spark_df_anime.filter(F.col("Name").isin(top_names)).collect():
        details.setdefault(row["Name"], row)

    # Rating counts, restricted to the anime ids actually needed.
    wanted_ids = [row["anime_id"] for row in details.values()]
    anime_counts = {
        row["anime_id"]: row["count"]
        for row in spark_df.filter(F.col("anime_id").isin(wanted_ids))
                           .groupBy("anime_id").count().collect()
    }

    recommended_animes = []
    for anime_name in top_names:
        anime_row = details.get(anime_name)
        if anime_row is None:
            continue
        recommended_animes.append({
            "n": anime_counts.get(anime_row["anime_id"], 0),
            "anime_name": anime_name,
            "Genres": anime_row["Genres"],
            "Synopsis": anime_row["Synopsis"]
        })

    return spark.createDataFrame(recommended_animes, schema)

In [None]:
# `similar_users` is never defined in this notebook; the filtered similar-user
# DataFrame from the UBCF cell is the intended input.
recommendations = get_recommended_animes(similar_users_filtered, user_pref, spark_df, spark_df_anime, n=10)
recommendations.show(truncate=False)

IBCF (item-based collaborative filtering)

In [None]:
from pyspark.sql import functions as F
from pyspark.sql.types import StructType, StructField, StringType, ArrayType
import numpy as np


def find_similar_animes_spark(name, spark_df_anime, anime_encoder, anime_weights, n=10, return_dist=False, neg=False):
    """Find the anime whose weight vectors are most (or least) similar to ``name``.

    Similarity is the dot product between rows of ``anime_weights``. Anime ids
    are mapped to/from row indices with ``anime_encoder``.

    Args:
        name: Value of the ``Name`` column identifying the query anime.
        spark_df_anime: Anime metadata with ``anime_id``, ``Name``,
            ``English name``, ``Genres`` and ``Synopsis`` columns.
        anime_encoder: Object exposing ``transform`` / ``inverse_transform``
            between anime ids and row indices of ``anime_weights``.
        anime_weights: Presumably a 2-D array with one row per encoded anime
            (TODO confirm).
        n: Number of neighbours wanted. ``n + 1`` rows are taken so the query
            anime itself can be dropped.
        return_dist: If True, return the raw ``(dists, closest)`` arrays.
        neg: If True, take the lowest-scoring anime instead of the highest.

    Returns:
        A Spark DataFrame (``Name``, ``Similarity`` as an ``"xx.xx%"`` string,
        ``Genres``, ``Synopsis``) ordered by descending numeric similarity,
        without the query anime. Returns ``(dists, closest)`` when
        ``return_dist`` is True, or None if the anime is not found or any
        error occurs.
    """
    try:
        anime_row = spark_df_anime.filter(F.col("Name") == name).first()

        if not anime_row:
            print('{} not found in Anime list'.format(name))
            return None

        index = anime_row["anime_id"]
        encoded_index = anime_encoder.transform([index])[0]
        weights = anime_weights

        dists = np.dot(weights, weights[encoded_index])
        sorted_indices = np.argsort(dists)

        n = n + 1
        closest = sorted_indices[:n] if neg else sorted_indices[-n:]

        print('Animes closest to {}'.format(name))

        if return_dist:
            return dists, closest

        # Decode candidate rows back to anime ids. Convert numpy scalars to
        # plain Python values, which Spark's isin() accepts on every version.
        # (Everything here runs on the driver, so no broadcast is needed.)
        decoded_ids = []
        for close in closest:
            decoded = anime_encoder.inverse_transform([close])[0]
            decoded_ids.append(decoded.item() if isinstance(decoded, np.generic) else decoded)

        schema = StructType([
            StructField("Name", StringType(), True),
            StructField("Similarity", StringType(), True),
            StructField("Genres", StringType(), True),
            StructField("Synopsis", StringType(), True)
        ])

        similar_animes = []
        for row in spark_df_anime.filter(F.col("anime_id").isin(decoded_ids)).collect():
            anime_id = row["anime_id"]
            # Exclude the query anime by id; comparing display names would
            # miss it whenever its English name is shown instead of `name`.
            if anime_id == index:
                continue

            english_name = row["English name"]
            name_to_use = english_name if english_name != "UNKNOWN" else row["Name"]
            similarity = dists[anime_encoder.transform([anime_id])[0]]

            similar_animes.append({
                "Name": name_to_use,
                "Similarity": "{:.2f}%".format(similarity * 100),
                "Genres": row["Genres"],
                "Synopsis": row["Synopsis"]
            })

        result_df = spark.createDataFrame(similar_animes, schema)

        # "Similarity" is a formatted string, so sorting it directly would be
        # lexicographic ("9.00%" > "85.00%"); sort on its numeric value.
        numeric_similarity = F.regexp_replace(F.col("Similarity"), "%", "").cast("double")
        return result_df.orderBy(numeric_similarity.desc())
    except Exception as e:
        # Notebook-level boundary: report and return None instead of raising.
        print(f'Error: {e}')
        print('{} not found in Anime list or other error occurred'.format(name))
        return None

Main function to run the recommendation system

In [None]:
# Main function to run the recommendation system
def run_anime_recommendation_system(filepath, user_id_to_recommend=None):
    """Load ratings, train and evaluate ALS, and optionally show one user's recommendations.

    Args:
        filepath: Ratings CSV passed to ``load_anime_data``.
        user_id_to_recommend: If not None, print the top recommendations for
            this user id (0 is a valid id).

    Returns:
        ``(model, full_df, ratings_df)``: the fitted ALS model, the full
        DataFrame as read, and the cleaned ratings DataFrame.
    """
    from pyspark.sql.functions import explode

    print("Loading data...")
    full_df, ratings_df = load_anime_data(filepath)

    print(f"Total ratings: {ratings_df.count()}")
    print(f"Unique users: {ratings_df.select('user_id').distinct().count()}")
    print(f"Unique anime: {ratings_df.select('anime_id').distinct().count()}")

    print("Splitting data into training and test sets...")
    train_df, test_df = split_data(ratings_df)

    print("Training ALS model...")
    model = train_als_model(train_df)

    print("Evaluating model...")
    rmse, predictions = evaluate_model(model, test_df)
    print(f"Root Mean Squared Error (RMSE): {rmse}")

    if user_id_to_recommend is not None:
        print(f"Generating recommendations for user {user_id_to_recommend}...")
        user_recs = get_user_recommendations(model, user_id_to_recommend)

        # Flatten the nested "recommendations" array into one row per anime.
        user_recs_flattened = user_recs.select(
            "user_id",
            explode("recommendations").alias("rec")
        ).select(
            "user_id",
            col("rec.anime_id").alias("anime_id"),
            col("rec.rating").alias("predicted_rating")
        )

        # Attach anime names when the source file has them; otherwise show ids.
        if 'anime_name' in full_df.columns:
            anime_names = full_df.select("anime_id", "anime_name").distinct()
            user_recs_flattened = user_recs_flattened.join(anime_names, on="anime_id")

        print("Top recommended anime:")
        user_recs_flattened.orderBy(col("predicted_rating").desc()).show(10, truncate=False)

    return model, full_df, ratings_df

Example use case

In [None]:
if __name__ == "__main__":
    # Placeholder for path
    filepath = "anime_ratings_2023.csv"

    try:
        model, full_df, ratings_df = run_anime_recommendation_system(filepath, user_id_to_recommend=123)

        # Optional: Save the model. overwrite() lets this run be repeated; a
        # plain save() fails when the target directory already exists.
        model.write().overwrite().save("anime_als_model")
    finally:
        # Release the Spark session even if loading or training fails.
        spark.stop()

Completed tasks:
- Set up the recommendation system models
- Read the datasets and created the train/test split
- Explained the rationale for using the ALS model
- Integrated the processed datasets (anime, user ratings) into the model
- Set up UBCF

Remaining tasks:
- Integrate Spark with a Redis database
- Handle real-time data from Redis when using ALS with micro-batch processing
- Train and evaluate the model