In [1]:
from IPython.display import Image
import os
os.environ['http_proxy'] = 'http://127.0.0.1:7890'
os.environ['https_proxy'] = 'http://127.0.0.1:7890'

- references
    - https://tigress-web.princeton.edu/~jdh4/PyTorchPerformanceTuningGuide_GTC2021.pdf
        - https://pytorch.org/blog/optimizing-cuda-rnn-with-torchscript/
    - https://towardsdatascience.com/optimize-pytorch-performance-for-speed-and-memory-efficiency-2022-84f453916ea6
    - https://blog.dailydoseofds.com/p/memory-pinning-to-accelerate-model

In [2]:
import torch
import torch.jit
import timeit

### pin_memory & non_blocking (num_workers)

```
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
for epoch in range(epochs): 
    for batch_idx, (data, target) in enumerate(train_loader):
        
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()        
        optimizer.step()
```

standard PyTorch model training loop
- `data, target = data.to(device), target.to(device)` transfers the data to the GPU from the CPU.
- Everything executes on the GPU **after** the data transfer.

In [4]:
# synchronization
Image(url='https://substackcdn.com/image/fetch/w_1456,c_limit,f_webp,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2F5956919f-affc-4206-8296-03ff549476db_2160x780.png', width=500)

- When the model is being trained on the 1st mini-batch, the CPU can transfer the 2nd mini-batch to the GPU.
- This ensures that the GPU does not have to wait for the next mini-batch of data as soon as it completes processing an existing mini-batch.

In [5]:
Image(url='https://substackcdn.com/image/fetch/w_1456,c_limit,f_webp,q_auto:good,fl_progressive:steep/https%3A%2F%2Fsubstack-post-media.s3.amazonaws.com%2Fpublic%2Fimages%2Fdb51e307-211f-479e-a677-b6b8ad69fcfd_2370x780.png', width=500)

- While the CPU may remain idle, this process ensures that the GPU (which is the actual accelerator for our model training) always has data to work with.
    - 尽可能地避免 gpu 的 idle
- Formally, this process is known as **memory pinning**, and it is used to speed up the data transfer from the CPU to the GPU by making the training workflow asynchronous.
    - 内存的 swapping（交换）是指操作系统将物理内存中的数据临时存储到硬盘上的交换空间（**Swap Space**,，位于硬盘）或交换文件（Swap File）中的过程。当系统的物理内存（RAM）不足以容纳正在运行的所有程序和数据时，操作系统会将一部分当前不活跃的数据从内存中移出，写入交换空间，从而腾出物理内存供其他程序使用。
    - pin_memory 指的是将内存“锁页”（pin memory，vs. 可分页（pageable）虚拟内存），即将内存页固定在物理内存中，防止操作系统将其交换到磁盘（swapping）。这种锁页内存也被称为“页面锁定内存”或“固定内存”。
        - pinned memory，又称page-locked memory
        - unpinned memory，又称pageable memory
    - pinned memory is used as a staging area for transfers from the device to the host. We can avoid the cost of the transfer between pageable and pinned host arrays by directly allocating our host arrays in pinned memory.

In [14]:
# https://developer.nvidia.com/blog/how-optimize-data-transfers-cuda-cc/
Image(url='https://developer-blogs.nvidia.com/wp-content/uploads/2012/12/pinned-1024x541.jpg', width=500)

In [10]:
X = torch.randn(5)
X, X.is_pinned()

(tensor([ 0.8242,  0.5101,  1.0573, -0.3424,  1.5565]), False)

In [11]:
X = X.pin_memory()
X.is_pinned()

True

In [12]:
X = torch.randn(5, device='cuda')
print(X, X.is_pinned())
X = X.pin_memory()
print(X, X.is_pinned())

tensor([-1.5154, -1.1938, -0.9721,  0.0941, -0.5044], device='cuda:0') False


RuntimeError: cannot pin 'torch.cuda.FloatTensor' only dense CPU tensors can be pinned

- enable `pin_memory` and set num_workers (muti-core processors) for faster transfers
```
train_loader = DataLoader(train_dataset,
                          batch_size=64, shuffle=True,
                          pin_memory=True, num_workers=8)
```

- during the data transfer step in the training step, specify `non_blocking=True`, as depicted below:
    - non_blocking => gpu training on prev minibatch 时执行 cpu 上的 transfer
    - When the model is being trained on the 1st mini-batch, the CPU can transfer the 2nd mini-batch to the GPU.
    - This ensures that the GPU does not have to wait for the next mini-batch of data as soon as it completes processing an existing mini-batch.
