In [1]:
import numpy as np
import os
import torch
from arguments import parser
from torch.utils.data import DataLoader
from datasets import create_dataset
from accelerate import Accelerator
import matplotlib.pyplot as plt 
from utils.utils import img_show, img_cvt


import random 
def torch_seed(random_seed):
    """Seed every random number generator used in this notebook.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch
    (CPU, the current CUDA device and all CUDA devices), exports
    ``PYTHONHASHSEED`` and switches cuDNN to deterministic,
    non-benchmarking mode.

    Args:
        random_seed: integer seed shared by all generators.

    Returns:
        None.
    """
    # Python-level randomness. PYTHONHASHSEED is read at interpreter start-up,
    # so exporting it here only influences processes launched afterwards.
    os.environ['PYTHONHASHSEED'] = str(random_seed)
    random.seed(random_seed)
    np.random.seed(random_seed)

    # PyTorch generators: CPU, current GPU, then every GPU (multi-GPU runs).
    for seed_fn in (torch.manual_seed, torch.cuda.manual_seed, torch.cuda.manual_seed_all):
        seed_fn(random_seed)

    # CUDA randomness: disable the cuDNN auto-tuner and force deterministic kernels.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True
    
# Seed all RNGs before anything stochastic (dataset creation, model init) runs.
torch_seed(42)

# Debugging aid: autograd anomaly detection adds overhead to every backward
# pass, so switch it off once the gradients are known to be sane.
torch.autograd.set_detect_anomaly(True)

# Experiment configuration, loaded from the benchmark YAML in notebook mode.
default_setting = './configs/benchmark/rd_cifar10.yaml'
cfg = parser(jupyter=True, default_setting=default_setting)

accelerator = Accelerator(mixed_precision=cfg.TRAIN.mixed_precision)

# load dataset
trainset, testset = create_dataset(
    dataset_name=cfg.DATASET.dataset_name,
    datadir=cfg.DATASET.datadir,
    class_name=cfg.DATASET.class_name,
    img_size=cfg.DATASET.img_size,
    mean=cfg.DATASET.mean,
    std=cfg.DATASET.std,
    aug_info=cfg.DATASET.aug_info,
    **cfg.DATASET.get('params', {})  # optional dataset-specific extras
)

# Build the output directory path: <savedir>/<dataset_name>/<class_name>.
# Only the path string is assembled here; nothing is created on disk.
savedir = os.path.join(
    cfg.DEFAULT.savedir,
    cfg.DATASET.dataset_name,
    cfg.DATASET.class_name,
)

# Experiment / model settings
exp_name     = cfg.DEFAULT.exp_name
method       = cfg.MODEL.method
backbone     = cfg.MODEL.backbone
model_params = cfg.MODEL.get('params', {})

# Data loading settings
batch_size      = cfg.DATASET.batch_size
test_batch_size = cfg.DATASET.test_batch_size
num_workers     = cfg.DATASET.num_workers

# Optimizer settings
opt_name   = cfg.OPTIMIZER.opt_name
lr         = cfg.OPTIMIZER.lr
opt_params = cfg.OPTIMIZER.get('params', {})

# Training-loop settings
epochs       = cfg.TRAIN.epochs
log_interval = cfg.TRAIN.log_interval
use_wandb    = cfg.TRAIN.wandb.use

# NOTE(review): the RNGs were seeded earlier in this cell with a literal 42,
# not with this configured value — confirm the two are meant to agree.
seed = cfg.DEFAULT.seed

# define train dataloader (sample order is reshuffled on every pass)
trainloader = DataLoader(
    dataset=trainset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=num_workers
)

# define test dataloader (fixed order)
testloader = DataLoader(
    dataset=testset,
    batch_size=test_batch_size,
    shuffle=False,
    num_workers=num_workers
)

# Look up the model factory named by `method` in the project's `models`
# package and instantiate it with the configured backbone.
model = vars(__import__('models'))[method](backbone=backbone, **model_params)

# Disabled experiments, kept for reference:
# optimizer = __import__('torch.optim', fromlist='optim').__dict__[opt_name](model.parameters(), lr=lr, **opt_params)
# model.load_state_dict(torch.load('results/MVTecAD/ReverseDistillation/bottle/imagenet_stats-anomaly_ratio_0/seed_42/model_best.pt'))

  from .autonotebook import tqdm as notebook_tqdm


Files already downloaded and verified
Files already downloaded and verified


FeatureExtractor is deprecated. Use TimmFeatureExtractor instead. Both FeatureExtractor and TimmFeatureExtractor will be removed in a future release.


In [2]:
cos_loss = torch.nn.CosineSimilarity()

# Pull a single batch from the training loader.
imgs, labels, gts = next(iter(trainloader))

# The model returns a pair of 3-element feature collections; they are named
# e* and d* here (presumably encoder / decoder features — confirm in `models`).
output = model(imgs)
e_output, d_output = output
e1, e2, e3 = e_output
d1, d2, d3 = d_output

