In [None]:
import os

# Keep any API keys that are already exported in the environment; only fall
# back to an empty placeholder when a variable is missing. Assigning "" directly
# would silently wipe real credentials on every run of this cell.
# Never commit real keys into the notebook.
os.environ.setdefault("OPENAI_API_KEY", "")
os.environ.setdefault("PINECONE_API_KEY", "")

In [None]:
!pip install pypdf pinecone openai tiktoken orjson python-dotenv



In [None]:
import os, uuid, orjson
from typing import List, Dict, Any, Tuple
from dataclasses import dataclass

from pypdf import PdfReader
from openai import OpenAI
from pinecone import Pinecone, ServerlessSpec

# Tokenizer used by num_tokens(). Start from "no tokenizer" and upgrade to the
# tiktoken encoding when it can be loaded.
ENCODER = None
try:
    import tiktoken
    ENCODER = tiktoken.get_encoding("cl100k_base")
except Exception:
    # Best effort: any failure to import or load the encoding leaves ENCODER
    # as None, which switches num_tokens() to its word-count estimate.
    pass


In [None]:

# -----------------------------
# Config
# -----------------------------
import time

# Credentials and the index name are read from the environment.
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
PINECONE_API_KEY = os.environ.get("PINECONE_API_KEY", "")
PINECONE_INDEX  = os.environ.get("PINECONE_INDEX", "health-rag-index")

EMBED_MODEL = "text-embedding-3-small"
# Vector size produced by EMBED_MODEL; the Pinecone index must be created with
# the same dimension, so keep the two in sync when changing the model.
EMBED_DIM   = 1536
GEN_MODEL   = "gpt-4o-mini"
RISK_MODEL  = GEN_MODEL

TOP_K = 8               # number of matches requested per query
MIN_SIM = 0.3           # matches scoring below this are dropped in retrieve()
CHUNK_TOKENS = 1000     # token threshold at which a chunk is emitted
CHUNK_OVERLAP = 200     # trailing words carried over into the next chunk
DEFAULT_NAMESPACE = "textbook_01"

client = OpenAI(api_key=OPENAI_API_KEY)
pc = Pinecone(api_key=PINECONE_API_KEY)

# Create the index on first use (new SDK: list_indexes().names()).
if PINECONE_INDEX not in pc.list_indexes().names():
    pc.create_index(
        name=PINECONE_INDEX,
        dimension=EMBED_DIM,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1")
    )
    # Index creation is asynchronous; wait until it reports ready so that the
    # first upsert/query in this run does not hit a half-initialised index.
    while not pc.describe_index(PINECONE_INDEX).status["ready"]:
        time.sleep(1)

# New SDK: Index is obtained through a method call on the client.
index = pc.Index(PINECONE_INDEX)



In [None]:

