Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Prevent too many chunks from being added per function call (forward port to 7_6_X). #10424

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions Utilities/XrdAdaptor/src/XrdRequestManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
#define XRD_ADAPTOR_SOURCE_QUALITY_FUDGE 100
#endif

#define XRD_ADAPTOR_CHUNK_THRESHOLD 1000


#ifdef __MACH__
#include <mach/clock.h>
#include <mach/mach.h>
Expand Down Expand Up @@ -740,7 +743,7 @@ RequestManager::requestFailure(std::shared_ptr<XrdAdaptor::ClientRequest> c_ptr,
static void
consumeChunkFront(size_t &front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
{
while ((chunksize > 0) && (front < input.size()))
while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
{
IOPosBuffer &io = input[front];
IOPosBuffer &outio = output.back();
Expand Down Expand Up @@ -789,7 +792,7 @@ consumeChunkFront(size_t &front, std::vector<IOPosBuffer> &input, std::vector<IO
static void
consumeChunkBack(size_t front, std::vector<IOPosBuffer> &input, std::vector<IOPosBuffer> &output, IOSize chunksize)
{
while ((chunksize > 0) && (front < input.size()))
while ((chunksize > 0) && (front < input.size()) && (output.size() <= XRD_ADAPTOR_CHUNK_THRESHOLD))
{
IOPosBuffer &io = input.back();
IOPosBuffer &outio = output.back();
Expand Down Expand Up @@ -864,15 +867,16 @@ XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &i
float q1 = static_cast<float>(m_activeSources[0]->getQuality())+5;
float q2 = static_cast<float>(m_activeSources[1]->getQuality())+5;
IOSize chunk1, chunk2;
chunk1 = static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2));
chunk2 = static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2));
// Make sure the chunk size is at least 1024; little point to reads less than that size.
chunk1 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q2*q2/(q1*q1+q2*q2))), static_cast<IOSize>(1024));
chunk2 = std::max(static_cast<IOSize>(static_cast<float>(XRD_CL_MAX_CHUNK)*(q1*q1/(q1*q1+q2*q2))), static_cast<IOSize>(1024));

IOSize size_orig = 0;
for (const auto & it : iolist) size_orig += it.size();

while (tmp_iolist.size()-front > 0)
{
if ((req1.size() >= 1000) && (req2.size() >= 1000))
if ((req1.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD) && (req2.size() >= XRD_ADAPTOR_CHUNK_THRESHOLD))
{ // The XrdFile::readv implementation should guarantee that no more than approximately 1024 chunks
// are passed to the request manager. However, because we have a max chunk size, we increase
// the total number slightly. Theoretically, it's possible an individual readv of total size >2GB where
Expand All @@ -891,8 +895,8 @@ XrdAdaptor::RequestManager::splitClientRequest(const std::vector<IOPosBuffer> &i
ex.addAdditionalInfo(ss2.str());
throw ex;
}
if (req1.size() < 1000) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
if (req2.size() < 1000) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
if (req1.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkFront(front, tmp_iolist, req1, chunk1);}
if (req2.size() < XRD_ADAPTOR_CHUNK_THRESHOLD) {consumeChunkBack(front, tmp_iolist, req2, chunk2);}
}
std::sort(req1.begin(), req1.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
std::sort(req2.begin(), req2.end(), [](const IOPosBuffer & left, const IOPosBuffer & right){return left.offset() < right.offset();});
Expand Down