In [1]:
import threading
import time
import random

# Fast clients stop claiming epochs, and slow clients stop looping, at this count.
MAX_GLOBAL_EPOCHS: int = 10
# Next global epoch to hand out; read and advanced by the client threads.
global_epoch: int = 0
# Mutex held while global_epoch is inspected or advanced.
lock = threading.Lock()

# Server-side stand-in: aggregation is simulated by logging the received update
def aggregate_updates(client_id, global_round, update_value):
    """Print one "[Server] Received update ..." line for a client update.

    Args:
        client_id: Identifier of the client that produced the update.
        global_round: Epoch number the client reports the update for.
        update_value: The simulated update payload.

    Returns:
        None. Nothing is stored; the update is only written to stdout.
    """
    message = f"[Server] Received update {update_value} from Client {client_id} after Global Epoch {global_round}"
    print(message)

# Fast client worker
def fast_clients(client_id):
    """Claim global epochs one at a time and train on each until none remain.

    Each iteration reserves the current value of ``global_epoch`` under
    ``lock`` (advancing the counter by one), sleeps for one second to
    simulate training, draws a random update in [1, 100] and hands it to
    ``aggregate_updates``. The function returns once ``global_epoch`` has
    reached ``MAX_GLOBAL_EPOCHS``.

    Args:
        client_id: Identifier used in log lines and passed to the server.
    """

    def claim_next_epoch():
        """Reserve the next global epoch under the lock; None when exhausted."""
        global global_epoch
        with lock:
            if global_epoch >= MAX_GLOBAL_EPOCHS:
                return None
            claimed = global_epoch
            global_epoch = claimed + 1
            return claimed

    # iter(callable, sentinel) keeps calling until the helper reports exhaustion.
    for current_epoch in iter(claim_next_epoch, None):
        print(f"\n--- Global Epoch {current_epoch} ---", flush=True)
        print(f"[Fast Client {client_id}] Global Epoch {current_epoch} started.", flush=True)

        # Simulated (short) training, followed by a simulated model update.
        time.sleep(1)
        update_value = random.randint(1, 100)
        print(f"[Fast Client {client_id}] Global Epoch {current_epoch} finished. Sending update {update_value}", flush=True)
        aggregate_updates(client_id, current_epoch, update_value)

# Slow client function
def slow_clients(client_id):
    """Run one slow client for ``MAX_GLOBAL_EPOCHS`` rounds, trailing the fast clients.

    The client keeps its own ``slow_epoch`` counter. A round only starts once
    ``global_epoch`` has moved past ``slow_epoch``; until then the client
    polls every 0.5 s. Each round runs three local epochs of 2 s each, then
    sends a random update in [1, 100] to ``aggregate_updates``.

    Args:
        client_id: Identifier used in log lines and passed to the server.
    """
    slow_epoch = 0

    while slow_epoch < MAX_GLOBAL_EPOCHS:
        # Only the read of the shared counter needs the lock. The previous
        # version called lock.release() inside `with lock:` and then hit
        # `continue`, so the context manager released the lock a second time
        # (RuntimeError, or releasing a lock another thread had just taken).
        with lock:
            epoch_available = slow_epoch < global_epoch

        if not epoch_available:
            # Sleep outside the critical section so fast clients are never
            # blocked while this client is merely waiting.
            time.sleep(0.5)  # Avoid busy waiting
            continue  # Wait until a new global epoch starts

        print(f"[Slow Client {client_id}] Global Epoch {slow_epoch} started.", flush=True)

        for local_epoch in range(3):  # Simulating 3 local epochs
            print(f"   [Slow Client {client_id}] Local Epoch {local_epoch} running...", flush=True)
            time.sleep(2)  # Simulating longer local training time

        update_value = random.randint(1, 100)  # Simulate a model update
        print(f"[Slow Client {client_id}] Global Epoch {slow_epoch} finished after local training. Sending update {update_value}", flush=True)
        aggregate_updates(client_id, slow_epoch, update_value)
        slow_epoch += 1  # Move to the next slow epoch

# Creating threads for clients
# threads = []
# num_fast_clients = 3  # Reduced to prevent excessive threading
# num_slow_clients = 2  # Reduced to prevent excessive threading

