In [None]:
#import necessary modules:
import os
import numpy as np
import pandas as pd
from glob import glob
from PIL import Image
from natsort import natsorted
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision.transforms import transforms
from torch.utils.data import Dataset, DataLoader
from torch.cuda.amp import GradScaler, autocast
from torch.optim import lr_scheduler
import timm
import cv2
import matplotlib
import matplotlib.pyplot as plt
matplotlib.style.use('ggplot')
import albumentations as A
from albumentations.pytorch import ToTensorV2
from tqdm import tqdm
import random
import time
import gc
from sklearn.model_selection import StratifiedKFold
import copy
from collections import defaultdict
from sklearn.metrics import f1_score

In [None]:
class TestDataSet(Dataset):
    """Inference-only dataset: yields one image tensor per row of ``df`` (no labels).

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain an ``"imagepath"`` column with one image file path per row.
    transforms : callable, optional
        Applied to the RGB ``PIL.Image`` of each sample. When omitted, the raw
        decoded pixels are returned as a tensor instead.
    """
    def __init__(self, df, transforms = None):
        self.df = df
        self.imagepaths = df["imagepath"].tolist()
        self.transforms = transforms

    def __len__(self):
        """Number of samples, i.e. number of rows in the dataframe."""
        return len(self.df)

    def __getitem__(self, idx):
        """Load the image for row ``idx`` and return it as a tensor.

        Raises
        ------
        FileNotFoundError
            If OpenCV cannot read the file at the stored path.
        """
        image_path = self.imagepaths[idx]
        # The second argument of cv2.imread is a *read flag*, not a colour
        # conversion code; the BGR -> RGB swap has to be done explicitly.
        bgr = cv2.imread(image_path, cv2.IMREAD_COLOR)
        if bgr is None:
            # cv2.imread signals a missing/corrupt file by returning None.
            raise FileNotFoundError(f"Could not read image: {image_path}")
        # NOTE(review): this now really produces RGB. If the training pipeline
        # fed BGR images to the model, keep both pipelines consistent.
        rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)

        if self.transforms is None:
            # No transform supplied: hand back the decoded pixels as they are
            # (the array layout produced by OpenCV, channels last).
            return torch.from_numpy(rgb)

        image = self.transforms(Image.fromarray(rgb))
        if not torch.is_tensor(image):
            # The transform pipeline did not end in a tensor conversion.
            image = torch.as_tensor(np.asarray(image))
        return image

In [None]:
# all model configs go here so that they can be changed when we want to:
# Make sure this model_config is same as the train model_config when running inference:
class model_config:
    """Single place for every tunable setting used by the notebook.

    Keep these values identical to the ones used for training when running
    inference.
    """
    # --- reproducibility / backbone ---
    seed = 42
    model_name = "efficientnetv2_l"

    # --- batching / schedule length ---
    train_batch_size = 16
    valid_batch_size = 32
    epochs = 5

    # --- optimisation ---
    learning_rate = 1e-3
    scheduler = "CosineAnnealingLR"
    # Cosine period: (30000 / train_batch_size) * epochs, truncated to an int.
    # NOTE(review): 30000 looks like the training-set size -- confirm.
    T_max = int(30000 / train_batch_size * epochs)
    weight_decay = 1e-6  # value to explore (Adam optimizer, per the original note)
    n_accumulate = 1

    # --- hardware ---
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

    # Number of iterations over which gradients are accumulated; never below 1.
    iters_to_accumulate = max(1, 32 // train_batch_size)
    eta_min = 1e-5

    # Checkpoints live in "<cwd>/model" (cwd is assumed to be the wsi_analysis directory).
    model_save_directory = os.path.join(os.getcwd(), "model")

In [None]:
# Seed every random number generator the notebook uses (Python, NumPy, PyTorch CPU and CUDA) so that repeated runs give the same results.
def set_seed(seed = 42):
    """Seed Python, NumPy and PyTorch RNGs and put cuDNN in deterministic mode.

    Parameters
    ----------
    seed : int
        Value used for every generator and for ``PYTHONHASHSEED``.
    """
    # Fixed hash seed, exported through the environment.
    os.environ['PYTHONHASHSEED'] = str(seed)

    # Python's and NumPy's global generators.
    random.seed(seed)
    np.random.seed(seed)

    # PyTorch generators (CPU and the current CUDA device).
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)

    # cuDNN needs two extra switches: no auto-tuned kernels, deterministic ones only.
    cudnn = torch.backends.cudnn
    cudnn.benchmark = False
    cudnn.deterministic = True

# Apply the configured seed before any model or data loader is created.
set_seed(seed=model_config.seed)

In [None]:
def return_f1_score(y_true,y_pred):
    """Return scikit-learn's ``f1_score`` for the given true and predicted labels."""
    return f1_score(y_true, y_pred)

