# In [None]: notebook cell - Faster R-CNN training
import torch
import time 
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torch.utils.data import Dataset, DataLoader
import yaml
from PIL import Image
import os

# Config: module-level settings read by YOLODataset and train() below.
data_yaml = 'data.yaml'  # Dataset YAML; the keys read from it are 'path', the split name and 'names'
batch_size = 2  # Number of images per DataLoader batch
image_size = 512  # Images are resized to image_size x image_size and boxes are scaled to match
epochs = 20  # Number of full passes over the training set

# Dataset
class YOLODataset(Dataset):
    """Detection dataset that pairs images with YOLO-format label files.

    The dataset YAML must provide ``path`` (dataset root), an entry named
    after ``mode`` (image directory relative to the root) and ``names``
    (class names).  Label files are looked up in the directory obtained by
    replacing ``images`` with ``labels`` in the image directory path, using
    the image's base name with a ``.txt`` extension.

    Each item is ``(image, target)``.  ``image`` is the tensor produced by
    ``to_tensor`` and resized to ``image_size x image_size``.  ``target``
    holds ``boxes`` (``[N, 4]`` as ``x1, y1, x2, y2`` in pixels of the
    resized image), ``labels`` (YOLO class id + 1, leaving 0 for
    background), ``image_id``, ``area`` and ``iscrowd``.
    """

    def __init__(self, yaml_path, mode='train'):
        """Read the dataset YAML and index the image files of split ``mode``."""
        with open(yaml_path) as f:
            data = yaml.safe_load(f)

        self.img_dir = os.path.join(data['path'], data[mode])
        self.label_dir = self.img_dir.replace('images', 'labels')
        # Sorted so the index -> file mapping does not depend on the
        # arbitrary order returned by os.listdir.
        self.images = sorted(
            name for name in os.listdir(self.img_dir)
            if name.endswith(('.jpg', '.png', '.jpeg'))
        )
        self.classes = data['names']

    def __len__(self):
        return len(self.images)

    @staticmethod
    def _read_labels(label_path, size):
        """Parse a YOLO label file into pixel-space boxes and 1-based labels.

        Args:
            label_path: Path of the ``.txt`` label file; may not exist.
            size: Side length (pixels) the normalised coordinates are
                scaled by.

        Returns:
            ``(boxes, labels)`` where ``boxes`` is a list of
            ``[x1, y1, x2, y2]`` and ``labels`` a list of ints.  Both are
            empty when the file is missing or has no usable lines.
        """
        boxes, labels = [], []
        if not os.path.exists(label_path):
            return boxes, labels

        with open(label_path) as f:
            for line in f:
                parts = line.split()
                # Blank or truncated lines would break the 5-way unpack.
                if len(parts) < 5:
                    continue
                class_id, xc, yc, w, h = map(float, parts[:5])
                # Convert YOLO (centre, size, normalised) to corner pixels.
                boxes.append([
                    (xc - w / 2) * size,
                    (yc - h / 2) * size,
                    (xc + w / 2) * size,
                    (yc + h / 2) * size,
                ])
                labels.append(int(class_id) + 1)  # +1 because background is class 0
        return boxes, labels

    @staticmethod
    def _build_target(boxes, labels, idx):
        """Assemble the target dict, keeping valid shapes when there are no boxes."""
        if boxes:
            box_tensor = torch.tensor(boxes, dtype=torch.float32)
        else:
            # torch.tensor([]) would be 1-D; column indexing below and the
            # detection model both need an explicit [0, 4] shape.
            box_tensor = torch.zeros((0, 4), dtype=torch.float32)

        heights = box_tensor[:, 3] - box_tensor[:, 1]
        widths = box_tensor[:, 2] - box_tensor[:, 0]
        return {
            'boxes': box_tensor,
            'labels': torch.tensor(labels, dtype=torch.int64),
            'image_id': torch.tensor([idx]),
            'area': heights * widths,
            'iscrowd': torch.zeros(len(labels), dtype=torch.int64),
        }

    def __getitem__(self, idx):
        """Return the resized image tensor and its target dict for ``idx``."""
        # Load image
        img_path = os.path.join(self.img_dir, self.images[idx])
        img = Image.open(img_path).convert('RGB')
        img = torchvision.transforms.functional.to_tensor(img)
        img = torchvision.transforms.functional.resize(img, [image_size] * 2)

        # Load labels
        label_path = os.path.join(
            self.label_dir,
            os.path.splitext(self.images[idx])[0] + '.txt',
        )
        boxes, labels = self._read_labels(label_path, image_size)

        return img, self._build_target(boxes, labels, idx)

