Skip to content

Commit

Permalink
Adding channel_mcsp (multiple producer, single consumer)
Browse files Browse the repository at this point in the history
 - flyby adding proper buffer initialization etc.
  • Loading branch information
hkaiser committed Nov 29, 2019
1 parent 575cc95 commit d56f59b
Show file tree
Hide file tree
Showing 8 changed files with 562 additions and 10 deletions.
32 changes: 25 additions & 7 deletions hpx/lcos/local/channel_mpmc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ namespace hpx { namespace lcos { namespace local {
// multiple producers and multiple consumers. The data is stored in a
// ring-buffer.
template <typename T, typename Mutex = util::spinlock>
class base_channel_mpmc
class bounded_channel
{
private:
using mutex_type = Mutex;
Expand All @@ -50,18 +50,24 @@ namespace hpx { namespace lcos { namespace local {
}

public:
explicit base_channel_mpmc(std::size_t size)
explicit bounded_channel(std::size_t size)
: size_(size + 1)
, buffer_(new T[size + 1])
, closed_(false)
{
HPX_ASSERT(size != 0);

// invoke constructors for allocated buffer
for (std::size_t i = 0; i != size_; ++i)
{
new (&buffer_[i]) T();
}

head_.data_ = 0;
tail_.data_ = 0;
}

base_channel_mpmc(base_channel_mpmc&& rhs) noexcept
bounded_channel(bounded_channel&& rhs) noexcept
: head_(rhs.head_)
, tail_(rhs.tail_)
, size_(rhs.size_)
Expand All @@ -72,7 +78,7 @@ namespace hpx { namespace lcos { namespace local {
rhs.closed_ = true;
}

base_channel_mpmc& operator=(base_channel_mpmc&& rhs) noexcept
bounded_channel& operator=(bounded_channel&& rhs) noexcept
{
head_ = rhs.head_;
tail_ = rhs.tail_;
Expand All @@ -83,9 +89,16 @@ namespace hpx { namespace lcos { namespace local {
return *this;
}

~base_channel_mpmc()
~bounded_channel()
{
std::unique_lock<mutex_type> l(mtx_.data_);

// invoke destructors for allocated buffer
for (std::size_t i = 0; i != size_; ++i)
{
(&buffer_[i])->~T();
}

if (!closed_)
{
close(l);
Expand Down Expand Up @@ -153,6 +166,11 @@ namespace hpx { namespace lcos { namespace local {
return close(l);
}

std::size_t capacity() const
{
return size_ - 1;
}

protected:
std::size_t close(std::unique_lock<mutex_type>& l)
{
Expand All @@ -162,7 +180,7 @@ namespace hpx { namespace lcos { namespace local {
{
l.unlock();
HPX_THROW_EXCEPTION(hpx::invalid_status,
"hpx::lcos::local::base_channel_mpmc::close",
"hpx::lcos::local::bounded_channel::close",
"attempting to close an already closed channel");
}

Expand Down Expand Up @@ -194,7 +212,7 @@ namespace hpx { namespace lcos { namespace local {
// threads, however the performance degrades by a factor of ten compared to
// using hpx::lcos::local::spinlock.
template <typename T>
using channel_mpmc = base_channel_mpmc<T, hpx::lcos::local::spinlock>;
using channel_mpmc = bounded_channel<T, hpx::lcos::local::spinlock>;

}}} // namespace hpx::lcos::local

Expand Down
223 changes: 223 additions & 0 deletions hpx/lcos/local/channel_mpsc.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,223 @@
// Copyright (c) 2019 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// This work is inspired by https://github.com/aprell/tasking-2.0

#if !defined(HPX_LCOS_LOCAL_CHANNEL_MPSC_NOV_27_2019_1021AM)
#define HPX_LCOS_LOCAL_CHANNEL_MPSC_NOV_27_2019_1021AM

#include <hpx/config.hpp>
#include <hpx/assertion.hpp>
#include <hpx/concurrency.hpp>
#include <hpx/errors.hpp>
#include <hpx/lcos/local/spinlock.hpp>
#include <hpx/thread_support.hpp>

#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>

namespace hpx { namespace lcos { namespace local {

////////////////////////////////////////////////////////////////////////////
// A simple but very high performance implementation of the channel concept.
// This channel is bounded to a size given at construction time and supports
// a multiple producers and a single consumer. The data is stored in a
// ring-buffer.
template <typename T, typename Mutex = util::spinlock>
class base_channel_mpsc
{
private:
using mutex_type = Mutex;

bool is_full(std::size_t tail) const noexcept
{
std::size_t numitems =
size_ + tail - head_.data_.load(std::memory_order_relaxed);

if (numitems < size_)
{
return numitems == size_ - 1;
}
return (numitems - size_ == size_ - 1);
}

bool is_empty(std::size_t head) const noexcept
{
return head == tail_.data_.tail_.load(std::memory_order_relaxed);
}

public:
explicit base_channel_mpsc(std::size_t size)
: size_(size + 1)
, buffer_(new T[size + 1])
, closed_(false)
{
HPX_ASSERT(size != 0);

// invoke constructors for allocated buffer
for (std::size_t i = 0; i != size_; ++i)
{
new (&buffer_[i]) T();
}

head_.data_.store(0, std::memory_order_relaxed);
tail_.data_.tail_.store(0, std::memory_order_relaxed);
}

base_channel_mpsc(base_channel_mpsc&& rhs) noexcept
: size_(rhs.size_)
, buffer_(std::move(rhs.buffer_))
{
head_.data_.store(rhs.head_.data_.load(std::memory_order_acquire),
std::memory_order_relaxed);
tail_.data_.tail_.store(
rhs.tail_.data_.tail_.load(std::memory_order_acquire),
std::memory_order_relaxed);

closed_.store(rhs.closed_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.closed_.store(true, std::memory_order_release);
}

base_channel_mpsc& operator=(base_channel_mpsc&& rhs) noexcept
{
head_.data_.store(rhs.head_.data_.load(std::memory_order_acquire),
std::memory_order_relaxed);
tail_.data_.tail_.store(
rhs.tail_.data_.tail_.load(std::memory_order_acquire),
std::memory_order_relaxed);

size_ = rhs.size_;
buffer_ = std::move(rhs.buffer_);

closed_.store(rhs.closed_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.closed_.store(true, std::memory_order_release);

return *this;
}

~base_channel_mpsc()
{
// invoke destructors for allocated buffer
for (std::size_t i = 0; i != size_; ++i)
{
(&buffer_[i])->~T();
}

if (!closed_.load(std::memory_order_relaxed))
{
close();
}
}

bool get(T* val = nullptr) const noexcept
{
if (closed_.load(std::memory_order_relaxed))
{
return false;
}

std::size_t head = head_.data_.load(std::memory_order_relaxed);

if (is_empty(head))
{
return false;
}

if (val == nullptr)
{
return true;
}

*val = std::move(buffer_[head]);
if (++head >= size_)
{
head = 0;
}
head_.data_.store(head, std::memory_order_release);

return true;
}

bool set(T&& t) noexcept
{
if (closed_.load(std::memory_order_relaxed))
{
return false;
}

std::unique_lock<mutex_type> l(tail_.data_.mtx_);

std::size_t tail =
tail_.data_.tail_.load(std::memory_order_acquire);

if (is_full(tail))
{
return false;
}

buffer_[tail] = std::move(t);
if (++tail >= size_)
{
tail = 0;
}
tail_.data_.tail_.store(tail, std::memory_order_relaxed);

return true;
}

std::size_t close()
{
bool expected = false;
if (!closed_.compare_exchange_weak(expected, true))
{
HPX_THROW_EXCEPTION(hpx::invalid_status,
"hpx::lcos::local::base_channel_mpsc::close",
"attempting to close an already closed channel");
}
return 0;
}

std::size_t capacity() const
{
return size_ - 1;
}

private:
// keep the mutex with the tail and the head pointer in separate cache
// lines
struct tail_data
{
mutex_type mtx_;
std::atomic<std::size_t> tail_;
};

mutable hpx::util::cache_aligned_data<std::atomic<std::size_t>> head_;
hpx::util::cache_aligned_data<tail_data> tail_;

// a channel of size n can buffer n-1 items
std::size_t size_;

// channel buffer
std::unique_ptr<T[]> buffer_;

// this channel was closed, i.e. no further operations are possible
std::atomic<bool> closed_;
};

////////////////////////////////////////////////////////////////////////////
// Using hpx::util::spinlock as the means of synchronization enables the use
// of this channel with non-HPX threads.
template <typename T>
using channel_mpsc = base_channel_mpsc<T, hpx::lcos::local::spinlock>;

}}} // namespace hpx::lcos::local

#endif
19 changes: 18 additions & 1 deletion hpx/lcos/local/channel_spsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ namespace hpx { namespace lcos { namespace local {
{
HPX_ASSERT(size != 0);

// invoke constructors for allocated buffer
for (std::size_t i = 0; i != size_; ++i)
{
new (&buffer_[i]) T();
}

head_.data_.store(0, std::memory_order_relaxed);
tail_.data_.store(0, std::memory_order_relaxed);
}
Expand Down Expand Up @@ -92,6 +98,12 @@ namespace hpx { namespace lcos { namespace local {

~channel_spsc()
{
// invoke destructors for allocated buffer
for (std::size_t i = 0; i != size_; ++i)
{
(&buffer_[i])->~T();
}

if (!closed_.load(std::memory_order_relaxed))
{
close();
Expand Down Expand Up @@ -154,7 +166,7 @@ namespace hpx { namespace lcos { namespace local {
std::size_t close()
{
bool expected = false;
if (!closed_.compare_exchange_strong(expected, true))
if (!closed_.compare_exchange_weak(expected, true))
{
HPX_THROW_EXCEPTION(hpx::invalid_status,
"hpx::lcos::local::channel_spsc::close",
Expand All @@ -163,6 +175,11 @@ namespace hpx { namespace lcos { namespace local {
return 0;
}

std::size_t capacity() const
{
return size_ - 1;
}

private:
// keep the mutex, the head, and the tail pointer in separate cache
// lines
Expand Down
8 changes: 6 additions & 2 deletions tests/performance/local/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ set(benchmarks
agas_cache_timings
async_overheads
channel_mpmc_throughput
channel_mpsc_throughput
channel_spsc_throughput
delay_baseline
delay_baseline_threaded
Expand Down Expand Up @@ -147,10 +148,13 @@ set(benchmarks ${benchmarks}
)

set(channel_mpmc_throughput_FLAGS DEPENDENCIES
hpx_concurrency hpx_errors hpx_timing hpx_thread_support)
hpx_assertion hpx_concurrency hpx_errors hpx_timing hpx_thread_support)
set(channel_mpmc_throughput_PARAMETERS THREADS_PER_LOCALITY 2)
set(channel_mpsc_throughput_FLAGS DEPENDENCIES
hpx_assertion hpx_concurrency hpx_errors hpx_timing hpx_thread_support)
set(channel_mpsc_throughput_PARAMETERS THREADS_PER_LOCALITY 2)
set(channel_spsc_throughput_FLAGS DEPENDENCIES
hpx_concurrency hpx_errors hpx_timing hpx_thread_support)
hpx_assertion hpx_concurrency hpx_errors hpx_timing hpx_thread_support)
set(channel_spsc_throughputs_PARAMETERS THREADS_PER_LOCALITY 2)

set(foreach_scaling_FLAGS DEPENDENCIES iostreams_component hpx_timing)
Expand Down

0 comments on commit d56f59b

Please sign in to comment.