# GPT going federated...

> References and inspiration: [this](https://flower.dev/docs/framework/tutorial-series-what-is-federated-learning.html)

## Classical machine learning
In machine learning, we have a model, and we have data. We train this model using the data to perform some task. In practice, the training data we work with doesn't origincate on the machine we train the model on. It originates from our mobiles when we interact with apps, a car collecting sensor data, a laptop receiving input via the keyboard, or a smart speaker listening to someone trying to sing a song. Once, the data is collected, it is sent to a central server where the model is aggregated.

### Challenges with classical approach
Although this type of learning is useful most cases, but the limitation is also that, it can be only trained if the data is available on a central server.

But the approach cannot be used in anu other cases where the data is de-centralized i.e. the data is not available on just one server and is distributed.

## Federated learning
In centralized machine learning, we move the data to where the computation is and in federated learning, the computation is moved to the data. In other words, the model is trained locally and is trained locally. After which 

In [1]:
!pip install -q 'flwr[simulation]' torch torchvision matplotlib

In [2]:
from collections import OrderedDict
from typing import List, Tuple, Dict, Optional

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
# import torchvision
# import torchvision.transforms as tfms
# from torchvision.datasets import CIFAR10
from torch.utils.data import Dataset, DataLoader, random_split
import string
import pandas as pd

import flwr as fl
from flwr.common import Metrics
from tqdm import tqdm

DEVICE = torch.device(
    "cuda" if torch.backends.cuda.is_built() 
#     else "mps" if torch.backends.mps.is_built() 
    else "cpu"
)
print(f"Training on {DEVICE} using PyTorch {torch.__version__} and Flwr {fl.__version__}")

Training on cpu using PyTorch 2.1.0 and Flwr 1.5.0


In [35]:
df = pd.read_csv('../data/multilingual/en-fr.csv')
df.head()

Unnamed: 0,en,fr
0,Changing Lives | Changing Society | How It Wor...,Il a transformé notre vie | Il a transformé la...
1,Site map,Plan du site
2,Feedback,Rétroaction
3,Credits,Crédits
4,Français,English


In [36]:
df = df.head(20000)

In [37]:
df.dropna(inplace=True)
df = df.reset_index(drop=True)
# df = df.head(5000)

In [38]:
df['en_len'] = df['en'].apply(lambda x: len(x))
df['fr_len'] = df['fr'].apply(lambda x: len(x))
df.shape

(19999, 4)

In [40]:
len(list(set(' '.join(df['en'].values.tolist() + df['fr'].values.tolist()))))

159

In [39]:
df[['en_len', 'fr_len']].describe()

Unnamed: 0,en_len,fr_len
count,19999.0,19999.0
mean,117.508825,141.033802
std,108.68192,126.543994
min,1.0,1.0
25%,62.0,74.0
50%,104.0,124.0
75%,152.0,184.0
max,5821.0,6159.0


In [41]:
df.to_csv('../data/multilingual/en-fr-mid.csv')

In [7]:
# CLASSES = ("plane", "car", "bird", "cat", "deer", "dog", "frog", "horse", "ship", "truck",)
BATCH_SIZE = 32
NUM_CLIENTS = 3

vocab = list(set(' '.join(df['en'].values.tolist() + df['fr'].values.tolist())))
vocab = ["<s>", "</s>", "<pad>"] + vocab
ix2ch = {ix:ch for ix,ch in enumerate(vocab)}
ch2ix = {ch:ix for ix,ch in ix2ch.items()}
encode = lambda s: [ch2ix[c] for c in s]
decode = lambda l: ''.join([ix2ch[i] for i in l])


class NanoGptDataset(Dataset):
    def __init__(self, texts: List[str]) -> None:
        super().__init__()
        self.texts = texts
    
    def __len__(self) -> int:
        return len(self.texts)

    def __getitem__(self, ix: int):
        text = self.texts[ix]
#         text = ''.join([i if ord(i) < 128 else ' ' for i in text.strip])
        input_ids = [ch2ix['<s>']] + encode(text) # [<s> a b c d   e ]
        input_ids = input_ids[:256]
        output_ids = input_ids[1:] + [ch2ix['</s>']] # [ a  b c d e </s>]
        assert len(input_ids) == len(output_ids), print(input_ids, output_ids, "\n\n======= Something went wrong when encoding the input and outputs ========\n\n")
        return  {
            'input_ids': torch.tensor(input_ids, dtype=torch.long),
            'labels': torch.tensor(output_ids, dtype=torch.long)
        }


def collate_fn(batch):
    max_len = 0
    for b in batch:
        max_len = max(len(b['input_ids']), max_len)
#         print({k:v.shape for k, v in b.items()})
    
    res = None

    for b in batch:
        req_padding = max_len - len(b['input_ids'])
        if res is None:
            if req_padding == 0:
                res = {k:v[None, ...] for k,v in b.items()}
            else:
                res = {
                    'input_ids': torch.hstack([b['input_ids'], torch.tensor([ch2ix['<pad>']]*req_padding, dtype=torch.long)])[None, ...],
                    'labels': torch.hstack([b['labels'],  torch.tensor([ch2ix['<pad>']]*req_padding, dtype=torch.long)])[None, ...]
                }
            continue
        
        if res is not None:
            if req_padding == 0:
                res = {
                    k: torch.cat([res[k], b[k].view(1, max_len)], dim=0) for k,v in res.items()
                }
            else:
                tmp = {
                    'input_ids': torch.hstack([b['input_ids'], torch.tensor([ch2ix['<pad>']]*req_padding, dtype=torch.long)])[None, ...],
                    'labels': torch.hstack([b['labels'], torch.tensor([ch2ix['<pad>']]*req_padding, dtype=torch.long)])[None, ...]
                }
                res = {
                    k: torch.cat([res[k], tmp[k].view(1, max_len)], dim=0) for k,v in res.items()
                }
    return res


def load_datasets(train_texts: List[str], test_texts: List[str], num_clients: int):
    trainset = NanoGptDataset(texts=train_texts)
    testset = NanoGptDataset(texts=test_texts)

    # Split training set into `num_clients` partitions to simulate different local datasets
    partition_size = len(trainset) // num_clients
    lengths = [partition_size] * num_clients
    datasets = random_split(
        trainset, lengths=lengths, generator=torch.Generator().manual_seed(42))
    
    # Split each partition into train/val and create DataLoader
    trainloaders = []
    validloaders = []
    for ds in datasets:
        len_val = len(ds) // 10
        len_train = len(ds) - len_val
        lengths = [len_train, len_val]
        print(lengths)
        ds_train, ds_val = random_split(
            ds, lengths=lengths, generator=torch.Generator().manual_seed(42)
        )
        trainloaders.append(DataLoader(ds_train, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_fn))
        validloaders.append(DataLoader(ds_val, batch_size=BATCH_SIZE, collate_fn=collate_fn))
    testloader = DataLoader(testset, batch_size=BATCH_SIZE, collate_fn=collate_fn)
    return trainloaders, validloaders, testloader

In [8]:
texts = df['en'].values.tolist() + df['fr'].values.tolist()
len(texts)

9998

In [9]:
import random
random.shuffle(texts)

train_texts = texts[:9000]
test_texts = texts[9000:]

In [10]:
trainloaders, validloaders, testloader = load_datasets(train_texts=train_texts, test_texts=test_texts, num_clients=NUM_CLIENTS)

[2700, 300]
[2700, 300]
[2700, 300]


In [11]:
len(trainloaders), len(validloaders)

(3, 3)

## Step 1: Centralized Training with PyTorch

### Define a model

In [12]:
import math
from dataclasses import dataclass

import torch
import torch.nn as nn
import torch.nn.functional as F


# ---------------------------------- Config ---------------------------------- #
@dataclass
class GPTConfig:
    buffer_size: int = 256
    vocab_size: int =  len(vocab) # 50304  # GPT2 has a total of 50257, padded to nearest multiple of 64 for efficiency
    n_layers: int = 3
    n_head: int = 4
    n_embed: int = 368
    dropout: float = 0.1
    bias: bool = False
    use_sinusoidal: bool = True


# ----------------------------- Attention Module ----------------------------- #
class Attention(nn.Module):
    '''Unlike RNNs where we were required to get one output and then pass it back onto the RNN and repeat the process
    again and again, here with masked attention, we simply find the lower triangular matrix and then weight them according 
    the vector product the lower triangular matrix and the embedding vectors, we  build a masked representation for each word only using 
    the values which occured/was predicted prior to the current index. 
        - This is achieved by the torch.tril function and masking all zeros to -torch.inf (taking softmax makes it equal to zero)
    '''
    def __init__(self, n_embed: int, head_size: int) -> None:
        super().__init__()
        self.Q = nn.Linear(n_embed, head_size, bias=GPTConfig.bias)
        self.K = nn.Linear(n_embed, head_size, bias=GPTConfig.bias)
        self.V = nn.Linear(n_embed, head_size, bias=GPTConfig.bias)
        tril = torch.tril(
            torch.ones(size=(GPTConfig.buffer_size, GPTConfig.buffer_size))
        )
        self.register_buffer("tril", tril)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        _, T, _ = x.shape
        q, k, v = self.Q(x), self.K(x), self.V(x)  # (B, T, C) => (B, T, H)
        wei = (
            q @ k.mT * (1.0 / math.sqrt(k.size(-1)))
        )  # (B, T, H) @ (B, H, T) = (B, T, T)
        wei = wei.masked_fill(self.tril[:T, :T] == 0, float("-inf"))
        wei = F.softmax(wei, dim=-1)
        return wei @ v  # (B, T, T) @ (B, T, H) => (B, T, H)


# --------------------------- Multi Head Attention --------------------------- #
class MultiHeadAttention(nn.Module):
    def __init__(self, n_embed: int, n_heads: int) -> None:
        super().__init__()
        assert (
            n_embed % n_heads == 0
        ), "The number of heads must divide the embedding dimensions"
        head_size = n_embed // n_heads
        self.heads = nn.ModuleList(
            [Attention(n_embed=n_embed, head_size=head_size) for _ in range(n_heads)]
        )
        self.proj = nn.Linear(n_embed, n_embed, bias=GPTConfig.bias)
        self.dropout = nn.Dropout(p=GPTConfig.dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = torch.cat([h(x) for h in self.heads], dim=-1) # (B, T, C) -> (B, T, C//N_HEADS) -> (B, T, C)
        return self.dropout(self.proj(x)) #  (B, T, C) -> (B, T, C)


# ------------------------------- Feed Forward ------------------------------- #
class FeedForward(nn.Module):
    def __init__(self, n_embed: int) -> None:
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_embed, 4 * n_embed), nn.GELU(), nn.Linear(4 * n_embed, n_embed)
        )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        return self.net(x)


