In [None]:
# Dataset locations, given relative to the notebook's working directory.
# Change DATASET_ROOT if the Potholes folder lives somewhere else.
DATASET_ROOT = 'Potholes/'
path = DATASET_ROOT + 'annotated-images/'  # prefix prepended to every file name listed in the split file
splits = DATASET_ROOT + 'splits.json'      # JSON file with the 'train' and 'test' file-name lists


In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.models as models
from torchvision.ops import RoIAlign
import cv2
import numpy as np
import matplotlib.pyplot as plt
from torchvision.transforms import functional as Fs
from torchvision import transforms as v2
import torchvision.ops as ops
import torch
import json
from xml.etree import ElementTree as ET
from torch.utils.data import DataLoader
from torchvision.io import decode_image
from torchvision.utils import draw_bounding_boxes
from sklearn.model_selection import train_test_split
from torchvision import tv_tensors
from torchvision.transforms import v2
import numpy as np


# OpenCV runtime settings: enable its optimised code paths and set the
# number of threads it may use for parallel regions.
OPENCV_NUM_THREADS = 8
cv2.setUseOptimized(True)
cv2.setNumThreads(OPENCV_NUM_THREADS)

def selective_search(image_path, num_rects, quality=True):
    """Run OpenCV selective search on an image file and return its first proposals.

    Args:
        image_path: Path of the image file; it is loaded with ``cv2.imread``.
        num_rects: Maximum number of proposals to return.
        quality: If True the segmentation object is switched to its "quality"
            mode, otherwise to its "fast" mode.

    Returns:
        The first ``num_rects`` entries of what ``process()`` returns. Other
        code in this notebook unpacks each entry as ``(x, y, w, h)``.

    Raises:
        FileNotFoundError: If ``cv2.imread`` could not load ``image_path``.
    """
    image = cv2.imread(image_path)
    # cv2.imread reports failure by returning None instead of raising, so
    # check here rather than letting setBaseImage fail with an opaque error.
    if image is None:
        raise FileNotFoundError(f"Could not read image: {image_path!r}")

    ss = cv2.ximgproc.segmentation.createSelectiveSearchSegmentation()
    ss.setBaseImage(image)

    if quality:
        ss.switchToSelectiveSearchQuality()
    else:
        ss.switchToSelectiveSearchFast()

    rects = ss.process()

    return rects[:num_rects]


def show_selective_search(image, rects):
    """Plot a copy of ``image`` with every proposal in ``rects`` outlined.

    Args:
        image: Array-like image supporting ``.copy()``; the input itself is
            not drawn on.
        rects: Iterable of ``(x, y, w, h)`` rectangles.
    """
    canvas = image.copy()

    # Outline each region proposal in its own random colour.
    for x, y, w, h in rects:
        colour = list(np.random.random(size=3) * 256)
        cv2.rectangle(canvas, (x, y), (x + w, y + h), colour, 2, cv2.LINE_AA)

    # Reverse the channel axis before display (BGR -> RGB if the image was
    # loaded through cv2.imread).
    plt.imshow(canvas[..., ::-1])
    plt.axis('off')

def read_xml(path: str) -> list:
    """Collect the bounding box of every ``<object>`` element in an XML file.

    Args:
        path: Location of the annotation file (anything ``ET.parse`` accepts).

    Returns:
        One ``(xmin, ymin, xmax, ymax)`` tuple of ints per ``<object>``, in
        document order. Empty list when the file has no ``<object>`` element.
    """
    root = ET.parse(path).getroot()
    boxes = []

    for obj in root.iter('object'):
        # Coordinates are stored as text under <bndbox>; convert them to int.
        ymin, xmin, ymax, xmax = (
            int(obj.find(f"bndbox/{tag}").text)
            for tag in ("ymin", "xmin", "ymax", "xmax")
        )
        boxes.append((xmin, ymin, xmax, ymax))

    return boxes

