# patent search engine

As a research scientist, I often come up with new ideas and need to quickly check whether similar inventions already exist. Since the main way to protect and monetize an idea is by filing a patent—especially if it is novel and commercially valuable—I wanted to search existing patents efficiently. The process is tedious, and this agent can help patent agents with their search before they review a client's innovation.
I received an API key from PatentsView, which provides direct access to USPTO data. I plan to use it as my primary source, since it’s more structured, complete, and accurate. In parallel, I’m also using SerpApi’s free tier to run Google Patents searches and download the corresponding patent PDFs. Searching by patent ID can be tedious: the PatentsView API can return more than 10k patents, so using the ChatGPT API would not be feasible without first ranking the patents semantically by the similarity between the query and the claims.

# service 1: check if a patent exists

I used keyword search initially,
but the results were not that impressive. After retrieving candidates with a keyword search over the claims, I used a transformer to get embeddings and ranked the patents by similarity to the user's query. I took the top 10 and then fetched their full claims so the LLM could compare them with the user query. (I could have used the small OpenAI models for this too.)
After that, I fed the result to the LLM (sorry, I could not use only the provided libraries, since I wanted a small model that runs on CPU to compute embeddings quickly).
The `google-search-results` package was added to the virtual environment with `uv add google-search-results`.

In [27]:
import os
import re
import json
import requests
import numpy as np
from typing import Any, Dict, List, Optional, Tuple
from pydantic import BaseModel, ValidationError

# ---------- LLM ----------
from openai import OpenAI, OpenAIError

# ---------- Sentence-Transformers (required for semantic ranking) ----------
try:
    from sentence_transformers import SentenceTransformer
    import torch
except ImportError as e:
    raise RuntimeError(
        "sentence-transformers is required. Install with: pip install sentence-transformers"
    ) from e

# ---------- SerpApi (Google Patents details) ----------
try:
    import serpapi
    from serpapi import GoogleSearch
except ImportError:
    serpapi = None
    GoogleSearch = None
    print("SerpApi library not found. Please run: pip install google-search-results")

# ---------- Credentials (all read from the environment) ----------
PATENTSVIEW_API_KEY = os.getenv("PATENTSVIEW_API_KEY")
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")
SERPAPI_API_KEY = os.getenv("SERPAPI_API_KEY")

# PatentsView and OpenAI are mandatory: fail fast at import time if missing.
if not PATENTSVIEW_API_KEY:
    raise RuntimeError("PATENTSVIEW_API_KEY is not set.")
if not OPENAI_API_KEY:
    raise RuntimeError("OPENAI_API_KEY is not set.")

# SerpApi is optional: it is usable only when the library imported AND a key exists.
SERPAPI_API_KEY_SET = (serpapi is not None) and bool(SERPAPI_API_KEY)
OPENAI_CLIENT = OpenAI(api_key=OPENAI_API_KEY)

# ---------- PatentsView endpoints ----------
# Patent-level search (title, abstract, inventors, assignees).
PATENT_ENDPOINT = "https://search.patentsview.org/api/v1/patent/"
# Claim-level search (claim text per patent).
CLAIMS_ENDPOINT = "https://search.patentsview.org/api/v1/g_claim/"

def _base_headers():
    """Build the HTTP headers sent with every PatentsView request.

    Returns a fresh dict carrying the API key (``X-Api-Key``) and a JSON
    ``Accept`` header; callers add ``Content-Type`` themselves.
    """
    headers = {}
    headers["X-Api-Key"] = PATENTSVIEW_API_KEY
    headers["Accept"] = "application/json"
    return headers

# =========================
# Models
# =========================
class Patent(BaseModel):
    # Flat record describing one patent as it flows through the workflow.
    # Instances are built by extract_patentsview_patent_summaries() and later
    # enriched in search_and_summarize_patents(). Field order matters: the
    # model is dumped to JSON and embedded in the LLM prompt as-is.
    patent_number: str  # PatentsView `patent_id`, stored as a string
    application_number: str  # always "N/A" in the visible code
    title: str  # PatentsView `patent_title`
    snippet: str  # PatentsView `patent_abstract`
    publication_date: str  # PatentsView `patent_date`
    inventor: str  # comma-separated inventor names, or "Unknown inventor"
    assignee: str  # comma-separated assignee names, or "Unknown assignee"
    status: str  # always "Unknown status" in the visible code
    patent_link: str  # Google Patents URL derived from the patent number
    pdf_link: Optional[str] = None  # PDF URL from SerpApi, when available
    claims_snippet: Optional[str] = None  # full claims text, or the matching claim snippet as fallback
    claims_summary: Optional[str] = None  # not populated anywhere in the visible code

class LLMSummary(BaseModel):
    # Final result returned by search_and_summarize_patents().
    summary_text: str  # natural-language summary produced by the LLM
    # Selected patents. The workflow overwrites whatever the LLM echoed back
    # with its own locally built Patent objects before validation.
    top_2_relevant_patents: List[Patent]


def fetch_patentsview_claims_by_id(patent_id: str, max_claims_per_patent: int = 100) -> Optional[str]:
    """
    Fetch the claims of one specific patent from the PatentsView /g_claim endpoint.

    Args:
        patent_id: Patent identifier to look up (coerced to ``str``).
        max_claims_per_patent: Page size requested from the API, i.e. the
            maximum number of claims returned for the patent.

    Returns:
        All non-empty claim texts joined with single spaces, requested in
        ascending ``claim_sequence`` order, or ``None`` when the patent has
        no claims or the request fails (a message is printed in that case).
    """
    body = {
        "q": {"patent_id": str(patent_id)},
        "f": ["claim_sequence", "claim_text"],
        "o": {"size": max_claims_per_patent},
        "s": [{"claim_sequence": "asc"}],  # Sort claims 1, 2, 3...
    }

    try:
        resp = requests.post(
            CLAIMS_ENDPOINT,
            headers={**_base_headers(), "Content-Type": "application/json"},
            data=json.dumps(body),
            timeout=45
        )
        resp.raise_for_status()
        data = resp.json()
    except requests.HTTPError as e:
        response = e.response
        status = getattr(response, "status_code", "?")
        # Compare against None explicitly: a requests.Response is falsy for
        # 4xx/5xx statuses, so a plain truthiness test would always discard
        # the error body we want to show here.
        detail = response.text if response is not None else ""
        print(f"/g_claim HTTP {status} for {patent_id}: {detail}")
        return None
    except requests.RequestException as e:
        print(f"/g_claim connection error for {patent_id}: {e}")
        return None

    # `or []` also covers an explicit JSON null for the key.
    claims = data.get("g_claims") or []
    if not claims:
        print(f"PatentsView /g_claim had no claims for {patent_id}")
        return None

    # Aggregate and return
    aggregated_text = " ".join(c["claim_text"] for c in claims if c.get("claim_text"))
    return aggregated_text if aggregated_text else None


# =========================
# SerpApi Helper
# =========================
def fetch_serpapi_details(patent_id: str) -> Tuple[Optional[str], Optional[str]]:
    """
    Look up one patent through SerpApi's ``google_patents_details`` engine.

    Returns a ``(pdf_link, claims_text)`` pair. ``claims_text`` is built from
    at most the first five claim entries. Both values are ``None`` when
    SerpApi is not configured, reports an error, or the call raises.
    """
    if not SERPAPI_API_KEY_SET:
        print("SerpApi key not set, skipping detail fetch.")
        return None, None

    try:
        results = GoogleSearch({
            "api_key": SERPAPI_API_KEY,
            "engine": "google_patents_details",
            "patent_id": f"patent/US{patent_id}",
        }).get_dict()

        if "error" in results:
            print(f"SerpApi Error for {patent_id}: {results['error']}")
            return None, None

        raw_claims = results.get("claims")
        if isinstance(raw_claims, list):
            # Entries may be dicts carrying a "text" field or plain values.
            parts = [
                str(entry.get("text", "")) if isinstance(entry, dict) else str(entry)
                for entry in raw_claims[:5]
            ]
            claims_text = " ".join(parts).strip() or None
        elif isinstance(raw_claims, str):
            claims_text = raw_claims.strip() or None
        else:
            claims_text = None

        return results.get("pdf"), claims_text
    except Exception as e:
        # Best-effort enrichment: any failure degrades to "no details".
        print(f"Error fetching SerpApi details for {patent_id}: {e}")
        return None, None

# =========================
# Simple keyword helpers
# =========================
# Common English function words that _extract_keywords() drops before the
# remaining tokens are used as search keywords.
STOPWORDS = {
    "the", "a", "an", "of", "and", "or", "for", "to", "with", "in", "on",
    "at", "from", "by", "that", "this", "these", "those", "you", "your",
    "their", "its", "is", "are", "was", "were", "be", "being", "been",
    "as", "it", "into", "about", "over", "under", "between", "within"
}
def _extract_keywords(text: str) -> List[str]:
    """Lower-case *text*, split on non-alphanumeric runs, and keep tokens of
    three or more characters that are not stopwords (order and duplicates kept)."""
    kept: List[str] = []
    for token in re.split(r"[^a-zA-Z0-9]+", text.lower()):
        if len(token) < 3 or token in STOPWORDS:
            continue
        kept.append(token)
    return kept
def _make_text_value(keywords: List[str]) -> Optional[str]:
    return " ".join(keywords) if keywords else None

