# Thư viện

In [1]:
!pip install -q flwr[simulation]  pandas matplotlib scikit-learn torch

In [2]:
from collections import OrderedDict
from typing import List, Tuple
from collections import Counter

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
import pandas as pd
from sklearn.model_selection import train_test_split
from sklearn.feature_extraction.text import TfidfVectorizer

import pickle

import flwr
from flwr.client import Client, ClientApp, NumPyClient
from flwr.common import Metrics, Context
from flwr.server import ServerApp, ServerConfig, ServerAppComponents
from flwr.server.strategy import FedAvg
from flwr.simulation import run_simulation
from flwr.server.client_proxy import ClientProxy
from flwr.common import Parameters, Scalar, FitRes, parameters_to_ndarrays
from typing import Optional, Union

DEVICE = torch.device("cpu")  # Try "cuda" to train on GPU
print(f"Training on {DEVICE}")
print(f"Flower {flwr.__version__} / PyTorch {torch.__version__}")


Training on cpu
Flower 1.13.0 / PyTorch 2.5.1


In [3]:
# Cấu hình thiết bị (CUDA hoặc CPU)
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [16]:
BATCH_SIZE = 16
NUM_CLIENTS = 10

# Dataset

In [19]:
# Đường dẫn đến tập dữ liệu
DATA_PATH = r"..\..\data\SMSSpamCollection"

# Đọc dữ liệu
data = pd.read_csv(DATA_PATH, sep='\t', header=None, names=["label", "text"])

# Chuyển đổi nhãn 'ham' và 'spam' thành 0 và 1
data["label"] = data["label"].map({"ham": 0, "spam": 1})


## Xử lý dữ liệu

In [20]:
class SMSDataset(Dataset):
    def __init__(self, texts, labels, vectorizer):
        self.texts = texts
        self.labels = labels
        self.vectorizer = vectorizer

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        text_vector = self.vectorizer.transform([self.texts[idx]]).toarray().squeeze()
        label = self.labels[idx]
        return torch.tensor(text_vector, dtype=torch.float32), torch.tensor(label, dtype=torch.long)

def load_datasets(partition_id: int):
    print(f"Loading dataset for partition ID: {partition_id}")

    # Load raw data
    data = pd.read_csv(DATA_PATH, sep="\t", header=None, names=["label", "text"])
    data["label"] = data["label"].map({"ham": 0, "spam": 1})

    # Vectorize text data
    vectorizer = TfidfVectorizer(stop_words="english", max_features=5000)
    vectorizer.fit(data["text"])

    # Split into NUM_CLIENTS partitions
    indices = np.arange(len(data))
    np.random.seed(42)
    np.random.shuffle(indices)
    split_indices = np.array_split(indices, NUM_CLIENTS)

    # Select partition
    partition_indices = split_indices[partition_id]
    partition_data = data.iloc[partition_indices]

    # Train/val split
    train_data, val_data = train_test_split(partition_data, test_size=0.1, random_state=42)
    train_dataset = SMSDataset(train_data["text"].tolist(), train_data["label"].tolist(), vectorizer)
    val_dataset = SMSDataset(val_data["text"].tolist(), val_data["label"].tolist(), vectorizer)

    # Dataloaders
    trainloader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
    valloader = DataLoader(val_dataset, batch_size=BATCH_SIZE)

    # Test set
    test_data = data.iloc[split_indices[-1]]
    test_dataset = SMSDataset(test_data["text"].tolist(), test_data["label"].tolist(), vectorizer)
    testloader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

    print(f"Partition {partition_id}: Train {len(train_data)}, Val {len(val_data)}, Test {len(test_data)}")
    return trainloader, valloader, testloader


In [21]:
data.head()

Unnamed: 0,label,text
0,0,"Go until jurong point, crazy.. Available only ..."
1,0,Ok lar... Joking wif u oni...
2,1,Free entry in 2 a wkly comp to win FA Cup fina...
3,0,U dun say so early hor... U c already then say...
4,0,"Nah I don't think he goes to usf, he lives aro..."


# Train model

In [22]:
class Net(nn.Module):
    def __init__(self, input_dim: int):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(input_dim, 128)  # Fully connected layer
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 2)  # 2 output classes (ham, spam)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x


In [23]:
# Lấy trainloader từ partition đầu tiên
trainloader, valloader, testloader = load_datasets(partition_id=0)

# Số chiều của đầu vào từ vectorizer
input_dim = 5000  # (đã được đặt trong load_datasets max_features=5000)

# Khởi tạo mô hình
net = Net(input_dim).to(DEVICE)
print(net)


