In [1]:
import pyspark.sql.functions as sql_func
from pyspark.sql.types import *
from pyspark.ml.recommendation import ALS, ALSModel
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.mllib.evaluation import RegressionMetrics, RankingMetrics
from pyspark.ml.evaluation import RegressionEvaluator



from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, when
from pyspark.sql.types import IntegerType, StringType, ArrayType, StructType, StructField


import pandas as pd
from ast import literal_eval
from datetime import datetime

In [2]:
# Start (or reuse) a local Spark session. getOrCreate() returns the already-running
# session on re-execution, whereas SparkContext('local') raises
# "Cannot run multiple SparkContexts at once" if this cell is run twice.
spark = SparkSession.builder.master('local').appName('als-recommender').getOrCreate()
sc = spark.sparkContext

In [3]:
credits = pd.read_csv('../data/credits.csv')
keywords = pd.read_csv('../data/keywords.csv')

# low_memory=False parses each column in a single pass, so each column gets one
# inferred dtype instead of triggering pandas' mixed-type DtypeWarning.
movies = pd.read_csv('../data/movies_metadata.csv', low_memory=False).drop(
    columns=['belongs_to_collection', 'homepage', 'imdb_id', 'production_countries',
             'poster_path', 'production_companies', 'status', 'title', 'video']
)

# A few malformed records carry a non-numeric 'id'. Dropping them by hard-coded
# index labels (19730, 29503, 35587) silently removes the wrong rows -- or makes
# astype('int64') fail -- as soon as the CSV changes, so coerce the id instead and
# keep only the rows whose id parses as a number.
movie_ids = pd.to_numeric(movies['id'], errors='coerce')
movies = movies.loc[movie_ids.notna()].assign(id=movie_ids.dropna().astype('int64'))

df = movies.merge(keywords, on='id').merge(credits, on='id')
df['original_language'] = df['original_language'].fillna('')
df['runtime'] = df['runtime'].fillna(0)
df['tagline'] = df['tagline'].fillna('')
# Any row still missing a value in another column is discarded.
df.dropna(inplace=True)
def get_text(text, obj='name'):
    """Pull one field out of every element of a stringified list of dicts.

    ``text`` is parsed with ``ast.literal_eval`` (for example the string
    "[{'id': 18, 'name': 'Drama'}]"), and ``obj`` is looked up on each element.
    When exactly one element is present its value is returned unchanged;
    otherwise all values are joined with ", " (an empty list gives "").
    """
    values = [entry[obj] for entry in literal_eval(text)]
    if len(values) == 1:
        return values[0]
    return ', '.join(values)
    
# Turn each stringified list-of-dicts column into a comma-separated string of names.
for list_column in ('genres', 'crew', 'spoken_languages', 'keywords'):
    df[list_column] = df[list_column].apply(get_text)

# Drop the unused cast column, keep the first row per original_title, renumber rows.
df = (
    df.drop(columns='cast')
      .drop_duplicates(subset='original_title', keep='first')
      .reset_index(drop=True)
)

  movies = pd.read_csv('../data/movies_metadata.csv').drop(['belongs_to_collection', 'homepage', 'imdb_id','production_countries', 'poster_path','production_companies', 'status', 'title', 'video'], axis=1).drop([19730, 29503, 35587]) # Incorrect data type


In [5]:
ratings_df = pd.read_csv('../data/ratings.csv')

# Epoch seconds -> datetime. datetime.fromtimestamp uses the machine's local time
# zone, so the displayed dates depend on where the notebook runs.
ratings_df['date'] = ratings_df['timestamp'].apply(datetime.fromtimestamp)
ratings_df = ratings_df.drop(columns='timestamp')

# NOTE(review): ratings.csv 'movieId' is joined directly against movies_metadata 'id'.
# If these files are the Kaggle "The Movies Dataset", movieId is a MovieLens id while
# 'id' is a TMDB id (mapped via links.csv) -- confirm the join key, otherwise titles
# attached to ratings may belong to different movies.
movie_info = df[['id', 'original_title', 'genres', 'overview']]
ratings_df = (
    ratings_df
    .merge(movie_info, how='left', left_on='movieId', right_on='id')
    .loc[lambda frame: frame['id'].notna()]  # keep only ratings with movie metadata
    .drop(columns='id')
    .reset_index(drop=True)
)

ratings_df.head()

