In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torch.utils.data import DataLoader, Dataset
from sklearn.model_selection import train_test_split
from sklearn.model_selection import KFold
from sklearn.metrics import accuracy_score
import numpy as np
import scipy.io


In [None]:
class CNNBlock(nn.Module):
    def __init__(self, in_layer, out_layer, kernel_size, stride):
        super(CNNBlock, self).__init__()

        self.conv1 = nn.Conv1d(in_layer, out_layer, kernel_size=kernel_size, stride=stride, bias=True)
        self.bn = nn.BatchNorm1d(out_layer)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.conv1(x)
        x = self.bn(x)
        out = self.relu(x)

        return out

class SEBlock(nn.Module):
    def __init__(self,in_layer, out_layer):
        super(SEBlock, self).__init__()

        self.conv1 = nn.Conv1d(in_layer, out_layer//16, kernel_size=1, padding=0)
        self.conv2 = nn.Conv1d(out_layer//16, in_layer, kernel_size=1, padding=0)
        # self.fc = nn.Linear(1,out_layer//16)
        # self.fc2 = nn.Linear(out_layer//16,out_layer)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()

    def forward(self,x):

        x_se = nn.functional.adaptive_avg_pool1d(x,1)
        x_se = self.conv1(x_se)
        x_se = self.relu(x_se)
        x_se = self.conv2(x_se)
        x_se = self.sigmoid(x_se)

        x_out = torch.add(x, x_se)
        return x_out
'''
class SEBlock(nn.Module):
    def __init__(self, in_channels, reduction=16):
        super(SEBlock, self).__init__()

        self.global_avg_pool = nn.AdaptiveAvgPool1d(1)
        self.fc1 = nn.Linear(in_channels, in_channels // reduction, bias=True)
        self.fc2 = nn.Linear(in_channels // reduction, in_channels, bias=True)
        self.activation_1 = nn.ReLU()
        self.activation_2 = nn.Sigmoid()

    def forward(self, x):
        batch_size, num_channels, H = x.size()
        squeeze_x = self.global_avg_pool(x).view(batch_size, num_channels)

        squeeze_x = self.activation_1(self.fc1(squeeze_x))
        squeeze_x = self.activation_2(self.fc2(squeeze_x))
        return x * squeeze_x.view(batch_size, num_channels, 1)
'''
class AttentionBlock(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(AttentionBlock, self).__init__()
        self.fc1 = nn.Linear(input_dim, output_dim)
        self.fc2 = nn.Linear(output_dim, output_dim * 2)
        self.fc3 = nn.Linear(output_dim * 2, input_dim)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        attention = self.fc1(x)
        attention = F.relu(attention)
        attention = self.fc2(attention)
        attention = F.relu(attention)
        attention = self.fc3(attention)
        attention = self.softmax(attention)
        return attention * x

class BiLSTMBlock(nn.Module):
    def __init__(self, input_dim, hidden_dim, num_layers=2, dropout=0.4):
        super(BiLSTMBlock, self).__init__()
        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers=num_layers, dropout=dropout, batch_first=True, bidirectional=True)

    def forward(self, x):
        x, _ = self.lstm(x)
        return x

class DenseBlock(nn.Module):
    def __init__(self, input_dim, output_dim, num_layers=1, dropout=0.2):
        super(DenseBlock, self).__init__()
        self.fc1 = nn.Linear(input_dim, output_dim)
        self.dropout = nn.Dropout(dropout)

    def forward(self, x):
        x = self.fc1(x)
        x = self.dropout(x)

        return x



class MuDANet(nn.Module):
    def __init__(self, num_classes):
        super(MuDANet, self).__init__()

        # CNNBlock1 Stream-1
        self.cnn_block1_stream1 = CNNBlock(1, 128, 3, 1)
        self.se_block1_stream1 = SEBlock(128, 128)
        self.cnn_block2_stream1 = CNNBlock(128, 256, 9, 3)
        self.se_block2_stream1 = SEBlock(256, 256)
        self.cnn_block3_stream1 = CNNBlock(256, 256, 9, 3)
        self.se_block3_stream1 = SEBlock(256, 256)
        self.dense_block1_stream1 = DenseBlock(76032, 32)
        self.dense_block2_stream1 = DenseBlock(32, 64)
        self.dense_block3_stream1 = DenseBlock(64, 128)

        # CNNBlock2 Stream-1
        self.cnn_block4_stream1 = CNNBlock(1, 128, 3, 1)
        self.se_block4_stream1 = SEBlock(128, 128)
        self.cnn_block5_stream1 = CNNBlock(128, 256, 9, 3)
        self.se_block5_stream1 = SEBlock(256, 256)
        self.cnn_block6_stream1 = CNNBlock(256, 256, 9, 3)
        self.se_block6_stream1 = SEBlock(256, 256)
        self.dense_block4_stream1 = DenseBlock(2816, 32)
        self.dense_block5_stream1 = DenseBlock(32, 64)
        self.dense_block6_stream1 = DenseBlock(64, 128)
        self.attention_block_stream1 = AttentionBlock(128, 384)



        # CNNBlock1 Stream-2
        self.cnn_block1_stream2 = CNNBlock(1, 128, 3, 1)
        self.se_block1_stream2 = SEBlock(128, 128)
        self.cnn_block2_stream2 = CNNBlock(128, 256, 9, 3)
        self.se_block2_stream2 = SEBlock(256, 256)
        self.cnn_block3_stream2 = CNNBlock(256, 256, 9, 3)
        self.se_block3_stream2 = SEBlock(256, 256)
        self.dense_block1_stream2 = DenseBlock(76032, 32)
        self.dense_block2_stream2 = DenseBlock(32, 64)
        self.dense_block3_stream2 = DenseBlock(64, 128)

        self.cnn_block4_stream2 = CNNBlock(1, 128, 3, 1)
        self.se_block4_stream2 = SEBlock(128, 128)
        self.cnn_block5_stream2 = CNNBlock(128, 256, 9, 3)
        self.se_block5_stream2 = SEBlock(256, 256)
        self.cnn_block6_stream2 = CNNBlock(256, 256, 9, 3)
        self.se_block6_stream2 = SEBlock(256, 256)
        self.dense_block4_stream2 = DenseBlock(2816, 32)
        self.dense_block5_stream2 = DenseBlock(32, 64)
        self.dense_block6_stream2 = DenseBlock(64, 128)
        self.attention_block_stream2 = AttentionBlock(128, 384)

        # Bi-LSTM Block
        self.bilstm1_block = BiLSTMBlock(128, 256)
        self.bilstm2_block = BiLSTMBlock(128, 256)

        # Fully Connected Block
        self.fc1 = DenseBlock(512, 1024)
        self.fc2 = DenseBlock(1024, 1024)
        self.fc3 = DenseBlock(1024, 256, dropout=0.0)
        self.fc4 = DenseBlock(256, 3, dropout=0.0)


    def forward(self, x):
        # Stream-1
        x1 = self.cnn_block1_stream1(x)
        x1 = self.se_block1_stream1(x1)
        x1 = self.cnn_block2_stream1(x1)
        x1 = self.se_block2_stream1(x1)
        x1 = self.cnn_block3_stream1(x1)
        x1 = self.se_block3_stream1(x1)
        x1 = torch.flatten(x1, start_dim=1)
        x1 = self.dense_block1_stream1(x1)
        x1 = self.dense_block2_stream1(x1)
        x1 = self.dense_block3_stream1(x1)

        x1 = x1.view(x1.size(0), 1, -1)
        x1 = self.cnn_block4_stream1(x1)
        x1 = self.se_block4_stream1(x1)
        x1 = self.cnn_block5_stream1(x1)
        x1 = self.se_block5_stream1(x1)
        x1 = self.cnn_block6_stream1(x1)
        x1 = self.se_block6_stream1(x1)
        x1 = torch.flatten(x1, start_dim=1)
        x1 = self.dense_block4_stream1(x1)
        x1 = self.dense_block5_stream1(x1)
        x1 = self.dense_block6_stream1(x1)
        x1 = self.attention_block_stream1(x1)

        # Stream-2
        x2 = self.cnn_block1_stream2(x)
        x2 = self.se_block1_stream2(x2)
        x2 = self.cnn_block2_stream2(x2)
        x2 = self.se_block2_stream2(x2)
        x2 = self.cnn_block3_stream2(x2)
        x2 = self.se_block3_stream2(x2)
        x2 = torch.flatten(x2, start_dim=1)
        x2 = self.dense_block1_stream2(x2)
        x2 = self.dense_block2_stream2(x2)
        x2 = self.dense_block3_stream2(x2)
        x2 = x2.view(x2.size(0), 1, -1)
        x2 = self.cnn_block4_stream2(x2)
        x2 = self.se_block4_stream2(x2)
        x2 = self.cnn_block5_stream2(x2)
        x2 = self.se_block5_stream2(x2)
        x2 = self.cnn_block6_stream2(x2)
        x2 = self.se_block6_stream2(x2)
        x2 = torch.flatten(x2, start_dim=1)

        x2 = self.dense_block4_stream2(x2)
        x2 = self.dense_block5_stream2(x2)
        x2 = self.dense_block6_stream2(x2)
        x2 = self.attention_block_stream2(x2)



        x_fused = x1 + x2

        x_fused = torch.flatten(x_fused, start_dim=1)
        x_fused = x_fused.view(x_fused.size(0), 1, -1)
        # Bi-LSTM
        x_fused1 = self.bilstm1_block(x_fused)
        x_fused2 = self.bilstm2_block(x_fused)
        x_fused = torch.add(x_fused1, x_fused2)
        # Fully Connected Layers
        x_fused = x_fused.view(x_fused.size(0), -1)
        x_fused = F.relu(self.fc1(x_fused))
        x_fused = F.relu(self.fc2(x_fused))


        x_fused = F.relu(self.fc3(x_fused))

        x_fused = self.fc4(x_fused)

        return F.softmax(x_fused, dim=-1)

# 일반 학습

In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=30, device='cuda'):
    model = model.to(device)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_loss = running_loss / len(train_loader.dataset)
        train_accuracy = correct / total
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.4f}')

        val_loss, val_accuracy = evaluate_model(model, val_loader, criterion, device)
        print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
        fold_accuracy.append(val_accuracy)


