In [1]:
import os
import pandas as pd
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
from torch.utils import data
from torch.utils.data import TensorDataset, DataLoader
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix
import itertools

def getData(mode):
    if mode == 'train':
        df = pd.read_csv('train.csv')
        path = df['Path'].tolist()
        label = df['label'].tolist()
        return path, label
    
    elif mode == 'valid':
        df = pd.read_csv('valid.csv')
        path = df['Path'].tolist()
        label = df['label'].tolist()
        return path, label
    
    else:
        df = pd.read_csv('resnet_18_test.csv')
        path = df['Path'].tolist()
        return path

class RetinopathyLoader(data.Dataset):
    def __init__(self, root, mode):
        """
        Args:
            root (string): Root path of the dataset.
            mode : Indicate procedure status(training or testing)

            self.img_name (string list): String list that store all image names.
            self.label (int or float list): Numerical list that store all ground truth label values.
        """
        self.root = root
        self.mode = mode
        
        if mode == 'train' or mode == 'valid':
            self.img_name, self.label = getData(mode)
        else:  # mode == 'test'
            self.img_name = getData(mode)
            self.label = None  # No labels for test data
            
        # print("> Found %d images..." % (len(self.img_name)))

    def __len__(self):
        """'return the size of dataset"""
        return len(self.img_name)

    def __getitem__(self, index):

        """
           step1. Get the image path from 'self.img_name' and load it.
                  hint : path = root + self.img_name[index] + '.jpeg'

           step2. Get the ground truth label from self.label

           step3. Transform the .jpeg rgb images during the training phase, such as resizing, random flipping, 
                  rotation, cropping, normalization etc. But at the beginning, I suggest you follow the hints. 

                  In the testing phase, if you have a normalization process during the training phase, you only need 
                  to normalize the data. 

                  hints : Convert the pixel value to [0, 1]
                          Transpose the image shape from [H, W, C] to [C, H, W]

            step4. Return processed image and label
        """

        img_path = os.path.join(self.root, self.img_name[index]) # + '.jpeg'
        img = Image.open(img_path)
        
        if self.mode == 'train':
            transform=transforms.Compose([
                transforms.CenterCrop(400), # crops the center region of the image with a square size of height
                # transforms.Resize(400), # (h, w) 512x512 pixels
                transforms.RandomHorizontalFlip(), #  randomly flips the image horizontally with a 50% chance
                transforms.RandomRotation(degrees=15), # randomly rotates the image by a maximum of 15 degree
                transforms.ToTensor(),
                transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
                # Add an additional transformation to rescale pixel values to [0, 1]
                # transforms.Lambda(lambda x: (x + 1.0) / 2.0)
            ])
        elif self.mode == 'valid':
            transform=transforms.Compose([
                transforms.CenterCrop(400),
                # transforms.Resize(400),
                transforms.ToTensor(),
                transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
                # Add an additional transformation to rescale pixel values to [0, 1]
                # transforms.Lambda(lambda x: (x + 1.0) / 2.0)
            ])
        else:
            transform = transforms.Compose([
                transforms.CenterCrop(400),
                transforms.ToTensor(),
                transforms.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
            ])
        
        img = transform(img) 
        
        # print(img[0].shape)
        
        if self.label is not None:
            label = self.label[index]
            return img, label
        else:
            return img

In [2]:
torch.manual_seed(514)

batch_size = 64

train_dataset = RetinopathyLoader(root = ".", mode="train")
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

valid_dataset = RetinopathyLoader(root = ".", mode="valid")
valid_loader = DataLoader(valid_dataset, batch_size=batch_size, shuffle=True)

test_dataset = RetinopathyLoader(root = ".", mode="test")
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# train_cnt = len(train_loader.dataset)
# print(train_cnt) # 7995

# valid_cnt = len(valid_loader.dataset)
# valid_cnt # 1599

test_cnt = len(test_loader.dataset)
test_cnt # 1067

1067

In [3]:
def get_device():
    return 'cuda' if torch.cuda.is_available() else 'cpu'

device = get_device()
device

'cuda'

