In [1]:
import sys
sys.path.append("../src")

In [2]:
from Utils import *

In [3]:
sc = SparkContext.getOrCreate()
ss = SparkSession(sc)

In [4]:
def sample_data(df, movies_count=5000, users_count=1000):
    return df.select("movieId").dropDuplicates().limit(movies_count)\
            .join(df, on="movieId").join(df.select("userId").dropDuplicates().limit(users_count), on="userId")
            

In [5]:
def get_user_recs_error(ui, similarities, recs_size=10):
    user_recs = similarities.filter(similarities.ui == ui) \
        .join(ratings, similarities.uj == ratings.userId) \
        .select([col("ui").alias("userId"), "movieId", "title",
                 "genres", col("reliable_rating").alias("rating"), "sim"])

    user_recs = user_recs.withColumn("pred", user_recs.rating * user_recs.sim) \
        .groupBy(["userId", "movieId", "title", "genres"]).avg("pred") \
        .sort(desc("avg(pred)")) \
        .select(["userId", "movieId", "title", "genres",
                 col("avg(pred)").alias("pred")])

    errors = user_recs.join(ratings, on=["userId", "movieId"]) \
        .select([col("reliable_rating").alias("rating"), "pred"])
    errors = errors.withColumn("error", (errors.rating - errors.pred) ** 2)

    error = errors.rdd.map(lambda x: x["error"]).reduce(lambda x1, x2: x1 + x2)
    return error, user_recs.limit(recs_size), errors.count()

def get_recommendations(users, similarities, recs_size=10):
    error = 0
    recs = None
    count = 0
    for u in users:
        print("Predicting for user: {0}".format(u))
        e, r, c = get_user_recs_error(u, similarities, recs_size)
        recs = recs.union(r) if recs else r
        error += e
        count += c
        print("MSE: ", round(e/c, 3))
        print()
    error /= count
    return recs, error**0.5

<img src="../misc/r.png">

In [6]:
ratings = ss.read\
            .format("csv")\
            .option('header', 'true')\
            .load(os.path.join("../data","userId_movieId_title_ratings.csv"), inferSchema='true')
    
ratings = sample_data(ratings)
ratings.persist()
print("Users: {0:,}".format(get_count(ratings,"userId")))
print("Movies: {0:,}".format(get_count(ratings,"movieId")))
print("Ratings: {0:,}".format(ratings.count()))
ratings.show(5)

