<a href="https://colab.research.google.com/github/Hy-per-ion/ECG-Classification/blob/main/VIT.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Mount Google Drive inside the Colab VM; the dataset directories configured
# further down live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.models import vit_b_16
from einops import rearrange
from PIL import Image
import os

# FeedForward class
class FeedForward(nn.Module):
    """Two-layer MLP with batch normalisation after the first projection.

    Pipeline: Linear(dim -> hidden_dim) -> BatchNorm1d -> GELU -> Dropout
    -> Linear(hidden_dim -> dim), so the output's last dimension equals the
    input's.

    Args:
        dim: size of the input and output feature dimension.
        hidden_dim: width of the intermediate projection.
        dropout: dropout probability applied after the activation.

    NOTE(review): BatchNorm1d normalises dimension 1, so this presumably
    expects 2-D ``(batch, dim)`` input, which is how ViTBNFFN feeds it --
    confirm before applying it to token sequences.
    """

    def __init__(self, dim, hidden_dim, dropout=0.):
        super().__init__()
        # Attribute names double as state_dict keys, so they are left as-is.
        self.Lin1 = nn.Linear(in_features=dim, out_features=hidden_dim)
        self.BN = nn.BatchNorm1d(num_features=hidden_dim)
        self.act = nn.GELU()
        self.drop = nn.Dropout(p=dropout)
        self.Lin2 = nn.Linear(in_features=hidden_dim, out_features=dim)

    def forward(self, x):
        """Run ``x`` through the block and return the projected result."""
        hidden = self.act(self.BN(self.Lin1(x)))
        return self.Lin2(self.drop(hidden))

# Custom Dataset for ECG images
class ECGImageDataset(Dataset):
    """Image dataset laid out as one sub-directory of ``data_dir`` per class.

    Class names are the sorted sub-directory names; a sample's label is the
    index of its class in ``self.classes``.

    Args:
        data_dir: root directory containing one folder per class.
        transform: optional callable applied to each loaded PIL image.
    """

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        # Only directories are classes. A stray file in data_dir (for example
        # .DS_Store) used to be treated as a class and crash os.listdir below.
        self.classes = sorted(
            entry for entry in os.listdir(data_dir)
            if os.path.isdir(os.path.join(data_dir, entry))
        )
        self.image_paths = []
        self.labels = []

        for idx, class_name in enumerate(self.classes):
            class_dir = os.path.join(data_dir, class_name)
            # Sorted so the sample order is reproducible across filesystems;
            # nested directories are skipped because they are not images.
            for img_name in sorted(os.listdir(class_dir)):
                img_path = os.path.join(class_dir, img_name)
                if not os.path.isfile(img_path):
                    continue
                self.image_paths.append(img_path)
                self.labels.append(idx)

    def __len__(self):
        """Return the number of indexed image files."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``.

        The image is converted to RGB and passed through ``transform`` when
        one was supplied.
        """
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        # The context manager closes the underlying file once the pixels have
        # been copied into the converted image.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, label

