In [None]:
try:
  import torchinfo
except:
  !pip install torchinfo
  import torchinfo

try:
  import torchmetrics
except:
  !pip install torchmetrics
  from torchmetrics import ConfusionMatrix


In [None]:
import os
import shutil

from google.colab import drive

# Mount Google Drive at /content/drive so the dataset folder under MyDrive can be read.
drive.mount('/content/drive')

def copy_files_recursive(source_folder, destination_folder):
    """Mirror every file below ``source_folder`` into ``destination_folder``.

    The relative layout of the files is preserved. Directories are created
    only as needed for the files being copied, so directories that contain no
    files are not recreated. Existing destination files are overwritten.

    Args:
        source_folder: Root directory to walk.
        destination_folder: Root directory that receives the copies.
    """
    for current_dir, _subdirs, filenames in os.walk(source_folder):
        for filename in filenames:
            src_file = os.path.join(current_dir, filename)
            # Path of the file relative to the source root, re-rooted at the destination.
            relative_file = os.path.relpath(src_file, source_folder)
            dst_file = os.path.join(destination_folder, relative_file)

            # Make sure the parent directory exists before writing the copy.
            os.makedirs(os.path.dirname(dst_file), exist_ok=True)
            shutil.copyfile(src_file, dst_file)

# Copy the dataset from Drive to the local Colab disk once; an existing
# destination directory is taken to mean the copy has already been done.
source_folder = '/content/drive/MyDrive/chest_xray'
destination_folder = '/content/chest_xray'
if os.path.isdir(destination_folder):
    print("already downloaded")
else:
    copy_files_recursive(source_folder, destination_folder)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
already downloaded


In [None]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
import matplotlib.pyplot as plt
import zipfile
import requests
from pathlib import Path
from PIL import Image
import random
import numpy as np
import matplotlib.pyplot as plt
import os
from timeit import default_timer as timer
from torchvision.models import resnet34

# --- Hyperparameters ---------------------------------------------------------
hidden_size = 32          # forwarded to the NeuralNet constructor
num_classes = 2           # forwarded to the NeuralNet constructor as output size
NUM_EPOCHS = 50
BATCH_SIZE = 64
learning_rate = 0.0001

# --- Input geometry ----------------------------------------------------------
image_height = 256
image_width = 256
input_size = image_height * image_width   # number of pixels per image

# --- Reproducibility ---------------------------------------------------------
# Seed every RNG the notebook imports so that augmentation, weighted sampling,
# weight initialisation and dropout are repeatable across Restart & Run All.
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# --- Dataset locations -------------------------------------------------------
data_path = Path("/content/chest_xray")

train_dir = data_path / "train"
test_dir = data_path / "test"

print(test_dir)

/content/chest_xray/test


In [None]:
# All .jpeg files located exactly three levels below data_path.
image_path_list = list(data_path.glob("*/*/*.jpeg"))

# Training pipeline: resize, random rotation (augmentation), single-channel
# grayscale, then conversion to a tensor.
train_transform = transforms.Compose([
    transforms.Resize(size=(image_height, image_width)),
    transforms.RandomRotation(degrees=35),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
])

# Evaluation pipeline: same as training but without the random rotation.
test_transform = transforms.Compose([
    transforms.Resize(size=(image_height, image_width)),
    transforms.Grayscale(num_output_channels=1),
    transforms.ToTensor(),
])

# ImageFolder derives the label of each image from its class sub-directory.
train_data = torchvision.datasets.ImageFolder(train_dir, transform=train_transform)
test_data = torchvision.datasets.ImageFolder(test_dir, transform=test_transform)

# Sanity check: shape of the first transformed training image.
img = train_data[0][0]
img.shape

torch.Size([1, 256, 256])

In [None]:
# os.cpu_count() may return None; fall back to 0 (load in the main process).
NUM_WORKERS = os.cpu_count() or 0

