In [2]:
# Download dependencies
import torch
from torch import nn
import torchvision
from torch.utils.data import DataLoader, random_split, Dataset
from torchvision import datasets, transforms
from torchvision.datasets.folder import has_file_allowed_extension, default_loader
from torchinfo import summary
import timm

import effdet

import sys
from pathlib import Path
import os

sys.path.append("../")

import random
from tqdm.auto import tqdm
import matplotlib.pyplot as plt
from PIL import Image
import json
import xmltodict
import numpy as np
from pprint import pprint
import pandas as pd

from src.common import tools
from src.common.tools import get_part_cat
from src.classification.model import model

from typing import Any, Callable, cast, Dict, List, Optional, Tuple, Union

print(f"PyTorch version: {torch.__version__}")

PyTorch version: 2.5.0+cu118


In [3]:
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

# Transforming data

In [4]:
# Set path to data
data_path = Path("../data/obj_detection")
obj_detection_image_path = data_path / "b200-lego-detection-dataset"

annot_dir = obj_detection_image_path / "annotations"
image_dir = obj_detection_image_path / "images"

"""
classification_image_paths: List[Path] = []
for root, dirs, _ in os.walk(data_path):
    for dir_name in dirs:
        folder_path: str = os.path.join(root, dir_name)
        subfolder_contents: List[str] = os.listdir(folder_path)

        if all(
            os.path.isfile(os.path.join(folder_path, item))
            for item in subfolder_contents
        ):
            classification_image_paths.append(Path(root))
            break
"""

part_to_cat_path = Path("../src/data/parts.csv")

In [5]:
def read_file(path) -> str:
    with open(path, "r") as f:
        return f.read()

In [6]:
def transform_name(annotations, name_transform):
    for part in annotations:
        part["target"] = name_transform(part["name"])

    return annotations

In [7]:
from typing import Any, Dict, List


from numpy import dtype, ndarray


