In [1]:
from pyspark.sql import SparkSession

# Reuse the active Spark session if there is one; otherwise start a new one
# registered under the application name "test_app".
spark = (
    SparkSession.builder
    .appName("test_app")
    .getOrCreate()
)

Load the ratings data and drop the unused timestamp column

In [2]:
# Read the ratings CSV: the first line is treated as the header row and the
# column types are inferred from the data instead of defaulting to strings.
data_for_model = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("./data/ratings.csv")
)

In [3]:
# Only userId, movieId and rating are used below, so discard the timestamp.
data_for_model = data_for_model.drop("timestamp")

In [4]:
# Sparsity of the user x movie rating matrix: the percentage of
# (user, movie) pairs for which no rating exists.
ratings = data_for_model

# Number of observed ratings (one row per rating).
numerator = ratings.select("rating").count()

# Size of the full matrix: distinct users times distinct movies.
num_users = ratings.select("userId").distinct().count()
num_movies = ratings.select("movieId").distinct().count()
denominator = num_users * num_movies

# Share of filled cells, then the complementary share expressed in percent.
filled_fraction = float(numerator) / denominator
sparsity = (1.0 - filled_fraction) * 100
print("The ratings dataframe is ", "%.2f" % sparsity + "% empty.")

The ratings dataframe is  98.30% empty.


In [5]:
# Import the Spark SQL functions under a namespace. Importing `min` by name
# would replace Python's builtin min() for every later cell in the notebook.
from pyspark.sql import functions as F

# Number of ratings per movie and per user; each is built once and reused
# for both the minimum and the average below.
ratings_per_movie = ratings.groupBy("movieId").count()
ratings_per_user = ratings.groupBy("userId").count()

# Min num ratings for movies
print("Movie with the fewest ratings: ")
ratings_per_movie.select(F.min("count")).show()

# Avg num ratings per movie
print("Avg num ratings per movie: ")
ratings_per_movie.select(F.avg("count")).show()

# Min num ratings for user
print("User with the fewest ratings: ")
ratings_per_user.select(F.min("count")).show()

# Avg num ratings per users
print("Avg num ratings per user: ")
ratings_per_user.select(F.avg("count")).show()

Movie with the fewest ratings: 
+----------+
|min(count)|
+----------+
|         1|
+----------+

Avg num ratings per movie: 
+------------------+
|        avg(count)|
+------------------+
|10.369806663924312|
+------------------+

User with the fewest ratings: 
+----------+
|min(count)|
+----------+
|        20|
+----------+

Avg num ratings per user: 
+------------------+
|        avg(count)|
+------------------+
|165.30491803278687|
+------------------+



In [6]:
# Print each column's name, type and nullability for the ratings DataFrame.
ratings.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



Model Creation and Evaluation

In [7]:
# Randomly split the ratings into training and test sets with 0.7 / 0.3
# weights; the fixed seed keeps the split repeatable between runs.
train_df, test_df = data_for_model.randomSplit([0.7, 0.3], seed=100)


In [8]:
# Configure the ALS model
import setuptools.dist
from pyspark.ml.recommendation import ALS


In [20]:

# Settings for the ALS recommender, gathered in one place.
als_params = dict(
    # Column names in the ratings data.
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    # Number of latent factors, constrained to be non-negative.
    rank=4,
    nonnegative=True,
    # Drop rows whose prediction is NaN so they do not poison the metric.
    coldStartStrategy='drop',
    # Fixed seed for a repeatable fit.
    seed=0,
)
als = ALS(**als_params)

# Fit the model on the training split only.
model = als.fit(train_df)

In [21]:
from pyspark.ml.evaluation import RegressionEvaluator

# Predict a rating for every (user, movie) pair in the held-out split.
predictions = model.transform(test_df)

# Compare the "prediction" column against the true "rating" column using
# root-mean-square error.
evaluator = RegressionEvaluator(
    labelCol="rating",
    predictionCol="prediction",
    metricName="rmse",
)
rmse = evaluator.evaluate(predictions)
print("RMSE: ", rmse)


RMSE:  0.8879303777841778


Recommendations for a single user


In [None]:
# Rows of the test split that belong to user 1. Only movies this user rated
# in the held-out data are kept, so anything scored from this frame ranks
# already-rated movies rather than unseen ones.
single_user = test_df.where(test_df.userId == 1)

In [38]:
# Score every row of single_user with the fitted model; the result is later
# sorted by its 'prediction' column. (The variable name is misspelled but is
# kept because a later cell refers to it.)
reccomendations = model.transform(single_user)

In [42]:
# Movie metadata used to turn movieId values into titles and genres. The
# first line is the header row and column types are inferred from the data.
movies = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv("./data/movies.csv")
)


In [48]:
# Keep the five rows with the highest predicted rating for this user.
recs_final = reccomendations.orderBy('prediction', ascending=False).limit(5)

# Join on the column name so the result has a single movieId column, and
# sort again after the join: a join does not guarantee that the row order
# of its inputs is preserved, so without the second orderBy the titles can
# print in an arbitrary order instead of best-first.
(
    recs_final
    .join(movies, on='movieId', how='inner')
    .orderBy('prediction', ascending=False)
    .select('title', 'genres')
    .show(truncate=False)
)

+-----------------------------------------+---------------------------------------+
|title                                    |genres                                 |
+-----------------------------------------+---------------------------------------+
|Star Wars: Episode IV - A New Hope (1977)|Action|Adventure|Sci-Fi                |
|Silence of the Lambs, The (1991)         |Crime|Horror|Thriller                  |
|Mr. Smith Goes to Washington (1939)      |Drama                                  |
|Princess Bride, The (1987)               |Action|Adventure|Comedy|Fantasy|Romance|
|Office Space (1999)                      |Comedy|Crime                           |
+-----------------------------------------+---------------------------------------+



Recommendations for all users


In [11]:
# Top 3 recommended movies for every user known to the model.
user_recs = model.recommendForAllUsers(numItems=3)

# Bring the result back to the driver as a list of Rows so it can be
# iterated in plain Python.
data = user_recs.collect()

In [None]:
# Print each user id followed by the ids of that user's recommended movies,
# one value per line.
for user in data:
    print(user.userId)
    for rec in user.recommendations:
        print(rec.movieId)

In [13]:
from pyspark.sql import Row

# NOTE(review): to_predict is not used by any visible cell and its ratings
# differ from new_user_ratings below - confirm whether it is still needed.
to_predict = {
                'userId':[0,0],
                'movieId':[ 954, 457],
                'rating': [3.0, 5.0]}


# Ratings supplied by a new user (id 0). Every Row must use the same field
# names as the ratings data (userId, movieId, rating). Two rows previously
# used `user=` instead of `userId=`, which made the rows inconsistent with
# each other and with the userId column the ALS model is configured to read.
new_user_id = 0
new_user_ratings = [
    Row(userId=new_user_id, movieId=954, rating=4.0),
    Row(userId=new_user_id, movieId=457, rating=5.0),
    Row(userId=new_user_id, movieId=33649, rating=3.0),
]

# Create a DataFrame for the new user's ratings
new_user_ratings_df = spark.createDataFrame(new_user_ratings)

# Second DataFrame built from the same rows; no explicit schema is passed,
# so the column types are inferred from the Row values.
df = spark.createDataFrame(new_user_ratings)

