In [1]:
from __future__ import print_function, division
import tensorflow as tf
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
import random

# 切分数据集

In [2]:
def split_data(path, clients_num):
    """Load the occupancy CSV, split it into train/test, and shard the training set.

    Args:
        path: Location of the CSV file handed to ``pd.read_csv``.
        clients_num: Number of shards the training set is cut into.

    Returns:
        ``(X_train, X_test, y_train, y_test)``. ``X_train`` and ``y_train`` are
        lists of ``clients_num`` arrays (one per client); the test arrays are
        returned whole. Label rows have two columns, ``[1 - occupancy, occupancy]``.
    """
    feature_columns = ["Temperature", "Humidity", "Light", "CO2", "HumidityRatio"]
    frame = pd.read_csv(path)

    features = frame[feature_columns].values
    labels = frame["Occupancy"].values.reshape(-1, 1)

    # Fixed seed so the train/test split is identical on every run.
    train_features, test_features, train_labels, test_labels = train_test_split(
        features, labels, random_state=42)

    def to_one_hot(column):
        # One-hot encoding; assumes the Occupancy column only holds 0/1 values.
        return np.concatenate([1 - column, column], 1)

    # Each client receives one consecutive slice of the training rows.
    client_features = np.array_split(train_features, clients_num)
    client_labels = np.array_split(to_one_hot(train_labels), clients_num)
    return client_features, test_features, client_labels, to_one_hot(test_labels)

# Number of simulated clients; the training set is split into this many shards.
CLIENT_NUM = 6
# X_train / y_train are per-client lists of arrays, X_test / y_test are single arrays.
X_train, X_test, y_train, y_test = split_data("./data/datatraining.txt", CLIENT_NUM)

# 模拟一个储存服务
1. Client请求最新的全局模型、以及epoch
2. Client上传一次模型更新（epoch为参数）
3. Server获取所有模型更新（epoch为参数）
4. Server上传新的全局模型（epoch为参数）

In [3]:
import os
import pickle
import gzip

# Directory that stands in for the shared storage service: every global model
# and every client update is written here as a file.
BASE_DIR = "./storage"

# Create the directory when it is missing. exist_ok=True replaces the separate
# isdir() check (no check-then-create race) and makedirs also creates parent
# directories should BASE_DIR ever be a nested path.
os.makedirs(BASE_DIR, exist_ok=True)

def pack(model):
    """Serialize ``model`` with pickle and gzip-compress the result.

    Returns:
        The compressed pickle as ``bytes``.
    """
    return gzip.compress(pickle.dumps(model))


def unpack(data):
    """Decompress gzip ``data`` and unpickle the object it contains.

    NOTE: unpickling can execute arbitrary code, so only feed this function
    bytes that were produced by a trusted writer.
    """
    return pickle.loads(gzip.decompress(data))


def client_query_model():
    """Fetch the most recent global model from storage.

    Scans ``BASE_DIR`` for files whose name starts with ``global_model`` and
    ends (before the extension) in ``_<epoch>``, then loads the one with the
    largest epoch number.

    Returns:
        ``(model, epoch)``: the unpacked model and the epoch parsed from its
        file name.

    Raises:
        FileNotFoundError: If no global model file exists in ``BASE_DIR``.
        ValueError: If a ``global_model*`` file name does not end in an integer.
    """
    newest_epoch = -1
    newest_file = None

    for f in os.listdir(BASE_DIR):
        if not f.startswith('global_model'):
            continue
        # File names look like "global_model_<epoch>.<ext>".
        stem = os.path.splitext(f)[0]
        epoch = int(stem.split('_')[-1])

        if epoch > newest_epoch:
            newest_epoch, newest_file = epoch, f

    # Previously a missing model fell through to open(".../None"), which hid
    # the real cause behind a confusing path in the error message.
    if newest_file is None:
        raise FileNotFoundError(
            "no global model found in {!r}".format(BASE_DIR))

    with open(os.path.join(BASE_DIR, newest_file), 'rb') as rf:
        return unpack(rf.read()), newest_epoch


