In [3]:
import os
import argparse
import torch
import torchvision
import torchvision.transforms as transforms
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import confusion_matrix
from sklearn.metrics import accuracy_score
from sklearn.metrics import precision_score
from sklearn.metrics import recall_score
from sklearn.metrics import f1_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import classification_report

from utils import yaml_config_hook

from modules import SimCLR, LogisticRegression, get_resnet, EarlyStopping
from modules.transformations import TransformsSimCLR

In [4]:
MODEL_NUM = 5
RESNET = 'resnet50'

In [5]:
parser = argparse.ArgumentParser(description="SimCLR")
config = yaml_config_hook("./config/config.yaml")
for k, v in config.items():
    parser.add_argument(f"--{k}", default=v, type=type(v))

args_str = '' 
args, _ = parser.parse_known_args(args=args_str)

args.model_num = MODEL_NUM
args.resnet = RESNET

# args.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
args.device = torch.device('cuda')

print(args.device)

cuda


In [6]:
test_dataset = torchvision.datasets.ImageFolder(
    '/home/opticho/source/SimCLR/datasets/dataset2/test', 
    transform=TransformsSimCLR(size=(args.image_size, args.image_size)).test_transform)

In [7]:
test_dataset # [ [ [image], [label] ] * 835 ]

Dataset ImageFolder
    Number of datapoints: 835
    Root location: /home/opticho/source/SimCLR/datasets/dataset2/test
    StandardTransform
Transform: Compose(
               Resize(size=(224, 224), interpolation=PIL.Image.BILINEAR)
               ToTensor()
           )

In [8]:
test_loader = torch.utils.data.DataLoader(
    test_dataset,
    batch_size=args.logistic_batch_size,
    shuffle=False,
    drop_last=False,
    num_workers=args.workers,
)

In [9]:
test_loader.dataset.samples[0], test_loader.dataset.samples[834]

(('/home/opticho/source/SimCLR/datasets/dataset2/test/Covid/Covid0_1.png', 0),
 ('/home/opticho/source/SimCLR/datasets/dataset2/test/Others/Others9_7.png',
  2))

In [10]:
encoder = get_resnet(args.resnet, pretrained=False)
n_features = encoder.fc.in_features

simclr_model = SimCLR(args, encoder, n_features)
model_fp = os.path.join(
    args.model_path, "model{}.tar".format(args.model_num)
)
simclr_model.load_state_dict(torch.load(model_fp, map_location=args.device.type))
simclr_model.eval()
simclr_model = simclr_model.to(args.device)

In [11]:
n_classes = 3
model = LogisticRegression(simclr_model.n_features, n_classes)
model_saved = os.path.join(args.model_path, f"downstream_{args.model_num}.tar")
model.load_state_dict(torch.load(model_saved, map_location=args.device.type))
model = model.to(args.device)

optimizer = torch.optim.Adam(model.parameters(), lr=3e-4)
criterion = torch.nn.CrossEntropyLoss()

In [12]:
def inference(loader, simclr_model, device):
    feature_vector = []
    labels_vector = []
    for step, (x, y) in enumerate(loader):
        x = x.to(device)

        # get encoding
        with torch.no_grad():
            h, _, z, _ = simclr_model(x, x)

        h = h.detach()

        feature_vector.extend(h.cpu().detach().numpy())
        labels_vector.extend(y.numpy())

        # if step % 20 == 0:
        #     print(f"Step [{step}/{len(loader)}]\t Computing features...")

    feature_vector = np.array(feature_vector)
    labels_vector = np.array(labels_vector)
    print("Features shape {}".format(feature_vector.shape))
    return feature_vector, labels_vector
    
def get_features_test(simclr_model, test_loader, device):
    test_X, test_y = inference(test_loader, simclr_model, device)
    return test_X, test_y

In [13]:
def create_data_loaders_from_arrays_test(X_test, y_test, batch_size):
    
    test = torch.utils.data.TensorDataset(
        torch.from_numpy(X_test), torch.from_numpy(y_test)
    )
    test_loader = torch.utils.data.DataLoader(
        test, batch_size=batch_size, shuffle=False
    )
    return test_loader


In [14]:
(test_X, test_y) = get_features_test(
    simclr_model, test_loader, args.device
)

arr_test_loader = create_data_loaders_from_arrays_test(
    test_X, test_y, args.logistic_batch_size
)

Features shape (835, 2048)


In [15]:
def print_specificity(specificity):
    print('\t\tspecificity')
    print('')

    print(f'       covid\t{specificity[0]:.2f}')
    print(f'     healthy\t{specificity[1]:.2f}')
    print(f'      others\t{specificity[2]:.2f}')
    print('')

    macro_specificity = sum(specificity) / 3.0
    print(f'   macro avg\t{macro_specificity:.2f}')

    weighted = [434/835, 152/835, 249/835] 
    weighted_specificity = weighted @ specificity
    print(f'weighted avg\t{weighted_specificity:.2f}')
    print('')

In [34]:
def test(args, loader, model, criterion, optimizer):
    loss_epoch = 0
    accuracy_epoch = 0
    model.eval()
    pred = []
    true = []
    soft = []
    for step, (x, y) in enumerate(loader):
        model.zero_grad()

        x = x.to(args.device)
        y = y.to(args.device)

        outputs = model(x)
        loss = criterion(outputs, y)

        predicted = outputs.argmax(1)

        softmax = torch.nn.Softmax(dim=1)
        s = softmax(outputs).cpu().detach().tolist()
        for i in range(len(s)):
            soft.append(s[i])

        preds = predicted.cpu().numpy()
        labels = y.cpu().numpy()
        preds = np.reshape(preds, (len(preds), 1))
        labels = np.reshape(labels, (len(preds), 1))

        for i in range(len(preds)):
            pred.append(preds[i][0].item())
            true.append(labels[i][0].item())
        
        acc = (predicted == y).sum().item() / y.size(0)
        accuracy_epoch += acc

        loss_epoch += loss.item()

    return loss_epoch, accuracy_epoch, (pred, true, soft)

loss_epoch, accuracy_epoch, result = test(
    args, arr_test_loader, model, criterion, optimizer
)
print(
    f"[FINAL]\t Loss: {loss_epoch / len(arr_test_loader)}\t Accuracy: {accuracy_epoch}"
)

[FINAL]	 Loss: 0.42992962469105367	 Accuracy: 22.739583333333332


In [50]:
import os
import csv

preds, true, soft = result
images_path = test_loader.dataset.samples
# images_path -> [ [images path, label] * 835 ]


print(preds[0], true[0], round(soft[0][0], 4), os.path.basename(images_path[0][0]))


with open("majority.csv", "w") as f:
    wr = csv.writer(f)
    wr.writerow(["file", "prob_0", "prob_1", "prob_2", "pred", "label"])
    for i in range(len(preds)):
        f = os.path.basename(images_path[i][0])
        prob_0 = round(soft[i][0], 6)
        prob_1 = round(soft[i][1], 6)
        prob_2 = round(soft[i][2], 6)
        pred = preds[i]
        label = true[i]
        wr.writerow([f, prob_0, prob_1, prob_2, pred, label])

0 0 0.9963 Covid0_1.png
