In [1]:
from pyspark.sql import SparkSession

In [2]:
MAX_MEMORY = "5g"
spark = SparkSession.builder.appName("movie-recommendation")\
    .config("spark.executor.memory", MAX_MEMORY)\
    .config("spark.driver.memory", MAX_MEMORY)\
    .getOrCreate()

In [3]:
ratings_file = "/Users/gimhyeonjeong/data-engineering/01-spark/data/ml-25m/ratings.csv"
ratings_df = spark.read.csv(f"file:///{ratings_file}", inferSchema=True, header= True)

In [4]:
ratings_df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



In [5]:
ratings_df = ratings_df.select(["userId", "movieId", "rating"])

In [6]:
ratings_df.printSchema()

root
 |-- userId: integer (nullable = true)
 |-- movieId: integer (nullable = true)
 |-- rating: double (nullable = true)



In [7]:
ratings_df.select("rating").describe().show()

+-------+------------------+
|summary|            rating|
+-------+------------------+
|  count|          25000095|
|   mean| 3.533854451353085|
| stddev|1.0607439611423508|
|    min|               0.5|
|    max|               5.0|
+-------+------------------+



In [8]:
train_df, test_df = ratings_df.randomSplit([0.8, 0.2])

In [9]:
from pyspark.ml.recommendation import ALS

In [10]:
als = ALS(
    maxIter=5,
    regParam = 0.1,
    userCol = "userId",
    itemCol = "movieId",
    ratingCol = "rating",
    coldStartStrategy = "drop"
)
#  coldStartStrategy 학습하지 못한 데이터를 만났을 때 어떻게 대처해야 하는가를 개발자가 설정해 주는 것
# NaN과 drop 둘 중 하나를 선택할 수 있다. 

In [11]:
model = als.fit(train_df)

In [13]:
predictions = model.transform(test_df)

In [14]:
predictions.show()

+------+-------+------+----------+
|userId|movieId|rating|prediction|
+------+-------+------+----------+
|   101|   8638|   5.0| 3.6144485|
|   137|   1645|   3.0|  3.153439|
|   243|  44022|   3.0| 2.4311182|
|   321| 175197|   0.5| 2.0705743|
|   322|   1645|   4.0| 3.6479063|
|   497|   1580|   5.0|  3.338778|
|   497|   2366|   4.0| 3.9773235|
|   501|   1580|   5.0| 3.9613252|
|   587|   6466|   4.0|  3.487387|
|   597|   1645|   5.0|  3.539884|
|   606|   1580|   5.0| 4.2250643|
|   606|  44022|   4.5|  4.024259|
|   606|  68135|   3.5| 3.9022818|
|   606| 160563|   4.0|  4.057866|
|   613|   1088|   4.0| 3.1773624|
|   626|   1580|   4.0| 3.5669951|
|   626|   2366|   3.0| 3.1994846|
|   626|  36525|   4.0| 3.4081855|
|   626|  44022|   3.0| 3.2233274|
|   642|   1580|   3.5| 3.4667833|
+------+-------+------+----------+
only showing top 20 rows



In [15]:
predictions.select("rating", "prediction").describe().show()

+-------+------------------+------------------+
|summary|            rating|        prediction|
+-------+------------------+------------------+
|  count|           4995859|           4995859|
|   mean|3.5343254283197343|3.4543073999785956|
| stddev|1.0608520764224363|0.6509529454659341|
|    min|               0.5|        -1.5936148|
|    max|               5.0|          6.352272|
+-------+------------------+------------------+



In [18]:
from pyspark.ml.evaluation import RegressionEvaluator 
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating", predictionCol="prediction")

In [19]:
rmse = evaluator.evaluate(predictions)

In [20]:
print(rmse)

0.8049534420669959


In [21]:
model.recommendForAllUsers(3).show()

