In [1]:
import os
import glob
import re
import numpy as np
import pandas as pd
from scipy.io import loadmat

# Set your dataset root folder path
dataset_root = 'Baggages'
data_entries = []
# Folders that should not be treated as positive despite having a 'labels' file
EXCLUDED_FOLDERS = {'B0078', 'B0082', 'B0083', 'B0085', 'B0086'}

def load_annotations_from_mat(mat_file, ann_type):
    """
    Loads annotation data from a .mat file, considering the annotation type.
    For 'pmatrices' type, prioritizes keys containing 'pmatrices' in their name.
    """
    try:
        mat_data = loadmat(mat_file)
        keys = [key for key in mat_data.keys() if not key.startswith('__')]
        if ann_type == 'pmatrices':
            # Look for keys likely to contain pmatrices data
            pmatrix_keys = [k for k in keys if 'pmatrices' in k.lower()]
            if pmatrix_keys:
                selected_key = pmatrix_keys[0]
            elif keys:
                selected_key = keys[0]
            else:
                return None
        else:
            selected_key = keys[0] if keys else None
        return mat_data[selected_key] if selected_key else None
    except Exception as e:
        print(f"Error reading MAT file {mat_file}: {e}")
        return None

def load_annotations_from_text(text_file):
    """
    Loads annotation data from a text file (e.g., labels or ground_truth).
    Uses latin1 encoding and skips the first row (assuming it's a header).
    """
    try:
        data_array = np.loadtxt(text_file, encoding='latin1', skiprows=1)
        return data_array
    except Exception as e:
        print(f"Error reading text annotation file {text_file}: {e}")
        return None

def find_annotation_file(folder_path):
    """
    Search for an annotation file in the folder with priority:
      1. "labels" text file
      2. .mat files (prioritizing 'pmatrices' in name)
      3. "groundtruth" or "ground_truth" files
    """
    for file in os.listdir(folder_path):
        if file.lower().startswith("labels"):
            return os.path.join(folder_path, file), "labels"

    mat_files = glob.glob(os.path.join(folder_path, "*.mat"))
    if mat_files:
        for f in mat_files:
            if "pmatrices" in os.path.basename(f).lower():
                return f, "pmatrices"
        return mat_files[0], "mat"

    for file in os.listdir(folder_path):
        if "groundtruth" in file.lower() or "ground_truth" in file.lower():
            return os.path.join(folder_path, file), "ground_truth"

    return None, None

def process_annotation_array(data_array, ann_type, folder_path):
    """
    Processes annotation array into a dictionary mapping image IDs to bounding boxes.
    Handles both 2D and 3D arrays.
    """
    gt_dict = {}
    pos_rows = 0
    if data_array.ndim == 3:
        num_ann = data_array.shape[2]
        print(f"  -> Detected 3D annotation array with {num_ann} instances.")
        for i in range(num_ann):
            ann_instance = data_array[:, :, i].flatten()
            try:
                image_id = int(ann_instance[0])
            except Exception as e:
                print(f"    Skipping instance {i} in {folder_path}: {e}")
                continue
            bbox = ann_instance[1:].tolist()
            gt_dict.setdefault(image_id, []).append(bbox)
            pos_rows += 1
    elif data_array.ndim == 2:
        num_ann = data_array.shape[0]
        print(f"  -> Detected 2D annotation array with {num_ann} rows.")
        for i, row in enumerate(data_array):
            try:
                image_id = int(np.squeeze(row[0]))
            except Exception as e:
                print(f"    Skipping row {i} in {folder_path}: {e}")
                continue
            bbox = row[1:].tolist()
            gt_dict.setdefault(image_id, []).append(bbox)
            pos_rows += 1
    else:
        print(f"  -> Unsupported array dimension: {data_array.ndim}")
    print(f"  -> Processed {pos_rows} annotations across {len(gt_dict)} images.")
    return gt_dict

