Permalink
Browse files

Merge pull request #1284 from rouault/retry_curl_write

/vsis3/, /vsigs/, /vsiaz/: add HTTP retry logic in writing code paths
  • Loading branch information...
rouault committed Feb 11, 2019
2 parents 51e2dba + 132edb7 commit 1d799cf5d8c399d9ac3feb7ef9f05dc6e3297a82
@@ -583,7 +583,76 @@ def method(request):
pytest.fail(ret)
gdal.VSIFCloseL(f)


###############################################################################
# Test write of a block blob with retry


def test_vsiaz_write_blockblob_retry():
# Checks that a 3-item write to a /vsiaz/ file is reported as successful when
# the first PUT is answered with HTTP 502 and only the second PUT is accepted.

# Skip when gdaltest.webserver_port is 0 (presumably meaning that no local
# test web server is available -- confirm against gdaltest).
if gdaltest.webserver_port == 0:
pytest.skip()

gdal.VSICurlClearCache()

# Test creation of BlockBlob
f = gdal.VSIFOpenL('/vsiaz/test_copy/file.bin', 'wb')
assert f is not None

# Enable HTTP retries (GDAL_HTTP_MAX_RETRY=2) with a very short retry delay.
with gdaltest.config_options({'GDAL_HTTP_MAX_RETRY': '2',
'GDAL_HTTP_RETRY_DELAY': '0.01'}):

handler = webserver.SequentialHandler()

# Custom responder for the second PUT: sends "100 Continue", reads 3 bytes of
# request body, and answers 403 if fewer than 3 characters were read,
# 201 otherwise.
def method(request):
request.protocol_version = 'HTTP/1.1'
request.wfile.write('HTTP/1.1 100 Continue\r\n\r\n'.encode('ascii'))
content = request.rfile.read(3).decode('ascii')
if len(content) != 3:
sys.stderr.write('Bad headers: %s\n' % str(request.headers))
request.send_response(403)
request.send_header('Content-Length', 0)
request.end_headers()
return
request.send_response(201)
request.send_header('Content-Length', 0)
request.end_headers()

# Two PUTs are registered for the same URL: a plain 502 response first, then
# the custom responder above. NOTE(review): SequentialHandler presumably
# serves the registered responses in registration order -- confirm.
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin', 502)
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin', custom_method=method)
with webserver.install_http_handler(handler):
assert gdal.VSIFWriteL('foo', 1, 3, f) == 3
gdal.VSIFCloseL(f)

###############################################################################
# Test write of an append blob with retry


def test_vsiaz_write_appendblob_retry():
# Checks that a 16-item write to a /vsiaz/ file is reported as successful when
# every expected request is first answered with HTTP 502 and then with 201.

# Skip when gdaltest.webserver_port is 0 (presumably meaning that no local
# test web server is available -- confirm against gdaltest).
if gdaltest.webserver_port == 0:
pytest.skip()

gdal.VSICurlClearCache()

# Enable HTTP retries with a very short retry delay, and set
# VSIAZ_CHUNK_SIZE_BYTES to 10. NOTE(review): with 16 bytes written below,
# the 10-byte chunk size presumably forces the append-blob code path with
# two "appendblock" requests -- confirm against the /vsiaz/ implementation.
with gdaltest.config_options({'GDAL_HTTP_MAX_RETRY': '2',
'GDAL_HTTP_RETRY_DELAY': '0.01',
'VSIAZ_CHUNK_SIZE_BYTES': '10'}):

f = gdal.VSIFOpenL('/vsiaz/test_copy/file.bin', 'wb')
assert f is not None

# Registered responses: one PUT on the blob URL and two PUTs on the
# "?comp=appendblock" URL, each registered twice (502 first, then 201).
handler = webserver.SequentialHandler()
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin', 502)
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin', 201)
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock', 502)
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock', 201)
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock', 502)
handler.add('PUT', '/azure/blob/myaccount/test_copy/file.bin?comp=appendblock', 201)

