In [3]:
import torch
import torchvision
import torchvision.transforms as transforms, datasets
from torch.utils.data import DataLoader, WeightedRandomSampler
from torchvision import datasets
import torch.nn as nn
import torch.optim as optim
from sklearn.metrics import confusion_matrix, precision_recall_fscore_support, roc_auc_score

In [4]:
# calculate mean and std for data transform
# Define a transform to convert the data into tensors
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.ToTensor()
])

# train dataset
dataset = datasets.ImageFolder(root='chest_xray/chest_xray/train', transform=transform)
dataloader = DataLoader(dataset, batch_size=64, shuffle=True, num_workers=4)

# Function to calculate mean and std
def get_mean_std(loader):
    # Vars to accumulate the sum and sum of squares
    mean = 0.
    std = 0.
    total_images_count = 0

    for images, _ in loader:
        # Rearrange batch to be the shape of [B, C, W * H]
        images = images.view(images.size(0), images.size(1), -1)
        # Update total image count
        total_images_count += images.size(0)
        # Compute mean and std sum
        mean += images.mean(2).sum(0) 
        std += images.std(2).sum(0)

    # Final step
    mean /= total_images_count
    std /= total_images_count

    return mean, std

mean, std = get_mean_std(dataloader)
print(f'Mean: {mean}')
print(f'Std: {std}')


Mean: tensor([0.4823, 0.4823, 0.4823])
Std: tensor([0.2218, 0.2218, 0.2218])


In [6]:
# Set device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Data transforms
transform = transforms.Compose([
    transforms.Resize((256, 256)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(mean=mean, std=std) # global mean and standard deviation of the RGB channels of the ImageNet dataset.
])

# Load data
train_set = torchvision.datasets.ImageFolder(root='chest_xray/chest_xray/train', transform=transform)
val_set = torchvision.datasets.ImageFolder(root='chest_xray/chest_xray/val', transform=transform)
test_set = torchvision.datasets.ImageFolder(root='chest_xray/chest_xray/test', transform=transform)

# Extract labels from the dataset
train_labels = [label for _, label in train_set.imgs]

# Count each class's samples to calculate weights
class_counts = torch.tensor(
    [(torch.tensor(train_labels) == t).sum() for t in torch.unique(torch.tensor(train_labels), sorted=True)]
)

# Calculate weight for each class
weights = 1. / class_counts.float()

# Assign a weight to each sample
sample_weights = torch.tensor([weights[label] for label in train_labels])

# Define the sampler with these sample weights
sampler = WeightedRandomSampler(weights=sample_weights, num_samples=len(sample_weights), replacement=True)


train_loader = DataLoader(train_set, batch_size=32, sampler=sampler)
val_loader = DataLoader(val_set, batch_size=32, shuffle=False)
test_loader = DataLoader(test_set, batch_size=32, shuffle=False)

In [3]:
# Define the CNN architecture
class PneumoniaNet(nn.Module):
    def __init__(self):
        super(PneumoniaNet, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, stride=1, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.dropout = nn.Dropout(0.25)
        self.fc1 = nn.Linear(128 * 32 * 32, 512)
        self.fc2 = nn.Linear(512, 2)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = self.pool(self.relu(self.conv3(x)))
        x = x.view(-1, 128 * 32 * 32)  # Flatten the output
        x = self.dropout(x)
        x = self.relu(self.fc1(x))
        x = self.fc2(x)
        return x

# Model
model = PneumoniaNet().to(device)

# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [4]:
# Training loop
num_epochs = 10
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = images.to(device)
        labels = labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        if (i+1) % 100 == 0:
            print(f'Epoch [{epoch+1}/{num_epochs}], Step [{i+1}/{len(train_loader)}], Loss: {loss.item():.4f}')

torch.save(model, 'model_pneumonianet.pth')


Epoch [1/10], Step [100/163], Loss: 0.0527
Epoch [2/10], Step [100/163], Loss: 0.1684
Epoch [3/10], Step [100/163], Loss: 0.0447
Epoch [4/10], Step [100/163], Loss: 0.0776
Epoch [5/10], Step [100/163], Loss: 0.0088
Epoch [6/10], Step [100/163], Loss: 0.0502
Epoch [7/10], Step [100/163], Loss: 0.0024
Epoch [8/10], Step [100/163], Loss: 0.0036
Epoch [9/10], Step [100/163], Loss: 0.0009
Epoch [10/10], Step [100/163], Loss: 0.0189


In [5]:
def evaluate_model(model, data_loader):    
    # Evaluation
    model.eval()
    y_pred = []
    y_true = []
    y_proba = []
    with torch.no_grad():
        correct = 0
        total = 0
        for images, labels in test_loader:
            images = images.to(device)
            labels = labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs.data, 1)
            y_pred.extend(predicted.cpu().numpy())
            y_true.extend(labels.cpu().numpy())
            probabilities = torch.nn.functional.softmax(outputs, dim=1)
            y_proba.extend(probabilities[:, 1].cpu().numpy())
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
        # Calculate metrics
        accuracy = correct / total
        cm = confusion_matrix(y_true, y_pred)
        precision, recall, f1_score, _ = precision_recall_fscore_support(y_true, y_pred, average='binary')
        TN, FP, FN, TP = cm.ravel()
        specificity = TN / (TN + FP)
        npv = TN / (TN + FN)  # Negative Predictive Value
        auc = roc_auc_score(y_true, y_proba)

        return accuracy, cm, precision, recall, f1_score, specificity, npv, auc


Test Accuracy of the model on the test images: 75.16025641025641 %


In [None]:
# Call the evaluation function on train data
accuracy, conf_matrix, precision, recall, f1_score, specificity, npv, auc = evaluate_model(model, train_loader)
print("Metrics on the train images")
print(f"Accuracy: {100* accuracy:.4f}%")
print(f"Confusion Matrix:\n{conf_matrix}")
print(f"Precision: {100* precision:.4f}%")
print(f"Recall: {100* recall:.4f}%")
print(f"F1 Score: {100* f1_score:.4f}%")
print(f"Specificity: {100* specificity:.4f}%")
print(f"Negative Predictive Value: {100* npv:.4f}%")
print(f"AUC: {100* auc:.4f}%")

print("\n\n")

In [None]:
# Call the evaluation function on test data
accuracy, conf_matrix, precision, recall, f1_score, specificity, npv, auc = evaluate_model(model, test_loader)
print("Metrics on the train images")
print(f"Accuracy: {100* accuracy:.4f}%")
print(f"Confusion Matrix:\n{conf_matrix}")
print(f"Precision: {100* precision:.4f}%")
print(f"Recall: {100* recall:.4f}%")
print(f"F1 Score: {100* f1_score:.4f}%")
print(f"Specificity: {100* specificity:.4f}%")
print(f"Negative Predictive Value: {100* npv:.4f}%")
print(f"AUC: {100* auc:.4f}%")