# Importing Libraries

In [None]:
import os
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.models.vgg import vgg16
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import matplotlib.pyplot as plt
import time
import copy
from tqdm import tqdm

In [None]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f"Using device: {device}")

# Data Preprocessing

In [None]:
# Expected layout on disk: <DATA_ROOT>/<split>/images and <DATA_ROOT>/<split>/labels
DATA_ROOT = 'signature_data'
TRAIN_IMAGES_DIR, TRAIN_LABELS_DIR = (
    os.path.join(DATA_ROOT, 'train', kind) for kind in ('images', 'labels')
)
TEST_IMAGES_DIR, TEST_LABELS_DIR = (
    os.path.join(DATA_ROOT, 'test', kind) for kind in ('images', 'labels')
)

# Create custom dataset
class SignatureDataset(Dataset):
    """Detection dataset pairing each image with a single signature box.

    Each image in ``image_dir`` is matched with a text file of the same stem
    in ``label_dir`` that holds four whitespace-separated numbers
    ``x1 y1 x2 y2``.  A label whose largest value exceeds 1 is treated as
    pixel coordinates and normalised by the image size; otherwise the values
    are taken to be already normalised to ``[0, 1]``.

    Args:
        image_dir: Directory containing ``.jpg``/``.png``/``.jpeg`` images.
        label_dir: Directory containing the matching ``.txt`` label files.
        transform: Optional callable applied to the loaded PIL image.
    """

    def __init__(self, image_dir, label_dir, transform=None):
        self.image_dir = image_dir
        self.label_dir = label_dir
        self.transform = transform
        # os.listdir returns entries in arbitrary order; sort so that a given
        # idx (and therefore image_id) always refers to the same file.
        self.image_files = sorted(
            f for f in os.listdir(image_dir)
            if f.endswith(('.jpg', '.png', '.jpeg'))
        )

    def __len__(self):
        """Return the number of images found in ``image_dir``."""
        return len(self.image_files)

    @staticmethod
    def _read_normalized_boxes(label_path, width, height):
        """Read one label file and return ``[[x1, y1, x2, y2]]`` in [0, 1] units.

        A missing, non-numeric or wrongly sized label yields a full-image
        placeholder box (with a printed warning) so the sample still carries
        a target instead of aborting the whole epoch.
        """
        placeholder = [[0.0, 0.0, 1.0, 1.0]]
        try:
            with open(label_path, 'r') as f:
                coords = [float(token) for token in f.read().split()]
        except FileNotFoundError:
            print(f"Warning: Label file not found: {label_path}")
            return placeholder
        except ValueError:
            # A non-numeric token is just another form of malformed label.
            print(f"Warning: Incorrect format in {label_path}")
            return placeholder

        if len(coords) != 4:
            print(f"Warning: Incorrect format in {label_path}")
            return placeholder

        x1, y1, x2, y2 = coords
        # Heuristic: any value above 1 means the file stores pixel coordinates.
        if max(coords) > 1:
            x1 /= width
            x2 /= width
            y1 /= height
            y2 /= height
        return [[x1, y1, x2, y2]]

    def __getitem__(self, idx):
        """Return ``(image, target)`` for the sample at position ``idx``.

        ``target`` is a dict with ``boxes`` (pixel ``[x1, y1, x2, y2]`` in the
        coordinate frame of the returned image), ``labels`` (all 1 =
        signature), ``image_id``, ``area`` and ``iscrowd``.
        """
        img_name = self.image_files[idx]
        image = Image.open(os.path.join(self.image_dir, img_name)).convert("RGB")
        orig_width, orig_height = image.size

        # The label file shares the image's stem and has a .txt extension.
        label_name = os.path.splitext(img_name)[0] + '.txt'
        label_path = os.path.join(self.label_dir, label_name)
        boxes = torch.as_tensor(
            self._read_normalized_boxes(label_path, orig_width, orig_height),
            dtype=torch.float32,
        )

        if self.transform:
            image = self.transform(image)

        # Size of the image actually returned: a tensor after the transform,
        # or still a PIL image (which has .size but no .shape) without one.
        if hasattr(image, 'shape'):
            h, w = image.shape[-2:]
        else:
            w, h = image.size

        # Scale the normalised boxes into the returned image's pixel frame.
        boxes[:, [0, 2]] *= w
        boxes[:, [1, 3]] *= h

        target = {
            "boxes": boxes,
            "labels": torch.ones((len(boxes),), dtype=torch.int64),  # Class 1 for signature
            "image_id": torch.tensor([idx]),
            # Area is computed from the pixel boxes so it agrees with "boxes".
            "area": (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0]),
            "iscrowd": torch.zeros((len(boxes),), dtype=torch.int64),
        }
        return image, target

