# DFedAvgM

In [1]:
import sys
directory_path = "../../../"
if directory_path not in sys.path:
    # Add the directory to sys.path
    sys.path.append(directory_path)

import copy
import time
import time
import numpy as np
import argparse
import yaml
import networkx as nx
import matplotlib.pyplot as plt

from torch.utils import data
from torch import optim

from utils.utils import *
from utils import load_config
from utils.validate import *
from fedlearning.model import *
from fedlearning.dataset import *
from fedlearning.evolve import *
from fedlearning.optimizer import GlobalUpdater, LocalUpdater, get_omegas
from fedlearning.quantizer import SqcCompressor

Helper functions

In [2]:
def replace_with_average(client_id, neighbors, model_dict, verbose=False):
    """Overwrite one client's model with the average of itself and its neighbors.

    Args:
        client_id: Key in ``model_dict`` of the model whose weights are replaced.
        neighbors: Ids of the models averaged together with the client's own.
        model_dict: Mapping from id to model. The entry for ``client_id`` is
            updated in place through ``load_state_dict``.
        verbose: Accepted for call compatibility; not used by this function.
    """
    model_dict[client_id].load_state_dict(
        average_neighbor_weights(client_id, neighbors, model_dict)
    )


def create_random_graph(n, p, graph_name=None):
    """Sample an Erdos-Renyi random graph and optionally save a drawing of it.

    Args:
        n: Number of nodes.
        p: Edge probability handed to ``nx.erdos_renyi_graph``.
        graph_name: Path the drawing is written to. Nothing is drawn when it
            is ``None``.

    Returns:
        The generated networkx graph.
    """
    G = nx.erdos_renyi_graph(n, p)

    if graph_name is not None:
        # nx.draw paints onto the *current* pyplot axes. Without a dedicated
        # figure, calling this once per round stacks every graph on top of the
        # previous ones and never releases the figure.
        fig = plt.figure()
        try:
            nx.draw(G, with_labels=True)
            fig.savefig(graph_name)
        finally:
            plt.close(fig)
    return G

def create_ring_graph(n, graph_name=None):
    """Build an n-node cycle (ring) graph and optionally save a drawing of it.

    Args:
        n: Number of nodes in the ring.
        graph_name: Path the drawing is written to. Nothing is drawn when it
            is ``None``.

    Returns:
        The generated networkx graph.
    """
    G = nx.cycle_graph(n)

    if graph_name is not None:
        # nx.draw paints onto the *current* pyplot axes. Without a dedicated
        # figure, calling this once per round stacks every graph on top of the
        # previous ones and never releases the figure.
        fig = plt.figure()
        try:
            nx.draw(G, with_labels=True)
            fig.savefig(graph_name)
        finally:
            plt.close(fig)
    return G

def create_regular_graph(n, d, graph_name=None):
    """Sample a random d-regular graph on n nodes and optionally save a drawing.

    Args:
        n: Number of nodes.
        d: Degree of every node. Note the argument order of
            ``nx.random_regular_graph`` is ``(d, n)``.
        graph_name: Path the drawing is written to. Nothing is drawn when it
            is ``None``.

    Returns:
        The generated networkx graph.
    """
    G = nx.random_regular_graph(d, n)

    if graph_name is not None:
        # nx.draw paints onto the *current* pyplot axes. Without a dedicated
        # figure, calling this once per round stacks every graph on top of the
        # previous ones and never releases the figure.
        fig = plt.figure()
        try:
            nx.draw(G, with_labels=True)
            fig.savefig(graph_name)
        finally:
            plt.close(fig)
    return G

def average_neighbor_weights(client_id, neighbor_ids, model_dict):
    """Return the uniform average of a client's weights and its neighbors' weights.

    The client's own state dict seeds a ``WeightMod`` accumulator, every
    neighbor's state dict is added to it, and the sum is scaled by
    ``1 / (len(neighbor_ids) + 1)``. All state dicts are deep-copied first, so
    the models in ``model_dict`` are not modified here.

    Args:
        client_id: Key of the client's own model in ``model_dict``.
        neighbor_ids: Sized collection of keys of the neighboring models.
        model_dict: Mapping from id to model.

    Returns:
        The accumulator's ``state_dict()`` after scaling.
    """
    own_state = copy.deepcopy(model_dict[client_id].state_dict())
    accumulator = WeightMod(own_state)

    # Lazily copy one neighbor at a time so only a single extra copy is alive.
    neighbor_states = (
        copy.deepcopy(model_dict[nid].state_dict()) for nid in neighbor_ids
    )
    for state in neighbor_states:
        accumulator.add(state)

    # The "+ 1" accounts for the client's own weights used as the seed.
    participant_count = len(neighbor_ids) + 1
    accumulator.mul(1.0 / participant_count)
    return accumulator.state_dict()