# Model
def create_model(num_classes):
    """Build a Faster R-CNN detector on a pretrained MobileNetV2 feature extractor.

    Args:
        num_classes: Number of output classes passed to ``FasterRCNN``
            (the caller in this file passes the dataset class count + 1).

    Returns:
        The assembled ``FasterRCNN`` module (not yet moved to a device).
    """
    feature_extractor = torchvision.models.mobilenet_v2(weights='DEFAULT').features
    # The ``.features`` module does not carry this attribute itself, so the
    # MobileNetV2 feature dimension is attached by hand.
    feature_extractor.out_channels = 1280

    # One feature map -> one tuple of sizes and one tuple of aspect ratios.
    anchor_kwargs = {
        'sizes': ((32, 64, 128, 256),),
        'aspect_ratios': ((0.5, 1.0, 2.0),),
    }
    rpn_anchors = AnchorGenerator(**anchor_kwargs)

    pooler_kwargs = {
        'featmap_names': ['0'],
        'output_size': 7,
        'sampling_ratio': 2,
    }
    box_pooler = torchvision.ops.MultiScaleRoIAlign(**pooler_kwargs)

    detector = FasterRCNN(
        feature_extractor,
        num_classes=num_classes,
        rpn_anchor_generator=rpn_anchors,
        box_roi_pool=box_pooler,
    )
    return detector

# Training
def train():
    """Train the Faster R-CNN model on the 'train' split and save its weights.

    Reads the module-level ``data_yaml``, ``batch_size`` and ``epochs``
    settings, trains with AdamW (lr=0.001), prints per-batch and per-epoch
    progress, and finally writes the model's ``state_dict`` to
    ``fasterrcnn_epoch{epochs}.pth`` in the working directory.

    Raises:
        ValueError: If the training split contains no images.
    """
    # Use the GPU when present, but do not crash on CPU-only machines.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Data
    train_set = YOLODataset(data_yaml, 'train')
    if len(train_set) == 0:
        # Otherwise the epoch summary below divides by zero batches.
        raise ValueError(f"No training images found in {train_set.img_dir}")
    train_loader = DataLoader(
        train_set,
        batch_size=batch_size,
        shuffle=True,
        # Images and targets vary per sample, so keep them as tuples
        # instead of stacking them into a single tensor.
        collate_fn=lambda x: tuple(zip(*x))
    )
    num_batches = len(train_loader)

    # Model (+1 output class for the background label 0)
    model = create_model(len(train_set.classes) + 1).to(device)
    optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)

    print(f"\n🚀 Starting training on {len(train_set)} images")
    print(f"📦 Batch size: {batch_size} | 🔄 Total batches: {num_batches}")
    print(f"🔥 Epochs: {epochs} | 💻 Device: {next(model.parameters()).device}\n")

    # Training loop
    for epoch in range(epochs):
        model.train()
        epoch_loss = 0
        start_time = time.time()

        for batch_idx, (images, targets) in enumerate(train_loader):
            batch_start = time.time()

            # Move to the training device
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            # Forward + backward; in train mode the model returns a dict of losses
            optimizer.zero_grad()
            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

            losses.backward()
            optimizer.step()

            batch_time = time.time() - batch_start
            # .item() forces a device sync, so read the value only once.
            loss_value = losses.item()
            epoch_loss += loss_value
            avg_loss = epoch_loss / (batch_idx + 1)
            mem_gb = (torch.cuda.memory_allocated() / 1e9
                      if device.type == 'cuda' else 0.0)

            print(
                f"\rEpoch {epoch+1}/{epochs} | "
                f"Batch {batch_idx+1}/{num_batches} | "
                f"Loss: {loss_value:.3f} (avg: {avg_loss:.3f}) | "
                f"Time: {batch_time:.2f}s/batch | "
                f"Mem: {mem_gb:.2f}GB",
                end="", flush=True
            )

        epoch_time = time.time() - start_time
        print(f"\n✅ Epoch {epoch+1} complete | "
              f"Avg loss: {epoch_loss/num_batches:.4f} | "
              f"Time: {epoch_time:.1f}s | "
              f"LR: {optimizer.param_groups[0]['lr']:.2e}\n")

    # Persist the result; without this the trained weights are lost as soon
    # as the function returns.
    weights_file = f"fasterrcnn_epoch{epochs}.pth"
    torch.save(model.state_dict(), weights_file)
    print(f"💾 Saved weights to {weights_file}")

