Skip to content

Commit

Permalink
Merge pull request #2145 from DARMA-tasking/2074-support-sparse-offli…
Browse files Browse the repository at this point in the history
…nelb-maps

#2074: support sparse OfflineLB maps
  • Loading branch information
nlslatt committed Jun 25, 2024
2 parents 2d45de6 + 61d324a commit 3638817
Show file tree
Hide file tree
Showing 13 changed files with 557 additions and 52 deletions.
7 changes: 7 additions & 0 deletions cmake/link_vt.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -236,4 +236,11 @@ function(link_target_with_vt)
if (vt_ubsan_enabled)
target_link_libraries(${ARG_TARGET} PUBLIC ${ARG_BUILD_TYPE} -fsanitize=undefined)
endif()

# Enable additional flag for GCC-8 to link std::filesystem
if (${CMAKE_CXX_COMPILER_ID} MATCHES "GNU")
if (NOT (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER_EQUAL 9))
target_link_libraries(${ARG_TARGET} PUBLIC ${ARG_BUILD_TYPE} -lstdc++fs)
endif ()
endif ()
endfunction()
6 changes: 4 additions & 2 deletions src/vt/collective/collective_ops.cc
Original file line number Diff line number Diff line change
Expand Up @@ -293,8 +293,10 @@ void CollectiveAnyOps<instance>::abort(
auto myrt = tls_rt ? tls_rt : ::vt::rt;
if (myrt) {
#if vt_check_enabled(trace_enabled)
//--- Try to flush most of the traces before aborting
myrt->theTrace->cleanupTracesFile();
if (myrt->theTrace) {
//--- Try to flush most of the traces before aborting
myrt->theTrace->cleanupTracesFile();
}
#endif
myrt->abort(str, code);
} else if (vt::debug::preConfig()->vt_throw_on_abort) {
Expand Down
2 changes: 1 addition & 1 deletion src/vt/configs/error/stack_out.cc
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ std::string prettyPrintStack(DumpStackType const& stack) {
auto magenta = ::vt::debug::magenta();
auto yellow = ::vt::debug::yellow();
auto vt_pre = ::vt::debug::vtPre();
auto node = ::vt::theContext()->getNode();
auto node = ::vt::theContext() ? ::vt::theContext()->getNode() : -1;
auto node_str = ::vt::debug::proc(node);
auto prefix = vt_pre + node_str + " ";
auto separator = fmt::format("{}{}{:-^120}{}\n", prefix, yellow, "", reset);
Expand Down
30 changes: 24 additions & 6 deletions src/vt/runtime/runtime.cc
Original file line number Diff line number Diff line change
Expand Up @@ -424,12 +424,25 @@ bool Runtime::tryFinalize(bool const disable_sig) {
}

bool Runtime::needLBDataRestartReader() {
using vrt::collection::balance::ReadLBConfig;
using vrt::collection::balance::LBType;
using vrt::collection::balance::get_lb_names;

bool needOfflineLB = false;

#if vt_check_enabled(lblite)
if (true) {
return arg_config_->config_.vt_lb_data_in;
} else
if (ReadLBConfig::openConfig(arg_config_->config_.vt_lb_file_name)) {
needOfflineLB = ReadLBConfig::hasOfflineLB();
}

needOfflineLB = needOfflineLB || arg_config_->config_.vt_lb_name == get_lb_names()[LBType::OfflineLB];

if (needOfflineLB && !arg_config_->config_.vt_lb_data_in) {
vtAbort("VT cannot run OfflineLB without '--vt_lb_data_in' parameter.");
}
#endif
return false;

return needOfflineLB;
}

bool Runtime::initialize(bool const force_now) {
Expand Down Expand Up @@ -566,7 +579,7 @@ void Runtime::reset() {
void Runtime::abort(std::string const abort_str, ErrorCodeType const code) {
output(abort_str, code, true, true, false);

if (theConfig()->vt_throw_on_abort) {
if (theContext && theConfig()->vt_throw_on_abort) {
throw std::runtime_error(abort_str);
} else {
aborted_ = true;
Expand Down Expand Up @@ -627,7 +640,12 @@ void Runtime::output(
fmt::print(stderr, "{}\n", prefix);
}

if (!theConfig()->vt_no_stack) {
if (theContext == nullptr) {
// Too early in init process to check dump settings - always dump stack.
auto stack = debug::stack::dumpStack();
auto stack_pretty = debug::stack::prettyPrintStack(stack);
fmt::print("{}", stack_pretty);
} else if (!theConfig()->vt_no_stack) {
bool const on_abort = !theConfig()->vt_no_abort_stack;
bool const on_warn = !theConfig()->vt_no_warn_stack;
bool const dump = (error && on_abort) || (!error && on_warn);
Expand Down
91 changes: 59 additions & 32 deletions src/vt/vrt/collection/balance/lb_data_restart_reader.cc
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,45 @@ void LBDataRestartReader::startup() {
}

void LBDataRestartReader::readHistory(LBDataHolder const& lbdh) {
num_phases_ = lbdh.node_data_.size();
auto find_max_data_phase = [&]() -> PhaseType {
if (lbdh.node_data_.empty()) {
return 0;
}
return std::max_element(
lbdh.node_data_.begin(), lbdh.node_data_.end(),
[](const auto& p1, const auto& p2) { return p1.first < p2.first; })
->first;
};

// Find last phase number
auto largest_data = find_max_data_phase();
auto largest_identical =
lbdh.identical_phases_.size() > 0 ? *lbdh.identical_phases_.rbegin() : 0;
auto largest_skipped =
lbdh.skipped_phases_.size() > 0 ? *lbdh.skipped_phases_.rbegin() : 0;
num_phases_ =
std::max(std::max(largest_data, largest_identical), largest_skipped) + 1;

PhaseType last_found_phase = 0;
for (PhaseType phase = 0; phase < num_phases_; phase++) {
auto iter = lbdh.node_data_.find(phase);
if (iter != lbdh.node_data_.end()) {
last_found_phase = phase;
for (auto const& obj : iter->second) {
if (obj.first.isMigratable()) {
history_[phase].insert(obj.first);
if (history_[phase] == nullptr) {
history_[phase] = std::make_shared<std::set<ElementIDStruct>>();
}
history_[phase]->insert(obj.first);
}
}
} else {
// We assume that all phases are dense all fully specified even if they
// don't change
vtAbort("Could not find data: phases must all be specified");
} else if (
lbdh.identical_phases_.find(phase) != lbdh.identical_phases_.end()) {
// Phase is identical to previous one, use the shared pointer to data from previous phase
addIdenticalPhase(phase, last_found_phase);
} else if (lbdh.skipped_phases_.find(phase) == lbdh.skipped_phases_.end()) {
vtAbort("Could not find data: Skipped phases needs to be listed in file "
"metadata.");
}
}
}
Expand Down Expand Up @@ -134,12 +160,12 @@ void LBDataRestartReader::arriving(ArriveMsg* msg) {
}

void LBDataRestartReader::update(UpdateMsg* msg) {
auto iter = history_[msg->phase].find(msg->elm);
vtAssert(iter != history_[msg->phase].end(), "Must exist");
auto iter = history_[msg->phase]->find(msg->elm);
vtAssert(iter != history_[msg->phase]->end(), "Must exist");
auto elm = *iter;
elm.curr_node = msg->curr_node;
history_[msg->phase].erase(iter);
history_[msg->phase].insert(elm);
history_[msg->phase]->erase(iter);
history_[msg->phase]->insert(elm);
}

void LBDataRestartReader::checkBothEnds(Coord& coord) {
Expand All @@ -155,30 +181,31 @@ void LBDataRestartReader::determinePhasesToMigrate() {
local_changed_distro.resize(num_phases_ - 1);

auto const this_node = theContext()->getNode();

runInEpochCollective("LBDataRestartReader::updateLocations", [&]{
for (PhaseType i = 0; i < num_phases_ - 1; ++i) {
local_changed_distro[i] = history_[i] != history_[i+1];
if (local_changed_distro[i]) {
std::set<ElementIDStruct> departing, arriving;

std::set_difference(
history_[i+1].begin(), history_[i+1].end(),
history_[i].begin(), history_[i].end(),
std::inserter(arriving, arriving.begin())
);

std::set_difference(
history_[i].begin(), history_[i].end(),
history_[i+1].begin(), history_[i+1].end(),
std::inserter(departing, departing.begin())
);

for (auto&& d : departing) {
proxy_[d.getHomeNode()].send<DepartMsg, &LBDataRestartReader::departing>(this_node, i+1, d);
}
for (auto&& a : arriving) {
proxy_[a.getHomeNode()].send<ArriveMsg, &LBDataRestartReader::arriving>(this_node, i+1, a);
if(history_.count(i) && history_.count(i+1)) {
local_changed_distro[i] = *history_[i] != *history_[i+1];
if (local_changed_distro[i]) {
std::set<ElementIDStruct> departing, arriving;

std::set_difference(
history_[i+1]->begin(), history_[i+1]->end(),
history_[i]->begin(), history_[i]->end(),
std::inserter(arriving, arriving.begin())
);

std::set_difference(
history_[i]->begin(), history_[i]->end(),
history_[i+1]->begin(), history_[i+1]->end(),
std::inserter(departing, departing.begin())
);

for (auto&& d : departing) {
proxy_[d.getHomeNode()].send<DepartMsg, &LBDataRestartReader::departing>(this_node, i+1, d);
}
for (auto&& a : arriving) {
proxy_[a.getHomeNode()].send<ArriveMsg, &LBDataRestartReader::arriving>(this_node, i+1, a);
}
}
}
}
Expand Down
35 changes: 28 additions & 7 deletions src/vt/vrt/collection/balance/lb_data_restart_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,11 +128,11 @@ struct LBDataRestartReader : runtime::component::Component<LBDataRestartReader>
*
* \param[in] phase the phase
*
* \return element assigned to this node
* \return pointer to elements assigned to this node if not skipped
*/
std::set<ElementIDStruct> const& getDistro(PhaseType phase) {
std::shared_ptr<const std::set<ElementIDStruct>> getDistro(PhaseType phase) const {
auto iter = history_.find(phase);
vtAssert(iter != history_.end(), "Must have a valid phase");
vtAssert(iter != history_.end(), "Must have a valid, not skipped phase");
return iter->second;
}

Expand All @@ -142,10 +142,31 @@ struct LBDataRestartReader : runtime::component::Component<LBDataRestartReader>
* \param[in] phase the phase to clear
*/
void clearDistro(PhaseType phase) {
auto iter = history_.find(phase);
if (iter != history_.end()) {
history_.erase(iter);
history_.erase(phase);
}

/**
* \brief Add history for a given phase
*
* \param[in] phase the phase to be added
* \param[in] distro the distribution to be added
*/
void addDistro(PhaseType phase, const std::set<ElementIDStruct>& distro) {
if (history_[phase] == nullptr) {
history_[phase] = std::make_shared<std::set<ElementIDStruct>>();
}
history_[phase]->insert(distro.begin(), distro.end());
}

/**
* \brief Add identical phase to one already present
*
* \param[in] phase the phase to be added
* \param[in] identical the identical phase to be used
*/
void addIdenticalPhase(PhaseType phase, PhaseType identical) {
vtAssert(history_.find(identical) != history_.end(), "Identical phase was not added to history map.");
history_[phase] = history_[identical];
}

private:
Expand All @@ -170,7 +191,7 @@ struct LBDataRestartReader : runtime::component::Component<LBDataRestartReader>
std::vector<bool> changed_distro_;

/// History of mapping that was read in from the data files
std::unordered_map<PhaseType, std::set<ElementIDStruct>> history_;
std::unordered_map<PhaseType, std::shared_ptr<std::set<ElementIDStruct>>> history_;

struct DepartMsg : vt::Message {
DepartMsg(NodeType in_depart_node, PhaseType in_phase, ElementIDStruct in_elm)
Expand Down
5 changes: 5 additions & 0 deletions src/vt/vrt/collection/balance/lb_invoke/lb_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,11 @@ LBType LBManager::decideLBToRun(PhaseType phase, bool try_file) {
}
}

// Check if LBDataRestartReader requires to run OfflineLB for a given phase.
if(the_lb == LBType::OfflineLB && !theLBDataReader()->needsLB(phase)) {
the_lb = LBType::NoLB;
}

vt_debug_print(
terse, lb,
"LBManager::decidedLBToRun: phase={}, return lb_={}\n",
Expand Down
4 changes: 2 additions & 2 deletions src/vt/vrt/collection/balance/offlinelb/offlinelb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ void OfflineLB::init(objgroup::proxy::Proxy<OfflineLB> in_proxy) {
}

void OfflineLB::runLB(LoadType) {
auto const& distro = theLBDataReader()->getDistro(phase_ + 1);
for (auto&& elm : distro) {
auto const distro = theLBDataReader()->getDistro(phase_ + 1);
for (auto&& elm : *distro) {
migrateObjectTo(elm, theContext()->getNode());
}
theLBDataReader()->clearDistro(phase_ + 1);
Expand Down
6 changes: 6 additions & 0 deletions src/vt/vrt/collection/balance/read_lb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ namespace vt { namespace vrt { namespace collection { namespace balance {
/*static*/ typename ReadLBConfig::ConfigMapType ReadLBConfig::config_exact_ = {};
/*static*/ std::vector<ConfigIndex> ReadLBConfig::config_prec_ = {};
/*static*/ bool ReadLBConfig::read_complete_ = false;
/*static*/ bool ReadLBConfig::has_offline_lb_ = false;

/*static*/ bool ReadLBConfig::openConfig(std::string const& filename) {
// No-op if no file specified. Can't be used to clear.
Expand Down Expand Up @@ -231,6 +232,10 @@ int eatWhitespace(std::ifstream& file) {
vtAbort(err_msg);
}

if (lb_name == get_lb_names()[LBType::OfflineLB]) {
has_offline_lb_ = true;
}

map->emplace(
std::piecewise_construct,
std::forward_as_tuple(mod),
Expand All @@ -243,6 +248,7 @@ int eatWhitespace(std::ifstream& file) {

/*static*/ void ReadLBConfig::clear() {
read_complete_ = false;
has_offline_lb_ = false;
open_filename_ = "";
config_mod_.clear();
config_exact_.clear();
Expand Down
2 changes: 2 additions & 0 deletions src/vt/vrt/collection/balance/read_lb.h
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,7 @@ struct ReadLBConfig {
static ConfigIndex numEntries() { return config_mod_.size() + config_exact_.size(); }
static ConfigEntry* entry(ConfigIndex const& idx);
static LBType getLB(ConfigIndex const& idx);
static bool hasOfflineLB() { return has_offline_lb_; };
static ConfigMapType getModEntries() { return config_mod_; };
static ConfigMapType getExactEntries() {return config_exact_; };
static ParamMapType parseParams(std::vector<std::string> params);
Expand All @@ -208,6 +209,7 @@ struct ReadLBConfig {
static void readFile(std::string const& filename);

static bool read_complete_;
static bool has_offline_lb_;
static std::string open_filename_;
static ConfigMapType config_mod_;
static ConfigMapType config_exact_;
Expand Down
23 changes: 21 additions & 2 deletions tests/unit/lb/test_lb_reader.nompi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,14 +44,15 @@
#include <vt/vrt/collection/balance/read_lb.h>

#include "test_harness.h"
#include "test_helpers.h"

namespace vt { namespace tests { namespace unit {

using TestLBReader = TestHarness;

TEST_F(TestLBReader, test_lb_read_1) {

std::string file_name = "test_lb_read_1.txt";
std::string file_name = getUniqueFilename(".txt");
std::ofstream out(file_name);
out << ""
"0 NoLB\n"
Expand All @@ -66,6 +67,7 @@ TEST_F(TestLBReader, test_lb_read_1) {
Config::clear();
Config::openConfig(file_name);

EXPECT_EQ(Config::hasOfflineLB(), false);
EXPECT_EQ(Config::numEntries(), 3);
EXPECT_EQ(Config::getExactEntries().size(), 2);
EXPECT_EQ(Config::getModEntries().size(), 1);
Expand Down Expand Up @@ -104,7 +106,7 @@ TEST_F(TestLBReader, test_lb_read_1) {

TEST_F(TestLBReader, test_lb_read_2) {

std::string file_name = "test_lb_read_2.txt";
std::string file_name = getUniqueFilename(".txt");
std::ofstream out(file_name);
out << ""
"0 NoLB\n"
Expand All @@ -120,6 +122,7 @@ TEST_F(TestLBReader, test_lb_read_2) {
Config::clear();
Config::openConfig(file_name);

EXPECT_EQ(Config::hasOfflineLB(), false);
EXPECT_EQ(Config::numEntries(), 5);
for (ConfigIdx i = 0; i < 121; i++) {
auto entry = Config::entry(i);
Expand Down Expand Up @@ -195,4 +198,20 @@ TEST_F(TestLBReader, test_lb_read_2) {
EXPECT_EQ(Config::toString(), expected_config);
}

TEST_F(TestLBReader, test_lb_read_3_with_offline_lb) {
std::string file_name = getUniqueFilename(".txt");
std::ofstream out(file_name);
out << ""
"0 NoLB\n"
"1 OfflineLB\n"
"%10 OfflineLB\n";
out.close();

using Config = vt::vrt::collection::balance::ReadLBConfig;

Config::clear();
Config::openConfig(file_name);
EXPECT_EQ(Config::hasOfflineLB(), true);
}

}}} // end namespace vt::tests::unit
Loading

0 comments on commit 3638817

Please sign in to comment.