In [1]:
from pyspark.sql import SparkSession

In [2]:
MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("movie-recommendation")\
    .config("spark.executor.memory", MAX_MEMORY)\
    .config("spark.driver.memory", MAX_MEMORY)\
    .getOrCreate()

22/01/17 20:57:56 WARN Utils: Your hostname, wshid-MacBookPro.local resolves to a loopback address: 127.0.0.1; using 192.168.0.18 instead (on interface en0)
22/01/17 20:57:56 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/01/17 20:57:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# 사용자의 영화에 대한 평점 데이터
ratings_file = "/Users/sion/Workspace/data-engineering/01-spark/data/ml-25m/ratings.csv"
ratings_df = spark.read.csv(f"file:///{ratings_file}", inferSchema=True, header=True)

                                                                                

In [4]:
ratings_df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [7]:
# 불필요한 timestamp 제거
ratings_df = ratings_df.select(["userId", "movieId", "rating"])

In [8]:
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [9]:
# describe를 통해 데이터의 통계값 확인
ratings_df.select("rating").describe().show()



+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|          25000095|
|   mean| 3.533854451353085|
| stddev|1.0607439611423508|
|    min|               0.5|
|    max|               5.0|
+-------+------------------+



                                                                                

In [10]:
# training data set, test data set을 8:2로 나눔
train_df, test_df = ratings_df.randomSplit([0.8, 0.2])

In [11]:
# ALS 알고리즘 사용
from pyspark.ml.recommendation import ALS

In [12]:
als = ALS(
    maxIter=5, # 최대 반복 횟수
    regParam=0.1, # regulation parameter, default 1.0
    userCol="userId", # user id
    itemCol="movieId", # item
    ratingCol="rating", # rating
    coldStartStrategy="drop" # 학습하지 못한 데이터를 어떻게 할것인지 처리 여부 (N/A | drop)
)

In [13]:
# 모델 학습
model = als.fit(train_df)

22/01/17 21:03:09 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
22/01/17 21:03:09 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS
22/01/17 21:03:09 WARN InstanceBuilder$NativeLAPACK: Failed to load implementation from:dev.ludovic.netlib.lapack.JNILAPACK
                                                                                

In [14]:
# test_df로 예측
predictions = model.transform(test_df)

In [16]:
# rating은 실제 값
# predication은 예측 값
predictions.show()

[Stage 102:>                (0 + 8) / 8][Stage 104:>                (0 + 0) / 2]

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|    31|   1580|   3.0|  2.283293|
|    31|   3175|   1.5| 2.3984544|
|    76|   3175|   3.5|   3.55222|
|   321|   6620|   3.5|  3.644322|
|   322|    463|   3.0|    3.2425|
|   368|   1580|   3.5| 3.6547854|
|   472|   1088|   4.0| 3.3510244|
|   513|  44022|   5.0| 4.2461896|
|   516|    833|   3.0| 2.9450257|
|   587|   6466|   4.0|  3.452406|
|   588|   1645|   2.5| 2.5667627|
|   596|   1580|   3.0| 3.5415156|
|   597|   1088|   3.0| 3.3035998|
|   597|   1645|   5.0|   3.44155|
|   597|   3997|   1.0| 2.0033834|
|   606|  36525|   2.5| 4.1796446|
|   606|  44022|   4.5| 4.0583444|
|   606| 160563|   4.0|  4.043702|
|   626|   1580|   4.0| 3.4979348|
|   626|   2866|   3.0| 3.3942947|
+------+-------+------+----------+
only showing top 20 rows





In [17]:
# 통계값 확인
predictions.select('rating', 'prediction').describe().show()



+-------+------------------+------------------+
|summary|            rating|        prediction|
+-------+------------------+------------------+
|  count|           4993906|           4993906|
|   mean|3.5338325350937723|3.3957891412286845|
| stddev|1.0609135850925895|0.6378993098272947|
|    min|               0.5|        -1.3874503|
|    max|               5.0|         6.5356765|
+-------+------------------+------------------+



                                                                                

In [19]:
from pyspark.ml.evaluation import RegressionEvaluator

# RMSE를 통한 evaluation, labelCol이 실제 값 컬럼
evaluator = RegressionEvaluator(metricName="rmse", labelCol='rating', predictionCol='prediction')

In [20]:
rmse = evaluator.evaluate(predictions)

                                                                                

In [21]:
print(rmse)

0.8143175872661844


In [22]:
# userId별 추천(user별 3개의 item 추천)
model.recommendForAllUsers(3).show()