# ------------------------------ Attention Block ----------------------------- #
class AttentionBlock(nn.Module):
    def __init__(self, n_embed: int, n_heads: int) -> None:
        super().__init__()
        self.sa = MultiHeadAttention(n_embed=n_embed, n_heads=n_heads)
        self.ffwd = FeedForward(n_embed=n_embed)
        self.ln1 = nn.LayerNorm((n_embed,))
        self.ln2 = nn.LayerNorm((n_embed,))

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = x + self.sa(self.ln1(x)) # (B, T, C) -> (B, T, C)
        x = x + self.ffwd(self.ln2(x))
        return x


# --------------------------- Positional Embeddings -------------------------- #
class PositionalEncoding(nn.Module):
    def __init__(self, n_embed: int, max_seq_len: int) -> None:
        super().__init__()

        position_id = torch.arange(0, max_seq_len).unsqueeze(1)
        frequencies = torch.arange(0, n_embed, 2, dtype=torch.float32) / n_embed
        frequencies = torch.pow(10000.0, -frequencies)

        positional_encodings = torch.zeros(size=(max_seq_len, n_embed))
        # print(frequencies.shape, position_id.shape, positional_encodings.shape)

        positional_encodings[:, 0::2] = torch.sin(position_id * frequencies)
        positional_encodings[:, 1::2] = torch.cos(position_id * frequencies)

        self.register_buffer("positional_encodings", positional_encodings)

        self.dropout = nn.Dropout(p=GPTConfig.dropout)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        pos_encodings = self.positional_encodings[: x.shape[1]]
        return self.dropout(pos_encodings + x)


