Skip to content

Commit

Permalink
Some when_each/wait_each fixes. Use them in examples and tests instea…
Browse files Browse the repository at this point in the history
…d of the old wait where possible.
  • Loading branch information
K-ballo committed Jul 4, 2014
1 parent 78d2a33 commit fca92db
Show file tree
Hide file tree
Showing 30 changed files with 326 additions and 341 deletions.
41 changes: 21 additions & 20 deletions examples/1d_hydro/1d_hydro_upwind.cpp
Expand Up @@ -15,8 +15,9 @@
#include <hpx/runtime/actions/plain_action.hpp>
#include <hpx/runtime/components/plain_component_factory.hpp>
#include <hpx/include/async.hpp>
#include <hpx/lcos/future_wait.hpp>
#include <hpx/include/iostreams.hpp>
#include <hpx/lcos/future_wait.hpp>
#include <hpx/lcos/wait_each.hpp>

#include <boost/format.hpp>
#include <boost/math/constants/constants.hpp>
Expand All @@ -26,6 +27,7 @@ using hpx::naming::invalid_id;

using hpx::lcos::future;
using hpx::lcos::wait;
using hpx::lcos::wait_each;
using hpx::async;

using hpx::util::high_resolution_timer;
Expand Down Expand Up @@ -131,7 +133,7 @@ struct time_element{
, elapsed_time(0.0)
, computed(false),fluid_future(0),fluid(0)
{}

time_element(boost::uint64_t number_of_cells)
:fluid_future(number_of_cells)
,fluid(number_of_cells)
Expand All @@ -146,10 +148,10 @@ struct time_element{
,fluid_future(other.fluid_future)
,fluid(other.fluid)
{}

time_element& operator=(time_element const& rhs)
{
if (this != &rhs)
if (this != &rhs)
{
dt = rhs.dt;
elapsed_time = rhs.elapsed_time;
Expand All @@ -166,7 +168,7 @@ struct time_element{
double elapsed_time;
bool computed;
double physics_time;
std::vector<hpx::lcos::shared_future<cell> > fluid_future;//future for each cell
std::vector<hpx::lcos::shared_future<cell> > fluid_future;//future for each cell
std::vector<cell> fluid;
};
// declaring time_array
Expand All @@ -190,7 +192,7 @@ class One_Dimension_Grid
void addNewTimeStep();

private:

boost::uint64_t number_t_steps;
boost::uint64_t number_of_cells;
public:
Expand Down Expand Up @@ -268,12 +270,12 @@ double timestep_size(boost::uint64_t timestep)
for (boost::uint64_t i=0;i<nx;i++)
grid.time_array.at(timestep).fluid_future.push_back(async<compute_action>(here,timestep-n_predict,i));
}

double dt_cfl = 1000.0;
wait(grid.time_array.at(timestep).fluid_future, [&](std::size_t i, cell const& this_cell)

wait_each(grid.time_array.at(timestep).fluid_future,
hpx::util::unwrapped([&](cell const& this_cell)
{
// if (i == 0)
// cout << (boost::format("futures fulfilled for timestep %1%\n") % timestep) << flush;
// look at all of the cells at a timestep, then pick the smallest
// dt_cfl = cfl_factor*dx/(soundspeed+absolute_velocity)
double abs_velocity = this_cell.mom/this_cell.rho;
Expand All @@ -287,16 +289,15 @@ double timestep_size(boost::uint64_t timestep)
}
if (dt_cfl_here < dt_cfl)
dt_cfl = dt_cfl_here;
});
}));

// initialize dt_cfl to some arbitrary high value


// wait for an array of futures
/*wait(grid.time_array, [&](std::size_t i, cell const& this_cell)
/*wait_each(grid.time_array,
hpx::util::unwrapped([&](cell const& this_cell)
{
// if (i == 0)
// cout << (boost::format("futures fulfilled for timestep %1%\n") % timestep) << flush;
// look at all of the cells at a timestep, then pick the smallest
// dt_cfl = cfl_factor*dx/(soundspeed+absolute_velocity)
double abs_velocity = this_cell.mom/this_cell.rho;
Expand All @@ -310,7 +311,7 @@ double timestep_size(boost::uint64_t timestep)
}
if (dt_cfl_here < dt_cfl)
dt_cfl = dt_cfl_here;
});
}));
*/


Expand Down Expand Up @@ -487,7 +488,7 @@ cell compute(boost::uint64_t timestep, boost::uint64_t location)
grid.remove_bottom_time_step();
grid.addNewTimeStep();
}

