In [4]:
import os
import json
from PIL import Image

def convert_to_coco(dataset_path, output_path):
    # Initialize the COCO format structure
    coco_format = {
        "images": [],
        "annotations": [],
        "categories": [
            {"id": 0, "name": "no_tumor"},  # Added 'no_tumor' category
            {"id": 1, "name": "tumor_type_1"},
            {"id": 2, "name": "tumor_type_2"},
            {"id": 3, "name": "tumor_type_3"}
        ]
    }
    
    annotation_id = 1
    image_id = 1

    # Loop through each dataset subset
    for subset in ['train', 'valid', 'test']:
        images_folder = os.path.join(dataset_path, subset, 'images')
        labels_folder = os.path.join(dataset_path, subset, 'labels')
        
        for image_filename in os.listdir(images_folder):
            if not image_filename.endswith('.jpg'):
                continue
            
            # Process image
            image_path = os.path.join(images_folder, image_filename)
            
            # Check if the image file exists
            if not os.path.exists(image_path):
                print(f"Warning: Image {image_path} not found. Skipping...")
                continue
            
            image = Image.open(image_path)
            width, height = image.size
            
            image_entry = {
                "id": image_id,
                "file_name": os.path.join(subset, 'images', image_filename),
                "width": width,
                "height": height
            }
            coco_format['images'].append(image_entry)
            
            # Process label
            label_filename = os.path.splitext(image_filename)[0] + '.txt'
            label_path = os.path.join(labels_folder, label_filename)
            
            # If no corresponding label file, create a 'no_tumor' annotation
            if not os.path.exists(label_path):
                print(f"Warning: Label file {label_filename} not found for image {image_filename}. Marking as 'no_tumor'.")
                
                annotation_entry = {
                    "id": annotation_id,
                    "image_id": image_id,
                    "category_id": 0,  # 'no_tumor' category
                    "bbox": [],  # Empty bounding box
                    "area": 0.0,  # No area
                    "iscrowd": 0
                }
                coco_format['annotations'].append(annotation_entry)
                annotation_id += 1
                image_id += 1
                continue
            
            with open(label_path, 'r') as label_file:
                lines = label_file.readlines()
                if not lines:
                    # No objects in this image, mark as 'no_tumor'
                    annotation_entry = {
                        "id": annotation_id,
                        "image_id": image_id,
                        "category_id": 0,  # 'no_tumor' category
                        "bbox": [],  # Empty bounding box
                        "area": 0.0,  # No area
                        "iscrowd": 0
                    }
                    coco_format['annotations'].append(annotation_entry)
                    annotation_id += 1
                else:
                    for line in lines:
                        elements = line.strip().split()
                        if len(elements) != 5:
                            print(f"Warning: Incorrect label format in file {label_filename}. Skipping this line...")
                            continue
                        
                        # Extract the YOLO format elements
                        category_id, x_center, y_center, bbox_width, bbox_height = map(float, elements)
                        
                        # Convert from YOLO format to COCO format
                        x_min = (x_center - bbox_width / 2) * width
                        y_min = (y_center - bbox_height / 2) * height
                        bbox_width = bbox_width * width
                        bbox_height = bbox_height * height

                        # COCO bbox format: [x_min, y_min, width, height]
                        bbox = [x_min, y_min, bbox_width, bbox_height]
                        
                        annotation_entry = {
                            "id": annotation_id,
                            "image_id": image_id,
                            "category_id": int(category_id) + 1,  # Shift by +1 to match category IDs
                            "bbox": bbox,
                            "area": bbox_width * bbox_height,
                            "iscrowd": 0
                        }
                        coco_format['annotations'].append(annotation_entry)
                        annotation_id += 1
            
            image_id += 1
    
    # Save the COCO format data to a JSON file
    with open(output_path, 'w') as output_file:
        json.dump(coco_format, output_file, indent=4)
    print(f"COCO JSON file created successfully at {output_path}")

# Specify the path to your dataset and the output path for the JSON file
dataset_path = r'C:\Users\moham\Downloads\Compressed\tumrset\Brain Tumor Detection'
output_json_path = os.path.join(dataset_path, 'annotations.json')

# Generate the COCO JSON file
convert_to_coco(dataset_path, output_json_path)


COCO JSON file created successfully at C:\Users\moham\Downloads\Compressed\tumrset\Brain Tumor Detection\annotations.json


In [1]:
pip install torch torchvision torchaudio pytorch-lightning transformers

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install pycocotools


Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install timm

Note: you may need to restart the kernel to use updated packages.


In [None]:
import os
import torch
import pytorch_lightning as pl
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from pycocotools.coco import COCO
from torchvision import transforms
from transformers import DetrForObjectDetection

