In [1]:
import requests
from requests.auth import HTTPBasicAuth
import xml.etree.ElementTree as ET
import pandas as pd
import io
import re

# Import duckdb and SentenceTransformer
import duckdb
from sentence_transformers import SentenceTransformer, util # Import util for cosine similarity

# --- Configuration ---
DB_FILE = "aact.duckdb" # Make sure this file exists and contains the necessary tables

# ======================
# OPS CREDENTIALS
# ======================
CONSUMER_KEY = "NVQKCR82CwTZzGPaO6ogoq6WVlbS0HVy1SnytQ1gIqewF6cl"
CONSUMER_SECRET = "awuDXBZlb7Amy9dlRibwYzkAtCGCNHwnq4SY4NBXDCVw98K5FpFHHqXA746klglr"

# ======================
# LENS.ORG FETCH
# ======================
def fetch_lens_patent_data(lens_id: str) -> dict:
    """Fetches initial patent data from Lens.org"""
    url = f"https://www.lens.org/lens/export/patent?q=lens_id%3A%22{lens_id}%22&st=true"
    try:
        response = requests.post(url, timeout=30)
        response.raise_for_status()
        
        if response.text and len(response.text.splitlines()) > 1:
            df = pd.read_csv(io.StringIO(response.text))
            if not df.empty:
                row = df.iloc[0].to_dict()
                
                # Extract publication number and clean it
                pub_number = str(row.get("Publication Number", "")).strip()
                if not pub_number:
                    # Try Display Key as fallback
                    pub_number = str(row.get("Display Key", "")).replace(" ", "")

                return {
                    "lens_id": row.get("Lens ID", lens_id),
                    "publication_number": pub_number,
                    "title": str(row.get("Title", "")),
                    "abstract": str(row.get("Abstract", "")),
                    "claims": [], # Will be populated from EPO
                    "owners": str(row.get("Owners", "")),
                    "applicants": str(row.get("Applicants", "")),
                    "source": "Lens.org"
                }
        return {"lens_id": lens_id, "error": "Patent not found"}
    except requests.exceptions.RequestException as e:
        return {"lens_id": lens_id, "error": f"Request failed: {str(e)}"}

# ======================
# EPO OPS FETCH
# ======================
def get_access_token():
    url = "https://ops.epo.org/3.2/auth/accesstoken"
    data = {"grant_type": "client_credentials"}
    response = requests.post(url, data=data, auth=HTTPBasicAuth(CONSUMER_KEY, CONSUMER_SECRET))
    response.raise_for_status()
    return response.json()["access_token"]

def fetch_epo_patent_data(pub_number: str) -> dict:
    """Fetch title, abstract, and claims from EPO OPS"""
    try:
        token = get_access_token()
        headers = {"Authorization": f"Bearer {token}", "Accept": "application/xml"}

        # Clean and parse publication number
        pub_number = pub_number.replace("/", "").replace(" ", "")
        match = re.match(r"([A-Z]+)(\d+)([A-Z]\d*)", pub_number)
        if not match:
            return {"error": f"Invalid publication number format: {pub_number}"}
            
        country, number, kind = match.groups()
        docdb_number = f"{country}.{number}.{kind}"
        print(f"🔍 Using DOCDB number: {docdb_number}")

        # Fetch claims
        url_claims = f"https://ops.epo.org/3.2/rest-services/published-data/publication/docdb/{docdb_number}/claims"
        response = requests.get(url_claims, headers=headers, timeout=30)
        response.raise_for_status()
        claims = parse_claims(response.text)
        print(f"📄 Found {len(claims)} claims")

        # Fetch biblio data
        url_biblio = f"https://ops.epo.org/3.2/rest-services/published-data/publication/docdb/{docdb_number}/biblio"
        response = requests.get(url_biblio, headers=headers, timeout=30)
        response.raise_for_status()
        title, abstract = parse_biblio(response.text)
        print("📚 Retrieved bibliographic data")

        return {
            "title": title,
            "abstract": abstract,
            "claims": claims,
            "source": "EPO OPS"
        }
    except Exception as e:
        print(f"❌ EPO fetch error: {str(e)}")
        return {"error": str(e)}

