In [20]:
import os
import wfdb
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import matplotlib.pyplot as plt
from torch.utils.data import DataLoader, random_split
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torchvision.models as models
from sklearn.metrics import accuracy_score
from tqdm import tqdm
from sklearn.metrics import f1_score

In [103]:
class ECGDataset(Dataset):
    def __init__(self, root_dir, transform=None):
        """
        root_dir: 데이터셋의 루트 디렉토리 (예: 'D:/mimiciv_data/mimic-iv-ecg/1.0/preprocessed')
        transform: 데이터에 적용할 변환
        """
        self.root_dir = root_dir
        self.transform = transform
        self.records = self._load_records()

    def _load_records(self):
        records = []
        # MI_F_ecg와 NOT_MI_F_ecg를 포함한 레이블 목록으로 업데이트
        for label in ['MI_M_ecg', 'NOT_MI_M_ecg', 'MI_F_ecg', 'NOT_MI_F_ecg']:
            label_dir = os.path.join(self.root_dir, label)
            for patient in os.listdir(label_dir):
                patient_dir = os.path.join(label_dir, patient)
                for session in os.listdir(patient_dir):
                    session_dir = os.path.join(patient_dir, session)
                    for file in os.listdir(session_dir):
                        if file.endswith('.hea'):
                            record_path = os.path.join(session_dir, file[:-4])  # '.hea' 확장자 제거
                            # MI_M_ecg 및 MI_F_ecg는 1로, NOT_MI_M_ecg 및 NOT_MI_F_ecg는 0으로 레이블 설정
                            records.append((record_path, 1 if 'MI_' in label else 0))
        return records

    def __len__(self):
        return len(self.records)

    def __getitem__(self, idx):
        rec_path, label = self.records[idx]
        record = wfdb.rdrecord(rec_path)
        data = record.p_signal.T  # ECG 데이터 로드 및 변환

        if self.transform:
            data = self.transform(data)

        data_tensor = torch.from_numpy(data).float()
        #data_tensor = data_tensor.view(1, 12, -1, 1)
        data_tensor = data_tensor.view(12, -1)

        return data_tensor, label

In [104]:
# Z-점수 정규화를 위한 Custom Transform 클래스 정의
class Standardize:
    def __call__(self, sample):
        return (sample - sample.mean()) / sample.std()

# 정규화 Transform을 정의합니다.
transform = transforms.Compose([
    Standardize()
])

In [105]:
dataset = ECGDataset(root_dir='C:\\Users\\Administrator\\24w_MI_Multimodal_Prediction\\processed_data\\ecg_data')

In [106]:
# Check if CUDA is available, otherwise use CPU
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Define your save directory
save_dir = 'processed_data/ecg_data'
device

device(type='cpu')

In [95]:
# Define the ResNet-50 model
model_resnet50 = models.resnet50(pretrained=False)  # Load pre-trained ResNet-50
# Modify the final fully connected layer to match your classification task
num_classes = 2  # Replace with the number of classes in your dataset
num_ftrs = model_resnet50.fc.in_features
model_resnet50.fc = nn.Linear(num_ftrs, num_classes)
model_resnet50.to(device)

