# NIH Chest X-ray using different swin architectures

### Importing libraries

In [6]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [7]:
import torch
import torchmetrics

In [8]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cuda')

### Creating the Dataset

In [10]:
label_map = {'Atelectasis': 0,
             'Cardiomegaly': 1,
             'Effusion': 2,
             'Infiltration': 3,
             'Mass': 4,
             'Nodule': 5,
             'Pneumonia': 6,
             'Pneumothorax': 7,
             'Consolidation': 8,
             'Edema': 9,
             'Emphysema': 10,
             'Fibrosis': 11,
             'Pleural_Thickening': 12,
             'Hernia': 13,
             'No Finding': 14}
len(label_map)

15

In [9]:
from torch.utils.data import Dataset
import pandas as pd
from PIL import Image
class NIH_Chest_Xray_Dataset(Dataset):
    def __init__(self, data_dir, train_test_file, transform=None):
        self.data_dir = data_dir
        self.transform = transform
        self.image_paths = []
        self.labels = []
        self.train_test_file = train_test_file
        train_test_img_list = [line.rstrip() for line in open(train_test_file)]
        # Read the CSV file with the image labels
        df = pd.read_csv(os.path.join(data_dir, 'Data_Entry_2017.csv'))

        # Loop through the images and labels, and store the paths and labels in lists  
        
        for i, row in df.iterrows():
            img_name = row['Image Index']
            if img_name in train_test_img_list:
                for i in range (1,13):
                #for i in range (1,2):    
                    img_path = os.path.join(data_dir,f'images_00{i}/images/',img_name)
                
                    if os.path.isfile(img_path):
                        self.image_paths.append(img_path)
                        label = row['Finding Labels'].split("|")
                        self.labels.append(label)
                        break

    def __len__(self):
        return len(self.image_paths)

    def __getitem__(self, idx):
        img_path = self.image_paths[idx]
        label = self.labels[idx]
        
        # Open the image file and apply the transforms (if any)
        img = Image.open(img_path).convert('RGB')
        if self.transform is not None:
            img = self.transform(img)

        num_labels = 15 # There are 14 possible labels in the dataset
        binary_label = torch.zeros(num_labels)
        
        for l in label:
            binary_label[label_map[l]] = 1

        return img, binary_label

In [11]:
from torchvision import datasets, transforms
data_dir = '/kaggle/input/data/'
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225])
])
train_file = os.path.join(data_dir, 'train_val_list.txt')
test_file = os.path.join(data_dir, 'test_list.txt')
train_dataset = NIH_Chest_Xray_Dataset(data_dir, train_test_file=train_file, transform=transform)
print(len(train_dataset))

67091


In [12]:
test_dataset = NIH_Chest_Xray_Dataset(data_dir, train_test_file=test_file, transform=transform)
print(len(test_dataset))

17908


In [13]:
from torch.utils.data import DataLoader
train_dataloader = DataLoader(train_dataset, batch_size=16, shuffle=True, num_workers=2)

test_dataloader = DataLoader(test_dataset, batch_size=16, shuffle=False, num_workers=2)

In [14]:
import torch.nn as nn
import torch.optim as optim

In [15]:
# from torchvision.models import swin_t
from torchvision.models import swin_b
# from torchvision.models import swin_s

### Run-1 with Swin tiny without weights

In [None]:
from torchvision.models import swin_t
# Use a pre-trained imagenet
#model = swin_t(weights='DEFAULT')
# Use no weights
model = swin_t()

# Update the fully connected layer based on the number of classes in the dataset
#model.fc = torch.nn.Linear(model.fc.in_features, len(ds_train.labels.info.class_names))
model.head = torch.nn.Linear(out_features=15,in_features=768)

model.to(device)

print(model)
# Specity the loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [None]:
from tqdm import tqdm

#train
model.train()
num_epochs = 5
for epoch in range(num_epochs):
    
    running_loss = 0.0
    for images, labels in tqdm(train_dataloader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * images.size(0)
    epoch_loss = running_loss / len(train_dataset)
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, epoch_loss))

In [None]:
#Evaluate
model.eval()
    
running_loss = 0.0
running_corrects = torch.zeros(15).to(device)
total_samples = 0

with torch.no_grad():
    for inputs, labels in tqdm(test_dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        running_loss += loss.item() * inputs.size(0)
        predicted_labels = torch.sigmoid(outputs) > 0.5
        running_corrects += (predicted_labels == labels).sum(dim=0).float()
        total_samples += inputs.size(0)

epoch_loss = running_loss / total_samples
epoch_acc = running_corrects / total_samples

print(f'Validation Loss: {epoch_loss:.4f} | Validation Acc: {epoch_acc.tolist()}')

### Run-2: Swin tiny with imagenet weights

In [None]:
# Use a pre-trained imagenet
#model = swin_t(weights='DEFAULT')
# Use no weights
model_pre_1 = swin_t(weights='DEFAULT')

# Update the fully connected layer based on the number of classes in the dataset
#model.fc = torch.nn.Linear(model.fc.in_features, len(ds_train.labels.info.class_names))
model_pre_1.head = torch.nn.Linear(out_features=15,in_features=768)

model_pre_1.to(device)

print(model_pre_1)

# Specity the loss function and optimizer
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model_pre_1.parameters(), lr=1e-4)

