In [1]:
import torch
import torch.nn as nn
#device information
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print("device:", device)

device: cpu


In [2]:
# move a tensor from the CPU to the GPU (if one is available)
x = torch.tensor(1.0, device=torch.device('cpu'))
print(x.device)  # cpu

y = x.to('cuda' if torch.cuda.is_available() else 'cpu')
print(y.device)  # cuda:0 if a GPU is available, otherwise cpu

cpu
cpu


In [None]:
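def train(model):
    # Use the GPU if available, otherwise fall back to the CPU
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model.to(device)  # nn.Module.to() moves the parameters in place
    model.train()     # enable training-mode behavior (dropout, batch-norm statistics)
    # ... training loop goes here
    return model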

### Parallelism
In deep learning, parallelism is used to distribute the training process across multiple devices, such as GPUs or TPUs, to reduce training time. There are two main types of parallelism: data parallelism and model parallelism.

1. Data parallelism:
Data parallelism involves splitting the input data into smaller batches and distributing them across multiple devices. Each device processes a subset of the data independently and computes the gradients for the model's parameters. The gradients are then combined (averaged) across all devices, and the model parameters are updated accordingly.

Data parallelism is the most common form of parallelism in deep learning, as it is relatively easy to implement and works well for many tasks. It can lead to substantial speedup in training, especially when working with large datasets and large batch sizes.

2. Model parallelism:
Model parallelism is the process of distributing the model itself across multiple devices. This approach is particularly useful when the model is too large to fit into the memory of a single device. In model parallelism, different parts of the model, such as layers or sub-networks, are assigned to different devices. The forward and backward passes are then performed by coordinating the execution of the model's parts across the devices.

Model parallelism can be more challenging to implement than data parallelism, as it requires careful partitioning of the model and synchronization of the devices during training. However, it can be an effective strategy for training very large models that cannot fit into the memory of a single device.
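To make model parallelism concrete, here is a minimal sketch (not from the original notes) that places the two halves of a toy network on different devices. `TwoStageNet` and `pick_devices` are hypothetical names; the code falls back to the CPU for both stages when fewer than two GPUs are visible, so it also runs on a CPU-only machine like the one used in this notebook.

```python
import torch
import torch.nn as nn

class TwoStageNet(nn.Module):
    """Naive model parallelism: stage1 lives on dev0, stage2 lives on dev1."""

    def __init__(self, dev0, dev1):
        super().__init__()
        self.dev0, self.dev1 = torch.device(dev0), torch.device(dev1)
        self.stage1 = nn.Sequential(nn.Linear(16, 32), nn.ReLU()).to(self.dev0)
        self.stage2 = nn.Linear(32, 4).to(self.dev1)

    def forward(self, x):
        h = self.stage1(x.to(self.dev0))
        # Move the intermediate activations to the device that holds the next stage
        return self.stage2(h.to(self.dev1))

def pick_devices():
    # Two GPUs if present; otherwise run both stages on the CPU
    if torch.cuda.device_count() >= 2:
        return "cuda:0", "cuda:1"
    return "cpu", "cpu"

model = TwoStageNet(*pick_devices())
x = torch.randn(8, 16)
out = model(x)
loss = out.pow(2).mean()
loss.backward()  # autograd routes the gradients back across the device boundary
print(out.shape)  # torch.Size([8, 4])
```

In this naive form only one device works at a time, because the second stage waits for the first; pipeline parallelism reduces that idle time by splitting each batch into micro-batches.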

In some cases, it is possible to combine both data parallelism and model parallelism to take advantage of their respective strengths. This is known as hybrid parallelism and can lead to even greater speedup in training times and the ability to train extremely large models.

### Data Parallelism
Data parallelism can be implemented in various deep learning frameworks. Here, I'll show how to do data parallelism using PyTorch.

1. First, make sure that you have multiple GPUs available on your system. You can check this with the following code:

In [3]:
import torch
print("Number of GPUs available:", torch.cuda.device_count())

Number of GPUs available: 0


2. Next, create your model using a neural network class that inherits from `torch.nn.Module`. For example, here is a simple model:

In [None]:
import torch.nn as nn
import torch.nn.functional as F  # F.relu and F.max_pool2d are used in forward()

class MyModel(nn.Module):
    def __init__(self):
        super(MyModel, self).__init__()
        self.conv1 = nn.Conv2d(1, 32, 3)
        self.conv2 = nn.Conv2d(32, 64, 3)
        # 64 * 6 * 6 assumes 1x32x32 inputs; 28x28 MNIST images would need 64 * 5 * 5
        self.fc1 = nn.Linear(64 * 6 * 6, 128)
        self.fc2 = nn.Linear(128, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2)
        x = x.flatten(1)  # keep the batch dimension, flatten channels x height x width
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

model = MyModel()
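A quick way to check that `64 * 6 * 6` in `fc1` matches your input size is to push a dummy tensor through the convolutional part. The sketch below rebuilds the same conv/pool stack inside a hypothetical helper, `conv_feature_shape`. With 3x3 convolutions (no padding) and 2x2 pooling, a 32x32 input ends at 6x6, while a 28x28 MNIST image ends at 5x5.

```python
import torch
import torch.nn as nn
import torch.nn.functional as F

def conv_feature_shape(height, width):
    """Run MyModel's conv/pool layers on a dummy 1-channel image and return the feature shape."""
    conv1 = nn.Conv2d(1, 32, 3)
    conv2 = nn.Conv2d(32, 64, 3)
    x = torch.zeros(1, 1, height, width)
    x = F.max_pool2d(F.relu(conv1(x)), 2)
    x = F.max_pool2d(F.relu(conv2(x)), 2)
    return tuple(x.shape[1:])  # drop the batch dimension

print(conv_feature_shape(32, 32))  # (64, 6, 6): matches 64 * 6 * 6
print(conv_feature_shape(28, 28))  # (64, 5, 5): would need 64 * 5 * 5
```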

3. Now, wrap your model using `torch.nn.DataParallel`. This replicates the model on every visible GPU and splits each input batch across the replicas:

In [None]:
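if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)  # replicate across all visible GPUs

# `device` comes from the first cell: "cuda" if a GPU is available, otherwise "cpu"
model = model.to(device)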
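A related detail when saving checkpoints: `DataParallel` registers your network as a submodule called `module`, so the wrapper's `state_dict` keys gain a `module.` prefix and no longer match an unwrapped model. In recent PyTorch releases, a machine with no GPU simply calls the wrapped network directly, so this sketch runs on CPU as well. `unwrap` is a hypothetical helper name.

```python
import torch
import torch.nn as nn

def unwrap(model):
    """Hypothetical helper: return the inner network if model is a DataParallel/DDP wrapper."""
    wrappers = (nn.DataParallel, nn.parallel.DistributedDataParallel)
    return model.module if isinstance(model, wrappers) else model

net = nn.Linear(4, 2)
wrapped = nn.DataParallel(net)

print(sorted(wrapped.state_dict().keys()))          # ['module.bias', 'module.weight']
print(sorted(unwrap(wrapped).state_dict().keys()))  # ['bias', 'weight']

# Using the unwrapped state_dict lets a plain, unwrapped model reuse the weights
plain = nn.Linear(4, 2)
plain.load_state_dict(unwrap(wrapped).state_dict())
```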

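4. Create your data loaders for training and validation data. With `DataParallel`, the `batch_size` you set is the total batch, which is split across the GPUs (for example, 64 across 4 GPUs gives 16 samples per GPU), so choose it large enough to keep every GPU busy: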

In [None]:
import torch.utils.data as data

# MyTrainDataset / MyValDataset are placeholders for your own torch.utils.data.Dataset classes
train_dataset = MyTrainDataset()
train_loader = data.DataLoader(train_dataset, batch_size=64, shuffle=True)

val_dataset = MyValDataset()
val_loader = data.DataLoader(val_dataset, batch_size=64, shuffle=False)
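`MyTrainDataset` and `MyValDataset` are not defined in these notes. To try the pipeline end to end, a hypothetical stand-in such as `RandomImageDataset` below (random 1x32x32 images with labels 0 to 9, matching the input size `MyModel` expects) can take their place until you swap in a real dataset.

```python
import torch
from torch.utils.data import Dataset, DataLoader

class RandomImageDataset(Dataset):
    """Hypothetical stand-in dataset: random 1x32x32 images with integer labels in [0, 10)."""

    def __init__(self, num_samples=256, seed=0):
        g = torch.Generator().manual_seed(seed)  # fixed seed so the data is reproducible
        self.images = torch.randn(num_samples, 1, 32, 32, generator=g)
        self.labels = torch.randint(0, 10, (num_samples,), generator=g)

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.images[idx], self.labels[idx]

train_loader = DataLoader(RandomImageDataset(256), batch_size=64, shuffle=True)
inputs, targets = next(iter(train_loader))
print(inputs.shape, targets.shape)  # torch.Size([64, 1, 32, 32]) torch.Size([64])
```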


5. Finally, train your model with a standard training loop. `DataParallel` automatically splits each batch across the available GPUs and gathers the outputs, so the loop itself does not change. Following these steps, the training workload is distributed across the available GPUs to speed up training.

In [None]:

num_epochs = 10  # example value

criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

for epoch in range(num_epochs):
    model.train()
    for batch, (inputs, targets) in enumerate(train_loader):
        # Move the batch to the main device; DataParallel scatters it to the other GPUs
        inputs, targets = inputs.to(device), targets.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, targets)
        loss.backward()
        optimizer.step()

    # Validate your model
    model.eval()
    # ... (Your validation code here)
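The validation step is left open above. One possible sketch, assuming a classification model and a loader of `(inputs, targets)` pairs like `val_loader`, is a hypothetical `evaluate` helper that disables gradient tracking and reports the mean loss and accuracy:

```python
import torch
import torch.nn as nn

@torch.no_grad()  # no gradients are needed for validation
def evaluate(model, loader, device):
    """Hypothetical validation helper: returns (mean loss, accuracy) over a loader."""
    model.eval()
    criterion = nn.CrossEntropyLoss(reduction="sum")  # sum, then divide by sample count
    total_loss, correct, seen = 0.0, 0, 0
    for inputs, targets in loader:
        inputs, targets = inputs.to(device), targets.to(device)
        outputs = model(inputs)
        total_loss += criterion(outputs, targets).item()
        correct += (outputs.argmax(dim=1) == targets).sum().item()
        seen += targets.size(0)
    return total_loss / seen, correct / seen
```

In place of the placeholder comment, it would be called as `val_loss, val_acc = evaluate(model, val_loader, device)`.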

### Gradients across each parallel model are synchronized during the backward pass
When using data parallelism in deep learning frameworks like PyTorch, the framework automatically ensures that the gradients across each parallel model are synchronized during the backward pass.

In PyTorch, when you wrap your model with `nn.DataParallel`, it creates replicas of your model on each available GPU. During the forward pass, the input batch is divided into smaller mini-batches, and each mini-batch is processed by a separate model replica on each GPU.

During the backward pass, the gradients computed by each replica are summed into the parameters of the original model on the first GPU (`device_ids[0]`). Because the loss is usually a mean over the whole gathered batch, this sum equals the gradient of the full-batch loss. `optimizer.step()` then updates the original parameters, and `DataParallel` copies the updated weights to the replicas again at the start of the next forward pass.

You don't have to synchronize gradients manually, as the framework takes care of it. Here is the wrapping code from above for reference:

```python
if torch.cuda.device_count() > 1:
    model = nn.DataParallel(model)

model = model.to(device)  # device = "cuda" if available, otherwise "cpu"
```

By using `nn.DataParallel(model)`, the gradient synchronization is automatically handled by PyTorch during training.
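The claim that sharding a batch and then combining gradients gives the same update as one large batch can be checked on a CPU. The sketch below is a single-process simulation, not what `DataParallel` or DDP literally run: two deep copies of a toy linear model each compute the mean loss on half of the batch, and averaging their gradients (what DDP's all-reduce does) reproduces the full-batch gradient, as long as the shards are equally sized.

```python
import copy
import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.Linear(4, 1)
x, y = torch.randn(8, 4), torch.randn(8, 1)
loss_fn = nn.MSELoss()  # mean over the samples it is given

# Reference: gradient of the mean loss over the full batch on one device
model.zero_grad()
loss_fn(model(x), y).backward()
full_grad = model.weight.grad.clone()

# Simulated data parallelism: 2 "replicas", each sees half of the batch
replica_grads = []
for xs, ys in zip(x.chunk(2), y.chunk(2)):
    replica = copy.deepcopy(model)  # every replica starts from the same weights
    replica.zero_grad()
    loss_fn(replica(xs), ys).backward()  # mean loss over the local shard only
    replica_grads.append(replica.weight.grad)

avg_grad = torch.stack(replica_grads).mean(dim=0)  # what an all-reduce average produces
print(torch.allclose(full_grad, avg_grad, atol=1e-6))  # True
```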

#### Parallelism on CPU
Parallelism also works on CPUs, but it usually gives smaller speedups than on GPUs. GPUs are designed for parallel processing with a large number of simple cores, whereas CPUs have fewer cores optimized for general-purpose computing.

You can still use multiple CPU cores (or multiple CPU machines) for data-parallel training with `torch.distributed` and `torch.nn.parallel.DistributedDataParallel`.

We use `torch.distributed` to set up a distributed training environment. The `train` function initializes the distributed environment, loads the dataset with a `DistributedSampler`, creates the model, wraps it with `DistributedDataParallel`, and performs the training. The main function spawns multiple processes to run the `train` function in parallel.
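To see what `DistributedSampler` does on its own, you can pass `num_replicas` and `rank` explicitly, which avoids the need for an initialized process group. In this sketch with a toy 10-element dataset and 4 ranks, each rank takes every 4th index, and the index list is padded by repeating its first entries so that every rank gets the same number of samples (3 here).

```python
import torch
from torch.utils.data import TensorDataset
from torch.utils.data.distributed import DistributedSampler

dataset = TensorDataset(torch.arange(10))  # toy dataset with 10 samples
world_size = 4

# Without shuffling, rank r gets indices r, r + 4, r + 8, ... (padded to equal length)
for rank in range(world_size):
    sampler = DistributedSampler(dataset, num_replicas=world_size, rank=rank, shuffle=False)
    print(rank, list(sampler))
# 0 [0, 4, 8]
# 1 [1, 5, 9]
# 2 [2, 6, 0]
# 3 [3, 7, 1]

# With shuffling, call set_epoch() every epoch so the permutation changes between epochs
sampler = DistributedSampler(dataset, num_replicas=world_size, rank=0, shuffle=True, seed=0)
for epoch in range(2):
    sampler.set_epoch(epoch)
    print(epoch, list(sampler))
```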

Note that `DistributedDataParallel` is primarily designed for multi-GPU training and may be less efficient on CPUs. If you plan to use multiple GPUs, move the model to `cuda` in each process and use `nccl` as the backend for `init_process_group`.

The backend used here for CPU data parallelism is `gloo`. Gloo supports both CPU and GPU tensors, and it is the backend PyTorch recommends for distributed CPU training.

`torch.multiprocessing` is used in the example only to start the worker processes (`Process`). Everything else, including gradient synchronization, is handled by `torch.distributed` and `DistributedDataParallel`, which provide a higher-level API with support for multi-GPU and multi-node setups.


In [None]:
import os
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.multiprocessing import Process
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torch.utils.data.distributed import DistributedSampler

# Define the model
class SimpleNet(nn.Module):
    def __init__(self):
        super(SimpleNet, self).__init__()
        self.fc = nn.Linear(28 * 28, 10)

    def forward(self, x):
        x = x.view(x.size(0), -1)
        x = self.fc(x)
        return x

DATA_ROOT = './data'  # example location for the MNIST files; any writable directory works

def train(rank, world_size, num_epochs=5, batch_size=64, learning_rate=0.01):
    # rank: the rank of the current worker process (0 to world_size-1)
    # Initialize the distributed environment; all ranks meet at MASTER_ADDR:MASTER_PORT
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("gloo", rank=rank, world_size=world_size)

    # Load the dataset. It was downloaded once in main(), so the workers
    # do not all try to download the same files at the same time.
    transform = transforms.Compose([transforms.ToTensor(), transforms.Normalize((0.1307,), (0.3081,))])
    train_dataset = datasets.MNIST(root=DATA_ROOT, train=True, transform=transform, download=False)
    # DistributedSampler gives each rank a different shard of the dataset
    train_sampler = DistributedSampler(train_dataset, num_replicas=world_size, rank=rank)
    train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=train_sampler)

    # Create the model
    model = SimpleNet()

    # Wrap the model with DistributedDataParallel (CPU model, so no device_ids)
    model = nn.parallel.DistributedDataParallel(model)

    # Define the optimizer
    optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

    # Start training
    for epoch in range(num_epochs):
        train_sampler.set_epoch(epoch)  # use a different shuffle order each epoch
        for batch_idx, (data, target) in enumerate(train_loader):
            optimizer.zero_grad()
            output = model(data)
            loss = nn.functional.cross_entropy(output, target)
            loss.backward()  # DDP averages the gradients across all ranks here
            optimizer.step()

            if batch_idx % 100 == 0:
                print(f'Rank: {rank}, Epoch: {epoch}, Batch: {batch_idx}, Loss: {loss.item()}')

    # Clean up
    dist.destroy_process_group()

def main():
    # world_size: the total number of worker processes
    world_size = 4
    # Download MNIST once in the parent process before starting the workers
    datasets.MNIST(root=DATA_ROOT, train=True, download=True)
    processes = []
    for rank in range(world_size):
        p = Process(target=train, args=(rank, world_size))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

if __name__ == '__main__':
    main()


Output sample (the cell above saved as `distributed_training.py` and run from a terminal):

isabelleliu@wireless-nat-inside Desktop % python distributed_training.py

[W ProcessGroupGloo.cpp:725] Warning: Unable to resolve hostname to a (local) address. Using the loopback address as fallback. Manually set the network interface to bind to with GLOO_SOCKET_IFNAME. (function operator())

[W ProcessGroupGloo.cpp:725] Warning: Unable to resolve hostname to a (local) address. Using the loopback address as fallback. Manually set the network interface to bind to with GLOO_SOCKET_IFNAME. (function operator())

[W ProcessGroupGloo.cpp:725] Warning: Unable to resolve hostname to a (local) address. Using the loopback address as fallback. Manually set the network interface to bind to with GLOO_SOCKET_IFNAME. (function operator())

[W ProcessGroupGloo.cpp:725] Warning: Unable to resolve hostname to a (local) address. Using the loopback address as fallback. Manually set the network interface to bind to with GLOO_SOCKET_IFNAME. (function operator())

Rank: 2, Epoch: 0, Batch: 0, Loss: 2.477926015853882
Rank: 1, Epoch: 0, Batch: 0, Loss: 2.477217674255371
Rank: 0, Epoch: 0, Batch: 0, Loss: 2.4008755683898926
Rank: 3, Epoch: 0, Batch: 0, Loss: 2.440624952316284
Rank: 2, Epoch: 0, Batch: 100, Loss: 0.5909628868103027
Rank: 1, Epoch: 0, Batch: 100, Loss: 0.6203895807266235
Rank: 3, Epoch: 0, Batch: 100, Loss: 0.4222588837146759
Rank: 0, Epoch: 0, Batch: 100, Loss: 0.6428020596504211
Rank: 2, Epoch: 0, Batch: 200, Loss: 0.4741484820842743
Rank: 1, Epoch: 0, Batch: 200, Loss: 0.47861358523368835
Rank: 0, Epoch: 0, Batch: 200, Loss: 0.3796921372413635
Rank: 3, Epoch: 0, Batch: 200, Loss: 0.39659440517425537
Rank: 2, Epoch: 1, Batch: 0, Loss: 0.4313582479953766
Rank: 3, Epoch: 1, Batch: 0, Loss: 0.5145975351333618
Rank: 1, Epoch: 1, Batch: 0, Loss: 0.6627392768859863
Rank: 0, Epoch: 1, Batch: 0, Loss: 0.45296281576156616
Rank: 2, Epoch: 1, Batch: 100, Loss: 0.46159809827804565
Rank: 1, Epoch: 1, Batch: 100, Loss: 0.46639543771743774
Rank: 0, Epoch: 1, Batch: 100, Loss: 0.43611079454421997
Rank: 3, Epoch: 1, Batch: 100, Loss: 0.2286127507686615
Rank: 0, Epoch: 1, Batch: 200, Loss: 0.2949753701686859
Rank: 3, Epoch: 1, Batch: 200, Loss: 0.3011402189731598
Rank: 1, Epoch: 1, Batch: 200, Loss: 0.40933674573898315
Rank: 2, Epoch: 1, Batch: 200, Loss: 0.3895317614078522
Rank: 3, Epoch: 2, Batch: 0, Loss: 0.4275888502597809
Rank: 2, Epoch: 2, Batch: 0, Loss: 0.35294681787490845
Rank: 1, Epoch: 2, Batch: 0, Loss: 0.598163902759552
Rank: 0, Epoch: 2, Batch: 0, Loss: 0.3804122805595398
Rank: 3, Epoch: 2, Batch: 100, Loss: 0.18629871308803558
Rank: 2, Epoch: 2, Batch: 100, Loss: 0.4326363801956177
Rank: 0, Epoch: 2, Batch: 100, Loss: 0.3880370259284973
Rank: 1, Epoch: 2, Batch: 100, Loss: 0.43369781970977783
Rank: 3, Epoch: 2, Batch: 200, Loss: 0.26491454243659973
Rank: 1, Epoch: 2, Batch: 200, Loss: 0.38378211855888367
Rank: 0, Epoch: 2, Batch: 200, Loss: 0.26226669549942017
Rank: 2, Epoch: 2, Batch: 200, Loss: 0.3595591187477112
Rank: 2, Epoch: 3, Batch: 0, Loss: 0.3174893260002136
Rank: 1, Epoch: 3, Batch: 0, Loss: 0.5703485608100891
Rank: 3, Epoch: 3, Batch: 0, Loss: 0.38869717717170715
Rank: 0, Epoch: 3, Batch: 0, Loss: 0.34726154804229736
Rank: 2, Epoch: 3, Batch: 100, Loss: 0.41687583923339844
Rank: 1, Epoch: 3, Batch: 100, Loss: 0.4193125367164612
Rank: 3, Epoch: 3, Batch: 100, Loss: 0.1669737845659256
Rank: 0, Epoch: 3, Batch: 100, Loss: 0.3657465875148773
Rank: 2, Epoch: 3, Batch: 200, Loss: 0.3439997434616089
Rank: 0, Epoch: 3, Batch: 200, Loss: 0.24382120370864868
Rank: 1, Epoch: 3, Batch: 200, Loss: 0.3696056306362152
Rank: 3, Epoch: 3, Batch: 200, Loss: 0.24463820457458496
Rank: 2, Epoch: 4, Batch: 0, Loss: 0.2956679165363312
Rank: 3, Epoch: 4, Batch: 0, Loss: 0.36574244499206543
Rank: 0, Epoch: 4, Batch: 0, Loss: 0.32660651206970215
Rank: 1, Epoch: 4, Batch: 0, Loss: 0.5530359745025635
Rank: 1, Epoch: 4, Batch: 100, Loss: 0.41100385785102844
Rank: 0, Epoch: 4, Batch: 100, Loss: 0.3527187705039978
Rank: 3, Epoch: 4, Batch: 100, Loss: 0.15558141469955444
Rank: 2, Epoch: 4, Batch: 100, Loss: 0.4053698480129242
Rank: 3, Epoch: 4, Batch: 200, Loss: 0.23122680187225342
Rank: 0, Epoch: 4, Batch: 200, Loss: 0.23176565766334534
Rank: 1, Epoch: 4, Batch: 200, Loss: 0.3601338863372803
Rank: 2, Epoch: 4, Batch: 200, Loss: 0.33446624875068665

This code demonstrates a simple distributed training example using PyTorch's `DistributedDataParallel` module. The model being trained is a simple feed-forward neural network called `SimpleNet` designed for the MNIST dataset. The code can be broken down into several parts:

1. `SimpleNet` class definition: Defines a simple feed-forward neural network with one fully connected (linear) layer that takes a flattened 28x28 input (784 values, the size of an MNIST image) and outputs 10 logits (one for each class).

2. `train` function: This function is the main training loop for each worker process. It takes the following arguments:
    - `rank`: The rank of the current worker process (0 to `world_size-1`).
    - `world_size`: The total number of worker processes.
    - `num_epochs`, `batch_size`, and `learning_rate`: Hyperparameters for the training process.

   The function initializes the distributed environment using the "gloo" backend, loads the dataset, creates the model, and wraps it with `DistributedDataParallel`. It then defines the optimizer and starts the training loop. During `loss.backward()`, DDP all-reduces (averages) the gradients across all worker processes, so every process applies the same update and the model copies stay identical. The loss is printed every 100 batches.

3. `main` function: This function creates and starts separate processes for each worker (one process per worker) using the `torch.multiprocessing.Process` class. Each process runs the `train` function with its unique rank and the total number of worker processes (`world_size`). After starting all processes, the main function waits for them to finish using the `join()` method.

4. The last part of the code is the `if __name__ == '__main__':` block, which calls the `main()` function when the script is run. This ensures that the distributed training is started only when the script is run as the main module, not when it is imported as a module in another script.

In summary, this code demonstrates a simple distributed training example with PyTorch using the `DistributedDataParallel` module for data parallelism. The training is performed using multiple worker processes, each with a separate instance of the model. The gradients are synchronized across all worker processes during the training process.

### Backend overview

1. RPC: RPC stands for Remote Procedure Call, a communication protocol that allows one program to request a service from another program over a network. In PyTorch, the RPC framework (`torch.distributed.rpc`) lets one process run functions on, and hold references to objects in, another process; it is used mainly for model-parallel and parameter-server setups rather than for the collective operations DDP relies on. The Process Group RPC backend was built on top of a process group and used its communication primitives to implement RPC; newer PyTorch versions deprecated it in favor of the TensorPipe backend, which is now the default.

2. NCCL: NVIDIA Collective Communications Library (NCCL) is a library developed by NVIDIA that provides highly optimized multi-GPU collective communication primitives. NCCL is designed to work with NVIDIA GPUs and can scale across multiple nodes. It is widely used in deep learning frameworks like PyTorch and TensorFlow to enable efficient communication between GPUs in a distributed training environment.

3. Gloo: Gloo is a collective communications library that was designed to be fast, flexible, and work well with various deep learning frameworks. It provides a variety of collective operations like AllReduce, AllGather, and Broadcast, which are essential for distributed deep learning. Gloo is part of PyTorch's distributed package and can be used as a backend for CPU and GPU-based distributed training.

In summary, NCCL and Gloo are both communication backends used for distributed training in deep learning frameworks like PyTorch. NCCL is optimized for NVIDIA GPUs, while Gloo is a more general-purpose library that works with both CPUs and GPUs.
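The rule of thumb above can be written as a small helper. `pick_backend` is a hypothetical name; `torch.distributed.is_available`, `is_nccl_available` and `is_gloo_available` are existing PyTorch functions that report what the installed build supports.

```python
import torch
import torch.distributed as dist

def pick_backend():
    """Return "nccl" for CUDA training when NCCL is built in, otherwise "gloo"."""
    if not dist.is_available():
        raise RuntimeError("this PyTorch build has no torch.distributed support")
    if torch.cuda.is_available() and dist.is_nccl_available():
        return "nccl"
    if dist.is_gloo_available():
        return "gloo"
    raise RuntimeError("neither NCCL nor Gloo is available in this PyTorch build")

backend = pick_backend()
print(backend)  # gloo on a CPU-only machine such as the one in this notebook
```

The result can be passed straight to `dist.init_process_group(backend, rank=rank, world_size=world_size)`.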

### Launch and Spawn

In the context of distributed computing and parallel programming, "launch" and "spawn" refer to starting new processes or threads to perform tasks concurrently. Here is a brief explanation of each term:

1. Launch: Launching a process means starting a new instance of a program, typically by creating a new process or thread, to execute the program code. In the context of distributed deep learning, "launching" often refers to starting multiple processes, each running a copy of the same program, to perform model training in parallel. For example, PyTorch's `torch.distributed.launch` module starts multiple processes for distributed training; in recent versions it is deprecated in favor of the `torchrun` command, which does the same job.

2. Spawn: Spawning a process is similar to launching a process. It involves creating and starting a new process or thread to execute a specific function or a part of the program code. Spawning is often used in multiprocessing libraries to initiate parallel tasks. In Python's multiprocessing library, the `Process` class's `start` method is used to spawn a new process, and the target function is specified as a constructor argument.

Both "launch" and "spawn" involve creating and starting new processes or threads to perform tasks concurrently. The difference lies in the context and the libraries or tools used to initiate these parallel tasks.
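In PyTorch the practical difference shows up in how a worker learns its rank. A launcher (`torch.distributed.launch`, or `torchrun` in newer versions) starts the processes and puts `RANK`, `LOCAL_RANK`, `WORLD_SIZE`, `MASTER_ADDR` and `MASTER_PORT` in each one's environment, whereas `torch.multiprocessing.spawn` calls your function with the process index as its first argument. A hypothetical helper for the launcher case, with single-process defaults, could look like this:

```python
import os

def get_dist_info():
    """Hypothetical helper: read (rank, local_rank, world_size) set by a launcher.

    Falls back to a single-process setup when the variables are absent,
    e.g. for a plain `python train.py` run.
    """
    rank = int(os.environ.get("RANK", 0))
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    world_size = int(os.environ.get("WORLD_SIZE", 1))
    return rank, local_rank, world_size

print(get_dist_info())  # (0, 0, 1) when run without a launcher
```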

### Run data parallelism on multiple GPUs using PyTorch
To run data parallelism on multiple GPUs using PyTorch, you can use the `torch.distributed.launch` module for launching and `torch.multiprocessing.spawn` for spawning processes. Here is an example of how to use both methods:

1. Using `torch.distributed.launch`:

First, create a Python script (e.g., `train.py`) containing your model and training code, such as the first cell below. Then use `torch.distributed.launch` to run the script on multiple GPUs, replacing `NUM_GPUS_YOU_HAVE` with the number of GPUs you want to use for data parallelism. The launcher starts one process per GPU.

2. Using `torch.multiprocessing.spawn`:

Modify your training script (e.g., `train_spawn.py`) so that it starts the worker processes itself with `spawn`, as in the second cell below, and run it with a plain `python` command.

In [None]:
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
import os

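def main():
    # The launcher starts one process per GPU and sets LOCAL_RANK (plus RANK, WORLD_SIZE,
    # MASTER_ADDR and MASTER_PORT) in each process's environment
    device_id = int(os.environ['LOCAL_RANK'])
    torch.cuda.set_device(device_id)
    dist.init_process_group(backend='nccl')  # rank and world size are read from the environment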

    # Create your model, loss function, and optimizer
    model = nn.Linear(10, 10).to(device_id)
    ddp_model = DDP(model, device_ids=[device_id])
    loss_fn = nn.MSELoss()
    optimizer = optim.SGD(ddp_model.parameters(), lr=0.001)

    # Training loop
    for epoch in range(10):
        optimizer.zero_grad()
        outputs = ddp_model(torch.randn(20, 10).to(device_id))
        labels = torch.randn(20, 10).to(device_id)
        loss = loss_fn(outputs, labels)
        loss.backward()
        optimizer.step()
        print(f"Epoch: {epoch}, Loss: {loss.item()}")

    # Clean up the process group
    dist.destroy_process_group()

if __name__ == "__main__":
    main()

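#bash
python -m torch.distributed.launch --nproc_per_node=NUM_GPUS_YOU_HAVE train.py
# On PyTorch 1.10+, the recommended equivalent is:
# torchrun --nproc_per_node=NUM_GPUS_YOU_HAVE train.py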


In [None]:
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.multiprocessing import spawn
import os

def main(rank, world_size):
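    # spawn() calls main(rank, world_size) once per process, with rank = 0 .. nprocs-1
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    device_id = rank  # one GPU per process
    torch.cuda.set_device(device_id)  # select this process's GPU before creating the NCCL group
    dist.init_process_group("nccl", rank=rank, world_size=world_size)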

    # ... (the rest of the code is the same as above)

if __name__ == "__main__":
    world_size = torch.cuda.device_count()  # Number of available GPUs
    spawn(main, args=(world_size,), nprocs=world_size, join=True)

#bash
python train_spawn.py