# Main processing loop
for folder in sorted(os.listdir(dataset_root)):
    folder_path = os.path.join(dataset_root, folder)
    if not os.path.isdir(folder_path):
        continue

    print(f"\n=== Processing folder: {folder} ===")
    ann_file, ann_type = find_annotation_file(folder_path)
    print(f"  -> Annotation file: {ann_file} (Type: {ann_type})")

    # Determine if this folder should be treated as all-positive
    is_labels_folder = (ann_type == 'labels') and (folder not in EXCLUDED_FOLDERS)
    gt_dict = None
    data_array = None

    if ann_file:
        if ann_type in ["mat", "pmatrices"]:
            data_array = load_annotations_from_mat(ann_file, ann_type)
        elif ann_type in ["labels", "ground_truth"]:
            data_array = load_annotations_from_text(ann_file)

        if data_array is not None:
            print(f"  Annotation array shape: {data_array.shape}")
            gt_dict = process_annotation_array(data_array, ann_type, folder_path)
        else:
            print("  -> Failed to load annotation data.")
    else:
        print("  -> No annotation file found.")

    # Process images
    pos_count = neg_count = 0
    for file in os.listdir(folder_path):
        if not file.lower().endswith(('.png', '.jpg', '.jpeg')):
            continue

        image_path = os.path.join(folder_path, file)
        base_name = os.path.splitext(file)[0]

        # Extract image ID using regex to find last numeric sequence
        try:
            numbers = re.findall(r'\d+', base_name)
            if not numbers:
                raise ValueError(f"No numbers in filename: {file}")
            image_id = int(numbers[-1])  # Use last number as ID
        except Exception as e:
            print(f"  -> Error extracting ID from {file}: {e}")
            continue

        # Determine label
        if is_labels_folder:
            # If it's a labels folder and not excluded, mark as positive
            label = 1
        elif ann_file is not None:
            # If an annotation file exists, use it to determine the label
            label = 1 if (gt_dict and image_id in gt_dict) else 0
        elif folder not in EXCLUDED_FOLDERS:
            # No annotation file found, and folder is not excluded: default to positive
            label = 1
        else:
            # Folder is excluded and no annotation file found: mark as negative
            label = 0

        # Update counters
        if label == 1:
            pos_count += 1
        else:
            neg_count += 1

        data_entries.append({
            'subfolder': folder,
            'image_path': image_path,
            'image_id': image_id,
            'label': label,
            'bboxes': gt_dict.get(image_id, []) if gt_dict else []
        })

    print(f"  -> Labeling results: Pos={pos_count}, Neg={neg_count}, Total={pos_count + neg_count}")

# Save results
df = pd.DataFrame(data_entries)
df.to_csv('baggage_labels.csv', index=False)
print("\n=== Final Summary ===")
print(f"Total images: {len(df)} (Positive: {sum(df['label']==1)}, Negative: {sum(df['label']==0)})")


=== Processing folder: B0001 ===
  -> Annotation file: Baggages\B0001\BoundingBox.mat (Type: mat)
  Annotation array shape: (14, 5)
  -> Detected 2D annotation array with 14 rows.
  -> Processed 14 annotations across 14 images.
  -> Labeling results: Pos=14, Neg=0, Total=14

=== Processing folder: B0002 ===
  -> Annotation file: Baggages\B0002\BoundingBox.mat (Type: mat)
  Annotation array shape: (8, 5)
  -> Detected 2D annotation array with 8 rows.
  -> Processed 8 annotations across 8 images.
  -> Labeling results: Pos=8, Neg=1, Total=9

=== Processing folder: B0003 ===
  -> Annotation file: Baggages\B0003\BoundingBox.mat (Type: mat)
  Annotation array shape: (9, 5)
  -> Detected 2D annotation array with 9 rows.
  -> Processed 9 annotations across 9 images.
  -> Labeling results: Pos=9, Neg=1, Total=10

=== Processing folder: B0004 ===
  -> Annotation file: Baggages\B0004\BoundingBox.mat (Type: mat)
  Annotation array shape: (9, 5)
  -> Detected 2D annotation array with 9 rows.
  ->

In [2]:
import os
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader, WeightedRandomSampler
from torchvision import transforms

class AdvancedXRayDataset(Dataset):
    """Enhanced X-ray dataset with robust error handling and advanced augmentations"""
    def __init__(self, file_paths, labels, is_training=False):
        self.file_paths = file_paths
        self.labels = labels
        self.is_training = is_training
        
        # Base preprocessing pipeline
        self.base_transform = transforms.Compose([
            transforms.Resize((384, 384)),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406],
                                 std=[0.229, 0.224, 0.225])
        ])
        
        # Advanced augmentation pipeline
        self.aug_transform = transforms.Compose([
            transforms.RandomHorizontalFlip(p=0.5),
            transforms.RandomVerticalFlip(p=0.3),
            transforms.RandomAffine(degrees=15, translate=(0.1, 0.1)), 
            transforms.RandomPerspective(distortion_scale=0.2, p=0.3),
            transforms.RandomApply([transforms.GaussianBlur(kernel_size=(3, 7))], p=0.3),
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.1),
            transforms.RandomAutocontrast(),
            transforms.RandomPosterize(bits=3, p=0.2)
        ])

    def __len__(self):
        return len(self.file_paths)

    def __getitem__(self, idx):
        for _ in range(3):  # Allow 3 attempts to load image
            try:
                img_path = self.file_paths[idx]
                label = self.labels[idx]
                image = Image.open(img_path).convert('RGB')
                
                if self.is_training:
                    image = self.aug_transform(image)
                
                return self.base_transform(image), torch.tensor(label, dtype=torch.float32)
            except Exception as e:
                print(f"Error loading {img_path}: {e}")
                idx = np.random.randint(0, len(self))  # Fallback to random sample
        return torch.zeros(3, 384, 384), torch.tensor(0.0)  # Ultimate fallback