Users: 1,000
Movies: 3,306
Ratings: 80,453
+------+-------+--------+--------------------+--------------------+------+------------------+
|userId|movieId|ratingId|               title|              genres|rating|   reliable_rating|
+------+-------+--------+--------------------+--------------------+------+------------------+
|  5936|    111|  887646|  Taxi Driver (1976)|Crime|Drama|Thriller|   4.5|3.7691303537482317|
|  5936|    223|  887648|       Clerks (1994)|              Comedy|   4.0|3.3503380922206505|
|  5936|    296|  887650| Pulp Fiction (1994)|Comedy|Crime|Dram...|   5.0| 4.187922615275813|
|  5936|    471|  887655|Hudsucker Proxy, ...|              Comedy|   4.5|3.7691303537482317|
|  5936|    858|  887660|Godfather, The (1...|         Crime|Drama|   4.0|3.3503380922206505|
+------+-------+--------+--------------------+--------------------+------+------------------+
only showing top 5 rows



### Pearson Correlation Coefficient

In [7]:
avg_ratings_per_item = lambda item: ratings.select([item, "rating", "reliable_rating"])\
                                               .groupBy(item).avg()\
                                               .select(
    [item, col("avg(rating)").alias("#rating"),col("avg(reliable_rating)").alias("#reliable_rating")])

avg_ratings_per_item = avg_ratings_per_item("movieId")
print("{0:,} movies".format(avg_ratings_per_item.count()))
avg_ratings_per_item.show(5)

3,306 movies
+-------+------------------+------------------+
|movieId|           #rating|  #reliable_rating|
+-------+------------------+------------------+
|    471|3.7389380530973453|2.9379942895836697|
|  31528|               3.3|2.9599100902263764|
|   4900|               3.5| 3.074913963128231|
|   1580| 3.502918287937743|2.8652828102817773|
|  44022| 3.267543859649123|2.5368997352610396|
+-------+------------------+------------------+
only showing top 5 rows



In [8]:
avg_movie_ratings = sc.broadcast({r["movieId"]: (r["#rating"], r["#reliable_rating"]) for r in avg_ratings_per_item.collect()})

In [9]:
def pearson_sim_per_movie(rates, avg_rating):
    """
    rates is a list [(ui, rating) for every user rating that movie]
    """
    d = dict()
    for (u1, r1), (u2, r2) in product(rates, rates):
        if u1 == u2: continue
        d[(u1, u2)] = ((r1 - avg_rating) * (r2-avg_rating), (r1 - avg_rating)**2, (r2 - avg_rating)**2)
    return list(d.items())

In [10]:
pearson_similarities = ratings.rdd.map(lambda r: (r["movieId"], (r["userId"], r["reliable_rating"])))\
                                    .groupByKey()\
                                    .map(lambda x: (x[0], pearson_sim_per_movie(x[1], avg_movie_ratings.value[x[0]][1])))\
                                    .flatMapValues(lambda x:x)\
                                    .map(lambda x:x[1])\
                                    .reduceByKey(lambda x1, x2: (x1[0]+x2[0], x1[1]+x2[1], x1[2]+x2[2]))\
                                    .map(lambda x: Row(ui=x[0][0], uj=x[0][1], sim=x[1][0]/(x[1][1]**0.5 * x[1][2]**0.5)))\
                                    .toDF()\
                                    .sort(desc("sim"))
pearson_similarities.write.mode('overwrite').csv("../data/pearson_similarities_similarities.csv", header=True)
pearson_similarities.show(5)

+---+-----+-----+
|sim|   ui|   uj|
+---+-----+-----+
|1.0| 5936|55858|
|1.0| 5936|29370|
|1.0| 5936|88162|
|1.0|22356|54838|
|1.0|11571|93731|
+---+-----+-----+
only showing top 5 rows



In [13]:
test_users = ratings.select(["userId"]).dropDuplicates().rdd.map(lambda r: r[0]).collect()[:10]

pearson_recs, error = get_recommendations(test_users, pearson_similarities, recs_size=10)
print("RMSE: ", error)
pearson_recs.show()
del pearson_similarities

Predicting for user: 4935
MSE:  7.816

Predicting for user: 7880
MSE:  8.443

Predicting for user: 7993
MSE:  7.916

Predicting for user: 10817
MSE:  7.065

Predicting for user: 13285
MSE:  8.268

Predicting for user: 23364
MSE:  9.14

Predicting for user: 23571
MSE:  8.604

Predicting for user: 25591
MSE:  9.116

Predicting for user: 34234
MSE:  10.56

Predicting for user: 34239
MSE:  12.334

RMSE:  2.929756791890806
+------+-------+--------------------+--------------------+------------------+
|userId|movieId|               title|              genres|              pred|
+------+-------+--------------------+--------------------+------------------+
|  4935|   8025|Thief, The (Vor) ...|               Drama|4.4509764577351705|
|  4935|  46083|Drawing Restraint...|             Fantasy| 3.912137394900763|
|  4935| 128600|John Doe: Vigilan...|      Crime|Thriller|3.1524388468780846|
|  4935|  73135|Good Life, The (2...|        Comedy|Drama| 3.129709915920611|
|  4935|  96490|Possession, The 

### Bipartite Graph Reinforcement (BGR) similarity
- What is the probability that $u_1$ would give similar rating to *at least* one movie as $u_2$

<img src="../misc/bgrs.png">

$$ \textbf{BGR}(u_i, u_j)=p(u_j|u_i)^{\frac{1}{M_{u_i,u_j}}}$$
<br>
$$U_m \text{ is the set of users who rated the movie }m$$
$$M_u \text{ is the set of movies rated by user } u$$
$$R(u, m) = \text{ Rating by user } u \text{of movie } m$$

$$p(m|u)=\frac{R(u, m)}{\sum_{m_i \in M_u}{R(u, m_i)}}$$

<br>

$$p(u|m)=\frac{R(u, m)}{\sum_{u_i \in U_m}{R(u_i, m)}}$$

<br>

$$p(u_j|u_i) = 1-\prod_{\forall u_i, u_j \in users, \forall m \in movies} (1-
p(m|u_i)p(u_j|m))$$
$M_{u_i, u_j}$ is the number of items rated by both $u_i$ and $u_j$


In [14]:
sum_ratings_per_item = lambda item: ratings.select([item, "rating", "reliable_rating"])\
                                               .groupBy(item).sum()\
                                               .select(
    [item, col("sum(rating)").alias("#rating"),col("sum(reliable_rating)").alias("#reliable_rating")])

sum_ratings_per_movie = sum_ratings_per_item("movieId")
print("{0:,} movies".format(sum_ratings_per_movie.count()))
sum_ratings_per_movie.show()

3,306 movies
+-------+-------+------------------+
|movieId|#rating|  #reliable_rating|
+-------+-------+------------------+
|    471|  422.5| 331.9933547229547|
|  31528|   16.5|14.799550451131882|
|   4900|   14.0|12.299655852512924|
|   1580| 1800.5|1472.7553644848335|
|  44022|  372.5| 289.2065698197585|
|   8638|  431.5|334.65899107883394|
|  68135|  151.0|125.01652238579794|
|   1645|  712.0|  571.114497435239|
|   6620|  328.0| 259.6850387219688|
|   1591|  230.5|191.61377612675983|
|   3794|   33.0|25.040078355365807|
|   4519|  145.5| 112.5548165024736|
|   5300|   42.0| 36.17397618794674|
|  54190|  199.0|161.35774790708342|
|  36525|  135.0|103.88202769736819|
|   3175|  649.5| 520.3550331494652|
|   1088|  486.5| 386.2016813547274|
|  96488|  110.0| 92.45895907476827|
|   5803|   57.5|37.402608507388564|
|   3997|   89.0| 77.08222406229666|
+-------+-------+------------------+
only showing top 20 rows



In [15]:
sum_ratings_per_user = sum_ratings_per_item("userId")
print("{0:,} users".format(sum_ratings_per_user.count()))
sum_ratings_per_user.show()

1,000 users
+------+-------+------------------+
|userId|#rating|  #reliable_rating|
+------+-------+------------------+
|  4935|   69.5|  37.7193094337887|
|  7880|  425.5| 354.4145730346695|
|  7993|  149.5|126.47313947905427|
| 10817|  150.0|105.73242521490101|
| 13285| 1153.5| 900.4192426933348|
| 23364|  114.0| 88.29159878493387|
| 23571|  459.0| 445.9161209602262|
| 25591|  396.0| 338.5322793187616|
| 34234|  138.5| 93.45434951056633|
| 34239|   88.0| 87.66553437581042|
| 36224|  169.0|124.08681376664087|
| 36538|   86.5| 85.04890707290085|
| 40335|    7.5| 7.476394285176255|
| 45011|  229.0|223.92158366716382|
| 65867|   27.5| 17.90408484700385|
| 73683|  137.0|102.73435703062788|
| 83693|   34.0| 32.41705014329398|
| 97092|   66.5| 64.38088050381117|
|101055|  150.0|135.16712449352124|
|102594|   75.0| 50.03185657677887|
+------+-------+------------------+
only showing top 20 rows



In [16]:
def users_bgr_similarities_per_movie(probs):
    """
    probs is an iterable [( p_m_u, p_u_m, userId)] where i is a movie m_i
    """
#     """
#     probs is an iterable [( p_i_u_r, p_i_u_relr, p_u_i_r, p_u_i_relr, userId)] where i is a movie m_i
#     """
    d = dict()  # {(ui, uj): (1-(p(i_ui) * p(uj_i)), repeat for 'reliable' probs)}
    for i, j in product(probs, probs):
        ui, uj = i[-1], j[-1]
        if ui == uj: continue
#         p_e_ui, p_e_ui_rel = i[0], i[1]
#         p_uj_e, p_uj_e_rel = j[2], j[3]
        p_e_ui = i[0]
        p_uj_e = j[1]
#         d[(ui, uj)] = (1 - p_e_ui*p_uj_e, 1 - p_e_ui_rel*p_uj_e_rel)
        d[(uj, ui)] = (1-p_e_ui*p_uj_e, 1)
    return list(d.items())

In [17]:
u_ratings = sc.broadcast({r["userId"]: (r["#rating"], r["#reliable_rating"]) for r in sum_ratings_per_user.collect()})
i_ratings = sc.broadcast({r["movieId"]: (r["#rating"], r["#reliable_rating"]) for r in sum_ratings_per_movie.collect()})

In [18]:
# probs = ratings.select(["userId", "movieId", "rating", "reliable_rating"]).rdd\
probs = ratings.select(["userId", "movieId", "reliable_rating"]).rdd\
                .map(lambda x: Row(userId=x["userId"], movieId=x["movieId"],
                                   
#                         p_u_i_r=x["rating"]/i_ratings.value[x["movieId"]][0],
                        p_u_m=x["reliable_rating"]/i_ratings.value[x["movieId"]][1],
                                   
#                         p_i_u_r=x["rating"]/u_ratings.value[x["userId"]][0],
                        p_m_u=x["reliable_rating"]/u_ratings.value[x["userId"]][1]))\
                .toDF()
probs.show(5)

+-------+--------------------+--------------------+------+
|movieId|               p_m_u|               p_u_m|userId|
+-------+--------------------+--------------------+------+
|    111|0.041095890410958895|0.003974649184938958|  5936|
|    223|  0.0365296803652968|0.004336258502422931|  5936|
|    296|0.045662100456620995|0.001814151978670191|  5936|
|    471|0.041095890410958895|0.011353029511369394|  5936|
|    858|  0.0365296803652968|0.001902992345807...|  5936|
+-------+--------------------+--------------------+------+
only showing top 5 rows



In [19]:
del sum_ratings_per_user, sum_ratings_per_movie, u_ratings, i_ratings

In [20]:
# users_similar_to_ui = probs.rdd.map(lambda r: (r["movieId"],
#                          (r["p_i_u_r"], r["p_i_u_relr"], r["p_u_i_r"], r["p_u_i_relr"], r["userId"])))\
bgr_similarities = probs.rdd.map(lambda r: (r["movieId"],
                         (r["p_m_u"], r["p_u_m"], r["userId"])))\
        .groupByKey()\
        .mapValues(lambda x: users_bgr_similarities_per_movie(x))\
        .flatMapValues(lambda x: x)\
        .map(lambda x: x[1])\
        .reduceByKey(lambda x1, x2: (x1[0] * x2[0], x1[1] + x2[1]))\
        .map(lambda r: Row(ui=r[0][1], uj=r[0][0], sim=(1-r[1][0])**(1./r[1][1])))\
        .toDF()\
        .sort(desc("sim"))\

#         .reduceByKey(lambda x1, x2: (x1[0] * x2[0], x1[1]*x2[1]))\
#         .map(lambda r: Row(ui=r[0][0], uj=r[0][1], sim=1-r[1][0], sim_rel=1-r[1][1]))\
bgr_similarities.write.mode('overwrite').csv("../data/bgr_similarities.csv", header=True)
bgr_similarities.show(5)


+------------------+------+------+
|               sim|    ui|    uj|
+------------------+------+------+
|0.9918097703615764| 54305|121535|
|0.9895498653874586|121535| 54305|
|0.9890767459733864| 64843|121535|
|0.9888496124917898|117144|121535|
| 0.987418518936548| 23523|121535|
+------------------+------+------+
only showing top 5 rows



In [21]:
ui = 48838
user_recs = bgr_similarities.filter(bgr_similarities.ui == ui)\
                                        .join(ratings, bgr_similarities.uj==ratings.userId)\
                                        .select([col("ui").alias("userId"), "movieId", "title",
                                                 col("reliable_rating").alias("rating"), "sim"])

user_recs = user_recs.withColumn("pred", user_recs.rating*user_recs.sim)\
                                        .groupBy(["userId", "movieId"]).avg("pred")\
                                        .select(["userId", "movieId", col("avg(pred)").alias("pred")])\


user_recs = user_recs.join(ratings, on=["userId", "movieId"])\
                                        .select(["userId", "movieId", "title",
                                                 col("reliable_rating").alias("rating"), "pred"])
user_recs = user_recs.withColumn("error", (user_recs.rating-user_recs.pred)**2).sort(desc("pred"))
user_recs.show(5)

+------+-------+--------------------+------------------+------------------+-------------------+
|userId|movieId|               title|            rating|              pred|              error|
+------+-------+--------------------+------------------+------------------+-------------------+
| 48838|   8015|Phantom Tollbooth...|3.5073220312181785|2.8322763346526094| 0.4556866924516944|
| 48838|   7122|King of Hearts (1...|3.1176195833050477|2.7370062111190854| 0.1448665390867699|
| 48838|   9018| Control Room (2004)|  3.89702447913131|2.4889382171885353| 1.9827069210719759|
| 48838|   1207|To Kill a Mocking...|  3.89702447913131|2.4844017158974046|  1.995503071206594|
| 48838|   2090|Rescuers, The (1977)|2.3382146874787857|2.4605075387863944|0.01495554148094489|
+------+-------+--------------------+------------------+------------------+-------------------+
only showing top 5 rows



In [22]:
bgrrecs, error = get_recommendations(test_users, bgr_similarities, recs_size=10)
print("RMSE: ", error)
bgrrecs.show()
del bgr_similarities

Predicting for user: 4935
MSE:  0.916

Predicting for user: 7880
MSE:  1.152

Predicting for user: 7993
MSE:  2.809

Predicting for user: 10817
MSE:  1.471

Predicting for user: 13285
MSE:  0.414

Predicting for user: 23364
MSE:  1.997

Predicting for user: 23571
MSE:  1.269

Predicting for user: 25591
MSE:  1.11

Predicting for user: 34234
MSE:  2.102

Predicting for user: 34239
MSE:  6.288

RMSE:  1.0708719205764197
+------+-------+--------------------+--------------------+------------------+
|userId|movieId|               title|              genres|              pred|
+------+-------+--------------------+--------------------+------------------+
|  4935| 100617|Patton Oswalt: No...|              Comedy|3.4765902485111884|
|  4935| 120815|Patton Oswalt: We...|              Comedy|3.4765902485111884|
|  4935| 101862|50 Children: The ...|         Documentary| 3.304758326300755|
|  4935| 102951|All In: The Poker...|         Documentary| 3.196502822425268|
|  4935| 127096|Project Almanac 

In [23]:
def users_W_similarities_per_movie(rates, max_rating=5.):
    """
    rates is a list [(ui, rating) for every user rating that movie]
    """
    d = dict()
    for (u1, r1), (u2, r2) in product(rates, rates):
        if u1 == u2: continue
        d[(u1, u2)] = (abs(r1 - r2), max_rating)
    return list(d.items())

$$W^{sim}(u, v) = 1-\sum_{m\in {M_u \cap M_v}}{\frac{{abs(r_u(m) - r_v(m))}}{R}}$$

$R$ is the maximum rating (i.e 5.0)

In [24]:
weighted_similarities = ratings.rdd.map(lambda r: (r["movieId"], (r["userId"], r["reliable_rating"])))\
                                    .groupByKey()\
                                    .mapValues(lambda x: users_W_similarities_per_movie(x))\
                                    .flatMapValues(lambda x:x)\
                                    .map(lambda x:x[1])\
                                    .reduceByKey(lambda x1, x2: (x1[0]+x2[0], x1[1]+x2[1]))\
                                    .map(lambda x: Row(ui=x[0][0], uj=x[0][1], sim=1-float(x[1][0])/x[1][1]))\
                                    .toDF()\
                                    .sort(desc("sim"))
weighted_similarities.write.mode('overwrite').csv("../data/weighted_similarities.csv", header=True)
weighted_similarities.show(5)

+------------------+------+------+
|               sim|    ui|    uj|
+------------------+------+------+
|0.9999943499883635|104397| 24994|
|0.9999943499883635| 24994|104397|
|0.9999937846416633| 12373| 40335|
|0.9999937846416633| 40335| 12373|
|0.9999927547972378| 75808|  6108|
+------------------+------+------+
only showing top 5 rows



In [25]:
recs, error = get_recommendations(test_users, weighted_similarities, recs_size=10)
print("RMSE: ", error)
recs = recs.cache()
recs.show()
del weighted_similarities

Predicting for user: 4935
MSE:  0.337

Predicting for user: 7880
MSE:  1.011

Predicting for user: 7993
MSE:  1.157

Predicting for user: 10817
MSE:  0.402

Predicting for user: 13285
MSE:  0.481

Predicting for user: 23364
MSE:  0.734

Predicting for user: 23571
MSE:  1.432

Predicting for user: 25591
MSE:  0.952

Predicting for user: 34234
MSE:  0.334

Predicting for user: 34239
MSE:  3.659

RMSE:  0.9341897029596626
+------+-------+--------------------+--------------------+------------------+
|userId|movieId|               title|              genres|              pred|
+------+-------+--------------------+--------------------+------------------+
|  4935|   8025|Thief, The (Vor) ...|               Drama| 4.225135465870284|
|  4935| 107627|Physician, The (2...|     Adventure|Drama|3.6975812898193454|
|  4935|  46083|Drawing Restraint...|             Fantasy| 3.691640469826862|
|  4935| 118338|Hard to Be a God ...|              Sci-Fi|3.6095736372850182|
|  4935| 101862|50 Children: Th

### Notice that RMSE is not suitable for evaluating this approach, because predicted ratings are scaled by users similarities, which are typically very small. What matters is the rank of the prediction.

In [26]:
u1, u2 = recs.select(["userId"]).dropDuplicates().rdd.collect()[:2]
u1, u2 = u1[0], u2[0]

In [27]:
ratings.filter(ratings.userId==u1).sort(desc("reliable_rating")).show()

+------+-------+--------+--------------------+--------------------+------+------------------+
|userId|movieId|ratingId|               title|              genres|rating|   reliable_rating|
+------+-------+--------+--------------------+--------------------+------+------------------+
|  4935|   4973|  738692|Amelie (Fabuleux ...|      Comedy|Romance|   5.0|2.7136193837258067|
|  4935|   2791|  738683|    Airplane! (1980)|              Comedy|   5.0|2.7136193837258067|
|  4935|   2502|  738678| Office Space (1999)|        Comedy|Crime|   5.0|2.7136193837258067|
|  4935|   1288|  738661|This Is Spinal Ta...|              Comedy|   4.5| 2.442257445353226|
|  4935|   8464|  738705|Super Size Me (2004)|Comedy|Documentar...|   4.5| 2.442257445353226|
|  4935|  93855|  738737|God Bless America...|        Comedy|Drama|   4.0|2.1708955069806453|
|  4935|   3421|  738684| Animal House (1978)|              Comedy|   4.0|2.1708955069806453|
|  4935|   1274|  738660|        Akira (1988)|Action|Adventu

In [28]:
ratings.filter(ratings.userId==u2).sort(desc("reliable_rating")).show()

+------+-------+----------+--------------------+--------------------+------+-----------------+
|userId|movieId|  ratingId|               title|              genres|rating|  reliable_rating|
+------+-------+----------+--------------------+--------------------+------+-----------------+
| 23571|    296|8590793923| Pulp Fiction (1994)|Comedy|Crime|Dram...|   5.0|4.857474084534058|
| 23571|   1466|8590794051|Donnie Brasco (1997)|         Crime|Drama|   4.5|4.371726676080652|
| 23571|   2231|8590794131|     Rounders (1998)|               Drama|   4.5|4.371726676080652|
| 23571|    111|8590793897|  Taxi Driver (1976)|Crime|Drama|Thriller|   4.5|4.371726676080652|
| 23571|  49530|8590794497|Blood Diamond (2006)|Action|Adventure|...|   4.5|4.371726676080652|
| 23571|    593|8590793966|Silence of the La...|Crime|Horror|Thri...|   4.5|4.371726676080652|
| 23571|  86377|8590794602|Louis C.K.: Shame...|              Comedy|   4.5|4.371726676080652|
| 23571|   1265|8590794033|Groundhog Day (1993)|Co