<a href="https://www.kaggle.com/code/alecparrott/swimming-pool-detection-training?scriptVersionId=180627958" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

In [14]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import matplotlib.pyplot as plt
%matplotlib inline
from PIL import Image, ImageDraw
import xml.etree.ElementTree as ET
import cv2
from torch.utils.data import Dataset
from pathlib import Path
import os
import random
import torch
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader, Subset
from torchvision.utils import draw_bounding_boxes
import matplotlib.patches as patches
import albumentations as A
from albumentations.pytorch import ToTensorV2
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
import torch.optim as optim
from sklearn.model_selection import train_test_split
from torch.utils.data import Subset

In [15]:
labels_dir = '/kaggle/input/swimming-pool-detection-algarves-landscape/labels/' 
images_dir = '/kaggle/input/swimming-pool-detection-algarves-landscape/images/'
train_images2_dir = '/kaggle/input/swimming-pool-detection-in-satellite-images/swimmingPool/training/images/'
train_labels2_dir = '/kaggle/input/swimming-pool-detection-in-satellite-images/swimmingPool/training/labels/'
test_images2_dir = '/kaggle/input/swimming-pool-detection-in-satellite-images/swimmingPool/testing/images/'
test_labels2_dir = '/kaggle/input/swimming-pool-detection-in-satellite-images/swimmingPool/testing/labels/'
root_dir = '/kaggle/input/swimming-pool-detection-algarves-landscape/'

In [16]:
import shutil

combined_dir = '/kaggle/working/combined_images/'

if not os.path.exists(combined_dir):
    os.mkdir(combined_dir)
    
    
for dir in [images_dir, train_images2_dir]:
    for filename in os.listdir(dir):
        shutil.copy(os.path.join(dir, filename), combined_dir)

In [17]:
print(len(os.listdir(combined_dir)))

2282


In [18]:
train_transforms = A.Compose([
    A.Resize(height=128, width=128),
    A.RandomCrop(height=128, width=128, p=0.2),
    A.HorizontalFlip(p=0.2),
    A.RandomBrightnessContrast(p=0.2),
    A.Blur(always_apply=False, p=1.0, blur_limit=(3, 7)),
    A.HueSaturationValue(always_apply=False, p=1.0, hue_shift_limit=(-20, 20), sat_shift_limit=(-30, 30), val_shift_limit=(-20, 20)),
    A.Rotate(limit=30, p=0.2),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
    ToTensorV2()
], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']))

test_transforms = A.Compose([
    A.Resize(height=128, width=128),
    A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),  # Match training normalization
    ToTensorV2()
], bbox_params=A.BboxParams(format='pascal_voc', label_fields=['labels']))

In [19]:
def show_transformed_image(dataset, idx):
    """
    Display a transformed image from the dataset along with its bounding boxes.

    Args:
        dataset (pool_sat_Dataset): The dataset object.
        idx (int): The index of the image to display.

    Returns:
        None
    """
    image, target = dataset[idx]
    image = image.permute(1, 2, 0)  # Convert from CxHxW to HxWxC
    image = image.numpy()
    fig, ax = plt.subplots(1)
    ax.imshow(image)
    print(target)
    for bbox in target['boxes']:
        rect = patches.Rectangle((bbox[0], bbox[1]), bbox[2]-bbox[0], bbox[3]-bbox[1], linewidth=1, edgecolor='r', facecolor='none')
        ax.add_patch(rect)
    plt.show()


In [20]:
def parse_xml(xml_file):
    tree = ET.parse(xml_file)
    root = tree.getroot()
    image_data = {
        "filename": root.find('filename').text,
        "width": int(root.find('size/width').text),
        "height": int(root.find('size/height').text),
        "depth": int(root.find('size/depth').text),
        "xmin": [],
        "ymin": [],
        "xmax": [],
        "ymax": []
    }

    valid_image = True  # Assume image is valid unless a bounding box proves otherwise

    for obj in root.findall('object'):
        xmin = float(obj.find('bndbox/xmin').text)
        ymin = float(obj.find('bndbox/ymin').text)
        xmax = float(obj.find('bndbox/xmax').text)
        ymax = float(obj.find('bndbox/ymax').text)

        # Invalidate this image if xmin or ymin are zero
        if xmin == 0 or ymin == 0:
            valid_image = False
            break  # No need to check further; one invalid box is enough

        image_data["xmin"].append(xmin)
        image_data["ymin"].append(ymin)
        image_data["xmax"].append(xmax)
        image_data["ymax"].append(ymax)

    if not valid_image:
        return None  # Return None to indicate this image should be skipped

    return image_data

