In [None]:
from paddleocr import PaddleOCR
import pypdfium2 as pdfium
from PIL import Image, ImageEnhance
import numpy as np
import cv2
import matplotlib.pyplot as plt
import os
import pandas as pd

# NOTE(review): presumably a workaround for the "duplicate OpenMP runtime"
# abort that can occur when several imported libraries bundle their own
# OpenMP copy -- TODO confirm it is still required in this environment.
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

In [None]:
def convertPdfToImage(path):
    """Render every page of a PDF to a grayscale PIL image.

    Note: a later cell in this notebook defines another function with the
    same name (taking ``pdf_bytes``); once that cell has run, it replaces
    this definition.

    Args:
        path: Input handed straight to ``pdfium.PdfDocument``.

    Returns:
        A list with one PIL image per page, in page order (empty when the
        document has no pages).
    """
    document = pdfium.PdfDocument(path)

    def _render(index):
        # Scale 1, no rotation, no cropping, grayscale output.
        bitmap = document.get_page(index).render(
            scale=1,
            rotation=0,
            crop=(0, 0, 0, 0),
            grayscale=True,
        )
        return bitmap.to_pil()

    return [_render(index) for index in range(len(document))]

In [None]:
# COMMON LIBRARIES
import os
import cv2

from datetime import datetime

# DATA SET PREPARATION AND LOADING
from detectron2.data.datasets import register_coco_instances
from detectron2.data import DatasetCatalog, MetadataCatalog

# VISUALIZATION
from detectron2.utils.visualizer import Visualizer
from detectron2.utils.visualizer import ColorMode

# CONFIGURATION
from detectron2 import model_zoo
from detectron2.config import get_cfg

# EVALUATION
from detectron2.engine import DefaultPredictor

# TRAINING
from detectron2.engine import DefaultTrainer

import numpy as np
import cv2

# HYPERPARAMETERS
# These values are copied into the detectron2 config by ``Detector.__init__``.
ARCHITECTURE = "mask_rcnn_R_101_FPN_3x"
CONFIG_FILE_PATH = f"COCO-InstanceSegmentation/{ARCHITECTURE}.yaml"
MAX_ITER = 3000
EVAL_PERIOD = 200
BASE_LR = 0.001
NUM_CLASSES = 3

# DATASET LOCATIONS
# Single place to change when the dataset lives somewhere else.
DATASET_ROOT = "C:/Users/Arpit/Downloads/Signature Extractor.v1i.coco"

train_images_path = f"{DATASET_ROOT}/train"
test_images_path = f"{DATASET_ROOT}/test"
val_images_path = f"{DATASET_ROOT}/valid"

train_annotation_path = f"{DATASET_ROOT}/annotations/train_annotations.coco.json"
test_annotation_path = f"{DATASET_ROOT}/annotations/test_annotations.coco.json"
val_annotation_path = f"{DATASET_ROOT}/annotations/valid_annotations.coco.json"

train_dataset_name = "train_dataset"
test_dataset_name = "test_dataset"
val_dataset_name = "valid_dataset"

# Register the three COCO-format splits with detectron2.
from detectron2.data.datasets import register_coco_instances

# Registering a name twice fails, which breaks re-running this cell in a live
# kernel; only register the splits that are not in the catalog yet.
for _split_name, _split_annotations, _split_images in (
    (train_dataset_name, train_annotation_path, train_images_path),
    (test_dataset_name, test_annotation_path, test_images_path),
    (val_dataset_name, val_annotation_path, val_images_path),
):
    if _split_name not in DatasetCatalog.list():
        register_coco_instances(_split_name, {}, _split_annotations, _split_images)

# Metadata of the training split is reused by ``Detector`` for visualisation.
metadata = MetadataCatalog.get(train_dataset_name)
dataset_train = DatasetCatalog.get(train_dataset_name)

