# 2022/08/31

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.utils.prune as prune
import torch.nn.functional as F

from torch.utils.data import TensorDataset
from torch.utils.data import DataLoader
from torch.utils.data import random_split

from scipy import io
import os

### MAT file에서 data 읽고 준비

In [None]:
# Read the calibration recording from disk. The file is expected to provide the
# K and A feature arrays and the Y label arrays, each stored once per class
# (suffix 1 = class 1, suffix 2 = class 2).
mat_file = io.loadmat('/Users/goldenyoo/Library/Mobile Documents/com~apple~CloudDocs/BioCAS_prepare/Python_code/Data_center/one_dx/Calib_data_1.mat')

# Pull the raw arrays out of the loaded dictionary, class by class.
K1, A1 = mat_file['K1'], mat_file['A1']
K2, A2 = mat_file['K2'], mat_file['A2']
Y1, Y2 = mat_file['Y1'], mat_file['Y2']

In [None]:
# K features, class 1 vs class 2: convert to float tensors and swap the first
# and last axes so the leading axis is the one reported as "Total_data" below.
k1 = torch.FloatTensor(K1).transpose(0, 2)
k2 = torch.FloatTensor(K2).transpose(0, 2)

# A features, class 1 vs class 2: same conversion as for K.
a1 = torch.FloatTensor(A1).transpose(0, 2)
a2 = torch.FloatTensor(A2).transpose(0, 2)

# Report the layout of the class-1 K tensor, then every feature tensor's shape.
print(
    "Total_data: {}".format(k1.size(0)),
    "/ input_size: {}".format(k1.size(1)),
    "/ Seq_len: {}\n".format(k1.size(2)),
)
print("k1 size: ", k1.size())
print("k2 size: ", k2.size())
print("a1 size: ", a1.size())
print("a2 size: ", a2.size())

# Labels, class 1 vs class 2, as integer tensors.
y1, y2 = torch.LongTensor(Y1), torch.LongTensor(Y2)

print("\ny1 size:", y1.size())
print("y2 size:", y2.size())

### Train data 만들기

In [None]:
# Stack the class-1 samples followed by the class-2 samples along axis 0,
# separately for the K features, the A features and the labels.
k_train = torch.cat((k1, k2), 0)
a_train = torch.cat((a1, a2), 0)
y_train = torch.cat((y1, y2), 0)

# Shape check: all three are paired sample-by-sample later on.
print("k_train size: {}".format(k_train.size()))
print("a_train size: {}".format(a_train.size()))
print("y_train size: {}".format(y_train.size()))

### y_train 라벨을 0-based 정수로 변환 (one-hot coding은 사용하지 않음)

In [None]:
# One-hot targets are not built here: the earlier attempt
# (F.one_hot(y_train - 1, num_classes=2) followed by squeeze_()) is disabled.
# The labels are only shifted down by one so that they become the integers
# 0 and 1.
y_train = y_train.sub(1)
print(y_train.size())

### Dataset & DataLoader

In [None]:
batch_size = 64

# Pair the K features, A features and labels sample-by-sample; TensorDataset
# requires all tensors to have the same first dimension.
dataset = TensorDataset(k_train, a_train, y_train)

# Random 80 % / 10 % / remainder split into train, validation and test subsets.
dataset_size = len(dataset)
train_size = int(dataset_size * 0.8)
validation_size = int(dataset_size * 0.1)
test_size = dataset_size - train_size - validation_size

train_dataset, valid_dataset, test_dataset = random_split(
    dataset,
    [train_size, validation_size, test_size],
)

