In [152]:
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import mplcursors # Use this is for creating a cursor-interactive plot with "%matplotlib notebook"
from sklearn.decomposition import NMF # Use this for training Non-negative Matrix Factorization
from sklearn.utils.extmath import randomized_svd # Use this for training Singular Value Dec
import torch.optim as optimomposition
from sklearn.manifold import TSNE # Use this for training t-sne manifolding

plt.style.use('ggplot') # You can also use different style

# just for plot checking, use this option
# %matplotlib inline

# for interactive plot
# If you use this option, plot will appear at first-drawn position
%matplotlib notebook

warnings.filterwarnings('ignore')
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import os
import torchvision.models as models


In [153]:

model_tsne_2d = TSNE(n_components=2)
model = torch.load('/media/data2/jiwon/snapshot_videonosiemodel.pt')

In [194]:
import torch
import torch.nn as nn
import torchvision

from torch.utils.data import Dataset, DataLoader, random_split
from torchvision import transforms as T
from PIL.JpegImagePlugin import convert_dict_qtables
from PIL import Image
import torch.fft
import torch.nn.functional as F

import numpy as np
import os
import matplotlib.pyplot as plt
import random
import warnings

from sys import argv
from time import time

path = '/media/data2/jiwon/VISION/dataset/base_files/'

'''리스트로 불러오고 싶은 프레임을 입력해서 비디오마다 해당 프레임 받아오기'''
def make_dir_dict_list(frame_index): # return img_dict_list, len(img_dict_list)
    img_dict_list = list()
    patern_class_list = ["flat", "natFBH", "nat", "natWA"]
    str_frame_index  = "{:04d}".format(frame_index)
    
    for camera in range(1,36):
        str_index_camera = str(camera)
        if camera < 10: str_index_camera = "0"+str_index_camera
        img_4_per_camera = list()
        
        for x in range(0,4):
            img_name = "D"+str_index_camera+"_I_"+ patern_class_list[x] +"_" + str_frame_index+".jpg"
            img_4_per_camera.append({"img_path": path+img_name, 'type': x})

        img_dict_list.append({'camera': camera, "img_4_per_camera": img_4_per_camera, 'frame': frame_index})
        
    return img_dict_list, len(img_dict_list)


'''이미지를 transform해서 전처리1 하자'''
class ImageTransform():

    def __init__(self, mean=([0.4741, 0.4799, 0.4672]), std=[0.2029, 0.2042, 0.2034]):
        self.data_transform = T.Compose([
            T.ToTensor(),
            # T.Normalize(mean, std)
        ])

    def __call__(self, img):
        return self.data_transform(img)


def create_united_4_homo_dataset(frame_index=60, patch_size=64, hom_patches=4):
    
    # 일단 이미지가 4개 필요함
    img_dict_list, num_images = make_dir_dict_list(frame_index)
    
    homo_4_patches = {'0':list(), '1': list(), '2':list(), '3': list()}
    for dict in img_dict_list:
        # print(dict)
        camera = dict.get('camera', None)
        img_4_per_camera = dict.get('img_4_per_camera', None) # img_4_per_camera에서 4개의 같은 카메라 출신 이미지 4개가 담겨있음
        frame = dict.get('frame', None)
        
        for idx in range(0,hom_patches):
            img_type_dict = img_4_per_camera[idx]
            type = img_type_dict.get('type', None)
            if idx != type: print(idx, type)
            
            with Image.open(img_type_dict.get('img_path', None)) as img:
                img = img.resize((patch_size*20, patch_size*20)) # 64*20 = 1280*1280
                # 패치 자르기
                for y in range(0, img.height, patch_size):
                    for x in range(0, img.width, patch_size):
                        # dict 으로 저장 
                        patch = {'patch': img.crop((x, y, x+patch_size, y+patch_size)), 'camera':camera, 'pos':(x,y)}
                        # 리스트에 dict 저장 
                        homo_4_patches[str(type)].append(patch)
                        

    return homo_4_patches    


class GroupDataset(Dataset):
    def __init__(self, homo_4_patches, transform, split='train') -> None:
        super().__init__()
        self.file_list = homo_4_patches
        self.transform = transform
        
    def __len__(self):
        return len(self.file_list['0'])
    
    def __getitem__(self, index):
        x = self.file_list[str(0)][index].get('patch')
        x_transformed = self.transform(x)
        
        y = self.file_list[str(1)][index].get('patch')
        y_transformed = self.transform(y)
        xy01 = torch.cat([x_transformed,y_transformed], dim=1)
        
        x = self.file_list[str(2)][index].get('patch')
        x_transformed = self.transform(x)
        
        y = self.file_list[str(3)][index].get('patch')
        y_transformed = self.transform(y)
        xy23 = torch.cat([x_transformed,y_transformed], dim=1)
        
        xy0123 = torch.cat([xy01,xy23], dim=1)
        return xy0123
    

