In [None]:
from __future__ import print_function, division

import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
from torchsummary import summary
import torchvision.transforms.functional as F
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, Subset
import numpy as np
import torchvision
from torchvision import models, transforms
from torchvision.datasets.folder import make_dataset
from PIL import Image
import matplotlib.pyplot as plt
import time
import os
import copy
from sklearn.model_selection import train_test_split
from sklearn.svm import SVC
from sklearn.metrics import classification_report

%matplotlib inline
plt.ion()   # interactive mode

## 1. Loading data


In [None]:
# Define the dataset class
class sg_food_dataset(torch.utils.data.dataset.Dataset):
    """Image-folder dataset restricted to a chosen subset of class folders.

    Every sub-directory of ``root`` is treated as one class. The sub-directory
    names are sorted, and ``class_id`` selects classes by position in that
    sorted list. Selected classes are relabelled 0..len(class_id)-1 in the
    order given by ``class_id``.

    Args:
        root: Directory that contains one sub-directory per class.
        class_id: Positions (into the sorted folder list) of the classes to keep.
        transform: Optional callable applied to each loaded RGB PIL image.

    Raises:
        FileNotFoundError: If ``root`` contains no sub-directory.
        IndexError: If an entry of ``class_id`` is past the number of folders.
    """

    def __init__(self, root, class_id, transform=None):
        self.class_id = class_id
        self.root = root
        all_classes = sorted(entry.name for entry in os.scandir(root) if entry.is_dir())
        if not all_classes:
            # Name the directory that was actually scanned so the error is actionable.
            raise FileNotFoundError(f"Couldn't find any class folder in {root}.")
        self.classes = [all_classes[x] for x in class_id]
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}

        # List of (file path, class index) pairs for the selected classes only.
        self.samples = make_dataset(self.root, self.class_to_idx, extensions=('jpg',))
        self.transform = transform

    def __len__(self):
        """Return the number of (path, label) samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, target)``.

        The image is decoded as RGB and passed through ``transform`` when one
        was supplied.
        """
        path, target = self.samples[idx]
        with open(path, "rb") as f:
            # convert() forces the pixel data to be read before the file closes.
            sample = Image.open(f).convert('RGB')
        if self.transform is not None:
            sample = self.transform(sample)
        return sample, target


In [None]:
# Per-split preprocessing. Both pipelines finish with ToTensor followed by
# channel-wise normalization using the ImageNet statistics.
imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]

# Training: random crop to 224 plus random horizontal flip as augmentation.
train_transform = transforms.Compose([
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(imagenet_mean, imagenet_std),
])

# Validation: deterministic resize to 256 followed by a 224 center crop.
val_transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(imagenet_mean, imagenet_std),
])

data_transforms = {'train': train_transform, 'val': val_transform}

# Location of the data and the folder name of each split below it.
data_dir = '/kaggle/input/sg-food/sg_food'
subfolder = {'train': 'train', 'val': 'val'}

# Positions (in the sorted class-folder list) of the classes used in this notebook.
selected_classes = [0, 2, 4, 7, 9]
n_classes = len(selected_classes)

# One dataset per split, each with its own transform pipeline.
image_datasets = {
    split: sg_food_dataset(
        root=os.path.join(data_dir, subfolder[split]),
        class_id=selected_classes,
        transform=data_transforms[split],
    )
    for split in ('train', 'val')
}
class_names = image_datasets['train'].classes
print('selected classes:\n    id: {}\n    name: {}'.format(selected_classes, class_names))

# One shuffled loader per split; batches are produced in the main process.
batch_size = 64
dataloaders = {
    split: DataLoader(image_datasets[split], batch_size=batch_size, shuffle=True, num_workers=0)
    for split in ('train', 'val')
}

dataset_sizes = {split: len(image_datasets[split]) for split in ('train', 'val')}

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
# Display the torch.device chosen in the previous cell (cuda:0 or cpu).
device

## 2. Visualizing the dataset
Fetch a batch of training data from the data loader and visualize the first few images.



