Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/sustainable solution for bug #468

Merged
merged 14 commits into from
Jan 16, 2024
Original file line number Diff line number Diff line change
Expand Up @@ -436,92 +436,153 @@ class MainModelImpl<ExtraRetrievableTypes<ExtraRetrievableType...>, ComponentLis
// missing entries are provided in the update data
}

// error messages
std::vector<std::string> exceptions(n_batch, "");
std::vector<CalculationInfo> infos(n_batch);

// lambda for sub batch calculation
auto sub_batch = sub_batch_calculation_(calculation_fn, result_data, update_data, exceptions, infos);

batch_dispatch(sub_batch, n_batch, threading);

handle_batch_exceptions(exceptions);
calculation_info_ = main_core::merge_calculation_info(infos);

return BatchParameter{};
}

template <typename Calculate>
requires std::invocable<std::remove_cvref_t<Calculate>, MainModelImpl&, Dataset const&, Idx>
auto sub_batch_calculation_(Calculate&& calculation_fn, Dataset const& result_data, ConstDataset const& update_data,
std::vector<std::string>& exceptions, std::vector<CalculationInfo>& infos) {
// const ref of current instance
MainModelImpl const& base_model = *this;

// cache component update order if possible
bool const is_independent = MainModelImpl::is_update_independent(update_data);

// error messages
std::vector<std::string> exceptions(n_batch, "");
std::vector<CalculationInfo> infos(n_batch);
return [&base_model, &exceptions, &infos, &calculation_fn, &result_data, &update_data,
is_independent](Idx start, Idx stride, Idx n_batch) {
assert(n_batch <= narrow_cast<Idx>(exceptions.size()));
assert(n_batch <= narrow_cast<Idx>(infos.size()));

// lambda for sub batch calculation
auto sub_batch = [&base_model, &exceptions, &infos, &calculation_fn, &result_data, &update_data, n_batch,
is_independent](Idx start, Idx stride) {
Timer const t_total(infos[start], 0000, "Total in thread");

auto model = [&base_model, &infos, start] {
Timer const t_copy_model(infos[start], 1100, "Copy model");
auto copy_model = [&base_model, &infos](Idx scenario_idx) {
Timer const t_copy_model(infos[scenario_idx], 1100, "Copy model");
return MainModelImpl{base_model};
}();
};
auto model = copy_model(start);

SequenceIdx scenario_sequence = is_independent ? model.get_sequence_idx_map(update_data) : SequenceIdx{};

auto [setup, winddown] =
scenario_update_restore(model, update_data, is_independent, scenario_sequence, infos);

auto calculate_scenario = MainModelImpl::call_with<Idx>(
[&model, &calculation_fn, &result_data, &infos](Idx scenario_idx) {
calculation_fn(model, result_data, scenario_idx);
infos[scenario_idx].merge(model.calculation_info_);
},
std::move(setup), std::move(winddown), scenario_exception_handler(model, exceptions, infos),
[&model, &copy_model](Idx scenario_idx) { model = copy_model(scenario_idx); });

for (Idx batch_number = start; batch_number < n_batch; batch_number += stride) {
Timer const t_total_single(infos[batch_number], 0100, "Total single calculation in thread");
// try to update model and run calculation
try {
{
Timer const t_update_model(infos[batch_number], 1200, "Update model");
if (!is_independent) {
scenario_sequence = model.get_sequence_idx_map(update_data, batch_number);
}
model.template update_component<cached_update_t>(update_data, batch_number, scenario_sequence);
}
calculation_fn(model, result_data, batch_number);
{
Timer const t_update_model(infos[batch_number], 1201, "Restore model");
model.restore_components(scenario_sequence);
if (!is_independent) {
std::ranges::for_each(scenario_sequence, [](auto& comp_seq_idx) { comp_seq_idx.clear(); });
}
}
} catch (std::exception const& ex) {
exceptions[batch_number] = ex.what();
model = [&base_model, &infos, start] {
Timer const t_copy_model(infos[start], 1100, "Copy model");
return MainModelImpl{base_model};
}();
} catch (...) {
exceptions[batch_number] = "unknown exception";
model = [&base_model, &infos, start] {
Timer const t_copy_model(infos[start], 1100, "Copy model");
return MainModelImpl{base_model};
}();
}

infos[batch_number].merge(model.calculation_info_);
calculate_scenario(batch_number);
}
};
}

// run sequential if
// specified threading < 0
// use hardware threads, but it is either unknown (0) or only has one thread (1)
// specified threading = 1
template <typename RunSubBatchFn>
requires std::invocable<std::remove_cvref_t<RunSubBatchFn>, Idx /*start*/, Idx /*stride*/, Idx /*n_batch*/>
static void batch_dispatch(RunSubBatchFn sub_batch, Idx n_batch, Idx threading) {
// run batches sequential or parallel
auto const hardware_thread = static_cast<Idx>(std::thread::hardware_concurrency());
// run sequential if
// specified threading < 0
// use hardware threads, but it is either unknown (0) or only has one thread (1)
// specified threading = 1
if (threading < 0 || threading == 1 || (threading == 0 && hardware_thread < 2)) {
// run all in sequential
sub_batch(0, 1);
sub_batch(0, 1, n_batch);
} else {
// create parallel threads
Idx const n_thread = std::min(threading == 0 ? hardware_thread : threading, n_batch);
std::vector<std::thread> threads;
threads.reserve(n_thread);
for (Idx thread_number = 0; thread_number < n_thread; ++thread_number) {
// compute each sub batch with stride
threads.emplace_back(sub_batch, thread_number, n_thread);
threads.emplace_back(sub_batch, thread_number, n_thread, n_batch);
}
for (auto& thread : threads) {
thread.join();
}
}
}

handle_batch_exceptions(exceptions);
calculation_info_ = main_core::merge_calculation_info(infos);
template <typename... Args, typename RunFn, typename SetupFn, typename WinddownFn, typename HandleExceptionFn,
typename RecoverFromBadFn>
requires std::invocable<std::remove_cvref_t<RunFn>, Args const&...> &&
std::invocable<std::remove_cvref_t<SetupFn>, Args const&...> &&
std::invocable<std::remove_cvref_t<WinddownFn>, Args const&...> &&
std::invocable<std::remove_cvref_t<HandleExceptionFn>, Args const&...> &&
std::invocable<std::remove_cvref_t<RecoverFromBadFn>, Args const&...>
static auto call_with(RunFn run, SetupFn setup, WinddownFn winddown, HandleExceptionFn handle_exception,
RecoverFromBadFn recover_from_bad) {
return [setup_ = std::move(setup), run_ = std::move(run), winddown_ = std::move(winddown),
handle_exception_ = std::move(handle_exception),
recover_from_bad_ = std::move(recover_from_bad)](Args const&... args) {
try {
setup_(args...);
run_(args...);
winddown_(args...);
} catch (...) {
handle_exception_(args...);
try {
winddown_(args...);
} catch (...) {
recover_from_bad_(args...);
}
}
};
}

return BatchParameter{};
static auto scenario_update_restore(MainModelImpl& model, ConstDataset const& update_data,
bool const is_independent, SequenceIdx& scenario_sequence,
std::vector<CalculationInfo>& infos) {
return std::make_pair(
[&model, &update_data, &scenario_sequence, is_independent, &infos](Idx scenario_idx) {
Timer const t_update_model(infos[scenario_idx], 1200, "Update model");
if (!is_independent) {
scenario_sequence = model.get_sequence_idx_map(update_data, scenario_idx);
}
model.template update_component<cached_update_t>(update_data, scenario_idx, scenario_sequence);
},
[&model, &scenario_sequence, is_independent, &infos](Idx scenario_idx) {
Timer const t_update_model(infos[scenario_idx], 1201, "Restore model");
model.restore_components(scenario_sequence);
if (!is_independent) {
std::ranges::for_each(scenario_sequence, [](auto& comp_seq_idx) { comp_seq_idx.clear(); });
}
});
}

// Lippincott pattern
static auto scenario_exception_handler(MainModelImpl& model, std::vector<std::string>& messages,
std::vector<CalculationInfo>& infos) {
return [&model, &messages, &infos](Idx scenario_idx) {
std::exception_ptr const ex_ptr = std::current_exception();
try {
std::rethrow_exception(ex_ptr);
} catch (std::exception const& ex) {
messages[scenario_idx] = ex.what();
} catch (...) {
messages[scenario_idx] = "unknown exception";
}
infos[scenario_idx].merge(model.calculation_info_);
};
}

static void handle_batch_exceptions(std::vector<std::string> const& exceptions) {
Expand Down
11 changes: 5 additions & 6 deletions tests/cpp_unit_tests/test_main_model.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1095,11 +1095,10 @@ auto incomplete_input_model(State const& state) -> MainModel {

std::vector<SourceInput> const incomplete_source_input{{6, 1, 1, nan, nan, 1e12, nan, nan},
{10, 3, 1, nan, nan, 1e12, nan, nan}};
std::vector<SymLoadGenInput> incomplete_sym_load_input{{7, 3, 1, LoadGenType::const_y, nan, 0.0}};
std::vector<AsymLoadGenInput> incomplete_asym_load_input{
std::vector<SymLoadGenInput> const incomplete_sym_load_input{{7, 3, 1, LoadGenType::const_y, nan, 0.0}};
std::vector<AsymLoadGenInput> const incomplete_asym_load_input{
{8, 3, 1, LoadGenType::const_y, RealValue<false>{nan}, RealValue<false>{0.0}}};

ConstDataset input_data;
main_model.add_component<Node>(state.node_input);
main_model.add_component<Line>(state.line_input);
main_model.add_component<Link>(state.link_input);
Expand All @@ -1119,7 +1118,7 @@ TEST_CASE("Test main model - incomplete input") {
using CalculationMethod::linear_current;
using CalculationMethod::newton_raphson;

State state;
State const state;
auto main_model = default_model(state);
auto test_model = incomplete_input_model(state);

Expand Down Expand Up @@ -1272,10 +1271,10 @@ TEST_CASE("Test main model - incomplete input") {
}
}

TEST_CASE("Incomplete followed by complete") {
TEST_CASE("Test main model - Incomplete followed by complete") {
using CalculationMethod::linear;

State state;
State const state;
auto main_model = default_model(state);
auto test_model = incomplete_input_model(state);

Expand Down