# for i in range(num_fast_clients):
#     t = threading.Thread(target=fast_clients, args=(i,))
#     threads.append(t)
#     t.start()

# for i in range(num_slow_clients):
#     t = threading.Thread(target=slow_clients, args=(i + num_fast_clients,))
#     threads.append(t)
#     t.start()

# # Wait for all threads to complete
# for t in threads:
#     t.join()


# Thread counts per client kind, read by create_and_start_threads()
num_slow_clients: int = 2
num_fast_clients: int = 3

def create_and_start_threads():
    """Launch one thread per client, alternating fast and slow, then join them all.

    Fast clients get ids ``0 .. num_fast_clients - 1`` and threads named
    ``FastClient-<i>``. Slow clients get ids offset by ``num_fast_clients``
    and threads named ``SlowClient-<i>``. The call returns only after every
    launched thread has finished.
    """
    launched = []

    def launch(worker, worker_id, label):
        """Create a named thread for `worker(worker_id)`, record it and start it."""
        thread = threading.Thread(target=worker, args=(worker_id,), name=label)
        launched.append(thread)
        thread.start()

    # Interleave start-up: fast client i first, then slow client i.
    for slot in range(max(num_fast_clients, num_slow_clients)):
        if slot < num_fast_clients:
            launch(fast_clients, slot, f"FastClient-{slot}")

        if slot < num_slow_clients:
            launch(slow_clients, slot + num_fast_clients, f"SlowClient-{slot}")

    # Block until every client thread is done.
    for thread in launched:
        thread.join()

# Run the simulation: this call starts every client thread and blocks until
# all of them have been joined.
create_and_start_threads()


# Printed only after create_and_start_threads() has returned.
print("Training completed.")



--- Global Epoch 0 ---
[Fast Client 0] Global Epoch 0 started.[Slow Client 3] Global Epoch 0 started.

   [Slow Client 3] Local Epoch 0 running...

--- Global Epoch 1 ---
[Fast Client 1] Global Epoch 1 started.
[Slow Client 4] Global Epoch 0 started.

--- Global Epoch 2 ---
[Fast Client 2] Global Epoch 2 started.
   [Slow Client 4] Local Epoch 0 running...
[Fast Client 1] Global Epoch 1 finished. Sending update 51[Fast Client 2] Global Epoch 2 finished. Sending update 69
[Fast Client 0] Global Epoch 0 finished. Sending update 76

[Server] Received update 76 from Client 0 after Global Epoch 0

--- Global Epoch 3 ---
[Server] Received update 51 from Client 1 after Global Epoch 1

--- Global Epoch 4 ---
[Fast Client 1] Global Epoch 4 started.
[Server] Received update 69 from Client 2 after Global Epoch 2

--- Global Epoch 5 ---
[Fast Client 0] Global Epoch 3 started.
[Fast Client 2] Global Epoch 5 started.
[Fast Client 0] Global Epoch 3 finished. Sending update 49
   [Slow Client 3] Loca

In [6]:
# Client roster: ids 1..10, four of which are designated slow
client_ids_all = list(range(1, 11))
slow_client_ids = [3, 6, 8, 9]
# Every id that is not marked slow is fast
fast_client_ids = [cid for cid in client_ids_all if cid not in slow_client_ids]

def train_fast_clients(client_ids):
    """Run ``fast_clients`` for each id in turn, in the calling thread.

    Args:
        client_ids: Iterable of client identifiers; processed in order, each
            call finishing before the next one starts.
    """
    for fast_id in client_ids:
        fast_clients(client_id=fast_id)

def train_slow_clients(client_ids):
    """Run ``slow_clients`` for each id in turn, in the calling thread.

    Args:
        client_ids: Iterable of client identifiers; processed in order, each
            call finishing before the next one starts.
    """
    for slow_id in client_ids:
        slow_clients(client_id=slow_id)

# Run the fast group and the slow group concurrently, one thread per group.
t_fast = threading.Thread(target=train_fast_clients, args=(fast_client_ids,))
t_slow = threading.Thread(target=train_slow_clients, args=(slow_client_ids,))