# ======================
# PARSERS
# ======================
def parse_claims(xml_data: str) -> list:
    """Parse claims from EPO XML response"""
    ns = {"ftxt": "http://www.epo.org/fulltext"}
    try:
        root = ET.fromstring(xml_data)
        claims = []
        
        # Try both direct claim-text and nested claim structures
        for claim in root.findall(".//ftxt:claim", ns):
            texts = claim.findall(".//ftxt:claim-text", ns)
            claim_text = " ".join(t.text.strip() for t in texts if t.text)
            if claim_text and not claim_text.upper().startswith("WHAT IS CLAIMED IS"):
                claims.append(claim_text)
                
        if not claims:
            # Fallback: try to split by claim numbers
            text_elements = root.findall(".//ftxt:claim-text", ns)
            full_text = " ".join(t.text.strip() for t in text_elements if t.text)
            claims = [c.strip() for c in re.split(r"\s*\d+\.\s+", full_text) if c.strip()]
            
        return claims
    except ET.ParseError as e:
        print(f"❌ XML Parse error: {str(e)}")
        return []

def parse_biblio(xml_data: str) -> tuple:
    """Parse title and abstract from EPO XML response"""
    ns = {"ep": "http://www.epo.org/exchange"}
    try:
        root = ET.fromstring(xml_data)
        
        # Try to find English title first, then any title
        titles = root.findall(".//ep:invention-title", ns)
        title = ""
        for t in titles:
            if t.get("lang") == "en" and t.text:
                title = t.text.strip()
                break
        if not title and titles:
            title = titles[0].text.strip() if titles[0].text else ""

        # Get abstract
        abstract = ""
        abstract_elem = root.find(".//ep:abstract/ep:p", ns)
        if abstract_elem is not None and abstract_elem.text:
            abstract = abstract_elem.text.strip()

        return title, abstract
    except ET.ParseError as e:
        print(f"❌ XML Parse error: {str(e)}")
        return "", ""
# ======================
# INTEGRATED FETCHER
# ======================
def get_patent_metadata(lens_id: str) -> dict:
    """Main function to fetch and combine patent data"""
    print(f"🔍 Fetching patent data for Lens ID: {lens_id}")
    
    # First get data from Lens.org
    lens_data = fetch_lens_patent_data(lens_id)
    if "error" in lens_data:
        return lens_data

    # Try to enrich with EPO data if we have a publication number
    pub_number = lens_data.get("publication_number", "").strip()
    if pub_number:
        print(f"📑 Found publication number: {pub_number}")
        epo_data = fetch_epo_patent_data(pub_number)
        
        if "error" not in epo_data:
            # Combine data, preferring EPO where available
            lens_data.update({
                "title": epo_data.get("title") or lens_data.get("title", ""),
                "abstract": epo_data.get("abstract") or lens_data.get("abstract", ""),
                "claims": epo_data.get("claims", []),
                "source": "EPO OPS + Lens.org"
            })
        else:
            print("⚠️ EPO data fetch failed, using Lens.org data only")
            lens_data["source"] = "Lens.org only"
    else:
        print("⚠️ No publication number found, using Lens.org data only")
        lens_data["source"] = "Lens.org only"

    return lens_data

# ======================
# MATCHING + RANKING
# ======================

def normalize_name(name: str) -> str:
    if not isinstance(name, str): return ""
    name = name.lower()
    name = re.sub(r'[,.\s]*(inc|incorporated|ltd|llc|corp|gmbh|ag|plc|sa)\b.*$', '', name)
    name = re.sub(r'[^\w\s]', '', name)
    return name.strip()