# Step 1: Modify the BrainTumorDataset class to handle dataset splits
class BrainTumorDataset(Dataset):
    def __init__(self, root, annotation_file, split='train', transforms=None):
        self.root = root
        self.coco = COCO(annotation_file)
        self.transforms = transforms
        self.split = split
        self.ids = self._filter_image_ids()

    def _filter_image_ids(self):
        valid_ids = []
        for img_id in self.coco.imgs:
            img_info = self.coco.loadImgs(img_id)[0]
            if self.split in img_info['file_name']:
                ann_ids = self.coco.getAnnIds(imgIds=img_id)
                if ann_ids:  # Include image ID only if annotations exist
                    valid_ids.append(img_id)
        return valid_ids

    def __getitem__(self, index):
        image_id = self.ids[index]
        image_info = self.coco.loadImgs(image_id)[0]
        path = image_info['file_name']
        image = Image.open(os.path.join(self.root, path)).convert("RGB")
        
        annotations = self.coco.loadAnns(self.coco.getAnnIds(imgIds=image_id))
        boxes = [ann['bbox'] for ann in annotations]
        labels = [ann['category_id'] for ann in annotations]
        
        boxes = torch.as_tensor(boxes, dtype=torch.float32)
        labels = torch.as_tensor(labels, dtype=torch.int64)
        
        if len(boxes) == 0:
            boxes = torch.zeros((0, 4), dtype=torch.float32)
            labels = torch.zeros((0,), dtype=torch.int64)
        
        target = {"boxes": boxes, "class_labels": labels}

        if self.transforms:
            image = self.transforms(image)
        
        return image, target

    def __len__(self):
        return len(self.ids)

# Step 2: Modify the collate function to handle varying sizes of annotations
def collate_fn(batch):
    pixel_values = [item[0] for item in batch]  # Extract images
    labels = [item[1] for item in batch]  # Extract labels

    pixel_values = torch.stack(pixel_values)

    batch_labels = []
    for label in labels:
        if label['boxes'].numel() > 0:
            batch_labels.append({
                'boxes': label['boxes'],
                'class_labels': label['class_labels']
            })
        else:
            batch_labels.append({
                'boxes': torch.zeros((0, 4), dtype=torch.float32),
                'class_labels': torch.zeros((0,), dtype=torch.int64)
            })

    return {
        'pixel_values': pixel_values,
        'labels': batch_labels
    }

# Step 3: Define the DetrLightningModule for training
class DetrLightningModule(pl.LightningModule):
    def __init__(self, lr, lr_backbone, weight_decay, num_labels):
        super().__init__()
        self.model = DetrForObjectDetection.from_pretrained(
            pretrained_model_name_or_path="facebook/detr-resnet-50", 
            num_labels=num_labels,
            ignore_mismatched_sizes=True
        )
        
        self.lr = lr
        self.lr_backbone = lr_backbone
        self.weight_decay = weight_decay

    def forward(self, pixel_values, pixel_mask=None):
        return self.model(pixel_values=pixel_values, pixel_mask=pixel_mask)

    def common_step(self, batch, batch_idx):
        pixel_values = batch["pixel_values"].to(self.device)
        labels = [{k: v.to(self.device) for k, v in t.items()} for t in batch["labels"]]

        outputs = self.model(pixel_values=pixel_values, labels=labels)
        loss = outputs.loss
        loss_dict = outputs.loss_dict

        return loss, loss_dict

    def training_step(self, batch, batch_idx):
        loss, loss_dict = self.common_step(batch, batch_idx)
        self.log("train_loss", loss)
        for k, v in loss_dict.items():
            self.log("train_" + k, v.item())
        return loss

    def validation_step(self, batch, batch_idx):
        loss, loss_dict = self.common_step(batch, batch_idx)
        self.log("val_loss", loss)
        for k, v in loss_dict.items():
            self.log("val_" + k, v.item())
        return loss

    def configure_optimizers(self):
        param_dicts = [
            {"params": [p for n, p in self.named_parameters() if "backbone" not in n and p.requires_grad]},
            {"params": [p for n, p in self.named_parameters() if "backbone" in n and p.requires_grad], "lr": self.lr_backbone},
        ]
        return torch.optim.AdamW(param_dicts, lr=self.lr, weight_decay=self.weight_decay)

# Step 4: Load and prepare the datasets
image_root = r'C:\Users\moham\Downloads\Compressed\tumrset\Brain Tumor Detection'

transform = transforms.Compose([
    transforms.ToTensor(),
])

# Initialize datasets for training and validation
train_dataset = BrainTumorDataset(root=image_root, annotation_file=os.path.join(image_root, 'annotations.json'), split='train', transforms=transform)
val_dataset = BrainTumorDataset(root=image_root, annotation_file=os.path.join(image_root, 'annotations.json'), split='valid', transforms=transform)

