# Assignment 1 - Code Example - Part A

This code baseline is inspired by and modified from [this great tutorial](https://pytorch.org/tutorials/beginner/blitz/cifar10_tutorial.html).

This code can achieve an accuracy of approximately 86.50% on CIFAR-10. Please set up the environment and run your experiments starting from this baseline. You are expected to achieve an accuracy higher than this baseline.

## data and pre

In [None]:
# import some necessary packages
import torch
import torch.nn as nn
import torch.optim as optim
 
import torchvision.datasets as tv_datasets
import torchvision.transforms as tv_transforms

import torch.nn.functional as F
from torch.optim.lr_scheduler import ChainedScheduler, LambdaLR, CosineAnnealingLR







from time import time

In [None]:
# some experimental setup
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

num_epochs = 128
batch_size = 64
num_workers = 2
print_every = 200

# optim_name = "Adam"
# optim_kwargs = dict(
#     lr=3e-4,
#     weight_decay=1e-6,
# )

# preprocessing pipeline for input images
transformation = dict()
for data_type in ("train", "test"):
    is_train = data_type=="train"
    transformation[data_type] = tv_transforms.Compose(([
        tv_transforms.RandomCrop(32, padding=4),
        tv_transforms.RandomRotation(degrees=15),
        tv_transforms.RandomHorizontalFlip(),
        tv_transforms.RandomAffine(degrees=0, translate=(0.1, 0.1)),
    ] if is_train else []) + 
    [
        tv_transforms.ToTensor(),
        tv_transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
        # tv_transforms.Normalize(mean=[0.5, 0.5, 0.5], std=[0.5, 0.5, 0.5]),
    ])

# TODO： 增加normliaze和参数。RandomCrop

In [5]:
# prepare datasets

dataset, loader = {}, {}
for data_type in ("train", "test"):
    is_train = data_type=="train"
    dataset[data_type] = tv_datasets.CIFAR10(
        
        root="./data", train=is_train, download=False, transform=transformation[data_type],
    )
    loader[data_type] = torch.utils.data.DataLoader(
        dataset[data_type], batch_size=batch_size, shuffle=is_train, num_workers=num_workers,
    )


## model

### ConvBlock

In [28]:
class ConvBlock(nn.Module):
    def __init__(self,in_channels, out_channels, kernel_size, stride=1, padding=0, Is_BN=True, Is_reg=True, Is_res=True):
        super().__init__()
        if not Is_BN:
            self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias=True)
        else:
            self.conv = nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding, bias=False)
        self.relu = nn.ReLU(inplace=True)
        self.Is_reg = Is_reg
        self.Is_BN = Is_BN
        self.Is_res = Is_res
        if Is_BN:
            self.bn = nn.BatchNorm2d(out_channels)
        if Is_reg:
            self.maxpool = nn.MaxPool2d(2)
            self.dp = nn.Dropout(0.3)
        if Is_res:
            self.shortcut = nn.Sequential()
            if stride != 1 or in_channels != out_channels:
                self.shortcut =  nn.Conv2d(in_channels, out_channels, 
                            kernel_size=1, stride=stride, bias=False)
                    
                
        
    
    def forward(self, x):
        # print(f"[DEBUG] Input shape: {x.shape if x is not None else 'NULL'}")
        out = self.conv(x)
        out = self.relu(out)
        if self.Is_BN:
            out = self.bn(out)
        if self.Is_res:
            out += self.shortcut(x)
            out = F.relu(out)
            
        if self.Is_reg:
            out = self.maxpool(out)
            out = self.dp(out)

        # print(f"[DEBUG] Post-conv shape: {out.shape}")
        return out

class ConvBs(nn.Module):
    def __init__(self, num_layers,layer_dict):
        super().__init__()
        self.layers = nn.ModuleList([
            ConvBlock(**layer_dict[i])
            for i in range(num_layers)
        ])


    def forward(self, x, train=True):
        for layer in self.layers:
            x = layer(x)
        return x

### initial net

In [7]:
# our network architecture

