39 changes: 31 additions & 8 deletions src/allocator.hpp
@@ -2,24 +2,47 @@
#define ALLOCATOR_HPP

#include "infinicore_infer.h"
#include <map>
#include <memory>
#include <set>
#include <unordered_map>
#include <vector>

class AllocatorBase {
public:
virtual void *alloc(size_t size) = 0;
virtual void release(void *ptr) = 0;
virtual ~AllocatorBase() = default;
};

class WorkspaceAllocator : public AllocatorBase {
private:
void *_memory;
size_t _total_size;
size_t _align;

class MemoryPool : public AllocatorBase {
public:
WorkspaceAllocator(size_t intial_size, size_t align = 256);
~WorkspaceAllocator();
MemoryPool(size_t initialSize = 0);
~MemoryPool();
void *alloc(size_t size) override;
void release(void *ptr) override;

private:
struct Block {
void *base;
void *ptr;
size_t size;
bool is_free;
Block(void *b, void *p, size_t s, bool f)
: base(b), ptr(p), size(s), is_free(f) {}
bool operator<(const Block &other) const {
return ptr < other.ptr;
}
};

void *allocateNewRegion(size_t size);
void insertFreeBlock(Block &&block);
void tryCoalesce(const Block &block);

std::vector<void *> _base_regions;
std::set<Block> _all_blocks;
std::multimap<size_t, std::set<Block>::iterator> _free_blocks;
std::unordered_map<void *, std::set<Block>::iterator> _ptr_to_block;
};

#endif
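
A minimal usage sketch of the MemoryPool interface declared above. This is hedged illustration, not code from the PR: the example function and sizes are made up, and it assumes the infinirt runtime and a device context are already initialized so infinirtMalloc can succeed inside the pool.

#include "allocator.hpp"

// Hypothetical example: exercises best-fit allocation, block splitting,
// and coalescing on release.
void memoryPoolSketch() {
    MemoryPool pool(128 * 1024 * 1024); // one backing region, same size jiuge.cpp uses below

    void *a = pool.alloc(1 << 20); // split off the initial free block
    void *b = pool.alloc(4 << 20);

    pool.release(a); // released blocks are coalesced with adjacent free neighbors
    pool.release(b);
} // ~MemoryPool() frees the underlying device regions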
110 changes: 110 additions & 0 deletions src/allocator/memory_allocator.cpp
@@ -0,0 +1,110 @@
#include "../allocator.hpp"
#include "../utils.hpp"
#include <algorithm>
#include <iostream>
#include <stdexcept>

MemoryPool::MemoryPool(size_t initialSize) {
allocateNewRegion(initialSize);
}

MemoryPool::~MemoryPool() {
for (void *region : _base_regions) {
RUN_INFINI(infinirtFree(region));
}
}

void *MemoryPool::alloc(size_t size) {
auto it = _free_blocks.lower_bound(size);
if (it == _free_blocks.end()) {
allocateNewRegion(std::max(size, size_t(0)));
it = _free_blocks.lower_bound(size);
if (it == _free_blocks.end()) {
throw std::bad_alloc();
}
}

auto block_it = it->second;
Block block = *block_it;
_free_blocks.erase(it);
_all_blocks.erase(block_it);

if (block.size > size + 256) {
// Split
void *alloc_ptr = block.ptr;
void *rem_ptr = static_cast<char *>(block.ptr) + size;
size_t rem_size = block.size - size;
Block alloc_block(block.base, alloc_ptr, size, false);
Block rem_block(block.base, rem_ptr, rem_size, true);
auto alloc_it = _all_blocks.insert(alloc_block).first;
auto rem_it = _all_blocks.insert(rem_block).first;
_free_blocks.emplace(rem_size, rem_it);
_ptr_to_block[alloc_ptr] = alloc_it;
return alloc_ptr;
} else {
// No split
block.is_free = false;
auto alloc_it = _all_blocks.insert(block).first;
_ptr_to_block[block.ptr] = alloc_it;
return block.ptr;
}
}

void MemoryPool::release(void *ptr) {
auto it = _ptr_to_block.find(ptr);
if (it == _ptr_to_block.end()) {
throw std::runtime_error("Invalid pointer to free");
}

auto block_it = it->second;
Block block = *block_it;
_all_blocks.erase(block_it);
block.is_free = true;
auto new_it = _all_blocks.insert(block).first;
_ptr_to_block.erase(ptr);
tryCoalesce(*new_it);
}

void *MemoryPool::allocateNewRegion(size_t size) {
void *ptr = nullptr;
RUN_INFINI(infinirtMalloc(&ptr, size));
_base_regions.push_back(ptr);
Block new_block(ptr, ptr, size, true);
auto it = _all_blocks.insert(new_block).first;
_free_blocks.emplace(size, it);
return ptr;
}

void MemoryPool::tryCoalesce(const Block &block) {
auto it = _all_blocks.find(block);
if (it == _all_blocks.end()) {
return;
}

Block merged = *it;
auto next = std::next(it);
auto prev = (it == _all_blocks.begin()) ? _all_blocks.end() : std::prev(it);

_all_blocks.erase(it);
_free_blocks.erase(merged.size);

// Coalesce with next
if (next != _all_blocks.end() && next->is_free && static_cast<char *>(merged.ptr) + merged.size == next->ptr) {
_free_blocks.erase(next->size);
merged.size += next->size;
_all_blocks.erase(next);
}

// Coalesce with prev
if (prev != _all_blocks.end() && prev->is_free && static_cast<char *>(prev->ptr) + prev->size == merged.ptr) {
_free_blocks.erase(prev->size);
merged.ptr = prev->ptr;
merged.size += prev->size;
merged.base = prev->base;
_all_blocks.erase(prev);
}

merged.is_free = true;
auto new_it = _all_blocks.insert(merged).first;
_free_blocks.emplace(merged.size, new_it);
}
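
The heart of alloc() above is a best-fit lookup: _free_blocks is a multimap keyed by free-block size, so lower_bound(size) returns the smallest free block that can satisfy the request, and a new region is allocated only when nothing fits. A standalone sketch of that idea with illustrative names (none of this is code from the PR):

#include <cstddef>
#include <map>

// Free blocks indexed by size; the int stands in for the
// std::set<Block>::iterator the real pool stores.
std::multimap<size_t, int> free_by_size = {{256, 0}, {1024, 1}, {4096, 2}};

// Best fit: smallest entry whose key is >= size, or -1 if nothing fits.
int pickBestFit(size_t size) {
    auto it = free_by_size.lower_bound(size);
    if (it == free_by_size.end()) {
        return -1; // the real pool calls allocateNewRegion() here
    }
    int id = it->second;
    free_by_size.erase(it); // erase by iterator removes exactly this entry
    return id;
}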
45 changes: 0 additions & 45 deletions src/allocator/workspace_allocator.cpp

This file was deleted.

31 changes: 16 additions & 15 deletions src/models/jiuge/jiuge.cpp
@@ -41,6 +41,8 @@ void createDeviceResource(DeviceResource *rsrc, const JiugeMeta *meta,
getFFNDown(meta, weights, layer, idev, ndev));
}

auto memory_pool = std::make_shared<MemoryPool>(128 * 1024 * 1024);

*rsrc = DeviceResource{
device,
dev_id,
@@ -59,7 +61,7 @@ void createDeviceResource(DeviceResource *rsrc, const JiugeMeta *meta,
w_ffn_down,
stream,
comm,
std::make_unique<WorkspaceAllocator>(0),
memory_pool,
};
RUN_INFINI(infinirtDeviceSynchronize());
}
@@ -100,7 +102,6 @@ void releaseDeviceResource(DeviceResource &res) {
t.reset();
}
res.w_ffn_down.clear();
res.workspace_allocator.reset();
infiniopDestroyHandle(res.handle);
res.handle = nullptr;
infinirtStreamDestroy(res.stream);
@@ -130,13 +131,13 @@ void inferDeviceBatch(const JiugeMeta &meta, DeviceResource &rsrc,
bool has_qkv_bias = rsrc.b_attn_qkv.size() > 0;

// Allocate buffers
auto logits_in = Tensor::buffer(dt_logits, {ntok, d}, stream);
auto logits_out = Tensor::buffer(dt_logits, {ntok, d}, stream);
auto qkv_buf = Tensor::buffer(dt_logits, {ntok, (nh + nkvh * 2) * dh}, stream);
auto gate_up_buf = Tensor::buffer(dt_logits, {ntok, 2 * di}, stream);
auto o_buf = Tensor::buffer(dt_logits, {ntok, nh * dh}, stream);
auto prob_buf = Tensor::buffer(dt_logits, {nreq, dvoc}, stream);
auto result_buf = Tensor::buffer(INFINI_DTYPE_I64, {nreq}, stream);
auto logits_in = Tensor::buffer(dt_logits, {ntok, d}, rsrc.memory_pool);
auto logits_out = Tensor::buffer(dt_logits, {ntok, d}, rsrc.memory_pool);
auto qkv_buf = Tensor::buffer(dt_logits, {ntok, (nh + nkvh * 2) * dh}, rsrc.memory_pool);
auto gate_up_buf = Tensor::buffer(dt_logits, {ntok, 2 * di}, rsrc.memory_pool);
auto o_buf = Tensor::buffer(dt_logits, {ntok, nh * dh}, rsrc.memory_pool);
auto prob_buf = Tensor::buffer(dt_logits, {nreq, dvoc}, rsrc.memory_pool);
auto result_buf = Tensor::buffer(INFINI_DTYPE_I64, {nreq}, rsrc.memory_pool);
auto result_cpu = std::vector<int64_t>(nreq);

// Prepare inputs
@@ -153,7 +154,7 @@
if (rsrc.device == INFINI_DEVICE_CPU) {
pos_ids_buf = Tensor::weight(batch_pos_ids.data(), INFINI_DTYPE_U32, {ntok});
} else {
pos_ids_buf = Tensor::buffer(INFINI_DTYPE_U32, {ntok}, stream);
pos_ids_buf = Tensor::buffer(INFINI_DTYPE_U32, {ntok}, rsrc.memory_pool);
RUN_INFINI(infinirtMemcpyAsync(pos_ids_buf->data(), batch_pos_ids.data(), sizeof(uint32_t) * ntok,
INFINIRT_MEMCPY_H2D, stream));
}
@@ -164,7 +165,6 @@
}

// Prepare operators and workspace
void *workspace;
size_t workspace_size = 0, temp_size = 0;
// attn & mlp rmsnorm
infiniopRMSNormDescriptor_t desc_norm;
@@ -270,9 +270,9 @@ void inferDeviceBatch(const JiugeMeta &meta, DeviceResource &rsrc,

token_offset += seq_len;
}
auto qk_buf = Tensor::buffer(dt_logits, {nh, max_qk_size}, stream);
auto rearrange_q_buf = Tensor::buffer(dt_logits, {nkvh, ngroup * max_seq_len, dh}, stream);
auto attn_val_buf = Tensor::buffer(dt_logits, {nh, max_seq_len, dh}, stream);
auto qk_buf = Tensor::buffer(dt_logits, {nh, max_qk_size}, rsrc.memory_pool);
auto rearrange_q_buf = Tensor::buffer(dt_logits, {nkvh, ngroup * max_seq_len, dh}, rsrc.memory_pool);
auto attn_val_buf = Tensor::buffer(dt_logits, {nh, max_seq_len, dh}, rsrc.memory_pool);

// MLP descriptors
infiniopGemmDescriptor_t desc_ffn_gate_up, desc_ffn_down;
@@ -317,7 +317,8 @@ void inferDeviceBatch(const JiugeMeta &meta, DeviceResource &rsrc,
RUN_INFINI(infiniopGetRandomSampleWorkspaceSize(desc_sample, &temp_size));
workspace_size = std::max(workspace_size, temp_size);
// Allocate workspace
workspace = rsrc.workspace_allocator->alloc(workspace_size);
std::shared_ptr<Storage> workspace_storage = Storage::createFromPool(workspace_size, rsrc.memory_pool);
void *workspace = workspace_storage->memory;

// Compute
for (uint32_t layer = 0; layer < nlayer; layer++) {
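
The recurring change in this file is that scratch memory now comes from the per-device pool: Tensor::buffer takes the pool instead of a stream, and the shared operator workspace lives in a pooled Storage whose lifetime ends with its shared_ptr. A condensed, hypothetical sketch of that pattern follows; the dtype, shape, and workspace size are placeholders rather than the values computed in inferDeviceBatch, and, like the real code, it assumes an infinirt device context is current.

#include "tensor.hpp" // path relative to src/

void pooledBuffersSketch() {
    // Per-device pool, created once in createDeviceResource (128 MiB in this PR).
    auto memory_pool = std::make_shared<MemoryPool>(128 * 1024 * 1024);

    // Scratch tensors draw from the pool instead of stream-ordered allocations.
    auto result_buf = Tensor::buffer(INFINI_DTYPE_I64, {16}, memory_pool);

    // One pooled Storage backs the shared operator workspace; its block is
    // returned to the pool when workspace_storage goes out of scope.
    size_t workspace_size = 1 << 20; // placeholder
    auto workspace_storage = Storage::createFromPool(workspace_size, memory_pool);
    void *workspace = workspace_storage->memory;

    (void)result_buf;
    (void)workspace;
}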
2 changes: 1 addition & 1 deletion src/models/jiuge/jiuge_impl.hpp
@@ -27,7 +27,7 @@ struct DeviceResource {
// Communicator
infinicclComm_t comm;

std::unique_ptr<WorkspaceAllocator> workspace_allocator;
std::shared_ptr<MemoryPool> memory_pool;
};

struct InferState {
8 changes: 6 additions & 2 deletions src/tensor.hpp
@@ -1,20 +1,24 @@
#ifndef INFER_TENSOR_H
#define INFER_TENSOR_H

#include "allocator.hpp"
#include "infinicore_infer.h"
#include "utils.hpp"
#include <memory>
#include <string>
#include <vector>

struct Storage {
class Storage {
public:
void *memory;
size_t size;
infiniDevice_t device_type;
int device_id;
std::shared_ptr<MemoryPool> memory_pool;

static std::shared_ptr<Storage> create(size_t size);
static std::shared_ptr<Storage> createAsync(size_t size, infinirtStream_t stream = nullptr);
static std::shared_ptr<Storage> createFromPool(size_t size, std::shared_ptr<MemoryPool> pool = nullptr);
static std::shared_ptr<Storage> createHost(size_t size);
~Storage();
};
@@ -68,7 +72,7 @@ class Tensor : public std::enable_shared_from_this<Tensor> {
public:
static std::shared_ptr<Tensor> buffer(infiniDtype_t dtype,
const std::vector<size_t> &shape,
infinirtStream_t stream = nullptr);
std::shared_ptr<MemoryPool> pool = nullptr);
static std::shared_ptr<Tensor> weight(void *host_data,
infiniDtype_t dtype,
const std::vector<size_t> &shape);
21 changes: 20 additions & 1 deletion src/tensor/strorage.cpp
@@ -1,3 +1,4 @@
#include "../allocator.hpp"
#include "../tensor.hpp"

std::shared_ptr<Storage> Storage::create(size_t size) {
@@ -16,19 +17,37 @@ std::shared_ptr<Storage> Storage::createAsync(size_t size, infinirtStream_t stream
return storage;
}

std::shared_ptr<Storage> Storage::createFromPool(size_t size, std::shared_ptr<MemoryPool> pool) {
auto storage = std::make_shared<Storage>();
storage->memory_pool = pool;
if (pool) {
storage->memory = pool->alloc(size);
} else {
RUN_INFINI(infinirtMalloc(&storage->memory, size));
}
storage->size = size;
RUN_INFINI(infinirtGetDevice(&storage->device_type, &storage->device_id));
return storage;
}

std::shared_ptr<Storage> Storage::createHost(size_t size) {
auto storage = std::make_shared<Storage>();
RUN_INFINI(infinirtMallocHost(&storage->memory, size));
storage->size = size;
storage->device_type = INFINI_DEVICE_CPU;
storage->device_id = 0;
storage->memory_pool = nullptr; // No pool for host memory
return storage;
}

Storage::~Storage() {
if (device_type == INFINI_DEVICE_CPU) {
RUN_INFINI(infinirtFreeHost(memory));
} else {
RUN_INFINI(infinirtFree(memory));
if (memory_pool) {
memory_pool->release(memory);
} else {
RUN_INFINI(infinirtFree(memory));
}
}
}
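
Storage::createFromPool and the destructor above route device memory through the pool when one is supplied and fall back to direct infinirt calls otherwise. A small hedged sketch of the two paths (the function name and sizes are invented; the Storage and MemoryPool APIs are the ones declared in this PR):

#include "../tensor.hpp"

void storagePathsSketch() {
    auto pool = std::make_shared<MemoryPool>(64 * 1024 * 1024);

    // Pooled path: memory comes from pool->alloc(), and ~Storage() returns it
    // with pool->release().
    auto pooled = Storage::createFromPool(1 << 20, pool);

    // No pool: falls back to infinirtMalloc, and ~Storage() calls infinirtFree.
    auto direct = Storage::createFromPool(1 << 20);
}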