In [1]:
!pip install -q flwr[simulation] torch torchvision matplotlib

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.3/139.3 KB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.4/57.4 MB[0m [31m12.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m8.7/8.7 MB[0m [31m86.7 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m90.5/90.5 KB[0m [31m8.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m201.4/201.4 KB[0m [31m20.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m128.2/128.2 KB[0m [31m14.4 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.0/3.0 MB[0m [31m82.2 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m57.1/57.1 KB[0m [31m5.0 MB/s[0m 

In [2]:
from collections import OrderedDict
from typing import List, Tuple

import flwr as fl
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torchvision
import torch.nn.functional as F
import torchvision.transforms as transforms
from flwr.common import Metrics
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import CIFAR10

DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
print(f"Training on {DEVICE} using PyTorch {torch.__version__} and Flower {fl.__version__}")

Training on cpu using PyTorch 1.13.1+cu116 and Flower 1.3.0


## Data Preparation


In [3]:
# parameters
time_step = 48
BATCH_SIZE = 32

In [4]:
data = []
with open ('House_30.txt', 'r') as reader:
  for line in reader:
    stripped_line = line.strip().split()
    data.append(stripped_line)

tem = [x[0] for x in data]
houses = list(set(tem))

date = []
consumption = []
for i in houses:
  date.append([float(x[1]) for x in data if x[0]==i])
  consumption.append([float(x[2]) for x in data if x[0]==i])    

In [5]:
def create_label(data, time_step):
  x_nest, y_nest = [], []
  for j in range(len(data)):
    x_data, y_data = [], []
    for i in range(len(data[j]) - time_step):
      x = data[j][i: (i + time_step)]
      x_data.append(x)
      y = [data[j][i + time_step]]
      y_data.append(y)

    #x_data = np.array(x_data)[:, :, np.newaxis]
    x_data = np.array(x_data)[:, :]
    #x_data = np.array(x_data)[:, np.newaxis, :]
    x_nest.append(x_data)
    y_nest.append(y_data)
  x_nest = np.array(x_nest)
  y_nest = np.array(y_nest)
  return x_nest, y_nest
# 可能要去掉x的最后一个维度 从（48，1）变为（48）

In [6]:
input, labels = create_label(consumption, time_step)


In [7]:
input = np.float32(input)
labels = np.float32(labels)


### Create Dataset

In [8]:
# 定义GetLoader类，继承Dataset方法，并重写__getitem__()和__len__()方法
class GetLoader(torch.utils.data.Dataset):
	# 初始化函数，得到数据
    def __init__(self, data_root, data_label):
        self.data = data_root
        self.label = data_label
    # index是根据batchsize划分数据后得到的索引，最后将data和对应的labels进行一起返回
    def __getitem__(self, index):
        data = self.data[index]
        labels = self.label[index]
        return data, labels
    # 该函数返回数据大小长度，目的是DataLoader方便划分，如果不知道大小，DataLoader会一脸懵逼
    def __len__(self):
        return len(self.data)


In [9]:
length = len(input[0])
val = int(0.7*length)
test = int(0.9*length)
trainloaders = []
valloaders = []
testloaders = []

def load_datasets(input, labels):

  Xtrain_raw = [x[0: val] for x in input]
  Xval_raw = [x[val: test] for x in input]
  Xtest_raw = [x[test: ] for x in input]

  Ytrain_raw = [x[0: val] for x in labels]
  Yval_raw = [x[val: test] for x in labels]
  Ytest_raw = [x[test: ] for x in labels]

  for i in range(30):
    ds_train = GetLoader(Xtrain_raw[i], Ytrain_raw[i])
    trainloaders.append(DataLoader(ds_train, batch_size=BATCH_SIZE, shuffle=True, drop_last=True))
    ds_val = GetLoader(Xval_raw[i], Yval_raw[i])
    valloaders.append(DataLoader(ds_val, batch_size=BATCH_SIZE, drop_last=True))
    ds_test= GetLoader(Xtest_raw[i], Ytest_raw[i])
    testloaders.append(DataLoader(ds_test, batch_size=BATCH_SIZE, drop_last=True))

  return trainloaders, valloaders, testloaders




In [10]:
trainloaders, valloaders, testloaders = load_datasets(input, labels)

## Create Model

In [11]:
import torch.nn as nn


In [12]:
class CasualDilatedConv1D(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, dilation, padding=1):
        super().__init__()
        self.conv1D = nn.Conv1d(in_channels, out_channels, kernel_size, dilation=dilation, bias=False, padding='same')
        self.ignoreOutIndex = (kernel_size - 1) * dilation

    def forward(self, x):
        return self.conv1D(x)[..., :-self.ignoreOutIndex]


class DenseLayer(nn.Module):
    def __init__(self, in_channels):
        super().__init__()
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=1) # dim=2
        self.conv1d = nn.Conv1d(in_channels, in_channels, kernel_size=1, bias=False)

    def forward(self, skipConnection):
        # as b c outputsize -> skipConnection size
        out = torch.mean(skipConnection, dim=0)

        for i in range(2):
            out = self.relu(out)
            out = self.conv1d(out)
        return self.softmax(out)


class ResBlock(nn.Module):
    def __init__(self, res_channels, skip_channels, kernel_size, dilation):
        super().__init__()
        self.casualDilatedConv1D = CasualDilatedConv1D(res_channels, res_channels, kernel_size, dilation=dilation)
        self.resConv1D = nn.Conv1d(res_channels, res_channels, kernel_size=1)
        self.skipConv1D = nn.Conv1d(res_channels, skip_channels, kernel_size=1)
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()

    def forward(self, inputX, skipSize):
        x = self.casualDilatedConv1D(inputX)
        x1 = self.tanh(x)
        x2 = self.sigmoid(x)
        x = x1 * x2
        resOutput = self.resConv1D(x)
        resOutput = resOutput + inputX[..., -resOutput.size(1):] # resOutput.size(2)
        skipOutput = self.skipConv1D(x)
        skipOutput = skipOutput[..., -skipSize:]
        return resOutput, skipOutput


class StackOfResBlocks(nn.Module):

    def __init__(self, stack_size, layer_size, res_channels, skip_channels, kernel_size):
        super().__init__()
        buildDilationFunc = np.vectorize(self.buildDilation)
        dilations = buildDilationFunc(stack_size, layer_size)
        self.resBlocks = []
        for s,dilationPerStack in enumerate(dilations):
            for l,dilation in enumerate(dilationPerStack):
                resBlock=ResBlock(res_channels, skip_channels, kernel_size, dilation)
                self.add_module(f'resBlock_{s}_{l}', resBlock) # Add modules manually
                self.resBlocks.append(resBlock)

    def buildDilation(self, stack_size, layer_size):
        # stack1=[1,2,4,8,16,...512]
        dilationsForAllStacks = []
        for stack in range(stack_size):
            dilations = []
            for layer in range(layer_size):
                dilations.append(2 ** layer)
            dilationsForAllStacks.append(dilations)
        return dilationsForAllStacks

    def forward(self, x, skipSize):
        resOutput = x
        skipOutputs = []
        for resBlock in self.resBlocks:
            resOutput, skipOutput = resBlock(resOutput, skipSize)
            skipOutputs.append(skipOutput)
        return resOutput, torch.stack(skipOutputs)


class WaveNet(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, stack_size, layer_size):
        super().__init__()
        self.stack_size = stack_size
        self.layer_size = layer_size
        self.kernel_size = kernel_size
        self.casualConv1D = CasualDilatedConv1D(in_channels, in_channels, kernel_size, dilation=1)
        self.stackResBlock = StackOfResBlocks(self.stack_size, self.layer_size, in_channels, out_channels, kernel_size)
        self.denseLayer = DenseLayer(out_channels)


    def calculateReceptiveField(self):
        return np.sum([(self.kernel_size - 1) * (2 ** l) for l in range(self.layer_size)] * self.stack_size)

    def calculateOutputSize(self, x):
        return int(x.size(1)) - self.calculateReceptiveField() # x.size(2)

    def forward(self, x):
        # x: b c t -> input data size
        x = self.casualConv1D(x)
        skipSize = self.calculateOutputSize(x)
        _, skipConnections = self.stackResBlock(x, skipSize)
        dense=self.denseLayer(skipConnections)
        return dense
    
class WaveNetPre(nn.Module):
    def __init__(self,seqLen,output_size):
        super().__init__()
        self.output_size=output_size
        self.wavenet=WaveNet(32,32,2,3,4) # 1 1 2 3 4
      #  self.liner=nn.Linear(2,output_size) # seqLen-self.wavenet.calculateReceptiveField()
        self.softmax=nn.Softmax(-1)
    
    def forward(self,x):
        x=self.wavenet(x)
      #  x=self.liner(x)
        return self.softmax(x)

In [13]:
def train(net, trainloader, epochs: int, verbose=False):
    """Train the network on the training set."""
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        correct, total, epoch_loss = 0, 0, 0.0
        for x, y in trainloader:
            
            x, y = x.to(DEVICE), y.to(DEVICE)
            optimizer.zero_grad()

            #x = x.unsqueeze(0)
            outputs = net(x)
            loss = criterion(net(x), y)
            loss.backward()
            optimizer.step()
            epoch_loss += loss
           
        epoch_loss /= len(trainloader.dataset)
        if verbose:
            print(f"Epoch {epoch+1}: train loss {epoch_loss}") 


def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = nn.MSELoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for x, y in testloader:
            x, y = x.to(DEVICE), y.to(DEVICE)
            outputs = net(x)
            loss += criterion(outputs, y).item()

    loss /= len(testloader.dataset)
   
    return loss 

### central test

In [14]:
x, y = next(iter(trainloaders[0]))

In [15]:
x.unsqueeze(0).shape

torch.Size([1, 32, 48])

In [16]:
trainloader = trainloaders[0]
valloader = valloaders[0]
testloader = testloaders[0]

net = WaveNetPre(48,1).to(DEVICE)
#net = WaveNet().to(DEVICE)
#net = CNN().to(DEVICE)

for epoch in range(5):
    train(net, trainloader, 1)
    loss = test(net, valloader)
    print(f"Epoch {epoch+1}: validation loss {loss}")

loss = test(net, testloader)
print(f"Final test set performance:\n\tloss {loss}")



  return F.conv1d(input, weight, bias, self.stride,
  return F.mse_loss(input, target, reduction=self.reduction)


Epoch 1: validation loss 0.005170582816863543
Epoch 2: validation loss 0.005170582798004986
Epoch 3: validation loss 0.005170582780597088
Epoch 4: validation loss 0.00517058281106091
Epoch 5: validation loss 0.005170582816863543
Final test set performance:
	loss 0.005210138094318433


## FL

In [17]:
def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]

def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)



In [18]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, net, trainloader, valloader):
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader

    def get_parameters(self, config):
        return get_parameters(self.net)

    def fit(self, parameters, config):
        set_parameters(self.net, parameters)
        train(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        set_parameters(self.net, parameters)
        loss = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(0)}



In [20]:
def client_fn(cid: str) -> FlowerClient:
    """Create a Flower client representing a single organization."""

    # Load model
    net = WaveNetPre(48,1).to(DEVICE)



    trainloader = trainloaders[int(cid)]
    valloader = valloaders[int(cid)]

    # Create a  single Flower client representing a single organization
    return FlowerClient(net, trainloader, valloader)

In [21]:
NUM_CLIENTS = 30

In [22]:
# Create FedAvg strategy
strategy = fl.server.strategy.FedAvg(
        fraction_fit=1.0,  # Sample 100% of available clients for training
        fraction_evaluate=0.3,  # Sample 50% of available clients for evaluation
        min_fit_clients=10,  # Never sample less than 10 clients for training
        min_evaluate_clients=5,  # Never sample less than 5 clients for evaluation
        min_available_clients=10,  # Wait until all 10 clients are available
)

# Specify client resources if you need GPU (defaults to 1 CPU and 0 GPU)
client_resources = None
if DEVICE.type == "cuda":
  client_resources = {"num_gpus": 1}

# Start simulation
fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=5), #10
    strategy=strategy,
    client_resources=client_resources,
)

INFO flwr 2023-03-01 08:34:16,942 | app.py:145 | Starting Flower simulation, config: ServerConfig(num_rounds=5, round_timeout=None)
INFO:flwr:Starting Flower simulation, config: ServerConfig(num_rounds=5, round_timeout=None)
2023-03-01 08:34:18,842	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
INFO flwr 2023-03-01 08:34:20,624 | app.py:179 | Flower VCE: Ray initialized with resources: {'node:172.28.0.12': 1.0, 'CPU': 2.0, 'memory': 7911577191.0, 'object_store_memory': 3955788595.0}
INFO:flwr:Flower VCE: Ray initialized with resources: {'node:172.28.0.12': 1.0, 'CPU': 2.0, 'memory': 7911577191.0, 'object_store_memory': 3955788595.0}
INFO flwr 2023-03-01 08:34:20,639 | server.py:86 | Initializing global parameters
INFO:flwr:Initializing global parameters
INFO flwr 2023-03-01 08:34:20,651 | server.py:270 | Requesting initial parameters from one random client
INFO:flwr:Requesting initial parameters from one random clien

History (loss, distributed):
	round 1: 0.004836346873192792
	round 2: 0.004782465589256921
	round 3: 0.0046594065878225645
	round 4: 0.004905144954322685
	round 5: 0.004861670817345556