In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision
from torchvision import datasets
from torchvision import transforms
from torch.autograd import Variable
import matplotlib.pyplot as plt
import time
import numpy as np


In [2]:
# training  data增强
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(), # 水平翻转
    transforms.RandomRotation(15), # 翻转
    transforms.ToTensor(), # 图片 to Tensor，normalize
])
# test transform
test_transform = transforms.Compose([
    transforms.ToTensor(),
])
# 调用预置的dataset

train_set = datasets.MNIST(root=r'G:\研一\深度学习\handwritedigit',
                           transform = train_transform,
                            train = True,
                            download=False
                            )

test_set = datasets.MNIST(root=r'G:\研一\深度学习\handwritedigit',
                           transform = test_transform,
                           train=False
                           )
def evaluation(outputs, labels):
    pred_y = torch.max(outputs,1)[1].data.cpu().numpy().squeeze()
    correct = sum(pred_y == labels.cpu().numpy()).item()
    return correct

In [3]:
import numpy as np
import torchvision
from torch.utils.data import DataLoader
BATCH_SIZE = 64
INPUT_SIZE = 28
TIME_STEP = 28
print('train_data.train_data.size():', train_set.data.size())    # 打印训练集特征的size
print('train_data.train_labels.size():', train_set.targets.size())    # 打印训练集标签的size

# 先归一化，在分割数据，为节约时间，只取了部分
data_x = train_set.data.type(torch.FloatTensor)/255.
data_y = train_set.targets.type(torch.Tensor)
#
train_x = data_x[:50000]
train_y = data_y[:50000]
valid_x = data_x[50000:]
valid_y = data_y[50000:]
# test_x = data_x[12000:14000]
# test_y = data_y[12000:14000]
#
data_train = list(train_x.numpy().reshape(1,-1, TIME_STEP, INPUT_SIZE))  # 使用list只会把最外层变为list，内层还是ndarray，和.tolist()方法不同
data_valid = list(valid_x.numpy().reshape(1,-1, TIME_STEP, INPUT_SIZE))
# data_test = list(test_x.numpy().reshape(1,-1, TIME_STEP, INPUT_SIZE))
data_train.append(list(train_y.numpy().reshape(-1,1)))
data_valid.append(list(valid_y.numpy().reshape(-1,1)))
# data_test.append(list(test_y.numpy().reshape(-1,1)))
#
data_train = list(zip(*data_train))   # 最外层是list，次外层是tuple，内层都是ndarray
data_valid = list(zip(*data_valid))
# data_test = list(zip(*data_test))
#
train_loader = DataLoader(data_train, batch_size=BATCH_SIZE, num_workers=8, pin_memory=True, shuffle=True)
valid_loader = DataLoader(data_valid, batch_size=BATCH_SIZE, num_workers=8, pin_memory=True, shuffle=True)
# test_loader = DataLoader(data_test, batch_size=BATCH_SIZE, num_workers=8, pin_memory=True, shuffle=False)
# load data
# train_loader = torch.utils.data.DataLoader(dataset =train_set,
#                                                 batch_size = 64,
#                                                 shuffle = True)
test_loader = torch.utils.data.DataLoader(dataset =test_set,
                                                batch_size = 64,
                                                shuffle = False)

train_data.train_data.size(): torch.Size([60000, 28, 28])
train_data.train_labels.size(): torch.Size([60000])


In [4]:
class Model(nn.Module):
    def __init__(self,embedding_dim, hidden_dim, num_layers, dropout=0.5):
        super(Model,self).__init__()
        self.lstm = nn.LSTM(input_size=embedding_dim,hidden_size=hidden_dim,num_layers=num_layers,batch_first=True)

        self.classifier = nn.Sequential( #nn.Dropout(dropout),
                                         nn.Linear(hidden_dim, 10),
                                         #nn.ReLU()
                                        )
    def forward(self, x):
        # output, (h_n, c_n)
        # output features from last layer of the LSTM
        # h_n tensor containing the hidden state for t = seq_len
        # c_n tensor containing the cell state for t = seq_len
        lstm_out, _ = self.lstm(x)
        # x 的 dimension (batch, seq_len, hidden_size)
        # 取用 LSTM 最後一層的 hidden state   ??
        #  x = h_n[-1, :, :]   或者这样
        lstm_out = lstm_out[:, -1, :]      # LSTM 最后一层的 hidden state
        lstm_out = self.classifier(lstm_out)
        return lstm_out

