In [1]:
import torch, gc
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, ConcatDataset
from torch.utils.data import Dataset

import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torchvision.models as models
import torchvision

from timeit import default_timer as timer


import os
from PIL import Image
import pickle

import numpy as np
import pandas as pd

class SingleFolderDataset(Dataset):
    def __init__(self, folder_path, transform=None):
        self.folder_path = folder_path
        self.image_list = os.listdir(folder_path)
        self.transform = transform

    def __len__(self):
        return len(self.image_list)

    def __getitem__(self, idx):
        img_name = os.path.join(self.folder_path, self.image_list[idx])
        image = Image.open(img_name).convert('RGB')
        
        if self.transform:
            image = self.transform(image)

        return image

In [2]:
# GPU
device = torch.device("cuda")
cpu_device = torch.device("cpu")

# Data Augmentation function

- Resize
- Center Crop
- HorizontalRandomFlip
- Normalization

In [3]:
# data augmentation for efficientnet
torch.manual_seed(1)
# the training transforms
train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                             std=[0.229, 0.224, 0.225]),
])
train_transform_H = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(p=1),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                             std=[0.229, 0.224, 0.225]),
])
# the validation transforms
valid_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], 
                             std=[0.229, 0.224, 0.225]),
])

# Read Data

In [4]:
BATCH_SIZE = 32
# training dataset
train_dataset = datasets.ImageFolder(
    root='train',
    transform=train_transform
)
train_dataset_H = datasets.ImageFolder(
    root='train',
    transform=train_transform_H
)
concatenated_dataset = ConcatDataset([train_dataset, train_dataset_H])

# validation dataset
valid_dataset = datasets.ImageFolder(
    root='valid',
    transform=valid_transform
)

test_dataset = SingleFolderDataset('test', transform=valid_transform)

# training data loaders
train_loader = DataLoader(
    concatenated_dataset, batch_size=BATCH_SIZE, shuffle=True,
    num_workers=0, pin_memory=False
)
# validation data loaders
valid_loader = DataLoader(
    valid_dataset, batch_size=BATCH_SIZE, shuffle=False,
    num_workers=0, pin_memory=False
)
# test
test_num = list()
for file_name in test_dataset.image_list:
    test_num.append(int(file_name.split('.')[0]))
test_indices = sorted(range(len(test_num)), key=lambda k: test_num[k])
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

# Try Pretrained EFFICIENTNET_Bx

In [5]:
# efficient net need to use this pre-trained model, or it performs very bad :(
class CustomResNet50(nn.Module):
    def __init__(self, num_classes):
        super(CustomResNet50, self).__init__()
        self.resnet50 = models.resnet50(pretrained=True)
        self.resnet50.fc = nn.Sequential(
            nn.Linear(self.resnet50.fc.in_features, num_classes),
            nn.Softmax(dim=1)  # 添加 softmax 激活函數
        )

    def forward(self, x):
        return self.resnet50(x)
    
