In [1]:
import torch
import torchvision
import torch.nn.functional as F
from torch import nn, optim
from torchvision import transforms, datasets
from torch.utils.data import Dataset

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


In [2]:
# Hyperparameters
EPOCH = 5000
BATCH_SIZE = 512

# Train on the GPU when CUDA is available, otherwise fall back to the CPU.
USE_CUDA = torch.cuda.is_available()
DEVICE = torch.device("cuda") if USE_CUDA else torch.device("cpu")

# Report which device training will run on.
print("다음 기기로 학습합니다:", DEVICE)

다음 기기로 학습합니다: cpu


In [3]:
import math

# Rectangle extents; presumably the area that contains the targets and the
# reference points (TODO confirm against the data generator).
width, height = 100, 100

# Diagonal of the width x height rectangle. train() and evaluate() divide the
# distances by this value before feeding them to the network.
max_dist = math.sqrt(width ** 2 + height ** 2)
# Alternative normaliser left disabled by the author: max_dist = 100

In [4]:
class CustomDataset(Dataset):
    """Pairs of clean and noisy distance vectors loaded from a CSV file.

    Every column whose name contains ``"distance"`` is used. Columns whose name
    also contains ``"noise"`` form the noisy vector; the remaining ones form
    the clean vector. According to the original author's note the file layout
    is ``target_x, target_y, ref_x_1, ref_y_1, ..., ref_x_n, ref_y_n,
    distance_1, ..., distance_n, distance_1_with_noise, ...,
    distance_n_with_noise``.

    Args:
        csv_path: Path of the CSV file to read.
        max_dist: Accepted for backward compatibility; the dataset itself does
            not use it (scaling is done by the training/evaluation code).

    Raises:
        ValueError: If the file has no clean distance columns, or if the number
            of clean and noisy distance columns differs.
    """

    def __init__(self, csv_path, max_dist):
        df = pd.read_csv(csv_path)

        distance_cols = [col for col in df.columns if "distance" in col]
        clean_cols = [col for col in distance_cols if "noise" not in col]
        noisy_cols = [col for col in distance_cols if "noise" in col]

        # Fail early with a clear message instead of producing empty or
        # mismatched tensors that only blow up later inside the loss.
        if not clean_cols:
            raise ValueError(
                "no clean distance columns found in {!r}".format(csv_path)
            )
        if len(clean_cols) != len(noisy_cols):
            raise ValueError(
                "expected as many noisy as clean distance columns in {!r}, "
                "got {} clean and {} noisy".format(
                    csv_path, len(clean_cols), len(noisy_cols)
                )
            )

        self.n_distances = len(clean_cols)

        # Keep the selected columns as 2-D numpy arrays, one row per sample.
        self.distances = df[clean_cols].values
        self.distances_with_noise = df[noisy_cols].values

    def __len__(self):
        """Return the number of samples (rows) in the dataset."""
        return len(self.distances)

    def __getitem__(self, idx):
        """Return ``(distances, distances_with_noise)`` for ``idx`` as float32 tensors."""
        # torch.tensor always copies, so callers may modify the result in place
        # without touching the arrays stored on the dataset.
        distances = torch.tensor(self.distances[idx], dtype=torch.float32)
        distances_with_noise = torch.tensor(
            self.distances_with_noise[idx], dtype=torch.float32
        )
        return distances, distances_with_noise

    def get_n_distances(self):
        """Return the number of clean distance columns found in the CSV."""
        return self.n_distances

In [5]:
# Location of the training CSV.
train_path = "./train.csv"

trainset = CustomDataset(train_path, max_dist)

# Serve the training samples in shuffled mini-batches of BATCH_SIZE.
train_loader = torch.utils.data.DataLoader(trainset, batch_size=BATCH_SIZE, shuffle=True)

In [6]:
# Number of clean distance columns found in the training CSV. The Autoencoder
# class reads this module-level name to size its layers, so it must be set
# before the model is constructed.
n_distances = trainset.get_n_distances()

