In [1]:
import torch 
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import glob
from torch.utils.data import Dataset
from PIL import Image 
from tqdm import tqdm
# Device configuration
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Hyper parameters
num_epochs = 30
num_classes = 1
BATCH_SIZE = 4
learning_rate = 0.005
IMG_SIZE = 224

In [2]:
TRAIN_DATA_PATH = '/home/quang/working/image_processing/data/train_crop_1/'
TRAIN_DATA_PATH_COLOR = '/home/quang/working/image_processing/data/train_color_1/'

list_paths_imgs = glob.glob(TRAIN_DATA_PATH + '*/*')
print (len(list_paths_imgs))
list_labels = [1 if x.split('/')[-2] == 'AMD' else 0 for x in list_paths_imgs]
list_paths_imgs_color = [TRAIN_DATA_PATH_COLOR + x.split('/')[-2] + '/' + x.split('/')[-1] for x in list_paths_imgs]

400


In [3]:
TEST_DATA_PATH = '/home/quang/working/image_processing/data/valid_crop/'
TEST_DATA_PATH_COLOR = '/home/quang/working/image_processing/data/valid_color/'

list_paths_imgs_test = glob.glob(TEST_DATA_PATH + '*/*')
print (len(list_paths_imgs_test))
list_labels_test = [1 if x.split('/')[-2] == 'AMD' else 0 for x in list_paths_imgs_test]
list_paths_imgs_color_test = [TEST_DATA_PATH_COLOR + x.split('/')[-2] + '/' + x.split('/')[-1] for x in list_paths_imgs_test]

96


In [4]:
TRANSFORM_IMG = transforms.Compose([
    transforms.Resize(IMG_SIZE),
    transforms.CenterCrop(IMG_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225] )
    ])

class FundusDataset(Dataset):
    """Face Landmarks dataset."""

    def __init__(self, list_paths, list_labels, list_paths_color, transform=None):

        self.list_paths = list_paths
        self.list_labels = list_labels
        self.list_paths_color = list_paths_color

        self.transform = transform

    def __len__(self):
        # return 100
        return len(self.list_paths)

    def __getitem__(self, idx):
        # image1 = cv2.imread(self.list_paths[idx])
        # image1 = cv2.cvtColor(image1, cv2.COLOR_BGR2RGB)
        image1 = Image.open(self.list_paths[idx])
        image2 = Image.open(self.list_paths_color[idx])
                
        if self.transform:
            image1 = self.transform(image1)
            image2 = self.transform(image2)

        img_cat = torch.cat([image1,image2],dim=0)
        return img_cat, torch.tensor(self.list_labels[idx])

train_data = FundusDataset(list_paths_imgs, list_labels, list_paths_imgs_color, TRANSFORM_IMG)
train_loader = torch.utils.data.DataLoader(train_data, batch_size=BATCH_SIZE, shuffle=True,  num_workers=4)

test_data = FundusDataset(list_paths_imgs_test, list_labels_test, list_paths_imgs_color_test, TRANSFORM_IMG)
test_loader = torch.utils.data.DataLoader(test_data, batch_size=BATCH_SIZE, shuffle=False,  num_workers=4)

