diff --git a/_test_unstructured_client/integration/test_decorators.py b/_test_unstructured_client/integration/test_decorators.py index dd181e5f..4b47a948 100644 --- a/_test_unstructured_client/integration/test_decorators.py +++ b/_test_unstructured_client/integration/test_decorators.py @@ -1,9 +1,18 @@ +import httpx +import json import pytest import requests from deepdiff import DeepDiff +from httpx import Response + +from requests_toolbelt.multipart.decoder import MultipartDecoder # type: ignore + from unstructured_client import UnstructuredClient from unstructured_client.models import shared, operations from unstructured_client.models.errors import HTTPValidationError +from unstructured_client.utils.retries import BackoffStrategy, RetryConfig +from unstructured_client._hooks.custom import form_utils +from unstructured_client._hooks.custom import split_pdf_hook FAKE_KEY = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" @@ -275,3 +284,80 @@ def test_integration_split_pdf_strict_mode( ], ) assert len(diff) == 0 + + +@pytest.mark.asyncio +async def test_split_pdf_requests_do_retry(monkeypatch): + """ + Test that when we split a pdf, the split requests will honor retryable errors. + """ + number_of_split_502s = 2 + number_of_last_page_502s = 2 + + async def mock_send(_, request): + """ + Return a predefined number of 502s for requests with certain starting_page_number values. + + This is because N-1 splits are sent off in the hook logic. These need explicit retry handling. + The final split is returned to the SDK and gets the built in retry code. + + We want to make sure both code paths are retried. + """ + request_body = request.read() + decoded_body = MultipartDecoder(request_body, request.headers.get("Content-Type")) + form_data = form_utils.parse_form_data(decoded_body) + + nonlocal number_of_split_502s + nonlocal number_of_last_page_502s + + if number_of_split_502s > 0: + if "starting_page_number" in form_data and int(form_data["starting_page_number"]) < 3: + number_of_split_502s -= 1 + return Response(502, request=request) + + if number_of_last_page_502s > 0: + if "starting_page_number" in form_data and int(form_data["starting_page_number"]) > 12: + number_of_last_page_502s -= 1 + return Response(502, request=request) + + mock_return_data = [{ + "type": "Title", + "text": "Hello", + }] + + return Response( + 200, + request=request, + content=json.dumps(mock_return_data), + headers={"Content-Type": "application/json"}, + ) + + monkeypatch.setattr(split_pdf_hook.httpx.AsyncClient, "send", mock_send) + + sdk = UnstructuredClient( + api_key_auth=FAKE_KEY, + server_url="localhost:8000", + retry_config=RetryConfig("backoff", BackoffStrategy(200, 1000, 1.5, 1000), False), + ) + + filename = "_sample_docs/layout-parser-paper.pdf" + with open(filename, "rb") as f: + files = shared.Files( + content=f.read(), + file_name=filename, + ) + + req = operations.PartitionRequest( + shared.PartitionParameters( + files=files, + split_pdf_page=True, + split_pdf_allow_failed=False, + strategy="fast", + ) + ) + + res = await sdk.general.partition_async(request=req) + + assert number_of_split_502s == 0 + assert number_of_last_page_502s == 0 + assert res.status_code == 200 diff --git a/src/unstructured_client/_hooks/custom/request_utils.py b/src/unstructured_client/_hooks/custom/request_utils.py index 09592804..ad282080 100644 --- a/src/unstructured_client/_hooks/custom/request_utils.py +++ b/src/unstructured_client/_hooks/custom/request_utils.py @@ -19,6 +19,7 @@ PARTITION_FORM_STARTING_PAGE_NUMBER_KEY, FormData, ) +import unstructured_client.utils as utils logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME) @@ -69,10 +70,42 @@ async def call_api_async( ) async with limiter: - response = await client.send(new_request) + response = await send_request_async_with_retries(client, new_request) return response +async def send_request_async_with_retries(client: httpx.AsyncClient, request: httpx.Request): + # Hardcode the retry config until we can + # properly reuse the SDK logic + # (Values are in ms) + retry_config = utils.RetryConfig( + "backoff", + utils.BackoffStrategy( + initial_interval=2000, + max_interval=60000, + exponent=1.5, + max_elapsed_time=1000 * 60 * 5 # 5 minutes + ), + retry_connection_errors=True + ) + + retryable_codes = [ + "502", + "503", + "504" + ] + + async def do_request(): + return await client.send(request) + + response = await utils.retry_async( + do_request, + utils.Retries(retry_config, retryable_codes) + ) + + return response + + def prepare_request_headers( headers: httpx.Headers, ) -> httpx.Headers: diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index 8627333f..15bf90d8 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -168,7 +168,6 @@ def before_request( form_data = form_utils.parse_form_data(decoded_body) split_pdf_page = form_data.get(PARTITION_FORM_SPLIT_PDF_PAGE_KEY) if split_pdf_page is None or split_pdf_page == "false": - logger.info("Partitioning without split.") return request logger.info("Preparing to split document for partition.") @@ -287,6 +286,8 @@ async def call_api_partial(page): # `before_request` method needs to return a request so we skip sending the last page in parallel # and return that last page at the end of this method + # Need to make sure the final page does not trigger splitting again + form_data[PARTITION_FORM_SPLIT_PDF_PAGE_KEY] = "false" body = request_utils.create_request_body( form_data, last_page_content, file.file_name, last_page_number ) @@ -413,12 +414,13 @@ def after_error( If requests were run in parallel, and at least one was successful, a combined response object; otherwise, the original response and exception. """ + operation_id = hook_ctx.operation_id # if fails are disallowed - return response and error objects immediately if not self.allow_failed: + self._clear_operation(operation_id) return (response, error) - operation_id = hook_ctx.operation_id # We know that this request failed so we pass a failed or empty response to `_await_elements` method # where it checks if at least on of the other requests succeeded elements = self._await_elements(operation_id, response or httpx.Response(status_code=200))