+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    26|[{194434, 5.78166...|
|    27|[{194334, 5.92263...|
|    28|[{194434, 7.77216...|
|    31|[{194334, 3.80042...|
|    34|[{194434, 5.98385...|
|    44|[{194434, 6.87463...|
|    53|[{194334, 6.66743...|
|    65|[{194434, 6.38657...|
|    76|[{194434, 6.50299...|
|    78|[{194434, 6.98369...|
|    81|[{98693, 4.455678...|
|    85|[{189157, 5.66777...|
|   101|[{194434, 5.15328...|
|   103|[{194434, 6.27564...|
|   108|[{194434, 5.59471...|
|   115|[{194434, 6.58203...|
|   126|[{194434, 6.14670...|
|   133|[{194434, 5.41729...|
|   137|[{205453, 5.67344...|
|   148|[{194434, 6.12241...|
+------+--------------------+
only showing top 20 rows



                                                                                

In [22]:
# item별 추천(item마다 3개의 user 추천)
model.recommendForAllItems(3).show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     28|[{105801, 5.48301...|
|     31|[{10417, 5.051482...|
|     34|[{128562, 5.38732...|
|     53|[{96740, 5.325608...|
|     65|[{87426, 4.871016...|
|     78|[{67467, 4.594074...|
|     81|[{7629, 4.6699777...|
|     85|[{105801, 4.77481...|
|    101|[{142811, 4.89765...|
|    108|[{4243, 4.9666142...|
|    115|[{105801, 5.67604...|
|    126|[{87426, 4.70493}...|
|    133|[{119077, 5.08030...|
|    137|[{113441, 4.99710...|
|    148|[{96740, 4.183742...|
|    155|[{10417, 5.021953...|
|    183|[{87426, 5.186278...|
|    193|[{87426, 5.032519...|
|    210|[{67467, 4.752000...|
|    211|[{105801, 5.12777...|
+-------+--------------------+
only showing top 20 rows



In [23]:
from pyspark.sql.types import IntegerType
# 유저 3명에 대한 recommendation을 만들기 위함
user_list = [65, 78, 81]
users_df = spark.createDataFrame(user_list, IntegerType()).toDF('userId')

users_df.show()

+------+
|userId|
+------+
|    65|
|    78|
|    81|
+------+



[Stage 248:>                                                        (0 + 1) / 1]                                                                                

In [24]:
# user 일부분에 대한 추천
# 유저별 5개씩 추천
user_recs = model.recommendForUserSubset(users_df, 5)

In [26]:
movies_list = user_recs.collect()[0].recommendations

In [27]:
recs_df = spark.createDataFrame(movies_list)
recs_df.show()

+-------+-----------------+
|movieId|           rating|
+-------+-----------------+
| 194434|6.386569976806641|
| 192261|5.964387893676758|
|  98221|5.939274311065674|
| 194212|5.904045104980469|
| 141636|5.890636444091797|
+-------+-----------------+



In [28]:
movies_file = "/Users/sion/Workspace/data-engineering/01-spark/data/ml-25m/movies.csv"
movies_df = spark.read.csv(f"file:///{movies_file}", inferSchema=True, header=True)

In [29]:
movies_df.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [30]:
recs_df.createOrReplaceTempView("recommendations")
movies_df.createOrReplaceTempView("movies")

In [31]:
query = """
SELECT *
FROM
    movies JOIN recommendations
    ON movies.movieId = recommendations.movieId
ORDER BY
    rating desc
"""
recommended_movies = spark.sql(query)
recommended_movies.show()

+-------+--------------------+--------------------+-------+-----------------+
|movieId|               title|              genres|movieId|           rating|
+-------+--------------------+--------------------+-------+-----------------+
| 194434|   Adrenaline (1990)|  (no genres listed)| 194434|6.386569976806641|
| 192261|Don't Laugh at My...|        Comedy|Drama| 192261|5.964387893676758|
|  98221|Year One, The (L'...|              Comedy|  98221|5.939274311065674|
| 194212|Social Life of Sm...|  (no genres listed)| 194212|5.904045104980469|
| 141636|    Papanasam (2015)|Children|Crime|Dr...| 141636|5.890636444091797|
+-------+--------------------+--------------------+-------+-----------------+



In [32]:
# user마다 추천하는 영화 목록 리턴
def get_recommendations(user_id, num_recs):
    users_df = spark.createDataFrame([user_id], IntegerType()).toDF('userId')
    user_recs_df = model.recommendForUserSubset(users_df, num_recs)
    
    recs_list = user_recs_df.collect()[0].recommendations
    recs_df = spark.createDataFrame(recs_list)
    recommended_movies = spark.sql(query)
    return recommended_movies

In [33]:
# userId=456, 10개의 영화 추천
recs = get_recommendations(456, 10)

                                                                                

In [34]:
recs.toPandas()

Unnamed: 0,movieId,title,genres,movieId.1,rating
0,194434,Adrenaline (1990),(no genres listed),194434,6.38657
1,192261,Don't Laugh at My Romance (2008),Comedy|Drama,192261,5.964388
2,98221,"Year One, The (L'an 01) (1973)",Comedy,98221,5.939274
3,194212,Social Life of Small Urban Spaces (1988),(no genres listed),194212,5.904045
4,141636,Papanasam (2015),Children|Crime|Drama|Thriller,141636,5.890636


In [35]:
spark.stop()