# ViT-based model with FeedForward block
class ViTBNFFN(nn.Module):
    """ImageNet-pretrained ViT-B/16 backbone with a BatchNorm MLP head.

    The backbone's own classification layer is replaced by ``nn.Identity``;
    its output is passed through a ``FeedForward`` block and then a linear
    classifier.

    Args:
        num_classes: number of logits produced by the final linear layer.
    """

    def __init__(self, num_classes):
        super().__init__()
        # ``pretrained=True`` is deprecated in torchvision; requesting the
        # weight set by name loads the same checkpoint without the warning.
        self.vit = vit_b_16(weights="IMAGENET1K_V1")

        # Remember the width the original head consumed before removing it.
        in_features = self.vit.heads.head.in_features
        self.vit.heads.head = nn.Identity()

        # Extra head: BN feed-forward block followed by the classifier.
        self.feedforward = FeedForward(dim=in_features, hidden_dim=512, dropout=0.1)
        self.classifier = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Return class logits for the image batch ``x``."""
        x = self.vit(x)          # backbone output (head is Identity)
        x = self.feedforward(x)  # BN feed-forward block
        return self.classifier(x)

# Paths and parameters
train_dir = '/content/drive/MyDrive/ECG_Data/train'
test_dir = '/content/drive/MyDrive/ECG_Data/test'
batch_size = 32
# NOTE(review): 1e-3 is a large step for fine-tuning every ViT weight; the
# recorded run below sits at 0.25 test accuracy while the training loss
# falls, which may be related -- confirm with a smaller learning rate.
learning_rate = 0.001
num_epochs = 10
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
use_cuda = device.type == 'cuda'

# Transforms
# The backbone is ImageNet-pretrained, so inputs are normalised with the
# ImageNet channel statistics those weights were trained with, instead of a
# single 0.5/0.5 pair broadcast over the three RGB channels.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load datasets
train_dataset = ECGImageDataset(train_dir, transform=transform)
test_dataset = ECGImageDataset(test_dir, transform=transform)

# Pinned host memory only helps host-to-GPU copies, so request it only on CUDA.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, pin_memory=use_cuda)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, pin_memory=use_cuda)

# Initialize model, loss, optimizer, and scaler
model = ViTBNFFN(num_classes=len(train_dataset.classes)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.AdamW(model.parameters(), lr=learning_rate)  # AdamW for better optimization
# torch.cuda.amp.GradScaler() is deprecated (see the warning in this cell's
# output); the device-generic constructor replaces it. Loss scaling is only
# enabled when training on CUDA.
scaler = torch.amp.GradScaler(device.type, enabled=use_cuda)

# Training loop with mixed precision
def train_epoch(model, dataloader, optimizer, criterion, device, scaler):
    """Train ``model`` for one pass over ``dataloader`` with mixed precision.

    Args:
        model: module to optimise; switched to training mode.
        dataloader: iterable yielding ``(images, labels)`` tensor batches.
        optimizer: optimizer stepped once per batch through ``scaler``.
        criterion: loss callable taking ``(outputs, labels)``.
        device: device (or device string) the batches are moved to.
        scaler: gradient scaler providing ``scale``, ``step`` and ``update``.

    Returns:
        The mean of the per-batch losses as a float, or ``nan`` when the
        loader yields no batches.
    """
    model.train()
    # Autocast is only useful on CUDA; elsewhere it is explicitly disabled.
    use_amp = torch.device(device).type == 'cuda'
    running_loss = 0.0
    num_batches = 0
    for images, labels in dataloader:
        images = images.to(device, non_blocking=True)
        labels = labels.to(device, non_blocking=True)

        optimizer.zero_grad()
        # torch.cuda.amp.autocast() is deprecated; torch.autocast is the
        # device-generic replacement.
        with torch.autocast(device_type='cuda', enabled=use_amp):
            outputs = model(images)
            loss = criterion(outputs, labels)

        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()
        num_batches += 1

    if num_batches == 0:
        # No data: report "not a number" rather than dividing by zero.
        return float('nan')
    return running_loss / num_batches

# Testing loop
def test_model(model, dataloader, device):
    """Return the fraction of samples in ``dataloader`` classified correctly.

    Args:
        model: module to evaluate; switched to evaluation mode.
        dataloader: iterable yielding ``(images, labels)`` tensor batches.
        device: device the batches are moved to before the forward pass.

    Returns:
        Accuracy in ``[0, 1]`` as a float, or ``nan`` when the loader yields
        no samples.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in dataloader:
            images = images.to(device, non_blocking=True)
            labels = labels.to(device, non_blocking=True)
            # The predicted class is the index of the largest logit per row.
            predicted = model(images).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    if total == 0:
        # No data: report "not a number" rather than dividing by zero.
        return float('nan')
    return correct / total


# Despite the ``test_`` prefix this is an evaluation helper, not a unit test;
# the marker keeps pytest from collecting it if this code is ever imported.
test_model.__test__ = False

# Main training process: one optimisation pass, then a test-set evaluation,
# with a one-line report per epoch.
for epoch in range(num_epochs):
    train_loss = train_epoch(
        model=model,
        dataloader=train_loader,
        optimizer=optimizer,
        criterion=criterion,
        device=device,
        scaler=scaler,
    )
    accuracy = test_model(model=model, dataloader=test_loader, device=device)
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {train_loss:.4f}, Accuracy: {accuracy:.4f}")


