/* pipe.c - The pipe implementation. This is the only file that must be linked
* to use the pipe.
*
* The MIT License
* Copyright (c) 2011 Clark Gaebel <cg.wowus.cg@gmail.com>
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
#include "pipe.h"
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
// Vanity bytes. As long as this isn't removed from the executable, I don't
// mind if I'm not credited in a README or any other documentation. Consider
// this your fulfillment of the MIT license.
const char _pipe_copyright[] =
__FILE__
" : Copyright (c) 2011 Clark Gaebel <cg.wowus.cg@gmail.com> (MIT License)";
#ifndef min
#define min(a, b) ((a) <= (b) ? (a) : (b))
#endif
#ifndef max
#define max(a, b) ((a) >= (b) ? (a) : (b))
#endif
#ifdef __GNUC__
#define likely(cond) __builtin_expect(!!(cond), 1)
#define unlikely(cond) __builtin_expect(!!(cond), 0)
#define CONSTEXPR __attribute__((const))
#define PURE __attribute__((pure))
#else
#define likely(cond) (cond)
#define unlikely(cond) (cond)
#define CONSTEXPR
#define PURE
#endif
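// assertume: an `assert' that doubles as an optimizer hint. Under NDEBUG,
// MSVC gets __assume (letting the optimizer exploit the condition); everyone
// else falls back to a plain assert, which is a no-op in release builds.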
#ifdef NDEBUG
#if defined(_MSC_VER)
#define assertume __assume
#else // _MSC_VER
#define assertume assert
#endif // _MSC_VER
#else // NDEBUG
#define assertume assert
#endif // NDEBUG
// The number of spins to do before performing an expensive kernel-mode context
// switch. This is a nice easy value to tweak for your application's needs. Set
// it to 0 if you want the implementation to decide, a low number if you are
// copying many objects into pipes at once (or a few large objects), and a high
// number if you are copying small or few objects into pipes at once.
#define MUTEX_SPINS 8192
// Standard threading stuff. This lets us support simple synchronization
// primitives on multiple platforms painlessly.
#if defined(_WIN32) || defined(_WIN64) // use the native win32 API on windows
#include <windows.h>
// On vista+, we have native condition variables and fast locks. Yay.
#if defined(_WIN32_WINNT) && _WIN32_WINNT >= 0x0600
#define mutex_t SRWLOCK
#define mutex_init InitializeSRWLock
#define mutex_lock AcquireSRWLockExclusive
#define mutex_unlock ReleaseSRWLockExclusive
#define mutex_destroy(m)
#define cond_t CONDITION_VARIABLE
#define cond_init InitializeConditionVariable
#define cond_signal WakeConditionVariable
#define cond_broadcast WakeAllConditionVariable
#define cond_wait(c, m) SleepConditionVariableSRW((c), (m), INFINITE, 0)
#define cond_destroy(c)
// Oh god. Microsoft has slow locks and lacks native condition variables on
// anything lower than Vista. Looks like we're rolling our own today.
#else /* vista+ */
#define mutex_t CRITICAL_SECTION
#define mutex_init(m) InitializeCriticalSectionAndSpinCount((m), MUTEX_SPINS)
#define mutex_lock EnterCriticalSection
#define mutex_unlock LeaveCriticalSection
#define mutex_destroy DeleteCriticalSection
// This condition variable implementation is stolen from:
// http://www.cs.wustl.edu/~schmidt/win32-cv-1.html (section 3.3)
typedef struct cond_t {
// Count of the number of waiters, with a critical section to serialize
// accesses to it.
int waiters_count;
CRITICAL_SECTION waiters_count_lock;
// Number of threads to release via a cond_broadcast or a cond_signal.
int release_count;
// Keeps track of the current "generation" so that a single thread can't
// steal all the resources from a broadcast.
int wait_generation_count;
// A manual-reset event that's used to block and release waiting threads.
HANDLE event;
} cond_t;
static void cond_init(cond_t* c)
{
c->waiters_count = 0;
InitializeCriticalSection(&c->waiters_count_lock);
c->release_count = 0;
c->wait_generation_count = 0;
c->event = CreateEvent(NULL, true, false, NULL);
}
static void cond_signal(cond_t* c)
{
EnterCriticalSection(&c->waiters_count_lock);
if(c->waiters_count > c->release_count)
{
SetEvent(c->event);
c->release_count++;
c->wait_generation_count++;
}
LeaveCriticalSection(&c->waiters_count_lock);
}
static void cond_broadcast(cond_t* c)
{
EnterCriticalSection(&c->waiters_count_lock);
if(c->waiters_count > 0)
{
SetEvent(c->event);
// Release all the threads in this generation.
c->release_count = c->waiters_count;
// Start a new generation.
c->wait_generation_count++;
}
LeaveCriticalSection(&c->waiters_count_lock);
}
static void cond_wait(cond_t* c, mutex_t* m)
{
EnterCriticalSection(&c->waiters_count_lock);
c->waiters_count++;
int my_generation = c->wait_generation_count;
LeaveCriticalSection(&c->waiters_count_lock);
mutex_unlock(m);
bool wait_done;
do
{
WaitForSingleObject(c->event, INFINITE);
EnterCriticalSection(&c->waiters_count_lock);
int release_count = c->release_count;
int wait_generation_count = c->wait_generation_count;
LeaveCriticalSection(&c->waiters_count_lock);
wait_done = release_count > 0
&& wait_generation_count != my_generation;
}
while(!wait_done);
mutex_lock(m);
EnterCriticalSection(&c->waiters_count_lock);
c->waiters_count--;
int release_count = --c->release_count;
LeaveCriticalSection(&c->waiters_count_lock);
if(release_count == 0) // we're the last waiter
ResetEvent(c->event);
}
static void cond_destroy(cond_t* c)
{
DeleteCriticalSection(&c->waiters_count_lock);
CloseHandle(c->event);
}
#endif /* vista+ */
// Fall back on pthreads if we haven't special-cased the current OS.
#else /* windows */
#include <pthread.h>
#define mutex_t pthread_mutex_t
#define cond_t pthread_cond_t
#define mutex_init(m) pthread_mutex_init((m), NULL)
#define mutex_lock pthread_mutex_lock
#define mutex_unlock pthread_mutex_unlock
#define mutex_destroy pthread_mutex_destroy
#define cond_init(c) pthread_cond_init((c), NULL)
#define cond_signal pthread_cond_signal
#define cond_broadcast pthread_cond_broadcast
#define cond_wait pthread_cond_wait
#define cond_destroy pthread_cond_destroy
#endif /* windows */
// End threading.
/*
* Pipe implementation overview
* =================================
*
* A pipe is implemented as a circular buffer. There are two special cases for
* this structure: nowrap and wrap.
*
* Nowrap:
*
* buffer begin end bufend
* [ >==================> ]
*
* In this case, the data storage is contiguous, allowing easy access. This is
* the simplest case.
*
* Wrap:
*
* buffer end begin bufend
* [============> >=====================]
*
* In this case, the data storage is split up, wrapping around to the beginning
* of the buffer when it hits bufend. Hackery must be done in this case to
* ensure the structure is maintained and data can be easily copied in/out.
*
* Data is 'push'ed after the end pointer and 'pop'ed from the begin pointer.
* There is always one sentinel element in the pipe, to distinguish between an
* empty pipe and a full pipe.
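*
* Concretely: an empty pipe has `end' exactly one element past `begin'
* (modulo wrapping), while a full pipe has `begin == end'. The sentinel is
* what keeps those two states distinguishable.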
*
* Invariants:
*
* The invariants of a pipe are documented in the check_invariants function,
* and double-checked frequently in debug builds. This helps restore sanity when
* making modifications, but may slow down calls. It's best to disable the
* checks in release builds.
*
* Thread-safety:
*
* pipe_t has been designed with high threading workloads foremost in my mind.
* Its initial purpose was to serve as a task queue, with multiple threads
* feeding data in (from disk, network, etc) and multiple threads reading it
* and processing it in parallel. This created the need for a fully re-entrant,
* lightweight, accommodating data structure.
*
* We have two locks guarding the pipe, instead of the naive solution of having
* one. One guards writes to the begin pointer, the other guards writes to the
* end pointer. This is due to the realization that when pushing, you don't need
* an up-to-date value for begin, and when popping you don't need an up-to-date
* value for end (since either can only move forward in the buffer). As long as
* neither moves backwards, there will be no conflicts when they move
* independently of each other. This optimization has improved benchmarks by
* 15-20%.
*
* Complexity:
*
* Pushing and popping must run in O(n) where n is the number of elements being
* inserted/removed. It must also run in O(1) with respect to the number of
* elements in the pipe.
*
* Efficiency:
*
* Asserts are used liberally, and many of them, when inlined, can be turned
* into no-ops. Therefore, it is recommended that you compile with -O1 in
* debug builds as the pipe can easily become a bottleneck.
*/
struct pipe_t {
size_t elem_size, // The size of each element. This is read-only and
// therefore does not need to be locked to read.
min_cap, // The smallest sane capacity before the buffer refuses
// to shrink because it would just end up growing again.
// To modify this variable, you must lock the whole pipe.
max_cap; // The maximum capacity of the pipe before push requests
// are blocked. To read or write to this variable, you
// must hold 'end_lock'.
char* buffer, // The internal buffer, holding the enqueued elements.
// To modify this variable, you must lock the whole pipe.
* bufend, // One past the end of the buffer, so that the actual
// elements are stored in the interval [buffer, bufend).
* begin, // Always points to the sentinel element. `begin + elem_size`
// points to the left-most element in the pipe.
// To modify this variable, you must lock begin_lock.
* end; // Always points past the right-most element in the pipe.
// To modify this variable, you must lock end_lock.
// The number of producers/consumers in the pipe.
size_t producer_refcount, // Guarded by begin_lock.
consumer_refcount; // Guarded by end_lock.
// Our lovely mutexes. To lock the pipe, call lock_pipe. Depending on what
// you modify, you may be able to get away with only locking one of them.
mutex_t begin_lock,
end_lock;
cond_t just_pushed, // Signaled immediately after a push.
just_popped; // Signaled immediately after a pop.
};
// Converts a pointer to either a producer or consumer into a suitable pipe_t*.
#define PIPIFY(handle) ((pipe_t*)(handle))
// We wrap elem_size in a function so we can annotate it with PURE, allowing
// the compiler's CSE to eliminate extraneous memory accesses.
static inline PURE size_t __pipe_elem_size(pipe_t* p)
{
return p->elem_size;
}
size_t pipe_elem_size(pipe_generic_t* p)
{
return __pipe_elem_size(PIPIFY(p));
}
// Represents a snapshot of a pipe. We often don't need all our values
// up-to-date (usually only one of begin or end). By passing this around, we
// avoid constantly wrecking our cache by accessing the real pipe_t.
typedef struct {
char* buffer,
* bufend,
* begin,
* end;
size_t elem_size;
} snapshot_t;
static inline snapshot_t make_snapshot(pipe_t* p)
{
return (snapshot_t) {
.buffer = p->buffer,
.bufend = p->bufend,
.begin = p->begin,
.end = p->end,
.elem_size = __pipe_elem_size(p),
};
}
// The initial minimum capacity of the pipe. This can be overridden dynamically
// with pipe_reserve.
#ifdef PIPE_DEBUG
#define DEFAULT_MINCAP 2
#else
#define DEFAULT_MINCAP 32
#endif
// Returns the maximum number of bytes the buffer can hold, excluding the
// sentinel element.
static inline size_t capacity(snapshot_t s)
{
return s.bufend - s.buffer - s.elem_size;
}
// Does the buffer wrap around?
// true -> wrap
// false -> nowrap
static inline bool wraps_around(snapshot_t s)
{
return unlikely(s.begin >= s.end);
}
// Returns the number of bytes currently in use in the buffer, excluding the
// sentinel element.
static inline size_t bytes_in_use(snapshot_t s)
{
return (wraps_around(s)
// v right half v v left half v
? ((s.end - s.buffer) + (s.bufend - s.begin))
: (s.end - s.begin))
// exclude the sentinel element.
- s.elem_size;
}
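// A worked example (hypothetical values): say elem_size = 4 with a 16-byte
// buffer, i.e. buffer = 0 and bufend = 16, so capacity(s) == 12.
//
//   Nowrap: begin = 0, end = 12 -> (12 - 0) - 4 == 8 bytes in use
//   Wrap:   begin = 8, end = 4  -> (4 - 0) + (16 - 8) - 4 == 8 bytes in use
//
// In both cases, the 4-byte sentinel at `begin' is excluded from the count.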
static inline char* wrap_ptr_if_necessary(char* buffer,
char* p,
char* bufend)
{
if(p >= bufend) {
size_t diff = p - bufend;
return buffer + diff;
} else {
return p;
}
}
static inline char* rev_wrap_ptr_if_necessary(char* buffer,
char* p,
char* bufend)
{
if(p < buffer) {
size_t diff = buffer - p;
return bufend - diff;
} else {
return p;
}
}
// Runs a memcpy, then returns the end of the range copied.
// Functionally identical to mempcpy, but portable.
static inline void* offset_memcpy(void* restrict dest,
const void* restrict src,
size_t n)
{
memcpy(dest, src, n);
return (char*)dest + n;
}
static size_t CONSTEXPR next_pow2(size_t n)
{
// Rounding up from zero makes no sense - it's a caller bug, and maybe a
// stack trace will help find it.
assertume(n != 0);
// In binary, top is equal to 10000...0: A 1 right-padded by as many zeros
// as needed to fill up a size_t.
size_t top = (~(size_t)0 >> 1) + 1;
// If when we round up we will overflow our size_t, avoid rounding up and
// exit early.
if(unlikely(n >= top))
return n;
// Since we don't have to worry about overflow anymore, we can just use
// the algorithm documented at:
// http://bits.stephan-brumme.com/roundUpToNextPowerOfTwo.html
// It's my favorite due to being branch-free (the loop will be unrolled),
// and portable. However, on x86, it will be faster to use the BSR (bit-scan
// reverse) instruction. Since this isn't straight C, it has been omitted,
// but may be best for your platform.
//
// clang 3.0 is smart enough to turn this code into a bsr. gcc 4.6 isn't. I
// haven't tested higher versions.
n--;
for(size_t shift = 1; shift < (sizeof n)*8; shift <<= 1)
n |= n >> shift;
n++;
return n;
}
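// A few sample values: next_pow2(1) == 1, next_pow2(3) == 4, and
// next_pow2(37) == 64. Anything with the top bit set skips the rounding
// entirely, thanks to the overflow check above.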
#define in_bounds(left, x, right) ((x) >= (left) && (x) <= (right))
// You know all those assumptions we make about our data structure whenever we
// use it? This function checks them, and is called liberally through the
// codebase. It would be best to read this function over, as it also acts as
// documentation. Code AND documentation? What is this witchcraft?
static inline void check_invariants(pipe_t* p)
{
if(p == NULL) return;
// p->buffer may be NULL. When it is, we must have no issued consumers.
// It's just a way to save memory when we've deallocated all consumers
// and people are still trying to push like idiots.
if(p->buffer == NULL)
{
assertume(p->consumer_refcount == 0);
return;
}
else
{
assertume(p->consumer_refcount != 0);
}
snapshot_t s = make_snapshot(p);
assertume(s.begin);
assertume(s.end);
assertume(s.bufend);
assertume(p->elem_size != 0);
assertume(bytes_in_use(s) <= capacity(s)
&& "There are more elements in the buffer than its capacity.");
assertume(in_bounds(s.buffer, s.begin, s.bufend));
assertume(in_bounds(s.buffer, s.end, s.bufend));
if(s.begin == s.end)
assertume(bytes_in_use(s) == capacity(s));
assertume(in_bounds(DEFAULT_MINCAP*p->elem_size, p->min_cap, p->max_cap));
assertume(in_bounds(p->min_cap, capacity(s) + p->elem_size, p->max_cap));
}
static inline void lock_pipe(pipe_t* p)
{
// watch the locking order VERY carefully. end_lock must ALWAYS be locked
// before begin_lock when dealing with both at once.
mutex_lock(&p->end_lock);
mutex_lock(&p->begin_lock);
check_invariants(p);
}
static inline void unlock_pipe(pipe_t* p)
{
check_invariants(p);
mutex_unlock(&p->begin_lock);
mutex_unlock(&p->end_lock);
}
// Runs some code while automatically locking and unlocking the pipe. If `break'
// is used, the pipe will be unlocked before control returns from the macro.
#define WHILE_LOCKED(stuff) do { \
lock_pipe(p); \
do { stuff; } while(0); \
unlock_pipe(p); \
} while(0)
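// A minimal usage sketch (`do_work' is a hypothetical stand-in); pipe_reserve
// at the bottom of this file is the real in-tree user:
//
//   WHILE_LOCKED(
//       if(p->buffer == NULL)
//           break;       // the pipe is still unlocked on the way out
//       do_work(p);
//   );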
pipe_t* pipe_new(size_t elem_size, size_t original_limit)
{
assertume(elem_size != 0);
if(elem_size == 0)
return NULL;
pipe_t* p = malloc(sizeof *p);
assert(DEFAULT_MINCAP >= 1);
size_t cap = DEFAULT_MINCAP * elem_size;
char* buf = malloc(cap);
// Change the limit from being in "elements" to being in "bytes", and make
// room for the sentinel element.
size_t limit = (original_limit + 1) * elem_size;
if(unlikely(p == NULL || buf == NULL))
return free(p), free(buf), NULL;
*p = (pipe_t) {
.elem_size = elem_size,
.min_cap = cap,
.max_cap = original_limit ? next_pow2(max(limit, cap)) : ~(size_t)0,
.buffer = buf,
.bufend = buf + cap,
.begin = buf,
.end = buf + elem_size,
// Since we're issuing a pipe_t, it counts as both a producer and a
// consumer since it can issue new instances of both. Therefore, the
// refcounts both start at 1; not the intuitive 0.
.producer_refcount = 1,
.consumer_refcount = 1,
};
mutex_init(&p->begin_lock);
mutex_init(&p->end_lock);
cond_init(&p->just_pushed);
cond_init(&p->just_popped);
check_invariants(p);
return p;
}
// Instead of allocating a special handle, the pipe_*_new() functions just
// return the original pipe, cast into a user-friendly form. This saves needless
// malloc calls. Also, since we have to refcount anyways, it's free.
pipe_producer_t* pipe_producer_new(pipe_t* p)
{
mutex_lock(&p->begin_lock);
p->producer_refcount++;
mutex_unlock(&p->begin_lock);
return (pipe_producer_t*)p;
}
pipe_consumer_t* pipe_consumer_new(pipe_t* p)
{
mutex_lock(&p->end_lock);
p->consumer_refcount++;
mutex_unlock(&p->end_lock);
return (pipe_consumer_t*)p;
}
static void deallocate(pipe_t* p)
{
assertume(p->producer_refcount == 0);
assertume(p->consumer_refcount == 0);
mutex_destroy(&p->begin_lock);
mutex_destroy(&p->end_lock);
cond_destroy(&p->just_pushed);
cond_destroy(&p->just_popped);
free(p->buffer);
free(p);
}
void pipe_free(pipe_t* p)
{
size_t new_producer_refcount,
new_consumer_refcount;
mutex_lock(&p->begin_lock);
assertume(p->producer_refcount > 0);
new_producer_refcount = --p->producer_refcount;
mutex_unlock(&p->begin_lock);
mutex_lock(&p->end_lock);
assertume(p->consumer_refcount > 0);
new_consumer_refcount = --p->consumer_refcount;
mutex_unlock(&p->end_lock);
if(unlikely(new_consumer_refcount == 0))
{
p->buffer = (free(p->buffer), NULL);
if(likely(new_producer_refcount > 0))
cond_broadcast(&p->just_popped);
else
deallocate(p);
}
else if(unlikely(new_producer_refcount == 0))
cond_broadcast(&p->just_pushed);
}
void pipe_producer_free(pipe_producer_t* handle)
{
pipe_t* p = PIPIFY(handle);
size_t new_producer_refcount;
mutex_lock(&p->begin_lock);
assertume(p->producer_refcount > 0);
new_producer_refcount = --p->producer_refcount;
mutex_unlock(&p->begin_lock);
if(unlikely(new_producer_refcount == 0))
{
size_t consumer_refcount;
mutex_lock(&p->end_lock);
consumer_refcount = p->consumer_refcount;
mutex_unlock(&p->end_lock);
// If there are still consumers, wake them up if they're waiting on
// input from a producer. Otherwise, since we're the last handle
// altogether, we can free the pipe.
if(likely(consumer_refcount > 0))
cond_broadcast(&p->just_pushed);
else
deallocate(p);
}
}
void pipe_consumer_free(pipe_consumer_t* handle)
{
pipe_t* p = PIPIFY(handle);
size_t new_consumer_refcount;
mutex_lock(&p->end_lock);
assertume(p->consumer_refcount > 0);
new_consumer_refcount = --p->consumer_refcount;
mutex_unlock(&p->end_lock);
if(unlikely(new_consumer_refcount == 0))
{
size_t producer_refcount;
mutex_lock(&p->begin_lock);
producer_refcount = p->producer_refcount;
mutex_unlock(&p->begin_lock);
// If there are still producers, wake them up if they're waiting on
// room to free up from a consumer. Otherwise, since we're the last
// handle altogether, we can free the pipe.
if(likely(producer_refcount > 0))
cond_broadcast(&p->just_popped);
else
deallocate(p);
}
}
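// A minimal end-to-end usage sketch, kept in a comment (like the debugging
// helper below) so it isn't compiled into the library. It only touches
// functions declared in pipe.h; the element type and counts are arbitrary.
/*
    pipe_t* pipe = pipe_new(sizeof(int), 0); // limit of 0 = unbounded

    pipe_producer_t* prod = pipe_producer_new(pipe);
    pipe_consumer_t* cons = pipe_consumer_new(pipe);

    // The pipe_t itself holds a producer and a consumer refcount, so it can
    // be freed as soon as all the handles have been issued; the handles keep
    // the underlying pipe alive.
    pipe_free(pipe);

    int in[] = { 1, 2, 3, 4 };
    pipe_push(prod, in, 4);              // `count' is in elements, not bytes

    int out[4];
    size_t got = pipe_pop(cons, out, 4); // blocks for 4 ints (or until all
                                         // the producers are freed)

    pipe_producer_free(prod);            // last producer: consumers see EOF
    pipe_consumer_free(cons);            // last handle: the pipe deallocates
*/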
// Returns the end of the copied region (buf + number_of_bytes_copied).
static inline char* copy_pipe_into_new_buf(snapshot_t s,
char* restrict buf)
{
if(wraps_around(s))
{
buf = offset_memcpy(buf, s.begin, s.bufend - s.begin);
buf = offset_memcpy(buf, s.buffer, s.end - s.buffer);
}
else
{
buf = offset_memcpy(buf, s.begin, s.end - s.begin);
}
return buf;
}
// Resizes the buffer to make room for at least `new_size' bytes (not
// elements), returning an updated snapshot of the pipe state.
//
// The new size MUST be at least the number of bytes currently in use in the
// pipe.
//
// The pipe must be fully locked on entrance to this function.
static snapshot_t resize_buffer(pipe_t* p, size_t new_size)
{
check_invariants(p);
const size_t max_cap = p->max_cap,
min_cap = p->min_cap,
elem_size = __pipe_elem_size(p);
assertume(new_size >= bytes_in_use(make_snapshot(p)));
if(unlikely(new_size >= max_cap))
new_size = max_cap;
if(new_size <= min_cap)
return make_snapshot(p);
char* new_buf = malloc(new_size + elem_size);
p->end = copy_pipe_into_new_buf(make_snapshot(p), new_buf);
p->begin =
p->buffer = (free(p->buffer), new_buf);
p->bufend = new_buf + new_size + elem_size;
check_invariants(p);
return make_snapshot(p);
}
static inline snapshot_t validate_size(pipe_t* p,
snapshot_t s,
size_t new_bytes)
{
size_t elem_size = __pipe_elem_size(p),
cap = capacity(s),
bytes_needed = bytes_in_use(s) + new_bytes;
if(unlikely(bytes_needed > cap))
{
// upgrade our lock, then re-check. By taking both locks (end and begin)
// in order, we have an equivalent operation to lock_pipe().
{ mutex_lock(&p->begin_lock);
s = make_snapshot(p);
bytes_needed = bytes_in_use(s) + new_bytes;
size_t elems_needed = bytes_needed / elem_size;
if(likely(bytes_needed > cap))
s = resize_buffer(p, next_pow2(elems_needed+1)*elem_size);
}
// Downgrade back to holding only end_lock.
mutex_unlock(&p->begin_lock);
}
return s;
}
// Runs the actual push, assuming there is enough room in the buffer.
//
// Returns the new 'end' pointer.
static inline char* process_push(snapshot_t s,
const void* restrict elems,
size_t bytes_to_copy
)
{
assertume(bytes_to_copy != 0);
// This shouldn't be necessary.
//s.end = wrap_ptr_if_necessary(s.buffer, s.end, s.bufend);
assertume(s.end != s.bufend);
// If we currently have a nowrap buffer, we may have to wrap the new
// elements. Copy as many as we can at the end, then start copying into the
// beginning. This basically reduces the problem to only deal with wrapped
// buffers, which can be dealt with using a single offset_memcpy.
if(!wraps_around(s))
{
size_t at_end = min(bytes_to_copy, (size_t)(s.bufend - s.end));
s.end = offset_memcpy(s.end, elems, at_end);
elems = (const char*)elems + at_end;
bytes_to_copy -= at_end;
}
// Now copy any remaining data...
if(unlikely(bytes_to_copy))
{
s.end = wrap_ptr_if_necessary(s.buffer, s.end, s.bufend);
s.end = offset_memcpy(s.end, elems, bytes_to_copy);
}
s.end = wrap_ptr_if_necessary(s.buffer, s.end, s.bufend);
// ...and update the end pointer!
return s.end;
}
// Blocks until there is room in the buffer to push at least one element, or
// until all the consumers are gone. Returns a fresh snapshot of the pipe and
// writes the current maximum capacity into `max_cap'. `end_lock` must be
// locked on entrance to this function.
static inline snapshot_t wait_for_room(pipe_t* p, size_t* max_cap)
{
snapshot_t s = make_snapshot(p);
size_t bytes_used = bytes_in_use(s);
size_t consumer_refcount = p->consumer_refcount;
*max_cap = p->max_cap;
for(; unlikely(bytes_used == *max_cap) && likely(consumer_refcount > 0);
s = make_snapshot(p),
bytes_used = bytes_in_use(s),
consumer_refcount = p->consumer_refcount,
*max_cap = p->max_cap)
cond_wait(&p->just_popped, &p->end_lock);
return s;
}
void __pipe_push(pipe_t* p,
const void* restrict elems,
size_t count)
{
size_t elem_size = __pipe_elem_size(p);
if(unlikely(count == 0))
return;
size_t pushed = 0;
{ mutex_lock(&p->end_lock);
size_t max_cap;
snapshot_t s = wait_for_room(p, &max_cap);
// if no more consumers...
if(unlikely(p->consumer_refcount == 0))
{
mutex_unlock(&p->end_lock);
return;
}
s = validate_size(p, s, count);
// Finally, we can now begin with pushing as many elements into the
// queue as possible.
p->end = process_push(s, elems,
pushed = min(count, max_cap - bytes_in_use(s)));
} mutex_unlock(&p->end_lock);
assertume(pushed > 0);
// Signal if we've only pushed one element, broadcast if we've pushed more.
if(unlikely(pushed == elem_size))
cond_signal(&p->just_pushed);
else
cond_broadcast(&p->just_pushed);
// We might not be done pushing. If the max_cap was reached, we'll need to
// recurse.
size_t bytes_remaining = count - pushed;
if(unlikely(bytes_remaining))
__pipe_push(p, (const char*)elems + pushed, bytes_remaining);
}
void pipe_push(pipe_producer_t* p, const void* restrict elems, size_t count)
{
pipe_t* p0 = PIPIFY(p);
count *= __pipe_elem_size(p0);
__pipe_push(p0, elems, count);
}
/*
#ifdef PIPE_DEBUG
// For testing/debugging only, and is only available in debug mode. Assuming a
// pipe of ints, prints them out. This function is not included or documented
// in the header file!
#include <stdio.h>
void pipe_debug(pipe_t* p, const char* id)
{
printf("%s: [ ", id);
for(int* ptr = (int*)p->buffer; ptr != (int*)p->bufend; ++ptr)
printf("%i ", *ptr);
printf("]\n");
printf("begin: %lu end: %lu\n", p->begin - p->buffer, p->end - p->buffer);
}
#endif
*/
// Waits for at least one element to be in the pipe. p->begin_lock must be
// locked when entering this function, and a new, valid snapshot is returned.
static inline snapshot_t wait_for_elements(pipe_t* p)
{
snapshot_t s = make_snapshot(p);
size_t bytes_used = bytes_in_use(s);
for(; unlikely(bytes_used == 0) && likely(p->producer_refcount > 0);
s = make_snapshot(p),
bytes_used = bytes_in_use(s))
cond_wait(&p->just_pushed, &p->begin_lock);
return s;
}
// wow, I didn't even intend for the name to work like that...
// returns a new snapshot, with the updated changes also reflected onto the
// pipe.
static inline snapshot_t pop_without_locking(snapshot_t s,
void* restrict target,
size_t bytes_to_copy,
char** begin // [out]
)
{
assertume(s.begin != s.bufend);
size_t elem_size = s.elem_size;
// Copy either as many bytes as requested, or the available bytes in the RHS
// of a wrapped buffer - whichever is smaller.
{
size_t first_bytes_to_copy = min(bytes_to_copy, (size_t)(s.bufend - s.begin - elem_size));
target = offset_memcpy(target, s.begin + elem_size, first_bytes_to_copy);
bytes_to_copy -= first_bytes_to_copy;
s.begin += first_bytes_to_copy;
s.begin = wrap_ptr_if_necessary(s.buffer, s.begin, s.bufend);
}
if(unlikely(bytes_to_copy > 0))
{
s.begin += elem_size;
s.begin = wrap_ptr_if_necessary(s.buffer, s.begin, s.bufend);
memcpy(target, s.begin, bytes_to_copy);
s.begin += bytes_to_copy;
s.begin = wrap_ptr_if_necessary(s.buffer, s.begin, s.bufend);
s.begin -= elem_size;
s.begin = rev_wrap_ptr_if_necessary(s.buffer, s.begin, s.bufend);
}
// Since we cached begin on the stack, we need to reflect our changes back
// on the pipe.
*begin = s.begin;
return s;
}
// If the data in the buffer has shrunk to a small fraction of the current
// capacity, resize the buffer to something sane. This function must be entered
// with only p->begin_lock locked, and will automatically unlock p->begin_lock
// on exit.
static inline void trim_buffer(pipe_t* p, snapshot_t s)
{
size_t cap = capacity(s);
// We have a sane size. We're done here.
if(likely(bytes_in_use(s) > cap / 4))
{
mutex_unlock(&p->begin_lock);
return;
}
// Okay, we need to resize now. Upgrade our lock so we can check again. The
// weird lock/unlock order is to make sure we always acquire the end_lock
// before begin_lock. Deadlock can arise otherwise.
mutex_unlock(&p->begin_lock);
mutex_lock(&p->end_lock);
mutex_lock(&p->begin_lock);
s = make_snapshot(p);
cap = capacity(s);
// To conserve space like the good computizens we are, we'll shrink
// our buffer if our memory usage efficiency drops below 25%. However,
// since shrinking/growing the buffer is the most expensive part of a push
// or pop, we only shrink it to bring us up to a 50% efficiency. A common
// pipe usage pattern is sudden bursts of pushes and pops. This ensures it
// doesn't get too time-inefficient.
if(likely(bytes_in_use(s) <= cap / 4))
resize_buffer(p, cap / 2);
// All done. Unlock the pipe. The reason we don't let the calling function
// unlock begin_lock is so that we can do it BEFORE end_lock. This prevents
// the lock order from being fudged.
mutex_unlock(&p->begin_lock);
mutex_unlock(&p->end_lock);
}
// Performs the actual pop, except `requested' is now in bytes as opposed to
// elements.
//
// This will behave eagerly, copying into `target' as many bytes as are
// available right now (up to `requested') and returning that count.
static inline size_t __pipe_pop(pipe_t* p,
void* restrict target,
size_t requested)
{
if(unlikely(requested == 0))
return 0;
size_t popped = 0;
{ mutex_lock(&p->begin_lock);
snapshot_t s = wait_for_elements(p);
size_t bytes_used = bytes_in_use(s);
if(unlikely(bytes_used == 0))
{
mutex_unlock(&p->begin_lock);
return 0;
}
check_invariants(p);
s = pop_without_locking(s, target,
popped = min(requested, bytes_used),
&p->begin
);
check_invariants(p);
trim_buffer(p, s);
} // p->begin_lock was unlocked by trim_buffer.
assertume(popped);
if(unlikely(popped == __pipe_elem_size(p)))
cond_signal(&p->just_popped);
else
cond_broadcast(&p->just_popped);
return popped;
}
size_t pipe_pop(pipe_consumer_t* p, void* target, size_t count)
{
size_t elem_size = __pipe_elem_size(PIPIFY(p));
size_t bytes_left = count*elem_size;
size_t bytes_popped = 0;
size_t ret = -1;
do {
ret = __pipe_pop(PIPIFY(p), target, bytes_left);
target = (void*)((char*)target + ret);
bytes_popped += ret;
bytes_left -= ret;
} while(ret != 0 && bytes_left);
return bytes_popped / elem_size;
}
size_t pipe_pop_eager(pipe_consumer_t* p, void* target, size_t count)
{
size_t elem_size = __pipe_elem_size(PIPIFY(p));
return __pipe_pop(PIPIFY(p), target, count*elem_size) / elem_size;
}
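// A consumer-loop sketch (`handle_ints' is hypothetical): since pipe_pop only
// returns short once every producer has been freed and the pipe has drained,
// a short read doubles as the end-of-stream signal. Reach for pipe_pop_eager
// instead when latency matters more than full batches.
/*
    int chunk[64];
    size_t n;
    while((n = pipe_pop(cons, chunk, 64)) != 0)
        handle_ints(chunk, n);
*/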
void pipe_reserve(pipe_generic_t* gen, size_t count)
{
pipe_t* p = PIPIFY(gen);
count *= __pipe_elem_size(p); // now `count' is in "bytes" instead of "elements".
if(count == 0)
count = DEFAULT_MINCAP * __pipe_elem_size(p); // keep the default in bytes, too
size_t max_cap = p->max_cap;
WHILE_LOCKED(
if(unlikely(count <= bytes_in_use(make_snapshot(p))))
break;
p->min_cap = min(count, max_cap);
resize_buffer(p, count);
);
}
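// A reservation sketch: warm the pipe up front so early pushes don't pay for
// regrowth. The cast is a sketch-level assumption - use whatever conversion
// to pipe_generic_t* your copy of pipe.h provides.
/*
    pipe_reserve((pipe_generic_t*)prod, 1024); // room for 1024 elements
*/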
/* vim: set et ts=4 sw=4 softtabstop=4 textwidth=80: */