In [87]:
import torchvision
from torch.utils.data import DataLoader, Dataset
import torch
import os
from PIL import Image
import numpy as np
import random
import matplotlib.pyplot as plt
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
import torch.nn.functional as F


loading the dataset

In [88]:
import kagglehub

# Download the dataset
path = kagglehub.dataset_download("kylegraupe/wind-turbine-image-dataset-for-computer-vision")

print("Path to dataset files:", path)
# Load dataset configuration
dataset_path = path + "/data.yaml"

# Check dataset information
print(open(dataset_path).read())

Path to dataset files: C:\Users\trh00\.cache\kagglehub\datasets\kylegraupe\wind-turbine-image-dataset-for-computer-vision\versions\12
train: ../train/images
val: ../valid/images
test: ../test/images

nc: 2
names: ['cable tower', 'turbine']

roboflow:
  workspace: kyle-graupe-jobhn
  project: wind-farms
  version: 5
  license: CC BY 4.0
  url: https://universe.roboflow.com/kyle-graupe-jobhn/wind-farms/dataset/5


In [89]:
dataset_path = r'C:\Users\trh00\.cache\kagglehub\datasets\kylegraupe\wind-turbine-image-dataset-for-computer-vision\versions\12'


putting the data in dataloaders

In [90]:
train_imgs_path_list = os.listdir(dataset_path + '/train/images')
train_labels_path_list = os.listdir(dataset_path + '/train/labels')

val_imgs_path_list = os.listdir(dataset_path + '/valid/images')
val_labels_path_list = os.listdir(dataset_path + '/valid/labels')

test_imgs_path_list = os.listdir(dataset_path + '/test/images')
test_labels_path_list = os.listdir(dataset_path + '/test/labels')


In [91]:
transform = torchvision.transforms.Compose([
            # torchvision.transforms.Resize((256, 256)),
            torchvision.transforms.ToTensor(),
            torchvision.transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5])
        ])
def custom_collate_fn(data):
    return tuple(zip(*data))

class WindTurbineDataset(Dataset):
    def __init__(self,datasetPath, datatype,  imgPaths, labelPaths, transform=None):
        self.imgPaths = imgPaths
        self.labelPaths = labelPaths
        self.transform = transform
        self.datasetPath = datasetPath
        self.datatype = datatype
    def __len__(self):
        return len(self.imgPaths)
    
    def __getitem__(self, idx):

        img_path = os.path.join(self.datasetPath, self.datatype, 'images', self.imgPaths[idx])
        # Ensure path exists before loading
        if not os.path.exists(img_path):
            raise FileNotFoundError(f"Image file not found: {img_path}")
    
        img = Image.open(img_path).convert("RGB")  # Convert to RGB to avoid grayscale issues        # labels are in a text file, where each line is a bounding box
       
        label_path = os.path.join(self.datasetPath, self.datatype, 'labels', self.labelPaths[idx])


        with open(label_path, 'r') as file:
            label_lines = file.read().splitlines()
        boxes = []
        labels = []
        for line in label_lines:
            values = list(map(float, line.split()))
            labels.append(int(values[0]))  # First value is the label
            boxes.append(values[1:])  # Remaining values are the bounding box coordinates
        boxes = torch.tensor(boxes, dtype=torch.float32)
        labels = {'boxes': boxes, 'labels': torch.tensor(labels, dtype=torch.int64)}

        

        if self.transform:
            img = self.transform(img)
        return img, labels
    
train_dataset = WindTurbineDataset(dataset_path, 'train', train_imgs_path_list, train_labels_path_list, transform)
val_dataset = WindTurbineDataset(dataset_path, 'valid', val_imgs_path_list, val_labels_path_list, transform)
test_dataset = WindTurbineDataset(dataset_path, 'test', test_imgs_path_list, test_labels_path_list, transform)

train_loader = DataLoader(train_dataset, batch_size=4, shuffle=True, collate_fn=custom_collate_fn)
val_loader = DataLoader(val_dataset, batch_size=4, shuffle=True, collate_fn=custom_collate_fn)
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=True, collate_fn=custom_collate_fn)


In [92]:
# Mappings of label names (found in dataset annotation) to integer IDs (or classes) which we will feed to the model
voc_classes = {
    "Turbine": 0,
    "CableTower": 1,
}

#  Reverse of label to class id mapping. needed because the model predictions will be ids and we need to change it to label to visualize it.
reverse_voc_classes = {v: k for k, v in voc_classes.items()}


In [93]:
# write code to check the how would the first target look like

for img, target in train_loader:
    print(target[0])
    break

{'boxes': tensor([[0.5801, 0.5135, 0.8398, 0.8781],
        [0.5236, 0.7530, 0.3562, 0.3603],
        [0.3930, 0.8082, 0.1935, 0.2552],
        [0.5977, 0.8843, 0.1028, 0.0812]]), 'labels': tensor([1, 1, 1, 1])}


defining the model

In [94]:
import torchvision

# Load pretrained Faster R-CNN model
model = torchvision.models.detection.fasterrcnn_mobilenet_v3_large_fpn(pretrained=True)

# Change number of output classes to match Pascal VOC dataset
num_classes = 2  # Pascal VOC has 20 object classes
in_features = model.roi_heads.box_predictor.cls_score.in_features  # Input features for predictor

# Replace final layer with new predictor
model.roi_heads.box_predictor = torchvision.models.detection.faster_rcnn.FastRCNNPredictor(in_features, num_classes)


# Freeze the backbone and just finetune the head (You can finetune the whole model, but it'd take time and resources)
model.requires_grad_(False)
model.roi_heads.box_predictor = model.roi_heads.box_predictor.requires_grad_(True)


# Move model to GPU if available
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)