Downloading: "https://download.pytorch.org/models/vit_b_16-c867db91.pth" to /root/.cache/torch/hub/checkpoints/vit_b_16-c867db91.pth
100%|██████████| 330M/330M [00:05<00:00, 57.9MB/s]
  scaler = torch.cuda.amp.GradScaler()
  with torch.cuda.amp.autocast():  # Mixed precision


Epoch 1/10, Loss: 0.9381, Accuracy: 0.2500
Epoch 2/10, Loss: 0.6024, Accuracy: 0.2500
Epoch 3/10, Loss: 0.4983, Accuracy: 0.4576
Epoch 4/10, Loss: 0.4544, Accuracy: 0.2500
Epoch 5/10, Loss: 0.3715, Accuracy: 0.2500
Epoch 6/10, Loss: 0.2858, Accuracy: 0.2902
Epoch 7/10, Loss: 0.2629, Accuracy: 0.2500


KeyboardInterrupt: 

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms
from torchvision.models import vit_b_16
from einops import rearrange
from PIL import Image
import os

# FeedForward class as provided
class FeedForward(nn.Module):
    """MLP block: Linear -> BatchNorm1d -> GELU -> Dropout -> Linear.

    The first linear layer widens ``dim`` to ``hidden_dim`` and the second
    maps back to ``dim``.

    Args:
        dim: input and output feature size.
        hidden_dim: feature size between the two linear layers.
        dropout: dropout probability used after the activation.
    """

    def __init__(self, dim, hidden_dim, dropout=0.):
        super().__init__()
        # Created in forward order; the names are also the state_dict keys.
        self.Lin1 = nn.Linear(dim, hidden_dim)
        self.BN = nn.BatchNorm1d(hidden_dim)
        self.act = nn.GELU()
        self.drop = nn.Dropout(dropout)
        self.Lin2 = nn.Linear(hidden_dim, dim)

    def forward(self, x):
        """Apply the five stages in order and return the result."""
        for stage in (self.Lin1, self.BN, self.act, self.drop, self.Lin2):
            x = stage(x)
        return x

# Custom Dataset for ECG images
class ECGImageDataset(Dataset):
    """Folder-per-class image dataset rooted at ``data_dir``.

    ``self.classes`` holds the sorted class-folder names and each sample's
    label is the position of its folder in that list.

    Args:
        data_dir: directory whose sub-directories are the classes.
        transform: optional callable applied to each loaded PIL image.
    """

    def __init__(self, data_dir, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        # os.listdir returns entries in arbitrary order. Without sorting, the
        # same class can get different indices in the train and test
        # datasets. Non-directory entries are not classes and are skipped.
        self.classes = sorted(
            name for name in os.listdir(data_dir)
            if os.path.isdir(os.path.join(data_dir, name))
        )
        self.image_paths = []
        self.labels = []

        for idx, class_name in enumerate(self.classes):
            class_dir = os.path.join(data_dir, class_name)
            # Sorted for a reproducible sample order; only regular files are
            # indexed.
            for img_name in sorted(os.listdir(class_dir)):
                img_path = os.path.join(class_dir, img_name)
                if os.path.isfile(img_path):
                    self.image_paths.append(img_path)
                    self.labels.append(idx)

    def __len__(self):
        """Return the number of indexed image files."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for sample ``idx``.

        The image is opened, converted to RGB, and passed through
        ``transform`` when one was supplied.
        """
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        # Close the file handle as soon as the RGB copy exists.
        with Image.open(img_path) as img:
            image = img.convert('RGB')
        if self.transform is not None:
            image = self.transform(image)
        return image, label

In [None]:
class ViTBNFFN(nn.Module):
    """Pretrained ViT-B/16 feature extractor plus a BatchNorm MLP classifier.

    The torchvision model's final ``heads.head`` layer is swapped for
    ``nn.Identity``; a ``FeedForward`` block and a linear layer then map the
    backbone output to ``num_classes`` logits.

    Args:
        num_classes: number of output classes.
    """

    def __init__(self, num_classes):
        super().__init__()
        # The ``pretrained`` flag is deprecated in torchvision; naming the
        # ImageNet weight set explicitly loads the same checkpoint.
        self.vit = vit_b_16(weights="IMAGENET1K_V1")

        # Width consumed by the original head, read before it is replaced.
        in_features = self.vit.heads.head.in_features

        # Drop the original classifier so the backbone returns its features.
        self.vit.heads.head = nn.Identity()

        # New head: BN feed-forward block, then the task classifier.
        self.feedforward = FeedForward(dim=in_features, hidden_dim=512, dropout=0.1)
        self.classifier = nn.Linear(in_features, num_classes)

    def forward(self, x):
        """Compute class logits for the image batch ``x``."""
        features = self.vit(x)
        features = self.feedforward(features)
        return self.classifier(features)

In [None]:
# Paths and parameters
train_dir = '/content/drive/MyDrive/ECG_Data/train'
test_dir = '/content/drive/MyDrive/ECG_Data/test'
batch_size = 8
# NOTE(review): 1e-3 is a large step for fine-tuning every ViT weight; the
# recorded run for this configuration stays at 0.25 test accuracy -- confirm
# with a smaller learning rate.
learning_rate = 0.001
num_epochs = 10
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Transforms
# Inputs are normalised with the ImageNet channel statistics that the
# pretrained backbone was trained with, instead of a single 0.5/0.5 pair
# broadcast over the three RGB channels.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
])

