In [27]:
import torch
import torch.nn as nn
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

-  Data : [1,2,3,4,5,6,7,8,9,10,11,12]
- sequence length 3이며 batch size 4인 데이터로 나눔
- [[1,2,3],
   [4,5,6],
   [7,8,9],
   [10,11,12]]

In [28]:
inputs = torch.Tensor([1,2,3,4,5,6,7,8,9,10,11,12])

In [29]:
input_size = 1
seq_length = 3
hidden_size = 2
num_layers = 2
batch_size = 4

### nn.LSTM Basic

Input : input과 hidden_0 이라는 2개의 input을 받음
- input : neural network로 들어가는 sequence input [seq_length, batch size, input size]
- hidden_0 : network의 초기 hidden state [num layers*num directions, batch size, input size] 
    - num directions : Bidirectional RNN일 경우 2, 나머지 1
    - hidden_0은 따로 초기화 하지 않으면 Pytorch에 의해 자동으로 모두 0으로 초기화 됨

Output : out과 hidden이라는 2개의 출력을 냄
- out : 마지막 RNN layer로부터 매 timesteps마다의 output
- h_n : 모든 RNN layer로부터 마지막 timestep의 hidden value => `(hidden, cell)`
    - (num_layers* num_directions, batch, hidden_size)

* lstm 클래스 선언

In [30]:
lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True)
# batch_first = True이면 (seq, batch, feature) -> (batch, seq, feature)로 바뀜

In [31]:
# Input

print('inputs before :',inputs.shape)
inputs = inputs.view(batch_size, seq_length, input_size)
print('-'*40)
print('inputs after :',inputs.shape) # [batch size, seq length, input size]

inputs before : torch.Size([12])
----------------------------------------
inputs after : torch.Size([4, 3, 1])


In [32]:
# Output
hidden_init = torch.zeros(num_layers, batch_size, hidden_size) # 안해도 되지만 초기화해봄
cell_init = torch.zeros(num_layers, batch_size, hidden_size)

out, (hidden, cell) = lstm(inputs, (hidden_init,cell_init))
print('out:',out.shape) # [batch size, seq length, num_directions*hidden size]
print('hidden:',hidden.shape) # [num directions * num layers, batch size, hidden size]
print('cell:',cell.shape) # [num directions * num layers, batch size, hidden size]

out: torch.Size([4, 3, 2])
hidden: torch.Size([2, 4, 2])
cell: torch.Size([2, 4, 2])


### LSTM application

In [33]:
import torch
import torch.nn as nn
import torchvision
from torchvision import datasets, transforms
from torch.utils.data import DataLoader

# MNIST 데이터셋 
train_data = datasets.MNIST(
    root="../data",
    train=True,
    download=True,
    transform=transforms.ToTensor(),
)

test_data = datasets.MNIST(
    root="../data",
    train=False,
    download=True,
    transform=transforms.ToTensor(),
)

# Data loader
train_loader = DataLoader(train_data, batch_size=128, shuffle=True)
test_loader = DataLoader(test_data, batch_size=128, shuffle=False)

In [34]:
# LSTM - many to one
class LSTM(nn.Module):
    def __init__(self, num_classes):
        super(LSTM, self).__init__()
        self.input_size = 28
        self.hidden_size = 128
        self.num_layers = 2
        self.LSTM = nn.LSTM(input_size=self.input_size, hidden_size=self.hidden_size, num_layers=self.num_layers, batch_first=True)
        # batch_first = True이면 (seq, batch, feature) -> (batch, seq, feature)로 바뀜
        self.fc = nn.Linear(self.hidden_size, num_classes)
    
    def forward(self, x):
        # hidden state와 cell state 초기화
        hidden_init = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        cell_init = torch.zeros(self.num_layers, x.size(0), self.hidden_size).to(device)
        
        out, _ = self.LSTM(x, (hidden_init, cell_init))  # out: [mini-batch, seq_length, hidden_size]
        out = self.fc(out[:, -1, :])
        return out

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = LSTM(num_classes=10).to(device)

In [35]:
# Loss and optimizer
CELoss = nn.CrossEntropyLoss()
adam_optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [20]:
# Train the model
total_epochs = 3
sequence_length = 28
input_size = 28

print('number of iteration :', len(train_loader))

total_step = len(train_loader)
for epoch in range(total_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = CELoss(outputs, labels)
        
        # Backward and optimize
        adam_optimizer.zero_grad()
        loss.backward()
        adam_optimizer.step()
        
    print ('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, total_epochs, loss.item()))

number of iteration : 469
Epoch [1/3], Loss: 0.1717
Epoch [2/3], Loss: 0.0570
Epoch [3/3], Loss: 0.0568


In [21]:
# 학습이 끝난 후 모델 성능 테스트
# test에서는 back propagation 작업을 하지 않으므로 gradient를 계산하지 않도록 함 - 메모리의 효율성을 위해

model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total)) 

Test Accuracy of the model on the 10000 test images: 97.84 %


In [22]:
# 학습한 모델을 model_RNN.ckpt라는 이름으로 저장
torch.save(model.state_dict(), 'model_LSTM.ckpt')

### Bidirectional RNN Application

In [23]:
# Bidirectional LSTM - many to one
class Bi_LSTM(nn.Module):
    def __init__(self, num_classes):
        super(Bi_LSTM, self).__init__()
        self.input_size = 28
        self.hidden_size = 128
        self.num_layers = 2
        self.LSTM = nn.LSTM(input_size=self.input_size, hidden_size=self.hidden_size, num_layers=self.num_layers, batch_first=True, bidirectional=True)
        # batch_first = True이면 (seq, batch, feature) -> (batch, seq, feature)로 바뀜
        self.fc = nn.Linear(self.hidden_size*2, num_classes)
    
    def forward(self, x):
        # hidden state와 cell state 초기화
        hidden_init = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(device)
        cell_init = torch.zeros(self.num_layers*2, x.size(0), self.hidden_size).to(device)
        
        out, _ = self.LSTM(x, (hidden_init, cell_init))  # out: [mini-batch, seq_length, hidden_size]
        out = self.fc(out[:, -1, :])
        return out

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = Bi_LSTM(num_classes=10).to(device)

In [24]:
# Loss and optimizer
CELoss = nn.CrossEntropyLoss()
adam_optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

In [25]:
# Train the model
total_epochs = 3
sequence_length = 28
input_size = 28

print('number of iteration :', len(train_loader))

total_step = len(train_loader)
for epoch in range(total_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        
        # Forward pass
        outputs = model(images)
        loss = CELoss(outputs, labels)
        
        # Backward and optimize
        adam_optimizer.zero_grad()
        loss.backward()
        adam_optimizer.step()
        
    print ('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, total_epochs, loss.item()))

number of iteration : 469
Epoch [1/3], Loss: 0.1680
Epoch [2/3], Loss: 0.1021
Epoch [3/3], Loss: 0.0382


In [26]:
# 학습이 끝난 후 모델 성능 테스트
# test에서는 back propagation 작업을 하지 않으므로 gradient를 계산하지 않도록 함 - 메모리의 효율성을 위해

model.eval()
with torch.no_grad():
    correct = 0
    total = 0
    for images, labels in test_loader:
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    print('Test Accuracy of the model on the 10000 test images: {} %'.format(100 * correct / total)) 

Test Accuracy of the model on the 10000 test images: 96.82 %