def load_and_deload_neighbor_weights(neighbor_ids, model_dict, avg_weight_dict):
    """Swap the given models onto ``avg_weight_dict`` and hand back their old weights.

    Args:
        neighbor_ids: Ids of the models to overwrite.
        model_dict: Mapping from id to model.
        avg_weight_dict: State dict loaded into every listed model.

    Returns:
        A list of deep-copied state dicts, one per id and in the same order,
        captured before any model is overwritten.
    """
    # First pass: snapshot everything before touching any model.
    snapshots = []
    for nid in neighbor_ids:
        snapshots.append(copy.deepcopy(model_dict[nid].state_dict()))

    # Second pass: overwrite with the shared weights.
    for nid in neighbor_ids:
        model_dict[nid].load_state_dict(avg_weight_dict)

    return snapshots

def reload_neighbor_weights(neighbor_ids, model_dict, old_weight_dicts):
    """Restore previously saved weights into the listed models.

    Args:
        neighbor_ids: Ids of the models to restore.
        model_dict: Mapping from id to model.
        old_weight_dicts: Saved state dicts, indexed in the same order as
            ``neighbor_ids``.
    """
    position = 0
    for nid in neighbor_ids:
        restore = model_dict[nid].load_state_dict
        restore(old_weight_dicts[position])
        position += 1

class NumpyDataset(Dataset):
    """Map-style dataset over two aligned, indexable collections (samples, labels)."""

    def __init__(self, data, targets, transform=None):
        """Keep references to the arrays and the optional per-sample transform.

        Args:
            data: Indexable collection of samples (e.g. a numpy array).
            targets: Indexable collection of labels, aligned with ``data``.
            transform: Optional callable applied to each sample on access.
        """
        self.data, self.targets, self.transform = data, targets, transform

    def __len__(self):
        """Return the number of samples, i.e. ``len(self.data)``."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(sample, target)`` at ``idx``; only the sample is transformed."""
        item, label = self.data[idx], self.targets[idx]
        if not self.transform:
            return item, label
        return self.transform(item), label

def numpy_to_tensor_transform(data):
    """Wrap a numpy array as a torch tensor using ``torch.from_numpy``."""
    as_tensor = torch.from_numpy(data)
    return as_tensor

In [3]:
def train_client_momentum(user_model, user_id, dataset, config, logger, loss_fn,
                          sgd_batch_size=32, local_epochs=1, lr = 0.01, momentum = 0.99,
                          verbose=False):
    """Train one client's model in place with mini-batch SGD plus momentum.

    Args:
        user_model: Model updated in place.
        user_id: Id used to look up this client's samples.
        dataset: Dict providing ``"train_data"`` and ``"user_with_data"``.
        config: Run configuration; ``config.device`` is where batches are moved.
        logger: Logger used for progress lines when ``verbose`` is set.
        loss_fn: Callable ``loss_fn(output, target)`` returning a scalar loss.
        sgd_batch_size: Mini-batch size of the data loader.
        local_epochs: Number of full passes over the client's data.
        lr: SGD learning rate.
        momentum: SGD momentum coefficient.
        verbose: When true, log every 100th batch and print a blank line at the end.
    """
    # Pull out the samples that belong to this client.
    client_shard = assign_user_resource(
        config, user_id, dataset["train_data"], dataset["user_with_data"]
    )

    # Momentum SGD with no weight decay, dampening or Nesterov correction
    # (the original author's note: "Following the DFedAvgM paper").
    # A fresh optimizer is built on every call.
    sgd = optim.SGD(
        user_model.parameters(),
        lr=lr,
        momentum=momentum,
        weight_decay=0,
        dampening=0,
        nesterov=False,
    )

    user_data_loader = DataLoader(
        NumpyDataset(
            client_shard["images"],
            client_shard["labels"],
            transform=numpy_to_tensor_transform,
        ),
        batch_size=sgd_batch_size,
        shuffle=True,
    )

    for epoch in range(local_epochs):
        for batch_idx, (data, target) in enumerate(user_data_loader):
            data = data.to(config.device)
            target = target.to(config.device)

            # Standard step: reset grads, forward, loss, backward, update.
            sgd.zero_grad()
            loss = loss_fn(user_model(data), target)
            loss.backward()
            sgd.step()

            if verbose and batch_idx % 100 == 0:
                logger.info(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(user_data_loader.dataset)} ({100. * batch_idx / len(user_data_loader):.0f}%)]\tLoss: {loss.item():.6f}')

    if verbose:
        print()

