In [49]:
pip install jdc

Note: you may need to restart the kernel to use updated packages.


In [50]:
# findspark.init() has to run before the first pyspark import: its job is to make
# the Spark installation importable, which is too late once pyspark is loaded.
import findspark
findspark.init()

# Standard library
import glob
import itertools
import logging
import math
import multiprocessing
import os
import queue
import random
import re
import shutil
import sys
import threading
import time
from datetime import datetime
from itertools import combinations
from pkgutil import iter_modules

# Third-party
import jdc
import more_itertools as mit
import pandas as pd
import pyspark as sp
from pyspark import SparkConf, SparkContext, StorageLevel
from pyspark.sql import SparkSession
from scipy import spatial


## Class Item-item Collaborative Filtering Algorithm
This class implements all the methods needed to run the CF algorithm over the given ratings.csv file.  


In [51]:

# Item-item collaborative filtering over a ratings file, using Spark RDDs.
class CFItemItem:
    """Item-item collaborative-filtering recommender built on a Spark context.

    Only the constructor and housekeeping helpers live here; the loading,
    similarity and estimation steps are attached to the class in later cells.
    """

    def __init__(self, rat_file, rec_output, sim_threshold,  part_size = 100000):
        """Create (or reuse) the Spark context and store the run configuration.

        Args:
            rat_file: path of the ratings file.
            rec_output: name of the result output, created inside ``self.output_folder``.
            sim_threshold: similarity threshold; converted to ``float``.
            part_size: number of similarity records saved per partition.
        """
        # Note: self.spark is a SparkContext; the SparkSession wrapper is kept separately.
        self.spark = SparkContext.getOrCreate()
        self.sparkSession = SparkSession(self.spark)
        self.spark.setLogLevel("ERROR")

        self.rat_file = rat_file                      # ratings file
        self.output_folder = "CFItemItem"             # folder where to store results and other data
        self.output_file = rec_output                 # Output file where to save results
        self.sim_threshold = float(sim_threshold)     # similarity threshold
        self.movie_ratings = dict()
        self.movie_ratings_userlist = dict()
        self.partition_size = part_size

    def close(self):
        """Stop the Spark context held by this instance."""
        self.spark.stop()

    def clear_output_folder(self):
        """Delete the output folder with all of its contents and recreate it empty."""
        if os.path.exists(self.output_folder):
            shutil.rmtree(self.output_folder)
        # makedirs (unlike mkdir) also works when output_folder is a nested path
        os.makedirs(self.output_folder)

**Arguments**
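> -rf rat_file: filename of the ratings file.  
> -o  rec_output: file path where the final estimated ratings are stored.  
> -t  sim_threshold: similarity threshold; only movie pairs with a similarity above it are considered similar.  
> --partition-size part_size: number of similar movie pairs saved per partition (optional, defaults to 100000).  
> -mf: movies file; the argument check below requires this flag, although its value is not used in this notebook.  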

In [52]:
pd.set_option('max_colwidth', 400)

# The notebook reuses the command-line interface of the original script, so the
# arguments are injected through sys.argv instead of a real shell invocation.
sys.argv  = ["collab_filtering.py","-mf","movies.csv","-rf","ratings.csv","-o","results","-t","0.5","-lm"]

usage = "Use: \n-mf Filename_of_Movies(String)\n-rf Filename_of_User_Ratings(String)\n-o recommendation_output_file(String)\n-t similarity_threshold(Double/Float)"

# Flags that must be present ("-mf" stays required for compatibility even though
# its value is not read below).
params = ["-mf", "-rf", "-o","-t"]
if any(p not in sys.argv for p in params):
    print("Incorrect Parameters\n")
    print(usage)
    sys.exit(-1)

partSize = 100000
r_filename = out = thresh = None

# Look at every (flag, value) pair that is really present instead of assuming a
# fixed number of arguments; the last occurrence of a flag wins.
for pos in range(1, len(sys.argv) - 1):
    flag, value = sys.argv[pos], sys.argv[pos + 1]
    if flag == "-rf":
        r_filename = value
    elif flag == "-o":
        out = value
    elif flag == "-t":
        thresh = value
    elif flag == "--partition-size":
        partSize = int(value)

# A required flag given as the very last argument has no value to read.
if None in (r_filename, out, thresh):
    print("Incorrect Parameters\n")
    print(usage)
    sys.exit(-1)

lm = "-lm" in sys.argv


cfItemItem = CFItemItem(r_filename, out, float(thresh), partSize)

### Load Movie Ratings By Movie
Reads the ratings file  
Data stored in RDD as MovieId: Array of tuples (UserId, Rating)


In [53]:
%%add_to CFItemItem                  
def load_movie_ratings(self): #Key = movieId, values = [(userId, Rating)...] 
    """Load the ratings file grouped by movie and record the largest user id.

    Lines are lower-cased; lines containing "movieid" (the header) or not made
    of exactly four comma-separated fields are skipped.

    Sets:
        self.ratings: RDD of ``(movieId, [(userId, rating), ...])``. ``movieId``
            stays a string, so the RDD is sorted lexicographically by it; each
            list holds unique ``(userId, rating)`` tuples ordered by userId.
        self.max_user_id: largest userId found in the file.

    Raises:
        ValueError: if no rating line is found (maximum of an empty collection).
    """
    textfile = self.spark.textFile(self.rat_file)
    self.ratings = textfile.flatMap(lambda line: re.split(r'\n', line.lower()))\
        .filter(lambda line: "movieid" not in line)\
        .map(lambda line: line.split(","))\
        .filter(lambda fields: len(fields) == 4)\
        .map(lambda fields: (fields[1], [(int(fields[0]), float(fields[2]))]))\
        .reduceByKey(lambda a, b: sorted(set(a + b), key=lambda p: p[0]))\
        .sortBy(lambda p: p[0])

    # Gets the maximum user ID without pulling every rating back to the driver
    self.max_user_id = self.ratings.flatMap(lambda p: [entry[0] for entry in p[1]]).max()


In [54]:
# Build cfItemItem.ratings (ratings grouped by movie), then convert it to a
# pandas frame with readable column names; the bare `df` renders the preview.
cfItemItem.load_movie_ratings()
df = cfItemItem.ratings.toDF().toPandas()
df.columns = ["MovieId","(UserId,Rating)"]
df

Unnamed: 0,MovieId,"(UserId,Rating)"
0,1,"[(1, 4.0), (5, 4.0), (7, 4.5), (15, 2.5), (17, 4.5), (18, 3.5), (19, 4.0), (21, 3.5), (27, 3.0), (31, 5.0), (32, 3.0), (33, 3.0), (40, 5.0), (43, 5.0), (44, 3.0), (45, 4.0), (46, 5.0), (50, 3.0), (54, 3.0), (57, 5.0), (63, 5.0), (64, 4.0), (66, 4.0), (68, 2.5), (71, 5.0), (73, 4.5), (76, 0.5), (78, 4.0), (82, 2.5), (86, 4.0), (89, 3.0), (90, 3.0), (91, 4.0), (93, 3.0), (96, 5.0), (98, 4.5), (1..."
1,10,"[(6, 3.0), (8, 2.0), (11, 3.0), (19, 2.0), (21, 5.0), (26, 3.0), (31, 4.0), (34, 5.0), (42, 5.0), (43, 4.0), (46, 3.0), (56, 4.0), (57, 3.0), (59, 3.0), (63, 3.0), (68, 4.5), (81, 1.0), (82, 3.5), (84, 3.0), (91, 3.5), (93, 4.0), (94, 3.0), (99, 4.0), (104, 3.0), (117, 3.0), (119, 4.0), (136, 5.0), (144, 3.0), (160, 1.0), (166, 4.5), (170, 3.0), (173, 3.0), (174, 3.0), (176, 5.0), (178, 4.0), ..."
2,100,"[(6, 3.0), (32, 4.0), (84, 3.0), (181, 3.0), (182, 3.0), (207, 3.0), (297, 1.0), (314, 3.0), (337, 3.0), (444, 3.0), (474, 2.0), (492, 3.0)]"
3,100044,"[(318, 4.0)]"
4,100068,"[(483, 3.5)]"
...,...,...
8913,99853,"[(89, 4.0)]"
8914,999,"[(6, 2.0), (57, 2.0), (156, 4.0), (182, 3.5), (202, 4.0), (294, 2.0), (313, 4.0), (368, 3.0), (385, 3.0), (387, 3.0), (414, 3.0)]"
8915,99910,"[(249, 3.5), (380, 3.0)]"
8916,99917,"[(567, 3.5)]"


### Load Movie Ratings by User
Reads the ratings file  
Data stored in RDD as UserId: Array of tuples (MovieId, Rating)


In [55]:
%%add_to CFItemItem 
def load_ratings_by_user(self):
    """Load the ratings file grouped by user into ``self.ratings_by_user``.

    Lines are lower-cased; only lines with exactly four comma-separated fields
    that do not contain "movieid" are kept. The resulting RDD holds
    ``(userId, [(movieId, rating), ...])`` pairs sorted by userId, each list
    made of unique tuples ordered by movieId.
    """
    def is_rating_row(row):
        # Keeps data rows, drops the header and malformed lines
        return len(row.split(",")) == 4 and "movieid" not in row

    def to_user_entry(row):
        fields = row.split(",")
        return (int(fields[0]), [(int(fields[1]), float(fields[2]))])

    def merge_entries(left, right):
        return sorted(set(left + right), key=lambda entry: entry[0])

    raw_lines = self.spark.textFile(self.rat_file)
    lowered = raw_lines.flatMap(lambda row: re.split(r'\n', row.lower()))
    grouped = lowered.filter(is_rating_row).map(to_user_entry).reduceByKey(merge_entries)
    self.ratings_by_user = grouped.sortBy(lambda entry: entry[0])

In [56]:
# Build cfItemItem.ratings_by_user (ratings grouped by user), then convert it to
# a pandas frame with readable column names; the bare `df` renders the preview.
cfItemItem.load_ratings_by_user()
df = cfItemItem.ratings_by_user.toDF().toPandas()
df.columns = ["userId","(Movie,Rating)"]
df

                                                                                

Unnamed: 0,userId,"(Movie,Rating)"
0,1,"[(1, 4.0), (3, 4.0), (6, 4.0), (47, 5.0), (50, 5.0), (70, 3.0), (101, 5.0), (110, 4.0), (151, 5.0), (157, 5.0), (163, 5.0), (216, 5.0), (223, 3.0), (231, 5.0), (235, 4.0), (260, 5.0), (296, 3.0), (316, 3.0), (333, 5.0), (349, 4.0), (356, 4.0), (362, 5.0), (367, 4.0), (423, 3.0), (441, 4.0), (457, 5.0), (480, 4.0), (500, 3.0), (527, 5.0), (543, 4.0), (552, 4.0), (553, 5.0), (590, 4.0), (592, 4...."
1,2,"[(318, 3.0), (333, 4.0), (1704, 4.5), (3578, 4.0), (6874, 4.0), (8798, 3.5), (46970, 4.0), (48516, 4.0), (58559, 4.5), (60756, 5.0), (68157, 4.5), (71535, 3.0), (74458, 4.0), (77455, 3.0), (79132, 4.0), (80489, 4.5), (80906, 5.0), (86345, 4.0), (89774, 5.0), (91529, 3.5), (91658, 2.5), (99114, 3.5), (106782, 5.0), (109487, 3.0), (112552, 4.0), (114060, 2.0), (115713, 3.5), (122882, 5.0), (1317..."
2,3,"[(31, 0.5), (527, 0.5), (647, 0.5), (688, 0.5), (720, 0.5), (849, 5.0), (914, 0.5), (1093, 0.5), (1124, 0.5), (1263, 0.5), (1272, 0.5), (1275, 3.5), (1302, 0.5), (1371, 3.0), (1587, 4.5), (2018, 0.5), (2080, 0.5), (2090, 0.5), (2105, 2.0), (2288, 4.0), (2424, 0.5), (2851, 5.0), (3024, 4.5), (3210, 0.5), (3703, 5.0), (3949, 0.5), (4518, 5.0), (5048, 0.5), (5181, 5.0), (5746, 5.0), (5764, 4.5), ..."
3,4,"[(21, 3.0), (32, 2.0), (45, 3.0), (47, 2.0), (52, 3.0), (58, 3.0), (106, 4.0), (125, 5.0), (126, 1.0), (162, 5.0), (171, 3.0), (176, 5.0), (190, 2.0), (215, 5.0), (222, 1.0), (232, 5.0), (235, 2.0), (247, 3.0), (260, 5.0), (265, 5.0), (296, 1.0), (319, 5.0), (342, 5.0), (345, 4.0), (348, 4.0), (351, 3.0), (357, 3.0), (368, 4.0), (417, 2.0), (441, 1.0), (450, 2.0), (457, 5.0), (475, 5.0), (492,..."
4,5,"[(1, 4.0), (21, 4.0), (34, 4.0), (36, 4.0), (39, 3.0), (50, 4.0), (58, 5.0), (110, 4.0), (150, 3.0), (153, 3.0), (232, 4.0), (247, 5.0), (253, 3.0), (261, 4.0), (265, 3.0), (266, 1.0), (290, 5.0), (296, 5.0), (300, 3.0), (316, 2.0), (318, 3.0), (344, 3.0), (349, 3.0), (357, 2.0), (364, 3.0), (367, 4.0), (380, 2.0), (410, 3.0), (457, 4.0), (474, 4.0), (475, 5.0), (515, 3.0), (527, 5.0), (531, 4..."
...,...,...
585,586,"[(2, 4.0), (110, 5.0), (161, 4.5), (168, 3.5), (185, 3.5), (260, 4.5), (316, 4.0), (318, 5.0), (339, 4.0), (368, 4.0), (380, 4.5), (440, 4.0), (457, 4.5), (527, 2.5), (539, 3.5), (553, 4.5), (587, 4.0), (588, 4.5), (589, 5.0), (733, 4.0), (786, 4.0), (832, 4.0), (858, 3.0), (1073, 4.5), (1097, 4.0), (1196, 4.5), (1198, 5.0), (1200, 5.0), (1210, 4.5), (1221, 3.0), (1265, 4.5), (1270, 4.5), (129..."
586,587,"[(1, 5.0), (11, 4.0), (21, 4.0), (32, 5.0), (50, 5.0), (58, 5.0), (141, 4.0), (153, 3.0), (205, 3.0), (235, 4.0), (236, 5.0), (252, 4.0), (296, 5.0), (329, 4.0), (342, 4.0), (344, 3.0), (345, 4.0), (356, 4.0), (357, 3.0), (372, 4.0), (380, 4.0), (432, 4.0), (440, 4.0), (445, 4.0), (497, 4.0), (500, 5.0), (515, 5.0), (534, 4.0), (539, 4.0), (587, 5.0), (590, 5.0), (597, 4.0), (628, 5.0), (691, ..."
587,588,"[(3, 3.0), (6, 5.0), (10, 3.0), (16, 4.0), (20, 2.0), (21, 3.0), (22, 3.0), (25, 3.0), (36, 2.0), (42, 3.0), (47, 3.0), (50, 5.0), (110, 5.0), (147, 3.0), (150, 3.0), (161, 4.0), (163, 3.0), (165, 4.0), (175, 4.0), (185, 1.0), (198, 3.0), (204, 2.0), (208, 2.0), (217, 1.0), (223, 5.0), (225, 3.0), (231, 2.0), (253, 2.0), (288, 3.0), (292, 3.0), (296, 5.0), (300, 4.0), (316, 2.0), (318, 5.0), (..."
588,589,"[(25, 5.0), (36, 5.0), (111, 5.0), (150, 4.0), (161, 3.0), (185, 3.0), (193, 3.0), (231, 4.0), (235, 4.0), (246, 5.0), (282, 4.0), (296, 5.0), (300, 3.0), (318, 5.0), (337, 5.0), (342, 4.0), (350, 3.0), (356, 5.0), (357, 4.0), (373, 4.0), (376, 5.0), (454, 4.0), (481, 4.0), (508, 4.0), (509, 4.0), (527, 5.0), (529, 3.0), (531, 3.0), (586, 3.0), (587, 5.0), (590, 5.0), (593, 5.0), (733, 2.0), (..."


### Create Movie Ratings Array By Movie  
Creates an array of ratings per movie, each rating centred by subtracting that movie's average rating  
If a user has not given a rating, the rating in the array is 0.0  
Each entry in the movie_ratings array is of the sort:  
> movie = [ratingOfUser1, ratingOfUser2, ratingOfUser3, ratingOfUser4...]   

Creates an array of users who rated each movie
> movie = [userId1, userId2, userid3,...]  


In [57]:
%%add_to CFItemItem 
def create_movie_ratings_arrays(self):
    """Build, for every movie, a mean-centred rating vector and its list of raters.

    Reads ``self.ratings`` and ``self.max_user_id``.

    Sets:
        self.movie_ratings: movie -> list of length ``max_user_id``; position
            ``u-1`` holds user ``u``'s rating minus the movie's average rating,
            or 0 when that user did not rate the movie.
        self.movie_ratings_userlist: movie -> user ids that rated it, in the
            order they appear in ``self.ratings``.
    """
    centred_by_movie = dict()
    raters_by_movie = dict()
    self.movie_ratings = centred_by_movie
    self.movie_ratings_userlist = raters_by_movie

    for movie_id, rated in self.ratings.collect():
        rating_of = {entry[0]: entry[1] for entry in rated}
        mean_rating = sum(rating_of.values()) / len(rating_of)

        row = []
        for user_id in range(1, self.max_user_id + 1):
            if user_id in rating_of:
                row.append(rating_of[user_id] - mean_rating)
            else:
                row.append(0)

        centred_by_movie[movie_id] = row
        raters_by_movie[movie_id] = [entry[0] for entry in rated]

In [58]:
cfItemItem.create_movie_ratings_arrays()
# Preview: first ten vector entries of the first ten movies
for movie, ratings in itertools.islice(cfItemItem.movie_ratings.items(), 10):
    preview = ["{:.4f}".format(a) for a in ratings[:10]]
    print("{:6}".format(movie), preview, "...")

1      ['0.0520', '0.0000', '0.0000', '0.0000', '0.0520', '0.0000', '0.5520', '0.0000', '0.0000', '0.0000'] ...
10     ['0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '-0.5000', '0.0000', '-1.5000', '0.0000', '0.0000'] ...
100    ['0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.1667', '0.0000', '0.0000', '0.0000', '0.0000'] ...
100044 ['0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000'] ...
100068 ['0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000'] ...
100083 ['0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000'] ...
100106 ['0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000'] ...
100163 ['0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000'] ...
100194 ['0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0000', '0.0

### Calculate Cosine Similarity for Each Possible Combination of 2 Movies
Creates all movie pair combinations and calculates the cosine similarity between both movies using the previously calculated movie_ratings array   
Because there will be many pairs, the calculated similarities are saved in partitions of `self.partition_size` records each

In [146]:
%%add_to CFItemItem 
def saveRdd(self, sims, fileN):
    """Write one batch of similarity records as CSV text into its own partition folder.

    Args:
        sims: records to save; each is joined with commas after being ordered
            by its third field.
        fileN: partition number; truncated with ``int()`` to name the folder
            ``<output_folder>/similarities/partition<N>``.
    """
    ordered = self.spark.parallelize(sims).sortBy(lambda rec: rec[2])
    csv_rows = ordered.map(lambda rec: ','.join(str(field) for field in rec))    # map to CSV format
    partition_dir = "similarities/partition"+str(int(fileN))
    target = "{0}/{1}".format(self.output_folder, partition_dir)
    # coalesce(1) so the batch is written through a single partition
    csv_rows.coalesce(1).saveAsTextFile(target)

    
def calculate_cosine_similarity(self):
    """Compute the cosine similarity of every movie pair that shares at least one rater.

    Reads ``self.movie_ratings`` / ``self.movie_ratings_userlist`` and writes the
    ``(movie1, movie2, similarity)`` records through ``self.saveRdd`` in batches
    of ``self.partition_size`` records, numbered from 1. Every computed record is
    written exactly once; at least one (possibly empty) partition is always
    produced.

    The similarity only uses the users who rated both movies. NOTE(review): it
    can be NaN when one of the two restricted vectors is all zeros.

    Raises:
        ValueError: if ``self.partition_size`` is smaller than 1.
    """
    if self.partition_size < 1:
        raise ValueError("partition_size must be at least 1")

    print("Starting Cosine Similarity Calculator")
    mv_list = list(self.movie_ratings_userlist.keys())
    # Set copies of the rater lists make the "rated both movies" test O(1)
    rater_sets = {m: set(users) for m, users in self.movie_ratings_userlist.items()}

    t = time.time()
    sims = []          # records waiting to be written to the next partition
    total = 0          # number of similarities computed so far
    partitions = 0     # number of partitions written so far

    # combinations() is consumed lazily: the full list of pairs is never held in memory
    for (m1, m2) in combinations(mv_list, 2):
        #Get all user who rated both movies (order of m1's rater list is kept)
        raters_m2 = rater_sets[m2]
        intersection = [u for u in self.movie_ratings_userlist[m1] if u in raters_m2]
        if not intersection:
            continue

        # Creates a list of ratings only from the users who rated both movies
        lu1 = [self.movie_ratings[m1][int(u)-1] for u in intersection]
        lu2 = [self.movie_ratings[m2][int(u)-1] for u in intersection]

        # Use of scipy spatial distance cosine to calculate similarity
        sims.append((m1, m2, 1-spatial.distance.cosine(lu1, lu2)))
        total += 1

        # Flush a full batch; the buffer is only emptied after it has been saved
        if len(sims) >= self.partition_size:
            partitions += 1
            self.saveRdd(sims, partitions)
            sims = []

    # Write the remainder (or a single empty partition when nothing was computed)
    if sims or partitions == 0:
        partitions += 1
        self.saveRdd(sims, partitions)

    elapsed = time.time()-t
    rate = total/elapsed if elapsed > 0 else 0.0
    print("Finished Calculating Similarities in ",elapsed,"s")
    print("Average","{:.2f}".format(rate),"Similarities Calculated per Second")
    print("Results saved in ",partitions," partitions")

In [147]:
# Start from an empty output folder, then compute all pairwise similarities and
# save them as partition folders (long-running: see the timing printed below).
cfItemItem.clear_output_folder()      
cfItemItem.calculate_cosine_similarity()

Starting Cosine Similarity Calculator


  dist = 1.0 - uv / np.sqrt(uu * vv)
                                                                                

Finished Calculating Similarities in  642.0745890140533 s
Average 16745.44 Similarities Calculated per Second
Results saved in  108  partitions


## Load all similarity record partitions from disk

In [59]:
%%add_to CFItemItem 
def load_similarity(self):
    """Read every saved similarity partition into ``self.similarities``.

    Looks for ``<output_folder>/similarities/<partition>/part-00000`` files (the
    layout produced by ``saveRdd``) and loads them as one RDD of
    ``(movie1:int, movie2:int, similarity:float)`` tuples sorted by the second
    movie id. Lines without exactly three comma-separated fields are ignored.

    Raises:
        FileNotFoundError: if no partition file exists under the output folder.
    """
    t = time.time()
    sim_root = "{0}/{1}".format(self.output_folder, "similarities")

    # One part file per partition folder; folders without one are skipped
    all_files_to_load = []
    if os.path.isdir(sim_root):
        for name in sorted(os.listdir(sim_root)):
            part_file = "{0}/{1}/part-00000".format(sim_root, name)
            if os.path.isfile(part_file):
                all_files_to_load.append(part_file)
    if not all_files_to_load:
        raise FileNotFoundError("No similarity partitions found in " + sim_root)

    # textFile accepts a comma-separated list of paths
    extfile = self.spark.textFile(','.join(all_files_to_load))

    # Load CSV saved as txt file
    self.similarities = extfile.flatMap(lambda line: re.split(r'\n', line.lower())) \
                            .filter(lambda line: len(line.split(","))== 3 )\
                            .map(lambda pair : (int(pair.split(",")[0]),int(pair.split(",")[1]),float(pair.split(",")[2]))) \
                                .sortBy(lambda p: p[1])

    print(len(all_files_to_load), " Partitions loaded in","{:.2f}".format(time.time()-t)," seconds")

In [60]:
# Read the saved similarity partitions back into cfItemItem.similarities.
cfItemItem.load_similarity()



108  Partitions loaded in 30.59  seconds


                                                                                

### Estimate Rating for Movies by Certain Users
Using all previously calculated similarity values between pairs of movies, estimate the rating of one user to a certain movie  
**Steps**
> For each user, get movies which have not been rated  
> For each one of those movies, find similar ones from the list of movies which have been rated by the user  
> Predict by taking Weighted Average  

In [61]:
%%add_to CFItemItem 
def estimate_rating(self):
    """Estimate, for every user, the rating of unrated movies similar to rated ones.

    Only movie pairs whose similarity is strictly above ``self.sim_threshold``
    are used. For a user and an unrated movie, the estimate is the weighted
    average of the user's ratings of the similar movies, weighted by similarity.
    Candidates whose weights sum to zero are skipped because the average is
    undefined.

    Sets:
        self.estimate: list of ``(movie, user, estimated_rating)`` tuples.
        self.results: reset to an empty dict.

    Side effects:
        Removes any previous ``<output_folder>/<output_file>`` and saves the
        estimates there as ``movie;user;rating`` text lines sorted by movie.
    """
    out_path = "{0}/{1}".format(self.output_folder,self.output_file)
    if os.path.exists(out_path):
        shutil.rmtree(out_path)
    self.estimate = []
    self.results = dict()

    # neighbours[a][b] = similarity of a and b, stored in both directions so the
    # movies similar to a rated movie can be looked up directly
    neighbours = dict()
    for x in self.similarities.collect():
        if x[2]>self.sim_threshold:
            neighbours.setdefault(x[0], dict())[x[1]] = x[2]
            neighbours.setdefault(x[1], dict())[x[0]] = x[2]

    print("Started Collecting Data")
    dataset = self.ratings_by_user.collect()
    t2 = time.time()

    print("Started Calculating Estimations")
    i=0
    for user, ratings in dataset:
        # Rated movies that have at least one similar movie: movie -> rating
        ratings_by_movie = {int(k[0]):k[1] for k in ratings if int(k[0]) in neighbours}

        # Accumulate, per unrated candidate, sum(sim*rating) and sum(sim) over
        # the rated movies it is similar to (in the order the user rated them)
        weighted_sums = dict()
        weight_totals = dict()
        for rated_movie, user_rating in ratings_by_movie.items():
            for movie, sim in neighbours[rated_movie].items():
                if movie in ratings_by_movie:
                    continue
                weighted_sums[movie] = weighted_sums.get(movie, 0) + sim*user_rating
                weight_totals[movie] = weight_totals.get(movie, 0) + sim

        for movie, sum_of_weights in weight_totals.items():
            if sum_of_weights == 0:
                continue
            rating = weighted_sums[movie]/sum_of_weights
            self.estimate.append((movie, user, rating))
            i+=1
    final_t = time.time()-t2
    throughput = i/final_t if final_t > 0 else 0.0
    print(i," rating estimations calculated in", "{:.2f}".format(final_t),"seconds")
    print("Average throughput of ","{:.2f}".format(throughput), "calculation per second")
    resultsRdd = self.spark.parallelize(self.estimate).sortBy(lambda p: p[0])
    lines =  resultsRdd.map(lambda data: ';'.join(str(d) for d in data))    # map to CSV format
    lines.saveAsTextFile(out_path)

In [62]:
# Compute the rating estimates (kept in cfItemItem.estimate) and save them to the output folder.
cfItemItem.estimate_rating()

                                                                                

Started Collecting Data
Started Calculating Estimations
5089284  rating estimations calculated in 1076.02 seconds
Average throughput of  4729.73 calculation per second


                                                                                

## Evaluation  
The pipeline.py algorithm split the records and stored 10% in totest.csv


In [63]:
# Held-out ratings, grouped by movie with the same pipeline used for the training ratings
textfile = cfItemItem.spark.textFile("totest.csv")
test_lines = textfile.flatMap(lambda raw: re.split(r'\n', raw.lower()))
test_rows = test_lines.filter(lambda raw: len(raw.split(","))== 4 and "movieid" not in raw)
test_pairs = test_rows.map(lambda raw: (raw.split(",")[1], [(int(raw.split(",")[0]), float(raw.split(",")[2]))]))
test_merged = test_pairs.reduceByKey(lambda left, right: sorted(set(left+right), key = lambda entry: entry[0]))
test_ratings = test_merged.sortBy(lambda entry: entry[0])

# Tabular preview of the grouped test ratings
df = test_ratings.toDF().toPandas()
df.columns = ["MovieId","(UserId,Rating)"]
df

Unnamed: 0,MovieId,"(UserId,Rating)"
0,1,"[(596, 4.0), (597, 4.0), (599, 3.0), (600, 2.5), (601, 4.0), (603, 4.0), (604, 3.0), (605, 4.0), (606, 2.5), (607, 4.0), (608, 2.5), (609, 3.0), (610, 5.0)]"
1,10,"[(592, 3.0), (597, 3.0), (599, 3.5), (602, 3.0), (608, 4.0), (609, 4.0)]"
2,100,"[(599, 2.0), (602, 3.0)]"
3,100083,"[(610, 3.5)]"
4,100159,"[(610, 4.5)]"
...,...,...
4728,99721,"[(610, 1.5)]"
4729,998,"[(597, 2.0)]"
4730,99813,"[(599, 3.0)]"
4731,999,"[(599, 3.0)]"


In [70]:
# Accumulate the squared error between each estimate and the held-out rating of
# the same (movie, user), when the test set contains one.
_sum = 0
counter = 0
# movie -> {user -> real rating} lookup built from the test set
to_dict = {int(m):{int(x[0]):float(x[1]) for x in rs} for m,rs in test_ratings.collect()}

for movie, user, estimated in cfItemItem.estimate:
    if int(movie) not in to_dict or int(user) not in to_dict[int(movie)]:
        continue
    _real = to_dict[int(movie)][int(user)]
    # Show only the first ten comparisons
    if counter<10:
        print("Real:",_real,"Estimated:",estimated)
    _sum += (_real-estimated)**2
    counter += 1
        

In [71]:
# Root-mean-square error over the held-out ratings that received an estimate.
# (The result is no longer stored in a variable called `eval`, which hid the builtin.)
if counter == 0:
    print("RMSE undefined: no estimate matches a held-out rating")
else:
    mse = _sum/counter
    print("RMSE",math.sqrt(mse))

RMDE 0.813584748534596


Although this value is large, it is because we have used a similarity threshold of 0.5 which is 

In [1]:
# Stop the Spark context held by cfItemItem; this needs the cells above to have
# been run in the same kernel, otherwise the name is undefined.
cfItemItem.close()

NameError: name 'cfItemItem' is not defined