Skip to content

Commit

Permalink
Add a nonblocking bounded buffer.
Browse files Browse the repository at this point in the history
  • Loading branch information
rescrv committed Mar 14, 2012
1 parent 9abe859 commit 419dcac
Show file tree
Hide file tree
Showing 5 changed files with 424 additions and 2 deletions.
4 changes: 3 additions & 1 deletion benchmarks/Makefile.am
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
AM_CPPFLAGS = -I$(abs_top_srcdir)/include $(PO6_CFLAGS)
noinst_PROGRAMS = lockfree_fifo lockfree_hash_map lockfree_hash_set locking_iterable_fifo
noinst_PROGRAMS = lockfree_fifo lockfree_hash_map lockfree_hash_set locking_iterable_fifo nonblocking_bounded_fifo

lockfree_fifo_SOURCES = lockfree_fifo.cc
lockfree_fifo_LDADD = -lpthread
Expand All @@ -9,3 +9,5 @@ lockfree_hash_set_SOURCES = lockfree_hash_set.cc
lockfree_hash_set_LDADD = -lpthread
locking_iterable_fifo_SOURCES = locking_iterable_fifo.cc
locking_iterable_fifo_LDADD = -lpthread -lrt
nonblocking_bounded_fifo_SOURCES = nonblocking_bounded_fifo.cc
nonblocking_bounded_fifo_LDADD = -lpthread
147 changes: 147 additions & 0 deletions benchmarks/nonblocking_bounded_fifo.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
// Copyright (c) 2012, Robert Escriva
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions
// are met:
//
// * Redistributions of source code must retain the above copyright
// notice, this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above
// copyright notice, this list of conditions and the following
// disclaimer in the documentation and/or other materials provided
// with the distribution.
// * Neither the name of this project nor the names of its
// contributors may be used to endorse or promote products derived
// from this software without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

// C includes
#include <stdint.h>

// C++
#include <iostream>

// STL
#include <tr1/memory>
#include <vector>

// po6 includes
#include <po6/threads/mutex.h>
#include <po6/threads/thread.h>

// e includes
#include <e/convert.h>
#include <e/nonblocking_bounded_fifo.h>

static uint64_t ops;
static uint64_t workunit;
static uint64_t done;
static std::auto_ptr<e::nonblocking_bounded_fifo<uint64_t> > fifo;

void
usage();

void
worker_thread();

int
main(int argc, char* argv[])
{
if (argc != 4)
{
usage();
return EXIT_FAILURE;
}

uint16_t threads;

try
{
threads = e::convert::to_uint16_t(argv[1]);
ops = e::convert::to_uint64_t(argv[2]);
workunit = e::convert::to_uint64_t(argv[3]);
done = 0;
}
catch (std::domain_error& e)
{
usage();
std::cerr << "All parameters must be numeric in nature.";
return EXIT_FAILURE;
}
catch (std::out_of_range& e)
{
usage();
std::cerr << "All parameters must be suitably small.";
return EXIT_FAILURE;
}

fifo.reset(new e::nonblocking_bounded_fifo<uint64_t>(2));

std::cout << "benchmark: " << threads << " threads will perform "
<< ops << " push/pop operations on the fifo."
<< std::endl;

std::vector<std::tr1::shared_ptr<po6::threads::thread> > workers;

for (uint16_t i = 0; i < threads; ++i)
{
std::tr1::shared_ptr<po6::threads::thread> t(new po6::threads::thread(worker_thread));
workers.push_back(t);
t->start();
}

for (uint16_t i = 0; i < threads; ++i)
{
workers[i]->join();
}

return EXIT_SUCCESS;
}

void
usage()
{
std::cerr << "Usage: benchmark "
<< "<threads> "
<< "<ops> "
<< "<workunit> "
<< std::endl;
exit(EXIT_FAILURE);
}

void
worker_thread()
{
uint64_t lower = __sync_fetch_and_add(&done, workunit);
uint64_t upper = lower + workunit;

while (lower < ops)
{
upper = std::min(upper, ops);

for (; lower < upper; ++lower)
{
uint64_t val;

bool pushed = fifo->push(lower);
assert(pushed);
bool popped = fifo->pop(&val);
assert(popped);
}

lower = __sync_fetch_and_add(&done, workunit);
upper = lower + workunit;
}
}
201 changes: 201 additions & 0 deletions include/e/nonblocking_bounded_fifo.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,201 @@
// Copyright (c) 2012, Robert Escriva
// All rights reserved.
//
// Redistribution and use in source and binary forms, with or without
// modification, are permitted provided that the following conditions are met:
//
// * Redistributions of source code must retain the above copyright notice,
// this list of conditions and the following disclaimer.
// * Redistributions in binary form must reproduce the above copyright
// notice, this list of conditions and the following disclaimer in the
// documentation and/or other materials provided with the distribution.
// * Neither the name of this project nor the names of its contributors may
// be used to endorse or promote products derived from this software
// without specific prior written permission.
//
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
// AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
// ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
// LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
// CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
// SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
// INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
// CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
// ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
// POSSIBILITY OF SUCH DAMAGE.

#ifndef e_nonblocking_bounded_fifo_h_
#define e_nonblocking_bounded_fifo_h_

// C
#include <cassert>

// e
#include <e/atomic.h>

