In [0]:
import os

import pymongo
import pyspark
from pyspark.sql import SparkSession

# Connection settings.
# NOTE(security): the fallback URIs below embed the database credentials in the
# notebook. Set the MONGODB_URI environment variable to override them; once every
# environment does so, delete the hard-coded fallbacks and rotate the password.
MONGODB_URI = os.environ.get('MONGODB_URI')

# PyMongo client, used for direct access to the collections
if MONGODB_URI:
    client = pymongo.MongoClient(MONGODB_URI)
else:
    client = pymongo.MongoClient('mongodb://admin:nUOSRJ3kasFEspIb@sg-mycluster-44605.servers.mongodirector.com', 27017)
db = client['movielens']

# Spark session configured with the MongoDB Spark connector (input and output share one URI)
connectionString = MONGODB_URI or "mongodb://admin:nUOSRJ3kasFEspIb@sg-mycluster-44605.servers.mongodirector.com:27017/admin?authSource=admin&readPreference=primary&appname=MongoDB%20Compass&ssl=false"
spark = (
    SparkSession.builder
    .config('spark.mongodb.input.uri', connectionString)
    .config('spark.mongodb.output.uri', connectionString)
    .config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.11:2.3.1')
    .getOrCreate()
)

# Handles to the MongoDB collections used by the notebook
ratings_collection = db['ratings']
movies_collection = db['movies']
user_preferences_collection = db['userPreferences']
user_movies_recommendation_collection = db['userMoviesRecommendation']

# Load the movies from the "moviesTags" collection (without Mongo's _id) and expose them as an RDD
pipe_movies = "{'$project': {'_id': 0}}"
moviesDF = (
    spark.read.format("com.mongodb.spark.sql.DefaultSource")
    .option("uri", connectionString)
    .option("database", "movielens")
    .option("collection", "moviesTags")
    .option("pipeline", pipe_movies)
    .load()
    .select('movieId', 'title', 'genres', 'tags')
)
# Plain tuples (movieId, title, genres, tags), cached because the RDD is reused by later cells
moviesRDD = moviesDF.rdd.map(lambda row: (row[0], row[1], row[2], row[3])).cache()

In [0]:
import os
import json
from pyspark.mllib.recommendation import MatrixFactorizationModel

# Load the previously trained and saved ALS model
def load_model_ALS():
    """Return the MatrixFactorizationModel stored under /models/movie_lens.

    Uses the notebook-provided SparkContext `sc`.
    """
    return MatrixFactorizationModel.load(sc, os.path.join('/', 'models', 'movie_lens'))

# Predict a rating for every movie the user has not rated yet
def predict_ratings_for_unrated_movies(db, userId, complete_model):
    """Predict ratings for all movies in `moviesRDD` that `userId` has not rated.

    Args:
        db: database handle exposing a `ratings` collection.
        userId: user id, in the same representation used by `ratings.userId`.
        complete_model: model exposing `predictAll(rdd_of_(user, product))`.

    Returns:
        Whatever `complete_model.predictAll` returns for the (userId, movieId)
        pairs of the unrated movies.
    """
    # Ids of the movies already rated by the user. A frozenset gives constant-time
    # membership tests inside the filter instead of a linear scan of a list for
    # every movie (assumes movie ids are hashable, e.g. str or int).
    rated_movie_ids = frozenset(db.ratings.distinct('movieId', {'userId': userId}))

    # Keep only the unrated movies and pair each one with the user id
    user_unrated_rdd = (
        moviesRDD
        .filter(lambda movie: movie[0] not in rated_movie_ids)
        .map(lambda movie: (userId, movie[0]))
    )
    return complete_model.predictAll(user_unrated_rdd)

