In [None]:
from itertools import chain

import numpy as np
import pandas as pd
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, regexp_extract, regexp_replace, split
from pyspark.sql.types import FloatType, IntegerType
from surprise import SVD, Dataset, Reader
from surprise.model_selection import train_test_split

# Start (or reuse) a Spark session running in local mode on all available cores
spark = (
    SparkSession.builder
    .appName("NetflixRecommendationSVD")
    .master("local[*]")
    .getOrCreate()
)

# Keep a handle on the underlying SparkContext (no configuration is changed here)
sc = spark.sparkContext

In [None]:
# One base URI for every input file, so the title file and the rating files
# cannot point at different NameNode addresses.
# NOTE(review): the title file previously used host "ff.ff.ff.ff", which is not a
# valid address; this assumes it lives on the same cluster as the ratings — confirm.
HDFS_DATA_DIR = "hdfs://172.30.1.11:9000/netflix_data/"

# Load movie titles from HDFS.
# Each line is "<movie id>,<year>,<title>" and a title may itself contain commas.
# A schema-inferring CSV read sizes its columns from the first line and drops the
# extra tokens, truncating such titles, so the file is read as plain text and only
# the first two commas are treated as separators.
TITLE_LINE_PATTERN = r"^\s*(\d+),([^,]*),(.*)$"
titles_raw = spark.read.text(HDFS_DATA_DIR + "movie_titles.csv")
df_title = titles_raw.select(
    regexp_extract(col("value"), TITLE_LINE_PATTERN, 1).cast(IntegerType()).alias("Movie_Id"),
    regexp_extract(col("value"), TITLE_LINE_PATTERN, 2).alias("Year"),
    regexp_extract(col("value"), TITLE_LINE_PATTERN, 3).alias("Name"),
).filter(col("Movie_Id").isNotNull())  # lines that do not match (e.g. blank) yield a null id

# Load the raw rating files (combined_data_1.txt ... combined_data_4.txt).
# They are read one at a time and unioned in list order, so each file is planned
# as its own scan and the combined rows keep the order of the file list.
combined_files = ["combined_data_1.txt", "combined_data_2.txt", "combined_data_3.txt", "combined_data_4.txt"]
raw_data = spark.read.text(HDFS_DATA_DIR + combined_files[0])

for file_name in combined_files[1:]:
    raw_data = raw_data.union(spark.read.text(HDFS_DATA_DIR + file_name))
# Parse the block structure
def parse_block(index, iterator):
    """Turn one partition of raw rating lines into rating tuples.

    The input mixes header lines of the form ``<movie_id>:`` with rating lines
    of the form ``<cust_id>,<rating>,<date>``. Every rating line is attributed
    to the most recent header seen earlier in the same iterator. Rating lines
    that appear before any header are skipped because their movie is unknown.

    Args:
        index: Partition index passed by ``mapPartitionsWithIndex`` (unused).
        iterator: Iterable of rows whose ``value`` attribute holds one text line.

    Returns:
        A list of ``(cust_id, movie_id, rating, date)`` tuples typed as
        ``(int, int, float, str)``.

    Raises:
        ValueError: If a header id, customer id or rating is not numeric, or a
            rating line does not have exactly three comma-separated fields.
    """
    result = []
    movie_id = None
    for row in iterator:
        line = row.value.strip()
        if not line:
            continue
        if line.endswith(':'):
            movie_id = int(line[:-1])
        elif movie_id is not None:
            # Compare against None explicitly: a truthiness test would treat a
            # legitimate id of 0 as "no header yet" and drop its ratings.
            cust_id, rating, date = line.split(',')
            result.append((int(cust_id), movie_id, float(rating), date))
    return result

# Spark splits each large rating file into several partitions, so a partition can
# begin in the middle of a movie's block. parse_block only knows the headers it has
# seen in its own partition and would silently drop those leading ratings. To keep
# them, first find the last "<movie_id>:" header in every partition, then hand each
# partition the header that was still open when it started.
# NOTE(review): assumes partition order follows line order within each file and
# that the union keeps the files in list order — confirm for your Spark version.
def _last_movie_header(index, rows):
    """Yield (partition index, id of the last header line in it, or None)."""
    last_id = None
    for row in rows:
        text = row.value.strip()
        if text.endswith(':'):
            last_id = int(text[:-1])
    yield index, last_id


lines_rdd = raw_data.rdd
last_header_by_partition = dict(lines_rdd.mapPartitionsWithIndex(_last_movie_header).collect())

# carry_in[i] is the movie id still open when partition i begins (None for the first).
carry_in = {}
open_movie_id = None
for partition_index in range(lines_rdd.getNumPartitions()):
    carry_in[partition_index] = open_movie_id
    if last_header_by_partition.get(partition_index) is not None:
        open_movie_id = last_header_by_partition[partition_index]