# -----------------------------
# Helpers
# -----------------------------
def num_tokens(s: str) -> int:
    """Return the token count of ``s``.

    Uses the module-level ``ENCODER`` when one is available. Otherwise the
    count is estimated from the number of whitespace-separated words at
    roughly 0.75 words per token, with a minimum of 1.

    The estimate uses integer arithmetic (``words * 4 // 3``, the same value
    as ``floor(words / 0.75)``) so the result is always an ``int``; floor
    division by the float ``0.75`` would return a float.
    """
    if ENCODER is not None:
        return len(ENCODER.encode(s))
    return max(1, len(s.split()) * 4 // 3)

@dataclass
class DocChunk:
    """A piece of extracted PDF text plus the data stored with its vector."""
    id: str               # vector id used when the chunk is upserted
    text: str             # chunk text; this is what gets embedded
    page: int             # page number the chunk was produced from
    meta: Dict[str, Any]  # metadata dict stored alongside the vector


# -----------------------------
# PDF → Chunks → Pinecone
# -----------------------------
def pdf_to_pages(path: str) -> List[Tuple[int, str]]:
    """Read a PDF and return ``(page_number, text)`` pairs, numbered from 1.

    Each page's extracted text has every line stripped of leading/trailing
    whitespace; a page with no extractable text yields an empty string.
    """
    def _tidy(raw: str) -> str:
        # Strip each line individually, then stitch the page back together.
        return "\n".join(map(str.strip, raw.splitlines()))

    return [
        (number, _tidy(page.extract_text() or ""))
        for number, page in enumerate(PdfReader(path).pages, start=1)
    ]

def chunk_text(text: str, page: int, tokens: int = CHUNK_TOKENS, overlap: int = CHUNK_OVERLAP) -> List[Tuple[str,int]]:
    """Split ``text`` into overlapping chunks of roughly ``tokens`` tokens.

    Words are accumulated until ``num_tokens`` of the joined buffer reaches
    ``tokens``; the buffer is then emitted and the next chunk starts with the
    last ``overlap`` words of the previous one.

    Args:
        text: Text of a single page.
        page: Page number attached to every returned chunk.
        tokens: Token threshold at which a chunk is emitted.
        overlap: Number of trailing *words* (not tokens) repeated at the start
            of the next chunk. Values <= 0 disable the overlap.

    Returns:
        A list of ``(chunk_text, page)`` tuples; empty when ``text`` has no words.

    Note:
        If the overlap words alone already reach ``tokens``, a chunk is emitted
        for every following word; keep ``overlap`` well below ``tokens``.
    """
    out: List[Tuple[str, int]] = []
    buf: List[str] = []
    fresh = 0  # words appended since the last emitted chunk
    for w in text.split():
        buf.append(w)
        fresh += 1
        joined = " ".join(buf)
        if num_tokens(joined) >= tokens:
            out.append((joined, page))
            # buf[-0:] would keep the whole buffer, so handle "no overlap" explicitly.
            buf = buf[-overlap:] if overlap > 0 else []
            fresh = 0
    # Emit the remainder only if it holds words that are not already part of
    # the previous chunk; a buffer holding just the overlap would be a duplicate.
    if fresh:
        out.append((" ".join(buf), page))
    return out

def ingest_pdf(path: str, source: str, namespace: str = DEFAULT_NAMESPACE, year: int = 2024) -> int:
    """Chunk a PDF, embed the chunks and upsert them into the Pinecone index.

    Args:
        path: Location of the PDF file.
        source: Label stored in each chunk's metadata and used in its id.
        namespace: Pinecone namespace the vectors are written to.
        year: Value stored under ``year`` in each chunk's metadata.

    Returns:
        Number of chunks that were embedded and upserted (0 if the PDF
        produced no text).

    Chunk ids are derived deterministically from the source, page, position on
    the page and chunk text, so ingesting the same PDF again overwrites the
    existing vectors instead of adding duplicates under fresh random ids.
    """
    EMBED_BATCH = 128   # texts sent per embeddings request
    UPSERT_BATCH = 100  # vectors sent per upsert request

    chunks: List[DocChunk] = []
    for page, txt in pdf_to_pages(path):
        for ordinal, (chunk_txt, p) in enumerate(chunk_text(txt, page)):
            digest = uuid.uuid5(uuid.NAMESPACE_URL, f"{source}|{p}|{ordinal}|{chunk_txt}").hex[:8]
            chunks.append(DocChunk(
                id=f"{source}-{p}-{digest}",
                text=chunk_txt,
                page=p,
                meta={"source": source, "page": p, "year": year, "namespace": namespace}
            ))
    if not chunks:
        return 0

    # Embed in batches; responses keep the order of the inputs.
    texts = [c.text for c in chunks]
    embeds = []
    for i in range(0, len(texts), EMBED_BATCH):
        resp = client.embeddings.create(model=EMBED_MODEL, input=texts[i:i+EMBED_BATCH])
        embeds.extend(d.embedding for d in resp.data)

    # The chunk text is stored in the metadata so retrieval can return it.
    upserts = [
        {"id": c.id, "values": vec, "metadata": {**c.meta, "text": c.text}}
        for c, vec in zip(chunks, embeds)
    ]
    for i in range(0, len(upserts), UPSERT_BATCH):
        index.upsert(vectors=upserts[i:i+UPSERT_BATCH], namespace=namespace)
    return len(chunks)


In [None]:

# -----------------------------
# Retrieval + QA
# -----------------------------
# System message used for answer generation (build_messages) and for the
# fine-tune examples (build_ft_example).
SYSTEM_PROMPT = (
    "You are a healthcare reference assistant for education.\n"
    "Rules: use ONLY provided sources, never guess; avoid diagnosis/dosing; cite like [Source, p.Page].\n"
)
# Instructions prepended to the user query in classify_risk(); the model is
# expected to reply with one of LOW / MED / HIGH.
RISK_PROMPT = (
    "Classify query: LOW, MED, HIGH risk.\n"
    "HIGH: emergencies, dosing, treatment; MED: conditions, criteria; LOW: definitions, anatomy.\n"
)
# Fixed disclaimer returned with every answer_query() result and embedded in
# fine-tune examples.
DISCLAIMER = "Educational use only. Not medical advice. Consult a clinician for personal health concerns."


In [None]:
def classify_risk(query: str) -> str:
    """Ask the risk model to label ``query`` as ``"LOW"``, ``"MED"`` or ``"HIGH"``.

    The reply is parsed leniently: case, punctuation and surrounding words are
    ignored and ``MEDIUM`` is accepted as ``MED``, so replies such as
    ``"Risk: HIGH."`` are recognised. If the reply is empty, contains no
    label, or mentions more than one distinct label, ``"MED"`` is returned.
    """
    resp = client.chat.completions.create(model=RISK_MODEL,
        messages=[{"role":"system","content":"You classify risk."}, {"role":"user","content":f"{RISK_PROMPT}\nQuery: {query}"}],
        temperature=0)
    # content may be None (e.g. an empty completion); treat that as "no label".
    reply = resp.choices[0].message.content or ""
    # Turn every non-letter into a space so labels are matched as whole words.
    words = "".join(ch if ch.isalpha() else " " for ch in reply.upper()).split()
    aliases = {"LOW": "LOW", "MED": "MED", "MEDIUM": "MED", "HIGH": "HIGH"}
    found = {aliases[w] for w in words if w in aliases}
    return found.pop() if len(found) == 1 else "MED"

def retrieve(query: str, namespace: str = DEFAULT_NAMESPACE, k: int = TOP_K) -> List[Dict[str, Any]]:
    """Embed ``query`` and return the matching chunks from the index.

    Args:
        query: Free-text question.
        namespace: Pinecone namespace to search.
        k: Maximum number of matches requested from the index.

    Returns:
        One dict per match whose score is at least ``MIN_SIM``: the stored
        metadata plus ``_score`` and ``_id``. A match that carries no score is
        kept (its ``_score`` is ``None``).
    """
    qvec = client.embeddings.create(model=EMBED_MODEL, input=[query]).data[0].embedding
    res = index.query(vector=qvec, top_k=k, include_metadata=True, namespace=namespace)
    hits = []
    for m in res.matches or []:
        # Read the score once so the filter and the returned value agree.
        score = getattr(m, "score", None)
        if score is not None and score < MIN_SIM:
            continue
        meta = dict(m.metadata or {})
        # Numeric metadata can come back as floats (page 5 as 5.0); restore
        # whole page numbers to int so citations read "p.5" rather than "p.5.0".
        page = meta.get("page")
        if isinstance(page, float) and page.is_integer():
            meta["page"] = int(page)
        hits.append({**meta, "_score": score, "_id": m.id})
    return hits

def build_messages(query: str, ctx: List[Dict[str, Any]], max_chars: int = 1000):
    """Build the chat messages for answering ``query`` from retrieved context.

    Args:
        query: The user's question.
        ctx: Retrieved chunks; each needs ``source``, ``page`` and ``text`` keys.
        max_chars: Number of leading characters of each chunk's text included
            in the prompt.

    Returns:
        ``[system_message, user_message]`` ready for the chat completions API.
    """
    def _page(value):
        # A whole-number float page (5.0) is shown as 5 so the citation label
        # the model is asked to copy reads "p.5".
        if isinstance(value, float) and value.is_integer():
            return int(value)
        return value

    ctx_str = "\n---\n".join(
        f"[{c['source']}, p.{_page(c['page'])}]\n{c['text'][:max_chars]}" for c in ctx
    )
    user = f"Q: {query}\n\nContext:\n{ctx_str}\n\nAnswer and cite sources. Add Disclaimer line."
    return [{"role":"system","content":SYSTEM_PROMPT},{"role":"user","content":user}]

def answer_query(query: str, namespace: str = DEFAULT_NAMESPACE):
    """Answer ``query`` from the indexed sources.

    Classifies the query's risk, retrieves context from ``namespace`` and, when
    context was found, asks the generation model for a cited answer.

    Returns:
        A dict with ``risk``, ``answer``, ``citations`` (a list of
        ``{"source", "page"}`` dicts, one per distinct source/page in retrieval
        order) and ``disclaimer``. When nothing is retrieved the answer is a
        fixed "insufficient info" message and no generation call is made.
    """
    risk = classify_risk(query)
    ctx = retrieve(query, namespace, TOP_K)
    if not ctx:
        return {"risk":risk, "answer":"Insufficient info in sources.", "citations":[], "disclaimer":DISCLAIMER}
    msgs = build_messages(query, ctx)
    resp = client.chat.completions.create(model=GEN_MODEL,messages=msgs,temperature=0.2)
    # content can be None; fall back to an empty answer instead of raising.
    answer = (resp.choices[0].message.content or "").strip()

    # Several chunks often come from the same page; list each source/page once.
    citations, seen = [], set()
    for c in ctx:
        page = c['page']
        if isinstance(page, float) and page.is_integer():
            page = int(page)  # 5.0 -> 5
        key = (c['source'], page)
        if key not in seen:
            seen.add(key)
            citations.append({"source": c['source'], "page": page})
    return {"risk":risk, "answer":answer, "citations":citations, "disclaimer":DISCLAIMER}

In [None]:

# -----------------------------
# Fine-tune dataset builder
# -----------------------------
def build_ft_example(question: str, ctx: List[Dict[str, Any]]):
    """Build one chat-format fine-tuning record for ``question``.

    The assistant turn is a template: a placeholder answer to be curated by
    hand, citations taken from the first three entries of ``ctx`` and the
    standard disclaimer.

    Args:
        question: Text of the user turn.
        ctx: Retrieved chunks; each needs ``source`` and ``page`` keys.

    Returns:
        ``{"messages": [system, user, assistant]}``.
    """
    def _page(value):
        # Show a whole-number float page (5.0) as 5 in the citation.
        if isinstance(value, float) and value.is_integer():
            return int(value)
        return value

    cites = ", ".join(f"{c['source']}, p.{_page(c['page'])}" for c in ctx[:3])
    answer = f"Answer: <your curated answer>\n\nCitations: [{cites}]\nDisclaimer: {DISCLAIMER}"
    return {"messages":[{"role":"system","content":SYSTEM_PROMPT},{"role":"user","content":question},{"role":"assistant","content":answer}]}

def write_jsonl(records: List[Dict], path: str):
    """Serialise ``records`` to ``path`` as JSON Lines (one object per line).

    The file is opened in binary mode because ``orjson.dumps`` returns bytes;
    every record, including the last, is followed by a newline.
    """
    with open(path, "wb") as sink:
        sink.writelines(orjson.dumps(record) + b"\n" for record in records)

In [None]:

# 1. Ingest a PDF textbook:
# NOTE(review): the file name suggests a skin-cancer chapter by a different
# author, yet it is labelled "Harrison's (21e)" in citations; confirm the label.
pdf_path = "/content/5. Skin Cancer Author Lauren Queen.pdf"
n = ingest_pdf(path=pdf_path, source="Harrison's (21e)", namespace="harrison21")
print("Chunks indexed", n)




Chunks indexed 29


In [None]:
# 2. Ask a question:
# (A stray closing curly quote at the end of the question was removed so it is
# not sent to the embedding and chat models as part of the query.)
result = answer_query("I have a new dark mole that’s irregular in shape and sometimes bleeds. Could this be skin cancer?", namespace="harrison21")
print(result)


{'risk': 'MED', 'answer': 'Based on the information provided, a new dark mole that is irregular in shape and sometimes bleeds could potentially be a sign of melanoma, which is a type of skin cancer. Dermatologists use the "ABCDE" mnemonic to assess moles for melanoma: Asymmetry, Borders, Color, Diameter, and Evolution over time. If a mole has irregular borders, varying colors, and a diameter greater than 6mm, it is recommended that it be biopsied for further testing [Harrison\'s (21e), p.5.0].\n\nIt is important to consult a dermatologist for a proper evaluation and diagnosis, as skin cancer can manifest in various ways and requires professional assessment.\n\n**Disclaimer:** This response is for informational purposes only and is not a substitute for professional medical advice, diagnosis, or treatment. Always seek the advice of your physician or other qualified health provider with any questions you may have regarding a medical condition.', 'citations': [{'source': "Harrison's (21e)"