# =========================
# CLAIMS: retrieve claim_text (and patent_id) then aggregate per patent
# =========================
def fetch_claims_with_text(
    search_text: str,
    *,
    rows_per_call: int = 1000,
    max_claims_total: int = 3000,
    max_chars_per_patent: int = 4000,
    use_any_keywords_fallback: bool = True
) -> Dict[str, str]:
    """
    Query the /g_claim endpoint to retrieve claim_text + patent_id.
    Aggregates claim texts PER patent_id (concatenated) and caps length per patent.

    The first query matches claims containing the exact phrase OR all
    extracted keywords. If that yields nothing and there is more than one
    keyword, a second query matching ANY keyword is tried (when
    ``use_any_keywords_fallback`` is true).

    Args:
        search_text: Free-text user query.
        rows_per_call: Requested page size; the code caps it at 1000.
        max_claims_total: Maximum number of claim rows to aggregate. Only a
            single request is made per query (no pagination), so values above
            the page size have no additional effect.
        max_chars_per_patent: Aggregated text per patent is truncated to this
            many characters.
        use_any_keywords_fallback: Enable the any-keyword fallback query.

    Returns:
         dict: { patent_id: aggregated_claim_text (<= max_chars_per_patent) }
         Empty on HTTP/connection errors (a message is printed).
    """
    keywords = _extract_keywords(search_text)
    text_value = _make_text_value(keywords) or search_text

    def _do_query(q_obj: Dict[str, Any]) -> Dict[str, Any]:
        body = {
            "q": q_obj,
            "f": ["patent_id", "claim_sequence", "claim_text"],
            "o": {"size": min(rows_per_call, 1000)},
        }
        resp = requests.post(
            CLAIMS_ENDPOINT,
            headers={**_base_headers(), "Content-Type": "application/json"},
            data=json.dumps(body),
            timeout=45
        )
        resp.raise_for_status()
        return resp.json()

    # try phrase OR all-keyword
    q_all = {
        "_or": [
            {"_text_phrase": {"claim_text": search_text}},
            {"_text_all": {"claim_text": text_value}},
        ]
    }

    try:
        # `or []` also covers an explicit JSON null for the response key.
        claims = _do_query(q_all).get("g_claims") or []

        # fallback to any-keywords if needed
        if not claims and use_any_keywords_fallback and len(keywords) > 1:
            q_any = {"_text_any": {"claim_text": text_value}}
            claims = _do_query(q_any).get("g_claims") or []
    except requests.HTTPError as e:
        status = getattr(e.response, "status_code", "?")
        text = getattr(e.response, "text", "")
        print(f"/g_claim HTTP {status}: {text[:300]}")
        return {}
    except requests.RequestException as e:
        print(f"/g_claim connection error: {e}")
        return {}

    # Aggregate per patent, stopping once max_claims_total rows were accepted.
    per_patent: Dict[str, List[str]] = {}
    count = 0
    for c in claims:
        if count >= max_claims_total:
            break
        raw_pid = c.get("patent_id")
        ctext = c.get("claim_text")
        # Check the raw id BEFORE str(): str(None) is the truthy "None" and
        # would silently group id-less claims under a bogus patent.
        if raw_pid is None or raw_pid == "" or not ctext:
            continue
        per_patent.setdefault(str(raw_pid), []).append(ctext)
        count += 1

    # Slicing is a no-op for texts already within the cap.
    return {
        pid: " ".join(texts)[:max_chars_per_patent]
        for pid, texts in per_patent.items()
    }

# =========================
# Semantic Ranking
# =========================
def load_st_model(name: str = "all-MiniLM-L6-v2", device: Optional[str] = None) -> SentenceTransformer:
    """Instantiate the SentenceTransformer *name* on *device*.

    When *device* is ``None`` it is chosen automatically: "cuda" if
    ``torch.cuda.is_available()`` reports a GPU, otherwise "cpu".
    """
    if device is None:
        if torch.cuda.is_available():
            device = "cuda"
        else:
            device = "cpu"
    print(f"Loading SentenceTransformer '{name}' on device: {device}")
    st_model = SentenceTransformer(name, device=device)
    return st_model

def embed_texts(model: SentenceTransformer, texts: List[str], batch_size: int = 64) -> np.ndarray:
    """Encode *texts* with *model* and return the embeddings as a NumPy array.

    Embeddings are requested normalized, so a dot product between two of them
    equals their cosine similarity. The progress bar is suppressed.
    """
    encode_options = {
        "batch_size": batch_size,
        "show_progress_bar": False,
        "convert_to_numpy": True,
        "normalize_embeddings": True,
    }
    return model.encode(texts, **encode_options)

def cosine_sim_matrix(query_emb: np.ndarray, doc_embs: np.ndarray) -> np.ndarray:
    """Return the matrix product ``doc_embs @ query_emb``.

    When both inputs hold normalized embeddings, each resulting value is the
    cosine similarity between the query and the corresponding document.
    """
    similarities = np.matmul(doc_embs, query_emb)
    return similarities


def semantic_rank_patents_by_claims(
    search_text: str,
    *,
    model: Optional[SentenceTransformer] = None,
    top_k: int = 50,
    claim_rows_per_call: int = 1000,
    max_claims_total: int = 3000,
    max_chars_per_patent: int = 4000
) -> Tuple[List[Tuple[str, str]], int]:
    """
    Retrieve claim text for patents matching *search_text* and rank the
    patents by embedding similarity to the query.

    Steps: fetch aggregated claims via fetch_claims_with_text(), embed the
    query and every aggregated claim text, score with cosine_sim_matrix(),
    and keep the ``top_k`` highest-scoring patents.

    Returns:
        ``(ranked, total)`` where ``ranked`` is a list of
        ``(patent_id, aggregated_claim_text)`` tuples, best match first, and
        ``total`` is the number of patents retrieved before ranking.
        ``([], 0)`` when nothing was retrieved.
    """
    aggregated = fetch_claims_with_text(
        search_text,
        rows_per_call=claim_rows_per_call,
        max_claims_total=max_claims_total,
        max_chars_per_patent=max_chars_per_patent
    )
    total_aggregated_count = len(aggregated)
    print(f"Fetched and aggregated claims for {total_aggregated_count} patents.")

    if not aggregated:
        return [], 0

    # The model is only loaded when the caller did not supply one.
    encoder = model if model else load_st_model()
    patent_ids = list(aggregated)
    documents = list(aggregated.values())

    query_vector = embed_texts(encoder, [search_text])[0]
    document_vectors = embed_texts(encoder, documents)

    scores = cosine_sim_matrix(query_vector, document_vectors)
    # Negate so that argsort's ascending order puts the best match first.
    ranking = np.argsort(-scores)
    best = ranking[:min(top_k, len(ranking))]

    ranked = []
    for position in best:
        ranked.append((patent_ids[position], documents[position]))

    return ranked, total_aggregated_count

# =========================
# NEW: LLM Selection based on Claims
# =========================
def llm_select_top_patents_from_claims(
    query: str,
    ranked_claims: List[Tuple[str, str]],
    top_n: int = 2
) -> List[str]:
    """
    Ask the LLM to pick the ``top_n`` most relevant patents from a list of
    (patent_id, aggregated_claim_text) candidates.

    Args:
        query: The user's free-text query.
        ranked_claims: Candidate ``(patent_id, claim_text)`` tuples.
        top_n: Number of patent ids to select.

    Returns:
        Up to ``top_n`` patent id strings in the LLM's order. Ids that are not
        among the supplied candidates, and repeated ids, are dropped. An empty
        list is returned when there are no candidates, the API call fails, or
        the response is not in the expected shape.
    """
    if not ranked_claims:
        # Nothing to choose from; avoid a pointless API call.
        print("No claims provided to LLM for selection.")
        return []

    print(f"Sending {len(ranked_claims)} semantically-ranked claims to LLM for selection...")

    # Format for the prompt
    claims_list_for_llm = [
        {"patent_id": pid, "aggregated_claim_text": text}
        for pid, text in ranked_claims
    ]
    claims_json_str = json.dumps(claims_list_for_llm, indent=2)

    system_prompt = (
        "You are a patent research analyst. Your task is to analyze a list of "
        "semantically-ranked patents based on their aggregated claims and "
        "select the *most* relevant ones based on the user's query. "
        "You must return a JSON object containing a single key: 'top_patent_ids', "
        "which is a list of the patent ID strings you selected."
    )

    user_prompt = (
        f"My query is: '{query}'.\n\n"
        f"Here is a list of {len(ranked_claims)} patents, pre-ranked by semantic "
        "similarity of their claims. Please analyze the 'aggregated_claim_text' "
        f"for each and identify the {top_n} patent_ids that are *most* relevant "
        "to my query.\n\n"
        f"Patents to analyze:\n{claims_json_str}\n\n"
        f"Return a JSON object with a 'top_patent_ids' key containing a list "
        f"of exactly {top_n} patent ID strings. For example: "
        f"{json.dumps({'top_patent_ids': ['1234567', '7654321']})}"
    )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    # Pre-initialised so the error path can always report what was received.
    llm_response_json = "No response"
    try:
        completion = OPENAI_CLIENT.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            temperature=0.1,
            response_format={"type": "json_object"},
        )
        # `content` may be None (e.g. a refusal); treat that as an empty reply
        # so it surfaces as a JSON decode error instead of an AttributeError.
        llm_response_json = (completion.choices[0].message.content or "").strip()
        llm_data = json.loads(llm_response_json)
    except (OpenAIError, json.JSONDecodeError, ValidationError) as e:
        print("\n--- Error during LLM claim-based selection ---")
        print(f"Error: {e}")
        print(f"Raw LLM Output:\n{llm_response_json}")
        return []

    if (
        not isinstance(llm_data, dict)
        or not isinstance(llm_data.get("top_patent_ids"), list)
    ):
        print(f"LLM selection response was not in the expected format: {llm_response_json}")
        return []

    # Keep only ids that were actually offered, once each, in the LLM's order.
    candidate_ids = {str(pid) for pid, _ in ranked_claims}
    selected_ids: List[str] = []
    for raw_id in llm_data["top_patent_ids"]:
        pid = str(raw_id)
        if pid not in candidate_ids:
            print(f"Ignoring patent id not present in the candidate list: {pid}")
            continue
        if pid not in selected_ids:
            selected_ids.append(pid)

    return selected_ids[:top_n]  # Ensure we only return top_n

