# Simulate FL

In [None]:
# Load python packages.
import os
import sys
import time
import copy
import torch
import typing
import pickle
import inspect
import argparse

import numpy as np
import matplotlib.pyplot as plt

from itertools import islice
from tqdm import tqdm, trange
from os import path, getcwd, makedirs
from tensorboardX import SummaryWriter
# NLP dataset loading
from torchtext.datasets import AG_NEWS

from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

In [52]:
# Prepend the `torch_federated_learning` package directory to the module
# search path (index 0), so its modules are found first on import.
torch_package_path = "Federated_Learning_Workshop_AMLD2021/src/torch_federated_learning/"
sys.path.insert(0, torch_package_path)
# Import FL simulation objects from the above directory. 
from options import (
    args_parser,
    get_nb_args,
    print_train_stats,
)
from utils import (
    #get_dataset, 
    num_batches_per_epoch,
    average_weights, 
    exp_details,
)
from update import (
    test_inference,
    ClientShard,
)
from models import (
    get_model, 
    get_optimizer,
)
from sampling import ade_iid
# Define path and directory to save trained model and graphs.
save_path = "./save/"
model_save_dir = "objects"
# Load default arguments.
args = get_nb_args()
# Create directory to save trained model weight `.pkl` (pickle) files.
# `exist_ok=True` keeps this cell re-runnable when the directory already exists.
makedirs(path.join(save_path, model_save_dir), exist_ok=True)
# Absolute path of the current working directory.
path_project = path.abspath('./')
# Summary writer whose log files are placed under `./logs`.
logger = SummaryWriter('./logs')

In [2]:
# Training split of AG_NEWS; iterated as (label, text) pairs by the cells below.
train_iter = AG_NEWS(split='train')

train.csv: 29.5MB [02:49, 174kB/s]                                              


In [5]:
# Tokenizer callable built from torchtext's 'basic_english' preset.
tokenizer = get_tokenizer('basic_english')

In [6]:
def yield_tokens(data_iter):
    """Lazily produce the tokenized form of every text in ``data_iter``.

    Args:
        data_iter: Iterable of 2-item samples; the first item is ignored
            and the second item is the raw text.

    Yields:
        The result of applying the module-level ``tokenizer`` to each text.
    """
    yield from (tokenizer(raw_text) for _unused, raw_text in data_iter)

# Build the vocabulary from the tokenized training texts, registering
# "<unk>" as a special token.
vocab = build_vocab_from_iterator(yield_tokens(train_iter), specials=["<unk>"])
# Use the index of "<unk>" as the default index for lookups.
vocab.set_default_index(vocab["<unk>"])

In [7]:
# Sanity check: look up the vocabulary indices of a few tokens.
vocab(['here', 'is', 'an', 'example'])

[475, 21, 30, 5297]

In [8]:
def text_pipeline(x):
    """Tokenize the raw text ``x`` and map its tokens to vocabulary indices.

    Uses the module-level ``tokenizer`` and ``vocab``.
    """
    return vocab(tokenizer(x))


def label_pipeline(x):
    """Convert label ``x`` to ``int`` and shift it down by one.

    Presumably the raw labels are 1-based, making the result a 0-based
    class id -- confirm against the dataset.
    """
    return int(x) - 1

In [10]:
from torch.utils.data import DataLoader
# Use the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [11]:
def collate_batch(batch):
    """Collate ``(label, text)`` samples into flat tensors for ``EmbeddingBag``.

    Args:
        batch: Iterable of ``(label, text)`` pairs.

    Returns:
        Tuple ``(labels, tokens, offsets)`` moved to the module-level
        ``device``: ``labels`` holds one int64 class id per sample,
        ``tokens`` is the concatenation of every sample's token ids, and
        ``offsets`` gives the start position of each sample in ``tokens``.
    """
    labels, token_tensors, lengths = [], [], []
    for raw_label, raw_text in batch:
        labels.append(label_pipeline(raw_label))
        tokens = torch.tensor(text_pipeline(raw_text), dtype=torch.int64)
        token_tensors.append(tokens)
        lengths.append(tokens.size(0))

    label_tensor = torch.tensor(labels, dtype=torch.int64)
    # Start offsets are the exclusive running sum of the sample lengths.
    start_offsets = torch.tensor([0] + lengths[:-1]).cumsum(dim=0)
    flat_tokens = torch.cat(token_tensors)
    return (
        label_tensor.to(device),
        flat_tokens.to(device),
        start_offsets.to(device),
    )

