In [None]:
import os
import time

import numpy as np
import matplotlib.pyplot as plt
import torch
from torch import nn
from torch.utils.data import Dataset, DataLoader

In [None]:
# Problem dimensions; the network's input width is their product.
T, La = 60, 50
N = T * La

# Layer widths used when the fully connected network is built.
input_layers, output_layers = N, La
hidden_layer_1, hidden_layer_2 = 1500, 500

In [None]:
# Location and base names of the input arrays; the label file shares the
# feature file's base name with a "_labels" suffix.
data_dir = './data'
filename = 'test_data'
labelname = f'{filename}_labels'

In [None]:
# Feature array, read from <data_dir>/<filename>.npy.
x = np.load(file=f'{data_dir}/{filename}.npy')

In [None]:
# Label array, read from <data_dir>/<labelname>.npy.
y = np.load(file=f'{data_dir}/{labelname}.npy')

In [None]:
# Ordered (unshuffled) split: the first `split` percent of the rows are used
# for training, the remainder for testing.
split = 80
num_train = int(x.shape[0] * split / 100)

# Both label slices are bounded by the number of feature rows, so x and y are
# cut at the same indices.
trainf, testf = x[:num_train], x[num_train:x.shape[0]]
trainl, testl = y[:num_train], y[num_train:x.shape[0]]

In [None]:
class MyDataset(Dataset):
    """Map-style dataset pairing a feature array with a label array.

    Both inputs are converted with ``torch.Tensor(...)`` at construction time
    and indexed in lockstep.
    """

    def __init__(self, X, Y):
        self.X = torch.Tensor(X)
        self.Y = torch.Tensor(Y)

    def __len__(self):
        """Number of samples, taken from the first dimension of the features."""
        return self.X.size(0)

    def __getitem__(self, index):
        """Return the ``(features, label)`` pair stored at ``index``."""
        sample = self.X[index]
        label = self.Y[index]
        return sample, label

In [None]:
# Module-level activation instances, kept so that any cell referring to them
# by name keeps working.
Sigmoid = nn.Sigmoid()
# The softmax dimension is given explicitly: constructing nn.Softmax() without
# `dim` relies on the deprecated implicit-dimension choice, which triggers a
# warning and picks dim 0 for 3-D input.  dim=-1 matches the implicit choice
# for the 1-D and 2-D inputs this network receives.
Softmax = nn.Softmax(dim=-1)
LinearSigmoid = nn.SiLU()


class Net(nn.Module):
    """Fully connected classifier with two sigmoid hidden layers.

    Layer widths come from the module-level settings:
    ``input_layers -> hidden_layer_1 -> hidden_layer_2 -> output_layers``.
    The output layer is followed by a softmax over the last dimension.

    The activations are class attributes (plain attributes, not registered
    sub-modules), so ``state_dict()`` only contains ``fc1``/``fc2``/``fc3``.
    """

    Sigmoid = nn.Sigmoid()
    Softmax = nn.Softmax(dim=-1)
    LinearSigmoid = nn.SiLU()

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(input_layers, hidden_layer_1)
        self.fc2 = nn.Linear(hidden_layer_1, hidden_layer_2)
        self.fc3 = nn.Linear(hidden_layer_2, output_layers)

    def forward(self, x):
        """Run the network and return the class probabilities flattened to 1-D.

        The activations are looked up on the class rather than in the module
        globals, so re-binding ``Sigmoid``/``Softmax`` in another cell cannot
        silently change what this network computes.
        """
        x = self.Sigmoid(self.fc1(x))
        x = self.Sigmoid(self.fc2(x))
        x = self.Softmax(self.fc3(x))
        # Callers reshape this flat vector back to the label shape themselves.
        return x.view(-1)


# The model is created directly on the GPU; this requires a CUDA device.
net = Net().cuda()