def prepare_dataloader(dataset: Dataset, batch_size: int, scale=1, prop=.9, shuffle=True):
    origin_sz = int(len(dataset))
    use_sz = int(origin_sz* scale)
    if scale < 1 : dataset, _ = random_split(dataset, [use_sz, origin_sz-use_sz])
    train, test = random_split(dataset, [int(use_sz*prop), use_sz-int(use_sz*prop)])
    return DataLoader(train, batch_size=batch_size, shuffle=True), DataLoader(
                        test, batch_size=batch_size, shuffle=True )
# end of prepare_dataloader ft



class DnCNN(nn.Module):
    def __init__(self, depth=17, n_channels=64, image_channels=3):
        super(DnCNN, self).__init__()
        kernel_size = 3
        padding = 1
        layers = []

        # Adding first Conv+ReLU layer
        layers.append(nn.Conv2d(in_channels=image_channels, out_channels=n_channels, kernel_size=kernel_size, padding=padding, bias=False))
        layers.append(nn.ReLU(inplace=True))

        # Adding Conv+BN+ReLU layers
        for _ in range(depth-2):
            layers.append(nn.Conv2d(in_channels=n_channels, out_channels=n_channels, kernel_size=kernel_size, padding=padding, bias=False))
            layers.append(nn.BatchNorm2d(n_channels))
            layers.append(nn.ReLU(inplace=True))

        # Adding last Conv layer
        layers.append(nn.Conv2d(in_channels=n_channels, out_channels=1, kernel_size=kernel_size, padding=padding, bias=False))

        self.dncnn = nn.Sequential(*layers)

    def forward(self, x):
        out = self.dncnn(x)
        res = out - x
        return out, res, x-out
    
    

class DistanceBasedLogitLoss(nn.Module):
    def __init__(self, len = 200, margin=1.0, lambda_ = 0.001):
        super(DistanceBasedLogitLoss, self).__init__()
        self.margin = margin
        self.lambda_ = lambda_
        self.len = len
        # self.upper_matrix = torch.triu(torch.ones((self.len,self.len)), diagonal=1)

    def forward(self, r_matrix): # [200,64,64]
        self.len = r_matrix.shape[0]
        self.positive_matrix = torch.zeros((self.len,self.len))
        self.neg_matrix = torch.zeros((self.len,self.len))
        for i in range(self.len):
            i_ = int(i/4)*4
            for j in range(i,self.len):
                if i == j : continue
                j_ = int(j/4)*4
                euclidean_distance_ij = F.pairwise_distance(r_matrix[i].reshape(-1), r_matrix[j].reshape(-1)) 
                #((r_matrix[i] - r_matrix[j])**2).sum()**.5
                # print(euclidean_distance_ij, euclidean_distance_ij.shape)
                if i_!= j_ : self.neg_matrix[i,j] = -euclidean_distance_ij
                else : self.positive_matrix[i,j] = -euclidean_distance_ij

        S_matrix = torch.sum(self.neg_matrix) + torch.sum(self.positive_matrix)
        Pos_prob = self.positive_matrix*(1/S_matrix)
        Pos_prob = Pos_prob + Pos_prob.T
        loss_i = -torch.log(torch.sum(Pos_prob, axis=0))
        loss_all = torch.sum(loss_i)

        ff_matrix = torch.fft.fftn(r_matrix, dim=[1, 2]) # 200 x 64 x 64
        psd = torch.mean(torch.abs(ff_matrix) ** 2, dim=0) # 64 x 64
        # print(psd.shape) # torch.Size([64, 64])
        left = torch.mean(torch.log(psd))
        right = torch.log(torch.mean(psd))
        reg_term = left - right

        return loss_all - self.lambda_*reg_term	
    
    
    

In [195]:
class VisionDataset(Dataset):
    def __init__(self, file_dir_list, transform, split='train') -> None:
        super().__init__()
        self.file_list = file_dir_list
        self.transform = transform
        
    def __len__(self):
        return len(self.file_list)
    
    def __getitem__(self, index):
        img_dict = self.file_list[index]
        img_path = img_dict['img_path']
        img = Image.open(img_path)
        img_transformed = self.transform(img)
        
        label = img_dict['camera']
        
        return img_transformed, label
