# YoutubeDNN 召回实现

## 1. 下载文件

In [None]:
from urllib.request import urlretrieve
import zipfile
import pandas as pd
import os

filename = 'ml-100k.zip'
if os.path.exists(filename):
    # Archive already on disk: skip the download (extraction is not re-run).
    print(f'{filename} existed.')
else:
    # Fetch the MovieLens 100k archive and unpack it into the working directory.
    urlretrieve("http://files.grouplens.org/datasets/movielens/ml-100k.zip", filename)
    # The context manager closes the archive handle even if extraction fails.
    with zipfile.ZipFile(filename, 'r') as zip_ref:
        zip_ref.extractall()
    print(f'Download File: {filename}')

## 2. Preprocess

处理四种数据：movies, users, ratings, genre

In [None]:
# user --- u.user (pipe-separated, no header row, so column names are supplied)
users_col = ['user_id', 'age', 'gender', 'occupation', 'zip_code']
users = pd.read_csv('ml-100k/u.user', sep='|', names=users_col)

# rating --- u.data (tab-separated, no header row)
ratings_col = ['user_id', 'movie_id', 'rating', 'timestamp']
ratings = pd.read_csv('ml-100k/u.data', sep='\t', names=ratings_col)

# movies and genres --- aggregate u.item and u.genre
# u.item rows carry the movie metadata followed by one 0/1 flag per genre.
movies_col = ['movie_id', 'movie_title', 'release_date', 'video_release_date', 'IMDb_URL']
genres_col = ['genre_unknown', 'Action', 'Adventure', 'Animation', 'Children', 'Comedy', 'Crime', 'Documentary', 'Drama', 'Fantasy',
              'Film-Noir', 'Horror', 'Musical', 'Mystery', 'Romance', 'Sci-Fi', 'Thriller', 'War', 'Western']
movies_col = movies_col + genres_col
# u.item is ISO-8859-1 encoded (accented characters in some titles); pandas'
# default UTF-8 decoding raises UnicodeDecodeError on it.
movies = pd.read_csv('ml-100k/u.item', sep='|', names=movies_col, encoding='latin-1')

print(users.dtypes, '\n')
print(ratings.dtypes, '\n')
print(movies.dtypes, '\n')

将电影所属的 genre 拼接成一个多值属性，比如 `3,4,5,15` 的形式

In [None]:
# Lookup from genre column name to its position in genres_col.
genre_encoded = dict(zip(genres_col, range(len(genres_col))))
# One comma-joined string per movie listing the positions of its genre flags equal to 1.
all_genre = [
    ','.join(str(pos) for pos, flag in enumerate(flags) if flag == 1)
    for flags in movies[genres_col].values
]
movies['all_genres'] = all_genre

In [None]:
# Preview the first three rows of the movies table.
movies.head(3)

将 ratings, movies, users 全部聚合在一起

In [None]:
# Join each rating with its movie metadata and its user's profile, then drop
# the per-genre 0/1 flag columns.
ratings_all = (
    ratings
    .merge(movies, on='movie_id')
    .merge(users, on='user_id')
    .drop(columns=genres_col)
)
ratings_all.shape

根据 ratings 的数值来判断喜欢还是不喜欢，>= 3 则为喜欢

In [None]:
import numpy as np
# TODO: these labels could be replaced with 0 and 1
ratings_all['like_type'] = np.where(ratings_all['rating'] >= 3, 'like', 'dislike')
# Strip a trailing " (YYYY)" release-year suffix from the title. Slicing off a
# fixed 6 characters left a trailing space behind and truncated titles that
# have no year suffix; the anchored regex only removes a real suffix.
ratings_all['movie_name'] = ratings_all['movie_title'].str.replace(
    r'\s*\(\d{4}\)\s*$', '', regex=True
)

按照 user_id 来排序，内部再根据时间戳来排序

In [None]:
# Order rows by user first and by rating timestamp within each user.
ratings_all = ratings_all.sort_values(['user_id', 'timestamp'])
ratings_all.head(10)

将可能不连续的 user_id、movie_id、电影名称和 occupation 分别映射为从 0 开始的连续整数编码

In [None]:
# Each raw value gets a dense integer code in order of first appearance, and
# the code is written to a column of ratings_all.

# Users -> 'user'
user_ids = ratings_all['user_id'].unique().tolist()
user2user_encoded = dict(zip(user_ids, range(len(user_ids))))
ratings_all['user'] = ratings_all['user_id'].map(user2user_encoded)