def train(model,
          criterion,
          optimizer,
          train_loader,
          valid_loader,
          save_file_name,
          max_epochs_stop=3,
          n_epochs=5,
          print_every=1,
          train_on_gpu = True,
          Device = None):
    """Train a PyTorch Model

    Params
    --------
        model (PyTorch model): cnn to train
        criterion (PyTorch loss): objective to minimize
        optimizer (PyTorch optimizier): optimizer to compute gradients of model parameters
        train_loader (PyTorch dataloader): training dataloader to iterate through
        valid_loader (PyTorch dataloader): validation dataloader used for early stopping
        save_file_name (str ending in '.pt'): file path to save the model state dict
        max_epochs_stop (int): maximum number of epochs with no improvement in validation loss for early stopping
        n_epochs (int): maximum number of training epochs
        print_every (int): frequency of epochs to print training stats

    Returns
    --------
        model (PyTorch model): trained cnn with best weights
        history (DataFrame): history of train and validation loss and accuracy
    """

    # Early stopping intialization
    epochs_no_improve = 0
    valid_loss_min = np.Inf

    valid_max_acc = 0
    history = []

    # Number of epochs already trained (if using loaded in model weights)
    try:
        print(f'Model has been trained for: {model.epochs} epochs.\n')
    except:
        model.epochs = 0
        print(f'Starting Training from Scratch.\n')

    overall_start = timer()

    # Main loop
    for epoch in range(n_epochs):

        # keep track of training and validation loss each epoch
        train_loss = 0.0
        valid_loss = 0.0

        train_acc = 0
        valid_acc = 0

        # Set to training
        model.train()
        start = timer()
        
        sample_of_train = 0

        # Training loop
        for ii, (data, target) in enumerate(train_loader):
            # Tensors to gpu
            if train_on_gpu:
                data, target = data.to(Device), target.to(Device)

            # Clear gradients
            optimizer.zero_grad()
            # Predicted outputs are log probabilities
            output = model(data)

            # Loss and backpropagation of gradients
            #loss = criterion(output, target)
            loss = criterion(output, target)
            loss.backward()
            #loss.requres_grad = True

            # Update the parameters
            optimizer.step()

            # Track train loss by multiplying average loss by number of examples in batch
            sample_of_train += data.size(0)
            train_loss += loss.item() * data.size(0)

            # Calculate accuracy by finding max log probability
            _, pred = torch.max(output, dim=1)
            correct_tensor = pred.eq(target.data.view_as(pred))
            # Need to convert correct tensor from int to float to average
            accuracy = torch.mean(correct_tensor.type(torch.FloatTensor))
            # Multiply average accuracy times the number of examples in batch
            train_acc += accuracy.item() * data.size(0)

            # Track training progress
            print(
                f'Epoch: {epoch}\t{100 * (ii + 1) / len(train_loader):.2f}% complete| Time:{timer() - start:.2f}| Training loss:{train_loss/sample_of_train:.4f}|acc:{train_acc/sample_of_train:.4f}.',
                end='\r')

        # After training loops ends, start validation
        else:
            model.epochs += 1

            # Don't need to keep track of gradients
            with torch.no_grad():
                # Set to evaluation mode
                model.eval()

                # Validation loop
                for data, target in valid_loader:
                    # Tensors to gpu
                    if train_on_gpu:
                        data, target = data.to(Device), target.to(Device)

                    # Forward pass
                    output = model(data)

                    # Validation loss
                    loss = criterion(output, target)
                    # Multiply average loss times the number of examples in batch
                    valid_loss += loss.item() * data.size(0)

                    # Calculate validation accuracy
                    _, pred = torch.max(output, dim=1)
                    correct_tensor = pred.eq(target.data.view_as(pred))
                    accuracy = torch.mean(
                        correct_tensor.type(torch.FloatTensor))
                    # Multiply average accuracy times the number of examples
                    valid_acc += accuracy.item() * data.size(0)

                # Calculate average losses
                train_loss = train_loss / len(train_loader.dataset)
                valid_loss = valid_loss / len(valid_loader.dataset)

                # Calculate average accuracy
                train_acc = train_acc / len(train_loader.dataset)
                valid_acc = valid_acc / len(valid_loader.dataset)

                history.append([train_loss, valid_loss, train_acc, valid_acc])

                # Print training and validation results
                if (epoch + 1) % print_every == 0:
                    print(
                        f'\nEpoch: {epoch} \tTraining Loss: {train_loss:.4f} \tValidation Loss: {valid_loss:.4f}'
                    )
                    print(
                        f'\t\tTraining Accuracy: {100 * train_acc:.2f}%\t Validation Accuracy: {100 * valid_acc:.2f}%'
                    )

                # Save the model if validation loss decreases
                if valid_loss < valid_loss_min:
                    # Save model
                    torch.save(model.state_dict(), save_file_name)
                    # Track improvement
                    epochs_no_improve = 0
                    valid_loss_min = valid_loss
                    valid_best_acc = valid_acc
                    best_epoch = epoch

                # Otherwise increment count of epochs with no improvement
                else:
                    epochs_no_improve += 1
                    # Trigger early stopping
                    if epochs_no_improve >= max_epochs_stop:
                        print(
                            f'\nEarly Stopping! Total epochs: {epoch}. Best epoch: {best_epoch} with loss: {valid_loss_min:.2f} and acc: {100 * valid_acc:.2f}%'
                        )
                        total_time = timer() - overall_start
                        print(
                            f'{total_time:.2f} total seconds elapsed. {total_time / (epoch+1):.2f} seconds per epoch.'
                        )

                        # Load the best state dict
                        model.load_state_dict(torch.load(save_file_name))
                        # Attach the optimizer
                        model.optimizer = optimizer

                        # Format history
                        history = pd.DataFrame(
                            history,
                            columns=[
                                'train_loss', 'valid_loss', 'train_acc',
                                'valid_acc'
                            ])
                        return model, history

    # Attach the optimizer
    model.optimizer = optimizer
    # Record overall time and print out stats
    total_time = timer() - overall_start
    print(
        f'\nBest epoch: {best_epoch} with loss: {valid_loss_min:.2f} and acc: {100 * valid_acc:.2f}%'
    )
    print(
        f'{total_time:.2f} total seconds elapsed. {total_time / (epoch):.2f} seconds per epoch.'
    )
    # Format history
    history = pd.DataFrame(
        history,
        columns=['train_loss', 'valid_loss', 'train_acc', 'valid_acc'])
    return model, history

