<a href="https://colab.research.google.com/github/DevSecOps-Stack/simple-rag/blob/main/Welcome_to_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%pip install pdfplumber pinecone openai

In [None]:
from google.colab import userdata

# Use Colab's Secrets Manager for API keys so they are never hard-coded in the
# notebook. Each key is looked up by the secret name passed to userdata.get().
PINECONE_API_KEY = userdata.get("PINECONE_API_KEY")
OPENAI_API_KEY = userdata.get("OPENAI_API_KEY")


In [35]:
import os
import uuid
import time
from typing import List, Dict

import pdfplumber
from pinecone import Pinecone, ServerlessSpec
# OpenAI SDK (>=1.0)
from openai import OpenAI
# Used to securely store your API key
from google.colab import userdata
import pinecone

print(f"Pinecone client version: {pinecone.__version__}")

# --------- CONFIG ---------
# Use Colab's Secrets Manager for API keys (looked up by secret name).
PINECONE_API_KEY = userdata.get("PINECONE_API_KEY")
OPENAI_API_KEY = userdata.get("OPENAI_API_KEY")


# Name of the Pinecone index that is created (if needed) and queried below.
INDEX_NAME = "document-index"
# Embedding model used for both document chunks and queries.
EMBED_MODEL = "text-embedding-3-small"   # 1536 dims
# Dimension the index is created with; must agree with EMBED_MODEL's output size.
EMBED_DIM = 1536
# Chat model used for answer generation; overridable via the OPENAI_MODEL env var.
GEN_MODEL = os.getenv("OPENAI_MODEL", "gpt-4o-mini")  # adjust if you prefer

# --------- CLIENTS ---------
# Module-level clients shared by every function in this cell.
pc = Pinecone(api_key=PINECONE_API_KEY)
client = OpenAI(api_key=OPENAI_API_KEY)

# --------- INDEX SETUP ---------
def ensure_index(name: str, dimension: int) -> None:
    """Create the Pinecone serverless index if it doesn't exist, then wait for it.

    Args:
        name: Name of the index to look up or create.
        dimension: Vector dimension used when the index has to be created.

    When the index is missing it is created with the cosine metric as a
    serverless index on AWS ``us-east-1``. The function then polls
    ``describe_index`` once per second, for at most 30 attempts, and returns
    as soon as the index reports ready. If the description exposes no status
    at all the index is assumed to be usable. When the attempts run out the
    function returns without raising.
    """
    # Existence check: any failure of describe_index is treated as "index is
    # missing". A genuine connectivity/auth problem will then surface from
    # create_index below instead of being hidden.
    try:
        pc.describe_index(name)
        return
    except Exception:
        pass

    pc.create_index(
        name=name,
        dimension=dimension,
        metric="cosine",
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )

    # Wait for readiness. The status may be a plain dict or an SDK model
    # object depending on the client version, so "ready" is read from either
    # shape; checking only for dicts would make object-returning SDKs sleep
    # through all 30 attempts.
    for _ in range(30):
        try:
            desc = pc.describe_index(name)
            status = getattr(desc, "status", None) or getattr(desc, "index", {}).get("status")
            if not status:
                # No status information available: nothing to wait on.
                is_ready = True
            elif isinstance(status, dict):
                is_ready = bool(status.get("ready"))
            else:
                is_ready = bool(getattr(status, "ready", False))
        except Exception:
            # Best effort: the index may not be describable yet; retry.
            is_ready = False
        if is_ready:
            break
        time.sleep(1)

# Runs when the cell executes: make sure the index exists, then open the
# module-level handle that upsert_pdf and query_pinecone use.
ensure_index(INDEX_NAME, EMBED_DIM)
index = pc.Index(INDEX_NAME)