def _parse_with_carry(index, rows):
    """Run parse_block on a partition, preceded by the header it inherits."""
    inherited_id = carry_in.get(index)
    if inherited_id is not None:
        # A synthetic header row lets parse_block attribute the leading ratings.
        rows = chain([Row(value=f"{inherited_id}:")], rows)
    return parse_block(index, rows)


parsed_rdd = lines_rdd.mapPartitionsWithIndex(_parse_with_carry)
df_ratings = spark.createDataFrame(parsed_rdd, ["Cust_Id", "Movie_Id", "Rating", "Date"])

# Filter out invalid entries and cache
df_ratings = df_ratings.filter(df_ratings.Movie_Id.isNotNull())
df_ratings.cache()

# Convert to Pandas for Surprise compatibility.
# This pulls every rating row onto the driver, so the driver needs enough memory.
df_pandas = df_ratings.toPandas()

In [None]:
# Fixed seed so the train/test split and the SVD initialisation are repeatable
# across Restart & Run All.
RANDOM_SEED = 42

# Define Reader for Surprise (ratings are on a 1-5 scale)
reader = Reader(rating_scale=(1, 5))

# Load data into Surprise Dataset; columns must be in (user, item, rating) order
data = Dataset.load_from_df(df_pandas[['Cust_Id', 'Movie_Id', 'Rating']], reader)

# Hold out 20% of the ratings; the model is fitted on the remaining 80% only.
# testset is kept for validation but is not evaluated in this cell.
trainset, testset = train_test_split(data, test_size=0.2, random_state=RANDOM_SEED)

# Train SVD model
model_svd = SVD(random_state=RANDOM_SEED)
model_svd.fit(trainset)

In [None]:
# Get all movie IDs
all_movie_ids = df_title.select("Movie_Id").rdd.flatMap(lambda x: x).collect()

# Score every title in the catalogue for one user with the trained SVD model
user_id = 785314
df_user_recom = df_title.toPandas()
df_user_recom['Estimate_score'] = df_user_recom['Movie_Id'].apply(
    lambda movie_id: model_svd.predict(user_id, movie_id).est
)
# NOTE(review): titles this user has already rated are scored and ranked as well;
# filter them out here if only unseen titles should be recommended.

# Sort and get top 5
top_5_recommendations = df_user_recom.sort_values('Estimate_score', ascending=False).head(5)
# Build the header from user_id so it cannot go stale when the id is changed
print(f"Top 5 Recommended Movies for User {user_id}:")
print(top_5_recommendations[['Name', 'Estimate_score']].to_string(index=False))


In [None]:
# Build the customer x movie rating matrix used for item-item correlation:
# one row per Cust_Id, one column per Movie_Id, cells hold the rating
# (missing where the customer did not rate the movie).
df_sparse_mat = df_pandas.pivot_table(
    values='Rating',
    index='Cust_Id',
    columns='Movie_Id',
)

def recommend_movie(movie_title):
    """Print the 10 movies whose ratings correlate most with the given title.

    Looks the title up in ``df_title``, correlates its rating column in
    ``df_sparse_mat`` with every other movie's column, and prints the
    correlation and name of the ten highest-correlated other movies.

    Args:
        movie_title: Exact movie name as stored in the ``Name`` column of
            ``df_title``.

    Returns:
        None. The result, or a "not found" message, is printed.
    """
    # first() returns None when no row matches, whereas indexing collect()[0]
    # would raise IndexError before the "not found" message could be shown.
    match = df_title.filter(df_title.Name == movie_title).select("Movie_Id").first()
    if match is None or match[0] not in df_sparse_mat.columns:
        print(f"Movie '{movie_title}' not found or insufficient ratings.")
        return

    movie_id = match[0]
    corr_y = df_sparse_mat.corrwith(df_sparse_mat[movie_id])
    df_recommend = (
        pd.DataFrame(corr_y, columns=['Pearson_R'])
        .dropna()
        .sort_values('Pearson_R', ascending=False)
    )
    df_recommend.index = df_recommend.index.map(int)
    df_recommend = df_recommend.join(df_title.toPandas().set_index('Movie_Id')[['Name']])
    # Exclude the queried movie itself (it correlates perfectly with itself).
    others = df_recommend[df_recommend.index != movie_id]
    print(others.head(10).to_string(index=False))

# Example: list the titles most correlated with one film
example_title = "The Twilight Samurai"
recommend_movie(example_title)

In [None]:
# Shut down the Spark session created in the first cell; run this last,
# since df_title and df_ratings cannot be used once the session is stopped.
spark.stop()