In [1]:
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn.preprocessing import MinMaxScaler
from torchinfo import summary

from ignite.engine import *
from ignite.handlers import *
from ignite.metrics import *
from ignite.utils import *
from ignite.contrib.metrics.regression import *
from ignite.contrib.metrics import *

In [2]:
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
path = "./Dataset/"
mmscaler = MinMaxScaler(feature_range=(-1, 1), copy=True)

#training set
train_ax = mmscaler.fit_transform(np.loadtxt(path + "train/Inertial Signals/total_acc_x_train.txt"))
train_ay = mmscaler.fit_transform(np.loadtxt(path + "train/Inertial Signals/total_acc_y_train.txt"))
train_az = mmscaler.fit_transform(np.loadtxt(path + "train/Inertial Signals/total_acc_z_train.txt"))

train_t = np.loadtxt(path + "train/y_train.txt").astype(int)
train_s = np.loadtxt(path + "train/subject_train.txt").astype(int)

#test set
test_ax = mmscaler.fit_transform(np.loadtxt(path + "test/Inertial Signals/total_acc_x_test.txt"))
test_ay = mmscaler.fit_transform(np.loadtxt(path + "test/Inertial Signals/total_acc_y_test.txt"))
test_az = mmscaler.fit_transform(np.loadtxt(path + "test/Inertial Signals/total_acc_z_test.txt"))

test_t = np.loadtxt(path + "test/y_test.txt").astype(int)
test_s = np.loadtxt(path + "test/subject_test.txt").astype(int)

print(train_t)

[5 5 5 ... 2 2 2]


In [3]:
train_size = 7352
test_size = 2947
dim_size = 3
sample_size = 128

In [4]:
# trX = (7352, 128, 3) ... trainのX
trX = np.ones((train_size, sample_size, dim_size), float)
print('trX.shape initial:{0}'.format(trX.shape))
for i in range(train_size):
  temp1 = np.ones((dim_size, sample_size), float)
  temp1[0,:] = train_ax[i,:]
  temp1[1,:] = train_ay[i,:]
  temp1[2,:] = train_az[i,:]
  trX[i,:,:] = temp1.reshape(-1,3)
print('trX.shape assigned:{0}'.format(trX.shape))

# t(movement label) or s(subject label) or both ... trainのY (7352,1)
trY = train_t.reshape(-1,1)

# teX = (2947, 3, 128) ... testのX
teX = np.ones((test_size, sample_size, dim_size), float)
print('teX.shape initial:{0}'.format(teX.shape))
for i in range(test_size):
  temp2 = np.ones((dim_size, sample_size), float)
  temp2[0,:] = test_ax[i,:]
  temp2[1,:] = test_ay[i,:]
  temp2[2,:] = test_az[i,:]
  teX[i,:,:] = temp2.reshape(-1,3)
print('trX.shape assigned:{0}'.format(teX.shape))

# testのY 2947行1列
teY = test_t.reshape(-1,1)

trX.shape initial:(7352, 128, 3)
trX.shape assigned:(7352, 128, 3)
teX.shape initial:(2947, 128, 3)
trX.shape assigned:(2947, 128, 3)


In [5]:
datax = np.vstack([trX, teX])
print('datax.shape:{0}'.format(datax.shape))
datay = np.vstack([trY, teY])
print('datay.shape:{0}'.format(datay.shape))
# dataX = trX and teX (10299, 3, 128)
dataX = torch.Tensor(np.array(datax)).to(device)
# dataY = trY and teY (10299,1)
dataY = torch.Tensor(np.array(datay)).to(device)

trainX = torch.Tensor(np.array(trX)).to(device)
trainY = torch.Tensor(np.array(trY)).to(device)

testX = torch.Tensor(np.array(teX)).to(device)
testY = torch.Tensor(np.array(teY)).to(device)

# trainYをone-hotにするためlongにして1次元配列に戻す　-> min~maxを0~5に直す
trainY = trainY.view(-1).long() - 1
print(trainY.shape)
print(torch.min(trainY), torch.max(trainY))

# one-hotにする -> 誤差を計算できるようにfloatに直す
trainY = F.one_hot(trainY, num_classes=-1)
trainY = trainY.float()
print(trainY.shape)
print(trainY)

datax.shape:(10299, 128, 3)
datay.shape:(10299, 1)
torch.Size([7352])
tensor(0, device='cuda:0') tensor(5, device='cuda:0')
torch.Size([7352, 6])
tensor([[0., 0., 0., 0., 1., 0.],
        [0., 0., 0., 0., 1., 0.],
        [0., 0., 0., 0., 1., 0.],
        ...,
        [0., 1., 0., 0., 0., 0.],
        [0., 1., 0., 0., 0., 0.],
        [0., 1., 0., 0., 0., 0.]], device='cuda:0')