def evaluate_model(model, val_loader, criterion, device='cuda'):
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            val_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_loss /= len(val_loader.dataset)
    val_accuracy = correct / total
    return val_loss, val_accuracy


class ECGDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

def weights_init(m):
    if isinstance(m, nn.Conv1d):
        nn.init.xavier_uniform_(m.weight)

def main():
    np_data = np.load('/content/drive/MyDrive/ECG/training_9s.npy').astype(np.float32)
    label = np.load('/content/drive/MyDrive/ECG/training_9s_label.npy').astype(np.float32)


    data = torch.tensor(np_data.reshape(np_data.shape[0], 1, np_data.shape[2]), dtype=torch.float32)
    labels = torch.tensor(label, dtype=torch.long)
    train_data, val_data, train_labels, val_labels = train_test_split(data, labels, test_size=0.1)

    train_dataset = ECGDataset(train_data, train_labels)
    val_dataset = ECGDataset(val_data, val_labels)


    train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

    model = MuDANet(num_classes=3)
    model.apply(weights_init)

    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.0001)

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=40, device=device)

if __name__ == '__main__':
    main()
    print(f"\nOverall Accuracy: {np.mean(fold_accuracy):.4f}")

# K - Fold

In [None]:
fold_accuracy = []
def train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=30, device='cuda'):
    model = model.to(device)

    for epoch in range(num_epochs):
        model.train()
        running_loss = 0.0
        correct = 0
        total = 0

        for inputs, labels in train_loader:
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

        train_loss = running_loss / len(train_loader.dataset)
        train_accuracy = correct / total
        print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {train_loss:.4f}, Accuracy: {train_accuracy:.4f}')

        val_loss, val_accuracy = evaluate_model(model, val_loader, criterion, device)
        print(f'Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.4f}')
        fold_accuracy.append(val_accuracy)