In [31]:
class BottleneckBlock(nn.Module):
    def __init__(self, prev_channel, in_channel, out_channel, stride=(1, 1), downsample=False):
        super().__init__()
                
        # First bottleneck block needs to be downsampled
        if downsample: 
            self.conv = nn.Sequential(
                nn.Conv2d(prev_channel, out_channel, kernel_size=(1, 1), stride=(1, 1), bias=False),
                nn.BatchNorm2d(out_channel, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
                nn.Conv2d(out_channel, out_channel, kernel_size=(3, 3), stride=stride, padding=(1, 1), bias=False),
                nn.BatchNorm2d(out_channel, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
                nn.Conv2d(out_channel, in_channel, kernel_size=(1, 1), stride=(1, 1), bias=False),
                nn.BatchNorm2d(in_channel, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            )
            self.down_sample = nn.Sequential(
                nn.Conv2d(prev_channel, in_channel, kernel_size=(1,1), stride=stride, bias=False),
                nn.BatchNorm2d(in_channel)
            )
        else:
            self.conv = nn.Sequential(
                nn.Conv2d(in_channel, out_channel, kernel_size=(1, 1), stride=(1, 1), bias=False),
                nn.BatchNorm2d(out_channel, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
                nn.Conv2d(out_channel, out_channel, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False),
                nn.BatchNorm2d(out_channel, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True),
                nn.Conv2d(out_channel, in_channel, kernel_size=(1, 1), stride=(1, 1), bias=False),
                nn.BatchNorm2d(in_channel, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
            )
            self.down_sample = nn.Identity()
            
        self.ReLU = nn.ReLU(inplace=True)
        
    def forward(self, x):
        out = self.conv(x)
        residual = self.down_sample(x)
        out = self.ReLU(out + residual)
        return out

def add_bottleneck_blocks(prev_channel, in_channel, out_channel, stride): #, num_blocks):
    bottleneck_blocks = []
    bottleneck_blocks.append(BottleneckBlock(prev_channel, in_channel, out_channel, stride, True))
    bottleneck_blocks.append(BottleneckBlock(prev_channel, in_channel, out_channel))
    bottleneck_blocks.append(BottleneckBlock(prev_channel, in_channel, out_channel))
    return bottleneck_blocks

class ResNet50(nn.Module):
    def __init__(self):
        super().__init__()
        
        self.conv1 = nn.Sequential(
            nn.Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False),
            nn.BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True,),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), dilation=1, ceil_mode=False),
        )
        
        # Each layer has 3 bottleneck blocks
        self.layer1 = nn.Sequential(*add_bottleneck_blocks(64, 64, 256, (1, 1))) 
        self.layer2 = nn.Sequential(*add_bottleneck_blocks(256, 128, 512, (2, 2))) 
        self.layer3 = nn.Sequential(*add_bottleneck_blocks(512, 256, 1024, (2, 2)))
        self.layer4 = nn.Sequential(*add_bottleneck_blocks(1024, 512, 2048, (2, 2)))

        # self.avg_pool = nn.AvgPool2d(kernal_size=7, stride=1, padding=0)
        self.avg_pool = nn.AdaptiveAvgPool2d(output_size=(1, 1)) # (output_H, output_W)
        self.fc = nn.Linear(2048, 2)

    def forward(self, x):
        output = self.conv1(x)
        output = self.layer1(output)
        output = self.layer2(output)
        output = self.layer3(output)
        output = self.layer4(output)              # print(output.shape) (num, 512, n, n)
        output = self.avg_pool(output)            # print(output.shape) (num, 512, 1, 1)
        # out = nn.flatten(out)
        output = output.view(output.shape[0], -1) # print(output.shape) (num, 512)
        output = self.fc(output)                  # print(output.shape) (num, 2)
        return output

In [26]:
train_acc = []
valid_acc = []
predicted_list = []
predicted_label = []
test_predicted_label = []

In [27]:
def epoch_loop(num_epoch, train_loader, test_loader, model, loss_fn, optimizer):
    best_acc = 0
    for e in range(num_epoch):
        global predicted_list
        predicted_list = []
        train_acc = train(train_loader, model, loss_fn, optimizer)
        valid_acc = valid(valid_loader, model, loss_fn)
        if (e % (num_epoch//10)) == 0:
            print(f'Epoch: {e:5}, train_acc:{train_acc:.2f}%')
            print(f'Epoch: {e:5}, valid_acc:{valid_acc:.2f}%')
            print(f'Current best acc: {best_acc:.2f}%')
        if valid_acc > best_acc:
            global predicted_label
            predicted_label = predicted_list
            best_acc = valid_acc
            torch.save(model.state_dict(), "ResNet50_"+str(optimizer.__class__.__name__)+".pth")
            '''
            print("Model state dict:")
            for param_tensor in model.state_dict():
                print(param_tensor, "\t", model.state_dict()[param_tensor])
            print("Model save")
            '''
        if (e+1) == num_epoch:
            print(f'best acc: {best_acc:.2f}%')
            
def train(train_loader, model, loss_fn, optimizer):
    model.train() 
    match_train_cnt = 0
    total_cnt = len(train_loader.dataset)
    
    for i, (train_data, train_label) in enumerate(train_loader):
        train_data = train_data.to(device)
        train_label = train_label.to(device).to(torch.long)

        output = model(train_data)
        loss = loss_fn(output, train_label)

        pred = torch.max(output, dim=1) 
        match_train_cnt += (pred[1] == train_label).sum().item()

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    
    acc = match_train_cnt / total_cnt * 100
    train_acc.append(acc)
    return acc
        
def valid(valid_loader, model, loss_fn):
    # Step 5: Testing loop
    model.eval()
    match_valid_cnt = 0
    total_cnt = len(valid_loader.dataset)
    
    with torch.no_grad():
        for i, (valid_data, valid_label) in enumerate(valid_loader):   
            valid_data = valid_data.to(device)
            valid_label = valid_label.to(device).to(torch.long)
            output = model(valid_data)
            pred = torch.max(output, dim=1)
            match_valid_cnt += (pred[1] == valid_label).sum().item()
            
            # Convert the ground truth labels and predicted labels to NumPy arrays and 
            # Retrieve the indices corresponding to their maximum values
            # true_label.extend(valid_label.cpu().data.numpy())
            predicted_list.append(pred[1].cpu().data.numpy())

    acc = match_valid_cnt / total_cnt * 100
    valid_acc.append(acc)
    return acc

def test(test_loader, model):
    model.eval()
    with torch.no_grad():
        for i, test_data in enumerate(test_loader):
            test_data = test_data.to(device)
            output = model(test_data)
            pred = torch.max(output, dim=1)
            test_predicted_label.append(pred[1].cpu().data.numpy())

In [28]:
def show_result():
    best_train_acc = 0
    best_valid_acc = 0
    for i in range(num_epoch):
        best_train_acc = max(best_train_acc, train_acc[i])
        best_valid_acc = max(best_valid_acc, valid_acc[i])
    
    plt.title('Accuracy Curve(ResNet50)')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy(%)')

    plt.plot(train_acc, label = 'ResNet50_train', color = 'b')
    plt.plot(valid_acc, label = 'ResNet50_valid', color = 'r')
    
    plt.legend()
    plt.show()
    
    print(f'Best train acc: {best_train_acc:.2f}%, Best valid acc: {best_valid_acc:.2f}%')

In [29]:
def plot_confusion_matrix(true_label, predicted_label, classes, normalize = False, title = None, cmap = plt.cm.Blues):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    # Compute confusion matrix
    cm = confusion_matrix(true_label, predicted_label)
    
    # Calculate class-wise accuracy if normalization is enabled
    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]

    plt.figure(figsize=(6, 6))
    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()

    tick_marks = np.arange(len(classes))
    plt.xticks(tick_marks, classes)
    plt.yticks(tick_marks, classes)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True Label')
    plt.xlabel('Predicted Label')
    plt.tight_layout()
    plt.show()

In [32]:
num_epoch = 101
model = ResNet50().to(device)
loss_fn = nn.CrossEntropyLoss().to(device)

# optimizer = torch.optim.Adam(model.parameters(), lr=0.001, betas=(0.9, 0.999), eps=1e-08, weight_decay=0, amsgrad=False)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, weight_decay=5e-4, momentum=0.9)
epoch_loop(num_epoch, train_loader, valid_loader, model, loss_fn, optimizer)

OutOfMemoryError: CUDA out of memory. Tried to allocate 158.00 MiB (GPU 0; 11.99 GiB total capacity; 25.20 GiB already allocated; 0 bytes free; 25.28 GiB reserved in total by PyTorch) If reserved memory is >> allocated memory try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

In [None]:
show_result()

_, true_label = getData('valid')
predicted_label = np.concatenate(predicted_label).tolist()
plot_confusion_matrix(true_label, predicted_label, ['0', '1'], title = 'Confusion matrix of ResNet50')

In [None]:
# Load model and show best acc on valid dataset

loss_fn = nn.CrossEntropyLoss().to(device)
model = ResNet50().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, weight_decay=5e-4, momentum=0.9)
filename = "ResNet50_"+str(optimizer.__class__.__name__)+".pth"
model.load_state_dict(torch.load(filename))
model.eval()
best_acc = valid(valid_loader, model, loss_fn)
print(f'Best valid acc: {best_acc:.2f}%')

In [None]:
# Load model and run on test data then write to csv

loss_fn = nn.CrossEntropyLoss().to(device)
model = ResNet50().to(device)
optimizer = torch.optim.SGD(model.parameters(), lr=0.001, weight_decay=5e-4, momentum=0.9)
filename = "ResNet50_"+str(optimizer.__class__.__name__)+".pth"
model.load_state_dict(torch.load(filename))
model.eval()
test(test_loader, model)
test_predicted_label = np.concatenate(test_predicted_label).tolist()

In [None]:
import pandas as pd

def write_to_csv(image_paths, labels, output_file):
    df = pd.DataFrame({'ID': image_paths, 'label': labels})
    df.to_csv(output_file, index=False)
    
image_paths = getData('test')

labels = test_predicted_label

output_file = "resnet_50_test_SGD.csv"

write_to_csv(image_paths, labels, output_file)