Here’s a simple example of synchronous distributed training using TensorFlow and Horovod:

Synchronization Process Explained \\
**Gradient Calculation:** Each worker computes gradients on its local batch of data. \\
**Gradient Averaging:** Horovod's Allreduce operation averages gradients from all workers. \\
**Model Update:** Each worker updates its model parameters with the averaged gradients. \\
**Broadcasting Initial Variables:** The initial model parameters are broadcast from rank 0 so that all workers start from the same model parameters. \\

In [8]:
!pip install horovod

Collecting horovod
  Using cached horovod-0.28.1.tar.gz (3.5 MB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: horovod
  Building wheel for horovod (setup.py) ... [?25l[?25hcanceled[31mERROR: Operation cancelled by user[0m[31m
[0m

In [None]:
import tensorflow as tf

# Synchronous data-parallel training across the local devices:
# MirroredStrategy keeps one replica of the model per device.
strategy = tf.distribute.MirroredStrategy()

print(f"Number of devices: {strategy.num_replicas_in_sync}")

# The model and optimizer are created inside the strategy scope so that
# their variables are managed by the strategy.
with strategy.scope():
    # Define a simple model operating on flattened 784-element inputs
    model = tf.keras.models.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    # Define optimizer
    opt = tf.keras.optimizers.Adam(0.001)

    # Compile the model
    model.compile(optimizer=opt,
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

# Load data
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# MNIST images are delivered as 28x28 arrays, but the first Dense layer is
# declared with input_shape=(784,). Flatten each image (and cast to float32)
# before scaling to [0, 1]; otherwise fit() fails on a shape mismatch.
x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
x_test = x_test.reshape(-1, 784).astype('float32') / 255.0

# The dataset batch is the *global* batch, which the strategy splits across
# replicas, so scale it to keep 128 samples per device.
batch_size = 128 * strategy.num_replicas_in_sync

# Create dataset
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
dataset = dataset.shuffle(10000).batch(batch_size)

# Train the model
model.fit(dataset, epochs=5)


In [None]:
import tensorflow as tf
import horovod.tensorflow as hvd
# The Keras integration (DistributedOptimizer for Keras optimizers and the
# `callbacks` module) lives in horovod.tensorflow.keras; plain
# horovod.tensorflow does not expose `callbacks`.
import horovod.tensorflow.keras as hvd_keras

# Initialize Horovod
hvd.init()

# Pin GPU to be used by each process (one GPU per local rank)
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Define a simple model operating on flattened 784-element inputs
model = tf.keras.models.Sequential([
    tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
    tf.keras.layers.Dense(10, activation='softmax')
])

# Horovod: scale the learning rate by the number of workers, because the
# effective (global) batch size grows with the number of workers.
opt = tf.keras.optimizers.Adam(0.001 * hvd.size())

# Horovod: wrap the optimizer so gradients are averaged across workers
# before they are applied.
opt = hvd_keras.DistributedOptimizer(opt)

model.compile(optimizer=opt,
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Load data
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# MNIST images are delivered as 28x28 arrays, but the first Dense layer is
# declared with input_shape=(784,). Flatten each image (and cast to float32)
# before scaling to [0, 1]; otherwise fit() fails on a shape mismatch.
x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
x_test = x_test.reshape(-1, 784).astype('float32') / 255.0

# Horovod: this is the *per-worker* batch size. Every process runs this same
# script on its own shard, so the global batch is already 128 * hvd.size(),
# which is what the learning-rate scaling above assumes.
batch_size = 128

# Horovod: use `hvd.rank()`, `hvd.size()` to partition data
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
dataset = dataset.shard(hvd.size(), hvd.rank()).shuffle(10000).batch(batch_size)

callbacks = [
    # Horovod: broadcast initial variable states from rank 0 to all other processes.
    # This is necessary to ensure consistent initialization of all workers when
    # training is started with random weights or restored from a checkpoint.
    hvd_keras.callbacks.BroadcastGlobalVariablesCallback(0),
]

# Train the model
model.fit(dataset, epochs=5, callbacks=callbacks)


ModuleNotFoundError: No module named 'horovod'

In [None]:
import tensorflow as tf
import horovod.tensorflow as hvd
# The Keras integration (DistributedOptimizer for Keras optimizers and the
# `callbacks` module) lives in horovod.tensorflow.keras; plain
# horovod.tensorflow does not expose `callbacks`.
import horovod.tensorflow.keras as hvd_keras

# Initialize Horovod
hvd.init()

# Pin GPU to be used by each worker (one GPU per local rank)
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
    tf.config.experimental.set_visible_devices(gpus[hvd.local_rank()], 'GPU')


# Build a simple model
def create_model():
    """Return a two-layer dense classifier for flattened 784-element inputs."""
    return tf.keras.models.Sequential([
        tf.keras.layers.Dense(128, activation='relu', input_shape=(784,)),
        tf.keras.layers.Dense(10, activation='softmax')
    ])


# Each worker creates a model instance
model = create_model()

# Scale the learning rate by the number of workers, because the effective
# (global) batch size grows with the number of workers. The rate is set when
# the optimizer is built rather than patched onto the wrapped optimizer
# after compile().
initial_lr = 0.001
scaled_lr = initial_lr * hvd.size()
optimizer = tf.keras.optimizers.Adam(scaled_lr)

# Wrap the optimizer so gradients are averaged across workers before they
# are applied.
optimizer = hvd_keras.DistributedOptimizer(optimizer)

# Compile the model with the wrapped optimizer
model.compile(optimizer=optimizer,
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Load and preprocess data
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()

# MNIST images are delivered as 28x28 arrays, but the first Dense layer is
# declared with input_shape=(784,). Flatten each image (and cast to float32)
# before scaling to [0, 1]; otherwise fit() fails on a shape mismatch.
x_train = x_train.reshape(-1, 784).astype('float32') / 255.0
x_test = x_test.reshape(-1, 784).astype('float32') / 255.0

# Horovod: this is the *per-worker* batch size. Every process runs this same
# script on its own shard, so the global batch is already 128 * hvd.size(),
# which is what the learning-rate scaling above assumes.
batch_size = 128

# Create a dataset and shard it among workers
dataset = tf.data.Dataset.from_tensor_slices((x_train, y_train))
dataset = dataset.shard(hvd.size(), hvd.rank()).shuffle(10000).batch(batch_size)

# Create a callback to broadcast initial variable states from rank 0 to all other processes.
callbacks = [
    hvd_keras.callbacks.BroadcastGlobalVariablesCallback(0)
]

# Train the model
model.fit(dataset, epochs=5, callbacks=callbacks)

# Save model only on worker 0 so the workers do not all write the same file
if hvd.rank() == 0:
    model.save('my_model.h5')


In [None]:
from threading import Thread, Lock
import numpy as np

# Stand-in for a real gradient computation
def compute_gradients(data, model_parameters):
    """Return the toy gradient ``model_parameters - data``.

    This is the gradient of ``0.5 * ||model_parameters - data||**2``, so a
    descent step along it moves the parameters toward ``data``.
    """
    return model_parameters - data

# Training schedule: update steps per worker and SGD step size
epochs, learning_rate = 10, 0.01

# How many worker threads take part in the simulation
num_workers = 4

# One random 10-dimensional "data point" per worker
data = [np.random.randn(10) for _worker in range(num_workers)]

# Single parameter vector shared by every worker, randomly initialized
model_parameters = np.random.randn(10)

# Serializes writes to the shared parameter vector
lock = Lock()

def worker(worker_id, data_point):
    """Run ``epochs`` asynchronous SGD steps against the shared parameters.

    The gradient is computed from the shared vector without holding the
    lock; only the in-place write to ``model_parameters`` is serialized
    through ``lock``.
    """
    global model_parameters
    for epoch in range(epochs):
        # Scaled gradient for this worker's own data point
        step = learning_rate * compute_gradients(data_point, model_parameters)

        # Apply the step to the shared parameters, one writer at a time
        with lock:
            model_parameters -= step

        print(f"Worker {worker_id} updated model parameters in epoch {epoch}.")

# One thread per worker, each bound to its own data point
workers = [
    Thread(target=worker, args=(idx, data[idx]))
    for idx in range(num_workers)
]

# Launch every worker
for t in workers:
    t.start()

# Block until all of them are done
for t in workers:
    t.join()

# Report the shared parameters after all updates
print("Final model parameters:", model_parameters)


Worker 0 updated model parameters in epoch 0.Worker 1 updated model parameters in epoch 0.
Worker 1 updated model parameters in epoch 1.
Worker 0 updated model parameters in epoch 1.
Worker 0 updated model parameters in epoch 2.
Worker 0 updated model parameters in epoch 3.
Worker 0 updated model parameters in epoch 4.
Worker 0 updated model parameters in epoch 5.
Worker 0 updated model parameters in epoch 6.
Worker 0 updated model parameters in epoch 7.
Worker 0 updated model parameters in epoch 8.
Worker 0 updated model parameters in epoch 9.

Worker 1 updated model parameters in epoch 2.
Worker 1 updated model parameters in epoch 3.
Worker 1 updated model parameters in epoch 4.
Worker 1 updated model parameters in epoch 5.
Worker 1 updated model parameters in epoch 6.
Worker 1 updated model parameters in epoch 7.
Worker 1 updated model parameters in epoch 8.
Worker 1 updated model parameters in epoch 9.
Worker 2 updated model parameters in epoch 0.
Worker 2 updated model parameters 

Decentralized SGD

In [None]:
import numpy as np
from threading import Thread, Lock
import random

# Simulation settings
num_workers, epochs = 4, 20
learning_rate = 0.01
# Parameters are exchanged with neighbors every `exchange_interval` epochs
exchange_interval = 5

# One random 10-dimensional "data point" per worker
data = [np.random.randn(10) for _worker in range(num_workers)]

# Every worker owns its own, independently initialized parameter vector
worker_params = [np.random.randn(10) for _worker in range(num_workers)]

# Serializes reads/writes of the per-worker parameter list
lock = Lock()

def compute_gradients(data, params):
    """
    Return the toy gradient ``params - data``.

    This is the gradient of ``0.5 * ||params - data||**2``; a real model
    would differentiate its loss on the data instead.
    """
    return params - data

def get_neighbors(worker_id, total_workers=None):
    """
    Return the ring neighbors of a worker.

    Workers are arranged in a ring, so each worker is adjacent to the one
    before it and the one after it (wrapping around at both ends).

    Args:
        worker_id: Index of the worker, in ``range(total_workers)``.
        total_workers: Size of the ring. Defaults to the module-level
            ``num_workers``.

    Returns:
        A list of neighbor indices: empty for a single worker, one entry
        for a two-worker ring (both directions reach the same peer), and
        two entries otherwise.
    """
    ring_size = num_workers if total_workers is None else total_workers
    if ring_size <= 1:
        # A lone worker has nobody to exchange with.
        return []

    successor = (worker_id + 1) % ring_size
    predecessor = (worker_id - 1) % ring_size
    if successor == predecessor:
        # Two-worker ring: avoid listing the same peer twice.
        return [successor]

    # Keep the historical ordering: the two end workers list their
    # wrap-around successor first, interior workers list the predecessor first.
    if worker_id == 0 or worker_id == ring_size - 1:
        return [successor, predecessor]
    return [predecessor, successor]

def decentralized_update(worker_id, data_point):
    """
    Run the decentralized SGD loop for one worker.

    Each epoch the worker takes a gradient step on its own parameter vector
    ``worker_params[worker_id]``. Every ``exchange_interval`` epochs
    (including epoch 0) it replaces its parameters with the uniform average
    of its own vector and its neighbors' vectors.

    Args:
        worker_id: Index of this worker in ``worker_params``.
        data_point: The data this worker computes its gradients against.
    """
    for epoch in range(epochs):
        # Compute gradients based on local parameters and data
        local_gradients = compute_gradients(data_point, worker_params[worker_id])

        # Update local parameters
        with lock:  # Neighbors may be reading this vector during an exchange
            worker_params[worker_id] -= learning_rate * local_gradients

        # Periodically exchange parameters with neighbors
        if epoch % exchange_interval == 0:
            neighbors = get_neighbors(worker_id)
            with lock:  # Read a consistent snapshot of all involved vectors
                # Average self and all neighbors with equal weight. Folding the
                # neighbors in one at a time with (a + b) / 2 would give the
                # last neighbor half of the total weight and make the result
                # depend on the neighbor order.
                group = [worker_params[worker_id]]
                group.extend(worker_params[neighbor] for neighbor in neighbors)
                worker_params[worker_id] = np.mean(group, axis=0)

            print(f"Worker {worker_id} exchanged parameters with neighbors: {neighbors} at epoch {epoch}.")

        print(f"Worker {worker_id} updated parameters at epoch {epoch}.")


In [None]:
# One thread per worker, each bound to its own data point
threads = [
    Thread(target=decentralized_update, args=(idx, data[idx]))
    for idx in range(num_workers)
]

# Launch every worker
for t in threads:
    t.start()

# Block until all of them are done
for t in threads:
    t.join()

# Report each worker's parameter vector after training
print("\nFinal parameters of all workers:")
for worker_id in range(num_workers):
    print(f"Worker {worker_id} final parameters: {worker_params[worker_id]}")


Worker 0 exchanged parameters with neighbors: [1, 3] at epoch 0.Worker 1 exchanged parameters with neighbors: [0, 2] at epoch 0.
Worker 1 updated parameters at epoch 0.
Worker 1 updated parameters at epoch 1.
Worker 1 updated parameters at epoch 2.
Worker 1 updated parameters at epoch 3.
Worker 2 exchanged parameters with neighbors: [1, 3] at epoch 0.
Worker 2 updated parameters at epoch 0.
Worker 2 updated parameters at epoch 1.
Worker 2 updated parameters at epoch 2.
Worker 2 updated parameters at epoch 3.
Worker 2 updated parameters at epoch 4.
Worker 2 exchanged parameters with neighbors: [1, 3] at epoch 5.
Worker 2 updated parameters at epoch 5.
Worker 2 updated parameters at epoch 6.
Worker 2 updated parameters at epoch 7.
Worker 2 updated parameters at epoch 8.
Worker 2 updated parameters at epoch 9.
Worker 2 exchanged parameters with neighbors: [1, 3] at epoch 10.
Worker 2 updated parameters at epoch 10.
Worker 2 updated parameters at epoch 11.
Worker 2 updated parameters at ep

 Distributed SGD with Gradient Quantization

In [None]:
import numpy as np
from threading import Thread, Lock

# Simulation settings
num_workers, epochs = 4, 20
learning_rate = 0.01
# Bit width handed to the gradient quantizer
bits = 8

# One random 10-dimensional "data point" per worker
data = [np.random.randn(10) for _worker in range(num_workers)]

# Single parameter vector shared by every worker, randomly initialized
model_parameters = np.random.randn(10)

# Serializes writes to the shared parameter vector
lock = Lock()

def compute_gradients(data, params):
    """
    Return the toy gradient ``params - data``.

    This is the gradient of ``0.5 * ||params - data||**2``; a real model
    would differentiate its loss on the data instead.
    """
    return params - data

def quantize_gradients(gradients, bits=8):
    """
    Snap gradients onto a uniform grid scaled to their largest magnitude.

    The grid has ``2 ** bits - 1`` steps between 0 and ``max(|gradients|)``
    on each side of zero. Every entry is rounded to the nearest grid point
    and the result is returned on the original scale (de-quantized).

    Args:
        gradients: Array-like of gradient values.
        bits: Controls the grid resolution (``2 ** bits - 1`` steps).

    Returns:
        An array of the same shape as ``gradients``. Empty input is returned
        unchanged and an all-zero gradient yields zeros; without these
        guards the scale factor would be a division by zero and the result
        would be filled with NaN.
    """
    gradients = np.asarray(gradients)
    if gradients.size == 0:
        # np.max raises on empty input; there is nothing to quantize anyway.
        return gradients

    max_val = np.max(np.abs(gradients))
    if max_val == 0:
        # All entries are zero: zero is already on the grid.
        return np.zeros_like(gradients, dtype=float)

    scale = (2 ** bits - 1) / max_val
    return np.round(gradients * scale) / scale

def worker_with_quantization(worker_id, data_point):
    """Run ``epochs`` SGD steps on the shared parameters using quantized gradients.

    The gradient is computed and quantized without holding the lock; only
    the in-place write to ``model_parameters`` is serialized through ``lock``.
    """
    global model_parameters
    for epoch in range(epochs):
        # Raw gradient for this worker's data point, then its low-precision version
        raw = compute_gradients(data_point, model_parameters)
        step = learning_rate * quantize_gradients(raw, bits)

        # Apply the quantized step to the shared parameters, one writer at a time
        with lock:
            model_parameters -= step

        print(f"Worker {worker_id} updated model parameters with quantized gradients at epoch {epoch}.")

# One thread per worker, each bound to its own data point
threads = [
    Thread(target=worker_with_quantization, args=(idx, data[idx]))
    for idx in range(num_workers)
]

# Launch every worker
for t in threads:
    t.start()

# Block until all of them are done
for t in threads:
    t.join()

# Report the shared parameters after all updates
print("\nFinal model parameters:", model_parameters)


Worker 0 updated model parameters with quantized gradients at epoch 0.
Worker 0 updated model parameters with quantized gradients at epoch 1.
Worker 0 updated model parameters with quantized gradients at epoch 2.
Worker 0 updated model parameters with quantized gradients at epoch 3.
Worker 0 updated model parameters with quantized gradients at epoch 4.
Worker 0 updated model parameters with quantized gradients at epoch 5.
Worker 0 updated model parameters with quantized gradients at epoch 6.
Worker 0 updated model parameters with quantized gradients at epoch 7.
Worker 0 updated model parameters with quantized gradients at epoch 8.
Worker 0 updated model parameters with quantized gradients at epoch 9.
Worker 0 updated model parameters with quantized gradients at epoch 10.
Worker 0 updated model parameters with quantized gradients at epoch 11.
Worker 0 updated model parameters with quantized gradients at epoch 12.
Worker 0 updated model parameters with quantized gradients at epoch 13.
Wo

In [None]:
import numpy as np
from threading import Thread, Lock

# Simulation settings (fewer epochs here, since every step is printed)
num_workers, epochs = 4, 5
learning_rate = 0.01
# Bit width handed to the gradient quantizer
bits = 8

# One random 10-dimensional "data point" per worker
data = [np.random.randn(10) for _worker in range(num_workers)]

# Single parameter vector shared by every worker, randomly initialized
model_parameters = np.random.randn(10)

# Serializes writes to the shared parameter vector
lock = Lock()

def compute_gradients(data, params):
    """
    Return the toy gradient ``params - data``.

    This is the gradient of ``0.5 * ||params - data||**2``; a real model
    would differentiate its loss on the data instead.
    """
    return params - data

def quantize_gradients(gradients, bits=8):
    """
    Snap gradients onto a uniform grid scaled to their largest magnitude.

    The grid has ``2 ** bits - 1`` steps between 0 and ``max(|gradients|)``
    on each side of zero. Every entry is rounded to the nearest grid point
    and the result is returned on the original scale (de-quantized).

    Args:
        gradients: Array-like of gradient values.
        bits: Controls the grid resolution (``2 ** bits - 1`` steps).

    Returns:
        An array of the same shape as ``gradients``. Empty input is returned
        unchanged and an all-zero gradient yields zeros; without these
        guards the scale factor would be a division by zero and the result
        would be filled with NaN.
    """
    gradients = np.asarray(gradients)
    if gradients.size == 0:
        # np.max raises on empty input; there is nothing to quantize anyway.
        return gradients

    max_val = np.max(np.abs(gradients))
    if max_val == 0:
        # All entries are zero: zero is already on the grid.
        return np.zeros_like(gradients, dtype=float)

    scale = (2 ** bits - 1) / max_val
    return np.round(gradients * scale) / scale

def worker_with_quantization(worker_id, data_point):
    """Run ``epochs`` quantized-gradient SGD steps, logging each gradient.

    For every epoch the raw and quantized gradients are printed before the
    quantized step is applied to the shared ``model_parameters``. Only the
    in-place write is serialized through ``lock``.
    """
    global model_parameters
    for epoch in range(epochs):
        # Raw gradient and its low-precision counterpart
        gradients = compute_gradients(data_point, model_parameters)
        quantized_gradients = quantize_gradients(gradients, bits)
        step = learning_rate * quantized_gradients

        # Show what quantization did to this gradient
        print(f"Worker {worker_id}, Epoch {epoch}:")
        print(f"  Gradients before quantization: {gradients}")
        print(f"  Gradients after quantization:  {quantized_gradients}\n")

        # Apply the quantized step to the shared parameters, one writer at a time
        with lock:
            model_parameters -= step

        print(f"Worker {worker_id} updated model parameters with quantized gradients at epoch {epoch}.")

# One thread per worker, each bound to its own data point
threads = [
    Thread(target=worker_with_quantization, args=(idx, data[idx]))
    for idx in range(num_workers)
]

# Launch every worker
for t in threads:
    t.start()

# Block until all of them are done
for t in threads:
    t.join()

# Report the shared parameters after all updates
print("\nFinal model parameters:", model_parameters)


Worker 0, Epoch 0:Worker 1, Epoch 0:
  Gradients before quantization: [-0.28432148 -1.29229182  0.74060259 -1.67875105  1.65377276  0.50226756
  1.06744535 -1.97821947 -0.26637252  1.54975347]
  Gradients after quantization:  [-0.28703577 -1.29553981  0.73698373 -1.67566826  1.65239509  0.50425202
  1.07056583 -1.97821947 -0.2637626   1.55154468]


  Gradients before quantization: [-0.30390838 -1.63080329  1.36668098 -1.76386803  1.50498167  1.48528902
  1.80408624 -1.05684439  0.40584963  0.42470923]
  Gradients after quantization:  [-0.30421846 -1.63428989  1.36544566 -1.76163715  1.50694262  1.48571808
  1.80408624 -1.05415235  0.40326634  0.42449088]

Worker 0 updated model parameters with quantized gradients at epoch 0.
Worker 0, Epoch 1:
  Gradients before quantization: [-0.3008662  -1.61446039  1.35302652 -1.74625166  1.48991224  1.47043184
  1.78604538 -1.04630286  0.40181696  0.42046432]
  Gradients after quantization:  [-0.30117628 -1.61794699  1.35179121 -1.74402078  1.49187