In [7]:
import numpy as np
import torch
import torch.optim as optim
import torch.nn as nn
import matplotlib.pyplot as plt
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import copy

In [8]:
np.random.seed(1)
torch.manual_seed(1)
torch.set_default_dtype(torch.float)
device = torch.device('mps' if torch.backends.mps.is_available() else 'mps')

In [9]:
data       = np.load("./data/ecg_data.npz")
Xtr        = torch.tensor(data["Xtr"]).type(torch.float).to(device)    #Clean train
Xtr_noise  = torch.tensor(data["Xtr_noise"]).type(torch.float).to(device)  #Noisy train
Xte_noise  = torch.tensor(data["Xte_noise"]).type(torch.float).to(device)  #Noisy test

params = np.load("./data/ecg_params.npz")
W = torch.FloatTensor(params["W"]).type(torch.float).to(device)  #Decoder parameters
V = torch.FloatTensor(params["V"]).type(torch.float).to(device)  #Encoder parameters

In [14]:
class Relu_AE(nn.Module):
    def __repr__(self):
        return {"model":"LinearAE","K":self.K,"lr":self.lr,"epoch":self.epochs}
    
    def __init__(self, dim,K,epoch=1000,lr=0.001):
        super(Relu_AE, self).__init__()
        self.model=nn.Sequential(
            nn.Linear(dim, K),
            nn.Tanh(),
            nn.Linear(K,dim)
            )
        self.lr=lr
        self.epochs=epoch
        self.criterion=nn.MSELoss(reduction="mean")
        self.K=K

    def forward(self, x):
        x = self.model(x)
        return x
    
    def reconstruct(self,x,W,V):
        self.model[0].weight.data=W
        self.model[1].weight.data=V
        x=self.model(x)
        return x
    
    def loss(self,X,X_hat):
        return self.criterion(X,X_hat).item()
    
    
    def fit(self,X,R):
        train_loss_list=[]
        optimizer=optim.Adam(self.parameters(),lr=self.lr)
        for epoch in range(self.epochs+1):
            train_loss = 0.0
            self.model.train()
            optimizer.zero_grad()
            output = self.model(X)
            loss = self.criterion(output, R)

            loss.backward()
            optimizer.step()
            train_loss_list.append(loss)
            #if(epoch%100==0):print('Epoch [{}/{}], Train Loss: {:.9f}'.format(epoch, self.epochs, loss))
        #self.saver(self.model.state_dict())
        return loss.item()

    def test(self,X,R):
        self.model.eval()
        with torch.no_grad():
            output=self.model(X)
            loss=self.criterion(output,R)
            return loss

    def print_param(self):
        for param in self.parameters():
            print(f'parameter shape:{param.shape}')
            #print(f'parameter value:{param.data}')
    
    @staticmethod
    def plot_loss(X,X_hat):
        time=len(X)
        plt.plot(range(1, time+1), X.cpu().detach().numpy(), label='Noisy Data')
        plt.plot(range(1, time+1), X_hat.cpu().detach().numpy(), label='Reconstructed Data')
        plt.xlabel('Time')
        plt.ylabel('ECG')
        plt.title('Noisy Data vs. Reconstructed Data')
        plt.legend()
        plt.show()

    def saver(self,best_weights,PATH="./models"):
        name=''.join(f"{key}{val}" for key, val in self.__repr__().items())
        path=PATH+"/"+name
        torch.save(best_weights,path)
    
    # def dataloader(self,X,R,batch_size=128):
    #     dataset = ECGDataset(X,R)
    #     dataloader=DataLoader(dataset, batch_size=batch_size, shuffle=True)
    #     return dataloader

In [12]:
model=Relu_AE(100,10,lr=0.01,epoch=1500).to(device)
loss=model.fit(Xtr,Xtr)


Epoch [0/1500], Train Loss: 2.675340652
Epoch [100/1500], Train Loss: 0.500086784
Epoch [200/1500], Train Loss: 0.302896261
Epoch [300/1500], Train Loss: 0.231185749
Epoch [400/1500], Train Loss: 0.194383904
Epoch [500/1500], Train Loss: 0.182105348
Epoch [600/1500], Train Loss: 0.168802336
Epoch [700/1500], Train Loss: 0.164180547
Epoch [800/1500], Train Loss: 0.163829684
Epoch [900/1500], Train Loss: 0.155608192
Epoch [1000/1500], Train Loss: 0.159630999
Epoch [1100/1500], Train Loss: 0.151950091
Epoch [1200/1500], Train Loss: 0.152611017
Epoch [1300/1500], Train Loss: 0.146961004
Epoch [1400/1500], Train Loss: 0.156033874
Epoch [1500/1500], Train Loss: 0.144171774


In [16]:
K=[k for k in range(10,150,5)]
train_loss_list=[]
test_loss_list=[]
best_k=0
best_loss=np.inf
best_model=None
for k in K:
    X_train, X_val, R_train, R_val = train_test_split(Xtr_noise, Xtr, test_size=0.2)
    model=Relu_AE(100,k,lr=0.001,epoch=1500).to(device)
    train_loss=model.fit(X_train,R_train)
    train_loss_list.append(train_loss)
    test_loss=model.test(X_val,R_val)
    test_loss_list.append(test_loss)
    if test_loss<best_loss:
        best_k=k
        best_model=model
        best_loss=test_loss
    print(f'K:{k} Train Loss:{train_loss}   Test Loss:{test_loss}')

K:10 Train Loss:0.3219006359577179   Test Loss:0.32627904415130615
K:15 Train Loss:0.19815082848072052   Test Loss:0.20917046070098877
K:20 Train Loss:0.15424960851669312   Test Loss:0.1589110642671585
K:25 Train Loss:0.12821383774280548   Test Loss:0.11921767145395279
K:30 Train Loss:0.10664771497249603   Test Loss:0.10253103822469711
K:35 Train Loss:0.09462974965572357   Test Loss:0.10278108716011047
K:40 Train Loss:0.08488091826438904   Test Loss:0.09520789980888367
K:45 Train Loss:0.07942568510770798   Test Loss:0.09023468941450119
K:50 Train Loss:0.07480417937040329   Test Loss:0.08563196659088135


In [None]:
X_pred=best_model.forward(Xte_noise)
for i in range(5):
    best_model.plot_loss(Xte_noise[i],X_pred[i])