转换数据格式-tfrecords：
------------------------
1、tfrecords是一种序列化数据格式，适用于tensorflow框架中模型训练使用

2、转换成tfrecords数据格式，具体数据格式如下：

```
Example{
    "feature":tf.int64_list,
    "label":tf.float_list
}
```

In [11]:
!nvidia-smi

Sun Apr 11 15:22:37 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 450.80.02    Driver Version: 450.80.02    CUDA Version: 11.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  TITAN V             Off  | 00000000:3F:00.0 Off |                  N/A |
| 28%   32C    P2    35W / 250W |  11962MiB / 12066MiB |      2%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [1]:
import tensorflow as tf
import os
import numpy as np
# Run TensorFlow in graph mode: the cells below drive the input pipeline
# through tf.compat.v1.Session and an initializable iterator.
tf.compat.v1.disable_eager_execution()

def get_tfrecords_example(feature, label):
    """Pack one sample into a ``tf.train.Example`` protobuf.

    Args:
        feature: Iterable of ints, stored under the ``'feature'`` key as an
            int64 list.
        label: Iterable of floats, stored under the ``'label'`` key as a
            float list.

    Returns:
        A ``tf.train.Example`` holding both entries.
    """
    int64_feature = tf.train.Feature(
        int64_list=tf.train.Int64List(value=feature))
    float_label = tf.train.Feature(
        float_list=tf.train.FloatList(value=label))
    payload = tf.train.Features(
        feature={'feature': int64_feature, 'label': float_label})
    return tf.train.Example(features=payload)

In [None]:
def to_tfrecords(file, save_dir, records_per_file=200000):
    """Convert a comma-separated text file into sharded TFRecord files.

    Each non-blank line must look like ``<int>,<int>,<number>[,...]``. The
    first two columns become the int64 ``feature`` list; the third column is
    binarised into the ``label`` (1.0 when it is >= 3, otherwise 0.0).

    Shards are written to ``save_dir`` as ``part-00000.tfrecords``,
    ``part-00001.tfrecords``, ... A first shard is always created, even for
    an empty input file.

    Args:
        file: Path of the text file to read.
        save_dir: Existing directory that receives the shard files.
        records_per_file: Maximum number of records per shard.

    Raises:
        ValueError: If ``records_per_file`` is not positive, or a column
            cannot be parsed as a number.
        IndexError: If a non-blank line has fewer than three columns.
    """
    if records_per_file <= 0:
        raise ValueError("records_per_file must be a positive integer")

    def _open_shard(index):
        # Fixed-width numbering keeps names sortable beyond ten shards.
        shard_path = os.path.join(save_dir, "part-%05d.tfrecords" % index)
        return tf.io.TFRecordWriter(shard_path)

    print("Process To tfrecord File: %s ..." % file)
    shard_index = 0
    written = 0
    writer = _open_shard(shard_index)
    try:
        with open(file) as lines:
            for line in lines:
                line = line.strip()
                if not line:
                    # Tolerate blank lines (e.g. a trailing newline).
                    continue
                tmp = line.split(",")
                feature = [int(tmp[0]), int(tmp[1])]
                label = [1.0 if float(tmp[2]) >= 3 else 0.0]
                example = get_tfrecords_example(feature, label)
                # Rotate only when another record is about to be written, so
                # an exact multiple of records_per_file leaves no empty shard.
                if written and written % records_per_file == 0:
                    writer.close()
                    shard_index += 1
                    writer = _open_shard(shard_index)
                writer.write(example.SerializeToString())
                written += 1
    finally:
        # Always flush and release the current shard, even on a parse error.
        writer.close()
    print("Process To tfrecord File: %s End" % file)
    
# Input text files and the directories that receive their TFRecord shards.
train_file_path = './save_data/train_set_tohash'
train_totfrecord = './save_data/train'
test_file_path = './save_data/test_set_tohash'
test_totfrecord = './save_data/val'

# exist_ok=True keeps this cell re-runnable: os.mkdir raises FileExistsError
# as soon as the output directories exist from a previous run.
os.makedirs(train_totfrecord, exist_ok=True)
os.makedirs(test_totfrecord, exist_ok=True)
to_tfrecords(train_file_path, train_totfrecord)
to_tfrecords(test_file_path, test_totfrecord)

