In [2]:
import os, json, cv2, numpy as np, matplotlib.pyplot as plt, yaml
import torch
from torch.utils.data import Dataset, DataLoader, random_split
import torchvision
from torchvision.models.detection.rpn import AnchorGenerator
from torchvision.transforms import functional as F
import albumentations as A # Library for augmentations
from albumentations.pytorch import ToTensorV2
import pandas as pd
from PIL import Image
import torch.optim as optim
from torchvision import datasets, models, transforms
import torch.nn as nn
# from torchvision.transforms.functional import Interpolate
import torch.nn.functional as Func
from datetime import datetime
from torchvision.io import read_image
import time
import torch.optim as optim
import copy
from torchvision.models.detection.anchor_utils import DefaultBoxGenerator
from torchvision.models.detection.ssd import SSD, SSDHead, SSDClassificationHead

In [3]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor()
])

def load_class_info(yaml_file):
    with open(yaml_file, 'r') as file:
            class_info = yaml.safe_load(file)
    return class_info['classes']

In [4]:
class ClassDataset(Dataset):
    def __init__(self, dataset_folder, class_info_file, transform=None, demo=False):
        self.dataset_folder = dataset_folder
        self.transform = transform
        # self.label_transform = label_transform
        self.demo = demo
        self.imgs_files = self.load_data(dataset_folder)
        self.class_names = load_class_info(class_info_file)
        self.num_classes = len(self.class_names)
        # self.normalize_keypoints = normalize_keypoints
        self.class_to_idx = {class_name: idx for idx, class_name in enumerate(self.class_names)}
        print(self.class_to_idx)

    def load_data(self, dataset_folder):
        images_path = os.path.join(self.dataset_folder,"images/")
        annotations_path = os.path.join(self.dataset_folder,"annotations/")
        j_data = []
        for file in os.listdir(images_path):
            if file.endswith(".jpg"):
                json_path = os.path.join(annotations_path, file.split('.')[0] + '.json')
                with open(json_path) as f:
                    json_load = json.load(f)
                    for item in json_load['shapes']:
                        points = [value for row in item['points'] for value in row]
                        j_data.append({'image':  os.path.join(images_path,file),
                                     'label': item['label'],
                                     'points':points })
        json_data = pd.DataFrame(j_data)
        return json_data

    def get_keypoint(self, bboxes):
        centers = []
        for bbox in bboxes:
            center_x = (bbox[0] + bbox[2]) / 2
            center_y = (bbox[1] + bbox[3]) / 2
            centers.append((center_x, center_y))
        return centers
    
    def __len__(self):
        return len(self.imgs_files)

    def __getitem__(self, idx):
        target = {}
        img_path, label, bboxes_original = self.imgs_files.iloc[idx]
        bboxes_original = [bboxes_original]
        label = torch.tensor(self.class_to_idx[label], dtype=torch.int32)
        keypoint_original = torch.tensor(self.get_keypoint(bboxes_original), dtype=torch.float32)
        # img_original = cv2.imread(img_path)
        # img_original = cv2.cvtColor(img_original, cv2.COLOR_BGR2RGB)
        # img_original = read_image(img_path).float() / 255.0
        img_original = Image.open(img_path).convert('RGB')

        if self.transform:
            img_original = self.transform(img_original)

        return img_original, keypoint_original, label

In [7]:
def collate_fn(batch):
    return tuple(zip(*batch))

class_config_path = '../../config/formated_class.yaml'
KEYPOINTS_FOLDER_TRAIN = '../../dataset/robocup_all_test/'
train_path = os.path.join(KEYPOINTS_FOLDER_TRAIN,"train/")
val_path = os.path.join(KEYPOINTS_FOLDER_TRAIN,"val/")

dataset = ClassDataset(train_path, class_config_path, transform=transform, demo=True)
dataloader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)

{'AllenKey': 0, 'Axis2': 1, 'Bearing2': 2, 'Drill': 3, 'F20_20_B': 4, 'F20_20_G': 5, 'Housing': 6, 'M20': 7, 'M20_100': 8, 'M30': 9, 'Motor2': 10, 'S40_40_B': 11, 'S40_40_G': 12, 'Screwdriver': 13, 'Spacer': 14, 'Wrench': 15, 'container_blue': 16, 'container_red': 17}


