In [1]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms, datasets, models
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from tqdm import tqdm

In [2]:
base_dir = '/workspace'
sets = ['training_set', 'test_set']
# 加载预训练的ResNet50模型并修改最后一层
model = models.resnet50(weights=models.ResNet50_Weights.DEFAULT)
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 2)  # 2个类别：0猫1狗
# 如果没 GPU 就 CPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)
# 定义损失函数和优化器
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)
# TensorBoard
writer = SummaryWriter()
# 注册一个钩子以获取第一个卷积层的特征图
feature_images = []
num_epochs = 100

In [3]:
data_transforms = {
    'training_set': transforms.Compose([
        transforms.RandomResizedCrop(224),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test_set': transforms.Compose([
        transforms.Resize(256),
        transforms.CenterCrop(224),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
}

In [4]:
image_datasets = {x: datasets.ImageFolder(os.path.join(base_dir, x),
                                          data_transforms[x])
                  for x in sets}
dataloaders = {x: DataLoader(image_datasets[x], batch_size=32,
                             shuffle=True, num_workers=4)
               for x in sets}

FileNotFoundError: [Errno 2] No such file or directory: '/workspace/training_set'

In [5]:
def get_features_hook(module, input, output):
    global feature_images
    feature_images = output


hook = model.conv1.register_forward_hook(get_features_hook)

In [ ]:
def main():
    for epoch in range(num_epochs):
        print(f'Epoch {epoch + 1}/{num_epochs}')
        print('-' * 10)

        for phase in sets:
            if phase == 'training_set':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'training_set'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'training_set':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / len(image_datasets[phase])
            epoch_acc = running_corrects.double() / len(image_datasets[phase])

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            # TensorBoard
            writer.add_scalar(f'{phase} Loss', epoch_loss, epoch)
            writer.add_scalar(f'{phase} Acc', epoch_acc, epoch)

            if phase == 'test_set':
                print(f"Summary for Epoch {epoch + 1}:")
                print(f"  Test Loss: {epoch_loss:.4f}, Test Accuracy: {epoch_acc:.4f}")
                if epoch_acc > 0.95:
                    print("Performance is satisfactory.")
                else:
                    print("Consider improving the model or tuning hyperparameters.")

        if (epoch + 1) % 5 == 0:  # 每5个epoch保存一次模型
            torch.save(model.state_dict(), f'cat_dog_classifier_epoch{epoch + 1}.pth')
            print(f"Model saved at epoch {epoch + 1} successfully!")


if __name__ == '__main__':
    main()

writer.close()
# 移除钩子
hook.remove()