Skip to content

Commit

Permalink
Adding SPSC channel and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Nov 29, 2019
1 parent fed10b0 commit 8a960e7
Show file tree
Hide file tree
Showing 9 changed files with 557 additions and 33 deletions.
48 changes: 20 additions & 28 deletions hpx/lcos/local/channel_mpmc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,34 +33,29 @@ namespace hpx { namespace lcos { namespace local {
private:
using mutex_type = hpx::lcos::local::spinlock;

std::size_t num_items() const noexcept
bool is_full(std::size_t tail) const noexcept
{
std::size_t numitems = size_ + tail_.data_ - head_.data_;
std::size_t numitems = size_ + tail - head_.data_;
if (numitems < size_)
{
return numitems;
return numitems == size_ - 1;
}
return numitems - size_;
return numitems - size_ == size_ - 1;
}

bool is_full() const noexcept
bool is_empty(std::size_t head) const noexcept
{
return num_items() == size_ - 1;
return head == tail_.data_;
}

bool is_full_unbuffered() const noexcept
{
return head_.data_ != 0;
}

bool is_empty() const noexcept
{
return head_.data_ == tail_.data_;
}

bool is_empty_unbuffered() const noexcept
{
return !is_full_unbuffered();
return head_.data_ == 0;
}

public:
Expand Down Expand Up @@ -131,7 +126,9 @@ namespace hpx { namespace lcos { namespace local {
else
{
// buffered operation
if (is_empty())
std::size_t head = head_.data_;

if (is_empty(head))
{
return false;
}
Expand All @@ -141,16 +138,12 @@ namespace hpx { namespace lcos { namespace local {
return true;
}

std::size_t head = head_.data_;
*val = std::move(buffer_[head]);
if (++head < size_)
if (++head >= size_)
{
head_.data_ = head;
}
else
{
head_.data_ = 0;
head = 0;
}
head_.data_ = head;
}
return true;
}
Expand All @@ -177,21 +170,19 @@ namespace hpx { namespace lcos { namespace local {
else
{
// buffered operation
if (is_full())
std::size_t tail = tail_.data_;

if (is_full(tail))
{
return false;
}

std::size_t tail = tail_.data_;
buffer_[tail] = std::move(t);
if (++tail < size_)
{
tail_.data_ = tail;
}
else
if (++tail >= size_)
{
tail_.data_ = 0;
tail = 0;
}
tail_.data_ = tail;
}
return true;
}
Expand Down Expand Up @@ -232,6 +223,7 @@ namespace hpx { namespace lcos { namespace local {
// channel buffer
std::unique_ptr<T[]> buffer_;

// this channel was closed, i.e. no further operations are possible
bool closed_;
};
}}} // namespace hpx::lcos::local
Expand Down
236 changes: 236 additions & 0 deletions hpx/lcos/local/channel_spsc.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,236 @@
// Copyright (c) 2019 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// This work is inspired by https://github.com/aprell/tasking-2.0

#if !defined(HPX_LCOS_LOCAL_CHANNEL_spsc_NOV_24_2019_1141AM)
#define HPX_LCOS_LOCAL_CHANNEL_spsc_NOV_24_2019_1141AM

#include <hpx/config.hpp>
#include <hpx/concurrency.hpp>
#include <hpx/errors.hpp>
#include <hpx/lcos/local/spinlock.hpp>
#include <hpx/thread_support.hpp>

#include <atomic>
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>

namespace hpx { namespace lcos { namespace local {

////////////////////////////////////////////////////////////////////////////
// A simple but very high performance implementation of the channel concept.
// This channel is bounded to a size given at construction time and supports
// a single producer and a single consumer. The data is stored in a
// ring-buffer.
template <typename T>
class channel_spsc
{
private:
bool is_full(std::size_t tail) const noexcept
{
std::size_t numitems =
size_ + tail - head_.data_.load(std::memory_order_acquire);

if (numitems < size_)
{
return numitems == size_ - 1;
}
return (numitems - size_ == size_ - 1);
}

bool is_empty(std::size_t head) const noexcept
{
return head == tail_.data_.load(std::memory_order_acquire);
}

bool is_full_unbuffered() const noexcept
{
return head_.data_.load(std::memory_order_relaxed) != 0;
}

bool is_empty_unbuffered() const noexcept
{
return head_.data_.load(std::memory_order_acquire) == 0;
}

public:
explicit channel_spsc(std::size_t size)
: size_(size + 1)
, buffer_(new T[size + 1])
, closed_(false)
{
head_.data_.store(0, std::memory_order_relaxed);
tail_.data_.store(0, std::memory_order_relaxed);
}

channel_spsc(channel_spsc&& rhs) noexcept
: size_(rhs.size_)
, buffer_(std::move(rhs.buffer_))
{
head_.data_.store(rhs.head_.data_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.head_.data_.store(0, std::memory_order_release);

tail_.data_.store(rhs.tail_.data_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.tail_.data_.store(0, std::memory_order_release);

closed_.store(rhs.closed_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.closed_.store(true, std::memory_order_release);

rhs.size_ = 0;
}

channel_spsc& operator=(channel_spsc&& rhs) noexcept
{
head_.data_.store(rhs.head_.data_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.head_.data_.store(0, std::memory_order_release);

tail_.data_.store(rhs.tail_.data_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.tail_.data_.store(0, std::memory_order_release);

size_ = rhs.size_;
rhs.size_ = 0;

buffer_ = std::move(rhs.buffer_);

closed_.store(rhs.closed_.load(std::memory_order_acquire),
std::memory_order_relaxed);
rhs.closed_.store(true, std::memory_order_release);

return *this;
}

~channel_spsc()
{
if (!closed_.load(std::memory_order_relaxed))
{
close();
}
}

bool get(T* val = nullptr) const
{
if (closed_.load(std::memory_order_relaxed))
{
return false;
}

if (size_ <= 1)
{
// unbuffered operation
if (is_empty_unbuffered())
{
return false;
}

if (val == nullptr)
{
return true;
}

*val = std::move(buffer_[0]);
head_.data_.store(0, std::memory_order_release);
}
else
{
// buffered operation
std::size_t head = head_.data_.load(std::memory_order_relaxed);

if (is_empty(head))
{
return false;
}

if (val == nullptr)
{
return true;
}

*val = std::move(buffer_[head]);
if (++head >= size_)
{
head = 0;
}
head_.data_.store(head, std::memory_order_release);
}
return true;
}

bool set(T&& t)
{
if (closed_.load(std::memory_order_relaxed))
{
return false;
}

if (size_ <= 1)
{
// unbuffered operation
if (is_full_unbuffered())
{
return false;
}

buffer_[0] = std::move(t);
head_.data_.store(1, std::memory_order_release);
}
else
{
// buffered operation
std::size_t tail = tail_.data_.load(std::memory_order_relaxed);

if (is_full(tail))
{
return false;
}

buffer_[tail] = std::move(t);
if (++tail >= size_)
{
tail = 0;
}
tail_.data_.store(tail, std::memory_order_release);
}
return true;
}

std::size_t close()
{
bool expected = false;
if (!closed_.compare_exchange_strong(expected, true))
{
HPX_THROW_EXCEPTION(hpx::invalid_status,
"hpx::lcos::local::channel_spsc::close",
"attempting to close an already closed channel");
}
return 0;
}

private:
// keep the mutex, the head, and the tail pointer in separate cache
// lines
mutable hpx::util::cache_aligned_data<std::atomic<std::size_t>> head_;
hpx::util::cache_aligned_data<std::atomic<std::size_t>> tail_;

// a channel of size n can buffer n-1 items
std::size_t size_;

// channel buffer
std::unique_ptr<T[]> buffer_;

// this channel was closed, i.e. no further operations are possible
std::atomic<bool> closed_;
};
}}} // namespace hpx::lcos::local

#endif
8 changes: 7 additions & 1 deletion tests/performance/local/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ set(benchmarks
agas_cache_timings
async_overheads
channel_mpmc_throughput
channel_spsc_throughput
delay_baseline
delay_baseline_threaded
hpx_homogeneous_timed_task_spawn_executors
Expand Down Expand Up @@ -146,7 +147,12 @@ set(benchmarks ${benchmarks}
)

set(channel_mpmc_throughput_FLAGS DEPENDENCIES
hpx_assertion hpx_concurrency hpx_errors hpx_timing hpx_thread_support)
hpx_concurrency hpx_errors hpx_timing hpx_thread_support)
set(channel_mpmc_throughput_PARAMETERS THREADS_PER_LOCALITY 2)
set(channel_spsc_throughput_FLAGS DEPENDENCIES
hpx_concurrency hpx_errors hpx_timing hpx_thread_support)
set(channel_spsc_throughputs_PARAMETERS THREADS_PER_LOCALITY 2)

set(foreach_scaling_FLAGS DEPENDENCIES iostreams_component hpx_timing)
set(function_object_wrapper_overhead_FLAGS DEPENDENCIES hpx_timing)
set(hpx_tls_overhead_FLAGS DEPENDENCIES hpx_timing)
Expand Down
6 changes: 4 additions & 2 deletions tests/performance/local/channel_mpmc_throughput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <hpx/hpx.hpp>
#include <hpx/hpx_main.hpp>

#include <hpx/assertion.hpp>
#include <hpx/lcos/local/channel_mpmc.hpp>
#include <hpx/timing.hpp>

Expand Down Expand Up @@ -74,7 +73,10 @@ double thread_func_1(hpx::lcos::local::channel_mpmc<data>& c)
for (int i = 0; i != NUM_TESTS; ++i)
{
data d = channel_get(c);
HPX_ASSERT(d.data_[0] == i);
if (d.data_[0] != i)
{
std::cout << "Error!\n";
}
}

std::uint64_t end = hpx::util::high_resolution_clock::now();
Expand Down

0 comments on commit 8a960e7

Please sign in to comment.