In [None]:
import os
import cv2
import copy
import torch
import random
import zipfile
import torchvision
import numpy as np
import xml.etree.ElementTree as ET
import torchvision.transforms as T
from tqdm import tqdm
from PIL import Image
# from google.colab import drive
import torch.optim as optim
from torchvision import transforms
from torchvision.ops import box_iou
from torchvision.models.detection import _utils
from torch.utils.data import Dataset, DataLoader
from torchvision.models.detection import ssd300_vgg16
from torchvision.models.detection.ssd import SSD300_VGG16_Weights, SSDClassificationHead

In [None]:
# drive.mount('/content/drive')
# Dataset archive uploaded to the runtime, and the directory it is unpacked into.
zip_path, extract_path = '/content/cans.zip', '/content/'

# The destination must exist before extraction; an existing directory is fine.
os.makedirs(extract_path, exist_ok=True)

# Unpack every member of the archive; the context manager closes the file handle.
with zipfile.ZipFile(zip_path, mode='r') as archive:
    archive.extractall(path=extract_path)

In [None]:
def create_pascal_voc_annotation(img_name, img_size, boxes, labels, output_file):
    """Write a minimal Pascal VOC style annotation XML file for one image.

    Args:
        img_name: Text stored in the ``<filename>`` element.
        img_size: Sequence whose first item is written as ``<width>`` and whose
            second item is written as ``<height>``; ``<depth>`` is always 3.
        boxes: Iterable of ``[xmin, ymin, xmax, ymax]`` sequences. Each
            coordinate is truncated with ``int`` before being written.
        labels: Iterable of class-name strings, paired with ``boxes`` by position.
        output_file: Path the XML document is written to.
    """
    root = ET.Element('annotation')
    ET.SubElement(root, 'filename').text = img_name

    # Image dimensions block.
    size_node = ET.SubElement(root, 'size')
    for tag, value in (('width', img_size[0]), ('height', img_size[1]), ('depth', 3)):
        ET.SubElement(size_node, tag).text = str(value)

    # One <object> per (box, label) pair, corners written in VOC order.
    corner_tags = ('xmin', 'ymin', 'xmax', 'ymax')
    for coords, class_name in zip(boxes, labels):
        object_node = ET.SubElement(root, 'object')
        ET.SubElement(object_node, 'name').text = class_name
        box_node = ET.SubElement(object_node, 'bndbox')
        for position, tag in enumerate(corner_tags):
            ET.SubElement(box_node, tag).text = str(int(coords[position]))

    ET.ElementTree(root).write(output_file)

