In [None]:
import torch
import torch.nn as nn
from torch import optim
from torch.autograd import Variable
import torch.nn.functional as F
from torch.utils.data import TensorDataset, DataLoader, ConcatDataset

import numpy as np
import glob
from tqdm import tqdm
import matplotlib.pyplot as plt
import umap.umap_ as umap
from sklearn.metrics import confusion_matrix

import sys
sys.path.append('../')

import random
# Seed every random number generator the notebook relies on so that data
# splits, weight initialisation and shuffling are repeatable across runs.
seed = 777
random.seed(seed)
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
np.random.seed(seed)

import os
# NOTE(review): Python reads PYTHONHASHSEED at interpreter start-up, so setting
# it here is only expected to influence subprocesses spawned by this kernel.
os.environ["PYTHONHASHSEED"]=str(seed)
# Ask cuDNN for deterministic kernels and disable its autotuner: with
# benchmark enabled cuDNN may select a different algorithm on each run, which
# defeats the deterministic flag above.
torch.backends.cudnn.deterministic = True  # type: ignore
torch.backends.cudnn.benchmark = False  # type: ignore

import loader.load_from_h5 as loadh5
from model.model_binary import MyModel

In [None]:
# Label value treated as the "normal" class; later cells map it to class 0 and
# every other label to class 1.
target_la = 0

# --- Normal data: keep only the samples whose label equals target_la --------
data_file = glob.glob('../data/normal*.h5')

feature = loadh5.FeatureDataset(data_file).get_array()
label = loadh5.FeatureDataset(data_file).get_label()

# NOTE(review): boolean-mask indexing assumes get_array()/get_label() return
# aligned NumPy arrays -- confirm against loader.load_from_h5.
target_mask = label == target_la
target_feature = torch.FloatTensor(feature[target_mask])
target_label = torch.FloatTensor(label[target_mask])

norm_ds = TensorDataset(target_feature,target_label)

# --- Anomaly data: used in full ----------------------------------------------
Anomaly_data_file = glob.glob('../data/Anomaly_dataset.h5')
ano_ds = loadh5.FeatureDataset(Anomaly_data_file).get_dataset(split_type='all')

# --- 70 / 20 / remainder split, done separately per source so that each of
# the train / validation / test sets contains both normal and anomaly samples.
length = [int(len(norm_ds)*0.7),int(len(norm_ds)*0.2)]
length.append((len(norm_ds)-sum(length)))
NtrnSet,NvalSet,NtstSet = torch.utils.data.random_split(norm_ds,length)

length = [int(len(ano_ds)*0.7),int(len(ano_ds)*0.2)]
length.append((len(ano_ds)-sum(length)))
AtrnSet,AvalSet,AtstSet = torch.utils.data.random_split(ano_ds,length)

trnSet = ConcatDataset([NtrnSet,AtrnSet])
valSet = ConcatDataset([NvalSet,AvalSet])
tstSet = ConcatDataset([NtstSet,AtstSet])

train_loader = DataLoader(trnSet, batch_size = 64, shuffle = True)
val_loader = DataLoader(valSet, batch_size = 64, shuffle=False)
# The test loader must draw from the held-out test split; it previously
# wrapped valSet, which made every "test" metric a validation metric.
test_loader = DataLoader(tstSet, batch_size = 64, shuffle=False)

# Alternative kept for reference: loaders built directly by the project loader.
# train_loader = loadh5.FeatureDataset(data_file).get_dataloader(split_type='training')
# val_loader = loadh5.FeatureDataset(data_file).get_dataloader(split_type='validation')
# test_loader = loadh5.FeatureDataset(data_file).get_dataloader(split_type='test')

In [None]:
# target_la = 0

# data_file = glob.glob('../data/normal*.h5')
# test_loader = loadh5.FeatureDataset(data_file).get_dataloader(split_type='test')

In [None]:
# Number of passes over the training set.
epochs = 50

# Build the network and move it onto the GPU.
model = MyModel()
model = model.to('cuda')

# Two objectives are combined during training:
#   c_criterion -- classification loss on the model output
#                  (NLLLoss expects log-probabilities; assumes MyModel's
#                  forward returns them -- TODO confirm),
#   t_criterion -- triplet margin loss on the embedding vectors.
t_criterion = nn.TripletMarginLoss()
c_criterion = nn.NLLLoss()

optimizer = optim.Adam(model.parameters(), lr=1e-3)

In [None]:
# Visualise the embedding space of the untrained model: embed every test
# sample, project the embeddings to 2-D with UMAP and colour by original label.
model.eval()
tst_embedded = []
tst_y = []
# Inference only -- no_grad avoids building an autograd graph per batch.
with torch.no_grad():
    for x, y in test_loader:
        x = x.float().to('cuda')
        # The embedding network takes flattened inputs of 784 features.
        embedded = model.embedding(x.view(-1,784)).detach().cpu()
        tst_embedded.append(embedded)
        tst_y.append(y)
tst_embedded = torch.cat(tst_embedded)
tst_y = torch.cat(tst_y)

