In [2]:
import gc
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim import lr_scheduler
from torch.utils import data
import numpy as np
import torchvision
from  numpy import exp, absolute, asarray
from torchvision import datasets, transforms
import matplotlib.pyplot as plt
import time, os, json, random, gc, copy, math, multiprocessing
from sklearn import svm
import sklearn.model_selection as model_selection
from sklearn.metrics import accuracy_score,f1_score,precision_score ,recall_score 
from PIL import Image
from torch.utils.data import TensorDataset, DataLoader
from pathlib import Path

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


In [5]:
data_path = str(Path().resolve().parent.parent.parent) + "/Datasets/Validation Datasets/DIP/"

In [28]:
SIZE = 128
STRIDE = 64
img_path = data_path
HO_P_list = os.listdir(img_path + "positiveLR/")

In [39]:
HO_dict = {}
HO_data = []
HO_label = []
orig_y_HO = []

In [40]:
def sliding_window(image, stride, imgSize):
    height, width, _ = image.shape
    img = []
    a1 = list(range(0, height-imgSize+stride, stride))
    a2 = list(range(0, width-imgSize+stride, stride))
    if (a1[-1]+imgSize != height):
        a1[-1] = height-imgSize
    if (a2[-1]+imgSize != width):
        a2[-1] = width-imgSize
    for y in a1:
        for x in a2:
            im1 = image[y:y+imgSize, x:x+imgSize, :]
            img.append(np.array(im1))
    return img

In [41]:
def handle_one_image(path, sType):
    image_data = []
    im = Image.open(path)
    im = im.convert('RGB')
    im = np.array(im)
    if (im.shape[0] >= SIZE and im.shape[1] >= SIZE):
        img = sliding_window(im, STRIDE, SIZE)
        for i in range(len(img)):
            if(img[i].shape[2] >=3):
                image_data.append(img[i])
        return image_data, sType
    else:
        # indicate that no images are available
        return [], sType

In [42]:
def sub_parallel_HO():
    print("Applying sliding windows to Held out data")
    global HO_data
    global HO_label
    global HO_dict
    global orig_y_HO
    global image
    # Create a list of tuples consisting of the file path, and the class
    # dictionary info for each of the cl arguments
    args = []
    for img_file in HO_P_list:
        path = img_path +"/positiveLR/"+img_file
        im = Image.open(path)
        im = im.convert('RGB')
        im = np.array(im)
        if (im.shape[0] >= SIZE and im.shape[1] >= SIZE):
            args.append((path, 1))
            orig_y_HO.append(1)
            
    num_workers = multiprocessing.cpu_count()  

    with multiprocessing.Pool(processes = num_workers) as pool:   # or however many processes
        subimage_counter = 0
        origimage_counter = 0
        # Use multiprocessing to call handle_on_image(pathname, info)
        # and return the results in order
        for images, info in pool.starmap(handle_one_image, args):
            image_list = []
            # Images is a list of returned images.  info is the class_dictionary info that we passed
            for image in images:
                image_list.append(subimage_counter)
                subimage_counter += 1
                HO_data.append(image)
                HO_label.append(info)
            HO_dict[origimage_counter] = image_list
            origimage_counter += 1
    print("Sliding window process finished for HO data")

In [43]:
sub_parallel_HO()

Applying sliding windows to Held out data
Sliding window process finished for HO data


In [44]:
HO_data = np.array(HO_data)
HO_label = np.array(HO_label)

In [45]:
print(HO_data.shape)

(47205, 128, 128, 3)


In [46]:
HO_data = np.moveaxis(HO_data, 3, 1)
print(HO_data.shape)

(47205, 3, 128, 128)


In [59]:
model = torch.load(str(Path().resolve().parent.parent) + "/Benchmark/PanEtAl/" + "finalmodel-full.pth")

In [25]:
tensor_x = torch.Tensor(HO_data) # transform to torch tensor
tensor_y = torch.Tensor(HO_label)

my_dataset = TensorDataset(tensor_x,tensor_y) # create your datset
holoader = DataLoader(my_dataset, batch_size=32)

In [34]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=1e-4, momentum=0.9)

In [36]:
ho_loss = 0
ho_accuracy = 0
res = []
orig_labels = []
model.eval()
with torch.no_grad():
    for inputs, labels in holoader:
        inputs = inputs.to(device)
        labels = labels.type(torch.LongTensor) # <---- Here (casting)
        labels = labels.to(device)
        logps = model.forward(inputs)
        batch_loss = criterion(logps, labels)
        ho_loss += batch_loss.item()

        ps = torch.exp(logps)
        top_p, top_class = ps.topk(1, dim=1)
        equals = top_class == labels.view(*top_class.shape)
        res.extend(top_class)
        orig_labels.extend(labels)
        ho_accuracy += torch.mean(equals.type(torch.FloatTensor)).item()                
print(f"Test loss: {ho_loss/len(holoader):.3f}.. "
      f"Test accuracy: {ho_accuracy/len(holoader):.3f}")

Test loss: 0.000.. Test accuracy: 1.000


In [53]:
final_pred_float_thresh = []
for img, subimage in HO_dict.items():
    if isinstance(subimage, int):
        final_pred_float_thresh.append(1 if subimage>=0.5 else 0)
    else:
        mean_ax1 = np.mean([res[x - 1].item() for x in subimage])
        final_pred_float_thresh.append(1 if mean_ax1>=0.5 else 0)

In [58]:
correct = (np.array(final_pred_float_thresh) == np.array(orig_y_HO))
accuracy = correct.sum() / correct.size
print(accuracy)

1.0