# =========================
# Patent endpoint search (MODIFIED)
# =========================
def call_patentsview_search(
    search_text: str,
    *,
    rows: int = 10,
    publication_from: Optional[str] = None,
    publication_to: Optional[str] = None,
    claim_match_patent_ids: Optional[List[str]] = None,
    search_by_text: bool = True # NEW parameter
) -> Dict[str, Any]:
    """
    Search the PatentsView /patent endpoint and return the raw JSON response.

    The query matches patents whose title or abstract contains the exact
    phrase or all extracted keywords (only when ``search_by_text`` is true
    and ``search_text`` is non-empty), OR whose id is in
    ``claim_match_patent_ids``. Optional date bounds on ``patent_date`` are
    AND-ed on top. Results are sorted by ``patent_date`` descending.

    Args:
        search_text: Free-text query.
        rows: Requested number of results; the code caps it at 1000.
        publication_from: Inclusive lower bound for ``patent_date``.
        publication_to: Inclusive upper bound for ``patent_date``.
        claim_match_patent_ids: Patent ids to match directly.
        search_by_text: Set to ``False`` to look up by ids only.

    Returns:
        The decoded response dict on success. On any failure, a dict of the
        form ``{"error": True, "message": "..."}`` (no exception is raised
        for HTTP/connection problems).
    """
    core_query_parts = []

    # Add text search clause ONLY if search_by_text is True
    if search_by_text and search_text:
        keywords = _extract_keywords(search_text)
        text_value = _make_text_value(keywords) or search_text

        title_clause = {"_or": [
            {"_text_phrase": {"patent_title": search_text}},
            {"_text_all": {"patent_title": text_value}},
        ]}
        abstract_clause = {"_or": [
            {"_text_phrase": {"patent_abstract": search_text}},
            {"_text_all": {"patent_abstract": text_value}},
        ]}
        core_query_parts.append({"_or": [title_clause, abstract_clause]})

    # Add patent ID clause if provided
    if claim_match_patent_ids:
        core_query_parts.append({"patent_id": claim_match_patent_ids})

    if not core_query_parts:
        return {"error": True, "message": "No search query or patent IDs provided."}

    # Combine clauses with _or
    core_query = {"_or": core_query_parts} if len(core_query_parts) > 1 else core_query_parts[0]

    date_clauses = []
    if publication_from:
        date_clauses.append({"_gte": {"patent_date": publication_from}})
    if publication_to:
        date_clauses.append({"_lte": {"patent_date": publication_to}})

    q = {"_and": [core_query] + date_clauses} if date_clauses else core_query

    f = [
        "patent_id",
        "patent_title",
        "patent_date",
        "patent_abstract",
        "inventors.inventor_name_first",
        "inventors.inventor_name_last",
        "assignees.assignee_organization",
        "assignees.assignee_first_name",
        "assignees.assignee_last_name",
    ]
    s = [{"patent_date": "desc"}]
    o = {"size": min(rows, 1000)}
    body = {"q": q, "f": f, "s": s, "o": o}

    try:
        resp = requests.post(
            PATENT_ENDPOINT,
            headers={**_base_headers(), "Content-Type": "application/json"},
            data=json.dumps(body),
            timeout=45,
        )
        resp.raise_for_status()
        data = resp.json()
        if isinstance(data, dict) and data.get("error"):
            return {"error": True, "message": f"PatentsView API error: {data}"}
        return data
    except requests.HTTPError as e:
        response = e.response
        status = getattr(response, "status_code", "?")
        # Compare against None explicitly: a requests.Response is falsy for
        # 4xx/5xx statuses, so a truthiness test would always drop the body.
        detail = response.text if response is not None else ""
        return {"error": True, "message": f"PatentsView HTTP {status}: {detail}"}
    except requests.RequestException as e:
        return {"error": True, "message": f"PatentsView connection error: {e}"}


def _join_name(first: Optional[str], last: Optional[str]) -> str:
    """Join first/last name parts with a space, skipping missing or blank parts."""
    return " ".join(part.strip() for part in (first, last) if part and part.strip())


def extract_patentsview_patent_summaries(api_response: Dict[str, Any]) -> List[Patent]:
    """
    Convert a PatentsView /patent response into a list of ``Patent`` models.

    Missing values, including explicit JSON nulls, are replaced by readable
    placeholders ("Unknown inventor", "Unknown assignee", "No title
    available", ...) rather than the literal string "None".

    Args:
        api_response: Decoded response dict; records are read from its
            ``"patents"`` key.

    Returns:
        One ``Patent`` per record, in response order; an empty list when the
        response holds no patents. ``pdf_link`` and the claims fields are left
        unset for the caller to fill in.
    """
    candidates = api_response.get("patents") or []

    summaries: List[Patent] = []
    for item in candidates:
        # `or []` guards against a null nested list in the response.
        inventor_names = []
        for inv in item.get("inventors") or []:
            # Both field spellings are accepted, as in the original code.
            first = inv.get("inventor_name_first") or inv.get("inventor_first_name")
            last = inv.get("inventor_name_last") or inv.get("inventor_last_name")
            name = _join_name(first, last)
            if name:
                inventor_names.append(name)
        inventor_str = ", ".join(inventor_names) if inventor_names else "Unknown inventor"

        assignee_names = []
        for ass in item.get("assignees") or []:
            org = ass.get("assignee_organization")
            if org:
                assignee_names.append(org.strip())
                continue
            # No organization: fall back to the individual's name.
            name = _join_name(ass.get("assignee_first_name"), ass.get("assignee_last_name"))
            if name:
                assignee_names.append(name)
        assignee_str = ", ".join(assignee_names) if assignee_names else "Unknown assignee"

        raw_id = item.get("patent_id")
        patent_id = "N/A" if raw_id is None else str(raw_id)

        summaries.append(
            Patent(
                patent_number=patent_id,
                application_number="N/A",
                title=str(item.get("patent_title") or "No title available"),
                snippet=str(item.get("patent_abstract") or "No snippet available."),
                publication_date=str(item.get("patent_date") or "Unknown"),
                inventor=inventor_str,
                assignee=assignee_str,
                status="Unknown status",
                patent_link=f"https://patents.google.com/patent/US{patent_id}",
                pdf_link=None,
                claims_snippet=None,
                claims_summary=None,
            )
        )
    return summaries


def download_patents(patents: List[Patent], save_dir: str):
    """
    Download the PDF of every patent that has a usable ``pdf_link``.

    Files are written to ``save_dir`` (created if missing) as
    ``<sanitized patent_number>.pdf``. The function is best-effort: patents
    without an http(s) link are skipped, and a failed download or a failed
    file write is reported and does not stop the remaining downloads.

    Args:
        patents: Patents to download.
        save_dir: Target directory.
    """
    if not os.path.exists(save_dir):
        try:
            os.makedirs(save_dir, exist_ok=True)
            print(f"Created directory: {save_dir}")
        except OSError as e:
            print(f"Error creating directory {save_dir}: {e}")
            return

    for patent in patents:
        if not patent.pdf_link or not patent.pdf_link.startswith("http"):
            print(f"Skipping {patent.patent_number}: No valid PDF link found.")
            continue

        # Replace anything that is not a word character, dot or dash so the
        # patent number cannot produce an unexpected path.
        safe_filename = re.sub(r"[^\w\.-]", "_", patent.patent_number) + ".pdf"
        save_path = os.path.join(save_dir, safe_filename)

        try:
            print(f"Downloading PDF for {patent.patent_number}...")
            pdf_response = requests.get(patent.pdf_link, timeout=20)
            pdf_response.raise_for_status()
        except requests.RequestException as e:
            print(f"Failed to download {patent.patent_number} from {patent.pdf_link}: {e}")
            continue

        try:
            with open(save_path, "wb") as f:
                f.write(pdf_response.content)
        except OSError as e:
            # Disk/permission problems should not abort the other downloads.
            print(f"Failed to save {patent.patent_number} to {save_path}: {e}")
            continue

        print(f"Successfully saved: {save_path}")
