Skip to content

Commit

Permalink
Save action times on multi-stream CPU celer-sim (#1065)
Browse files Browse the repository at this point in the history
* Record step times only when recording step counts

* Accumulate action times from all streams

* Remove bad condition

* Use double-precision for celer-sim timing values

* Average run times by number of streams to give 'wall time'

* Add const correctness to action time accessor

* Add option to write step times

* Fix step time conditional

* Fix goldfinger presets

* Add more assertions

* Don't create more streams than events

* Allow optional transporter and increase logging

* Tweak assertion and log
  • Loading branch information
sethrj committed Dec 21, 2023
1 parent 6e36826 commit 87e4421
Show file tree
Hide file tree
Showing 12 changed files with 132 additions and 80 deletions.
1 change: 1 addition & 0 deletions app/celer-sim/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ set(_env
"CELERITAS_DEMO_EXE=$<TARGET_FILE:celer-sim>"
"${_geant_exporter_env}"
"CELER_LOG=debug"
"CELER_LOG_LOCAL=debug"
"CELER_DISABLE_DEVICE=1"
"CELER_DISABLE_PARALLEL=1"
"CELER_CORE_GEO=${CELERITAS_CORE_GEO}"
Expand Down
116 changes: 70 additions & 46 deletions app/celer-sim/Runner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,45 @@ namespace celeritas
{
namespace app
{
namespace
{
//---------------------------------------------------------------------------//
/*!
* Get the number of streams from the OMP_NUM_THREADS environment variable.
*
* The value of OMP_NUM_THREADS should be a list of positive integers, each of
* which sets the number of threads for the parallel region at the
* corresponding nested level. The number of streams is set to the first value
* in the list.
*
* \note For a multithreaded CPU run, if OMP_NUM_THREADS is set to a single
* value, the number of threads for each nested parallel region will be set to
* that value.
*/
size_type calc_num_streams(RunnerInput const& inp)
{
size_type num_threads = 1;
#if CELERITAS_USE_OPENMP
if (!inp.merge_events)
{
std::string const& nt_str = celeritas::getenv("OMP_NUM_THREADS");
if (!nt_str.empty())
{
auto nt = std::stoi(nt_str);
CELER_VALIDATE(nt > 0, << "nonpositive num_streams=" << nt);
num_threads = static_cast<size_type>(nt);
}
}
#else
CELER_DISCARD(inp);
#endif
// Don't create more streams than events
return std::min(num_threads, inp.max_events);
}

//---------------------------------------------------------------------------//
} // namespace

//---------------------------------------------------------------------------//
/*!
* Construct on all threads from a JSON input and shared output manager.
Expand All @@ -93,8 +132,8 @@ Runner::Runner(RunnerInput const& inp, SPOutputRegistry output)
write_to_root(*core_params_, root_manager_.get());
}

CELER_ASSERT(core_params_);
transporters_.resize(this->num_streams());
CELER_ENSURE(core_params_);
}

//---------------------------------------------------------------------------//
Expand Down Expand Up @@ -132,6 +171,7 @@ auto Runner::operator()(StreamId stream, EventId event) -> RunnerResult
auto Runner::operator()() -> RunnerResult
{
CELER_EXPECT(events_.size() == 1);
CELER_EXPECT(this->num_streams() == 1);

auto& transport = this->get_transporter(StreamId{0});
return transport(make_span(events_.front()));
Expand All @@ -143,6 +183,7 @@ auto Runner::operator()() -> RunnerResult
*/
StreamId::size_type Runner::num_streams() const
{
CELER_EXPECT(core_params_);
return core_params_->max_streams();
}

Expand All @@ -159,22 +200,28 @@ size_type Runner::num_events() const
/*!
* Get the accumulated action times.
*
* Action times are only collected by the transporter when running with a
* single stream.
* This is a *mean* value over all streams.
*/
auto Runner::get_action_times() -> MapStrReal
auto Runner::get_action_times() const -> MapStrDouble
{
auto& transport = this->get_transporter(StreamId{0});
if (use_device_)
MapStrDouble result;
size_type num_streams{0};
for (auto sid : range(StreamId{this->num_streams()}))
{
return dynamic_cast<Transporter<MemSpace::device> const&>(transport)
.get_action_times();
if (auto* transport = this->get_transporter_ptr(sid))
{
transport->accum_action_times(&result);
++num_streams;
}
}
else

double norm{1 / static_cast<double>(num_streams)};
for (auto&& [action, time] : result)
{
return dynamic_cast<Transporter<MemSpace::host> const&>(transport)
.get_action_times();
time *= norm;
}

return result;
}

//---------------------------------------------------------------------------//
Expand Down Expand Up @@ -322,11 +369,11 @@ void Runner::build_core_params(RunnerInput const& inp,
params.sim = SimParams::from_import(imported, params.particle);

// Store the number of simultaneous threads/tasks per process
params.max_streams = this->get_num_streams(inp);
params.max_streams = calc_num_streams(inp);
CELER_VALIDATE(inp.mctruth_file.empty() || params.max_streams == 1,
<< "MC truth output is only supported with a single "
"stream ("
<< params.max_streams << " streams requested)");
<< "cannot output MC truth with multiple "
"streams ("
<< params.max_streams << " requested)");

// Construct track initialization params
params.init = [&inp, &params] {
Expand Down Expand Up @@ -366,6 +413,7 @@ void Runner::build_transporter_input(RunnerInput const& inp)
= ceil_div(inp.num_track_slots, core_params_->max_streams());
transporter_input_->max_steps = inp.max_steps;
transporter_input_->store_track_counts = inp.write_track_counts;
transporter_input_->store_step_times = inp.write_step_times;
transporter_input_->sync = inp.sync;
transporter_input_->params = core_params_;
}
Expand Down Expand Up @@ -499,9 +547,9 @@ void Runner::build_diagnostics(RunnerInput const& inp)
*/
auto Runner::get_transporter(StreamId stream) -> TransporterBase&
{
CELER_EXPECT(stream < this->num_streams());
CELER_EXPECT(stream < transporters_.size());

auto& result = transporters_[stream.get()];
UPTransporterBase& result = transporters_[stream.get()];
if (!result)
{
result = [this, stream]() -> std::unique_ptr<TransporterBase> {
Expand Down Expand Up @@ -530,37 +578,13 @@ auto Runner::get_transporter(StreamId stream) -> TransporterBase&

//---------------------------------------------------------------------------//
/*!
* Get the number of streams from the OMP_NUM_THREADS environment variable.
*
* The value of OMP_NUM_THREADS should be a list of positive integers, each of
* which sets the number of threads for the parallel region at the
* corresponding nested level. The number of streams is set to the first value
* in the list.
*
* \note For a multithreaded CPU run, if OMP_NUM_THREADS is set to a single
* value, the number of threads for each nested parallel region will be set to
* that value.
* Get an already-constructed transporter for the given stream.
*/
int Runner::get_num_streams(RunnerInput const& inp)
auto Runner::get_transporter_ptr(StreamId stream) const
-> TransporterBase const*
{
#ifdef _OPENMP
if (inp.merge_events)
{
return 1;
}

std::string const& nt_str = getenv("OMP_NUM_THREADS");
if (!nt_str.empty())
{
auto num_threads = std::stoi(nt_str);
CELER_VALIDATE(num_threads > 0,
<< "nonpositive num_streams=" << num_threads);
return num_threads;
}
#else
CELER_DISCARD(inp);
#endif
return 1;
CELER_EXPECT(stream < transporters_.size());
return transporters_[stream.get()].get();
}

//---------------------------------------------------------------------------//
Expand Down
5 changes: 3 additions & 2 deletions app/celer-sim/Runner.hh
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class Runner
//!@{
//! \name Type aliases
using Input = RunnerInput;
using MapStrReal = std::unordered_map<std::string, real_type>;
using MapStrDouble = std::unordered_map<std::string, double>;
using RunnerResult = TransporterResult;
using SPOutputRegistry = std::shared_ptr<OutputRegistry>;
//!@}
Expand All @@ -71,7 +71,7 @@ class Runner
size_type num_events() const;

// Get the accumulated action times
MapStrReal get_action_times();
MapStrDouble get_action_times() const;

private:
//// TYPES ////
Expand Down Expand Up @@ -102,6 +102,7 @@ class Runner
void build_events(RunnerInput const&);
int get_num_streams(RunnerInput const&);
TransporterBase& get_transporter(StreamId);
TransporterBase const* get_transporter_ptr(StreamId) const;
};

//---------------------------------------------------------------------------//
Expand Down
1 change: 1 addition & 0 deletions app/celer-sim/RunnerInput.hh
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ struct RunnerInput
bool step_diagnostic{};
int step_diagnostic_bins{1000};
bool write_track_counts{true}; //!< Output track counts for each step
bool write_step_times{true}; //!< Output elapsed times for each step

// Control
unsigned int seed{};
Expand Down
2 changes: 2 additions & 0 deletions app/celer-sim/RunnerInputIO.json.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,7 @@ void from_json(nlohmann::json const& j, RunnerInput& v)
LDIO_LOAD_OPTION(step_diagnostic);
LDIO_LOAD_OPTION(step_diagnostic_bins);
LDIO_LOAD_OPTION(write_track_counts);
LDIO_LOAD_OPTION(write_step_times);

LDIO_LOAD_DEPRECATED(max_num_tracks, num_track_slots);

Expand Down Expand Up @@ -166,6 +167,7 @@ void to_json(nlohmann::json& j, RunnerInput const& v)
LDIO_SAVE(step_diagnostic);
LDIO_SAVE_OPTION(step_diagnostic_bins);
LDIO_SAVE(write_track_counts);
LDIO_SAVE(write_step_times);

LDIO_SAVE(seed);
LDIO_SAVE(num_track_slots);
Expand Down
10 changes: 5 additions & 5 deletions app/celer-sim/RunnerOutput.hh
Original file line number Diff line number Diff line change
Expand Up @@ -23,14 +23,14 @@ namespace app
*/
struct SimulationResult
{
using MapStrReal = std::unordered_map<std::string, real_type>;
using MapStrDouble = std::unordered_map<std::string, double>;

//// DATA ////

real_type total_time{}; //!< Total simulation time
real_type setup_time{}; //!< One-time initialization cost
real_type warmup_time{}; //!< One-time warmup cost
MapStrReal action_times{}; //!< Accumulated action timing
double total_time{}; //!< Total simulation time
double setup_time{}; //!< One-time initialization cost
double warmup_time{}; //!< One-time warmup cost
MapStrDouble action_times{}; //!< Accumulated mean action wall times
std::vector<TransporterResult> events; //!< Results tallied for each event
size_type num_streams{}; //!< Number of CPU/OpenMP threads
};
Expand Down
14 changes: 7 additions & 7 deletions app/celer-sim/Transporter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ Transporter<M>::Transporter(TransporterInput inp)
: max_steps_(inp.max_steps)
, num_streams_(inp.params->max_streams())
, store_track_counts_(inp.store_track_counts)
, store_step_times_(inp.store_step_times)
{
CELER_EXPECT(inp);

Expand Down Expand Up @@ -106,7 +107,7 @@ auto Transporter<M>::operator()(SpanConstPrimary primaries)
{
append_track_counts(track_counts);
}
if (num_streams_ == 1)
if (store_step_times_)
{
result.step_times.push_back(get_step_time());
}
Expand All @@ -130,11 +131,12 @@ auto Transporter<M>::operator()(SpanConstPrimary primaries)

get_step_time = {};
track_counts = step();

if (store_track_counts_)
{
append_track_counts(track_counts);
}
if (num_streams_ == 1)
if (store_step_times_)
{
result.step_times.push_back(get_step_time());
}
Expand All @@ -149,14 +151,13 @@ auto Transporter<M>::operator()(SpanConstPrimary primaries)
* Transport the input primaries and all secondaries produced.
*/
template<MemSpace M>
auto Transporter<M>::get_action_times() const -> MapStrReal
void Transporter<M>::accum_action_times(MapStrDouble* result) const
{
// Get kernel timing if running with a single stream and if either on the
// device with synchronization enabled or on the host
MapStrReal result;
auto const& step = *stepper_;
auto const& action_seq = step.actions();
if (num_streams_ == 1 && (M == MemSpace::host || action_seq.sync()))
if (M == MemSpace::host || action_seq.sync())
{
auto const& action_ptrs = action_seq.actions();
auto const& times = action_seq.accum_time();
Expand All @@ -165,10 +166,9 @@ auto Transporter<M>::get_action_times() const -> MapStrReal
for (auto i : range(action_ptrs.size()))
{
auto&& label = action_ptrs[i]->label();
result[label] = times[i];
(*result)[label] += times[i];
}
}
return result;
}

//---------------------------------------------------------------------------//
Expand Down
18 changes: 9 additions & 9 deletions app/celer-sim/Transporter.hh
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ struct TransporterInput
// Loop control
size_type max_steps{};
bool store_track_counts{}; //!< Store track counts at each step
bool store_step_times{}; //!< Store time elapsed for each step

StreamId stream_id{0};

Expand Down Expand Up @@ -88,6 +89,7 @@ class TransporterBase
//!@{
//! \name Type aliases
using SpanConstPrimary = Span<Primary const>;
using MapStrDouble = std::unordered_map<std::string, double>;
//!@}

public:
Expand All @@ -96,8 +98,11 @@ class TransporterBase
// Run a single step with no active states to "warm up"
virtual void operator()() = 0;

// Transport the input primaries and all secondaries produced
//! Transport the input primaries and all secondaries produced
virtual TransporterResult operator()(SpanConstPrimary primaries) = 0;

//! Accumulate action times into the map
virtual void accum_action_times(MapStrDouble*) const = 0;
};

//---------------------------------------------------------------------------//
Expand All @@ -107,12 +112,6 @@ class TransporterBase
template<MemSpace M>
class Transporter final : public TransporterBase
{
public:
//!@{
//! \name Type aliases
using MapStrReal = std::unordered_map<std::string, real_type>;
//!@}

public:
// Construct from parameters
explicit Transporter(TransporterInput inp);
Expand All @@ -123,14 +122,15 @@ class Transporter final : public TransporterBase
// Transport the input primaries and all secondaries produced
TransporterResult operator()(SpanConstPrimary primaries) final;

// Get the accumulated action times
MapStrReal get_action_times() const;
// Accumulate action times into the map
void accum_action_times(MapStrDouble*) const final;

private:
std::shared_ptr<Stepper<M>> stepper_;
size_type max_steps_;
size_type num_streams_;
bool store_track_counts_;
bool store_step_times_;
};

//---------------------------------------------------------------------------//
Expand Down
2 changes: 2 additions & 0 deletions app/celer-sim/celer-sim.cc
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ void run(std::istream* is, std::shared_ptr<OutputRegistry> output)
}
else
{
CELER_LOG(status) << "Transporting " << run_stream.num_events()
<< " on " << num_streams << " threads";
MultiExceptionHandler capture_exception;
#ifdef _OPENMP
// Set the maximum number of nested parallel regions
Expand Down

0 comments on commit 87e4421

Please sign in to comment.