# Count the ratings of every movie
def computeMoviesRatingCount(db):
    """Return an RDD with the number of ratings per movie.

    The counts are aggregated in MongoDB (grouped by the lower-cased string form
    of `movieId`), dumped as JSON to /data/data.json through `dbutils.fs`, and
    read back with Spark. Each RDD element is the first two columns of the
    loaded frame (presumably `_id` and `count` - verify the inferred column order).
    """
    count_pipeline = [
        # Skip documents whose movieId is an empty array, then flatten movieId
        {'$match': {'movieId': {'$not': {'$size': 0}}}},
        {'$unwind': "$movieId"},
        # One document per movie id with the number of ratings
        {'$group': {'_id': {'$toLower': '$movieId'}, 'count': {'$sum': 1}}},
    ]
    counts_json = json.dumps(list(db.ratings.aggregate(count_pipeline)))

    # Remove the previous export before writing the new one
    export_path = "/data/data.json"
    dbutils.fs.rm(export_path)
    dbutils.fs.put(export_path, counts_json)

    counts_frame = spark.read.json(export_path, multiLine=True).cache()
    return counts_frame.rdd.map(lambda row: (row[0], row[1]))

# Replace each movie id of a genre with the rating the user gave to that movie
def substitute_movieID_withRating(genre, ratings_by_user):
    """Replace the movie ids stored in `genre['rating']` with the user's ratings.

    Args:
        genre: dict whose 'rating' entry is a list of movie ids.
        ratings_by_user: iterable of rating dicts with 'movieId' and 'rating' keys.

    Returns:
        The same `genre` dict, updated in place: 'rating' now holds, as floats,
        the ratings of the listed movies (movies without a matching rating are
        dropped; the movie order is preserved).
    """
    # The original assigned to and returned an undefined name (`result_per_genre`),
    # which raised NameError on every call; the dict to update is `genre` itself.
    genre['rating'] = [
        float(rating['rating'])
        for movie in genre['rating']
        for rating in ratings_by_user
        if rating['movieId'] == movie
    ]
    return genre

# Compute the user's average rating for every genre they have rated
def average_rating_for_genreViewByUser(db, ID):
    """Build the genre-preference document of user `ID`.

    Returns a dict {'userId': str(ID), 'genre': [...]} where each list entry is a
    dict with the genre name under 'genre' and, under 'rating', the mean of the
    user's ratings for the movies of that genre, rounded to two decimals.
    """
    # Every rating document of the user, and the distinct ids of the rated movies
    user_ratings = list(db.ratings.find({'userId': ID}))
    rated_movie_ids = list(db.ratings.distinct('movieId', {'userId': ID}))

    # One document per genre seen by the user, carrying the ids of the rated movies in it
    genres_pipeline = [
        {"$match": {"movieId": {"$in": rated_movie_ids}}},
        {"$unwind": "$genres"},
        {"$group": {"_id": "$genres", "rating": {"$addToSet": "$movieId"}}},
        {"$addFields": {"genre": "$_id"}},
        {"$project": {"_id": 0}},
    ]
    genre_docs = list(db.movies.aggregate(genres_pipeline, cursor={}))

    averaged_genres = []
    for genre_doc in genre_docs:
        # Swap the movie ids for the user's ratings, then collapse them into their mean
        genre_doc = substitute_movieID_withRating(genre_doc, user_ratings)
        genre_ratings = genre_doc['rating']
        genre_doc['rating'] = round((sum(genre_ratings) / len(genre_ratings)), 2)
        averaged_genres.append(genre_doc)

    return {'userId': str(ID), 'genre': averaged_genres}

# Return True when the two lists share at least one element
def anyElem_List1_in_List2(list1, list2):
    """Return True if any item of `list2` is contained in `list1`, else False."""
    for candidate in list2:
        if candidate in list1:
            return True
    return False

def top5_user_genre_preferences(userId,db):
    """Return the names of the (up to) five genres the user rated highest.

    Reads the first `userPreferences` document matching `userId` and orders its
    'genre' entries by descending 'rating'.
    """
    matching_docs = list(db.userPreferences.find({'userId': userId}))
    genre_entries = matching_docs[0]['genre']
    ranked_entries = sorted(genre_entries, key=lambda entry: entry['rating'], reverse=True)
    return [entry['genre'] for entry in ranked_entries[:5]]

