In [None]:
import sys
import torch
import sklearn
import numpy as np
from einops import rearrange
import torch.nn.functional as Fun
from sklearn.model_selection import train_test_split

In [None]:
ID = 1
WS = 10
EPOC = 300
batch_size = 4096
PATH = 'Model/Multitask_Net'

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

In [None]:
def Load_Data(PATH, isd, iid):
    PRFIX_P = 'HPQ-10-'
    PRFIX_R = 'HRQ-10-'
    X, Y = [], []
    
#     for isd in range(5):
#     for iid in range(1,6):
    P = np.load(PATH+PRFIX_P+str(isd)+'-'+str(iid)+'.npy', allow_pickle=True)
    R = np.load(PATH+PRFIX_R+str(isd)+'-'+str(iid)+'.npy', allow_pickle=True)
    for i in range(np.shape(P)[0]):
        if np.shape(P[i])[0]<=WS: continue
        print(iid, ' - ', i+1, '/', np.shape(P)[0], end='\r')
        for j in range(np.shape(P[i])[0]-WS):
            hp, cp = P[i][j:j+WS], P[i][j+WS]
            hr, cr = R[i][j:j+WS], R[i][j+WS]
            rate = [hr[k]/hp[k] if hp[k]!=0 else 0 for k in range(WS)]
            Rate = np.asarray(rate)
            if (Rate==0).all():              yl=1
            elif (abs(Rate-0.5)<0.01).all(): yl=2
            elif (abs(Rate-0.5)<0.01).any() or (Rate==0).any(): yl=3
            else: yl=0
            X.append(hp+hr+rate+[cp])
            Y.append([cr/cp if cp!=0 else 0, yl])
    return X, Y

In [None]:
X, Y = Load_Data('Training_Dataset/', 0, 1)
X_train, X_test, Y_train, Y_test = train_test_split(X, Y, test_size=0.2, random_state=1)
X_train, X_val,  Y_train, Y_val  = train_test_split(X_train, Y_train, test_size=0.25, random_state=1)
X_train, X_val, X_test = torch.tensor(X_train).float().to(device), torch.tensor(X_val).float().to(device), torch.tensor(X_test).float().to(device)
Y_train, Y_val, Y_test = torch.tensor(Y_train).float().to(device), torch.tensor(Y_val).float().to(device), torch.tensor(Y_test).float().to(device)
YP1, YP2, YP3, YP4 = np.sum(np.asarray(Y, dtype=int)[:,1]==0)/np.size(Y)*2, np.sum(np.asarray(Y, dtype=int)[:,1]==1)/np.size(Y)*2, np.sum(np.asarray(Y, dtype=int)[:,1]==2)/np.size(Y)*2, np.sum(np.asarray(Y, dtype=int)[:,1]==3)/np.size(Y)*2
print(np.shape(X_train), np.shape(Y_train))
print(np.shape(X_val), np.shape(Y_val))
print(np.shape(X_test), np.shape(Y_test))
print(round(YP1,3), round(YP2,3), round(YP3,3), round(YP4,3))
print(round(1/YP1,3), round(1/YP2,3), round(1/YP3,3), round(1/YP4,3))