In [8]:
iterator = iter(dataloader)
images, keypoint, label = next(iterator)
print(label)
print(keypoint)
image_number = 0

(tensor(1, dtype=torch.int32), tensor(5, dtype=torch.int32), tensor(3, dtype=torch.int32), tensor(4, dtype=torch.int32), tensor(1, dtype=torch.int32), tensor(9, dtype=torch.int32), tensor(0, dtype=torch.int32), tensor(1, dtype=torch.int32), tensor(4, dtype=torch.int32), tensor(5, dtype=torch.int32), tensor(6, dtype=torch.int32), tensor(7, dtype=torch.int32), tensor(9, dtype=torch.int32), tensor(2, dtype=torch.int32), tensor(6, dtype=torch.int32), tensor(7, dtype=torch.int32), tensor(3, dtype=torch.int32), tensor(1, dtype=torch.int32), tensor(2, dtype=torch.int32), tensor(7, dtype=torch.int32), tensor(0, dtype=torch.int32), tensor(3, dtype=torch.int32), tensor(4, dtype=torch.int32), tensor(7, dtype=torch.int32), tensor(3, dtype=torch.int32), tensor(5, dtype=torch.int32), tensor(1, dtype=torch.int32), tensor(5, dtype=torch.int32), tensor(0, dtype=torch.int32), tensor(12, dtype=torch.int32), tensor(7, dtype=torch.int32), tensor(1, dtype=torch.int32))
(tensor([[358.0000, 323.5000]]), tenso

In [12]:
class SSDKeypointDetector(nn.Module):
    def __init__(self, num_classes, num_keypoints, input_channels=3):  # Adjust input channels if needed
        super(SSDKeypointDetector, self).__init__()
        self.num_classes = num_classes
        self.num_keypoints = num_keypoints
        self.backbone = models.mobilenet_v2(pretrained=True).features
        
        # Adjust the input layer of the backbone to accommodate the new input channels
        self.backbone[0] = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=3, stride=2, padding=1, bias=False),
            nn.BatchNorm2d(32),
            nn.ReLU6(inplace=True)
        )
        
        self.feature_map_layers = [6, 13, -1]
        self.out_channels = [self._get_out_channels(self.backbone[i]) for i in self.feature_map_layers]
        
        self.classification_head = nn.Conv2d(in_channels=self.out_channels[-1], out_channels=num_classes, kernel_size=3, padding=1)
        self.keypoint_head = nn.Conv2d(in_channels=self.out_channels[-1], out_channels=num_keypoints * 2, kernel_size=3, padding=1)
        
    def _get_out_channels(self, layer):
        if isinstance(layer, nn.Sequential):
            for sublayer in layer:
                if isinstance(sublayer, nn.Conv2d):
                    return sublayer.out_channels
        elif isinstance(layer, nn.Conv2d):
            return layer.out_channels
        elif isinstance(layer, models.mobilenetv2.InvertedResidual):
            for sublayer in layer.conv:
                if isinstance(sublayer, nn.Conv2d):
                    out_channels = sublayer.out_channels
            return out_channels
        raise ValueError("Unsupported layer type: {}".format(type(layer)))
        
    def forward(self, x):
        features = []
        for i, layer in enumerate(self.backbone):
            x = layer(x)
            if i in self.feature_map_layers:
                features.append(x)
        
        # Assume features[-1] is the feature map used for both heads
        keypoint_outputs = self.keypoint_head(features[-1])
        class_outputs = self.classification_head(features[-1])
        
        # Reshape class outputs for cross-entropy loss
        class_outputs = class_outputs.permute(0, 2, 3, 1).contiguous()
        class_outputs = class_outputs.view(class_outputs.size(0), -1, self.num_classes)
        
        return keypoint_outputs, class_outputs

In [13]:
def custom_loss_function(predictions, targets):
    keypoints_pred, labels_pred = predictions
    keypoints_target = torch.stack([t['keypoints'] for t in targets])
    labels_target = torch.stack([t['labels'] for t in targets])
    
    classification_loss = F.cross_entropy(labels_pred.view(-1, labels_pred.size(-1)), labels_target.view(-1))
    keypoint_loss = F.mse_loss(keypoints_pred, keypoints_target)
    
    total_loss = classification_loss + keypoint_loss
    return total_loss
    
