# In [None]:
import torch
import torchvision
import torchvision.transforms as T
from PIL import Image
import json
import os
import numpy as np
from torchvision.ops import masks_to_boxes
import cv2
import torch.optim as optim
from torch.utils.data import DataLoader
import tqdm
from tqdm import tqdm  # Progress bar

# Define the custom dataset class
class CustomDataset(torch.utils.data.Dataset):
    """COCO-style instance-segmentation dataset yielding ``(image, target)`` pairs.

    Each item is an RGB image converted to a tensor by ``T.ToTensor`` (no
    resizing) and a target dict with the keys ``"boxes"`` (float32, ``[N, 4]``
    in ``x1, y1, x2, y2`` order), ``"labels"`` (int64, ``[N]``) and ``"masks"``
    (uint8, ``[N, H, W]`` where ``H, W`` is the size of that image).

    Args:
        root: Directory containing the image files named in the annotation file.
        annotation_file: Path to a COCO-format JSON file with ``images`` and
            ``annotations`` lists.
        transforms: Optional callable invoked as ``transforms(image, target)``
            that returns the transformed ``(image, target)`` pair.
    """

    def __init__(self, root, annotation_file, transforms=None):
        self.root = root
        self.transforms = transforms
        # Context manager so the annotation file handle is closed promptly.
        with open(annotation_file, encoding="utf-8") as fh:
            self.annotations = json.load(fh)

        images = self.annotations['images']
        # Kept for backward compatibility: size of the first image, or None for
        # an empty dataset. Masks are sized per image in __getitem__.
        self.height = images[0]['height'] if images else None
        self.width = images[0]['width'] if images else None
        self.to_tensor = T.ToTensor()  # Convert PIL image to tensor

        # Group annotations by image id once, so __getitem__ is a dict lookup
        # instead of a scan over every annotation for every sample.
        self._annotations_by_image = self._group_by_image(
            self.annotations.get('annotations', [])
        )

    def __len__(self):
        """Return the number of images listed in the annotation file."""
        return len(self.annotations['images'])

    def __getitem__(self, idx):
        """Load image ``idx`` and build its detection/segmentation target.

        Returns:
            A ``(image, target)`` tuple; see the class docstring for the layout.
        """
        info = self.annotations['images'][idx]
        img_path = os.path.join(self.root, info['file_name'])
        # ``convert`` returns a fully loaded copy, so the file can be closed.
        with Image.open(img_path) as img:
            image = self.to_tensor(img.convert("RGB"))

        # Size masks from the decoded image so they always match it, even when
        # the dataset mixes image sizes.
        height, width = image.shape[-2:]
        target = self._build_target(
            self._annotations_by_image.get(info['id'], []), height, width
        )

        if self.transforms is not None:
            image, target = self.transforms(image, target)

        return image, target

    @staticmethod
    def _group_by_image(annotations):
        """Map each ``image_id`` to its annotations, preserving file order."""
        grouped = {}
        for ann in annotations:
            grouped.setdefault(ann['image_id'], []).append(ann)
        return grouped

    def _build_target(self, annotations, height, width):
        """Convert COCO annotations of one image into a target dict of tensors."""
        boxes, labels, masks = [], [], []
        for ann in annotations:
            x, y, w, h = ann['bbox']
            # Zero-area boxes carry no training signal and are rejected by
            # torchvision's detection models, so drop them here.
            if w <= 0 or h <= 0:
                continue
            boxes.append([x, y, x + w, y + h])  # COCO xywh -> xyxy
            labels.append(ann['category_id'])
            masks.append(self._rasterize(ann['segmentation'], height, width))

        if masks:
            mask_tensor = torch.stack(masks)
        else:
            mask_tensor = torch.zeros((0, height, width), dtype=torch.uint8)

        return {
            # reshape keeps the [N, 4] layout when the image has no objects.
            "boxes": torch.as_tensor(boxes, dtype=torch.float32).reshape(-1, 4),
            "labels": torch.as_tensor(labels, dtype=torch.int64),
            "masks": mask_tensor,
        }

    def _rasterize(self, segmentation, height, width):
        """Render a list of flat ``[x0, y0, x1, y1, ...]`` polygons as one mask.

        Raises:
            ValueError: If ``segmentation`` is not a list of polygons (for
                example an RLE dict), which this dataset cannot decode.
        """
        if not isinstance(segmentation, list):
            raise ValueError(
                "Only polygon segmentations are supported, got "
                f"{type(segmentation).__name__}"
            )
        mask = torch.zeros((height, width), dtype=torch.uint8)
        for seg in segmentation:  # One object may consist of several polygons
            poly = np.array(seg, dtype=np.int32).reshape(-1, 2)  # (N, 2) points
            mask = self.draw_polygon(mask, poly)
        return mask

    def draw_polygon(self, mask, poly):
        """Fill ``poly`` with ones on ``mask`` in place and return ``mask``.

        ``mask.numpy()`` shares memory with the (CPU) tensor, so OpenCV writes
        directly into the tensor's storage.
        """
        cv2.fillPoly(mask.numpy(), [poly], 1)
        return mask

