In [None]:
import os
# Expose only GPUs 1 and 2 to this process. This is set before torch is
# imported below so that it is in place before any CUDA initialisation.
os.environ["CUDA_VISIBLE_DEVICES"] = "1,2"

In [None]:
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import torchvision.models as models
from tqdm import tqdm
from sklearn.metrics import accuracy_score, roc_auc_score


# Custom dataset for loading .npy images
class NPYDataset(Dataset):
    """Image-classification dataset backed by per-class folders of ``.npy`` files.

    Expects ``root_dir`` to have one subfolder per class, e.g.::

        dataset/train/class_a/
        dataset/train/class_b/
        dataset/train/class_c/

    Class indices are assigned from the alphabetically sorted folder names,
    and the files inside each folder are also visited in sorted order so that
    a given index always refers to the same sample across runs and machines.

    Attributes:
        root_dir: Directory that was scanned.
        transform: Optional callable applied to each image tensor.
        samples: List of ``(file_path, class_index)`` tuples.
        class_to_idx: Mapping from class folder name to integer label.
    """

    def __init__(self, root_dir, transform=None):
        """Scan ``root_dir`` and index every ``.npy`` file found per class.

        Args:
            root_dir: Directory containing one subfolder per class.
            transform: Optional callable applied to the float image tensor
                (after channel handling) in ``__getitem__``.
        """
        self.root_dir = root_dir
        self.transform = transform
        self.samples = []
        self.class_to_idx = {}
        # Sorted class names keep the label mapping stable between runs.
        classes = sorted(
            d for d in os.listdir(root_dir)
            if os.path.isdir(os.path.join(root_dir, d))
        )
        for idx, class_name in enumerate(classes):
            self.class_to_idx[class_name] = idx
            class_path = os.path.join(root_dir, class_name)
            # os.listdir returns entries in arbitrary order; sort them so the
            # index -> sample mapping is reproducible.
            for fname in sorted(os.listdir(class_path)):
                if fname.endswith('.npy'):
                    self.samples.append((os.path.join(class_path, fname), idx))

    def __len__(self):
        """Return the number of indexed samples."""
        return len(self.samples)

    def __getitem__(self, index):
        """Load one sample as a ``(image, label)`` pair.

        The array is converted to a float32 tensor in channel-first layout:

        * 2-D arrays (H, W) get a leading channel axis,
        * 3-D arrays whose last axis has size 1 or 3 are treated as
          H x W x C and transposed to C x H x W,
        * single-channel images are repeated to 3 channels.

        Args:
            index: Position in ``self.samples``.

        Returns:
            Tuple of the (optionally transformed) image tensor and the
            integer class label.
        """
        path, label = self.samples[index]
        image = np.load(path)  # Load the image from .npy file
        if image.ndim == 2:
            # Plain grayscale (H, W): add the channel axis so the
            # single-channel branch below can expand it to 3 channels.
            image = image[np.newaxis, ...]
        elif image.ndim == 3 and image.shape[-1] in (1, 3):
            # Channel-last (H, W, C) -> channel-first (C, H, W).
            image = image.transpose((2, 0, 1))
        image = torch.from_numpy(image).float()

        # If image has one channel, repeat it to create 3 channels.
        if image.shape[0] == 1:
            image = image.repeat(3, 1, 1)

        if self.transform:
            image = self.transform(image)
        return image, label


# Build a pretrained ResNet-101 classifier with a replaced output layer.
# NOTE: the function name says "resnet50", but the backbone is resnet101.
def get_resnet50_model(num_classes):
    """Return a pretrained ``models.resnet101`` with a new final layer.

    Args:
        num_classes: Number of output logits of the replacement ``fc`` layer.

    Returns:
        The model with ``fc`` swapped for a fresh ``nn.Linear`` that maps the
        backbone's feature width to ``num_classes``.
    """
    backbone = models.resnet101(pretrained=True)
    feature_width = backbone.fc.in_features
    backbone.fc = nn.Linear(feature_width, num_classes)
    return backbone

