In [None]:
import json
import os
import random
import re

import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch
import yaml
from torch.utils.data.dataloader import DataLoader
from tqdm import tqdm

import detection
from dataset.st import SceneTextDataset
from detection.faster_rcnn import FastRCNNPredictor
from detection.image_list import ImageList


In [2]:
# Notebook-wide default device: use CUDA when PyTorch reports it as available, otherwise the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [3]:
def visualize_dataset(save_dir='dataset_visualization', root_dir='./Q1', num_samples=1):
    """Save annotated previews of random training samples and print their annotations.

    For each sampled item the image is plotted with its bounding boxes drawn in
    red and written to ``<save_dir>/sample_<i>.png``. If a matching annotation
    file ``<root_dir>/annots/<image file name>.json`` exists, the number of
    objects and the ``obb`` entry of the first two objects are printed.

    Args:
        save_dir: Directory that receives the preview images (created if missing).
        root_dir: Dataset root, used both to build the ``SceneTextDataset`` and to
            locate the ``annots`` directory.
        num_samples: Number of distinct samples to draw; capped at the dataset size.
    """
    os.makedirs(save_dir, exist_ok=True)

    st_dataset = SceneTextDataset('train', root_dir=root_dir)

    print(f"Dataset size: {len(st_dataset)} samples")
    print(f"Label mapping: {st_dataset.idx2label}")

    # Sample without replacement so the same image is never previewed twice.
    num_samples = max(0, min(num_samples, len(st_dataset)))
    sample_indices = random.sample(range(len(st_dataset)), num_samples)

    for i, idx in enumerate(sample_indices):
        image_tensor, target, image_path = st_dataset[idx]
        print(target, image_path)
        image_name = os.path.basename(image_path)

        # imshow wants channels last; assumes the dataset yields a channel-first
        # CPU tensor -- TODO confirm against SceneTextDataset.
        image = image_tensor.permute(1, 2, 0).numpy()

        fig, ax = plt.subplots(figsize=(10, 8))
        ax.imshow(image)

        for bbox in target['bboxes']:
            x1, y1, x2, y2 = bbox.numpy()
            ax.add_patch(plt.Rectangle((x1, y1), x2 - x1, y2 - y1,
                                       fill=False, edgecolor='red', linewidth=2))

        ax.set_title(f"Image: {image_name}")
        fig.savefig(os.path.join(save_dir, f"sample_{i}.png"))
        # Close this specific figure instead of relying on pyplot's "current figure".
        plt.close(fig)

        print(f"Sample {i} - {image_name}:")

        # Annotation files are named after the full image file name, extension included.
        json_path = os.path.join(root_dir, 'annots', image_name + '.json')
        print(json_path)
        if os.path.exists(json_path):
            with open(json_path, 'r') as f:
                data = json.load(f)
            print(f"Number of objects: {len(data['objects'])}")

            for j, obj in enumerate(data['objects'][:2]):
                print(f"  Object {j}: {obj['obb']}")

        print("\n")


visualize_dataset()

{0: 'background', 1: 'text'}
Dataset size: 1003 samples
Label mapping: {0: 'background', 1: 'text'}
{'bboxes': tensor([[262.0000, 363.0000, 345.0000, 388.0000],
        [275.0000, 394.0000, 333.0000, 419.0000],
        [276.0826, 598.5223, 440.5027, 676.4273],
        [441.4814, 592.0452, 872.1489, 678.2363],
        [245.5036, 728.4973, 503.5211, 812.3753],
        [661.8611, 740.7100, 919.8947, 823.8577],
        [512.7561, 308.7530, 581.8213, 364.5263]]), 'labels': tensor([1, 1, 1, 1, 1, 1, 1])} ./Q1/img/img742.jpg
Sample 0 - img742.jpg:
./Q1/annots/img742.jpg.json
Number of objects: 7
  Object 0: {'xc': 303.4999694824219, 'yc': 375.4999694824219, 'w': 82.99998474121094, 'h': 24.999996185302734, 'theta': -0.0}
  Object 1: {'xc': 304.0, 'yc': 406.5, 'w': 58.0, 'h': 25.0, 'theta': 180.0}