# Run training only when this code is executed directly, not when imported.
if __name__ == '__main__':
    train()

# In [None]: notebook cell - Faster R-CNN evaluation
import torch
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
from torch.utils.data import Dataset, DataLoader
import yaml
from PIL import Image
import os
import numpy as np
from torchvision.ops import box_iou
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from collections import defaultdict
import json
from tqdm import tqdm

# Configuration shared by the evaluation dataset, model loader and metrics.
CONFIG = {
    "weights_path": "fasterrcnn_epoch20.pth",  # state_dict file read by load_model()
    "data_yaml": "data.yaml",  # dataset YAML passed to EvaluationDataset
    "batch_size": 4,  # images per DataLoader batch in evaluate()
    "img_size": 512,  # images are resized to img_size x img_size; boxes are scaled to match
    "conf_thresh": 0.5,  # predictions scoring below this are dropped in calculate_metrics()
    "iou_thresh": 0.5,  # minimum IoU for a prediction to match a ground-truth box
    "device": torch.device('cuda' if torch.cuda.is_available() else 'cpu'),  # GPU when available, else CPU
    "output_dir": "evaluation_results"  # metrics JSON files and the example image are written here
}

# Setup output directory (no error if it already exists)
os.makedirs(CONFIG['output_dir'], exist_ok=True)

class EvaluationDataset(Dataset):
    """Evaluation split of a YOLO-format dataset.

    Items are ``(image, target)`` pairs.  ``target`` carries ``boxes``
    (``[N, 4]`` corner coordinates in pixels of the resized image),
    ``labels`` (the raw 0-based class ids from the label file),
    ``image_id`` and ``image_name``.  Images without a label file, or with
    no usable label lines, get empty ``boxes``/``labels`` tensors.
    """

    def __init__(self, yaml_path, mode='val'):
        """Read the dataset YAML and list the image files of split ``mode``."""
        with open(yaml_path) as cfg_file:
            cfg = yaml.safe_load(cfg_file)

        self.img_dir = os.path.join(cfg['path'], cfg[mode])
        # Labels live in a sibling tree: same path with 'images' -> 'labels'.
        self.label_dir = self.img_dir.replace('images', 'labels')

        image_suffixes = ('.jpg', '.png', '.jpeg')
        self.images = sorted(
            entry for entry in os.listdir(self.img_dir)
            if entry.endswith(image_suffixes)
        )
        self.class_names = cfg['names']
        self.class_dict = dict(enumerate(self.class_names))

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        """Load image ``idx`` and its annotations."""
        file_name = self.images[idx]
        transforms = torchvision.transforms.functional

        picture = Image.open(os.path.join(self.img_dir, file_name)).convert("RGB")
        tensor = transforms.to_tensor(picture)
        side = CONFIG['img_size']
        tensor = transforms.resize(tensor, [side, side])

        stem = os.path.splitext(file_name)[0]
        annotation_path = os.path.join(self.label_dir, stem + '.txt')

        box_rows, class_ids = [], []
        if os.path.exists(annotation_path):
            with open(annotation_path) as handle:
                for raw in handle:
                    fields = raw.strip().split()
                    # Only lines with at least the five YOLO fields are used.
                    if len(fields) >= 5:
                        cls, cx, cy, bw, bh = (float(v) for v in fields[:5])
                        half_w = bw / 2
                        half_h = bh / 2
                        box_rows.append([
                            (cx - half_w) * side,
                            (cy - half_h) * side,
                            (cx + half_w) * side,
                            (cy + half_h) * side,
                        ])
                        class_ids.append(int(cls))

        if box_rows:
            box_tensor = torch.tensor(box_rows, dtype=torch.float32)
        else:
            box_tensor = torch.zeros((0, 4), dtype=torch.float32)

        if class_ids:
            label_tensor = torch.tensor(class_ids, dtype=torch.int64)
        else:
            label_tensor = torch.zeros(0, dtype=torch.int64)

        target = {
            "boxes": box_tensor,
            "labels": label_tensor,
            "image_id": torch.tensor([idx]),
            "image_name": file_name,
        }
        return tensor, target

