In [57]:
#imports
import os
import random
import shutil
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, Dataset, Subset
from torchvision import transforms
from PIL import Image
from sklearn.model_selection import KFold
import torch
from torchvision import transforms
from PIL import Image
from sklearn.model_selection import KFold
import numpy as np
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR
import os

In [58]:
# Define paths.
# The dataset root can be overridden with the CHEST_XRAY_DIR environment
# variable so the notebook also runs on machines other than the original one;
# when the variable is unset the original location is used.
data_dir = os.environ.get(
    'CHEST_XRAY_DIR',
    r'C:\Users\Lenovo\Documents\ChestXRay2017\chest_xray',
)
train_dir = os.path.join(data_dir, 'train')
test_dir = os.path.join(data_dir, 'test')
val_dir = os.path.join(data_dir, 'val')

# Single preprocessing pipeline shared by the train, validation and test
# datasets: convert to one grayscale channel, resize to 128x128, and convert
# to a tensor.
transform = transforms.Compose([
    transforms.Grayscale(num_output_channels=1),
    transforms.Resize((128, 128)),
    transforms.ToTensor()
])

# Define your custom dataset class if not already defined
class PneumoniaDataset(Dataset):
    """Image-folder dataset with two classes: NORMAL (label 0) and PNEUMONIA (label 1).

    `root_dir` must contain one sub-directory per class name. Every image file
    found directly inside a class directory becomes one sample.

    Args:
        root_dir: Directory containing the `NORMAL` and `PNEUMONIA` folders.
        transform: Optional callable applied to the opened PIL image in
            `__getitem__`.

    Attributes:
        image_paths: Full path of every sample, grouped by class and sorted by
            file name within each class.
        labels: Integer label of every sample, aligned with `image_paths`.
    """

    # Class folder names; the position in this tuple is the integer label.
    CATEGORIES = ('NORMAL', 'PNEUMONIA')
    # Only files with these extensions are treated as samples, so stray files
    # in a class folder (e.g. `.DS_Store`, `Thumbs.db`) are not handed to
    # `Image.open`.
    IMAGE_EXTENSIONS = ('.jpeg', '.jpg', '.png', '.bmp', '.gif', '.tif', '.tiff')

    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []
        for label, category in enumerate(self.CATEGORIES):
            category_dir = os.path.join(root_dir, category)
            # os.listdir returns entries in arbitrary order; sorting makes the
            # sample index stable across runs and machines, which the seeded
            # KFold splits over this dataset depend on.
            for image_name in sorted(os.listdir(category_dir)):
                if not image_name.lower().endswith(self.IMAGE_EXTENSIONS):
                    continue
                image_path = os.path.join(category_dir, image_name)
                if not os.path.isfile(image_path):
                    continue
                self.image_paths.append(image_path)
                self.labels.append(label)

    def __len__(self):
        """Return the number of samples."""
        return len(self.image_paths)

    def __getitem__(self, idx):
        """Return `(image, label)` for sample `idx`.

        The image is opened from disk on every access and passed through
        `self.transform` when one was given.
        """
        image_path = self.image_paths[idx]
        image = Image.open(image_path)
        if self.transform:
            image = self.transform(image)
        label = self.labels[idx]
        return image, label

# Create datasets from the train / val / test directories defined with the
# paths above (no second hard-coded copy of those paths is needed here).
train_dataset = PneumoniaDataset(train_dir, transform=transform)
val_dataset = PneumoniaDataset(val_dir, transform=transform)
test_dataset = PneumoniaDataset(test_dir, transform=transform)

# Create DataLoaders; only the training loader shuffles.
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Dataset used for k-fold cross-validation: the training folder again, built
# from `train_dir` so that `data_dir` keeps meaning "dataset root".
dataset = PneumoniaDataset(train_dir, transform=transform)

