<a href="https://colab.research.google.com/github/footinheaven1985/Final_Project_AI/blob/main/DataClassifier_21Jan2026.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
# ============================================
# Colab Gradio App:
# 1) Upload + index reference docs into Pinecone (dim=1536)
# 2) Upload a new doc
# 3) Similarity search + classification + CITATIONS (top matches)
# ============================================

!pip -q install "pinecone-client>=3.0.0" openai pypdf python-docx gradio pandas

import os, re, uuid, time
from typing import List, Dict, Tuple
from collections import Counter, defaultdict

import pandas as pd
import gradio as gr
from google.colab import userdata # Import userdata for secrets

from pinecone import Pinecone, ServerlessSpec
from pypdf import PdfReader
import docx
from openai import OpenAI

# ---------------- CONFIG ----------------
# API credentials are read from Colab Secrets via userdata.get(...); the
# string after `or` is only a placeholder used when the lookup returns a
# falsy value, and must be replaced with a real key for the app to work.
# NOTE(review): userdata.get may raise instead of returning None when the
# secret is missing or access is not granted -- confirm against the Colab docs.
# IMPORTANT: Replace "PASTE_PINECONE_KEY" with your actual Pinecone API key.
# For better security, store it in Colab Secrets and use userdata.get("PINECONE_API_KEY")
PINECONE_API_KEY = userdata.get("PINECONE_API_KEY") or "PASTE_PINECONE_KEY"
# Index name comes from the PINECONE_INDEX_NAME environment variable, falling
# back to a placeholder; ensure_pinecone_index() creates the index if absent.
PINECONE_INDEX_NAME = os.getenv("PINECONE_INDEX_NAME") or "your-index-name"

# IMPORTANT: Replace "PASTE_OPENAI_KEY" with your actual OpenAI API key.
# For better security, store it in Colab Secrets and use userdata.get("OPENAI_API_KEY")
OPENAI_API_KEY = userdata.get("OPENAI_API_KEY") or "PASTE_OPENAI_KEY"

# Embedding model and the matching index settings. INDEX_DIMENSION must equal
# the model's output size; ensure_pinecone_index() rejects an existing index
# whose dimension differs.
EMBED_MODEL = "text-embedding-3-small"  # 1536 dims
INDEX_DIMENSION = 1536
METRIC = "cosine"

LABEL_KEY = "label"      # metadata field that stores each vector's class label
TOP_K = 5                # nearest neighbors per chunk query (UI slider default)

# Chunking (character-based; easy + works decently).
# These are the default window size and overlap used by chunk_text().
CHUNK_SIZE = 1200
CHUNK_OVERLAP = 200

# Default namespace for runs (used when the UI namespace box is left empty)
DEFAULT_NAMESPACE = "doc_classification"
# --------------------------------------


# ============== Text extraction ==============
def extract_text_from_pdf(path: str) -> str:
    """Return the text of every page of the PDF at ``path``, joined by newlines.

    A page whose extraction yields a falsy result contributes an empty string,
    so the number and order of joined segments always matches the page list.
    """
    page_texts = []
    for page in PdfReader(path).pages:
        page_texts.append(page.extract_text() or "")
    return "\n".join(page_texts)

def extract_text_from_txt(path: str) -> str:
    """Read the file at ``path`` as UTF-8 text and return its full contents.

    Bytes that are not valid UTF-8 are dropped (``errors="ignore"``) instead
    of raising, so partially corrupt files still yield their readable text.
    """
    with open(path, mode="r", encoding="utf-8", errors="ignore") as stream:
        contents = stream.read()
    return contents

def extract_text_from_docx(path: str) -> str:
    """Return the text of the DOCX file at ``path``.

    The text of each entry in the document's ``paragraphs`` collection is
    taken in order and the pieces are joined with newlines.
    """
    paragraph_texts = [para.text for para in docx.Document(path).paragraphs]
    return "\n".join(paragraph_texts)

def normalize_text(text: str) -> str:
    """Collapse every run of whitespace to one space and trim both ends."""
    single_spaced = re.sub(r"\s+", " ", text)
    return single_spaced.strip()

def load_file_as_text(path: str) -> str:
    """Extract the text of a PDF, TXT or DOCX file and normalize its whitespace.

    The file type is decided solely by the (case-insensitive) extension.

    Raises:
        ValueError: if the extension is not ``.pdf``, ``.txt`` or ``.docx``.
    """
    suffix = os.path.splitext(path)[1].lower()

    # Reject unknown types up front so the dispatch below only sees valid ones.
    if suffix not in (".pdf", ".txt", ".docx"):
        raise ValueError(f"Unsupported file type: {suffix}. Use PDF/TXT/DOCX.")

    if suffix == ".pdf":
        raw = extract_text_from_pdf(path)
    elif suffix == ".txt":
        raw = extract_text_from_txt(path)
    else:
        raw = extract_text_from_docx(path)

    return normalize_text(raw)


