Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Throw on allocate or acquire errors #29

Merged
merged 1 commit into from
Jun 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
82 changes: 49 additions & 33 deletions src/proteus/core/worker_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,43 +56,25 @@ void* findFunc(const std::string& func, const std::string& soPath) {
/* open the needed object */
void* handle = dlopen(soPath.c_str(), RTLD_LOCAL | RTLD_LAZY);
if (handle == nullptr) {
char* error_str = dlerror();
const char* error_str = dlerror();
throw std::invalid_argument(error_str);
}

/* find the address of function */
void* fptr = dlsym(handle, func.c_str());
if (fptr == nullptr) {
char* error_str = dlerror();
const char* error_str = dlerror();
throw std::invalid_argument(error_str);
}
return fptr;
}

WorkerInfo::WorkerInfo(const std::string& name, RequestParameters* parameters) {
this->input_buffer_ptr_ = std::make_unique<BufferPtrsQueue>();
this->output_buffer_ptr_ = std::make_unique<BufferPtrsQueue>();
this->buffer_num_ = 0;
this->max_buffer_num_ = 0;
this->batch_size_ = 1;

this->addAndStartWorker(name, parameters);
}

WorkerInfo::~WorkerInfo() {
for (auto const& worker : this->workers_) {
delete worker.second; // NOLINT(cppcoreguidelines-owning-memory)
}
}

