In [1]:
# ! pip install deepctr==0.8.5 --no-deps
# ! pip install torch==1.7.0 torchvision==0.8.1 
# ! pip install tensorflow-gpu==1.13.1
# ! pip install numba

In [2]:
import sys
sys.path.append('..')
sys.path.append('../../config/')
from config_prosper import *
import os
import gc
import pandas as pd
import numpy as np
import tensorflow as tf

from time import time
from deepctr.feature_column import SparseFeat, DenseFeat, get_feature_names,VarLenSparseFeat
from deepctr.models.deepfm import DeepFM
from mytools.utils.myfile import savePkl,loadPkl
from evaluation import evaluate_deepctr,evaluate_deepctr_single
from tensorflow.python.keras.utils import multi_gpu_model
from tqdm import tqdm as tqdm
import warnings
warnings.filterwarnings('ignore')


BASE_DIR(目录): /home/tione/notebook


In [3]:
# GPU相关设置
os.environ["CUDA_VISIBLE_DEVICES"] = "1"
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"
# 设置GPU按需增长
config = tf.ConfigProto()
config.gpu_options.allow_growth = True
sess = tf.Session(config=config)

In [4]:
def loadFeedinfo():
    feed = loadPkl(FEED_INFO_DEAL)
    feed[["bgm_song_id", "bgm_singer_id"]] += 1  # 0 用于填未知
    feed[["bgm_song_id", "bgm_singer_id", "videoplayseconds"]] = \
        feed[["bgm_song_id", "bgm_singer_id", "videoplayseconds"]].fillna(0)
    feed['bgm_song_id'] = feed['bgm_song_id'].astype('int64')
    feed['bgm_singer_id'] = feed['bgm_singer_id'].astype('int64')
    print('feedinfo loading over...')
    return feed

def getFeedembeddings(df):
    #feedembeddings 降维

    feed_embedding_path = os.path.join(FEATURE_PATH,'feedembedings.pkl')
    feed_embeddings = loadPkl(feed_embedding_path)
    df = df.merge(feed_embeddings,on='feedid',how='left')
    dense = [x for x in list(feed_embeddings.columns) if x != 'feedid' ]
    
    return df,dense

def getSvdembeddings(df):
    dense = []
    #userid-feedid svd
    svd_embedding = loadPkl(os.path.join(FEATURE_PATH,'svd_userid_feedid_embedding.pkl'))
    df = df.merge(svd_embedding,on = ['userid'],how='left')
    dense += [x for x in list(svd_embedding.columns) if x not in ['userid']]
                            
    #userid_authorid svd
    svd_embedding = loadPkl(os.path.join(FEATURE_PATH,'svd_userid_authorid_embedding.pkl'))
    df  = df.merge(svd_embedding,on = ['userid'],how='left')
    dense += [x for x in list(svd_embedding.columns) if x not in ['userid']]
    
    #text svd
    svd_embedding = loadPkl(os.path.join(FEATURE_PATH,'texts_svd_embedding.pkl'))
    svd_embedding['feedid']  = svd_embedding['feedid'].astype(np.int32) 
    df  = df.merge(svd_embedding,on = ['feedid'],how='left')
    dense += [x for x in list(svd_embedding.columns) if x not in ['feedid']]
    
    return df, dense
def myLeftjoin(left,right,on):
    return left.merge(right[right[on].isin(left[on])].set_index(on),how='left',left_on=on,right_index=True)
def getHistFeatures(df,hist_features):
    dense = [x for x in hist_features.columns if x not in df.columns and  'hist_seq' not in x ]
    varlen = [x for x in hist_features.columns if 'hist_seq' in x]
    df = df.merge(hist_features[hist_features.userid.isin(df.userid.unique())][['userid','feedid','date_','device'] + dense],how = 'left',on = ['userid','feedid','date_','device'])
    return (df,dense)

In [5]:
class myDataGenerator(tf.keras.utils.Sequence):
    def __init__(self, data: pd.DataFrame,feedinfo,dnn_feature_columns,task,batch_size=2048, shuffle=True):
        self.data = data.copy()
        self.target = ACTION_LIST
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.indexes = np.arange(self.data.shape[0])
        self.task = task
        
        self.feedinfo = feedinfo
