In [32]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import transforms, datasets, utils
from torch.autograd import Variable

torch.manual_seed(42)

<torch._C.Generator at 0x28ad62c1a10>

# Step 1 Build data pipeline and import image dataset (split into training set and test set 5:5)

In [33]:
# data_dir is the address where you store your data data, I put it here under relative path
data_dir = "D:/zhuomian/2022_project/data"
split = 0.5
batch_size = 32
learning_rate = 0.001
num_epochs = 30

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("device:", device)

device: cpu


In [34]:
# Build data pre-processing
data_transform = transforms.Compose([transforms.ToTensor(),
                                     transforms.Resize((56,56)),
                                     transforms.Normalize((0.5,0.5,0.5),(0.5,0.5,0.5))])

# Import data into dataset
dataset = datasets.ImageFolder(root = data_dir, transform = data_transform)

# Now divide the data set into a training set and a test set
len_imgs = len(dataset.imgs)
train_set, test_set = torch.utils.data.random_split(dataset, [int(len_imgs*split), len_imgs-int(len_imgs*split)]) 

# Final construction of the data pipeline
train_loader = torch.utils.data.DataLoader(train_set,batch_size = batch_size,shuffle = True)
test_loader = torch.utils.data.DataLoader(test_set,batch_size = batch_size,shuffle = False)

# Step 2 Build the model Customized loss function and optimizer

In [35]:
class CNN_model(nn.Module):
    def __init__(self):
        super(CNN_model, self).__init__()
        # conv2d Parameters nn.Conv2d(in_channels=3, out_channels=32,kernel_size=5,stride=1,padding="same")
        # MaxPool2d Parameters nn.MaxPool2d(kernel_size=2, stride=2)
        self.conv0 = nn.Sequential(         
            nn.Conv2d(3, 16, 1, 1, padding = "same"),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Conv2d(16, 16, 3, 1, padding = "same"),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.Conv2d(16, 3, 1, 1, padding = "same"),
            nn.BatchNorm2d(3),
            nn.ReLU(),
        )
        
        self.conv1 = nn.Sequential(         
            nn.Conv2d(3, 32, 3, 1, padding = "same"),     
            nn.ReLU(),                      
            nn.MaxPool2d(2, 2) 
        )
        
        self.conv2 = nn.Sequential(         
            nn.Conv2d(32, 64, 3, 1, padding = "same"),     
            nn.ReLU(),                      
            nn.MaxPool2d(2, 2)                
        )
        
        self.conv3 = nn.Sequential(         
            nn.Conv2d(64, 32, 3, 1, padding = "same"),     
            nn.ReLU(),                      
            nn.MaxPool2d(2, 2)                
        )
        
        # Fully connected layer with 21 output categories
        self.fc1 = nn.Sequential(nn.Linear(32 * 7 * 7, 128),nn.ReLU())
        self.fc2 = nn.Sequential(nn.Linear(128, 21))
    def forward(self, x):
        identity = x
        x = self.conv0(x)
        x = identity + x
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        # Flattening treatment x.view
        x = x.view(x.size(0), -1)       
        x = self.fc1(x)
        output = self.fc2(x)
        return output    # return x for visualization

In [36]:
model = CNN_model()
model.to(device)
# Choose whether to print the model or not, here it will not be printed, just comment
print(model)

# Select loss function and optimizer
loss_func = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(),
                       lr=learning_rate, betas=(0.9, 0.999),
                       eps=1e-07, amsgrad=False)

