In [8]:
import kagglehub

# Download latest version
path = kagglehub.dataset_download("shashwatwork/knee-osteoarthritis-dataset-with-severity")

print("Path to dataset files:", path)

Path to dataset files: /home/cpm6gh/.cache/kagglehub/datasets/shashwatwork/knee-osteoarthritis-dataset-with-severity/versions/1


In [9]:
import kagglehub
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
from torchvision import transforms, models
from PIL import Image

In [10]:
path = "/home/cpm6gh/.cache/kagglehub/datasets/shashwatwork/knee-osteoarthritis-dataset-with-severity/versions/1"
train_dir = os.path.join(path, "train")
val_dir = os.path.join(path, "val")
test_dir = os.path.join(path, "test")

In [14]:
class KneeOADataset(Dataset):
    def __init__(self, root_dir, transform=None):
        self.root_dir = root_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []
        
        for label in range(5):  # Labels: 0 to 4
            label_dir = os.path.join(root_dir, str(label))
            if os.path.exists(label_dir):
                for img_name in os.listdir(label_dir):
                    self.image_paths.append(os.path.join(label_dir, img_name))
                    self.labels.append(label)

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        image = Image.open(img_path).convert("RGB")
        label = self.labels[idx]
        
        if self.transform:
            image = self.transform(image)
        
        return image, label

In [16]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
])

In [17]:
train_dataset = KneeOADataset(train_dir, transform=transform)
val_dataset = KneeOADataset(val_dir, transform=transform)
test_dataset = KneeOADataset(test_dir, transform=transform)

train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=32, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [18]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = models.resnet18(pretrained=True)



In [19]:
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 5)
model = model.to(device)

In [20]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [22]:
epochs = 10
for epoch in range(epochs):
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0
    
    for images, labels in train_loader:
        images, labels = images.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        _, predicted = outputs.max(1)
        correct += predicted.eq(labels).sum().item()
        total += labels.size(0)
    
    print(f"Epoch {epoch+1}, Loss: {running_loss/len(train_loader):.4f}, Accuracy: {100 * correct/total:.2f}%")

# Save model
torch.save(model.state_dict(), "resnet_knee_oa.pth")

Epoch 1, Loss: 0.8960, Accuracy: 62.44%
Epoch 2, Loss: 0.8134, Accuracy: 66.25%
Epoch 3, Loss: 0.7610, Accuracy: 68.09%
Epoch 4, Loss: 0.7049, Accuracy: 70.08%
Epoch 5, Loss: 0.6284, Accuracy: 73.69%
Epoch 6, Loss: 0.5470, Accuracy: 76.93%
Epoch 7, Loss: 0.4836, Accuracy: 80.24%
Epoch 8, Loss: 0.4364, Accuracy: 82.21%
Epoch 9, Loss: 0.3283, Accuracy: 86.50%
Epoch 10, Loss: 0.2975, Accuracy: 88.18%


### My Implementation

In [4]:
# import packages
import kagglehub
import os
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset,random_split
from torchvision import transforms, models, datasets
from torch import Generator
from PIL import Image

In [None]:
# setup data directories
train_dir = "/datasets/train"
val_dir = "/datasets/val"
test_dir = "/datasets/test"

In [None]:
# set random seed for reproducibility
def setup_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    torch.backends.cudnn.deterministic = True

SEED = 42
setup_seed(SEED)

In [None]:
# check for gpu
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

In [None]:
# data transforms
data_transforms = {
    'train': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.RandomHorizontalFlip(),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'val': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ]),
    'test': transforms.Compose([
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
    ])
}

In [None]:
data_dir = "dataset/"
dataset_all = datasets.ImageFolder(data_dir, transform=None)
size_all = len(dataset_all)
print(f'Full dataset size: {size_all}')

In [None]:
# split into train and test sets
TEST_RATIO = 0.2
VAL_RATIO = 0.2
size_test = int(size_all * TEST_RATIO)
size_train_val = size_all - size_test
dataset_train_val, dataset_test = random_split(
    dataset_all, 
    [size_train_val, size_test], 
    generator=Generator().manual_seed(SEED)
)

In [None]:
# further split training set into train and validation
size_val = int(size_train_val * VAL_RATIO)
size_train = size_train_val - size_val
dataset_train, dataset_val = random_split(
    dataset_train_val, 
    [size_train, size_val], 
    generator=Generator().manual_seed(SEED)
)

In [None]:
# dataset split sizes
print(f'Training set size: {len(dataset_train)}')
print(f'Validation set size: {len(dataset_val)}')
print(f'Test set size: {len(dataset_test)}')

