# HBM数据复制优化实验

## 参与人及时间

- 参与人：陈柳青，缪弘博
- 时间：2025-08-19

HBM 数据复制基础代码

In [None]:
# sender.py
import ucp
import asyncio
import numpy as np
import cupy as cp

NUM_BLOCKS = 10000  # blocks transferred for every block size


async def sender(ep):
    """Send NUM_BLOCKS blocks for each of 15 block sizes, then close ``ep``.

    The block size doubles every step, from 128 bytes (exp=0) up to
    128 * 2**14 bytes (exp=14).  Each block is sent with its own awaited
    ``ep.send`` call, so this is the unbatched baseline.

    Args:
        ep: endpoint object providing awaitable ``send(buffer)`` and
            ``close()``.
    """
    for exp in range(0, 15):  # 128B to 2MB
        size = 128 * (2 ** exp)
        print(f"Transferring {NUM_BLOCKS} blocks of size {size} bytes")

        # One (NUM_BLOCKS, size) uint8 array; row i is block i.
        # randint's upper bound is exclusive, so 256 is needed to cover 0..255.
        gpu_data = cp.random.randint(0, 256, size=(NUM_BLOCKS, size), dtype=cp.uint8)
        # CuPy launches the fill on the current stream without waiting for
        # it; synchronize so the rows are fully written before they are sent.
        cp.cuda.get_current_stream().synchronize()

        for i in range(NUM_BLOCKS):
            await ep.send(gpu_data[i])
    await ep.close()

async def main():
    """Open an endpoint to "receiver_ip" on port 13337 and run ``sender`` on it."""
    endpoint = await ucp.create_endpoint("receiver_ip", 13337)
    await sender(endpoint)


if __name__ == "__main__":
    asyncio.run(main())


In [None]:
# receiver.py
import ucp
import asyncio
import cupy as cp

NUM_BLOCKS = 10000  # blocks received for every block size


async def recv_handler(ep):
    """Receive NUM_BLOCKS blocks for each of 15 block sizes, then close ``ep``.

    Block sizes are 128 * 2**exp bytes for exp = 0..14.  A new uint8 device
    buffer is allocated for every single receive and is not kept afterwards.

    Args:
        ep: endpoint object providing awaitable ``recv(buffer)`` and
            ``close()``.
    """
    for size in (128 * (2 ** exp) for exp in range(0, 15)):
        print(f"Receiving {NUM_BLOCKS} blocks of size {size} bytes")

        for _ in range(NUM_BLOCKS):
            block = cp.empty(size, dtype=cp.uint8)
            await ep.recv(block)

    await ep.close()

async def main():
    """Create a listener on port 13337 that runs ``recv_handler``, then idle forever."""
    # The listener object stays bound to a local for as long as this coroutine runs.
    server = ucp.create_listener(recv_handler, port=13337)
    idle_seconds = 1
    while True:
        await asyncio.sleep(idle_seconds)


if __name__ == "__main__":
    asyncio.run(main())


优化一————减少调用次数：将多个小 block 拼接成 1 个大 buffer，用 offset 映射回 10K 个块。

优化二————保证内存 RDMA 注册成功，走 GPU DMA。

优化三————并行管道调度：多线程发送多个通道（多连接），提高带宽利用率。

优化四————动态调整批量大小：根据当前的传输状态动态调整 BATCH_SIZE，在网络负载较高时降低批量大小。

In [None]:
import ucp
import asyncio
import cupy as cp

NUM_BLOCKS = 10000
BATCH_SIZE = 100  # number of blocks sent per batch


# Coroutine that sends one range of blocks; several of these can run concurrently.
async def sender(ep, gpu_data, start_idx, end_idx):
    """Send ``gpu_data[start_idx]`` .. ``gpu_data[end_idx - 1]`` over ``ep`` in order.

    Each send is awaited before the next block is indexed.

    Args:
        ep: endpoint object providing an awaitable ``send(buffer)``.
        gpu_data: indexable container; element ``i`` is block ``i``.
        start_idx: index of the first block to send.
        end_idx: index one past the last block to send.
    """
    for block_idx in range(start_idx, end_idx):
        block = gpu_data[block_idx]
        await ep.send(block)