CNN_model(
  (conv0): Sequential(
    (0): Conv2d(3, 16, kernel_size=(1, 1), stride=(1, 1), padding=same)
    (1): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(16, 16, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (4): BatchNorm2d(16, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): Conv2d(16, 3, kernel_size=(1, 1), stride=(1, 1), padding=same)
    (7): BatchNorm2d(3, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (8): ReLU()
  )
  (conv1): Sequential(
    (0): Conv2d(3, 32, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv2): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=same)
    (1): ReLU()
    (2): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (conv3): Sequential(
    (0): 

# Step 3 Pass in the model to start training and testing

In [37]:
# Train the model and record the accuracy and loss values for each epoch during training
def train(num_epochs, model, loaders):
    model.train()   
    
    # The first stores the correct rate and the second stores the loss value
    Accy_list = []
    Loss_list = []
    
    total_step = len(loaders)    
    for epoch in range(num_epochs):
        correct = 0
        L = 0
        for i, (images, labels) in enumerate(loaders):
            images = images.to(device)
            labels = labels.to(device)              
            outputs = model(images)
            loss = loss_func(outputs, labels)
            
            # Clear the previous gradient 
            optimizer.zero_grad()           
            
            # Backpropagate and update the parameters
            loss.backward()               
            optimizer.step()                
            
            _, predicted = torch.max(outputs.data, 1)
            correct += (predicted == labels).float().sum()
            L += loss.item()
            if (i+1) % 9 == 0:
                print ('Epoch: [{}/{}]\t|\tStep: [{}/{}]\t|\tLoss: {:.4f}' 
                       .format(epoch + 1, num_epochs, i + 1, total_step, loss.item()))
                
        accuracy = 100 * correct / len(train_set)
        print(accuracy.item())
        print("epoch: {:02d} | Accuracy: {:.2f} Loss: {:.4f}\n"
              .format(epoch + 1, accuracy, L))
        
        # Add the correct and lost values for this epoch to the corresponding list
        Accy_list.append(accuracy.item())
        Loss_list.append(L)
              
    return Accy_list, Loss_list


Accy_list, Loss_list = train(num_epochs, model, train_loader)

Epoch: [1/30]	|	Step: [9/27]	|	Loss: 3.0348
Epoch: [1/30]	|	Step: [18/27]	|	Loss: 3.0370
Epoch: [1/30]	|	Step: [27/27]	|	Loss: 3.0496
5.707490921020508
epoch: 01 | Accuracy: 5.71 Loss: 82.2532

Epoch: [2/30]	|	Step: [9/27]	|	Loss: 2.9416
Epoch: [2/30]	|	Step: [18/27]	|	Loss: 2.9338
Epoch: [2/30]	|	Step: [27/27]	|	Loss: 2.7628
7.253270149230957
epoch: 02 | Accuracy: 7.25 Loss: 79.8821

Epoch: [3/30]	|	Step: [9/27]	|	Loss: 2.8820
Epoch: [3/30]	|	Step: [18/27]	|	Loss: 2.5984
Epoch: [3/30]	|	Step: [27/27]	|	Loss: 2.4460
16.052318572998047
epoch: 03 | Accuracy: 16.05 Loss: 73.4796

Epoch: [4/30]	|	Step: [9/27]	|	Loss: 2.6770
Epoch: [4/30]	|	Step: [18/27]	|	Loss: 2.6803
Epoch: [4/30]	|	Step: [27/27]	|	Loss: 2.2891
28.29964256286621
epoch: 04 | Accuracy: 28.30 Loss: 64.7011

Epoch: [5/30]	|	Step: [9/27]	|	Loss: 1.9415
Epoch: [5/30]	|	Step: [18/27]	|	Loss: 2.1765
Epoch: [5/30]	|	Step: [27/27]	|	Loss: 1.4327
36.86088180541992
epoch: 05 | Accuracy: 36.86 Loss: 54.6552

Epoch: [6/30]	|	Step: [9/2

In [38]:
print(Accy_list)

[5.707490921020508, 7.253270149230957, 16.052318572998047, 28.29964256286621, 36.86088180541992, 44.94649124145508, 54.22116470336914, 69.20333099365234, 76.33769226074219, 82.63971710205078, 87.87158203125, 90.6064224243164, 94.88703918457031, 96.31391143798828, 96.19500732421875, 96.43281555175781, 97.50297546386719, 98.33531188964844, 99.52437591552734, 99.52437591552734, 99.64328002929688, 99.16765594482422, 99.7621841430664, 99.88109588623047, 99.04875183105469, 98.4542236328125, 99.04875183105469, 99.04875183105469, 98.09750366210938, 97.02735137939453]


In [39]:
print(Loss_list)

[82.25321245193481, 79.88207983970642, 73.47957515716553, 64.70112419128418, 54.65520679950714, 48.25766575336456, 38.203760504722595, 26.012213826179504, 20.020847469568253, 14.07103842496872, 9.985123850405216, 8.023274064064026, 5.509518422186375, 3.856141958385706, 3.5652719140052795, 3.275303578004241, 2.073679557070136, 1.464397206902504, 0.7642165070865303, 0.5895823836326599, 0.608004234964028, 0.6334550797473639, 0.38261501456145197, 0.2612361302017234, 1.5376014355570078, 1.381027520634234, 1.0673939399421215, 0.7221830401103944, 1.766703768633306, 3.0540570886805654]


In [40]:
# Test models
def test(model, loaders):
    model.eval()
    with torch.no_grad():
        correct = 0
        L = 0
        for images, labels in loaders:
            images = images.to(device)
            labels = labels.to(device)              
            outputs = model(images)
            # Get the loss value and add it up
            loss = loss_func(outputs, labels)
            L += loss.item()
            
             # Get the correct number of predicted samples
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).float().sum()
            
    accuracy = (100 * correct / len(test_set))
    print("Test data | Accuracy: {:.2f} %, Loss: {:.4f} ".format(accuracy, L))
    
    return accuracy, L

Test_acc, Test_loss = test(model, test_loader)

Test data | Accuracy: 63.14 %, Loss: 77.1052 