def load_model(num_classes):
    """Rebuild the Faster R-CNN architecture and load trained weights into it.

    Args:
        num_classes: Number of output classes of the detector, including
            the background class.

    Returns:
        The model on ``CONFIG['device']`` with the weights from
        ``CONFIG['weights_path']`` loaded.

    Checkpoint tensors whose shape does not fit the freshly built model
    (typically the prediction heads when ``num_classes`` differs from the
    value used for training) are skipped and reported, because those layers
    then keep their random initialisation.
    """
    # Weights come from the checkpoint, so no pretrained download is needed.
    backbone = torchvision.models.mobilenet_v2(weights=None).features
    backbone.out_channels = 1280
    anchor_generator = AnchorGenerator(
        sizes=((32, 64, 128, 256),),
        aspect_ratios=((0.5, 1.0, 2.0),)
    )
    roi_pooler = torchvision.ops.MultiScaleRoIAlign(
        featmap_names=['0'],
        output_size=7,
        sampling_ratio=2
    )
    model = FasterRCNN(
        backbone,
        num_classes=num_classes,
        rpn_anchor_generator=anchor_generator,
        box_roi_pool=roi_pooler
    )
    state_dict = torch.load(CONFIG['weights_path'], map_location=CONFIG['device'], weights_only=True)

    # Drop only tensors that cannot be loaded.  Deleting every
    # 'cls_score'/'bbox_pred' key would also discard compatible trained
    # heads (including the RPN's bbox_pred) and evaluate random weights.
    expected = model.state_dict()
    incompatible = [
        key for key, tensor in state_dict.items()
        if key in expected and tensor.shape != expected[key].shape
    ]
    for key in incompatible:
        del state_dict[key]

    load_result = model.load_state_dict(state_dict, strict=False)

    if incompatible:
        print(f"WARNING: skipped {len(incompatible)} checkpoint tensors with "
              f"mismatched shapes (these layers stay randomly initialised): "
              f"{incompatible}")
    not_loaded = [k for k in load_result.missing_keys if k not in incompatible]
    if not_loaded:
        print(f"WARNING: model parameters missing from checkpoint: {not_loaded}")
    if load_result.unexpected_keys:
        print(f"WARNING: unused checkpoint entries: {list(load_result.unexpected_keys)}")

    return model.to(CONFIG['device'])