# Movies -> 'movie'
movie_ids = ratings_all["movie_id"].unique().tolist()
movie2movie_encoded = dict(zip(movie_ids, range(len(movie_ids))))
ratings_all['movie'] = ratings_all['movie_id'].map(movie2movie_encoded)

# Movie names -> 'title_d'
title_ids = ratings_all["movie_name"].unique().tolist()
title2title_encoded = dict(zip(title_ids, range(len(title_ids))))
ratings_all['title_d'] = ratings_all['movie_name'].map(title2title_encoded)

# Occupations: the 'occupation' column itself is overwritten with the codes.
occupation_ids = ratings_all['occupation'].unique().tolist()
oc2oc_encoded = dict(zip(occupation_ids, range(len(occupation_ids))))
ratings_all['occupation'] = ratings_all['occupation'].map(oc2oc_encoded)

In [None]:
# Preview the first ten rows of the merged ratings table.
ratings_all.head(10)

In [None]:
# Encoded movies per (user, like_type) pair, i.e. liked and disliked lists
movie_list = (
    ratings_all
    .groupby(['user', 'like_type'])['movie']
    .apply(list)
    .reset_index()
)
# Encoded titles of every movie each user rated
title_list = (
    ratings_all
    .groupby(['user'])['title_d']
    .apply(list)
    .reset_index()
)
# Distinct all_genres strings among the movies each user rated
genre_list = (
    ratings_all
    .groupby(['user'])['all_genres']
    .unique()
    .apply(list)
    .reset_index()
)
genre_list

去除重复的 genres 项

In [None]:
# Flatten each user's comma-joined genre strings into a de-duplicated list of
# genre ids. The ids are split on ',' rather than iterated character by
# character, so multi-digit ids such as "15" stay intact instead of becoming
# '1' and '5'. Ids remain strings (as before) and are ordered numerically so
# the result is deterministic.
genre_list['all_genres'] = genre_list['all_genres'].apply(
    lambda combos: sorted(
        {gid for combo in combos for gid in combo.split(',') if gid.isdigit()},
        key=int,
    )
)
genre_list

将电影分为用户喜欢和不喜欢的两种类别

In [None]:
# Spread like_type into columns so each user row holds one movie list per like_type value.
user_video_list = (
    movie_list
    .pivot(index='user', columns='like_type', values='movie')
    .reset_index()
)
# A user may have no liked or no disliked movies; fill that gap with an id one
# past the largest encoded movie id.
user_video_list = user_video_list.fillna(ratings_all['movie'].max() + 1)
user_video_list.head(3)

In [None]:
# Profile columns de-duplicated from the per-rating rows, with a fresh 0..n-1 index.
user_data = (
    ratings_all[['user', 'occupation', 'gender', 'age']]
    .drop_duplicates()
    .reset_index(drop=True)
)
user_data

In [None]:
# Assemble one table keyed by user: like/dislike movie lists, rated titles,
# genre ids and profile features. The join key is spelled out so an
# accidentally shared column name can never change the merge.
dataset = (
    user_video_list
    .merge(title_list, on='user')
    .merge(genre_list, on='user')
    .merge(user_data, on='user')
)
# Cells that hold a scalar instead of a list are wrapped into one-element lists.
dataset['like'] = dataset['like'].apply(lambda x: x if isinstance(x, list) else [x])
dataset['dislike'] = dataset['dislike'].apply(lambda x: x if isinstance(x, list) else [x])
# The last entry of each like list becomes the prediction target and is removed
# from the input list.
# NOTE(review): title_d still lists every title the user rated, which
# presumably includes the held-out label's title -- confirm this is intended.
dataset['predict_labels'] = dataset['like'].apply(lambda x: x[-1])
dataset['like'] = dataset['like'].apply(lambda x: x[:-1])
# Min-max scale age into [0, 1].
dataset['age'] = (dataset['age'] - dataset['age'].min()) / (dataset['age'].max() - dataset['age'].min())
# Binary gender code: 'M' -> 0, anything else -> 1.
dataset['gender'] = np.where(dataset['gender'] == 'M', 0, 1)
dataset

预处理完毕。训练集和测试集的划分在第 4 节（开始训练）中进行。

## 3. 构建模型

首先引入依赖

