In [None]:
import os
import torch
import torchvision
import cv2
import numpy as np
from torch.utils.data import Dataset, DataLoader
import time
from torchvision.models.detection.faster_rcnn import FasterRCNN


ImportError: cannot import name 'FasterRCNN_ResNet18_FPN_Weights' from 'torchvision.models.detection' (e:\ML\envs\torch\lib\site-packages\torchvision\models\detection\__init__.py)

In [None]:
# NOTE(review): CUDA_LAUNCH_BLOCKING=1 is normally a debugging aid (it is documented as
# forcing synchronous CUDA kernel launches so errors surface at the offending call).
# It is expected to slow GPU training noticeably -- confirm it is still wanted here.
os.environ["CUDA_LAUNCH_BLOCKING"] = "1"

## Dataset Loading

In [None]:
class DroneDataset(Dataset):
    """Detection dataset pairing images with YOLO-format label files.

    Every image ``<name>.<ext>`` in ``img_dir`` is paired with ``<name>.txt`` in
    ``label_dir``. Each label line is ``class_id x_center y_center width height``
    with coordinates normalised to [0, 1]. A missing label file yields an image
    with no boxes.

    Args:
        img_dir: Directory containing the images.
        label_dir: Directory containing the YOLO ``.txt`` label files.
        sample_fraction: Fraction of the images to keep. Values below 1.0 select
            a reproducible random subset (fixed seed 42).

    Raises:
        ValueError: If ``sample_fraction`` is negative.
    """

    # Matched case-insensitively so that e.g. ``.JPG`` files are not silently dropped.
    IMAGE_EXTENSIONS = ('.jpg', '.png', '.jpeg')

    def __init__(self, img_dir, label_dir, sample_fraction=1.0):
        if sample_fraction < 0:
            raise ValueError(f"sample_fraction must be non-negative, got {sample_fraction}")

        self.img_dir = img_dir
        self.label_dir = label_dir

        # List the directory once; sorting keeps the index -> file mapping stable.
        all_img_files = sorted(
            f for f in os.listdir(img_dir)
            if f.lower().endswith(self.IMAGE_EXTENSIONS)
        )

        if sample_fraction < 1.0:
            num_samples = int(len(all_img_files) * sample_fraction)
            # A private RandomState draws the same subset as seeding the global
            # generator with 42 would, without clobbering the caller's RNG state.
            rng = np.random.RandomState(42)
            indices = rng.choice(len(all_img_files), num_samples, replace=False)
            self.img_files = [all_img_files[i] for i in indices]
            print(f"Using {len(self.img_files)}/{len(all_img_files)} images ({sample_fraction*100:.1f}%)")
        else:
            self.img_files = all_img_files

    def __len__(self):
        """Return the number of images in the (possibly sub-sampled) dataset."""
        return len(self.img_files)

    @staticmethod
    def _load_yolo_boxes(label_path, img_w, img_h):
        """Parse a YOLO label file into pixel-space boxes and 1-based labels.

        Args:
            label_path: Path to the ``.txt`` label file (may not exist).
            img_w: Image width in pixels.
            img_h: Image height in pixels.

        Returns:
            ``(boxes, labels)`` where ``boxes`` is a list of
            ``[x_min, y_min, x_max, y_max]`` floats clipped to the image and
            ``labels`` is a list of ``class_id + 1`` ints (0 is reserved for
            background by Faster R-CNN). Both are empty if the file is missing.
        """
        boxes = []
        labels = []

        if not os.path.exists(label_path):
            return boxes, labels

        with open(label_path, 'r') as f:
            for line in f:
                data = line.split()
                # Blank or truncated lines carry no usable annotation.
                if len(data) < 5:
                    continue

                class_id = int(data[0])
                x_center, y_center, width, height = map(float, data[1:5])

                if width <= 0 or height <= 0:
                    continue

                # Convert to pixel corners and clip to the image bounds.
                x_min = max(0.0, (x_center - width / 2) * img_w)
                y_min = max(0.0, (y_center - height / 2) * img_h)
                x_max = min(float(img_w), (x_center + width / 2) * img_w)
                y_max = min(float(img_h), (y_center + height / 2) * img_h)

                # Skip boxes that are too small (also covers boxes fully outside the image).
                if x_max - x_min < 1 or y_max - y_min < 1:
                    continue

                boxes.append([x_min, y_min, x_max, y_max])
                labels.append(class_id + 1)  # +1 since 0 is background for Faster R-CNN

        return boxes, labels

    def __getitem__(self, idx):
        """Load one sample.

        Returns:
            ``(image, target)`` where ``image`` is a float32 ``(3, H, W)`` RGB
            tensor scaled to [0, 1] and ``target`` is a dict with ``boxes``
            (N, 4), ``labels`` (N,), ``image_id`` (1,), ``area`` (N,) and
            ``iscrowd`` (N,).

        Raises:
            OSError: If the image cannot be read.
        """
        img_name = self.img_files[idx]

        img_path = os.path.join(self.img_dir, img_name)
        image = cv2.imread(img_path)
        if image is None:
            # cv2.imread signals failure by returning None rather than raising.
            raise OSError(f"Failed to read image (missing or corrupt): {img_path}")
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        img_h, img_w = image.shape[0], image.shape[1]

        label_name = os.path.splitext(img_name)[0] + '.txt'
        label_path = os.path.join(self.label_dir, label_name)
        boxes, labels = self._load_yolo_boxes(label_path, img_w, img_h)

        # Images without annotations still need correctly shaped empty tensors.
        if len(boxes) == 0:
            boxes = torch.zeros((0, 4), dtype=torch.float32)
            labels = torch.zeros(0, dtype=torch.int64)
            area = torch.zeros(0, dtype=torch.float32)
        else:
            boxes = torch.as_tensor(boxes, dtype=torch.float32)
            labels = torch.as_tensor(labels, dtype=torch.int64)
            area = (boxes[:, 3] - boxes[:, 1]) * (boxes[:, 2] - boxes[:, 0])

        # HWC uint8 -> CHW float in [0, 1]
        image = torch.from_numpy(image.transpose(2, 0, 1)).float() / 255.0

        target = {
            "boxes": boxes,
            "labels": labels,
            "image_id": torch.tensor([idx]),
            "area": area,
            "iscrowd": torch.zeros((len(boxes),), dtype=torch.int64)
        }

        return image, target

