In [6]:
from langchain_community.utilities import SearxSearchWrapper
s = SearxSearchWrapper(searx_host="http://localhost:8080")
query = "What is the capital of France?"
print(s.run(query=query))
print(s.results(query=query, num_results=5))

Paris[a] is the capital and largest city of France, with an estimated city center population of 2,048,472, and a metropolitan population of 13,171,056 as of January 2025 [update][3] in an area of more than 105 km 2 (41 sq mi). It is located in the centre of the Île-de-France region. Paris is the fourth-most populous city in the European Union.

Paris is the capital of France, the largest country of Europe with 550 000 km2 (65 millions inhabitants). Paris has 2.234 million inhabitants end 2011.

This is a chronological list of capitals of France. The capital of France has been Paris since its liberation in 1944.

6 days ago - Paris, city and capital of France, located along the Seine River, in the north-central part of the country. Paris is one of the world’s most important and attractive cities, famed for its gastronomy, haute couture, painting, literature, and intellectual community.

You can climb to the top of the ... stores of Le Marais and the Place des Vosges. Enjoying an ice-cre

In [None]:
import requests

def fetch_urls_with_proxy(data, proxy_base="http://localhost:3001/"):
    """
    Fetches content for each URL in the given list using a proxy service.

    Args:
        data (list[dict]): List of dictionaries containing at least a 'link' key.
        proxy_base (str): Base proxy URL to prepend before the target URL.

    Returns:
        list[dict]: Updated list where each dict includes 'content' with the response text.
    """
    results = []
    for item in data:
        url = item.get("link")
        if not url:
            item["content"] = None
            results.append(item)
            continue

        try:
            # Construct the proxied request
            proxied_url = proxy_base + url
            resp = requests.get(proxied_url, timeout=10)

            # Append the content to the dict
            item["content"] = resp.text if resp.status_code == 200 else f"Error {resp.status_code}"
        except Exception as e:
            item["content"] = f"Request failed: {e}"

        results.append(item)

    return results


In [11]:
data = [
    {
        'snippet': 'example text',
        'title': 'example title',
        'link': 'https://www.iana.org/domains/example',
        'engines': ['duckduckgo'],
        'category': 'general'
    }
]

output = fetch_urls_with_proxy(data)
print(output[0]["content"][:500])  # Print first 500 chars of the fetched content


Error 400
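
The output above does not say why the proxy answered 400, so a reasonable first step is to look at the exact URL being requested. The sketch below is a debugging aid rather than a fix. `build_proxied_url` and its `encode` flag are hypothetical names, and whether the proxy expects the target percent-encoded is an assumption to check against the proxy's own documentation.

```python
from urllib.parse import quote


def build_proxied_url(proxy_base: str, target_url: str, encode: bool = False) -> str:
    """Join a proxy base and a target URL with exactly one slash between them.

    encode=True percent-encodes the whole target URL. Some proxies expect
    that form; whether this one does is an assumption to verify.
    """
    base = proxy_base.rstrip("/") + "/"
    target = quote(target_url, safe="") if encode else target_url
    return base + target


# Same target, with and without a trailing slash on the base.
raw = build_proxied_url("http://localhost:3001", "https://www.iana.org/domains/example")
enc = build_proxied_url("http://localhost:3001/", "https://www.iana.org/domains/example", encode=True)
print(raw)
print(enc)
```

Printing both forms makes it easy to paste each into `curl` and see which one the proxy accepts.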


In [None]:
# src/search.py (add this function)
import os
from typing import List, Dict
from langchain_community.utilities import SearxSearchWrapper

def searx_search(query: str, k: int = 5) -> List[Dict]:
    """
    Run a SearXNG search via LangChain and return a list of result dicts
    shaped like: {title, link, snippet, engines, category}.

    Args:
        query: Search query string.
        k:     Max number of results to return.

    Returns:
        List[Dict]: Each item contains keys:
            - title (str)
            - link (str)
            - snippet (str | None)
            - engines (list[str] | None)
            - category (str | None)
    """
    searx_host = os.getenv("SEARXNG_HOST", "http://localhost:8080").rstrip("/")
    s = SearxSearchWrapper(searx_host=searx_host)

    try:
        raw = s.results(query=query, num_results=k)
        return raw
    except Exception as e:
        print(f"[searx_search] Error: {e}")
        return []

searx_search('Hi')

[{'snippet': 'Sep 18, 2025 ... used especially as a greeting; just called to say hi. HI 2 of 2 abbreviation 1. Hawaii 2. high intensity 3. humidity index.',
  'title': 'HI Definition & Meaning - Merriam-Webster',
  'link': 'https://www.merriam-webster.com/dictionary/hi',
  'engines': ['duckduckgo', 'startpage', 'google', 'brave'],
  'category': 'general'},
 {'snippet': '5 days ago - Hi definition: (used as an exclamation of greeting) hello!. See examples of HI used in a sentence.',
  'title': 'HI Definition & Meaning | Dictionary.com',
  'link': 'https://www.dictionary.com/browse/hi',
  'engines': ['startpage', 'google', 'brave'],
  'category': 'general'},
 {'snippet': 'American English. First recorded reference is to speech of a Kansas Indian (1862); originally to attract attention, probably a variant of Middle English hey, hy (circa 1475). Also an exclamation to call attention.',
  'title': 'hi - Wiktionary, the free dictionary',
  'link': 'https://en.wiktionary.org/wiki/hi',
  'engi
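
`searx_search` returns the wrapper's output unchanged, and the later steps in this notebook already guard against hits that have no `link`. A minimal sketch of dropping such entries up front, assuming they are not useful for fetching; `keep_linked_hits` is a hypothetical helper and the placeholder entry in the sample data is invented for illustration.

```python
from typing import Any, Dict, List


def keep_linked_hits(raw: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Return only the entries that carry a non-empty string 'link'."""
    return [
        hit for hit in raw
        if isinstance(hit.get("link"), str) and hit["link"].strip()
    ]


sample = [
    {"title": "ok", "link": "https://example.com/a", "snippet": "text"},
    {"Result": "placeholder entry without a link"},  # hypothetical shape
    {"title": "blank", "link": "   "},
]
linked = keep_linked_hits(sample)
print(len(sample), "->", len(linked))
```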

In [13]:
s.results(query='hi', num_results=5)

[{'snippet': 'Sep 18, 2025 ... used especially as a greeting; just called to say hi. HI 2 of 2 abbreviation 1. Hawaii 2. high intensity 3. humidity index.',
  'title': 'HI Definition & Meaning - Merriam-Webster',
  'link': 'https://www.merriam-webster.com/dictionary/hi',
  'engines': ['duckduckgo', 'startpage', 'google', 'brave'],
  'category': 'general'},
 {'snippet': '5 days ago - Hi definition: (used as an exclamation of greeting) hello!. See examples of HI used in a sentence.',
  'title': 'HI Definition & Meaning | Dictionary.com',
  'link': 'https://www.dictionary.com/browse/hi',
  'engines': ['startpage', 'google', 'brave'],
  'category': 'general'},
 {'snippet': 'Jul 26, 2024 ... Both words have the same meaning, but hi is a bit more causal and informal. For example, you might say hi to your friends but hello to your boss\xa0...',
  'title': 'r/EnglishLearning on Reddit: What is the difference between "Hi" and "Hello"?',
  'link': 'https://www.reddit.com/r/EnglishLearning/commen

In [15]:
# src/search.py (add below your searx_search)
from typing import List, Dict

def dedupe_by_link(items: List[Dict]) -> List[Dict]:
    """
    Keep only the first occurrence of each unique 'link'.
    Items without a 'link' are kept (can't dedupe them reliably).
    """
    seen = set()
    out: List[Dict] = []
    for it in items:
        link = it.get("link")
        if not link:
            out.append(it)
            continue
        if link in seen:
            continue
        seen.add(link)
        out.append(it)
    return out

hits = searx_search("hi", k=10)
unique_hits = dedupe_by_link(hits)
print(len(hits), "->", len(unique_hits))


10 -> 10
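
`dedupe_by_link` compares link strings exactly, so two URLs that differ only by a fragment, a trailing slash, or host casing count as distinct. The sketch below dedupes on a normalized key instead. `normalize_link` and `dedupe_by_normalized_link` are hypothetical names, and the normalization rules are an assumption about what should count as the same page.

```python
from typing import Any, Dict, List
from urllib.parse import urlsplit, urlunsplit


def normalize_link(link: str) -> str:
    """Lowercase scheme and host, drop the fragment, strip trailing slashes."""
    parts = urlsplit(link.strip())
    path = parts.path.rstrip("/")
    return urlunsplit((parts.scheme.lower(), parts.netloc.lower(), path, parts.query, ""))


def dedupe_by_normalized_link(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """Keep the first item for each normalized link; keep items without a link."""
    seen = set()
    out: List[Dict[str, Any]] = []
    for it in items:
        link = it.get("link")
        if not link:
            out.append(it)
            continue
        key = normalize_link(link)
        if key in seen:
            continue
        seen.add(key)
        out.append(it)
    return out


demo = [
    {"link": "https://Example.com/page"},
    {"link": "https://example.com/page/#top"},
    {"link": "https://example.com/other"},
    {"title": "no link"},
]
unique_demo = dedupe_by_normalized_link(demo)
print(len(demo), "->", len(unique_demo))
```

The original item is kept untouched; only the comparison key is normalized, so the fetch step still sees the URL as the search engine returned it.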


In [None]:
# src/search.py
from __future__ import annotations
import re
import os
import logging
from typing import Dict, List, Any, Tuple
import time
import requests
from requests.exceptions import Timeout, ConnectionError, ChunkedEncodingError

from concurrent.futures import ThreadPoolExecutor, as_completed
from requests.adapters import HTTPAdapter
import math
from langchain_community.utilities import SearxSearchWrapper
from pathlib import Path

from dotenv import load_dotenv


########################
# Env Loading
########################

load_dotenv()


########################
# Configuration
########################
SEARX_HOST = os.getenv("SEARXNG_HOST", "http://localhost:8080").rstrip("/")
READER_HOST = os.getenv("READER_HOST", "http://localhost:3001").rstrip("/")
READER_MODE = os.getenv("READER_MODE", "proxy")  # "proxy" or "reader"
MAX_RESULTS = int(os.getenv("MAX_RESULTS", "5"))
MIN_CONTENT_CHARS = int(os.getenv("MIN_CONTENT_CHARS", "400"))
MIN_SOURCES = int(os.getenv("MIN_SOURCES", "2"))
# Read from SEARCH_LANG: the env var LANG is the POSIX locale (e.g. "en_US.UTF-8")
LANG = os.getenv("SEARCH_LANG", "auto")

TIMEOUT = int(os.getenv("REQUEST_TIMEOUT", "5"))
RETRY_TIMEOUT = int(os.getenv("RETRY_TIMEOUT", "10"))
RETRY_ON_FAILURE = int(os.getenv("RETRY_ON_FAILURE", "1"))

FETCH_WORKERS = int(os.getenv("FETCH_WORKERS", "8"))

logging.basicConfig(level=os.getenv("LOGLEVEL", "INFO"))
log = logging.getLogger("merlin-rag.search")




##########################
## Orchestrator
##########################

def search_and_fetch(query: str, k: int = 5) -> List[Dict[str, Any]]:
    """
    Orchestrator:
      searx_search -> dedupe_by_link -> fetch_urls_with_proxy -> normalize_records
    Returns a clean list of dicts ready for display.
    """
    # 1) search
    raw_hits = searx_search(query, k)
    searched = len(raw_hits)

    # 2) dedupe
    hits = dedupe_by_link(raw_hits)
    deduped = len(hits)

    # 3) fetch content via your proxy
    hits = fetch_urls_with_proxy(hits)
    fetched = len(hits)

    # 4) normalize shape
    clean = normalize_records(hits)

    try:
        kept, dropped = filter_failed_and_tiny(clean, min_chars=MIN_CONTENT_CHARS)
        successes = len(kept)
        failures = len(dropped)
        snippet_only = successes < MIN_SOURCES
    except Exception as _:
        # If filter function isn’t present for some reason, fall back to simple counts
        successes = sum(
            1 for it in clean
            if isinstance(it.get("content"), str)
            and not it["content"].strip().startswith(("Request failed:", "Error "))
            and len(_clean(it["content"])) >= MIN_CONTENT_CHARS
        )
        failures = len(clean) - successes
        snippet_only = successes < MIN_SOURCES

    _log_run_summary(
        searched=searched,
        deduped=deduped,
        fetched=fetched,
        successes=successes,
        failures=failures,
        snippet_only=snippet_only,
    )
    # --- end stats ---

    return clean


def _log_run_summary(
    searched: int,
    deduped: int,
    fetched: int,
    successes: int,
    failures: int,
    snippet_only: bool,
) -> None:
    log.info(
        "search_and_fetch summary | searched=%d, deduped=%d, fetched=%d, successes=%d, failures=%d, snippet_only=%s",
        searched, deduped, fetched, successes, failures, str(snippet_only),
    )


###########
## URLs
###########
def dedupe_by_link(items: List[Dict]) -> List[Dict]:
    """
    Keep only the first occurrence of each unique 'link'.
    Items without a 'link' are kept (can't dedupe them reliably).
    """
    seen = set()
    out: List[Dict] = []
    for it in items:
        link = it.get("link")
        if not link:
            out.append(it)
            continue
        if link in seen:
            continue
        seen.add(link)
        out.append(it)
    return out
    
######################
## Search (SearXNG)
#####################
def searx_search(query: str, k: int = 5) -> List[Dict]:
    """
    Run a SearXNG search via LangChain and return a list of result dicts
    shaped like: {title, link, snippet, engines, category}.

    Args:
        query: Search query string.
        k:     Max number of results to return.

    Returns:
        List[Dict]: Each item contains keys:
            - title (str)
            - link (str)
            - snippet (str | None)
            - engines (list[str] | None)
            - category (str | None)
    """
    s = SearxSearchWrapper(searx_host=SEARX_HOST)

    try:
        raw = s.results(query=query, num_results=k)
        return raw
    except Exception as e:
        # Use the module logger so failures show up alongside the run summary
        log.warning("[searx_search] Error: %s", e)
        return []




##########################
## HTTP Fetch via Proxy
##########################
def fetch_urls_with_proxy(
    data: List[Dict[str, Any]],
    proxy_base: str = READER_HOST,
    timeout: int = TIMEOUT,
    retry_timeout: int = RETRY_TIMEOUT,
    retry_delay: float = 0.75,
    max_retries: int = RETRY_ON_FAILURE,
    max_workers: int | None = FETCH_WORKERS,
) -> List[Dict[str, Any]]:
    """
    Fast, concurrent fetch through a proxy with minimal retry logic.

    - Reuses a single requests.Session for connection pooling / keep-alive.
    - Parallelizes network I/O with a thread pool (I/O-bound).
    - One or more retries on transient errors, controlled by `max_retries`.
    - Returns items in the same order as `data`.
    """
    proxy_base = proxy_base.rstrip("/") + "/"
    transient_status = {429, 500, 502, 503, 504}
    transient_errors = (Timeout, ConnectionError, ChunkedEncodingError)

    # Sensible default for I/O-bound concurrency
    if max_workers is None:
        # Scale with batch size but cap to avoid thrashing
        max_workers = min(16, max(4, math.ceil(len(data) / 2)))

    # Prepare a pooled session (keep-alive, larger pool)
    session = requests.Session()
    session.headers.update({"User-Agent": "MERLIN-RAG/0.1"})
    adapter = HTTPAdapter(pool_connections=32, pool_maxsize=32, max_retries=0)
    session.mount("http://", adapter)
    session.mount("https://", adapter)

    def fetch_one(idx_item: Tuple[int, Dict[str, Any]]) -> Tuple[int, Dict[str, Any]]:
        idx, item = idx_item
        url = item.get("link")
        if not url:
            item["content"] = None
            return idx, item

        proxied_url = proxy_base + url

        attempts = 0
        while True:
            try:
                t = timeout if attempts == 0 else retry_timeout
                resp = session.get(proxied_url, timeout=t)
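                if resp.status_code == 200:
                    item["content"] = resp.text
                    return idx, item
                if resp.status_code in transient_status and attempts < max_retries:
                    # Retry transient HTTP statuses (429/5xx) like transient exceptions
                    attempts += 1
                    time.sleep(retry_delay * (1.25 ** (attempts - 1)))
                    continue
                item["content"] = f"Error {resp.status_code}"
                return idx, item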
            except transient_errors as e:
                attempts += 1
                if attempts > max_retries:
                    item["content"] = f"Request failed: {e}"
                    return idx, item
                time.sleep(retry_delay * (1.25 ** (attempts - 1)))  # tiny backoff
            except Exception as e:
                item["content"] = f"Request failed: {e}"
                return idx, item

    # Submit tasks
    futures = []
    with ThreadPoolExecutor(max_workers=max_workers) as ex:
        for idx, item in enumerate(data):
            futures.append(ex.submit(fetch_one, (idx, item.copy())))

        # Collect in an index-addressable buffer to preserve order
        results_buffer: List[Tuple[int, Dict[str, Any]] | None] = [None] * len(data)
        for fut in as_completed(futures):
            idx, out_item = fut.result()
            results_buffer[idx] = (idx, out_item)

    # Flatten buffer (preserving original order)
    results: List[Dict[str, Any]] = [pair[1] for pair in results_buffer if pair is not None]

    # All workers have finished, so the pooled connections can be released
    session.close()
    return results

##########################
## Record Normalization
##########################
def normalize_records(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Ensure each record has the expected keys and simple, safe types.
    Keys: title, link, snippet, engines, category, content
    """
    normalized: List[Dict[str, Any]] = []
    for it in items:
        normalized.append({
            "title": (it.get("title") or "") if isinstance(it.get("title"), str) else "",
            "link": it.get("link"),
            "snippet": it.get("snippet") if isinstance(it.get("snippet"), str) else None,
            "engines": it.get("engines") if isinstance(it.get("engines"), list) else None,
            "category": it.get("category") if isinstance(it.get("category"), str) else None,
            ## ADDING content field from fetch step
            "content": it.get("content") if isinstance(it.get("content"), str) else None,
        })
    return normalized


##########################
## Cleaning & Filtering
##########################
def _truncate(text: str, limit: int) -> str:
    """Truncate to ~limit chars, trying to end on a sentence boundary."""
    if text is None:
        return ""
    if len(text) <= limit:
        return text
    cut = text[:limit]
    # try to cut at last sentence-ish boundary in the window
    m = re.search(r"(?s).*[.!?…]\s+", cut)
    if m:
        return cut[: m.end()].rstrip()
    return cut.rstrip() + "…"

def _clean(text: str) -> str:
    """Light cleanup for prompt-friendliness."""
    if text is None:
        return ""
    # collapse extreme whitespace, remove control chars
    text = re.sub(r"[ \t\f\v]+", " ", text)
    text = re.sub(r"\r\n?", "\n", text)
    text = re.sub(r"[\x00-\x08\x0b\x0c\x0e-\x1f]", "", text)
    return text.strip()


def is_failed_content(text: str | None) -> bool:
    """
    Return True if the page 'content' looks like a fetch error.
    We treat None/empty as failed too.
    """
    if not text:
        return True
    s = text.strip()
    if not s:
        return True
    # cheap checks that match your current error shapes
    prefixes = ("Request failed:", "Error ")
    return s.startswith(prefixes)

def filter_failed_and_tiny(
    items: List[Dict[str, Any]],
    min_chars: int = 400,
) -> Tuple[List[Dict[str, Any]], List[Dict[str, Any]]]:
    """
    Keep only items that:
      - do NOT look like failures, and
      - have non-trivial content length after cleanup (>= min_chars)

    Returns:
      kept, dropped
    """
    kept: List[Dict[str, Any]] = []
    dropped: List[Dict[str, Any]] = []

    for it in items:
        content = it.get("content") or ""
        if is_failed_content(content):
            dropped.append(it)
            continue

        cleaned = _clean(content)
        if len(cleaned) >= min_chars:
            kept.append(it)
        else:
            dropped.append(it)

    return kept, dropped


#############################
## Formatting for the LLM
#############################
def to_snippet_only(items: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
    """
    Keep only the minimal fields for snippet-only fallback.
    """
    out: List[Dict[str, Any]] = []
    for it in items:
        out.append({
            "title": (it.get("title") or "") if isinstance(it.get("title"), str) else "",
            "link": it.get("link"),
            "snippet": it.get("snippet") if isinstance(it.get("snippet"), str) else None,
        })
    return out

def format_results_for_llm_snippet_only(
    results: List[Dict[str, Any]],
    notice: str = "Some sources could not be fetched, so only snippets are available. Use them cautiously.",
) -> str:
    """
    Build a prompt that includes ONLY title + URL + snippet per source,
    and prepends a clear notice for the model.
    """
    lines: List[str] = []
    lines.append(notice)
    lines.append("")
    lines.append("SOURCES (snippet-only):")

    for i, item in enumerate(results, start=1):
        title = _clean(item.get("title") or "") or "(Untitled)"
        link = (item.get("link") or "").strip()
        snippet = _clean(item.get("snippet") or "")

        lines.append(f"[S{i}] {title}")
        if link:
            lines.append(f"URL: {link}")
        if snippet:
            lines.append(f"Snippet: {snippet}")
        lines.append("-" * 60)

    return "\n".join(lines)

def format_results_for_llm(
    results: List[Dict[str, Any]],
) -> str:
    """
    Turn `search_and_fetch(...)` results into a single prompt context string.

    Each source is labeled [S#] so you can ask the LLM to cite as [S1], [S2], etc.
    The question itself is not included; append it after the returned string.
    Content is cleaned but not truncated.

    Args:
        results: list of dicts with keys: title, link, snippet, content, engines, category

    Returns:
        A single string suitable to pass as the LLM's context.
    """

    lines: List[str] = []
    lines.append("You are a careful assistant. Use the SOURCES below to answer the QUESTION.")
    lines.append("")
    lines.append("SOURCES:")

    for i, item in enumerate(results, start=1):
        title = _clean(item.get("title") or "") or "(Untitled)"
        link = (item.get("link") or "").strip()
        snippet = _clean(item.get("snippet") or "")
        category = item.get("category") or None
        content = _clean(item.get("content") or "")

        header_bits = [f"[S{i}] {title}"]
        if category:
            header_bits.append(f"(category: {category})")
        lines.append(" ".join(header_bits))
        if link:
            lines.append(f"URL: {link}")
        if snippet:
            lines.append(f"Snippet: {snippet}")
        lines.append("Content:")
        lines.append(content if content else "(empty)")
        lines.append("-" * 60)

    return "\n".join(lines)

results = search_and_fetch("What is the capital of France?", k=10)
kept, dropped = filter_failed_and_tiny(results, min_chars=400)

if len(kept) >= MIN_SOURCES:
    prompt = format_results_for_llm(kept)
else:
    snippet_only = to_snippet_only(results)
    prompt = format_results_for_llm_snippet_only(snippet_only)
print(prompt)

print(f"\n\nDropped {len(dropped)} tiny/failed results.")


INFO:merlin-rag.search:search_and_fetch summary | searched=10, deduped=10, fetched=10, successes=6, failures=4, snippet_only=False


You are a careful assistant. Use the SOURCES below to answer the QUESTION.

SOURCES:
[S1] Paris facts: the capital of France in history (category: general)
URL: https://home.adelphi.edu/~ca19535/page%204.html
Snippet: Paris is the capital of France, the largest country of Europe with 550 000 km2 (65 millions inhabitants). Paris has 2.234 million inhabitants end 2011.
Content:
Title: page 4

URL Source: https://home.adelphi.edu/~ca19535/page%204.html

Markdown Content:
Paris is the **capital of [France](http://www.parisdigest.com/famous_places_in_france.htm)**, the largest country of [Europe](http://www.parisdigest.com/famous_places_in_europe) with 550 000 km2 (65 millions inhabitants).

Paris has 2.234 million inhabitants end 2011. She is the core of Ile de France region (12 million people).

Founded more than 2000 years ago, Paris is a modern and vibrant city with significant [commercial](http://www.parisdigest.com/menus/shopping.htm), [cultural](http://www.parisdigest.com/goingout/sh
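
`_truncate` is defined in the cell above but never called, so `format_results_for_llm` passes each page's full cleaned content into the prompt and one long page can dominate it. The sketch below splits a total character budget evenly across sources. `budget_contents`, `max_context_chars`, and `min_per_doc` are hypothetical names with arbitrary defaults, and `truncate_at_sentence` is a simplified stand-in for `_truncate`, not the notebook's function.

```python
import re
from typing import List


def truncate_at_sentence(text: str, limit: int) -> str:
    """Cut text to at most `limit` chars, preferring the last sentence end."""
    if len(text) <= limit:
        return text
    cut = text[:limit]
    # Greedy match: ends at the last ., !, ? or … that is followed by whitespace.
    m = re.match(r"(?s).*[.!?…]\s", cut)
    return cut[: m.end()].rstrip() if m else cut.rstrip()


def budget_contents(
    contents: List[str],
    max_context_chars: int = 12000,  # hypothetical default
    min_per_doc: int = 500,          # hypothetical default
) -> List[str]:
    """Give each document an equal share of the budget, never below the floor."""
    if not contents:
        return []
    per_doc = max(min_per_doc, max_context_chars // len(contents))
    return [truncate_at_sentence(c, per_doc) for c in contents]


docs = ["First sentence. Second sentence. Third sentence.", "Short."]
trimmed = budget_contents(docs, max_context_chars=40, min_per_doc=20)
print(trimmed)
```

Because of the per-document floor, the combined length can exceed `max_context_chars` when there are many sources; lowering `min_per_doc` or capping the number of sources would tighten that.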

In [52]:
dropped

[{'title': 'Paris - Wikipedia',
  'link': 'https://en.wikipedia.org/wiki/Paris',
  'snippet': 'Paris[a] is the capital and largest city of France, with an estimated city center population of 2,048,472, and a metropolitan population of 13,171,056 as of January 2025 [update][3] in an area of more than 105 km 2 (41 sq mi). It is located in the centre of the Île-de-France region. Paris is the fourth-most populous city in the European Union.',
  'engines': ['startpage', 'brave', 'duckduckgo', 'google'],
  'category': 'general',
  'content': "Request failed: ('Connection aborted.', RemoteDisconnected('Remote end closed connection without response'))"},
 {'title': 'Discover the city of Paris | Paris the capital city of France',
  'link': 'https://www.cia-france.com/french-kids-teenage-courses/paris-school/visit-paris',
  'snippet': 'You can climb to the top of the ... stores of Le Marais and the Place des Vosges. Enjoying an ice-cream or macaroons on a terrace, enjoying the parks and gardens,