# Every loader shuffles and drops an incomplete final batch. The test loader
# uses the whole test subset as a single batch.
_loader_opts = dict(shuffle=True, drop_last=True)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, **_loader_opts)
valid_dataloader = DataLoader(valid_dataset, batch_size=batch_size // 8, **_loader_opts)
test_dataloader = DataLoader(test_dataset, batch_size=test_size, **_loader_opts)

In [None]:
# Model hyperparameters (module-level; read by TextLSTM and by the training /
# evaluation cells).
hidden_size = 3
lstm_output_size = hidden_size
input_size = 22
n_class = 2

dtype = torch.float


class TextLSTM(nn.Module):
  """Two-branch LSTM classifier over the K and A feature sequences.

  Each feature set is fed to its own single-layer LSTM. The final hidden
  states of both branches are concatenated and mapped to `n_class` logits by
  one linear layer.
  """

  def __init__(self):
    super().__init__()

    # No `dropout=` argument: nn.LSTM only applies dropout *between* stacked
    # layers, so on a single-layer LSTM it is a no-op that merely triggers a
    # UserWarning at construction time. If regularisation is wanted, apply an
    # explicit nn.Dropout to the concatenated hidden state instead.
    self.lstm_1 = nn.LSTM(input_size=input_size, hidden_size=hidden_size)
    self.lstm_2 = nn.LSTM(input_size=input_size, hidden_size=hidden_size)
    self.fc = nn.Linear(hidden_size * 2, n_class)

  def forward(self, hidden_and_cell_k, hidden_and_cell_a, K_and_A):
    """Return the class logits for one mini-batch.

    Args:
      hidden_and_cell_k: `(h_0, c_0)` initial state for the K branch, each of
        shape (1, batch, hidden_size).
      hidden_and_cell_a: `(h_0, c_0)` initial state for the A branch, same
        shapes as for K.
      K_and_A: pair `(k, a)` of feature tensors laid out as
        (batch, input_size, seq_len).

    Returns:
      Tensor of shape (batch, n_class) with unnormalised class scores.
    """
    k, a = K_and_A

    # nn.LSTM (batch_first=False) expects (seq_len, batch, input_size).
    k = k.permute(2, 0, 1)
    a = a.permute(2, 0, 1)

    # Only the final hidden state of each branch is used for classification.
    _, (h_n1, _) = self.lstm_1(k, hidden_and_cell_k)
    _, (h_n2, _) = self.lstm_2(a, hidden_and_cell_a)

    features = torch.cat((h_n1[-1], h_n2[-1]), dim=1)

    # Final prediction layer.
    return self.fc(features)

In [None]:
# Build one instance of the network and print its layer summary as a quick
# sanity check of the architecture.
model = TextLSTM()
print(model)

In [None]:
n_epochs = 300
prunFreq = 1

# ---- Training ----
# Start from a freshly initialised model, trained with cross-entropy on the
# integer class labels using Adam.
model = TextLSTM()
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

model.train()

# NOTE: the range is inclusive of n_epochs (epochs 0..n_epochs), which is what
# lets the progress line for the final epoch be printed.
for epoch in range(n_epochs + 1):
  for batch_idx, samples in enumerate(train_dataloader):

    k_train_mb, a_train_mb, y_train_mb = samples

    # Zero initial hidden/cell states, sized from the actual mini-batch rather
    # than the global batch_size. They are constants, so they do not need
    # gradient tracking.
    n_samples = k_train_mb.size(0)
    hidden_k = torch.zeros(1, n_samples, hidden_size)
    cell_k   = torch.zeros(1, n_samples, hidden_size)
    hidden_a = torch.zeros(1, n_samples, hidden_size)
    cell_a   = torch.zeros(1, n_samples, hidden_size)

    # Forward
    output = model((hidden_k, cell_k), (hidden_a, cell_a), (k_train_mb, a_train_mb))

    # Cost: CrossEntropyLoss wants a 1-D tensor of class indices. reshape(-1)
    # stays 1-D even for a single-sample batch, where squeeze() would collapse
    # it to 0-D.
    loss = criterion(output, y_train_mb.reshape(-1))

    # Progress report every 50 epochs, on every second batch.
    if epoch % 50 == 0 and batch_idx % 2 == 0:
      print('Epoch {:3d}/{} Batch: {:2d} Cost: {:.6f}'.format(epoch, n_epochs, batch_idx, loss.item()))

    # Backpropagate
    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

In [None]:
# ---- Evaluation on the held-out test split ----
model.eval()

# Inference only: disable autograd so no computation graph is built.
with torch.no_grad():
    for batch_idx, samples in enumerate(test_dataloader):
        # Variable names are kept from the training cell; here they hold test data.
        k_train_mb, a_train_mb, y_train_mb = samples

        # Zero initial states sized from the batch actually being evaluated.
        n_samples = k_train_mb.size(0)
        hidden_k = torch.zeros(1, n_samples, hidden_size)
        cell_k   = torch.zeros(1, n_samples, hidden_size)
        hidden_a = torch.zeros(1, n_samples, hidden_size)
        cell_a   = torch.zeros(1, n_samples, hidden_size)

        output = model((hidden_k, cell_k), (hidden_a, cell_a), (k_train_mb, a_train_mb))

        # Predicted class = index of the largest logit.
        prediction = output.argmax(dim=1)
        correct = prediction.eq(y_train_mb.view_as(prediction)).sum().item()

        # Accuracy over the samples in this batch.
        print(correct / n_samples)