class Detector:
    """Signature detector built on a detectron2 ``DefaultPredictor``.

    The configuration starts from the model-zoo config named by
    ``CONFIG_FILE_PATH`` and is then adjusted with the notebook-level
    hyperparameters and dataset names before the predictor is created.
    """

    def __init__(self, weights_path="model_final.pth", score_threshold=0.5, device='cpu'):
        """Build the config and load the trained weights.

        Args:
            weights_path: Checkpoint assigned to ``cfg.MODEL.WEIGHTS``.
            score_threshold: Value for ``cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST``.
            device: Value for ``cfg.MODEL.DEVICE``.
        """
        self.cfg = get_cfg()
        self.cfg.merge_from_file(model_zoo.get_config_file(CONFIG_FILE_PATH))

        # Dataset / solver settings mirror the ones used elsewhere in the notebook.
        self.cfg.DATASETS.TRAIN = (train_dataset_name,)
        self.cfg.DATASETS.TEST = (test_dataset_name,)
        self.cfg.MODEL.ROI_HEADS.BATCH_SIZE_PER_IMAGE = 128
        self.cfg.TEST.EVAL_PERIOD = EVAL_PERIOD
        self.cfg.DATALOADER.NUM_WORKERS = 2
        self.cfg.SOLVER.IMS_PER_BATCH = 2
        self.cfg.INPUT.MASK_FORMAT = 'bitmask'
        self.cfg.SOLVER.BASE_LR = BASE_LR
        self.cfg.SOLVER.MAX_ITER = MAX_ITER
        self.cfg.MODEL.ROI_HEADS.NUM_CLASSES = NUM_CLASSES

        # Inference settings. The weights and device are assigned exactly once;
        # the original code first set the model-zoo checkpoint URL and then
        # immediately overwrote it with the local checkpoint.
        self.cfg.MODEL.ROI_HEADS.SCORE_THRESH_TEST = score_threshold
        self.cfg.MODEL.DEVICE = device
        self.cfg.MODEL.WEIGHTS = weights_path
        self.predictor = DefaultPredictor(self.cfg)

    def onImage_path(self, image_path):
        """Run the predictor on an image file and draw the predictions.

        Args:
            image_path: Path read with ``cv2.imread``.

        Returns:
            The object returned by ``Visualizer.draw_instance_predictions``.

        Raises:
            FileNotFoundError: If ``cv2.imread`` could not load the image.
        """
        image = cv2.imread(image_path)
        if image is None:
            # cv2.imread signals failure by returning None instead of raising.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        predictions = self.predictor(image)
        # The channel order is reversed before handing the image to the Visualizer.
        viz = Visualizer(image[:, :, ::-1], metadata=metadata, instance_mode=ColorMode.IMAGE_BW)
        output = viz.draw_instance_predictions(predictions["instances"].to("cpu"))
        return output

    def onImage(self, image):
        """Run the predictor on an in-memory image.

        Args:
            image: Image passed unchanged to the predictor, or ``None``.

        Returns:
            The raw predictor output, or ``None`` when ``image`` is ``None``.
        """
        if image is None:
            return None
        return self.predictor(image)

In [None]:
def convertPdfToImage(pdf_bytes):
    """Render the last page of a PDF as a colour PIL image.

    The previous implementation rendered every page in a loop but converted
    only the bitmap left over after the loop, i.e. the last page, and failed
    with an unbound local when the document had no pages. This version
    renders just that page and handles the empty document explicitly.

    Args:
        pdf_bytes: Input handed straight to ``pdfium.PdfDocument`` (the
            caller in this notebook passes a file path).

    Returns:
        The rendered last page as a PIL image, or ``None`` when the document
        contains no pages.
    """
    pdf = pdfium.PdfDocument(pdf_bytes)
    n_pages = len(pdf)
    if n_pages == 0:
        return None

    # Only the last page was ever returned, so only that page is rendered.
    page = pdf.get_page(n_pages - 1)
    bitmap = page.render(
        scale=1,
        rotation=0,
        crop=(0, 0, 0, 0),
        grayscale=False
    )
    return bitmap.to_pil()

In [None]:
# OCR engine loaded from local detection / recognition model directories,
# running without GPU.
ocr = PaddleOCR(
    det_model_dir='C:/Users/Arpit/Downloads/en_PP-OCRv3_det_distill_train',
    rec_model_dir='C:/Users/Arpit/Downloads/en_PP-OCRv3_rec_train',
    use_gpu=False,
)

# Signature detector; its constructor loads the trained weights.
detector = Detector()

In [None]:
def get_target_text(image, target_text):
    """Locate the first OCR text fragment that contains ``target_text``.

    Args:
        image: Image passed unchanged to ``ocr.ocr``.
        target_text: Substring to look for (case-sensitive ``in`` test).

    Returns:
        The bounding box of the first matching fragment as an ``int32``
        numpy array built from the OCR box, or ``None`` when nothing matches
        or the OCR produced no text.
    """
    result = ocr.ocr(image, cls=False)

    # The OCR result may be empty, or contain ``None`` entries when no text
    # was detected; both cases must not crash the search.
    for line in result or []:
        if not line:
            continue
        for word_info in line:
            box = word_info[0]
            text = str(word_info[1][0])
            if target_text in text:
                return np.array(box, dtype=np.int32)
    return None