# ------------------------------ NanoGPT Module ------------------------------ #
class NanoGPT(nn.Module):
    def __init__(
        self,
        vocab_size: int,
        n_embed: int,
        n_heads: int,
        buffer_size: int,
        n_blocks: int,
    ) -> None:
        super().__init__()
        self.vocab_size = vocab_size
        self.n_embed = n_embed
        self.n_heads = n_heads
        self.buffer_size = buffer_size
        self.n_blocks = n_blocks

        self.token_embeddings = nn.Embedding(
            num_embeddings=vocab_size, embedding_dim=n_embed
        )

        if GPTConfig.use_sinusoidal:
            self.positional_encodings = PositionalEncoding(
                n_embed=n_embed, max_seq_len=GPTConfig.buffer_size
            )
        else:
            self.positional_encodings = nn.Embedding(
                num_embeddings=GPTConfig.buffer_size, embedding_dim=n_embed
            )

        self.blocks = nn.Sequential(
            *[AttentionBlock(n_embed=n_embed, n_heads=n_heads) for _ in range(n_blocks)]
        )
        self.ln = nn.LayerNorm((n_embed,))
        self.lm_head = nn.Sequential(
            nn.Linear(n_embed, n_embed // 2), nn.GELU(), nn.Linear(n_embed//2, vocab_size)
        )

    def forward(
        self, input_ids: torch.Tensor, labels: torch.Tensor = None
    ) -> torch.Tensor:
        B, T = input_ids.shape
        tok_emb = self.token_embeddings(input_ids) # (B, T, C)
        if GPTConfig.use_sinusoidal:
            x = self.positional_encodings.forward(tok_emb) # (B, T, C) -> (B, T, C)
        else:
            x = tok_emb + self.positional_encodings(
                torch.arange(T, dtype=torch.long, device=input_ids.shape)
            ) # (B, T, C) -> (B, T, C)
        x = self.blocks(x)
        x = self.ln(x)
        x = self.lm_head(x)

        loss = None
        if labels is not None:
            B, T, C = x.shape
            loss = F.cross_entropy(x.view(B * T, C), labels.view(B * T))

        return x, loss

    def generate(
        self, ids: torch.Tensor, max_len: int, temperature: float = 0.7
    ) -> int:
        for _ in range(max_len):
            ids_cond = ids[:, -GPTConfig.buffer_size :]
            logits, _ = self.forward(input_ids=ids_cond, labels=None)
            logit = logits[:, -1, :]
            probs = F.softmax(logit, dim=-1)
            val, idx = torch.topk(probs, k=int(probs.shape[1] * temperature), dim=-1)
            # print(val[0][0])
            idx_next = torch.multinomial(val, num_samples=1)
            idx_next = idx[:, idx_next][0]
            ids = torch.cat([ids, idx_next], dim=-1)
            if idx_next == 0:
                break
        return ids


if __name__ == "__main__":
    model = NanoGPT(
        vocab_size=GPTConfig.vocab_size,
        n_embed=GPTConfig.n_embed,
        n_heads=GPTConfig.n_head,
        buffer_size=GPTConfig.buffer_size,
        n_blocks=GPTConfig.n_layers,
    )

#     print(model)

In [13]:
def train_fn(net: NanoGPT, trainloader: torch.utils.data.DataLoader, epochs: int):
    loss_fct = nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters(), lr=3e-4)
    net.train()
    
    for epoch in range(epochs):
        print('#'*15)
        print(f'### Epoch {epoch+1}/{epochs}')
        print('#'*15)
        
        pbar = tqdm(trainloader, total=len(trainloader), desc='(train)')
        correct, total, running_loss = 0, 0, 0.0
        
        for batch in pbar:
            X, y = batch['input_ids'], batch['labels']
            X, y = X.to(DEVICE), y.to(DEVICE)
            
            optimizer.zero_grad()
            yHat, loss = net.forward(input_ids=X, labels=y)
            loss.backward()
            optimizer.step()
            
            running_loss += loss.item()
            total += y.size(0)