In [59]:
# Simplified CNN with increased dropout rate
# Function to compute the flattened size after convolutions
def compute_flattened_size(model, input_size):
    """Return how many elements the convolutional stack yields for one input.

    A single all-zero sample of shape `input_size` is pushed through
    `model.conv1`, a 2x2 max-pool, `model.conv2`, `model.conv2_drop` and a
    second 2x2 max-pool; the element count of the result is returned.

    Args:
        model: Object exposing `conv1`, `conv2` and `conv2_drop` callables.
        input_size: Shape of one sample without the batch dimension.

    Returns:
        int: Number of elements in the pooled output for a batch of one.
    """
    probe = torch.zeros((1,) + tuple(input_size))
    with torch.no_grad():
        first_stage = F.max_pool2d(model.conv1(probe), 2)
        second_conv = model.conv2(first_stage)
        second_stage = F.max_pool2d(model.conv2_drop(second_conv), 2)
    return second_stage.numel()
    
class SimpleCNN(nn.Module):
    """Small CNN classifier: two 5x5 convolutions followed by two linear layers.

    The defaults reproduce the original fixed configuration (1x128x128 input,
    2 output classes, dropout 0.6). Layer names and creation order are
    unchanged, so state dicts saved from the default configuration still load.

    Args:
        input_size: `(channels, height, width)` of one input image. The first
            entry sets the input channels of `conv1`; the whole shape is used
            to size `fc1`.
        num_classes: Number of output logits produced by `fc2`.
        dropout_p: Dropout probability used after `conv2` and after `fc1`.
    """

    def __init__(self, input_size=(1, 128, 128), num_classes=2, dropout_p=0.6):
        super().__init__()
        input_size = tuple(input_size)
        self.conv1 = nn.Conv2d(input_size[0], 16, kernel_size=5)
        self.conv2 = nn.Conv2d(16, 32, kernel_size=5)
        self.conv2_drop = nn.Dropout2d(p=dropout_p)
        # fc1's input width depends on the image size, so it is measured by
        # running a dummy sample through the convolutional layers above.
        self.flatten_size = compute_flattened_size(self, input_size)
        self.fc1 = nn.Linear(self.flatten_size, 50)
        self.fc1_drop = nn.Dropout(p=dropout_p)
        self.fc2 = nn.Linear(50, num_classes)

    def forward(self, x):
        """Return raw class logits (no softmax) for a batch of images `x`."""
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(x.size(0), -1)  # flatten everything except the batch dimension
        x = F.relu(self.fc1(x))
        x = self.fc1_drop(x)
        x = self.fc2(x)
        return x


In [60]:
# Define EarlyStopping class
class EarlyStopping:
    """Signal when the validation loss has stopped improving.

    Call the instance once per epoch with the validation loss. After
    `patience` consecutive calls without improvement, `early_stop` becomes
    True.

    Args:
        patience: Number of consecutive non-improving calls tolerated before
            `early_stop` is set.
        delta: Minimum increase of the score (negative loss) over the best
            score that counts as an improvement.

    Attributes:
        best_score: Best score (negative validation loss) seen so far, or
            None before the first valid call.
        counter: Number of consecutive calls without improvement.
        early_stop: True once `counter` has reached `patience`.
    """

    def __init__(self, patience=5, delta=0):
        self.patience = patience
        self.delta = delta
        self.best_score = None
        self.early_stop = False
        self.counter = 0

    def __call__(self, val_loss, model):
        """Record one epoch's validation loss.

        Args:
            val_loss: Validation loss for the epoch (lower is better).
            model: Unused; kept so existing callers keep working.
        """
        score = -val_loss
        # NaN is the only value that is not equal to itself. A NaN loss must
        # never count as an improvement: previously it overwrote best_score
        # with NaN and reset the counter, after which no comparison could
        # succeed and early stopping was effectively disabled.
        score_is_nan = score != score
        improved = (not score_is_nan) and (
            self.best_score is None or score >= self.best_score + self.delta
        )
        if improved:
            self.best_score = score
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