def get_pensioner_sign_area(image, bottom_margin=50):
    """Crop the region of ``image`` expected to hold the pensioner signature.

    The region is anchored on the bounding box of the text "affixed": it
    starts at the x coordinate of the box's third corner, extends to the right
    edge of the image, and runs from the top of the image down to
    ``bottom_margin`` pixels below that corner's y coordinate.
    NOTE(review): presumably the third corner is the bottom-right one --
    confirm against the OCR engine's box ordering.

    Args:
        image: Array indexed as ``image[y, x]`` with a ``shape`` attribute.
        bottom_margin: Pixels added below the anchor corner (default 50).

    Returns:
        ``(cropped, roi)`` where ``roi`` is ``[xmin, xmax, ymin, ymax]`` as
        plain ints clamped to the image bounds, or ``(None, None)`` when the
        anchor text was not found.
    """
    affixed_bbox = get_target_text(image, "affixed")
    if affixed_bbox is None:
        # Anchor text missing: let the caller report "ROI not found".
        return None, None

    height, width = image.shape[:2]
    anchor_x, anchor_y = affixed_bbox[2]

    # Plain ints keep the ROI readable when it is formatted into the CSV log,
    # and clamping makes the reported ROI match the pixels actually cropped.
    xmin = max(int(anchor_x), 0)
    xmax = int(width)
    ymin = 0
    ymax = min(int(anchor_y) + bottom_margin, int(height))

    roi = [xmin, xmax, ymin, ymax]
    cropped = image[ymin:ymax, xmin:xmax]
    return cropped, roi

In [None]:
# Folder holding the scanned certificates to process ("Paralell" is the
# folder's name on disk).
parallel_path = "C:/Users/Arpit/Downloads/SCANNED LIFE CERTIFICATES/SCANNED LIFE CERTIFICATES/Paralell"

# os.listdir returns entries in arbitrary order; sort them so every run
# processes the files in the same, reproducible order.
pdfs_in_parallel = sorted(os.listdir(parallel_path))

In [None]:
import os
import csv
import numpy as np
import io

# Path of the CSV log: one row per processed PDF.
csv_file_path = 'final_stats.csv'

# Five columns; the original header was missing a comma, which silently fused
# 'Signature_box' and 'Error Message' into a single column.
CSV_HEADER = ['PDF File', 'Status', 'ROI', 'Signature_box', 'Error Message']


def _log_result(pdf_name, status, roi='NA', signature_box='NA', message=''):
    """Append one five-column result row for ``pdf_name`` to the CSV log."""
    with open(csv_file_path, mode='a', newline='') as file:
        csv.writer(file).writerow([pdf_name, status, roi, signature_box, message])


# Create the CSV with its header when it is missing or empty.
if not os.path.exists(csv_file_path) or os.path.getsize(csv_file_path) == 0:
    with open(csv_file_path, mode='w', newline='') as file:
        csv.writer(file).writerow(CSV_HEADER)

# Collect the files already logged (successful or failed) so a re-run resumes
# where the previous one stopped.
processed_files = set()
with open(csv_file_path, mode='r', newline='') as file:
    reader = csv.reader(file)
    next(reader, None)  # Skip header; tolerate a file without one
    for row in reader:
        if row:  # Ignore blank lines
            processed_files.add(row[0])

# Process every file found in the input folder, one after another.
for pdf in pdfs_in_parallel:

    if pdf in processed_files:
        # Skip this file as it has already been processed
        continue

    path = os.path.join(parallel_path, pdf)
    try:
        page_image = convertPdfToImage(path)

        # The None check must happen before the numpy conversion:
        # np.array(None) is an array object and is never None.
        if page_image is None:
            print("image None")
            _log_result(pdf, 'Failed', message='Error in converting PDF to image')
            continue
        image = np.array(page_image)
        # NOTE(review): the array comes from a PIL image, while Detector.onImage_path
        # feeds cv2.imread output to the same predictor -- confirm the channel
        # order expected by the OCR engine and the detector.

        cropped, roi = get_pensioner_sign_area(image)
        if roi is None or roi == []:
            # ROI not found, OCR error
            print("roi none")
            _log_result(pdf, 'Failed', message='ROI not found or OCR error')
            continue

        if cropped is None or cropped.size == 0:
            # Nothing usable was cropped from the signature area
            print("cropped none")
            _log_result(pdf, 'Failed', message='Error in cropping the signature area')
            continue
        print(cropped.shape)

        outputs = detector.onImage(cropped)
        print(outputs)

        # Check if outputs contain predictions
        if outputs is None or outputs['instances'].pred_boxes.tensor.size(0) == 0:
            print("pred none")
            _log_result(pdf, 'Failed', f'{roi}', 'NA', 'Signature not detected')
            continue

        pred_boxes = outputs['instances'].pred_boxes.tensor.tolist()
        pred_classes = outputs['instances'].pred_classes.numpy()
        print(pred_boxes)

        # If everything is successful, log success
        _log_result(pdf, 'Success', f'{roi}', f'{pred_boxes}', 'Processed successfully')

    except Exception as e:
        # Best-effort batch run: record the failure and continue with the next
        # file. The original row was missing a comma, which fused 'NA' with the
        # error message and produced a four-column row.
        _log_result(pdf, 'Failed', 'NA', 'NA', f'Unexpected error: {str(e)}')