In [2]:
# [+] SparkSession 설정
from pyspark.sql import SparkSession
ss = SparkSession.builder.master('local').appName('movie_recommendation').getOrCreate()

In [3]:
# [+] movielens 데이터 불러오기( csv -> DataFrame)
# ratings_short.csv: 원본 데이터에서 7만개의 평점 데이터만 선택한 버전

ratings_df = ss.read.csv('./data/ratings_short.csv',header=True, inferSchema = True)

In [4]:
ratings_df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [6]:
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [7]:
# [+] 타임스탬프 제외한 컬럼 선택
ratings_df = ratings_df.select(['userId', 'movieId', 'rating'])

In [8]:
# [+] describe(): 기본 통계치 출력
ratings_df.describe().show()

+-------+------------------+-----------------+------------------+
|summary|            userId|          movieId|            rating|
+-------+------------------+-----------------+------------------+
|  count|             71921|            71921|             71921|
|   mean| 283.8726241292529|21643.63890935888|3.5821387355570695|
| stddev|167.23649146725324|38820.78314308897| 1.042406032579843|
|    min|                 1|                1|               0.5|
|    max|               550|           205106|               5.0|
+-------+------------------+-----------------+------------------+



In [9]:
# [+] randomSplit(): 훈련 데이터셋과 테스트 데이터셋을 나누기
tranin_df, test_df = ratings_df.randomSplit([0.8, 0.2])

In [10]:
# [+] 추천 알고리즘(Alternating Least Squares) 임포트
# pyspark.ml.recommendation.ALSf
from pyspark.ml.recommendation import ALS

In [11]:
# 추천 알고리즘 설정

als = ALS(
    maxIter=5,
    regParam=0.1,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy='drop'
)

In [12]:
# [+] 모델 학습
model = als.fit(tranin_df)

In [None]:
# # 메모리 부족으로 인한 오류 발생시, 아래의 코드를 실행
# from pyspark.sql import SparkSession

# MAX_MEMORY = '5g'
# ss = SparkSession.builder.appName('movie-recommendation')\
#     .config('spark.executor.memory', MAX_MEMORY)\
#     .config('spark.driver.memory', MAX_MEMORY)\
#     .getOrCreate()

In [13]:
# [+] 모델 예측
predictions = model.transform(test_df)

In [14]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   148|     32|   4.0| 3.9888577|
|   148|     50|   4.5| 4.4863024|
|   148|    296|   5.0| 4.1451836|
|   148|    899|   4.0| 4.0523167|
|   148|    953|   4.0|  4.375654|
|   148|   1089|   4.5|  4.134958|
|   148|   1136|   4.5|  4.249985|
|   148|   1193|   4.5| 4.2548003|
|   148|   1206|   4.0| 4.0005383|
|   148|   1208|   5.0| 4.2468276|
|   148|   1212|   4.0| 4.3853817|
|   148|   1213|   4.5| 4.1827016|
|   148|   1225|   4.0| 3.8674154|
|   148|   1260|   4.0|  4.458582|
|   148|   2186|   4.0|  4.010993|
|   148|   2324|   5.0|  4.013653|
|   148|   2858|   4.0| 4.1816654|
|   148|   3949|   4.0| 3.9243815|
|   148|   6016|   4.5|   4.28027|
|   148|   7132|   4.0| 3.7809455|
+------+-------+------+----------+
only showing top 20 rows



In [15]:
# [+] 평점과 예측평점에 대한 통계 출력
predictions.select(['rating', 'prediction']).describe().show()

+-------+------------------+------------------+
|summary|            rating|        prediction|
+-------+------------------+------------------+
|  count|             13531|             13531|
|   mean|3.5914936072721897|3.4139634650931567|
| stddev| 1.045747059101007|0.7680652229693378|
|    min|               0.5|       -0.11807555|
|    max|               5.0|          5.686183|
+-------+------------------+------------------+



In [16]:
# 모델 성능 평가: RMSE(Root Mean Squared Error)
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction'
)

In [17]:
# [+] RMSE 측정
rmse = evaluator.evaluate(predictions)

In [18]:
rmse

0.9209835838244532

