Skip to content

Commit

Permalink
Replace tbb::task::spawn() by tbb::task::enqueue() and also support f…
Browse files Browse the repository at this point in the history
…or task monitoring

Replace tbb::task::spawn() by tbb::task::enqueue(). enqueue() is more
appropriate call since task library just fires the task without calling
spawn_root_and_wait()

Also, introduced a TaskMonitor class which monitors number of tasks
enqueued to scheduler and number of tasks spawned by TBB. If the monitor
finds tasks are enqueued but not scheduled for a configured time, it
will exit the program.

Conflicts:
	src/base/task.cc
	src/base/task.h
	src/vnsw/agent/cmn/agent.cc
	src/vnsw/agent/contrail-vrouter-agent.conf
	src/vnsw/agent/init/agent_param.cc

Change-Id: Ia68d52f4aba0e79a3fe75f8c4fc248d49ab94faf
Closes-Bug: #1684993
(cherry picked from commit a2eb064)
  • Loading branch information
praveenkv authored and Sangarshan committed Jul 17, 2018
1 parent 64aa2d4 commit ac4e397
Show file tree
Hide file tree
Showing 14 changed files with 545 additions and 17 deletions.
2 changes: 2 additions & 0 deletions src/base/SConscript
Expand Up @@ -36,6 +36,7 @@ libcpuinfo = env.Library('cpuinfo', ['cpuinfo.cc'] + cpuinfo_sandesh_files_)

task = except_env.Object('task.o', 'task.cc')
timer = timer_env.Object('timer.o', 'timer.cc')
task_monitor = timer_env.Object('task_monitor.o', 'task_monitor.cc')

