Skip to content

Commit

Permalink
[SE] atomic task upload to external handler
Browse files Browse the repository at this point in the history
/build before_merge

Signed-off-by: iceseer <iceseer@gmail.com>
  • Loading branch information
iceseer committed Aug 25, 2021
1 parent 6cc7d87 commit d815312
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 77 deletions.
81 changes: 31 additions & 50 deletions irohad/subscription/async_dispatcher_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ namespace iroha::subscription {
struct SchedulerContext {
/// Scheduler to execute tasks
std::shared_ptr<IScheduler> handler;

/// Shows if this handler is static or if it was created to
/// execute a single task and should be deleted after performing it
bool is_temporary;
};

SchedulerContext handlers_[kHandlersCount];
Expand All @@ -45,9 +41,16 @@ namespace iroha::subscription {
};
utils::ReadWriteObject<BoundContexts> bound_;

inline SchedulerContext findHandler(typename Parent::Tid const tid) {
if (tid < kHandlersCount)
return handlers_[tid];
void uploadToHandler(typename Parent::Tid const tid,
std::chrono::microseconds timeout,
typename Parent::Task &&task) {
if (is_disposed_.load())
return;

if (tid < kHandlersCount) {
handlers_[tid].handler->addDelayed(timeout, std::move(task));
return;
}

if (auto context =
bound_.sharedAccess([tid](BoundContexts const &bound)
Expand All @@ -56,17 +59,26 @@ namespace iroha::subscription {
it != bound.contexts.end())
return it->second;
return std::nullopt;
}))
return *context;
})) {
context->handler->addDelayed(timeout, std::move(task));
return;
}

std::optional<typename Parent::Task> opt_task = std::move(task);
for (auto &handler : pool_)
if (!handler.handler->isBusy())
return handler;

return SchedulerContext{
std::make_shared<ThreadHandler>(),
true // temporary
};
if (opt_task =
handler.handler->uploadIfFree(timeout, std::move(*opt_task));
!opt_task)
return;

auto h = std::make_shared<ThreadHandler>();
++temporary_handlers_tasks_counter_;
h->addDelayed(timeout, [this, h, task{std::move(*opt_task)}]() mutable {
if (!is_disposed_.load())
task();
--temporary_handlers_tasks_counter_;
h->dispose(false);
});
}

public:
Expand All @@ -75,11 +87,9 @@ namespace iroha::subscription {
is_disposed_ = false;
for (auto &h : handlers_) {
h.handler = std::make_shared<ThreadHandler>();
h.is_temporary = false;
}
for (auto &h : pool_) {
h.handler = std::make_shared<ThreadHandler>();
h.is_temporary = false;
}
}

Expand All @@ -93,42 +103,13 @@ namespace iroha::subscription {
}

void add(typename Parent::Tid tid, typename Parent::Task &&task) override {
if (is_disposed_.load())
return;

auto h = findHandler(tid);
if (!h.is_temporary)
h.handler->add(std::move(task));
else {
++temporary_handlers_tasks_counter_;
h.handler->add([this, h, task{std::move(task)}]() mutable {
if (!is_disposed_.load())
task();
--temporary_handlers_tasks_counter_;
h.handler->dispose(false);
});
}
uploadToHandler(tid, std::chrono::microseconds(0ull), std::move(task));
}

void addDelayed(typename Parent::Tid tid,
std::chrono::microseconds timeout,
typename Parent::Task &&task) override {
if (is_disposed_.load())
return;

auto h = findHandler(tid);
if (!h.is_temporary)
h.handler->addDelayed(timeout, std::move(task));
else {
++temporary_handlers_tasks_counter_;
h.handler->addDelayed(timeout,
[this, h, task{std::move(task)}]() mutable {
if (!is_disposed_.load())
task();
--temporary_handlers_tasks_counter_;
h.handler->dispose(false);
});
}
uploadToHandler(tid, timeout, std::move(task));
}

