From cb8fd2264b9e74b98498b8718980d263804eb1d3 Mon Sep 17 00:00:00 2001 From: Shylock Hg <33566796+Shylock-Hg@users.noreply.github.com> Date: Fri, 30 Jul 2021 10:21:57 +0800 Subject: [PATCH] =?UTF-8?q?Replace=20the=20lastUser=20by=20user=20count.?= =?UTF-8?q?=E2=80=A6=20(#1243)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Replace the lastUser by user count.The last user maybe could run simultaneously, so can't determine which is real last user in timeline. * Fix the typo. Co-authored-by: Yee <2520865+yixinglu@users.noreply.github.com> --- src/context/Symbols.h | 12 ++-------- src/executor/Executor.cpp | 13 +++++++---- .../AsyncMsgNotifyBasedScheduler.cpp | 4 +++- src/scheduler/Scheduler.cpp | 22 ++++++++++++++----- 4 files changed, 31 insertions(+), 20 deletions(-) diff --git a/src/context/Symbols.h b/src/context/Symbols.h index b77073b71..4b6ca906a 100644 --- a/src/context/Symbols.h +++ b/src/context/Symbols.h @@ -47,16 +47,8 @@ struct Variable { std::unordered_set readBy; std::unordered_set writtenBy; - // None means will used in later - // non-positive means static lifetime - // positive means last user id - folly::Optional lastUser; - - void setLastUser(int64_t id) { - if (!lastUser.hasValue()) { - lastUser = id; - } - } + // the count of use the variable + std::atomic userCount; }; class SymbolTable final { diff --git a/src/executor/Executor.cpp b/src/executor/Executor.cpp index 6323d3c6f..08ddf02e4 100644 --- a/src/executor/Executor.cpp +++ b/src/executor/Executor.cpp @@ -8,6 +8,7 @@ #include #include +#include #include "common/base/Memory.h" #include "common/base/ObjectPool.h" @@ -581,16 +582,20 @@ folly::Future Executor::error(Status status) const { void Executor::drop() { for (const auto &inputVar : node()->inputVars()) { if (inputVar != nullptr) { - if (inputVar->lastUser.value() == node()->id()) { - ectx_->dropResult(inputVar->name); - VLOG(1) << "Drop variable " << node()->outputVar(); + // Make sure use the variable happened-before decrement count + if (inputVar->userCount.fetch_sub(1, std::memory_order_release) == 1) { + // Make sure drop happened-after count decrement + CHECK_EQ(inputVar->userCount.load(std::memory_order_acquire), 0); + ectx_->dropResult(inputVar->name); + VLOG(1) << "Drop variable " << node()->outputVar(); } } } } Status Executor::finish(Result &&result) { - if (!FLAGS_enable_lifetime_optimize || node()->outputVarPtr()->lastUser.hasValue()) { + if (!FLAGS_enable_lifetime_optimize || + node()->outputVarPtr()->userCount.load(std::memory_order_relaxed) != 0) { numRows_ = result.size(); ectx_->setResult(node()->outputVar(), std::move(result)); } else { diff --git a/src/scheduler/AsyncMsgNotifyBasedScheduler.cpp b/src/scheduler/AsyncMsgNotifyBasedScheduler.cpp index eea6c9eb2..e658079d9 100644 --- a/src/scheduler/AsyncMsgNotifyBasedScheduler.cpp +++ b/src/scheduler/AsyncMsgNotifyBasedScheduler.cpp @@ -16,7 +16,9 @@ AsyncMsgNotifyBasedScheduler::AsyncMsgNotifyBasedScheduler(QueryContext* qctx) : folly::Future AsyncMsgNotifyBasedScheduler::schedule() { if (FLAGS_enable_lifetime_optimize) { - qctx_->plan()->root()->outputVarPtr()->setLastUser(-1); // special for root + // special for root + qctx_->plan()->root()->outputVarPtr()->userCount.store(std::numeric_limits::max(), + std::memory_order_relaxed); analyzeLifetime(qctx_->plan()->root()); } auto executor = Executor::create(qctx_->plan()->root(), qctx_); diff --git a/src/scheduler/Scheduler.cpp b/src/scheduler/Scheduler.cpp index d76325367..a233accf5 100644 --- a/src/scheduler/Scheduler.cpp +++ b/src/scheduler/Scheduler.cpp @@ -5,6 +5,8 @@ */ #include "scheduler/Scheduler.h" +#include +#include #include "context/QueryContext.h" #include "executor/ExecutionError.h" @@ -28,10 +30,15 @@ namespace graph { const auto currentInLoop = std::get<1>(current); for (auto& inputVar : currentNode->inputVars()) { if (inputVar != nullptr) { - inputVar->setLastUser( - (currentNode->kind() == PlanNode::Kind::kLoop || currentInLoop) - ? -1 - : currentNode->id()); + if (currentNode->kind() == PlanNode::Kind::kLoop || currentInLoop) { + inputVar->userCount.store(std::numeric_limits::max(), + std::memory_order_relaxed); + } else { + if (inputVar->userCount.load(std::memory_order_relaxed) != + std::numeric_limits::max()) { + inputVar->userCount.fetch_add(1, std::memory_order_relaxed); + } + } } } stack.pop(); @@ -42,13 +49,18 @@ namespace graph { switch (currentNode->kind()) { case PlanNode::Kind::kSelect: { auto sel = static_cast(currentNode); + // used by scheduler + sel->outputVarPtr()->userCount.store(std::numeric_limits::max(), + std::memory_order_relaxed); stack.push(std::make_tuple(sel->then(), currentInLoop)); stack.push(std::make_tuple(sel->otherwise(), currentInLoop)); break; } case PlanNode::Kind::kLoop: { auto loop = static_cast(currentNode); - loop->outputVarPtr()->setLastUser(-1); + // used by scheduler + loop->outputVarPtr()->userCount.store(std::numeric_limits::max(), + std::memory_order_relaxed); stack.push(std::make_tuple(loop->body(), true)); break; }