<a href="https://colab.research.google.com/github/Felipex99/Rede_Neural_ECG/blob/main/Algoritmo_ECG.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

1-Execute primeiramente o código abaixo para instalar a biblioteca WFDB e definir a o diretório raiz do colab

In [None]:
import os
data_path = '/content/'
os.chdir(data_path)
!rm -rf ./*
! pip install wfdb
!ls

2-Depois execute a célula abaixo para baixar todos os dados dos eletrocardiogramas presentes no repositório do site da https://physionet.org/


In [None]:
! wget -r -N -c -np https://physionet.org/files/ludb/1.0.1/

3-Execute esta célula para ver os resultados da rede neural

In [None]:
%matplotlib inline
import torch
from torch import nn,optim
import os
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, Dataset
import wfdb
import numpy as np
import pandas as pd
from sklearn import metrics
data_path = '/content/'
os.chdir(data_path)
# definindo o caminho para se trabalhar os dados do ECG
data_path = '/content/physionet.org/files/ludb/1.0.1/data'
os.chdir(data_path)

# variáveis para alterar os padrões da rede neural
args = {
    'batch_size':6,
    'num_workers':2,
    'num_classes':12,
    'lr':1e-3,
    'weight_decay':5e-4,
    'num_epochs':20
}

# criando a classe ECG para extrair os dados
class ECG(Dataset):
  def __init__(self,data_path):
    self.record_names = []
    self.rotulo = []
    self.dados = []
    # procurando os dados de formato wfdb na pasta
    for arq in os.listdir(data_path):
      if arq.endswith('.dat'):
        self.record_names.append(arq[:-4])
        #print('({})Exame: {}'.format(len(self.record_names),self.record_names))
        # ler no arquivo se possui alguma nota de alteração no ECG
        nota_arq = os.path.join(data_path,arq[:-4]+'.hea')
        with open(nota_arq,'r') as linha:
          diagnostico = linha.read()
          if 'Sinus rhythm' in diagnostico:
            self.rotulo.append(0) # exame normal
          else:
             self.rotulo.append(1)  # anormalidade encontrada no exame
        # ler o arquivo do registro
        record_file = os.path.join(data_path,arq[:-4])
        signal,meta_data = wfdb.rdsamp(record_file)
        self.dados.append(signal)
        #print('Rotulo{}: '.format(self.rotulo))
  def __getitem__(self,idx):
    sample = self.dados[idx][1]
    rotulo = self.rotulo[idx]
    # converter pra tensor
    sample = torch.from_numpy(sample.astype(np.float32))
    rotulo = torch.tensor([rotulo],dtype = torch.float32)
    #rotulo = torch.from_numpy(rotulo.astype(np.float32))

    return sample, rotulo

  def __len__(self):
    return len(self.record_names)

dataset = ECG(data_path)
train_size = int(0.8*len(dataset))
test_size = len(dataset)-train_size

train_dataset,test_dataset = torch.utils.data.random_split(dataset,[train_size,test_size])



train_loader = DataLoader(train_dataset,
                          batch_size = args['batch_size'],
                          shuffle = True,
                          num_workers = args['num_workers'])

test_loader = DataLoader(test_dataset,
                         batch_size = args['batch_size'],
                         shuffle = True,
                         num_workers = args['num_workers'])

class RedeNeural(nn.Module):
    def __init__(self,entrada,escondida,saida):
        super(RedeNeural,self).__init__()
        self.escondida = nn.Sequential(
            nn.Linear(entrada,escondida),
            nn.ReLU(),
            nn.Linear(escondida,escondida),
            nn.ReLU(),
            )
        self.saida = nn.Linear(escondida,saida)
        self.sigmoid = nn.Sigmoid()  # Para prever valores entre 0 e 1
    def forward(self,X):
        X = X.view(X.size(0),-1)
        hidden = self.escondida(X)
        out = self.sigmoid(self.saida(hidden))
        return out

entrada = len(dataset[0][0]) #quantidades de atributos  12 derivações
escondida = 16
saida = 1 # variaveis que serao preditas 0 = Normal, 1 = Anormal
rede = RedeNeural(entrada,escondida,saida)
criterion = nn.BCELoss()
optimizer = optim.Adam(rede.parameters(),lr = args['lr'],weight_decay = args['weight_decay'])

########
def train(train_loader,RedeNeural,epoch):
      rede.train()
      epoch_loss = []
      for batch in train_loader:
          dado,rotulo = batch
          optimizer.zero_grad()
          pred = rede(dado)

          loss = criterion(pred,rotulo)
          epoch_loss.append(loss.data)

          loss.backward()
          optimizer.step()
      epoch_loss = np.asarray(epoch_loss)
      print('Treino:: Epoca %d,Loss:%.4f +/- %.4f'%(epoch,epoch_loss.mean(),epoch_loss.std()))
      return epoch_loss.mean()
#########
def test(test_loader,RedeNeural,epoch):
      rede.eval()
      rotulos_reais=[]
      rotulos_previstos = []
      with torch.no_grad():
        epoch_loss = []
        for batch in test_loader:
            dado,rotulo = batch
            # Forward
            pred = rede(dado)
            prev = torch.round(pred)
            rotulos_reais.extend(rotulo.numpy())
            rotulos_previstos.extend(prev.numpy())
            loss = criterion(pred,rotulo)
            epoch_loss.append(loss.data)
        acuracia = metrics.accuracy_score(rotulos_reais,rotulos_previstos)
        epoch_loss = np.asarray(epoch_loss)
        print('Acurracia : {:.2f}%, Real{} = PrevistoIA={}'.format(acuracia*100, rotulos_reais[0],rotulos_previstos[0]))
        print('Teste:: Epoca %d,Loss:%.4f +/- %.4f'%(epoch,epoch_loss.mean(),epoch_loss.std()))
        return epoch_loss.mean()

train_losses, test_losses = [],[]
for epoch in range(args['num_epochs']):
  #train
  train_losses.append(train(train_loader,rede,epoch))
  #validate
  test_losses.append(test(test_loader,rede,epoch))
  print('-----------------------------')
plt.figure(figsize=(5,5))
plt.plot(train_losses,label = 'Train')
plt.plot(test_losses,label = 'Test',linewidth = 3, alpha = 0.5)
plt.xlabel('Épocas',fontsize = 16)
plt.ylabel('Perda',fontsize = 16)
plt.title('Convergência',fontsize = 16)
plt.legend()
plt.show()