# ðŸ“‹ Task 1: FedSGD vs. Centralized SGD Equivalence

This notebook implements Task 1 from the assignment. The goal is to demonstrate the theoretical equivalence between **Federated SGD (FedSGD)** and **Centralized SGD** under a controlled scenario:

1.  **FedSGD:** All clients participate, and each performs a *single SGD step* on its *entire* local dataset (full-batch).
2.  **Centralized SGD:** One model is trained with a *single SGD step* on the *entire* global dataset (full-batch).

We will use the `SimpleCNN` model from `federated_learning.py` to ensure architectural consistency with Task 2.

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Subset, ConcatDataset
import numpy as np
import copy
import matplotlib.pyplot as plt

# Ensure plots appear inline in the notebook
%matplotlib inline

# --- Import from your Task 2 file ---
# We reuse the model, aggregation, and evaluation functions
# from your 'federated_learning.py' file.
try:
    from federated_learning import SimpleCNN, aggregate_models, evaluate_model
    print("Successfully imported components from federated_learning.py")
    print("Make sure 'federated_learning.py' is in the same directory.")
except ImportError:
    print("ERROR: Could not find 'federated_learning.py'.")
    print("Please make sure it is in the same directory as this notebook.")

Successfully imported components from federated_learning.py
Make sure 'federated_learning.py' is in the same directory.


In [2]:
# --- Task 1 Configuration ---
NUM_CLIENTS = 6       # As suggested in assignment
NUM_ROUNDS = 20      # Run for 10-20 iterations
LEARNING_RATE = 0.01
SEED = 42
DEVICE = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

print(f"Using device: {DEVICE}")

# Set seeds for reproducibility
torch.manual_seed(SEED)
np.random.seed(SEED)

Using device: cuda


In [3]:
def load_data_task1(num_clients: int) -> tuple:
    """
    Loads CIFAR-10 and prepares data for Task 1:
    1. Centralized Loader: Full training set in one batch.
    2. FedSGD Loaders: IID split, each client's loader has one full batch.
    3. Test Loader: Standard test set.
    """
    transform = transforms.Compose(
        [
            transforms.ToTensor(),
            transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
        ]
    )

    train_dataset = torchvision.datasets.CIFAR10(
        root="./data", train=True, download=True, transform=transform
    )
    test_dataset = torchvision.datasets.CIFAR10(
        root="./data", train=False, download=True, transform=transform
    )

    # 1. Centralized Loader
    centralized_loader = DataLoader(
        train_dataset, batch_size=len(train_dataset), shuffle=False
    )
    print(f"Centralized loader: 1 batch of {len(train_dataset)} samples.")

    # 2. FedSGD Loaders (IID)
    total_size = len(train_dataset)
    indices = np.random.permutation(total_size)
    # Split evenly
    split_size = total_size // num_clients
    
    fedsgd_loaders = []
    client_sizes = []

    for i in range(num_clients):
        start_idx = i * split_size
        # Assign remainder to the last client
        end_idx = (i + 1) * split_size if i < num_clients - 1 else total_size
        client_indices = indices[start_idx:end_idx]
        
        client_subset = Subset(train_dataset, client_indices)
        
        # This is the key change: batch_size = full local dataset size
        client_loader = DataLoader(
            client_subset, batch_size=len(client_indices), shuffle=False
        )
        
        fedsgd_loaders.append(client_loader)
        client_sizes.append(len(client_indices))

    print(f"FedSGD loaders: {num_clients} clients.")
    print(f"Client sizes: {client_sizes}")

    # Calculate client weights (N_i / N) for aggregation
    total_samples = sum(client_sizes)
    client_weights = [size / total_samples for size in client_sizes]

    # 3. Test Loader
    test_loader = DataLoader(test_dataset, batch_size=128, shuffle=False)
    
    return centralized_loader, fedsgd_loaders, test_loader, client_weights

