i#5694: Add core-sharded analysis tool support
Adds a new type of sharding for drmemtrace analysis tools: by core
instead of by thread.

Introduces a shard_type_t enum (SHARD_BY_THREAD and SHARD_BY_CORE)
passed to a new analysis_tool_t::initialize_shard_type() function to
inform tools of the shard type (this cannot be easily added to the
stream interface as the scheduler is not aware of the shard type).
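
As an illustration (not part of this commit), a tool might consume the new
callback roughly as follows; the class name and counter are hypothetical and
the remaining virtuals are reduced to stubs:

    #include <cstdint>
    #include <iostream>
    #include <string>

    #include "analysis_tool.h"
    #include "memref.h"

    namespace drm = dynamorio::drmemtrace;

    // Hypothetical tool that records which sharding mode it was given.
    class shard_aware_counter_t : public drm::analysis_tool_t {
    public:
        std::string
        initialize_shard_type(drm::shard_type_t shard_type) override
        {
            // A tool that only understands per-thread shards could instead
            // return a non-empty error string here to abort the analysis.
            shard_type_ = shard_type;
            return "";
        }
        bool
        process_memref(const drm::memref_t &memref) override
        {
            ++record_count_;
            return true;
        }
        bool
        print_results() override
        {
            std::cerr << record_count_ << " records, sharded by "
                      << (shard_type_ == drm::SHARD_BY_CORE ? "core" : "thread")
                      << "\n";
            return true;
        }

    private:
        drm::shard_type_t shard_type_ = drm::SHARD_BY_THREAD;
        uint64_t record_count_ = 0;
    };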

Adds a new memtrace_stream_t::get_output_cpuid() query to get the
output cpu ordinal, or, when replaying an as-traced schedule, the
original traced cpuid (#6262).  Implements this for the scheduler.
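
A rough usage sketch, assuming the query returns a signed 64-bit value like
the other cpuid-related stream queries:

    #include <cstdint>
    #include <string>

    #include "memtrace_stream.h"

    // Hypothetical helper: label a shard by the output cpu it models (or, for
    // as-traced replay, the cpu it was traced on).
    static std::string
    describe_output_cpu(dynamorio::drmemtrace::memtrace_stream_t *stream)
    {
        int64_t cpuid = stream->get_output_cpuid();
        return "output cpu " + std::to_string(cpuid);
    }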

Generalizes analyzer_t to accept scheduler options for SHARD_BY_CORE
so that analysis tools can use the full range of schedules.  In this
mode the core count is the worker count, and the shard index becomes
the core ordinal.  Adds time-based scheduling support, with analyzer_t
passing wall-clock time as the current time.
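
A hedged sketch of how an analyzer_t subclass (analyzer_multi plays this role
in this commit) might wire this up; the class, the protected-member access,
and the scheduler_options_t constructor arguments are illustrative
assumptions rather than the exact code in this diff:

    #include <string>

    #include "analyzer.h"
    #include "scheduler.h"

    namespace dynamorio {
    namespace drmemtrace {

    class core_sharded_analyzer_t : public analyzer_t {
    public:
        bool
        init_core_sharded(const std::string &trace_path, int num_cores, int verbosity)
        {
            shard_type_ = SHARD_BY_CORE;
            // In core-sharded mode the core count is the worker count.
            worker_count_ = num_cores;
            // Schedule the traced threads dynamically onto num_cores outputs,
            // honoring timestamp dependencies, with a time-based quantum (the
            // quantum duration is left at its default here).
            scheduler_t::scheduler_options_t ops(scheduler_t::MAP_TO_ANY_OUTPUT,
                                                 scheduler_t::DEPENDENCY_TIMESTAMPS,
                                                 scheduler_t::SCHEDULER_DEFAULTS,
                                                 verbosity);
            // With a time quantum, analyzer_t passes wall-clock time to the scheduler.
            ops.quantum_unit = scheduler_t::QUANTUM_TIME;
            return init_scheduler(trace_path, INVALID_THREAD_ID, verbosity, &ops);
        }
    };

    } // namespace drmemtrace
    } // namespace dynamorio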

Adds a number of options to set the sharding mode (-core_sharded,
-core_serial (not yet implemented)) and to control the schedule
(-sched_quantum, -sched_time, -sched_order_time, -record_file,
-replay_file, -cpu_schedule_file).

Updates the basic_counts tool to support core sharding.

Adds a new test, core_sharded_test, which leverages analyzer_multi and
its option parsing to exercise the top-level options in a framework
that captures the output and runs multiple tests sequentially: simpler
than having a separate test with an output file for each parameter
being tested.
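
The pattern is roughly the following self-contained sketch (not the actual
test code; the expected-output string is invented):

    #include <cassert>
    #include <functional>
    #include <iostream>
    #include <sstream>
    #include <string>

    // Run "body" with std::cout redirected into a string so its output can be
    // checked in-process.
    static std::string
    capture_stdout(const std::function<void()> &body)
    {
        std::ostringstream capture;
        std::streambuf *prior = std::cout.rdbuf(capture.rdbuf());
        body();
        std::cout.rdbuf(prior);
        return capture.str();
    }

    int
    main()
    {
        // Each sub-test runs the analyzer with a different option combination
        // and checks its captured output, all within one executable.
        std::string out = capture_stdout([]() {
            // Stand-in for running analyzer_multi with e.g. "-core_sharded -cores 4".
            std::cout << "Total counts:\n 4 cores\n";
        });
        assert(out.find("4 cores") != std::string::npos);
        return 0;
    }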

Left as future work:
+ Convert scheduler_launcher into a new schedule_stats tool
+ Add a new record to indicate STATUS_WAIT
+ Add -core_serial support
+ Convert drcachesim default and -cpu_scheduling to use
  get_output_cpuid()

Issue: #5694
derekbruening committed Aug 25, 2023
1 parent 1a14566 commit 9bb9b4d
Showing 16 changed files with 618 additions and 36 deletions.
11 changes: 10 additions & 1 deletion api/docs/release.dox
@@ -129,7 +129,16 @@ changes:
- Nothing yet.

Further non-compatibility-affecting changes include:
- Nothing yet.
- Added core-sharded analysis tool support where traces are sharded by
core instead of by thread, with the threads scheduled onto the cores
either following how they were traced or using a dynamic schedule.
Adds #dynamorio::drmemtrace::shard_type_t passed to initialize_shard_type()
to inform tools of the shard type.
Adds a new memtrace_stream_t::get_output_cpuid() query.
Adds a number of drcachesim options to set the sharding mode (-core_sharded,
-core_serial) and control the schedule
(-sched_quantum, -sched_time, -sched_order_time, -record_file,
-replay_file, -cpu_schedule_file).

**************************************************
<hr>
28 changes: 26 additions & 2 deletions clients/drcachesim/CMakeLists.txt
@@ -732,8 +732,7 @@ if (BUILD_TESTS)
set_tests_properties(tool.drcacheoff.raw2trace_unit_tests PROPERTIES TIMEOUT
${test_seconds})

add_executable(tool.scheduler.unit_tests tests/scheduler_unit_tests.cpp
tests/scheduler_unit_tests.cpp)
add_executable(tool.scheduler.unit_tests tests/scheduler_unit_tests.cpp)
target_link_libraries(tool.scheduler.unit_tests drmemtrace_analyzer test_helpers)
add_win32_flags(tool.scheduler.unit_tests)
if (WIN32)
@@ -747,6 +746,31 @@ if (BUILD_TESTS)
${PROJECT_SOURCE_DIR}/clients/drcachesim/tests)
set_tests_properties(tool.scheduler.unit_tests PROPERTIES TIMEOUT ${test_seconds})

add_executable(tool.drcachesim.core_sharded tests/core_sharded_test.cpp
# XXX: Better to put these into libraries but that requires a bigger cleanup:
analyzer_multi.cpp ${client_and_sim_srcs} reader/ipc_reader.cpp)
target_link_libraries(tool.drcachesim.core_sharded test_helpers
drmemtrace_raw2trace drmemtrace_simulator drmemtrace_reuse_distance
drmemtrace_histogram drmemtrace_reuse_time drmemtrace_basic_counts
drmemtrace_opcode_mix drmemtrace_syscall_mix drmemtrace_view drmemtrace_func_view
drmemtrace_raw2trace directory_iterator drmemtrace_invariant_checker
drmemtrace_analyzer)
add_win32_flags(tool.drcachesim.core_sharded)
if (WIN32)
# We have a dup symbol from linking in DR. Linking libc first doesn't help.
append_property_string(TARGET tool.drcachesim.core_sharded LINK_FLAGS
"/force:multiple")
endif ()
configure_DynamoRIO_standalone(tool.drcachesim.core_sharded)
use_DynamoRIO_extension(tool.drcachesim.core_sharded droption)
use_DynamoRIO_extension(tool.drcachesim.core_sharded drreg_static)
use_DynamoRIO_extension(tool.drcachesim.core_sharded drcovlib_static)
use_DynamoRIO_extension(tool.drcachesim.core_sharded drutil_static)
add_test(NAME tool.drcachesim.core_sharded
COMMAND tool.drcachesim.core_sharded
${PROJECT_SOURCE_DIR}/clients/drcachesim/tests)
set_tests_properties(tool.drcachesim.core_sharded PROPERTIES TIMEOUT ${test_seconds})

# XXX i#5675: add tests for other environments. Currently, the repository does not have
# a checked-in post-processed trace for x86-32 or AArchXX. We are also limited to
# the old format due to missing zip support so we can't use the new threadsig.x64.
20 changes: 20 additions & 0 deletions clients/drcachesim/analysis_tool.h
@@ -52,6 +52,16 @@
namespace dynamorio {
namespace drmemtrace {

/**
* Identifies the type of shard.
*/
enum shard_type_t {
/** Sharded by software thread. */
SHARD_BY_THREAD,
/** Sharded by hardware core. */
SHARD_BY_CORE,
};

/**
* The base class for a tool that analyzes a trace. A new tool should subclass this
* class.
@@ -129,6 +139,16 @@ template <typename RecordType> class analysis_tool_tmpl_t {
{
return initialize();
}
/**
* Identifies the shard type for this analysis. The return value indicates whether
* the tool supports this shard type, with failure (a non-empty string) being treated
* as a fatal error for the analysis.
*/
virtual std::string
initialize_shard_type(shard_type_t shard_type)
{
return "";
}
/** Returns whether the tool was created successfully. */
virtual bool
operator!()
95 changes: 75 additions & 20 deletions clients/drcachesim/analyzer.cpp
@@ -32,6 +32,13 @@

#include "analyzer.h"

#ifdef WINDOWS
# define WIN32_LEAN_AND_MEAN
# include <windows.h>
#else
# include <sys/time.h>
#endif

#include <stddef.h>
#include <stdint.h>

@@ -170,9 +177,9 @@ analyzer_tmpl_t<RecordType, ReaderType>::analyzer_tmpl_t()

template <typename RecordType, typename ReaderType>
bool
analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler(const std::string &trace_path,
memref_tid_t only_thread,
int verbosity)
analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler(
const std::string &trace_path, memref_tid_t only_thread, int verbosity,
typename sched_type_t::scheduler_options_t *options)
{
verbosity_ = verbosity;
if (trace_path.empty()) {
@@ -191,14 +198,14 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler(const std::string &trace
if (only_thread != INVALID_THREAD_ID) {
workload.only_threads.insert(only_thread);
}
return init_scheduler_common(workload);
return init_scheduler_common(workload, options);
}

template <typename RecordType, typename ReaderType>
bool
analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler(
std::unique_ptr<ReaderType> reader, std::unique_ptr<ReaderType> reader_end,
int verbosity)
int verbosity, typename sched_type_t::scheduler_options_t *options)
{
verbosity_ = verbosity;
if (!reader || !reader_end) {
Expand All @@ -212,13 +219,14 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler(
if (skip_instrs_ > 0)
regions.emplace_back(skip_instrs_ + 1, 0);
typename sched_type_t::input_workload_t workload(std::move(readers), regions);
return init_scheduler_common(workload);
return init_scheduler_common(workload, options);
}

template <typename RecordType, typename ReaderType>
bool
analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler_common(
typename sched_type_t::input_workload_t &workload)
typename sched_type_t::input_workload_t &workload,
typename sched_type_t::scheduler_options_t *options)
{
for (int i = 0; i < num_tools_; ++i) {
if (parallel_ && !tools_[i]->parallel_shard_supported()) {
@@ -228,17 +236,26 @@ analyzer_tmpl_t<RecordType, ReaderType>::init_scheduler_common(
}
std::vector<typename sched_type_t::input_workload_t> sched_inputs(1);
sched_inputs[0] = std::move(workload);

typename sched_type_t::scheduler_options_t sched_ops;
int output_count;
if (parallel_) {
if (shard_type_ == SHARD_BY_CORE) {
// Subclass must pass us options and set worker_count_ to # cores.
if (options == nullptr || worker_count_ <= 0) {
error_string_ = "For -core_sharded, core count must be > 0";
return false;
}
sched_ops = *options;
if (sched_ops.quantum_unit == sched_type_t::QUANTUM_TIME)
sched_by_time_ = true;
} else if (parallel_) {
sched_ops = sched_type_t::make_scheduler_parallel_options(verbosity_);
if (worker_count_ <= 0)
worker_count_ = std::thread::hardware_concurrency();
} else {
sched_ops = sched_type_t::make_scheduler_serial_options(verbosity_);
worker_count_ = 1;
}
output_count = worker_count_;
int output_count = worker_count_;
if (scheduler_.init(sched_inputs, output_count, sched_ops) !=
sched_type_t::STATUS_SUCCESS) {
ERRMSG("Failed to initialize scheduler: %s\n",
@@ -311,6 +328,26 @@ analyzer_tmpl_t<RecordType, ReaderType>::get_error_string()
return error_string_;
}

template <typename RecordType, typename ReaderType>
uint64_t
analyzer_tmpl_t<RecordType, ReaderType>::get_current_microseconds()
{
#ifdef UNIX
struct timeval time;
if (gettimeofday(&time, nullptr) != 0)
return 0;
return time.tv_sec * 1000000 + time.tv_usec;
#else
SYSTEMTIME sys_time;
GetSystemTime(&sys_time);
FILETIME file_time;
if (!SystemTimeToFileTime(&sys_time, &file_time))
return 0;
return file_time.dwLowDateTime +
(static_cast<uint64_t>(file_time.dwHighDateTime) << 32);
#endif
}

template <typename RecordType, typename ReaderType>
uint64_t
analyzer_tmpl_t<RecordType, ReaderType>::compute_interval_id(uint64_t first_timestamp,
@@ -377,11 +414,15 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_serial(analyzer_worker_data_t &
worker.error = tools_[i]->initialize_stream(worker.stream);
if (!worker.error.empty())
return;
worker.error = tools_[i]->initialize_shard_type(shard_type_);
if (!worker.error.empty())
return;
}
while (true) {
RecordType record;
uint64_t micros = sched_by_time_ ? get_current_microseconds() : 0;
typename sched_type_t::stream_status_t status =
worker.stream->next_record(record);
worker.stream->next_record(record, micros);
if (status != sched_type_t::STATUS_OK) {
if (status != sched_type_t::STATUS_EOF) {
if (status == sched_type_t::STATUS_REGION_INVALID) {
@@ -429,11 +470,20 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
for (int i = 0; i < num_tools_; ++i)
user_worker_data[i] = tools_[i]->parallel_worker_init(worker->index);
RecordType record;
uint64_t micros = sched_by_time_ ? get_current_microseconds() : 0;
for (typename sched_type_t::stream_status_t status =
worker->stream->next_record(record);
worker->stream->next_record(record, micros);
status != sched_type_t::STATUS_EOF;
status = worker->stream->next_record(record)) {
if (status != sched_type_t::STATUS_OK) {
status = worker->stream->next_record(record, micros)) {
if (sched_by_time_)
micros = get_current_microseconds();
if (status == sched_type_t::STATUS_WAIT) {
// TODO i#5694: We'd like the forthcoming schedule_stats tool to know about
// waits and idle periods (to record "-" in its string): should the analyzer
// insert a new marker type that doesn't count toward ordinals (or else it
// needs a scheduler API to inject it)?
continue;
} else if (status != sched_type_t::STATUS_OK) {
if (status == sched_type_t::STATUS_REGION_INVALID) {
worker->error =
"Too-far -skip_instrs for: " + worker->stream->get_stream_name();
@@ -443,7 +493,9 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
}
return;
}
int shard_index = worker->stream->get_input_stream_ordinal();
int shard_index = shard_type_ == SHARD_BY_CORE
? worker->index
: worker->stream->get_input_stream_ordinal();
if (worker->shard_data.find(shard_index) == worker->shard_data.end()) {
VPRINT(this, 1, "Worker %d starting on trace shard %d stream is %p\n",
worker->index, shard_index, worker->stream);
@@ -457,11 +509,11 @@ analyzer_tmpl_t<RecordType, ReaderType>::process_tasks(analyzer_worker_data_t *w
}
}
memref_tid_t tid;
// Currently shards map only to threads, so the shard_id is the same as
// the thread id.
if (worker->shard_data[shard_index].shard_id == 0 &&
record_has_tid(record, tid)) {
worker->shard_data[shard_index].shard_id = tid;
if (worker->shard_data[shard_index].shard_id == 0) {
if (shard_type_ == SHARD_BY_CORE)
worker->shard_data[shard_index].shard_id = worker->index;
else if (record_has_tid(record, tid))
worker->shard_data[shard_index].shard_id = tid;
}
uint64_t prev_interval_index;
uint64_t prev_interval_init_instr_count;
@@ -699,6 +751,9 @@ analyzer_tmpl_t<RecordType, ReaderType>::run()
error_string_ = tools_[i]->initialize_stream(nullptr);
if (!error_string_.empty())
return false;
error_string_ = tools_[i]->initialize_shard_type(shard_type_);
if (!error_string_.empty())
return false;
}
std::vector<std::thread> threads;
VPRINT(this, 1, "Creating %d worker threads\n", worker_count_);
13 changes: 10 additions & 3 deletions clients/drcachesim/analyzer.h
@@ -213,16 +213,18 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {

bool
init_scheduler(const std::string &trace_path,
memref_tid_t only_thread = INVALID_THREAD_ID, int verbosity = 0);
memref_tid_t only_thread = INVALID_THREAD_ID, int verbosity = 0,
typename sched_type_t::scheduler_options_t *options = nullptr);

bool
init_scheduler(
std::unique_ptr<ReaderType> reader = std::unique_ptr<ReaderType>(nullptr),
std::unique_ptr<ReaderType> reader_end = std::unique_ptr<ReaderType>(nullptr),
int verbosity = 0);
int verbosity = 0, typename sched_type_t::scheduler_options_t *options = nullptr);

bool
init_scheduler_common(typename sched_type_t::input_workload_t &workload);
init_scheduler_common(typename sched_type_t::input_workload_t &workload,
typename sched_type_t::scheduler_options_t *options);

// Used for std::thread so we need an rvalue (so no &worker).
void
@@ -302,6 +304,9 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
uint64_t interval_end_timestamp, int tool_idx,
typename analysis_tool_tmpl_t<RecordType>::interval_state_snapshot_t *&result);

uint64_t
get_current_microseconds();

bool success_;
scheduler_tmpl_t<RecordType, ReaderType> scheduler_;
std::string error_string_;
Expand All @@ -326,6 +331,8 @@ template <typename RecordType, typename ReaderType> class analyzer_tmpl_t {
uint64_t skip_instrs_ = 0;
uint64_t interval_microseconds_ = 0;
int verbosity_ = 0;
shard_type_t shard_type_ = SHARD_BY_THREAD;
bool sched_by_time_ = false;

private:
bool