def client_upload_one_update(update, epoch, c_id):
    """Store one client's update for ``epoch`` in the storage directory.

    The update is packed and written to ``local_update_<c_id>_<epoch>.ieen``
    under ``BASE_DIR``; an existing file with that name is overwritten.
    """
    payload = pack(update)
    target = "{}/local_update_{}_{}.ieen".format(BASE_DIR, c_id, epoch)

    with open(target, 'wb') as out:
        out.write(payload)


def server_query_updates(cur_epoch):
    """Collect every client update stored for ``cur_epoch``.

    Looks at all files in ``BASE_DIR`` whose name starts with ``local_update``
    and keeps those whose trailing ``_<epoch>`` equals ``cur_epoch``.

    Returns:
        A list with the unpacked content of each matching file, in the order
        ``os.listdir`` reports them.
    """
    def epoch_of(name):
        # The epoch is the last "_"-separated token before the extension.
        return int(os.path.splitext(name)[0].split('_')[-1])

    collected = []
    for name in os.listdir(BASE_DIR):
        if name.startswith('local_update') and epoch_of(name) == cur_epoch:
            with open("{}/{}".format(BASE_DIR, name), 'rb') as src:
                collected.append(unpack(src.read()))
    return collected


def server_upload_model(model, epoch):
    """Publish the global model for ``epoch``.

    The model is packed and written to ``global_model_<epoch>.ieen`` under
    ``BASE_DIR``; an existing file with that name is overwritten.
    """
    payload = pack(model)
    target = "{}/global_model_{}.ieen".format(BASE_DIR, epoch)

    with open(target, 'wb') as out:
        out.write(payload)

# Client训练获得梯度

In [4]:
# Per-client bookkeeping, indexed by client id: the epoch each client is to train.
client_epoch = [0] * CLIENT_NUM
# Step size of each client's local gradient-descent optimizer; also used to
# rescale the weight difference a client uploads.
client_learning_rate = 0.001

def train_model(client_id):
    """Run one local training pass for a client and upload the resulting update.

    The client fetches the newest global model ``(W, b)``, runs plain gradient
    descent over its own shard in mini-batches of ``batch_size`` (a trailing
    partial batch is not used), and uploads

        ``[[delta_W, delta_b], [n_samples, avg_cost]]``

    where ``delta = (global - local) / client_learning_rate``, i.e. the weight
    change expressed in gradient units, and ``avg_cost`` is the mean batch loss.

    Reads the module-level names ``X_train``, ``y_train``, ``n_features``,
    ``n_class``, ``batch_size``, ``client_learning_rate`` and ``client_epoch``.

    Args:
        client_id: Index of the client; selects its data shard and its slot
            in ``client_epoch``.

    Returns:
        None. Returns early, without training, when the newest global model
        is older than the epoch this client is waiting for.

    NOTE(review): the graph code mixes ``tf.compat.v1`` with bare TF1 names
    (``tf.placeholder``, ``tf.Session``); it presumably targets TensorFlow 1.x.
    """
    model, epoch = client_query_model()
    # Nothing new to train on: this client already handled the newest epoch.
    if epoch < client_epoch[client_id]:
        return

    tf.compat.v1.reset_default_graph()

    n_samples = X_train[client_id].shape[0]

    x = tf.placeholder(tf.float32, [None, n_features])
    y = tf.placeholder(tf.float32, [None, n_class])

    # Start the local model from the global parameters.
    ser_W, ser_b = model
    W = tf.Variable(ser_W)
    b = tf.Variable(ser_b)

    pred = tf.matmul(x, W) + b

    # Loss: softmax cross-entropy averaged over the batch.
    cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits_v2(logits=pred, labels=y))

    # Plain gradient descent.
    optimizer = tf.train.GradientDescentOptimizer(client_learning_rate)

    gradient = optimizer.compute_gradients(cost)
    train_op = optimizer.apply_gradients(gradient)

    # Initialise all variables.
    init = tf.global_variables_initializer()

    # Train on the local shard.
    with tf.Session() as sess:
        sess.run(init)

        avg_cost = 0
        total_batch = int(n_samples / batch_size)
        for i in range(total_batch):
            _, c = sess.run(
                [train_op, cost],
                feed_dict={
                    x: X_train[client_id][i * batch_size:(i + 1) * batch_size],
                    y: y_train[client_id][i * batch_size:(i + 1) * batch_size, :]
                })
            avg_cost += c / total_batch

        # Read back the locally trained parameters.
        val_W, val_b = sess.run([W, b])

    # Express the weight change in gradient units so the server can apply its
    # own learning rate.
    delta_W = (ser_W - val_W) / client_learning_rate
    delta_b = (ser_b - val_b) / client_learning_rate
    delta_model = [delta_W, delta_b]
    meta = [n_samples, avg_cost]

    client_upload_one_update([delta_model, meta], epoch, client_id)

    # Record the NEXT epoch this client should train. Storing `epoch` itself
    # (as before) left the guard above unable to skip a repeated call for the
    # same global model, so the client would retrain and re-upload it.
    client_epoch[client_id] = epoch + 1
    return

