In [12]:
%%writefile data_setup.py
"""
load data and preprocess
create dataloader
visualize data
"""
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt
import os
from pathlib import Path
import requests
import zipfile

NUM_WORKERS = os.cpu_count()
    
# create dataloader
def create_dataLoader(train_dir, test_dir, transforms, batch_size, num_workers=NUM_WORKERS):
    """
    create train and test dataloader

    Arg:
        train_dir: train data directory
        test_dir: test data directory
        transforms: torchvision transform to perform train and test data
        batch_size:number of sample per batch in DataLoader.
        num_workers: number of workers per DataLoader.

    Returns:
        A tuple of (train_dataloader, test_dataloader, class_names)
        class_names is a list of target classes
    """


    # load data
    train_data = datasets.ImageFolder(
        root=train_dir,
        transform=transforms
        )
    
    test_data = datasets.ImageFolder(
        root=test_dir,
        transform=transforms
    )

    class_names = train_data.classes

    # turn dataset to dataloader
    train_dataloader = DataLoader(
        dataset=train_data,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True
    )

    test_dataloader = DataLoader(
        dataset=test_data,
        batch_size=batch_size,
        shuffle=False,
        num_workers=num_workers,
        pin_memory=True
    )
    return train_dataloader, test_dataloader, class_names

def vizData():
    return 0

Overwriting data_setup.py


In [13]:
%%writefile train.py
"""
function for train and evaluate model
"""
from typing import Tuple, List, Dict
import torch
import torch.types
from tqdm.auto import tqdm

def train_step(
        model: torch.nn.Module,
        train_dataloader: torch.utils.data.DataLoader,
        loss_fn: torch.nn.Module,
        optimizer: torch.optim.Optimizer,
        device: torch.device) -> Tuple[float,float]:
    """
    trains a pytorch model for a single epoch

    train mode -> forward -> loss -> backward -> gd

    Args:
    model: A PyTorch model to be trained.
    dataloader: A DataLoader instance for the model to be trained on.
    loss_fn: A PyTorch loss function to minimize.
    optimizer: A PyTorch optimizer to help minimize the loss function.
    device: A target device to compute on (e.g. "cuda" or "cpu").

    Returns:
    A tuple of training loss and training accuracy metrics.
    In the form (train_loss, train_accuracy).
    """
    model.train()
    train_loss, train_acc = 0,0
    for batch, (X, y) in enumerate(train_dataloader):
        # data to target device
        X, y = X.to(device), y.to(device)

        # forward
        train_logits = model(X)

        # loss and acc
        loss = loss_fn(train_logits, y)
        train_loss += loss.item()
        train_label = train_logits.argmax(dim=1)
        train_acc += (train_label == y).sum().item()/len(y)

        # zero grad
        optimizer.zero_grad()
        # backward
        loss.backward()
        # gd
        optimizer.step()
    
    train_loss /= len(train_dataloader)
    train_acc /= len(train_dataloader)

    return train_loss, train_acc


def test_step(
        model:torch.nn.Module,
        test_dataloader:torch.utils.data.DataLoader,
        loss_fn:torch.nn.Module,
        device:torch.device) -> Tuple[float, float]:
    
    """
    test a pytorch model for a single epoch

    eval mode -> forward -> loss

    Args:
    model: A PyTorch model to be tested.
    dataloader: A DataLoader instance for the model to be tested on.
    loss_fn: A PyTorch loss function.
    device: A target device to compute on (e.g. "cuda" or "cpu").

    Returns:
    A tuple of testing loss and testing accuracy metrics.
    In the form (test_loss, test_accuracy).
    """
    # turn off gradient tracking
    model.eval()
    test_loss, test_acc = 0,0
    with torch.inference_mode():        
        for batch, (X, y) in enumerate(test_dataloader):
            X, y = X.to(device), y.to(device)

            test_logits = model(X)
            # test loss
            
            loss = loss_fn(test_logits, y)
            test_loss += loss.item()

            test_label = test_logits.argmax(dim=1)
            # test acc
            test_acc += (test_label==y).sum().item()/len(y)
        
        test_loss /= len(test_dataloader)
        test_acc /= len(test_dataloader)

    return test_loss, test_acc