In [None]:
def imshow(inp, title=None):
    """Show a normalized image tensor with matplotlib.

    The tensor is converted to NumPy, its first axis is moved last
    (channels-first to channels-last, assuming a 3-D C x H x W image), the
    ImageNet normalization is undone and the values are clipped to [0, 1].

    Args:
        inp: CPU tensor holding the normalized image (``.numpy()`` is called on it).
        title: Optional figure title; skipped when ``None``.
    """
    channel_mean = np.array([0.485, 0.456, 0.406])
    channel_std = np.array([0.229, 0.224, 0.225])

    image = inp.numpy().transpose((1, 2, 0))
    # Invert Normalize(mean, std), then clamp to the displayable range.
    image = np.clip(channel_std * image + channel_mean, 0, 1)

    plt.imshow(image)
    if title is not None:
        plt.title(title)
    # Short pause so that the figure is refreshed in interactive mode.
    plt.pause(0.001)


# Pull a single mini-batch from the (shuffled) training loader.
train_batches = iter(dataloaders['train'])
inputs, classes = next(train_batches)

# Tile the first four images of the batch into one grid image.
out = torchvision.utils.make_grid(inputs[:4])

# Title the figure with the class name of each tiled image.
preview_titles = [class_names[label] for label in classes[:4]]
imshow(out, title=preview_titles)

## 3. Model Initialization

### 3.1 Define Models

In [None]:
# Instantiate the three backbones compared in this notebook, each requested
# with its pre-trained weights (pretrained=True). Their classification heads
# are replaced in the next cell.

# VGG16
vgg16 = models.vgg16(pretrained=True)

# ResNet-18
resnet18 = models.resnet18(pretrained=True)

# Inception v3 (summarized below with a 299x299 input, unlike the 224x224 used for the other two)
inception_v3 = models.inception_v3(pretrained=True)

### 3.2 Modify Models

In [None]:
# Size every new classification head from the selected class list (n_classes
# is len(selected_classes)) so the heads cannot drift out of sync with the
# dataset when the selection changes.
num_classes = n_classes

# VGG16: replace the last layer of the classifier stack.
vgg16.classifier[6] = nn.Linear(vgg16.classifier[6].in_features, num_classes)

# ResNet18: replace the final fully connected layer.
resnet18.fc = nn.Linear(resnet18.fc.in_features, num_classes)

# Inception v3: replace both the main head and the auxiliary head, since the
# training loop computes a loss on each of them.
inception_v3.fc = nn.Linear(inception_v3.fc.in_features, num_classes)
inception_v3.AuxLogits.fc = nn.Linear(inception_v3.AuxLogits.fc.in_features, num_classes)

In [None]:
# Model summaries (one cell per network).

# VGG16: moves the model to `device`, then summarizes it for a 3x224x224 input.
summary(vgg16.to(device), input_size=(3, 224, 224))

In [None]:
# ResNet18: moves the model to `device`, then summarizes it for a 3x224x224 input.
summary(resnet18.to(device), input_size=(3, 224, 224))

In [None]:
# Inception_v3: moves the model to `device`, then summarizes it for a 3x299x299 input
# (a larger input than the 224x224 used for the other two networks).
summary(inception_v3.to(device), input_size=(3, 299, 299))

### 3.3 Dict for Models

In [None]:
# Display name -> model instance. The training loop iterates over this mapping
# in insertion order and uses the name both for logging and for the checkpoint
# file name.
models_dict = dict(
    VGG16=vgg16,
    ResNet18=resnet18,
    Inception_v3=inception_v3,
)

## 4. Training

