In [23]:
import numpy as np
import pandas as pd

In [26]:
ratings = pd.DataFrame({
    1: [1, np.nan, 2, np.nan, np.nan, 1],
    2: [np.nan, np.nan, 4, 2, np.nan, np.nan],
    3: [3, 5, np.nan, 4, 4, 3],
    4: [np.nan, 4, 1, np.nan, 3, np.nan],
    5: [np.nan, np.nan, 2, 5, 4, 3],
    6: [5, np.nan, np.nan, np.nan, 2, np.nan],
    7: [np.nan, 4, 3, np.nan, np.nan, np.nan],
    8: [np.nan, np.nan, np.nan, 4, np.nan, 2],
    9: [5, np.nan, 4, np.nan, np.nan, np.nan],
    10: [np.nan, 2, 3, np.nan, np.nan, np.nan],
    11: [4, 1, 5, 2, 2, 4],
    12: [np.nan, 3, np.nan, np.nan, 5, np.nan]
}, index=range(1, 7))

ratings

Unnamed: 0,1,2,3,4,5,6,7,8,9,10,11,12
1,1.0,,3.0,,,5.0,,,5.0,,4,
2,,,5.0,4.0,,,4.0,,,2.0,1,3.0
3,2.0,4.0,,1.0,2.0,,3.0,,4.0,3.0,5,
4,,2.0,4.0,,5.0,,,4.0,,,2,
5,,,4.0,3.0,4.0,2.0,,,,,2,5.0
6,1.0,,3.0,,3.0,,,2.0,,,4,


In [28]:
ratings = pd.DataFrame({
    1: [1, 0, 2, 0, 0, 1],
    2: [0, 0, 4, 2, 0, 0],
    3: [3, 5, 0, 4, 4, 3],
    4: [0, 4, 1, 0, 3, 0],
    5: [np.nan, 0, 2, 5, 4, 3],
    6: [5, 0, 0, 0, 2, 0],
    7: [0, 4, 3, 0, 0, 0],
    8: [0, 0, 0, 4, 0, 2],
    9: [5, 0, 4, 0, 0, 0],
    10: [0, 2, 3, 0, 0, 0],
    11: [4, 1, 5, 2, 2, 4],
    12: [0, 3, 0, 0, 5, 0]
}, index=range(1, 7))

ratings

Unnamed: 0,1,2,3,4,5,6,7,8,9,10,11,12
1,1,0,3,0,,5,0,0,5,0,4,0
2,0,0,5,4,0.0,0,4,0,0,2,1,3
3,2,4,0,1,2.0,0,3,0,4,3,5,0
4,0,2,4,0,5.0,0,0,4,0,0,2,0
5,0,0,4,3,4.0,2,0,0,0,0,2,5
6,1,0,3,0,3.0,0,0,2,0,0,4,0


In [29]:

# Use item-item based collaborative filtering to find user 1's rating for item 5
user_id = 1
item_id = 5
item_similarity = ratings.corr()
rated_items = ratings.loc[user_id].dropna().index
similarity_scores = item_similarity.loc[item_id, rated_items]
weighted_ratings = ratings.loc[user_id, rated_items] * similarity_scores
predicted_rating = weighted_ratings.sum() / similarity_scores.sum()
print(f"Predicted rating for user {user_id} on item {item_id}: {predicted_rating}")




Predicted rating for user 1 on item 5: -0.4111226925307214


## Collaborative Filter via PySpark

In [30]:
!pip install pyspark
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

openjdk-8-jdk-headless is already the newest version (8u422-b05-1~22.04).
0 upgraded, 0 newly installed, 0 to remove and 49 not upgraded.


In [31]:
from pyspark.sql import SparkSession
from pyspark.ml.recommendation import ALS

In [32]:
spark = SparkSession \
    .builder \
    .appName("RecommendationSystem") \
    .getOrCreate()

In [33]:
# Download the movielens dataset
import os
import urllib.request
import zipfile

