In [1]:
import torch
import torch.optim as optim
import torch.nn as nn
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import os
import pandas as pd
from PIL import Image
import numpy as np
import numpy as np
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, classification_report, confusion_matrix


In [2]:
import os
import pandas as pd

# Build a (file_path, label) index of the images stored in per-class sub-folders
def create_dataset(folder_path, categories=('NORMAL', 'PNEUMONIA', 'UNKNOWN', 'TUBERCULOSIS')):
    """Index the image files found under ``folder_path``.

    ``folder_path`` must contain one sub-folder per entry in ``categories``;
    every regular file in such a sub-folder whose name ends with a supported
    image extension (case-insensitive) becomes one row.

    Args:
        folder_path: Directory that holds the per-category sub-folders.
        categories: Sub-folder names to scan, in the order their rows should
            appear. Defaults to the four classes used by this notebook.

    Returns:
        pandas.DataFrame with columns ``file_path`` (full path to the image)
        and ``label`` (the category name, i.e. the sub-folder name).

    Raises:
        FileNotFoundError: If one of the category sub-folders does not exist
            (propagated from ``os.listdir``).
    """
    image_extensions = ('.png', '.jpg', '.jpeg', '.bmp')
    records = []
    for category in categories:
        category_path = os.path.join(folder_path, category)
        # os.listdir returns entries in arbitrary order; sorting keeps the
        # row order reproducible from run to run and machine to machine.
        for file_name in sorted(os.listdir(category_path)):
            file_path = os.path.join(category_path, file_name)
            # Skip sub-directories and anything that is not an image file
            if os.path.isfile(file_path) and file_name.lower().endswith(image_extensions):
                records.append([file_path, category])
    return pd.DataFrame(records, columns=['file_path', 'label'])

# Dataset paths. The root can be overridden with the CHEST_XRAY_DIR environment
# variable; when it is not set, the original local path is used.
dataset_dir = os.environ.get("CHEST_XRAY_DIR", r"D:\source\chest_xray")
train_dir = os.path.join(dataset_dir, 'train')
val_dir = os.path.join(dataset_dir, 'val')
test_dir = os.path.join(dataset_dir, 'test')

# Single definition of the class-name -> class-index encoding:
# NORMAL -> 0, PNEUMONIA -> 1, UNKNOWN -> 2, TUBERCULOSIS -> 3
LABEL_MAP = {'NORMAL': 0, 'PNEUMONIA': 1, 'UNKNOWN': 2, 'TUBERCULOSIS': 3}

# Create DataFrames for train, validation, and test datasets
train_df = create_dataset(train_dir)
val_df = create_dataset(val_dir)
test_df = create_dataset(test_dir)

# Replace the string labels with their numeric codes in every split
for split_df in (train_df, val_df, test_df):
    split_df['label'] = split_df['label'].map(LABEL_MAP)

# Print dataset sizes
print(f"Train set size: {len(train_df)}, Validation set size: {len(val_df)}, Test set size: {len(test_df)}")


Train set size: 7883, Validation set size: 33, Test set size: 1147


In [3]:
# Print how many rows of a split carry each numeric class label
def count_categories(df, dataset_name):
    """Print the number of images per class for one dataset split.

    Args:
        df: DataFrame with a numeric ``label`` column
            (0 = NORMAL, 1 = PNEUMONIA, 2 = UNKNOWN, 3 = TUBERCULOSIS).
        dataset_name: Name shown in the heading line, e.g. ``"Train"``.

    Returns:
        None. The report is written to standard output; a class that does not
        occur in ``df`` is reported with a count of 0.
    """
    counts_by_label = df['label'].value_counts()
    label_names = ((0, 'NORMAL'), (1, 'PNEUMONIA'), (2, 'UNKNOWN'), (3, 'TUBERCULOSIS'))

    report_lines = [f"{dataset_name} set:"]
    report_lines.extend(
        f"  {name}: {counts_by_label.get(code, 0)}" for code, name in label_names
    )
    print("\n".join(report_lines))

