In [0]:
#fetching the data from MIT-BIH 
!wget -r -N -c -np https://physionet.org/files/nstdb/1.0.0/
!wget -r -N -c -np https://physionet.org/files/mitdb/1.0.0/
!pip install wfdb

In [0]:
#imports here
import wfdb
import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
import math
from sklearn.preprocessing import MinMaxScaler, normalize
from statistics import variance as var
from operator import add

In [0]:
#reading the muscle artifact noise and spliting it into 650 parts each of length 1000 and normalizing it
#ma contains 650000 sample points
ma,f = wfdb.rdsamp('/content/physionet.org/files/nstdb/1.0.0/ma', sampto=650000)
ma=np.array(ma)
noise=[]
v=0
for i in range(649):
  y=[]
  for k in range(1000):
    y.append(ma[v+k][0])
  z=[]
  p=max(y)
  q=min(y)  
  for l in range(len(y)):
      t=(y[l]-q)/(p-q)
      z.append(t) 
  noise.append(z)
  v=v+1000
plt.plot(noise[1])

In [0]:
#reading data spliting and normalizing it........
data=[]
rec=['100','101','102','103','104','105','106','107','108','109','111','112','113','114','115','116','117','118','119','121','122','123','124','200','201','202','203','205','207','208','209','210','212','213','214','215','217','219','220','221','222','223','228','230','231','232','233','234']

for string in rec:
  record= wfdb.rdsamp('/content/physionet.org/files/mitdb/1.0.0/'+string, sampto=650000)
  ann = wfdb.rdann('/content/physionet.org/files/mitdb/1.0.0/'+string, 'atr', sampto=650000)
  I = record[0][:, 0]
  a=0
  e=1000
  for i in range(650):
    z=[]
    d=I[a:e]
    p=max(d)
    q=min(d)
    for l in range(len(d)):
      t=(d[l]-q)/(p-q)
      z.append(t)
    data.append(z)
    a+=1000
    e+=1000
#plotting the acquired ecg signal, mind that we are only taking lead I
plt.plot(data[1])
plt.xlabel('samples')
plt.ylabel('mV')
#dividing data in train and test test samples
train_data=[]
test_data=[]
for i in range(int(len(data)*0.8)):
  train_data.append(data[i])
for j in range(int(len(data)*0.8),len(data)):
  test_data.append(data[j])

In [0]:
fig_size = plt.rcParams["figure.figsize"]

# Prints: [8.0, 6.0]
print("Current size:", fig_size)
# Set figure width to 12 and height to 9
fig_size[0] = 10
fig_size[1] = 2
plt.rcParams["figure.figsize"] = fig_size
plt.plot(data[1000])
plt.xlabel('samples')
plt.ylabel('mV')

In [0]:
#noise_ma_data base
import random
n=[]
n_tr=[]
n_ts=[]
noise_ma_data=[]
for i in range(31200):
  l=random.randrange(0,200)
  n.append(noise[l])
  p=list(map(add,data[i],noise[l])) 
  p=np.array(p)
  noise_ma_data.append(p)
trainma_data=[]
testma_data=[]
for i in range(int(len(noise_ma_data)*0.8)):
  trainma_data.append(noise_ma_data[i])
  n_tr.append(n[i])
for j in range(int(len(noise_ma_data)*0.8),len(noise_ma_data)):
  testma_data.append(noise_ma_data[j])
  n_ts.append(n[j])

In [0]:
plt.plot(noise_ma_data[20])

In [0]:

#Metrics for future implementation
#to calulate snr, rmse, max_abs_val, normalized_cross_correlation
#https://github.com/scipy/scipy/issues/9097
def urms(x):
  y=[]
  for i in range(len(x)):
    y.append(x[i]**2)
  s=sum(y)
  return s
def signaltonoise(a,b):
  a=urms(a)
  b=urms(b)
  snr=a/b
  return np.log(snr)*10
#intellipaat.com/community/1269/is-there-a-library-function-for-root-mean-square-error-rmse-in-python
def rmse(predictions, targets): 
    return np.sqrt(((np.array(predictions) - np.array(targets)) ** 2).mean())
#https://stackoverflow.com/questions/44864633/pythonic-way-to-find-maximum-absolute-value-of-list
def max_abs_val(samp_list):
  max_abs_value = samp_list[0]
  for num in samp_list:
      if abs(num) > max_abs_value:
          max_abs_value = abs(num)    
  return max_abs_value
#www.researchgate.net/post/How_can_one_calculate_normalized_cross_correlation_between_two_arrays
def normalized_cross_corr(x,y):
  x,y=np.array(x),np.array(y)
  N=len(x)
  a=(np.sqrt(var(x)*var(y)))
  b=(x - x.mean())* (y - y.mean())
  ncc = (1/N)*(sum(b))
  return ncc/a