async def main():
    """Send NUM_BLOCKS blocks per size step using concurrent ``sender`` coroutines.

    For each of the 15 block sizes (128 * 2**exp bytes, exp = 0..14) a
    (NUM_BLOCKS, size) array of random bytes is created on the GPU, split into
    ranges of BATCH_SIZE blocks, and one ``sender`` coroutine per range is run
    through ``asyncio.gather``.  The endpoint is closed after the last step.
    """
    # A single endpoint is created; every sender coroutine below shares it.
    ep = await ucp.create_endpoint("receiver_ip", 13337)

    for exp in range(0, 15):  # 128B to 2MB
        size = 128 * (2 ** exp)
        print(f"Transferring {NUM_BLOCKS} blocks of size {size} bytes")

        # Row i of this array is block i.  randint's upper bound is exclusive,
        # so 256 is needed for the values to cover 0..255.
        gpu_data = cp.random.randint(0, 256, size=(NUM_BLOCKS, size), dtype=cp.uint8)
        # CuPy launches the fill on the current stream without waiting for
        # it; synchronize so the rows are fully written before they are sent.
        cp.cuda.get_current_stream().synchronize()

        # One coroutine per range of BATCH_SIZE blocks.
        # NOTE(review): the coroutines run interleaved, so blocks are posted
        # as 0, BATCH_SIZE, 2*BATCH_SIZE, ... rather than 0, 1, 2, ...; a
        # receiver that stores blocks by arrival position would see them
        # permuted.
        tasks = [
            sender(ep, gpu_data, start, min(start + BATCH_SIZE, NUM_BLOCKS))
            for start in range(0, NUM_BLOCKS, BATCH_SIZE)
        ]

        # Wait until every range has been sent.
        await asyncio.gather(*tasks)

    await ep.close()


if __name__ == "__main__":
    asyncio.run(main())


In [None]:
import ucp
import asyncio
import cupy as cp

NUM_BLOCKS = 10000
BATCH_SIZE = 100  # number of receives posted together


async def recv_handler(ep):
    """Receive NUM_BLOCKS blocks per size step in groups of BATCH_SIZE, then close ``ep``.

    Block sizes are 128 * 2**exp bytes for exp = 0..14.  For every group a
    fresh uint8 device buffer is allocated per block, all receives of the
    group are started together and awaited with ``asyncio.gather``.

    Args:
        ep: endpoint object providing awaitable ``recv(buffer)`` and
            ``close()``.
    """
    for exp in range(0, 15):
        size = 128 * (2 ** exp)
        print(f"Receiving {NUM_BLOCKS} blocks of size {size} bytes")

        for group_start in range(0, NUM_BLOCKS, BATCH_SIZE):
            group_stop = min(group_start + BATCH_SIZE, NUM_BLOCKS)
            # One throw-away buffer per block in this group.
            buffers = [
                cp.empty(size, dtype=cp.uint8)
                for _ in range(group_start, group_stop)
            ]
            # Start all receives of the group, then wait for all of them.
            await asyncio.gather(*[ep.recv(buf) for buf in buffers])

    await ep.close()

async def main():
    """Start a listener on port 13337 with ``recv_handler`` as callback and sleep in a loop."""
    active_listener = ucp.create_listener(recv_handler, port=13337)
    # Nothing else to do here; wake up once per second, forever.
    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())


优化五————启用 DMA 的代码

In [None]:
import ucp
import asyncio
import cupy as cp

NUM_BLOCKS = 10000
BATCH_SIZE = 100  # number of blocks sent per batch


# Coroutine that sends one range of blocks; several of these can run concurrently.
async def sender(ep, registered_gpu_data, start_idx, end_idx, size):
    """Send blocks ``start_idx`` .. ``end_idx - 1`` as slices of one flat buffer.

    Block ``i`` is the slice ``[i * size, (i + 1) * size)`` of
    ``registered_gpu_data``.  Each send is awaited before the next slice is
    taken.

    Args:
        ep: endpoint object providing an awaitable ``send(buffer)``.
        registered_gpu_data: sliceable flat buffer holding all blocks.
        start_idx: index of the first block to send.
        end_idx: index one past the last block to send.
        size: length of one block, in elements of the buffer.
    """
    for block_idx in range(start_idx, end_idx):
        lo = block_idx * size
        hi = lo + size
        await ep.send(registered_gpu_data[lo:hi])

