In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim import lr_scheduler
import torch.nn.functional as F
import torchvision
from torchvision import *
from torch.utils.data import Dataset, DataLoader
import yaml
from models.resnet_base_network import ResNet18
import pandas as pd
import numpy as np
from PIL import Image
from pathlib import Path
from tqdm import tqdm

In [2]:
bs = 48
device = 'cuda'

In [3]:
class ImagesDataset(Dataset):
    def __init__(self, train, transform=None):
        """
        Args:
            train (boolean): train or test
        """
        self.paths = []
        self.labels = []
        for path in Path(f'kneeKL224/{train}').glob('**/*'):
            folder, file = os.path.split(path)
            _, ext = os.path.splitext(path)
            if ext.lower() == '.png':
                self.paths.append(path)
                self.labels.append(folder[-1])
        self.transform = transform

    def __len__(self):
        return len(self.paths)

    def __getitem__(self, index):
        path = self.paths[index]
        label = int(self.labels[index])
        img = Image.open(path)

        if self.transform:
            img = self.transform(img)

        return img, label

In [4]:
tfms = transforms.Compose([
    transforms.RandomVerticalFlip(0.5),
    transforms.RandomHorizontalFlip(0.5),
    transforms.RandomAffine(degrees = 11, scale = (0.9, 1.1)),
    transforms.ToTensor()
])

In [5]:
image_datasets = {
    'train': ImagesDataset(train = 'train', transform=tfms),
    'validation': ImagesDataset(train = 'val', transform=tfms)
}

dataloaders = {
    'train': DataLoader(image_datasets['train'], batch_size=bs, shuffle=True, num_workers=0),
    'validation': DataLoader(image_datasets['validation'], batch_size=bs, shuffle=False, num_workers=0)
}

In [6]:
config = yaml.load(open("config/config.yaml", "r"), Loader=yaml.FullLoader)
encoder = ResNet18(**config['network'])
output_feature_dim = encoder.projetion.net[0].in_features

In [7]:
#load pre-trained parameters
load_params = torch.load(os.path.join('runs/Aug13_23-03-15_DESKTOP-SSCRTA9/checkpoints/model.pth'),
                         map_location=torch.device(torch.device(device)))

if 'online_network_state_dict' in load_params:
    encoder.load_state_dict(load_params['online_network_state_dict'])
    print("Parameters successfully loaded.")

# remove the projection head
encoder = encoder.to(device)

Parameters successfully loaded.


In [8]:
encoder.projetion = nn.Sequential(
    nn.Linear(output_feature_dim, output_feature_dim // 2),
    nn.ReLU(inplace=True),
    nn.Dropout(p=0.4),
    nn.Linear(output_feature_dim // 2, 5),
    nn.LogSoftmax(dim=1)).to(device)

In [9]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=3):
    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch+1, num_epochs))
        print('-' * 10)

        for phase in ['train', 'validation']:
            if phase == 'train':
                scheduler.step()
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            valid_acc = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                outputs = model(inputs)
                loss = criterion(outputs, labels)

                if phase == 'train':
                    optimizer.zero_grad()
                    loss.backward()
                    optimizer.step()

                _, preds = torch.max(outputs, 1)
                running_loss += loss.item() * inputs.size(0)
                correct_tensor = preds.eq(labels.data.view_as(preds))
                accuracy = torch.mean(correct_tensor.type(torch.FloatTensor))
                valid_acc += accuracy.item() * inputs.size(0)

            epoch_loss = running_loss / len(image_datasets[phase])
            epoch_acc = valid_acc / len(image_datasets[phase])

            print('{} loss: {:.4f}, acc: {:.4f}'.format(phase,
                                                        epoch_loss,
                                                        epoch_acc))
    return model

In [10]:
plist = [
        {'params': encoder.encoder[7].parameters(), 'lr': 1e-5},
        {'params': encoder.projetion.parameters(), 'lr': 5e-3}
        ]
optimizer_ft = optim.Adam(plist, lr=0.001)
criterion = nn.NLLLoss()
lr_sch = lr_scheduler.StepLR(optimizer_ft, step_size=10, gamma=0.1)


model_ft = train_model(encoder,
                       criterion,
                       optimizer_ft,
                       lr_sch,
                       num_epochs=3)

torch.save(model_ft.state_dict(), "model.bin")

Epoch 1/3
----------
train loss: 1.3277, acc: 0.4258
validation loss: 1.2075, acc: 0.5085
Epoch 2/3
----------
train loss: 1.1774, acc: 0.4884
validation loss: 1.1438, acc: 0.5085
Epoch 3/3
----------
train loss: 1.1411, acc: 0.5088
validation loss: 1.1482, acc: 0.4939


In [11]:
encoder.load_state_dict(torch.load("model.bin"))
model_ft = encoder.to(device)

TEST_BATCH_SIZE = 10
NUM_CLASSES = 5
test_tfms = transforms.Compose([
    transforms.Resize(size = (80, 80)),
    transforms.ToTensor()
])

test_dataset = ImagesDataset(train = 'test', transform=test_tfms)
test_dataset_loader = DataLoader(test_dataset, batch_size=TEST_BATCH_SIZE, shuffle=False, num_workers=0)

In [12]:
for param in model_ft.parameters():
    param.requires_grad = False

model_ft.eval()
test_preds = np.zeros((len(test_dataset), NUM_CLASSES))
tk0 = tqdm(test_dataset_loader)
for i, x_batch in enumerate(tk0):
    x_batch = x_batch[0]
    pred = model_ft(x_batch.to(device))
    test_preds[i * TEST_BATCH_SIZE:(i + 1) * TEST_BATCH_SIZE, :] = pred.detach().cpu().squeeze().numpy()

test_preds = torch.from_numpy(test_preds).float().to(device).sigmoid()
test_preds = test_preds.detach().cpu().squeeze().numpy()

100%|██████████| 166/166 [00:04<00:00, 38.36it/s]


In [13]:
data = {'y_Predicted':    np.argmax(test_preds, 1),
        'y_Actual': np.array(test_dataset.labels)
        }

df = pd.DataFrame(data, columns=['y_Actual','y_Predicted'])

In [14]:
confusion_matrix = pd.crosstab(df['y_Actual'], df['y_Predicted'], rownames=['Actual'], colnames=['Predicted'])
print (confusion_matrix)

Predicted    0  1    2    3   4
Actual                         
0          118  1  330  184   6
1           39  2  164   89   2
2           55  0  217  165  10
3           21  0  106   91   5
4            1  0   22   27   1
