Skip to content

Commit

Permalink
Merge pull request #34991 from mrodozov/backport-xrootd-pr-to-master
Browse files Browse the repository at this point in the history
Backport XrootD 5.3.1 PR to master from DEVEL
  • Loading branch information
cmsbuild committed Jan 11, 2022
2 parents b3ce8c2 + 75089fb commit afbbf4e
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 42 deletions.
2 changes: 0 additions & 2 deletions Utilities/XrdAdaptor/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,4 @@
<use name="FWCore/Utilities"/>
<use name="FWCore/MessageLogger"/>
<use name="xrootd"/>
<lib name="XrdCl"/>
<lib name="XrdUtils"/>
<flags CPPDEFINES="_FILE_OFFSET_BITS=64"/>
1 change: 0 additions & 1 deletion Utilities/XrdAdaptor/plugins/BuildFile.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
<library file="XrdStorageMaker.cc" name="UtilitiesXrdAdaptorPlugin">
<use name="Utilities/StorageFactory"/>
<use name="Utilities/XrdAdaptor"/>
<lib name="XrdCl"/>
<flags EDM_PLUGIN="1"/>
<flags CXXFLAGS="-D_FILE_OFFSET_BITS=64"/>
</library>
52 changes: 35 additions & 17 deletions Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,38 @@
#include <atomic>
#include <mutex>

class MakerResponseHandler : public XrdCl::ResponseHandler {
public:
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
// Note: Prepare call has a response object.
delete response;
delete status;
}
};
namespace {

class PrepareHandler : public XrdCl::ResponseHandler {
public:
PrepareHandler(const XrdCl::URL &url) : m_fs(url) { m_fileList.push_back(url.GetPath()); }

void callAsyncPrepare() {
auto status = m_fs.Prepare(m_fileList, XrdCl::PrepareFlags::Stage, 0, this);
if (!status.IsOK()) {
LogDebug("StageInError") << "XrdCl::FileSystem::Prepare submit failed with error '" << status.ToStr()
<< "' (errNo = " << status.errNo << ")";
delete this;
}
}

void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
// Note: Prepare call has a response object.
if (!status->IsOK()) {
LogDebug("StageInError") << "XrdCl::FileSystem::Prepare failed with error '" << status->ToStr()
<< "' (errNo = " << status->errNo << ")";
}
delete response;
delete status;
delete this;
}

private:
XrdCl::FileSystem m_fs;
std::vector<std::string> m_fileList;
};

} // namespace

namespace edm::storage {
class XrdStorageMaker final : public StorageMaker {
Expand Down Expand Up @@ -75,14 +99,9 @@ namespace edm::storage {

std::string fullpath(proto + ":" + path);
XrdCl::URL url(fullpath);
XrdCl::FileSystem fs(url);
std::vector<std::string> fileList;
fileList.push_back(url.GetPath());
auto status = fs.Prepare(fileList, XrdCl::PrepareFlags::Stage, 0, &m_null_handler);
if (!status.IsOK()) {
edm::LogWarning("StageInError") << "XrdCl::FileSystem::Prepare failed with error '" << status.ToStr()
<< "' (errNo = " << status.errNo << ")";
}

auto prep_handler = new PrepareHandler(url);
prep_handler->callAsyncPrepare();
}

bool check(const std::string &proto,
Expand Down Expand Up @@ -174,7 +193,6 @@ namespace edm::storage {
}

private:
CMS_THREAD_SAFE mutable MakerResponseHandler m_null_handler;
mutable std::mutex m_envMutex;
mutable std::atomic<unsigned int> m_lastDebugLevel;
mutable std::atomic<unsigned int> m_lastTimeout;
Expand Down
10 changes: 1 addition & 9 deletions Utilities/XrdAdaptor/src/XrdHostHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@
#include "XrdCl/XrdClXRootDResponses.hh"
#include "FWCore/Utilities/interface/get_underlying_safe.h"

#if defined(__linux__)
#define HAVE_ATOMICS 1
#include "XrdSys/XrdSysLinuxSemaphore.hh"
typedef XrdSys::LinuxSemaphore Semaphore;
#else
typedef XrdSysSemaphore Semaphore;
#endif

/**
* The SyncResponseHandler from the XrdCl does not
* preserve the hostinfo list, which we would like to
Expand Down Expand Up @@ -53,7 +45,7 @@ private:
edm::propagate_const<std::unique_ptr<XrdCl::XRootDStatus>> pStatus_;
edm::propagate_const<std::unique_ptr<XrdCl::AnyObject>> pResponse_;
edm::propagate_const<std::unique_ptr<XrdCl::HostList>> pHostList_;
Semaphore sem;
XrdSysSemaphore sem;
};

#endif
40 changes: 27 additions & 13 deletions Utilities/XrdAdaptor/src/XrdRequestManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -558,30 +558,37 @@ std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientReq
}

std::string RequestManager::prepareOpaqueString() const {
std::stringstream ss;
ss << "tried=";
size_t count = 0;
struct {
std::stringstream ss;
size_t count = 0;
bool has_active = false;

void append_tried(const std::string &id, bool active = false) {
ss << (count ? "," : "tried=") << id;
count++;
if (active) {
has_active = true;
}
}
} state;
{
std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);

for (const auto &it : m_activeSources) {
count++;
ss << it->ExcludeID().substr(0, it->ExcludeID().find(':')) << ",";
state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
}
for (const auto &it : m_inactiveSources) {
count++;
ss << it->ExcludeID().substr(0, it->ExcludeID().find(':')) << ",";
state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
}
}
for (const auto &it : m_disabledExcludeStrings) {
count++;
ss << it.substr(0, it.find(':')) << ",";
state.append_tried(it.substr(0, it.find(':')));
}
if (count) {
std::string tmp_str = ss.str();
return tmp_str.substr(0, tmp_str.size() - 1);
if (state.has_active) {
state.ss << "&triedrc=resel";
}
return "";

return state.ss.str();
}

void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
Expand Down Expand Up @@ -1025,6 +1032,13 @@ void XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRo
ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
manager->addConnections(ex);

// Brian, should we do something like this:
// if (status.status == XrdCl::errRedirectLimit) {
// // The following method does not exist (yet), would probaly need a multiplier for OPEN_DELAY.
// // Note that with XCache cluster one will never get multiple sources.
// manager->increaseMultiSourceInterval();
// }

m_promise.set_exception(std::make_exception_ptr(ex));
}
}
Expand Down

0 comments on commit afbbf4e

Please sign in to comment.