In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
from torch.utils.data import DataLoader
from sklearn.metrics.pairwise import euclidean_distances

# CIFAR-10 데이터셋 로드 및 전처리
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

trainset = datasets.CIFAR10(root='./data', train=True, download=True, transform=transform)
testset = datasets.CIFAR10(root='./data', train=False, download=True, transform=transform)

trainloader = DataLoader(trainset, batch_size=64, shuffle=True)
testloader = DataLoader(testset, batch_size=64, shuffle=False)

# 임의로 데이터를 오염시키는 함수 (여기서는 이미지를 흰색으로 바꾸는 방식)
def add_noise(x, noise_factor=0.5):
    noise = torch.randn_like(x) * noise_factor
    x_noisy = x + noise
    return torch.clamp(x_noisy, 0., 1.)

# Non-Local Means 필터
def non_local_means(img, patch_size=5, search_window_size=21, h=1.0):
    """
    Non-Local Means Denoising
    :param img: 노이즈가 포함된 이미지 (Tensor, 크기: [C, H, W])
    :param patch_size: 각 패치의 크기 (정수)
    :param search_window_size: 검색 창의 크기 (정수)
    :param h: 필터의 강도 (작을수록 더 강한 필터링)
    :return: denoised image
    """
    C, H, W = img.shape
    half_patch_size = patch_size // 2
    half_search_window = search_window_size // 2
    
    # 결과 이미지 초기화
    denoised_img = torch.zeros_like(img)

    # 패치와 검색창의 모든 좌표에 대해 반복
    for c in range(C):
        for i in range(H):
            for j in range(W):
                patch = img[c, max(i-half_patch_size, 0):min(i+half_patch_size+1, H), max(j-half_patch_size, 0):min(j+half_patch_size+1, W)]
                patch_flat = patch.view(-1)
                
                weights = []
                for di in range(max(i-half_search_window, 0), min(i+half_search_window+1, H)):
                    for dj in range(max(j-half_search_window, 0), min(j+half_search_window+1, W)):
                        neighbor_patch = img[c, max(di-half_patch_size, 0):min(di+half_patch_size+1, H), max(dj-half_patch_size, 0):min(dj+half_patch_size+1, W)]
                        neighbor_patch_flat = neighbor_patch.view(-1)
                        
                        # 유클리드 거리 계산
                        dist = torch.norm(patch_flat - neighbor_patch_flat)
                        weight = torch.exp(-dist**2 / (h**2))
                        weights.append((weight, (di, dj)))
                
                # 가중치 기반으로 값 계산
                norm_weights = sum([w[0] for w in weights])
                weighted_sum = sum([img[c, w[1][0], w[1][1]] * w[0] for w in weights])
                denoised_img[c, i, j] = weighted_sum / norm_weights if norm_weights != 0 else img[c, i, j]
                
    return denoised_img

# NLM을 적용하여 이미지 복원
def denoise_image(image, patch_size=5, search_window_size=21, h=1.0):
    return non_local_means(image, patch_size, search_window_size, h)

# CIFAR-10 이미지에 대해 NLM을 적용하여 복원
def apply_nlm_on_batch(batch, patch_size=5, search_window_size=21, h=1.0):
    batch_denoised = []
    for img in batch:
        denoised_img = denoise_image(img)
        batch_denoised.append(denoised_img)
    
    return torch.stack(batch_denoised)

# 모델 훈련 (NLM은 비지도 학습이므로 훈련이 필요하지 않음)
# 테스트 데이터셋에 대한 복원
model.eval()
with torch.no_grad():
    test_data = next(iter(testloader))
    test_inputs, test_labels = test_data
    noisy_test_inputs = add_noise(test_inputs)

    # NLM을 사용하여 복원된 이미지 얻기
    restored_images = apply_nlm_on_batch(noisy_test_inputs)