def preprocess_folder_based_dataset(
    root_folder,
    output_image_folder,
    output_annot_folder,
    resize_width=300,
    resize_height=300,
    augmentations=True,
    num_augmentations=3
):
    """Turn a folder-per-class image collection into a resized VOC-style detection dataset.

    Every sub-directory of ``root_folder`` is treated as one class. Each readable
    image is resized to ``resize_width`` x ``resize_height``, written to
    ``output_image_folder`` as ``{class}_{original name}``, and paired with an XML
    annotation in ``output_annot_folder`` whose single box covers the whole image
    and whose label is the class folder name.

    When ``augmentations`` is true, ``num_augmentations`` flipped copies are written
    as ``{class}_{stem}_aug{i}.jpg`` / ``.xml``. Flips are drawn from the three
    combinations that actually change the image (horizontal, vertical, both), so an
    augmented copy is never a pixel-identical duplicate of the original; with up to
    three augmentations each copy uses a different combination.

    Args:
        root_folder: Directory containing one sub-directory per class.
        output_image_folder: Destination for resized/augmented images (created if missing).
        output_annot_folder: Destination for XML annotations (created if missing).
        resize_width: Output image width in pixels.
        resize_height: Output image height in pixels.
        augmentations: Whether to write flipped copies.
        num_augmentations: Number of flipped copies per image; values <= 0 write none.
    """
    os.makedirs(output_image_folder, exist_ok=True)
    os.makedirs(output_annot_folder, exist_ok=True)

    # (flip horizontally, flip vertically) — the identity combination is excluded on purpose.
    flip_choices = [(True, False), (False, True), (True, True)]
    aug_count = max(num_augmentations, 0)

    # Sorted so the processing order does not depend on filesystem listing order.
    for cls in sorted(os.listdir(root_folder)):
        class_folder = os.path.join(root_folder, cls)
        if not os.path.isdir(class_folder):
            continue

        img_files = sorted(
            f for f in os.listdir(class_folder)
            if f.lower().endswith(('.jpg', '.jpeg', '.png'))
        )

        for img_name in tqdm(img_files, desc=f"Processing {cls}"):
            img_path = os.path.join(class_folder, img_name)
            image = cv2.imread(img_path)
            if image is None:
                print(f"[Warning] Could not read image {img_path}, skipping.")
                continue

            height, width = image.shape[:2]
            # The whole image is treated as the object's bounding box.
            boxes = [[0, 0, width, height]]
            labels = [cls]

            resized_image = cv2.resize(image, (resize_width, resize_height))
            x_scale = resize_width / width
            y_scale = resize_height / height
            # round() rather than int(): width * (resize_width / width) can land just
            # below resize_width in floating point, and truncation would shrink the box.
            resized_boxes = [
                [
                    round(box[0] * x_scale),
                    round(box[1] * y_scale),
                    round(box[2] * x_scale),
                    round(box[3] * y_scale)
                ]
                for box in boxes
            ]

            stem = os.path.splitext(img_name)[0]
            output_img_path = os.path.join(output_image_folder, f"{cls}_{img_name}")
            output_xml_path = os.path.join(output_annot_folder, f"{cls}_{stem}.xml")

            # Do not leave an annotation behind for an image that failed to write.
            if not cv2.imwrite(output_img_path, resized_image):
                print(f"[Warning] Could not write image {output_img_path}, skipping.")
                continue
            create_pascal_voc_annotation(
                os.path.basename(output_img_path),
                (resize_width, resize_height),
                resized_boxes,
                labels,
                output_xml_path,
            )

            if not augmentations:
                continue

            # Distinct flip combinations first; any further copies repeat a random one.
            flips = random.sample(flip_choices, min(aug_count, len(flip_choices)))
            flips += [random.choice(flip_choices) for _ in range(aug_count - len(flip_choices))]

            for aug_idx, (flip_horizontal, flip_vertical) in enumerate(flips, start=1):
                aug_image = resized_image
                aug_boxes = copy.deepcopy(resized_boxes)

                if flip_horizontal:
                    aug_image = cv2.flip(aug_image, 1)
                    for b in aug_boxes:
                        # Mirror x coordinates and swap them so xmin stays <= xmax.
                        b[0], b[2] = resize_width - b[2], resize_width - b[0]
                if flip_vertical:
                    aug_image = cv2.flip(aug_image, 0)
                    for b in aug_boxes:
                        # Mirror y coordinates and swap them so ymin stays <= ymax.
                        b[1], b[3] = resize_height - b[3], resize_height - b[1]

                aug_img_name = f"{cls}_{stem}_aug{aug_idx}.jpg"
                aug_xml_name = f"{cls}_{stem}_aug{aug_idx}.xml"

                if not cv2.imwrite(os.path.join(output_image_folder, aug_img_name), aug_image):
                    print(f"[Warning] Could not write image {aug_img_name}, skipping.")
                    continue
                create_pascal_voc_annotation(
                    aug_img_name,
                    (resize_width, resize_height),
                    aug_boxes,
                    labels,
                    os.path.join(output_annot_folder, aug_xml_name),
                )

# Build the resized (300x300) and flip-augmented dataset from the extracted class folders.
preprocess_folder_based_dataset(
    '/content/cans',
    '/content/cans_augmented/images',
    '/content/cans_augmented/annotations',
    resize_width=300, resize_height=300,
    augmentations=True, num_augmentations=3,
)

Processing defect: 100%|██████████| 430/430 [00:02<00:00, 214.47it/s]
Processing non: 100%|██████████| 3447/3447 [00:20<00:00, 165.98it/s]