# Report the per-class image counts of the train, validation, and test splits
print("Image Counts per Category:")
for split_name, split_frame in (("Train", train_df), ("Validation", val_df), ("Test", test_df)):
    count_categories(split_frame, split_name)


Image Counts per Category:
Train set:
  NORMAL: 1700
  PNEUMONIA: 3875
  UNKNOWN: 1348
  TUBERCULOSIS: 960
Validation set:
  NORMAL: 8
  PNEUMONIA: 8
  UNKNOWN: 9
  TUBERCULOSIS: 8
Test set:
  NORMAL: 234
  PNEUMONIA: 390
  UNKNOWN: 446
  TUBERCULOSIS: 77


In [4]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from PIL import Image
import pandas as pd

# Dataset that serves (image, label) pairs from a DataFrame index
class ImageDataset(Dataset):
    """Map-style dataset backed by a DataFrame of image paths and labels.

    Column 0 of the DataFrame is read as the image path and column 1 as the
    label; both are accessed by position, not by column name.

    Args:
        dataframe: DataFrame whose first two columns are (path, label).
        transform: Optional callable applied to the loaded PIL image.
    """

    def __init__(self, dataframe, transform=None):
        self.transform = transform
        self.dataframe = dataframe

    def __len__(self):
        """Return the number of rows, i.e. the number of samples."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load sample ``idx`` and return ``(image, label)``.

        The image is opened with PIL and converted to RGB; when a transform
        was supplied, its result is returned in place of the PIL image. The
        label is returned exactly as stored in the DataFrame.
        """
        cell = self.dataframe.iloc
        img_path, label = cell[idx, 0], cell[idx, 1]
        # Force three channels regardless of the file's original mode
        img = Image.open(img_path).convert('RGB')

        if not self.transform:
            return img, label
        return self.transform(img), label



# Per-channel statistics used to normalise inputs for the pretrained ResNet
# (the ImageNet mean/std documented for torchvision's pretrained models).
IMAGENET_MEAN = [0.485, 0.456, 0.406]
IMAGENET_STD = [0.229, 0.224, 0.225]

# Training pipeline: resize, light augmentation, tensor conversion, normalisation
train_transform_resnet = transforms.Compose([
    transforms.Resize((224, 224)),  # Resize to 224x224 for ResNet-18 input size
    # ImageDataset already yields RGB images; this collapses them to a single
    # luminance channel and replicates it so the model still receives 3 channels.
    transforms.Grayscale(num_output_channels=3),
    transforms.RandomHorizontalFlip(),  # Random horizontal flip
    transforms.RandomRotation(5),  # Slight rotation between -5 and +5 degrees
    transforms.ToTensor(),  # Convert to tensor
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

# Validation pipeline: same preprocessing as training, without random augmentation
val_transform_resnet = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.Grayscale(num_output_channels=3),
    transforms.ToTensor(),
    transforms.Normalize(mean=IMAGENET_MEAN, std=IMAGENET_STD),
])

# train_df and val_df must already exist with 'file_path' and 'label' columns
train_dataset_resnet = ImageDataset(train_df, transform=train_transform_resnet)
val_dataset_resnet = ImageDataset(val_df, transform=val_transform_resnet)

# DataLoaders for the training and validation datasets
batch_size = 32

train_loader_resnet = DataLoader(train_dataset_resnet, batch_size=batch_size, shuffle=True)
val_loader_resnet = DataLoader(val_dataset_resnet, batch_size=batch_size, shuffle=False)

# len(dataset) is the number of images, len(loader) the number of batches;
# report both so the two are not confused.
print(f"Training set: {len(train_dataset_resnet)} images in {len(train_loader_resnet)} batches")
print(f"Validation set: {len(val_dataset_resnet)} images in {len(val_loader_resnet)} batches")



Training dataset size: 247
Validation dataset size: 2


In [5]:
# Select the compute device: CUDA when available, otherwise the CPU
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
print(f"Device: {device}")


