## Setting up the dataset 

In [20]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix, roc_auc_score, roc_curve

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader

from tqdm import tqdm
import wandb

In [22]:
""" 
Loading both abnormal and normal datasets.
"""

# Read both recordings tables and drop the trailing column of each
# (presumably the class label -- verify against the dataset description).
#
# header=None: the PTB ECG CSV exports consist of numeric rows only, with no
# column-name line. With pandas' default header inference the first heartbeat
# of every file is consumed as column labels and silently lost.
# NOTE(review): confirm the local copies under dataset/ are the headerless
# originals; if a header row was added by hand, remove header=None again.
normal_df = pd.read_csv(
    "dataset/ptbdb_normal.csv", header=None).iloc[:, :-1]
abnormal_df = pd.read_csv(
    "dataset/ptbdb_abnormal.csv", header=None).iloc[:, :-1]

In [23]:
"""
To fix the imbalance, trim the abnormal set.
"""

# Keep a fixed-size random subset of the abnormal recordings; the seed makes
# the selection reproducible between runs.
anomaly_df = abnormal_df.sample(
    n=2000,
    random_state=42,
)

In [24]:
""" 
Dataset converted to numpy
"""
# Convert both frames to NumPy arrays in a single unpacking assignment.
normal, anomaly = (frame.to_numpy() for frame in (normal_df, anomaly_df))

In [None]:
""" 
Dataset split
"""

# Only the normal recordings are split here: 15% of them are held out,
# shuffled with a fixed seed so the partition is reproducible.
X_train, X_test = train_test_split(
    normal,
    test_size=0.15,
    shuffle=True,
    random_state=42,
)

In [27]:
""" 
Custom dataset class for ECG Data 
"""

class ECGDataset(Dataset):
    """Map-style dataset that serves each row as an ``(input, target)`` pair.

    The target is the input itself, which is the pairing a reconstruction
    (autoencoder) objective trains on.

    Args:
        data: Array-like of rows; converted once to a ``float32`` tensor.
    """

    def __init__(self, data):
        # Convert up front so later indexing is a plain tensor lookup.
        self.data = torch.tensor(data, dtype=torch.float32)

    def __len__(self):
        """Return the number of rows held by the dataset."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return row ``idx`` twice: as the input and as its own target."""
        sample = self.data[idx]
        return sample, sample


In [28]:
""" 
Setting up dataloaders
"""

# Only the training loader shuffles; the two evaluation loaders keep the
# row order of their source arrays. All three use batches of 128.
train_loader = DataLoader(
    ECGDataset(X_train),
    batch_size=128,
    shuffle=True,
)
test_loader = DataLoader(
    ECGDataset(X_test),
    batch_size=128,
)
anomaly_loader = DataLoader(
    ECGDataset(anomaly),
    batch_size=128,
)

In [29]:
""" 
Defining the AutoEncoder model,
including both the encoder and the decoder.
"""


class Conv1DAutoEncoder(nn.Module):
    """1-D convolutional autoencoder that maps a signal back to its own length.

    Encoder: three ``Conv1d -> ReLU -> BatchNorm1d -> MaxPool1d(2)`` stages
    with channels ``1 -> 128 -> 128 -> latent_dim``. The convolutions keep
    the length (kernel 3, padding 1) and each pool halves it (rounding
    down), so the encoded length is ``input_dim // 8``.

    Decoder: three length-preserving ``ConvTranspose1d -> ReLU ->
    BatchNorm1d`` stages with channels ``latent_dim -> latent_dim -> 128 ->
    128``, followed by a flatten and a linear layer that projects back to
    ``input_dim`` values.

    Args:
        input_dim: Number of samples in one input signal.
        latent_dim: Number of channels produced by the last encoder stage.
    """

    @staticmethod
    def _down_stage(in_channels, out_channels):
        """Build one encoder stage: conv, activation, norm, halving pool."""
        return [
            nn.Conv1d(in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm1d(out_channels),
            nn.MaxPool1d(2),
        ]

    @staticmethod
    def _up_stage(in_channels, out_channels):
        """Build one decoder stage: transposed conv, activation, norm."""
        return [
            nn.ConvTranspose1d(
                in_channels, out_channels, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm1d(out_channels),
        ]

    def __init__(self, input_dim, latent_dim=32):
        super().__init__()

        # Stages are created in encoder-then-decoder order, and unpacked
        # into flat Sequentials so sub-module indices stay 0..11 and 0..10.
        self.encoder = nn.Sequential(
            *self._down_stage(1, 128),
            *self._down_stage(128, 128),
            *self._down_stage(128, latent_dim),
        )

        # Three MaxPool1d(2) layers shrink the length to input_dim // 8.
        pooled_len = input_dim // 8
        self.decoder = nn.Sequential(
            *self._up_stage(latent_dim, latent_dim),
            *self._up_stage(latent_dim, 128),
            *self._up_stage(128, 128),
            nn.Flatten(),
            nn.Linear(pooled_len * 128, input_dim),
        )

    def forward(self, x):
        """Reconstruct ``x``.

        Args:
            x: Batch of signals, expected shape ``(batch, input_dim)``.

        Returns:
            Tensor of shape ``(batch, input_dim)``.
        """
        # Conv1d wants a channel axis: (batch, length) -> (batch, 1, length).
        signal = x.unsqueeze(1)
        return self.decoder(self.encoder(signal))