# Oversampling to counter the class imbalance: every sample is weighted by the
# inverse frequency of its class, so the sampler draws the classes roughly
# equally often.
# The labels come from ImageFolder's `targets` list, so no image has to be
# decoded just to read its label, and the class counts are derived from the
# data instead of being hard-coded.
targets = torch.as_tensor(train_data.targets)
class_counts = torch.bincount(targets, minlength=len(train_data.classes))
class_weights = (1.0 / class_counts.double()).tolist()
sample_weights = [class_weights[label] for label in train_data.targets]

# Draw len(train_data) samples per epoch, with replacement, according to the weights.
sampler = torch.utils.data.WeightedRandomSampler(sample_weights,
                                                 num_samples=len(sample_weights),
                                                 replacement=True)

# The sampler decides the order of the training samples.
train_dataloader = DataLoader(dataset=train_data,
                              batch_size=BATCH_SIZE,
                              num_workers=NUM_WORKERS,
                              sampler=sampler)

# The evaluation data is iterated in its stored order.
test_dataloader = DataLoader(dataset=test_data,
                             batch_size=BATCH_SIZE,
                             num_workers=NUM_WORKERS,
                             shuffle=False)


In [None]:

# Pull one batch from the training loader and show its first six images in a
# 2x3 grid, titled by label (0 -> "Normal", anything else -> "Pneumonia").
example_data, example_targets = next(iter(train_dataloader))

for slot in range(6):
    image, target = example_data[slot], example_targets[slot]
    plt.subplot(2, 3, slot + 1)
    plt.imshow(image[0], cmap="gray")   # channel 0 of the single-channel image
    plt.title("Normal" if target.item() == 0 else "Pneumonia")
    plt.axis("off")
plt.show()

In [None]:
# @title CNN model definition
class NeuralNet(nn.Module):
  """Small convolutional classifier for single-channel images.

  Two feature blocks (``layer1`` and ``layer2``) each apply two
  Conv2d -> ReLU -> MaxPool2d(2) stages, so the spatial size is divided by 16
  in total and the final feature map has 32 channels. A dropout-regularised
  MLP (``classifier``) maps the flattened features to ``output_size`` logits.

  Args:
    input_size: Accepted for interface compatibility; not used by the layers.
    hidden_size: Accepted for interface compatibility; not used by the layers.
    output_size: Number of output logits (classes).

  Note:
    The width of the first linear layer is computed from the module-level
    ``image_height`` and ``image_width`` values, so inputs must have that
    spatial size.
  """

  def __init__(self, input_size, hidden_size, output_size):
    super().__init__()
    # Block 1: 1 -> 16 -> 16 channels, two 2x poolings.
    self.layer1 = nn.Sequential(
      nn.Conv2d(in_channels=1, out_channels=16, kernel_size=3, stride=1, padding=1),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=2),
      nn.Conv2d(in_channels=16, out_channels=16, kernel_size=3, stride=1, padding=1),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=2),
    )
    # Block 2: 16 -> 32 -> 32 channels, two more 2x poolings.
    self.layer2 = nn.Sequential(
      nn.Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=2),
      nn.Conv2d(in_channels=32, out_channels=32, kernel_size=3, stride=1, padding=1),
      nn.ReLU(),
      nn.MaxPool2d(kernel_size=2),
    )

    # Four 2x poolings shrink each spatial dimension by a factor of 16.
    flat_features = 32 * (image_height // 16) * (image_width // 16)

    self.classifier = nn.Sequential(
      nn.Dropout(p=0.5),
      nn.Flatten(),
      nn.Linear(in_features=flat_features, out_features=128),
      nn.ReLU(),
      nn.Dropout(0.5),
      nn.Linear(in_features=128, out_features=128),
      nn.ReLU(),
      nn.Dropout(0.5),
      # Honour the requested number of classes instead of a hard-coded 2.
      nn.Linear(128, output_size),
    )

  def forward(self, x):
    """Return the raw class logits for a batch of images ``x``."""
    x = self.layer1(x)
    x = self.layer2(x)
    x = self.classifier(x)
    return x

# Build the network, then move its parameters to the selected device.
model = NeuralNet(input_size, hidden_size, num_classes)
model = model.to(device)

# Loss and optimizer: cross-entropy on the logits, Adam with weight decay.
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate, weight_decay=1e-2)