In [7]:
class Autoencoder(nn.Module):
    """Small fully connected autoencoder for distance vectors.

    The encoder maps an ``n_inputs``-long vector to a ``latent_dim``-long code
    and the decoder maps that code back to ``n_inputs`` values. The decoder
    ends in a sigmoid, so every reconstructed value lies between 0 and 1;
    targets must therefore be scaled into that range.

    Args:
        n_inputs: Length of the input and output vectors. When ``None`` (the
            default) the module-level ``n_distances`` is used, which keeps
            ``Autoencoder()`` working exactly as before.
        latent_dim: Size of the bottleneck code. Defaults to 9.
    """

    def __init__(self, n_inputs=None, latent_dim=9):
        super().__init__()

        if n_inputs is None:
            # Backward-compatible default: size the layers from the global.
            n_inputs = n_distances

        # Layer order is kept stable so existing state_dict keys
        # (encoder.0, encoder.2, decoder.0, decoder.2) still load.
        self.encoder = nn.Sequential(
            nn.Linear(n_inputs, n_inputs),
            nn.LeakyReLU(),
            nn.Linear(n_inputs, latent_dim),  # compress to the latent code
        )
        self.decoder = nn.Sequential(
            nn.Linear(latent_dim, n_inputs),
            nn.LeakyReLU(),
            nn.Linear(n_inputs, n_inputs),
            nn.Sigmoid(),  # squash every output value into the 0..1 range
        )

    def forward(self, x):
        """Return ``(encoded, decoded)`` for the input batch ``x``."""
        encoded = self.encoder(x)
        decoded = self.decoder(encoded)
        return encoded, decoded

In [8]:
# Build the model on the selected device.
autoencoder = Autoencoder().to(DEVICE)

# Adam with a small learning rate and a light weight-decay penalty.
optimizer = optim.Adam(autoencoder.parameters(), lr=1e-4, weight_decay=1e-5)

# Mean squared error between the reconstruction and the clean distances.
criterion = nn.MSELoss()

In [9]:
def train(autoencoder, train_loader):
    """Run one training epoch and return the mean of the per-batch losses.

    Every batch yields ``(distances, distances_with_noise)``. Both tensors are
    divided in place by the module-level ``max_dist`` and moved to ``DEVICE``;
    the model then reconstructs the clean distances from the noisy ones. The
    module-level ``criterion`` and ``optimizer`` are used for the update.

    Args:
        autoencoder: Model whose forward pass returns ``(encoded, decoded)``.
        train_loader: Sized iterable of ``(distances, distances_with_noise)``
            batches.

    Returns:
        Sum of the batch losses divided by ``len(train_loader)``.
    """
    autoencoder.train()
    running_loss = 0
    for clean, noisy in train_loader:
        # Normalise by max_dist (in place), then move to the training device.
        clean.div_(max_dist)
        noisy.div_(max_dist)
        clean, noisy = clean.to(DEVICE), noisy.to(DEVICE)

        # Denoising objective: noisy input, clean target.
        _, reconstruction = autoencoder(noisy)
        batch_loss = criterion(reconstruction, clean)

        optimizer.zero_grad()
        batch_loss.backward()
        optimizer.step()

        running_loss += batch_loss.item()
    return running_loss / len(train_loader)

In [10]:

# Run the full training schedule and report the mean batch loss after every
# epoch. Visualisation during training is skipped in this example.
for epoch in range(1, EPOCH + 1):
    loss = train(autoencoder, train_loader)
    message = "[Epoch {}] loss:{}".format(epoch, loss)
    print(message)

[Epoch 1] loss:0.04633130971342325
[Epoch 2] loss:0.045923404209315774
[Epoch 3] loss:0.04545548167079687
[Epoch 4] loss:0.04501259978860617
[Epoch 5] loss:0.04453497417271137
[Epoch 6] loss:0.0440627958625555
[Epoch 7] loss:0.043577522598207
[Epoch 8] loss:0.04310768637806177
[Epoch 9] loss:0.04261479023844004
[Epoch 10] loss:0.04213306568562984
[Epoch 11] loss:0.04168057683855295
[Epoch 12] loss:0.041171666048467156
[Epoch 13] loss:0.040689053013920784
[Epoch 14] loss:0.04021608140319586
[Epoch 15] loss:0.039730678871273996
[Epoch 16] loss:0.03921353686600924
[Epoch 17] loss:0.03881218433380127
[Epoch 18] loss:0.03826072197407484
[Epoch 19] loss:0.03785340394824743
[Epoch 20] loss:0.03738649990409613
[Epoch 21] loss:0.036945870332419875
[Epoch 22] loss:0.036507084779441354
[Epoch 23] loss:0.03608375359326601
[Epoch 24] loss:0.03568839002400637
[Epoch 25] loss:0.03534011319279671
[Epoch 26] loss:0.03499648030847311
[Epoch 27] loss:0.03467036560177803
[Epoch 28] loss:0.0344032678753137

