<a href="https://colab.research.google.com/github/fathursidiq/CNN/blob/main/ECG_Seg_Class.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
# Synthesise a toy ECG dataset: noisy sine-wave segments paired with random labels
num_samples = 1000  # how many segments to generate
segment_length = 250  # samples per segment (e.g., 1 second sampled at 250 Hz)
num_classes = 5  # labels are drawn from 0 .. num_classes - 1

# Time axis shared by every segment: segment_length points on [0, 1), endpoint excluded
time = np.linspace(0, 1, segment_length, endpoint=False)

# Every segment is the same 5-cycle sine wave plus its own Gaussian noise (scaled by 0.1).
# Noise rows are drawn one segment at a time, in order, before the labels are drawn.
_clean_wave = np.sin(2 * np.pi * 5 * time)
_noise_rows = [0.1 * np.random.randn(segment_length) for _ in range(num_samples)]
dummy_segments = np.array([_clean_wave + _row for _row in _noise_rows])

# Min-max scale the whole array (global extrema, not per segment) into [0, 1]
_lowest = np.min(dummy_segments)
_highest = np.max(dummy_segments)
dummy_segments = (dummy_segments - _lowest) / (_highest - _lowest)

# One uniformly random integer label per segment
dummy_labels = np.random.randint(low=0, high=num_classes, size=num_samples)

# Persist both arrays as .npy files in the working directory
for _path, _array in (('ecg_segments.npy', dummy_segments), ('ecg_labels.npy', dummy_labels)):
    np.save(_path, _array)

# Output file paths (shown as the cell result in a notebook)
'ecg_segments.npy', 'ecg_labels.npy'


In [None]:
# 1. Dataset Preparation
class ECGDataset(Dataset):
    """Map-style dataset that pairs ECG segments with their class labels.

    Args:
        data: indexable collection of segments; ``data[i]`` is converted to a
            ``float32`` tensor on access.
        labels: indexable collection of labels; ``labels[i]`` is converted to
            a ``long`` tensor on access.
    """

    def __init__(self, data, labels):
        # Stored as given; conversion to tensors happens lazily in __getitem__
        self.data, self.labels = data, labels

    def __len__(self):
        """Return the number of segments (the length of ``data``)."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(segment, label)`` for position ``idx`` as freshly built tensors."""
        segment = torch.tensor(self.data[idx], dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        return segment, label

# Load your preprocessed ECG data (example placeholders)
data = np.load('ecg_segments.npy')  # Shape: (num_samples, segment_length)
labels = np.load('ecg_labels.npy')  # Shape: (num_samples, )

# Split data into training and validation sets (80% / 20%)
train_size = int(0.8 * len(data))
val_size = len(data) - train_size

# Split the dataset object itself instead of zipping the arrays into a list of
# tuples and unzipping each subset again: random_split returns index-based
# Subset views, so no per-sample Python tuples are materialised and an empty
# split no longer breaks the ECGDataset(*zip(*subset)) call.
full_dataset = ECGDataset(data, labels)
train_dataset, val_dataset = torch.utils.data.random_split(
    full_dataset, [train_size, val_size]
)

# Legacy names kept so any later cell that refers to them still resolves
train_data, val_data = train_dataset, val_dataset

# Shuffle only the training batches; validation order stays fixed
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)