# Load datasets
train_dataset = ECGImageDataset(train_dir, transform=transform)
test_dataset = ECGImageDataset(test_dir, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4, pin_memory=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Initialize model, loss, and optimizer
model = ViTBNFFN(num_classes=len(train_dataset.classes)).to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate)


In [None]:
# Initialize scaler for mixed precision training.
# torch.cuda.amp.GradScaler() is deprecated (see the warning echoed in this
# cell's output); the device-generic constructor replaces it. Loss scaling is
# only enabled when training on CUDA.
scaler = torch.amp.GradScaler(device.type, enabled=(device.type == 'cuda'))

def train_epoch(model, dataloader, optimizer, criterion, device, scaler):
    """Run one mixed-precision training pass and return the mean batch loss.

    Args:
        model: module to optimise; put into training mode first.
        dataloader: iterable of ``(images, labels)`` tensor batches.
        optimizer: optimizer stepped once per batch via ``scaler``.
        criterion: loss callable taking ``(outputs, labels)``.
        device: device (or device string) each batch is moved to.
        scaler: gradient scaler exposing ``scale``, ``step`` and ``update``.

    Returns:
        Mean of the per-batch losses as a float; ``nan`` if there were no
        batches.
    """
    model.train()
    # Only autocast on CUDA; on other devices the context is a no-op.
    amp_enabled = torch.device(device).type == 'cuda'
    running_loss = 0.0
    batches_seen = 0
    for images, labels in dataloader:
        images, labels = images.to(device), labels.to(device)

        optimizer.zero_grad()
        # Replaces the deprecated torch.cuda.amp.autocast().
        with torch.autocast(device_type='cuda', enabled=amp_enabled):
            outputs = model(images)
            loss = criterion(outputs, labels)
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()

        running_loss += loss.item()
        batches_seen += 1

    if batches_seen == 0:
        # Nothing was trained on; avoid a ZeroDivisionError.
        return float('nan')
    return running_loss / batches_seen

  scaler = torch.cuda.amp.GradScaler()


In [None]:
# Testing loop
def test_model(model, dataloader, device):
    """Evaluate ``model`` on ``dataloader`` and return its accuracy.

    Args:
        model: module to evaluate; put into evaluation mode first.
        dataloader: iterable of ``(images, labels)`` tensor batches.
        device: device each batch is moved to before the forward pass.

    Returns:
        Fraction of correctly classified samples as a float, or ``nan`` when
        the loader yields no samples.
    """
    model.eval()
    correct = 0
    total = 0
    with torch.no_grad():
        for images, labels in dataloader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            # Predicted class = index of the largest logit in each row.
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    if total == 0:
        # Nothing was evaluated; avoid a ZeroDivisionError.
        return float('nan')
    return correct / total