In [23]:
# [+] 학습된 모델을 이용하여 유저별 아이템을 3개씩 추천
model.recommendForAllUsers(3).show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{3881, 5.4032674...|
|     2|[{6946, 6.7859387...|
|     3|[{3881, 5.355999}...|
|     4|[{135456, 5.33314...|
|     5|[{7156, 4.938036}...|
|     6|[{32657, 5.652089...|
|     7|[{68522, 5.141847...|
|     8|[{671, 5.115135},...|
|     9|[{63, 5.569787}, ...|
|    10|[{71530, 4.965937...|
|    11|[{4833, 5.5037208...|
|    12|[{26758, 4.942486...|
|    13|[{135456, 5.13247...|
|    14|[{3881, 5.704973}...|
|    15|[{3881, 6.100778}...|
|    16|[{3881, 5.265323}...|
|    17|[{68848, 5.011752...|
|    18|[{6946, 4.751473}...|
|    19|[{135456, 4.96559...|
|    20|[{136449, 5.69359...|
+------+--------------------+
only showing top 20 rows



In [24]:
# [+] 학습된 모델을 이용하여 아이템별 유저를 3명씩 추천
model.recommendForAllItems(3).show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|      1|[{327, 5.2821536}...|
|      2|[{199, 4.366313},...|
|      3|[{49, 4.958502}, ...|
|      4|[{173, 4.1040187}...|
|      5|[{484, 4.7130713}...|
|      6|[{240, 5.135149},...|
|      7|[{451, 4.656458},...|
|      8|[{451, 4.63204}, ...|
|      9|[{153, 4.4080954}...|
|     10|[{327, 4.8133354}...|
|     11|[{22, 5.218604}, ...|
|     12|[{360, 2.8838553}...|
|     13|[{327, 5.456308},...|
|     14|[{153, 4.505759},...|
|     15|[{451, 4.0446343}...|
|     16|[{317, 5.1590037}...|
|     17|[{317, 5.424387},...|
|     18|[{117, 4.823543},...|
|     19|[{254, 4.7298284}...|
|     20|[{484, 4.6696944}...|
+-------+--------------------+
only showing top 20 rows



In [25]:
# 특정 유저 선택
user_lst = [1]

In [28]:
from pyspark.sql.types import IntegerType

In [29]:
# 데이터프레임생성
users_df = ss.createDataFrame(user_lst, IntegerType()).toDF('userID')

In [30]:
users_df.show()

+------+
|userID|
+------+
|     1|
+------+



In [31]:
# recommendForUserSubset(): 특정 유저 그룹에 대한 아이템 추천
user_recs = model.recommendForUserSubset(users_df, 5)

In [32]:
user_recs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{3881, 5.4032674...|
+------+--------------------+



In [33]:
# 추천결과를 파이썬 객체로 받아오기
movies_lst = user_recs.collect()[0].recommendations

In [34]:
movies_lst

[Row(movieId=3881, rating=5.4032673835754395),
 Row(movieId=4855, rating=5.235652923583984),
 Row(movieId=714, rating=5.144252777099609),
 Row(movieId=158972, rating=5.112921237945557),
 Row(movieId=4102, rating=5.088891983032227)]

In [36]:
# movies_lst 에 대한 데이터프레임 생성
recs_df = ss.createDataFrame(movies_lst)
recs_df.show()

+-------+------------------+
|movieId|            rating|
+-------+------------------+
|   3881|5.4032673835754395|
|   4855| 5.235652923583984|
|    714| 5.144252777099609|
| 158972| 5.112921237945557|
|   4102| 5.088891983032227|
+-------+------------------+



In [37]:
# [+] 영화 데이터에 대한 데이터프레임 생성
movies_df=ss.read.csv('./data/movies_short.csv', header=True, InferSchema=True)

TypeError: csv() got an unexpected keyword argument 'InferSchema'

In [38]:
movies_df.show()

NameError: name 'movies_df' is not defined

In [39]:
# [+] recs_df, movies_df 에 대한 Temporary View 생성
recs_df.createOrReplaceTempView('recommendation')
movies_df.createOrReplaceTempView('movie')

NameError: name 'movies_df' is not defined

In [41]:
# [+] SQL JOIN 연산을 통해 추천된 영화 제목 받아오기
ss.sql("SELECT * FROM movies \
        JOIN recommendations ON movies.movieID = recommendations.movieID \
        ORDER BY rating DESC").toPandas()

AnalysisException: Table or view not found: movies; line 1 pos 14;
'Sort ['rating DESC NULLS LAST], true
+- 'Project [*]
   +- 'Join Inner, ('movies.movieID = 'recommendations.movieID)
      :- 'UnresolvedRelation [movies], [], false
      +- 'UnresolvedRelation [recommendations], [], false


In [None]:
"""
    유저 별 영화 추천 서비스를 함수로 정의하기
    1. 쿼리문 작성
    2. 추천 함수 작성
"""

query = """
SELECT * 
FROM movies
JOIN recommendations ON movies.movieID = recommendations.movieID \
ORDER BY rating DESC
"""

In [None]:
def get_recommendations(user_id, num_recs):
    users_df = ss.createDataFrame([user_id], IntegerType()).toDF('userID')
    users_recs_df = model.recommendForUserSubset(users_df, num_recs)
    
    recs_lst = users_recs_df.collect()[0].recommendations
    recs_df = ss.createDataFrame(recs_lst)
    recommended_movies = ss.sql(query)
    return recommended_movies

In [None]:
# 1번 유저에 대한 영화추천
recs = get_recommendations(1, 5)

In [None]:
# toPandas(): Pandas 데이터프레임으로 출력
recs.toPandas()