In [1]:
import os
import cv2
import pandas as pd
import numpy as np
import torch
from torch.utils.data import DataLoader, random_split
import torchvision
from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator, RPNHead
from torchvision.ops import RoIAlign, RoIPool, MultiScaleRoIAlign
import matplotlib.pyplot as plt

In [3]:
# Run on the GPU when CUDA is usable, otherwise fall back to the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
else:
    DEVICE = torch.device("cpu")

In [4]:
# Root image folder; each sub-directory is treated as one class.
IMAGE_DIR = '../dataset/images'
# Folder scanned for the per-class annotation files (*.csv).
ANNOTATIONS_DIR = '../dataset/annotations'
# Size every image is resized to; passed to cv2.resize as dsize, i.e. (width, height).
TARGET_SIZE = (224, 224)
# Batch size used by the train and test DataLoaders.
BATCH_SIZE = 16

In [5]:
# One class per sub-directory of IMAGE_DIR.
# os.listdir returns entries in arbitrary order, so sort them: the integer label
# ids derived from this list must be identical on every run and every machine.
class_names = sorted(
    d for d in os.listdir(IMAGE_DIR) if os.path.isdir(os.path.join(IMAGE_DIR, d))
)


In [6]:
# Annotation files found in ANNOTATIONS_DIR.
# Sorted because os.listdir gives no ordering guarantee and this list is later
# indexed by position.
csv_files = sorted(f for f in os.listdir(ANNOTATIONS_DIR) if f.endswith('.csv'))


In [7]:
# Class name -> integer label id, numbered from 1 in list order
# (torchvision detection models treat label 0 as background).
label_map = dict(zip(class_names, range(1, len(class_names) + 1)))


In [9]:
# Build the in-memory detection dataset: a list of (image_tensor, target) pairs,
# where target holds a single box and a single class label per image.
dataset = []

# cv2.resize takes dsize as (width, height), so x coordinates scale with
# TARGET_SIZE[0] and y coordinates with TARGET_SIZE[1].
target_w, target_h = TARGET_SIZE

# Prefer matching an annotation file to its class by name (file stem == class
# directory name). Pairing purely by position relies on two independent
# directory listings happening to line up; it is kept only as a fallback.
csv_by_stem = {os.path.splitext(f)[0]: f for f in csv_files}

for i, class_name in enumerate(class_names):
    class_dir = os.path.join(IMAGE_DIR, class_name)
    if class_name in csv_by_stem:
        csv_file_name = csv_by_stem[class_name]
    else:
        csv_file_name = csv_files[i]

    csv_path = os.path.join(ANNOTATIONS_DIR, csv_file_name)
    df_annotations = pd.read_csv(csv_path)

    for image_name in os.listdir(class_dir):
        image_path = os.path.join(class_dir, image_name)

        image = cv2.imread(image_path)
        if image is None:
            # Unreadable or non-image file: skip it.
            continue

        h, w, _ = image.shape

        row = df_annotations[df_annotations['image_name'] == image_name]
        if row.empty:
            # No annotation for this image: skip it.
            continue

        # Columns after 'image_name' are used as [x_min, y_min, x_max, y_max]
        # in original-image pixels.
        ann = row.iloc[0, 1:].tolist()

        # Rescale the box to the resized image.
        box = [
            int((ann[0] / w) * target_w),
            int((ann[1] / h) * target_h),
            int((ann[2] / w) * target_w),
            int((ann[3] / h) * target_h),
        ]

        # Validate AFTER scaling: integer truncation can collapse a thin box to
        # zero width/height. A box that was already degenerate stays degenerate
        # after scaling, so this single check covers both cases.
        if not (box[2] > box[0] and box[3] > box[1]):
            print(f"️Invalid box found and removed in '{image_path}': {ann}")
            continue

        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        image = cv2.resize(image, TARGET_SIZE)

        # HWC uint8 -> CHW float32 in [0, 1].
        image_tensor = torch.tensor(image, dtype=torch.float32).permute(2, 0, 1) / 255.0
        label_tensor = torch.tensor([label_map[class_name]], dtype=torch.int64)
        ann_tensor = torch.tensor([box], dtype=torch.float32)

        target = {
            'boxes': ann_tensor,
            'labels': label_tensor
        }
        dataset.append((image_tensor, target))

def collate_fn(batch):
    """Transpose a batch of samples into per-field tuples.

    A list such as [(img0, tgt0), (img1, tgt1)] becomes
    ((img0, img1), (tgt0, tgt1)); nothing is stacked, so items may differ in size.
    """
    per_field = zip(*batch)
    return tuple(per_field)

# Shuffled loader over the complete (unsplit) dataset.
# NOTE(review): uses a literal batch size of 8 rather than BATCH_SIZE — confirm intended.
dataloader = DataLoader(
    dataset,
    batch_size=8,
    shuffle=True,
    collate_fn=collate_fn,
)


️Invalid box found and removed in '../dataset/images/airplane/image_0118.jpg': [np.int64(104), np.int64(38), np.int64(126), np.int64(38)]


In [36]:
# 10% of the samples go to training; everything else becomes the test split.
# NOTE(review): a 10/90 train/test ratio is unusual — confirm it is intentional.
train_size = int(len(dataset) * 0.1)
test_size = len(dataset) - train_size

In [37]:
# Dedicated RNG with a fixed seed so the train/test split is reproducible.
generator = torch.Generator()
generator.manual_seed(42)

