In [1]:
# [+] SparkSession 설정
from pyspark.sql import SparkSession

ss = SparkSession.builder.master('local').appName('movie-recommendation').getOrCreate()

In [2]:
# [+] movielens 데이터 불러오기
# ratings_short.csv: 원본 데이터에서 7만개의 평점 데이터만 선택한 버전

ratings_df = ss.read.csv('./data/ratings_short.csv', header=True, inferSchema=True) # csv로부터 dataframe만들어주는 메소드

# header옵션 : 그냥 하면 header를 value로 인식함. header로 인식해야함.
# inferSchema옵션 : schema추론,들어온 값들 보고 데이터 타입 추측해서 결정해주는 것

In [3]:
ratings_df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [4]:
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)
 |-- timestamp: integer (nullable = true)



In [5]:
# [+] 타임스탬프 제외한 컬럼 선택
# 간단한 컬럼 선택하는 정도는 select 메소드 사용
ratings_df = ratings_df.select(['userId', 'movieId', 'rating'])
# 이렇게 하면 이 column 뺴고 날아감

In [7]:
# [+] describe(): 기본 통계치 출력 : show메소드 붙여줘야 출력됨
# ratings_df.describe().show()

ratings_df.select('rating').describe().show()
# userID는 의미없고 rating값이 의미있음.

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|             71921|
|   mean|3.5821387355570695|
| stddev| 1.042406032579843|
|    min|               0.5|
|    max|               5.0|
+-------+------------------+



In [9]:
# [+] randomSplit(): 훈련 데이터셋과 테스트 데이터셋을 나누기
# 테스트 데이터셋에는 정답 안주고 훈련 데이터셋에만 정답 주기.
train_df, test_df = ratings_df.randomSplit([0.8, 0.2])

# 두 개의 실수값 : 훈련 데이터셋과 테스트 데이터셋의 비율

In [10]:
# [+] 추천 알고리즘(Alternating Least Squares) 임포트
# pyspark.ml.recommendation.ALS
from pyspark.ml.recommendation import ALS

In [11]:
# 추천 알고리즘 설정 : 얘도 모델이기 때문에 매개변수 작성해줘야함.

als = ALS(
    maxIter=5,
    regParam=0.1,
    userCol='userId',
    itemCol='movieId',
    ratingCol='rating',
    coldStartStrategy='drop'
)

# maxIter : 몇 번 반복 -> 값 너무 크면 학습시간 오래 걸림. 너무 작으면 학습이 잘 안됨
# regParam : 정규화 매개변수 -> 모델이 학습할 때 특정 문제에 꽂혀서 쏠리는 '과적합'을 막기 위해 도움
# userCol : user의 ID가 어떤 column인지
# ItemCol : Item의 ID가 어떤 column인지
# ratingCol : rating의 ID가 어떤 column인지
# coldStartStrategy : 협업 필터링도 일반적으로 데이터 끌어와서 시작. 초반에 데이터 부족할 때 어떻게 처리할거냐(일단은 drop시킨다)

In [12]:
# [+] 모델 학습
# 학습 알고리즘은 estimater임 : df입력(훈련데이터) 받고 fit()호출하면 최종 결과물이 학습된 결과물 내보냄
# 머신러닝에서는 학습된 모델도 transformter임
model = als.fit(train_df)

In [13]:
# # 메모리 부족으로 인한 오류 발생시, 아래의 코드를 실행
# from pyspark.sql import SparkSession

# MAX_MEMORY = '5g'
# ss = SparkSession.builder.appName('movie-recommendation')\
#     .config('spark.executor.memory', MAX_MEMORY)\
#     .config('spark.driver.memory', MAX_MEMORY)\
#     .getOrCreate()

In [14]:
# [+] 모델 예측
predictions = model.transform(test_df)