# 복원된 이미지와 원본 이미지 비교
n = 10  # 이미지 갯수
plt.figure(figsize=(20, 4))
for i in range(n):
    # 원본
    ax = plt.subplot(2, n, i + 1)
    plt.imshow(test_inputs[i].permute(1, 2, 0) / 2 + 0.5)
    plt.title("Original")
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

    # 복원된 이미지
    ax = plt.subplot(2, n, i + 1 + n)
    plt.imshow(restored_images[i].permute(1, 2, 0) / 2 + 0.5)
    plt.title("Restored by NLM")
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)

plt.show()


In [None]:
import torch
import os
import numpy as np
import torch.optim as optim
import torch.nn as nn
import gc
from sklearn.manifold import TSNE
import matplotlib.pyplot as plt

# TSNE 모델 클래스 정의
class TSNE_Model(torch.nn.Module):
    def __init__(self, n_components=2, perplexity=30.0, n_iter=1000):
        super(TSNE_Model, self).__init__()
        self.n_components = n_components
        self.perplexity = perplexity
        self.n_iter = n_iter

    def forward(self, X):
        """
        X: (N, D) tensor, where N is the number of data points and D is the dimension of each data point
        """
        X = X.detach().cpu().numpy()  # PyTorch tensor -> NumPy array
        tsne = TSNE(n_components=self.n_components, perplexity=self.perplexity, n_iter=self.n_iter)
        tsne_results = tsne.fit_transform(X)
        return torch.tensor(tsne_results, dtype=torch.float32).to(X.device)

# 학습 함수
def train_model(model, trainloader, device='cuda', epochs=10, Data_Reset=False, net_reset=False, 
                folder_path='./', net_name="default_net.pt", log_save=True, log_name="default_log.npy"):
    
    if not os.path.exists(folder_path):
        print("디렉토리 없음")
        if not net_reset and not Data_Reset:
            return
        os.mkdir(folder_path)
        print("디렉토리를 만들었습니다:", folder_path)
    
    os.chdir(folder_path)

    if not os.path.exists(net_name) and not net_reset:
        print("학습데이터 없음")
        os.chdir("../")
        return

    # DataLoader에서 데이터 항목 개수 확인
    for batch in trainloader:
        num_items = len(batch)  # 배치에서 항목 개수 확인
        print("데이터 항목 수:", num_items)
        break

    # 모델 초기화 또는 데이터 초기화가 필요한 경우
    if net_reset or Data_Reset or (not os.path.exists(net_name)):
        model.train()
        loss_list = []

        criterion = nn.MSELoss()  # 손실 함수
        optimizer = optim.Adam(model.parameters(), lr=0.001)

        for epoch in range(epochs):
            running_loss = 0.0
            for data in trainloader:
                if num_items == 2:
                    inputs, _ = data
                elif num_items == 4:
                    _, _, inputs, _ = data
                inputs = inputs.to(device)

                # TSNE 모델에 데이터를 전달하여 차원 축소
                outputs = model(inputs)

                # 손실 계산 (이 경우엔 차원 축소된 결과와 원본 데이터를 비교하지 않음)
                # 차원 축소 모델이므로 손실 계산은 생략할 수 있음
                # loss = criterion(outputs, inputs)  # 차원 축소는 손실을 계산하지 않음

                # 손실 없이 그냥 출력값만 업데이트 (TSNE는 지도 학습이 아닌 차원 축소 모델이므로)
                # running_loss += loss.item()

            print(f'Epoch [{epoch+1}/{epochs}]')

        # 모델 저장
        torch.save(model.state_dict(), net_name)
        print(f"모델을 저장하였습니다: {net_name}")
        
        if log_save:
            np.save(log_name, np.array(loss_list))
            print(f"로그를 저장하였습니다: {log_name}.npy")
        
        os.chdir("../")
        del model
        torch.cuda.empty_cache()
        gc.collect()
        return
    
    else:
        if not os.path.exists(log_name + ".npy"):
            print("학습 로그 없음")
        else:
            loss_list = np.load(log_name + ".npy")
            print("로그 로드됨")

        os.chdir("../")
        return