In [3]:
import ssl
import os
import sys
from py_func.read_db import get_dataloaders
from py_func.create_model import load_model

# --- Run configuration (user-supplied inputs) ---
dataset = "MNIST_nbal_0.001"    # dataset identifier
mod = "MNIST_Net"               # model identifier
lr = 0.2                        # learning rate
decay = 1                       # per-group decay
n_iter = 600                    # total number of global rounds
alpha_coef = 0.01

# --- Built-in defaults ---
seed = 0                        # model seed
n_SGD = 20                      # local steps per round
p = 1.0                         # fraction of clients selected
batch_size = 50                 # batch size
meas_perf_period = 5            # reporting period


"""获取超参数"""
# Global rounds, batch_size and accuracy-reporting period
# n_iter, batch_size, meas_perf_period = get_hyperparams(dataset, n_SGD)

"""用于保存数据的文件名"""
file_name = f"fedDC_{dataset}_{mod}_lr{lr}_d{decay}_iter{n_iter}_test"

# Echo the effective hyper-parameters, one "label value" pair per line
print("==========>>> FedDC 超参数 <<<========")
for label, value in (
    ("  全局轮次: ", n_iter),
    ("  本地轮次: ", n_SGD),
    ("  batch size: ", batch_size),
    ("  汇报频率: ", meas_perf_period),
    ("  alpha: ", alpha_coef),
    ("  文件名: ", file_name),
):
    print(label, value)
print("===============================")


"""获取数据集"""
# NOTE(review): this replaces the default HTTPS context factory with the
# unverified one, i.e. certificate verification is disabled process-wide.
ssl._create_default_https_context = ssl._create_unverified_context
# Load the data set (per the original note it is already split across 100 clients).
# shannon_list: list(K,1) holding the Shannon diversity index of each client's data
list_dls_train, list_dls_test, shannon_list = get_dataloaders(dataset, batch_size)


"""根据数据集划分确定设备个数"""
n_clients_total = len(list_dls_train)
n_sampled = int(p * n_clients_total)
# print("  number fo sampled clients: ", n_sampled)


"""加载初始模型"""
model_0 = load_model(mod, seed)
print(f"模型加载完成：{mod}")

  全局轮次:  600
  本地轮次:  20
  batch size:  50
  汇报频率:  5
  alpha:  0.01
  文件名:  fedDC_MNIST_nbal_0.001_MNIST_Net_lr0.2_d1_iter600_test
./data/MNIST_nbal_0.001_train_100.pkl
./data/MNIST_nbal_0.001_test_100.pkl
MNIST_nonIID数据集加载成功！
MNISTNet
模型加载完成：MNIST_Net


In [8]:
def get_model_para(model, flag=True):
    """Convert a model into a list of its parameter tensors.

    Args:
        model: object exposing ``parameters()`` (e.g. a ``torch.nn.Module``).
        flag: when falsy, ``model`` is handed back unchanged.

    Returns:
        ``model`` itself if ``flag`` is falsy; otherwise a list with one
        detached tensor per entry of ``model.parameters()``.
    """
    if flag:
        return [weight.detach() for weight in model.parameters()]
    return model

def get_mdl_params(model_list, n_par=None):
    """Flatten the parameters of several models into one float32 matrix.

    Args:
        model_list: sequence of models exposing ``named_parameters()``.
        n_par: number of scalar parameters per model. When ``None`` it is
            counted from the first model in ``model_list``.

    Returns:
        ``numpy.ndarray`` of dtype float32 and shape ``(len(model_list), n_par)``.
        Row ``i`` holds the parameters of ``model_list[i]`` concatenated in
        ``named_parameters()`` order. The array is freshly allocated, so
        writing to it does not touch the models.
    """
    if n_par is None:
        n_par = sum(param.numel() for _, param in model_list[0].named_parameters())

    param_mat = np.zeros((len(model_list), n_par), dtype='float32')
    for row, mdl in enumerate(model_list):
        idx = 0
        for _, param in mdl.named_parameters():
            flat = param.detach().cpu().numpy().reshape(-1)
            # Values are copied into param_mat, so no memory is shared with the model
            param_mat[row, idx:idx + flat.size] = flat
            idx += flat.size
    return param_mat

