In [1]:
import time  
import pyspark  
from pyspark.sql import SparkSession  
spark = SparkSession.builder.appName('recommendation').getOrCreate()

24/04/16 23:10:37 WARN Utils: Your hostname, bemas-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 10.194.166.221 instead (on interface en0)
24/04/16 23:10:37 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/16 23:10:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
movies = spark.read.load("sparkdata/movies.csv", format='csv', header = True)
ratings = spark.read.load('sparkdata/ratings.csv', format='csv', header = True)
links = spark.read.load("sparkdata/links.csv", format='csv', header = True)
tags = spark.read.load("sparkdata/tags.csv", format='csv', header = True)
ratings.show()

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
|     1|     70|   3.0|964982400|
|     1|    101|   5.0|964980868|
|     1|    110|   4.0|964982176|
|     1|    151|   5.0|964984041|
|     1|    157|   5.0|964984100|
|     1|    163|   5.0|964983650|
|     1|    216|   5.0|964981208|
|     1|    223|   3.0|964980985|
|     1|    231|   5.0|964981179|
|     1|    235|   4.0|964980908|
|     1|    260|   5.0|964981680|
|     1|    296|   3.0|964982967|
|     1|    316|   3.0|964982310|
|     1|    333|   5.0|964981179|
|     1|    349|   4.0|964982563|
+------+-------+------+---------+
only showing top 20 rows



In [3]:
# print the schema to understand the data types of features
ratings = ratings.select("userId", "movieId", "rating")
ratings.printSchema()

root
 |-- userId: string (nullable = true)
 |-- movieId: string (nullable = true)
 |-- rating: string (nullable = true)



In [4]:
# convert the data type to integer and float
df = ratings.withColumn('userId', ratings['userId'].cast('int')).\
withColumn('movieId', ratings['movieId'].cast('int')).withColumn('rating', ratings['rating'].cast('float'))
df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: float (nullable = true)



In [5]:
# split the data into train, validation and test sets
train, validation, test = df.randomSplit([0.6,0.2,0.2], seed = 0)
print("The number of ratings in each set: {}, {}, {}".format(train.count(), validation.count(), test.count()))

                                                                                

The number of ratings in each set: 60435, 20052, 20349


In [6]:
from pyspark.sql.functions import col, sqrt
def RMSE(predictions):
    squared_diff = predictions.withColumn("squared_diff", pow(col("rating") - col("prediction"), 2))
    mse = squared_diff.selectExpr("mean(squared_diff) as mse").first().mse
    return mse ** 0.5

In [7]:
# implement the model using ALS algorithm and find the right hyperparameters using Grid Search
from pyspark.ml.recommendation import ALS

def GridSearch(train, valid, num_iterations, reg_param, n_factors):
    min_rmse = float('inf')
    best_n = -1
    best_reg = 0
    best_model = None
    # run Grid Search for all the parameter defined in the range in a loop
    for n in n_factors:
        for reg in reg_param:
            als = ALS(rank = n, 
                      maxIter = num_iterations, 
                      seed = 0, 
                      regParam = reg,
                      userCol="userId", 
                      itemCol="movieId", 
                      ratingCol="rating", 
                      coldStartStrategy="drop")            
            model = als.fit(train)
            predictions = model.transform(valid)
            rmse = RMSE(predictions)     
            print('{} latent factors and regularization = {}: validation RMSE is {}'.format(n, reg, rmse))
            # track the best model using RMSE
            if rmse < min_rmse:
                min_rmse = rmse
                best_n = n
                best_reg = reg
                best_model = model
                
    pred = best_model.transform(train)
    train_rmse = RMSE(pred)
    # best model and its metrics
    print('\nThe best model has {} latent factors and regularization = {}:'.format(best_n, best_reg))
    print('traning RMSE is {}; validation RMSE is {}'.format(train_rmse, min_rmse))
    return best_model

In [8]:
# build the model using different ranges for Grid Search
from pyspark.sql.functions import col, sqrt
num_iterations = 10
ranks = [6, 8, 10, 12]
reg_params = [0.05, 0.1, 0.2, 0.4, 0.8]

start_time = time.time()
final_model = GridSearch(train, validation, num_iterations, reg_params, ranks)
print('Total Runtime: {:.2f} seconds'.format(time.time() - start_time))