In [4]:
def train_centralized(model, data_loader, test_loader, lr, num_rounds):
    """
    Trains a model using centralized, full-batch Gradient Descent.
    """
    model.to(DEVICE)
    optimizer = optim.SGD(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()
    history = {'test_acc': [], 'test_loss': [], 'weights': []}
    
    # Get the single, full batch of data
    full_data, full_target = next(iter(data_loader))
    full_data, full_target = full_data.to(DEVICE), full_target.to(DEVICE)

    print("\n--- Starting Centralized Training ---")
    for round_idx in range(num_rounds):
        model.train()
        
        # Perform one full-batch SGD step
        optimizer.zero_grad()
        output = model(full_data)
        loss = criterion(output, full_target)
        loss.backward()
        optimizer.step()
        
        # Evaluate
        test_acc, test_loss = evaluate_model(model, test_loader, DEVICE)
        history['test_acc'].append(test_acc)
        history['test_loss'].append(test_loss)
        history['weights'].append(copy.deepcopy(model.state_dict()))
        
        print(f"Round {round_idx+1}/{num_rounds} | Test Loss: {test_loss:.4f} | Test Acc: {test_acc:.2f}%")
        
    print("--- Centralized Training Complete ---")
    return history

In [5]:
def client_update_task1(model, data_loader, lr):
    """
    Performs K=1 local step of full-batch GD.
    """
    model.to(DEVICE)
    model.train()
    optimizer = optim.SGD(model.parameters(), lr=lr)
    criterion = nn.CrossEntropyLoss()

    # The data loader was created to have only ONE batch
    try:
        data, target = next(iter(data_loader))
    except StopIteration:
        print("Error: Client data loader is empty.")
        return model.state_dict()
        
    data, target = data.to(DEVICE), target.to(DEVICE)

    # Perform one full-batch SGD step
    optimizer.zero_grad()
    output = model(data)
    loss = criterion(output, target)
    loss.backward()
    optimizer.step()
    
    return model.state_dict()

def train_fedsgd(model, client_loaders, test_loader, client_weights, lr, num_rounds):
    """
    Trains a model using FedSGD.
    - K=1 full-batch step per client
    - Full client participation
    """
    model.to(DEVICE)
    history = {'test_acc': [], 'test_loss': [], 'weights': []}
    
    print("\n--- Starting FedSGD Training ---")
    for round_idx in range(num_rounds):
        client_models_params = []
        global_params = model.state_dict()
        
        # All clients participate in each round
        for client_idx in range(len(client_loaders)):
            local_model = SimpleCNN() # Use the same imported model class
            local_model.load_state_dict(copy.deepcopy(global_params))
            
            # Perform the single local step
            updated_params = client_update_task1(
                local_model,
                client_loaders[client_idx],
                lr=lr
            )
            client_models_params.append(updated_params)
            
        # Aggregate the *model parameters*
        # Note: Aggregating 1-step models is equivalent to averaging 1-step gradients
        aggregated_params = aggregate_models(client_models_params, client_weights)
        model.load_state_dict(aggregated_params)
        
        # Evaluate
        test_acc, test_loss = evaluate_model(model, test_loader, DEVICE)
        history['test_acc'].append(test_acc)
        history['test_loss'].append(test_loss)
        history['weights'].append(copy.deepcopy(model.state_dict()))
        
        print(f"Round {round_idx+1}/{num_rounds} | Test Loss: {test_loss:.4f} | Test Acc: {test_acc:.2f}%")

    print("--- FedSGD Training Complete ---")
    return history

In [6]:
# --- Main Comparison Logic ---

# 1. Load Data
central_loader, fedsgd_loaders, test_loader, client_weights = load_data_task1(NUM_CLIENTS)

# 2. Initialize Models
# Must start with the *exact* same weights
model_fedsgd = SimpleCNN()
model_centralized = SimpleCNN()
model_centralized.load_state_dict(copy.deepcopy(model_fedsgd.state_dict()))

# 3. Run Training
history_fedsgd = train_fedsgd(
    model_fedsgd, fedsgd_loaders, test_loader, client_weights, LEARNING_RATE, NUM_ROUNDS
)
history_centralized = train_centralized(
    model_centralized, central_loader, test_loader, LEARNING_RATE, NUM_ROUNDS
)

# 4. Analyze and Verify Equivalence
print("\n--- Verification of Equivalence ---")
print("Round | FedSGD Acc (%) | Central Acc (%) | Norm of Weight Difference")
print("----------------------------------------------------------------------")

weight_diffs = []
rounds = range(1, NUM_ROUNDS + 1)

for i in range(NUM_ROUNDS):
    acc_f = history_fedsgd['test_acc'][i]
    acc_c = history_centralized['test_acc'][i]
    
    # Get weights from this round
    params_f = history_fedsgd['weights'][i]
    params_c = history_centralized['weights'][i]
    
    # Convert to vectors for comparison
    vec_f = nn.utils.parameters_to_vector([p for p in params_f.values()])
    vec_c = nn.utils.parameters_to_vector([p for p in params_c.values()])
    
    # Calculate L2 norm of the difference
    diff = torch.norm(vec_f - vec_c, p=2).item()
    weight_diffs.append(diff)
    
    print(f"  {i+1:2d}  |    {acc_f:6.2f}    |    {acc_c:6.2f}     | {diff:e}")
    
# 5. Plot Results
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 10))

