In [1]:
import os
import logging
import numpy as np
import tensorflow as tf
from tensorflow.keras import layers, losses, optimizers, metrics
from  tqdm import *


tf.keras.backend.set_floatx('float32')

In [2]:
# 全局参数
NUM_USER = 100
np.random.seed(12)
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
import warnings
warnings.filterwarnings("ignore")

In [3]:
loss_fn = losses.SparseCategoricalCrossentropy()
metrics_list = [tf.keras.metrics.Accuracy()]
# optimizer = optimizers.SGD(learning_rate=1e-2)
optimizer_class = optimizers.SGD
E = 3
batch_size = 20
hidden_unit = 100
beta = 0.99

## 准备数据

In [4]:
import pickle

# 加载数据
with open('../data/mnist/train_array.pkl', 'rb') as f:
    train_data = pickle.load(f)
with open('../data/mnist/test_array.pkl', 'rb') as f:
    test_data = pickle.load(f)

## 定义模型

In [5]:
class Mnist_LG(tf.Module):
    def __init__(self, hidden_unit):
        """
        :param
            hidden_unit: positive integer, dimensionality of the hidden layer.
        """
        super(Mnist_LG, self).__init__()
        self.hidden_unit = hidden_unit
        self.fc_1 = layers.Dense(hidden_unit, activation='relu')
        self.fc_2 = layers.Dense(10, activation='softmax')
        self.global_index = 2
        
    def __call__(self, x):
        x = self.fc_1(x)
        output = self.fc_2(x)
        return output

## 客户端

In [6]:
class Client_LG():
    def __init__(self, id, hidden_unit, train_data={'x': [], 'y': []},
                 test_data={'x': [], 'y': []}, **kwargs):
        self.id = id
        self.train_data = {'x': np.array(train_data['x']),
                           'y': np.array(train_data['y'])}
        self.test_data = {'x': np.array(test_data['x']),
                          'y': np.array(test_data['y'])}
        self.train_samples = len(train_data['y'])
        self.test_samples = len(test_data['y'])
        self.index = np.arange(self.train_samples)
        self.model = Mnist_LG(hidden_unit)  # TODO:动态的加载类，因为更新方式不一样，所以好像没办法聚合,options应该更好地设置一下
        for key, value in kwargs.items():
            setattr(self, key, value)

        assert hasattr(self, 'E')
        assert hasattr(self, 'optimizer')
        assert hasattr(self, 'lr')  #
        assert hasattr(self, 'loss_fn')
        assert hasattr(self, 'metrics')
        assert hasattr(self, 'batch_size')
        assert (self.batch_size * self.E) < self.train_samples
        self.init_model()
        self.optimizer = kwargs['optimizer'](self.lr)

    def init_model(self):
        '''
        using call method to initialize net parameters.
        :return:
        '''
        init_feature = tf.cast(self.test_data["x"][:5], dtype=tf.float32)
        _ = self.model(init_feature)

    def forward(self, communication_round=None):
        np.random.shuffle(self.index)
        self.select_index = self.index[:self.batch_size * self.E]
        train_set = tf.data.Dataset.from_tensor_slices((self.train_data["x"][self.select_index],
                                                        self.train_data["y"][self.select_index])).batch(self.batch_size)
        local_round = 0
        for feature, label in train_set:
            with tf.GradientTape() as tape:
                predict = self.model(feature)  # without softmax
                loss = self.loss_fn(label, predict)
            grads = tape.gradient(loss, self.model.trainable_variables)
            self.optimizer.apply_gradients(zip(grads, self.model.trainable_variables))
            local_round += 1
        return (self.model.trainable_variables[self.model.global_index:])

    def update_variables(self, new_variables):
        """
        :param
            new_variables: tuple, each element is ndarray
        :result: None, only copy server model
        """
        assert len(self.model.trainable_variables[self.model.global_index:]) == len(new_variables)
        for i in range(len(new_variables)):
            self.model.trainable_variables[self.model.global_index + i].assign(new_variables[i])

    def test(self):
        test_set = tf.data.Dataset.from_tensor_slices((self.test_data["x"], self.test_data["y"])).batch(
            self.test_samples)
        train_set = tf.data.Dataset.from_tensor_slices((self.train_data["x"], self.train_data["y"])).batch(
            self.train_samples)
        for feature, label in test_set:
            output = self.model(feature)
            loss = self.loss_fn(label, output).numpy()

        prediction = tf.argmax(output, axis=-1).numpy()

        metric_result = []
        for metric in self.metrics:
            metric.reset_states()
            _ = metric.update_state(self.test_data['y'], prediction)
            metric_result.append(metric.result().numpy())

        for feature, label in train_set:
            output = self.model(feature)
            train_loss = self.loss_fn(label, output).numpy()
        # group_idx = tf.argmax(self.model.c, axis=1).numpy()[0]
        # group_idx = tf.argmax(self.model.c, axis=0).numpy()[0]
        return (loss, metric_result, train_loss)