std::optional<Tid> bind(std::shared_ptr<IScheduler> scheduler) override {
Expand All @@ -139,7 +120,7 @@ namespace iroha::subscription {
[scheduler(std::move(scheduler))](BoundContexts &bound) {
auto const execution_tid = kHandlersCount + bound.next_tid_offset;
assert(bound.contexts.find(execution_tid) == bound.contexts.end());
bound.contexts[execution_tid] = SchedulerContext{scheduler, false};
bound.contexts[execution_tid] = SchedulerContext{scheduler};
++bound.next_tid_offset;
return execution_tid;
});
Expand Down
6 changes: 4 additions & 2 deletions irohad/subscription/scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ namespace iroha::subscription {
/// Checks if current scheduler executes task
virtual bool isBusy() const = 0;

/// Adds task to execution queue
virtual void add(Task &&t) = 0;
/// If scheduller is not busy it takes task for execution. Otherwise it
/// returns it back.
virtual std::optional<Task> uploadIfFree(std::chrono::microseconds timeout,
Task &&task) = 0;

/// Adds delayed task to execution queue
virtual void addDelayed(std::chrono::microseconds timeout, Task &&t) = 0;
Expand Down
48 changes: 23 additions & 25 deletions irohad/subscription/scheduler_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,6 @@

#include "common/common.hpp"

/**
* If you need to execute task, that was made in this thread and want to be
* executed in the same thread without delay - you need to uncomment this define
*/
//#define SE_SYNC_CALL_IF_SAME_THREAD

namespace iroha::subscription {

class SchedulerBase : public IScheduler, utils::NoCopy, utils::NoMove {
Expand Down Expand Up @@ -76,8 +70,9 @@ namespace iroha::subscription {
tasks_.insert(after, std::move(t));
}

bool extractExpired(Task &task, Timepoint const &before) {
bool extractExpired(Task &task) {
std::lock_guard lock(tasks_cs_);
Timepoint const before = now();
if (!tasks_.empty()) {
auto &first_task = tasks_.front();
if (first_task.timepoint <= before) {
Expand Down Expand Up @@ -106,6 +101,16 @@ namespace iroha::subscription {
return std::chrono::minutes(10ull);
}

void add(std::chrono::microseconds timeout, Task &&task) {
assert(!tasks_cs_.try_lock());
if (timeout == std::chrono::microseconds(0ull))
is_busy_ = true;

auto const tp = now() + timeout;
insert(after(tp), TimedTask{tp, std::move(task)});
event_.set();
}

public:
SchedulerBase() : is_busy_(false) {
proceed_.test_and_set();
Expand All @@ -115,7 +120,7 @@ namespace iroha::subscription {
id_ = std::this_thread::get_id();
Task task;
do {
if (extractExpired(task, now())) {
if (extractExpired(task)) {
try {
if (task)
task();
Expand All @@ -138,26 +143,19 @@ namespace iroha::subscription {
return is_busy_;
}

void add(Task &&t) override {
addDelayed(std::chrono::microseconds(0ull), std::move(t));
std::optional<Task> uploadIfFree(std::chrono::microseconds timeout,
Task &&task) override {
std::lock_guard lock(tasks_cs_);
if (is_busy_)
return std::move(task);

add(timeout, std::move(task));
return std::nullopt;
}

void addDelayed(std::chrono::microseconds timeout, Task &&t) override {
#ifdef SE_SYNC_CALL_IF_SAME_THREAD
if (timeout == std::chrono::microseconds(0ull)
&& id_ == std::this_thread::get_id()) {
std::forward<F>(f)();
} else {
#endif // SE_SYNC_CALL_IF_SAME_THREAD
auto const tp = now() + timeout;
std::lock_guard lock(tasks_cs_);
if (timeout == std::chrono::microseconds(0ull))
is_busy_ = true;
insert(after(tp), TimedTask{tp, std::move(t)});
event_.set();
#ifdef SE_SYNC_CALL_IF_SAME_THREAD
}
#endif // SE_SYNC_CALL_IF_SAME_THREAD
std::lock_guard lock(tasks_cs_);
add(timeout, std::move(t));
}
};

Expand Down

0 comments on commit d815312

Please sign in to comment.