#             correct += (torch.argmax(yHat.data, dim=1) == y).sum().item()
            
            epoch_loss = running_loss / total
            epoch_accuracy = correct / total
            
            pbar.set_postfix({
                'loss': f'{epoch_loss:.4f}',
#                 'accuracy': f'{epoch_accuracy*100:.4f}%'
            })
        
        print(f'Epoch {epoch+1}: train loss {epoch_loss}, accuracy {epoch_accuracy}')


@torch.no_grad()
def eval_fn(net: NanoGPT, testloader: torch.utils.data.DataLoader):
    loss_fct = nn.CrossEntropyLoss()
    correct, total, running_loss = 0., 0., 0.
    net.eval()
    
    pbar = tqdm(testloader, total=len(testloader), desc='(eval)')
    
    for batch in pbar:
        X, y = batch['input_ids'], batch['labels']
        X, y = X.to(DEVICE), y.to(DEVICE)
        
        yHat, loss = net.forward(input_ids=X, labels=y)
            
        running_loss += loss.item()
        total += y.size(0)
#         correct += (torch.argmax(yHat.data, dim=1) == y).sum().item()
            
        epoch_loss = running_loss / total
#         epoch_accuracy = correct / total
        
        pbar.set_postfix({
            'loss': f'{epoch_loss:.4f}',
#             'accuracy': f'{epoch_accuracy*100:.4f}%'
        })
        
    return epoch_loss#, epoch_accuracy

