In [77]:
import xml.etree.ElementTree as ET
import matplotlib.pyplot as plt
import matplotlib.patches as patches
from PIL import Image
import cv2
import os
import json
import numpy as np
import random
from tqdm.notebook import tqdm
from torchvision.transforms import functional as FN
import torch
import torch.nn as nn
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader

from torchvision import models
from torchsummary import summary
from torch.utils.data._utils.collate import default_collate
import torch.nn.functional as F
from torch.utils.data import WeightedRandomSampler
from torch.utils.data import Subset

from time import time
from IPython.display import clear_output

In [78]:
class PotholeDataset(Dataset):
    def __init__(self, json_file, transform=None, target_transform=None, subset=None):
        with open(json_file, 'r') as f:
            self.data = json.load(f)

        self.transform = transform
        self.target_transform = target_transform
        self.subset = subset
        self.cropped_data = []
        self.prepare_dataset()

        self.augment_transform = transforms.Compose([
            transforms.RandomHorizontalFlip(),
            transforms.RandomRotation(20),  # Rotate by ±20 degrees
            transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
            self.random_noise
        ])

    def random_noise(self, img):
        if random.random() < 0.5:  # 50% chance to add noise
            noise = torch.randn(img.size()) * 0.05
            img = img + noise
            img = torch.clamp(img, 0, 1)
        return img

    def prepare_dataset(self):
        for item in self.data:
            if self.subset is not None and item.get('subset') != self.subset:
                continue
            image_path = item['image']
            image = Image.open(image_path).convert('RGB')
            for box_info in item['boxes']:
                box = box_info['box']
                label = box_info['label']
                cropped_image = FN.crop(image, box[1], box[0], box[3], box[2])  # top, left, height, width
                if self.transform:
                    cropped_image = self.transform(cropped_image)
                self.cropped_data.append((cropped_image, label))

    def __len__(self):
        return len(self.cropped_data)

    def __getitem__(self, idx):
        image, label = self.cropped_data[idx]


        # Check if the label is positive (1), and apply augmentations if so
        if label == 1:
            image = self.augment_transform(image)

        return image, label

In [79]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


In [80]:
with open('processed_images_data_ss.json', 'r') as file:
    data = json.load(file)

count_negative_class_samples = 0
count_positive_class_samples = 0

# Iterate through the data and count the labels
for entry in data:
    for box in entry['boxes']:
        if box['label'] == 0:
            count_negative_class_samples += 1
        elif box['label'] == 1:
            count_positive_class_samples += 1

print("Negative class samples count:", count_negative_class_samples)
print("Positive class samples count:", count_positive_class_samples)

Negative class samples count: 784911
Positive class samples count: 18974


In [83]:

transform = transforms.Compose([
    transforms.Resize((256, 256)),  # Resize the cropped images
    transforms.ToTensor()
])

train_dataset = PotholeDataset(json_file='processed_images_data_ss.json', transform=transform, subset='train')

# Separate indices for negative and positive class samples
negative_indices = [i for i, (_, label) in enumerate(train_dataset) if label == 0]
positive_indices = [i for i, (_, label) in enumerate(train_dataset) if label == 1]

# Randomly downsample negative class
random.shuffle(negative_indices)
downsampled_negative_indices = negative_indices[:len(positive_indices)]

# Combine positive and downsampled negative indices
balanced_indices = downsampled_negative_indices + positive_indices
random.shuffle(balanced_indices)  # Shuffle to mix positive and negative samples

# Create a subset of the dataset using the balanced indices
balanced_train_dataset = Subset(train_dataset, balanced_indices)

# DataLoader using the balanced dataset
train_loader = DataLoader(balanced_train_dataset, batch_size=4, shuffle=True, num_workers=3)

test_dataset = PotholeDataset(json_file='processed_images_data_ss.json', transform=transform, subset='test')
test_loader = DataLoader(test_dataset, batch_size=4, shuffle=False, num_workers=3)

: 

: 

In [82]:
images, labels = next(iter(test_loader))
print (len(images))
plt.figure(figsize=(20,10))

count = 0
count_b = 0
# Assuming 'labels' is a tensor containing the labels for the images
for minibatch_no, (data, target) in tqdm(enumerate(test_loader), total=len(test_loader)):
    
# Count the number of labels that are equal to 1
    count += (target == 1).sum().item()
    count_b += (target == 0).sum().item()
# Printing the count
print("Number of labels equal to 1:", count)
print("Number of labels equal to 0:", count_b)