Process To tfrecord File: ./save_data/train_set_tohash ...
Process To tfrecord File: ./save_data/train_set_tohash End
Process To tfrecord File: ./save_data/test_set_tohash ...


模型代码开发架构图
------------------------

PS(Parameter Server)
------------------------
```
Key1, vector1
Key2, vector2
Key3, vector3
...
KeyN, vectorN
```
简化版Parameter Server，用一个字典进行存储，不是一开始就对所有的特征向量随机化，而是用到了才随机产生

Input层
------------------------
读取tfrecords数据，并从PS(参数服务)中取出对应的向量，构建完整的input层

Data1{
    feature->[batch, featureNum],
    label->[1]
}

Data1{
    feature->[batch, featureNum, vector_len],
    label->[1]
}


模型层
------------------------
模型多轮训练
$$M_{U, I}^{\prime}=\sum_{k=1}^{K} P_{U, k} Q_{k, I}$$
$$SSE=E^{2}=\sum_{U, I}\left(M_{U, I}-M_{U, I}^{\prime}\right)^{2}$$


参数更新SGD
------------------------

## PS(Parameter Server)

In [None]:
# metaclass=Singleton
class PS:
    """Simplified in-process parameter server backed by a dict.

    Maps a feature key to its embedding vector. Vectors are created lazily:
    a key receives a random vector the first time it is pulled.
    """

    def __init__(self, embedding_dim):
        """Create an empty parameter server.

        Args:
            embedding_dim: Length of every newly created embedding vector.
        """
        # A private generator seeded with 2020 produces the same stream as
        # np.random.seed(2020) + np.random.rand, without resetting the
        # global NumPy RNG for the rest of the program.
        self._rng = np.random.RandomState(2020)
        self.params_server = dict()
        self.dim = embedding_dim
        print("ps inited...")

    def pull(self, keys):
        """Fetch the vectors for a batch of keys, creating missing ones.

        Args:
            keys: 2-D iterable laid out as [batch, feature_len] whose
                elements are hashable feature keys.

        Returns:
            A float32 ndarray with one vector per key, laid out as
            [batch, feature_len, dim].
        """
        values = []
        for row in keys:
            row_vectors = []
            for key in row:
                vector = self.params_server.get(key)
                if vector is None:
                    # First sighting of this key: initialise it randomly.
                    vector = self._rng.rand(self.dim)
                    self.params_server[key] = vector
                row_vectors.append(vector)
            values.append(row_vectors)

        return np.asarray(values, dtype='float32')

    def push(self, keys, values):
        """Store updated vectors back into the server.

        Args:
            keys: 2-D iterable laid out as [batch, feature_len].
            values: Vectors indexed the same way as ``keys``
                (``values[i][j]`` belongs to ``keys[i][j]``). When a key
                occurs more than once, the last occurrence wins.
        """
        for i, row in enumerate(keys):
            for j, key in enumerate(row):
                # Copy so the stored vector is not a view into the caller's
                # batch array, which may be mutated or reused afterwards.
                self.params_server[key] = np.array(values[i][j])

    def delete(self, keys):
        """Remove the given keys from the server.

        Args:
            keys: Flat iterable of keys to drop.

        Raises:
            KeyError: If a key is not present.
        """
        for k in keys:
            self.params_server.pop(k)

    def save(self, path):
        """Write every key and its vector to a text file.

        Each line is ``<key>\\t<v1>,<v2>,...`` with values formatted to
        eight decimal places. Missing parent directories are created.

        Args:
            path: Destination file path; an existing file is overwritten.
        """
        print("总共包含keys： ", len(self.params_server))
        parent = os.path.dirname(path)
        if parent:
            os.makedirs(parent, exist_ok=True)
        with open(path, "w") as writer:
            for k, v in self.params_server.items():
                writer.write(str(k) + "\t" + ",".join(["%.8f" % _ for _ in v]) + "\n")

