#!/usr/bin/env python3
"""
Example: Waiting for Knowledge Base Indexing Job Completion

This example demonstrates how to use the wait_for_completion() method
to automatically wait for a knowledge base indexing job to finish,
without needing to write manual polling loops.
"""

import os
import time

from gradient import Gradient


def main():
    # Initialize the Gradient client
    client = Gradient()

    # Example 1: Basic usage - wait for indexing job to complete
    print("Example 1: Basic usage")
    print("-" * 50)

    # Create an indexing job (replace with your actual knowledge base UUID)
    knowledge_base_uuid = os.getenv("KNOWLEDGE_BASE_UUID", "your-kb-uuid-here")

    print(f"Creating indexing job for knowledge base: {knowledge_base_uuid}")
    indexing_job = client.knowledge_bases.indexing_jobs.create(
        knowledge_base_uuid=knowledge_base_uuid,
    )

    job_uuid = indexing_job.job.uuid if indexing_job.job else None
    if not job_uuid:
        print("Error: Could not create indexing job")
        return

    print(f"Indexing job created with UUID: {job_uuid}")
    print("Waiting for indexing job to complete...")

    try:
        # Wait for the job to complete (polls every 5 seconds by default)
        completed_job = client.knowledge_bases.indexing_jobs.wait_for_completion(
            job_uuid
        )

        print("\n✅ Indexing job completed successfully!")
        if completed_job.job:
            print(f"Phase: {completed_job.job.phase}")
            print(f"Total items indexed: {completed_job.job.total_items_indexed}")
            print(f"Total items failed: {completed_job.job.total_items_failed}")
            print(f"Total datasources: {completed_job.job.total_datasources}")
            print(f"Completed datasources: {completed_job.job.completed_datasources}")

    except TimeoutError as e:
        print(f"\n⏱️ Timeout: {e}")
    except RuntimeError as e:
        print(f"\n❌ Error: {e}")
    except Exception as e:
        print(f"\n❌ Unexpected error: {e}")


def example_with_custom_polling():
    """Example with custom polling interval and timeout"""
    print("\n\nExample 2: Custom polling interval and timeout")
    print("-" * 50)

    client = Gradient()
    knowledge_base_uuid = os.getenv("KNOWLEDGE_BASE_UUID", "your-kb-uuid-here")

    print(f"Creating indexing job for knowledge base: {knowledge_base_uuid}")
    indexing_job = client.knowledge_bases.indexing_jobs.create(
        knowledge_base_uuid=knowledge_base_uuid,
    )

    job_uuid = indexing_job.job.uuid if indexing_job.job else None
    if not job_uuid:
        print("Error: Could not create indexing job")
        return

    print(f"Indexing job created with UUID: {job_uuid}")
    print("Waiting for indexing job to complete (polling every 10 seconds, 5 minute timeout)...")

    try:
        # Wait with custom poll interval (10 seconds) and timeout (5 minutes = 300 seconds)
        completed_job = client.knowledge_bases.indexing_jobs.wait_for_completion(
            job_uuid,
            poll_interval=10,  # Poll every 10 seconds
            timeout=300,  # Timeout after 5 minutes
        )

        print("\n✅ Indexing job completed successfully!")
        if completed_job.job:
            print(f"Phase: {completed_job.job.phase}")

    except TimeoutError:
        print("\n⏱️ Job did not complete within 5 minutes")
        # You can still check the current status
        current_status = client.knowledge_bases.indexing_jobs.retrieve(job_uuid)
        if current_status.job:
            print(f"Current phase: {current_status.job.phase}")
            print(f"Completed datasources: {current_status.job.completed_datasources}/{current_status.job.total_datasources}")
    except RuntimeError as e:
        print(f"\n❌ Job failed: {e}")


def example_manual_polling():
    """Example of the old manual polling approach (for comparison)"""
    print("\n\nExample 3: Manual polling (old approach)")
    print("-" * 50)

    client = Gradient()
    knowledge_base_uuid = os.getenv("KNOWLEDGE_BASE_UUID", "your-kb-uuid-here")

    indexing_job = client.knowledge_bases.indexing_jobs.create(
        knowledge_base_uuid=knowledge_base_uuid,
    )

    job_uuid = indexing_job.job.uuid if indexing_job.job else None
    if not job_uuid:
        print("Error: Could not create indexing job")
        return

    print(f"Indexing job created with UUID: {job_uuid}")
    print("Manual polling (old approach)...")

    while True:
        indexing_job = client.knowledge_bases.indexing_jobs.retrieve(job_uuid)

        if indexing_job.job and indexing_job.job.phase:
            phase = indexing_job.job.phase
            print(f"Current phase: {phase}")

            if phase in ["BATCH_JOB_PHASE_UNKNOWN", "BATCH_JOB_PHASE_PENDING", "BATCH_JOB_PHASE_RUNNING"]:
                time.sleep(5)
                continue
            elif phase == "BATCH_JOB_PHASE_SUCCEEDED":
                print("✅ Job completed successfully!")
                break
            else:
                print(f"❌ Job ended with phase: {phase}")
                break
        else:
            # BUG FIX: a response with no job/phase info previously fell
            # through both branches and spun in a tight busy-loop with no
            # sleep. Back off before polling again.
            time.sleep(5)


