From 071d525a8404609cb46f61e933878e27c8743f76 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Tue, 13 Aug 2013 15:26:44 -0500 Subject: [PATCH 1/7] Add multisource version of XrdAdaptor; based on the prior dev branch at https://github.com/bbockelm/XrdAdaptor. --- .../doc/multisource_algorithm_design.txt | 124 ++++ .../XrdAdaptor/plugins/XrdStorageMaker.cc | 17 +- Utilities/XrdAdaptor/src/QualityMetric.cc | 172 +++++ Utilities/XrdAdaptor/src/QualityMetric.h | 103 +++ Utilities/XrdAdaptor/src/XrdFile.cc | 341 ++++++---- Utilities/XrdAdaptor/src/XrdFile.h | 32 +- Utilities/XrdAdaptor/src/XrdRequest.cc | 81 +++ Utilities/XrdAdaptor/src/XrdRequest.h | 90 +++ Utilities/XrdAdaptor/src/XrdRequestManager.cc | 595 ++++++++++++++++++ Utilities/XrdAdaptor/src/XrdRequestManager.h | 170 +++++ Utilities/XrdAdaptor/src/XrdSource.cc | 89 +++ Utilities/XrdAdaptor/src/XrdSource.h | 58 ++ 12 files changed, 1713 insertions(+), 159 deletions(-) create mode 100644 Utilities/XrdAdaptor/doc/multisource_algorithm_design.txt create mode 100644 Utilities/XrdAdaptor/src/QualityMetric.cc create mode 100644 Utilities/XrdAdaptor/src/QualityMetric.h create mode 100644 Utilities/XrdAdaptor/src/XrdRequest.cc create mode 100644 Utilities/XrdAdaptor/src/XrdRequest.h create mode 100644 Utilities/XrdAdaptor/src/XrdRequestManager.cc create mode 100644 Utilities/XrdAdaptor/src/XrdRequestManager.h create mode 100644 Utilities/XrdAdaptor/src/XrdSource.cc create mode 100644 Utilities/XrdAdaptor/src/XrdSource.h diff --git a/Utilities/XrdAdaptor/doc/multisource_algorithm_design.txt b/Utilities/XrdAdaptor/doc/multisource_algorithm_design.txt new file mode 100644 index 0000000000000..eb473aa52523c --- /dev/null +++ b/Utilities/XrdAdaptor/doc/multisource_algorithm_design.txt @@ -0,0 +1,124 @@ +Introduction +The existing AAA infrastructure has a relatively high penalty for poor redirection decisions. 
A client which is redirected to a "poor" server in terms of network locality (i.e., one with high latency or low bandwidth) will continue to use that server as long as it doesn't fail outright, even if there is a much better source available. + +Improving network locality argues for improving the redirector's redirection logic. This is certainly needed; however, network locality is not the only concern. A site's storage can degrade in the middle of a file transfer (due to a device loss in the RAID, or a sudden spurt of new transfers), making a "reasonable choice" at redirection time perform poorly overall. + +To reduce the penalty for poor redirection decisions, we aim to upgrade the CMSSW XrdAdaptor to be multi-source capable. By actively using multiple sources and probing for additional ones, the client will be able to discover the fastest sources, even if the designation of "fastest" changes through the job's lifetime. + +Design Goals +Design goals of the multi-source Xrootd client: +(0) Determine a metric for quality of the source server. +(1) Actively balance transfers over multiple links in order to determine several high-quality sources of the file. +(2) Recover from transient IO errors at a single source. +(3) Probe for additional possible sources during the file transfer. +(4) Minimize the impact on the source site versus a single-source client. Understand both the average-case and the worst-case scenarios. + - In particular, actively utilizing too many sources can cause TCP windows to stay small and OS read-ahead to be inefficient. + - Any speculative probes for additional sources should be a small percentage of the total traffic. + - Any retry mechanisms must have reasonable delays prior to failure. +(5) Have the number of requests per source be proportional to source quality. + - Servers or sites experiencing a "soft failure" causing a degradation in quality will receive the least amount of traffic. 
+ +Implementation + +Quality metric +A source's quality is defined, per file, as the average request response time (in milliseconds; lower is better) over the last 5 time intervals. Each time interval is one minute long; time intervals with no data points are discarded. + +If there is no previously recorded data for a given source, it is assumed to have an average of 260ms (assumes a 256KB request, 1MB/s server speed, and 10ms of latency) in one time interval. When a new file is opened on a source, the prior average for that source is used for the first time interval. + +Notes: +- The request splitting algorithm outlined below will split all client requests into a series of requests of at most 256KB each (similar to what the Xrootd client does internally). Since each request has a maximum size, looking at the unweighted time per request is more reasonable. +- It may seem strange to the reader to not differentiate between bandwidth and latency, or to not somehow factor request size into the quality metric. I believe it is acceptable to ignore this as the distribution of small and large requests will remain approximately constant throughout the job's lifetime. + +Source selection algorithm +The client will maintain a set of up to two "active servers" and an arbitrary number of inactive servers. When the client opens a file, the initial data server it receives from the redirector becomes the first active server. + +For a 5-second grace period, this initial server remains the only active one. (The use of "5 seconds" as the grace period is motivated by the redirector's implementation; any internal file location requests triggered by the initial file open should be finished after 5 seconds.) After the grace period, the client enters source search mode. + +When in source search mode, the client will try to trigger a new file open at most once every 5 seconds. When trying a new file-open, it explicitly requests that the redirector avoid any previously-used sources for that file. 
Only one file open operation may be in progress at a time. When a file-open is complete, that server is added to the set of active servers. When a "file not found" is returned by the redirector, a new file open will not be performed for at least 2 minutes. + +When an active source's quality goes above 5130ms (256KB request, 50KB/s bandwidth, 10ms latency), the source is moved to the inactive set if it is not the only active server. + +If an active source's quality is a factor of 10 worse than the other active source's, it is moved to the inactive set. + +When there is only one source in the active set, a source is promoted from the inactive set if its quality is below 5130ms and is not a factor of 10 worse than the remaining active source's. If no server in the inactive set is eligible, the client re-enters search mode for additional sources. + +If a source encounters an error (either a file IO error or a disconnect), then it is marked as disabled and removed from both the active and inactive sets. + +If an inactive source's quality metric is better than an active source's metric, the two are swapped. This swap is not performed if the inactive source itself has been removed from the active set in the last two minutes. The "Active probe algorithm" section below describes one mechanism for updating an inactive source's quality metric. + +Request splitting algorithm +When a client performs a new request, the request is balanced amongst the active servers using the following algorithm: + +1) Source A removes 256KB from the beginning of the request and places it at the end of its request queue. +2) Source B removes 256KB from the end of the request and places it at the beginning of its request queue. +3) Steps (1) and (2) are repeated until the original request has been completely split into the two queues. + +After each client request has been fully split, the labels "A" and "B" are swapped. This allows a series of small client requests to be load-balanced between the two sources. 
+ +Examples: + +For example, suppose a client requests to read 1024KB starting at offset 0. The client request and queues look like: + +A: [] +B: [] +client request: [1024KB @ 0] + +After steps 1 and 2, the sources have the following queues: + +A: [256KB @ 0] +B: [256KB @ 768KB] +client request: [512KB @ 256KB] + +After steps 1 and 2 are applied again, we have the following queues: + +A: [256KB @ 0, 256KB @ 256KB] +B: [256KB @ 512KB, 256KB @ 768KB] +client request: [] + +Consider the following example for a vectored read request: +Start: +A: [] +B: [] +client request: [192KB @ 0, 128KB @ 256KB, 128KB @ 512KB, 192KB @ 768KB] + +Iteration 1: +A: [192KB @ 0, 64KB @ 256KB] +B: [64KB @ 576KB, 192KB @ 768KB] +client request: [64KB @ 320KB, 64KB @ 512KB] + +Iteration 2: +A: [192KB @ 0, 64KB @ 256KB, 64KB @ 320KB, 64KB @ 512KB] +B: [64KB @ 576KB, 192KB @ 768KB] +client request: [] + +Notes: +- The algorithm halts because, after each iteration, the client request is reduced by up to 512KB and the algorithm terminates once the client request is empty. +- The request is split between the two sources; the difference in number of bytes assigned is at most 256KB. +- 256KB is a natural unit of size as the Xrootd client breaks requests into this size by default. +- If the client request, when performed in order, results in the server performing non-overlapping reads with monotonically-increasing offsets, then the split requests will also have this property. +- TODO: We should have the initial assignment done with respect to the quality metric. When the two servers are heavily out-of-balance, one may steal a lot of work from the other. When work is stolen, it is performed via backward reads, which won't yield ideal filesystem performance. + +Load-balance algorithm + +When a client request is made, it is split into two sets of requests as described previously if there are two active servers. Each source performs the IO operations in its queue, in order, to completion. 
+ +If two sources are active and one source has already finished its queue, it may steal work from the end of the other source's queue if the other source has not already started on that IO operation. + +If one source has not completed its request after more than 4 times the quality metric has elapsed and the other source is idle, then the other source may speculatively start the same IO operation. The results of this "speculative read" are stored in a separate, statically-allocated 256KB buffer; this means only one speculative read at a time is allowed. The first request to complete is returned to the client. + +If an IO error occurs on one active source, the same IO operation is inserted into the other source's queue. If the IO operation fails on the other active source, it is repeated immediately on all inactive sources. The first inactive source to successfully complete the IO is swapped into the active set, removing the currently worst-performing active source. + +Notes: +- Due to the way the quality metric is calculated, it could take a few minutes for an active source that starts moving data at 1 byte / sec to become inactive. Should we re-weight the time intervals to give more weight to recent results? + - TODO: calculate the worst-case over-read for very slow sources if they get their work stolen all the time. + - The over-read is probably not as bad as having to take quite some time to give up on the source. We can probably introduce a penalty for sources that are the "victim" of a successful speculative read. + +Active probe algorithm + +If the client is not in search mode, for every 1024KB of data read, generate a random number such that there will be a .25% chance the request will also be issued as a speculative read to an inactive source. If it has been at least 2 minutes since the last file-open request to the redirector, there is also a .25% chance the request will trigger a file-open and a speculative read via the redirector. 
+ +The results of the speculative read are kept in the same statically-allocated buffer as used by the load-balance algorithm. Only one speculative read can be issued at a time, including reads from the load-balance algorithm. The timing results of the read are used to update the quality metric for the inactive or new sources. If the request is finished prior to the active source, the results of the speculative read will be used to fulfill the client request. + +Notes: +- Assuming a client request is 256KB, the active probe algorithm should be no more than 2% of the total traffic. + diff --git a/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc b/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc index 6c1493f27a918..d87e3fce56da7 100644 --- a/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc +++ b/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc @@ -2,9 +2,16 @@ #include "Utilities/StorageFactory/interface/StorageMakerFactory.h" #include "Utilities/StorageFactory/interface/StorageFactory.h" #include "Utilities/XrdAdaptor/src/XrdFile.h" + +// These are to be removed once the new client supports prepare requests. #include "XrdClient/XrdClientAdmin.hh" #include "XrdClient/XrdClientUrlSet.hh" -#include "XrdClient/XrdClientEnv.hh" +#include "XrdCl/XrdClDefaultEnv.hh" +// We muck with internal symbols of XrdCl to avoid duplicate definition issues +// See https://github.com/xrootd/xrootd/issues/16 +#define __XRD_CL_OPTIMIZERS_HH__ +#include "XrdCl/XrdClLog.hh" + class XrdStorageMaker : public StorageMaker { @@ -15,11 +22,6 @@ class XrdStorageMaker : public StorageMaker const std::string &path, int mode) { - // The important part here is not the cache size (which will get - // auto-adjusted), but the fact the cache is set to something non-zero. - // If we don't do this before creating the XrdFile object, caching will be - // completely disabled, resulting in poor performance. 
- EnvPutInt(NAME_READCACHESIZE, 20*1024*1024); StorageFactory *f = StorageFactory::get(); StorageFactory::ReadHint readHint = f->readHint(); @@ -74,7 +76,8 @@ class XrdStorageMaker : public StorageMaker virtual void setDebugLevel (unsigned int level) { - EnvPutInt("DebugLevel", level); + //XrdCl::Log *log = XrdCl::DefaultEnv::GetLog(); + //log->SetLevel(static_cast(level+2)); } }; diff --git a/Utilities/XrdAdaptor/src/QualityMetric.cc b/Utilities/XrdAdaptor/src/QualityMetric.cc new file mode 100644 index 0000000000000..5f42872cbbeab --- /dev/null +++ b/Utilities/XrdAdaptor/src/QualityMetric.cc @@ -0,0 +1,172 @@ + +#include + +#include "FWCore/MessageLogger/interface/MessageLogger.h" + +#include "QualityMetric.h" + +using namespace XrdAdaptor; + +QualityMetricWatch::QualityMetricWatch(QualityMetric *parent1, QualityMetric *parent2) + : m_parent1(parent1), m_parent2(parent2) +{ + // TODO: just assuming success. + clock_gettime(CLOCK_MONOTONIC, &m_start); +} + +QualityMetricWatch::~QualityMetricWatch() +{ + if (m_parent1 && m_parent2) + { + timespec stop; + clock_gettime(CLOCK_MONOTONIC, &stop); + int ms = 1000*(stop.tv_sec - m_start.tv_sec) + (stop.tv_nsec - m_start.tv_nsec)/1e6; + edm::LogVerbatim("XrdAdaptorInternal") << "Finished timer after " << ms << std::endl; + m_parent1->finishWatch(stop, ms); + m_parent2->finishWatch(stop, ms); + } +} + +QualityMetricWatch::QualityMetricWatch(QualityMetricWatch &&that) +{ + m_parent1 = that.m_parent1; + m_parent2 = that.m_parent2; + m_start = that.m_start; + that.m_parent1 = nullptr; + that.m_parent2 = nullptr; + that.m_start = {0, 0}; +} + +void +QualityMetricWatch::swap(QualityMetricWatch &that) +{ + QualityMetric *tmp; + tmp = that.m_parent1; + that.m_parent1 = m_parent1; + m_parent1 = tmp; + tmp = that.m_parent2; + that.m_parent2 = m_parent2; + m_parent2 = tmp; + timespec tmp2; + tmp2 = that.m_start; + that.m_start = m_start; + m_start = tmp2; +} + + +QualityMetric::QualityMetric(timespec now, int default_value) + : 
m_value(default_value), + m_interval0_n(0), + m_interval0_val(-1), + m_interval0_start(now.tv_sec), + m_interval1_val(-1), + m_interval2_val(-1), + m_interval3_val(-1), + m_interval4_val(-1) +{ +} + +void +QualityMetric::finishWatch(timespec stop, int ms) +{ + m_value = -1; + if (stop.tv_sec > m_interval0_start+interval_length) + { + m_interval4_val = m_interval3_val; + m_interval3_val = m_interval2_val; + m_interval2_val = m_interval1_val; + m_interval1_val = m_interval0_val; + m_interval0_n = 1; + m_interval0_val = ms; + m_interval0_start = stop.tv_sec; + } + else + { + int num = m_interval0_val * m_interval0_n + ms; + m_interval0_n++; + m_interval0_val = num / m_interval0_n; + } +} + +unsigned +QualityMetric::get() +{ + if (m_value == -1) + { + unsigned den = 0; + m_value = 0; + if (m_interval0_val >= 0) + { + den += 16; + m_value = 16*m_interval0_val; + } + if (m_interval1_val >= 0) + { + den += 8; + m_value += 8*m_interval1_val; + } + if (m_interval2_val >= 0) + { + den += 4; + m_value += 4*m_interval2_val; + } + if (m_interval3_val >= 0) + { + den += 2; + m_value += 2*m_interval3_val; + } + if (m_interval4_val >= 0) + { + den += 1; + m_value += m_interval4_val; + } + if (den) + m_value /= den; + else + m_value = 260; + } + return m_value; +} + +QualityMetricFactory * QualityMetricFactory::m_instance = new QualityMetricFactory(); + +std::unique_ptr +QualityMetricFactory::get(timespec now, const std::string &id) +{ + MetricMap::const_iterator it = m_instance->m_sources.find(id); + QualityMetricUniqueSource *source; + if (it == m_instance->m_sources.end()) + { + source = new QualityMetricUniqueSource(now); + m_instance->m_sources[id] = source; + } + else + { + source = it->second; + } + return source->newSource(now); +} + +QualityMetricSource::QualityMetricSource(QualityMetricUniqueSource &parent, timespec now, int default_value) + : QualityMetric(now, default_value), + m_parent(parent) +{} + +void +QualityMetricSource::startWatch(QualityMetricWatch & watch) +{ 
+ QualityMetricWatch tmp(&m_parent, this); + watch.swap(tmp); +} + +QualityMetricUniqueSource::QualityMetricUniqueSource(timespec now) + : QualityMetric(now) +{} + +std::unique_ptr +QualityMetricUniqueSource::newSource(timespec now) +{ + std::unique_ptr child(new QualityMetricSource(*this, now, get())); + return child; +} + diff --git a/Utilities/XrdAdaptor/src/QualityMetric.h b/Utilities/XrdAdaptor/src/QualityMetric.h new file mode 100644 index 0000000000000..3aa027c8a85df --- /dev/null +++ b/Utilities/XrdAdaptor/src/QualityMetric.h @@ -0,0 +1,103 @@ +#ifndef Utilities_XrdAdaptor_QualityMetric_h +#define Utilities_XrdAdaptor_QualityMetric_h + +#include + +#include +#include + +#include + +namespace XrdAdaptor { + +class QualityMetric; +class QualityMetricSource; +class QualityMetricUniqueSource; + +class QualityMetricWatch : boost::noncopyable { +friend class QualityMetricSource; + +public: + QualityMetricWatch() : m_parent1(nullptr), m_parent2(nullptr) {} + QualityMetricWatch(QualityMetricWatch &&); + ~QualityMetricWatch(); + + void swap(QualityMetricWatch &); + +private: + QualityMetricWatch(QualityMetric *parent1, QualityMetric *parent2); + timespec m_start; + QualityMetric *m_parent1; + QualityMetric *m_parent2; +}; + +class QualityMetric : boost::noncopyable { +friend class QualityMetricWatch; + +public: + QualityMetric(timespec now, int default_value=260); + unsigned get(); + +private: + void finishWatch(timespec now, int ms); + + static const unsigned interval_length = 60; + + int m_value; + int m_interval0_n; + int m_interval0_val; + time_t m_interval0_start; + int m_interval1_val; + int m_interval2_val; + int m_interval3_val; + int m_interval4_val; + +}; + +class QualityMetricFactory { + +friend class Source; + +private: + static + std::unique_ptr get(timespec now, const std::string &id); + + static QualityMetricFactory *m_instance; + + typedef std::unordered_map MetricMap; + MetricMap m_sources; +}; + +/** + * This QM implementation is meant to be held 
by each XrdAdaptor::Source + * instance + */ +class QualityMetricSource final : public QualityMetric { + +friend class QualityMetricUniqueSource; + +public: + void startWatch(QualityMetricWatch &); + +private: + QualityMetricSource(QualityMetricUniqueSource &parent, timespec now, int default_value); + + QualityMetricUniqueSource &m_parent; +}; + +/* + * This quality metric tracks all accesses to a given source ID. + */ +class QualityMetricUniqueSource final : public QualityMetric { + +friend class QualityMetricFactory; + +private: + QualityMetricUniqueSource(timespec now); + std::unique_ptr newSource(timespec now); +}; + +} + +#endif // Utilities_XrdAdaptor_QualityMetric_h + diff --git a/Utilities/XrdAdaptor/src/XrdFile.cc b/Utilities/XrdAdaptor/src/XrdFile.cc index 6dc41bd72f57e..fce82ca1b1620 100644 --- a/Utilities/XrdAdaptor/src/XrdFile.cc +++ b/Utilities/XrdAdaptor/src/XrdFile.cc @@ -1,48 +1,51 @@ -#include "Utilities/StorageFactory/interface/StatisticsSenderService.h" -#include "FWCore/ServiceRegistry/interface/Service.h" #include "Utilities/XrdAdaptor/src/XrdFile.h" +#include "Utilities/XrdAdaptor/src/XrdRequestManager.h" #include "FWCore/Utilities/interface/EDMException.h" #include "FWCore/MessageLogger/interface/MessageLogger.h" #include "FWCore/Utilities/interface/Likely.h" +#include "FWCore/Utilities/interface/CPUTimer.h" #include #include +#include +#include + +using namespace XrdAdaptor; + +// To be re-enabled when the monitoring interface is back. 
+//static const char *kCrabJobIdEnv = "CRAB_UNIQUE_JOB_ID"; + +#define XRD_CL_MAX_CHUNK 512*1024 XrdFile::XrdFile (void) - : m_client (0), - m_offset (0), - m_stat(), + : m_offset (0), + m_size(-1), m_close (false), - m_name() + m_name(), + m_op_count(0) { - memset(&m_stat, 0, sizeof (m_stat)); - pthread_mutex_init(&m_readv_mutex, 0); } XrdFile::XrdFile (const char *name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */) - : m_client (0), - m_offset (0), - m_stat(), + : m_offset (0), + m_size(-1), m_close (false), - m_name() + m_name(), + m_op_count(0) { - memset(&m_stat, 0, sizeof (m_stat)); - pthread_mutex_init(&m_readv_mutex, 0); open (name, flags, perms); } XrdFile::XrdFile (const std::string &name, int flags /* = IOFlags::OpenRead */, int perms /* = 066 */) - : m_client (0), - m_offset (0), - m_stat(), + : m_offset (0), + m_size(-1), m_close (false), - m_name() + m_name(), + m_op_count(0) { - memset(&m_stat, 0, sizeof (m_stat)); - pthread_mutex_init(&m_readv_mutex, 0); open (name.c_str (), flags, perms); } @@ -52,7 +55,6 @@ XrdFile::~XrdFile (void) edm::LogError("XrdFileError") << "Destructor called on XROOTD file '" << m_name << "' but the file is still open"; - pthread_mutex_destroy(&m_readv_mutex); } ////////////////////////////////////////////////////////////////////// @@ -102,19 +104,14 @@ XrdFile::open (const char *name, ex.addContext("Calling XrdFile::open()"); throw ex; } - // If I am already open, close old file first - if (m_client && m_close) - close(); - else - abort(); // Translate our flags to system flags - int openflags = 0; + XrdCl::OpenFlags::Flags openflags = XrdCl::OpenFlags::None; if (flags & IOFlags::OpenWrite) - openflags |= kXR_open_updt; + openflags |= XrdCl::OpenFlags::Update; else if (flags & IOFlags::OpenRead) - openflags |= kXR_open_read; + openflags |= XrdCl::OpenFlags::Read; if (flags & IOFlags::OpenAppend) { edm::Exception ex(edm::errors::FileOpenError); @@ -126,65 +123,76 @@ XrdFile::open (const char *name, if (flags 
& IOFlags::OpenCreate) { if (! (flags & IOFlags::OpenExclusive)) - openflags |= kXR_delete; - openflags |= kXR_new; - openflags |= kXR_mkpath; + openflags |= XrdCl::OpenFlags::Delete; + openflags |= XrdCl::OpenFlags::New; + openflags |= XrdCl::OpenFlags::MakePath; } if ((flags & IOFlags::OpenTruncate) && (flags & IOFlags::OpenWrite)) - openflags |= kXR_delete; - + openflags |= XrdCl::OpenFlags::Delete; + + // Translate mode flags + XrdCl::Access::Mode modeflags = XrdCl::Access::None; + modeflags |= (perms & S_IRUSR) ? XrdCl::Access::UR : XrdCl::Access::None; + modeflags |= (perms & S_IWUSR) ? XrdCl::Access::UW : XrdCl::Access::None; + modeflags |= (perms & S_IXUSR) ? XrdCl::Access::UX : XrdCl::Access::None; + modeflags |= (perms & S_IRGRP) ? XrdCl::Access::GR : XrdCl::Access::None; + modeflags |= (perms & S_IWGRP) ? XrdCl::Access::GW : XrdCl::Access::None; + modeflags |= (perms & S_IXGRP) ? XrdCl::Access::GX : XrdCl::Access::None; + modeflags |= (perms & S_IROTH) ? XrdCl::Access::GR : XrdCl::Access::None; + modeflags |= (perms & S_IWOTH) ? XrdCl::Access::GW : XrdCl::Access::None; + modeflags |= (perms & S_IXOTH) ? XrdCl::Access::GX : XrdCl::Access::None; + + m_requestmanager.reset(new RequestManager(name, openflags, modeflags)); m_name = name; - m_client = new XrdClient(name); - if (! m_client->Open(perms, openflags) - || m_client->LastServerResp()->status != kXR_ok) { - edm::Exception ex(edm::errors::FileOpenError); - ex << "XrdClient::Open(name='" << name - << "', flags=0x" << std::hex << openflags - << ", permissions=0" << std::oct << perms << std::dec - << ") => error '" << m_client->LastServerError()->errmsg - << "' (errno=" << m_client->LastServerError()->errnum << ")"; - ex.addContext("Calling XrdFile::open()"); - addConnection(ex); - throw ex; - } - if (! m_client->Stat(&m_stat)) { + + // Stat the file so we can keep track of the offset better. + auto file = getActiveFile(); + XrdCl::XRootDStatus status; + XrdCl::StatInfo *statInfo = NULL; + if (! 
(status = file->Stat(true, statInfo)).IsOK()) { edm::Exception ex(edm::errors::FileOpenError); - ex << "XrdClient::Stat(name='" << name - << ") => error '" << m_client->LastServerError()->errmsg - << "' (errno=" << m_client->LastServerError()->errnum << ")"; + ex << "XrdCl::File::Stat(name='" << name + << ") => error '" << status.ToString() + << "' (errno=" << status.errNo << ", code=" << status.code << ")"; ex.addContext("Calling XrdFile::open()"); addConnection(ex); throw ex; } + assert(statInfo); + m_size = statInfo->GetSize(); + delete(statInfo); + m_offset = 0; m_close = true; // Send the monitoring info, if available. // Note: getenv is not reentrant. - const char * crabJobId = edm::storage::StatisticsSenderService::getJobID(); + // Commenting out until this is available in the new client. +/* + char * crabJobId = getenv(kCrabJobIdEnv); if (crabJobId) { kXR_unt32 dictId; - m_client->SendMonitoringInfo(crabJobId, &dictId); + m_file->SendMonitoringInfo(crabJobId, &dictId); edm::LogInfo("XrdFileInfo") << "Set monitoring ID to " << crabJobId << " with resulting dictId " << dictId << "."; } +*/ edm::LogInfo("XrdFileInfo") << "Opened " << m_name; - XrdClientConn *conn = m_client->GetClientConn(); - edm::LogInfo("XrdFileInfo") << "Connection URL " << conn->GetCurrentUrl().GetUrl().c_str(); - - std::string host = std::string(conn->GetCurrentUrl().Host.c_str()); - edm::Service statsService; - if (statsService.isAvailable()) { - statsService->setCurrentServer(host); - } + std::vector sources; + m_requestmanager->getActiveSourceNames(sources); + std::stringstream ss; + ss << "Active sources: "; + for (auto const& it : sources) + ss << it << ", "; + edm::LogInfo("XrdFileInfo") << ss.str(); } void XrdFile::close (void) { - if (! m_client) + if (! m_requestmanager.get()) { edm::LogError("XrdFileError") << "XrdFile::close(name='" << m_name @@ -193,28 +201,21 @@ XrdFile::close (void) return; } - if (! 
m_client->Close()) - edm::LogWarning("XrdFileWarning") - << "XrdFile::close(name='" << m_name - << "') failed with error '" << m_client->LastServerError()->errmsg - << "' (errno=" << m_client->LastServerError()->errnum << ")"; - delete m_client; - m_client = 0; + m_requestmanager.reset(); m_close = false; m_offset = 0; - memset(&m_stat, 0, sizeof (m_stat)); + m_size = -1; edm::LogInfo("XrdFileInfo") << "Closed " << m_name; } void XrdFile::abort (void) { - delete m_client; - m_client = 0; + m_requestmanager.reset(nullptr); m_close = false; m_offset = 0; - memset(&m_stat, 0, sizeof (m_stat)); + m_size = -1; } ////////////////////////////////////////////////////////////////////// @@ -229,19 +230,10 @@ XrdFile::read (void *into, IOSize n) addConnection(ex); throw ex; } - int s = m_client->Read(into, m_offset, n); - if (s < 0) { - edm::Exception ex(edm::errors::FileReadError); - ex << "XrdClient::Read(name='" << m_name - << "', offset=" << m_offset << ", n=" << n - << ") failed with error '" << m_client->LastServerError()->errmsg - << "' (errno=" << m_client->LastServerError()->errnum << ")"; - ex.addContext("Calling XrdFile::read()"); - addConnection(ex); - throw ex; - } - m_offset += s; - return s; + + uint32_t bytesRead = m_requestmanager->handle(into, n, m_offset).get(); + m_offset += bytesRead; + return bytesRead; } IOSize @@ -255,18 +247,83 @@ XrdFile::read (void *into, IOSize n, IOOffset pos) addConnection(ex); throw ex; } - int s = m_client->Read(into, pos, n); - if (s < 0) { - edm::Exception ex(edm::errors::FileReadError); - ex << "XrdClient::Read(name='" << m_name - << "', offset=" << m_offset << ", n=" << n - << ") failed with error '" << m_client->LastServerError()->errmsg - << "' (errno=" << m_client->LastServerError()->errnum << ")"; - ex.addContext("Calling XrdFile::read()"); - addConnection(ex); - throw ex; + + uint32_t bytesRead = m_requestmanager->handle(into, n, pos).get(); + + return bytesRead; +} + +// This method is rarely used by CMS; hence, it is 
a small wrapper and not efficient. +IOSize +XrdFile::readv (IOBuffer *into, IOSize n) +{ + std::vector new_buf; + new_buf.reserve(n); + IOOffset off = 0; + for (IOSize i=0; i >cl(new std::vector); + cl->reserve(n); + + IOSize size = 0; + for (IOSize i=0; i(into[i].data()); + while (length > XRD_CL_MAX_CHUNK) { + IOPosBuffer ci; + ci.set_size(XRD_CL_MAX_CHUNK); + length -= XRD_CL_MAX_CHUNK; + ci.set_offset(offset); + offset += XRD_CL_MAX_CHUNK; + ci.set_data(buffer); + buffer += XRD_CL_MAX_CHUNK; + cl->emplace_back(ci); + } + IOPosBuffer ci; + ci.set_size(length); + ci.set_offset(offset); + ci.set_data(buffer); + cl->emplace_back(ci); + } + edm::CPUTimer timer; + timer.start(); + IOSize result; + try + { + result = m_requestmanager->handle(cl).get(); } - return s; + catch (edm::Exception& ex) + { + ex.addContext("Calling XrdFile::readv()"); + throw; + } + timer.stop(); + assert(result == size); + edm::LogVerbatim("XrdAdaptorInternal") << "[" << m_op_count.fetch_add(1) << "] Time for readv: " << static_cast(1000*timer.realTime()) << std::endl; + return result; } IOSize @@ -280,21 +337,24 @@ XrdFile::write (const void *from, IOSize n) addConnection(ex); throw ex; } - ssize_t s = m_client->Write(from, m_offset, n); - if (s < 0) { + auto file = getActiveFile(); + + XrdCl::XRootDStatus s = file->Write(m_offset, n, from); + if (!s.IsOK()) { cms::Exception ex("FileWriteError"); ex << "XrdFile::write(name='" << m_name << "', n=" << n - << ") failed with error '" << m_client->LastServerError()->errmsg - << "' (errno=" << m_client->LastServerError()->errnum << ")"; + << ") failed with error '" << s.ToString() + << "' (errno=" << s.errNo << ", code=" << s.code << ")"; ex.addContext("Calling XrdFile::write()"); addConnection(ex); throw ex; } - m_offset += s; - if (m_offset > m_stat.size) - m_stat.size = m_offset; + m_offset += n; + assert(m_size != -1); + if (m_offset > m_size) + m_size = m_offset; - return s; + return n; } IOSize @@ -308,42 +368,31 @@ XrdFile::write (const 
void *from, IOSize n, IOOffset pos) addConnection(ex); throw ex; } - ssize_t s = m_client->Write(from, pos, n); - if (s < 0) { + auto file = getActiveFile(); + + XrdCl::XRootDStatus s = file->Write(pos, n, from); + if (!s.IsOK()) { cms::Exception ex("FileWriteError"); ex << "XrdFile::write(name='" << m_name << "', n=" << n - << ") failed with error '" << m_client->LastServerError()->errmsg - << "' (errno=" << m_client->LastServerError()->errnum << ")"; + << ") failed with error '" << s.ToString() + << "' (errno=" << s.errNo << ", code=" << s.code << ")"; ex.addContext("Calling XrdFile::write()"); addConnection(ex); throw ex; } - if (pos + s > m_stat.size) - m_stat.size = pos + s; + assert (m_size != -1); + if (static_cast(pos + n) > m_size) + m_size = pos + n; - return s; + return n; } bool XrdFile::prefetch (const IOPosBuffer *what, IOSize n) { - // Detect a prefetch support probe, and claim we don't support it. - // This will make the default application-only mode, but allows us to still - // effectively support storage-only mode. - if (unlikely((n == 1) && (what[0].offset() == 0) && (what[0].size() == PREFETCH_PROBE_LENGTH))) { - return false; - } - std::vector offsets; offsets.resize(n); - std::vector lens; lens.resize(n); - kXR_int64 total = 0; - for (IOSize i = 0; i < n; ++i) { - offsets[i] = what[i].offset(); - lens[i] = what[i].size(); - total += what[i].size(); - } - - kXR_int64 r = m_client->ReadV(NULL, &offsets[0], &lens[0], n); - return r == total; + // The new Xrootd client does not contain any internal buffers. + // Hence, prefetching is disabled completely. + return false; } ////////////////////////////////////////////////////////////////////// @@ -352,7 +401,7 @@ XrdFile::prefetch (const IOPosBuffer *what, IOSize n) IOOffset XrdFile::position (IOOffset offset, Relative whence /* = SET */) { - if (! m_client) { + if (! 
m_requestmanager.get()) { cms::Exception ex("FilePositionError"); ex << "XrdFile::position() called on a closed file"; ex.addContext("Calling XrdFile::position()"); @@ -369,8 +418,10 @@ XrdFile::position (IOOffset offset, Relative whence /* = SET */) m_offset += offset; break; + // TODO: None of this works with concurrent writers to the file. case END: - m_offset = m_stat.size + offset; + assert(m_size != -1); + m_offset = m_size + offset; break; default: @@ -383,8 +434,9 @@ XrdFile::position (IOOffset offset, Relative whence /* = SET */) if (m_offset < 0) m_offset = 0; - if (m_offset > m_stat.size) - m_stat.size = m_offset; + assert(m_size != -1); + if (m_offset > m_size) + m_size = m_offset; return m_offset; } @@ -399,14 +451,27 @@ XrdFile::resize (IOOffset /* size */) throw ex; } +std::shared_ptr +XrdFile::getActiveFile (void) +{ + if (!m_requestmanager.get()) + { + cms::Exception ex("XrdFileLogicError"); + ex << "Xrd::getActiveFile(name='" << m_name << "') no active request manager"; + ex.addContext("Calling XrdFile::getActiveFile()"); + m_requestmanager->addConnections(ex); + m_close = false; + throw ex; + } + return m_requestmanager->getActiveFile(); +} + void XrdFile::addConnection (cms::Exception &ex) { - XrdClientConn *conn = m_client->GetClientConn(); - if (conn) { - std::stringstream ss; - ss << "Current server connection: " << conn->GetCurrentUrl().GetUrl().c_str(); - ex.addAdditionalInfo(ss.str()); + if (m_requestmanager.get()) + { + m_requestmanager->addConnections(ex); } } diff --git a/Utilities/XrdAdaptor/src/XrdFile.h b/Utilities/XrdAdaptor/src/XrdFile.h index 2bcdcf7e8dbbc..e3bcc61650969 100644 --- a/Utilities/XrdAdaptor/src/XrdFile.h +++ b/Utilities/XrdAdaptor/src/XrdFile.h @@ -4,9 +4,14 @@ # include "Utilities/StorageFactory/interface/Storage.h" # include "Utilities/StorageFactory/interface/IOFlags.h" # include "FWCore/Utilities/interface/Exception.h" -# include "XrdClient/XrdClient.hh" +# include "XrdCl/XrdClFile.hh" # include -# include +# 
include +# include + +namespace XrdAdaptor { +class RequestManager; +} class XrdFile : public Storage { @@ -53,19 +58,18 @@ class XrdFile : public Storage void addConnection(cms::Exception &); - // "Real" implementation of readv that interacts directly with Xrootd. - IOSize readv_send(char **result_buffer, readahead_list &read_chunk_list, IOSize n, IOSize total_len); - IOSize readv_unpack(char **result_buffer, std::vector &res_buf, IOSize datalen, readahead_list &read_chunk_list, IOSize n); - - - XrdClient *m_client; - IOOffset m_offset; - XrdClientStatInfo m_stat; - bool m_close; - std::string m_name; + /** + * Returns a file handle from one of the active sources. + * Verifies the file is open and throws an exception as necessary. + */ + std::shared_ptr getActiveFile(); - // We could do away with this.. if not for the race condition with LastServerResp in XrdReadv. - pthread_mutex_t m_readv_mutex; + std::unique_ptr m_requestmanager; + IOOffset m_offset; + IOOffset m_size; + bool m_close; + std::string m_name; + std::atomic m_op_count; }; diff --git a/Utilities/XrdAdaptor/src/XrdRequest.cc b/Utilities/XrdAdaptor/src/XrdRequest.cc new file mode 100644 index 0000000000000..a55c928b7840d --- /dev/null +++ b/Utilities/XrdAdaptor/src/XrdRequest.cc @@ -0,0 +1,81 @@ + +#include + +#include "FWCore/MessageLogger/interface/MessageLogger.h" + +#include "XrdRequest.h" +#include "XrdRequestManager.h" + +using namespace XrdAdaptor; + +// If you define XRD_FAKE_ERROR, 1/5 read requests should fail. 
+#ifdef XRD_FAKE_ERROR +#define FAKE_ERROR_COUNTER 5 +int g_fakeError = 0; +#else +#define FAKE_ERROR_COUNTER 0 +int g_fakeError = 0; +#endif + +XrdAdaptor::ClientRequest::~ClientRequest() {} + +void +XrdAdaptor::ClientRequest::HandleResponse(XrdCl::XRootDStatus *stat, XrdCl::AnyObject *resp) +{ + std::unique_ptr response(resp); + std::unique_ptr status(stat); + { + QualityMetricWatch qmw; + m_qmw.swap(qmw); + } + if ((!FAKE_ERROR_COUNTER || ((++g_fakeError % FAKE_ERROR_COUNTER) != 0)) && (status->IsOK() && resp)) + { + if (m_into) + { + XrdCl::ChunkInfo *read_info; + response->Get(read_info); + m_promise.set_value(read_info->length); + } + else + { + XrdCl::VectorReadInfo *read_info; + response->Get(read_info); + m_promise.set_value(read_info->GetSize()); + } + } + else + { + Source *source = m_source.get(); + edm::LogWarning("XrdAdaptorInternal") << "XrdRequestManager::handle(name='" + << m_manager.getFilename() << ") failure when reading from " + << (source ? source->ID() : "(unknown source)") + << "; failed with error '" << status->ToString() << "' (errno=" + << status->errNo << ", code=" << status->code << ")."; + m_failure_count++; + try + { + m_manager.requestFailure(m_self_reference); + return; + } + catch (edm::Exception& ex) + { + ex.addContext("In XrdAdaptor::ClientRequest::HandleResponse() case for failure"); + //m_promise.set_exception(std::make_exception_ptr(ex)); + m_promise.set_exception(std::current_exception()); + } + catch (...) + { + edm::Exception ex(edm::errors::FileReadError); + ex << "XrdRequestManager::handle(name='" << m_manager.getFilename() + << ") failed with error '" << status->ToString() + << "' (errno=" << status->errNo << ", code=" + << status->code << "). 
Unknown exception occurred when running" + << " connection recovery."; + ex.addContext("Calling XrdRequestManager::handle()"); + m_manager.addConnections(ex); + m_promise.set_exception(std::make_exception_ptr(ex)); + } + } + m_self_reference = nullptr; +} + diff --git a/Utilities/XrdAdaptor/src/XrdRequest.h b/Utilities/XrdAdaptor/src/XrdRequest.h new file mode 100644 index 0000000000000..60c32e21805f8 --- /dev/null +++ b/Utilities/XrdAdaptor/src/XrdRequest.h @@ -0,0 +1,90 @@ +#ifndef Utilities_XrdAdaptor_XrdRequest_h +#define Utilities_XrdAdaptor_XrdRequest_h + +#include +#include + +#include +#include + +#include "Utilities/StorageFactory/interface/Storage.h" + +#include "QualityMetric.h" + +namespace XrdAdaptor { + +class Source; + +class RequestManager; + +class ClientRequest : boost::noncopyable, public XrdCl::ResponseHandler { + +friend class Source; + +public: + + ClientRequest(RequestManager &manager, void *into, IOSize size, IOOffset off) + : m_failure_count(0), + m_into(into), + m_size(size), + m_off(off), + m_iolist(nullptr), + m_manager(manager) + { + } + + ClientRequest(RequestManager &manager, std::shared_ptr > iolist) + : m_failure_count(0), + m_into(nullptr), + m_size(0), + m_off(0), + m_iolist(iolist), + m_manager(manager) + { + // TODO: calculate size here. + } + + virtual ~ClientRequest(); + + std::future get_future() + { + return m_promise.get_future(); + } + + /** + * Handle the response from the Xrootd server. + */ + virtual void HandleResponse(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response) override; + + IOSize getSize() const {return m_size;} + + /** + * Returns a pointer to the current source; may be nullptr + * if there is no outstanding IO + */ + std::shared_ptr getCurrentSource() const {return m_source;} + +private: + unsigned m_failure_count; + void *m_into; + IOSize m_size; + IOOffset m_off; + std::shared_ptr > m_iolist; + RequestManager &m_manager; + std::shared_ptr m_source; + + // Some explanation is due here. 
When an IO is outstanding, + // Xrootd takes a raw pointer to this object. Hence we cannot + // allow it to go out of scope until some indeterminate time in the + // future. So, while the IO is outstanding, we take a reference to + // ourself to prevent the object from being unexpectedly deleted. + std::shared_ptr m_self_reference; + + std::promise m_promise; + + QualityMetricWatch m_qmw; +}; + +} + +#endif diff --git a/Utilities/XrdAdaptor/src/XrdRequestManager.cc b/Utilities/XrdAdaptor/src/XrdRequestManager.cc new file mode 100644 index 0000000000000..bfc2643a48318 --- /dev/null +++ b/Utilities/XrdAdaptor/src/XrdRequestManager.cc @@ -0,0 +1,595 @@ + +#include +#include +#include + +#include "XrdCl/XrdClFile.hh" + +#include "FWCore/Utilities/interface/CPUTimer.h" +#include "FWCore/Utilities/interface/EDMException.h" +#include "FWCore/MessageLogger/interface/MessageLogger.h" + +#include "Utilities/XrdAdaptor/src/XrdRequestManager.h" + +#define XRD_CL_MAX_CHUNK 512*1024 + +#define XRD_ADAPTOR_SHORT_OPEN_DELAY 5 + +#ifdef XRD_FAKE_OPEN_PROBE +#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 100 +#define XRD_ADAPTOR_LONG_OPEN_DELAY 20 +// This is the minimal difference in quality required to swap an active and inactive source +#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 0 +#else +#define XRD_ADAPTOR_OPEN_PROBE_PERCENT 10 +#define XRD_ADAPTOR_LONG_OPEN_DELAY 2*60 +#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100 +#endif + +using namespace XrdAdaptor; + +long long timeDiffMS(const timespec &a, const timespec &b) +{ + long long diff = (a.tv_sec - b.tv_sec) * 1000; + diff += (a.tv_nsec - b.tv_nsec) / 1e6; + return diff; +} + +RequestManager::RequestManager(const std::string &filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms) + : m_nextInitialSourceToggle(false), + m_name(filename), + m_flags(flags), + m_perms(perms), + m_distribution(0,100), + m_open_handler(*this) +{ + std::unique_ptr file(new XrdCl::File()); + XrdCl::XRootDStatus status; + if (! 
(status = file->Open(filename, flags, perms)).IsOK()) + { + edm::Exception ex(edm::errors::FileOpenError); + ex << "XrdCl::File::Open(name='" << filename + << "', flags=0x" << std::hex << flags + << ", permissions=0" << std::oct << perms << std::dec + << ") => error '" << status.ToStr() + << "' (errno=" << status.errNo << ", code=" << status.code << ")"; + ex.addContext("Calling XrdFile::open()"); + addConnections(ex); + throw ex; + } + + timespec ts; + clock_gettime(CLOCK_MONOTONIC, &ts); + + std::shared_ptr source(new Source(ts, std::move(file))); + { + std::lock_guard sentry(m_source_mutex); + m_activeSources.push_back(source); + } + + m_lastSourceCheck = ts; + ts.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY; + m_nextActiveSourceCheck = ts; +} + +RequestManager::~RequestManager() +{} + +void +RequestManager::checkSources(timespec &now, IOSize requestSize) +{ + edm::LogVerbatim("XrdAdaptorInternal") << "Time since last check " + << timeDiffMS(now, m_lastSourceCheck) << "; last check " + << m_lastSourceCheck.tv_sec << "; now " < 1000 && timeDiffMS(now, m_nextActiveSourceCheck) > 0) + { + checkSourcesImpl(now, requestSize); + } +} + +void +RequestManager::checkSourcesImpl(timespec &now, IOSize requestSize) +{ + std::lock_guard sentry(m_source_mutex); + + bool findNewSource = false; + if (m_activeSources.size() <= 1) + findNewSource = true; + else if (m_activeSources.size() > 1) + { + edm::LogVerbatim("XrdAdaptorInternal") << "Source 0 quality " << m_activeSources[0]->getQuality() << ", source 1 quality " << m_activeSources[1]->getQuality() << std::endl; + if ((m_activeSources[0]->getQuality() > 5130) || + ((m_activeSources[0]->getQuality() > 260) && (m_activeSources[1]->getQuality()*4 < m_activeSources[0]->getQuality()))) + { + edm::LogWarning("XrdAdaptorInternal") << "Removing " + << m_activeSources[0]->ID() << " from active sources due to poor quality (" + << m_activeSources[0]->getQuality() << ")" << std::endl; + if (m_activeSources[0]->getLastDowngrade().tv_sec != 0) 
findNewSource = true; + m_activeSources[0]->setLastDowngrade(now); + m_inactiveSources.emplace_back(m_activeSources[0]); + m_activeSources.erase(m_activeSources.begin()); + } + else if ((m_activeSources[1]->getQuality() > 5130) || + ((m_activeSources[1]->getQuality() > 260) && (m_activeSources[0]->getQuality()*4 < m_activeSources[1]->getQuality()))) + { + edm::LogWarning("XrdAdaptorInternal") << "Removing " + << m_activeSources[1]->ID() << " from active sources due to poor quality (" + << m_activeSources[1]->getQuality() << ")" << std::endl; + if (m_activeSources[1]->getLastDowngrade().tv_sec != 0) findNewSource = true; + m_activeSources[1]->setLastDowngrade(now); + m_inactiveSources.emplace_back(m_activeSources[1]); + m_activeSources.erase(m_activeSources.begin()+1); + } + // NOTE: We could probably replace the copy with a better sort function at the cost of mental capacity. + std::vector > eligibleInactiveSources; eligibleInactiveSources.reserve(m_inactiveSources.size()); + for (const auto & source : m_inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_SHORT_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source); + //for (const auto & source : m_inactiveSources) eligibleInactiveSources.push_back(source); + std::vector >::iterator bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(), + [](const std::shared_ptr &s1, const std::shared_ptr &s2) {return s1->getQuality() < s2->getQuality();}); + std::vector >::iterator worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(), + [](const std::shared_ptr &s1, const std::shared_ptr &s2) {return s1->getQuality() < s2->getQuality();}); + if (bestInactiveSource != eligibleInactiveSources.end() && bestInactiveSource->get()) + { + edm::LogVerbatim("XrdAdaptorInternal") << "Best inactive source: " <<(*bestInactiveSource)->ID() + << ", quality " << (*bestInactiveSource)->getQuality(); + } + 
edm::LogVerbatim("XrdAdaptorInternal") << "Worst active source: " <<(*worstActiveSource)->ID() + << ", quality " << (*worstActiveSource)->getQuality(); + if ((bestInactiveSource != eligibleInactiveSources.end()) && m_activeSources.size() == 1) + { + m_activeSources.push_back(*bestInactiveSource); + for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;} + } + else while ((bestInactiveSource != eligibleInactiveSources.end()) && (*worstActiveSource)->getQuality() > (*bestInactiveSource)->getQuality()+XRD_ADAPTOR_SOURCE_QUALITY_FUDGE) + { + edm::LogVerbatim("XrdAdaptorInternal") << "Removing " << (*worstActiveSource)->ID() + << " from active sources due to quality (" << (*worstActiveSource)->getQuality() + << ") and promoting " << (*bestInactiveSource)->ID() << " (quality: " + << (*bestInactiveSource)->getQuality() << ")" << std::endl; + (*worstActiveSource)->setLastDowngrade(now); + for (auto it = m_inactiveSources.begin(); it != m_inactiveSources.end(); it++) if (it->get() == bestInactiveSource->get()) {m_inactiveSources.erase(it); break;} + m_inactiveSources.emplace_back(std::move(*worstActiveSource)); + m_activeSources.erase(worstActiveSource); + m_activeSources.emplace_back(std::move(*bestInactiveSource)); + eligibleInactiveSources.clear(); + for (const auto & source : m_inactiveSources) if (timeDiffMS(now, source->getLastDowngrade()) > (XRD_ADAPTOR_LONG_OPEN_DELAY-1)*1000) eligibleInactiveSources.push_back(source); + bestInactiveSource = std::min_element(eligibleInactiveSources.begin(), eligibleInactiveSources.end(), + [](const std::shared_ptr &s1, const std::shared_ptr &s2) {return s1->getQuality() < s2->getQuality();}); + worstActiveSource = std::max_element(m_activeSources.begin(), m_activeSources.end(), + [](const std::shared_ptr &s1, const std::shared_ptr &s2) {return s1->getQuality() < s2->getQuality();}); + } + if (!findNewSource && (timeDiffMS(now, 
m_lastSourceCheck) > 1000*XRD_ADAPTOR_LONG_OPEN_DELAY)) + { + float r = m_distribution(m_generator); + if (r < XRD_ADAPTOR_OPEN_PROBE_PERCENT) + { + findNewSource = true; + } + } + } + if (findNewSource) + { + m_open_handler.open(); + m_lastSourceCheck = now; + } + + now.tv_sec += XRD_ADAPTOR_SHORT_OPEN_DELAY; + m_nextActiveSourceCheck = now; +} + +std::shared_ptr +RequestManager::getActiveFile() +{ + std::lock_guard sentry(m_source_mutex); + return m_activeSources[0]->getFileHandle(); +} + +void +RequestManager::getActiveSourceNames(std::vector & sources) +{ + std::lock_guard sentry(m_source_mutex); + sources.reserve(m_activeSources.size()); + for (auto const& source : m_activeSources) { + sources.push_back(source->ID()); + } +} + +void +RequestManager::getDisabledSourceNames(std::vector & sources) +{ + std::lock_guard sentry(m_source_mutex); + sources.reserve(m_disabledSourceStrings.size()); + for (auto const& source : m_disabledSourceStrings) { + sources.push_back(source); + } +} + +void +RequestManager::addConnections(cms::Exception &ex) +{ + std::vector sources; + getActiveSourceNames(sources); + for (auto const& source : sources) + { + ex.addAdditionalInfo("Active source: " + source); + } + sources.clear(); + getDisabledSourceNames(sources); + for (auto const& source : sources) + { + ex.addAdditionalInfo("Disabled source: " + source); + } +} + +std::future +RequestManager::handle(std::shared_ptr c_ptr) +{ + assert(c_ptr.get()); + timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + checkSources(now, c_ptr->getSize()); + + std::shared_ptr source = nullptr; + { + std::lock_guard sentry(m_source_mutex); + if (m_activeSources.size() == 2) + { + if (m_nextInitialSourceToggle) + { + source = m_activeSources[0]; + m_nextInitialSourceToggle = false; + } + else + { + source = m_activeSources[1]; + m_nextInitialSourceToggle = true; + } + } + else + { + source = m_activeSources[0]; + } + } + source->handle(c_ptr); + return c_ptr->get_future(); +} + +std::string 
+RequestManager::prepareOpaqueString() +{ + std::lock_guard sentry(m_source_mutex); + std::stringstream ss; + ss << "?tried="; + size_t count = 0; + for ( const auto & it : m_activeSources ) + { + count++; + ss << it->ID().substr(0, it->ID().find(":")) << ","; + } + for ( const auto & it : m_inactiveSources ) + { + count++; + ss << it->ID().substr(0, it->ID().find(":")) << ","; + } + for ( const auto & it : m_disabledSourceStrings ) + { + count++; + ss << it.substr(0, it.find(":")) << ","; + } + if (count) + { + std::string tmp_str = ss.str(); + return tmp_str.substr(0, tmp_str.size()-1); + } + return ""; +} + +void +XrdAdaptor::RequestManager::handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr source) +{ + std::lock_guard sentry(m_source_mutex); + if (status.IsOK()) + { + edm::LogVerbatim("XrdAdaptorInternal") << "Successfully opened new source: " << source->ID() << std::endl; + + if (m_activeSources.size() < 2) + { + m_activeSources.push_back(source); + } + else + { + m_inactiveSources.push_back(source); + } + } + else + { // File-open failure - wait at least 120s before next attempt. 
+ edm::LogVerbatim("XrdAdaptorInternal") << "Got failure when trying to open a new source" << std::endl; + m_nextActiveSourceCheck.tv_sec += XRD_ADAPTOR_LONG_OPEN_DELAY - XRD_ADAPTOR_SHORT_OPEN_DELAY; + } +} + +std::future +XrdAdaptor::RequestManager::handle(std::shared_ptr > iolist) +{ + std::lock_guard sentry(m_source_mutex); + + timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + + edm::CPUTimer timer; + timer.start(); + + assert(m_activeSources.size()); + if (m_activeSources.size() == 1) + { + std::shared_ptr c_ptr(new XrdAdaptor::ClientRequest(*this, iolist)); + checkSources(now, c_ptr->getSize()); + m_activeSources[0]->handle(c_ptr); + return c_ptr->get_future(); + } + + assert(iolist.get()); + std::shared_ptr > req1(new std::vector); + std::shared_ptr > req2(new std::vector); + splitClientRequest(*iolist, *req1, *req2); + + checkSources(now, req1->size() + req2->size()); + // CheckSources may have removed a source + if (m_activeSources.size() == 1) + { + std::shared_ptr c_ptr(new XrdAdaptor::ClientRequest(*this, iolist)); + m_activeSources[0]->handle(c_ptr); + return c_ptr->get_future(); + } + + std::shared_ptr c_ptr1, c_ptr2; + std::future future1, future2; + if (req1->size()) + { + c_ptr1.reset(new XrdAdaptor::ClientRequest(*this, req1)); + m_activeSources[0]->handle(c_ptr1); + future1 = c_ptr1->get_future(); + } + if (req2->size()) + { + c_ptr2.reset(new XrdAdaptor::ClientRequest(*this, req2)); + m_activeSources[1]->handle(c_ptr2); + future2 = c_ptr2->get_future(); + } + if (req1->size() && req2->size()) + { + std::future task = std::async(std::launch::deferred, + [](std::future a, std::future b){ + return b.get() + a.get(); + }, + std::move(future1), + std::move(future2)); + timer.stop(); + //edm::LogVerbatim("XrdAdaptorInternal") << "Total time to create requests " << static_cast(1000*timer.realTime()) << std::endl; + return task; + } + if (req1->size()) return future1; + if (req2->size()) return future2; + + std::promise p; p.set_value(0); + return 
p.get_future(); +} + +void +RequestManager::requestFailure(std::shared_ptr c_ptr) +{ + std::unique_lock sentry(m_source_mutex); + std::shared_ptr source_ptr = c_ptr->getCurrentSource(); + + // Note that we do not delete the Source itself. That is because this + // function may be called from within XrdCl::ResponseHandler::HandleResponseWithHosts + // In such a case, if you close a file in the handler, it will deadlock + m_disabledSourceStrings.insert(source_ptr->ID()); + m_disabledSources.insert(source_ptr); + + if ((m_activeSources.size() > 0) && (m_activeSources[0].get() == source_ptr.get())) + { + m_activeSources.erase(m_activeSources.begin()); + } + else if ((m_activeSources.size() > 1) && (m_activeSources[1].get() == source_ptr.get())) + { + m_activeSources.erase(m_activeSources.begin()+1); + } + std::shared_ptr new_source; + if (m_activeSources.size() == 0) + { + std::shared_future > future = m_open_handler.open(); + timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + m_lastSourceCheck = now; + // Note we only wait for 60 seconds here. This is because we've already failed + // once and the likelihood the program has some inconsistent state is decent. + // We'd much rather fail hard than deadlock! 
+ sentry.unlock(); + std::future_status status = future.wait_for(std::chrono::seconds(60)); + if (status == std::future_status::timeout) + { + edm::Exception ex(edm::errors::FileOpenError); + ex << "XrdCl::File::Open(name='" << m_name + << "', flags=0x" << std::hex << m_flags + << ", permissions=0" << std::oct << m_perms << std::dec + << ", old source=" << source_ptr->ID() + << ") => timeout when waiting for file open"; + ex.addContext("In XrdAdaptor::RequestManager::requestFailure()"); + addConnections(ex); + } + else + { + try + { + new_source = future.get(); + } + catch (edm::Exception &ex) + { + ex.addContext("Handling XrdAdaptor::RequestManager::requestFailure()"); + throw; + } + } + sentry.lock(); + } + else + { + new_source = m_activeSources[0]; + } + new_source->handle(c_ptr); +} + +static void +consumeChunkFront(size_t &front, std::vector &input, std::vector &output, IOSize chunksize) +{ + while ((chunksize > 0) && (front < input.size())) + { + IOPosBuffer &io = input[front]; + if (io.size() > chunksize) + { + IOSize newsize = io.size() - chunksize; + IOOffset newoffset = io.offset() + chunksize; + void* newdata = static_cast(io.data()) + chunksize; + output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize)); + io.set_offset(newoffset); + io.set_data(newdata); + io.set_size(newsize); + chunksize = 0; + } + else + { + output.push_back(io); + chunksize -= io.size(); + front++; + } + } +} + +static void +consumeChunkBack(size_t front, std::vector &input, std::vector &output, IOSize chunksize) +{ + while ((chunksize > 0) && (front < input.size())) + { + IOPosBuffer &io = input.back(); + if (io.size() > chunksize) + { + IOSize newsize = io.size() - chunksize; + IOOffset newoffset = io.offset() + chunksize; + void* newdata = static_cast(io.data()) + chunksize; + output.emplace_back(IOPosBuffer(io.offset(), io.data(), chunksize)); + io.set_offset(newoffset); + io.set_data(newdata); + io.set_size(newsize); + chunksize = 0; + } + else + { + 
output.push_back(io); + chunksize -= io.size(); + input.pop_back(); + } + } +} + +void +XrdAdaptor::RequestManager::splitClientRequest(const std::vector &iolist, std::vector &req1, std::vector &req2) +{ + if (iolist.size() == 0) return; + std::vector tmp_iolist(iolist.begin(), iolist.end()); + req1.reserve(iolist.size()/2+1); + req2.reserve(iolist.size()/2+1); + size_t front=0; + + float q1 = static_cast(m_activeSources[0]->getQuality()); + float q2 = static_cast(m_activeSources[1]->getQuality()); + IOSize chunk1, chunk2; + chunk1 = static_cast(XRD_CL_MAX_CHUNK)*(q2/(q1+q2)); + chunk2 = static_cast(XRD_CL_MAX_CHUNK)*(q1/(q1+q2)); + + while (tmp_iolist.size()-front > 0) + { + consumeChunkFront(front, tmp_iolist, req1, chunk1); + consumeChunkBack(front, tmp_iolist, req2, chunk2); + } + + IOSize size1 = 0, size2 = 0, size_orig = 0; + for (const auto & it : iolist) size_orig += it.size(); + for (const auto & it : req1) size1 += it.size(); + for (const auto & it : req2) size2 += it.size(); + + assert(size_orig == size1 + size2); + + edm::LogVerbatim("XrdAdaptorInternal") << "Original request size " << iolist.size() << " (" << size_orig << " bytes) split into requests size " << req1.size() << " (" << size1 << " bytes) and " << req2.size() << " (" << size2 << " bytes)" << std::endl; +} + +XrdAdaptor::RequestManager::OpenHandler::OpenHandler(RequestManager & manager) + : m_manager(manager) +{ +} + +void +XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) +{ + std::lock_guard sentry(m_mutex); + if (status->IsOK()) + { + timespec now; + clock_gettime(CLOCK_MONOTONIC, &now); + std::shared_ptr source(new Source(now, std::move(m_file))); + m_promise.set_value(source); + m_manager.handleOpen(*status, source); + } + else + { + m_file.reset(); + std::shared_ptr emptySource; + edm::Exception ex(edm::errors::FileOpenError); + ex << "XrdCl::File::Open(name='" << m_manager.m_name + << "', 
flags=0x" << std::hex << m_manager.m_flags + << ", permissions=0" << std::oct << m_manager.m_perms << std::dec + << ") => error '" << status->ToStr() + << "' (errno=" << status->errNo << ", code=" << status->code << ")"; + ex.addContext("In XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts()"); + m_manager.addConnections(ex); + + m_promise.set_exception(std::make_exception_ptr(ex)); + m_manager.handleOpen(*status, emptySource); + } + delete status; + delete hostList; +} + +std::shared_future > +XrdAdaptor::RequestManager::OpenHandler::open() +{ + std::lock_guard sentry(m_mutex); + + if (m_file.get()) + { + return m_shared_future; + } + std::promise > new_promise; + m_promise.swap(new_promise); + m_shared_future = m_promise.get_future().share(); + + auto opaque = m_manager.prepareOpaqueString(); + std::string new_name = m_manager.m_name + opaque; + edm::LogVerbatim("XrdAdaptorInternal") << "Trying to open URL: " << new_name; + m_file.reset(new XrdCl::File()); + XrdCl::XRootDStatus status; + if (!(status = m_file->Open(new_name, m_manager.m_flags, m_manager.m_perms, this)).IsOK()) + { + edm::Exception ex(edm::errors::FileOpenError); + ex << "XrdCl::File::Open(name='" << new_name + << "', flags=0x" << std::hex << m_manager.m_flags + << ", permissions=0" << std::oct << m_manager.m_perms << std::dec + << ") => error '" << status.ToStr() + << "' (errno=" << status.errNo << ", code=" << status.code << ")"; + ex.addContext("Calling XrdAdaptor::RequestManager::OpenHandler::open()"); + m_manager.addConnections(ex); + throw ex; + } + return m_shared_future; +} + diff --git a/Utilities/XrdAdaptor/src/XrdRequestManager.h b/Utilities/XrdAdaptor/src/XrdRequestManager.h new file mode 100644 index 0000000000000..cd2d3a1f70a0b --- /dev/null +++ b/Utilities/XrdAdaptor/src/XrdRequestManager.h @@ -0,0 +1,170 @@ +#ifndef Utilities_XrdAdaptor_XrdRequestManager_h +#define Utilities_XrdAdaptor_XrdRequestManager_h + +#include +#include +#include +#include +#include +#include 
+ +#include + +#include "FWCore/Utilities/interface/EDMException.h" + +#include "XrdCl/XrdClFileSystem.hh" + +#include "XrdRequest.h" +#include "XrdSource.h" + +namespace XrdCl { + class File; +} + +namespace XrdAdaptor { + +class RequestManager : boost::noncopyable { + +public: + RequestManager(const std::string & filename, XrdCl::OpenFlags::Flags flags, XrdCl::Access::Mode perms); + + ~RequestManager(); + + /** + * Interface for handling a client request. + */ + std::future handle(void * into, IOSize size, IOOffset off) + { + std::shared_ptr c_ptr(new XrdAdaptor::ClientRequest(*this, into, size, off)); + return handle(c_ptr); + } + + std::future handle(std::shared_ptr > iolist); + + /** + * Handle a client request. + * NOTE: The shared_ptr interface is required. Depending on the state of the manager, + * it may decide to issue multiple requests and return the first successful. In that case, + * some references to the client request may still be outstanding when this function returns. + */ + std::future handle(std::shared_ptr c_ptr); + + /** + * Handle a failed client request. + */ + void requestFailure(std::shared_ptr c_ptr); + + /** + * Retrieve the names of the active sources + * (primarily meant to enable meaningful log messages). + */ + void getActiveSourceNames(std::vector & sources); + + /** + * Retrieve the names of the disabled sources + * (primarily meant to enable meaningful log messages). + */ + void getDisabledSourceNames(std::vector & sources); + + /** + * Return a pointer to an active file. Useful for metadata + * operations. + */ + std::shared_ptr getActiveFile(); + + /** + * Add the list of active connections to the exception extra info. 
+ */ + void addConnections(cms::Exception &); + + /** + * Return current filename + */ + const std::string & getFilename() const {return m_name;} + +private: + /** + * Handle the file-open response + */ + virtual void handleOpen(XrdCl::XRootDStatus &status, std::shared_ptr); + + /** + * Given a client request, split it into two requests lists. + */ + void splitClientRequest(const std::vector &iolist, std::vector &req1, std::vector &req2); + + /** + * Given a request, broadcast it to all sources. + * If active is true, broadcast is made to all active sources. + * Otherwise, broadcast is made to the inactive sources. + */ + void broadcastRequest(const ClientRequest &, bool active); + + /** + * Check our set of active sources. + * If necessary, this will kick off a search for a new source. + * The source check is somewhat expensive so it is only done once every + * second. + */ + void checkSources(timespec &now, IOSize requestSize); // TODO: inline + void checkSourcesImpl(timespec &now, IOSize requestSize); + + /** + * Prepare an opaque string appropriate for asking a redirector to open the + * current file but avoiding servers which we already have connections to. + */ + std::string prepareOpaqueString(); + + std::vector > m_activeSources; + std::vector > m_inactiveSources; + std::set m_disabledSourceStrings; + std::set > m_disabledSources; + + timespec m_lastSourceCheck; + // If set to true, the next active source should be 1; 0 otherwise. + bool m_nextInitialSourceToggle; + // The time when the next active source check should be performed. 
+ timespec m_nextActiveSourceCheck; + bool searchMode; + + const std::string m_name; + XrdCl::OpenFlags::Flags m_flags; + XrdCl::Access::Mode m_perms; + std::recursive_mutex m_source_mutex; + + std::mt19937 m_generator; + std::uniform_real_distribution m_distribution; + + class OpenHandler : boost::noncopyable, public XrdCl::ResponseHandler { + + public: + OpenHandler(RequestManager & manager); + + /** + * Handle the file-open response + */ + virtual void HandleResponseWithHosts(XrdCl::XRootDStatus *status, XrdCl::AnyObject *response, XrdCl::HostList *hostList) override; + + /** + * Future-based version of the handler + * If called while a file-open is in progress, we will not start a new file-open. + * Instead, the callback will be fired for the ongoing open. + */ + std::shared_future > open(); + + private: + RequestManager & m_manager; + std::shared_future > m_shared_future; + std::promise > m_promise; + // When this is not null, there is a file-open in process + // Can only be touched when m_mutex is held. 
+ std::unique_ptr m_file; + std::recursive_mutex m_mutex; + }; + + OpenHandler m_open_handler; +}; + +} + +#endif diff --git a/Utilities/XrdAdaptor/src/XrdSource.cc b/Utilities/XrdAdaptor/src/XrdSource.cc new file mode 100644 index 0000000000000..5bddcf6a7208a --- /dev/null +++ b/Utilities/XrdAdaptor/src/XrdSource.cc @@ -0,0 +1,89 @@ + +// See http://stackoverflow.com/questions/12523122/what-is-glibcxx-use-nanosleep-all-about +#define _GLIBCXX_USE_NANOSLEEP +#include +#include +#include +#include + +#include "XrdCl/XrdClFile.hh" + +#include "FWCore/MessageLogger/interface/MessageLogger.h" + +#include "XrdSource.h" +#include "XrdRequest.h" +#include "QualityMetric.h" + +#define MAX_REQUEST 256*1024 + +#ifdef XRD_FAKE_SLOW +//#define XRD_DELAY 5140 +#define XRD_DELAY 1000 +#define XRD_SLOW_RATE 2 +int g_delayCount = 0; +#else +int g_delayCount = 0; +#endif + +using namespace XrdAdaptor; + +Source::Source(timespec now, std::unique_ptr fh) + : m_lastDowngrade({0, 0}), + m_id(fh.get() ? fh->GetDataServer() : "(unknown)"), + m_fh(std::move(fh)), + m_qm(QualityMetricFactory::get(now, m_id)) +#ifdef XRD_FAKE_SLOW + , m_slow(++g_delayCount % XRD_SLOW_RATE == 0) + //, m_slow(++g_delayCount >= XRD_SLOW_RATE) + //, m_slow(true) +#endif +{ + assert(m_qm.get()); + assert(m_fh.get()); + m_buffer.reserve(MAX_REQUEST); +} + +Source::~Source() +{ + XrdCl::XRootDStatus status; + if (! 
(status = m_fh->Close()).IsOK()) + edm::LogWarning("XrdFileWarning") + << "Source::~Source() failed with error '" << status.ToString() + << "' (errno=" << status.errNo << ", code=" << status.code << ")"; + m_fh.reset(); +} + +std::shared_ptr +Source::getFileHandle() +{ + return m_fh; +} + +void +Source::handle(std::shared_ptr c) +{ + edm::LogVerbatim("XrdAdaptorInternal") << "Reading from " << ID() << ", quality " << m_qm->get() << std::endl; + c->m_source = shared_from_this(); + c->m_self_reference = c; + m_qm->startWatch(c->m_qmw); +#ifdef XRD_FAKE_SLOW + if (m_slow) std::this_thread::sleep_for(std::chrono::milliseconds(XRD_DELAY)); +#endif + if (c->m_into) + { + // See notes in ClientRequest definition to understand this voodoo. + m_fh->Read(c->m_off, c->m_size, c->m_into, c.get()); + } + else + { + XrdCl::ChunkList cl; + cl.reserve(c->m_iolist->size()); + for (const auto & it : *c->m_iolist) + { + XrdCl::ChunkInfo ci(it.offset(), it.size(), it.data()); + cl.emplace_back(ci); + } + m_fh->VectorRead(cl, nullptr, c.get()); + } +} + diff --git a/Utilities/XrdAdaptor/src/XrdSource.h b/Utilities/XrdAdaptor/src/XrdSource.h new file mode 100644 index 0000000000000..f8f979f846c7a --- /dev/null +++ b/Utilities/XrdAdaptor/src/XrdSource.h @@ -0,0 +1,58 @@ +#ifndef Utilities_XrdAdaptor_XrdSource_h +#define Utilities_XrdAdaptor_XrdSource_h + +#include +#include + +#include + +#include "QualityMetric.h" + +namespace XrdCl { + class File; +} + +namespace XrdAdaptor { + +class RequestList; +class ClientRequest; + +class Source : public std::enable_shared_from_this, boost::noncopyable { + +public: + Source(timespec now, std::unique_ptr fileHandle); + + ~Source(); + + void handle(std::shared_ptr); + + void handle(RequestList &); + + std::shared_ptr getFileHandle(); + + const std::string & ID() const {return m_id;} + + unsigned getQuality() {return m_qm->get();} + + struct timespec getLastDowngrade() const {return m_lastDowngrade;} + void setLastDowngrade(struct timespec now) 
{m_lastDowngrade = now;} + +private: + void requestCallback(/* TODO: type? */); + + struct timespec m_lastDowngrade; + std::string m_id; + std::shared_ptr m_fh; + + std::unique_ptr m_qm; + + std::vector m_buffer; + +#ifdef XRD_FAKE_SLOW + bool m_slow; +#endif +}; + +} + +#endif From 00ed4642fdb4c787f8be57645bff859350f0878d Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Tue, 13 Aug 2013 16:19:40 -0500 Subject: [PATCH 2/7] Re-enable setting of debug level. --- .../XrdAdaptor/plugins/XrdStorageMaker.cc | 4 +- Utilities/XrdAdaptor/src/XrdReadv.cc | 240 ------------------ 2 files changed, 2 insertions(+), 242 deletions(-) delete mode 100644 Utilities/XrdAdaptor/src/XrdReadv.cc diff --git a/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc b/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc index d87e3fce56da7..bd5c726cf8174 100644 --- a/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc +++ b/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc @@ -76,8 +76,8 @@ class XrdStorageMaker : public StorageMaker virtual void setDebugLevel (unsigned int level) { - //XrdCl::Log *log = XrdCl::DefaultEnv::GetLog(); - //log->SetLevel(static_cast(level+2)); + XrdCl::Log *log = XrdCl::DefaultEnv::GetLog(); + log->SetLevel(static_cast(level+2)); } }; diff --git a/Utilities/XrdAdaptor/src/XrdReadv.cc b/Utilities/XrdAdaptor/src/XrdReadv.cc deleted file mode 100644 index 26c32d4ea56a5..0000000000000 --- a/Utilities/XrdAdaptor/src/XrdReadv.cc +++ /dev/null @@ -1,240 +0,0 @@ - -/* - * These functions are a re-implementation of upstream's readv. - * The important aspect is we have vectored scatter-gather IO. - * In the upstream readv, the vectored IO goes into one buffer - - * not scatter gathered. - * - * CMSSW now uses scatter-gather in the TFileAdapter's ReadReapacker. - * Hence, we have to emulate it using XrdClient::ReadV - horribly slow! - * - * Why not continue to use the XrdClient's internal cache? Each time we use a - * different TTC, it invalidates the cache. 
So, the internal cache and our - * trigger-pattern TTC usage forces a use of readv instead of prefetch. - * - */ - -#include "Utilities/XrdAdaptor/src/XrdFile.h" -#include "XProtocol/XProtocol.hh" -#include "XrdClient/XrdClientProtocol.hh" -#include "XrdClient/XrdClientConst.hh" -#include "XrdClient/XrdClientSid.hh" -#include "FWCore/Utilities/interface/EDMException.h" -#include "FWCore/Utilities/interface/Likely.h" - -#include - -class MutexSentry -{ -public: - MutexSentry(pthread_mutex_t &mutex) : m_mutex(mutex) {pthread_mutex_lock(&m_mutex);} - - ~MutexSentry() {pthread_mutex_unlock(&m_mutex);} - -private: - pthread_mutex_t &m_mutex; - -}; - -// This method is rarely used by CMS; hence, it is a small wrapper and not efficient. -IOSize -XrdFile::readv (IOBuffer *into, IOSize n) -{ - vector new_buf; - new_buf.reserve(n); - IOOffset off = 0; - for (IOSize i=0; iGetHandle(); // also - 16 bytes offset from the location of m_client. - for (IOSize i = 0; i < n; ++i) { - - IOSize len = into[i].size(); - if (unlikely(len > 0x7fffffff)) { - edm::Exception ex(edm::errors::FileReadError); - ex << "XrdFile::readv(name='" << m_name << "')[" << i - << "].size=" << len << " exceeds read size limit 0x7fffffff"; - ex.addContext("Calling XrdFile::readv()"); - addConnection(ex); - throw ex; - } - - IOOffset off = into[i].offset(); - char *chunk_data = static_cast(into[i].data()); - while (len > 0) { // Iterate as long as there is additional data to read. - // Each iteration will read up to READV_MAXCHUNKSIZE of this request. - IOSize chunk_size = len > READV_MAXCHUNKSIZE ? 
READV_MAXCHUNKSIZE : len; - len -= chunk_size; - readv_total_len += chunk_size; - read_chunk_list[chunk_off].rlen = chunk_size; - read_chunk_list[chunk_off].offset = off; - result_list[chunk_off] = chunk_data; - chunk_data += chunk_size; - off += chunk_size; - memcpy(&(read_chunk_list[chunk_off].fhandle), handle, 4); - chunk_off++; - if (chunk_off == READV_MAXCHUNKS) { - // Now that we have broken the readv into Xrootd-sized chunks, send the actual command. - // readv_send will also parse the response and place the data into the result_list buffers. - IOSize tmp_total_len = readv_send(result_list, *read_chunk_list, chunk_off, readv_total_len); - total_len += tmp_total_len; - if (tmp_total_len != readv_total_len) - return total_len; - readv_total_len = 0; - chunk_off = 0; - } - } - } - // Do the actual readv for all remaining chunks. - if (chunk_off) { - total_len += readv_send(result_list, *read_chunk_list, chunk_off, readv_total_len); - } - return total_len; -} - -/* - * Send the readv request to Xrootd. - * Returns the number of bytes stored into result_list. - * Assumes that read_chunk_list and result_list are of size n; the results of the reads - * described in read_chunk_list will be stored in the buffer pointed to by result_list. - * total_len should be the sum of the size of all reads. - */ -IOSize -XrdFile::readv_send(char **result_list, readahead_list &read_chunk_list, IOSize n, IOSize total_len) -{ - // Per the xrootd protocol document: - // Sending requests using the same streamid when a kXR_oksofar status code has been - // returned may produced unpredictable results. A client must serialize all requests - // using the streamid in the presence of partial results. 
- - XrdClientConn *xrdc = m_client->GetClientConn(); - ClientRequest readvFileRequest; - memset( &readvFileRequest, 0, sizeof(readvFileRequest) ); - - kXR_unt16 sid = ConnectionManager->SidManager()->GetNewSid(); - memcpy(readvFileRequest.header.streamid, &sid, sizeof(kXR_unt16)); - readvFileRequest.header.requestid = kXR_readv; - readvFileRequest.readv.dlen = n * sizeof(struct readahead_list); - - std::vector res_buf; - res_buf.reserve( total_len + (n * sizeof(struct readahead_list)) ); - - // Encode, then send the command. - clientMarshallReadAheadList(&read_chunk_list, readvFileRequest.readv.dlen); - bool success; - IOSize data_length; - { - MutexSentry sentry(m_readv_mutex); - success = xrdc->SendGenCommand(&readvFileRequest, &read_chunk_list, 0, - (void *)&(res_buf[0]), FALSE, (char *)"ReadV"); - data_length = xrdc->LastServerResp.dlen; - } - clientUnMarshallReadAheadList(&read_chunk_list, readvFileRequest.readv.dlen); - - ConnectionManager->SidManager()->ReleaseSid(sid); - - if (success) { - return readv_unpack(result_list, res_buf, data_length, read_chunk_list, n); - } else { - return 0; - } - -} - -/* - * Unpack the response buffer from Xrootd into the final results buffer. - */ -IOSize -XrdFile::readv_unpack(char **result_list, std::vector &result_buf, IOSize response_length, readahead_list &read_chunk_list, IOSize n) -{ - IOSize response_offset = 0; - IOSize total_len = 0; - for (IOSize i = 0; i < n; i++) { - - if (unlikely(response_offset + sizeof(struct readahead_list) > response_length)) { - edm::Exception ex(edm::errors::FileReadError); - ex << "XrdFile::readv(name='" << m_name << "')[" << i - << "] returned an incorrectly-sized response (short header)"; - ex.addContext("Calling XrdFile::readv()"); - addConnection(ex); - } - - kXR_int64 offset; - kXR_int32 rlen; - { // Done as a separate block so response is not used later - as it is all in network order! 
- const readahead_list *response = reinterpret_cast(&result_buf[response_offset]); - offset = ntohll(response->offset); - rlen = ntohl(response->rlen); - } - - // Sanity / consistency checks; verify the results correspond to the requested chunk - // Also check that the response buffer is sufficient large to read from. - if (unlikely((&read_chunk_list)[i].offset != offset)) { - edm::Exception ex(edm::errors::FileReadError); - ex << "XrdFile::readv(name='" << m_name << "')[" << i - << "] returned offset " << offset << " does not match requested offset " - << (&read_chunk_list)[i].offset; - ex.addContext("Calling XrdFile::readv()"); - addConnection(ex); - throw ex; - } - if (unlikely((&read_chunk_list)[i].rlen != rlen)) { - edm::Exception ex(edm::errors::FileReadError); - ex << "XrdFile::readv(name='" << m_name << "')[" << i - << "] returned size " << rlen << " does not match requested size " - << (&read_chunk_list)[i].rlen; - ex.addContext("Calling XrdFile::readv()"); - addConnection(ex); - throw ex; - } - if (unlikely(response_offset + rlen > response_length)) { - edm::Exception ex(edm::errors::FileReadError); - ex << "XrdFile::readv(name='" << m_name << "')[" << i - << "] returned an incorrectly-sized response (short data)"; - ex.addContext("Calling XrdFile::readv()"); - addConnection(ex); - } - - response_offset += sizeof(struct readahead_list); // Data is stored after header. - total_len += rlen; - // Copy the data into place; increase the offset. - memcpy(result_list[i], &result_buf[response_offset], rlen); - response_offset += rlen; - } - - return total_len; -} - From 3eeb95e3f4add88fb8f6487bb38c44ebcf8dead3 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Tue, 13 Aug 2013 21:05:56 -0500 Subject: [PATCH 3/7] Remove use of private Xrootd headers. 
--- Utilities/XrdAdaptor/BuildFile.xml | 2 ++ Utilities/XrdAdaptor/plugins/BuildFile.xml | 1 + .../XrdAdaptor/plugins/XrdStorageMaker.cc | 32 +++++++++++++++---- Utilities/XrdAdaptor/src/XrdRequest.h | 2 +- 4 files changed, 30 insertions(+), 7 deletions(-) diff --git a/Utilities/XrdAdaptor/BuildFile.xml b/Utilities/XrdAdaptor/BuildFile.xml index a5ef29cb3851f..550f72a43fe61 100644 --- a/Utilities/XrdAdaptor/BuildFile.xml +++ b/Utilities/XrdAdaptor/BuildFile.xml @@ -5,4 +5,6 @@ + + diff --git a/Utilities/XrdAdaptor/plugins/BuildFile.xml b/Utilities/XrdAdaptor/plugins/BuildFile.xml index 64c555b72a5a7..9a3f7660d1a84 100644 --- a/Utilities/XrdAdaptor/plugins/BuildFile.xml +++ b/Utilities/XrdAdaptor/plugins/BuildFile.xml @@ -1,6 +1,7 @@ + diff --git a/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc b/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc index bd5c726cf8174..00b1cbfc247c9 100644 --- a/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc +++ b/Utilities/XrdAdaptor/plugins/XrdStorageMaker.cc @@ -1,3 +1,6 @@ + +#include "FWCore/Utilities/interface/EDMException.h" + #include "Utilities/StorageFactory/interface/StorageMaker.h" #include "Utilities/StorageFactory/interface/StorageMakerFactory.h" #include "Utilities/StorageFactory/interface/StorageFactory.h" @@ -7,10 +10,6 @@ #include "XrdClient/XrdClientAdmin.hh" #include "XrdClient/XrdClientUrlSet.hh" #include "XrdCl/XrdClDefaultEnv.hh" -// We muck with internal symbols of XrdCl to avoid duplicate definition issues -// See https://github.com/xrootd/xrootd/issues/16 -#define __XRD_CL_OPTIMIZERS_HH__ -#include "XrdCl/XrdClLog.hh" class XrdStorageMaker : public StorageMaker @@ -76,8 +75,29 @@ class XrdStorageMaker : public StorageMaker virtual void setDebugLevel (unsigned int level) { - XrdCl::Log *log = XrdCl::DefaultEnv::GetLog(); - log->SetLevel(static_cast(level+2)); + switch (level) + { + case 0: + XrdCl::DefaultEnv::SetLogLevel("Error"); + break; + case 1: + XrdCl::DefaultEnv::SetLogLevel("Warning"); + break; + case 
2: + XrdCl::DefaultEnv::SetLogLevel("Info"); + break; + case 3: + XrdCl::DefaultEnv::SetLogLevel("Debug"); + break; + case 4: + XrdCl::DefaultEnv::SetLogLevel("Dump"); + break; + default: + edm::Exception ex(edm::errors::Configuration); + ex << "Invalid log level specified " << level; + ex.addContext("Calling XrdStorageMaker::setDebugLevel()"); + throw ex; + } } }; diff --git a/Utilities/XrdAdaptor/src/XrdRequest.h b/Utilities/XrdAdaptor/src/XrdRequest.h index 60c32e21805f8..0a4021f54c55d 100644 --- a/Utilities/XrdAdaptor/src/XrdRequest.h +++ b/Utilities/XrdAdaptor/src/XrdRequest.h @@ -5,7 +5,7 @@ #include #include -#include +#include #include "Utilities/StorageFactory/interface/Storage.h" From 00489025e2dacab48889e42f55613a4a561dc0c8 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Fri, 16 Aug 2013 12:26:41 -0500 Subject: [PATCH 4/7] Per review comments, remove usage of clock_gettime on OS X. --- Utilities/XrdAdaptor/src/QualityMetric.cc | 26 +++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/Utilities/XrdAdaptor/src/QualityMetric.cc b/Utilities/XrdAdaptor/src/QualityMetric.cc index 5f42872cbbeab..e4997f0bf1c56 100644 --- a/Utilities/XrdAdaptor/src/QualityMetric.cc +++ b/Utilities/XrdAdaptor/src/QualityMetric.cc @@ -5,13 +5,29 @@ #include "QualityMetric.h" +#ifdef __MACH__ +#include +#include +#endif + using namespace XrdAdaptor; QualityMetricWatch::QualityMetricWatch(QualityMetric *parent1, QualityMetric *parent2) : m_parent1(parent1), m_parent2(parent2) { // TODO: just assuming success. 
+#ifdef __MACH__ + clock_serv_t cclock; + mach_timespec_t mts; + + host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); + clock_get_time(cclock, &mts); + mach_port_deallocate(mach_task_self(), cclock); + m_start.tv_sec = mts.tv_sec; + m_start.tv_nsec = mts.tv_nsec; +#else clock_gettime(CLOCK_MONOTONIC, &m_start); +#endif } QualityMetricWatch::~QualityMetricWatch() @@ -19,7 +35,17 @@ QualityMetricWatch::~QualityMetricWatch() if (m_parent1 && m_parent2) { timespec stop; +#ifdef __MACH__ + clock_serv_t cclock; + mach_timespec_t mts; + host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); + clock_get_time(cclock, &mts); + mach_port_deallocate(mach_task_self(), cclock); + stop.tv_sec = mts.tv_sec; + stop.tv_nsec = mts.tv_nsec; +#else clock_gettime(CLOCK_MONOTONIC, &stop); +#endif int ms = 1000*(stop.tv_sec - m_start.tv_sec) + (stop.tv_nsec - m_start.tv_nsec)/1e6; edm::LogVerbatim("XrdAdaptorInternal") << "Finished timer after " << ms << std::endl; m_parent1->finishWatch(stop, ms); From 3bbeb097023c406c27fc0fb34bc051f2fbf8e7e0 Mon Sep 17 00:00:00 2001 From: Brian Bockelman Date: Fri, 23 Aug 2013 08:20:44 -0500 Subject: [PATCH 5/7] Fix other uses of CLOCK_MONOTONIC. 
--- Utilities/XrdAdaptor/src/QualityMetric.cc | 40 ++++++++----------- Utilities/XrdAdaptor/src/XrdRequestManager.cc | 28 ++++++++++--- 2 files changed, 40 insertions(+), 28 deletions(-) diff --git a/Utilities/XrdAdaptor/src/QualityMetric.cc b/Utilities/XrdAdaptor/src/QualityMetric.cc index e4997f0bf1c56..91cca814dd6a2 100644 --- a/Utilities/XrdAdaptor/src/QualityMetric.cc +++ b/Utilities/XrdAdaptor/src/QualityMetric.cc @@ -8,26 +8,29 @@ #ifdef __MACH__ #include #include +#define GET_CLOCK_MONOTONIC(ts) \ +{ \ + clock_serv_t cclock; \ + mach_timespec_t mts; \ + host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \ + clock_get_time(cclock, &mts); \ + mach_port_deallocate(mach_task_self(), cclock); \ + ts.tv_sec = mts.tv_sec; \ + ts.tv_nsec = mts.tv_nsec; \ +} +#else +#define GET_CLOCK_MONOTONIC(ts) \ + clock_gettime(CLOCK_MONOTONIC, &ts); #endif + using namespace XrdAdaptor; QualityMetricWatch::QualityMetricWatch(QualityMetric *parent1, QualityMetric *parent2) : m_parent1(parent1), m_parent2(parent2) { // TODO: just assuming success. 
-#ifdef __MACH__ - clock_serv_t cclock; - mach_timespec_t mts; - - host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); - clock_get_time(cclock, &mts); - mach_port_deallocate(mach_task_self(), cclock); - m_start.tv_sec = mts.tv_sec; - m_start.tv_nsec = mts.tv_nsec; -#else - clock_gettime(CLOCK_MONOTONIC, &m_start); -#endif + GET_CLOCK_MONOTONIC(m_start); } QualityMetricWatch::~QualityMetricWatch() @@ -35,17 +38,8 @@ QualityMetricWatch::~QualityMetricWatch() if (m_parent1 && m_parent2) { timespec stop; -#ifdef __MACH__ - clock_serv_t cclock; - mach_timespec_t mts; - host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); - clock_get_time(cclock, &mts); - mach_port_deallocate(mach_task_self(), cclock); - stop.tv_sec = mts.tv_sec; - stop.tv_nsec = mts.tv_nsec; -#else - clock_gettime(CLOCK_MONOTONIC, &stop); -#endif + GET_CLOCK_MONOTONIC(stop); + int ms = 1000*(stop.tv_sec - m_start.tv_sec) + (stop.tv_nsec - m_start.tv_nsec)/1e6; edm::LogVerbatim("XrdAdaptorInternal") << "Finished timer after " << ms << std::endl; m_parent1->finishWatch(stop, ms); diff --git a/Utilities/XrdAdaptor/src/XrdRequestManager.cc b/Utilities/XrdAdaptor/src/XrdRequestManager.cc index bfc2643a48318..7f5910fd0bf68 100644 --- a/Utilities/XrdAdaptor/src/XrdRequestManager.cc +++ b/Utilities/XrdAdaptor/src/XrdRequestManager.cc @@ -26,6 +26,24 @@ #define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100 #endif +#ifdef __MACH__ +#include +#include +#define GET_CLOCK_MONOTONIC(ts) \ +{ \ + clock_serv_t cclock; \ + mach_timespec_t mts; \ + host_get_clock_service(mach_host_self(), SYSTEM_CLOCK, &cclock); \ + clock_get_time(cclock, &mts); \ + mach_port_deallocate(mach_task_self(), cclock); \ + ts.tv_sec = mts.tv_sec; \ + ts.tv_nsec = mts.tv_nsec; \ +} +#else +#define GET_CLOCK_MONOTONIC(ts) \ + clock_gettime(CLOCK_MONOTONIC, &ts); +#endif + using namespace XrdAdaptor; long long timeDiffMS(const timespec &a, const timespec &b) @@ -59,7 +77,7 @@ RequestManager::RequestManager(const std::string 
&filename, XrdCl::OpenFlags::Fl } timespec ts; - clock_gettime(CLOCK_MONOTONIC, &ts); + GET_CLOCK_MONOTONIC(ts); std::shared_ptr source(new Source(ts, std::move(file))); { @@ -227,7 +245,7 @@ RequestManager::handle(std::shared_ptr c_ptr) { assert(c_ptr.get()); timespec now; - clock_gettime(CLOCK_MONOTONIC, &now); + GET_CLOCK_MONOTONIC(now); checkSources(now, c_ptr->getSize()); std::shared_ptr source = nullptr; @@ -315,7 +333,7 @@ XrdAdaptor::RequestManager::handle(std::shared_ptr > io std::lock_guard sentry(m_source_mutex); timespec now; - clock_gettime(CLOCK_MONOTONIC, &now); + GET_CLOCK_MONOTONIC(now); edm::CPUTimer timer; timer.start(); @@ -401,7 +419,7 @@ RequestManager::requestFailure(std::shared_ptr c_ptr) { std::shared_future > future = m_open_handler.open(); timespec now; - clock_gettime(CLOCK_MONOTONIC, &now); + GET_CLOCK_MONOTONIC(now); m_lastSourceCheck = now; // Note we only wait for 60 seconds here. This is because we've already failed // once and the likelihood the program has some inconsistent state is decent. @@ -535,7 +553,7 @@ XrdAdaptor::RequestManager::OpenHandler::HandleResponseWithHosts(XrdCl::XRootDSt if (status->IsOK()) { timespec now; - clock_gettime(CLOCK_MONOTONIC, &now); + GET_CLOCK_MONOTONIC(now); std::shared_ptr source(new Source(now, std::move(m_file))); m_promise.set_value(source); m_manager.handleOpen(*status, source); From 2ee604309194bbddd4b87b0dd1e1bb9077834a16 Mon Sep 17 00:00:00 2001 From: Brian Date: Tue, 3 Sep 2013 15:35:12 +0200 Subject: [PATCH 6/7] No longer force a new stat to be performed on an open file; this fails on EOS. Fixup error messages to use the correct ToStr. 
--- Utilities/XrdAdaptor/src/XrdFile.cc | 8 ++++---- Utilities/XrdAdaptor/src/XrdRequest.cc | 4 ++-- Utilities/XrdAdaptor/src/XrdSource.cc | 2 +- 3 files changed, 7 insertions(+), 7 deletions(-) diff --git a/Utilities/XrdAdaptor/src/XrdFile.cc b/Utilities/XrdAdaptor/src/XrdFile.cc index fce82ca1b1620..52c4020915190 100644 --- a/Utilities/XrdAdaptor/src/XrdFile.cc +++ b/Utilities/XrdAdaptor/src/XrdFile.cc @@ -150,10 +150,10 @@ XrdFile::open (const char *name, auto file = getActiveFile(); XrdCl::XRootDStatus status; XrdCl::StatInfo *statInfo = NULL; - if (! (status = file->Stat(true, statInfo)).IsOK()) { + if (! (status = file->Stat(false, statInfo)).IsOK()) { edm::Exception ex(edm::errors::FileOpenError); ex << "XrdCl::File::Stat(name='" << name - << ") => error '" << status.ToString() + << ") => error '" << status.ToStr() << "' (errno=" << status.errNo << ", code=" << status.code << ")"; ex.addContext("Calling XrdFile::open()"); addConnection(ex); @@ -343,7 +343,7 @@ XrdFile::write (const void *from, IOSize n) if (!s.IsOK()) { cms::Exception ex("FileWriteError"); ex << "XrdFile::write(name='" << m_name << "', n=" << n - << ") failed with error '" << s.ToString() + << ") failed with error '" << s.ToStr() << "' (errno=" << s.errNo << ", code=" << s.code << ")"; ex.addContext("Calling XrdFile::write()"); addConnection(ex); @@ -374,7 +374,7 @@ XrdFile::write (const void *from, IOSize n, IOOffset pos) if (!s.IsOK()) { cms::Exception ex("FileWriteError"); ex << "XrdFile::write(name='" << m_name << "', n=" << n - << ") failed with error '" << s.ToString() + << ") failed with error '" << s.ToStr() << "' (errno=" << s.errNo << ", code=" << s.code << ")"; ex.addContext("Calling XrdFile::write()"); addConnection(ex); diff --git a/Utilities/XrdAdaptor/src/XrdRequest.cc b/Utilities/XrdAdaptor/src/XrdRequest.cc index a55c928b7840d..1947430ca3738 100644 --- a/Utilities/XrdAdaptor/src/XrdRequest.cc +++ b/Utilities/XrdAdaptor/src/XrdRequest.cc @@ -49,7 +49,7 @@ 
XrdAdaptor::ClientRequest::HandleResponse(XrdCl::XRootDStatus *stat, XrdCl::AnyO edm::LogWarning("XrdAdaptorInternal") << "XrdRequestManager::handle(name='" << m_manager.getFilename() << ") failure when reading from " << (source ? source->ID() : "(unknown source)") - << "; failed with error '" << status->ToString() << "' (errno=" + << "; failed with error '" << status->ToStr() << "' (errno=" << status->errNo << ", code=" << status->code << ")."; m_failure_count++; try @@ -67,7 +67,7 @@ XrdAdaptor::ClientRequest::HandleResponse(XrdCl::XRootDStatus *stat, XrdCl::AnyO { edm::Exception ex(edm::errors::FileReadError); ex << "XrdRequestManager::handle(name='" << m_manager.getFilename() - << ") failed with error '" << status->ToString() + << ") failed with error '" << status->ToStr() << "' (errno=" << status->errNo << ", code=" << status->code << "). Unknown exception occurred when running" << " connection recovery."; diff --git a/Utilities/XrdAdaptor/src/XrdSource.cc b/Utilities/XrdAdaptor/src/XrdSource.cc index 5bddcf6a7208a..8848915228253 100644 --- a/Utilities/XrdAdaptor/src/XrdSource.cc +++ b/Utilities/XrdAdaptor/src/XrdSource.cc @@ -48,7 +48,7 @@ Source::~Source() XrdCl::XRootDStatus status; if (! (status = m_fh->Close()).IsOK()) edm::LogWarning("XrdFileWarning") - << "Source::~Source() failed with error '" << status.ToString() + << "Source::~Source() failed with error '" << status.ToStr() << "' (errno=" << status.errNo << ", code=" << status.code << ")"; m_fh.reset(); } From fb3a8b190829389c83848f3df22ae5e793d692f0 Mon Sep 17 00:00:00 2001 From: Brian Date: Tue, 3 Sep 2013 15:37:39 +0200 Subject: [PATCH 7/7] Keep a copy of the original exception info and append it to the fallback file-open failure message. 
--- IOPool/Input/src/RootInputFileSequence.cc | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/IOPool/Input/src/RootInputFileSequence.cc b/IOPool/Input/src/RootInputFileSequence.cc index 3d88d84459884..53d35f14225fd 100644 --- a/IOPool/Input/src/RootInputFileSequence.cc +++ b/IOPool/Input/src/RootInputFileSequence.cc @@ -199,6 +199,7 @@ namespace edm { bool hasFallbackUrl = !fallbackName.empty() && fallbackName != fileIter_->fileName(); boost::shared_ptr filePtr; + std::list originalInfo; try { std::unique_ptr sentry(inputType_ == InputType::Primary ? new InputSource::FileOpenSentry(input_, lfn_, usedFallback_) : 0); @@ -211,6 +212,7 @@ namespace edm { out << e.explainSelf(); std::string pfn(gSystem->ExpandPathName(fallbackName.c_str())); InputFile::reportFallbackAttempt(pfn, fileIter_->logicalFileName(), out.str()); + originalInfo = e.additionalInfo(); } else { InputFile::reportSkippedFile(fileIter_->fileName(), fileIter_->logicalFileName()); Exception ex(errors::FileOpenError, "", e); @@ -237,7 +239,15 @@ namespace edm { std::ostringstream out; out << "Input file " << fileIter_->fileName() << " could not be opened.\n"; out << "Fallback Input file " << fallbackName << " also could not be opened."; - ex.addAdditionalInfo(out.str()); + if (originalInfo.size()) { + out << std::endl << "Original exception info is above; fallback exception info is below."; + ex.addAdditionalInfo(out.str()); + for (auto const & s : originalInfo) { + ex.addAdditionalInfo(s); + } + } else { + ex.addAdditionalInfo(out.str()); + } throw ex; } }