In [None]:
import tensorflow as tf
from keras.layers import Layer, Embedding, Dense, Input, BatchNormalization
from keras.models import Model
from tensorflow import keras

Masked Embedding Aggregation

In [None]:
class MaskedEmbeddingsAggregatorLayer(Layer):
    """Pool a sequence of embeddings along axis 1, ignoring masked positions.

    Args:
        agg_mode: ``'sum'`` or ``'mean'``; how the unmasked embeddings of each
            row are combined.
        **kwargs: Forwarded to the base ``Layer`` constructor.

    Raises:
        NotImplementedError: If ``agg_mode`` is not ``'sum'`` or ``'mean'``.
    """

    def __init__(self, agg_mode='sum', *args, **kwargs):
        super(MaskedEmbeddingsAggregatorLayer, self).__init__(**kwargs)

        if agg_mode not in ['sum', 'mean']:
            raise NotImplementedError(
                f"Unsupported agg_mode {agg_mode!r}; expected 'sum' or 'mean'"
            )
        self.agg_mode = agg_mode

    @tf.function
    def call(self, inputs, mask=None):
        """Aggregate ``inputs`` over axis 1, using only positions where ``mask`` is True."""
        if mask is None:
            # No mask supplied: every position counts.
            if self.agg_mode == 'sum':
                return tf.reduce_sum(inputs, axis=1)
            return tf.reduce_mean(inputs, axis=1)

        # Drop masked positions, producing a ragged tensor with one
        # variable-length row of embeddings per example.
        masked_embeddings = tf.ragged.boolean_mask(inputs, mask)
        summed = tf.reduce_sum(masked_embeddings, axis=1)
        if self.agg_mode == 'sum':
            return summed

        # Mean = sum / number of unmasked positions. divide_no_nan returns 0
        # for a fully masked row instead of the NaN a plain 0/0 mean produces.
        counts = tf.reduce_sum(tf.cast(mask, summed.dtype), axis=1, keepdims=True)
        return tf.math.divide_no_nan(summed, counts)

    def get_config(self):
        # Start from the base config (name, dtype, ...) so the layer can be
        # re-created from its config, then add this layer's own argument.
        config = super(MaskedEmbeddingsAggregatorLayer, self).get_config()
        config.update({'agg_mode': self.agg_mode})
        return config

L2 Normalize Layer

In [None]:
class L2NormLayer(Layer):
    """Scale every vector along the last axis to unit L2 norm; the mask passes through unchanged."""

    def __init__(self, **kwargs):
        super(L2NormLayer, self).__init__(**kwargs)

    @tf.function
    def call(self, inputs, mask=None):
        # Normalization is independent per vector (last axis), so padded
        # positions cannot affect valid ones. The input is deliberately NOT
        # compacted with the mask: compute_mask hands the same mask to the next
        # layer, so the output must keep every position where it was for that
        # mask to stay aligned with the data.
        return tf.math.l2_normalize(inputs, axis=-1)

    def compute_mask(self, inputs, mask=None):
        # Propagate the incoming mask as-is.
        return mask

Model