In [61]:
# Training function with integrated early stopping and model saving
def train_model(model, train_loader, val_loader, device, epochs=10, initial_learning_rate=0.01, patience=5, save_path='C:\\Users\\Lenovo\\Documents\\ChestXRay2017\\model_saves\\model.pth'):
    """Train `model`, validate after each epoch, checkpoint the best epoch, stop early.

    Uses a class-weighted cross-entropy loss (weight 2.0 for class 1), Adam
    with weight decay, and a StepLR schedule. Whenever the validation accuracy
    beats the best seen so far, the state dict is written to `save_path`.
    Training stops once the validation loss has not improved for `patience`
    consecutive epochs.

    Args:
        model: Module to train; moved to `device` in place.
        train_loader: DataLoader yielding `(data, target)` training batches.
        val_loader: DataLoader yielding `(data, target)` validation batches.
        device: Device that the model and batches are moved to.
        epochs: Maximum number of epochs.
        initial_learning_rate: Learning rate handed to Adam.
        patience: Early-stopping patience, in epochs.
        save_path: File path for the best-accuracy checkpoint.

    Returns:
        None. On return `model` holds the weights of the LAST epoch run, which
        is not necessarily the checkpointed best epoch.
    """
    class_weights = torch.tensor([1.0, 2.0]).to(device)  # Adjust the weights as needed
    criterion = nn.CrossEntropyLoss(weight=class_weights)  # bias against false negatives
    optimizer = optim.Adam(model.parameters(), lr=initial_learning_rate, weight_decay=1e-5)  # L2 regularization
    scheduler = StepLR(optimizer, step_size=4, gamma=0.2)  # multiply the learning rate by 0.2 every 4 epochs
    early_stopping = EarlyStopping(patience=patience)

    model.to(device)
    best_val_acc = 0
    checkpoint_saved = False
    for epoch in range(1, epochs + 1):
        model.train()
        train_loss = 0.0
        correct = 0
        total = 0

        for batch_idx, (data, target) in enumerate(train_loader):
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()

            # `loss` is a per-batch mean, so weight it by the batch size before
            # summing; dividing the plain sum of batch means by the number of
            # samples would under-report the loss by roughly the batch size.
            batch_size = target.size(0)
            train_loss += loss.item() * batch_size
            correct += output.argmax(dim=1).eq(target).sum().item()
            total += batch_size

            if batch_idx % 10 == 0:
                print(f'Train Epoch: {epoch} [{batch_idx * len(data)}/{len(train_loader.dataset)}]  Loss: {loss.item():.6f}')

        train_loss /= total
        train_acc = correct / total
        print(f'Epoch {epoch}: Train Loss: {train_loss:.4f}, Train Accuracy: {train_acc:.4f}')

        val_loss = 0.0
        correct = 0
        total = 0
        model.eval()
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                batch_size = target.size(0)
                val_loss += criterion(output, target).item() * batch_size
                correct += output.argmax(dim=1).eq(target).sum().item()
                total += batch_size

        val_loss /= total
        val_acc = correct / total
        print(f'Validation set: Average loss: {val_loss:.4f}, Accuracy: {correct}/{len(val_loader.dataset)} {val_acc:.4f}')

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            # Create the target directory if needed. A bare file name has an
            # empty dirname, and os.makedirs('') would raise.
            save_dir = os.path.dirname(save_path)
            if save_dir:
                os.makedirs(save_dir, exist_ok=True)
            torch.save(model.state_dict(), save_path)
            checkpoint_saved = True
            print(f'New best model saved to {save_path} with accuracy {val_acc:.4f}')

        early_stopping(val_loss, model)
        if early_stopping.early_stop:
            print("Early stopping")
            break

        scheduler.step()

    # Only claim a checkpoint exists if one was actually written in this run.
    if checkpoint_saved:
        print(f'Model saved to {save_path}')
    else:
        print(f'No checkpoint written to {save_path}: validation accuracy never exceeded 0')


