In [1]:
import findspark
findspark.init()

import sys
from pyspark import SparkContext, SparkConf
from scipy import spatial
import numpy as np

In [2]:
# Apart from the movie names, we will also extract the last 19 elements of each movie record. These last 19 elements
# form a binary vector with a 1 or 0 for each of the 19 genres (1 meaning the movie is of that genre, 0 meaning otherwise).
# Note that a movie can belong to multiple genres.

def loadMovieNames():
    """Load movie titles and genre flags from ``ml-100k/u.item``.

    Each non-blank line is split on ``|``. The first field is used as the
    integer movie id, the second as the title, and the last 19 fields as the
    genre flags.

    Returns:
        dict mapping int movie id -> ``[title, genres]`` where ``title`` is the
        title with every non-ASCII character dropped and ``genres`` is a list
        of the 19 flag strings (no trailing newline on the last one).
    """
    import io

    movieDetails = {}
    # Forward slashes work on every platform (including Windows) and avoid the
    # '\u' sequence, which Python 3 rejects as a truncated unicode escape.
    # latin-1 maps every byte to exactly one character, so decoding can never
    # fail; non-ASCII characters are then dropped from the title below, which
    # is what the byte-level decode('ascii', 'ignore') used to do.
    with io.open('ml-100k/u.item', encoding='latin-1') as file:
        for line in file:
            line = line.rstrip('\r\n')
            if not line.strip():
                # A blank (e.g. trailing) line has no movie id to parse.
                continue
            fields = line.split('|')
            title = fields[1].encode('ascii', 'ignore').decode('ascii')
            # str() keeps the flags as native strings on Python 2 and 3 alike.
            genres = [str(flag) for flag in fields[-19:]]
            movieDetails[int(fields[0])] = [title, genres]

    return movieDetails

In [3]:
def filterDuplicates(record):
    """Keep a self-joined rating record only when its movie ids are ascending.

    Args:
        record: ``(userid, ((movie1, rating1), (movie2, rating2)))``.

    Returns:
        True when ``movie1 < movie2``. This drops pairs of a movie with itself
        and keeps a single ordering of every other pair.
    """
    # Unpack inside the body: tuple parameters in the signature are a syntax
    # error on Python 3.
    _, pairs = record
    return pairs[0][0] < pairs[1][0]
            
def sigmoid(x, a):
    """Return the logistic function 1 / (1 + exp(-a * x)) as a builtin float.

    Args:
        x: input value.
        a: steepness factor applied to ``x`` before exponentiation.
    """
    denominator = 1 + np.exp(-a * x)
    return float(1 / denominator)
 
# Blending weight shared by similarityFunc and netSimilarty: each of them mixes two
# scores as alpha*first + (1-alpha)*second, so 0.5 weights both terms equally.
alpha = 0.5    

def similarityFunc(list_of_ratings):
    """Score a movie pair from the rating pairs users gave to both movies.

    Args:
        list_of_ratings: sequence of ``(rating1, rating2)`` pairs.

    Returns:
        ``(weighted_similarity, number_of_pairs)``. The similarity blends, via
        ``alpha``, a sigmoid of the pair count with the cosine similarity of
        the two rating columns.
    """
    ratings = np.array(list_of_ratings)
    first_column, second_column = ratings[:, 0], ratings[:, 1]
    pair_count = len(list_of_ratings)

    # scipy returns the cosine *distance*; one minus it is the similarity
    cosine_sim = 1 - spatial.distance.cosine(first_column, second_column)
    # the weight grows with the number of rating pairs backing this movie pair
    support = sigmoid(pair_count, 0.005)
    blended = alpha * support + (1 - alpha) * cosine_sim

    return (float(blended), pair_count)
    

def makeItemPairs(record):
    """Re-key a per-user rating pair by the pair of movies it concerns.

    Args:
        record: ``(userid, ((movie1, rating1), (movie2, rating2)))``.

    Returns:
        ``((movie1, movie2), (rating1, rating2))``; the user id is dropped.
    """
    # Unpack inside the body: tuple parameters in the signature are a syntax
    # error on Python 3.
    _, pairs = record
    (movie1, rating1) = pairs[0]
    (movie2, rating2) = pairs[1]

    return ((movie1, movie2), (rating1, rating2))