void WorkerInfo::addAndStartWorker(const std::string& name,
RequestParameters* parameters) {
workers::Worker* getWorker(const std::string& name) {
// multiple workers with different configurations may exist. Remove the config
// tag that starts with "-" in the name prior to loading the .so
auto lib_name = name;
lib_name[0] = std::toupper(lib_name[0]);
auto hyphen_pos = name.find('-');
if (hyphen_pos != std::string::npos) {
lib_name[0] = static_cast<char>(std::toupper(lib_name[0]));
if (auto hyphen_pos = name.find('-'); hyphen_pos != std::string::npos) {
lib_name.erase(hyphen_pos);
}
std::string library =
Expand All @@ -103,6 +85,25 @@ void WorkerInfo::addAndStartWorker(const std::string& name,
// cast the void pointer from dlsym to a function pointer. This assumes that
// void* is same size as function pointer, which should be true on POSIX
auto* worker = reinterpret_cast<workers::Worker* (*)()>(funcPtr)();
return worker;
}

WorkerInfo::WorkerInfo(const std::string& name, RequestParameters* parameters) {
this->input_buffer_ptr_ = std::make_unique<BufferPtrsQueue>();
this->output_buffer_ptr_ = std::make_unique<BufferPtrsQueue>();

this->addAndStartWorker(name, parameters);
}

WorkerInfo::~WorkerInfo() {
for (const auto& [thread_id, worker] : workers_) {
delete worker; // NOLINT(cppcoreguidelines-owning-memory)
}
}

void WorkerInfo::addAndStartWorker(const std::string& name,
RequestParameters* parameters) {
auto worker = getWorker(name);
worker->init(parameters);
this->batch_size_ = worker->getBatchSize();

Expand All @@ -122,12 +123,27 @@ void WorkerInfo::addAndStartWorker(const std::string& name,
auto max_buffers = worker->getMaxBufferNum();
worker->setInputBuffers(this->input_buffer_ptr_.get());
worker->setOutputBuffers(this->output_buffer_ptr_.get());
auto buffer_num = worker->allocate(kNumBufferAuto);
this->buffer_num_ += buffer_num;
try {
auto buffer_num = worker->allocate(kNumBufferAuto);
this->buffer_num_ += buffer_num;
} catch (...) {
if (workers_.empty()) {
this->batchers_.clear();
}
throw;
}
this->max_buffer_num_ =
max_buffers == UINT_MAX ? UINT_MAX : this->max_buffer_num_ + max_buffers;

worker->acquire(parameters);
try {
worker->acquire(parameters);
} catch (...) {
if (workers_.empty()) {
this->batchers_.clear();
}
throw;
}

for (const auto& batcher : this->batchers_) {
if (batcher->getStatus() != BatcherStatus::kRun) {
batcher->start(this);
Expand All @@ -151,9 +167,9 @@ void WorkerInfo::join(std::thread::id id) {
}

void WorkerInfo::joinAll() {
for (auto& thread : this->worker_threads_) {
if (thread.second.joinable()) {
thread.second.join();
for (auto& [thread_id, thread] : worker_threads_) {
if (thread.joinable()) {
thread.join();
}
}
}
Expand Down Expand Up @@ -203,11 +219,11 @@ void WorkerInfo::unload() {
this->workers_.erase(id);
}

int WorkerInfo::getGroupSize() { return this->workers_.size(); }
size_t WorkerInfo::getGroupSize() const { return this->workers_.size(); }

void WorkerInfo::shutdown() {
auto workers = this->getGroupSize();
for (int i = 0; i < workers; i++) {
for (auto i = 0U; i < workers; i++) {
this->unload();
}
}
Expand All @@ -224,11 +240,11 @@ BufferPtrs WorkerInfo::getOutputBuffer() const {
return buffer;
}

void WorkerInfo::putInputBuffer(BufferPtrs buffer) {
void WorkerInfo::putInputBuffer(BufferPtrs buffer) const {
this->input_buffer_ptr_->enqueue(std::move(buffer));
}

void WorkerInfo::putOutputBuffer(BufferPtrs buffer) {
void WorkerInfo::putOutputBuffer(BufferPtrs buffer) const {
this->output_buffer_ptr_->enqueue(std::move(buffer));
}

Expand Down
12 changes: 6 additions & 6 deletions src/proteus/core/worker_info.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,13 @@ class WorkerInfo {
*
* @param buffer the buffer to return
*/
void putInputBuffer(BufferPtrs buffer);
void putInputBuffer(BufferPtrs buffer) const;
/**
* @brief Return an output buffer to the worker
*
* @param buffer the buffer to return
*/
void putOutputBuffer(BufferPtrs buffer);
void putOutputBuffer(BufferPtrs buffer) const;

/**
* @brief Checks if this worker group supports a particular number of input
Expand Down Expand Up @@ -141,7 +141,7 @@ class WorkerInfo {
void allocate(size_t request_size);

/// get the number of workers in the group
int getGroupSize();
size_t getGroupSize() const;

/// get the batch size of the worker group
[[nodiscard]] auto getBatchSize() const { return this->batch_size_; }
Expand All @@ -152,9 +152,9 @@ class WorkerInfo {
std::vector<std::unique_ptr<Batcher>> batchers_;
BufferPtrsQueuePtr input_buffer_ptr_;
BufferPtrsQueuePtr output_buffer_ptr_;
size_t buffer_num_;
size_t max_buffer_num_;
size_t batch_size_;
size_t buffer_num_ = 0;
size_t max_buffer_num_ = 0;
size_t batch_size_ = 1;

friend class Manager;
};
Expand Down
6 changes: 3 additions & 3 deletions tests/src/proteus/core/worker_info_buffers_finite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ void WorkerInfo::joinAll() {}

void WorkerInfo::unload() {}

int WorkerInfo::getGroupSize() { return 1; }
size_t WorkerInfo::getGroupSize() const { return 1; }

void WorkerInfo::shutdown() {}

Expand All @@ -82,11 +82,11 @@ BufferPtrs WorkerInfo::getOutputBuffer() const {
return buffer;
}

void WorkerInfo::putInputBuffer(BufferPtrs buffer) {
void WorkerInfo::putInputBuffer(BufferPtrs buffer) const {
this->input_buffer_ptr_->enqueue(std::move(buffer));
}

void WorkerInfo::putOutputBuffer(BufferPtrs buffer) {
void WorkerInfo::putOutputBuffer(BufferPtrs buffer) const {
this->output_buffer_ptr_->enqueue(std::move(buffer));
}

Expand Down
6 changes: 3 additions & 3 deletions tests/src/proteus/core/worker_info_buffers_infinite.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void WorkerInfo::joinAll() {}

void WorkerInfo::unload() {}

int WorkerInfo::getGroupSize() { return 1; }
size_t WorkerInfo::getGroupSize() const { return 1; }

void WorkerInfo::shutdown() {}

Expand All @@ -81,9 +81,9 @@ BufferPtrs WorkerInfo::getOutputBuffer() const {
return buffer;
}

void WorkerInfo::putInputBuffer(BufferPtrs buffer) { (void)buffer; }
void WorkerInfo::putInputBuffer(BufferPtrs buffer) const { (void)buffer; }

void WorkerInfo::putOutputBuffer(BufferPtrs buffer) { (void)buffer; }
void WorkerInfo::putOutputBuffer(BufferPtrs buffer) const { (void)buffer; }

bool WorkerInfo::inputSizeValid(size_t size) const {
(void)size;
Expand Down