Loading dataset for partition ID: 0
Partition 0: Train 502, Val 56, Test 557
Net(
  (fc1): Linear(in_features=5000, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=64, bias=True)
  (fc3): Linear(in_features=64, out_features=2, bias=True)
)


In [24]:
def train(net, trainloader, epochs: int, verbose=False):
    """Train the network on the training set."""
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.Adam(net.parameters())
    net.train()
    for epoch in range(epochs):
        epoch_loss = 0.0
        correct, total = 0, 0
        for inputs, labels in trainloader:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            optimizer.zero_grad()
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            # Metrics
            epoch_loss += loss.item()
            total += labels.size(0)
            correct += (torch.max(outputs, 1)[1] == labels).sum().item()
        epoch_loss /= len(trainloader)
        epoch_acc = correct / total
        if verbose:
            print(f"Train loss {epoch_loss}, Accuracy {epoch_acc}")

def test(net, testloader):
    """Evaluate the network on the entire test set."""
    criterion = torch.nn.CrossEntropyLoss()
    correct, total, loss = 0, 0, 0.0
    net.eval()
    with torch.no_grad():
        for inputs, labels in testloader:
            inputs, labels = inputs.to(DEVICE), labels.to(DEVICE)
            outputs = net(inputs)
            loss += criterion(outputs, labels).item()
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    loss /= len(testloader)
    accuracy = correct / total
    return loss, accuracy


# Federated learning

## Chiến thuật 2: Gradient Poisoning

### Mô tả chiến thuật:
- Chỉnh sửa gradient trước khi gửi về server nhằm gây ảnh hưởng lớn đến quá trình tổng hợp.

### Mục tiêu:
Hướng mô hình toàn cục theo một cách sai lệch.
Làm giảm hoặc đảo ngược hiệu suất của mô hình toàn cục.

### Chiến thuật cụ thể:
Gradient Scaling (Phóng đại hoặc giảm gradient):

Mục tiêu: Tăng ảnh hưởng của client độc hại hoặc làm gradient của client độc hại mất trọng số.


In [25]:
def set_parameters(net, parameters: List[np.ndarray]):
    params_dict = zip(net.state_dict().keys(), parameters)
    state_dict = OrderedDict({k: torch.Tensor(v) for k, v in params_dict})
    net.load_state_dict(state_dict, strict=True)


def get_parameters(net) -> List[np.ndarray]:
    return [val.cpu().numpy() for _, val in net.state_dict().items()]

In [26]:
class FlowerClient(NumPyClient):
    def __init__(self, net, trainloader, valloader, is_bad_client=False, scale_factor=10):
        self.net = net
        self.trainloader = trainloader
        self.valloader = valloader
        self.is_bad_client = is_bad_client
        self.scale_factor = scale_factor  # Hệ số để chỉnh sửa gradient

    def get_parameters(self, config):
        # Lấy tham số của mô hình
        return get_parameters(self.net)

    def fit(self, parameters, config):
        # Cập nhật tham số từ mô hình toàn cục
        set_parameters(self.net, parameters)

        # Huấn luyện mô hình cục bộ
        train(self.net, self.trainloader, epochs=1)

        # Lấy gradient (delta tham số) sau khi huấn luyện
        original_parameters = get_parameters(self.net)
        gradient = [
            original - updated
            for original, updated in zip(parameters, original_parameters)
        ]

        # Gradient Poisoning
        if self.is_bad_client:
            print(f"Client behaving badly (gradient poisoning): scaling gradients.")
            poisoned_gradient = [g * self.scale_factor for g in gradient]
        else:
            poisoned_gradient = gradient

        # Cập nhật tham số bị tấn công
        poisoned_parameters = [
            updated - g for updated, g in zip(original_parameters, poisoned_gradient)
        ]
        return poisoned_parameters, len(self.trainloader), {}

    def evaluate(self, parameters, config):
        # Đánh giá mô hình trên tập kiểm tra
        set_parameters(self.net, parameters)
        loss, accuracy = test(self.net, self.valloader)
        return float(loss), len(self.valloader), {"accuracy": float(accuracy)}


In [27]:
def client_fn(context: Context) -> Client:
    """Create a Flower client representing a single organization."""
    input_dim = 5000  # Đã được đặt trong load_datasets max_features=5000
    net = Net(input_dim).to(DEVICE)
    partition_id = context.node_config["partition-id"]
    trainloader, valloader, _ = load_datasets(partition_id=partition_id)

    # Định nghĩa các client độc hại (ví dụ: chỉ định partition ID là bad client)
    bad_clients = [1, 3, 5]  # Danh sách các partition ID của client độc hại
    is_bad_client = partition_id in bad_clients

    trainloader, valloader, _ = load_datasets(partition_id=partition_id)

    return FlowerClient(net, trainloader, valloader, is_bad_client).to_client()

