In [None]:
import torch
import torchvision

import torchvision.transforms as T
import torchvision.models as models

from torch.utils.data import Subset, DataLoader
import os
import numpy as np
from pycocotools.coco import COCO
from pycocotools.cocoeval import COCOeval
import matplotlib.pyplot as plt
import matplotlib.patches as patches

import cv2
import joblib

from IPython.display import display

import selectivesearch

from sklearn.svm import LinearSVC
from sklearn.preprocessing import StandardScaler
from sklearn.ensemble import GradientBoostingRegressor

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"Using device: {device}")


In [None]:
root_dir = os.path.join(os.path.dirname(os.getcwd()), "data", "coco")
transform = T.ToTensor()

# train_data = torchvision.datasets.CocoDetection(
#     root=os.path.join(root_dir, "train2017"),
#     annFile=os.path.join(root_dir, "annotations", "instances_train2017.json"),
#     transform=transform
# )

val_data = torchvision.datasets.CocoDetection(
    root=os.path.join(root_dir, "val2017"),
    annFile=os.path.join(root_dir, "annotations", "instances_val2017.json"),
    transform=transform
)

In [None]:
def collate_fn(batch):
    return tuple(zip(*batch)) # Unzip the batch

# train_loader = DataLoader(
#     train_data,
#     batch_size=32,
#     shuffle=True,
#     num_workers=0, 
#     collate_fn=collate_fn  
# )

val_loader = DataLoader(
    val_data,
    batch_size=32,
    shuffle=False,
    num_workers=0,
    collate_fn=collate_fn
)

def compute_iou(box1, box2):
    # box1, box2: [x_min, y_min, x_max, y_max]
    x1_1, y1_1, x2_1, y2_1 = box1
    x1_2, y1_2, x2_2, y2_2 = box2
    
    # Compute intersection
    x1_i = max(x1_1, x1_2)
    y1_i = max(y1_1, y1_2)
    x2_i = min(x2_1, x2_2)
    y2_i = min(y2_1, y2_2)
    
    inter_area = max(0, x2_i - x1_i) * max(0, y2_i - y1_i)
    
    # Compute union
    box1_area = (x2_1 - x1_1) * (y2_1 - y1_1)
    box2_area = (x2_2 - x1_2) * (y2_2 - y1_2)
    union_area = box1_area + box2_area - inter_area
    
    return inter_area / union_area if union_area > 0 else 0

def non_max_suppression(boxes, scores, iou_threshold=0.3):
    if len(boxes) == 0:
        return []
    
    # Convert from [x_min, y_min, width, height] to [x_min, y_min, x_max, y_max] if needed
    if boxes.shape[1] == 4:
        if boxes[0, 2] < boxes[0, 0] + boxes[0, 2]:  # Check if format is [x, y, w, h]
            x_min = boxes[:, 0]
            y_min = boxes[:, 1]
            x_max = boxes[:, 0] + boxes[:, 2]
            y_max = boxes[:, 1] + boxes[:, 3]
            boxes = np.stack([x_min, y_min, x_max, y_max], axis=1)
    
    # Sort by score
    sorted_indices = np.argsort(scores)[::-1]  
    keep_indices = []
    
    while len(sorted_indices) > 0:
        # Pick the box with highest score
        current_index = sorted_indices[0]
        keep_indices.append(current_index)
        
        if len(sorted_indices) == 1:
            break
            
        # Get IoU of the current box with the remaining boxes
        current_box = boxes[current_index]
        other_boxes = boxes[sorted_indices[1:]]
        
        # Calculate IoU with remaining boxes
        ious = np.array([compute_iou(current_box, box) for box in other_boxes])
        
        # Keep boxes with IoU less than threshold
        mask = ious < iou_threshold
        sorted_indices = sorted_indices[1:][mask]
    
    return keep_indices

def get_region_proposals(image, scales=[200, 300, 400, 500, 600, 800], sigma=0.9, min_size=5, max_proposals=3000):
    proposals = []

    for scale in scales:
        img_lbl, regions = selectivesearch.selective_search(
            image, scale=scale, sigma=sigma, min_size=min_size)
    
        for r in regions:
            x, y, w, h = r['rect']

            if w < 10 or h < 10 or w * h < 200 or w/h > 5 or h/w > 5:
                continue
            proposals.append([x, y, x + w, y + h])
    
    # Remove duplicates based on IoU
    if len(proposals) > 0:
        proposals = np.array(proposals)
        scores = np.ones(len(proposals))  # Dummy scores for NMS
        keep_indices = non_max_suppression(proposals, scores, iou_threshold=0.6)
        proposals = proposals[keep_indices]
    
    return proposals[:max_proposals]