In [12]:
# Re-create the training iterator and wrap it in a DataLoader that collates
# batches of 8 samples with `collate_batch`, in dataset order (no shuffling).
train_iter = AG_NEWS(split='train')
dataloader = DataLoader(train_iter, batch_size=8, shuffle=False, collate_fn=collate_batch)

In [13]:
from torch import nn

class TextClassificationModel(nn.Module):
    """Text classifier: an ``EmbeddingBag`` layer followed by one linear layer."""

    def __init__(self, vocab_size, embed_dim, num_class):
        """Build the layers and initialize their weights.

        Args:
            vocab_size: Number of rows in the embedding table.
            embed_dim: Size of each embedding vector.
            num_class: Number of output classes.
        """
        super().__init__()
        self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=True)
        self.fc = nn.Linear(embed_dim, num_class)
        self.init_weights()

    def init_weights(self):
        """Draw both weight matrices from U(-0.5, 0.5) and zero the bias."""
        bound = 0.5
        for weight in (self.embedding.weight, self.fc.weight):
            weight.data.uniform_(-bound, bound)
        self.fc.bias.data.zero_()

    def forward(self, text, offsets):
        """Return class scores for a batch of concatenated token sequences.

        Args:
            text: 1-D tensor of token indices for all samples, concatenated.
            offsets: 1-D tensor with the start index of each sample in ``text``.
        """
        pooled = self.embedding(text, offsets)
        scores = self.fc(pooled)
        return scores

In [14]:
# Count the distinct labels with one pass over a fresh training iterator.
train_iter = AG_NEWS(split='train')
num_class = len({label for label, _ in train_iter})
vocab_size = len(vocab)
emsize = 64  # embedding dimension
# Build the classifier and move it to the selected device.
model = TextClassificationModel(vocab_size, emsize, num_class).to(device)

In [15]:
import time

def train(dataloader):
    """Run one training pass of the module-level ``model`` over ``dataloader``.

    Relies on the module-level ``model``, ``optimizer`` and ``criterion``,
    and on the global loop variable ``epoch``, which is read only for the
    progress message. Every 500 batches the running accuracy is printed and
    then reset.

    Args:
        dataloader: Iterable yielding ``(label, text, offsets)`` batches;
            ``len(dataloader)`` is used in the progress message.
    """
    model.train()
    total_acc, total_count = 0, 0
    log_interval = 500

    for idx, (label, text, offsets) in enumerate(dataloader):
        optimizer.zero_grad()
        predicted_label = model(text, offsets)
        loss = criterion(predicted_label, label)
        loss.backward()
        # Clip the total gradient norm to 0.1 before applying the update.
        torch.nn.utils.clip_grad_norm_(model.parameters(), 0.1)
        optimizer.step()
        total_acc += (predicted_label.argmax(1) == label).sum().item()
        total_count += label.size(0)
        if idx % log_interval == 0 and idx > 0:
            print('| epoch {:3d} | {:5d}/{:5d} batches '
                  '| accuracy {:8.3f}'.format(epoch, idx, len(dataloader),
                                              total_acc/total_count))
            # Restart the running accuracy for the next logging window.
            total_acc, total_count = 0, 0

In [16]:
def evaluate(dataloader):
    """Return the accuracy of the module-level ``model`` over ``dataloader``.

    The model is switched to eval mode and is left in that mode on return.
    Gradients are disabled during the pass.

    Args:
        dataloader: Iterable yielding ``(label, text, offsets)`` batches.

    Returns:
        Fraction of samples whose arg-max prediction equals the label.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no samples.
    """
    model.eval()
    total_acc, total_count = 0, 0

    with torch.no_grad():
        for label, text, offsets in dataloader:
            predicted_label = model(text, offsets)
            total_acc += (predicted_label.argmax(1) == label).sum().item()
            total_count += label.size(0)
    return total_acc/total_count

In [17]:
from torch.utils.data.dataset import random_split
from torchtext.data.functional import to_map_style_dataset
# Hyperparameters
EPOCHS = 10  # epoch
LR = 5  # learning rate
BATCH_SIZE = 64  # batch size for training