In [None]:
if __name__ == "__main__":

    # Exercise each PS operation in turn.
    ps_local = PS(8)
    keys = [[123, 234], [567, 891]]
    # Pull the keys: a key already on the server is returned as stored,
    # an unknown key is randomly initialised first.
    res = ps_local.pull(keys)
    print("参数服务器中有哪些参数：\n", ps_local.params_server)
    print("keys获取对应的向量：\n", res)

    # Stand-in for training: apply one fixed-gradient step, then push the
    # updated vectors back to the server.
    gradient = 10
    res = res - gradient * 0.01
    ps_local.push(keys, res)
    print("经过多轮迭代更新后，参数服务器中的参数:\n", ps_local.params_server)

    # After the pull / update / push rounds each key holds its final
    # embedding; save the vectors so they can be used for recall.
    path = './save_data/feature_embedding/test_embedding'
    ps_local.save(path)

## Input层

In [None]:
class InputFn:
    """Builds the tf.data pipeline that joins TFRecord samples with PS vectors."""

    def __init__(self, local_ps):
        """Store the pipeline settings.

        Args:
            local_ps: Parameter server whose ``pull`` method maps a batch of
                feature keys to their embedding vectors.
        """
        self.feature_len = 2  # int64 values per "feature" entry
        self.label_len = 1  # float values per "label" entry
        self.n_parse_threads = 4
        self.shuffle_buffer_size = 1024
        self.prefetch_buffer_size = 1
        self.batch = 8
        self.local_ps = local_ps

    def input_fn(self, data_dir, is_test=False):
        """Create the dataset iterator over every file in ``data_dir``.

        Args:
            data_dir: Directory whose entries are all read as TFRecord files.
            is_test: When true the data is read for a single pass and is not
                shuffled; otherwise it repeats indefinitely and is shuffled.

        Returns:
            A tuple ``(iterator, next_element)``. ``iterator`` is an
            initializable iterator whose ``initializer`` must be run first;
            ``next_element`` is a dict with the keys ``"feature"``,
            ``"label"`` and ``"feature_embedding"``.
        """
        def _parse_example(example):
            # Decode one serialized Example into fixed-length tensors.
            features = {
                "feature": tf.io.FixedLenFeature(self.feature_len, tf.int64),
                "label": tf.io.FixedLenFeature(self.label_len, tf.float32),
            }
            return tf.io.parse_single_example(example, features)

        def _get_embedding(parsed):
            # Look up the embedding of every key in the batch through the
            # Python-side parameter server.
            keys = parsed["feature"]
            keys_array = tf.compat.v1.py_func(self.local_ps.pull, [keys], tf.float32)
            result = {
                "feature": parsed["feature"],
                "label": parsed["label"],
                "feature_embedding": keys_array,
            }
            return result

        files = [os.path.join(data_dir, name) for name in os.listdir(data_dir)]

        dataset = tf.compat.v1.data.Dataset.list_files(files)
        # Number of passes over the data: one for test, endless for training.
        if is_test:
            dataset = dataset.repeat(1)
        else:
            dataset = dataset.repeat()
        # Read the TFRecord files.
        dataset = dataset.interleave(
            lambda path: tf.compat.v1.data.TFRecordDataset(path),
            cycle_length=1
        )
        # Parse the serialized records.
        dataset = dataset.map(
            _parse_example,
            num_parallel_calls=self.n_parse_threads)

        # Group records into fixed-size batches.
        dataset = dataset.batch(
            self.batch, drop_remainder=True)

        dataset = dataset.map(
            _get_embedding,
            num_parallel_calls=self.n_parse_threads)

        # Shuffle the training data. Dataset transformations return a new
        # dataset, so the result has to be re-assigned to take effect. At
        # this point the elements being shuffled are whole batches.
        if not is_test:
            dataset = dataset.shuffle(self.shuffle_buffer_size)

        # Prefetch upcoming elements.
        dataset = dataset.prefetch(
            buffer_size=self.prefetch_buffer_size)

        # Build the iterator.
        iterator = tf.compat.v1.data.make_initializable_iterator(dataset)
        return iterator, iterator.get_next()

In [None]:
if __name__ == '__main__':
    # Smoke test: build the training pipeline and print a single batch.
    data_dir = './save_data/train/'
    local_ps = PS(8)
    inputs = InputFn(local_ps)
    train_itor, train_inputs = inputs.input_fn(data_dir, is_test=False)
    with tf.compat.v1.Session() as sess:
        # The iterator is initializable, so run its initializer before
        # fetching elements.
        sess.run(train_itor.initializer)
        for i in range(1):
            batch_values = sess.run(train_inputs)
            print(batch_values)