# =========================
# Main Workflow (REFACTORED)
# =========================
def search_and_summarize_patents(
    query: str,
    *,
    publication_from: Optional[str] = None,
    publication_to: Optional[str] = None,
    top_k_claims_for_llm_ranking: int = 50,
    top_n_final_selection: int = 2
) -> Optional[LLMSummary]:
    """
    Run the full "does a similar patent already exist?" workflow for *query*.

    Workflow:
      1) /g_claim keyword search, then SentenceTransformer ranking of the
         matching claim snippets -> top ``top_k_claims_for_llm_ranking`` patents.
      2) For each of those, fetch the claims by patent id from PatentsView
         and, if SerpApi is configured, a PDF link.
      3) LLM picks ``top_n_final_selection`` patent ids from the claims
         (falling back to the matching snippet when no full claims were found).
      4) PatentsView /patent lookup of the selected ids; the publication date
         bounds are applied at this step.
      5) LLM writes a summary of the selected patents.

    Args:
        query: Free-text description of the invention.
        publication_from: Optional inclusive lower bound on the patent date.
        publication_to: Optional inclusive upper bound on the patent date.
        top_k_claims_for_llm_ranking: Number of ranked candidates sent to the LLM.
        top_n_final_selection: Number of patents the LLM should select.

    Returns:
        An ``LLMSummary`` whose patent list is the locally built, enriched
        ``Patent`` objects in the LLM's selection order (the LLM's echo of the
        list is discarded), or ``None`` if any step fails; a message is
        printed in that case.
    """
    print("Step 1: Fetch + rank claims semantically (by snippet)...")
    top_ranked_claims, total_aggregated_count = semantic_rank_patents_by_claims(
        query,
        top_k=top_k_claims_for_llm_ranking
    )

    if not top_ranked_claims:
        print(f"Sorry, I couldn't find any patents or applications matching '{query}'.")
        return None

    print(f"Found {total_aggregated_count} patents with matching claims. Enriching top {len(top_ranked_claims)}...")

    # pid -> (matching snippet, pdf link, full claims text)
    enriched_claims_data: Dict[str, Tuple[str, Optional[str], Optional[str]]] = {}
    claims_to_pass_to_llm: List[Tuple[str, str]] = []

    print(f"Step 2: Enriching top {len(top_ranked_claims)} claims (PatentsView + SerpApi for PDF)...")
    for pid, matching_snippet in top_ranked_claims:
        # Claims always come from PatentsView.
        full_claims_text = fetch_patentsview_claims_by_id(pid)

        # SerpApi is used only for the PDF link; its claims text is ignored.
        pdf_link: Optional[str] = None
        if SERPAPI_API_KEY_SET:
            pdf_link, _ = fetch_serpapi_details(pid)

        # Use full_claims_text if we got it, otherwise fall back to the original snippet
        text_for_llm = full_claims_text if full_claims_text else matching_snippet

        claims_to_pass_to_llm.append((pid, text_for_llm))
        enriched_claims_data[pid] = (matching_snippet, pdf_link, full_claims_text)

    print("Enrichment complete.")

    print("Step 3: LLM selecting top patents based on aggregated (or enriched) claims...")

    print("Passing claims to LLM (snippets):", [(pid, text[:150] + "...") for pid, text in claims_to_pass_to_llm[:10]])

    llm_selected_ids = llm_select_top_patents_from_claims(
        query,
        claims_to_pass_to_llm,
        top_n=top_n_final_selection
    )

    if not llm_selected_ids:
        print("LLM failed to select any patents from the claims list.")
        return None

    print(f"LLM selected top IDs: {llm_selected_ids}")

    print("Step 4: Fetching full patent details for LLM-selected IDs...")
    raw = call_patentsview_search(
        query,
        rows=len(llm_selected_ids),
        publication_from=publication_from,
        publication_to=publication_to,
        claim_match_patent_ids=llm_selected_ids,
        search_by_text=False  # look up by id only
    )

    if raw.get("error"):
        print(f"Error from PatentsView: {raw['message']}")
        return None

    final_patents = extract_patentsview_patent_summaries(raw)

    if not final_patents:
        print(f"Could not fetch details for LLM-selected patents: {llm_selected_ids}")
        return None

    # Inject stored PDF/Claims data into the final Patent objects
    for patent in final_patents:
        if patent.patent_number in enriched_claims_data:
            original_snippet, pdf_link, full_claims = enriched_claims_data[patent.patent_number]
            # Prioritize SerpApi PDF link if it exists
            if pdf_link:
                patent.pdf_link = pdf_link
            # Use full claims if we have them, otherwise fall back to original snippet
            patent.claims_snippet = full_claims if full_claims else original_snippet

    # Re-order patents to match LLM's selection order
    final_patents_map = {p.patent_number: p for p in final_patents}
    ordered_final_patents = [final_patents_map[pid] for pid in llm_selected_ids if pid in final_patents_map]

    print("Step 5: Generating final summary with LLM (based on selected patents)...")
    summaries_list_of_dicts = [p.model_dump() for p in ordered_final_patents]
    summaries_str = json.dumps(summaries_list_of_dicts, indent=2)

    system_prompt = (
        "You are a patent research assistant. Your task is to analyze a *final, pre-selected* "
        "list of patents and return a JSON object with two keys: "
        "1. `summary_text`: A concise, natural-language summary based on the title, "
        "   abstract, and importantly, the `claims_snippet`. "
        "   This summary must explicitly state the total number of patents "
        "   found in the *initial* claim search. "
        "2. `top_2_relevant_patents`: The JSON list of the patent objects I provided."
    )

    user_prompt = (
        f"My query is: '{query}'.\n"
        f"We found {total_aggregated_count} total patents in an initial claim search. "
        "After a multi-step process (semantic ranking + LLM selection), we "
        f"identified these {len(ordered_final_patents)} as the most relevant.\n\n"
        f"Here are the full details for these selected patents (including `claims_snippet` if found):\n"
        f"{summaries_str}\n\n"
        "Please generate a summary for these patents, paying close attention to the `claims_snippet`. "
        f"In your summary, you must state that {total_aggregated_count} total patents "
        "were found in the initial search. "
        "Finally, return a single JSON object containing both the `summary_text` "
        "and the `top_2_relevant_patents` list (which is just the list I provided you)."
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]

    # Pre-initialised so the error path can always report what was received.
    llm_response_json = "No response"
    try:
        completion = OPENAI_CLIENT.chat.completions.create(
            model="gpt-4o-mini",
            messages=messages,
            temperature=0.2,
            response_format={"type": "json_object"},
        )
        # `content` may be None (e.g. a refusal); treat that as an empty reply
        # so it surfaces as a JSON decode error instead of an AttributeError.
        llm_response_json = (completion.choices[0].message.content or "").strip()
        llm_data = json.loads(llm_response_json)

        if not isinstance(llm_data, dict):
            # Valid JSON but not an object: item assignment / ** would fail.
            print("\n--- Error parsing FINAL LLM summary response ---")
            print("Error: expected a JSON object at the top level")
            print(f"Raw LLM Output:\n{llm_response_json}")
            return None

        # Never trust the LLM's copy of the patents; use our own objects.
        llm_data['top_2_relevant_patents'] = ordered_final_patents
        validated_response = LLMSummary(**llm_data)

        print("PatentsView claims enrichment is complete.")

        return validated_response

    except (OpenAIError, json.JSONDecodeError, ValidationError) as e:
        print("\n--- Error parsing FINAL LLM summary response ---")
        print(f"Error: {e}")
        print(f"Raw LLM Output:\n{llm_response_json}")
        return None


# =========================
# Main
# =========================
# Demo driver for Service 1: runs the whole search-and-summarize workflow for
# one hard-coded query, prints the summary and the selected patents, then
# attempts to download their PDFs into SAVE_DIRECTORY.
if __name__ == "__main__":
    SAVE_DIRECTORY = "./fetched_patents"

    query = "a flying car that can transform between driving and flying modes"
    print(f"--- Running Service 1 for query: '{query}' (PatentsView + ST + SerpApi) ---")

    llm_summary_object = search_and_summarize_patents(
        query,
        top_k_claims_for_llm_ranking=8, # How many ranked candidates to send to the LLM; fewer is easier on the LLM
        top_n_final_selection=2          # How many patents the LLM should select
    )

    # search_and_summarize_patents() returns None on any failure.
    if llm_summary_object:
        print("\n--- LLM Summary Text ---")
        print(llm_summary_object.summary_text)
        print("\n--- LLM Selected Top 2 Patents (Pydantic Objects) ---")
        for i, patent in enumerate(llm_summary_object.top_2_relevant_patents):
            print(f"\n--- Top {i + 1} Relevant Patent ---")
            print(patent.model_dump_json(indent=2))
        print("\n--- PDF Download ---")
        # Patents without a pdf_link are skipped by download_patents().
        download_patents(llm_summary_object.top_2_relevant_patents, SAVE_DIRECTORY)
    else:
        print("Search failed to produce a valid summary object.")

