Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(userspace/libsinsp): elems in mpsc queue with same priority follow push order #1504

Merged
merged 1 commit into from
Nov 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
33 changes: 28 additions & 5 deletions userspace/libsinsp/mpsc_priority_queue.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ limitations under the License.
* @brief Concurrent priority queue optimized for multiple producer/single consumer
* (mpsc) use cases. This queue allows checking the top element against
* a provided predicate before popping. It is optimized for checking for
* emptyness before popping.
* emptyness before popping. The queue accepts only elements of pointer-type
* in the form of std::shared_ptr<T> or std::unique_ptr<T>. The priority queue
* bases its element ordering constraints on Cmp. Elements with equal priority
* follow the temporal order with which they have been pushed.
*/
template<typename Elm, typename Cmp, typename Mtx = std::mutex>
class mpsc_priority_queue
Expand Down Expand Up @@ -56,7 +59,7 @@ class mpsc_priority_queue
std::scoped_lock<Mtx> lk(m_mtx);
if (m_capacity == 0 || m_queue.size() < m_capacity)
{
m_queue.push(queue_elm{std::move(e)});
m_queue.push(queue_elm{std::move(e), m_elem_counter++});
m_queue_top = m_queue.top().elm.get();
return true;
}
Expand Down Expand Up @@ -155,16 +158,36 @@ class mpsc_priority_queue
private:
using elm_ptr = typename Elm::element_type*;

// workaround to make unique_ptr usable when copying the queue top
// which is const unique<ptr>& and denies moving
struct queue_elm
{
inline bool operator < (const queue_elm& r) const {return Cmp{}(*elm.get(), *r.elm.get());}
inline bool operator < (const queue_elm& r) const
{
// we check if this elem is less than the other. If the comparison
// gives the same result when inverting the operands, then we can
// assume them being equal.
Cmp c{};
auto res = c(*elm.get(), *r.elm.get());
if (res == c(*r.elm.get(), *elm.get()))
{
// if elements have the same priority, order them by
// temporal order of arrival in the queue by using an atomic
// logical clock (counter).
// note(jasondellaluce): this approach is vulnerable to integer overflow
// that would cause the second-level ordering guarantee to be broken,
// but given that we use a uint64_t counter we find this unlikely
return std::greater_equal<uint64_t>{}(num, r.num);
}
return res;
}
// using mutable is a workaround to make unique_ptr usable when copying
// the queue top(), which is returned a const unique<ptr>& and denies moving
mutable Elm elm;
uint64_t num;
};

const size_t m_capacity;
std::priority_queue<queue_elm> m_queue{};
std::atomic<elm_ptr> m_queue_top{nullptr};
std::atomic<uint64_t> m_elem_counter{0};
Mtx m_mtx;
};
55 changes: 52 additions & 3 deletions userspace/libsinsp/test/mpsc_priority_queue.ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,61 @@ limitations under the License.
#include <thread>
#include <chrono>

using val_t = std::unique_ptr<int>;
TEST(mpsc_priority_queue, order_consistency)
{
struct val
{
int v;
int order;
};

struct val_less
{
bool operator()(const val& l, const val& r)
{
return std::greater_equal<int>{}(l.v, r.v);
}
};

using val_t = std::unique_ptr<val>;

mpsc_priority_queue<val_t, val_less> q;
for (int i = 0; i < 100; i++)
{
for (int j = 0; j < 100; j++)
{
// j is used only for tracking the order in which elements
// are pushed for checking it later
q.push(val_t{new val{i,j}});
}
}

val_t cur{nullptr};
val_t prev{nullptr};
while (!q.empty())
{
ASSERT_TRUE(q.try_pop(cur));
if (prev != nullptr)
{
ASSERT_GE(cur->v, prev->v);
if (cur->v == prev->v)
{
ASSERT_GT(cur->order, prev->order);
}
}
prev = std::move(cur);
}

}

// note: emscripten does not support launching threads
#ifndef __EMSCRIPTEN__

TEST(mpsc_priority_queue, single_producer)
TEST(mpsc_priority_queue, single_concurrent_producer)
{
using val_t = std::unique_ptr<int>;
const int max_value = 1000;

mpsc_priority_queue<val_t, std::greater_equal<int>> q;

// single producer
Expand Down Expand Up @@ -69,10 +116,12 @@ TEST(mpsc_priority_queue, single_producer)
ASSERT_EQ(failed, 0);
}

TEST(mpsc_priority_queue, multi_producer)
TEST(mpsc_priority_queue, multi_concurrent_producers)
{
using val_t = std::unique_ptr<int>;
const int num_values = 1000;
const int num_producers = 10;

mpsc_priority_queue<val_t, std::greater_equal<int>> q;
std::atomic<int> counter{1};

Expand Down