In [3]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from torchvision import datasets, transforms

import numpy as np
import matplotlib.pyplot as plt

import os

In [None]:
# Prefer the GPU when CUDA is usable; otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

print("Using PyTorch version:", torch.__version__, " Device:", device)

In [None]:
import torch
from torchvision import datasets, transforms
from torch.utils.data import DataLoader, random_split

batch_size = 32
split_seed = 42  # fixes the train/validation partition across kernel restarts

# Preprocessing applied to every image as it is loaded.
custom_transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),  # Convert images to grayscale
    transforms.Resize((227, 227)),               # Resize images to 227x227
    transforms.ToTensor(),                       # Convert image to tensor and scale pixel values to [0, 1]
])

# Load the dataset using ImageFolder
dataset = datasets.ImageFolder(
    root="data/DatasetWave_1",  # Replace with the path to your dataset
    transform=custom_transform
)

# 80% of the samples go to training, the remainder to validation.
train_ratio = 0.8
train_size = int(train_ratio * len(dataset))
val_size = len(dataset) - train_size

# Seed the split explicitly. Without a generator the partition is drawn from
# the global RNG, so every kernel restart would train and validate on
# different samples and results could not be compared between runs.
train_dataset, val_dataset = random_split(
    dataset,
    [train_size, val_size],
    generator=torch.Generator().manual_seed(split_seed),
)

# Print dataset sizes
print(f"Total dataset size: {len(dataset)}")
print(f"Training dataset size: {len(train_dataset)}")
print(f"Validation dataset size: {len(val_dataset)}")

# Training batches are reshuffled every epoch; validation order is kept fixed.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
validation_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

# Sanity check: show the tensor shapes of the first training batch.
for images, labels in train_loader:
    print(f"Train batch - Images: {images.shape}, Labels: {labels.shape}")
    break

# Sanity check: show the tensor shapes of the first validation batch.
for images, labels in validation_loader:
    print(f"Validation batch - Images: {images.shape}, Labels: {labels.shape}")
    break

In [None]:
from collections import Counter

# `train_dataset` comes from random_split, so `train_dataset.dataset` is the
# FULL ImageFolder. Restrict the labels to the subset's own indices so the
# counts describe the training split rather than the whole dataset.
all_targets = train_dataset.dataset.targets
class_counts = Counter(all_targets[i] for i in train_dataset.indices)
class_names = train_dataset.dataset.classes

# Report in class-index order; iterating the Counter directly would list the
# classes in the order they were first encountered in the shuffled subset.
for class_idx in sorted(class_counts):
    print(f"Class: {class_names[class_idx]}, Count: {class_counts[class_idx]}")

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Net(nn.Module):
    """Fully connected classifier operating on flattened inputs.

    Three hidden stages (Linear -> BatchNorm1d -> ReLU -> Dropout) of widths
    2048, 1024 and 512 feed a final linear layer whose outputs are returned
    as log-probabilities.

    Args:
        input_size: Number of features per sample after flattening. Defaults
            to 1 * 227 * 227, i.e. one 227x227 single-channel image.
        num_classes: Number of output classes (default 17).
        dropout: Dropout probability applied after each hidden stage
            (default 0.3).
    """

    def __init__(self, input_size=1 * 227 * 227, num_classes=17, dropout=0.3):
        super().__init__()
        # Attribute names and registration order are kept stable because the
        # parameter names (fc1.weight, bn1.bias, ...) derive from them.
        self.fc1 = nn.Linear(input_size, 2048)
        self.bn1 = nn.BatchNorm1d(2048)
        self.fc1_drop = nn.Dropout(dropout)

        self.fc2 = nn.Linear(2048, 1024)
        self.bn2 = nn.BatchNorm1d(1024)
        self.fc2_drop = nn.Dropout(dropout)

        self.fc3 = nn.Linear(1024, 512)
        self.bn3 = nn.BatchNorm1d(512)
        self.fc3_drop = nn.Dropout(dropout)

        self.fc4 = nn.Linear(512, num_classes)

    def forward(self, x):
        """Return log-probabilities of shape (batch, num_classes).

        Every dimension after the first is flattened, so any input whose
        per-sample element count equals ``input_size`` is accepted.
        """
        # flatten() copies when the input is not contiguous, where view()
        # would raise a RuntimeError.
        x = x.flatten(start_dim=1)

        x = self.fc1_drop(F.relu(self.bn1(self.fc1(x))))
        x = self.fc2_drop(F.relu(self.bn2(self.fc2(x))))
        x = self.fc3_drop(F.relu(self.bn3(self.fc3(x))))

        return F.log_softmax(self.fc4(x), dim=1)

# Instantiate the model and move it to the selected device.
model = Net().to(device)