### Training the model

In [14]:
trainloader = trainloaders[0]
valloader = validloaders[0]
net = NanoGPT(
    vocab_size=GPTConfig.vocab_size,
    n_embed=GPTConfig.n_embed,
    n_heads=GPTConfig.n_head,
    buffer_size=GPTConfig.buffer_size,
    n_blocks=GPTConfig.n_layers,
).to(DEVICE)

for epoch in range(5):
    train_fn(net, trainloader, 1)
    loss = eval_fn(net, valloader)
    print(f"Epoch {epoch+1}: validation loss {loss}")

loss = eval_fn(net, testloader)
print(f"Final test set performance:\n\tloss {loss}")

###############
### Epoch 1/1
###############


(train): 100%|█████████████████████| 85/85 [01:25<00:00,  1.00s/it, loss=0.0418]


Epoch 1: train loss 0.041758292979664276, accuracy 0.0


(eval): 100%|██████████████████████| 10/10 [00:03<00:00,  3.08it/s, loss=0.0355]


Epoch 1: validation loss 0.035519646803538
###############
### Epoch 1/1
###############


(train): 100%|█████████████████████| 85/85 [01:26<00:00,  1.02s/it, loss=0.0298]


Epoch 1: train loss 0.029788720629833364, accuracy 0.0


(eval): 100%|██████████████████████| 10/10 [00:03<00:00,  3.05it/s, loss=0.0328]


Epoch 2: validation loss 0.032781286438306176
###############
### Epoch 1/1
###############


(train): 100%|█████████████████████| 85/85 [01:24<00:00,  1.00it/s, loss=0.0290]