In [21]:
all_images = []

for label_dir in [labels_dir, train_labels2_dir]:
    for label_file in os.listdir(label_dir):
        if label_file.endswith('.xml'):
            xml_path = os.path.join(label_dir, label_file)
            image_data = parse_xml(xml_path)
            if image_data:  # Only add if parse_xml returns valid data
                all_images.append(image_data)

df = pd.DataFrame(all_images)
print(df.head(10))
print(df.tail(10))

  filename  width  height  depth                              xmin  \
0  128.PNG    227     185      3  [17.0, 73.0, 98.0, 137.0, 163.0]   
1  147.PNG    218     242      3                     [30.0, 125.0]   
2  142.PNG    309     167      3                           [132.0]   
3  127.PNG    227     186      3                            [61.0]   
4  164.PNG    194     199      3                            [53.0]   
5   84.PNG    115     208      3                            [48.0]   
6  169.PNG    252     236      3                           [191.0]   
7  166.PNG    237     148      3                            [45.0]   
8  120.PNG    257     141      3                    [112.0, 175.0]   
9  109.PNG    303     165      3                            [83.0]   

                               ymin                                xmax  \
0  [21.0, 63.0, 90.0, 120.0, 144.0]  [49.0, 104.0, 129.0, 170.0, 196.0]   
1                     [62.0, 200.0]                       [68.0, 164.0]   
2   

In [22]:
print(df.describe())
print(df.isnull().sum())

for index, row in df.iterrows():
    image_path = os.path.join(combined_dir, row['filename'])
    if not os.path.exists(image_path):
        print(f"Image file {row['filename']} does not exist in {combined_dir}")

             width       height   depth
count  1318.000000  1318.000000  1318.0
mean    223.613050   219.369499     3.0
std      29.328432    24.937813     0.0
min      55.000000    67.000000     3.0
25%     224.000000   224.000000     3.0
50%     224.000000   224.000000     3.0
75%     224.000000   224.000000     3.0
max     566.000000   403.000000     3.0
filename    0
width       0
height      0
depth       0
xmin        0
ymin        0
xmax        0
ymax        0
dtype: int64


In [23]:
# Split the DataFrame into training and testing datasets
train_df, test_df = train_test_split(df, test_size=0.2, random_state=42)

In [24]:
from torchvision.transforms.functional import to_tensor

class PoolDataset(Dataset):
    def __init__(self, dataframe, root_dir, transform=None):
        self.dataframe = dataframe
        self.root_dir = root_dir
        self.transform = transform 

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        # Image loading
        img_path = os.path.join(self.root_dir, self.dataframe.iloc[idx]['filename'])
        image = Image.open(img_path).convert("RGB")
        image = np.array(image)  # Convert to numpy array first because using Albumentations

        # Bounding boxes
        xmin = self.dataframe.iloc[idx]['xmin']
        ymin = self.dataframe.iloc[idx]['ymin']
        xmax = self.dataframe.iloc[idx]['xmax']
        ymax = self.dataframe.iloc[idx]['ymax']
        boxes = torch.as_tensor([list(b) for b in zip(xmin, ymin, xmax, ymax)], dtype=torch.float32)
        labels = [1] * len(boxes)
        
        # Labels (as long as all boxes are pools, labels are 1; background is 0 and implicit)
        target = {'boxes': boxes, 'labels': labels}

        if self.transform:
            transformed = self.transform(image=image, bboxes=target['boxes'], labels=target['labels'])
            image = transformed['image']
            target['boxes'] = transformed['bboxes']
            target['labels'] = transformed['labels']

        # Convert to tensors 
        target['boxes'] = torch.as_tensor(target['boxes'], dtype=torch.float32)
        target['labels'] = torch.as_tensor(target['labels'], dtype=torch.int64)

        return image, target

In [25]:
def collate_fn(batch):
    batch = [item for item in batch if item is not None and item[1]['boxes'].size(0) > 0]
    if not batch:
        return torch.tensor([]), []  # Return empty tensors if all items are filtered out
    images = torch.stack([item[0] for item in batch])
    targets = [{'boxes': item[1]['boxes'], 'labels': item[1]['labels']} for item in batch]
    return images, targets

