In [1]:
import glob
import sys

import tensorflow as tf
from tensorflow.keras import layers

from base.options import FLAGS,Options
from base.features import FeatureConf
from base.hook import CheckpointSaverHook

# Version shim: the model code in this notebook is written against the TF1 API.
# Under TF2, alias every TF1-only symbol it uses onto its tf.compat.v1 counterpart.
# (`layers` is already imported in this cell, so no per-branch re-import is needed.)
_TF_MAJOR = tf.__version__.split(".", 1)[0]
if _TF_MAJOR == "1":
    tf.data.AUTOTUNE = tf.data.experimental.AUTOTUNE
else:
    print(_TF_MAJOR)
    tf.truncated_normal_initializer = tf.compat.v1.truncated_normal_initializer
    tf.feature_column.shared_embedding_columns = tf.feature_column.shared_embeddings
    tf.feature_column.input_layer = tf.compat.v1.feature_column.input_layer
    tf.losses.log_loss = tf.compat.v1.losses.log_loss
    # Must map to the streaming AUC metric (returns (value, update_op)), not to a loss:
    # model_fn indexes the result with [1] and passes it as an eval_metric_op.
    tf.metrics.auc = tf.compat.v1.metrics.auc
    tf.train.get_global_step = tf.compat.v1.train.get_global_step
    # Other TF1-only names used by DeepModel (optimizers, sync replicas, logging hook).
    # Guarded with hasattr so a TF2 build lacking one of them does not fail here.
    for _tf1_name in ("AdamOptimizer", "AdagradOptimizer", "MomentumOptimizer",
                      "FtrlOptimizer", "GradientDescentOptimizer",
                      "SyncReplicasOptimizer", "LoggingTensorHook"):
        if hasattr(tf.compat.v1.train, _tf1_name):
            setattr(tf.train, _tf1_name, getattr(tf.compat.v1.train, _tf1_name))
    tf.add_to_collection = tf.compat.v1.add_to_collection


OMP: Info #155: KMP_AFFINITY: Initial OS proc set respected: 0-11
OMP: Info #216: KMP_AFFINITY: decoding x2APIC ids.
OMP: Info #157: KMP_AFFINITY: 12 available OS procs
OMP: Info #158: KMP_AFFINITY: Uniform topology
OMP: Info #287: KMP_AFFINITY: topology layer "LL cache" is equivalent to "socket".
OMP: Info #192: KMP_AFFINITY: 1 socket x 6 cores/socket x 2 threads/core (6 total cores)
OMP: Info #218: KMP_AFFINITY: OS proc to physical thread map:
OMP: Info #172: KMP_AFFINITY: OS proc 0 maps to socket 0 core 0 thread 0 
OMP: Info #172: KMP_AFFINITY: OS proc 1 maps to socket 0 core 0 thread 1 
OMP: Info #172: KMP_AFFINITY: OS proc 2 maps to socket 0 core 1 thread 0 
OMP: Info #172: KMP_AFFINITY: OS proc 3 maps to socket 0 core 1 thread 1 
OMP: Info #172: KMP_AFFINITY: OS proc 4 maps to socket 0 core 2 thread 0 
OMP: Info #172: KMP_AFFINITY: OS proc 5 maps to socket 0 core 2 thread 1 
OMP: Info #172: KMP_AFFINITY: OS proc 6 maps to socket 0 core 4 thread 0 
OMP: Info #172: KMP_AFFINITY: OS





In [2]:
from layers.mmoe import MMoE
from layers.ppnet import PPNetBlock
from layers.base import MLP,SELayer,FMCrossLayer

In [19]:
class DeepModel():
    """Joint CTR / CVR ranking model exposed as a ``tf.estimator`` ``model_fn``.

    Feature columns, the tf.Example parsing schema and per-column defaults come
    from a ``FeatureConf`` built from the config paths on ``opt``. ``params``
    carries the optimizer settings that are handed to the Estimator and read
    back in ``_build_optimizer``.
    """

    def __init__(self, opt):
        """Load feature metadata and snapshot optimizer settings from ``opt``.

        Args:
            opt: options object. Read here: ``feature_config_path``,
                ``feature_type_path``, ``KV_map_path``, ``field_emb_config_path``,
                ``lr``, ``min_lr``, ``optimizer``, ``max_train_step``. Read later in
                ``model_fn`` / ``_build_optimizer``: ``distribute``, ``worker_num``,
                ``output_dir``, ``save_checkpoint_and_eval_step``.

        Note: ``self.params`` is a snapshot; changing ``opt.max_train_step`` after
        construction does not update ``self.params['max_train_step']``.
        """
        self.opt = opt
        feat_obj = FeatureConf(opt.feature_config_path, opt.feature_type_path,
                               opt.KV_map_path, opt.field_emb_config_path)
        self.feat_columns = feat_obj.feat_columns
        self.feat_input = feat_obj.feat_input
        self.feat_schema = feat_obj.feat_schema
        self.feat_default = feat_obj.feat_default

        self.params = {
            'lr': self.opt.lr,
            'min_lr': self.opt.min_lr,
            'optimizer': self.opt.optimizer,
            'max_train_step': self.opt.max_train_step,
            'num_cross_layers': 4,
        }

    def _build_optimizer(self, params, distribute=True):
        """Create the training optimizer with a linearly decaying learning rate.

        The learning rate goes linearly from ``params['lr']`` at global step 0 to
        ``params['min_lr']`` at step ``params['max_train_step'] - 1`` and stays at
        ``min_lr`` for any later step.

        Args:
            params: dict with keys ``lr``, ``min_lr``, ``optimizer`` (one of
                ``adam``/``adagrad``/``momentum``/``ftrl``, case-insensitive; any
                other value selects plain gradient descent) and ``max_train_step``.
            distribute: when True, wrap the optimizer in
                ``tf.train.SyncReplicasOptimizer`` aggregating ``self.opt.worker_num``
                replicas (required by the sync-replicas hook set up in ``model_fn``).

        Returns:
            A TF1 ``tf.train.Optimizer``.
        """
        optim_type = params['optimizer'].lower()
        max_train_step = params['max_train_step']

        init_lr = params['lr']
        min_lr = params['min_lr']

        # Guard the denominator (max_train_step <= 1 would divide by zero) and clamp
        # the step so the schedule holds at min_lr if training runs past the end.
        decay_steps = float(max(max_train_step - 1, 1))
        cur_step = tf.minimum(tf.cast(tf.train.get_global_step(), tf.float32), decay_steps)
        cur_lr = init_lr - cur_step * (init_lr - min_lr) / decay_steps

        if optim_type == 'adam':
            optimizer = tf.train.AdamOptimizer(learning_rate=cur_lr, beta1=0.9, beta2=0.999, epsilon=1e-8)
        elif optim_type == 'adagrad':
            optimizer = tf.train.AdagradOptimizer(learning_rate=cur_lr, initial_accumulator_value=1e-8)
        elif optim_type == 'momentum':
            optimizer = tf.train.MomentumOptimizer(learning_rate=cur_lr, momentum=0.95)
        elif optim_type == 'ftrl':
            optimizer = tf.train.FtrlOptimizer(cur_lr)
        else:
            optimizer = tf.train.GradientDescentOptimizer(cur_lr)

        # Synchronous data-parallel training: every worker contributes one gradient
        # per step. NOTE(review): `sparse_accumulator_type` is not an argument of
        # stock TF1 SyncReplicasOptimizer — presumably a custom TF build; confirm.
        if distribute:
            optimizer = tf.train.SyncReplicasOptimizer(optimizer,
                                                       replicas_to_aggregate=self.opt.worker_num,
                                                       total_num_replicas=self.opt.worker_num,
                                                       use_locking=False, sparse_accumulator_type="multi_map")

        return optimizer

    def model_fn(self, features, labels, mode, params):
        """Estimator ``model_fn`` for the shared-bottom MMoE CTR / CVR model.

        Args:
            features: dict of input tensors consumed by the feature columns.
            labels: dict with ``"ctr"`` and ``"cvr"`` label tensors (unused in PREDICT).
            mode: a ``tf.estimator.ModeKeys`` value.
            params: optimizer settings forwarded to ``_build_optimizer``.

        Returns:
            ``tf.estimator.EstimatorSpec``. PREDICT outputs ``ctr_pred`` and
            ``cvr_pred``; there ``ctr_pred`` excludes the bias tower, which is only
            added to the CTR logit in TRAIN and EVAL.
        """
        # ---- Input embeddings.
        user_sparse_input = tf.feature_column.input_layer(features, self.feat_columns["user_sparse_column_list"])
        item_sparse_input = tf.feature_column.input_layer(features, self.feat_columns["item_sparse_column_list"])
        query_sparse_input = tf.feature_column.input_layer(features, self.feat_columns["query_sparse_column_list"])
        bias_sparse_input = tf.feature_column.input_layer(features, self.feat_columns["bias_sparse_column_list"])
        allemb = tf.concat([user_sparse_input, item_sparse_input, query_sparse_input], axis=1)

        sparse_num = (len(self.feat_columns["user_sparse_column_list"])
                      + len(self.feat_columns["item_sparse_column_list"])
                      + len(self.feat_columns["query_sparse_column_list"]))

        # Reweight fields with SE on a (batch, field, 16) view, then flatten back.
        # NOTE(review): assumes every one of these sparse columns has a 16-dim
        # embedding — confirm against the field embedding config.
        allemb = SELayer([16])(tf.reshape(allemb, (-1, sparse_num, 16)))
        allemb = tf.reshape(allemb, (-1, 16 * sparse_num))

        # ---- Multi-task towers: MMoE yields one representation per task.
        ctr_emb, cvr_emb = MMoE(4, [256], 2, [128])(allemb)
        ctr = MLP(1, [256, 128, 64])(ctr_emb)
        # Each task tower must consume its own MMoE output.
        cvr = MLP(1, [256, 128, 64])(cvr_emb)
        cvr_pred = tf.reshape(tf.sigmoid(cvr, name="cvr"), [-1])

        if mode == tf.estimator.ModeKeys.PREDICT:
            ctr_pred = tf.reshape(tf.sigmoid(ctr, name="ctr"), [-1])

            predictions = {
                'ctr_pred': ctr_pred,
                'cvr_pred': cvr_pred,
            }
            export_outputs = {'serving_default': tf.estimator.export.PredictOutput(predictions)}
            return tf.estimator.EstimatorSpec(mode, predictions=predictions, export_outputs=export_outputs)

        # ---- Bias tower, added to the CTR logit during training/evaluation only.
        bias_ctr = MLP(1, [8, 8])(bias_sparse_input)
        # Keras layers do not see the Estimator mode: without an explicit
        # `training` flag the TF1 learning phase defaults to inference and the
        # dropout is silently a no-op.
        bias_ctr = layers.Dropout(0.8)(bias_ctr, training=(mode == tf.estimator.ModeKeys.TRAIN))
        ctr_pred = tf.reshape(tf.sigmoid(ctr + bias_ctr, name="ctr"), [-1])

        # ---- Losses.
        ground_truth_ctr = tf.reshape(labels["ctr"], [-1])
        ground_truth_cvr = tf.reshape(labels["cvr"], [-1])
        loss1 = tf.reduce_mean(
            tf.losses.log_loss(labels=ground_truth_ctr, predictions=ctr_pred))
        # CVR loss is restricted to clicked samples (weight = ctr label), with
        # converted samples given an extra 20x cvr-label weight.
        loss2 = tf.reduce_mean(
            tf.losses.log_loss(labels=ground_truth_cvr, predictions=cvr_pred,
                               weights=ground_truth_ctr + ground_truth_cvr * 20))
        loss = loss1 + loss2
        tf.add_to_collection('loss', loss)
        tf.add_to_collection('loss1', loss1)
        tf.add_to_collection('loss2', loss2)

        # ---- Eval metrics: streaming AUC, (value, update_op) pairs.
        auc_ctr = tf.metrics.auc(labels=ground_truth_ctr, predictions=ctr_pred)
        auc_cvr = tf.metrics.auc(labels=ground_truth_cvr, predictions=cvr_pred)
        metrics = {'auc_ctr': auc_ctr, "auc_cvr": auc_cvr}

        if mode == tf.estimator.ModeKeys.EVAL:
            return tf.estimator.EstimatorSpec(mode, loss=loss, eval_metric_ops=metrics)

        # ---- Training.
        optimizer = self._build_optimizer(params, distribute=self.opt.distribute)

        training_hooks = []
        training_chief_hooks = []

        saver_hook = CheckpointSaverHook(checkpoint_dir=self.opt.output_dir,
                                         save_steps=self.opt.save_checkpoint_and_eval_step)
        pos_cvr = tf.reduce_sum(ground_truth_cvr)
        pos_ctr = tf.reduce_sum(ground_truth_ctr)
        logging_hook = tf.train.LoggingTensorHook(
            {"loss": loss, "loss1": loss1, "loss2": loss2,
             "auc_ctr": auc_ctr[1], "auc_cvr": auc_cvr[1],
             "pos_ctr": pos_ctr, "pos_cvr": pos_cvr},
            every_n_iter=100)
        if self.opt.distribute:
            print("hook_sync_replicas is set")
            # NOTE(review): `self.is_chief` is not assigned in this class; it must be
            # set on the instance before training with opt.distribute enabled.
            self.hook_sync_replicas = optimizer.make_session_run_hook(is_chief=self.is_chief, num_tokens=0)
            training_hooks.append(self.hook_sync_replicas)
            # Only the chief saves checkpoints and logs.
            training_chief_hooks.append(saver_hook)
            training_chief_hooks.append(logging_hook)
        else:
            training_hooks.append(logging_hook)
            training_hooks.append(saver_hook)
        train_op = optimizer.minimize(loss, global_step=tf.train.get_global_step())
        if self.opt.distribute:
            # Must be created after minimize(): it depends on apply_gradients state.
            self.sync_init_op = optimizer.get_init_tokens_op()
        return tf.estimator.EstimatorSpec(mode, loss=loss, train_op=train_op,
                                          training_hooks=training_hooks,
                                          training_chief_hooks=training_chief_hooks)


In [20]:
# Parse command-line flags first: Options() and DeepModel are built from them.
FLAGS(sys.argv)
opt = Options()
modelobj = DeepModel(opt)

# Register the two label columns directly on the model's schema/default dicts
# (these names alias the dict objects owned by `modelobj`). Labels are parsed as
# scalar int64 values (float32 was an earlier alternative) with default value 0.
feat_schema = modelobj.feat_schema
feat_schema.update({
    "finalClickFlag": tf.io.FixedLenFeature((), tf.int64),
    "pay_flag": tf.io.FixedLenFeature((), tf.int64),
})

feat_default = modelobj.feat_default
feat_default.update({"finalClickFlag": 0, "pay_flag": 0})

In [21]:
# with open("data/train/00.csv","r") as f:
#     head=f.readline().strip()
# column_names=head.split(",")
# feat_all=modelobj.feat_default.keys()
# for col in columns_names:
#     if col not in feat_all:
#         print(col)

### Load training data from CSV

In [22]:
# Explicit names instead of a star import (`glob` is imported in the first cell).
from utils.dataio import input_fn, loadtf

# glob.glob returns files in arbitrary, filesystem-dependent order; sort so the
# input order (and hence training results) is reproducible across machines.
train_list = sorted(glob.glob("./data/train/*.csv"))
val_list = sorted(glob.glob("./data/val/*.csv"))
test_list = sorted(glob.glob("./data/test/*.csv"))

# The header of one training shard defines the column order for every shard.
with open("data/train/00.csv", "r") as f:
    head = f.readline().strip()
column_names = head.split(",")
# Columns without a configured default get the sentinel "drop_value"
# (presumably dropped inside input_fn — TODO confirm).
record_defaults = [feat_default.get(col, "drop_value") for col in column_names]
drop_columns = ["uid", "normslotmatchscore", "normtrunkmatchscore"]

ds = input_fn(train_list, column_names, record_defaults, drop_columns, batch_size=1024)
# batch=next(iter(ds.as_numpy_iterator()))

### Load training data from TFRecords

In [23]:
# TFRecord shards, sorted for a deterministic file order (glob order is arbitrary).
train_list = sorted(glob.glob("./data/train_tf/*.tf"))
val_list = sorted(glob.glob("./data/val_tf/*.tf"))
test_list = sorted(glob.glob("./data/test_tf/*.tf"))
# Small batch, presumably for interactively inspecting parsed examples, e.g.:
# batch = next(iter(ds.as_numpy_iterator()))
ds = loadtf(train_list, feat_schema, batch_size=64)

### Train and evaluate with tf.estimator (TF1 API)

In [24]:
# Training overrides for this run.
opt.max_train_step = 7400
opt.output_dir = "./tmp_model"
opt.save_summary_steps = 500
opt.save_checkpoint_and_eval_step = 500
# DeepModel snapshotted max_train_step into `params` at construction time, and the
# Estimator deep-copies params below, so propagate the override before building it;
# otherwise the learning-rate schedule keeps using the old value.
modelobj.params['max_train_step'] = opt.max_train_step

classifier = tf.estimator.Estimator(
    model_fn=modelobj.model_fn,
    params=modelobj.params,
    config=tf.estimator.RunConfig(
                model_dir=opt.output_dir,
                tf_random_seed=2020,
                save_summary_steps=opt.save_summary_steps,
                save_checkpoints_steps=opt.save_checkpoint_and_eval_step,
                keep_checkpoint_max=1000,
                experimental_max_worker_delay_secs=2000)
)

# TFRecord inputs (sorted: glob order is arbitrary). To train from CSV instead, use
# `lambda: input_fn(<file list>, column_names, record_defaults, drop_columns, batch_size=1024)`.
train_list = sorted(glob.glob("./data/train_tf/*.tf"))
val_list = sorted(glob.glob("./data/val_tf/*.tf"))
test_list = sorted(glob.glob("./data/test_tf/*.tf"))
train_spec = tf.estimator.TrainSpec(
    input_fn=lambda: loadtf(train_list, feat_schema, batch_size=1024),
    max_steps=opt.max_train_step
)
eval_spec = tf.estimator.EvalSpec(
    input_fn=lambda: loadtf(val_list, feat_schema, batch_size=1024),
    steps=1000,
    start_delay_secs=3,
    throttle_secs=3
)

INFO:tensorflow:Using config: {'_model_dir': './tmp_model', '_tf_random_seed': 2020, '_save_summary_steps': 500, '_save_checkpoints_steps': 500, '_save_checkpoints_secs': None, '_session_config': allow_soft_placement: true
graph_options {
  rewrite_options {
    meta_optimizer_iterations: ONE
  }
}
, '_keep_checkpoint_max': 1000, '_keep_checkpoint_every_n_hours': 10000, '_log_step_count_steps': 100, '_train_distribute': None, '_device_fn': None, '_protocol': None, '_eval_distribute': None, '_experimental_distribute': None, '_experimental_max_worker_delay_secs': 2000, '_session_creation_timeout_secs': 7200, '_service': None, '_cluster_spec': <tensorflow.python.training.server_lib.ClusterSpec object at 0x7f76e25bef90>, '_task_type': 'worker', '_task_id': 0, '_global_id_in_cluster': 0, '_master': '', '_evaluation_master': '', '_is_chief': True, '_num_ps_replicas': 0, '_num_worker_replicas': 1}


In [26]:
# Alternate training and evaluation; evaluation runs after each saved checkpoint.
tf.estimator.train_and_evaluate(estimator=classifier, train_spec=train_spec, eval_spec=eval_spec)


INFO:tensorflow:Not using Distribute Coordinator.
INFO:tensorflow:Running training and evaluation locally (non-distributed).
INFO:tensorflow:Start train and evaluate loop. The evaluate will happen after every checkpoint. Checkpoint frequency is determined based on RunConfig arguments: save_checkpoints_steps 500 or save_checkpoints_secs None.
INFO:tensorflow:Calling model_fn.
INFO:tensorflow:Create CheckpointSaverHook.
INFO:tensorflow:Done calling model_fn.
INFO:tensorflow:Graph was finalized.
INFO:tensorflow:Running local_init_op.
INFO:tensorflow:Done running local_init_op.
INFO:tensorflow:loss = 1.3862896, step = 1


OMP: Info #254: KMP_AFFINITY: pid 640886 tid 641114 thread 4 bound to OS proc set 8
OMP: Info #254: KMP_AFFINITY: pid 640886 tid 641113 thread 1 bound to OS proc set 2
OMP: Info #254: KMP_AFFINITY: pid 640886 tid 641317 thread 5 bound to OS proc set 10
OMP: Info #254: KMP_AFFINITY: pid 640886 tid 641318 thread 6 bound to OS proc set 1
OMP: Info #254: KMP_AFFINITY: pid 640886 tid 641319 thread 7 bound to OS proc set 3
OMP: Info #254: KMP_AFFINITY: pid 640886 tid 641320 thread 8 bound to OS proc set 5
OMP: Info #254: KMP_AFFINITY: pid 640886 tid 641321 thread 9 bound to OS proc set 7
OMP: Info #254: KMP_AFFINITY: pid 640886 tid 641322 thread 10 bound to OS proc set 9
OMP: Info #254: KMP_AFFINITY: pid 640886 tid 641323 thread 11 bound to OS proc set 11
OMP: Info #254: KMP_AFFINITY: pid 640886 tid 641324 thread 12 bound to OS proc set 0
OMP: Info #254: KMP_AFFINITY: pid 640886 tid 641325 thread 13 bound to OS proc set 2
OMP: Info #254: KMP_AFFINITY: pid 640886 tid 641326 thread 14 bound to

INFO:tensorflow:auc_ctr = 0.5279784, auc_cvr = 1.0, loss = 1.3862896, loss1 = 0.6932492, loss2 = 0.6930403, pos_ctr = 519, pos_cvr = 0


INFO:tensorflow:auc_ctr = 0.5279784, auc_cvr = 1.0, loss = 1.3862896, loss1 = 0.6932492, loss2 = 0.6930403, pos_ctr = 519, pos_cvr = 0


KeyboardInterrupt: 

In [None]:
# classifier.evaluate(input_fn=lambda :loadtf(val_list+test_list,feat_schema,num_epochs=1,batch_size=4096))

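# Repository: https://github.com/LiTugou/tvsearch.git
# SECURITY: a GitHub personal access token was embedded in this URL and has been removed.
# Revoke/rotate that token, and never store credentials in the notebook.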