with webserver.install_http_handler(handler):
assert gdal.VSIFWriteL('0123456789abcdef', 1, 16, f) == 16
gdal.VSIFCloseL(f)

###############################################################################
# Test Unlink()

@@ -411,7 +411,52 @@ def method(request):
ret = gdal.VSIFCloseL(f)
assert ret != 0


###############################################################################
# Test write with retry


def test_vsigs_write_retry():
# Checks that a 3-item write to a /vsigs/ file is reported as successful when
# the first PUT is answered with HTTP 502 and only the second PUT is accepted.

# Skip when gdaltest.webserver_port is 0 (presumably meaning that no local
# test web server is available -- confirm against gdaltest).
if gdaltest.webserver_port == 0:
pytest.skip()

gdal.VSICurlClearCache()

# Enable HTTP retries (GDAL_HTTP_MAX_RETRY=2) with a very short retry delay.
with gdaltest.config_options({'GDAL_HTTP_MAX_RETRY': '2',
'GDAL_HTTP_RETRY_DELAY': '0.01'}):

f = gdal.VSIFOpenL('/vsigs/test_write_retry/put_with_retry.bin', 'wb')
assert f is not None

handler = webserver.SequentialHandler()

# Custom responder for the second PUT: sends "100 Continue", then reads the
# request body as a sequence of size-prefixed chunks and answers 403 if the
# accumulated content is not exactly 3 characters long, 200 otherwise.
def method(request):
request.protocol_version = 'HTTP/1.1'
request.wfile.write('HTTP/1.1 100 Continue\r\n\r\n'.encode('ascii'))
content = ''
while True:
# Each chunk starts with a line holding its size in hexadecimal.
numchars = int(request.rfile.readline().strip(), 16)
content += request.rfile.read(numchars).decode('ascii')
# Consume the 2 bytes that follow the chunk data (presumably the CRLF
# terminating the chunk).
request.rfile.read(2)
# A zero-sized chunk ends the body.
if numchars == 0:
break
if len(content) != 3:
sys.stderr.write('Bad headers: %s\n' % str(request.headers))
request.send_response(403)
request.send_header('Content-Length', 0)
request.end_headers()
return
request.send_response(200)
request.send_header('Content-Length', 0)
request.end_headers()

# Two PUTs are registered for the same URL: a plain 502 response first, then
# the custom responder above.
handler.add('PUT', '/test_write_retry/put_with_retry.bin', 502)
handler.add('PUT', '/test_write_retry/put_with_retry.bin', custom_method=method)

with webserver.install_http_handler(handler):
assert gdal.VSIFWriteL('foo', 1, 3, f) == 3
gdal.VSIFCloseL(f)

###############################################################################
# Read credentials with OAuth2 refresh_token

@@ -1249,6 +1249,54 @@ def method(request):
gdal.VSIFCloseL(f)
assert gdal.GetLastErrorMsg() == ''

###############################################################################
# Test simple PUT support with retry logic


def test_vsis3_write_single_put_retry():
# Exercises a small /vsis3/ write whose PUT is first answered with HTTP 502
# and then handled by a responder that validates the request.

# Skip when gdaltest.webserver_port is 0 (presumably meaning that no local
# test web server is available -- confirm against gdaltest).
if gdaltest.webserver_port == 0:
pytest.skip()

# Enable HTTP retries (GDAL_HTTP_MAX_RETRY=2) with a very short retry delay.
with gdaltest.config_options({'GDAL_HTTP_MAX_RETRY': '2',
'GDAL_HTTP_RETRY_DELAY': '0.01'}):

# Open and write while a handler with no registered response is installed.
# NOTE(review): this presumably asserts that no HTTP request is issued
# before the file is closed -- confirm against SequentialHandler.
with webserver.install_http_handler(webserver.SequentialHandler()):
f = gdal.VSIFOpenL('/vsis3/s3_fake_bucket3/put_with_retry.bin', 'wb')
assert f is not None
assert gdal.VSIFWriteL('foo', 1, 3, f) == 3