# Training loop for both training and validation phases.
def train_model(model, dataloaders, criterion, optimizer, device, num_epochs=10,
                num_classes=3, checkpoint_dir="path_to_weight", checkpoint_every=10):
    """Train ``model`` and validate it after every epoch.

    Each epoch runs a ``'train'`` phase (with gradient updates) followed by a
    ``'val'`` phase (gradients disabled), printing the mean loss and accuracy
    for both. Every ``checkpoint_every`` epochs the weights are saved to
    ``checkpoint_dir`` and ``evaluate_model`` is run on the validation loader.

    Args:
        model: Network to train; moved to ``device`` and wrapped in
            ``nn.DataParallel`` when more than one CUDA device is visible.
        dataloaders: Dict with ``'train'`` and ``'val'`` data loaders that
            yield ``(inputs, labels)`` batches.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        optimizer: Optimizer over the model's parameters.
        device: Device the batches and model are moved to.
        num_epochs: Number of epochs to run.
        num_classes: Number of classes passed to the periodic
            ``evaluate_model`` call (previously hard-coded to 3).
        checkpoint_dir: Directory checkpoints are written to; created if
            it does not exist.
        checkpoint_every: Save/evaluate period in epochs; a falsy value
            disables checkpointing.

    Returns:
        The trained model (the ``nn.DataParallel`` wrapper when one was
        applied).
    """
    model.to(device)
    if torch.cuda.device_count() > 1:
        print(f"Using {torch.cuda.device_count()} GPUs!")
        model = nn.DataParallel(model)

    for epoch in tqdm(range(num_epochs)):
        print(f"Epoch {epoch+1}/{num_epochs}")
        for phase in ['train', 'val']:
            is_train = phase == 'train'
            model.train(is_train)  # train(False) is equivalent to eval()
            running_loss = 0.0
            all_preds = []
            all_labels = []
            for inputs, labels in tqdm(dataloaders[phase]):
                inputs = inputs.to(device)
                labels = labels.to(device)
                if is_train:
                    # Gradients are only produced in the training phase.
                    optimizer.zero_grad()
                with torch.set_grad_enabled(is_train):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    _, preds = torch.max(outputs, 1)
                    if is_train:
                        loss.backward()
                        optimizer.step()
                # loss is a batch mean, so weight it by the batch size to get
                # a correct per-sample average even with a ragged last batch.
                running_loss += loss.item() * inputs.size(0)
                all_preds.extend(preds.cpu().numpy())
                all_labels.extend(labels.cpu().numpy())
            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = accuracy_score(all_labels, all_preds)
            print(f"{phase.capitalize()} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}")
        if checkpoint_every and (epoch + 1) % checkpoint_every == 0:
            # torch.save does not create missing parent directories.
            os.makedirs(checkpoint_dir, exist_ok=True)
            # Save the underlying module's weights so the checkpoint keys are
            # not prefixed with "module." and load into an unwrapped model.
            unwrapped = model.module if isinstance(model, nn.DataParallel) else model
            torch.save(unwrapped.state_dict(),
                       os.path.join(checkpoint_dir, f"resnet101_epoch{epoch+1}.pth"))
            evaluate_model(model, dataloaders['val'], device, num_classes=num_classes)
    return model

# Evaluation function: computes accuracy and ROC-AUC.
def evaluate_model(model, dataloader, device, num_classes):
    """Compute accuracy and ROC-AUC of ``model`` over ``dataloader``.

    The model is switched to eval mode and run without gradients. Class
    probabilities come from a softmax over the outputs; ROC-AUC is computed
    against one-hot encoded labels.

    Args:
        model: Network to evaluate; expected to already be on ``device``.
        dataloader: Iterable of ``(inputs, labels)`` batches.
        device: Device the inputs are moved to.
        num_classes: Number of classes, used to one-hot encode the labels.

    Returns:
        Tuple ``(acc, roc_auc)``. ``roc_auc`` is ``None`` when it could not
        be computed; the error is printed in that case.
    """
    model.eval()
    all_preds = []
    all_labels = []
    all_probs = []
    with torch.no_grad():
        for inputs, labels in dataloader:
            inputs = inputs.to(device)
            outputs = model(inputs)
            probs = nn.functional.softmax(outputs, dim=1)
            _, preds = torch.max(outputs, 1)
            all_preds.extend(preds.cpu().numpy())
            # Labels are only used on the host, so they are not moved to
            # ``device`` first.
            all_labels.extend(labels.cpu().numpy())
            all_probs.extend(probs.cpu().numpy())
    acc = accuracy_score(all_labels, all_preds)
    # One-hot encode the labels for multi-class ROC-AUC
    one_hot_labels = np.eye(num_classes)[np.asarray(all_labels, dtype=np.int64)]
    try:
        roc_auc = roc_auc_score(one_hot_labels, np.asarray(all_probs), multi_class='ovr')
    except Exception as e:
        # Deliberately best-effort: a failed ROC-AUC computation must not
        # abort the evaluation.
        roc_auc = None
        print("ROC-AUC calculation error:", e)
    # Report an unavailable score as "n/a" rather than 0.0000, which would be
    # indistinguishable from a genuinely computed value.
    roc_auc_text = "n/a" if roc_auc is None else f"{roc_auc:.4f}"
    print(f"Validation Accuracy: {acc:.4f}, ROC-AUC: {roc_auc_text}")
    return acc, roc_auc

