Skip to content

Commit

Permalink
Merge pull request #151 from droccom/rma-assign
Browse files Browse the repository at this point in the history
[#144] RMA-based shad::transform

Fixes #144 
Fixes #151
  • Loading branch information
mminutoli committed Feb 7, 2019
2 parents 12d64ed + cdbc084 commit ace64bc
Show file tree
Hide file tree
Showing 2 changed files with 193 additions and 36 deletions.
180 changes: 161 additions & 19 deletions include/shad/core/impl/modifyng_sequence_ops.h
Expand Up @@ -26,6 +26,7 @@
#define INCLUDE_SHAD_CORE_IMPL_MODIFYING_SEQUENCE_OPS_H

#include <algorithm>
#include <cstring>
#include <functional>
#include <iterator>
#include <tuple>
Expand Down Expand Up @@ -84,12 +85,159 @@ void fill(distributed_sequential_tag&& policy, ForwardIt first, ForwardIt last,
value);
}

namespace transform_impl {
template <typename ForwardIt>
struct gen_args_t {
static constexpr size_t buf_size =
(2 << 10) / sizeof(typename ForwardIt::value_type);
typename ForwardIt::value_type buf[buf_size];
ForwardIt w_first;
size_t size;
};

template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
ForwardIt2 block_contiguous_kernel(rt::Locality l, ForwardIt1 first,
ForwardIt1 last, ForwardIt2 d_first,
UnaryOperation op) {
using itr_traits1 = std::iterator_traits<ForwardIt1>;
using itr_traits2 = distributed_iterator_traits<ForwardIt2>;
using args_t = gen_args_t<ForwardIt2>;
auto size = std::distance(first, last);
auto d_last = d_first;
std::advance(d_last, size);

// local assign
if (rt::thisLocality() == l) {
auto local_d_range = itr_traits2::local_range(d_first, d_last);
auto loc_res = std::transform(first, last, local_d_range.begin(), op);
return itr_traits2::iterator_from_local(d_first, d_last, loc_res - 1) + 1;
}

// remote assign
std::shared_ptr<uint8_t> args_buf(new uint8_t[sizeof(args_t)],
std::default_delete<uint8_t[]>());
auto typed_args_buf = reinterpret_cast<args_t*>(args_buf.get());
auto block_last = first;
rt::Handle h;
while (first != last) {
typed_args_buf->w_first = d_first;
typed_args_buf->size =
std::min(args_t::buf_size, (size_t)std::distance(first, last));
std::advance(block_last, typed_args_buf->size);
std::transform(first, block_last, typed_args_buf->buf, op);
rt::asyncExecuteAt(
h, l,
[](rt::Handle&, const uint8_t* args_buf, const uint32_t) {
const args_t& args = *reinterpret_cast<const args_t*>(args_buf);
using val_t = typename ForwardIt2::value_type;
ForwardIt2 w_last = args.w_first;
std::advance(w_last, args.size);
auto w_range = itr_traits2::local_range(args.w_first, w_last);
std::memcpy(w_range.begin(), args.buf, sizeof(val_t) * args.size);
},
args_buf, sizeof(args_t));
std::advance(first, typed_args_buf->size);
std::advance(d_first, typed_args_buf->size);
}
rt::waitForCompletion(h);
return d_last; // todo double check
}

// distributed-sequential kernel for non-block-contiguous output-iterators
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
void dseq_kernel(std::false_type, ForwardIt1 first, ForwardIt1 last,
ForwardIt2 d_first, ForwardIt2* res_ptr, UnaryOperation op) {
using itr_traits1 = distributed_iterator_traits<ForwardIt1>;
auto local_range = itr_traits1::local_range(first, last);
auto begin = local_range.begin();
auto end = local_range.end();
*res_ptr = std::transform(begin, end, d_first, op);
}

// distributed-sequential kernel for block-contiguous output-iterators
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
void dseq_kernel(std::true_type, ForwardIt1 first, ForwardIt1 last,
ForwardIt2 d_first, ForwardIt2* res_ptr, UnaryOperation op) {
using itr_traits1 = distributed_iterator_traits<ForwardIt1>;
using itr_traits2 = distributed_random_access_iterator_trait<ForwardIt2>;
auto loc_range = itr_traits1::local_range(first, last);
auto loc_first = loc_range.begin();
auto d_last = d_first;
std::advance(d_last, std::distance(loc_first, loc_range.end()));
auto dmap = itr_traits2::distribution(d_first, d_last);
auto loc_last = loc_first;
for (auto i : dmap) {
auto l = i.first;
std::advance(loc_last, i.second);
d_last = transform_impl::block_contiguous_kernel(l, loc_first, loc_last,
d_first, op);
std::advance(loc_first, i.second);
std::advance(d_first, i.second);
}
*res_ptr = d_last;
}

// distributed-parallel kernel for non-block-contiguous output-iterators
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
void dpar_kernel(std::false_type, ForwardIt1 first, ForwardIt1 last,
ForwardIt2 d_first, ForwardIt2* res_ptr, UnaryOperation op) {
using itr_traits1 = distributed_iterator_traits<ForwardIt1>;
auto local_range = itr_traits1::local_range(first, last);
auto begin = local_range.begin();
auto end = local_range.end();
auto it = itr_traits1::iterator_from_local(first, last, begin);
*res_ptr = std::transform(begin, end, d_first, op);
}

// distributed-parallel kernel for block-contiguous output-iterators
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
void dpar_kernel(std::true_type, ForwardIt1 first, ForwardIt1 last,
ForwardIt2 d_first, ForwardIt2* res_ptr, UnaryOperation op) {
using itr_traits1 = distributed_iterator_traits<ForwardIt1>;
using itr_traits2 = distributed_random_access_iterator_trait<ForwardIt2>;
auto loc_range = itr_traits1::local_range(first, last);
auto loc_first = loc_range.begin();
auto first_ = itr_traits1::iterator_from_local(first, last, loc_first);
std::advance(d_first, std::distance(first, first_));
auto d_last = d_first;
std::advance(d_last, std::distance(loc_first, loc_range.end()));
auto dmap = itr_traits2::distribution(d_first, d_last);
auto loc_last = loc_first;
for (auto i : dmap) {
auto l = i.first;
std::advance(loc_last, i.second);
d_last = transform_impl::block_contiguous_kernel(l, loc_first, loc_last,
d_first, op);
std::advance(loc_first, i.second);
std::advance(d_first, i.second);
}
*res_ptr = d_last;
}

// dispatchers
template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
void dseq_kernel(ForwardIt1 first, ForwardIt1 last, ForwardIt2 d_first,
ForwardIt2* res_ptr, UnaryOperation op) {
dseq_kernel(is_block_contiguous<ForwardIt2>::value, first, last, d_first,
res_ptr, op);
}

template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
void dpar_kernel(ForwardIt1 first, ForwardIt1 last, ForwardIt2 d_first,
ForwardIt2* res_ptr, UnaryOperation op) {
dpar_kernel(is_block_contiguous<ForwardIt2>::value, first, last, d_first,
res_ptr, op);
}

} // namespace transform_impl