# ============== Chunking ==============
def chunk_text(text: str, chunk_size: int = CHUNK_SIZE, overlap: int = CHUNK_OVERLAP) -> List[str]:
    """Split ``text`` into character windows of ``chunk_size`` that overlap.

    Consecutive chunks start ``chunk_size - overlap`` characters apart, so the
    last ``overlap`` characters of one chunk are repeated at the start of the
    next. The final chunk may be shorter than ``chunk_size``.

    Args:
        text: The text to split.
        chunk_size: Maximum number of characters per chunk.
        overlap: Number of characters shared between neighbouring chunks.
            A negative value leaves gaps between chunks.

    Returns:
        A list of chunks. Text no longer than ``chunk_size`` (including the
        empty string) is returned unchanged as a single-element list.

    Raises:
        ValueError: if the text needs splitting but ``chunk_size`` is not
            positive or ``overlap`` is not smaller than ``chunk_size``.
    """
    if len(text) <= chunk_size:
        return [text]

    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be positive, got {chunk_size}")

    # The window must move forward on every iteration; with
    # overlap >= chunk_size the start position would never advance and the
    # loop would run forever while the chunk list grows without bound.
    step = chunk_size - overlap
    if step <= 0:
        raise ValueError(
            f"overlap ({overlap}) must be smaller than chunk_size ({chunk_size})"
        )

    chunks = []
    for start in range(0, len(text), step):
        chunks.append(text[start:start + chunk_size])
        # Stop as soon as a window reaches the end of the text, so no
        # trailing chunk consisting only of overlap is emitted.
        if start + chunk_size >= len(text):
            break
    return chunks


# ============== Embeddings ==============
def embed_texts(texts: List[str], batch_size: int = 256) -> List[List[float]]:
    """Embed ``texts`` with the configured OpenAI embedding model.

    Args:
        texts: Strings to embed.
        batch_size: Maximum number of strings sent per API request. Large
            documents produce many chunks; sending them in bounded batches
            keeps each request small.
            NOTE(review): the exact per-request input/token limits are defined
            by the OpenAI API -- confirm 256 is appropriate for your account.

    Returns:
        One embedding per input string, in the same order as ``texts``.
        An empty input yields an empty list without calling the API.

    Raises:
        ValueError: if ``batch_size`` is not positive.
    """
    if not texts:
        # Nothing to embed; avoid sending an empty request to the API.
        return []
    if batch_size <= 0:
        raise ValueError(f"batch_size must be positive, got {batch_size}")

    client = OpenAI(api_key=OPENAI_API_KEY)
    vectors: List[List[float]] = []
    for start in range(0, len(texts), batch_size):
        resp = client.embeddings.create(
            model=EMBED_MODEL,
            input=texts[start:start + batch_size],
        )
        # Each returned item carries the index of its input within the
        # request; sort on it so output order always matches input order.
        ordered = sorted(resp.data, key=lambda item: item.index)
        vectors.extend(item.embedding for item in ordered)
    return vectors


# ============== Pinecone: ensure index ==============
def ensure_pinecone_index():
    """Return a handle to the configured Pinecone index, creating it if needed.

    If an index named ``PINECONE_INDEX_NAME`` already exists, its dimension is
    checked against ``INDEX_DIMENSION``. Otherwise a serverless index
    (aws / us-east-1) is created and this function polls every two seconds
    until the index reports ready. There is no timeout on that wait.

    Raises:
        ValueError: if an existing index has a different dimension.
    """
    client = Pinecone(api_key=PINECONE_API_KEY)
    known_names = {entry["name"] for entry in client.list_indexes()}

    if PINECONE_INDEX_NAME in known_names:
        # Existing index: it is only usable if its dimension matches the model.
        info = client.describe_index(PINECONE_INDEX_NAME)
        if info.dimension != INDEX_DIMENSION:
            raise ValueError(f"Index dimension mismatch: index={info.dimension} vs expected={INDEX_DIMENSION}")
        return client.Index(PINECONE_INDEX_NAME)

    client.create_index(
        name=PINECONE_INDEX_NAME,
        dimension=INDEX_DIMENSION,
        metric=METRIC,
        spec=ServerlessSpec(cloud="aws", region="us-east-1"),
    )
    # Block until the freshly created index reports ready.
    while not client.describe_index(PINECONE_INDEX_NAME).status.get("ready"):
        time.sleep(2)

    return client.Index(PINECONE_INDEX_NAME)


