In [None]:
import math
import os
import pathlib
import shutil
from math import ceil, sqrt
from pathlib import Path
from typing import Any, Callable, List, Optional, Sequence, Tuple

import cv2
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import PIL
import rootutils
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchvision.transforms.v2 as T
import torchvision.transforms.v2.functional as TF
from skimage.metrics import structural_similarity
from sklearn.model_selection import GroupShuffleSplit
from torch.utils.data import ConcatDataset, Dataset
from torchmetrics import Metric
from torchvision.io import read_image
from tqdm import tqdm

root = rootutils.setup_root(search_from=".", indicator=".project-root", pythonpath=True)

# from src.data.components.dataset import FetalPlanesDataset, USVideosDataset
# from src.data.utils.utils import show_numpy_images, show_pytorch_images
# from src.models.fetal_module import FetalLitModule

root

In [None]:
from src.models.components.utils import get_model

model = get_model(name="efficientnet_v2_m")

In [None]:
for name, layer in model.model.named_children():
    print(name)

model

In [None]:
from torchvision.models import get_model

efficientnet = get_model(name="efficientnet_v2_m", weights="DEFAULT")

In [None]:
for name, layer in efficientnet.features.named_children():
    print(name)

efficientnet

In [None]:
resnet = get_model(name="resnet101", weights="DEFAULT")

for name, layer in resnet.named_children():
    print(name)

resnet

In [None]:
class ResNet(nn.Module):
    supported_models = ["resnet18", "resnet34", "resnet50", "resnet101", "resnet152"]

    freez_layers_name = [
        "conv1",
        "bn1",
        "layer1",
        "layer2",
        "layer3",
        "layer4",
    ]

    def __init__(
        self,
        name: str = "resnet18",
        output_size: int = 6,
        pretrain: bool = True,
        freez_layers: int = 0,
    ):
        super().__init__()

        assert name in self.supported_models
        self.model = get_model(name=name, weights="DEFAULT" if pretrain else None)

        if freez_layers == 0:
            # replace input 3 channels with 1 channel
            old_conv = self.model.conv1
            conv = nn.Conv2d(
                in_channels=1,
                out_channels=old_conv.out_channels,
                kernel_size=old_conv.kernel_size,
                stride=old_conv.stride,
                padding=old_conv.padding,
                dilation=old_conv.dilation,
                groups=old_conv.groups,
                bias=old_conv.bias is not None,
                padding_mode=old_conv.padding_mode,
            )
            conv.weight = nn.Parameter(torch.mean(old_conv.weight, dim=1, keepdim=True))
            self.model.conv1 = conv

        # output
        self.classifier = nn.Linear(
            in_features=self.model.fc.in_features,
            out_features=output_size,
        )
        self.model.fc = nn.Identity()

        for layer_name in self.freez_layers_name[:freez_layers]:
            freez_model_layers(self.model, layer_name, False)

    def forward(self, x):
        dense_logits = self.model(x)
        return dense_logits, self.classifier(dense_logits)


model = ResNet(name="resnet101", freez_layers=3)

In [None]:
class EfficientNet(nn.Module):
    supported_efficientnet_models = ["efficientnet_b0"]
    supported_efficientnet_v2_models = ["efficientnet_v2_s", "efficientnet_v2_m", "efficientnet_v2_l"]
    supported_models = supported_efficientnet_models + supported_efficientnet_v2_models

    freez_layers_name = [
        "features.0",
        "features.1",
        "features.2",
        "features.3",
        "features.4",
        "features.5",
        "features.6",
        "features.7",
        "features.8",
    ]

    def __init__(
        self,
        name: str = "efficientnet_b0",
        output_size: int = 6,
        pretrain: bool = True,
        freez_layers: int = 0,
        dropout: float | None = None,
    ):
        super().__init__()

        assert name in self.supported_models

        get_model_param = {"name": name, "weights": "DEFAULT" if pretrain else None}
        if dropout is not None:
            get_model_param["dropout"] = dropout
        self.model = get_model(**get_model_param)

        # input
        if freez_layers == 0:
            old_conv = self.model.features[0][0]
            conv = nn.Conv2d(
                in_channels=1,
                out_channels=old_conv.out_channels,
                kernel_size=old_conv.kernel_size,
                stride=old_conv.stride,
                padding=old_conv.padding,
                dilation=old_conv.dilation,
                groups=old_conv.groups,
                bias=old_conv.bias is not None,
                padding_mode=old_conv.padding_mode,
            )
            conv.weight = nn.Parameter(torch.mean(old_conv.weight, dim=1, keepdim=True))
            self.model.features[0][0] = conv

        # output
        self.classifier = nn.Linear(
            in_features=self.model.classifier[-1].in_features,
            out_features=output_size,
        )
        self.model.classifier = self.model.classifier[:-1]

        freez_model_layers(model=self.model, layers_name=self.freez_layers_name[:freez_layers], freez_batch_norm=False)

    def forward(self, x):
        dense_logits = self.model(x)
        return dense_logits, self.classifier(dense_logits)