def main():
    """Train the classifier on the .npy datasets and report validation metrics.

    Hyperparameters are fixed inside the function (no argparse). The datasets
    are read from the placeholder paths below, the model comes from
    ``get_resnet50_model`` and is optimised with Adam and cross-entropy loss.
    """
    # Hyperparameters.
    num_classes = 3  # Update if the number of classes changes
    num_epochs = 40
    batch_size = 1024
    lr = 1e-3

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Per-channel normalization applied to every image tensor.
    normalize = transforms.Compose([
        transforms.Normalize(mean=[0.485, 0.456, 0.406],
                             std=[0.229, 0.224, 0.225])
    ])

    # Only the training loader shuffles; validation order stays fixed.
    dataloaders = {
        "train": DataLoader(
            NPYDataset("path_to_training_dataset", transform=normalize),
            batch_size=batch_size,
            shuffle=True,
        ),
        "val": DataLoader(
            NPYDataset("path_to_testing_dataset", transform=normalize),
            batch_size=batch_size,
            shuffle=False,
        ),
    }

    # Model, loss and optimizer.
    model = get_resnet50_model(num_classes)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=lr)

    # Train, then run a final evaluation on the validation set.
    model = train_model(model, dataloaders, criterion, optimizer, device, num_epochs=num_epochs)
    evaluate_model(model, dataloaders["val"], device, num_classes)

# Entry point: run the full training pipeline when this code is executed as
# the main module.
if __name__ == "__main__":
    main()


In [None]:
import torch
import torch.nn.functional as F
import matplotlib.pyplot as plt
import numpy as np
from torchvision import transforms
import torchvision.models as models

# Transform applied to each image tensor returned by the dataset:
#   1. expand single-channel tensors to 3 channels (NPYDataset.__getitem__
#      already does this before calling the transform, so this is a safeguard),
#   2. normalize per channel with the same mean/std used in main().
transform = transforms.Compose([
    transforms.Lambda(lambda x: x.repeat(3, 1, 1) if x.shape[0] == 1 else x),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])

# Load the validation dataset.
# NOTE(review): this placeholder path differs from the "path_to_testing_dataset"
# used in main() - confirm both are meant to point at the same data.
val_dataset = NPYDataset("path_to_testing_data", transform=transform)

# Settings for the visualization model built further down.
# Here we assume 3 classes.
num_classes = 3
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

def get_resnet18_model(num_classes):
    """Return a pretrained ``models.resnet18`` with a new final layer.

    Args:
        num_classes: Number of output logits of the replacement ``fc`` layer.

    Returns:
        The model with ``fc`` swapped for a fresh ``torch.nn.Linear``.
    """
    net = models.resnet18(pretrained=True)
    net.fc = torch.nn.Linear(net.fc.in_features, num_classes)
    return net

# Build the visualization model and put it in inference mode.
# NOTE(review): unless trained weights are loaded below, the replaced fc layer
# is freshly initialised, so the plotted predictions are not meaningful.
model = get_resnet18_model(num_classes)
model.to(device)
model.eval()

# Optionally, load your trained weights:
# model.load_state_dict(torch.load("path_to_your_model_weights.pth"))
# NOTE(review): the training code builds models.resnet101 and saves
# "resnet101_epoch*.pth" checkpoints; those will not load into this resnet18.

# Class names ordered by their integer index, so class_names[i] is the name
# of the class with label i.
class_names = sorted(val_dataset.class_to_idx, key=lambda x: val_dataset.class_to_idx[x])

def plot_predictions(model, dataset, device, class_names, num_images=5):
    """Show randomly chosen dataset images titled with the predicted class.

    Args:
        model: Classifier, expected to already be on ``device``.
        dataset: Indexable dataset returning ``(image, label)`` where
            ``image`` is a C x H x W tensor. The display step assumes the
            image was normalized with the mean/std below and undoes it.
        device: Device the input tensor is moved to for inference.
        class_names: Sequence mapping a predicted index to a class name.
        num_images: Number of images to show; capped at ``len(dataset)``.
            Nothing is drawn when this resolves to zero.
    """
    model.eval()
    # Sampling without replacement cannot draw more items than exist.
    num_images = min(num_images, len(dataset))
    if num_images <= 0:
        return

    # Randomly select indices for visualization
    indices = np.random.choice(len(dataset), num_images, replace=False)
    # squeeze=False always yields a 2-D array of Axes; without it a single
    # subplot is returned as a bare Axes object that cannot be iterated.
    _, axes = plt.subplots(1, num_images, figsize=(15, 5), squeeze=False)

    # Normalization constants, hoisted out of the loop.
    mean = np.array([0.485, 0.456, 0.406])
    std = np.array([0.229, 0.224, 0.225])

    for ax, idx in zip(axes.ravel(), indices):
        image, _ = dataset[idx]
        input_tensor = image.unsqueeze(0).to(device)  # add the batch axis

        # Get model predictions
        with torch.no_grad():
            output = model(input_tensor)
            probs = F.softmax(output, dim=1)
            pred = torch.argmax(probs, dim=1).item()

        # Convert the tensor image for plotting (C x H x W -> H x W x C).
        # Undo normalization: image = (image * std) + mean
        image_np = image.cpu().numpy().transpose(1, 2, 0)
        image_np = image_np * std + mean
        # imshow expects float RGB values within [0, 1].
        image_np = np.clip(image_np, 0, 1)

        ax.imshow(image_np)
        ax.set_title(f"Pred: {class_names[pred]}")
        ax.axis("off")

    plt.tight_layout()
    plt.show()

# Visualize the model's predictions on 5 randomly chosen validation images.
plot_predictions(model, val_dataset, device, class_names, num_images=5)
