In [None]:
# 필요한 라이브러리 설정
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision import models
from pytorch_model_summary import summary

import numpy as np
from sklearn.preprocessing import MinMaxScaler

import os
from os.path import join
from pickle import load
from tqdm import tqdm
import random
import datetime

In [None]:
# ======================= Configuration =======================
# Experiment-related settings
exp_name = 'torch_20220511'  # experiment name, or today's date
modelVersion = 'Dense_1st_torch'
nameDataset = 'IWALQQ_AE_1st'
dataType = 'angle'  # or moBWHT

# -------------------------------------------------------------
# The values below are swept as a grid.
# Each mapping is keyed by its option index (0, 1, 2, ...).
# -------------------------------------------------------------
list_learningRate = dict(enumerate((0.006, 0.008, 0.01)))  # opt1
list_batch_size = dict(enumerate((128,)))  # opt2
list_lossFunction = dict(enumerate(("MAE",)))  # opt2

totalFold = 5  # total fold, I did 5-fold cross validation
epochs = 1000  # total epoch
log_interval = 10  # frequency for saving log file
count = 0  # In SCC, get count for grid-training

In [None]:
# Sub-modules used to build the model
class Encoder(nn.Module):
    """Compress a batch of sequences into one embedding vector per sequence.

    Two stacked bidirectional LSTMs (batch-first) are followed by a linear
    projection of the second LSTM's output at the last time step.

    Args:
        seq_len: sequence length; stored on the instance but not used by
            ``forward``.
        n_features: number of features per time step (LSTM input size).
        embedding_dim: size of the returned embedding.

    ``forward`` maps ``(batch, time, n_features)`` to ``(batch, embedding_dim)``.
    """

    def __init__(self, seq_len, n_features, embedding_dim):
        super().__init__()
        self.seq_len = seq_len
        self.n_features = n_features
        self.embedding_dim = embedding_dim
        self.hidden_dim = 2 * embedding_dim

        # Options shared by both recurrent layers.
        lstm_options = dict(num_layers=1, batch_first=True, bidirectional=True)
        self.rnn1 = nn.LSTM(
            input_size=n_features,
            hidden_size=self.hidden_dim,
            **lstm_options,
        )
        # A bidirectional LSTM emits 2 * hidden_size features per step.
        self.rnn2 = nn.LSTM(
            input_size=2 * self.hidden_dim,
            hidden_size=embedding_dim,
            **lstm_options,
        )
        # Input width is doubled because rnn2 is bidirectional.
        self.output_layer = nn.Linear(2 * self.embedding_dim, self.embedding_dim)

    def forward(self, x):
        first_pass, _ = self.rnn1(x)
        second_pass, _ = self.rnn2(first_pass)
        # NOTE(review): at the last time step the backward direction has only
        # processed the final input element; confirm this is the intended
        # summary of the sequence.
        last_step = second_pass[:, -1, :]
        return self.output_layer(last_step)

In [None]:
# Print the Encoder layer summary twice on a dummy batch:
# first listing input shapes, then listing output shapes.
for show_input in (True, False):
    print(summary(Encoder(101, 42, 30), torch.zeros(32, 101, 42), show_input=show_input))

In [None]:
# Adapted from:
# https://discuss.pytorch.org/t/any-pytorch-function-can-work-as-keras-timedistributed/1346/25
class TimeDistributed(nn.Module):
    """Apply a wrapped module independently to every step of a sequence batch.

    The first two dimensions of the input are merged, the wrapped module is
    called on the flattened tensor, and the two leading dimensions are then
    restored on the result.
    """

    def __init__(self, module):
        super().__init__()
        self.module = module

    def forward(self, x):
        dim0, dim1 = x.shape[0], x.shape[1]
        # Collapse the two leading dims: (dim0 * dim1, everything else).
        merged = x.reshape(dim0 * dim1, -1)
        transformed = self.module(merged)
        # Restore the two leading dims: (dim0, dim1, output_size).
        return transformed.reshape(dim0, dim1, -1)

In [None]:
class Decoder(nn.Module):
    """Expand an embedding back into a full sequence.

    The input is reshaped to ``(-1, 1, input_dim)`` and repeated ``seq_len``
    times along the time axis. It then passes through two stacked
    bidirectional LSTMs (batch-first) and finally through ``self.timedist``,
    which wraps the linear ``output_layer``.

    Args:
        seq_len: number of time steps to generate.
        input_dim: size of the embedding fed to the decoder.
        n_features: number of features produced per time step.
    """

    def __init__(self, seq_len, input_dim, n_features):
        super().__init__()
        self.seq_len = seq_len
        self.input_dim = input_dim
        self.hidden_dim = 2 * input_dim
        self.n_features = n_features

        # Options shared by both recurrent layers.
        lstm_options = dict(num_layers=1, batch_first=True, bidirectional=True)
        self.rnn1 = nn.LSTM(
            input_size=input_dim,
            hidden_size=input_dim,
            **lstm_options,
        )
        # A bidirectional LSTM emits 2 * hidden_size features per step.
        self.rnn2 = nn.LSTM(
            input_size=2 * input_dim,
            hidden_size=self.hidden_dim,
            **lstm_options,
        )
        self.output_layer = nn.Linear(2 * self.hidden_dim, self.n_features)
        self.timedist = TimeDistributed(self.output_layer)

    def forward(self, x):
        # Repeat-vector step: one copy of the embedding per time step.
        embedding = x.reshape(-1, 1, self.input_dim)
        repeated = embedding.repeat(1, self.seq_len, 1)
        features, _ = self.rnn1(repeated)
        features, _ = self.rnn2(features)
        return self.timedist(features)

