In [1]:
sc = spark.sparkContext

In [2]:
# read data and and give the column names
rating = sc.textFile("/Users/alicewu/Downloads/ml-100k/u.data")\
           .map(lambda x: x.split()).map(lambda x: x[0:3]).toDF(["user","item","rating"])

In [3]:
# Also keep track of the total number of people who rated a movie
ratingWithsize = rating.groupBy("item").count().join(rating, on = "item", how = "right")

In [4]:
ratingWithsize.show(5)

+----+-----+----+------+
|item|count|user|rating|
+----+-----+----+------+
|1090|   37|  56|     3|
|1090|   37| 303|     1|
|1090|   37| 268|     2|
|1090|   37| 145|     2|
|1090|   37| 320|     3|
+----+-----+----+------+
only showing top 5 rows



In [5]:
# read movie data and give column names
item = sc.textFile("/Users/alicewu/Downloads/ml-100k/u.item"
                  ).map(lambda x: x.split("|")).map(lambda x: x[0:2]).toDF(["item","movie_name"])

In [6]:
# join movie with rating to obtain movie name.
rating_movie = ratingWithsize.join(item, on = "item", how = "left")

In [7]:
# Make a dummy copy of the ratings, so we can do a self-join.
rating_movie2 = rating_movie.withColumnRenamed("item","item2").\
                             withColumnRenamed("movie_name","movie_name2").\
                             withColumnRenamed("rating","rating2").\
                             withColumnRenamed("count","count2")

In [8]:
# Join the two rating streams on their user fields,  
# in order to find all pairs of movies that a user has rated.  
rating_pair=rating_movie.join(rating_movie2, on = "user", how ="left")\
                        .filter(rating_movie.item < rating_movie2.item2)

In [9]:
rating_pair.show(5)

