In [None]:
# Bootstrap step: findspark.init() is called up front, ahead of the
# `from pyspark.sql import SparkSession` import used further down.
# NOTE(review): presumably this is what makes `pyspark` importable in this
# kernel — confirm against the findspark docs for the installed version.
import findspark

findspark.init()

In [3]:
# Echo the Spark/pyspark location findspark resolved (environment sanity check).
# NOTE(review): the displayed value is an absolute, machine-specific path;
# consider clearing this output before sharing the notebook.
findspark.find()

'/home/dsekerov/finki/rnmp/spark-env/lib64/python3.10/site-packages/pyspark'

In [1]:
from pyspark.sql import SparkSession

# JVM system property passed to both the driver and the executors. It names
# libopenblas.so as the native BLAS library for dev.ludovic.netlib.
# NOTE(review): presumably this is meant to speed up the linear algebra behind
# ALS — confirm that libopenblas is actually installed on every node.
_blas_java_opt = "-Ddev.ludovic.netlib.blas.nativeLib=libopenblas.so"

_builder = SparkSession.builder.appName("MovieLens-ALS")
_builder = _builder.config("spark.executor.extraJavaOptions", _blas_java_opt)
_builder = _builder.config("spark.driver.extraJavaOptions", _blas_java_opt)

# Reuse an already running session if there is one, otherwise start a new one.
spark = _builder.getOrCreate()
spark

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/10 01:28:57 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# Load the movie catalogue: the first CSV row holds the column names and the
# column types are inferred from the data.
movies_df = spark.read.csv(
    "./ml-latest-small/movies.csv",
    header=True,
    inferSchema=True,
)
movies_df.printSchema()

movies_total = movies_df.count()
print("# of movies:", movies_total)

root
 |-- movieId: integer (nullable = true)
 |-- title: string (nullable = true)
 |-- genres: string (nullable = true)

# of movies: 9742


In [3]:
# Peek at the first few movies to sanity-check the parsed columns.
movies_df.show(n=5)

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
+-------+--------------------+--------------------+
only showing top 5 rows



In [19]:
# Load the user ratings: comma-separated, header row present, column types
# inferred from the data.
ratings_reader = (
    spark.read
    .option("inferSchema", True)
    .option("delimiter", ",")
    .option("header", True)
)
ratings_df = ratings_reader.csv("./ml-latest-small/ratings.csv")
ratings_df.printSchema()

ratings_total = ratings_df.count()
print("# of ratings:", ratings_total)

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)

# of ratings: 100836


In [20]:
# Peek at the first few ratings to sanity-check the parsed columns.
ratings_df.show(n=5)

+------+-------+------+---------+
|userId|movieId|rating|timestamp|
+------+-------+------+---------+
|     1|      1|   4.0|964982703|
|     1|      3|   4.0|964981247|
|     1|      6|   4.0|964982224|
|     1|     47|   5.0|964983815|
|     1|     50|   5.0|964982931|
+------+-------+------+---------+
only showing top 5 rows



In [21]:
from pyspark.mllib.recommendation import ALS, MatrixFactorizationModel, Rating

In [22]:
def _row_to_rating(row):
    """Turn one ratings row into the `Rating` record consumed by MLlib's ALS."""
    return Rating(row.userId, row.movieId, row.rating)


# The RDD-based ALS API is fed `Rating` objects, so drop from the DataFrame
# down to its underlying RDD and convert row by row (the timestamp is not used).
ratings_rdd = ratings_df.rdd.map(_row_to_rating)
ratings_rdd

PythonRDD[384] at RDD at PythonRDD.scala:53

In [23]:
# ALS hyper-parameters.
rank = 10            # number of latent factors per user / per movie
numIterations = 10   # number of alternating least-squares sweeps
ALS_SEED = 42        # fixes the random initialisation of the factor matrices