namespace e
{

// A non-blocking bounded buffer allowing an arbitrary number of producers and
// consumers.
//
// This is not linearizable because a "push" may fail because the queue is full,
// yet the last item is not poppable yet. It does, however, avoid spurious
// failures due to memory visibility issues. If the object has been pushed, it
// can be popped.

template <typename T>
class nonblocking_bounded_fifo
{
public:
nonblocking_bounded_fifo(size_t sz);
~nonblocking_bounded_fifo() throw ();

public:
bool push(const T& data);
bool pop(T* data);

private:
class element;

private:
nonblocking_bounded_fifo(const nonblocking_bounded_fifo&);

private:
nonblocking_bounded_fifo& operator = (const nonblocking_bounded_fifo&);

private:
const size_t m_sz;
element* m_elems;
uint64_t m_push;
uint64_t m_pop;
};

template <typename T>
nonblocking_bounded_fifo<T> :: nonblocking_bounded_fifo(size_t sz)
: m_sz(sz)
, m_elems(NULL)
, m_push(0)
, m_pop(0)
{
using namespace e::atomic;
assert(m_sz >= 2);
assert((m_sz & (m_sz - 1)) == 0);
element* elems = new element[m_sz];

for (size_t i = 0; i < m_sz; ++i)
{
store_64_nobarrier(&elems[i].count, i);
}

store_64_nobarrier(&m_push, 0);
store_64_nobarrier(&m_pop, 0);
store_ptr_release(&m_elems, elems);
}

template <typename T>
nonblocking_bounded_fifo<T> :: ~nonblocking_bounded_fifo() throw ()
{
element* elems = e::atomic::load_ptr_acquire(&m_elems);
delete[] elems;
}

template <typename T>
bool
nonblocking_bounded_fifo<T> :: push(const T& t)
{
using namespace e::atomic;
element* elems = load_ptr_acquire(&m_elems);
element* elem;
uint64_t po;
uint64_t pu;
uint64_t count;

while (true)
{
pu = load_64_nobarrier(&m_push);
elem = elems + (pu & (m_sz - 1));
count = load_64_acquire(&elem->count);
po = load_64_nobarrier(&m_pop);

// If the next in the sequence agrees with the elements count, then the
// element must be empty or being updated.
if (count == pu)
{
if (compare_and_swap_64_nobarrier(&m_push, pu, pu + 1) == pu)
{
break;
}
}
// Otherwise we know that the m_push sequence number has wrapped
// around. If that is the case, the only stable explanation is that the
// item is unpopped. Because we read "po" after the acquire barrier on
// &elem->count, we know that it is a value that is recent enough to
// have existed concurrently with this operation. Therefore, we take
// that as the linearization point that things fail.
else if (count == pu - m_sz + 1 && pu == po + m_sz)
{
return false;
}
}

elem->t = t;
store_64_release(&elem->count, pu + 1);
return true;
}

template <typename T>
bool
nonblocking_bounded_fifo<T> :: pop(T* t)
{
using namespace e::atomic;
element* elems = load_ptr_acquire(&m_elems);
element* elem;
uint64_t po;
uint64_t pu;
uint64_t count;

while (true)
{
po = load_64_nobarrier(&m_pop);
elem = elems + (po & (m_sz - 1));
count = load_64_acquire(&elem->count);
pu = load_64_nobarrier(&m_push);

// If the next in the sequence is off by one from the element's count, then the
// element must be valid or in the process of removal
if (count == po + 1)
{
if (compare_and_swap_64_nobarrier(&m_pop, po, po + 1) == po)
{
break;
}
}
// Otherwise we know that there is nothing to pop. Because we read "pu"
// after the acquire barrier on &elem->count, we know that it is a value
// that is recent enough to have existed concurrently with this
// operation. Therefore, we take that as the linearization point that
// things fail because the queue is empty.
else if (count == po && pu == po)
{
return false;
}
}

*t = elem->t;
elem->t = T();
store_64_release(&elem->count, po + m_sz);
return true;
}

template <typename T>
struct nonblocking_bounded_fifo<T>::element
{
uint64_t count;
T t;
};

} // namespace e

#endif // e_nonblocking_bounded_fifo_h_
4 changes: 3 additions & 1 deletion test/Makefile.am
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ AM_LDFLAGS = $(GTEST_LDFLAGS)

TESTS = $(check_PROGRAMS)
check_PROGRAMS = bitfield bitsteal buffer convert guard intrusive_ptr \
locking_iterable_fifo profiler
locking_iterable_fifo nonblocking_bounded_fifo profiler
bitfield_SOURCES = runner.cc bitfield.cc
bitfield_LDADD = ../src/libe.la ${GTEST_LIBS}
bitsteal_SOURCES = runner.cc bitsteal.cc
Expand All @@ -19,6 +19,8 @@ intrusive_ptr_SOURCES = runner.cc intrusive_ptr.cc
intrusive_ptr_LDADD = $(GTEST_LIBS)
locking_iterable_fifo_SOURCES = runner.cc locking_iterable_fifo.cc
locking_iterable_fifo_LDADD = $(GTEST_LIBS)
nonblocking_bounded_fifo_SOURCES = runner.cc nonblocking_bounded_fifo.cc
nonblocking_bounded_fifo_LDADD = $(GTEST_LIBS)
profiler_SOURCES = runner.cc profiler.cc
profiler_LDADD = -lrt $(GTEST_LIBS)
endif
Loading

0 comments on commit 419dcac

Please sign in to comment.