In [None]:
# Print the Decoder layer summary twice on a dummy embedding:
# first listing input shapes, then listing output shapes.
for show_input in (True, False):
    print(summary(Decoder(101, 20, 42), torch.zeros(20), show_input=show_input))

In [None]:
# Main module
class RecurrentAutoencoder(nn.Module):
    """Sequence autoencoder: an ``Encoder`` followed by a ``Decoder``.

    Args:
        seq_len: sequence length passed to both halves.
        n_features: number of features per time step.
        embedding_dim: size of the bottleneck embedding (default 30).
    """

    def __init__(self, seq_len, n_features, embedding_dim=30):
        super().__init__()
        self.encoder = Encoder(
            seq_len=seq_len, n_features=n_features, embedding_dim=embedding_dim
        )
        self.decoder = Decoder(
            seq_len=seq_len, input_dim=embedding_dim, n_features=n_features
        )

    def forward(self, x):
        # Encode to the bottleneck, then decode back to a sequence.
        return self.decoder(self.encoder(x))

In [None]:

# Print the full autoencoder layer summary twice on a dummy batch:
# first listing input shapes, then listing output shapes.
for show_input in (True, False):
    print(summary(RecurrentAutoencoder(101, 42, 30), torch.zeros(16, 101, 42), show_input=show_input))

# 학습 루프 함수화 (구현)

In [None]:
# TODO: implement the training loop


# 학습 루프 함수화 (예제)
- 참고 https://koreapy.tistory.com/739

In [None]:
# Loss and optimizer for the example training loop below.
# NOTE(review): `model` is not defined in any cell visible above this one;
# it must already exist in the kernel for this cell to run.
loss_fn = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=model.parameters(), lr=0.001)

In [None]:
def train(dataloader, model, loss_fn, optimizer, device=None):
    """Run one training epoch over ``dataloader``.

    Args:
        dataloader: iterable of ``(X, y)`` batches exposing a sized
            ``.dataset`` attribute (used only for the progress output).
        model: module being optimised.
        loss_fn: callable ``loss_fn(pred, y)`` returning a scalar tensor.
        optimizer: optimizer that updates ``model``'s parameters.
        device: device each batch is moved to. When omitted, the device of
            the model's first parameter is used; a model without parameters
            leaves the batches where they are.

    Prints the loss of every 100th batch. Returns ``None``.
    """
    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else None

    size = len(dataloader.dataset)
    # test() leaves the model in eval mode, so switch back explicitly;
    # otherwise later epochs would train with eval-mode layer behaviour.
    model.train()
    for batch, (X, y) in enumerate(dataloader):
        if device is not None:
            X, y = X.to(device), y.to(device)
        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)
        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        if batch % 100 == 0:
            loss_value, current = loss.item(), batch * len(X)
            print(f"loss: {loss_value:>7f} [{current:>5d}/{size:>5d}]")

In [None]:
def test(dataloader, model, loss_fn, device): 
    size = len(dataloader.dataset) 
    num_batches = len(dataloader) 
    model.eval() 
    test_loss, correct = 0, 0 
    with torch.no_grad(): 
        for X, y in dataloader: X, y = X.to(device), y.to(device) 
        pred = model(X) 
        test_loss += loss_fn(pred, y).item() 
        correct += (pred.argmax(1) == y).type(torch.float).sum().item() 
        test_loss /= num_batches 
        correct /= size 
        print(f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")

In [None]:
# Example driver: alternate one training pass and one evaluation pass per epoch.
# NOTE: this rebinds the module-level `epochs` set in the configuration cell
# (1000 -> 5) for this short example run.
epochs = 5
# train() and test() take the target device as their last argument; use the
# device the model's parameters already live on so the batches follow the model.
run_device = next(model.parameters()).device
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimizer, run_device)
    test(test_dataloader, model, loss_fn, run_device)
print("Done!")

# 배운것


In [None]:
# Contiguity demo: inspect size/stride, transpose the first two axes,
# then make the tensor contiguous again before flattening it with view().
a = torch.randn(2, 3, 4)
print(a.shape)
print(a.stride())
print(a.is_contiguous())
a = torch.transpose(a, 0, 1)
print(a.is_contiguous())
a = a.contiguous().view(-1)
print(a.shape)