In [13]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   148|     19|   3.0| 2.4453733|
|   148|     32|   4.0| 3.9750795|
|   148|    296|   5.0| 4.2587757|
|   148|    527|   5.0|  4.327331|
|   148|    608|   3.0|   4.17881|
|   148|    899|   4.0| 4.0264363|
|   148|    903|   4.0|  4.113871|
|   148|    926|   4.0|  4.129926|
|   148|   1199|   3.5| 3.9897857|
|   148|   1204|   4.0| 4.1555805|
|   148|   1219|   4.5| 3.9384148|
|   148|   1284|   4.0|  4.102206|
|   148|   2951|   4.5| 4.0937843|
|   148|   2959|   4.5|  4.321707|
|   148|   5952|   4.5| 3.9747446|
|   148|  44191|   4.0|  4.094489|
|   148|  48394|   4.0| 3.9580433|
|   148|  54286|   3.5| 3.6169014|
|   148|  63082|   3.5| 3.7964337|
|   148|  88810|   4.0| 3.8019857|
+------+-------+------+----------+
only showing top 20 rows



In [19]:
# [+] 평점과 예측평점에 대한 통계 출력
predictions.select('rating','prediction').describe().show()

+-------+------------------+------------------+
|summary|            rating|        prediction|
+-------+------------------+------------------+
|  count|             13404|             13404|
|   mean|3.6026186213070726|3.4263027497914362|
| stddev|1.0458274172510669| 0.755273742684219|
|    min|               0.5|        -0.2114266|
|    max|               5.0|         5.4964566|
+-------+------------------+------------------+



In [20]:
# 모델 성능 평가: RMSE(Root Mean Squared Error)
from pyspark.ml.evaluation import RegressionEvaluator

evaluator = RegressionEvaluator(
    metricName='rmse',
    labelCol='rating',
    predictionCol='prediction'
)
# metric : 손실함수(?)
# labelCol : lable이 뭐냐 -> 여기선 rating이 정답. 예측값이 prediction
# predictionCol : 예측값이 뭐냐

In [21]:
# [+] RMSE 측정
rmse = evaluator.evaluate(predictions)

In [22]:
rmse

0.9196393714889269