def calculate_metrics(pred_boxes, pred_labels, pred_scores, gt_boxes, gt_labels):
    """Match predictions to ground truth and compute detection counts.

    Predictions scoring below ``CONFIG['conf_thresh']`` are discarded.  A
    prediction can match a ground-truth box when their labels are equal and
    their IoU is at least ``CONFIG['iou_thresh']``; matches are assigned
    greedily in order of decreasing IoU, each box being used at most once.

    Args:
        pred_boxes: ``[M, 4]`` tensor of predicted ``x1, y1, x2, y2`` boxes.
        pred_labels: ``[M]`` tensor of predicted labels.
        pred_scores: ``[M]`` tensor of prediction scores.
        gt_boxes: ``[N, 4]`` tensor of ground-truth boxes.
        gt_labels: ``[N]`` ground-truth labels.

    Returns:
        Dict with ``true_positives``, ``false_positives``,
        ``false_negatives``, ``precision``, ``recall``, ``f1`` and ``ious``
        (IoU of every matched pair, as plain Python floats).
    """
    metrics = {
        'true_positives': 0,
        'false_positives': 0,
        'false_negatives': 0,
        'precision': 0.0,
        'recall': 0.0,
        'f1': 0.0,
        'ious': []
    }

    # Filter predictions by confidence
    keep = pred_scores >= CONFIG['conf_thresh']
    pred_boxes = pred_boxes[keep]
    pred_labels = pred_labels[keep]

    num_gt = len(gt_boxes)
    num_pred = len(pred_boxes)
    if num_gt == 0 or num_pred == 0:
        # Nothing can match: every surviving prediction is a false positive
        # and every ground-truth box a miss.  Returning before counting the
        # predictions would hide false positives on images without objects.
        metrics['false_positives'] = num_pred
        metrics['false_negatives'] = num_gt
        return metrics

    # Candidate pairs: IoU above threshold and identical label
    iou_matrix = box_iou(gt_boxes, pred_boxes).cpu()
    gt_label_col = torch.as_tensor(gt_labels).cpu().unsqueeze(1)
    pred_label_row = torch.as_tensor(pred_labels).cpu().unsqueeze(0)
    candidate = (iou_matrix >= CONFIG['iou_thresh']) & (gt_label_col == pred_label_row)

    gt_indices, pred_indices = torch.nonzero(candidate, as_tuple=True)
    # .tolist() yields Python floats, which keeps the IoUs JSON-serialisable.
    matches = list(zip(gt_indices.tolist(),
                       pred_indices.tolist(),
                       iou_matrix[candidate].tolist()))

    # Greedy one-to-one assignment, best IoU first
    matches.sort(key=lambda match: match[2], reverse=True)
    matched_gt = set()
    matched_pred = set()
    for gt_idx, pred_idx, iou in matches:
        if gt_idx in matched_gt or pred_idx in matched_pred:
            continue
        metrics['true_positives'] += 1
        metrics['ious'].append(iou)
        matched_gt.add(gt_idx)
        matched_pred.add(pred_idx)

    metrics['false_positives'] = num_pred - len(matched_pred)
    metrics['false_negatives'] = num_gt - len(matched_gt)

    # Derived metrics; the max(...) guards avoid division by zero
    tp = metrics['true_positives']
    metrics['precision'] = tp / max(tp + metrics['false_positives'], 1)
    metrics['recall'] = tp / max(tp + metrics['false_negatives'], 1)
    metrics['f1'] = (2 * (metrics['precision'] * metrics['recall'])
                     / max(metrics['precision'] + metrics['recall'], 1e-6))

    return metrics

