Skip to content

Commit

Permalink
[embedding] non-blocking cpu-gpu copy (#1647)
Browse files Browse the repository at this point in the history
  • Loading branch information
feifeibear committed Sep 26, 2022
1 parent 0767f67 commit 0444360
Showing 1 changed file with 48 additions and 6 deletions.
54 changes: 48 additions & 6 deletions colossalai/nn/parallel/layers/cache_embedding/cache_mgr.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ def __init__(
buffer_size: int = 0,
pin_weight: bool = False,
evict_strategy: EvictionStrategy = EvictionStrategy.DATASET,
async_copy: bool = False,
) -> None:
super(CachedParamMgr, self).__init__()
self.buffer_size = buffer_size
Expand All @@ -58,6 +59,11 @@ def __init__(

self._evict_strategy = evict_strategy

self._async_copy = async_copy

if self._async_copy:
print('use async copy')

if self._evict_strategy == EvictionStrategy.LFU:
# cache_row_idx -> frequency, freq of the cache rows.
# classic lfu cache. evict the minimal freq value row in cuda cache.
Expand Down Expand Up @@ -312,6 +318,18 @@ def _prepare_rows_on_cuda(self, cpu_row_idxs: torch.Tensor) -> None:
cpu_row_idxs (torch.Tensor): the rows to be placed on CUDA
"""
evict_num = cpu_row_idxs.numel() - self.cuda_available_row_num

cpu_row_idxs_copy = cpu_row_idxs.cpu()

# move evict in rows to gpu
if self._async_copy:
if self.buffer_size == 0:
rows_cpu = self.weight.view(self.num_embeddings, -1).index_select(0, cpu_row_idxs_copy).pin_memory()
evict_in_rows_gpu = torch.empty_like(rows_cpu, device=torch.cuda.current_device())
evict_in_rows_gpu.copy_(rows_cpu, non_blocking=True)
else:
raise NotImplemented

if evict_num > 0:
with Timer() as timer:
mask_cpu_row_idx = torch.isin(self.cached_idx_map, self.evict_backlist)
Expand All @@ -323,12 +341,24 @@ def _prepare_rows_on_cuda(self, cpu_row_idxs: torch.Tensor) -> None:
backup_idxs = self.cached_idx_map[mask_cpu_row_idx].clone()
self.cached_idx_map.index_fill_(0, invalid_idxs, -2)
evict_gpu_row_idxs = self._find_evict_gpu_idxs(evict_num)

# move evict out rows to cpu
if self._async_copy:
evict_out_rows_gpu = self.cuda_cached_weight.view(self.cuda_row_num,
-1).index_select(0, evict_gpu_row_idxs)
evict_out_rows_cpu = torch.empty_like(evict_out_rows_gpu, device='cpu', pin_memory=True)
evict_out_rows_cpu.copy_(evict_out_rows_gpu, non_blocking=True)
self.cached_idx_map.index_copy_(0, invalid_idxs, backup_idxs)

elif self._evict_strategy == EvictionStrategy.LFU:
backup_freqs = self.freq_cnter[invalid_idxs].clone()
self.freq_cnter.index_fill_(0, invalid_idxs, sys.maxsize)
evict_gpu_row_idxs = self._find_evict_gpu_idxs(evict_num)
if self._async_copy:
evict_out_rows_gpu = self.cuda_cached_weight.view(self.cuda_row_num,
-1).index_select(0, evict_gpu_row_idxs)
evict_out_rows_cpu = torch.empty_like(evict_out_rows_gpu, device='cpu', pin_memory=True)
evict_out_rows_cpu.copy_(evict_out_rows_gpu, non_blocking=True)
self.freq_cnter.index_copy_(0, invalid_idxs, backup_freqs)

evict_info = self.cached_idx_map[evict_gpu_row_idxs]
Expand All @@ -341,8 +371,13 @@ def _prepare_rows_on_cuda(self, cpu_row_idxs: torch.Tensor) -> None:
tgt=self.weight.view(self.num_embeddings, -1))
else:
# allocate tmp memory on CPU and copy rows on CUDA to CPU.
rows = self.cuda_cached_weight.view(self.cuda_row_num, -1).index_select(0, evict_gpu_row_idxs).cpu()
self.weight.view(self.num_embeddings, -1).index_copy_(0, evict_info.cpu(), rows)
# TODO async gpu -> cpu
if self._async_copy:
pass
else:
evict_out_rows_cpu = self.cuda_cached_weight.view(self.cuda_row_num,
-1).index_select(0, evict_gpu_row_idxs).cpu()
self.weight.view(self.num_embeddings, -1).index_copy_(0, evict_info.cpu(), evict_out_rows_cpu)

self.cached_idx_map.index_fill_(0, evict_gpu_row_idxs, -1)
self.inverted_cached_idx.index_fill_(0, evict_info, -1)
Expand All @@ -359,13 +394,20 @@ def _prepare_rows_on_cuda(self, cpu_row_idxs: torch.Tensor) -> None:
# Here also allocate extra memory on CUDA. #cpu_row_idxs
if self.buffer_size > 0:
self.limit_buff_index_copyer.index_copy(0,
src_index=cpu_row_idxs.cpu(),
src_index=cpu_row_idxs_copy,
tgt_index=slots,
src=self.weight.view(self.num_embeddings, -1),
tgt=self.cuda_cached_weight.view(self.cuda_row_num, -1))
else:
rows = self.weight.view(self.num_embeddings, -1).index_select(0, cpu_row_idxs.cpu()).cuda()
self.cuda_cached_weight.view(self.cuda_row_num, -1).index_copy_(0, slots, rows)
# TODO async copy cpu -> gpu
if self._async_copy:
pass
else:
evict_in_rows_gpu = self.weight.view(self.num_embeddings,
-1).index_select(0, cpu_row_idxs_copy).cuda()

self.cuda_cached_weight.view(self.cuda_row_num, -1).index_copy_(0, slots, evict_in_rows_gpu)

slot_offsets = slots
self.cached_idx_map[slots] = cpu_row_idxs
self.inverted_cached_idx.index_copy_(0, cpu_row_idxs, slot_offsets)
Expand Down Expand Up @@ -453,4 +495,4 @@ def _admit(self, row_id: int):
self._cuda_available_row_num -= 1

self._cpu_to_cuda_numel += self.embedding_dim
self._cpu_to_cuda_elpase += timer.elapsed
self._cpu_to_cuda_elpase += timer.elapsed

0 comments on commit 0444360

Please sign in to comment.