Skip to content
This repository has been archived by the owner on Jun 23, 2022. It is now read-only.

Commit

Permalink
core: assign a timer thread for each worker thread
Browse files Browse the repository at this point in the history
  • Loading branch information
shengofsun committed Jun 11, 2018
1 parent 2e7ffeb commit 0907496
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 20 deletions.
48 changes: 30 additions & 18 deletions src/core/core/task_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,8 @@ using namespace dsn::utils;
namespace dsn {

task_worker_pool::task_worker_pool(const threadpool_spec &opts, task_engine *owner)
: _spec(opts), _owner(owner), _node(owner->node())
: _spec(opts), _owner(owner), _node(owner->node()), _is_running(false)
{
_is_running = false;
_timer_svc = nullptr;
}

void task_worker_pool::create()
Expand Down Expand Up @@ -80,6 +78,19 @@ void task_worker_pool::create()
}
}

for (int i = 0; i < qCount; ++i) {
auto tsvc = factory_store<timer_service>::create(
service_engine::fast_instance().spec().timer_factory_name.c_str(),
PROVIDER_TYPE_MAIN,
_node,
nullptr);
for (auto &s : service_engine::fast_instance().spec().timer_aspects) {
tsvc =
factory_store<timer_service>::create(s.c_str(), PROVIDER_TYPE_ASPECT, _node, tsvc);
}
_per_queue_timer_svcs.push_back(tsvc);
}

for (int i = 0; i < _spec.worker_count; i++) {
auto q = _queues[qCount == 1 ? 0 : i];
task_worker *worker = factory_store<task_worker>::create(
Expand All @@ -100,6 +111,8 @@ void task_worker_pool::start()
if (_is_running)
return;

for (auto &tsvc : _per_queue_timer_svcs)
tsvc->start();
for (auto &wk : _workers)
wk->start();

Expand All @@ -112,8 +125,6 @@ void task_worker_pool::start()
_spec.worker_share_core ? "true" : "false",
_spec.partitioned ? "true" : "false");

// setup cached ptrs for fast timer service access
_timer_svc = node()->tsvc();
_is_running = true;
}

Expand All @@ -122,7 +133,11 @@ void task_worker_pool::add_timer(task *t)
dassert(t->delay_milliseconds() > 0,
"task delayed should be dispatched to timer service first");

_timer_svc->add_timer(t);
unsigned int idx = (_spec.partitioned
? static_cast<unsigned int>(t->hash()) %
static_cast<unsigned int>(_per_queue_timer_svcs.size())
: 0);
_per_queue_timer_svcs[idx]->add_timer(t);
}

void task_worker_pool::enqueue(task *t)
Expand All @@ -132,18 +147,15 @@ void task_worker_pool::enqueue(task *t)
dassert(t->delay_milliseconds() == 0,
"task delayed should be dispatched to timer service first");

if (_is_running) {
unsigned int idx =
(_spec.partitioned
? static_cast<unsigned int>(t->hash()) % static_cast<unsigned int>(_queues.size())
: 0);
return _queues[idx]->enqueue_internal(t);
} else {
dassert(false,
"worker pool %s must be started before enqueue task %s",
spec().name.c_str(),
t->spec().name.c_str());
}
dassert(_is_running,
"worker pool %s must be started before enqueue task %s",
spec().name.c_str(),
t->spec().name.c_str());
unsigned int idx =
(_spec.partitioned
? static_cast<unsigned int>(t->hash()) % static_cast<unsigned int>(_queues.size())
: 0);
return _queues[idx]->enqueue_internal(t);
}

bool task_worker_pool::shared_same_worker_with_current_task(task *tsk) const
Expand Down
2 changes: 0 additions & 2 deletions src/core/core/task_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,6 @@ class task_worker_pool
std::vector<task_queue *> _queues;
std::vector<admission_controller *> _controllers;

// cached ptrs for fast access
timer_service *_timer_svc;
std::vector<timer_service *> _per_queue_timer_svcs;

bool _is_running;
Expand Down

0 comments on commit 0907496

Please sign in to comment.