# Train the matrix-factorisation model on every available rating.
# Passing an explicit seed makes the learned factors -- and therefore all error
# metrics computed from `model` -- reproducible across "Restart & Run All".
model = ALS.train(ratings_rdd, rank, numIterations, seed=ALS_SEED)

                                                                                

In [24]:
import math

# NOTE(review): the model is scored on the same ratings_df it was trained on,
# so the numbers printed below are in-sample (training) errors, not an estimate
# of how well the model generalises to unseen ratings.

# ((userId, movieId), rating) pairs holding the observed ratings.
actual = ratings_df.rdd.map(lambda row: ((row.userId, row.movieId), row.rating))

# (userId, movieId) pairs to request predictions for.
testdata = actual.keys()
predictions = model.predictAll(testdata).map(lambda r: ((r.user, r.product), r.rating))

# Inner join on (userId, movieId): each value is (observed, predicted).
ratesAndPreds = actual.join(predictions)

# Both metrics are derived from the same per-rating error, so compute it once
# and cache it; otherwise each .mean() action would re-evaluate the whole
# predictAll + join lineage.
errors = ratesAndPreds.map(lambda kv: kv[1][0] - kv[1][1]).cache()
MSE = errors.map(lambda e: e ** 2).mean()
MAE = errors.map(abs).mean()
errors.unpersist()

print(f"Mean Squared Error = {MSE:.4f}")
print(f"Mean Absolute Error = {MAE:.4f}")
print(f"Root Mean Squared Error = {math.sqrt(MSE):.4f}")

                                                                                

Mean Squared Error = 0.2585
Mean Absolute Error = 0.3578
Root Mean Squared Error = 0.5084


In [25]:
from surprise import SVD
from surprise import Dataset
from surprise.model_selection import KFold, cross_validate

# Seed shared by the fold split and the SVD initialisation so that the table
# below is reproducible from run to run.
SURPRISE_SEED = 42

# Load the movielens-100k dataset (download it if needed).
# NOTE(review): this is Surprise's built-in 'ml-100k' data, not the
# ./ml-latest-small/ratings.csv file used for the Spark ALS model, and the
# errors reported here are measured on held-out folds -- so they are not
# directly comparable to the ALS errors computed on its own training ratings.
data = Dataset.load_builtin('ml-100k')

# Use the famous SVD algorithm.
algo = SVD(random_state=SURPRISE_SEED)

# Run 5-fold cross-validation (shuffled, seeded folds) and print results.
folds = KFold(n_splits=5, random_state=SURPRISE_SEED, shuffle=True)
cross_validate(algo, data, measures=['RMSE', "MSE", 'MAE'], cv=folds, verbose=True)

Evaluating RMSE, MSE, MAE of algorithm SVD on 5 split(s).

                  Fold 1  Fold 2  Fold 3  Fold 4  Fold 5  Mean    Std     
RMSE (testset)    0.9348  0.9303  0.9383  0.9397  0.9331  0.9352  0.0034  
MSE (testset)     0.8739  0.8655  0.8804  0.8830  0.8707  0.8747  0.0064  
MAE (testset)     0.7376  0.7335  0.7404  0.7403  0.7333  0.7370  0.0031  
Fit time          2.03    1.94    1.93    2.14    1.92    1.99    0.08    
Test time         0.19    0.06    0.06    0.06    0.06    0.09    0.05    


{'test_rmse': array([0.93480424, 0.93031173, 0.93830878, 0.93970164, 0.93309786]),
 'test_mse': array([0.87385897, 0.86547991, 0.88042336, 0.88303916, 0.87067162]),
 'test_mae': array([0.73756476, 0.73351913, 0.74037272, 0.74031465, 0.73330927]),
 'fit_time': (2.0257320404052734,
  1.943732500076294,
  1.9330692291259766,
  2.1449594497680664,
  1.922614336013794),
 'test_time': (0.1880037784576416,
  0.06046557426452637,
  0.061232566833496094,
  0.06176471710205078,
  0.06047391891479492)}