Skip to content

Commit

Permalink
Progressed on HLWS scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
flipflapflop committed May 10, 2020
1 parent 7ccaf96 commit a6334c7
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 54 deletions.
13 changes: 9 additions & 4 deletions include/sylver/StarPU/hlws.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public:
// Comparator ordering StarPU tasks by priority: higher-priority tasks
// sort first.
//
// @param lhs first task to compare (must be non-null)
// @param rhs second task to compare (must be non-null)
// @return true iff lhs is strictly higher priority than rhs
bool operator()(
struct starpu_task const* lhs, struct starpu_task const* rhs) const {

// FIX: the previous implementation returned true when the two
// priorities were equal.  A comparator used by std::sort /
// std::priority_queue / ordered containers must implement a strict
// weak ordering, for which compare(x, x) must be false; returning
// true for equal keys is undefined behavior in those algorithms.
return lhs->priority > rhs->priority;
}
};

Expand All @@ -50,6 +50,11 @@ public:

unsigned last_pop_worker;
unsigned last_push_worker;

unsigned last_pop_cuda_worker;
unsigned last_push_cuda_worker;

std::vector<int> cuda_worker_ids;
};

static void initialize(unsigned sched_ctx_id);
Expand Down
137 changes: 95 additions & 42 deletions src/StarPU/hlws.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -295,37 +295,63 @@ unsigned HeteroLwsScheduler::select_worker(
// Get current worker id
int workerid = starpu_worker_get_id();

// Check if current worker can execute task
if (workerid == -1 || // Not a worker
!starpu_sched_ctx_contains_worker(workerid, sched_ctx_id) || // Not part of current context
!starpu_worker_can_execute_task_first_impl(workerid, task, NULL)) {
if (task->where & STARPU_CUDA) {
// Task can run on CUDA, push it to a CUDA worker's queue

// Round robin

unsigned worker;
unsigned nworkers;
int *workerids;
nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &workerids);
// std::cout << "[HeteroLwsScheduler::push_task]"
// << " task name = " << task->cl->name << std::endl;

auto& cuda_worker_ids = sched_data->cuda_worker_ids;
unsigned num_cuda_worker = cuda_worker_ids.size();
unsigned cuda_worker;

sched_data->last_push_cuda_worker;

worker = sched_data->last_push_worker;
do {
worker = (worker + 1) % nworkers;
cuda_worker = (cuda_worker + 1) % num_cuda_worker;
}
while (!sched_data->worker_data[workerids[worker]].running ||
!starpu_worker_can_execute_task_first_impl(workerids[worker], task, NULL));
while (!sched_data->worker_data[cuda_worker_ids[cuda_worker]].running ||
!starpu_worker_can_execute_task_first_impl(
cuda_worker_ids[cuda_worker], task, NULL));


sched_data->last_push_worker = worker;
sched_data->last_push_cuda_worker = cuda_worker;

workerid = workerids[worker];
workerid = cuda_worker_ids[cuda_worker];
}
else {

// Check if current worker can execute task
if (workerid == -1 || // Not a worker
!starpu_sched_ctx_contains_worker(workerid, sched_ctx_id) || // Not part of current context
!starpu_worker_can_execute_task_first_impl(workerid, task, NULL)) {

// Round robin
unsigned nworkers;
int *workerids;
nworkers = starpu_sched_ctx_get_workers_list_raw(sched_ctx_id, &workerids);

unsigned worker;

worker = sched_data->last_push_worker;
do {
worker = (worker + 1) % nworkers;
}
while (!sched_data->worker_data[workerids[worker]].running ||
!starpu_worker_can_execute_task_first_impl(
workerids[worker], task, NULL));

sched_data->last_push_worker = worker;

workerid = workerids[worker];
}
}

return workerid;
}

