# Genre-based collaborative filtering

This experiment with item-based collaborative filtering, inspired by the improved method proposed by S. Choi, S. Ko and Y. Han in *A Movie Recommendation Algorithm Based on Genre Correlations* (2012), focuses solely on *genre correlations* among movies (**GBCF**), so that the similarity between movies is not influenced by volatile sources such as user ratings. Since genres are assumed to be assigned to movies by domain experts, the goal is to determine whether they make the similarity model more predictive than the previous approaches.

In [1]:
import findspark

findspark.init()
findspark.find()

'D:\\ComputerScience\\Anaconda3\\pkgs\\pyspark-2.4.5-py_0\\site-packages\\pyspark'

In [2]:
from pyspark.sql import SparkSession

ss = SparkSession.builder.appName('IBCF with genre analysis') \
            .config('spark.master', 'local[*]').getOrCreate()
sc = ss.sparkContext

## Data analysis

### Training/test RDDs schema:

`userid, movieid, rating, timestamp, genres`

In [3]:
rdd_training = sc.parallelize([(1, 1, 5.0, 964982600, ['Comedy', 'Drama', 'Romantic']),
                               (1, 2, 2.0, 964982730, ['Action', 'Drama']),
                               (2, 1, 4.0, 964982503, ['Comedy', 'Drama', 'Romantic'])])
rdd_test = sc.parallelize([(2, 2, 4.0, 964985830, ['Action', 'Drama'])])

### Average ratings RDD schema:    

`movieid, avgrating, genres`

In [4]:
rdd_avg_rating = sc.parallelize([(1, 4.5, ['Comedy', 'Drama', 'Romantic']),
                                 (2, 2.0, ['Action', 'Drama'])])
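In the toy data, the values in *rdd_avg_rating* coincide with the per-movie mean of the training ratings (movie 1: (5.0 + 4.0) / 2 = 4.5; movie 2: 2.0). Assuming that this is how the average ratings are meant to be derived, the aggregation can be sketched in plain Python as follows. `average_ratings` is a hypothetical helper, not part of the original notebook; in Spark the same result would typically come from a `mapValues`/`reduceByKey` pair keyed on `movieid`.

```python
from collections import defaultdict


def average_ratings(training_rows):
    """Hypothetical helper: (userid, movieid, rating, timestamp, genres) rows
    -> list of (movieid, avgrating, genres) tuples sorted by movieid."""
    sums = defaultdict(float)   # movieid => sum of its ratings
    counts = defaultdict(int)   # movieid => number of ratings
    genres = {}                 # movieid => genre list

    for _, movie_id, rating, _, movie_genres in training_rows:
        sums[movie_id] += rating
        counts[movie_id] += 1
        genres[movie_id] = movie_genres

    return [(m, sums[m] / counts[m], genres[m]) for m in sorted(sums)]


training = [(1, 1, 5.0, 964982600, ['Comedy', 'Drama', 'Romantic']),
            (1, 2, 2.0, 964982730, ['Action', 'Drama']),
            (2, 1, 4.0, 964982503, ['Comedy', 'Drama', 'Romantic'])]

print(average_ratings(training))
# [(1, 4.5, ['Comedy', 'Drama', 'Romantic']), (2, 2.0, ['Action', 'Drama'])]
```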

### User favourite genres RDD schema:

`userid, favgenres`

In [5]:
rdd_fav_genres = sc.parallelize([(1, ['Action', 'Comedy']), (2, ['Drama'])])

## Genre correlation matrix

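Map *rdd_training* to key-value pairs in the form `movieid => genres`, join them with *rdd_avg_rating* to obtain pairs in the form `movieid => (genres, avgrating)`, and remove the duplicates produced by movies rated by several users.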

In [6]:
rdd_movies = rdd_training.map(lambda x: (x[1], tuple(x[4]))) \
                    .join(rdd_avg_rating.map(lambda x: (x[0], x[1]))) \
                    .distinct().cache()
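To make the shape of *rdd_movies* explicit, the map/join/distinct chain can be mirrored in plain Python on the same toy rows. This is only an illustration: a dict stands in for the join and a set for `distinct()`. It also shows why the genre lists are converted to tuples: `distinct()` needs hashable records, and lists are not hashable.

```python
training = [(1, 1, 5.0, 964982600, ['Comedy', 'Drama', 'Romantic']),
            (1, 2, 2.0, 964982730, ['Action', 'Drama']),
            (2, 1, 4.0, 964982503, ['Comedy', 'Drama', 'Romantic'])]
avg_rating = [(1, 4.5, ['Comedy', 'Drama', 'Romantic']),
              (2, 2.0, ['Action', 'Drama'])]

# movieid => avgrating (right side of the join)
avg_by_movie = {movie_id: avg for movie_id, avg, _ in avg_rating}

# movieid => (genres, avgrating); the set plays the role of distinct(),
# since the same movie appears once per user who rated it.
# The membership test mimics the inner join semantics.
movies = sorted({(row[1], (tuple(row[4]), avg_by_movie[row[1]]))
                 for row in training
                 if row[1] in avg_by_movie})

print(movies)
# [(1, (('Comedy', 'Drama', 'Romantic'), 4.5)), (2, (('Action', 'Drama'), 2.0))]
```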

### Local implementation

In [7]:
from collections import defaultdict
from functools import reduce

import numpy as np


class GCF():
    """Genre-based collaborative filtering"""

    def __init__(self, movies_data):
        self.__create_corr_matrix(movies_data)

    ## Private methods ##
    def __create_corr_matrix(self, movies_data):
        # Maps each "criterion" genre to the relative frequency with which
        # every genre listed after it in a movie's genre tuple co-occurs with it
        self.__corr_matrix = {}
        genres_tuples = []

        for movie_data in movies_data:
            genres = movie_data[1][0]

            genres_tuples.append(genres)

            # Pair each genre with every genre that follows it in the same movie
            while len(genres) > 1:
                criterion_genre = genres[0]

                if criterion_genre not in self.__corr_matrix:
                    self.__corr_matrix[criterion_genre] = defaultdict(int)

                genres = genres[1:]

                for g in genres:
                    self.__corr_matrix[criterion_genre]['Total_cnt'] += 1
                    self.__corr_matrix[criterion_genre][g] += 1

        # Normalise the co-occurrence counts of each row into frequencies
        for v in self.__corr_matrix.values():
            total_cnt = v.pop('Total_cnt')

            for g in v.keys():
                v[g] /= total_cnt

        self.__filter_unique_genres(genres_tuples)

    def __filter_unique_genres(self, genres_tuples):
        # Concatenate all genre tuples and keep the distinct genre names
        self.__unique_genres = np.unique(np.array(reduce(
            lambda a, b: a + b, genres_tuples)))
        
    def __compute_movie_recommendation_points(self, user_fav_genres,
                        movie_genres, avg_rating):
        # Correlations between the user's favourite genres and the movie's
        # genres, averaged over the favourite genres and weighted by the
        # movie's average rating
        points_sum = 0.0

        for user_genre in user_fav_genres:
            # A genre shared by user and movie contributes 1.0 on its own,
            # so it is excluded from the normalisation of the other coefficients
            norm_factor = len(movie_genres)
            if user_genre in movie_genres:
                norm_factor -= 1

            for movie_genre in movie_genres:
                points_sum += self.get_corr_coeff(user_genre, movie_genre,
                                            norm_factor)

        return round(points_sum * avg_rating \
                    / len(user_fav_genres), 6)

    ## Public methods ##
    def get_corr_coeff(self, genre_A, genre_B, norm_factor=1):
        if self.__corr_matrix is None \
                or len(self.__corr_matrix) == 0:
            raise ValueError('Correlation matrix not created yet')

        if genre_A not in self.__unique_genres:
            raise ValueError("Genre doesn't exist: %s" % genre_A)
        elif genre_B not in self.__unique_genres:
            raise ValueError("Genre doesn't exist: %s" % genre_B)

        if genre_A == genre_B:
            return 1.0
        elif genre_A not in self.__corr_matrix:
            return 0.0
        elif genre_B not in self.__corr_matrix[genre_A]:
            return 0.0
        else:
            return self.__corr_matrix[genre_A][genre_B] \
                        * (1. / norm_factor)
        
    def make_recommendations(self, users_data,
                    unseen_movies_data, n_movies=10):
        movies_recommendations = {}

        for user_id, fav_genres in users_data:
            # Users without unseen movies get no recommendations
            if user_id not in unseen_movies_data:
                movies_recommendations[user_id] = None
                continue

            movies_ratings = {}

            for movie_id, (movie_genres, avg_rating) in unseen_movies_data[user_id]:
                movies_ratings[movie_id] = self.__compute_movie_recommendation_points(
                    fav_genres, movie_genres, avg_rating)

            # Keep the n_movies highest-scoring movies, best first
            movies_recommendations[user_id] = sorted(movies_ratings.items(),
                                                     key=lambda kv: kv[1],
                                                     reverse=True)[:n_movies]

        return movies_recommendations

In [8]:
movies_data = rdd_movies.collect()
print(movies_data)

[(2, (('Action', 'Drama'), 2.0)), (1, (('Comedy', 'Drama', 'Romantic'), 4.5))]


In [9]:
gcf = GCF(movies_data)
gcf.get_corr_coeff('Comedy', 'Drama')

0.5
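The coefficients returned by `get_corr_coeff` (with the default `norm_factor=1`) can be written out as follows. This is read off the `GCF` code above and is not necessarily the exact formulation of Choi et al. Let $n(a, b)$ be the number of training movies whose genre tuple lists genre $b$ after genre $a$:

$$
\operatorname{corr}(a, b) =
\begin{cases}
1 & \text{if } a = b,\\[4pt]
\dfrac{n(a, b)}{\sum_{c} n(a, c)} & \text{if } n(a, b) > 0,\\[4pt]
0 & \text{otherwise.}
\end{cases}
$$

With the toy data, *Comedy* only appears in `('Comedy', 'Drama', 'Romantic')`, so $n(\text{Comedy}, \text{Drama}) = n(\text{Comedy}, \text{Romantic}) = 1$ and $\operatorname{corr}(\text{Comedy}, \text{Drama}) = 1 / 2 = 0.5$, the value printed above. Because only genres listed *after* $a$ are counted, the matrix is not symmetric: *Drama* never precedes *Comedy*, so `get_corr_coeff('Drama', 'Comedy')` returns `0.0`.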

### Distributed implementation (MapReduce)

Check Scala package *lsc.exam.diTo.Predictor*

## Movies recommendation

### Local implementation

In [10]:
users_data = rdd_fav_genres.collect()
print(users_data)

[(1, ['Action', 'Comedy']), (2, ['Drama'])]


#### Extract unseen movies per user (the test set in a real-world scenario)

Transform *rdd_training* to extract, for each user, the movies they have not yet seen/rated, with a <b>5-step</b> process:

<ol style="list-style-type: upper-alpha;">
  <li>Map <i>rdd_training</i> to key-value pairs in the form <code>userid => [seen_movieids]</code>.</li>
  <li>Compute the cartesian product with the movie ids of <i>rdd_movies</i> and concatenate them per key with <i>reduceByKey</i>, obtaining pairs in the form <code>(userid, [seen_movieids]) => [all_movieids]</code>.</li>
  <li>Compute the set difference <code>[all_movieids] - [seen_movieids]</code> and flatten the result into pairs in the form <code>userid => unseen_movieid</code>.</li>
  <li>Invert the pairs into <code>unseen_movieid => userid</code>, join them with <i>rdd_movies</i> to retrieve the movie information, and re-invert them into pairs in the form <code>userid => [(unseen_movieid, movieinfo)]</code>.</li>
  <li>Apply <i>reduceByKey</i> on <code>userid</code> to concatenate the lists into pairs in the form <code>userid => [(unseen_movieid, movieinfo), ...]</code> and cache the result, which is later collected as a Python dict with <i>collectAsMap()</i>.</li>
</ol>
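The five steps can be mirrored in plain Python on the toy data, which makes the intermediate shapes easy to inspect. Dictionaries and sets stand in for the RDD transformations and the variable names are illustrative only. Note that the Spark version builds `[all_movieids]` through a `cartesian` product, whose size grows with users × movies; the sketch simply reuses one set of movie ids for every user.

```python
from collections import defaultdict

training = [(1, 1, 5.0, 964982600, ['Comedy', 'Drama', 'Romantic']),
            (1, 2, 2.0, 964982730, ['Action', 'Drama']),
            (2, 1, 4.0, 964982503, ['Comedy', 'Drama', 'Romantic'])]
movies = {1: (('Comedy', 'Drama', 'Romantic'), 4.5),   # movieid => (genres, avgrating)
          2: (('Action', 'Drama'), 2.0)}

# A: userid => {seen_movieids}
seen = defaultdict(set)
for user_id, movie_id, *_ in training:
    seen[user_id].add(movie_id)

# B: all movie ids (the same set for every user)
all_movies = set(movies)

# C + D: userid => unseen_movieid, joined with the movie information
# E: grouped by userid; users with nothing left to see get no entry
unseen = defaultdict(list)
for user_id, seen_ids in seen.items():
    for movie_id in sorted(all_movies - seen_ids):
        unseen[user_id].append((movie_id, movies[movie_id]))

print(dict(unseen))
# {2: [(2, (('Action', 'Drama'), 2.0))]}
```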

In [11]:
# A
rdd_unseen_movies = rdd_training.map(lambda x: (x[0], (x[1],))) \
                        .reduceByKey(lambda x, y: x + y)

# B
rdd_unseen_movies = rdd_unseen_movies.cartesian(rdd_movies.map(lambda x: x[0])) \
                        .mapValues(lambda x: (x,)) \
                        .reduceByKey(lambda x, y: x + y)

# C
rdd_unseen_movies = rdd_unseen_movies.map(lambda x: (x[0][0], tuple(set(x[1]) - set(x[0][1])))) \
                        .flatMap(lambda x: [(x[0], i) for i in x[1]])

# D
rdd_unseen_movies = rdd_unseen_movies.map(lambda x: (x[1], x[0])) \
                        .join(rdd_movies) \
                        .map(lambda x: (x[1][0], [(x[0], x[1][1])]))

# E
rdd_unseen_movies = rdd_unseen_movies.reduceByKey(lambda x, y: x + y).cache()

In [12]:
unseen_movies_data = rdd_unseen_movies.collectAsMap()
print(unseen_movies_data)

{2: [(2, (('Action', 'Drama'), 2.0))]}


In [14]:
movies_recommendations = gcf.make_recommendations(users_data,
                                unseen_movies_data, 10)
print(movies_recommendations)

{1: None, 2: [(2, 2.0)]}
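The points assigned by `make_recommendations` (through the private `__compute_movie_recommendation_points`) can be summarised as below; again, this restates the `GCF` code rather than the paper. For a user $u$ with favourite genres $F_u$ and a movie $m$ with genres $G_m$ and average rating $\bar r_m$, with $\operatorname{corr}(f, g)$ the value returned by `get_corr_coeff(f, g)`:

$$
\operatorname{score}(u, m) = \frac{\bar r_m}{|F_u|} \sum_{f \in F_u} \sum_{g \in G_m} t(f, g),
\qquad
t(f, g) =
\begin{cases}
1 & \text{if } f = g,\\[4pt]
\dfrac{\operatorname{corr}(f, g)}{|G_m| - [f \in G_m]} & \text{otherwise,}
\end{cases}
$$

where $[f \in G_m]$ is 1 if $f$ is a genre of $m$ and 0 otherwise, and the result is rounded to 6 decimals. For user 2, $F_u = \{\text{Drama}\}$ and movie 2 has $G_m = \{\text{Action}, \text{Drama}\}$, $\bar r_m = 2.0$: the denominator is $2 - 1 = 1$, $t(\text{Drama}, \text{Action}) = 0$ because *Drama* never precedes *Action* in the training genres, and $t(\text{Drama}, \text{Drama}) = 1$, so $\operatorname{score} = 2.0 / 1 \cdot (0 + 1) = 2.0$, the value printed above. User 1 has already rated both movies, hence `None`.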


### Distributed implementation (MapReduce)

Check Scala package *lsc.exam.diTo.Recommender*

In [45]:
sc.stop()

## Results

Check PowerPoint slides *Genre-based collaborative filtering.pptx*