def fedDC_local_learning(
        model,
        alpha: float,
        optimizer,
        train_data,
        n_SGD: int,
        loss_f,
        lr,
        grad_global_pre,
        grad_local_pre,
        hist_i,
        n_par
    ):
    """Run ``n_SGD`` local FedDC optimisation steps on ``model`` in place.

    Each step minimises ``loss_f(model(x), y) + loss_r + loss_g`` where

    * ``loss_r = alpha/2 * ||theta - (theta_0 - hist_i)||^2`` with ``theta`` the
      current flattened parameters and ``theta_0`` the flattened parameters
      the model had when this function was entered;
    * ``loss_g = <theta, grad_global_pre - grad_local_pre> / (lr * n_SGD)``.

    Args:
        model: ``torch.nn.Module`` to train; moved to CUDA device 0 when
            available, otherwise to the CPU.
        alpha: coefficient of the quadratic term ``loss_r``.
        optimizer: optimizer whose ``zero_grad()`` / ``step()`` are called once
            per local step (it must have been built over ``model``'s parameters).
        train_data: iterable yielding ``(features, labels)`` batches.
        n_SGD: number of local steps.
        loss_f: callable ``(predictions, labels) -> scalar tensor``.
        lr: learning rate, used here only to scale ``loss_g``.
        grad_global_pre: flat array/tensor, one entry per model parameter.
        grad_local_pre: flat array/tensor, one entry per model parameter.
        hist_i: flat array/tensor, one entry per model parameter.
        n_par: expected number of scalar parameters in ``model``; ``None``
            skips the check.

    Returns:
        None. ``model`` is updated in place through ``optimizer``.

    Raises:
        ValueError: if ``n_par`` does not match the model's parameter count.
    """
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    model.to(device)  # move the model to the working device

    # Snapshot the incoming parameters as a flat vector. torch.cat allocates a
    # new tensor, so no deep copy of the whole model is needed for this.
    global_model_param = torch.cat(
        [param.detach().reshape(-1) for param in model.parameters()]
    ).to(dtype=torch.float32).clone()
    if n_par is not None and global_model_param.numel() != n_par:
        raise ValueError(
            f"n_par={n_par} does not match the model's "
            f"{global_model_param.numel()} parameters"
        )

    state_update_diff = torch.as_tensor(
        -grad_local_pre + grad_global_pre, dtype=torch.float32, device=device
    )
    hist_i = torch.as_tensor(hist_i, dtype=torch.float32, device=device)

    # Loop-invariant pieces of the two regularisers
    drift_anchor = global_model_param - hist_i
    gradient_scale = lr * n_SGD

    for _ in range(n_SGD):
        # A fresh iterator is created on every step, so each step consumes the
        # first batch that iterator yields.
        # NOTE(review): presumably train_data shuffles, making this a random
        # batch per step; without shuffling every step sees the same batch.
        features, labels = next(iter(train_data))

        features = features.to(device)  # move the batch to the working device
        labels = labels.to(device)

        optimizer.zero_grad()

        predictions = model(features)
        batch_loss = loss_f(predictions, labels)

        # Current parameters as one flat, differentiable vector
        local_parameter = torch.cat(
            [param.reshape(-1) for param in model.parameters()], 0
        )

        drift = local_parameter - drift_anchor
        loss_r = alpha / 2 * torch.sum(drift * drift)
        loss_g = torch.sum(local_parameter * state_update_diff) / gradient_scale

        batch_loss = batch_loss + loss_r + loss_g

        batch_loss.backward()
        optimizer.step()

def set_param_to_model(model, model_param):
    """Load a flat parameter vector into ``model`` in place.

    ``model_param`` is consumed in ``model.named_parameters()`` order: the
    first ``numel()`` entries fill the first parameter, the next ones the
    second parameter, and so on. Buffers (e.g. BatchNorm running statistics)
    are left untouched.

    Args:
        model: ``torch.nn.Module`` whose parameters are overwritten.
        model_param: flat array-like (NumPy array or tensor) holding at least
            as many values as the model has scalar parameters.

    Returns:
        The same ``model`` object, with its parameters overwritten.
    """
    flat = torch.as_tensor(model_param).reshape(-1)
    idx = 0
    # Copy straight into each parameter: copy_ converts to the parameter's own
    # device and dtype, so no module-level `device` is needed, and skipping
    # load_state_dict avoids strict-key failures on models that own buffers.
    with torch.no_grad():
        for _, param in model.named_parameters():
            length = param.numel()
            param.copy_(flat[idx:idx + length].reshape(param.shape))
            idx += length
    return model

In [10]:
from py_func.FedProx import *
import copy

# Select the GPU when one is available, otherwise fall back to the CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

# Working aliases for the objects created by the loading cell
model, training_sets, testing_sets = model_0, list_dls_train, list_dls_test
# NOTE(review): this increment is not idempotent - every re-run of the cell
# bumps n_iter again, so the value depends on how often the cell was executed.
n_iter += 1
metric_period = meas_perf_period

print("========>>> 正在初始化训练")

model.to(device)  # move the model to the selected device
print(f"模型放入{device}")

# Loss function
loss_f = loss_classifier

''' ------------------------变量初始化>>>>>>>>>>>>>>>>>>>>>>>> '''
K = len(training_sets)  # total number of clients

# array(K): number of samples held by each client
n_samples = np.array([len(db.dataset) for db in training_sets])

# array(K): each client's share of all samples
weights = n_samples / np.sum(n_samples)

# Per-round, per-client histories; row 0 is filled in below
hist_shape = (n_iter + 1, K)
loss_hist = np.zeros(hist_shape)        # loss of the global model on each client
acc_hist = np.zeros(hist_shape)         # accuracy of the global model on each client
dc_loss_hist = np.zeros(hist_shape)     # same layout, second ("dc") loss history
dc_acc_hist = np.zeros(hist_shape)      # same layout, second ("dc") accuracy history

# Per-round server-level histories
server_loss_hist, server_acc_hist = [], []
dc_server_loss_hist, dc_server_acc_hist = [], []