async def example_async():
    """Example using async/await"""
    print("\n\nExample 4: Async usage")
    print("-" * 50)

    from gradient import AsyncGradient

    client = AsyncGradient()
    knowledge_base_uuid = os.getenv("KNOWLEDGE_BASE_UUID", "your-kb-uuid-here")

    print(f"Creating indexing job for knowledge base: {knowledge_base_uuid}")
    indexing_job = await client.knowledge_bases.indexing_jobs.create(
        knowledge_base_uuid=knowledge_base_uuid,
    )

    job_uuid = indexing_job.job.uuid if indexing_job.job else None
    if not job_uuid:
        print("Error: Could not create indexing job")
        return

    print(f"Indexing job created with UUID: {job_uuid}")
    print("Waiting for indexing job to complete (async)...")

    try:
        completed_job = await client.knowledge_bases.indexing_jobs.wait_for_completion(
            job_uuid,
            poll_interval=5,
            timeout=600,  # 10 minute timeout
        )

        print("\n✅ Indexing job completed successfully!")
        if completed_job.job:
            print(f"Phase: {completed_job.job.phase}")

    except TimeoutError as e:
        print(f"\n⏱️ Timeout: {e}")
    except RuntimeError as e:
        print(f"\n❌ Error: {e}")
    finally:
        await client.close()
def wait_for_completion(
    self,
    uuid: str,
    *,
    poll_interval: float = 5,
    timeout: float | None = None,
    # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs.
    # The extra values given here take precedence over values defined on the client or passed to this method.
    extra_headers: Headers | None = None,
    extra_query: Query | None = None,
    extra_body: Body | None = None,
    request_timeout: float | httpx.Timeout | None | NotGiven = not_given,
) -> IndexingJobRetrieveResponse:
    """
    Wait for an indexing job to complete by polling its status.

    This method polls the indexing job status at regular intervals until it reaches
    a terminal state (succeeded, failed, error, or cancelled). It raises an exception
    if the job fails or times out.

    Args:
        uuid: The UUID of the indexing job to wait for.

        poll_interval: Time in seconds between status checks (default: 5 seconds).

        timeout: Maximum time in seconds to wait for completion. If None, waits indefinitely.

        extra_headers: Send extra headers

        extra_query: Add additional query parameters to the request

        extra_body: Add additional JSON properties to the request

        request_timeout: Override the client-level default timeout for this request, in seconds

    Returns:
        The final IndexingJobRetrieveResponse when the job completes successfully.

    Raises:
        ValueError: If `uuid` is empty.
        TimeoutError: If the job doesn't complete within the specified timeout.
        RuntimeError: If the job fails, errors, or is cancelled.
    """
    if not uuid:
        raise ValueError(f"Expected a non-empty value for `uuid` but received {uuid!r}")

    # time.monotonic() is immune to wall-clock adjustments (NTP, DST),
    # unlike time.time(), so the deadline cannot jump forwards or backwards.
    deadline = None if timeout is None else time.monotonic() + timeout

    while True:
        response = self.retrieve(
            uuid,
            extra_headers=extra_headers,
            extra_query=extra_query,
            extra_body=extra_body,
            timeout=request_timeout,
        )

        # phase is None when the API response carries no job/phase info yet.
        phase = response.job.phase if response.job else None

        # Success state
        if phase == "BATCH_JOB_PHASE_SUCCEEDED":
            return response

        # Failure states
        if phase == "BATCH_JOB_PHASE_FAILED":
            raise RuntimeError(
                f"Indexing job {uuid} failed. "
                f"Total items indexed: {response.job.total_items_indexed}, "
                f"Total items failed: {response.job.total_items_failed}"
            )

        if phase == "BATCH_JOB_PHASE_ERROR":
            raise RuntimeError(f"Indexing job {uuid} encountered an error")

        if phase == "BATCH_JOB_PHASE_CANCELLED":
            raise RuntimeError(f"Indexing job {uuid} was cancelled")

        # Still in progress (no phase yet, UNKNOWN, PENDING, or RUNNING).
        # BUG FIX: the deadline is now enforced on every iteration. Previously
        # the timeout check was nested under `if response.job and
        # response.job.phase:`, so a response without job/phase information
        # skipped it entirely and the loop could poll forever despite
        # `timeout` being set.
        if deadline is not None and time.monotonic() >= deadline:
            raise TimeoutError(
                f"Indexing job {uuid} did not complete within {timeout} seconds. "
                f"Current phase: {phase}"
            )

        # Wait before next poll
        time.sleep(poll_interval)
