Skip to content

Commit

Permalink
Merge pull request #13080 from bbockelm/allow_query_timeouts
Browse files Browse the repository at this point in the history
Allow FileSystem queries to occur from callback threads.
  • Loading branch information
davidlange6 committed Jan 29, 2016
2 parents d120b1d + 2ef8f99 commit 93fb804
Show file tree
Hide file tree
Showing 3 changed files with 125 additions and 6 deletions.
11 changes: 10 additions & 1 deletion Utilities/XrdAdaptor/src/XrdRequestManager.cc
Expand Up @@ -1065,6 +1065,9 @@ XrdAdaptor::RequestManager::OpenHandler::~OpenHandler()
void
XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status_ptr, XrdCl::AnyObject *, XrdCl::HostList *hostList_ptr)
{
// Make sure that we set m_outstanding_open to false on exit from this function.
std::unique_ptr<char, std::function<void(char*)>> outstanding_guard(nullptr, [&](char*){m_outstanding_open=false;});

std::shared_ptr<Source> source;
std::unique_ptr<XrdCl::XRootDStatus> status(status_ptr);
std::unique_ptr<XrdCl::HostList> hostList(hostList_ptr);
Expand Down Expand Up @@ -1165,7 +1168,7 @@ XrdAdaptor::RequestManager::OpenHandler::open()
// and make a call into xrootd (when it invokes m_file.reset()). Hence, our callback
// holds our mutex and attempts to grab an Xrootd mutex; RequestManager::requestFailure holds
// an Xrootd mutex and tries to hold m_mutex. This is a classic deadlock.
if (m_file.get())
if (m_outstanding_open)
{
return m_shared_future;
}
Expand All @@ -1178,6 +1181,11 @@ XrdAdaptor::RequestManager::OpenHandler::open()
std::string new_name = manager.m_name + ((manager.m_name.find("?") == manager.m_name.npos) ? "?" : "&") + opaque;
edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name;
m_file.reset(new XrdCl::File());
m_outstanding_open = true;

// Always make sure we release m_file and set m_outstanding_open to false on error.
std::unique_ptr<char, std::function<void(char*)>> exit_guard(nullptr, [&](char*){m_outstanding_open = false; m_file.reset();});

XrdCl::XRootDStatus status;
if (!(status = m_file->Open(new_name, manager.m_flags, manager.m_perms, this)).IsOK())
{
Expand All @@ -1191,6 +1199,7 @@ XrdAdaptor::RequestManager::OpenHandler::open()
manager.addConnections(ex);
throw ex;
}
exit_guard.release();
// Have a strong self-reference for as long as the callback is in-progress.
m_self = self_ptr;
return m_shared_future;
Expand Down
5 changes: 4 additions & 1 deletion Utilities/XrdAdaptor/src/XrdRequestManager.h
Expand Up @@ -278,7 +278,10 @@ class RequestManager : boost::noncopyable {
OpenHandler(std::weak_ptr<RequestManager> manager);
std::shared_future<std::shared_ptr<Source> > m_shared_future;
std::promise<std::shared_ptr<Source> > m_promise;
// When this is not null, there is a file-open in process
// Set to true only when there is an outstanding open request; not
// protected by m_mutex, so the caller is required to know it is in a
// thread-safe context.
std::atomic<bool> m_outstanding_open {false};
// Can only be touched when m_mutex is held.
std::unique_ptr<XrdCl::File> m_file;
std::recursive_mutex m_mutex;
Expand Down
115 changes: 111 additions & 4 deletions Utilities/XrdAdaptor/src/XrdSource.cc
Expand Up @@ -77,6 +77,114 @@ class DelayedClose : boost::noncopyable, public XrdCl::ResponseHandler
std::string m_site;
};


/**
 * A handler for querying a XrdCl::FileSystem object which is safe to be
 * invoked from an XrdCl callback (that is, we don't need an available callback
 * thread to timeout).
 *
 * Usage: call the static query() helper.  It registers a heap-allocated
 * handler with XrdCl and then blocks on a condition variable, for at most
 * the requested timeout, until the handler has published a status.
 *
 * Lifetime: once the query has been submitted successfully, the handler
 * deletes itself from within HandleResponse().  The state shared between the
 * waiting thread and the callback lives in a separate QueryAttrState object;
 * the waiter holds the only long-lived shared_ptr to it, and the handler only
 * holds a weak_ptr.  If the waiter has already timed out and gone away, the
 * callback cannot lock the weak_ptr and silently discards the response.
 */
class QueryAttrHandler : public XrdCl::ResponseHandler
{
    friend std::unique_ptr<QueryAttrHandler> std::make_unique<QueryAttrHandler>();

public:

    virtual ~QueryAttrHandler() = default;
    QueryAttrHandler(const QueryAttrHandler&) = delete;
    QueryAttrHandler& operator=(const QueryAttrHandler&) = delete;


    /**
     * Issue a Config query for `attr` against `fs` and wait for the answer.
     *
     * @param fs       file system object to query.
     * @param attr     name of the configuration attribute to ask for.
     * @param timeout  maximum time to wait for the asynchronous response.
     * @param result   set to the response text only when the returned status is OK;
     *                 left untouched otherwise.
     * @return the status reported by Query() if submission fails, the status
     *         delivered to the callback if it arrives in time, or an
     *         errSocketTimeout error status if the wait times out.
     */
    static XrdCl::XRootDStatus query(XrdCl::FileSystem &fs, const std::string &attr, std::chrono::milliseconds timeout, std::string &result)
    {
        auto handler = std::make_unique<QueryAttrHandler>();
        auto l_state = std::make_shared<QueryAttrState>();
        handler->m_state = l_state;
        XrdCl::Buffer arg(attr.size());
        arg.FromString(attr);

        XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Config, arg, handler.get());
        if (!st.IsOK())
        {
            // Submission failed: the callback will never run, so `handler` is
            // still ours and is destroyed by the unique_ptr.
            return st;
        }

        // Successfully registered the callback; it will always delete itself, so we shouldn't.
        handler.release();

        std::unique_lock<std::mutex> guard(l_state->m_mutex);
        // Wait until some status is available or a timeout.
        l_state->m_condvar.wait_for(guard, timeout, [&]{return static_cast<bool>(l_state->m_status);});

        if (!l_state->m_status)
        {
            // We had a timeout; construct a reasonable message.
            return XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errSocketTimeout, 1, "Timeout when waiting for query callback.");
        }

        // Defensive: never dereference a missing response buffer, even if the
        // status claims success.
        if (l_state->m_status->IsOK() && l_state->m_response)
        {
            result = l_state->m_response->ToString();
        }
        return *(l_state->m_status);
    }