# 2. Model Definition
class ECGClassifier(nn.Module):
    """Small 1-D CNN that classifies fixed-length ECG segments.

    Two Conv1d/ReLU/MaxPool1d stages (16 then 32 channels, each pooling halves
    the length) feed a two-layer fully connected head with dropout.

    Args:
        input_length: number of samples in each input segment. When ``None``
            (the default) it falls back to ``data.shape[1]`` of the
            module-level ``data`` array, which is what the constructor always
            used before this parameter existed.
        num_classes: number of output logits. Defaults to 5.

    Raises:
        ValueError: if ``input_length`` is smaller than 4, because the two
            pooling stages would leave no features for the linear head.
    """

    def __init__(self, input_length=None, num_classes=5):
        super().__init__()
        if input_length is None:
            # Backward-compatible default: size the head from the loaded data
            input_length = data.shape[1]
        if input_length < 4:
            raise ValueError(
                f"input_length must be at least 4, got {input_length}"
            )

        self.conv_layers = nn.Sequential(
            nn.Conv1d(1, 16, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2),
            nn.Conv1d(16, 32, kernel_size=5, stride=1, padding=2),
            nn.ReLU(),
            nn.MaxPool1d(kernel_size=2)
        )
        # The padded convolutions keep the length; each pool floors it in half,
        # so the flattened feature count is 32 channels * (input_length // 4).
        self.fc_layers = nn.Sequential(
            nn.Linear(32 * (input_length // 4), 128),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(128, num_classes)
        )

    def forward(self, x):
        """Map a ``(batch, input_length)`` tensor to ``(batch, num_classes)`` logits."""
        x = x.unsqueeze(1)  # Add channel dimension
        x = self.conv_layers(x)
        x = x.view(x.size(0), -1)  # Flatten
        x = self.fc_layers(x)
        return x



# 3. Training Loop

def train_model(model, train_loader, val_loader, num_epochs=20, learning_rate=0.001):
    """Train ``model`` with Adam and cross-entropy, validating after every epoch.

    The model is moved (in place) to CUDA when available, otherwise to CPU.
    One summary line is printed per epoch.

    Args:
        model: the network to optimise.
        train_loader: iterable of ``(inputs, targets)`` training batches.
        val_loader: iterable of ``(inputs, targets)`` validation batches.
        num_epochs: number of passes over ``train_loader``.
        learning_rate: learning rate handed to Adam.

    Returns:
        ``(train_losses, val_losses)``: two lists holding, per epoch, the sum
        of batch losses divided by the number of batches in the loader.
    """
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    model.to(device)

    loss_fn = nn.CrossEntropyLoss()
    adam = optim.Adam(model.parameters(), lr=learning_rate)

    train_losses, val_losses = [], []

    for epoch in range(num_epochs):
        # --- optimisation pass ---
        model.train()
        running_train = 0
        for batch_x, batch_y in train_loader:
            batch_x = batch_x.to(device)
            batch_y = batch_y.to(device)

            adam.zero_grad()
            batch_loss = loss_fn(model(batch_x), batch_y)
            batch_loss.backward()
            adam.step()

            running_train += batch_loss.item()

        # --- validation pass: eval mode, no gradient tracking ---
        running_val = 0
        model.eval()
        with torch.no_grad():
            for batch_x, batch_y in val_loader:
                batch_x = batch_x.to(device)
                batch_y = batch_y.to(device)
                running_val += loss_fn(model(batch_x), batch_y).item()

        # Per-epoch averages over the number of batches
        mean_train = running_train / len(train_loader)
        mean_val = running_val / len(val_loader)
        train_losses.append(mean_train)
        val_losses.append(mean_val)

        print(f"Epoch {epoch+1}/{num_epochs}, Train Loss: {mean_train:.4f}, Validation Loss: {mean_val:.4f}")

    return train_losses, val_losses

# 4. Model Initialization and Execution
model = ECGClassifier()
train_losses, val_losses = train_model(model, train_loader, val_loader)

# Plot Training and Validation Loss
plt.figure(figsize=(10, 6))
plt.plot(train_losses, label='Train Loss')
plt.plot(val_losses, label='Validation Loss')
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.title('Training and Validation Loss')
plt.legend()
plt.show()

# 5. Evaluation and Confusion Matrix
# train_model moved the model to this same device, so the inputs must follow it.
# Resolved once here instead of being rebuilt for every batch.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
class_ids = list(range(5))  # one id per output logit of the default ECGClassifier

model.eval()
all_preds = []
all_labels = []
with torch.no_grad():
    for inputs, targets in val_loader:
        outputs = model(inputs.to(device))
        preds = torch.argmax(outputs, dim=1)
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(targets.numpy())

# Generate Confusion Matrix
# Passing labels= pins the matrix to 5x5 even when some class never appears in
# the validation targets or predictions; otherwise the matrix would shrink and
# no longer match the five display labels.
cm = confusion_matrix(all_labels, all_preds, labels=class_ids)
disp = ConfusionMatrixDisplay(confusion_matrix=cm, display_labels=class_ids)

# Plot Confusion Matrix
disp.plot(cmap='Blues')
plt.title('Confusion Matrix')
plt.show()

# Save the Trained Model (weights only, via state_dict)
torch.save(model.state_dict(), 'ecg_classifier.pth')


In [None]:
# Function to Detect Peaks and Calculate Intervals
def detect_peaks(ecg_signal, threshold=0.85):
    """Detect peaks in the ECG signal and return their indices.

    A sample counts as a peak when it is strictly greater than ``threshold``
    and strictly greater than both of its immediate neighbours. The first and
    last samples are never reported because they lack a neighbour on one side.

    Args:
        ecg_signal: 1-D sequence or array of signal amplitudes.
        threshold: minimum amplitude a sample must exceed to be a peak.

    Returns:
        A 1-D integer NumPy array of peak indices in ascending order. The
        array is empty, but still integer-typed, when nothing qualifies, so
        it can always be used directly to index the signal.
    """
    signal = np.asarray(ecg_signal)
    # Compare every interior sample with its left and right neighbours at once
    interior = signal[1:-1]
    is_peak = (
        (interior > threshold)
        & (interior > signal[:-2])
        & (interior > signal[2:])
    )
    # flatnonzero yields positions within the interior slice; shift by one to
    # get indices into the full signal
    return np.flatnonzero(is_peak) + 1

def calculate_intervals(peaks):
    """Return the gaps between consecutive peak indices (e.g., RR intervals).

    The result has one element fewer than ``peaks``; entry ``i`` is
    ``peaks[i + 1] - peaks[i]``.
    """
    # First-order discrete difference along the last axis
    gaps = np.diff(peaks, n=1, axis=-1)
    return gaps

# Example Usage for Peak Detection and Interval Calculation
sample_ecg = data[0]  # Use the first ECG segment as an example
peaks = detect_peaks(sample_ecg, threshold=0.5)
intervals = calculate_intervals(peaks)

# Plot ECG signal with detected peaks
plt.figure(figsize=(12, 6))
plt.plot(sample_ecg, label='ECG Signal')
plt.scatter(peaks, sample_ecg[peaks], color='red', label='Detected Peaks')
plt.xlabel('Time (samples)')
plt.ylabel('Amplitude')
plt.title('ECG Signal with Detected Peaks')
plt.legend()
plt.show()

# Keep only the intervals longer than 28 samples
# NOTE(review): presumably this discards closely spaced, noise-induced peaks — confirm the cutoff
selected_values = intervals[intervals > 28]

# Compute the mean of the retained intervals (NaN when none pass the filter,
# without triggering NumPy's empty-slice warning)
mean_value = np.mean(selected_values) if selected_values.size else np.nan

print("Selected Values:", selected_values)
print("Mean Value:", mean_value)

print("Detected Peaks Indices:", peaks)
# Print the intervals themselves; the mean is already reported above
print("Calculated Intervals (samples):", intervals)