+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    26|[{203086, 5.43596...|
|    27|[{203086, 5.96192...|
|    28|[{203882, 6.78700...|
|    31|[{203882, 3.81420...|
|    34|[{203882, 5.46517...|
|    44|[{203882, 6.38860...|
|    53|[{192089, 6.35343...|
|    65|[{122754, 5.69666...|
|    76|[{203882, 6.12306...|
|    78|[{203882, 6.73146...|
|    81|[{203882, 4.67133...|
|    85|[{177411, 5.76137...|
|   101|[{203882, 5.01555...|
|   103|[{203882, 6.06124...|
|   108|[{203882, 5.02522...|
|   115|[{203882, 5.99085...|
|   126|[{203882, 6.05805...|
|   133|[{203882, 5.66569...|
|   137|[{203086, 5.62483...|
|   148|[{203882, 5.69230...|
+------+--------------------+
only showing top 20 rows



In [23]:
model.recommendForAllItems(3).show()

+-------+--------------------+
|movieId|     recommendations|
+-------+--------------------+
|     12|[{87426, 5.448573...|
|     26|[{105801, 5.12766...|
|     27|[{87426, 5.62924}...|
|     28|[{67565, 5.860608...|
|     31|[{87426, 5.3407},...|
|     34|[{67565, 5.82396}...|
|     44|[{87426, 5.400751...|
|     53|[{18885, 5.702614...|
|     65|[{87426, 5.430656...|
|     76|[{87426, 5.42044}...|
|     78|[{67467, 4.865630...|
|     81|[{67467, 4.830916...|
|     85|[{67565, 5.188597...|
|    101|[{42665, 4.981856...|
|    103|[{87426, 5.261060...|
|    108|[{86709, 5.803177...|
|    115|[{67467, 6.046056...|
|    126|[{87426, 5.063264...|
|    133|[{67565, 5.763334...|
|    137|[{31506, 5.889635...|
+-------+--------------------+
only showing top 20 rows



In [25]:
from pyspark.sql.types import IntegerType

user_list = [65, 78, 81]
users_df = spark.createDataFrame(user_list, IntegerType()).toDF('userId')

users_df.show()

+------+
|userId|
+------+
|    65|
|    78|
|    81|
+------+



In [34]:
user_recs = model.recommendForUserSubset(users_df, 5)

In [35]:
movies_list = user_recs.collect()[0].recommendations

In [40]:
recs_df = spark.createDataFrame(movies_list)
recs_df.show()

+-------+------------------+
|movieId|            rating|
+-------+------------------+
| 122754|5.6966657638549805|
| 169606| 5.688094139099121|
| 178453| 5.593918323516846|
| 129516|   5.4950270652771|
| 205453| 5.465090274810791|
+-------+------------------+



In [41]:
movies_file = "/Users/gimhyeonjeong/data-engineering/01-spark/data/ml-25m/movies.csv"
movies_df = spark.read.csv(f"file:///{movies_file}", inferSchema=True, header= True)

In [42]:
movies_df.show()

+-------+--------------------+--------------------+
|movieId|               title|              genres|
+-------+--------------------+--------------------+
|      1|    Toy Story (1995)|Adventure|Animati...|
|      2|      Jumanji (1995)|Adventure|Childre...|
|      3|Grumpier Old Men ...|      Comedy|Romance|
|      4|Waiting to Exhale...|Comedy|Drama|Romance|
|      5|Father of the Bri...|              Comedy|
|      6|         Heat (1995)|Action|Crime|Thri...|
|      7|      Sabrina (1995)|      Comedy|Romance|
|      8| Tom and Huck (1995)|  Adventure|Children|
|      9| Sudden Death (1995)|              Action|
|     10|    GoldenEye (1995)|Action|Adventure|...|
|     11|American Presiden...|Comedy|Drama|Romance|
|     12|Dracula: Dead and...|       Comedy|Horror|
|     13|        Balto (1995)|Adventure|Animati...|
|     14|        Nixon (1995)|               Drama|
|     15|Cutthroat Island ...|Action|Adventure|...|
|     16|       Casino (1995)|         Crime|Drama|
|     17|Sen

In [44]:
# sql에서 사용하기 위해 tempview에 넣기 
recs_df.createOrReplaceTempView("recommendations")
movies_df.createOrReplaceTempView("movies")

In [46]:
query = """
SELECT * 
FROM 
    movies JOIN recommendations
    ON movies.movieId = recommendations.movieId
ORDER BY 
    rating desc
"""

recommended_movies = spark.sql(query)
recommended_movies.show()

+-------+--------------------+--------------------+-------+------------------+
|movieId|               title|              genres|movieId|            rating|
+-------+--------------------+--------------------+-------+------------------+
| 122754|Nemesis 3: Time L...|Action|Sci-Fi|Thr...| 122754|5.6966657638549805|
| 169606|Dara O'Briain Cro...|              Comedy| 169606| 5.688094139099121|
| 178453|Ukraine on Fire (...|  (no genres listed)| 178453| 5.593918323516846|
| 129516|       Poison (1951)|              Comedy| 129516|   5.4950270652771|
| 205453|The Good Fight: T...|         Documentary| 205453| 5.465090274810791|
+-------+--------------------+--------------------+-------+------------------+



In [58]:
def get_recommendations(user_id, num_recs):
    users_df = spark.createDataFrame([user_id], IntegerType()).toDF('userId')
    user_recs_df = model.recommendForUserSubset(users_df, num_recs)
    
    recs_list = user_recs_df.collect()[0].recommendations
    recs_df = spark.createDataFrame(recs_list)
    recommended_movies = spark.sql(query)
    return recommended_movies

In [59]:
recs = get_recommendations(456, 10)

In [60]:
recs.toPandas()

Unnamed: 0,movieId,title,genres,movieId.1,rating
0,122754,Nemesis 3: Time Lapse (1996),Action|Sci-Fi|Thriller,122754,5.696666
1,169606,Dara O'Briain Crowd Tickler (2015),Comedy,169606,5.688094
2,178453,Ukraine on Fire (2016),(no genres listed),178453,5.593918
3,129516,Poison (1951),Comedy,129516,5.495027
4,205453,The Good Fight: The Abraham Lincoln Brigade in...,Documentary,205453,5.46509


In [61]:
spark.stop()