<a href="https://colab.research.google.com/github/handsomeboyck/colab_copy/blob/master/model.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np

# 单例模式
class Singleton(type):
    """Metaclass that hands out a single shared instance per class.

    The first call to a class using this metaclass constructs the instance
    with the supplied arguments; every later call returns that same object
    and ignores whatever arguments it was given.
    """

    # Maps each class to its one instance; stored on the metaclass itself,
    # so it is shared by all classes that use Singleton.
    _instance = {}

    def __call__(cls,*args,**kwargs):
        """Return the cached instance of ``cls``, creating it on first use."""
        registry = Singleton._instance
        if cls in registry:
            return registry[cls]
        created = type.__call__(cls,*args,**kwargs)
        registry[cls] = created
        return created

class PS(metaclass=Singleton):
    """In-process parameter server mapping feature keys to embedding vectors.

    Embeddings live in the ``params_server`` dict. Because the class uses the
    ``Singleton`` metaclass, only the first ``PS(...)`` call runs ``__init__``;
    later calls return the same object and their ``embedding_dim`` is ignored.
    """

    def __init__(self,embedding_dim):
        """Create an empty store whose vectors have ``embedding_dim`` entries."""
        # Seed NumPy's global RNG so newly initialised embeddings are reproducible.
        np.random.seed(2021)
        self.params_server = dict()
        self.dim = embedding_dim
        print("ps inited....")

    def pull(self,keys):
        """Look up embeddings for a 2-D collection of keys.

        Keys not seen before are given a fresh ``np.random.rand(self.dim)``
        vector, which is stored so later pulls return the same values.

        Args:
            keys: iterable of rows, each row an iterable of hashable keys.

        Returns:
            float32 ndarray holding one embedding per key, laid out as
            ``[row][key][dim]``.
        """
        values = []
        for row in keys:
            row_values = []
            for key in row:
                value = self.params_server.get(key)
                if value is None:
                    # First sighting of this key: initialise it randomly.
                    value = np.random.rand(self.dim)
                    self.params_server[key] = value
                row_values.append(value)
            values.append(row_values)
        return np.asarray(values,dtype='float32')

    def push(self,keys,values):
        """Overwrite stored embeddings: ``values[i][j]`` is saved under ``keys[i][j]``."""
        for i, row in enumerate(keys):
            for j, key in enumerate(row):
                self.params_server[key] = values[i][j]

    def delete(self,keys):
        """Remove every key in ``keys``; raises ``KeyError`` if one is absent."""
        for k in keys:
            self.params_server.pop(k)

    def save(self,path):
        """Write all embeddings to ``path`` as text for later use.

        Each line is ``<key>\\t<v0>,<v1>,...`` with values printed to eight
        decimal places. An existing file at ``path`` is overwritten.
        """
        print("总共包含keys: ",len(self.params_server))
        # The context manager closes the file even if a write fails part-way.
        with open(path,"w") as writer:
            for k,v in self.params_server.items():
                writer.write(str(k)+"\t"+",".join(['%.8f' % _ for _ in v])+"\n")

In [None]:
# 测试一下PS
# ps_local = PS(8)
# keys = [[123,234],[567,891]]
# res = ps_local.pull(keys)
# print("参数服务器中有哪些参数：\n",ps_local.params_server)
# print("keys,获取到的对应向量： \n",res)

# gradient = 10
# res = res -0.01*gradient
# ps_local.push(keys,res)

# print("经过迭代后，参数服务器中有哪些参数：\n",ps_local.params_server)


