Skip to content

Commit

Permalink
Adding tests
Browse files Browse the repository at this point in the history
- fixing unbuffered mode
  • Loading branch information
hkaiser committed Nov 29, 2019
1 parent 8fd3209 commit fed10b0
Show file tree
Hide file tree
Showing 5 changed files with 317 additions and 67 deletions.
146 changes: 95 additions & 51 deletions hpx/lcos/local/channel_mpmc.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <cstddef>
#include <memory>
#include <mutex>
#include <utility>

namespace hpx { namespace lcos { namespace local {

Expand Down Expand Up @@ -47,11 +48,21 @@ namespace hpx { namespace lcos { namespace local {
return num_items() == size_ - 1;
}

bool is_full_unbuffered() const noexcept
{
return head_.data_ != 0;
}

bool is_empty() const noexcept
{
return head_.data_ == tail_.data_;
}

bool is_empty_unbuffered() const noexcept
{
return !is_full_unbuffered();
}

public:
explicit channel_mpmc(std::size_t size)
: size_(size + 1)
Expand All @@ -62,6 +73,28 @@ namespace hpx { namespace lcos { namespace local {
tail_.data_ = 0;
}

channel_mpmc(channel_mpmc&& rhs) noexcept
: head_(std::move(rhs.head_))
, tail_(std::move(rhs.tail_))
, size_(rhs.size_)
, buffer_(std::move(rhs.buffer_))
, closed_(rhs.closed_)
{
rhs.size_ = 0;
rhs.closed_ = true;
}

channel_mpmc& operator=(channel_mpmc&& rhs) noexcept
{
head_ = std::move(rhs.head_);
tail_ = std::move(rhs.tail_);
size_ = rhs.size_;
buffer_ = std::move(rhs.buffer_);
closed_ = rhs.closed_;
rhs.closed_ = true;
return *this;
}

~channel_mpmc()
{
std::unique_lock<mutex_type> l(mtx_.data_);
Expand All @@ -71,83 +104,94 @@ namespace hpx { namespace lcos { namespace local {
}
}

T get() const
bool get(T* val = nullptr) const
{
std::unique_lock<mutex_type> l(mtx_.data_);
if (is_empty())
if (closed_)
{
return false;
}

if (size_ <= 1)
{
if (closed_)
// unbuffered operation
if (is_empty_unbuffered())
{
l.unlock();
HPX_THROW_EXCEPTION(hpx::invalid_status,
"hpx::lcos::local::channel_mpmc::get_sync",
"this channel is empty and was closed");
return false;
}
else

if (val == nullptr)
{
l.unlock();
HPX_THROW_EXCEPTION(hpx::invalid_status,
"hpx::lcos::local::channel_mpmc::get_sync",
"this channel is empty");
return true;
}
}

std::size_t head = head_.data_;
T result = std::move(buffer_[head]);
if (++head < size_)
{
head_.data_ = head;
*val = std::move(buffer_[0]);
head_.data_ = 0;
}
else
{
head_.data_ = head - size_;
}
return result;
}

bool try_get(T* val = nullptr) const
{
std::unique_lock<mutex_type> l(mtx_.data_);
if (is_empty())
{
return false;
}
// buffered operation
if (is_empty())
{
return false;
}

if (val == nullptr)
{
return true;
}
if (val == nullptr)
{
return true;
}

std::size_t head = head_.data_;
*val = std::move(buffer_[head]);
if (++head < size_)
{
head_.data_ = head;
}
else
{
head_.data_ = head - size_;
std::size_t head = head_.data_;
*val = std::move(buffer_[head]);
if (++head < size_)
{
head_.data_ = head;
}
else
{
head_.data_ = 0;
}
}
return true;
}

bool set(T&& t)
{
std::unique_lock<mutex_type> l(mtx_.data_);
if (closed_ || is_full())
if (closed_)
{
return false;
}

std::size_t tail = tail_.data_;
buffer_[tail] = std::move(t);
if (++tail < size_)
if (size_ <= 1)
{
tail_.data_ = tail;
// unbuffered operation
if (is_full_unbuffered())
{
return false;
}

buffer_[0] = std::move(t);
head_.data_ = 1;
}
else
{
tail_.data_ = tail - size_;
// buffered operation
if (is_full())
{
return false;
}

std::size_t tail = tail_.data_;
buffer_[tail] = std::move(t);
if (++tail < size_)
{
tail_.data_ = tail;
}
else
{
tail_.data_ = 0;
}
}
return true;
}
Expand Down Expand Up @@ -183,10 +227,10 @@ namespace hpx { namespace lcos { namespace local {
hpx::util::cache_aligned_data<std::size_t> tail_;

// a channel of size n can buffer n-1 items
std::size_t const size_;
std::size_t size_;

// channel buffer
std::unique_ptr<T[]> const buffer_;
std::unique_ptr<T[]> buffer_;

bool closed_;
};
Expand Down
48 changes: 32 additions & 16 deletions tests/performance/local/channel_mpmc_throughput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,12 @@
#include <cstddef>
#include <cstdint>

///////////////////////////////////////////////////////////////////////////////
struct data
{
data() = default;

data(int d)
explicit data(int d)
{
data_[0] = d;
}
Expand All @@ -30,18 +31,34 @@ struct data

constexpr int NUM_TESTS = 10000000;

///////////////////////////////////////////////////////////////////////////////
inline data channel_get(hpx::lcos::local::channel_mpmc<data> const& c)
{
data result;
while (!c.get(&result))
{
hpx::this_thread::yield();
}
return result;
}

inline void channel_set(hpx::lcos::local::channel_mpmc<data>& c, data&& val)
{
while (!c.set(std::move(val))) // NOLINT
{
hpx::this_thread::yield();
}
}

///////////////////////////////////////////////////////////////////////////////
// Produce
double thread_func_0(hpx::lcos::local::channel_mpmc<data>& c)
{
std::uint64_t start = hpx::util::high_resolution_clock::now();

for (int i = 0; i != NUM_TESTS; ++i)
{
data d{i};
while (!c.set(std::move(d)))
{
hpx::this_thread::yield();
}
channel_set(c, data{i});
}

std::uint64_t end = hpx::util::high_resolution_clock::now();
Expand All @@ -56,11 +73,7 @@ double thread_func_1(hpx::lcos::local::channel_mpmc<data>& c)

for (int i = 0; i != NUM_TESTS; ++i)
{
data d;
while (!c.try_get(&d))
{
hpx::this_thread::yield();
}
data d = channel_get(c);
HPX_ASSERT(d.data_[0] == i);
}

Expand All @@ -71,15 +84,18 @@ double thread_func_1(hpx::lcos::local::channel_mpmc<data>& c)

int main(int argc, char* argv[])
{
hpx::lcos::local::channel_mpmc<data> c(100);
hpx::lcos::local::channel_mpmc<data> c(10000);

hpx::future<double> producer = hpx::async(thread_func_0, std::ref(c));
hpx::future<double> consumer = hpx::async(thread_func_1, std::ref(c));

std::cout << "Producer throughput: " << (NUM_TESTS / producer.get())
<< "\n";
std::cout << "Consumer throughput: " << (NUM_TESTS / consumer.get())
<< "\n";
auto producer_time = producer.get();
std::cout << "Producer throughput: " << (NUM_TESTS / producer_time)
<< " [op/s] (" << (producer_time / NUM_TESTS) << " [s/op])\n";

auto consumer_time = consumer.get();
std::cout << "Consumer throughput: " << (NUM_TESTS / consumer_time)
<< " [op/s] (" << (consumer_time / NUM_TESTS) << " [s/op])\n";

return 0;
}
5 changes: 5 additions & 0 deletions tests/unit/lcos/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ set(tests
async_remote_client
async_unwrap_result
channel
channel_mpmc_fib
channel_mpmc_shift
client_then
future
future_ref
Expand Down Expand Up @@ -74,6 +76,9 @@ set(async_remote_client_PARAMETERS LOCALITIES 2)
set(async_cb_remote_PARAMETERS LOCALITIES 2)
set(async_cb_remote_client_PARAMETERS LOCALITIES 2)

set(channel_mpmc_fib_PARAMETERS THREADS_PER_LOCALITY 4)
set(channel_mpmc_shift_PARAMETERS THREADS_PER_LOCALITY 4)

set(future_PARAMETERS THREADS_PER_LOCALITY 4)
set(future_then_PARAMETERS THREADS_PER_LOCALITY 4)
set(future_then_executor_PARAMETERS THREADS_PER_LOCALITY 4)
Expand Down

0 comments on commit fed10b0

Please sign in to comment.