In [2]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from torch.utils.data import Dataset, DataLoader, random_split
# from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

# Check for CUDA
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')


Using device: cuda


In [25]:
from tqdm import tqdm

In [3]:
class PulsarDataset(Dataset):
    def __init__(self, data, labels, transform=None):
        self.data = data
        self.labels = labels
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image = self.data[idx]
        label = self.labels[idx]

        if self.transform:
            image = self.transform(image)

        return image, label

pulsar_file_path = 'pulsar_final.npz'
non_pulsar_file_path = 'non_pulsar_final.npz'

In [4]:
pulsar_file_path = 'pulsar_final.npz'
non_pulsar_file_path = 'non_pulsar_final.npz'

with np.load(pulsar_file_path) as data:
    pulsar_images = data['images']
    pulsar_labels = data['labels']

with np.load(non_pulsar_file_path) as data:
    non_pulsar_images = data['images']
    non_pulsar_labels = data['labels']

In [5]:
augment_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop(32, scale=(0.8, 1.0)),
    transforms.ToTensor()
])

# No transformation for non-pulsar images
no_transform = transforms.Compose([
    transforms.ToPILImage(),
    transforms.ToTensor()
])

In [6]:
pulsar_dataset = PulsarDataset(pulsar_images, pulsar_labels, transform=augment_transform)
non_pulsar_dataset = PulsarDataset(non_pulsar_images, non_pulsar_labels, transform=no_transform)

In [7]:
num_pulsar = len(pulsar_images)
num_non_pulsar = len(non_pulsar_images)
augmentations_needed = num_non_pulsar - num_pulsar

In [8]:
augmented_pulsar_images = []
augmented_pulsar_labels = []

In [9]:
for _ in range(augmentations_needed // num_pulsar):
    for i in range(num_pulsar):
        augmented_image = augment_transform(pulsar_images[i])
        augmented_pulsar_images.append(augmented_image.numpy())
        augmented_pulsar_labels.append(1)

In [15]:
print("Pulsar: ",pulsar_images.shape)
print("Augmented Pulsar:",augmented_pulsar_images.shape)

Pulsar:  (1196, 32, 32, 3)
Augmented Pulsar: (88504, 32, 32, 3)


In [14]:
augmented_pulsar_images = np.transpose(augmented_pulsar_images, (0, 2, 3, 1))


In [16]:
augmented_pulsar_images = np.array(augmented_pulsar_images)
augmented_pulsar_labels = np.array(augmented_pulsar_labels)

# Combine original and augmented pulsar data
all_pulsar_images = np.concatenate((pulsar_images, augmented_pulsar_images), axis=0)
all_pulsar_labels = np.concatenate((pulsar_labels, augmented_pulsar_labels), axis=0)

In [17]:
# Combine all data
all_images = np.concatenate((all_pulsar_images, non_pulsar_images), axis=0)
all_labels = np.concatenate((all_pulsar_labels, non_pulsar_labels), axis=0)

In [19]:
# Create the final dataset
final_dataset = PulsarDataset(all_images, all_labels, transform=no_transform)

# Split the dataset into training and testing sets
train_size = int(0.8 * len(final_dataset))
test_size = len(final_dataset) - train_size
train_dataset, test_dataset = random_split(final_dataset, [train_size, test_size])

In [20]:
# Dataloaders
batch_size = 64
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

In [21]:
class SimpleCNN(nn.Module):
    def __init__(self):
        super(SimpleCNN, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.fc1 = nn.Linear(64 * 8 * 8, 128)
        self.fc2 = nn.Linear(128, 1)

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.max_pool2d(x, 2)
        x = torch.relu(self.conv2(x))
        x = torch.max_pool2d(x, 2)
        x = x.view(-1, 64 * 8 * 8)
        x = torch.relu(self.fc1(x))
        x = torch.sigmoid(self.fc2(x))
        return x

In [22]:
model = SimpleCNN().to(device)
criterion = nn.BCELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [29]:
# Training loop
num_epochs = 10

for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for images, labels in tqdm(train_loader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        images = images.float().to(device)
        labels = labels.float().view(-1, 1).to(device)

        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

    print(f'Epoch [{epoch + 1}/{num_epochs}], Loss: {running_loss/len(train_loader):.4f}')

print('Finished Training')

Epoch 1/10: 100%|██████████| 2247/2247 [00:36<00:00, 61.59it/s]


Epoch [1/10], Loss: 0.0475


Epoch 2/10: 100%|██████████| 2247/2247 [00:38<00:00, 59.10it/s]


Epoch [2/10], Loss: 0.0427


Epoch 3/10: 100%|██████████| 2247/2247 [00:34<00:00, 64.83it/s]


Epoch [3/10], Loss: 0.0502


Epoch 4/10: 100%|██████████| 2247/2247 [00:34<00:00, 65.46it/s]


Epoch [4/10], Loss: 0.0436


Epoch 5/10: 100%|██████████| 2247/2247 [00:35<00:00, 64.16it/s]


Epoch [5/10], Loss: 0.0416


Epoch 6/10: 100%|██████████| 2247/2247 [00:38<00:00, 57.94it/s]


Epoch [6/10], Loss: 0.0393


Epoch 7/10: 100%|██████████| 2247/2247 [00:42<00:00, 53.39it/s]


Epoch [7/10], Loss: 0.0366


Epoch 8/10: 100%|██████████| 2247/2247 [00:35<00:00, 62.93it/s]


Epoch [8/10], Loss: 0.0348


Epoch 9/10: 100%|██████████| 2247/2247 [00:35<00:00, 62.84it/s]


Epoch [9/10], Loss: 0.0292


Epoch 10/10: 100%|██████████| 2247/2247 [00:39<00:00, 56.63it/s]

Epoch [10/10], Loss: 0.0300
Finished Training





In [30]:
# Evaluation
model.eval()
all_preds = []
all_labels = []

with torch.no_grad():
    for images, labels in test_loader:
        images = images.float().to(device)
        labels = labels.float().view(-1, 1).to(device)
        outputs = model(images)
        preds = (outputs > 0.5).float()
        all_preds.extend(preds.cpu().numpy())
        all_labels.extend(labels.cpu().numpy())

# Assuming `all_preds` and `all_labels` are populated as shown in your code snippet

all_preds = np.array(all_preds)
all_labels = np.array(all_labels)

# Accuracy
correct_predictions = np.sum(all_preds == all_labels)
total_predictions = all_labels.size
accuracy = correct_predictions / total_predictions

# Precision, Recall, F1 Score
TP = np.sum((all_preds == 1) & (all_labels == 1))
FP = np.sum((all_preds == 1) & (all_labels == 0))
FN = np.sum((all_preds == 0) & (all_labels == 1))

precision = TP / (TP + FP) if (TP + FP) > 0 else 0
recall = TP / (TP + FN) if (TP + FN) > 0 else 0
f1_score = 2 * (precision * recall) / (precision + recall) if (precision + recall) > 0 else 0

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")
print(f"F1 Score: {f1_score:.4f}")



# accuracy = accuracy_score(all_labels, all_preds)
# precision = precision_score(all_labels, all_preds)
# recall = recall_score(all_labels, all_preds)
# f1 = f1_score(all_labels, all_preds)

# print(f'Accuracy: {accuracy:.4f}')
# print(f'Precision: {precision:.4f}')
# print(f'Recall: {recall:.4f}')
# print(f'F1 Score: {f1:.4f}')

Accuracy: 0.9917
Precision: 0.9988
Recall: 0.9847
F1 Score: 0.9917


In [31]:
#save the model
torch.save(model.state_dict(), 'model.pth')