In [None]:
import tensorflow as tf
import os
# Switch TensorFlow to graph mode: the code below builds initializable
# iterators and runs them through tf.compat.v1.Session.
tf.compat.v1.disable_eager_execution()
class InputFn:
    """Builds the tf.data input pipeline that feeds TFRecord batches plus
    their embeddings (pulled from a parameter server) into the model."""

    def __init__(self,local_ps):
        """Store the pipeline settings.

        Args:
            local_ps: object exposing ``pull(keys)``; it is called with each
                batch of feature ids and must return their embeddings.
        """
        self.feature_len = 2
        self.label_len =1
        self.n_parse_threads = 4
        self.shuffle_buffer_size =1024
        self.prefetch_buffer_size =1
        self.batch = 8
        self.local_ps = local_ps

    def input_fn(self,data_dir,is_test=False):
        """Create the dataset iterator for every file found in ``data_dir``.

        Args:
            data_dir: directory whose entries are all read as TFRecord files.
            is_test: when True the data is read exactly once and not
                shuffled; otherwise it repeats indefinitely and is shuffled.

        Returns:
            ``(iterator, next_element)`` where ``iterator`` is an
            initializable iterator and ``next_element`` is a dict with the
            keys ``"feature"``, ``"label"`` and ``"feature_embedding"``.
        """
        def _parse_example(example):
            # Schema of one serialized example: int64 feature ids + float label.
            features = {
                "feature":tf.io.FixedLenFeature(self.feature_len,tf.int64),
                "label":tf.io.FixedLenFeature(self.label_len,tf.float32),
            }
            return tf.io.parse_single_example(example,features)

        def _get_embedding(parsed):
            # Look the batch's feature ids up in the parameter server.
            # tf.numpy_function replaces the deprecated tf.compat.v1.py_func
            # with the same numpy-in / numpy-out semantics.
            keys_array = tf.numpy_function(
                self.local_ps.pull,[parsed["feature"]],tf.float32
            )
            return {
                "feature":parsed["feature"],
                "label":parsed["label"],
                "feature_embedding":keys_array,
            }

        files = [os.path.join(data_dir,name) for name in os.listdir(data_dir)]
        dataset = tf.compat.v1.data.Dataset.list_files(files)

        # One pass for evaluation, endless repetition for training.
        dataset = dataset.repeat(1) if is_test else dataset.repeat()

        # Read the TFRecord files.
        dataset = dataset.interleave(
            lambda path:tf.compat.v1.data.TFRecordDataset(path),
            cycle_length =1
        )
        # Parse each serialized example.
        dataset = dataset.map(
            _parse_example,
            num_parallel_calls = self.n_parse_threads
        )

        # Shuffle training examples. Dataset.shuffle returns a new dataset, so
        # the result has to be assigned (previously it was discarded and no
        # shuffling happened). It is applied before batching and before the
        # embedding lookup so that already-pulled embeddings do not sit in the
        # shuffle buffer while training keeps updating the parameter server.
        if not is_test:
            dataset = dataset.shuffle(self.shuffle_buffer_size)

        # Group examples into fixed-size batches.
        dataset = dataset.batch(
            self.batch,drop_remainder=True
        )

        # Attach the embeddings for every batch.
        dataset = dataset.map(
            _get_embedding,
            num_parallel_calls = self.n_parse_threads
        )

        # Prepare upcoming batches ahead of time.
        dataset = dataset.prefetch(
            buffer_size =self.prefetch_buffer_size
        )
        iterator = tf.compat.v1.data.make_initializable_iterator(dataset)
        return iterator,iterator.get_next()
        

In [None]:
# 测试一下数据读入模型
# local_ps = PS(8)
# inputs = InputFn(local_ps)
# data_dir = "./train_tf"
# train_itor,train_inputs = inputs.input_fn(data_dir,is_test=False)
# with tf.compat.v1.Session() as sess:
#     sess.run(train_itor.initializer)
#     for i in range(10):
#         print(sess.run(train_inputs))