# Custom Mask R-CNN Class to Avoid Resizing
class CustomMaskRCNN(torchvision.models.detection.mask_rcnn.MaskRCNN):
    """Mask R-CNN wrapper that feeds images to the network at their original size.

    The wrapper reuses the ``transform``, ``backbone``, ``rpn`` and
    ``roi_heads`` modules of an existing torchvision Mask R-CNN (the modules
    are shared with ``model``, not copied) and replaces the forward pass so
    that images are normalized and padded into a batch but never rescaled.

    Args:
        model: A torchvision Mask R-CNN instance, e.g. the result of
            ``torchvision.models.detection.maskrcnn_resnet50_fpn``.
    """

    def __init__(self, model):
        # The MaskRCNN/FasterRCNN constructors build brand-new heads from a
        # backbone and cannot be called without arguments. Dispatch directly
        # to the GeneralizedRCNN constructor, which only registers the four
        # components, so the pretrained modules are reused as they are.
        super(torchvision.models.detection.faster_rcnn.FasterRCNN, self).__init__(
            model.backbone, model.rpn, model.roi_heads, model.transform
        )

    def forward(self, images, targets=None):
        """Run Mask R-CNN on ``images`` without the default resize step.

        Args:
            images: Sequence of float image tensors shaped ``[C, H, W]``; sizes
                may differ between images.
            targets: Sequence of target dicts (``boxes``, ``labels``,
                ``masks``), one per image. Required in training mode.

        Returns:
            In training mode, a dict of loss tensors. In eval mode, a list with
            one detection dict per image.

        Raises:
            ValueError: In training mode, if ``targets`` is missing or a target
                contains a box with non-positive width or height.
        """
        if self.training:
            if targets is None:
                raise ValueError("targets should not be None when in training mode")
            for index, target in enumerate(targets):
                boxes = target["boxes"]
                if boxes.numel() and bool((boxes[:, 2:] <= boxes[:, :2]).any()):
                    raise ValueError(
                        f"Target {index} contains a box with non-positive "
                        "width or height."
                    )

        images = list(images)
        image_sizes = [(int(img.shape[-2]), int(img.shape[-1])) for img in images]

        # Reuse the stock transform's normalization and padding, but skip its
        # resize: each image keeps its own (H, W) and is only padded up to a
        # common, stride-divisible batch shape.
        normalized = [self.transform.normalize(img) for img in images]
        batched = self.transform.batch_images(
            normalized, size_divisible=getattr(self.transform, "size_divisible", 32)
        )
        image_list = torchvision.models.detection.image_list.ImageList(
            batched, image_sizes
        )

        features = self.backbone(image_list.tensors)
        if isinstance(features, torch.Tensor):
            # Single-scale backbones return a bare tensor; the heads expect a
            # name -> feature-map mapping.
            features = {"0": features}

        proposals, proposal_losses = self.rpn(image_list, features, targets)
        detections, detector_losses = self.roi_heads(
            features, proposals, image_list.image_sizes, targets
        )
        # No rescaling happened, so the "original" sizes equal the network
        # sizes; postprocess is still needed in eval mode to paste the
        # per-box masks into full-image masks.
        detections = self.transform.postprocess(
            detections, image_list.image_sizes, image_sizes
        )

        losses = {}
        losses.update(detector_losses)
        losses.update(proposal_losses)
        return losses if self.training else detections

# No extra transform is applied. CustomDataset already converts each image to
# a tensor and invokes its transform as ``transforms(image, target)``; a
# single-input ``T.Compose([T.ToTensor()])`` cannot be called with two
# arguments and would be handed an already-converted tensor. Images therefore
# keep their original size.
transform = None

DATASET_ROOT = "/home/aiunika/Desktop/180_resnet/180_4.v4-resnet.coco"


def collate_batch(batch):
    """Turn a list of ``(image, target)`` samples into ``(images, targets)`` tuples.

    Detection samples differ in image size and object count, so they cannot be
    stacked into one tensor. A named function (rather than a lambda) also
    remains picklable if DataLoader worker processes are enabled later.
    """
    return tuple(zip(*batch))


