In [None]:
import torch
from torchvision import datasets, models, transforms

In [None]:
# --- Optimization switches ---------------------------------------------------
AUTOCAST_FLAG = False  # run forward passes inside torch.autocast
COMPILE_FLAG = False   # wrap the model with torch.compile

# --- DataLoader options ------------------------------------------------------
batch_size, eval_batch_size = 100, 1000  # train / evaluation batch sizes
num_workers = 2
PIN_MEMORY_FLAG = True
PERSISTENT_WORKERS_FLAG = True

# --- Training schedule -------------------------------------------------------
epochs = 5
# SGD optimizer hyperparameters
lr = 0.1
momentum = 0.9
weight_decay = 0.0001
# StepLR scheduler: the learning rate is multiplied by `gamma` every
# `step_size` scheduler steps (the training loop steps it once per epoch)
step_size = 25
gamma = 0.2

In [None]:
# Directory handed to torchvision as the dataset root.
root = '~/.pytorch/datasets/'

# Use the highest-indexed visible GPU when CUDA is present, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device(f'cuda:{torch.cuda.device_count() - 1}')
    # Make the chosen GPU the current CUDA device and let cuDNN benchmark
    # its convolution algorithms.
    torch.cuda.set_device(device)
    torch.backends.cudnn.benchmark = True
else:
    device = torch.device('cpu')
print(f'Device: {device}, Type: {device.type}')

In [None]:
def load_data(
    root: str,
    batch_size: int,
    eval_batch_size: int,
    num_workers: int = 2,
    PIN_MEMORY_FLAG: bool = True,
    PERSISTENT_WORKERS_FLAG: bool = True,
):
    """Build the CIFAR-100 train/test datasets and their dataloaders.

    Args:
        root: Directory the datasets are stored in (downloaded there if missing).
        batch_size: Batch size of the shuffled training loader.
        eval_batch_size: Batch size of the (unshuffled) test loader.
        num_workers: Number of worker processes used by each loader.
        PIN_MEMORY_FLAG: Forwarded to ``DataLoader(pin_memory=...)``.
        PERSISTENT_WORKERS_FLAG: Forwarded to ``DataLoader(persistent_workers=...)``;
            ignored when ``num_workers`` is 0, because ``DataLoader`` rejects
            persistent workers without worker processes.

    Returns:
        ``(dataset, dataloader)``: two dicts, each keyed by ``'train'`` and ``'test'``.
    """
    # Per-channel statistics given on the 0-255 scale, divided by 255.
    # for cifar-10: mean (125.3, 123.0, 113.9), std (63.0, 62.1, 66.7)
    # for cifar-100
    mean = torch.tensor([129.3, 124.1, 112.4]) / 255
    std = torch.tensor([68.2, 65.4, 70.4]) / 255

    transform = {
        # Normalize runs before the padded random crop, so the constant padding
        # is inserted into already-normalized images.
        'train': transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean, std),
            transforms.RandomCrop(32, padding=4, padding_mode='constant'),
            transforms.RandomHorizontalFlip()
        ]),
        'eval': transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean, std)
        ])
    }

    dataset = {
        'train': datasets.CIFAR100(
            root=root, train=True, download=True, transform=transform['train']
        ),
        'test': datasets.CIFAR100(
            root=root, train=False, download=True, transform=transform['eval']
        )
    }

    # Options shared by both loaders. persistent_workers is only valid with at
    # least one worker, so it is switched off for num_workers == 0 instead of
    # letting DataLoader raise.
    loader_options = dict(
        num_workers=num_workers,
        pin_memory=PIN_MEMORY_FLAG,
        persistent_workers=bool(PERSISTENT_WORKERS_FLAG and num_workers > 0),
    )

    dataloader = {
        'train': torch.utils.data.DataLoader(
            dataset['train'],
            batch_size=batch_size,
            shuffle=True,
            **loader_options
        ),
        'test': torch.utils.data.DataLoader(
            dataset['test'],
            batch_size=eval_batch_size,
            **loader_options
        )
    }

    return dataset, dataloader