# Server端更新模型

In [5]:
# 测试集
def testing(ser_W, ser_b):
    """Return the accuracy of the linear model ``x @ W + b`` on the test set.

    Builds a fresh graph, loads ``ser_W`` / ``ser_b`` into variables and
    compares the arg-max of the logits with the arg-max of the labels over
    the module-level ``X_test`` / ``y_test``.

    Args:
        ser_W: Weight matrix of the model to evaluate.
        ser_b: Bias vector of the model to evaluate.

    Returns:
        The fraction of test rows classified correctly.
    """
    tf.compat.v1.reset_default_graph()

    features = tf.placeholder(tf.float32, [None, n_features])
    labels = tf.placeholder(tf.float32, [None, n_class])

    weights = tf.Variable(ser_W)
    bias = tf.Variable(ser_b)
    logits = tf.matmul(features, weights) + bias

    hits = tf.equal(tf.argmax(logits, 1), tf.argmax(labels, 1))
    accuracy = tf.reduce_mean(tf.cast(hits, tf.float32))

    # Evaluate only: the variables are initialised from the given parameters
    # and no training step is run.
    with tf.Session() as sess:
        sess.run(tf.global_variables_initializer())
        return sess.run(accuracy, feed_dict={features: X_test, labels: y_test})

# Model / training configuration
batch_size = 100
n_features = 5
n_class = 2

EPOCH_NUM = 50 * CLIENT_NUM
server_lr = 0.001

# Global model parameters, initialised to zero
server_W = np.zeros([n_features, n_class], dtype=np.float32)
server_b = np.zeros([n_class], dtype=np.float32)
server_model = [server_W, server_b]

# Start from a clean slate so the cell can be re-run. Files left over from an
# earlier run would otherwise win the "newest epoch" lookup in
# client_query_model() and clients would train on a stale model. Only files
# following this notebook's own naming scheme are removed.
for stale in os.listdir(BASE_DIR):
    if stale.startswith(('global_model_', 'local_update_')) and stale.endswith('.ieen'):
        os.remove(os.path.join(BASE_DIR, stale))
# Reset the per-client bookkeeping in place for the same reason.
client_epoch[:] = [0] * CLIENT_NUM