Epoch 1: train loss 0.0289840034643809, accuracy 0.0


(eval): 100%|██████████████████████| 10/10 [00:03<00:00,  3.05it/s, loss=0.0318]


Epoch 3: validation loss 0.03176670253276825
###############
### Epoch 1/1
###############


(train): 100%|█████████████████████| 85/85 [01:24<00:00,  1.00it/s, loss=0.0285]


Epoch 1: train loss 0.028488549060291714, accuracy 0.0


(eval): 100%|██████████████████████| 10/10 [00:03<00:00,  2.99it/s, loss=0.0311]


Epoch 4: validation loss 0.031093727350234985
###############
### Epoch 1/1
###############


(train): 100%|█████████████████████| 85/85 [01:25<00:00,  1.01s/it, loss=0.0273]


Epoch 1: train loss 0.027287881793799224, accuracy 0.0


(eval): 100%|██████████████████████| 10/10 [00:03<00:00,  3.05it/s, loss=0.0303]


Epoch 5: validation loss 0.030271690289179483


(eval): 100%|██████████████████████| 32/32 [00:10<00:00,  2.98it/s, loss=0.0279]

Final test set performance:
	loss 0.0278633141923763





## Step 2: Federated Learning

### Updating model parameters

In [16]:
def get_parameters(net: NanoGPT) -> List[np.array]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]

def set_parameters(net: NanoGPT, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.tensor(v) for k,v in params_dict})
    net.load_state_dict(state_dict, strict=True)

### Client

In [23]:
class FlowerClient(fl.client.NumPyClient):
    def __init__(self, cid: str, net: NanoGPT, trainloader: torch.utils.data.DataLoader, validloader: torch.utils.data.DataLoader) -> None:
        self.cid = cid
        self.net = net
        self.trainloader = trainloader
        self.validloader = validloader
    
    def get_parameters(self, config):
        print(f"[Client {self.cid}] get_parameters")
        return get_parameters(self.net)
    
    def fit(self, parameters, config):
        print("Fitting Client...")
        server_round = config["server_round"]
        local_epochs = config["local_epochs"]
        
        print(f"[Client {self.cid}, round {server_round}] fit, config: {config}")
        set_parameters(self.net, parameters)
        train_fn(self.net, self.trainloader, epochs=1)
        return get_parameters(self.net), len(self.trainloader), {}
    
    def evaluate(self, parameters, config):
        print(f"[Client {serlf.cid}] evaluate: config: {config}")
        set_parameters(self.net, parameters)
        loss, accuracy = eval_fn(self.net, self.validloader)
        return float(loss), len(self.validloader), {"accuracy": float(accuracy)}

### Virtual Client Engine

In [24]:
def client_fn(cid: str) -> FlowerClient:
    net = NanoGPT(
        vocab_size=GPTConfig.vocab_size,
        n_embed=GPTConfig.n_embed,
        n_heads=GPTConfig.n_head,
        buffer_size=GPTConfig.buffer_size,
        n_blocks=GPTConfig.n_layers,
    ).to(DEVICE)
    trainloader = trainloaders[int(cid)]
    validloader = validloaders[int(cid)]
    return FlowerClient(cid=cid, net=net, trainloader=trainloader, validloader=validloader)

## Starting training

### Server Side Evaluation

In [25]:
def evaluate_fn(
    server_round: int, 
    parameters: fl.common.NDArrays,
    config: Dict[str, fl.common.Scalar]
) -> Optional[Tuple[float, Dict[str, fl.common.Scalar]]]:
    net = NanoGPT(
        vocab_size=GPTConfig.vocab_size,
        n_embed=GPTConfig.n_embed,
        n_heads=GPTConfig.n_head,
        buffer_size=GPTConfig.buffer_size,
        n_blocks=GPTConfig.n_layers,
    ).to(DEVICE)
    set_parameters(net, parameters)
    loss = eval_fn(net, testloader=testloader)
    print(f"Server-side evaluation loss {loss}")
    # del net
    return loss, {"loss": loss}

