diff --git a/include/shad/core/impl/modifyng_sequence_ops.h b/include/shad/core/impl/modifyng_sequence_ops.h index 7a2a22da..428bf4b8 100755 --- a/include/shad/core/impl/modifyng_sequence_ops.h +++ b/include/shad/core/impl/modifyng_sequence_ops.h @@ -26,6 +26,7 @@ #define INCLUDE_SHAD_CORE_IMPL_MODIFYING_SEQUENCE_OPS_H #include +#include #include #include #include @@ -84,12 +85,159 @@ void fill(distributed_sequential_tag&& policy, ForwardIt first, ForwardIt last, value); } +namespace transform_impl { +template +struct gen_args_t { + static constexpr size_t buf_size = + (2 << 10) / sizeof(typename ForwardIt::value_type); + typename ForwardIt::value_type buf[buf_size]; + ForwardIt w_first; + size_t size; +}; + +template +ForwardIt2 block_contiguous_kernel(rt::Locality l, ForwardIt1 first, + ForwardIt1 last, ForwardIt2 d_first, + UnaryOperation op) { + using itr_traits1 = std::iterator_traits; + using itr_traits2 = distributed_iterator_traits; + using args_t = gen_args_t; + auto size = std::distance(first, last); + auto d_last = d_first; + std::advance(d_last, size); + + // local assign + if (rt::thisLocality() == l) { + auto local_d_range = itr_traits2::local_range(d_first, d_last); + auto loc_res = std::transform(first, last, local_d_range.begin(), op); + return itr_traits2::iterator_from_local(d_first, d_last, loc_res - 1) + 1; + } + + // remote assign + std::shared_ptr args_buf(new uint8_t[sizeof(args_t)], + std::default_delete()); + auto typed_args_buf = reinterpret_cast(args_buf.get()); + auto block_last = first; + rt::Handle h; + while (first != last) { + typed_args_buf->w_first = d_first; + typed_args_buf->size = + std::min(args_t::buf_size, (size_t)std::distance(first, last)); + std::advance(block_last, typed_args_buf->size); + std::transform(first, block_last, typed_args_buf->buf, op); + rt::asyncExecuteAt( + h, l, + [](rt::Handle&, const uint8_t* args_buf, const uint32_t) { + const args_t& args = *reinterpret_cast(args_buf); + using val_t = typename ForwardIt2::value_type; + ForwardIt2 w_last = args.w_first; + std::advance(w_last, args.size); + auto w_range = itr_traits2::local_range(args.w_first, w_last); + std::memcpy(w_range.begin(), args.buf, sizeof(val_t) * args.size); + }, + args_buf, sizeof(args_t)); + std::advance(first, typed_args_buf->size); + std::advance(d_first, typed_args_buf->size); + } + rt::waitForCompletion(h); + return d_last; // todo double check +} + +// distributed-sequential kernel for non-block-contiguous output-iterators +template +void dseq_kernel(std::false_type, ForwardIt1 first, ForwardIt1 last, + ForwardIt2 d_first, ForwardIt2* res_ptr, UnaryOperation op) { + using itr_traits1 = distributed_iterator_traits; + auto local_range = itr_traits1::local_range(first, last); + auto begin = local_range.begin(); + auto end = local_range.end(); + *res_ptr = std::transform(begin, end, d_first, op); +} + +// distributed-sequential kernel for block-contiguous output-iterators +template +void dseq_kernel(std::true_type, ForwardIt1 first, ForwardIt1 last, + ForwardIt2 d_first, ForwardIt2* res_ptr, UnaryOperation op) { + using itr_traits1 = distributed_iterator_traits; + using itr_traits2 = distributed_random_access_iterator_trait; + auto loc_range = itr_traits1::local_range(first, last); + auto loc_first = loc_range.begin(); + auto d_last = d_first; + std::advance(d_last, std::distance(loc_first, loc_range.end())); + auto dmap = itr_traits2::distribution(d_first, d_last); + auto loc_last = loc_first; + for (auto i : dmap) { + auto l = i.first; + std::advance(loc_last, i.second); + d_last = transform_impl::block_contiguous_kernel(l, loc_first, loc_last, + d_first, op); + std::advance(loc_first, i.second); + std::advance(d_first, i.second); + } + *res_ptr = d_last; +} + +// distributed-parallel kernel for non-block-contiguous output-iterators +template +void dpar_kernel(std::false_type, ForwardIt1 first, ForwardIt1 last, + ForwardIt2 d_first, ForwardIt2* res_ptr, UnaryOperation op) { + using itr_traits1 = distributed_iterator_traits; + auto local_range = itr_traits1::local_range(first, last); + auto begin = local_range.begin(); + auto end = local_range.end(); + auto it = itr_traits1::iterator_from_local(first, last, begin); + *res_ptr = std::transform(begin, end, d_first, op); +} + +// distributed-parallel kernel for block-contiguous output-iterators +template +void dpar_kernel(std::true_type, ForwardIt1 first, ForwardIt1 last, + ForwardIt2 d_first, ForwardIt2* res_ptr, UnaryOperation op) { + using itr_traits1 = distributed_iterator_traits; + using itr_traits2 = distributed_random_access_iterator_trait; + auto loc_range = itr_traits1::local_range(first, last); + auto loc_first = loc_range.begin(); + auto first_ = itr_traits1::iterator_from_local(first, last, loc_first); + std::advance(d_first, std::distance(first, first_)); + auto d_last = d_first; + std::advance(d_last, std::distance(loc_first, loc_range.end())); + auto dmap = itr_traits2::distribution(d_first, d_last); + auto loc_last = loc_first; + for (auto i : dmap) { + auto l = i.first; + std::advance(loc_last, i.second); + d_last = transform_impl::block_contiguous_kernel(l, loc_first, loc_last, + d_first, op); + std::advance(loc_first, i.second); + std::advance(d_first, i.second); + } + *res_ptr = d_last; +} + +// dispatchers +template +void dseq_kernel(ForwardIt1 first, ForwardIt1 last, ForwardIt2 d_first, + ForwardIt2* res_ptr, UnaryOperation op) { + dseq_kernel(is_block_contiguous::value, first, last, d_first, + res_ptr, op); +} + +template +void dpar_kernel(ForwardIt1 first, ForwardIt1 last, ForwardIt2 d_first, + ForwardIt2* res_ptr, UnaryOperation op) { + dpar_kernel(is_block_contiguous::value, first, last, d_first, + res_ptr, op); +} + +} // namespace transform_impl + template ForwardIt2 transform(distributed_parallel_tag&& policy, ForwardIt1 first1, ForwardIt1 last1, ForwardIt2 d_first, UnaryOperation unary_op) { - using itr_traits = distributed_iterator_traits; - auto localities = itr_traits::localities(first1, last1); + using itr_traits1 = distributed_iterator_traits; + using itr_traits2 = distributed_random_access_iterator_trait; + auto localities = itr_traits1::localities(first1, last1); std::vector res(localities.size(), d_first); auto res_it = res.begin(); rt::Handle h; @@ -101,16 +249,11 @@ ForwardIt2 transform(distributed_parallel_tag&& policy, ForwardIt1 first1, const std::tuple& args, ForwardIt2* res_ptr) { - auto gbegin = std::get<0>(args); - auto gend = std::get<1>(args); - auto local_range = itr_traits::local_range(gbegin, gend); - auto begin = local_range.begin(); - auto end = local_range.end(); - auto it = itr_traits::iterator_from_local(gbegin, gend, begin); - auto d_first_ = std::get<2>(args); - advance_output_iterator(d_first_, gbegin, it); + auto first = std::get<0>(args); + auto last = std::get<1>(args); + auto d_first = std::get<2>(args); auto op = std::get<3>(args); - *res_ptr = std::transform(begin, end, d_first_, op); + transform_impl::dpar_kernel(first, last, d_first, res_ptr, op); flush_iterator(*res_ptr); }, std::make_tuple(first1, last1, d_first, unary_op), &(*res_it)); @@ -123,8 +266,9 @@ template ForwardIt2 transform(distributed_sequential_tag&& policy, ForwardIt1 first1, ForwardIt1 last1, ForwardIt2 d_first, UnaryOperation unary_op) { - using itr_traits = distributed_iterator_traits; - auto localities = itr_traits::localities(first1, last1); + using itr_traits1 = distributed_iterator_traits; + using itr_traits2 = distributed_random_access_iterator_trait; + auto localities = itr_traits1::localities(first1, last1); ForwardIt2 res = d_first; for (auto locality = localities.begin(), end = localities.end(); locality != end; ++locality) { @@ -133,13 +277,11 @@ ForwardIt2 transform(distributed_sequential_tag&& policy, ForwardIt1 first1, [](const std::tuple& args, ForwardIt2* res_ptr) { - auto d_first_ = std::get<2>(args); + auto first = std::get<0>(args); + auto last = std::get<1>(args); + auto d_first = std::get<2>(args); auto op = std::get<3>(args); - auto local_range = - itr_traits::local_range(std::get<0>(args), std::get<1>(args)); - auto begin = local_range.begin(); - auto end = local_range.end(); - *res_ptr = std::transform(begin, end, d_first_, op); + transform_impl::dseq_kernel(first, last, d_first, res_ptr, op); flush_iterator(*res_ptr); }, std::make_tuple(first1, last1, res, unary_op), &res); diff --git a/include/shad/core/iterator.h b/include/shad/core/iterator.h index 76ffd39a..ef846e41 100644 --- a/include/shad/core/iterator.h +++ b/include/shad/core/iterator.h @@ -44,6 +44,7 @@ namespace shad { template class insert_iterator : public std::iterator { + protected: using Iterator = typename Container::iterator; using internal_container_t = typename Container::internal_container_t; @@ -79,7 +80,7 @@ class insert_iterator insert_iterator& operator++() { return *this; } insert_iterator& operator++(int) { return *this; } - private: + protected: typename internal_container_t::ObjectID global_id_; Iterator iterator_; internal_container_t* local_container_ptr_ = nullptr; @@ -97,21 +98,21 @@ class insert_iterator /// /// @tparam Container The type of the distributed container. template -class buffered_insert_iterator - : public std::iterator { - using Iterator = typename Container::iterator; - using internal_container_t = typename Container::internal_container_t; +class buffered_insert_iterator : public insert_iterator { + using base_t = insert_iterator; + using Iterator = typename base_t::Iterator; + using internal_container_t = typename base_t::internal_container_t; public: - using value_type = typename Container::value_type; - using container_type = Container; + using value_type = typename base_t::value_type; + using container_type = typename base_t::container_type; /// @brief Constructor. /// /// @param container The container into which the iterator inserts. /// @param iterator The position at which the iterator starts to insert. buffered_insert_iterator(Container& container, Iterator iterator) - : global_id_(container.global_id()) {} + : base_t(container, iterator) {} /// @brief The assignment operator. /// @@ -122,21 +123,22 @@ class buffered_insert_iterator /// /// @return A self reference. buffered_insert_iterator& operator=(const value_type& value) { - if (!local_container_ptr_ || locality_ != rt::thisLocality()) { - locality_ = rt::thisLocality(); - local_container_ptr_ = Container::from_global_id(global_id_); + if (!this->local_container_ptr_ || this->locality_ != rt::thisLocality()) { + this->locality_ = rt::thisLocality(); + this->local_container_ptr_ = Container::from_global_id(this->global_id_); rt::Handle h; handle_ = h; } - local_container_ptr_->buffered_async_insert(handle_, value); + this->local_container_ptr_->buffered_async_insert(handle_, value); return *this; } /// @brief Flushes pending insertions to the container. void flush() { - if (local_container_ptr_ != nullptr && locality_ == rt::thisLocality()) { + if (this->local_container_ptr_ != nullptr && + this->locality_ == rt::thisLocality()) { // if(!handle_.IsNull()) FIXME - local_container_ptr_->buffered_async_flush(handle_); + this->local_container_ptr_->buffered_async_flush(handle_); } } @@ -145,12 +147,25 @@ class buffered_insert_iterator buffered_insert_iterator& operator++(int) { return *this; } private: - typename internal_container_t::ObjectID global_id_; - internal_container_t* local_container_ptr_ = nullptr; - rt::Locality locality_; rt::Handle handle_; }; +// compile-time test for block-contiguous property +template +struct is_block_contiguous { + static constexpr std::true_type value{}; +}; + +template +struct is_block_contiguous> { + static constexpr std::false_type value{}; +}; + +template +struct is_block_contiguous> { + static constexpr std::false_type value{}; +}; + } // namespace shad #endif /* INCLUDE_SHAD_CORE_ITERATOR_H_ */