In [None]:
import torch
import torch.nn as nn
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset, ConcatDataset, SubsetRandomSampler
import pandas as pd
import numpy as np

import os
import shutil
import cv2
import time
from sklearn.model_selection import KFold, train_test_split
import math 
import matplotlib.pyplot as plt
from PIL import Image
from collections import Counter

In [None]:
def Generate_Data(number):
    """Create `number` random single-channel signals of length 100.

    Args:
        number: how many signals to generate.

    Returns:
        A float32 tensor of shape (number, 1, 100) whose entries are drawn
        uniformly from the interval between 1 and 5.
    """
    signals = torch.empty((number, 1, 100), dtype=torch.float32)
    signals.uniform_(1, 5)  # in-place fill
    return signals

def Generate_label(number):
    """Create random binary labels, one per position of each signal.

    Args:
        number: how many label rows to generate.

    Returns:
        An int64 tensor of shape (number, 100) containing only 0s and 1s.
        int64 (long) is the dtype nn.CrossEntropyLoss expects for class-index
        targets.
    """
    # Draw the integers directly as int64 instead of drawing floats and
    # casting them afterwards; `high` is exclusive, so values are 0 or 1.
    return torch.randint(low=0, high=2, size=(number, 100), dtype=torch.int64)

In [None]:
# Seed torch's global RNG before drawing the synthetic data so that a fresh
# "Restart & Run All" reproduces the same tensors (and the same results for
# everything downstream that draws from this RNG).
SEED = 0
torch.manual_seed(SEED)

# Synthetic stand-in data: 400 training samples and 80 test samples.
train_x = Generate_Data(400)
train_y = Generate_label(400)

test_x = Generate_Data(80)
test_y = Generate_label(80)

In [None]:
def _add_channel_dim(sample):
    """Insert a leading axis in front of `sample`, leaving its values untouched."""
    return sample.unsqueeze(0)


# The samples are raw float signals, not images. A ToPILImage() -> ToTensor()
# round trip would add the extra axis the CNN needs, but ToPILImage() scales
# float tensors by 255 and casts them to uint8 (it expects floats in [0, 1]),
# which wraps and quantises any value outside that range. Adding the axis
# directly yields the same output shape for 2-D samples and keeps the data
# intact.
# No data augmentation is applied, neither for training nor for testing.
train_transform = transforms.Compose([
    transforms.Lambda(_add_channel_dim),
])

test_transform = transforms.Compose([
    transforms.Lambda(_add_channel_dim),
])

class EPG_Dataset(Dataset):
    """Pairs each sample in `x` with the label stored at the same index in `y`.

    Args:
        x: indexable collection of samples (must support `len()` and `[]`).
        y: indexable collection of labels aligned with `x`.
        transform: optional callable applied to a sample (never to its label)
            each time the sample is fetched. `None` returns the sample as is.
    """

    def __init__(self, x, y, transform=None):
        self.x = x
        # label is required to be a LongTensor
        self.y = y
        self.transform = transform

    def __len__(self):
        """Number of samples in the data set."""
        return len(self.x)

    def __getitem__(self, index):
        """Return the `(sample, label)` pair stored at `index`."""
        X = self.x[index]
        if self.transform is not None:
            X = self.transform(X)
        # Fetch the label and return outside the `if`: a data set built
        # without a transform must still yield a pair rather than None.
        Y = self.y[index]
        return X, Y

In [None]:
batch_size = 30

train_set = EPG_Dataset(train_x, train_y, train_transform)
test_set = EPG_Dataset(test_x, test_y, test_transform)
print("Size of training data = {}".format(len(train_set)))
print("Size of testing data = {}".format(len(test_set)))

# Only the training data is shuffled. Evaluation gains nothing from a random
# order, and a fixed order keeps the batch composition (and therefore any
# per-batch statistic) identical from run to run.
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_set, batch_size=batch_size, shuffle=False)

Size of training data = 400
Size of testing data = 80


In [None]:
class EPG_CNN_RNN(nn.Module):
    """CNN feature extractor followed by an RNN and a per-position classifier.

    The layer sizes below (64*25 RNN inputs, 100 x 2 outputs) fit samples of
    shape (1, 1, 100), i.e. a batch of shape (batch, 1, 1, 100). `forward`
    returns logits of shape (batch, 100, 2): two class scores for each of the
    100 positions.
    """

    def __init__(self):
        super().__init__()
        # Positional arguments used below:
        #   nn.Conv2d(in_channels, out_channels, kernel_size, stride, padding)
        #   nn.MaxPool2d(kernel_size, stride, padding)
        # Trailing comments give (channels, height, width) per sample for an
        # input of (1, 1, 100).
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 32, 3, 1, 1),    # (32, 1, 100)
            # kernel_size=1 with stride=2 keeps every second column.
            nn.MaxPool2d(1, 2, 0),        # (32, 1, 50)
            nn.BatchNorm2d(32),
            nn.ReLU(),

            nn.Conv2d(32, 64, 3, 1, 1),   # (64, 1, 50)
            nn.MaxPool2d(1, 2, 0),        # (64, 1, 25)
            nn.BatchNorm2d(64),
            nn.ReLU(),
        )

        # Two stacked RNN layers; input is the flattened CNN feature map.
        self.rnn = nn.RNN(64*25, 600, 2, batch_first=True)

        self.fc = nn.Sequential(
            nn.Linear(600, 300),
            nn.ReLU(),
            nn.Linear(300, 200),          # 200 = 100 positions x 2 classes
        )

    def forward(self, x):
        """Map a batch of signals to per-position class logits."""
        features = self.cnn(x)
        features = features.view(features.size(0), -1)   # (batch, 64*25)

        # The RNN is batch_first, so it expects (batch, seq_len, features).
        # Insert the sequence axis at position 1 so every sample is its own
        # length-1 sequence. Inserting it at position 0 would turn the whole
        # mini-batch into ONE sequence and let the hidden state carry
        # information from one sample to the next.
        rnn_out, _ = self.rnn(features.unsqueeze(1))      # (batch, 1, 600)
        last_step = rnn_out[:, -1, :]                     # (batch, 600)

        logits = self.fc(last_step)                       # (batch, 200)
        return logits.view(logits.size(0), 100, 2)