# Wrap a model for multi-GPU execution (when possible) and move it to `device`
def prepare_model_for_multigpu(model):
    """Place ``model`` on the module-level ``device``.

    When more than one CUDA device is visible, the model is first wrapped in
    ``nn.DataParallel`` and the GPU count is printed.

    Args:
        model: The ``nn.Module`` to prepare.

    Returns:
        The (possibly ``DataParallel``-wrapped) model after ``.to(device)``.
    """
    gpu_count = torch.cuda.device_count()
    if gpu_count > 1:
        print(f"Using {gpu_count} GPUs!")
        model = nn.DataParallel(model)
    return model.to(device)

Device: cuda


In [None]:


# One pass over a loader: optimises when an optimizer is given, otherwise evaluates
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run a single epoch and return ``(mean_batch_loss, accuracy_percent)``.

    Args:
        model: Network to train or evaluate.
        loader: Iterable of ``(images, labels)`` batches that supports ``len()``.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batches are moved to before the forward pass.
        optimizer: When given, the model is put in train mode and updated on
            every batch; when ``None``, the model is put in eval mode and
            gradients are disabled.

    Returns:
        Tuple of the loss averaged over batches and the accuracy in percent.

    Raises:
        ValueError: If the loader yields no samples.
    """
    is_training = optimizer is not None
    if is_training:
        model.train()
    else:
        model.eval()

    loss_sum = 0.0
    num_correct = 0
    num_seen = 0

    with torch.set_grad_enabled(is_training):
        for images, labels in loader:
            images, labels = images.to(device), labels.to(device)

            if is_training:
                optimizer.zero_grad()

            outputs = model(images)
            loss = criterion(outputs, labels)

            if is_training:
                loss.backward()
                optimizer.step()

            loss_sum += loss.item()
            # Predicted class = index of the largest logit
            num_correct += (outputs.argmax(dim=1) == labels).sum().item()
            num_seen += labels.size(0)

    if num_seen == 0:
        raise ValueError("The data loader yielded no samples; cannot compute loss/accuracy.")

    return loss_sum / len(loader), 100 * num_correct / num_seen


# Training function with history tracking and logging to text file
def train_model_with_history(model, train_loader, val_loader, criterion, optimizer, num_epochs=30, device='cuda',
                             log_file='training_log.txt', model_dir='model',
                             resume_path='model/model_29.pt'):
    """Train ``model`` and record per-epoch loss and accuracy.

    Every epoch runs one training pass and one validation pass, prints a
    summary line, appends a CSV-style row to ``log_file`` and saves the
    model's ``state_dict`` to ``{model_dir}/model_{epoch}.pt`` (``epoch`` is
    zero-based in the file name).

    Args:
        model: Network to train; it is moved to ``device``.
        train_loader: Iterable of training ``(images, labels)`` batches.
        val_loader: Iterable of validation ``(images, labels)`` batches.
        criterion: Loss callable.
        optimizer: Optimizer bound to ``model``'s parameters.
        num_epochs: Number of epochs to run.
        device: Device used for the model and the batches.
        log_file: Path of the text log; it is overwritten on every call.
        model_dir: Directory for the per-epoch checkpoints (created if needed).
        resume_path: Checkpoint loaded into ``model`` before training when the
            file exists; pass ``None`` to always start from the given weights.

    Returns:
        Tuple ``(train_losses, val_losses, train_accuracies, val_accuracies)``,
        each a list with one entry per epoch (accuracies are percentages).

    Raises:
        ValueError: If either loader yields no samples.
    """
    model.to(device)

    train_losses = []
    val_losses = []
    train_accuracies = []
    val_accuracies = []

    # Optionally start from a previously saved checkpoint.
    # NOTE: the epoch counter still starts at 0 afterwards, so checkpoints
    # written by this run overwrite earlier model_<n>.pt files of the same index.
    if resume_path and os.path.exists(resume_path):
        model.load_state_dict(torch.load(resume_path, map_location=device))
        print("Model weights loaded successfully.")
    else:
        print("No pre-trained weights found, starting from scratch.")

    os.makedirs(model_dir, exist_ok=True)

    # Use ONE handle for the whole run. Reopening the same path in append mode
    # while a 'w' handle still holds the unflushed header makes the header
    # overwrite the first epoch rows when that handle is finally closed.
    # Mode 'w' truncates, so no separate delete of an old log is needed.
    with open(log_file, 'w') as log:
        log.write("Epoch, Train Loss, Train Accuracy, Val Loss, Val Accuracy\n")

        for epoch in range(num_epochs):
            train_loss, train_accuracy = _run_epoch(model, train_loader, criterion, device, optimizer)
            train_losses.append(train_loss)
            train_accuracies.append(train_accuracy)

            val_loss, val_accuracy = _run_epoch(model, val_loader, criterion, device)
            val_losses.append(val_loss)
            val_accuracies.append(val_accuracy)

            print(f"Epoch [{epoch+1}/{num_epochs}] "
                  f"Train Loss: {train_loss:.4f}, Train Acc: {train_accuracy:.2f}% | "
                  f"Val Loss: {val_loss:.4f}, Val Acc: {val_accuracy:.2f}%")

            log.write(f"{epoch+1}, {train_loss:.4f}, {train_accuracy:.2f}, {val_loss:.4f}, {val_accuracy:.2f}\n")
            # Flush so the row is on disk even if a later epoch is interrupted
            log.flush()

            # Save the model after every epoch
            torch.save(model.state_dict(), f'{model_dir}/model_{epoch}.pt')

    return train_losses, val_losses, train_accuracies, val_accuracies


# Fixed per-class loss weights; position i applies to class index i
# (0 = NORMAL, 1 = PNEUMONIA, 2 = UNKNOWN, 3 = TUBERCULOSIS)
class_weights = [1.5, 0.5, 1.0, 2.5]

# Materialise the weights as a float32 tensor on the selected device
class_weights_tensor = torch.as_tensor(class_weights, dtype=torch.float32, device=device)

# Cross-entropy loss that scales each sample's contribution by its class weight
criterion = nn.CrossEntropyLoss(weight=class_weights_tensor)


In [14]:
import numpy as np
from sklearn.utils.class_weight import compute_class_weight

# Balanced class weights, n_samples / (n_classes * class_count), computed from
# the real training labels instead of counts copied by hand from an earlier
# cell's output (which would silently go stale if the dataset changed).
# NOTE: this rebinds `class_weights`; in the cells shown here the result is
# only printed and is not what the loss function was built from.
class_labels = np.array([0, 1, 2, 3])  # 0 = NORMAL, 1 = PNEUMONIA, 2 = UNKNOWN, 3 = TUBERCULOSIS
y_train = train_df['label'].to_numpy()

# Calculate class weights using sklearn
class_weights = compute_class_weight('balanced', classes=class_labels, y=y_train)

print(f"Class weights: {dict(zip(class_labels, class_weights))}")


Class weights: {np.int64(0): np.float64(1.1592647058823529), np.int64(1): np.float64(0.5085806451612903), np.int64(2): np.float64(1.461980712166172), np.int64(3): np.float64(2.052864583333333)}


In [7]:
# Load ResNet-18 with ImageNet-pretrained weights. `weights=` replaces the
# deprecated `pretrained=True`; IMAGENET1K_V1 is the checkpoint that flag selected.
resnet18 = models.resnet18(weights=models.ResNet18_Weights.IMAGENET1K_V1)
# Replace the classification head with a 4-way output
# (NORMAL, PNEUMONIA, UNKNOWN, TUBERCULOSIS)
resnet18.fc = nn.Linear(resnet18.fc.in_features, 4)
# Wrap in DataParallel when several GPUs are visible and move the model to `device`
resnet18 = prepare_model_for_multigpu(resnet18)




In [None]:
# Define optimizer for ResNet-18
optimizer_resnet = optim.Adam(resnet18.parameters(), lr=0.0001)

# Train the Pretrained ResNet-18 Model.
# Pass the detected `device` explicitly: the function's default is 'cuda',
# which fails on a CPU-only machine even though the model and the loss
# weights were placed on `device`.
print("Training Pretrained ResNet-18 Model...")
train_losses_resnet, val_losses_resnet, train_accuracies_resnet, val_accuracies_resnet = train_model_with_history(
    resnet18, train_loader_resnet, val_loader_resnet, criterion, optimizer_resnet, num_epochs=60, device=device
)