In [None]:
def plot_loss_curves(results):
    """Plot train/test loss and accuracy curves side by side.

    Args:
        results: Mapping with the keys ``'train_loss'``, ``'test_loss'``,
            ``'train_acc'`` and ``'test_acc'``, each holding one value per
            epoch.
    """
    epochs = range(len(results['train_loss']))

    # Create exactly one figure. The earlier plt.clf() + plt.figure() sequence
    # acted on the implicit current figure first, which could leave a stray
    # empty figure behind.
    fig, (loss_ax, acc_ax) = plt.subplots(1, 2, figsize=(15, 7))

    loss_ax.plot(epochs, results['train_loss'], label='train_loss')
    loss_ax.plot(epochs, results['test_loss'], label='test_loss')
    loss_ax.set_title('Loss')
    loss_ax.set_xlabel('Epochs')
    loss_ax.legend()

    acc_ax.plot(epochs, results['train_acc'], label='train_accuracy')
    acc_ax.plot(epochs, results['test_acc'], label='test_accuracy')
    acc_ax.set_title('Accuracy')
    acc_ax.set_xlabel('Epochs')
    acc_ax.legend()

    plt.show()
    # Release the figure; this function is called repeatedly during training.
    plt.close(fig)

In [None]:
def train_step(model, dataloader, loss_function, optimizer):
  """Run one optimisation pass over ``dataloader``.

  Args:
    model: Network to train; switched to training mode by this function.
    dataloader: Iterable yielding ``(images, labels)`` batches.
    loss_function: Callable ``(logits, labels) -> scalar loss tensor``.
    optimizer: Optimizer holding the model's parameters.

  Returns:
    ``(mean_batch_loss, accuracy)`` where the loss is averaged over batches
    and the accuracy is the fraction of correctly classified samples seen.
    Returns ``(0.0, 0.0)`` when the dataloader yields no batches.
  """
  # Make sure dropout (and similar layers) behave in training mode.
  model.train()

  # Send batches to wherever the model lives instead of relying on a global.
  first_param = next(model.parameters(), None)
  target_device = first_param.device if first_param is not None else torch.device('cpu')

  loss_sum = 0.0
  n_correct, n_seen, n_batches = 0, 0, 0

  for images, labels in dataloader:
    images = images.to(target_device)
    labels = labels.to(target_device)

    # Clear old gradients before the backward pass so nothing stale leaks in.
    optimizer.zero_grad()
    logits = model(images)
    loss = loss_function(logits, labels)
    loss.backward()
    optimizer.step()

    loss_sum += loss.item()
    # Count correct samples so a smaller final batch is weighted by its size.
    n_correct += (logits.argmax(dim=1) == labels).sum().item()
    n_seen += labels.size(0)
    n_batches += 1

  if n_batches == 0:
    return 0.0, 0.0
  return loss_sum / n_batches, n_correct / max(n_seen, 1)

In [None]:
def test_step(model, dataloader, loss_function):
  labels_predicted = []
  labels_expected = []
  with torch.no_grad():
    n_samples = len(dataloader.dataset)
    n_correct = 0
    test_loss = 0
    for images, labels in dataloader:
      images = images.to(device)
      labels = labels.to(device)

      outputs = model(images)

      loss = loss_function(outputs, labels)
      test_loss += loss.item()

      _, label_predicted = torch.max(outputs, 1)

      labels_predicted.append(label_predicted)
      labels_expected.append(labels)

      boolTensor = (label_predicted == labels)

      n_correct += boolTensor.sum().item()


    accuracy = n_correct / n_samples
    test_loss = test_loss / len(dataloader)

  return test_loss, accuracy, labels_predicted, labels_expected