private:

    QueryAttrHandler() {}


    /**
     * XrdCl callback.  Takes ownership of both pointers, destroys this
     * handler, and - if a waiter is still around - publishes the outcome
     * under the state mutex before waking the waiter.
     *
     * A status is always published when the state is still alive: a missing
     * status, or an OK status without a usable response buffer, is reported
     * as an errInternal error.
     */
    virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response ) override
    {
        // NOTE: we own the status and response pointers.  Wrapping both up
        // front guarantees they are released on every return path.
        std::unique_ptr<XrdCl::XRootDStatus> status_mgr(status);
        std::unique_ptr<XrdCl::AnyObject> response_mgr(response);

        // Lock our state information then dispose of our object.  No member
        // may be touched after this point; only locals are used below.
        auto l_state = m_state.lock();
        delete this;
        if (!l_state) {return;}

        // Build the result in locals first so that the critical section below
        // is just two pointer moves.
        std::unique_ptr<XrdCl::Buffer> buffer;
        if (status_mgr && status_mgr->IsOK() && response_mgr)
        {
            XrdCl::Buffer *buf_ptr = nullptr;
            response_mgr->Get(buf_ptr);
            if (buf_ptr)
            {
                // Detach the buffer from the AnyObject so that we are its only owner.
                // AnyObject::Set lacks specialization for nullptr
                // NOTE(review): relies on Set(null) not deleting the held object - confirm against XrdCl::AnyObject.
                response_mgr->Set(static_cast<int *>(nullptr));
                buffer.reset(buf_ptr);
            }
        }

        // A missing status, or a "successful" status with nothing to return,
        // is surfaced to the waiter as an internal error.
        if (!status_mgr || (status_mgr->IsOK() && !buffer))
        {
            status_mgr.reset(new XrdCl::XRootDStatus(XrdCl::stError, XrdCl::errInternal));
        }

        {
            // m_status is the condvar predicate; it must be written under the
            // mutex and before the notification is sent.
            std::lock_guard<std::mutex> lock(l_state->m_mutex);
            l_state->m_response = std::move(buffer);
            l_state->m_status = std::move(status_mgr);
        }
        l_state->m_condvar.notify_all();
    }


    // Represents the current state of the callback.  The parent class only manages a weak_ptr
    // to the state.  If the asynchronous callback cannot lock the weak_ptr, then it assumes the
    // main thread has given up and doesn't touch any of the state variables.
    struct QueryAttrState {

        // Synchronize between the callback thread and the main thread; condvar predicate
        // is having m_status set.  m_mutex protects m_status and m_response.
        std::mutex m_mutex;
        std::condition_variable m_condvar;

        // Results from the server
        std::unique_ptr<XrdCl::XRootDStatus> m_status;
        std::unique_ptr<XrdCl::Buffer> m_response;
    };
    std::weak_ptr<QueryAttrState> m_state;
};


Source::Source(timespec now, std::unique_ptr<XrdCl::File> fh, const std::string &exclude)
: m_lastDowngrade({0, 0}),
m_id("(unknown)"),
Expand Down Expand Up @@ -239,17 +347,16 @@ Source::getXrootdSiteFromURL(std::string url, std::string &site)
arg.FromString( attr );

XrdCl::FileSystem fs(url);
XrdCl::XRootDStatus st = fs.Query(XrdCl::QueryCode::Config, arg, response);
std::string rsite;
XrdCl::XRootDStatus st = QueryAttrHandler::query(fs, "sitename", std::chrono::seconds(1), rsite);
if (!st.IsOK())
{
XrdCl::URL xurl(url);
getDomain(xurl.GetHostName(), site);
delete response;
return false;
}
std::string rsite = response->ToString();
delete response;
if (rsite.size() && (rsite[rsite.size()-1] == '\n'))
if (!rsite.empty() && (rsite[rsite.size()-1] == '\n'))
{
rsite = rsite.substr(0, rsite.size()-1);
}
Expand Down

0 comments on commit 93fb804

Please sign in to comment.