In [4]:
# Load the experiment configuration and set up logging for this run.
config_file = "baseline_configs/config_dfedavgm.yaml"
config = load_config(config_file)

logger = init_logger(config)
logger.info("Loaded configuration from {}".format(config_file))
logger.info("Dataset path: {}".format(config.train_data_dir))

# Either resume from a stored record, or build a fresh record.
# In the fresh case a model is created only so init_record can inspect it
# (original note: "to extract number of parameters for record").
if config.record_path is not None:
    record = load_record(config.record_path)
    logger.info("Loaded record from {}".format(config.record_path))
    loaded_record = True
else:
    model = init_model(config, logger)
    record = init_record(config, model)
    loaded_record = False

if config.device == "cuda":
    # NOTE(review): benchmark=True lets cuDNN auto-tune its algorithm choice,
    # which PyTorch's reproducibility notes describe as a source of run-to-run
    # variation; confirm it is intended together with deterministic=True.
    torch.backends.cudnn.benchmark = True
    torch.backends.cudnn.deterministic = True

--------------------------------------------------------------------------------
Loaded configuration from baseline_configs/config_dfedavgm.yaml
Dataset path: ../../../data/mnist/train.dat


Creating config from filepath:  baseline_configs/config_dfedavgm.yaml
/home/gathomp3/Deep_Learning/NeuralTangent/ntk-fed/notebooks/baselines/dfedavgm/../../../../records/baseline_trials/dfedavgm/trial_test/train.log


Create user_ids and dataset

In [5]:
# Create user_ids: one integer id per client, 0 .. config.users - 1
user_ids = np.arange(0, config.users)
# Load the dataset.
# dataset is a dictionary with keys: train_data, test_data, user_with_data
# user_with_data is indexed by user id and yields the indices of that user's
# training samples (it is used below to slice train_data["images"/"labels"]).
# Original note: in the IID setting ids are just assigned like 0, 1, 2, 3, ...
dataset = assign_user_data(config, logger)
# Move the full test set to the target device once; later cells reuse these tensors.
test_images = torch.from_numpy(dataset["test_data"]["images"]).to(config.device)
test_labels = torch.from_numpy(dataset["test_data"]["labels"]).to(config.device)

Non-IID data distribution
Load user_with_data from /home/gathomp3/Deep_Learning/NeuralTangent/ntk-fed/data/user_with_data/mnist300/a0.5/user_dataidx_map_0.50_0.dat


Initialize user model dictionary

In [6]:
# NOTE(review): this unconditionally discards the loaded_record flag computed
# when the record was loaded, so the resume branch below can never run.
# Remove this line if resuming from record["models"] is meant to work.
loaded_record = False
# Create a dictionary of models, one per user.
# If a record with models was loaded, continue training from where it left off.
if loaded_record:
    model_dict = record["models"]
else:
    if config.same_init:
        # Same initialization for all users: build one model and copy it.
        model = init_model(config, logger)
        model_dict = {model_id: copy.deepcopy(model) for model_id in user_ids}
    else:
        # Independent initialization per user.
        model_dict = {model_id: init_model(config, logger) for model_id in user_ids}

# Get zeroth round loss, acc