def train(
        model:torch.nn.Module,
        train_dataloader:torch.utils.data.DataLoader,
        test_dataloader:torch.utils.data.DataLoader,
        loss_fn:torch.nn.Module,
        optimizer:torch.optim.Optimizer,
        epochs:int,
        device:torch.device) -> Dict[str, List[float]]:
    """Trains and tests a PyTorch model.

    Passes a target PyTorch models through train_step() and test_step()
    functions for a number of epochs, training and testing the model
    in the same epoch loop.

    Calculates, prints and stores evaluation metrics throughout.

    Args:
    model: A PyTorch model to be trained and tested.
    train_dataloader: A DataLoader instance for the model to be trained on.
    test_dataloader: A DataLoader instance for the model to be tested on.
    optimizer: A PyTorch optimizer to help minimize the loss function.
    loss_fn: A PyTorch loss function to calculate loss on both datasets.
    epochs: An integer indicating how many epochs to train for.
    device: A target device to compute on (e.g. "cuda" or "cpu").

    Returns:
    A dictionary of training and testing loss as well as training and
    testing accuracy metrics. Each metric has a value in a list for 
    each epoch.
    In the form: {train_loss: [...],
              train_acc: [...],
              test_loss: [...],
              test_acc: [...]}
    """

    # result dictionary
    result = {
        'train_loss':[],
        'train_acc':[],
        'test_loss':[],
        'test_acc':[]}
    
    for epoch in tqdm(range(epochs)):
        # train
        train_loss, train_acc = train_step(model, train_dataloader, loss_fn, optimizer, device)

        # test
        test_loss, test_acc = test_step(model, test_dataloader, loss_fn, device)

        # print result
        print(f'Epoch [{epoch+1}/{epochs}] | train loss: {train_loss:.4f}, train acc: {train_acc*100:.2f}% | test loss: {test_loss:.4f}, test acc: {test_acc*100:.2f}%')
        
        # save result
        result['train_loss'].append(train_loss)
        result['train_acc'].append(train_acc)
        result['test_loss'].append(test_loss)
        result['test_acc'].append(test_acc)
    
    return result




Overwriting train.py


In [14]:
%%writefile ResNet.py
"""
paper and code replicating
model file
"""
import torch
from torch import nn
import numpy as np

# 残差块类
class block(nn.Module):
    def __init__(self, in_channels, intermediate_channels, indentity_downsample=None, stride=1):
        super(block, self).__init__()
        self.expansion = 4 # 输出的channel数是进入时的4倍
        self.conv1 = nn.Conv2d(in_channels, intermediate_channels, kernel_size=1, stride=1, padding=0)
        self.bn1 = nn.BatchNorm2d(intermediate_channels)
        self.conv2 = nn.Conv2d(intermediate_channels, intermediate_channels, kernel_size=3, stride=stride, padding=1)
        self.bn2 = nn.BatchNorm2d(intermediate_channels)
        self.conv3 = nn.Conv2d(intermediate_channels, intermediate_channels*self.expansion, kernel_size=1, stride=1, padding=0)
        self.bn3 = nn.BatchNorm2d(intermediate_channels*self.expansion)
        self.relu = nn.ReLU()
        self.indentity_downsample = indentity_downsample

    def forward(self, x):
        identity = x

        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.conv2(x)
        x = self.bn2(x)
        x = self.relu(x)
        x = self.conv3(x)
        x = self.bn3(x)
        
        if self.indentity_downsample is not None: # 第一块残差块会高宽减半，减少特征图的尺寸，后续的残差块就不会
            identity = self.indentity_downsample(identity)

        x += identity
        x = self.relu(x)
        return x