'''리스트로 불러오고 싶은 프레임을 입력해서 비디오마다 해당 프레임 받아오기'''
def make_data_file_dir_list(frame_index): # return train_img_dir_list
    train_img_dir_list = list()
    patern_class_list = ["flat", "natFBH", "nat", "natWA"]
    frame_index  = "{:04d}".format(frame_index)
    
    for i in range(1,36):
        str_index = str(i)
        if i < 10: str_index = "0"+str_index
        
        for x in range(0,4):
            img_name = "D"+str_index+"_I_"+ patern_class_list[x] +"_" + frame_index+".jpg"
            img_label_dict = {"img_path": path+img_name, "camera":i}
            train_img_dir_list.append(img_label_dict)
    
    return train_img_dir_list
    # 일단 이미지가 4개 필요함
img_dict_list = make_data_file_dir_list(60)
print(img_dict_list[0])

'''대충 작은 거 하나 불러와서 평균, 분산 확인하기'''
toyset = VisionDataset(img_dict_list, ImageTransform())
print(len(toyset), toyset[0][0].shape, toyset[0][1])

{'img_path': '/media/data2/jiwon/VISION/dataset/base_files/D01_I_flat_0060.jpg', 'camera': 1}
140 torch.Size([3, 1920, 2560]) 1


In [196]:
# homo_4_patches = create_united_4_homo_dataset(frame_index=60, patch_size=64, hom_patches=4)

split = 70
# trian_4_patches = hom
model = DnCNN()
# dataset = GroupDataset(homo_4_patches, transform=ImageTransform())
train_dataloader, test_dataloader = prepare_dataloader(toyset, batch_size=1, scale=1.0, prop=.8)
# 이미지를 벡터로 변환하기
model_dict = torch.load('/media/data2/jiwon/snapshot_videonosiemodel_full.pt')
model.load_state_dict(model_dict['MODEL_STATE'])
print(model_dict['EPOCHS_RUN'], model_dict['Loss'])
# model.eval()


19 1713.9589266095843


In [197]:

features = []
labels = []
pt_sz=64
with torch.no_grad():
    for x, label in train_dataloader:
        bt_sz = x.shape[0]
        print(label)
        # print(x.shape, x.reshape(x.shape[1], x.shape[2], x.shape[3]).shape)
        # x = x.reshape(x.shape[1], x.shape[2], x.shape[3])
        batch_features, res, res_hat = model(x)
        features.append(res)
        labels.append(label)

labels = torch.cat(labels, axis=0)
# features = torch.cat(features, axis=0)