criterion = torch.nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=LR)
# With step_size=1, every `scheduler.step()` multiplies the learning rate by gamma.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=1, gamma=0.1)
# Reference validation accuracy; `None` until the first epoch has been evaluated.
total_accu = None
train_iter, test_iter = AG_NEWS()
train_dataset = to_map_style_dataset(train_iter)
test_dataset = to_map_style_dataset(test_iter)
# Keep 95% of the training data for training and the rest for validation.
num_train = int(len(train_dataset) * 0.95)
split_train_, split_valid_ = \
    random_split(train_dataset, [num_train, len(train_dataset) - num_train])

train_dataloader = DataLoader(split_train_, batch_size=BATCH_SIZE,
                              shuffle=True, collate_fn=collate_batch)
valid_dataloader = DataLoader(split_valid_, batch_size=BATCH_SIZE,
                              shuffle=True, collate_fn=collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE,
                             shuffle=True, collate_fn=collate_batch)

for epoch in range(1, EPOCHS + 1):
    epoch_start_time = time.time()
    train(train_dataloader)
    accu_val = evaluate(valid_dataloader)
    if total_accu is not None and total_accu > accu_val:
        # Validation accuracy fell below the reference: decay the learning rate.
        scheduler.step()
    else:
        total_accu = accu_val
    print('-' * 59)
    print('| end of epoch {:3d} | time: {:5.2f}s | '
          'valid accuracy {:8.3f} '.format(epoch,
                                           time.time() - epoch_start_time,
                                           accu_val))
    print('-' * 59)

test.csv: 1.86MB [00:02, 789kB/s]                                               


| epoch   1 |   500/ 1782 batches | accuracy    0.689
| epoch   1 |  1000/ 1782 batches | accuracy    0.849
| epoch   1 |  1500/ 1782 batches | accuracy    0.879
-----------------------------------------------------------
| end of epoch   1 | time:  8.96s | valid accuracy    0.885 
-----------------------------------------------------------
| epoch   2 |   500/ 1782 batches | accuracy    0.899
| epoch   2 |  1000/ 1782 batches | accuracy    0.901
| epoch   2 |  1500/ 1782 batches | accuracy    0.901
-----------------------------------------------------------
| end of epoch   2 | time:  8.96s | valid accuracy    0.896 
-----------------------------------------------------------
| epoch   3 |   500/ 1782 batches | accuracy    0.916
| epoch   3 |  1000/ 1782 batches | accuracy    0.915
| epoch   3 |  1500/ 1782 batches | accuracy    0.912
-----------------------------------------------------------
| end of epoch   3 | time:  9.24s | valid accuracy    0.899 
-------------------------------

In [18]:
# Evaluate the trained model on the test split and report its accuracy.
print('Checking the results of test dataset.')
accu_test = evaluate(test_dataloader)
print('test accuracy {:8.3f}'.format(accu_test))

Checking the results of test dataset.
test accuracy    0.912


## Simulate Federated Learning through Distributed Learning <br>

Training a model on data shards to emulate a distributed environment.

In [25]:
# Here user groups is a dictionary containing keys for every member (here 50)
# with subsequent values containing row IDs allocated to that client shard.
# NOTE(review): the namespace printed further down shows epochs=10, frac=0.2
# and num_users=10, so these assignments may not have been applied in that
# session -- confirm the cell execution order.

# Setting 50 clients.
args.num_users = 50
# Setting only 1 global round
args.epochs = 1
# Setting 10 epochs on client shard.
args.local_ep = 10
# Setting `frac` to use only one client shard
args.frac = 0.02 # 50 clients x 0.02 sampling fraction => 1 client

In [80]:
# Select the NLP task on the AG_NEWS dataset; a truthy `iid` makes
# `get_dataset_` use the IID sampler (`ade_iid`).
args.model = 'nlp'
args.task = 'nlp'
args.dataset = 'agnews'
args.iid = 1

In [81]:
# Display the argument namespace to check the current settings.
args

Namespace(dataset='agnews', epochs=10, frac=0.2, gpu='', iid=1, kernel_num=9, kernel_sizes='3,4,5', local_bs=8, local_ep=10, lr=0.01, max_pool='True', model='nlp', momentum=0.5, norm='batch_norm', num_channels=3, num_classes=10, num_filters=32, num_users=10, optimizer='sgd', seed=1, stopping_rounds=10, task='nlp', test_frac=0.1, unequal=0, verbose=1)