def collate_fn(batch):
    """Regroup a list of samples into one tuple per sample field.

    ``[(a1, b1), (a2, b2)]`` becomes ``((a1, a2), (b1, b2))``; nothing is
    stacked, so fields may have different sizes from sample to sample.
    """
    per_field = zip(*batch)
    return tuple(per_field)

def visualize_boxes(images, annotations):
    """Show each image next to a copy with its bounding boxes drawn on it.

    One figure row is produced per image: the left column holds the image,
    the right column the output of ``draw_bounding_boxes``.

    Args:
        images: Iterable of channel-first image tensors (they are permuted to
            ``[H, W, C]`` for display).
        annotations: Iterable of box sets, paired with ``images`` by position.

    Returns:
        None. Nothing is plotted when there are no image/annotation pairs.
    """
    pairs = [
        (image, draw_bounding_boxes(image, annotation, width=2))
        for image, annotation in zip(images, annotations)
    ]
    if not pairs:
        # plt.subplots cannot build a grid with zero rows.
        return

    num_rows = len(pairs)
    # squeeze=False keeps `axes` two-dimensional even for a single row, so
    # the axes[row, col] indexing below also works for one image.
    fig, axes = plt.subplots(num_rows, 2, figsize=(10, num_rows * 5), squeeze=False)

    for row, (image, overlay) in enumerate(pairs):
        for col, picture in enumerate((image, overlay)):
            axes[row, col].imshow(picture.permute(1, 2, 0))
            axes[row, col].axis('off')

    plt.tight_layout()
    plt.subplots_adjust(wspace=0.02, hspace=0.02)
    plt.show()
        
class Network(nn.Module):
    """Binary CNN classifier: six conv stages followed by a three-layer MLP head.

    Every conv stage is Conv3x3 -> BatchNorm -> ReLU -> 2x2 max-pool, which
    halves the spatial size. The first fully connected layer expects
    ``2048 * 3 * 3`` features, i.e. a 3x3 map after the sixth stage, which is
    what a 224x224 input produces. ``forward`` returns one raw logit per
    sample (no sigmoid is applied).
    """

    @staticmethod
    def _conv_stage(in_channels, out_channels):
        """Build one Conv3x3 -> BatchNorm -> ReLU -> MaxPool(2) stage."""
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(out_channels),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

    def __init__(self):
        super().__init__()

        # Spatial sizes below assume a 224x224 input.
        self.convolutional1 = self._conv_stage(3, 64)       # 224x224 -> 112x112
        self.convolutional2 = self._conv_stage(64, 128)     # 112x112 -> 56x56
        self.convolutional3 = self._conv_stage(128, 256)    # 56x56 -> 28x28
        self.convolutional4 = self._conv_stage(256, 512)    # 28x28 -> 14x14
        self.convolutional5 = self._conv_stage(512, 1024)   # 14x14 -> 7x7
        self.convolutional6 = self._conv_stage(1024, 2048)  # 7x7 -> 3x3

        # Classification head ending in a single logit.
        self.fully_connected = nn.Sequential(
            nn.Linear(2048 * 3 * 3, 2048),
            nn.BatchNorm1d(2048),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(2048, 512),
            nn.BatchNorm1d(512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 1),
        )

    def forward(self, x):
        """Map a ``[N, 3, H, W]`` batch to a ``[N]`` tensor of logits."""
        stages = (
            self.convolutional1,
            self.convolutional2,
            self.convolutional3,
            self.convolutional4,
            self.convolutional5,
            self.convolutional6,
        )
        for stage in stages:
            x = stage(x)

        # Flatten everything but the batch dimension for the linear layers.
        features = x.view(x.size(0), -1)
        logits = self.fully_connected(features)

        # [N, 1] -> [N]
        return logits.squeeze(1)