In [None]:
# Models
## Original Model
class BasicBlock(torch.nn.Module):
    """Residual block made of two 3x3 conv + batch-norm pairs.

    With ``down=True`` the first convolution uses stride 2 and the shortcut is a
    strided 1x1 convolution followed by batch norm; otherwise the input itself
    is the shortcut.
    """

    def __init__(self, inplanes: int, planes: int, down: bool = False) -> None:
        super().__init__()
        self.down = down
        if down:
            # Strided variant of the first convolution.
            self.conv1 = torch.nn.Conv2d(inplanes, planes, 3, stride=2, padding=1, bias=False)
        else:
            self.conv1 = torch.nn.Conv2d(inplanes, planes, 3, padding='same', bias=False)
        self.bn1 = torch.nn.BatchNorm2d(planes)
        self.relu = torch.nn.ReLU(inplace=True)
        self.conv2 = torch.nn.Conv2d(planes, planes, 3, padding='same', bias=False)
        self.bn2 = torch.nn.BatchNorm2d(planes)
        if down:
            # Projection that brings the shortcut to the main branch's shape.
            self.downsample = torch.nn.Sequential(
                torch.nn.Conv2d(inplanes, planes, 1, stride=2, bias=False),
                torch.nn.BatchNorm2d(planes),
            )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))
        shortcut = self.downsample(x) if self.down else x
        residual += shortcut
        return self.relu(residual)

class BottleNeck(torch.nn.Module):
    """Placeholder for a bottleneck residual block; it defines no layers and no forward."""
    def __init__(self) -> None:
        super().__init__()
        # Not implemented yet, because the CIFAR ResNet in this notebook does not use it.

class CIFAR_ResNet(torch.nn.Module):
    """ResNet with a conv stem and three stages of three ``BasicBlock``s each.

    Channel width starts at 16 and doubles at the second and third stage, both of
    which begin with a downsampling block. A global average pool and a linear
    layer produce ``num_classes`` logits.
    """

    def __init__(self, num_classes: int = 100) -> None:
        super().__init__()
        self.inplanes = 16
        self.planes = self.inplanes
        self.stem = torch.nn.Sequential(
            torch.nn.Conv2d(3, self.inplanes, kernel_size=3, padding='same', bias=False),
            torch.nn.BatchNorm2d(self.inplanes),
            torch.nn.ReLU(inplace=True),
        )
        self.layer1 = self._stage(down=False)
        # Entering the next stage: its input width is the previous output width,
        # and its output width doubles.
        self.inplanes, self.planes = self.planes, self.planes * 2
        self.layer2 = self._stage(down=True)
        self.inplanes, self.planes = self.planes, self.planes * 2
        self.layer3 = self._stage(down=True)
        self.avgpool = torch.nn.AdaptiveAvgPool2d(output_size=(1, 1))
        # self.planes is 64 at this point (16 doubled twice).
        self.fc = torch.nn.Linear(self.planes, num_classes)

    def _stage(self, down: bool) -> torch.nn.Sequential:
        """Build three blocks: ``inplanes -> planes`` first, then two ``planes -> planes``."""
        blocks = [BasicBlock(self.inplanes, self.planes, down=down)]
        blocks.extend(BasicBlock(self.planes, self.planes) for _ in range(2))
        return torch.nn.Sequential(*blocks)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map a batch of images to class logits."""
        features = self.layer3(self.layer2(self.layer1(self.stem(x))))
        pooled = torch.flatten(self.avgpool(features), 1)
        return self.fc(pooled)

## Lazy Model
class LazyBasicBlock(torch.nn.Module):
    """Residual block built from lazy layers, so only the output width is given.

    Mirrors the two-conv residual layout: with ``down=True`` the first
    convolution uses stride 2 and the shortcut is a strided 1x1 lazy convolution
    followed by lazy batch norm; otherwise the input itself is the shortcut.
    """

    def __init__(self, planes: int, down: bool = False) -> None:
        super().__init__()
        self.down = down
        if down:
            # Strided variant of the first convolution.
            self.conv1 = torch.nn.LazyConv2d(planes, 3, stride=2, padding=1, bias=False)
        else:
            self.conv1 = torch.nn.LazyConv2d(planes, 3, padding='same', bias=False)
        self.bn1 = torch.nn.LazyBatchNorm2d()
        self.relu = torch.nn.ReLU(inplace=True)
        self.conv2 = torch.nn.LazyConv2d(planes, 3, padding='same', bias=False)
        self.bn2 = torch.nn.LazyBatchNorm2d()
        if down:
            # Projection that brings the shortcut to the main branch's shape.
            self.downsample = torch.nn.Sequential(
                torch.nn.LazyConv2d(planes, 1, stride=2, bias=False),
                torch.nn.LazyBatchNorm2d(),
            )

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return ``relu(main_branch(x) + shortcut(x))``."""
        # The main branch runs before the shortcut projection, which keeps the
        # order in which the lazy layers are first called.
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))
        shortcut = self.downsample(x) if self.down else x
        residual += shortcut
        return self.relu(residual)

class LazyBottleNeck(torch.nn.Module):
    """Placeholder for a lazy bottleneck residual block; it defines no layers and no forward."""
    def __init__(self) -> None:
        super().__init__()
        # Not implemented yet, because the CIFAR ResNet in this notebook does not use it.