In [62]:


# K-Fold Cross Validation function with model saving
def k_fold_cross_validation(model_class, dataset, k=3, epochs=10, learning_rate=0.001, patience=5, save_path='C:\\Users\\Lenovo\\Documents\\ChestXRay2017\\model_saves', device=None):
    """Run shuffled k-fold cross-validation, training a fresh model on every fold.

    For each fold a new `model_class()` is trained with `train_model`, which
    writes that fold's best checkpoint into `save_path`. The trained model is
    then evaluated on the fold's held-out part. Per-fold and average
    loss/accuracy are printed.

    Args:
        model_class: Zero-argument callable returning a new model.
        dataset: Dataset to split; indexed through `Subset`.
        k: Number of folds.
        epochs: Maximum epochs per fold.
        learning_rate: Learning rate passed to `train_model`.
        patience: Early-stopping patience passed to `train_model`.
        save_path: Directory that receives one checkpoint file per fold.
        device: Device to train and evaluate on. When None, CUDA is used if
            available, otherwise the CPU.
    """
    if device is None:
        # Resolve the device here instead of relying on a notebook-global
        # `device` that may not have been defined yet.
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    kf = KFold(n_splits=k, shuffle=True, random_state=42)

    # The checkpoint name carries the learning rate actually used
    # (0.001 -> "0_001"), rather than a fixed "0_001" label.
    lr_tag = str(learning_rate).replace('.', '_')

    # (val_loss, val_acc) for each fold
    fold_results = []

    for fold, (train_idx, val_idx) in enumerate(kf.split(dataset)):
        print(f'Fold {fold+1}/{k}')

        # Subset the dataset for training and validation
        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        train_loader = DataLoader(train_subset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=32, shuffle=False)

        # Fresh, untrained model for every fold
        model = model_class()

        fold_save_path = os.path.join(save_path, f'model_fold_{fold+1}_lr_{lr_tag}.pth')
        train_model(model, train_loader, val_loader, device, epochs=epochs, initial_learning_rate=learning_rate, patience=patience, save_path=fold_save_path)

        # Evaluate the model as left by train_model (last-epoch weights) on the
        # held-out fold, with the same class-weighted loss used in training.
        val_loss = 0.0
        correct = 0
        total = 0
        class_weights = torch.tensor([1.0, 2.0]).to(device)  # Adjust the weights as needed
        criterion = nn.CrossEntropyLoss(weight=class_weights)
        model.eval()
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                # The criterion returns a per-batch mean; weight it by the
                # batch size so the final division yields a per-sample average.
                batch_size = target.size(0)
                val_loss += criterion(output, target).item() * batch_size
                correct += output.argmax(dim=1).eq(target).sum().item()
                total += batch_size

        val_loss /= total
        val_acc = correct / total
        print(f'Validation set: Average loss: {val_loss:.4f}, Accuracy: {correct}/{len(val_loader.dataset)} ({val_acc:.4f})')

        fold_results.append((val_loss, val_acc))

    # Calculate the average results across all folds
    avg_val_loss = np.mean([result[0] for result in fold_results])
    avg_val_acc = np.mean([result[1] for result in fold_results])

    print(f'\nK-Fold Cross Validation Results: Average Loss: {avg_val_loss:.4f}, Average Accuracy: {avg_val_acc:.4f}')


In [None]:
# Example usage: pick the GPU when one is available, then run a short
# 3-fold cross-validation that writes one checkpoint per fold.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")

k_fold_cross_validation(
    SimpleCNN,
    dataset,
    k=3,
    epochs=4,
    learning_rate=0.002,
    patience=2,
    save_path='C:\\Users\\Lenovo\\Documents\\ChestXRay2017\\model_saves',
)



In [None]:
#training

# Define the model
model = SimpleCNN().to(device)

