In [None]:
!unzip dataset/dataset.zip

In [None]:
!sudo rm -rf "image log" sample

In [None]:
!pip install --target=$my_path pytorch_lightning

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function

import abc
import numpy as np
from sklearn.metrics import pairwise_distances

import argparse
import torch
from torch.nn import functional as F
from torch import nn
from torchvision import transforms
from torch.utils.data import Dataset, DataLoader
import cv2
import numpy as np
import os
import glob
import shutil
from PIL import Image
from sklearn.metrics import roc_auc_score
from torch import nn
import pytorch_lightning as pl
from sklearn.metrics import confusion_matrix
import pickle
from sklearn.random_projection import SparseRandomProjection
from sklearn.neighbors import NearestNeighbors
from torchvision.models import wide_resnet50_2
from scipy.ndimage import gaussian_filter



In [None]:
"""Abstract class for sampling methods.
Provides interface to sampling methods that allow same signature
for select_batch.  Each subclass implements select_batch_ with the desired
signature for readability.
"""



class SamplingMethod(object):
  __metaclass__ = abc.ABCMeta

  @abc.abstractmethod
  def __init__(self, X, y, seed, **kwargs):
    self.X = X
    self.y = y
    self.seed = seed

  def flatten_X(self):
    shape = self.X.shape
    flat_X = self.X
    if len(shape) > 2:
      flat_X = np.reshape(self.X, (shape[0],np.product(shape[1:])))
    return flat_X


  @abc.abstractmethod
  def select_batch_(self):
    return

  def select_batch(self, **kwargs):
    return self.select_batch_(**kwargs)

  def to_dict(self):
    return None

In [None]:
"""Returns points that minimizes the maximum distance of any point to a center.
Implements the k-Center-Greedy method in
Ozan Sener and Silvio Savarese.  A Geometric Approach to Active Learning for
Convolutional Neural Networks. https://arxiv.org/abs/1708.00489 2017
Distance metric defaults to l2 distance.  Features used to calculate distance
are either raw features or if a model has transform method then uses the output
of model.transform(X).
Can be extended to a robust k centers algorithm that ignores a certain number of
outlier datapoints.  Resulting centers are solution to multiple integer program.
"""
class kCenterGreedy(SamplingMethod):

  def __init__(self, X, y, seed, metric='euclidean'):
    self.X = X
    self.y = y
    self.flat_X = self.flatten_X()
    self.name = 'kcenter'
    self.features = self.flat_X
    self.metric = metric
    self.min_distances = None
    self.n_obs = self.X.shape[0]
    self.already_selected = []

  def update_distances(self, cluster_centers, only_new=True, reset_dist=False):
    """Update min distances given cluster centers.
    Args:
      cluster_centers: indices of cluster centers
      only_new: only calculate distance for newly selected points and update
        min_distances.
      rest_dist: whether to reset min_distances.
    """

    if reset_dist:
      self.min_distances = None
    if only_new:
      cluster_centers = [d for d in cluster_centers
                         if d not in self.already_selected]
    if cluster_centers:
      # Update min_distances for all examples given new cluster center.
      x = self.features[cluster_centers]
      dist = pairwise_distances(self.features, x, metric=self.metric)

      if self.min_distances is None:
        self.min_distances = np.min(dist, axis=1).reshape(-1,1)
      else:
        self.min_distances = np.minimum(self.min_distances, dist)

  def select_batch_(self, model, already_selected, N, **kwargs):
    """
    Diversity promoting active learning method that greedily forms a batch
    to minimize the maximum distance to a cluster center among all unlabeled
    datapoints.
    Args:
      model: model with scikit-like API with decision_function implemented
      already_selected: index of datapoints already selected
      N: batch size
    Returns:
      indices of points selected to minimize distance to cluster centers
    """

    try:
      # Assumes that the transform function takes in original data and not
      # flattened data.
      print('Getting transformed features...')
      self.features = model.transform(self.X)
      print('Calculating distances...')
      self.update_distances(already_selected, only_new=False, reset_dist=True)
    except:
      print('Using flat_X as features.')
      self.update_distances(already_selected, only_new=True, reset_dist=False)

    new_batch = []

    for _ in range(N):
      if self.already_selected is None:
        # Initialize centers with a randomly selected datapoint
        ind = np.random.choice(np.arange(self.n_obs))
      else:
        ind = np.argmax(self.min_distances)
      # New examples should not be in already selected since those points
      # should have min_distance of zero to a cluster center.
      assert ind not in already_selected

      self.update_distances([ind], only_new=True, reset_dist=False)
      new_batch.append(ind)
    print('Maximum distance from cluster centers is %0.2f'
            % max(self.min_distances))


    self.already_selected = already_selected

    return new_batch