Unnamed: 0,userId,movieId,rating,date,original_title,genres,overview
0,1,110,1.0,2015-03-10 05:52:09,Trois couleurs : Rouge,"Drama, Mystery, Romance",Red This is the third film from the trilogy by...
1,1,147,4.5,2015-03-10 06:07:15,Les Quatre Cents Coups,Drama,"For young Parisian boy Antoine Doinel, life is..."
2,1,858,5.0,2015-03-10 05:52:03,Sleepless in Seattle,"Comedy, Drama, Romance",A young boy who tries to set his dad up on a d...
3,1,1246,5.0,2015-03-10 05:52:36,Rocky Balboa,Drama,When he loses a highly publicized virtual boxi...
4,1,1968,4.0,2015-03-10 06:02:28,Fools Rush In,"Drama, Comedy, Romance",Alex Whitman (Matthew Perry) is a designer fro...


In [10]:

# Persist just the (userId, movieId, rating) triples that Spark reads back below.
ratings_df.to_csv('../data/newRatings.csv', columns=['userId', 'movieId', 'rating'], index=False)
print("CSV file has been created.")

CSV file has been created.


In [11]:
# Explicit schema so Spark does not have to infer column types from the CSV.
data_schema = StructType([
    StructField('userId', IntegerType(), False),
    StructField('movieId', IntegerType(), False),
    StructField('rating', FloatType(), False),
])
final_stat = spark.read.csv(
    '../data/newRatings.csv', header=True, schema=data_schema
).cache()

# The file holds exactly these three columns already; re-selecting them and calling
# .cache() again only registered a second cached copy of the same data.
ratings = final_stat

In [12]:
# 70/30 train/test split; the fixed seed keeps it repeatable for the same input partitioning.
training, test = ratings.randomSplit(weights=[0.7, 0.3], seed=42)

In [13]:
# Explicit-feedback ALS matrix factorisation.
# seed is set explicitly: PySpark's default ALS seed is hash() of the class name,
# and str hashes differ between Python sessions unless PYTHONHASHSEED is fixed, so
# without it the learned factors (and the MAE below) change from run to run.
als = ALS(
    rank=30,
    maxIter=4,
    regParam=0.1,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy='drop',  # drop prediction rows that come out NaN (unseen users/items)
    implicitPrefs=False,
    seed=42,
)
model = als.fit(training)

predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName='mae', labelCol='rating',
                                predictionCol='prediction')

mae = evaluator.evaluate(predictions)
print(f'MAE (Test) = {mae}')

MAE (Test) = 0.662852585545493


In [42]:
# User to recommend for. NOTE(review): interactive input() makes Restart & Run All
# non-reproducible; consider a constant in a config cell instead.
inputUserID = int(input())

# Query the model for this user directly. Previously the input was the user's rows
# in the 30% test split, so a user whose ratings all landed in `training` produced
# an empty subset and therefore no recommendations.
user_subset = spark.createDataFrame(
    [(inputUserID,)],
    StructType([StructField('userId', IntegerType(), False)]),
)

# Top-10 recommendations for the requested user.
top_10_recommendations = model.recommendForUserSubset(user_subset, 10)
top_10_recommendations.show(truncate=False)

+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                         |
+------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|123   |[{62206, 5.833834}, {173153, 5.6407146}, {98328, 5.5967803}, {89403, 5.5477943}, {132912, 5.4823127}, {166627, 5.3826385}, {137174, 5.341543}, {96717, 5.341543}, {172469, 5.299706}, {87358, 5.290487}]|
+------+--------------------------------------------------------------------------------------------------------------------------------------------------------

In [46]:
# Top-10 recommendations for every user in the model. This rebinds
# top_10_recommendations, which the single-user cell above also assigns.
# (The unused, never-evaluated model.transform(test) that used to be here was removed.)
top_10_recommendations = model.recommendForAllUsers(10)
top_10_recommendations.show(truncate=False)