# Load the saved model state. map_location lets a checkpoint that was saved on
# a GPU be restored on whatever `device` is in use now (including CPU-only).
model_path = 'C:\\Users\\Lenovo\\Documents\\ChestXRay2017\\model_saves\\continued_model.pth'
model.load_state_dict(torch.load(model_path, map_location=device))

# No optimizer is created here: train_model builds its own Adam optimizer from
# `initial_learning_rate`, so the learning rate for this continuation is set
# only through that argument.
train_model(
    model,
    train_loader,
    val_loader,
    device,
    epochs=6,  # Continue training for more epochs
    initial_learning_rate=0.00001,
    patience=2,
    save_path=model_path,  # new best checkpoints overwrite the file loaded above
)


In [63]:
from sklearn.model_selection import KFold
import numpy as np
import torch
from torch.utils.data import DataLoader, Subset

def k_fold_cross_validation_without_saving(model_class, dataset, model_path, k=5, epochs=10, learning_rate=0.001, patience=5, device=None):
    """Run k-fold cross-validation, starting every fold from saved weights.

    Each fold builds a new `model_class()`, loads the state dict stored at
    `model_path`, fine-tunes it with `train_model`, and evaluates it on the
    held-out part of the fold with an unweighted cross-entropy loss. Per-fold
    results, their averages and the best fold are printed.

    NOTE: no per-fold `save_path` is passed on, so `train_model` still writes
    its best checkpoint to its own default `save_path`; every fold overwrites
    that same file.

    Args:
        model_class: Zero-argument callable returning a new model.
        dataset: Dataset to split; indexed through `Subset`.
        model_path: Path of the state dict loaded at the start of each fold.
        k: Number of folds.
        epochs: Maximum epochs per fold.
        learning_rate: Learning rate passed to `train_model`.
        patience: Early-stopping patience passed to `train_model`.
        device: Device to train and evaluate on. When None, CUDA is used if
            available, otherwise the CPU.
    """
    if device is None:
        # Resolve the device here instead of relying on a notebook-global
        # `device` that may not have been defined yet.
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    kf = KFold(n_splits=k, shuffle=True, random_state=42)

    best_fold = -1
    best_val_acc = 0
    fold_results = []  # (val_loss, val_acc) for each fold

    for fold, (train_idx, val_idx) in enumerate(kf.split(dataset)):
        print(f'Fold {fold+1}/{k}')

        train_subset = Subset(dataset, train_idx)
        val_subset = Subset(dataset, val_idx)

        train_loader = DataLoader(train_subset, batch_size=32, shuffle=True)
        val_loader = DataLoader(val_subset, batch_size=32, shuffle=False)

        model = model_class().to(device)

        # Load pre-trained weights; map_location makes a GPU-saved checkpoint
        # loadable on the device in use now.
        model.load_state_dict(torch.load(model_path, map_location=device))

        train_model(model, train_loader, val_loader, device, epochs=epochs, initial_learning_rate=learning_rate, patience=patience)

        val_loss = 0.0
        correct = 0
        total = 0
        # Sum per-sample losses and divide by the sample count below; summing
        # per-batch means and dividing by the number of samples would
        # under-report the loss by roughly the batch size.
        criterion = nn.CrossEntropyLoss(reduction='sum')
        model.eval()
        with torch.no_grad():
            for data, target in val_loader:
                data, target = data.to(device), target.to(device)
                output = model(data)
                val_loss += criterion(output, target).item()
                correct += output.argmax(dim=1).eq(target).sum().item()
                total += target.size(0)

        val_loss /= total
        val_acc = correct / total
        # val_acc is a fraction in [0, 1]; scale it before printing it as a percentage.
        print(f'Validation set: Average loss: {val_loss:.4f}, Accuracy: {correct}/{len(val_loader.dataset)} ({100. * val_acc:.0f}%)')

        fold_results.append((val_loss, val_acc))

        if val_acc > best_val_acc:
            best_val_acc = val_acc
            best_fold = fold + 1

    avg_val_loss = np.mean([result[0] for result in fold_results])
    avg_val_acc = np.mean([result[1] for result in fold_results])

    print(f'\nK-Fold Cross Validation Results: Average Loss: {avg_val_loss:.4f}, Average Accuracy: {avg_val_acc:.4f}')
    print(f'Best fold: {best_fold} with accuracy {best_val_acc:.4f}')