# Adam optimizer over all of the model's parameters.
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Net.forward already returns log-probabilities (F.log_softmax), so the
# matching loss is NLLLoss. CrossEntropyLoss applies log_softmax itself and
# would therefore normalize the outputs a second time.
criterion = nn.NLLLoss()

print(model)

In [8]:
def train(epoch, log_interval=200, *, net=None, loader=None, opt=None,
          loss_fn=None, run_device=None):
    """Run one training epoch and print the loss every ``log_interval`` batches.

    Args:
        epoch: Epoch number, used only in the progress message.
        log_interval: A progress line is printed for every batch whose index
            is a multiple of this value.
        net, loader, opt, loss_fn, run_device: Optional keyword-only
            overrides. When omitted, the notebook-level ``model``,
            ``train_loader``, ``optimizer``, ``criterion`` and ``device``
            are used, so ``train(epoch)`` behaves as before.
    """
    net = model if net is None else net
    loader = train_loader if loader is None else loader
    opt = optimizer if opt is None else opt
    loss_fn = criterion if loss_fn is None else loss_fn
    run_device = device if run_device is None else run_device

    net.train()
    total_samples = len(loader.dataset)
    num_batches = len(loader)
    # Samples processed before the current batch. Counting explicitly keeps
    # the progress figure right even when the logged batch is a short one,
    # where `batch_idx * len(data)` would under-report.
    samples_seen = 0
    for batch_idx, (data, target) in enumerate(loader):
        data = data.to(run_device)
        target = target.to(run_device)
        opt.zero_grad()
        loss = loss_fn(net(data), target)
        loss.backward()
        opt.step()
        if batch_idx % log_interval == 0:
            print(
                f"Train Epoch: {epoch} [{samples_seen}/{total_samples} "
                f"({100.0 * batch_idx / num_batches:.0f}%)]\tLoss: {loss.item():.6f}"
            )
        samples_seen += len(data)

def validate(loss_vector, accuracy_vector, *, net=None, loader=None,
             loss_fn=None, run_device=None):
    """Evaluate the model on the validation data and record the results.

    Appends the average per-sample loss to ``loss_vector`` and the accuracy
    (in percent) to ``accuracy_vector``, then prints a one-line summary.

    Args:
        loss_vector: List receiving the average validation loss.
        accuracy_vector: List receiving the validation accuracy in percent.
        net, loader, loss_fn, run_device: Optional keyword-only overrides.
            When omitted, the notebook-level ``model``, ``validation_loader``,
            ``criterion`` and ``device`` are used, so
            ``validate(lossv, accv)`` behaves as before.

    Raises:
        ValueError: If the loader yields no samples.
    """
    net = model if net is None else net
    loader = validation_loader if loader is None else loader
    loss_fn = criterion if loss_fn is None else loss_fn
    run_device = device if run_device is None else run_device

    net.eval()
    loss_sum = 0.0
    correct = 0
    num_samples = 0
    with torch.no_grad():  # no gradients are needed for evaluation
        for data, target in loader:
            data = data.to(run_device)
            target = target.to(run_device)
            output = net(data)
            n_batch = target.size(0)
            # Weight each batch's loss by its size so a short final batch does
            # not count as much as a full one.
            # Assumes loss_fn returns the batch mean - TODO confirm if a
            # different reduction is ever passed in.
            loss_sum += loss_fn(output, target).item() * n_batch
            correct += (output.argmax(dim=1) == target).sum().item()
            num_samples += n_batch

    if num_samples == 0:
        raise ValueError("validation loader produced no samples")

    val_loss = loss_sum / num_samples
    accuracy = 100.0 * correct / num_samples
    loss_vector.append(val_loss)
    accuracy_vector.append(accuracy)

    print(
        f"\nValidation set: Average loss: {val_loss:.4f}, "
        f"Accuracy: {correct}/{num_samples} ({accuracy:.2f}%)\n"
    )


In [None]:
# Validation loss and accuracy, one entry appended per epoch by validate().
lossv = []
accv = []
epochs = 15

# Alternate one training pass with one validation pass per epoch.
for epoch in range(1, epochs + 1):
    print(f"Epoch {epoch}")
    train(epoch)
    validate(lossv, accv)


In [9]:
# Snapshot every named parameter as a (name, NumPy array) pair held on the CPU.
weights = [
    (param_name, tensor.detach().cpu().numpy())
    for param_name, tensor in model.named_parameters()
]

In [None]:
# Summarize each parameter by name, shape and dtype instead of echoing the
# raw arrays, which are too large to read usefully in cell output.
[(name, values.shape, values.dtype) for name, values in weights]

In [None]:
model.eval()