model = EfficientNet(name="efficientnet_v2_m", freez_layers=3)

In [None]:
def freez_model_layers(model, layers_name: [str], freez_batch_norm: bool = True):

    for layer_name in layers_name:
        module = model
        for layer in layer_name.split("."):
            module = getattr(module, layer)

        freez_model_layer(module, freez_batch_norm=freez_batch_norm)


batch_norm_classes = [
    "BatchNorm1d",
    "BatchNorm2d",
    "BatchNorm3d",
    "LazyBatchNorm1d",
    "LazyBatchNorm2d",
    "LazyBatchNorm3d",
]


def freez_model_layer(model, freez_batch_norm: bool = True):
    if len(list(model.named_children())) == 0:
        if freez_batch_norm or model.__class__.__name__ not in batch_norm_classes:
            model.requires_grad_(requires_grad=False)
    else:
        for layer in model.children():
            freez_model_layer(layer, freez_batch_norm)


def summary(model, spaces=0):
    if len(list(model.named_children())) == 0:
        for name, param in model.named_parameters():
            space_name = f"{' ' * spaces}{name}:"
            print(f"{space_name:<50} {param.requires_grad}")
    else:
        for name, layer in model.named_children():
            print(f"{' ' * spaces}{name}: {layer.__class__.__name__}")
            summary(model=layer, spaces=spaces + 2)


summary(model)

In [None]:
batch_norm_classes = [
    "BatchNorm1d",
    "BatchNorm2d",
    "BatchNorm3d",
    "LazyBatchNorm1d",
    "LazyBatchNorm2d",
    "LazyBatchNorm3d",
]

"BatchNorm2d" in batch_norm_classes

In [None]:
class SonoNet(nn.Module):
    supported_models = ["SN16", "SN32", "SN64"]

    feature_cfg_dict = {
        "SN16": [16, 16, "M", 32, 32, "M", 64, 64, 64, "M", 128, 128, 128, "M", 128, 128, 128],
        "SN32": [32, 32, "M", 64, 64, "M", 128, 128, 128, "M", 256, 256, 256, "M", 256, 256, 256],
        "SN64": [64, 64, "M", 128, 128, "M", 256, 256, 256, "M", 512, 512, 512, "M", 512, 512, 512],
    }

    def __init__(
        self,
        name: str = "SN64",
        output_size: int = 6,
        pretrain: bool = True,
    ):
        super().__init__()

        assert name in self.supported_models

        feature_cfg = self.feature_cfg_dict[name]
        self.features = self._make_feature_layers(feature_cfg, 1)

        feature_channels = feature_cfg[-1]
        adaption_channels = feature_channels // 2
        self.adaption = self._make_adaption_layer(feature_channels, adaption_channels, 14)

        if pretrain:
            # weights_path = os.path.join(os.path.dirname(__file__), "weights", 'SonoNet{}.pth'.format(name[2:]))
            weights_path = root / "src" / "models" / "components" / "weights" / f"SonoNet{name[2:]}.pth"
            self.load_weights(weights_path)
        else:
            self.apply(self._initialize_weights)

        self.adaption[3] = nn.Conv2d(adaption_channels, output_size, 1, bias=False)
        self.adaption[4] = nn.BatchNorm2d(output_size)
        self._initialize_weights(self.adaption[3])
        self._initialize_weights(self.adaption[4])

    def forward(self, x):
        x = self.features(x)

        x = self.adaption(x)
        y = F.avg_pool2d(x, x.size()[2:]).view(x.size(0), -1)
        # y = F.softmax(y, dim=1)

        return x, y

    @classmethod
    def _make_feature_layers(cls, feature_cfg, in_channels):
        layers = []
        conv_layers = []
        for v in feature_cfg:
            if v == "M":
                conv_layers.append(nn.MaxPool2d(2))
                layers.append(nn.Sequential(*conv_layers))
                conv_layers = []
            else:
                conv_layers.append(cls._conv_layer(in_channels, v))
                in_channels = v
        layers.append(nn.Sequential(*conv_layers))
        return nn.Sequential(*layers)

    @staticmethod
    def _conv_layer(in_channels, out_channels):
        return nn.Sequential(
            nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False),
            nn.BatchNorm2d(out_channels, eps=1e-4),
            nn.ReLU(inplace=True),
        )

    @staticmethod
    def _make_adaption_layer(feature_channels, adaption_channels, num_labels):
        return nn.Sequential(
            nn.Conv2d(feature_channels, adaption_channels, 1, bias=False),
            nn.BatchNorm2d(adaption_channels),
            nn.ReLU(inplace=True),
            nn.Conv2d(adaption_channels, num_labels, 1, bias=False),
            nn.BatchNorm2d(num_labels),
        )

    @staticmethod
    def _initialize_weights(m):
        if isinstance(m, nn.Conv2d):
            n = m.kernel_size[0] * m.kernel_size[1] * m.out_channels
            m.weight.data.normal_(0, math.sqrt(2.0 / n))
            if m.bias is not None:
                m.bias.data.zero_()
        elif isinstance(m, nn.BatchNorm2d):
            m.weight.data.fill_(1)
            m.bias.data.zero_()
        elif isinstance(m, nn.Linear):
            m.weight.data.normal_(0, 0.01)
            m.bias.data.zero_()

    def load_weights(self, weights_path):
        state = torch.load(weights_path)
        self.load_state_dict(state, strict=True)


