In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
from sklearn.preprocessing import StandardScaler
import pandas as pd
from tqdm import tqdm
from sklearn.metrics.pairwise import cosine_similarity
import numpy as np

# Read the train/test CSV splits. The files have no header row, so columns
# are addressed by their integer position.
train_df = pd.read_csv('./datasets/mitbih_dataset/mitbih_train.csv', header=None)
test_df = pd.read_csv('./datasets/mitbih_dataset/mitbih_test.csv', header=None)

# Names of the original class labels, keyed by label id
classes = {0: 'N', 1: 'S', 2: 'V', 3: 'F', 4: 'Q'}

# Binary target derived from column 187: label 0 stays 0, anything else becomes 1
train_df['binary_label'] = (train_df[187] != 0).astype('int64')
test_df['binary_label'] = (test_df[187] != 0).astype('int64')

# Standardize the first 187 columns (the model inputs). The scaler statistics
# are fitted on the training split only and re-used for the test split.
scaler = StandardScaler()
X_train = scaler.fit_transform(train_df.iloc[:, :187].values)
X_test = scaler.transform(test_df.iloc[:, :187].values)

# Training split as tensors: inputs, original label (column 187), binary label
X_train = torch.tensor(X_train, dtype=torch.float32)
y_train = torch.tensor(train_df.iloc[:, 187].values, dtype=torch.long)
y_train_binary = torch.tensor(train_df['binary_label'].values, dtype=torch.long)

# Test split as tensors, same layout as the training split
X_test = torch.tensor(X_test, dtype=torch.float32)
y_test = torch.tensor(test_df.iloc[:, 187].values, dtype=torch.long)
y_test_binary = torch.tensor(test_df['binary_label'].values, dtype=torch.long)

# Each dataset item is (inputs, binary label, original label)
batch_size = 1024

train_dataset = TensorDataset(X_train, y_train_binary, y_train)
test_dataset = TensorDataset(X_test, y_test_binary, y_test)

# Only the training loader shuffles; the test loader keeps file order
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)



In [None]:

# Define BasicBlock1D
class BasicBlock1D(nn.Module):
    """Two-convolution residual block built from 1-D layers.

    The main path is conv(k=3) -> batch norm -> ReLU -> conv(k=3) -> batch norm.
    The result is added to a shortcut branch and passed through a final ReLU.

    Args:
        inplanes: number of input channels.
        planes: number of output channels of both convolutions.
        stride: stride of the first convolution (the second always uses 1).
        downsample: optional module applied to the input to build the shortcut
            branch; when ``None`` the input itself is used as the shortcut.
    """

    # Output channels = planes * expansion; read by the enclosing network builder
    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        super().__init__()
        # Main path, first half (this convolution carries the block's stride)
        self.conv1 = nn.Conv1d(inplanes, planes, kernel_size=3, stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm1d(planes)
        self.relu = nn.ReLU(inplace=True)
        # Main path, second half (always stride 1)
        self.conv2 = nn.Conv1d(planes, planes, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm1d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Apply the block to ``x`` and return the activated residual sum."""
        # Shortcut branch: the raw input unless a downsample module was supplied
        shortcut = x if self.downsample is None else self.downsample(x)

        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        return self.relu(residual + shortcut)

# Define ResNet1D
class ResNet1D(nn.Module):
    """ResNet-style network assembled from 1-D layers.

    Layout: a stem (conv k=7/stride 2 -> batch norm -> ReLU -> max pool),
    four stages of residual blocks with 64, 128, 256 and 512 planes, an
    adaptive average pool to length 1, and a linear layer.

    Args:
        block: residual block class; must expose an ``expansion`` attribute and
            accept ``(inplanes, planes, stride, downsample)``.
        layers: sequence of four ints, the number of blocks in each stage.
        num_classes: size of the final linear layer's output.
    """

    def __init__(self, block, layers, num_classes=2):
        super().__init__()
        # Channel count entering the next stage; updated by _make_layer
        self.inplanes = 64

        # Stem: expects a single input channel
        self.conv1 = nn.Conv1d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
        self.bn1 = nn.BatchNorm1d(64)
        self.relu = nn.ReLU(inplace=True)
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)

        # Residual stages; every stage after the first uses stride 2
        self.layer1 = self._make_layer(block, 64, layers[0], stride=1)
        self.layer2 = self._make_layer(block, 128, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 256, layers[2], stride=2)
        self.layer4 = self._make_layer(block, 512, layers[3], stride=2)

        # Head
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        self.fc = nn.Linear(512 * block.expansion, num_classes)

        # Re-initialise convolutions (Kaiming normal) and batch norms (weight 1, bias 0)
        for module in self.modules():
            if isinstance(module, nn.Conv1d):
                nn.init.kaiming_normal_(module.weight, mode='fan_out', nonlinearity='relu')
            elif isinstance(module, nn.BatchNorm1d):
                nn.init.constant_(module.weight, 1)
                nn.init.constant_(module.bias, 0)

    def _make_layer(self, block, planes, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks and advance ``self.inplanes``.

        Only the first block receives ``stride``. It also receives a 1x1
        conv + batch norm shortcut whenever the stride or the channel count
        changes across the stage boundary.
        """
        out_channels = planes * block.expansion
        needs_projection = stride != 1 or self.inplanes != out_channels

        shortcut = None
        if needs_projection:
            shortcut = nn.Sequential(
                nn.Conv1d(self.inplanes, out_channels,
                          kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm1d(out_channels),
            )

        stage = [block(self.inplanes, planes, stride, shortcut)]
        self.inplanes = out_channels
        # Remaining blocks keep the channel count and use the block's default stride
        for _ in range(blocks - 1):
            stage.append(block(self.inplanes, planes))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Return the linear head's output for a batch of single-channel inputs."""
        h = self.maxpool(self.relu(self.bn1(self.conv1(x))))

        for stage in (self.layer1, self.layer2, self.layer3, self.layer4):
            h = stage(h)

        pooled = torch.flatten(self.avgpool(h), 1)
        return self.fc(pooled)

# Define ResNet18_1D function
def ResNet18_1D():
    """Build a ``ResNet1D`` from ``BasicBlock1D`` with two blocks in each of its four stages.

    The network is created with ``ResNet1D``'s default number of output classes.
    """
    blocks_per_stage = [2, 2, 2, 2]
    return ResNet1D(BasicBlock1D, blocks_per_stage)

# Hyperparameters
lr = 0.001
num_epochs = 1

# Train on the GPU when one is available, otherwise on the CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = ResNet18_1D().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr)

# Training loop: one optimizer step per mini-batch. Only the binary labels
# supervise the model; the original labels in each batch are not used here.
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    with tqdm(total=len(train_loader), desc=f'Epoch {epoch+1}/{num_epochs}', unit='batch') as pbar:
        for batches_seen, (inputs, binary_labels, original_labels) in enumerate(train_loader, start=1):
            # Insert a channel axis at dim 1 before moving the batch to the device
            inputs = inputs.unsqueeze(1).to(device)
            binary_labels = binary_labels.to(device)

            optimizer.zero_grad()
            loss = criterion(model(inputs), binary_labels)
            loss.backward()
            optimizer.step()

            # Progress bar shows the running mean loss over the batches seen so far
            running_loss += loss.item()
            pbar.set_postfix(loss=running_loss / batches_seen)
            pbar.update(1)

    epoch_loss = running_loss / len(train_loader)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss}")



Epoch 1/1: 100%|██████████| 86/86 [00:15<00:00,  5.43batch/s, loss=0.0958]

Epoch 1/1, Loss: 0.09578382444762906





In [None]:
# Function to extract features
def extract_features(model, data_loader):
    """Run ``model`` over every batch of ``data_loader`` and collect its outputs.

    The model is switched to eval mode (and left in eval mode afterwards) and
    gradients are disabled while the batches are processed.

    Args:
        model: ``torch.nn.Module`` to evaluate. Batches are moved to the device
            of the model's first parameter (CPU if it has no parameters), so
            the function does not depend on any notebook-level ``device``
            variable.
        data_loader: iterable yielding ``(inputs, binary_labels,
            original_labels)`` batches. A channel axis is inserted at dim 1 of
            ``inputs`` before the model is called.

    Returns:
        Tuple ``(features, labels)``: the model outputs of all batches
        concatenated along dim 0 (on the model's device), and the
        ``original_labels`` of all batches concatenated along dim 0.

    Raises:
        Whatever ``torch.cat`` raises for an empty list when ``data_loader``
        yields no batches.
    """
    # NOTE(review): the "features" are whatever ``model(inputs)`` returns. For a
    # network that ends in a classification layer these are its final outputs,
    # not an intermediate embedding -- confirm that this is intended.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    features = []
    labels_list = []
    model.eval()
    with torch.no_grad():
        for inputs, _binary_labels, original_labels in data_loader:
            inputs = inputs.unsqueeze(1).to(target_device)
            features.append(model(inputs))
            labels_list.append(original_labels)
    features = torch.cat(features)
    labels_list = torch.cat(labels_list)
    return features, labels_list

# Run the trained model over the test loader to get one output row per sample
test_features, test_labels = extract_features(model, test_loader)

# Pairwise cosine similarity between all test samples (computed on a CPU copy).
# Despite its name, `distances` holds similarities: larger means closer.
test_features_cpu = test_features.detach().cpu()
distances = cosine_similarity(test_features_cpu, test_features_cpu)

# Put -inf on the diagonal so a sample is never selected as its own neighbour
np.fill_diagonal(distances, -np.inf)

# Functions for top-k accuracy
def find_max_indices(arr, k):
    """Return the indices of the ``k`` largest entries of ``arr``, largest first.

    Args:
        arr: one-dimensional array-like of scores (converted with ``np.asarray``).
        k: number of indices to return; must satisfy ``0 <= k <= len(arr)``.

    Returns:
        Integer ndarray of length ``k`` ordered by descending value. The
        relative order of equal values is not specified.

    Raises:
        ValueError: if ``k`` is negative or greater than ``len(arr)``.
    """
    arr = np.asarray(arr)
    if k > len(arr):
        raise ValueError("k cannot be greater than the size of the array")
    if k < 0:
        raise ValueError("k cannot be negative")
    if k == 0:
        # Without this guard the slice [-0:] below would select every index
        return np.empty(0, dtype=np.intp)
    # Partial selection of the k largest entries, then order just those k
    idx = np.argpartition(arr, -k)[-k:]
    sorted_idx = np.argsort(arr[idx])[::-1]
    return idx[sorted_idx]

def compute_top_k_accuracy(distances, all_ids, k):
    """Fraction of rows whose own id appears among the ids of their top-``k`` entries.

    Args:
        distances: iterable of score rows with a ``len``; for each row the
            ``k`` highest-scoring positions are selected with
            ``find_max_indices`` (so larger scores rank first).
        all_ids: indexable collection whose elements support ``.item()``;
            ``all_ids[i]`` is the id belonging to row/column ``i``.
        k: number of top-scoring positions considered per row.

    Returns:
        Number of matching rows divided by ``len(distances)``.
    """
    hits = 0
    for row_pos, scores in enumerate(distances):
        neighbour_ids = [all_ids[pos].item() for pos in find_max_indices(scores, k)]
        if all_ids[row_pos].item() in neighbour_ids:
            hits += 1
    return hits / len(distances)

# Evaluate top-k accuracy for k = 1..5 against the original class labels,
# printing each value and keeping all of them in `real_top_accuracies`
max_k = 5
real_top_accuracies = []
for k in range(1, max_k + 1):
    accuracy = compute_top_k_accuracy(distances, test_labels, k)
    print(f"Top-{k} : {accuracy:.4f}")
    real_top_accuracies.append(accuracy)



Top-1 : 0.8388
Top-2 : 0.9239
Top-3 : 0.9467
Top-4 : 0.9598

Top-k Accuracies:
Top-1 Accuracy: 0.8388
Top-2 Accuracy: 0.9239
Top-3 Accuracy: 0.9467
Top-4 Accuracy: 0.9598