In [None]:
def validate_dataset(dataset, num_samples=5):
    """Print a short summary of the first few samples of ``dataset``.

    Args:
        dataset: Indexable collection yielding ``(image, target)`` pairs, where
            ``target`` has ``'boxes'`` and ``'labels'`` tensors.
        num_samples: Maximum number of leading samples to inspect.

    Any exception raised while loading or describing a sample is reported on
    stdout and the check moves on to the next sample.
    """
    total = len(dataset)
    print(f"Dataset size: {total}")

    sample_idx = 0
    stop = min(num_samples, total)
    while sample_idx < stop:
        try:
            img, tgt = dataset[sample_idx]
            print(f"Sample {sample_idx}:")
            print(f"  - Image shape: {img.shape}")
            box_shape = tgt['boxes'].shape
            print(f"  - Boxes: {box_shape}")
            label_values = tgt['labels'].tolist()
            print(f"  - Labels: {label_values}")
            distinct = torch.unique(tgt['labels']).tolist()
            print(f"  - Unique labels: {distinct}")
        except Exception as e:
            # Best-effort report: keep going so every sample gets checked.
            print(f"Error loading sample {sample_idx}: {e}")
        sample_idx += 1

    print("Dataset validation complete!")

In [None]:
# Dataset locations, given as paths relative to the current working directory.
# main() reads img_dir and label_dir as module-level globals, so this cell has to
# run before main() is called.
img_dir = "releasev1-detection&tracking/RGB/images"
label_dir = "releasev1-detection&tracking/RGB/labels"

# Smoke test: build the dataset with the default sample_fraction (no sub-sampling)
# and print a summary of its first few samples.
dataset = DroneDataset(img_dir=img_dir, label_dir=label_dir)
validate_dataset(dataset)

