In [1]:
import torch
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import pandas as pd
import os
from typing import List
import numpy as np
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import ADASYN
from sklearn.model_selection import StratifiedKFold
import torch.nn.functional as F


In [61]:
class ColNames:
    time_ms = 'time[min]'
    rri_ms = 'rri[ms]'
    rr_systolic_mmHg = 'rr-systolic[mmHg]'
    rr_diastolic_mmHg = 'rr-diastolic[mmHg]'
    rr_mean_mmHg = 'rr-mean[mmHg]'
    rr_flags = 'rr-flags[]'
    ibi_ms = 'ibi[ms]'
    file_name = 'file name'
    sex = 'SEX [nominal codes: "1" woman; "2" man]'

folder_path = 'HYPOL RECORDINGS/'

class MyData:
    def __init__(self, target, value):
        self.target = target
        self.value = value

    def DFToVector(self):
        if isinstance(self.value, pd.DataFrame):
            return self.value.to_numpy().ravel()
        
    def DFToMatrix(self):
        if isinstance(self.value, pd.DataFrame):
            matrix = self.value.to_numpy(dtype=np.float32)
            return matrix

    def DFModify(self):
        if isinstance(self.value, pd.DataFrame):
            columns_to_remove = [ColNames.time_ms, ColNames.rr_flags]
            self.value = self.value.drop(columns=columns_to_remove)
            self.target -= 1


class EKGDataset(Dataset):
    def __init__(self, data, targets, transform=None):
        self.data = data
        self.targets = targets
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample, label = self.data[idx], self.targets[idx]
        if self.transform:
            sample = self.transform(sample)
        return sample, label
    
def limit_row(df_dict, limit):
    df_dict_res = {}
    for key in df_dict:
        df_temp = df_dict[key]
        df_dict_res[key] = df_temp.head(limit)

    return df_dict_res

def merge_v1(df_dict, df_labels, target) -> dict[str,MyData]:
    dic_res = {}
    for key in df_dict:
        dic_res[key] = MyData(df_labels.loc[df_labels[ColNames.file_name] == key].iloc[0][target], df_dict[key])
    return dic_res

device = torch.device("cuda:0") if torch.cuda.is_available() else torch.device("cpu")

In [30]:

resnet = models.resnet18(pretrained=True)
torch.save(resnet.state_dict(), 'resnet18.pth')



In [46]:
resnet = models.resnet18()
resnet.load_state_dict(torch.load('resnet18.pth'))

<All keys matched successfully>

In [62]:


# Inicjalizacja pustego słownika
ekg_dict = {}

# Iteracja przez pliki w folderze
for filename in os.listdir(folder_path):
    # Sprawdzenie, czy plik ma rozszerzenie .rea (załóżmy, że wszystkie pliki EKG mają to rozszerzenie)
    if filename.endswith('.rea'):
        # Pełna ścieżka do pliku
        file_path = os.path.join(folder_path, filename)
        
        # Wczytanie pliku do DataFrame
        df_temp = pd.read_csv(file_path, delimiter='\t', header=0)
        
        # Dodanie do słownika, gdzie kluczem jest nazwa pliku, a wartością DataFrame
        ekg_dict[filename] = df_temp

df_main = pd.read_excel('HYPOL clinical characteristics.xls')
ekg_dict_1000 = limit_row(ekg_dict, 700)
merged_dict = merge_v1(ekg_dict_1000, df_main, ColNames.sex)
for val in merged_dict.values():
    val.DFModify()

In [63]:
device

device(type='cuda', index=0)

In [64]:
datas: List[MyData] = list(merged_dict.values())
targets = np.array([data.target for data in datas])
values = np.array([data.DFToMatrix() for data in datas])
X_train, X_test, y_train, y_test = train_test_split(values, targets, test_size=0.2, random_state=42, stratify=targets)

# unique_classes, class_counts = np.unique(y_train, return_counts=True)
# # Zastosowanie ADASYN do zwiększenia liczby danych 10-krotnie

# target_ratio = {cls: count * 20 for cls, count in zip(unique_classes, class_counts)}

# adasyn = ADASYN(sampling_strategy=target_ratio, random_state=42)
# X_train_resampled, y_train_resampled = adasyn.fit_resample(X_train, y_train)
# train_values_tensor = torch.tensor(train_values).double()  # Przekształć do tensora i typu float
# test_values_tensor = torch.tensor(test_values).double()
# train_targets_tensor = torch.tensor(train_targets)
# test_targets_tensor = torch.tensor(test_targets)

In [82]:
transform = transforms.Compose([
    transforms.ToTensor(),  # Zamień dane na tensor
    
    # Możesz dodać więcej transformacji, np. Normalizacja
])
# targets_tensor = torch.tensor(targets, dtype=torch.long).to(device)
# values_tensor = torch.tensor(values, dtype=torch.float64).to(device)