# Build the list here instead of appending to a pre-existing `threads`: no
# module-level `threads` is defined in the visible cells (the earlier one is
# local to create_and_start_threads), so `threads.append(...)` raised
# NameError on a fresh kernel.
threads = [t_fast, t_slow]

# NOTE(review): global_epoch is not reset in this cell. If an earlier cell has
# already advanced it to MAX_GLOBAL_EPOCHS, the fast clients return
# immediately. Confirm whether a reset is intended here.
t_fast.start()
t_slow.start()

# Wait for all threads to complete
for t in threads:
    t.join()

[Slow Client 3] Global Epoch 0 started.
   [Slow Client 3] Local Epoch 0 running...
   [Slow Client 3] Local Epoch 1 running...
   [Slow Client 3] Local Epoch 2 running...
[Slow Client 3] Global Epoch 0 finished after local training. Sending update 50
[Server] Received update 50 from Client 3 after Global Epoch 0
[Slow Client 3] Global Epoch 1 started.
   [Slow Client 3] Local Epoch 0 running...
   [Slow Client 3] Local Epoch 1 running...
   [Slow Client 3] Local Epoch 2 running...
[Slow Client 3] Global Epoch 1 finished after local training. Sending update 64
[Server] Received update 64 from Client 3 after Global Epoch 1
[Slow Client 3] Global Epoch 2 started.
   [Slow Client 3] Local Epoch 0 running...
   [Slow Client 3] Local Epoch 1 running...
   [Slow Client 3] Local Epoch 2 running...