In [None]:
class YoutubeDNNRecall(Model):
    """Recall network over four padded id sequences.

    Each input sequence is embedded, L2-normalized per embedding, mean-pooled
    over its unmasked positions, then the four pooled vectors are concatenated,
    passed through three 64-unit ReLU layers (batch norm after the third) and
    projected to a softmax over ``feature_vocab`` classes.

    Args:
        feature_columns: Feature descriptors; stored on the instance only.
        feature_vocab: Size of both embedding tables and of the output layer.
        ebd_dim: Embedding dimension.
        **kwargs: Forwarded to the base ``Model`` constructor.
    """

    def __init__(self, feature_columns, feature_vocab, ebd_dim, **kwargs):
        super(YoutubeDNNRecall, self).__init__(**kwargs)
        self.feature_columns = feature_columns
        # mask_zero=True makes id 0 the padding id, which the pooling layer skips.
        # NOTE(review): confirm no real id is encoded as 0, otherwise it is
        # silently ignored as padding.
        # `input_length` is not passed: the sequences are variable-length and
        # the argument is deprecated in current Keras.
        self.feature_ebd = Embedding(input_dim=feature_vocab, output_dim=ebd_dim, embeddings_initializer='random_normal', mask_zero=True, name='feature_embeddings')
        self.label_ebd = Embedding(input_dim=feature_vocab, output_dim=ebd_dim, embeddings_initializer='random_normal', mask_zero=True, name='label_embeddings')
        self.mask_ebd = MaskedEmbeddingsAggregatorLayer('mean', name='aggregate_embedding')
        self.dense1 = Dense(units=64, activation='relu', name='dense_1')
        self.dense2 = Dense(units=64, activation='relu', name='dense_2')
        self.dense3 = Dense(units=64, activation='relu', name='dense_3')
        self.bn = BatchNormalization()
        self.l2 = L2NormLayer(name='l2_norm')
        self.final = Dense(feature_vocab, activation=tf.nn.softmax, name='dense_output')

    def summary(self, line_length=None, positions=None, print_fn=None, expand_nested=False, show_trainable=False):
        """Build a functional wrapper over four symbolic inputs, plot it to model.png and print its summary."""
        inputs = [Input(shape=(None,)) for _ in range(4)]
        model = Model(inputs, outputs=self.call(inputs))
        keras.utils.plot_model(model, 'model.png', show_shapes=True)
        # Forward the caller's formatting options instead of discarding them.
        model.summary(
            line_length=line_length,
            positions=positions,
            print_fn=print_fn,
            expand_nested=expand_nested,
            show_trainable=show_trainable,
        )

    def call(self, inputs, training=None, mask=None):
        """Return softmax scores for four positional inputs.

        ``inputs[0]`` goes through ``feature_ebd``; ``inputs[1]``, ``inputs[2]``
        and ``inputs[3]`` share ``label_ebd``.
        """
        # TODO: inputs are positional; switch to named inputs (e.g. 'title_d',
        # 'like', 'dislike', 'all_genres') when adapting to other datasets.
        feature_ebd = self.mask_ebd(self.l2(self.feature_ebd(inputs[0])))
        liked_ebd = self.mask_ebd(self.l2(self.label_ebd(inputs[1])))
        disliked_ebd = self.mask_ebd(self.l2(self.label_ebd(inputs[2])))
        # NOTE(review): inputs[3] is looked up in the same table as inputs[1]
        # and inputs[2], so equal ids share an embedding -- confirm intended.
        genre_ebd = self.mask_ebd(self.l2(self.label_ebd(inputs[3])))
        x = tf.concat([feature_ebd, liked_ebd, disliked_ebd, genre_ebd], axis=1)
        x = self.dense1(x)
        x = self.dense2(x)
        # Pass `training` explicitly so batch norm uses batch statistics while
        # fitting and moving averages at inference.
        x = self.bn(self.dense3(x), training=training)
        return self.final(x)

In [None]:
# Descriptors for the four sequence features.
feature_columns = [
    dict(name=col, dtype=object)
    for col in ('title_d', 'like', 'dislike', 'all_genres')
]
# Vocabulary size is the largest encoded movie id plus 2; embedding dim is 16.
model = YoutubeDNNRecall(feature_columns, ratings_all['movie'].max() + 2, 16)
model.summary()
loss = 'sparse_categorical_crossentropy'
optimizer = keras.optimizers.get('adam')
model.compile(optimizer=optimizer, loss=loss)

## 4. 开始训练

In [None]:
# Split by encoded user id: ids <= 600 train, the rest test.
# .copy() gives each split its own frame, so adding columns to a split later
# does not go through pandas' SettingWithCopy path on a slice of `dataset`.
train_data = dataset[dataset.user <= 600].copy()
test_data = dataset[dataset.user > 600].copy()
train_data

In [None]:
from keras.preprocessing.sequence import pad_sequences as ps

# Sequence columns in the positional order the model indexes its inputs.
input_cols = ['title_d', 'like', 'dislike', 'all_genres']

# Pad every list column into a rectangular integer array.
x = [ps(train_data[col]) for col in input_cols]
y = train_data['predict_labels'].values

model.fit(x, y, epochs=500)
test_x = [ps(test_data[col]) for col in input_cols]
preds = model.predict(test_x)
# Highest-scoring class per row, computed in one vectorized call. assign()
# returns a new frame, so nothing is written onto a slice of `dataset`.
test_data = test_data.assign(predicted_label=np.argmax(preds, axis=-1))
test_data

In [None]:
# Class indices for each prediction row, ordered from highest to lowest score.
tf.argsort(preds,direction='DESCENDING',axis=-1)

In [None]:
# The 20 highest scores per prediction row and their class indices, sorted descending.
tf.nn.top_k(preds, k=20, sorted=True, name=None)