In [2]:
import os

from pyspark.sql import SparkSession

In [4]:
MAX_MEMORY = "5g"
spark = (
    SparkSession.builder.appName("movie-recommendation")
    .config("spark.executor.memory", MAX_MEMORY) # executor의 메모리 설정
    .config("spark.driver.memory", MAX_MEMORY) # driver의 메모리
    .getOrCreate()
)

In [11]:
DIR_PATH = os.path.join(os.getcwd(), 'data')
DATA_PATH = os.path.join(DIR_PATH, 'ratings.csv')

rating_df = spark.read.csv(f"file:///{DATA_PATH}", inferSchema=True, header=True)

                                                                                

In [12]:
rating_df = rating_df.select(['userId', 'movieId', 'rating'])
rating_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [13]:
rating_df.show(3)

+------+-------+------+
|userId|movieId|rating|
+------+-------+------+
|     1|    296|   5.0|
|     1|    306|   3.5|
|     1|    307|   5.0|
+------+-------+------+
only showing top 3 rows



In [14]:
train_df, test_df = rating_df.randomSplit([0.8, 0.2], seed=13)

In [15]:
from pyspark.ml.recommendation import ALS

In [16]:
als = ALS(
    maxIter=10,
    regParam=0.01,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    nonnegative=True,
    coldStartStrategy='drop'
)

model = als.fit(train_df)

22/03/02 22:48:12 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/03/02 22:48:12 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
                                                                                

In [17]:
prediction = model.transform(test_df)
prediction.show(5)

                                                                                

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|    31|   6620|   1.5|  2.598858|
|    76|   1959|   5.0| 2.8625617|
|   243|   1580|   3.0| 2.4226482|
|   321|   3175|   3.0| 3.4213023|
|   321| 175197|   0.5| 1.7772007|
+------+-------+------+----------+
only showing top 5 rows



In [18]:
prediction.select(['rating', 'prediction']).describe().show()



+-------+------------------+------------------+
|summary|            rating|        prediction|
+-------+------------------+------------------+
|  count|           5001747|           5001747|
|   mean| 3.534025511486287| 3.469178528512191|
| stddev|1.0605557917302697|0.7167497065317837|
|    min|               0.5|               0.0|
|    max|               5.0|         10.424969|
+-------+------------------+------------------+



                                                                                

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator

In [20]:
evaluator = RegressionEvaluator(
    labelCol='rating',
    predictionCol='prediction',
    metricName='rmse'
)

rmse = evaluator.evaluate(prediction)
print(rmse)



0.8026522429091694


                                                                                

In [40]:
import numpy as np
from sklearn.metrics import mean_squared_error

In [23]:
datas = prediction.select(['rating', 'prediction']).toPandas()

                                                                                

In [35]:
datas['scaled_prediction'] = (
    datas['prediction']
    .map(lambda x: 5.0 if x >= 5.0 else x)
    .map(lambda x: 0.0 if x < 0.0 else x)
)

In [39]:
pred_y = datas.scaled_prediction.values
true_y = datas.rating

rmse = np.sqrt(mean_squared_error(true_y, pred_y))
rmse

0.8016420737871666

In [42]:
unique_movies = rating_df.select('movieId').distinct()
unique_movies.show(2)



+-------+
|movieId|
+-------+
|   1088|
|   1580|
+-------+
only showing top 2 rows



                                                                                

In [43]:
a = unique_movies.alias('a')

In [45]:
user_id = 243

user_movies = rating_df.filter(rating_df['userId'] == user_id).select('movieId')
user_movies.show(2)

+-------+
|movieId|
+-------+
|      1|
|     10|
+-------+
only showing top 2 rows



In [46]:
b = user_movies.alias('b')

In [49]:
from pyspark.sql.functions import mean, col, split, regexp_extract, when, lit

In [50]:
user_unwatched = (
    a
    .join(b, a['movieId'] == b['movieId'], how='left')
    .where(col('b.movieId').isNull())
    .select('a.movieId').distinct()
)

user_unwatched.show(2)

                                                                                

+-------+
|movieId|
+-------+
|   1088|
|   3175|
+-------+
only showing top 2 rows



In [51]:
user_unwatched = user_unwatched.withColumn('userId', lit(int(user_id)))
user_unwatched.show(2)

                                                                                

+-------+------+
|movieId|userId|
+-------+------+
|   1088|   243|
|   3175|   243|
+-------+------+
only showing top 2 rows



In [69]:
n_recommend_movies = model.transform(user_unwatched).orderBy('prediction', ascending=False).limit(10)
n_recommend_movies.show()

                                                                                