In [None]:
def check_annotations(xml_folder):
    """Scan a folder of VOC XML files and print every degenerate bounding box.

    A box is reported when ``xmin >= xmax`` or ``ymin >= ymax``. A file that
    cannot be parsed, or that lacks an expected element, is reported with an
    error line and the scan moves on to the next file. Nothing is returned.

    Args:
        xml_folder: Directory whose ``*.xml`` files are inspected.
    """
    corner_tags = ('xmin', 'ymin', 'xmax', 'ymax')

    for xml_file in os.listdir(xml_folder):
        if not xml_file.endswith('.xml'):
            continue

        try:
            root = ET.parse(os.path.join(xml_folder, xml_file)).getroot()

            for obj in root.findall('object'):
                # Accessed only so that a missing <name> element is reported as an error.
                obj.find('name').text
                bndbox = obj.find('bndbox')
                xmin, ymin, xmax, ymax = (int(bndbox.find(tag).text) for tag in corner_tags)

                if not (xmin < xmax and ymin < ymax):
                    print(f"Invalid bounding box in {xml_file}")
        except Exception as e:
            print(f"Error reading {xml_file}: {e}")

# Sanity-check the generated annotations: prints one line per degenerate box or unreadable file.
check_annotations('/content/cans_augmented/annotations')

In [None]:
class VOCDataset(Dataset):
    """Detection dataset backed by a folder of images and matching VOC XML annotations.

    Each item is ``(image, target)`` where ``target`` is a dict with
    ``"boxes"`` (float32 tensor of shape ``(N, 4)`` in ``xmin, ymin, xmax, ymax``
    order) and ``"labels"`` (int64 tensor of shape ``(N,)``).

    Args:
        image_dir: Directory containing the image files.
        annot_dir: Directory containing one ``<image stem>.xml`` per image.
        transforms: Optional callable applied to the PIL image only (boxes are
            not modified by it).
    """

    # Class name -> integer id. torchvision detection models reserve id 0 for the
    # background, so real classes must start at 1; mapping a class to 0 removes
    # every positive example from the loss. 'non' is the label produced when
    # annotations are generated from a class folder named "non".
    LABEL_MAP = {'defect': 1, 'non-defect': 2, 'non': 2}

    def __init__(self, image_dir, annot_dir, transforms=None):
        self.image_dir = image_dir
        self.annot_dir = annot_dir
        self.transforms = transforms
        # Case-insensitive extension match, sorted so indices are stable between runs.
        self.image_files = sorted(
            f for f in os.listdir(image_dir)
            if f.lower().endswith(('.jpg', '.jpeg', '.png'))
        )

    def __len__(self):
        """Return the number of images found in ``image_dir``."""
        return len(self.image_files)

    def __getitem__(self, idx):
        """Load the image at ``idx`` and its annotation as ``(image, target)``."""
        img_filename = self.image_files[idx]
        img_path = os.path.join(self.image_dir, img_filename)
        annot_path = os.path.join(self.annot_dir, os.path.splitext(img_filename)[0] + '.xml')

        image = Image.open(img_path).convert("RGB")
        boxes, labels = self.parse_voc_xml(annot_path)

        target = {}
        # reshape keeps the (N, 4) layout even when the annotation has no objects.
        target["boxes"] = torch.tensor(boxes, dtype=torch.float32).reshape(-1, 4)
        target["labels"] = torch.tensor(labels, dtype=torch.int64)

        if self.transforms:
            image = self.transforms(image)

        return image, target

    def parse_voc_xml(self, xml_file):
        """Parse one VOC XML file.

        Returns:
            ``(boxes, labels)``: a list of ``[xmin, ymin, xmax, ymax]`` integer
            lists and the parallel list of integer class ids.

        Raises:
            ValueError: If an object carries a class name missing from ``LABEL_MAP``.
        """
        tree = ET.parse(xml_file)
        root = tree.getroot()
        boxes = []
        labels = []

        for obj in root.findall('object'):
            label = obj.find('name').text
            bndbox = obj.find('bndbox')
            xmin = int(bndbox.find('xmin').text)
            ymin = int(bndbox.find('ymin').text)
            xmax = int(bndbox.find('xmax').text)
            ymax = int(bndbox.find('ymax').text)
            boxes.append([xmin, ymin, xmax, ymax])
            labels.append(self.label_to_int(label))

        return boxes, labels

    def label_to_int(self, label):
        """Map a class name to its integer id (1-based; 0 is reserved for background).

        Raises:
            ValueError: If ``label`` is unknown. Failing loudly avoids silently
                training an unrecognised class as background.
        """
        try:
            return self.LABEL_MAP[label]
        except KeyError:
            raise ValueError(
                f"Unknown class label {label!r}; expected one of {sorted(self.LABEL_MAP)}"
            ) from None

