<a href="https://colab.research.google.com/github/Vai-Ram/Convolve-4.0-MAS---TrustLens/blob/main/TrustLens_Convolve.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install timm, flash_attn and einops with pip (IPython shell magic).
# NOTE(review): presumably these are needed by the Florence-2 remote code that is
# loaded later with trust_remote_code=True — confirm before removing any of them.
!pip install timm flash_attn einops

In [None]:
# 1. Install Core AI & Database Libraries
#    Packages used by the imports further down: qdrant-client (vector store),
#    sentence-transformers (CLIP encoder), deepface (face detection),
#    albumentations (template augmentation) and OpenCV (image loading).
#    `-q` keeps pip's output quiet. `accelerate` is installed again by a later
#    cell together with the transformers version pin.
!pip install -q \
    qdrant-client \
    sentence-transformers \
    deepface \
    albumentations \
    accelerate \
    opencv-python-headless

# 2. Install LlamaIndex (For Metadata Structuring)
#    Core package plus the OpenAI-like LLM and HuggingFace embedding integrations.
#    NOTE(review): no import of the openai-like integration is visible in this
#    notebook — confirm it is still needed.
!pip install -q \
    llama-index-core \
    llama-index-llms-openai-like \
    llama-index-embeddings-huggingface


# Printed after both pip commands have been issued; it does not check their exit status.
print("‚úÖ All Dependencies Installed.")

In [None]:
# Pin transformers to 4.49.0 and (re)install accelerate.
# NOTE(review): the pin is presumably required for compatibility with the
# Florence-2 remote code loaded below — confirm before upgrading.
!pip install transformers==4.49.0 accelerate

In [None]:
import subprocess
import time
import os

# 1. FIX: Install missing dependency 'zstd'
print("üîß Installing dependencies...")
!sudo apt-get install -y zstd

# 2. Install Ollama
print("‚è≥ Installing Ollama...")
!curl -fsSL https://ollama.com/install.sh | sh