+-------+------+----------+
|movieId|userId|prediction|
+-------+------+----------+
| 128667|   243|  8.223953|
| 189913|   243|  8.035086|
| 154860|   243| 7.7409115|
| 124519|   243| 7.7217045|
| 171295|   243|  7.544285|
| 198657|   243|  7.426848|
| 135470|   243|  7.386353|
| 152043|   243| 7.3225923|
| 206064|   243| 7.2165213|
| 153184|   243|  7.093982|
+-------+------+----------+



In [70]:
TITLE_PATH = os.path.join(DIR_PATH, 'movies.csv')

movie_df = spark.read.csv(f"file:///{TITLE_PATH}", inferSchema=True, header=True)
movie_df.show(2)

+-------+----------------+--------------------+
|movieId|           title|              genres|
+-------+----------------+--------------------+
|      1|Toy Story (1995)|Adventure|Animati...|
|      2|  Jumanji (1995)|Adventure|Childre...|
+-------+----------------+--------------------+
only showing top 2 rows



In [71]:
n_recommend_movies = (
    n_recommend_movies
    .join(movie_df.select(['movieId', 'title']), n_recommend_movies['movieId'] == movie_df['movieId'])
    .orderBy('prediction', ascending=False)
    .select([
        'userId', 'a.movieId', 'title', 'prediction'
    ])
)
n_recommend_movies.show()

                                                                                

+------+-------+--------------------+----------+
|userId|movieId|               title|prediction|
+------+-------+--------------------+----------+
|   243| 128667|      Wiseguy (1996)|  8.223953|
|   243| 189913|Todd McFarlane's ...|  8.035086|
|   243| 154860|       Mother (2016)| 7.7409115|
|   243| 124519|Snow White and th...| 7.7217045|
|   243| 171295|Doug Stanhope: De...|  7.544285|
|   243| 198657|  Manikarnika (2019)|  7.426848|
|   243| 135470| Without Fail (2014)|  7.386353|
|   243| 152043|       Leader (2010)| 7.3225923|
|   243| 206064|Secret Millionair...| 7.2165213|
|   243| 153184|          Vergeef me|  7.093982|
+------+-------+--------------------+----------+



In [73]:
from pyspark.sql.types import IntegerType

In [None]:
users_df = spark.createDataFrame([user_id], IntegerType()).toDF('userId')
users_df.show()

In [None]:
user_recs_df = model.recommendForUserSubset(users_df, 10)
user_recs_df.show()

In [78]:
recs_list = user_recs_df.collect()[0].recommendations
recs_list

[Row(movieId=128667, rating=8.223953247070312),
 Row(movieId=189913, rating=8.035085678100586),
 Row(movieId=154860, rating=7.740911483764648),
 Row(movieId=124519, rating=7.721704483032227),
 Row(movieId=171295, rating=7.544284820556641),
 Row(movieId=198657, rating=7.4268479347229),
 Row(movieId=135470, rating=7.386353015899658),
 Row(movieId=152043, rating=7.322592258453369),
 Row(movieId=206064, rating=7.216521263122559),
 Row(movieId=153184, rating=7.093982219696045)]

In [79]:
recs_df = spark.createDataFrame(recs_list)
recs_df.show()

+-------+-----------------+
|movieId|           rating|
+-------+-----------------+
| 128667|8.223953247070312|
| 189913|8.035085678100586|
| 154860|7.740911483764648|
| 124519|7.721704483032227|
| 171295|7.544284820556641|
| 198657|  7.4268479347229|
| 135470|7.386353015899658|
| 152043|7.322592258453369|
| 206064|7.216521263122559|
| 153184|7.093982219696045|
+-------+-----------------+



In [81]:
recs_df.createOrReplaceTempView("recommendations")
movie_df.createOrReplaceTempView("movies")

In [88]:
query = """
SELECT M.movieId, M.title, M.genres, R.rating
FROM movies M
JOIN recommendations R
ON M.movieId = R.movieId
ORDER BY rating desc
"""

recommended_movies = spark.sql(query)
recommended_movies.show()

+-------+--------------------+--------------------+-----------------+
|movieId|               title|              genres|           rating|
+-------+--------------------+--------------------+-----------------+
| 128667|      Wiseguy (1996)|  (no genres listed)|8.223953247070312|
| 189913|Todd McFarlane's ...|Action|Animation|...|8.035085678100586|
| 154860|       Mother (2016)|  (no genres listed)|7.740911483764648|
| 124519|Snow White and th...|Adventure|Childre...|7.721704483032227|
| 171295|Doug Stanhope: De...|              Comedy|7.544284820556641|
| 198657|  Manikarnika (2019)|        Action|Drama|  7.4268479347229|
| 135470| Without Fail (2014)|              Comedy|7.386353015899658|
| 152043|       Leader (2010)|       Drama|Romance|7.322592258453369|
| 206064|Secret Millionair...|             Romance|7.216521263122559|
| 153184|          Vergeef me|  (no genres listed)|7.093982219696045|
+-------+--------------------+--------------------+-----------------+



In [90]:
spark.stop()