async def main():
    """Send NUM_BLOCKS blocks per size step out of one contiguous GPU buffer.

    For each of the 15 block sizes (128 * 2**exp bytes, exp = 0..14) a flat
    buffer of NUM_BLOCKS * size random bytes is created, optionally passed
    through ``ucp.register``, and sent block by block.  Sends are issued in
    batches of BATCH_SIZE blocks: within a batch all sends are in flight
    together, and the next batch starts once the current one has completed.
    The endpoint is closed after the last step.
    """
    ep = await ucp.create_endpoint("receiver_ip", 13337)

    # NOTE(review): `register` does not appear among UCX-Py's documented
    # top-level functions.  Use it when the installed module provides it and
    # otherwise send from the raw buffer instead of failing with
    # AttributeError.
    register = getattr(ucp, "register", None)

    for exp in range(0, 15):  # 128B to 2MB
        size = 128 * (2 ** exp)
        print(f"Transferring {NUM_BLOCKS} blocks of size {size} bytes")

        # One contiguous buffer for all blocks of this step.  randint's upper
        # bound is exclusive, so 256 is needed for the values to cover 0..255.
        raw_gpu_buffer = cp.random.randint(0, 256, size=(NUM_BLOCKS * size), dtype=cp.uint8)
        # CuPy launches the fill on the current stream without waiting for
        # it; synchronize so the buffer is fully written before it is sent.
        cp.cuda.get_current_stream().synchronize()

        registered_gpu_buffer = (
            raw_gpu_buffer if register is None else register(raw_gpu_buffer)
        )

        # Post the sends in ascending block order, BATCH_SIZE at a time.
        # Launching one multi-block coroutine per batch all at once would
        # post blocks as 0, BATCH_SIZE, 2*BATCH_SIZE, ...; assuming messages
        # on one endpoint are matched to receives in posting order, a
        # receiver that posts block 0, 1, 2, ... would then store the blocks
        # at the wrong offsets.
        for i in range(0, NUM_BLOCKS, BATCH_SIZE):
            end_idx = min(i + BATCH_SIZE, NUM_BLOCKS)
            await asyncio.gather(
                *[
                    sender(ep, registered_gpu_buffer, j, j + 1, size)
                    for j in range(i, end_idx)
                ]
            )

    await ep.close()


if __name__ == "__main__":
    asyncio.run(main())


In [None]:
import ucp
import asyncio
import cupy as cp

NUM_BLOCKS = 10000
BATCH_SIZE = 100  # number of receives posted together


async def recv_handler(ep):
    """Receive NUM_BLOCKS blocks per size step into one contiguous GPU buffer.

    For each of the 15 block sizes (128 * 2**exp bytes, exp = 0..14) a flat
    uint8 buffer of NUM_BLOCKS * size bytes is allocated and optionally passed
    through ``ucp.register``.  Block ``j`` is received into the slice starting
    at offset ``j * size``.  Receives are posted BATCH_SIZE at a time and each
    group is awaited with ``asyncio.gather``.  ``ep`` is closed at the end.

    Args:
        ep: endpoint object providing awaitable ``recv(buffer)`` and
            ``close()``.
    """
    # NOTE(review): `register` does not appear among UCX-Py's documented
    # top-level functions.  Use it when the installed module provides it and
    # otherwise receive into the raw buffer instead of failing with
    # AttributeError.
    register = getattr(ucp, "register", None)

    for exp in range(0, 15):
        size = 128 * (2 ** exp)
        print(f"Receiving {NUM_BLOCKS} blocks of size {size} bytes")

        # One contiguous buffer for all blocks of this step.
        raw_gpu_buffer = cp.empty(NUM_BLOCKS * size, dtype=cp.uint8)

        registered_gpu_buffer = (
            raw_gpu_buffer if register is None else register(raw_gpu_buffer)
        )

        for i in range(0, NUM_BLOCKS, BATCH_SIZE):
            tasks = []
            for j in range(i, min(i + BATCH_SIZE, NUM_BLOCKS)):
                offset = j * size
                recv_buf = registered_gpu_buffer[offset:offset + size]
                tasks.append(ep.recv(recv_buf))
            await asyncio.gather(*tasks)

    await ep.close()

async def main():
    """Register ``recv_handler`` on a listener bound to port 13337 and then idle indefinitely."""
    server = ucp.create_listener(recv_handler, port=13337)
    poll_interval = 1  # seconds between wake-ups
    while True:
        await asyncio.sleep(poll_interval)


if __name__ == "__main__":
    asyncio.run(main())


优化六————预分配最大尺寸的缓冲区并复用

In [None]:
import ucp
import asyncio
import cupy as cp

NUM_BLOCKS = 10000
BATCH_SIZE = 100  # number of blocks sent per batch

# Largest block size of the experiment: 128 * 2**14 = 2,097,152 bytes (2 MiB).
MAX_BLOCK_SIZE = 128 * (2 ** 14)
MAX_BUFFER_SIZE = NUM_BLOCKS * MAX_BLOCK_SIZE  # bytes needed for the largest step