In [11]:

def evaluate(model, test_loader, verbose=True):
    """Return the mean squared reconstruction error in original distance units.

    The noisy distances are divided by the module-level ``max_dist`` before
    being fed to the model, and the reconstruction is multiplied by
    ``max_dist`` again before it is compared with the clean distances. The
    squared errors are summed over every element of every batch and divided by
    the total number of elements, so the result is a true mean squared error.
    Previously the sum was divided by the number of samples, which reported
    the MSE multiplied by the vector length.

    Args:
        model: Model whose forward pass returns ``(encoded, decoded)``.
        test_loader: Iterable of ``(distances, distances_with_noise)`` batches.
        verbose: When true (default, matching the previous behaviour) print the
            clean, noisy and reconstructed distances of every batch.

    Returns:
        Mean squared error over all evaluated elements, as a float.

    Raises:
        ZeroDivisionError: If ``test_loader`` yields no elements.
    """
    model.eval()
    squared_error_sum = 0.0
    n_elements = 0
    with torch.no_grad():
        for distances, distances_with_noise in test_loader:
            distances = distances.to(DEVICE)
            # Scale the input the same way train() does, without mutating the
            # tensor handed out by the loader.
            scaled_noisy = (distances_with_noise / max_dist).to(DEVICE)

            _, decoded = model(scaled_noisy)

            # Undo the normalisation so the error is in distance units.
            decoded = decoded * max_dist

            squared_error_sum += F.mse_loss(
                decoded, distances, reduction='sum'
            ).item()
            n_elements += distances.numel()

            if verbose:
                print(distances)
                print(scaled_noisy * max_dist)
                print(decoded)
                print()

    return squared_error_sum / n_elements

In [12]:
# Location of the test CSV.
test_path = "./test.csv"

testset = CustomDataset(test_path, max_dist)

# One sample per batch, in file order.
test_loader = torch.utils.data.DataLoader(testset, batch_size=1, shuffle=False)

test_loss = evaluate(autoencoder, test_loader)

print('Test Mean Square Error: {:.4f}'.format(test_loss))

tensor([[108.1665,  56.3560,  64.6220,  21.5870,  87.6641,  94.1754,  56.9210,
          15.8114,  36.1386,  29.6816,  69.4622, 101.0792]])
tensor([[111.1012,  58.2357,  65.4832,  22.5645,  89.0342,  93.8135,  57.0489,
          17.8387,  40.0767,  33.2389,  72.4131, 101.8457]])
tensor([[ 97.4222,  50.6750,  53.8850,  23.3334,  71.0599, 109.3974,  57.5345,
          25.2760,  37.3427,  32.2437,  86.4849,  91.7682]])

tensor([[21.3776, 56.0357, 66.6108, 91.8314, 50.4777, 31.8904, 30.4138, 65.8559,
         77.7817, 54.0370, 43.6578, 50.6360]])
tensor([[24.8128, 60.2713, 74.5374, 93.6205, 53.3429, 36.4026, 31.2495, 71.5455,
         81.8565, 57.8037, 48.5734, 54.9690]])
tensor([[23.5770, 69.8071, 74.4562, 88.0946, 45.8077, 30.5721, 42.0149, 57.8508,
         73.3431, 56.3816, 47.1920, 40.9824]])

tensor([[40.3113, 22.3607, 70.0071, 17.4642, 18.4391, 84.5044, 56.5685, 79.7120,
          8.0623, 95.9635, 49.3964, 96.8969]])
tensor([[ 42.7777,  25.2771,  72.6012,  18.6997,  20.5176,  87.100