# Module 2
## Lab 1: Multi-Accelerator Distribution
In this lab we will discuss how to leverage multiple accelerators in a single machine to perform distributed operations. We will revisit concepts from previous labs, re-contextualized for multiple GPUs.

### Pre-Requisite Knowledge
We highly suggest reading up on [sharding concepts and collective communications](https://jax-ml.github.io/scaling-book/sharding/).

### In this Lab You Will:
- Run a larger version of Llama on multiple GPUs
- Use different sharding methods
- Do a basic distributed GEMM calculation
- Understand the different parallelization approaches
- (Optional) See a more detailed/conceptual breakdown of how matrices are sharded across GPUs


### Imports and GPU Information
Here we import the relevant libraries and retrieve detailed information about the available GPUs in our environment. We use pynvml to get low-level GPU metrics (e.g., name, total memory, current clock speeds), and set any necessary environment variables (such as the PyTorch CUDA memory allocator configuration). This step helps us understand and confirm our hardware configuration.

In [1]:
import os
parent_dir = os.path.abspath(os.path.join(os.getcwd(), ".."))
os.chdir(parent_dir)

In [2]:
# Step 1: Environment Setup
%pip install deepspeed==0.16.2 transformers==4.47.1 accelerate==1.2.1 torch pynvml matplotlib numpy scipy torchvision mpi4py

Defaulting to user installation because normal site-packages is not writeable
Note: you may need to restart the kernel to use updated packages.


In [3]:
# Step 2: Imports and GPU Information

import os

# Configure the PyTorch CUDA caching allocator. Set it before importing torch
# to be sure it is in place before CUDA memory is first used.
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"

import torch
import pynvml

pynvml.nvmlInit()
num_gpus = pynvml.nvmlDeviceGetCount()
# NVML does not report memory bandwidth; 0.3 TB/s is a hard-coded approximation
# of the NVIDIA L4's ~300 GB/s specification.
memory_bandwidth_tb_s = 0.3

print(f"Available GPUs: {num_gpus}")

gpu_info = {}
for i in range(num_gpus):
    handle = pynvml.nvmlDeviceGetHandleByIndex(i)
    name = pynvml.nvmlDeviceGetName(handle)
    memory_info = pynvml.nvmlDeviceGetMemoryInfo(handle)
    mem_total_gb = memory_info.total / (1024 ** 3)  # bytes -> GiB
    # Current clocks (not maximums), so an idle GPU reports low values
    clock_info = pynvml.nvmlDeviceGetClockInfo(handle, pynvml.NVML_CLOCK_GRAPHICS)
    mem_clock_info = pynvml.nvmlDeviceGetClockInfo(handle, pynvml.NVML_CLOCK_MEM)
    compute_capability = torch.cuda.get_device_capability(i)

    gpu_info[i] = {
        'name': name,
        'memory_gb': mem_total_gb,
        'gpu_clock_mhz': clock_info,
        'mem_clock_mhz': mem_clock_info,
        'memory_bandwidth_tb_s': memory_bandwidth_tb_s,
        'compute_capability': compute_capability
    }

    print(f"GPU {i}: {name}")
    print(f"  Memory: {mem_total_gb:.2f} GB")
    print(f"  GPU Clock: {clock_info} MHz")
    print(f"  Memory Clock: {mem_clock_info} MHz")
    print(f"  Approx. Memory Bandwidth: {memory_bandwidth_tb_s:.2f} TB/s")
    print(f"  Compute Capability: {compute_capability}")

pynvml.nvmlShutdown()
print("\nNote: For detailed CUDA core and tensor core counts, refer to NVIDIA official GPU specifications.")


Available GPUs: 4
GPU 0: NVIDIA L4
  Memory: 22.49 GB
  GPU Clock: 210 MHz
  Memory Clock: 405 MHz
  Approx. Memory Bandwidth: 0.30 TB/s
  Compute Capability: (8, 9)
GPU 1: NVIDIA L4
  Memory: 22.49 GB
  GPU Clock: 210 MHz
  Memory Clock: 405 MHz
  Approx. Memory Bandwidth: 0.30 TB/s
  Compute Capability: (8, 9)
GPU 2: NVIDIA L4
  Memory: 22.49 GB
  GPU Clock: 210 MHz
  Memory Clock: 405 MHz
  Approx. Memory Bandwidth: 0.30 TB/s
  Compute Capability: (8, 9)
GPU 3: NVIDIA L4
  Memory: 22.49 GB
  GPU Clock: 210 MHz
  Memory Clock: 405 MHz
  Approx. Memory Bandwidth: 0.30 TB/s
  Compute Capability: (8, 9)

Note: For detailed CUDA core and tensor core counts, refer to NVIDIA official GPU specifications.


**This time we will use all 4 GPUs**

### Our Baseline

As in the last lab, let's deploy our Llama model. This time we'll move closer to the larger model we ultimately want to run: an 8B-parameter Llama model (Meta-Llama-3.1-8B). We'll still try to run it on a single GPU first.

We'll use the same abstraction we used last time.

In [3]:
import torch
import pynvml
import os
import json
import importlib
import src.utils.model_utils as mutils
importlib.reload(mutils)

results = mutils.benchmark_batch_sizes(
    model_name="NousResearch/Meta-Llama-3.1-8B",
    seq_len=32,
    batch_sizes=[1],
    dtype=torch.bfloat16,
)

[2025-05-22 19:17:14,710] [INFO] [real_accelerator.py:222:get_accelerator] Setting ds_accelerator to cuda (auto detect)


/usr/bin/ld: cannot find -laio: No such file or directory
collect2: error: ld returned 1 exit status
/usr/bin/ld: cannot find -laio: No such file or directory
collect2: error: ld returned 1 exit status


Loading checkpoint shards:   0%|          | 0/4 [00:00<?, ?it/s]


🔁 Running batch size = 1
benchmark failed: CUDA out of memory. Tried to allocate 112.00 MiB. GPU 0 has a total capacity of 22.05 GiB of which 51.94 MiB is free. Including non-PyTorch memory, this process has 21.99 GiB memory in use. Of the allocated memory 21.54 GiB is allocated by PyTorch, and 217.35 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)


We ran out of memory! This is expected: this model is more than 8x the size of the last model we ran. Batching won't solve this; instead, we'll have to split the model up using sharding.
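A quick estimate of the weight memory shows how tight a single GPU is. The sketch below counts parameters from the architecture hyperparameters we assume for Llama 3.1 8B (hidden size 4096, 32 layers, MLP size 14336, 32 query heads, 8 key/value heads, 128,256-token vocabulary, untied input/output embeddings, no biases; taken from the model's published config, not from this lab's code). `llama_param_count` and `weight_gib` are hypothetical helpers written for illustration.

```python
def llama_param_count(vocab: int, hidden: int, layers: int,
                      intermediate: int, n_heads: int, n_kv_heads: int) -> int:
    """Count parameters of a Llama-style decoder (no biases, untied embeddings)."""
    head_dim = hidden // n_heads
    kv_dim = n_kv_heads * head_dim                      # grouped-query attention: smaller K/V
    attn = 2 * hidden * hidden + 2 * hidden * kv_dim    # q_proj + o_proj, k_proj + v_proj
    mlp = 3 * hidden * intermediate                     # gate_proj, up_proj, down_proj
    norms = 2 * hidden                                  # two RMSNorm weight vectors per layer
    embeddings = 2 * vocab * hidden                     # input embedding + separate lm_head
    return layers * (attn + mlp + norms) + embeddings + hidden  # + final RMSNorm


def weight_gib(num_params: int, bytes_per_param: int) -> float:
    """Memory needed just to store the weights, in GiB."""
    return num_params * bytes_per_param / 1024**3


# Assumed Llama 3.1 8B hyperparameters (from the published model config)
params = llama_param_count(vocab=128_256, hidden=4096, layers=32,
                           intermediate=14_336, n_heads=32, n_kv_heads=8)
bf16_gib = weight_gib(params, 2)  # bfloat16 = 2 bytes per parameter

print(f"Parameters: {params:,}")
print(f"bf16 weights: {bf16_gib:.2f} GiB total, {bf16_gib / 4:.2f} GiB per GPU if split 4 ways")
```

The weights alone (about 14.96 GiB, the same figure DeepSpeed reports as `MA 14.96 GB` in the 4-GPU run below) fit within an L4's ~22 GiB, so treat this as a lower bound: the single-GPU OOM comes from the weights plus everything else the benchmark allocates on top of them.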

Let's clear our GPU memory first.

In [5]:
import src.utils.model_utils as mutils
import gc
importlib.reload(mutils)
gc.collect()
import psutil
import torch.distributed as dist

mutils.reset_distributed_and_clear_memory()

✅ Distributed env torn down and memory cleared.


Now let's run our model across all 4 GPUs instead, using sharding, data parallelism, and other optimization techniques. This way we roughly quadruple our available GPU memory and split the model's parameters across the GPUs.

> Note: we'll dive into these in more detail later.
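Conceptually, "splitting the parameters" means each GPU permanently stores only a slice of every weight and reassembles the full weight (via an all-gather) just before the layer that needs it. Below is a minimal NumPy sketch in the spirit of ZeRO stage 3, which the config below enables; it is not DeepSpeed's actual code, and `shard_param` / `unshard_param` are hypothetical helpers.

```python
import math

import numpy as np


def shard_param(param: np.ndarray, world_size: int) -> list:
    """Split one parameter into world_size equal 1-D slices, zero-padding the tail."""
    flat = param.ravel()
    part = math.ceil(flat.size / world_size)
    padded = np.zeros(part * world_size, dtype=flat.dtype)
    padded[: flat.size] = flat
    return [padded[r * part:(r + 1) * part] for r in range(world_size)]


def unshard_param(shards: list, shape: tuple) -> np.ndarray:
    """Rebuild the full parameter from every rank's slice (what an all-gather provides)."""
    flat = np.concatenate(shards)[: math.prod(shape)]  # drop the padding
    return flat.reshape(shape)


world_size = 4
weight = np.arange(10 * 7, dtype=np.float32).reshape(10, 7)  # 70 elements
shards = shard_param(weight, world_size)

print([s.size for s in shards])  # each rank stores 18 elements (72 total with padding)
print(np.array_equal(unshard_param(shards, weight.shape), weight))
```

Each rank's resident memory drops to roughly 1/world_size of the weights, at the cost of an all-gather every time a full weight is needed.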

In [6]:
import json
import importlib
import torch
import src.utils.model_utils as mutils
importlib.reload(mutils)

seq_len = 32
min_new_tokens = 1
world_size = 4

ds_config = {
    "train_micro_batch_size_per_gpu": 1,
    "gradient_accumulation_steps": 1,

    # Use bf16 to match dtype=torch.bfloat16 below (L4 GPUs support bfloat16)
    "bf16": { "enabled": True },
    
    # Ignore this for now
    "zero_optimization": {
        "stage": 3,
        "offload_param": {
            "device": "cpu",
            "pin_memory": True
        },
        "offload_optimizer": {
            "device": "cpu",
            "pin_memory": True
        },
        # optional perf tweaks:
        "contiguous_gradients": True,
        "overlap_comm": True
    },

    "optimizer": {
        "type": "AdamW",
        "params": {
            "lr": 3e-5,
            "betas": [0.9, 0.999],
            "eps": 1e-8,
            "weight_decay": 0.01
        }
    },
    
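    # Split the model across world_size (4) GPUs with tensor parallelism.
    # Note: tensor_parallel, replace_with_kernel_inject, and enable_cuda_graph are
    # DeepSpeed *inference* options (see deepspeed.init_inference); we assume
    # mutils.run_distributed_benchmark forwards them appropriately.
    "tensor_parallel": {
        "enabled": True,
        "tp_size": world_size
    },

    "replace_with_kernel_inject": False,
    "enable_cuda_graph": False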
}


results = mutils.run_distributed_benchmark(
    model_name="NousResearch/Meta-Llama-3.1-8B",
    seq_len=seq_len,
    batch_sizes=[1],
    dtype=torch.bfloat16,
    sharding=True,
    world_size=world_size,  # Number of GPUs
    ds_config=ds_config
)

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


[2025-05-21 20:40:37,741] [INFO] [real_accelerator.py:222:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2025-05-21 20:40:37,813] [INFO] [real_accelerator.py:222:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2025-05-21 20:40:37,869] [INFO] [real_accelerator.py:222:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2025-05-21 20:40:37,882] [INFO] [real_accelerator.py:222:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2025-05-21 20:40:39,510] [INFO] [comm.py:652:init_distributed] cdb=None
[2025-05-21 20:40:39,510] [INFO] [comm.py:683:init_distributed] Initializing TorchBackend in DeepSpeed with backend nccl
[2025-05-21 20:40:39,859] [INFO] [comm.py:652:init_distributed] cdb=None
[2025-05-21 20:40:39,865] [INFO] [comm.py:652:init_distributed] cdb=None
[2025-05-21 20:40:39,865] [INFO] [comm.py:652:init_distributed] cdb=None


Loading checkpoint shards: 100%|██████████| 4/4 [00:00<00:00,  8.58it/s]
Loading checkpoint shards: 100%|██████████| 4/4 [00:00<00:00,  8.40it/s]
Loading checkpoint shards: 100%|██████████| 4/4 [00:00<00:00,  7.10it/s]
Loading checkpoint shards: 100%|██████████| 4/4 [00:00<00:00,  7.14it/s]


[2025-05-21 20:40:42,826] [INFO] [config.py:733:__init__] Config mesh_device None world_size = 4
[2025-05-21 20:40:42,884] [INFO] [logging.py:128:log_dist] [Rank 0] DeepSpeed info: version=0.16.2, git-hash=unknown, git-branch=unknown
[2025-05-21 20:40:42,884] [INFO] [comm.py:677:init_distributed] Distributed backend already initialized
[2025-05-21 20:40:42,884] [INFO] [config.py:733:__init__] Config mesh_device None world_size = 4
[2025-05-21 20:40:43,096] [INFO] [config.py:733:__init__] Config mesh_device None world_size = 4
[2025-05-21 20:40:43,218] [INFO] [config.py:733:__init__] Config mesh_device None world_size = 4
[2025-05-21 20:40:43,525] [INFO] [logging.py:128:log_dist] [Rank 0] DeepSpeed Flops Profiler Enabled: False
Installed CUDA version 12.6 does not match the version torch was compiled with 12.4 but since the APIs are compatible, accepting this combination
Installed CUDA version 12.6 does not match the version torch was compiled with 12.4 but since the APIs are compatible

Using /home/ec2-user/.cache/torch_extensions/py312_cu124 as PyTorch extensions root...
Using /home/ec2-user/.cache/torch_extensions/py312_cu124 as PyTorch extensions root...
Using /home/ec2-user/.cache/torch_extensions/py312_cu124 as PyTorch extensions root...
Using /home/ec2-user/.cache/torch_extensions/py312_cu124 as PyTorch extensions root...
Emitting ninja build file /home/ec2-user/.cache/torch_extensions/py312_cu124/cpu_adam/build.ninja...
Building extension module cpu_adam...
Allowing ninja to set a default number of workers... (overridable by setting the environment variable MAX_JOBS=N)
Loading extension module cpu_adam...
Loading extension module cpu_adam...
Loading extension module cpu_adam...
Loading extension module cpu_adam...


[2025-05-21 20:40:47,172] [INFO] [utils.py:781:see_memory_usage] Stage 3 initialize beginning
[2025-05-21 20:40:47,173] [INFO] [utils.py:782:see_memory_usage] MA 14.96 GB         Max_MA 15.94 GB         CA 15.94 GB         Max_CA 16 GB 
[2025-05-21 20:40:47,173] [INFO] [utils.py:789:see_memory_usage] CPU Virtual Memory:  used = 9.09 GB, percent = 5.0%
[2025-05-21 20:40:47,174] [INFO] [stage3.py:168:__init__] Reduce bucket size 500000000
[2025-05-21 20:40:47,174] [INFO] [stage3.py:169:__init__] Prefetch bucket size 50000000
[2025-05-21 20:40:47,284] [INFO] [utils.py:781:see_memory_usage] DeepSpeedZeRoOffload initialize [begin]
[2025-05-21 20:40:47,285] [INFO] [utils.py:782:see_memory_usage] MA 14.96 GB         Max_MA 14.96 GB         CA 15.94 GB         Max_CA 16 GB 
[2025-05-21 20:40:47,285] [INFO] [utils.py:789:see_memory_usage] CPU Virtual Memory:  used = 9.09 GB, percent = 5.0%
[2025-05-21 20:40:47,288] [INFO] [config.py:733:__init__] Config mesh_device None world_size = 4
Parameter



Batch=1 | SeqLen=32
Elapsed GPU time: 14.0274s | TFLOP/s: 0.0 | AI: 22.85 FLOP/B
Batch=1 | SeqLen=32
Elapsed GPU time: 14.0298s | TFLOP/s: 0.0 | AI: 22.85 FLOP/B
Batch=1 | SeqLen=32
Elapsed GPU time: 14.0354s | TFLOP/s: 0.0 | AI: 22.85 FLOP/B
Batch=1 | SeqLen=32
Elapsed GPU time: 14.0382s | TFLOP/s: 0.0 | AI: 22.85 FLOP/B
--------- OUTPUT BREAKDOWN ---------
🧠 Tokens generated: 128
⚡ Throughput: 9.12155 tokens/sec
⏱️ Total time: 14.03270 sec
💸 Cost per 1M tokens: $36.84801
------------------------------------


Traceback (most recent call last):
  File "/usr/local/lib/python3.12/multiprocessing/util.py", line 303, in _run_finalizers
    finalizer()
  File "/usr/local/lib/python3.12/multiprocessing/util.py", line 227, in __call__
    res = self._callback(*self._args, **self._kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/multiprocessing/synchronize.py", line 87, in _cleanup
    sem_unlink(name)
FileNotFoundError: [Errno 2] No such file or directory
Traceback (most recent call last):
  File "/usr/local/lib/python3.12/multiprocessing/util.py", line 303, in _run_finalizers
    finalizer()
  File "/usr/local/lib/python3.12/multiprocessing/util.py", line 227, in __call__
    res = self._callback(*self._args, **self._kwargs)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/lib/python3.12/multiprocessing/synchronize.py", line 87, in _cleanup
    sem_unlink(name)
FileNotFoundError: [Errno 2] No such file or directory
Traceback (m

Loading the model into memory takes about a minute on these GPUs (a one-time cost). Once it finishes, though, multiple GPUs let us fit a much larger model than a single GPU could hold. Based on our last lab, we can likely improve throughput further by increasing the batch size as well.

So what's happening?
- We're splitting the model's weights across GPUs (sharding, here via ZeRO stage 3 and tensor parallelism), allowing us to fit a model with many more weights than a single GPU could hold
- We're introducing communication between these GPUs, known as **collective communications**

> Note: the library we're using in this case, **DeepSpeed**, is likely doing additional optimizations as well.
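The idea behind splitting weights across GPUs can be sketched on a single linear layer `Y = X @ W`. In this single-process NumPy simulation, list entries stand in for GPUs and no real communication happens. Splitting `W` by columns lets each "GPU" compute a slice of `Y` that is then concatenated (an all-gather); splitting `W` by rows gives partial sums that must be added together (an all-reduce). This is the general column/row-parallel pattern, not DeepSpeed's specific implementation.

```python
import numpy as np

rng = np.random.default_rng(0)
world_size = 4                      # number of simulated GPUs
X = rng.standard_normal((8, 16))    # activations: [batch, in_features]
W = rng.standard_normal((16, 32))   # weights:     [in_features, out_features]
Y_ref = X @ W                       # what a single GPU would compute

# Column parallel: each rank owns out_features / world_size columns of W.
col_shards = np.split(W, world_size, axis=1)          # each [16, 8]
Y_col = np.concatenate([X @ w for w in col_shards], axis=1)  # "all-gather" the slices

# Row parallel: each rank owns in_features / world_size rows of W
# plus the matching feature slice of X.
row_shards = np.split(W, world_size, axis=0)          # each [4, 32]
x_slices = np.split(X, world_size, axis=1)            # each [8, 4]
partial_sums = [x @ w for x, w in zip(x_slices, row_shards)]  # each [8, 32]
Y_row = np.sum(partial_sums, axis=0)                  # "all-reduce" (sum)

print(np.allclose(Y_col, Y_ref), np.allclose(Y_row, Y_ref))
```

Which split is used determines which collective is needed afterwards, which is why parallelism strategies and communication patterns are usually discussed together.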

Next, let's peel away the library and see what's happening under the hood with sharding (splitting data across GPUs) and collectives (where the GPUs communicate).

## Utilizing Multiple GPUs with GEMM

In this section we'll demonstrate how data is split across GPUs and how the GPUs communicate. This is a very deep topic, so we'll only scratch the surface: enough for you to understand what the libraries you're using are doing under the hood.

### In Practice
Let's go back to our GEMM calculation. This time we'll use a much larger GEMM operation and split it across our GPUs. First, let's try to run a matrix multiplication whose input matrix alone is ~26 GB on a single GPU.

In [7]:
import torch

try:
    # A 200_000 x 200_000 float32 matrix would be ~149 GiB (4 bytes per element).
    # We'll cut this down to ~26 GB (~25.7 GiB): 115_000 x 60_000 float32.
    rows, cols = 115_000, 60_000

    A = torch.randn((rows, cols), dtype=torch.float32, device='cuda:0')

    # Right-hand matrix, shape [cols, 1024] -> output shape will be [rows, 1024]
    B = torch.randn((cols, 1024), dtype=torch.float32, device='cuda:0')

    out = A @ B
except Exception as e:
    print(f"load failed: {e}")


load failed: CUDA out of memory. Tried to allocate 25.71 GiB. GPU 0 has a total capacity of 22.05 GiB of which 21.79 GiB is free. Including non-PyTorch memory, this process has 250.00 MiB memory in use. Of the allocated memory 8.12 MiB is allocated by PyTorch, and 11.88 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting PYTORCH_CUDA_ALLOC_CONF=expandable_segments:True to avoid fragmentation.  See documentation for Memory Management  (https://pytorch.org/docs/stable/notes/cuda.html#environment-variables)


Predictably, this failed. Now let's manually divide the matrix in half and move each half to a separate GPU.
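A bit of arithmetic shows why halving works. This sketch computes tensor sizes in GiB for the shapes used in this lab; `tensor_gib` is a hypothetical helper, and the per-GPU figure assumes each GPU holds half of A's rows, a full copy of B, and half of the output.

```python
GIB = 1024**3
FP32_BYTES = 4


def tensor_gib(n_rows: int, n_cols: int, bytes_per_elem: int = FP32_BYTES) -> float:
    """Size of a dense [n_rows, n_cols] tensor in GiB."""
    return n_rows * n_cols * bytes_per_elem / GIB


m, k, n = 115_000, 60_000, 1024      # A is [m, k], B is [k, n], output is [m, n]
full_a_gib = tensor_gib(m, k)         # all of A on one GPU
per_gpu_gib = (tensor_gib(m // 2, k)  # half of A's rows
               + tensor_gib(k, n)     # a full copy of B
               + tensor_gib(m // 2, n))  # half of the output

print(f"Full A: {full_a_gib:.2f} GiB; per GPU when split in two: {per_gpu_gib:.2f} GiB")
```

The full A (about 25.7 GiB, in line with the 25.71 GiB allocation PyTorch attempted above) exceeds the 22.05 GiB capacity PyTorch reports for each L4, while each half-plus-B fits with room to spare.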

In [8]:
import torch
import src.utils.model_utils as mutils
import importlib
import gc
importlib.reload(mutils)
torch.cuda.empty_cache()
torch.cuda.ipc_collect()
gc.collect()
# Assumes 2 GPUs available
assert torch.cuda.device_count() >= 2

# Simulate a large matrix that won't fit on one GPU
# (rows, cols come from the previous cell: 115_000 x 60_000).
# We'll split it along the row dimension
half_rows = rows // 2

# Allocate the first half of A's rows on GPU 0
A0 = torch.randn((half_rows, cols), dtype=torch.float32, device='cuda:0')

# Allocate the remaining rows on GPU 1
A1 = torch.randn((rows - half_rows, cols), dtype=torch.float32, device='cuda:1')

# Right-hand matrix, shape [cols, 1024] -> output shape will be [rows, 1024].
# B is small (~0.23 GiB), so every GPU gets its own full copy.
B = torch.randn((cols, 1024), dtype=torch.float32)

# Copy the full B to each GPU (B itself is not split)
B0 = B.to('cuda:0')
B1 = B.to('cuda:1')

# Multiply independently on both GPUs
with torch.no_grad():
    out0 = A0 @ B0
    out1 = A1 @ B1

# Copy both halves back to the CPU and stack them along the row dimension
out = torch.cat([out0.cpu(), out1.cpu()], dim=0)

del A0, A1, B0, B1, out0, out1

print(f"Output shape: {out.shape}")  # [115000, 1024]

torch.cuda.empty_cache()
torch.cuda.ipc_collect()
gc.collect()

Output shape: torch.Size([115000, 1024])


0

Great! We were able to process a massive matrix across 2 GPUs simply by giving half of the rows to one GPU and half to the other. You can think of this like splitting your weights across 2 GPUs; in simplified form, that is what libraries like DeepSpeed are doing.

This works well for a single calculation, but what happens when the next calculation needs this output? That is how neural networks, including transformers, work: each layer's output is the input to the next layer. Above, we simply copied the results back to the CPU, but that round trip is slow, so we want to keep the data on the GPUs and have the GPUs communicate directly.

This is where collectives come into play.

#### Collectives

Here we will split our matrices across GPUs and share the results among those GPUs, so each one could perform the next calculation without going back through the CPU.

We'll effectively do the same thing as before, with collective steps added: a broadcast, which sends matrix B from one GPU to the others, and an "all_gather", which has each GPU send its GEMM result to every other GPU so they all end up with the full output.
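Before running it on real GPUs, here is a single-process NumPy simulation of the same flow as the log below: each rank holds a row shard of A, rank 0 broadcasts B, every rank multiplies locally, and an all_gather leaves the full output on every rank. `broadcast` and `all_gather` here are hypothetical stand-ins; in PyTorch the real calls are `torch.distributed.broadcast(tensor, src=0)` and `torch.distributed.all_gather(tensor_list, tensor)`, and we assume, based on its log messages, that `_distributed_gemm_worker` uses something similar.

```python
import numpy as np


def broadcast(buffers: list, src: int) -> None:
    """Simulated broadcast: overwrite every other rank's buffer with the src rank's data."""
    for r, buf in enumerate(buffers):
        if r != src:
            buf[...] = buffers[src]


def all_gather(local_tensors: list) -> list:
    """Simulated all_gather: every rank ends up with every rank's tensor, in rank order."""
    return [[t.copy() for t in local_tensors] for _ in local_tensors]


world_size, n_rows, n_cols, n_out = 2, 10, 6, 3
rng = np.random.default_rng(0)
A_full = rng.standard_normal((n_rows, n_cols))
A_local = np.array_split(A_full, world_size, axis=0)  # each rank's row shard of A

# Rank 0 creates B; the other ranks allocate empty receive buffers.
B_bufs = [rng.standard_normal((n_cols, n_out)) if r == 0 else np.empty((n_cols, n_out))
          for r in range(world_size)]
broadcast(B_bufs, src=0)

out_local = [a @ b for a, b in zip(A_local, B_bufs)]  # local matmul on each rank
gathered = all_gather(out_local)                      # one list of shards per rank
out_full = [np.concatenate(parts, axis=0) for parts in gathered]

reference = A_full @ B_bufs[0]
print(all(np.allclose(o, reference) for o in out_full))
```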

In [9]:
import importlib
import src.utils.gemm_utils as gutils
importlib.reload(gutils)

gutils.distributed_gemm()

huggingface/tokenizers: The current process just got forked, after parallelism has already been used. Disabling parallelism to avoid deadlocks...
	- Avoid using `tokenizers` before the fork if possible
	- Explicitly set the environment variable TOKENIZERS_PARALLELISM=(true | false)


[Rank 0] Starting process on cuda:0
[Rank 1] Starting process on cuda:1
[Rank 1] Initialized NCCL process group with world_size=2
[Rank 0] Initialized NCCL process group with world_size=2
[Rank 1] Created local shard of A: shape=torch.Size([57500, 60000]) on cuda:1
[Rank 1] Allocated empty matrix B to receive broadcast: shape=torch.Size([60000, 1024]) on cuda:1
[Rank 0] Created local shard of A: shape=torch.Size([57500, 60000]) on cuda:0
[Rank 0] Created full matrix B: shape=torch.Size([60000, 1024]) on cuda:0
[Rank 0] Completed broadcast of B
[Rank 0] Performing matmul: A_local (torch.Size([57500, 60000])) @ B (torch.Size([60000, 1024]))
[Rank 1] Completed broadcast of B
[Rank 1] Performing matmul: A_local (torch.Size([57500, 60000])) @ B (torch.Size([60000, 1024]))
[Rank 1] Finished matmul. Output shape: torch.Size([57500, 1024]). Time: 0.683s
[Rank 1] Prepared buffers for all_gather
[Rank 1] Completed all_gather of local outputs
[Rank 0] Finished matmul. Output shape: torch.Size([57

[{'output_shape': torch.Size([115000, 1024]),
  'rows_per_rank': 57500,
  'device': 'cuda:0'}]

If you read through the results you should see the following:

Two processes are launched, one per GPU (cuda:0 and cuda:1). Each process initializes its own NCCL (NVIDIA Collective Communications Library) context as part of a world of 2 ranks.
Both ranks independently allocate a shard of the large matrix A, each of shape [57500, 60000], representing half of the full input.

Rank 0 generates the shared weight matrix B with shape [60000, 1024], while Rank 1 allocates an empty buffer for B. Rank 0 then broadcasts B to Rank 1 so that both GPUs have the same weights.

Each rank multiplies its local A shard by the full B, producing an output tensor of shape [57500, 1024]. This takes approximately 0.7 seconds per GPU (0.683 s on Rank 1 in the log above).

After local matmul, both ranks allocate output buffers and perform an all_gather, collecting the outputs from each rank. This results in a fully assembled output tensor of shape [115000, 1024] on both GPUs.

Finally, Rank 0 logs the output metadata, and both ranks cleanly shut down their distributed process groups.
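It is worth estimating how much data those two collectives actually move. This sketch uses a simple model (a broadcast delivers all of B to each non-source rank; an all_gather delivers the `world_size - 1` output shards a rank doesn't already own) and the shapes from the run above; `collective_bytes_received` is a hypothetical helper, and real NCCL traffic also depends on the algorithm and topology.

```python
def collective_bytes_received(world_size: int, b_shape: tuple, out_shape: tuple,
                              bytes_per_elem: int = 4) -> tuple:
    """Bytes one receiving rank gets for broadcast(B) and all_gather(output)."""
    b_bytes = b_shape[0] * b_shape[1] * bytes_per_elem
    out_bytes = out_shape[0] * out_shape[1] * bytes_per_elem
    return b_bytes, out_bytes * (world_size - 1) // world_size


results = {}
for ws in (2, 4):
    results[ws] = collective_bytes_received(ws, (60_000, 1024), (115_000, 1024))
    bcast, gather = results[ws]
    print(f"world_size={ws}: broadcast B {bcast / 1e6:.2f} MB, "
          f"all_gather {gather / 1e6:.2f} MB per rank")
```

The ~25.7 GiB matrix A never crosses the interconnect; only B and the much smaller output do. That is why sharding the large matrix and replicating the small one is a good fit here, and why the all_gather cost per rank approaches the full output size as more GPUs are added.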

We highly recommend reading through [gemm_utils.py](../src/utils/gemm_utils.py), specifically the `_distributed_gemm_worker` function, to fully understand what each GPU is running. In simplified form, this is what PyTorch, DeepSpeed, and most other libraries do under the hood to split up matrix multiplications and aggregate their results.

> Note: in practice you won't write these from scratch; however, understanding what these libraries are doing lets you optimize your workload at a much deeper level once you reach massive scale.

## Conclusion

In this portion of the lab we learned how to use sharding and collectives to let GPUs collaborate. In the next portion, we'll look at how common libraries apply these techniques at a high level to get optimal performance, and at the different strategies they can employ.