In [None]:
# Utilty functions

def distance_matrix(x, y=None, p=2):  # pairwise distance of vectors

	y = x if type(y) == type(None) else y
	n = x.size(0)
	m = y.size(0)
	d = x.size(1)
 
	x = x.unsqueeze(1).expand(n, m, d)
	y = y.unsqueeze(0).expand(n, m, d)

	dist = torch.pow(x - y, p).sum(2)

	return dist

def copy_files(src, dst, ignores=[]):
	src_files = os.listdir(src)
	for file_name in src_files:
		ignore_check = [True for i in ignores if i in file_name]
		if ignore_check:
			continue
		full_file_name = os.path.join(src, file_name)
		if os.path.isfile(full_file_name):
			shutil.copy(full_file_name, os.path.join(dst,file_name))
		if os.path.isdir(full_file_name):
			os.makedirs(os.path.join(dst, file_name), exist_ok=True)
			copy_files(full_file_name, os.path.join(dst, file_name), ignores)

def prep_dirs(root):
	# make embeddings dir
	# embeddings_path = os.path.join(root, 'embeddings')
	embeddings_path = os.path.join('./', args['model_path'])
	os.makedirs(embeddings_path, exist_ok=True)
	# make sample dir
	sample_path = os.path.join(root, 'sample')
	os.makedirs(sample_path, exist_ok=True)
	# make source code record dir & copy
	source_code_save_path = os.path.join(root, 'src')
	os.makedirs(source_code_save_path, exist_ok=True)
	# copy_files('./', source_code_save_path, ['.git','.vscode','__pycache__','logs','README','samples','LICENSE']) # copy source code
	return embeddings_path, sample_path, source_code_save_path

def cvt2heatmap(gray):
	heatmap = cv2.applyColorMap(np.uint8(gray), cv2.COLORMAP_JET)
	return heatmap

def heatmap_on_image(heatmap, image):
	if heatmap.shape != image.shape:
		heatmap = cv2.resize(heatmap, (image.shape[0], image.shape[1]))
	out = np.float32(heatmap)/255 + np.float32(image)/255
	out = out / np.max(out)
	return np.uint8(255 * out)

def min_max_norm(image):
    a_min, a_max = image.min(), image.max()
    # a_min, a_max = 0.32, 3.4
    return a_min, a_max, (image-a_min)/(a_max - a_min)

def min_max_norm2(image, min_value, max_value):
    image_min, image_max = image.min(), image.max()
    return image_min, image_max, (image-min_value)/(max_value - min_value)   


def cal_confusion_matrix(y_true, y_pred_no_thresh, thresh, img_path_list):
	pred_thresh = []
	false_n = []
	false_p = []
	for i in range(len(y_pred_no_thresh)):
		if y_pred_no_thresh[i] > thresh:
			pred_thresh.append(1)
			if y_true[i] == 0:
				false_p.append(img_path_list[i])
		else:
			pred_thresh.append(0)
			if y_true[i] == 1:
				false_n.append(img_path_list[i])

	cm = confusion_matrix(y_true, pred_thresh)
	print(cm)
	print('false positive')
	print(false_p)
	print('false negative')
	print(false_n)

In [None]:
# Neural Network Classes
class NN():

	def __init__(self, X=None, Y=None, p=2):
		self.p = p
		self.train(X, Y)

	def train(self, X, Y):
		self.train_pts = X
		self.train_label = Y

	def __call__(self, x):
		return self.predict(x)

	def predict(self, x):
		if type(self.train_pts) == type(None) or type(self.train_label) == type(None):
			name = self.__class__.__name__
			raise RuntimeError(f"{name} wasn't trained. Need to execute {name}.train() first")

		dist = distance_matrix(x, self.train_pts, self.p) ** (1 / self.p)
		labels = torch.argmin(dist, dim=1)
		return self.train_label[labels]

class KNN(NN):

	def __init__(self, X=None, Y=None, k=3, p=2):
		self.k = k
		super().__init__(X, Y, p)

	def train(self, X, Y):
		super().train(X, Y)
		if type(Y) != type(None):
			self.unique_labels = self.train_label.unique()

	def predict(self, x):
		dist = distance_matrix(x, self.train_pts, self.p) ** (1 / self.p)
		knn = dist.topk(self.k, largest=False)
		return knn

