Skip to content

Commit

Permalink
Cleaning up channel implementations
Browse files Browse the repository at this point in the history
- flyby: run performance tests for more elements
  • Loading branch information
hkaiser committed Nov 29, 2019
1 parent 8a960e7 commit f9296b2
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 31 deletions.
38 changes: 24 additions & 14 deletions hpx/lcos/local/channel_mpmc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ namespace hpx { namespace lcos { namespace local {
// This channel is bounded to a size given at construction time and supports
// multiple producers and multiple consumers. The data is stored in a
// ring-buffer.
template <typename T>
class channel_mpmc
template <typename T, typename Mutex = lcos::local::spinlock>
class base_channel_mpmc
{
private:
using mutex_type = hpx::lcos::local::spinlock;
using mutex_type = Mutex;

bool is_full(std::size_t tail) const noexcept
{
Expand Down Expand Up @@ -59,7 +59,7 @@ namespace hpx { namespace lcos { namespace local {
}

public:
explicit channel_mpmc(std::size_t size)
explicit base_channel_mpmc(std::size_t size)
: size_(size + 1)
, buffer_(new T[size + 1])
, closed_(false)
Expand All @@ -68,9 +68,9 @@ namespace hpx { namespace lcos { namespace local {
tail_.data_ = 0;
}

channel_mpmc(channel_mpmc&& rhs) noexcept
: head_(std::move(rhs.head_))
, tail_(std::move(rhs.tail_))
base_channel_mpmc(base_channel_mpmc&& rhs) noexcept
: head_(rhs.head_)
, tail_(rhs.tail_)
, size_(rhs.size_)
, buffer_(std::move(rhs.buffer_))
, closed_(rhs.closed_)
Expand All @@ -79,18 +79,18 @@ namespace hpx { namespace lcos { namespace local {
rhs.closed_ = true;
}

channel_mpmc& operator=(channel_mpmc&& rhs) noexcept
base_channel_mpmc& operator=(base_channel_mpmc&& rhs) noexcept
{
head_ = std::move(rhs.head_);
tail_ = std::move(rhs.tail_);
head_ = rhs.head_;
tail_ = rhs.tail_;
size_ = rhs.size_;
buffer_ = std::move(rhs.buffer_);
closed_ = rhs.closed_;
rhs.closed_ = true;
return *this;
}

~channel_mpmc()
~base_channel_mpmc()
{
std::unique_lock<mutex_type> l(mtx_.data_);
if (!closed_)
Expand All @@ -99,7 +99,7 @@ namespace hpx { namespace lcos { namespace local {
}
}

bool get(T* val = nullptr) const
bool get(T* val = nullptr) const noexcept
{
std::unique_lock<mutex_type> l(mtx_.data_);
if (closed_)
Expand Down Expand Up @@ -148,7 +148,7 @@ namespace hpx { namespace lcos { namespace local {
return true;
}

bool set(T&& t)
bool set(T&& t) noexcept
{
std::unique_lock<mutex_type> l(mtx_.data_);
if (closed_)
Expand Down Expand Up @@ -202,7 +202,7 @@ namespace hpx { namespace lcos { namespace local {
{
l.unlock();
HPX_THROW_EXCEPTION(hpx::invalid_status,
"hpx::lcos::local::channel_mpmc::close",
"hpx::lcos::local::base_channel_mpmc::close",
"attempting to close an already closed channel");
}

Expand All @@ -226,6 +226,16 @@ namespace hpx { namespace lcos { namespace local {
// this channel was closed, i.e. no further operations are possible
bool closed_;
};

////////////////////////////////////////////////////////////////////////////
// For use with HPX threads, the channel_mpmc defined here is the fastest
// (even faster than the channel_spsc). Using hpx::util::spinlock as the
// means of synchronization enables the use of this channel with non-HPX
// threads, however the performance degrades by a factor of ten compared to
// using hpx::lcos::local::spinlock.
template <typename T>
using channel_mpmc = base_channel_mpmc<T, hpx::lcos::local::spinlock>;

}}} // namespace hpx::lcos::local

#endif
17 changes: 2 additions & 15 deletions hpx/lcos/local/channel_spsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,10 @@
#include <hpx/config.hpp>
#include <hpx/concurrency.hpp>
#include <hpx/errors.hpp>
#include <hpx/lcos/local/spinlock.hpp>
#include <hpx/thread_support.hpp>

#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>

namespace hpx { namespace lcos { namespace local {
Expand Down Expand Up @@ -75,32 +72,22 @@ namespace hpx { namespace lcos { namespace local {
{
head_.data_.store(rhs.head_.data_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.head_.data_.store(0, std::memory_order_release);

tail_.data_.store(rhs.tail_.data_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.tail_.data_.store(0, std::memory_order_release);

closed_.store(rhs.closed_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.closed_.store(true, std::memory_order_release);

rhs.size_ = 0;
}

channel_spsc& operator=(channel_spsc&& rhs) noexcept
{
head_.data_.store(rhs.head_.data_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.head_.data_.store(0, std::memory_order_release);

tail_.data_.store(rhs.tail_.data_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.tail_.data_.store(0, std::memory_order_release);

size_ = rhs.size_;
rhs.size_ = 0;

buffer_ = std::move(rhs.buffer_);

closed_.store(rhs.closed_.load(std::memory_order_acquire),
Expand All @@ -118,7 +105,7 @@ namespace hpx { namespace lcos { namespace local {
}
}

bool get(T* val = nullptr) const
bool get(T* val = nullptr) const noexcept
{
if (closed_.load(std::memory_order_relaxed))
{
Expand Down Expand Up @@ -166,7 +153,7 @@ namespace hpx { namespace lcos { namespace local {
return true;
}

bool set(T&& t)
bool set(T&& t) noexcept
{
if (closed_.load(std::memory_order_relaxed))
{
Expand Down
2 changes: 1 addition & 1 deletion tests/performance/local/channel_mpmc_throughput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct data
int data_[8];
};

constexpr int NUM_TESTS = 10000000;
constexpr int NUM_TESTS = 100000000;

///////////////////////////////////////////////////////////////////////////////
inline data channel_get(hpx::lcos::local::channel_mpmc<data> const& c)
Expand Down
2 changes: 1 addition & 1 deletion tests/performance/local/channel_spsc_throughput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ struct data
int data_[8];
};

constexpr int NUM_TESTS = 10000000;
constexpr int NUM_TESTS = 100000000;

///////////////////////////////////////////////////////////////////////////////
inline data channel_get(hpx::lcos::local::channel_spsc<data> const& c)
Expand Down

0 comments on commit f9296b2

Please sign in to comment.