In [1]:
from google.colab import drive
drive.mount('/content/gdrive')

Mounted at /content/gdrive


In [2]:
import os
import xml.etree.ElementTree as ET
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import torch
from os.path import join
from torchvision.io import read_image
import numpy as np
import cv2
from torchvision import transforms
import torch.nn as nn
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

In [None]:
!nvidia-smi

Tue Apr  4 11:55:35 2023       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 525.85.12    Driver Version: 525.85.12    CUDA Version: 12.0     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  NVIDIA A100-SXM...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   31C    P0    43W / 400W |      0MiB / 40960MiB |      0%      Default |
|                               |                      |             Disabled |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [None]:
torch.cuda.is_available()

True

In [3]:
class BlinkingDataset(Dataset):

    def __init__(self, dataset, split_path='train', clip_len = 3, transform=None, target_transform=None):
        self.dataset = dataset
        self.root_dir = '/content/gdrive/MyDrive/dataset'
        if not self.check_integrity():
            raise RuntimeError('Dataset not found or corrupted.' +
                               ' Please check the dataset location.')
        
        self.split_path = split_path
        self.split_dir = os.path.join(self.root_dir, self.split_path)
        self.clip_len = clip_len

        self.target_transform = target_transform
        self.transform = transform

        

        self.class_names = ['Fixed', 'Blinking']
        self.class_dict = {class_name: i for i, class_name in enumerate(self.class_names)}

        image_dir = 'images'
        annotation_dir = 'Annotations'
        
        # if preprocessing:
        #     print('Preprocessing of {} dataset, this will take long, but it will be done only once.'.format(dataset))
            # self.preprocess()

        self.img_batch_path = []
        self.ann_batch_path = []
        img_list = []
        for sub_dir in os.listdir(self.split_dir): 
            sub_path = join(self.split_dir, sub_dir)

            annotation_path = join(sub_path, annotation_dir)
            image_path = join(sub_path, image_dir)

            imgfiles = [f for f in os.listdir(image_path) if os.path.isfile(join(image_path, f))]
            annfiles = [f for f in os.listdir(annotation_path) if os.path.isfile(join(annotation_path, f))]
            
            
            for i in range(0, len(imgfiles) - self.clip_len + 1):
                batch = imgfiles[i : i+self.clip_len]
                batch_path = [os.path.join(image_path, file_name) for file_name in batch if os.path.isfile(os.path.join(image_path, file_name))]
                self.img_batch_path.append(batch_path)
            
                
            for i in range(0, len(annfiles) - self.clip_len + 1, 1):
                batch = annfiles[i : i+self.clip_len]
                batch_path = [os.path.join(annotation_path, file_name) for file_name in batch if os.path.isfile(os.path.join(annotation_path, file_name))]
                self.ann_batch_path.append(batch_path)    

    def __len__(self):
        return len(self.img_batch_path)
        
    def __getitem__(self, index):
        img_paths = self.img_batch_path[index]
        ann_paths = self.ann_batch_path[index]
        
        images = torch.Tensor()
        bboxes = []
        labels = []
        imgs_list = []
        for x in range(len(img_paths)):
            img = Image.open(img_paths[x]).convert('RGB')
            # img = img.resize((224,224))
            img = transforms.ToTensor()(img)

            imgs_list.append(img)

        images = torch.stack(imgs_list, dim = 1).to(device)

        for f in range(len(ann_paths)):
            label_per_img = []
            bbox_per_img = []
            tree = ET.parse(ann_paths[f])
            root = tree.getroot()
            size = root.find('size')
            width = float(size.find('width').text)
            height = float(size.find('height').text)
            depth = float(size.find('depth').text)

            for obj in root.findall('object'):
                name = obj.find('name').text
                if name not in self.class_names:
                    continue
                for bounding_box in obj.iter('bndbox'):
                    xmin = float(bounding_box.find('xmin').text)
                    ymin = float(bounding_box.find('ymin').text)
                    xmax = float(bounding_box.find('xmax').text)
                    ymax = float(bounding_box.find('ymax').text)
                    
                    # xmin = xmin / width
                    # ymin = ymin / height
                    # xmax = xmax / width
                    # ymax = ymax / height
                    box = [xmin, ymin, xmax, ymax]

                    bbox_per_img.append(box)
                    label_per_img.append(self.class_dict[name])

            bboxes.append(torch.Tensor(bbox_per_img))
            labels.append(torch.LongTensor(label_per_img))
            
        return {
            'images': images,
            'bboxes': bboxes,
            'labels': labels
        }
    
    def check_integrity(self):
        if not os.path.exists(self.root_dir):
            return False
        else:
            return True