In [26]:
# Initialize dataset
train_dataset = PoolDataset(train_df, combined_dir, train_transforms)
test_dataset = PoolDataset(test_df, combined_dir, test_transforms)

# DataLoader
train_loader = DataLoader(train_dataset, batch_size=8, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=8, shuffle=False, collate_fn=collate_fn)

In [27]:
import torchvision.ops as ops

def calculate_iou(boxA, boxB):
    """
    Calculate the intersection over union (IoU) of two bounding boxes.

    Args:
        boxA: The first bounding box.
        boxB: The second bounding box.

    Returns:
        The IoU of the two bounding boxes.
    """
    
    # Determine the coordinates of the intersection rectangle
    xA = max(boxA[0], boxB[0])
    yA = max(boxA[1], boxB[1])
    xB = min(boxA[2], boxB[2])
    yB = min(boxA[3], boxB[3])

    # Compute the area of intersection rectangle
    interWidth = max(0, xB - xA)
    interHeight = max(0, yB - yA)
    interArea = interWidth * interHeight
    
    if interArea == 0:
        return 0.0  # No overlap

    # Compute the area of both bounding boxes
    boxAArea = (boxA[2] - boxA[0]) * (boxA[3] - boxA[1])
    boxBArea = (boxB[2] - boxB[0]) * (boxB[3] - boxB[1])

    # Compute the intersection over union
    iou = interArea / float(boxAArea + boxBArea - interArea)
    return iou


def soft_nms(boxes, scores, iou_threshold=0.5, score_threshold=0.001, sigma=0.5):
    """
    Apply Soft Non-Maximum Suppression to the predictions.
    
    Args:
        boxes (Tensor): The bounding boxes.
        scores (Tensor): The confidence scores for each box.
        iou_threshold (float): The IoU threshold for NMS. Defaults to 0.5.
        score_threshold (float): The score threshold to discard low confidence boxes. Defaults to 0.001.
        sigma (float): The sigma parameter for Soft-NMS. Defaults to 0.5.
        
    Returns:
        Tensor: The indices of the boxes to keep.
    """
    if boxes.numel() == 0:
        return torch.tensor([], dtype=torch.int64)
    
    # Convert boxes to the format (x1, y1, x2, y2)
    x1 = boxes[:, 0]
    y1 = boxes[:, 1]
    x2 = boxes[:, 2]
    y2 = boxes[:, 3]

    # Calculate areas of the boxes
    areas = (x2 - x1) * (y2 - y1)
    
    # Order by scores (descending)
    order = scores.argsort(descending=True)

    # List to keep track of which indices to keep
    keep = []

    while order.numel() > 0:
        i = order[0].item()
        keep.append(i)

        if order.numel() == 1:
            break
        
        # Get the IoU of the highest score box with the rest
        xx1 = torch.max(x1[i], x1[order[1:]])
        yy1 = torch.max(y1[i], y1[order[1:]])
        xx2 = torch.min(x2[i], x2[order[1:]])
        yy2 = torch.min(y2[i], y2[order[1:]])
        
        w = (xx2 - xx1).clamp(min=0)
        h = (yy2 - yy1).clamp(min=0)
        inter = w * h
        ovr = inter / (areas[i] + areas[order[1:]] - inter)

        # Apply Soft-NMS score adjustment
        weight = torch.exp(-(ovr ** 2) / sigma)
        scores[order[1:]] = scores[order[1:]] * weight
        
        # Filter out boxes with scores below threshold
        keep_indices = (scores[order[1:]] >= score_threshold).nonzero(as_tuple=False).squeeze()
        
        if keep_indices.numel() == 0:
            break
        
        # Update the order tensor to process remaining boxes
        if keep_indices.dim() == 0:  # Ensure keep_indices is a 1-D tensor
            order = order[keep_indices + 1].unsqueeze(0)
        else:
            order = order[keep_indices + 1]
        

    return torch.tensor(keep, dtype=torch.int64)

In [28]:
from torchvision.models.detection import fasterrcnn_resnet50_fpn
from torchvision.models.detection import FasterRCNN_ResNet50_FPN_Weights
from torch.optim.lr_scheduler import StepLR

