The Newscatcher CatchAll Python library provides access to the CatchAll API, which transforms natural language queries into structured data extracted from web sources.
```shell
pip install newscatcher-catchall-sdk
```

A full reference for this library is available here.
Submit a query and retrieve structured results:
```python
from newscatcher_catchall import CatchAllApi
import time

client = CatchAllApi(api_key="YOUR_API_KEY")

# Create a job
job = client.jobs.create_job(
    query="Tech company earnings this quarter",
    context="Focus on revenue and profit margins",
    schema="Company [NAME] earned [REVENUE] in [QUARTER]",
)
print(f"Job created: {job.job_id}")

# Poll for completion with progress updates
while True:
    status = client.jobs.get_job_status(job.job_id)

    # Check if completed
    completed = any(s.status == "completed" and s.completed for s in status.steps)
    if completed:
        print("Job completed!")
        break

    # Show current processing step
    current_step = next((s for s in status.steps if not s.completed), None)
    if current_step:
        print(f"Processing: {current_step.status} (step {current_step.order}/7)")

    time.sleep(60)

# Retrieve results
results = client.jobs.get_job_results(job.job_id)
print(f"Found {results.valid_records} valid records from {results.candidate_records} candidates")

for record in results.all_records:
    print(record.record_title)
```

Jobs process asynchronously and typically complete in 10-15 minutes. To learn more, see the Quickstart.
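The polling loop above can be factored into a small reusable helper that gives up after a deadline instead of waiting forever. This sketch is plain Python and independent of the SDK; the `check` callable, the injectable `sleep`, and the 15-minute default are illustrative choices, not part of the library:

```python
import time


def wait_until(check, timeout_s=900, interval_s=60, sleep=time.sleep):
    """Poll check() until it returns truthy or timeout_s elapses.

    Returns True if check() succeeded, False if the deadline passed first.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        sleep(interval_s)
```

With the SDK, `check` could wrap `get_job_status` and test whether every step reports completed; injecting `sleep` keeps the helper easy to test.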
Automate recurring queries with scheduled execution:
```python
from newscatcher_catchall import CatchAllApi

client = CatchAllApi(api_key="YOUR_API_KEY")

# Create a monitor from a completed job
monitor = client.monitors.create_monitor(
    reference_job_id=job.job_id,  # job from the previous example
    schedule="every day at 12 PM UTC",
    webhook={
        "url": "https://your-endpoint.com/webhook",
        "method": "POST",
        "headers": {"Authorization": "Bearer YOUR_TOKEN"},
    },
)
print(f"Monitor created: {monitor.monitor_id}")

# List all monitors
monitors = client.monitors.list_monitors()
print(f"Total monitors: {monitors.total_monitors}")

# Get aggregated results
results = client.monitors.pull_monitor_results(monitor.monitor_id)
print(f"Collected {results.records} records")
```

Monitors run jobs on your schedule and send webhook notifications when complete. See the Monitors documentation for setup and configuration.
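On the receiving end, your endpoint should verify the `Authorization` header you configured before trusting a notification. A minimal, framework-free sketch of that check; the payload shape and `monitor_id` field are assumptions for illustration, not a documented contract:

```python
import json


def handle_webhook(headers, body, expected_token):
    """Validate an incoming monitor notification; return (status_code, message)."""
    if headers.get("Authorization") != f"Bearer {expected_token}":
        return 401, "unauthorized"
    try:
        payload = json.loads(body)
    except (json.JSONDecodeError, UnicodeDecodeError):
        return 400, "invalid JSON"
    # A real handler would enqueue a pull of the monitor's results here.
    return 200, f"notification for monitor {payload.get('monitor_id', 'unknown')}"
```

Returning quickly with a 2xx and doing the heavy lifting (e.g. `pull_monitor_results`) in a background task avoids webhook delivery timeouts.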
Use the async client for non-blocking API calls:
```python
import asyncio

from newscatcher_catchall import AsyncCatchAllApi

client = AsyncCatchAllApi(api_key="YOUR_API_KEY")


async def main() -> None:
    job = await client.jobs.create_job(
        query="Tech company earnings this quarter",
        context="Focus on revenue and profit margins",
    )
    print(f"Job created: {job.job_id}")

    # Wait for completion
    while True:
        status = await client.jobs.get_job_status(job.job_id)
        completed = any(s.status == "completed" and s.completed for s in status.steps)
        if completed:
            print("Job completed!")
            break

        current_step = next((s for s in status.steps if not s.completed), None)
        if current_step:
            print(f"Processing: {current_step.status} (step {current_step.order}/7)")

        await asyncio.sleep(60)


asyncio.run(main())
```

Handle API errors with the ApiError exception:
```python
from newscatcher_catchall.core.api_error import ApiError

try:
    client.jobs.create_job(query="...")
except ApiError as e:
    print(f"Status: {e.status_code}")
    print(f"Error: {e.body}")
```

Retrieve large result sets with pagination:
```python
page = 1
while True:
    results = client.jobs.get_job_results(
        job_id="...",
        page=page,
        page_size=100,
    )
    print(f"Page {results.page}/{results.total_pages}: {len(results.all_records)} records")

    for record in results.all_records:
        # Process each record
        print(f"  - {record.record_title}")

    if results.page >= results.total_pages:
        break
    page += 1

print(f"Processed {results.valid_records} total records")
```

Access response headers and raw data:
```python
response = client.jobs.with_raw_response.create_job(query="...")
print(response.headers)
print(response.data)
```

The SDK retries failed requests automatically with exponential backoff. Configure retry behavior:
```python
client.jobs.create_job(
    query="...",
    request_options={"max_retries": 3},
)
```

Set custom timeouts at the client or request level:
```python
# Client-level timeout
client = CatchAllApi(api_key="YOUR_API_KEY", timeout=30.0)

# Request-level timeout
client.jobs.create_job(
    query="...",
    request_options={"timeout_in_seconds": 10},
)
```

Customize the underlying HTTP client for proxies or custom transports:
```python
import httpx
from newscatcher_catchall import CatchAllApi

client = CatchAllApi(
    api_key="YOUR_API_KEY",
    httpx_client=httpx.Client(
        proxy="http://my.proxy.example.com",
        transport=httpx.HTTPTransport(local_address="0.0.0.0"),
    ),
)
```

CatchAll API is in beta. Breaking changes may occur in minor version updates. See the Changelog for updates.
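Given the beta status, consider pinning the SDK in your requirements file and upgrading deliberately after checking the Changelog (the version number below is illustrative, not a real release):

```
newscatcher-catchall-sdk==1.2.3
```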
This library is generated programmatically from our API specification. Direct contributions to the generated code cannot be merged, but README improvements are welcome. To suggest SDK changes, please open an issue.
- Documentation: https://www.newscatcherapi.com/docs/v3/catch-all
- Support: support@newscatcherapi.com