net = nn.Sequential(
    nn.Conv2d(3, 128, 3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(2), nn.Dropout(0.3),
    nn.Conv2d(128, 256, 3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(2), nn.Dropout(0.3),
    nn.Conv2d(256, 512, 3, padding=1), nn.ReLU(inplace=True),
    nn.Conv2d(512, 512, 3, padding=1), nn.ReLU(inplace=True),
    nn.Conv2d(512, 256, 3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(2), nn.Dropout(0.3),
    nn.Flatten(),
    nn.Linear(256 * 4 * 4, 512), nn.ReLU(inplace=True), nn.Dropout(0.5),
    nn.Linear(512, 256), nn.ReLU(inplace=True), nn.Dropout(0.5),
    nn.Linear(256, 128), nn.ReLU(inplace=True), nn.Dropout(0.5),
    nn.Linear(128, 10),
)

# move to device
net.to(device)

# print the number of parameters
print(f"number of parameters: {sum(p.numel() for p in net.parameters() if p.requires_grad) / 1_000_000:.2f}M")

number of parameters: 7.28M


In [8]:
# 模块化设计
# 增加filter 和layers
# google net : v1:NIN+global pooling
# v2: BN + 5*5 -> 2 3*3
# v3: factorization
# residual

### ViT

In [9]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.nn.init import xavier_uniform_, normal_

class AddPositionEmbs(nn.Module):
    def __init__(self, seq_len, emb_dim):
        super().__init__()
        self.pos_embedding = nn.Parameter(torch.randn(1, seq_len, emb_dim) * 0.02)
    
    def forward(self, x):
        return x + self.pos_embedding

class MlpBlock(nn.Module):
    def __init__(self, in_dim, mlp_dim, dropout=0.1):
        super().__init__()
        self.fc1 = nn.Linear(in_dim, mlp_dim)
        self.fc2 = nn.Linear(mlp_dim, in_dim)
        self.dropout = nn.Dropout(dropout)
        
        # 初始化参数
        xavier_uniform_(self.fc1.weight)
        normal_(self.fc1.bias, std=1e-6)
        xavier_uniform_(self.fc2.weight)
        normal_(self.fc2.bias, std=1e-6)

    def forward(self, x):
        x = self.fc1(x)
        x = F.gelu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return self.dropout(x)

class Encoder1DBlock(nn.Module):
    def __init__(self, hidden_dim, mlp_dim, num_heads, dropout=0.1, attn_dropout=0.1):
        super().__init__()
        self.norm1 = nn.LayerNorm(hidden_dim)
        self.attn = nn.MultiheadAttention(
            embed_dim=hidden_dim,
            num_heads=num_heads,
            dropout=attn_dropout,
            batch_first=True
        )
        self.dropout = nn.Dropout(dropout)
        self.norm2 = nn.LayerNorm(hidden_dim)
        self.mlp = MlpBlock(hidden_dim, mlp_dim, dropout)
        
        # 注意力层初始化
        xavier_uniform_(self.attn.in_proj_weight)
        normal_(self.attn.in_proj_bias, std=1e-6)
        xavier_uniform_(self.attn.out_proj.weight)
        normal_(self.attn.out_proj.bias, std=1e-6)

    def forward(self, x):
        attn_output, _ = self.attn(
            query=self.norm1(x),
            key=self.norm1(x),
            value=self.norm1(x)
        )
        x = x + self.dropout(attn_output)
        x = x + self.mlp(self.norm2(x))
        return x

class Encoder(nn.Module):
    def __init__(self, num_layers, hidden_dim, mlp_dim, num_heads, dropout=0.1, attn_dropout=0.1):
        super().__init__()
        self.layers = nn.ModuleList([
            Encoder1DBlock(hidden_dim, mlp_dim, num_heads, dropout, attn_dropout)
            for _ in range(num_layers)
        ])
        self.pos_emb = AddPositionEmbs(seq_len=65, emb_dim=hidden_dim)  # 默认ViT-B/16
        self.dropout = nn.Dropout(dropout)
        self.norm = nn.LayerNorm(hidden_dim)

    def forward(self, x, train=True):
        x = self.pos_emb(x)
        x = self.dropout(x) if train else x
        for layer in self.layers:
            x = layer(x)
        return self.norm(x)

class VisionTransformer(nn.Module):
    def __init__(self, 
                 num_classes, 
                 img_size=224,
                 patch_size=16,
                 hidden_dim=768,
                 num_layers=12,
                 num_heads=12,
                 mlp_dim=3072,
                 dropout=0.1,
                 attn_dropout=0.1,
                 representation_size=None):
        
        super().__init__()
        num_patches = (img_size // patch_size) ** 2
        self.patch_embed = nn.Conv2d(
            in_channels=3,
            out_channels=hidden_dim,
            kernel_size=patch_size,
            stride=patch_size
        )
        
        # 分类token
        self.cls_token = nn.Parameter(torch.zeros(1, 1, hidden_dim))
        
        # Transformer编码器
        self.encoder = Encoder(
            num_layers=num_layers,
            hidden_dim=hidden_dim,
            mlp_dim=mlp_dim,
            num_heads=num_heads,
            dropout=dropout,
            attn_dropout=attn_dropout
        )
        
        # 分类头
        self.pre_logits = nn.Identity()
        if representation_size:
            self.pre_logits = nn.Sequential(
                nn.Linear(hidden_dim, representation_size),
                nn.Tanh()
            )
            hidden_dim = representation_size
            
        self.head = nn.Linear(hidden_dim, num_classes)
        
        # 初始化
        nn.init.trunc_normal_(self.cls_token, std=0.02)
        self._init_weights()

    def _init_weights(self):
        # 卷积层初始化
        nn.init.xavier_uniform_(self.patch_embed.weight)
        nn.init.normal_(self.patch_embed.bias, std=1e-6)
        
        # 分类头初始化
        nn.init.zeros_(self.head.weight)
        nn.init.constant_(self.head.bias, 0)

    def forward(self, x):
        # 分块嵌入 [B, C, H, W] -> [B, hidden_dim, grid, grid]
        x = self.patch_embed(x)  
        B, C, H, W = x.shape
        
        # 展平并转置 [B, C, H*W] -> [B, H*W, C]
        x = x.flatten(2).transpose(1, 2)  
        
        # 添加分类token
        cls_tokens = self.cls_token.expand(B, -1, -1)
        x = torch.cat((cls_tokens, x), dim=1)
        
        # Transformer编码
        x = self.encoder(x, self.training)
        
        # 分类
        x = x[:, 0]  # 取分类token
        x = self.pre_logits(x)
        return self.head(x)

## Train and eval dunc

In [8]:
def evaluate_accuracy(net):
    net.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for img, target in loader["test"]:
            img, target = img.to(device), target.to(device)
            
            # make prediction
            pred = net(img)
            
            # accumulate
            total += len(target)
            correct += (torch.argmax(pred, dim=1) == target).sum().item()
    

    output = f"Accuracy of the network on the {total} test images: {100 * correct / total:.2f}%"

    print(output)
    return  correct / total

class EarlyStopper:
    def __init__(self, patience=5, min_delta=0):
        self.patience = patience
        self.min_delta = min_delta
        self.counter = 0
        self.best_metric = float('inf')
        self.stop = False

    def check_stop(self, val_metric):
        if val_metric > self.best_metric + self.min_delta:
            self.best_metric = val_metric
            self.counter = 0
            
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.stop = True
                
            

In [None]:


# TODO: 设置停止策略
def train(net,file_name, optim_name,optim_kwargs,scheduler,is_warm, warmup_steps=600):

    lr = optim_kwargs['lr']
    # the network optimizer
    optimizer = getattr(optim, optim_name)(net.parameters(), **optim_kwargs)

    def warmup_lr_lambda(current_step):
        if current_step < warmup_steps:
            return   current_step * lr / warmup_steps
        return lr
    if scheduler == "CosineAnnealingLR" and is_warm:
        warmup_scheduler = LambdaLR(optimizer, lr_lambda=warmup_lr_lambda)

        cosine_scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs - warmup_steps)

        scheduler = ChainedScheduler([warmup_scheduler, cosine_scheduler])
    elif scheduler == "CosineAnnealingLR" and not is_warm:
        scheduler = CosineAnnealingLR(optimizer, T_max=num_epochs)
    elif scheduler is None and is_warm:
        scheduler = LambdaLR(optimizer, lr_lambda=warmup_lr_lambda)

    # elif scheduler =="StepLR":
    #     scheduler = getattr(optim.lr_scheduler, scheduler)(optimizer, step_size=10, gamma=0.1)
    elif scheduler is None and not is_warm:
        pass


    # loss function
    criterion = nn.CrossEntropyLoss()

    # training loop
    # training loop
    net.train()

    outputs = []

    early_stopper = EarlyStopper(patience=5, min_delta=0.02)

    for epoch in range(num_epochs):
        epoch_t = 0
        running_loss = 0.0
        for i, (img, target) in enumerate(loader["train"]):
            s = time()
            img, target = img.to(device), target.to(device)

            pred = net(img)
            loss = criterion(pred, target)

            optimizer.zero_grad()
            loss.backward()
            if scheduler is not None or is_warm:
                
                scheduler.step()
            else:
                optimizer.step()

            e = time()
            epoch_t += e - s
            if i ==0:
                output = f"time: {e-s:.3f} seconds"
                print(output)
            

            # print statistics
            running_loss += loss.item()
            if i % print_every == print_every - 1:
                output = f"[epoch={epoch + 1:3d}, iter={i + 1:5d}] loss: {running_loss / print_every:.3f} epoch time: {epoch_t:.3f} seconds"
                print(output)
                outputs.append(output)
                
                with open(file_name, "w") as f:
                    for out in outputs:
                        f.write(out + "\n")
                running_loss = 0.0
                epoch_t = 0
                # early stopping check 
                val_acc = evaluate_accuracy(net)
                early_stopper.check_stop(val_acc)
                if early_stopper.stop:
                    print(f"Early stopping epoch={epoch + 1:3d}, iter={i + 1:5d}")
                    break
        if early_stopper.stop:
            break
    print("Finished Training")
    return outputs


## Evaluating its accuracy

## run

In [18]:
optim_dict= {
    "SGD":{"lr": 0.1, "momentum": 0.9, "weight_decay": 5e-4},
    "AdamW":{"lr": 0.1, 'betas':(0.9, 0.999), "weight_decay": 5e-4}
}

In [None]:
net = VisionTransformer(
                        num_classes=10,
                            img_size=32,
                            patch_size=4,
                            hidden_dim=192,
                            num_layers=6,
                            num_heads=6,
                            mlp_dim=768,
                            dropout=0.1,
                            attn_dropout=0.1,
                            representation_size=None)

# move to device
net.to(device)

# print the number of parameters
print(f"number of parameters: {sum(p.numel() for p in net.parameters() if p.requires_grad) / 1_000_000:.2f}M")


file_name = "Vit_output.txt"
optim_name =  "AdamW"
optim_kwargs = optim_dict[optim_name]



ops = train(net,file_name,optim_name,optim_kwargs,'CosineAnnealingLR',is_warm=False)


ops.append(evaluate_accuracy(net))
with open(file_name, "w") as f:
    for out in ops:
        f.write(out + "\n")
 
# TODO： 1. adamw 2 warmup 3 consine 4 big batch size 



number of parameters: 2.69M




time: 0.502 seconds
[epoch=  1, iter=  200] loss: 2.303 epoch time: 90.307 seconds


KeyboardInterrupt: 

In [30]:
# TODO： global average pooling
layer_dict = {
    0: {'in_channels': 3, 'out_channels': 128, 'kernel_size': 3, 'Is_reg':False, 'Is_BN': True, 'stride': 1, 'padding': 1, 'Is_res': True},
    1: {'in_channels': 128, 'out_channels': 256, 'kernel_size': 3, 'Is_reg': False, 'Is_BN': True, 'stride': 1, 'padding': 1, 'Is_res': True},
    2: {'in_channels': 256, 'out_channels': 512, 'kernel_size': 3, 'Is_reg': False, 'Is_BN': True, 'stride': 1, 'padding': 1, 'Is_res': True},
    3: {'in_channels': 512, 'out_channels': 1024, 'kernel_size': 3, 'Is_reg': False, 'Is_BN': True, 'stride': 1, 'padding': 1, 'Is_res': True},
    4: {'in_channels': 1024, 'out_channels': 512, 'kernel_size': 3, 'Is_reg': True, 'Is_BN': True, 'stride': 1, 'padding': 1, 'Is_res': True},
    5: {'in_channels': 512, 'out_channels': 256, 'kernel_size': 3, 'Is_reg': True, 'Is_BN': True, 'stride': 1, 'padding': 1, 'Is_res': True},
    6: {'in_channels': 256, 'out_channels': 128, 'kernel_size': 3, 'Is_reg': True, 'Is_BN': True, 'stride': 1, 'padding': 1, 'Is_res': True},
}
net  =nn.Sequential(
ConvBs(7, layer_dict),
nn.Flatten(),
nn.Linear(128 * 4 * 4, 256), nn.ReLU(inplace=True), nn.Dropout(0.5),
nn.Linear(256, 128), nn.ReLU(inplace=True), nn.Dropout(0.5),
nn.Linear(128, 10),
)
# move to device
net.to(device)

# print the number of parameters
print(f"number of parameters: {sum(p.numel() for p in net.parameters() if p.requires_grad) / 1_000_000:.2f}M")



file_name = "BN_output.txt"
optim_name = "SGD" # AdamW
optim_kwargs = {"lr": 0.1, "momentum": 0.9, "weight_decay": 5e-4}

ops = train(net,file_name,optim_name,optim_kwargs,'CosineAnnealingLR',is_warm=True)

ops.append(evaluate_accuracy(net))
with open(file_name, "w") as f:
    for out in ops:
        f.write(out + "\n")

number of parameters: 14.33M




time: 12.133 seconds


: 

In [None]:
# intial network architecture

net = nn.Sequential(
    nn.Conv2d(3, 128, 3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(2), nn.Dropout(0.3),
    nn.Conv2d(128, 256, 3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(2), nn.Dropout(0.3),
    nn.Conv2d(256, 512, 3, padding=1), nn.ReLU(inplace=True),
    nn.Conv2d(512, 512, 3, padding=1), nn.ReLU(inplace=True),
    nn.Conv2d(512, 256, 3, padding=1), nn.ReLU(inplace=True), nn.MaxPool2d(2), nn.Dropout(0.3),
    nn.Flatten(),
    nn.Linear(256 * 4 * 4, 512), nn.ReLU(inplace=True), nn.Dropout(0.5),
    nn.Linear(512, 256), nn.ReLU(inplace=True), nn.Dropout(0.5),
    nn.Linear(256, 128), nn.ReLU(inplace=True), nn.Dropout(0.5),
    nn.Linear(128, 10),
)

# move to device
net.to(device)

# print the number of parameters
print(f"number of parameters: {sum(p.numel() for p in net.parameters() if p.requires_grad) / 1_000_000:.2f}M")


file_name = "init_output.txt"

optim_name = "SGD" # AdamW
optim_kwargs = {"lr": 0.1, "momentum": 0.9, "weight_decay": 5e-4}

ops = train(net,file_name,optim_name,optim_kwargs)
ops.append(evaluate_accuracy(net))
with open(file_name, "w") as f:
    for out in ops:
        f.write(out + "\n")

In [11]:
from torchvision.models import ResNet18_Weights, resnet18
net = resnet18()

# move to device
net.to(device)

# print the number of parameters
print(f"number of parameters: {sum(p.numel() for p in net.parameters() if p.requires_grad) / 1_000_000:.2f}M")



file_name = "test_output.txt"

optim_name = "SGD" # AdamW
optim_kwargs = {"lr": 0.1, "momentum": 0.9, "weight_decay": 5e-4}

ops = train(net,file_name,optim_name,optim_kwargs,'CosineAnnealingLR')

number of parameters: 11.69M


KeyboardInterrupt: 

数据增强，
优化器，
lr策略：warmup +cons
模型：res，BN，
正则： maxpool ，dropout

todo： early stop， global average