Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 86 additions & 0 deletions _test_unstructured_client/integration/test_decorators.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,18 @@
import httpx
import json
import pytest
import requests
from deepdiff import DeepDiff
from httpx import Response

from requests_toolbelt.multipart.decoder import MultipartDecoder # type: ignore

from unstructured_client import UnstructuredClient
from unstructured_client.models import shared, operations
from unstructured_client.models.errors import HTTPValidationError
from unstructured_client.utils.retries import BackoffStrategy, RetryConfig
from unstructured_client._hooks.custom import form_utils
from unstructured_client._hooks.custom import split_pdf_hook

# Placeholder API key passed as `api_key_auth` when constructing the client in tests
# (the name indicates it is not a real credential).
FAKE_KEY = "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa"

Expand Down Expand Up @@ -275,3 +284,80 @@ def test_integration_split_pdf_strict_mode(
],
)
assert len(diff) == 0


@pytest.mark.asyncio
async def test_split_pdf_requests_do_retry(monkeypatch):
    """
    Check that every request produced by splitting a pdf honors retryable errors.

    The transport is replaced by a mock that answers a fixed number of requests
    with a 502 before succeeding; the test passes only if all of those 502s
    were consumed and the overall call still ends with a 200.
    """
    number_of_split_502s = 2
    number_of_last_page_502s = 2

    async def mock_send(_, request):
        """
        Answer with a limited number of 502s depending on starting_page_number.

        N-1 splits are sent off in the hook logic and need explicit retry
        handling, while the final split is returned to the SDK and goes through
        the built-in retry code. Failing both low and high page numbers makes
        sure both code paths are retried.
        """
        nonlocal number_of_split_502s
        nonlocal number_of_last_page_502s

        request_body = request.read()
        decoded_body = MultipartDecoder(request_body, request.headers.get("Content-Type"))
        form_data = form_utils.parse_form_data(decoded_body)

        def page_number_matches(predicate):
            # Evaluated lazily so the int() conversion only happens when the
            # corresponding 502 budget has not been used up yet.
            return (
                "starting_page_number" in form_data
                and predicate(int(form_data["starting_page_number"]))
            )

        # Early splits: handled by the hook's own retry logic.
        if number_of_split_502s > 0 and page_number_matches(lambda page: page < 3):
            number_of_split_502s -= 1
            return Response(502, request=request)

        # Late splits: includes the page handed back to the SDK.
        if number_of_last_page_502s > 0 and page_number_matches(lambda page: page > 12):
            number_of_last_page_502s -= 1
            return Response(502, request=request)

        mock_return_data = [{"type": "Title", "text": "Hello"}]
        serialized_elements = json.dumps(mock_return_data)

        return Response(
            200,
            request=request,
            content=serialized_elements,
            headers={"Content-Type": "application/json"},
        )

    monkeypatch.setattr(split_pdf_hook.httpx.AsyncClient, "send", mock_send)

    # Short backoff values keep the retries in this test fast.
    backoff = BackoffStrategy(200, 1000, 1.5, 1000)
    sdk = UnstructuredClient(
        api_key_auth=FAKE_KEY,
        server_url="localhost:8000",
        retry_config=RetryConfig("backoff", backoff, False),
    )

    filename = "_sample_docs/layout-parser-paper.pdf"
    with open(filename, "rb") as f:
        pdf_bytes = f.read()

    files = shared.Files(
        content=pdf_bytes,
        file_name=filename,
    )

    parameters = shared.PartitionParameters(
        files=files,
        split_pdf_page=True,
        split_pdf_allow_failed=False,
        strategy="fast",
    )
    req = operations.PartitionRequest(parameters)

    res = await sdk.general.partition_async(request=req)

    # Both 502 budgets must be exhausted, i.e. both retry paths were exercised.
    assert number_of_split_502s == 0
    assert number_of_last_page_502s == 0
    assert res.status_code == 200
35 changes: 34 additions & 1 deletion src/unstructured_client/_hooks/custom/request_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
PARTITION_FORM_STARTING_PAGE_NUMBER_KEY,
FormData,
)
import unstructured_client.utils as utils

# Module-level logger, looked up under the shared UNSTRUCTURED_CLIENT_LOGGER_NAME.
logger = logging.getLogger(UNSTRUCTURED_CLIENT_LOGGER_NAME)

Expand Down Expand Up @@ -69,10 +70,42 @@ async def call_api_async(
)

async with limiter:
response = await client.send(new_request)
response = await send_request_async_with_retries(client, new_request)
return response


async def send_request_async_with_retries(client: httpx.AsyncClient, request: httpx.Request):
    """
    Send `request` with `client`, wrapping the call in the SDK retry helper.

    The retry policy is hardcoded here until the SDK retry logic can be
    properly reused: a "backoff" strategy with connection-error retries
    enabled, and 502/503/504 listed as the retryable status codes.

    Args:
        client: The async httpx client used to send the request.
        request: The request to send; the same object is re-sent on each attempt.

    Returns:
        Whatever `utils.retry_async` yields for the wrapped send
        (presumably the final `httpx.Response` -- confirm against the helper).
    """
    # All interval values below are in milliseconds.
    five_minutes_ms = 1000 * 60 * 5
    backoff = utils.BackoffStrategy(
        initial_interval=2000,
        max_interval=60000,
        exponent=1.5,
        max_elapsed_time=five_minutes_ms,
    )
    policy = utils.RetryConfig("backoff", backoff, retry_connection_errors=True)
    retries = utils.Retries(policy, ["502", "503", "504"])

    async def send_once():
        return await client.send(request)

    return await utils.retry_async(send_once, retries)


def prepare_request_headers(
headers: httpx.Headers,
) -> httpx.Headers:
Expand Down
6 changes: 4 additions & 2 deletions src/unstructured_client/_hooks/custom/split_pdf_hook.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,6 @@ def before_request(
form_data = form_utils.parse_form_data(decoded_body)
split_pdf_page = form_data.get(PARTITION_FORM_SPLIT_PDF_PAGE_KEY)
if split_pdf_page is None or split_pdf_page == "false":
logger.info("Partitioning without split.")
return request

logger.info("Preparing to split document for partition.")
Expand Down Expand Up @@ -287,6 +286,8 @@ async def call_api_partial(page):
# `before_request` method needs to return a request so we skip sending the last page in parallel
# and return that last page at the end of this method

# Need to make sure the final page does not trigger splitting again
form_data[PARTITION_FORM_SPLIT_PDF_PAGE_KEY] = "false"
body = request_utils.create_request_body(
form_data, last_page_content, file.file_name, last_page_number
)
Expand Down Expand Up @@ -413,12 +414,13 @@ def after_error(
If requests were run in parallel, and at least one was successful, a combined
response object; otherwise, the original response and exception.
"""
operation_id = hook_ctx.operation_id

# if fails are disallowed - return response and error objects immediately
if not self.allow_failed:
self._clear_operation(operation_id)
return (response, error)

operation_id = hook_ctx.operation_id
# We know that this request failed so we pass a failed or empty response to `_await_elements` method
# where it checks if at least one of the other requests succeeded
elements = self._await_elements(operation_id, response or httpx.Response(status_code=200))
Expand Down