In [None]:
import pathlib
from matplotlib import pyplot as plt
import torch
from torch import nn
from torch.optim.lr_scheduler import StepLR
from torch.utils.data import DataLoader, random_split
from torchvision.datasets import ImageFolder
from torchvision import transforms
from tqdm.notebook import tqdm
from torchsummary import summary

In [None]:
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")
f"device being used -> {device}"

In [None]:
dataset_dir = pathlib.Path("./Alzheimer_s Dataset")

train_dataset_dir = dataset_dir / "train"
test_dataset_dir = dataset_dir / "test"

transform = transforms.Compose([
    transforms.ToTensor(), # automatically applies min/max scaling
    transforms.Grayscale()
])

batch_size = 32
epochs = 100
classes = [x.parts[-1] for x in train_dataset_dir.iterdir()]
classes

In [None]:
# creating pipelines to load in overall data
train_dataset = ImageFolder(root=train_dataset_dir, transform=transform)
plt.title(train_dataset.classes[train_dataset[0][1]])
plt.imshow(train_dataset[0][0][0], cmap='gray')
plt.show()

IMG_DIM = train_dataset[0][0].size()[1:]
print(f"image dimension -> {IMG_DIM}")

# defining validation data proportion
validation_split = 0.1
validation_images = int(len(train_dataset) * validation_split)
train_images = len(train_dataset) - validation_images

# defining training/validation/testing datasets
train_dataset, val_dataset = random_split(train_dataset, (train_images, validation_images),
                                         generator=torch.Generator().manual_seed(182))

train_dataset = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, drop_last=True)
val_dataset = DataLoader(dataset=val_dataset, batch_size=batch_size, shuffle=True)

test_dataset = DataLoader(dataset=ImageFolder(root=test_dataset_dir, transform=transform), 
                          batch_size=batch_size, shuffle=True)

print(f"train images -> {len(train_dataset) * batch_size}")
print(f"validation images -> {len(val_dataset) * batch_size}")
print(f"test images -> {len(test_dataset) * batch_size}")

In [None]:
class ConvAct(nn.Module):
    """Convolution block layer that includes batch normalization and a max pooling layer"""
    def __init__(self, in_fitlers, out_filters, kernel_size=3, stride=1, dropout=None):
        super(ConvAct, self).__init__()
        self.in_filters = in_fitlers
        self.out_filters = out_filters
        self.kernel_size = kernel_size
        self.stride = stride
        self.dropout = dropout

        self.conv_layer= nn.Conv2d(in_channels=self.in_filters, out_channels=self.out_filters, 
                                     kernel_size=self.kernel_size, stride=self.stride)
        self.act = nn.LeakyReLU()
        self.batchnorm = nn.BatchNorm2d(num_features=self.out_filters)
        self.maxpooling = nn.MaxPool2d(kernel_size=2, stride=2)
        if self.dropout != None:
            self.dropout_layer = nn.Dropout2d(self.dropout)

    def forward(self, x):
        x = self.conv_layer(x)
        x = self.act(x)
        x = self.batchnorm(x)
        x = self.maxpooling(x)
        if self.dropout != None:
            x = self.dropout_layer(x)
        return x

In [None]:
class LinearAct(nn.Module):
    """Fully connected layer block which includes batch normalization and optional dropout layers"""
    def __init__(self, in_features, out_features, output_layer=False, dropout=None):
        super(LinearAct, self).__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.output_layer = output_layer
        self.dropout = dropout

        self.fc = nn.Linear(in_features=self.in_features, out_features=self.out_features)
        if self.output_layer == False:
            self.batchnorm = nn.BatchNorm1d(num_features=self.out_features)
            self.act = nn.LeakyReLU()
        else:
            self.act = nn.Softmax(dim=1)
        
        if self.dropout != None:
            self.dropout_layer = nn.Dropout(self.dropout)

    def forward(self, x):
        if self.output_layer == False:
            x = self.fc(x)
            x = self.act(x)
            x = self.batchnorm(x)
            if self.dropout != None:
                x = self.dropout_layer(x)
        else:
            x = self.fc(x)
            x = self.act(x)
        return x

In [None]:
class Model(nn.Module):
    def __init__(self):
        super(Model, self).__init__()
        self.conv1 = ConvAct(1, 16)
        self.conv2 = ConvAct(16, 32)
        self.conv3 = ConvAct(32, 64, dropout=0.2)
        self.conv4 = ConvAct(64, 128, dropout=0.2)
        self.conv5 = ConvAct(128, 256, dropout=0.2)

        self.flatten = nn.Flatten()
        
        self.fc1 = LinearAct(in_features=3072, out_features=512, dropout=0.7)
        self.fc2 = LinearAct(in_features=512, out_features=256, dropout=0.5)
        self.fc3 = LinearAct(in_features=256, out_features=64, dropout=0.3)
        self.fc4 = LinearAct(in_features=64, out_features=len(classes), output_layer=True)

    def forward(self, x):
        x = self.conv1(x)
        x = self.conv2(x)
        x = self.conv3(x)
        x = self.conv4(x)
        x = self.conv5(x)
        
        x = self.flatten(x)
        
        x = self.fc1(x)
        x = self.fc2(x)
        x = self.fc3(x)
        x = self.fc4(x)
        return x

In [None]:
model = Model().to(device)

gamma = 0.98
lr = 3e-4
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = StepLR(optimizer=optimizer, step_size=len(train_dataset), gamma=gamma)
criterion = nn.CrossEntropyLoss().to(device)

model, criterion = model.to(device), criterion.to(device)

