# 2020 Massive Data Analysis Term Project -- Recommendation System

109062623 林鎰鋒 Group51

In [1]:
from pyspark import SparkConf, SparkContext
from operator import itemgetter
from math import sqrt
import numpy as np

In [2]:
# Spark configuration: run locally with one worker thread per available core.
# Parenthesized chaining replaces the original trailing backslash, which left a
# dangling line continuation that would glue itself to any line added next.
conf = (
    SparkConf()
    .setAppName("RecommendationSystem")
    .setMaster("local[*]")
)

sc = SparkContext(conf=conf)

## Process Data
Read and process the dataset 

`
610 users, 9742 movies, ~100k rating records
(userId, movieId, rating, timestamp)
`

From MovieLens: https://grouplens.org/datasets/movielens/

#### movie_as_key (map)
Transform the data as follows

`
(movieId, (userId, rating))
`

In [3]:
def movie_as_key(x):
    """Parse one CSV rating line into a movie-keyed record.

    The line is split on commas; field 0 is the userId, field 1 the movieId
    and field 2 the rating. Any further fields (e.g. the timestamp) are
    ignored.

    Returns:
        ``(movieId, (userId, rating))`` with integer ids and a float rating.
    """
    fields = x.split(",")
    user_field, movie_field, rating_field = fields[0], fields[1], fields[2]
    movie_id = int(movie_field)
    user_id = int(user_field)
    return movie_id, (user_id, float(rating_field))

In [4]:
raw_data = sc.textFile("ratings_minimal.csv")
# The first line of the CSV is the column header; keep it so it can be skipped.
header = raw_data.first()

# (movieId, (userId, rating)) records, one per rating line.
data = (
    raw_data
    .filter(lambda row: row != header)
    .map(movie_as_key)
)

## Item-item Collaborative Filtering
Output Result: https://www.dropbox.com/s/ao53hl6rfkbwl4p/similarity.out?dl=0

### Subtract Mean & Norm
Calculate the mean-subtracted ratings and the norm for each movie item

#### total_sum
Get the sum of user rating for each movie

#### total_count
Get the number of user rating for each movie

#### mean
Calculate the mean user rating of each movie

#### subtract_mean (map)
Subtract the movie's mean from each of its ratings, and transform the data as follows

`
(movie_k, (userId, rating - mean of movie_k))
`

#### caculate_norm (map)
Group the mean-subtracted ratings by movie, calculate each movie's norm, and transform the data as follows

`
(movie_k, (norm_k, [(user_1, rating_1k), (user_2, rating_2k), ... , (user_i, rating_ik)]))
rating_ik -> rating of user_i on movie_k (mean-subtracted)
`

#### user_as_key (map)
Transform the data as follows

`
(userId, (movieId, subtract_mean_rating, movie_norm))
`

In [5]:
def subtract_mean(x):
    """Center one rating on its movie's mean rating.

    Args:
        x: ``((user, rating), mean)`` — a rating record joined with the
            mean rating of the same movie.

    Returns:
        ``(user, rating - mean)``.
    """
    user_rating, movie_mean = x
    user_id, raw_rating = user_rating
    return user_id, raw_rating - movie_mean

def caculate_norm(rating_list):
    """Compute the Euclidean norm of a movie's centered ratings.

    Args:
        rating_list: iterable of ``(user, centered_rating)`` pairs. It is
            materialized into a list first so it can be both measured and
            returned. Iterating a one-shot iterator twice would otherwise hand
            back an exhausted iterator.

    Returns:
        ``(norm, ratings)`` where ``norm`` is the square root of the sum of
        squared centered ratings (0.0 for no ratings) and ``ratings`` is the
        list of input pairs in their original order.
    """
    ratings = list(rating_list)
    norm = sqrt(sum(pair[1] ** 2 for pair in ratings))
    return (norm, ratings)

def user_as_key(x):
    """Re-key one centered rating from movie to user.

    Args:
        x: ``((movie, norm), (user, rating))``.

    Returns:
        ``(user, (movie, rating, norm))``.
    """
    movie_part, user_part = x
    movie, norm = movie_part
    user, rating = user_part
    return user, (movie, rating, norm)

