In [1]:
import random
import torch
import numpy as np
from tqdm import trange
import matplotlib.pyplot as plt
from torchsummary import summary
from torchvision import io, transforms
DEVICE = 'cuda' if torch.cuda.is_available() else 'cpu'

In [2]:
IMAGE_HEIGHT, IMAGE_LENGTH = 28, 28

In [3]:
def read_data(dataset: list):
    """dataset: 'train', 'val', or 'test'. Read in and return list of image
    tensors (X) and list of int labels (y)"""
    label = {"normal": 0, "pneumonia": 1}
    upper = {"normal": "NORMAL", "pneumonia": "PNEUMONIA"}
    
    rows = []
    for clf in label.keys(): # for each label (normal and pneumonia)
        with open(f"chest_xray/{clf}_{dataset}.txt") as file:  # read in names 
            for line in file.readlines():
                image_name = line.strip()  # remove '\n' at end of string
                image = io.read_image(f"chest_xray/{dataset}/{upper[clf]}/{image_name}") # tensor
                # ex: "chest_xray/test/NORMAL/image_name.txt"
                if image.shape[0] == 1:  # some images have three dimensions
                    rows.append([image, label[clf]])

    random.shuffle(rows)
    images, labels = [e[0] for e in rows], [e[1] for e in rows]  # split into X and y
    return images, labels

In [4]:
X_train, y_train = read_data("train")
X_val, y_val = read_data("val")
X_test, y_test = read_data("test")

In [5]:
X_train_new = [transforms.Resize([IMAGE_HEIGHT, IMAGE_LENGTH])(X_train[i]) for i in range(len(X_train))]
X_train_new = torch.stack(X_train_new)

X_val_new = [transforms.Resize([IMAGE_HEIGHT, IMAGE_LENGTH])(X_val[i]) for i in range(len(X_val))]
X_val_new = torch.stack(X_val_new)

X_test_new = [transforms.Resize([IMAGE_HEIGHT, IMAGE_LENGTH])(X_test[i]) for i in range(len(X_test))]
X_test_new = torch.stack(X_test_new)

In [6]:
batch_size = 64

train_dataset = torch.utils.data.TensorDataset(X_train_new / 255, torch.tensor(y_train))
train_loader = torch.utils.data.DataLoader(train_dataset, batch_size=batch_size, shuffle=False, num_workers=True)

test_dataset = torch.utils.data.TensorDataset(X_test_new / 255, torch.tensor(y_test))
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=batch_size, shuffle=False, num_workers=True)

In [7]:
input_size = 1 * IMAGE_HEIGHT * IMAGE_LENGTH  # input spatial dimension of images
hidden_size = 256 #128         # width of hidden layer
output_size = 1          # number of output neurons

class PneumoniaClassifier(torch.nn.Module):
    
    def __init__(self):
        
        super().__init__()
        self.c1 = torch.nn.Conv2d(in_channels=1,out_channels=6,kernel_size=3,stride=1,padding=1)
        self.c2 = torch.nn.Conv2d(in_channels=6,out_channels=12,kernel_size=5,stride=3,padding=2)
        self.c3 = torch.nn.Conv2d(in_channels=12,out_channels=128,kernel_size=5,stride=1,padding=0)
        self.max_pool = torch.nn.AvgPool2d(kernel_size=2,stride=2)
        self.relu = torch.nn.ReLU()
        
        self.fc1 = torch.nn.Linear(in_features=128,out_features=64)
        self.fc2 = torch.nn.Linear(in_features=64,out_features=output_size)
        self.sigmoid = torch.nn.Sigmoid()
        
     
    def forward(self, x):
        x = self.c1(x)
        x = self.c2(x)
        x = self.relu(self.max_pool(x))
        x = self.relu(self.c3(x))
        x = torch.flatten(x,1)
        x = self.relu(self.fc1(x))
        y = self.fc2(x)
        y_output = self.sigmoid(y)
        return y_output.T[0]

model = PneumoniaClassifier().to(DEVICE)
summary(model, input_size=(1, IMAGE_HEIGHT, IMAGE_LENGTH))