In [None]:
def load_dataset_sample(config_path):
    """Read the YAML config and return the first sample of the test split.

    Args:
        config_path: Path to a YAML file containing ``dataset_params.root_dir``.

    Returns:
        Tuple ``(image, target, image_path, config)`` where the first three items
        are whatever ``SceneTextDataset`` yields for index 0 and ``config`` is the
        parsed YAML mapping.

    Raises:
        ValueError: If the test split contains no samples.
    """
    with open(config_path, 'r') as config_file:
        config = yaml.safe_load(config_file)

    dataset_root = config['dataset_params']['root_dir']
    test_set = SceneTextDataset('test', root_dir=dataset_root)

    # Guard clause: nothing to visualize without at least one sample.
    if len(test_set) == 0:
        raise ValueError("Dataset is empty. Check dataset path.")

    image, target, image_path = test_set[0]
    return image, target, image_path, config

def setup_model(config):
    """Build the Faster R-CNN detector described by ``config`` and move it to a device.

    Args:
        config: Parsed configuration. ``model_params`` may provide ``min_im_size``
            and ``max_im_size`` (defaults 600 and 1000); ``dataset_params`` must
            provide ``num_classes``.

    Returns:
        Tuple ``(model, device)``; ``device`` is CUDA when available, else CPU.
    """
    target_device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model_cfg = config['model_params']

    detector = detection.fasterrcnn_resnet50_fpn(
        pretrained=True,
        min_size=model_cfg.get('min_im_size', 600),
        max_size=model_cfg.get('max_im_size', 1000),
    )

    # Replace the box predictor with one sized for this dataset's class count,
    # reusing the input width of the predictor that shipped with the model.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(
        head_in_features,
        num_classes=config['dataset_params']['num_classes'],
    )

    detector.to(target_device)
    return detector, target_device

In [None]:
def visualize_objectness_maps(model, image, device, output_dir, iteration):
    """Save a heat map of RPN objectness for every feature level.

    The image is passed through ``model.transform`` (the same preprocessing entry
    point the anchor visualization and the training forward pass use), then
    through the backbone and the RPN head. For each level the head output is
    squashed with a sigmoid, summed over its channel dimension, min-max
    normalized and drawn with the ``hot`` colormap. The figure is written to
    ``<output_dir>/objectness_iter<iteration>.png``.

    The model is switched to eval mode for the forward pass and restored to
    whatever mode it was in before the call.

    Args:
        model: Detection model exposing ``transform``, ``backbone`` and ``rpn.head``.
        image: Image tensor for a single image (not batched).
        device: Device the forward pass runs on.
        output_dir: Directory for the output figure (created if missing).
        iteration: Training-step index used in the title and file name.
    """
    os.makedirs(output_dir, exist_ok=True)

    img_tensor = image.to(device)

    # Remember the caller's mode: this helper is invoked between optimizer steps
    # and must not leave the model in eval mode behind its back.
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            # NOTE(review): assumes model.transform accepts a list of images without
            # targets and returns (ImageList, targets), as in torchvision -- confirm
            # against the vendored `detection` package.
            images, _ = model.transform([img_tensor])
            features = model.backbone(images.tensors)
            objectness, _ = model.rpn.head(list(features.values()))
    finally:
        model.train(was_training)

    n_levels = len(objectness)
    fig, axes = plt.subplots(1, n_levels, figsize=(n_levels * 5, 5))

    # plt.subplots returns a bare Axes (not an array) when there is a single column.
    if n_levels == 1:
        axes = [axes]

    for level, (ax, obj) in enumerate(zip(axes, objectness)):
        obj_prob = torch.sigmoid(obj.detach().cpu())
        # First batch element; collapse the channel dimension into one map per level.
        obj_sum = obj_prob[0].sum(dim=0)
        # Epsilon keeps the normalization finite when the map is constant.
        obj_norm = (obj_sum - obj_sum.min()) / (obj_sum.max() - obj_sum.min() + 1e-8)

        im = ax.imshow(obj_norm.numpy(), cmap='hot', interpolation='nearest')
        ax.set_title(f'Level {level+1}')
        fig.colorbar(im, ax=ax)

    fig.suptitle(f'RPN Objectness Maps - Iteration {iteration}')
    fig.tight_layout()

    fig.savefig(os.path.join(output_dir, f'objectness_iter{iteration}.png'))
    plt.close(fig)