# ============== Indexing ==============
def parse_label_map(label_map_text: str) -> Dict[str, str]:
    """
    Parse the "filename,label" text box into a ``{filename: label}`` dict.

    One mapping per line, e.g.:
      file1.pdf,Invoice
      contract.docx,Contract
    Blank lines are ignored. Only the first comma separates filename from
    label, so a label may itself contain commas. Both parts are stripped of
    surrounding whitespace. A filename listed twice keeps its last label.
    Filenames must match the uploaded files' base names.

    Raises:
        ValueError: if a non-blank line has no comma, or has an empty
            filename or label.
    """
    labels_by_file: Dict[str, str] = {}
    for raw_line in (label_map_text or "").splitlines():
        entry = raw_line.strip()
        if not entry:
            continue

        name_part, comma, label_part = entry.partition(",")
        if not comma:
            raise ValueError(f"Bad line (missing comma): {entry}")

        file_name = name_part.strip()
        file_label = label_part.strip()
        if not (file_name and file_label):
            raise ValueError(f"Bad line (empty filename or label): {entry}")

        labels_by_file[file_name] = file_label
    return labels_by_file

def upsert_reference_files(index, files_list: List[str], label_map: Dict[str, str], namespace: str) -> Tuple[int, List[str]]:
    """
    Chunk, embed and upsert every labelled reference file into ``index``.

    A file is skipped (with a warning) when its base name has no entry in
    ``label_map`` or when fewer than 30 characters of text were extracted.
    Each stored vector carries metadata with its label, source file name,
    chunk number and a 300-character preview used later for citations.
    Vector ids include a random component, so indexing the same file again
    adds new vectors rather than replacing earlier ones.

    Returns: (num_vectors, warnings)
    """
    skipped_notes = []
    pending = []

    for path in files_list:
        base = os.path.basename(path)
        if base not in label_map:
            skipped_notes.append(f"‚ö†Ô∏è No label provided for '{base}' ‚Äî skipped.")
            continue

        body = load_file_as_text(path)
        if len(body) < 30:
            skipped_notes.append(f"‚ö†Ô∏è Very little text in '{base}' ‚Äî skipped.")
            continue

        file_label = label_map[base]
        pieces = chunk_text(body)
        embeddings = embed_texts(pieces)
        pending.extend(
            (
                f"{base}::{uuid.uuid4().hex[:10]}::{pos}",
                embedding,
                {
                    LABEL_KEY: file_label,
                    "source_file": base,
                    "chunk_id": pos,
                    "text_preview": piece[:300],  # used for citations
                },
            )
            for pos, (piece, embedding) in enumerate(zip(pieces, embeddings))
        )

    # Send the vectors in groups of 100 per upsert call.
    for offset in range(0, len(pending), 100):
        index.upsert(vectors=pending[offset:offset + 100], namespace=namespace)

    return len(pending), skipped_notes


# ============== Similarity search + classification + citations ==============
def query_similar(index, vec: List[float], top_k: int, namespace: str) -> List[dict]:
    """Query ``index`` for the ``top_k`` nearest vectors to ``vec`` in ``namespace``.

    Metadata is requested with the results. Returns the response's
    ``"matches"`` entry, or an empty list when the response has none.
    """
    response = index.query(
        namespace=namespace,
        vector=vec,
        top_k=top_k,
        include_metadata=True,
    )
    return response.get("matches", [])