# Allocate the largest buffer once, at import time.
raw_gpu_buffer = cp.empty(MAX_BUFFER_SIZE, dtype=cp.uint8)
# Register it once, if possible.
# NOTE(review): `register` does not appear among UCX-Py's documented top-level
# functions.  Use it when the installed module provides it and otherwise fall
# back to the raw buffer instead of failing with AttributeError on import.
_register = getattr(ucp, "register", None)
registered_gpu_buffer = (
    raw_gpu_buffer if _register is None else _register(raw_gpu_buffer)
)

# Coroutine that sends one range of blocks; several of these can run concurrently.
async def sender(ep, registered_gpu_data, start_idx, end_idx, size):
    """Send blocks ``start_idx`` .. ``end_idx - 1``, each a ``size``-long slice of the buffer.

    Args:
        ep: endpoint object providing an awaitable ``send(buffer)``.
        registered_gpu_data: sliceable flat buffer holding all blocks.
        start_idx: index of the first block to send.
        end_idx: index one past the last block to send.
        size: length of one block, in elements of the buffer.
    """
    for n in range(start_idx, end_idx):
        first = n * size
        # Block n occupies [n * size, n * size + size) of the flat buffer.
        chunk = registered_gpu_data[first:first + size]
        await ep.send(chunk)

async def main():
    """Send NUM_BLOCKS blocks per size step, reusing the preallocated module-level buffer.

    For each of the 15 block sizes (128 * 2**exp bytes, exp = 0..14) the first
    NUM_BLOCKS * size bytes of ``raw_gpu_buffer`` are overwritten with random
    data and then sent block by block from ``registered_gpu_buffer``.  Sends
    are issued in batches of BATCH_SIZE blocks: within a batch all sends are
    in flight together, and the next batch starts once the current one has
    completed.  The endpoint is closed after the last step.
    """
    ep = await ucp.create_endpoint("receiver_ip", 13337)

    for exp in range(0, 15):  # 128B to 2MB
        size = 128 * (2 ** exp)
        print(f"Transferring {NUM_BLOCKS} blocks of size {size} bytes")

        # Refill the prefix of the shared buffer used by this step.
        # randint's upper bound is exclusive, so 256 is needed for 0..255.
        # NOTE(review): the data is written through raw_gpu_buffer but sent
        # from registered_gpu_buffer; this assumes both refer to the same
        # device memory.
        current_size = NUM_BLOCKS * size
        raw_gpu_buffer[:current_size] = cp.random.randint(
            0, 256, size=current_size, dtype=cp.uint8
        )
        # CuPy launches the fill on the current stream without waiting for
        # it; synchronize so the buffer is fully written before it is sent.
        cp.cuda.get_current_stream().synchronize()

        # Post the sends in ascending block order, BATCH_SIZE at a time.
        # Launching one multi-block coroutine per batch all at once would
        # post blocks as 0, BATCH_SIZE, 2*BATCH_SIZE, ...; assuming messages
        # on one endpoint are matched to receives in posting order, a
        # receiver that posts block 0, 1, 2, ... would then store the blocks
        # at the wrong offsets.
        for i in range(0, NUM_BLOCKS, BATCH_SIZE):
            end_idx = min(i + BATCH_SIZE, NUM_BLOCKS)
            await asyncio.gather(
                *[
                    sender(ep, registered_gpu_buffer, j, j + 1, size)
                    for j in range(i, end_idx)
                ]
            )

    await ep.close()


if __name__ == "__main__":
    asyncio.run(main())
    

In [None]:
import ucp
import asyncio
import cupy as cp

NUM_BLOCKS = 10000
BATCH_SIZE = 100  # number of receives posted together

# Largest block size of the experiment: 128 * 2**14 = 2,097,152 bytes (2 MiB).
MAX_BLOCK_SIZE = 128 * (2 ** 14)
MAX_BUFFER_SIZE = NUM_BLOCKS * MAX_BLOCK_SIZE  # bytes needed for the largest step

# Allocate the largest receive buffer once, at import time.
raw_gpu_buffer = cp.empty(MAX_BUFFER_SIZE, dtype=cp.uint8)
# Register it once, if possible.
# NOTE(review): `register` does not appear among UCX-Py's documented top-level
# functions.  Use it when the installed module provides it and otherwise fall
# back to the raw buffer instead of failing with AttributeError on import.
_register = getattr(ucp, "register", None)
registered_gpu_buffer = (
    raw_gpu_buffer if _register is None else _register(raw_gpu_buffer)
)