In [0]:
################################ BUILD THE USER GENRE PREFERENCES COLLECTION ##########################################
# All user ids, converted to int so they sort numerically
all_users = sorted(int(user_id) for user_id in ratings_collection.distinct('userId'))

# Genre preferences of the first 100 users (ids are converted back to str for the queries)
preferencesList = [average_rating_for_genreViewByUser(db, str(user)) for user in all_users[:100]]
user_preferences_collection.insert_many(preferencesList)

In [0]:
################################ BUILD THE PER-USER TOP 10 MOVIE RECOMMENDATIONS COLLECTION ##########################################

# Tunable parameters of the recommendation run
N_USERS = 100             # how many users (lowest ids first) get recommendations
MIN_RATINGS_COUNT = 30    # a movie needs at least this many ratings to be recommended
TOP_N_MOVIES = 10         # number of recommendations stored per user

# Load the already-trained ALS model
complete_model = load_model_ALS()

# Number of ratings per movie
moviesRatingsCountsRDD = computeMoviesRatingCount(db)

# Loop-invariant sides of the joins, keyed by the movie id as a string.
# They are built once here instead of once per user.
movieInfoByIdRDD = moviesRDD.map(lambda x: (str(x[0]), [x[1], x[2]]))
ratingsCountByIdRDD = moviesRatingsCountsRDD.map(lambda x: (str(x[0]), x[1]))

# Field names of a stored recommendation, in the order of the tuples produced below
keys = ["movieId", "title", "rating", "ratings_count"]


def _flatten_recommendation(joined):
    """Flatten a doubly-joined record into (movieId, title, predicted rating, ratings count, genres)."""
    movie_id, ((predicted_rating, (title, genres)), ratings_count) = joined
    return (movie_id, title, predicted_rating, ratings_count, genres)


# Compute the recommendations for the first N_USERS users
for user in all_users[:N_USERS]:

    userId = str(user)

    # Predicted rating for every movie the user has not rated yet
    myuserRecommendationsPredictionsRDD = predict_ratings_for_unrated_movies(db, userId, complete_model)

    # Join the predicted rating with the movie title/genres and with the ratings count
    myuserRecommendationsRDD = (
        myuserRecommendationsPredictionsRDD
        .map(lambda x: (str(x.product), x.rating))
        .join(movieInfoByIdRDD)
        .join(ratingsCountByIdRDD)
        .map(_flatten_recommendation)
    )

    # The user's five favourite genres
    genres_user_preferences = top5_user_genre_preferences(userId, db)

    # Keep movies with enough ratings that belong to at least one favourite genre,
    # then take the TOP_N_MOVIES with the highest predicted rating.
    # takeOrdered is an action: it returns a plain Python list, not an RDD.
    topRecommendations = (
        myuserRecommendationsRDD
        .filter(lambda x: x[3] >= MIN_RATINGS_COUNT)
        .filter(lambda x: anyElem_List1_in_List2(x[4], genres_user_preferences))
        .map(lambda x: (x[0], x[1], x[2], x[3]))
        .takeOrdered(TOP_N_MOVIES, key=lambda x: -x[2])
    )

    # Convert the tuples to dicts and persist the user's recommendations
    userRecommendations = {
        'user': userId,
        'movies': [dict(zip(keys, recommendation)) for recommendation in topRecommendations],
    }
    user_movies_recommendation_collection.insert_one(userRecommendations)

 

In [0]:
##################################### COLLECTION OF THE RATINGS OF THE FIRST 100 USERS  ###############################

# Cast userId to an integer so it can be compared numerically
cast_user_id_stage = {"$addFields": {"userId": {"$toInt": "$userId"}}}
# Keep the ratings whose userId is below 101
# (matches "the first 100 users" only if user ids start at 1 and are contiguous - TODO confirm)
first_users_stage = {"$match": {"userId": {"$lt": 101}}}
# Write the result to the "userRatings" collection
output_stage = {"$out": "userRatings"}

pipeline = [cast_user_id_stage, first_users_stage, output_stage]
db.ratings.aggregate(pipeline, cursor={})