def create_data_loaders(csv_path, test_size=0.1, val_size=0.1, batch_size=64):
    """Create stratified train/val/test loaders with class balancing"""
    # Load and filter data
    df = pd.read_csv(csv_path)
    df = df[df['image_path'].apply(os.path.exists)].sample(frac=1, random_state=42)
    
    # Initial split (train+val vs test)
    train_val_paths, test_paths, train_val_labels, test_labels = train_test_split(
        df['image_path'].values, df['label'].values.astype('float32'),
        test_size=test_size, stratify=df['label'], random_state=42
    )
    
    # Secondary split (train vs val)
    train_paths, val_paths, train_labels, val_labels = train_test_split(
        train_val_paths, train_val_labels,
        test_size=val_size/(1-test_size), 
        stratify=train_val_labels, random_state=42
    )

    # Class balancing setup
    class_counts = np.bincount(train_labels.astype(int))
    class_weights = 1. / class_counts
    sample_weights = class_weights[train_labels.astype(int)]
    sampler = WeightedRandomSampler(sample_weights, len(sample_weights), replacement=True)

    # Create datasets
    train_dataset = AdvancedXRayDataset(train_paths, train_labels, is_training=True)
    val_dataset = AdvancedXRayDataset(val_paths, val_labels)
    test_dataset = AdvancedXRayDataset(test_paths, test_labels)

    # Create loaders
    loaders = {
        'train': DataLoader(train_dataset, batch_size, sampler=sampler,
                           num_workers=os.cpu_count(), pin_memory=True),
        'val': DataLoader(val_dataset, batch_size, shuffle=False,
                         num_workers=os.cpu_count(), pin_memory=True),
        'test': DataLoader(test_dataset, batch_size, shuffle=False,
                          num_workers=os.cpu_count(), pin_memory=True)
    }
    
    # Print dataset statistics
    print(f"Training samples: {len(train_dataset)} (Class 0: {class_counts[0]}, Class 1: {class_counts[1]})")
    print(f"Validation samples: {len(val_dataset)}")
    print(f"Test samples: {len(test_dataset)}")
    
    return loaders

if __name__ == '__main__':
    DATA_CSV_PATH = 'baggage_labels.csv'
    loaders = create_data_loaders(DATA_CSV_PATH)
    print("Data loaders successfully created!")

Training samples: 8653 (Class 0: 1940, Class 1: 6713)
Validation samples: 1082
Test samples: 1082
Data loaders successfully created!


In [1]:
import torch

# Check if CUDA (GPU support) is available
if torch.cuda.is_available():
    print(f"CUDA is available. GPU: {torch.cuda.get_device_name(0)}")
else:
    print("CUDA is not available. Using CPU.")


CUDA is available. GPU: NVIDIA GeForce GTX 1650


In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import timm
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torch.optim.swa_utils import AveragedModel, update_bn
from torch.cuda.amp import GradScaler, autocast
from timm.optim import AdamP
from timm.scheduler import CosineLRScheduler
from torchmetrics import MetricCollection, Accuracy, Precision, Recall, F1Score, AUROC, AveragePrecision
from torch.utils.data import DataLoader, Dataset, random_split
import pandas as pd
import cv2
import os
import numpy as np
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm

# -------------------- Hardware Optimization --------------------
torch.backends.cudnn.benchmark = True
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True

# -------------------- Data Augmentations --------------------
def create_transforms(img_size=224):
    train_transform = A.Compose([
        A.RandomResizedCrop(img_size, img_size, scale=(0.67, 1.0)),
        A.HorizontalFlip(p=0.5),
        A.VerticalFlip(p=0.5),
        A.ShiftScaleRotate(shift_limit=0.1, scale_limit=0.2, rotate_limit=30, p=0.5),
        A.RandomBrightnessContrast(brightness_limit=0.2, contrast_limit=0.2, p=0.5),
        A.CoarseDropout(max_holes=8, max_height=32, max_width=32, p=0.5),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2()
    ])
    
    val_transform = A.Compose([
        A.Resize(img_size, img_size),
        A.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ToTensorV2()
    ])
    return train_transform, val_transform