# The ``test_`` prefix would make pytest collect this helper as a test if the
# code is ever imported by a test module; the marker opts it out.
test_model.__test__ = False

# Main training process: train for one epoch, evaluate on the test loader,
# then report both numbers.
for epoch in range(num_epochs):
    train_loss = train_epoch(
        model=model,
        dataloader=train_loader,
        optimizer=optimizer,
        criterion=criterion,
        device=device,
        scaler=scaler,
    )
    accuracy = test_model(model=model, dataloader=test_loader, device=device)
    print(f"Epoch {epoch + 1}/{num_epochs}, Loss: {train_loss:.4f}, Accuracy: {accuracy:.4f}")

  with torch.cuda.amp.autocast():  # Use mixed precision


Epoch 1/10, Loss: 0.9928, Accuracy: 0.2500
Epoch 2/10, Loss: 0.7610, Accuracy: 0.2500
Epoch 3/10, Loss: 0.7120, Accuracy: 0.2500
Epoch 4/10, Loss: 0.6063, Accuracy: 0.2500
Epoch 5/10, Loss: 0.5245, Accuracy: 0.2500
Epoch 6/10, Loss: 0.4821, Accuracy: 0.2500
Epoch 7/10, Loss: 0.5112, Accuracy: 0.2500
Epoch 8/10, Loss: 0.4195, Accuracy: 0.2500


In [None]:
import torch
import torch.nn as nn
from torchvision import transforms, datasets
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import os

KeyboardInterrupt: 

In [None]:
# Patch Embedding class for 224x224 input image
class PatchEmbedding(nn.Module):
    """Cut an image into non-overlapping square patches and embed each one.

    A convolution whose kernel size and stride both equal ``patch_size``
    produces one ``embed_dim``-vector per patch.

    Args:
        img_size: side length used to compute ``n_patches``.
        patch_size: side length of each square patch.
        in_channels: number of input image channels.
        embed_dim: size of each patch embedding.
    """

    def __init__(self, img_size=224, patch_size=16, in_channels=1, embed_dim=64):
        super().__init__()
        self.img_size = img_size
        self.patch_size = patch_size
        patches_per_side = img_size // patch_size
        self.n_patches = patches_per_side * patches_per_side
        self.proj = nn.Conv2d(
            in_channels=in_channels,
            out_channels=embed_dim,
            kernel_size=patch_size,
            stride=patch_size,
        )

    def forward(self, x):
        """Return patch embeddings with the patch axis before the channel axis."""
        # conv output -> merge the two spatial axes -> swap to (batch, patches, embed)
        return self.proj(x).flatten(start_dim=2).transpose(1, 2)

In [None]:
# Vision Transformer class
class SimpleViT(nn.Module):
    """Small Vision Transformer classifier built from ``PatchEmbedding``.

    A learnable class token is prepended to the patch embeddings, a learnable
    positional embedding is added, the sequence goes through a stack of
    ``nn.TransformerEncoderLayer`` blocks, and the class-token output is fed
    to a linear classifier.

    Args:
        img_size: image side length passed to ``PatchEmbedding``.
        patch_size: patch side length passed to ``PatchEmbedding``.
        in_channels: number of image channels.
        embed_dim: token embedding size (the encoder's ``d_model``).
        num_heads: attention heads per encoder layer.
        num_layers: number of encoder layers.
        num_classes: number of output logits.
    """

    def __init__(self, img_size=224, patch_size=16, in_channels=1, embed_dim=64, num_heads=4, num_layers=2, num_classes=4):
        super().__init__()
        self.patch_embed = PatchEmbedding(img_size, patch_size, in_channels, embed_dim)
        # Both learnable tensors start at zero; the "+ 1" slot is for the class token.
        self.cls_token = nn.Parameter(torch.zeros(1, 1, embed_dim))
        sequence_length = self.patch_embed.n_patches + 1
        self.pos_embed = nn.Parameter(torch.zeros(1, sequence_length, embed_dim))
        encoder_layer = nn.TransformerEncoderLayer(d_model=embed_dim, nhead=num_heads, batch_first=True)
        self.transformer = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
        self.fc = nn.Linear(embed_dim, num_classes)

    def forward(self, x):
        """Return class logits for the image batch ``x``."""
        tokens = self.patch_embed(x)
        batch_size = tokens.shape[0]
        # One copy of the class token per sample, placed at sequence position 0.
        cls_tokens = self.cls_token.expand(batch_size, -1, -1)
        tokens = torch.cat((cls_tokens, tokens), dim=1) + self.pos_embed
        encoded = self.transformer(tokens)
        # Classify from the class-token position only.
        return self.fc(encoded[:, 0])