ps inited....
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module 'gast' has no attribute 'Index'
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module 'gast' has no attribute 'Index'
Instructions for updating:
tf.py_func is deprecated in TF V2. Instead, there are two
    options available in V2.
    - tf.py_function takes a python function which manipulates tf eager
    tensors instead of numpy arrays. It's easy to convert a tf eager tensor to
    an ndarray (just call tensor.numpy()) but having access to eager tensors
    means `tf.py_function`s can use accelerators such as GPUs as well as
    being differentiable using a gradient tape.
    - tf.numpy_function maintains the semantics of the deprecated tf.py_func
    (it is not differentiable, and

In [None]:
# 定义训练模型
import tensorflow as tf

# Not referenced by mf_fn/setup_graph below; InputFn sets its own batch size.
batch = 32
# Length of each embedding vector; mf_fn uses it to reshape its inputs.
embedding_dim = 8
# Step size setup_graph applies to the embedding gradient.
learning_rate = 0.001

def mf_fn(inputs,is_test):
    """Build the matrix-factorisation scoring and loss ops.

    ``inputs["feature_embedding"]`` is reshaped to ``[-1, 2, embedding_dim]``
    and split along axis 1 into two embeddings per example (user_id and
    movie_id, per the original comments). The score is the mean over the
    embedding axis of their element-wise product.

    Args:
        inputs: dict providing ``"feature_embedding"`` and ``"label"`` tensors.
        is_test: when True, the split embeddings are added to the
            ``"input_tensor"`` collection and the sigmoid of the score to the
            ``"output_tensor"`` collection.

    Returns:
        dict with ``"loss"`` (sum of squared differences between the flattened
        label and the raw score), ``"ground_truth"`` (flattened label) and
        ``"prediction"`` (the raw score).
    """
    labels = inputs["label"]
    paired = tf.reshape(inputs["feature_embedding"],shape=[-1,2,embedding_dim])

    # One slice per feature slot: index 0 -> user, index 1 -> movie.
    halves = tf.split(paired,num_or_size_splits=2,axis=1)
    user_vec = tf.reshape(halves[0],shape=[-1,embedding_dim])
    movie_vec = tf.reshape(halves[1],shape=[-1,embedding_dim])

    # Element-wise product averaged over the embedding dimension.
    score = tf.reduce_mean(user_vec*movie_vec,axis=1)

    # NOTE(review): the sigmoid output is only exported through the collection
    # below; the loss and "prediction" use the raw score - confirm intended.
    squashed = tf.sigmoid(score)
    if is_test:
        for collection_name,value in (("input_tensor",halves),
                                      ("output_tensor",squashed)):
            tf.compat.v1.add_to_collections(collection_name,value)

    # Squared-error loss summed over the batch.
    flat_labels = tf.reshape(labels,[-1])
    total_sq_error = tf.reduce_sum(tf.square(flat_labels-score))

    return {
        "loss": total_sq_error,
        "ground_truth":flat_labels,
        "prediction":score
    }

# 定义整个图结构,并给出梯度更新方式
def setup_graph(inputs,is_test=False):
    """Build the model graph and, for training, the manual SGD update.

    Args:
        inputs: dict of tensors with ``"feature"``, ``"label"`` and
            ``"feature_embedding"`` entries.
        is_test: when True only the model outputs are built, and the
            ``net_graph`` variable scope is entered with ``reuse=True``.

    Returns:
        dict with ``"out"`` (the dict returned by ``mf_fn``). For training it
        also holds ``"feature_new_embedding"`` (embedding minus
        ``learning_rate`` times the loss gradient), ``"feature_embedding"``
        and ``"feature"``.
    """
    graph_outputs = {}
    with tf.compat.v1.variable_scope("net_graph",reuse=is_test):
        model_out = mf_fn(inputs,is_test)
        graph_outputs["out"] = model_out

        if not is_test:
            # Plain SGD step on the embeddings themselves: differentiate the
            # loss with respect to the pulled embedding tensor.
            current_emb = inputs["feature_embedding"]
            grad_wrt_emb = tf.gradients(
                model_out["loss"],[current_emb],name = "feature_embedding"
            )[0]
            graph_outputs["feature_new_embedding"] = \
                current_emb-learning_rate*grad_wrt_emb
            graph_outputs["feature_embedding"] = current_emb
            graph_outputs["feature"] = inputs["feature"]

    return graph_outputs

In [None]:
# 开始训练

import numpy as np
from sklearn.metrics import roc_auc_score


# 定义一个AUC用来评估
class AUCUtils(object):
    """Accumulates per-batch loss, labels and predictions and summarises them
    as mean loss, AUC and PCOC (sum of predictions / sum of labels)."""

    def __init__(self):
        self.reset()

    def add(self,loss,g=np.array([]),p=np.array([])):
        """Record one batch.

        Args:
            loss: loss value for the batch.
            g: ground-truth labels (any array-like; flattened before storing).
            p: predictions (any array-like; flattened before storing).
        """
        self.loss.append(loss)
        self.ground_truth += np.asarray(g).flatten().tolist()
        self.prediction += np.asarray(p).flatten().tolist()

    def calc(self):
        """Summarise everything recorded since the last ``reset``.

        Returns:
            dict with ``loss_num``, ``loss`` (mean), ``auc_num``, ``auc`` and
            ``pcoc``. A metric that cannot be computed is reported as 0:
            ``loss`` when nothing was added, ``auc`` when fewer than two
            distinct label values are present, and ``pcoc`` when the labels
            sum to zero (which previously raised ``ZeroDivisionError``).
        """
        label_total = sum(self.ground_truth)
        # AUC needs both classes; this also covers the empty case.
        has_two_classes = len(set(self.ground_truth)) > 1
        return {
            "loss_num":len(self.loss),
            "loss":np.array(self.loss).mean() if self.loss else 0.0,
            "auc_num":len(self.ground_truth),
            "auc":roc_auc_score(self.ground_truth,self.prediction) if has_two_classes else 0,
            "pcoc": sum(self.prediction)/label_total if label_total != 0 else 0.0
        }

    def calc_str(self):
        """Return the ``calc`` summary formatted as a single log line."""
        res = self.calc()
        return "loss: %f(%d),auc:%f(%d),pcoc:%f" % (
            res["loss"],res["loss_num"],
            res["auc"],res["auc_num"],
            res["pcoc"]
        )

    def reset(self):
        """Discard all recorded losses, labels and predictions."""
        self.loss = []
        self.ground_truth = []
        self.prediction = []

In [None]:
import tensorflow as tf
# Graph mode is required: training runs through tf.compat.v1.Session.
tf.compat.v1.disable_eager_execution()

# --- Settings ---------------------------------------------------------------
embedding_dim = 8
max_steps = 100000          # number of training batches to run
train_log_iter = 1000       # report training metrics every N steps
test_show_step = 1000       # evaluate on the test set every N steps

# Not read by InputFn, which sets its own values in __init__.
batch = 32
n_parse_threads = 4
shuffle_buffer_size = 1024
prefetch_buffer_size = 16

# --- Paths ------------------------------------------------------------------
train_file = './train_tf'
test_file = './test_tf'
save_embedding = "./saved_embedding"

# --- Shared objects ---------------------------------------------------------
local_ps = PS(embedding_dim)
inputs = InputFn(local_ps)      # data input
train_metric = AUCUtils()
test_metric = AUCUtils()
# Embeddings are saved only when the test AUC exceeds this running best.
last_test_auc = 0.5

# --- Graphs: the training graph is built first, then the test graph ----------
train_itor,train_inputs = inputs.input_fn(train_file,is_test=False)
train_dic = setup_graph(train_inputs,is_test=False)

test_itor,test_inputs = inputs.input_fn(test_file,is_test=True)
test_dic = setup_graph(test_inputs,is_test=True)

def train():
    """Run the training loop for ``max_steps`` batches.

    Each step evaluates the training graph, records the batch metrics and
    pushes the updated embeddings back to ``local_ps``. Training metrics are
    printed (and reset) every ``train_log_iter`` steps, and ``valid_step`` is
    run every ``test_show_step`` steps.
    """
    _step = 0
    print("#" * 80)
    # Open a session for training.
    with tf.compat.v1.Session() as sess:
        sess.run([tf.compat.v1.global_variables_initializer(),
                  tf.compat.v1.local_variables_initializer()])
        # Start training.
        sess.run(train_itor.initializer)
        while _step < max_steps:
            # The pre-update embeddings are fetched but not needed here.
            _,feature_new_embedding,keys,out = sess.run(
                [train_dic["feature_embedding"],
                 train_dic["feature_new_embedding"],
                 train_dic["feature"],
                 train_dic["out"]]
            )
            train_metric.add(
                out["loss"],
                out["ground_truth"],
                out["prediction"]
            )
            local_ps.push(keys,feature_new_embedding)
            _step +=1
            # Report the metrics accumulated over the last train_log_iter batches.
            if _step % train_log_iter ==0:
                # Apply the format string; passing the values as extra print()
                # arguments printed the literal "%d:%s" placeholders.
                print("Train at step %d:%s" % (_step,train_metric.calc_str()))
                train_metric.reset()
            if _step % test_show_step ==0:
                valid_step(sess,test_itor,test_dic)

            
def valid_step(sess,test_itor,test_dic):
    """Evaluate on the whole test set and save embeddings on a new best AUC.

    Re-initialises ``test_itor``, accumulates metrics until the iterator
    raises ``OutOfRangeError``, prints them, and calls
    ``local_ps.save(save_embedding)`` when the AUC exceeds the module-level
    ``last_test_auc`` (which is then updated).

    Args:
        sess: session used to run the test graph.
        test_itor: initializable iterator over the test data.
        test_dic: dict whose ``"out"`` entry holds the model output tensors.
    """
    global last_test_auc
    test_metric.reset()
    sess.run(test_itor.initializer)
    while True:
        try:
            out = sess.run(test_dic["out"])
        except tf.errors.OutOfRangeError:
            # Test data exhausted.
            break
        test_metric.add(
            out["loss"],
            out["ground_truth"],
            out["prediction"]
        )

    # Apply the format string; passing the value as an extra print() argument
    # printed the literal "%s" placeholder.
    print("Test at step:%s" % test_metric.calc_str())
    # Compute the summary once and reuse it for the comparison and the update.
    res = test_metric.calc()
    print(res["auc"])
    if res["auc"] > last_test_auc:
        last_test_auc = res["auc"]
        local_ps.save(save_embedding)

Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module 'gast' has no attribute 'Index'
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module 'gast' has no attribute 'Index'
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module 'gast' has no attribute 'Index'
Please report this to the TensorFlow team. When filing the bug, set the verbosity to 10 (on Linux, `export AUTOGRAPH_VERBOSITY=10`) and attach the full output.
Cause: module 'gast' has no attribute 'Index'


In [None]:
# Run the full training loop (max_steps batches with periodic evaluation).
train()

################################################################################
Train at step %d:%s 1000 loss: 3.859538(1000),auc:0.492515(8000),pcoc:0.303126
Test at step:%s loss: 3.949846(17445),auc:0.499357(139560),pcoc:0.294674
0.49935679179805714
Train at step %d:%s 2000 loss: 3.821869(1000),auc:0.478450(8000),pcoc:0.307671
Test at step:%s loss: 3.947695(17445),auc:0.499689(139560),pcoc:0.294915
0.4996890147747465
Train at step %d:%s 3000 loss: 3.813024(1000),auc:0.513809(8000),pcoc:0.306068
Test at step:%s loss: 3.945544(17445),auc:0.500020(139560),pcoc:0.295158
0.5000199329588951
总共包含keys:  77273
Train at step %d:%s 4000 loss: 3.817105(1000),auc:0.501010(8000),pcoc:0.307362
Test at step:%s loss: 3.943401(17445),auc:0.500353(139560),pcoc:0.295399
0.5003531578812915
总共包含keys:  77350
Train at step %d:%s 5000 loss: 3.780049(1000),auc:0.499715(8000),pcoc:0.310574
Test at step:%s loss: 3.941271(17445),auc:0.500688(139560),pcoc:0.295638
0.500688498205281
总共包含keys:  77431
Train at step

Test at step:%s loss: 3.859653(17445),auc:0.513038(139560),pcoc:0.304976
0.5130376528925327
总共包含keys:  78755
Train at step %d:%s 45000 loss: 3.736927(1000),auc:0.511365(8000),pcoc:0.317633
Test at step:%s loss: 3.857570(17445),auc:0.513357(139560),pcoc:0.305218
0.5133569190858981
总共包含keys:  78772
Train at step %d:%s 46000 loss: 3.718017(1000),auc:0.519086(8000),pcoc:0.318912
Test at step:%s loss: 3.855569(17445),auc:0.513653(139560),pcoc:0.305450
0.5136534832880748
总共包含keys:  78793
Train at step %d:%s 47000 loss: 3.770943(1000),auc:0.497460(8000),pcoc:0.314715
Test at step:%s loss: 3.853536(17445),auc:0.513962(139560),pcoc:0.305686
0.513962312375544
总共包含keys:  78815
Train at step %d:%s 48000 loss: 3.737950(1000),auc:0.497039(8000),pcoc:0.318750
Test at step:%s loss: 3.851476(17445),auc:0.514269(139560),pcoc:0.305926
0.5142687046981358
总共包含keys:  78834
Train at step %d:%s 49000 loss: 3.719399(1000),auc:0.513042(8000),pcoc:0.320001
Test at step:%s loss: 3.849402(17445),auc:0.514586(13956

总共包含keys:  79278
Train at step %d:%s 88000 loss: 3.641615(1000),auc:0.527018(8000),pcoc:0.329764
Test at step:%s loss: 3.772510(17445),auc:0.525651(139560),pcoc:0.315248
0.5256514811051121
总共包含keys:  79290
Train at step %d:%s 89000 loss: 3.628697(1000),auc:0.530841(8000),pcoc:0.330411
Test at step:%s loss: 3.770582(17445),auc:0.525915(139560),pcoc:0.315480
0.5259153619206228
总共包含keys:  79299
Train at step %d:%s 90000 loss: 3.658807(1000),auc:0.538969(8000),pcoc:0.327173
Test at step:%s loss: 3.768591(17445),auc:0.526191(139560),pcoc:0.315719
0.526190770872437
总共包含keys:  79307
Train at step %d:%s 91000 loss: 3.633905(1000),auc:0.540814(8000),pcoc:0.328997
Test at step:%s loss: 3.766690(17445),auc:0.526449(139560),pcoc:0.315948
0.5264494388956733
总共包含keys:  79318
Train at step %d:%s 92000 loss: 3.619223(1000),auc:0.540401(8000),pcoc:0.330919
Test at step:%s loss: 3.764802(17445),auc:0.526714(139560),pcoc:0.316174
0.5267137692494529
总共包含keys:  79323
Train at step %d:%s 93000 loss: 3.61583

In [None]:
ls

 驱动器 D 中的卷是 Data
 卷的序列号是 D8DA-21A2

 D:\software_workplace\python_workplace\ssh_python_workplace\recommend\data_prefix 的目录

2021/08/09  15:14    <DIR>          .
2021/08/09  15:14    <DIR>          ..
2021/08/08  16:44    <DIR>          .ipynb_checkpoints
2021/08/08  19:26            12,509 data_prefix.ipynb
2021/08/09  15:14            73,617 model.ipynb
2021/07/30  14:39       265,105,635 ratings.dat
2021/08/09  15:15         8,580,999 saved_embedding
2021/07/30  16:34         1,923,952 test_set
2021/07/30  16:54         5,841,904 test_set_hash
2021/08/08  17:08    <DIR>          test_tf
2021/07/30  16:34       133,837,236 train_set
2021/07/30  16:54       403,964,180 train_set_hash
2021/08/08  17:08    <DIR>          train_tf
               8 个文件    819,340,032 字节
               5 个目录 279,717,195,776 可用字节


In [None]:
pwd

'D:\\software_workplace\\python_workplace\\ssh_python_workplace\\recommend\\data_prefix'