def convert_numpy_tensor(arr):
  arr=np.array(arr)
  ten=torch.from_numpy(arr)
  return ten    

In [0]:
#AutoEncoder 
# define the NN architecture
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.fc1 = nn.Linear(1000,500)
        self.fc2 = nn.Linear(500,250)
        self.fc3 = nn.Linear(250,100)
        self.fc4=nn.Linear(100,250)
        self.fc5=nn.Linear(250,500)
        self.fc6=nn.Linear(500,1000)
        self.dropout = nn.Dropout(0.2)
    def encoder(self, x):
        x=x.squeeze()
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        x=F.relu(x)
        return x
    def decoder(self,x):
        x = F.relu(self.fc4(x))
        x = F.relu(self.fc5(x))
        x = self.fc6(x)
        x=x.view(-1)
        return x
# initialize the NN
model = Net()
model.cuda()
print(model)
model.double()

In [0]:
# specify loss function
criterion = nn.MSELoss()
optimizer = torch.optim.SGD(model.parameters(), lr=0.09,momentum=0.2)
# number of epochs to train the model
n_epochs = 100
# initialize tracker for minimum validation loss
valid_loss_min = np.Inf # set initial "min" to infinity

for epoch in range(n_epochs):
    # monitor training loss
    train_loss = 0.0
    valid_loss = 0.0
    model.train() # prep model for training
    for i in range(int(len(trainma_data))):
        optimizer.zero_grad()
        # forward pass: compute predicted outputs by passing inputs to the model
        b=convert_numpy_tensor(trainma_data[i])
        b=b.cuda()
        compressed = model.encoder(b)
        output=model.decoder(compressed)
        g=convert_numpy_tensor(train_data[i])
        g=g.cuda()
        # calculate the loss
        loss = criterion(output,g.squeeze())
        # backward pass: compute gradient of the loss with respect to model parameters
        loss.backward()
        # perform a single optimization step (parameter update)
        optimizer.step()
        # update running training loss
        train_loss += loss.item()*b.size(0)
    model.eval() # prep model for evaluation
    for j in range(int(len(testma_data))):
        # forward pass: compute predicted outputs by passing inputs to the model
        c=torch.from_numpy(testma_data[j])
        c=c.cuda()
        compressed = model.encoder(c)
        output=model.decoder(compressed)
        h=convert_numpy_tensor(test_data[j])
        h=h.cuda()
        # calculate the loss
        loss = criterion(output,h.squeeze())
        # update running validation loss 
        valid_loss += loss.item()*c.size(0)
        if (j==len(test_data)-1) and (epoch==n_epochs):
            plt.plot(testma_data[j])
            plt.plot(output.detach().numpy())
        
    # print training/validation statistics 
    # calculate average loss over an epoch
    train_loss = train_loss/int(len(trainma_data)*0.3)
    valid_loss = valid_loss/int(len(testma_data)*0.3)
    
    print('Epoch: {} \tTraining Loss: {:.6f} \tValidation Loss: {:.6f}'.format(
        epoch+1, 
        train_loss,
        valid_loss
        ))

In [0]:
#check for viz
c=torch.from_numpy(noise_ma_data[28000])
c=c.cuda()
compressed = model.encoder(c)
output=model.decoder(compressed)
output=output.detach().cpu().numpy()
plt.plot(output)
plt.plot(noise_ma_data[28000])
plt.plot(data[28000])

In [0]:
#testing
snr_original=0
snr_noise_added=0
snr_denoised=0
ncc=0
rootms=0
for i in range(20000,30000):
  c=torch.from_numpy(noise_ma_data[i])
  c=c.cuda()
  compressed = model.encoder(c)
  output=model.decoder(compressed)
  output=output.detach().cpu().numpy()
  d=data[i]
  snr_original=snr_original+signaltonoise(d,n[i])
  snr_noise_added=snr_noise_added+signaltonoise(noise_ma_data[i],n[i])
  snr_denoised=snr_denoised+signaltonoise(output,output-d)
  ncc=ncc+normalized_cross_corr(d,output)
  rootms=rootms+rmse(d,output)
snr_original=snr_original/10000
snr_noise_added=snr_noise_added/10000
snr_denoised=snr_denoised/10000
rootms=rootms/10000
ncc=ncc/10000
print('snr of original signal {} db'.format(snr_original))
print('snr of noise_added signal {} db'.format(snr_noise_added))
print('snr of denoised signal {} db'.format(snr_denoised))
print('ncc of denoised signal {}'.format(ncc))
print('root mean square error {}'.format(rootms))

In [0]:
#store the saved model
torch.save(model.state_dict(), 'checkpointma_10X.pt')

In [0]:
from google.colab import files
files.download('checkpointma_10X.pt')

In [0]:
#loading the model..........
model.load_state_dict(torch.load('checkpoint_10X_2.pt'))