In [None]:
def embedding_concat(x, y):
	# from https://github.com/xiahaifeng1995/PaDiM-Anomaly-Detection-Localization-master
	B, C1, H1, W1 = x.size()
	_, C2, H2, W2 = y.size()
	s = int(H1 / H2)
	x = F.unfold(x, kernel_size=s, dilation=1, stride=s)
	x = x.view(B, C1, -1, H2, W2)
	z = torch.zeros(B, C1 + C2, x.size(2), H2, W2)
	for i in range(x.size(2)):
		z[:, :, i, :, :] = torch.cat((x[:, :, i, :, :], y), 1)
	z = z.view(B, -1, H2 * W2)
	z = F.fold(z, kernel_size=s, output_size=(H1, W1), stride=s)

	return z

def reshape_embedding(embedding):
	embedding_list = []
	for k in range(embedding.shape[0]):
		for i in range(embedding.shape[2]):
			for j in range(embedding.shape[3]):
				embedding_list.append(embedding[k, :, i, j])
	return embedding_list


In [None]:
#imagenet
mean_train = [0.485, 0.456, 0.406]
std_train = [0.229, 0.224, 0.225]

In [None]:
# DataLoader Class
class MVTecDataset(Dataset):
	def __init__(self, root, transform, phase):
		if phase=='train':
			self.img_path = os.path.join(root, 'train')
		else:
			self.img_path = os.path.join(root, 'test')
		self.transform = transform
		# load dataset
		self.img_paths, self.labels, self.types = self.load_dataset() # self.labels => good : 0, anomaly : 1

	def load_dataset(self):

		img_tot_paths = []
		tot_labels = []
		tot_types = []

		defect_types = os.listdir(self.img_path)
		
		for defect_type in defect_types:
			if defect_type == 'good':
				img_paths = glob.glob(os.path.join(self.img_path, defect_type) + "/*.*")
				img_tot_paths.extend(img_paths)
				tot_labels.extend([0]*len(img_paths))
				tot_types.extend(['good']*len(img_paths))
			else:
				img_paths = glob.glob(os.path.join(self.img_path, defect_type) + "/*.*")
				img_paths.sort()
				img_tot_paths.extend(img_paths)
				tot_labels.extend([1]*len(img_paths))
				tot_types.extend([defect_type]*len(img_paths))

		
		return img_tot_paths, tot_labels, tot_types

	def __len__(self):
		return len(self.img_paths)

	def __getitem__(self, idx):
		img_path, label, img_type = self.img_paths[idx], self.labels[idx], self.types[idx]
		img = Image.open(img_path).convert('RGB')
		img = self.transform(img)

		return img, label, os.path.basename(img_path[:-4]), img_type