hle = umap.UMAP(random_state=0,metric='euclidean',n_components=2,n_neighbors=20,min_dist=0).fit_transform(tst_embedded.numpy())

# One colour per distinct label value, spread over the nipy_spectral colormap.
tst_y_np = tst_y.numpy()
tst_classes = np.unique(tst_y_np)
c_lst = [plt.cm.nipy_spectral(a) for a in np.linspace(0.0, 1.0, len(tst_classes))]
plt.figure(figsize=(10,10))
# Loop variable deliberately not called `label`, so the label array created
# in the data-loading cell is not overwritten.
for idx, cls_value in enumerate(tst_classes):
    i = int(cls_value)
    cls_mask = tst_y_np == i
    plt.scatter(hle[cls_mask,0],hle[cls_mask,1],label=i,color=c_lst[idx])
plt.legend(loc='best')
plt.title('UMAP 2D Before Training')
plt.savefig('../result/model_hom_beforeTraining.png')
plt.show()

In [None]:
# Training / validation loop.
#
# Per epoch:
#   1. train on every batch with loss = NLL classification loss + triplet loss,
#   2. evaluate classification loss and accuracy on the validation loader,
#   3. save the weights whenever validation accuracy improves,
#   4. every 5 epochs save a checkpoint and a UMAP plot of the test embeddings.
train_loss = []
val_loss = []
best_acc = 0
best_epoch = 0          # stays 0 if no epoch ever improves on best_acc
best_state_dict = None  # snapshot of the best weights seen so far
for epoch in tqdm(range(1,epochs+1)):
    model.train()
    b_loss = []
    for x, y in train_loader:
        x = x.float().to('cuda')
        # Binary relabel: 0 = target_la ("normal"), 1 = anything else.
        y = (y != target_la).long().to('cuda')

        out = model(x)
        embedded = model.embedding(x.view(-1,784))

        # Anchors and positives both come from class 0; negatives from class 1.
        anchor_x = embedded[y==0]
        negative_x = embedded[y!=0]
        n_pos, n_neg = len(anchor_x), len(negative_x)

        # A triplet needs two distinct class-0 samples and one class-1 sample;
        # batches that cannot form any triplet are skipped entirely.
        if n_pos < 2 or n_neg == 0:
            continue

        # Every (anchor, positive, negative) index combination with
        # anchor != positive, gathered with tensor indexing rather than a
        # Python loop over the cartesian product.
        pos_ind = torch.arange(n_pos, device=embedded.device)
        neg_ind = torch.arange(n_neg, device=embedded.device)
        triplet_ind = torch.cartesian_prod(pos_ind, pos_ind, neg_ind)
        triplet_ind = triplet_ind[triplet_ind[:,0] != triplet_ind[:,1]]

        anchor = anchor_x[triplet_ind[:,0]]
        positive = anchor_x[triplet_ind[:,1]]
        negative = negative_x[triplet_ind[:,2]]

        optimizer.zero_grad()
        classification_loss = c_criterion(out,y)
        triplet_loss = t_criterion(anchor,positive,negative)
        loss = classification_loss + triplet_loss
        b_loss.append(loss.item())
        loss.backward()
        optimizer.step()
    train_loss.append(np.array(b_loss).mean())

    model.eval()
    b_loss = []
    total = 0
    correct = 0
    with torch.no_grad():
        for x, y in val_loader:
            x = x.float().to('cuda')
            y = (y != target_la).long().to('cuda')

            out = model(x)

            # argmax over the raw output picks the same class as argmax over
            # its softmax, so no normalisation is needed for the prediction.
            predict = out.argmax(1)

            total += y.size(0)
            correct += (predict == y).sum().item()

            loss = c_criterion(out,y)
            b_loss.append(loss.item())
    val_loss.append(np.array(b_loss).mean())
    acc = 100 * (correct / total)
    if best_acc < acc:
        best_acc = acc
        best_epoch = epoch
        # state_dict() returns references to the live parameters; clone them
        # so the snapshot is not altered by later optimizer steps.
        best_state_dict = {k: v.detach().clone() for k, v in model.state_dict().items()}
        torch.save(best_state_dict,'../result/model_hom_best.pth')

    if epoch % 5 == 0:
        print("Validation Accuracy",acc,'at Epoch',epoch)
        # Per-epoch checkpoint: store the weights as they are at this epoch.
        torch.save(model.state_dict(),f'../result/model_hom_{epoch}epoch.pth')
        model.eval()
        with torch.no_grad():
            tst_embedded = []
            tst_y = []
            for x, y in test_loader:
                x = x.float().to('cuda')
                embedded = model.embedding(x.view(-1,784)).detach().cpu()
                tst_embedded.append(embedded)
                tst_y.append(y)
            tst_embedded = torch.cat(tst_embedded)
            tst_y = torch.cat(tst_y)

            hle = umap.UMAP(random_state=0,metric='euclidean',n_components=2,n_neighbors=20,min_dist=0).fit_transform(tst_embedded.numpy())

            tst_y_np = tst_y.numpy()
            tst_classes = np.unique(tst_y_np)
            c_lst = [plt.cm.nipy_spectral(a) for a in np.linspace(0.0, 1.0, len(tst_classes))]
            plt.figure(figsize=(10,10))
            # Loop variable is not named `label` so the global label array
            # from the data-loading cell survives this cell.
            for idx, cls_value in enumerate(tst_classes):
                i = int(cls_value)
                cls_mask = tst_y_np == i
                plt.scatter(hle[cls_mask,0],hle[cls_mask,1],label=i,color=c_lst[idx])
            plt.legend(loc='best')
            plt.title('UMAP 2D at Epoch'+str(epoch))
            plt.savefig(f'../result/model_hom_{epoch}epoch.png')
            plt.show()
            plt.close()