def classify_and_cite(matches: List[dict], label_key: str = LABEL_KEY) -> Tuple[str, float, pd.DataFrame, str]:
    """
    Turn nearest-neighbour matches into a predicted label plus citations.

    Every match whose metadata has a truthy ``label_key`` value votes for
    that label with a weight equal to its similarity score; matches without
    a label are ignored. The label with the largest summed score wins and
    the confidence is its share of the total summed score.

    Returns: predicted_label, confidence, citations_df, citations_markdown
      - predicted_label is None (and confidence 0.0) when no match is labelled.
      - citations_df lists all labelled matches, highest score first.
      - citations_markdown shows the top 8 matches and the vote breakdown.
    """
    vote_totals = defaultdict(float)
    records = []

    for match in matches:
        meta = match.get("metadata") or {}
        match_label = meta.get(label_key)
        similarity = float(match.get("score", 0.0))
        if match_label:
            vote_totals[match_label] += similarity
            records.append({
                "score": similarity,
                "label": match_label,
                "source_file": meta.get("source_file"),
                "chunk_id": meta.get("chunk_id"),
                "text_preview": meta.get("text_preview", "")
            })

    if not records:
        no_hits = pd.DataFrame(columns=["score", "label", "source_file", "chunk_id", "text_preview"])
        return (None, 0.0, no_hits, "No labeled matches found. Make sure your indexed vectors include metadata labels.")

    ranked = pd.DataFrame(records).sort_values("score", ascending=False).reset_index(drop=True)

    # Highest summed similarity wins; ties go to the label seen first.
    best_label = max(vote_totals, key=vote_totals.get)
    score_sum = sum(vote_totals.values()) or 1.0
    share = vote_totals[best_label] / score_sum

    # Readable citation section: at most the 8 best-scoring matches.
    shown = min(8, len(ranked))
    sections = [f"### Citations (Top {shown} matches)"]
    for rank in range(shown):
        row = ranked.iloc[rank]
        snippet = (row["text_preview"] or "").replace("\n", " ")
        if len(snippet) > 220:
            snippet = snippet[:220] + "‚Ä¶"
        sections.append(
            f"**{rank + 1}.** score={row['score']:.4f} ‚Ä¢ **{row['label']}** ‚Ä¢ `{row['source_file']}` (chunk {row['chunk_id']})\n\n> {snippet}\n"
        )

    # Per-label vote totals, largest first.
    by_weight = sorted(vote_totals.items(), key=lambda pair: pair[1], reverse=True)
    sections.append("### Vote breakdown (sum of similarity scores)")
    sections.append("\n".join(f"- **{name}**: {weight:.4f}" for name, weight in by_weight))

    return best_label, share, ranked, "\n".join(sections)


# ============== Gradio actions ==============
# Shared Pinecone index handle used by both Gradio callbacks below. It is
# resolved once, when this cell runs, which means the Pinecone calls inside
# ensure_pinecone_index() (including index creation if it is missing) happen
# here rather than on the first button click.
INDEX = ensure_pinecone_index()

def index_reference_docs(ref_files, label_map_text, namespace):
    """Gradio callback: index the uploaded reference documents into Pinecone.

    Args:
        ref_files: Uploaded files; each item is either a path string or an
            object exposing the temp-file path as ``.name`` / ``["name"]``.
        label_map_text: Raw "filename,label" lines from the labels text box.
        namespace: Target Pinecone namespace; ``DEFAULT_NAMESPACE`` if empty.

    Returns:
        ``(status_markdown, dataframe)`` where the dataframe lists the base
        names of all uploaded files, or ``(error_message, None)`` when no
        file was uploaded.
    """
    target_ns = namespace or DEFAULT_NAMESPACE

    if not ref_files:
        return "‚ùå Please upload at least one reference file.", None

    # Gradio File objects can be dict-like or have .name depending on version;
    # plain strings are already paths.
    resolved_paths = [
        item if isinstance(item, str) else (getattr(item, "name", None) or item.get("name"))
        for item in ref_files
    ]

    label_lookup = parse_label_map(label_map_text)
    vector_count, skipped = upsert_reference_files(INDEX, resolved_paths, label_lookup, target_ns)

    sections = [
        f"‚úÖ Indexed **{vector_count}** vectors into Pinecone.",
        f"**Index:** `{PINECONE_INDEX_NAME}`",
        f"**Namespace:** `{target_ns}`",
    ]
    if skipped:
        sections.append("\n".join(skipped))

    status = "\n\n".join(sections)
    uploaded = pd.DataFrame({"uploaded_files": [os.path.basename(p) for p in resolved_paths]})
    return status, uploaded