# Accumulate, over the three feature pairs, the batch mean of
# (1 - cosine similarity) between the per-sample flattened features.
loss_sum = 0
for e, d in zip((e1, e2, e3), (d1, d2, d3)):
    loss_sum += (1 - cos_loss(e.view(e.size(0), -1), d.view(d.size(0), -1))).mean()

# Gradient of the summed loss with respect to each of the d* features.
grad = torch.autograd.grad(loss_sum, (d1, d2, d3))

In [3]:
import torch.nn.functional as F 
# Collapse each gradient tensor to one scalar per sample: average over dim 1
# (presumably the channel axis — TODO confirm), then take the norm over the
# two remaining non-batch dims. Each entry of `emb` is a (batch, 1) column.
emb = [torch.norm(g.mean(1), dim=(1, 2)).unsqueeze(1) for g in grad]

# Concatenate the columns side by side: one row per sample, one column per
# gradient tensor in `grad`.
embs = torch.hstack(emb)
    

In [22]:
# Scratch walk-through of the first k-means++ step on `embs`.
# Seed centre: the row with the largest L2 norm.
ind = torch.norm(embs, 2, 1).argmax().item()
mu = [embs[ind]]

# L2 distance from that centre to every row, as a flat float64 NumPy vector.
D2 = torch.cdist(mu[-1].view(1, -1), embs, 2)[0].cpu().numpy().ravel().astype(float)

# Sampling probabilities proportional to the squared distance.
squared = D2 ** 2
Ddist = squared / sum(D2 ** 2)

In [43]:
# NOTE: init_centers is defined in a later cell of this notebook; on a fresh
# kernel that cell must be run first, otherwise this line raises NameError.
indsAll = init_centers(X=embs, K=1)

In [42]:
# Show the selected indices.
# NOTE(review): the saved output below lists 10 indices although the call above
# uses K = 1, and this cell's execution count (42) precedes that call's (43),
# so the output looks stale — re-run to refresh.
indsAll

[10, 8, 6, 11, 7, 13, 14, 4, 9, 5]

In [33]:
from scipy import stats

@torch.no_grad()
def init_centers(X: torch.Tensor, K: int = None):
    '''
    Select ``K`` diverse row indices of ``X`` with the k-means++ seeding algorithm.

    - The first centre is the row (gradient embedding) with the largest L2 norm.
    - For every later centre, the L2 distance from each row to its nearest
      already-chosen centre is computed, and a discrete probability
      distribution proportional to the squared distance is built.
    - The next centre is drawn from that distribution; rows that are already
      centres get probability zero, so no index is returned twice.
    - Centres drawn this way are proven to approximate the expected k-means
      objective, which is what provides diversity
      (Arthur and Vassilvitskii, 2007).

    Args:
        X: 2-D tensor or array-like with one embedding per row.
        K: number of centres to select. Required despite the ``None`` default.
           Values below 1 return just the seed centre, as before.

    Returns:
        list[int]: selected row indices, in selection order.

    Raises:
        TypeError: if ``K`` is ``None``.
        ValueError: if ``X`` is not a non-empty 2-D input, or if ``K`` exceeds
            the number of rows (this previously looped forever).
    '''
    if K is None:
        raise TypeError('init_centers() requires K, the number of centers to select')

    embs = torch.as_tensor(X, dtype=torch.float32).detach()
    if embs.dim() != 2 or embs.shape[0] == 0:
        raise ValueError('X must be a non-empty 2-D collection of embeddings')
    n_samples = embs.shape[0]
    if K > n_samples:
        raise ValueError(f'cannot select K={K} distinct centers from {n_samples} embeddings')

    # K-means ++ initializing: start from the embedding with the largest norm.
    ind = torch.argmax(torch.norm(embs, 2, 1)).item()

    # Use the GPU when one is present, otherwise stay on the CPU
    # (the unconditional .cuda() call crashed on CPU-only machines).
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    embs = embs.to(device)

    indsAll = [ind]
    candidates = np.arange(n_samples)
    available = np.ones(n_samples, dtype=bool)  # rows not yet chosen as centres
    available[ind] = False
    D2 = None  # distance from every row to its nearest chosen centre

    # Sampling
    while len(indsAll) < K:
        # L2 distance between the most recently added centre and every row.
        newD = torch.cdist(embs[indsAll[-1]].view(1, -1), embs, 2)[0].cpu().numpy()
        newD = newD.ravel().astype(float)
        D2 = newD if D2 is None else np.minimum(D2, newD)

        # Probability proportional to the squared distance; chosen rows are
        # masked out so numerical noise can never re-select them.
        weights = np.where(available, D2 ** 2, 0.0)
        total = weights.sum()
        if not np.isfinite(total) or total <= 0:
            # Every remaining row coincides with a centre (or distances are
            # non-finite): fall back to a uniform draw over the unchosen rows.
            weights = available.astype(float)
            total = weights.sum()
        Ddist = weights / total

        # Build the discrete distribution and draw one index from it.
        customDist = stats.rv_discrete(name='custm', values=(candidates, Ddist))
        ind = int(customDist.rvs(size=1)[0])

        indsAll.append(ind)
        available[ind] = False
    return indsAll