In [None]:
# model architecture
class STPM(pl.LightningModule):
	def __init__(self, hparams):
		super(STPM, self).__init__()

		self.save_hyperparameters(hparams)

		self.init_features()
		def hook_t(module, input, output):
			self.features.append(output)

		# self.model = torch.hub.load('pytorch/vision:v0.9.0', 'wide_resnet50_2', pretrained=True)
		self.model = wide_resnet50_2(pretrained = True)

		for param in self.model.parameters():
			param.requires_grad = False

		self.model.layer2[-1].register_forward_hook(hook_t)
		self.model.layer3[-1].register_forward_hook(hook_t)

		self.criterion = torch.nn.MSELoss(reduction='sum')

		self.init_results_list()

		self.data_transforms = transforms.Compose([
						transforms.Resize((args['load_size'], args['load_size']), Image.ANTIALIAS),
						transforms.ToTensor(),
						transforms.CenterCrop(args['input_size'])])
						# transforms.Normalize(mean=mean_train,
						# 					std=std_train)]) 

		# self.inv_normalize = transforms.Normalize(mean=[-0.485/0.229, -0.456/0.224, -0.406/0.255], std=[1/0.229, 1/0.224, 1/0.255])

	def init_results_list(self):
		self.pred_list_px_lvl = []
		self.pred_list_img_lvl = []
		self.img_path_list = []        

	def init_features(self):
		self.features = []

	def forward(self, x_t):
		self.init_features()
		_ = self.model(x_t)
		return self.features

	def save_anomaly_map(self, anomaly_map, input_img, file_name, x_type):
		if anomaly_map.shape != input_img.shape:
			anomaly_map = cv2.resize(anomaly_map, (input_img.shape[0], input_img.shape[1]))
		anomaly_map_norm = min_max_norm(anomaly_map)
		anomaly_map_norm_hm = cvt2heatmap(anomaly_map_norm*255)
		
		# print(anomaly_map_norm_hm)
		(H, W) = input_img.shape[:2]

		# save images
		cv2.imwrite(f'{self.sample_path}/{x_type}_{file_name}.jpg', input_img)
		cv2.imwrite(f'{self.sample_path}/{x_type}_{file_name}_amap.jpg', anomaly_map_norm_hm)
		# cv2.imwrite(f'{self.sample_path}/{x_type}_{file_name}_amap_on_img.jpg', hm_on_img)

	def save_anomaly_map2(self, anomaly_map, input_img, values, names):
			self.sample_path = f'./sample'
			os.makedirs(self.sample_path, exist_ok = True)
			os.makedirs('image log', exist_ok = True)

			file_name, folder_name = names
			min_value, max_value, (lower, upper) = values

			lower = np.array(lower, dtype = np.uint8)
			upper = np.array(upper, dtype = np.uint8)

			text_file = open(f'image log/{folder_name}.txt', 'a')
			if anomaly_map.shape != input_img.shape:
					anomaly_map = cv2.resize(anomaly_map, (input_img.shape[0], input_img.shape[1]))
			image_min, image_max, anomaly_map_norm = min_max_norm2(anomaly_map, min_value, max_value)

			print(f'{folder_name}. min : {image_min:.2f}, max : {image_max:.2f}')
			anomaly_map_norm_hm = cvt2heatmap(anomaly_map_norm*255)
			hm_on_img = heatmap_on_image(anomaly_map_norm_hm, input_img)

			anomaly_hsv = cv2.cvtColor(anomaly_map_norm_hm, cv2.COLOR_BGR2HSV)
			mask = cv2.inRange(anomaly_hsv, lower, upper)
			output = cv2.bitwise_and(anomaly_map_norm_hm, anomaly_map_norm_hm, mask = mask)

			_, thr = cv2.threshold(mask, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)
			draw_image = input_img.copy()
			conts, _ = cv2.findContours(thr, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)

			label = 'anomaly' if len(conts) != 0 else 'good'
			for cont in conts:
					# moment = cv2.moments(cont)
					cv2.drawContours(draw_image, [cont], 0, (0, 0, 255), 2)

			save_path = f'{self.sample_path}/{label}'
			os.makedirs(save_path, exist_ok = True)
			# save images
			cv2.imwrite(os.path.join(save_path, f'{file_name}.jpg'), input_img)
			cv2.imwrite(os.path.join(save_path, f'{file_name}_mask.jpg'), output)
			cv2.imwrite(os.path.join(save_path, f'{file_name}_draw.jpg'), draw_image)
			cv2.imwrite(os.path.join(save_path, f'{file_name}_amap.jpg'), anomaly_map_norm_hm)
			cv2.imwrite(os.path.join(save_path, f'{file_name}_amap_on_img.jpg'), hm_on_img)
			
			# self.label_value_dict[folder_name]['min'].append(image_min)
			# self.label_value_dict[folder_name]['max'].append(image_max)

			text = f'image path : {folder_name}/{file_name}\nimage pixel minimum : {image_min:.2f}\nimage pixel maximum : {image_max:.2f}\n\n\n'
			text_file.write(text)
			print(text)

	def train_dataloader(self):
		image_datasets = MVTecDataset(root=args['dataset_path'], transform=self.data_transforms, phase='train')
		train_loader = DataLoader(image_datasets, batch_size=args['batch_size'], shuffle=True, num_workers=0) #, pin_memory=True)
		return train_loader

	def test_dataloader(self):
		test_datasets = MVTecDataset(root=args['dataset_path'], transform=self.data_transforms, phase='test')
		test_loader = DataLoader(test_datasets, batch_size=1, shuffle=False, num_workers=0) #, pin_memory=True) # only work on batch_size=1, now.
		return test_loader

	def configure_optimizers(self):
		return None

	def on_train_start(self):
		self.model.eval() # to stop running_var move (maybe not critical)
		self.embedding_dir_path, self.sample_path, self.source_code_save_path = prep_dirs(self.logger.log_dir)
		self.embedding_list = []
	
	def on_test_start(self):
		self.init_results_list()
		self.embedding_dir_path, self.sample_path, self.source_code_save_path = prep_dirs(self.logger.log_dir)
		
	def training_step(self, batch, batch_idx): # save locally aware patch features
		x, _, file_name, _ = batch
		features = self(x)
		embeddings = []
		for feature in features:
			m = torch.nn.AvgPool2d(3, 1, 1)
			embeddings.append(m(feature))
		embedding = embedding_concat(embeddings[0], embeddings[1])
		self.embedding_list.extend(reshape_embedding(np.array(embedding)))

	def training_epoch_end(self, outputs): 
		total_embeddings = np.array(self.embedding_list)
		# Random projection
		self.randomprojector = SparseRandomProjection(n_components='auto', eps=0.9) # 'auto' => Johnson-Lindenstrauss lemma
		self.randomprojector.fit(total_embeddings)
		# Coreset Subsampling
		selector = kCenterGreedy(total_embeddings,0,0)
		selected_idx = selector.select_batch(model=self.randomprojector, already_selected=[], N=int(total_embeddings.shape[0]*args['coreset_sampling_ratio']))
		self.embedding_coreset = total_embeddings[selected_idx]
		
		print('initial embedding size : ', total_embeddings.shape)
		print('final embedding size : ', self.embedding_coreset.shape)
		with open(os.path.join(self.embedding_dir_path, 'embedding.pickle'), 'wb') as f:
			pickle.dump(self.embedding_coreset, f)

		torch.save(model.state_dict(), f'{args["model_path"]}/model.pt')

	def test_step(self, batch, batch_idx): # Nearest Neighbour Search
		self.embedding_coreset = pickle.load(open(os.path.join(self.embedding_dir_path, 'embedding.pickle'), 'rb'))
		x, label, file_name, x_type = batch
		# extract embedding
		features = self(x)
		embeddings = []
		for feature in features:
			m = torch.nn.AvgPool2d(3, 1, 1)
			embeddings.append(m(feature))
		embedding_ = embedding_concat(embeddings[0], embeddings[1])
		embedding_test = np.array(reshape_embedding(np.array(embedding_)))
		# NN
		knn = KNN(torch.from_numpy(self.embedding_coreset).cuda(), k=9)
		score_patches = knn(torch.from_numpy(embedding_test).cuda())[0].cpu().detach().numpy()

		anomaly_map = score_patches[:,0].reshape((28,28))
		N_b = score_patches[np.argmax(score_patches[:,0])]
		w = (1 - (np.max(np.exp(N_b))/np.sum(np.exp(N_b))))
		score = w*max(score_patches[:,0]) # Image-level score
		
		anomaly_map_resized = cv2.resize(anomaly_map, (args['input_size'], args['input_size']))
		anomaly_map_resized_blur = gaussian_filter(anomaly_map_resized, sigma=4)
		
		self.pred_list_px_lvl.extend(anomaly_map_resized_blur.ravel())
		self.pred_list_img_lvl.append(score)
		self.img_path_list.extend(file_name)
		# save images
		# x = self.inv_normalize(x)
		input_x = cv2.cvtColor(x.permute(0,2,3,1).cpu().numpy()[0]*255, cv2.COLOR_BGR2RGB)
		self.save_anomaly_map(anomaly_map_resized_blur, input_x, file_name[0], x_type[0])
  
		print(score)
	def inspection_image(self, embedding, batch, values): # Nearest Neighbour Search

			x, file_name, folder_name = batch

			# extract embedding
			x = x.unsqueeze(0)
			features = self(x)
			embeddings = []
			for feature in features:
					m = torch.nn.AvgPool2d(3, 1, 1)
					embeddings.append(m(feature))
			embedding_ = embedding_concat(embeddings[0], embeddings[1])
			embedding_test = np.array(reshape_embedding(np.array(embedding_)))

			# NN
			# knn = KNN(torch.from_numpy(self.embedding_coreset).cuda(), k=9)
			# score_patches = knn(torch.from_numpy(embedding_test).cuda())[0].cpu().detach().numpy()
			knn = KNN(torch.from_numpy(embedding), k=9)
			score_patches = knn(torch.from_numpy(embedding_test))[0].cpu().detach().numpy()

			anomaly_map = score_patches[:,0].reshape((28,28))
			anomaly_map_resized = cv2.resize(anomaly_map, (args.input_size, args.input_size))
			anomaly_map_resized_blur = gaussian_filter(anomaly_map_resized, sigma=4)

			# x = self.inv_normalize(x)
			input_x = cv2.cvtColor(x.permute(0,2,3,1).cpu().numpy()[0]*255, cv2.COLOR_BGR2RGB)
			self.save_anomaly_map2(anomaly_map_resized_blur, input_x, values ,(file_name, folder_name))