model = fasterrcnn_resnet50_fpn(weights=FasterRCNN_ResNet50_FPN_Weights.DEFAULT,  progress=True)
num_classes = 2  # 1 class + background
in_features = model.roi_heads.box_predictor.cls_score.in_features
model.roi_heads.box_predictor = FastRCNNPredictor(in_features, num_classes)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)
num_epochs = 15

params = [p for p in model.parameters() if p.requires_grad]
optimizer = optim.Adam(params, lr=0.0001)
lr_scheduler = StepLR(optimizer, step_size=10, gamma=0.1)

Downloading: "https://download.pytorch.org/models/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth" to /root/.cache/torch/hub/checkpoints/fasterrcnn_resnet50_fpn_coco-258fb6c6.pth
100%|██████████| 160M/160M [00:01<00:00, 152MB/s]  


In [29]:
def calculate_iou(boxA, boxB):
    """
    Calculate the IoU between each pair of boxes from boxA and boxB.
    """
    xA = torch.max(boxA[:, None, 0], boxB[:, 0])  # Shape: [N, M]
    yA = torch.max(boxA[:, None, 1], boxB[:, 1])  # Shape: [N, M]
    xB = torch.min(boxA[:, None, 2], boxB[:, 2])  # Shape: [N, M]
    yB = torch.min(boxA[:, None, 3], boxB[:, 3])  # Shape: [N, M]

    interWidth = torch.clamp(xB - xA, min=0)
    interHeight = torch.clamp(yB - yA, min=0)
    interArea = interWidth * interHeight  # Shape: [N, M]

    boxAArea = (boxA[:, 2] - boxA[:, 0]) * (boxA[:, 3] - boxA[:, 1])  # Shape: [N]
    boxBArea = (boxB[:, 2] - boxB[:, 0]) * (boxB[:, 3] - boxB[:, 1])  # Shape: [M]

    iou = interArea / (boxAArea[:, None] + boxBArea - interArea)  # Shape: [N, M]
    return iou

In [30]:
import torch.nn as nn

# Define loss functions
classification_loss_fn = nn.CrossEntropyLoss()  # For classification
regression_loss_fn = nn.SmoothL1Loss()  # For bounding box regression

def calculate_combined_loss(model_output, targets, lambda_reg=10):
    """
    Calculate the combined classification and regression loss for Faster R-CNN.
    
    Parameters:
    - model_output: Dict[Tensor], output from the Faster R-CNN model.
    - targets: List[Dict], ground truth boxes and labels.
    - lambda_reg: float, the balancing parameter for the regression loss.
    
    Returns:
    - total_loss: Tensor, the combined loss for the batch.
    """
    total_classification_loss = 0.0
    total_regression_loss = 0.0
    batch_size = len(targets)
    
    for i in range(batch_size):
        # Get the model output for the i-th image
        output_boxes = model_output['boxes'][i]
        output_labels = model_output['labels'][i]
        output_logits = model_output['scores'][i]
        
        # Get the ground truth for the i-th image
        target_boxes = targets[i]['boxes']
        target_labels = targets[i]['labels']
        
        # Calculate the classification loss
        classification_loss = classification_loss_fn(output_logits, target_labels)
        
        # Calculate the regression loss only for positive samples
        positive_indices = torch.where(target_labels > 0)[0]
        if len(positive_indices) > 0:
            positive_output_boxes = output_boxes[positive_indices]
            positive_target_boxes = target_boxes[positive_indices]
            regression_loss = regression_loss_fn(positive_output_boxes, positive_target_boxes)
        else:
            regression_loss = torch.tensor(0.0, device=output_boxes.device)
        
        # Accumulate the losses
        total_classification_loss += classification_loss
        total_regression_loss += regression_loss
    
    # Average the losses over the batch and combine them
    avg_classification_loss = total_classification_loss / batch_size
    avg_regression_loss = total_regression_loss / batch_size
    total_loss = avg_classification_loss + lambda_reg * avg_regression_loss
    
    return total_loss

# Mock ground truth targets
targets = [
    {
        'boxes': torch.tensor([[50, 30, 200, 180], [300, 350, 450, 500]], dtype=torch.float32),  # Example bounding boxes
        'labels': torch.tensor([0, 1], dtype=torch.int64)  # Example labels (e.g., 0 for 'background', 1 for 'pool')
    }
]