In [15]:
# import torch
# import json
# import matplotlib.pyplot as plt
# import torchvision.ops as ops
# from xml.etree import ElementTree as ET
# from torch.utils.data import DataLoader
# from torchvision import tv_tensors
# from torchvision.transforms import v2
# from torchvision.io import decode_image
# from torchvision.utils import draw_bounding_boxes
# from sklearn.model_selection import train_test_split

class Pothole_Dataloader(torch.utils.data.Dataset):
    """Dataset yielding an image, its ground-truth boxes and labelled region proposals.

    Each item is the 5-tuple
    ``(image, boxes, regions, selected_regions, region_labels)``:

    * ``image``: the decoded image tensor.
    * ``boxes``: ground-truth boxes read from the XML annotation (XYXY).
    * ``regions``: all selective-search proposals, converted to XYXY.
    * ``selected_regions``: positive proposals followed by background ones.
    * ``region_labels``: 1.0 for each positive, 0.0 for each background
      proposal, in the same order and of the same length as
      ``selected_regions``.

    When ``val`` is True no proposals are generated and the last three
    entries are empty lists.

    Args:
        image_paths: Image file paths.
        mask_paths: XML annotation paths, index-aligned with ``image_paths``.
        num_rects: Maximum number of proposals requested from selective search.
        pos_thresh: A proposal whose best IoU with any ground-truth box is
            ``>= pos_thresh`` is a positive sample.
        neg_thresh: A proposal whose best IoU is ``< neg_thresh`` is a
            background candidate.
        size: Stored on the instance; not used by this class.
        val: If True, skip proposal generation.
        device: Device on which the IoU matrix is computed.
    """

    def __init__(self, image_paths, mask_paths, num_rects = 1000, pos_thresh=0.5, neg_thresh=0.3, size=256, val=False, device="cpu"):
        self.image_paths = image_paths
        self.mask_paths = mask_paths
        self.num_rects = num_rects
        self.pos_thresh = pos_thresh
        self.neg_thresh = neg_thresh
        self.size = size
        self.val = val
        self.device = device
        # Placeholder augmentation pipeline; __getitem__ does not apply it,
        # samples are returned untransformed.
        self.transforms = v2.Compose([
                            v2.RandomHorizontalFlip(),
                            v2.RandomVerticalFlip()
                          ])

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        image = decode_image(self.image_paths[idx])
        boxes = tv_tensors.BoundingBoxes(read_xml(self.mask_paths[idx]),
                                        format="XYXY", canvas_size=image.shape[-2:])

        if self.val:
            # Validation/test items carry no proposals.
            return image, boxes, [], [], []

        # Generate region proposals with selective search (x, y, w, h) and
        # convert them to corner format.
        proposals_xywh = torch.tensor(selective_search(self.image_paths[idx], num_rects=self.num_rects, quality=False))
        regions = ops.box_convert(proposals_xywh, "xywh", "xyxy")
        region_boxes = tv_tensors.BoundingBoxes(regions, format="XYXY", canvas_size=image.shape[-2:])

        # Best IoU of every proposal against any ground-truth box. The result
        # is moved back to the CPU because it is used to index `region_boxes`,
        # which lives on the CPU.
        ious = ops.box_iou(region_boxes.to(self.device), boxes.to(self.device))
        best_iou = ious.max(dim=1)[0].cpu()

        # Proposals with IoU >= pos_thresh are positive samples.
        pos_indices = (best_iou >= self.pos_thresh).nonzero(as_tuple=True)[0]
        pos_samples = region_boxes[pos_indices]

        # Proposals with IoU < neg_thresh are background; keep at most four
        # per positive so background makes up about 80% of the selection.
        bg_indices = (best_iou < self.neg_thresh).nonzero(as_tuple=True)[0]
        num_bg = int(len(pos_samples) * 4)
        bg_samples = region_boxes[bg_indices[:num_bg]]

        selected_regions = torch.cat([pos_samples, bg_samples], dim=0)
        # Count the background boxes actually kept: fewer than `num_bg`
        # candidates may exist, and labels must stay aligned with the boxes.
        region_labels = torch.cat([torch.ones(len(pos_samples)), torch.zeros(len(bg_samples))])

        return image, boxes, regions, selected_regions, region_labels