# Create the ClientApp
client = ClientApp(client_fn=client_fn)



In [28]:
def weighted_average(metrics: List[Tuple[int, dict]]) -> dict:
    # Ensure there are metrics to aggregate
    if not metrics:
        return {}

    # Initialize storage for weighted sums
    weighted_sums = {}
    total_examples = 0

    for num_examples, metric_dict in metrics:
        total_examples += num_examples
        for key, value in metric_dict.items():
            if key not in weighted_sums:
                weighted_sums[key] = 0
            weighted_sums[key] += num_examples * value

    # Compute weighted averages
    aggregated_metrics = {
        key: weighted_sums[key] / total_examples for key in weighted_sums
    }
    return aggregated_metrics


In [29]:
# Custom SaveModelStrategy implementation
class SaveModelStrategy(FedAvg):
    def aggregate_fit(
        self,
        server_round: int,
        results: list[tuple[ClientProxy, FitRes]],
        failures: list[Union[tuple[ClientProxy, FitRes], BaseException]],
    ) -> tuple[Optional[Parameters], dict[str, Scalar]]:

        # Call aggregate_fit from the base class (FedAvg)
        aggregated_parameters, aggregated_metrics = super().aggregate_fit(
            server_round, results, failures
        )

        if aggregated_parameters is not None:
            # Convert `Parameters` to `list[np.ndarray]`
            aggregated_ndarrays = parameters_to_ndarrays(aggregated_parameters)

            # Save aggregated weights for each round
            print(f"Saving round {server_round} aggregated weights...")
            np.savez(f"round-{server_round}-weights.npz", *aggregated_ndarrays)

            # Save the MPA model at the end of training
            if server_round == 5:
                with open("MPA_model.pkl", "wb") as f:
                    pickle.dump(aggregated_ndarrays, f)
                print("MPA model saved as 'MPA_model.pkl'")
                # Lưu dưới dạng PyTorch
                torch.save(aggregated_ndarrays, "MPA_model.pth")
                print("MPA model saved as 'MPA_model.pth'")

        return aggregated_parameters, aggregated_metrics

In [30]:

# Define the server function
def server_fn(context: Context) -> ServerAppComponents:
    # Use the custom SaveModelStrategy
    strategy = SaveModelStrategy(
        fraction_fit=1.0,
        fraction_evaluate=0.5,
        min_fit_clients=10,
        min_evaluate_clients=5,
        min_available_clients=10,
        fit_metrics_aggregation_fn=weighted_average,
        evaluate_metrics_aggregation_fn=weighted_average,
    )

    # Configure the server for 5 rounds of training
    config = ServerConfig(num_rounds=5)

    return ServerAppComponents(strategy=strategy, config=config)


# Create a new server instance with the SaveModelStrategy
server = ServerApp(server_fn=server_fn)

In [31]:
# Specify the resources each of your clients need
# By default, each client will be allocated 1x CPU and 0x GPUs
backend_config = {"client_resources": {"num_cpus": 1, "num_gpus": 0.0}}

# When running on GPU, assign an entire GPU for each client
if DEVICE.type == "cuda":
    backend_config = {"client_resources": {"num_cpus": 1, "num_gpus": 1.0}}
    # Refer to our Flower framework documentation for more details about Flower simulations
    # and how to set up the `backend_config`

In [32]:
import os

# Disable oneDNN custom operations to avoid floating-point round-off errors
os.environ['TF_ENABLE_ONEDNN_OPTS'] = '0'

# Import TensorFlow and other required libraries
import tensorflow as tf

# Initialize TensorFlow
print("TensorFlow version:", tf.__version__)


TensorFlow version: 2.18.0


In [33]:
# Run simulation
history = run_simulation(
    server_app=server,
    client_app=client,
    num_supernodes=NUM_CLIENTS,
    backend_config=backend_config,
    verbose_logging=True
)



