In [None]:
import warnings
warnings.filterwarnings("ignore")
from tqdm import tqdm
import torch
from torch.distributions import Categorical
import torch.nn.functional as F
import os
os.environ['KMP_DUPLICATE_LIB_OK']='True'
import optuna

In [None]:
best_model_path = path+'best_models\\'
class Flexible_Net(torch.nn.Module):
    def __init__(self, num_in, hidden_size, num_out, num_layers=1, actv_index=0, last_actv_index=0, bidirectional=False):
        super(Flexible_Net, self).__init__()
        self.num_layers = num_layers
        activation_functions = [torch.nn.ReLU(), torch.nn.RReLU(), torch.nn.LeakyReLU(), torch.nn.PReLU(), torch.nn.Softplus(), 
                                torch.nn.ELU(), torch.nn.CELU(), torch.nn.SELU(), torch.nn.GELU(), torch.nn.ReLU6(), torch.nn.Sigmoid(), 
                                torch.nn.Tanh(), torch.nn.Softsign(), torch.nn.Hardtanh(), torch.nn.Tanhshrink(), torch.nn.Softshrink(), 
                                torch.nn.Hardshrink(), torch.nn.LogSigmoid(), torch.nn.Softmin(), torch.nn.Softmax(), torch.nn.LogSoftmax()]
        if bidirectional==False:
            self.initial_layer = torch.nn.LSTM(num_in, hidden_size, dropout=0.5)
            self.mid_layer = torch.nn.LSTM(hidden_size, hidden_size, dropout=0.5)
            self.last_layer = torch.nn.Linear(hidden_size, num_out)
        else:
            self.initial_layer = torch.nn.LSTM(num_in, hidden_size, dropout=0.5, bidirectional=True)
            self.mid_layer = torch.nn.LSTM(hidden_size*2, hidden_size, dropout=0.5, bidirectional=True)
            self.last_layer = torch.nn.Linear(hidden_size*2, num_out)            
        self.act_fun = activation_functions[actv_index]
        self.last_act_fun = activation_functions[last_actv_index]
        
    def forward(self, x):
        x, (h_n, h_c) = self.initial_layer(x)
        x = self.act_fun(x)
        if self.num_layers>1:
            for i in range(self.num_layers):
                x, (h_n, h_c) = self.mid_layer(x)
                x = self.act_fun(x)
        else:
            pass
        x = self.last_layer(x)
        x = self.last_act_fun(x)
        return x

In [None]:
def objective(trial):
    #########################################################################################
    learning_rate = trial.suggest_discrete_uniform('learning_rate', 0.00001, 0.001, 0.00001)
    update_per_n_batch = trial.suggest_int('update_per_n_batch', int(len(train_x)/250), int(len(train_x)/10), 15)
    num_layers = trial.suggest_int('num_layers', 1, 10, 2)
    hidden_size = trial.suggest_int('hidden_size', 64, 256, 8)
    activation_functions_index = trial.suggest_int('activation_functions_index', 0, 20, 1)
    last_activation_functions_index = trial.suggest_int('last_activation_functions_index', 0, 20, 1)
    bidirectional_value = trial.suggest_categorical('bidirectional_value', ['True', 'False'])
    torch.manual_seed(123456)
    #########################################################################################
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    if os.path.exists(best_model_path) == True:
        pass
    else:
        os.makedirs(best_model_path) 
    n_features = train_x.shape[-1]
    output_feature = 1
    model = Flexible_Net(num_in=n_features, hidden_size=hidden_size, num_out=output_feature, num_layers=num_layers, 
                         actv_index=activation_functions_index, last_actv_index=last_activation_functions_index, 
                         bidirectional=bidirectional_value).to(device)
    #########################################################################################
    rounds_of_train = 35
    window_size = 65
    early_stop_threshold = 4
    #########################################################################################
    criterion = torch.nn.CrossEntropyLoss()
    optimizer = torch.optim.AdamW(model.parameters(), lr=learning_rate)
    model.train()
    best_acc = -np.inf
    early_stop_counter = 0
    last_acc = 0
    for r in range(rounds_of_train):
        accuracy_count = []
        output_list = []
        valid_list = []
        for i in tqdm(range(train_x.shape[0]-window_size)):
            inputs = torch.as_tensor(train_x[i]).to(device)
            valid_y = torch.as_tensor(train_y[i]).to(device)#.float()
            output = model(inputs)
            output = output[-1]
            output = output.reshape(len(output),1)
            output_list.append(output.cpu().detach().numpy())
            valid_y = valid_y.reshape(output_feature,1) 
            valid_list.append(valid_y.cpu().detach().numpy())
            if i % update_per_n_batch==0 or i==train_x.shape[0]-window_size:
                single_loss = criterion(torch.tensor(output_list,requires_grad=True), torch.tensor(valid_list,requires_grad=True))
                single_loss.backward()
                optimizer.step()
                output_list = []
                valid_list = []
            else:
                pass
            if output[0][0]>=0.75 and valid_y[0][0]==1:
                accuracy_count.append(1)
            elif output[0][0]<=0.25 and valid_y[0][0]==0:
                accuracy_count.append(1)
            else:
                accuracy_count.append(0)
        accuracy = np.mean(accuracy_count)
        print('Round ', r, ' has finished, accuracy: ', accuracy)
        if accuracy > best_acc:
            best_acc = accuracy
            print('New Best Accuracy: ', best_acc, ', saving to ', best_model_path+'best_model(LSTM)_'+str(accuracy)+'_.pth')
            torch.save(model, best_model_path+'best_model(LSTM)_'+str(accuracy)+'_.pth')
        else:
            pass
        print('Current Best Accuracy: ', best_acc)
        if accuracy <= last_acc:
            early_stop_counter = early_stop_counter+1
            print('Early Stop Counter: ', early_stop_counter, '. Training will stop at: ', early_stop_threshold)
        else:
            pass
        if early_stop_counter>=early_stop_threshold:
            break
        else:
            pass
        last_acc = accuracy
    return best_acc

study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=1000, n_jobs = 1)
print(study.best_params)