Dataset size: 57580
Sample 0:
  - Image shape: torch.Size([3, 256, 320])
  - Boxes: torch.Size([1, 4])
  - Labels: [1]
  - Unique labels: [1]
Sample 1:
  - Image shape: torch.Size([3, 256, 320])
  - Boxes: torch.Size([1, 4])
  - Labels: [1]
  - Unique labels: [1]
Sample 2:
  - Image shape: torch.Size([3, 256, 320])
  - Boxes: torch.Size([1, 4])
  - Labels: [1]
  - Unique labels: [1]
Sample 3:
  - Image shape: torch.Size([3, 256, 320])
  - Boxes: torch.Size([1, 4])
  - Labels: [1]
  - Unique labels: [1]
Sample 4:
  - Image shape: torch.Size([3, 256, 320])
  - Boxes: torch.Size([1, 4])
  - Labels: [1]
  - Unique labels: [1]
Dataset validation complete!


## Loading and Training the model

In [None]:
from torchvision.models.detection import FasterRCNN_ResNet50_FPN_Weights
from torchvision.models.detection import FasterRCNN_MobileNet_V3_Large_FPN_Weights
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.ops import MultiScaleRoIAlign

In [None]:
def get_model(num_classes=3, backbone="mobilenet"):
    """Build a Faster R-CNN detector with the requested backbone.

    Args:
        num_classes: Number of output classes, background included.
        backbone: ``"custom_resnet18"`` builds a detector from scratch on top of
            ImageNet-pretrained ResNet-18 features; ``"mobilenet"`` loads the
            pretrained MobileNet V3 Large FPN detector; any other value loads
            the pretrained ResNet-50 FPN detector.

    Returns:
        The detection model, with its box predictor sized for ``num_classes``.
    """
    if backbone == "custom_resnet18":
        # ResNet-18 without its average-pool and fc layers acts as a
        # single-feature-map extractor; note that no FPN is built on this path.
        resnet = torchvision.models.resnet18(weights=torchvision.models.ResNet18_Weights.DEFAULT)
        feature_extractor = torch.nn.Sequential(*list(resnet.children())[:-2])
        feature_extractor.out_channels = 512  # channel count of ResNet-18's last stage

        return FasterRCNN(
            feature_extractor,
            num_classes=num_classes,
            # One feature map -> one tuple of anchor sizes / aspect ratios.
            rpn_anchor_generator=AnchorGenerator(
                sizes=((32, 64, 128, 256, 512),),
                aspect_ratios=((0.5, 1.0, 2.0),)
            ),
            box_roi_pool=MultiScaleRoIAlign(
                featmap_names=['0'],
                output_size=7,
                sampling_ratio=2
            )
        )

    # Both pre-built detectors only differ in the constructor and weights used.
    if backbone == "mobilenet":
        detector = torchvision.models.detection.fasterrcnn_mobilenet_v3_large_fpn(
            weights=FasterRCNN_MobileNet_V3_Large_FPN_Weights.DEFAULT
        )
    else:
        detector = torchvision.models.detection.fasterrcnn_resnet50_fpn(
            weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT
        )

    # Swap the pretrained classification head for one with num_classes outputs.
    head_in_features = detector.roi_heads.box_predictor.cls_score.in_features
    detector.roi_heads.box_predictor = FastRCNNPredictor(head_in_features, num_classes)
    return detector

In [None]:
def _compute_validation_loss(model, val_loader, device):
    """Return the mean summed loss over ``val_loader``, or NaN if it is empty.

    The model is put in train mode for the whole pass because that is how the
    loss dictionary is obtained from it; gradients stay disabled. The model is
    left in eval mode afterwards.
    """
    total = 0.0
    num_batches = 0

    model.train()
    with torch.no_grad():
        for images, targets in val_loader:
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = model(images, targets)
            total += sum(loss for loss in loss_dict.values()).item()
            num_batches += 1
    model.eval()

    if num_batches == 0:
        return float('nan')
    return total / num_batches