In [None]:
class Trainer():
    """Full-batch training loop with periodic accuracy reporting.

    Each epoch feeds the whole training split to the network as a single
    batch, records the mean loss, prints progress every 25 epochs and
    measures arg-max accuracy on both splits every 100 epochs.

    Parameters
    ----------
    dataset : callable
        Called as ``dataset(X, y)``; must return a ``torch.utils.data.Dataset``.
    net : torch.nn.Module
        Model to train. It is trained in place and is also the model used by
        :meth:`predict`.
    loss_f : callable
        Called as ``loss_f(predictions, target)``.
    learning_rate : float
        Passed to the optimizer as ``lr``.
    epoch_amount : int
        Number of epochs to run.
    batch_size, max_batches_per_epoch, early_stopping, permutate
        Stored on the instance but not consulted by :meth:`fit`, which always
        uses one unshuffled batch per split and runs every epoch.
    device : str
        Device the model and batches are moved to.
    optim : callable
        Optimizer factory, called as ``optim(parameters, lr=learning_rate)``.
    scheduler : callable or None
        Optional factory called with the optimizer. NOTE(review): the
        resulting scheduler is never stepped by :meth:`fit`.

    Attributes
    ----------
    train_loss : list of float
        Mean training loss of every epoch.
    train_perc, test_perc : list of float
        Accuracy in percent on the training / test split, one entry per
        100 epochs (starting with epoch 0).
    final_c, final_p : list of numpy.ndarray
        Inputs and predictions of the batches seen during the last epoch.
    best_model_n : list
        Initialised empty; never filled by this class.
    """

    def __init__(self,  dataset, net, loss_f, learning_rate=1e-3,
                 epoch_amount=500,
                 batch_size=80000,
                 max_batches_per_epoch=None,
                 device='cuda:0', early_stopping=10,
                 optim=torch.optim.Adam,
                 scheduler=None, permutate=True):

        self.loss_f = loss_f
        self.learning_rate = learning_rate
        self.epoch_amount = epoch_amount
        self.batch_size = batch_size
        self.max_batches_per_epoch = max_batches_per_epoch
        self.device = device
        self.early_stopping = early_stopping
        self.optim = optim
        self.scheduler = scheduler
        self.permutate = permutate
        self.dataset = dataset
        # Both names refer to the same module object; training happens in place.
        self.start_model = net
        self.best_model = net

        self.train_loss = []

        self.final_c = []
        self.final_p = []
        self.best_model_n = []
        self.train_perc = []
        self.test_perc = []

    def predict(self, X):
        """Return the model's raw output for ``X``."""
        return self.best_model(X)

    def _accuracy(self, model, loader):
        """Return the percentage of samples whose arg-max prediction matches the label.

        Runs without gradient tracking and in eval mode, then restores the
        mode the model was in, so calling this never affects training.
        """
        was_training = model.training
        model.eval()
        hits = 0
        total = 0
        with torch.no_grad():
            for batch_X, target in loader:
                batch_X = batch_X.to(self.device)
                target = target.to(self.device)
                # The model may return a flattened vector; bring it back to
                # the label layout before taking the per-sample arg-max.
                predicted = model(batch_X).view(target.size())
                matches = torch.argmax(target, dim=1) == torch.argmax(predicted, dim=1)
                hits += int(matches.sum().item())
                total += target.shape[0]
        model.train(was_training)
        return hits / total * 100

    def fit(self, X_train, X_test, y_train, y_test):
        """Train the model and record loss / accuracy history on the instance.

        Note the argument order: both feature arrays come first, then both
        label arrays. Nothing is returned; results are available through
        ``train_loss``, ``train_perc``, ``test_perc``, ``final_c`` and
        ``final_p``.
        """
        model = self.start_model

        device = torch.device(self.device)
        print(device, y_train.shape[0], y_test.shape[0])
        model.to(self.device)

        optimizer = self.optim(model.parameters(), lr=self.learning_rate)

        if self.scheduler is not None:
            # NOTE(review): the scheduler is constructed but nothing below
            # calls scheduler.step(), so the schedule never advances.
            scheduler = self.scheduler(optimizer)

        train = self.dataset(X_train, y_train)
        val = self.dataset(X_test, y_test)

        # One batch spans the whole split; self.batch_size and self.permutate
        # are not applied here.
        train = DataLoader(train, batch_size=y_train.shape[0], shuffle = False)
        val = DataLoader(val, batch_size=y_test.shape[0], shuffle = False)

        first_loss = None
        tStart = time.monotonic()
        for epoch in range(self.epoch_amount):
            model.train()
            mean_loss = 0
            batch_n = 0
            for batch_X, target in train:
                optimizer.zero_grad()
                batch_X = batch_X.to(self.device)
                target = target.to(self.device)
                predicted_values = model(batch_X)
                # Match the label layout to whatever shape the model returns.
                target = target.view(predicted_values.size())

                loss = self.loss_f(predicted_values, target)
                loss.backward()
                optimizer.step()

                # Keep the inputs and predictions of the last epoch for inspection.
                if epoch == self.epoch_amount-1:
                    self.final_c.append(batch_X.cpu().detach().numpy())
                    self.final_p.append(predicted_values.cpu().detach().numpy())

                mean_loss += float(loss)
                batch_n += 1

            mean_loss /= batch_n
            self.train_loss.append(mean_loss)

            if epoch == 0:
                first_loss = mean_loss

            if epoch % 25 == 0:
                tEnd = time.monotonic()
                # Loss is reported relative to the first epoch; guard against
                # a first-epoch loss of exactly zero.
                relative_loss = mean_loss / first_loss if first_loss else float('nan')
                print('{:4} - {:.6f} - {:5.1f} sec'.format(epoch, relative_loss, (tEnd - tStart)))
                tStart = time.monotonic()
            if epoch % 100 == 0:
                train_acc = self._accuracy(model, train)
                self.train_perc.append(train_acc)
                print('train - {:5.2f}%'.format(train_acc), end = ' ')
                val_acc = self._accuracy(model, val)
                self.test_perc.append(val_acc)
                print('test - {:5.2f}%'.format(val_acc))

