# Experiment 3: Additional Labels Experiments
This notebook contains templates for additional labels experiments with different configurations:
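- Tasks: Binary Classification and Image Classification (no regression for label experiments); note that the configuration cell below currently registers only Image Classification in `task_types`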
- Network Types: Centralised and Decentralised
- Network Size: 10 (fixed)
- Partitioner: `LabelDirichletPartitioner` with Dirichlet alpha = 10 (fixed)
- Additional Labels: 1, 3, 5

In [1]:
import sys
# Add the directory two levels above the notebook to the import path before the
# project import below (presumably where the `moxi` package lives — confirm).
sys.path.append("../..")
import torch
import numpy as np
import torch.nn as nn
from torch.optim import SGD
import asyncio
import nest_asyncio
from flwr_datasets.partitioner import DirichletPartitioner, Partitioner
from moxi import create_experiment

# Initialize asyncio for Jupyter: nest_asyncio patches asyncio so event loops can be
# nested inside the loop the notebook kernel is already running.
nest_asyncio.apply()



## Model Definitions
Define the models for classification tasks (binary and image)

In [2]:
# Binary Classification Model
class BinaryClassifierModel(nn.Module):
    """Two-layer perceptron for binary classification.

    Maps 30 input features to 64 hidden units (ReLU) and then to a single
    output value. No activation is applied to the output.
    """

    def __init__(self):
        super().__init__()
        # Attribute names and creation order are kept as-is: they determine the
        # state_dict keys and the order in which parameters are initialised.
        self.layer1 = nn.Linear(in_features=30, out_features=64)
        self.act_func = nn.ReLU()
        self.layer2 = nn.Linear(in_features=64, out_features=1)

    def forward(self, x):
        """Return the un-activated output of the second linear layer."""
        hidden = self.layer1(x)
        hidden = self.act_func(hidden)
        return self.layer2(hidden)

# Image Classification Model
class CNNModel(nn.Module):
    """Small CNN for 10-class image classification.

    Three 3x3 convolutions (3 -> 32 -> 64 -> 128 channels), each followed by
    2x2 max-pooling and ReLU, then two fully connected layers producing 10
    un-normalised scores. The flatten step expects a 128 x 4 x 4 feature map,
    which is what 32 x 32 inputs yield after three poolings.
    """

    def __init__(self):
        super().__init__()
        # Creation order and attribute names are preserved so that state_dict
        # keys and seeded initialisation match the previous definition.
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(128 * 4 * 4, 512)
        self.fc2 = nn.Linear(512, 10)
        self.maxpool = nn.MaxPool2d(2, 2)
        self.activation = nn.ReLU()

    def forward(self, x):
        """Run the conv stack, flatten, and return the 10 class scores."""
        # Each stage is conv -> max-pool -> ReLU (pooling happens before the activation).
        for conv in (self.conv1, self.conv2, self.conv3):
            x = self.activation(self.maxpool(conv(x)))
        flat = x.view(-1, 128 * 4 * 4)
        return self.fc2(self.activation(self.fc1(flat)))

## Experiment Runner Function

In [3]:
async def execute_experiment(flnw, num_rounds: int, epochs_per_round: int, experiment_name: str):
    """
    Train a federated learning network and report when it has finished.

    Args:
        flnw: Federated learning network object exposing an awaitable ``train`` method
        num_rounds: Number of federated learning rounds
        epochs_per_round: Number of local epochs per round
        experiment_name: Name for the experiment (also used in the completion message)
    """
    training_kwargs = {
        "num_rounds": num_rounds,
        "epochs_per_round": epochs_per_round,
        "experiment_name": experiment_name,
    }
    await flnw.train(**training_kwargs)
    print(f"Experiment '{experiment_name}' completed.")

In [4]:
## Custom Partitioner

In [5]:
from datasets import load_dataset

from datasets import load_dataset, Dataset
import numpy as np
from flwr_datasets.partitioner import Partitioner

class LabelDirichletPartitioner(Partitioner):
    """Keep only the requested labels, then spread each class over clients via Dirichlet draws.

    When a dataset is assigned, rows whose ``label_key`` value is not in
    ``train_labels`` are dropped (no filtering when ``train_labels`` is None),
    and the remaining rows of every class are divided among ``num_partitions``
    clients with proportions sampled from ``Dirichlet(alpha)``.

    Args:
        train_labels: Label values to keep, or None to keep every row.
        test_labels: Stored on the instance; not used by this class itself.
        alpha: Concentration parameter used for every client in the Dirichlet draw.
        seed: Seed of the NumPy generator that drives sampling and shuffling.
        num_partitions: Number of client partitions to create.
        label_key: Dataset column to partition by.
    """

    def __init__(
        self,
        train_labels=None,
        test_labels=None,
        alpha: float = 0.5,
        seed: int = 42,
        num_partitions: int = 1,
        label_key: str = "label",
    ):
        # The parent constructor must run first (it sets _dataset = None).
        super().__init__()

        self.train_labels, self.test_labels = train_labels, test_labels
        self.alpha, self.seed = alpha, seed
        self.label_key = label_key
        self._num_partitions = num_partitions
        self.rng = np.random.default_rng(seed)

        # Both are filled in by the dataset setter, once, when a dataset is assigned.
        self._filtered_dataset = None
        self._partition_indices = None

    @property
    def num_partitions(self) -> int:
        """Number of partitions this partitioner produces."""
        return self._num_partitions

    @property
    def dataset(self) -> Dataset:
        """The assigned dataset, as exposed by the parent Partitioner."""
        return super().dataset

    @dataset.setter
    def dataset(self, value: Dataset) -> None:
        """Assign the dataset, apply the label filter and precompute all partitions."""
        # Delegate to the parent's property setter so its validation still runs.
        super(LabelDirichletPartitioner, self.__class__).dataset.fset(self, value)

        if self.train_labels is None:
            kept = value
        else:
            kept = value.filter(
                lambda example: example[self.label_key] in self.train_labels
            )
        self._filtered_dataset = kept

        # Partition indices refer to rows of the filtered dataset, not the original one.
        kept_labels = np.array(kept[self.label_key])
        self._partition_indices = self._dirichlet_split(kept_labels, self._num_partitions)

    def load_partition(self, partition_id: int) -> Dataset:
        """Return the rows assigned to ``partition_id``.

        Raises:
            ValueError: If no dataset has been assigned, or ``partition_id`` is
                negative or not smaller than ``num_partitions``.
        """
        if not self.is_dataset_assigned():
            raise ValueError("Dataset must be assigned before loading partitions")
        if partition_id >= self._num_partitions:
            raise ValueError(f"partition_id {partition_id} >= num_partitions {self._num_partitions}")
        if partition_id < 0:
            raise ValueError(f"partition_id must be non-negative, got {partition_id}")

        rows = self._partition_indices[partition_id]
        # A client without samples gets an empty dataset with the same structure.
        return self._filtered_dataset.select(rows if len(rows) > 0 else [])

    def _dirichlet_split(self, labels, num_partitions):
        """Return one list of row indices per client, drawn class by class.

        For every distinct label a proportion vector is sampled, the rows of
        that class are shuffled, and consecutive slices are handed to clients.
        """
        buckets = [[] for _ in range(num_partitions)]
        if len(labels) == 0:
            return buckets

        for cls in np.unique(labels):
            members = np.where(labels == cls)[0]
            if len(members) == 0:
                continue

            # Sample the per-client share first, then shuffle: this order of
            # generator calls determines the resulting split for a given seed.
            shares = self.rng.dirichlet([self.alpha] * num_partitions)
            self.rng.shuffle(members)

            # Flooring can leave a few rows unassigned; hand them out one by one
            # starting from the first client.
            quota = (shares * len(members)).astype(int)
            leftover = len(members) - quota.sum()
            for extra in range(leftover):
                quota[extra % num_partitions] += 1

            cursor = 0
            for bucket, take in zip(buckets, quota):
                if take > 0:
                    bucket.extend(members[cursor:cursor + take].tolist())
                    cursor += take

        return buckets

## Experiment Configuration

In [6]:
# Experiment parameters
# Number of additional labels -> label ids kept for training (label 0 plus labels 1..n).
ADDITIONAL_LABEL_TRAIN = {
    1: [0, 1],
    3: [0, 1, 2, 3],
    5: [0, 1, 2, 3, 4, 5],
}
# Additional labels to test: the keys of the mapping above, i.e. [1, 3, 5]
additional_labels = list(ADDITIONAL_LABEL_TRAIN)
testing_labels = [1]
# Network architectures
network_types = ["centralised", "decentralised"]
# Only the image-classification task is registered here.
task_types = [
    dict(name="image_classification", model=CNNModel, criterion=nn.CrossEntropyLoss),
]

## Run Experiments
Choose a specific configuration to run by uncommenting and modifying the code below.

In [7]:
# Example: a single centralised run with 1 additional label, using task_types[0].
# Change these variables to run different experiments.
# The whole example is disabled by wrapping it in a string literal: nothing inside it
# executes, and the cell only echoes the string. Remove the surrounding triple quotes to run it.
# NOTE(review): the index hints inside the string ("0: binary_classification,
# 1: image_classification") do not match task_types as defined in this notebook, which
# contains a single image_classification entry.
# NOTE(review): partitioner_params below has no "num_partitions" (run_all_experiments passes 10);
# LabelDirichletPartitioner defaults to 1 — confirm whether create_experiment supplies it.
"""
task_type = task_types[0]  # 0: binary_classification, 1: image_classification
network_type = network_types[0]  # 0: centralised, 1: decentralised
additional_label = additional_labels[0]  # Choose from additional_labels list

# Create the configuration
config = {
    "network_name": f"{network_type}_{task_type['name']}_addlabels_{additional_label}",
    "network_type": network_type,
    "model_type": "parametric",
    "ml_framework": "pytorch",
    "comms": "async",
    "federated_rounds": 5,
    "network_size": 10,  # Fixed network size for Experiment 3
    "metrics": ["mean_perfomance", "convergence"],
    "logger": "mlflow",
    "node_base_config": {
        "model": None,
        "learning_rate": 1e-3,
        "optimizer": SGD,
        "train_data": None,
        "val_data": None,
        "criterion": task_type["criterion"],
        "random_sampling": False,
        "n_epochs": 5,
        "batch_size": 16
    },
    "experiment_config": {
        "task": task_type["name"],
        "partitioner": LabelDirichletPartitioner,
        "partitioner_params": { "train_labels": ADDITIONAL_LABEL_TRAIN[additional_label], "test_labels": testing_labels, "alpha":10 }, # Fixed alpha for Experiment 3
        "num_worker": 2,
        "max_samples": 1000,
        "additional_labels": additional_label  # Variable parameter
    },
    "adjcency_matrix": None,
    "model": None
}

# Create the model
model = task_type["model"]()

# Create the experiment
print(f"Running experiment: {network_type} {task_type['name']} with {additional_label} additional labels")
flnw = create_experiment(config, model)

# Run the experiment
await execute_experiment(
    flnw, 
    num_rounds=5,  # Set to desired number (20 for full experiment)
    epochs_per_round=1, 
    experiment_name=f"exp3_{network_type}_{task_type['name']}_addlabels_{additional_label}"
)

"""

'\ntask_type = task_types[0]  # 0: binary_classification, 1: image_classification\nnetwork_type = network_types[0]  # 0: centralised, 1: decentralised\nadditional_label = additional_labels[0]  # Choose from additional_labels list\n\n# Create the configuration\nconfig = {\n    "network_name": f"{network_type}_{task_type[\'name\']}_addlabels_{additional_label}",\n    "network_type": network_type,\n    "model_type": "parametric",\n    "ml_framework": "pytorch",\n    "comms": "async",\n    "federated_rounds": 5,\n    "network_size": 10,  # Fixed network size for Experiment 3\n    "metrics": ["mean_perfomance", "convergence"],\n    "logger": "mlflow",\n    "node_base_config": {\n        "model": None,\n        "learning_rate": 1e-3,\n        "optimizer": SGD,\n        "train_data": None,\n        "val_data": None,\n        "criterion": task_type["criterion"],\n        "random_sampling": False,\n        "n_epochs": 5,\n        "batch_size": 16\n    },\n    "experiment_config": {\n        "ta

In [8]:
# Custom experiment runner - run a chosen subset of the experiment grid
async def run_selected_experiments(selected_task_types=None, selected_network_types=None, selected_additional_labels=None):
    """
    Run experiments with specific parameter combinations.

    Every combination of the selected task types, network types and
    additional-label values is run in turn, using the same configuration
    (LabelDirichletPartitioner with label filtering) as ``run_all_experiments``.
    An error raised while creating or running one experiment is printed and
    the sweep continues with the next combination.

    Args:
        selected_task_types: Iterable of indices into ``task_types``
            (e.g., [0] for the first registered task). Defaults to all tasks.
        selected_network_types: Iterable of indices into ``network_types``
            (e.g., [0] for centralised only). Defaults to all network types.
        selected_additional_labels: Iterable of additional label values to run
            (e.g., [1, 3]); each must be a key of ``ADDITIONAL_LABEL_TRAIN``.
            Defaults to ``additional_labels``.

    Raises:
        ValueError: If a requested additional-label value has no entry in
            ``ADDITIONAL_LABEL_TRAIN``.
    """
    # Imported locally: the notebook-level `import time` lives in a later cell,
    # so relying on it made this function depend on cell execution order.
    import time

    # Materialise the selections so len() works for any iterable.
    if selected_task_types is None:
        selected_task_types = range(len(task_types))
    if selected_network_types is None:
        selected_network_types = range(len(network_types))
    if selected_additional_labels is None:
        selected_additional_labels = additional_labels
    selected_task_types = list(selected_task_types)
    selected_network_types = list(selected_network_types)
    selected_additional_labels = list(selected_additional_labels)

    # Fail before starting a long sweep if a label value cannot be mapped to training labels.
    unknown_labels = [label for label in selected_additional_labels if label not in ADDITIONAL_LABEL_TRAIN]
    if unknown_labels:
        raise ValueError(
            f"Unsupported additional label values {unknown_labels}; "
            f"expected values from {sorted(ADDITIONAL_LABEL_TRAIN)}"
        )

    network_size = 10  # Fixed for Experiment 3

    experiment_count = 0
    total_experiments = len(selected_task_types) * len(selected_network_types) * len(selected_additional_labels)
    start_time = time.time()

    print(f"Starting selected experiments: {total_experiments} total experiments to run")

    # Loop through selected task types
    for task_idx in selected_task_types:
        task_type = task_types[task_idx]

        # Loop through selected network types
        for net_idx in selected_network_types:
            network_type = network_types[net_idx]

            print(f"\n{'='*80}")
            print(f"Running experiments for {network_type} {task_type['name']}")
            print(f"{'='*80}\n")

            # Loop through selected additional label values
            for additional_label in selected_additional_labels:
                experiment_count += 1

                print(f"\nExperiment {experiment_count}/{total_experiments}")
                print(f"Task: {task_type['name']}, Network: {network_type}, Additional Labels: {additional_label}")

                # Create configuration
                # NOTE(review): the spellings "mean_perfomance" and "adjcency_matrix" are kept
                # unchanged; presumably they match the keys create_experiment expects — confirm.
                config = {
                    "network_name": f"{network_type}_{task_type['name']}_addlabels_{additional_label}",
                    "network_type": network_type,
                    "model_type": "parametric",
                    "ml_framework": "pytorch",
                    "comms": "async",
                    "federated_rounds": 5,
                    "network_size": network_size,
                    "metrics": ["mean_perfomance", "convergence"],
                    "logger": "mlflow",
                    "node_base_config": {
                        "model": None,
                        "learning_rate": 1e-3,
                        "optimizer": SGD,
                        "train_data": None,
                        "val_data": None,
                        "criterion": task_type["criterion"],
                        "random_sampling": False,
                        "n_epochs": 5,
                        "batch_size": 16
                    },
                    "experiment_config": {
                        "task": task_type["name"],
                        # Same partitioner setup as run_all_experiments, so that runs logged under
                        # the same experiment name use the same label filtering.
                        "partitioner": LabelDirichletPartitioner,
                        "partitioner_params": {
                            "train_labels": ADDITIONAL_LABEL_TRAIN[additional_label],
                            "test_labels": testing_labels,
                            "alpha": 10,  # Fixed alpha for Experiment 3
                            # 10, the value run_all_experiments passes; presumably one
                            # partition per node — confirm.
                            "num_partitions": network_size,
                        },
                        "num_worker": 2,
                        "max_samples": 1000,
                        "additional_labels": additional_label  # Variable parameter
                    },
                    "adjcency_matrix": None,
                    "model": None
                }

                try:
                    # Create model
                    model = task_type["model"]()

                    # Create experiment
                    print(f"Creating experiment: {network_type} {task_type['name']} with {additional_label} additional labels")
                    flnw = create_experiment(config, model)

                    # Execute experiment
                    print("Running experiment...")
                    await execute_experiment(
                        flnw,
                        num_rounds=5,  # Set to 20 for full experiment, 5 for testing
                        epochs_per_round=1,
                        experiment_name=f"exp3_{network_type}_{task_type['name']}_addlabels_{additional_label}"
                    )

                except Exception as e:
                    # Deliberately best-effort: report the failure and move on to the next combination.
                    print(f"Error in experiment {experiment_count}: {e}")

                # Wait briefly between experiments
                await asyncio.sleep(2)

    total_time = time.time() - start_time
    print(f"\n{'='*80}")
    print("Selected experiments completed!")
    print(f"Total time: {total_time:.1f} seconds")
    print(f"{'='*80}\n")

# Example usage:
# Run only the first task (task_types[0], which is image_classification in this notebook)
# with both network types but only additional labels 1 and 3
# await run_selected_experiments(
#     selected_task_types=[0],  # 0: image_classification (the only entry in task_types)
#     selected_network_types=[0, 1],  # Both centralised and decentralised
#     selected_additional_labels=[1, 3]  # Only fewer additional labels
# )

# Run all tasks but only for centralised networks with 1 additional label
# await run_selected_experiments(
#     selected_network_types=[0],  # 0: centralised
#     selected_additional_labels=[1]  # Only 1 additional label
# )

## Automated Experiment Loop
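This cell defines `run_all_experiments`, which runs every combination of task type, network type, and additional-label count.
The following cell executes it; edit the configuration lists above to change which experiments are included in the batch.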

In [9]:
# Run all experiments for all task types and network types
import asyncio
import time
import datetime

async def run_all_experiments():
    """Run every additional-labels experiment: each task type x network type x label count.

    Progress, elapsed time and an estimated completion time are printed after
    each successful experiment. An error raised while creating or running one
    experiment is printed and the sweep moves on to the next combination.
    """

    def build_config(task_type, network_type, additional_label):
        """Assemble the experiment configuration dict for one combination."""
        node_base_config = {
            "model": None,
            "learning_rate": 1e-3,
            "optimizer": SGD,
            "train_data": None,
            "val_data": None,
            "criterion": task_type["criterion"],
            "random_sampling": False,
            "n_epochs": 5,
            "batch_size": 16,
        }
        experiment_config = {
            "task": task_type["name"],
            "partitioner": LabelDirichletPartitioner,
            # Fixed alpha for Experiment 3
            "partitioner_params": {
                "train_labels": ADDITIONAL_LABEL_TRAIN[additional_label],
                "test_labels": testing_labels,
                "alpha": 10,
                "num_partitions": 10,
            },
            "num_worker": 2,
            "max_samples": 1000,
            "additional_labels": additional_label,  # Variable parameter
        }
        return {
            "network_name": f"{network_type}_{task_type['name']}_addlabels_{additional_label}",
            "network_type": network_type,
            "model_type": "parametric",
            "ml_framework": "pytorch",
            "comms": "async",
            "federated_rounds": 5,
            "network_size": 10,  # Fixed for Experiment 3
            "metrics": ["mean_perfomance", "convergence"],
            "logger": "mlflow",
            "node_base_config": node_base_config,
            "experiment_config": experiment_config,
            "adjcency_matrix": None,
            "model": None,
        }

    total_experiments = len(task_types) * len(network_types) * len(additional_labels)
    experiment_count = 0
    start_time = time.time()

    print(f"Starting all experiments: {total_experiments} total experiments to run")
    print(f"Start time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

    for task_type in task_types:
        for network_type in network_types:
            # One banner per (task, network) pair, before its label sweep.
            print(f"\n{'='*80}")
            print(f"Running experiments for {network_type} {task_type['name']}")
            print(f"{'='*80}\n")

            for additional_label in additional_labels:
                experiment_count += 1

                print(f"\nExperiment {experiment_count}/{total_experiments}")
                print(f"Task: {task_type['name']}, Network: {network_type}, Additional Labels: {additional_label}")

                # Built outside the try block, so a configuration error is not swallowed.
                config = build_config(task_type, network_type, additional_label)

                try:
                    model = task_type["model"]()

                    print(f"Creating experiment: {network_type} {task_type['name']} with {additional_label} additional labels")
                    flnw = create_experiment(config, model)

                    print(f"Running experiment...")
                    await execute_experiment(
                        flnw,
                        num_rounds=5,  # Set to 20 for full experiment, 5 for testing
                        epochs_per_round=1,
                        experiment_name=f"exp3_{network_type}_{task_type['name']}_addlabels_{additional_label}",
                    )

                    # Extrapolate the remaining time from the average duration so far.
                    elapsed = time.time() - start_time
                    remaining = (elapsed / experiment_count) * (total_experiments - experiment_count)

                    print(f"\nCompleted experiment {experiment_count}/{total_experiments}")
                    print(f"Elapsed time: {elapsed:.1f} seconds")
                    print(f"Estimated time remaining: {remaining:.1f} seconds")
                    print(f"Estimated completion time: {datetime.datetime.now() + datetime.timedelta(seconds=remaining)}")

                except Exception as e:
                    print(f"Error in experiment {experiment_count}: {e}")

                # Wait briefly between experiments
                await asyncio.sleep(2)

    # Print completion summary
    total_time = time.time() - start_time
    print(f"\n{'='*80}")
    print(f"All experiments completed!")
    print(f"Total time: {total_time:.1f} seconds")
    print(f"End time: {datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'='*80}\n")



In [10]:
# Launch the full sweep defined by run_all_experiments
# (every task type x network type x additional-label value).
await run_all_experiments()

Starting all experiments: 6 total experiments to run
Start time: 2025-09-09 09:41:58

Running experiments for centralised image_classification


Experiment 1/6
Task: image_classification, Network: centralised, Additional Labels: 1
Creating experiment: centralised image_classification with 1 additional labels
Loading federated CIFAR-10 dataset: uoft-cs/cifar10


2025/09/09 09:42:01 INFO mlflow.tracking.fluent: Experiment with name 'exp3_centralised_image_classification_addlabels_1' does not exist. Creating a new experiment.


Running experiment...
Training Complete!
Experiment 'exp3_centralised_image_classification_addlabels_1' completed.

Completed experiment 1/6
Elapsed time: 99.7 seconds
Estimated time remaining: 498.7 seconds
Estimated completion time: 2025-09-09 09:51:56.559776

Experiment 2/6
Task: image_classification, Network: centralised, Additional Labels: 3
Creating experiment: centralised image_classification with 3 additional labels
Loading federated CIFAR-10 dataset: uoft-cs/cifar10


Filter:   0%|          | 0/50000 [00:00<?, ? examples/s]

2025/09/09 09:43:48 INFO mlflow.tracking.fluent: Experiment with name 'exp3_centralised_image_classification_addlabels_3' does not exist. Creating a new experiment.


Running experiment...
Training Complete!
Experiment 'exp3_centralised_image_classification_addlabels_3' completed.

Completed experiment 2/6
Elapsed time: 212.8 seconds
Estimated time remaining: 425.5 seconds
Estimated completion time: 2025-09-09 09:52:36.468043

Experiment 3/6
Task: image_classification, Network: centralised, Additional Labels: 5
Creating experiment: centralised image_classification with 5 additional labels
Loading federated CIFAR-10 dataset: uoft-cs/cifar10


Filter:   0%|          | 0/50000 [00:00<?, ? examples/s]

2025/09/09 09:45:41 INFO mlflow.tracking.fluent: Experiment with name 'exp3_centralised_image_classification_addlabels_5' does not exist. Creating a new experiment.


Running experiment...
Training Complete!
Experiment 'exp3_centralised_image_classification_addlabels_5' completed.

Completed experiment 3/6
Elapsed time: 326.6 seconds
Estimated time remaining: 326.6 seconds
Estimated completion time: 2025-09-09 09:52:51.395277

Running experiments for decentralised image_classification


Experiment 4/6
Task: image_classification, Network: decentralised, Additional Labels: 1
Creating experiment: decentralised image_classification with 1 additional labels
Loading federated CIFAR-10 dataset: uoft-cs/cifar10


2025/09/09 09:47:28 INFO mlflow.tracking.fluent: Experiment with name 'exp3_decentralised_image_classification_addlabels_1' does not exist. Creating a new experiment.


Running experiment...
Training Complete!
Experiment 'exp3_decentralised_image_classification_addlabels_1' completed.

Completed experiment 4/6
Elapsed time: 428.3 seconds
Estimated time remaining: 214.2 seconds
Estimated completion time: 2025-09-09 09:52:40.666476

Experiment 5/6
Task: image_classification, Network: decentralised, Additional Labels: 3
Creating experiment: decentralised image_classification with 3 additional labels
Loading federated CIFAR-10 dataset: uoft-cs/cifar10


2025/09/09 09:49:10 INFO mlflow.tracking.fluent: Experiment with name 'exp3_decentralised_image_classification_addlabels_3' does not exist. Creating a new experiment.


Running experiment...
Training Complete!
Experiment 'exp3_decentralised_image_classification_addlabels_3' completed.

Completed experiment 5/6
Elapsed time: 535.4 seconds
Estimated time remaining: 107.1 seconds
Estimated completion time: 2025-09-09 09:52:40.642968

Experiment 6/6
Task: image_classification, Network: decentralised, Additional Labels: 5
Creating experiment: decentralised image_classification with 5 additional labels
Loading federated CIFAR-10 dataset: uoft-cs/cifar10


2025/09/09 09:50:57 INFO mlflow.tracking.fluent: Experiment with name 'exp3_decentralised_image_classification_addlabels_5' does not exist. Creating a new experiment.


Running experiment...
Training Complete!
Experiment 'exp3_decentralised_image_classification_addlabels_5' completed.

Completed experiment 6/6
Elapsed time: 641.3 seconds
Estimated time remaining: 0.0 seconds
Estimated completion time: 2025-09-09 09:52:39.443065

All experiments completed!
Total time: 643.3 seconds
End time: 2025-09-09 09:52:41