def train_model(model, dataloaders, num_epochs=20):
    """Train ``model`` with SGD and restore the weights with the best validation loss.

    Args:
        model: Detection model that returns a dict of losses when called as
            ``model(images, targets)`` in train mode.
        dataloaders: Dict with ``'train'`` and ``'val'`` iterables yielding
            ``(images, targets)`` batches; ``targets`` are dicts of tensors.
        num_epochs: Number of passes over the training data.

    Returns:
        The same model object, loaded with the best-validation weights (or its
        final weights if no epoch produced a finite validation loss).

    Side effects:
        Writes ``best_rgb_drone_detector.pth`` whenever validation loss improves.
    """
    import copy  # local import: only needed for the best-weights snapshot

    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Training on {device}")
    model.to(device)

    # Only optimise parameters that are not frozen.
    params = [p for p in model.parameters() if p.requires_grad]

    optimizer = torch.optim.SGD(
        params,
        lr=0.001,
        momentum=0.9,
        weight_decay=0.0005
    )

    # Halve the learning rate every 8 epochs.
    lr_scheduler = torch.optim.lr_scheduler.StepLR(
        optimizer,
        step_size=8,
        gamma=0.5
    )

    best_loss = float('inf')
    best_model_wts = None

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)

        # Training phase
        model.train()
        running_loss = 0.0
        total_batches = len(dataloaders['train'])

        start_time = time.time()
        print_freq = max(1, total_batches // 20)  # Print ~20 updates per epoch

        for batch_idx, (images, targets) in enumerate(dataloaders['train']):
            images = list(image.to(device) for image in images)
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            optimizer.zero_grad()

            loss_dict = model(images, targets)
            losses = sum(loss for loss in loss_dict.values())

            losses.backward()
            optimizer.step()

            running_loss += losses.item()

            if (batch_idx + 1) % print_freq == 0 or (batch_idx + 1) == total_batches:
                # Guard against a zero elapsed time on very fast first batches.
                elapsed_time = max(time.time() - start_time, 1e-9)
                batches_per_sec = (batch_idx + 1) / elapsed_time
                eta_seconds = (total_batches - batch_idx - 1) / batches_per_sec
                eta_str = time.strftime("%H:%M:%S", time.gmtime(eta_seconds))

                current_loss = running_loss / (batch_idx + 1)

                print(f"Batch {batch_idx+1}/{total_batches} ({(batch_idx+1)/total_batches*100:.1f}%) "
                      f"Loss: {current_loss:.4f} | ETA: {eta_str}")

        epoch_loss = running_loss / max(1, total_batches)
        print(f'Train Loss: {epoch_loss:.4f}')

        # Validation phase
        val_loss = _compute_validation_loss(model, dataloaders['val'], device)
        print(f'Val Loss: {val_loss:.4f}')

        # A NaN validation loss (e.g. empty loader) never compares as an improvement.
        if val_loss < best_loss:
            best_loss = val_loss
            # state_dict() tensors alias the live parameters, so a shallow copy
            # would keep changing as training continues; snapshot them instead.
            best_model_wts = copy.deepcopy(model.state_dict())
            torch.save(best_model_wts, 'best_rgb_drone_detector.pth')

        lr_scheduler.step()

        print()

    # Restore the best snapshot, if any epoch produced one.
    if best_model_wts is not None:
        model.load_state_dict(best_model_wts)
    return model

## Model Evaluations

In [None]:
def evaluate_model_metrics(model, test_loader, device, iou_threshold=0.5):
    """Run the detector over ``test_loader`` and report detection metrics.

    Args:
        model: Detection model returning, in eval mode, one dict per image with
            ``'boxes'``, ``'scores'`` and ``'labels'`` tensors.
        test_loader: Iterable of ``(images, targets)`` batches; each target has
            ``'boxes'`` and ``'labels'`` tensors.
        device: Device the input images are moved to before inference.
        iou_threshold: IoU threshold forwarded to ``calculate_detection_metrics``.

    Returns:
        ``(precision, recall, f1, mAP)`` as returned by
        ``calculate_detection_metrics``; the values are also printed.
    """
    model.eval()

    # Per-image ground truth and detections, accumulated as numpy arrays.
    gt_boxes, gt_labels = [], []
    det_boxes, det_scores, det_labels = [], [], []

    with torch.no_grad():
        for images, targets in test_loader:
            batch = [img.to(device) for img in images]
            outputs = model(batch)

            for position, output in enumerate(outputs):
                reference = targets[position]

                gt_boxes.append(reference['boxes'].cpu().numpy())
                gt_labels.append(reference['labels'].cpu().numpy())

                det_boxes.append(output['boxes'].cpu().numpy())
                det_scores.append(output['scores'].cpu().numpy())
                det_labels.append(output['labels'].cpu().numpy())

    metrics = calculate_detection_metrics(
        gt_boxes, gt_labels,
        det_boxes, det_scores, det_labels,
        iou_threshold
    )
    precision, recall, f1, mAP = metrics

    print(f"Evaluation Results at IoU={iou_threshold}:")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"mAP: {mAP:.4f}")

    return precision, recall, f1, mAP