In [None]:
# Shared preprocessing: fixed 800x800 resize, tensor conversion, then
# normalisation with the ImageNet channel statistics.
data_transform = transforms.Compose([
    transforms.Resize(size=(800, 800)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Build the train split first, then the test split, with identical preprocessing.
train_dataset, test_dataset = (
    SignatureDataset(images_dir, labels_dir, transform=data_transform)
    for images_dir, labels_dir in (
        (TRAIN_IMAGES_DIR, TRAIN_LABELS_DIR),
        (TEST_IMAGES_DIR, TEST_LABELS_DIR),
    )
)

# Collate function for handling batches of variable size
def collate_fn(batch):
    """Transpose a list of samples into per-field tuples.

    A batch ``[(img0, tgt0), (img1, tgt1), ...]`` becomes
    ``((img0, img1, ...), (tgt0, tgt1, ...))`` without stacking anything.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# Create data loaders
# Training batches: two images at a time, reshuffled on every pass.
train_loader = DataLoader(
    dataset=train_dataset,
    collate_fn=collate_fn,
    num_workers=4,
    shuffle=True,
    batch_size=2,
)

# Test batches: one image at a time, in dataset order.
test_loader = DataLoader(
    dataset=test_dataset,
    collate_fn=collate_fn,
    num_workers=4,
    shuffle=False,
    batch_size=1,
)

# Build Faster R-CNN model with VGG16 backbone
def get_model(num_classes=2):  # Background + Signature
    """Build a Faster R-CNN detector on an ImageNet-pretrained VGG16 backbone.

    Args:
        num_classes: Number of output classes including background
            (default 2: background + signature).

    Returns:
        A ``FasterRCNN`` module configured with a single-feature-map VGG16
        backbone, one anchor set and a 7x7 RoIAlign pooler.  The model does
        not normalise its inputs; callers must pass images that have already
        been normalised with the ImageNet mean/std, as this file's
        transforms do.
    """
    # Load VGG16 pretrained on ImageNet and keep only the convolutional part.
    backbone = vgg16(pretrained=True).features

    # Freeze the whole backbone first ...
    for param in backbone.parameters():
        param.requires_grad = False

    # ... then re-enable training for whatever parameters live in the last
    # four modules.
    # NOTE(review): in torchvision's VGG16 this slice appears to contain a
    # single Conv2d (features[28]); confirm whether more layers were meant
    # to be fine-tuned.
    for layer in list(backbone)[-4:]:
        for param in layer.parameters():
            param.requires_grad = True

    # FasterRCNN reads the channel count from the backbone itself and raises
    # a ValueError when the attribute is missing, so it must be set on the
    # module rather than kept in a local variable.  VGG16's feature extractor
    # ends with 512 channels.
    backbone.out_channels = 512

    # One feature map -> one tuple of sizes and one tuple of aspect ratios.
    anchor_generator = AnchorGenerator(
        sizes=((32, 64, 128, 256, 512),),
        aspect_ratios=((0.5, 1.0, 2.0),)
    )

    # A plain (non-FPN) backbone output is exposed under the name '0'.
    roi_pooler = torchvision.ops.MultiScaleRoIAlign(
        featmap_names=['0'],
        output_size=7,
        sampling_ratio=2
    )

    model = FasterRCNN(
        backbone=backbone,
        num_classes=num_classes,
        rpn_anchor_generator=anchor_generator,
        box_roi_pool=roi_pooler,
        min_size=800,
        max_size=800,
        # Every transform in this file already applies ImageNet
        # normalisation, so make the model's internal normalisation an
        # identity to avoid normalising the images twice.
        image_mean=[0.0, 0.0, 0.0],
        image_std=[1.0, 1.0, 1.0]
    )

    return model

# Initialize model
# nn.Module.to returns the module itself, so build and place in one step.
model = get_model().to(device)

# Hand only the trainable (non-frozen) parameters to SGD.
params = list(filter(lambda p: p.requires_grad, model.parameters()))
optimizer = optim.SGD(
    params,
    lr=0.005,
    momentum=0.9,
    weight_decay=0.0005,
)

# Multiply the learning rate by 0.1 every 3 scheduler steps.
lr_scheduler = optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1,
)

# Training function
def train_model(model, dataloaders, optimizer, lr_scheduler, num_epochs=10):
    """Train ``model`` and return it loaded with its lowest-loss weights.

    Args:
        model: Detection model that returns a dict of losses when called
            with ``(images, targets)`` in training mode.
        dataloaders: Iterable yielding ``(images, targets)`` batches; its
            ``len`` is used to average the epoch loss.
        optimizer: Optimizer stepped once per batch.
        lr_scheduler: Scheduler stepped once per epoch.
        num_epochs: Number of passes over ``dataloaders``.

    Returns:
        The same ``model`` object, restored to the weights of the epoch with
        the smallest mean training loss.
    """
    since = time.time()

    # Track the best epoch seen so far, starting from the initial weights.
    best_loss = float('inf')
    best_model_wts = copy.deepcopy(model.state_dict())

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        model.train()
        loss_accumulator = 0.0

        for images, targets in tqdm(dataloaders):
            # Move the batch onto the compute device.
            images = [image.to(device) for image in images]
            targets = [
                {key: value.to(device) for key, value in target.items()}
                for target in targets
            ]

            optimizer.zero_grad()

            # In training mode the model returns its individual loss terms.
            loss_dict = model(images, targets)
            losses = sum(loss_dict.values())

            losses.backward()
            optimizer.step()

            loss_accumulator += losses.item()

        # Mean loss per batch for this epoch.
        epoch_loss = loss_accumulator / len(dataloaders)
        print(f'Training Loss: {epoch_loss:.4f}')

        # Snapshot the weights whenever the epoch loss improves.
        if epoch_loss < best_loss:
            best_loss, best_model_wts = epoch_loss, copy.deepcopy(model.state_dict())

        lr_scheduler.step()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best Loss: {best_loss:.4f}')

    # Hand back the model with its best-performing weights restored.
    model.load_state_dict(best_model_wts)
    return model

# Train the model
num_epochs = 10
model = train_model(
    model=model,
    dataloaders=train_loader,
    optimizer=optimizer,
    lr_scheduler=lr_scheduler,
    num_epochs=num_epochs,
)

# Persist only the weights (state dict) next to the dataset.
checkpoint_path = os.path.join(DATA_ROOT, 'faster_rcnn_signature_model.pth')
torch.save(model.state_dict(), checkpoint_path)

# Evaluation function
def _count_matches(pred_boxes, pred_scores, target_boxes, iou_threshold):
    """Match predictions to targets one-to-one and return ``(tp, fp, fn)``.

    Predictions are visited from highest to lowest score.  Each one claims
    the still-unmatched target it overlaps most, provided the IoU exceeds
    ``iou_threshold``.  A target can be claimed only once, so duplicate
    detections of the same signature count as false positives rather than
    as extra true positives.
    """
    num_preds = len(pred_boxes)
    num_targets = len(target_boxes)
    if num_preds == 0 or num_targets == 0:
        # Nothing can match: every prediction is spurious, every target missed.
        return 0, num_preds, num_targets

    ious = box_iou(pred_boxes, target_boxes)  # [num_preds, num_targets]
    claimed = set()
    tp = 0
    for pred_idx in torch.argsort(pred_scores, descending=True).tolist():
        candidate_ious = ious[pred_idx].clone()
        if claimed:
            # Hide targets that a higher-scoring prediction already took.
            candidate_ious[list(claimed)] = -1.0
        best_iou, best_target = candidate_ious.max(dim=0)
        if best_iou.item() > iou_threshold:
            claimed.add(best_target.item())
            tp += 1

    return tp, num_preds - tp, num_targets - tp


def evaluate_model(model, data_loader):
    """Compute detection precision, recall and F1 over ``data_loader``.

    Predictions with score <= 0.5 are discarded; a remaining prediction is a
    true positive when it is matched one-to-one to a ground-truth box with
    IoU > 0.5.  The metrics are printed and returned.

    Args:
        model: Detection model returning, per image, a dict with ``boxes``
            and ``scores`` when called in eval mode.
        data_loader: Iterable yielding ``(images, targets)`` batches where
            each target has a ``boxes`` tensor.

    Returns:
        Tuple ``(precision, recall, f1)``; each is 0 when its denominator
        is 0.
    """
    # IoU above which a detection counts as correct, and the minimum score
    # for a prediction to be considered at all.
    iou_threshold = 0.5
    conf_threshold = 0.5

    true_positives = 0
    false_positives = 0
    false_negatives = 0

    model.eval()
    with torch.no_grad():
        for images, targets in tqdm(data_loader):
            images = [img.to(device) for img in images]
            outputs = model(images)

            for output, target in zip(outputs, targets):
                scores = output['scores'].cpu()
                keep = scores > conf_threshold
                pred_boxes = output['boxes'].cpu()[keep]
                pred_scores = scores[keep]
                # Matching happens on the CPU, so targets never need to
                # visit the compute device.
                target_boxes = target['boxes'].cpu()

                tp, fp, fn = _count_matches(
                    pred_boxes, pred_scores, target_boxes, iou_threshold
                )
                true_positives += tp
                false_positives += fp
                false_negatives += fn

    predicted_total = true_positives + false_positives
    actual_total = true_positives + false_negatives
    precision = true_positives / predicted_total if predicted_total > 0 else 0
    recall = true_positives / actual_total if actual_total > 0 else 0
    f1 = 2 * precision * recall / (precision + recall) if (precision + recall) > 0 else 0

    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")

    return precision, recall, f1

# Calculate IoU (Intersection over Union)
def box_iou(boxes1, boxes2):
    """Return the pairwise IoU matrix between two sets of boxes.

    Args:
        boxes1: Tensor of shape ``[N, 4]`` in ``[x1, y1, x2, y2]`` format.
        boxes2: Tensor of shape ``[M, 4]`` in the same format.

    Returns:
        Tensor of shape ``[N, M]`` where entry ``(i, j)`` is the
        intersection-over-union of ``boxes1[i]`` and ``boxes2[j]``.  Pairs
        whose union is not positive (e.g. two zero-area boxes) get 0 rather
        than NaN.
    """
    area1 = (boxes1[:, 2] - boxes1[:, 0]) * (boxes1[:, 3] - boxes1[:, 1])
    area2 = (boxes2[:, 2] - boxes2[:, 0]) * (boxes2[:, 3] - boxes2[:, 1])

    # Broadcast boxes1 against boxes2 to get every pair's overlap rectangle.
    lt = torch.max(boxes1[:, None, :2], boxes2[:, :2])  # [N,M,2]
    rb = torch.min(boxes1[:, None, 2:], boxes2[:, 2:])  # [N,M,2]

    # Negative extents mean the boxes do not overlap along that axis.
    wh = (rb - lt).clamp(min=0)  # [N,M,2]
    inter = wh[:, :, 0] * wh[:, :, 1]  # [N,M]

    union = area1[:, None] + area2 - inter

    # 0/0 would produce NaN, which compares False against any threshold and
    # silently drops the pair from downstream counts; report 0 overlap.
    iou = inter / union
    return torch.where(union > 0, iou, torch.zeros_like(iou))

# Score the trained model on the held-out test split; evaluate_model prints
# precision, recall and F1 and also returns them for later use.
print("Evaluating model on test data...")
precision, recall, f1 = evaluate_model(model, test_loader)

# Visualization function
def visualize_results(model, data_loader, num_images=5):
    """Plot ground truth next to predictions for the first few batches.

    One row is drawn per batch (only the first image of each batch is
    shown): the left panel has the ground-truth boxes in red, the right
    panel the predictions with score > 0.5 in blue, annotated with their
    scores.  The figure is saved as ``results_visualization.png`` under
    ``DATA_ROOT`` and then displayed.

    Args:
        model: Detection model returning ``boxes`` and ``scores`` per image.
        data_loader: Iterable yielding ``(images, targets)`` batches of
            ImageNet-normalised image tensors.
        num_images: Number of rows (batches) to draw.
    """
    model.eval()

    # squeeze=False keeps axs two-dimensional even when num_images == 1, so
    # axs[row, col] indexing below works for every row count.
    fig, axs = plt.subplots(num_images, 2, figsize=(15, 5*num_images), squeeze=False)

    with torch.no_grad():
        for i, (images, targets) in enumerate(data_loader):
            if i >= num_images:
                break

            # CHW tensor -> HWC array, then undo the ImageNet normalisation
            # so the picture shows in its natural colours.
            img = images[0].permute(1, 2, 0).cpu().numpy()
            img = img * np.array([0.229, 0.224, 0.225]) + np.array([0.485, 0.456, 0.406])
            img = np.clip(img, 0, 1)

            # Get predictions
            batch = [image.to(device) for image in images]
            predictions = model(batch)

            # Ground truth
            axs[i, 0].imshow(img)
            axs[i, 0].set_title("Ground Truth")
            boxes = targets[0]['boxes'].cpu().numpy()
            for box in boxes:
                x1, y1, x2, y2 = box
                axs[i, 0].add_patch(plt.Rectangle((x1, y1), x2-x1, y2-y1,
                                               fill=False, edgecolor='red', linewidth=2))

            # Predictions
            axs[i, 1].imshow(img)
            axs[i, 1].set_title("Predictions")
            pred_boxes = predictions[0]['boxes'].cpu().numpy()
            pred_scores = predictions[0]['scores'].cpu().numpy()

            # Filter by confidence
            keep = pred_scores > 0.5
            pred_boxes = pred_boxes[keep]
            pred_scores = pred_scores[keep]

            for box, score in zip(pred_boxes, pred_scores):
                x1, y1, x2, y2 = box
                axs[i, 1].add_patch(plt.Rectangle((x1, y1), x2-x1, y2-y1,
                                               fill=False, edgecolor='blue', linewidth=2))
                axs[i, 1].text(x1, y1, f"Score: {score:.2f}",
                             bbox=dict(facecolor='yellow', alpha=0.5))

    fig.tight_layout()
    fig.savefig(os.path.join(DATA_ROOT, 'results_visualization.png'))
    plt.show()

# Draw ground truth vs. predictions for the first five test batches; the
# figure is also written to disk by visualize_results.
print("Visualizing detection results...")
visualize_results(model, test_loader, num_images=5)

# Inference function for new images
def predict_signature(model, image_path):
    """Detect signatures in one image file and display the result.

    The image is resized to 800x800 and ImageNet-normalised for the model.
    Detections with score > 0.5 are mapped back to the original image's
    pixel coordinates, so the boxes drawn on the original-size image (and
    the boxes returned) line up with the actual document.

    Args:
        model: Trained detection model.
        image_path: Path of the image to analyse.

    Returns:
        Tuple ``(pred_boxes, pred_scores)`` of NumPy arrays: boxes as
        ``[x1, y1, x2, y2]`` in original-image pixels, and their scores.
    """
    # Load and preprocess image
    image = Image.open(image_path).convert("RGB")
    orig_width, orig_height = image.size
    transform = transforms.Compose([
        transforms.Resize((800, 800)),
        transforms.ToTensor(),
        transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
    ])
    image_tensor = transform(image).to(device)

    # Get prediction; detection models take a list of CHW image tensors.
    model.eval()
    with torch.no_grad():
        prediction = model([image_tensor])

    # Process prediction
    pred_boxes = prediction[0]['boxes'].cpu().numpy()
    pred_scores = prediction[0]['scores'].cpu().numpy()

    # Filter by confidence
    confidence_threshold = 0.5
    keep = pred_scores > confidence_threshold
    pred_boxes = pred_boxes[keep]
    pred_scores = pred_scores[keep]

    # The model saw the resized image, so its boxes live in that frame.
    # Rescale them to the original resolution before drawing or returning.
    input_height, input_width = image_tensor.shape[-2:]
    scale_x = orig_width / input_width
    scale_y = orig_height / input_height
    pred_boxes = pred_boxes * np.array(
        [scale_x, scale_y, scale_x, scale_y], dtype=pred_boxes.dtype
    )

    # Visualize
    plt.figure(figsize=(10, 10))
    img = np.array(image)
    plt.imshow(img)

    for box, score in zip(pred_boxes, pred_scores):
        x1, y1, x2, y2 = box
        plt.gca().add_patch(plt.Rectangle((x1, y1), x2-x1, y2-y1,
                                         fill=False, edgecolor='blue', linewidth=2))
        plt.gca().text(x1, y1, f"Score: {score:.2f}",
                     bbox=dict(facecolor='yellow', alpha=0.5))

    plt.axis('off')
    plt.tight_layout()
    plt.show()

    return pred_boxes, pred_scores

# Example usage of the inference function (left disabled; point it at a real
# image file to run it):
# predict_signature(model, "path/to/new/document.jpg")

# Reaching this line only means every statement above ran without raising;
# it says nothing about detection quality.
print("Faster R-CNN model for signature detection completed successfully!")