From cede633189ff33e236cc77dc83484f26231ee9f9 Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Thu, 22 Aug 2024 01:13:15 -0400 Subject: [PATCH 1/5] fix/ensure that split pdf requests are retried We discovered that when a pdf is split into smaller chunks, those requests are not being retried. Now that we have `allow_failed=False`, this results in the whole document failing as soon as any of the child requests hits a transient error. The fix is to reuse the `utils.retry_async` logic that the main code path uses. Copying the retry config in the hook logic is not great, and we can work with Speakeasy to make the internal logic more modular. But for now, this will address the current failures while we work on a better implementation. Testing: See the added unit test. The existing retry logic works for the final split page; everything else needs to use the new logic. To test this, I mocked a response from the server to return 502 for a low `starting_page_number`, which we know will have to be handled by the hooks. 
--- .../integration/test_decorators.py | 75 +++++++++++++++++++ .../_hooks/custom/request_utils.py | 28 ++++++- .../_hooks/custom/split_pdf_hook.py | 6 +- 3 files changed, 106 insertions(+), 3 deletions(-) diff --git a/_test_unstructured_client/integration/test_decorators.py b/_test_unstructured_client/integration/test_decorators.py index dd181e5f..9f4086da 100644 --- a/_test_unstructured_client/integration/test_decorators.py +++ b/_test_unstructured_client/integration/test_decorators.py @@ -1,9 +1,18 @@ +import httpx +import json import pytest import requests from deepdiff import DeepDiff +from httpx import Response + +from requests_toolbelt.multipart.decoder import MultipartDecoder # type: ignore + from unstructured_client import UnstructuredClient from unstructured_client.models import shared, operations from unstructured_client.models.errors import HTTPValidationError +from unstructured_client.utils.retries import BackoffStrategy, RetryConfig +from unstructured_client._hooks.custom import form_utils +from unstructured_client._hooks.custom import split_pdf_hook FAKE_KEY = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" @@ -275,3 +284,69 @@ def test_integration_split_pdf_strict_mode( ], ) assert len(diff) == 0 + + +def test_split_pdf_requests_do_retry(monkeypatch): + """ + Test that when we split a pdf, the split requests will honor retryable errors. + """ + # This is in a list so we can pass a reference to the mock function + number_of_502s = [1] + + async def mock_send(_, request): + """ + Read the request form data, and find the starting_page_number param. + If the page number is lower than 3, return a single 502 error, and then return 200s + This is because the final split page uses the built in SDK retry logic, + and everything else is sent through send_request_async_with_retries. + + We want to test the latter. 
+ """ + request_body = request.read() + decoded_body = MultipartDecoder(request_body, request.headers.get("Content-Type")) + form_data = form_utils.parse_form_data(decoded_body) + + if number_of_502s[0] > 0: + if "starting_page_number" in form_data and int(form_data["starting_page_number"]) < 3: + number_of_502s[0] -= 1 + return Response(502, request=request) + + mock_return_data = [{ + "type": "Title", + "text": "Hello", + }] + + return Response( + 200, + request=request, + content=json.dumps(mock_return_data), + ) + + monkeypatch.setattr(split_pdf_hook.httpx.AsyncClient, "send", mock_send) + + sdk = UnstructuredClient( + api_key_auth=FAKE_KEY, + server_url="localhost:8000", + retry_config=RetryConfig("backoff", BackoffStrategy(1, 10, 1.5, 30), False), + ) + + filename = "_sample_docs/layout-parser-paper.pdf" + with open(filename, "rb") as f: + files = shared.Files( + content=f.read(), + file_name=filename, + ) + + req = operations.PartitionRequest( + shared.PartitionParameters( + files=files, + split_pdf_page=True, + split_pdf_allow_failed=False, + strategy="fast", + ) + ) + + res = sdk.general.partition(req) + + assert number_of_502s[0] == 0 + assert res.status_code == 200 diff --git a/src/unstructured_client/_hooks/custom/request_utils.py b/src/unstructured_client/_hooks/custom/request_utils.py index 09592804..36a294f8 100644 --- a/src/unstructured_client/_hooks/custom/request_utils.py +++ b/src/unstructured_client/_hooks/custom/request_utils.py @@ -19,6 +19,7 @@ PARTITION_FORM_STARTING_PAGE_NUMBER_KEY, FormData, ) +import unstructured_client.utils as utils logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) @@ -69,10 +70,35 @@ async def call_api_async( ) async with limiter: - response = await client.send(new_request) + response = await send_request_async_with_retries(client, new_request) return response +async def send_request_async_with_retries(client: httpx.AsyncClient, request: httpx.Request): + retry_config = utils.RetryConfig( + "backoff", + 
utils.BackoffStrategy(2000, 60000, 1.5, 900000), + True + ) + + retryable_codes = [ + "502", + "503", + "504" + ] + + async def do_request(): + return await client.send(request) + + logger.error("Ready to retry") + response = await utils.retry_async( + do_request, + utils.Retries(retry_config, retryable_codes) + ) + + return response + + def prepare_request_headers( headers: httpx.Headers, ) -> httpx.Headers: diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index 8627333f..15bf90d8 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -168,7 +168,6 @@ def before_request( form_data = form_utils.parse_form_data(decoded_body) split_pdf_page = form_data.get(PARTITION_FORM_SPLIT_PDF_PAGE_KEY) if split_pdf_page is None or split_pdf_page == "false": - logger.info("Partitioning without split.") return request logger.info("Preparing to split document for partition.") @@ -287,6 +286,8 @@ async def call_api_partial(page): # `before_request` method needs to return a request so we skip sending the last page in parallel # and return that last page at the end of this method + # Need to make sure the final page does not trigger splitting again + form_data[PARTITION_FORM_SPLIT_PDF_PAGE_KEY] = "false" body = request_utils.create_request_body( form_data, last_page_content, file.file_name, last_page_number ) @@ -413,12 +414,13 @@ def after_error( If requests were run in parallel, and at least one was successful, a combined response object; otherwise, the original response and exception. 
""" + operation_id = hook_ctx.operation_id # if fails are disallowed - return response and error objects immediately if not self.allow_failed: + self._clear_operation(operation_id) return (response, error) - operation_id = hook_ctx.operation_id # We know that this request failed so we pass a failed or empty response to `_await_elements` method # where it checks if at least on of the other requests succeeded elements = self._await_elements(operation_id, response or httpx.Response(status_code=200)) From 583408b2ffa4fdfa4f5a93bf74f8831b9ee86e8a Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Thu, 22 Aug 2024 01:27:07 -0400 Subject: [PATCH 2/5] Remove debug log --- src/unstructured_client/_hooks/custom/request_utils.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/unstructured_client/_hooks/custom/request_utils.py b/src/unstructured_client/_hooks/custom/request_utils.py index 36a294f8..b527f35c 100644 --- a/src/unstructured_client/_hooks/custom/request_utils.py +++ b/src/unstructured_client/_hooks/custom/request_utils.py @@ -90,7 +90,6 @@ async def send_request_async_with_retries(client: httpx.AsyncClient, request: ht async def do_request(): return await client.send(request) - logger.error("Ready to retry") response = await utils.retry_async( do_request, utils.Retries(retry_config, retryable_codes) From e28cdedf35248fbb47066749dcc8e44a633ff562 Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Thu, 22 Aug 2024 09:26:41 -0400 Subject: [PATCH 3/5] Use nonlocal 502 counter --- .../integration/test_decorators.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/_test_unstructured_client/integration/test_decorators.py b/_test_unstructured_client/integration/test_decorators.py index 9f4086da..49646707 100644 --- a/_test_unstructured_client/integration/test_decorators.py +++ b/_test_unstructured_client/integration/test_decorators.py @@ -290,8 +290,7 @@ def test_split_pdf_requests_do_retry(monkeypatch): """ Test that when we split a pdf, 
the split requests will honor retryable errors. """ - # This is in a list so we can pass a reference to the mock function - number_of_502s = [1] + number_of_502s = 1 async def mock_send(_, request): """ @@ -306,9 +305,10 @@ async def mock_send(_, request): decoded_body = MultipartDecoder(request_body, request.headers.get("Content-Type")) form_data = form_utils.parse_form_data(decoded_body) - if number_of_502s[0] > 0: + nonlocal number_of_502s + if number_of_502s > 0: if "starting_page_number" in form_data and int(form_data["starting_page_number"]) < 3: - number_of_502s[0] -= 1 + number_of_502s -= 1 return Response(502, request=request) mock_return_data = [{ @@ -348,5 +348,5 @@ async def mock_send(_, request): res = sdk.general.partition(req) - assert number_of_502s[0] == 0 + assert number_of_502s == 0 assert res.status_code == 200 From f18f2b4f418b1236b0ae54e14efe1a19f54b7a60 Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Thu, 22 Aug 2024 09:31:21 -0400 Subject: [PATCH 4/5] Add better comments for new retry config --- .../_hooks/custom/request_utils.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/src/unstructured_client/_hooks/custom/request_utils.py b/src/unstructured_client/_hooks/custom/request_utils.py index b527f35c..ad282080 100644 --- a/src/unstructured_client/_hooks/custom/request_utils.py +++ b/src/unstructured_client/_hooks/custom/request_utils.py @@ -75,10 +75,18 @@ async def call_api_async( async def send_request_async_with_retries(client: httpx.AsyncClient, request: httpx.Request): + # Hardcode the retry config until we can + # properly reuse the SDK logic + # (Values are in ms) retry_config = utils.RetryConfig( "backoff", - utils.BackoffStrategy(2000, 60000, 1.5, 900000), - True + utils.BackoffStrategy( + initial_interval=2000, + max_interval=60000, + exponent=1.5, + max_elapsed_time=1000 * 60 * 5 # 5 minutes + ), + retry_connection_errors=True ) retryable_codes = [ From 5350dba737c313316b999909bb5422a43ae9687a 
Mon Sep 17 00:00:00 2001 From: Austin Walker Date: Thu, 22 Aug 2024 10:17:26 -0400 Subject: [PATCH 5/5] Add last page retries to mocked test --- .../integration/test_decorators.py | 37 ++++++++++++------- 1 file changed, 24 insertions(+), 13 deletions(-) diff --git a/_test_unstructured_client/integration/test_decorators.py b/_test_unstructured_client/integration/test_decorators.py index 49646707..4b47a948 100644 --- a/_test_unstructured_client/integration/test_decorators.py +++ b/_test_unstructured_client/integration/test_decorators.py @@ -286,29 +286,38 @@ def test_integration_split_pdf_strict_mode( assert len(diff) == 0 -def test_split_pdf_requests_do_retry(monkeypatch): +@pytest.mark.asyncio +async def test_split_pdf_requests_do_retry(monkeypatch): """ Test that when we split a pdf, the split requests will honor retryable errors. """ - number_of_502s = 1 + number_of_split_502s = 2 + number_of_last_page_502s = 2 async def mock_send(_, request): """ - Read the request form data, and find the starting_page_number param. - If the page number is lower than 3, return a single 502 error, and then return 200s - This is because the final split page uses the built in SDK retry logic, - and everything else is sent through send_request_async_with_retries. + Return a predefined number of 502s for requests with certain starting_page_number values. - We want to test the latter. + This is because N-1 splits are sent off in the hook logic. These need explicit retry handling. + The final split is returned to the SDK and gets the built in retry code. + + We want to make sure both code paths are retried. 
""" request_body = request.read() decoded_body = MultipartDecoder(request_body, request.headers.get("Content-Type")) form_data = form_utils.parse_form_data(decoded_body) - nonlocal number_of_502s - if number_of_502s > 0: + nonlocal number_of_split_502s + nonlocal number_of_last_page_502s + + if number_of_split_502s > 0: if "starting_page_number" in form_data and int(form_data["starting_page_number"]) < 3: - number_of_502s -= 1 + number_of_split_502s -= 1 + return Response(502, request=request) + + if number_of_last_page_502s > 0: + if "starting_page_number" in form_data and int(form_data["starting_page_number"]) > 12: + number_of_last_page_502s -= 1 return Response(502, request=request) mock_return_data = [{ @@ -320,6 +329,7 @@ async def mock_send(_, request): 200, request=request, content=json.dumps(mock_return_data), + headers={"Content-Type": "application/json"}, ) monkeypatch.setattr(split_pdf_hook.httpx.AsyncClient, "send", mock_send) @@ -327,7 +337,7 @@ async def mock_send(_, request): sdk = UnstructuredClient( api_key_auth=FAKE_KEY, server_url="localhost:8000", - retry_config=RetryConfig("backoff", BackoffStrategy(1, 10, 1.5, 30), False), + retry_config=RetryConfig("backoff", BackoffStrategy(200, 1000, 1.5, 1000), False), ) filename = "_sample_docs/layout-parser-paper.pdf" @@ -346,7 +356,8 @@ async def mock_send(_, request): ) ) - res = sdk.general.partition(req) + res = await sdk.general.partition_async(request=req) - assert number_of_502s == 0 + assert number_of_split_502s == 0 + assert number_of_last_page_502s == 0 assert res.status_code == 200