In [None]:
# Convert PIL images to float tensors in [0, 1].
# No T.Normalize here: torchvision's SSD expects inputs in the 0-1 range and applies
# its own mean/std normalization inside the model (model.transform), so normalizing
# here as well would feed the backbone doubly-normalized pixels.
# NOTE: T.Resize only touches the image, not the annotation boxes; it is harmless for
# images that are already 300x300 but would misalign boxes for any other input size.
transform = T.Compose([
    T.Resize((300, 300)),
    T.ToTensor(),
])

In [None]:
# Input resolution used for the head's feature-map probe, and the size of the new
# classification head (presumably background + two can classes — confirm against the labels).
size = 300
num_classes = 3

# Start from the COCO-pretrained SSD300 detector.
model = ssd300_vgg16(weights=SSD300_VGG16_Weights.COCO_V1)

# Replace the classification head with a fresh one sized for num_classes; the channel
# counts and anchors-per-location are read from the existing backbone / anchor generator.
in_channels = _utils.retrieve_out_channels(model.backbone, (size, size))
num_anchors = model.anchor_generator.num_anchors_per_location()
model.head.classification_head = SSDClassificationHead(in_channels, num_anchors, num_classes)

# Keep the model's internal resize settings in line with the chosen input size.
model.transform.min_size, model.transform.max_size = (size,), size

Downloading: "https://download.pytorch.org/models/ssd300_vgg16_coco-b556d3b4.pth" to /root/.cache/torch/hub/checkpoints/ssd300_vgg16_coco-b556d3b4.pth
100%|██████████| 136M/136M [00:00<00:00, 203MB/s]


In [None]:
def _collate_detection_batch(batch):
    """Regroup a list of (image, target) pairs into (images, targets) tuples."""
    return tuple(zip(*batch))


dataset = VOCDataset(
    '/content/cans_augmented/images',
    '/content/cans_augmented/annotations',
    transforms=transform,
)

# Targets hold a variable number of boxes per image, so batches stay as tuples
# instead of being stacked by the default collate function.
data_loader = DataLoader(dataset, batch_size=4, shuffle=True, collate_fn=_collate_detection_batch)

# Use the GPU when one is available.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device)

# SGD with momentum; the learning rate is multiplied by 0.1 every 3 scheduler steps.
optimizer = optim.SGD(model.parameters(), lr=1e-3, momentum=0.9, weight_decay=5e-4)
lr_scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

def _average_precision(records, num_gt):
    """Area under the precision/recall curve for one class (all-point interpolation).

    Args:
        records: List of ``(score, is_true_positive)`` pairs, one per detection.
        num_gt: Number of ground-truth boxes of the class (must be > 0).
    """
    ranked = sorted(records, key=lambda record: record[0], reverse=True)

    precisions, recalls = [], []
    true_pos = 0
    for rank, (_, is_tp) in enumerate(ranked, start=1):
        true_pos += int(is_tp)
        precisions.append(true_pos / rank)
        recalls.append(true_pos / num_gt)

    # Make precision monotonically non-increasing (the interpolated envelope).
    for i in range(len(precisions) - 2, -1, -1):
        precisions[i] = max(precisions[i], precisions[i + 1])

    ap = 0.0
    prev_recall = 0.0
    for precision, recall in zip(precisions, recalls):
        ap += (recall - prev_recall) * precision
        prev_recall = recall
    return ap


