## Imports

In [1]:
import torch
print(torch.__version__)
print(torch.cuda.is_available())
print(torch.version.cuda)

from transformers import AutoImageProcessor, Swinv2Model
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

2.5.1
True
11.8


  from .autonotebook import tqdm as notebook_tqdm


## Load CBD dataset (for training)

### Transformation
 - resize to 244,244
 - normalized based on calculations from training set
 - converted to tensors

In [2]:
import sys
sys.path.append('../dataset')
from dataset_parser import XMLDataset
import numpy as np
import os
from PIL import Image


In [3]:
#calculation mean and std of training set for normalization

def compute_dataset_stats(image_dir):
    mean = np.zeros(3)
    std = np.zeros(3)
    total_pixels = 0

    # List all files in the directory
    image_filenames = sorted(os.listdir(image_dir))

    for img_filename in image_filenames:
        img_path = os.path.join(image_dir, img_filename)

        # Open the image
        image = Image.open(img_path).convert("RGB")
        image = np.array(image) / 255.0  # Convert to range [0, 1]

        # Compute mean and std
        mean += image.mean(axis=(0, 1))
        std += image.std(axis=(0, 1))
        total_pixels += 1

    # Compute average mean and std across all images
    mean /= total_pixels
    std /= total_pixels
    return mean.tolist(), std.tolist()

# Example usage
image_dir = '../dataset/train_img'
mean, std = compute_dataset_stats(image_dir)
print("Mean:", mean)
print("Std:", std)


Mean: [0.9362358181861065, 0.9420508144749817, 0.9394349808120092]
Std: [0.16755679634617304, 0.15510376581525342, 0.1576581799256157]


In [4]:
from torchvision.transforms import v2

#apply transformations to image and bounding boxes

transform = v2.Compose([
    #scales and converst to float32 tensors
    v2.ToTensor()
    ])





In [5]:
label_map = {
    "text" : 1,
    "arrow" : 2,
    "connection" : 3,
    "data": 4,
    "decision": 5,
    "process" : 6,
    "terminator" : 7
}

train = XMLDataset(image_dir='../dataset/train_img', annotation_dir='../dataset/xml_files/train', label_map=label_map, transform=transform)
test = XMLDataset(image_dir='../dataset/test_img', annotation_dir='../dataset/xml_files/test', label_map=label_map, transform=transform)
validation = XMLDataset(image_dir='../dataset/val_img', annotation_dir='../dataset/xml_files/val', label_map=label_map, transform=transform)
data = train[0]
print(data)