" + f"Current phase: {phase}" + ) + + # Wait before next poll + time.sleep(poll_interval) + class AsyncIndexingJobsResource(AsyncAPIResource): @cached_property @@ -490,6 +583,97 @@ async def update_cancel( cast_to=IndexingJobUpdateCancelResponse, ) + async def wait_for_completion( + self, + uuid: str, + *, + poll_interval: int = 5, + timeout: int | None = None, + # Use the following arguments if you need to pass additional parameters to the API that aren't available via kwargs. + # The extra values given here take precedence over values defined on the client or passed to this method. + extra_headers: Headers | None = None, + extra_query: Query | None = None, + extra_body: Body | None = None, + request_timeout: float | httpx.Timeout | None | NotGiven = not_given, + ) -> IndexingJobRetrieveResponse: + """ + Wait for an indexing job to complete by polling its status. + + This method polls the indexing job status at regular intervals until it reaches + a terminal state (succeeded, failed, error, or cancelled). It raises an exception + if the job fails or times out. + + Args: + uuid: The UUID of the indexing job to wait for. + + poll_interval: Time in seconds between status checks (default: 5 seconds). + + timeout: Maximum time in seconds to wait for completion. If None, waits indefinitely. + + extra_headers: Send extra headers + + extra_query: Add additional query parameters to the request + + extra_body: Add additional JSON properties to the request + + request_timeout: Override the client-level default timeout for this request, in seconds + + Returns: + The final IndexingJobRetrieveResponse when the job completes successfully. + + Raises: + TimeoutError: If the job doesn't complete within the specified timeout. + RuntimeError: If the job fails, errors, or is cancelled. 
+ """ + if not uuid: + raise ValueError(f"Expected a non-empty value for `uuid` but received {uuid!r}") + + start_time = time.time() + + while True: + response = await self.retrieve( + uuid, + extra_headers=extra_headers, + extra_query=extra_query, + extra_body=extra_body, + timeout=request_timeout, + ) + + # Check if job is in a terminal state + if response.job and response.job.phase: + phase = response.job.phase + + # Success state + if phase == "BATCH_JOB_PHASE_SUCCEEDED": + return response + + # Failure states + if phase == "BATCH_JOB_PHASE_FAILED": + raise RuntimeError( + f"Indexing job {uuid} failed. " + f"Total items indexed: {response.job.total_items_indexed}, " + f"Total items failed: {response.job.total_items_failed}" + ) + + if phase == "BATCH_JOB_PHASE_ERROR": + raise RuntimeError(f"Indexing job {uuid} encountered an error") + + if phase == "BATCH_JOB_PHASE_CANCELLED": + raise RuntimeError(f"Indexing job {uuid} was cancelled") + + # Still in progress (UNKNOWN, PENDING, or RUNNING) + # Check timeout + if timeout is not None: + elapsed = time.time() - start_time + if elapsed >= timeout: + raise TimeoutError( + f"Indexing job {uuid} did not complete within {timeout} seconds. 
" + f"Current phase: {phase}" + ) + + # Wait before next poll + await asyncio.sleep(poll_interval) + class IndexingJobsResourceWithRawResponse: def __init__(self, indexing_jobs: IndexingJobsResource) -> None: @@ -510,6 +694,9 @@ def __init__(self, indexing_jobs: IndexingJobsResource) -> None: self.update_cancel = to_raw_response_wrapper( indexing_jobs.update_cancel, ) + self.wait_for_completion = to_raw_response_wrapper( + indexing_jobs.wait_for_completion, + ) class AsyncIndexingJobsResourceWithRawResponse: @@ -531,6 +718,9 @@ def __init__(self, indexing_jobs: AsyncIndexingJobsResource) -> None: self.update_cancel = async_to_raw_response_wrapper( indexing_jobs.update_cancel, ) + self.wait_for_completion = async_to_raw_response_wrapper( + indexing_jobs.wait_for_completion, + ) class IndexingJobsResourceWithStreamingResponse: @@ -552,6 +742,9 @@ def __init__(self, indexing_jobs: IndexingJobsResource) -> None: self.update_cancel = to_streamed_response_wrapper( indexing_jobs.update_cancel, ) + self.wait_for_completion = to_streamed_response_wrapper( + indexing_jobs.wait_for_completion, + ) class AsyncIndexingJobsResourceWithStreamingResponse: @@ -573,3 +766,6 @@ def __init__(self, indexing_jobs: AsyncIndexingJobsResource) -> None: self.update_cancel = async_to_streamed_response_wrapper( indexing_jobs.update_cancel, ) + self.wait_for_completion = async_to_streamed_response_wrapper( + indexing_jobs.wait_for_completion, + )