def find_sponsor_matches(patent_data: dict) -> list:
    if "error" in patent_data: return []
    all_names_raw = []
    owners_str = str(patent_data.get("owners", ""))
    applicants_str = str(patent_data.get("applicants", ""))
    if owners_str: all_names_raw.extend(owners_str.split(';'))
    if applicants_str: all_names_raw.extend(applicants_str.split(';'))

    normalized_sponsors = { normalize_name(name) for name in all_names_raw if normalize_name(name) and 'nan' not in normalize_name(name) }
    print(f"[DEBUG] Normalized sponsor names for search: {normalized_sponsors}")

    if not normalized_sponsors: return []
    matches = []
    try:
        con = duckdb.connect(database=DB_FILE, read_only=True)
        # Ensure 'norm_name' column exists in 'sponsors' table or adjust query
        where_clauses = " OR ".join([f"lower(name) LIKE '%{s}%'" for s in normalized_sponsors])
        sql_query = f"""SELECT s.nct_id, s.official_title, s.start_date, sp.name AS sponsor_name FROM studies AS s JOIN sponsors AS sp ON s.nct_id = sp.nct_id WHERE ({where_clauses});"""
        print(f"[DEBUG] Executing Sponsor Match SQL Query:\n{sql_query}")
        results = con.execute(sql_query).fetchall()
        print(f"[DEBUG] Sponsor match query returned {len(results)} rows.")
        for row in results:
            matches.append({ "nct_id": row[0], "title": row[1], "start_date": row[2], "sponsor": row[3] })
    except Exception as e:
        print(f"[ERROR] An error occurred during database query: {e}")
    finally:
        if 'con' in locals(): con.close()
    return matches

def get_patent_full_text(patent_info: dict) -> str:
    # Ensure claims is a string or list of strings before joining
    claims_text = ""
    if isinstance(patent_info.get("claims"), list):
        claims_text = ". ".join(patent_info.get("claims"))
    elif isinstance(patent_info.get("claims"), str):
        claims_text = patent_info.get("claims")

    full_text = ". ".join(filter(None, [ # Use filter(None) to remove empty strings
        patent_info.get("title", ""),
        patent_info.get("abstract", ""),
        claims_text
    ]))
    print("\n[DEBUG] Aggregated Patent Text (first 300 chars):")
    print(full_text[:300] + "...")
    return full_text

def get_trial_full_text(nct_id: str, con) -> str:
    text_parts = []

    studies_res = con.execute(f"SELECT brief_title, official_title FROM studies WHERE nct_id = '{nct_id}'").fetchone()
    if studies_res: text_parts.extend(studies_res)

    summary_res = con.execute(f"SELECT description FROM brief_summaries WHERE nct_id = '{nct_id}'").fetchone()
    if summary_res: text_parts.extend(summary_res)

    desc_res = con.execute(f"SELECT description FROM detailed_descriptions WHERE nct_id = '{nct_id}'").fetchone()
    if desc_res: text_parts.extend(desc_res)

    int_res = con.execute(f"SELECT name, description FROM interventions WHERE nct_id = '{nct_id}'").fetchall()
    for row in int_res: text_parts.extend(row)

    cond_res = con.execute(f"SELECT mesh_term FROM browse_conditions WHERE nct_id = '{nct_id}'").fetchall()
    for row in cond_res: text_parts.extend(row)

    full_text = ". ".join([str(part) for part in text_parts if part])
    print(f"\n[DEBUG] Aggregated Trial Text for {nct_id} (first 300 chars):")
    print(full_text[:300] + "...")
    return full_text

def rank_by_semantic_similarity(sponsor_matches: list, patent_info: dict) -> list:
    print("\n[DEBUG] Initializing SentenceTransformer model...")
    model = SentenceTransformer('all-MiniLM-L6-v2')

    patent_text = get_patent_full_text(patent_info)
    print("[DEBUG] Encoding patent text into a vector...")
    patent_embedding = model.encode(patent_text, convert_to_tensor=True)

    ranked_trials = []
    con = duckdb.connect(database=DB_FILE, read_only=True)

    print("\n[DEBUG] Starting to process and score each candidate trial...")
    for trial in sponsor_matches:
        nct_id = trial['nct_id']
        print(f"\n--- Processing {nct_id} ---")
        trial_text = get_trial_full_text(trial['nct_id'], con)

        if not trial_text:
            trial['semantic_score'] = 0
            ranked_trials.append(trial)
            print(f"[DEBUG] No text found for {nct_id}. Score set to 0.")
            continue

        print(f"[DEBUG] Encoding trial text for {nct_id}...")
        trial_embedding = model.encode(trial_text, convert_to_tensor=True)

        cosine_score = util.pytorch_cos_sim(patent_embedding, trial_embedding).item()
        print(f"[DEBUG] Calculated Cosine Similarity Score for {nct_id}: {cosine_score:.4f}")

        trial['semantic_score'] = cosine_score
        ranked_trials.append(trial)

    con.close()

    return sorted(ranked_trials, key=lambda x: x['semantic_score'], reverse=True)