def _iou_one_to_many(box, boxes):
    """Return the IoU of one ``[x1, y1, x2, y2]`` box against an (N, 4) array of boxes."""
    inter_w = np.clip(np.minimum(box[2], boxes[:, 2]) - np.maximum(box[0], boxes[:, 0]), 0, None)
    inter_h = np.clip(np.minimum(box[3], boxes[:, 3]) - np.maximum(box[1], boxes[:, 1]), 0, None)
    intersection = inter_w * inter_h

    box_area = (box[2] - box[0]) * (box[3] - box[1])
    boxes_area = (boxes[:, 2] - boxes[:, 0]) * (boxes[:, 3] - boxes[:, 1])
    union = box_area + boxes_area - intersection

    positive = union > 0
    # Divide by 1 where the union is empty, then force those entries to 0.
    return np.where(positive, intersection / np.where(positive, union, 1.0), 0.0)


def _average_precision(hits_by_score, num_gt):
    """Area under the interpolated precision-recall curve for one class.

    ``hits_by_score`` is a sequence of booleans (True = true positive) already
    ordered by descending confidence; ``num_gt`` is the number of ground-truth
    boxes of that class (must be positive).
    """
    if len(hits_by_score) == 0:
        return 0.0

    hits = np.asarray(hits_by_score, dtype=np.float64)
    tp_cum = np.cumsum(hits)
    fp_cum = np.cumsum(1.0 - hits)
    recall = tp_cum / num_gt
    precision = tp_cum / (tp_cum + fp_cum)

    # Pad the curve, make precision monotonically non-increasing, then sum the
    # rectangles wherever recall changes (all-point interpolation).
    recall = np.concatenate(([0.0], recall, [1.0]))
    precision = np.concatenate(([0.0], precision, [0.0]))
    precision = np.maximum.accumulate(precision[::-1])[::-1]
    steps = np.nonzero(recall[1:] != recall[:-1])[0]
    return float(np.sum((recall[steps + 1] - recall[steps]) * precision[steps + 1]))