# --------- UTILITIES ---------
def pdf_to_text(pdf_path: str) -> str:
    """Return the text of every non-blank page of the PDF, joined by newlines.

    Pages whose extracted text is missing or whitespace-only are skipped.
    """
    with pdfplumber.open(pdf_path) as document:
        extracted = (page.extract_text() or "" for page in document.pages)
        # Materialize inside the `with` so pages are read while the file is open.
        non_blank_pages = [content for content in extracted if content.strip()]
    return "\n".join(non_blank_pages)

def chunk_text(text: str, max_words: int = 300, overlap: int = 50) -> List[str]:
    """
    Simple word-based chunker (approx token-safe).
    ~300 words ≈ 400–600 tokens depending on content.

    Args:
        text: Text to split; it is tokenized on whitespace.
        max_words: Maximum number of words per chunk (must be >= 1).
        overlap: Number of trailing words of one chunk repeated at the start
            of the next (must satisfy 0 <= overlap < max_words).

    Returns:
        The chunks in document order, words re-joined with single spaces.
        Empty or whitespace-only text yields an empty list.

    Raises:
        ValueError: If ``max_words`` or ``overlap`` is out of range. Without
            this check an overlap >= max_words would never advance through
            the text and the loop would not terminate.
    """
    if max_words < 1:
        raise ValueError(f"max_words must be at least 1, got {max_words}")
    if not 0 <= overlap < max_words:
        raise ValueError(
            f"overlap must satisfy 0 <= overlap < max_words, got {overlap}"
        )

    words = text.split()
    n = len(words)
    # Each chunk starts this many words after the previous one; always >= 1.
    step = max_words - overlap

    chunks = []
    for start in range(0, n, step):
        end = start + max_words
        chunks.append(" ".join(words[start:end]))
        if end >= n:
            # The tail has been emitted; a further window would only repeat it.
            break
    return chunks