async def recv_handler(ep):
    """Receive NUM_BLOCKS blocks per size step into the preallocated buffer, then close ``ep``.

    For each block size (128 * 2**exp bytes, exp = 0..14) only the first
    NUM_BLOCKS * size bytes of ``registered_gpu_buffer`` are used; block ``j``
    is received into the slice starting at ``j * size``.  Receives are posted
    BATCH_SIZE at a time and each group is awaited with ``asyncio.gather``.

    Args:
        ep: endpoint object providing awaitable ``recv(buffer)`` and
            ``close()``.
    """
    for exp in range(0, 15):
        size = 128 * (2 ** exp)
        print(f"Receiving {NUM_BLOCKS} blocks of size {size} bytes")

        # Prefix of the shared buffer that this step writes into.
        step_bytes = NUM_BLOCKS * size
        step_view = registered_gpu_buffer[:step_bytes]

        for group_start in range(0, NUM_BLOCKS, BATCH_SIZE):
            group_stop = min(group_start + BATCH_SIZE, NUM_BLOCKS)
            pending = [
                ep.recv(step_view[j * size:j * size + size])
                for j in range(group_start, group_stop)
            ]
            await asyncio.gather(*pending)

    await ep.close()

async def main():
    """Create the listener for ``recv_handler`` on port 13337, then sleep in an endless loop."""
    # Bound to a local so the object stays referenced while the loop runs.
    bound_listener = ucp.create_listener(recv_handler, port=13337)
    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())
    

优化七————尝试使内存地址对齐，避免 UCX 内部触发额外的拷贝

In [None]:
import ucp
import asyncio
import cupy as cp

NUM_BLOCKS = 10000
BATCH_SIZE = 100  # number of blocks sent per batch


MAX_BLOCK_SIZE = 128 * (2 ** 14)  # 2,097,152 bytes (2 MiB)
ALIGNMENT = 512  # granularity, in bytes, that the allocation size is rounded to