In [6]:
class MyLSTM(nn.Module):
    def __init__(self):
        super().__init__()
        self.hidden_size = 70
        self.lstm = nn.LSTM(input_size=3, hidden_size=self.hidden_size, num_layers=1, batch_first=True) 
        self.relu = nn.ReLU()
        self.linear = nn.Linear(70, 6)
        self.softmax = nn.Softmax(-1)

    def forward(self, x):
        h_0 = torch.zeros(1, x.size(0), self.hidden_size).to(device)
        c_0 = torch.zeros(1, x.size(0), self.hidden_size).to(device)
        _, (h_out, _) = self.lstm(x, (h_0, c_0))
        h_out = h_out.view(-1, self.hidden_size)
        h_out = self.relu(h_out)
        h_out = self.linear(h_out)
        y_hat = self.softmax(h_out)
        return y_hat


# print(summary(MyLSTM(), input_size=(7352, 128, 3), device=torch.device(device)))  

In [11]:
def train(model, optimizer):
  model.train()
  y_hat = model(trainX)
  loss = F.mse_loss(y_hat, trainY)
  # loss = nn.CrossEntropyLoss()
  # loss(y_hat, trainY)
  optimizer.zero_grad()
  loss.backward()
  optimizer.step()
  return loss.item()

def predict(model):
  model.eval()
  train_predict = model(testX)
  print(train_predict.shape)
  #data_predict = train_predict.data.numpy()
  #dataY_plot = dataY.data.numpy()
  # output tensor.argmax
  
  # data_predict = torch.argmax(train_predict, dim=-1)
  # data_predict = F.one_hot(data_predict, num_classes=-1)
  
  # metric = ConfusionMatrix(num_classes=6)
  # metric.attach(default_evaluator, 'cm')
  # y_true = dataY.view(-1).int()
  # y_pred = data_predict
  # print(y_true.shape)
  # print(y_pred.shape)

  # state = default_evaluator.run([[y_pred, y_true]])
  # print(state.metrics['cm'])

loss = []

def main():
  model = MyLSTM()
  optimizer = torch.optim.Adam(model.parameters(), lr=0.01)
  model = model.to(device)

  for epoch in range(500):
    _loss = train(model, optimizer)
    loss.append(_loss)
    if epoch % 20 == 0:
      print(f"Epoch = {epoch+1}, Loss = {_loss:.5f}")
  return model

In [8]:
# def eval_step(engine, batch):
#     return batch

# default_evaluator = Engine(eval_step)

# # create default optimizer for doctests

# param_tensor = torch.zeros([1], requires_grad=True)
# default_optimizer = torch.optim.SGD([param_tensor], lr=0.1)

# # create default trainer for doctests
# # as handlers could be attached to the trainer,
# # each test must define his own trainer using `.. testsetup:`

# def get_default_trainer():

#     def train_step(engine, batch):
#         return batch

#     return Engine(train_step)

# # create default model for doctests

# default_model = nn.Sequential(OrderedDict([
#     ('base', nn.Linear(4, 2)),
#     ('fc', nn.Linear(2, 1))
# ]))

# # manual_seed(666)

In [9]:
model = main()

Epoch = 1, Loss = 0.13914
Epoch = 21, Loss = 0.13941
Epoch = 41, Loss = 0.10847
Epoch = 61, Loss = 0.10754
Epoch = 81, Loss = 0.11516
Epoch = 101, Loss = 0.12286
Epoch = 121, Loss = 0.11097
Epoch = 141, Loss = 0.10835
Epoch = 161, Loss = 0.10251
Epoch = 181, Loss = 0.10147
Epoch = 201, Loss = 0.09864
Epoch = 221, Loss = 0.07674
Epoch = 241, Loss = 0.12249
Epoch = 261, Loss = 0.10909
Epoch = 281, Loss = 0.10713
Epoch = 301, Loss = 0.10066
Epoch = 321, Loss = 0.09406
Epoch = 341, Loss = 0.08523
Epoch = 361, Loss = 0.07816
Epoch = 381, Loss = 0.07890
Epoch = 401, Loss = 0.06939
Epoch = 421, Loss = 0.08663
Epoch = 441, Loss = 0.07831
Epoch = 461, Loss = 0.07224
Epoch = 481, Loss = 0.06974


In [12]:
predict(model)

torch.Size([2947, 6])