# this is a cosine similarity function for the genre vectors
def genreSimilarity(pair):
    """Cosine similarity between two genre vectors.

    Args:
        pair: ``(vector1, vector2)``, two equal-length numeric sequences.

    Returns:
        The cosine similarity as a builtin float, or 0.0 when either vector
        is all zeros.
    """
    v1 = np.array(pair[0])
    v2 = np.array(pair[1])
    # The cosine of an all-zero vector is 0/0. Report "no similarity" instead
    # of letting a NaN propagate into the blended score and the sort key.
    if not v1.any() or not v2.any():
        return 0.0
    # scipy returns the cosine *distance*; one minus it is the similarity
    genre_sim = 1 - spatial.distance.cosine(v1, v2)
    return float(genre_sim)

def netSimilarty(record):
    """Blend the rating-based and genre-based similarities of a movie pair.

    Args:
        record: ``(key, ((rating_similarity, num_ratings), genre_similarity))``.

    Returns:
        ``(key, (net_similarity, num_ratings))`` where the net similarity is
        ``alpha*rating_similarity + (1-alpha)*genre_similarity``.
    """
    # Unpack inside the body: tuple parameters in the signature are a syntax
    # error on Python 3.
    key, value = record
    rating_simi = value[0][0]
    num_ratings = value[0][1]
    genre_simi = value[1]

    net_simi = alpha * rating_simi + (1 - alpha) * genre_simi

    return (key, (net_simi, num_ratings))

In [4]:
# The dataset has 100,000 records, so we use all the cores available on the machine
# by setting the setMaster argument to 'local[*]'.

conf = SparkConf().setMaster('local[*]').setAppName('MovieSimi_ver3')
# getOrCreate reuses an already-running context instead of trying to start a second
# one, so this cell can be re-run without restarting the kernel.
sc = SparkContext.getOrCreate(conf=conf)

# Load the movie details into a dictionary: movie id -> [title, genre flags].
details = loadMovieNames()
# (movie id, genre vector as a list of ints). A list comprehension and items() are
# used so the values are real lists on both Python 2 and Python 3.
movie_genre = sc.parallelize([(movie_id, [int(flag) for flag in info[1]])
                              for movie_id, info in details.items()])

# Forward slashes work on every platform and avoid the '\u' sequence, which
# Python 3 rejects as a truncated unicode escape.
data = sc.textFile('ml-100k/u.data')

# Map ratings into the form userid as key and (movieid, rating) as value.
user_ratings = data.map(lambda x: x.split('\t')).map(
    lambda parts: (int(parts[0]), (int(parts[1]), float(parts[2]))))

# Compute a list of movie pairs with rating pairs from all users who have rated that pair of movies.
# Each record has the form userid as key and ((movieid1, rating1), (movieid2, rating2)) as the value.
movie_ratings = user_ratings.join(user_ratings)

# Remove duplicates: the self-join pairs every movie with itself and yields every pair in
# both orders. We only keep the records where the first movie id is less than the second.
movie_ratings_clean = movie_ratings.filter(filterDuplicates)

# The records are still in the form userid as key and ((movieid1, rating1), (movieid2, rating2)) as
# the value. Convert this RDD to the form (movieid1, movieid2) as the key and (rating1, rating2) as
# the value.
movie_pairs = movie_ratings_clean.map(makeItemPairs)

# Group by movie pair to collect all available rating pairs for each unique movie pair.
# mapValues(list) materialises each group without a tuple-unpacking lambda, which is
# a syntax error on Python 3.
movie_pairs_group = movie_pairs.groupByKey().mapValues(list)

# For each movie pair (the key) the value is now a list of rating pairs collected from all users.
# We treat the rating pairs as two vectors, one per movie, and compute the similarity of those
# two vectors.
# We cache this RDD because it is used again later.
movie_pair_simi = movie_pairs_group.mapValues(similarityFunc).persist()

# Each record is now of the form (movieid1, movieid2) as key and
# (similarity, number of rating pairs) as the value.

print(movie_pair_simi.take(10))

[((197, 1097), (0.7423110081201001, 7)), ((42, 364), (0.7159167404158419, 18)), ((773, 1409), (0.7506249986979199, 1)), ((273, 617), (0.7370222333904755, 7)), ((372, 974), (0.7506249986979199, 1)), ((789, 865), (0.7467487273331919, 3)), ((496, 1314), (0.7407083328480895, 4)), ((246, 1008), (0.7337086520601352, 18)), ((856, 1006), (0.7405811481955664, 10)), ((747, 795), (0.6623528535395095, 6))]