In [23]:
# [+] 학습된 모델을 이용하여 유저별 아이템을 3개씩 추천
model.recommendForAllUsers(3).show()



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{127098, 4.99715...|
|     2|[{27831, 5.841985...|
|     3|[{98056, 5.000975...|
|     4|[{5269, 5.2346363...|
|     5|[{3896, 5.3201604...|
|     6|[{66371, 5.592423...|
|     7|[{26758, 4.810811...|
|     8|[{27815, 5.020187...|
|     9|[{2747, 5.3472943...|
|    10|[{87234, 4.877119...|
|    11|[{3896, 4.7304196...|
|    12|[{3881, 5.037133}...|
|    13|[{3881, 5.122769}...|
|    14|[{3881, 5.4408813...|
|    15|[{3881, 5.8478184...|
|    16|[{3881, 5.417631}...|
|    17|[{27831, 4.833974...|
|    18|[{2116, 4.65739},...|
|    19|[{8235, 5.02025},...|
|    20|[{3896, 5.5270505...|
+------+--------------------+
only showing top 20 rows



In [24]:
# [+] 학습된 모델을 이용하여 아이템별 유저를 3명씩 추천
model.recommendForAllItems(3).show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|      1|[{327, 5.280331},...|
|      2|[{240, 4.902102},...|
|      3|[{198, 4.730104},...|
|      4|[{240, 4.837999},...|
|      5|[{484, 4.615079},...|
|      6|[{87, 5.0074406},...|
|      7|[{49, 4.4785647},...|
|      9|[{50, 4.235085}, ...|
|     10|[{87, 4.669992}, ...|
|     11|[{448, 5.0882316}...|
|     12|[{448, 3.1003792}...|
|     13|[{327, 5.450998},...|
|     14|[{235, 4.586305},...|
|     15|[{198, 4.5943217}...|
|     16|[{448, 5.404675},...|
|     17|[{198, 5.386191},...|
|     18|[{240, 5.1541247}...|
|     19|[{199, 4.5232077}...|
|     20|[{501, 5.0340548}...|
|     21|[{474, 4.421303},...|
+-------+--------------------+
only showing top 20 rows



In [69]:
# 특정 유저 선택
user_lst = [1]

In [70]:
from pyspark.sql.types import IntegerType

In [71]:
# 데이터프레임생성
users_df = ss.createDataFrame(user_lst, IntegerType()).toDF('userID')

In [72]:
users_df.show()

+------+
|userID|
+------+
|     1|
+------+



In [73]:
# recommendForUserSubset(): 특정 유저 그룹에 대한 아이템 추천
user_recs = model.recommendForUserSubset(users_df, 5)



In [74]:
user_recs.show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|     1|[{127098, 4.99715...|
+------+--------------------+



In [75]:
# 추천결과를 파이썬 객체로 받아오기
movies_lst = user_recs.collect()[0].recommendations

In [76]:
movies_lst 
# 여기까지 추천한 것. 영화제목 불러오기 위해 join연산 할 것

[Row(movieId=127098, rating=4.997156620025635),
 Row(movieId=8327, rating=4.93673038482666),
 Row(movieId=5767, rating=4.9037184715271),
 Row(movieId=106452, rating=4.903459548950195),
 Row(movieId=55363, rating=4.885514259338379)]

In [77]:
# movies_lst 에 대한 데이터프레임 생성
recs_df = ss.createDataFrame(movies_lst)
recs_df.show()

+-------+-----------------+
|movieId|           rating|
+-------+-----------------+
| 127098|4.997156620025635|
|   8327| 4.93673038482666|
|   5767|  4.9037184715271|
| 106452|4.903459548950195|
|  55363|4.885514259338379|
+-------+-----------------+



In [78]:
# [+] 영화 데이터에 대한 데이터프레임 생성
movies_df = ss.read.csv('./data/movies_short.csv', inferSchema=True, header=True)
movies_df.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [79]:
# [+] recs_df, movies_df 에 대한 Temporary View 생성
recs_df.createOrReplaceTempView('recommendations')
movies_df.createOrReplaceTempView('movies')

In [80]:
# [+] SQL JOIN 연산을 통해 추천된 영화 제목 받아오기
ss.sql("SELECT * FROM movies \
        JOIN recommendations ON movies.movieID = recommendations.movieID \
        ORDER BY rating DESC").toPandas()

# 한 사람에 대한 추천 결과와 title을 볼 수 있음. 
# spark sql에서 출력하면 잘리고 show메소드로 볼 수 있는데 pandas의 dataframe으로 바꿔서 보면 편하게 볼 수 있음

Unnamed: 0,movieId,title,genres,movieId.1,rating
0,8327,Dolls (2002),Drama|Romance,8327,4.93673


In [81]:
"""
    유저 별 영화 추천 서비스를 함수로 정의하기
    1. 쿼리문 작성
    2. 추천 함수 작성
"""

query = """
SELECT * 
FROM movies
JOIN recommendations ON movies.movieID = recommendations.movieID \
ORDER BY rating DESC
"""

# query를 문자열로 빼고 함수를 정의할 때 sql메소드 통해 => ss.sql(query) 정의

In [82]:
def get_recommendations(user_id, num_recs):
    users_df = ss.createDataFrame([user_id], IntegerType()).toDF('userID')
    users_recs_df = model.recommendForUserSubset(users_df, num_recs)
    
    recs_lst = users_recs_df.collect()[0].recommendations
    recs_df = ss.createDataFrame(recs_lst)
    recommended_movies = ss.sql(query)
    return recommended_movies

In [83]:
# 1번 유저에 대한 영화추천
recs = get_recommendations(1, 5)



In [84]:
# toPandas(): Pandas 데이터프레임으로 출력
recs.toPandas()

Unnamed: 0,movieId,title,genres,movieId.1,rating
0,8327,Dolls (2002),Drama|Romance,8327,4.93673


In [68]:
# 문제는 평점을 날렸고 전체 영화 정보를 잘라냄. 그래서 다른 정보 받아서 추천 받으려면 오류날 수 잇음.추천해주는데 movies데이터에 존재하지않을 수 있음.
# cluster를 구성하면 MovieLens 데이터셋으로 분석하고 추천 가능하긴 함. 