def select_regions(proposals, gt_boxes, iou_pos=0.4, iou_neg=0.1):
    positive_regions = []
    negative_regions = []
    
    for proposal in proposals:
        max_iou = 0
        for gt_box in gt_boxes:
            iou = compute_iou(proposal, gt_box)
            max_iou = max(max_iou, iou)
        
        if max_iou >= iou_pos:
            positive_regions.append(proposal)
        elif max_iou < iou_neg:
            negative_regions.append(proposal)
    
    return positive_regions, negative_regions

cnn = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)

cnn = torch.nn.Sequential(
    *list(cnn.children())[:-1]  # Remove the adaptive pooling and classification layers
)
cnn.eval()
cnn.to(device)

preprocess = torchvision.transforms.Compose([
    torchvision.transforms.ToPILImage(), 
    torchvision.transforms.Resize((256)),  
    torchvision.transforms.CenterCrop((224)),
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize(mean=[0.485, 0.456, 0.406], # Commonly used values for ImageNet https://docs.pytorch.org/vision/0.8/models.html
                                     std=[0.229, 0.224, 0.225])  
])

def extract_features(image, regions, cnn, preprocess, device):
    """
    Extract 1280-dim features from regions using ResNet.
    
    Args:
        image: Numpy array [H, W, 3] in BGR format
        regions: Numpy array [N, 4] with [x_min, y_min, x_max, y_max]
        cnn: Pre-trained ResNet model
        preprocess: Transform pipeline
        device: torch.device
    
    Returns:
        Numpy array [N, 1280] with features
    """
    features = []
    for region in regions:
        x_min, y_min, x_max, y_max = region.astype(int) 

        # Ensure the region is within the image bounds
        x_min, x_max = max(0, x_min), min(image.shape[1], x_max)
        y_min, y_max = max(0, y_min), min(image.shape[0], y_max)

        if x_max <= x_min or y_max <= y_min:
            continue
        region_img = image[y_min:y_max, x_min:x_max, :]
        region_img = cv2.cvtColor(region_img, cv2.COLOR_BGR2RGB)  # Convert BGR to RGB
        region_tensor = preprocess(region_img).unsqueeze(0).to(device)
        with torch.no_grad():
            feature = cnn(region_tensor).squeeze()  # [1280]
        features.append(feature.cpu().numpy())
    return np.array(features) if features else np.empty((0, 1280))