return grid.time_array.at(timestep).fluid.at(location);
}

Expand Down Expand Up @@ -710,9 +711,9 @@ int hpx_main(
std::cout << (boost::format(fmt) % t.elapsed());
char const* fmt0 = "code elapsed time: %1%\n";
double t_code_time= grid.time_array[0].elapsed_time;

t_code_time+=grid.time_array[nt-1].elapsed_time;

std::cout << (boost::format(fmt0) % t_code_time);


Expand Down Expand Up @@ -743,7 +744,7 @@ int main(
( "npredict-value"
, value<boost::uint64_t>()->default_value(10)
, "prediction parameter of the wave equation")

( "ptime-value"
, value<double>()->default_value(2.50)
, "Physics time to run the simulation to")
Expand Down
76 changes: 38 additions & 38 deletions examples/diskperf/diskperf_ofs_pxfs_action.cpp
Expand Up @@ -76,7 +76,7 @@ struct RESULT {
ar & user;
ar & sys;
}
};
};

struct ofs_test_info_type
{
Expand Down Expand Up @@ -109,12 +109,12 @@ struct promise_rt_ptr_type

promise_rt_ptr_type() {}

promise_rt_ptr_type(std::string name,
int_promise_type* pp, rt_type* rr)
promise_rt_ptr_type(std::string name,
int_promise_type* pp, rt_type* rr)
: thread_name_(name), p_p_(pp), rt_p_(rr) {}

void set(std::string name,
int_promise_type* pp, rt_type* rr)
void set(std::string name,
int_promise_type* pp, rt_type* rr)
{
thread_name_ = name;
p_p_ = pp;
Expand Down Expand Up @@ -209,28 +209,28 @@ RESULT write_files_test(ofs_test_info_type ofs_test_info, int proc)
continue;
}

for (int j = 0; j < count; j++)
for (int j = 0; j < count; j++)
{
std::ostringstream tmp_sstream;
tmp_sstream << "pxfs_pwrite_loc" << hpx::get_locality_id()
tmp_sstream << "pxfs_pwrite_loc" << hpx::get_locality_id()
<< "_" << proc << "_" << i << "_" << j;
promise_rt_ptr_array[i*count + j].set(
tmp_sstream.str(),
&int_promise_array[i*count + j],
tmp_sstream.str(),
&int_promise_array[i*count + j],
hpx::get_runtime_ptr());

pxfs_pwrite(fd_array[i], buf.get(), bufsiz, j*bufsiz,
&num_written_array[i*count + j],
set_promise_cb,
pxfs_pwrite(fd_array[i], buf.get(), bufsiz, j*bufsiz,
&num_written_array[i*count + j],
set_promise_cb,
&promise_rt_ptr_array[i*count + j]);

futures.push_back(int_promise_array[i*count + j].get_future());
}

}

hpx::wait(futures,
[&](std::size_t idx, int rt)
hpx::lcos::wait_each(futures,
hpx::util::unwrapped([&](std::size_t idx, int rt)
{
if (rt != 0)
{
Expand All @@ -241,10 +241,10 @@ RESULT write_files_test(ofs_test_info_type ofs_test_info, int proc)

if(num_written_array[idx] != bufsiz)
{
hpx::cerr<<"loc " << hpx::get_locality_id() << " proc " << proc << ": error! not writing all bytes of " << idx%count <<"th block of "
hpx::cerr<<"loc " << hpx::get_locality_id() << " proc " << proc << ": error! not writing all bytes of " << idx%count <<"th block of "
<< idx/count << "th file."<<hpx::endl;
}
});
}));

end = times(&t2);
r.real = ((double) end - (double) start) / (double) sysconf(_SC_CLK_TCK);
Expand Down Expand Up @@ -320,7 +320,7 @@ RESULT read_files_test(ofs_test_info_type ofs_test_info, int proc)
// fd_array[i] = pvfs_open(filename, oflags);

if(fd_array[i] == -1)
{
{
hpx::cerr<<"Unable to open orangeFS file "<<filename<<hpx::endl;
continue;
}
Expand All @@ -329,25 +329,25 @@ RESULT read_files_test(ofs_test_info_type ofs_test_info, int proc)

{
std::ostringstream tmp_sstream;
tmp_sstream << "pxfs_pwrite_loc" << hpx::get_locality_id()
tmp_sstream << "pxfs_pwrite_loc" << hpx::get_locality_id()
<< "_" << proc << "_" << i << "_" << j;
promise_rt_ptr_array[i*count + j].set(
tmp_sstream.str(),
&int_promise_array[i*count + j],
tmp_sstream.str(),
&int_promise_array[i*count + j],
hpx::get_runtime_ptr());

pxfs_pread(fd_array[i], &buf_array[j*bufsiz],
bufsiz, j*bufsiz,
&num_read_array[i*count + j],
set_promise_cb,
pxfs_pread(fd_array[i], &buf_array[j*bufsiz],
bufsiz, j*bufsiz,
&num_read_array[i*count + j],
set_promise_cb,
&promise_rt_ptr_array[i*count + j]);

futures.push_back(int_promise_array[i*count + j].get_future());
}
}

hpx::wait(futures,
[&](std::size_t idx, int rt)
hpx::lcos::wait_each(futures,
hpx::util::unwrapped([&](std::size_t idx, int rt)
{
if (rt != 0)
{
Expand All @@ -358,10 +358,10 @@ RESULT read_files_test(ofs_test_info_type ofs_test_info, int proc)

if(num_read_array[idx] != bufsiz)
{
hpx::cerr<<"loc " << hpx::get_locality_id() << " proc " << proc << ": error! not reading all bytes of " << idx%count <<"th block of "
hpx::cerr<<"loc " << hpx::get_locality_id() << " proc " << proc << ": error! not reading all bytes of " << idx%count <<"th block of "
<< idx/count << "th file."<<hpx::endl;
}
});
}));

end = times(&t2);
r.real = ((double) end - (double) start) / (double) sysconf(_SC_CLK_TCK);
Expand Down Expand Up @@ -487,14 +487,14 @@ int hpx_main(variables_map& vm)
}
}