#         self.feed_embeddings = loadPkl(os.path.join(FEATURE_PATH,'feedembedings.pkl'))
#         self.user_feed_svd_embedding = loadPkl(os.path.join(FEATURE_PATH,'svd_userid_feedid_embedding.pkl'))
#         self.user_author_svd_embedding = loadPkl(os.path.join(FEATURE_PATH,'svd_userid_authorid_embedding.pkl'))
#         self.text_svd_embedding = loadPkl(os.path.join(FEATURE_PATH,'texts_svd_embedding.pkl'))
#         self.text_svd_embedding['feedid'] = self.text_svd_embedding['feedid'].astype(int)



        self.graph_emb8 = loadPkl(os.path.join(MODEL_PATH,'emb/graph_walk_emb_8.pkl'))
        self.feed_emb_16 = loadPkl(os.path.join(MODEL_PATH,'emb/feed_embeddings_16.pkl'))
        self.weight_emb8 = loadPkl(os.path.join(MODEL_PATH,'emb/user_weight_emd_8.pkl'))
        self.weight_emb8 = self.weight_emb8.drop('user_date_weight_emd',axis = 1)
        self.keyword_w2v_8 = loadPkl(os.path.join(MODEL_PATH,'emb/keyword_w2v_8.pkl'))
        self.userid_feedid_d2v_all_16 = loadPkl(os.path.join(MODEL_PATH,'emb/userid_feedid_d2v_all_16.pkl'))##加了初赛数据
        self.all_text_data_v8 = loadPkl(os.path.join(MODEL_PATH,'emb/all_text_data_v8.pkl'))
        self.userid_authorid_d2v_all_16 = loadPkl(os.path.join(MODEL_PATH,'emb/userid_authorid_d2v_all_16.pkl'))
        
        self.dnn_feature_columns = dnn_feature_columns
        self.feature_names = get_feature_names(self.dnn_feature_columns)
        
        if self.shuffle:
            print('shuffle data index ing...')
            np.random.shuffle(self.indexes)

    def __len__(self):

        return (self.data.shape[0] // self.batch_size) + 1

    def __getitem__(self, index):
        batch_indexs = self.indexes[index * self.batch_size:(index + 1) *
                                    self.batch_size]
        batch_data = self.data.iloc[batch_indexs, :]
        
        return self.get_feature_on_batch(batch_data)

    def on_epoch_end(self):
        if self.shuffle:
            print('shuffle data index ing...')
            np.random.shuffle(self.indexes)
    def on_epoch_begain(self):
        if self.shuffle:
            print('shuffle data index ing...')
            np.random.shuffle(self.indexes)

    def get_feature_on_batch(self, batch):
        
#         batch = batch.merge(self.user_feed_svd_embedding,on='userid',how='left')
#         batch = batch.merge(self.user_author_svd_embedding,on='userid',how='left')
#         batch = batch.merge(self.text_svd_embedding,on='feedid',how='left')
#         batch = batch.merge(self.feed_embeddings,on='feedid',how='left')
                
        batch = batch.merge(self.graph_emb8, how='left',
              on='userid')
        batch = batch.merge(self.feed_emb_16, how='left',
                      on='feedid')
        batch = batch.merge(self.weight_emb8, how='left',
                      on='userid')
        batch = batch.merge(self.keyword_w2v_8, how='left',
                      on='feedid')
        batch = batch.merge(self.userid_feedid_d2v_all_16, how='left',
                      on='userid')
        batch = batch.merge(self.all_text_data_v8, how='left',
                      on='feedid')
        batch = batch.merge(self.userid_authorid_d2v_all_16, how='left',
                      on='userid')
                                                      
        x = {name: batch[name].values for name in self.feature_names}
        for col in ['manual_tag_list','manual_keyword_list','machine_keyword_list']:
            x[col] = np.array(batch[col].tolist())
        y = batch[self.task].values
        
        return x,y

In [6]:
DEBUG = False
data = loadPkl(USER_ACTION)
data = data.head(1000000) if DEBUG else data
feedinfo = loadFeedinfo()
# feed_embeddings = loadPkl(os.path.join(FEATURE_PATH,'feedembedings.pkl'))
# user_feed_svd_embedding = loadPkl(os.path.join(FEATURE_PATH,'svd_userid_feedid_embedding.pkl'))
# user_author_svd_embedding = loadPkl(os.path.join(FEATURE_PATH,'svd_userid_authorid_embedding.pkl'))
# text_svd_embedding = loadPkl(os.path.join(FEATURE_PATH,'texts_svd_embedding.pkl'))

graph_emb8 = loadPkl(os.path.join(MODEL_PATH,'emb/graph_walk_emb_8.pkl'))
feed_emb_16 = loadPkl(os.path.join(MODEL_PATH,'emb/feed_embeddings_16.pkl'))
weight_emb8 = loadPkl(os.path.join(MODEL_PATH,'emb/user_weight_emd_8.pkl'))
weight_emb8 = weight_emb8.drop('user_date_weight_emd',axis = 1)
keyword_w2v_8 = loadPkl(os.path.join(MODEL_PATH,'emb/keyword_w2v_8.pkl'))
userid_feedid_d2v_all_16 = loadPkl(os.path.join(MODEL_PATH,'emb/userid_feedid_d2v_all_16.pkl'))##加了初赛数据
all_text_data_v8 = loadPkl(os.path.join(MODEL_PATH,'emb/all_text_data_v8.pkl'))
userid_authorid_d2v_all_16 = loadPkl(os.path.join(MODEL_PATH,'emb/userid_authorid_d2v_all_16.pkl'))


embedding_dim = 16
sparse_features = ['userid', 'feedid', 'authorid', 'bgm_song_id', 'bgm_singer_id' ]
dense_features = ['videoplayseconds',]
for df in [
    graph_emb8,
    feed_emb_16,
    weight_emb8,
    keyword_w2v_8,
    userid_feedid_d2v_all_16,
    all_text_data_v8,
    userid_authorid_d2v_all_16
]:
    dense_features += [x for x in df.columns if x not in ['userid','feedid']]
    del df
    gc.collect()
    
    
data = data.merge(feedinfo[[
    'feedid', 'authorid', 'videoplayseconds', 'bgm_song_id',
    'bgm_singer_id'
] + ['manual_tag_list', 'manual_keyword_list', 'machine_keyword_list'
     ]],
                    how='left',
                    on='feedid')

#dense 特征处理
data['videoplayseconds'] = data['videoplayseconds'].fillna(0, )
data['videoplayseconds'] = np.log(data['videoplayseconds'] + 1.0)
train = data[data.date_ != 14]
val = data[data.date_==14]

feedinfo loading over...


In [7]:
fixlen_feature_columns = [
    SparseFeat(feat,
               vocabulary_size = feedinfo[feat].max() + 1,
               embedding_dim=embedding_dim) for feat in sparse_features if feat !='userid'
] + [DenseFeat(feat, 1) for feat in dense_features
] + [SparseFeat('userid',
               vocabulary_size= data['userid'].max() + 1,
               embedding_dim=embedding_dim)]
tag_columns = [
    VarLenSparseFeat(SparseFeat('manual_tag_list',
                                vocabulary_size=TAG_MAX,
                                embedding_dim=16),
                     maxlen=4)
]
key_words_columns = [
    VarLenSparseFeat(SparseFeat('manual_keyword_list',
                                vocabulary_size=KEY_WORDS_MAX,
                                embedding_dim=16),
                     maxlen=4),
    VarLenSparseFeat(SparseFeat('machine_keyword_list',
                                vocabulary_size=KEY_WORDS_MAX,
                                embedding_dim=16),
                     maxlen=4),
]
dnn_feature_columns =  fixlen_feature_columns + tag_columns + key_words_columns

In [8]:
train_loader = myDataGenerator(data,feedinfo,dnn_feature_columns,batch_size=4096,task = 'read_comment')
val_loader = myDataGenerator(val,feedinfo,dnn_feature_columns,batch_size=4096*20,shuffle = False,task = 'read_comment')

shuffle data index ing...


In [None]:
num_tasks = len(ACTION_LIST)
for i in range(num_tasks):
    task = ACTION_LIST[i]
    print('🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟开始跑任务%s...🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟' % (task))
    train_loader.task = task
    val_loader.task = task
    train_model = DeepFM(dnn_feature_columns, 
                         dnn_feature_columns,
                         dnn_hidden_units = (256,256),
                         l2_reg_embedding = 1e-1,
                         dnn_use_bn = False,
                      )
#     train_model = multi_gpu_model(train_model, gpus=2)
    optimizer = tf.keras.optimizers.Adagrad(
        lr=0.05, epsilon=1e-07,
    )
    train_model.compile('adagrad', loss='binary_crossentropy')
    
    best_score = -1
    early_stop = 1
    no_imporove = 0
    
    for epoch in range(7):
        print('开始训练%s任务_%s轮...' % (task,epoch))
        history = train_model.fit(train_loader,
                                  epochs=1, verbose=1,workers = 8,use_multiprocessing=True,max_queue_size=200)
        pred_ans = train_model.predict_generator(val_loader)
        weightauc = evaluate_deepctr_single(val_loader.data[task].values,pred_ans.reshape(-1),val_loader.data['userid'].values)
        print(weightauc)
        if best_score < weightauc:
            best_score = weightauc
            train_model.save_weights(os.path.join(MODEL_PATH,'tf_models/DEEPFM/offline_%s' % (task)))
            no_imporove = 0    
        else :
            no_imporove += 1
        if no_imporove >= early_stop:
            print('-----stoped on epoch %s ------- ' % (epoch))
            break
    

🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟开始跑任务read_comment...🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟🐟
Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
keep_dims is deprecated, use keepdims instead
Instructions for updating:
Deprecated in favor of operator or tf.math.divide.
Instructions for updating:
Please use `rate` instead of `keep_prob`. Rate should be set to `rate = 1 - keep_prob`.
开始训练read_comment任务_0轮...
Instructions for updating:
Use tf.cast instead.
  715/19652 [>.............................] - ETA: 19:38 - loss: 0.2332Please check the latest version manually on https://pypi.org/project/deepctr/#history
0.619746913943889

Consider using a TensorFlow optimizer from `tf.train`.
Instructions for updating:
Use tf.train.CheckpointManager to manage checkpoints rather than manually editing the Checkpoint proto.
开始训练read_comment任务_1轮...
   55/19652 [..............................] - ETA: 31:44 - loss: 0.1476

Process ForkPoolWorker-9:
Process ForkPoolWorker-12:
Process ForkPoolWorker-10:
Process ForkPoolWorker-14:
Process ForkPoolWorker-16:
Traceback (most recent call last):
Process ForkPoolWorker-11:
  File "/opt/conda/envs/tensorflow_py3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/opt/conda/envs/tensorflow_py3/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
Traceback (most recent call last):
  File "/opt/conda/envs/tensorflow_py3/lib/python3.6/multiprocessing/pool.py", line 119, in worker
    result = (True, func(*args, **kwds))
  File "/opt/conda/envs/tensorflow_py3/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
Traceback (most recent call last):
Traceback (most recent call last):
  File "/opt/conda/envs/tensorflow_py3/lib/python3.6/site-packages/tensorflow/python/keras/utils/data_utils.py", line 445, in get_index
    return _SHARED_SEQUENCES[uid][i]


In [None]:
--

In [None]:
train_model.load_weights(os.path.join(MODEL_PATH,'tf_models/MMOE_offline2'))

# online

In [None]:
data_loader = myDataGenerator(data,feedinfo,dnn_feature_columns,batch_size=4096)
for epoch in range(1):
    history = train_model.fit(data_loader,
                              epochs=1, verbose=1,workers = 8,use_multiprocessing=True,max_queue_size=100)
    pred_ans = train_model.predict_generator(val_loader)
    pred_ans = np.concatenate(pred_ans,1)
    pred_ans = pd.DataFrame(pred_ans,columns=ACTION_LIST)
    weightauc,uaucs = evaluate_deepctr(val_loader.data[ACTION_LIST],pred_ans,val_loader.data['userid'].values,ACTION_LIST)
train_model.save_weights(os.path.join(MODEL_PATH,'tf_models/MMOE_online'))

In [None]:
test = pd.read_csv('../../data/wedata/wechat_algo_data2/test_a.csv')
test = test.merge(feedinfo[['feedid', 'authorid', 'videoplayseconds', 'bgm_song_id', 'bgm_singer_id']+ ['manual_tag_list','manual_keyword_list','machine_keyword_list']], how='left',on='feedid')
test['videoplayseconds'] = test['videoplayseconds'].fillna(0, )
test['videoplayseconds'] = np.log(test['videoplayseconds'] + 1.0)
test[ACTION_LIST] = 0
t1 = time()
test_loader = myDataGenerator(test,feedinfo,dnn_feature_columns,shuffle=False,batch_size=4096*20)
pred_ans = train_model.predict(test_loader)
t2 = time()
print('7个目标行为%d条样本预测耗时（毫秒）：%.3f' % (len(test), (t2 - t1) * 1000.0))
ts = (t2 - t1) * 1000.0 / len(test) * 2000.0
print('7个目标行为2000条样本平均预测耗时（毫秒）：%.3f' % ts)

# 5.生成提交文件
for i, action in enumerate(ACTION_LIST):
    test[action] = pred_ans[i]
test[['userid', 'feedid'] + ACTION_LIST].to_csv(os.path.join(SUMIT_DIR,'tf_mmoe_base5.csv'), index=None, float_format='%.6f')
print('to_csv ok')

In [None]:
test[['userid', 'feedid'] + ACTION_LIST]