In [None]:
# TODO:
# DICE loss
# sprawdzenie na innym datasecie
# przeniesienie do plików py
# uprzątnięcie repo
# dokumentacja
# interfejs
# Pasek do trenowania

In [None]:
!pip install wfdb

Collecting wfdb
  Downloading wfdb-4.1.2-py3-none-any.whl.metadata (4.3 kB)
Downloading wfdb-4.1.2-py3-none-any.whl (159 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m160.0/160.0 kB[0m [31m3.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: wfdb
Successfully installed wfdb-4.1.2


In [None]:
import os
import numpy as np
import wfdb
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import accuracy_score

In [None]:
%cd /content/drive/MyDrive/Stopień_2/Semestr_2/DNN
# Define parameters
data_dir = 'files'  # Directory containing the records
sequence_length = 3000  # Sequence length (in samples)
# fs = 360  # Sampling frequency of MIT-BIH (use 360 Hz)
batch_size = 32
num_epochs = 10
learning_rate = 0.001

/content/drive/MyDrive/Stopień_2/Semestr_2/DNN


In [None]:
# Custom Dataset for ECG data
class ECGDataset(Dataset):
    def __init__(self, signals, labels):
        self.signals = signals
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        signal = self.signals[idx]
        label = self.labels[idx]
        return torch.tensor(signal, dtype=torch.float32), torch.tensor(label, dtype=torch.uint8)

In [None]:
def create_label(ann, signal_len):
    '''Create a label containing ones where AFIB is present and zeros elsewhere'''
    # create label placeholder
    label = np.zeros(signal_len, dtype=np.uint8)
    aux_notes = ann.aux_note
    # add signal length to samples to make it easier to iterate
    samples = np.append(ann.sample, signal_len)
    # add end to aux_notes to make it easier to iterate
    aux_notes.append('END')

    start = 0
    is_afib = False
    for i, note in enumerate(aux_notes):
        if note == '(AFIB':
            is_afib = True
            start = samples[i]
        else:
            end = samples[i]
            if is_afib:
                label[start:end] = 1
    return label

In [None]:
# Feature extraction and data loading
def load_data(record_names, af_symbols=['(AFIB'], segment_length=sequence_length):
    X = []
    y = []

    for record_name in record_names:
        # Load the ECG signal and annotation
        print(record_name)
        record_path = os.path.join(data_dir, record_name)
        signal, fields = wfdb.rdsamp(record_path)
        annotation = wfdb.rdann(record_path, 'atr')

        # Extract the first channel (ECG signal)
        ecg_signal = signal[:, 0]
        ecg_label = create_label(annotation, len(ecg_signal))

        # Segment the ECG signal into fixed-length windows
        num_segments = len(ecg_signal) // segment_length
        for i in range(num_segments):
            segment_start = i * segment_length
            segment_end = (i + 1) * segment_length
            segment = ecg_signal[segment_start:segment_end]
            label = ecg_label[segment_start:segment_end]
            X.append(segment)
            y.append(label)

    return np.array(X), np.array(y)

In [None]:
# Load data
excluded_records = ['00735', '03665']
db_path = './files/'
record_names = np.loadtxt(db_path + 'RECORDS', dtype=str)
record_names = [name for name in record_names if name not in excluded_records]
X, y = load_data(record_names)

04015
04043
04048
04126
04746
04908
04936
05091
05121
05261
06426
06453
06995
07162
07859
07879
07910
08215
08219
08378
08405
08434
08455


In [None]:
# Scale the data
scaler = StandardScaler()
X = scaler.fit_transform(X.reshape(-1, sequence_length)).reshape(-1, 1, sequence_length)

In [None]:
# Split data
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

In [None]:
# Create DataLoaders
train_dataset = ECGDataset(X_train, y_train)
test_dataset = ECGDataset(X_test, y_test)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [None]:
class UNet1D(nn.Module):
    def __init__(self, in_channels=1, out_channels=1, init_features=32):
        super(UNet1D, self).__init__()
        features = init_features

        # Encoder
        self.encoder1 = UNet1D._block(in_channels, features, name="enc1")
        self.pool1 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.encoder2 = UNet1D._block(features, features * 2, name="enc2")
        self.pool2 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.encoder3 = UNet1D._block(features * 2, features * 4, name="enc3")
        self.pool3 = nn.MaxPool1d(kernel_size=2, stride=2)
        self.encoder4 = UNet1D._block(features * 4, features * 8, name="enc4")
        self.pool4 = nn.MaxPool1d(kernel_size=2, stride=2)

        # Bottleneck
        self.bottleneck = UNet1D._block(features * 8, features * 16, name="bottleneck")

        # Decoder
        self.upconv4 = nn.ConvTranspose1d(features * 16, features * 8, kernel_size=2, stride=2)
        self.decoder4 = UNet1D._block(features * 16, features * 8, name="dec4")
        self.upconv3 = nn.ConvTranspose1d(features * 8, features * 4, kernel_size=2, stride=2)
        self.decoder3 = UNet1D._block(features * 8, features * 4, name="dec3")
        self.upconv2 = nn.ConvTranspose1d(features * 4, features * 2, kernel_size=2, stride=2)
        self.decoder2 = UNet1D._block(features * 4, features * 2, name="dec2")
        self.upconv1 = nn.ConvTranspose1d(features * 2, features, kernel_size=2, stride=2)
        self.decoder1 = UNet1D._block(features * 2, features, name="dec1")

        # Output layer
        self.conv = nn.Conv1d(features, out_channels, kernel_size=1)

    def forward(self, x):
        # Encoder
        enc1 = self.encoder1(x)
        enc2 = self.encoder2(self.pool1(enc1))
        enc3 = self.encoder3(self.pool2(enc2))
        enc4 = self.encoder4(self.pool3(enc3))

        # Bottleneck
        bottleneck = self.bottleneck(self.pool4(enc4))

        # Decoder
        dec4 = self.upconv4(bottleneck)
        if dec4.size(-1) != enc4.size(-1):
            enc4 = enc4[:, :, :dec4.size(-1)]
        dec4 = torch.cat((dec4, enc4), dim=1)
        dec4 = self.decoder4(dec4)

        dec3 = self.upconv3(dec4)
        if dec3.size(-1) != enc3.size(-1):
            enc3 = enc3[:, :, :dec3.size(-1)]
        dec3 = torch.cat((dec3, enc3), dim=1)
        dec3 = self.decoder3(dec3)

        dec2 = self.upconv2(dec3)
        if dec2.size(-1) != enc2.size(-1):
            enc2 = enc2[:, :, :dec2.size(-1)]
        dec2 = torch.cat((dec2, enc2), dim=1)
        dec2 = self.decoder2(dec2)

        dec1 = self.upconv1(dec2)
        if dec1.size(-1) != enc1.size(-1):
            enc1 = enc1[:, :, :dec1.size(-1)]
        dec1 = torch.cat((dec1, enc1), dim=1)
        dec1 = self.decoder1(dec1)

        # Output layer
        output = self.conv(dec1)

        return torch.sigmoid(output)

    @staticmethod
    def _block(in_channels, features, name):
        return nn.Sequential(
            nn.Conv1d(in_channels, features, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv1d(features, features, kernel_size=3, padding=1),
            nn.ReLU(inplace=True)
        )

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = UNet1D(in_channels=1, out_channels=1).to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)

In [None]:
print('Training')
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for i, (signals, labels) in enumerate(train_loader):
        signals, labels = signals.to(device), labels.to(device).float()  # Ensure float labels
        labels = labels.unsqueeze(1)
        # Forward pass
        outputs = model(signals)

        # Ensure output and labels match in shape

        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f"Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss / len(train_loader):.4f}")

Training
Epoch [1/10], Loss: 0.6668
Epoch [2/10], Loss: 0.6075
Epoch [3/10], Loss: 0.3976
Epoch [4/10], Loss: 0.2929
Epoch [5/10], Loss: 0.2490
Epoch [6/10], Loss: 0.2255
Epoch [7/10], Loss: 0.2109
Epoch [8/10], Loss: 0.2001
Epoch [9/10], Loss: 0.1920
Epoch [10/10], Loss: 0.1820


In [None]:
# Evaluation
torch.save(model.state_dict(), 'modelunet11.pt')
model.load_state_dict(torch.load('modelunet11.pt'))
y_pred = []
y_true = []
with torch.no_grad():
    for signals, labels in test_loader:
        signals, labels = signals.to(device), labels.to(device).float()
        outputs = model(signals)

        # Threshold predictions
        predicted = (outputs > 0.5).float()

        y_pred.extend(predicted.cpu().numpy().flatten())
        y_true.extend(labels.cpu().numpy().flatten())

# Accuracy
accuracy = accuracy_score(y_true, y_pred)
print("Test Accuracy:", accuracy)

  model.load_state_dict(torch.load('modelunet11.pt'))


Test Accuracy: 0.9233489149768765