In [4]:
import torch
import torch.nn as nn
import torchvision.models as models
import torch.nn.functional as F


def generate_anchors(scales, aspect_ratios):
    anchors = []
    for scale in scales:
        for aspect_ratio in aspect_ratios:
            h = scale / torch.sqrt(torch.tensor(aspect_ratio))
            w = scale * torch.sqrt(torch.tensor(aspect_ratio))
            anchors.append([-w / 2, -h / 2, w / 2, h / 2])
    return torch.tensor(anchors, dtype=torch.float32)



class FPN(nn.Module):
    def __init__(self, C3_size, C4_size, C5_size, feature_size=256):
        super(FPN, self).__init__()

        self.P5_1 = nn.Conv2d(C5_size, feature_size, kernel_size=1, stride=1, padding=0)
        self.P5_2 = nn.Conv2d(feature_size, feature_size, kernel_size=3, stride=1, padding=1)

        self.P4_1 = nn.Conv2d(C4_size, feature_size, kernel_size=1, stride=1, padding=0)
        self.P4_2 = nn.Conv2d(feature_size, feature_size, kernel_size=3, stride=1, padding=1)

        self.P3_1 = nn.Conv2d(C3_size, feature_size, kernel_size=1, stride=1, padding=0)
        self.P3_2 = nn.Conv2d(feature_size, feature_size, kernel_size=3, stride=1, padding=1)

    def forward(self, x):
        C3, C4, C5 = x

        P5 = self.P5_1(C5)
        P5_upsampled = F.interpolate(P5, scale_factor=2, mode='bilinear')
        
        # Match dimensions by cropping or padding
        diff_h = P5_upsampled.size(2) - C4.size(2)
        diff_w = P5_upsampled.size(3) - C4.size(3)

        if diff_h > 0:
            P5_upsampled = P5_upsampled[:, :, :-diff_h, :]
        elif diff_h < 0:
            P5_upsampled = F.pad(P5_upsampled, (0, 0, 0, -diff_h))

        if diff_w > 0:
            P5_upsampled = P5_upsampled[:, :, :, :-diff_w]
        elif diff_w < 0:
            P5_upsampled = F.pad(P5_upsampled, (0, -diff_w, 0, 0))

        P4 = self.P4_1(C4) + P5_upsampled
        
        P4_upsampled = F.interpolate(P4, scale_factor=2)
        if P4_upsampled.shape[-1] > self.P3_1(C3).shape[-1]:
            P4_upsampled = P4_upsampled[..., :-1]
        P3 = self.P3_1(C3) + P4_upsampled

        P3 = self.P3_2(P3)
        P4 = self.P4_2(P4)
        P5 = self.P5_2(P5)

        return [P3, P4, P5]


In [5]:
import torch
import torch.nn as nn
import torchvision.models as models
from torchvision.ops import box_iou


