Skip to content

Commit

Permalink
Adding high performance MPMC channel
Browse files Browse the repository at this point in the history
  • Loading branch information
hkaiser committed Nov 29, 2019
1 parent a759ebe commit 8fd3209
Show file tree
Hide file tree
Showing 3 changed files with 283 additions and 0 deletions.
195 changes: 195 additions & 0 deletions hpx/lcos/local/channel_mpmc.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,195 @@
// Copyright (c) 2019 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// This work is inspired by https://github.com/aprell/tasking-2.0

#if !defined(HPX_LCOS_LOCAL_CHANNEL_MPMC_NOV_24_2019_1141AM)
#define HPX_LCOS_LOCAL_CHANNEL_MPMC_NOV_24_2019_1141AM

#include <hpx/config.hpp>
#include <hpx/concurrency.hpp>
#include <hpx/errors.hpp>
#include <hpx/lcos/local/spinlock.hpp>
#include <hpx/thread_support.hpp>

#include <cstddef>
#include <memory>
#include <mutex>

namespace hpx { namespace lcos { namespace local {

////////////////////////////////////////////////////////////////////////////
// A simple but very high performance implementation of the channel concept.
// This channel is bounded to a size given at construction time and supports
// multiple producers and multiple consumers. The data is stored in a
// ring-buffer.
template <typename T>
class channel_mpmc
{
private:
using mutex_type = hpx::lcos::local::spinlock;

std::size_t num_items() const noexcept
{
std::size_t numitems = size_ + tail_.data_ - head_.data_;
if (numitems < size_)
{
return numitems;
}
return numitems - size_;
}

bool is_full() const noexcept
{
return num_items() == size_ - 1;
}

bool is_empty() const noexcept
{
return head_.data_ == tail_.data_;
}

public:
explicit channel_mpmc(std::size_t size)
: size_(size + 1)
, buffer_(new T[size + 1])
, closed_(false)
{
head_.data_ = 0;
tail_.data_ = 0;
}

~channel_mpmc()
{
std::unique_lock<mutex_type> l(mtx_.data_);
if (!closed_)
{
close(l);
}
}

T get() const
{
std::unique_lock<mutex_type> l(mtx_.data_);
if (is_empty())
{
if (closed_)
{
l.unlock();
HPX_THROW_EXCEPTION(hpx::invalid_status,
"hpx::lcos::local::channel_mpmc::get_sync",
"this channel is empty and was closed");
}
else
{
l.unlock();
HPX_THROW_EXCEPTION(hpx::invalid_status,
"hpx::lcos::local::channel_mpmc::get_sync",
"this channel is empty");
}
}

std::size_t head = head_.data_;
T result = std::move(buffer_[head]);
if (++head < size_)
{
head_.data_ = head;
}
else
{
head_.data_ = head - size_;
}
return result;
}

bool try_get(T* val = nullptr) const
{
std::unique_lock<mutex_type> l(mtx_.data_);
if (is_empty())
{
return false;
}

if (val == nullptr)
{
return true;
}

std::size_t head = head_.data_;
*val = std::move(buffer_[head]);
if (++head < size_)
{
head_.data_ = head;
}
else
{
head_.data_ = head - size_;
}
return true;
}

bool set(T&& t)
{
std::unique_lock<mutex_type> l(mtx_.data_);
if (closed_ || is_full())
{
return false;
}

std::size_t tail = tail_.data_;
buffer_[tail] = std::move(t);
if (++tail < size_)
{
tail_.data_ = tail;
}
else
{
tail_.data_ = tail - size_;
}
return true;
}

std::size_t close()
{
std::unique_lock<mutex_type> l(mtx_.data_);
return close(l);
}

protected:
std::size_t close(std::unique_lock<mutex_type>& l)
{
HPX_ASSERT_OWNS_LOCK(l);

if (closed_)
{
l.unlock();
HPX_THROW_EXCEPTION(hpx::invalid_status,
"hpx::lcos::local::channel_mpmc::close",
"attempting to close an already closed channel");
}

closed_ = true;
return 0;
}

private:
// keep the mutex, the head, and the tail pointer in separate cache
// lines
mutable hpx::util::cache_aligned_data<mutex_type> mtx_;
mutable hpx::util::cache_aligned_data<std::size_t> head_;
hpx::util::cache_aligned_data<std::size_t> tail_;

// a channel of size n can buffer n-1 items
std::size_t const size_;

// channel buffer
std::unique_ptr<T[]> const buffer_;

bool closed_;
};
}}} // namespace hpx::lcos::local

#endif
3 changes: 3 additions & 0 deletions tests/performance/local/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ set(boost_library_dependencies ${Boost_LIBRARIES})
set(benchmarks
agas_cache_timings
async_overheads
channel_mpmc_throughput
delay_baseline
delay_baseline_threaded
hpx_homogeneous_timed_task_spawn_executors
Expand Down Expand Up @@ -144,6 +145,8 @@ set(benchmarks ${benchmarks}
partitioned_vector_foreach
)

set(channel_mpmc_throughput_FLAGS DEPENDENCIES
hpx_assertion hpx_concurrency hpx_errors hpx_timing hpx_thread_support)
set(foreach_scaling_FLAGS DEPENDENCIES iostreams_component hpx_timing)
set(function_object_wrapper_overhead_FLAGS DEPENDENCIES hpx_timing)
set(hpx_tls_overhead_FLAGS DEPENDENCIES hpx_timing)
Expand Down
85 changes: 85 additions & 0 deletions tests/performance/local/channel_mpmc_throughput.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
// Copyright (c) 2019 Hartmut Kaiser
//
// SPDX-License-Identifier: BSL-1.0
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)

// This work is inspired by https://github.com/aprell/tasking-2.0

#include <hpx/hpx.hpp>
#include <hpx/hpx_main.hpp>

#include <hpx/assertion.hpp>
#include <hpx/lcos/local/channel_mpmc.hpp>
#include <hpx/timing.hpp>

#include <cstddef>
#include <cstdint>

struct data
{
data() = default;

data(int d)
{
data_[0] = d;
}

int data_[8];
};

constexpr int NUM_TESTS = 10000000;

// Produce
double thread_func_0(hpx::lcos::local::channel_mpmc<data>& c)
{
std::uint64_t start = hpx::util::high_resolution_clock::now();

for (int i = 0; i != NUM_TESTS; ++i)
{
data d{i};
while (!c.set(std::move(d)))
{
hpx::this_thread::yield();
}
}

std::uint64_t end = hpx::util::high_resolution_clock::now();

return (end - start) / 1e9;
}

// Consume
double thread_func_1(hpx::lcos::local::channel_mpmc<data>& c)
{
std::uint64_t start = hpx::util::high_resolution_clock::now();

for (int i = 0; i != NUM_TESTS; ++i)
{
data d;
while (!c.try_get(&d))
{
hpx::this_thread::yield();
}
HPX_ASSERT(d.data_[0] == i);
}

std::uint64_t end = hpx::util::high_resolution_clock::now();

return (end - start) / 1e9;
}

int main(int argc, char* argv[])
{
hpx::lcos::local::channel_mpmc<data> c(100);

hpx::future<double> producer = hpx::async(thread_func_0, std::ref(c));
hpx::future<double> consumer = hpx::async(thread_func_1, std::ref(c));

std::cout << "Producer throughput: " << (NUM_TESTS / producer.get())
<< "\n";
std::cout << "Consumer throughput: " << (NUM_TESTS / consumer.get())
<< "\n";

return 0;
}

0 comments on commit 8fd3209

Please sign in to comment.