In [6]:
# Per-movie sum of ratings: (movieId, sum of ratings).
total_sum = (
    data
    .map(lambda record: (record[0], record[1][1]))
    .reduceByKey(lambda a, b: a + b)
)

# Per-movie number of ratings: (movieId, count).
# Counting with reduceByKey merges partial counts on each partition before the
# shuffle. The previous groupByKey().mapValues(len) shipped every rating value
# across the shuffle only to take its length.
total_count = (
    data
    .map(lambda record: (record[0], 1))
    .reduceByKey(lambda a, b: a + b)
)

# Per-movie mean rating: (movieId, sum / count).
mean = (
    total_sum.join(total_count)
    .mapValues(lambda sum_count: sum_count[0] / sum_count[1])
)

In [7]:
# Per-user view of every mean-centered rating:
#   (userId, (movieId, rating - movie mean, movie norm))
item = (
    data.join(mean)                                   # (movie, ((user, rating), mean))
    .mapValues(subtract_mean)                         # (movie, (user, centered))
    .groupByKey()                                     # (movie, [(user, centered), ...])
    .mapValues(caculate_norm)                         # (movie, (norm, [(user, centered), ...]))
    .map(lambda kv: ((kv[0], kv[1][0]), kv[1][1]))    # ((movie, norm), [(user, centered), ...])
    .flatMapValues(lambda ratings: ratings)           # ((movie, norm), (user, centered))
    .map(user_as_key)                                 # (user, (movie, centered, norm))
)

### Cosine Similarity
Calculate the cosine similarity for each pair of movie items

#### filter_duplicates
Self-join the items on userId to get every pair of movies rated by the same user, then use filter_duplicates() to keep only pairs with movie1 < movie2 (this drops self-pairs and mirrored duplicates)

`
((movie1, rating1, norm1), (movie2, rating2, norm2))
`

#### make_pairs (map)
Transform the data as follows

`
((movie1, rating1, norm1), (movie2, rating2, norm2)) -> ((movie1, movie2), ((rating1, norm1), (rating2, norm2)))
`

#### caculate_dot_product (map)
After groupByKey() we get the structure below. We then calculate the dot product of each movie pair over the ratings given by users who rated both movies

`
((movie1, movie2), [((rating_11, norm1), (rating_21, norm2)), ((rating_12, norm1), (rating_22, norm2)), ...])
rating_ij -> (movie_i, user_j)
`
#### caculate_cosine_similarity (map)
At this point, for each movie pair we have both the dot product and each movie's own norm.

Then, we calculate the cosine similarity for each movie pair.

Finally, we get data as follows.

`
((movie_i, movie_j), similarity_ij)
`

Note: if either movie's norm is 0, the cosine similarity is set to zero.

In [8]:
def filter_duplicates(user_ratings):
    """Keep each unordered movie pair exactly once.

    Args:
        user_ratings: ``(user, ((movie1, rating1, norm1), (movie2, rating2, norm2)))``
            as produced by the self-join of the per-user item records.

    Returns:
        True only when ``movie1 < movie2``. This drops self-pairs and the
        mirrored copy of every pair.
    """
    pair = user_ratings[1]
    movie1, _, _ = pair[0]
    movie2, _, _ = pair[1]
    return movie1 < movie2

def make_pairs(user_ratings):
    """Re-key a co-rated pair of movies by their movie ids.

    Args:
        user_ratings: ``(user, ((movie1, rating1, norm1), (movie2, rating2, norm2)))``.

    Returns:
        ``((movie1, movie2), ((rating1, norm1), (rating2, norm2)))``. The user
        id is dropped.
    """
    left, right = user_ratings[1][0], user_ratings[1][1]
    movie1, rating1, norm1 = left
    movie2, rating2, norm2 = right
    return (movie1, movie2), ((rating1, norm1), (rating2, norm2))

