In [None]:
# Standard library imports
import copy

# Third-party imports
import matplotlib.pyplot as plt
import time
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, models, transforms

In [None]:
PATH_RESIZED = "../data/resized"
PATH_TRAIN = f"{PATH_RESIZED}/train"
PATH_TEST = f"{PATH_RESIZED}/test"
TRAIN_0_PATH = f"{PATH_TRAIN}/normal"
TRAIN_1_PATH = f"{PATH_TRAIN}/pneumonia"
TEST_0_PATH = f"{PATH_TEST}/normal"
TEST_1_PATH = f"{PATH_TEST}/pneumonia"

In [None]:
print(f"Version: {torch.__version__}, GPU: {torch.cuda.is_available()}, NUM_GPU: {torch.cuda.device_count()}")

In [None]:
cuda_available = torch.cuda.is_available()
print(f"CUDA available: {cuda_available}")

# Print GPU details
if cuda_available:
    print(f"GPU device name: {torch.cuda.get_device_name(0)}")

In [None]:
mean_nums = [0.485, 0.456, 0.406]
std_nums = [0.229, 0.224, 0.225]
transform = transforms.Compose([transforms.ToTensor()])

# Load your dataset
train_dataset = datasets.ImageFolder(PATH_TRAIN, transform=transform)
test_dataset = datasets.ImageFolder(PATH_TEST, transform=transform)

In [None]:
# How many samples are there? 
len(train_dataset), len(train_dataset.targets), len(test_dataset), len(test_dataset.targets)

In [None]:
class_names = train_dataset.classes
class_names

In [None]:
image, label = train_dataset[0]
print(f"Image shape: {image.shape}")

image = image.permute(1, 2, 0)

plt.imshow(image.squeeze())
plt.title(label)
plt.show()

In [None]:
# Plot more images
torch.manual_seed(42)
fig = plt.figure(figsize=(9, 9))
rows, cols = 4, 4
for i in range(1, rows * cols + 1):
    random_idx = torch.randint(0, len(train_dataset), size=[1]).item()
    img, label = train_dataset[random_idx]
    fig.add_subplot(rows, cols, i)
    img = img.permute(1, 2, 0)
    plt.imshow(img.squeeze(), cmap="gray")
    plt.title(class_names[label])
    plt.axis(False);

In [None]:
# Create DataLoader
trainloader = DataLoader(train_dataset, batch_size=64, shuffle=False, num_workers=4)
testloader = DataLoader(test_dataset, batch_size=64, shuffle=False, num_workers=4)

In [None]:
res_mod = models.resnet50(pretrained=True)

num_ftrs = res_mod.fc.in_features
res_mod.fc = nn.Linear(num_ftrs, 2)

In [None]:
for name, child in res_mod.named_children():
    print(name)

In [None]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")

In [None]:
res_mod = res_mod.to(device)

# 2 - Define our loss function
criterion = nn.CrossEntropyLoss()

# 3 - Optmization - Observe that all parameters are being optimized
optimizer_ft = optim.SGD(res_mod.parameters(), lr=0.001, momentum=0.9)

# Decay LR by a factor of 0.1 every 7 epochs
exp_lr_scheduler = optim.lr_scheduler.StepLR(optimizer_ft, step_size=7, gamma=0.1)

In [None]:
dataloaders = {"train": trainloader, "test": testloader}

In [None]:
dataset_sizes = {x: len(dataloaders[x].dataset) for x in ['train', 'test']}

In [None]:
def train_model(model, criterion, optimizer, scheduler, num_epochs=10):
    since = time.time()
    list_train_loss = [] 
    list_train_acc = [] 
    list_val_loss = []
    list_val_acc = []
    
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0

    for epoch in range(num_epochs):
        print('Epoch {}/{}'.format(epoch, num_epochs - 1))
        print('-' * 10)

        # Each epoch has a training and test phase
        for phase in ['train', 'test']:
            if phase == 'train':
                scheduler.step()
                # Set model to training mode
                model.train()  
            else:
                # Set model to evaluate mode
                model.eval()  

            current_loss = 0.0
            current_corrects = 0

            # Here's where the training happens
            print('Iterating through data...')

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                # We need to zero the gradients, don't forget it
                optimizer.zero_grad()

                # Time to carry out the forward training poss
                # We only need to log the loss stats if we are in training phase
                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    # backward + optimize only if in training phase
                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                # We want variables to hold the loss statistics
                current_loss += loss.item() * inputs.size(0)
                current_corrects += torch.sum(preds == labels.data)

            epoch_loss = current_loss / dataset_sizes[phase]
            epoch_acc = current_corrects.double() / dataset_sizes[phase]

            print('{} Loss: {:.4f} Acc: {:.4f}'.format(phase, epoch_loss, epoch_acc))

            #register to output lists
            if phase == 'train':
              list_train_loss.append(epoch_loss)
              list_train_acc.append(epoch_acc)
            if phase == 'val':
              list_val_loss.append(epoch_loss)
              list_val_acc.append(epoch_acc)

            # Make a copy of the model if the accuracy on the validation set has improved
            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        print()
        # Variables to be written on tensorboard!
        #writer.add_scalar('Loss/train', epoch_loss/len(trainloader), epoch)

    time_since = time.time() - since
    print('Training complete in {:.0f}m {:.0f}s'.format(time_since // 60, time_since % 60))
    print('Best val Acc: {:4f}'.format(best_acc))
    
    # Now we'll load in the best model weights and return it
    model.load_state_dict(best_model_wts)
    return model, list_train_loss, list_train_acc, list_val_loss, list_val_acc

In [None]:
base_model, train_loss, train_acc, val_loss, val_acc = train_model(res_mod, criterion, optimizer_ft, exp_lr_scheduler, num_epochs=10)

In [None]:
import torch.nn.functional as F


In [None]:
class CNNModel(nn.Module):
    def __init__(self):
        super(CNNModel, self).__init__()
        self.conv1 = nn.Conv2d(in_channels=3, out_channels=32, kernel_size=3, stride=1, padding=1)
        self.pool = nn.MaxPool2d(kernel_size=2, stride=2, padding=0)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1, padding=1)
        self.fc1 = nn.Linear(128 * 28 * 28, 512)
        self.fc2 = nn.Linear(512, 2)
        self.dropout = nn.Dropout(0.5)

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = self.pool(F.relu(self.conv3(x)))
        x = x.view(-1, 128 * 28 * 28)
        x = F.relu(self.fc1(x))
        x = self.dropout(x)
        x = self.fc2(x)
        return x

model = CNNModel()
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = model.to(device)


In [None]:
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:
num_epochs = 30
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for inputs, labels in trainloader:
        inputs, labels = inputs.to(device), labels.to(device)
        
        optimizer.zero_grad()
        
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item() * inputs.size(0)
    
    epoch_loss = running_loss / len(trainloader.dataset)
    print(f'Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss:.4f}')


In [None]:
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for inputs, labels in testloader:
        inputs, labels = inputs.to(device), labels.to(device)
        outputs = model(inputs)
        _, preds = torch.max(outputs, 1)
        correct += (preds == labels).sum().item()
        total += labels.size(0)
        
accuracy = correct / total
print(f'Validation Accuracy: {accuracy:.4f}')