hpx::lcos::wait(futures,
[&](std::size_t ii, RESULT r) {
hpx::lcos::wait_each(futures,
hpx::util::unwrapped([&](std::size_t ii, RESULT r) {
r_ptr[ii].real = r.real;
r_ptr[ii].user = r.user;
r_ptr[ii].sys = r.sys;
});
}));

// overall performance
// overall performance
double tt = t.elapsed();

if(rfiles > 0)
Expand Down Expand Up @@ -530,20 +530,20 @@ int hpx_main(variables_map& vm)

if(rfiles > 0)
{
hpx::cout<<(boost::format("Aggregate Reading Throughput: %1% [MB/s]\n") %
hpx::cout<<(boost::format("Aggregate Reading Throughput: %1% [MB/s]\n") %
(localities.size() * procs * rfiles * count * bufsiz / tt / (1024*1024)));
hpx::cout<<(boost::format("\t Max Throughput per thread: %1% [MB/s]\n") %
hpx::cout<<(boost::format("\t Max Throughput per thread: %1% [MB/s]\n") %
(rfiles * count * bufsiz / min_time / (1024*1024)));
hpx::cout<<(boost::format("\t Min Throughput per thread: %1% [MB/s]\n") %
hpx::cout<<(boost::format("\t Min Throughput per thread: %1% [MB/s]\n") %
(rfiles * count * bufsiz / max_time / (1024*1024)));
}
else
{
hpx::cout<<(boost::format("Aggregate Writing Throughput: %1% [MB/s]\n") %
hpx::cout<<(boost::format("Aggregate Writing Throughput: %1% [MB/s]\n") %
(localities.size() * procs * wfiles * count * bufsiz / tt / (1024*1024)));
hpx::cout<<(boost::format("\t Max Throughput per thread: %1% [MB/s]\n") %
hpx::cout<<(boost::format("\t Max Throughput per thread: %1% [MB/s]\n") %
(wfiles * count * bufsiz / min_time / (1024*1024)));
hpx::cout<<(boost::format("\t Min Throughput per thread: %1% [MB/s]\n") %
hpx::cout<<(boost::format("\t Min Throughput per thread: %1% [MB/s]\n") %
(wfiles * count * bufsiz / max_time / (1024*1024)));

}
Expand Down
1 change: 1 addition & 0 deletions examples/jacobi/jacobi_component/server/solver.hpp
Expand Up @@ -13,6 +13,7 @@

#include <hpx/include/components.hpp>
#include <hpx/components/distributing_factory/distributing_factory.hpp>
#include <hpx/lcos/future_wait.hpp>
#include <hpx/util/high_resolution_timer.hpp>

#include <vector>
Expand Down
2 changes: 1 addition & 1 deletion examples/quickstart/1d_wave_equation.cpp
Expand Up @@ -92,7 +92,7 @@ struct data{
u_value = other.u_value;
computed = other.computed;
return *this;
}
}

hpx::lcos::local::mutex mtx;
double u_value;
Expand Down
6 changes: 3 additions & 3 deletions examples/quickstart/hello_world.cpp
Expand Up @@ -108,14 +108,14 @@ void hello_world_foreman()
// return value of the future. hpx::lcos::wait doesn't return until
// all the futures in the vector have returned.
hpx::lcos::local::spinlock mtx;
hpx::lcos::wait(futures,
[&](std::size_t, std::size_t t) {
hpx::lcos::wait_each(futures,
hpx::util::unwrapped([&](std::size_t t) {
if (std::size_t(-1) != t)
{
hpx::lcos::local::spinlock::scoped_lock lk(mtx);
attendance.erase(t);
}
});
}));
}
}
//]
Expand Down
4 changes: 0 additions & 4 deletions examples/quickstart/non_atomic_rma.cpp
Expand Up @@ -15,7 +15,6 @@
#include <hpx/hpx_init.hpp>
#include <hpx/runtime/actions/plain_action.hpp>
#include <hpx/runtime/components/plain_component_factory.hpp>
#include <hpx/lcos/future_wait.hpp>
#include <hpx/util/locking_helpers.hpp>

//Boost includes
Expand Down Expand Up @@ -122,9 +121,6 @@ int hpx_main(po::variables_map &vm)
future_update.push_back(hpx::async<update_action>(that_prefix,tmp));
}

//for (int i=0;i<N;i++) {
// future_update[i].get();
//}
hpx::wait_all(future_update);

for (int i=0;i<array_length;i++) {
Expand Down
1 change: 0 additions & 1 deletion examples/quickstart/quicksort.cpp
Expand Up @@ -9,7 +9,6 @@
#include <hpx/runtime/actions/plain_action.hpp>
#include <hpx/runtime/components/plain_component_factory.hpp>
#include <hpx/util/high_resolution_timer.hpp>
#include <hpx/lcos/future_wait.hpp>
#include <hpx/include/async.hpp>

#include <algorithm>
Expand Down
1 change: 0 additions & 1 deletion examples/sheneos/sheneos/interpolator.cpp
Expand Up @@ -10,7 +10,6 @@
#include <hpx/components/distributing_factory/distributing_factory.hpp>

#include <hpx/include/async.hpp>
#include <hpx/lcos/future_wait.hpp>
#include <hpx/lcos/local/packaged_task.hpp>
#include <hpx/util/assert.hpp>

Expand Down

0 comments on commit fca92db

Please sign in to comment.