diff --git a/src/allocator.hpp b/src/allocator.hpp
index 5c9d268d..ada0ce78 100644
--- a/src/allocator.hpp
+++ b/src/allocator.hpp
@@ -2,24 +2,47 @@
 #define ALLOCATOR_HPP
 
 #include "infinicore_infer.h"
+#include <cstddef>
+#include <map>
+#include <set>
+#include <unordered_map>
+#include <vector>
 
 class AllocatorBase {
 public:
     virtual void *alloc(size_t size) = 0;
     virtual void release(void *ptr) = 0;
+    virtual ~AllocatorBase() = default;
 };
 
-class WorkspaceAllocator : public AllocatorBase {
-private:
-    void *_memory;
-    size_t _total_size;
-    size_t _align;
-
+class MemoryPool : public AllocatorBase {
 public:
-    WorkspaceAllocator(size_t intial_size, size_t align = 256);
-    ~WorkspaceAllocator();
+    MemoryPool(size_t initialSize = 0);
+    ~MemoryPool();
     void *alloc(size_t size) override;
     void release(void *ptr) override;
+
+private:
+    struct Block {
+        void *base;
+        void *ptr;
+        size_t size;
+        bool is_free;
+        Block(void *b, void *p, size_t s, bool f)
+            : base(b), ptr(p), size(s), is_free(f) {}
+        bool operator<(const Block &other) const {
+            return ptr < other.ptr;
+        }
+    };
+
+    void *allocateNewRegion(size_t size);
+    void insertFreeBlock(Block &&block);
+    void tryCoalesce(const Block &block);
+
+    std::vector<void *> _base_regions;
+    std::set<Block> _all_blocks;
+    std::multimap<size_t, std::set<Block>::iterator> _free_blocks;
+    std::unordered_map<void *, std::set<Block>::iterator> _ptr_to_block;
 };
 
 #endif
diff --git a/src/allocator/memory_allocator.cpp b/src/allocator/memory_allocator.cpp
new file mode 100644
index 00000000..41ea0aa8
--- /dev/null
+++ b/src/allocator/memory_allocator.cpp
@@ -0,0 +1,110 @@
+#include "../allocator.hpp"
+#include "../utils.hpp"
+#include <iterator>
+#include <new>
+#include <stdexcept>
+
+MemoryPool::MemoryPool(size_t initialSize) {
+    if (initialSize > 0) {
+        allocateNewRegion(initialSize);
+    }
+}
+
+MemoryPool::~MemoryPool() {
+    for (void *region : _base_regions) {
+        RUN_INFINI(infinirtFree(region));
+    }
+}
+
+void *MemoryPool::alloc(size_t size) {
+    // Best fit: smallest free block whose size is >= the request.
+    auto it = _free_blocks.lower_bound(size);
+    if (it == _free_blocks.end()) {
+        allocateNewRegion(size);
+        it = _free_blocks.lower_bound(size);
+        if (it == _free_blocks.end()) {
+            throw std::bad_alloc();
+        }
+    }
+
+    auto block_it = it->second;
+    Block block = *block_it;
+    _free_blocks.erase(it);
+    _all_blocks.erase(block_it);
+
+    if (block.size > size + 256) {
+        // Split: carve the request off the front, keep the tail free
+        void *alloc_ptr = block.ptr;
+        void *rem_ptr = static_cast<char *>(block.ptr) + size;
+        size_t rem_size = block.size - size;
+        Block alloc_block(block.base, alloc_ptr, size, false);
+        Block rem_block(block.base, rem_ptr, rem_size, true);
+        auto alloc_it = _all_blocks.insert(alloc_block).first;
+        auto rem_it = _all_blocks.insert(rem_block).first;
+        _free_blocks.emplace(rem_size, rem_it);
+        _ptr_to_block[alloc_ptr] = alloc_it;
+        return alloc_ptr;
+    } else {
+        // No split: hand out the whole block
+        block.is_free = false;
+        auto alloc_it = _all_blocks.insert(block).first;
+        _ptr_to_block[block.ptr] = alloc_it;
+        return block.ptr;
+    }
+}
+
+void MemoryPool::release(void *ptr) {
+    auto it = _ptr_to_block.find(ptr);
+    if (it == _ptr_to_block.end()) {
+        throw std::runtime_error("Invalid pointer to free");
+    }
+
+    auto block_it = it->second;
+    Block block = *block_it;
+    _all_blocks.erase(block_it);
+    block.is_free = true;
+    auto new_it = _all_blocks.insert(block).first;
+    _ptr_to_block.erase(ptr);
+    tryCoalesce(*new_it);
+}
+
+void *MemoryPool::allocateNewRegion(size_t size) {
+    void *ptr = nullptr;
+    RUN_INFINI(infinirtMalloc(&ptr, size));
+    _base_regions.push_back(ptr);
+    Block new_block(ptr, ptr, size, true);
+    auto it = _all_blocks.insert(new_block).first;
+    _free_blocks.emplace(size, it);
+    return ptr;
+}
+
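+// Merge a just-freed block with its free neighbors so the pool does not
+// fragment over time. A sketch of the intended effect (sizes illustrative):
+//
+//   before: [ free 256 | freed 512 | free 256 ]   three blocks, one region
+//   after:  [ free 1024 ]                          one block, one free entry
+//
+// Only blocks that are address-adjacent within the same base region merge.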
+void MemoryPool::tryCoalesce(const Block &block) {
+    auto it = _all_blocks.find(block);
+    if (it == _all_blocks.end()) {
+        return;
+    }
+
+    // Erase the one free-list entry that refers to `target`. Erasing by key
+    // alone (_free_blocks.erase(size)) would drop every free block of that
+    // size, stranding same-sized blocks that are still in _all_blocks.
+    auto eraseFreeEntry = [this](std::set<Block>::iterator target) {
+        auto range = _free_blocks.equal_range(target->size);
+        for (auto f = range.first; f != range.second; ++f) {
+            if (f->second == target) {
+                _free_blocks.erase(f);
+                break;
+            }
+        }
+    };
+
+    Block merged = *it;
+    auto next = std::next(it);
+    auto prev = (it == _all_blocks.begin()) ? _all_blocks.end() : std::prev(it);
+
+    eraseFreeEntry(it);
+    _all_blocks.erase(it);
+
+    // Coalesce with next (must be free, adjacent, and in the same region)
+    if (next != _all_blocks.end() && next->is_free && next->base == merged.base &&
+        static_cast<char *>(merged.ptr) + merged.size == next->ptr) {
+        eraseFreeEntry(next);
+        merged.size += next->size;
+        _all_blocks.erase(next);
+    }
+
+    // Coalesce with prev (same conditions, growing the block downwards)
+    if (prev != _all_blocks.end() && prev->is_free && prev->base == merged.base &&
+        static_cast<char *>(prev->ptr) + prev->size == merged.ptr) {
+        eraseFreeEntry(prev);
+        merged.ptr = prev->ptr;
+        merged.size += prev->size;
+        _all_blocks.erase(prev);
+    }
+
+    merged.is_free = true;
+    auto new_it = _all_blocks.insert(merged).first;
+    _free_blocks.emplace(merged.size, new_it);
+}
diff --git a/src/allocator/workspace_allocator.cpp b/src/allocator/workspace_allocator.cpp
deleted file mode 100644
index 429aba96..00000000
--- a/src/allocator/workspace_allocator.cpp
+++ /dev/null
@@ -1,45 +0,0 @@
-#include "../allocator.hpp"
-
-#include "../utils.hpp"
-
-inline size_t aligned_size(size_t size_, size_t align) {
-    return (size_ + align - 1) & ~(align - 1);
-}
-
-inline void *allocate(size_t size_) {
-    void *ptr;
-    RUN_INFINI(infinirtMalloc(&ptr, size_));
-    return ptr;
-}
-
-WorkspaceAllocator::WorkspaceAllocator(size_t initial_size_, size_t align) {
-    _align = align;
-    _total_size = 0;
-    _memory = nullptr;
-    if (initial_size_ > 0) {
-        _total_size = aligned_size(initial_size_, _align);
-        _memory = allocate(_total_size);
-    }
-}
-
-void *WorkspaceAllocator::alloc(size_t new_size) {
-    if (_total_size < new_size) {
-        if (_total_size != 0) {
-            RUN_INFINI(infinirtDeviceSynchronize());
-            RUN_INFINI(infinirtFree(_memory));
-        }
-        _total_size = aligned_size(new_size, _align);
-        _memory = allocate(_total_size);
-    }
-    return _memory;
-}
-
-void WorkspaceAllocator::release(void *ptr) {
-}
-
-WorkspaceAllocator::~WorkspaceAllocator() {
-    if (_memory != nullptr) {
-        RUN_INFINI(infinirtDeviceSynchronize());
-        RUN_INFINI(infinirtFree(_memory));
-    }
-}
\ No newline at end of file
diff --git a/src/models/jiuge/jiuge.cpp b/src/models/jiuge/jiuge.cpp
index 39ffc2c9..d3556245 100644
--- a/src/models/jiuge/jiuge.cpp
+++ b/src/models/jiuge/jiuge.cpp
@@ -41,6 +41,8 @@ void createDeviceResource(DeviceResource *rsrc, const JiugeMeta *meta,
             getFFNDown(meta, weights, layer, idev, ndev));
     }
 
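+    // One pool per device replaces the old grow-only workspace buffer: claim
+    // 128 MiB up front, and let MemoryPool::alloc map in additional regions
+    // on demand if a request ever outgrows the free list.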
+    auto memory_pool = std::make_shared<MemoryPool>(128 * 1024 * 1024);
+
     *rsrc = DeviceResource{
         device,
         dev_id,
@@ -59,7 +61,7 @@
         w_ffn_down,
         stream,
         comm,
-        std::make_unique<WorkspaceAllocator>(0),
+        memory_pool,
     };
     RUN_INFINI(infinirtDeviceSynchronize());
 }
@@ -100,7 +102,6 @@ void releaseDeviceResource(DeviceResource &res) {
         t.reset();
     }
     res.w_ffn_down.clear();
-    res.workspace_allocator.reset();
     infiniopDestroyHandle(res.handle);
     res.handle = nullptr;
     infinirtStreamDestroy(res.stream);
@@ -130,13 +131,13 @@ void inferDeviceBatch(const JiugeMeta &meta, DeviceResource &rsrc,
     bool has_qkv_bias = rsrc.b_attn_qkv.size() > 0;
 
     // Allocate buffers
-    auto logits_in = Tensor::buffer(dt_logits, {ntok, d}, stream);
-    auto logits_out = Tensor::buffer(dt_logits, {ntok, d}, stream);
-    auto qkv_buf = Tensor::buffer(dt_logits, {ntok, (nh + nkvh * 2) * dh}, stream);
-    auto gate_up_buf = Tensor::buffer(dt_logits, {ntok, 2 * di}, stream);
-    auto o_buf = Tensor::buffer(dt_logits, {ntok, nh * dh}, stream);
-    auto prob_buf = Tensor::buffer(dt_logits, {nreq, dvoc}, stream);
-    auto result_buf = Tensor::buffer(INFINI_DTYPE_I64, {nreq}, stream);
+    auto logits_in = Tensor::buffer(dt_logits, {ntok, d}, rsrc.memory_pool);
+    auto logits_out = Tensor::buffer(dt_logits, {ntok, d}, rsrc.memory_pool);
+    auto qkv_buf = Tensor::buffer(dt_logits, {ntok, (nh + nkvh * 2) * dh}, rsrc.memory_pool);
+    auto gate_up_buf = Tensor::buffer(dt_logits, {ntok, 2 * di}, rsrc.memory_pool);
+    auto o_buf = Tensor::buffer(dt_logits, {ntok, nh * dh}, rsrc.memory_pool);
+    auto prob_buf = Tensor::buffer(dt_logits, {nreq, dvoc}, rsrc.memory_pool);
+    auto result_buf = Tensor::buffer(INFINI_DTYPE_I64, {nreq}, rsrc.memory_pool);
     auto result_cpu = std::vector<int64_t>(nreq);
 
     // Prepare inputs
@@ -153,7 +154,7 @@
     if (rsrc.device == INFINI_DEVICE_CPU) {
         pos_ids_buf = Tensor::weight(batch_pos_ids.data(), INFINI_DTYPE_U32, {ntok});
     } else {
-        pos_ids_buf = Tensor::buffer(INFINI_DTYPE_U32, {ntok}, stream);
+        pos_ids_buf = Tensor::buffer(INFINI_DTYPE_U32, {ntok}, rsrc.memory_pool);
         RUN_INFINI(infinirtMemcpyAsync(pos_ids_buf->data(), batch_pos_ids.data(),
                                        sizeof(uint32_t) * ntok, INFINIRT_MEMCPY_H2D, stream));
     }
@@ -164,7 +165,6 @@
     }
 
     // Prepare operators and workspace
-    void *workspace;
     size_t workspace_size = 0, temp_size = 0;
     // attn & mlp rmsnorm
     infiniopRMSNormDescriptor_t desc_norm;
@@ -270,9 +270,9 @@
         token_offset += seq_len;
     }
 
-    auto qk_buf = Tensor::buffer(dt_logits, {nh, max_qk_size}, stream);
-    auto rearrange_q_buf = Tensor::buffer(dt_logits, {nkvh, ngroup * max_seq_len, dh}, stream);
-    auto attn_val_buf = Tensor::buffer(dt_logits, {nh, max_seq_len, dh}, stream);
+    auto qk_buf = Tensor::buffer(dt_logits, {nh, max_qk_size}, rsrc.memory_pool);
+    auto rearrange_q_buf = Tensor::buffer(dt_logits, {nkvh, ngroup * max_seq_len, dh}, rsrc.memory_pool);
+    auto attn_val_buf = Tensor::buffer(dt_logits, {nh, max_seq_len, dh}, rsrc.memory_pool);
 
     // MLP descriptors
     infiniopGemmDescriptor_t desc_ffn_gate_up, desc_ffn_down;
@@ -317,7 +317,8 @@
     RUN_INFINI(infiniopGetRandomSampleWorkspaceSize(desc_sample, &temp_size));
     workspace_size = std::max(workspace_size, temp_size);
     // Allocate workspace
-    workspace = rsrc.workspace_allocator->alloc(workspace_size);
+    std::shared_ptr<Storage> workspace_storage = Storage::createFromPool(workspace_size, rsrc.memory_pool);
+    void *workspace = workspace_storage->memory;
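+    // Unlike the old WorkspaceAllocator, nothing is cached across batches:
+    // workspace_storage owns the block and returns it to the pool when it
+    // goes out of scope at the end of this call.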
 
     // Compute
     for (uint32_t layer = 0; layer < nlayer; layer++) {
diff --git a/src/models/jiuge/jiuge_impl.hpp b/src/models/jiuge/jiuge_impl.hpp
index 8e1c1135..86526c89 100644
--- a/src/models/jiuge/jiuge_impl.hpp
+++ b/src/models/jiuge/jiuge_impl.hpp
@@ -27,7 +27,7 @@ struct DeviceResource {
     // Communicator
     infinicclComm_t comm;
 
-    std::unique_ptr<WorkspaceAllocator> workspace_allocator;
+    std::shared_ptr<MemoryPool> memory_pool;
 };
 
 struct InferState {
diff --git a/src/tensor.hpp b/src/tensor.hpp
index 93b4050c..83b90196 100644
--- a/src/tensor.hpp
+++ b/src/tensor.hpp
@@ -1,20 +1,24 @@
 #ifndef INFER_TENSOR_H
 #define INFER_TENSOR_H
 
+#include "allocator.hpp"
 #include "infinicore_infer.h"
 #include "utils.hpp"
 #include <memory>
 #include <string>
 #include <vector>
 
-struct Storage {
+class Storage {
+public:
     void *memory;
     size_t size;
     infiniDevice_t device_type;
     int device_id;
+    std::shared_ptr<MemoryPool> memory_pool;
 
     static std::shared_ptr<Storage> create(size_t size);
     static std::shared_ptr<Storage> createAsync(size_t size, infinirtStream_t stream = nullptr);
+    static std::shared_ptr<Storage> createFromPool(size_t size, std::shared_ptr<MemoryPool> pool = nullptr);
     static std::shared_ptr<Storage> createHost(size_t size);
     ~Storage();
 };
@@ -68,7 +72,7 @@ class Tensor : public std::enable_shared_from_this<Tensor> {
 public:
     static std::shared_ptr<Tensor>
     buffer(infiniDtype_t dtype, const std::vector<size_t> &shape,
-           infinirtStream_t stream = nullptr);
+           std::shared_ptr<MemoryPool> pool = nullptr);
     static std::shared_ptr<Tensor>
     weight(void *host_data, infiniDtype_t dtype,
            const std::vector<size_t> &shape);
diff --git a/src/tensor/strorage.cpp b/src/tensor/strorage.cpp
index 73e4fabf..6d5574e9 100644
--- a/src/tensor/strorage.cpp
+++ b/src/tensor/strorage.cpp
@@ -1,3 +1,4 @@
+#include "../allocator.hpp"
 #include "../tensor.hpp"
 
 std::shared_ptr<Storage> Storage::create(size_t size) {
@@ -16,12 +17,26 @@ std::shared_ptr<Storage> Storage::createAsync(size_t size, infinirtStream_t stre
     return storage;
 }
 
+std::shared_ptr<Storage> Storage::createFromPool(size_t size, std::shared_ptr<MemoryPool> pool) {
+    auto storage = std::make_shared<Storage>();
+    storage->memory_pool = pool;
+    if (pool) {
+        storage->memory = pool->alloc(size);
+    } else {
+        RUN_INFINI(infinirtMalloc(&storage->memory, size));
+    }
+    storage->size = size;
+    RUN_INFINI(infinirtGetDevice(&storage->device_type, &storage->device_id));
+    return storage;
+}
+
 std::shared_ptr<Storage> Storage::createHost(size_t size) {
     auto storage = std::make_shared<Storage>();
     RUN_INFINI(infinirtMallocHost(&storage->memory, size));
     storage->size = size;
     storage->device_type = INFINI_DEVICE_CPU;
     storage->device_id = 0;
+    storage->memory_pool = nullptr; // No pool for host memory
     return storage;
 }
 
@@ -29,6 +44,10 @@ Storage::~Storage() {
     if (device_type == INFINI_DEVICE_CPU) {
         RUN_INFINI(infinirtFreeHost(memory));
     } else {
-        RUN_INFINI(infinirtFree(memory));
+        if (memory_pool) {
+            memory_pool->release(memory);
+        } else {
+            RUN_INFINI(infinirtFree(memory));
+        }
     }
 }
diff --git a/src/tensor/tensor.cpp b/src/tensor/tensor.cpp
index 8e15682c..9ff07022 100644
--- a/src/tensor/tensor.cpp
+++ b/src/tensor/tensor.cpp
@@ -68,7 +68,7 @@ std::shared_ptr<TensorDesc> Tensor::desc() const { return TensorDesc::create(thi
 
 std::shared_ptr<Tensor> Tensor::buffer(infiniDtype_t dtype,
                                        const std::vector<size_t> &shape,
-                                       infinirtStream_t stream) {
+                                       std::shared_ptr<MemoryPool> pool) {
     std::shared_ptr<Tensor> tensor = std::make_shared<Tensor>();
     tensor->_dtype = dtype;
     auto ndim = shape.size();
@@ -83,7 +83,7 @@ std::shared_ptr<Tensor> Tensor::buffer(infiniDtype_t dtype,
         }
     }
     tensor->_strides = strides;
-    tensor->_storage = Storage::createAsync(size, stream);
+    tensor->_storage = Storage::createFromPool(size, pool);
     tensor->_data = tensor->_storage->memory;
     infiniopCreateTensorDescriptor(&tensor->_desc, ndim, tensor->_shape.data(),
                                    strides.data(), dtype);