In [38]:

# Randomly partition the samples into train/test subsets using the seeded generator.
split_lengths = [train_size, test_size]
train_dataset, test_dataset = random_split(dataset, split_lengths, generator=generator)

In [39]:
# Report how many (image, target) samples were kept.
print(f"Dataset ({len(dataset)})")

Dataset (1669)


In [41]:
# Both loaders share the batch size and collate function; only the training
# loader reshuffles its samples each epoch.
train_loader, test_loader = (
    DataLoader(subset, batch_size=BATCH_SIZE, shuffle=reshuffle, collate_fn=collate_fn)
    for subset, reshuffle in ((train_dataset, True), (test_dataset, False))
)

print(f"Train ({len(train_dataset)}), Test ({len(test_dataset)})")

Train (166), Test (1503)


In [42]:
# Feature extractor: ResNet-18 without its final average-pool and fully
# connected layers, so it outputs a spatial feature map.
# Pretrained ImageNet weights are loaded because the backbone is frozen in the
# next cell; freezing a randomly initialised network would leave the detector
# with fixed random features. The weights are downloaded on first use.
resnet_model = torchvision.models.resnet18(
    weights=torchvision.models.ResNet18_Weights.DEFAULT
)
backbone = torch.nn.Sequential(*list(resnet_model.children())[:-2])
# FasterRCNN reads this attribute to size its heads; ResNet-18's last stage has 512 channels.
backbone.out_channels = 512

In [43]:
# Freeze the backbone: none of its weights will receive gradients.
# NOTE(review): this does not stop BatchNorm layers from updating their running
# statistics while the model is in train mode — confirm that is acceptable.
for backbone_weight in backbone.parameters():
    backbone_weight.requires_grad = False

In [44]:
# Single feature map -> one tuple of sizes and one tuple of aspect ratios,
# giving 3 x 3 = 9 anchors per spatial location.
anchor_sizes = (32, 64, 128)
anchor_ratios = (0.5, 1.0, 2.0)
anchor_generator = AnchorGenerator(
    sizes=(anchor_sizes,),
    aspect_ratios=(anchor_ratios,),
)

In [45]:
# RoI feature pooling over the single backbone feature map (key '0'),
# producing 7x7 features per proposal.
feature_map_keys = ['0']
roi_pool = MultiScaleRoIAlign(
    featmap_names=feature_map_keys,
    output_size=7,
    sampling_ratio=2,
)

In [46]:
# The RPN head must agree with the backbone's channel count and with the number
# of anchors the generator places at each location of the (single) feature map.
num_anchors = anchor_generator.num_anchors_per_location()[0]
in_channels = backbone.out_channels

rpn_head = RPNHead(in_channels, num_anchors)

In [47]:
# One output class per dataset class, plus one extra slot
# (label ids start at 1, leaving index 0 free).
NUM_CLASSES = 1 + len(class_names)

# Assemble the detector from the custom backbone, anchor generator, RPN head
# and RoI pooler defined above.
model = FasterRCNN(
    backbone=backbone,
    num_classes=NUM_CLASSES,
    rpn_anchor_generator=anchor_generator,
    rpn_head=rpn_head,
    box_roi_pool=roi_pool,
)

In [48]:
# Optimise only the parameters that are still trainable (frozen ones are excluded).
params = list(filter(lambda weight: weight.requires_grad, model.parameters()))
optimizer = torch.optim.Adam(params, lr=1e-3)

In [None]:
# Training configuration and log location.
NUM_EPOCHS = 10
LOG_DIR = "../model_outputs"
LOG_FILE = os.path.join(LOG_DIR, "train_log.txt")

# Create output directory if it doesn't exist
os.makedirs(LOG_DIR, exist_ok=True)

backbone.to(DEVICE)
model.to(DEVICE)
model.train()

# Append to the log; the context manager guarantees the file is closed even if
# training is interrupted or raises part-way through.
with open(LOG_FILE, "a") as log_f:
    for epoch in range(NUM_EPOCHS):
        epoch_losses = []
        skipped_batches = 0

        start_msg = f"Start Epoch {epoch+1}/{NUM_EPOCHS}"
        print(start_msg)
        log_f.write(start_msg + "\n")

        for images, targets in train_loader:
            optimizer.zero_grad()

            images_gpu = [img.to(DEVICE) for img in images]
            targets_gpu = [{k: v.to(DEVICE) for k, v in t.items()} for t in targets]

            # In train mode the detector returns a dict of loss terms; optimise their sum.
            loss_dict = model(images_gpu, targets_gpu)
            losses = sum(loss_dict.values())

            # A NaN/inf loss would corrupt the weights: skip the update, but
            # count it so the skip is visible in the log.
            if not torch.isfinite(losses):
                skipped_batches += 1
                continue

            epoch_losses.append(losses.item())
            losses.backward()
            optimizer.step()

        if epoch_losses:
            mean_loss = sum(epoch_losses) / len(epoch_losses)
            msg = f"Epoch {epoch+1} | Loss: {mean_loss:.4f}"
        else:
            msg = f"Epoch {epoch+1} | No valid losses"
        print(msg)
        log_f.write(msg + "\n")

        if skipped_batches:
            skip_msg = f"Epoch {epoch+1} | Skipped {skipped_batches} batch(es) with non-finite loss"
            print(skip_msg)
            log_f.write(skip_msg + "\n")

        # Persist progress after every epoch so a crash does not lose the log.
        log_f.flush()
