Skip to content

Commit

Permalink
Merge pull request #2012 from rouault/fix_1244
Browse files Browse the repository at this point in the history
/vsicurl (and derived filesystems): fix concurrency issue with multithreaded reads (fixes #1244)
  • Loading branch information
rouault committed Nov 12, 2019
2 parents c2ed2bc + 28298cf commit efc8cf4
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 27 deletions.
41 changes: 23 additions & 18 deletions gdal/port/cpl_vsil_curl.cpp
Expand Up @@ -1216,14 +1216,14 @@ CPLString VSICurlHandle::GetRedirectURLIfValid(bool& bHasExpired)
/* DownloadRegion() */
/************************************************************************/

bool VSICurlHandle::DownloadRegion( const vsi_l_offset startOffset,
const int nBlocks )
std::string VSICurlHandle::DownloadRegion( const vsi_l_offset startOffset,
const int nBlocks )
{
if( bInterrupted && bStopOnInterruptUntilUninstall )
return false;
return std::string();

if( oFileProp.eExists == EXIST_NO )
return false;
return std::string();

CURLM* hCurlMultiHandle = poFS->GetCurlMultiHandleFor(m_pszURL);

Expand Down Expand Up @@ -1308,7 +1308,7 @@ bool VSICurlHandle::DownloadRegion( const vsi_l_offset startOffset,
CPLFree(sWriteFuncHeaderData.pBuffer);
curl_easy_cleanup(hCurlHandle);

return false;
return std::string();
}

long response_code = 0;
Expand Down Expand Up @@ -1443,7 +1443,7 @@ bool VSICurlHandle::DownloadRegion( const vsi_l_offset startOffset,
CPLFree(sWriteFuncData.pBuffer);
CPLFree(sWriteFuncHeaderData.pBuffer);
curl_easy_cleanup(hCurlHandle);
return false;
return std::string();
}

if( !oFileProp.bHasComputedFileSize && sWriteFuncHeaderData.pBuffer )
Expand Down Expand Up @@ -1514,11 +1514,14 @@ bool VSICurlHandle::DownloadRegion( const vsi_l_offset startOffset,
sWriteFuncData.pBuffer,
sWriteFuncData.nSize);

std::string osRet;
osRet.assign(sWriteFuncData.pBuffer, sWriteFuncData.nSize);

CPLFree(sWriteFuncData.pBuffer);
CPLFree(sWriteFuncHeaderData.pBuffer);
curl_easy_cleanup(hCurlHandle);

return true;
return osRet;
}

/************************************************************************/
Expand Down Expand Up @@ -1599,8 +1602,13 @@ size_t VSICurlHandle::Read( void * const pBufferIn, size_t const nSize,

const vsi_l_offset nOffsetToDownload =
(iterOffset / DOWNLOAD_CHUNK_SIZE) * DOWNLOAD_CHUNK_SIZE;
std::string osRegion;
std::shared_ptr<std::string> psRegion = poFS->GetRegion(m_pszURL, nOffsetToDownload);
if( psRegion == nullptr )
if( psRegion != nullptr )
{
osRegion = *psRegion;
}
else
{
if( nOffsetToDownload == lastDownloadedOffset )
{
Expand Down Expand Up @@ -1630,6 +1638,8 @@ size_t VSICurlHandle::Read( void * const pBufferIn, size_t const nSize,
nBlocksToDownload = nMinBlocksToDownload;

// Avoid reading already cached data.
// Note: this might get evicted if concurrent reads are done, but
// this should not cause bugs. Just missed optimization.
for( int i = 1; i < nBlocksToDownload; i++ )
{
if( poFS->GetRegion(
Expand All @@ -1644,30 +1654,25 @@ size_t VSICurlHandle::Read( void * const pBufferIn, size_t const nSize,
if( nBlocksToDownload > N_MAX_REGIONS )
nBlocksToDownload = N_MAX_REGIONS;

if( DownloadRegion(nOffsetToDownload, nBlocksToDownload) == false )
osRegion = DownloadRegion(nOffsetToDownload, nBlocksToDownload);
if( osRegion.empty() )
{
if( !bInterrupted )
bEOF = true;
return 0;
}
psRegion = poFS->GetRegion(m_pszURL, iterOffset);
}
if( psRegion == nullptr )
{
bEOF = true;
return 0;
}
const int nToCopy = static_cast<int>(
std::min(static_cast<vsi_l_offset>(nBufferRequestSize),
psRegion->size() -
osRegion.size() -
(iterOffset - nOffsetToDownload)));
memcpy(pBuffer,
psRegion->data() + iterOffset - nOffsetToDownload,
osRegion.data() + iterOffset - nOffsetToDownload,
nToCopy);
pBuffer = static_cast<char *>(pBuffer) + nToCopy;
iterOffset += nToCopy;
nBufferRequestSize -= nToCopy;
if( psRegion->size() != static_cast<size_t>(DOWNLOAD_CHUNK_SIZE) &&
if( osRegion.size() < static_cast<size_t>(DOWNLOAD_CHUNK_SIZE) &&
nBufferRequestSize != 0 )
{
break;
Expand Down
2 changes: 1 addition & 1 deletion gdal/port/cpl_vsil_curl_class.h
Expand Up @@ -304,7 +304,7 @@ class VSICurlHandle : public VSIVirtualHandle

bool bEOF = false;

virtual bool DownloadRegion(vsi_l_offset startOffset, int nBlocks);
virtual std::string DownloadRegion(vsi_l_offset startOffset, int nBlocks);

bool m_bUseHead = false;

Expand Down
19 changes: 11 additions & 8 deletions gdal/port/cpl_vsil_webhdfs.cpp
Expand Up @@ -108,7 +108,7 @@ class VSIWebHDFSHandle final : public VSICurlHandle
CPLString m_osUsernameParam{};
CPLString m_osDelegationParam{};

bool DownloadRegion(vsi_l_offset startOffset, int nBlocks) override;
std::string DownloadRegion(vsi_l_offset startOffset, int nBlocks) override;

public:
VSIWebHDFSHandle( VSIWebHDFSFSHandler* poFS,
Expand Down Expand Up @@ -997,15 +997,15 @@ vsi_l_offset VSIWebHDFSHandle::GetFileSize( bool bSetError )
/* DownloadRegion() */
/************************************************************************/

bool VSIWebHDFSHandle::DownloadRegion( const vsi_l_offset startOffset,
const int nBlocks )
std::string VSIWebHDFSHandle::DownloadRegion( const vsi_l_offset startOffset,
const int nBlocks )
{
if( bInterrupted && bStopOnInterruptUntilUninstall )
return false;
return std::string();

poFS->GetCachedFileProp(m_pszURL, oFileProp);
if( oFileProp.eExists == EXIST_NO )
return false;
return std::string();

CURLM* hCurlMultiHandle = poFS->GetCurlMultiHandleFor(m_pszURL);

Expand Down Expand Up @@ -1076,7 +1076,7 @@ bool VSIWebHDFSHandle::DownloadRegion( const vsi_l_offset startOffset,
CPLFree(sWriteFuncData.pBuffer);
curl_easy_cleanup(hCurlHandle);

return false;
return std::string();
}

long response_code = 0;
Expand Down Expand Up @@ -1147,7 +1147,7 @@ bool VSIWebHDFSHandle::DownloadRegion( const vsi_l_offset startOffset,
}
CPLFree(sWriteFuncData.pBuffer);
curl_easy_cleanup(hCurlHandle);
return false;
return std::string();
}

oFileProp.eExists = EXIST_YES;
Expand All @@ -1157,10 +1157,13 @@ bool VSIWebHDFSHandle::DownloadRegion( const vsi_l_offset startOffset,
sWriteFuncData.pBuffer,
sWriteFuncData.nSize);

std::string osRet;
osRet.assign(sWriteFuncData.pBuffer, sWriteFuncData.nSize);

CPLFree(sWriteFuncData.pBuffer);
curl_easy_cleanup(hCurlHandle);

return true;
return osRet;
}


Expand Down

0 comments on commit efc8cf4

Please sign in to comment.