# Read the split file once; the with-block closes the file handle afterwards.
with open(splits) as split_file:
    split_names = json.load(split_file)

# Annotation (.xml) paths for the two splits stored in the file.
train_mask_list = [path + name for name in split_names['train']]
val_mask_list = [path + name for name in split_names['test']]

# Matching image paths: swap only the trailing '.xml' extension for '.jpg',
# so an "xml" occurring elsewhere in the path is left untouched.
train_img_list = [mask[:-len('.xml')] + '.jpg' if mask.endswith('.xml') else mask
                  for mask in train_mask_list]
val_img_list = [mask[:-len('.xml')] + '.jpg' if mask.endswith('.xml') else mask
                for mask in val_mask_list]

In [None]:
size = 512  # image size passed to the datasets
batch_size = 16
num_workers = 1
split_seed = 42  # fixed seed so the val/test partition is identical on every run

# Halve the held-out file list into a validation and a test subset.
# NOTE: this rebinds val_img_list / val_mask_list, so running this cell again
# without first re-running the cell that builds them splits the already-halved lists.
val_img_list, test_img_list, val_mask_list, test_mask_list = train_test_split(
    val_img_list, val_mask_list, train_size=.5, random_state=split_seed)

# Only the training set generates region proposals (val=False).
trainset = Pothole_Dataloader(train_img_list, train_mask_list, size=size, val=False)
valset = Pothole_Dataloader(val_img_list, val_mask_list, size=size, val=True)
testset = Pothole_Dataloader(test_img_list, test_mask_list, size=size, val=True)

# collate_fn keeps samples as per-field tuples instead of stacking them.
train_loader = DataLoader(trainset, batch_size=batch_size, shuffle=True, num_workers=num_workers, collate_fn=collate_fn)
val_loader = DataLoader(valset, batch_size=batch_size, shuffle=False, num_workers=num_workers, collate_fn=collate_fn)
test_loader = DataLoader(testset, batch_size=batch_size, shuffle=False, num_workers=num_workers, collate_fn=collate_fn)

In [17]:
# Pull a single batch from the training loader; collate_fn returns one tuple per sample field.
images, boxes, regions, selected_regions, region_labels = next(iter(train_loader))

In [None]:
def crop_and_resize(image, box, target_size=(224, 224)):
    """Cut ``box`` out of ``image`` and resize the patch to ``target_size``.

    Args:
        image: Either a ``[C, H, W]`` torch tensor or an ``[H, W, C]`` array.
        box: Four values ``(xmin, ymin, xmax, ymax)``; each is truncated to
            int before slicing.
        target_size: Size handed to ``v2.Resize``.

    Returns:
        The output of the ToPILImage -> Resize -> ToTensor pipeline applied
        to the cropped patch.
    """
    xmin, ymin, xmax, ymax = box

    # Tensors arrive channel-first; slicing below works on [H, W, C].
    if isinstance(image, torch.Tensor):
        image = image.permute(1, 2, 0).numpy()

    rows = slice(int(ymin), int(ymax))
    cols = slice(int(xmin), int(xmax))
    patch = image[rows, cols]

    to_resized_tensor = v2.Compose([
        v2.ToPILImage(),
        v2.Resize(target_size),
        v2.ToTensor(),
    ])
    return to_resized_tensor(patch)


def process_selected_regions(image_resized, selected_regions, target_size=(224, 224)):
    """
    Crop every region of one image and resize each crop to ``target_size``.

    Regions that do not hold exactly four values are reported and skipped.
    Returns the crops stacked along a new first dimension, or an empty
    tensor (``torch.empty(0)``) when no region produced a crop.
    """
    patches = []

    for region in selected_regions:
        # A usable region is exactly (xmin, ymin, xmax, ymax).
        if region.numel() != 4:
            print(f"Invalid region size for {region}, skipping this region.")
            continue
        patches.append(crop_and_resize(image_resized, region, target_size=target_size))

    if not patches:
        print("No valid regions found for this image.")
        return torch.empty(0)

    # One tensor for all crops: (N, C, height, width).
    return torch.stack(patches)


