# Distributed Machine Learning in Colab

In this notebook, we'll explore how to run distributed machine learning on Google Colab using three popular libraries: **TensorFlow Distributed**, **PyTorch Distributed**, and **Horovod**. We'll walk through the steps to set up each library, run distributed operations, and see the results.

Since a standard Colab runtime provides at most one GPU, we'll run the distributed APIs on a single machine instead of a real cluster. Although full multi-node functionality cannot be replicated on Colab, this approach gives you a practical feel for distributed training.

---

## Section 1: TensorFlow Distributed

In this section, we use **TensorFlow’s MirroredStrategy** to distribute training across multiple GPUs on a single machine. The model is mirrored on all available devices (GPUs), and TensorFlow splits each input batch across the devices and synchronizes the gradients after each step.

**MirroredStrategy** works by ensuring that each GPU has the same copy of the model and that all GPUs perform forward and backward propagation in parallel. After each batch, the gradients from each device are averaged using an All-Reduce operation to ensure that the model weights stay consistent across devices.

This type of distributed training is useful for large datasets or compute-heavy models, as long as a full copy of the model fits on each device, because processing batches on several devices in parallel can reduce training time.

### Key Components:
- **MirroredStrategy**: A TensorFlow strategy that mirrors the model across multiple GPUs and synchronizes after each batch.
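- **All-Reduce**: A collective operation that combines (here, sums) the gradients from all GPUs and hands every GPU the same result. Keras scales each replica's loss so that this sum matches the gradient averaged over the whole global batch, which keeps all copies of the weights identical.
- **strategy.scope()**: A context manager; variables created inside it, such as the model's weights and the optimizer's state, become mirrored variables with one copy per device.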
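To make the synchronization step concrete, here is a minimal pure-Python sketch of one synchronous data-parallel update. It assumes a one-parameter linear model `y = w * x`, a mean-squared-error loss, and two simulated replicas with equally sized shards; the function names are hypothetical and no TensorFlow is involved. Each replica computes a gradient on its own shard, an all-reduce averages the gradients, and every replica applies the same update, so the copies of `w` never drift apart.

```python
def local_gradient(w, xs, ys):
    """d/dw of mean((w*x - y)^2) over one replica's shard."""
    n = len(xs)
    return sum(2 * (w * x - y) * x for x, y in zip(xs, ys)) / n

def all_reduce_mean(values):
    """Simulated all-reduce: every replica receives the mean of all inputs."""
    mean = sum(values) / len(values)
    return [mean] * len(values)

# One global batch (generated by w_true = 2), split evenly across two replicas
xs = [1.0, 2.0, 3.0, 4.0]
ys = [2.0, 4.0, 6.0, 8.0]
num_replicas = 2
shard = len(xs) // num_replicas
shards = [(xs[i * shard:(i + 1) * shard], ys[i * shard:(i + 1) * shard])
          for i in range(num_replicas)]

weights = [0.0] * num_replicas  # identical copy of w on every replica
lr = 0.05

for step in range(3):
    grads = [local_gradient(w, sx, sy) for w, (sx, sy) in zip(weights, shards)]
    synced = all_reduce_mean(grads)  # every replica now holds the same gradient
    weights = [w - lr * g for w, g in zip(weights, synced)]
    print(step, weights)
```

With equal shard sizes, the averaged gradient equals the gradient over the whole batch (both are -30.0 at `w = 0`), which is why mirrored training tracks single-device training up to floating-point differences.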

Without a distributed strategy like MirroredStrategy, Keras trains on a single device: every batch goes through that one device, even if other GPUs sit idle. Splitting each batch across several devices lets them work in parallel, which can shorten training when the per-step computation outweighs the extra synchronization cost.

On Colab's single-GPU runtime, MirroredStrategy runs with just one replica, so the distributed code below exercises the same API you would use on a multi-GPU machine, but without the speedup.
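If you want MirroredStrategy to run with more than one replica on Colab, one option is to split a physical device into several logical devices with `tf.config.set_logical_device_configuration`. The sketch below is an illustration, not part of the original notebook: it splits the CPU into two logical CPUs. It must run in a fresh runtime before any other TensorFlow operation initializes the devices; otherwise TensorFlow raises a `RuntimeError`. On a GPU runtime the same call can split the GPU if each `LogicalDeviceConfiguration` is given a `memory_limit`. Logical devices share the same hardware, so expect no speedup; the point is to exercise the multi-replica code path.

```python
import tensorflow as tf

# Split the single physical CPU into two logical CPUs.
# Must happen before TensorFlow initializes its devices (fresh runtime).
cpu = tf.config.list_physical_devices("CPU")[0]
tf.config.set_logical_device_configuration(
    cpu,
    [tf.config.LogicalDeviceConfiguration(),
     tf.config.LogicalDeviceConfiguration()],
)

logical_cpus = tf.config.list_logical_devices("CPU")
strategy = tf.distribute.MirroredStrategy(devices=[d.name for d in logical_cpus])
print("Replicas in sync:", strategy.num_replicas_in_sync)
```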



### Install TensorFlow

TensorFlow is preinstalled in Colab, so this step is optional; it upgrades TensorFlow to the latest release. If TensorFlow was already imported in the session, restart the runtime after upgrading.



In [1]:
!pip install tensorflow --upgrade



### Explanation of the Non-Distributed TensorFlow Code

This code demonstrates a simple neural network model training on the MNIST dataset using TensorFlow. In this version, the code **does not** utilize distributed strategies, meaning that it runs on a single GPU or CPU.

- **Loading the MNIST dataset**: The dataset is loaded using TensorFlow's built-in `tf.keras.datasets.mnist`, which provides images of handwritten digits. The pixel values of the images are scaled to between 0 and 1 for normalization.
  
- **Model Architecture**: A basic feed-forward neural network is defined using the `tf.keras.Sequential()` model API. The architecture consists of:
  - A `Flatten` layer that converts each 28x28 image into a one-dimensional array of 784 values.
  - A `Dense` layer with 128 units and ReLU activation, which acts as a hidden layer.
  - A final `Dense` layer with 10 units and softmax activation for classifying the digits (0-9).

- **Compilation**: The model is compiled with:
  - **Adam optimizer** for adjusting the weights.
  - **Sparse categorical crossentropy** as the loss function since the labels are integers (0-9).
  - **Accuracy metric** to evaluate performance during training.

- **Training**: The model is trained for 5 epochs using the `.fit()` method on the training dataset (`x_train`, `y_train`). This is where the backpropagation happens to adjust weights.

- **Evaluation**: The model is evaluated on the test dataset (`x_test`, `y_test`) using `.evaluate()` to check how well the model performs on unseen data.
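The `1875/1875` and `313/313` counters in the training output are the number of batches per epoch. Since the code does not pass `batch_size` to `fit()` or `evaluate()`, Keras uses its default of 32, so the step counts follow directly from the MNIST split sizes (60,000 training and 10,000 test images):

```python
import math

batch_size = 32          # Keras default when batch_size is not specified
train_examples = 60_000  # MNIST training split
test_examples = 10_000   # MNIST test split

train_steps = math.ceil(train_examples / batch_size)
test_steps = math.ceil(test_examples / batch_size)  # the last test batch is partial
print(train_steps, test_steps)  # 1875 313
```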

#### Why this is a non-distributed version:
- No use of `tf.distribute.MirroredStrategy` or any distributed strategy API.
- All computations happen on a single device (CPU or one GPU if available).
- If you have multiple GPUs, only one will be used in this setup.

#### Result:
The cell prints:
1. **Training Progress**: the loss and accuracy after each epoch.
2. **Final Test Accuracy**: the `[loss, accuracy]` pair returned by `model.evaluate()` on the test set.


In [3]:
import tensorflow as tf

# Load and preprocess the MNIST dataset
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

# Define a simple neural network without distributed strategy
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])

# Compile the model
model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# Train the model on the dataset
model.fit(x_train, y_train, epochs=5)

# Evaluate the model
model.evaluate(x_test, y_test)


Epoch 1/6
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 4s 1ms/step - accuracy: 0.8757 - loss: 0.4314
Epoch 2/6
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 3s 1ms/step - accuracy: 0.9628 - loss: 0.1283
Epoch 3/6
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 4s 2ms/step - accuracy: 0.9758 - loss: 0.0827
Epoch 4/6
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 4s 2ms/step - accuracy: 0.9815 - loss: 0.0585
Epoch 5/6
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 3s 2ms/step - accuracy: 0.9872 - loss: 0.0452
Epoch 6/6
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 3s 2ms/step - accuracy: 0.9894 - loss: 0.0339
313/313 ━━━━━━━━━━━━━━━━━━━━ 1s 3ms/step - accuracy: 0.9737 - loss: 0.0871


[0.07285197079181671, 0.9779999852180481]

In [4]:
import tensorflow as tf

# Define a distributed strategy
strategy = tf.distribute.MirroredStrategy()

# Load and preprocess the MNIST dataset
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

# Use strategy.scope to ensure all computations are distributed
with strategy.scope():
    # Define a simple neural network
    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

# Train the model on the dataset
model.fit(x_train, y_train, epochs=5)

# Evaluate the model
model.evaluate(x_test, y_test)


Epoch 1/5
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 8s 3ms/step - accuracy: 0.8761 - loss: 0.4343
Epoch 2/5
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 6s 3ms/step - accuracy: 0.9638 - loss: 0.1207
Epoch 3/5
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 5s 3ms/step - accuracy: 0.9762 - loss: 0.0795
Epoch 4/5
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 6s 3ms/step - accuracy: 0.9821 - loss: 0.0596
Epoch 5/5
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 5s 3ms/step - accuracy: 0.9860 - loss: 0.0458
313/313 ━━━━━━━━━━━━━━━━━━━━ 1s 2ms/step - accuracy: 0.9749 - loss: 0.0878


[0.07630106806755066, 0.9775000214576721]

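### Explanation:
MirroredStrategy distributes training across all available GPUs; on Colab it runs with a single replica on the one available GPU. With nothing to parallelize across, the strategy only adds overhead: compare the 3 ms/step above with the 1 to 2 ms/step of the non-distributed run.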

The MNIST dataset is used for training a simple neural network. After 5 epochs, the model will be evaluated on the test data.
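Under MirroredStrategy, Keras treats the batch size as the *global* batch: each step's batch is divided among the replicas, so the number of steps per epoch (1875 here) does not change with the replica count. The hypothetical helper below shows each replica's share, assuming the batch divides evenly. A common pattern is to scale the global batch instead, for example `batch_size=64 * strategy.num_replicas_in_sync`, so each replica keeps a fixed workload.

```python
def per_replica_batch(global_batch_size, num_replicas):
    """Share of each global batch processed by one replica (hypothetical helper)."""
    if global_batch_size % num_replicas != 0:
        raise ValueError("this sketch assumes the batch divides evenly across replicas")
    return global_batch_size // num_replicas

# Keras' default global batch of 32, spread over 1, 2, or 4 replicas
for replicas in (1, 2, 4):
    print(f"{replicas} replica(s): {per_replica_batch(32, replicas)} examples each")
```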

## Exercise: Modify the TensorFlow Distributed Model

Now that you have learned about TensorFlow Distributed with `MirroredStrategy`, let’s modify the model to add complexity and adjust some parameters.

### Task:
1. Modify the model by adding an extra **Dense** layer with 64 units and `relu` activation.
2. Change the number of **epochs** to 3 instead of 5.
3. Rerun the training process and observe the changes in performance and training time.

### Steps:
1. Locate the section where the model is defined.
2. Add another layer to the model using `Dense(64, activation='relu')`.
3. Reduce the number of epochs in the `model.fit()` function to 3.
4. Run the training and observe the output.

This exercise will help you understand how the complexity of a model impacts training time and how distributed strategies handle more complex models.
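One way to gauge how much the extra layer adds is to count parameters. The sketch below assumes the new `Dense(64)` is placed between the existing `Dense(128)` layer and the output layer; `Flatten` has no parameters, and each `Dense` layer has `inputs × units` weights plus `units` biases. The totals should match what `model.summary()` reports.

```python
def dense_params(n_in, n_out):
    """Weights (n_in * n_out) plus biases (n_out) of one Dense layer."""
    return n_in * n_out + n_out

def count_params(layer_sizes):
    """Total parameters of a chain of Dense layers, e.g. [784, 128, 10]."""
    return sum(dense_params(a, b) for a, b in zip(layer_sizes, layer_sizes[1:]))

original = count_params([784, 128, 10])      # Flatten(784) -> Dense(128) -> Dense(10)
modified = count_params([784, 128, 64, 10])  # with the extra Dense(64)
print(original, modified)  # 101770 109386
```

The extra layer adds roughly 7.5% more parameters, so on MNIST the change in per-epoch time may be small compared with the effect of running 3 epochs instead of 5.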


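## Multi-Node Training with MultiWorkerMirroredStrategy

To train across several machines, TensorFlow provides `tf.distribute.MultiWorkerMirroredStrategy`. The setup involves the following steps:

1. **Setting up the `TF_CONFIG` environment variable**: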
   - The `TF_CONFIG` environment variable is essential when running TensorFlow in a multi-worker setup.
   - It defines the cluster configuration, listing the `host:port` address of each worker in the cluster.
   - Each worker is assigned a role:
     - The `worker` type is responsible for training, and there may be multiple workers in the cluster.
     - The `index` parameter indicates which worker the current machine is (e.g., index 0 for the first worker, index 1 for the second, and so on).
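   - `TF_CONFIG` must be set before the strategy is created, because TensorFlow reads it when `MultiWorkerMirroredStrategy` is constructed.

2. **Running the cells in order**:
   - Variables set through `os.environ` exist only in the current Python process, so restarting the runtime after setting `TF_CONFIG` discards it.
   - If TensorFlow has already run in this session (as in the earlier cells), restart the runtime *first*, then run the `TF_CONFIG` cell and the strategy cell back to back; the multi-worker strategy generally needs to be created before other TensorFlow operations run.
   - Every worker runs this same code with its own `index`, and training waits until all listed workers have joined.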

3. **Initializing `MultiWorkerMirroredStrategy`**:
   - Once the runtime restarts, `MultiWorkerMirroredStrategy` is initialized, which allows the model to be replicated across all workers and synchronizes gradient updates during training.

4. **Model definition and training**:
   - Inside the `strategy.scope()`, the model is built, compiled, and trained.
   - `strategy.scope()` ensures that the model’s computations are distributed across the cluster of workers, allowing for efficient distributed training.

5. **Training and evaluation**:
   - The training process runs in parallel across all workers, allowing for faster training.
   - The `evaluate()` method computes accuracy on the test dataset, showing the model’s performance after distributed training.

This approach is useful in scenarios where training datasets are large, and computation needs to be scaled across multiple nodes or machines.
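For any cluster size, it can help to generate `TF_CONFIG` programmatically. In the sketch below, `make_tf_config` is a hypothetical helper and the addresses are placeholders: every worker gets the same `cluster` list but its own `task.index`, and each machine would assign its string to `os.environ['TF_CONFIG']` before creating the strategy.

```python
import json

def make_tf_config(worker_addresses, index):
    """Return the TF_CONFIG JSON string for the worker at position `index` (hypothetical helper)."""
    if not 0 <= index < len(worker_addresses):
        raise ValueError("index must refer to one of the listed workers")
    return json.dumps({
        "cluster": {"worker": list(worker_addresses)},
        "task": {"type": "worker", "index": index},
    })

# Placeholder addresses: replace with each machine's real host:port
workers = ["10.0.0.1:12345", "10.0.0.2:12345", "10.0.0.3:12345"]

for i in range(len(workers)):
    # On machine i you would run: os.environ["TF_CONFIG"] = make_tf_config(workers, i)
    print(make_tf_config(workers, i))
```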


In [1]:
import os
import json
import tensorflow as tf

# Step 1: Define the TF_CONFIG environment variable
# Replace the worker addresses with the actual IPs of your nodes
os.environ['TF_CONFIG'] = json.dumps({
    'cluster': {
        'worker': ["worker1_ip:port", "worker2_ip:port"]
    },
    'task': {'type': 'worker', 'index': 0}  # Set 'index': 0 for the first worker, 1 for the second, etc.
})

# Do not restart the runtime here: TF_CONFIG only lives in this Python process, so run the next cell in the same session


In [1]:
import tensorflow as tf

# Step 2: Create the strategy in the same session, right after TF_CONFIG is set
strategy = tf.distribute.MultiWorkerMirroredStrategy()

# Load and preprocess the MNIST dataset
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train, x_test = x_train / 255.0, x_test / 255.0

# Use strategy.scope to ensure computations are distributed across nodes and GPUs
with strategy.scope():
    # Define a simple neural network
    model = tf.keras.Sequential([
        tf.keras.layers.Flatten(input_shape=(28, 28)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10, activation='softmax')
    ])

    model.compile(optimizer='adam',
                  loss='sparse_categorical_crossentropy',
                  metrics=['accuracy'])

# Train the model on the dataset
model.fit(x_train, y_train, epochs=5)

# Evaluate the model
model.evaluate(x_test, y_test)


Downloading data from https://storage.googleapis.com/tensorflow/tf-keras-datasets/mnist.npz
11490434/11490434 ━━━━━━━━━━━━━━━━━━━━ 0s 0us/step


Epoch 1/5
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 15s 5ms/step - accuracy: 0.8827 - loss: 0.4213
Epoch 2/5
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 12s 7ms/step - accuracy: 0.9636 - loss: 0.1217
Epoch 3/5
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 6s 3ms/step - accuracy: 0.9760 - loss: 0.0795
Epoch 4/5
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 7s 3ms/step - accuracy: 0.9828 - loss: 0.0585
Epoch 5/5
1875/1875 ━━━━━━━━━━━━━━━━━━━━ 5s 3ms/step - accuracy: 0.9859 - loss: 0.0449
313/313 ━━━━━━━━━━━━━━━━━━━━ 1s 2ms/step - accuracy: 0.9715 - loss: 0.0896


[0.0782981589436531, 0.9746999740600586]

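With the placeholder addresses above, the strategy could not have reached any other worker, so the ordinary single-machine run shown here most likely means `TF_CONFIG` was not in effect, for example because a runtime restart cleared it.

## Exercise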

1. **Modify the `TF_CONFIG`** to include at least 3 worker nodes.
   - Set up a cluster configuration with 3 workers by modifying the `TF_CONFIG` environment variable.
   - Ensure that the `index` and IP addresses for each worker are updated accordingly.

2. **Experiment with different model architectures** within the `strategy.scope()`.
   - Try changing the architecture (e.g., adding more layers or changing the activation function) and observe how the distributed strategy handles the new model.

3. **Scaling Learning Rate**:
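   - Multiply the base learning rate by the number of workers. This linear scaling rule is a common heuristic (often combined with a learning-rate warmup) that compensates for the larger global batch when every worker contributes its own batch; treat it as a starting point and validate it on your model.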
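A minimal sketch of the linear scaling rule, assuming each worker keeps a per-worker batch of 32 and starts from Adam's default learning rate of 0.001 (the helper name is hypothetical). Inside `strategy.scope()`, the result would be passed as `tf.keras.optimizers.Adam(learning_rate=...)` instead of the string `'adam'`.

```python
def scaled_learning_rate(base_lr, num_workers):
    """Linear scaling rule: the learning rate grows with the number of workers."""
    return base_lr * num_workers

base_lr = 0.001        # default learning rate of Keras' Adam optimizer
per_worker_batch = 32  # each worker's batch size stays fixed

for num_workers in (1, 2, 3):
    global_batch = per_worker_batch * num_workers
    lr = scaled_learning_rate(base_lr, num_workers)
    print(f"{num_workers} worker(s): global batch {global_batch}, learning rate {lr:g}")
```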



## Section 2: PyTorch Distributed

In this example, we use **PyTorch's `torch.distributed`** package to simulate distributed training. We are simulating communication between multiple processes using the **Gloo backend**, which is optimized for CPU-based communication (though it can also work with GPUs). Each process in this simulation represents a "worker," and each worker holds its own tensor.

The key operation we use here is **AllReduce**, which is responsible for summing the tensors from all processes and distributing the result back to each one. In a real-world distributed setup, each process would compute gradients on its own mini-batch of data, and AllReduce would sum these gradients to synchronize the model across all workers.

In non-distributed PyTorch training, everything runs on a single process and device (GPU or CPU). No communication between processes would be necessary, and operations like AllReduce wouldn't be used. In distributed training, however, such operations are critical to ensure all processes maintain consistent model weights.

### Key Components:
- **torch.distributed**: A PyTorch package that facilitates distributed training across multiple devices or machines.
- **Gloo Backend**: A collective-communication backend that is the usual choice for CPU tensors; it has limited GPU support, and NCCL is generally preferred for GPU training.
- **AllReduce**: An operation that sums and distributes data (like tensors or gradients) across all workers.
- **Process Group**: A group of processes that communicate with each other, initialized using `init_process_group`.

This example demonstrates how data is synchronized across processes using AllReduce. While this simulation runs on a single device in Colab, it helps you understand how distributed training works when you scale to multiple devices or machines.



### Install PyTorch

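PyTorch is preinstalled in Colab, so this step is optional; it upgrades `torch` to the latest release. Upgrading `torch` on its own can leave it incompatible with the preinstalled `torchvision`, so if you upgrade, consider upgrading both together.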




In [2]:
#update torch
!pip install torch --upgrade



In [3]:
import torch

# Simulated training without distributed setup
def run():
    # Each "worker" would just operate independently
    tensor = torch.ones(1) * 0  # For example, a single rank with value 0
    print(f"Before operation: The tensor has {tensor.item()}")

    # Normally, there would be no AllReduce or inter-process communication
    # Here, we'll just simulate a simple addition for demonstration
    tensor += 1  # Simulate some local computation or operation

    print(f"After operation: The tensor has {tensor.item()}")

# Run the non-distributed simulation
run()


Before operation: The tensor has 0.0
After operation: The tensor has 1.0


In [4]:
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import os

# Initialize the process group using the Gloo backend for CPU communication
def init_process(rank, size, fn, backend='gloo'):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
    dist.destroy_process_group()

# Simulated distributed training example
def run(rank, size):
    tensor = torch.ones(1) * rank  # Simulate each worker having its own value
    print(f"Before AllReduce: Rank {rank} has {tensor.item()}")

    # Perform AllReduce operation (sum the tensors across ranks)
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)

    print(f"After AllReduce: Rank {rank} has {tensor.item()}")

# Function to spawn processes and simulate workers
def spawn_processes():
    size = 2  # Simulate two processes (workers)
    processes = []

    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

# Run the simulation
spawn_processes()


Before AllReduce: Rank 0 has 0.0Before AllReduce: Rank 1 has 1.0

After AllReduce: Rank 1 has 1.0
After AllReduce: Rank 0 has 1.0


### Explanation:
`torch.distributed` provides collective-communication primitives, such as `all_reduce`, for exchanging tensors between processes.

We use the Gloo backend to simulate an AllReduce operation, where two processes sum their tensors and distribute the result to all participants.


## Exercise: Modify the PyTorch Distributed Setup

Now that you’ve learned about distributed training with **PyTorch** and the **AllReduce** operation, let's expand the example by modifying the number of processes (workers).

### Task:
1. Change the number of **workers** (processes) in the simulation from 2 to 3.
2. Observe how the tensor values before and after **AllReduce** change when using 3 workers instead of 2.

### Steps:
1. Locate the `spawn_processes()` function in the code.
2. Change the `size` variable from 2 to 3 to simulate 3 workers.
3. Run the simulation and observe the printed values before and after AllReduce for each worker.

This exercise will help you understand how AllReduce works across different numbers of processes and how data synchronization happens in distributed environments.
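Because each rank starts with a tensor holding its own rank number, you can predict the AllReduce result before running the exercise: with `op=dist.ReduceOp.SUM`, every rank ends up with 0 + 1 + ... + (n - 1) = n(n - 1)/2. A quick pure-Python check (the helper name is hypothetical):

```python
def expected_all_reduce_sum(world_size):
    """Value every rank should print after AllReduce when rank r starts with r."""
    return sum(range(world_size))

for n in (2, 3, 4):
    # The closed form n(n-1)/2 agrees with the explicit sum
    assert expected_all_reduce_sum(n) == n * (n - 1) // 2
    print(f"world_size={n}: every rank prints {float(expected_all_reduce_sum(n))}")
```

With `size = 2` this gives 1.0, matching the output above; with 3 workers, each rank should print 3.0.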


In [10]:
!nvidia-smi


Sat Oct 12 18:06:36 2024       
+---------------------------------------------------------------------------------------+
| NVIDIA-SMI 535.104.05             Driver Version: 535.104.05   CUDA Version: 12.2     |
|-----------------------------------------+----------------------+----------------------+
| GPU  Name                 Persistence-M | Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp   Perf          Pwr:Usage/Cap |         Memory-Usage | GPU-Util  Compute M. |
|                                         |                      |               MIG M. |
|   0  Tesla T4                       Off | 00000000:00:04.0 Off |                    0 |
| N/A   52C    P0              29W /  70W |    629MiB / 15360MiB |      0%      Default |
|                                         |                      |                  N/A |
+-----------------------------------------+----------------------+----------------------+
                                                                    

## PyTorch Distributed Training Example

In this example, we move from summing plain tensors to a minimal training step using PyTorch's `torch.distributed` package: each worker (process) computes gradients on its own, and **AllReduce** averages them before the weights are updated. The model and data are deliberately trivial (one linear layer and random inputs), so the focus stays on the mechanics of gradient synchronization.

### Code Overview:

We initialize the process group using `torch.distributed.init_process_group` to manage communication between different processes (workers). For this example, we're using the **Gloo** backend, which is typically used for CPU-based communication in distributed training, but we could also use **NCCL** for GPU communication.

The key function being demonstrated is **AllReduce**, which performs a reduction operation (such as summing) across all tensors from the participating processes and synchronizes their values across workers.

### Key Functions in the Code:

- **init_process**: Initializes the process group and sets up the distributed environment.
  - The environment variables `MASTER_ADDR` and `MASTER_PORT` are used to establish communication between processes.
  - The `init_process_group` call connects each worker into a distributed communication group.

- **train**: Each worker builds its own copy of `SimpleModel` (a single `nn.Linear(10, 1)` layer), runs a forward pass on random inputs, and computes an MSE loss.

  - After `loss.backward()`, `dist.all_reduce` sums each parameter's gradient across all workers, and dividing by `size` turns the sum into an average.
  - `optimizer.step()` then applies the same averaged gradient on every worker.


In [11]:
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.multiprocessing import Process
import os

# Define a simple neural network
class SimpleModel(nn.Module):
    def __init__(self):
        super(SimpleModel, self).__init__()
        self.fc = nn.Linear(10, 1)

    def forward(self, x):
        return self.fc(x)

# Function to initialize the process group and run training
def init_process(rank, size, fn, backend='gloo'):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
    dist.destroy_process_group()

# Training function for each process
def train(rank, size):
    # Create a simple model; it stays on the CPU, which the Gloo backend supports
    model = SimpleModel()

    # Define a loss function and an optimizer
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01)

    # Create fake data
    inputs = torch.randn(10)
    targets = torch.randn(1)

    # Forward pass
    outputs = model(inputs)
    loss = loss_fn(outputs, targets)

    # Backward pass
    optimizer.zero_grad()
    loss.backward()

    # Perform AllReduce to average the gradients across all processes
    for param in model.parameters():
        dist.all_reduce(param.grad, op=dist.ReduceOp.SUM)  # sum gradients over all workers
        param.grad /= size  # divide by the worker count to get the mean

    # Apply the gradient updates
    optimizer.step()

    print(f"Rank {rank}, Loss: {loss.item()}")

# Function to spawn processes and simulate distributed training
def spawn_processes():
    size = 2  # Simulate two workers
    processes = []

    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, train))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

# Run the real distributed training example
spawn_processes()


Rank 1, Loss: 1.6182228326797485
Rank 0, Loss: 1.6182228326797485



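Both ranks report the same loss. Each process is forked from the notebook and most likely inherits the same random-number-generator state, so both ranks create identical initial weights and draw identical fake data. In real training, each rank would read a different shard of the data (for example with `torch.utils.data.distributed.DistributedSampler`), while the initial weights still have to match across ranks; `DistributedDataParallel` takes care of that by broadcasting rank 0's parameters when it wraps the model.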

- **spawn_processes**: This function spawns the worker processes. We are simulating a small distributed system with 2 workers.

### What is Happening in the AllReduce Demo:

1. **Initialization**: Each worker (process) is initialized with its own rank and the size of the worker pool (2 in this case).
2. **Before AllReduce**: Each worker has its own local value. For example, rank 0 has a tensor with the value `0.0` and rank 1 has a tensor with the value `1.0`.
3. **AllReduce Operation**: The `all_reduce` function is used to sum the tensors across all workers. In this case, the operation sums the values across ranks (0 + 1 = 1), so both workers end up with the same result (`1.0`).
4. **After AllReduce**: After the **AllReduce** operation, both workers have the same synchronized value.

### Why Distributed Training:

In real distributed training tasks, workers compute gradients on their local data, and **AllReduce** is often used to average these gradients across all workers before updating the model parameters. This ensures that all workers are working with the same model state after each training iteration.
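Why is averaging gradients the right thing to do? With a mean-reduced loss and equally sized shards, the average of the per-shard gradients equals the gradient over the combined batch. The single-process sketch below checks this with PyTorch directly, without a process group: two deep copies of one model stand in for two workers, and `sum(...) / world_size` plays the role of `all_reduce(SUM)` followed by division by `size`.

```python
import copy

import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.Linear(10, 1)
loss_fn = nn.MSELoss()   # default reduction="mean"
x = torch.randn(8, 10)   # one global batch of 8 examples
y = torch.randn(8, 1)

# Reference: gradients of the loss over the whole batch in a single process
model.zero_grad()
loss_fn(model(x), y).backward()
full_grads = [p.grad.clone() for p in model.parameters()]

# Two simulated workers: identical replicas, each sees half of the batch
world_size = 2
replica_grads = []
for xs, ys in zip(x.chunk(world_size), y.chunk(world_size)):
    replica = copy.deepcopy(model)
    replica.zero_grad()
    loss_fn(replica(xs), ys).backward()
    replica_grads.append([p.grad.clone() for p in replica.parameters()])

# Equivalent of all_reduce(SUM) followed by dividing by world_size
averaged = [sum(grads) / world_size for grads in zip(*replica_grads)]

for avg, full in zip(averaged, full_grads):
    print(torch.allclose(avg, full, atol=1e-5))
```

`DistributedDataParallel` performs the same averaging automatically during `backward()`.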


# Single GPU Training Explanation

In this example, we are training a simple feedforward neural network on the MNIST dataset using a **single GPU**.

### Key Steps:

1. **Dataset Preparation**:
   - The MNIST dataset is loaded using `torchvision.datasets.MNIST`. This dataset contains images of handwritten digits and is commonly used for training image classification models.
   - We use the `DataLoader` to load the data in batches of 64 images.

2. **Model Definition**:
   - We define a simple neural network with two hidden layers. The input size is 28x28 (the size of the MNIST images), and the output layer has 10 units corresponding to the 10 digit classes (0-9).

3. **Device Setup**:
   - We check if a GPU is available using `torch.cuda.is_available()`. If a GPU is available, the model and data will be moved to the GPU for faster computation.

4. **Loss Function and Optimizer**:
   - The loss function used is `CrossEntropyLoss`, which is suitable for classification tasks.
   - The optimizer is Adam, which adjusts the weights of the model based on the gradients computed during backpropagation.

5. **Training Loop**:
   - The training process runs for 5 epochs. In each epoch, the model goes through the training data, performs forward and backward passes, and updates its weights using the optimizer.
   - After each epoch, the average loss is printed to track the model's performance.

### Important Points:
- This code runs on a single GPU (or CPU if a GPU isn't available).
- It's a basic example of how to use PyTorch for training a neural network on a small dataset like MNIST.


In [None]:
import torch

print(torch.cuda.device_count())  # Typically 1 on a Colab GPU runtime, so the multi-GPU branch of the DataParallel example is skipped

1


In [2]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

# Define a simple neural network
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(28 * 28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        x = x.view(-1, 28 * 28)  # Flatten the image
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Check if GPU is available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Prepare the MNIST dataset
transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transform, download=True)
train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)

# Create the model, define the loss function and the optimizer
model = SimpleNN().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Training loop
def train(model, loader, criterion, optimizer, device):
    model.train()
    for epoch in range(5):  # 5 epochs for simplicity
        running_loss = 0.0
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        print(f"Epoch [{epoch+1}/5], Loss: {running_loss/len(loader)}")

# Run the training
train(model, train_loader, criterion, optimizer, device)


Epoch [1/5], Loss: 0.390142286514073
Epoch [2/5], Loss: 0.1884266047526016
Epoch [3/5], Loss: 0.13713808088247648
Epoch [4/5], Loss: 0.11010322737684256
Epoch [5/5], Loss: 0.09465299468429914


# PyTorch DataParallel Example

In this example, we use **PyTorch's `DataParallel`** to leverage multiple GPUs in a single node (machine) for training a neural network. It splits each input batch across the GPUs, processes the pieces in parallel, and then gathers the results.

### What is `DataParallel`?

- **`DataParallel`** is a simple way to replicate a model across multiple GPUs within one process.
- On each forward pass, it splits the input batch along the first dimension, runs a replica of the model on each GPU, and gathers the outputs on the first GPU.
- During the backward pass, the gradients from all replicas are summed into the original model on that GPU, and the optimizer updates that single copy of the weights.

`DataParallel` works only within **a single node** and needs very little change to existing code, which makes it a quick way to try multi-GPU training. For better performance, the PyTorch documentation recommends `DistributedDataParallel` even on a single machine.
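One practical consequence of the wrapper: `nn.DataParallel` stores the original model as its `module` attribute, so every key in its `state_dict()` gains a `module.` prefix. The sketch below uses a small stand-in model rather than `SimpleNN`, and the file name is hypothetical; unwrapping before saving keeps the checkpoint loadable into a plain model. It also runs on CPU-only machines, where `DataParallel` simply calls the wrapped module.

```python
import torch
import torch.nn as nn

# Stand-in model: Flatten has no parameters, so the Linear layer is entry "1"
model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10))
wrapped = nn.DataParallel(model)

print(list(model.state_dict().keys()))    # ['1.weight', '1.bias']
print(list(wrapped.state_dict().keys()))  # ['module.1.weight', 'module.1.bias']

# Unwrap before saving so the checkpoint loads into a plain, unwrapped model
to_save = wrapped.module if isinstance(wrapped, nn.DataParallel) else wrapped
torch.save(to_save.state_dict(), "simple_nn_weights.pt")  # hypothetical file name

restored = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10))
restored.load_state_dict(torch.load("simple_nn_weights.pt"))
```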

---

## Steps Covered in this Example

1. **Import Libraries**: We'll use `torch` and `torchvision` to load the MNIST dataset and define a simple feed-forward neural network.
2. **Define the Neural Network**: We define a simple neural network with three fully connected layers.
3. **Check GPU Availability**: We check if GPUs are available and set up `DataParallel` if more than one GPU is present.
4. **Training Loop**: We write a simple training loop that runs for 5 epochs and prints the loss at each epoch.
5. **Running the Model**: If multiple GPUs are available, the model will automatically run in parallel across them.

---

## Key Differences from DistributedDataParallel:

- **Single Process**: Unlike `DistributedDataParallel` (DDP), `DataParallel` uses a single process to handle multiple GPUs.
- **Simplicity**: You do not need to manage distributed initialization or communication between nodes.
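- **Overhead**: `DataParallel` usually incurs more overhead than `DDP` because it re-replicates the model on every forward pass, gathers outputs and reduces gradients on the main GPU (a potential bottleneck), and drives all GPUs from a single Python process.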
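For comparison, here is a minimal sketch of the `DistributedDataParallel` API. To keep it runnable in a single notebook cell, it starts a one-process group (`world_size=1`) with the Gloo backend on the CPU and a file-based rendezvous; these choices are assumptions for illustration. In real use you would launch one process per GPU, for example with `torchrun --nproc_per_node=<num_gpus> train.py` (a hypothetical script name), let `init_process_group` read `RANK` and `WORLD_SIZE` from the environment that `torchrun` sets, and wrap the model as `DDP(model.to(local_rank), device_ids=[local_rank])` with the NCCL backend.

```python
import os
import tempfile

import torch
import torch.distributed as dist
import torch.nn as nn
from torch.nn.parallel import DistributedDataParallel as DDP

# One-process "cluster": rank 0 of 1, rendezvous through a fresh temporary file
store_file = os.path.join(tempfile.mkdtemp(), "ddp_store")
dist.init_process_group("gloo", init_method=f"file://{store_file}", rank=0, world_size=1)

model = nn.Linear(10, 1)
ddp_model = DDP(model)  # CPU module, so no device_ids argument
optimizer = torch.optim.SGD(ddp_model.parameters(), lr=0.01)

inputs, targets = torch.randn(8, 10), torch.randn(8, 1)
optimizer.zero_grad()
loss = nn.functional.mse_loss(ddp_model(inputs), targets)
loss.backward()   # DDP all-reduces and averages gradients across ranks here
optimizer.step()
print(f"loss: {loss.item():.4f}")

dist.destroy_process_group()
```

Unlike the manual gradient loop earlier, there is no explicit `all_reduce` call: DDP overlaps gradient communication with the backward pass.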


In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader

# Define the neural network
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(28 * 28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        x = x.view(-1, 28 * 28)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Training function (no need for rank and world_size)
def train(model, loader, criterion, optimizer, device):
    model.train()
    for epoch in range(5):  # 5 epochs for simplicity
        running_loss = 0.0
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)

            # Zero the parameter gradients
            optimizer.zero_grad()

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Backward pass and optimize
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
        print(f"Epoch [{epoch+1}/5], Loss: {running_loss/len(loader)}")

# Main function
def main():
    # Check if GPU is available
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load the dataset
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
    train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transform, download=True)
    train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=True)

    # Initialize the model (it is moved to the device after optional wrapping below)
    model = SimpleNN()

    # Wrap the model with DataParallel to use multiple GPUs if available
    if torch.cuda.device_count() > 1:
        print(f"Using {torch.cuda.device_count()} GPUs with DataParallel")
        model = nn.DataParallel(model)

    model = model.to(device)

    # Define the loss function and optimizer
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Start training
    train(model, train_loader, criterion, optimizer, device)

# Run the training
if __name__ == "__main__":
    main()


Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to ./data/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:10<00:00, 910825.06it/s] 


Extracting ./data/MNIST/raw/train-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to ./data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 65365.94it/s]


Extracting ./data/MNIST/raw/train-labels-idx1-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:06<00:00, 241915.00it/s]


Extracting ./data/MNIST/raw/t10k-images-idx3-ubyte.gz to ./data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 403: Forbidden

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 3087605.96it/s]


Extracting ./data/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./data/MNIST/raw

Epoch [1/5], Loss: 0.40487796728258957
Epoch [2/5], Loss: 0.18897997165348993
Epoch [3/5], Loss: 0.13608456222908394
Epoch [4/5], Loss: 0.1107894161295122
Epoch [5/5], Loss: 0.09589088165503083


----

# End of the Examples

-----


### Other examples below need to be adapted to Colab.

# Distributed Training Explanation

In this example, we modify the training process to run on **multiple GPUs** using PyTorch's **DistributedDataParallel (DDP)**. This allows us to train the model in parallel, which speeds up training when using large datasets or models.

### Key Steps:

1. **Distributed Setup**:
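   - The distributed environment is initialized using `torch.distributed.init_process_group()`, which sets up communication between the processes (one per GPU). With the default `env://` initialization, it reads the rendezvous address from the `MASTER_ADDR` and `MASTER_PORT` environment variables.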
   - We use the **NCCL backend**, which is optimized for GPU communication.

2. **Distributed Sampler**:
   - In distributed training, each GPU needs to work on a different part of the dataset to avoid redundant computations.
   - We use `DistributedSampler` to ensure that each GPU gets a unique portion of the training data.

3. **Data Parallelism with DDP**:
   - The model is wrapped in `torch.nn.parallel.DistributedDataParallel`, which ensures that gradients are synchronized across all GPUs after each training step.
   - Each GPU trains its own mini-batch, and then the gradients are averaged across all GPUs.

4. **Training Loop**:
   - Similar to the single-GPU version, we train the model for 5 epochs. Each GPU processes its own subset of the data; DDP all-reduces the gradients during the backward pass, so every replica applies the same update at each optimizer step.

5. **Multiprocessing**:
   - We use `torch.multiprocessing.spawn()` to launch a separate training process for each GPU. Each process handles the training on one GPU.
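The sharding done by `DistributedSampler` can be inspected on CPU without starting any processes, because passing `num_replicas` and `rank` explicitly means no process group is needed. This sketch uses a toy 10-item dataset (an assumption for illustration) and shows which indices each rank would receive.

```python
import torch
from torch.utils.data import DistributedSampler, TensorDataset

# A toy dataset of 10 items; the values double as their indices
dataset = TensorDataset(torch.arange(10))

def shard_indices(world_size, epoch=0, shuffle=True, seed=0):
    """Return the list of indices each rank would see for the given epoch."""
    shards = []
    for rank in range(world_size):
        # Explicit num_replicas and rank avoid needing init_process_group()
        sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank,
                                     shuffle=shuffle, seed=seed)
        sampler.set_epoch(epoch)  # Same seed + epoch on every rank -> same permutation
        shards.append(list(sampler))
    return shards

print(shard_indices(2, shuffle=False))  # → [[0, 2, 4, 6, 8], [1, 3, 5, 7, 9]]
```

With shuffling enabled, every rank draws the same permutation for a given epoch and takes a different stride of it, so the shards stay disjoint. When the dataset size is not divisible by the number of replicas, the sampler (with the default `drop_last=False`) pads by repeating indices so every rank gets the same number of samples.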

### Important Points:
- This version of the code is designed to run on multiple GPUs, with each GPU handling a portion of the dataset.
- Distributed training helps speed up model training, especially for large-scale tasks.
- The communication between GPUs is handled by PyTorch's Distributed Data Parallel (DDP) mechanism, ensuring efficient and synchronized training across all devices.


In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, DistributedSampler

# Define the neural network (same as before)
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(28 * 28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        x = x.view(-1, 28 * 28)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Distributed initialization: the default env:// rendezvous needs MASTER_ADDR and MASTER_PORT
def setup(rank, world_size):
    os.environ.setdefault('MASTER_ADDR', 'localhost')
    os.environ.setdefault('MASTER_PORT', '12355')  # Any free port works
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

# Distributed training function
def train(rank, world_size):
    setup(rank, world_size)

    # Bind this process to its own GPU based on rank
    torch.cuda.set_device(rank)
    device = torch.device(f'cuda:{rank}')

    # Load the dataset and create a DistributedSampler
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
    train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transform, download=True)

    sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
    train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=False, sampler=sampler)

    # Initialize the model, loss, and optimizer
    model = SimpleNN().to(device)
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[rank])

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    for epoch in range(5):  # 5 epochs for simplicity
        model.train()
        sampler.set_epoch(epoch)  # Shuffle data differently every epoch
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        if rank == 0:  # Only rank 0 prints
            print(f"Epoch [{epoch+1}/5], Loss: {running_loss/len(train_loader)}")

    cleanup()

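# Multi-GPU main function
# Note: mp.spawn starts fresh interpreters that must import `train`, so functions
# defined interactively in a notebook usually cannot be found; run this as a .py script.
def main():
    world_size = torch.cuda.device_count()  # One process per GPU
    mp.spawn(train, args=(world_size,), nprocs=world_size, join=True)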

# Run the distributed training
if __name__ == "__main__":
    main()


ProcessExitedException: process 0 terminated with exit code 1

# Multi-node Multi-GPU Training in PyTorch

### What Happens:

- **Runs on multiple nodes**, each with one or more GPUs.
- A master node (the machine hosting rank 0) provides the rendezvous point that all processes contact at startup.
- Each GPU on each node processes a portion of the data and computes gradients.
- Gradients are synchronized across all GPUs (across nodes), ensuring that model updates are consistent.

### Key Steps:

1. **Master Node Setup**:
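   - The master node hosts the rendezvous endpoint that every process contacts to discover its peers; the collective operations themselves then run directly between processes.
   - Requires setting the `MASTER_ADDR` and `MASTER_PORT` environment variables on every node to the master node's address and a free port.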

2. **World Size and Ranks**:
   - The `world_size` is the total number of processes across all nodes (with one process per GPU, the total number of GPUs).
   - Each process has a unique global rank across all nodes, plus a local rank that selects which GPU it uses on its own node.

3. **Distributed Data Parallelism**:
   - Each node uses `torch.nn.parallel.DistributedDataParallel` to wrap its model, ensuring synchronized training.
   - Data is split across GPUs both within and across nodes.

4. **Communication Backend**:
   - Uses the **NCCL** backend for GPU communication across multiple nodes and GPUs.
   - Communication across nodes happens over the network.
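The relationship between node rank, local rank, and global rank can be sketched in plain Python. This assumes the common layout in which every node runs the same number of processes (one per GPU); the 2-node, 4-GPU sizes match the example used in the code below, and `global_rank` is a hypothetical helper, not a PyTorch API.

```python
def global_rank(node_rank: int, local_rank: int, gpus_per_node: int) -> int:
    """Global rank under a layout where every node runs gpus_per_node processes."""
    return node_rank * gpus_per_node + local_rank

num_nodes, gpus_per_node = 2, 4  # Example sizes: 2 nodes with 4 GPUs each
world_size = num_nodes * gpus_per_node

for node_rank in range(num_nodes):
    ranks = [global_rank(node_rank, lr, gpus_per_node) for lr in range(gpus_per_node)]
    print(f"node {node_rank}: local ranks 0-{gpus_per_node - 1} -> global ranks {ranks}")
```

Each `(node_rank, local_rank)` pair maps to exactly one global rank in `0 .. world_size - 1`, which is what `init_process_group` expects; the local rank is what picks the device (`cuda:{local_rank}`).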

### Key Points:

- **Scalability**: Allows for training across multiple machines, drastically speeding up large-scale machine learning tasks.
- **Communication Overhead**: More communication between nodes (compared to single-node), which can introduce latency but allows for training at a larger scale.
- **Complex Setup**: Requires coordination across nodes (master node, IP addresses, and ports), but scales well for large datasets and models.
- **Use Cases**: Suitable for very large datasets and models that require multiple nodes and multiple GPUs for efficient training.

### Main Differences from Single-node Training:

- **Single-node**: Only one machine (node) with multiple GPUs. Communication stays on fast intra-node links (for example NVLink or PCIe).
- **Multi-node**: Multiple machines (nodes) with GPUs, requiring network communication between nodes.


In [None]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torch.distributed as dist
import torch.multiprocessing as mp
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, DistributedSampler

# Define the neural network (same as before)
class SimpleNN(nn.Module):
    def __init__(self):
        super(SimpleNN, self).__init__()
        self.fc1 = nn.Linear(28 * 28, 128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, 10)

    def forward(self, x):
        x = x.view(-1, 28 * 28)
        x = torch.relu(self.fc1(x))
        x = torch.relu(self.fc2(x))
        x = self.fc3(x)
        return x

# Distributed setup across nodes and GPUs
def setup(rank, world_size, master_addr, master_port):
    os.environ['MASTER_ADDR'] = master_addr  # Master node's IP address
    os.environ['MASTER_PORT'] = master_port  # Master node's port
    dist.init_process_group("nccl", rank=rank, world_size=world_size)

def cleanup():
    dist.destroy_process_group()

# Distributed training function (one process per GPU on this node)
def train(local_rank, node_rank, gpus_per_node, world_size, master_addr, master_port):
    # The global rank must be unique across all nodes; local_rank picks the GPU on this node
    rank = node_rank * gpus_per_node + local_rank
    setup(rank, world_size, master_addr, master_port)

    torch.cuda.set_device(local_rank)
    device = torch.device(f'cuda:{local_rank}')

    # Load the dataset and create a DistributedSampler
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.5,), (0.5,))])
    train_dataset = torchvision.datasets.MNIST(root='./data', train=True, transform=transform, download=True)

    sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
    train_loader = DataLoader(dataset=train_dataset, batch_size=64, shuffle=False, sampler=sampler)

    # Initialize the model, loss, and optimizer
    model = SimpleNN().to(device)
    model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[local_rank])

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Training loop
    for epoch in range(5):  # 5 epochs for simplicity
        model.train()
        sampler.set_epoch(epoch)  # Shuffle data differently every epoch
        running_loss = 0.0
        for images, labels in train_loader:
            images, labels = images.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

        if rank == 0:  # Only global rank 0 prints
            print(f"Epoch [{epoch+1}/5], Loss: {running_loss/len(train_loader)}")

    cleanup()

# Multi-node Multi-GPU main function (run once on every node)
def main(num_nodes, node_rank):
    gpus_per_node = torch.cuda.device_count()  # Assumes every node has the same number of GPUs
    world_size = gpus_per_node * num_nodes  # Example: 2 nodes with 4 GPUs each -> 8 processes
    master_addr = '192.168.1.1'  # Replace with the actual master node's IP address
    master_port = '12355'  # Replace with the port used for distributed training

    mp.spawn(train, args=(node_rank, gpus_per_node, world_size, master_addr, master_port),
             nprocs=gpus_per_node, join=True)

if __name__ == "__main__":
    # NUM_NODES and NODE_RANK are names assumed for this example; set NODE_RANK=0 on the
    # master node and 1..NUM_NODES-1 on the other nodes
    num_nodes = int(os.environ.get('NUM_NODES', '2'))
    node_rank = int(os.environ.get('NODE_RANK', '0'))
    main(num_nodes, node_rank)


ProcessExitedException: process 0 terminated with exit code 1

# Distributed Linear Regression Training with PyTorch (Not Working on a Single Node)

In this example, we demonstrate how to perform distributed training in PyTorch using a simple linear regression model. We will use PyTorch’s `torch.distributed` package to synchronize gradients across multiple processes, simulating a multi-worker distributed environment.

## Key Concepts

- **Linear Regression Model**: A basic model that takes one input and predicts one output. The equation we are modeling is `y = 2x + 1`, and we use synthetic data to train the model.
  
- **Data Creation**: We generate synthetic training data for this model using the equation. Each process uses the same dataset to compute the loss and gradients; in real data-parallel training, each worker would instead get a different shard of the data.

- **Distributed Setup**: PyTorch’s `init_process_group` is used to initialize the distributed communication backend (in this case, using the **Gloo** backend, which is optimized for CPU communication). This allows each process to communicate with others.

- **Training Process**:
    - Each worker initializes a copy of the same model.
    - Each worker computes the gradients based on its local loss.
    - Gradients are **summed** across all workers using the `dist.all_reduce` operation, and then averaged by dividing the summed gradients by the number of workers.
    - The optimizer then updates the model parameters with the averaged gradients, ensuring that all workers’ models are synchronized.

- **Parallel Training**: In this simulation, we have two workers (processes) that perform parallel training. This simulates a scenario where you distribute the training across different nodes or GPUs.
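To check the averaging step numerically without starting any processes, this CPU-only sketch splits the same synthetic data into two equal shards (one per simulated worker), computes each shard's gradient, and averages them the way the all-reduce (sum) followed by division by the number of workers would. The shard split is an assumption for illustration; the example below gives every worker the full dataset.

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
x = torch.arange(10, dtype=torch.float32).unsqueeze(1)
y = 2 * x + 1                      # Same synthetic target: y = 2x + 1
model = nn.Linear(1, 1)
criterion = nn.MSELoss()           # Mean-reduced loss

def grads(xb, yb):
    """Return a copy of the parameter gradients for one batch."""
    model.zero_grad()
    criterion(model(xb), yb).backward()
    return [p.grad.clone() for p in model.parameters()]

full = grads(x, y)                                          # Single-process, full batch
shard_grads = [grads(x[r::2], y[r::2]) for r in range(2)]  # Each "worker" gets half the data
# Equivalent of all_reduce(SUM) followed by dividing by the number of workers
averaged = [(g0 + g1) / 2 for g0, g1 in zip(*shard_grads)]

print(all(torch.allclose(a, f) for a, f in zip(averaged, full)))  # → True
```

For equal-sized shards and a mean-reduced loss, the average of the per-shard gradients equals the full-batch gradient, which is why data-parallel training with gradient averaging behaves like single-process training on the combined batch.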

## What Happens in the Code

1. **Model Initialization**: A linear regression model is defined using PyTorch’s `nn.Linear`. This model is simple, with one input and one output.

2. **Gradient Synchronization**: After computing gradients locally, each process synchronizes the gradients across all workers using `dist.all_reduce`. This ensures that every process has the same gradients and that the model is updated consistently.

3. **Training**: The training runs for 20 epochs, and each process prints its initial and final weights. Because every worker applies the same averaged gradient, the workers' weights move in lockstep: if they start from different random values, the gap between them stays the same. Data-parallel training therefore broadcasts rank 0's initial parameters to all workers first (DDP does this automatically when the model is wrapped).

4. **Output**: Once the initial weights are synchronized, both processes finish training with identical weights, illustrating distributed gradient aggregation.
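A CPU-only simulation makes the lockstep behavior visible without `torch.distributed`. Two hypothetical workers start from different random weights (seeds 0 and 1), and after each backward pass every worker's gradient is replaced by the average, standing in for `all_reduce` plus division. The gap between their weights does not shrink.

```python
import torch
import torch.nn as nn

x = torch.arange(10, dtype=torch.float32).unsqueeze(1)
y = 2 * x + 1

def make_worker(seed):
    torch.manual_seed(seed)  # Different seeds -> different initial weights
    model = nn.Linear(1, 1)
    return model, torch.optim.SGD(model.parameters(), lr=0.01)

workers = [make_worker(seed) for seed in (0, 1)]
gap = lambda: (workers[0][0].weight - workers[1][0].weight).abs().item()
initial_gap = gap()

for _ in range(20):
    for model, opt in workers:
        opt.zero_grad()
        nn.functional.mse_loss(model(x), y).backward()
    # Simulated all-reduce: every worker receives the average gradient
    for params in zip(*(m.parameters() for m, _ in workers)):
        avg = sum(p.grad for p in params) / len(params)
        for p in params:
            p.grad = avg.clone()
    for _, opt in workers:
        opt.step()

print(f"initial gap: {initial_gap:.6f}, final gap: {gap():.6f}")
```

With plain SGD each worker subtracts the same `lr * avg` every step, so any initial difference is preserved exactly; broadcasting rank 0's parameters before training removes it.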

## Why Distributed?

In non-distributed training, you only use one process or one machine to train your model, which can be slow for large models or datasets. In distributed training, you split the workload across multiple processes or machines, speeding up training while still ensuring all models stay in sync.

This example uses the Gloo backend (for CPU communication) and simulates two processes working in parallel to train a model. For real-world applications, you would scale this to more processes and potentially use GPUs.

---

Run the code below to see the distributed training in action. You’ll observe how the model’s weights are synchronized across two workers.


In [None]:
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.multiprocessing import Process
import os

# Define a simple Linear Regression model
class LinearRegressionModel(nn.Module):
    def __init__(self):
        super(LinearRegressionModel, self).__init__()
        self.linear = nn.Linear(1, 1)

    def forward(self, x):
        return self.linear(x)

# Initialize the process group using the Gloo backend for CPU communication
def init_process(rank, size, fn, backend='gloo'):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '23554'  # Use a different port if issues
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
    dist.destroy_process_group()

# Define the training function for distributed workers
def train(rank, size):
    # Create synthetic data for linear regression
    x_train = torch.tensor([[i] for i in range(10)], dtype=torch.float32)
    y_train = torch.tensor([[2 * i + 1] for i in range(10)], dtype=torch.float32)

    # Each process creates its own copy of the model (CPU only)
    model = LinearRegressionModel()

    # Create a loss function and optimizer
    criterion = nn.MSELoss()
    optimizer = optim.SGD(model.parameters(), lr=0.01)

    # Each worker starts with its own random weights
    print(f"Rank {rank} model initial weights: {model.linear.weight.item()}")

    # Broadcast rank 0's parameters so all workers start from the same weights
    for param in model.parameters():
        dist.broadcast(param.data, src=0)

    # Perform training for a few epochs
    for epoch in range(20):
        optimizer.zero_grad()
        output = model(x_train)
        loss = criterion(output, y_train)
        loss.backward()

        # Perform AllReduce operation to sum gradients across workers
        for param in model.parameters():
            dist.all_reduce(param.grad.data, op=dist.ReduceOp.SUM)
            param.grad.data /= size  # Average the gradients

        optimizer.step()

    # Final weights after training
    print(f"Rank {rank} model final weights: {model.linear.weight.item()}")

# Function to spawn processes and simulate workers
def spawn_processes():
    size = 2  # Simulate two processes (workers)
    processes = []

    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, train))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

# Run the distributed training simulation
spawn_processes()


Process Process-15:
Traceback (most recent call last):
  File "/usr/lib/python3.10/multiprocessing/process.py", line 314, in _bootstrap
    self.run()
  File "/usr/lib/python3.10/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-15-a478fe360d71>", line 21, in init_process
    dist.init_process_group(backend, rank=rank, world_size=size)
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/c10d_logger.py", line 79, in wrapper
    return func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/c10d_logger.py", line 93, in wrapper
    func_return = func(*args, **kwargs)
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/distributed_c10d.py", line 1361, in init_process_group
    store, rank, world_size = next(rendezvous_iterator)
  File "/usr/local/lib/python3.10/dist-packages/torch/distributed/rendezvous.py", line 258, in _env_rendezvous_handler
    store = _create_c1

KeyboardInterrupt: 


## Section 3: Horovod

In this example, we use **Horovod**, an open-source distributed training library designed for scaling deep learning models across multiple GPUs and machines. Originally developed by Uber, Horovod supports both TensorFlow and PyTorch. It simplifies the process of distributed training by providing easy integration with minimal code changes.

Horovod uses the **Ring-AllReduce** algorithm, which efficiently synchronizes gradients between workers (GPUs). Each worker computes its local gradients, then Horovod aggregates them by passing gradient chunks around a ring of workers, with each worker exchanging data only with its two neighbors. Each worker sends at most about twice the size of its gradient, almost independent of the number of workers, which keeps communication manageable when scaling across many GPUs or even multiple machines.
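A pure-Python simulation can make the ring algorithm concrete. This is a teaching sketch, not Horovod's implementation: each worker's gradient is assumed to be pre-split into one chunk per worker, a reduce-scatter phase passes partial sums around the ring for N-1 steps, and an all-gather phase then circulates the finished chunks for another N-1 steps.

```python
def ring_allreduce(data):
    """data[i] is worker i's gradient, pre-split into n chunks (n = number of workers)."""
    n = len(data)
    buf = [list(d) for d in data]  # Each worker's local buffer

    # Phase 1: reduce-scatter. At step s, worker i sends chunk (i - s) % n to worker i + 1,
    # which adds it to its own copy. Sends in a step are simultaneous, so snapshot first.
    for step in range(n - 1):
        sends = [(i, (i - step) % n, buf[i][(i - step) % n]) for i in range(n)]
        for i, chunk, value in sends:
            buf[(i + 1) % n][chunk] += value
    # Now worker i holds the fully reduced chunk (i + 1) % n

    # Phase 2: all-gather. Finished chunks travel around the ring, overwriting stale copies.
    for step in range(n - 1):
        sends = [(i, (i + 1 - step) % n, buf[i][(i + 1 - step) % n]) for i in range(n)]
        for i, chunk, value in sends:
            buf[(i + 1) % n][chunk] = value
    return buf

print(ring_allreduce([[1, 2, 3], [10, 20, 30], [100, 200, 300]]))
# → [[111, 222, 333], [111, 222, 333], [111, 222, 333]]
```

Each worker sends 2(N-1) chunks of size 1/N of its gradient, i.e. 2(N-1)/N times the gradient size, which is where the "at most about twice the gradient" bound comes from.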

Horovod allows us to wrap the standard optimizers (like Adam or SGD) in a **Horovod DistributedOptimizer**, which handles gradient synchronization automatically. This ensures that all workers (GPUs) update their models with the same gradient values, maintaining model consistency across the system.

### Key Components:
- **Horovod**: A distributed training library for TensorFlow and PyTorch, which simplifies scaling deep learning models.
- **Ring-AllReduce**: A communication algorithm that passes gradients around a ring of workers (GPUs) to aggregate them efficiently.
- **hvd.DistributedOptimizer**: A wrapper around standard optimizers that automatically synchronizes gradients between workers.
- **BroadcastGlobalVariablesCallback**: Ensures all workers start with the same initial weights by broadcasting the variables from the first worker to all others.

Unlike non-distributed training where everything happens on a single GPU or machine, Horovod allows training to be distributed across multiple GPUs or machines. This reduces training time by leveraging more computational resources in parallel.

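Although this Colab example runs on a single GPU, it launches only one Horovod process (`hvd.size()` is 1), so no gradients are actually exchanged; it exercises the same code path you would use at scale. When the script is launched on multiple GPUs or nodes (for example with `horovodrun -np 4 python <script>.py`), the code stays the same and Horovod handles the communication between devices.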


---

### Install Horovod

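Horovod is compiled from source during `pip install`, so it needs TensorFlow and, for MPI support, an MPI implementation (installed in the next cell). Setting `HOROVOD_WITH_TENSORFLOW=1` makes the build fail if TensorFlow support cannot be compiled, instead of silently skipping it.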

In [None]:
!apt-get update && apt-get install -y --no-install-recommends openmpi-bin libopenmpi-dev


Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:4 http://archive.ubuntu.com/ubuntu jammy-updates InRelease
Hit:5 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Ign:6 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:7 https://r2u.stat.illinois.edu/ubuntu jammy Release
Hit:8 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:9 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Hit:11 http://security.ubuntu.com/ubuntu jammy-security InRelease
Fetched 3,626 B in 4s (890 B/s)
Reading package lists... Done
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (

In [None]:
%env HOROVOD_WITH_TENSORFLOW=1


env: HOROVOD_WITH_TENSORFLOW=1


In [None]:
!pip install horovod[tensorflow]


Streaming output truncated to the last 5000 lines.
                   from /usr/local/lib/python3.10/dist-packages/torch/include/c10/util/Float8_e5m2.h:17,
                   from /usr/local/lib/python3.10/dist-packages/torch/include/c10/core/ScalarType.h:8,
                   from /usr/local/lib/python3.10/dist-packages/torch/include/c10/core/Scalar.h:9,
                   from /usr/local/lib/python3.10/dist-packages/torch/include/ATen/core/TensorBody.h:16,
                   from /usr/local/lib/python3.10/dist-packages/torch/include/ATen/core/Tensor.h:3,
                   from /usr/local/lib/python3.10/dist-packages/torch/include/ATen/Tensor.h:3,
                   from /usr/local/lib/python3.10/dist-packages/torch/include/torch/csrc/autograd/function_hook.h:3,
                   from /usr/local/lib/python3.10/dist-packages/torch/include/torch/csrc/autograd/cpp_hook.h:2,
                   from /usr/local/lib/python3.10/dist-packages/torch/include/torch/csrc/autograd/v

In [None]:
import horovod.tensorflow.keras as hvd

# Initialize Horovod
hvd.init()

print(f"Horovod is running with {hvd.size()} process(es).")


In [None]:
import tensorflow as tf
import horovod.tensorflow.keras as hvd

# Initialize Horovod
hvd.init()

# Pin GPU to be used by this process (if using GPU)
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    tf.config.set_visible_devices(gpus[hvd.local_rank()], 'GPU')

# Load and preprocess the dataset
(x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data()
x_train = x_train[hvd.rank()::hvd.size()]
y_train = y_train[hvd.rank()::hvd.size()]
x_train, x_test = x_train / 255.0, x_test / 255.0

# Scale the learning rate linearly with the number of processes (effective batch size = 64 * hvd.size())
scaled_lr = 0.001 * hvd.size()

# Build the model
model = tf.keras.Sequential([
    tf.keras.layers.Flatten(input_shape=(28, 28)),
    tf.keras.layers.Dense(128, activation='relu'),
    tf.keras.layers.Dense(10, activation='softmax')
])

# Wrap the optimizer with Horovod DistributedOptimizer
optimizer = tf.keras.optimizers.Adam(learning_rate=scaled_lr)
optimizer = hvd.DistributedOptimizer(optimizer)

# Compile the model
model.compile(optimizer=optimizer, loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# Broadcast initial variable states from rank 0 to all other processes
callbacks = [
    hvd.callbacks.BroadcastGlobalVariablesCallback(0),
    hvd.callbacks.MetricAverageCallback(),
]

# Train the model
model.fit(
    x_train, y_train,
    batch_size=64,
    callbacks=callbacks,
    epochs=5,
    verbose=1 if hvd.rank() == 0 else 0  # Only print logs on rank 0
)

# Evaluate the model
if hvd.rank() == 0:
    model.evaluate(x_test, y_test)


## Exercise: Modify the Horovod Distributed Training

Now that you've learned about **Horovod** and its distributed training capabilities, let's modify the training script to see how different settings affect the performance.

### Task:
1. Modify the learning-rate scaling: instead of adding 0.001 per GPU, add half as much (0.0005 per GPU), i.e. halve the linearly scaled rate.
2. Change the number of **epochs** from 5 to 3 and observe how the training performance changes.

### Steps:
1. Locate the line where the learning rate is scaled: `scaled_lr = 0.001 * hvd.size()`.
2. Modify this line to `scaled_lr = 0.001 * hvd.size() * 0.5`.
3. Change the `epochs` argument in the `model.fit()` function from 5 to 3.
4. Rerun the training and observe the training speed and accuracy.

This exercise will help you understand how learning rate scaling impacts distributed training and how modifying training parameters can affect performance in a distributed setting.


# Explanation:
Horovod uses the Ring-AllReduce algorithm to synchronize gradients across processes efficiently.

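We use the MNIST dataset and a simple neural network model for training. The learning rate is scaled by the number of workers to compensate for the larger effective batch size (64 × the number of workers), following the common linear scaling rule.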
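The arithmetic behind that rule, with the per-process batch of 64 and base rate of 0.001 used in the Horovod script, can be sketched in plain Python. The helper names are hypothetical and not part of Horovod; `factor=0.5` corresponds to the halved scaling from the exercise.

```python
def effective_batch_size(per_worker_batch: int, num_workers: int) -> int:
    """Samples consumed per synchronized optimizer step across all workers."""
    return per_worker_batch * num_workers

def scaled_learning_rate(base_lr: float, num_workers: int, factor: float = 1.0) -> float:
    """Linear scaling rule: grow the learning rate with the number of workers."""
    return base_lr * num_workers * factor

for workers in (1, 2, 4, 8):
    print(workers, effective_batch_size(64, workers),
          scaled_learning_rate(0.001, workers),
          scaled_learning_rate(0.001, workers, factor=0.5))
```

On this single-GPU Colab run, `hvd.size()` is 1, so the scaled rate is simply the base rate.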

### Gloo AllReduce Example

In [None]:
import torch
import torch.distributed as dist
from torch.multiprocessing import Process
import os

# Initialize the process group
def init_process(rank, size, fn, backend='gloo'):  # Use 'gloo' instead of 'nccl'
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group(backend, rank=rank, world_size=size)
    fn(rank, size)
    dist.destroy_process_group()

# A simple function to simulate AllReduce using Gloo
def run(rank, size):
    tensor = torch.ones(1) * rank  # Run on CPU now
    print(f"Before AllReduce: Rank {rank} has {tensor.item()}")

    # Perform an AllReduce operation across "virtual" processes
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)

    print(f"After AllReduce: Rank {rank} has {tensor.item()}")

# Spawn multiple processes to simulate distributed training
def spawn_processes():
    size = 2  # Simulate 2 processes
    processes = []

    for rank in range(size):
        p = Process(target=init_process, args=(rank, size, run))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

# Run the modified Gloo example
spawn_processes()


Before AllReduce: Rank 1 has 1.0
Before AllReduce: Rank 0 has 0.0
After AllReduce: Rank 1 has 1.0After AllReduce: Rank 0 has 1.0



## Distributed ML on an HPC Cluster

### Notebook 01 — PyTorch single GPU sanity (pt1n)

#### Cell 1: setup and submit

In [None]:


set -euxo pipefail
WORKDIR="$HOME/pt-single"
mkdir -p "$WORKDIR" && cd "$WORKDIR"

# copy the template locally
install -D -m 0644 /project/dlstack/templates/pt1n.sbatch ./pt1n.sbatch

# submit
jid=$(sbatch pt1n.sbatch | awk '{print $4}')
echo "JOBID=$jid" | tee .jobid




#### Cell 2: monitor

In [None]:
jid=$(cat $HOME/pt-single/.jobid)
echo "Watching JOBID=$jid"
squeue -j "$jid" -o "%.18i %.10u %.12P %.40j %.2t %.10M %R"


#### Cell 3: show output (once file appears)

In [None]:

cd "$HOME/pt-single"
OUT=$(ls pt1n.$(cat .jobid).out 2>/dev/null || true)
if [ -n "$OUT" ]; then
  sed -n '1,160p' "$OUT"
else
  echo "Output file not present yet. Re-run this cell in a bit."
fi


### Notebook 02 — PyTorch DDP with real data (CIFAR-10, 2 nodes)

#### Cell 1: setup files

In [None]:
set -euxo pipefail
WORKDIR="$HOME/ddp-cifar10"
mkdir -p "$WORKDIR" && cd "$WORKDIR"

install -D -m 0644 /project/dlstack/templates/pt_ddp_cifar10.sbatch ./pt_ddp_cifar10.sbatch
install -D -m 0644 /project/dlstack/examples/train_cifar10_ddp.py ./train_cifar10_ddp.py


#### Cell 2: choose quick knobs

In [None]:

# fast demo defaults
export EPOCHS=1 BATCH=128 WORKERS=2 SUBSET=8000 AMP=1
env | grep -E '^(EPOCHS|BATCH|WORKERS|SUBSET|AMP)=' || true

#### Cell 3: submit

In [None]:
cd "$HOME/ddp-cifar10"
jid=$(sbatch pt_ddp_cifar10.sbatch | awk '{print $4}')
echo "JOBID=$jid" | tee .jobid


#### Cell 4: monitor & peek logs

In [None]:
jid=$(cat $HOME/ddp-cifar10/.jobid)
echo "JOBID=$jid"
squeue -j "$jid" -o "%.18i %.10u %.12P %.40j %.2t %.10M %R"

#### Cell 5: show last lines (re-run to refresh)

In [None]:

cd "$HOME/ddp-cifar10"
OUT="cifar10_ddp.$(cat .jobid).out"
test -f "$OUT" && tail -n 80 "$OUT" || echo "Waiting for $OUT ..."

#### Cell 6: check artifact

In [None]:
ls -lh "$HOME/ddp-cifar10/checkpoints/" || true

### Notebook 03 — TensorFlow single GPU sanity (tf1n)

#### Cell 1: setup and submit

In [None]:
set -euxo pipefail
WORKDIR="$HOME/tf-single"
mkdir -p "$WORKDIR" && cd "$WORKDIR"

install -D -m 0644 /project/dlstack/templates/tf1n.sbatch ./tf1n.sbatch

jid=$(sbatch tf1n.sbatch | awk '{print $4}')
echo "JOBID=$jid" | tee .jobid

#### Cell 2: monitor & read

In [None]:

cd "$HOME/tf-single"
jid=$(cat .jobid)
squeue -j "$jid" -o "%.18i %.10u %.12P %.40j %.2t %.10M %R"
OUT="tf1n.$jid.out"
test -f "$OUT" && sed -n '1,200p' "$OUT" || echo "Waiting for $OUT ..."
