### 1. generate conformal sets (representations) 

In [2]:
import utils
import models

import torch
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd

from sklearn.metrics.pairwise import cosine_similarity



In [3]:
# class FeatureExtractor(torch.nn.Module):
#     """Get representation of intermediary layer.
#     base_model (<-- we extract here), fc, sigmoid
#     """
#     def __init__(self, model, layer='4'): # layer is 0..8
#         super().__init__()
#         self.model = model
#         self.layer = layer
    
#     def forward(self, x):
#         for name, module in self.model.base_model.named_children():
#             print(name)
#             x = module(x)
#             if name == self.layer:
#                 return x
#         return x

In [4]:
# # layer names
# for x,y in loaded_model.base_model.named_children():
#     print(x)
#     print(y)

In [7]:
# import os; print(os.getcwd())
dir_in = '../'
pos_image_path = 'combined_images_parasite'
neg_image_path = 'combined_images_neg'
model_path = 'model_perf_r34_b32'

if torch.cuda.is_available():
    loaded_model = torch.load(dir_in + model_path + '.pt')
else:
    loaded_model = torch.load(dir_in + model_path + '.pt',map_location=torch.device('cpu'))

loaded_model.eval()

ResNet(
  (base_model): Sequential(
    (0): Conv2d(4, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (4): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=Tru

In [8]:
# load images
pos_images = np.load(dir_in + 'data/' + pos_image_path + '.npy')
neg_images = np.load(dir_in + 'data/' + neg_image_path + '.npy')

pos_images.shape[0], neg_images.shape[0]

(76126, 744187)

In [49]:
from sklearn.metrics import fbeta_score
import random

def create_conformal_set(model, data, labels, num_subsets=100, subset_size=1000):
    """
    Outputs combined conformal set with pos and neg images.
    """
    best_f2 = -99999
    best_set = 0

    for _ in range(num_subsets):
        subset_indices = random.sample(range(len(pos_images)), subset_size)
        subset_indices += random.sample(range(len(pos_images), len(neg_images)), subset_size)
        
        subset = data[subset_indices]
        y = labels[subset_indices]
        outputs, features = utils.generate_predictions_and_features(model,subset,32)

        threshold = 0.85
        #0 for negative, 1 for positive, and 2 for unsure
        pred = (outputs[:,1] >= threshold).astype(int)
        # print(outputs.shape)
        # print(pred.shape)
        
        # if labels == 1:
        #     y = np.ones(len(pred), dtype='float64')
        # if labels == 0:
        #     y = np.zeros(len(pred), dtype='float64')

        # Calculate F2 score
        f2_score = fbeta_score(y, pred, beta=2)
        print(f2_score)

        if f2_score > best_f2:
            best_f2 = f2_score
            best_set = (subset, features, y)

    return best_set, best_f2

In [50]:
num_subsets = 100 # number of subsets to try
subset_size = 500 # *2 = how many samples in conformal set

data = np.concatenate([pos_images, neg_images])
labels = np.concatenate([np.ones(len(pos_images), dtype='float64'), np.zeros(len(neg_images), dtype='float64')])
best_set, f2_score = create_conformal_set(loaded_model, data, labels, num_subsets, subset_size)

0.8315964985410588
0.8246346555323592
0.8315964985410588
0.8176495190296947
0.8000839983200335
0.8263772954924875
0.7841483979763912
0.8000839983200335
0.8088851634534786
0.7734573119188504
0.8106409719312946
0.7983193277310925
0.8333333333333334
0.8088851634534786
0.7894736842105263
0.8018471872376155
0.8036088963491398
0.8402662229617305
0.8211450062682826
0.8193979933110368
0.8071278825995808
0.7947855340622372
0.7965531736023539
0.8123953098827471
0.7912457912457912
0.7965531736023539
0.8176495190296947
0.8141481791544579
0.8263772954924875
0.8141481791544579
0.8211450062682826
0.8106409719312946
0.7965531736023539
0.8018471872376155
0.8350687213660974
0.8018471872376155
0.8018471872376155
0.8193979933110368
0.8071278825995808
0.8053691275167786
0.7894736842105263
0.8281184814351272
0.8071278825995808
0.8368026644462948
0.8123953098827471
0.8193979933110368
0.8123953098827471
0.8000839983200335
0.8123953098827471
0.850622406639004
0.8141481791544579
0.8000839983200335
0.80360889634

In [51]:
subset, h_train, y = best_set
t_pos = subset[y == 1]
t_neg = subset[y == 0]
h_pos = h_train[y == 1]
h_neg = h_train[y == 0]

In [52]:
# save images for conformal set
np.save('conformal_set_pos.npy', t_pos)
np.save('conformal_set_neg.npy', t_neg)

### 2. noncoformal prediction

In [53]:
epsilon = 0.05 # 95% confidence new test example is not non-conformant

In [54]:
def get_repr(samples, batch_size=1):
    if len(samples.shape) == 3: # single image
        samples = np.expand_dims(samples, axis=0)
    _, h_sample = utils.generate_predictions_and_features(loaded_model,samples,batch_size)
    return h_sample

In [55]:
# https://www.nature.com/articles/s41746-021-00504-6

def nonconformity_measure(hs):
    """
    Output: vector of nonconformity scores for each repr
    greater values (i.e. less negative) mean greater non-conformity/ood
    """
    similarities = cosine_similarity(hs) # nxn matrix
    scores = -np.sum(similarities - np.eye(similarities.shape[0]), axis=1)  # subtract cosine similarity of sample with itself
    # print(scores)
    return scores

def single_p_value(h_train, h_sample):
    """Calculates how different sample is compared to training distribution
    Output: p_value
    If p<epsilon, then it is o.o.d
    """
    hs = np.concatenate([h_train, h_sample], axis=0)
    scores = nonconformity_measure(hs)
    sample_score = scores[-1]
    p_value = np.mean(scores[:-1] >= sample_score)
    return p_value

In [56]:
# sample to test
h_sample = get_repr(pos_images[10])

pos_p_value = single_p_value(h_pos, h_sample)
neg_p_value = single_p_value(h_neg, h_sample)

# makes sense since the pos sample is in dist for pos!
print(pos_p_value, neg_p_value)
print(pos_p_value>epsilon, neg_p_value>epsilon)

0.346 0.0
True False


check if test slides are o.o.d using conformal pred

In [57]:
test_image_path = 'PAT-109-2_2023-06-03_23-38-20.797186'
test_set = np.load(dir_in + 'data/' + test_image_path + '.npy')
test_set.shape

(71786, 4, 31, 31)

In [58]:
def conformal_test(h_pos, h_neg, test_set, num_samples):
    """
    Classified as o.o.d if in neither of pos or neg distibutions
    output: array of o.o.d images
    """

    h_test_set = get_repr(test_set, 32)
    ood_set = []

    for i in range(num_samples):
        image = test_set[i]
        h_sample = np.expand_dims(h_test_set[i], axis=0)
        pos_p_value = single_p_value(h_pos, h_sample)
        neg_p_value = single_p_value(h_neg, h_sample)

        if pos_p_value < epsilon and neg_p_value < epsilon:
            ood_set.append(image)

    return ood_set

In [59]:
ood_set = conformal_test(h_pos, h_neg, test_set, 1000)

In [60]:
np.save('ood_images.npy', ood_set)
len(ood_set)

177

In [17]:
# TODO: can try to precompute pos and neg calibration scores (still need to include H_M+1, the test sample, in the calib predictions...)

# def conformal_test(h_pos, h_neg, test_set):
#     """
#     Classified as o.o.d if in neither of pos or neg distibutions
#     output: array of o.o.d images
#     """

#     pos_calib_scores = nonconformity_measure(h_pos)
#     neg_calib_scores = nonconformity_measure(h_neg)
#     ood_set = []

#     for image in test_set:
#         h_sample = get_repr(image)

#         # p(h|pos)
#         similarity = cosine_similarity(h_pos, h_sample)   # (2000) vector comparing sample to conformal set
#         sample_score = -np.sum(similarity, axis=1)
#         print(sample_score)
#         p_value = np.mean(pos_calib_scores >= sample_score)
#         print(p_value)
#         if p_value >= epsilon:
#             break
        
#         # p(h|neg)
#         similarity = cosine_similarity(h_neg, h_sample)   # (2000) vector comparing sample to conformal set
#         sample_score = -np.sum(similarity, axis=1)
#         p_value = np.mean(neg_calib_scores >= sample_score)
#         print(p_value)
        
#         if p_value < epsilon:
#             ood_set.append(image)
    
#     return ood_set

In [61]:
# sanity check: do this for the positive samples, flagged ood should be very low
ood_test_set = conformal_test(h_pos, h_neg, pos_images, 1000)

In [62]:
len(ood_test_set)

75