[Slow Client 3] Global Epoch 2 finished after local training. Sending update 55
[Server] Received update 55 from Client 3 after Global Epoch 2
[Slow Client 3] Global Epoch 3 started.
   [Slow Client

In [2]:
import threading
import time
import random

# Simulation size
MAX_GLOBAL_EPOCHS: int = 5
num_fast_clients: int = 3
num_slow_clients: int = 2

# State shared between threads: the next global epoch and the mutex held
# while it is advanced
global_epoch: int = 0
lock = threading.Lock()

# Server-side stand-in: aggregation is simulated by logging the received update
def aggregate_updates(client_id, global_round, update_value):
    """Print one "[Server] Received update ..." line for a client update.

    Args:
        client_id: Identifier of the client that produced the update.
        global_round: Epoch number the client reports the update for.
        update_value: The simulated update payload.

    Returns:
        None. Nothing is stored; the update is only written to stdout.
    """
    message = f"[Server] Received update {update_value} from Client {client_id} after Global Epoch {global_round}"
    print(message)

# Fast clients worker (a single thread drives every fast client)
def fast_clients_group():
    """Advance the global epoch and train all fast clients, one after another.

    While ``global_epoch`` is below ``MAX_GLOBAL_EPOCHS`` the function claims
    the current epoch under ``lock`` (incrementing the shared counter),
    prints the epoch banner, and runs clients ``0 .. num_fast_clients - 1``
    sequentially. Each client sleeps one second, draws a random update in
    [1, 100] and passes it to ``aggregate_updates``.
    """
    global global_epoch

    def run_fast_client(client_id, current_epoch):
        """Simulate one fast client's training round and report its update."""
        print(f"[Fast Client {client_id}] Global Epoch {current_epoch} started.", flush=True)
        time.sleep(1)
        update_value = random.randint(1, 100)
        print(f"[Fast Client {client_id}] Global Epoch {current_epoch} finished. Sending update {update_value}", flush=True)
        aggregate_updates(client_id, current_epoch, update_value)

    while global_epoch < MAX_GLOBAL_EPOCHS:
        # Claim this epoch and publish the next one in a single critical section.
        with lock:
            current_epoch = global_epoch
            global_epoch = current_epoch + 1

        print(f"\n--- Global Epoch {current_epoch} ---", flush=True)
        for fast_id in range(num_fast_clients):
            run_fast_client(fast_id, current_epoch)

# Slow clients worker (a single thread drives every slow client)
def slow_clients_group():
    """Train all slow clients sequentially for ``MAX_GLOBAL_EPOCHS`` rounds.

    Every round prints a group banner and then, for each client
    ``0 .. num_slow_clients - 1``, runs three local epochs of two seconds
    each, draws a random update in [1, 100] and passes it to
    ``aggregate_updates`` together with the round number.
    """

    def train_locally(client_id, slow_epoch):
        """Simulate one slow client's local training and report its update."""
        print(f"   [Slow Client {client_id}] Starting Local Training for Global Epoch {slow_epoch}", flush=True)

        for local_epoch in range(3):
            print(f"   [Slow Client {client_id}] Local Epoch {local_epoch} running...", flush=True)
            time.sleep(2)

        update_value = random.randint(1, 100)
        print(f"   [Slow Client {client_id}] Finished local training. Sending update {update_value}", flush=True)
        aggregate_updates(client_id, slow_epoch, update_value)

    for slow_epoch in range(MAX_GLOBAL_EPOCHS):
        print(f"[Slow Clients] Global Epoch {slow_epoch} started.", flush=True)

        for slow_id in range(num_slow_clients):
            train_locally(slow_id, slow_epoch)

# One worker thread per client group
fast_thread = threading.Thread(target=fast_clients_group, name="FastClientsThread")
slow_thread = threading.Thread(target=slow_clients_group, name="SlowClientsThread")

# Launch fast first, then slow
for _group_thread in (fast_thread, slow_thread):
    _group_thread.start()

# Block until both groups have finished
for _group_thread in (fast_thread, slow_thread):
    _group_thread.join()

print("Training completed.")



--- Global Epoch 0 ---
[Slow Clients] Global Epoch 0 started.
[Fast Client 0] Global Epoch 0 started.   [Slow Client 0] Starting Local Training for Global Epoch 0

   [Slow Client 0] Local Epoch 0 running...
[Fast Client 0] Global Epoch 0 finished. Sending update 70
[Server] Received update 70 from Client 0 after Global Epoch 0
[Fast Client 1] Global Epoch 0 started.
   [Slow Client 0] Local Epoch 1 running...[Fast Client 1] Global Epoch 0 finished. Sending update 58

[Server] Received update 58 from Client 1 after Global Epoch 0
[Fast Client 2] Global Epoch 0 started.
[Fast Client 2] Global Epoch 0 finished. Sending update 17
[Server] Received update 17 from Client 2 after Global Epoch 0

--- Global Epoch 1 ---
[Fast Client 0] Global Epoch 1 started.
[Fast Client 0] Global Epoch 1 finished. Sending update 97
   [Slow Client 0] Local Epoch 2 running...
[Server] Received update 97 from Client 0 after Global Epoch 1
[Fast Client 1] Global Epoch 1 started.
[Fast Client 1] Global Epoch 1 

In [4]:
import threading
import time
import random

# Simulation size
MAX_GLOBAL_EPOCHS: int = 5
num_fast_clients: int = 3
num_slow_clients: int = 3

# State shared between threads: the next global epoch and the mutex held
# while it is advanced
global_epoch: int = 0
lock = threading.Lock()

# Server-side stand-in: aggregation is simulated by logging the received update
def aggregate_updates(client_id, global_round, update_value):
    """Print one "[Server] Received update ..." line for a client update.

    Args:
        client_id: Identifier of the client that produced the update.
        global_round: Epoch number the client reports the update for.
        update_value: The simulated update payload.

    Returns:
        None. Nothing is stored; the update is only written to stdout.
    """
    message = f"[Server] Received update {update_value} from Client {client_id} after Global Epoch {global_round}"
    print(message)

# Fast clients worker (a single thread drives every fast client)
def fast_clients_group():
    """Advance the global epoch and train all fast clients, one after another.

    While ``global_epoch`` is below ``MAX_GLOBAL_EPOCHS`` the function claims
    the current epoch under ``lock`` (incrementing the shared counter),
    prints the epoch banner, and runs clients ``0 .. num_fast_clients - 1``
    sequentially. Each client sleeps one second, draws a random update in
    [1, 100] and passes it to ``aggregate_updates``.
    """
    global global_epoch

    def run_fast_client(client_id, current_epoch):
        """Simulate one fast client's training round and report its update."""
        print(f"[Fast Client {client_id}] Global Epoch {current_epoch} started.", flush=True)
        time.sleep(1)
        update_value = random.randint(1, 100)
        print(f"[Fast Client {client_id}] Global Epoch {current_epoch} finished. Sending update {update_value}", flush=True)
        aggregate_updates(client_id, current_epoch, update_value)

    while global_epoch < MAX_GLOBAL_EPOCHS:
        # Claim this epoch and publish the next one in a single critical section.
        with lock:
            current_epoch = global_epoch
            global_epoch = current_epoch + 1

        print(f"\n--- Global Epoch {current_epoch} ---", flush=True)
        for fast_id in range(num_fast_clients):
            run_fast_client(fast_id, current_epoch)

# Slow client worker (one instance per thread)
def slow_client_thread(client_id):
    """Run ``MAX_GLOBAL_EPOCHS`` rounds of simulated local training for one slow client.

    Each round consists of three local epochs of two seconds each, after
    which a random update in [1, 100] is passed to ``aggregate_updates``
    together with this client's own round counter.

    Args:
        client_id: Identifier used in log lines and passed to the server.
    """
    for slow_epoch in range(MAX_GLOBAL_EPOCHS):
        print(f"   [Slow Client {client_id}] Starting Local Training for Global Epoch {slow_epoch}", flush=True)

        # Three local epochs, each simulated by a two-second sleep.
        for local_epoch in range(3):
            print(f"   [Slow Client {client_id}] Local Epoch {local_epoch} running...", flush=True)
            time.sleep(2)

        update_value = random.randint(1, 100)
        print(f"   [Slow Client {client_id}] Finished local training. Sending update {update_value}", flush=True)
        aggregate_updates(client_id, slow_epoch, update_value)

# Slow clients supervisor (one thread per slow client)
def slow_clients_group():
    """Start a ``slow_client_thread`` for every slow client and wait for all of them.

    Threads are named ``SlowClient-<id>`` for ids ``0 .. num_slow_clients - 1``.
    The function returns once each of them has been joined.
    """
    slow_threads = [
        threading.Thread(target=slow_client_thread, args=(slow_id,), name=f"SlowClient-{slow_id}")
        for slow_id in range(num_slow_clients)
    ]

    for worker in slow_threads:
        worker.start()

    # Block until every slow client has finished its rounds.
    for worker in slow_threads:
        worker.join()

# One worker thread per client group
fast_thread = threading.Thread(target=fast_clients_group, name="FastClientsThread")
slow_thread = threading.Thread(target=slow_clients_group, name="SlowClientsThread")

# Launch fast first, then slow
for _group_thread in (fast_thread, slow_thread):
    _group_thread.start()

# Block until both groups have finished
for _group_thread in (fast_thread, slow_thread):
    _group_thread.join()

print("Training completed.")



--- Global Epoch 0 ---
[Fast Client 0] Global Epoch 0 started.
   [Slow Client 0] Starting Local Training for Global Epoch 0
   [Slow Client 1] Starting Local Training for Global Epoch 0
   [Slow Client 2] Starting Local Training for Global Epoch 0
   [Slow Client 0] Local Epoch 0 running...   [Slow Client 1] Local Epoch 0 running...

   [Slow Client 2] Local Epoch 0 running...
[Fast Client 0] Global Epoch 0 finished. Sending update 76
[Server] Received update 76 from Client 0 after Global Epoch 0
[Fast Client 1] Global Epoch 0 started.
   [Slow Client 2] Local Epoch 1 running...
   [Slow Client 0] Local Epoch 1 running...
   [Slow Client 1] Local Epoch 1 running...
[Fast Client 1] Global Epoch 0 finished. Sending update 38
[Server] Received update 38 from Client 1 after Global Epoch 0
[Fast Client 2] Global Epoch 0 started.
[Fast Client 2] Global Epoch 0 finished. Sending update 30
[Server] Received update 30 from Client 2 after Global Epoch 0

--- Global Epoch 1 ---
[Fast Client 0] 

In [6]:
import threading
import time
import random

# Simulation size
MAX_GLOBAL_EPOCHS: int = 10
num_fast_clients: int = 3
num_slow_clients: int = 2

# State shared between threads: the next global epoch, and the mutex held
# while it is advanced and while slow_client_updates is touched
global_epoch: int = 0
lock = threading.Lock()

# Latest local update published by each slow client, keyed by client id
slow_client_updates = {}

# Server-side stand-in: aggregation is simulated by logging the received update
def aggregate_updates(client_id, global_round, update_value):
    """Print one "[Server] Received aggregated update ..." line for a client update.

    Args:
        client_id: Identifier of the client that produced the update.
        global_round: Epoch number the client reports the update for.
        update_value: The simulated update payload.

    Returns:
        None. Nothing is stored; the update is only written to stdout.
    """
    message = f"[Server] Received aggregated update {update_value} from Client {client_id} after Global Epoch {global_round}"
    print(message)

# Fast clients worker (a single thread drives every fast client)
def fast_clients_group():
    """Advance the global epoch and train all fast clients, one after another.

    While ``global_epoch`` is below ``MAX_GLOBAL_EPOCHS`` the function claims
    the current epoch under ``lock`` (incrementing the shared counter),
    prints the epoch banner, and runs clients ``0 .. num_fast_clients - 1``
    sequentially. Each client sleeps one second, draws a random update in
    [1, 100] and passes it to ``aggregate_updates``.
    """
    global global_epoch

    def run_fast_client(client_id, current_epoch):
        """Simulate one fast client's training round and report its update."""
        print(f"[Fast Client {client_id}] Global Epoch {current_epoch} started.", flush=True)
        time.sleep(1)
        update_value = random.randint(1, 100)
        print(f"[Fast Client {client_id}] Global Epoch {current_epoch} finished. Sending update {update_value}", flush=True)
        aggregate_updates(client_id, current_epoch, update_value)

    while global_epoch < MAX_GLOBAL_EPOCHS:
        # Claim this epoch and publish the next one in a single critical section.
        with lock:
            current_epoch = global_epoch
            global_epoch = current_epoch + 1

        print(f"\n--- Global Epoch {current_epoch} ---", flush=True)
        for fast_id in range(num_fast_clients):
            run_fast_client(fast_id, current_epoch)

# Slow client worker (one instance per thread)
def slow_client_thread(client_id):
    """Run ``MAX_GLOBAL_EPOCHS`` rounds for one slow client, sharing updates with its peers.

    Per round the client:
      1. runs three local epochs of two seconds each;
      2. draws a random local update in [1, 100] and publishes it in
         ``slow_client_updates[client_id]`` under ``lock``;
      3. sleeps one second, then (under ``lock``) adds every other entry of
         ``slow_client_updates`` to its own update;
      4. passes the resulting sum to ``aggregate_updates``.

    Args:
        client_id: Identifier used as dictionary key, in log lines and for the server.
    """
    for slow_epoch in range(MAX_GLOBAL_EPOCHS):
        print(f"   [Slow Client {client_id}] Starting Local Training for Global Epoch {slow_epoch}", flush=True)

        # Local training: three epochs, two seconds each.
        for local_epoch in range(3):
            print(f"   [Slow Client {client_id}] Local Epoch {local_epoch} running...", flush=True)
            time.sleep(2)

        local_update = random.randint(1, 100)

        # Publish this client's update for its peers to read.
        with lock:
            slow_client_updates[client_id] = local_update
            print(f"   [Slow Client {client_id}] Finished local training. Local update: {local_update}", flush=True)

        # NOTE(review): peers are given a fixed one-second window to publish;
        # entries are never cleared, so a peer's value from an earlier round
        # may be what gets read here. Confirm this is acceptable.
        time.sleep(1)

        with lock:
            neighbour_total = sum(
                update
                for other_id, update in slow_client_updates.items()
                if other_id != client_id
            )
            aggregated_update = local_update + neighbour_total
            print(f"   [Slow Client {client_id}] Aggregated update (self + neighbors): {aggregated_update}", flush=True)

        # Forward the combined value to the server.
        aggregate_updates(client_id, slow_epoch, aggregated_update)

# Slow clients supervisor (one thread per slow client)
def slow_clients_group():
    """Start a ``slow_client_thread`` for every slow client and wait for all of them.

    Threads are named ``SlowClient-<id>`` for ids ``0 .. num_slow_clients - 1``.
    The function returns once each of them has been joined.
    """
    slow_threads = [
        threading.Thread(target=slow_client_thread, args=(slow_id,), name=f"SlowClient-{slow_id}")
        for slow_id in range(num_slow_clients)
    ]

    for worker in slow_threads:
        worker.start()

    # Block until every slow client has finished its rounds.
    for worker in slow_threads:
        worker.join()

# One worker thread per client group
fast_thread = threading.Thread(target=fast_clients_group, name="FastClientsThread")
slow_thread = threading.Thread(target=slow_clients_group, name="SlowClientsThread")

# Launch fast first, then slow
for _group_thread in (fast_thread, slow_thread):
    _group_thread.start()

# Block until both groups have finished
for _group_thread in (fast_thread, slow_thread):
    _group_thread.join()

print("Training completed.")



--- Global Epoch 0 ---
[Fast Client 0] Global Epoch 0 started.
   [Slow Client 0] Starting Local Training for Global Epoch 0
   [Slow Client 1] Starting Local Training for Global Epoch 0
   [Slow Client 0] Local Epoch 0 running...
   [Slow Client 1] Local Epoch 0 running...
[Fast Client 0] Global Epoch 0 finished. Sending update 3
[Server] Received aggregated update 3 from Client 0 after Global Epoch 0
[Fast Client 1] Global Epoch 0 started.
   [Slow Client 1] Local Epoch 1 running...
   [Slow Client 0] Local Epoch 1 running...
[Fast Client 1] Global Epoch 0 finished. Sending update 83
[Server] Received aggregated update 83 from Client 1 after Global Epoch 0
[Fast Client 2] Global Epoch 0 started.
[Fast Client 2] Global Epoch 0 finished. Sending update 30
[Server] Received aggregated update 30 from Client 2 after Global Epoch 0

--- Global Epoch 1 ---
[Fast Client 0] Global Epoch 1 started.
   [Slow Client 0] Local Epoch 2 running...
   [Slow Client 1] Local Epoch 2 running...
[Fast C

In [7]:
import threading
import time
import random

# Simulation size
MAX_GLOBAL_EPOCHS: int = 10
num_fast_clients: int = 3
num_slow_clients: int = 2

# State shared between threads: the next global epoch, and the mutex held
# while it is advanced and while the update containers below are modified
global_epoch: int = 0
lock = threading.Lock()

# Latest local update published by each slow client, keyed by client id
slow_client_updates = {}
# Updates (fast and slow) waiting for the next server aggregation
server_updates = []

# Server-side aggregation of everything received since the previous call
def aggregate_server_updates(global_round):
    """Sum and report the pending entries of ``server_updates``, then empty the list.

    Runs entirely under ``lock``. When no updates are pending, nothing is
    printed and the list is left untouched.

    Args:
        global_round: Epoch number shown in the log line.
    """
    with lock:
        if not server_updates:
            return

        aggregated_update = sum(server_updates)
        print(f"\n[Server] Aggregated update for Global Epoch {global_round}: {aggregated_update}\n", flush=True)
        # Start the next round with an empty buffer.
        server_updates.clear()

# Fast clients worker (a single thread drives every fast client)
def fast_clients_group():
    """Advance the global epoch, collect fast-client updates and trigger aggregation.

    While ``global_epoch`` is below ``MAX_GLOBAL_EPOCHS`` the function claims
    the current epoch under ``lock`` (incrementing the shared counter) and
    runs clients ``0 .. num_fast_clients - 1`` sequentially. Each client
    sleeps one second and appends a random update in [1, 100] to
    ``server_updates`` under ``lock``. After the last client the function
    sleeps three more seconds and calls ``aggregate_server_updates`` for the
    epoch.
    """
    global global_epoch

    def run_fast_client(client_id, current_epoch):
        """Simulate one fast client's training round and queue its update."""
        print(f"[Fast Client {client_id}] Global Epoch {current_epoch} started.", flush=True)
        time.sleep(1)
        update_value = random.randint(1, 100)
        print(f"[Fast Client {client_id}] Global Epoch {current_epoch} finished. Sending update {update_value}", flush=True)

        with lock:
            server_updates.append(update_value)

    while global_epoch < MAX_GLOBAL_EPOCHS:
        # Claim this epoch and publish the next one in a single critical section.
        with lock:
            current_epoch = global_epoch
            global_epoch = current_epoch + 1

        print(f"\n--- Global Epoch {current_epoch} ---", flush=True)

        for fast_id in range(num_fast_clients):
            run_fast_client(fast_id, current_epoch)

        # Fixed grace period for slow clients to post their updates before
        # the server aggregates this epoch.
        time.sleep(3)
        aggregate_server_updates(current_epoch)

# Slow client worker (one instance per thread)
def slow_client_thread(client_id):
    """Run ``MAX_GLOBAL_EPOCHS`` rounds for one slow client, queuing each update for the server.

    Each round runs three local epochs of two seconds each, then draws a
    random update in [1, 100]. Under ``lock`` the value is recorded in
    ``slow_client_updates[client_id]`` and appended to ``server_updates``.

    Args:
        client_id: Identifier used as dictionary key and in log lines.
    """
    for slow_epoch in range(MAX_GLOBAL_EPOCHS):
        print(f"   [Slow Client {client_id}] Starting Local Training for Global Epoch {slow_epoch}", flush=True)

        # Local training: three epochs, two seconds each.
        for local_epoch in range(3):
            print(f"   [Slow Client {client_id}] Local Epoch {local_epoch} running...", flush=True)
            time.sleep(2)

        local_update = random.randint(1, 100)

        # Record the update and queue it for the server in one critical section.
        with lock:
            slow_client_updates[client_id] = local_update
            print(f"   [Slow Client {client_id}] Finished local training. Local update: {local_update}", flush=True)
            server_updates.append(local_update)

# Slow clients supervisor (one thread per slow client)
def slow_clients_group():
    """Start a ``slow_client_thread`` for every slow client and wait for all of them.

    Threads are named ``SlowClient-<id>`` for ids ``0 .. num_slow_clients - 1``.
    The function returns once each of them has been joined.
    """
    slow_threads = [
        threading.Thread(target=slow_client_thread, args=(slow_id,), name=f"SlowClient-{slow_id}")
        for slow_id in range(num_slow_clients)
    ]

    for worker in slow_threads:
        worker.start()

    # Block until every slow client has finished its rounds.
    for worker in slow_threads:
        worker.join()

# One worker thread per client group
fast_thread = threading.Thread(target=fast_clients_group, name="FastClientsThread")
slow_thread = threading.Thread(target=slow_clients_group, name="SlowClientsThread")

# Launch fast first, then slow
for _group_thread in (fast_thread, slow_thread):
    _group_thread.start()

# Block until both groups have finished
for _group_thread in (fast_thread, slow_thread):
    _group_thread.join()

print("Training completed.")



--- Global Epoch 0 ---
   [Slow Client 0] Starting Local Training for Global Epoch 0
[Fast Client 0] Global Epoch 0 started.
   [Slow Client 0] Local Epoch 0 running...
   [Slow Client 1] Starting Local Training for Global Epoch 0
   [Slow Client 1] Local Epoch 0 running...
[Fast Client 0] Global Epoch 0 finished. Sending update 10
[Fast Client 1] Global Epoch 0 started.
   [Slow Client 0] Local Epoch 1 running...
[Fast Client 1] Global Epoch 0 finished. Sending update 33
[Fast Client 2] Global Epoch 0 started.
   [Slow Client 1] Local Epoch 1 running...
[Fast Client 2] Global Epoch 0 finished. Sending update 70
   [Slow Client 0] Local Epoch 2 running...
   [Slow Client 1] Local Epoch 2 running...
   [Slow Client 0] Finished local training. Local update: 67
   [Slow Client 0] Starting Local Training for Global Epoch 1
[Server] Aggregated update for Global Epoch 0: 180


   [Slow Client 0] Local Epoch 0 running...
--- Global Epoch 1 ---
[Fast Client 0] Global Epoch 1 started.

   [Slo