In [None]:
config = {}
def get_args():
  config['phase'] = 'train'
  config['dataset_path'] = 'dataset'
  config['epochs'] = 3
  config['batch_size'] = 2
  config['load_size'] = 256
  config['input_size'] = 224
  config['coreset_sampling_ratio'] = 0.001
  config['output_path'] = 'output'
  config['save_anomaly_map'] = True
  config['n_neighbors'] = 9
  config['model_path'] = 'model'
  return config

In [None]:
!sudo rm -rf model output

In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
from easydict import EasyDict
args = EasyDict(get_args())
	
USE_CUDA = torch.cuda.is_available()
gpu_name = torch.cuda.get_device_name(0)
device = torch.device('cuda:0' if USE_CUDA else 'cpu')

print('학습을 진행하는 기기:', device)
print('gpu 사용 가능? : ', USE_CUDA, '사용 중인 gpu 이름 : ', gpu_name)

trainer = pl.Trainer.from_argparse_args(args, default_root_dir=args['output_path'], max_epochs=args['epochs'], gpus=1)
model = STPM(hparams=args)

os.makedirs(args['model_path'], exist_ok = True)
if args['phase'] == 'train':
  trainer.fit(model)		
  trainer.test(model)
elif args['pahse'] == 'test':
  trainer.test(model)