def LaplaceNLLLoss(input, target, scale, eps=1e-06, reduction='mean'):
    loss = torch.log(2*scale) + torch.abs(input - target)/scale

    # Inputs and targets much have same shape
    input = input.view(input.size(0), -1)
    target = target.view(target.size(0), -1)
    if input.size() != target.size():
        raise ValueError("input and target must have same size")

    # Second dim of scale must match that of input or be equal to 1
    scale = scale.view(input.size(0), -1)
    if scale.size(1) != input.size(1) and scale.size(1) != 1:
        raise ValueError("scale is of incorrect size")

    # Check validity of reduction mode
    if reduction != 'none' and reduction != 'mean' and reduction != 'sum':
        raise ValueError(reduction + " is not valid")

    # Entries of var must be non-negative
    if torch.any(scale < 0):
        raise ValueError("scale has negative entry/entries")

    # Clamp for stability
    scale = scale.clone()
    with torch.no_grad():
        scale.clamp_(min=eps)

    # Calculate loss (without constant)
    loss = (torch.log(2*scale) + torch.abs(input - target) / scale).view(input.size(0), -1).sum(dim=1)


    # Apply reduction
    if reduction == 'mean':
        return loss.mean()
    elif reduction == 'sum':
        return loss.sum()
    else:
        return loss
    

In [14]:
def train_model(model, dataloader, optimizer, num_epochs=50):
    model.train()
    loss_cls = []
    loss_kpt = []
    for epoch in range(num_epochs):
        epoch_loss = 0
        for images, keypoints, labels in dataloader:
            images, keypoints, labels = torch.stack(images).to(device), torch.stack(keypoints).to(device), torch.stack(labels).to(device)
            optimizer.zero_grad()
            class_outputs, keypoint_outputs = model(images)
            loss_cls = criterion_cls(class_outputs, labels)
            loss_pts = criterion_pts(keypoint_outputs, keypoints)
            loss = loss_pts
            loss.backward()
            optimizer.step()
            train_epoch_loss += loss_cls.item()
        train_epoch_loss /= len(dataloader.dataset)
        loss_cls.append(epoch_loss)
        loss_kpt.append(epoch_loss)
        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')
    return model, loss_history

In [16]:
num_classes = len(load_class_info(class_config_path))
model = SSDKeypointDetector(num_classes, num_keypoints=1)
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model.to(device) 

optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
criterion_cls = nn.CrossEntropyLoss()
criterion_pts = nn.MSELoss()



In [17]:
# model = ConditionalSSD(num_classes=num_classes, num_keypoints=num_keypoints, input_channels=4)
# model = model.to(device)
# criterion = loss_fn_ssd
# optimizer = optim.Adam(model.parameters(), lr=0.001)

# Train and validate the model
num_epochs = 100
model, loss_history = train_model(model, dataloader, optimizer, num_epochs)
torch.save(model.state_dict(), './ssd_normal.pth')

# train_losses, valid_losses = train_model(model, criterion, criterion_kpt, optimizer, train_loader, valid_loader, num_epochs=80)
# trained_model, train_loss, key_train_loss, val_loss, key_val_loss = train_model(model, criterion_cls, criterion_kpt, optimizer, train_loader, valid_loader, num_epochs=50)
# train_model(model, criterion, optimizer, num_epochs)
# torch.save(model.state_dict(), '../models/ssd/ssd_trained_1.pth')

RuntimeError: Given groups=1, weight of size [2, 1280, 3, 3], expected input[32, 96, 14, 14] to have 1280 channels, but got 96 channels instead

In [None]:
# Plot training and validation loss
plot_loss(train_losses, val_losses)

# Inference and visualization
model.eval()
inputs, keypoints = next(iter(val_loader))
inputs = inputs.to(device)
outputs = model(inputs)

for i in range(inputs.size(0)):
    visualize_keypoints(inputs[i][:3], keypoints[i])  # Use only RGB channels for visualization
    plot_keypoints(inputs[i][:3], keypoints[i], outputs[i].cpu().detach().numpy())