# resnet类
class ResNet(nn.Module):
    def __init__(self, block, layers, image_channels, num_classes): # [残差块数量， 残差块层数， input channels, 输出类别数量] eg. resnet-50: layers-> [3,4,6,3]
        super(ResNet, self).__init__()
        # conv1: change image channel to 64
        self.in_channels = 64 # 进入残差块的channel
        self.conv1 = nn.Conv2d(image_channels, self.in_channels, kernel_size=7, stride=2, padding=3)
        self.bn1 = nn.BatchNorm2d(self.in_channels)
        self.relu = nn.ReLU()
        self.maxpool = nn.MaxPool2d(kernel_size=3, stride=2, padding=1)

        # resnet layer(conv2-5)
        self.layer1 = self._make_layer(block, layers[0], intermediate_channels=64, stride=1)
        self.layer2 = self._make_layer(block, layers[1], intermediate_channels=128, stride=2)
        self.layer3 = self._make_layer(block, layers[2], intermediate_channels=256, stride=2)
        self.layer4 = self._make_layer(block, layers[3], intermediate_channels=512, stride=2) # 最后的intermediateput_channel=512*4
        # average pool and fully connect
        self.avgpool = nn.AdaptiveAvgPool2d((1,1))
        self.fc = nn.Linear(512*4, num_classes)

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        x = self.layer4(x)
        x = self.avgpool(x)
        x = x.reshape(x.shape[0], -1)
        x = self.fc(x)
        return x



    # resnet layer function
    def _make_layer(self, block, num_residual_block, intermediate_channels, stride):
        identity_downsample = None
        layers = []

        if stride != 1 or self.in_channels != intermediate_channels * 4:
            identity_downsample = nn.Sequential(
                nn.Conv2d(self.in_channels, intermediate_channels*4, kernel_size=1, stride=stride),
                nn.BatchNorm2d(intermediate_channels*4)
            )

        layers.append(block(self.in_channels, intermediate_channels, identity_downsample, stride)) # intermediate_channel = 64
        self.in_channels = intermediate_channels*4 # 256

        for i in range(num_residual_block-1):
            # intermediate_channel是64, 但是在最后一次conv后的intermediate_channel会x4
            layers.append(block(self.in_channels, intermediate_channels)) # 256 -> 64 -> 256
        
        return nn.Sequential(*layers)
    
def ResNet50(img_channels=3, num_classes=1000):
    return ResNet(block, [3,4,6,3], img_channels, num_classes)

def ResNet101(img_channels=3, num_classes=1000):
    return ResNet(block, [3,4,23,3], img_channels, num_classes)

def ResNet152(img_channels=3, num_classes=1000):
    return ResNet(block, [3,8,36,3], img_channels, num_classes)


def check_output_shape(num_classes):
    net = ResNet50(num_classes=num_classes)
    x = torch.rand(2,3,224,224)
    y = net(x)
    print(y.shape)


Overwriting ResNet.py


In [15]:
%%writefile utils.py
"""
utils funciton
save model, load model,
"""
from pathlib import Path
import torch
from typing import Dict, List
import matplotlib.pyplot as plt
import pandas as pd

def save_model(model:torch.nn.Module,
               dir:str,
               model_name:str):
    """
    save model to target directory

    Args:
    model: A target PyTorch model to save.
    target_dir: A directory for saving the model to.
    model_name: A filename for the saved model. Should include
      either ".pth" or ".pt" as the file extension.
    """

    # create directory
    path_name = Path(dir)
    path_name.mkdir(parents=True, exist_ok=True)

    # create model path
    assert model_name.endswith('.pth') or model_name.endswith('.pt'), 'model name must ends with .pth or .pt'
    model_path = path_name / model_name

    # save model
    print(f'save model to {model_path}')
    torch.save(obj=model.state_dict(), f=model_path)


def plot_loss_curve(results: Dict[str, List[float]]):
    results = pd.DataFrame(results)
    train_loss = results['train_loss']
    train_acc = results['train_acc']
    test_loss = results['test_loss']
    test_acc = results['test_acc']
    plt.figure(figsize=(15,5))
    epoch = range(len(results))
    plt.subplot(1,2,1)
    plt.plot(epoch, train_loss, label='train loss')
    plt.plot(epoch, test_loss, label='test loss')
    plt.title('Loss')
    plt.xlabel('epoch')
    plt.legend()

    plt.subplot(1,2,2)
    plt.plot(epoch, train_acc, label='train accuracy')
    plt.plot(epoch, test_acc, label='test accuracy')
    plt.title('Accuracy')
    plt.xlabel('epoch')
    plt.legend()