In [5]:
def training(batch_size, n_epoch, lr, model_dir, train, valid, model, device):
    total = sum(p.numel() for p in model.parameters())
    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print('\nstart training, parameter total:{}, trainable:{}\n'.format(total, trainable))
    model.train() # 將 model 的模式設為 train，這樣 optimizer 就可以更新 model 的參數
    criterion = nn.CrossEntropyLoss() #
    t_batch = len(train)
    v_batch = len(valid)
    optimizer = optim.Adam(model.parameters(), lr=lr) # 將模型的參數給 optimizer，並給予適當的 learning rate
    total_loss, total_acc, best_acc = 0, 0, 0
    for epoch in range(n_epoch):
        total_loss, total_acc = 0, 0
        # 這段做 training
        for i, (x, y) in enumerate(train):      #  for i, data in enumerate(train)   data[0]  inputs data[1] labels
            x = x.to(device, dtype=torch.float) # device 為 "cuda"，將 inputs 轉成 torch.cuda.LongTensor
            y = y.to(device, dtype=torch.long)# device為 "cuda"，將 labels 轉成 torch.cuda.FloatTensor，因為等等要餵進 criterion，所以型態要是 float
            x = x.squeeze()
            y = y.squeeze()
            y_pred = model(x) # 將 input 餵給模型
            #y_pred = y_pred.squeeze() # 去掉最外面的 dimension，好讓 outputs 可以餵進 criterion()
            # print(y_pred.shape)
            # print(y.shape)
            loss = criterion(y_pred, y) # 計算此時模型的 training loss
            optimizer.zero_grad() # 由於 loss.backward() 的 gradient 會累加，所以每次餵完一個 batch 後需要歸零
            loss.backward() # 算 loss 的 gradient
            optimizer.step() # 更新訓練模型的參數
            correct = evaluation(y_pred, y) # 計算此時模型的 training accuracy
            total_acc += (correct / batch_size)
            total_loss += loss.item()
        print('[ Epoch{}: {}/{} ] loss:{:.3f} acc:{:.3f} '.format(
                epoch+1, i+1, t_batch, loss.item(), correct*100/batch_size), end='\r')
    print('\nTrain | Loss:{:.5f} Acc: {:.3f}'.format(total_loss/t_batch, total_acc/t_batch*100))
        # 這段做 validation
    model.eval() # 將 model 的模式設為 eval，這樣 model 的參數就會固定住
    with torch.no_grad():
        total_loss, total_acc = 0, 0
        for i, (in_x, labels) in enumerate(valid):
            in_x = in_x.to(device, dtype=torch.float) # device 為 "cuda"，將 inputs 轉成 torch.cuda.LongTensor
            labels = labels.to(device, dtype=torch.long)# device 為 "cuda"，將 labels 轉成 torch.cuda.FloatTensor，因為等等要餵進 criterion，所以型態要是 float
            in_x = in_x.squeeze()
            labels = labels.squeeze()
            y_pred = model(in_x) # 將 input 餵給模型
            #y_pred = y_pred.squeeze() # 去掉最外面的 dimension，好讓 outputs 可以餵進 criterion()
            loss = criterion(y_pred, labels) # 計算此時模型的 validation loss
            correct = evaluation(y_pred, labels) # 計算此時模型的 validation accuracy
            total_acc += (correct / batch_size)
            total_loss += loss.item()

        print("Valid | Loss:{:.5f} Acc: {:.3f} ".format(total_loss/v_batch, total_acc/v_batch*100))
        if total_acc > best_acc:
            # 如果 validation 的結果優於之前所有的結果，就把當下的模型存下來以備之後做預測時使用
            best_acc = total_acc
            #torch.save(model, "{}/val_acc_{:.3f}.model".format(model_dir,total_acc/v_batch*100))
            torch.save(model, "{}/lstm.model".format(model_dir))
            print('saving model with acc {:.3f}'.format(total_acc/v_batch*100))
        print('-----------------------------------------------')
        model.train() # 將 model 的模式設為 train，這樣 optimizer 就可以更新 model 的參數（因為剛剛轉成 eval 模式）

In [6]:
def testing(batch_size, test_loader, model, device):
    model.eval()
    ret_output = []
    criterion = nn.CrossEntropyLoss()
    v_test = len(test_loader)
    with torch.no_grad():
         total_loss, total_acc = 0, 0
         for i, (in_x, labels) in enumerate(test_loader):
            in_x = in_x.to(device, dtype=torch.long) # device 為 "cuda"，將 inputs 轉成 torch.cuda.LongTensor
            labels = labels.to(device, dtype=torch.float) # device 為 "cuda"，將 labels 轉成 torch.cuda.FloatTensor，因為等等要餵進 criterion，所以型態要是 float
            y_pred = model(in_x) # 將 input 餵給模型
            y_pred = y_pred.squeeze() # 去掉最外面的 dimension，好讓 outputs 可以餵進 criterion()
            loss = criterion(y_pred, labels) # 計算此時模型的 validation loss
            correct = evaluation(y_pred, labels) # 計算此時模型的 validation accuracy
            total_acc += (correct / batch_size)
            total_loss += loss.item()

            print("Valid | Loss:{:.5f} Acc: {:.3f} ".format(total_loss/v_test, total_acc/v_test*100))
    return ret_output

In [7]:
# main
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
batch_size = 64
epoch = 5
lr = 0.001
model = Model(embedding_dim=28, hidden_dim=64, num_layers=1, dropout=0.5)
model = model.to(device)

if __name__ == '__main__':
    training(batch_size, epoch, lr, r'G:\研一\深度学习\handwritedigit', train_loader, valid_loader, model, device)


start training, parameter total:24714, trainable:24714

[ Epoch5: 782/782 ] loss:0.282 acc:21.875 
Train | Loss:0.09336 Acc: 97.093
Valid | Loss:0.09460 Acc: 96.766 
saving model with acc 96.766
-----------------------------------------------


  "type " + obj.__name__ + ". It won't be checked "