In [5]:
# Convolutional neural network (two convolutional layers)
class ConvNet(nn.Module):
    def __init__(self, num_classes=1):
        super(ConvNet, self).__init__()
        self.layer1 = nn.Sequential(
            nn.Conv2d(6, 16, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))

        self.layer2 = nn.Sequential(
            nn.Conv2d(16, 32, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2))
        
        self.layer3 = nn.Sequential(
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            nn.Conv2d(64, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
        )

        self.fc = nn.Linear(7*7*128, num_classes)
        
    def forward(self, x):
        out = self.layer1(x)
        out = self.layer2(out)
        out = self.layer3(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        return out

model = ConvNet(num_classes).to(device)

# Loss and optimizer
# criterion = nn.CrossEntropyLoss()
pos_weight = torch.tensor([2.]).cuda()
criterion = nn.BCEWithLogitsLoss(pos_weight=pos_weight).cuda()
# optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)
optimizer = torch.optim.SGD(model.parameters(), lr = learning_rate)

In [None]:
# Train the model
total_step = len(train_loader)
best_acc = 0.
best_model = None
hist_acc = []
hist_loss = []
for epoch in range(num_epochs):
    loss_epoch = 0
    model.train()
    for i, (images, labels) in enumerate(tqdm(train_loader)):
        images = images.to(device)
        labels = labels.to(device)
        labels = labels.float()
        # Forward pass
        outputs = model(images)
        loss = criterion(outputs.view(-1), labels)
        loss_epoch += loss.item()
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
    print ('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}' 
                   .format(epoch+1, num_epochs, i+1, total_step, loss_epoch/total_step))   
    model.eval()  # eval mode (batchnorm uses moving mean/variance instead of mini-batch mean/variance)
    with torch.no_grad():
        correct = 0
        total = 0
        for i, (images, labels) in enumerate(tqdm(test_loader)):
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
    #         _, predicted = torch.max(outputs.data, 1)
            predicted = torch.where(outputs > 0, torch.tensor(1).cuda(), torch.tensor(0).cuda()).view(-1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        if best_acc < correct / total:
            best_acc = correct / total
            torch.save(model.state_dict(), './model_final.pt')
        print('Acc: {} %'.format(100 * correct / total))
    hist_loss.append(loss_epoch/total_step)
    hist_acc.append(correct / total)
print('Best Acc: {} %'.format(100 * best_acc))

100%|██████████| 100/100 [00:02<00:00, 46.79it/s]
  0%|          | 0/24 [00:00<?, ?it/s]

Epoch [1/30], Step [100/100], Loss: 1.1040


100%|██████████| 24/24 [00:00<00:00, 35.57it/s]
  0%|          | 0/100 [00:00<?, ?it/s]

Acc: 56.25 %


100%|██████████| 100/100 [00:02<00:00, 44.38it/s]
  0%|          | 0/24 [00:00<?, ?it/s]

Epoch [2/30], Step [100/100], Loss: 0.8609


100%|██████████| 24/24 [00:00<00:00, 30.53it/s]
  0%|          | 0/100 [00:00<?, ?it/s]

Acc: 61.458333333333336 %


  1%|          | 1/100 [00:00<00:31,  3.10it/s]

In [None]:
# Test the model
model.load_state_dict(torch.load('./model_final.pt'))
model.eval()  # eval mode (batchnorm uses moving mean/variance instead of mini-batch mean/variance)
with torch.no_grad():
    correct = 0
    total = 0
    for i, (images, labels) in enumerate(tqdm(test_loader)):
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
#         _, predicted = torch.max(outputs.data, 1)
        predicted = torch.where(outputs > 0, torch.tensor(1).cuda(), torch.tensor(0).cuda()).view(-1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print('Test Accuracy of the model on the test images: {} %'.format(100 * correct / total))

In [None]:
from matplotlib import pyplot as plt
import numpy as np
import pickle

In [None]:
a = {'loss': hist_loss, 'acc': hist_acc}

with open('final.pkl', 'wb') as handle:
    pickle.dump(a, handle)

In [None]:
with open('original.pkl', 'rb') as handle:
    org_hist = pickle.load(handle)
with open('crop.pkl', 'rb') as handle:
    crop_hist = pickle.load(handle)
    
with open('final.pkl', 'rb') as handle:
    final_hist = pickle.load(handle)

In [None]:
plt.title("Loss")
plt.xlabel("Training Epochs")
plt.ylabel("Loss")
plt.plot(range(1,num_epochs+1),org_hist['loss'],label="Orginal")
plt.plot(range(1,num_epochs+1),crop_hist['loss'],label="Crop")
plt.plot(range(1,num_epochs+1),final_hist['loss'],label="Crop+Edge")
plt.ylim((0,1.))
plt.xticks(np.arange(1, num_epochs+1, 1.0))
plt.legend()
plt.savefig('loss.png')
plt.show()


plt.title("Accuracy")
plt.xlabel("Training Epochs")
plt.ylabel("Validation Accuracy")
plt.plot(range(1,num_epochs+1),org_hist['acc'],label="Orginal")
plt.plot(range(1,num_epochs+1),crop_hist['acc'],label="Crop")
plt.plot(range(1,num_epochs+1),final_hist['acc'],label="Crop+Edge")
plt.ylim((0,1.))
plt.xticks(np.arange(1, num_epochs+1, 1.0))
plt.legend()
plt.savefig('acc.png')
plt.show()