def classify_uploaded_doc(query_file, namespace, top_k):
    """Gradio callback: classify one uploaded document by similarity search.

    The document is extracted, chunked and embedded; every chunk is queried
    against the index for its ``top_k`` neighbours and all matches are pooled
    before voting in ``classify_and_cite``.

    Args:
        query_file: Path string, or an object exposing the temp-file path as
            ``.name`` / ``["name"]``.
        namespace: Pinecone namespace to search; ``DEFAULT_NAMESPACE`` if empty.
        top_k: Neighbours requested per chunk (converted with ``int``).

    Returns:
        ``(headline_markdown, citations_markdown, citations_dataframe)``.
        When no file is given or the extracted text is under 30 characters,
        a message is returned with ``""`` and ``None`` for the other two.
    """
    active_ns = namespace or DEFAULT_NAMESPACE

    if not query_file:
        return "‚ùå Please upload a file to classify.", "", None

    if isinstance(query_file, str):
        doc_path = query_file
    else:
        doc_path = getattr(query_file, "name", None) or query_file.get("name")

    doc_text = load_file_as_text(doc_path)
    if len(doc_text) < 30:
        return "‚ö†Ô∏è Extracted text is very short; classification may be unreliable.", "", None

    # Pool the neighbours of every chunk into a single list of matches.
    gathered = [
        hit
        for vector in embed_texts(chunk_text(doc_text))
        for hit in query_similar(INDEX, vector, top_k=int(top_k), namespace=active_ns)
    ]

    pred, conf, cite_df, cite_md = classify_and_cite(gathered, LABEL_KEY)

    if pred is not None:
        headline = f"‚úÖ **Predicted label:** {pred}\n\n**Confidence:** {conf:.2f}"
    else:
        headline = "‚ùå Could not classify (no labeled matches found)."

    return headline, cite_md, cite_df


# ============== Gradio UI ==============
# Two-tab Gradio interface: tab 1 indexes labelled reference documents,
# tab 2 classifies a new document against whatever has been indexed.
with gr.Blocks(title="Pinecone Similarity Document Classifier (with Citations)") as demo:
    # Intro / usage instructions rendered at the top of the page.
    gr.Markdown(
        """
# Pinecone Similarity Document Classifier üîéüìÑ

This app lets you:
1) Upload **reference documents** (already labeled) ‚Üí **index** them into Pinecone
2) Upload a **new document** ‚Üí **similarity search** ‚Üí get predicted **label** + **citations**

**How to provide labels for reference docs:**
In the box, add one line per file:
`filename.pdf,Invoice`
`contract.docx,Contract`

Supported: **PDF / TXT / DOCX**
        """
    )

    # ---- Tab 1: upload reference files + "filename,label" lines, then index ----
    with gr.Tab("1) Index Reference Docs"):
        namespace_in = gr.Textbox(value=DEFAULT_NAMESPACE, label="Pinecone Namespace (optional)")
        ref_files = gr.File(file_count="multiple", label="Upload reference docs (PDF/TXT/DOCX)")
        label_map = gr.Textbox(
            label="Labels mapping (one per line: filename,label)",
            lines=6,
            placeholder="example:\ninvoice1.pdf,Invoice\ninvoice2.pdf,Invoice\nnda.docx,Contract"
        )
        index_btn = gr.Button("Index Reference Docs")
        index_status = gr.Markdown()
        indexed_files_df = gr.Dataframe(label="Uploaded Reference Files (for your confirmation)", interactive=False)

        # Inputs map positionally to index_reference_docs(ref_files,
        # label_map_text, namespace); its two return values fill the status
        # markdown and the uploaded-files table.
        index_btn.click(
            fn=index_reference_docs,
            inputs=[ref_files, label_map, namespace_in],
            outputs=[index_status, indexed_files_df]
        )

    # ---- Tab 2: upload one document and classify it ----
    with gr.Tab("2) Classify New Doc (with citations)"):
        # This namespace box is independent of the one in tab 1; the user has
        # to enter the same value that was used for indexing.
        namespace_in2 = gr.Textbox(value=DEFAULT_NAMESPACE, label="Pinecone Namespace (must match indexing)")
        topk_in = gr.Slider(1, 20, value=TOP_K, step=1, label="Top K neighbors per chunk")
        query_file = gr.File(file_count="single", label="Upload file to classify (PDF/TXT/DOCX)")
        classify_btn = gr.Button("Classify")
        result_md = gr.Markdown()
        citations_md = gr.Markdown()
        citations_df = gr.Dataframe(label="Citations Table (sorted by score)", interactive=False)

        # Inputs map positionally to classify_uploaded_doc(query_file,
        # namespace, top_k); its three return values fill the headline,
        # the citations markdown and the citations table.
        classify_btn.click(
            fn=classify_uploaded_doc,
            inputs=[query_file, namespace_in2, topk_in],
            outputs=[result_md, citations_md, citations_df]
        )

# share=True asks Gradio for a public URL for this app.
# NOTE(review): anyone with that link can trigger indexing/classification,
# which spend the configured OpenAI and Pinecone credentials -- confirm this
# exposure is intended. Pass debug=True to surface callback errors in Colab.
demo.launch(share=True)

Colab notebook detected. To show errors in colab notebook, set debug=True in launch()
* Running on public URL: https://a788f9e234d8eb4b4b.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


