From 022fef0dccb9e14d33c45df9f848df96885c8eeb Mon Sep 17 00:00:00 2001 From: Emily Voss Date: Thu, 30 Apr 2026 16:37:41 -0700 Subject: [PATCH 01/10] fix(DTPL-364): Fix pdf_split_hook issues --- CHANGELOG.md | 10 + .../unit/test_split_pdf_hook.py | 362 +++++++++++++++++- .../_hooks/custom/split_pdf_hook.py | 318 ++++++++++----- src/unstructured_client/_hooks/sdkhooks.py | 63 ++- src/unstructured_client/_hooks/types.py | 47 ++- src/unstructured_client/_version.py | 4 +- src/unstructured_client/basesdk.py | 61 ++- 7 files changed, 749 insertions(+), 116 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index e2ed8167..c9662250 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,13 @@ +## 0.43.4 + +### Enhancements + +### Features + +### Fixes +* Route split-PDF `partition_async()` result collection through awaited async hook dispatch instead of creating a nested event loop in a worker thread. +* Add cancellation cleanup for in-flight split-PDF chunk tasks and preserve existing sync `partition()` split-PDF behavior with lazy executor creation. + ## 0.43.2 ### Enhancements diff --git a/_test_unstructured_client/unit/test_split_pdf_hook.py b/_test_unstructured_client/unit/test_split_pdf_hook.py index a05e1a9d..30651214 100644 --- a/_test_unstructured_client/unit/test_split_pdf_hook.py +++ b/_test_unstructured_client/unit/test_split_pdf_hook.py @@ -3,6 +3,7 @@ import asyncio import io import logging +import threading from asyncio import Task from collections import Counter from concurrent import futures @@ -15,7 +16,6 @@ import requests from requests_toolbelt import MultipartDecoder -from _test_unstructured_client.unit_utils import sample_docs_path from unstructured_client._hooks.custom import form_utils, pdf_utils, request_utils from unstructured_client._hooks.custom.form_utils import ( FormData, @@ -36,8 +36,11 @@ get_optimal_split_size, run_tasks, ) +from unstructured_client._hooks.sdkhooks import SDKHooks from unstructured_client._hooks.types import AfterErrorContext, AfterSuccessContext, BeforeRequestContext +from unstructured_client.basesdk import BaseSDK from unstructured_client.models import shared +from unstructured_client.sdkconfiguration import SDKConfiguration from unstructured_client.types import UNSET from unstructured_client.utils import BackoffStrategy, RetryConfig @@ -671,6 +674,81 @@ def _make_hook_with_split_request( return hook, hook_ctx, result +def _make_sdk_hook_context(): + hook_ctx = MagicMock() + hook_ctx.config = MagicMock() + hook_ctx.base_url = "http://localhost:8888" + hook_ctx.operation_id = "partition" + hook_ctx.oauth2_scopes = None + hook_ctx.security_source = None + return hook_ctx + + +class _BlockingAsyncClient: + def __init__(self) -> None: + self.started = asyncio.Event() + + async def send(self, request: httpx.Request, stream=False) -> httpx.Response: + del stream + self.started.set() + await asyncio.Event().wait() + return httpx.Response(status_code=200, request=request) + + +@pytest.mark.asyncio +async def test_unit_do_request_async_cancellation_after_before_request_cleans_split_state(): + operation_id = "cancelled-operation" + split_hook = SplitPdfHook() + split_hook.coroutines_to_execute[operation_id] = [MagicMock()] + split_hook.pending_operation_ids[operation_id] = operation_id + tempdir = MagicMock() + split_hook.tempdirs[operation_id] = tempdir + + def _prepared_split_request(hook_ctx, request): + del hook_ctx, request + return httpx.Request( + "GET", + "http://localhost:8888/general/docs", + headers={"operation_id": operation_id}, + extensions={"split_pdf_operation_id": operation_id}, + ) + + split_hook.before_request = _prepared_split_request # type: ignore[method-assign] + hooks = SDKHooks() + hooks.before_request_hooks = [split_hook] + hooks.after_error_hooks = [split_hook] + hooks.after_success_hooks = [split_hook] + + client = _BlockingAsyncClient() + config = SDKConfiguration( + client=None, + client_supplied=False, + async_client=client, # type: ignore[arg-type] + async_client_supplied=True, + debug_logger=logging.getLogger("test"), + ) + config.__dict__["_hooks"] = hooks + sdk = BaseSDK(config) + task = asyncio.create_task( + sdk.do_request_async( + _make_sdk_hook_context(), + httpx.Request("POST", "http://localhost:8888/general/v0/general"), + error_status_codes=[], + ) + ) + + await client.started.wait() + task.cancel() + + with pytest.raises(asyncio.CancelledError): + await task + + assert operation_id not in split_hook.coroutines_to_execute + assert operation_id not in split_hook.pending_operation_ids + assert operation_id not in split_hook.tempdirs + tempdir.cleanup.assert_called_once() + + def test_before_request_returns_dummy_with_timeout_and_operation_id(): hook, mock_hook_ctx, result = _make_hook_with_split_request() operation_id = result.headers["operation_id"] @@ -703,7 +781,7 @@ def test_after_error_cleans_up_split_state(): hook, mock_hook_ctx, result = _make_hook_with_split_request() operation_id = result.headers["operation_id"] - assert operation_id in hook.executors + assert operation_id not in hook.executors assert operation_id in hook.coroutines_to_execute error_ctx = MagicMock(spec=AfterErrorContext) @@ -744,17 +822,15 @@ async def test_unit_run_tasks_allow_failed_transport_exception(): @pytest.mark.asyncio -async def test_unit_run_tasks_allow_failed_cancelled_error_treated_as_failure(): +async def test_unit_run_tasks_allow_failed_cancelled_error_propagates(): tasks = [ partial(_slow_success_request, content="1"), partial(_cancelled_request), partial(_slow_success_request, content="3"), ] - responses = await run_tasks(tasks, allow_failed=True) - - assert [response.status_code for _, response in responses] == [200, 500, 200] - assert isinstance(responses[1][1].extensions["transport_exception"], asyncio.CancelledError) + with pytest.raises(asyncio.CancelledError): + await run_tasks(tasks, allow_failed=True) @pytest.mark.asyncio @@ -845,11 +921,123 @@ def test_unit_after_error_cleans_only_matching_operation_on_transport_failure(): assert first_operation_id not in hook.executors assert first_operation_id not in hook.coroutines_to_execute assert first_operation_id not in hook.pending_operation_ids - assert second_operation_id in hook.executors + assert second_operation_id not in hook.executors assert second_operation_id in hook.coroutines_to_execute assert second_operation_id in hook.pending_operation_ids +@pytest.mark.asyncio +async def test_unit_sdk_hooks_after_success_async_uses_optional_async_hook(): + calls: list[str] = [] + + class AsyncHook: + async def after_success_async(self, hook_ctx, response): + calls.append("async") + response.headers["X-Async-Hook"] = "called" + return response + + class SyncHook: + def after_success(self, hook_ctx, response): + calls.append("sync") + response.headers["X-Sync-Hook"] = "called" + return response + + hooks = SDKHooks() + hooks.after_success_hooks = [AsyncHook(), SyncHook()] # type: ignore[list-item] + response = httpx.Response(status_code=200) + + returned_response = await hooks.after_success_async(MagicMock(spec=AfterSuccessContext), response) + + assert returned_response is response + assert calls == ["async", "sync"] + assert response.headers["X-Async-Hook"] == "called" + assert response.headers["X-Sync-Hook"] == "called" + + +@pytest.mark.asyncio +async def test_unit_sdk_hooks_after_success_async_ignores_sync_method_with_async_name(): + calls: list[str] = [] + + class SyncHook: + def after_success_async(self, hook_ctx, response): + calls.append("non-awaitable") + return response + + def after_success(self, hook_ctx, response): + calls.append("sync") + response.headers["X-Sync-Hook"] = "called" + return response + + hooks = SDKHooks() + hooks.after_success_hooks = [SyncHook()] # type: ignore[list-item] + response = httpx.Response(status_code=200) + + returned_response = await hooks.after_success_async(MagicMock(spec=AfterSuccessContext), response) + + assert returned_response is response + assert calls == ["sync"] + assert response.headers["X-Sync-Hook"] == "called" + + +@pytest.mark.asyncio +async def test_unit_sdk_hooks_after_error_async_uses_optional_async_hook(): + calls: list[str] = [] + + class AsyncHook: + async def after_error_async(self, hook_ctx, response, error): + calls.append("async") + response.headers["X-Async-Error-Hook"] = "called" + return response, error + + class SyncHook: + def after_error(self, hook_ctx, response, error): + calls.append("sync") + response.headers["X-Sync-Error-Hook"] = "called" + return response, error + + hooks = SDKHooks() + hooks.after_error_hooks = [AsyncHook(), SyncHook()] # type: ignore[list-item] + response = httpx.Response(status_code=500) + + returned_response, returned_error = await hooks.after_error_async( + MagicMock(spec=AfterErrorContext), + response, + None, + ) + + assert returned_response is response + assert returned_error is None + assert calls == ["async", "sync"] + assert response.headers["X-Async-Error-Hook"] == "called" + assert response.headers["X-Sync-Error-Hook"] == "called" + + +@pytest.mark.asyncio +async def test_unit_sdk_hooks_before_request_async_runs_sync_hooks_off_loop(): + loop_thread_id = threading.get_ident() + hook_thread_id = None + + class SyncHook: + def before_request(self, hook_ctx, request): + nonlocal hook_thread_id + hook_thread_id = threading.get_ident() + request.headers["X-Sync-Before-Hook"] = "called" + return request + + hooks = SDKHooks() + hooks.before_request_hooks = [SyncHook()] # type: ignore[list-item] + request = httpx.Request("GET", "http://localhost") + + returned_request = await hooks.before_request_async( + MagicMock(spec=BeforeRequestContext), + request, + ) + + assert returned_request is request + assert request.headers["X-Sync-Before-Hook"] == "called" + assert hook_thread_id != loop_thread_id + + def test_unit_before_request_uses_hook_ctx_timeout_when_request_timeout_missing(): hook, _, result = _make_hook_with_split_request( timeout_extension=None, @@ -900,6 +1088,162 @@ def test_unit_after_success_clears_on_await_elements_exception(): assert operation_id not in hook.pending_operation_ids +@pytest.mark.asyncio +async def test_unit_after_success_async_clears_on_await_elements_exception(): + hook, _, result = _make_hook_with_split_request() + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + + with patch.object(hook, "_await_elements_async", side_effect=RuntimeError("boom")): + with pytest.raises(RuntimeError): + await hook.after_success_async(success_ctx, response) + + assert operation_id not in hook.executors + assert operation_id not in hook.coroutines_to_execute + assert operation_id not in hook.pending_operation_ids + + +@pytest.mark.asyncio +async def test_unit_after_success_async_collects_chunks_without_sync_executor(): + hook, _, result = _make_hook_with_split_request( + pdf_chunks=[(io.BytesIO(b"chunk-1"), 0), (io.BytesIO(b"chunk-2"), 2)], + ) + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + + assert operation_id not in hook.executors + + with patch( + "unstructured_client._hooks.custom.request_utils.call_api_async", + new=AsyncMock( + side_effect=[ + _httpx_json_response([{"page_number": 1}]), + _httpx_json_response([{"page_number": 3}]), + ] + ), + ) as mock_call_api_async: + returned_response = await hook.after_success_async(success_ctx, response) + + assert returned_response.json() == [{"page_number": 1}, {"page_number": 3}] + assert mock_call_api_async.await_count == 2 + assert operation_id not in hook.executors + assert operation_id not in hook.coroutines_to_execute + assert operation_id not in hook.pending_operation_ids + + +@pytest.mark.asyncio +async def test_unit_after_success_async_cancels_pending_chunks_and_clears_state(): + hook, _, result = _make_hook_with_split_request( + pdf_chunks=[(io.BytesIO(b"chunk-1"), 0), (io.BytesIO(b"chunk-2"), 2)], + ) + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + started = asyncio.Event() + started_counter = Counter() + cancelled_counter = Counter() + + async def _pending_request( + async_client: httpx.AsyncClient, + limiter: asyncio.Semaphore, + ) -> httpx.Response: + try: + started_counter.update(["started"]) + if started_counter["started"] == 2: + started.set() + await asyncio.Event().wait() + return _httpx_json_response([]) + except asyncio.CancelledError: + cancelled_counter.update(["cancelled"]) + raise + + hook.coroutines_to_execute[operation_id] = [partial(_pending_request)] * 2 + + task = asyncio.create_task(hook.after_success_async(success_ctx, response)) + await started.wait() + task.cancel() + + with pytest.raises(asyncio.CancelledError): + await task + + assert cancelled_counter["cancelled"] == 2 + assert operation_id not in hook.executors + assert operation_id not in hook.coroutines_to_execute + assert operation_id not in hook.pending_operation_ids + + +@pytest.mark.asyncio +async def test_unit_after_success_async_timeout_cancels_chunks_and_clears_state(): + hook, _, result = _make_hook_with_split_request( + pdf_chunks=[(io.BytesIO(b"chunk-1"), 0), (io.BytesIO(b"chunk-2"), 2)], + ) + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + tempdir = MagicMock() + hook.tempdirs[operation_id] = tempdir + hook.operation_timeouts[operation_id] = 0.001 + cancelled_counter = Counter() + + async def _pending_request( + async_client: httpx.AsyncClient, + limiter: asyncio.Semaphore, + ) -> httpx.Response: + del async_client, limiter + try: + await asyncio.Event().wait() + return _httpx_json_response([]) + except asyncio.CancelledError: + cancelled_counter.update(["cancelled"]) + raise + + hook.coroutines_to_execute[operation_id] = [partial(_pending_request)] * 2 + + with patch("unstructured_client._hooks.custom.split_pdf_hook.TIMEOUT_BUFFER_SECONDS", 0): + with pytest.raises(TimeoutError): + await hook.after_success_async(success_ctx, response) + + assert cancelled_counter["cancelled"] == 2 + assert operation_id not in hook.coroutines_to_execute + assert operation_id not in hook.pending_operation_ids + assert operation_id not in hook.tempdirs + tempdir.cleanup.assert_called_once() + + +def test_unit_after_success_sync_lazily_creates_and_cleans_executor(): + hook, _, result = _make_hook_with_split_request( + pdf_chunks=[(io.BytesIO(b"chunk"), 0)], + ) + operation_id = result.headers["operation_id"] + response = httpx.Response(status_code=200, request=result) + success_ctx = MagicMock(spec=AfterSuccessContext) + success_ctx.operation_id = "partition" + fake_executor = MagicMock() + fake_future = MagicMock() + fake_future.done.return_value = True + fake_future.result.return_value = [(1, _httpx_json_response([{"page_number": 1}]))] + fake_executor.submit.return_value = fake_future + + with patch( + "unstructured_client._hooks.custom.split_pdf_hook.futures.ThreadPoolExecutor", + return_value=fake_executor, + ): + returned_response = hook.after_success(success_ctx, response) + + assert returned_response.json() == [{"page_number": 1}] + fake_executor.submit.assert_called_once() + fake_executor.shutdown.assert_called_once_with(wait=False, cancel_futures=True) + assert operation_id not in hook.executors + assert operation_id not in hook.coroutines_to_execute + assert operation_id not in hook.pending_operation_ids + + def test_unit_future_timeout_triggers_cleanup(caplog: pytest.LogCaptureFixture): caplog.set_level(logging.INFO, logger="unstructured-client") hook, _, result = _make_hook_with_split_request(pdf_chunks=[(io.BytesIO(b"chunk"), 0)]) @@ -1217,4 +1561,4 @@ def _chunk_paths_side_effect(*args, **kwargs): assert hook.cache_tmp_data_feature == {} assert hook.cache_tmp_data_dir == {} tempdir.cleanup.assert_called_once() - executor.shutdown.assert_called_once_with(wait=False, cancel_futures=True) \ No newline at end of file + executor.shutdown.assert_not_called() \ No newline at end of file diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index 6e29978a..703939fd 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -107,6 +107,8 @@ async def runner() -> list[tuple[int, httpx.Response]]: async def _order_keeper(index: int, coro: Awaitable) -> Tuple[int, httpx.Response]: try: response = await coro + except asyncio.CancelledError: + raise except BaseException as exc: raise ChunkExecutionError(index, exc) from exc return index, response @@ -151,84 +153,114 @@ async def run_tasks( async with httpx.AsyncClient(timeout=client_timeout) as client: armed_coroutines = [coro(async_client=client, limiter=limiter) for coro in coroutines] # type: ignore - if allow_failed: - responses = await asyncio.gather(*armed_coroutines, return_exceptions=True) - normalized_responses: list[tuple[int, httpx.Response]] = [] - for index, result in enumerate(responses, 1): - if isinstance(result, ChunkExecutionError): - logger.error( - "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", - operation_id, - result.index, - type(result.inner).__name__, - result.inner, - exc_info=result.inner, - ) - normalized_responses.append( - ( - result.index, - _create_transport_error_response(result.inner), - ) - ) - elif isinstance(result, BaseException): - logger.error( - "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", - operation_id, - index, - type(result).__name__, - result, - exc_info=result, - ) - normalized_responses.append((index, _create_transport_error_response(result))) - else: - normalized_responses.append((index, cast(httpx.Response, result))) - return normalized_responses - # TODO: replace with asyncio.TaskGroup for python >3.11 # pylint: disable=fixme - tasks = [asyncio.create_task(_order_keeper(index, coro)) - for index, coro in enumerate(armed_coroutines, 1)] - results = [] - remaining_tasks = dict(enumerate(tasks, 1)) - for future in asyncio.as_completed(tasks): - try: - index, response = await future - except ChunkExecutionError as exc: + tasks = [ + asyncio.create_task(_order_keeper(index, coro)) + for index, coro in enumerate(armed_coroutines, 1) + ] + try: + return await _collect_task_responses( + tasks, + allow_failed=allow_failed, + operation_id=operation_id, + ) + except asyncio.CancelledError: + for task in tasks: + task.cancel() + await asyncio.gather(*tasks, return_exceptions=True) + logger.warning( + "split_pdf event=batch_cancel_remaining operation_id=%s reason=caller_cancelled remaining_tasks=%d", + operation_id, + sum(1 for task in tasks if not task.done()), + ) + raise + + +async def _collect_task_responses( + tasks: list[asyncio.Task[Tuple[int, httpx.Response]]], + *, + allow_failed: bool, + operation_id: Optional[str], +) -> list[tuple[int, httpx.Response]]: + if allow_failed: + responses = await asyncio.gather(*tasks, return_exceptions=True) + normalized_responses: list[tuple[int, httpx.Response]] = [] + for index, result in enumerate(responses, 1): + if isinstance(result, ChunkExecutionError): logger.error( "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", operation_id, - exc.index, - type(exc.inner).__name__, - exc.inner, - exc_info=exc.inner, + result.index, + type(result.inner).__name__, + result.inner, + exc_info=result.inner, ) - for remaining_task in remaining_tasks.values(): - remaining_task.cancel() - logger.warning( - "split_pdf event=batch_cancel_remaining operation_id=%s reason=transport_exception failed_chunk_index=%d remaining_tasks=%d", - operation_id, - exc.index, - len(remaining_tasks), + normalized_responses.append( + ( + result.index, + _create_transport_error_response(result.inner), + ) ) - if isinstance(exc.inner, Exception): - raise exc.inner - raise RuntimeError("Split PDF chunk cancelled") from exc.inner - if response.status_code != 200: - # cancel all remaining tasks - for remaining_task in remaining_tasks.values(): - remaining_task.cancel() - logger.warning( - "split_pdf event=batch_cancel_remaining operation_id=%s reason=http_error failed_chunk_index=%d status_code=%d remaining_tasks=%d", + elif isinstance(result, asyncio.CancelledError): + raise result + elif isinstance(result, BaseException): + logger.error( + "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", operation_id, index, - response.status_code, - len(remaining_tasks), + type(result).__name__, + result, + exc_info=result, ) - results.append((index, response)) - break + normalized_responses.append((index, _create_transport_error_response(result))) + else: + normalized_responses.append(cast(tuple[int, httpx.Response], result)) + return normalized_responses + + results = [] + remaining_tasks = dict(enumerate(tasks, 1)) + for future in asyncio.as_completed(tasks): + try: + index, response = await future + except ChunkExecutionError as exc: + logger.error( + "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", + operation_id, + exc.index, + type(exc.inner).__name__, + exc.inner, + exc_info=exc.inner, + ) + for remaining_task in remaining_tasks.values(): + remaining_task.cancel() + logger.warning( + "split_pdf event=batch_cancel_remaining operation_id=%s reason=transport_exception failed_chunk_index=%d remaining_tasks=%d", + operation_id, + exc.index, + len(remaining_tasks), + ) + await asyncio.gather(*remaining_tasks.values(), return_exceptions=True) + if isinstance(exc.inner, Exception): + raise exc.inner + raise RuntimeError("Split PDF chunk cancelled") from exc.inner + if response.status_code != 200: + # cancel all remaining tasks + for remaining_task in remaining_tasks.values(): + remaining_task.cancel() + logger.warning( + "split_pdf event=batch_cancel_remaining operation_id=%s reason=http_error failed_chunk_index=%d status_code=%d remaining_tasks=%d", + operation_id, + index, + response.status_code, + len(remaining_tasks), + ) + await asyncio.gather(*remaining_tasks.values(), return_exceptions=True) results.append((index, response)) - # remove task from remaining_tasks that should be cancelled in case of failure - del remaining_tasks[index] - # return results in the original order - return sorted(results, key=lambda x: x[0]) + break + results.append((index, response)) + # remove task from remaining_tasks that should be cancelled in case of failure + del remaining_tasks[index] + # return results in the original order + return sorted(results, key=lambda x: x[0]) def _create_transport_error_response(error: BaseException) -> httpx.Response: @@ -592,7 +624,6 @@ def before_request( self.cache_tmp_data_feature[operation_id] = cache_tmp_data_feature self.cache_tmp_data_dir[operation_id] = cache_tmp_data_dir self.concurrency_level[operation_id] = concurrency_level - self.executors[operation_id] = futures.ThreadPoolExecutor(max_workers=1) timeout_seconds = _get_request_timeout_seconds(request) if timeout_seconds is None and hook_ctx.config.timeout_ms is not None: @@ -958,10 +989,10 @@ def _await_elements(self, operation_id: str) -> Optional[list]: ) # sending the coroutines to a separate thread to avoid blocking the current event loop - # this operation should be removed when the SDK is updated to support async hooks executor = self.executors.get(operation_id) if executor is None: - raise RuntimeError("Executor not found for operation_id") + executor = futures.ThreadPoolExecutor(max_workers=1) + self.executors[operation_id] = executor loop_holder: dict[str, Optional[asyncio.AbstractEventLoop]] = {"loop": None} self.operation_loops[operation_id] = loop_holder try: @@ -1018,6 +1049,72 @@ def _await_elements(self, operation_id: str) -> Optional[list]: if task_responses is None: return None + return self._elements_from_task_responses( + operation_id, + task_responses, + started_at=started_at, + ) + + async def _await_elements_async(self, operation_id: str) -> Optional[list]: + tasks = self.coroutines_to_execute.get(operation_id) + if tasks is None: + return None + + started_at = time.perf_counter() + concurrency_level = self.concurrency_level.get(operation_id, DEFAULT_CONCURRENCY_LEVEL) + timeout_seconds = self.operation_timeouts.get(operation_id) + client_timeout = httpx.Timeout(timeout_seconds) if timeout_seconds is not None else None + allow_failed = self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) + coroutines = run_tasks( + tasks, + allow_failed=allow_failed, + concurrency_level=concurrency_level, + client_timeout=client_timeout, + operation_id=operation_id, + ) + num_waves = max(1, math.ceil(len(tasks) / concurrency_level)) + per_chunk = timeout_seconds or DEFAULT_FUTURE_TIMEOUT_MINUTES * 60 + future_timeout = per_chunk * num_waves + TIMEOUT_BUFFER_SECONDS + logger.info( + "split_pdf event=batch_start operation_id=%s chunk_count=%d concurrency=%d allow_failed=%s client_timeout_seconds=%s future_timeout_seconds=%s num_waves=%d", + operation_id, + len(tasks), + concurrency_level, + allow_failed, + timeout_seconds, + future_timeout, + num_waves, + ) + try: + task_responses = await asyncio.wait_for(coroutines, timeout=future_timeout) + except TimeoutError: + logger.error( + "split_pdf event=batch_timeout operation_id=%s chunk_count=%d concurrency=%d allow_failed=%s client_timeout_seconds=%s future_timeout_seconds=%s", + operation_id, + len(tasks), + concurrency_level, + allow_failed, + timeout_seconds, + future_timeout, + ) + raise + + return self._elements_from_task_responses( + operation_id, + task_responses, + started_at=started_at, + ) + + def _elements_from_task_responses( + self, + operation_id: str, + task_responses: list[tuple[int, httpx.Response]], + *, + started_at: float, + ) -> Optional[list]: + if task_responses is None: + return None + successful_responses = [] failed_responses: list[tuple[int, httpx.Response]] = [] transport_failure_count = 0 @@ -1062,7 +1159,7 @@ def _await_elements(self, operation_id: str) -> Optional[list]: len(failed_responses), transport_failure_count, elapsed_ms, - allow_failed, + self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED), ) for failed_chunk_index, response in failed_responses: self._annotate_failure_response( @@ -1076,6 +1173,39 @@ def _await_elements(self, operation_id: str) -> Optional[list]: flattened_elements = [element for sublist in elements for element in sublist] return flattened_elements + def _build_after_success_response( + self, + operation_id: str, + response: httpx.Response, + elements: Optional[list], + ) -> httpx.Response: + # if fails are disallowed, return the first failed response + if ( + not self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) + and self.api_failed_responses.get(operation_id) + ): + logger.warning( + "split_pdf event=top_level_failure operation_id=%s mode=strict failed_response_selected=true", + operation_id, + ) + return self.api_failed_responses[operation_id][0] + + if ( + self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) + and not self.api_successful_responses.get(operation_id) + and self.api_failed_responses.get(operation_id) + ): + logger.warning( + "split_pdf event=top_level_failure operation_id=%s mode=allow_failed reason=no_successful_chunks", + operation_id, + ) + return self.api_failed_responses[operation_id][0] + + if elements is None: + return response + + return request_utils.create_response(elements) + @staticmethod def _finalize_operation_resources( executor: Optional[futures.ThreadPoolExecutor], @@ -1122,33 +1252,27 @@ def after_success( try: elements = self._await_elements(operation_id) + return self._build_after_success_response(operation_id, response, elements) + finally: + if operation_id is not None: + self._clear_operation(operation_id) - # if fails are disallowed, return the first failed response - if ( - not self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) - and self.api_failed_responses.get(operation_id) - ): - logger.warning( - "split_pdf event=top_level_failure operation_id=%s mode=strict failed_response_selected=true", - operation_id, - ) - return self.api_failed_responses[operation_id][0] - - if ( - self.allow_failed.get(operation_id, DEFAULT_ALLOW_FAILED) - and not self.api_successful_responses.get(operation_id) - and self.api_failed_responses.get(operation_id) - ): - logger.warning( - "split_pdf event=top_level_failure operation_id=%s mode=allow_failed reason=no_successful_chunks", - operation_id, - ) - return self.api_failed_responses[operation_id][0] + async def after_success_async( + self, hook_ctx: AfterSuccessContext, response: httpx.Response + ) -> Union[httpx.Response, Exception]: + """Async equivalent of `after_success` for `partition_async`. - if elements is None: - return response + This path awaits split chunk requests directly on the caller's event loop + instead of creating a nested event loop in a worker thread. + """ + del hook_ctx + operation_id = self._get_operation_id(response=response) + if operation_id is None or operation_id not in self.coroutines_to_execute: + return response - return request_utils.create_response(elements) + try: + elements = await self._await_elements_async(operation_id) + return self._build_after_success_response(operation_id, response, elements) finally: if operation_id is not None: self._clear_operation(operation_id) diff --git a/src/unstructured_client/_hooks/sdkhooks.py b/src/unstructured_client/_hooks/sdkhooks.py index cc59759f..f6a12283 100644 --- a/src/unstructured_client/_hooks/sdkhooks.py +++ b/src/unstructured_client/_hooks/sdkhooks.py @@ -1,5 +1,9 @@ """Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT.""" +import asyncio +import inspect +from typing import Any, Awaitable, Callable, List, Optional, Tuple + import httpx from .types import ( SDKInitHook, @@ -12,10 +16,21 @@ Hooks, ) from .registration import init_hooks -from typing import List, Optional, Tuple from unstructured_client.httpclient import HttpClient +def _get_async_hook_method( + hook: object, + method_name: str, +) -> Optional[Callable[..., Awaitable[Any]]]: + method = getattr(hook, method_name, None) + if method is None: + return None + if inspect.iscoroutinefunction(method): + return method + return None + + class SDKHooks(Hooks): def __init__(self) -> None: self.sdk_init_hooks: List[SDKInitHook] = [] @@ -52,6 +67,21 @@ def before_request( return request + async def before_request_async( + self, hook_ctx: BeforeRequestContext, request: httpx.Request + ) -> httpx.Request: + for hook in self.before_request_hooks: + async_method = _get_async_hook_method(hook, "before_request_async") + if async_method is not None: + out = await async_method(hook_ctx, request) + else: + out = await asyncio.to_thread(hook.before_request, hook_ctx, request) + if isinstance(out, Exception): + raise out + request = out + + return request + def after_success( self, hook_ctx: AfterSuccessContext, response: httpx.Response ) -> httpx.Response: @@ -62,6 +92,20 @@ def after_success( response = out return response + async def after_success_async( + self, hook_ctx: AfterSuccessContext, response: httpx.Response + ) -> httpx.Response: + for hook in self.after_success_hooks: + async_method = _get_async_hook_method(hook, "after_success_async") + if async_method is not None: + out = await async_method(hook_ctx, response) + else: + out = await asyncio.to_thread(hook.after_success, hook_ctx, response) + if isinstance(out, Exception): + raise out + response = out + return response + def after_error( self, hook_ctx: AfterErrorContext, @@ -74,3 +118,20 @@ def after_error( raise result response, error = result return response, error + + async def after_error_async( + self, + hook_ctx: AfterErrorContext, + response: Optional[httpx.Response], + error: Optional[Exception], + ) -> Tuple[Optional[httpx.Response], Optional[Exception]]: + for hook in self.after_error_hooks: + async_method = _get_async_hook_method(hook, "after_error_async") + if async_method is not None: + result = await async_method(hook_ctx, response, error) + else: + result = await asyncio.to_thread(hook.after_error, hook_ctx, response, error) + if isinstance(result, Exception): + raise result + response, error = result + return response, error diff --git a/src/unstructured_client/_hooks/types.py b/src/unstructured_client/_hooks/types.py index a2840fe4..d409d9c3 100644 --- a/src/unstructured_client/_hooks/types.py +++ b/src/unstructured_client/_hooks/types.py @@ -2,7 +2,7 @@ from abc import ABC, abstractmethod import httpx -from typing import Any, Callable, List, Optional, Tuple, Union +from typing import Any, Callable, List, Optional, Protocol, Tuple, Union from unstructured_client.httpclient import HttpClient from unstructured_client.sdkconfiguration import SDKConfiguration @@ -76,6 +76,13 @@ def before_request( pass +class AsyncBeforeRequestHook(Protocol): + async def before_request_async( + self, hook_ctx: BeforeRequestContext, request: httpx.Request + ) -> Union[httpx.Request, Exception]: + pass + + class AfterSuccessHook(ABC): @abstractmethod def after_success( @@ -84,6 +91,13 @@ def after_success( pass +class AsyncAfterSuccessHook(Protocol): + async def after_success_async( + self, hook_ctx: AfterSuccessContext, response: httpx.Response + ) -> Union[httpx.Response, Exception]: + pass + + class AfterErrorHook(ABC): @abstractmethod def after_error( @@ -95,6 +109,16 @@ def after_error( pass +class AsyncAfterErrorHook(Protocol): + async def after_error_async( + self, + hook_ctx: AfterErrorContext, + response: Optional[httpx.Response], + error: Optional[Exception], + ) -> Union[Tuple[Optional[httpx.Response], Optional[Exception]], Exception]: + pass + + class Hooks(ABC): @abstractmethod def register_sdk_init_hook(self, hook: SDKInitHook): @@ -111,3 +135,24 @@ def register_after_success_hook(self, hook: AfterSuccessHook): @abstractmethod def register_after_error_hook(self, hook: AfterErrorHook): pass + + @abstractmethod + async def before_request_async( + self, hook_ctx: BeforeRequestContext, request: httpx.Request + ) -> httpx.Request: + pass + + @abstractmethod + async def after_success_async( + self, hook_ctx: AfterSuccessContext, response: httpx.Response + ) -> httpx.Response: + pass + + @abstractmethod + async def after_error_async( + self, + hook_ctx: AfterErrorContext, + response: Optional[httpx.Response], + error: Optional[Exception], + ) -> Tuple[Optional[httpx.Response], Optional[Exception]]: + pass diff --git a/src/unstructured_client/_version.py b/src/unstructured_client/_version.py index e3a41815..31ca2183 100644 --- a/src/unstructured_client/_version.py +++ b/src/unstructured_client/_version.py @@ -3,10 +3,10 @@ import importlib.metadata __title__: str = "unstructured-client" -__version__: str = "0.43.3" +__version__: str = "0.43.4" __openapi_doc_version__: str = "1.2.31" __gen_version__: str = "2.680.0" -__user_agent__: str = "speakeasy-sdk/python 0.43.3 2.680.0 1.2.31 unstructured-client" +__user_agent__: str = "speakeasy-sdk/python 0.43.4 2.680.0 1.2.31 unstructured-client" try: if __package__ is not None: diff --git a/src/unstructured_client/basesdk.py b/src/unstructured_client/basesdk.py index a1c0f684..69407206 100644 --- a/src/unstructured_client/basesdk.py +++ b/src/unstructured_client/basesdk.py @@ -1,5 +1,7 @@ """Code generated by Speakeasy (https://speakeasy.com). DO NOT EDIT.""" +import asyncio + from .sdkconfiguration import SDKConfiguration import httpx from typing import Callable, List, Mapping, Optional, Tuple @@ -18,6 +20,13 @@ from urllib.parse import parse_qs, urlparse +class _RequestBoundCancelledError(Exception): + def __init__(self, request: httpx.Request, cancellation: asyncio.CancelledError): + super().__init__(str(cancellation) or "Request cancelled") + self.request = request + self.cancellation = cancellation + + class BaseSDK: sdk_configuration: SDKConfiguration @@ -297,10 +306,39 @@ async def do_request_async( hooks = self.sdk_configuration.__dict__["_hooks"] + async def cleanup_cancelled_request( + req: Optional[httpx.Request], + response: Optional[httpx.Response], + cancellation: asyncio.CancelledError, + ) -> None: + if req is None and response is None: + return + try: + await hooks.after_error_async( + AfterErrorContext(hook_ctx), + response, + _RequestBoundCancelledError(req or response.request, cancellation), + ) + except Exception: + logger.debug("Cancellation cleanup failed", exc_info=True) + async def do(): http_res = None + req = None try: - req = hooks.before_request(BeforeRequestContext(hook_ctx), request) + before_request_task = asyncio.create_task( + hooks.before_request_async(BeforeRequestContext(hook_ctx), request) + ) + try: + # Sync before-request hooks may be running in a worker thread; if the caller + # cancels, wait for setup to finish so cancellation cleanup can find request state. + req = await asyncio.shield(before_request_task) + except asyncio.CancelledError: + if not before_request_task.done(): + req = await asyncio.shield(before_request_task) + elif not before_request_task.cancelled(): + req = before_request_task.result() + raise logger.debug( "Request:\nMethod: %s\nURL: %s\nHeaders: %s\nBody: %s", req.method, @@ -313,8 +351,11 @@ async def do(): raise ValueError("client is required") http_res = await client.send(req, stream=stream) + except asyncio.CancelledError as cancellation: + await cleanup_cancelled_request(req, None, cancellation) + raise except Exception as e: - _, e = hooks.after_error(AfterErrorContext(hook_ctx), None, e) + _, e = await hooks.after_error_async(AfterErrorContext(hook_ctx), None, e) if e is not None: logger.debug("Request Exception", exc_info=True) raise e @@ -332,9 +373,13 @@ async def do(): ) if utils.match_status_codes(error_status_codes, http_res.status_code): - result, err = hooks.after_error( - AfterErrorContext(hook_ctx), http_res, None - ) + try: + result, err = await hooks.after_error_async( + AfterErrorContext(hook_ctx), http_res, None + ) + except asyncio.CancelledError as cancellation: + await cleanup_cancelled_request(None, http_res, cancellation) + raise if err is not None: logger.debug("Request Exception", exc_info=True) raise err @@ -354,6 +399,10 @@ async def do(): http_res = await do() if not utils.match_status_codes(error_status_codes, http_res.status_code): - http_res = hooks.after_success(AfterSuccessContext(hook_ctx), http_res) + try: + http_res = await hooks.after_success_async(AfterSuccessContext(hook_ctx), http_res) + except asyncio.CancelledError as cancellation: + await cleanup_cancelled_request(None, http_res, cancellation) + raise return http_res From 83384fbe3d490b5f3c8348457d6a510f1ae08190 Mon Sep 17 00:00:00 2001 From: Emily Voss Date: Thu, 30 Apr 2026 17:11:18 -0700 Subject: [PATCH 02/10] mypy --- src/unstructured_client/basesdk.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/unstructured_client/basesdk.py b/src/unstructured_client/basesdk.py index 69407206..d1cc479d 100644 --- a/src/unstructured_client/basesdk.py +++ b/src/unstructured_client/basesdk.py @@ -313,11 +313,15 @@ async def cleanup_cancelled_request( ) -> None: if req is None and response is None: return + cleanup_request = req + if cleanup_request is None and response is not None: + cleanup_request = response.request + assert cleanup_request is not None try: await hooks.after_error_async( AfterErrorContext(hook_ctx), response, - _RequestBoundCancelledError(req or response.request, cancellation), + _RequestBoundCancelledError(cleanup_request, cancellation), ) except Exception: logger.debug("Cancellation cleanup failed", exc_info=True) From 519b1f19c3351173e9e17f52eca18e59ca47cdec Mon Sep 17 00:00:00 2001 From: Emily Voss Date: Thu, 30 Apr 2026 17:29:38 -0700 Subject: [PATCH 03/10] remind the AI that pypdfium is not thread-safe --- .../test_split_pdf_async_behavior.py | 418 ++++++++++++++++++ .../_hooks/custom/split_pdf_hook.py | 38 +- 2 files changed, 444 insertions(+), 12 deletions(-) create mode 100644 _test_unstructured_client/integration/test_split_pdf_async_behavior.py diff --git a/_test_unstructured_client/integration/test_split_pdf_async_behavior.py b/_test_unstructured_client/integration/test_split_pdf_async_behavior.py new file mode 100644 index 00000000..7f8639c9 --- /dev/null +++ b/_test_unstructured_client/integration/test_split_pdf_async_behavior.py @@ -0,0 +1,418 @@ +from __future__ import annotations + +import asyncio +import io +import logging +import threading +from collections import Counter +from functools import partial +from unittest.mock import MagicMock, patch + +import httpx +import pytest + +from unstructured_client._hooks.custom.split_pdf_hook import ( + DEFAULT_CONCURRENCY_LEVEL, + SplitPdfHook, +) +from unstructured_client._hooks.sdkhooks import SDKHooks +from unstructured_client.basesdk import BaseSDK +from unstructured_client.sdkconfiguration import SDKConfiguration + + +def _httpx_json_response(payload: list[dict], status_code: int = 200) -> httpx.Response: + return httpx.Response( + status_code=status_code, + json=payload, + request=httpx.Request("POST", "http://localhost:8888/general/v0/general"), + ) + + +def _make_sdk_hook_context(): + hook_ctx = MagicMock() + hook_ctx.config = MagicMock() + hook_ctx.base_url = "http://localhost:8888" + hook_ctx.operation_id = "partition" + hook_ctx.oauth2_scopes = None + hook_ctx.security_source = None + return hook_ctx + + +def _make_base_sdk_with_hooks(hooks: SDKHooks, async_client: object) -> BaseSDK: + config = SDKConfiguration( + client=None, + client_supplied=False, + async_client=async_client, # type: ignore[arg-type] + async_client_supplied=True, + debug_logger=logging.getLogger("test"), + ) + config.__dict__["_hooks"] = hooks + return BaseSDK(config) + + +def _make_split_hooks(split_hook: SplitPdfHook) -> SDKHooks: + hooks = SDKHooks() + hooks.before_request_hooks = [split_hook] + hooks.after_success_hooks = [split_hook] + hooks.after_error_hooks = [split_hook] + return hooks + + +def _prepare_sdk_split_operation( + split_hook: SplitPdfHook, + *, + operation_id: str, + coroutines: list, + allow_failed: bool = False, + concurrency_level: int = DEFAULT_CONCURRENCY_LEVEL, + timeout_seconds: float | None = 12.0, +) -> None: + split_hook.coroutines_to_execute[operation_id] = coroutines + split_hook.pending_operation_ids[operation_id] = operation_id + split_hook.allow_failed[operation_id] = allow_failed + split_hook.cache_tmp_data_feature[operation_id] = False + split_hook.concurrency_level[operation_id] = concurrency_level + split_hook.operation_timeouts[operation_id] = timeout_seconds + + def _prepared_split_request(hook_ctx, request): + del hook_ctx, request + return httpx.Request( + "GET", + "http://localhost:8888/general/docs", + headers={"operation_id": operation_id}, + extensions={"split_pdf_operation_id": operation_id}, + ) + + split_hook.before_request = _prepared_split_request # type: ignore[method-assign] + + +def _make_dummy_request_client() -> httpx.AsyncClient: + return httpx.AsyncClient( + transport=httpx.MockTransport( + lambda request: httpx.Response(status_code=200, request=request) + ) + ) + + +def _make_sdk_split_task( + sdk: BaseSDK, + *, + error_status_codes: list | None = None, +) -> asyncio.Task[httpx.Response]: + return asyncio.create_task( + sdk.do_request_async( + _make_sdk_hook_context(), + httpx.Request("POST", "http://localhost:8888/general/v0/general"), + error_status_codes=error_status_codes or [], + ) + ) + + +@pytest.mark.asyncio +async def test_partition_async_split_collects_chunks_in_order_without_executor(): + operation_id = "integration-happy" + split_hook = SplitPdfHook() + active_chunks = 0 + max_active_chunks = 0 + active_lock = asyncio.Lock() + + async def _chunk_transport(request: httpx.Request) -> httpx.Response: + nonlocal active_chunks, max_active_chunks + async with active_lock: + active_chunks += 1 + max_active_chunks = max(max_active_chunks, active_chunks) + try: + if request.url.path.endswith("/1"): + await asyncio.sleep(0.02) + payload = [{"page_number": 1}] + else: + payload = [{"page_number": 3}] + return httpx.Response(status_code=200, json=payload, request=request) + finally: + async with active_lock: + active_chunks -= 1 + + original_async_client = httpx.AsyncClient + + def _chunk_client_factory(*args, **kwargs): + return original_async_client( + transport=httpx.MockTransport(_chunk_transport), + timeout=kwargs.get("timeout"), + ) + + coroutines = [ + partial( + split_hook.call_api_partial, + _operation_id=operation_id, + chunk_index=1, + page_number=1, + pdf_chunk_request=httpx.Request("POST", "http://chunks.local/chunk/1"), + pdf_chunk_file=io.BytesIO(b"chunk-1"), + retry_config=None, + cache_tmp_data_feature=False, + temp_dir_path=None, + ), + partial( + split_hook.call_api_partial, + _operation_id=operation_id, + chunk_index=2, + page_number=3, + pdf_chunk_request=httpx.Request("POST", "http://chunks.local/chunk/2"), + pdf_chunk_file=io.BytesIO(b"chunk-2"), + retry_config=None, + cache_tmp_data_feature=False, + temp_dir_path=None, + ), + ] + _prepare_sdk_split_operation( + split_hook, + operation_id=operation_id, + coroutines=coroutines, + concurrency_level=2, + ) + + async with _make_dummy_request_client() as top_level_client: + sdk = _make_base_sdk_with_hooks(_make_split_hooks(split_hook), top_level_client) + with patch( + "unstructured_client._hooks.custom.split_pdf_hook.httpx.AsyncClient", + side_effect=_chunk_client_factory, + ): + response = await sdk.do_request_async( + _make_sdk_hook_context(), + httpx.Request("POST", "http://localhost:8888/general/v0/general"), + error_status_codes=[], + ) + + assert response.json() == [{"page_number": 1}, {"page_number": 3}] + assert max_active_chunks == 2 + assert operation_id not in split_hook.executors + assert operation_id not in split_hook.coroutines_to_execute + + +@pytest.mark.asyncio +async def test_partition_async_cancellation_cleans_split_state_and_tempdir(): + operation_id = "integration-cancel" + split_hook = SplitPdfHook() + started = asyncio.Event() + cancelled_counter = Counter() + tempdir = MagicMock() + split_hook.tempdirs[operation_id] = tempdir + + async def _hanging_chunk( + async_client: httpx.AsyncClient, + limiter: asyncio.Semaphore, + ) -> httpx.Response: + del async_client, limiter + try: + started.set() + await asyncio.Event().wait() + return _httpx_json_response([]) + except asyncio.CancelledError: + cancelled_counter.update(["cancelled"]) + raise + + _prepare_sdk_split_operation( + split_hook, + operation_id=operation_id, + coroutines=[partial(_hanging_chunk)], + timeout_seconds=60.0, + ) + + async with _make_dummy_request_client() as top_level_client: + sdk = _make_base_sdk_with_hooks(_make_split_hooks(split_hook), top_level_client) + task = _make_sdk_split_task(sdk) + await started.wait() + task.cancel() + + with pytest.raises(asyncio.CancelledError): + await task + + assert cancelled_counter["cancelled"] == 1 + assert operation_id not in split_hook.coroutines_to_execute + assert operation_id not in split_hook.pending_operation_ids + assert operation_id not in split_hook.tempdirs + tempdir.cleanup.assert_called_once() + + +@pytest.mark.asyncio +async def test_partition_async_strict_failure_drains_sibling_chunks_before_close(): + operation_id = "integration-strict-failure" + split_hook = SplitPdfHook() + sibling_started = asyncio.Event() + sibling_cancelled = asyncio.Event() + sibling_finished = asyncio.Event() + + async def _chunk_transport(request: httpx.Request) -> httpx.Response: + if request.url.path.endswith("/failure"): + return httpx.Response(status_code=400, content=b"bad chunk", request=request) + sibling_started.set() + try: + await asyncio.Event().wait() + return httpx.Response(status_code=200, json=[], request=request) + except asyncio.CancelledError: + sibling_cancelled.set() + raise + finally: + sibling_finished.set() + + original_async_client = httpx.AsyncClient + + def _chunk_client_factory(*args, **kwargs): + return original_async_client( + transport=httpx.MockTransport(_chunk_transport), + timeout=kwargs.get("timeout"), + ) + + coroutines = [ + partial( + split_hook.call_api_partial, + _operation_id=operation_id, + chunk_index=1, + page_number=1, + pdf_chunk_request=httpx.Request("POST", "http://chunks.local/chunk/wait"), + pdf_chunk_file=io.BytesIO(b"chunk-1"), + retry_config=None, + cache_tmp_data_feature=False, + temp_dir_path=None, + ), + partial( + split_hook.call_api_partial, + _operation_id=operation_id, + chunk_index=2, + page_number=3, + pdf_chunk_request=httpx.Request("POST", "http://chunks.local/chunk/failure"), + pdf_chunk_file=io.BytesIO(b"chunk-2"), + retry_config=None, + cache_tmp_data_feature=False, + temp_dir_path=None, + ), + ] + _prepare_sdk_split_operation( + split_hook, + operation_id=operation_id, + coroutines=coroutines, + concurrency_level=2, + ) + + async with _make_dummy_request_client() as top_level_client: + sdk = _make_base_sdk_with_hooks(_make_split_hooks(split_hook), top_level_client) + with patch( + "unstructured_client._hooks.custom.split_pdf_hook.httpx.AsyncClient", + side_effect=_chunk_client_factory, + ): + response = await sdk.do_request_async( + _make_sdk_hook_context(), + httpx.Request("POST", "http://localhost:8888/general/v0/general"), + error_status_codes=[], + ) + + assert response.status_code == 400 + assert response.content == b"bad chunk" + assert sibling_started.is_set() + assert sibling_cancelled.is_set() + assert sibling_finished.is_set() + assert "transport_exception" not in response.extensions + assert operation_id not in split_hook.coroutines_to_execute + + +@pytest.mark.asyncio +async def test_async_hook_chain_runs_sync_fallback_without_blocking_loop(): + loop_progress_released_hook = threading.Event() + sync_hook_started = threading.Event() + sync_hook_was_released_by_loop = [] + calls: list[str] = [] + + class AsyncSuccessHook: + async def after_success_async(self, hook_ctx, response): + del hook_ctx + calls.append("async") + response.headers["X-Async-Hook"] = "called" + return response + + def after_success(self, hook_ctx, response): # pragma: no cover - dispatch guard + raise AssertionError("async hook should be awaited") + + class SyncSuccessHook: + def after_success(self, hook_ctx, response): + del hook_ctx + calls.append("sync") + sync_hook_started.set() + sync_hook_was_released_by_loop.append(loop_progress_released_hook.wait(timeout=0.5)) + response.headers["X-Sync-Hook"] = "called" + return response + + hooks = SDKHooks() + hooks.before_request_hooks = [] + hooks.after_error_hooks = [] + hooks.after_success_hooks = [AsyncSuccessHook(), SyncSuccessHook()] # type: ignore[list-item] + + async def _release_sync_hook_from_loop() -> None: + while not sync_hook_started.is_set(): + await asyncio.sleep(0) + loop_progress_released_hook.set() + + async with _make_dummy_request_client() as top_level_client: + sdk = _make_base_sdk_with_hooks(hooks, top_level_client) + response, _ = await asyncio.gather( + sdk.do_request_async( + _make_sdk_hook_context(), + httpx.Request("POST", "http://localhost:8888/general/v0/general"), + error_status_codes=[], + ), + _release_sync_hook_from_loop(), + ) + + assert calls == ["async", "sync"] + assert response.headers["X-Async-Hook"] == "called" + assert response.headers["X-Sync-Hook"] == "called" + assert sync_hook_was_released_by_loop == [True] + + +@pytest.mark.asyncio +async def test_partition_async_reassembly_does_not_block_event_loop(): + operation_id = "integration-offload" + split_hook = SplitPdfHook() + reassembly_started = threading.Event() + reassembly_released = threading.Event() + reassembly_was_released_by_loop = [] + + async def _successful_chunk( + async_client: httpx.AsyncClient, + limiter: asyncio.Semaphore, + ) -> httpx.Response: + del async_client, limiter + return _httpx_json_response([{"page_number": 1}]) + + _prepare_sdk_split_operation( + split_hook, + operation_id=operation_id, + coroutines=[partial(_successful_chunk)], + ) + + def _slow_reassembly(operation_id_arg, task_responses, *, started_at): + del operation_id_arg, task_responses, started_at + reassembly_started.set() + reassembly_was_released_by_loop.append(reassembly_released.wait(timeout=0.5)) + return [{"page_number": 1}] + + split_hook._elements_from_task_responses = _slow_reassembly # type: ignore[method-assign] + + async def _release_when_started(started: threading.Event, release: threading.Event) -> None: + while not started.is_set(): + await asyncio.sleep(0) + release.set() + + async with _make_dummy_request_client() as top_level_client: + sdk = _make_base_sdk_with_hooks(_make_split_hooks(split_hook), top_level_client) + response, _ = await asyncio.gather( + sdk.do_request_async( + _make_sdk_hook_context(), + httpx.Request("POST", "http://localhost:8888/general/v0/general"), + error_status_codes=[], + ), + _release_when_started(reassembly_started, reassembly_released), + ) + + assert response.json() == [{"page_number": 1}] + assert reassembly_was_released_by_loop == [True] + assert operation_id not in split_hook.coroutines_to_execute diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index 703939fd..b88e7365 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -725,6 +725,14 @@ def before_request( self._clear_operation(operation_id) raise + async def before_request_async( + self, hook_ctx: BeforeRequestContext, request: httpx.Request + ) -> Union[httpx.Request, Exception]: + # PDFium is not thread-safe, so split setup must not be pushed into the + # default executor. Keep this path synchronous; only response reassembly + # is offloaded from the event loop. + return self.before_request(hook_ctx, request) + async def call_api_partial( self, pdf_chunk_request: httpx.Request, @@ -892,19 +900,19 @@ def _get_pdf_chunk_paths( The list of temporary file paths. """ + # Create temporary directory + tempdir = tempfile.TemporaryDirectory( # pylint: disable=consider-using-with + dir=cache_tmp_data_dir, + prefix="unstructured_client_" + ) + self.tempdirs[operation_id] = tempdir + tempdir_path = Path(tempdir.name) + + pdf_chunk_paths: list[Tuple[Path, int]] = [] with pdfium.PdfDocument(pdf_bytes) as pdf: offset = page_start - 1 offset_end = page_end if page_end else len(pdf) - # Create temporary directory - tempdir = tempfile.TemporaryDirectory( # pylint: disable=consider-using-with - dir=cache_tmp_data_dir, - prefix="unstructured_client_" - ) - self.tempdirs[operation_id] = tempdir - tempdir_path = Path(tempdir.name) - - pdf_chunk_paths: list[Tuple[Path, int]] = [] chunk_no = 0 while offset < offset_end: @@ -924,7 +932,7 @@ def _get_pdf_chunk_paths( pdf_chunk_paths.append((chunk_path, offset)) offset += split_size - return pdf_chunk_paths + return pdf_chunk_paths def _get_pdf_chunk_files( self, pdf_chunks: list[Tuple[Path, int]] @@ -1099,7 +1107,8 @@ async def _await_elements_async(self, operation_id: str) -> Optional[list]: ) raise - return self._elements_from_task_responses( + return await asyncio.to_thread( + self._elements_from_task_responses, operation_id, task_responses, started_at=started_at, @@ -1272,7 +1281,12 @@ async def after_success_async( try: elements = await self._await_elements_async(operation_id) - return self._build_after_success_response(operation_id, response, elements) + return await asyncio.to_thread( + self._build_after_success_response, + operation_id, + response, + elements, + ) finally: if operation_id is not None: self._clear_operation(operation_id) From e5f9d512bbaa891e552a0b0bf1caddce8dcd7edf Mon Sep 17 00:00:00 2001 From: Emily Voss Date: Thu, 30 Apr 2026 19:40:38 -0700 Subject: [PATCH 04/10] andrew review --- .../unit/test_split_pdf_hook.py | 113 ++++++++++++++++++ .../_hooks/custom/split_pdf_hook.py | 31 +++-- src/unstructured_client/_hooks/sdkhooks.py | 4 +- src/unstructured_client/basesdk.py | 30 ++++- 4 files changed, 164 insertions(+), 14 deletions(-) diff --git a/_test_unstructured_client/unit/test_split_pdf_hook.py b/_test_unstructured_client/unit/test_split_pdf_hook.py index 30651214..8c733d56 100644 --- a/_test_unstructured_client/unit/test_split_pdf_hook.py +++ b/_test_unstructured_client/unit/test_split_pdf_hook.py @@ -1038,6 +1038,119 @@ def before_request(self, hook_ctx, request): assert hook_thread_id != loop_thread_id +@pytest.mark.asyncio +async def test_unit_sdk_hooks_before_request_async_serializes_sync_hooks(): + release_first_hook = threading.Event() + first_hook_started = threading.Event() + active_lock = threading.Lock() + active_hooks = 0 + max_active_hooks = 0 + + class SyncHook: + def before_request(self, hook_ctx, request): + nonlocal active_hooks, max_active_hooks + del hook_ctx + with active_lock: + active_hooks += 1 + max_active_hooks = max(max_active_hooks, active_hooks) + if active_hooks == 1: + first_hook_started.set() + try: + release_first_hook.wait(timeout=1) + request.headers["X-Sync-Before-Hook"] = "called" + return request + finally: + with active_lock: + active_hooks -= 1 + + hooks = SDKHooks() + hooks.before_request_hooks = [SyncHook()] # type: ignore[list-item] + hook_ctx = MagicMock(spec=BeforeRequestContext) + first_request = httpx.Request("GET", "http://localhost/first") + second_request = httpx.Request("GET", "http://localhost/second") + + first_task = asyncio.create_task(hooks.before_request_async(hook_ctx, first_request)) + await asyncio.to_thread(first_hook_started.wait, 1) + second_task = asyncio.create_task(hooks.before_request_async(hook_ctx, second_request)) + await asyncio.sleep(0.01) + release_first_hook.set() + + returned_first, returned_second = await asyncio.gather(first_task, second_task) + + assert returned_first is first_request + assert returned_second is second_request + assert first_request.headers["X-Sync-Before-Hook"] == "called" + assert second_request.headers["X-Sync-Before-Hook"] == "called" + assert max_active_hooks == 1 + + +@pytest.mark.asyncio +async def test_unit_do_request_async_cancellation_during_before_request_cleans_up_later(): + setup_started = threading.Event() + release_setup = threading.Event() + cleanup_called = asyncio.Event() + cancellation_is_asyncio_cancelled_error = [] + operation_id = "cancelled-during-setup" + + class SlowBeforeRequestHook: + def before_request(self, hook_ctx, request): + del hook_ctx, request + setup_started.set() + release_setup.wait(timeout=1) + return httpx.Request( + "GET", + "http://localhost:8888/general/docs", + headers={"operation_id": operation_id}, + extensions={"split_pdf_operation_id": operation_id}, + ) + + class CancellationObserverHook: + async def after_error_async(self, hook_ctx, response, error): + del hook_ctx, response + cancellation_is_asyncio_cancelled_error.append( + isinstance(error, asyncio.CancelledError) + ) + cleanup_called.set() + return None, error + + def after_error(self, hook_ctx, response, error): # pragma: no cover - dispatch guard + raise AssertionError("async hook should be awaited") + + hooks = SDKHooks() + hooks.before_request_hooks = [SlowBeforeRequestHook()] # type: ignore[list-item] + hooks.after_error_hooks = [CancellationObserverHook()] # type: ignore[list-item] + + client = _BlockingAsyncClient() + config = SDKConfiguration( + client=None, + client_supplied=False, + async_client=client, # type: ignore[arg-type] + async_client_supplied=True, + debug_logger=logging.getLogger("test"), + ) + config.__dict__["_hooks"] = hooks + sdk = BaseSDK(config) + task = asyncio.create_task( + sdk.do_request_async( + _make_sdk_hook_context(), + httpx.Request("POST", "http://localhost:8888/general/v0/general"), + error_status_codes=[], + ) + ) + + await asyncio.to_thread(setup_started.wait, 1) + task.cancel() + with pytest.raises(asyncio.CancelledError): + await asyncio.wait_for(task, timeout=0.05) + + assert not cleanup_called.is_set() + + release_setup.set() + await asyncio.wait_for(cleanup_called.wait(), timeout=1) + + assert cancellation_is_asyncio_cancelled_error == [True] + + def test_unit_before_request_uses_hook_ctx_timeout_when_request_timeout_missing(): hook, _, result = _make_hook_with_split_request( timeout_extension=None, diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index b88e7365..26f34611 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -289,6 +289,8 @@ def _request_task_cancellation( if loop is None: return False try: + # This loop is private to the split-PDF worker thread, so all_tasks() + # only targets chunk requests for the current split operation. loop.call_soon_threadsafe(_cancel_running_tasks) return True except RuntimeError as exc: @@ -533,7 +535,7 @@ def before_request( # the platform operations do). Here we need to get the base url from the request object. if hook_ctx.operation_id != "partition": return request - self.partition_base_url = get_base_url(request.url) + partition_base_url = get_base_url(request.url) if self.client is None: logger.warning("HTTP client not accessible! Continuing without splitting.") @@ -717,7 +719,7 @@ def before_request( dummy_request_extensions[OPERATION_ID_EXTENSION_KEY] = operation_id return httpx.Request( "GET", - f"{self.partition_base_url}/general/docs", + f"{partition_base_url}/general/docs", headers={"operation_id": operation_id}, extensions=dummy_request_extensions, ) @@ -764,11 +766,6 @@ async def call_api_partial( page_number=page_number, ) - # Immediately delete request to save memory - del response._request # pylint: disable=protected-access - response._request = None # pylint: disable=protected-access - - if response.status_code == 200: if cache_tmp_data_feature: if temp_dir_path is None: @@ -788,6 +785,26 @@ async def call_api_partial( page_number, Path(temp_file_name).name, ) + response = httpx.Response( + status_code=response.status_code, + headers=response.headers, + content=temp_file_name.encode(), + extensions=response.extensions, + ) + else: + response = httpx.Response( + status_code=response.status_code, + headers=response.headers, + content=response.content, + extensions=response.extensions, + ) + else: + response = httpx.Response( + status_code=response.status_code, + headers=response.headers, + content=response.content, + extensions=response.extensions, + ) logger.debug( "split_pdf event=chunk_complete operation_id=%s chunk_index=%d page_number=%d status_code=%d", diff --git a/src/unstructured_client/_hooks/sdkhooks.py b/src/unstructured_client/_hooks/sdkhooks.py index f6a12283..d126dbf5 100644 --- a/src/unstructured_client/_hooks/sdkhooks.py +++ b/src/unstructured_client/_hooks/sdkhooks.py @@ -37,6 +37,7 @@ def __init__(self) -> None: self.before_request_hooks: List[BeforeRequestHook] = [] self.after_success_hooks: List[AfterSuccessHook] = [] self.after_error_hooks: List[AfterErrorHook] = [] + self._sync_hook_lock = asyncio.Lock() init_hooks(self) def register_sdk_init_hook(self, hook: SDKInitHook) -> None: @@ -75,7 +76,8 @@ async def before_request_async( if async_method is not None: out = await async_method(hook_ctx, request) else: - out = await asyncio.to_thread(hook.before_request, hook_ctx, request) + async with self._sync_hook_lock: + out = await asyncio.to_thread(hook.before_request, hook_ctx, request) if isinstance(out, Exception): raise out request = out diff --git a/src/unstructured_client/basesdk.py b/src/unstructured_client/basesdk.py index d1cc479d..13877122 100644 --- a/src/unstructured_client/basesdk.py +++ b/src/unstructured_client/basesdk.py @@ -20,7 +20,7 @@ from urllib.parse import parse_qs, urlparse -class _RequestBoundCancelledError(Exception): +class _RequestBoundCancelledError(asyncio.CancelledError): def __init__(self, request: httpx.Request, cancellation: asyncio.CancelledError): super().__init__(str(cancellation) or "Request cancelled") self.request = request @@ -326,6 +326,24 @@ async def cleanup_cancelled_request( except Exception: logger.debug("Cancellation cleanup failed", exc_info=True) + def cleanup_when_before_request_finishes( + before_request_task: "asyncio.Task[httpx.Request]", + cancellation: asyncio.CancelledError, + ) -> None: + def on_done(task: "asyncio.Task[httpx.Request]") -> None: + if task.cancelled(): + return + try: + completed_req = task.result() + except Exception: + logger.debug("Cancelled request setup failed before cleanup", exc_info=True) + return + asyncio.create_task( + cleanup_cancelled_request(completed_req, None, cancellation) + ) + + before_request_task.add_done_callback(on_done) + async def do(): http_res = None req = None @@ -335,13 +353,13 @@ async def do(): ) try: # Sync before-request hooks may be running in a worker thread; if the caller - # cancels, wait for setup to finish so cancellation cleanup can find request state. + # cancels, let setup finish in the background so cleanup can find request state. req = await asyncio.shield(before_request_task) - except asyncio.CancelledError: - if not before_request_task.done(): - req = await asyncio.shield(before_request_task) - elif not before_request_task.cancelled(): + except asyncio.CancelledError as cancellation: + if before_request_task.done() and not before_request_task.cancelled(): req = before_request_task.result() + else: + cleanup_when_before_request_finishes(before_request_task, cancellation) raise logger.debug( "Request:\nMethod: %s\nURL: %s\nHeaders: %s\nBody: %s", From 1c9b83585e4614f67508f85696eac1a7cf4e739a Mon Sep 17 00:00:00 2001 From: Emily Voss Date: Thu, 30 Apr 2026 19:55:26 -0700 Subject: [PATCH 05/10] broader pdfium thread safety hardening --- .../unit/test_split_pdf_hook.py | 123 +++++++++++++++++- .../_hooks/custom/split_pdf_hook.py | 79 +++++++---- src/unstructured_client/_hooks/sdkhooks.py | 4 +- 3 files changed, 171 insertions(+), 35 deletions(-) diff --git a/_test_unstructured_client/unit/test_split_pdf_hook.py b/_test_unstructured_client/unit/test_split_pdf_hook.py index 8c733d56..c4a06817 100644 --- a/_test_unstructured_client/unit/test_split_pdf_hook.py +++ b/_test_unstructured_client/unit/test_split_pdf_hook.py @@ -1039,9 +1039,10 @@ def before_request(self, hook_ctx, request): @pytest.mark.asyncio -async def test_unit_sdk_hooks_before_request_async_serializes_sync_hooks(): - release_first_hook = threading.Event() +async def test_unit_sdk_hooks_before_request_async_allows_sync_hooks_to_overlap(): + release_hooks = threading.Event() first_hook_started = threading.Event() + both_hooks_started = threading.Event() active_lock = threading.Lock() active_hooks = 0 max_active_hooks = 0 @@ -1055,8 +1056,10 @@ def before_request(self, hook_ctx, request): max_active_hooks = max(max_active_hooks, active_hooks) if active_hooks == 1: first_hook_started.set() + if active_hooks == 2: + both_hooks_started.set() try: - release_first_hook.wait(timeout=1) + release_hooks.wait(timeout=1) request.headers["X-Sync-Before-Hook"] = "called" return request finally: @@ -1072,8 +1075,8 @@ def before_request(self, hook_ctx, request): first_task = asyncio.create_task(hooks.before_request_async(hook_ctx, first_request)) await asyncio.to_thread(first_hook_started.wait, 1) second_task = asyncio.create_task(hooks.before_request_async(hook_ctx, second_request)) - await asyncio.sleep(0.01) - release_first_hook.set() + assert await asyncio.to_thread(both_hooks_started.wait, 1) + release_hooks.set() returned_first, returned_second = await asyncio.gather(first_task, second_task) @@ -1081,7 +1084,115 @@ def before_request(self, hook_ctx, request): assert returned_second is second_request assert first_request.headers["X-Sync-Before-Hook"] == "called" assert second_request.headers["X-Sync-Before-Hook"] == "called" - assert max_active_hooks == 1 + assert max_active_hooks == 2 + + +@pytest.mark.asyncio +async def test_unit_split_pdf_before_request_async_serializes_setup(): + release_first_setup = threading.Event() + first_setup_started = threading.Event() + active_lock = threading.Lock() + active_setups = 0 + max_active_setups = 0 + hook = SplitPdfHook() + + def slow_setup(hook_ctx, request): + nonlocal active_setups, max_active_setups + del hook_ctx + with active_lock: + active_setups += 1 + max_active_setups = max(max_active_setups, active_setups) + if active_setups == 1: + first_setup_started.set() + try: + release_first_setup.wait(timeout=1) + return request + finally: + with active_lock: + active_setups -= 1 + + hook_ctx = MagicMock(spec=BeforeRequestContext) + first_request = httpx.Request("GET", "http://localhost/first") + second_request = httpx.Request("GET", "http://localhost/second") + + with patch.object(hook, "_before_request_unlocked", side_effect=slow_setup): + first_task = asyncio.create_task(hook.before_request_async(hook_ctx, first_request)) + await asyncio.to_thread(first_setup_started.wait, 1) + second_task = asyncio.create_task(hook.before_request_async(hook_ctx, second_request)) + await asyncio.sleep(0.01) + release_first_setup.set() + + returned_first, returned_second = await asyncio.gather(first_task, second_task) + + assert returned_first is first_request + assert returned_second is second_request + assert max_active_setups == 1 + + +def test_unit_pdfium_helpers_require_split_setup_lock(tmp_path: Path): + hook = SplitPdfHook() + + with pytest.raises(RuntimeError, match="pypdfium split setup must run"): + hook._get_pdf_chunks_in_memory(b"%PDF", split_size=1) + + with pytest.raises(RuntimeError, match="pypdfium split setup must run"): + hook._get_pdf_chunk_paths( + b"%PDF", + operation_id="operation-id", + cache_tmp_data_dir=str(tmp_path), + split_size=1, + ) + + +def test_unit_pdfium_new_document_closes_when_in_memory_split_fails(): + hook = SplitPdfHook() + new_pdf = MagicMock() + new_pdf.import_pages.side_effect = RuntimeError("import failed") + pdf_document = MagicMock() + pdf_document.__enter__.return_value = [MagicMock()] + pdf_document.__exit__.return_value = None + pdf_document_factory = MagicMock(return_value=pdf_document) + pdf_document_factory.new.return_value = new_pdf + + hook._split_pdf_setup_state.locked = True + try: + with patch( + "unstructured_client._hooks.custom.split_pdf_hook.pdfium.PdfDocument", + pdf_document_factory, + ), pytest.raises(RuntimeError, match="import failed"): + hook._get_pdf_chunks_in_memory(b"%PDF", split_size=1) + finally: + hook._split_pdf_setup_state.locked = False + + new_pdf.close.assert_called_once_with() + + +def test_unit_pdfium_new_document_closes_when_cached_split_fails(tmp_path: Path): + hook = SplitPdfHook() + new_pdf = MagicMock() + new_pdf.save.side_effect = RuntimeError("save failed") + pdf_document = MagicMock() + pdf_document.__enter__.return_value = [MagicMock()] + pdf_document.__exit__.return_value = None + pdf_document_factory = MagicMock(return_value=pdf_document) + pdf_document_factory.new.return_value = new_pdf + + hook._split_pdf_setup_state.locked = True + try: + with patch( + "unstructured_client._hooks.custom.split_pdf_hook.pdfium.PdfDocument", + pdf_document_factory, + ), pytest.raises(RuntimeError, match="save failed"): + hook._get_pdf_chunk_paths( + b"%PDF", + operation_id="operation-id", + cache_tmp_data_dir=str(tmp_path), + split_size=1, + ) + finally: + hook._split_pdf_setup_state.locked = False + + new_pdf.close.assert_called_once_with() @pytest.mark.asyncio diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index 26f34611..04986058 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -8,6 +8,7 @@ import time import os import tempfile +import threading import uuid from collections.abc import Awaitable from concurrent import futures @@ -337,6 +338,12 @@ class SplitPdfHook(SDKInitHook, BeforeRequestHook, AfterSuccessHook, AfterErrorH 1. Create an instance of the `SplitPdfHook` class. 2. Register SDK Init, Before Request, After Success and After Error hooks. """ + _split_pdf_setup_lock = threading.Lock() + _split_pdf_setup_executor = futures.ThreadPoolExecutor( + max_workers=1, + thread_name_prefix="split-pdf-setup", + ) + _split_pdf_setup_state = threading.local() def __init__(self) -> None: self.client: Optional[HttpClient] = None @@ -512,7 +519,7 @@ def handle_request(self, request: httpx.Request) -> httpx.Response: return base_url, self.client # pylint: disable=too-many-return-statements - def before_request( + def _before_request_unlocked( self, hook_ctx: BeforeRequestContext, request: httpx.Request ) -> Union[httpx.Request, Exception]: """If `splitPdfPage` is set to `true` in the request, the PDF file is split into @@ -727,13 +734,28 @@ def before_request( self._clear_operation(operation_id) raise + def before_request( + self, hook_ctx: BeforeRequestContext, request: httpx.Request + ) -> Union[httpx.Request, Exception]: + # pypdfium is process-global and not thread-safe; serialize split setup across clients. + with self._split_pdf_setup_lock: + self._split_pdf_setup_state.locked = True + try: + return self._before_request_unlocked(hook_ctx, request) + finally: + self._split_pdf_setup_state.locked = False + async def before_request_async( self, hook_ctx: BeforeRequestContext, request: httpx.Request ) -> Union[httpx.Request, Exception]: - # PDFium is not thread-safe, so split setup must not be pushed into the - # default executor. Keep this path synchronous; only response reassembly - # is offloaded from the event loop. - return self.before_request(hook_ctx, request) + # Keep pypdfium setup off the event loop while routing all async callers through one worker. + loop = asyncio.get_running_loop() + return await loop.run_in_executor( + self._split_pdf_setup_executor, + self.before_request, + hook_ctx, + request, + ) async def call_api_partial( self, @@ -850,7 +872,7 @@ def _get_pdf_chunks_in_memory( split_size: int = 1, page_start: int = 1, page_end: Optional[int] = None - ) -> Generator[Tuple[BinaryIO, int], None, None]: + ) -> list[Tuple[BinaryIO, int]]: """Reads given bytes of a pdf file and split it into n pdf-chunks, each with `split_size` pages. The chunks are written into temporary files in a temporary directory corresponding to the operation_id. @@ -866,32 +888,32 @@ def _get_pdf_chunks_in_memory( Returns: The list of temporary file paths. """ + self._assert_split_pdf_setup_locked() + pdf_chunks: list[Tuple[BinaryIO, int]] = [] with pdfium.PdfDocument(pdf_bytes) as pdf: - offset = page_start - 1 offset_end = page_end if page_end else len(pdf) while offset < offset_end: end = min(offset + split_size, offset_end) - # Create new PDF new_pdf = pdfium.PdfDocument.new() + try: + page_indices = list(range(offset, end)) + new_pdf.import_pages(pdf, pages=page_indices) - # Import pages - page_indices = list(range(offset, end)) - new_pdf.import_pages(pdf, pages=page_indices) - - # Save to buffer - chunk_buffer = io.BytesIO() - new_pdf.save(chunk_buffer) - chunk_buffer.seek(0) + chunk_buffer = io.BytesIO() + new_pdf.save(chunk_buffer) + chunk_buffer.seek(0) + finally: + new_pdf.close() - new_pdf.close() - - yield chunk_buffer, offset + pdf_chunks.append((chunk_buffer, offset)) offset += split_size + return pdf_chunks + def _get_pdf_chunk_paths( self, pdf_bytes: bytes, @@ -916,6 +938,7 @@ def _get_pdf_chunk_paths( Returns: The list of temporary file paths. """ + self._assert_split_pdf_setup_locked() # Create temporary directory tempdir = tempfile.TemporaryDirectory( # pylint: disable=consider-using-with @@ -936,21 +959,25 @@ def _get_pdf_chunk_paths( chunk_no += 1 end = min(offset + split_size, offset_end) - # Create new PDF with selected pages new_pdf = pdfium.PdfDocument.new() - page_indices = list(range(offset, end)) - new_pdf.import_pages(pdf, pages=page_indices) + try: + page_indices = list(range(offset, end)) + new_pdf.import_pages(pdf, pages=page_indices) - # Save to file - chunk_path = tempdir_path / f"chunk_{chunk_no}.pdf" - new_pdf.save(str(chunk_path)) # Convert Path to string - new_pdf.close() + chunk_path = tempdir_path / f"chunk_{chunk_no}.pdf" + new_pdf.save(str(chunk_path)) # Convert Path to string + finally: + new_pdf.close() pdf_chunk_paths.append((chunk_path, offset)) offset += split_size return pdf_chunk_paths + def _assert_split_pdf_setup_locked(self) -> None: + if not getattr(self._split_pdf_setup_state, "locked", False): + raise RuntimeError("pypdfium split setup must run under the split-PDF setup lock") + def _get_pdf_chunk_files( self, pdf_chunks: list[Tuple[Path, int]] ) -> Generator[Tuple[BinaryIO, int], None, None]: diff --git a/src/unstructured_client/_hooks/sdkhooks.py b/src/unstructured_client/_hooks/sdkhooks.py index d126dbf5..f6a12283 100644 --- a/src/unstructured_client/_hooks/sdkhooks.py +++ b/src/unstructured_client/_hooks/sdkhooks.py @@ -37,7 +37,6 @@ def __init__(self) -> None: self.before_request_hooks: List[BeforeRequestHook] = [] self.after_success_hooks: List[AfterSuccessHook] = [] self.after_error_hooks: List[AfterErrorHook] = [] - self._sync_hook_lock = asyncio.Lock() init_hooks(self) def register_sdk_init_hook(self, hook: SDKInitHook) -> None: @@ -76,8 +75,7 @@ async def before_request_async( if async_method is not None: out = await async_method(hook_ctx, request) else: - async with self._sync_hook_lock: - out = await asyncio.to_thread(hook.before_request, hook_ctx, request) + out = await asyncio.to_thread(hook.before_request, hook_ctx, request) if isinstance(out, Exception): raise out request = out From dc72b756001f818b71755f7ddafe2f5b10a441c9 Mon Sep 17 00:00:00 2001 From: Emily Voss Date: Thu, 30 Apr 2026 19:56:51 -0700 Subject: [PATCH 06/10] minor fix --- .../unit/test_split_pdf_hook.py | 28 +++++++++++++++++++ .../_hooks/custom/split_pdf_hook.py | 3 +- 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/_test_unstructured_client/unit/test_split_pdf_hook.py b/_test_unstructured_client/unit/test_split_pdf_hook.py index c4a06817..19e1de3f 100644 --- a/_test_unstructured_client/unit/test_split_pdf_hook.py +++ b/_test_unstructured_client/unit/test_split_pdf_hook.py @@ -833,6 +833,34 @@ async def test_unit_run_tasks_allow_failed_cancelled_error_propagates(): await run_tasks(tasks, allow_failed=True) +@pytest.mark.asyncio +async def test_unit_run_tasks_caller_cancelled_logs_pending_task_count( + caplog: pytest.LogCaptureFixture, +): + caplog.set_level(logging.WARNING, logger="unstructured-client") + + async def _wait_forever( + async_client: httpx.AsyncClient, + limiter: asyncio.Semaphore, + ) -> httpx.Response: + del async_client, limiter + await asyncio.Event().wait() + return _httpx_response("unreachable") + + tasks = [partial(_wait_forever), partial(_wait_forever), partial(_wait_forever)] + run_task = asyncio.create_task( + run_tasks(tasks, allow_failed=False, operation_id="caller-cancelled") + ) + await asyncio.sleep(0) + + run_task.cancel() + with pytest.raises(asyncio.CancelledError): + await run_task + + assert "event=batch_cancel_remaining operation_id=caller-cancelled" in caplog.text + assert "remaining_tasks=3" in caplog.text + + @pytest.mark.asyncio async def test_unit_run_tasks_disallow_failed_transport_exception_cancels_remaining(): cancelled_counter = Counter() diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index 04986058..b960bf07 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -167,11 +167,12 @@ async def run_tasks( except asyncio.CancelledError: for task in tasks: task.cancel() + remaining_tasks = sum(1 for task in tasks if not task.done()) await asyncio.gather(*tasks, return_exceptions=True) logger.warning( "split_pdf event=batch_cancel_remaining operation_id=%s reason=caller_cancelled remaining_tasks=%d", operation_id, - sum(1 for task in tasks if not task.done()), + remaining_tasks, ) raise From f5325d1933d15fd29da28a45ea9a7d980cabc758 Mon Sep 17 00:00:00 2001 From: Emily Voss Date: Thu, 30 Apr 2026 20:04:44 -0700 Subject: [PATCH 07/10] mypy again --- src/unstructured_client/_hooks/custom/split_pdf_hook.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index b960bf07..0ac85ad8 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -10,7 +10,7 @@ import tempfile import threading import uuid -from collections.abc import Awaitable +from collections.abc import Awaitable, Iterable from concurrent import futures from functools import partial from pathlib import Path @@ -652,6 +652,7 @@ def _before_request_unlocked( pdf_bytes = pdf.stream.read() temp_dir_path = None + pdf_chunks: Iterable[Tuple[BinaryIO, int]] if cache_tmp_data_feature: pdf_chunk_paths = self._get_pdf_chunk_paths( pdf_bytes, From 7fd8d55e15f47353acc9b50fc5a4dc13265f36d7 Mon Sep 17 00:00:00 2001 From: Emily Voss Date: Thu, 30 Apr 2026 20:42:40 -0700 Subject: [PATCH 08/10] cursor bot review --- .../unit/test_split_pdf_hook.py | 56 +++++++++++++++++++ src/unstructured_client/basesdk.py | 2 + 2 files changed, 58 insertions(+) diff --git a/_test_unstructured_client/unit/test_split_pdf_hook.py b/_test_unstructured_client/unit/test_split_pdf_hook.py index 19e1de3f..4a17147f 100644 --- a/_test_unstructured_client/unit/test_split_pdf_hook.py +++ b/_test_unstructured_client/unit/test_split_pdf_hook.py @@ -749,6 +749,62 @@ def _prepared_split_request(hook_ctx, request): tempdir.cleanup.assert_called_once() +@pytest.mark.asyncio +async def test_unit_do_request_async_cancellation_logs_cancelled_cleanup( + caplog: pytest.LogCaptureFixture, +): + caplog.set_level(logging.DEBUG, logger="test") + operation_id = "cancelled-cleanup" + + class PreparedRequestHook: + def before_request(self, hook_ctx, request): + del hook_ctx, request + return httpx.Request( + "GET", + "http://localhost:8888/general/docs", + headers={"operation_id": operation_id}, + extensions={"split_pdf_operation_id": operation_id}, + ) + + class CancelledCleanupHook: + async def after_error_async(self, hook_ctx, response, error): + del hook_ctx, response, error + raise asyncio.CancelledError() + + def after_error(self, hook_ctx, response, error): # pragma: no cover - dispatch guard + raise AssertionError("async hook should be awaited") + + hooks = SDKHooks() + hooks.before_request_hooks = [PreparedRequestHook()] # type: ignore[list-item] + hooks.after_error_hooks = [CancelledCleanupHook()] # type: ignore[list-item] + + client = _BlockingAsyncClient() + config = SDKConfiguration( + client=None, + client_supplied=False, + async_client=client, # type: ignore[arg-type] + async_client_supplied=True, + debug_logger=logging.getLogger("test"), + ) + config.__dict__["_hooks"] = hooks + sdk = BaseSDK(config) + task = asyncio.create_task( + sdk.do_request_async( + _make_sdk_hook_context(), + httpx.Request("POST", "http://localhost:8888/general/v0/general"), + error_status_codes=[], + ) + ) + + await client.started.wait() + task.cancel() + + with pytest.raises(asyncio.CancelledError): + await task + + assert "Cancellation cleanup cancelled" in caplog.text + + def test_before_request_returns_dummy_with_timeout_and_operation_id(): hook, mock_hook_ctx, result = _make_hook_with_split_request() operation_id = result.headers["operation_id"] diff --git a/src/unstructured_client/basesdk.py b/src/unstructured_client/basesdk.py index 13877122..972d2797 100644 --- a/src/unstructured_client/basesdk.py +++ b/src/unstructured_client/basesdk.py @@ -323,6 +323,8 @@ async def cleanup_cancelled_request( response, _RequestBoundCancelledError(cleanup_request, cancellation), ) + except asyncio.CancelledError: + logger.debug("Cancellation cleanup cancelled", exc_info=True) except Exception: logger.debug("Cancellation cleanup failed", exc_info=True) From 5d43d4afb55f39dc593dd613eb3eec1e0853d0c9 Mon Sep 17 00:00:00 2001 From: Emily Voss Date: Thu, 30 Apr 2026 21:02:57 -0700 Subject: [PATCH 09/10] more thorough review --- .../unit/test_split_pdf_hook.py | 141 +++++++++++++----- .../_hooks/custom/split_pdf_hook.py | 113 +++++++++++--- src/unstructured_client/_hooks/sdkhooks.py | 47 +++++- src/unstructured_client/basesdk.py | 32 +--- 4 files changed, 243 insertions(+), 90 deletions(-) diff --git a/_test_unstructured_client/unit/test_split_pdf_hook.py b/_test_unstructured_client/unit/test_split_pdf_hook.py index 4a17147f..efac35b4 100644 --- a/_test_unstructured_client/unit/test_split_pdf_hook.py +++ b/_test_unstructured_client/unit/test_split_pdf_hook.py @@ -69,6 +69,25 @@ def test_unit_clear_operation(): assert hook.operation_timeouts.get(operation_id) is None +def test_unit_clear_operation_closes_unconsumed_chunk_files(tmp_path: Path): + hook = SplitPdfHook() + operation_id = "cache-mode-clear" + chunk_path = tmp_path / "chunk.pdf" + chunk_path.write_bytes(b"%PDF") + chunk_file = open(chunk_path, mode="rb") # pylint: disable=consider-using-with + tempdir = MagicMock() + + hook.coroutines_to_execute[operation_id] = [ + partial(hook.call_api_partial, pdf_chunk_file=chunk_file), + ] + hook.tempdirs[operation_id] = tempdir + + hook._clear_operation(operation_id) + + assert chunk_file.closed + tempdir.cleanup.assert_called_once() + + def test_unit_get_request_timeout_seconds_uses_request_timeout_extension(): request = httpx.Request( "POST", @@ -878,15 +897,17 @@ async def test_unit_run_tasks_allow_failed_transport_exception(): @pytest.mark.asyncio -async def test_unit_run_tasks_allow_failed_cancelled_error_propagates(): +async def test_unit_run_tasks_allow_failed_cancelled_error_becomes_failed_response(): tasks = [ partial(_slow_success_request, content="1"), partial(_cancelled_request), partial(_slow_success_request, content="3"), ] - with pytest.raises(asyncio.CancelledError): - await run_tasks(tasks, allow_failed=True) + responses = await run_tasks(tasks, allow_failed=True) + + assert [response.status_code for _, response in responses] == [200, 500, 200] + assert isinstance(responses[1][1].extensions["transport_exception"], asyncio.CancelledError) @pytest.mark.asyncio @@ -1123,10 +1144,9 @@ def before_request(self, hook_ctx, request): @pytest.mark.asyncio -async def test_unit_sdk_hooks_before_request_async_allows_sync_hooks_to_overlap(): +async def test_unit_sdk_hooks_before_request_async_serializes_same_sync_hook_instance(): release_hooks = threading.Event() first_hook_started = threading.Event() - both_hooks_started = threading.Event() active_lock = threading.Lock() active_hooks = 0 max_active_hooks = 0 @@ -1140,8 +1160,6 @@ def before_request(self, hook_ctx, request): max_active_hooks = max(max_active_hooks, active_hooks) if active_hooks == 1: first_hook_started.set() - if active_hooks == 2: - both_hooks_started.set() try: release_hooks.wait(timeout=1) request.headers["X-Sync-Before-Hook"] = "called" @@ -1159,7 +1177,8 @@ def before_request(self, hook_ctx, request): first_task = asyncio.create_task(hooks.before_request_async(hook_ctx, first_request)) await asyncio.to_thread(first_hook_started.wait, 1) second_task = asyncio.create_task(hooks.before_request_async(hook_ctx, second_request)) - assert await asyncio.to_thread(both_hooks_started.wait, 1) + await asyncio.sleep(0.01) + assert max_active_hooks == 1 release_hooks.set() returned_first, returned_second = await asyncio.gather(first_task, second_task) @@ -1168,7 +1187,7 @@ def before_request(self, hook_ctx, request): assert returned_second is second_request assert first_request.headers["X-Sync-Before-Hook"] == "called" assert second_request.headers["X-Sync-Before-Hook"] == "called" - assert max_active_hooks == 2 + assert max_active_hooks == 1 @pytest.mark.asyncio @@ -1280,40 +1299,94 @@ def test_unit_pdfium_new_document_closes_when_cached_split_fails(tmp_path: Path) @pytest.mark.asyncio -async def test_unit_do_request_async_cancellation_during_before_request_cleans_up_later(): +async def test_unit_split_pdf_before_request_async_cancellation_cleans_prepared_state(): setup_started = threading.Event() release_setup = threading.Event() - cleanup_called = asyncio.Event() - cancellation_is_asyncio_cancelled_error = [] operation_id = "cancelled-during-setup" + tempdir = MagicMock() + hook = SplitPdfHook() + + def slow_setup(hook_ctx, request): + del hook_ctx, request + setup_started.set() + release_setup.wait(timeout=1) + hook.coroutines_to_execute[operation_id] = [] + hook.pending_operation_ids[operation_id] = operation_id + hook.tempdirs[operation_id] = tempdir + return httpx.Request( + "GET", + "http://localhost:8888/general/docs", + headers={"operation_id": operation_id}, + extensions={"split_pdf_operation_id": operation_id}, + ) + + hook_ctx = MagicMock(spec=BeforeRequestContext) + hook_ctx.operation_id = "partition" + request = httpx.Request("POST", "http://localhost:8888/general/v0/general") + + with patch.object(hook, "_before_request_unlocked", side_effect=slow_setup): + task = asyncio.create_task(hook.before_request_async(hook_ctx, request)) + await asyncio.to_thread(setup_started.wait, 1) + task.cancel() + release_setup.set() + + with pytest.raises(asyncio.CancelledError): + await asyncio.wait_for(task, timeout=1) + + assert operation_id not in hook.coroutines_to_execute + assert operation_id not in hook.pending_operation_ids + assert operation_id not in hook.tempdirs + tempdir.cleanup.assert_called_once() + + with patch.object(hook, "_before_request_unlocked", return_value=request): + assert await asyncio.wait_for(hook.before_request_async(hook_ctx, request), timeout=1) is request + + +@pytest.mark.asyncio +async def test_unit_split_pdf_before_request_async_cancellation_before_admission_does_not_queue(): + setup_started = threading.Event() + release_setup = threading.Event() + hook = SplitPdfHook() + hook_ctx = MagicMock(spec=BeforeRequestContext) + hook_ctx.operation_id = "partition" + request = httpx.Request("POST", "http://localhost:8888/general/v0/general") + + def slow_setup(hook_ctx_arg, request_arg): + del hook_ctx_arg + setup_started.set() + release_setup.wait(timeout=1) + return request_arg + + with patch.object(hook, "_before_request_unlocked", side_effect=slow_setup) as mock_setup: + first_task = asyncio.create_task(hook.before_request_async(hook_ctx, request)) + await asyncio.to_thread(setup_started.wait, 1) + second_task = asyncio.create_task(hook.before_request_async(hook_ctx, request)) + await asyncio.sleep(0) + + second_task.cancel() + with pytest.raises(asyncio.CancelledError): + await second_task + + release_setup.set() + assert await asyncio.wait_for(first_task, timeout=1) is request + + assert mock_setup.call_count == 1 + + +@pytest.mark.asyncio +async def test_unit_do_request_async_cancellation_during_before_request_cancels_setup(): + setup_started = threading.Event() + release_setup = threading.Event() class SlowBeforeRequestHook: def before_request(self, hook_ctx, request): del hook_ctx, request setup_started.set() release_setup.wait(timeout=1) - return httpx.Request( - "GET", - "http://localhost:8888/general/docs", - headers={"operation_id": operation_id}, - extensions={"split_pdf_operation_id": operation_id}, - ) - - class CancellationObserverHook: - async def after_error_async(self, hook_ctx, response, error): - del hook_ctx, response - cancellation_is_asyncio_cancelled_error.append( - isinstance(error, asyncio.CancelledError) - ) - cleanup_called.set() - return None, error - - def after_error(self, hook_ctx, response, error): # pragma: no cover - dispatch guard - raise AssertionError("async hook should be awaited") + return httpx.Request("GET", "http://localhost:8888/general/docs") hooks = SDKHooks() hooks.before_request_hooks = [SlowBeforeRequestHook()] # type: ignore[list-item] - hooks.after_error_hooks = [CancellationObserverHook()] # type: ignore[list-item] client = _BlockingAsyncClient() config = SDKConfiguration( @@ -1335,15 +1408,11 @@ def after_error(self, hook_ctx, response, error): # pragma: no cover - dispatch await asyncio.to_thread(setup_started.wait, 1) task.cancel() + with pytest.raises(asyncio.CancelledError): await asyncio.wait_for(task, timeout=0.05) - assert not cleanup_called.is_set() - release_setup.set() - await asyncio.wait_for(cleanup_called.wait(), timeout=1) - - assert cancellation_is_asyncio_cancelled_error == [True] def test_unit_before_request_uses_hook_ctx_timeout_when_request_timeout_missing(): diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index 0ac85ad8..abd7b302 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -203,7 +203,15 @@ async def _collect_task_responses( ) ) elif isinstance(result, asyncio.CancelledError): - raise result + logger.error( + "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", + operation_id, + index, + type(result).__name__, + result, + exc_info=result, + ) + normalized_responses.append((index, _create_transport_error_response(result))) elif isinstance(result, BaseException): logger.error( "split_pdf event=chunk_transport_error operation_id=%s chunk_index=%d error_type=%s error=%s", @@ -344,11 +352,12 @@ class SplitPdfHook(SDKInitHook, BeforeRequestHook, AfterSuccessHook, AfterErrorH max_workers=1, thread_name_prefix="split-pdf-setup", ) + _split_pdf_setup_gate = threading.BoundedSemaphore(value=1) + _split_pdf_setup_poll_interval_seconds = 0.01 _split_pdf_setup_state = threading.local() def __init__(self) -> None: self.client: Optional[HttpClient] = None - self.partition_base_url: Optional[str] = None self.async_client: Optional[AsyncHttpClient] = None self.coroutines_to_execute: dict[ str, list[partial[Coroutine[Any, Any, httpx.Response]]] @@ -501,11 +510,6 @@ def handle_request(self, request: httpx.Request) -> httpx.Response: # # Otherwise, pass the request to the default transport # return await self.base_transport.handle_async_request(request) - # Instead, save the base url so we can use it for our dummy request - # As this can be overwritten with Platform API URL, we need to get it again in - # `before_request` hook from the request object as the real URL is not available here. - self.partition_base_url = base_url - # Explicit cast to httpx.Client to avoid a typing error httpx_client = cast(httpx.Client, client) # async_httpx_client = cast(httpx.AsyncClient, async_client) @@ -524,19 +528,20 @@ def _before_request_unlocked( self, hook_ctx: BeforeRequestContext, request: httpx.Request ) -> Union[httpx.Request, Exception]: """If `splitPdfPage` is set to `true` in the request, the PDF file is split into - separate pages. Each page is sent as a separate request in parallel. The last - page request is returned by this method. It will return the original request - when: `splitPdfPage` is set to `false`, the file is not a PDF, or the HTTP + chunks and the chunk requests are prepared for later execution. The split + path returns a synthetic request carrying the split operation ID so + after_success can collect chunk results. It returns the original request + when `splitPdfPage` is `false`, the file is not a PDF, or the HTTP client has not been initialized. Args: hook_ctx (BeforeRequestContext): The hook context containing information about the operation. - request (httpx.PreparedRequest): The request object. + request (httpx.Request): The request object. Returns: - Union[httpx.PreparedRequest, Exception]: If `splitPdfPage` is set to `true`, - the last page request; otherwise, the original request. + Union[httpx.Request, Exception]: If `splitPdfPage` is set to `true`, + the synthetic collection request; otherwise, the original request. """ # Actually the general.partition operation overwrites the default client's base url (as @@ -750,14 +755,56 @@ def before_request( async def before_request_async( self, hook_ctx: BeforeRequestContext, request: httpx.Request ) -> Union[httpx.Request, Exception]: - # Keep pypdfium setup off the event loop while routing all async callers through one worker. + # Keep pypdfium setup off the event loop while preserving a single process-wide + # admission lane. pypdfium is not thread-safe, so cancelled callers must not + # leave queued setup work piling up behind the worker. + await self._acquire_split_pdf_setup_slot() loop = asyncio.get_running_loop() - return await loop.run_in_executor( + setup_future = loop.run_in_executor( self._split_pdf_setup_executor, self.before_request, hook_ctx, request, ) + try: + return await asyncio.shield(setup_future) + except asyncio.CancelledError: + result = await self._finish_cancelled_split_setup(setup_future) + if isinstance(result, httpx.Request): + self._clear_prepared_split_request(result) + raise + finally: + self._split_pdf_setup_gate.release() + + @classmethod + async def _acquire_split_pdf_setup_slot(cls) -> None: + while not cls._split_pdf_setup_gate.acquire(blocking=False): + await asyncio.sleep(cls._split_pdf_setup_poll_interval_seconds) + + async def _finish_cancelled_split_setup( + self, + setup_future: asyncio.Future[Union[httpx.Request, Exception]], + ) -> Optional[Union[httpx.Request, Exception]]: + while True: + try: + return await asyncio.shield(setup_future) + except asyncio.CancelledError: + if setup_future.cancelled(): + return None + continue + except Exception: + logger.debug("Cancelled split-PDF setup failed before cleanup", exc_info=True) + return None + + def _clear_prepared_split_request(self, request: httpx.Request) -> None: + operation_id = self._get_operation_id_from_request(request) + if operation_id is None: + return + logger.warning( + "split_pdf event=before_request_cancel_cleanup operation_id=%s", + operation_id, + ) + self._clear_operation(operation_id) async def call_api_partial( self, @@ -876,8 +923,7 @@ def _get_pdf_chunks_in_memory( page_end: Optional[int] = None ) -> list[Tuple[BinaryIO, int]]: """Reads given bytes of a pdf file and split it into n pdf-chunks, each - with `split_size` pages. The chunks are written into temporary files in - a temporary directory corresponding to the operation_id. + with `split_size` pages. The chunks are returned as in-memory buffers. Args: file_content: Content of the PDF file. @@ -888,7 +934,7 @@ def _get_pdf_chunks_in_memory( page_end: If provided, split up to and including this page number Returns: - The list of temporary file paths. + The list of chunk buffers and their zero-based page offsets. """ self._assert_split_pdf_setup_locked() @@ -1368,7 +1414,8 @@ def _clear_operation(self, operation_id: str) -> None: Args: operation_id (str): The ID of the operation to clear. """ - self.coroutines_to_execute.pop(operation_id, None) + tasks = self.coroutines_to_execute.pop(operation_id, None) + closed_chunk_files = self._close_unconsumed_chunk_files(tasks) self.api_successful_responses.pop(operation_id, None) self.api_failed_responses.pop(operation_id, None) self.concurrency_level.pop(operation_id, None) @@ -1383,12 +1430,13 @@ def _clear_operation(self, operation_id: str) -> None: executor = self.executors.pop(operation_id, None) tempdir = self.tempdirs.pop(operation_id, None) logger.debug( - "split_pdf event=clear_operation operation_id=%s has_future=%s future_done=%s has_executor=%s has_tempdir=%s", + "split_pdf event=clear_operation operation_id=%s has_future=%s future_done=%s has_executor=%s has_tempdir=%s closed_chunk_files=%d", operation_id, future is not None, future.done() if future is not None else None, executor is not None, tempdir is not None, + closed_chunk_files, ) if future is not None and not future.done(): loop = loop_holder.get("loop") if loop_holder is not None else None @@ -1411,3 +1459,28 @@ def _clear_operation(self, operation_id: str) -> None: ) return self._finalize_operation_resources(executor, tempdir, operation_id) + + @staticmethod + def _close_unconsumed_chunk_files( + tasks: Optional[list[partial[Coroutine[Any, Any, httpx.Response]]]], + ) -> int: + if tasks is None: + return 0 + + closed_count = 0 + seen_files: set[int] = set() + for task in tasks: + keywords = getattr(task, "keywords", None) or {} + pdf_chunk_file = keywords.get("pdf_chunk_file") + if pdf_chunk_file is None or id(pdf_chunk_file) in seen_files: + continue + seen_files.add(id(pdf_chunk_file)) + close = getattr(pdf_chunk_file, "close", None) + if close is None or getattr(pdf_chunk_file, "closed", False) is True: + continue + try: + close() + closed_count += 1 + except Exception: + logger.debug("Failed to close split-PDF chunk file during cleanup", exc_info=True) + return closed_count diff --git a/src/unstructured_client/_hooks/sdkhooks.py b/src/unstructured_client/_hooks/sdkhooks.py index f6a12283..948b68bc 100644 --- a/src/unstructured_client/_hooks/sdkhooks.py +++ b/src/unstructured_client/_hooks/sdkhooks.py @@ -2,6 +2,7 @@ import asyncio import inspect +import threading from typing import Any, Awaitable, Callable, List, Optional, Tuple import httpx @@ -37,8 +38,29 @@ def __init__(self) -> None: self.before_request_hooks: List[BeforeRequestHook] = [] self.after_success_hooks: List[AfterSuccessHook] = [] self.after_error_hooks: List[AfterErrorHook] = [] + self._sync_hook_locks_guard = threading.Lock() + self._sync_hook_locks: dict[int, threading.Lock] = {} init_hooks(self) + def _get_sync_hook_lock(self, hook: object) -> threading.Lock: + hook_id = id(hook) + with self._sync_hook_locks_guard: + lock = self._sync_hook_locks.get(hook_id) + if lock is None: + lock = threading.Lock() + self._sync_hook_locks[hook_id] = lock + return lock + + def _run_sync_hook_method( + self, + hook: object, + method_name: str, + *args: object, + ) -> Any: + method = getattr(hook, method_name) + with self._get_sync_hook_lock(hook): + return method(*args) + def register_sdk_init_hook(self, hook: SDKInitHook) -> None: self.sdk_init_hooks.append(hook) @@ -75,7 +97,13 @@ async def before_request_async( if async_method is not None: out = await async_method(hook_ctx, request) else: - out = await asyncio.to_thread(hook.before_request, hook_ctx, request) + out = await asyncio.to_thread( + self._run_sync_hook_method, + hook, + "before_request", + hook_ctx, + request, + ) if isinstance(out, Exception): raise out request = out @@ -100,7 +128,13 @@ async def after_success_async( if async_method is not None: out = await async_method(hook_ctx, response) else: - out = await asyncio.to_thread(hook.after_success, hook_ctx, response) + out = await asyncio.to_thread( + self._run_sync_hook_method, + hook, + "after_success", + hook_ctx, + response, + ) if isinstance(out, Exception): raise out response = out @@ -130,7 +164,14 @@ async def after_error_async( if async_method is not None: result = await async_method(hook_ctx, response, error) else: - result = await asyncio.to_thread(hook.after_error, hook_ctx, response, error) + result = await asyncio.to_thread( + self._run_sync_hook_method, + hook, + "after_error", + hook_ctx, + response, + error, + ) if isinstance(result, Exception): raise result response, error = result diff --git a/src/unstructured_client/basesdk.py b/src/unstructured_client/basesdk.py index 972d2797..4b10c124 100644 --- a/src/unstructured_client/basesdk.py +++ b/src/unstructured_client/basesdk.py @@ -328,41 +328,11 @@ async def cleanup_cancelled_request( except Exception: logger.debug("Cancellation cleanup failed", exc_info=True) - def cleanup_when_before_request_finishes( - before_request_task: "asyncio.Task[httpx.Request]", - cancellation: asyncio.CancelledError, - ) -> None: - def on_done(task: "asyncio.Task[httpx.Request]") -> None: - if task.cancelled(): - return - try: - completed_req = task.result() - except Exception: - logger.debug("Cancelled request setup failed before cleanup", exc_info=True) - return - asyncio.create_task( - cleanup_cancelled_request(completed_req, None, cancellation) - ) - - before_request_task.add_done_callback(on_done) - async def do(): http_res = None req = None try: - before_request_task = asyncio.create_task( - hooks.before_request_async(BeforeRequestContext(hook_ctx), request) - ) - try: - # Sync before-request hooks may be running in a worker thread; if the caller - # cancels, let setup finish in the background so cleanup can find request state. - req = await asyncio.shield(before_request_task) - except asyncio.CancelledError as cancellation: - if before_request_task.done() and not before_request_task.cancelled(): - req = before_request_task.result() - else: - cleanup_when_before_request_finishes(before_request_task, cancellation) - raise + req = await hooks.before_request_async(BeforeRequestContext(hook_ctx), request) logger.debug( "Request:\nMethod: %s\nURL: %s\nHeaders: %s\nBody: %s", req.method, From da0352c716ea1cd51aafac1ad4ff63c48fb118d3 Mon Sep 17 00:00:00 2001 From: Emily Voss Date: Fri, 1 May 2026 18:13:59 -0700 Subject: [PATCH 10/10] final pass --- CHANGELOG.md | 1 + .../test_split_pdf_async_behavior.py | 8 +- .../unit/test_split_pdf_hook.py | 88 +++++++++++++++++-- .../_hooks/custom/split_pdf_hook.py | 77 +++++++++++++++- src/unstructured_client/basesdk.py | 23 +++-- 5 files changed, 177 insertions(+), 20 deletions(-) rename _test_unstructured_client/{integration => unit}/test_split_pdf_async_behavior.py (98%) diff --git a/CHANGELOG.md b/CHANGELOG.md index c9662250..d721cd56 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ ### Fixes * Route split-PDF `partition_async()` result collection through awaited async hook dispatch instead of creating a nested event loop in a worker thread. + Sync-only hooks on the async path now run on a worker thread, so hook code that depends on event-loop-thread `contextvars` or thread-local state should pass that state explicitly. * Add cancellation cleanup for in-flight split-PDF chunk tasks and preserve existing sync `partition()` split-PDF behavior with lazy executor creation. ## 0.43.2 diff --git a/_test_unstructured_client/integration/test_split_pdf_async_behavior.py b/_test_unstructured_client/unit/test_split_pdf_async_behavior.py similarity index 98% rename from _test_unstructured_client/integration/test_split_pdf_async_behavior.py rename to _test_unstructured_client/unit/test_split_pdf_async_behavior.py index 7f8639c9..3e49bb8d 100644 --- a/_test_unstructured_client/integration/test_split_pdf_async_behavior.py +++ b/_test_unstructured_client/unit/test_split_pdf_async_behavior.py @@ -110,7 +110,7 @@ def _make_sdk_split_task( @pytest.mark.asyncio async def test_partition_async_split_collects_chunks_in_order_without_executor(): - operation_id = "integration-happy" + operation_id = "unit-happy" split_hook = SplitPdfHook() active_chunks = 0 max_active_chunks = 0 @@ -191,7 +191,7 @@ def _chunk_client_factory(*args, **kwargs): @pytest.mark.asyncio async def test_partition_async_cancellation_cleans_split_state_and_tempdir(): - operation_id = "integration-cancel" + operation_id = "unit-cancel" split_hook = SplitPdfHook() started = asyncio.Event() cancelled_counter = Counter() @@ -236,7 +236,7 @@ async def _hanging_chunk( @pytest.mark.asyncio async def test_partition_async_strict_failure_drains_sibling_chunks_before_close(): - operation_id = "integration-strict-failure" + operation_id = "unit-strict-failure" split_hook = SplitPdfHook() sibling_started = asyncio.Event() sibling_cancelled = asyncio.Event() @@ -370,7 +370,7 @@ async def _release_sync_hook_from_loop() -> None: @pytest.mark.asyncio async def test_partition_async_reassembly_does_not_block_event_loop(): - operation_id = "integration-offload" + operation_id = "unit-offload" split_hook = SplitPdfHook() reassembly_started = threading.Event() reassembly_released = threading.Event() diff --git a/_test_unstructured_client/unit/test_split_pdf_hook.py b/_test_unstructured_client/unit/test_split_pdf_hook.py index efac35b4..bf0a83da 100644 --- a/_test_unstructured_client/unit/test_split_pdf_hook.py +++ b/_test_unstructured_client/unit/test_split_pdf_hook.py @@ -4,17 +4,17 @@ import io import logging import threading -from asyncio import Task from collections import Counter from concurrent import futures from functools import partial from pathlib import Path +from typing import Any, Coroutine from unittest.mock import AsyncMock, MagicMock, patch import httpx import pytest -import requests -from requests_toolbelt import MultipartDecoder +import requests # type: ignore[import-untyped] +from requests_toolbelt import MultipartDecoder # type: ignore[import-untyped] from unstructured_client._hooks.custom import form_utils, pdf_utils, request_utils from unstructured_client._hooks.custom.form_utils import ( @@ -438,7 +438,7 @@ async def _request_mock( @pytest.mark.asyncio async def test_unit_disallow_failed_coroutines( allow_failed: bool, - tasks: list[Task], + tasks: list[partial[Coroutine[Any, Any, httpx.Response]]], expected_responses: list[str], ): """Test disallow failed coroutines method properly sets the flag to False.""" @@ -824,6 +824,70 @@ def after_error(self, hook_ctx, response, error): # pragma: no cover - dispatch assert "Cancellation cleanup cancelled" in caplog.text +@pytest.mark.asyncio +async def test_unit_do_request_async_secondary_cancellation_waits_for_cleanup(): + cleanup_started = asyncio.Event() + release_cleanup = asyncio.Event() + cleanup_finished = asyncio.Event() + + class PreparedRequestHook: + def before_request(self, hook_ctx, request): + del hook_ctx, request + return httpx.Request( + "GET", + "http://localhost:8888/general/docs", + headers={"operation_id": "secondary-cancel-cleanup"}, + extensions={"split_pdf_operation_id": "secondary-cancel-cleanup"}, + ) + + class SlowCleanupHook: + async def after_error_async(self, hook_ctx, response, error): + del hook_ctx, response, error + cleanup_started.set() + await release_cleanup.wait() + cleanup_finished.set() + return None, None + + def after_error(self, hook_ctx, response, error): # pragma: no cover - dispatch guard + raise AssertionError("async hook should be awaited") + + hooks = SDKHooks() + hooks.before_request_hooks = [PreparedRequestHook()] # type: ignore[list-item] + hooks.after_error_hooks = [SlowCleanupHook()] # type: ignore[list-item] + + client = _BlockingAsyncClient() + config = SDKConfiguration( + client=None, + client_supplied=False, + async_client=client, # type: ignore[arg-type] + async_client_supplied=True, + debug_logger=logging.getLogger("test"), + ) + config.__dict__["_hooks"] = hooks + sdk = BaseSDK(config) + task = asyncio.create_task( + sdk.do_request_async( + _make_sdk_hook_context(), + httpx.Request("POST", "http://localhost:8888/general/v0/general"), + error_status_codes=[], + ) + ) + + await client.started.wait() + task.cancel() + await cleanup_started.wait() + task.cancel() + await asyncio.sleep(0) + + assert not task.done() + + release_cleanup.set() + with pytest.raises(asyncio.CancelledError): + await task + + assert cleanup_finished.is_set() + + def test_before_request_returns_dummy_with_timeout_and_operation_id(): hook, mock_hook_ctx, result = _make_hook_with_split_request() operation_id = result.headers["operation_id"] @@ -836,6 +900,17 @@ def test_before_request_returns_dummy_with_timeout_and_operation_id(): assert operation_id in hook.pending_operation_ids +def test_before_request_rejects_reused_operation_id(): + hook = SplitPdfHook() + hook.coroutines_to_execute["reused-operation-id"] = [] + + with patch( + "unstructured_client._hooks.custom.split_pdf_hook.uuid.uuid4", + return_value="reused-operation-id", + ), pytest.raises(RuntimeError, match="Split PDF operation ID already in use"): + _make_hook_with_split_request(hook=hook) + + def test_before_request_logs_split_plan(caplog: pytest.LogCaptureFixture): caplog.set_level(logging.INFO, logger="unstructured-client") @@ -1784,7 +1859,8 @@ def test_unit_allow_failed_partial_results(caplog: pytest.LogCaptureFixture): hook.concurrency_level[operation_id] = 3 hook.allow_failed[operation_id] = True hook.cache_tmp_data_feature[operation_id] = False - hook.executors[operation_id] = MagicMock() + executor = MagicMock() + hook.executors[operation_id] = executor fake_future = MagicMock() fake_future.result.return_value = [ @@ -1792,7 +1868,7 @@ def test_unit_allow_failed_partial_results(caplog: pytest.LogCaptureFixture): (2, _httpx_response("boom", status_code=500)), (3, _httpx_json_response([{"page_number": 3}])), ] - hook.executors[operation_id].submit.return_value = fake_future + executor.submit.return_value = fake_future elements = hook._await_elements(operation_id) diff --git a/src/unstructured_client/_hooks/custom/split_pdf_hook.py b/src/unstructured_client/_hooks/custom/split_pdf_hook.py index abd7b302..7a6b3489 100644 --- a/src/unstructured_client/_hooks/custom/split_pdf_hook.py +++ b/src/unstructured_client/_hooks/custom/split_pdf_hook.py @@ -10,6 +10,7 @@ import tempfile import threading import uuid +from collections import deque from collections.abc import Awaitable, Iterable from concurrent import futures from functools import partial @@ -72,6 +73,67 @@ def __init__(self, index: int, inner: BaseException): self.inner = inner +class _AsyncThreadSafeBoundedSemaphore: + """A cancellable async gate that can be shared across event loops.""" + + def __init__(self, value: int) -> None: + if value <= 0: + raise ValueError("Semaphore value must be greater than zero") + self._initial_value = value + self._value = value + self._waiters: deque[asyncio.Future[None]] = deque() + self._lock = threading.Lock() + + async def acquire(self) -> None: + loop = asyncio.get_running_loop() + waiter: Optional[asyncio.Future[None]] = None + with self._lock: + if self._value > 0: + self._value -= 1 + return + waiter = loop.create_future() + self._waiters.append(waiter) + + try: + await waiter + except asyncio.CancelledError: + release_transferred_slot = False + with self._lock: + try: + self._waiters.remove(waiter) + except ValueError: + release_transferred_slot = waiter.done() and not waiter.cancelled() + if release_transferred_slot: + self.release() + raise + + def release(self) -> None: + while True: + with self._lock: + while self._waiters: + waiter = self._waiters.popleft() + if not waiter.cancelled(): + break + else: + if self._value >= self._initial_value: + raise ValueError("Semaphore released too many times") + self._value += 1 + return + + def _wake_waiter() -> None: + if waiter.cancelled(): + self.release() + else: + waiter.set_result(None) + + try: + waiter.get_loop().call_soon_threadsafe(_wake_waiter) + return + except RuntimeError: + # The waiting loop closed before it could receive the slot. + continue + + def _get_request_timeout_seconds(request: httpx.Request) -> Optional[float]: timeout = request.extensions.get("timeout") if timeout is None: @@ -352,8 +414,8 @@ class SplitPdfHook(SDKInitHook, BeforeRequestHook, AfterSuccessHook, AfterErrorH max_workers=1, thread_name_prefix="split-pdf-setup", ) - _split_pdf_setup_gate = threading.BoundedSemaphore(value=1) - _split_pdf_setup_poll_interval_seconds = 0.01 + _split_pdf_setup_gate = _AsyncThreadSafeBoundedSemaphore(value=1) + # Thread-local flag shared by all hook instances to prove pypdfium work is under the setup lock. _split_pdf_setup_state = threading.local() def __init__(self) -> None: @@ -635,6 +697,9 @@ def _before_request_unlocked( if split_size >= page_count and page_count == len(pdf.pages): return request + if operation_id in self.coroutines_to_execute: + raise RuntimeError(f"Split PDF operation ID already in use: {operation_id}") + self.allow_failed[operation_id] = allow_failed self.cache_tmp_data_feature[operation_id] = cache_tmp_data_feature self.cache_tmp_data_dir[operation_id] = cache_tmp_data_dir @@ -778,13 +843,17 @@ async def before_request_async( @classmethod async def _acquire_split_pdf_setup_slot(cls) -> None: - while not cls._split_pdf_setup_gate.acquire(blocking=False): - await asyncio.sleep(cls._split_pdf_setup_poll_interval_seconds) + await cls._split_pdf_setup_gate.acquire() async def _finish_cancelled_split_setup( self, setup_future: asyncio.Future[Union[httpx.Request, Exception]], ) -> Optional[Union[httpx.Request, Exception]]: + """Finish setup after caller cancellation so prepared state can be cleaned. + + Non-cancellation failures mean setup failed before returning a prepared + request, so there is no operation ID to clear here. + """ while True: try: return await asyncio.shield(setup_future) diff --git a/src/unstructured_client/basesdk.py b/src/unstructured_client/basesdk.py index 4b10c124..4324324f 100644 --- a/src/unstructured_client/basesdk.py +++ b/src/unstructured_client/basesdk.py @@ -21,6 +21,8 @@ class _RequestBoundCancelledError(asyncio.CancelledError): + """Cancellation wrapper that exposes the request to after-error hooks.""" + def __init__(self, request: httpx.Request, cancellation: asyncio.CancelledError): super().__init__(str(cancellation) or "Request cancelled") self.request = request @@ -317,16 +319,25 @@ async def cleanup_cancelled_request( if cleanup_request is None and response is not None: cleanup_request = response.request assert cleanup_request is not None - try: - await hooks.after_error_async( + cleanup_task = asyncio.create_task( + hooks.after_error_async( AfterErrorContext(hook_ctx), response, _RequestBoundCancelledError(cleanup_request, cancellation), ) - except asyncio.CancelledError: - logger.debug("Cancellation cleanup cancelled", exc_info=True) - except Exception: - logger.debug("Cancellation cleanup failed", exc_info=True) + ) + while True: + try: + await asyncio.shield(cleanup_task) + return + except asyncio.CancelledError: + if cleanup_task.done(): + logger.debug("Cancellation cleanup cancelled", exc_info=True) + return + logger.debug("Cancellation cleanup still running after cancellation") + except BaseException: + logger.debug("Cancellation cleanup failed", exc_info=True) + return async def do(): http_res = None