To send config dictionary from server to clients:

In [26]:
def fit_config_fn(server_round: int):
    """Return training config dict for each round.
    
    Perform two rounds of training with one local epoch, increase two local
    epochs afterwards
    """
    config = {
        "server_round": server_round,
        "local_epochs": 1 if server_round < 2 else 2
    }
    return config

In [29]:
strategy = fl.server.strategy.FedAvg(
    fraction_fit=1.0,              # Sample 100% of the available clients for training
    fraction_evaluate=0.5,         # Sample 50% of available clients for eval
    min_fit_clients=NUM_CLIENTS,            # Never sample less than 10 clients for training
    min_evaluate_clients=5,        # Never sample less than 5 clients for eval
    min_available_clients=NUM_CLIENTS,      # Wait until all 10 clients are available 
    evaluate_fn=evaluate_fn,       # Evaluate function after every round
    on_fit_config_fn=fit_config_fn # Fit Config
)

client_resources = None
if DEVICE.type == "cuda": 
    client_resources = {"num_gpus": 1}

fl.simulation.start_simulation(
    client_fn=client_fn, 
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=5),
    strategy=strategy,
    client_resources=client_resources
)

Setting `min_available_clients` lower than `min_fit_clients` or
`min_evaluate_clients` can cause the server to fail when there are too few clients
connected to the server. `min_available_clients` must be set to a value larger
than or equal to the values of `min_fit_clients` and `min_evaluate_clients`.

INFO flwr 2023-11-08 19:49:41,771 | app.py:175 | Starting Flower simulation, config: ServerConfig(num_rounds=5, round_timeout=None)
2023-11-08 19:49:45,409	INFO worker.py:1621 -- Started a local Ray instance.
INFO flwr 2023-11-08 19:49:46,890 | app.py:210 | Flower VCE: Ray initialized with resources: {'memory': 6624378880.0, 'CPU': 8.0, 'node:127.0.0.1': 1.0, 'object_store_memory': 2147483648.0, 'node:__internal_head__': 1.0}
INFO flwr 2023-11-08 19:49:46,891 | app.py:218 | No `client_resources` specified. Using minimal resources for clients.
INFO flwr 2023-11-08 19:49:46,891 | app.py:224 | Flower VCE: Resources for each Virtual Client: {'num_cpus': 1, 'num_gpus': 0.0}
INFO flwr 2023-11-

