# 0. Library 

In [3]:
import torch
from torch import nn

from einops import rearrange
from einops.layers.torch import Rearrange

# 2. Module

In [41]:
def conv_3x3_bn(inp, oup, image_size, downsample=False):
    stride = 1 if downsample == False else 2
    return nn.Sequential(
        nn.Conv2d(inp, oup, 3, stride, 1, bias=False),
        nn.BatchNorm2d(oup),
        nn.GELU()
    )

In [42]:
class PreNorm(nn.Module):
    def __init__(self, dim, fn, norm):
        super().__init__()
        self.norm = norm(dim)
        self.fn = fn

    def forward(self, x, **kwargs):
        return self.fn(self.norm(x), **kwargs)

In [43]:
class SE(nn.Module):
    def __init__(self, inp, oup, expansion=0.25):
        super().__init__()

        # 1x1 크기의 Adaptive 평균 풀링을 수행하여 채널별 평균값을 계산
        self.avg_pool = nn.AdaptiveAvgPool2d(1)

        # 두 개의 선형 레이어를 사용하여 채널별 중요도를 조정
        self.fc = nn.Sequential(
            nn.Linear(oup, int(inp * expansion), bias=False),  # 입력 차원을 줄임 (채널 축소)
            nn.GELU(),  # 활성화 함수 GELU 사용
            nn.Linear(int(inp * expansion), oup, bias=False),  # 다시 원래 차원으로 복구 (채널 확장)
            nn.Sigmoid()  # 0~1 범위로 정규화하여 채널별 중요도 생성
        )

    def forward(self, x):
        b, c, _, _ = x.size()  # 입력 텐서의 크기: (batch, channels, height, width)

        # Global Average Pooling을 적용하여 각 채널의 평균값을 구함 (특징 압축)
        y = self.avg_pool(x).view(b, c)  # (b, c, 1, 1) -> (b, c)

        # 채널별 중요도 계산 (Squeeze-and-Excitation 연산)
        y = self.fc(y).view(b, c, 1, 1)  # 다시 (b, c, 1, 1) 형태로 변환

        # 입력 특성 맵에 채널별 가중치 적용 (채널별 중요도를 곱함)
        return x * y  # (b, c, h, w) * (b, c, 1, 1)


In [44]:
class FeedForward(nn.Module):

    def __init__(self, dim, hidden_dim, dropout=0.):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(dim ,hidden_dim),
            nn.GELU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, dim),
            nn.Dropout(dropout)
        )

    def forward(self, x):
        return self.net(x)

In [53]:
class MBConv(nn.Module):

    def __init__(self, inp, oup, image_size, downsample=False, expansion=4):
        super().__init__()
        self.downsample = downsample
        stride = 1 if self.downsample == False else 2
        hidden_dim = int(inp * expansion)

        if self.downsample:
            self.pool = nn.MaxPool2d(3, 2, 1)
            self.proj = nn.Conv2d(inp, oup, 1, 1, 0, bias=False)

        if expansion == 1:
            self.conv = nn.Sequential(
                # dw
                nn.Conv2d(hidden_dim, hidden_dim, 3, stride,
                          1, groups=hidden_dim, bias=False),
                nn.BatchNorm2d(hidden_dim),
                nn.GELU(),
                # pw-linear
                nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                nn.BatchNorm2d(oup),
            )
        else:
            self.conv = nn.Sequential(
                # pw
                # down-sample in the first conv
                nn.Conv2d(inp, hidden_dim, 1, stride, 0, bias=False),
                nn.BatchNorm2d(hidden_dim),
                nn.GELU(),
                # dw
                nn.Conv2d(hidden_dim, hidden_dim, 3, 1, 1,
                          groups=hidden_dim, bias=False),
                nn.BatchNorm2d(hidden_dim),
                nn.GELU(),
                SE(inp, hidden_dim),
                # pw-linear
                nn.Conv2d(hidden_dim, oup, 1, 1, 0, bias=False),
                nn.BatchNorm2d(oup),
            )
        
        
        self.conv = PreNorm(inp, self.conv, nn.BatchNorm2d)

    def forward(self, x):
        if self.downsample:
            return self.proj(self.pool(x)) + self.conv(x)
        else:
            return x + self.conv(x)