def prepare_training_data(train_loader, cnn, preprocess, device, start_image, max_images=2500):
    """
    Prepare features and targets for SVM and bounding box regression.
    
    Returns:
        features: [N, 1280] array of CNN features
        labels: [N] array of class labels (1 for human, 0 for background)
        bboxes_gt: [N, 4] array of ground-truth boxes for positive regions
        bboxes_pred: [N, 4] array of proposed boxes for positive regions
    """
    all_features = []
    all_labels = []
    all_bboxes_gt = []
    all_bboxes_pred = []
    pos_matched_features = []

    subset_indices = list(range(start_image, min(start_image + max_images, len(train_data))))
    subset_dataset = Subset(train_loader.dataset, subset_indices)
    subset_loader = DataLoader(subset_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)

    image_count = 0
    for i, (images, targets) in enumerate(subset_loader):
        
        if image_count >= max_images:
            break
            
        image_count += 1
        actual_image_idx = start_image + i  # For logging purposes
        print(f"Processing image {actual_image_idx} (batch progress: {image_count}/{max_images})")

        print(f"Image {actual_image_idx}, targets count: {len(targets[0])}")
        
        image = images[0].permute(1, 2, 0).numpy() * 255
        image = image.astype(np.uint8)
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)
        
        gt_boxes = []
        for ann in targets[0]:
            if isinstance(ann, dict) and ann.get('category_id') == 1:  # Person class
                if 'bbox' in ann and len(ann['bbox']) == 4:
                    x, y, w, h = ann['bbox']
                    gt_boxes.append([float(x), float(y), float(x + w), float(y + h)])
        
        gt_boxes = np.array(gt_boxes)
        
        if len(gt_boxes) == 0:
            print(f"No ground truth boxes found for image {actual_image_idx}")
            continue

        print(f"Found {len(gt_boxes)} ground truth person boxes")
        
        proposals = get_region_proposals(image)
        print(f"Generated {len(proposals)} region proposals")

        pos_regions, neg_regions = select_regions(proposals, gt_boxes)
        print(f"Selected {len(pos_regions)} positive and {len(neg_regions)} negative regions") 

        pos_regions = pos_regions[:min(len(pos_regions), 600)]  
        neg_regions = neg_regions[:min(len(neg_regions), int(len(pos_regions)*3))] 
        
        regions = pos_regions + neg_regions
        labels = [1] * len(pos_regions) + [0] * len(neg_regions)
        
        if not regions:
            print(f"No regions found for image {actual_image_idx}")
            continue
        
        features = extract_features(image, regions, cnn, preprocess, device) # [N, 1280]
        if features.shape[0] != len(labels):
            print(f"Feature shape mismatch for image {actual_image_idx}: {features.shape[0]} vs {len(labels)}")
            continue
        
        # Match positive regions to ground-truth boxes for regression
        pos_bboxes_pred = []
        pos_bboxes_gt = []
        pos_features_for_regression = []

        for pos_idx, pred_box in enumerate(pos_regions):
            max_iou = 0
            best_gt = None
            for gt_box in gt_boxes:
                iou = compute_iou(pred_box, gt_box)
                if iou > max_iou:
                    max_iou = iou
                    best_gt = gt_box 

            if max_iou >= 0.5:
                pos_bboxes_pred.append(pred_box)
                pos_bboxes_gt.append(best_gt)
                
                if pos_idx < features.shape[0]:
                    pos_features_for_regression.append(features[pos_idx])

        if len(regions) > 0:
            pos_matched_features.append(pos_features_for_regression)
            all_features.append(features)
            all_labels.append(labels)
            all_bboxes_gt.extend(pos_bboxes_gt)
            all_bboxes_pred.extend(pos_bboxes_pred)
    
    if not all_features:
        print("No images with valid features found in this batch. Returning empty arrays.")
        return (np.empty((0, 1280)),  # Empty features array with right dimension
                np.array([]),         # Empty labels
                np.array([]),         # Empty gt boxes
                np.array([]),         # Empty pred boxes
                np.empty((0, 1280)))  # Empty matched features
    
    print(f"Total processed images with features: {len(all_features)}")
    print(f"Feature shapes: {[f.shape for f in all_features]}")
    
    all_features = np.vstack(all_features)
    all_labels = np.concatenate(all_labels)
    all_bboxes_gt = np.array(all_bboxes_gt)
    all_bboxes_pred = np.array(all_bboxes_pred)
    
    flat_pos_features = []
    for features_list in pos_matched_features:
        flat_pos_features.extend(features_list)
    
    pos_matched_features_array = np.array(flat_pos_features) if flat_pos_features else np.empty((0, 1280))
    
    print(f"Final shapes - Features: {all_features.shape}, Labels: {all_labels.shape}")
    print(f"Positive samples: {np.sum(all_labels)}, Negative samples: {np.sum(all_labels == 0)}")
    
    return all_features, all_labels, all_bboxes_gt, all_bboxes_pred, pos_matched_features_array

def compute_offsets(pred_box, gt_box):
    """Compute normalized offsets for regression."""
    x_min_p, y_min_p, x_max_p, y_max_p = pred_box
    x_min_g, y_min_g, x_max_g, y_max_g = gt_box
    w_p, h_p = x_max_p - x_min_p, y_max_p - y_min_p
    w_g, h_g = x_max_g - x_min_g, y_max_g - y_min_g
    dx = (x_min_g - x_min_p) / w_p
    dy = (y_min_g - y_min_p) / h_p
    dw = np.log(w_g / w_p)
    dh = np.log(h_g / h_p)
    return [dx, dy, dw, dh]

def train_bbox_regressor(features, bboxes_pred, bboxes_gt):
    offsets = np.array([compute_offsets(pred, gt) for pred, gt in zip(bboxes_pred, bboxes_gt)])
    regressors = []
    for i in range(4):  # dx, dy, dw, dh
        regr = GradientBoostingRegressor(
            n_estimators=100, 
            learning_rate=0.1,
            max_depth=3
        )
        regr.fit(features, offsets[:, i])
        regressors.append(regr)
        
    return regressors 

# Preprocess features
def preprocess_features(features, labels):
    valid_mask = ~np.isnan(features).any(axis=1) & (features != 0).any(axis=1)
    features = features[valid_mask]
    labels = labels[valid_mask]
    scaler = StandardScaler()
    features_scaled = scaler.fit_transform(features)
    return features_scaled, labels, scaler