----------------------------------------------------------------
        Layer (type)               Output Shape         Param #
            Conv2d-1            [-1, 6, 28, 28]              60
            Conv2d-2           [-1, 12, 10, 10]           1,812
         AvgPool2d-3             [-1, 12, 5, 5]               0
              ReLU-4             [-1, 12, 5, 5]               0
            Conv2d-5            [-1, 128, 1, 1]          38,528
              ReLU-6            [-1, 128, 1, 1]               0
            Linear-7                   [-1, 64]           8,256
              ReLU-8                   [-1, 64]               0
            Linear-9                    [-1, 1]              65
          Sigmoid-10                    [-1, 1]               0
Total params: 48,721
Trainable params: 48,721
Non-trainable params: 0
----------------------------------------------------------------
Input size (MB): 0.00
Forward/backward pass size (MB): 0.05
Params size (MB): 0.19
Estimated Tot

In [8]:
def train_one_epoch(train_loader, model, device, optimizer, log_interval, epoch):
    model.train()
    losses = []
    counter = []
    
    for i, (img, label) in enumerate(train_loader):
        img, label = img.float().to(device), label.float().to(device)
        optimizer.zero_grad()
        outputs = model(img)
        loss = torch.nn.BCELoss()(outputs, label)
        loss.backward()
        optimizer.step()
    
        # Record training loss every log_interval and keep counter of total training images seen
        if (i+1) % log_interval == 0:
            losses.append(loss.item())
            counter.append(
                (i * batch_size) + img.size(0) + epoch * len(train_loader.dataset))

    return losses, counter

In [9]:
def test_one_epoch(test_loader, model, device):
    model.eval()
    test_loss = 0
    num_correct = 0
    tp = 0
    fn = 0
    fp = 0
    
    with torch.no_grad():
        for i, (img, label) in enumerate(test_loader):
            output = model(img)
            pred = torch.round(output)  # round probability into binary classification
            tp += sum([1 for pred, true in zip(pred, label) if pred == true and true == 1])
            fn += sum([1 for pred, true in zip(pred, label) if pred != true and pred == 0])
            fp += sum([1 for pred, true in zip(pred, label) if pred != true and pred == 1])
            num_correct += pred.eq(label).sum().item()
            test_loss /= len(test_loader)
            
    test_loss /= len(test_loader.dataset)
    return test_loss, num_correct, tp, fn, fp

In [10]:
# Hyperparameters
lr = 0.05
max_epochs=25
gamma = 0.95

# Recording data
log_interval = 1

# Instantiate optimizer (model was created in previous cell)
optimizer = torch.optim.SGD(model.parameters(), lr=lr)

train_losses = []
train_counter = []
test_losses = []
test_correct = []
for epoch in trange(max_epochs, leave=True, desc='Epochs'):
    train_loss, counter = train_one_epoch(train_loader, model, DEVICE, optimizer, log_interval, epoch)
    test_loss, num_correct, tp, fn, fp = test_one_epoch(test_loader, model, DEVICE)

    # Record results
    train_losses.extend(train_loss)
    train_counter.extend(counter)
    test_losses.append(test_loss)
    test_correct.append(num_correct)


Epochs: 100%|██████████| 25/25 [01:23<00:00,  3.32s/it]


In [11]:
print(f"Precision: {tp/ (tp + fp)}")
print(f"Recall: {tp/ (tp + fn)}")
print(f"F1 Score: {tp/( tp + .5*(fp + fn))}")
print(f"Test accuracy: {test_correct[-1]/len(test_loader.dataset)}")

Precision: 0.7223264540337712
Recall: 0.9871794871794872
F1 Score: 0.8342361863488624
Test accuracy: 0.7548076923076923


In [12]:
def show_image_resize(rand_idx):
    """Show the image resize using global variables for height / weight"""
    plt.imshow(X_train[rand_idx][0])
    plt.show()
#    plt.imshow(transforms.Resize([IMAGE_HEIGHT, IMAGE_LENGTH])(X_train[rand_idx])[0])
    plt.imshow(transforms.Resize([28, 28])(X_train[rand_idx])[0])
    
#show_image_resize(74)