def calculate_detection_metrics(true_boxes_list, true_labels_list,
                               pred_boxes_list, pred_scores_list, pred_labels_list,
                               iou_threshold=0.5, conf_threshold=0.5):
    """
    Calculate precision, recall, F1 and mAP for object detection.
    This is a simplified version - production code would use libraries like pycocotools.

    Predictions are matched greedily per image in descending score order: each
    prediction claims the not-yet-matched ground-truth box of the same label
    with the highest IoU, provided that IoU is at least ``iou_threshold``.

    Args:
        true_boxes_list: Per-image arrays of ground-truth boxes, shape (N, 4).
        true_labels_list: Per-image arrays of ground-truth labels, shape (N,).
        pred_boxes_list: Per-image arrays of predicted boxes, shape (M, 4).
        pred_scores_list: Per-image arrays of prediction confidences, shape (M,).
        pred_labels_list: Per-image arrays of predicted labels, shape (M,).
        iou_threshold: Minimum IoU for a prediction to count as a match.
        conf_threshold: Minimum confidence for a prediction to count towards
            precision, recall and F1. mAP ignores this threshold and ranks all
            predictions.

    Returns:
        ``(precision, recall, f1, mAP)``. ``mAP`` is the mean, over classes that
        have ground truth, of the average precision at ``iou_threshold``; it is
        0.0 when there is no ground truth at all.
    """
    records = []   # one (score, label, is_true_positive) per prediction
    gt_counts = {}  # label -> number of ground-truth boxes

    for true_boxes, true_labels, pred_boxes, pred_scores, pred_labels in zip(
        true_boxes_list, true_labels_list, pred_boxes_list, pred_scores_list, pred_labels_list):

        true_boxes = np.asarray(true_boxes, dtype=np.float64).reshape(-1, 4)
        true_labels = np.asarray(true_labels).reshape(-1)
        pred_boxes = np.asarray(pred_boxes, dtype=np.float64).reshape(-1, 4)
        pred_scores = np.asarray(pred_scores, dtype=np.float64).reshape(-1)
        pred_labels = np.asarray(pred_labels).reshape(-1)

        for label in true_labels.tolist():
            gt_counts[label] = gt_counts.get(label, 0) + 1

        matched = np.zeros(len(true_boxes), dtype=bool)

        # Stable sort: higher-scoring predictions are matched first, so the
        # outcome for predictions above conf_threshold does not depend on the
        # lower-scoring ones processed after them.
        for idx in np.argsort(-pred_scores, kind="stable"):
            candidates = (~matched) & (true_labels == pred_labels[idx])
            is_hit = False

            if candidates.any():
                ious = _iou_one_to_many(pred_boxes[idx], true_boxes)
                ious[~candidates] = -1.0
                best_gt_idx = int(np.argmax(ious))
                if ious[best_gt_idx] > 0 and ious[best_gt_idx] >= iou_threshold:
                    matched[best_gt_idx] = True
                    is_hit = True

            records.append((float(pred_scores[idx]), pred_labels[idx].item(), is_hit))

    # Precision / recall / F1 at the confidence operating point.
    confident = [rec for rec in records if rec[0] >= conf_threshold]
    total_true_positives = sum(1 for rec in confident if rec[2])
    total_false_positives = len(confident) - total_true_positives
    # Every true positive consumed exactly one ground-truth box.
    total_false_negatives = sum(gt_counts.values()) - total_true_positives

    precision = total_true_positives / (total_true_positives + total_false_positives + 1e-6)
    recall = total_true_positives / (total_true_positives + total_false_negatives + 1e-6)
    f1 = 2 * precision * recall / (precision + recall + 1e-6)

    # Mean average precision over classes with ground truth, ranking all
    # predictions of that class across images by confidence.
    ap_values = []
    for label, num_gt in gt_counts.items():
        ranked = sorted(
            (rec for rec in records if rec[1] == label),
            key=lambda rec: rec[0],
            reverse=True,
        )
        ap_values.append(_average_precision([rec[2] for rec in ranked], num_gt))
    mAP = float(np.mean(ap_values)) if ap_values else 0.0

    return precision, recall, f1, mAP

In [None]:
def calculate_iou(box1, box2):
    """Return the intersection-over-union of two ``[x1, y1, x2, y2]`` boxes.

    The result is 0 when the union of the two boxes has no positive area.
    """
    # Overlap rectangle: innermost edges of the two boxes.
    left = max(box1[0], box2[0])
    top = max(box1[1], box2[1])
    right = min(box1[2], box2[2])
    bottom = min(box1[3], box2[3])

    # Negative extents mean the boxes do not overlap along that axis.
    overlap_w = max(0, right - left)
    overlap_h = max(0, bottom - top)
    intersection = overlap_w * overlap_h

    area_first = (box1[2] - box1[0]) * (box1[3] - box1[1])
    area_second = (box2[2] - box2[0]) * (box2[3] - box2[1])
    union = area_first + area_second - intersection

    if union > 0:
        return intersection / union
    return 0