def process_images(images, selected_regions, target_size=(224, 224)):
    """
    Turn a loader batch into one tensor of cropped, resized region patches.

    ``images[i]`` is paired with ``selected_regions[i]``. Images without
    regions, or whose regions yield no crop, are reported and left out.
    Returns all remaining crops concatenated along dim 0, or
    ``torch.empty(0)`` when the batch produced none.
    """
    collected = []

    for i, image in enumerate(images):
        image_hwc = image.permute(1, 2, 0).numpy()  # [C, H, W] to [H, W, C]
        regions = selected_regions[i]

        # Nothing was selected for this image.
        if regions.size(0) == 0:
            print(f"Warning: No selected regions for image {i}")
            continue

        patches = process_selected_regions(image_hwc, regions, target_size)

        # Regions existed but none could be cropped.
        if patches.numel() == 0:
            print(f"Warning: No valid regions found for image {i}, skipping this image.")
            continue

        collected.append(patches)

    if not collected:
        print("Warning: No valid regions were found in the batch.")
        return torch.empty(0)  # empty tensor signals "no crops in this batch"

    return torch.cat(collected, dim=0)


def process_label(images, selected_regions, region_labels):
    """
    Concatenate the per-image region labels of a batch into one flat tensor.

    ``region_labels[i]`` is read for every index of ``images``; entries with
    no labels are reported and skipped. ``selected_regions`` is accepted but
    not used. Returns ``torch.empty(0)`` when no image contributed labels.
    """
    kept = []

    for i, _ in enumerate(images):
        labels = region_labels[i]

        if labels.size(0) == 0:
            print(f"Warning: No labels for image {i}, skipping this image.")
            continue

        kept.append(labels)

    if not kept:
        print("Warning: No valid labels in the batch.")
        return torch.empty(0)  # empty tensor signals "no labels in this batch"

    return torch.cat(kept)


# images, boxes, regions, selected_regions, region_labels = next(iter(train_loader))

# cropped_resized_regions = process_images(images, selected_regions, target_size=(224, 224))
# flattened_labels = process_label(images, selected_regions, region_labels)


In [7]:
# print(cropped_resized_regions.shape)
# print(flattened_labels.shape)

# small_batches = torch.split(cropped_resized_regions, 32)
# len(small_batches)

In [None]:
import torch
import torch.nn as nn
from tqdm import tqdm

# CNN Model
model = Network()
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)

# One loader batch can expand into hundreds of region crops, so the crops are
# fed to the network in chunks of `mini_batch` to limit VRAM usage.
mini_batch = 32
epochs = 1

for epoch in range(epochs):
    model.train()

    total_loss = 0.0
    num_steps = 0  # optimiser steps taken this epoch; denominator of the average loss

    for images, boxes, regions, selected_regions, region_labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", unit="batch"):

        cropped_resized_regions = process_images(images, selected_regions, target_size=(224, 224))
        flattened_labels = process_label(images, selected_regions, region_labels)

        # Skip the batch if no valid regions were found in any image.
        if cropped_resized_regions.size(0) == 0 or flattened_labels.size(0) == 0:
            continue

        # Crops and labels are paired purely by position, so a length
        # difference would train on wrong labels; fail loudly instead.
        if cropped_resized_regions.size(0) != flattened_labels.size(0):
            raise ValueError(
                f"Got {cropped_resized_regions.size(0)} region crops but "
                f"{flattened_labels.size(0)} labels; they must match one-to-one."
            )

        # torch.split also handles inputs shorter than `mini_batch`
        # (it then returns a single chunk).
        image_batches = torch.split(cropped_resized_regions, mini_batch)
        label_batches = torch.split(flattened_labels, mini_batch)

        for image_batch, label_batch in zip(image_batches, label_batches):

            # BatchNorm1d in training mode needs more than one sample, and the
            # last chunk of a split can hold a single crop.
            if image_batch.size(0) < 2:
                continue

            # Move one chunk at a time so only `mini_batch` crops sit on the device.
            image_batch = image_batch.to(device)
            label_batch = label_batch.float().to(device)

            optimizer.zero_grad()
            outputs = model(image_batch)

            loss = criterion(outputs, label_batch)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_steps += 1

    # Average over the optimiser steps actually taken, not over loader batches.
    avg_loss = total_loss / max(num_steps, 1)
    print(f"Epoch {epoch+1} - Avg Loss: {avg_loss:.4f}")