In [None]:
# Per-epoch loss curves recorded by the training loop, drawn on one axis.
for curve, curve_name in ((train_loss, 'train loss'), (val_loss, 'validation loss')):
    plt.plot(curve, label=curve_name)
plt.legend()
plt.show()

In [None]:
# Report which epoch achieved the highest validation accuracy, and its value.
for caption, value in (
    ("Best Validation Accuracy Epoch", best_epoch),
    ("Best Validation Accuracy(%)", best_acc),
):
    print(caption, value)

In [None]:
# Restore the weights that the training loop saved at the best-validation epoch.
best_weights = torch.load('../result/model_hom_best.pth')
model.load_state_dict(best_weights)

In [None]:
# Visualise the embedding space of the best checkpoint: embed every test
# sample, project to 2-D with UMAP and colour the points by original label.
model.eval()
tst_embedded = []
tst_y = []
with torch.no_grad():
    for x, y in test_loader:
        x = x.float().to('cuda')
        # The embedding network takes flattened inputs of 784 features.
        embedded = model.embedding(x.view(-1,784)).detach().cpu()
        tst_embedded.append(embedded)
        tst_y.append(y)

tst_embedded = torch.cat(tst_embedded)
tst_y = torch.cat(tst_y)

hle = umap.UMAP(random_state=0,metric='euclidean',n_components=2,n_neighbors=20,min_dist=0).fit_transform(tst_embedded.numpy())

# One colour per distinct label value, spread over the nipy_spectral colormap.
tst_y_np = tst_y.numpy()
tst_classes = np.unique(tst_y_np)
c_lst = [plt.cm.nipy_spectral(a) for a in np.linspace(0.0, 1.0, len(tst_classes))]
plt.figure(figsize=(10,10))
# Loop variable is not named `label` so the global label array from the
# data-loading cell is left intact.
for idx, cls_value in enumerate(tst_classes):
    i = int(cls_value)
    cls_mask = tst_y_np == i
    plt.scatter(hle[cls_mask,0],hle[cls_mask,1],label=i,color=c_lst[idx])
plt.legend(loc='best')
plt.title('UMAP 2D at Best epoch')
# Dedicated file name: the previous name was derived from the leftover loop
# variable `epoch`, which overwrote the figure saved for the final epoch.
plt.savefig('../result/model_hom_best.png')
plt.show()

In [None]:
# Confusion matrix of the binary classifier on the test loader.
# Rows are true classes, columns are predicted classes
# (0 = target_la / normal, 1 = everything else).
model.eval()  # do not rely on an earlier cell having switched to eval mode
label = []
pred = []
with torch.no_grad():
    for x, y in test_loader:
        x = x.float().to('cuda')
        # Binary relabel: 0 where the label equals target_la, 1 otherwise.
        y = (y != target_la).long()
        out = model(x)

        # argmax over the raw output selects the same class as argmax over
        # its softmax, so the prediction is taken directly.
        predict = out.argmax(1).cpu()
        label.append(y.numpy())
        pred.append(predict.numpy())
label = np.concatenate(label)
pred = np.concatenate(pred)
print(label.shape)
print(pred.shape)
# Pin the class order so the matrix is always 2x2, even if one class is
# absent from both the labels and the predictions.
confusion_mat = confusion_matrix(label,pred,labels=[0, 1])
print(confusion_mat)

In [None]:
# Confusion matrix of the binary classifier on the validation loader.
# Rows are true classes, columns are predicted classes
# (0 = target_la / normal, 1 = everything else).
model.eval()  # do not rely on an earlier cell having switched to eval mode
label = []
pred = []
with torch.no_grad():
    for x, y in val_loader:
        x = x.float().to('cuda')
        # Binary relabel: 0 where the label equals target_la, 1 otherwise.
        y = (y != target_la).long()
        out = model(x)

        # argmax over the raw output selects the same class as argmax over
        # its softmax, so the prediction is taken directly.
        predict = out.argmax(1).cpu()
        label.append(y.numpy())
        pred.append(predict.numpy())
label = np.concatenate(label)
pred = np.concatenate(pred)
print(label.shape)
print(pred.shape)
# Pin the class order so the matrix is always 2x2, even if one class is
# absent from both the labels and the predictions.
confusion_mat = confusion_matrix(label,pred,labels=[0, 1])
print(confusion_mat)