### Extract and manipulate DataFrames and upload them into MongoDB
#### Data Source: MovieLens-25M
- Movie Meta-data: movies.csv
- User Rating-data: ratings.csv
- Movie Tags-data: tags.csv

#### Output Data Schema:
Information Schema

- Movie Meta-data with Genres: MovieMeta
- User Ratings: UserRating
- User-Edited Movie Tags: MovieTag
- User Information: UserInfo

Statistical Analysis Schema

- Recent Movie Rating Count: RecentMovieRateCnt
- Movie Average Rating: MovieAvgRate
- Total Movie Rating Count: MovieRateCnt
- Top 10 Movies in Each Genre: Top10MovieEachGenres

Recommendation Schema:

- offline: movie-movie similarity matrix: MovieSimOffline
- offline: user-user similarity matrix: UserSimOffline
- offline: recommended-movie matrix for each user: RecMovie2UserOffline
- online: recommended-movie matrix for each user: RecMovie2UserOnline

### !!! Format MongoDB's `insert_many` expects: a list of JSON-like documents
#### [{data1}, {data2}, ...]

In [1]:
import pandas as pd
import numpy as np
from pymongo import MongoClient

### Load MovieLens25M dataset

In [2]:
path = "../Dataset/ml-25m/"

In [3]:
movies_df = pd.read_csv(path+"movies.csv")
ratings_df = pd.read_csv(path+"ratings.csv")
tags_df = pd.read_csv(path+"tags.csv")

In [4]:
movies_df_link = pd.read_json(path+"movieMeta.json")
# movies_df_link.movieId = movies_df_link.movieId.apply(str)
# movies_df_link = movies_df_link.set_index('movieId')

user_df = pd.read_json(path+"userLogin.json")
# user_df.userId = user_df.userId.apply(str)
# user_df = user_df.set_index('userId')

### Connect to MongoDB
#### Important: in MongoDB, a collection is not created until the first document is inserted into it!

In [5]:
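import os

# Read the Atlas connection string from an environment variable instead of
# hardcoding credentials in the notebook (MONGODB_URI is an assumed variable name)
client = MongoClient(os.environ["MONGODB_URI"])
db = client['PandoRec']


# Get handles to all collections (each one is created on its first insert)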
movie_meta = db['MovieMeta']
user_rating = db['UserRating']
movie_tag = db['MovieTag']
user_info = db['UserInfo']

recent_movie_rating_count = db['RecentMovieRateCnt']
movie_avg_rating = db['MovieAvgRate']
total_movie_rating_count = db['MovieRateCnt']
top10_movie_in_each_genres = db['Top10MovieEachGenres']

movie_sim_matrix_offline = db['MovieSimOffline']
user_sim_matrix_offline = db['UserSimOffline']
recommend_movie_matrix_toUser_offline = db['RecMovie2UserOffline']
recommend_movie_matrix_toUser_online = db['RecMovie2UserOnline']

### Workflow: loading data into collections
- Step 1: Load the data into a DataFrame.
- Step 2: Convert the DataFrame into a list of dicts, one per row (the keys of each dict become the fields of the document).
- Step 3: Call `(collection).insert_many(list_of_dicts)`; use `insert_many` to insert multiple documents in one call.
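A minimal sketch of Steps 2 and 3 on a toy DataFrame (the column names and values are made up for illustration). `to_dict('records')` yields one dict per row, which is the list-of-documents shape `insert_many` expects; the insert itself is left as a comment because it needs a live connection.

```python
import pandas as pd

# Toy frame standing in for movies_df_link / user_df (illustrative values only)
df = pd.DataFrame({"movieId": [1, 2], "title": ["Toy Story", "Jumanji"]})

# One dict per row; column names become document field names
docs = df.to_dict("records")
print(docs)

# With a live connection this would be: movie_meta.insert_many(docs)
```

Note that pymongo adds an `_id` field to each dict in place when it inserts them.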

### Part 1. Movie metadata and user login info

In [11]:
# 'records' gives one dict per row (newer pandas rejects the short form 'record')
movies_link_dict = movies_df_link.to_dict('records')
user_info_dict = user_df.to_dict('records')

In [14]:
movie_meta.insert_many(movies_link_dict)
user_info.insert_many(user_info_dict)

<pymongo.results.InsertManyResult at 0x26582024ec8>

### Part 2. Rating history data
#### The free MongoDB tier only offers about 500 MB of storage, so only part of the rating history (the first 10,000 rows) is loaded

In [20]:
ratings_dict = ratings_df[:10000].to_dict('records')

In [22]:
user_rating.insert_many(ratings_dict)

<pymongo.results.InsertManyResult at 0x268eb114988>

In [25]:
movies_rate_df = ratings_df.groupby('movieId')['userId'].size() \
                                       .reset_index(name='count') \
                                       .sort_values(['count'], ascending=False)

# movies_rate_df.movieId = movies_rate_df.movieId.apply(str)
# movies_rate_df = movies_rate_df.set_index('movieId')
movies_rate_dict = movies_rate_df.to_dict('records')

total_movie_rating_count.insert_many(movies_rate_dict)

<pymongo.results.InsertManyResult at 0x268ee52be08>

#### Challenge: when the DataFrame is very large, row-wise operations become very slow.

#### Solutions (note: Spark's parallelism speeds up computation, but it does not by itself solve memory limits):
- Chunk processing (map-reduce style)
- PySpark (my choice)
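The chunk-processing option can be sketched with pandas alone: read the CSV in fixed-size chunks, count ratings per movie within each chunk (map), then add the partial counts together (reduce). This is a minimal sketch; the inline CSV and `chunksize=2` are illustrative stand-ins for `ratings.csv` and a realistic chunk size such as a million rows.

```python
import io
import pandas as pd

# Tiny stand-in for ratings.csv (same columns as MovieLens)
csv_text = """userId,movieId,rating,timestamp
1,296,5.0,1147880044
1,306,3.5,1147868817
2,296,4.0,1147868828
3,296,3.0,1147878820
3,306,2.5,1147868510
"""

partial_counts = []
# Map: count ratings per movie within each chunk (only one chunk is in memory at a time)
for chunk in pd.read_csv(io.StringIO(csv_text), chunksize=2):
    partial_counts.append(chunk.groupby("movieId").size())

# Reduce: sum the per-chunk counts, aligning on movieId
movie_counts = (
    pd.concat(partial_counts)
    .groupby(level=0)
    .sum()
    .sort_values(ascending=False)
)
print(movie_counts)
```

The reduce step only ever holds one small count per movie per chunk, which is what keeps memory bounded.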

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("PandoRec").getOrCreate()

ratings_spark_df = spark.read.csv(
    path+"ratings.csv",
    sep=",",
    header=True,
    quote='"',
    inferSchema=True
)

ratings_spark_df.show()

+------+-------+------+----------+
|userId|movieId|rating| timestamp|
+------+-------+------+----------+
|     1|    296|   5.0|1147880044|
|     1|    306|   3.5|1147868817|
|     1|    307|   5.0|1147868828|
|     1|    665|   5.0|1147878820|
|     1|    899|   3.5|1147868510|
|     1|   1088|   4.0|1147868495|
|     1|   1175|   3.5|1147868826|
|     1|   1217|   3.5|1147878326|
|     1|   1237|   5.0|1147868839|
|     1|   1250|   4.0|1147868414|
|     1|   1260|   3.5|1147877857|
|     1|   1653|   4.0|1147868097|
|     1|   2011|   2.5|1147868079|
|     1|   2012|   2.5|1147868068|
|     1|   2068|   2.5|1147869044|
|     1|   2161|   3.5|1147868609|
|     1|   2351|   4.5|1147877957|
|     1|   2573|   4.0|1147878923|
|     1|   2632|   5.0|1147878248|
|     1|   2692|   5.0|1147869100|
+------+-------+------+----------+
only showing top 20 rows



### Convert Unix timestamps (epoch seconds) into dates

In [10]:
from pyspark.sql import functions as f

# `timestamp` holds Unix epoch seconds; convert it directly to a DateType column
# (from_unixtime uses the Spark session time zone)
ratings_spark_df = ratings_spark_df.withColumn('timestamp',
                                               f.to_date(f.from_unixtime('timestamp')))
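To sanity-check the conversion without Spark, the same epoch-seconds-to-date mapping can be reproduced in pandas. A minimal sketch using the first two timestamps from the `show()` output above; note that pandas interprets `unit='s'` in UTC while Spark's conversion uses the session time zone, so ratings made near midnight can land on different days.

```python
import pandas as pd

# Unix epoch seconds, as stored in the MovieLens `timestamp` column
ts = pd.Series([1147880044, 1147868817])

# Interpret as seconds since 1970-01-01 UTC, then keep only the calendar date
dates = pd.to_datetime(ts, unit="s").dt.date
print(dates.tolist())
```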

#### Recent rating count analysis

In [11]:
recent_ratings_spark_df = ratings_spark_df.filter(ratings_spark_df.timestamp>'2019-06-30')

recent_ratings_cnt_spark_df = recent_ratings_spark_df.groupBy('movieId') \
                                               .count() \
                                               .orderBy('count', ascending=False)

recent_ratings_cnt_df = recent_ratings_cnt_spark_df.toPandas()

In [12]:
# recent_ratings_cnt_df.index = recent_ratings_cnt_df.index.astype(str)
recent_ratings_cnt_dict = recent_ratings_cnt_df.to_dict('records')
recent_movie_rating_count.insert_many(recent_ratings_cnt_dict)

<pymongo.results.InsertManyResult at 0x2b484226b48>

#### Average rating for each movie

In [13]:
avg_ratings_spark_df = ratings_spark_df.groupBy('movieId').avg('rating')

In [14]:
avg_ratings_spark_df = avg_ratings_spark_df.withColumn('avg(rating)', 
                                                       f.round(avg_ratings_spark_df['avg(rating)'], 2))

avg_ratings_df = avg_ratings_spark_df.toPandas()

In [15]:
avg_ratings_df = avg_ratings_df.rename(columns={'avg(rating)': 'avg_rate'})
# avg_ratings_df.index = avg_ratings_df.index.astype(str)
avg_ratings_dict = avg_ratings_df.to_dict('records')
movie_avg_rating.insert_many(avg_ratings_dict)

<pymongo.results.InsertManyResult at 0x2b483837088>

#### Top 10 highest-rated movies in each genre

In [6]:
import re
## extract the release year with a regex (e.g. "Toy Story (1995)" -> "1995")
def extract_year(row):
    get_year = re.findall(r'\(\d{4}\)', row)
    if not get_year:
        return 'unknown'
    return get_year[0][1:-1]

## remove the release year from the title
def simplify_title(row):
    row = re.sub(r'\(\d{4}\)', '', row)  ## drop every "(yyyy)" group
    return row.strip()                   ## and the whitespace it leaves behind

## set a 1 in a column named after each genre; genres a movie lacks become NaN (filled with 0 later)
def one_hot(row):
    list_of_movieTags = row.genres.split('|')
    for tag in list_of_movieTags:
        row[tag] = 1
    return row

In [7]:
movies_df['release_year'] = movies_df.title.apply(extract_year)  # must run before the title is simplified
movies_df['title'] = movies_df.title.apply(simplify_title)
movies_df = movies_df.apply(one_hot, axis=1)
movies_df = movies_df.fillna(0)
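The row-wise `apply` calls above work but are slow on the full movie list. A vectorized alternative, sketched under the assumption that the release year is the `(yyyy)` group at the end of the title (the original helpers match it anywhere): `str.extract` pulls the year, `str.replace` drops it, and `str.get_dummies(sep='|')` builds the same 0/1 genre columns as `one_hot`.

```python
import pandas as pd

# Two rows in the movies.csv format (illustrative sample)
movies = pd.DataFrame({
    "movieId": [1, 2],
    "title": ["Toy Story (1995)", "Jumanji (1995)"],
    "genres": ["Adventure|Animation|Children", "Adventure|Children|Fantasy"],
})

# Year: the "(dddd)" group at the end of the title; missing -> 'unknown'
movies["release_year"] = (
    movies["title"].str.extract(r"\((\d{4})\)\s*$", expand=False).fillna("unknown")
)
# Title without the trailing "(year)" and without leftover whitespace
movies["title"] = movies["title"].str.replace(r"\s*\(\d{4}\)\s*$", "", regex=True)

# One 0/1 column per genre, equivalent to one_hot(...) followed by fillna(0)
genre_dummies = movies["genres"].str.get_dummies(sep="|")
movies = pd.concat([movies, genre_dummies], axis=1)
print(movies.columns.tolist())
```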

In [16]:
genres_list = ['(no genres listed)', 'Action', 'Adventure', 'Animation', 'Children',
       'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy', 'Film-Noir',
       'Horror', 'IMAX', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Thriller',
       'War', 'Western']
rated_movies_df = avg_ratings_df.merge(movies_df, on='movieId')
rated_movies_df.head()

| | movieId | avg_rate | (no genres listed) | Action | Adventure | Animation | Children | Comedy | Crime | Documentary | ... | Musical | Mystery | Romance | Sci-Fi | Thriller | War | Western | genres | release_year | title |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 1088 | 3.25 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 1.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 0.0 | Drama\|Musical\|Romance | 1987 | Dirty Dancing |
| 1 | 1580 | 3.58 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | Action\|Comedy\|Sci-Fi | 1997 | Men in Black (a.k.a. MIB) |
| 2 | 3175 | 3.61 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 0.0 | Adventure\|Comedy\|Sci-Fi | 1999 | Galaxy Quest |
| 3 | 44022 | 3.26 | 0.0 | 0.0 | 1.0 | 1.0 | 1.0 | 1.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | Adventure\|Animation\|Children\|Comedy | 2006 | Ice Age 2: The Meltdown |
| 4 | 175197 | 2.75 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | ... | 0.0 | 0.0 | 0.0 | 1.0 | 0.0 | 0.0 | 1.0 | Fantasy\|Horror\|Sci-Fi\|Western | 2017 | The Dark Tower |


In [18]:
top10_genres_movies_dict = []

for genre in genres_list:
    
    top10_movieIds_list = rated_movies_df[rated_movies_df[genre]==1].sort_values(by='avg_rate', 
                                                                                 ascending=False) \
                                                                    .head(10) \
                                                                    .movieId.values.tolist()
    top10_genres_movies_dict.append({
        'genre': genre,
        'Top10Movie': top10_movieIds_list
    })
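One caveat: ranking purely by `avg_rate` lets movies with only a handful of ratings dominate each genre. A hedged sketch of a variant that first drops movies below a minimum number of ratings (the `min_count` threshold and a `count` column, e.g. merged in from `movies_rate_df`, are assumptions) and then takes the top N per genre with `explode` plus `groupby().head()` instead of a Python loop over `genres_list`.

```python
import pandas as pd

def top_n_per_genre(df, n=10, min_count=50):
    """Return {genre: [movieId, ...]} ranked by avg_rate among movies with >= min_count ratings."""
    eligible = df[df["count"] >= min_count].copy()
    # One row per (movie, genre) pair
    eligible["genre"] = eligible["genres"].str.split("|")
    exploded = eligible.explode("genre")
    # Stable sort so ties keep their original order
    ranked = exploded.sort_values("avg_rate", ascending=False, kind="mergesort")
    top = ranked.groupby("genre").head(n)
    return {g: grp["movieId"].tolist() for g, grp in top.groupby("genre", sort=True)}

# Illustrative data: movie 3 has a perfect average but only 2 ratings
sample = pd.DataFrame({
    "movieId": [1, 2, 3, 4],
    "avg_rate": [4.2, 3.9, 5.0, 4.5],
    "count": [120, 300, 2, 80],
    "genres": ["Comedy|Drama", "Comedy", "Drama", "Drama"],
})
result = top_n_per_genre(sample, n=2, min_count=50)
print(result)  # {'Comedy': [1, 2], 'Drama': [4, 1]}
```

The result maps directly onto the documents above via `[{'genre': g, 'Top10Movie': ids} for g, ids in result.items()]`.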
    

In [20]:
top10_movie_in_each_genres.insert_many(top10_genres_movies_dict)

<pymongo.results.InsertManyResult at 0x2b48423d808>

### Upload similarity matrices based on embeddings from the deep learning model

In [134]:
from keras.models import load_model

model = load_model('../Recsys_Model/embedding_full')

### Calculate the Pearson correlation coefficient between each pair of users and each pair of movies
#### Challenge: the full matrix is so large that loading it at once could require more than 100 GB of memory, so it has to be processed in partitions stored on disk.
#### Solution: use np.memmap to back the matrix with a file on disk and fill it block by block

In [27]:
import os.path as osp  # not `as path`: that would overwrite the dataset `path` string defined earlier
from tempfile import mkdtemp
filename = osp.join(mkdtemp(), 'user_pearson_sim.dat')

#### Get user & movie embedding ID mapping

In [135]:
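full_dataset_df = ratings_df.drop('timestamp', axis=1).sample(frac=1).reset_index(drop=True)
# Embedding row i -> real id. NOTE: unique() follows row order, so this mapping only matches the
# trained embedding layers if it is built exactly as during training (same shuffle / random_state).
user_id_mapping = {i: uid for i, uid in enumerate(full_dataset_df.userId.unique())}
movie_id_mapping = {i: mid for i, mid in enumerate(full_dataset_df.movieId.unique())}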

In [136]:
import numpy as np

user_embeddings = model.layers[2].get_weights()[0].astype(np.float16)
movie_embeddings = model.layers[3].get_weights()[0].astype(np.float16)

#### Find the top 5 most similar users and upload the user similarity matrix into MongoDB

#### Calculate the Pearson correlation matrix for users & movies

In [5]:
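# subtract each embedding's mean (center every row)
user_embeddings -= np.mean(user_embeddings, axis=1)[:,None]
# scale every row to unit L2 norm, so the dot product of two rows is their Pearson correlation
user_embeddings /= np.sqrt(np.sum(user_embeddings*user_embeddings, axis=1))[:,None]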
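Why centering plus normalizing yields Pearson correlation: after subtracting each row's mean and scaling it to unit L2 norm, the dot product of two rows equals their Pearson correlation coefficient. A quick numerical check against `np.corrcoef` on random data (float64 here, rather than the float16 used above, so the comparison is tight).

```python
import numpy as np

rng = np.random.default_rng(0)
emb = rng.normal(size=(5, 100))  # 5 toy "embeddings" of dimension 100

# Same two steps as above: center each row, then scale it to unit length
centered = emb - emb.mean(axis=1, keepdims=True)
unit = centered / np.linalg.norm(centered, axis=1, keepdims=True)

pearson_via_dot = unit @ unit.T
pearson_reference = np.corrcoef(emb)  # rows are treated as variables

print(np.allclose(pearson_via_dot, pearson_reference))  # True
```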

In [6]:
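SPLITROWS = 1000
numrows = user_embeddings.shape[0]

# Disk-backed (numrows x numrows) float16 matrix; only the current block needs to be held in RAM
res = np.memmap(filename, "float16", mode="w+", shape=(numrows, numrows))

# Fill the matrix one SPLITROWS x SPLITROWS block at a time
for r in range(0, numrows, SPLITROWS):
    for c in range(0, numrows, SPLITROWS):
        r1 = r + SPLITROWS
        c1 = c + SPLITROWS
        chunk1 = user_embeddings[r:r1]
        chunk2 = user_embeddings[c:c1]
        
        res[r:r1, c:c1] = np.dot(chunk1, chunk2.T)

res.flush()  # make sure every block is written to disk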
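Once the similarity matrix lives in a memmap, the top-k neighbours can be read back one row block at a time, so only about `block_rows x numrows` values are in RAM at once. A minimal sketch with a tiny matrix in a temporary file; `top_k_from_memmap`, `k`, and `block_rows` are hypothetical names, and the diagonal (self-similarity) is masked out explicitly rather than assuming it is each row's maximum.

```python
import os
import tempfile
import numpy as np

def top_k_from_memmap(mm, k, block_rows=1000):
    """Return an (n, k) array with the column indices of each row's k largest values, excluding the diagonal."""
    n = mm.shape[0]
    out = np.empty((n, k), dtype=np.int64)
    for start in range(0, n, block_rows):
        stop = min(start + block_rows, n)
        block = np.array(mm[start:stop], dtype=np.float32)  # copy one block into RAM
        rows = np.arange(stop - start)
        block[rows, rows + start] = -np.inf                 # ignore self-similarity
        # argpartition finds the k largest (unordered); argsort then orders them descending
        part = np.argpartition(block, -k, axis=1)[:, -k:]
        order = np.argsort(-np.take_along_axis(block, part, axis=1), axis=1)
        out[start:stop] = np.take_along_axis(part, order, axis=1)
    return out

# Toy 4x4 similarity matrix written to a disk-backed memmap
sim = np.array([[1.0, 0.9, 0.1, 0.5],
                [0.9, 1.0, 0.3, 0.2],
                [0.1, 0.3, 1.0, 0.8],
                [0.5, 0.2, 0.8, 1.0]], dtype=np.float16)
sim_path = os.path.join(tempfile.mkdtemp(), "sim.dat")  # not `path`, to keep the dataset path intact
mm = np.memmap(sim_path, dtype="float16", mode="w+", shape=sim.shape)
mm[:] = sim
mm.flush()

neighbours = top_k_from_memmap(mm, k=2, block_rows=3)
print(neighbours)
```

In the notebook this would be something like `top_k_from_memmap(res, k=5)`, with each index then mapped back through `user_id_mapping`.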

In [None]:
from sklearn.metrics.pairwise import cosine_similarity


numrow = user_embeddings.shape[0]

user_sim_dict = {}

for _row in range(numrow):
    _temp_dict = {}
    # similarity of this user to every user (Pearson, since the rows are centered)
    _sim_matrix = cosine_similarity(user_embeddings[_row,:].reshape(1, -1), user_embeddings)
    # the largest value is normally the user itself, so take the next 5, highest first
    top5_users_inner_index = _sim_matrix[0].argsort()[-6:-1][::-1]

    for _inner_idx in top5_users_inner_index:
        ## get real id from user_id_mapping based on inner idx
        _temp_dict[str(user_id_mapping[_inner_idx])] = float(_sim_matrix[0][_inner_idx])

    user_sim_dict[str(user_id_mapping[_row])] = _temp_dict


### Load the user & movie similarity matrices into MongoDB

In [47]:
import json
# use `fp`, not `f`: `f` is already bound to pyspark.sql.functions
with open('user_sim.json', 'r') as fp:
    user_sim_dict = json.load(fp)

In [55]:
users_sim_dict_list = []

for key, val in user_sim_dict.items():
    _temp_dict = {}
    _temp_dict['userId'] = int(key)
    _temp_dict['sim_user'] = [int(k) for k in val.keys()]
    users_sim_dict_list.append(_temp_dict)

In [58]:
user_sim_matrix_offline.insert_many(users_sim_dict_list)

<pymongo.results.InsertManyResult at 0x2692629ed48>

In [59]:
with open('movie_sim.json', 'r') as fp:
    movie_sim_dict = json.load(fp)

In [60]:
movies_sim_dict_list = []

for key, val in movie_sim_dict.items():
    _temp_dict = {}
    _temp_dict['movieId'] = int(key)
    _temp_dict['sim_movie'] = [int(k) for k in val.keys()]
    movies_sim_dict_list.append(_temp_dict)
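The two loops above differ only in their field names, and both discard the similarity scores. A hedged sketch of a shared helper (`sim_dict_to_docs` and the `<sim_field>_score` field are hypothetical) that also keeps the scores in a parallel list, in case they are needed later to rank or threshold neighbours.

```python
def sim_dict_to_docs(sim_dict, id_field, sim_field):
    """Convert {"id": {"neighbourId": score, ...}} into a list of MongoDB-ready documents."""
    docs = []
    for key, neighbours in sim_dict.items():
        docs.append({
            id_field: int(key),
            sim_field: [int(k) for k in neighbours],  # neighbour ids, in stored order
            sim_field + "_score": [float(v) for v in neighbours.values()],
        })
    return docs

# Illustrative input in the same shape as the loaded user_sim.json
example = {"1": {"42": 0.91, "7": 0.88}, "2": {"5": 0.95}}
user_docs = sim_dict_to_docs(example, "userId", "sim_user")
print(user_docs[0])  # {'userId': 1, 'sim_user': [42, 7], 'sim_user_score': [0.91, 0.88]}
```

With this, the movie case becomes `sim_dict_to_docs(movie_sim_dict, 'movieId', 'sim_movie')`.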

In [61]:
movie_sim_matrix_offline.insert_many(movies_sim_dict_list)

<pymongo.results.InsertManyResult at 0x268ef34ce48>

#### Calculate the user similarity matrix with a plain dot product (equal to cosine similarity, since the rows are unit-normalized)

In [102]:
numrow = user_embeddings.shape[0]
user_sim_dict = {}
for _row in range(numrow):
    _temp_dict = {}
    # rows are unit-normalized, so the dot product equals cosine similarity
    _sim_matrix = np.dot(user_embeddings[_row,:].reshape(1, -1), user_embeddings.T)
    top5_users_inner_index = _sim_matrix[0].argsort()[-6:-1][::-1]
    
    for _inner_idx in top5_users_inner_index:
        ## get real id from user_id_mapping based on inner idx
        _temp_dict[str(user_id_mapping[_inner_idx])] = float(_sim_matrix[0][_inner_idx])
    
    user_sim_dict[str(user_id_mapping[_row])] = _temp_dict
    if (_row + 1) % 1000 == 0:
        print(_row + 1)  # progress

array([[ 0.364   ,  0.01741 , -0.17    , ...,  0.4836  , -0.439   ,
         0.4016  ],
       [ 0.626   ,  0.07996 , -0.4165  , ..., -0.01782 , -0.1381  ,
        -0.03824 ],
       [ 0.03223 ,  0.148   , -0.2979  , ...,  0.2491  , -0.5894  ,
         0.2413  ],
       ...,
       [ 0.2341  , -0.3242  , -0.3008  , ...,  0.3765  , -0.1278  ,
         0.252   ],
       [ 0.1858  , -0.006405, -0.1653  , ...,  0.3123  , -0.294   ,
         0.03735 ],
       [ 0.0863  , -0.2101  , -0.3047  , ...,  0.4214  , -0.3706  ,
         0.1136  ]], dtype=float16)

#### Find the top 10 most similar movies and upload the movie similarity matrix into MongoDB

In [54]:
# center each movie embedding and scale it to unit length (same preprocessing as for users)
movie_embeddings -= np.mean(movie_embeddings, axis=1)[:,None]
movie_embeddings /= np.sqrt(np.sum(movie_embeddings*movie_embeddings, axis=1))[:,None]

In [None]:
numrow = movie_embeddings.shape[0]
movies_sim_dict = {}

for _row in range(numrow):
    _temp_dict = {}
    _sim_matrix = np.dot(movie_embeddings[_row,:].reshape(1, -1), movie_embeddings.T)
    # skip the largest entry (normally the movie itself) and keep the next 10, highest first
    top10_movies_inner_index = _sim_matrix[0].argsort()[-11:-1][::-1]
    
    for _inner_idx in top10_movies_inner_index:
        ## get real id from movie_id_mapping based on inner idx
        _temp_dict[str(movie_id_mapping[_inner_idx])] = _sim_matrix[0][_inner_idx]
    
    movies_sim_dict[str(movie_id_mapping[_row])] = _temp_dict


### Note: MongoDB document keys must be strings, and values should be native Python types (e.g. `float` rather than `np.float16`) so pymongo can encode them
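A hedged sketch of a generic converter for this rule: it walks nested dicts and lists, turns every key into a `str`, and unwraps NumPy scalars with `.item()`. `to_bson_safe` is a hypothetical helper, not part of pymongo. Separately, a single MongoDB document is capped at 16 MB, so storing a whole similarity matrix as one document (as the `insert_one` call below does) only works while it stays under that limit; one document per movie avoids the cap.

```python
import numpy as np

def to_bson_safe(obj):
    """Recursively convert keys to str and NumPy scalars/arrays to native Python types."""
    if isinstance(obj, dict):
        return {str(k): to_bson_safe(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [to_bson_safe(v) for v in obj]
    if isinstance(obj, np.ndarray):
        return [to_bson_safe(v) for v in obj.tolist()]
    if isinstance(obj, np.generic):
        return obj.item()  # e.g. np.float16 -> float, np.int64 -> int
    return obj

raw = {np.int64(1): {np.int64(2): np.float16(0.5), 3: np.float32(0.25)}}
clean = to_bson_safe(raw)
print(clean)  # {'1': {'2': 0.5, '3': 0.25}}
```

Applied to this notebook, `to_bson_safe(movies_sim_dict)` would produce the same structure as the conversion cell below.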

In [90]:
# Cast every key to str and every score to a native float so the dict is BSON-encodable
movies_sim_dict_cvt = {
    str(key): {str(_k): float(_v) for _k, _v in value.items()}
    for key, value in movies_sim_dict.items()
}

In [92]:
movie_sim_matrix_offline.insert_one(movies_sim_dict_cvt)

<pymongo.results.InsertOneResult at 0x24013493048>