<a href="https://colab.research.google.com/github/hiteshove/B.-Pre-processing-Quality-Filters-/blob/main/Copy_of_Welcome_to_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Multimodal RAG Chatbot (Colab)
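**What this notebook does:** ingests images, PDFs, audio and video → extracts their text (OCR for images, embedded text for PDFs, speech transcription for audio/video) → indexes the text with FAISS → answers questions with Gemini 2.5 Flash (RAG), citing the source files and listing related documents.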

**Before you start:** have your Gemini API key ready — the notebook asks for it through a hidden prompt before any model is used.


In [None]:
# Run this cell once at the top of the notebook.
# Installs python packages and system packages needed for OCR/FFmpeg.
!pip install -q google-generativeai pytesseract Pillow PyMuPDF faiss-cpu sentence-transformers transformers spacy networkx rdflib
# Italian spaCy model
!python -m spacy download it_core_news_lg
!pip install openai-whisper

# System packages (FFmpeg, Tesseract + Italian language pack if available)
# NOTE: Debian/Ubuntu names may vary; Colab should have ffmpeg and tesseract already but we ensure it.
!apt-get update -qq
!apt-get install -y -qq ffmpeg tesseract-ocr tesseract-ocr-ita || true

print("✅ Installs finished. If tesseract Italian data isn't present, OCR quality for Italian may be reduced.")


Collecting it-core-news-lg==3.8.0
  Downloading https://github.com/explosion/spacy-models/releases/download/it_core_news_lg-3.8.0/it_core_news_lg-3.8.0-py3-none-any.whl (567.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m567.9/567.9 MB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h[38;5;2m✔ Download and installation successful[0m
You can now load the package via spacy.load('it_core_news_lg')
[38;5;3m⚠ Restart to reload dependencies[0m
If you are in a Jupyter or Colab notebook, you may need to restart Python in
order to load all the package's dependencies. You can do this by selecting the
'Restart kernel' or 'Restart runtime' option.
W: Skipping acquire of configured file 'main/source/Sources' as repository 'https://r2u.stat.illinois.edu/ubuntu jammy InRelease' does not seem to provide it (sources.list entry misspelt?)
✅ Installs finished. If tesseract Italian data isn't present, OCR quality for Italian may be reduced.


In [None]:
# Securely input your Gemini API key (it won't be printed)
from getpass import getpass
import google.generativeai as genai

# Read the key through a hidden prompt so it is neither echoed nor written into
# the notebook source; surrounding whitespace from copy/paste is stripped.
GEMINI_KEY = getpass("Paste your Gemini 2.5-flash API key (hidden): ").strip()
if not GEMINI_KEY:
    # Nothing was entered: stop this cell instead of configuring an empty key.
    raise SystemExit("Gemini API key is required to proceed.")

# Configure the google.generativeai client with the key and create the model
# handle that rag_answer uses to generate answers.
genai.configure(api_key=GEMINI_KEY)
gemini_model = genai.GenerativeModel("gemini-2.5-flash")
print("✅ Gemini configured.")


Paste your Gemini 2.5-flash API key (hidden): ··········
✅ Gemini configured.


In [None]:
# Core imports and workspace setup
import os, io, uuid, sqlite3, subprocess
from pathlib import Path
from datetime import datetime
from PIL import Image
import pytesseract
import fitz  # PyMuPDF
import numpy as np
from sentence_transformers import SentenceTransformer
import faiss
import spacy
import networkx as nx
import google.generativeai as genai  # already configured above

# Directories: uploads/ receives the input files, outputs/ holds the persisted
# FAISS index and id map.
os.makedirs("uploads", exist_ok=True)
os.makedirs("outputs", exist_ok=True)

# DB setup: a single `assets` table in metadata.db; save_asset writes one row
# per ingestion (id, original filename, asset type, extracted text, timestamp).
conn = sqlite3.connect("metadata.db")
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS assets (
    id TEXT PRIMARY KEY,
    filename TEXT,
    type TEXT,
    text_content TEXT,
    created_at TEXT
)
""")
conn.commit()

# Embedding model: used to embed both the stored texts (build_faiss_index)
# and the user question (search_top_k).
embed_model = SentenceTransformer("all-MiniLM-L6-v2")
EMB_DIM = embed_model.get_sentence_embedding_dimension()

# FAISS files (persistence)
FAISS_INDEX_FILE = "outputs/faiss.index"
IDMAP_FILE = "outputs/id_map.npy"   # NOTE(review): not referenced by any cell visible here; the id map is pickled to IDMAP_PICKLE instead
import pickle
IDMAP_PICKLE = "outputs/id_map.pkl"

# NER model: Italian spaCy pipeline whose entities link assets in the graph.
nlp = spacy.load("it_core_news_lg")

# Graph: module-level asset/entity graph, filled by build_graph and read by
# find_related_docs.
G = nx.Graph()

print("✅ Environment ready.")


✅ Environment ready.


In [None]:
import pytesseract, fitz, uuid, subprocess, os
from pathlib import Path
from datetime import datetime
from PIL import Image
import sqlite3
import google.generativeai as genai

# Setup SQLite DB
# NOTE: this repeats the connection/table setup of the workspace-setup cell.
# Running it rebinds the module-level `conn` and `c` to a new connection on the
# same metadata.db file; the functions below use whichever binding is current.
conn = sqlite3.connect("metadata.db")
c = conn.cursor()
c.execute("""
CREATE TABLE IF NOT EXISTS assets (
    id TEXT PRIMARY KEY,
    filename TEXT,
    type TEXT,
    text_content TEXT,
    created_at TEXT
)
""")
conn.commit()

# Gemini models
# NOTE(review): flash_model is not used by any cell visible here (rag_answer
# calls gemini_model); pro_model is only used by the ingest_audio fallback.
# NOTE(review): confirm "gemini-1.5-pro" is still an available model id for the
# API key in use.
flash_model = genai.GenerativeModel("gemini-2.5-flash")
pro_model = genai.GenerativeModel("gemini-1.5-pro")  # needed for audio transcription

def save_asset(asset):
    """Insert or overwrite one asset row in the ``assets`` table and commit.

    Args:
        asset: dict with the keys ``id``, ``filename``, ``type``, ``text`` and
            ``created_at``. ``text`` is stored in the ``text_content`` column.

    An existing row with the same ``id`` is replaced. Uses the module-level
    cursor ``c`` and connection ``conn``.
    """
    row = tuple(asset[key] for key in ("id", "filename", "type", "text", "created_at"))
    c.execute("""
    INSERT OR REPLACE INTO assets (id, filename, type, text_content, created_at)
    VALUES (?, ?, ?, ?, ?)
    """, row)
    conn.commit()

# IMAGE ingestion
def ingest_image(path):
    """OCR an image file and store the recognised text as an ``image`` asset.

    Args:
        path: path of the image file to read.

    Returns:
        The asset dict (``id``, ``filename``, ``type``, ``text``,
        ``created_at``) that was written to the database via ``save_asset``.
    """
    # Image.open keeps the underlying file open until the image is closed;
    # the context manager releases the handle as soon as OCR is done, which
    # matters when a whole folder is ingested in one go.
    with Image.open(path) as img:
        text = pytesseract.image_to_string(img, lang="ita+eng")
    asset = {"id": str(uuid.uuid4()), "filename": Path(path).name, "type": "image",
             "text": text, "created_at": datetime.now().isoformat()}
    save_asset(asset)
    return asset

# PDF ingestion
def ingest_pdf(path):
    """Extract the text of every page of a PDF and store it as a ``document`` asset.

    Page texts are joined with newlines. Only text returned by
    ``page.get_text()`` is collected; no OCR is applied here.

    Args:
        path: path of the PDF file to read.

    Returns:
        The asset dict that was written to the database via ``save_asset``.
    """
    # Close the document deterministically instead of leaving the file handle
    # to the garbage collector.
    with fitz.open(path) as doc:
        text = "\n".join(page.get_text() for page in doc)
    asset = {"id": str(uuid.uuid4()), "filename": Path(path).name, "type": "document",
             "text": text, "created_at": datetime.now().isoformat()}
    save_asset(asset)
    return asset

# AUDIO ingestion (Whisper, with a Gemini fallback when Whisper is not installed)

# Loaded Whisper models keyed by model size, so a batch of audio/video files
# loads the weights once instead of once per file.
_WHISPER_MODELS = {}

# MIME type sent to Gemini in the fallback path, keyed by lowercase suffix.
# Unknown suffixes keep the previous default of "audio/wav".
# NOTE(review): confirm the Gemini endpoint in use accepts "audio/mp4" for .m4a.
_AUDIO_MIME_TYPES = {".wav": "audio/wav", ".mp3": "audio/mp3", ".m4a": "audio/mp4"}


def ingest_audio(path):
    """Transcribe an audio file (Italian) and store the text as an ``audio`` asset.

    Whisper ("small" model) is used when the ``whisper`` package can be
    imported; on ``ImportError`` the raw audio bytes are sent to ``pro_model``
    (Gemini) with a transcription prompt instead.

    Args:
        path: path of the audio file to transcribe.

    Returns:
        The asset dict that was written to the database via ``save_asset``.
    """
    text = None
    try:
        import whisper
        model = _WHISPER_MODELS.get("small")
        if model is None:
            # Loading the model is the expensive part; keep it for later files.
            model = _WHISPER_MODELS["small"] = whisper.load_model("small")
        result = model.transcribe(path, language="it")
        text = result["text"]
        print(f"✅ Transcribed with Whisper: {Path(path).name}")
    except ImportError:
        print(f"⚠ Whisper not installed → using Gemini Pro for {Path(path).name}")
        with open(path, "rb") as f:
            audio_data = f.read()
        # Declare the real container type: the router also sends .mp3 and .m4a
        # files here, which must not be labelled as WAV.
        mime_type = _AUDIO_MIME_TYPES.get(Path(path).suffix.lower(), "audio/wav")
        resp = pro_model.generate_content([
            {"mime_type": mime_type, "data": audio_data},
            "Trascrivi questo audio in italiano"
        ])
        text = resp.text

    asset = {"id": str(uuid.uuid4()), "filename": Path(path).name, "type": "audio",
             "text": text, "created_at": datetime.now().isoformat()}
    save_asset(asset)
    return asset

# VIDEO ingestion (extract audio first, then transcribe)
def extract_audio_from_video(video_path, audio_out="temp_audio.wav"):
    """Extract the audio track of a video into a 16 kHz mono PCM WAV file.

    Args:
        video_path: path of the input video.
        audio_out: path of the WAV file to write (overwritten if present).

    Returns:
        ``audio_out``.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        FileNotFoundError: the ``ffmpeg`` executable is not available.
    """
    # Pass the arguments as a list (no shell): file names containing quotes,
    # spaces or shell metacharacters are handed to ffmpeg unchanged.
    cmd = [
        "ffmpeg", "-y",
        "-i", str(video_path),
        "-vn",
        "-acodec", "pcm_s16le",
        "-ar", "16000",
        "-ac", "1",
        str(audio_out),
    ]
    # check=True: a failed extraction must not go unnoticed, otherwise the
    # caller would transcribe a missing file or one left over from an earlier video.
    subprocess.run(cmd, check=True)
    return audio_out

def ingest_video(path):
    """Transcribe the audio track of a video and store it as a ``video`` asset.

    The audio is extracted to a scratch WAV file, transcribed with
    ``ingest_audio``, and the stored row is then re-saved under the video's
    own file name so that source citations point at the uploaded file rather
    than at the scratch file.

    Args:
        path: path of the video file.

    Returns:
        The asset dict as finally stored in the database.
    """
    audio_path = extract_audio_from_video(path)
    try:
        asset = ingest_audio(audio_path)
    finally:
        # The scratch file is shared by all videos; remove it so a later video
        # can never be transcribed from this one's leftover audio.
        if os.path.exists(audio_path):
            os.remove(audio_path)
    # ingest_audio saved the row under the scratch file's name and type.
    # Saving again with the same id replaces that row (INSERT OR REPLACE).
    asset["filename"] = Path(path).name
    asset["type"] = "video"
    save_asset(asset)
    return asset

# ROUTER (decides which ingestion to use)
def ingest_file(path):
    """Dispatch a file to the ingestion function matching its extension.

    The extension is compared case-insensitively. Images go to
    ``ingest_image``, PDFs to ``ingest_pdf``, audio to ``ingest_audio`` and
    video to ``ingest_video``.

    Returns:
        The asset dict produced by the chosen ingestion function, or ``None``
        (after printing a warning) when the extension is not supported.
    """
    suffix = Path(path).suffix.lower()

    if suffix in {".jpg", ".jpeg", ".png", ".tif", ".tiff"}:
        return ingest_image(path)
    if suffix == ".pdf":
        return ingest_pdf(path)
    if suffix in {".mp3", ".wav", ".m4a"}:
        return ingest_audio(path)
    if suffix in {".mp4", ".mov", ".avi", ".mkv"}:
        return ingest_video(path)

    # Nothing matched: report and let the caller skip the file.
    print(f"⚠ Unsupported file: {suffix}")
    return None

# Batch ingestion
# NOTE: a later cell of this notebook defines ingest_all again; when both cells
# are run in order, that later definition is the one in effect.
def ingest_all(folder="uploads"):
    """Ingest every regular file in ``folder`` and return the resulting assets.

    Files are processed in sorted name order and sub-directories are skipped.
    A failure on one file is reported and does not stop the batch.

    Args:
        folder: directory whose files are ingested.

    Returns:
        List of asset dicts for the files that were ingested successfully.
    """
    assets = []
    for f in sorted(os.listdir(folder)):
        path = os.path.join(folder, f)
        if os.path.isdir(path):
            # Directories are not assets; do not hand them to the router.
            continue
        print(f"→ Ingesting {f} ...")
        try:
            asset = ingest_file(path)
            if asset:
                assets.append(asset)
        except Exception as e:
            # Best effort: report the file and keep going with the rest.
            print(f"❌ Error ingesting {f}: {e}")
    print(f"\n📦 Total ingested: {len(assets)} files")
    return assets


In [None]:
# Use this cell to upload files from your local machine into the Colab 'uploads/' folder.
# It will open a file picker; you can select multiple files at once.
from google.colab import files
# Ask the user for files; the loop below treats `uploaded` as a mapping from
# file name to the content that is written to disk.
uploaded = files.upload()

for fn in uploaded.keys():
    dest = os.path.join("uploads", fn)
    # if already present, it will be overwritten
    with open(dest, "wb") as out:
        out.write(uploaded[fn])
    print(f"✔ Uploaded {fn} -> uploads/{fn}")

print("✅ Upload complete. Now run the batch ingestion cell.")


In [None]:
# Batch ingest everything in uploads/
def ingest_all(folder="uploads"):
    """Ingest every regular file in ``folder`` (sorted by name).

    Sub-directories are skipped. A file whose ingestion raises is reported and
    skipped, so one bad file does not abort the rest of the batch.

    Args:
        folder: directory whose files are ingested.

    Returns:
        List of asset dicts for the files that were ingested successfully.
    """
    ingested = []
    for f in sorted(os.listdir(folder)):
        path = os.path.join(folder, f)
        if os.path.isdir(path):
            continue
        print(f"→ Ingesting {f} ...")
        try:
            a = ingest_file(path)
        except Exception as e:
            # Best effort: OCR/transcription failures are per-file problems.
            print(f"  ❌ Error ingesting {f}: {e}")
            continue
        if a:
            print(f"  ✅ Ingested: {f} (id: {a['id'][:8]}) text length: {len(a['text'] or '')}")
            ingested.append(a)
        else:
            print(f"  ⚠ Skipped/failed: {f}")
    print(f"\n📦 Total ingested: {len(ingested)}")
    return ingested

# Run the batch ingestion on uploads/; `all_assets` keeps the asset dicts
# returned for this run.
all_assets = ingest_all("uploads")


In [None]:
# Build FAISS index from assets stored in SQLite (or load persisted index)
import pickle

def _load_persisted_index(expected_ids):
    """Return the persisted FAISS index if it matches ``expected_ids``, else ``None``."""
    if not (os.path.exists(FAISS_INDEX_FILE) and os.path.exists(IDMAP_PICKLE)):
        return None
    try:
        # pickle.load is acceptable here only because IDMAP_PICKLE is written by
        # build_faiss_index itself; never point it at an untrusted file.
        with open(IDMAP_PICKLE, "rb") as f:
            cached_ids = pickle.load(f)
        if cached_ids != expected_ids:
            # The database changed since the index was saved: it is stale.
            return None
        cached_index = faiss.read_index(FAISS_INDEX_FILE)
    except (OSError, EOFError, pickle.UnpicklingError, RuntimeError):
        # Unreadable or corrupt cache files: fall back to a rebuild.
        return None
    if cached_index.ntotal != len(expected_ids):
        return None
    return cached_index


def build_faiss_index(rebuild=False):
    """Return a cosine-similarity FAISS index over all non-empty asset texts.

    Texts are read from the ``assets`` table. Unless ``rebuild`` is true, a
    previously persisted index is reused when its saved id list is identical
    to the ids currently in the database; otherwise the texts are embedded,
    L2-normalised (so inner product equals cosine similarity), indexed, and
    the index and id list are written to ``FAISS_INDEX_FILE`` and
    ``IDMAP_PICKLE``.

    Args:
        rebuild: when true, ignore any persisted index and rebuild it.

    Returns:
        ``(index, ids, texts)`` where ``ids[i]`` and ``texts[i]`` belong to
        vector ``i`` of the index.

    Raises:
        RuntimeError: the database contains no non-empty text.
    """
    # load all texts
    c.execute("SELECT id, text_content FROM assets")
    ids = []
    texts = []
    for asset_id, content in c.fetchall():
        if content and content.strip():
            ids.append(asset_id)
            texts.append(content)
    if not texts:
        raise RuntimeError("No texts found in DB. Ingest files first.")

    if not rebuild:
        cached_index = _load_persisted_index(ids)
        if cached_index is not None:
            print(f"✅ Loaded persisted FAISS index with {cached_index.ntotal} vectors.")
            return cached_index, ids, texts

    # compute embeddings (FAISS works on float32 vectors)
    embs = embed_model.encode(texts, convert_to_numpy=True).astype("float32")
    # normalize for cosine similarity with inner product
    norms = np.linalg.norm(embs, axis=1, keepdims=True)
    embs = embs / (norms + 1e-10)
    # build index
    index = faiss.IndexFlatIP(embs.shape[1])
    index.add(embs)
    # save index and id map
    faiss.write_index(index, FAISS_INDEX_FILE)
    with open(IDMAP_PICKLE, "wb") as f:
        pickle.dump(ids, f)
    print(f"✅ Built FAISS index with {index.ntotal} vectors.")
    return index, ids, texts

# Build the index now. `index` and `id_map` are read as module-level names by
# search_top_k; `texts` holds the indexed texts in the same order.
index, id_map, texts = build_faiss_index()


In [None]:
# Build a NetworkX graph linking asset ids to named entities
def build_graph():
    """Rebuild the module-level graph ``G`` from the ``assets`` table.

    Every asset with non-empty text becomes a node (``type="asset"``, labelled
    with its file name). Every named entity found by ``nlp`` in that text
    becomes a node keyed by its stripped text (``type="entity"``) and is
    linked to the asset with a ``mentions:<label>`` edge. ``G`` is cleared
    first; a summary line is printed at the end.
    """
    G.clear()
    c.execute("SELECT id, filename, text_content FROM assets")
    for asset_id, filename, content in c.fetchall():
        if content:
            G.add_node(asset_id, type="asset", label=filename)
            for entity in nlp(content).ents:
                name = entity.text.strip()
                if name:
                    # The first label seen for an entity text is the one kept
                    # on the node; the edge records the label of each mention.
                    if name not in G:
                        G.add_node(name, type="entity", label=name, entity_type=entity.label_)
                    G.add_edge(asset_id, name, relation=f"mentions:{entity.label_}")
    print(f"✅ Graph built: {G.number_of_nodes()} nodes, {G.number_of_edges()} edges")

# Populate the module-level graph G; find_related_docs reads it at query time.
build_graph()


In [None]:
def search_top_k(question, k=3):
    """Return up to ``k`` assets whose text is most similar to ``question``.

    The question is embedded with ``embed_model``, L2-normalised and searched
    in the module-level FAISS ``index``; hits are mapped to asset ids through
    ``id_map`` and loaded from the database.

    Returns:
        List of dicts with the keys ``id``, ``filename`` and ``text``, in the
        order returned by the index.
    """
    # embed query + normalize
    query_vec = embed_model.encode([question], convert_to_numpy=True)
    query_norm = np.linalg.norm(query_vec, axis=1, keepdims=True)
    query_vec = query_vec / (query_norm + 1e-10)

    _scores, positions = index.search(query_vec, k)

    hits = []
    for position in positions[0]:
        # Negative positions are skipped: they do not refer to a stored vector.
        if position >= 0:
            asset_id = id_map[position]
            cursor = c.execute("SELECT filename, text_content FROM assets WHERE id=?", (asset_id,))
            match = cursor.fetchone()
            if match is not None:
                hits.append({"id": asset_id, "filename": match[0], "text": match[1]})
    return hits

def find_related_docs(retrieved_assets):
    """Return the file names of other assets sharing an entity with the given ones.

    For each retrieved asset, its entities are taken from the graph ``G`` when
    the asset is already a node there (the graph was built from the same
    stored text, so NER does not need to run again for every query). Assets
    missing from the graph fall back to running ``nlp`` on their text.
    Entity texts are stripped, matching the node keys created by
    ``build_graph``.

    Args:
        retrieved_assets: iterable of dicts with at least ``id`` and ``text``.

    Returns:
        Sorted list of unique file-name labels of asset nodes that share at
        least one entity with a retrieved asset (the asset itself excluded).
    """
    related = set()
    for asset in retrieved_assets:
        asset_id = asset["id"]
        if asset_id in G:
            entity_keys = list(G.neighbors(asset_id))
        else:
            entity_keys = [ent.text.strip() for ent in nlp(asset["text"] or "").ents]
        for key in entity_keys:
            if not key or key not in G:
                continue
            for neighbor in G.neighbors(key):
                if neighbor != asset_id and G.nodes[neighbor].get("type") == "asset":
                    related.add(G.nodes[neighbor]["label"])
    return sorted(related)

def rag_answer(question, k=3, snippet_chars=1500):
    """Answer ``question`` in Italian from the ``k`` most similar assets.

    The retrieved texts (truncated to ``snippet_chars`` characters each) are
    placed in a prompt for ``gemini_model``. The returned string is the model
    answer followed by the related documents found through the entity graph
    (if any) and the file names of the retrieved assets.

    Args:
        question: the user question.
        k: number of assets to retrieve.
        snippet_chars: maximum number of characters of each asset's text that
            is included in the prompt.

    Returns:
        The answer text with the source lines appended, or a fixed message
        when nothing was retrieved.
    """
    retrieved = search_top_k(question, k=k)
    if not retrieved:
        return "Nessun documento rilevante trovato."

    # Build context snippet
    context_texts = []
    supports = []
    for r in retrieved:
        snippet = (r["text"] or "")[:snippet_chars]
        context_texts.append(f"[{r['filename']}] {snippet}")
        # The same file may have been ingested more than once; cite it once.
        if r["filename"] not in supports:
            supports.append(r["filename"])

    # Joined outside the f-string: a backslash inside an f-string expression
    # is a syntax error on Python versions before 3.12.
    context_block = "\n\n".join(context_texts)

    prompt = f"""
Sei un assistente archivistico che risponde in italiano in modo conciso e preciso.
Domanda: {question}

Contesti recuperati (sintesi):
{context_block}

Istruzioni:
1) Fornisci una risposta breve e precisa in italiano.
2) Riporta il/i file che supportano la risposta usando i nomi file (ad esempio: Martiny_doc010.jpg).
3) Se ci sono altri documenti collegati (es. menzionano le stesse entità), elencali.

Risposta:
"""

    resp = gemini_model.generate_content(prompt)
    try:
        answer_text = resp.text.strip()
    except ValueError:
        # The `text` accessor raises when the response carries no usable text
        # part (e.g. a blocked or empty candidate); still report the sources.
        answer_text = "Il modello non ha restituito una risposta testuale."
    related = find_related_docs(retrieved)
    if related:
        answer_text += "\n\nCollegato anche a: " + ", ".join(related)
    # Also add the direct supporting assets
    answer_text += "\n\nFonte principale: " + ", ".join(supports)
    return answer_text

# Example query. This runs a retrieval through search_top_k and one call to
# the Gemini model, so the FAISS index must already have been built.
print("Esempio:", rag_answer("Qual è il numero d’ordine citato nella lettera Martiny del 26 maggio 1914?", k=3))


In [None]:
# Use this cell to type a question interactively.
# Run it, then type your query and press Enter.
# Type 'exit' or 'quit' (or interrupt the cell) to leave the loop.

while True:
    try:
        q = input("\nScrivi la tua domanda (or type 'exit' to stop): ").strip()
    except (KeyboardInterrupt, EOFError):
        # Interrupting the prompt is a normal way to stop; end without a traceback.
        print("\nInterrupted — leaving the question loop.")
        break
    if q.lower() in ("exit", "quit"):
        break
    if not q:
        # Nothing typed: ask again instead of sending an empty question to the model.
        continue
    print("\n--- Generating answer (may take a few seconds) ---\n")
    print(rag_answer(q, k=4))


KeyboardInterrupt: Interrupted by user

In [None]:
from google.colab import files
# Download the SQLite metadata database to the local machine.
# Note: metadata.db is downloaded from the working directory as it is; this
# cell does not copy it into outputs/.
files.download("metadata.db")
print("Downloaded metadata.db")