# Training Model Process(Skip this block and use training weight which we had trained.)

In [9]:
num_classes = 100
learning_rate = 0.00005
num_epochs = 200

model = CustomResNet50(num_classes=num_classes).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
model, history = train(
        model,
        criterion,
        optimizer,
        train_loader,
        valid_loader,
        save_file_name=os.path.join('model_weight','RES50'),
        max_epochs_stop=5,
        n_epochs=num_epochs,
        print_every=1,
        train_on_gpu = True,
        Device = device)

Starting Training from Scratch.

Epoch: 0	100.00% complete| Time:70.65| Training loss:4.3043|acc:0.3462.
Epoch: 0 	Training Loss: 4.3043 	Validation Loss: 4.0766
		Training Accuracy: 34.62%	 Validation Accuracy: 55.80%
Epoch: 1	100.00% complete| Time:70.55| Training loss:3.9445|acc:0.7106.
Epoch: 1 	Training Loss: 3.9445 	Validation Loss: 3.8899
		Training Accuracy: 71.06%	 Validation Accuracy: 75.20%
Epoch: 2	100.00% complete| Time:70.08| Training loss:3.8071|acc:0.8370.
Epoch: 2 	Training Loss: 3.8071 	Validation Loss: 3.7753
		Training Accuracy: 83.70%	 Validation Accuracy: 86.20%
Epoch: 3	100.00% complete| Time:69.64| Training loss:3.7219|acc:0.9158.
Epoch: 3 	Training Loss: 3.7219 	Validation Loss: 3.7352
		Training Accuracy: 91.58%	 Validation Accuracy: 90.60%
Epoch: 4	100.00% complete| Time:70.38| Training loss:3.6824|acc:0.9501.
Epoch: 4 	Training Loss: 3.6824 	Validation Loss: 3.7016
		Training Accuracy: 95.01%	 Validation Accuracy: 92.80%
Epoch: 5	100.00% complete| Time:70.78

# Prediction Test Data

In [42]:
num_classes = 100
prob = np.ones((500,100))
model.load_state_dict(torch.load(os.path.join('model_weight','RES50')))
label_pred = np.ones((1,100))
with torch.no_grad():
    model.eval()
    for images in test_loader:
        images = images.to(device)
        outputs = model(images).detach().cpu().numpy()
        label_pred = np.append(label_pred, outputs, axis=0)
label_pred = label_pred[1:,:][test_indices,:]
prob = label_pred


with open(os.path.join('pred_prob','Res50.pickle'), 'wb') as f:
    pickle.dump(prob, f)


# Calculate Accuracy

In [44]:
label_true = pd.read_csv('Test_label.csv')

with open(os.path.join('pred_prob','Res50.pickle'), 'rb') as f:
    prob = pickle.load(f)

name = 'Res50'
def acc_calculate(name,
                  prob,
                  label_true):
    prob_label = pd.DataFrame(prob.argmax(axis=1), columns=['Category'])
    acc = (sum(prob_label['Category'] == label_true['Category'])/500)
    return_dict = {'model' : [name], 'acc' : [acc]}
    return pd.DataFrame(return_dict).sort_values(by='acc')

accuracy = acc_calculate(name, prob, label_true)
print(accuracy)


   model    acc
0  Res50  0.984