In [8]:
user_choice = 50
num_of_reco = 20

# Extract the genre vector for the chosen movie. A list comprehension is used so the
# value is a real, picklable list on Python 3 as well (map() returns an iterator there).
user_choice_genre = [int(flag) for flag in details[user_choice][1]]

# Create an RDD with all movie IDs except the chosen movie as keys and the corresponding
# genre vector as the value. Records are indexed instead of unpacked in the lambda
# signature, which is a syntax error on Python 3.
movie_genre_userchoice = movie_genre.filter(lambda kv: kv[0] != user_choice)


# Transform the RDD into the form (chosen movie, other movie) as key and
# (chosen movie genre vector, other movie genre vector) as value, then reduce each value
# to the genre similarity of the two vectors.
movie_genre_simi = movie_genre_userchoice.map(
    lambda kv: ((int(user_choice), kv[0]), (user_choice_genre, kv[1]))
).mapValues(genreSimilarity)
    
    
def reshuffle(record):
    """Order a movie-pair key so that the chosen movie comes first.

    Args:
        record: ``((movie_a, movie_b), value)``.

    Returns:
        ``((movie_a, movie_b), value)`` when ``movie_a`` equals the global
        ``user_choice``; otherwise ``((movie_b, movie_a), value)``. The value
        is passed through unchanged.
    """
    # Unpack inside the body: tuple parameters in the signature are a syntax
    # error on Python 3.
    key, value = record
    m1, m2 = key[0], key[1]
    if m1 != user_choice:
        m1, m2 = m2, m1
    return ((m1, m2), value)
        
# Keep only the movie ID pairs that contain the chosen movie ID, then use the reshuffle
# function to reorder each key so that the chosen movie ID comes first, followed by the
# other movie ID. Records are indexed instead of unpacked in the lambda signature, which
# is a syntax error on Python 3.
movie_rating_simi = movie_pair_simi.filter(lambda rec: user_choice in rec[0]).map(reshuffle)


# Join the rating-based similarities with the genre-based similarities on the
# (chosen movie, other movie) key.
result_1 = movie_rating_simi.join(movie_genre_simi)

# Use the netSimilarty function, then re-key to get an RDD that has (net similarity) as key
# and ((chosen movie, other movie), number of ratings) as the value, sorted best first.
net_result = result_1.map(netSimilarty).map(
    lambda kv: (kv[1][0], (kv[0], kv[1][1]))
).sortByKey(ascending=False)

results_final = net_result.take(num_of_reco)

# print(...) with a single argument behaves the same on Python 2 and Python 3.
print('Top {} similar movies for the movie:  {}\n'.format(num_of_reco, details[user_choice][0]))
for val in results_final:
    (score, ((choice, other_movie), num_ratings)) = val
    print(details[other_movie][0] + ' with score: {:.2} from {} ratings'.format(score, num_ratings))

Top 20 similar movies for the movie:  Star Wars (1977)

Return of the Jedi (1983) with score: 0.98 from 480 ratings
Empire Strikes Back, The (1980) with score: 0.92 from 345 ratings
African Queen, The (1951) with score: 0.86 from 138 ratings
Starship Troopers (1997) with score: 0.85 from 138 ratings
Independence Day (ID4) (1996) with score: 0.84 from 362 ratings
Star Trek: First Contact (1996) with score: 0.84 from 316 ratings
Jurassic Park (1993) with score: 0.82 from 242 ratings
Star Trek: The Wrath of Khan (1982) with score: 0.82 from 230 ratings
Star Trek IV: The Voyage Home (1986) with score: 0.8 from 184 ratings
Star Trek III: The Search for Spock (1984) with score: 0.8 from 162 ratings
Star Trek VI: The Undiscovered Country (1991) with score: 0.79 from 151 ratings
Star Trek: Generations (1994) with score: 0.79 from 109 ratings
Last of the Mohicans, The (1992) with score: 0.79 from 118 ratings
Stargate (1994) with score: 0.78 from 121 ratings
Star Trek: The Motion Picture (1979) 

In [None]:
# Much better list of suggestions than the previous versions. We see the inclusion of a lot of science fiction movies like
# Star Trek, Aliens, and Starship Troopers. The inclusion of genre information helped.