In [None]:
# Keyword arguments for Trainer; anything not listed keeps the Trainer default.
params = dict(
    dataset=MyDataset,
    net=net,
    epoch_amount=10000,
    learning_rate=2e-4,
    early_stopping=25,
    loss_f=nn.BCELoss(),  # options: BCELoss, CrossEntropyLoss(), MSELoss()
    optim=torch.optim.Adam,
)

In [None]:
# Build the trainer from the settings above and run the training loop.
clf = Trainer(**params)
# fit takes both feature arrays first, then both label arrays; keywords make
# the pairing explicit.
clf.fit(X_train=trainf, X_test=testf, y_train=trainl, y_test=testl)

In [None]:
# Training loss per epoch in dB relative to the first epoch's loss.
plt.figure(figsize=(16, 8))
plt.plot(10 * np.log10(np.divide(clf.train_loss, clf.train_loss[0])))
plt.show()

In [None]:
# Accuracy history: red = training split, green = test split.
plt.figure(figsize=(16, 8))
# Accuracy is recorded once every 100 epochs, so scale the sample index to epochs.
xt = np.arange(len(clf.train_perc)) * 100
plt.plot(xt, clf.train_perc, color='red')
plt.plot(xt, clf.test_perc, color='green')
plt.show()

In [None]:
# Most recent train / test accuracy (percent).
print(*(np.asarray(history)[-1] for history in (clf.train_perc, clf.test_perc)))

In [None]:
# Directory that receives the saved network state.
savedir = "./NN_states"

In [None]:
# Checkpoint contents: the trained weights plus the settings and results needed
# to identify this run.
state = {
    'epoch': len(clf.train_loss),  # one loss entry is recorded per epoch
    'hidden_layers': (hidden_layer_1, hidden_layer_2),
    'func_activ': ("Logsig", "Logsig"),
    'state_dict' : net.state_dict(),
    # NOTE(review): this stores the optimizer class, not an optimizer state.
    'optim': params['optim'],
    'loss_f': "BCELoss",
    'train_test': (np.asarray(clf.train_perc)[-1], np.asarray(clf.test_perc)[-1]),
    'data_dir': data_dir,
    'labels': labelname
}

# torch.save does not create missing directories; make sure the target exists
# so a finished training run is not lost to a failed write.
os.makedirs(savedir, exist_ok=True)
torch.save(state, f"{savedir}/{labelname}_test.st")