24/04/16 23:10:56 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/04/16 23:10:56 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

6 latent factors and regularization = 0.05: validation RMSE is 0.9774929328788823
6 latent factors and regularization = 0.1: validation RMSE is 0.9129091223429058
6 latent factors and regularization = 0.2: validation RMSE is 0.8951553382560972
6 latent factors and regularization = 0.4: validation RMSE is 0.9694803178313401
6 latent factors and regularization = 0.8: validation RMSE is 1.1934058854574956
8 latent factors and regularization = 0.05: validation RMSE is 0.991145449065857
8 latent factors and regularization = 0.1: validation RMSE is 0.9168968752992065
8 latent factors and regularization = 0.2: validation RMSE is 0.8984989590130781


[Stage 1030:>                                                       (0 + 1) / 1]                                                                                

8 latent factors and regularization = 0.4: validation RMSE is 0.970257089065676
8 latent factors and regularization = 0.8: validation RMSE is 1.1934001722789012
10 latent factors and regularization = 0.05: validation RMSE is 0.9978579799721213
10 latent factors and regularization = 0.1: validation RMSE is 0.9176672173294838
10 latent factors and regularization = 0.2: validation RMSE is 0.89872811595771
10 latent factors and regularization = 0.4: validation RMSE is 0.9695217458657989
10 latent factors and regularization = 0.8: validation RMSE is 1.1934037198726448
12 latent factors and regularization = 0.05: validation RMSE is 1.0053856065363034
12 latent factors and regularization = 0.1: validation RMSE is 0.9177483821613751
12 latent factors and regularization = 0.2: validation RMSE is 0.9000614069040118
12 latent factors and regularization = 0.4: validation RMSE is 0.9701108570324755


                                                                                

12 latent factors and regularization = 0.8: validation RMSE is 1.193400724202843


[Stage 2320:>                                                       (0 + 1) / 1]                                                                                


The best model has 6 latent factors and regularization = 0.2:
traning RMSE is 0.6876113846732682; validation RMSE is 0.8951553382560972
Total Runtime: 162.54 seconds


In [9]:
pred_test = final_model.transform(test)
print('The testing RMSE is ' + str(RMSE(pred_test)))

The testing RMSE is 0.8959197572737135


In [10]:
single_user = test.filter(test['userId']==12).select(['movieId','userId'])
single_user.show()

+-------+------+
|movieId|userId|
+-------+------+
|    543|    12|
|   1357|    12|
|   2485|    12|
+-------+------+



In [11]:
single_user.join(movies, single_user.movieId == movies.movieId, 'inner').show()

+-------+------+-------+--------------------+--------------------+
|movieId|userId|movieId|               title|              genres|
+-------+------+-------+--------------------+--------------------+
|    543|    12|    543|So I Married an A...|Comedy|Romance|Th...|
|   1357|    12|   1357|        Shine (1996)|       Drama|Romance|
|   2485|    12|   2485|She's All That (1...|      Comedy|Romance|
+-------+------+-------+--------------------+--------------------+



In [12]:
reccomendations = final_model.transform(single_user)
reccomendations.orderBy('prediction',ascending=False).show()

[Stage 2439:>                                                       (0 + 1) / 1]                                                                                

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
|   1357|    12|  5.015935|
|    543|    12| 3.6550279|
|   2485|    12| 3.4955368|
+-------+------+----------+



In [13]:
reccomendations.join(movies, reccomendations.movieId == movies.movieId, 'inner').show()

+-------+------+----------+-------+--------------------+--------------------+
|movieId|userId|prediction|movieId|               title|              genres|
+-------+------+----------+-------+--------------------+--------------------+
|    543|    12| 3.6550279|    543|So I Married an A...|Comedy|Romance|Th...|
|   1357|    12|  5.015935|   1357|        Shine (1996)|       Drama|Romance|
|   2485|    12| 3.4955368|   2485|She's All That (1...|      Comedy|Romance|
+-------+------+----------+-------+--------------------+--------------------+



In [14]:
from pyspark.sql.functions import col, lit

# select a single user from the test set
user_id = 12
single_user_ratings = test.filter(test['userId'] == user_id).select(['movieId', 'userId', 'rating'])

# display the movies the user has liked
print("Movies liked by user with ID", user_id)
single_user_ratings.join(movies, 'movieId').select('movieId', 'title', 'rating').show()