In [None]:
def loss_func(y_pred,y_true):
    """Binary cross-entropy computed directly on raw logits.

    Parameters
    ----------
    y_pred : torch.Tensor
        Model outputs *before* any sigmoid.
    y_true : torch.Tensor
        Targets with the same shape as ``y_pred``.

    Returns
    -------
    torch.Tensor
        Scalar loss (mean over all elements).
    """
    # Functional form of nn.BCEWithLogitsLoss() with its default settings.
    return nn.functional.binary_cross_entropy_with_logits(y_pred, y_true)

In [None]:
def build_model():
    """Create the (non-pretrained) backbone with a single-logit classification head.

    Returns
    -------
    torch.nn.Module
        The network, already moved to ``model_config.device``. It outputs one
        raw logit per sample; no sigmoid is attached to the model itself.
    """
    backbone = timm.create_model(model_config.model_name, pretrained=False)
    # Swap the stock classifier for a 1-output linear layer (binary 0/1 task),
    # keeping the backbone's feature width as the input size.
    head_in_features = backbone.classifier.in_features
    backbone.classifier = nn.Linear(head_in_features, 1)
    return backbone.to(model_config.device)

In [None]:
# Validation / inference preprocessing. Images are kept at their native
# resolution so evaluation mirrors real-world use; enable the Resize step
# below (e.g. to 384) only if performance turns out to be a problem.
val_transform = transforms.Compose(
    [
        # transforms.Resize(384),
        transforms.ToTensor(),
        # Per-channel statistics taken from the original notebook
        # (NOTE(review): computed elsewhere -- confirm they match training).
        transforms.Normalize(
            mean=[0.2966, 0.3003, 0.3049],
            std=[0.4215, 0.4267, 0.4332],
        ),
    ]
)

In [None]:
@torch.no_grad()
def infer(model_paths, test_loader, thr= 0.5):
    """Run ensemble inference over ``test_loader`` and threshold the mean probability.

    Each checkpoint in ``model_paths`` is loaded into a fresh ``build_model()``
    network. For every batch the sigmoid outputs of all networks are averaged
    and compared against ``thr``.

    Parameters
    ----------
    model_paths : iterable of str
        Paths to saved ``state_dict`` checkpoints; one ensemble member each.
    test_loader : torch.utils.data.DataLoader
        Yields batches of image tensors (no labels).
    thr : float, default 0.5
        A sample is predicted as class 1 when its mean probability is ``> thr``.

    Returns
    -------
    pred_df : pandas.DataFrame
        One row per sample with a single ``"predicted"`` column of 0/1 values.
    images, labels : list
        Always empty; kept so existing callers can still unpack three values.

    Raises
    ------
    ValueError
        If ``model_paths`` is empty.
    """
    model_paths = list(model_paths)
    if not model_paths:
        raise ValueError("infer() needs at least one checkpoint path, but model_paths is empty")

    # Build and load every ensemble member once, instead of once per batch.
    # All members stay on the device for the duration of the loop.
    models = []
    for path in model_paths:
        model = build_model() # backbone architecture
        # map_location lets checkpoints saved on a GPU load on a CPU-only host.
        model.load_state_dict(torch.load(path, map_location=model_config.device))
        model.eval() # inference mode (no dropout / batch-norm updates)
        models.append(model)

    pred_classes = []
    labels = []
    images = []

    for image in tqdm(test_loader, total=len(test_loader), desc='Inference'):
        image = image.to(model_config.device, dtype=torch.float)

        # Sum the per-model probabilities as a tensor, then average.
        prob_sum = None
        for model in models:
            prob = torch.sigmoid(model(image))
            prob_sum = prob if prob_sum is None else prob_sum + prob
        mean_prob = prob_sum / len(models)

        # Threshold and flatten so each sample contributes one scalar 0/1.
        batch_pred = (mean_prob > thr).to(torch.uint8).reshape(-1).cpu().numpy()
        pred_classes.extend(batch_pred.tolist())

    pred_df = pd.DataFrame({"predicted": pred_classes})
    return pred_df, images, labels

In [1]:
# open test_df:
# Spreadsheet describing the test set (stored on a network share).
test_df_src = r"\\fatherserverdw\Kevin\unstained_blank_classifier\test_df.xlsx"
# Read it into a dataframe; the dataset class below reads its "imagepath" column.
test_df = pd.read_excel(io=test_df_src)

In [None]:
# Wrap the test dataframe in a dataset/loader. shuffle=False (the default) is
# spelled out because row i of pred_df must correspond to row i of test_df.
test_dataset = TestDataSet(test_df, transforms = val_transform)
test_loader  = DataLoader(test_dataset, batch_size=model_config.valid_batch_size,
                          shuffle=False, num_workers=0)

# Collect every saved "best_epoch*" checkpoint; sorted for a reproducible order.
saved_model_path = model_config.model_save_directory
model_paths  = sorted(glob(f'{saved_model_path}/best_epoch*.bin'))
if not model_paths:
    # Fail here with a clear message rather than deep inside infer().
    raise FileNotFoundError(
        f"No checkpoints matching 'best_epoch*.bin' found in {saved_model_path}"
    )

pred_df, images, labels = infer(model_paths, test_loader)