sononet = SonoNet(
    name="SN64",
    output_size=6,
    pretrain=True,
)

logits, y_hat = sononet(torch.rand(1, 1, 165, 240))
print(logits.view(1, -1).shape)
print(y_hat.shape)

In [None]:
logits, y_hat = model(torch.rand(1, 1, 165, 240))
print(logits.shape)
print(y_hat.shape)

In [None]:
torch.rand(1, 1, 165, 240).shape

In [None]:
for name, param in model.named_parameters():
    print(name, param.requires_grad)

In [None]:
torch.randint(0, 3, [])

In [None]:
torch.randint(0, 3, (1,))

In [None]:
a = [1, 2, 3]
i = torch.randint(0, 3, (1,))

a[i]

In [None]:
x = torch.tensor([1], dtype=torch.float32)
print(x)
print(torch.mean(torch.cat([x])))
print(torch.mean(torch.cat([x, x, x])))

In [None]:
x = torch.tensor([[1, 2, 3]], dtype=torch.float32)
print(x)
print(torch.mean(torch.cat([x]), dim=0))
print(torch.mean(torch.cat([x, x, x]), dim=0))

In [None]:
torch.exp(torch.tensor(float("-inf")))

In [None]:
loss = torch.nn.CrossEntropyLoss()

x = torch.tensor([[0.0, 0.0, 0.0]])
y = torch.tensor([1])
loss(x, y)

In [None]:
loss = torch.nn.CrossEntropyLoss()

x = torch.tensor([[float("-inf"), float("-inf"), 0.0, 0.0, 0.0]])
y = torch.tensor([3])
loss(x, y)

In [None]:
softmax = torch.nn.Softmax(dim=1)

x = torch.tensor([[0.0, 0.0, 0.0]])
softmax(x)

In [None]:
softmax = torch.nn.Softmax(dim=1)

x = torch.tensor([[float("-inf"), float("-inf"), 0.0, 0.0, 0.0]])
softmax(x)

In [None]:
x = torch.tensor([[0.0, 0.0, 0.0, 0.0, 0.0]])
mask = torch.tensor([1, 1, 0, 0, 0])
torch.masked_fill(x, mask, value=float("-inf"))

In [None]:
masks = torch.nn.functional.one_hot(torch.arange(0, 3), num_classes=5)
print(len(masks))
masks

In [None]:
x = torch.tensor([[0.0, 0.0, 0.0, 0.0, 0.0]])
mask = torch.tensor([1, 1, 0, 0, 0], dtype=torch.bool)
x.masked_fill(mask, value=float("-inf"))

In [None]:
for i, c in enumerate(torch.tensor([1, 1, 0, 0, 0], dtype=torch.bool)):
    if c:
        print(i)

In [None]:
import torch

img = torch.zeros((2, 1, 8, 8))

img.expand(-1, 3, -1, -1).shape