## PLE

In [5]:
import os
import joblib
import numpy as np
import pandas as pd
from collections import namedtuple

import tensorflow as tf
from tensorflow.keras.layers import *
from tensorflow.keras.models import *
import tensorflow.keras.backend as K
from tensorflow.python.keras.callbacks import EarlyStopping
from sklearn.preprocessing import  MinMaxScaler, LabelEncoder

from sklearn import metrics
from sklearn.metrics import log_loss
from sklearn.metrics import classification_report, roc_curve

##### 获取数据

In [6]:
def get_wechat_data():
    """ 读取wechat数据集 """
    train_path = '../data/wechat/train_df.pkl'
    test_path = '../data/wechat/test_df.pkl'
    encoder_dict_path = '../data/wechat/encoder_dict.pkl'
    train_df = pd.read_pickle(train_path)
    test_df = pd.read_pickle(test_path)
    encoder_dict = joblib.load(encoder_dict_path)
    return train_df, test_df, encoder_dict

train_df, test_df, encoder_dict = get_wechat_data()
train_df = train_df.sample(frac=1.0)
data = pd.concat([train_df, test_df], axis=0)
print('train_df.shape: {}, test_df.shape: {}'.format(train_df.shape, test_df.shape))

train_df.shape: (6708846, 12), test_df.shape: (609036, 12)


##### 模型构建

In [7]:
target = ['read_comment', 'like', 'click_avatar', 'forward']

# 稠密特征、稀疏特征、变长稀疏特征
dense_column_names = ['videoplayseconds']
sparse_column_names = ['userid', 'feedid', 'authorid', 'bgm_song_id', 'bgm_singer_id']
varlen_sparse_column_names = ['manual_keyword_list', 'manual_tag_list']

SparseFeature = namedtuple('SparseFeature', ['name', 'vocabulary_size', 'embedding_size'])
DenseFeature = namedtuple('DenseFeature', ['name', 'dimension'])
VarLenSparseFeature = namedtuple('VarLenSparseFeature', ['name', 'vocabulary_size', 'embedding_size', 'maxlen'])

varlen_sparse_column_maxlen_dict = {
    'manual_keyword_list': 18,
    'manual_tag_list': 11
}

feature_columns = [SparseFeature(f, vocabulary_size=data[f].nunique(), embedding_size=8) for f in sparse_column_names] + \
[DenseFeature(f, 1) for f in dense_column_names] + \
[VarLenSparseFeature(f, len(encoder_dict[f])+1, embedding_size=8, maxlen=varlen_sparse_column_maxlen_dict[f]) for f in varlen_sparse_column_names]
feature_columns

[SparseFeature(name='userid', vocabulary_size=17390, embedding_size=8),
 SparseFeature(name='feedid', vocabulary_size=16448, embedding_size=8),
 SparseFeature(name='authorid', vocabulary_size=6966, embedding_size=8),
 SparseFeature(name='bgm_song_id', vocabulary_size=5773, embedding_size=8),
 SparseFeature(name='bgm_singer_id', vocabulary_size=4573, embedding_size=8),
 DenseFeature(name='videoplayseconds', dimension=1),
 VarLenSparseFeature(name='manual_keyword_list', vocabulary_size=21576, embedding_size=8, maxlen=18),
 VarLenSparseFeature(name='manual_tag_list', vocabulary_size=349, embedding_size=8, maxlen=11)]

In [8]:
def model_metric(prob, label, thr=0.5):
    """ 模型评估 """
    # AUC
    fpr, tpr, threshold = metrics.roc_curve(label, prob)
    auc = metrics.auc(fpr, tpr)
    score = metrics.accuracy_score(label, prob > thr)
    # LogLoss
    logloss = log_loss(label, prob)
    print('模型准确率:{}, AUC得分:{}, LogLoss:{}'.format(score, round(auc, 4), logloss))
    print(classification_report(label, prob > thr, digits=2))
    print('==========================================================')