4


  0%|          | 0/1565 [00:00<?, ?it/s]

Number of labels equal to 1: 80
Number of labels equal to 0: 6180


<Figure size 2000x1000 with 0 Axes>

In [7]:
images, labels = next(iter(train_loader))
print (len(images))
plt.figure(figsize=(20,10))

count = 0
count_b = 0
# Assuming 'labels' is a tensor containing the labels for the images
for minibatch_no, (data, target) in tqdm(enumerate(train_loader), total=len(train_loader)):
    
# Count the number of labels that are equal to 1
    count += (target == 1).sum().item()
    count_b += (target == 0).sum().item()
# Printing the count
print("Number of labels equal to 1:", count)
print("Number of labels equal to 0:", count_b)

4


  0%|          | 0/292 [00:00<?, ?it/s]

Number of labels equal to 1: 583
Number of labels equal to 0: 583


<Figure size 2000x1000 with 0 Axes>

In [9]:
#We define the training as a function so we can easily re-use it.
def train(model, optimizer, num_epochs):
    out_dict = {'train_acc': [],
              'test_acc': [],
              'train_loss': [],
              'test_loss': []}
  
    for epoch in tqdm(range(num_epochs), unit='epoch'):
        model.train()
        #For each epoch
        train_correct = 0
        train_loss = []
        for minibatch_no, (data, target) in tqdm(enumerate(train_loader), total=len(train_loader)):
            data, target = data.to(device), target.to(device)
            #Zero the gradients computed for each weight
            optimizer.zero_grad()
            #Forward pass your image through the network
            output = model(data)
            #Compute the loss
            loss = nn.CrossEntropyLoss()(output, target)
            #Backward pass through the network
            loss.backward()
            #Update the weights
            optimizer.step()

            train_loss.append(loss.item())
            #Compute how many were correctly classified
            predicted = output.argmax(1)
            train_correct += (target==predicted).sum().cpu().item()
        #Comput the test accuracy
        test_loss = []
        test_correct = 0
        model.eval()
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            with torch.no_grad():
                output = model(data)
            test_loss.append(nn.CrossEntropyLoss()(output, target).cpu().item())
            predicted = output.argmax(1)
            test_correct += (target==predicted).sum().cpu().item()
        out_dict['train_acc'].append(train_correct/len(balanced_train_dataset ))
        out_dict['test_acc'].append(test_correct/len(test_dataset))
        out_dict['train_loss'].append(np.mean(train_loss))
        out_dict['test_loss'].append(np.mean(test_loss))
        print(f"Loss train: {np.mean(train_loss):.3f}\t test: {np.mean(test_loss):.3f}\t",
              f"Accuracy train: {out_dict['train_acc'][-1]*100:.1f}%\t test: {out_dict['test_acc'][-1]*100:.1f}%")
    return out_dict

In [10]:
model_ft = models.resnet50()
num_ftrs = model_ft.fc.in_features
# Here the size of each output sample is set to 2.
# Alternatively, it can be generalized to ``nn.Linear(num_ftrs, len(class_names))``.
model_ft.fc = nn.Linear(num_ftrs, 2)

model_ft = model_ft.to(device)

criterion = nn.CrossEntropyLoss()

# Observe that all parameters are being optimized
optimizer_adam = torch.optim.Adam(model_ft.parameters(), lr=0.0001, weight_decay=0.01)
optimizer_ft = torch.optim.SGD(model_ft.parameters(), lr=0.001, momentum=0.9)


# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

In [11]:
out_dict = train(model_ft, optimizer_adam, num_epochs=5)

  0%|          | 0/5 [00:00<?, ?epoch/s]

  0%|          | 0/292 [00:00<?, ?it/s]

Loss train: 0.439	 test: 0.058	 Accuracy train: 79.1%	 test: 98.6%


  0%|          | 0/292 [00:00<?, ?it/s]

Loss train: 0.213	 test: 0.032	 Accuracy train: 93.9%	 test: 99.6%


  0%|          | 0/292 [00:00<?, ?it/s]

Loss train: 0.111	 test: 0.034	 Accuracy train: 96.9%	 test: 99.5%


  0%|          | 0/292 [00:00<?, ?it/s]

Loss train: 0.108	 test: 0.130	 Accuracy train: 97.2%	 test: 95.2%


  0%|          | 0/292 [00:00<?, ?it/s]

Loss train: 0.092	 test: 0.218	 Accuracy train: 97.8%	 test: 96.1%


In [27]:
torch.save(model_ft.state_dict(),"whatever.pt")