class Lazy_CIFAR_ResNet(torch.nn.Module):
    """Lazy-layer ResNet: a conv stem and three stages of three ``LazyBasicBlock``s.

    Channel width starts at 16 and doubles at the second and third stage, both of
    which begin with a downsampling block. A global average pool and a lazy
    linear layer produce ``num_classes`` logits.
    """

    def __init__(self, num_classes: int = 100) -> None:
        super().__init__()
        self.planes = 16
        self.stem = torch.nn.Sequential(
            torch.nn.LazyConv2d(self.planes, kernel_size=3, padding='same', bias=False),
            torch.nn.LazyBatchNorm2d(),
            torch.nn.ReLU(inplace=True),
        )
        self.layer1 = self._stage(down=False)
        self.planes *= 2
        self.layer2 = self._stage(down=True)
        self.planes *= 2
        self.layer3 = self._stage(down=True)
        self.avgpool = torch.nn.AdaptiveAvgPool2d(output_size=(1, 1))
        self.fc = torch.nn.LazyLinear(num_classes)

    def _stage(self, down: bool) -> torch.nn.Sequential:
        """Build three blocks of width ``self.planes``; only the first may downsample."""
        blocks = [LazyBasicBlock(self.planes, down=down)]
        blocks.extend(LazyBasicBlock(self.planes) for _ in range(2))
        return torch.nn.Sequential(*blocks)

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Map a batch of images to class logits."""
        features = self.layer3(self.layer2(self.layer1(self.stem(x))))
        pooled = torch.flatten(self.avgpool(features), 1)
        return self.fc(pooled)

In [None]:
# Forward every dataloader setting from the configuration cell; otherwise
# load_data silently falls back to its own defaults and editing the
# configuration has no effect.
dataset, dataloader = load_data(
    root=root,
    batch_size=batch_size,
    eval_batch_size=eval_batch_size,
    num_workers=num_workers,
    PIN_MEMORY_FLAG=PIN_MEMORY_FLAG,
    PERSISTENT_WORKERS_FLAG=PERSISTENT_WORKERS_FLAG,
)

In [None]:
# Alternative kept for reference: torchvision's ImageNet ResNet-18 with the
# max-pool removed and a 100-way classifier. It gave poor accuracy.
#model = models.resnet18()
#model.maxpool = torch.nn.Sequential()
#model.fc = torch.nn.Linear(in_features=512, out_features=100, bias=True)
#model = model.to(device)

model = Lazy_CIFAR_ResNet()
model = model.to(device)

# Lazy layers have no concrete parameters until their first forward pass.
# Run one dummy CIFAR-sized batch now (after device placement) so the model is
# fully initialized before it is compiled or handed to an optimizer.
# Eval mode + no_grad keeps the dry run from updating batch-norm running
# statistics or recording an autograd graph.
model.eval()
with torch.no_grad():
    model(torch.zeros(1, 3, 32, 32, device=device))
model.train()

In [None]:
# Gradient scaling is switched on only when autocast is requested on a CUDA
# device; otherwise the scaler is created disabled.
scaler = torch.cuda.amp.GradScaler(
    enabled=bool(device.type == 'cuda' and AUTOCAST_FLAG)
)

# torch.compile is always called, and turned into a no-op through `disable`
# when COMPILE_FLAG is off.
# compile_mode: 'default', 'reduce-overhead', 'max-autotune'
model = torch.compile(
    model,
    mode='default',
    fullgraph=True,
    disable=not COMPILE_FLAG,
)

In [None]:
# Classification loss.
criterion = torch.nn.CrossEntropyLoss()

# SGD with momentum and weight decay over every model parameter.
optimizer = torch.optim.SGD(
    model.parameters(), lr=lr, momentum=momentum, weight_decay=weight_decay
)

# Step decay of the learning rate, driven by `step_size` and `gamma`.
scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=step_size,
    gamma=gamma,
)

In [None]:
def train_step(model, dataset, dataloader, AUTOCAST_FLAG=False):
    """Train ``model`` for one pass over ``dataloader``.

    Uses the notebook-level ``device``, ``optimizer``, ``criterion`` and ``scaler``.

    Args:
        model: Network to optimize; switched to training mode.
        dataset: Dataset behind ``dataloader``; only its length is used, as the
            accuracy denominator.
        dataloader: Iterable of batches whose first two items are inputs and labels.
        AUTOCAST_FLAG: Whether the forward pass runs under ``torch.autocast``.

    Returns:
        ``(loss, accuracy)`` as Python floats: the per-sample mean loss over the
        samples seen, and the number of correct predictions divided by
        ``len(dataset)``.
    """
    model.train()
    # Running totals live on the device so the host only synchronizes once, at
    # the end of the epoch, instead of on every step.
    loss_sum = torch.zeros((), dtype=torch.float64, device=device)
    num_correct = torch.zeros((), dtype=torch.long, device=device)
    num_seen = 0
    for data in dataloader:
        # load data
        inputs = data[0].to(device, non_blocking=True)
        labels = data[1].to(device, non_blocking=True)
        # compute
        optimizer.zero_grad()
        with torch.autocast(device.type, enabled=AUTOCAST_FLAG):
            outputs = model(inputs)
            loss = criterion(outputs, labels)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        # record
        # Weight each batch loss by its size so a smaller final batch does not
        # skew the epoch average (assumes `criterion` returns the batch mean).
        batch_len = labels.size(0)
        predict_labels = torch.max(outputs, dim=1).indices
        loss_sum += loss.detach() * batch_len
        num_correct += torch.sum(labels == predict_labels)
        num_seen += batch_len
    record_loss = loss_sum.item() / num_seen
    record_acc = num_correct.item() / len(dataset)
    return record_loss, record_acc

def eval_step(model, dataset, dataloader, AUTOCAST_FLAG=False):
    """Evaluate ``model`` over ``dataloader`` without computing gradients.

    Uses the notebook-level ``device`` and ``criterion``.

    Args:
        model: Network to evaluate; switched to evaluation mode.
        dataset: Dataset behind ``dataloader``; only its length is used, as the
            accuracy denominator.
        dataloader: Iterable of batches whose first two items are inputs and labels.
        AUTOCAST_FLAG: Whether the forward pass runs under ``torch.autocast``.

    Returns:
        ``(loss, accuracy)`` as Python floats: the per-sample mean loss over the
        samples seen, and the number of correct predictions divided by
        ``len(dataset)``.
    """
    model.eval()
    # Running totals live on the device so the host only synchronizes once, at
    # the end of the pass, instead of on every batch.
    loss_sum = torch.zeros((), dtype=torch.float64, device=device)
    num_correct = torch.zeros((), dtype=torch.long, device=device)
    num_seen = 0
    with torch.no_grad():
        for data in dataloader:
            # load data
            inputs = data[0].to(device, non_blocking=True)
            labels = data[1].to(device, non_blocking=True)
            # compute
            with torch.autocast(device.type, enabled=AUTOCAST_FLAG):
                outputs = model(inputs)
                loss = criterion(outputs, labels)
            # record
            # Weight each batch loss by its size so a smaller final batch does
            # not skew the average (assumes `criterion` returns the batch mean).
            batch_len = labels.size(0)
            predict_labels = torch.max(outputs, dim=1).indices
            loss_sum += loss * batch_len
            num_correct += torch.sum(labels == predict_labels)
            num_seen += batch_len
    record_loss = loss_sum.item() / num_seen
    record_acc = num_correct.item() / len(dataset)
    return record_loss, record_acc

In [None]:
def timed(fn):
    """Call ``fn()`` and measure how long it takes.

    With CUDA available the call is bracketed by CUDA events and the device is
    synchronized before the elapsed time is read. Without CUDA, where CUDA
    events cannot be used, a wall-clock timer is used instead.

    Args:
        fn: Zero-argument callable to run.

    Returns:
        ``(result, seconds)``: the value returned by ``fn`` and the elapsed time
        in seconds as a float.
    """
    if not torch.cuda.is_available():
        import time

        begin = time.perf_counter()
        result = fn()
        return result, time.perf_counter() - begin

    start = torch.cuda.Event(enable_timing=True)
    end = torch.cuda.Event(enable_timing=True)
    start.record()
    result = fn()
    end.record()
    # Wait for the recorded work to finish before reading the event times.
    torch.cuda.synchronize()
    # elapsed_time reports milliseconds.
    return result, start.elapsed_time(end) / 1000

In [None]:
for epoch in range(epochs):
    # Only the training pass is timed; evaluation runs outside the timer.
    result, time_cost = timed(
        lambda: train_step(model, dataset['train'], dataloader['train'], AUTOCAST_FLAG)
    )
    train_loss, train_acc = result
    test_loss, test_acc = eval_step(
        model, dataset['test'], dataloader['test'], AUTOCAST_FLAG
    )

    # One report per epoch, one field per line.
    print(
        '----',
        f'epoch {epoch}',
        f'AUTOCAST: {AUTOCAST_FLAG}, COMPILE: {COMPILE_FLAG}',
        f'time_cost: {time_cost}',
        f'batch_size: {batch_size}',
        f'learning_rate: {scheduler.get_last_lr()}',
        f'train_loss: {train_loss}, train_acc: {train_acc}',
        f'test_loss: {test_loss}, test_acc: {test_acc}',
        '----',
        sep='\n',
    )

    # Advance the learning-rate schedule after the epoch has been reported.
    scheduler.step()