In [None]:
from tqdm import tqdm

#train
model_pre_1.train()
num_epochs = 5
for epoch in range(num_epochs):
    
    running_loss = 0.0
    for images, labels in tqdm(train_dataloader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model_pre_1(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * images.size(0)
    epoch_loss = running_loss / len(train_dataset)
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, epoch_loss))

In [None]:
#Evaluate
model_pre_1.eval()
    
running_loss = 0.0
running_corrects = torch.zeros(15).to(device)
total_samples = 0

with torch.no_grad():
    for inputs, labels in tqdm(test_dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model_pre_1(inputs)
        loss = criterion(outputs, labels)
        running_loss += loss.item() * inputs.size(0)
        predicted_labels = torch.sigmoid(outputs) > 0.5
        running_corrects += (predicted_labels == labels).sum(dim=0).float()
        total_samples += inputs.size(0)

epoch_loss = running_loss / total_samples
epoch_acc = running_corrects / total_samples

print(f'Validation Loss: {epoch_loss:.4f} | Validation Acc: {epoch_acc.tolist()}')

### Run-3: Swin base without weights

In [None]:
from torchvision.models import swin_b

In [None]:
# Use a pre-trained imagenet
#model = swin_t(weights='DEFAULT')
# Use no weights
model_base_1 = swin_b()

# Update the fully connected layer based on the number of classes in the dataset
#model.fc = torch.nn.Linear(model.fc.in_features, len(ds_train.labels.info.class_names))
model_base_1.head = torch.nn.Linear(out_features=15,in_features=1024)

model_base_1.to(device)

print(model_base_1)

In [None]:
criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model_base_1.parameters(), lr=1e-4)

In [None]:
from tqdm import tqdm

#train
model_base_1.train()
num_epochs = 5
for epoch in range(num_epochs):
    
    running_loss = 0.0
    for images, labels in tqdm(train_dataloader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model_base_1(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * images.size(0)
    epoch_loss = running_loss / len(train_dataset)
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, epoch_loss))

In [None]:
#Evaluate
model_base_1.eval()
    
running_loss = 0.0
running_corrects = torch.zeros(15).to(device)
total_samples = 0

with torch.no_grad():
    for inputs, labels in tqdm(test_dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model_base_1(inputs)
        loss = criterion(outputs, labels)
        running_loss += loss.item() * inputs.size(0)
        predicted_labels = torch.sigmoid(outputs) > 0.5
        running_corrects += (predicted_labels == labels).sum(dim=0).float()
        total_samples += inputs.size(0)

epoch_loss = running_loss / total_samples
epoch_acc = running_corrects / total_samples

print(f'Validation Loss: {epoch_loss:.4f} | Validation Acc: {epoch_acc.tolist()}')

### Run-4: Swin base with imagenet weights

In [16]:
# Use a pre-trained imagenet
#model = swin_t(weights='DEFAULT')
# Use no weights
model_base_pre_1 = swin_b(weights='DEFAULT')

# Update the fully connected layer based on the number of classes in the dataset
#model.fc = torch.nn.Linear(model.fc.in_features, len(ds_train.labels.info.class_names))
model_base_pre_1.head = torch.nn.Linear(out_features=15,in_features=1024)

model_base_pre_1.to(device)

print(model_base_pre_1)

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model_base_pre_1.parameters(), lr=1e-4)

