In [2]:
import numpy as np  
from matplotlib import pyplot as plt 
from PIL import Image    
import torch

from anomalib.config import get_configurable_parameters
from anomalib.data import get_datamodule
from anomalib.models import get_model
from anomalib.models.components import feature_extractors
import torchvision
from anomalib.models.components.feature_extractors import TorchFXFeatureExtractor
from torchvision.models.densenet import DenseNet201_Weights
import torch.nn.functional as F
from anomalib.models.components.cluster.kmeans import KMeans
import torchvision.models as models

  from .autonotebook import tqdm as notebook_tqdm


In [3]:
MODEL = "patchcore"
CONFIG_similiar_PATH = f"/home/students/tyang/anomalib/src/anomalib/models/{MODEL}/config_similiar.yaml"
with open(file=CONFIG_similiar_PATH, mode="r",encoding="utf-8") as f:
    print(f.read())
    
config_similiar = get_configurable_parameters(config_path=CONFIG_similiar_PATH)



dataset:
  name: airogs
  format: airogs
  path: /home/students/tyang/yolov5results
  task: classification # options: [classification, segmentation]
  category: cat0/crops/od
  pre_selection: False
  number_of_samples: 17999
  train_batch_size: 32
  eval_batch_size: 32
  num_workers: 8
  image_size: 256 # dimensions to which images are resized (mandatory)
  center_crop:  # dimensions to which images are center-cropped after resizing (optional)
  normalization: imagenet # data distribution to which the images will be normalized: [none, imagenet]
  transform_config:
    train: null
    eval: null
  test_split_mode: from_dir # options: [from_dir, synthetic]
  test_split_ratio: 0.1 # fraction of train images held out testing (usage depends on test_split_mode)
  val_split_mode: same_as_test # options: [same_as_test, from_test, synthetic]
  val_split_ratio: 0.1 # fraction of train/test images held out for validation (usage depends on val_split_mode)

  tiling:
    apply: false
    tile_size:

  warn(
  warn(


In [5]:
CONFIG_select_PATH = f"/home/students/tyang/anomalib/src/anomalib/models/{MODEL}/config_select.yaml"
with open(file=CONFIG_select_PATH, mode="r",encoding="utf-8") as f:
    print(f.read())
    
config_select = get_configurable_parameters(config_path=CONFIG_select_PATH)


dataset:
  name: airogs
  format: airogs
  path: /home/students/tyang/yolov5results
  task: classification # options: [classification, segmentation]
  category: cat0/crops/od
  pre_selection: True
  number_of_samples: 9
  train_batch_size: 1
  eval_batch_size: 1
  num_workers: 8
  image_size: 256 # dimensions to which images are resized (mandatory)
  center_crop:  # dimensions to which images are center-cropped after resizing (optional)
  normalization: imagenet # data distribution to which the images will be normalized: [none, imagenet]
  transform_config:
    train: null
    eval: null
  test_split_mode: from_dir # options: [from_dir, synthetic]
  test_split_ratio: 0.001 # fraction of train images held out testing (usage depends on test_split_mode)
  val_split_mode: same_as_test # options: [same_as_test, from_test, synthetic]
  val_split_ratio: 0.001 # fraction of train/test images held out for validation (usage depends on val_split_mode)

  tiling:
    apply: false
    tile_size: nu

  warn(
  warn(


In [6]:

select_data_module = get_datamodule(config=config_select)
select_data_module.prepare_data() # check if the dataset is avaliable
select_data_module.setup()





In [19]:
similiar_data_module = get_datamodule(config=config_similiar)
similiar_data_module.prepare_data() # check if the dataset is avaliable
similiar_data_module.setup()


In [11]:
import torchvision.transforms.functional as TF

invTrans = torchvision.transforms.Compose([ torchvision.transforms.Normalize(mean = [ 0., 0., 0. ],
                                                                            std = [ 1/0.229, 1/0.224, 1/0.225 ]),
                                            torchvision.transforms.Normalize(mean = [ -0.485, -0.456, -0.406 ],
                                                                            std = [ 1., 1., 1. ]),
                                            torchvision.transforms.Resize((640,640)),
                                            
                                            ])


In [20]:
i, train_data = next(enumerate(similiar_data_module.train_dataloader()))


In [21]:

feature_extractor = TorchFXFeatureExtractor(
                    backbone="densenet201",
                    return_nodes=["features.denseblock1.denselayer6.conv2"],
                    weights=DenseNet201_Weights.IMAGENET1K_V1,
                )


  torch.has_cuda,
  torch.has_cudnn,
  torch.has_mps,
  torch.has_mkldnn,


In [22]:
# feature shape is (batch_size, channel, height, width) 
feature = feature_extractor(train_data["image"])
print(feature["features.denseblock1.denselayer6.conv2"].shape)


torch.Size([1000, 32, 64, 64])


In [23]:
print(train_data["label"].shape)

torch.Size([1000])


In [24]:
# extract features from the whole training dataset

feature_list = []

for  i, train_data in enumerate(similiar_data_module.train_dataloader()):
    features = feature_extractor(train_data["image"])["features.denseblock1.denselayer6.conv2"]
    feature_list.append(features)



In [25]:
# convert the list of features to a tensor
global_feature_tensor = torch.vstack(feature_list)   

In [26]:
# global feature shape is (train_data size, channel, height, width)
print(global_feature_tensor.shape)

torch.Size([15658, 32, 64, 64])


In [27]:
from anomalib.models.components.cluster.kmeans import KMeans

def get_kmeans_centers(feature_tensor, n_clusters):
    """
    Args:
        feature_t: feature tensor, shape is (batch_size, channel, height, width)
        n_clusters: number of clusters
        
    Returns:
        cluster_center: shape is (n_clusters, channel)
        kmeans: kmeans model   """
    
    feature_t = feature_tensor.permute(1,0,2,3)
    feature_t = feature_t.flatten(start_dim=1)
    feature_t= feature_t.permute(1,0)


    kmeans = KMeans(n_clusters=n_clusters)
    kmeans.fit(feature_t)
    cluster_center = kmeans.cluster_centers_
   
    return cluster_center, kmeans



In [28]:
clusters_centers,kmeans = get_kmeans_centers(global_feature_tensor, n_clusters=2)

print(clusters_centers.shape)
print(len(clusters_centers))

torch.Size([2, 32])
2


In [29]:
def bag_of_words_statistics(Ptst, Cref, S):
    """
    Params:
    Ptst: Feature tensor of a set of images, tensor of shape (N, C, H, W)
    Cref: reference Cluster centers, tensor of shape (K, C)
    S: number of subregions per image dimension, integer

    Returns:
    bow_stats: list of normalized Bag-of-words statistics, possibility-like , length N, each element is a tensor of shape (S * S, K)
    """
    Ptst = torch.vsplit(Ptst, Ptst.shape[0])
    bow_stats = []
    for Itst in Ptst:
        Itst = Itst.squeeze(0)
        #print(Itst.shape)
        subtensors = torch.chunk(Itst, S, dim=1)
        subtensor = [torch.chunk(st, S, dim=2) for st in subtensors]
        
        
        image_bow_stats = torch.zeros(S * S, len(Cref), dtype=torch.float32)
        for i in range(S):
            for j in range(S):
                st_value = subtensor[i][j]
                st_value = st_value.flatten(start_dim=1)
                st_value = st_value.permute(1,0)
               # print(st_value.shape)
                
                cluster_idx = kmeans.predict(st_value)
                #print(cluster_idx.shape)
                cluster_idx = cluster_idx.float()

                hist = torch.histc(cluster_idx, bins = len(Cref), min = torch.min(cluster_idx), max = torch.max(cluster_idx))
                normalized_hist = hist / torch.sum(hist)
                image_bow_stats[i * S + j] = normalized_hist
        
        bow_stats.append(image_bow_stats)
            
        
    return bow_stats


In [30]:
global_bow_stats = bag_of_words_statistics(global_feature_tensor, clusters_centers, S=4)

In [31]:
for i, train_data in enumerate(select_data_module.train_dataloader()):
    
    print(train_data["image_path"])

['/home/students/tyang/yolov5results/cat0/crops/od/TRAIN000076.jpg']
['/home/students/tyang/yolov5results/cat0/crops/od/TRAIN002718.jpg']
['/home/students/tyang/yolov5results/cat0/crops/od/TRAIN003406.jpg']
['/home/students/tyang/yolov5results/cat0/crops/od/TRAIN003539.jpg']
['/home/students/tyang/yolov5results/cat0/crops/od/TRAIN005134.jpg']
['/home/students/tyang/yolov5results/cat0/crops/od/TRAIN008303.jpg']
['/home/students/tyang/yolov5results/cat0/crops/od/TRAIN010313.jpg']
['/home/students/tyang/yolov5results/cat0/crops/od/TRAIN010730.jpg']
['/home/students/tyang/yolov5results/cat0/crops/od/TRAIN017815.jpg']


In [1]:
import matplotlib.pyplot as plt

plt.figure(figsize=(10,10))

for i, select_data in enumerate(select_data_module.train_dataloader()):
    plt.subplot(3, 3, i+1)
    select_image = select_data["image"]
    conv_img = invTrans(select_image)
    plt.imshow(TF.to_pil_image(conv_img.squeeze()))
    plt.axis('off')
    
    

NameError: name 'select_data_module' is not defined

<Figure size 1000x1000 with 0 Axes>

In [32]:
reference_feature_list = []

for  i, train_data in enumerate(select_data_module.train_dataloader()):
    features = feature_extractor(train_data["image"])["features.denseblock1.denselayer6.conv2"]
    reference_feature_list.append(features)

reference_feature_tensor = torch.vstack(reference_feature_list) 
reference_bow_stats = bag_of_words_statistics(reference_feature_tensor, clusters_centers, S=4)


In [None]:
torch.save(global_bow_stats, "/home/students/tyang/Documents/global_bow_stats.pt")
torch.save(reference_bow_stats, "/home/students/tyang/Documents/reference_bow_stats.pt")

In [None]:
print(len(global_bow_stats))
print(global_bow_stats[0].shape)
print(len(reference_bow_stats))
print(reference_bow_stats[0].shape)

In [33]:
def compute_similiarity(bow_stats, bow_stats_ref, select_ratio, step_size):
    """
    Params:
    bow_stats: list of normalized Bag-of-words statistics, possibility-like , length N, each element is a tensor of shape (S * S, K)
    bow_stats_ref:  normalized Bag-of-words statistics of reference image, 
    possibility-like , length N, each element is a tensor of shape (S * S, K)
    step_size: step size of sliding window, integer, should be the same as batchsize

    Returns:
    similiarity: list of similiarity, length N, each element is a tensor of shape (S * S, S * S)
    """
    stacked_bow_stats = torch.stack(bow_stats) # shape is (N, S * S, K)
    #print(stacked_bow_stats.shape)
    bow_stats_ref = bow_stats_ref.unsqueeze(0)  # shape is (1, S * S, K)
   # print(bow_stats_ref.shape)

    stacked_bow_stats[stacked_bow_stats == 0 ] = 1e-10
    bow_stats_ref[bow_stats_ref == 0 ] = 1e-10

    selected_idxs =[]
    selected_distances = []
    for i in range(0, stacked_bow_stats.shape[0], step_size):
        current_bow = stacked_bow_stats[i:i+step_size if i+step_size < stacked_bow_stats.shape[0] else stacked_bow_stats.shape[0], :, :]
        expended_bow_stats_ref = bow_stats_ref.expand(current_bow.shape[0], bow_stats_ref.shape[1], bow_stats_ref.shape[2])

        kl_divegence = F.kl_div(current_bow.log(), expended_bow_stats_ref, reduction="none")
       
        kl_total = torch.sum(kl_divegence, dim=2)
        distance = torch.sum(kl_total, dim=1)
        topk_near_distances,near_idx = torch.topk(distance, k=int(len(distance) * select_ratio), dim=0, largest=False, sorted=True)
        
        selected_idxs.append(near_idx)
        
        selected_distances.append(topk_near_distances)

    return selected_idxs, selected_distances

        
       

    

    

In [None]:
ref0_idxs,distances = compute_similiarity(global_bow_stats, reference_bow_stats[0], select_ratio=0.05, step_size=1000)

In [35]:
def second_selection(index, distance, select_ratio):
    """
    Params:
    idxs: list of selected image indices in each batch , length number of batches
    distances: list of selected image distances, length number of batches
    select_ratio: ratio of selected images compare to all images, float

    Returns:
    selected_idx: list of selected image indices with biggest distances over all in each batches, length number of batches
    """
    selected_distances = [[] for _ in range(len(distance))]
    distance_t = torch.cat(distance)
    topk, global_ids = torch.topk(distance_t, k=int(len(distance_t) * select_ratio ), dim=0, largest=False, sorted=True)
    selected_idxs = [[] for _ in range(len(index))]
    
    for global_id in global_ids:
        batch_id = global_id // index[0].shape[0]
        local_id = global_id % index[0].shape[0]
        selected_idxs[batch_id].append(index[batch_id][local_id])
        selected_distances[batch_id].append(distance_t[global_id])
        
  
    
    return selected_idxs, selected_distances

In [None]:
second_indices,second_dists = second_selection(ref0_idxs, distances, select_ratio=0.9)

In [None]:
print(len(second_indices))
print(len(second_dists))
print(second_indices[0])
print(len(second_indices[0]))
print(len(second_indices[1]))

In [None]:
merged_datas = {"image_path": [], "label": []}
for  i, train_data in enumerate(similiar_data_module.train_dataloader()):
    selected_data = ( {"image_path": train_data["image_path"][second_indice], "label": train_data["label"][second_indice]} for second_indice in second_indices[i] )
    
    
    for data in selected_data:
        for key, values in data.items():
            merged_datas[key].append(values)
    

print(merged_datas["image_path"])

In [None]:
print(len(merged_datas["image_path"]))

In [None]:
import csv 

csv_path = "/home/students/tyang/Documents/similiar_csv_files/category0_ref0.csv"


with open(csv_path, mode="w", newline="") as csv_file:
        fieldnames = ["image_path", "label"]
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        for i in range(len(merged_datas["image_path"])):
            rowdict = {"image_path": merged_datas["image_path"][i], "label": merged_datas["label"][i]}
            writer.writerow(rowdict)

In [None]:
ref1_idxs,distances = compute_similiarity(global_bow_stats, reference_bow_stats[1], select_ratio=0.05, step_size=1000)
second_indices,second_dists = second_selection(ref1_idxs, distances, select_ratio=0.9)

merged_datas = {"image_path": [], "label": []}
for  i, train_data in enumerate(similiar_data_module.train_dataloader()):
    selected_data = ( {"image_path": train_data["image_path"][second_indice], "label": train_data["label"][second_indice]} for second_indice in second_indices[i] )
    
    
    for data in selected_data:
        for key, values in data.items():
            merged_datas[key].append(values)
    

import csv 

csv_path = "/home/students/tyang/Documents/similiar_csv_files/category0_ref1.csv"


with open(csv_path, mode="w", newline="") as csv_file:
        fieldnames = ["image_path", "label"]
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        for i in range(len(merged_datas["image_path"])):
            rowdict = {"image_path": merged_datas["image_path"][i], "label": merged_datas["label"][i]}
            writer.writerow(rowdict)



In [None]:
ref2_idxs,distances = compute_similiarity(global_bow_stats, reference_bow_stats[2], select_ratio=0.05, step_size=1000)
second_indices,second_dists = second_selection(ref2_idxs, distances, select_ratio=0.9)

merged_datas = {"image_path": [], "label": []}
for  i, train_data in enumerate(similiar_data_module.train_dataloader()):
    selected_data = ( {"image_path": train_data["image_path"][second_indice], "label": train_data["label"][second_indice]} for second_indice in second_indices[i] )
    
    
    for data in selected_data:
        for key, values in data.items():
            merged_datas[key].append(values)
    


csv_path = "/home/students/tyang/Documents/similiar_csv_files/category0_ref2.csv"


with open(csv_path, mode="w", newline="") as csv_file:
        fieldnames = ["image_path", "label"]
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        for i in range(len(merged_datas["image_path"])):
            rowdict = {"image_path": merged_datas["image_path"][i], "label": merged_datas["label"][i]}
            writer.writerow(rowdict)

In [None]:
import csv
ref3_idxs,distances = compute_similiarity(global_bow_stats, reference_bow_stats[3], select_ratio=0.05, step_size=1000)
second_indices,second_dists = second_selection(ref3_idxs, distances, select_ratio=0.9)

merged_datas = {"image_path": [], "label": []}
for  i, train_data in enumerate(similiar_data_module.train_dataloader()):
    selected_data = ( {"image_path": train_data["image_path"][second_indice], "label": train_data["label"][second_indice]} for second_indice in second_indices[i] )
    
    
    for data in selected_data:
        for key, values in data.items():
            merged_datas[key].append(values)
    


csv_path = "/home/students/tyang/Documents/similiar_csv_files/category0_ref3.csv"


with open(csv_path, mode="w", newline="") as csv_file:
        fieldnames = ["image_path", "label"]
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        for i in range(len(merged_datas["image_path"])):
            rowdict = {"image_path": merged_datas["image_path"][i], "label": merged_datas["label"][i]}
            writer.writerow(rowdict)

In [None]:
ref4_idxs,distances = compute_similiarity(global_bow_stats, reference_bow_stats[4], select_ratio=0.05, step_size=1000)
second_indices,second_dists = second_selection(ref4_idxs, distances, select_ratio=0.9)

merged_datas = {"image_path": [], "label": []}
for  i, train_data in enumerate(similiar_data_module.train_dataloader()):
    selected_data = ( {"image_path": train_data["image_path"][second_indice], "label": train_data["label"][second_indice]} for second_indice in second_indices[i] )
    
    
    for data in selected_data:
        for key, values in data.items():
            merged_datas[key].append(values)
    


csv_path = "/home/students/tyang/Documents/similiar_csv_files/category0_ref4.csv"


with open(csv_path, mode="w", newline="") as csv_file:
        fieldnames = ["image_path", "label"]
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        for i in range(len(merged_datas["image_path"])):
            rowdict = {"image_path": merged_datas["image_path"][i], "label": merged_datas["label"][i]}
            writer.writerow(rowdict)

In [None]:
ref5_idxs,distances = compute_similiarity(global_bow_stats, reference_bow_stats[5], select_ratio=0.05, step_size=1000)
second_indices,second_dists = second_selection(ref5_idxs, distances, select_ratio=0.9)

merged_datas = {"image_path": [], "label": []}
for  i, train_data in enumerate(similiar_data_module.train_dataloader()):
    selected_data = ( {"image_path": train_data["image_path"][second_indice], "label": train_data["label"][second_indice]} for second_indice in second_indices[i] )
    
    
    for data in selected_data:
        for key, values in data.items():
            merged_datas[key].append(values)
    


csv_path = "/home/students/tyang/Documents/similiar_csv_files/category0_ref5.csv"


with open(csv_path, mode="w", newline="") as csv_file:
        fieldnames = ["image_path", "label"]
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        for i in range(len(merged_datas["image_path"])):
            rowdict = {"image_path": merged_datas["image_path"][i], "label": merged_datas["label"][i]}
            writer.writerow(rowdict)

In [None]:
ref6_idxs,distances = compute_similiarity(global_bow_stats, reference_bow_stats[6], select_ratio=0.05, step_size=1000)
second_indices,second_dists = second_selection(ref6_idxs, distances, select_ratio=0.9)

merged_datas = {"image_path": [], "label": []}
for  i, train_data in enumerate(similiar_data_module.train_dataloader()):
    selected_data = ( {"image_path": train_data["image_path"][second_indice], "label": train_data["label"][second_indice]} for second_indice in second_indices[i] )
    
    
    for data in selected_data:
        for key, values in data.items():
            merged_datas[key].append(values)
    


csv_path = "/home/students/tyang/Documents/similiar_csv_files/category0_ref6.csv"


with open(csv_path, mode="w", newline="") as csv_file:
        fieldnames = ["image_path", "label"]
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        for i in range(len(merged_datas["image_path"])):
            rowdict = {"image_path": merged_datas["image_path"][i], "label": merged_datas["label"][i]}
            writer.writerow(rowdict)

In [None]:
ref7_idxs,distances = compute_similiarity(global_bow_stats, reference_bow_stats[7], select_ratio=0.05, step_size=1000)
second_indices,second_dists = second_selection(ref7_idxs, distances, select_ratio=0.9)

merged_datas = {"image_path": [], "label": []}
for  i, train_data in enumerate(similiar_data_module.train_dataloader()):
    selected_data = ( {"image_path": train_data["image_path"][second_indice], "label": train_data["label"][second_indice]} for second_indice in second_indices[i] )
    
    
    for data in selected_data:
        for key, values in data.items():
            merged_datas[key].append(values)
    


csv_path = "/home/students/tyang/Documents/similiar_csv_files/category0_ref7.csv"


with open(csv_path, mode="w", newline="") as csv_file:
        fieldnames = ["image_path", "label"]
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        for i in range(len(merged_datas["image_path"])):
            rowdict = {"image_path": merged_datas["image_path"][i], "label": merged_datas["label"][i]}
            writer.writerow(rowdict)

In [37]:
import csv

ref8_idxs,distances = compute_similiarity(global_bow_stats, reference_bow_stats[8], select_ratio=0.05, step_size=1000)
second_indices,second_dists = second_selection(ref8_idxs, distances, select_ratio=0.9)

merged_datas = {"image_path": [], "label": []}
for  i, train_data in enumerate(similiar_data_module.train_dataloader()):
    selected_data = ( {"image_path": train_data["image_path"][second_indice], "label": train_data["label"][second_indice]} for second_indice in second_indices[i] )
    
    
    for data in selected_data:
        for key, values in data.items():
            merged_datas[key].append(values)
    


csv_path = "/home/students/tyang/Documents/similiar_csv_files/category0_ref8.csv"


with open(csv_path, mode="w", newline="") as csv_file:
        fieldnames = ["image_path", "label"]
        writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
        for i in range(len(merged_datas["image_path"])):
            rowdict = {"image_path": merged_datas["image_path"][i], "label": merged_datas["label"][i]}
            writer.writerow(rowdict)

In [None]:
import pickle

with open("/home/students/tyang/Documents/kmeans_category_0_cpr.pkl", "wb") as f:
    pickle.dump(kmeans, f)

In [None]:
torch.save(feature_extractor, "/home/students/tyang/Documents/feature_extractor_category_0_cpr.pt")
torch.save(clusters_centers, "/home/students/tyang/Documents/cluster_centers_category_0_cpr.pt")


In [None]:
torch.save(kmeans, "/home/students/tyang/Documents/kmeans_category_0_cpr.pt")