In [None]:
class Net(torch.nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.Atten1 = torch.nn.MultiheadAttention(1, 1, batch_first=True)
        self.Atten2 = torch.nn.MultiheadAttention(1, 1, batch_first=True)
        self.Atten3 = torch.nn.MultiheadAttention(1, 1, batch_first=True)
        self.Class1 = torch.nn.Linear(10,8)
        self.Class2 = torch.nn.Linear( 8,4)
        
        self.FC2o1  = torch.nn.Linear(7,8)
        self.FC2o2  = torch.nn.Linear(8,4)
        self.FC2o3  = torch.nn.Linear(4,1)
        self.FC3o1  = torch.nn.Linear(3,4)
        self.FC3o2  = torch.nn.Linear(4,1)
        self.LRelu  = torch.nn.LeakyReLU()
        self.softm  = torch.nn.Softmax(dim=1)
    def forward(self, x):
        x1, x2, x3, x4 = x[:, :WS], x[:, WS:WS*2], x[:, WS*2:WS*3], x[:, WS*3:WS*3+1]
        CLA = self.softm( self.Class2( self.LRelu(self.Class1(x3))))
        N = np.shape(x1)[0]
        x1, x2, x3, x4 = torch.reshape(x1, (N,WS,1)), torch.reshape(x2, (N,WS,1)), torch.reshape(x3, (N,WS,1)), torch.reshape(x4, (N,1,1))
        attn_output1, attn_output_weights1 = self.Atten1(x4, x1, x1)
        attn_output2, attn_output_weights2 = self.Atten2(x4, x1, x2)
        attn_output3, attn_output_weights3 = self.Atten3(x4, x1, x3)
        x1 = torch.sum( torch.reshape(x1,(N,-1)) * torch.reshape(attn_output_weights1,(N,-1)), 1)
        x2 = torch.sum( torch.reshape(x2,(N,-1)) * torch.reshape(attn_output_weights2,(N,-1)), 1)
        x3 = torch.sum( torch.reshape(x3,(N,-1)) * torch.reshape(attn_output_weights3,(N,-1)), 1)
        atx = torch.cat((torch.reshape(attn_output1,(N,1)), torch.reshape(attn_output2,(N,1)), torch.reshape(attn_output3,(N,1)), CLA), 1)
        out1= torch.sigmoid( self.FC2o3( self.LRelu(self.FC2o2( self.LRelu(self.FC2o1(atx))))))
        ### Ensemble
        esx = (torch.cat((out1, torch.reshape(x3,(N,1)), torch.nan_to_num(torch.reshape(x2/(x1+1E-9),(N,1)))), 1) )
        out = torch.sigmoid( self.FC3o2( self.LRelu(self.FC3o1(esx))))
        return CLA, out, attn_output_weights1, attn_output_weights2, attn_output_weights3


### Training & Validation

In [None]:
netS      = Net()
loss_fnp  = torch.nn.L1Loss()
loss_fnc  = torch.nn.CrossEntropyLoss(weight=torch.tensor([1/YP1, 1/YP2, 1/YP3, 1/YP4]).to(device))
optimizer = torch.optim.Adam(netS.parameters(), lr=0.01)
NB        = np.shape(Y_train)[0]//batch_size     # Number of batch
BLC, BLP  = np.inf, np.inf                       # Best loss for save best
netS.to(device)

for epoc in range(EPOC):
    LC, LP = 0, 0
    for b in range(NB):
        t, f = b*batch_size, (b+1)*batch_size
        if b==NB-1: x, yc, yp = X_train[t: ], Fun.one_hot(Y_train[t: ,1].to(torch.int64), num_classes=4), Y_train[t: ,0]
        else:       x, yc, yp = X_train[t:f], Fun.one_hot(Y_train[t:f,1].to(torch.int64), num_classes=4), Y_train[t:f,0]
        
        cla, pred, w1, w2, w3 = netS(x)
        optimizer.zero_grad()
        lossc = loss_fnc( torch.reshape(cla,(-1,4)), yc.float())
        LC   += float(lossc)
        lossc.backward(retain_graph=True)
        lossp = loss_fnp( torch.reshape(pred,(-1,)), yp)
        LP   += float(lossp)
        lossp.backward()
        optimizer.step()
        optimizer.zero_grad()
    x, yc, yp = X_val, Fun.one_hot(Y_val[:,1].to(torch.int64), num_classes=4), Y_val[:,0]
    cla, pred, w1, w2, w3 = netS(x)
    lossc = float( loss_fnc( torch.reshape(cla,(-1,4)), yc.float()) )
    lossp = float( loss_fnp( torch.reshape(pred,(-1,)), yp) )
    print(epoc+1, '/', EPOC, ' - ', round(lossc,5), round( (np.argmax(cla.to('cpu').detach().numpy(),axis=1)==(np.argmax(yc.to('cpu').detach().numpy(),axis=1))).mean(),3), round(lossp,5))
    
    if lossc<BLC:
        BLC = lossc
        torch.save(netS.state_dict(), PATH+'C_'+str(ID)+'.pt') 
        print('Save Best Classification at', epoc, 'with loss of', lossc)
    if lossp<BLP:
        BLP = lossp
        torch.save(netS.state_dict(), PATH+'P_'+str(ID)+'.pt') 
        print('Save Best Prediction at', epoc, 'with loss of', lossp)
print(round(lossc,5), round(lossp,5))

### Testing

In [None]:
netS.load_state_dict(torch.load(PATH+'C_'+str(ID)+'.pt'))
x, yc, yp = X_test, Fun.one_hot(Y_test[:,1].to(torch.int64), num_classes=4), Y_test[:,0]
cla, pred, w1, w2, w3 = netS(x)
lossc = float( loss_fnc( torch.reshape(cla,(-1,4)), yc.float()) )
lossp = float( loss_fnp( torch.reshape(pred,(-1,)), yp) )
print('Testing Loss:', round(lossc,5), round( (np.argmax(cla.to('cpu').detach().numpy(),axis=1)==(np.argmax(yc.to('cpu').detach().numpy(),axis=1))).mean(),3), round(lossp,5))
P = np.argmax(cla.to('cpu').detach().numpy(),axis=1)
G = (np.argmax(yc.to('cpu').detach().numpy(),axis=1))
CM = np.zeros((4,4), dtype=int)
for i in range(np.shape(P)[0]):
    CM[G[i]][P[i]] += 1
print(CM)

In [None]:
netS.load_state_dict(torch.load(PATH+'P_'+str(ID)+'.pt'))
x, yc, yp = X_test, Fun.one_hot(Y_test[:,1].to(torch.int64), num_classes=4), Y_test[:,0]
cla, pred, w1, w2, w3 = netS(x)
lossc = float( loss_fnc( torch.reshape(cla,(-1,4)), yc.float()) )
lossp = float( loss_fnp( torch.reshape(pred,(-1,)), yp) )
print('Testing Loss:', round(lossc,5), round( (np.argmax(cla.to('cpu').detach().numpy(),axis=1)==(np.argmax(yc.to('cpu').detach().numpy(),axis=1))).mean(),3), round(lossp,5))
P = np.argmax(cla.to('cpu').detach().numpy(),axis=1)
G = (np.argmax(yc.to('cpu').detach().numpy(),axis=1))
CM = np.zeros((4,4), dtype=int)
for i in range(np.shape(P)[0]):
    CM[G[i]][P[i]] += 1
print(CM)