[94mDEBUG 2024-12-02 01:03:32,376[0m:     Asyncio event loop already running.
[94mDEBUG 2024-12-02 01:03:32,378[0m:     Logger propagate set to False
[94mDEBUG 2024-12-02 01:03:32,380[0m:     Pre-registering run with id 7088696580099432266
[94mDEBUG 2024-12-02 01:03:32,381[0m:     Using InMemoryState
[94mDEBUG 2024-12-02 01:03:32,383[0m:     Using InMemoryState
[92mINFO 2024-12-02 01:03:32,410[0m:      Starting Flower ServerApp, config: num_rounds=5, no round_timeout
[94mDEBUG 2024-12-02 01:03:32,928[0m:     Buffer time delay: 5s
[92mINFO 2024-12-02 01:03:32,930[0m:      
[92mINFO 2024-12-02 01:03:32,934[0m:      [INIT]
[92mINFO 2024-12-02 01:03:32,935[0m:      Requesting initial parameters from one random client
[94mDEBUG 2024-12-02 01:03:37,950[0m:     Using InMemoryState
[94mDEBUG 2024-12-02 01:03:37,950[0m:     Registered 10 nodes
[94mDEBUG 2024-12-02 01:03:37,955[0m:     Supported backends: ['ray']
[94mDEBUG 2024-12-02 01:03:37,955[0m:     Initialising:

[36m(ClientAppActor pid=36500)[0m Loading dataset for partition ID: 0
[36m(ClientAppActor pid=36500)[0m Partition 0: Train 502, Val 56, Test 557
[36m(ClientAppActor pid=36500)[0m Loading dataset for partition ID: 0


[92mINFO 2024-12-02 01:04:24,358[0m:      Received initial parameters from one random client
[92mINFO 2024-12-02 01:04:24,358[0m:      Starting evaluation of initial global parameters
[92mINFO 2024-12-02 01:04:24,364[0m:      Evaluation returned no results (`None`)
[92mINFO 2024-12-02 01:04:24,366[0m:      
[92mINFO 2024-12-02 01:04:24,368[0m:      [ROUND 1]
[92mINFO 2024-12-02 01:04:24,369[0m:      configure_fit: strategy sampled 10 clients (out of 10)


[36m(ClientAppActor pid=36500)[0m Partition 0: Train 502, Val 56, Test 557
[36m(ClientAppActor pid=36500)[0m Loading dataset for partition ID: 0
[36m(ClientAppActor pid=36500)[0m Partition 0: Train 502, Val 56, Test 557
[36m(ClientAppActor pid=36500)[0m Loading dataset for partition ID: 0
[36m(ClientAppActor pid=36500)[0m Partition 0: Train 502, Val 56, Test 557
[36m(ClientAppActor pid=2112)[0m Loading dataset for partition ID: 9
[36m(ClientAppActor pid=2112)[0m Partition 9: Train 501, Val 56, Test 557
[36m(ClientAppActor pid=2112)[0m Loading dataset for partition ID: 9
[36m(ClientAppActor pid=2112)[0m Partition 9: Train 501, Val 56, Test 557
[36m(ClientAppActor pid=33984)[0m Client behaving badly (gradient poisoning): scaling gradients.
[36m(ClientAppActor pid=13408)[0m Partition 3: Train 501, Val 56, Test 557[32m [repeated 16x across cluster] (Ray deduplicates logs by default. Set RAY_DEDUP_LOGS=0 to disable log deduplication, or see https://docs.ray.io/en/mast

[92mINFO 2024-12-02 01:04:48,796[0m:      aggregate_fit: received 10 results and 0 failures
[92mINFO 2024-12-02 01:04:49,093[0m:      configure_evaluate: strategy sampled 5 clients (out of 10)


Saving round 1 aggregated weights...


[92mINFO 2024-12-02 01:04:49,984[0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO 2024-12-02 01:04:49,986[0m:      
[92mINFO 2024-12-02 01:04:49,987[0m:      [ROUND 2]
[92mINFO 2024-12-02 01:04:49,989[0m:      configure_fit: strategy sampled 10 clients (out of 10)
[92mINFO 2024-12-02 01:04:53,514[0m:      aggregate_fit: received 10 results and 0 failures
[92mINFO 2024-12-02 01:04:53,571[0m:      configure_evaluate: strategy sampled 5 clients (out of 10)


Saving round 2 aggregated weights...
[36m(ClientAppActor pid=2112)[0m Client behaving badly (gradient poisoning): scaling gradients.[32m [repeated 5x across cluster][0m
[36m(ClientAppActor pid=2112)[0m Partition 3: Train 501, Val 56, Test 557[32m [repeated 30x across cluster][0m
[36m(ClientAppActor pid=7096)[0m Loading dataset for partition ID: 5[32m [repeated 31x across cluster][0m


[92mINFO 2024-12-02 01:04:54,540[0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO 2024-12-02 01:04:54,542[0m:      
[92mINFO 2024-12-02 01:04:54,543[0m:      [ROUND 3]
[92mINFO 2024-12-02 01:04:54,545[0m:      configure_fit: strategy sampled 10 clients (out of 10)
[92mINFO 2024-12-02 01:04:58,203[0m:      aggregate_fit: received 10 results and 0 failures
[92mINFO 2024-12-02 01:04:58,256[0m:      configure_evaluate: strategy sampled 5 clients (out of 10)


Saving round 3 aggregated weights...
[36m(ClientAppActor pid=2092)[0m Client behaving badly (gradient poisoning): scaling gradients.[32m [repeated 3x across cluster][0m
[36m(ClientAppActor pid=20796)[0m Partition 8: Train 501, Val 56, Test 557[32m [repeated 34x across cluster][0m
[36m(ClientAppActor pid=20796)[0m Loading dataset for partition ID: 8[32m [repeated 38x across cluster][0m


[92mINFO 2024-12-02 01:04:59,245[0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO 2024-12-02 01:04:59,248[0m:      
[92mINFO 2024-12-02 01:04:59,249[0m:      [ROUND 4]
[92mINFO 2024-12-02 01:04:59,250[0m:      configure_fit: strategy sampled 10 clients (out of 10)
[92mINFO 2024-12-02 01:05:02,991[0m:      aggregate_fit: received 10 results and 0 failures
[92mINFO 2024-12-02 01:05:03,062[0m:      configure_evaluate: strategy sampled 5 clients (out of 10)


Saving round 4 aggregated weights...


[92mINFO 2024-12-02 01:05:04,249[0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO 2024-12-02 01:05:04,249[0m:      
[92mINFO 2024-12-02 01:05:04,249[0m:      [ROUND 5]
[92mINFO 2024-12-02 01:05:04,256[0m:      configure_fit: strategy sampled 10 clients (out of 10)


[36m(ClientAppActor pid=31784)[0m Client behaving badly (gradient poisoning): scaling gradients.[32m [repeated 3x across cluster][0m
[36m(ClientAppActor pid=14712)[0m Partition 0: Train 502, Val 56, Test 557[32m [repeated 32x across cluster][0m
[36m(ClientAppActor pid=11592)[0m Loading dataset for partition ID: 6[32m [repeated 31x across cluster][0m


[92mINFO 2024-12-02 01:05:07,729[0m:      aggregate_fit: received 10 results and 0 failures
[92mINFO 2024-12-02 01:05:07,898[0m:      configure_evaluate: strategy sampled 5 clients (out of 10)


Saving round 5 aggregated weights...
MPA model saved as 'MPA_model.pkl'
MPA model saved as 'MPA_model.pth'


[92mINFO 2024-12-02 01:05:08,949[0m:      aggregate_evaluate: received 5 results and 0 failures
[92mINFO 2024-12-02 01:05:08,952[0m:      
[92mINFO 2024-12-02 01:05:08,953[0m:      [SUMMARY]
[92mINFO 2024-12-02 01:05:08,955[0m:      Run finished 5 round(s) in 44.58s
[92mINFO 2024-12-02 01:05:08,963[0m:      	History (loss, distributed):
[92mINFO 2024-12-02 01:05:08,967[0m:      		round 1: 1.1667768165469168
[92mINFO 2024-12-02 01:05:08,968[0m:      		round 2: 0.46377829015254973
[92mINFO 2024-12-02 01:05:08,969[0m:      		round 3: 1.279293768852949
[92mINFO 2024-12-02 01:05:08,970[0m:      		round 4: 0.2269435554742813
[92mINFO 2024-12-02 01:05:08,971[0m:      		round 5: 1.0610921427607536
[92mINFO 2024-12-02 01:05:08,972[0m:      	History (metrics, distributed, evaluate):
[92mINFO 2024-12-02 01:05:08,973[0m:      	{'accuracy': [(1, 0.9107142857142858),
[92mINFO 2024-12-02 01:05:08,973[0m:      	              (2, 0.8785714285714287),
[92mINFO 2024-12-02 01:05

[36m(ClientAppActor pid=13408)[0m Client behaving badly (gradient poisoning): scaling gradients.[32m [repeated 3x across cluster][0m
[36m(ClientAppActor pid=2092)[0m Partition 4: Train 501, Val 56, Test 557[32m [repeated 34x across cluster][0m
[36m(ClientAppActor pid=2092)[0m Loading dataset for partition ID: 4[32m [repeated 30x across cluster][0m


[94mDEBUG 2024-12-02 01:05:11,258[0m:     Terminated RayBackend
[94mDEBUG 2024-12-02 01:05:11,270[0m:     Stopping Simulation Engine now.