ProcessInfoSandeshGenFiles = env.SandeshGenCpp('sandesh/process_info.sandesh')
ProcessInfoSandeshGenSrcs = env.ExtractCpp(ProcessInfoSandeshGenFiles)
Expand Down Expand Up @@ -66,6 +67,7 @@ libbase = env.Library('base',
'proto.cc',
task,
'task_annotations.cc',
task_monitor,
'task_sandesh.cc',
'task_trigger.cc',
'tdigest.c',
Expand Down
19 changes: 19 additions & 0 deletions src/base/sandesh/task.sandesh
Expand Up @@ -33,6 +33,7 @@ struct SandeshTaskGroup {

response sandesh SandeshTaskScheduler {
1: bool running;
5: bool use_spawn;
2: u64 total_count;
3: i32 thread_count;
4: list <SandeshTaskGroup> task_group_list;
Expand All @@ -56,3 +57,21 @@ trace sandesh TaskTrace {
4: u32 latency;
5: string description;
}

response sandesh SandeshMonitorResponse {
1: bool running;
2: u64 inactivity_time_msec;
3: u64 poll_interval_msec;
4: u64 poll_count;
5: u64 last_activity;
6: u64 last_enqueue_count;
7: u64 last_done_count;
8: u64 tbb_keepawake_time;
}

/**
* @description: sandesh request to get task monitoring status
* @cli_name: read task monitor
*/
request sandesh SandeshTaskMonitorRequest {
}
60 changes: 47 additions & 13 deletions src/base/task.cc
Expand Up @@ -14,6 +14,7 @@
#include "base/logging.h"
#include "base/task.h"
#include "base/task_annotations.h"
#include "base/task_monitor.h"

#include <sandesh/sandesh_types.h>
#include <sandesh/sandesh.h>
Expand Down Expand Up @@ -245,6 +246,7 @@ class TaskGroup {
tbb::task *TaskImpl::execute() {
TaskInfo::reference running = task_running.local();
running = parent_;
parent_->SetTbbState(Task::TBB_EXEC);
try {
uint64_t t = 0;
if (parent_->enqueue_time() != 0) {
Expand Down Expand Up @@ -331,6 +333,13 @@ int TaskScheduler::GetThreadCount(int thread_count) {
return num_cores_ * ThreadAmpFactor_;
}

bool TaskScheduler::ShouldUseSpawn() {
if (getenv("TBB_USE_SPAWN"))
return true;

return false;
}

////////////////////////////////////////////////////////////////////////////
// Implementation for class TaskScheduler
////////////////////////////////////////////////////////////////////////////
Expand All @@ -340,10 +349,10 @@ int TaskScheduler::GetThreadCount(int thread_count) {
// for task scheduling. But, in our case we dont want "main" thread to be
// part of tbb. So, initialize TBB with one thread more than its default
TaskScheduler::TaskScheduler(int task_count) :
task_scheduler_(GetThreadCount(task_count) + 1),
use_spawn_(ShouldUseSpawn()), task_scheduler_(GetThreadCount(task_count) + 1),
running_(true), seqno_(0), id_max_(0), log_fn_(), track_run_time_(false),
measure_delay_(false), schedule_delay_(0), execute_delay_(0),
enqueue_count_(0), done_count_(0), cancel_count_(0) {
enqueue_count_(0), done_count_(0), cancel_count_(0), task_monitor_(NULL) {
hw_thread_count_ = GetThreadCount(task_count);
task_group_db_.resize(TaskScheduler::kVectorGrowSize);
stop_entry_ = new TaskEntry(-1);
Expand Down Expand Up @@ -378,6 +387,18 @@ void TaskScheduler::Initialize(uint32_t thread_count) {
singleton_.reset(new TaskScheduler((int)thread_count));
}

void TaskScheduler::EnableMonitor(EventManager *evm,
uint64_t tbb_keepawake_time_msec,
uint64_t inactivity_time_msec,
uint64_t poll_interval_msec) {
if (task_monitor_ != NULL)
return;

task_monitor_ = new TaskMonitor(this, tbb_keepawake_time_msec,
inactivity_time_msec, poll_interval_msec);
task_monitor_->Start(evm);
}

void TaskScheduler::Log(const char *file_name, uint32_t line_no,
const Task *task, const char *description,
uint32_t delay) {
Expand Down Expand Up @@ -621,6 +642,7 @@ void TaskScheduler::OnTaskExit(Task *t) {
tbb::mutex::scoped_lock lock(mutex_);
done_count_++;

t->SetTbbState(Task::TBB_DONE);
TaskEntry *entry = QueryTaskEntry(t->GetTaskId(), t->GetTaskInstance());
entry->TaskExited(t, GetTaskGroup(t->GetTaskId()));

Expand All @@ -641,7 +663,8 @@ void TaskScheduler::OnTaskExit(Task *t) {
// Task is being recycled, reset the state, seq_no and TBB task handle
t->task_impl_ = NULL;
t->SetSeqNo(0);
t->state_ = Task::INIT;
t->SetState(Task::INIT);
t->SetTbbState(Task::TBB_INIT);
EnqueueUnLocked(t);
}

Expand Down Expand Up @@ -869,6 +892,12 @@ void TaskScheduler::WaitForTerminateCompletion() {
}

void TaskScheduler::Terminate() {
if (task_monitor_) {
task_monitor_->Terminate();
delete task_monitor_;
task_monitor_ = NULL;
}

for (int i = 0; i < 10000; i++) {
if (IsEmpty()) break;
usleep(1000);
Expand Down Expand Up @@ -1194,7 +1223,7 @@ void TaskEntry::RunTask (Task *t) {
TaskGroup *group = scheduler->QueryTaskGroup(t->GetTaskId());
group->TaskStarted();

t->StartTask();
t->StartTask(scheduler);
}

void TaskEntry::RunWaitQ() {
Expand Down Expand Up @@ -1346,32 +1375,36 @@ int TaskEntry::GetTaskDeferEntrySeqno() const {
// Implementation for class Task
////////////////////////////////////////////////////////////////////////////
Task::Task(int task_id, int task_instance) : task_id_(task_id),
task_instance_(task_instance), task_impl_(NULL), state_(INIT), seqno_(0),
task_recycle_(false), task_cancel_(false), enqueue_time_(0),
schedule_time_(0), execute_delay_(0), schedule_delay_(0) {
task_instance_(task_instance), task_impl_(NULL), state_(INIT),
tbb_state_(TBB_INIT), seqno_(0), task_recycle_(false), task_cancel_(false),
enqueue_time_(0), schedule_time_(0), execute_delay_(0), schedule_delay_(0) {
}

Task::Task(int task_id) : task_id_(task_id),
task_instance_(-1), task_impl_(NULL), state_(INIT), seqno_(0),
task_recycle_(false), task_cancel_(false), enqueue_time_(0),
task_instance_(-1), task_impl_(NULL), state_(INIT), tbb_state_(TBB_INIT),
seqno_(0), task_recycle_(false), task_cancel_(false), enqueue_time_(0),
schedule_time_(0), execute_delay_(0), schedule_delay_(0) {
}

// Start execution of task
void Task::StartTask() {
void Task::StartTask(TaskScheduler *scheduler) {
if (enqueue_time_ != 0) {
schedule_time_ = ClockMonotonicUsec();
TaskScheduler *scheduler = TaskScheduler::GetInstance();
if ((schedule_time_ - enqueue_time_) >
scheduler->schedule_delay(this)) {
TASK_TRACE(scheduler, this, "Schedule delay(in usec) ",
(schedule_time_ - enqueue_time_));
}
}
assert(task_impl_ == NULL);
state_ = RUN;
SetState(RUN);
SetTbbState(TBB_ENQUEUED);
task_impl_ = new (task::allocate_root())TaskImpl(this);
task::spawn(*task_impl_);
if (scheduler->use_spawn()) {
task::spawn(*task_impl_);
} else {
task::enqueue(*task_impl_);
}
}

Task *Task::Running() {
Expand Down Expand Up @@ -1437,6 +1470,7 @@ void TaskScheduler::GetSandeshData(SandeshTaskScheduler *resp, bool summary) {
tbb::mutex::scoped_lock lock(mutex_);

resp->set_running(running_);
resp->set_use_spawn(use_spawn_);
resp->set_total_count(seqno_);
resp->set_thread_count(hw_thread_count_);

Expand Down
25 changes: 24 additions & 1 deletion src/base/task.h
Expand Up @@ -44,6 +44,9 @@
class TaskGroup;
class TaskEntry;
class SandeshTaskScheduler;
class EventManager;
class TaskMonitor;
class TaskScheduler;

struct TaskStats {
int wait_count_; // #Entries in waitq
Expand Down Expand Up @@ -72,6 +75,13 @@ class Task {
RUN
};

enum TbbState {
TBB_INIT,
TBB_ENQUEUED,
TBB_EXEC,
TBB_DONE
};

const static int kTaskInstanceAny = -1;
Task(int task_id, int task_instance);
Task(int task_id);
Expand Down Expand Up @@ -109,15 +119,17 @@ class Task {
friend class TaskScheduler;
friend class TaskImpl;
void SetSeqNo(uint64_t seqno) {seqno_ = seqno;};
void SetTbbState(TbbState s) { tbb_state_ = s; };
void SetState(State s) { state_ = s; };
void SetTaskRecycle() { task_recycle_ = true; };
void SetTaskComplete() { task_recycle_ = false; };
void StartTask();
void StartTask(TaskScheduler *scheduler);

int task_id_; // The code path executed by the task.
int task_instance_; // The dataset id within a code path.
tbb::task *task_impl_;
State state_;
TbbState tbb_state_;
uint64_t seqno_;
bool task_recycle_;
bool task_cancel_;
Expand Down Expand Up @@ -198,6 +210,7 @@ class TaskScheduler {

// Get number of tbb worker threads.
static int GetThreadCount(int thread_count = 0);
static bool ShouldUseSpawn();

uint64_t enqueue_count() const { return enqueue_count_; }
uint64_t done_count() const { return done_count_; }
Expand All @@ -223,6 +236,13 @@ class TaskScheduler {
uint32_t schedule_delay(Task *task) const;
uint32_t execute_delay(Task *task) const;

// Enable Task monitoring
void EnableMonitor(EventManager *evm, uint64_t tbb_keepawake_time_msec,
uint64_t inactivity_time_msec,
uint64_t poll_interval_msec);
const TaskMonitor *task_monitor() const { return task_monitor_; }
bool use_spawn() const { return use_spawn_; }

// following function allows one to increase max num of threads used by
// TBB
static void SetThreadAmpFactor(int n);
Expand All @@ -245,6 +265,8 @@ class TaskScheduler {

int CountThreadsPerPid(pid_t pid);

// Use spawn() to run a tbb::task instead of enqueue()
bool use_spawn_;
TaskEntry *stop_entry_;

tbb::task_scheduler_init task_scheduler_;
Expand Down Expand Up @@ -273,6 +295,7 @@ class TaskScheduler {
// following variable allows one to increase max num of threads used by
// TBB
static int ThreadAmpFactor_;
TaskMonitor *task_monitor_;
DISALLOW_COPY_AND_ASSIGN(TaskScheduler);
};

Expand Down

0 comments on commit ac4e397

Please sign in to comment.