imports

In [1]:
import torch
import torch.nn as nn
from torchvision import models
from torch.utils.data import Dataset, DataLoader
from torchvision import datasets, transforms
from sklearn import metrics
from sklearn.model_selection import train_test_split
import matplotlib.pyplot as plt
from PIL import Image
import pandas as pd
import numpy as np
import os

paths and globals definitions

In [2]:
"""run this if you are working from google colab!!"""
# from google.colab import drive
# drive.mount('/content/gdrive')
# code_dir = "/content/gdrive/MyDrive/neural network project"

'run this if you are working from google colab!!'

In [3]:
"""run this if you are working from your local computer!!"""
code_dir = os.getcwd()

In [4]:
data_dir = os.path.join(code_dir, "Data")
class_file_path = os.path.join(code_dir, "train_data.csv")

PNEUMOTHORAX = 1
NOT_PNEUMOTHORAX = 0
classes = {PNEUMOTHORAX, NOT_PNEUMOTHORAX}
num_classes = len(classes)

classification_file = pd.read_csv(class_file_path, header=0)

dataset definition

In [5]:
class PneumDataset(Dataset):
    def __init__(self, data_path, images_paths, labels, transform):
        # init custom dataset attributes
        self.data_path = data_path
        self.X_images = images_paths
        self.y_labels = labels
        self.transform = transform
        self.num_of_images = len(self.X_images)

    def __len__(self):
        # len of the dataset
        return self.num_of_images


    def __getitem__(self, index):
        # get specific sample by index
        img_path = self.X_images[index]
        label = self.y_labels[index]
        
        # opening the image
        path = os.path.join(self.data_path, img_path)
        image = Image.open(path)
        image = image.convert("RGB")
        
        # converting x,y to tensors
        image = self.transform(image)
        label = torch.tensor(label, dtype=torch.long)

        return {"image": image, "label" : label}

model definition

In [6]:
class PneumNet(nn.Module):
    def __init__(self, num_classes):
        super(PneumNet, self).__init__()

        self.model = models.resnet50(pretrained=True)
        self.fc1 = nn.Linear(2048, 2048)
        self.fc2 = nn.Linear(2048, num_classes)
        self.dropout = nn.Dropout(0.3)

    def forward(self, x):
        x = self.model.conv1(x)
        x = self.model.bn1(x)
        x = self.model.relu(x)
        x = self.model.maxpool(x)

        x = self.model.layer1(x)
        x = self.model.layer2(x)
        x = self.model.layer3(x)
        x = self.model.layer4(x)
        x = self.model.avgpool(x)

        x = x.view(x.size(0), -1)
        x = nn.functional.relu(self.fc1(x))
        x = self.dropout(x)
        x = nn.functional.softmax(self.fc2(x), dim=1)

        return x

loss, optimizer and key parameters definition 
this are the parameters to play with

In [7]:
# Hyper-parameters
epochs = 1
learning_rate = 0.01
batch_size = 20
num_workers = 1

# device
device = "cpu"
if (torch.cuda.is_available()):
    device = "cuda"
print(f"Working on device: {device}")

# transform
transform = transforms.Compose([transforms.Resize(255),
                                transforms.CenterCrop(224),
                                transforms.ToTensor()])

# model
model = PneumNet(num_classes).to(device)

# loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(model.parameters(), lr=learning_rate)

Working on device: cuda


loading and splitting the data

In [8]:
def imshow(img):
    np_img = img.numpy()
    plt.imshow(np.transpose(np_img, (1, 2, 0)))
    plt.show()

images_paths = np.asarray(classification_file["file_name"])
images_targets = np.asarray(classification_file["target"])

# split to train and test
X_train, X_test, y_train, y_test = train_test_split(images_paths, images_targets, stratify=images_targets, random_state = 50)

# create dataloaders
train_dataset = PneumDataset(data_dir, X_train, y_train, transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size,
                          shuffle=True, num_workers=num_workers)

test_dataset = PneumDataset(data_dir, X_test, y_test, transform)
test_loader = DataLoader(test_dataset, batch_size=batch_size,
                         shuffle=False, num_workers=num_workers)

train function

In [9]:
def train(data_loader, model, optimizer, device, epochs):
    model.train()

    for epoch in range(epochs):
        for i, data in enumerate(data_loader):
            images = data["image"]
            labels = data["label"]

            images = images.to(device, dtype=torch.float)
            labels = labels.to(device, dtype=torch.float)

            # Forward pass
            predict = model(images)
            loss = criterion(predict, labels)

            # Backward and optimize
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            print (f'Epoch [{epoch+1}/{epochs}], Step [{i+1}], Loss: {loss.item():.4f}')


print("starting training")
train(train_loader, model, optimizer, device, epochs)

starting training


test function