class LegoObjDetDataset(Dataset):
    def __init__(
        self, 
        image_dir: Union[str, Path], 
        annot_dir: Union[str, Path], 
        transform: Optional[Callable] = None, 
        target_transform: Optional[Callable] = lambda x: x, 
        loader: Callable[[str], Any] = default_loader
    ) -> None:
        
        super().__init__()

        annotations = self.get_annotations(annot_dir, extensions=".xml")
        classes, class_to_idx = self.find_classes(list(annotations.values()))
        samples = self.make_dataset(annotations=annotations, image_dir=image_dir, class_to_idx=class_to_idx, extensions=".png")

        self.image_dir = image_dir
        self.annot_dir = annot_dir
        self.transform = transform
        self.target_transform = target_transform

        self.loader = loader

        self.annotations = annotations

        self.classes = classes
        self.class_to_idx = class_to_idx

        self.samples = samples
        self.labels = [s[1]["labels"] for s in samples]
        self.bndboxes = [s[1]["bndboxes"] for s in samples]
        self.images = [s[0] for s in samples]

        if target_transform is not None:
            self.transformed_to_idx = {transformed: idx for idx, transformed in enumerate(set(target_transform({"labels": [class_to_idx[_class] for _class in classes]}, class_to_idx)["labels"]))}
    

    def get_annotations(
        self,
        annot_dir: Union[str, Path], 
        extensions: Optional[str] = None,
        is_valid_file: Optional[Callable[[str], bool]] = None, # type: ignore
    ) -> Dict[str, ndarray[Dict[str, Any], dtype[Any]]]:
        """Gets the image annotations from the annotation files.

        Args:
            annot_dir (Union[str, Path]): Path to directory that contains annotation files.
            extensions (Optional[str], optional): Valid file extensions for annotation files. Defaults to None.
            is_valid_file (Optional[Callable[[str], bool]], optional): Callable for checking is file is valid. Defaults to None.

        Raises:
            ValueError: Both extensions and is_valid_file cannot be None or not None at the same time.
            FileNotFoundError: Couldn't find any valid annotations files in directory.

        Returns:
            Dict[str, ndarray[Dict[str, Any], dtype[Any]]]: _description_
        """

        ann_dir = os.path.expanduser(annot_dir)

        both_none = extensions is None and is_valid_file is None
        both_something = extensions is not None and is_valid_file is not None
        if both_none or both_something:
            raise ValueError("Both extensions and is_valid_file cannot be None or not None at the same time.")

        if extensions is not None:

            def is_valid_file(x: str) -> bool:
                return has_file_allowed_extension(x, extensions)

        annotations: Dict[str, ndarray[Dict[str, Any], dtype[Any]]] = {}

        for root, _, fnames in sorted(os.walk(ann_dir)):
            for fname in sorted(fnames):
                path = os.path.join(root, fname)
                if is_valid_file(path): # type: ignore
                    objects: ndarray[Dict[str, Any], dtype[Any]] = np.squeeze(
                        list(xmltodict.parse(read_file(os.path.join(root, fname)))["annotations"]["object"])
                    )
                    file_name = Path(path).stem
                    annotations.update({file_name: objects})

        if not annotations:
            raise FileNotFoundError(f"Couldn't find any valid annotations files in directory: {ann_dir}.")

        return annotations


    @staticmethod
    def find_classes(annotations: List[ndarray[Dict[str, Any], dtype[Any]]]) -> Tuple[List[str], Dict[str, int]]:
        """Finds the target classes in image annotation files.

        Args:
            annotations (ndarray[ndarray[Dict[str, Any], dtype[Any]], dtype[Any]]): Dictionary that contains annotations for alle image files.

        Raises:
            FileNotFoundError: Couldn't find any classes in given annotations.

        Returns:
            Tuple[List[str], Dict[str, int]]: Tuple that contains list with classes and dictionary for converting classes to respective indexes.
        """

        classes: List[str] = sorted(set(target["name"] for file in annotations for target in file))

        if not classes:
            raise FileNotFoundError(f"Couldn't find any classes in given annotations.")

        class_to_idx: Dict[str, int] = {cls_name: i for i, cls_name in enumerate(classes)}

        return classes, class_to_idx


    def make_dataset(
        self,
        annotations: Dict[str, ndarray[Dict[str, Any], dtype[Any]]],
        image_dir: Union[str, Path],
        class_to_idx: Optional[Dict[str, int]] = None,
        extensions: Optional[str] = None,
        is_valid_file: Optional[Callable[[str], bool]] = None, # type: ignore
    ) -> List[Tuple[str, Dict[str, Union[List[int], List[List[int]]]]]]:
        """Makes a list with all images and the corresponding targets and bounding boxes.

        Args:
            annotations (Dict[str, ndarray[Dict[str, Any], dtype[Any]]]): Dictionary that contains annotations for alle image files.
            image_dir (Union[str, Path]): Path to directory that contain image files.
            class_to_idx (Optional[Dict[str, int]], optional): Dictionary for converting classes to respective index. Defaults to None.
            extensions (Optional[str], optional): Valid file name extensions for image files. Defaults to None.
            is_valid_file (Optional[Callable[[str], bool]], optional): Callable for checking if file is valid as image file. Defaults to None.

        Raises:
            ValueError: 'class_to_index' must have at least one entry to collect any samples.
            ValueError: Both extensions and is_valid_file cannot be None or not None at the same time.
            FileNotFoundError: Couldn't find any valid image files in directory.

        Returns:
            List[Tuple[str, List[int], List[List[int]]]]: List of samples that contains an image path and its annotations
        """

        img_dir = os.path.expanduser(image_dir)

        if class_to_idx is None:
            _, class_to_idx = self.find_classes(annotations)
        elif not class_to_idx:
            raise ValueError(
                "'class_to_index' must have at least one entry to collect any samples."
            )
        
        both_none = extensions is None and is_valid_file is None
        both_something = extensions is not None and is_valid_file is not None
        if both_none or both_something:
            raise ValueError("Both extensions and is_valid_file cannot be None or not None at the same time.")

        if extensions is not None:

            def is_valid_file(x: str) -> bool:
                return has_file_allowed_extension(x, extensions)
        
            
        instances = []
        for root, _, fnames in sorted(os.walk(img_dir)):
            for fname in sorted(fnames):
                path = os.path.join(root, fname)
                if is_valid_file(path): # type: ignore
                    target: ndarray[Dict[str, Any], dtype[Any]] = annotations[Path(fname).stem]
                    item: Tuple[str, Dict[str, Union[List[int], List[List[int]]]]] = path, {"labels": [class_to_idx[obj["name"]] for obj in target], "bndboxes": [[int(coor) for coor in obj["bndbox"].values()] for obj in target]}
                    instances.append(item)

        if not instances:
            raise FileNotFoundError(f"Couldn't find any valid image files in directory: {img_dir}.")
        
        return instances
    

    def __getitem__(self, index: int):

        class_to_idx = self.class_to_idx
        
        path, target = self.samples[index]
        sample = self.loader(path)

        if self.transform is not None:
            sample = self.transform(sample)
        if self.target_transform is not None:
            target = self.target_transform(target, class_to_idx)
            target["labels"] = [self.transformed_to_idx[label] for label in target["labels"]]

        return sample, target

    def __len__(self):
        return len(self.samples)