# Plot 1: Test Accuracy Comparison
ax1.plot(rounds, history_fedsgd['test_acc'], 'bo-', label='FedSGD', markersize=4)
ax1.plot(rounds, history_centralized['test_acc'], 'rs--', label='Centralized SGD', markersize=4)
ax1.set_title(f'Task 1: FedSGD vs. Centralized SGD (IID, Full-Batch, {NUM_CLIENTS} Clients)')
ax1.set_xlabel('Communication Round / SGD Step')
ax1.set_ylabel('Test Accuracy (%)')
ax1.legend()
ax1.grid(True, linestyle='--', alpha=0.6)

# Plot 2: Norm of Weight Difference
ax2.plot(rounds, weight_diffs, 'g-o', label='L2 Norm( || W_fedsgd - W_central || )', markersize=4)
ax2.set_title('Difference Between Model Weights')
ax2.set_xlabel('Communication Round / SGD Step')
ax2.set_ylabel('L2 Norm')
ax2.set_yscale('log') # Use log scale to see small differences
ax2.legend()
ax2.grid(True, linestyle='--', alpha=0.6)

plt.tight_layout()
plt.savefig('task1_comparison.png', dpi=300)
print("\nSaved comparison plot to 'task1_comparison.png'")
plt.show()

Centralized loader: 1 batch of 50000 samples.
FedSGD loaders: 6 clients.
Client sizes: [8333, 8333, 8333, 8333, 8333, 8335]

--- Starting FedSGD Training ---
Round 1/20 | Test Loss: 2.3032 | Test Acc: 8.82%
Round 2/20 | Test Loss: 2.3012 | Test Acc: 9.19%
Round 3/20 | Test Loss: 2.2992 | Test Acc: 9.68%
Round 4/20 | Test Loss: 2.2973 | Test Acc: 10.12%
Round 5/20 | Test Loss: 2.2954 | Test Acc: 10.68%
Round 6/20 | Test Loss: 2.2936 | Test Acc: 11.36%
Round 7/20 | Test Loss: 2.2918 | Test Acc: 11.83%
Round 8/20 | Test Loss: 2.2900 | Test Acc: 12.59%
Round 9/20 | Test Loss: 2.2883 | Test Acc: 13.05%
Round 10/20 | Test Loss: 2.2866 | Test Acc: 13.49%
Round 11/20 | Test Loss: 2.2849 | Test Acc: 13.85%
Round 12/20 | Test Loss: 2.2832 | Test Acc: 14.32%
Round 13/20 | Test Loss: 2.2815 | Test Acc: 14.63%
Round 14/20 | Test Loss: 2.2798 | Test Acc: 14.97%
Round 15/20 | Test Loss: 2.2781 | Test Acc: 15.25%
Round 16/20 | Test Loss: 2.2765 | Test Acc: 15.54%
Round 17/20 | Test Loss: 2.2748 | Test

OutOfMemoryError: CUDA out of memory. Tried to allocate 6.10 GiB. GPU 0 has a total capacity of 8.00 GiB of which 0 bytes is free. Of the allocated memory 6.87 GiB is allocated by PyTorch, and 1.65 GiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)