# Introduction

This notebook demonstrates the complete workflow for setting up and training a YOLO model. The process includes:
- Data augmentation
- Hyperparameter optimization using Optuna
- Inference for generating submission files

Although we use the "You Only Look Once" (YOLO) model, this challenge might have you looking twice before starting the training process!

**Note:** Selecting the optimal model size, batch size, and number of epochs is critical. Launching training without careful parameter tuning may result in wasted time.

In [None]:
!pip install ultralytics
!pip install albumentations
!pip install optuna

# Import Libraries

In this section, we import the libraries required for:
- File and image handling (`os`, `cv2`, `shutil`)
- XML parsing (`xml.etree.ElementTree`)
- Data splitting (`sklearn.model_selection`)
- Data augmentation (`albumentations`)
- Model training and inference (`torch`, `ultralytics`)
- Others such as `numpy` and `pathlib`

In [None]:
import os
import cv2
import xml.etree.ElementTree as ET
from sklearn.model_selection import train_test_split
import albumentations as A
import shutil
import torch
from ultralytics import YOLO
from pathlib import Path
import numpy as np

# Transforming XML Annotations

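YOLO expects one text file per image, with one line per object in the form `class x_center y_center width height`, where all four box values are normalized by the image width and height. The function below converts Pascal VOC XML annotations to this format. Only objects labeled as `nodule` are processed.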
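As a worked example of this arithmetic, the sketch below converts one Pascal VOC box (pixel corners) into the normalized YOLO values and formats the label line. The helper name `voc_to_yolo` and the sample numbers are illustrative and not part of the notebook.

```python
def voc_to_yolo(xmin, ymin, xmax, ymax, img_w, img_h):
    """Convert pixel corner coordinates to normalized YOLO (xc, yc, w, h)."""
    x_center = (xmin + xmax) / 2.0 / img_w
    y_center = (ymin + ymax) / 2.0 / img_h
    box_w = (xmax - xmin) / img_w
    box_h = (ymax - ymin) / img_h
    return x_center, y_center, box_w, box_h


# Hypothetical 1024x1024 image containing a 64x128-pixel box.
box = voc_to_yolo(256, 384, 320, 512, img_w=1024, img_h=1024)
label_line = "0 " + " ".join(f"{v:.6f}" for v in box)
print(label_line)  # 0 0.281250 0.437500 0.062500 0.125000
```

Class `0` is the only class here because the dataset has a single label, `nodule`.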

In [None]:
def convert_xml_to_yolo(xml_path):
    """Convert XML annotations to YOLO format, separating class labels."""
    tree = ET.parse(xml_path)
    root = tree.getroot()
    size = root.find('size')
    width = int(size.find('width').text)
    height = int(size.find('height').text)
    
    bboxes = []
    class_labels = []
    for obj in root.findall('object'):
        if obj.find('name').text != 'nodule':
            continue
        bndbox = obj.find('bndbox')
        xmin = float(bndbox.find('xmin').text)
        ymin = float(bndbox.find('ymin').text)
        xmax = float(bndbox.find('xmax').text)
        ymax = float(bndbox.find('ymax').text)
        
        x_center = (xmin + xmax) / 2.0 / width
        y_center = (ymin + ymax) / 2.0 / height
        box_width = (xmax - xmin) / width
        box_height = (ymax - ymin) / height
        
        x_center = min(max(x_center, 1e-6), 1.0)
        y_center = min(max(y_center, 1e-6), 1.0)
        box_width = min(max(box_width, 1e-6), 1.0)
        box_height = min(max(box_height, 1e-6), 1.0)
        
        bboxes.append([x_center, y_center, box_width, box_height])
        class_labels.append(0)
    
    return bboxes, class_labels

# Data Augmentation

We utilize the Albumentations library to augment our chest X-ray images. The augmentation pipeline includes:
- Random brightness and contrast adjustments
- Addition of Gaussian noise
- Rotation (up to 15°)
- CLAHE (Contrast Limited Adaptive Histogram Equalization) for improved contrast
- Slight shifts and scales

CLAHE is especially effective for X-ray images as it enhances contrast while controlling noise amplification.
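To make the "contrast limited" part concrete, here is a minimal pure-Python sketch of the clipping step on the histogram of a single tile: counts above a limit are cut off and the excess is spread evenly over all bins before the equalization mapping is built. It is an illustration under simplifying assumptions (8 gray levels, one tile, hypothetical helper names), not how OpenCV or Albumentations implement it; real CLAHE also interpolates between neighbouring tiles and scales its clip limit differently.

```python
def clip_histogram(hist, clip_limit):
    """Clip each bin at clip_limit and redistribute the excess evenly."""
    excess = sum(max(count - clip_limit, 0) for count in hist)
    clipped = [min(count, clip_limit) for count in hist]
    share, remainder = divmod(excess, len(hist))
    # Every bin gets an equal share; the first `remainder` bins get one extra.
    # Redistribution can push a bin back above the limit; this sketch does not iterate.
    return [c + share + (1 if i < remainder else 0) for i, c in enumerate(clipped)]


def equalization_map(hist, levels):
    """Map each gray level through the normalized cumulative histogram."""
    total = sum(hist)
    mapping, running = [], 0
    for count in hist:
        running += count
        mapping.append(round((levels - 1) * running / total))
    return mapping


# Toy tile: 8 gray levels, most pixels concentrated in level 3.
hist = [2, 2, 4, 40, 8, 4, 2, 2]
plain = equalization_map(hist, levels=8)
limited = equalization_map(clip_histogram(hist, clip_limit=16), levels=8)
print(plain)    # [0, 0, 1, 5, 6, 7, 7, 7]
print(limited)  # [1, 1, 2, 4, 5, 6, 6, 7]
```

Without clipping, the dominant level causes a jump of four output levels between inputs 2 and 3; with clipping the jump is two, which is the reduced amplification the description refers to.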

> **CLAHE:** Contrast Limited AHE (CLAHE) is a variant of adaptive histogram equalization where the contrast amplification is limited, reducing the risk of noise amplification.

In [None]:
import albumentations as A

def get_augmentation_pipeline():
    """Enhanced Albumentations pipeline for chest X-ray augmentation."""
    return A.Compose([
        A.RandomBrightnessContrast(brightness_limit=0.4, contrast_limit=0.4, p=0.8),
        A.GaussNoise(var_limit=(10.0, 50.0), p=0.5),
        A.Rotate(limit=15, p=0.5),  # Rotate up to 15°
        A.CLAHE(clip_limit=2.0, tile_grid_size=(8, 8), p=0.5),
        A.ShiftScaleRotate(shift_limit=0.05, scale_limit=0.05, rotate_limit=0, p=0.3),  # Slight shifting and scaling
    ], bbox_params=A.BboxParams(format='yolo', min_visibility=0.1, label_fields=['labels']))

In [None]:
def augment_dataset(original_images_dir, original_annos_dir, output_images_dir, output_labels_dir, num_augmentations=1):
    """Copy each annotated image with its YOLO labels and write `num_augmentations` augmented variants."""
    os.makedirs(output_images_dir, exist_ok=True)
    os.makedirs(output_labels_dir, exist_ok=True)
    
    augmentation = get_augmentation_pipeline()
    
    for xml_file in os.listdir(original_annos_dir):
        if not xml_file.endswith('.xml'):
            continue
        base_name = os.path.splitext(xml_file)[0]
        img_path = os.path.join(original_images_dir, f"{base_name}.jpg")
        xml_path = os.path.join(original_annos_dir, xml_file)
        
        img = cv2.imread(img_path)
        if img is None:
            print(f"Warning: Failed to load {img_path}")
            continue
        
        bboxes, class_labels = convert_xml_to_yolo(xml_path)
        
        orig_img_path = os.path.join(output_images_dir, f"{base_name}.jpg")
        orig_label_path = os.path.join(output_labels_dir, f"{base_name}.txt")
        if not os.path.exists(orig_img_path):
            shutil.copy(img_path, orig_img_path)
        if not os.path.exists(orig_label_path) and bboxes:
            with open(orig_label_path, 'w') as f:
                for bbox in bboxes:
                    f.write(f"0 {' '.join(map(str, bbox))}\n")
        
        # Generate augmented samples for each image
        for i in range(num_augmentations):
            aug_img_name = f"{base_name}_aug_{i}.jpg"
            aug_label_name = f"{base_name}_aug_{i}.txt"
            aug_img_path = os.path.join(output_images_dir, aug_img_name)
            aug_label_path = os.path.join(output_labels_dir, aug_label_name)
            
            if not os.path.exists(aug_img_path):
                augmented = augmentation(image=img, bboxes=bboxes, labels=class_labels)
                aug_img = augmented['image']
                aug_bboxes = augmented['bboxes']
                aug_labels = augmented['labels']
                
                cv2.imwrite(aug_img_path, aug_img)
                if aug_bboxes and not os.path.exists(aug_label_path):
                    with open(aug_label_path, 'w') as f:
                        for bbox, label in zip(aug_bboxes, aug_labels):
                            # Cast to int: labels may come back as floats in some Albumentations versions
                            f.write(f"{int(label)} {' '.join(map(str, bbox))}\n")

    print(f"Augmented dataset saved to {output_images_dir} and {output_labels_dir}")

In [None]:
original_images = '/kaggle/input/pulmonary-nodule/train/jpg'
original_annos = '/kaggle/input/pulmonary-nodule/train/anno'
dataset_dir = '/kaggle/working/pulmonary-nodule'
images_dir = os.path.join(dataset_dir, 'images')
labels_dir = os.path.join(dataset_dir, 'labels')

os.makedirs(images_dir, exist_ok=True)
os.makedirs(labels_dir, exist_ok=True)
train_images_dir = os.path.join(images_dir, 'train')
val_images_dir = os.path.join(images_dir, 'val')
train_labels_dir = os.path.join(labels_dir, 'train')
val_labels_dir = os.path.join(labels_dir, 'val')
os.makedirs(train_images_dir, exist_ok=True)
os.makedirs(val_images_dir, exist_ok=True)
os.makedirs(train_labels_dir, exist_ok=True)
os.makedirs(val_labels_dir, exist_ok=True)

If you wish to remove the existing augmented dataset before re-generating it, uncomment the cell below.

In [None]:
# # Clear existing augmented training data
# if os.path.exists(train_images_dir):
#     shutil.rmtree(train_images_dir)
# os.makedirs(train_images_dir, exist_ok=True)
#
# if os.path.exists(train_labels_dir):
#     shutil.rmtree(train_labels_dir)
# os.makedirs(train_labels_dir, exist_ok=True)

In [None]:
# Sort the listing so the split is reproducible across runs and filesystems
xml_files = sorted(f for f in os.listdir(original_annos) if f.endswith('.xml'))
filenames = [os.path.splitext(f)[0] for f in xml_files]
train_filenames, val_filenames = train_test_split(filenames, test_size=0.2, random_state=42)

# augment_dataset processes every XML file in the directory it is given, so stage the
# training annotations separately to keep validation images out of the training set.
train_annos = os.path.join(dataset_dir, 'anno_train')
os.makedirs(train_annos, exist_ok=True)
for filename in train_filenames:
    shutil.copy(os.path.join(original_annos, f"{filename}.xml"), train_annos)

# Copy the training originals and add augmented variants (existing files are skipped, not overwritten)
augment_dataset(original_images, train_annos, train_images_dir, train_labels_dir, num_augmentations=1)

# Process the validation set, skipping files that already exist
for filename in val_filenames:
    src_image = os.path.join(original_images, f"{filename}.jpg")
    dst_image = os.path.join(val_images_dir, f"{filename}.jpg")
    
    if not os.path.exists(dst_image):
        shutil.copy(src_image, dst_image)
    
    xml_path = os.path.join(original_annos, f"{filename}.xml")
    label_path = os.path.join(val_labels_dir, f"{filename}.txt")
    
    if not os.path.exists(label_path):
        bboxes, class_labels = convert_xml_to_yolo(xml_path)
        if bboxes:
            with open(label_path, 'w') as f:
                for bbox in bboxes:
                    f.write(f"0 {' '.join(map(str, bbox))}\n")

print("Validation set prepared with safety checks.")

In [None]:
data_yaml_path = '/kaggle/working/pulmonary-nodule/data.yaml'
data_yaml_content = """
train: /kaggle/working/pulmonary-nodule/images/train
val: /kaggle/working/pulmonary-nodule/images/val
nc: 1
names: ['nodule']
"""
with open(data_yaml_path, 'w') as f:
    f.write(data_yaml_content)

if os.path.exists(data_yaml_path):
    print("data.yaml exists!")
    with open(data_yaml_path, 'r') as f:
        print("Contents of data.yaml:")
        print(f.read())
else:
    print("Error: data.yaml was not created!")

# Hyperparameter Tuning

We use **Optuna** to optimize the hyperparameters of our YOLO model. Each trial runs a full training, so a single trial can take considerable time. Optuna samples each trial's parameters from the search space defined in the objective; its default sampler (TPE) uses the results of earlier trials to guide later suggestions rather than sampling purely at random. Model weights are saved after every epoch (`save_period=1`), so you can resume or use the best available checkpoint even if training is interrupted.
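The objective in the next cell samples `lr0` and `weight_decay` with `log=True`, which means the values are drawn on a logarithmic scale so that each decade of the range is equally likely. The stdlib-only sketch below shows what such a log-uniform draw looks like. `sample_log_uniform` is a hypothetical helper for illustration, not Optuna's sampler.

```python
import math
import random


def sample_log_uniform(rng, low, high):
    """Draw a value whose logarithm is uniform on [log(low), log(high)]."""
    return math.exp(rng.uniform(math.log(low), math.log(high)))


rng = random.Random(0)  # fixed seed so the sketch is repeatable
samples = [sample_log_uniform(rng, 1e-4, 1e-2) for _ in range(10_000)]

# Roughly half of the draws fall in each decade: [1e-4, 1e-3) and [1e-3, 1e-2].
lower_decade = sum(s < 1e-3 for s in samples) / len(samples)
print(f"share below 1e-3: {lower_decade:.2f}")
```

A plain uniform draw over the same range would place about 90% of the samples above `1e-3`, which is why a log scale is the usual choice for learning rates and weight decay.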

In [None]:
import optuna
from ultralytics import YOLO
import os
import json
from datetime import datetime

class CheckpointCallback:
    def __init__(self, checkpoint_dir='optuna_checkpoints'):
        self.checkpoint_dir = checkpoint_dir
        os.makedirs(checkpoint_dir, exist_ok=True)
        self.checkpoint_file = os.path.join(checkpoint_dir, 'study_checkpoint.json')
        
    def __call__(self, study: optuna.study.Study, trial: optuna.trial.FrozenTrial):
        checkpoint_data = {
            'study_name': study.study_name,
            'direction': study.direction.name,
            'best_trial': {
                'number': study.best_trial.number,
                'value': study.best_trial.value,
                'params': study.best_trial.params
            },
            'trials_completed': len(study.trials),
            'datetime': datetime.now().isoformat()
        }
        
        with open(self.checkpoint_file, 'w') as f:
            json.dump(checkpoint_data, f, indent=4)

def load_checkpoint(checkpoint_dir='optuna_checkpoints'):
    checkpoint_file = os.path.join(checkpoint_dir, 'study_checkpoint.json')
    if os.path.exists(checkpoint_file):
        with open(checkpoint_file, 'r') as f:
            return json.load(f)
    return None

def objective(trial):
    # Define hyperparameters to optimize
    lr0 = trial.suggest_float('lr0', 1e-4, 1e-2, log=True)
    batch_size = trial.suggest_int('batch_size', 4, 16, step=4)
    momentum = trial.suggest_float('momentum', 0.8, 0.95)
    weight_decay = trial.suggest_float('weight_decay', 1e-5, 1e-3, log=True)
    iou = trial.suggest_float('iou', 0.4, 0.7)
    
    # Create unique run directory for this trial
    run_dir = f'runs/trial_{trial.number}'
    os.makedirs(run_dir, exist_ok=True)
    
    # Load model with pre-trained weights
    model = YOLO('yolo11m.pt')
    
    print(f"\nTrial {trial.number}: Starting YOLO11m training")
    print(f"Parameters: lr0={lr0}, batch={batch_size}, momentum={momentum}, weight_decay={weight_decay}, iou={iou}")
    
    try:
        # Train with trial-specific hyperparameters
        results = model.train(
            data=data_yaml_path,
            epochs=50,
            imgsz=1024,
            batch=batch_size,
            patience=10,
            lr0=lr0,
            cos_lr=True,
            momentum=momentum,
            weight_decay=weight_decay,
            iou=iou,
            augment=True,
            resume=False,
            project=run_dir,
            name='exp',
            exist_ok=True,
            save_period=1,  # Save model after every epoch
            save=True      # Enable model saving
        )
        
        # Extract validation mAP50
        metrics = results.results_dict
        val_mAP50 = metrics.get('metrics/mAP50(B)', 0.0)
        
        # Save trial results
        trial_results = {
            'trial_number': trial.number,
            'params': trial.params,
            'mAP50': val_mAP50,
            'model_dir': os.path.join(run_dir, 'exp')
        }
        
        with open(os.path.join(run_dir, 'trial_results.json'), 'w') as f:
            json.dump(trial_results, f, indent=4)
            
        return val_mAP50
        
    except Exception as e:
        print(f"Error in trial {trial.number}: {str(e)}")
        return float('-inf')

# Data path
data_yaml_path = '/kaggle/working/pulmonary-nodule/data.yaml'

# Verify data paths
print("Checking data paths...")
required_paths = {
    'Train': '/kaggle/working/pulmonary-nodule/images/train',
    'Val': '/kaggle/working/pulmonary-nodule/images/val',
    'data.yaml': data_yaml_path
}

for name, path in required_paths.items():
    exists = os.path.exists(path)
    print(f"{name} path exists: {exists}")
    if not exists:
        raise FileNotFoundError(f"Required path not found: {path}")

# Load the summary written by a previous run, if any
checkpoint_data = load_checkpoint()
if checkpoint_data:
    print("\nFound checkpoint summary from a previous run:")
    print(f"Completed trials: {checkpoint_data['trials_completed']}")
    print(f"Best value so far: {checkpoint_data['best_trial']['value']}")

# Create and configure study
# Note: without a `storage` argument the study lives in memory only, so
# `load_if_exists=True` has no effect and earlier trials are not reloaded.
study = optuna.create_study(
    direction='maximize',
    study_name='yolo_optimization',
    load_if_exists=True
)

# Add checkpoint callback
checkpoint_callback = CheckpointCallback()

# Run optimization
# A single trial only evaluates one sampled configuration; increase n_trials for an actual search
n_trials = 1
print(f"\nStarting optimization with {n_trials} trials...")
study.optimize(
    objective,
    n_trials=n_trials,
    callbacks=[checkpoint_callback]
)

# Print final results
print("\nOptimization completed!")
print("\nBest trial:")
trial = study.best_trial
print(f"  Value (mAP50): {trial.value}")
print("  Parameters:")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

# Inference

For inference, we load the best model weights obtained from the training process. You can adjust the confidence threshold to optimize your detection performance. Multiple submissions might be needed to fine-tune the final score.
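The effect of the confidence threshold can be sketched without running the model: given a list of detection scores, raising the threshold keeps fewer boxes, which trades false positives against missed nodules. The scores and the helper `filter_by_confidence` below are made up for illustration; in the inference cell the scores come from `result.boxes.conf`.

```python
def filter_by_confidence(detections, threshold):
    """Keep detections whose score is strictly above the threshold."""
    return [d for d in detections if d["score"] > threshold]


# Hypothetical detections for one image (pixel coordinates, xyxy order).
detections = [
    {"bbox": (100, 120, 140, 160), "score": 0.82},
    {"bbox": (300, 310, 330, 345), "score": 0.45},
    {"bbox": (500, 520, 530, 555), "score": 0.30},
    {"bbox": (700, 705, 720, 730), "score": 0.12},
]

for threshold in (0.25, 0.411, 0.6):
    kept = filter_by_confidence(detections, threshold)
    print(f"threshold={threshold}: {len(kept)} box(es) kept")
```

With these made-up scores the three thresholds keep 3, 2, and 1 boxes respectively. Which setting scores best depends on the competition metric, so it is worth checking against the validation set before submitting.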

In [None]:
import torch
from ultralytics import YOLO
import cv2
import pandas as pd
from glob import glob
import json
import os
import numpy as np

# Load the best weights from trial 0 (point this at the best trial's run directory if you ran more trials)
model = YOLO('/kaggle/working/runs/trial_0/exp/weights/best.pt')
model.to('cuda' if torch.cuda.is_available() else 'cpu')

test_images = glob('/kaggle/input/pulmonary-nodule/test/jpg/*.jpg')
submission = []

for img_path in test_images:
    filename = os.path.splitext(os.path.basename(img_path))[0]  # strip only the extension
    img = cv2.imread(img_path)
        
    # Inference with Test Time Augmentation (TTA)
    results = model(img, augment=True)
    
    objects = []
    for result in results:
        boxes = result.boxes.xyxy.cpu().numpy()
        scores = result.boxes.conf.cpu().numpy()
        labels = result.boxes.cls.cpu().numpy()
        
        for box, score, label in zip(boxes, scores, labels):
            if score > 0.411 and label == 0:
                objects.append({
                    'class': 'nodule',
                    'bbox': {
                        'xmin': int(box[0]),
                        'ymin': int(box[1]),
                        'xmax': int(box[2]),
                        'ymax': int(box[3])
                    }
                })
    
    objects_str = json.dumps(objects)
    # Report the actual image width and height instead of assuming 1024x1024
    submission.append([filename, img.shape[1], img.shape[0], 3, objects_str])

df = pd.DataFrame(submission, columns=['filename', 'width', 'height', 'depth', 'objects'])
df.to_csv('/kaggle/working/submission.csv', index=False)
print("Submission saved to /kaggle/working/submission.csv")

# Conclusion

Thank you for exploring this notebook! If you appreciate this approach or have any questions or feedback, please feel free to reach out or upvote the project. Your feedback is invaluable and will help improve future work.