def test_inference_speed(model, device, input_size=(3, 256, 320), num_trials=100):
    model.to(device)
    model.eval()
    
    # Create dummy input tensor
    dummy_input = torch.rand(1, *input_size).to(device)
    
    # Warm-up
    for _ in range(10):
        with torch.no_grad():
            _ = model([dummy_input])
    
    # Measure inference time
    start_time = time.time()
    
    for _ in range(num_trials):
        with torch.no_grad():
            _ = model([dummy_input])
    
    end_time = time.time()
    
    avg_time = (end_time - start_time) / num_trials
    fps = 1.0 / avg_time
    
    print(f"Average inference time: {avg_time*1000:.2f} ms")
    print(f"FPS: {fps:.2f}")
    
    return fps

In [None]:
def main():
    """Train, save, benchmark and evaluate the RGB drone detector end to end.

    Reads the module-level ``img_dir`` and ``label_dir`` paths, trains on a
    10% sample of the dataset with an 80/20 train/validation split, and writes
    ``final_rgb_drone_detector.pth`` when training finishes.
    """
    def _collate(batch):
        # Detection samples have a variable number of boxes, so keep images and
        # targets as tuples instead of stacking them into tensors.
        return tuple(zip(*batch))

    # Work on a 10% sample of the images
    drone_data = DroneDataset(img_dir=img_dir, label_dir=label_dir, sample_fraction=0.1)

    # 80% train / 20% validation
    n_items = len(drone_data)
    order = list(range(n_items))
    cut = int(0.8 * n_items)

    # Fixed seeds so the split is reproducible
    torch.manual_seed(42)
    np.random.seed(42)
    np.random.shuffle(order)

    def _make_loader(subset_indices):
        # Adjust batch_size based on your GPU memory
        return DataLoader(
            drone_data,
            batch_size=4,
            sampler=torch.utils.data.SubsetRandomSampler(subset_indices),
            collate_fn=_collate
        )

    dataloaders = {
        'train': _make_loader(order[:cut]),
        'val': _make_loader(order[cut:])
    }

    # Three classes in total: background + drone + bird
    detector = get_model(num_classes=3, backbone="mobilenet")

    trained_model = train_model(detector, dataloaders, num_epochs=5)

    # Persist the weights returned by training
    torch.save(trained_model.state_dict(), 'final_rgb_drone_detector.pth')

    # Benchmark, then score on the validation split
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    test_inference_speed(trained_model, device)
    evaluate_model_metrics(trained_model, dataloaders['val'], device)

    print("Training and evaluation complete!")

In [None]:
# Run the full pipeline defined in main(); it relies on img_dir and label_dir
# having been defined by an earlier cell.
main()

Using 5758/57580 images (10.0%)
Training on cuda
Epoch 1/5
----------
Batch 57/1152 (4.9%) Loss: 0.3428 | ETA: 00:41:29
Batch 114/1152 (9.9%) Loss: 0.3079 | ETA: 00:39:17
Batch 171/1152 (14.8%) Loss: 0.2855 | ETA: 00:37:07
Batch 228/1152 (19.8%) Loss: 0.2696 | ETA: 00:35:01
Batch 285/1152 (24.7%) Loss: 0.2523 | ETA: 00:32:53
Batch 342/1152 (29.7%) Loss: 0.2409 | ETA: 00:30:44
Batch 399/1152 (34.6%) Loss: 0.2286 | ETA: 00:28:42
Batch 456/1152 (39.6%) Loss: 0.2196 | ETA: 00:26:36
Batch 513/1152 (44.5%) Loss: 0.2119 | ETA: 00:24:29
Batch 570/1152 (49.5%) Loss: 0.2058 | ETA: 00:22:20
Batch 627/1152 (54.4%) Loss: 0.2007 | ETA: 00:20:10
Batch 684/1152 (59.4%) Loss: 0.1972 | ETA: 00:18:00
Batch 741/1152 (64.3%) Loss: 0.1933 | ETA: 00:15:50
Batch 798/1152 (69.3%) Loss: 0.1895 | ETA: 00:13:40
Batch 855/1152 (74.2%) Loss: 0.1866 | ETA: 00:11:30
Batch 912/1152 (79.2%) Loss: 0.1835 | ETA: 00:09:18
Batch 969/1152 (84.1%) Loss: 0.1819 | ETA: 00:07:06
Batch 1026/1152 (89.1%) Loss: 0.1800 | ETA: 00:04

AttributeError: 'list' object has no attribute 'values'