# -------------------- Dataset --------------------
class WeaponDataset(Dataset):
    def __init__(self, dataframe, root_dir, transform=None):
        self.dataframe = dataframe.reset_index(drop=True)
        self.root_dir = root_dir
        self.transform = transform
        self.num_classes = 1

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        img_path = os.path.join(self.root_dir, self.dataframe.loc[idx, 'image_path'])
        image = cv2.imread(img_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        label = self.dataframe.loc[idx, 'label']
        
        if self.transform:
            transformed = self.transform(image=image)
            image = transformed['image']
            
        return image, torch.tensor([label], dtype=torch.float32)

# -------------------- Model Architecture --------------------
class WeaponDetector(nn.Module):
    def __init__(self, num_classes=1):
        super().__init__()
        self.base_model = timm.create_model('tf_efficientnetv2_l', 
                                           pretrained=True,
                                           num_classes=0,
                                           features_only=False)
        self.head = nn.Sequential(
            nn.Linear(1280, 512),
            nn.SiLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_classes)
        )

    def forward(self, x):
        features = self.base_model(x)
        return self.head(features)

# -------------------- Training Utilities --------------------
class ProgressiveTrainer:
    def __init__(self, model, device):
        self.model = model
        self.device = device
        self.stages = [
            {'img_size': 224, 'batch_size': 64, 'epochs': 10, 'lr': 1e-4},
            {'img_size': 384, 'batch_size': 32, 'epochs': 15, 'lr': 5e-5},
            {'img_size': 512, 'batch_size': 16, 'epochs': 25, 'lr': 1e-5}
        ]
        self.current_stage = 0
        self.global_epoch = 0
        
    def get_current_config(self):
        return self.stages[self.current_stage]
    
    def progress_stage(self):
        if self.current_stage < len(self.stages) - 1:
            self.current_stage += 1
            return True
        return False