In [None]:
def test(data_loader, model, device):
    model.eval()
    # init lists to store targets and outputs
    original_labels = []
    predict_labels = []
    # no_grad context
    with torch.no_grad():
        for data in data_loader:
            images = data["image"]
            labels = data["label"]

            images = images.to(device, dtype=torch.float)
            labels = labels.to(device, dtype=torch.float)

            # predict
            output = model(images)
            print(output)
            # convert targets and outputs to lists
            labels = labels.detach().cpu().numpy().tolist()
            output = output.detach().cpu().numpy().tolist()
            
            # extend the original list
            original_labels.extend(labels)
            predict_labels.extend(output)
            
    return original_labels, predict_labels

original_labels, predict_labels = test(test_loader, model, device=device)
# print(original_labels)
# print(predict_labels)

# roc_auc = metrics.roc_auc_score(original_labels, predict_labels)

tensor([[0.1464, 0.8536],
        [0.1981, 0.8019],
        [0.2212, 0.7788],
        [0.1856, 0.8144],
        [0.2053, 0.7947],
        [0.1389, 0.8611],
        [0.2150, 0.7850],
        [0.1690, 0.8310],
        [0.2030, 0.7970],
        [0.1462, 0.8538],
        [0.1982, 0.8018],
        [0.1684, 0.8316],
        [0.2124, 0.7876],
        [0.1753, 0.8247],
        [0.2245, 0.7755],
        [0.1470, 0.8530],
        [0.2299, 0.7701],
        [0.2156, 0.7844],
        [0.1925, 0.8075],
        [0.1508, 0.8492]], device='cuda:0')
tensor([[0.2087, 0.7913],
        [0.2207, 0.7793],
        [0.1598, 0.8402],
        [0.1549, 0.8451],
        [0.1951, 0.8049],
        [0.1864, 0.8136],
        [0.2332, 0.7668],
        [0.2619, 0.7381],
        [0.2091, 0.7909],
        [0.2477, 0.7523],
        [0.2235, 0.7765],
        [0.1770, 0.8230],
        [0.2452, 0.7548],
        [0.1512, 0.8488],
        [0.1898, 0.8102],
        [0.1877, 0.8123],
        [0.1618, 0.8382],
        [0.2292, 0.7

In [None]:
# def split_data():
#     """
#     ONLY RUN ONCE!!!
#     split the data to 2 class directory using the information from the data file
#     """

#     for row, file_name in enumerate(classification_file["file_name"]):
#       lable = classification_file["target"][row]
#       original_path = os.path.join(data_dir, file_name)
      
#       if lable == PENUMOTHORAX:
#         dest_path = os.path.join(Pneumothorax_dir, file_name)
#       elif lable == NOT_PENUMOTHORAX:
#         dest_path = os.path.join(not_Pneumothorax_dir, file_name)

#       shutil.copy(original_path,dest_path)




In [None]:
# Train and evaluate functions 

def train(data_loader, model, optimizer, device):
    """
    training for one epoch with selected model and params
     data_loader:  pytorch dataloader
     model: pytorch model
     optimizer: optimizer 
     device: cuda/cpu
    """
    # set training mode 
    model.train()
    # go over every batch of data in data loader
    for data in data_loader:
        inputs = data["image"]
        targets = data["targets"]
        # move inputs/targets to cuda/cpu device
        inputs = inputs.to(device, dtype=torch.float)
        targets = targets.to(device, dtype=torch.float)
        # zero grad the optimizer
        optimizer.zero_grad()
        # do the forward step of model
        outputs = model(inputs)
        # calculate loss
        loss = nn.BCEWithLogitsLoss()(outputs, targets.view(-1, 1))
        # backward step the loss
        loss.backward()
        # step optimizer
        optimizer.step()
        
def evaluate(data_loader, model, device):
    """
    Evaluation for one epoch
    data_loader: this is the pytorch dataloader
    model: pytorch model
    device: cuda/cpu
    """
    # put model in evaluation mode
    model.eval()
    # init lists to store targets and outputs
    final_targets = []
    final_outputs = []
    # no_grad context
    with torch.no_grad():
        for data in data_loader:
            inputs = data["image"]
            targets = data["targets"]
            inputs = inputs.to(device, dtype=torch.float)
            targets = targets.to(device, dtype=torch.float)
            # generate prediction
            output = model(inputs)
            # convert targets and outputs to lists
            targets = targets.detach().cpu().numpy().tolist()
            output = output.detach().cpu().numpy().tolist()
            # extend the original list
            final_targets.extend(targets)
            final_outputs.extend(output)
            
    return final_outputs, final_targets
# train and print auc score for all epochs
for epoch in tqdm(range(EPOCHS)):
    # train 
    train(train_loader, model, optimizer, device=device)
    # predict 
    predictions, valid_targets = evaluate(valid_loader, model, device=device)
    # metrics 
    roc_auc = metrics.roc_auc_score(valid_targets, predictions)
    print(f"Epoch={epoch}, Valid ROC AUC={roc_auc}")

    