def caculate_dot_product(rating_pairs):
    """Dot product of two movies' centered ratings over their common users.

    Args:
        rating_pairs: iterable of ``((rating1, norm1), (rating2, norm2))``,
            one element per user who rated both movies. ``norm1``/``norm2``
            are the same in every element because they are per-movie values.

    Returns:
        ``(dot_product, norm1, norm2)``. For an empty iterable the norms
        default to 0.0, so the cosine step maps the pair to 0.0 instead of
        crashing with UnboundLocalError.
    """
    dot_product = 0
    # Defaults for the empty case; otherwise overwritten by the loop below.
    norm1 = norm2 = 0.0
    for (rating1, norm1), (rating2, norm2) in rating_pairs:
        dot_product += rating1 * rating2
    return (dot_product, norm1, norm2)
    
def caculate_cosine_similarity(x):
    """Convert ``(dot_product, norm1, norm2)`` into a cosine similarity.

    Returns:
        ``dot_product / (norm1 * norm2)``, or 0.0 when either norm is zero
        (the similarity is undefined for a zero vector).
    """
    dot_product, norm1, norm2 = x
    if norm1 == 0 or norm2 == 0:
        return 0.0
    return dot_product / (norm1 * norm2)

In [9]:
# Cosine similarity of every co-rated movie pair, highest first:
#   ((movie1, movie2), similarity) with movie1 < movie2
similarity = (
    item.join(item)                                  # (user, (rec1, rec2)) for movies the user rated
    .filter(filter_duplicates)                       # keep movie1 < movie2 only
    .map(make_pairs)                                 # ((movie1, movie2), ((r1, n1), (r2, n2)))
    .groupByKey()
    .mapValues(caculate_dot_product)                 # ((movie1, movie2), (dot, n1, n2))
    .mapValues(caculate_cosine_similarity)           # ((movie1, movie2), similarity)
    .sortBy(lambda pair_score: pair_score[1], ascending=False)
)

In [10]:
# Bring every ((movie1, movie2), similarity) row back to the driver for saving.
result = similarity.collect()

In [11]:
# Each row is ((movie1, movie2), similarity), which is a ragged nested sequence.
# Building the array with an explicit dtype=object gives an (N, 2) object array
# that is written as "(movie1, movie2),similarity" per line. Without it,
# NumPy >= 1.24 raises ValueError on ragged input (older versions only warned).
np.savetxt("similarity.out", np.array(result, dtype=object), delimiter=",", fmt='%s')

  return array(a, dtype, copy=False, order=order)


## Rating Predictions
Use the (at most) 10 most similar movies to predict each user's rating of the movies they have not rated

Output Result: https://www.dropbox.com/s/wtcogl0lv1fxx7n/predict.out?dl=0

#### movie_all, user_predict, user_rating
Used to create (user, movie) pair which need to predict
`
user_predict -> (user, movie)
user_rating  -> (user, [(movie, rating), ...])
`

#### item_pair = user_predict.join(user_rating)
Join each (user, movie) pair to be predicted with the other movies that user has rated, and transform the data as follows

`
item_pair -> ((movie_i, movie_j), (movie_i, user_k, rating_jk))
`

#### item_pair.join(similarity)
Join the item pair with their similarity
`
((movie_i, movie_j), (movie_i, user_k, rating_jk)) join ((movie_i, movie_j), similarity_ij) ->
((movie_i, movie_j), ((movie_i, user_k, rating_jk), similarity_ij))
`

#### rating_predictions_top_10 (map)
Predict the rating as the similarity-weighted average of the user's ratings of the 10 most similar movies

Neighbors whose similarity is less than or equal to zero are dropped

In [12]:
def rating_predictions_top_10(x, k=10):
    """Predict a rating from the user's ratings of the most similar movies.

    Args:
        x: iterable of ``(rating, similarity)`` pairs, one per movie the user
            has rated. ``similarity`` is that movie's similarity to the movie
            being predicted. The input is not modified.
        k: maximum number of neighbours to use. The default of 10 keeps the
            original top-10 behaviour.

    Returns:
        The similarity-weighted average of the ratings of the (at most) ``k``
        most similar neighbours whose similarity is positive, or 0 when there
        is no such neighbour.

    Raises:
        ValueError: if ``k`` is negative.
    """
    if k < 0:
        raise ValueError("k must be non-negative")

    # Highest similarity first. sorted() is stable, so ties keep their input
    # order, as the original in-place sort did.
    ranked = sorted(x, key=itemgetter(1), reverse=True)

    numerator = denominator = 0
    for rating, sim in ranked[:k]:
        # Sorted descending: once a similarity is non-positive, all the
        # remaining ones are too.
        if sim <= 0:
            break
        numerator += sim * rating
        denominator += sim

    if denominator == 0:
        return 0
    return numerator / denominator