In [None]:
def train_model(model, dataloaders, criterion, optimizer, num_epochs, is_inception=False):
    """Fine-tune ``model`` and keep the weights with the best validation accuracy.

    Each epoch runs a training pass followed by a validation pass. After the
    last epoch the model is reset to the weights that reached the highest
    validation accuracy.

    Args:
        model: Network to train. Batches are moved to the device of its first
            parameter, so the model must already be on the target device.
        dataloaders: Mapping with ``'train'`` and ``'val'`` loaders that yield
            ``(inputs, labels)`` batches.
        criterion: Loss taking ``(outputs, labels)``.
        optimizer: Optimizer over the parameters of ``model``.
        num_epochs: Number of train + validation epochs.
        is_inception: When True, inputs are resized to 299x299 in both phases
            and, in the training phase, the model is expected to return
            ``(outputs, aux_outputs)``.

    Returns:
        ``(model, val_acc_history)`` where ``val_acc_history`` holds one 0-dim
        tensor (validation accuracy) per epoch.
    """
    since = time.time()

    # Follow the model's own device instead of relying on a notebook global.
    first_param = next(model.parameters(), None)
    device = first_param.device if first_param is not None else torch.device("cpu")

    val_acc_history = []

    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch + 1, num_epochs))
        print('-' * 10)

        # Each epoch has a training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()  # Set model to training mode
            else:
                model.eval()   # Set model to evaluate mode

            running_loss = 0.0
            running_corrects = 0

            # Iterate over data.
            for inputs, labels in dataloaders[phase]:

                if is_inception:
                    # Resize in BOTH phases so validation sees the same input
                    # resolution the network is trained on. F.resize accepts
                    # batched tensors of shape [..., H, W].
                    inputs = F.resize(inputs, size=[299, 299])

                inputs = inputs.to(device)
                labels = labels.to(device)

                # zero the parameter gradients
                optimizer.zero_grad()

                # forward; gradients are tracked only in the training phase
                with torch.set_grad_enabled(phase == 'train'):
                    if is_inception and phase == 'train':
                        # In train mode Inception also returns an auxiliary
                        # output; its loss is added with weight 0.4. In eval
                        # mode only the final output is used.
                        outputs, aux_outputs = model(inputs)
                        loss1 = criterion(outputs, labels)
                        loss2 = criterion(aux_outputs, labels)
                        loss = loss1 + 0.4*loss2
                    else:
                        outputs = model(inputs)
                        loss = criterion(outputs, labels)

                    _, preds = torch.max(outputs, 1)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # statistics: the loss is scaled by the batch size so that the
                # epoch loss below is a per-sample average
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels)

            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            # deep copy the model whenever validation accuracy improves
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())
            if phase == 'val':
                val_acc_history.append(epoch_acc)

        print()

    time_elapsed = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_elapsed // 60, time_elapsed % 60))
    print('Best val Acc: {:.4f}'.format(best_acc))

    # load best model weights
    model.load_state_dict(best_model_wts)
    return model, val_acc_history


In [None]:
def visualize_training(val_acc_history):
    """Plot validation accuracy against the epoch index.

    Args:
        val_acc_history: Sequence with one accuracy value per epoch. Entries
            may be 0-dim tensors (on any device) or plain Python numbers.
    """
    # float() copies a 0-dim tensor back to the host as a Python number and
    # leaves plain numbers unchanged, so both kinds of history can be plotted.
    accuracies = [float(acc) for acc in val_acc_history]

    plt.figure()
    plt.plot(accuracies)
    plt.xlabel("Epoch")
    plt.ylabel("Validation Accuracy")
    plt.title("Training History")
    plt.show()


In [None]:
# Train every model in `models_dict` with the same recipe, plot its validation
# accuracy and write its best weights to a checkpoint.
for model_name, model in models_dict.items():
    print(f"Training {model_name}...")

    # Run on the first CUDA device when available, otherwise on the CPU.
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    model = model.to(device)

    # Cross-entropy loss, optimized with SGD + momentum over all parameters.
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)

    # Inception v3 needs the special handling implemented in train_model.
    is_inception = (model_name == "Inception_v3")

    trained_model, val_acc_history = train_model(
        model,
        dataloaders,
        criterion,
        optimizer,
        num_epochs=25,
        is_inception=is_inception,
    )

    # Validation-accuracy curve for this model.
    visualize_training(val_acc_history)

    # Checkpoint of the best weights (train_model returns the model with them loaded).
    torch.save(trained_model.state_dict(), f"/kaggle/working/{model_name}_model.pth")

### advanced 1

In [None]:
# Rebuild the ResNet-18 architecture without requesting pre-trained weights;
# the fine-tuned weights are loaded from the checkpoint below.
model = models.resnet18(pretrained=False)

# Remember the width of the features feeding the classifier, then replace the
# classifier with a pass-through so the network outputs those features.
num_ftrs = model.fc.in_features
model.fc = torch.nn.Identity()

# Load the checkpoint from the path the training loop saved it to.
# - map_location="cpu": the checkpoint may hold CUDA tensors; mapping them to
#   the CPU lets this cell run in a session without a GPU (the model built
#   above lives on the CPU).
# - strict=False: the checkpoint still contains the fc.weight / fc.bias
#   entries of the removed head, which have no counterpart in this model.
model.load_state_dict(
    torch.load("/kaggle/working/ResNet18_model.pth", map_location="cpu"),
    strict=False,
)
model.eval()