# generate recommendations for the user
all_movies = df.select('movieId').distinct()
user_movies = single_user_ratings.select('movieId').distinct()
movies_to_recommend = all_movies.subtract(user_movies)

# predict ratings for movies the user has not rated yet
recommendations = final_model.transform(movies_to_recommend.withColumn('userId', lit(user_id)))

# filter out the movies that the user has already rated or seen (this filters out the movies that the user has not liked as well)
recommendations = recommendations.filter(col('prediction') > 0)

# display the recommendations with movie names
print("Recommended movies for user with ID", user_id)
recommended_movies = recommendations.join(movies, 'movieId').select('movieId', 'title', 'prediction')

# Sort recommended movies by prediction in descending order
ordered_recommendations = recommended_movies.orderBy(col('prediction').desc())

# Display the ordered recommendations
ordered_recommendations.show()

Movies liked by user with ID 12
+-------+--------------------+------+
|movieId|               title|rating|
+-------+--------------------+------+
|    543|So I Married an A...|   3.5|
|   1357|        Shine (1996)|   5.0|
|   2485|She's All That (1...|   5.0|
+-------+--------------------+------+

Recommended movies for user with ID 12
+-------+--------------------+----------+
|movieId|               title|prediction|
+-------+--------------------+----------+
|  67618|Strictly Sexual (...|  6.166763|
|   3379| On the Beach (1959)| 6.1177487|
|   5867|        Thief (1981)| 5.9761686|
|  42730|   Glory Road (2006)| 5.9761686|
|   4535|Man from Snowy Ri...| 5.9761686|
|   7121|   Adam's Rib (1949)|  5.967025|
|  60943| Frozen River (2008)|  5.941128|
|  33649|  Saving Face (2004)|  5.935265|
|  25906|Mr. Skeffington (...| 5.9273844|
|  77846| 12 Angry Men (1997)| 5.9273844|
|   3200|Last Detail, The ...|  5.890436|
|   3567|   Bossa Nova (2000)| 5.8710847|
|  94070|Best Exotic Marig...|  

In [15]:
from pyspark.sql.types import IntegerType, FloatType

def recommend_similar_movies():
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import lit

    spark = SparkSession.builder.appName('recommendation').getOrCreate()

    # User input for movie names
    movie_list_input = input("Enter movie names (separated by commas), each followed by its release year (in parentheses): ")
    movie_list = [movie.strip() for movie in movie_list_input.split(',')]

    # Find movie IDs for the input movies
    movie_ids = movies.select("movieId", "title").filter(movies["title"].isin(movie_list))

    # Join the input movie IDs with ratings to find ratings for these movies
    user_ratings = movie_ids.join(ratings, "movieId")

    # Add a new column "userId" with a constant value (0) to represent the user
    user_ratings = user_ratings.withColumn("userId", lit(0))

    # Ensure "movieId" column is of type Integer
    user_ratings = user_ratings.withColumn("movieId", user_ratings["movieId"].cast(IntegerType()))

    # Ensure "rating" column is of type Float
    user_ratings = user_ratings.withColumn("rating", user_ratings["rating"].cast(FloatType()))

    # Add missing columns to user_ratings to match the schema of train
    user_ratings = user_ratings.select("userId", "movieId", "rating")

    # Add the user's ratings to the training data
    user_data = train.union(user_ratings)

    # Train the model again with the updated training data
    updated_model = ALS(rank=final_model.rank, maxIter=final_model._java_obj.parent().getMaxIter(),
                        seed=final_model._java_obj.parent().getSeed(),
                        regParam=final_model._java_obj.parent().getRegParam(),
                        userCol="userId", itemCol="movieId", ratingCol="rating",
                        coldStartStrategy="drop").fit(user_data)

    # Create DataFrame containing the movies rated by the user
    user_movies_df = user_ratings.select("userId", "movieId").distinct()

    # Get recommendations for the user
    recommendations = updated_model.recommendForUserSubset(user_movies_df, 10)

    # Extract recommended movie IDs
    recommended_movie_ids = [recommendation.movieId for recommendation in recommendations.select("recommendations").collect()[0][0]]

    # Fetch movie titles for recommended movie IDs
    recommended_movies_titles = movies.select("movieId", "title").filter(movies["movieId"].isin(recommended_movie_ids))

    # Display recommended movie titles
    recommended_movies_titles.show()

# Example usage:
recommend_similar_movies()

Enter movie names (separated by commas), each followed by its release year (in parentheses): Spider-Man (2002)


                                                                                

+-------+--------------------+
|movieId|               title|
+-------+--------------------+
|   3086|Babes in Toyland ...|
|   3379| On the Beach (1959)|
|   3567|   Bossa Nova (2000)|
|   5490|  The Big Bus (1976)|
|   7121|   Adam's Rib (1949)|
|  33649|  Saving Face (2004)|
|  60943| Frozen River (2008)|
|  67618|Strictly Sexual (...|
| 132333|         Seve (2014)|
| 147382|Doctor Who: Voyag...|
+-------+--------------------+



In [16]:
import matplotlib
matplotlib.use('Agg')

import tkinter as tk
from tkinter import ttk, messagebox
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS
from pyspark.sql.functions import lit
from pyspark.sql.types import IntegerType, FloatType

# Initialize SparkSession
spark = SparkSession.builder.appName('recommendation').getOrCreate()

# Assume you have 'movies' and 'ratings' DataFrames available

def recommend_similar_movies(movie_list_input):
    global spark
    
    # Find movie IDs for the input movies
    movie_list = [movie.strip() for movie in movie_list_input.split(',')]
    movie_ids = movies.select("movieId", "title").filter(movies["title"].isin(movie_list))

    # Join the input movie IDs with ratings to find ratings for these movies
    user_ratings = movie_ids.join(ratings, "movieId")

    # Add a new column "userId" with a constant value (0) to represent the user
    user_ratings = user_ratings.withColumn("userId", lit(0))

    # Ensure "movieId" column is of type Integer
    user_ratings = user_ratings.withColumn("movieId", user_ratings["movieId"].cast(IntegerType()))

    # Ensure "rating" column is of type Float
    user_ratings = user_ratings.withColumn("rating", user_ratings["rating"].cast(FloatType()))

    # Add missing columns to user_ratings to match the schema of train
    user_ratings = user_ratings.select("userId", "movieId", "rating")

    # Add the user's ratings to the training data
    user_data = train.union(user_ratings)

    # Train the model again with the updated training data
    updated_model = ALS(rank=final_model.rank, maxIter=final_model._java_obj.parent().getMaxIter(),
                        seed=final_model._java_obj.parent().getSeed(),
                        regParam=final_model._java_obj.parent().getRegParam(),
                        userCol="userId", itemCol="movieId", ratingCol="rating",
                        coldStartStrategy="drop").fit(user_data)

    # Create DataFrame containing the movies rated by the user
    user_movies_df = user_ratings.select("userId", "movieId").distinct()

    # Get recommendations for the user
    recommendations = updated_model.recommendForUserSubset(user_movies_df, 10)

    # Extract recommended movie IDs
    recommended_movie_ids = [recommendation.movieId for recommendation in recommendations.select("recommendations").collect()[0][0]]

    # Fetch movie titles for recommended movie IDs
    recommended_movies_titles = movies.select("movieId", "title").filter(movies["movieId"].isin(recommended_movie_ids))

    # Display recommended movie titles
    recommended_movies = recommended_movies_titles.collect()
    recommended_movie_titles = [movie.title for movie in recommended_movies]
    return recommended_movie_titles

def on_submit():
    movie_list_input = entry.get()
    if movie_list_input:
        recommendations = recommend_similar_movies(movie_list_input)
        if recommendations:
            messagebox.showinfo("Recommendations", "\n".join(recommendations))
        else:
            messagebox.showwarning("No Recommendations", "No recommendations found for the provided movies.")
    else:
        messagebox.showwarning("Empty Input", "Please enter at least one movie.")

# Create main window
root = tk.Tk()
root.title("Movie Recommendation System")

# Create input label and entry
label = ttk.Label(root, text="Enter movie names (separated by commas), each followed by its release year (in parentheses):")
label.pack(pady=10)

entry = ttk.Entry(root, width=50)
entry.pack(pady=5)

# Create submit button
submit_button = ttk.Button(root, text="Submit", command=on_submit)
submit_button.pack(pady=5)

# Run the application
root.mainloop()


                                                                                