def visualize_proposals(model, image, image_path, device, output_dir, iteration):
    """Draw the first (up to 50) RPN proposals on the original image and save the figure.

    The image goes through ``model.transform`` before the backbone and RPN, so the
    proposals live in the transformed image's coordinate frame; they are scaled
    back to the size of the image read from ``image_path`` before drawing. The
    figure is written to ``<output_dir>/proposals_iter<iteration>.png``.

    The model is switched to eval mode for the forward pass and restored to
    whatever mode it was in before the call.

    Args:
        model: Detection model exposing ``transform``, ``backbone`` and ``rpn``.
        image: Image tensor for a single image (not batched).
        image_path: Path of the image file used as the drawing canvas.
        device: Device the forward pass runs on.
        output_dir: Directory for the output figure (created if missing).
        iteration: Training-step index used in the title and file name.

    Raises:
        FileNotFoundError: If ``image_path`` cannot be read by OpenCV.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Fail fast with a clear message; cv2.imread signals failure by returning None.
    original_img = cv2.imread(image_path)
    if original_img is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    proposals_img = cv2.cvtColor(original_img, cv2.COLOR_BGR2RGB)

    img_tensor = image.to(device)

    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            # NOTE(review): assumes model.transform accepts a list of images without
            # targets and returns (ImageList, targets), as in torchvision -- confirm
            # against the vendored `detection` package.
            images, _ = model.transform([img_tensor])
            features = model.backbone(images.tensors)
            proposals, _ = model.rpn(images, features)
    finally:
        model.train(was_training)

    # Map boxes from the transformed image size back onto the drawing canvas.
    resized_h, resized_w = images.image_sizes[0]
    canvas_h, canvas_w = proposals_img.shape[:2]
    scale = torch.tensor([canvas_w / resized_w, canvas_h / resized_h] * 2,
                         dtype=torch.float32)

    max_proposals = min(50, len(proposals[0]))
    top_boxes = proposals[0][:max_proposals].detach().cpu().float() * scale
    # tolist() yields plain Python ints, which every OpenCV build accepts as points.
    for x1, y1, x2, y2 in top_boxes.to(torch.int32).tolist():
        cv2.rectangle(proposals_img, (x1, y1), (x2, y2), (0, 255, 0), 2)

    fig, ax = plt.subplots(figsize=(10, 8))
    ax.imshow(proposals_img)
    ax.set_title(f'RPN Proposals (top {max_proposals}) - Iteration {iteration}')
    ax.axis('off')

    fig.savefig(os.path.join(output_dir, f'proposals_iter{iteration}.png'))
    plt.close(fig)

def visualize_anchor_assignments(model, image, target, image_path, device, output_dir, iteration):
    """Draw sampled positive/negative RPN anchors and ground-truth boxes on the image.

    The image and target go through ``model.transform``; anchors are generated
    for the transformed image, matched to the transformed ground truth and then
    sampled with the RPN's foreground/background sampler. Up to 10 positive
    (green) and 10 negative (red) anchors are scaled back to the size of the
    image read from ``image_path`` and drawn together with the ground-truth boxes
    (blue). The figure is written to ``<output_dir>/anchors_iter<iteration>.png``.

    The model is put in train mode while the anchors are computed and restored to
    whatever mode it was in before the call. No gradients are recorded.

    Args:
        model: Detection model exposing ``transform``, ``backbone`` and ``rpn``.
        image: Image tensor for a single image (not batched).
        target: Mapping with ``'bboxes'`` and ``'labels'`` tensors for that image.
        image_path: Path of the image file used as the drawing canvas.
        device: Device the forward pass runs on.
        output_dir: Directory for the output figure (created if missing).
        iteration: Training-step index used in the title and file name.

    Raises:
        FileNotFoundError: If ``image_path`` cannot be read by OpenCV.
    """
    os.makedirs(output_dir, exist_ok=True)

    # Fail fast with a clear message; cv2.imread signals failure by returning None.
    original_img = cv2.imread(image_path)
    if original_img is None:
        raise FileNotFoundError(f"Could not read image: {image_path}")
    anchors_img = cv2.cvtColor(original_img, cv2.COLOR_BGR2RGB)

    img_tensor = image.to(device)
    targets = [{
        'boxes': target['bboxes'].to(device),
        'labels': target['labels'].to(device)
    }]

    was_training = model.training
    model.train()
    try:
        # This is visualization only, so no autograd graph is needed.
        with torch.no_grad():
            images, targets = model.transform([img_tensor], targets)
            features = model.backbone(images.tensors)

            feature_maps = list(features.values())
            anchors = model.rpn.anchor_generator(images, feature_maps)
            labels, _ = model.rpn.assign_targets_to_anchors(anchors, targets)

            sampled_pos_inds, sampled_neg_inds = model.rpn.fg_bg_sampler(labels)
    finally:
        model.train(was_training)

    # Only one image is processed; keep at most 10 anchors of each kind for legibility.
    pos_idx = torch.where(sampled_pos_inds[0])[0][:10]
    neg_idx = torch.where(sampled_neg_inds[0])[0][:10]
    pos_anchors = anchors[0][pos_idx].detach().cpu().float()
    neg_anchors = anchors[0][neg_idx].detach().cpu().float()

    # Anchors are expressed in the transformed image's frame; rescale them to the
    # canvas so they line up with the ground-truth boxes drawn below.
    resized_h, resized_w = images.image_sizes[0]
    canvas_h, canvas_w = anchors_img.shape[:2]
    scale = torch.tensor([canvas_w / resized_w, canvas_h / resized_h] * 2,
                         dtype=torch.float32)

    def draw_boxes(boxes, color):
        # tolist() yields plain Python ints, which every OpenCV build accepts as points.
        for x1, y1, x2, y2 in boxes.to(torch.int32).tolist():
            cv2.rectangle(anchors_img, (x1, y1), (x2, y2), color, 2)

    # The canvas is RGB, so these tuples are green, red and blue respectively.
    draw_boxes(pos_anchors * scale, (0, 255, 0))
    draw_boxes(neg_anchors * scale, (255, 0, 0))
    # Ground truth is drawn as provided by the dataset (untransformed coordinates).
    draw_boxes(target['bboxes'].detach().cpu(), (0, 0, 255))

    fig, ax = plt.subplots(figsize=(10, 8))
    ax.imshow(anchors_img)
    ax.set_title(f'Anchor Assignments - Iteration {iteration}\nGreen: Positive, Red: Negative, Blue: GT')
    ax.axis('off')

    fig.savefig(os.path.join(output_dir, f'anchors_iter{iteration}.png'))
    plt.close(fig)

In [None]:

def create_videos(image_dir, output_dir, fps=1):
    """Assemble ``<type>..._iter<N>.png`` frames from ``image_dir`` into MP4 videos.

    For each visualization type (``objectness``, ``proposals``, ``anchors``) the
    matching frames are ordered by their iteration number ``N`` and written to
    ``<output_dir>/<type>_video.mp4``. Files that start with the type name but do
    not carry an ``_iter<N>.png`` suffix are ignored, unreadable frames are
    skipped with a warning, and frames whose size differs from the first frame
    are resized to match it.

    Args:
        image_dir: Directory containing the PNG frames.
        output_dir: Directory that receives the videos (created if missing).
        fps: Frames per second of the generated videos.
    """
    os.makedirs(output_dir, exist_ok=True)

    viz_types = ['objectness', 'proposals', 'anchors']
    file_names = os.listdir(image_dir)

    for viz_type in viz_types:
        frame_pattern = re.compile(rf'{re.escape(viz_type)}.*?_iter(\d+)\.png')

        # (iteration, file name) pairs; names without a numeric iteration are skipped
        # rather than crashing the sort.
        frames = []
        for file_name in file_names:
            match = frame_pattern.fullmatch(file_name)
            if match:
                frames.append((int(match.group(1)), file_name))

        if not frames:
            continue

        frames.sort()

        video_path = os.path.join(output_dir, f'{viz_type}_video.mp4')
        fourcc = cv2.VideoWriter_fourcc(*'mp4v')

        writer = None
        frames_written = 0
        try:
            for _, file_name in frames:
                frame = cv2.imread(os.path.join(image_dir, file_name))
                if frame is None:
                    print(f"Warning: could not read frame {file_name}; skipping.")
                    continue

                if writer is None:
                    # The first readable frame fixes the video resolution.
                    height, width = frame.shape[:2]
                    writer = cv2.VideoWriter(video_path, fourcc, fps, (width, height))
                    if not writer.isOpened():
                        print(f"Warning: could not open video writer for {video_path}")
                        break
                elif frame.shape[:2] != (height, width):
                    # Resize so every frame matches the size the writer was opened with.
                    frame = cv2.resize(frame, (width, height))

                writer.write(frame)
                frames_written += 1
        finally:
            if writer is not None:
                writer.release()

        if frames_written:
            print(f"Created video: {video_path}")

def _compute_total_loss(model, images, targets_list):
    """Run one training-mode forward pass and return the summed loss."""
    # The visualization helpers may have switched the model to eval mode; a forward
    # pass is only expected to yield a loss dict while in train mode.
    model.train()
    loss_dict = model(images, targets_list)

    if isinstance(loss_dict, dict):
        return sum(loss_dict.values())

    # Best-effort fallback kept from the original flow: rebuild the losses from the
    # individual stages if the model did not hand back a dict.
    print("Warning: Unexpected loss format. Computing losses manually.")
    images_t, targets_t = model.transform(images, targets_list)
    features = model.backbone(images_t.tensors)
    proposals, rpn_losses = model.rpn(images_t, features, targets_t)
    _, detector_losses = model.roi_heads(
        features, proposals, images_t.image_sizes, targets_t)

    return sum(rpn_losses.values()) + sum(detector_losses.values())


def simulate_training_steps(model, image, target, image_path, device, output_dir, num_steps=5):
    """Train on a single sample for a few steps and visualize the RPN after each one.

    Objectness maps, proposals and anchor assignments are rendered for the initial
    state (iteration 0) and after every SGD step into ``objectness``,
    ``proposals`` and ``anchors`` sub-directories of ``output_dir``. Once all
    steps are done the frames of each sub-directory are assembled into videos
    written to ``output_dir``.

    Args:
        model: Detection model to optimize.
        image: Image tensor for a single image (not batched).
        target: Mapping with ``'bboxes'`` and ``'labels'`` tensors for that image.
        image_path: Path of the image file used as the drawing canvas.
        device: Device the model lives on.
        output_dir: Root directory for frames and videos (created if missing).
        num_steps: Number of optimizer steps to simulate.
    """
    os.makedirs(output_dir, exist_ok=True)
    objectness_dir = os.path.join(output_dir, 'objectness')
    proposals_dir = os.path.join(output_dir, 'proposals')
    anchors_dir = os.path.join(output_dir, 'anchors')
    for frame_dir in (objectness_dir, proposals_dir, anchors_dir):
        os.makedirs(frame_dir, exist_ok=True)

    print("Visualizing initial state...")
    visualize_objectness_maps(model, image, device, objectness_dir, 0)
    visualize_proposals(model, image, image_path, device, proposals_dir, 0)
    visualize_anchor_assignments(model, image, target, image_path, device, anchors_dir, 0)

    optimizer = torch.optim.SGD(
        filter(lambda p: p.requires_grad, model.parameters()),
        lr=0.005,
        momentum=0.9,
        weight_decay=0.0005
    )

    # The training inputs never change between steps, so build them once.
    images = [image.to(device)]
    targets_list = [{
        'boxes': target['bboxes'].to(device),
        'labels': target['labels'].to(device)
    }]

    for step in range(1, num_steps + 1):
        print(f"Simulating training step {step}...")

        # Train mode is (re-)entered inside the helper on every step because the
        # visualizations at the end of the previous step may have left eval mode on.
        losses = _compute_total_loss(model, images, targets_list)

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        visualize_objectness_maps(model, image, device, objectness_dir, step)
        visualize_proposals(model, image, image_path, device, proposals_dir, step)
        visualize_anchor_assignments(model, image, target, image_path, device, anchors_dir, step)

    print("Creating videos...")
    create_videos(objectness_dir, output_dir)
    create_videos(proposals_dir, output_dir)
    create_videos(anchors_dir, output_dir)

    print(f"Visualization completed. Results saved to {output_dir}")

In [None]:
# Driver cell: load one test sample, build the detector, then run the short
# training simulation that produces the RPN visualizations and videos.
config_path = 'config/st.yaml'
output_dir = 'rpn_visualization_results'

try:
    image, target, image_path, config = load_dataset_sample(config_path)
    print(f"Loaded image: {image_path}")

    model, device = setup_model(config)
    print(f"Model created on device: {device}")

    simulate_training_steps(model, image, target, image_path, device, output_dir)
except Exception as err:
    # Report the failure in the cell output (message first, then the full
    # traceback) instead of aborting the notebook run.
    import traceback

    print(f"Error: {str(err)}")
    traceback.print_exc()

{0: 'background', 1: 'text'}
Loaded image: ./Q1/img/img47.jpg
Model created on device: cuda
Visualizing initial state...
Simulating training step 1...
Simulating training step 2...
Simulating training step 3...
Simulating training step 4...
Simulating training step 5...
Creating videos...
Created video: rpn_visualization_results/objectness_video.mp4
Created video: rpn_visualization_results/proposals_video.mp4
Created video: rpn_visualization_results/anchors_video.mp4
Visualization completed. Results saved to rpn_visualization_results