# Bytes needed for the largest size step.
max_buffer_size = NUM_BLOCKS * MAX_BLOCK_SIZE
# Round the byte count up to a multiple of ALIGNMENT.  This pads the length
# only; it does not move the start address of the allocation.  With the
# constants above max_buffer_size is already a multiple of 512, so no padding
# is added.
aligned_max_size = ((max_buffer_size + ALIGNMENT - 1) // ALIGNMENT) * ALIGNMENT

# Allocate the largest buffer once, at import time.
raw_gpu_buffer = cp.empty(aligned_max_size, dtype=cp.uint8, order='C')
# Usable region: the first max_buffer_size bytes, without any padding.
aligned_buffer = raw_gpu_buffer[:max_buffer_size]
# Register it once, if possible.
# NOTE(review): `register` does not appear among UCX-Py's documented top-level
# functions.  Use it when the installed module provides it and otherwise fall
# back to the unregistered view instead of failing with AttributeError on
# import.
_register = getattr(ucp, "register", None)
registered_gpu_buffer = (
    aligned_buffer if _register is None else _register(aligned_buffer)
)

# Coroutine that sends one range of blocks; several of these can run concurrently.
async def sender(ep, registered_gpu_data, start_idx, end_idx, size):
    """Send every block in ``[start_idx, end_idx)`` as a slice of the flat buffer.

    Block ``i`` is ``registered_gpu_data[i * size:(i + 1) * size]``; sends are
    awaited one after another in ascending index order.

    Args:
        ep: endpoint object providing an awaitable ``send(buffer)``.
        registered_gpu_data: sliceable flat buffer holding all blocks.
        start_idx: index of the first block to send.
        end_idx: index one past the last block to send.
        size: length of one block, in elements of the buffer.
    """
    for blk in range(start_idx, end_idx):
        begin = blk * size
        end = begin + size
        await ep.send(registered_gpu_data[begin:end])

async def main():
    """Send NUM_BLOCKS blocks per size step, reusing the preallocated module-level buffer.

    For each of the 15 block sizes (128 * 2**exp bytes, exp = 0..14) the first
    NUM_BLOCKS * size bytes of ``aligned_buffer`` are overwritten with random
    data and then sent block by block from ``registered_gpu_buffer``.  Sends
    are issued in batches of BATCH_SIZE blocks: within a batch all sends are
    in flight together, and the next batch starts once the current one has
    completed.  The endpoint is closed after the last step.
    """
    ep = await ucp.create_endpoint("receiver_ip", 13337)

    for exp in range(0, 15):  # 128B to 2MB
        size = 128 * (2 ** exp)
        print(f"Transferring {NUM_BLOCKS} blocks of size {size} bytes")

        # Refill the prefix of the shared buffer used by this step.
        # randint's upper bound is exclusive, so 256 is needed for 0..255.
        # NOTE(review): the data is written through aligned_buffer but sent
        # from registered_gpu_buffer; this assumes both refer to the same
        # device memory.
        current_buffer_size = NUM_BLOCKS * size
        aligned_buffer[:current_buffer_size] = cp.random.randint(
            0, 256, size=current_buffer_size, dtype=cp.uint8
        )
        # CuPy launches the fill on the current stream without waiting for
        # it; synchronize so the buffer is fully written before it is sent.
        cp.cuda.get_current_stream().synchronize()

        # Post the sends in ascending block order, BATCH_SIZE at a time.
        # Launching one multi-block coroutine per batch all at once would
        # post blocks as 0, BATCH_SIZE, 2*BATCH_SIZE, ...; assuming messages
        # on one endpoint are matched to receives in posting order, a
        # receiver that posts block 0, 1, 2, ... would then store the blocks
        # at the wrong offsets.
        for i in range(0, NUM_BLOCKS, BATCH_SIZE):
            end_idx = min(i + BATCH_SIZE, NUM_BLOCKS)
            await asyncio.gather(
                *[
                    sender(ep, registered_gpu_buffer, j, j + 1, size)
                    for j in range(i, end_idx)
                ]
            )

    await ep.close()


if __name__ == "__main__":
    asyncio.run(main())


In [None]:
import ucp
import asyncio
import cupy as cp

NUM_BLOCKS = 10000
BATCH_SIZE = 100  # number of receives posted together

# Buffer configuration.
MAX_BLOCK_SIZE = 128 * (2 ** 14)  # 2,097,152 bytes (2 MiB)
ALIGNMENT = 512  # granularity, in bytes, that the allocation size is rounded to

# Bytes needed for the largest size step.
max_buffer_size = NUM_BLOCKS * MAX_BLOCK_SIZE
# Round the byte count up to a multiple of ALIGNMENT.  This pads the length
# only; it does not move the start address of the allocation.  With the
# constants above max_buffer_size is already a multiple of 512, so no padding
# is added.
aligned_max_size = ((max_buffer_size + ALIGNMENT - 1) // ALIGNMENT) * ALIGNMENT

# Allocate the largest receive buffer once, at import time.
raw_gpu_buffer = cp.empty(aligned_max_size, dtype=cp.uint8, order='C')
# Usable region: the first max_buffer_size bytes, without any padding.
aligned_buffer = raw_gpu_buffer[:max_buffer_size]
# Register it once, if possible.
# NOTE(review): `register` does not appear among UCX-Py's documented top-level
# functions.  Use it when the installed module provides it and otherwise fall
# back to the unregistered view instead of failing with AttributeError on
# import.
_register = getattr(ucp, "register", None)
registered_gpu_buffer = (
    aligned_buffer if _register is None else _register(aligned_buffer)
)

async def recv_handler(ep):
    """Receive NUM_BLOCKS blocks per size step into the preallocated buffer, then close ``ep``.

    For each block size (128 * 2**exp bytes, exp = 0..14) the first
    NUM_BLOCKS * size bytes of ``registered_gpu_buffer`` serve as the target;
    block ``j`` goes to the slice starting at ``j * size``.  Receives are
    posted BATCH_SIZE at a time and each group is awaited with
    ``asyncio.gather``.

    Args:
        ep: endpoint object providing awaitable ``recv(buffer)`` and
            ``close()``.
    """
    for exp in range(0, 15):
        size = 128 * (2 ** exp)
        print(f"Receiving {NUM_BLOCKS} blocks of size {size} bytes")

        # Region of the shared buffer that this step fills.
        region = registered_gpu_buffer[:NUM_BLOCKS * size]

        group_start = 0
        while group_start < NUM_BLOCKS:
            group_stop = min(group_start + BATCH_SIZE, NUM_BLOCKS)
            pending = []
            for j in range(group_start, group_stop):
                # Target slice for block j inside the reused region.
                lo = j * size
                pending.append(ep.recv(region[lo:lo + size]))
            await asyncio.gather(*pending)
            group_start += BATCH_SIZE

    await ep.close()

async def main():
    """Listen on port 13337 with ``recv_handler`` as the connection callback and never return."""
    srv = ucp.create_listener(recv_handler, port=13337)
    nap = 1  # seconds per idle sleep
    while True:
        await asyncio.sleep(nap)


if __name__ == "__main__":
    asyncio.run(main())
    

添加计时用于调试

In [None]:
import ucp
import asyncio
import cupy as cp
import time  

NUM_BLOCKS = 10000
BATCH_SIZE = 100  # number of blocks sent per batch


MAX_BLOCK_SIZE = 128 * (2 ** 14)  # 2,097,152 bytes (2 MiB)
ALIGNMENT = 512  # granularity, in bytes, that the allocation size is rounded to

# Bytes needed for the largest size step.
max_buffer_size = NUM_BLOCKS * MAX_BLOCK_SIZE
# Round the byte count up to a multiple of ALIGNMENT.  This pads the length
# only; it does not move the start address of the allocation.  With the
# constants above max_buffer_size is already a multiple of 512, so no padding
# is added.
aligned_max_size = ((max_buffer_size + ALIGNMENT - 1) // ALIGNMENT) * ALIGNMENT

# Allocate the largest buffer once, at import time.
raw_gpu_buffer = cp.empty(aligned_max_size, dtype=cp.uint8, order='C')
# Usable region: the first max_buffer_size bytes, without any padding.
aligned_buffer = raw_gpu_buffer[:max_buffer_size]
# Register it once, if possible.
# NOTE(review): `register` does not appear among UCX-Py's documented top-level
# functions.  Use it when the installed module provides it and otherwise fall
# back to the unregistered view instead of failing with AttributeError on
# import.
_register = getattr(ucp, "register", None)
registered_gpu_buffer = (
    aligned_buffer if _register is None else _register(aligned_buffer)
)

# Coroutine that sends one range of blocks; several of these can run concurrently.
async def sender(ep, registered_gpu_data, start_idx, end_idx, size):
    """Send blocks ``start_idx`` .. ``end_idx - 1`` of the flat buffer, one awaited send each.

    Args:
        ep: endpoint object providing an awaitable ``send(buffer)``.
        registered_gpu_data: sliceable flat buffer holding all blocks.
        start_idx: index of the first block to send.
        end_idx: index one past the last block to send.
        size: length of one block, in elements of the buffer.
    """
    for block_no in range(start_idx, end_idx):
        offset = block_no * size
        payload = registered_gpu_data[offset:offset + size]
        await ep.send(payload)

async def main():
    """Send NUM_BLOCKS blocks per size step and print per-step and total timings.

    For each of the 15 block sizes (128 * 2**exp bytes, exp = 0..14) the first
    NUM_BLOCKS * size bytes of ``aligned_buffer`` are overwritten with random
    data and then sent block by block from ``registered_gpu_buffer``.  Sends
    are issued in batches of BATCH_SIZE blocks: within a batch all sends are
    in flight together, and the next batch starts once the current one has
    completed.  The per-step timer covers only the sends; the total timer
    starts after the endpoint is created and also covers the data generation.
    The endpoint is closed after the timings are printed.
    """
    ep = await ucp.create_endpoint("receiver_ip", 13337)

    # Start of the whole transfer run.
    total_start = time.perf_counter()

    for exp in range(0, 15):  # 128B to 2MB
        size = 128 * (2 ** exp)
        print(f"\n===== 开始传输 {NUM_BLOCKS} 个 {size}B 块 =====")

        # Refill the prefix of the shared buffer used by this step.
        # randint's upper bound is exclusive, so 256 is needed for 0..255.
        # NOTE(review): the data is written through aligned_buffer but sent
        # from registered_gpu_buffer; this assumes both refer to the same
        # device memory.
        current_buffer_size = NUM_BLOCKS * size
        aligned_buffer[:current_buffer_size] = cp.random.randint(
            0, 256, size=current_buffer_size, dtype=cp.uint8
        )
        # CuPy launches the fill on the current stream without waiting for
        # it.  Synchronize before starting the timer so that the fill is
        # neither counted as transfer time nor still running when the first
        # block is sent.
        cp.cuda.get_current_stream().synchronize()

        # Start of this size step's transfer.
        batch_start = time.perf_counter()

        # Post the sends in ascending block order, BATCH_SIZE at a time.
        # Launching one multi-block coroutine per batch all at once would
        # post blocks as 0, BATCH_SIZE, 2*BATCH_SIZE, ...; assuming messages
        # on one endpoint are matched to receives in posting order, a
        # receiver that posts block 0, 1, 2, ... would then store the blocks
        # at the wrong offsets.
        for i in range(0, NUM_BLOCKS, BATCH_SIZE):
            end_idx = min(i + BATCH_SIZE, NUM_BLOCKS)
            await asyncio.gather(
                *[
                    sender(ep, registered_gpu_buffer, j, j + 1, size)
                    for j in range(i, end_idx)
                ]
            )

        # Elapsed time for this size step.
        batch_end = time.perf_counter()
        batch_duration = batch_end - batch_start
        print(f"当前批次传输完成，耗时: {batch_duration:.4f} 秒")

    # Elapsed time for all size steps together.
    total_end = time.perf_counter()
    total_duration = total_end - total_start
    print(f"\n===== 所有传输完成，总耗时: {total_duration:.4f} 秒 =====")

    await ep.close()


if __name__ == "__main__":
    asyncio.run(main())

In [None]:
import ucp
import asyncio
import cupy as cp
import time  

NUM_BLOCKS = 10000
BATCH_SIZE = 100  # number of receives posted together

# Buffer configuration.
MAX_BLOCK_SIZE = 128 * (2 ** 14)  # 2,097,152 bytes (2 MiB)
ALIGNMENT = 512  # granularity, in bytes, that the allocation size is rounded to

# Bytes needed for the largest size step.
max_buffer_size = NUM_BLOCKS * MAX_BLOCK_SIZE
# Round the byte count up to a multiple of ALIGNMENT.  This pads the length
# only; it does not move the start address of the allocation.  With the
# constants above max_buffer_size is already a multiple of 512, so no padding
# is added.
aligned_max_size = ((max_buffer_size + ALIGNMENT - 1) // ALIGNMENT) * ALIGNMENT

# Allocate the largest receive buffer once, at import time.
raw_gpu_buffer = cp.empty(aligned_max_size, dtype=cp.uint8, order='C')
# Usable region: the first max_buffer_size bytes, without any padding.
aligned_buffer = raw_gpu_buffer[:max_buffer_size]
# Register it once, if possible.
# NOTE(review): `register` does not appear among UCX-Py's documented top-level
# functions.  Use it when the installed module provides it and otherwise fall
# back to the unregistered view instead of failing with AttributeError on
# import.
_register = getattr(ucp, "register", None)
registered_gpu_buffer = (
    aligned_buffer if _register is None else _register(aligned_buffer)
)

async def recv_handler(ep):
    """Receive NUM_BLOCKS blocks per size step, printing per-step and total timings.

    For each block size (128 * 2**exp bytes, exp = 0..14) the first
    NUM_BLOCKS * size bytes of ``registered_gpu_buffer`` serve as the target;
    block ``j`` goes to the slice starting at ``j * size``.  Receives are
    posted BATCH_SIZE at a time and each group is awaited with
    ``asyncio.gather``.  ``ep`` is closed after the total time is printed.

    Args:
        ep: endpoint object providing awaitable ``recv(buffer)`` and
            ``close()``.
    """
    # Start of the whole receive run.
    run_started = time.perf_counter()

    for exp in range(0, 15):
        size = 128 * (2 ** exp)
        print(f"\n===== 开始接收 {NUM_BLOCKS} 个 {size}B 块 =====")

        # Region of the shared buffer that this step fills.
        region = registered_gpu_buffer[:NUM_BLOCKS * size]

        # Start of this size step.
        step_started = time.perf_counter()

        # Receive the step in groups of BATCH_SIZE blocks.
        for group_start in range(0, NUM_BLOCKS, BATCH_SIZE):
            group_stop = min(group_start + BATCH_SIZE, NUM_BLOCKS)
            pending = [
                ep.recv(region[j * size:j * size + size])
                for j in range(group_start, group_stop)
            ]
            await asyncio.gather(*pending)

        # Elapsed time for this size step.
        batch_duration = time.perf_counter() - step_started
        print(f"当前批次接收完成，耗时: {batch_duration:.4f} 秒")

    # Elapsed time for all size steps together.
    total_duration = time.perf_counter() - run_started
    print(f"\n===== 所有接收完成，总耗时: {total_duration:.4f} 秒 =====")

    await ep.close()

async def main():
    """Create a listener on port 13337 that dispatches to ``recv_handler``; loop on 1 s sleeps."""
    # Bound to a local so the object stays referenced while the loop runs.
    recv_listener = ucp.create_listener(recv_handler, port=13337)
    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    asyncio.run(main())