def build_input_layers(feature_columns):
    """ 构建输入层 """
    dense_input_dict, sparse_input_dict, varlen_sparse_input_dict = {}, {}, {}
    for f in feature_columns:
        if isinstance(f, DenseFeature):
            dense_input_dict[f.name] = Input(shape=(f.dimension, ), name=f.name)
        elif isinstance(f, SparseFeature):
            sparse_input_dict[f.name] = Input(shape=(1, ), name=f.name)
        elif isinstance(f, VarLenSparseFeature):
            varlen_sparse_input_dict[f.name] = Input(shape=(f.maxlen, ), name=f.name)
    return dense_input_dict, sparse_input_dict, varlen_sparse_input_dict

class MMoELayer(Layer):
    def __init__(self, expert_dim, expert_num, task_num):
        super(MMoELayer, self).__init__()
        self.task_num = task_num
        # expert，将拼接的向量（sparse embedding+dense feature）输入到多个relu为激活函数的全连接层，表示多个Expert网络。
        self.expert_layers = [Dense(expert_dim, activation='relu') for i in range(expert_num)]
        # gate，将拼接的向量输入到一个softmax层，输出expert_num个权重，表示Expert网络的权重。
        self.gate_layers = [Dense(expert_num, activation='softmax') for i in range(task_num)]
        
    def call(self, x):
        # 专家网络的输出张量
        expert_out = [expert_layer(x) for expert_layer in self.expert_layers] # [(None, expert_dim)...]
        # [(None, 1, expert_dim)] => (None, expert_num, expert_dim)
        expert_out = Concatenate(axis=1)([e[:, tf.newaxis, :] for e in expert_out])
        
        # 门控网络的输出权重
        gate_out = [gate_layer(x) for gate_layer in self.gate_layers] # [(None, expert_num)...]
        
        # 一个任务一个门网络，每个任务的门网络*所有专家网络，得到每个任务所需要的浅层网络信息
        tower_inputs = []
        for i in range(self.task_num):
            _gate_out = tf.expand_dims(gate_out, axis=-1) # (None, expert_num, 1)
            # (None, expert_dim, expert_num) * (None, expert_num, 1) => (None, expert_dim, 1)
            _tower_input = tf.matmul(expert_out, _gate_out, transpose_a=True) 
            tower_inputs.append(Flatten()(_tower_input)) # (None, expert_dim)
        return tower_inputs
    
class CGCLayer(Layer):
    def __init__(self, expert_dim, sp_expert_nums, share_expert_num, task_num):
        """
        :expert_dim 专家层维度
        :sp_expert_nums 每个任务使用的特定专家层数量
        :task_num 任务数量
        :share_expert_num 共享的专家层数量
        """
        super(CGCLayer, self).__init__()
        self.task_num = task_num
        
        # 每个任务生成expert_num个特定专家层
        self.sp_expert_layers = []
        for i in range(task_num):
            _sp_expert_layers = [Dense(expert_dim, activation='relu') for i in range(sp_expert_nums[i])]
            self.sp_expert_layers.append(_sp_expert_layers)
        
        # 共享专家层
        self.share_expert_layers = [Dense(expert_dim, activation='relu') for i in range(share_expert_num)]
        
        # 每个任务一个门层，门层输出维度=共享专家层数 + 特定专家层
        self.gate_layers = [Dense(share_expert_num+sp_expert_nums[i], activation='softmax') for i in range(task_num)]
        
    def call(self, x):
        # 专家网络的输出张量
        sp_expert_out = [[expert_layer(x) for expert_layer in _sp_expert_layers] for _sp_expert_layers in self.sp_expert_layers] # [[(None, expert_dim)...](任务一)]
        share_expert_out = [expert_layer(x) for expert_layer in self.share_expert_layers] 
        
        tower_inputs = []
        for i in range(self.task_num):
            gate_out = self.gate_layers[i](x)
            _gate_out = tf.expand_dims(gate_out, axis=-1)
            expert_out = share_expert_out + sp_expert_out[i]
            expert_out = Concatenate(axis=1)([e[:, tf.newaxis, :] for e in expert_out])
            _tower_input = tf.matmul(expert_out, _gate_out, transpose_a=True) 
            tower_inputs.append(Flatten()(_tower_input)) # (None, expert_dim)
        return tower_inputs
        