In [54]:
class Attention(nn.Module):
    def __init__(self, inp, oup, image_size, heads=8, dim_head=32, dropout=0.):
        super().__init__()
        
        # 내부 차원 계산 (head 수 * head당 차원 크기)
        inner_dim = dim_head * heads

        # output projection이 필요한지 여부 (head가 1개이고 차원이 같다면 불필요)
        project_out = not (heads == 1 and dim_head == inp)

        # 이미지 크기 저장 (ih: height, iw: width)
        self.ih, self.iw = image_size

        self.heads = heads
        self.scale = dim_head ** -0.5  # Self-Attention 점수 스케일링 (1 / sqrt(d_k))

        # 🔹 상대적 위치 임베딩 테이블 (Learnable Parameters)
        self.relative_bias_table = nn.Parameter(
            torch.zeros((2 * self.ih - 1) * (2 * self.iw - 1), heads)  # (H*W, heads)
        )

        # 🔹 상대적 위치 좌표 생성
        coords = torch.meshgrid((torch.arange(self.ih), torch.arange(self.iw)))  
        coords = torch.flatten(torch.stack(coords), 1)  # 좌표를 펼쳐서 (2, H*W) 형태로 변환

        # 모든 위치 쌍 간의 상대적 위치 계산
        relative_coords = coords[:, :, None] - coords[:, None, :]  # (2, H*W, H*W)

        # 상대적 위치 값을 양수로 변환 (인덱싱을 위해)
        relative_coords[0] += self.ih - 1
        relative_coords[1] += self.iw - 1
        relative_coords[0] *= 2 * self.iw - 1  # 위치 값이 고유한 인덱스가 되도록 변환

        # (2, H*W, H*W) -> (H*W, H*W, 2)
        relative_coords = rearrange(relative_coords, 'c h w -> h w c')

        # 최종 상대 위치 인덱스 생성 (flatten 후 차원 확장)
        relative_index = relative_coords.sum(-1).flatten().unsqueeze(1)  # (H*W * H*W, 1)

        # register_buffer를 사용해 학습하지 않는 텐서로 저장 (모델 저장 시 함께 저장됨)
        self.register_buffer("relative_index", relative_index)

        # 🔹 Self-Attention Softmax
        self.attend = nn.Softmax(dim=-1)

        # 🔹 Query, Key, Value 생성 (선형 변환)
        self.to_qkv = nn.Linear(inp, inner_dim * 3, bias=False)  # (batch, seq_len, inner_dim * 3)

        # 🔹 출력 프로젝션 (필요 시 적용)
        self.to_out = nn.Sequential(
            nn.Linear(inner_dim, oup),  # 최종 출력 크기로 변환
            nn.Dropout(dropout)  # 과적합 방지를 위한 Dropout 적용
        ) if project_out else nn.Identity()

    def forward(self, x):
        # 🔹 Query, Key, Value 분할
        qkv = self.to_qkv(x).chunk(3, dim=-1)  # (batch, seq_len, inner_dim) 3개로 나눔
        q, k, v = map(lambda t: rearrange(
            t, 'b n (h d) -> b h n d', h=self.heads), qkv)  # (batch, heads, seq_len, dim_head)

        # 🔹 Self-Attention Score 계산 (Q x K^T)
        dots = torch.matmul(q, k.transpose(-1, -2)) * self.scale  # (batch, heads, seq_len, seq_len)

        # 🔹 상대적 위치 임베딩 적용
        relative_bias = self.relative_bias_table.gather(
            0, self.relative_index.repeat(1, self.heads)  # 상대적 위치에 해당하는 bias 선택
        )

        # (H*W * H*W, heads) -> (1, heads, H*W, H*W)
        relative_bias = rearrange(
            relative_bias, '(h w) c -> 1 c h w', h=self.ih*self.iw, w=self.ih*self.iw
        )

        # 어텐션 점수에 상대적 위치 임베딩 추가
        dots = dots + relative_bias

        # 🔹 Softmax를 통해 어텐션 가중치 계산
        attn = self.attend(dots)

        # 🔹 어텐션 적용 (V와 곱함)
        out = torch.matmul(attn, v)  # (batch, heads, seq_len, dim_head)

        # 🔹 병합 (multi-head -> single representation)
        out = rearrange(out, 'b h n d -> b n (h d)')  # (batch, seq_len, inner_dim)

        # 🔹 최종 출력 변환
        out = self.to_out(out)  # (batch, seq_len, oup)
        return out