def calculate_map(preds, targets, iou_threshold=0.5):
    """Compute mean Average Precision (mAP) at a single IoU threshold.

    For every class that occurs in the ground truth, detections of that class are
    matched image by image, highest score first, to ground-truth boxes of the same
    class. A detection is a true positive when its best-overlapping ground-truth
    box has IoU above ``iou_threshold`` and has not already been claimed by a
    higher-scoring detection; otherwise it is a false positive. Images with
    ground truth but no detections lower recall rather than being skipped.

    Args:
        preds: ``(boxes, scores, labels)`` — three sequences holding one tensor per image.
        targets: ``(boxes, labels)`` — two sequences of per-image ground-truth tensors,
            aligned with ``preds``.
        iou_threshold: IoU a detection must exceed to match a ground-truth box.

    Returns:
        Mean of the per-class average precision as a float in ``[0, 1]``;
        ``0.0`` when there are no ground-truth boxes at all.
    """
    pred_boxes_all, pred_scores_all, pred_labels_all = preds
    gt_boxes_all, gt_labels_all = targets

    class_ids = set()
    for gt_labels in gt_labels_all:
        class_ids.update(gt_labels.tolist())
    if not class_ids:
        return 0.0

    average_precisions = []
    for class_id in sorted(class_ids):
        records = []  # (score, is_true_positive) for every detection of this class
        num_gt = 0

        for pred_boxes, pred_scores, pred_labels, gt_boxes, gt_labels in zip(
            pred_boxes_all, pred_scores_all, pred_labels_all, gt_boxes_all, gt_labels_all
        ):
            cls_gt_boxes = gt_boxes[gt_labels == class_id]
            num_gt += cls_gt_boxes.size(0)

            pred_mask = pred_labels == class_id
            cls_pred_boxes = pred_boxes[pred_mask]
            cls_pred_scores = pred_scores[pred_mask]
            if cls_pred_boxes.size(0) == 0:
                continue
            if cls_gt_boxes.size(0) == 0:
                # Nothing of this class to find here: every detection is a false positive.
                records.extend((score, False) for score in cls_pred_scores.tolist())
                continue

            best_ious, best_gt = box_iou(cls_pred_boxes, cls_gt_boxes).max(dim=1)
            claimed = set()
            for det_idx in torch.argsort(cls_pred_scores, descending=True).tolist():
                gt_idx = best_gt[det_idx].item()
                is_tp = best_ious[det_idx].item() > iou_threshold and gt_idx not in claimed
                if is_tp:
                    # Each ground-truth box can be matched once; later hits are duplicates.
                    claimed.add(gt_idx)
                records.append((cls_pred_scores[det_idx].item(), is_tp))

        average_precisions.append(_average_precision(records, num_gt))

    return sum(average_precisions) / len(average_precisions)

num_epochs = 10
for epoch in range(num_epochs):
    model.train()
    total_loss = 0

    # Per-image detections and ground truth gathered over the epoch for the metric.
    preds_boxes, preds_scores, preds_labels = [], [], []
    targets_boxes, targets_labels = [], []

    for images, targets in data_loader:
        images = [image.to(device) for image in images]
        targets = [{key: value.to(device) for key, value in target.items()} for target in targets]

        # With targets supplied in train mode the model returns a dict of loss terms.
        loss_dict = model(images, targets)
        losses = sum(loss_dict.values())

        optimizer.zero_grad()
        losses.backward()
        optimizer.step()

        total_loss += losses.item()

        # Second, gradient-free forward pass in eval mode to get detections for this batch.
        with torch.no_grad():
            model.eval()
            outputs = model(images)

            for output, target in zip(outputs, targets):
                # Only detections scoring above 0.5 are kept for the metric.
                confident = output['scores'] > 0.5

                preds_boxes.append(output['boxes'][confident].cpu())
                preds_scores.append(output['scores'][confident].cpu())
                preds_labels.append(output['labels'][confident].cpu())
                targets_boxes.append(target['boxes'].cpu())
                targets_labels.append(target['labels'].cpu())
        # Back to train mode before the next optimisation step.
        model.train()

    lr_scheduler.step()
    mAP = calculate_map((preds_boxes, preds_scores, preds_labels), (targets_boxes, targets_labels))
    print(f"Epoch {epoch+1}, Loss: {total_loss/len(data_loader):.4f}, mAP@0.5: {mAP*100:.2f}%")

Epoch 1, Loss: 0.0012, mAP@0.5: 0.94%


KeyboardInterrupt: 

In [None]:
# Persist only the model's state_dict (its weights), not the whole model object.
torch.save(model.state_dict(), '/content/singleshotdetector_cans.pth')

In [None]:
# Restore the saved weights. map_location=device remaps tensors that were saved from
# a GPU session, so the checkpoint also loads on a CPU-only runtime.
state_dict = torch.load('/content/singleshotdetector_cans.pth', map_location=device)
model.load_state_dict(state_dict)
model.to(device)
model.eval()