Skip to content

Commit

Permalink
Remove GIL from RRefContext (pytorch#32807)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: pytorch#32807

After this commit, RRefContext no longer depends on pybind.

Test Plan: Imported from OSS

Differential Revision: D19636316

Pulled By: mrshenli

fbshipit-source-id: 88faa101c32e9019e979ae8e5da6706e49842726
  • Loading branch information
mrshenli authored and facebook-github-bot committed Jan 30, 2020
1 parent 413c0f6 commit a40a19c
Show file tree
Hide file tree
Showing 6 changed files with 61 additions and 15 deletions.
6 changes: 5 additions & 1 deletion torch/csrc/distributed/rpc/init.cpp
Expand Up @@ -271,7 +271,11 @@ If the future completes with an error, an exception is thrown.
});

module.def("_destroy_rref_context", [](bool ignoreRRefLeak) {
RRefContext::getInstance().destroyInstance(ignoreRRefLeak);
// NB: do not release GIL in the function. The destroyInstance() method
// returns a list of deleted OwnerRRefs that hold py::object instances.
// Clearing those OwnerRRefs are likely to trigger Python deref, which
// requires GIL.
RRefContext::getInstance().destroyInstance(ignoreRRefLeak).clear();
});

module.def("_rref_context_get_debug_info", []() {
Expand Down
1 change: 0 additions & 1 deletion torch/csrc/distributed/rpc/py_rref.cpp
Expand Up @@ -96,7 +96,6 @@ PyRRef PyRRef::unpickle(const py::tuple& t) {
TypePtr rref_type =
PythonRpcHandler::getInstance().parseTypeFromStr(rfd.type_str_);
rref = ctx.getOrCreateRRef(rfd, rref_type);

ctx.notifyOwnerAndParentOfFork(rfd.forkId_, rfd.parent_, rref);
return PyRRef(std::move(rref));
}
Expand Down
6 changes: 5 additions & 1 deletion torch/csrc/distributed/rpc/python_functions.cpp
Expand Up @@ -68,7 +68,11 @@ void finishCreatingOwnerRRef(
rr->rrefId() == rr->forkId(),
"Expecting an OwnerRRef as RemoteRet but got a fork.");
auto& ctx = RRefContext::getInstance();
ctx.delForkOfOwner(rr->rrefId(), rr->rrefId());
auto deletedRRef = ctx.delForkOfOwner(rr->rrefId(), rr->rrefId());
if (deletedRRef && deletedRRef->isPyObj()) {
pybind11::gil_scoped_acquire ag;
deletedRRef.reset();
}
}

std::shared_ptr<FutureMessage> sendPythonRemoteCall(
Expand Down
6 changes: 5 additions & 1 deletion torch/csrc/distributed/rpc/request_callback_impl.cpp
Expand Up @@ -223,7 +223,11 @@ std::shared_ptr<FutureMessage> RequestCallbackImpl::processRpc(
case MessageType::RREF_USER_DELETE: {
auto& rud = static_cast<RRefUserDelete&>(rpc);
auto& ctx = RRefContext::getInstance();
ctx.delForkOfOwner(rud.rrefId(), rud.forkId());
auto deletedRRef = ctx.delForkOfOwner(rud.rrefId(), rud.forkId());
if (deletedRRef && deletedRRef->isPyObj()) {
pybind11::gil_scoped_acquire ag;
deletedRRef.reset();
}
return wrap(std::move(RRefAck()).toMessage());
}
case MessageType::RREF_CHILD_ACCEPT: {
Expand Down
38 changes: 29 additions & 9 deletions torch/csrc/distributed/rpc/rref_context.cpp
Expand Up @@ -28,13 +28,23 @@ RRefContext& RRefContext::getInstance() {
return *context;
}

void RRefContext::destroyInstance(bool ignoreRRefLeak) {
std::vector<std::shared_ptr<RRef>> RRefContext::destroyInstance(
bool ignoreRRefLeak) {
auto& ctx = RRefContext::getInstance();
{
std::lock_guard<std::mutex> lock(ctx.destroyedMutex_);
ctx.destroyed_ = true;
}
ctx.checkRRefLeaks(ignoreRRefLeak);
std::vector<std::shared_ptr<RRef>> deletedRRefs;
for (auto& entry : ctx.owners_) {
auto rref = entry.second;
if (rref->isPyObj()) {
deletedRRefs.emplace_back(std::move(rref));
}
}
ctx.owners_.clear();
return deletedRRefs;
}

void RRefContext::handleException(
Expand All @@ -51,8 +61,9 @@ RRefContext::RRefContext(std::shared_ptr<RpcAgent> agent)

RRefContext::~RRefContext() {
if (!owners_.empty()) {
pybind11::gil_scoped_acquire ag;
owners_.clear();
VLOG(1) << "Destructing RRefContext with non-empty OwnerRRef set. "
<< "This would likely cause Python deref error. "
<< "Make sure destroyInstance() is invoked before destruction.";
}
}

Expand Down Expand Up @@ -250,7 +261,17 @@ void RRefContext::notifyOwnerAndParentOfFork(
if (parent == agent_->getWorkerInfo().id_) {
// Owner sending RRef to self, remove the forkId as it was added during
// pickling
delForkOfOwner(rref->rrefId(), forkId);
auto deletedRRef = delForkOfOwner(rref->rrefId(), forkId);
if (deletedRRef) {
TORCH_INTERNAL_ASSERT(
deletedRRef->rrefId() == rref->rrefId(),
"Deleting a fork of ",
rref->rrefId(),
" triggered deleting the OwnerRRef of ",
deletedRRef->rrefId());
// NB: not necessary to reset deletedRRef as rref is another shared_ptr
// instance pointing to the same OwnerRRef.
}
} else {
// If the parent is the owner, this fork has already been added into the
// forks_ map when the owner sends the message to the callee user. Hence,
Expand Down Expand Up @@ -363,7 +384,9 @@ void RRefContext::addForkOfOwner(const RRefId& rrefId, const ForkId& forkId) {
rrefForks.insert(forkId);
}

void RRefContext::delForkOfOwner(const RRefId& rrefId, const ForkId& forkId) {
std::shared_ptr<RRef> RRefContext::delForkOfOwner(
const RRefId& rrefId,
const ForkId& forkId) {
std::shared_ptr<RRef> deletedRRef = nullptr;
{
std::lock_guard<std::mutex> lock(mutex_);
Expand All @@ -389,10 +412,7 @@ void RRefContext::delForkOfOwner(const RRefId& rrefId, const ForkId& forkId) {
forks_.erase(rrefIter);
}
}
if (deletedRRef && deletedRRef->isPyObj()) {
pybind11::gil_scoped_acquire ag;
deletedRRef.reset();
}
return deletedRRef;
}

} // namespace rpc
Expand Down
19 changes: 17 additions & 2 deletions torch/csrc/distributed/rpc/rref_context.h
Expand Up @@ -23,7 +23,13 @@ void confirmPendingUser(
class RRefContext {
public:
static RRefContext& getInstance();
static void destroyInstance(bool ignoreRRefLeak = true);
// NB: This method must be called before destructing RRefContext singleton.
// Similar to delForkOfOwner, this method returns a vector of OwnerRRefs that
// hold py::object. The call-site is also responsible for resetting those
// shared_ptr objects with a GIL. See comments at delForkOfOwner() for more
// details.
static std::vector<std::shared_ptr<RRef>> destroyInstance(
bool ignoreRRefLeak = true);

static void handleException(const c10::optional<utils::FutureError>& futErr);

Expand Down Expand Up @@ -93,7 +99,16 @@ class RRefContext {
void addForkOfOwner(const RRefId& rrefId, const ForkId& forkId);
// Delete a fork of the ``OwnerRRef``. NB: this could trigger deletion on the
// IValue or py::object. For the later, this method will acquire GIL.
void delForkOfOwner(const RRefId& rrefId, const ForkId& forkId);
// NB: If this fork deletion triggered deleting OwnerRRef, this method will
// return a shared_ptr to the OwnerRRef, which is likely to be the last
// shared_ptr instance for it. Therefore, deleting this shared_ptr<OwnerRRef>
// will also trigger deleting the object it points to. If OwnerRRef holds a
// py::object, deleting it require GIL. The call site should guarded it with
// a GIL and reset the shared_ptr. The GIL-guarded deletion is intentionally
// left out of this function to avoid creating dependency on pybind.
std::shared_ptr<RRef> delForkOfOwner(
const RRefId& rrefId,
const ForkId& forkId);

// Invoked when pickling an RRef to setup child/fork properly
RRefForkData prepareChildFork(const std::shared_ptr<RRef>& rref);
Expand Down

0 comments on commit a40a19c

Please sign in to comment.