<a href="https://colab.research.google.com/github/YuminosukeSato/minddrone/blob/main/preprocessing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [8]:
import numpy as np
import math
from sklearn.manifold import TSNE
from matplotlib import pyplot
import pickle
import matplotlib.pyplot as plt
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
import pandas as pd
import torch
import torch.nn as nn
from torch import optim
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torch.utils.data import TensorDataset
def preprocessing(floatArray):
    window = np.hamming(125)
    w = np.empty(125)
    d2 = np.empty((floatArray.shape[0],27*16))
    for i in range(floatArray.shape[0]-125):
        for j in range(15):
            w = np.abs(np.fft.fftn(floatArray[i:i+125,j]*window))
            d2[i,27*j:27*j+26] = np.log10(1 + w[4:30])
    return d2
def labeler(array):
  label = np.empty(array.shape[0])
  for i in range(array.shape[0]-125):
    if i <= 1250:
      label[i] = 0 #0 is normal
    elif i <= 2500:
      label[i] = 1 #1is forward
    elif i <= 3750:
      label[i] = 2 # 2 is righet
    elif i <= 5000:
      label[i] = 3
    elif i <= 6250:
      label[i] = 4
    elif i <= 7500:
      label[i] = 5
    else:
      label[i] = 6
  return label


if __name__ == "__main__":
    data = np.loadtxt("/content/OpenBCI-RAW-2022-02-16_16-37-06.txt",dtype='str',delimiter=",",skiprows=5)
    b = data[:,1:17]
    floatArray = b.astype(float)
    d2 = np.empty((floatArray.shape[0],27*16))
    d2 = preprocessing(floatArray)
    ts = d2[125*60-1:125*150,:]
    label = labeler(ts)
    train_data, test_data, train_label, test_label = train_test_split(ts, label, test_size=0.2)
    train_x = torch.Tensor(train_data)
    test_x = torch.Tensor(test_data)
    train_y = torch.LongTensor(train_label)  # torch.int64のデータ型に
    test_y = torch.LongTensor(test_label)
    train_dataset = TensorDataset(train_x, train_y)
    test_dataset = TensorDataset(test_x, test_y)

[6. 3. 6. ... 5. 6. 3.]


In [10]:
epochs = 100
n_input = 1
n_hidden = 10
n_output = 1
num_layers = 2
n_batch = 20
n_data = 1000
n_test = 200


In [9]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import math

class RNNHardCell(nn.Module):
    def __init__(self, n_input:int, n_hidden:int, state=None) -> None:
        super(RNNHardCell, self).__init__()
        self.n_input = n_input
        self.n_hidden = n_hidden
        self.in_h = nn.Linear(self.n_input, self.n_hidden, bias=False)
        self.h_h = nn.Linear(self.n_hidden, self.n_hidden, bias=False)
        self.state = state
        self.register_parameter()

    def register_parameter(self) -> None:
        stdv = 1.0 / math.sqrt(self.n_hidden)
        for weight in self.parameters():
            nn.init.uniform_(weight, -stdv, stdv)
    
    def forward(self, x, state=None):
        self.state = state
        if self.state is None:
            self.state = F.hardtanh(self.in_h(x))
        else:
            self.state = F.hardtanh(self.in_h(x) + self.h_h(self.state))
        return self.state

class RNNModel(nn.Module):
    def __init__(self, n_input, n_hidden, n_output, num_layers=1):
        super(RNNModel, self).__init__()
        self.rnn = RNNHardCell(n_input, n_hidden)
        self.out = nn.Linear(n_hidden, n_output, bias=False)
        self.num_layers = num_layers
        
    def forward(self, xs, state=None):
        state = None
        h_seq = []
        
        for x in xs:
            x = torch.from_numpy(np.asarray(x)).float()
            x = x.unsqueeze(0)
            for _ in range(self.num_layers):
                state = self.rnn(x, state)
            h_seq.append(state)
        
        h_seq = torch.stack(h_seq)
        ys = self.out(h_seq)
        ys = torch.transpose(ys, 0, 1)

        return ys

In [14]:
import os
y_test_torch = torch.from_numpy(np.asarray(test_y))
y_test_torch = y_test_torch.unsqueeze(0)
model = RNNModel(n_input, n_hidden, n_output, num_layers)
optimizer = optim.Adam(model.parameters())
MSE = nn.MSELoss()

train_loss = []
test_loss = []

if os.path.exists("result/loss") == False:
    os.makedirs("result/loss")
if os.path.exists("result/eval") == False:
        os.makedirs("result/eval")
if os.path.exists("result/model") == False:
        os.makedirs("result/model")

    
for epoch in range(1, epochs + 1):
        model.train()
        perm = np.random.permutation(n_data)
        sum_loss = 0
for i in range(0, n_data, n_batch):
            x_batch = train_data[perm[i:i + n_batch]]
            ant_batch = train_label[perm[i:i + n_batch]]
            ant_batch = torch.from_numpy(ant_batch).double()
            ant_batch = ant_batch.unsqueeze(0)

            optimizer.zero_grad()
            y_batch_pred = model(x_batch, ant_batch).double()
            loss = MSE(y_batch_pred, ant_batch)
            loss.backward()
            optimizer.step()
            sum_loss += loss.data * n_batch

        # loss(train)
ave_loss = sum_loss / n_data
train_loss.append(ave_loss)

        # loss(test)
model.eval()
y_test_pred = model.forward(test_x)
loss = MSE(y_test_pred, y_test_torch)
test_loss.append(loss.data)

        # loss display
if epoch % 100 == 1:
            print("Ep/MaxEp     train_loss     test_loss")

if epoch % 10 == 0:
            print("{:4}/{}  {:10.5}   {:10.5}".format(epoch, epochs, ave_loss, float(loss.data)))

if epoch % 20 == 0:
            plt.figure(figsize=(5, 4))
            y_pred = model.forward(test_x)
            # tensor → numpy
            y_pred = y_pred.to('cpu').detach().numpy().copy()
            plt.plot(test_x, test_y, label = "target")
            plt.plot(test_x, y_pred[0], label = "predict")
            plt.legend()
            plt.grid(True)
            plt.xlim(0, 2 * np.pi)
            plt.ylim(-1.2, 1.2)
            plt.xlabel("x")
            plt.ylabel("y")
            plt.savefig("result/eval/ep{}.png".format(epoch))
            plt.clf()
            plt.close()

    # save loss glaph
plt.figure(figsize=(5, 4))
plt.plot(train_loss, label = "training")
plt.plot(test_loss, label = "test")
plt.yscale('log')
plt.legend()
plt.grid(True)
plt.xlabel("epoch")
plt.ylabel("loss (MSE)")
plt.savefig("result/loss/loss_history.png")
plt.clf()
plt.close()

    # save best_model
torch.save(model, "result/model/best_model.pt")

RuntimeError: ignored