+----+----+-----+------+-----------------+-----+------+-------+--------------------+
|user|item|count|rating|       movie_name|item2|count2|rating2|         movie_name2|
+----+----+-----+------+-----------------+-----+------+-------+--------------------+
| 296| 125|  244|     5|Phenomenon (1996)|    7|   392|      5|Twelve Monkeys (1...|
| 296| 125|  244|     5|Phenomenon (1996)|  475|   250|      4|Trainspotting (1996)|
| 296| 125|  244|     5|Phenomenon (1996)|  544|    71|      4|Things to Do in D...|
| 296| 125|  244|     5|Phenomenon (1996)|  272|   198|      5|Good Will Hunting...|
| 296| 125|  244|     5|Phenomenon (1996)|  462|   148|      4|Like Water For Ch...|
+----+----+-----+------+-----------------+-----+------+-------+--------------------+
only showing top 5 rows



In [10]:
# Compute (x*y, x^2, y^2), which we need for dot products and norms.
rating_pair = rating_pair.withColumn("ratingProd", rating_pair.rating*rating_pair.rating2
                        ).withColumn("sqRating", rating_pair.rating**2
                        ).withColumn("sqRating2", rating_pair.rating2**2)

In [11]:
# Compute dot products, norms, sums, and sizes of the rating vectors.
v = rating_pair.groupBy("movie_name","movie_name2").agg({"user":"count","ratingProd":"sum", 
                                        "rating":"sum", "rating2": "sum", "sqRating":"sum", 
                                         "sqRating2":"sum","count":"max", "count2":"max"})

In [12]:
# rename the columns
v = v.withColumnRenamed("count(user)", "size").withColumnRenamed("sum(rating)", "r1"
    ).withColumnRenamed("sum(rating2)", "r2").withColumnRenamed("sum(ratingProd)", "r_Prod"
    ).withColumnRenamed("sum(sqRating)", "sqR_sum1").withColumnRenamed("sum(sqRating2)", "sqR_sum2"
    ).withColumnRenamed("max(count)","count1").withColumnRenamed("max(count2)", "count2")

In [13]:
v.show(5)

+--------------------+--------------------+------+------+--------+--------+------+-----+-----+----+
|          movie_name|         movie_name2|count2|count1|sqR_sum1|sqR_sum2|r_Prod|   r1|   r2|size|
+--------------------+--------------------+------+------+--------+--------+------+-----+-----+----+
|    Lone Star (1996)|     In & Out (1997)|   230|   187|  1044.0|   683.0| 779.0|238.0|185.0|  57|
|Good Will Hunting...|English Patient, ...|   481|   198|  2649.0|  1967.0|2158.0|597.0|495.0| 139|
|Good Will Hunting...|   Booty Call (1997)|    48|   198|   298.0|    70.0| 127.0| 62.0| 26.0|  13|
|Seven (Se7en) (1995)|English Patient, ...|   481|   236|  1577.0|  1462.0|1417.0|383.0|368.0| 102|
|Bridge on the Riv...|  Chasing Amy (1997)|   255|   165|  1242.0|  1097.0|1137.0|276.0|259.0|  63|
+--------------------+--------------------+------+------+--------+--------+------+-----+-----+----+
only showing top 5 rows



In [14]:
# Create Similarity Measures:
def correlation(sqR_sum1,sqR_sum2,r_Prod,r1,r2,size):
    numerator = size * r_Prod - r1 * r2
    denominator = ((size * sqR_sum1 - r1 * r1)**0.5) * ((size * sqR_sum2 - r2 * r2)**0.5)
    corr = numerator / denominator
    return corr

In [15]:
def cosineSimilarity(r_Prod,sqR_sum1,sqR_sum2):
    cosin = r_Prod / (sqR_sum1 * sqR_sum2)
    return cosin

In [16]:
def jaccardSimilarity(usersInCommon, count1, count2):
    union = count1 + count2 - usersInCommon
    jaccard = usersInCommon / union
    return jaccard

In [17]:
PRIOR_COUNT = 10
PRIOR_CORRELATION = 0

In [18]:
def regularizedCorrelation(size, r_Prod, r1, r2, sqR_sum1, sqR_sum2, virtualCount, priorCorrelation): 
    unregularizedCorrelation = correlation(sqR_sum1,sqR_sum2,r_Prod,r1,r2,size)
    w = (size + 0.0) / (size + virtualCount)
    return w * unregularizedCorrelation + (1 - w) * priorCorrelation

In [19]:
# Calculate similarity between rating vectors using similarity measures 
# like correlation, cosine similarity, and Jaccard similarity.
movie_recom = v.withColumn("corr", correlation(v.sqR_sum1,v.sqR_sum2,v.r_Prod,v.r1,v.r2,v.size))\
               .withColumn("cosin", cosineSimilarity(v.r_Prod, v.sqR_sum1,v.sqR_sum2))\
               .withColumn("jaccard", jaccardSimilarity(v.size, v.count1, v.count2))\
               .withColumn("regular", regularizedCorrelation(v.size, v.r_Prod, v.r1, v.r2,v.sqR_sum1,\
                                                             v.sqR_sum2,PRIOR_COUNT,PRIOR_CORRELATION))

In [20]:
similarity = movie_recommendation.rdd.map(lambda x : ((x["movie_name"],x["movie_name2"]), x["corr"],\
                                                      x["cosin"],x["jaccard"],x["regular"]))

In [21]:
similarity.take(3)

[((u'Lone Star (1996)', u'In & Out (1997)'),
  0.1016006212653406,
  0.0010924869434487247,
  0.15833333333333333,
  0.08643634943469275),
 ((u'Good Will Hunting (1997)', u'English Patient, The (1996)'),
  0.2429528097354092,
  0.0004141571106342611,
  0.2574074074074074,
  0.22664725203504615),
 ((u'Good Will Hunting (1997)', u'Booty Call (1997)'),
  0.4654746681256314,
  0.0060882070949185045,
  0.055793991416309016,
  0.2630943776362264)]