def embed(texts: List[str], batch_size: int = 100) -> List[List[float]]:
    """Embed ``texts`` with ``EMBED_MODEL`` and return one vector per text.

    Args:
        texts: Strings to embed.
        batch_size: Maximum number of texts sent per embeddings request.
            Requests are split so a large document does not exceed the
            per-request input limits of the embeddings endpoint.

    Returns:
        Embedding vectors in the same order as ``texts``. An empty input
        returns an empty list without calling the API.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    vectors: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        resp = client.embeddings.create(
            model=EMBED_MODEL,
            input=texts[start:start + batch_size],
        )
        vectors.extend(d.embedding for d in resp.data)
    return vectors

# --------- UPSERT ---------
def upsert_pdf(
    pdf_path: str,
    doc_id: str = None,
    namespace: str = None,
    batch_size: int = 100,
) -> List[str]:
    """
    - Extracts text from PDF
    - Chunks it
    - Embeds & upserts to Pinecone with metadata
    Returns list of vector IDs written.

    Args:
        pdf_path: Path of the PDF to ingest.
        doc_id: Identifier used as the prefix of every vector ID; a random
            UUID is generated when omitted.
        namespace: Pinecone namespace to write into (passed through as-is).
        batch_size: Number of chunks embedded and upserted per request, so
            that a large PDF is not sent as one oversized request.

    Raises:
        ValueError: If ``batch_size`` is smaller than 1, or if no text could
            be extracted from the PDF.

    Note:
        Batches are written one after another; if a later batch fails, the
        earlier batches have already been upserted.
    """
    if batch_size < 1:
        raise ValueError(f"batch_size must be at least 1, got {batch_size}")

    full_text = pdf_to_text(pdf_path)
    if not full_text.strip():
        raise ValueError(f"No extractable text from: {pdf_path}")

    chunks = chunk_text(full_text, max_words=320, overlap=60)
    base_id = doc_id or str(uuid.uuid4())
    source = os.path.basename(pdf_path)

    written_ids = []
    for start in range(0, len(chunks), batch_size):
        batch = chunks[start:start + batch_size]
        vecs = embed(batch)
        # `i` is the chunk's position in the whole document, not in the batch,
        # so IDs and chunk_index stay stable regardless of batch_size.
        records = [
            {
                "id": f"{base_id}::chunk::{i:04d}",
                "values": vec,
                "metadata": {
                    "text": chunk,
                    "source": source,
                    "doc_id": base_id,
                    "chunk_index": i,
                },
            }
            for i, (chunk, vec) in enumerate(zip(batch, vecs), start=start)
        ]
        index.upsert(vectors=records, namespace=namespace)
        written_ids.extend(record["id"] for record in records)
    return written_ids

# --------- QUERY ---------
def query_pinecone(query_text: str, top_k: int = 5, namespace: str = None) -> List[Dict]:
    """Embed ``query_text`` and return the ``top_k`` nearest matches from the index.

    Args:
        query_text: Natural-language query to embed and search with.
        top_k: Maximum number of matches to request.
        namespace: Pinecone namespace to search (passed through as-is).

    Returns:
        The matches from the query response, including metadata. An empty
        list is returned when the response carries no matches.
    """
    q_vec = embed([query_text])[0]
    res = index.query(
        vector=q_vec,
        top_k=top_k,
        include_metadata=True,
        namespace=namespace,
    )
    # The SDK returns either an object with a `.matches` attribute or a dict,
    # depending on version. Test for None explicitly: an empty match list is
    # a valid result and must not trigger the dict-style fallback.
    matches = getattr(res, "matches", None)
    if matches is None and isinstance(res, dict):
        matches = res.get("matches")
    return matches if matches is not None else []

# --------- GENERATION ---------
def generate_answer(query: str, context_chunks: List[str]) -> str:
    """Ask the chat model to answer ``query`` using only ``context_chunks``.

    Args:
        query: The user's question.
        context_chunks: Retrieved text passages; each is rendered as a
            bullet in the prompt's context section.

    Returns:
        The model's reply with surrounding whitespace removed, or an empty
        string when the reply carries no text content.
    """
    system = (
        "You answer strictly based on the provided context. "
        "If the answer isn't in context, say you don't know."
    )
    context = "\n\n".join(f"- {c}" for c in context_chunks)
    user = f"Context:\n{context}\n\nQuestion: {query}\n\nAnswer in 6-8 concise sentences."

    chat = client.chat.completions.create(
        model=GEN_MODEL,
        messages=[
            {"role": "system", "content": system},
            {"role": "user", "content": user},
        ],
        temperature=0.2,
    )
    # `content` may be None (e.g. when the model returns no text), in which
    # case calling .strip() directly would raise AttributeError.
    content = chat.choices[0].message.content
    return (content or "").strip()

def answer_with_rag(query: str, top_k: int = 5, namespace: str = None) -> str:
    """Retrieve context for ``query`` from Pinecone and generate an answer.

    Args:
        query: The user's question.
        top_k: Number of matches to retrieve.
        namespace: Pinecone namespace to search (passed through as-is).

    Returns:
        The answer produced by ``generate_answer`` from the retrieved texts.
        Matches without a non-empty ``text`` metadata entry are ignored.
    """
    matches = query_pinecone(query, top_k=top_k, namespace=namespace)

    ctx = []
    for m in matches:
        # Matches may be dicts or SDK objects depending on version. Pick the
        # access style by type so that a match object with empty/None
        # metadata is not treated as a dict.
        if isinstance(m, dict):
            md = m.get("metadata")
        else:
            md = getattr(m, "metadata", None)
        t = (md or {}).get("text")
        if t:
            ctx.append(t)
    return generate_answer(query, ctx)

# --------- EXAMPLES ---------
# Example run: ingest one PDF, then ask a question against it.
if __name__ == "__main__":
    # 1) Ingest a PDF (doc_id and namespace are left at their defaults)
    upsert_pdf("/content/OpenShift_Container_Platform-4.18-Architecture-en-US.pdf")

    # 2) Query & answer: retrieve the default top-5 chunks and print the
    #    generated answer
    print("Querying…")
    q = "can you print entire page number 1"
    print(answer_with_rag(q))

Pinecone client version: 7.3.0




Querying…
I don't know.