# 3. Start the Server in the Background
print("üöÄ Starting Ollama Server...")
process = subprocess.Popen(["ollama", "serve"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)

# 4. Wait for it to spin up
print("‚è≥ Waiting 10 seconds for server to initialize...")
time.sleep(10)

# 5. Pull the Model
print("‚¨áÔ∏è Downloading Model (gemma3:4b)...")
!ollama pull gemma3:4b

print("‚úÖ Setup Complete! Now you can run the TextAgent code.")

In [None]:
# Install the LlamaIndex Ollama integration, which provides the
# `llama_index.llms.ollama.Ollama` class imported in the next cell.
!pip install llama-index-llms-ollama


In [None]:
# --- 1. IMPORTS & CONFIGURATION ---
import os
import cv2
import numpy as np
import uuid
import json
import re
import torch
import albumentations as A
from PIL import Image

# TrustLens Core Imports
from qdrant_client import QdrantClient, models
from sentence_transformers import SentenceTransformer, util
from deepface import DeepFace

# Florence-2 Imports
from transformers import AutoProcessor, AutoModelForCausalLM, AutoConfig

# LlamaIndex Imports
from pydantic import BaseModel, Field
from typing import Optional, Literal
from llama_index.core import Settings, Document, VectorStoreIndex
from llama_index.embeddings.huggingface import HuggingFaceEmbedding
from llama_index.llms.ollama import Ollama
from google.colab import userdata

# Configuration Constants
# Reference PAN-card image that gets indexed as the layout template.
PAN_TEMPLATE_PATH = "/content/convolve_pan_template.jpeg"
# Qdrant collection holding the embeddings of the template and its augmented variants.
DB_TEMPLATES = "trustlens_templates"
# Qdrant collection holding the embeddings and payloads of accepted documents.
DB_NETWORK = "trustlens_network"
# Vector dimensionality used when creating both collections.
# NOTE(review): must equal the output size of the CLIP encoder whose embeddings
# are stored (clip-ViT-B-32, presumably 512-d) — confirm if the encoder changes.
VECTOR_SIZE = 512
# Directory of the local, on-disk Qdrant store.
DB_PATH = "/content/qdrant_db"

# --- 2. MODEL INITIALIZATION ---
# Both assignments below configure LlamaIndex's global `Settings` object.
# NOTE(review): the VectorStoreIndex / query engine built in
# TrustLensPipeline.structure_metadata_with_gemma presumably picks these up as
# its defaults, since it passes no explicit model — confirm.

# A. Embedding Model
print("üß† Loading Embedding Model (BAAI/bge-base-en-v1.5)...")
Settings.embed_model = HuggingFaceEmbedding(
    model_name="BAAI/bge-base-en-v1.5"
)

# B. LLM (Switched to Gemma 3 4B as requested)
# The model tag matches the one pulled with `ollama pull gemma3:4b` in the setup cell.
# NOTE(review): json_mode=True presumably asks Ollama to emit JSON only, and
# request_timeout is presumably in seconds — confirm against the integration docs.
print("‚ú® Initializing Gemma 3 4B (Ollama)...")
Settings.llm = Ollama(
    model="gemma3:4b",   # <--- Updated to 4B
    request_timeout=300.0,
    json_mode=True,
    temperature=0.1
)

# Schema (Kept Robust with Optional)
class PanCard(BaseModel):
    """Structured fields extracted from a PAN card.

    Every extracted field defaults to ``None``. With pydantic v2 (this file
    uses ``model_dump``), ``Optional[str]`` alone only allows the value
    ``None``; without an explicit default the field is still *required*, so a
    response that omitted a key would fail validation.
    """

    # Discriminator; always "pan_card".
    doc_type: Literal["pan_card"] = "pan_card"
    full_name: Optional[str] = Field(default=None, description="Full name")
    fathers_name: Optional[str] = Field(default=None, description="Father's name")
    date_of_birth: Optional[str] = Field(default=None, description="DOB (DD/MM/YYYY)")
    pan_number: Optional[str] = Field(default=None, description="PAN Number")

# --- 3. TRUSTLENS PIPELINE CLASS ---
# Keep whatever client an earlier run of this cell stored; start out as None otherwise.
GLOBAL_QDRANT_CLIENT = globals().get('GLOBAL_QDRANT_CLIENT')

class TrustLensPipeline:
    """PAN-card verification pipeline.

    ``verify_document`` runs these checks in order and stops at the first failure:

    1. CLIP zero-shot classification of the document type.
    2. Layout similarity against the indexed template embeddings.
    3. Near-duplicate image detection against already accepted documents.
    4. Face detection.
    5. Text reading (Florence-2) and metadata structuring (Gemma 3 via LlamaIndex).
    6. Duplicate PAN-number lookup.

    Accepted documents are stored in the ``DB_NETWORK`` Qdrant collection.
    """

    # Minimum similarity to a template embedding for the layout to be accepted.
    LAYOUT_MATCH_MIN_SCORE = 0.78
    # Similarity to a stored document above which the image counts as a resubmission.
    DUPLICATE_IMAGE_MIN_SCORE = 0.98
    # Keys of the JSON object the LLM is asked to produce.
    _METADATA_FIELDS = ("full_name", "fathers_name", "date_of_birth", "pan_number")

    def __init__(self):
        """Load all models and prepare the Qdrant collections.

        The Florence-2 model is moved to the GPU, so a CUDA device is required.
        """
        print("üöÄ Initializing TrustLens (Hybrid w/ Florence-2 + Gemma 3 4B)...")
        self.qdrant = self._get_or_create_qdrant_client()

        # A. VISUAL AGENT (CLIP)
        print("üß† Loading CLIP (Visual Semantics)...")
        self.clip = SentenceTransformer('clip-ViT-B-32')
        self.doc_labels = ["Indian PAN Card", "Aadhaar Card", "Driving License", "Random Object"]
        # Text embeddings of the labels, compared against image embeddings in verify_document.
        self.label_embeddings = self.clip.encode(self.doc_labels, convert_to_tensor=True)

        # B. OCR AGENT (Florence-2)
        print("üëÅÔ∏è Loading Florence-2 (Vision-Language Model)...")
        self.ocr_model_id = "microsoft/Florence-2-large"

        # Load Config & Patch: make sure `forced_bos_token_id` exists on both the
        # config instance and its class before the model is built.
        # NOTE(review): presumably a workaround for the pinned transformers
        # version — confirm whether it is still needed.
        config = AutoConfig.from_pretrained(self.ocr_model_id, trust_remote_code=True)
        if not hasattr(config, 'forced_bos_token_id'):
            setattr(config, 'forced_bos_token_id', 1)
        if not hasattr(config.__class__, 'forced_bos_token_id'):
            setattr(config.__class__, 'forced_bos_token_id', 1)

        # Load Model
        self.ocr_model = AutoModelForCausalLM.from_pretrained(
            self.ocr_model_id,
            config=config,
            trust_remote_code=True,
            torch_dtype=torch.float16
        ).eval().cuda()

        self.ocr_processor = AutoProcessor.from_pretrained(
            self.ocr_model_id,
            trust_remote_code=True
        )

        # C. BIOMETRIC AGENT
        print("üë§ Initializing FaceNet...")
        try:
            DeepFace.build_model("Facenet512")
        except Exception as exc:
            # Best-effort warm-up only: keep going, but do not hide the reason.
            print(f"   ‚ö†Ô∏è FaceNet warm-up skipped: {exc}")

        # D. AUGMENTATION (used by index_template to create template variants)
        # NOTE(review): newer albumentations releases changed GaussNoise's
        # `var_limit` argument — confirm against the installed version.
        self.aug = A.Compose([
            A.Rotate(limit=10, p=0.8),
            A.RandomBrightnessContrast(p=0.5),
            A.GaussNoise(var_limit=(10.0, 30.0), p=0.4),
            A.Perspective(scale=(0.02, 0.05), p=0.4)
        ])

        self._setup_databases()
        print("‚úÖ System Ready.")

    def _get_or_create_qdrant_client(self):
        """Return the module-wide Qdrant client, creating it on first use."""
        global GLOBAL_QDRANT_CLIENT
        if GLOBAL_QDRANT_CLIENT is None:
            GLOBAL_QDRANT_CLIENT = QdrantClient(path=DB_PATH)
        return GLOBAL_QDRANT_CLIENT

    def _setup_databases(self):
        """Reset the template collection and make sure the network collection exists.

        The template collection is rebuilt from scratch on every start-up. The
        network collection, which holds accepted documents, is only created
        when missing, so existing records survive re-initialisation.
        """
        vectors_config = models.VectorParams(size=VECTOR_SIZE, distance=models.Distance.COSINE)

        # Explicit delete + create instead of the deprecated recreate_collection.
        if self.qdrant.collection_exists(DB_TEMPLATES):
            self.qdrant.delete_collection(DB_TEMPLATES)
        self.qdrant.create_collection(collection_name=DB_TEMPLATES, vectors_config=vectors_config)

        if not self.qdrant.collection_exists(DB_NETWORK):
            self.qdrant.create_collection(collection_name=DB_NETWORK, vectors_config=vectors_config)
            self.qdrant.create_payload_index(DB_NETWORK, "pan_number", models.PayloadSchemaType.KEYWORD)

    def get_embedding(self, image_source):
        """Return the CLIP embedding of an image as a plain list of floats.

        Args:
            image_source: Path to an image file, or an already loaded image object.
        """
        if isinstance(image_source, str):
            # Close the file handle as soon as the image has been encoded.
            with Image.open(image_source) as img:
                return self.clip.encode(img).tolist()
        return self.clip.encode(image_source).tolist()

    # --- STEP 1: READ TEXT (Florence-2) ---
    def read_text_with_florence(self, image_path):
        """Uses Florence-2 to extract all text from the image.

        Args:
            image_path: Path to the image file.

        Returns:
            The ``<OCR>`` entry of the processor's post-processed answer.
        """
        print("   üëÅÔ∏è Florence-2: Reading Text...")

        # convert() returns a new in-memory image, so the file can be closed right away.
        with Image.open(image_path) as source_image:
            image = source_image.convert("RGB")

        task_prompt = "<OCR>"

        inputs = self.ocr_processor(text=task_prompt, images=image, return_tensors="pt").to("cuda", torch.float16)

        generated_ids = self.ocr_model.generate(
            input_ids=inputs["input_ids"],
            pixel_values=inputs["pixel_values"],
            max_new_tokens=1024,
            do_sample=False,
            num_beams=3,
        )

        # Special tokens are kept because post_process_generation parses the raw output.
        generated_text = self.ocr_processor.batch_decode(generated_ids, skip_special_tokens=False)[0]

        parsed_answer = self.ocr_processor.post_process_generation(
            generated_text,
            task=task_prompt,
            image_size=(image.width, image.height)
        )

        return parsed_answer[task_prompt]

    @staticmethod
    def _extract_json_text(response_text):
        """Return the part of *response_text* that should be parsed as JSON."""
        # Greedy match from the first "{" to the last "}" drops any surrounding chatter.
        match = re.search(r'\{.*\}', response_text, re.DOTALL)
        if match:
            return match.group(0)
        return response_text.replace("```json", "").replace("```", "").strip()

    # --- STEP 2: STRUCTURE METADATA (GEMMA 3) ---
    def structure_metadata_with_gemma(self, raw_text):
        """Uses Gemma 3 to parse the raw text into JSON.

        Args:
            raw_text: Text read from the document.

        Returns:
            A dict produced by ``PanCard(...).model_dump()``, or ``None`` when
            there is no text to parse or the model output cannot be turned
            into a valid record.
        """
        print("   ‚ú® Gemma 3: Structuring Metadata...")

        # Nothing to extract from: skip the LLM round-trip.
        if not raw_text or not raw_text.strip():
            print("   ‚ö†Ô∏è Gemma Parsing Failed: no text was read from the document.")
            return None

        # Bound before the try block so the JSONDecodeError handler can always
        # print it, even if the error is raised before the response is read.
        response_text = ""
        try:
            doc = Document(text=raw_text)
            index = VectorStoreIndex.from_documents([doc])
            query_engine = index.as_query_engine()

            prompt = (
                f"Raw Text: \"{raw_text}\"\n\n"
                "You are an expert ID card data extractor. Extract the details from the Raw Text above into a JSON object.\n"
                "Strictly follow this JSON schema:\n"
                "{\n"
                "  \"full_name\": \"string or null\",\n"
                "  \"fathers_name\": \"string or null\",\n"
                "  \"date_of_birth\": \"DD/MM/YYYY or null\",\n"
                "  \"pan_number\": \"10 character string or null\"\n"
                "}\n"
                "Output ONLY the valid JSON object. Do not add markdown formatting or explanations."
            )

            response = query_engine.query(prompt)
            response_text = str(response).strip()

            data_dict = json.loads(self._extract_json_text(response_text))
            if not isinstance(data_dict, dict):
                print(f"   ‚ö†Ô∏è Gemma Parsing Failed: expected a JSON object, got {type(data_dict).__name__}")
                return None

            # Robustness check: the model may leave keys out; treat them as null
            # so one omitted field does not invalidate the whole record.
            missing_keys = [key for key in self._METADATA_FIELDS if key not in data_dict]
            if "pan_number" in missing_keys:
                print(f"   ‚ö†Ô∏è Metadata warning: 'pan_number' missing in JSON.")
            for key in missing_keys:
                data_dict[key] = None

            return PanCard(**data_dict).model_dump()

        except json.JSONDecodeError:
            print(f"   ‚ö†Ô∏è JSON Parse Error. Raw Gemma Output: {response_text}")
            return None
        except Exception as e:
            # Boundary handler: any LLM / validation failure becomes a rejection upstream.
            print(f"   ‚ö†Ô∏è Gemma Parsing Failed: {e}")
            return None

    def search_by_pan(self, pan_number):
        """Look up an accepted document by its exact PAN number.

        Returns:
            ``{"status": "FOUND", "data": <payload>}`` for the first matching
            record, otherwise ``{"status": "NOT FOUND"}``.
        """
        pan_filter = models.Filter(
            must=[models.FieldCondition(key="pan_number", match=models.MatchValue(value=pan_number))]
        )
        # scroll() returns (records, next_page_offset); only the records are needed.
        records = self.qdrant.scroll(
            collection_name=DB_NETWORK,
            scroll_filter=pan_filter,
            limit=1
        )[0]
        if records:
            return {"status": "FOUND", "data": records[0].payload}
        return {"status": "NOT FOUND"}

    # --- MAIN PIPELINE ---
    def verify_document(self, input_path):
        """Run every check on the image at *input_path* and index it when it passes.

        Returns:
            A human-readable verdict string: a rejection reason, or the
            acceptance message containing the extracted PAN number.
        """
        print(f"\nüîç Processing: {input_path}")

        # 1. Semantic Check (CLIP): the closest label must be the PAN card label.
        with Image.open(input_path) as img:
            img_emb = self.clip.encode(img, convert_to_tensor=True)
        scores = util.cos_sim(img_emb, self.label_embeddings)[0]
        if self.doc_labels[np.argmax(scores.cpu().numpy())] != "Indian PAN Card":
            return "‚ùå REJECTED: Wrong Document Type"

        # 2. Structure Check (vector search with the CLIP image embedding).
        # Reuse the embedding computed above instead of encoding the image again.
        vector = img_emb.cpu().tolist()
        layout_hits = self.qdrant.query_points(collection_name=DB_TEMPLATES, query=vector, limit=1).points
        if not layout_hits or layout_hits[0].score < self.LAYOUT_MATCH_MIN_SCORE:
            return "‚ùå REJECTED: Structural Mismatch"

        # 3. Fraud Check (Duplicate Image)
        fraud_hits = self.qdrant.query_points(collection_name=DB_NETWORK, query=vector, limit=1).points
        if fraud_hits and fraud_hits[0].score > self.DUPLICATE_IMAGE_MIN_SCORE:
            return "‚ùå REJECTED: FRAUD (Duplicate Image Submission)"

        # 4. Biometrics: with enforce_detection=True a missing face raises.
        try:
            DeepFace.extract_faces(img_path=input_path, detector_backend="opencv", enforce_detection=True)
        except Exception:
            return "‚ùå REJECTED: No Face Detected."

        # 5. DATA EXTRACTION (Hybrid: Florence-2 + Gemma 3)
        raw_text = self.read_text_with_florence(input_path)
        metadata = self.structure_metadata_with_gemma(raw_text)

        if not metadata:
            return "‚ö†Ô∏è REJECTED: Metadata Extraction Failed."

        # --- DISPLAY FULL METADATA ---
        print("\nüìÑ FULL EXTRACTED METADATA:")
        print(json.dumps(metadata, indent=4))
        print("-" * 30)

        # Without a PAN number the identity check below cannot run and the
        # record could never be found again, so the document is not accepted.
        pan_id = (metadata.get("pan_number") or "").strip()
        if not pan_id:
            return "‚ö†Ô∏è REJECTED: PAN Number Not Extracted."

        # 6. Database Check (Identity Theft)
        db_check = self.search_by_pan(pan_id)
        if db_check['status'] == "FOUND":
            return f"‚ùå REJECTED: FRAUD (PAN {pan_id} already registered)"

        # 7. Success -> Index
        self.qdrant.upsert(
            collection_name=DB_NETWORK,
            points=[models.PointStruct(
                id=str(uuid.uuid4()),
                vector=vector,
                payload={
                    "source": input_path,
                    "status": "verified",
                    "pan_number": pan_id,
                    "metadata": metadata
                }
            )]
        )
        return f"‚úÖ ACCEPTED & INDEXED: {pan_id}"

    def index_template(self, template_path, num_augmentations=5):
        """Index a reference template image together with augmented variants.

        Args:
            template_path: Path to the template image.
            num_augmentations: Number of augmented copies to index in addition
                to the original (default 5).

        Raises:
            FileNotFoundError: If *template_path* is not an existing file.
            ValueError: If OpenCV cannot decode the image.
        """
        print(f"üìÇ Indexing Template: {template_path}")
        if not os.path.isfile(template_path):
            raise FileNotFoundError(f"Template image not found: '{template_path}'")

        # Point 0 is the untouched template; points 1..n are its augmented variants.
        points = [
            models.PointStruct(id=0, vector=self.get_embedding(template_path), payload={"type": "original"})
        ]

        img_cv = cv2.imread(template_path)
        if img_cv is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise ValueError(f"OpenCV could not read template image: '{template_path}'")
        img_cv = cv2.cvtColor(img_cv, cv2.COLOR_BGR2RGB)

        for point_id in range(1, num_augmentations + 1):
            augmented = Image.fromarray(self.aug(image=img_cv)["image"])
            points.append(
                models.PointStruct(
                    id=point_id,
                    vector=self.clip.encode(augmented).tolist(),
                    payload={"type": "augmented"},
                )
            )
        self.qdrant.upsert(collection_name=DB_TEMPLATES, points=points)



In [None]:
# --- INITIALIZATION ---
# Build the pipeline (loads all models) and index the reference template.
PAN_TEMPLATE_PATH = "/content/convolve_pan_template.jpeg"
agent = TrustLensPipeline()
if os.path.exists(PAN_TEMPLATE_PATH):
    agent.index_template(PAN_TEMPLATE_PATH)
else:
    # The template collection is emptied on every start-up, so without this
    # file the structure check finds no match and rejects every document.
    print(
        f"‚ö†Ô∏è Template '{PAN_TEMPLATE_PATH}' not found: no template indexed, "
        "so every document will be rejected with 'Structural Mismatch'."
    )

In [None]:
# ----------------------------------------
# Verify one uploaded image: set TEST_IMAGE to the path of your file.
# ----------------------------------------
TEST_IMAGE = "/content/Pan_Test1.jpeg"

# Report a missing file up front; otherwise run the full pipeline and print its verdict.
if not os.path.exists(TEST_IMAGE):
    print(f"‚ùå Error: File '{TEST_IMAGE}' not found.")
else:
    result = agent.verify_document(TEST_IMAGE)
    print(f"\nüèÜ FINAL RESULT: {result}")

In [None]:
# ----------------------------------------
# Look up a stored record by PAN number and display its source image.
# ----------------------------------------
import os
# IPython's Image is imported under an alias so PIL's `Image` name stays untouched.
from IPython.display import Image as IPImage, display

# 1. PAN number to fetch
QUERY_PAN = "BCFGH123AD"  # <--- REPLACE WITH TARGET PAN

print(f"üîé Searching Database for PAN: {QUERY_PAN}...")

# 2. Query the Qdrant DB through the pipeline
result = agent.search_by_pan(QUERY_PAN)

# 3. Fail fast when the PAN is unknown
if result["status"] != "FOUND":
    raise ValueError(f"üö´ NOT FOUND: PAN '{QUERY_PAN}' does not exist in the registry.")

# The payload stored when the document was accepted
record = result["data"]
stored_pan, image_path, status = (
    record.get(field) for field in ("pan_number", "source", "status")
)
metadata = record.get("metadata", {})

print("\n‚úÖ RECORD FOUND:")
print(f"   üÜî PAN ID:     {stored_pan}")
print(f"   üë§ Name:       {metadata.get('full_name', 'N/A')}")
print(f"   üõ°Ô∏è Status:     {status}")
print(f"   üìÇ Local Path: {image_path}")

# 4. The stored path must still point at an existing file before it can be shown
if not image_path or not os.path.exists(image_path):
    raise FileNotFoundError(f"‚ùå CRITICAL ERROR: Image file for PAN {QUERY_PAN} is missing at path: '{image_path}'")

print("\nüëá RETRIEVED DOCUMENT IMAGE:")
display(IPImage(filename=image_path, width=400))