GPU available: True, used: True
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs


학습을 진행하는 기기: cuda:0
gpu 사용 가능? :  True 사용 중인 gpu 이름 :  Tesla P100-PCIE-16GB


  "Argument interpolation should be of type InterpolationMode instead of int. "
LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]

  | Name      | Type    | Params
--------------------------------------
0 | model     | ResNet  | 68.9 M
1 | criterion | MSELoss | 0     
--------------------------------------
0         Trainable params
68.9 M    Non-trainable params
68.9 M    Total params
275.533   Total estimated model params size (MB)
  f"The dataloader, {name}, does not have many workers which may be a bottleneck."


Training: -1it [00:00, ?it/s]

  return torch.max_pool2d(input, kernel_size, stride, padding, dilation, ceil_mode)


Getting transformed features...
Calculating distances...
Maximum distance from cluster centers is 2.97
initial embedding size :  (196000, 1536)
final embedding size :  (196, 1536)
Getting transformed features...
Calculating distances...
Maximum distance from cluster centers is 2.79
initial embedding size :  (392000, 1536)
final embedding size :  (392, 1536)
Getting transformed features...
Calculating distances...
Maximum distance from cluster centers is 2.70
initial embedding size :  (588000, 1536)
final embedding size :  (588, 1536)


LOCAL_RANK: 0 - CUDA_VISIBLE_DEVICES: [0]
  f"The dataloader, {name}, does not have many workers which may be a bottleneck."


Testing: 0it [00:00, ?it/s]

ValueError: ignored

In [None]:
!ls model

embedding.pickle  model.pt


In [None]:
def load_model(model_path, embedding_path):
    model = STPM(hparams = args)
    model.load_state_dict(torch.load(model_path), strict = False)
    model.eval()
    
    embedding_coreset = pickle.load(open(embedding_path, 'rb'))
    return model, embedding_coreset

In [None]:
from imutils import paths
from easydict import EasyDict
args = EasyDict(get_args())

print(f'{args["model_path"]}/{args["model_path"]}.pt')
model, embedding = load_model('model/model.pt', 'model/embedding.pickle')
print(model)

image_paths = sorted(paths.list_images('dataset/test/good'))
min_value, max_value = 0.32, 3.4 #bow tape data
boundary = [[0, 25, 25], [20, 255, 255]]

for idx, image_path in enumerate(image_paths, 1):
  file_name = image_path.split(os.path.sep)[-1]
  folder_name = image_path.split(os.path.sep)[-2]

  image = cv2.imread(image_path)
  image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

  image = Image.fromarray(image)
  data_transform =transforms.Compose([
              transforms.Resize((args.load_size, args.load_size), Image.ANTIALIAS),
              transforms.ToTensor(),
              transforms.CenterCrop(args.input_size),
              transforms.Normalize(mean=mean_train,
                                  std=std_train)])

  image = data_transform(image)
  batch = [image, file_name, folder_name]
  model.inspection_image(embedding, batch, (min_value, max_value, boundary))

model/model.pt


  "Argument interpolation should be of type InterpolationMode instead of int. "


STPM(
  (model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 128, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(128, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(128, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): Sequential(
         

KeyboardInterrupt: ignored