Skip to content

Commit

Permalink
Removing unbuffered option for channels
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Nov 29, 2019
1 parent f9296b2 commit 575cc95
Show file tree
Hide file tree
Showing 4 changed files with 63 additions and 143 deletions.
98 changes: 29 additions & 69 deletions hpx/lcos/local/channel_mpmc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
#define HPX_LCOS_LOCAL_CHANNEL_MPMC_NOV_24_2019_1141AM

#include <hpx/config.hpp>
#include <hpx/assertion.hpp>
#include <hpx/concurrency.hpp>
#include <hpx/errors.hpp>
#include <hpx/lcos/local/spinlock.hpp>
Expand All @@ -27,7 +28,7 @@ namespace hpx { namespace lcos { namespace local {
// This channel is bounded to a size given at construction time and supports
// multiple producers and multiple consumers. The data is stored in a
// ring-buffer.
template <typename T, typename Mutex = lcos::local::spinlock>
template <typename T, typename Mutex = util::spinlock>
class base_channel_mpmc
{
private:
Expand All @@ -48,22 +49,14 @@ namespace hpx { namespace lcos { namespace local {
return head == tail_.data_;
}

bool is_full_unbuffered() const noexcept
{
return head_.data_ != 0;
}

bool is_empty_unbuffered() const noexcept
{
return head_.data_ == 0;
}

public:
explicit base_channel_mpmc(std::size_t size)
: size_(size + 1)
, buffer_(new T[size + 1])
, closed_(false)
{
HPX_ASSERT(size != 0);

head_.data_ = 0;
tail_.data_ = 0;
}
Expand Down Expand Up @@ -107,44 +100,25 @@ namespace hpx { namespace lcos { namespace local {
return false;
}

if (size_ <= 1)
std::size_t head = head_.data_;

if (is_empty(head))
{
return false;
}

if (val == nullptr)
{
// unbuffered operation
if (is_empty_unbuffered())
{
return false;
}

if (val == nullptr)
{
return true;
}

*val = std::move(buffer_[0]);
head_.data_ = 0;
return true;
}
else

*val = std::move(buffer_[head]);
if (++head >= size_)
{
// buffered operation
std::size_t head = head_.data_;

if (is_empty(head))
{
return false;
}

if (val == nullptr)
{
return true;
}

*val = std::move(buffer_[head]);
if (++head >= size_)
{
head = 0;
}
head_.data_ = head;
head = 0;
}
head_.data_ = head;

return true;
}

Expand All @@ -156,34 +130,20 @@ namespace hpx { namespace lcos { namespace local {
return false;
}

if (size_ <= 1)
std::size_t tail = tail_.data_;

if (is_full(tail))
{
// unbuffered operation
if (is_full_unbuffered())
{
return false;
}

buffer_[0] = std::move(t);
head_.data_ = 1;
return false;
}
else

buffer_[tail] = std::move(t);
if (++tail >= size_)
{
// buffered operation
std::size_t tail = tail_.data_;

if (is_full(tail))
{
return false;
}

buffer_[tail] = std::move(t);
if (++tail >= size_)
{
tail = 0;
}
tail_.data_ = tail;
tail = 0;
}
tail_.data_ = tail;

return true;
}

Expand Down
100 changes: 30 additions & 70 deletions hpx/lcos/local/channel_spsc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@

// This work is inspired by https://github.com/aprell/tasking-2.0

#if !defined(HPX_LCOS_LOCAL_CHANNEL_spsc_NOV_24_2019_1141AM)
#define HPX_LCOS_LOCAL_CHANNEL_spsc_NOV_24_2019_1141AM
#if !defined(HPX_LCOS_LOCAL_CHANNEL_SPSC_NOV_24_2019_1141AM)
#define HPX_LCOS_LOCAL_CHANNEL_SPSC_NOV_24_2019_1141AM

#include <hpx/config.hpp>
#include <hpx/assertion.hpp>
#include <hpx/concurrency.hpp>
#include <hpx/errors.hpp>

Expand Down Expand Up @@ -46,22 +47,14 @@ namespace hpx { namespace lcos { namespace local {
return head == tail_.data_.load(std::memory_order_acquire);
}

bool is_full_unbuffered() const noexcept
{
return head_.data_.load(std::memory_order_relaxed) != 0;
}

bool is_empty_unbuffered() const noexcept
{
return head_.data_.load(std::memory_order_acquire) == 0;
}

public:
explicit channel_spsc(std::size_t size)
: size_(size + 1)
, buffer_(new T[size + 1])
, closed_(false)
{
HPX_ASSERT(size != 0);

head_.data_.store(0, std::memory_order_relaxed);
tail_.data_.store(0, std::memory_order_relaxed);
}
Expand Down Expand Up @@ -112,44 +105,25 @@ namespace hpx { namespace lcos { namespace local {
return false;
}

if (size_ <= 1)
std::size_t head = head_.data_.load(std::memory_order_relaxed);

if (is_empty(head))
{
return false;
}

if (val == nullptr)
{
// unbuffered operation
if (is_empty_unbuffered())
{
return false;
}

if (val == nullptr)
{
return true;
}

*val = std::move(buffer_[0]);
head_.data_.store(0, std::memory_order_release);
return true;
}
else

*val = std::move(buffer_[head]);
if (++head >= size_)
{
// buffered operation
std::size_t head = head_.data_.load(std::memory_order_relaxed);

if (is_empty(head))
{
return false;
}

if (val == nullptr)
{
return true;
}

*val = std::move(buffer_[head]);
if (++head >= size_)
{
head = 0;
}
head_.data_.store(head, std::memory_order_release);
head = 0;
}
head_.data_.store(head, std::memory_order_release);

return true;
}

Expand All @@ -160,34 +134,20 @@ namespace hpx { namespace lcos { namespace local {
return false;
}

if (size_ <= 1)
std::size_t tail = tail_.data_.load(std::memory_order_relaxed);

if (is_full(tail))
{
// unbuffered operation
if (is_full_unbuffered())
{
return false;
}

buffer_[0] = std::move(t);
head_.data_.store(1, std::memory_order_release);
return false;
}
else

buffer_[tail] = std::move(t);
if (++tail >= size_)
{
// buffered operation
std::size_t tail = tail_.data_.load(std::memory_order_relaxed);

if (is_full(tail))
{
return false;
}

buffer_[tail] = std::move(t);
if (++tail >= size_)
{
tail = 0;
}
tail_.data_.store(tail, std::memory_order_release);
tail = 0;
}
tail_.data_.store(tail, std::memory_order_release);

return true;
}

Expand Down
4 changes: 2 additions & 2 deletions tests/unit/lcos/channel_mpmc_fib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ void consume_numbers(int n, hpx::lcos::local::channel_mpmc<bool>& c1,
///////////////////////////////////////////////////////////////////////////////
int main(int argc, char* argv[])
{
hpx::lcos::local::channel_mpmc<bool> c1(0);
hpx::lcos::local::channel_mpmc<int> c2(0);
hpx::lcos::local::channel_mpmc<bool> c1(1);
hpx::lcos::local::channel_mpmc<int> c2(1);
hpx::lcos::local::channel_mpmc<int> c3(5);

hpx::future<void> producer =
Expand Down
4 changes: 2 additions & 2 deletions tests/unit/lcos/channel_spsc_fib.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,8 @@ void consume_numbers(int n, hpx::lcos::local::channel_spsc<bool>& c1,
///////////////////////////////////////////////////////////////////////////////
int main(int argc, char* argv[])
{
hpx::lcos::local::channel_spsc<bool> c1(0);
hpx::lcos::local::channel_spsc<int> c2(0);
hpx::lcos::local::channel_spsc<bool> c1(1);
hpx::lcos::local::channel_spsc<int> c2(1);
hpx::lcos::local::channel_spsc<int> c3(5);

hpx::future<void> producer =
Expand Down

0 comments on commit 575cc95

Please sign in to comment.