In [7]:
# Get zeroth round loss, acc (only when starting from a fresh record)
verbose = True
if record["epoch"] == 0:
    logger.info("Logging initial loss, acc")
    client_losses = []
    client_accs = []
    # Pure evaluation: disable autograd so no graph is kept for the forward
    # passes, matching how client accuracy is evaluated in the training loop.
    with torch.no_grad():
        for client_id in user_ids:
            # Slice of the training data that belongs to this client
            user_images = torch.from_numpy(dataset["train_data"]["images"][dataset["user_with_data"][client_id]]).to(config.device)
            user_labels = torch.from_numpy(dataset["train_data"]["labels"][dataset["user_with_data"][client_id]]).to(config.device)

            # Loss is measured on the client's own data, accuracy on the shared test set
            output_on_own_data = model_dict[client_id](user_images)
            output_on_test_set = model_dict[client_id](test_images)

            loss = loss_with_output(output_on_own_data, user_labels, config.loss)
            acc = accuracy_with_output(output_on_test_set, test_labels)

            client_losses.append(loss)
            client_accs.append(acc)
            if verbose: logger.info("client {:d} loss {:.4f} acc {:.4f}".format(client_id, loss, acc))

            # Release the per-client tensors right away. Doing it inside the
            # loop also avoids a NameError when user_ids is empty.
            del user_images, user_labels, output_on_own_data, output_on_test_set
    # Finally, append the initial losses, accs to the record
    record["loss"].append(client_losses)
    record["testing_accuracy"].append(client_accs)

Logging initial loss, acc
client 0 loss 2.8094 acc 0.0798
client 1 loss 2.9916 acc 0.0798
client 2 loss 3.0223 acc 0.0798
client 3 loss 2.9153 acc 0.0798
client 4 loss 2.8716 acc 0.0798
client 5 loss 2.8487 acc 0.0798
client 6 loss 2.8266 acc 0.0798
client 7 loss 2.5721 acc 0.0798
client 8 loss 3.3079 acc 0.0798
client 9 loss 2.6787 acc 0.0798
client 10 loss 3.1393 acc 0.0798
client 11 loss 3.1946 acc 0.0798
client 12 loss 2.8435 acc 0.0798
client 13 loss 2.6586 acc 0.0798
client 14 loss 2.3556 acc 0.0798
client 15 loss 2.6228 acc 0.0798
client 16 loss 2.9121 acc 0.0798
client 17 loss 2.6320 acc 0.0798
client 18 loss 2.9415 acc 0.0798
client 19 loss 2.9095 acc 0.0798
client 20 loss 3.3225 acc 0.0798
client 21 loss 2.5120 acc 0.0798
client 22 loss 2.6807 acc 0.0798
client 23 loss 2.8600 acc 0.0798
client 24 loss 2.6811 acc 0.0798
client 25 loss 3.5398 acc 0.0798
client 26 loss 2.5985 acc 0.0798
client 27 loss 3.4576 acc 0.0798
client 28 loss 2.6968 acc 0.0798
client 29 loss 3.2190 acc 0

In [8]:
def calculate_tensor_bits(state_dict, verbose=True):
    """Count the bits needed to store every tensor in a state dict.

    Args:
        state_dict: Mapping from name to value. Entries that are not
            ``torch.Tensor`` instances are skipped.
        verbose: When true (the default, matching the earlier behaviour),
            print the size of each tensor and the totals.

    Returns:
        Total number of bits over all tensors, as an int.
    """
    total_bits = 0
    for name, param in state_dict.items():
        if not isinstance(param, torch.Tensor):
            continue

        # element_size() reports the per-element width in bytes for any dtype,
        # so no hand-maintained dtype table is needed and dtypes such as
        # float64, bfloat16, uint8 or bool no longer raise ValueError.
        tensor_bits = param.numel() * param.element_size() * 8
        total_bits += tensor_bits

        if verbose:
            print(f"{name}: {tensor_bits} bits")

    if verbose:
        print(f"Total bits: {total_bits}")
        print(f"Total bytes: {total_bits / 8}")
        print(f"Total kilobytes: {total_bits / (8 * 1024):.2f}")
        print(f"Total megabytes: {total_bits / (8 * 1024 * 1024):.2f}")

    return total_bits

In [9]:
# Report the storage size of one client's model (client 0), in bits.
calculate_tensor_bits(model_dict[0].state_dict())