FasterRCNN(
  (transform): GeneralizedRCNNTransform(
      Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
      Resize(min_size=(800,), max_size=1333, mode='bilinear')
  )
  (backbone): BackboneWithFPN(
    (body): IntermediateLayerGetter(
      (0): Conv2dNormActivation(
        (0): Conv2d(3, 16, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
        (1): FrozenBatchNorm2d(16, eps=1e-05)
        (2): Hardswish()
      )
      (1): InvertedResidual(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), groups=16, bias=False)
            (1): FrozenBatchNorm2d(16, eps=1e-05)
            (2): ReLU(inplace=True)
          )
          (1): Conv2dNormActivation(
            (0): Conv2d(16, 16, kernel_size=(1, 1), stride=(1, 1), bias=False)
            (1): FrozenBatchNorm2d(16, eps=1e-05)
          )
        )
      )
      (2): InvertedResidual(
        (block):

defining the training loop

In [95]:
from tqdm import tqdm
import torch

def convert_yolo_to_faster_rcnn(yolo_boxes, img_width, img_height):
    """
    Convert YOLO format bounding boxes to Faster R-CNN format.

    YOLO format: (x_center, y_center, width, height)
    Faster R-CNN format: (xmin, ymin, xmax, ymax)

    Args:
        yolo_boxes (Tensor): Bounding boxes in YOLO format.
        img_width (int): Width of the image.
        img_height (int): Height of the image.

    Returns:
        Tensor: Converted bounding boxes in (xmin, ymin, xmax, ymax) format.
    """
    if yolo_boxes.numel() == 0:  # No bounding boxes case
        return torch.zeros((0, 4), dtype=torch.float32)

    if yolo_boxes.dim() == 1:  # Fix case where tensor is (4,) instead of (N,4)
        yolo_boxes = yolo_boxes.unsqueeze(0)  # Convert to (1,4)

    # Convert YOLO (x_center, y_center, width, height) to (xmin, ymin, xmax, ymax)
    x_center = yolo_boxes[:, 0] * img_width
    y_center = yolo_boxes[:, 1] * img_height
    width = yolo_boxes[:, 2] * img_width
    height = yolo_boxes[:, 3] * img_height

    xmin = x_center - (width / 2)
    ymin = y_center - (height / 2)
    xmax = x_center + (width / 2)
    ymax = y_center + (height / 2)

    return torch.stack((xmin, ymin, xmax, ymax), dim=1)  # Ensure correct shape (N,4)


def train_one_epoch(model, dataloader, optimizer, device):
    model.train()  
    total_loss = 0

    for images, targets in tqdm(dataloader):
        images = list(img.to(device) for img in images)  # Move images to device

        converted_targets = []
        for target in targets:
            boxes = target["boxes"]  # Bounding boxes in YOLO format
            labels = target["labels"]  # Class labels

            # Get image dimensions
            img_width, img_height = images[0].shape[2], images[0].shape[1]  # (C, H, W)

            # Convert YOLO to Faster R-CNN format
            boxes = convert_yolo_to_faster_rcnn(boxes, img_width, img_height)

            converted_targets.append({
                "boxes": boxes.to(device),
                "labels": labels.to(device)
            })

        # Compute losses
        loss_dict = model(images, converted_targets)
        losses = sum(loss for loss in loss_dict.values())  # Sum all losses

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        total_loss += losses.item()

    avg_loss = total_loss / len(dataloader)
    return avg_loss


In [96]:
from torchmetrics.detection.mean_ap import MeanAveragePrecision

# Initialize metric
metric = MeanAveragePrecision(iou_thresholds=[0.5, 0.55, 0.6, 0.65, 0.7, 0.75, 0.8, 0.85, 0.9, 0.95])

def validate(model, dataloader, device):
    """Evaluates the model using mAP@0.5:0.95."""
    model.eval()
    metric.reset()
    
    with torch.no_grad():
        for images, targets in tqdm(dataloader):
            images = [img.to(device) for img in images]
            preds = model(images)

            # Convert predictions to correct format
            processed_preds = []
            for pred in preds:
                processed_preds.append({
                    "boxes": pred["boxes"].cpu(),
                    "scores": pred["scores"].cpu(),
                    "labels": pred["labels"].cpu()
                })

            # Convert ground truth targets
            processed_targets = []
            for target in targets:
                boxes = target["boxes"]
                labels = target["labels"]
                processed_targets.append({
                    "boxes": boxes.cpu(),
                    "labels": labels.cpu()
                })
                 

            # Update metric
            metric.update(processed_preds, processed_targets)

    return metric.compute()  # Compute final mAP scores

In [98]:
optimizer = torch.optim.AdamW(model.parameters(), lr=0.0001)
num_epochs = 10  # Set number of epochs

for epoch in range(num_epochs):
    train_loss = train_one_epoch(model, train_loader, optimizer, device)
    mAP_results = validate(model, val_loader, device)
    
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {train_loss:.4f}")
    # print(f"mAP@0.5:0.95 for Test: {mAP_results['map']:.4f}")

100%|██████████| 661/661 [12:24<00:00,  1.13s/it]
100%|██████████| 62/62 [01:15<00:00,  1.22s/it]


Epoch 1/10, Loss: 1.5564


100%|██████████| 661/661 [11:46<00:00,  1.07s/it]
100%|██████████| 62/62 [00:54<00:00,  1.15it/s]


Epoch 2/10, Loss: 1.4534


100%|██████████| 661/661 [09:01<00:00,  1.22it/s]
100%|██████████| 62/62 [00:41<00:00,  1.48it/s]


Epoch 3/10, Loss: 1.4679


 38%|███▊      | 254/661 [03:10<05:04,  1.34it/s]


KeyboardInterrupt: 