ResNet(
  (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 

In [107]:
# # Define the ResNet-50 model
# model_resnet50 = models.resnet50(pretrained=False)  # Load pre-trained ResNet-50
# # Modify the final fully connected layer to match your classification task
# num_classes = 2  # Replace with the number of classes in your dataset
# num_ftrs = model_resnet50.fc.in_features
# model_resnet50.fc = nn.Linear(num_ftrs, num_classes)
# model_resnet50.to(device)

model_resnet50 = models.resnet50(pretrained=False)  # 사전 훈련된 가중치 없이 ResNet-50 로드

# 첫 번째 콘볼루션 층의 입력 채널을 12로 변경
model_resnet50.conv1 = nn.Conv2d(12, 64, kernel_size=7, stride=2, padding=3, bias=False)

# 최종 완전 연결(FC) 층 수정하여 클래스 수 맞추기
num_classes = 2  # 데이터셋의 클래스 수
num_ftrs = model_resnet50.fc.in_features
model_resnet50.fc = nn.Linear(num_ftrs, num_classes)

# 모델을 GPU나 CPU로 이동
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model_resnet50.to(device)

ResNet(
  (conv1): Conv2d(12, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
  (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU(inplace=True)
  (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
  (layer1): Sequential(
    (0): Bottleneck(
      (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
      (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
      (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      (relu): ReLU(inplace=True)
      (downsample): Sequential(
        (0): Conv2d(64, 256, kernel_size=(1, 1), stride=(1,

In [84]:
#dataset = dataset.view(1, 12, 5000,1)

In [108]:
# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model_resnet50.parameters(), lr=0.003)

In [109]:
# 데이터셋 크기
total_size = len(dataset)

# 훈련 세트와 테스트 세트의 크기 계산
train_size = int(0.9 * total_size)
test_size = total_size - train_size

# 데이터셋을 훈련 세트와 테스트 세트로 무작위 분할
train_dataset, test_dataset = random_split(dataset, [train_size, test_size])

# DataLoader 인스턴스 생성
train_loader = DataLoader(train_dataset, batch_size=1, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False)

In [99]:
total_size

1742

In [110]:
def train(model, train_loader, criterion, optimizer, device):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    #for i, (inputs, labels) in enumerate(train_loader):
    for inputs, labels in train_loader:
        inputs = inputs.to(device).float()
        labels = labels.to(device).long()
        #inputs = inputs.permute(0, 2, 1)
        inputs = inputs.unsqueeze(-1)
        optimizer.zero_grad()

        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

        # 첫 번째 배치의 첫 번째 손실과 출력값 출력
        if i == 0:
            print(f'First batch, first prediction in this epoch: {predicted[0]}')
            print(f'First batch, first loss in this epoch: {loss.item()}')

    epoch_loss = running_loss / len(train_loader)
    epoch_acc = 100 * correct / total

    print(f'Training Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}%')
    return epoch_acc

In [111]:
def validate(model, test_loader, criterion, device):
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for i, (inputs, labels) in enumerate(test_loader):
            inputs = inputs.to(device).float()  
            labels = labels.to(device).long()
            inputs = inputs.permute(0, 2, 1)

            outputs = model(inputs)
            loss = criterion(outputs, labels)

            running_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            # 첫 번째 배치의 첫 번째 손실과 출력값 출력
            if i == 0:
                print(f'First batch, prediction in this epoch: {predicted[0]}')
                print(f'First batch, loss in this epoch: {loss.item()}')

    epoch_loss = running_loss / len(test_loader)
    epoch_acc = 100 * correct / total

    print(f'Validation Loss: {epoch_loss:.4f}, Accuracy: {epoch_acc:.4f}%')
    return epoch_acc

In [112]:

# Training loop
num_epochs = 5  # Replace with the number of epochs you want to train for
best_loss = 100
for epoch in range(num_epochs):
    model_resnet50.train()  # Set the model to training mode
    running_loss = 0.0

    for inputs, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Training"):
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()

        outputs = model_resnet50(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        if loss.item() < best_loss:
            best_loss = loss.item()
            # Save the model weights
            model_path = save_dir + 'resnet50.pth'  # Direct path concatenation
            torch.save(model_resnet50.state_dict(), model_path)

        running_loss += loss.item()

    # Calculate training accuracy
    model_resnet50.eval()  # Set the model to evaluation mode
    all_preds = []
    all_labels = []
    
    # Wrap validation_loader with tqdm for a progress bar
    with torch.no_grad():
        for inputs, labels in tqdm(test_loader, desc=f"Epoch {epoch+1}/{num_epochs} - Validation"):
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model_resnet50(inputs)
            _, preds = torch.max(outputs, 1)

            all_preds.extend(preds.cpu().numpy())
            all_labels.extend(labels.cpu().numpy())

    accuracy = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds)  # Calculate F1 score

    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss / len(train_loader)}, Validation Accuracy: {accuracy * 100:.2f}%, Validation F1 Score: {f1 * 100:.2f}%")

print("Training complete")

Epoch 1/5 - Training:   0%|          | 0/1567 [00:00<?, ?it/s]


RuntimeError: Given groups=1, weight of size [64, 12, 7, 7], expected input[1, 1, 12, 5000] to have 12 channels, but got 1 channels instead