A bounded multi-producer multi-consumer concurrent queue written in C++11.
It's battle-hardened and used daily in production:
- In the Frostbite game engine developed by Electronic Arts for the following games:
- In the low latency trading infrastructure at Charlesworth Research and Marquette Partners.
It's been cited by the following papers:
- Peizhao Ou and Brian Demsky. 2018. Towards understanding the costs of avoiding out-of-thin-air results. Proc. ACM Program. Lang. 2, OOPSLA, Article 136 (October 2018), 29 pages. DOI: https://doi.org/10.1145/3276506
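// Queue with room for 10 ints.
MPMCQueue<int> q(10);
// Two consumers, each blocking in pop() until an item arrives.
auto t1 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t1 " << v << "\n";
});
auto t2 = std::thread([&] {
  int v;
  q.pop(v);
  std::cout << "t2 " << v << "\n";
});
// Either consumer may receive either value.
q.push(1);
q.push(2);
t1.join();
t2.join();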
- MPMCQueue<T>(size_t capacity);
  Constructs a new MPMCQueue holding items of type T, with the capacity given by capacity.
- void emplace(Args &&... args);
  Enqueue an item using in-place construction. Blocks if the queue is full.
- bool try_emplace(Args &&... args);
  Try to enqueue an item using in-place construction. Returns true on success and false if the queue is full.
- void push(const T &v);
  Enqueue an item using copy construction. Blocks if the queue is full.
- template <typename P> void push(P &&v);
  Enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Blocks if the queue is full.
- bool try_push(const T &v);
  Try to enqueue an item using copy construction. Returns true on success and false if the queue is full.
- template <typename P> bool try_push(P &&v);
  Try to enqueue an item using move construction. Participates in overload resolution only if std::is_nothrow_constructible<T, P&&>::value == true. Returns true on success and false if the queue is full.
- void pop(T &v);
  Dequeue an item by copying or moving the item into v. Blocks if the queue is empty.
- bool try_pop(T &v);
  Try to dequeue an item by copying or moving the item into v. Returns true on success and false if the queue is empty.
- ssize_t size();
  Returns the number of elements in the queue.
  The size can be negative when the queue is empty and there is at least one reader waiting. Since this is a concurrent queue, the size is only a best-effort guess until all reader and writer threads have been joined.
- bool empty();
  Returns true if the queue is empty.
  Since this is a concurrent queue, this is only a best-effort guess until all reader and writer threads have been joined.
All operations except construction and destruction are thread safe.
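The negative size mentioned for size() is easier to see with a small model. The sketch below assumes the size is computed as the signed difference between a write-ticket counter (head) and a read-ticket counter (tail). Counters, approx_size and demo_negative_size are hypothetical names for illustration, not part of the MPMCQueue API.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for the queue's two ticket counters.
struct Counters {
  std::atomic<std::size_t> head{0}; // next write ticket
  std::atomic<std::size_t> tail{0}; // next read ticket
};

// Best-effort size: tickets handed to writers minus tickets handed to readers.
// The unsigned difference is converted to a signed type, so readers that are
// ahead of writers show up as a negative size (this conversion wraps on all
// mainstream compilers and is guaranteed to since C++20).
inline std::ptrdiff_t approx_size(const Counters &c) {
  return static_cast<std::ptrdiff_t>(c.head.load(std::memory_order_relaxed) -
                                     c.tail.load(std::memory_order_relaxed));
}

// A reader that blocks on an empty queue has already taken its ticket.
inline void demo_negative_size() {
  Counters c;
  c.tail.fetch_add(1); // reader takes a ticket and waits
  assert(approx_size(c) == -1);
  c.head.fetch_add(1); // writer takes the matching ticket
  assert(approx_size(c) == 0);
}
```

Because the two counters are read at different instants, the result is only a snapshot, which matches the best-effort wording above.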
Enqueue:
- Acquire next write ticket from head.
- Wait for our turn (2 * (ticket / capacity)) to write slot (ticket % capacity).
- Set turn = turn + 1 to inform the readers we are done writing.
Dequeue:
- Acquire next read ticket from tail.
- Wait for our turn (2 * (ticket / capacity) + 1) to read slot (ticket % capacity).
- Set turn = turn + 1 to inform the writers we are done reading.
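The enqueue and dequeue steps above can be sketched as follows. This is a simplified illustration, not the actual implementation: TicketQueue is a hypothetical name, T is assumed to be default-constructible and copy-assignable, waiting is a plain spin loop, and there is no cache-line padding or in-place construction.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <vector>

template <typename T> class TicketQueue {
public:
  explicit TicketQueue(std::size_t capacity)
      : capacity_(capacity), slots_(capacity), head_(0), tail_(0) {
    assert(capacity > 0);
  }

  void push(const T &v) {
    // Acquire the next write ticket from head.
    const std::size_t ticket = head_.fetch_add(1);
    Slot &slot = slots_[ticket % capacity_];
    // Even turns belong to writers: 2 * lap.
    const std::size_t turn = 2 * (ticket / capacity_);
    while (slot.turn.load(std::memory_order_acquire) != turn) {
      // spin until the previous lap's reader has released the slot
    }
    slot.value = v;
    // turn + 1 tells the reader of this lap that the value is ready.
    slot.turn.store(turn + 1, std::memory_order_release);
  }

  void pop(T &v) {
    // Acquire the next read ticket from tail.
    const std::size_t ticket = tail_.fetch_add(1);
    Slot &slot = slots_[ticket % capacity_];
    // Odd turns belong to readers: 2 * lap + 1.
    const std::size_t turn = 2 * (ticket / capacity_) + 1;
    while (slot.turn.load(std::memory_order_acquire) != turn) {
      // spin until this lap's writer has published the value
    }
    v = slot.value;
    // turn + 1 hands the slot to the writer of the next lap.
    slot.turn.store(turn + 1, std::memory_order_release);
  }

private:
  struct Slot {
    std::atomic<std::size_t> turn{0};
    T value{};
  };
  std::size_t capacity_;
  std::vector<Slot> slots_;
  std::atomic<std::size_t> head_;
  std::atomic<std::size_t> tail_;
};
```

With a capacity of 2, pushing 1 and 2, popping once, then pushing 3 reuses slot 0 on its second lap (writer turn 2), and the remaining pops return 2 and then 3. Calling pop on an empty queue from the only thread would spin forever, so single-threaded experiments should push before they pop.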
References:
- Daniel Orozco, Elkin Garcia, Rishi Khan, Kelly Livingston, and Guang R. Gao. 2012. Toward high-throughput algorithms on many-core architectures. ACM Trans. Archit. Code Optim. 8, 4, Article 49 (January 2012), 21 pages. DOI: https://doi.org/10.1145/2086696.2086728
- Dave Dice. 2014. PTLQueue : a scalable bounded-capacity MPMC queue.
- Oleksandr Otenko. US 8607249 B2: System and method for efficient concurrent queue implementation.
- Massimiliano Meneghin, Davide Pasetto, Hubertus Franke. 2012. Performance evaluation of inter-thread communication mechanisms on multicore/multithreaded architectures. DOI: https://doi.org/10.1145/2287076.2287098
- Paul E. McKenney. 2010. Memory Barriers: a Hardware View for Software Hackers.
- Dmitry Vyukov. 2014. Bounded MPMC queue.
Testing concurrency algorithms is hard. I'm using two approaches to test the implementation:
- A single-threaded test that checks the functionality works as intended, including that element constructors and destructors are invoked correctly.
- A multithreaded fuzz test that checks all elements are enqueued and dequeued correctly under heavy contention.
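As a rough idea of what the constructor and destructor check in a single-threaded test could look like, here is a sketch built around an instrumented element type. Tracked and check_lifetimes are hypothetical names, and std::deque is only a stand-in for the queue under test so that the example compiles on its own; this is not the project's actual test code.

```cpp
#include <cassert>
#include <deque>

// Instrumented element: counts constructions and destructions so a test can
// verify that every constructed element is destroyed exactly once.
struct Tracked {
  static int constructed;
  static int destroyed;
  int id;
  explicit Tracked(int i) : id(i) { ++constructed; }
  Tracked(const Tracked &other) : id(other.id) { ++constructed; }
  ~Tracked() { ++destroyed; }
  static int live() { return constructed - destroyed; }
};
int Tracked::constructed = 0;
int Tracked::destroyed = 0;

// std::deque plays the role of the queue under test in this sketch.
inline void check_lifetimes() {
  {
    std::deque<Tracked> q;
    q.emplace_back(1); // in-place construction, like emplace()
    q.emplace_back(2);
    assert(Tracked::live() == 2);
    Tracked out = q.front(); // copy the item out, like pop(v)
    q.pop_front();           // the queued copy is destroyed
    assert(out.id == 1);
    assert(Tracked::live() == 2); // one still queued, one copied out
  }
  // Destroying the container must destroy the element it still holds.
  assert(Tracked::live() == 0);
}
```

The same Tracked type can be reused with the real queue by replacing the std::deque calls with the corresponding emplace and pop operations.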
TODO:
- Add allocator support so that the queue can be used with huge pages and shared memory
- Add benchmarks and compare to boost::lockfree::queue and others
- Use C++20 concepts instead of static_assert if available
- Use std::hardware_destructive_interference_size if available
- Add API for zero-copy dequeue and batch dequeue operations
- Add [[nodiscard]] attributes
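For the std::hardware_destructive_interference_size item, one possible approach is to test the standard feature macro __cpp_lib_hardware_interference_size and fall back to a fixed value otherwise. The sketch below assumes a 64-byte fallback; kCacheLineSize, PaddedCounter and check_padding are hypothetical names used only for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>
#include <new> // std::hardware_destructive_interference_size (C++17)

// Use the standard constant when the library provides it.
#ifdef __cpp_lib_hardware_interference_size
constexpr std::size_t kCacheLineSize =
    std::hardware_destructive_interference_size;
#else
constexpr std::size_t kCacheLineSize = 64; // assumed fallback value
#endif

// Aligning each counter to its own cache line avoids false sharing between
// threads that update different counters.
struct alignas(kCacheLineSize) PaddedCounter {
  std::atomic<std::size_t> value{0};
};

static_assert(alignof(PaddedCounter) == kCacheLineSize,
              "counter is aligned to a cache line");
static_assert(sizeof(PaddedCounter) % kCacheLineSize == 0,
              "adjacent counters never share a cache line");

// Adjacent array elements end up at least one cache line apart.
inline void check_padding() {
  PaddedCounter pair[2];
  const char *first = reinterpret_cast<const char *>(&pair[0]);
  const char *second = reinterpret_cast<const char *>(&pair[1]);
  assert(static_cast<std::size_t>(second - first) >= kCacheLineSize);
}
```

Some compilers warn when the standard constant is used in a header, because its value can differ between compiler versions and target flags; a fixed fallback keeps the layout stable at the cost of possibly not matching the actual hardware.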
This project was created by Erik Rigtorp <erik@rigtorp.se>.