# Federated Learning using Hugging Face and Flower

This tutorial shows how to use Hugging Face to federate the training of language models over multiple clients using [Flower](https://flower.dev/). More specifically, we will fine-tune a pre-trained Transformer model (ALBERT) for sequence classification on the `hate_speech18` dataset. The end goal is to detect whether a piece of text is hate speech or not.


## Dependencies

For this tutorial we will need `datasets`, `evaluate`, `flwr["simulation"]` (we use Flower's extra `simulation` dependencies because we will simulate the federated setting inside Google Colab), `torch`, `transformers`, and `scikit-learn`.

In [None]:
!pip install datasets evaluate flwr["simulation"] torch transformers scikit-learn


We can now check whether a GPU is available and import the relevant modules.

In [None]:
!nvidia-smi

/bin/bash: nvidia-smi: command not found


In [None]:
from collections import OrderedDict
import os
import random
import warnings

import flwr as fl
import torch

from torch.utils.data import DataLoader

from datasets import load_dataset
from evaluate import load as load_metric
import evaluate

from transformers import AutoTokenizer, DataCollatorWithPadding
from transformers import AutoModelForSequenceClassification
from torch.optim import AdamW  # transformers.AdamW is deprecated; the PyTorch optimizer takes the same arguments used below
from transformers import logging

Next we will set some global variables and silence some of the logging to keep the output readable.

In [None]:
warnings.filterwarnings("ignore", category=UserWarning)
warnings.filterwarnings("ignore", category=DeprecationWarning)
logging.set_verbosity(logging.ERROR)
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'
warnings.simplefilter('ignore')

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # fall back to CPU when no GPU is present
CHECKPOINT = "albert-base-v2"  # transformer model checkpoint
NUM_CLIENTS = 2
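NUM_ROUNDS = 3
# Flags read by the first load_data variant below; the default values are assumptions
NOISY_DATA = False  # True: load the noisy dataset from train.csv / test.csv
POISON_TRAIN = False  # True: flip some training labels to poison the training set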

In [None]:
raw_dataset = load_dataset("hate_speech18")['train']
raw_dataset = raw_dataset.filter(lambda x: x['label'] <= 1)

raw_dataset = raw_dataset.sort("label")
raw_dataset = raw_dataset.select(range(9000,10000))
selected_columns = ["label", "text"]
for i, example in enumerate(raw_dataset):
    if i < 1000:
        print({column: example[column] for column in selected_columns})






{'label': 0, 'text': 'Taking money from Oregon taxpayers , while 1500 miles away .'}
{'label': 0, 'text': 'I wanted to thank you Mjodr for taking the time today to go check out a house there in the area for my family and I to move to .'}
{'label': 0, 'text': 'We look forward to seeing you all in the PLE very soon .'}
{'label': 0, 'text': 'That is correct , White men with guns built America , and it will likely take White men with guns to take it back .'}
{'label': 0, 'text': 'When anyone is ready we are now taking reservations on lots as we speak first come first served contact me for more info .'}
{'label': 0, 'text': 'http://www.borderwatch.us/ REPORT TO CAMPO , CALIFORNIA FOR DUTY RIGHT AWAY .'}
{'label': 0, 'text': 'CAMPO SAN DIEGO EAST COUNTY CALIFORNIA On Now thru August 7 , 2005 760 644-0857 Come One Come All !'}
{'label': 0, 'text': 'www.borderwatch.us/camp.htm http://sandiego.indymedia.org/en/2005/07/109826.shtmlhttp://sandiego.indymedia.org/en/2005/07/109855.shtmlhttp://www.s

In [None]:
def load_data(cid, num_clients=NUM_CLIENTS):
    """Load HATE data (training and eval)"""

    tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT)
    def tokenize_function(examples):
        return tokenizer(examples["text"], truncation=True)

    if NOISY_DATA:
      #use dataset we made noisy

      #load the train split
      raw_dataset = load_dataset('csv', data_files='train.csv')
      #load the test split and place into noisy_hate_ds as test split
      test = load_dataset('csv', data_files='test.csv')
      raw_dataset['test'] = test['train']
    else:
      #use dataset from huggingface

      #the dataset came with just the train split
      raw_dataset = load_dataset("hate_speech18")['train']

      #remove rows that are not hate/nohate labels
      raw_dataset = raw_dataset.filter(lambda x: x['label'] <= 1)

      #make the dataset smaller
      #raw_dataset = raw_dataset.select(range(1024))

      #balance the dataset
      raw_dataset = raw_dataset.sort("label")
      raw_dataset = raw_dataset.select(range(9000,10000))

      #shuffle
      raw_dataset = raw_dataset.shuffle(seed=42)

      #remove unused columns
      raw_dataset = raw_dataset.remove_columns(["user_id", 'subforum_id', 'num_contexts'])

    #tokenize
    tokenized_datasets = raw_dataset.map(tokenize_function, batched=True)

    #clean up cols
    tokenized_datasets = tokenized_datasets.remove_columns("text")
    tokenized_datasets = tokenized_datasets.rename_column("label", "labels")

    #take the data out of the old split and split it again
    split_token_dataset = tokenized_datasets.train_test_split(test_size=0.2)


    if POISON_TRAIN:
      #get the training labels as a mutable list
      train_labels = list(split_token_dataset['train']['labels'])

      #flip up to 15 labels of each class to poison the training set
      zero_flips = 0
      one_flips = 0
      for i in range(len(train_labels)):
        if zero_flips >= 15 and one_flips >= 15:
          break
        if zero_flips < 15 and train_labels[i] == 0:
          train_labels[i] = 1
          zero_flips += 1
        elif one_flips < 15 and train_labels[i] == 1:
          train_labels[i] = 0
          one_flips += 1

      #replace the old labels with the poisoned labels
      poisoned_train = split_token_dataset['train'].map(lambda row, idx: {"labels": train_labels[idx]}, with_indices=True, remove_columns=["labels"])

      split_token_dataset['train'] = poisoned_train



    #create loaders
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
    trainloader = DataLoader(
        split_token_dataset["train"],
        shuffle=True,
        batch_size=32,
        collate_fn=data_collator,
    )

    testloader = DataLoader(
        split_token_dataset["test"], batch_size=32, collate_fn=data_collator
    )

    return trainloader, testloader






Dataset({
    features: ['text', 'user_id', 'subforum_id', 'num_contexts', 'label'],
    num_rows: 10944
})
DatasetDict({
    train: Dataset({
        features: ['labels', 'input_ids', 'token_type_ids', 'attention_mask'],
        num_rows: 800
    })
    test: Dataset({
        features: ['labels', 'input_ids', 'token_type_ids', 'attention_mask'],
        num_rows: 200
    })
})
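
The `POISON_TRAIN` branch of `load_data` flips a fixed number of labels in each class. The same idea can be sketched in isolation on a plain list of labels. This is a simplified illustration, not the notebook's exact code; the helper name `flip_labels` and the toy label list are hypothetical.

```python
def flip_labels(labels, flips_per_class=15):
    """Return a copy of `labels` with up to `flips_per_class` 0s and 1s swapped."""
    poisoned = list(labels)  # work on a copy so the clean labels stay intact
    flipped = {0: 0, 1: 0}   # how many labels of each class were flipped so far
    for i, label in enumerate(poisoned):
        if all(count >= flips_per_class for count in flipped.values()):
            break
        if label in flipped and flipped[label] < flips_per_class:
            poisoned[i] = 1 - label
            flipped[label] += 1
    return poisoned


# Toy data: 100 alternating labels (hypothetical, for illustration only)
clean = [0, 1] * 50
poisoned = flip_labels(clean, flips_per_class=15)
num_changed = sum(a != b for a, b in zip(clean, poisoned))
print(num_changed)
```

Working on a copy keeps the clean labels available, which makes it easy to count how many entries were actually changed.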


## Standard Hugging Face workflow

### Handling the data

To fetch the `hate_speech18` dataset, we will use Hugging Face's `datasets` library. We then need to tokenize the data and create `PyTorch` dataloaders. This is all done in the `load_data` function:

In [None]:
def load_data(cid, num_clients=NUM_CLIENTS):
    """Load HATE data (training and eval)"""

    tokenizer = AutoTokenizer.from_pretrained(CHECKPOINT)
    def tokenize_function(examples):
        return tokenizer(examples["text"], truncation=True)

    #the dataset came with just the train split
    raw_dataset = load_dataset("hate_speech18")['train']

    #remove rows that are not hate/nohate labels
    raw_dataset = raw_dataset.filter(lambda x: x['label'] <= 1)

    #make the dataset smaller
    #raw_dataset = raw_dataset.select(range(1024))

    #balance the dataset
    raw_dataset = raw_dataset.sort("label")
    raw_dataset = raw_dataset.select(range(9000,10000))

    #shuffle
    raw_dataset = raw_dataset.shuffle(seed=42)

    #remove unused columns
    raw_dataset = raw_dataset.remove_columns(["user_id", 'subforum_id', 'num_contexts'])

    #tokenize
    tokenized_datasets = raw_dataset.map(tokenize_function, batched=True)

    #clean up cols
    tokenized_datasets = tokenized_datasets.remove_columns("text")
    tokenized_datasets = tokenized_datasets.rename_column("label", "labels")

    #take the data out of the old split and split it again
    split_token_dataset = tokenized_datasets.train_test_split(test_size=0.1)

    #create loaders
    data_collator = DataCollatorWithPadding(tokenizer=tokenizer)
    trainloader = DataLoader(
        split_token_dataset["train"],
        shuffle=True,
        batch_size=32,
        collate_fn=data_collator,
    )

    testloader = DataLoader(
        split_token_dataset["test"], batch_size=32, collate_fn=data_collator
    )

    return trainloader, testloader
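
Because the tokenizer is called with `truncation=True` but without padding, the tokenized examples have different lengths, and `DataCollatorWithPadding` pads each batch to the length of its longest sequence. The sketch below illustrates that idea in plain Python. It is not the library's implementation; the helper name `pad_batch`, the padding id `0` and the token ids are assumptions made for the example.

```python
def pad_batch(sequences, pad_id=0):
    """Pad every sequence to the length of the longest one in the batch."""
    max_len = max(len(seq) for seq in sequences)
    input_ids = [seq + [pad_id] * (max_len - len(seq)) for seq in sequences]
    # 1 marks a real token, 0 marks padding
    attention_mask = [
        [1] * len(seq) + [0] * (max_len - len(seq)) for seq in sequences
    ]
    return {"input_ids": input_ids, "attention_mask": attention_mask}


# Three toy "tokenized" examples of different lengths (hypothetical token ids)
batch = pad_batch([[5, 8, 2], [7], [4, 4]])
print(batch["input_ids"])
print(batch["attention_mask"])
```

Padding per batch rather than to a global maximum keeps batches of short texts small.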

### Training and testing the model

Once we have a way of creating our trainloader and testloader, we can take care of the training and testing. This is very similar to any `PyTorch` training or testing loop:

In [None]:
def train(net, trainloader, epochs):
    optimizer = AdamW(net.parameters(), lr=5e-5)
    net.train()
    for _ in range(epochs):
        for batch in trainloader:
            batch = {k: v.to(DEVICE) for k, v in batch.items()}
            outputs = net(**batch)
            loss = outputs.loss
            loss.backward()
            optimizer.step()
            optimizer.zero_grad()


def test(net, testloader):
    #metric = load_metric("accuracy")
    metric = evaluate.combine(["accuracy", "f1", "precision", "recall"])

    loss = 0
    net.eval()
    for batch in testloader:
        batch = {k: v.to(DEVICE) for k, v in batch.items()}
        with torch.no_grad():
            outputs = net(**batch)
        logits = outputs.logits
        loss += outputs.loss.item()
        predictions = torch.argmax(logits, dim=-1)
        metric.add_batch(predictions=predictions, references=batch["labels"])
    loss /= len(testloader.dataset)
    finalMetrics = metric.compute()

    return loss, finalMetrics
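
The `test` function reports accuracy, F1, precision and recall. For binary labels these can all be derived from the counts of true and false positives and negatives. The following is a minimal pure-Python sketch of those definitions, assuming label `1` is the positive class. It is not how the `evaluate` library computes them, and the helper name `binary_metrics` and the toy inputs are hypothetical.

```python
def binary_metrics(predictions, references):
    """Accuracy, precision, recall and F1 for binary labels (1 = positive class)."""
    pairs = list(zip(predictions, references))
    tp = sum(1 for p, r in pairs if p == 1 and r == 1)  # true positives
    fp = sum(1 for p, r in pairs if p == 1 and r == 0)  # false positives
    fn = sum(1 for p, r in pairs if p == 0 and r == 1)  # false negatives
    tn = sum(1 for p, r in pairs if p == 0 and r == 0)  # true negatives
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {
        "accuracy": (tp + tn) / len(pairs),
        "precision": precision,
        "recall": recall,
        "f1": f1,
    }


metrics = binary_metrics(predictions=[1, 1, 0, 0, 1], references=[1, 0, 0, 1, 1])
print(metrics)
```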

### Creating the model itself

To create the model itself, we will just load the pre-trained ALBERT model using Hugging Face's `AutoModelForSequenceClassification`:

In [None]:
net = AutoModelForSequenceClassification.from_pretrained(
    CHECKPOINT, num_labels=2
).to(DEVICE)

## Federating the example

The idea behind Federated Learning is to train a model between multiple clients and a server without having to share any data. This is done by letting each client train the model locally on its data and send its parameters back to the server, which then aggregates all the clients’ parameters together using a predefined strategy. This process is made very simple by using the [Flower](https://github.com/adap/flower) framework. If you want a more complete overview, be sure to check out this guide: [What is Federated Learning?](https://flower.dev/docs/tutorial/Flower-0-What-is-FL.html)
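
As a rough illustration of the aggregation step, the sketch below averages client parameters weighted by each client's number of examples, which is the idea behind `FedAvg`. It is a simplified pure-Python version operating on flat lists of floats, not Flower's implementation; the function name `fedavg` and the toy values are hypothetical.

```python
def fedavg(client_updates):
    """Weighted average of client parameters.

    client_updates: list of (params, num_examples) pairs, where params is a
    flat list of floats standing in for a model's weight arrays.
    """
    total_examples = sum(n for _, n in client_updates)
    num_params = len(client_updates[0][0])
    aggregated = [0.0] * num_params
    for params, n in client_updates:
        for i, value in enumerate(params):
            aggregated[i] += value * n / total_examples
    return aggregated


# Two toy clients: the second holds three times as much data,
# so its parameters weigh three times as much in the average.
updates = [([1.0, 2.0], 100), ([3.0, 6.0], 300)]
global_params = fedavg(updates)
print(global_params)
```

Only the parameters and the example counts reach the server; the training data itself never leaves the clients.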

### Creating the HATEClient

To federate our example to multiple clients, we first need to write our Flower client class (inheriting from `flwr.client.NumPyClient`). This is very easy, as our model is a standard `PyTorch` model:

In [None]:
class HATEClient(fl.client.NumPyClient):
    def __init__(self, net, trainloader, testloader):
        self.net = net
        self.trainloader = trainloader
        self.testloader = testloader

    def get_parameters(self, config):
        return [val.cpu().numpy() for _, val in self.net.state_dict().items()]

    def set_parameters(self, parameters):
        params_dict = zip(self.net.state_dict().keys(), parameters)
        state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
        self.net.load_state_dict(state_dict, strict=True)

    def fit(self, parameters, config):
        self.set_parameters(parameters)
        print("Training Started...")
        train(self.net, self.trainloader, epochs=1)
        print("Training Finished.")
        return self.get_parameters(config={}), len(self.trainloader), {}

    def evaluate(self, parameters, config):
        self.set_parameters(parameters)
        # test() returns two values: the loss and the dict of computed metrics
        loss, finalMetrics = test(self.net, self.testloader)
        finalMetrics["loss"] = loss
        return float(loss), len(self.testloader), finalMetrics

The `get_parameters` function lets the server get the client's parameters. Conversely, the `set_parameters` function allows the server to send its parameters to the client. Finally, the `fit` function trains the model locally for the client, and the `evaluate` function tests the model locally and returns the relevant metrics.
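
Note that the parameters travel as a plain list without names, so `set_parameters` relies on the key order of the `state_dict` to match each array with its layer. The sketch below shows that round trip on a toy mapping in plain Python. The layer names and values are hypothetical, and lists of floats stand in for tensors.

```python
from collections import OrderedDict


def get_parameters(state):
    """Flatten an ordered name -> values mapping into a plain list of values."""
    return [values for _, values in state.items()]


def set_parameters(state, parameters):
    """Rebuild the mapping by pairing the existing keys with new values, in order."""
    return OrderedDict(zip(state.keys(), parameters))


# Toy stand-in for a model's state_dict (hypothetical layer names and values)
local_state = OrderedDict(
    [("encoder.weight", [0.1, 0.2]), ("classifier.bias", [0.0])]
)
server_values = [[0.5, 0.6], [1.0]]

updated = set_parameters(local_state, server_values)
print(list(updated.items()))
```

If the client and the server built their models differently, the order of the keys could differ and the values would be assigned to the wrong layers, which is why every client loads the same checkpoint.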

### Generating the clients

To simulate the federated setting, we need to provide a way to instantiate clients for our simulation. Here this is very simple, as every client holds the same data (this is not realistic; it is done only for simplicity's sake).

In [None]:

def client_fn(cid):
  trainloader, testloader = load_data(cid)
  return HATEClient(net, trainloader, testloader)
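
In a more realistic setup, each client would hold its own slice of the data. One simple way to get there is to derive a range of example indices from the client id. The sketch below assumes contiguous, near-equal shards; the helper name `partition_indices` is hypothetical and is not part of Flower or `datasets`.

```python
def partition_indices(num_examples, num_clients, cid):
    """Return the example indices assigned to client `cid` (contiguous shards)."""
    cid = int(cid)  # client ids may arrive as strings
    shard_size = num_examples // num_clients
    start = cid * shard_size
    # the last client also takes any leftover examples
    end = num_examples if cid == num_clients - 1 else start + shard_size
    return range(start, end)


# Split 1000 examples across 3 clients
shards = [partition_indices(1000, 3, cid) for cid in ("0", "1", "2")]
shard_sizes = [len(shard) for shard in shards]
print(shard_sizes)
```

The resulting range could then be passed to `Dataset.select`, in the same way `load_data` already selects `range(9000,10000)`.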

## Starting the simulation

We now have all the elements to start our simulation. The `weighted_average` function aggregates the metrics reported by the individual clients, weighting each client by its number of examples (essentially to display a single average accuracy, loss, F1, precision, and recall at the end of each round). We then define our strategy (here `FedAvg`, which aggregates the clients' weights by averaging them).

Finally, `start_simulation` is used to start the training.

In [None]:
def weighted_average(metrics):
  f1s = [num_examples * m["f1"] for num_examples, m in metrics]
  precisions = [num_examples * m["precision"] for num_examples, m in metrics]
  recalls = [num_examples * m["recall"] for num_examples, m in metrics]

  accuracies = [num_examples * m["accuracy"] for num_examples, m in metrics]
  losses = [num_examples * m["loss"] for num_examples, m in metrics]
  examples = [num_examples for num_examples, _ in metrics]
  return {"accuracy": sum(accuracies) / sum(examples),
          "loss": sum(losses) / sum(examples),
          "f1": sum(f1s) / sum(examples),
          "precision": sum(precisions) / sum(examples),
          "recall": sum(recalls) / sum(examples)}

strategy = fl.server.strategy.FedAvg(
    fraction_fit=1.0,
    fraction_evaluate=1.0,
    evaluate_metrics_aggregation_fn=weighted_average,
)

fl.simulation.start_simulation(
    client_fn=client_fn,
    num_clients=NUM_CLIENTS,
    config=fl.server.ServerConfig(num_rounds=NUM_ROUNDS),
    strategy=strategy,
    client_resources={"num_cpus": 1, "num_gpus": 1},
    ray_init_args={"log_to_driver": False, "num_cpus": 1, "num_gpus": 1}
)

INFO flwr 2023-04-10 22:01:50,023 | app.py:145 | Starting Flower simulation, config: ServerConfig(num_rounds=3, round_timeout=None)
2023-04-10 22:01:53,827 INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265
INFO flwr 2023-04-10 22:01:54,378 | app.py:179 | Flower VCE: Ray initialized with resources: {'accelerator_type:A100': 1.0, 'CPU': 1.0, 'GPU': 1.0, 'node:172.28.0.12': 1.0, 'memory': 53327727822.0, 'object_store_memory': 26663863910.0}
INFO flwr 2023-04-10 22:01:54,380 | server.py:86 | Initializing global parameters
INFO flwr 2023-04-10 22:01:54,384 | server.py:270 | Requesting initial parameters fro

History (loss, distributed):
	round 1: 0.021836645305156707
	round 2: 0.012557988017797471
	round 3: 0.00966618899255991
History (metrics, distributed):
{'accuracy': [(1, 0.785), (2, 0.86), (3, 0.95)], 'loss': [(1, 0.021836645305156707), (2, 0.012557988017797471), (3, 0.00966618899255991)], 'f1': [(1, 0.7581254724111868), (2, 0.857201309328969), (3, 0.9556194125159642)], 'precision': [(1, 0.7747368421052632), (2, 0.8244318181818182), (3, 0.938828967642527)], 'recall': [(1, 0.7446705426356589), (2, 0.8933007985803016), (3, 0.9730221780867263)]}

Note that this is a very basic example, and a lot can be added or modified. It is only meant to showcase how simply a Hugging Face workflow can be federated using Flower. The number of clients and the number of data samples are intentionally very small so that the example runs quickly inside Colab, but keep in mind that everything can be tweaked and extended.