hpx::lcos::queue exhibits strange behavior #1631

Closed

TheConstructor

I am trying to use hpx::lcos::queue to distribute values between asynchronous jobs. During compilation I got errors like this:

hpx/hpx/runtime/actions/component_action.hpp:65:18: error: rvalue reference to type 'particle<[...]>' cannot bind to lvalue of type 'particle<[...]>'
                (std::forward<Ts>(vs)...);

With the help of my mentor, we put together the attached fix for these problems.
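For readers unfamiliar with this diagnostic, here is a minimal sketch of the general class of error, independent of HPX. The names `particle`, `consume`, and `call_forwarded` are hypothetical, and the actual cause inside component_action.hpp may well differ; the sketch only shows how a perfectly forwarded lvalue ends up rejected by a parameter of rvalue-reference type.

```cpp
#include <cassert>
#include <string>
#include <utility>

// Hypothetical stand-in for the user's particle<...> type.
struct particle {
    std::string name;
};

// Hypothetical callee that accepts rvalues only.
std::string consume(particle&& p) {
    return std::move(p.name);
}

// Forwarding wrapper: std::forward<Ts> preserves the value category of each
// argument. If the caller passes an lvalue, Ts is deduced as particle& and the
// forwarded expression is an lvalue, which cannot bind to particle&&.
template <typename... Ts>
std::string call_forwarded(Ts&&... vs) {
    return consume(std::forward<Ts>(vs)...);
}
```

With a named object `particle p`, `call_forwarded(p)` is rejected at compile time with this kind of message, whereas `call_forwarded(std::move(p))` or `call_forwarded(particle{"x"})` compiles.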

Sadly, hpx::lcos::queue still does not seem to do what I want. My usage is as follows:

#include <hpx/include/lcos.hpp>
#include <hpx/include/parallel_algorithm.hpp>
#include <boost/function.hpp>

template<typename Intermediate, typename Reduction>
struct reduce_helper : boost::function1<void, hpx::future<Intermediate>> {
private:
    boost::function2<Reduction, Reduction, Intermediate> m_reduce;
    mutable hpx::lcos::queue<Reduction> m_queue;

    friend class hpx::serialization::access;

    // When the class Archive corresponds to an output archive, the
    // & operator is defined similar to <<.  Likewise, when the class Archive
    // is a type of input archive the & operator is defined similar to >>.
    template<class Archive>
    void serialize(Archive &ar, const unsigned int version) {
        ar &m_reduce;
        ar &m_queue;
    }

public:

    reduce_helper(boost::function2<Reduction, Reduction, Intermediate> reduce,
                  hpx::lcos::queue<Reduction> queue) : m_reduce(reduce), m_queue(queue) {

    }


    void operator()(hpx::future<Intermediate> next) const {
        Intermediate nextValue = next.get();
        Reduction val = m_reduce(m_queue.get_value_sync(), nextValue);
        m_queue.set_value(val);
    }

};

boost::function1<Reduction, InputData> m_initial;
boost::function2<Reduction, Reduction, Intermediate> m_reduce;
std::vector<hpx::future<Intermediate>> futures = std::vector<hpx::future<Intermediate>>();

// populate futures with futures, assign m_initial and m_reduce with appropriate functors ;-)

hpx::lcos::queue<Reduction> queue = hpx::lcos::queue<Reduction>();
Reduction initial = m_initial(hpx::util::get<1>(left));
queue.set_value(initial);

hpx::lcos::wait_each(reduce_helper<Intermediate, Reduction>(m_reduce, queue), futures);

output_future.get();
output.set_value_sync(hpx::util::get<0>(left), queue.get_value_sync());
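The intended pattern (take the running value out of the queue, combine it with the next result, put it back) can be sketched in standard C++ without HPX. This is only an illustration under stated assumptions: `blocking_queue`, `fold_futures`, and `sum_of_squares` are hypothetical names, the queue is a simple mutex-based stand-in rather than hpx::lcos::queue, and the futures are consumed in order instead of as they become ready, which is a simplification compared to wait_each.

```cpp
#include <condition_variable>
#include <deque>
#include <functional>
#include <future>
#include <mutex>
#include <utility>
#include <vector>

// Hypothetical blocking queue; NOT the HPX type, just a local stand-in.
template <typename T>
class blocking_queue {
public:
    void set_value(T v) {
        {
            std::lock_guard<std::mutex> lock(m_);
            items_.push_back(std::move(v));
        }
        cv_.notify_one();
    }

    // Blocks until a value is available, then removes and returns it.
    T get_value_sync() {
        std::unique_lock<std::mutex> lock(m_);
        cv_.wait(lock, [this] { return !items_.empty(); });
        T v = std::move(items_.front());
        items_.pop_front();
        return v;
    }

private:
    std::mutex m_;
    std::condition_variable cv_;
    std::deque<T> items_;
};

// Folds the results of all futures into one value. The queue always holds
// exactly one element between steps: the current accumulator.
template <typename Reduction, typename Intermediate>
Reduction fold_futures(Reduction initial,
                       std::vector<std::future<Intermediate>>& futures,
                       std::function<Reduction(Reduction, Intermediate)> reduce) {
    blocking_queue<Reduction> queue;
    queue.set_value(std::move(initial));
    for (auto& f : futures) {
        Intermediate next = f.get();  // wait for this job's result
        queue.set_value(reduce(queue.get_value_sync(), std::move(next)));
    }
    return queue.get_value_sync();
}

// Small driver: computes 1*1 + 2*2 + ... + n*n using asynchronous jobs.
inline int sum_of_squares(int n) {
    std::vector<std::future<int>> futures;
    for (int i = 1; i <= n; ++i) {
        futures.push_back(std::async(std::launch::async, [i] { return i * i; }));
    }
    return fold_futures<int, int>(0, futures,
                                  [](int acc, int x) { return acc + x; });
}
```

Note that in this sketch the queue object is shared by reference within one function. In the snippet above, the queue is passed to reduce_helper by value, so whether the copy refers to the same underlying queue depends on the library's copy semantics.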

@hkaiser
Member

hkaiser commented Jun 25, 2015

@TheConstructor Please look at the fixing_queue branch; everything has been fixed there. I also added an example.

@hkaiser
Member

hkaiser commented Jun 25, 2015

It's also in PR #1627.

@hkaiser
Member

hkaiser commented Jun 25, 2015

@TheConstructor Can we close this now in light of #1627 working for you?

@hkaiser hkaiser added this to the 0.9.11 milestone Jun 25, 2015