int HeteroLwsScheduler::push_task(struct starpu_task *task) {

// std::cout << "[HeteroLwsScheduler::push_task]" << std::endl;

using SchedulerData = HeteroLwsScheduler::Data;

unsigned sched_ctx_id = task->sched_ctx;
Expand Down Expand Up @@ -433,6 +459,9 @@ int HeteroLwsScheduler::select_victim(

// std::cout << "[HeteroLwsScheduler::select_victim] last_pop_worker = " << sched_data->last_pop_worker << std::endl;

// Worker type
auto worker_type = starpu_worker_get_type(workerid);

// Round robin strategy

unsigned worker = sched_data->last_pop_worker;
Expand All @@ -444,34 +473,48 @@ int HeteroLwsScheduler::select_victim(

/* If the worker's queue is empty, let's try
* the next ones */
while (1)
{
/* Here helgrind would shout that this is unprotected, but we
* are fine with getting outdated values, this is just an
* estimation */
// ntasks = ws->per_worker[workerids[worker]].queue.ntasks;

auto& worker_data = sched_data->worker_data[workerids[worker]];
// ntasks = worker_data.task_queue.size();
ntasks = worker_data.task_prio_queue.size();

if (
(ntasks > 0) &&
(worker_data.busy || starpu_worker_is_blocked_in_parallel(workerids[worker]))) {

break;
}

worker = (worker + 1) % nworkers;
if (worker == sched_data->last_pop_worker)
{
/* We got back to the first worker,
* don't go in infinite loop */
ntasks = 0;
while (1) {
/* Here helgrind would shout that this is unprotected, but we
* are fine with getting outdated values, this is just an
* estimation */
// ntasks = ws->per_worker[workerids[worker]].queue.ntasks;

auto& worker_data = sched_data->worker_data[workerids[worker]];
// ntasks = worker_data.task_queue.size();
ntasks = worker_data.task_prio_queue.size();

if (
(ntasks > 0) &&
(worker_data.busy || starpu_worker_is_blocked_in_parallel(workerids[worker]))) {

auto victim_type = starpu_worker_get_type(workerids[worker]);
if ((victim_type == STARPU_CUDA_WORKER) && (worker_type != STARPU_CUDA_WORKER)) {
// Non-CUDA worker trying to steal from a CUDA worker,
// making sure there is plently of tasks in the CUDA
// workers's queue. Do not steal otherwise.
if (ntasks > 100) {
break;
}

// std::cout << "[HeteroLwsScheduler::select_victim] workerid = " << workerid << " stealing " << std::endl;

}
else {
break;
}
}

worker = (worker + 1) % nworkers;

if (worker == sched_data->last_pop_worker)
{
/* We got back to the first worker,
* don't go in infinite loop */
ntasks = 0;
break;
}
}

sched_data->last_pop_worker = (worker + 1) % nworkers;

worker = workerids[worker];
Expand Down Expand Up @@ -511,6 +554,9 @@ void HeteroLwsScheduler::initialize(unsigned sched_ctx_id) {
sched_data->last_pop_worker = 0;
sched_data->last_push_worker = 0;

sched_data->last_pop_cuda_worker = 0;
sched_data->last_push_cuda_worker = 0;

unsigned const nw = starpu_worker_get_count();

sched_data->worker_data.resize(nw);
Expand All @@ -521,6 +567,13 @@ void HeteroLwsScheduler::initialize(unsigned sched_ctx_id) {
if (starpu_sched_ctx_max_priority_is_set(sched_ctx_id) == 0)
starpu_sched_ctx_set_max_priority(sched_ctx_id, INT_MAX);

unsigned num_cuda_workers = starpu_cuda_worker_get_count();
auto& cuda_worker_ids = sched_data->cuda_worker_ids;
cuda_worker_ids.resize(num_cuda_workers);
starpu_worker_get_ids_by_type(STARPU_CUDA_WORKER, &cuda_worker_ids[0], num_cuda_workers);

std::cout << "[HeteroLwsScheduler::initialize] num_cuda_workers = " << num_cuda_workers << std::endl;

}

void HeteroLwsScheduler::finalize(unsigned sched_ctx_id) {
Expand Down
4 changes: 3 additions & 1 deletion tests/drivers/factor_node_test.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,9 @@ int main(int argc, char** argv) {
// factor_node_indef_test<double, 32, false>(0.01, 1e-20, true, false, opts.m, opts.n, opts.nb, opts.ncpu);
// factor_node_indef_test<double, 32, false>(0.01, 1e-20, opts.posdef, true, false, opts.m, opts.n, opts.nb, opts.ncpu);
// No delays
spldlt::tests::factor_node_indef_test<double, 32, false>(0.01, 1e-20, opts.posdef, false, false, opts.m, opts.n, opts.nb, opts.ncpu, opts.ngpu);
spldlt::tests::factor_node_indef_test<double, 32, false>(opts);

// 0.01, 1e-20, opts.posdef, false, false, opts.m, opts.n, opts.nb, opts.ncpu, opts.ngpu);
}
}

21 changes: 14 additions & 7 deletions tests/testing_factor_node_indef.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,10 @@ int factor_node_indef_test(sylver::tests::Options test_options) {
// Make singluar
bool singular = test_options.singular;

std::cout << "[factor_node_indef_test]" << " u = " << u << ", small = " << small << std::endl;
std::cout << "[factor_node_indef_test]" << " posdef = " << posdef << std::endl;
std::cout << "[factor_node_indef_test]" << " delays = " << delays << ", singular = " << singular << std::endl;
std::cout << "[factor_node_indef_test]" << std::endl;
std::cout << "u = " << u << ", small = " << small << std::endl;
std::cout << "posdef = " << posdef << std::endl;
std::cout << " delays = " << delays << ", singular = " << singular << std::endl;

// Number of rows
int const m = test_options.m;
Expand Down Expand Up @@ -178,8 +179,14 @@ int factor_node_indef_test(sylver::tests::Options test_options) {
#if defined(SPLDLT_USE_STARPU)

sylver::starpu::StarPU::ncpu = ncpu;
#if defined(SPLDLT_USE_GPU)
sylver::starpu::StarPU::ncuda = ngpu;

#else
sylver::starpu::StarPU::ncuda = 0;
#endif

// std::cout << " TETETETETETE " << std::endl;

// Select scheduler
switch (test_options.sched) {
case(sylver::tests::Sched::HP):
Expand All @@ -198,6 +205,8 @@ int factor_node_indef_test(sylver::tests::Options test_options) {
std::runtime_error("Scheduler not available");
}

sylver::starpu::StarPU::initialize();

// struct starpu_conf *conf = new starpu_conf;
// starpu_conf_init(conf);
// conf->ncpus = ncpu;
Expand Down Expand Up @@ -229,9 +238,7 @@ int factor_node_indef_test(sylver::tests::Options test_options) {
// sylver::starpu::StarPU::conf.sched_policy_name = NULL;
// sylver::starpu::StarPU::conf.sched_policy =
// &sylver::starpu::HeteroLwsScheduler::starpu_sched_policy();

sylver::starpu::StarPU::initialize();


#endif

int nworkers = 1;
Expand Down

0 comments on commit a6334c7

Please sign in to comment.