for epoch in range(EPOCH_NUM):
    # Publish the current global model, then let every client train on it.
    server_upload_model(server_model, epoch)

    for c_id in range(CLIENT_NUM):
        train_model(c_id)

    updates = server_query_updates(epoch)
    if not updates:
        # Previously this surfaced as a TypeError from dividing None.
        raise RuntimeError("no client updates found for epoch {}".format(epoch))

    total_grad_W = np.zeros_like(server_W)
    total_grad_b = np.zeros_like(server_b)
    total_size = 0
    total_cost = 0

    # Weight each client's update by the number of samples it trained on.
    for (grad_W, grad_b), (data_size, cost) in updates:
        total_grad_W = total_grad_W + grad_W * data_size
        total_grad_b = total_grad_b + grad_b * data_size
        total_size += data_size
        total_cost += cost

    total_grad_W = total_grad_W / total_size
    total_grad_b = total_grad_b / total_size
    # Average over the updates actually received, not the configured client count.
    total_cost /= len(updates)

    # update global model
    server_W = server_W - server_lr * total_grad_W
    server_b = server_b - server_lr * total_grad_b
    server_model = [server_W, server_b]

    test_acc = testing(server_W, server_b)
    print("Epoch: {:03}, cost: {:.2f}, test_acc: {:.4f}".format(epoch, total_cost, test_acc))

Epoch: 000, cost: 44.17, test_acc: 0.9047
Epoch: 001, cost: 8.77, test_acc: 0.8497
Epoch: 002, cost: 17.45, test_acc: 0.8379
Epoch: 003, cost: 18.28, test_acc: 0.8404
Epoch: 004, cost: 9.93, test_acc: 0.8816
Epoch: 005, cost: 19.77, test_acc: 0.9224
Epoch: 006, cost: 10.29, test_acc: 0.8639
Epoch: 007, cost: 23.08, test_acc: 0.9347
Epoch: 008, cost: 7.54, test_acc: 0.9229
Epoch: 009, cost: 13.26, test_acc: 0.9258
Epoch: 010, cost: 19.63, test_acc: 0.9032
Epoch: 011, cost: 9.56, test_acc: 0.9072
Epoch: 012, cost: 19.78, test_acc: 0.9062
Epoch: 013, cost: 14.20, test_acc: 0.9327
Epoch: 014, cost: 18.85, test_acc: 0.9219
Epoch: 015, cost: 7.35, test_acc: 0.8325
Epoch: 016, cost: 24.33, test_acc: 0.9376
Epoch: 017, cost: 7.25, test_acc: 0.8944
Epoch: 018, cost: 21.88, test_acc: 0.9028
Epoch: 019, cost: 7.52, test_acc: 0.8811
Epoch: 020, cost: 16.61, test_acc: 0.9244
Epoch: 021, cost: 11.97, test_acc: 0.9337
Epoch: 022, cost: 15.78, test_acc: 0.8094
Epoch: 023, cost: 17.29, test_acc: 0.9386

Epoch: 198, cost: 4.45, test_acc: 0.9666
Epoch: 199, cost: 4.56, test_acc: 0.9617
Epoch: 200, cost: 6.41, test_acc: 0.9710
Epoch: 201, cost: 4.54, test_acc: 0.9617
Epoch: 202, cost: 4.88, test_acc: 0.9700
Epoch: 203, cost: 4.78, test_acc: 0.9720
Epoch: 204, cost: 4.14, test_acc: 0.9661
Epoch: 205, cost: 4.32, test_acc: 0.9661
Epoch: 206, cost: 4.46, test_acc: 0.9661
Epoch: 207, cost: 4.09, test_acc: 0.9720
Epoch: 208, cost: 6.73, test_acc: 0.9612
Epoch: 209, cost: 3.29, test_acc: 0.9646
Epoch: 210, cost: 4.03, test_acc: 0.9612
Epoch: 211, cost: 6.00, test_acc: 0.9705
Epoch: 212, cost: 4.27, test_acc: 0.9720
Epoch: 213, cost: 3.81, test_acc: 0.9646
Epoch: 214, cost: 4.42, test_acc: 0.9740
Epoch: 215, cost: 3.45, test_acc: 0.9627
Epoch: 216, cost: 6.30, test_acc: 0.9627
Epoch: 217, cost: 4.61, test_acc: 0.9632
Epoch: 218, cost: 6.42, test_acc: 0.9646
Epoch: 219, cost: 4.71, test_acc: 0.9646
Epoch: 220, cost: 4.72, test_acc: 0.9720
Epoch: 221, cost: 3.90, test_acc: 0.9632
Epoch: 222, cost