def concat_input_list(input_list):
    """ 合并input列表 """
    _num = len(input_list)
    if _num > 1:
        return Concatenate(axis=1)(input_list)
    elif len(input_list) == 1:
        return input_list[0]
    else:
        return None
        
def build_embedding_layers(feature_columns):
    """ 构建embedding层 """
    embedding_layer_dict = {}
    for f in feature_columns:
        if isinstance(f, SparseFeature):
            embedding_layer_dict[f.name] = Embedding(f.vocabulary_size+1, f.embedding_size, name='emb_' + f.name)
        elif isinstance(f, VarLenSparseFeature):
            embedding_layer_dict[f.name] = Embedding(f.vocabulary_size+1, f.embedding_size, name='var_emb_' + f.name, mask_zero=True)
    return embedding_layer_dict
        
def embedding_lookup(columns, input_dict, embedding_layer_dict, flatten=False):
    """ 根据feature_columns或column_names查表，得到对应embedding向量列表 """
    embedding_list = []
    for f in columns:
        if type(f) == str:
            column_name = f
        else:
            column_name = f.name
        _input = input_dict[column_name]
        _embed = embedding_layer_dict[column_name]
        embed_layer = _embed(_input)
        if flatten:
            embed_layer = Flatten()(embed_layer)
        embedding_list.append(embed_layer)
    return embedding_list
    
class MeanPoolingLayer(Layer):
    def __init__(self, axis, **kwargs):
        super(MeanPoolingLayer, self).__init__(**kwargs)
        self.axis = axis
        self.supports_masking = True
    
    def compute_mask(self, input, input_mask=None):
        # need not to pass the mask to next layers
        return None
    
    def call(self, x, mask=None):
        if x is not None:
            mask = K.cast(mask, K.floatx()) # (None, 18)
            mask = K.repeat(mask, x.shape[-1]) # (None, 4, 18)
            mask = tf.transpose(mask, [0, 2, 1]) # (None, 18, 4)
            x = x * mask # # (None, 18, 4) * (None, 18, 4)
            return K.sum(x, axis=self.axis) / K.sum(mask, axis=self.axis)
        else:
            return K.mean(x, axis=self.axis)
    