def evaluate_model(model, val_loader, criterion, device='cuda'):
    model.eval()
    val_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            val_loss += loss.item() * inputs.size(0)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    val_loss /= len(val_loader.dataset)
    val_accuracy = correct / total
    return val_loss, val_accuracy


class ECGDataset(Dataset):
    def __init__(self, data, labels):
        self.data = data
        self.labels = labels

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.labels[idx]

def weights_init(m):
    if isinstance(m, nn.Conv1d):
        nn.init.xavier_uniform_(m.weight)

def main():

    np_data = np.load('/content/drive/MyDrive/ECG/training_9s.npy').astype(np.float32)
    label = np.load('/content/drive/MyDrive/ECG/training_9s_label.npy').astype(np.float32)
    X_tensor = torch.tensor(np_data.reshape(np_data.shape[0], 1, np_data.shape[2]), dtype=torch.float32)
    y_tensor = torch.tensor(label, dtype=torch.long)

    kf = KFold(n_splits=10, shuffle=True, random_state=42)

    for fold, (train_index, test_index) in enumerate(kf.split(X_tensor)):
        print(f"Fold {fold + 1}")
        # Train/test 데이터 나누기
        X_train, X_test = X_tensor[train_index], X_tensor[test_index]
        y_train, y_test = y_tensor[train_index], y_tensor[test_index]

        # 데이터 로더 생성
        train_dataset = ECGDataset(X_train, y_train)
        test_dataset = ECGDataset(X_test, y_test)

        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
        val_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

        model = MuDANet(num_classes=3)
        model.apply(weights_init)

        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.0001)

        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        train_model(model, train_loader, val_loader, criterion, optimizer, num_epochs=40, device=device)



if __name__ == '__main__':
    main()
    print(f"\nOverall Accuracy: {np.mean(fold_accuracy):.4f}")