In [1]:
!pip install pytesseract

Collecting pytesseract
  Downloading pytesseract-0.3.13-py3-none-any.whl.metadata (11 kB)
Downloading pytesseract-0.3.13-py3-none-any.whl (14 kB)
Installing collected packages: pytesseract
Successfully installed pytesseract-0.3.13


In [2]:
!pip install 'git+https://github.com/facebookresearch/detectron2.git'

Collecting git+https://github.com/facebookresearch/detectron2.git
  Cloning https://github.com/facebookresearch/detectron2.git to /tmp/pip-req-build-4uu6en1p
  Running command git clone --filter=blob:none --quiet https://github.com/facebookresearch/detectron2.git /tmp/pip-req-build-4uu6en1p
  Resolved https://github.com/facebookresearch/detectron2.git to commit a1ce2f956a1d2212ad672e3c47d53405c2fe4312
  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting yacs>=0.1.8 (from detectron2==0.6)
  Downloading yacs-0.1.8-py3-none-any.whl.metadata (639 bytes)
Collecting fvcore<0.1.6,>=0.1.5 (from detectron2==0.6)
  Downloading fvcore-0.1.5.post20221221.tar.gz (50 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m50.2/50.2 kB[0m [31m2.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting iopath<0.1.10,>=0.1.7 (from detectron2==0.6)
  Downloading iopath-0.1.9-py3-none-any.whl.metadata (370 bytes)
Collecting hydra-core>=

In [1]:
!pip install easyocr

Collecting easyocr
  Downloading easyocr-1.7.2-py3-none-any.whl.metadata (10 kB)
Collecting python-bidi (from easyocr)
  Downloading python_bidi-0.6.6-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Collecting pyclipper (from easyocr)
  Downloading pyclipper-1.3.0.post6-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (9.0 kB)
Collecting ninja (from easyocr)
  Downloading ninja-1.13.0-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (5.1 kB)
Downloading easyocr-1.7.2-py3-none-any.whl (2.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.9/2.9 MB[0m [31m36.3 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading ninja-1.13.0-py3-none-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (180 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m180.7/180.7 kB[0m [31m1.7 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyclipper-1.3.0.post6-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (963 k

In [2]:
# Colab-only: mount Google Drive so the dataset and model paths under
# /content/drive used later in this notebook are readable.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
from torch.utils.data import Dataset
from PIL import Image
import os

class ImageDataset(Dataset):
    """Document-level dataset assembled from three line-aligned text files.

    The three files are read in lockstep, one line per word:

    * ``txt_path``: ``word<TAB>label``
    * ``box_path``: tab-separated; the second field holds space-separated
      integers that become the word's box
    * ``img_path``: tab-separated; the last field is the image file name,
      resolved relative to ``img_dir``

    A blank line in ``txt_path`` ends the current document. Each item is the
    processor encoding of one whole document (image, words, boxes, labels).
    """

    def __init__(self,txt_path,img_path,box_path,processor,label2id,img_dir):
        """Load every document eagerly and keep the processor/label map for encoding.

        Args:
            txt_path: Path to the ``word<TAB>label`` file.
            img_path: Path to the file whose last tab field is the image name.
            box_path: Path to the file whose second tab field is the word box.
            processor: Callable used in ``__getitem__`` to encode a document.
            label2id: Mapping from label string to integer class id.
            img_dir: Directory containing the image files.
        """
        self.words,self.labels,self.bboxes,self.images = self._load_data(txt_path,img_path,box_path,img_dir)
        self.processor = processor
        self.label2id = label2id

    def _load_data(self,txt_path,img_path,box_path,img_dir):
        """Parse the three files into parallel per-document lists.

        Returns:
            ``(words_per_doc, labels_per_doc, bboxes_per_doc, images)`` where
            index ``i`` of every list describes the same document.

        Raises:
            ValueError: If a non-blank text line does not split into exactly
                two tab-separated fields, or a box value is not an integer.
        """
        words_per_doc,labels_per_doc,bboxes_per_doc,images = [],[],[],[]
        words,labels,boxes = [],[],[]
        curr_file_name = None
        img = None

        def flush_document():
            # Move the words accumulated so far into the per-document lists.
            nonlocal words,labels,boxes
            if words:
                words_per_doc.append(words)
                labels_per_doc.append(labels)
                bboxes_per_doc.append(boxes)
                images.append(img)
                words,labels,boxes = [],[],[]

        with open(txt_path,'r',encoding='utf-8') as f_text,\
             open(img_path,'r',encoding='utf-8') as f_img,\
             open(box_path,'r',encoding='utf-8') as f_box:

            # zip stops at the shortest file, so the three files are expected
            # to have the same number of lines.
            for (line_text,line_img,line_box) in zip(f_text,f_img,f_box):
                if line_text.strip() == "":
                    flush_document()
                    continue

                word,label = line_text.strip().split("\t")
                word_box = list(map(int, line_box.strip().split("\t")[1].split()))
                filename = line_img.strip().split("\t")[-1]

                # Only reopen the image when the file name changes, i.e. once
                # per run of consecutive lines that share an image.
                if curr_file_name != filename:
                    curr_file_name = filename
                    img = Image.open(os.path.join(img_dir,filename)).convert("RGB")

                words.append(word)
                labels.append(label)
                boxes.append(word_box)

        # A file that does not end with a blank separator line would otherwise
        # lose its final document.
        flush_document()

        print("Dataset loaded successfully")
        return words_per_doc,labels_per_doc,bboxes_per_doc,images

    def __len__(self):
        """Number of documents loaded."""
        return len(self.words)

    def __getitem__(self,idx):
        """Encode document ``idx`` with the processor.

        Returns:
            Dict of the processor's outputs with the leading batch dimension
            removed (the processor is asked for tensors padded/truncated to
            512 tokens).
        """
        encoding = self.processor(
            self.images[idx],
            self.words[idx],
            boxes=self.bboxes[idx],
            word_labels=[self.label2id[label] for label in self.labels[idx]],
            truncation=True,
            padding="max_length",
            max_length=512,
            return_tensors="pt"
        )
        return {k:v.squeeze(0) for k,v in encoding.items()}

In [4]:
from transformers import LayoutLMv2ForTokenClassification
import torch

class LayoutLM_Model:
    """Inference wrapper around a fine-tuned LayoutLMv2 token-classification model.

    Loads the model from ``model_path`` on construction, moves it to CUDA when
    available (CPU otherwise) and puts it in eval mode.
    """

    def __init__(self, model_path,num_labels,id2label,label2id,processor,dataset):
        """Store the configuration and load the model immediately.

        Args:
            model_path: Location passed to ``from_pretrained``.
            num_labels: Number of token classes.
            id2label: Mapping from class id to label string.
            label2id: Mapping from label string to class id.
            processor: Callable that encodes (image, words, boxes); its result
                must expose ``word_ids(batch_index=...)``.
            dataset: Object indexable by sample id that also exposes
                ``images``, ``words`` and ``bboxes`` lists.
        """
        print("Initializing LayoutLM_Model....")
        self.model_path = model_path
        self.num_labels = num_labels
        self.id2label = id2label
        self.label2id = label2id
        self.processor = processor
        self.dataset = dataset
        self.device =  "cuda" if torch.cuda.is_available() else "cpu"
        self.model = None
        self._load_model()

    def _load_model(self):
        """Load the checkpoint, move it to ``self.device`` and switch to eval mode."""
        self.model = LayoutLMv2ForTokenClassification.from_pretrained(
            self.model_path,
            num_labels=self.num_labels,
            id2label=self.id2label,
            label2id=self.label2id
        ).to(self.device)
        self.model.eval()
        print("Model loaded successfully")

    def model_infer(self,sample_idx):
        """Run inference on one dataset sample and collect the words per field.

        Args:
            sample_idx: Index of the sample in ``self.dataset``.

        Returns:
            Dict with keys ``company``, ``address``, ``total`` and ``date``;
            each value is the space-joined words predicted for that field, in
            token order (empty string when nothing was predicted).

        Raises:
            ValueError: If the model has not been loaded.
        """
        if self.model is None:
            raise ValueError("Model is not loaded")

        sample = self.dataset[sample_idx]
        # The model is called without gold labels; re-add the batch dimension
        # that the dataset removed.
        inputs = {k: v.unsqueeze(0).to(self.device) for k, v in sample.items() if k != "labels"}

        with torch.inference_mode():
            outputs = self.model(**inputs)
        # Plain Python ints, so they index ``id2label`` directly.
        preds = outputs.logits.argmax(-1).squeeze(0).cpu().tolist()

        # The dataset item is a plain dict of tensors, so the sample is encoded
        # again to recover the token -> word alignment.
        encoding = self.processor(
            self.dataset.images[sample_idx],
            self.dataset.words[sample_idx],
            boxes=self.dataset.bboxes[sample_idx],
            truncation=True,
            padding="max_length",
            max_length=512,
            return_tensors="pt"
        )
        # Loop-invariant: compute the alignment once instead of per token.
        word_ids = encoding.word_ids(batch_index=0)
        doc_words = self.dataset.words[sample_idx]

        collected = {"company": [], "address": [], "total": [], "date": []}
        # De-duplicate on word *index*: several sub-word tokens map to the same
        # word and must add it only once, while a word whose text legitimately
        # repeats inside a field (e.g. a number appearing twice in an address)
        # must be kept each time it occurs.
        seen = {field: set() for field in collected}

        for word_idx, label_id in zip(word_ids, preds):
            if word_idx is None:
                continue  # Skip special/padding tokens

            label = self.id2label[label_id]
            if label == "O":
                continue

            key = label.replace("S-", "").lower()
            if key not in collected or word_idx in seen[key]:
                continue

            seen[key].add(word_idx)
            collected[key].append(doc_words[word_idx])

        return {field: " ".join(parts).strip() for field, parts in collected.items()}


In [22]:
import google.generativeai as genai
import json
import os

class DocumentReasoningAgent:
    """LLM-backed agent that repairs extracted receipt fields or explains a rejection.

    Prompts are sent to a Gemini model through ``google.generativeai`` and the
    reply is parsed as JSON.
    """

    def __init__(self, api_key: str, model_name: str = "gemini-2.0-flash"):
        """Configure the Gemini client and create the generative model.

        Args:
            api_key: Gemini API key. It is also written to the
                ``GEMINI_API_KEY`` environment variable of this process.
            model_name: Name of the Gemini model to use.
        """
        os.environ['GEMINI_API_KEY'] = api_key
        genai.configure(api_key=os.environ['GEMINI_API_KEY'])
        self.model = genai.GenerativeModel(model_name)

    def build_prompt(self, extracted_json: dict, ocr_text: str):
        """Build the field-repair prompt from the extracted fields and the OCR text.

        Args:
            extracted_json: Structured fields extracted from the receipt.
            ocr_text: Plain OCR text of the receipt.

        Returns:
            The prompt string.
        """
        prompt = f"""
        You are a document reasoning agent, specialized in receipts.
        You will be given:
        1. Extracted structured json from a receipt.
        2. The OCR text of the receipt.

        Tasks:
        - Detect any missing or inconsistent fields/data in the structured json.
        - Identify possible vendor names from context if extraction failed or missing from json.
        - Correct any obvious inconsistencies (e.g date formats, totals)
        - Add an "agent_comment" summarizing what u fixed or inferred.
        - Respond ONLY in JSON format with these keys:

        {{
            "company": "...",
            "date": "...",
            "address": "...",
            "total": "...",
            "agent_comment": "..."
        }}

        Example Input:
        {{
            "company": "",
            "date": "2020-10-02",
            "address": "xyz street",
            "total": ""
        }}

        Example OCR Text:
        "McDonald's\n02/10/2020\nShop 12, Main Street\nTotal: 15.90"

        Example Output:
        {{
            "company": "McDonald's",
            "date": "02/10/2020",
            "address": "Shop 12, Main Street",
            "total": "15.90",
            "agent_comment": "The company name and total were inferred from the OCR text."
        }}

        Extracted JSON: {json.dumps(extracted_json,indent=2)}
        OCR Text: {ocr_text}
        """
        return prompt

    def build_rejection_prompt(self, reason: str, metrics: dict):
        """Build the prompt asking the model to explain why a document was rejected.

        Args:
            reason: Short rejection reason echoed into the response template.
            metrics: JSON-serialisable detector metrics.

        Returns:
            The prompt string.
        """
        # Encode the reason as a JSON string literal so quotes, backslashes or
        # newlines inside it cannot break the response template. For plain text
        # this produces exactly the same prompt as naive interpolation.
        reason_json = json.dumps(reason, ensure_ascii=False)
        prompt = f"""
        You are a document validation agent.

        The provided document image has been flagged as potentially unreadable or invalid
        due to image quality issues such as watermarks, obstructions, or black overlays.

        Metrics from the vision detector:
        {json.dumps(metrics, indent=2)}

        Task:
        - Review the metrics and reasoning summary.
        - Return a JSON response explaining clearly *why* the document was rejected.

        Respond ONLY in JSON format:
        {{
            "status": "rejected",
            "reason": {reason_json},
            "agent_comment": "..."
        }}
        """
        return prompt

    def _generate_json_response(self, prompt: str):
        """Send ``prompt`` to the model and parse the reply as JSON.

        Returns:
            The parsed JSON value on success. Otherwise a dict with an
            ``"error"`` message and the ``"raw_output"`` text that could not
            be used.
        """
        response = self.model.generate_content(prompt)
        try:
            raw_output = response.candidates[0].content.parts[0].text.strip()
        except (IndexError, AttributeError):
            # No candidate or no text part came back; report it the same way
            # as an unparsable reply instead of crashing the caller.
            return {"error": "Empty model response", "raw_output": ""}

        if raw_output.startswith("```"):
            raw_output = raw_output.strip("`").strip()
            # Drop only the language tag of the opening fence. Removing every
            # "json" substring would corrupt values that contain that word.
            if raw_output[:4].lower() == "json":
                raw_output = raw_output[4:]
            raw_output = raw_output.strip()

        try:
            result = json.loads(raw_output)
        except json.JSONDecodeError:
            result = {"error": "Invalid JSON response", "raw_output": raw_output}
        return result

    def infer(self, extracted_json: dict, ocr_text: str):
        """Ask the model to validate and repair the extracted receipt fields.

        Returns:
            The parsed model reply; when it carries a string ``"address"``,
            newlines in it are replaced by spaces.
        """
        prompt = self.build_prompt(extracted_json, ocr_text)
        result = self._generate_json_response(prompt)
        if isinstance(result, dict) and "address" in result and isinstance(result["address"], str):
            result["address"] = result["address"].replace("\n", " ").strip()
        return result

    def infer_rejection(self, reason: str, metrics: dict):
        """Ask the model to explain a rejection; returns the parsed reply."""
        prompt = self.build_rejection_prompt(reason, metrics)
        return self._generate_json_response(prompt)


In [23]:
import cv2
import numpy as np
from PIL import Image

class WatermarkDetector:
    """Heuristic check for images dominated by dark pixels or large dark regions."""

    def __init__(self, dark_threshold=30, dark_ratio_threshold=0.3, min_rect_area=5000):
        """
        Args:
            dark_threshold: Gray level below which a pixel counts as dark.
            dark_ratio_threshold: Fraction of dark pixels above which the image
                is flagged.
            min_rect_area: Contour area (in pixels) above which a single dark
                region flags the image on its own.
        """
        self.dark_threshold = dark_threshold
        self.dark_ratio_threshold = dark_ratio_threshold
        self.min_rect_area = min_rect_area

    def is_obscured(self, img):
        """Decide whether ``img`` looks obscured.

        Args:
            img: PIL image (treated as RGB) or NumPy array (a 3-dimensional
                array is treated as BGR; any other shape is used as grayscale).

        Returns:
            ``(flagged, metrics)``. ``metrics`` holds ``dark_ratio`` (fraction
            of dark pixels) and ``large_rects`` (number of dark contours larger
            than ``min_rect_area``), or an ``error`` message when no usable
            image was given.
        """
        if img is None:
            return False, {"error": "image not provided or invalid"}

        # PIL arrays are RGB while OpenCV's default channel order is BGR;
        # remember the source so the grayscale weights hit the right channels.
        from_pil = isinstance(img, Image.Image)
        if from_pil:
            img = np.array(img)

        # Convert to grayscale if needed
        if len(img.shape) == 3:
            conversion = cv2.COLOR_RGB2GRAY if from_pil else cv2.COLOR_BGR2GRAY
            gray = cv2.cvtColor(img, conversion)
        else:
            gray = img

        total_pixels = gray.size
        if total_pixels == 0:
            # Nothing to measure; avoid dividing by zero.
            return False, {"error": "image not provided or invalid"}

        # Fraction of pixels darker than the threshold, as a plain float.
        dark_ratio = float(np.sum(gray < self.dark_threshold)) / total_pixels
        flagged = dark_ratio > self.dark_ratio_threshold

        # Additionally flag the image when any single connected dark region is
        # large, even if the overall dark ratio is low.
        _, thresh = cv2.threshold(gray, self.dark_threshold, 255, cv2.THRESH_BINARY_INV)
        contours, _ = cv2.findContours(thresh, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
        large_rects = sum(1 for c in contours if cv2.contourArea(c) > self.min_rect_area)
        if large_rects > 0:
            flagged = True

        return flagged, {"dark_ratio": dark_ratio, "large_rects": large_rects}


In [24]:
import easyocr
from PIL import Image
import numpy as np

class ReceiptOCR:
    """Thin wrapper around an EasyOCR reader that returns receipt text."""

    def __init__(self, lang_list=None, gpu=True):
        """
        Args:
            lang_list: Language codes for the reader; defaults to ``['en']``.
            gpu: Passed through to ``easyocr.Reader``.
        """
        # Build a fresh list per instance: a list literal as the default
        # argument would be one shared object handed to every reader.
        languages = ['en'] if lang_list is None else list(lang_list)
        self.reader = easyocr.Reader(languages, gpu=gpu)

    def extract_text(self, image):
        """Run OCR on a receipt image (PIL or NumPy).

        Returns:
            Dict with ``"ocr_text"`` (all recognised lines joined by newlines)
            and ``"lines"`` (the recognised strings in the order the reader
            returned them).
        """
        # The reader is given a NumPy array, so convert PIL images first.
        if isinstance(image, Image.Image):
            image = np.array(image)

        # With detail=1 each result is a sequence whose second element is the
        # recognised text.
        results = self.reader.readtext(image, detail=1)
        text_lines = [res[1] for res in results]

        return {
            "ocr_text": "\n".join(text_lines).strip(),
            "lines": text_lines,
        }


In [25]:
from transformers import LayoutLMv2Processor

# Tag set of the token classifier; a label's position in this list is its class id.
LABEL_NAMES = ["O", "S-COMPANY", "S-DATE", "S-ADDRESS", "S-TOTAL"]
label2id = {name: class_id for class_id, name in enumerate(LABEL_NAMES)}
id2label = dict(enumerate(LABEL_NAMES))

# Words and boxes are supplied from the annotation files, so the processor's
# built-in OCR is switched off.
processor = LayoutLMv2Processor.from_pretrained(
    "microsoft/layoutlmv2-base-uncased",
    apply_ocr=False,
)

# Every artefact lives under one Drive folder; change the root here to relocate.
PROJECT_ROOT = "/content/drive/MyDrive/UnikrewTest"
TEST_SPLIT_DIR = PROJECT_ROOT + "/processed_data/test"

test_dataset = ImageDataset(
    txt_path=TEST_SPLIT_DIR + "/test.txt",
    img_path=TEST_SPLIT_DIR + "/test_img.txt",
    box_path=TEST_SPLIT_DIR + "/test_box.txt",
    processor=processor,
    label2id=label2id,
    img_dir=PROJECT_ROOT + "/dataset/test/img",
)

inference_model = LayoutLM_Model(
    model_path=PROJECT_ROOT + "/modules/model",
    num_labels=len(LABEL_NAMES),
    id2label=id2label,
    label2id=label2id,
    processor=processor,
    dataset=test_dataset,
)

Dataset loaded successfully
Initializing LayoutLM_Model....
Model loaded successfully


In [26]:
# Instantiate the OCR reader (defaults: English, GPU enabled) and the
# dark-region detector (default thresholds) for use by the pipeline cell.
receipt_ocr = ReceiptOCR()
detector = WatermarkDetector()

In [30]:
# Index of the test document to run through the pipeline.
idx = 0
# Indexing the dataset runs the processor and yields the encoded tensors for this document.
sample = test_dataset[idx]
# The RGB PIL image that was loaded for the same document.
sample_img = test_dataset.images[idx]

In [31]:
import getpass
import os

# SECURITY: the API key must never be stored in the notebook. It is read from
# the GEMINI_API_KEY environment variable, with an interactive prompt as the
# fallback. Any key that was previously committed here should be revoked.
gemini_api_key = os.environ.get("GEMINI_API_KEY") or getpass.getpass("Gemini API key: ")
agent = DocumentReasoningAgent(api_key=gemini_api_key)

# Gate the pipeline on image quality: obscured images are rejected with an
# explanation, readable ones go through extraction + OCR + LLM repair.
flagged, metrics = detector.is_obscured(sample_img)
if flagged:
    result = agent.infer_rejection(
        reason="Detected possible watermark or dark obstructions over text regions.",
        metrics=metrics,
    )
else:
    extracted_json = inference_model.model_infer(idx)
    ocr_text = receipt_ocr.extract_text(sample_img)
    # extract_text returns a dict; the agent expects the plain OCR string, not
    # the repr of the whole dict.
    result = agent.infer(extracted_json, ocr_text["ocr_text"])

print(result)

{'company': 'OJC MARKETING SDN BHD', 'date': '15/01/2019', 'address': 'NO 2 & 4 JALAN BAYU 4_, BANDAR SERI ALAM, 81750 MASAI; JOHOR', 'total': '193.00', 'agent_comment': 'The company name and address were inferred from the OCR text. The total and date were already present in the extracted JSON and matched the OCR text.'}