def PLE(feature_columns, target, dnn_hidden_units=[64, 64], expert_dim=32, sp_expert_nums=[4, 4, 4, 4], share_expert_num=2, task_num=4, is_cgc=False):
    
    dense_input_dict, sparse_input_dict, varlen_sparse_input_dict = build_input_layers(feature_columns)
    
    # Input
    input_list = list(dense_input_dict.values()) + list(sparse_input_dict.values()) + list(varlen_sparse_input_dict.values())
    
    # dense feature (input->concat)
    concat_dense_input_list = concat_input_list(list(dense_input_dict.values()))
    
    # sparse feature (input->embed->concat)
    embedding_layer_dict = build_embedding_layers(feature_columns)
    sparse_feature_columns = list(filter(lambda x: isinstance(x, SparseFeature), feature_columns))
    flatten_sparse_embed_list = embedding_lookup(sparse_feature_columns, sparse_input_dict, embedding_layer_dict, flatten=True)
    concat_flatten_sparse_embed_list = concat_input_list(flatten_sparse_embed_list)
    
    # seq embeddings (input->embed->pooling->concat)
    varlen_sparse_feature_columns = list(filter(lambda x: isinstance(x, VarLenSparseFeature), feature_columns))
    varlen_sparse_embed_list = []
    for f in varlen_sparse_feature_columns:
        _input = varlen_sparse_input_dict[f.name] #  (None, 18)
        _embed = embedding_layer_dict[f.name]
        embed_layer = _embed(_input) # (None, 18, 4)
        mask = Masking()(embed_layer) # (None, 18, 4)
        mean_pooling_embed = MeanPoolingLayer(axis=1)(mask) # (None, 4)
        varlen_sparse_embed_list.append(mean_pooling_embed)
    concat_varlen_sparse_embed_list = concat_input_list(varlen_sparse_embed_list)
    
    # concat dense feature + concat sparse embeddings + concat seq embeddings
    dnn_input = Concatenate(axis=1)([concat_dense_input_list, concat_flatten_sparse_embed_list, concat_varlen_sparse_embed_list]) 
    
    # DNN
    for dnn in dnn_hidden_units:
        dnn_input = Dropout(0.1)(Dense(dnn, activation='relu')(dnn_input))
    
    # MMoE网络层
    if is_cgc:
        tower_inputs = CGCLayer(expert_dim, sp_expert_nums, share_expert_num, task_num)(dnn_input)
    else:
        tower_inputs = CGCLayer(expert_dim, sp_expert_nums, share_expert_num, task_num)(dnn_input)
        concat_tower_inputs = Concatenate(axis=1)(tower_inputs)
        print('concat_tower_inputs.shape: ', concat_tower_inputs.shape)
        # 2层CGC网络
        tower_inputs = CGCLayer(expert_dim, sp_expert_nums, share_expert_num, task_num)(concat_tower_inputs)
    
    # 通过各个任务对应的门网络的加权求和，每个塔都得到自己任务所需要的专家信息。
    outputs = [Dense(1, activation='sigmoid', name=target_name)(tower_input) for tower_input, target_name in zip(tower_inputs, target)]
    model = Model(input_list, outputs)
    return model

model = PLE(feature_columns,
             target, 
             dnn_hidden_units=[64, 64],
             expert_dim=32, 
             sp_expert_nums=[4, 4, 4, 4],
             share_expert_num=2,
             task_num=4,
             is_cgc=True)