# Download MovieLens ml-latest-small dataset
url = "http://files.grouplens.org/datasets/movielens/ml-latest-small.zip"
if not os.path.exists("Data"):
    os.mkdir("Data")
urllib.request.urlretrieve(url, 'Data/movielens.zip')
with zipfile.ZipFile('Data/movielens.zip', "r") as file:
    file.printdir()
    file.extractall('Data')

File Name                                             Modified             Size
ml-latest-small/                               2018-09-26 15:50:12            0
ml-latest-small/links.csv                      2018-09-26 15:50:10       197979
ml-latest-small/tags.csv                       2018-09-26 15:49:40       118660
ml-latest-small/ratings.csv                    2018-09-26 15:49:38      2483723
ml-latest-small/README.txt                     2018-09-26 15:50:12         8342
ml-latest-small/movies.csv                     2018-09-26 15:49:56       494431


In [35]:
# Read file into dataframe
ratings = spark.read.csv('Data/ml-latest-small/ratings.csv',
                         inferSchema=True, header=True)
movies = spark.read.csv('Data/ml-latest-small/movies.csv',
                        inferSchema=True, header=True)
ratings.join(movies, "movieId").show(3)

+-------+------+------+---------+--------------------+--------------------+
|movieId|userId|rating|timestamp|               title|              genres|
+-------+------+------+---------+--------------------+--------------------+
|      1|     1|   4.0|964982703|    Toy Story (1995)|Adventure|Animati...|
|      3|     1|   4.0|964981247|Grumpier Old Men ...|      Comedy|Romance|
|      6|     1|   4.0|964982224|         Heat (1995)|Action|Crime|Thri...|
+-------+------+------+---------+--------------------+--------------------+
only showing top 3 rows



In [36]:
# Create training dataset and testing dataset
data = ratings.select("userId", "movieId", "rating")

splits = data.randomSplit([0.8, 0.2])
train = splits[0].withColumnRenamed("rating", "label")
test = splits[1].withColumnRenamed("rating", "trueLabel")

print("Training data:", train.count())
print("Testing data:", test.count())

Training data: 80716
Testing data: 20120


In [10]:
# Define alternating least-squares algorithm
als = ALS(maxIter=19, regParam=0.01, userCol="userId",
          itemCol="movieId", ratingCol="label")

In [39]:
# Train the ALS model
model = als.fit(train)

In [40]:
# Make predictions on testing set
predictions = model.transform(test)
predictions.count()

20120

In [41]:
# Calcualte the RMSE for the test predictions
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(labelCol="trueLabel",
                                predictionCol="prediction",
                                metricName="rmse")
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)

Root Mean Squared Error (RMSE): nan


The RMSE is NaN. Find out what happened.

In [42]:
# Look for missing values in the test set
from pyspark.sql.functions import isnan, when, count, col
test.select([count(when(isnan(c), c)).alias(c) for c in test.columns]).show()

+------+-------+---------+
|userId|movieId|trueLabel|
+------+-------+---------+
|     0|      0|        0|
+------+-------+---------+



In [43]:
test.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- trueLabel: double (nullable = true)



In [44]:
predictions.select([count(when(isnan(c), c)).alias(c) for c in predictions.columns]).show()

+------+-------+---------+----------+
|userId|movieId|trueLabel|prediction|
+------+-------+---------+----------+
|     0|      0|        0|       801|
+------+-------+---------+----------+



In [45]:
# Remove missing predictions and then calculate RMSE
predictions = predictions.na.drop()
predictions.select([count(when(isnan(c), c)).alias(c) for c in predictions.columns]).show()

+------+-------+---------+----------+
|userId|movieId|trueLabel|prediction|
+------+-------+---------+----------+
|     0|      0|        0|         0|
+------+-------+---------+----------+



In [46]:
rmse = evaluator.evaluate(predictions)
print("Root Mean Squared Error (RMSE):", rmse)

Root Mean Squared Error (RMSE): 1.1615078022096765


In [50]:
model.userCol

Param(parent='ALS_0ac0cf32e493', name='userCol', doc='column name for user ids. Ids must be within the integer value range.')