template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
ForwardIt2 transform(distributed_parallel_tag&& policy, ForwardIt1 first1,
ForwardIt1 last1, ForwardIt2 d_first,
UnaryOperation unary_op) {
using itr_traits = distributed_iterator_traits<ForwardIt1>;
auto localities = itr_traits::localities(first1, last1);
using itr_traits1 = distributed_iterator_traits<ForwardIt1>;
using itr_traits2 = distributed_random_access_iterator_trait<ForwardIt2>;
auto localities = itr_traits1::localities(first1, last1);
std::vector<ForwardIt2> res(localities.size(), d_first);
auto res_it = res.begin();
rt::Handle h;
Expand All @@ -101,16 +249,11 @@ ForwardIt2 transform(distributed_parallel_tag&& policy, ForwardIt1 first1,
const std::tuple<ForwardIt1, ForwardIt1, ForwardIt2, UnaryOperation>&
args,
ForwardIt2* res_ptr) {
auto gbegin = std::get<0>(args);
auto gend = std::get<1>(args);
auto local_range = itr_traits::local_range(gbegin, gend);
auto begin = local_range.begin();
auto end = local_range.end();
auto it = itr_traits::iterator_from_local(gbegin, gend, begin);
auto d_first_ = std::get<2>(args);
advance_output_iterator(d_first_, gbegin, it);
auto first = std::get<0>(args);
auto last = std::get<1>(args);
auto d_first = std::get<2>(args);
auto op = std::get<3>(args);
*res_ptr = std::transform(begin, end, d_first_, op);
transform_impl::dpar_kernel(first, last, d_first, res_ptr, op);
flush_iterator(*res_ptr);
},
std::make_tuple(first1, last1, d_first, unary_op), &(*res_it));
Expand All @@ -123,8 +266,9 @@ template <class ForwardIt1, class ForwardIt2, class UnaryOperation>
ForwardIt2 transform(distributed_sequential_tag&& policy, ForwardIt1 first1,
ForwardIt1 last1, ForwardIt2 d_first,
UnaryOperation unary_op) {
using itr_traits = distributed_iterator_traits<ForwardIt1>;
auto localities = itr_traits::localities(first1, last1);
using itr_traits1 = distributed_iterator_traits<ForwardIt1>;
using itr_traits2 = distributed_random_access_iterator_trait<ForwardIt2>;
auto localities = itr_traits1::localities(first1, last1);
ForwardIt2 res = d_first;
for (auto locality = localities.begin(), end = localities.end();
locality != end; ++locality) {
Expand All @@ -133,13 +277,11 @@ ForwardIt2 transform(distributed_sequential_tag&& policy, ForwardIt1 first1,
[](const std::tuple<ForwardIt1, ForwardIt1, ForwardIt2, UnaryOperation>&
args,
ForwardIt2* res_ptr) {
auto d_first_ = std::get<2>(args);
auto first = std::get<0>(args);
auto last = std::get<1>(args);
auto d_first = std::get<2>(args);
auto op = std::get<3>(args);
auto local_range =
itr_traits::local_range(std::get<0>(args), std::get<1>(args));
auto begin = local_range.begin();
auto end = local_range.end();
*res_ptr = std::transform(begin, end, d_first_, op);
transform_impl::dseq_kernel(first, last, d_first, res_ptr, op);
flush_iterator(*res_ptr);
},
std::make_tuple(first1, last1, res, unary_op), &res);
Expand Down
49 changes: 32 additions & 17 deletions include/shad/core/iterator.h
Expand Up @@ -44,6 +44,7 @@ namespace shad {
template <typename Container>
class insert_iterator
: public std::iterator<std::output_iterator_tag, void, void, void, void> {
protected:
using Iterator = typename Container::iterator;
using internal_container_t = typename Container::internal_container_t;

Expand Down Expand Up @@ -79,7 +80,7 @@ class insert_iterator
insert_iterator& operator++() { return *this; }
insert_iterator& operator++(int) { return *this; }

private:
protected:
typename internal_container_t::ObjectID global_id_;
Iterator iterator_;
internal_container_t* local_container_ptr_ = nullptr;
Expand All @@ -97,21 +98,21 @@ class insert_iterator
///
/// @tparam Container The type of the distributed container.
template <typename Container>
class buffered_insert_iterator
: public std::iterator<std::output_iterator_tag, void, void, void, void> {
using Iterator = typename Container::iterator;
using internal_container_t = typename Container::internal_container_t;
class buffered_insert_iterator : public insert_iterator<Container> {
using base_t = insert_iterator<Container>;
using Iterator = typename base_t::Iterator;
using internal_container_t = typename base_t::internal_container_t;

public:
using value_type = typename Container::value_type;
using container_type = Container;
using value_type = typename base_t::value_type;
using container_type = typename base_t::container_type;

/// @brief Constructor.
///
/// @param container The container into which the iterator inserts.
/// @param iterator The position at which the iterator starts to insert.
buffered_insert_iterator(Container& container, Iterator iterator)
: global_id_(container.global_id()) {}
: base_t(container, iterator) {}

/// @brief The assignment operator.
///
Expand All @@ -122,21 +123,22 @@ class buffered_insert_iterator
///
/// @return A self reference.
buffered_insert_iterator& operator=(const value_type& value) {
if (!local_container_ptr_ || locality_ != rt::thisLocality()) {
locality_ = rt::thisLocality();
local_container_ptr_ = Container::from_global_id(global_id_);
if (!this->local_container_ptr_ || this->locality_ != rt::thisLocality()) {
this->locality_ = rt::thisLocality();
this->local_container_ptr_ = Container::from_global_id(this->global_id_);
rt::Handle h;
handle_ = h;
}
local_container_ptr_->buffered_async_insert(handle_, value);
this->local_container_ptr_->buffered_async_insert(handle_, value);
return *this;
}

/// @brief Flushes pending insertions to the container.
void flush() {
if (local_container_ptr_ != nullptr && locality_ == rt::thisLocality()) {
if (this->local_container_ptr_ != nullptr &&
this->locality_ == rt::thisLocality()) {
// if(!handle_.IsNull()) FIXME
local_container_ptr_->buffered_async_flush(handle_);
this->local_container_ptr_->buffered_async_flush(handle_);
}
}

Expand All @@ -145,12 +147,25 @@ class buffered_insert_iterator
buffered_insert_iterator& operator++(int) { return *this; }

private:
typename internal_container_t::ObjectID global_id_;
internal_container_t* local_container_ptr_ = nullptr;
rt::Locality locality_;
rt::Handle handle_;
};

// compile-time test for block-contiguous property
template <typename It>
struct is_block_contiguous {
static constexpr std::true_type value{};
};

template <typename Container>
struct is_block_contiguous<shad::insert_iterator<Container>> {
static constexpr std::false_type value{};
};

template <typename Container>
struct is_block_contiguous<shad::buffered_insert_iterator<Container>> {
static constexpr std::false_type value{};
};

} // namespace shad

#endif /* INCLUDE_SHAD_CORE_ITERATOR_H_ */

0 comments on commit ace64bc

Please sign in to comment.