Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Backport XrootD 5.3.1 PR to master from DEVEL #34991

Merged
merged 9 commits into from
Jan 11, 2022
2 changes: 0 additions & 2 deletions Utilities/XrdAdaptor/BuildFile.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,4 @@
<use name="FWCore/Utilities"/>
<use name="FWCore/MessageLogger"/>
<use name="xrootd"/>
<lib name="XrdCl"/>
<lib name="XrdUtils"/>
<flags CPPDEFINES="_FILE_OFFSET_BITS=64"/>
1 change: 0 additions & 1 deletion Utilities/XrdAdaptor/plugins/BuildFile.xml
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
<library file="XrdStorageMaker.cc" name="UtilitiesXrdAdaptorPlugin">
<use name="Utilities/StorageFactory"/>
<use name="Utilities/XrdAdaptor"/>
<lib name="XrdCl"/>
<flags EDM_PLUGIN="1"/>
<flags CXXFLAGS="-D_FILE_OFFSET_BITS=64"/>
</library>
52 changes: 35 additions & 17 deletions Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,38 @@
#include <atomic>
#include <mutex>

class MakerResponseHandler : public XrdCl::ResponseHandler {
public:
void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
// Note: Prepare call has a response object.
delete response;
delete status;
}
};
namespace {

class PrepareHandler : public XrdCl::ResponseHandler {
public:
PrepareHandler(const XrdCl::URL &url) : m_fs(url) { m_fileList.push_back(url.GetPath()); }

void callAsyncPrepare() {
auto status = m_fs.Prepare(m_fileList, XrdCl::PrepareFlags::Stage, 0, this);
if (!status.IsOK()) {
LogDebug("StageInError") << "XrdCl::FileSystem::Prepare submit failed with error '" << status.ToStr()
<< "' (errNo = " << status.errNo << ")";
delete this;
}
}

void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override {
// Note: Prepare call has a response object.
if (!status->IsOK()) {
LogDebug("StageInError") << "XrdCl::FileSystem::Prepare failed with error '" << status->ToStr()
<< "' (errNo = " << status->errNo << ")";
}
delete response;
delete status;
delete this;
}

private:
XrdCl::FileSystem m_fs;
std::vector<std::string> m_fileList;
};

} // namespace

namespace edm::storage {
class XrdStorageMaker final : public StorageMaker {
Expand Down Expand Up @@ -75,14 +99,9 @@ namespace edm::storage {

std::string fullpath(proto + ":" + path);
XrdCl::URL url(fullpath);
XrdCl::FileSystem fs(url);
std::vector<std::string> fileList;
fileList.push_back(url.GetPath());
auto status = fs.Prepare(fileList, XrdCl::PrepareFlags::Stage, 0, &m_null_handler);
if (!status.IsOK()) {
edm::LogWarning("StageInError") << "XrdCl::FileSystem::Prepare failed with error '" << status.ToStr()
<< "' (errNo = " << status.errNo << ")";
}

auto prep_handler = new PrepareHandler(url);
prep_handler->callAsyncPrepare();
}

bool check(const std::string &proto,
Expand Down Expand Up @@ -174,7 +193,6 @@ namespace edm::storage {
}

private:
CMS_THREAD_SAFE mutable MakerResponseHandler m_null_handler;
mutable std::mutex m_envMutex;
mutable std::atomic<unsigned int> m_lastDebugLevel;
mutable std::atomic<unsigned int> m_lastTimeout;
Expand Down
10 changes: 1 addition & 9 deletions Utilities/XrdAdaptor/src/XrdHostHandler.hh
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,6 @@
#include "XrdCl/XrdClXRootDResponses.hh"
#include "FWCore/Utilities/interface/get_underlying_safe.h"

#if defined(__linux__)
#define HAVE_ATOMICS 1
#include "XrdSys/XrdSysLinuxSemaphore.hh"
typedef XrdSys::LinuxSemaphore Semaphore;
#else
typedef XrdSysSemaphore Semaphore;
#endif

/**
* The SyncResponseHandler from the XrdCl does not
* preserve the hostinfo list, which we would like to
Expand Down Expand Up @@ -53,7 +45,7 @@ private:
edm::propagate_const<std::unique_ptr<XrdCl::XRootDStatus>> pStatus_;
edm::propagate_const<std::unique_ptr<XrdCl::AnyObject>> pResponse_;
edm::propagate_const<std::unique_ptr<XrdCl::HostList>> pHostList_;
Semaphore sem;
XrdSysSemaphore sem;
};

#endif
40 changes: 27 additions & 13 deletions Utilities/XrdAdaptor/src/XrdRequestManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -553,30 +553,37 @@ std::future<IOSize> RequestManager::handle(std::shared_ptr<XrdAdaptor::ClientReq
}

std::string RequestManager::prepareOpaqueString() const {
std::stringstream ss;
ss << "tried=";
size_t count = 0;
struct {
std::stringstream ss;
size_t count = 0;
bool has_active = false;

void append_tried(const std::string &id, bool active = false) {
ss << (count ? "," : "tried=") << id;
count++;
if (active) {
has_active = true;
}
}
} state;
{
std::lock_guard<std::recursive_mutex> sentry(m_source_mutex);

for (const auto &it : m_activeSources) {
count++;
ss << it->ExcludeID().substr(0, it->ExcludeID().find(':')) << ",";
state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')), true);
}
for (const auto &it : m_inactiveSources) {
count++;
ss << it->ExcludeID().substr(0, it->ExcludeID().find(':')) << ",";
state.append_tried(it->ExcludeID().substr(0, it->ExcludeID().find(':')));
}
}
for (const auto &it : m_disabledExcludeStrings) {
count++;
ss << it.substr(0, it.find(':')) << ",";
state.append_tried(it.substr(0, it.find(':')));
}
if (count) {
std::string tmp_str = ss.str();
return tmp_str.substr(0, tmp_str.size() - 1);
if (state.has_active) {
state.ss << "&triedrc=resel";
}
return "";

return state.ss.str();
}

void XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr<Source> source) {
Expand Down Expand Up @@ -1020,6 +1027,13 @@ void XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRo
ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()");
manager->addConnections(ex);

// Brian, should we do something like this:
// if (status.status == XrdCl::errRedirectLimit) {
// // The following method does not exist (yet), would probaly need a multiplier for OPEN_DELAY.
// // Note that with XCache cluster one will never get multiple sources.
// manager->increaseMultiSourceInterval();
// }

m_promise.set_exception(std::make_exception_ptr(ex));
}
}
Expand Down