In [8]:
part_df = pd.read_csv(part_to_cat_path, sep=",")

part_nums = part_df["part_num"].to_numpy()
part_cat_ids = part_df["part_cat_id"].to_numpy()

num_to_cat = {num: cat for num, cat in zip(part_nums, part_cat_ids)}

In [9]:
def targ_trans(targets: Dict[str, Union[List[int], List[List[int]]]], class_to_idx):
    targets_copy = targets.copy()
    idx_to_class = {idx: _class for _class, idx in class_to_idx.items()}
    targets_copy["labels"] = [get_part_cat(part_id=idx_to_class[label], id_to_cat=num_to_cat) for label in targets_copy["labels"]]
    return targets_copy

In [10]:
image_transform = transforms.Compose([transforms.ToTensor()])

In [11]:
objdet_dataset = LegoObjDetDataset(image_dir=image_dir, annot_dir=annot_dir, transform=image_transform, target_transform=targ_trans)

In [12]:
sample, target = objdet_dataset[0]
pprint(f"Sample size: {sample.size()}, \nTarget: {target}")

('Sample size: torch.Size([3, 2048, 2048]), \n'
 "Target: {'labels': [4, 4, 18, 18, 1, 1, 7, 7, 23, 23, 4, 4, 4, 4, 19, 19, 4, "
 '4, 11, 11, 25, 25, 19, 19, 4, 4, 6, 6, 15, 15, 26, 26, 8, 8, 4, 4, 8, 8, 23, '
 '23, 6, 6, 4, 4, 4, 4, 8, 8, 0, 0, 1, 1, 5, 5, 8, 8, 7, 7, 19, 19, 25, 25, '
 '11, 11, 8, 8, 4, 4, 7, 7, 21, 21, 5, 5, 5, 5, 17, 17, 25, 25, 4, 4, 4, 4, '
 '13, 13, 21, 21, 11, 11, 1, 1, 23, 23, 25, 25, 0, 0, 1, 1, 5, 5, 5, 5, 5, 5, '
 '5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 1, 1, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, 7, '
 '7, 7, 7, 7, 7, 0, 0, 18, 18, 0, 0, 0, 0, 14, 14, 1, 1, 12, 12, 5, 5, 11, 11, '
 '11, 11, 11, 11, 3, 3, 6, 6, 4, 4, 23, 23, 20, 20, 3, 3, 20, 20, 24, 24, 22, '
 '22, 6, 6, 22, 22, 22, 22, 5, 5, 22, 22, 22, 22, 22, 22, 22, 22, 17, 17, 1, '
 '1, 17, 17, 4, 4, 4, 4, 7, 7, 4, 4, 5, 5, 7, 7, 0, 0, 0, 0, 7, 7, 23, 23, 3, '
 '3, 3, 3, 20, 20, 7, 7, 24, 24, 23, 23, 7, 7, 7, 7, 10, 10, 12, 12, 7, 7, 13, '
 '13, 22, 22, 1, 1, 13, 13, 4, 4, 4, 4, 11, 11, 22, 22, 4, 4, 21, 21, 21, 

In [13]:
objdet_dataset.transformed_to_idx

{3: 0,
 5: 1,
 6: 2,
 8: 3,
 9: 4,
 11: 5,
 12: 6,
 14: 7,
 15: 8,
 16: 9,
 18: 10,
 19: 11,
 20: 12,
 21: 13,
 23: 14,
 26: 15,
 27: 16,
 28: 17,
 32: 18,
 37: 19,
 46: 20,
 49: 21,
 51: 22,
 53: 23,
 54: 24,
 67: 25,
 68: 26}

# Build model

In [14]:
#effnet_classifier, _ = create_efficientnet_b0(class_names=torch.zeros(37), device=device)
effnet_classifier = timm.create_model('efficientnet_b0', pretrained=True).to(device)

frozen_blocks = [0, 1, 2, 3]
"""
for idx in frozen_blocks:
    for param in effnet_classifier.blocks[idx].parameters():
        param.requires_grad = False
"""
input_shape = effnet_classifier.classifier.in_features
output_shape = 200

effnet_classifier.classifier = nn.Sequential(
        nn.Dropout(p=0.2, inplace=True),
        nn.Linear(
            in_features=input_shape,
            out_features=output_shape,
            bias=True,
        ),
    ).to(device)


summary(model=effnet_classifier, 
        input_size=(64, 3, 512, 512), # make sure this is "input_size", not "input_shape"
        # col_names=["input_size"], # uncomment for smaller output
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"]
)

Layer (type (var_name))                            Input Shape          Output Shape         Param #              Trainable
EfficientNet (EfficientNet)                        [64, 3, 512, 512]    [64, 200]            --                   True
├─Conv2d (conv_stem)                               [64, 3, 512, 512]    [64, 32, 256, 256]   864                  True
├─BatchNormAct2d (bn1)                             [64, 32, 256, 256]   [64, 32, 256, 256]   64                   True
│    └─Identity (drop)                             [64, 32, 256, 256]   [64, 32, 256, 256]   --                   --
│    └─SiLU (act)                                  [64, 32, 256, 256]   [64, 32, 256, 256]   --                   --
├─Sequential (blocks)                              [64, 32, 256, 256]   [64, 320, 16, 16]    --                   True
│    └─Sequential (0)                              [64, 32, 256, 256]   [64, 16, 256, 256]   --                   True
│    │    └─DepthwiseSeparableConv (0)         

In [15]:
effnet_classifier2 = torchvision.models.efficientnet_b0().to(device)

input_shape = 1280
output_shape = 200

effnet_classifier2.classifier = nn.Sequential(
        nn.Dropout(p=0.2, inplace=True),
        nn.Linear(
            in_features=input_shape,
            out_features=output_shape,
            bias=True,
        ),
    ).to(device)


summary(model=effnet_classifier2, 
        input_size=(64, 3, 224, 224), # make sure this is "input_size", not "input_shape"
        # col_names=["input_size"], # uncomment for smaller output
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"]
)

Layer (type (var_name))                                      Input Shape          Output Shape         Param #              Trainable
EfficientNet (EfficientNet)                                  [64, 3, 224, 224]    [64, 200]            --                   True
├─Sequential (features)                                      [64, 3, 224, 224]    [64, 1280, 7, 7]     --                   True
│    └─Conv2dNormActivation (0)                              [64, 3, 224, 224]    [64, 32, 112, 112]   --                   True
│    │    └─Conv2d (0)                                       [64, 3, 224, 224]    [64, 32, 112, 112]   864                  True
│    │    └─BatchNorm2d (1)                                  [64, 32, 112, 112]   [64, 32, 112, 112]   64                   True
│    │    └─SiLU (2)                                         [64, 32, 112, 112]   [64, 32, 112, 112]   --                   --
│    └─Sequential (1)                                        [64, 32, 112, 112]   [64, 16, 112

In [31]:
effnet_sorter, weights = model.create_efficientnet_b0(class_names=np.zeros(37), device=device)
image_transform = weights.transforms()
effnet_sorter.load_state_dict(torch.load(f="../models/classification/batch_1/efficientnet_b0_lego_sorter.pt"))

for param in effnet_sorter.features.parameters():
    param.requires_grad = False

  effnet_sorter.load_state_dict(torch.load(f="../models/classification/batch_1/efficientnet_b0_lego_sorter.pt"))


In [36]:
effdet_model = effdet.create_model("tf_efficientdet_d0", pretrained_backbone=False).to(device)
effdet_model.backbone.load_state_dict(effnet_classifier.state_dict(), strict=False)

_IncompatibleKeys(missing_keys=[], unexpected_keys=['conv_head.weight', 'bn2.weight', 'bn2.bias', 'bn2.running_mean', 'bn2.running_var', 'bn2.num_batches_tracked', 'classifier.1.weight', 'classifier.1.bias'])

In [35]:
summary(model=effnet_classifier, 
        input_size=(64, 3, 512, 512), # make sure this is "input_size", not "input_shape"
        # col_names=["input_size"], # uncomment for smaller output
        col_names=["input_size", "output_size", "num_params", "trainable"],
        col_width=20,
        row_settings=["var_names"]
)

Layer (type (var_name))                            Input Shape          Output Shape         Param #              Trainable
EfficientNet (EfficientNet)                        [64, 3, 512, 512]    [64, 200]            --                   True
├─Conv2d (conv_stem)                               [64, 3, 512, 512]    [64, 32, 256, 256]   864                  True
├─BatchNormAct2d (bn1)                             [64, 32, 256, 256]   [64, 32, 256, 256]   64                   True
│    └─Identity (drop)                             [64, 32, 256, 256]   [64, 32, 256, 256]   --                   --
│    └─SiLU (act)                                  [64, 32, 256, 256]   [64, 32, 256, 256]   --                   --
├─Sequential (blocks)                              [64, 32, 256, 256]   [64, 320, 16, 16]    --                   True
│    └─Sequential (0)                              [64, 32, 256, 256]   [64, 16, 256, 256]   --                   True
│    │    └─DepthwiseSeparableConv (0)         

In [114]:
effdet_model.eval()
dummy_input = torch.randn(1, 3, 512, 512).to(device)  # Batch of 1, 3 channels, 512x512 image
outputs = effdet_model(dummy_input)

print(outputs)

([tensor([[[[-4.5951, -4.5951, -4.5951,  ..., -4.5951, -4.5951, -4.5951],
          [-4.5951, -4.5951, -4.5951,  ..., -4.5951, -4.5951, -4.5951],
          [-4.5951, -4.5951, -4.5952,  ..., -4.5951, -4.5951, -4.5951],
          ...,
          [-4.5951, -4.5951, -4.5952,  ..., -4.5952, -4.5951, -4.5951],
          [-4.5951, -4.5951, -4.5951,  ..., -4.5951, -4.5952, -4.5951],
          [-4.5951, -4.5951, -4.5951,  ..., -4.5951, -4.5951, -4.5951]],

         [[-4.5951, -4.5951, -4.5951,  ..., -4.5951, -4.5951, -4.5951],
          [-4.5951, -4.5951, -4.5951,  ..., -4.5951, -4.5951, -4.5951],
          [-4.5951, -4.5951, -4.5951,  ..., -4.5951, -4.5951, -4.5951],
          ...,
          [-4.5951, -4.5951, -4.5951,  ..., -4.5951, -4.5951, -4.5951],
          [-4.5951, -4.5951, -4.5951,  ..., -4.5951, -4.5951, -4.5951],
          [-4.5951, -4.5951, -4.5951,  ..., -4.5951, -4.5951, -4.5951]],

         [[-4.5951, -4.5951, -4.5951,  ..., -4.5951, -4.5951, -4.5951],
          [-4.5951, -4.5951,