# Round 0: loss / accuracy of the initial model on every client
for k, dl in enumerate(training_sets):
    loss_hist[0, k] = dc_loss_hist[0, k] = float(
        loss_dataset(model, dl, loss_f).detach())
    acc_hist[0, k] = dc_acc_hist[0, k] = accuracy_dataset(model, dl)

# Sample-weighted average loss / accuracy of the global model
server_loss = np.dot(weights, loss_hist[0])
server_acc = np.dot(weights, acc_hist[0])

# Record the initial values in both sets of histories
server_loss_hist.append(server_loss)
dc_server_loss_hist.append(server_loss)
server_acc_hist.append(server_acc)
dc_server_acc_hist.append(server_acc)

'''********************************************************************'''

# Number of scalar parameters in the model
n_par = get_mdl_params([model]).shape[1]

weight_list = weights * K

# # list(K): gradients of the previous round, as parameters
# gradients = get_gradients(model, [model] * K)

# # h_i
# parameter_drifts = copy.deepcopy(gradients)
# (K, n_par) array holding the parameter drift of every client
parameter_drifts = np.zeros((K, n_par), dtype='float32')

# Flat parameter vector of the initial model
init_par_list = get_mdl_params([model], n_par)[0]
# (K, n_par) array of client parameters, every row initialised to the initial model
clnt_params_list = np.ones((K, 1), dtype='float32') * init_par_list.reshape(1, -1)

# (K+1, n_par) array of state gradient differences; the extra last row is the cloud state
state_gadient_diffs = np.zeros((K + 1, n_par), dtype='float32')
# state_gadient_diffs = get_gradients(model, [model] * (K+1))

# # Total gradient of the previous round, as a model
# grad = get_grad(model, model)
# grad_para = get_model_para(grad)

模型放入cuda:0


In [22]:
# First round: flat parameter vector of the current global model
global_model_param = get_mdl_params([model], n_par)[0]

delta_g_sum = np.zeros(n_par)

# Containers for this round: client parameters (memory heavy) and client models
clients_params, clients_models = [], []

# Clients taking part in this round (here every index 0..K-1) and the
# weights used when aggregating them
clients_list = np.arange(K)
agre_weights = weights

k = 0

# Private copy of the global model for client k, with its own SGD optimizer
local_model = deepcopy(model)
local_optimizer = optim.SGD(local_model.parameters(), lr=lr)

'''********************** FedDC ******************'''
client_weight = weight_list[k]
local_update_last = state_gadient_diffs[k]
global_update_last = state_gadient_diffs[-1] / client_weight
alpha = alpha_coef / client_weight
hist_i = torch.tensor(parameter_drifts[k], dtype=torch.float32, device=device)  # h_i
'''***********************************************'''

'***********************************************'

In [29]:
# Manual unrolling of the preamble of fedDC_local_learning for client k.
# Parameter of the function   <-  value used in this cell
#   model                     <-  local_model
#   alpha                     <-  alpha
#   optimizer                 <-  local_optimizer
#   train_data                <-  training_sets[k]
#   n_SGD                     <-  n_SGD
#   loss_f                    <-  loss_f
#   lr                        <-  lr
#   grad_global_pre           <-  global_update_last
#   grad_local_pre            <-  local_update_last
#   hist_i                    <-  hist_i
#   n_par                     <-  n_par

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

local_model.to(device)  # move the model to the working device

# Same sign convention as fedDC_local_learning: -grad_local_pre + grad_global_pre
# (the previous version of this cell had the two terms swapped).
state_update_diff = torch.tensor(-local_update_last + global_update_last,  dtype=torch.float32, device=device)

# get_mdl_params copies the values into a new NumPy array, so the snapshot can
# be taken from local_model directly. This also avoids overwriting the global
# `model_0`, which holds the initial model loaded in the first cell.
global_model_param = torch.tensor(get_mdl_params([local_model], n_par)[0], dtype=torch.float32, device=device)


In [33]:
# Manual single local step for client k (loss terms only, no backward pass)
train_data, optimizer = training_sets[k], local_optimizer

# First batch produced by a fresh iterator over the client's loader
features, labels = next(iter(train_data))

# Move the batch to the working device
# (alternative: labels = labels.cuda() if torch.cuda.is_available() else labels)
features = features.to(device)
labels = labels.to(device)

optimizer.zero_grad()

predictions = local_model(features)
batch_loss = loss_f(predictions, labels)

# Flatten every parameter and join them into one vector
flat_chunks = []
for param in local_model.parameters():
    flat_chunks.append(param.reshape(-1))
local_parameter = torch.cat(flat_chunks, 0)

# Quadratic penalty around (global_model_param - hist_i), then the linear term
drift = local_parameter - (global_model_param - hist_i)
loss_r = alpha/2 * torch.sum(drift * drift)
loss_g = torch.sum(local_parameter * state_update_diff) / (lr * n_SGD)

In [39]:
# Show the quadratic penalty term loss_r and its shape, one per line
a = loss_r
print(a, a.shape, sep="\n")

tensor(0., device='cuda:0', grad_fn=<MulBackward0>)
torch.Size([])
