Skip to content

Feature request: wait_enqueue for BlockingReaderWriterQueue #112

@kcgen

Description

@kcgen

The BlockingReaderWriterQueue doesn't block the producer when the queue is full, which leads to unbounded queue growth.

Unbounded queue growth is almost universally undesirable because it leads to runaway usage of resources:

  • CPU: excessive generation-side CPU usage (producing content an unbounded rate)
  • Memory: excessive memory growth receive-side (due to unbounded reception without back-pressure)
  • Audio latency: the larger the buffer, the greater the latency between the source event and the play time, leading to unbounded latency.
  • Network latency: network buffer-bloat increases the latency between the requesting packet(s) and the response packet(s), as well as in A/V applications (like VOIP, Video Playback, etc).

And ideal system:

  • Consumes when the queue has at least one item and waits when it's empty
  • Produces when space exists in the queue and waits when it's full
  • The queue and its maximum size ties the two together, and sets the overall resource usage characteristics for the system.

Currently the BlockingReaderWriterQueue meets the first (throttling the consumer), but has no mechanism to throttle the producer, so currently some form of band-aid code is needed to deal with the second.

It would be fantastic if BlockingReaderWriterQueue has a wait_enqueue as an available function. Here's some sample code to illustrate the problem:

g++ test.cpp -pthread && ./a.out

#include "readerwriterqueue.h"
#include <thread>
#include <stdio.h>

using namespace moodycamel;

constexpr auto MAX_QUEUE_DEPTH = 8;
BlockingReaderWriterQueue<int, MAX_QUEUE_DEPTH> q(MAX_QUEUE_DEPTH);

int create_next_item()
{
	static int i = 1;
	std::this_thread::sleep_for(std::chrono::milliseconds(100));
	return i++;
}

void operate_on_item(const int &item)
{
	std::this_thread::sleep_for(std::chrono::seconds(1));
}

int main()
{
	printf("Running single-producer, single-consumer with a %d queue depth\n",
	       MAX_QUEUE_DEPTH);

	std::thread reader([&]() {
		int item;
		while (true) {
			q.wait_dequeue(item);
			operate_on_item(item);
			printf("Operated on%3d, %3lu items queued\n", item,
			       q.size_approx());
		}
	});

	std::thread writer([&]() {
		while (true) {
			const auto item = create_next_item();
			q.enqueue(item);
		}
	});

	writer.join();
	reader.join();

	assert(q.size_approx() == 0);

	return 0;
}

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions