# IDP Backend Server

This notebook runs the FastAPI backend server for document processing and exposes it through an ngrok tunnel.

In [None]:
# Install required packages.
# torch/torchvision get their own command: --index-url replaces PyPI for every
# package named in the same pip invocation, and the PyTorch wheel index is not
# a general-purpose package index.
!pip install -q torch torchvision --index-url https://download.pytorch.org/whl/cpu
# paddlepaddle is the runtime that paddleocr imports; installing paddleocr alone
# does not pull it in. The extras spec is quoted so the shell cannot glob "[...]".
!pip install -q fastapi==0.110.0 \
    "uvicorn[standard]==0.27.1" \
    python-multipart==0.0.9 \
    transformers==4.38.2 \
    Pillow==10.2.0 \
    paddlepaddle==2.6.1 \
    paddleocr==2.7.0.3 \
    python-magic==0.4.27 \
    spacy==3.7.4 \
    pyngrok==7.1.5 \
    python-dotenv==1.0.1 \
    nest-asyncio==1.6.0

In [None]:
# Download the small English spaCy pipeline; app.py loads it by name with
# spacy.load('en_core_web_sm').
!python -m spacy download en_core_web_sm

In [None]:
# Create working directories for model files and annotation data.
# NOTE(review): nothing in the cells shown in this notebook reads or writes
# these paths — confirm they are still needed.
!mkdir -p models/layout models/spacy data/annotations

In [None]:
%%writefile app.py
from fastapi import FastAPI, File, UploadFile, HTTPException
from fastapi.middleware.cors import CORSMiddleware
from typing import Dict, Any, List, Optional
import torch
from PIL import Image
import io
import numpy as np
from paddleocr import PaddleOCR
import spacy
from transformers import LayoutLMv3Processor, LayoutLMv3ForTokenClassification
import re
import magic

class DocumentProcessor:
    """Extract labelled fields from a document image.

    The pipeline runs PaddleOCR to get text lines and their boxes, feeds them
    to a LayoutLMv3 token-classification model to assign a field label to each
    line, then adjusts the per-field confidence using spaCy entities and a few
    regular expressions, and finally guesses a document type from keywords.

    NOTE(review): "microsoft/layoutlmv3-base" is loaded with a freshly sized
    classification head (``num_labels``), so the labels it predicts are
    presumably only meaningful after fine-tuning — confirm which checkpoint is
    intended for production use.
    """

    # LayoutLMv3 expects every box coordinate as an integer on a 0-1000 grid.
    _LAYOUT_BOX_SCALE = 1000

    # Ordered from most to least specific. 'amount' matches any run of digits,
    # so it must be tried last or it would mask every other digit-bearing type.
    _FIELD_PATTERNS = {
        'invoice_number': re.compile(r'(?i)inv[oice]*[\s#:]+([A-Z0-9-]+)'),
        'date': re.compile(r'(?i)(?:date[d:\s]*)?(\d{1,2}[-/]\d{1,2}[-/]\d{2,4})'),
        'email': re.compile(r'[\w\.-]+@[\w\.-]+\.\w+'),
        'phone': re.compile(r'(?:\+\d{1,3}[-\.\s]?)?\(?\d{3}\)?[-\.\s]?\d{3}[-\.\s]?\d{4}'),
        'amount': re.compile(r'\$?\s*(\d{1,3}(?:,\d{3})*(?:\.\d{2})?)'),
    }

    def __init__(self):
        """Load the OCR engine, the spaCy pipeline and the LayoutLMv3 model."""
        self.device = "cuda" if torch.cuda.is_available() else "cpu"
        print(f"Using device: {self.device}")

        # Label mappings. Defined before the model is created because the
        # classification head is sized from len(self.label2id).
        self.label2id = {
            "O": 0,
            "B-invoice_number": 1,
            "B-date": 2,
            "B-total_amount": 3,
            "B-tax_amount": 4,
            "B-vendor_name": 5,
            "B-customer_name": 6,
            "B-line_item": 7,
            "B-quantity": 8,
            "B-unit_price": 9,
            "B-description": 10
        }
        self.id2label = {v: k for k, v in self.label2id.items()}

        # Initialize models
        self.ocr = PaddleOCR(use_angle_cls=True, lang='en', use_gpu=torch.cuda.is_available(), show_log=False)
        self.nlp = spacy.load('en_core_web_sm')

        # Initialize LayoutLMv3. apply_ocr=False because words and boxes come
        # from PaddleOCR; the processor refuses caller-supplied boxes when its
        # own built-in OCR is enabled.
        layout_model_name = "microsoft/layoutlmv3-base"
        self.layout_processor = LayoutLMv3Processor.from_pretrained(
            layout_model_name,
            apply_ocr=False
        )
        self.layout_model = LayoutLMv3ForTokenClassification.from_pretrained(
            layout_model_name,
            num_labels=len(self.label2id)
        ).to(self.device)

    def process_image(self, image: "Image.Image") -> Dict[str, Any]:
        """Run the full extraction pipeline on one image.

        Args:
            image: A PIL image of the document (any mode; converted to RGB).

        Returns:
            A dict with ``fields`` (list of extracted field dicts),
            ``documentType`` (keyword-based guess) and ``confidence`` (mean
            field confidence, 0.0 when no field was extracted).
        """
        # Palette, greyscale and alpha images are normalised so that OCR and
        # LayoutLMv3 always receive a three-channel image.
        image = image.convert("RGB")
        img_array = np.array(image)

        # Extract text with OCR
        ocr_result = self.ocr.ocr(img_array)
        text_blocks = self._extract_text_blocks(ocr_result)

        # Process with LayoutLMv3
        layout_fields = self._process_with_layout(image, text_blocks)

        # Enhance with NER
        enhanced_fields = self._enhance_with_spacy(layout_fields)

        # Apply pattern matching
        final_fields = self._apply_pattern_matching(enhanced_fields)

        # Classify document
        doc_type = self._classify_document_type(final_fields)

        return {
            "fields": final_fields,
            "documentType": doc_type,
            "confidence": self._calculate_confidence(final_fields)
        }

    def _extract_text_blocks(self, ocr_result):
        """Flatten a PaddleOCR result into a list of text-block dicts.

        Each entry of ``ocr_result`` is a list of lines shaped
        ``[quad_points, (text, confidence)]``. The four corner points are
        collapsed into an axis-aligned ``[x_min, y_min, x_max, y_max]`` box.
        Missing results (``None`` or an empty/``None`` page, which PaddleOCR
        can produce when it detects no text) yield no blocks.
        """
        text_blocks = []
        for block in ocr_result or []:
            if not block:
                continue
            for line in block:
                bbox = line[0]
                text = line[1][0]
                confidence = line[1][1]
                xs = [p[0] for p in bbox]
                ys = [p[1] for p in bbox]

                text_blocks.append({
                    "text": text,
                    "bbox": [min(xs), min(ys), max(xs), max(ys)],
                    "confidence": confidence
                })
        return text_blocks

    @staticmethod
    def _normalize_bbox(bbox, width, height):
        """Scale a pixel ``[x0, y0, x1, y1]`` box to the integer 0-1000 grid."""
        scale = DocumentProcessor._LAYOUT_BOX_SCALE
        # Guard against a degenerate zero-sized image.
        width = max(width, 1)
        height = max(height, 1)

        def to_grid(value, extent):
            # Clamp: OCR corner points can fall slightly outside the image.
            return max(0, min(scale, int(scale * value / extent)))

        x0, y0, x1, y1 = bbox
        return [
            to_grid(x0, width),
            to_grid(y0, height),
            to_grid(x1, width),
            to_grid(y1, height),
        ]

    def _process_with_layout(self, image, text_blocks):
        """Label each OCR text block with LayoutLMv3.

        Args:
            image: The RGB PIL image the blocks were read from.
            text_blocks: Output of ``_extract_text_blocks``.

        Returns:
            One field dict (``id``, ``label``, ``value``, ``confidence``,
            ``bbox`` in original pixel coordinates) for every block whose
            predicted label is not "O". Blocks cut off by truncation to the
            model's maximum sequence length receive no prediction.
        """
        if not text_blocks:
            return []

        width, height = image.size
        encoding = self.layout_processor(
            image,
            text=[block["text"] for block in text_blocks],
            boxes=[self._normalize_bbox(block["bbox"], width, height) for block in text_blocks],
            truncation=True,
            return_tensors="pt"
        )
        # The model predicts per sub-word token; word_ids maps every token back
        # to the index of the text block it was produced from (None for special
        # tokens).
        word_ids = encoding.word_ids(batch_index=0)
        encoding = encoding.to(self.device)

        with torch.no_grad():
            outputs = self.layout_model(**encoding)

        # Softmax once for the whole sequence instead of once per field.
        probabilities = outputs.logits.softmax(-1)[0].cpu()
        token_predictions = probabilities.argmax(-1).tolist()

        fields = []
        labelled_blocks = set()
        for token_idx, block_idx in enumerate(word_ids):
            # A block's label is taken from its first token only.
            if block_idx is None or block_idx in labelled_blocks:
                continue
            labelled_blocks.add(block_idx)

            pred = token_predictions[token_idx]
            label = self.id2label[pred]
            if label == "O":
                continue

            block = text_blocks[block_idx]
            field_type = label[2:]  # strip the "B-" prefix
            fields.append({
                "id": f"{field_type}_{len(fields)}",
                "label": field_type.replace("_", " ").title(),
                "value": block["text"],
                "confidence": float(probabilities[token_idx, pred]),
                "bbox": block["bbox"]
            })

        return fields

    def _enhance_with_spacy(self, fields):
        """Attach spaCy entities to fields and raise their confidence by 0.1.

        Fields whose value contains at least one named entity get an
        ``entities`` list of ``(text, label)`` pairs and a confidence boost
        capped at 1.0. Field dicts are updated in place and returned in a new
        list.
        """
        enhanced_fields = []
        for field in fields:
            doc = self.nlp(field["value"])
            entities = [(ent.text, ent.label_) for ent in doc.ents]
            if entities:
                field["confidence"] = min(1.0, field["confidence"] + 0.1)
                field["entities"] = entities
            enhanced_fields.append(field)
        return enhanced_fields

    def _apply_pattern_matching(self, fields):
        """Tag each field with the first regex pattern type its value matches.

        Patterns are tried from most to least specific; the first hit sets
        ``pattern_match`` and adds a single 0.05 confidence boost (capped at
        1.0). Fields that match nothing are left unchanged. The field dicts are
        updated in place and the same list is returned.
        """
        for field in fields:
            for pattern_type, pattern in self._FIELD_PATTERNS.items():
                if pattern.search(field["value"]):
                    field["pattern_match"] = pattern_type
                    field["confidence"] = min(1.0, field["confidence"] + 0.05)
                    # Stop at the most specific match so a generic pattern
                    # cannot overwrite it or stack additional boosts.
                    break

        return fields

    def _classify_document_type(self, fields):
        """Guess the document type from keywords in field labels and values.

        Returns "invoice", "purchase_order", "receipt" or "unknown"; the
        checks run in that order and the first keyword hit wins.
        """
        text = " ".join(f"{field['label']} {field['value']}" for field in fields).lower()

        if any(word in text for word in ["invoice", "bill to"]):
            return "invoice"
        elif any(word in text for word in ["purchase order", "po number"]):
            return "purchase_order"
        elif any(word in text for word in ["receipt", "merchant"]):
            return "receipt"
        else:
            return "unknown"

    def _calculate_confidence(self, fields):
        """Return the mean ``confidence`` of ``fields``, or 0.0 when empty."""
        if not fields:
            return 0.0
        return sum(field["confidence"] for field in fields) / len(fields)

app = FastAPI()

# Wildcard origins are only valid for credential-less requests: a "*" origin
# cannot be combined with allow_credentials=True. The endpoints in this file
# take no cookies or auth headers, so credentials are switched off.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=False,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Built once at import time so the OCR/NLP/layout models load a single time
# rather than per request.
doc_processor = DocumentProcessor()

# MIME types that the upload endpoint accepts. Only formats PIL's Image.open
# can decode are listed; PDFs would need to be rasterised first.
ALLOWED_MIME_TYPES = ['image/jpeg', 'image/png', 'image/tiff']

@app.post("/process-document")
async def process_document(file: UploadFile = File(...)):
    """Extract fields from an uploaded document image.

    The upload's type is sniffed from its bytes (not from the client-supplied
    content type) and checked against ``ALLOWED_MIME_TYPES``.

    Returns:
        The dict produced by ``doc_processor.process_image``.

    Raises:
        HTTPException: 400 for an unsupported or undecodable upload, 500 for
            any other failure while reading or processing it.
    """
    try:
        content = await file.read()
        file_type = magic.from_buffer(content, mime=True)

        if file_type not in ALLOWED_MIME_TYPES:
            raise HTTPException(status_code=400, detail=f"Unsupported file type: {file_type}")

        try:
            image = Image.open(io.BytesIO(content))
            # Image.open is lazy; force the decode so that a truncated or
            # corrupt file is reported as a client error here.
            image.load()
        except OSError as exc:
            raise HTTPException(
                status_code=400,
                detail=f"Could not decode {file_type} upload: {exc}"
            ) from exc

        return doc_processor.process_image(image)

    except HTTPException:
        # Keep deliberate 4xx responses intact instead of rewrapping them as 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e

@app.get("/health")
async def health_check():
    """Liveness probe: always responds with a fixed "healthy" status payload."""
    return dict(status="healthy")

In [None]:
# Setup ngrok tunnel
from google.colab import userdata
from pyngrok import ngrok

# userdata.get raises (rather than returning None) when the secret is missing
# or the notebook has not been granted access to it, so translate both cases
# into "no token" to reach the setup instructions below.
try:
    NGROK_TOKEN = userdata.get('NGROK_TOKEN')  # Set this in Colab secrets
except (userdata.SecretNotFoundError, userdata.NotebookAccessError):
    NGROK_TOKEN = None

if not NGROK_TOKEN:
    print("Please set your ngrok auth token in Colab secrets!")
    print("1. Go to https://dashboard.ngrok.com/get-started/your-authtoken")
    print("2. Copy your auth token")
    print("3. In Colab: open the Secrets panel (key icon in the left sidebar) -> Add new secret")
    print("4. Name: NGROK_TOKEN, Value: your_token (and enable notebook access)")
    raise ValueError("Ngrok token not found")

ngrok.set_auth_token(NGROK_TOKEN)

# ngrok.connect returns a tunnel object; its public_url attribute holds the
# plain URL string the frontend needs. Port 8000 is the port uvicorn serves on.
tunnel = ngrok.connect(8000)
public_url = tunnel.public_url
print(f"\nBackend URL: {public_url}")
print("\nUpdate your frontend .env file with:")
print(f"VITE_API_URL={public_url}")

In [None]:
# Start the FastAPI server
import nest_asyncio
import uvicorn

# nest_asyncio permits re-entrant use of the asyncio event loop; it has to be
# applied before uvicorn.run, which otherwise cannot start from inside a
# notebook where a loop is presumably already running.
nest_asyncio.apply()
# "app:app" is the `app` object in the app.py file written by the %%writefile
# cell; port 8000 is the port the ngrok tunnel was opened on. This call blocks
# the cell for as long as the server runs.
uvicorn.run("app:app", host="0.0.0.0", port=8000)