Skip to content

Commit

Permalink
Add additional tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Kaushik Iska committed Aug 22, 2019
1 parent 97d16ca commit f9adbc9
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 3 deletions.
12 changes: 9 additions & 3 deletions fml/message_loop_task_queues.cc
Expand Up @@ -184,8 +184,10 @@ void MessageLoopTaskQueues::SetWakeable(TaskQueueId queue_id,
queue_entries.at(queue_id).wakeable = wakeable;
}

bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
fml::UniqueLock queue_wirter(*queue_meta_mutex_);
bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed)
FML_NO_THREAD_SAFETY_ANALYSIS {
std::scoped_lock t_o(GetMutex(owner, MutexType::kTasks));
std::scoped_lock o_o(GetMutex(owner, MutexType::kObservers));

if (owner == subsumed) {
return true;
Expand All @@ -195,6 +197,9 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
return true;
}

std::scoped_lock t_s(GetMutex(subsumed, MutexType::kTasks));
std::scoped_lock o_s(GetMutex(subsumed, MutexType::kObservers));

std::vector<TaskQueueId> owner_subsumed_keys = {
queue_entries[owner].owner_of, queue_entries[owner].subsumed_by,
queue_entries[subsumed].owner_of, queue_entries[subsumed].subsumed_by};
Expand All @@ -216,7 +221,8 @@ bool MessageLoopTaskQueues::Merge(TaskQueueId owner, TaskQueueId subsumed) {
}

bool MessageLoopTaskQueues::Unmerge(TaskQueueId owner) {
fml::UniqueLock queue_wirter(*queue_meta_mutex_);
std::scoped_lock t_o(GetMutex(owner, MutexType::kTasks));
std::scoped_lock o_o(GetMutex(owner, MutexType::kObservers));

const TaskQueueId subsumed = queue_entries[owner].owner_of;
if (subsumed == _kUnmerged) {
Expand Down
54 changes: 54 additions & 0 deletions fml/message_loop_task_queues_unittests.cc
Expand Up @@ -165,3 +165,57 @@ TEST(MessageLoopTaskQueue, NotifyObserversWhileCreatingQueues) {
before_second_observer.Signal();
notify_observers.join();
}

TEST(MessageLoopTaskQueue, ConcurrentQueueAndTaskCreatingCounts) {
auto task_queues = fml::MessageLoopTaskQueues::GetInstance();
const int base_queue_id = task_queues->CreateTaskQueue();

const int num_queues = 100;
std::atomic_bool created[num_queues * 3];
std::atomic_int num_tasks[num_queues * 3];
std::mutex task_count_mutex[num_queues * 3];
std::atomic_int done = 0;

for (int i = 0; i < num_queues * 3; i++) {
num_tasks[i] = 0;
created[i] = false;
}

auto creation_func = [&] {
for (int i = 0; i < num_queues; i++) {
fml::TaskQueueId queue_id = task_queues->CreateTaskQueue();
created[queue_id - base_queue_id] = true;

for (int cur_q = 1; cur_q < i; cur_q++) {
if (created[cur_q - base_queue_id]) {
std::scoped_lock counter(task_count_mutex[cur_q - base_queue_id]);
int cur_num_tasks = rand() % 10;
for (int k = 0; k < cur_num_tasks; k++) {
task_queues->RegisterTask(
fml::TaskQueueId(cur_q), [] {}, fml::TimePoint::Now());
}
num_tasks[cur_q - base_queue_id] += cur_num_tasks;
}
}
}
done++;
};

std::thread creation_1(creation_func);
std::thread creation_2(creation_func);

while (done < 2) {
for (int i = 0; i < num_queues * 3; i++) {
if (created[i]) {
std::scoped_lock counter(task_count_mutex[i]);
int num_pending = task_queues->GetNumPendingTasks(
fml::TaskQueueId(base_queue_id + i));
int num_added = num_tasks[i];
ASSERT_EQ(num_pending, num_added);
}
}
}

creation_1.join();
creation_2.join();
}

0 comments on commit f9adbc9

Please sign in to comment.