In [2]:
## P1 - Parte Computacional - PSI3471
## Aluno: Leonardo Isao Komura - NUSP: 11261656

In [3]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import math
from scipy.fft import dct
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

In [4]:
# Leitura dos arquivos
ecg_treino = pd.read_csv("ecg_treino.csv")
rotulos_treino = pd.read_csv("rotulos_treino.csv")
n_rotulos_treino = pd.read_csv("n_rotulos_treino.csv")  

ecg_teste = pd.read_csv("ecg_teste.csv")
rotulos_teste = pd.read_csv("rotulos_teste.csv")
n_rotulos_teste = pd.read_csv("n_rotulos_teste.csv") 

In [5]:
# Transformação em formato numpy
treino = ecg_treino.to_numpy()
r_treino = rotulos_treino.to_numpy()
n_treino = n_rotulos_treino.to_numpy()

teste = ecg_teste.to_numpy()
r_teste = rotulos_teste.to_numpy()
n_teste = n_rotulos_teste.to_numpy()

In [6]:
# Deletando os "nomes" dos pacientes
treino = np.delete(treino, 0, axis=1)
r_treino = np.delete(r_treino, 0, axis=1)
n_treino = np.delete(n_treino, 0, axis=1)

teste = np.delete(teste, 0, axis=1)
r_teste = np.delete(r_teste, 0, axis=1)
n_teste = np.delete(n_teste, 0, axis=1)

In [59]:
# Cálculo da média do "comprimento" dos 5 primeiros batimentos de todos pacientes
nn = 0.0
for i in range(0, len(n_treino)):
    for j in range(5):
        nn = nn + abs(n_treino[i,j] - n_treino[i, j+1])
nn_average = math.ceil(nn/(5*len(n_treino)))
print("Média do comprimento dos 5 primeiros batimentos de todos pacientes: ", nn_average)

Média do comprimento dos 5 primeiros batimentos de todos pacientes:  294


In [62]:
# Amostrando os primeiros nn_average pontos de cada paciente
amostra_treino = np.zeros((len(treino), nn_average))
amostra_teste = np.zeros((len(teste), nn_average))
for i in range(len(treino)):
    for j in range(nn_average):
        amostra_treino[i, j] = treino[i, j]

for i in range(len(teste)):
    for j in range(nn_average):
        amostra_teste[i, j] = teste[i, j]

In [71]:
# Transformada de Fourier da amostra
ft_treino = dct(amostra_treino, n = 20, type = 1)
ft_teste = dct(amostra_teste, n = 20, type = 1)

# Adicionar coluna de 1's (não é para bias)
coluna = np.ones((len(n_treino), 1))
ft_treino = np.append(coluna, ft_treino, axis = 1)
coluna = np.ones((len(n_teste), 1))
ft_teste = np.append(coluna, ft_teste, axis = 1)

In [76]:
# Cálculo da média do "comprimento" dos 5 primeiros batimentos de cada paciente e adicionando na primeira coluna
for i in range(0, len(n_treino)):
    nn = 0.0
    for j in range(5):
        nn = nn + abs(n_treino[i,j] - n_treino[i, j+1])
    nn_average = math.ceil(nn/5)
    ft_treino[i,0] = nn_average
    
for i in range(0, len(n_teste)):
    nn = 0.0
    for j in range(5):
        nn = nn + abs(n_teste[i,j] - n_teste[i, j+1])
    nn_average = math.ceil(nn/5)
    ft_teste[i,0] = nn_average

In [78]:
# Retornando para dataframe
df_ft_treino = pd.DataFrame(ft_treino)
df_ft_teste  = pd.DataFrame(ft_teste)

In [None]:
class MLP(nn.Module):
    def __init__(self, input_size, n_hidden, output_size):
        super(MLP, self).__init__()
        self.input_size = input_size
        self.network = nn.Sequential(
            nn.Linear(input_size, n_hidden), 
            nn.ELU(),
            nn.Linear(n_hidden, n_hidden-10), 
            nn.ELU(),
            nn.Linear(n_hidden-10, n_hidden-10), 
            nn.ELU(),
            nn.Linear(n_hidden-10, output_size), 
            nn.LogSoftmax(dim=1)
        )

    def forward(self, x):
        x = x.view(-1, self.input_size)
        return self.network(x)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
def train(epoch, model):
    # Coloca o modelo em modo de treinamento
    model.train()
    
    # Loop sobre os mini-batches, fornecidos pelo DataLoader train_loader
    for batch_idx, (data, target) in enumerate(ft_teste):      
        # Para mandar os dados para o device (GPU ou CPU definido anteriormente), usamos o método .to(device)
        data, target = data.to(device), target.to(device)
        
        # Ajuste de dimensões
        data = data.view(-1, 1, 28, 28)

        # Necessário no PyTorch, para limpar o cache de gradientes acumulados
        optimizer.zero_grad()
        
        # Cálculo da saída
        output = model(data)
        
        # nll_loss é a função custo da entropia cruzada
        loss = F.cross_entropy(output, target)
        
        # cálculo dos gradientes
        loss.backward()
        
        # atualização dos parâmetros do modelo
        optimizer.step()
        
        # Exibe o status do treinamento
        if batch_idx % 100 == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
            
def test(model):
    # Coloca o modelo em modo de teste
    model.eval()
    
    # Variáveis usadas para contabilizar o valor da função custo e número de acertos
    test_loss = 0
    correct = 0
    
    # Loop sobre os mini-batches, fornecidos pelo DataLoader test_loader
    for data, target in ft_teste:
        
        # Para mandar os dados para o device (GPU ou CPU definido anteriormente), usamos o método .to(device)     
        data, target = data.to(device), target.to(device)
        
        # Ajuste de dimensões
        data = data.view(-1, 1, 28, 28)
        
        # Cálculo da saída
        output = model(data)

        # Valor da função custo
        test_loss += F.cross_entropy(output, target, reduction='sum').item() # sum up batch loss                                                               

        # Cálculo do número de acertos
        pred = output.data.max(1, keepdim=True)[1] # get the index of the max log-probability                                                                 
        correct += pred.eq(target.data.view_as(pred)).cpu().sum().item()

    # Mostra o desempenho obtido no teste    
    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    accuracy_list.append(accuracy)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        accuracy))