# Train SVM
def train_svm(features, labels):
    features_scaled, labels, scaler = preprocess_features(features, labels)
    svm = LinearSVC(C=1.0, max_iter=2000, class_weight='balanced')
    svm.fit(features_scaled, labels)

    return svm, scaler
def visualize_detections(image, detections, gt_boxes=None, threshold=0.5, save_path=None):

    # Convert BGR to RGB for display
    image_rgb = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    
    # Create figure and axis
    fig, ax = plt.subplots(1, figsize=(12, 9))
    ax.imshow(image_rgb)
    
    # Draw detections
    for det in detections:
        if det['score'] < threshold:
            continue
            
        x, y, w, h = det['bbox']
        rect = patches.Rectangle((x, y), w, h, linewidth=2, edgecolor='r', facecolor='none')
        ax.add_patch(rect)
        ax.text(x, y-10, f"Person: {det['score']:.2f}", color='red', fontsize=10, weight='bold')
    
    if gt_boxes is not None:
            for box in gt_boxes:
                # COCO ground truth boxes are in [x, y, w, h] format
                if len(box) == 4:
                    x, y, w, h = box  
                    rect = patches.Rectangle((x, y), w, h, linewidth=2, edgecolor='g', facecolor='none')
                    ax.add_patch(rect)
    
    # Add legend
    red_patch = patches.Patch(color='red', label='Detection')
    if gt_boxes is not None:
        green_patch = patches.Patch(color='green', label='Ground Truth')
        ax.legend(handles=[red_patch, green_patch], loc='upper right')
    else:
        ax.legend(handles=[red_patch], loc='upper right')
    
    ax.set_title(f"Human Detections (threshold={threshold})")
    ax.axis('off')
    
    # Save or show
    if save_path:
        plt.savefig(save_path, bbox_inches='tight', dpi=150)
    plt.show()
    plt.close()

def detect_humans(image, cnn, preprocess, svm, scaler, regressors, device, conf_threshold=0.1, nms_threshold=0.5):
    proposals = get_region_proposals(image)
    features = extract_features(image, proposals, cnn, preprocess, device)
    
    if features.shape[0] == 0:
        return []
    
    # classify using SVM
    features_scaled = scaler.transform(features)
    scores = svm.decision_function(features_scaled)

    print(f"SVM scores: min={scores.min() if scores.size > 0 else 'N/A'}, max={scores.max() if scores.size > 0 else 'N/A'}")
    print(f"Number of scores > threshold ({conf_threshold}): {np.sum(scores > conf_threshold) if scores.size > 0 else 0}")

   # Apply regression to positive regions
    candidate_boxes = []
    candidate_scores = []
    
    for i, (score, feature, proposal) in enumerate(zip(scores, features_scaled, proposals)):
        if score > conf_threshold:

            x_min, y_min, x_max, y_max = proposal
            w, h = x_max - x_min, y_max - y_min

            # Predict each coordinate separately
            offsets = [reg.predict(feature.reshape(1, -1))[0] for reg in regressors]
            dx, dy, dw, dh = offsets
            
            if any(abs(val) > 8 for val in offsets):
                continue
                
            # Apply offsets
            x_min_refined = x_min + dx * w
            y_min_refined = y_min + dy * h
            w_refined = w * np.exp(dw)
            h_refined = h * np.exp(dh)

            x_max_refined = x_min_refined + w_refined
            y_max_refined = y_min_refined + h_refined
            
            # Skip invalid boxes
            if w_refined <= 0 or h_refined <= 0:
                continue
            # Clip to image bounds
            x_min_refined = np.clip(x_min_refined, 0, image.shape[1])
            y_min_refined = np.clip(y_min_refined, 0, image.shape[0])
            x_max_refined = np.clip(x_max_refined, 0, image.shape[1])
            y_max_refined = np.clip(y_max_refined, 0, image.shape[0])
            
            # Store the candidate box in [x_min, y_min, x_max, y_max] format for NMS
            candidate_boxes.append([x_min_refined, y_min_refined, x_max_refined, y_max_refined])
            candidate_scores.append(float(score))
    
    if not candidate_boxes:
        return []
    
    # Apply NMS
    candidate_boxes = np.array(candidate_boxes)
    candidate_scores = np.array(candidate_scores)
    keep_indices = non_max_suppression(candidate_boxes, candidate_scores, iou_threshold=nms_threshold)
    
    # Create final detections
    detections = []
    for idx in keep_indices:
        x_min, y_min, x_max, y_max = candidate_boxes[idx]
        # Convert to COCO format [x, y, width, height]
        width = x_max - x_min
        height = y_max - y_min
        bbox = [x_min, y_min, width, height]
        detections.append({
            'bbox': bbox,
            'score': float(candidate_scores[idx]),
            'category_id': 1  # Person
        })
    
    return detections