# Mock model outputs (logits for classification, assuming 2 classes)
model_output = {
    'boxes': [torch.tensor([[48, 32, 198, 178], [302, 348, 448, 498]], dtype=torch.float32)],  # Predicted boxes
    'labels': [torch.tensor([0, 1], dtype=torch.int64)],  # Predicted labels
    'scores': [torch.tensor([[2.0, 1.5], [1.0, 0.5]], dtype=torch.float32)]  # Confidence scores (logits for 2 classes)
}

# Test the function
loss = calculate_combined_loss(model_output, targets)
print(loss)

tensor(15.7241)


In [32]:
# Training loop
train_losses = []

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    running_custom_loss = 0.0
    for images, targets in train_loader:
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # Forward pass: get loss dictionary
        loss_dict = model(images, targets)
        
        # Sum the losses from the loss dictionary
        losses = sum(loss for loss in loss_dict.values())

        # Backward pass: compute gradient and do SGD step
        optimizer.zero_grad()
        losses.backward()
        optimizer.step()
        
        train_losses.append(losses.item())
        running_loss += losses.item()
    
    # Update the learning rate
    lr_scheduler.step()
    
    # Print average losses for the epoch
    avg_loss = running_loss / len(train_loader)
    print(f"Epoch {epoch+1} Total Loss: {avg_loss:.4f}")

KeyboardInterrupt: 

In [None]:
import matplotlib.pyplot as plt

# Plot the total loss and custom loss
plt.figure(figsize=(10, 4))

# Total Loss
plt.subplot(1, 2, 1)
plt.plot(train_losses)
plt.xlabel('Iteration')
plt.ylabel('Total Loss')
plt.title('Total Loss Over Time')

plt.tight_layout()
plt.show()

In [None]:
torch.save(model.state_dict(), 'pool_detection_model_2.pth')

In [None]:
def show_ground_truth_and_prediction(dataset, idx, model):
    """
        Display a transformed image from the dataset along with its bounding boxes and the predicted bounding boxes.

    Args:
        dataset (pool_sat_Dataset): The dataset object.
        idx (int): The index of the image to display.
        model (nn.Module): The trained model.

    Returns:
        None
    """
    # Get the image and target from the dataset
    image, target = dataset[idx]

    # Move the image to the device (GPU or CPU) that the model is on
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    image = image.to(device)

    # Get the predicted bounding boxes from the model
    model.eval()
    with torch.no_grad():
        prediction = model([image])

    # Convert the image to a format that can be displayed
    image = image.permute(1, 2, 0).cpu().numpy()

    # Create a figure and axis to display the image
    fig, ax = plt.subplots(1)
    ax.imshow(image)

    # Display the ground truth bounding boxes
    for bbox in target['boxes']:
        rect = patches.Rectangle((bbox[0], bbox[1]), bbox[2]-bbox[0], bbox[3]-bbox[1], linewidth=1, edgecolor='r', facecolor='none')
        ax.add_patch(rect)

    # Display the predicted bounding boxes
    for bbox in prediction[0]['boxes'].cpu():
        rect = patches.Rectangle((bbox[0], bbox[1]), bbox[2]-bbox[0], bbox[3]-bbox[1], linewidth=1, edgecolor='g', facecolor='none')
        ax.add_patch(rect)

    plt.show()

In [None]:
show_ground_truth_and_prediction(train_dataset, 0, model)

In [None]:
# Test loop
model.eval()  # Set the model to evaluation mode
test_ious = []

with torch.no_grad():
    for images, targets in test_loader:
        images = list(image.to(device) for image in images)
        targets = [{k: v.to(device) for k, v in t.items()} for t in targets]

        # Forward pass: get predictions
        predictions = model(images)
        
        for i, prediction in enumerate(predictions):
            pred_boxes = prediction['boxes'].cpu().numpy()
            gt_boxes = targets[i]['boxes'].cpu().numpy()
            
            if len(pred_boxes) == 0 or len(gt_boxes) == 0:
                continue
            
            ious = []
            for gt_box in gt_boxes:
                for pred_box in pred_boxes:
                    iou = calculate_iou(gt_box, pred_box)
                    ious.append(iou)
                    
            if ious:
                mean_iou = sum(ious) / len(ious)
                test_ious.append(mean_iou)

# Compute average IoU over the test set
average_iou = sum(test_ious) / len(test_ious) if test_ious else 0
print(f"Average IoU on the test set: {average_iou:.4f}")