In [98]:
# Instantiate a fresh global model (newly initialized weights, same
# constructor arguments as `model`) and move it to `device`.
global_model = TextClassificationModel(vocab_size, emsize, num_class).to(device)

In [83]:
def ade_iid(dataset, num_users):
    """Randomly split the dataset's indices into equally sized client shards.

    Args:
        dataset: Sized collection; only ``len(dataset)`` is used.
        num_users: Number of clients in federated learning.

    Returns:
        dict mapping each client id in ``range(num_users)`` to a set of
        sample indices. Shards are disjoint; indices left over when
        ``len(dataset)`` is not a multiple of ``num_users`` stay unassigned.
    """
    shard_size = int(len(dataset) / num_users)
    remaining = list(range(len(dataset)))
    shards = {}
    for client_id in range(num_users):
        picked = set(np.random.choice(remaining, shard_size, replace=False))
        shards[client_id] = picked
        # Drop the assigned indices so later clients cannot draw them again.
        remaining = list(set(remaining) - picked)
    return shards

In [86]:
def get_dataset_(args, train_dataset, test_dataset, custom_sampling=None):
    """Split the training data among clients and return it with the datasets.

    Args:
        args: Namespace-like object; the attributes ``task``, ``dataset``,
            ``iid``, ``unequal`` and ``num_users`` are read.
        train_dataset: Training dataset whose indices are distributed
            among the clients.
        test_dataset: Test dataset, returned unchanged.
        custom_sampling: Accepted for interface compatibility; currently
            unused.

    Returns:
        Tuple ``(train_dataset, test_dataset, user_groups)`` where
        ``user_groups`` is the mapping produced by the sampling function
        (user index -> data indices for that user).

    Raises:
        NotImplementedError: If ``args.task`` is not ``'nlp'``, or if a
            non-IID split with unequal shard sizes is requested.
        AssertionError: If ``args.dataset`` is not ``"agnews"``.
    """
    if args.task != 'nlp':
        # Without this guard the function reached the return statement with
        # `user_groups` unbound and failed with an UnboundLocalError.
        raise NotImplementedError(
            "Task '{}' is not implemented.".format(args.task))

    assert args.dataset == "agnews", "Parsed dataset not implemented."
    # Sample training data amongst users.
    if args.iid:
        # IID split: every user gets a random, equally sized shard.
        user_groups = ade_iid(train_dataset, args.num_users)
    elif args.unequal:
        # Non-IID with unequal splits for every user.
        raise NotImplementedError()
    else:
        # Non-IID with equal splits for every user.
        # NOTE(review): `ade_noniid` is neither defined nor imported in the
        # visible part of this file -- confirm it is available.
        user_groups = ade_noniid(train_dataset, args.num_users)

    return train_dataset, test_dataset, user_groups

In [87]:
# Load the dataset with user groups
# The train/test iterators are converted to map-style datasets because the
# sampling code calls `len()` on the training dataset.
train_iter, test_iter = AG_NEWS()
train_dataset = to_map_style_dataset(train_iter)
test_dataset = to_map_style_dataset(test_iter)
# `user_groups` assigns training-set indices to each of the `args.num_users` users.
train_dataset, test_dataset, user_groups = get_dataset_(args, train_dataset, test_dataset)

In [99]:
# Send the model to the device (cpu or gpu).
global_model.to(device)
# Put the model in training mode.
global_model.train()
# Take an independent copy of the untrained model weights to be used as the
# base for Federated Averaging. `state_dict()` alone returns references to
# the live parameters, so without the deep copy later training of
# `global_model` would silently change `global_weights` as well.
global_weights = copy.deepcopy(global_model.state_dict())

In [100]:
# Randomly pick one client id out of the `args.num_users` available clients.
idxs_users = np.random.choice(range(args.num_users), 1, replace=False)
# Get our brave client's id.
lonely_client_idx = idxs_users[0]
# Deep-copy the untrained global model so that the locally trained weights
# can be loaded into the copy. A plain assignment would only alias
# `global_model`, and loading weights into it would overwrite the global model.
updated_local_model = copy.deepcopy(global_model)