# Step 5: Create the DataLoaders with the updated collate function
train_dataloader = DataLoader(train_dataset, batch_size=2, shuffle=True, collate_fn=collate_fn)
val_dataloader = DataLoader(val_dataset, batch_size=2, shuffle=False, collate_fn=collate_fn)

# Step 6: Instantiate the model and the PyTorch Lightning trainer
model = DetrLightningModule(lr=1e-4, lr_backbone=1e-5, weight_decay=1e-4, num_labels=4)

trainer = pl.Trainer(max_epochs=10, devices=1, accelerator='cpu')

# Pass both train and validation dataloaders to the trainer
trainer.fit(model, train_dataloader, val_dataloader)


loading annotations into memory...
Done (t=0.13s)
creating index...
index created!
loading annotations into memory...
Done (t=0.11s)
creating index...
index created!


Some weights of the model checkpoint at facebook/detr-resnet-50 were not used when initializing DetrForObjectDetection: ['model.backbone.conv_encoder.model.layer1.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer2.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer3.0.downsample.1.num_batches_tracked', 'model.backbone.conv_encoder.model.layer4.0.downsample.1.num_batches_tracked']
- This IS expected if you are initializing DetrForObjectDetection from the checkpoint of a model trained on another task or with another architecture (e.g. initializing a BertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing DetrForObjectDetection from the checkpoint of a model that you expect to be exactly identical (initializing a BertForSequenceClassification model from a BertForSequenceClassification model).
Some weights of DetrForObjectDetection were not initialized from the model checkpoin

Sanity Checking: |                                                                               | 0/? [00:00<…

C:\Users\moham\anaconda3\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:424: The 'val_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=7` in the `DataLoader` to improve performance.
C:\Users\moham\anaconda3\Lib\site-packages\pytorch_lightning\utilities\data.py:78: Trying to infer the `batch_size` from an ambiguous collection. The batch size we found is 2. To avoid any miscalculations, use `self.log(..., batch_size=batch_size)`.
C:\Users\moham\anaconda3\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:424: The 'train_dataloader' does not have many workers which may be a bottleneck. Consider increasing the value of the `num_workers` argument` to `num_workers=7` in the `DataLoader` to improve performance.


Training: |                                                                                      | 0/? [00:00<…

Validation: |                                                                                    | 0/? [00:00<…

In [None]:
import matplotlib.pyplot as plt
import matplotlib.patches as patches
import torch
import torchvision.transforms as T

# Step 7: Load the test dataset
test_dataset = BrainTumorDataset(root=image_root, annotation_file=annotation_file, transforms=transform)
test_dataloader = DataLoader(test_dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)

# Step 8: Function to plot image with bounding boxes
def plot_image_with_boxes(image, boxes, labels, color='r', title=None):
    """ Plot image with bounding boxes in COCO format """
    plt.imshow(image)
    ax = plt.gca()
    for box, label in zip(boxes, labels):
        x_min, y_min, width, height = box
        rect = patches.Rectangle((x_min, y_min), width, height, linewidth=2, edgecolor=color, facecolor='none')
        ax.add_patch(rect)
        ax.text(x_min, y_min, f'{label}', color='white', fontsize=12, verticalalignment='top', bbox={'facecolor': color, 'alpha': 0.5})
    if title:
        plt.title(title)
    plt.show()

# Step 9: Iterate over the test dataloader and visualize predictions
model.eval()  # Set model to evaluation mode
for batch in test_dataloader:
    images, targets = batch['pixel_values'], batch['labels']
    images = images.to(model.device)

    # Predict
    with torch.no_grad():
        outputs = model(images)
    
    # Convert outputs to bounding boxes and labels
    # Ensure that the predicted boxes are in the COCO format (x_min, y_min, width, height)
    predicted_boxes = outputs.pred_boxes[0].detach().cpu().numpy()  # Assuming batch_size=1
    predicted_labels = torch.argmax(outputs.logits, dim=-1)[0].detach().cpu().numpy()
    
    # Ground truth from the COCO-format JSON file
    actual_boxes = targets[0]['boxes'].cpu().numpy()  # Assuming batch_size=1
    actual_labels = targets[0]['class_labels'].cpu().numpy()
    
    # Plot the first image with actual and predicted bounding boxes
    image_np = images[0].permute(1, 2, 0).cpu().numpy()  # Convert to numpy and channel-last format
    plot_image_with_boxes(image_np, actual_boxes, actual_labels, color='g', title='Actual')
    plot_image_with_boxes(image_np, predicted_boxes, predicted_labels, color='r', title='Predicted')