In [None]:
model_path = 'C:\\Users\\Lenovo\\Documents\\ChestXRay2017\\model_saves\\continued_model6.pth'

# Perform k-fold cross-validation starting from the loaded model.
# This calls the function that is actually defined in this notebook,
# `k_fold_cross_validation_without_saving`. It takes no `save_path` argument,
# so none is passed.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
k_fold_cross_validation_without_saving(SimpleCNN, dataset, model_path, k=5, epochs=3, learning_rate=0.001, patience=1)

In [64]:
def test_model(model, test_loader, device):
    model.eval()  # Set the model to evaluation mode
    test_loss = 0
    correct = 0
    total = 0
    criterion = nn.CrossEntropyLoss()  # Define the loss function

    with torch.no_grad():  # Disable gradient calculation
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()  # Sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # Get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
            total += target.size(0)

    test_loss /= len(test_loader.dataset)  # Average loss
    accuracy = 100. * correct / total  # Accuracy percentage

    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{total} {accuracy:.0f}\n')




In [65]:
# Load the saved model. map_location lets a checkpoint that was saved on a GPU
# be restored on whatever `device` is in use now (including CPU-only machines).
model = SimpleCNN().to(device)
model.load_state_dict(torch.load('C:\\Users\\Lenovo\\Documents\\ChestXRay2017\\model_saves\\continued_model6.pth', map_location=device))

# Test the model
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)  # Assuming 'test_dataset' is the test dataset
test_model(model, test_loader, device)


Test set: Average loss: 0.0474, Accuracy: 448/624 72



In [66]:
def test_model_with_counts(model, test_loader, device):
    model.eval()  # Set the model to evaluation mode
    test_loss = 0
    correct = 0
    total = 0
    false_positives = 0
    false_negatives = 0
    criterion = nn.CrossEntropyLoss()  # Define the loss function

    with torch.no_grad():  # Disable gradient calculation
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += criterion(output, target).item()  # Sum up batch loss
            pred = output.argmax(dim=1, keepdim=True)  # Get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()
            total += target.size(0)
            
            # Calculate false positives and false negatives
            false_positives += ((pred == 1) & (target.view_as(pred) == 0)).sum().item()
            false_negatives += ((pred == 0) & (target.view_as(pred) == 1)).sum().item()

    test_loss /= len(test_loader.dataset)  # Average loss
    accuracy = 100. * correct / total  # Accuracy percentage

    print(f'\nTest set: Average loss: {test_loss:.4f}, Accuracy: {correct}/{total} ({accuracy:.0f}%)\n')
    print(f'False Positives: {false_positives}')
    print(f'False Negatives: {false_negatives}')

    return false_positives, false_negatives, accuracy


In [67]:
# Load the saved model. map_location lets a checkpoint that was saved on a GPU
# be restored on whatever `device` is in use now (including CPU-only machines).
model = SimpleCNN().to(device)
model.load_state_dict(torch.load('C:\\Users\\Lenovo\\Documents\\ChestXRay2017\\model_saves\\continued_model6.pth', map_location=device))

# Test the model; the returned (false_positives, false_negatives, accuracy)
# tuple is the cell's displayed value.
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)  # Assuming 'test_dataset' is the test dataset
test_model_with_counts(model, test_loader, device)


Test set: Average loss: 0.0474, Accuracy: 448/624 (72%)

False Positives: 173
False Negatives: 3


(173, 3, 71.7948717948718)