SerpApi library not found. Please run: pip install google-search-results
--- Running Service 1 for query: 'a flying car that can transform between driving and flying modes' (PatentsView + ST + SerpApi) ---
Step 1: Fetch + rank claims semantically (by snippet)...
Fetched and aggregated claims for 372 patents.
Loading SentenceTransformer 'all-MiniLM-L6-v2' on device: cpu
Found 372 patents with matching claims. Enriching top 8...
Step 2: Enriching top 8 claims (PatentsView + SerpApi for PDF)...
Enrichment complete.
Step 3: LLM selecting top patents based on aggregated (or enriched) claims...
Passing claims to LLM (snippets): [('11214275', '1. A vehicle comprising: a steering wheel; one or more vehicle drive systems comprising at least a vehicle suspension system; one or more driving mode...'), ('11214277', '1. A control device for a vehicle, capable of switching between an automated driving mode in which a driving force of a vehicle that has a power sourc...'), ('11214368', '1. A system f

# Service 2: semantically compare the uploaded innovation with the top2 patents
I added pytesseract and pdf2image to the environment. This lets the user upload a PDF, which is then compared semantically with the PDFs of the top 2 fetched patents. Sometimes those PDFs need OCR to be read, since even their text is not really readable by pypdf, but adding OCR takes a lot of time.
but this took a lot of time! ON colab it was working fine (maybe bacause my documents were mr). I re run the
So i took the pdf from the user and used

I changed it to make sure that the user's query is patentable.

In [7]:
pip install chromadb

Collecting chromadb
  Downloading chromadb-1.3.4-cp39-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (7.2 kB)
Collecting pybase64>=1.4.1 (from chromadb)
  Downloading pybase64-1.4.2-cp312-cp312-manylinux1_x86_64.manylinux2014_x86_64.manylinux_2_17_x86_64.manylinux_2_5_x86_64.whl.metadata (8.7 kB)
Collecting posthog<6.0.0,>=2.4.0 (from chromadb)
  Downloading posthog-5.4.0-py3-none-any.whl.metadata (5.7 kB)
Collecting onnxruntime>=1.14.1 (from chromadb)
  Downloading onnxruntime-1.23.2-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.1 kB)
Collecting opentelemetry-exporter-otlp-proto-grpc>=1.2.0 (from chromadb)
  Downloading opentelemetry_exporter_otlp_proto_grpc-1.38.0-py3-none-any.whl.metadata (2.4 kB)
Collecting pypika>=0.48.9 (from chromadb)
  Downloading PyPika-0.48.9.tar.gz (67 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m67.3/67.3 kB[0m [31m2.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Installing build dependencies ... [?

In [1]:
pip install pytesseract

Collecting pytesseract
  Downloading pytesseract-0.3.13-py3-none-any.whl.metadata (11 kB)
Downloading pytesseract-0.3.13-py3-none-any.whl (14 kB)
Installing collected packages: pytesseract
Successfully installed pytesseract-0.3.13


In [4]:
pip install pdf2image

Collecting pdf2image
  Downloading pdf2image-1.17.0-py3-none-any.whl.metadata (6.2 kB)
Downloading pdf2image-1.17.0-py3-none-any.whl (11 kB)
Installing collected packages: pdf2image
Successfully installed pdf2image-1.17.0


In [28]:
import os
import json
import re
from openai import OpenAI, OpenAIError
from typing import Any, Dict, List, Optional
import chromadb

# --- PDF Parsing Imports ---
import pytesseract
from pdf2image import convert_from_path
from PIL import Image
from pypdf import PdfReader

# --- OpenAI Client Initialization ---
# Build the shared OpenAI client used for embeddings and summarization.
# openai>=1.0 raises OpenAIError (not TypeError) when no API key is available,
# so both are caught here; TypeError is kept for older client versions.
# Downstream code checks `client` / OPENAI_API_KEY_SET instead of crashing here.
try:
    client = OpenAI(api_key=os.environ.get("OPENAI_API_KEY"))
    OPENAI_API_KEY_SET = bool(os.environ.get("OPENAI_API_KEY"))
except (OpenAIError, TypeError):
    client = None
    OPENAI_API_KEY_SET = False

# ---------------------------------------------------------------
# PDF PARSING HELPERS (From Your Prompt)
# ---------------------------------------------------------------

def extract_text_with_pypdf(pdf_path: str) -> str:
    """
    Pull the embedded text layer out of a PDF using pypdf.

    Every page that yields non-blank text is emitted under a
    "--- Page N (pypdf) ---" header; blank pages are dropped.
    Any exception is printed and swallowed, and "" is returned instead.
    """
    try:
        pages = PdfReader(pdf_path).pages
        # extract_text() may return None, so normalise to "" before testing it.
        numbered_text = (
            (number, page.extract_text() or "")
            for number, page in enumerate(pages, start=1)
        )
        page_blocks = [
            f"--- Page {number} (pypdf) ---\n{content}\n"
            for number, content in numbered_text
            if content.strip()
        ]
        return "\n".join(page_blocks).strip()

    except Exception as e:
        print(f"pypdf extraction failed: {e}")
        return ""


def ocr_pdf_to_text(pdf_path: str) -> tuple[Optional[str], Optional[str]]:
    """
    Extract text from an image-based PDF by rasterising it and running OCR.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A ``(text, error_message)`` pair. On success ``text`` is the OCR output
        with a "--- Page N (OCR) ---" header per page and ``error_message`` is
        None. On failure ``text`` is None and ``error_message`` explains why
        (missing file, no recognisable text, or an exception during OCR).
    """
    if not os.path.exists(pdf_path):
        return None, f"Error: File not found at path: {pdf_path}"

    print(f"Starting OCR process for: {pdf_path}")
    try:
        images = convert_from_path(pdf_path)
        page_blocks = []
        found_text = False

        for i, page_image in enumerate(images):
            print(f"Processing Page {i + 1} (OCR)...")
            text = pytesseract.image_to_string(page_image)
            # Track real OCR output separately: the page headers alone would
            # otherwise make a document with no recognised text look non-empty.
            if text and text.strip():
                found_text = True
            page_blocks.append(f"--- Page {i + 1} (OCR) ---\n{text}\n\n")

        if not found_text:
            return None, "Error: OCR completed but returned no text."

        return "".join(page_blocks).strip(), None

    except Exception as e:
        # Best-effort: report any rasterisation/OCR failure to the caller
        # through the error slot instead of raising.
        return None, f"An error occurred during the OCR process: {e}"


def pdf_to_text_with_fallback(pdf_path: str) -> tuple[Optional[str], Optional[str]]:
    """
    Extract text from a PDF, preferring pypdf and falling back to OCR.

    Args:
        pdf_path: Path of the PDF file to read.

    Returns:
        A ``(text, error_message)`` pair. Exactly one of the two is None:
        ``text`` on failure, ``error_message`` on success.
    """
    if not os.path.exists(pdf_path):
        return None, f"Error: File not found at path: {pdf_path}"

    print(f"Attempting text extraction with pypdf for: {pdf_path}")
    text_pypdf = extract_text_with_pypdf(pdf_path)

    # pypdf is the cheap path; OCR is only attempted when it finds nothing.
    if text_pypdf and text_pypdf.strip():
        print("Text successfully extracted with pypdf. Skipping OCR.")
        return text_pypdf, None

    print(f"No extractable text found with pypdf for {os.path.basename(pdf_path)}. Falling back to OCR...")
    return ocr_pdf_to_text(pdf_path)

# ---------------------------------------------------------------
# SERVICE 2: CHROMA & SEMANTIC FUNCTIONS
# ---------------------------------------------------------------

def chunk_text(text: str) -> List[str]:
    """Break text at blank lines and return the non-empty, trimmed paragraphs."""
    trimmed_paragraphs = (piece.strip() for piece in re.split(r'\n\s*\n', text))
    return [paragraph for paragraph in trimmed_paragraphs if paragraph]

def get_embedding(text: str):
    """
    Embed one piece of text with OpenAI's "text-embedding-3-small" model.

    Raises ValueError when the module-level client was never initialised.
    Returns the embedding of the first (only) item in the response.
    """
    if not client:
        raise ValueError("OpenAI client is not initialized.")
    response = client.embeddings.create(model="text-embedding-3-small", input=text)
    first_item = response.data[0]
    return first_item.embedding

def init_collection(path="chroma_patent_store", name="patent_comparison"):
    """
    Open a persistent Chroma client at `path` and return the collection
    called `name`, creating it if it does not exist yet.
    """
    store = chromadb.PersistentClient(path=path)
    patent_collection = store.get_or_create_collection(name=name)
    print(f"[Service 2] ChromaDB collection '{name}' loaded from path '{path}'.")
    return patent_collection

def populate_collection(collection, patent_folder_path: str):
    """
    Rebuild the Chroma collection from the PDFs found in a folder.

    Every PDF is converted to text (pypdf first, OCR as fallback), split into
    paragraph chunks, embedded, and stored with its file name as the
    ``source_patent`` metadata.

    Args:
        collection: Chroma collection to clear and refill.
        patent_folder_path: Folder that holds the patent PDF files.

    Returns:
        True when at least one PDF was indexed, False when the folder holds no
        PDFs or every PDF was skipped or failed.
    """
    # Filter matches every stored chunk (none is tagged "dummy"), which wipes
    # whatever a previous run left behind.
    collection.delete(where={"source_patent": {"$ne": "dummy"}})
    print("[Service 2] Cleared old patent data from ChromaDB.")

    # Case-insensitive match so ".PDF" files are not silently ignored; sorted
    # so the processing order does not depend on the filesystem.
    pdf_files = sorted(
        f for f in os.listdir(patent_folder_path) if f.lower().endswith(".pdf")
    )
    if not pdf_files:
        print(f"No .pdf files found in {patent_folder_path}")
        return False

    print(f"Found {len(pdf_files)} patents to process in {patent_folder_path}")

    indexed_files = 0
    for pdf_file in pdf_files:
        full_path = os.path.join(patent_folder_path, pdf_file)

        print(f"\n[Service 2] Processing patent file: {pdf_file}...")
        text, error = pdf_to_text_with_fallback(full_path)

        if error:
            print(f"Skipping {pdf_file}: {error}")
            continue

        chunks = chunk_text(text)
        if not chunks:
            print(f"Skipping {pdf_file}: No text chunks found after processing.")
            continue

        chunk_ids = [f"{pdf_file}_chunk_{i}" for i in range(len(chunks))]
        metadatas = [{"source_patent": pdf_file} for _ in chunks]

        try:
            embeddings = [get_embedding(c) for c in chunks]
            collection.add(
                documents=chunks,
                embeddings=embeddings,
                ids=chunk_ids,
                metadatas=metadatas
            )
        except Exception as e:
            # Best-effort: one bad file must not abort the remaining ones.
            print(f"Error adding {pdf_file} to Chroma: {e}")
        else:
            indexed_files += 1
            print(f"[Service 2] Added {len(chunks)} chunks for {pdf_file} to collection.")

    # Report success only if there is actually something to compare against.
    return indexed_files > 0

def run_semantic_comparison(collection, user_pdf_path: str) -> str:
    """
    Compare the user's PDF against the indexed patents and summarise the result.

    The PDF is converted to text, split into chunks, and each chunk is used to
    query the collection. The distances returned for every patent are averaged,
    the patents are ordered by that average (smallest first), and the table is
    handed to the chat model for a plain-language summary.

    Returns the model's summary, or an error/explanatory message string when a
    step cannot be completed.
    """
    if not client:
        return "Error: OpenAI client not set for summarization."

    # Step 1: turn the user's PDF into query chunks.
    print(f"\n[Service 2] Parsing user PDF: {user_pdf_path}")
    query_text, error = pdf_to_text_with_fallback(user_pdf_path)
    if error:
        return error

    query_chunks = chunk_text(query_text)
    if not query_chunks:
        return "Error: User PDF contains no text to query with."

    # Step 2: query once per chunk, grouping the distances by source patent.
    print(f"[Service 2] Querying ChromaDB with {len(query_chunks)} chunks from user PDF...")
    distances_by_patent = {}

    for chunk_index, chunk in enumerate(query_chunks):
        try:
            hits = collection.query(
                query_embeddings=[get_embedding(chunk)],
                n_results=5
            )
            for distance, metadata in zip(hits['distances'][0], hits['metadatas'][0]):
                distances_by_patent.setdefault(metadata['source_patent'], []).append(distance)
        except Exception as e:
            # A failing chunk is reported and skipped; the others still count.
            print(f"Warning: Error querying for chunk {chunk_index}: {e}")

    if not distances_by_patent:
        return "No semantic matches found in ChromaDB for the user's PDF."

    # Step 3: one row per patent, closest (lowest average distance) first.
    final_scores = sorted(
        (
            {
                "patent_file": patent_file,
                "average_similarity_score": sum(patent_distances) / len(patent_distances),
                "matching_chunks": len(patent_distances)
            }
            for patent_file, patent_distances in distances_by_patent.items()
        ),
        key=lambda row: row['average_similarity_score']
    )

    # Step 4: let the chat model explain the table.
    document_name = os.path.basename(user_pdf_path)
    scores_json = json.dumps(final_scores, indent=2)
    prompt = (
        f"You are a helpful assistant. I have semantically compared my document ('{document_name}') "
        f"against patent PDFs. The analysis resulted in the following similarity scores. "
        f"A lower 'average_similarity_score' means the documents are MORE similar.\n\n"
        f"Comparison Results:\n{scores_json}\n\n"
        f"Question: Based on these scores, which patent is more relevant to my document and why?"
    )
    conversation = [
        {"role": "system", "content": "You summarize semantic comparison results clearly and concisely."},
        {"role": "user", "content": prompt}
    ]

    try:
        completion = client.chat.completions.create(
            model="gpt-4o-mini",
            messages=conversation,
            temperature=0.5
        )
        return completion.choices[0].message.content.strip()
    except OpenAIError as e:
        return f"Error during LLM summarization: {e}"

# ---------------------------------------------------------------
# MAIN EXECUTION
# ---------------------------------------------------------------

if __name__ == "__main__":
    # Entry point for Service 2: index the fetched patent PDFs in Chroma, then
    # compare the user's PDF against them and print the LLM summary.

    # --- DEFINE YOUR PATHS HERE ---

    # 1. Path to the folder with the 2 downloaded patent PDFs
    PATENT_FOLDER = "./fetched_patents"

    # 2. Path to YOUR uploaded PDF (the one you want to compare against)
    #    *** YOU MUST CHANGE THIS PATH TO YOUR FILE ***
    USER_PDF_PATH = "./uploaded_patent/Funny_UFO_Wind_Communication_Patent.pdf"

    # 3. Path to store the persistent Chroma database
    CHROMA_DB_PATH = "my_patent_chroma_db"

    # --- END OF CONFIGURATION ---

    # Pre-flight checks: each missing prerequisite prints a message and the
    # pipeline below is skipped (nothing is raised).
    if not OPENAI_API_KEY_SET:
        print("Fatal Error: OPENAI_API_KEY is not set. The script cannot run.")
    elif not os.path.exists(PATENT_FOLDER):
        print(f"Fatal Error: Patent folder not found: {PATENT_FOLDER}")
    elif not os.path.exists(USER_PDF_PATH):
        print(f"Fatal Error: User PDF not found. Please update USER_PDF_PATH to point to your file: {USER_PDF_PATH}")
    else:
        # 1. Initialize the collection
        collection = init_collection(path=CHROMA_DB_PATH)

        # 2. Populate the collection with the 2 patent PDFs (using fallback).
        #    The boolean result is ignored here; an empty collection surfaces
        #    later as a "no semantic matches" message from the comparison.
        populate_collection(collection, PATENT_FOLDER)

        # 3. Run the semantic comparison.
        #    `final_summary` is left at module level; the Service 3 cell reads it.
        print("\n--- Running Semantic Comparison ---")
        final_summary = run_semantic_comparison(collection, USER_PDF_PATH)

        print("\n--- FINAL SUMMARY ---")
        print(final_summary)

[Service 2] ChromaDB collection 'patent_comparison' loaded from path 'my_patent_chroma_db'.
[Service 2] Cleared old patent data from ChromaDB.
Found 2 patents to process in ./fetched_patents

[Service 2] Processing patent file: 11214277.pdf...
Attempting text extraction with pypdf for: ./fetched_patents/11214277.pdf
Text successfully extracted with pypdf. Skipping OCR.
[Service 2] Added 21 chunks for 11214277.pdf to collection.

[Service 2] Processing patent file: 11214368.pdf...
Attempting text extraction with pypdf for: ./fetched_patents/11214368.pdf
Text successfully extracted with pypdf. Skipping OCR.
[Service 2] Added 35 chunks for 11214368.pdf to collection.

--- Running Semantic Comparison ---

[Service 2] Parsing user PDF: ./uploaded_patent/Funny_UFO_Wind_Communication_Patent.pdf
Attempting text extraction with pypdf for: ./uploaded_patent/Funny_UFO_Wind_Communication_Patent.pdf
Text successfully extracted with pypdf. Skipping OCR.
[Service 2] Querying ChromaDB with 1 chunks fr

Service 3: send an email report to the lawyer

In [18]:

# --- Imports ---
import os
import smtplib
import ssl
import getpass
from email.message import EmailMessage
from email.policy import SMTPUTF8  # UTF-8 aware policy

# --- Config (env vars preferred; fallback to secure prompt at runtime) ---
# Gmail address the report is sent from, e.g. "yourname@gmail.com".
SENDER_EMAIL = os.getenv("SENDER_EMAIL")
# 16-character Gmail App Password (the account needs 2FA enabled).
SENDER_APP_PASSWORD = os.getenv("SENDER_APP_PASSWORD")
# True only when both credentials above are present and non-empty.
EMAIL_CONFIG_SET = bool(SENDER_EMAIL and SENDER_APP_PASSWORD)

def ensure_email_config():
    """
    Make sure sender credentials are available, asking for them if needed.

    When EMAIL_CONFIG_SET is already truthy nothing is prompted. Otherwise the
    address is read with input() and the app password with getpass (no echo),
    the module-level credentials are overwritten, and the flag is recomputed.

    Returns the resulting value of EMAIL_CONFIG_SET.
    """
    global SENDER_EMAIL, SENDER_APP_PASSWORD, EMAIL_CONFIG_SET

    # Guard clause: credentials are already configured.
    if EMAIL_CONFIG_SET:
        return EMAIL_CONFIG_SET

    print("Email credentials not found in environment. Enter them to proceed.")
    SENDER_EMAIL = input("Sender Gmail address: ").strip()
    SENDER_APP_PASSWORD = getpass.getpass("Gmail App Password (won’t echo): ").strip()

    both_provided = bool(SENDER_EMAIL and SENDER_APP_PASSWORD)
    EMAIL_CONFIG_SET = both_provided
    return EMAIL_CONFIG_SET

def send_summary_email(
    service_1_summary: str,
    service_2_summary: str,
    top_2_relevant_patents:str,
    user_email: str,
    lawyer_email: str
):
    """
    Email the combined search + comparison report to the lawyer through Gmail.

    The message goes out from SENDER_EMAIL over SMTP-over-SSL
    (smtp.gmail.com:465) with `user_email` as the Reply-To address.
    If EMAIL_CONFIG_SET is falsy the function prints a notice and returns
    without sending. Sending errors are printed, not raised.

    Gmail prerequisites:
    - 2-Step Verification enabled on the sender account.
    - An App Password (Google Account → Security → App passwords).
    """
    if not EMAIL_CONFIG_SET:
        print("Email credentials (SENDER_EMAIL, SENDER_APP_PASSWORD) not set. Skipping email.")
        return

    print(f"\n[Service 3] Preparing to send email to {lawyer_email}...")

    # Plain-text report; both summaries are interpolated verbatim.
    report_text = f"""
    Dear Legal Team,

    Please find below a summary of a patent search conducted by {user_email}.

    ========================================================
    PART 1: INITIAL PATENT SEARCH
    ========================================================

    {service_1_summary}

    The top 2 patents numbers for reference:
    {top_2_relevant_patents}
    ========================================================
    PART 2: SEMANTIC COMPARISON
    ========================================================

    A user-provided document was semantically compared against the downloaded patents.
    Here is the AI-generated analysis of the comparison:

    {service_2_summary}


    ========================================================

    Please review these findings. You can reply directly to {user_email} with your assessment.

    Best regards,
    Patent Research Bot
    """

    # UTF-8 aware message so non-ASCII characters in the summaries survive.
    message = EmailMessage(policy=SMTPUTF8)
    message["Subject"] = "Patent Research Summary for Review"
    message["To"] = lawyer_email
    message["From"] = SENDER_EMAIL
    message.add_header("Reply-To", user_email)

    # Round-trip through UTF-8 with "replace" so unencodable characters are
    # substituted instead of aborting the send.
    message.set_content(
        report_text.encode("utf-8", "replace").decode("utf-8"),
        subtype="plain",
        charset="utf-8",
    )

    try:
        tls_context = ssl.create_default_context()
        with smtplib.SMTP_SSL("smtp.gmail.com", 465, context=tls_context) as smtp:
            smtp.login(SENDER_EMAIL, SENDER_APP_PASSWORD)
            # Request the SMTPUTF8 extension for this message.
            smtp.send_message(message, mail_options=["SMTPUTF8"])
            print(f"[Service 3] Email successfully sent to {lawyer_email}!")
    except smtplib.SMTPException as e:
        print(f"[Service 3] Error: Unable to send email. {e}")
    except Exception as e:
        print(f"[Service 3] An unexpected error occurred during email sending: {e}")

# ---------------------------------------------------------------
# MAIN EXECUTION
# ---------------------------------------------------------------
if __name__ == "__main__":
    # Entry point for Service 3: collect credentials and a reply-to address,
    # then email the Service 1 and Service 2 results to the lawyer.

    # If env vars are missing, prompt interactively once
    if not ensure_email_config():
        print("Invalid or missing email credentials. Skipping email service.")
    else:
        user_email = input("Please enter your email address (for the lawyer to reply to): ").strip()
        # Minimal sanity check only: non-empty and contains "@".
        if not user_email or "@" not in user_email:
            print("Invalid email address. Skipping email service.")
        else:

            # `llm_summary_object` is not defined in this cell (hence the noqa);
            # presumably left in the session by the Service 1 cell — TODO confirm.
            service_1 = llm_summary_object.summary_text  # noqa: F821


            # `final_summary` is the module-level result of the Service 2 run.
            service_2 = final_summary

            # Reduce the top-2 patent objects to a comma-separated number list.
            top2 = llm_summary_object.top_2_relevant_patents
            numbers = [p.patent_number for p in top2]
            numbers_str = ", ".join(numbers)

            # Send the email
            # NOTE(review): the recipient address is hard-coded here.
            send_summary_email(
                service_1_summary=service_1,
                service_2_summary=service_2,
                top_2_relevant_patents=numbers_str,
                user_email=user_email,
                lawyer_email="alimhdyassine@gmail.com"
            )


Email credentials not found in environment. Enter them to proceed.
Sender Gmail address: aliyassinove@gmail.com
Gmail App Password (won’t echo): ··········
Please enter your email address (for the lawyer to reply to): aliyassinove@gmail.com

[Service 3] Preparing to send email to alimhdyassine@gmail.com...
[Service 3] Email successfully sent to alimhdyassine@gmail.com!


Gradio user interface

In [29]:
# ============================================
# Simple Chatbot + Gradio wiring (per your 8 steps)
# WITH SECURITY GUARDRAILS
# ============================================

import os
import json
import gradio as gr

# Expect these to be defined elsewhere in your project:
# OPENAI_CLIENT, search_and_summarize_patents, download_patents,
# load_st_model, ensure_email_config, send_summary_email,
# init_collection, populate_collection, run_semantic_comparison,
# EMAIL_CONFIG_SET

# --- OpenAI function for emailing the report ---
# Function-calling schema: a single tool that emails the latest report.
functions = [
    dict(
        name="send_patent_summary_email",
        description="Email the most recent patent search & comparison summary to a lawyer.",
        parameters=dict(
            type="object",
            properties=dict(
                user_email=dict(type="string", description="Your email (reply-to)."),
                lawyer_email=dict(type="string", description="Lawyer/recipient email."),
            ),
            required=["user_email", "lawyer_email"],
        ),
    ),
]

# Folder that receives the top-2 patent PDFs.
SAVE_DIRECTORY = "./fetched_patents"
# On-disk location of the persistent Chroma store.
CHROMA_DB_PATH = "my_patent_chroma_db"

# ============================================
# GUARDRAILS AND SECURITY
# ============================================

def check_guardrails(user_input: str) -> tuple[bool, str]:
    """
    Check whether user input violates any guardrail.

    Checks run in this order and the first hit wins:
      1. System-prompt access / instruction-manipulation phrases
         (case-insensitive substring match).
      2. Restricted topics (cats, dogs, horoscopes, Taylor Swift), matched as
         whole words with an optional trailing "s", so technical terms such as
         "catalyst", "library", "calibration" or "boundaries" are not blocked.
      3. Prompt-injection markers (case-sensitive substring match).
      4. Length above 5000 characters.
      5. Excessive repetition (one word making up more than 30% of an input
         longer than 10 words).

    Args:
        user_input: Raw text typed by the user.

    Returns:
        ``(is_blocked, reason_message)``. ``reason_message`` is "" when the
        input is allowed. Empty or non-string input is allowed.
    """
    # Function-scope imports keep the block self-contained and out of the loops.
    import re
    from collections import Counter

    if not user_input or not isinstance(user_input, str):
        return False, ""

    user_lower = user_input.lower()

    # ============================================
    # 1. SYSTEM PROMPT PROTECTION
    # ============================================
    system_prompt_keywords = [
        # Direct prompt requests
        "system prompt", "system message", "systemprompt",
        "show prompt", "reveal prompt", "display prompt",
        "what is your prompt", "show me your prompt",
        "what are your instructions", "show instructions",
        "reveal your instructions", "display instructions",
        "your system message", "initial instructions",
        "base prompt", "original prompt", "core instructions",

        # Instruction manipulation
        "ignore previous", "ignore all previous", "ignore instructions",
        "forget everything", "forget instructions", "disregard previous",
        "override instructions", "bypass instructions", "new instructions",
        "disregard instructions", "ignore above", "ignore earlier",

        # Role manipulation
        "pretend you are", "act as if", "roleplay as",
        "you are now", "from now on you are", "behave as",

        # Information extraction
        "what were you told", "what are your rules", "your guidelines",
        "your constraints", "your limitations", "your restrictions",
        "how were you programmed", "what is your role"
    ]

    if any(keyword in user_lower for keyword in system_prompt_keywords):
        return True, (
            "⚠️ **Security Alert**: I cannot respond to requests about my system instructions "
            "or attempts to modify my behavior. Please focus on patent search and innovation analysis."
        )

    # ============================================
    # 2. RESTRICTED TOPICS
    # ============================================
    # NOTE(review): "cancer" and "bark" are ordinary technical words too;
    # whole-word matching still blocks them. Confirm that this is intended.
    restricted_topics = {
        "Cats or Dogs": [
            "cat", "cats", "feline", "kitten", "kitty", "meow",
            "dog", "dogs", "canine", "puppy", "puppies", "bark", "woof"
        ],
        "Horoscopes or Zodiac Signs": [
            "horoscope", "horoscopes", "zodiac", "astrology", "astrological",
            "aries", "taurus", "gemini", "cancer", "leo", "virgo",
            "libra", "scorpio", "sagittarius", "capricorn", "aquarius", "pisces",
            "birth chart", "star sign", "sun sign", "moon sign"
        ],
        "Taylor Swift": [
            "taylor swift", "taylorswift", "t swift", "tswift", "t.swift",
            "taylor alison swift", "swiftie", "swifties"
        ]
    }

    for topic_name, keywords in restricted_topics.items():
        # One whole-word pattern per topic. Word boundaries apply to every
        # keyword (not only the short ones) so substrings inside longer words
        # do not trigger a block; the optional "s" keeps simple plurals covered.
        alternatives = "|".join(re.escape(keyword) for keyword in keywords)
        topic_pattern = r"\b(?:" + alternatives + r")s?\b"
        if re.search(topic_pattern, user_lower):
            return True, (
                f"🚫 **Restricted Topic**: I cannot discuss **{topic_name}**. "
                "This chatbot is designed specifically for patent search and innovation analysis. "
                "Please describe your technical innovation or invention instead."
            )

    # ============================================
    # 3. PROMPT INJECTION PROTECTION
    # ============================================
    injection_patterns = [
        # Code blocks and formatting that might be injection attempts
        "```", "~~~",
        # Role markers
        "assistant:", "system:", "user:", "human:", "ai:",
        # Special tokens
        "<|", "|>", "[INST]", "[/INST]", "<s>", "</s>",
        # Template injection
        "{{", "}}", "${",
        # Script tags
        "<%", "%>", "<script", "</script>",
        # XML/HTML injection
        "<?", "?>",
        # Command separators
        "###assistant", "###system", "###user"
    ]

    # Matched against the raw input on purpose: the list mixes upper- and
    # lower-case markers, so this check is case-sensitive.
    if any(pattern in user_input for pattern in injection_patterns):
        return True, (
            "⚠️ **Input Validation Error**: Your input contains special characters or patterns "
            "that cannot be processed. Please provide a plain text description of your innovation."
        )

    # ============================================
    # 4. ADDITIONAL SAFETY CHECKS
    # ============================================

    # Check for excessive length (potential DOS or injection)
    if len(user_input) > 5000:
        return True, (
            "⚠️ **Input Too Long**: Please keep your innovation description under 5000 characters. "
            "Focus on the key technical aspects of your invention."
        )

    # Check for excessive repetition (spam detection)
    words = user_lower.split()
    if len(words) > 10:
        _, top_count = Counter(words).most_common(1)[0]
        if top_count > len(words) * 0.3:  # one word is more than 30% of the input
            return True, (
                "⚠️ **Input Validation Error**: Your input appears to contain excessive repetition. "
                "Please provide a clear, coherent description of your innovation."
            )

    return False, ""

def chatbot(user_input: str, history=None, last_summary=None):
    """
    Handle one chat turn: guardrail check, patent search, and upload prompt.

    Flow:
      - Reject the message if `check_guardrails` blocks it.
      - Otherwise treat the message as an innovation description and run
        `search_and_summarize_patents` (8 claims for LLM ranking, top 2 kept).
      - Download the PDFs of the top-2 patents into SAVE_DIRECTORY.
      - Reply with the summary and ask the user to upload their own PDF.

    NOTE(review): the reply text tells the user to say "Send the report", but
    no function call to the email tool is made in this function.

    Args:
        user_input: The user's message.
        history: List of (user, assistant) message pairs; it is appended to in
            place. A fresh list is created when omitted.
        last_summary: Summary object from an earlier turn; returned unchanged
            when this turn produces no new summary.

    Returns:
        (reply, history, summary_state, upload_btn_visibility), where the last
        item is a `gr.update(...)` controlling the upload button.
    """
    # A mutable default list would be shared (and grown) across calls.
    if history is None:
        history = []

    # ============================================
    # GUARDRAIL CHECK - FIRST LINE OF DEFENSE
    # ============================================
    is_blocked, block_message = check_guardrails(user_input)
    if is_blocked:
        history.append((user_input, block_message))
        return block_message, history, last_summary, gr.update(visible=False)

    upload_btn_visibility = gr.update(visible=False)

    # ---- Run the innovation search ----
    try:
        try:
            load_st_model()
        except NameError:
            # Deliberately best-effort: the loader is only called when it has
            # been defined elsewhere in the session.
            pass

        llm_summary = search_and_summarize_patents(
            user_input,
            top_k_claims_for_llm_ranking=8,
            top_n_final_selection=2
        )

        if llm_summary:
            os.makedirs(SAVE_DIRECTORY, exist_ok=True)
            download_patents(llm_summary.top_2_relevant_patents, SAVE_DIRECTORY)

            reply_parts = [llm_summary.summary_text, "\n**Top 2 most relevant patents:**"]
            for i, p in enumerate(llm_summary.top_2_relevant_patents, start=1):
                # Same "*Label:* value" markdown as the other fields.
                pdf_link = getattr(p, 'pdf_link', None)
                pdf_line = f"*PDF:* {pdf_link}" if pdf_link else "*PDF:* (not available)"
                reply_parts.append(
                    f"\n— **{i}. {p.title} (US{p.patent_number})**\n"
                    f"*Date:* {p.publication_date}\n"
                    f"*Inventor(s):* {p.inventor}\n"
                    f"*Assignee(s):* {p.assignee}\n"
                    f"*Link:* {p.patent_link}\n"
                    f"{pdf_line}\n"
                    f"*Abstract:* {p.snippet}"
                )

            reply_parts.append(
                "\n---\n**Next:** Please **upload a PDF** describing your innovation and I'll compare it against these two patents."
            )
            reply_parts.append(
                "\n*After the comparison*: say `Send the report` and include `user_email` and `lawyer_email` "
                "to email the results (you may be prompted for server email credentials if not set)."
            )

            final_reply = "\n".join(reply_parts)
            history.append((user_input, final_reply))

            # Show the upload button so the user can provide their PDF next.
            upload_btn_visibility = gr.update(visible=True)
            return final_reply, history, llm_summary, upload_btn_visibility

        else:
            msg = ("I couldn't retrieve enough patent information for your query. "
                   "Please add more technical detail (components, function, use-case).")
            history.append((user_input, msg))
            return msg, history, last_summary, upload_btn_visibility

    except Exception as e:
        # UI boundary: surface the failure in the chat instead of raising.
        err = f"An error occurred while searching patents: {e}"
        history.append((user_input, err))
        return err, history, last_summary, upload_btn_visibility

def _uploaded_file_path(file_obj):
    """
    Return a real, existing path from a Gradio upload (handles multiple shapes).
    Raises FileNotFoundError if no valid path is found.
    """
    # direct string path
    if isinstance(file_obj, str) and os.path.exists(file_obj):
        return file_obj

    # gradio File/TempFile-like objects
    for attr in ("name", "path"):
        p = getattr(file_obj, attr, None)
        if isinstance(p, str) and os.path.exists(p):
            return p

    # dict-like payloads (some gradio versions)
    if isinstance(file_obj, dict):
        for key in ("name", "path", "tempfile", "file"):
            p = file_obj.get(key)
            if isinstance(p, str) and os.path.exists(p):
                return p

    raise FileNotFoundError(f"Could not resolve upload path from: {repr(file_obj)}")

def handle_pdf_upload(pdf_file, history, summary_state):
    """
    Compare the uploaded PDF against the two saved patent PDFs and summarize results.

    Args:
        pdf_file: Upload payload from the Gradio upload button; its on-disk
            path is resolved with `_uploaded_file_path`.
        history: Chat history as a list of (user_message, bot_message) tuples.
            `None` is treated as an empty history.
        summary_state: Summary saved by the preceding patent search; a falsy
            value means no search has been run yet.

    Returns:
        A `(history, upload_button_update)` pair. Every outcome, success or
        failure, is appended to `history` as a chat turn, and the upload
        button is hidden again in all cases.
    """
    # Without this guard the appends below raise AttributeError on a None
    # history; `respond` protects itself the same way with `history or []`.
    if history is None:
        history = []

    if not summary_state:
        msg = "Run a patent search first, then upload your PDF."
        history.append((None, msg))
        return history, gr.update(visible=False)

    # Chat label used while the real path is not resolved yet.
    pending_label = f"(Uploaded {getattr(pdf_file, 'name', 'uploaded.pdf')})"

    # Build the comparison collection from the saved patent PDFs.
    try:
        os.makedirs(SAVE_DIRECTORY, exist_ok=True)
        collection = init_collection(path=CHROMA_DB_PATH)
        ok = populate_collection(collection, SAVE_DIRECTORY)
        if not ok:
            msg = "I couldn't prepare the two patent PDFs for comparison."
            history.append((pending_label, msg))
            return history, gr.update(visible=False)
    except Exception as e:
        msg = f"Error preparing comparison DB: {e}"
        history.append((pending_label, msg))
        return history, gr.update(visible=False)

    # Robust path resolution across the upload shapes Gradio may deliver.
    try:
        uploaded_path = _uploaded_file_path(pdf_file)
    except Exception as e:
        msg = f"Upload error: couldn't read the file path ({e})."
        history.append(("(Uploaded ?)", msg))
        return history, gr.update(visible=False)

    # Run the semantic comparison; a failure is reported inside the summary
    # text instead of aborting, so the user still gets a chat turn.
    try:
        final_summary = run_semantic_comparison(collection, uploaded_path) or "No comparison results."
    except Exception as e:
        final_summary = f"Comparison failed: {e}"

    uploaded_name = os.path.basename(str(uploaded_path))
    decorated = (
        f"**FINAL COMPARISON SUMMARY**\n\n"
        f"Uploaded: `{uploaded_name}`\n\n"
        f"{final_summary}\n\n"
        f"To email this report, say:\n"
        f"`Send the report user_email=you@example.com lawyer_email=counsel@example.com`"
    )
    history.append((f"(Uploaded {uploaded_name})", decorated))
    return history, gr.update(visible=False)

# -----------------------
# Gradio UI (with loading)
# -----------------------
with gr.Blocks(theme=gr.themes.Soft()) as demo:
    # --- Header: title, workflow overview and security notice ---
    gr.Markdown("# 🤖 Innovation Patent Checker (Secured)")
    gr.Markdown(
        "1) Enter your innovation → 2) I search & summarize → 3) Upload your PDF → "
        "4) I compare vs top-2 patents → 5) Provide emails to send the report.\n\n"
        "⚠️ **Security Notice**: This system has guardrails to protect against unauthorized access "
        "and is restricted to patent analysis only."
    )

    # --- Components (declared in on-page order) ---
    last_summary = gr.State(None)
    chatbot_ui = gr.Chatbot(label="Chatbot", height=520)
    user_input = gr.Textbox(placeholder="Describe your innovation...", label="Your Idea")
    submit_btn = gr.Button("Run Search", variant="primary")

    # Starts hidden; the search handler's returned update decides when it shows.
    pdf_upload_btn = gr.UploadButton("Upload Your Innovation PDF", file_types=[".pdf"], visible=False)

    def respond(msg, history, summary_state):
        """
        Two-step generator: first emit a placeholder 'Searching…' turn so the UI
        has something to show, then emit the real result of the long-running call.

        Each yield is (textbox value, chat history, summary state, upload-button update);
        the textbox value is always "" so the input is cleared.
        """
        prior_turns = history or []

        # Step 1: placeholder bubble, upload button kept hidden.
        placeholder_turns = prior_turns + [(msg, "Searching the US patent database…")]
        yield "", placeholder_turns, summary_state, gr.update(visible=False)

        # Step 2: the actual search; the plain reply text is not needed here
        # because it is already part of the returned history.
        _reply, updated_history, new_summary, upload_viz = chatbot(msg, prior_turns, summary_state)
        yield "", updated_history, new_summary, upload_viz

    # --- Event wiring: the button click and Enter in the textbox share one handler ---
    _search_inputs = (user_input, chatbot_ui, last_summary)
    _search_outputs = (user_input, chatbot_ui, last_summary, pdf_upload_btn)
    for _register in (submit_btn.click, user_input.submit):
        _register(
            respond,
            inputs=list(_search_inputs),
            outputs=list(_search_outputs),
        )

    pdf_upload_btn.upload(
        handle_pdf_upload,
        inputs=[pdf_upload_btn, chatbot_ui, last_summary],
        outputs=[chatbot_ui, pdf_upload_btn],
    )

    # --- Clickable sample prompts that fill the textbox ---
    gr.Examples(
        examples=[
            "A catheter that can do both IVUS and OCT in a single pullback",
            "A wearable that measures glucose non-invasively with Raman spectroscopy",
        ],
        inputs=user_input,
    )

# ---- Optional warmups and launch ----
if __name__ == "__main__":
    # Optional warmup. A NameError still does not stop the launch, but the
    # reason is now printed instead of being swallowed silently, so a
    # NameError raised *inside* load_st_model is no longer hidden.
    try:
        load_st_model()
    except NameError as exc:
        print(f"Note: model warmup skipped ({exc}).")

    # EMAIL_CONFIG_SET may not be defined in this session; treat "undefined"
    # as "nothing to report" and only print when it exists and is falsy.
    if not globals().get("EMAIL_CONFIG_SET", True):
        print("Note: Email credentials not set; you'll be prompted if you send a report.")

    demo.launch(share=False)

  chatbot_ui   = gr.Chatbot(label="Chatbot", height=520)


Loading SentenceTransformer 'all-MiniLM-L6-v2' on device: cpu
Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
Note: opening Chrome Inspector may crash demo inside Colab notebooks.
* To create a public link, set `share=True` in `launch()`.


<IPython.core.display.Javascript object>