def visualize_detections(image, gt_boxes, gt_labels, pred_boxes, pred_labels, pred_scores, class_names, save_path):
    """Draw ground-truth and predicted boxes on an image and save the figure.

    Args:
        image: Image tensor laid out channel-first; it is permuted to
            channel-last for display.
        gt_boxes: Iterable of ground-truth ``x1, y1, x2, y2`` boxes.
        gt_labels: Labels for ``gt_boxes``.
        pred_boxes: Iterable of predicted boxes.
        pred_labels: Labels for ``pred_boxes``.
        pred_scores: Scores for ``pred_boxes``.
        class_names: List or dict mapping a label to its display name.
        save_path: Output file path for the rendered figure.

    Ground truth is drawn in solid green, predictions in dashed red with
    their score.  Labels without an entry in ``class_names`` are shown as
    ``class <id>`` instead of raising.
    """
    def label_text(label):
        # Tensor labels are converted to int: they are not usable as dict
        # keys, and out-of-range ids must not abort the whole plot.
        idx = int(label)
        if isinstance(class_names, dict):
            return str(class_names.get(idx, f"class {idx}"))
        if 0 <= idx < len(class_names):
            return str(class_names[idx])
        return f"class {idx}"

    def draw_box(box, caption, color_code, face, **rect_kwargs):
        x1, y1, x2, y2 = (float(v) for v in box)
        rect = patches.Rectangle(
            (x1, y1), x2 - x1, y2 - y1,
            linewidth=2, edgecolor=color_code, facecolor='none', **rect_kwargs)
        ax.add_patch(rect)
        ax.text(x1, y1, caption, color='white',
                bbox=dict(facecolor=face, alpha=0.7, pad=1))

    fig, ax = plt.subplots(1, figsize=(12, 8))
    try:
        ax.imshow(image.permute(1, 2, 0))

        # Draw ground truth (green)
        for box, label in zip(gt_boxes, gt_labels):
            draw_box(box, label_text(label), 'g', 'green')

        # Draw predictions (red)
        for box, label, score in zip(pred_boxes, pred_labels, pred_scores):
            draw_box(box, f"{label_text(label)} {float(score):.2f}", 'r', 'red',
                     linestyle='--')

        ax.axis('off')
        fig.savefig(save_path, bbox_inches='tight', dpi=300)
    finally:
        # Close this specific figure even if drawing or saving failed.
        plt.close(fig)

def _precision_recall_f1(tp, fp, fn):
    """Return (precision, recall, f1) for the given counts, guarding zero divisions."""
    precision = tp / max(tp + fp, 1)
    recall = tp / max(tp + fn, 1)
    f1 = 2 * (precision * recall) / max(precision + recall, 1e-6)
    return precision, recall, f1