# ======================
# MAIN
# ======================
if __name__ == "__main__":
    lens_id = "023-587-103-542-962"  # replace with actual Lens ID
    patent_info = get_patent_metadata(lens_id)

    print("\n=== PATENT METADATA ===")
    print("📑 Source:", patent_info.get("source", "Unknown"))
    print("🔑 Publication:", patent_info.get("publication_number"))
    print("📝 Title:", patent_info.get("title"))
    print("📖 Abstract:", patent_info.get("abstract", "")[:200], "...")
    
    # Add detailed claims preview
    print("\n=== CLAIMS PREVIEW ===")
    claims = patent_info.get("claims", [])
    if isinstance(claims, list) and claims:
        for i, claim in enumerate(claims, 1):
            # Show first 50 characters of each claim
            preview = claim[:50].replace('\n', ' ').strip()
            print(f"Claim {i}: {preview}...")
    else:
        print("❌ No claims found in the patent data")

    if "error" not in patent_info:
        try:
            initial_matches = find_sponsor_matches(patent_info)
            ranked_trials = rank_by_semantic_similarity(initial_matches, patent_info)
            print("\n=== RANKED TRIALS ===")
            for trial in ranked_trials:
                print(f"  - NCT ID: {trial['nct_id']}, Score: {trial['semantic_score']:.4f}")
        except Exception as e:
            print(f"❌ An error occurred during matching/ranking: {e}")
    else:
        print(f"❌ Error: {patent_info.get('error', 'Unknown error')}")

  from .autonotebook import tqdm as notebook_tqdm


🔍 Fetching patent data for Lens ID: 023-587-103-542-962
📑 Found publication number: WO2024/167694A2
🔍 Using DOCDB number: WO.2024167694.A2
📄 Found 14 claims
📚 Retrieved bibliographic data

=== PATENT METADATA ===
📑 Source: EPO OPS + Lens.org
🔑 Publication: WO2024/167694A2
📝 Title: PLACENTAL EXTRACELLULAR MATRICES FOR OCULAR DELIVERY OF OPHTHALMIC THERAPEUTIC AGENTS
📖 Abstract: Provided herein are novel and useful pharmaceutical compositions comprising a placenta derived biomaterial and at least one ophthalmic therapeutic agent for treating an ocular disease or condition. ...

=== CLAIMS PREVIEW ===
Claim 1: WHAT IS CLAIMED IS:...
Claim 2: A pharmaceutical composition for treating an ocula...
Claim 3: The pharmaceutical composition of Claim 1, wherein...
Claim 4: The pharmaceutical composition of Claim 2, wherein...
Claim 5: The composition of Claim 2, wherein the JAK inhibi...
Claim 6: The pharmaceutical composition of Claim 2, wherein...
Claim 7: The pharmaceutical composition of Clai

  return forward_call(*args, **kwargs)


[DEBUG] Calculated Cosine Similarity Score for NCT02264288: 0.3945

--- Processing NCT01440192 ---

[DEBUG] Aggregated Trial Text for NCT01440192 (first 300 chars):
Safety of Intravenous Infusion of Human Placenta-Derived Cells (PDA001) for the Treatment of Adults With Stage II or III Pulmonary Sarcoidosis. A Phase 1B, Multi-Center, Open-Label, Single Dose Study to Evaluate the Safety of Intravenous Infusion of Human Placental-Derived Cells (PDA001) for the Tre...
[DEBUG] Encoding trial text for NCT01440192...
[DEBUG] Calculated Cosine Similarity Score for NCT01440192: 0.3715

--- Processing NCT01859117 ---

[DEBUG] Aggregated Trial Text for NCT01859117 (first 300 chars):
Study of Human Placenta-derived Cells (PDA002) to Evaluate the Safety and Effectiveness in Subjects With PAD and DFU. A Phase 1 Multicenter, Open-Label, Dose-Escalation Study to Evaluate the Safety and Efficacy of Intramuscular Injection of Human Placenta-Derived Cells (PDA-002) in Subjects With Per...
[DEBUG] Encodin