Skip to content

Commit

Permalink
channel: replace dequeue by vector
Browse files Browse the repository at this point in the history
  • Loading branch information
sebsura authored and BareosBot committed Sep 22, 2023
1 parent 5c34195 commit 1ccfc12
Showing 1 changed file with 23 additions and 14 deletions.
37 changes: 23 additions & 14 deletions core/src/lib/channel.h
Expand Up @@ -24,7 +24,7 @@

#include <condition_variable>
#include <optional>
#include <deque>
#include <vector>
#include <utility>
#include <variant>

Expand All @@ -44,7 +44,7 @@ struct channel_closed {};

template <typename T> class queue {
struct internal {
std::deque<T> data;
std::vector<T> data;
bool in_dead;
bool out_dead;
};
Expand Down Expand Up @@ -73,7 +73,7 @@ template <typename T> class queue {
{
}

std::deque<T>& data() { return locked->data; }
std::vector<T>& data() { return locked->data; }

~handle()
{
Expand Down Expand Up @@ -247,7 +247,8 @@ template <typename T> class input {

template <typename T> class output {
std::shared_ptr<queue<T>> shared;
std::deque<T> cache;
std::vector<T> cache{};
typename decltype(cache)::iterator cache_iter = cache.begin();
bool did_close{false};

public:
Expand All @@ -265,9 +266,8 @@ template <typename T> class output {
if (did_close) { return std::nullopt; }
update_cache();

if (cache.size() > 0) {
std::optional result = std::make_optional<T>(std::move(cache.front()));
cache.pop_front();
if (cache_iter != cache.end()) {
std::optional result = std::make_optional<T>(std::move(*cache_iter++));
return result;
} else {
return std::nullopt;
Expand All @@ -279,9 +279,8 @@ template <typename T> class output {
if (did_close) { return std::nullopt; }
try_update_cache();

if (cache.size() > 0) {
std::optional result = std::make_optional<T>(std::move(cache.front()));
cache.pop_front();
if (cache_iter != cache.end()) {
std::optional result = std::make_optional<T>(std::move(*cache_iter++));
return result;
} else {
return std::nullopt;
Expand All @@ -291,6 +290,8 @@ template <typename T> class output {
void close()
{
if (!did_close) {
cache.clear();
cache_iter = cache.begin();
shared->close_out();
did_close = true;
}
Expand All @@ -304,11 +305,18 @@ template <typename T> class output {
}

private:
void do_update_cache(std::vector<T>& data)
{
cache.clear();
std::swap(data, cache);
cache_iter = cache.begin();
}

void update_cache()
{
if (cache.empty()) {
if (cache_iter == cache.end()) {
if (auto handle = shared->output_lock()) {
std::swap(handle->data(), cache);
do_update_cache(handle->data());
} else {
// this can only happen if the channel was closed.
close();
Expand All @@ -318,14 +326,15 @@ template <typename T> class output {

void try_update_cache()
{
if (cache.empty()) {
if (cache_iter == cache.end()) {
auto result = shared->try_output_lock();
if (std::holds_alternative<failed_to_acquire_lock>(result)) {
// intentionally left empty
} else if (std::holds_alternative<channel_closed>(result)) {
close();
} else {
std::swap(std::get<typename queue<T>::handle>(result).data(), cache);
auto& handle = std::get<typename queue<T>::handle>(result);
do_update_cache(handle.data());
}
}
}
Expand Down

0 comments on commit 1ccfc12

Please sign in to comment.