[2m[36m(DefaultActor pid=49638)[0m [Client 2] get_parameters


(eval): 100%|██████████████████████| 32/32 [00:10<00:00,  3.10it/s, loss=0.1563]
INFO flwr 2023-11-08 19:50:00,342 | server.py:94 | initial parameters (loss, other metrics): 0.1562864326522919, {'loss': 0.1562864326522919}
INFO flwr 2023-11-08 19:50:00,343 | server.py:104 | FL starting
DEBUG flwr 2023-11-08 19:50:00,343 | server.py:222 | fit_round 1: strategy sampled 3 clients (out of 3)


Server-side evaluation loss 0.1562864326522919
[2m[36m(DefaultActor pid=49638)[0m Fitting Client...
[2m[36m(DefaultActor pid=49638)[0m [Client 2, round 1] fit, config: {'server_round': 1, 'local_epochs': 1}
[2m[36m(DefaultActor pid=49638)[0m ###############
[2m[36m(DefaultActor pid=49638)[0m ### Epoch 1/1
[2m[36m(DefaultActor pid=49638)[0m ###############


(train):   0%|          | 0/85 [00:00<?, ?it/s]
(train):   1%|          | 1/85 [00:02<03:34,  2.56s/it, loss=0.1523]
(train):   2%|▏         | 2/85 [00:05<03:43,  2.69s/it, loss=0.1371]
(train):   0%|          | 0/85 [00:00<?, ?it/s][32m [repeated 2x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/master/ray-observability/ray-logging.html#log-deduplication for more options.)[0m
(train):   1%|          | 1/85 [00:02<04:00,  2.86s/it, loss=0.1524][32m [repeated 2x across cluster][0m
  File "python/ray/_raylet.pyx", line 1684, in ray._raylet.execute_task_with_cancellation_handler
  File "python/ray/_raylet.pyx", line 1366, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1367, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1418, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1424, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 13

ERROR flwr 2023-11-08 19:50:09,942 | ray_client_proxy.py:147 | Traceback (most recent call last):
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/flwr/simulation/ray_transport/ray_client_proxy.py", line 140, in _submit_job
    res = self.actor_pool.get_client_result(self.cid, timeout)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/flwr/simulation/ray_transport/ray_actor.py", line 402, in get_client_result
    return self._fetch_future_result(cid)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/flwr/simulation/ray_transport/ray_actor.py", line 295, in _fetch_future_result
    raise ex
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/flwr/simulation/ray_transport/ray_actor.py", line 288, in _fetch_future_result
    res_cid, res = ray.get(future)  # type: (str, ClientRes)
                   ^^^^^^^^^^^^

  File "python/ray/_raylet.pyx", line 1684, in ray._raylet.execute_task_with_cancellation_handler
  File "python/ray/_raylet.pyx", line 1366, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1367, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1418, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1424, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1364, in ray._raylet.execute_task.function_executor
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/ray/_private/function_manager.py", line 726, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/ray/util/tracing/tracing_helper.py", line 464, in _resume_span
    return method(self, *_args, **_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/fl

ERROR flwr 2023-11-08 19:50:10,287 | ray_client_proxy.py:147 | Traceback (most recent call last):
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/flwr/simulation/ray_transport/ray_client_proxy.py", line 140, in _submit_job
    res = self.actor_pool.get_client_result(self.cid, timeout)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/flwr/simulation/ray_transport/ray_actor.py", line 402, in get_client_result
    return self._fetch_future_result(cid)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/flwr/simulation/ray_transport/ray_actor.py", line 295, in _fetch_future_result
    raise ex
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/flwr/simulation/ray_transport/ray_actor.py", line 288, in _fetch_future_result
    res_cid, res = ray.get(future)  # type: (str, ClientRes)
                   ^^^^^^^^^^^^

 Traceback (most recent call last):
  File "python/ray/_raylet.pyx", line 1684, in ray._raylet.execute_task_with_cancellation_handler
  File "python/ray/_raylet.pyx", line 1366, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1367, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1418, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1424, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1364, in ray._raylet.execute_task.function_executor
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/ray/_private/function_manager.py", line 726, in actor_method_executor
    return method(__ray_actor, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aneeshaparajit/anaconda3/lib/python3.11/site-packages/ray/util/tracing/tracing_helper.py", line 464, in _resume_span
    return method(self, *_args, **_kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/aneeshaparajit/anaco

ERROR flwr 2023-11-08 19:50:11,199 | ray_client_proxy.py:148 | The actor died unexpectedly before finishing this task.
	class_name: DefaultActor
	actor_id: 065aa7ac61adedc5e7736b8001000000
	pid: 49638
	namespace: 650750fc-3091-42de-ad05-b114ccd987ae
	ip: 127.0.0.1
The actor is dead because its worker process has died. Worker exit type: SYSTEM_ERROR Worker exit detail: Worker exits unexpectedly. Worker exits with an exit code None.
 Traceback (most recent call last):
  File "python/ray/_raylet.pyx", line 1684, in ray._raylet.execute_task_with_cancellation_handler
  File "python/ray/_raylet.pyx", line 1366, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1367, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1418, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1424, in ray._raylet.execute_task
  File "python/ray/_raylet.pyx", line 1364, in ray._raylet.execute_task.function_executor
  File "/Users/aneeshaparajit/anaconda3/lib/python3.1

KeyboardInterrupt: 