handler = webserver.SequentialHandler()

# Custom responder for the second PUT: answers 400 unless the request has a
# Content-Length header equal to '3' and a body equal to 'foo'; 200 otherwise.
def method(request):
if request.headers['Content-Length'] != '3':
sys.stderr.write('Did not get expected headers: %s\n' % str(request.headers))
request.send_response(400)
request.send_header('Content-Length', 0)
request.end_headers()
return

# The interim "100 Continue" is sent before the body is read.
request.wfile.write('HTTP/1.1 100 Continue\r\n\r\n'.encode('ascii'))

content = request.rfile.read(3).decode('ascii')
if content != 'foo':
sys.stderr.write('Did not get expected content: %s\n' % content)
request.send_response(400)
request.send_header('Content-Length', 0)
request.end_headers()
return

request.send_response(200)
request.send_header('Content-Length', 0)
request.end_headers()

# Two PUTs are registered for the same URL: a plain 502 response first, then
# the custom responder above.
handler.add('PUT', '/s3_fake_bucket3/put_with_retry.bin', 502)
handler.add('PUT', '/s3_fake_bucket3/put_with_retry.bin', custom_method=method)

# The PUT handler is installed around the close call.
with webserver.install_http_handler(handler):
gdal.VSIFCloseL(f)


###############################################################################
# Test simple DELETE support with a fake AWS server

@@ -1517,7 +1565,61 @@ def method(request):
gdal.VSIFCloseL(f)
assert gdal.GetLastErrorMsg() != '', filename



###############################################################################
# Test multipart upload with retry logic


def test_vsis3_write_multipart_retry():
# Exercises a /vsis3/ multipart upload in which the initiation POST, the
# first part PUT and the completion POST are each first answered with
# HTTP 502 and then with 200.

# Skip when gdaltest.webserver_port is 0 (presumably meaning that no local
# test web server is available -- confirm against gdaltest).
if gdaltest.webserver_port == 0:
pytest.skip()

# Enable HTTP retries (GDAL_HTTP_MAX_RETRY=2) with a very short retry delay.
with gdaltest.config_options({'GDAL_HTTP_MAX_RETRY': '2',
'GDAL_HTTP_RETRY_DELAY': '0.01'}):

with gdaltest.config_option('VSIS3_CHUNK_SIZE', '1'): # 1 MB
# The file is opened while a handler with no registered response is installed.
with webserver.install_http_handler(webserver.SequentialHandler()):
f = gdal.VSIFOpenL('/vsis3/s3_fake_bucket4/large_file.bin', 'wb')
assert f is not None
# One byte more than 1024 * 1024, i.e. just above the 1 MB chunk size set above.
size = 1024 * 1024 + 1
big_buffer = 'a' * size

handler = webserver.SequentialHandler()

# Upload initiation: 502 first, then a 200 whose XML body carries the
# upload id "my_id" that appears in the URLs registered below.
response = '<?xml version="1.0" encoding="UTF-8"?><InitiateMultipartUploadResult><UploadId>my_id</UploadId></InitiateMultipartUploadResult>'
handler.add('POST', '/s3_fake_bucket4/large_file.bin?uploads', 502)
handler.add('POST', '/s3_fake_bucket4/large_file.bin?uploads', 200,
{'Content-type': 'application/xml',
'Content-Length': len(response),
'Connection': 'close'},
response)

# First part: 502 first, then a 200 that returns an ETag header.
handler.add('PUT', '/s3_fake_bucket4/large_file.bin?partNumber=1&uploadId=my_id', 502)
handler.add('PUT', '/s3_fake_bucket4/large_file.bin?partNumber=1&uploadId=my_id', 200,
{'Content-Length': '0',
'ETag': '"first_etag"',
'Connection': 'close'}, {})