In [None]:
# visualising learning rate per epoch
learning_rates = [lr]
for x in range(1, epochs):
    learning_rates.append(learning_rates[x-1] * gamma)

plt.figure(figsize=(7, 4))
plt.plot(learning_rates, color='black', label='learning rate', linestyle='--')
plt.legend(loc='best')
plt.title("learning rate every epoch")
plt.tight_layout()
plt.margins(x=0.01, y=0.01)
plt.show()

In [None]:
summary(model, (1, *IMG_DIM))

In [None]:
model

In [None]:
accuracy_per_epoch_train, loss_per_epoch_train = [], []
accuracy_per_epoch_val, loss_per_epoch_val = [], []

In [None]:
for epoch in range(1, epochs+1):
    print(f"Epoch: {epoch}/{epochs}")
    
    # TRAINING
    model.train()
    
    batch_size = len(train_dataset)
    correct_training_predictions = 0
    train_tqdm = tqdm(train_dataset, total=batch_size, colour='black')
    train_losses = []
    train_accuracies = []

    for (x, y) in train_tqdm:
        x = x.to(device).float()
        y = y.to(device)

        pred = model(x).double().cpu()
        loss = criterion(pred.to(device), y)

        predictions = pred.argmax(1).int()
        y = torch.Tensor(y.tolist()).int()

        correct_training_predictions += (predictions == y).sum().item()
        
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        scheduler.step()
        
        train_losses.append(loss.item())
        train_accuracies.append((correct_training_predictions / train_images) * 100)

        average_train_accuracy = round(sum(train_accuracies) / len(train_accuracies), 4)
        average_train_loss = round(sum(train_losses) / len(train_losses), 4)
        train_tqdm.set_description(f"Training Avg Loss: {average_train_loss} || Avg Acc: {average_train_accuracy}%")


    # VALIDATION
    with torch.no_grad():
        model.eval()
        correct_eval_predictions = 0

        val_tqdm = tqdm(val_dataset, total=len(val_dataset), colour='black')
        val_losses = []
        val_accuracies = []

        for (x, y) in val_tqdm:
            x, y = x.to(device), y.to(device)

            pred = model(x).double().cpu()
            loss = criterion(pred.to(device), y)

            predictions = pred.argmax(1).int()
            y = torch.Tensor(y.tolist()).int()

            correct_eval_predictions += (predictions == y).sum().item()

            val_losses.append(loss.item())
            val_accuracies.append((correct_eval_predictions / validation_images) * 100)

            average_val_accuracy = round(sum(val_accuracies) / len(val_accuracies), 4)
            average_val_loss = round(sum(val_losses) / len(val_losses), 4)

            val_tqdm.set_description(f"Validation Avg Loss: {average_val_loss} || Avg Acc: {average_val_accuracy}%")
    print("\n===========================================================================================================\
====================\n")
    # STORING TRAIN AND VALIDATION AVERAGE ACCURACIES AND LOSSES PER EPOCH
    accuracy_per_epoch_train.append(average_train_accuracy)
    accuracy_per_epoch_val.append(average_val_accuracy)
    
    loss_per_epoch_train.append(average_train_loss)
    loss_per_epoch_train.append(average_val_loss)

In [None]:
# Testing
test_images = len(test_dataset)
test_tqdm = tqdm(test_dataset, total=len(test_dataset), colour='black')
test_losses = []
test_accuracies = []
with torch.no_grad():
    model.eval()
    correct_test_predictions = 0

    for (x, y) in test_tqdm:
        x, y = x.to(device), y.to(device)

        pred = model(x).double().cpu()
        loss = criterion(pred.to(device), y)

        predictions = pred.argmax(1).int()
        y = torch.Tensor(y.tolist()).int()

        correct_test_predictions = (predictions == y).sum().item()

        test_losses.append(loss.item())
        test_accuracies.append((correct_test_predictions / test_images) * 100)

        average_test_accuracy = round(sum(test_accuracies) / len(test_accuracies), 4)
        average_test_loss = round(sum(test_losses) / len(test_losses), 4)

        test_tqdm.set_description(f"Test Avg Loss: {average_test_loss} || Avg Acc: {average_test_accuracy}%")

In [None]:
fig, ax = plt.subplots(nrows=2, ncols=1, figsize=(10, 5))
ax[0].set_title(f"Avg Test Acc: {sum(test_accuracies) / len(test_accuracies):.4f}%")
ax[0].plot(accuracy_per_epoch_train, color="black", linestyle="--", label="Training Accuracy")
ax[0].plot(accuracy_per_epoch_val, color="green", linestyle="--", label="Training Validation")
ax[0].fill_between(x=range(epochs), y1=accuracy_per_epoch_train, y2=accuracy_per_epoch_val, color="cyan", alpha=0.2)
ax[0].axhline(sum(test_accuracies) / len(test_accuracies), label="Test Accruacy")
ax[0].margins(x=0.01, y=0.01)
ax[0].legend(loc="best")
plt.tight_layout()

ax[1].set_title(f"Avg Loss: {sum(test_losses) / len(test_losses):.4f}")
ax[1].plot(loss_per_epoch_train, color="black", linestyle="--", label="Training Loss")
ax[1].plot(loss_per_epoch_val, color="green", linestyle="--", label="Validation Loss")
ax[1].fill_between(x=range(epochs), y1=loss_per_epoch_train, y2=loss_per_epoch_val, color="cyan", alpha=0.2)
ax[1].axhline(sum(test_losses) / len(test_losses), label="Test Loss")
ax[1].margins(x=0.01, y=0.01)
ax[1].legend(loc="best")
plt.tight_layout()
plt.show()