+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|userId|recommendations                                                                                                                                                                                              |
+------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1     |[{91007, 5.2898293}, {89403, 5.220022}, {173153, 5.1777782}, {54326, 5.1073136}, {41434, 5.0428166}, {62206, 4.952603}, {26018, 4.8598213}, {146724, 4.8413463}, {133441, 4.8364716}, {79850, 4.8115897}]    |
|12    |[{89403, 5.161976}, {62206, 5.0326033}, {166627, 4.9803925}, {87719, 4.8394227}, {26610, 4.6884055}, {127445, 4.624134}, {147841, 4.

In [15]:
# Sanity check: the recommendations are a Spark DataFrame, not a pandas one.
print(top_10_recommendations.__class__)

<class 'pyspark.sql.dataframe.DataFrame'>


In [43]:
# Flatten the requested user's recommendations into (movieId, predicted rating) pairs.
# Filtering on inputUserID matters when the notebook runs top to bottom: by this point
# top_10_recommendations has been rebound to the all-users result, and without the
# filter every user's recommendations would end up in this user's list.
user_rows = top_10_recommendations.filter(col('userId') == inputUserID).collect()

recommendations_list = [
    (recommendation['movieId'], recommendation['rating'])
    for row in user_rows
    for recommendation in row['recommendations']
]

print(recommendations_list)

[(62206, 5.833834171295166), (173153, 5.640714645385742), (98328, 5.596780300140381), (89403, 5.547794342041016), (132912, 5.4823126792907715), (166627, 5.382638454437256), (137174, 5.341543197631836), (96717, 5.341543197631836), (172469, 5.299705982208252), (87358, 5.290486812591553)]


In [44]:
# Recommendations as a frame: one row per (movieId, predicted rating) pair.
recommendations_df = pd.DataFrame(recommendations_list, columns=['movieId', 'predictionValue'])

# Attach title/genres/overview. The left merge keeps the model's ranking order; only
# the display columns are kept (the join keys are dropped), then duplicates removed.
result_df = (
    recommendations_df
    .merge(df[['id', 'original_title', 'genres', 'overview']],
           how='left', left_on='movieId', right_on='id')
    .loc[:, ['original_title', 'genres', 'overview', 'predictionValue']]
    .drop_duplicates()
    .reset_index(drop=True)
)

result_df.head()

Unnamed: 0,original_title,genres,overview,predictionValue
0,30 Minutes or Less,"Action, Adventure, Comedy",Two fledgling criminals kidnap a pizza deliver...,5.833834
1,Phil Spector,"TV Movie, Drama",A drama centered on the relationship between P...,5.640715
2,The Great Caruso,"Drama, Music",Loosely traces the life of tenor Enrico Caruso...,5.59678
3,Dear God No!,"Horror, Comedy",A gang of outlaw bikers pull a home invasion o...,5.547794
4,The Price of Sex,Documentary,The Price of Sex is a documentary about young ...,5.482313


In [45]:
# Single line "<userId>;<title1>;<title2>;..." with titles in ranking order.
titles_str = str(inputUserID) + ";"
titles_str += ';'.join(result_df['original_title'].astype(str))

# original_title may contain non-ASCII text; write UTF-8 explicitly rather than the
# platform default encoding (which can raise UnicodeEncodeError, e.g. cp1252 on Windows).
with open('prediction.txt', 'w', encoding='utf-8') as file:
    file.write(titles_str)

# Recommendations for all users

In [49]:
# One (userId, movieId) pair per recommendation, for every user, in ranking order.
recommendations_list = [
    (row['userId'], recommendation['movieId'])
    for row in top_10_recommendations.collect()
    for recommendation in row['recommendations']
]



In [54]:
# Preview the first 20 (userId, movieId) pairs.
print(recommendations_list[0:20])

[(1, 91007), (1, 89403), (1, 173153), (1, 54326), (1, 41434), (1, 62206), (1, 26018), (1, 146724), (1, 133441), (1, 79850), (12, 89403), (12, 62206), (12, 166627), (12, 87719), (12, 26610), (12, 127445), (12, 147841), (12, 53774), (12, 87358), (12, 171277)]


In [55]:
# Tabular view of the (userId, movieId) pairs, one row per recommendation.
result_df = pd.DataFrame(data=recommendations_list, columns=['userId', 'movieId'])


In [57]:
# Attach movie metadata to each (userId, movieId) pair; unmatched movieIds get NaN.
movie_columns = ['id', 'original_title', 'genres', 'overview']
result_df = result_df.merge(df.loc[:, movie_columns], how='left', left_on='movieId', right_on='id')


In [58]:
# Collapse to one row per userId whose 'original_title' is that user's list of titles
# (within a user, titles keep the row order of the merged frame).
result_df = result_df.groupby('userId')['original_title'].agg(list).reset_index()

In [61]:
# One line per user: "<userId>;<title1>;<title2>;..." (titles in ranking order).
result_df['recommendations_str'] = (
    result_df['userId'].astype(str)
    + ';'
    + result_df['original_title'].apply(lambda titles: ';'.join(map(str, titles)))
)

# Write the lines directly. The previous to_csv(quoting=QUOTE_NONE, escapechar='\\')
# round trip escaped every ';' only to strip the escapes again, and it re-read and
# re-wrote the file with the platform default encoding although to_csv writes UTF-8,
# which can garble or fail on non-ASCII titles (e.g. on Windows).
with open('all_users_predictions.txt', 'w', encoding='utf-8') as file:
    for line in result_df['recommendations_str']:
        file.write(line + '\n')