with webserver.install_http_handler(handler):
ret = gdal.VSIFWriteL(big_buffer, 1, size, f)
assert ret == size
handler = webserver.SequentialHandler()

# Second part: a single 200 response is registered (no 502 for this request).
handler.add('PUT', '/s3_fake_bucket4/large_file.bin?partNumber=2&uploadId=my_id', 200,
{'Content-Length': '0',
'ETag': '"second_etag"',
'Connection': 'close'}, {})

# Upload completion: 502 first, then 200.
handler.add('POST', '/s3_fake_bucket4/large_file.bin?uploadId=my_id', 502)
handler.add('POST', '/s3_fake_bucket4/large_file.bin?uploadId=my_id', 200,
{'Content-Length': '0',
'Connection': 'close'}, {})

# The second-part and completion responses are installed around the close call.
with webserver.install_http_handler(handler):
gdal.VSIFCloseL(f)


###############################################################################
# Test Mkdir() / Rmdir()

@@ -22,7 +22,7 @@ esac
$SCRIPT_DIR/../common_install.sh

# Build proj
(cd proj; ./autogen.sh && CFLAGS='-DPROJ_RENAME_SYMBOLS' CXXFLAGS='-DPROJ_RENAME_SYMBOLS' ./configure --prefix=/usr/local && make -j3 && sudo make -j3 install)
(cd proj; ./autogen.sh && CFLAGS='-DPROJ_RENAME_SYMBOLS' CXXFLAGS='-DPROJ_RENAME_SYMBOLS' ./configure --prefix=/usr/local && make -j3 && sudo make -j3 install && sudo mv /usr/local/lib/libproj.so.13.1.1 /usr/local/lib/libinternalproj.so.13.1.1 && sudo rm /usr/local/lib/libproj.so* && sudo rm /usr/local/lib/libproj.a && sudo rm /usr/local/lib/libproj.la && sudo ln -s libinternalproj.so.13.1.1 /usr/local/lib/libinternalproj.so.13 && sudo ln -s libinternalproj.so.13.1.1 /usr/local/lib/libinternalproj.so)

cd gdal
# --with-mongocxx=/usr/local
@@ -77,7 +77,7 @@ endif

default: $(OBJ:.o=.$(OBJ_EXT))

$(OBJ): cpl_vsi_virtual.h
$(OBJ): cpl_vsi_virtual.h cpl_vsil_curl_class.h

clean:
$(RM) *.o $(O_OBJ)
@@ -43,8 +43,13 @@
*/

/*! @cond Doxygen_Suppress */
/* Default value used for the GDAL_HTTP_MAX_RETRY configuration option when
 * it is not set. The #ifndef guard keeps any definition made earlier. */
#ifndef CPL_HTTP_MAX_RETRY
#define CPL_HTTP_MAX_RETRY 0
#endif

/* Default value used for the GDAL_HTTP_RETRY_DELAY configuration option when
 * it is not set; the retry code reports and sleeps this delay in seconds.
 * The #ifndef guard keeps any definition made earlier. */
#ifndef CPL_HTTP_RETRY_DELAY
#define CPL_HTTP_RETRY_DELAY 30.0
#endif
/*! @endcond */

CPL_C_START
@@ -653,8 +653,17 @@ bool VSIAzureWriteHandle::SendInternal(bool bInitOnly, bool bIsLastBlock)
const bool bSingleBlock = bIsLastBlock &&
( m_nCurOffset <= static_cast<vsi_l_offset>(m_nBufferSize) );