class RetinaNet(nn.Module):
    def __init__(self, num_classes, scales, aspect_ratios, attention_size=256, lstm_hidden_size=512, lstm_num_layers=2):
        super(RetinaNet, self).__init__()
        
        # Load a pretrained resnet50 model
        resnet = models.resnet50(pretrained=True)
        
        self.C3 = nn.Sequential(*list(resnet.children())[:6])
        self.C4 = list(resnet.children())[6]
        self.C5 = list(resnet.children())[7]

        # Create the FPN module
        self.fpn = FPN(512, 1024, 2048)

        # LSTM and Attention Mechanism
        self.lstm = nn.LSTM(256 * 3, lstm_hidden_size, lstm_num_layers, batch_first=True)
        self.attention = nn.MultiheadAttention(lstm_hidden_size, num_heads=8)
        
        # Classification and Regression heads
        num_anchors = len(scales) * len(aspect_ratios)
        self.classification_head = nn.Linear(lstm_hidden_size, num_classes * num_anchors)
        self.regression_head = nn.Linear(lstm_hidden_size, 4 * num_anchors)

        # Anchor generation
        self.anchors = generate_anchors(scales, aspect_ratios)

    def forward(self, x):
        B, T, C, H, W = x.size()
        x = x.view(B * T, C, H, W)
        
        C3 = self.C3(x)
        C4 = self.C4(C3)
        C5 = self.C5(C4)
        
        features = self.fpn([C3, C4, C5])

        # Merge the FPN feature maps along the channel axis
        merged_features = torch.cat(features, dim=1)

        # Reshape back to (B, T, C, H, W)
        merged_features = merged_features.view(B, T, -1, merged_features.shape[2], merged_features.shape[3])

        # Global Average Pooling
        gap = nn.AdaptiveAvgPool2d(1)(merged_features)
        gap = gap.view(B, T, -1)

        # LSTM
        lstm_out, _ = self.lstm(gap)

        # Attention Mechanism
        attn_out, _ = self.attention(lstm_out, lstm_out, lstm_out)

        # Classification and Regression heads
        classification = self.classification_head(attn_out[:, -1, :]).view(B, -1, num_classes)
        regression = self.regression_head(attn_out[:, -1, :]).view(B, -1, 4)

        return classification, regression


In [6]:
import torch.optim as optim
from torch.utils.data import DataLoader
torch.cuda.empty_cache()

# Loss functions
def focal_loss(classification, targets, alpha=0.25, gamma=2):
    classification_loss = torch.nn.CrossEntropyLoss(reduction="none")(classification, targets)
    p_t = torch.exp(-classification_loss)
    focal_loss = alpha * (1 - p_t)**gamma * classification_loss
    return focal_loss.mean()

def smooth_l1_loss(regression, targets, sigma=1.0):
    regression_diff = regression - targets
    regression_loss = torch.where(torch.abs(regression_diff) < 1 / sigma**2,
                                  0.5 * sigma**2 * regression_diff**2,
                                  torch.abs(regression_diff) - 0.5 / sigma**2)
    return regression_loss.mean()

# Instantiate the RetinaNet model
num_classes = 2
scales = [32, 64, 128]
aspect_ratios = [0.5, 1, 2]

model = RetinaNet(num_classes, scales, aspect_ratios)

# Create the DataLoader
train_dataset = BlinkingDataset(dataset = 'FRSign', split_path='train')
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)


# Loss functions
classification_criterion = nn.CrossEntropyLoss()
regression_criterion = nn.SmoothL1Loss()

# Optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# # Optimizer
# optimizer = optim.Adam(model.parameters(), lr=learning_rate)

# Training loop
num_epochs = 50

model.to(device)



Downloading: "https://download.pytorch.org/models/resnet50-0676ba61.pth" to /root/.cache/torch/hub/checkpoints/resnet50-0676ba61.pth
100%|██████████| 97.8M/97.8M [00:00<00:00, 206MB/s]


RetinaNet(
  (C3): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (4): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): Sequential(
          (0): Conv2d(64, 256

In [7]:
for epoch in range(num_epochs):
    model.train()

    running_loss = 0.0
    for batch_idx, batch in enumerate(train_loader):
        images = batch['images']
        gt_bboxes = batch['bboxes']
        gt_labels = batch['labels']

        # Forward pass
        pred_classification, pred_regression = model(images)

        # Calculate loss
        classification_loss, regression_loss = 0, 0
        for b in range(images.size(0)):
            # Calculate classification loss
            classification_loss += classification_criterion(pred_classification[b], gt_labels[b])

            # Calculate regression loss
            regression_loss += regression_criterion(pred_regression[b], gt_bboxes[b])

        loss = classification_loss + regression_loss

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    # Print epoch results
    print(f"Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss / (batch_idx + 1)}")


RuntimeError: ignored