# Create the datasets
train_dataset = CustomDataset(
    root=f"{DATASET_ROOT}/train",
    annotation_file=f"{DATASET_ROOT}/train/_annotations.coco.json",
    transforms=transform,
)
val_dataset = CustomDataset(
    root=f"{DATASET_ROOT}/valid",
    annotation_file=f"{DATASET_ROOT}/valid/_annotations.coco.json",
    transforms=transform,
)

# Create the data loaders (only the training set is shuffled)
train_loader = DataLoader(train_dataset, batch_size=5, shuffle=True, collate_fn=collate_batch)
val_loader = DataLoader(val_dataset, batch_size=5, shuffle=False, collate_fn=collate_batch)

# Select the device first: it must exist before the model is moved onto it.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Load the pre-trained Mask R-CNN model.
# NOTE(review): newer torchvision releases deprecate `pretrained=True` in
# favour of the `weights=` argument; switch once the minimum supported
# torchvision version is confirmed.
model = torchvision.models.detection.maskrcnn_resnet50_fpn(pretrained=True)

# Wrap it in the custom model that avoids resizing, then move it to the device
custom_model = CustomMaskRCNN(model)
custom_model.to(device)

# Define optimizer
optimizer = optim.AdamW(custom_model.parameters(), lr=1e-4)

# Training parameters
num_epochs = 20
best_val_loss = float("inf")  # Lowest validation loss seen so far
print('Training started')

def _progress_postfix(loss_value):
    """Build the tqdm postfix: the batch loss plus GPU memory usage on CUDA."""
    info = {"Loss": f"{loss_value:.4f}"}
    if torch.cuda.is_available():
        allocated = torch.cuda.memory_allocated(device) / (1024 ** 3)  # In GB
        reserved = torch.cuda.memory_reserved(device) / (1024 ** 3)  # In GB
        info["GPU Allocated"] = f"{allocated:.2f} GB"
        info["GPU Reserved"] = f"{reserved:.2f} GB"
    return info


# Training Loop
for epoch in range(num_epochs):
    print(f"\nEpoch {epoch + 1}/{num_epochs}")

    # ---- Training phase ----
    custom_model.train()
    total_train_loss = 0.0

    train_progress = tqdm(train_loader, total=len(train_loader), desc="Training Batches", ncols=100)

    for images, targets in train_progress:
        images = [img.to(device) for img in images]
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        optimizer.zero_grad()
        loss_dict = custom_model(images, targets)
        losses = sum(loss_dict.values())
        losses.backward()
        optimizer.step()

        # Read the scalar once; every .item() call forces a device sync.
        batch_loss = losses.item()
        total_train_loss += batch_loss
        train_progress.set_postfix(_progress_postfix(batch_loss))

    # max(..., 1) avoids a ZeroDivisionError on an empty loader.
    avg_train_loss = total_train_loss / max(len(train_loader), 1)

    # ---- Validation phase ----
    # torchvision detection models only compute losses in training mode; in
    # eval mode they return predictions, which would leave the validation loss
    # at zero. Stay in train mode and rely on torch.no_grad() so that no
    # gradients are computed and no weights change.
    # NOTE(review): train mode would also activate dropout / batch-norm
    # updates if the model had any; torchvision's pretrained ResNet-FPN
    # detection backbone is expected to use frozen batch-norm - confirm if the
    # backbone is ever swapped.
    custom_model.train()
    total_val_loss = 0.0
    val_batches = 0

    val_progress = tqdm(val_loader, total=len(val_loader), desc="Validation Batches", ncols=100)

    with torch.no_grad():
        for images, targets in val_progress:
            images = [img.to(device) for img in images]
            targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

            loss_dict = custom_model(images, targets)

            if not isinstance(loss_dict, dict):  # Predictions instead of losses
                print("Warning: Model returned predictions instead of losses during validation.")
                continue

            batch_loss = sum(loss_dict.values()).item()
            total_val_loss += batch_loss
            val_batches += 1
            val_progress.set_postfix(_progress_postfix(batch_loss))

    # Average over batches that actually produced a loss. With none, report
    # infinity so the epoch can never be mistaken for the best one.
    avg_val_loss = total_val_loss / val_batches if val_batches else float("inf")

    print(f"Epoch {epoch + 1} | Train Loss: {avg_train_loss:.4f} | Val Loss: {avg_val_loss:.4f}")

    # Save the best model based on validation loss
    if avg_val_loss < best_val_loss:
        best_val_loss = avg_val_loss
        torch.save(custom_model.state_dict(), "best_mask_rcnn.pth")
        print("Saved best model!")

# Save final model
torch.save(custom_model.state_dict(), "final_mask_rcnn.pth")