def evaluate():
    """Evaluate the trained detector on the 'val' split and write reports.

    Runs the model over the validation set, accumulates per-class true
    positives / false positives / false negatives and matched IoUs via
    ``calculate_metrics``, and writes to ``CONFIG['output_dir']``:

    * ``metrics.json`` - per-class and overall precision, recall, F1,
      counts and mean IoU;
    * ``per_image_results.json`` - ground truth and predictions per image;
    * ``detection_example.png`` - visualisation of the first image.

    A summary table is printed at the end.  Predicted labels are reported
    in the dataset's 0-based class ids so they line up with the ground
    truth.
    """
    # Load dataset and model
    dataset = EvaluationDataset(CONFIG['data_yaml'], 'val')
    dataloader = DataLoader(dataset, batch_size=CONFIG['batch_size'],
                          collate_fn=lambda x: tuple(zip(*x)))
    num_classes = len(dataset.class_names)
    # +1: the detector reserves label 0 for background.
    model = load_model(num_classes + 1)
    model.eval()

    # Initialize metrics storage for every dataset class, so classes that
    # never appear still show up in the report.
    class_metrics = {
        class_id: {
            'true_positives': 0,
            'false_positives': 0,
            'false_negatives': 0,
            'ious': []
        }
        for class_id in range(num_classes)
    }
    image_results = []

    # Evaluation loop
    for batch_idx, (images, targets) in enumerate(tqdm(dataloader, desc="Evaluating")):
        images = [img.to(CONFIG['device']) for img in images]

        with torch.no_grad():
            outputs = model(images)

        for i, (target, output) in enumerate(zip(targets, outputs)):
            # Move data to CPU for metrics calculation
            gt_boxes = target['boxes'].cpu()
            gt_labels = target['labels'].cpu()
            pred_boxes = output['boxes'].cpu()
            # The model is trained with labels shifted by +1 (0 is
            # background) while the evaluation ground truth keeps the raw
            # 0-based ids; undo the shift so both use the same ids.
            pred_labels = output['labels'].cpu() - 1
            pred_scores = output['scores'].cpu()

            # Only classes present in this image can change any count.
            present = set(gt_labels.tolist()) | set(pred_labels.tolist())
            for class_id in sorted(present):
                if not 0 <= class_id < num_classes:
                    continue

                class_mask = gt_labels == class_id
                pred_mask = pred_labels == class_id

                metrics = calculate_metrics(
                    pred_boxes[pred_mask], pred_labels[pred_mask], pred_scores[pred_mask],
                    gt_boxes[class_mask], gt_labels[class_mask]
                )

                # Update class metrics
                totals = class_metrics[class_id]
                totals['true_positives'] += metrics['true_positives']
                totals['false_positives'] += metrics['false_positives']
                totals['false_negatives'] += metrics['false_negatives']
                totals['ious'].extend(metrics['ious'])

            # Save visualization for first image in first batch
            if batch_idx == 0 and i == 0:
                vis_path = os.path.join(CONFIG['output_dir'], 'detection_example.png')
                visualize_detections(
                    images[i].cpu(), gt_boxes, gt_labels,
                    pred_boxes, pred_labels, pred_scores,
                    dataset.class_names, vis_path
                )

            # Store per-image results
            image_results.append({
                'image_name': target['image_name'],
                'ground_truth': {
                    'boxes': gt_boxes.tolist(),
                    'labels': gt_labels.tolist()
                },
                'predictions': {
                    'boxes': pred_boxes.tolist(),
                    'labels': pred_labels.tolist(),
                    'scores': pred_scores.tolist()
                }
            })

    # Calculate final metrics
    results = {
        'per_class': {},
        'overall': {
            'true_positives': 0,
            'false_positives': 0,
            'false_negatives': 0,
            'mean_iou': 0.0,
            'precision': 0.0,
            'recall': 0.0,
            'f1': 0.0
        }
    }
    overall = results['overall']
    all_ious = []

    # Aggregate class metrics
    for class_id, metrics in class_metrics.items():
        tp = metrics['true_positives']
        fp = metrics['false_positives']
        fn = metrics['false_negatives']
        ious = metrics['ious']

        precision, recall, f1 = _precision_recall_f1(tp, fp, fn)
        # float(): NumPy float32 scalars cannot be serialised by json.dump.
        mean_iou = float(np.mean(ious)) if ious else 0.0

        results['per_class'][dataset.class_names[class_id]] = {
            'precision': precision,
            'recall': recall,
            'f1': f1,
            'true_positives': tp,
            'false_positives': fp,
            'false_negatives': fn,
            'mean_iou': mean_iou
        }

        # Update overall metrics
        overall['true_positives'] += tp
        overall['false_positives'] += fp
        overall['false_negatives'] += fn
        all_ious.extend(ious)

    # Calculate overall metrics
    overall['precision'], overall['recall'], overall['f1'] = _precision_recall_f1(
        overall['true_positives'], overall['false_positives'], overall['false_negatives']
    )
    overall['mean_iou'] = float(np.mean(all_ious)) if all_ious else 0.0

    # Save results
    with open(os.path.join(CONFIG['output_dir'], 'metrics.json'), 'w') as f:
        json.dump(results, f, indent=2)

    with open(os.path.join(CONFIG['output_dir'], 'per_image_results.json'), 'w') as f:
        json.dump(image_results, f, indent=2)

    # Print summary
    print("\nEvaluation Results:")
    print(f"{'Class':<15} {'Precision':>10} {'Recall':>10} {'F1':>10} {'mIoU':>10}")
    print("-" * 55)
    for class_name, metrics in results['per_class'].items():
        print(f"{class_name:<15} {metrics['precision']:>10.4f} {metrics['recall']:>10.4f} "
              f"{metrics['f1']:>10.4f} {metrics['mean_iou']:>10.4f}")
    print("-" * 55)
    print(f"{'OVERALL':<15} {overall['precision']:>10.4f} "
          f"{overall['recall']:>10.4f} {overall['f1']:>10.4f} "
          f"{overall['mean_iou']:>10.4f}")

# Run the evaluation only when this code is executed directly, not when imported.
if __name__ == "__main__":
    evaluate()