More tests adjustments #1109

Merged
4 commits merged on Apr 4, 2014
2 changes: 1 addition & 1 deletion tests/performance/network/network_storage/CMakeLists.txt
@@ -31,7 +31,7 @@ hpx_option(HPX_BENCHMARK_SCRIPTS_PATH PATH
if (WIN32)
# use path to build dir here, not install dir
file(TO_NATIVE_PATH "${HPX_BUILD_PREFIX}/bin/network_storage.exe" EXE_PATH)
file(TO_NATIVE_PATH "${BOOST_LIBRARY_DIR};${HPX_PREFIX}/lib/hpx" LIB_PATH)
file(TO_NATIVE_PATH "${BOOST_LIBRARY_DIR};${HPX_BUILD_PREFIX}/lib/hpx" LIB_PATH)

#message("EXE_PATH is ${EXE_PATH}, LIB_PATH=${LIB_PATH}")

11 changes: 7 additions & 4 deletions tests/performance/network/network_storage/copy_script.cmake
@@ -1,8 +1,11 @@
message ("source dir is " ${SCRIPT_SOURCE_DIR})
message ("dest dir is " ${SCRIPT_DEST_DIR})
FILE(TO_NATIVE_PATH "${SCRIPT_SOURCE_DIR}/${SCRIPT_NAME}.sh.in" INFILE)
FILE(TO_NATIVE_PATH "${SCRIPT_DEST_DIR}/${SCRIPT_NAME}.sh" OUTFILE)

STRING(REPLACE "\"" "" FILE1 "${INFILE}")
STRING(REPLACE "\"" "" FILE2 "${OUTFILE}")

configure_file(
${SCRIPT_SOURCE_DIR}/${SCRIPT_NAME}.sh.in
${SCRIPT_DEST_DIR}/${SCRIPT_NAME}.sh
"${FILE1}"
"${FILE2}"
@ONLY
)
@@ -18,13 +18,18 @@ rem cd /d d:\build\hpx
rem Get N-1 from argument
set /a N=%1-1

rem use "start /B" to suppress new window per task

echo "Starting %1 instances as part of HPX job"
FOR /l %%x IN (0, 1, %N%) DO (
echo start /B @EXE_PATH@ -l%1 -%%x -Ihpx.parcel.tcp.enable=1 -Ihpx.parcel.async_serialization=1 --hpx:threads=4 --localMB=512 --transferKB=1024 --iterations=5
start /B @EXE_PATH@ -l%1 -%%x -Ihpx.parcel.tcp.enable=1 -Ihpx.parcel.async_serialization=1 --hpx:threads=4 --localMB=512 --transferKB=1024 --iterations=5
)
rem
rem for a single process we can perform a range of tests
rem
rem FOR %%b IN (64 128 256 512 1024 2048 4096 8192 16384 32768 65536) DO (
rem FOR /l %%t IN (1, 1, 12) DO (
echo "Starting %1 instances as part of HPX job"
FOR /l %%x IN (0, 1, %N%) DO (
rem use "start /B" to suppress new window per task
start /B /WAIT D:\build\hpx\Debug\bin\network_storage.exe -l%1 -%%x -Ihpx.parcel.tcp.enable=1 -Ihpx.parcel.async_serialization=1 --hpx:threads=%%t --localMB=512 --transferKB=%%b --iterations=5
)
rem )
rem )

rem 2097152 65536
rem
37 changes: 32 additions & 5 deletions tests/performance/network/network_storage/network_storage.cpp
@@ -67,11 +67,17 @@
// them from the waiting list. The vars are used for this bookkeeping task.
//
#define MAX_RANKS 64

//#define USE_CLEANING_THREAD

std::vector<std::vector<hpx::future<int>>> ActiveFutures;
hpx::lcos::local::spinlock FuturesMutex;
boost::atomic<bool> FuturesActive;
boost::array<boost::atomic<int>, 64> FuturesWaiting;

#ifdef USE_CLEANING_THREAD
boost::atomic<bool> FuturesActive;
hpx::lcos::local::spinlock FuturesMutex;
#endif

//----------------------------------------------------------------------------
// Used at start and end of each loop for synchronization
hpx::lcos::barrier unique_barrier;
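As a reading aid for the block above: ActiveFutures holds one list of outstanding futures per destination rank, FuturesWaiting counts outstanding sends per rank, and the flag/lock pair now only exists when USE_CLEANING_THREAD is defined. A minimal sketch, assuming those globals, of how they might be initialised once the rank count is known (the helper name is hypothetical, not code from this PR):

// Hypothetical helper, for illustration only: prepare the bookkeeping state
// declared above once the number of ranks is known.
void init_bookkeeping_sketch(std::uint64_t nranks)
{
    ActiveFutures.resize(nranks);           // one list of pending sends per rank
    for (std::size_t i = 0; i < MAX_RANKS; ++i) {
        FuturesWaiting[i] = 0;              // no outstanding sends yet
    }
#ifdef USE_CLEANING_THREAD
    FuturesActive = false;                  // cleaning thread not running yet
#endif
}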
@@ -294,6 +300,7 @@ namespace Storage {
// normally these are in a header
HPX_DEFINE_PLAIN_ACTION(Storage::CopyToStorage, CopyToStorage_action);
HPX_REGISTER_PLAIN_ACTION_DECLARATION(CopyToStorage_action);
HPX_ACTION_INVOKE_NO_MORE_THAN(CopyToStorage_action, 5);

HPX_DEFINE_PLAIN_ACTION(Storage::CopyFromStorage, CopyFromStorage_action);
HPX_REGISTER_PLAIN_ACTION_DECLARATION(CopyFromStorage_action);
@@ -304,6 +311,7 @@ HPX_REGISTER_PLAIN_ACTION(CopyToStorage_action);
HPX_REGISTER_PLAIN_ACTION(CopyFromStorage_action);

//----------------------------------------------------------------------------
#ifdef USE_CLEANING_THREAD
// the main message sending loop may generate many thousands of send requests,
// each associated with a future. To reduce the number we must wait on, this
// loop runs in a background thread and simply removes any completed futures.
@@ -336,6 +344,7 @@ int RemoveCompletions()
}
return num_removed;
}
#endif
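The body of RemoveCompletions() is collapsed in this view. For orientation only, a rough sketch (not the PR's implementation) of what such a cleaning loop can look like, using only the declarations shown in this diff and assuming hpx::future::is_ready() is used to test for completion:

// Sketch only: while the send loop is active, drop any futures that have
// already completed so the per-rank lists do not grow without bound.
int CleaningLoopSketch()
{
    int num_removed = 0;
    while (FuturesActive) {
        hpx::lcos::local::spinlock::scoped_lock lk(FuturesMutex);
        for (std::size_t rank = 0; rank < ActiveFutures.size(); ++rank) {
            std::vector<hpx::future<int>>& list = ActiveFutures[rank];
            for (std::size_t f = 0; f < list.size(); ) {
                if (list[f].is_ready()) {          // result already available
                    list[f].get();                 // consume the pass/fail value
                    list.erase(list.begin() + f);  // forget the completed future
                    --FuturesWaiting[rank];
                    ++num_removed;
                }
                else {
                    ++f;                           // still pending, keep it
                }
            }
        }
        // a real implementation would yield here rather than spin
    }
    return num_removed;
}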

//----------------------------------------------------------------------------
// Take a vector of futures representing pass/fail and reduce it to a single pass/fail
@@ -391,16 +400,18 @@ void test_write(
);
//
hpx::util::high_resolution_timer timerWrite;
//

for(boost::uint64_t i = 0; i < options.iterations; i++) {
DEBUG_OUTPUT(1,
std::cout << "Starting iteration " << i << " on rank " << rank << std::endl;
);
#ifdef USE_CLEANING_THREAD
//
// start a thread which will clear any completed futures from our list.
//
FuturesActive = true;
hpx::future<int> cleaner = hpx::async(RemoveCompletions);
#endif
//
// Start main message sending loop
//
@@ -414,6 +425,9 @@
// pick a random slot to write our data into
int memory_slot = random_slot(gen);
uint32_t memory_offset = static_cast<uint32_t>(memory_slot*options.transfer_size_B);
DEBUG_OUTPUT(3,
std::cout << "Rank " << rank << " sending slot " << i << " to rank " << send_rank << std::endl;
);

// Execute a PUT on whatever locality we chose
// Create a serializable memory buffer ready for sending.
@@ -423,8 +437,10 @@
DEBUG_OUTPUT(5,
std::cout << "Put from rank " << rank << " on rank " << send_rank << std::endl;
);
#ifdef USE_CLEANING_THREAD
++FuturesWaiting[send_rank];
hpx::lcos::local::spinlock::scoped_lock lk(FuturesMutex);
#endif
ActiveFutures[send_rank].push_back(
hpx::async(actWrite, locality,
TransferBuffer(static_cast<char*>(buffer),
@@ -440,13 +456,15 @@
);
}
}
#ifdef USE_CLEANING_THREAD
// tell the cleaning thread it's time to stop
FuturesActive = false;
// wait for cleanup thread to terminate before we reduce any remaining futures
int removed = cleaner.get();
DEBUG_OUTPUT(2,
std::cout << "Cleaning thread removed " << removed << std::endl;
);
#endif
//
std::vector<hpx::future<int>> final_list;
for(uint64_t i = 0; i < nranks; i++) {
@@ -474,7 +492,7 @@ void test_write(
std::cout << "IOPs/s : " << IOPs_s << "\n";
std::cout << "Aggregate BW Write : " << writeBW << "MB/s" << std::endl;
// a complete set of results that our python matplotlib script will ingest
char const* msg = "CSVData, write, network, %1%, ranks, %2%, threads, %3%, Memory, %4%, IOPsize, %5%, IOPS/s, %6%, BW, %7%, ";
char const* msg = "CSVData, write, network, %1%, ranks, %2%, threads, %3%, Memory, %4%, IOPsize, %5%, IOPS/s, %6%, BW(MB/s), %7%, ";
std::cout << (boost::format(msg) % options.network % nranks % options.threads % writeMB % options.transfer_size_B
% IOPs_s % writeBW ) << std::endl;
}
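The loop that drains ActiveFutures into final_list is only partly visible above, and the reduction to a single pass/fail is collapsed entirely. A minimal sketch of that gather-and-reduce pattern, assuming the globals from this diff and that <algorithm> and <iterator> are available (the helper name and exact reduction are assumptions, not the PR's code):

// Sketch only: move any futures the cleaning thread did not remove into one
// flat list, then reduce their pass/fail results to a single value.
int gather_and_reduce_sketch(std::uint64_t nranks)
{
    std::vector<hpx::future<int>> final_list;
    for (std::uint64_t i = 0; i < nranks; i++) {
        std::move(ActiveFutures[i].begin(), ActiveFutures[i].end(),
            std::back_inserter(final_list));
        ActiveFutures[i].clear();
    }
    int all_ok = 1;
    for (hpx::future<int>& f : final_list) {
        if (f.get() == 0) all_ok = 0;   // one failed transfer fails the run
    }
    return all_ok;
}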
@@ -507,11 +525,13 @@ void test_read(
hpx::util::high_resolution_timer timerRead;
//
for(boost::uint64_t i = 0; i < options.iterations; i++) {
#ifdef USE_CLEANING_THREAD
//
// start a thread which will clear any completed futures from our list.
//
FuturesActive = true;
hpx::future<int> cleaner = hpx::async(RemoveCompletions);
#endif
//
// Start main message sending loop
//
@@ -532,8 +552,10 @@
// is performed directly into our user memory. This avoids the need
// to copy the data from a serialization buffer into our memory
{
#ifdef USE_CLEANING_THREAD
++FuturesWaiting[send_rank];
hpx::lcos::local::spinlock::scoped_lock lk(FuturesMutex);
#endif
ActiveFutures[send_rank].push_back(
hpx::async(
actRead, locality, memory_offset, options.transfer_size_B,
@@ -552,6 +574,7 @@
);
}
}
#ifdef USE_CLEANING_THREAD
// tell the cleaning thread it's time to stop
FuturesActive = false;
// wait for cleanup thread to terminate before we reduce any remaining
@@ -560,6 +583,7 @@
DEBUG_OUTPUT(2,
std::cout << "Cleaning thread removed " << removed << std::endl;
);
#endif
//
std::vector<hpx::future<int>> final_list;
for(uint64_t i = 0; i < nranks; i++) {
@@ -587,7 +611,7 @@ void test_read(
std::cout << "IOPs/s : " << IOPs_s << "\n";
std::cout << "Aggregate BW Read : " << readBW << "MB/s" << std::endl;
// a complete set of results that our python matplotlib script will ingest
char const* msg = "CSVData, read, network, %1%, ranks, %2%, threads, %3%, Memory, %4%, IOPsize, %5%, IOPS/s, %6%, BW, %7%, ";
char const* msg = "CSVData, read, network, %1%, ranks, %2%, threads, %3%, Memory, %4%, IOPsize, %5%, IOPS/s, %6%, BW(MB/s), %7%, ";
std::cout << (boost::format(msg) % options.network % nranks % options.threads % readMB % options.transfer_size_B
% IOPs_s % readBW ) << std::endl;
}
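Both CSV format strings above gain an explicit unit, BW(MB/s). For reference, a small standalone example (all values are made up, purely illustrative) of how such a boost::format string expands:

#include <boost/format.hpp>
#include <iostream>

int main()
{
    // Made-up values: network type, ranks, threads, memory per rank (MB),
    // IOP size (bytes), IOPs per second, bandwidth (MB/s).
    char const* msg = "CSVData, read, network, %1%, ranks, %2%, threads, %3%, "
        "Memory, %4%, IOPsize, %5%, IOPS/s, %6%, BW(MB/s), %7%, ";
    std::cout << (boost::format(msg)
        % "tcp" % 2 % 4 % 512 % 65536 % 12345.6 % 789.0) << std::endl;
    return 0;
}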
@@ -658,6 +682,9 @@ int hpx_main(boost::program_options::variables_map& vm)
allocate_local_storage(options.local_storage_MB*1024*1024);
//
uint64_t num_transfer_slots = 1024*1024*options.local_storage_MB / options.transfer_size_B;
DEBUG_OUTPUT(1,
std::cout << "num ranks " << nranks << ", num_transfer_slots " << num_transfer_slots << " on rank " << rank << std::endl;
);
//
std::random_device rd;
std::mt19937 gen(rd());
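The num_transfer_slots computation above carves the local storage into fixed-size slots that the random generator then indexes. A standalone sketch using the values from the Windows launch script earlier in this diff (512 MB of local storage, 1024 KB transfers), which yields 512 slots:

#include <cstdint>
#include <iostream>
#include <random>

int main()
{
    // 1024*1024*512 bytes of local storage split into 1024*1024-byte transfers
    // gives 512 slots to pick from at random.
    std::uint64_t local_storage_MB = 512;
    std::uint64_t transfer_size_B  = 1024 * 1024;
    std::uint64_t num_transfer_slots =
        1024 * 1024 * local_storage_MB / transfer_size_B;

    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<std::uint64_t> random_slot(0, num_transfer_slots - 1);

    std::cout << "num_transfer_slots " << num_transfer_slots
              << ", sample slot " << random_slot(gen) << std::endl;
    return 0;
}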