In [None]:
class sg_food_dataset(torch.utils.data.Dataset):
    """Image-folder dataset with selected classes plus a catch-all "Other" class.

    Every sub-directory of ``root`` is one source class. The sub-directory
    names are sorted and ``class_id`` selects classes by position in that
    sorted list. Selected classes are relabelled 0..len(class_id)-1 in the
    order given by ``class_id``; images from every remaining folder receive
    the extra label ``len(class_id)`` ("Other").

    Args:
        root: Directory that contains one sub-directory per class.
        class_id: Positions (into the sorted folder list) of the classes to keep.
        transform: Optional callable applied to each loaded RGB PIL image.

    Raises:
        FileNotFoundError: If ``root`` contains no sub-directory.
        IndexError: If an entry of ``class_id`` is past the number of folders.
    """

    def __init__(self, root, class_id, transform=None):
        self.class_id = class_id
        self.root = root
        all_classes = sorted(entry.name for entry in os.scandir(root) if entry.is_dir())
        if not all_classes:
            # Name the directory that was actually scanned so the error is actionable.
            raise FileNotFoundError(f"Couldn't find any class folder in {root}.")

        # Selected class names followed by the catch-all class; "Other" always
        # takes the last index.
        self.classes = [all_classes[x] for x in class_id] + ['Other']
        self.class_to_idx = {cls_name: i for i, cls_name in enumerate(self.classes)}

        self.samples = self.make_dataset(self.root, self.class_to_idx, extensions=('jpg',), class_id=class_id)
        self.transform = transform

    def make_dataset(self, directory, class_to_idx, extensions=None, class_id=None):
        """Collect ``(path, label)`` pairs from every class folder in ``directory``.

        Folders whose name is a key of ``class_to_idx`` keep that label. Every
        other folder is labelled ``class_to_idx['Other']``; when the mapping
        has no ``'Other'`` entry those folders are skipped.

        Args:
            directory: Root directory with one sub-directory per class.
            class_to_idx: Mapping from folder name to label.
            extensions: Suffix or iterable of suffixes (compared against the
                lower-cased path); ``None`` accepts every file.
            class_id: Accepted for backward compatibility and not used; labels
                are decided by folder name through ``class_to_idx``.

        Returns:
            List of ``(file path, label)`` tuples ordered by folder name and
            then by path.
        """
        directory = os.path.expanduser(directory)
        other_index = class_to_idx.get('Other')
        if extensions is not None and not isinstance(extensions, str):
            # str.endswith only accepts a str or a tuple of str.
            extensions = tuple(extensions)

        instances = []
        # Walk ALL class folders, not only the selected ones, so the
        # non-selected folders can populate "Other".
        class_dirs = sorted(entry.name for entry in os.scandir(directory) if entry.is_dir())
        for target_class in class_dirs:
            class_index = class_to_idx.get(target_class, other_index)
            if class_index is None:
                continue
            target_dir = os.path.join(directory, target_class)
            for walk_root, _, fnames in sorted(os.walk(target_dir, followlinks=True)):
                for fname in sorted(fnames):
                    path = os.path.join(walk_root, fname)
                    if extensions is None or path.lower().endswith(extensions):
                        instances.append((path, class_index))
        return instances

    def __len__(self):
        """Return the number of (path, label) samples."""
        return len(self.samples)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, target)``.

        The image is decoded as RGB and passed through ``transform`` when one
        was supplied.
        """
        path, target = self.samples[idx]
        with open(path, "rb") as f:
            # convert() forces the pixel data to be read before the file closes.
            sample = Image.open(f).convert('RGB')
        if self.transform is not None:
            sample = self.transform(sample)
        return sample, target



In [None]:

# Same data location and split folders as before.
data_dir = '/kaggle/input/sg-food/sg_food'
subfolder = {'train': 'train', 'val': 'val'}

# Same selected class positions as before; the "Other" class is added by the
# sg_food_dataset class itself.
selected_classes = [0, 2, 4, 7, 9]

# One dataset per split, reusing the transform pipelines defined earlier.
image_datasets = {
    split: sg_food_dataset(
        root=os.path.join(data_dir, subfolder[split]),
        class_id=selected_classes,
        transform=data_transforms[split],
    )
    for split in ('train', 'val')
}

# The class names now end with the extra "Other" entry.
class_names = image_datasets['train'].classes
print('Selected classes:\n    ID: {}\n    Name: {}'.format(selected_classes + ['Other'], class_names))

# One shuffled loader per split; batches are produced in the main process.
batch_size = 64
dataloaders = {
    split: DataLoader(image_datasets[split], batch_size=batch_size, shuffle=True, num_workers=0)
    for split in ('train', 'val')
}

dataset_sizes = {split: len(image_datasets[split]) for split in ('train', 'val')}

# Use the first CUDA device when one is available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