```
for epoch in range(epochs):
    for batch_idx, (data, target) in enumerate(train_loader):
        
        data = data.to(device, non_blocking=True)
        target = target.to(device, non_blocking=True)
        
        optimizer.zero_grad()        
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
```

In [24]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import time
from tqdm.notebook import tqdm

In [7]:
# 定义数据转换
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))
])


In [10]:
# 加载 MNIST 数据集
train_dataset = datasets.MNIST(root='./data', train=True, download=True, transform=transform)

In [11]:
class SimpleNet(nn.Module):
    def __init__(self):
        super(SimpleNet, self).__init__()
        self.fc = nn.Sequential(
            nn.Flatten(),
            nn.Linear(28*28, 512),
            nn.ReLU(),
            nn.Linear(512, 10)
        )
    def forward(self, x):
        return self.fc(x)

In [12]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [25]:
def train(loader, model, optimizer, device, non_blocking=False, epochs=5):
    model.train()
    total_loss = 0
    start_time = time.time()
    for epoch in tqdm(range(epochs), desc="Epochs"):
        batch_bar = tqdm(enumerate(loader), total=len(loader), desc=f"Epoch {epoch+1}/{epochs}", leave=False)
        for batch_idx, (data, target) in batch_bar:
            # 将数据移到设备上
            data = data.to(device, non_blocking=non_blocking)
            target = target.to(device, non_blocking=non_blocking)
            
            optimizer.zero_grad()
            output = model(data)
            loss = nn.functional.cross_entropy(output, target)
            loss.backward()
            optimizer.step()
            
            total_loss += loss.item()
    end_time = time.time()
    avg_loss = total_loss / (len(loader)*epochs)
    elapsed_time = end_time - start_time
    return avg_loss, elapsed_time

In [26]:
from multiprocessing import cpu_count
loader1 = DataLoader(train_dataset, batch_size=64, shuffle=True)
loader2 = DataLoader(train_dataset, batch_size=64, shuffle=True, num_workers=cpu_count()//2, pin_memory=True)

In [27]:
model1 = SimpleNet().to(device)
optimizer1 = optim.SGD(model1.parameters(), lr=0.01)
avg_loss1, time1 = train(loader1, model1, optimizer1, device, non_blocking=False, epochs=5)
print(f"设置 1 - 平均损失: {avg_loss1:.4f}, 训练时间: {time1:.2f} 秒")

Epochs:   0%|          | 0/5 [00:00<?, ?it/s]

Epoch 1/5:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 2/5:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 3/5:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 4/5:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 5/5:   0%|          | 0/938 [00:00<?, ?it/s]

设置 1 - 平均损失: 0.3854, 训练时间: 42.71 秒


In [28]:
model2 = SimpleNet().to(device)
optimizer2 = optim.SGD(model2.parameters(), lr=0.01)
avg_loss2, time2 = train(loader2, model2, optimizer2, device, non_blocking=True, epochs=5)
print(f"设置 2 - 平均损失: {avg_loss2:.4f}, 训练时间: {time2:.2f} 秒")

Epochs:   0%|          | 0/5 [00:00<?, ?it/s]

Epoch 1/5:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 2/5:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 3/5:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 4/5:   0%|          | 0/938 [00:00<?, ?it/s]

Epoch 5/5:   0%|          | 0/938 [00:00<?, ?it/s]

设置 2 - 平均损失: 0.3847, 训练时间: 19.92 秒


#### transformers Trainer

- When using Trainer, the corresponding TrainingArguments are:
    - `dataloader_pin_memory` (True by default),
    - `dataloader_num_workers` (defaults to 0).
- non_blocking
    - 从 `trainer.py` 源码中来看似乎是从 accelerate_config 中设置的；??

### JIT（Just-In-Time compilation) ）

- JIT 通过将模型编译成中间表示（Intermediate Representation, IR），然后进一步将其转换为机器代码
- Fuse the pointwise (elementwise) operations into a single kernel by PyTorch JIT
    - JIT fuse the pointwise operations

In [13]:
# 创建一个大型的随机张量作为输入数据
x = torch.randn(15000, 15000)

# 使用 JIT 编译的函数
@torch.jit.script
def fused_gelu(x):
    return x * 0.5 * (1.0 + torch.erf(x / 1.41421))

# 未使用 JIT 编译的相同函数
def gelu(x):
    return x * 0.5 * (1.0 + torch.erf(x / 1.41421))

# 使用 timeit 测量 JIT 编译函数的执行时间
jit_time = timeit.timeit('fused_gelu(x)', globals=globals(), number=100)
nonjit_time = timeit.timeit('gelu(x)', globals=globals(), number=100)

print(jit_time, nonjit_time)

20.05574530499871 31.39065190600013
