In [1]:
import time

import numpy as np
import pandas as pd

import torch
import torch.nn as nn
import torch.optim as optim
import torch.utils.data as Data

from torch.utils.data.sampler import WeightedRandomSampler

from sklearn import svm
from sklearn.metrics import f1_score
from sklearn.model_selection import train_test_split

In [11]:
data_x = np.load("data/k_mers/k4_datax.npy")
data_y = np.load("data/k_mers/k4_datay.npy")

In [7]:
class NN2Layers(torch.nn.Module):
    def __init__(self, ninp: int, nhid: int, ntoken: int, dropout: float=0.0):
        
        super(NN2Layers, self).__init__()
        self.drop = nn.Dropout(dropout)
        self.nn1 = nn.Linear(ninp, nhid)
        self.nn2 = nn.Linear(nhid, ntoken)

        self.init_weights()

        self.ninp = ninp
        self.nhid = nhid
        self.ntoken = ntoken

    def init_weights(self):
        initrange = 0.1
        self.nn1.bias.data.zero_()
        self.nn1.weight.data.uniform_(-initrange, initrange)
        self.nn2.bias.data.zero_()
        self.nn2.weight.data.uniform_(-initrange, initrange)

    def forward(self, x: torch.tensor):

        output = self.nn1(x)
        output = self.drop(output)
        output = self.nn2(output)

        return output.softmax(dim=1)
 
    def predict(self, x: torch.tensor):
        # """預測並輸出機率大的類別

        # Args:
        #     x (torch.tensor): 詞 tensor。如果batch_first=True，input shape為（批次，序列），否則（序列，批次）。

        # Returns:
        #     [torch.tensor]: shape 與 x 一樣，但是序列為類別序列。
        # """
        output = self.forward(x)
        _, output = torch.max(output, 1)

        return output

class Trainer():
    def __init__(self, model, optimizer, loss_fn):
        self.model = model
        self.optimizer = optimizer
        self.loss_fn = loss_fn
    
    def train(self, x, y, valid_x, valid_y, epochs = 2, batch_size = 1, epoch_print = True, sampler = None):
        # Early stopping
        the_last_loss = 100
        patience = 2
        trigger_times = 0
        
        train_dataset = Data.TensorDataset(x, y)
        loader = Data.DataLoader(
            dataset = train_dataset,
            batch_size = batch_size,
            sampler = sampler,
        )
        
        start_time = time.time()
        step_size = len(loader)
        loss_history = []
        for epoch in range(epochs):
            self.model.train()
            epoch_loss = 0
            epoch_time = time.time()
            for step, (batch_x, batch_y) in enumerate(loader):
                step_time = time.time()
                
                self.optimizer.zero_grad()
                pred_y = self.model(batch_x)
                loss = self.loss_fn(pred_y, batch_y)
                loss.backward()
                self.optimizer.step()
                
                loss_history.append(loss.item())
                epoch_loss += loss.item()
            
            
            # Early stopping
            the_current_loss = self.validation(valid_x, valid_y, batch_size=batch_size)
            # print('The current loss:', the_current_loss)
            
            if the_current_loss >= the_last_loss:
                trigger_times += 1
                # print('trigger times:', trigger_times)
                if trigger_times >= patience:
                    print('Early stopping!\nStart to test process.')
                    break
            else:
                # print('trigger times: 0')
                trigger_times = 0
            
            if epoch_print:
                print('Epoch: %i | Loss: %.2f | time: %.2f s' % (epoch, the_current_loss, time.time() - epoch_time))
            
            the_last_loss = the_current_loss
            
        print('All Time: %.2f s | Loss: %.2f' % (time.time() - start_time, the_current_loss))
    
    def validation(self, valid_x, valid_y, batch_size = 1):
        train_dataset = Data.TensorDataset(valid_x, valid_y)
        valid_loader = Data.DataLoader(
            dataset = train_dataset,
            batch_size = batch_size,
        )
        self.model.eval()
        loss_total = 0

        # Test validation data
        with torch.no_grad():
            for step, (batch_x, batch_y) in enumerate(valid_loader):

                outputs = model(batch_x)
                loss = self.loss_fn(outputs, batch_y)
                loss_total += loss.item()

        return loss_total / len(valid_loader)

    def test(self, x, y):
        y_pred = self.model.predict(x)
        
        one_hot_y = np.eye(self.model.ntoken)[y]
        one_hot_y_pred = np.eye(self.model.ntoken)[y_pred]
        token_acc_array = []
        for i in range(self.model.ntoken):
            y_token = torch.tensor(one_hot_y[:, i])
            y_pred_token = torch.tensor(one_hot_y_pred[:, i])
            
            tp = (y_token * y_pred_token).sum(dim=0).to(torch.float32)
            tn = ((1 - y_token) * (1 - y_pred_token)).sum(dim=0).to(torch.float32)
            fp = ((1 - y_token) * y_pred_token).sum(dim=0).to(torch.float32)
            fn = (y_token * (1 - y_pred_token)).sum(dim=0).to(torch.float32)
            precision = tp / (tp + fp)
            rec = tp / (tp + fn)
            f1 = 2 * rec * precision / (rec + precision)
            token_acc_array.append(f1)
        acc = (y_pred == y).float().sum() / len(y)
        token_acc_array = torch.tensor(token_acc_array)
        return acc, token_acc_array