Epoch 1/1:   0%|          | 0/34 [00:10<?, ?batch/s]


KeyboardInterrupt: 

### Pre-trained model

In [None]:
import torch
import torch.nn as nn
from torchvision import models, transforms
from tqdm import tqdm

# Pre-trained model setup: ResNet-50 with ImageNet weights and a new
# single-logit head for the binary pothole/background decision.
# IMAGENET1K_V1 is the checkpoint that the deprecated `pretrained=True` loaded.
model = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
model.fc = nn.Linear(model.fc.in_features, 1)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

criterion = nn.BCEWithLogitsLoss()
optimizer = torch.optim.AdamW(model.parameters(), lr=0.001)

# Region crops are fed to the network in chunks of `mini_batch`.
mini_batch = 32
epochs = 1

# NOTE(review): the crops come out of crop_and_resize without ImageNet
# mean/std normalisation; confirm whether the pre-trained weights should
# receive normalised inputs.
for epoch in range(epochs):
    model.train()

    total_loss = 0.0
    num_steps = 0  # optimiser steps taken this epoch; denominator of the average loss

    for images, boxes, regions, selected_regions, region_labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{epochs}", unit="batch"):

        cropped_resized_regions = process_images(images, selected_regions, target_size=(224, 224))
        flattened_labels = process_label(images, selected_regions, region_labels)

        # Skip the batch if no valid regions were found in any image.
        if cropped_resized_regions.size(0) == 0 or flattened_labels.size(0) == 0:
            continue

        # Crops and labels are paired purely by position, so a length
        # difference would train on wrong labels; fail loudly instead.
        if cropped_resized_regions.size(0) != flattened_labels.size(0):
            raise ValueError(
                f"Got {cropped_resized_regions.size(0)} region crops but "
                f"{flattened_labels.size(0)} labels; they must match one-to-one."
            )

        # torch.split also handles inputs shorter than `mini_batch`
        # (it then returns a single chunk).
        image_batches = torch.split(cropped_resized_regions, mini_batch)
        label_batches = torch.split(flattened_labels, mini_batch)

        for image_batch, label_batch in zip(image_batches, label_batches):

            # Batch statistics from a single crop are meaningless, and the
            # last chunk of a split can hold just one.
            if image_batch.size(0) < 2:
                continue

            # Move one chunk at a time so only `mini_batch` crops sit on the device.
            image_batch = image_batch.to(device)
            label_batch = label_batch.float().to(device)

            optimizer.zero_grad()
            outputs = model(image_batch)

            # The head outputs [N, 1]; flatten to [N] to match the labels.
            loss = criterion(outputs.view(-1), label_batch)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            num_steps += 1

    # Average over the optimiser steps actually taken, not over loader batches.
    avg_loss = total_loss / max(num_steps, 1)
    print(f"Epoch {epoch+1} - Avg Loss: {avg_loss:.4f}")

Epoch 1/1:   0%|          | 0/34 [06:01<?, ?batch/s]


KeyboardInterrupt: 