Overwriting utils.py


In [None]:
%%writefile resnet/test.py
"""
main file
1. load data
2. model
3. train
4. test and eval
5. save
"""
import torch
from torch import nn
import os
from torchvision import transforms, datasets
from torchvision.transforms import ToTensor
import data_setup, train, ResNet, utils
from torch.utils.data import DataLoader
import torchvision.models as models
import pandas as pd


# device agnostic code
device = 'cuda' if torch.cuda.is_available() else 'cpu'
# hyperparameters
EPOCHS = 10
BATCH_SIZE = 128
NUM_WORKERS = 4
LR = 0.001
HIDDEN_UNITS = 0



# data transforms
transform = transforms.Compose([
    transforms.Resize((224,224)),
    transforms.ToTensor(),
    # transforms.TrivialAugmentWide(num_magnitude_bins=31) # data augmentation
    # 归一化
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])
])

# load data

train_data = datasets.Food101(
    root='resnet/data/food101',
    split='train',
    transform=transform,
    target_transform=False,
    download=False
)

test_data = datasets.Food101(
    root='resnet/data/food101',
    split='test',
    transform=transform,
    download=False)


train_dataloader = DataLoader(
    dataset=train_data,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=NUM_WORKERS,
    pin_memory=True
)

test_dataloader = DataLoader(
    dataset=test_data,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=NUM_WORKERS,
    pin_memory=True
)
class_names = train_data.classes
#print(class_names)
#print(len(train_dataloader.dataset))

# samll dataset
'''
data_file = "https://github.com/mrdbourke/pytorch-deep-learning/raw/refs/heads/main/data/pizza_steak_sushi.zip"
train_dir = 'resnet/data/pizza_steak_sushi/train'
test_dir = 'resnet/data/pizza_steak_sushi/test'

# train_dataloader, test_dataloader, class_names = data_setup.create_dataLoader(train_dir, test_dir, transform, BATCH_SIZE, NUM_WORKERS)
'''


# load model and pretrained model
pretrained_resnet50 = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
in_features = pretrained_resnet50.fc.in_features  
pretrained_resnet50.fc = nn.Linear(in_features, 101)

resnet50model = ResNet.ResNet50(num_classes=len(class_names)).to(device)
resnet50model.load_state_dict(pretrained_resnet50.state_dict(), strict=False)

# loss func and optimier
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=resnet50model.parameters(), lr=LR)

results = data_setup.train(resnet50model, train_dataloader, test_dataloader, loss_fn, optimizer, EPOCHS, device)
print(results)

utils.plot_loss_curve(results)

utils.save_model(resnet50model, 'resnet/models', 'resnet50model.pth')


  0%|          | 0/10 [00:00<?, ?it/s]

Epoch [1/10] | train loss: 2.6287, train acc: 36.57% | test loss: 1.6784, test acc: 55.95%
Epoch [2/10] | train loss: 1.3851, train acc: 63.76% | test loss: 1.3240, test acc: 64.36%
Epoch [3/10] | train loss: 1.0038, train acc: 72.87% | test loss: 1.1774, test acc: 68.30%
Epoch [4/10] | train loss: 0.7372, train acc: 79.55% | test loss: 1.0877, test acc: 70.97%
Epoch [5/10] | train loss: 0.5380, train acc: 84.61% | test loss: 1.1520, test acc: 70.20%
Epoch [6/10] | train loss: 0.3789, train acc: 88.79% | test loss: 1.1733, test acc: 71.10%
Epoch [7/10] | train loss: 0.2707, train acc: 91.90% | test loss: 1.2840, test acc: 70.34%
Epoch [8/10] | train loss: 0.2075, train acc: 93.63% | test loss: 1.1639, test acc: 72.43%
Epoch [9/10] | train loss: 0.1698, train acc: 94.79% | test loss: 1.2927, test acc: 70.87%
Epoch [10/10] | train loss: 0.1451, train acc: 95.56% | test loss: 1.3143, test acc: 71.28%
{'train_loss': [2.628661970230373, 1.3850915690732968, 1.003831155296113, 0.7372214543960