In [None]:
# create a class to apply transformations to our subset datasets
class TransformedSubset:
    def __init__(self, subset, transform):
        self.subset = subset
        self.transform = transform
    
    def __len__(self):
        return len(self.subset)
    
    def __getitem__(self, idx):
        x, y = self.subset[idx]
        if self.transform:
            # convert tensor back to PIL if needed
            if isinstance(x, torch.Tensor):
                from torchvision.transforms.functional import to_pil_image
                x = to_pil_image(x)
            x = self.transform(x)
        return x, y

In [None]:
train_dataset = TransformedSubset(dataset_train, data_transforms['train'])
val_dataset = TransformedSubset(dataset_val, data_transforms['val'])
test_dataset = TransformedSubset(dataset_test, data_transforms['test'])

In [None]:
# create data loaders
BATCH_SIZE = 32  # smaller batch size for more modest gpus
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

dataloaders = {
    'train': train_loader,
    'val': val_loader,
    'test': test_loader
}

# class names and number of classes
class_names = dataset_all.classes
num_classes = len(class_names)
print(f'Number of classes: {num_classes}' + '\n')
print('Class Names:')
for classes in class_names:
  print(classes)

In [None]:
# model 3: vgg11 with transfer learning
def create_vgg11_model():
    model = models.vgg11_bn(weights='IMAGENET1K_V1')
    
    # freeze early layers
    for param in model.features.parameters():
        param.requires_grad = False
    
    # replace classifier
    model.classifier[6] = nn.Linear(4096, num_classes)
    
    return model.to(device)

In [None]:
# training function
def train_model(model, criterion, optimizer, scheduler, num_epochs=5):
    start_time = time.time()
    
    # initialize variables to track best model
    best_model_wts = model.state_dict()
    best_acc = 0.0
    
    # track training history
    history = {
        'train_loss': [],
        'train_acc': [],
        'val_loss': [],
        'val_acc': []
    }
    
    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 10)
        
        # each epoch has training and validation phase
        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()
            
            running_loss = 0.0
            running_corrects = 0
            
            # iterate over batches
            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)
                
                # zero gradients
                optimizer.zero_grad()
                
                # forward pass
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    loss = criterion(outputs, labels)
                    _, preds = torch.max(outputs, 1)
                    
                    # backward pass + optimize only in training
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()
                
                # statistics
                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)
            
            # calculate epoch statistics
            epoch_loss = running_loss / len(dataloaders[phase].dataset)
            epoch_acc = running_corrects.double() / len(dataloaders[phase].dataset)
            
            # store in history
            if phase == 'train':
                history['train_loss'].append(epoch_loss)
                history['train_acc'].append(epoch_acc.item())
            else:
                history['val_loss'].append(epoch_loss)
                history['val_acc'].append(epoch_acc.item())
            
            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')
            
            # save best model
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = model.state_dict().copy()
        
        # update learning rate
        if scheduler is not None:
            scheduler.step()
        
        print()
    
    # training completed
    time_elapsed = time.time() - start_time
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:.4f}')
    
    # load the best model weights
    model.load_state_dict(best_model_wts)
    return model, history

In [None]:
# test model function
def test_model(model, test_loader):
    model.eval()
    running_corrects = 0
    
    with torch.no_grad():
        for inputs, labels in test_loader:
            inputs = inputs.to(device)
            labels = labels.to(device)
            
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            
            running_corrects += torch.sum(preds == labels.data)
    
    acc = running_corrects.double() / len(test_loader.dataset)
    print(f'Test Accuracy: {acc:.4f}')
    return acc.item()

In [None]:
# plot training history
def plot_history(history, title):
    plt.figure(figsize=(12, 4))
    
    # plot accuracy
    plt.subplot(1, 2, 1)
    plt.plot(history['train_acc'], label='Train')
    plt.plot(history['val_acc'], label='Validation')
    plt.title(f'{title} - Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    
    # plot loss
    plt.subplot(1, 2, 2)
    plt.plot(history['train_loss'], label='Train')
    plt.plot(history['val_loss'], label='Validation')
    plt.title(f'{title} - Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    
    plt.tight_layout()
    plt.show()

In [None]:
# train and evaluate vgg11
print("\nTraining VGG11...")
vgg11_model = create_vgg11_model()
    
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(filter(lambda p: p.requires_grad, vgg11_model.parameters()), lr=0.001)
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

vgg11_model, vgg11_history = train_model(vgg11_model, criterion, optimizer, scheduler, num_epochs=10)
    
# plot results
plot_history(vgg11_history, "VGG11")

# test final model
vgg11_acc = test_model(vgg11_model, dataloaders['test'])