## 服务器端

In [7]:
class Server_LG():
    def __init__(self, hidden_unit, train_data, test_data, E, optimizer, loss_fn, metrics, batch_size, epoches,
                 lr, filename='/home/dihao/code/PFedL/preliminary/Result/mnist_lg.txt'):
        self.hidden_unit = hidden_unit
        self.train_data = train_data
        self.test_data = test_data
        self.E = E
        self.optimizer = optimizer
        self.loss_fn = loss_fn
        self.metrics = metrics
        self.batch_size = batch_size
        self.epoches = epoches
        self.model = Mnist_LG(hidden_unit)

        self.init_model()
        self.lastest_model = self.get_parameters()
        self.clients = self.setup_clients(lr)
        self.file = filename

    def init_model(self):
        '''
        using call method to initialize net parameters.
        :return:
        '''
        init_feature = tf.cast(self.test_data[0]['x'][:5], dtype=tf.float32)
        _ = self.model(init_feature)

    def get_parameters(self):
        parameter = []
        for variable in self.model.trainable_variables[self.model.global_index:]:
            parameter.append(variable.numpy())
        return parameter

    def setup_clients(self, lr):
        client_list = []
        for i in range(NUM_USER):
            client = Client_LG(i, self.hidden_unit, train_data=self.train_data[i], test_data=self.test_data[i],
                            E=self.E, optimizer=self.optimizer, loss_fn=self.loss_fn, metrics=self.metrics,
                            batch_size=self.batch_size, lr=lr)
            client_list.append(client)

        return client_list

    def broadcast(self):
        for client in self.clients:
            client.update_variables(self.lastest_model)

    def train(self): 
        for epoch in trange(self.epoches):
            self.broadcast() # broadcast to all clients
            client_solution = [client.forward(epoch) for client in self.clients]
            self.lastest_model = self.aggragate(client_solution)

            round_loss, round_metrics, train_loss = self.test()
            with open(self.file, 'a+') as f:
                f.write('At round {}, test loss is: {:.4f}, metrics result is {}, train loss is {}'.format(
                        epoch, round_loss, round_metrics, train_loss))
                f.write('\n')
            logging.info('At round {}, test loss is: {:.4f}, metrics result is {}, train loss is {}'.format(
                        epoch, round_loss, round_metrics, train_loss))

    def aggragate(self, client_solution):
        concate_V = [np.zeros_like(x) for x in self.lastest_model]
        # update lastest_model
        for i, solution in enumerate(client_solution):
            client_variables = solution
            concate_V = [concate_V[j] + client_variables[j].numpy() for j in range(len(concate_V))]
        V = [element / NUM_USER for element in concate_V]
        return V

    def test(self):
        loss = []
        metrics_list = [[] * len(self.metrics)]
        train_loss = []
        for idx, client in enumerate(self.clients):
            client_loss, client_metrics, client_trainloss = client.test()
            loss.append(client_loss)
            train_loss.append(client_trainloss)
            for i in range(len(metrics_list)):
                metrics_list[i].append(client_metrics[i])
        return np.mean(loss), [np.mean(metrics_list[i]) for i in range(len(metrics_list))], np.mean(train_loss)


In [8]:
np.random.seed(12)
tf.random.set_seed(123)

In [9]:
server = Server_LG(hidden_unit=hidden_unit, train_data=train_data['user_data'], test_data=test_data['user_data'], E=3, optimizer=optimizer_class,
loss_fn=loss_fn, metrics=metrics_list, batch_size=20, epoches=200, lr=1e-2)

In [16]:
server.train()

2:35<22:37, 15.09s/it]2020-12-17 04:38:51,699 - <ipython-input-7-a8ee6df97764>[line:60] - INFO: At round 10, test loss is: 0.3808, metrics result is [0.92514956], train loss is 0.3415924906730652
 11%|█         | 11/100 [02:51<22:27, 15.14s/it]2020-12-17 04:39:05,917 - <ipython-input-7-a8ee6df97764>[line:60] - INFO: At round 11, test loss is: 0.3619, metrics result is [0.92676115], train loss is 0.32113248109817505
 12%|█▏        | 12/100 [03:05<21:47, 14.86s/it]2020-12-17 04:39:20,669 - <ipython-input-7-a8ee6df97764>[line:60] - INFO: At round 12, test loss is: 0.3443, metrics result is [0.9302185], train loss is 0.3038184344768524
 13%|█▎        | 13/100 [03:20<21:30, 14.83s/it]2020-12-17 04:39:35,428 - <ipython-input-7-a8ee6df97764>[line:60] - INFO: At round 13, test loss is: 0.3310, metrics result is [0.9307283], train loss is 0.2884969413280487
 14%|█▍        | 14/100 [03:34<21:13, 14.81s/it]2020-12-17 04:39:50,637 - <ipython-input-7-a8ee6df97764>[line:60] - INFO: At round 14, test