In [None]:
def train_model(train_loader, cnn, preprocess, device):

    batch_size = 500 
    max_images = 2500
    
    all_features = None
    all_labels = None
    all_bboxes_gt = []
    all_bboxes_pred = []
    all_pos_features = []
    
    # Process in batches
    for start_idx in range(0, max_images, batch_size):
        end_idx = min(start_idx + batch_size, max_images)
        print(f"Processing batch of images {start_idx+1}-{end_idx}/{max_images}")
        
        features, labels, bboxes_gt, bboxes_pred, pos_features = prepare_training_data(
            train_loader, cnn, preprocess, device, 
            start_image=start_idx, 
            max_images=batch_size
        )
        
        if all_features is None:
            all_features = features
            all_labels = labels
        else:
            all_features = np.vstack([all_features, features])
            all_labels = np.concatenate([all_labels, labels])
            
        all_bboxes_gt.extend(bboxes_gt)
        all_bboxes_pred.extend(bboxes_pred)
        all_pos_features.extend(pos_features)
        
        # Free memory
        del features, labels, bboxes_gt, bboxes_pred
        import gc
        gc.collect()
        

    all_bboxes_gt = np.array(all_bboxes_gt)
    all_bboxes_pred = np.array(all_bboxes_pred)
    all_pos_features = np.array(all_pos_features)
    
    # Train models
    print("Training SVM classifier...")
    svm, scaler = train_svm(all_features, all_labels)
    joblib.dump(scaler, 'scaler.pkl')
    joblib.dump(svm, 'svm.pkl')
    print("SVM training completed. Models saved.")


    # Free memory before training regressor
    del all_features, all_labels
    gc.collect()
    
    print("Training bounding box regressor...")
    
    regressor = train_bbox_regressor(all_pos_features, all_bboxes_pred, all_bboxes_gt)
    joblib.dump(regressor, 'bbox_regressor.pkl')
    print("Bounding box regressor training completed. Model saved.")

    return svm, scaler, regressor

svm, scaler, regressor = train_model(train_loader, cnn, preprocess, device)
print("Model training completed.")

In [None]:
def test_model(val_data_loader, cnn, preprocess, svm, scaler, regressor, device, max_images=5):
    all_detections = []
    image_ids = []

    for i, (images, targets) in enumerate(val_data_loader):
        if i >= max_images:
            break
        print(f"Testing image {i+1}/{max_images}")

        image_id = None
        if targets[0] and len(targets[0]) > 0:
            for ann in targets[0]:
                if isinstance(ann, dict) and 'image_id' in ann:
                    image_id = ann['image_id']
                    break


        image = images[0].permute(1, 2, 0).numpy() * 255
        image = image.astype(np.uint8)
        image = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)


        conf_threshold = 0.4 
        nms_threshold = 0.25

        detections = detect_humans(image, cnn, preprocess, svm, scaler, regressor, device, conf_threshold, nms_threshold)

        for det in detections:
            det['image_id'] = image_id

        all_detections.extend(detections)
        image_ids.append(image_id)
     
        gt_boxes = []
        for ann in targets[0]:
            if isinstance(ann, dict) and ann.get('category_id') == 1:
                if 'bbox' in ann:
                    gt_boxes.append(ann['bbox'])
                                    
        visualize_detections(image, detections, gt_boxes, threshold=0.1)

    return all_detections, image_ids

# Run testing

svm = joblib.load('svm.pkl')
scaler = joblib.load('scaler.pkl')
regressor = joblib.load('bbox_regressor.pkl')

print("Models loaded successfully")

all_detections, image_ids = test_model(val_loader, cnn, preprocess, svm, scaler, regressor, device, max_images=100)

In [None]:
# Use COCO evaluation
coco_gt = COCO(os.path.join(root_dir, "annotations", "instances_val2017.json"))
coco_dt = coco_gt.loadRes(all_detections)
coco_eval = COCOeval(coco_gt, coco_dt, 'bbox')
coco_eval.params.catIds = [1]  # Person class
coco_eval.evaluate()
coco_eval.accumulate()
coco_eval.summarize()
print("mAP@0.5:0.95:", coco_eval.stats[0])
print("mAP@0.5:", coco_eval.stats[1])