In [None]:
# Use the GPU when there is one, otherwise fall back to the CPU instead of
# failing on a hard-coded .cuda() call.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = EPG_CNN_RNN().to(device)
# To resume from a checkpoint:
#model.load_state_dict(torch.load('./model_ep?.pt'))
# Default reduction is the MEAN over all labels passed in one call.
# The evaluation cell reuses this `loss` object.
loss = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.0005)
num_epoch = 25

for epoch in range(num_epoch):
    epoch_start_time = time.time()
    train_acc = 0.0     # number of correctly predicted labels this epoch
    train_loss = 0.0    # per-label loss summed over the epoch
    train_count = 0     # number of labels seen this epoch

    model.train()
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()
        train_pred = model(inputs)
        # Flatten so that every position is one 2-class classification.
        batch_loss = loss(train_pred.view(-1, 2), labels.view(-1))
        batch_loss.backward()
        optimizer.step()

        n_labels = labels.numel()
        train_acc += (train_pred.argmax(dim=2) == labels).sum().item()
        # batch_loss is already a mean, so scale it back to a sum before
        # accumulating; dividing by the label count below then gives the true
        # per-label average, even when the last batch is smaller.
        train_loss += batch_loss.item() * n_labels
        train_count += n_labels

    print('[%03d/%03d] %2.2f sec(s) Train Acc: %3.6f Loss: %3.6f' % \
      (epoch + 1, num_epoch, time.time()-epoch_start_time, \
      train_acc/train_count, train_loss/train_count))

[001/025] 0.14 sec(s) Train Acc: 0.501175 Loss: 0.000243
[002/025] 0.15 sec(s) Train Acc: 0.544250 Loss: 0.000240
[003/025] 0.15 sec(s) Train Acc: 0.621975 Loss: 0.000232
[004/025] 0.15 sec(s) Train Acc: 0.681350 Loss: 0.000219
[005/025] 0.14 sec(s) Train Acc: 0.731925 Loss: 0.000203
[006/025] 0.15 sec(s) Train Acc: 0.765125 Loss: 0.000185
[007/025] 0.16 sec(s) Train Acc: 0.792375 Loss: 0.000169
[008/025] 0.14 sec(s) Train Acc: 0.819075 Loss: 0.000154
[009/025] 0.15 sec(s) Train Acc: 0.846125 Loss: 0.000139
[010/025] 0.14 sec(s) Train Acc: 0.872100 Loss: 0.000124
[011/025] 0.14 sec(s) Train Acc: 0.891825 Loss: 0.000111
[012/025] 0.14 sec(s) Train Acc: 0.905175 Loss: 0.000101
[013/025] 0.14 sec(s) Train Acc: 0.924200 Loss: 0.000090
[014/025] 0.14 sec(s) Train Acc: 0.934750 Loss: 0.000082
[015/025] 0.14 sec(s) Train Acc: 0.947375 Loss: 0.000073
[016/025] 0.14 sec(s) Train Acc: 0.954775 Loss: 0.000066
[017/025] 0.15 sec(s) Train Acc: 0.961650 Loss: 0.000061
[018/025] 0.14 sec(s) Train Acc

In [None]:
# Evaluate on whichever device the model already lives on, so this cell does
# not need a GPU of its own.
device = next(model.parameters()).device

model.eval()
test_acc, test_loss, test_count = 0.0, 0.0, 0
with torch.no_grad():
    for inputs, labels in test_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        test_pred = model(inputs)
        batch_loss = loss(test_pred.view(-1, 2), labels.view(-1))

        n_labels = labels.numel()
        test_acc += (test_pred.argmax(dim=2) == labels).sum().item()
        # `loss` returns a per-batch mean; weight it by the number of labels
        # so the final division yields the average loss per label.
        test_loss += batch_loss.item() * n_labels
        test_count += n_labels

print("Test Acc:{}, Test Loss:{}".format(test_acc/test_count, test_loss/test_count))

Test Acc:0.492625, Test Loss:0.0005852205157279968