In [None]:
def train(model, train_dataloader, test_dataloader, optimizer, loss_function, num_epochs):
  """Train for ``num_epochs`` epochs, evaluating after every epoch.

  After each epoch the metrics are printed together with the time elapsed
  since training started, and appended to the history. The curves are
  re-plotted after every fifth epoch.

  Returns:
    Dict with the keys ``train_loss``, ``train_acc``, ``test_loss`` and
    ``test_acc``, each mapping to a list with one entry per epoch.
  """
  results = {key: [] for key in ("train_loss", "train_acc", "test_loss", "test_acc")}

  start_time = timer()
  for epoch in range(num_epochs):
    train_loss, train_acc = train_step(model, train_dataloader, loss_function, optimizer)
    test_loss, test_acc, _, _ = test_step(model, test_dataloader, loss_function)
    end_time = timer()

    print(
        f"Epoch: {epoch+1} | "
        f"train_loss: {train_loss:.4f} | "
        f"train_acc: {train_acc:.4f} | "
        f"test_loss: {test_loss:.4f} | "
        f"test_acc: {test_acc:.4f} | "
        f"time_total {end_time-start_time}"
    )

    # Dict order matches the tuple order, so each metric lands in its own list.
    for key, value in zip(results, (train_loss, train_acc, test_loss, test_acc)):
      results[key].append(value)

    if (epoch + 1) % 5 == 0:
      plot_loss_curves(results)

  return results

In [None]:
# Run the full training loop and keep the per-epoch metric history.
model_results = train(model, train_dataloader, test_dataloader,
                      optimizer, loss_function, NUM_EPOCHS)

# Final loss/accuracy curves over all epochs.
plot_loss_curves(model_results)

In [None]:
# Print torchinfo's summary of the model.
torchinfo.summary(model)

In [None]:
# Evaluate the trained model once more and collect the per-batch predictions.
test_loss, test_acc, labels_predicted, labels_expected = test_step(model,
                                                                   test_dataloader,
                                                                   loss_function)

# Join the per-batch tensors into one flat tensor each. torch.cat is required
# here: torch.stack needs all tensors to have the same size and therefore
# fails whenever the last batch is smaller than the others.
labels_expected = torch.cat(labels_expected)
labels_predicted = torch.cat(labels_predicted)

print(labels_expected)
print(labels_predicted)

In [None]:
import seaborn
import matplotlib.pyplot as plt
# Import explicitly so this cell does not depend on which branch of the
# install cell happened to run.
from torchmetrics import ConfusionMatrix

confmat = ConfusionMatrix(task="binary", num_classes=2).to(device)
confmat_tensor = confmat(preds=labels_predicted,
                         target=labels_expected)

# Normalise each row by its total. Clamp the divisor to at least 1 so a row
# without any samples yields zeros instead of NaN.
row_totals = torch.sum(confmat_tensor, dim=1, keepdim=True).clamp(min=1)
confmat_normalized = confmat_tensor / row_totals

print(confmat_normalized)

hm = seaborn.heatmap(confmat_normalized.cpu(),
                     annot=True,
                     cmap="crest",
                     xticklabels=["NORMAL", "PNEUMONIA"],
                     yticklabels=["NORMAL", "PNEUMONIA"])
plt.xlabel("Predicted")
plt.ylabel("Actual")
plt.title("Confusion Matrix")
plt.show()

In [None]:
# NOTE(review): `evaluate_model`, `plot_metrics` and `h` are not defined in the
# visible part of this notebook — confirm they exist before running this cell.
test_loss, test_accuracy, true_labels, predicted_labels = evaluate_model(h, model, test_dataloader, loss_function, device)
# The stray leading space on this line was removed; it raised an IndentationError.
plot_metrics(h, model_results, test_loss, test_accuracy, true_labels, predicted_labels)