In [4]:
def CV(x, y, folder):
    unique, count = np.unique(y, return_counts=True)
    cv_x = []
    cv_y = []
    for u in unique:
        u_x = x[y == u]
        u_y = y[y == u]
        arr = np.arange(len(u_x))
        np.random.shuffle(arr)
        u_x = u_x[arr]
        u_y = u_y[arr]
        
        linspace = np.linspace(0, len(u_x), folder + 1, dtype=int)
        
        for i in range(folder):
            if unique[0] == u:
                cv_x.append(u_x[linspace[i]:linspace[i+1]])
                cv_y.append(u_y[linspace[i]:linspace[i+1]])
            else:
                cv_x[i] = np.append(cv_x[i], u_x[linspace[i]:linspace[i+1]], axis=0)
                cv_y[i] = np.append(cv_y[i], u_y[linspace[i]:linspace[i+1]], axis=0)
    return cv_x, cv_y

In [5]:
from sklearn.datasets import make_classification
# generate 2 class dataset
x, y = make_classification(n_samples=3000, n_classes=2, weights=[0.05, 0.95], random_state=0)
# summarize dataset
print("x", x.shape)
classes = np.unique(y)
total = len(y)
for c in classes:
	n_examples = len(y[y==c])
	percent = n_examples / total * 100
	print('> Class=%d : %d/%d (%.1f%%)' % (c, n_examples, total, percent))

x (3000, 20)
> Class=0 : 171/3000 (5.7%)
> Class=1 : 2829/3000 (94.3%)


In [12]:
ntoken = 2
nhid = 512

batch_size = 1
lr = 3e-1
epochs = 50

cv_x, cv_y = CV(data_x, data_y, 10)
# cv_x, cv_y = CV(x, y, 10)
score_array = []
for i in range(len(cv_x)):
    train_x = None
    for j in range(len(cv_x)):
        if i == j :
            test_x = cv_x[i]
            test_y = cv_y[i]
        else:
            if train_x is None:
                train_x = cv_x[i]
                train_y = cv_y[i]
            else:
                train_x = np.append(train_x, cv_x[i], axis=0)
                train_y = np.append(train_y, cv_y[i], axis=0)
    
    ninp = train_x.shape[1]

    model = NN2Layers(ninp, nhid, ntoken)
    loss_fn = nn.CrossEntropyLoss()
    # optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
    optimizer = optim.SGD(model.parameters(), lr=lr)
    
    tensor_x = torch.tensor(train_x).to(torch.float)
    tensor_y = torch.tensor(train_y).to(torch.long)

    test_x = torch.tensor(test_x).to(torch.float)
    test_y = torch.tensor(test_y).to(torch.long)
    
    weight = 1. / np.unique(train_y, return_counts=True)[1]
    samples_weight = np.array([weight[t] for t in tensor_y])

    samples_weight = torch.from_numpy(samples_weight)
    samples_weigth = samples_weight.double()
    sampler = WeightedRandomSampler(samples_weight, len(samples_weight))
    
    trainer = Trainer(model, optimizer, loss_fn)
    trainer.train(tensor_x, tensor_y, test_x, test_y, epochs=epochs, batch_size=batch_size, epoch_print=False, sampler=sampler)
    acc, token_acc_array = trainer.test(test_x, test_y)
    print(acc, token_acc_array)
    score_array.append(sum(token_acc_array) / len(token_acc_array))
    
sum(score_array) / len(score_array)


Early stopping!
Start to test process.
All Time: 29.52 s | Loss: 0.77
tensor(0.0726) tensor([0.1354,    nan])
Early stopping!
Start to test process.
All Time: 44.65 s | Loss: 0.54
tensor(0.9243) tensor([   nan, 0.9607])
Early stopping!
Start to test process.
All Time: 66.42 s | Loss: 0.86
tensor(0.0757) tensor([0.1407,    nan])
Early stopping!
Start to test process.
All Time: 29.44 s | Loss: 0.77
tensor(0.0757) tensor([0.1407,    nan])
Early stopping!
Start to test process.
All Time: 14.57 s | Loss: 0.89
tensor(0.0757) tensor([0.1407,    nan])
Early stopping!
Start to test process.
All Time: 40.45 s | Loss: 0.94
tensor(0.0757) tensor([0.1407,    nan])
Early stopping!
Start to test process.
All Time: 18.12 s | Loss: 0.93
tensor(0.0757) tensor([0.1407,    nan])
Early stopping!
Start to test process.
All Time: 18.28 s | Loss: 0.86
tensor(0.0757) tensor([0.1407,    nan])
Early stopping!
Start to test process.
All Time: 18.44 s | Loss: 0.95
tensor(0.0757) tensor([0.1407,    nan])
Early stop

tensor(nan)