In [61]:
class Transformer(nn.Module):

    def __init__(self, inp, oup, image_size, heads=8, dim_head=32, downsample=False, dropout=0.):
        super().__init__()
        hidden_dim = int(inp * 4)

        self.ih, self.iw = image_size
        self.downsample = downsample 

        if self.downsample:
            self.pool1 = nn.MaxPool2d(3,2,1)
            self.pool2 = nn.MaxPool2d(3,2,1)
            self.proj = nn.Conv2d(inp, oup, 1, 1, 0, bias=False)

        self.attn = Attention(inp, oup, image_size, heads, dim_head, dropout)
        self.ff = FeedForward(oup, hidden_dim, dropout)

        self.attn = nn.Sequential(
            Rearrange('b c ih iw -> b (ih iw) c'),
            PreNorm(inp, self.attn, nn.LayerNorm),
            Rearrange('b (ih iw ) c -> b c ih iw', ih=self.ih, iw=self.iw)
        )

        self.ff = nn.Sequential(
            Rearrange('b c ih iw -> b (ih iw) c'),
            PreNorm(oup, self.ff, nn.LayerNorm),
            Rearrange('b (ih iw) c -> b c ih iw', ih=self.ih, iw=self.iw)
        )

    def forward(self, x):
        if self.downsample:
            x = self.proj(self.pool1(x)) + self.attn(self.pool2(x))
        else:
            x = x + self.attn(x)

        x = x + self.ff(x)

        return x

# 2. 모형

In [62]:
class CoAtNet(nn.Module):
    def __init__(self, image_size, in_channels, num_blocks, channels, num_classes=1000, block_types=['C', 'C', 'T', 'T']):
        super().__init__()
        ih, iw = image_size
        block = {'C': MBConv, 'T': Transformer}

        self.s0 = self._make_layer(
            conv_3x3_bn, in_channels, channels[0], num_blocks[0], (ih // 2, iw // 2))
        self.s1 = self._make_layer(
            block[block_types[0]], channels[0], channels[1], num_blocks[1], (ih // 4, iw // 4))
        self.s2 = self._make_layer(
            block[block_types[1]], channels[1], channels[2], num_blocks[2], (ih // 8, iw // 8))
        self.s3 = self._make_layer(
            block[block_types[2]], channels[2], channels[3], num_blocks[3], (ih // 16, iw // 16))
        self.s4 = self._make_layer(
            block[block_types[3]], channels[3], channels[4], num_blocks[4], (ih // 32, iw // 32))

        self.pool = nn.AvgPool2d(ih // 32, 1)
        self.fc = nn.Linear(channels[-1], num_classes, bias=False)

    def forward(self, x):
        x = self.s0(x)
        x = self.s1(x)
        x = self.s2(x)
        x = self.s3(x)
        x = self.s4(x)

        x = self.pool(x).view(-1, x.shape[1])
        x = self.fc(x)
        return x

    def _make_layer(self, block, inp, oup, depth, image_size):
        layers = nn.ModuleList([])
        for i in range(depth):
            if i == 0:
                layers.append(block(inp, oup, image_size, downsample=True))
            else:
                layers.append(block(oup, oup, image_size))
        return nn.Sequential(*layers)


In [63]:
def coatnet():
    num_blocks = [2, 2, 12, 28, 2]          # L
    channels = [192, 192, 384, 768, 1536]   # D
    return CoAtNet((224, 224), 3, num_blocks, channels, num_classes=1000)

In [64]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

In [65]:
img = torch.randn(1,3,224,224)

In [67]:
model = coatnet().to('cuda')

In [68]:
output = model(img.to('cuda'))

In [70]:
output.shape

torch.Size([1, 1000])