(tensor([[[0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490],
         [0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490],
         [0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490],
         ...,
         [0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490],
         [0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490],
         [0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490]],

        [[0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490],
         [0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490],
         [0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490],
         ...,
         [0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490],
         [0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490],
         [0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490]],

        [[0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490],
         [0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0.9490],
         [0.9490, 0.9490, 0.9490,  ..., 0.9490, 0.9490, 0

  target['boxes'] = torch.tensor(target['boxes'], dtype=torch.float32)
  target['image_id'] = torch.tensor(target['labels'], dtype=torch.int64)
  target['area'] = torch.tensor(target['labels'], dtype=torch.int64)


In [6]:
# assert data['objects']['bbox'][0].tolist() == [201., 31., 211., 92.], "First box coordinates do not match"
(image, target) = data
# assert target['labels'][:5] == [7, 6, 6, 6, 5], "First five labels do not match"
assert len(target['area']) == len(target['labels']), "Length of area and category do not match"
assert len(target['boxes']) == len(target['area']), "Length of bbox and area do not match"
# print("new image size", image.shape)



print("Assertions passed successfully!")

Assertions passed successfully!


## Load FCA dataset (for testing)

## Load Swin Transformer

In [7]:
from transformers import AutoConfig, Swinv2Backbone, AutoModel, Swinv2Config, AutoBackbone
import torch.nn as nn
from collections import OrderedDict

# config = Swinv2Config()
# config.out_features = ["stage2", "stage3", "stage4"]
# swin_backbone = AutoModel.from_pretrained("microsoft/swinv2-tiny-patch4-window8-256")
swin_backbone = AutoBackbone.from_pretrained("microsoft/swinv2-tiny-patch4-window8-256", out_features=["stage1", "stage2", "stage3", "stage4"])

# print(config)


class CustomSwinBackbone(nn.Module):
    def __init__(self, backbone):
        super().__init__()
        self.backbone = backbone

    def forward(self, x):
        # Forward pass through the backbone
        out = self.backbone(x, output_hidden_states=False, output_attentions=False)
        feature_map = out.feature_maps
        # print(len(out.feature_maps))
        out_channel = 768
        feature_dict = {}
        
        for i in range(len(feature_map)):
            in_channel = list(out.feature_maps[i].shape)[1]
            conv = nn.Conv2d(in_channels=in_channel, out_channels=out_channel, kernel_size=1)
            feature_dict[str(i)] = conv(feature_map[i])
            
        # print(feature_dict["0"].shape)
        # print(feature_dict["1"].shape)
        # print(feature_dict["2"].shape)
        # print(feature_dict["3"].shape)

        # Permute the output to (b, c, h, w)
        # out[0] corresponds to the feature map (assuming the backbone outputs a list of feature maps)
        # out[0].shape is (b, h, w, c), and we need to permute it to (b, c, h, w)
        return feature_dict
# Create the custom backbone with the permute operation
swin_backbone = CustomSwinBackbone(swin_backbone)
print(swin_backbone)

CustomSwinBackbone(
  (backbone): Swinv2Backbone(
    (embeddings): Swinv2Embeddings(
      (patch_embeddings): Swinv2PatchEmbeddings(
        (projection): Conv2d(3, 96, kernel_size=(4, 4), stride=(4, 4))
      )
      (norm): LayerNorm((96,), eps=1e-05, elementwise_affine=True)
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (encoder): Swinv2Encoder(
      (layers): ModuleList(
        (0): Swinv2Stage(
          (blocks): ModuleList(
            (0): Swinv2Layer(
              (attention): Swinv2Attention(
                (self): Swinv2SelfAttention(
                  (continuous_position_bias_mlp): Sequential(
                    (0): Linear(in_features=2, out_features=512, bias=True)
                    (1): ReLU(inplace=True)
                    (2): Linear(in_features=512, out_features=3, bias=False)
                  )
                  (query): Linear(in_features=96, out_features=96, bias=True)
                  (key): Linear(in_features=96, out_features=96, bias=Fals

In [8]:

from torchvision.models.detection import FasterRCNN
from torchvision.models.detection.rpn import AnchorGenerator
import torchvision

swin_backbone.out_channels = 768
anchor_generator = AnchorGenerator(
    sizes=((32, 64, 128, 256, 512),
           (32, 64, 128, 256, 512),
           (32, 64, 128, 256, 512),
           (32, 64, 128, 256, 512)), 
    aspect_ratios=((0.5, 1.0, 2.0),
                   (0.5, 1.0, 2.0),
                   (0.5, 1.0, 2.0),
                   (0.5, 1.0, 2.0)))
roi_pooler = torchvision.ops.MultiScaleRoIAlign(
    featmap_names=["1", "2", "3"], 
    output_size = 7, 
    sampling_ratio=2
    )

model = FasterRCNN(
    backbone=swin_backbone,
    #7 + 1 for background
    num_classes=8,
    # min_size=256,
    max_size=256,
    # image_mean=mean,
    # image_std=std,
    rpn_anchor_generator=anchor_generator,
    box_roi_pool=roi_pooler)


In [9]:
print(mean, std)

[0.9362358181861065, 0.9420508144749817, 0.9394349808120092] [0.16755679634617304, 0.15510376581525342, 0.1576581799256157]


In [None]:
from torch.utils.data import DataLoader, Subset
from torch.utils.data.dataloader import default_collate

# train on the GPU or on the CPU, if a GPU is not available
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

def collate_fn(batch):
    # collated = []
    # for item in batch:
    #     images = item[0]
    #     labels = item[1]
    #     images = images.to(device)
    #     for key, value in labels.items():
    #         if torch.is_tensor(value):
    #             labels[key] = value.to(device) 

    #     collated.append((images, labels))
    
    return tuple(zip(*batch))
            
# train_subset = Subset(train, list(range(8)))
# val_subset = Subset(validation, list(range(8)))

# define training and validation data loaders
data_loader = DataLoader(
    train,
    batch_size=4,
    shuffle=True,
    collate_fn=collate_fn
)

data_loader_val = DataLoader(
    validation,
    batch_size=4,
    shuffle=False,
    collate_fn=collate_fn
)


In [21]:
params = [p for p in model.parameters() if p.requires_grad]

optimizer = torch.optim.SGD(params, lr=0.005, momentum=0.9, weight_decay=0.0005)

# and a learning rate scheduler
lr_scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=3,
    gamma=0.1
)


In [22]:
#edited from pytorch documentation
def train_one_epoch(model, optimizer, data_loader,  epoch, lr_scheduler):
    try: 
        model.train()
        header = f"Epoch: [{epoch}]"
        batch_loss = 0
        last_loss = 0
        
        for i, data in enumerate(data_loader):
            images, labels = data
            
            loss_dict = model(images, labels)
            # print(loss_dict)
            losses = sum(loss for loss in loss_dict.values())


            optimizer.zero_grad()
            losses.backward()
            optimizer.step()

            lr_scheduler.step()
            
            # Gather data and report
            batch_loss += losses.item()
            
            last_loss = batch_loss
            print(header)
            print('  batch {} loss: {}'.format(i + 1, last_loss))
            batch_loss = 0
            
    except Exception as e:
        # Print the error and the image_id that caused it
        # print(f"Error for image name {labels['image_id']}")
        # print(data)
        print(f"Error: {str(e)}")
        # You can return None or raise the error depending on your need
        raise e
        
    return last_loss



In [23]:
from typing import Tuple, List, Dict, Optional
import torch
from torch import Tensor
from collections import OrderedDict
from torchvision.models.detection.roi_heads import fastrcnn_loss
from torchvision.models.detection.rpn import concat_box_prediction_layers

def eval_forward(model, images, targets):
    # type: (List[Tensor], Optional[List[Dict[str, Tensor]]]) -> Tuple[Dict[str, Tensor], List[Dict[str, Tensor]]]
    """
    Args:
        images (list[Tensor]): images to be processed
        targets (list[Dict[str, Tensor]]): ground-truth boxes present in the image (optional)
    Returns:
        result (list[BoxList] or dict[Tensor]): the output from the model.
            It returns list[BoxList] contains additional fields
            like `scores`, `labels` and `mask` (for Mask R-CNN models).
    """
    model.eval()

    original_image_sizes: List[Tuple[int, int]] = []
    for img in images:
        val = img.shape[-2:]
        assert len(val) == 2
        original_image_sizes.append((val[0], val[1]))

    images, targets = model.transform(images, targets)

    # Check for degenerate boxes
    # TODO: Move this to a function
    if targets is not None:
        for target_idx, target in enumerate(targets):
            boxes = target["boxes"]
            degenerate_boxes = boxes[:, 2:] <= boxes[:, :2]
            if degenerate_boxes.any():
                # print the first degenerate box
                bb_idx = torch.where(degenerate_boxes.any(dim=1))[0][0]
                degen_bb: List[float] = boxes[bb_idx].tolist()
                raise ValueError(
                    "All bounding boxes should have positive height and width."
                    f" Found invalid box {degen_bb} for target at index {target_idx}."
                )

    features = model.backbone(images.tensors)
    if isinstance(features, torch.Tensor):
        features = OrderedDict([("0", features)])
    model.rpn.training=True
    #model.roi_heads.training=True


    #####proposals, proposal_losses = model.rpn(images, features, targets)
    features_rpn = list(features.values())
    objectness, pred_bbox_deltas = model.rpn.head(features_rpn)
    anchors = model.rpn.anchor_generator(images, features_rpn)

    num_images = len(anchors)
    num_anchors_per_level_shape_tensors = [o[0].shape for o in objectness]
    num_anchors_per_level = [s[0] * s[1] * s[2] for s in num_anchors_per_level_shape_tensors]
    objectness, pred_bbox_deltas = concat_box_prediction_layers(objectness, pred_bbox_deltas)
    # apply pred_bbox_deltas to anchors to obtain the decoded proposals
    # note that we detach the deltas because Faster R-CNN do not backprop through
    # the proposals
    proposals = model.rpn.box_coder.decode(pred_bbox_deltas.detach(), anchors)
    proposals = proposals.view(num_images, -1, 4)
    proposals, scores = model.rpn.filter_proposals(proposals, objectness, images.image_sizes, num_anchors_per_level)

    proposal_losses = {}
    assert targets is not None
    labels, matched_gt_boxes = model.rpn.assign_targets_to_anchors(anchors, targets)
    regression_targets = model.rpn.box_coder.encode(matched_gt_boxes, anchors)
    loss_objectness, loss_rpn_box_reg = model.rpn.compute_loss(
        objectness, pred_bbox_deltas, labels, regression_targets
    )
    proposal_losses = {
        "loss_objectness": loss_objectness,
        "loss_rpn_box_reg": loss_rpn_box_reg,
    }

    #####detections, detector_losses = model.roi_heads(features, proposals, images.image_sizes, targets)
    image_shapes = images.image_sizes
    proposals, matched_idxs, labels, regression_targets = model.roi_heads.select_training_samples(proposals, targets)
    box_features = model.roi_heads.box_roi_pool(features, proposals, image_shapes)
    box_features = model.roi_heads.box_head(box_features)
    class_logits, box_regression = model.roi_heads.box_predictor(box_features)

    result: List[Dict[str, torch.Tensor]] = []
    detector_losses = {}
    loss_classifier, loss_box_reg = fastrcnn_loss(class_logits, box_regression, labels, regression_targets)
    detector_losses = {"loss_classifier": loss_classifier, "loss_box_reg": loss_box_reg}
    boxes, scores, labels = model.roi_heads.postprocess_detections(class_logits, box_regression, proposals, image_shapes)
    num_images = len(boxes)
    for i in range(num_images):
        result.append(
            {
                "boxes": boxes[i],
                "labels": labels[i],
                "scores": scores[i],
            }
        )
    detections = result
    detections = model.transform.postprocess(detections, images.image_sizes, original_image_sizes)  # type: ignore[operator]
    model.rpn.training=False
    model.roi_heads.training=False
    losses = {}
    losses.update(detector_losses)
    losses.update(proposal_losses)
    return losses, detections

In [26]:
# Initializing in a separate cell so we can easily add more epochs to the same run
import datetime
import torchvision
from torchmetrics.detection.mean_ap import MeanAveragePrecision

timestamp = datetime.datetime.now().strftime("%Y%m%d_%H%M%S")
epoch_number = 0

EPOCHS = 1

best_vloss = 1_000_000.0

# move model to cuda
# model.to(device)
# model.backbone.to(device) 

for epoch in range(EPOCHS):
    print("EPOCH {}:".format(epoch_number + 1))

    # Make sure gradient tracking is on, and do a pass over the data
    model.train()
    avg_loss = train_one_epoch(
        epoch=epoch_number,
        model=model,
        data_loader=data_loader,
        optimizer=optimizer,
        lr_scheduler=lr_scheduler,
    )

    running_vloss = 0.0
    # Set the model to evaluation mode, disabling dropout and using population
    # statistics for batch normalization.
    model.eval()
    total_map = 0
    total_map_per_class = torch.zeros(7)

    # Disable gradient computation and reduce memory consumption.
    with torch.no_grad():
        total_predictions = 0
        correct_predictions = 0

        for i, vdata in enumerate(data_loader_val):
            vinputs, vlabels = vdata  # Images and their ground-truth labels

            # Compute the model outputs and loss
            loss_dict, detections = eval_forward(model, vinputs, vlabels)  # Pass inputs and targets
            # print(loss_dict)
            vloss = sum(loss for loss in loss_dict.values())
            running_vloss += vloss.item()

            metric = MeanAveragePrecision(iou_type="bbox", class_metrics=True)
            metric.update(detections, vlabels)
            map_dict = metric.compute()
            # print(map_dict)
            total_map += map_dict["map"]
            map_per_class = map_dict["map_per_class"]
            classes = map_dict["classes"]
            
            for index, value in zip(classes, map_per_class):
                total_map_per_class[index-1] += value
            

        avg_vloss = running_vloss / (i + 1)
        avg_map = total_map/ (i+1)
        avg_map_per_class = total_map_per_class / (i+1)
        
        print("////////////////////////////////////////////////////////////////////////////////")
        print(f"LOSS train {avg_loss} valid {avg_vloss}")
        print(f"MAP: {avg_map}")
        print(f"map per class: {avg_map_per_class}")
        

    # Track best performance, and save the model's state
    if avg_vloss < best_vloss:
        best_vloss = avg_vloss
        model_path = "model_{}_{}".format(timestamp, epoch_number)
        torch.save(model.state_dict(), model_path)

    epoch_number += 1

EPOCH 1:


  target['boxes'] = torch.tensor(target['boxes'], dtype=torch.float32)
  target['image_id'] = torch.tensor(target['labels'], dtype=torch.int64)
  target['area'] = torch.tensor(target['labels'], dtype=torch.int64)


Epoch: [0]
  batch 1 loss: 3.3591132164001465
Epoch: [0]
  batch 2 loss: 2.997565269470215
////////////////////////////////////////////////////////////////////////////////
LOSS train 2.997565269470215 valid 3.1706771850585938
MAP: 0.0007456097519025207
map per class: tensor([ 0.0000,  0.0000, -0.5000,  0.0045,  0.0000,  0.0000,  0.0000])