def train_model():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    print(f"Using device: {device}")
    torch.cuda.empty_cache()

    # Data preparation
    full_df = pd.read_csv('baggage_labels.csv')
    train_df, test_df = train_test_split(full_df, test_size=0.15, stratify=full_df['label'], random_state=42)
    train_df, val_df = train_test_split(train_df, test_size=0.1, stratify=train_df['label'], random_state=42)

    # Model setup
    model = WeaponDetector().to(device)
    ema_model = AveragedModel(model).to(device)
    scaler = GradScaler()
    prog_trainer = ProgressiveTrainer(model, device)
    
    # Loss function
    pos_weight = torch.tensor([len(train_df) / (2 * train_df['label'].sum())]).to(device)
    criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight)

    # Metrics
    metrics = MetricCollection({
        'accuracy': Accuracy(task='binary'),
        'precision': Precision(task='binary'),
        'recall': Recall(task='binary'),
        'f1': F1Score(task='binary'),
        'auc': AUROC(task='binary'),
        'prc': AveragePrecision(task='binary')
    }).to(device)

    best_score = 0.0

    # Training loop
    while True:
        config = prog_trainer.get_current_config()
        print(f"\n=== Stage {prog_trainer.current_stage + 1}: {config} ===")
        
        # Reinitialize optimizer/scheduler for each stage
        optimizer = AdamP(model.parameters(), lr=config['lr'], weight_decay=1e-4)
        scheduler = CosineLRScheduler(optimizer, 
                                    t_initial=sum(s['epochs'] for s in prog_trainer.stages),
                                    warmup_t=10,
                                    lr_min=1e-6,
                                    warmup_lr_init=5e-5)
        
        # Update transforms and data loaders
        train_transform, val_transform = create_transforms(config['img_size'])
        train_dataset = WeaponDataset(train_df, './data', train_transform)
        val_dataset = WeaponDataset(val_df, './data', val_transform)
        
        train_loader = DataLoader(train_dataset, 
                                batch_size=config['batch_size'],
                                shuffle=True,
                                num_workers=12,
                                pin_memory=True,
                                persistent_workers=True,
                                drop_last=True)
        
        val_loader = DataLoader(val_dataset,
                              batch_size=config['batch_size'],
                              num_workers=8,
                              pin_memory=True)

        for epoch in range(config['epochs']):
            model.train()
            prog_bar = tqdm(train_loader, desc=f"Epoch {prog_trainer.global_epoch+1}/{sum(s['epochs'] for s in prog_trainer.stages)}")
            epoch_loss = 0.0
            
            for images, labels in prog_bar:
                images = images.to(device, non_blocking=True)
                labels = labels.to(device)
                
                with autocast():
                    outputs = model(images)
                    loss = criterion(outputs, labels)
                
                scaler.scale(loss).backward()
                scaler.step(optimizer)
                scaler.update()
                optimizer.zero_grad(set_to_none=True)
                ema_model.update_parameters(model)
                
                epoch_loss += loss.item() * images.size(0)
                prog_bar.set_postfix({'loss': loss.item(), 'lr': optimizer.param_groups[0]['lr']})
            
            # Validation
            model.eval()
            val_metrics = {k: [] for k in metrics.keys()}
            val_loss = 0.0
            
            with torch.no_grad():
                for images, labels in val_loader:
                    images = images.to(device)
                    labels = labels.to(device)
                    
                    with autocast():
                        outputs = ema_model(images)
                        loss = criterion(outputs, labels)
                        preds = torch.sigmoid(outputs)
                    
                    val_loss += loss.item() * images.size(0)
                    batch_metrics = metrics(preds, labels)
                    for k in val_metrics:
                        val_metrics[k].append(batch_metrics[k].item())
            
            # Calculate epoch metrics
            epoch_loss = epoch_loss / len(train_loader.dataset)
            val_loss = val_loss / len(val_loader.dataset)
            avg_val_metrics = {k: np.mean(v) for k, v in val_metrics.items()}
            combined_score = avg_val_metrics['auc'] + avg_val_metrics['prc']
            
            print(f"Epoch {prog_trainer.global_epoch+1} | "
                f"Train Loss: {epoch_loss:.4f} | Val Loss: {val_loss:.4f} | "
                f"AUC: {avg_val_metrics['auc']:.4f} | PRC: {avg_val_metrics['prc']:.4f}")

            # Save best model
            if combined_score > best_score:
                best_score = combined_score
                torch.save({
                    'model': model.state_dict(),
                    'ema_model': ema_model.state_dict(),
                    'optimizer': optimizer.state_dict(),
                    'scheduler': scheduler.state_dict(),
                    'global_epoch': prog_trainer.global_epoch,
                    'metrics': avg_val_metrics
                }, 'best_model.pth')
                print(f"New best model saved with combined score: {best_score:.4f}")

            # Update scheduler and epoch counter
            scheduler.step(prog_trainer.global_epoch)
            prog_trainer.global_epoch += 1

        # Update batch norm statistics for EMA model
        update_bn(train_loader, ema_model, device=device)
        
        # Progress to next stage or exit
        if not prog_trainer.progress_stage():
            break

    # Final evaluation on test set
    test_transform = create_transforms(prog_trainer.stages[-1]['img_size'])[1]
    test_dataset = WeaponDataset(test_df, './data', test_transform)
    test_loader = DataLoader(test_dataset, batch_size=prog_trainer.stages[-1]['batch_size'], num_workers=8)
    
    ema_model.eval()
    test_metrics = {k: [] for k in metrics.keys()}
    with torch.no_grad():
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            
            with autocast():
                outputs = ema_model(images)
                preds = torch.sigmoid(outputs)
            
            batch_metrics = metrics(preds, labels)
            for k in test_metrics:
                test_metrics[k].append(batch_metrics[k].item())
    
    avg_test_metrics = {k: np.mean(v) for k, v in test_metrics.items()}
    print("\n=== Final Test Metrics ===")
    for k, v in avg_test_metrics.items():
        print(f"{k}: {v:.4f}")

if __name__ == '__main__':
    train_model()

Using device: cuda

=== Stage 1: {'img_size': 224, 'batch_size': 64, 'epochs': 10, 'lr': 0.0001} ===


  scaler = GradScaler()
size
  Input should be a valid tuple [type=tuple_type, input_value=224, input_type=int]
    For further information visit https://errors.pydantic.dev/2.10/v/tuple_type
  A.RandomResizedCrop(img_size, img_size, scale=(0.67, 1.0)),


ValidationError: 6 validation errors for InitSchema
p
  Field required [type=missing, input_value={}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.10/v/missing
scale
  Field required [type=missing, input_value={}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.10/v/missing
ratio
  Field required [type=missing, input_value={}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.10/v/missing
size
  Field required [type=missing, input_value={}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.10/v/missing
interpolation
  Field required [type=missing, input_value={}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.10/v/missing
mask_interpolation
  Field required [type=missing, input_value={}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.10/v/missing