model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
manual_keyword_list (InputLayer [(None, 18)]         0                                            
__________________________________________________________________________________________________
manual_tag_list (InputLayer)    [(None, 11)]         0                                            
__________________________________________________________________________________________________
userid (InputLayer)             [(None, 1)]          0                                            
__________________________________________________________________________________________________
feedid (InputLayer)             [(None, 1)]          0                                            
______________________________________________________________________________________________

In [9]:
# 模型训练
train_input = {f: np.array([row for row in train_df[f]]) for f in dense_column_names + sparse_column_names + varlen_sparse_column_names}
test_input = {f: np.array([row for row in test_df[f]]) for f in dense_column_names + sparse_column_names + varlen_sparse_column_names}

my_callbacks = [
    EarlyStopping(monitor='val_loss', patience=10, verbose=2, mode='auto')
]

loss = tf.keras.losses.binary_crossentropy
model.compile('adam',
              loss={'read_comment': loss, 'like': loss, 'click_avatar': loss, 'forward': loss},
              metrics=tf.keras.metrics.AUC(name='auc')) # ["binary_crossentropy", tf.keras.metrics.AUC(name='auc')]

# 多个任务
y_list = [train_df[i].values for i in target]
model.fit(x=train_input,
          y=y_list,
          batch_size=1024,
          epochs=100,
          validation_split=0.2,
          callbacks=my_callbacks)

Epoch 1/100
Epoch 2/100
Epoch 3/100
Epoch 4/100
Epoch 5/100
Epoch 6/100
Epoch 7/100
Epoch 8/100
Epoch 9/100
Epoch 10/100
Epoch 11/100
Epoch 12/100
Epoch 00012: early stopping


<keras.callbacks.History at 0x2002a14c438>

In [10]:
# 模型预测与评估
result = model.predict(test_input)

for idx, target_name in enumerate(target):
    print(idx, target_name)
    model_metric(np.array([i[0] for i in result[idx]]), test_df[target_name].values)

0 read_comment
模型准确率:0.9667277467998607, AUC得分:0.9118, LogLoss:0.10097670342414897
              precision    recall  f1-score   support

           0       0.97      1.00      0.98    588439
           1       0.53      0.15      0.24     20597

    accuracy                           0.97    609036
   macro avg       0.75      0.57      0.61    609036
weighted avg       0.96      0.97      0.96    609036

1 like
模型准确率:0.9759176797430694, AUC得分:0.8084, LogLoss:0.09929165239177191
              precision    recall  f1-score   support

           0       0.98      1.00      0.99    594422
           1       0.49      0.09      0.15     14614

    accuracy                           0.98    609036
   macro avg       0.73      0.54      0.57    609036
weighted avg       0.97      0.98      0.97    609036

2 click_avatar
模型准确率:0.9925406708306241, AUC得分:0.8092, LogLoss:0.04046633934532532
              precision    recall  f1-score   support

           0       0.99      1.00      1.00    604

##### 不同行为加权gAUC分数

In [11]:
from scipy.stats import rankdata
from collections import defaultdict

def fast_auc(actual, predicted):
    # https://www.kaggle.com/c/riiid-test-answer-prediction/discussion/208031    
    pred_ranks = rankdata(predicted)
    n_pos = np.sum(actual)
    n_neg = len(actual) - n_pos
    return (np.sum(pred_ranks[actual == 1]) - n_pos*(n_pos+1)/2) / (n_pos*n_neg)

def uAUC(labels, preds, users):
    """ 计算uAUC """
    label_dict, pred_dict, user_flag_dict = defaultdict(lambda: []), defaultdict(lambda: []), defaultdict(lambda: False)
    for idx, label in enumerate(labels):
        user = users[idx]
        pred = preds[idx]
        label = labels[idx]
        label_dict[user].append(label)
        pred_dict[user].append(pred)
    
    # 当前用户是否全为正/负样本
    for user in set(users):
        _labels = label_dict[user]
        flag = False
        for i in range(len(_labels)-1):
            if _labels[i] != _labels[i+1]:
                flag = True
                break
        user_flag_dict[user] = flag
    
    auc_sum = 0.0
    auc_cnt = 0.0
    for user in user_flag_dict:
        if user_flag_dict[user]:
            auc = fast_auc(np.asarray(label_dict[user]), np.asarray(pred_dict[user]))
            auc_sum += auc
            auc_cnt += 1.0
    return auc_sum * 1.0 / auc_cnt

def score(result_df, action_list):
    """ 计算多个行为的加权gAUC分数 """
    weight_dict = {
        "read_comment": 4.0,  # 是否查看评论
        "like": 3.0,  # 是否点赞
        "click_avatar": 2.0,  # 是否点击头像
        "forward": 1.0,  # 是否转发
        "favorite": 1.0,  # 是否收藏
        "comment": 1.0,  # 是否发表评论
        "follow": 1.0  # 是否关注
    }
    
    score = 0.0
    score_dict = {}
    weight_sum = 0.0
    for action in action_list:
        print('action: ', action)
        labels = result_df[action].values
        preds = result_df['p'+action].values
        users = result_df['userid'].values
        weight = weight_dict[action]
        gauc = uAUC(labels, preds, users)
        score_dict[action] = gauc
        score += weight * gauc
        weight_sum += weight
    
    score /= weight_sum
    score = round(score, 4)
    return {
        'score': score,
        'score_detail': score_dict
    }

result_df = test_df[['userid', 'feedid'] + target]
for idx, target_name in enumerate(target):
    result_df['p'+target_name] = [i[0] for i in result[idx]]

score(result_df, target)

A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


action:  read_comment
action:  like
action:  click_avatar
action:  forward


{'score': 0.6176,
 'score_detail': {'read_comment': 0.6043690226531183,
  'like': 0.5842259667486817,
  'click_avatar': 0.6747976432374228,
  'forward': 0.6557983203994906}}