# Running mean / variance of each BatchNorm layer, paired with its label.
bn_report = (
    ("BatchNorm1 (mean):", model.bn1.running_mean),
    ("BatchNorm1 (var):", model.bn1.running_var),
    ("BatchNorm2 (mean):", model.bn2.running_mean),
    ("BatchNorm2 (var):", model.bn2.running_var),
    ("BatchNorm3 (mean):", model.bn3.running_mean),
    ("BatchNorm3 (var):", model.bn3.running_var),
)
for label, stat in bn_report:
    print(label, stat)

In [None]:
# Print the name of every captured parameter, one per line.
for param_name, _ in weights:
    print(param_name)

In [None]:
# Write every captured parameter to "../model/<parameter name>.npy".
# Entries whose name contains "bias" gain a leading axis of length 1; all
# other entries are stored transposed.
#
# The array being written gets its own name (`export_array`). Rebinding
# `weights` inside the loop would replace the list with the last saved array
# and break any later re-run of the cells that iterate over `weights`.
os.makedirs("../model/", exist_ok=True)  # open() below fails if the directory is missing

for param_name, values in weights:
    if "bias" in param_name:
        export_array = np.expand_dims(values, axis=0)
    else:
        export_array = values.T

    print("shape")
    print(export_array.shape)

    # np.save is given a C-contiguous copy; the transpose above is only a view.
    with open("../model/" + str(param_name) + ".npy", "wb") as f:
        np.save(f, np.ascontiguousarray(export_array))

In [None]:
import torch
import numpy as np

# The model is expected to exist already; switch it to evaluation mode.
model.eval()

# Copy each BatchNorm layer's running mean and variance to NumPy on the CPU.
bn1_mean, bn1_var = (
    buf.cpu().numpy() for buf in (model.bn1.running_mean, model.bn1.running_var)
)
bn2_mean, bn2_var = (
    buf.cpu().numpy() for buf in (model.bn2.running_mean, model.bn2.running_var)
)
bn3_mean, bn3_var = (
    buf.cpu().numpy() for buf in (model.bn3.running_mean, model.bn3.running_var)
)

def save_stats(stats, filename):
    """Save ``stats`` to ``filename`` as a .npy array with a leading axis of 1.

    Args:
        stats: Array-like of values to store.
        filename: Destination passed to ``np.save``.
    """
    # Prepend the length-1 axis and force a C-contiguous layout in one step.
    row_vector = np.ascontiguousarray(np.expand_dims(stats, axis=0))
    print(f"Shape of {filename}: {row_vector.shape}")
    np.save(filename, row_vector)

# Write each statistic to its own file under ../model/.
stats_to_save = (
    (bn1_mean, '../model/bn1_mean.npy'),
    (bn1_var, '../model/bn1_var.npy'),
    (bn2_mean, '../model/bn2_mean.npy'),
    (bn2_var, '../model/bn2_var.npy'),
    (bn3_mean, '../model/bn3_mean.npy'),
    (bn3_var, '../model/bn3_var.npy'),
)
for stats_array, stats_path in stats_to_save:
    save_stats(stats_array, stats_path)

print("BatchNorm statistics saved as numpy files.")

In [None]:
def save_class_0_image(dataloader, filename, target_class=0):
    """Save the first image labelled ``target_class`` found in ``dataloader``.

    The image's dimensions after the first are flattened before saving, so a
    1 x H x W image is stored with shape (1, H*W) regardless of H and W.

    Args:
        dataloader: Iterable yielding ``(images, labels)`` tensor batches.
        filename: Destination passed to ``np.save``.
        target_class: Class index to search for. Defaults to 0, which keeps
            the behaviour the function name describes.

    Returns:
        None. When no image of the requested class exists, nothing is written
        and a notice is printed instead of returning silently.
    """
    for images, labels in dataloader:
        for i, label in enumerate(labels):
            if label.item() != target_class:
                continue
            image_np = images[i].flatten(start_dim=1).cpu().numpy()
            print(image_np.shape)
            np.save(filename, image_np)
            print(f"Image from class {target_class} saved as {filename}")
            return
    print(f"No image from class {target_class} found; nothing saved to {filename}")

# Save one class-0 image taken from the validation DataLoader
save_class_0_image(validation_loader, 'val_class_0_image.npy')

In [None]:
filename = 'images_mlp/val_class_16_image.npy'  # Replace with the path to your .npy file
loaded_array = np.load(filename)

# Only a flattened 227x227 image is accepted, stored either as a row vector
# of shape (1, 227*227) or as a flat vector of shape (227*227,).
num_pixels = 227 * 227
if loaded_array.shape not in ((1, num_pixels), (num_pixels,)):
    raise ValueError("Unexpected array shape")
image = loaded_array.reshape(227, 227)

# Render the image in grayscale without axes.
plt.imshow(image, cmap='gray')
plt.title('Image from Class 16')
plt.axis('off')
plt.show()