In [None]:
import matplotlib.pyplot as plt
%matplotlib inline
import numpy as np
import pandas as pd
import os
import sys
import shutil
import math
import random
import heapq 
import time
import copy
import itertools  
from scipy.spatial.distance import pdist
from scipy.signal import butter, lfilter
from sklearn.model_selection import train_test_split
from scipy.spatial import distance
from functools import reduce
import faiss 
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
# Make GPU index 2 the current CUDA device for this process.
# NOTE(review): the index is hard-coded — presumably the host exposes at least
# three GPUs; confirm before running this notebook on another machine.
torch.cuda.set_device(2)
# Echo the index of the device that is now current.
print (torch.cuda.current_device())

In [None]:
#Hamming distance
#Input: X, numpy array of shape m*n, where m is the number of PVC samples (m subjects) and n is the dimensionality of each PVC
#       diff, float in [0,1], the difference threshold between PVCs
#Output: y, 0 is unifocal, 1 is multifocal

def Func_HammingDist(X, diff=0.3):
    """Label a group of PVC vectors as unifocal (0) or multifocal (1).

    Every row of ``X`` is reduced to a sign code (-1, 0 or +1 per component)
    and the normalised Hamming distance is computed between all pairs of
    rows. The group is multifocal as soon as one pair differs by more than
    ``diff``.

    Parameters
    ----------
    X : array-like, shape (m, n)
        m PVC samples with n features each. A 1-D input is treated as m
        one-dimensional samples.
    diff : float in [0, 1], default 0.3
        Largest fraction of differing sign components for which two PVCs
        are still considered the same morphology.

    Returns
    -------
    int
        0 for unifocal (including the case of fewer than two samples),
        1 for multifocal.
    """
    X = np.asarray(X)
    if X.ndim == 1:
        # Keep accepting flat vectors: one scalar feature per sample.
        X = X.reshape(-1, 1)

    if X.shape[0] < 2:
        return 0  # unifocal: nothing to compare against

    # np.tanh saturates to +/-1 for large magnitudes; a hand-written
    # (e^x - e^-x) / (e^x + e^-x) overflows to inf/inf = NaN there and
    # makes identical rows look different.
    codes = np.sign(np.tanh(X))

    # A single pdist call covers every unordered pair (i < j).
    if np.any(pdist(codes, 'hamming') > diff):
        return 1  # multifocal
    return 0  # unifocal

In [None]:
#Autoencoder + Hamming distance
#Input: X, numpy array of shape m*n, where m is the number of PVC samples (m subjects) and n is the dimensionality of each PVC
#       diff, float in [0,1], the difference threshold between PVCs
#Output: y, 0 is unifocal, 1 is multifocal

class autoencoder(nn.Module):
    """Fully connected autoencoder with a 3-unit bottleneck.

    Encoder widths: X_n -> 128 -> 64 -> 12 -> 3 (in-place ReLU between layers).
    Decoder widths: 3 -> 12 -> 64 -> 128 -> X_n (in-place ReLU between layers,
    Tanh on the output, so reconstructions lie in [-1, 1]).
    """

    def __init__(self, X_n):
        super(autoencoder, self).__init__()
        # Encoder layers are built first, then decoder layers, each in
        # forward order.
        self.encoder = self._dense_stack([X_n, 128, 64, 12, 3])
        self.decoder = self._dense_stack([3, 12, 64, 128, X_n], final=nn.Tanh())

    @staticmethod
    def _dense_stack(widths, final=None):
        """Chain Linear layers over consecutive widths with ReLU in between."""
        layers = []
        last_idx = len(widths) - 2
        for idx, (n_in, n_out) in enumerate(zip(widths[:-1], widths[1:])):
            layers.append(nn.Linear(n_in, n_out))
            if idx < last_idx:
                layers.append(nn.ReLU(True))
        if final is not None:
            layers.append(final)
        return nn.Sequential(*layers)

    def forward(self, x):
        """Return the reconstruction of ``x`` (encode, then decode)."""
        return self.decoder(self.encoder(x))

def Func_AutoencoderHammingDist(X, diff=0.2):
    """Label a group of PVC vectors as unifocal (0) or multifocal (1).

    An ``autoencoder`` is trained from scratch on ``X`` (Adam, MSE loss,
    200 epochs, mini-batches of 100 rows) and the snapshot with the lowest
    mean epoch loss is kept. Each row is then hashed to the element-wise
    sign of its reconstruction, and the normalised Hamming distance between
    the hashes of consecutive rows is compared with ``diff``.

    NOTE(review): only neighbouring rows (i, i+1) are compared, whereas
    ``Func_HammingDist`` compares every pair — confirm this is intended.

    Parameters
    ----------
    X : numpy.ndarray, shape (m, n)
        m PVC samples with n features each.
    diff : float in [0, 1], default 0.2
        Largest fraction of differing hash components for which two
        consecutive PVCs are still considered the same morphology.

    Returns
    -------
    int
        0 for unifocal (including the case of fewer than two samples),
        1 for multifocal.
    """
    if X.shape[0] < 2:
        return 0  # unifocal

    # Use the current CUDA device when there is one, otherwise run on CPU.
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # Convert once instead of on every batch of every epoch.
    samples = torch.from_numpy(X).type(torch.FloatTensor)
    num_samples = samples.shape[0]

    # model training
    model = autoencoder(X_n=X.shape[1]).to(device)
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=0.001, weight_decay=1e-5)
    best_net, best_loss = None, float('inf')
    batchSize = 100
    num_epochs = 200
    for epoch in range(num_epochs):
        losses = []
        # Stepping by batchSize never yields an empty trailing batch. An empty
        # batch has a NaN loss, which would poison the epoch mean and prevent
        # any snapshot from being stored.
        for start in range(0, num_samples, batchSize):
            inputs = samples[start:start + batchSize].to(device)
            # ===================forward=====================
            outputs = model(inputs)
            loss = criterion(outputs, inputs)
            # ===================backward====================
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            # ===================log========================
            losses.append(loss.item())
        epoch_loss = np.mean(losses)
        if epoch_loss < best_loss:
            best_loss = epoch_loss
            best_net = copy.deepcopy(model)
    print("best_loss = %.6f" % (best_loss))

    if best_net is None:
        # Every epoch loss was NaN (e.g. NaN in X); use the final weights
        # rather than failing on a missing snapshot.
        best_net = model

    # Hash every row exactly once, without tracking gradients.
    best_net.eval()
    hash_chunks = []
    with torch.no_grad():
        for start in range(0, num_samples, batchSize):
            batch = samples[start:start + batchSize].to(device)
            hash_chunks.append(torch.sign(best_net(batch)).cpu().numpy())
    hashes = np.concatenate(hash_chunks, axis=0)

    # release gpu memory (both the working model and the best snapshot)
    model.cpu()
    best_net.cpu()
    if device.type == 'cuda':
        torch.cuda.empty_cache()

    # predict PVC: stop at the first consecutive pair that is too different
    for i in range(num_samples - 1):
        if pdist(hashes[i:i + 2], 'hamming')[0] > diff:
            return 1  # multifocal
    return 0  # unifocal