In [1]:
!pip install torch torchvision torchaudio



In [2]:
import matplotlib.pyplot as plt
import numpy as np
import torch
import torchvision.transforms as transforms
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from htru1 import HTRU1
from torch.utils.data import WeightedRandomSampler
import random

In [3]:
# Set random seed for reproducibility
def set_seed(seed=42):
    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
        torch.backends.cudnn.deterministic = True
        torch.backends.cudnn.benchmark = False

set_seed(42)

In [4]:
# Define the transformation to apply
transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(5),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.RandomCrop(32, padding=4),
    transforms.ToTensor(),
    transforms.Normalize((0.5, 0.5, 0.5), (0.5, 0.5, 0.5))
])

In [5]:
# Load the training and test datasets
trainset = HTRU1(root='./data', train=True, download=True, transform=transform)
testset = HTRU1(root='./data', train=False, download=True, transform=transform)

Files already downloaded and verified
Files already downloaded and verified


In [6]:
# Calculate class counts and weights for sampling
class_counts = [sum(1 for label in trainset.targets if label == 0),
                sum(1 for label in trainset.targets if label == 1)]
class_weights = [1.0 / count for count in class_counts]
sample_weights = [class_weights[label] for label in trainset.targets]
sampler = WeightedRandomSampler(sample_weights, len(sample_weights))

# Create data loaders
trainloader = torch.utils.data.DataLoader(trainset, batch_size=64, sampler=sampler, num_workers=2)
testloader = torch.utils.data.DataLoader(testset, batch_size=64, shuffle=False, num_workers=2)

classes = ('nonpulsar', 'pulsar')

In [7]:
# Define the neural network with increased complexity
class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        # Adding more layers and filters
        self.conv1 = nn.Conv2d(3, 16, 5)
        self.conv2 = nn.Conv2d(16, 32, 5)
        self.conv3 = nn.Conv2d(32, 64, 3)
        # Change kernel size to (2, 2) and stride to 1
        self.conv4 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=2, stride=1, padding=1)  # Added another convolutional layer
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1_input_size = None  # Placeholder to calculate dynamically
        self.fc1 = nn.Linear(128 * 3 * 3, 512)  # Adjusted size to accommodate new filters
        self.fc2 = nn.Linear(512, 256)
        self.fc3 = nn.Linear(256, 128)
        self.fc4 = nn.Linear(128, 2)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = self.pool(F.relu(self.conv4(x)))  # Added new convolutional layer

        if self.fc1_input_size is None:
            self.fc1_input_size = x.view(x.size(0), -1).size(1)
            self.fc1 = nn.Linear(self.fc1_input_size, 512)
            print(f"Dynamically adjusted fc1 input size: {self.fc1_input_size}")

        x = x.view(x.size(0), -1)
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x

In [None]:
# Main training and evaluation code
if __name__ == '__main__':
    net = Net()
    device = torch.device('cpu')
    net.to(device)

    pulsar_weight = 1
    non_pulsar_weight = 1.5
    class_weights = torch.tensor([non_pulsar_weight, pulsar_weight]).to(device)
    criterion = nn.CrossEntropyLoss(weight=class_weights)
    optimizer = optim.Adam(net.parameters(), lr=0.001)

    # Learning Rate Scheduler: Reduce LR when validation loss plateaus
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3, verbose=True)

    # Early stopping parameters
    best_val_loss = float('inf')
    patience = 8
    patience_counter = 0
    nepoch = 10  # Maximum epochs

    # Lists to track loss
    train_losses = []
    val_losses = []

    for epoch in range(nepoch):
        net.train()
        running_loss = 0.0

        for i, data in enumerate(trainloader, 0):
            inputs, labels = data
            inputs, labels = inputs.to(device), labels.to(device)
            optimizer.zero_grad()
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()

        train_losses.append(running_loss / len(trainloader))

        # Validation step
        net.eval()
        val_loss = 0.0
        with torch.no_grad():
            for data in testloader:
                inputs, labels = data
                inputs, labels = inputs.to(device), labels.to(device)
                outputs = net(inputs)
                loss = criterion(outputs, labels)
                val_loss += loss.item()

        val_loss /= len(testloader)
        val_losses.append(val_loss)
        print(f"Epoch {epoch + 1}, Train Loss: {train_losses[-1]:.4f}, Val Loss: {val_loss:.4f}")

        # Check for early stopping and save the model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            patience_counter = 0
            torch.save(net.state_dict(), 'best_model.pth')  # Save best model
            print(f"Model saved at epoch {epoch + 1}")
        else:
            patience_counter += 1

        if patience_counter >= patience:
            print(f"Early stopping triggered at epoch {epoch + 1}")
            break

        # Step the scheduler based on validation loss
        scheduler.step(val_loss)

    print('Finished Training')

    # Plotting learning curves
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()
    plt.title('Learning Curves')
    plt.show()

    # Load the best model for evaluation
    net.load_state_dict(torch.load('best_model.pth', weights_only=True))
    net.eval()  # Set to evaluation mode

    # Evaluation
    correct = 0
    total = 0
    class_correct = [0, 0]
    class_total = [0, 0]

    with torch.no_grad():
        for data in testloader:
            images, labels = data
            images, labels = images.to(device), labels.to(device)
            outputs = net(images)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

            for i in range(len(labels)):
                label = labels[i]
                class_correct[label] += (predicted[i] == label).item()
                class_total[label] += 1

    print(f'Accuracy of the network on the test images: {100 * correct / total:.2f}%')
    for i in range(2):
        print(f'Accuracy of {classes[i]}: {100 * class_correct[i] / class_total[i]:.2f}%')




Dynamically adjusted fc1 input size: 128
Epoch 1, Train Loss: 0.1817, Val Loss: 0.1770
Model saved at epoch 1
Epoch 2, Train Loss: 0.0868, Val Loss: 0.0737
Model saved at epoch 2
Epoch 3, Train Loss: 0.0629, Val Loss: 0.0484
Model saved at epoch 3
Epoch 4, Train Loss: 0.0516, Val Loss: 0.0770
Epoch 5, Train Loss: 0.0440, Val Loss: 0.0662
Epoch 6, Train Loss: 0.0379, Val Loss: 0.0521
Epoch 7, Train Loss: 0.0354, Val Loss: 0.0712
Epoch 8, Train Loss: 0.0266, Val Loss: 0.0494
Epoch 9, Train Loss: 0.0238, Val Loss: 0.0498
Epoch 10, Train Loss: 0.0230, Val Loss: 0.0533
Finished Training