skf = StratifiedKFold(n_splits=5, random_state=42, shuffle=True)
train_accuracies = []
test_accuracies = []
for train_index, test_index in skf.split(values, targets):
    # Podział danych na zbiory treningowy i testowy
    X_train, X_test = values[train_index], values[test_index]
    y_train, y_test = targets[train_index], targets[test_index]

    # unique_classes, class_counts = np.unique(y_train, return_counts=True)
    # target_ratio = {cls: count * 10 for cls, count in zip(unique_classes, class_counts)}
    # adasyn = ADASYN(sampling_strategy=target_ratio, random_state=42)
    # X_train, y_train = adasyn.fit_resample(X_train, y_train)

    # Załaduj dane
    train_dataset = EKGDataset(X_train, y_train, transform=transform)
    test_dataset = EKGDataset(X_test, y_test, transform=transform)

    # Załaduj dane za pomocą DataLoader
    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, pin_memory=True)
    test_loader = DataLoader(test_dataset, batch_size=32, pin_memory=True)

    # Załaduj wstępnie wytrenowany model ResNet18
    # resnet = models.resnet18(pretrained=True)
    resnet = models.resnet18()
    resnet.load_state_dict(torch.load('resnet18.pth'))
    resnet.conv1 = nn.Conv2d(1, 64, kernel_size=7, stride=2, padding=3, bias=False)
    # Zamień ostatnią warstwę klasyfikacyjną na nową, dopasowaną do naszego zadania
    num_ftrs = resnet.fc.in_features
    resnet.fc = nn.Sequential(
    nn.Linear(num_ftrs, 256),  # Pierwsza warstwa gęsta
    nn.ReLU(),  # Funkcja aktywacji
    nn.Linear(256, 128),  # Druga warstwa gęsta
    nn.ReLU(),  # Funkcja aktywacji
    nn.Linear(128, 2)  # Warstwa wyjściowa
) # num_classes - liczba klas w naszym zbiorze danych

    # Definicja funkcji kosztu i optymalizatora
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(resnet.parameters(), lr=0.001)
    resnet.to(device)
    # Trenowanie modelu
    num_epochs = 20
    for epoch in range(num_epochs):
        resnet.train()
        running_loss = 0.0
        for inputs, labels in train_loader:
            optimizer.zero_grad()
            labels = labels.to(device)
            inputs = inputs.to(device, dtype=torch.float32)
            outputs = resnet(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
        if epoch % 5 == 4: 
            print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader)}")


    # Testowanie modelu na zbiorze testowym
    resnet.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for inputs, labels in test_loader:
            labels = labels.to(device)
            inputs = inputs.to(device)
            outputs = resnet(inputs)
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    print(f"Accuracy on test set: {(correct/total)*100}%")
    test_accuracies.append((correct/total)*100)
avg_train_accuracy = np.mean(train_accuracies)
avg_test_accuracy = np.mean(test_accuracies)
print("Średnia dokładność modelu SVM na danych treningowych: {:.2f}%".format(avg_train_accuracy * 100))
print("Średnia dokładność modelu SVM na danych testowych: {:.2f}%".format(avg_test_accuracy))

Epoch 5, Loss: 0.5564269168036324
Epoch 10, Loss: 0.36637295143944876
Epoch 15, Loss: 0.3099286939416613
Epoch 20, Loss: 0.1121119452374322
Accuracy on test set: 57.14285714285714%
Epoch 5, Loss: 0.5555388331413269
Epoch 10, Loss: 0.2873157869492258
Epoch 15, Loss: 0.2004724240728787
Epoch 20, Loss: 0.2568089264844145
Accuracy on test set: 45.45454545454545%
Epoch 5, Loss: 0.5952494059290204
Epoch 10, Loss: 0.38512344871248516
Epoch 15, Loss: 0.32345695580754963
Epoch 20, Loss: 0.19048451073467731
Accuracy on test set: 65.45454545454545%
Epoch 5, Loss: 0.6729573948042733
Epoch 10, Loss: 0.6269346305302211
Epoch 15, Loss: 0.5050546612058368
Epoch 20, Loss: 0.28520082788808004
Accuracy on test set: 54.54545454545454%
Epoch 5, Loss: 0.6202406925814492
Epoch 10, Loss: 0.48150877015931265
Epoch 15, Loss: 0.27897091635635923
Epoch 20, Loss: 0.36400813715798513
Accuracy on test set: 63.63636363636363%
Średnia dokładność modelu SVM na danych treningowych: nan%
Średnia dokładność modelu SVM na 

  return _methods._mean(a, axis=axis, dtype=dtype,
  ret = ret.dtype.type(ret / rcount)
