Tavily Web Search: Auth, Retries & Fallback #28
Conversation
- Added web search configuration variables to .env.example
- Updated .gitignore to include a new line for clarity
- Refactored import statement in indexing_router.py for consistency
- Included web answering router in main.py for improved routing structure
- Updated requirements.txt to include trafilatura and ensure httpx uses HTTP/2
…ngs for search configuration
…ror handling and HTML extraction
Walkthrough
Adds a new asynchronous WebFetchService and FetchedDoc dataclass that concurrently fetches URLs with semaphore-based concurrency limits, extracts title/site/text (trafilatura optional, fallback sanitizer), checks URL safety, logs timings/errors, and provides the fetch_urls and fetch_search_results public methods.
Changes
Sequence Diagram(s)
sequenceDiagram
autonumber
actor Caller
participant WFS as WebFetchService
participant Sem as Semaphore
participant HTTP as Remote HTTP
participant Ext as Extractor (trafilatura / fallback)
Caller->>WFS: fetch_urls(urls, timeout)
WFS->>Sem: acquire slot
loop for each URL (concurrent tasks)
WFS->>HTTP: GET url (timeout, headers)
alt Success (HTML)
HTTP-->>WFS: HTML response
WFS->>Ext: extract title & text
Ext-->>WFS: title, text
WFS-->>Caller: FetchedDoc(url, title, site_name, text, fetch_ms)
else Failure / non-HTML / timeout
HTTP-->>WFS: error/none
WFS-->>Caller: FetchedDoc(url, text="", fetch_ms)
end
end
WFS->>Sem: release slot
note right of WFS: logs timings, counts, fallbacks
sequenceDiagram
autonumber
actor Caller
participant WFS as WebFetchService
participant SR as Search Results
participant Merge as Snippet Merge
Caller->>WFS: fetch_search_results(results, preserve_snippets)
WFS->>SR: extract URLs + optional snippets
WFS->>WFS: fetch_urls(extracted_urls)
alt preserve_snippets and fetch failures
WFS->>Merge: fill missing text with snippets
Merge-->>Caller: FetchedDoc list (with snippets)
else
WFS-->>Caller: FetchedDoc list
end
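The first diagram maps onto a standard asyncio pattern: semaphore-bounded concurrent GETs that always return a lightweight result object, empty on failure. The snippet below is a minimal, hedged sketch of that pattern, not the PR's actual implementation; the FetchedDoc field names mirror the walkthrough, everything else (defaults, error handling, lack of extraction) is illustrative.
import asyncio
import time
from dataclasses import dataclass
from typing import List, Optional

import httpx

@dataclass
class FetchedDoc:
    # Field names follow the walkthrough/diagram; defaults are assumptions.
    url: str
    title: Optional[str] = None
    site_name: Optional[str] = None
    text: str = ""
    fetch_ms: int = 0

async def fetch_urls(urls: List[str], timeout: float = 10.0, max_concurrency: int = 5) -> List[FetchedDoc]:
    semaphore = asyncio.Semaphore(max_concurrency)  # bound concurrent requests

    async def fetch_one(client: httpx.AsyncClient, url: str) -> FetchedDoc:
        doc = FetchedDoc(url=url)
        start = time.time()
        async with semaphore:  # acquire a slot before hitting the network
            try:
                resp = await client.get(url, timeout=timeout)
                resp.raise_for_status()
                doc.text = resp.text  # the real service extracts title/text here
            except httpx.HTTPError:
                pass  # failure path: return the doc with empty text
        doc.fetch_ms = int((time.time() - start) * 1000)
        return doc

    async with httpx.AsyncClient(follow_redirects=True) as client:
        return list(await asyncio.gather(*(fetch_one(client, u) for u in urls)))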
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes
Possibly related PRs
Pre-merge checks and finishing touches: ✅ Passed checks (3 passed)
Actionable comments posted: 2
🧹 Nitpick comments (8)
services/web_fetch_service.py (8)
53-55: Reuse a single AsyncClient for pooling and HTTP/2; expose close.
Per-request clients defeat pooling and add overhead.
    self.max_concurrency = max_concurrency or settings.max_fetch_concurrency
    self.semaphore = asyncio.Semaphore(self.max_concurrency)
    # Common HTML headers for the fetch requests
    self.headers = {
@@
-        async with httpx.AsyncClient(timeout=timeout_seconds, follow_redirects=True) as client:
-            response = await client.get(url, headers=self.headers)
+        # Reuse a single pooled client
+        response = await self._client.get(url, timeout=timeout_seconds)
Add outside the selected range:
# in __init__ after headers
self._client = httpx.AsyncClient(
    http2=True,
    follow_redirects=True,
    headers=self.headers,
    limits=httpx.Limits(
        max_connections=self.max_concurrency,
        max_keepalive_connections=self.max_concurrency,
    ),
)

# add to class
async def aclose(self) -> None:
    await self._client.aclose()

Also applies to: 163-165
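If the service owns the pooled client, something has to close it on shutdown. A hedged caller-side sketch of that lifecycle (constructor and method parameters here are assumptions based on this review, not confirmed signatures):
import asyncio

from services.web_fetch_service import WebFetchService

async def main() -> None:
    # Hypothetical caller-side lifecycle for a service owning one AsyncClient.
    service = WebFetchService(max_concurrency=5)
    try:
        docs = await service.fetch_urls(["https://example.com"])
        print(len(docs), "documents fetched")
    finally:
        await service.aclose()  # release pooled connections on shutdown

# asyncio.run(main())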
167-174: Skip non-HTML content early.
Avoid passing binary or JSON to HTML extractors.
-            html = response.text
+            ct = response.headers.get("Content-Type", "")
+            if "text/html" not in ct and "application/xhtml+xml" not in ct:
+                fetch_ms = int((time.time() - start_time) * 1000)
+                fetched_doc.fetch_ms = fetch_ms
+                logger.info("Skipping non-HTML content from %s (type=%s)", url, ct)
+                return fetched_doc
+            html = response.text
183-185: Use parameterized logs and exception logging (fixes RUF010/TRY400).
Avoid f-strings in logs; include tracebacks where useful.
- logger.info(f"Fetched {url} in {fetch_ms}ms, extracted {len(text)} chars") + logger.info("Fetched %s in %dms, extracted %d chars", url, fetch_ms, len(text)) @@ - logger.warning(f"HTTP error {status} fetching {url}: {str(e)}") + logger.warning("HTTP error %s fetching %s: %s", status, url, e) @@ - logger.warning(f"Request error fetching {url}: {str(e)}") + logger.warning("Request error fetching %s: %s", url, e) @@ - except Exception as e: - logger.warning(f"Error fetching {url}: {str(e)}") + except Exception: + logger.exception("Error fetching %s", url) @@ - logger.warning(f"Failed to fetch {url} after {fetch_ms}ms") + logger.warning("Failed to fetch %s after %dms", url, fetch_ms) @@ - except Exception as e: - logger.error(f"Error in fetch_urls: {str(e)}") + except Exception: + logger.exception("Error in fetch_urls") @@ - if isinstance(result, Exception): - logger.warning(f"Exception during fetch: {str(result)}") + if isinstance(result, Exception): + logger.warning("Exception during fetch: %r", result)Also applies to: 188-192, 197-198, 221-223, 228-231
108-116: Unescape HTML entities in titles.
Improves output fidelity.
-        if title_match:
-            title = title_match.group(1).strip()
-            # Clean up title
-            title = re.sub(r'\s+', ' ', title)
-            return title
+        if title_match:
+            title = re.sub(r'\s+', ' ', title_match.group(1).strip())
+            return html_lib.unescape(title)

Add import if missing:
+import html as html_lib
95-106: Use stdlib entity decoding instead of ad-hoc replacements.
Less brittle and more complete.
-        # Replace entities
-        html = re.sub(r'&nbsp;', ' ', html)
-        html = re.sub(r'&amp;', '&', html)
-        html = re.sub(r'&lt;', '<', html)
-        html = re.sub(r'&gt;', '>', html)
-        html = re.sub(r'&quot;', '"', html)
-        html = re.sub(r'&#\d+;', ' ', html)
+        # Decode HTML entities
+        html = html_lib.unescape(html)
133-136: Tighten trafilatura error logging level; avoid noisy warnings.
This is a normal fallback path; keep logs low-volume.
- logger.warning("trafilatura extraction failed, falling back to simple extraction") + logger.debug("trafilatura extraction returned no content; using fallback") @@ - except Exception as e: - logger.warning(f"trafilatura extraction error: {e}, falling back to simple extraction") + except Exception as e: + logger.debug("trafilatura extraction error; falling back: %s", e)
8-15: Imports needed for above changes.
Add stdlib modules for entity decoding and IP checks.
 import asyncio
 import logging
 import re
 import time
 from dataclasses import dataclass
-from typing import Dict, List, Optional, Set
+from typing import Dict, List, Optional, Set
 from urllib.parse import urlparse
+import html as html_lib
+import ipaddress
235-274: Type safety: define a SearchResult protocol for fetch_search_results.
Clarify expected attributes without hard coupling to a concrete class.
Add outside the selected range:
from typing import Protocol, Iterable, Optional

class SearchResultLike(Protocol):
    url: str
    title: Optional[str]
    site_name: Optional[str]
    snippet: Optional[str]
    published_at: Optional[str]

Then adjust the signature:
-async def fetch_search_results(self, search_results: List, timeout_seconds: int = 10,
+async def fetch_search_results(self, search_results: Iterable[SearchResultLike], timeout_seconds: int = 10,
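Any object with the right attributes would satisfy such a protocol structurally. A hedged example; the TavilyResult dataclass below is illustrative, not the project's actual search-result type:
from dataclasses import dataclass
from typing import Optional

@dataclass
class TavilyResult:  # hypothetical concrete type; a structural match is enough
    url: str
    title: Optional[str] = None
    site_name: Optional[str] = None
    snippet: Optional[str] = None
    published_at: Optional[str] = None

results = [TavilyResult(url="https://example.com", snippet="Example snippet")]
# docs = await service.fetch_search_results(results, preserve_snippets=True)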
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
services/web_fetch_service.py
(1 hunks)
🧰 Additional context used
🪛 Ruff (0.13.1)
services/web_fetch_service.py
79-79: Consider moving this statement to an else block (TRY300)
80-80: Do not catch blind exception: Exception (BLE001)
134-134: Do not catch blind exception: Exception (BLE001)
188-188: Use explicit conversion flag — replace with conversion flag (RUF010)
190-190: Use explicit conversion flag — replace with conversion flag (RUF010)
191-191: Do not catch blind exception: Exception (BLE001)
192-192: Use explicit conversion flag — replace with conversion flag (RUF010)
220-220: Do not catch blind exception: Exception (BLE001)
221-221: Use logging.exception instead of logging.error — replace with exception (TRY400)
221-221: Use explicit conversion flag — replace with conversion flag (RUF010)
228-228: Use explicit conversion flag — replace with conversion flag (RUF010)
…cement/tavily-api-auth
…d allow httpx to manage encoding automatically
Actionable comments posted: 2
🧹 Nitpick comments (6)
services/web_fetch_service.py (6)
104-128: Prefer html.unescape over manual entity replacements.
Simpler and more complete entity handling; fewer edge cases.
Apply this diff:
@@
-import re
+import re
+import html as html_unescape
@@ def _sanitize_html_fallback(self, html: str) -> str:
-        # Replace entities
-        html = re.sub(r'&nbsp;', ' ', html)
-        html = re.sub(r'&amp;', '&', html)
-        html = re.sub(r'&lt;', '<', html)
-        html = re.sub(r'&gt;', '>', html)
-        html = re.sub(r'&quot;', '"', html)
-        html = re.sub(r'&#\d+;', ' ', html)
+        # Replace entities
+        html = html_unescape.unescape(html)

Also applies to: 8-16
213-219: Use lazy/structured logging instead of f-strings; elevate unexpected errors to exception logs.
Avoid formatting costs and keep tracebacks.
Apply this diff:
@@ - logger.warning(f"HTTP error {status} fetching {url}: {str(e)}") + logger.warning("HTTP error %s fetching %s: %s", status, url, e) @@ - logger.warning(f"Request error fetching {url}: {str(e)}") + logger.warning("Request error fetching %s: %s", url, e) @@ - logger.warning(f"Error fetching {url}: {str(e)}") + logger.exception("Unexpected error fetching %s", url) @@ - logger.error(f"Error in fetch_urls: {str(e)}") + logger.exception("Error in fetch_urls") @@ - logger.warning(f"Exception during fetch: {str(result)}") + logger.warning("Exception during fetch: %s", result)Also applies to: 248-248, 255-255, 210-211
129-137: Optional: fall back to OpenGraph og:title when <title> is missing.
Improves coverage for modern sites.
Sketch:
@@ def _extract_title(self, html: str) -> Optional[str]:
-        title_match = re.search(r'<title[^>]*>(.*?)</title>', html, re.IGNORECASE | re.DOTALL)
+        title_match = re.search(r'<title[^>]*>(.*?)</title>', html, re.IGNORECASE | re.DOTALL)
         if title_match:
@@
-        return None
+        og = re.search(r'<meta\s+property=["\']og:title["\']\s+content=["\'](.*?)["\']', html, re.IGNORECASE | re.DOTALL)
+        if og:
+            return re.sub(r'\s+', ' ', og.group(1).strip())
+        return None
227-261: Confirm intent: dropping docs without text.
fetch_urls filters out FetchedDoc without text, which suppresses non-HTML and tiny pages and enables snippet fallback later. If that’s intentional, add a brief docstring note; otherwise, consider returning all FetchedDoc and letting callers decide.
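The two behaviours under discussion differ only in a final filter. A minimal sketch, assuming the fetch step has already gathered FetchedDoc results into a list (function name and flag are hypothetical):
from typing import List

def keep_fetched(docs: List["FetchedDoc"], drop_empty: bool = True) -> List["FetchedDoc"]:
    if drop_empty:
        # Behaviour as described: drop documents that yielded no text,
        # so the snippet fallback in fetch_search_results can fill the gaps later.
        return [d for d in docs if d.text]
    # Alternative: return everything and let callers decide what to keep.
    return list(docs)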
262-301: Add a typed protocol for search_results to improve safety.
Prevents attribute errors and clarifies the contract.
Example:
@@
-from typing import Dict, List, Optional, Set
+from typing import Dict, List, Optional, Set, Protocol, Sequence
@@
+class SearchResultLike(Protocol):
+    url: str
+    title: Optional[str]
+    site_name: Optional[str]
+    snippet: Optional[str]
+    published_at: Optional[str]
@@
-    async def fetch_search_results(self, search_results: List, timeout_seconds: int = 10,
+    async def fetch_search_results(self, search_results: Sequence[SearchResultLike], timeout_seconds: int = 10,
                                    preserve_snippets: bool = True) -> List[FetchedDoc]:

Also applies to: 14-14
57-61: Optional: use a product UA; reduce per-URL logs to debug.
Consider a descriptive UA (e.g., “AuthormatonFetcher/1.0 (+site)”) and keep per-URL logs at debug to avoid noisy INFO in production.
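A hedged sketch of what that could look like; the UA string and contact URL are placeholders, not values from this PR:
import logging

logger = logging.getLogger(__name__)

# Descriptive product User-Agent instead of a spoofed browser string.
PRODUCT_HEADERS = {
    "User-Agent": "AuthormatonFetcher/1.0 (+https://example.com/bot)",  # placeholder
    "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
    "Accept-Language": "en-US,en;q=0.5",
}

def log_fetch(url: str, fetch_ms: int, n_chars: int) -> None:
    # Per-URL success details at DEBUG; reserve WARNING/ERROR for real problems.
    logger.debug("Fetched %s in %dms, extracted %d chars", url, fetch_ms, n_chars)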
📜 Review details
Configuration used: Path: .coderabbit.yaml
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
services/web_fetch_service.py
(1 hunks)
🧰 Additional context used
🪛 Ruff (0.13.1)
services/web_fetch_service.py
73-73: Consider moving this statement to an else block (TRY300)
74-74: Do not catch blind exception: Exception (BLE001)
100-100: Consider moving this statement to an else block (TRY300)
101-101: Do not catch blind exception: Exception (BLE001)
155-155: Do not catch blind exception: Exception (BLE001)
215-215: Use explicit conversion flag — replace with conversion flag (RUF010)
217-217: Use explicit conversion flag — replace with conversion flag (RUF010)
218-218: Do not catch blind exception: Exception (BLE001)
219-219: Use explicit conversion flag — replace with conversion flag (RUF010)
247-247: Do not catch blind exception: Exception (BLE001)
248-248: Use logging.exception instead of logging.error — replace with exception (TRY400)
248-248: Use explicit conversion flag — replace with conversion flag (RUF010)
255-255: Use explicit conversion flag — replace with conversion flag (RUF010)
🔇 Additional comments (1)
services/web_fetch_service.py (1)
57-63: Headers cleanup looks good (matches prior feedback).
Dropping Accept-Encoding br and browser-only headers is correct; letting httpx negotiate/decompress is safer.
def __init__(self, max_concurrency: Optional[int] = None):
    """
    Initialize the web fetch service.

    Args:
        max_concurrency: Maximum number of concurrent requests
            (defaults to settings.max_fetch_concurrency)
    """
    self.max_concurrency = max_concurrency or settings.max_fetch_concurrency
    self.semaphore = asyncio.Semaphore(self.max_concurrency)
Block SSRF via open redirects; disable env proxies; reuse a single AsyncClient with limits.
Currently follow_redirects=True allows 3xx hops to localhost/IMDS/etc, bypassing your pre-check. Also, creating a new client per request is wasteful and inherits proxy env by default. Reuse one AsyncClient, set trust_env=False, and manually validate each redirect hop.
Apply this diff:
@@
-from urllib.parse import urlparse
+from urllib.parse import urlparse, urljoin
@@ class WebFetchService:
- def __init__(self, max_concurrency: Optional[int] = None):
+ def __init__(self, max_concurrency: Optional[int] = None):
@@
- self.max_concurrency = max_concurrency or settings.max_fetch_concurrency
- self.semaphore = asyncio.Semaphore(self.max_concurrency)
+ raw = max_concurrency or getattr(settings, "max_fetch_concurrency", 10)
+ self.max_concurrency = max(1, raw)
+ self.semaphore = asyncio.Semaphore(self.max_concurrency)
@@
- # Common HTML headers for the fetch requests
+ # Common HTML headers for the fetch requests
self.headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5"
# Let httpx handle Accept-Encoding and compression automatically
}
+
+ # Reuse a single client; disable env proxies; bound pool size
+ self._limits = httpx.Limits(
+ max_connections=self.max_concurrency,
+ max_keepalive_connections=self.max_concurrency,
+ )
+ self._client = httpx.AsyncClient(
+ headers=self.headers,
+ http2=True,
+ limits=self._limits,
+ follow_redirects=False, # we validate/handle redirects manually
+ trust_env=False, # ignore HTTP(S)_PROXY/NO_PROXY
+ )
+ self._max_redirects = getattr(settings, "fetch_max_redirects", 5)
+ self._max_content_bytes = getattr(settings, "fetch_max_content_bytes", 2_000_000)
+
+ async def aclose(self) -> None:
+ await self._client.aclose()
@@
- try:
- async with httpx.AsyncClient(timeout=timeout_seconds, follow_redirects=True) as client:
- response = await client.get(url, headers=self.headers)
- response.raise_for_status()
-
- html = response.text
-
- # Extract title if not already provided
- title = self._extract_title(html)
-
- # Extract text content
- text = self._extract_text_from_html(html)
-
- # Update the fetched document
- fetched_doc.title = title
- fetched_doc.text = text
-
- # Calculate fetch time
- fetch_ms = int((time.time() - start_time) * 1000)
- fetched_doc.fetch_ms = fetch_ms
-
- logger.info(f"Fetched {url} in {fetch_ms}ms, extracted {len(text)} chars")
- return fetched_doc
+ try:
+ current_url = url
+ html = ""
+ # Manually follow and validate up to N redirects
+ for _ in range(self._max_redirects + 1):
+ if not self._is_url_allowed(current_url):
+ raise httpx.HTTPStatusError("Unsafe URL after redirect validation",
+ request=None, response=None)
+ # Stream to cap payload size
+ async with self._client.stream("GET", current_url, timeout=timeout_seconds) as response:
+ if 300 <= response.status_code < 400 and "location" in response.headers:
+ next_url = urljoin(current_url, response.headers["location"])
+ current_url = next_url
+ continue
+ response.raise_for_status()
+ ctype = response.headers.get("content-type", "")
+ if "html" not in ctype and "xml" not in ctype:
+ logger.debug("Skipping non-HTML content: %s (%s)", current_url, ctype)
+ break
+ buf = bytearray()
+ async for chunk in response.aiter_bytes():
+ buf.extend(chunk)
+ if len(buf) > self._max_content_bytes:
+ logger.warning("Aborting %s: response exceeded %d bytes", current_url, self._max_content_bytes)
+ break
+ if buf:
+ html = buf.decode("utf-8", errors="replace")
+ break
+
+ # Extract title/text if we have HTML
+ if html:
+ title = self._extract_title(html)
+ text = self._extract_text_from_html(html)
+ fetched_doc.title = title
+ fetched_doc.text = text
+
+ # Calculate fetch time
+ fetch_ms = int((time.time() - start_time) * 1000)
+ fetched_doc.fetch_ms = fetch_ms
+ logger.debug("Fetched %s in %dms, extracted %d chars", current_url, fetch_ms, len(fetched_doc.text))
+ return fetched_doc
Also applies to: 15-16, 161-213
🤖 Prompt for AI Agents
In services/web_fetch_service.py around lines 46-56 (also applies to lines 15-16
and 161-213): the service currently creates a new HTTP client per request,
inherits proxy environment variables, and uses follow_redirects=True which
allows open-redirect SSRF to internal addresses; fix by instantiating a single
httpx.AsyncClient in __init__ with appropriate limits (httpx.Limits or
equivalent) and trust_env=False, set follow_redirects=False, and reuse that
client for all requests; implement manual redirect handling when a 3xx response
is returned: read the Location header, resolve and validate each redirect hop
against the same SSRF/internal-host checks used for the initial URL (reject
local/IMDS/private IPs), enforce a maximum redirect count, and only follow
validated redirects using the single client; ensure the client is properly
closed on service shutdown.
def _is_url_allowed(self, url: str) -> bool:
    """
    Check if a URL is safe to fetch (SSRF protection).

    Args:
        url: The URL to validate

    Returns:
        True if URL is safe to fetch, False otherwise
    """
    try:
        p = urlparse(url)
        if p.scheme not in ("http", "https"):
            return False
        if not p.hostname or p.username or p.password:
            return False
        try:
            ip = ipaddress.ip_address(p.hostname)
            if not ip.is_global:
                return False
        except ValueError:
            # Hostname; DNS resolution checks can be added later if needed.
            pass
        return True
    except Exception:
        return False
SSRF guard is a good start; consider tightening hostname cases.
Add explicit blocks for localhost-style hostnames and common link-local names; document future DNS resolution plans to catch rebinds.
Example tweak:
@@ def _is_url_allowed(self, url: str) -> bool:
- if not p.hostname or p.username or p.password:
+ if not p.hostname or p.username or p.password:
return False
+ host = p.hostname.lower()
+ if host in {"localhost", "localhost.localdomain"} or host.endswith(".local"):
+ return False
📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
def _is_url_allowed(self, url: str) -> bool:
    """
    Check if a URL is safe to fetch (SSRF protection).

    Args:
        url: The URL to validate

    Returns:
        True if URL is safe to fetch, False otherwise
    """
    try:
        p = urlparse(url)
        if p.scheme not in ("http", "https"):
            return False
        if not p.hostname or p.username or p.password:
            return False
        host = p.hostname.lower()
        if host in {"localhost", "localhost.localdomain"} or host.endswith(".local"):
            return False
        try:
            ip = ipaddress.ip_address(p.hostname)
            if not ip.is_global:
                return False
        except ValueError:
            # Hostname; DNS resolution checks can be added later if needed.
            pass
        return True
    except Exception:
        return False
🧰 Tools
🪛 Ruff (0.13.1)
100-100: Consider moving this statement to an else block (TRY300)
101-101: Do not catch blind exception: Exception (BLE001)
🤖 Prompt for AI Agents
In services/web_fetch_service.py around lines 77 to 103, the SSRF guard
currently allows hostnames that could still resolve to
loopback/link-local/private addresses; update the function to explicitly reject
common localhost-style hostnames and numeric edge cases by normalizing
p.hostname (lowercase, strip surrounding brackets for IPv6) and returning False
for literal names like "localhost", "ip6-localhost", "0.0.0.0" and for IPv4/IPv6
addresses in loopback, link-local (169.254.0.0/16 and fe80::/10), multicast, and
private ranges (10/8, 172.16/12, 192.168/16) using ipaddress checks
(is_loopback, is_link_local, is_private, is_multicast) after parsing the
hostname to an ip object; if hostname is non-numeric keep the existing behavior
but add a clear TODO comment that DNS resolution and rebind protection will be
implemented later.
Features
- TavilySearchProvider (services/web_search_service.py)
  - Authorization: Bearer <TAVILY_API_KEY>.
  - httpx with per-request timeout.
  - Results shaped as {url, title, site_name, snippet, published_at, score}.
- Settings & Fallback (config/settings.py)
  - WEB_SEARCH_ENGINE=tavily but key missing → log warning and fallback to dummy (no crash).
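The bullets above describe the provider's auth header, transport, and normalized result shape. Below is a minimal, hedged sketch of such a call; the endpoint path and the raw response field names are assumptions — only the Bearer auth, the httpx per-request timeout, and the normalized result keys come from this PR description.
from typing import Any, Dict, List

import httpx

TAVILY_ENDPOINT = "https://api.tavily.com/search"  # assumed endpoint path

async def tavily_search(query: str, api_key: str, timeout_seconds: int = 10) -> List[Dict[str, Any]]:
    """Query Tavily and normalize results to the shape listed in this PR description."""
    headers = {"Authorization": f"Bearer {api_key}"}  # auth scheme from the PR
    async with httpx.AsyncClient() as client:
        resp = await client.post(
            TAVILY_ENDPOINT,
            json={"query": query},
            headers=headers,
            timeout=timeout_seconds,  # per-request timeout, as described
        )
        resp.raise_for_status()
        payload = resp.json()
    # Raw field names on the right-hand side are guesses, not documented here.
    return [
        {
            "url": item.get("url"),
            "title": item.get("title"),
            "site_name": item.get("site_name"),
            "snippet": item.get("content"),
            "published_at": item.get("published_date"),
            "score": item.get("score"),
        }
        for item in payload.get("results", [])
    ]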
Fixes
Refactors
Observability
Configuration
- WEB_SEARCH_ENGINE=tavily|dummy|...
- TAVILY_API_KEY required only when using Tavily; otherwise dummy is used.
Manual QA
Compatibility & Risks
Checklist
Summary by CodeRabbit