fc1.weight: 2508800 bits
fc1.bias: 3200 bits
fc2.weight: 32000 bits
fc2.bias: 320 bits
Total bits: 2544320
Total bytes: 318040.0
Total kilobytes: 310.59
Total megabytes: 0.30


2544320

In [14]:
# Work on a deep copy: state_dict() hands out references to the model's live
# tensors, and the rest of this notebook always copies before wrapping weights
# in WeightMod. This keeps the demo from touching client 0's real weights
# (assuming apply_quant may modify its tensors in place - confirm in WeightMod).
temp = WeightMod(copy.deepcopy(model_dict[0].state_dict()))
# Define parameters
params = {
    # NOTE(review): the earlier comment claimed "10% kept, 90% zeroed out",
    # which does not match the value 0.3. Presumably 0.3 is the fraction of
    # elements kept - confirm against SqcCompressor.
    "sparsity": 0.3,
    "quant_level": 256  # Number of quantization levels
}

quantizer = SqcCompressor(params=params)
temp.apply_quant(quantizer=quantizer)
temp.state_dict()

OrderedDict([('fc1.weight',
              tensor([[ 0.0000, -0.0726, -0.0779,  ...,  0.0000, -0.0000, -0.0000],
                      [-0.0000, -0.0000, -0.0000,  ..., -0.0000, -0.0000, -0.0000],
                      [-0.0000,  0.0000,  0.0000,  ..., -0.0000, -0.0000, -0.0000],
                      ...,
                      [ 0.0000,  0.0604,  0.0779,  ...,  0.0534,  0.0000, -0.0674],
                      [-0.0000, -0.0000,  0.0000,  ...,  0.0726, -0.0000, -0.0000],
                      [-0.0000, -0.0709,  0.0000,  ...,  0.0726, -0.0000, -0.0000]],
                     device='cuda:0')),
             ('fc1.bias',
              tensor([0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
                      0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
                      0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
                    

: 

In [12]:
# The loss module holds no state, so build it once instead of once per client
# per round.
loss_fn_pytorch = nn.CrossEntropyLoss()

for comm_round in range(record["epoch"],record["epoch"]+config.rounds):
    logger.info(f"Comm Round: {comm_round}")
    # NOTE(review): client_losses is never filled inside this loop, so
    # record["loss"] only ever holds the round-zero entry.
    client_losses = []
    client_accs = []

    # Create the communication graph for this round
    if config.topology == "random":
        G = create_random_graph(config.users, config.p, config.graph_name)
    elif config.topology == "ring":
        G = create_ring_graph(config.users, config.graph_name)
    elif config.topology == "regular":
        if config.p is not None:
            raise ValueError("Regular graph requires d, not p")
        elif config.d is None:
            raise ValueError("Regular graph requires d")
        G = create_regular_graph(config.users, config.d, config.graph_name)
        if config.verbose: logger.info(f"Creating regular graph with d={config.d}")
    else: 
        raise ValueError("Invalid topology: {}".format(config.topology))

    # All clients perform multiple rounds of local training (SGD w/ momentum)
    for client_id in user_ids:
        train_client_momentum(model_dict[client_id], client_id, dataset, config, logger, 
        loss_fn_pytorch, sgd_batch_size=config.sgd_batch_size, local_epochs=config.local_update_steps,
        lr=config.lr, momentum=config.momentum, verbose=config.verbose)
    
    # All clients average with neighbors.
    # Every average is computed from the pre-averaging weights first and only
    # then loaded, which mimics synchronous averaging.
    new_avged_weights = {}
    for client_id in user_ids:
        neighbors = list(G.neighbors(client_id))
        new_avged_weights[client_id] = average_neighbor_weights(client_id, neighbors, model_dict)
    for client_id in user_ids:
        model_dict[client_id].load_state_dict(new_avged_weights[client_id])
    del new_avged_weights
    
    # Now, test individual client accs
    with torch.no_grad():
        for client_id in user_ids:
            output_on_test_set = model_dict[client_id](test_images)
            acc = accuracy_with_output(output_on_test_set, test_labels)
            client_accs.append(acc)

    # Test the global, aggregated model: a uniform average over all clients
    # (client 0 plus every other id).
    # NOTE(review): uniform weighting assumes all clients hold the same number
    # of samples; confirm this holds for the data partition in use.
    temp_global_model = init_model(config, logger)
    temp_global_model.load_state_dict(average_neighbor_weights(0, user_ids[1:], model_dict))
    
    # Evaluation only: keep autograd off so no graph is built for the full
    # test-set forward pass, consistent with the client evaluation above.
    with torch.no_grad():
        global_output = temp_global_model(test_images)
        global_loss = loss_fn_pytorch(global_output, test_labels)
        global_acc = accuracy_with_output(global_output, test_labels)
    
    if comm_round % 5 == 0: 
        logger.info(f"Round {comm_round}: Test Loss: {global_loss.item()}, Avg Client Acc: {np.mean(client_accs)}, Agg Acc: {global_acc}")

    # Record the results
    record["testing_accuracy"].append(client_accs)
    
    if 'aggregated_accs' in record:
        record['aggregated_accs'].append(global_acc)
    else:
        record['aggregated_accs'] = [global_acc]

    record["epoch"] += 1

# Save the models, hyperparameters and topology settings into the record
record["models"] = model_dict
record["dfedavgm_hyperparameters"] = {"learning_rate": config.lr, "local_update_steps": config.local_update_steps,
                                        "sgd_batch_size": config.sgd_batch_size, "momentum": config.momentum}
record["user_with_data"] = dataset["user_with_data"]
record["topology"] = config.topology
record["p"] = config.p
record["d"] = config.d

Comm Round: 0
  from .autonotebook import tqdm as notebook_tqdm
Round 0: Test Loss: 1.9127050638198853, Avg Client Acc: 0.28770899295806884, Agg Acc: 0.4334999918937683
Comm Round: 1
Comm Round: 2
Comm Round: 3
Comm Round: 4
Comm Round: 5
Round 5: Test Loss: 1.6543687582015991, Avg Client Acc: 0.4359306553006172, Agg Acc: 0.44290000200271606
Comm Round: 6
Comm Round: 7
Comm Round: 8
Comm Round: 9
Comm Round: 10
Round 10: Test Loss: 1.1399308443069458, Avg Client Acc: 0.5234139875570933, Agg Acc: 0.5979999899864197
Comm Round: 11
Comm Round: 12
Comm Round: 13
Comm Round: 14
Comm Round: 15
Round 15: Test Loss: 1.0309802293777466, Avg Client Acc: 0.559088652630647, Agg Acc: 0.6266999840736389
Comm Round: 16
Comm Round: 17
Comm Round: 18
Comm Round: 19
Comm Round: 20
Round 20: Test Loss: 0.9091405868530273, Avg Client Acc: 0.5946646518508594, Agg Acc: 0.6638000011444092
Comm Round: 21
Comm Round: 22
Comm Round: 23
Comm Round: 24
Comm Round: 25
Round 25: Test Loss: 0.8660927414894104, Avg C

In [None]:
# Display the per-round test accuracy of the aggregated (all-client average) model.
record["aggregated_accs"]

[0.6800000071525574,
 0.7443999648094177,
 0.7813999652862549,
 0.7925999760627747,
 0.7997999787330627,
 0.8075999617576599,
 0.8172999620437622,
 0.8193999528884888,
 0.826200008392334,
 0.8288999795913696,
 0.8328999876976013,
 0.8369999527931213,
 0.8379999995231628,
 0.8416000008583069,
 0.840999960899353,
 0.8458999991416931,
 0.8448999524116516,
 0.848800003528595,
 0.8481999635696411,
 0.8499000072479248,
 0.8508999943733215,
 0.8529999852180481,
 0.8542999625205994,
 0.8537999987602234,
 0.8536999821662903,
 0.8553999662399292,
 0.8565999865531921,
 0.8571999669075012,
 0.8578999638557434,
 0.8571999669075012,
 0.8592000007629395,
 0.8586999773979187,
 0.8611999750137329,
 0.8603999614715576,
 0.8614999651908875,
 0.8610999584197998,
 0.8628000020980835,
 0.8625999689102173,
 0.8634999990463257,
 0.8631999492645264,
 0.8649999499320984,
 0.8650999665260315,
 0.8646000027656555,
 0.866599977016449,
 0.8662999868392944,
 0.8661999702453613,
 0.8669999837875366,
 0.86659997701644