In [None]:
# Define transformations for ECG dataset
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),  # Ensure grayscale
    transforms.Resize((224, 224)),  # Resize to 224x224
    transforms.ToTensor(),
    transforms.Normalize((0.5,), (0.5,))  # Normalize to [-1, 1]
])

# Load ECG dataset
data_dir = "/content/drive/MyDrive/ECG_Data"  # Replace with your dataset path
train_dir = os.path.join(data_dir, "train")
test_dir = os.path.join(data_dir, "test")

train_dataset = datasets.ImageFolder(train_dir, transform=transform)
test_dataset = datasets.ImageFolder(test_dir, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16, shuffle=False)

# Initialize model
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Size the classifier from the dataset instead of a hard-coded 4, so the
# number of logits always matches the class folders that were found.
model = SimpleViT(
    img_size=224,
    patch_size=16,
    in_channels=1,
    embed_dim=64,
    num_classes=len(train_dataset.classes),
).to(device)

# Define loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Training loop with stopping condition
epochs = 50  # Maximum number of epochs
stop_accuracy = 99.0  # Target accuracy (percent) to stop training
model.train()

In [None]:
for epoch in range(epochs):
    # Re-enter training mode every epoch. The evaluation code further down
    # this cell calls model.eval(), so re-running the cell would otherwise
    # train with dropout disabled.
    model.train()
    epoch_loss = 0  # sum of the per-batch losses (not a mean)
    correct = 0
    total = 0

    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

    # Accuracy here is measured on the training batches, in percent.
    accuracy = 100 * correct / total
    print(f"Epoch {epoch+1}/{epochs}, Loss: {epoch_loss:.4f}, Accuracy: {accuracy:.2f}%")

    # Stop if accuracy is greater than or equal to the threshold
    if accuracy >= stop_accuracy:
        print(f"Stopping training as accuracy reached {accuracy:.2f}% in epoch {epoch+1}.")
        break

# Evaluation on test dataset: count correct predictions without tracking
# gradients, then report the percentage.
model.eval()
test_correct, test_total = 0, 0
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        outputs = model(images)
        # Predicted class = index of the largest logit in each row.
        predicted = outputs.argmax(dim=1)
        test_total += labels.size(0)
        test_correct += (predicted == labels).sum().item()

test_accuracy = 100 * test_correct / test_total
print(f"Test Accuracy: {test_accuracy:.2f}%")

# Visualize a sample image
def visualize_image(image_tensor, label, predicted_label=None):
    """Show ``image_tensor`` as a grayscale image titled with its label(s).

    Args:
        image_tensor: tensor to display; size-1 dimensions are squeezed away
            before plotting.
        label: true label shown in the title.
        predicted_label: optional prediction appended to the title.
    """
    caption = f"True Label: {label}"
    if predicted_label is not None:
        caption += f", Predicted: {predicted_label}"

    pixels = image_tensor.squeeze().cpu().numpy()
    plt.imshow(pixels, cmap='gray')
    plt.title(caption)
    plt.axis('off')
    plt.show()

# Visualize one sample: run the first training image through the model and
# plot it with its true and predicted class names.
sample_image, sample_label = train_dataset[0]
sample_image = sample_image.unsqueeze(0).to(device)  # add a batch dimension
with torch.no_grad():
    output = model(sample_image)
predicted_label = output.argmax(dim=1)

visualize_image(
    sample_image.cpu(),
    train_dataset.classes[sample_label],
    train_dataset.classes[predicted_label.item()],
)