In [13]:
# Every distinct userId, collected on the driver and broadcast to executors.
# distinct() replaces the previous groupByKey().keys(), which shuffled every
# rating value just to deduplicate the keys.
user = sc.broadcast(data.map(lambda x: x[1][0]).distinct().collect())

# Observed (movieId, userId) pairs, one per existing rating. The former
# groupByKey().mapValues(list).flatMapValues(...) round trip produced the same
# pairs at the cost of a shuffle.
movie = data.map(lambda x: (x[0], x[1][0]))

# Every candidate (movieId, userId) pair: each rated movie crossed with all users.
movie_all = (
    data.keys()
    .distinct()
    .map(lambda movie_id: (movie_id, user.value))
    .flatMapValues(lambda users: users)
)

In [14]:
# (userId, movieId) pairs with no existing rating, i.e. the pairs to predict.
# Parenthesized chaining replaces the original trailing backslash, which left a
# dangling line continuation.
user_predict = (
    movie_all.subtract(movie)
    .map(lambda x: (x[1], x[0]))
)

# Each user's observed ratings: (userId, iterable of (movieId, rating)).
user_rating = (
    data.map(lambda x: (x[1][0], (x[0], x[1][1])))
    .groupByKey()
)

In [15]:
def _key_by_sorted_movie_pair(record):
    """Re-key one candidate neighbour by its ordered movie pair.

    Args:
        record: ``((user, target_movie), (neighbour_movie, rating))``, where
            ``target_movie`` is the movie to predict for ``user`` and
            ``neighbour_movie`` is a movie that user rated with ``rating``.

    Returns:
        ``((smaller_movie, larger_movie), (target_movie, user, rating))``.
    """
    (user, target_movie), (neighbour_movie, rating) = record
    # `similarity` only stores keys (movie1, movie2) with movie1 < movie2,
    # because filter_duplicates keeps just that orientation. Keying by the raw
    # (target, neighbour) order would make the later join silently drop every
    # neighbour whose movieId is smaller than the target's.
    if target_movie < neighbour_movie:
        pair = (target_movie, neighbour_movie)
    else:
        pair = (neighbour_movie, target_movie)
    return (pair, (target_movie, user, rating))


# Candidate neighbours for every (user, movie) to predict:
#   ((smaller movieId, larger movieId), (movie to predict, userId, neighbour rating))
item_pair = (
    user_predict.join(user_rating)                 # (user, (target, [(movie, rating), ...]))
    .map(lambda x: ((x[0], x[1][0]), x[1][1]))     # ((user, target), [(movie, rating), ...])
    .flatMapValues(lambda ratings: ratings)        # ((user, target), (movie, rating))
    .map(_key_by_sorted_movie_pair)
)

In [16]:
# Predicted ratings, highest first: ((userId, movieId), predicted_rating).
prediction = (
    item_pair.join(similarity)
    # (pair, ((target, user, rating), sim)) -> ((user, target), (rating, sim))
    .map(lambda rec: ((rec[1][0][1], rec[1][0][0]), (rec[1][0][2], rec[1][1])))
    .groupByKey()
    .mapValues(rating_predictions_top_10)
    # Keep only positive predictions; 0 means no positively-similar neighbour.
    .filter(lambda rec: rec[1] > 0)
    .sortBy(lambda rec: rec[1], ascending=False)
)

In [17]:
# Bring every ((userId, movieId), predicted_rating) row back to the driver for saving.
result = prediction.collect()

In [18]:
# Each row is ((userId, movieId), predicted_rating), which is a ragged nested
# sequence. An explicit dtype=object builds an (N, 2) object array that is
# written as "(userId, movieId),rating" per line. Without it, NumPy >= 1.24
# raises ValueError on ragged input (older versions only warned).
np.savetxt("predict.out", np.array(result, dtype=object), delimiter=",", fmt='%s')

In [19]:
# Shut down the SparkContext created at the top of the script.
sc.stop()