SwinTransformer(
  (features): Sequential(
    (0): Sequential(
      (0): Conv2d(3, 128, kernel_size=(4, 4), stride=(4, 4))
      (1): Permute()
      (2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
    )
    (1): Sequential(
      (0): SwinTransformerBlock(
        (norm1): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (attn): ShiftedWindowAttention(
          (qkv): Linear(in_features=128, out_features=384, bias=True)
          (proj): Linear(in_features=128, out_features=128, bias=True)
        )
        (stochastic_depth): StochasticDepth(p=0.0, mode=row)
        (norm2): LayerNorm((128,), eps=1e-05, elementwise_affine=True)
        (mlp): MLP(
          (0): Linear(in_features=128, out_features=512, bias=True)
          (1): GELU(approximate='none')
          (2): Dropout(p=0.0, inplace=False)
          (3): Linear(in_features=512, out_features=128, bias=True)
          (4): Dropout(p=0.0, inplace=False)
        )
      )
      (1): SwinTransformerBlock

In [17]:
from torchmetrics import AUROC

In [18]:
ml_auroc = AUROC(task="multilabel",num_labels=15, average="macro", thresholds=None)
roc_auc_score = []

In [2]:
from tqdm import tqdm

#train
model_base_pre_1.train()
num_epochs = 5
for epoch in range(num_epochs):
    
    running_loss = 0.0
    running_roc_auc = 0.0

    for images, labels in tqdm(train_dataloader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model_base_pre_1(images)
        loss = criterion(outputs, labels)
        roc_auc = ml_auroc(outputs, labels.to(dtype=torch.long))
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * images.size(0)  
        running_roc_auc += roc_auc

    epoch_loss = running_loss / len(train_dataset)
    epoch_roc_auc = running_roc_auc / len(train_dataset)
    roc_auc_score.append(epoch_roc_auc)

    print('Epoch [{}/{}], Loss: {:.4f}, ROC_AUC_Score: {:.4f}'.format(epoch+1, num_epochs, epoch_loss, epoch_roc_auc))

NameError: name 'model_base_pre_1' is not defined

In [None]:
#Evaluate
model_base_pre_1.eval()
    
running_loss = 0.0
running_corrects = torch.zeros(15).to(device)
total_samples = 0

with torch.no_grad():
    for inputs, labels in tqdm(test_dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model_base_pre_1(inputs)
        loss = criterion(outputs, labels)
        running_loss += loss.item() * inputs.size(0)
        predicted_labels = torch.sigmoid(outputs) > 0.5
        running_corrects += (predicted_labels == labels).sum(dim=0).float()
        total_samples += inputs.size(0)

epoch_loss = running_loss / total_samples
epoch_acc = running_corrects / total_samples


print(f'Validation Loss: {epoch_loss:.4f} | Validation Acc: {epoch_acc.tolist()} | roc_auc_score: {}')

### Run-5: Swin small from scratch

In [None]:
# Use a pre-trained imagenet
#model = swin_t(weights='DEFAULT')
# Use no weights
model_small_1 = swin_s()

# Update the fully connected layer based on the number of classes in the dataset
#model.fc = torch.nn.Linear(model.fc.in_features, len(ds_train.labels.info.class_names))
model_small_1.head = torch.nn.Linear(out_features=15,in_features=768)

model_small_1.to(device)

print(model_small_1)

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model_small_1.parameters(), lr=1e-4)

In [None]:
from tqdm import tqdm

#train
model_small_1.train()
num_epochs = 5
for epoch in range(num_epochs):
    
    running_loss = 0.0
    for images, labels in tqdm(train_dataloader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model_small_1(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * images.size(0)
    epoch_loss = running_loss / len(train_dataset)
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, epoch_loss))

In [None]:
#Evaluate
model_small_1.eval()
    
running_loss = 0.0
running_corrects = torch.zeros(15).to(device)
total_samples = 0

with torch.no_grad():
    for inputs, labels in tqdm(test_dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model_small_1(inputs)
        loss = criterion(outputs, labels)
        running_loss += loss.item() * inputs.size(0)
        predicted_labels = torch.sigmoid(outputs) > 0.5
        running_corrects += (predicted_labels == labels).sum(dim=0).float()
        total_samples += inputs.size(0)

epoch_loss = running_loss / total_samples
epoch_acc = running_corrects / total_samples

print(f'Validation Loss: {epoch_loss:.4f} | Validation Acc: {epoch_acc.tolist()}')

### Run-6: Swin small with imagenet weights

In [None]:
# Use a pre-trained imagenet
#model = swin_t(weights='DEFAULT')
# Use no weights
model_small_2 = swin_s(weights='DEFAULT')

# Update the fully connected layer based on the number of classes in the dataset
#model.fc = torch.nn.Linear(model.fc.in_features, len(ds_train.labels.info.class_names))
model_small_2.head = torch.nn.Linear(out_features=15,in_features=768)

model_small_2.to(device)

print(model_small_2)

criterion = nn.BCEWithLogitsLoss()
optimizer = optim.Adam(model_small_2.parameters(), lr=1e-4)


In [None]:
from tqdm import tqdm

#train
model_small_2.train()
num_epochs = 5
for epoch in range(num_epochs):
    
    running_loss = 0.0
    for images, labels in tqdm(train_dataloader):
        images = images.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()
        outputs = model_small_2(images)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        running_loss += loss.item() * images.size(0)
    epoch_loss = running_loss / len(train_dataset)
    print('Epoch [{}/{}], Loss: {:.4f}'.format(epoch+1, num_epochs, epoch_loss))

In [None]:
#Evaluate
model_small_2.eval()
    
running_loss = 0.0
running_corrects = torch.zeros(15).to(device)
total_samples = 0

with torch.no_grad():
    for inputs, labels in tqdm(test_dataloader):
        inputs = inputs.to(device)
        labels = labels.to(device)
        outputs = model_small_2(inputs)
        loss = criterion(outputs, labels)
        running_loss += loss.item() * inputs.size(0)
        predicted_labels = torch.sigmoid(outputs) > 0.5
        running_corrects += (predicted_labels == labels).sum(dim=0).float()
        total_samples += inputs.size(0)

epoch_loss = running_loss / total_samples
epoch_acc = running_corrects / total_samples

print(f'Validation Loss: {epoch_loss:.4f} | Validation Acc: {epoch_acc.tolist()}')