# PAI-tensorflow DeepFM

This example is a PAI TF implementation of DeepFM [1]. The code is forked from [Github](https://github.com/ChenglongChen/tensorflow-DeepFM) and has been modified.


# Usage
## Sample data
This implementation accepts only kv-format data: every input feature must be expressed as key:value pairs. Sample data:

|label|feature1|feature2|multi_tags1|multi_tags2|
|----|----|----|----|----|
|1|1:0.1|3:0.2|3:0.5,4:0.3|7:0.1|
|1|1:0.1|3:0.2,110:0.5,170:0.2|4:0.3|7:0.1|
|1|1:0.1|3:0.2|3:0.5,4:0.3|7:0.1|
|0|1:0.1|6:0.2,130:0.5|4:0.3|7:0.1|
|1|1:0.1|3:0.2|3:0.5,4:0.3|7:0.1|
|0|1:0.1|3:0.5|3:0.5,10:0.3|7:0.1,12:0.3|
|1|1:0.1|7:0.2|:0.3,4:0.3|7:0.1|
|0|1:0.1|3:0.2|3:0.5,9:0.3|7:0.1|
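
To make the kv format concrete, here is a minimal sketch of parsing one cell. It assumes integer keys and float values, with the two delimiters passed the same way as the kvs_delimiter and kv_delimiter parameters in the parameter table. `parse_kvs` is a hypothetical helper, not part of DeepFM.py.

```python
def parse_kvs(cell, kvs_delimiter=",", kv_delimiter=":"):
    """Parse one kv cell such as "3:0.5,4:0.3" into [(3, 0.5), (4, 0.3)].

    Hypothetical helper: keys are assumed to be integers and values floats.
    A list (not a dict) is returned so that duplicate keys stay visible.
    """
    pairs = []
    if not cell:
        return pairs  # an empty cell simply contributes no features
    for item in cell.split(kvs_delimiter):
        key, value = item.split(kv_delimiter)  # ValueError on a malformed pair
        pairs.append((int(key), float(value)))  # ValueError on a non-numeric pair
    return pairs


# A cell from the sample table above
print(parse_kvs("3:0.2,110:0.5,170:0.2"))  # [(3, 0.2), (110, 0.5), (170, 0.2)]
# The training example later in this document uses "|" between kv pairs
print(parse_kvs("1:0.1|450:0.2", kvs_delimiter="|"))  # [(1, 0.1), (450, 0.2)]
```

With this helper, a cell whose first key is empty, such as `:0.3,4:0.3`, raises a ValueError.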

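## Data format
Input kv data comes in two types. The model selects the embedding according to the key, and the value can be an integer or a real-valued weight. To support the common case where several tags describe the same feature, this implementation also supports multi tags embedding: the embeddings of multiple tags are merged into a single embedding before being fed to the model, which improves generalization.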
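
The merge step can be pictured with the following sketch. It assumes that each tag's value acts as a weight on its embedding before the sum (the source only states that values can be weights and that merging is done by sum); in TensorFlow 1.x the same operation is typically written with `tf.nn.embedding_lookup_sparse(..., combiner="sum")`. `pool_multi_tags` and the toy embedding table are hypothetical.

```python
def pool_multi_tags(pairs, embedding_table, weighted=True):
    """Merge the embeddings of all tags in one multi tags cell by summation.

    pairs:           [(tag_key, value), ...] for a single cell
    embedding_table: {tag_key: [float, ...]}, all vectors of equal length
    weighted:        if True, scale each embedding by its value (an assumption)
    """
    dim = len(next(iter(embedding_table.values())))
    pooled = [0.0] * dim
    for key, value in pairs:
        weight = value if weighted else 1.0
        for d, component in enumerate(embedding_table[key]):
            pooled[d] += weight * component
    return pooled


table = {3: [1.0, 0.0], 4: [0.0, 2.0]}
print(pool_multi_tags([(3, 0.5), (4, 0.3)], table))                  # [0.5, 0.6]
print(pool_multi_tags([(3, 0.5), (4, 0.3)], table, weighted=False))  # [1.0, 2.0]
```

Whatever the number of tags in a cell, the result has a fixed size, which is what lets a variable-length tag list feed a single model input.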

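- Plain kv data

    Examples are the values in feature1 and feature2 above. Each kv pair maps to one embedding input, a column may contain several kv pairs, and keys may be absent. Note that plain kv data is a sparse representation while the model accepts only dense input, so a kv map assigns each key to a model slot:

    ![kv map](kv_map.png)

    The kv map on the right of the figure is an input parameter of this implementation and can be adjusted to fit your data.
    All plain kv keys share one key space, so spreading them over several input columns is equivalent to merging them into one column. Every key is subject to the kv map, and all keys share a single maximum key size limit.

    ***Note***: within one sample, at most one key may fall into any given kv map range. If keys in the same range are not mutually exclusive, training behaves incorrectly.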
    
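- multi tags data

    Examples are the values in multi_tags1 and multi_tags2 above. Unlike plain kv data, one column of this type represents exactly one feature, so multiple multi tags features must be stored in separate columns. The key spaces of different columns are independent, but to simplify configuration all columns share the same maximum key size. Keys of this type are not subject to the kv map.

    ***Note***: the embeddings of multiple tags can currently only be merged by sum.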
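
The slot assignment for plain kv data can be sketched as follows, using the flat kv_map format from the training example later in this document. Two points are assumptions: the ranges are treated as half-open [start, end) (the example ranges share the boundary keys 300, 700 and 1000), and the [id] entries are ignored, with slots numbered by position. `kv_map_ranges` and `assign_slots` are hypothetical helpers.

```python
def kv_map_ranges(kv_map):
    """Flatten [[id], [start, end], [id], [start, end], ...] into [(start, end), ...].

    The position of a range in the returned list is used as its slot index,
    matching the "solt: 0, 1, ..." numbering printed when the model is built.
    """
    if len(kv_map) % 2 != 0:
        raise ValueError("kv_map must alternate [id] and [start, end] entries")
    return [tuple(kv_map[i + 1]) for i in range(0, len(kv_map), 2)]


def assign_slots(pairs, ranges):
    """Place each (key, value) pair into its slot; ranges are treated as [start, end)."""
    slots = {}
    for key, value in pairs:
        slot = next((s for s, (start, end) in enumerate(ranges) if start <= key < end), None)
        if slot is None:
            raise ValueError("key %d is not covered by kv_map" % key)
        if slot in slots:
            # Violates the rule that a sample may use each kv map range only once
            raise ValueError("keys %d and %d share slot %d" % (slots[slot][0], key, slot))
        slots[slot] = (key, value)
    return slots


ranges = kv_map_ranges([[1], [1, 300], [2], [300, 700], [3], [700, 1000], [4], [1000, 1500]])
print(ranges)  # [(1, 300), (300, 700), (700, 1000), (1000, 1500)]
print(assign_slots([(3, 0.2), (450, 0.5)], ranges))  # {0: (3, 0.2), 1: (450, 0.5)}
```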

## Supported platforms

|Platform|OSS input|MaxCompute input|Local file input|
|----|----|----|----|
|DSW|No|No|Yes|
|open source|No|No|Yes|
|PAI Studio (MaxCompute)|Yes|Yes|No|

***Note***: for the run parameters on MaxCompute, see the PAI Studio (MaxCompute) section below.

## Model parameters

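|Parameter|Description|Notes|
|------|------|-----|
|input_name|Input data||
|feature_cols|Plain kv feature columns|One or more, kv format|
|multi_tags_cols|multi tags feature columns|Zero or more, kv format|
|kv_map|kv map|Python literal, e.g. [[[1],[1,100]],[[2],[101,30000]]]|
|feature_max_size|Maximum key of plain features||
|multi_tags_max_size|Maximum key of multi tags features|Shared by all multi tags columns|
|label_col_name|Label column name|Values 0 or 1|
|kvs_delimiter|Delimiter between kv pairs|Default: comma|
|kv_delimiter|Delimiter between key and value|Default: colon|
|checkpointDir|OSS path for checkpoints|Must be authorized via ARN|
|output_name|Output data|Used only in predict mode|
|mode|Mode|train or predict|
|use_fm|Whether to use the FM component|True or False|
|use_deep|Whether to use the Deep component|True or False|
|sync_type|Synchronization mode|"async" or "sync"|
|embedding_size|Embedding size|Default: 8|
|num_steps|Maximum number of training steps|Default: `100*1000*1000`|
|epoch|Maximum number of epochs|Default: 1|
|dropout_fm|Dropout for the FM part|Python list, default [1.0, 1.0]; must contain exactly 2 values|
|deep_layers|Hidden layers of the Deep part|Python list, default [32, 32]; more layers and more units per layer can be set|
|dropout_deep|Dropout for the Deep part|Python list, default [0.5, 0.5, 0.5]; must contain len(deep_layers) + 1 elements|
|batch_size|Batch size|Default: 128|
|learning_rate|Learning rate|Default: 0.001|
|optimizer_type|Optimizer type|Default: adam; options: adam, adagrad, gd, momentum|
|batch_norm|Whether to enable batch norm|True or False|
|batch_norm_decay|Batch norm decay|Default: 0.995|
|l2_reg|L2 regularization coefficient|Default: 0.01|
|log_verbose|Verbose logging|True or False|
|adam_beta1|Adam optimizer parameter|Default: 0.9|
|adam_beta2|Adam optimizer parameter|Default: 0.99|
|adam_epsilon|Adam optimizer parameter|Default: 1e-8|
|adagrad_initial_accumulator_value|Adagrad optimizer parameter|Default: 1e-8|
|momentum|Momentum optimizer parameter|Default: 0.95|
|random_seed|Random seed|Default: 123456. In distributed async mode, graph construction is deterministic but computed results remain non-deterministic|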

***Note***: the parameters above apply both when calling the DeepFM interface directly and as run parameters on PAI Studio.
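
A small pre-flight check can catch violations of the constraints stated in the table before a job is submitted. The hypothetical `check_params` below is a sketch based only on the table (the two-value dropout_fm, the len(deep_layers) + 1 length of dropout_deep, and the allowed mode, sync_type and optimizer_type values); it is not a function provided by DeepFM.py.

```python
def check_params(params):
    """Return a list of problems in a DeepFM parameter dict (empty if none).

    Hypothetical pre-flight check; list defaults follow the parameter table above.
    """
    problems = []
    if params.get("mode") not in ("train", "predict"):
        problems.append("mode must be 'train' or 'predict', got %r" % params.get("mode"))
    deep_layers = params.get("deep_layers", [32, 32])
    dropout_fm = params.get("dropout_fm", [1.0, 1.0])
    dropout_deep = params.get("dropout_deep", [0.5, 0.5, 0.5])
    if len(dropout_fm) != 2:
        problems.append("dropout_fm must have exactly 2 values, got %d" % len(dropout_fm))
    if len(dropout_deep) != len(deep_layers) + 1:
        problems.append("dropout_deep needs len(deep_layers) + 1 = %d values, got %d"
                        % (len(deep_layers) + 1, len(dropout_deep)))
    choices = {"sync_type": ("async", "sync"),
               "optimizer_type": ("adam", "adagrad", "gd", "momentum")}
    for name, allowed in choices.items():
        # Only validate values the caller actually set
        if name in params and params[name] not in allowed:
            problems.append("%s must be one of %s, got %r" % (name, allowed, params[name]))
    return problems


params = {"mode": "train", "deep_layers": [64, 32, 16], "dropout_deep": [0.5, 0.5, 0.5]}
print(check_params(params))  # ['dropout_deep needs len(deep_layers) + 1 = 4 values, got 3']
```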

## Files

This implementation contains two files: `DeepFM.py` implements the model, and `entry.py` is the main entry script (already adapted for PAI Studio).
  
## Initializing and training the model

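```python
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import pandas as pd       # used below to inspect the prediction output
import tensorflow as tf   # TF 1.x API (tf.logging)

from DeepFM import *
tf.logging.set_verbosity(tf.logging.INFO)
```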

## Building the model parameters

```python
# Pairs of [id] and [start, end] key range; each pair defines one model slot
kv_map = [[1],[1,300],[2],[300,700],[3],[700,1000],[4],[1000,1500]]
# The input format differs slightly across platforms; this example reads a local file on DSW
input_name = "./train.csv"
# DeepFM parameters
dfm_params = {
    "use_fm": True,
    "use_deep": True,
    "embedding_size": 8,
    "dropout_fm": [0.8, 0.8],
    "deep_layers": [32, 32],
    "dropout_deep": [0.5, 0.5, 0.5],
    "deep_layers_activation": tf.nn.relu,
    "batch_size": 128,
    "learning_rate": 0.001,
    "optimizer_type": "adam",
    "batch_norm": False,
    "batch_norm_decay": 0.995,
    "l2_reg": 0.0,
    "epoch": 1,
    "num_steps": 10,
    "verbose": True,
    "random_seed": 1,
    "server": None,
    "cluster": None,
    "task_index": 0,
    "worker_num": 1,
    "input_name": input_name,
    "feature_cols": ["feature1"],
    "kv_map": kv_map,
    "feature_max_size": 3000,
    "label_col_name": "label",
    "kvs_delimiter": "|",
    "kv_delimiter": ":",
    "sync_type": "async",
    "checkpoint_dir": "./checkpoint",
    "mode": "train",
    "output_name": "./out",
    "multi_tags_col_name": ["multi_tags1", "multi_tags2"],
    "multi_tags_max_size": 50
}
```

## Constructing the model

```python
deepfm = DeepFM(**dfm_params)
```

```
kv map range start: 1  end: 300  solt: 0
kv map range start: 300  end: 700  solt: 1
kv map range start: 700  end: 1000  solt: 2
kv map range start: 1000  end: 1500  solt: 3
select columns : label,feature1,multi_tags1,multi_tags2
[2020-03-12 13:58:57,227] [INFO] [11055#MainThread] [../../.local/lib/python2.7/site-packages/tensorflow/python/util/auto_strategy_utils.py:108] Disable Auto Strategy.
Instructions for updating:
Create a `tf.sparse.SparseTensor` and use `tf.sparse.to_dense` instead.
[2020-03-12 13:58:59,198] [INFO] [11055#MainThread] [../../.local/lib/python2.7/site-packages/tensorflow/python/ops/summary_op_util.py:77] Summary name avg loss is illegal; using avg_loss instead.
model params number: 29671
model total size: 118684
build graph done
```


## Training the model

```python
deepfm.train_and_evaluate()
```

```
[2020-03-12 13:59:44,323] [INFO] [11055#MainThread] [../../.local/lib/python2.7/site-packages/tensorflow/python/training/basic_session_run_hooks.py:529] Create CheckpointSaverHook.
[2020-03-12 13:59:44,325] [INFO] [11055#MainThread] [../../.local/lib/python2.7/site-packages/tensorflow/python/training/basic_session_run_hooks.py:544] Init incremental saver , incremental_save:False, incremental_path:./checkpoint/.incremental_checkpoint/incremental_model.ckpt
[2020-03-12 13:59:44,326] [INFO] [11055#MainThread] [../../.local/lib/python2.7/site-packages/tensorflow/python/training/monitored_session.py:234] Graph was finalized.
[2020-03-12 13:59:45,396] [INFO] [11055#MainThread] [../../.local/lib/python2.7/site-packages/tensorflow/python/training/session_manager.py:507] Running local_init_op.
[2020-03-12 13:59:45,418] [INFO] [11055#MainThread] [../../.local/lib/python2.7/site-packages/tensorflow/python/training/session_manager.py:509] Done running local_init_op.
[2020-03-12 13:59:50,275] [INFO
```

## Prediction with the model

```python
kv_map = [[1],[1,300],[2],[300,700],[3],[700,1000],[4],[1000,1500]]
# Input file to score and output file for the results
input_name = "./test.csv"
output_name = "./result.csv"
# DeepFM parameters; note that mode is set to "predict"
dfm_params = {
    "use_fm": True,
    "use_deep": True,
    "embedding_size": 8,
    "dropout_fm": [0.8, 0.8],
    "deep_layers": [32, 32],
    "dropout_deep": [0.5, 0.5, 0.5],
    "deep_layers_activation": tf.nn.relu,
    "batch_size": 128,
    "learning_rate": 0.001,
    "optimizer_type": "adam",
    "batch_norm": False,
    "batch_norm_decay": 0.995,
    "l2_reg": 0.0,
    "epoch": 1,
    "num_steps": 10,
    "verbose": True,
    "random_seed": 1,
    "server": None,
    "cluster": None,
    "task_index": 0,
    "worker_num": 1,
    "input_name": input_name,
    "feature_cols": ["feature1"],
    "kv_map": kv_map,
    "feature_max_size": 3000,
    "label_col_name": "label",
    "kvs_delimiter": "|",
    "kv_delimiter": ":",
    "sync_type": "async",
    "checkpoint_dir": "./checkpoint",
    "mode": "predict",
    "output_name": output_name,
    "multi_tags_col_name": ["multi_tags1", "multi_tags2"],
    "multi_tags_max_size": 50
}
# Restores the model from checkpoint_dir automatically and runs prediction
deepfm = DeepFM(**dfm_params)
deepfm.predict()
```

```
kv map range start: 1  end: 300  solt: 0
kv map range start: 300  end: 700  solt: 1
kv map range start: 700  end: 1000  solt: 2
kv map range start: 1000  end: 1500  solt: 3
select columns : label,feature1,multi_tags1,multi_tags2
build graph done
[2020-03-12 14:02:56,123] [INFO] [11055#MainThread] [../../.local/lib/python2.7/site-packages/tensorflow/python/training/basic_session_run_hooks.py:529] Create CheckpointSaverHook.
[2020-03-12 14:02:56,125] [INFO] [11055#MainThread] [../../.local/lib/python2.7/site-packages/tensorflow/python/training/basic_session_run_hooks.py:544] Init incremental saver , incremental_save:False, incremental_path:./checkpoint/.incremental_checkpoint/incremental_model.ckpt
[2020-03-12 14:02:56,270] [INFO] [11055#MainThread] [../../.local/lib/python2.7/site-packages/tensorflow/python/training/monitored_session.py:234] Graph was finalized.
[2020-03-12 14:02:56,545] [INFO] [11055#MainThread] [../../.local/lib/python2.7/site-packages/tensorflow/python/training/saver
```

```python
pd.read_csv("./result.csv", header=None)
```

```
,0,1,2,3,4
0,0.476935,171:0.160769187967|558:0.762985057103|987:0.46...,7:0.551860799509|2:0.516122332872,3:0.578531033382|10:0.912359976594,1.0
1,0.502716,125:0.631063165963|424:0.610401693013|982:0.30...,6:0.850742992565|5:0.452926581219|4:0.74618124...,5:0.733476495488|4:0.746721584349,0.0
2,0.455586,226:0.590334677243|601:0.353207570678|972:0.87...,1:0.899031815365|3:0.287916015904,6:0.792869990048|1:0.806770056389,1.0
3,0.451006,187:0.167603031484|338:0.0603367158994|790:0.5...,7:0.325278910648,14:0.214951601542,0.0
4,0.483660,239:0.749947232634|630:0.832378526229|847:0.49...,9:0.501711706914|5:0.575738108738,3:0.466092055001|13:0.709491003408,0.0
5,0.479454,203:0.222651807156|481:0.567848503189|731:0.77...,5:0.285600906477,11:0.493049264151|2:0.158095697727|11:0.754516...,1.0
6,0.431707,59:0.855379126162|513:0.141471919663|783:0.409...,6:0.817890670226,4:0.364886715962|14:0.784435436416|10:0.043269...,0.0
7,0.462653,20:0.131093229566|677:0.560840054375|908:0.737...,4:0.709533662447,10:0.974380757476,1.0
8,0.487114,172:0.766779876387|584:0.66569748063|989:0.355...,6:0.114119799904,2:0.508507829738,1.0
9,0.469684,37:0.216288069064|361:0.270378044031|813:0.376...,5:0.15777133173|4:0.139784393334,6:0.494135082626,1.0
```


In the output above, column 0 is the model score; the remaining columns echo the input columns unchanged.

## Model serving

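After training completes successfully, this implementation exports a SavedModel into the checkpoint dir. When training on PAI Studio, you can find it under the OSS checkpoint dir path. The model can be used directly with TF Serving or the PAI EAS online service. The exported signature is: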

```
inputs["features"]                 = all plain kv features
inputs[<multi tags column name 1>] = multi tags kv feature
inputs[<multi tags column name 2>] = multi tags kv feature
...
inputs[<multi tags column name n>] = multi tags kv feature
outputs["score"]                   = score
```
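Here **features** and **scores** are fixed names. Each multi tags input is named after the column name used during training and is a separate input. Online prediction is batched, so when you predict a single sample, you must still construct it as a batch.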
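
Because serving is batched, even a single sample has to be sent with a leading batch dimension. The sketch below builds a request body in the columnar `inputs` format of the TF Serving REST predict API (`POST /v1/models/<model_name>:predict`). It assumes, without confirmation from the source, that each input is a string tensor holding the raw kv text with the same delimiters as in training. `build_predict_request`, `encode_kvs`, and the model name are hypothetical, and PAI EAS may expect a different request format.

```python
import json


def encode_kvs(pairs, kvs_delimiter="|", kv_delimiter=":"):
    """Turn [(key, value), ...] back into kv text, e.g. "1:0.1|450:0.2"."""
    return kvs_delimiter.join("%d%s%s" % (k, kv_delimiter, v) for k, v in pairs)


def build_predict_request(features, multi_tags):
    """Build a TF Serving REST body for ONE sample, wrapped as a batch of size 1.

    features:   [(key, value), ...] plain kv pairs -> the "features" input
    multi_tags: {column_name: [(key, value), ...]} -> one input per column
    """
    inputs = {"features": [encode_kvs(features)]}  # list of length 1 = batch dimension
    for column, pairs in multi_tags.items():
        inputs[column] = [encode_kvs(pairs)]
    # Columnar ("inputs") request format of the TF Serving REST predict API
    return json.dumps({"inputs": inputs})


body = build_predict_request(
    features=[(1, 0.1), (450, 0.2)],
    multi_tags={"multi_tags1": [(3, 0.5), (4, 0.3)], "multi_tags2": [(7, 0.1)]},
)
print(body)
```

Sending several samples at once only means appending more strings to each list, keeping all lists the same length.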

# PAI Studio (MaxCompute)
## Usage

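This implementation is platform-compatible, so training developed in DSW can be moved seamlessly to PAI Studio (MaxCompute) for large-scale distributed production jobs.
Use the provided entry.py as the program entry: package and upload the code, prepare the data (see the supported platforms above for accepted inputs), and start training.
For example, submitting through the MaxCompute front end DataWorks or the odps command line: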

```bash
pai -name tensorflow1120_ext -project algo_public
-Dscript='[odps|oss]://<your project>/resources/<deepfm.tgz>'  # packaged program files
-DentryFile='entry.py'
-Dcluster='{\"ps\":{\"count\":3,\"cpu\":800,\"memory\":16000},\"worker\": {\"count\":20,\"cpu\":800,\"memory\":4000}}'  # cluster configuration
# MaxCompute or OSS data; a MaxCompute table is used here. This is only a resource declaration: model input parameters are passed via -DuserDefinedParameters
-Dtables='odps://<your project>/tables/<input table name>'
-DcheckpointDir='oss://<your oss bucket>.oss-cn-<region name>-internal.aliyuncs.com/<your dir>'
-Darn='<your arn>'
# Like -Dtables, this is a resource declaration, here for the output table
-Doutputs='odps://<your project>/tables/<output table name>'
# Model parameters
-DuserDefinedParameters='--mode=train --embedding_size=16 --kv_map=[[1],[1-100],[2],[100-1000]] ....'
;
```

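One point to note for this submission: on MaxCompute the model does not create the output table, so you must create it manually. Columns are not matched by name either. To build the output table schema, add a score column as the first column and keep the remaining columns identical to the input feature table schema.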
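
A small helper can generate the MaxCompute SQL for such an output table from the input schema. This is a sketch: the `DOUBLE` type for score, the column types in the example, and the helper name are assumptions, so check the actual input table schema before running the statement.

```python
def output_table_ddl(table_name, input_schema, score_type="DOUBLE"):
    """Build a CREATE TABLE statement for the prediction output table.

    input_schema: [(column_name, column_type), ...] in the same order as the
    input feature table; a score column is placed first.
    """
    columns = [("score", score_type)] + list(input_schema)
    body = ",\n    ".join("%s %s" % (name, ctype) for name, ctype in columns)
    return "CREATE TABLE IF NOT EXISTS %s (\n    %s\n);" % (table_name, body)


print(output_table_ddl("deepfm_output", [
    ("label", "BIGINT"),
    ("feature1", "STRING"),
    ("multi_tags1", "STRING"),
    ("multi_tags2", "STRING"),
]))
```

Generating the statement from the input schema keeps the column order aligned, which matters because the output is written by position rather than by name.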

# Advanced usage
## Incremental training

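This implementation supports incremental training to some extent, mainly by reserving key space in the kv data: reserve part of the key range in advance, and keys added later can use it, so a full large-scale retraining is not needed in the short term.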
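
One way to plan that reservation is sketched below: each slot's key range is sized for its current keys plus a reserve fraction, so new keys can be appended inside the range later. The helper name, the reserve ratio, and the half-open [start, end) range convention are assumptions. With this layout, feature_max_size would need to be at least the returned next_free_key so that the embedding table covers every reserved key.

```python
import math


def build_kv_map(keys_per_slot, reserve_ratio=0.2, first_key=1):
    """Build a flat kv_map whose slot ranges leave headroom for future keys.

    keys_per_slot: number of keys each slot uses today
    reserve_ratio: extra fraction of keys to reserve per slot
    Returns (kv_map, next_free_key); ranges are treated as half-open [start, end).
    """
    kv_map = []
    start = first_key
    for slot_id, used in enumerate(keys_per_slot, start=1):
        end = start + used + int(math.ceil(used * reserve_ratio))
        kv_map.append([slot_id])
        kv_map.append([start, end])
        start = end  # the next slot begins where this one ends
    return kv_map, start


kv_map, next_free_key = build_kv_map([100, 200], reserve_ratio=0.5)
print(kv_map)         # [[1], [1, 151], [2], [151, 451]]
print(next_free_key)  # 451
```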

## FM && Deep

This implementation keeps the original author's use_fm and use_deep parameters; setting them lets the model degenerate into a pure FM or a pure DNN.
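
In the DeepFM paper [1] the prediction is sigmoid(y_FM + y_DNN), where y_FM is a first-order term plus pairwise interactions of the feature embeddings. The sketch below illustrates that formulation, including the usual O(nk) rewrite of the pairwise term, and shows how switching a part off reduces the model to pure FM or pure DNN. It is a plain-Python illustration under those assumptions, not the code in DeepFM.py, which (following the forked repository) may combine the parts through a learned projection instead of a plain sum.

```python
import math


def fm_second_order(x, V):
    """Pairwise FM term sum_{i<j} <V_i, V_j> x_i x_j in O(n*k).

    Uses the identity 0.5 * sum_f [(sum_i V_if x_i)^2 - sum_i (V_if x_i)^2].
    x: n feature values; V: n latent vectors of length k.
    """
    total = 0.0
    for f in range(len(V[0])):
        terms = [V[i][f] * x[i] for i in range(len(x))]
        total += 0.5 * (sum(terms) ** 2 - sum(t * t for t in terms))
    return total


def fm_second_order_naive(x, V):
    """Same quantity computed directly over all pairs i < j, O(n^2 * k)."""
    n = len(x)
    return sum(
        sum(a * b for a, b in zip(V[i], V[j])) * x[i] * x[j]
        for i in range(n) for j in range(i + 1, n)
    )


def deepfm_predict(y_fm, y_dnn, use_fm=True, use_deep=True):
    """Paper-style combination sigmoid(y_FM + y_DNN); a disabled part contributes 0."""
    logit = (y_fm if use_fm else 0.0) + (y_dnn if use_deep else 0.0)
    return 1.0 / (1.0 + math.exp(-logit))


x = [1.0, 0.5, 2.0]
V = [[0.1, 0.2], [0.3, -0.1], [0.0, 0.4]]
print(fm_second_order(x, V), fm_second_order_naive(x, V))  # equal up to rounding
print(deepfm_predict(0.7, -0.7))                            # 0.5
```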



# References
[1] *DeepFM: A Factorization-Machine based Neural Network for CTR Prediction*, Huifeng Guo, Ruiming Tang, Yunming Ye, Zhenguo Li, Xiuqiang He.

# Acknowledgements
- He Xiangnan
- Chen Chenglong

# License
MIT

# FAQ
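- feature_max_size is set smaller than the actual keys, causing an embedding lookup error
- mode is set incorrectly (train vs. predict), so some ops cannot be found
- multiple keys in one sample fall into the same kv map slot, causing a conflict: the embedding lookup position returns -1 and raises an error
- wrong delimiters in the data cause a split error
- non-numeric values in the data cause a type conversion failure
- an old checkpoint was not deleted before retraining, causing a mismatch between the loaded model and the graph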
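
Several of these issues can be caught before training with a pre-flight check like the one below. It is a hypothetical sketch covering only plain kv columns: it assumes integer keys, half-open kv map ranges (flattened from the kv_map parameter), and that a key must be smaller than feature_max_size to fit in the embedding table. It does not reproduce the actual error messages of DeepFM.py.

```python
def validate_row(cells, ranges, feature_max_size, kvs_delimiter="|", kv_delimiter=":"):
    """Check the plain kv cells of one sample for the FAQ issues above.

    cells:  raw strings from the feature_cols of one row
    ranges: [(start, end), ...] kv map ranges, treated as half-open
    Returns a list of human-readable problems (empty if the row looks fine).
    """
    problems = []
    used_slots = {}  # slot index -> first key seen in that slot
    for cell in cells:
        if not cell:
            continue
        for item in cell.split(kvs_delimiter):
            parts = item.split(kv_delimiter)
            if len(parts) != 2:
                problems.append("malformed pair %r: check the delimiters" % item)
                continue
            try:
                key, _value = int(parts[0]), float(parts[1])
            except ValueError:
                problems.append("non-numeric pair %r" % item)
                continue
            if key >= feature_max_size:
                problems.append("key %d >= feature_max_size %d" % (key, feature_max_size))
            slot = next((s for s, (lo, hi) in enumerate(ranges) if lo <= key < hi), None)
            if slot is None:
                problems.append("key %d is outside every kv map range" % key)
            elif slot in used_slots:
                problems.append("keys %d and %d share kv map slot %d" % (used_slots[slot], key, slot))
            else:
                used_slots[slot] = key
    return problems


ranges = [(1, 300), (300, 700), (700, 1000), (1000, 1500)]
print(validate_row(["1:0.1|450:0.2"], ranges, 3000))  # []
print(validate_row(["1:0.1,450:0.2"], ranges, 3000))  # wrong kvs delimiter
print(validate_row(["1:0.1|2:0.2"], ranges, 3000))    # two keys in slot 0
```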