for( int i = 0; i < 2; i++ )
const int nMaxRetry = atoi(CPLGetConfigOption("GDAL_HTTP_MAX_RETRY",
CPLSPrintf("%d",CPL_HTTP_MAX_RETRY)));
double dfRetryDelay = CPLAtof(CPLGetConfigOption("GDAL_HTTP_RETRY_DELAY",
CPLSPrintf("%f", CPL_HTTP_RETRY_DELAY)));
int nRetryCount = 0;
bool bHasAlreadyHandled409 = false;
bool bRetry;
do
{
bRetry = false;

m_nBufferOffReadCallback = 0;
CURL* hCurlHandle = curl_easy_init();

@@ -708,6 +717,12 @@ bool VSIAzureWriteHandle::SendInternal(bool bInitOnly, bool bIsLastBlock)
curl_easy_setopt(hCurlHandle, CURLOPT_WRITEFUNCTION,
VSICurlHandleWriteFunc);

WriteFuncStruct sWriteFuncHeaderData;
VSICURLInitWriteFuncStruct(&sWriteFuncHeaderData, nullptr, nullptr, nullptr);
curl_easy_setopt(hCurlHandle, CURLOPT_HEADERDATA, &sWriteFuncHeaderData);
curl_easy_setopt(hCurlHandle, CURLOPT_HEADERFUNCTION,
VSICurlHandleWriteFunc);

MultiPerform(m_poFS->GetCurlMultiHandleFor(m_poHandleHelper->GetURL()),
hCurlHandle);

@@ -716,9 +731,9 @@ bool VSIAzureWriteHandle::SendInternal(bool bInitOnly, bool bIsLastBlock)
long response_code = 0;
curl_easy_getinfo(hCurlHandle, CURLINFO_HTTP_CODE, &response_code);

bool bRetry = false;
if( i == 0 && response_code == 409 )
if( !bHasAlreadyHandled409 && response_code == 409 )
{
bHasAlreadyHandled409 = true;
CPLDebug(cpl::down_cast<VSIAzureFSHandler*>(m_poFS)->GetDebugKey(),
"%s",
sWriteFuncData.pBuffer
@@ -735,28 +750,47 @@ bool VSIAzureWriteHandle::SendInternal(bool bInitOnly, bool bIsLastBlock)
}
else if( response_code != 201 )
{
CPLDebug(cpl::down_cast<VSIAzureFSHandler*>(m_poFS)->GetDebugKey(),
"%s",
sWriteFuncData.pBuffer
? sWriteFuncData.pBuffer
: "(null)");
CPLError(CE_Failure, CPLE_AppDefined,
"PUT of %s failed",
m_osFilename.c_str());
bSuccess = false;
// Look if we should attempt a retry
const double dfNewRetryDelay = CPLHTTPGetNewRetryDelay(
static_cast<int>(response_code), dfRetryDelay,
sWriteFuncHeaderData.pBuffer);
if( dfNewRetryDelay > 0 &&
nRetryCount < nMaxRetry )
{
CPLError(CE_Warning, CPLE_AppDefined,
"HTTP error code: %d - %s. "
"Retrying again in %.1f secs",
static_cast<int>(response_code),
m_poHandleHelper->GetURL().c_str(),
dfRetryDelay);
CPLSleep(dfRetryDelay);
dfRetryDelay = dfNewRetryDelay;
nRetryCount++;
bRetry = true;
}
else
{
CPLDebug(cpl::down_cast<VSIAzureFSHandler*>(m_poFS)->GetDebugKey(),
"%s",
sWriteFuncData.pBuffer
? sWriteFuncData.pBuffer
: "(null)");
CPLError(CE_Failure, CPLE_AppDefined,
"PUT of %s failed",
m_osFilename.c_str());
bSuccess = false;
}
}
else
{
InvalidateParentDirectory();
}

CPLFree(sWriteFuncData.pBuffer);
CPLFree(sWriteFuncHeaderData.pBuffer);

curl_easy_cleanup(hCurlHandle);

if( !bRetry )
break;
}
} while( bRetry );

return bSuccess;
}
Oops, something went wrong.

0 comments on commit 1d799cf

Please sign in to comment.