tensor([2])
tensor([33])
tensor([31])
tensor([34])
tensor([19])
tensor([8])
tensor([32])
tensor([28])
tensor([1])
tensor([33])
tensor([29])
tensor([29])
tensor([4])
tensor([9])
tensor([7])
tensor([35])
tensor([17])
tensor([34])
tensor([21])
tensor([17])
tensor([8])
tensor([25])
tensor([27])
tensor([6])
tensor([4])
tensor([7])
tensor([13])
tensor([30])
tensor([10])
tensor([18])
tensor([28])
tensor([6])
tensor([23])
tensor([35])
tensor([18])
tensor([31])
tensor([26])
tensor([12])
tensor([15])
tensor([8])
tensor([5])
tensor([23])
tensor([25])
tensor([13])
tensor([21])
tensor([4])
tensor([9])
tensor([1])
tensor([14])
tensor([27])
tensor([5])
tensor([7])
tensor([30])
tensor([2])
tensor([24])
tensor([1])
tensor([17])
tensor([10])
tensor([31])
tensor([6])
tensor([22])
tensor([11])
tensor([5])
tensor([32])
tensor([12])
tensor([33])
tensor([28])
tensor([8])
tensor([11])
tensor([27])
tensor([22])
tensor([22])
tensor([29])
tensor([15])
tensor([13])
tensor([29])
tensor([19])
tensor([27])
tensor([3

In [169]:
print(features[1].shape)
print(len(features))

f = list()
transform = transforms.Compose([T.RandomCrop(64),  T.Grayscale()])
for i in range(len(features)):
    x = features[i].squeeze()
    print(x.shape)
    t_x = transform(x) 
    print(t_x.shape)
    t_x = t_x.view(-1)
    f.append(t_x)
    
print(len(f))
f_cat = torch.cat(f, axis=0)
# labels = torch.cat(labels, axis=0)
    
    

torch.Size([1, 3, 1840, 3264])
112
torch.Size([3, 2448, 3264])
torch.Size([1, 64, 64])
torch.Size([3, 1840, 3264])
torch.Size([1, 64, 64])
torch.Size([3, 1152, 2048])
torch.Size([1, 64, 64])
torch.Size([3, 2336, 4160])
torch.Size([1, 64, 64])
torch.Size([3, 1920, 2560])
torch.Size([1, 64, 64])
torch.Size([3, 1836, 3264])
torch.Size([1, 64, 64])
torch.Size([3, 1536, 2048])
torch.Size([1, 64, 64])
torch.Size([3, 1536, 2048])
torch.Size([1, 64, 64])
torch.Size([3, 720, 960])
torch.Size([1, 64, 64])
torch.Size([3, 1944, 2592])
torch.Size([1, 64, 64])
torch.Size([3, 2976, 3968])
torch.Size([1, 64, 64])
torch.Size([3, 2448, 3264])
torch.Size([1, 64, 64])
torch.Size([3, 960, 1280])
torch.Size([1, 64, 64])
torch.Size([3, 2448, 3264])
torch.Size([1, 64, 64])
torch.Size([3, 960, 1280])
torch.Size([1, 64, 64])
torch.Size([3, 2704, 4784])
torch.Size([1, 64, 64])
torch.Size([3, 960, 1280])
torch.Size([1, 64, 64])
torch.Size([3, 1280, 960])
torch.Size([1, 64, 64])
torch.Size([3, 960, 1280])
torch.Si

In [183]:
f_f = []
for i in range(len(labels)):
    x = f[i]
    # print(x.shape)
    x_v = x.reshape(1, 64*64)
    # print(x_v.shape)
    f_f.append(x_v)
f_f_concat = torch.concat(f_f, dim=0)
print(len(f_f_concat))
# TSNE을 이용하여 시각화하기
tsne = TSNE(n_components=2, random_state=0)
features_tsne = tsne.fit_transform(f_f_concat)


112


In [184]:
# print(len(features_tsne), len(labels))
# print(len(f))

# labels = torch.cat(labels, axis=0)
fig, ax = plt.subplots(figsize=(8,8))
for label in torch.unique(labels):
    idx = labels == label
    ax.scatter(features_tsne[idx, 0], features_tsne[idx, 1], label=label.item(), alpha=0.5)
    if label == 10: break
ax.legend(loc='upper left')
plt.show()
fig.savefig('/home/jiwon/csiro/tsne/tsne_fingerprint.jpg')

<IPython.core.display.Javascript object>

In [192]:
i = 3
image_name = '/home/jiwon/csiro/tsne/'
def save_img_fig(img, i, img_name='img'):
    img = img.squeeze(0).detach().numpy()
    a=np.transpose(img, (1,2,0))
    plt.imshow(a)
    plt.grid(False)
    plt.savefig(image_name+img_name+str(i)+'.jpg')


x = train_dataloader.dataset[i]
b = np.transpose(x[0], (1,2,0))
plt.imshow(b)
plt.savefig(image_name+'image'+str(i)+'.jpg')

output, res, res_hat = model(x[0].unsqueeze(0))
save_img_fig(img=x[0], i=i, img_name='input')
save_img_fig(img=output, i=i, img_name='output')
save_img_fig(img=res, i=i, img_name='out-in')
save_img_fig(img=res_hat, i=i, img_name='in-out')

<IPython.core.display.Javascript object>

Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).
Clipping input data to the valid range for imshow with RGB data ([0..1] for floats or [0..255] for integers).


In [191]:
'''from pytorch_grad_cam import GradCAM, HiResCAM, ScoreCAM, GradCAMPlusPlus, AblationCAM, XGradCAM, EigenCAM, FullGrad
from pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget
from pytorch_grad_cam.utils.image import show_cam_on_image
import torchvision
from torchvision.models import resnet18
import torch
import torch.nn as nn
import cv2
import matplotlib.pyplot as plt
from PIL import Image'''

'from pytorch_grad_cam import GradCAM, HiResCAM, ScoreCAM, GradCAMPlusPlus, AblationCAM, XGradCAM, EigenCAM, FullGrad\nfrom pytorch_grad_cam.utils.model_targets import ClassifierOutputTarget\nfrom pytorch_grad_cam.utils.image import show_cam_on_image\nimport torchvision\nfrom torchvision.models import resnet18\nimport torch\nimport torch.nn as nn\nimport cv2\nimport matplotlib.pyplot as plt\nfrom PIL import Image'

In [206]:
image = Image.open('/media/data2/jiwon/CSIRO/cropped_face/Faceswap/0115/00280.png')

t = T.ToTensor()
image = t(image)
print(image.shape)
image = image.reshape((1,3,image.shape[1], image.shape[2]))
out, inout, outin = model(image)

torch.Size([3, 202, 150])


In [207]:
save_img_fig(img=image[0], i=0, img_name='fs')

<IPython.core.display.Javascript object>