Skip to content

Commit

Permalink
Propagate maximum number of threads.
Browse files Browse the repository at this point in the history
  - This is preparation to be able to set the desired number of threads
    at simulation start.

    Now the max number of threads to be used at any time can be set with
    the interface function PM_Model_create() when the parmodauto model
    representation is created by the OpenModelica simulation executable
    code.

    The maximum number of threads is passed to TBB by OMModel constructor
    using tbb::task_scheduler_init.
  • Loading branch information
mahge committed Oct 30, 2020
1 parent 1192d7d commit 7035b01
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 36 deletions.
2 changes: 1 addition & 1 deletion OMCompiler/Compiler/Template/CodegenC.tpl
Expand Up @@ -1039,7 +1039,7 @@ template simulationFile(SimCode simCode, String guid, String isModelExchangeFMU)
let pminit = if Flags.getConfigBool(Flags.PARMODAUTO) then
<<

pm_model = PM_Model_create("<%fileNamePrefix%>", &data, threadData);
pm_model = PM_Model_create("<%fileNamePrefix%>", &data, threadData, 0 /*num threads*/);
PM_Model_load_ODE_system(pm_model, functionODE_systems);

>>
Expand Down
2 changes: 1 addition & 1 deletion OMCompiler/SimulationRuntime/ParModelica/auto/Makefile.in
Expand Up @@ -13,7 +13,7 @@ INCDIRS = -I"../../c" -I$(BOOST_HOME) -I$(OMC_TBB_ROOT)/include -I$(OMCOMPILER_R
CC=@CC@
CXX=@CXX@
CFLAGS=@CFLAGS@
CPPFLAGS= @CFLAGS@ -Wall -DGC_THREADS
CPPFLAGS= -O3 -Wall -DGC_THREADS

OS_SRCS = pm_posix_timer.cpp

Expand Down
Expand Up @@ -45,13 +45,14 @@ typedef Equation::FunctionType FunctionType;

PMTimer seq_ode_timer;

void* PM_Model_create(const char* model_name, DATA* data, threadData_t* threadData) {
OMModel* pm_om_model = new OMModel(model_name);
void* PM_Model_create(const char* model_name, DATA* data, threadData_t* threadData, size_t in_max_num_threads) {

size_t max_num_threads = in_max_num_threads ? in_max_num_threads : tbb::this_task_arena::max_concurrency();

OMModel* pm_om_model = new OMModel(model_name, max_num_threads);
pm_om_model->data = data;
pm_om_model->threadData = threadData;

pm_om_model->max_num_threads = tbb::this_task_arena::max_concurrency();

return pm_om_model;
}

Expand Down
Expand Up @@ -48,7 +48,7 @@ extern "C" {

typedef void (*FunctionType)(DATA *, threadData_t*);

void* PM_Model_create(const char* , DATA* , threadData_t*);
void* PM_Model_create(const char* name, DATA* simdata, threadData_t* threadData, size_t num_threads);

void PM_Model_load_ODE_system(void*, FunctionType*);

Expand Down
20 changes: 11 additions & 9 deletions OMCompiler/SimulationRuntime/ParModelica/auto/om_pm_model.cpp
Expand Up @@ -77,16 +77,18 @@ void Equation::execute() {
function_system[task_id](data, threadData);
}

OMModel::OMModel(const std::string& in_name)
OMModel::OMModel(const std::string& in_name, size_t mnt)
: name(in_name)
, INI_system(name)
, INI_scheduler(INI_system)
, DAE_system(name)
, DAE_scheduler(DAE_system)
, ODE_system(name)
, ODE_scheduler(ODE_system)
, ALG_system(name)
, ALG_scheduler(ALG_system) {
, max_num_threads(mnt)
, tbb_system(mnt)
, INI_system(name, mnt)
, INI_scheduler(INI_system, mnt)
, DAE_system(name, mnt)
, DAE_scheduler(DAE_system, mnt)
, ODE_system(name, mnt)
, ODE_scheduler(ODE_system, mnt)
, ALG_system(name, mnt)
, ALG_scheduler(ALG_system, mnt) {
intialized = false;
}

Expand Down
12 changes: 6 additions & 6 deletions OMCompiler/SimulationRuntime/ParModelica/auto/om_pm_model.hpp
Expand Up @@ -38,15 +38,12 @@
Mahder.Gebremedhin@liu.se 2020-10-12
*/

#include <tbb/task_scheduler_init.h>
#include <simulation_data.h>

// #include "pm_task_system.hpp"
#include "pm_cluster_level_scheduler.hpp"
#include "pm_cluster_dynamic_scheduler.hpp"

// #include "pm_level_scheduler.hpp"
// #include "pm_dynamic_scheduler.hpp"

#include "pm_timer.hpp"

#include "om_pm_equation.hpp"
Expand Down Expand Up @@ -107,13 +104,16 @@ class OMModel

public:
std::string name;
size_t max_num_threads;
tbb::task_scheduler_init tbb_system;

bool intialized;
DATA* data;
threadData_t* threadData;
size_t max_num_threads;


public:
OMModel(const std::string&);
OMModel(const std::string& name, size_t max_num_threads);
void load_ODE_system();

PMTimer load_system_timer;
Expand Down
Expand Up @@ -69,6 +69,8 @@ class ClusterDynamicScheduler {

private:

size_t max_num_threads;

tbb::flow::graph dynamic_graph;
tbb::flow::broadcast_node<tbb::flow::continue_msg> flow_root;

Expand All @@ -81,14 +83,13 @@ class ClusterDynamicScheduler {
PMTimer clustering_timer;
TaskSystemType& task_system;

size_t max_num_threads;

int sequential_evaluations;
int total_evaluations;
int parallel_evaluations;

ClusterDynamicScheduler(TaskSystemType& task_system)
: flow_root(dynamic_graph)
ClusterDynamicScheduler(TaskSystemType& task_system, size_t mnt)
: max_num_threads(mnt)
, flow_root(dynamic_graph)
, flow_graph_created(false)
, task_system(task_system) {
sequential_evaluations = 0;
Expand Down
Expand Up @@ -66,7 +66,7 @@ struct TBBConcurrentStepExecutor {
GraphType& sys_graph;

public:
TBBConcurrentStepExecutor(GraphType& g, std::set<pid_t>& k) : sys_graph(g) {}
TBBConcurrentStepExecutor(GraphType& g) : sys_graph(g) {}

void operator()(tbb::blocked_range<ClusteIdIter>& range) const {

Expand Down Expand Up @@ -116,11 +116,12 @@ class StepLevels : boost::noncopyable {
typedef typename ClusterLevels::value_type SameLevelClusterIdsType;

public:
size_t max_num_threads;

const TaskSystemType& task_system_org;
TaskSystemType task_system;
TBBConcurrentStepExecutor<TaskType> step_executor;

size_t max_num_threads;

bool profiled;
bool schedule_available;
Expand All @@ -135,17 +136,17 @@ class StepLevels : boost::noncopyable {
bool has_run_parallel;

public:
std::string name;

PMTimer execution_timer;
PMTimer clustering_timer;
PMTimer step_timer;

std::vector<double> parallel_eval_costs;

StepLevels(TaskSystemType& ts)
: task_system_org(ts)
, task_system("invalid") // implement a constrctor with no parameters and remove this
StepLevels(TaskSystemType& ts, size_t mnt)
: max_num_threads(mnt)
, task_system_org(ts)
, task_system("invalid", mnt) // implement a constrctor with no parameters and remove this
, step_executor(task_system.sys_graph) {
// GC_allow_register_threads();
// GC_use_threads_discovery();
Expand Down
Expand Up @@ -206,14 +206,17 @@ class TaskSystem_v2 {
public:
std::string name;

size_t max_num_threads;

ClusterLevels clusters_by_level;
bool levels_valid;
double total_cost;

GraphType sys_graph;
ClusterIdType root_node_id;

TaskSystem_v2(const std::string& in_name) {
TaskSystem_v2(const std::string& in_name, size_t mnt) {
max_num_threads = mnt;
name = in_name;
levels_valid = false;
node_count = 0;
Expand All @@ -228,6 +231,7 @@ class TaskSystem_v2 {

TaskSystem_v2(const TaskSystem_v2& other) {
this->name = other.name;
this->max_num_threads = other.max_num_threads;
this->node_count = other.node_count;

this->clusters_by_level = other.clusters_by_level;
Expand Down Expand Up @@ -258,6 +262,7 @@ class TaskSystem_v2 {
TaskSystem_v2& operator=(const TaskSystem_v2& other) {

this->name = other.name;
this->max_num_threads = other.max_num_threads;
this->node_count = other.node_count;

this->clusters_by_level = other.clusters_by_level;
Expand Down Expand Up @@ -476,7 +481,7 @@ class TaskSystem_v2 {
}
}

// create alist of nodes per level
// create a list of nodes per level
// Collect nodes of the same level in to a vector and add it to the list of levels
clusters_by_level.clear();
clusters_by_level.resize(critical_path + 1);
Expand All @@ -500,8 +505,6 @@ class TaskSystem_v2 {
this->levels_valid = true;
}

void load_from_xml(const std::string& file_name, const std::string& eq_to_read);

void dump_graphml(const std::string& suffix) {

if (levels_valid == false)
Expand Down

0 comments on commit 7035b01

Please sign in to comment.