In [None]:
import torch
from torch import nn
from torch.utils.data import DataLoader
import numpy as np
import os
from pathlib import Path
import matplotlib.pyplot as plt
import torch.nn.functional as F
import wandb
from torchvision.models import VisionTransformer


In [None]:
# Pick the compute backend: use the GPU when CUDA is usable, otherwise stay on the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

In [None]:
# Fix the NumPy and PyTorch global RNG seeds so weight initialisation and
# DataLoader shuffling are repeatable from a fresh kernel.
SEED = 0
np.random.seed(SEED)
torch.manual_seed(SEED)

In [None]:
## Load the testing & training data from data_npy folder
def _load_split(split_dir):
    """Load every array in ``split_dir`` together with its binary label.

    The directory is listed exactly once and the listing is sorted, so that
    (a) each array is guaranteed to be paired with the label derived from its
    own file name, and (b) the sample order is the same on every run and
    machine (``os.listdir`` returns entries in arbitrary order).

    Returns:
        (arrays, labels): ``arrays`` is a list of the loaded NumPy arrays and
        ``labels`` a list with 1 where the file name contains 'onlay', else 0.
    """
    file_names = sorted(os.listdir(split_dir))
    arrays = [np.load(os.path.join(split_dir, name)) for name in file_names]
    ## put 1 if the file name contains (onlay), 0 if it doesn't contain (onlay)
    labels = [1 if 'onlay' in name else 0 for name in file_names]
    return arrays, labels


X_test, y_test = _load_split('data_npy/testing')
X_train, y_train = _load_split('data_npy/training')


In [None]:
# Give every sample an explicit leading axis: each array becomes (-1, 128, 128, 128),
# i.e. a 4D array whose first dimension is inferred from the element count.
X_train = [np.reshape(volume, (-1, 128, 128, 128)) for volume in X_train]

X_test = [np.reshape(volume, (-1, 128, 128, 128)) for volume in X_test]


In [None]:
## Stack the per-sample arrays and convert them to tensors:
## float32 for the inputs, int64 for the class labels.
X_train = torch.tensor(np.array(X_train), dtype=torch.float32)
X_test = torch.tensor(np.array(X_test), dtype=torch.float32)
y_train = torch.tensor(np.array(y_train), dtype=torch.long)
y_test = torch.tensor(np.array(y_test), dtype=torch.long)


In [None]:
# Define model
class ViT3D(nn.Module):
    """3D convolutional feature extractor feeding a 2D Vision Transformer classifier.

    The convolutional stack is five Conv3d -> ReLU -> MaxPool3d(2) stages
    followed by a flatten. The flattened features are reinterpreted as a
    single-channel 128x128 image, replicated to 3 channels and classified by
    a ``VisionTransformer`` with two output classes.

    NOTE(review): the ``view`` in ``forward`` only lines up when the flattened
    feature vector has 128 * 128 = 16384 elements per sample (e.g. 256 channels
    of 4x4x4) -- presumably inputs are (batch, 1, 128, 128, 128); confirm.
    """

    def __init__(self):
        super().__init__()

        # (in_channels, out_channels) of each Conv3d -> ReLU -> MaxPool3d stage.
        stage_channels = [(1, 16), (16, 32), (32, 64), (64, 128), (128, 256)]
        layers = []
        for in_channels, out_channels in stage_channels:
            layers.extend([
                nn.Conv3d(in_channels, out_channels, kernel_size=3, padding=1),
                nn.ReLU(),
                nn.MaxPool3d(kernel_size=2),
            ])
        layers.append(nn.Flatten())

        # NOTE(review): despite its name, nothing in this block is frozen here
        # (no requires_grad=False is set) -- confirm whether that is intended.
        self.frozen_block = nn.Sequential(*layers)

        vit_settings = {
            "image_size": 128,
            "patch_size": 32,
            "num_layers": 10,
            "num_heads": 10,
            "hidden_dim": 100,
            "mlp_dim": 2048,
            "dropout": 0.4,
            "attention_dropout": 0.4,
            "num_classes": 2,
        }
        self.vit = VisionTransformer(**vit_settings)

    def forward(self, x):
        """Return the ViT class logits for a batch of input volumes ``x``."""
        features = self.frozen_block(x)
        # Reinterpret the flat features as a 1-channel 128x128 image, then
        # copy that channel three times to match the ViT's 3-channel input.
        image = features.view(-1, 1, 128, 128).repeat(1, 3, 1, 1)
        return self.vit(image)


model = ViT3D().to(device)


In [None]:
# Pair every input tensor with its label. At this point these names hold plain
# lists of (input, label) tuples, not DataLoader objects.
train_dataloader = list(zip(X_train, y_train))
test_dataloader = list(zip(X_test, y_test))


In [None]:
# Training hyperparameters
epochs = 500           # full passes over the training data
batch_size = 2         # samples per optimisation step
learning_rate = 1e-5   # step size handed to the optimiser
momentum = 0.9         # NOTE(review): not referenced by any visible cell -- confirm it is still needed


In [None]:
# Wrap the (input, label) pairs in DataLoaders; only the training split is shuffled.
# The names are rebound from the pair lists to the loaders, so running this cell
# a second time would wrap a DataLoader in another DataLoader.
train_dataloader = DataLoader(
    dataset=train_dataloader, batch_size=batch_size, shuffle=True)
test_dataloader = DataLoader(
    dataset=test_dataloader, batch_size=batch_size, shuffle=False)


In [None]:
# Objective and optimiser: cross-entropy on the class logits, Adam over every model parameter.
loss_fn = nn.CrossEntropyLoss()
optimiser = torch.optim.Adam(params=model.parameters(), lr=learning_rate)


In [None]:
# start a new wandb run to track this script
# hyperparameters and run metadata recorded with the run
run_config = {
    "learning_rate": learning_rate,
    "architecture": "CNN + ViT 3D",
    "dataset": "3D Tooth",
    "epochs": epochs,
}
# `project` is the wandb project where this run will be logged
wandb.init(project="cse-499a-project-3d-tooth-vit", config=run_config)


In [None]:
# Define training loop
def train(dataloader, model, loss_fn, optimizer):
    """Run one training pass of ``model`` over ``dataloader``.

    Args:
        dataloader: iterable of ``(X, y)`` batches exposing a sized ``.dataset``.
        model: the ``nn.Module`` to optimise; it is switched to train mode.
        loss_fn: callable mapping ``(pred, y)`` to a scalar loss tensor.
        optimizer: optimiser already bound to ``model``'s parameters.

    Returns:
        The mean of the per-batch losses (``nan`` when the dataloader yields
        no batches).

    A line with the batch loss and the number of samples processed so far is
    printed after every batch.
    """
    size = len(dataloader.dataset)

    # Move batches to wherever the model lives instead of relying on a
    # notebook-global `device` being defined and in sync with the model.
    first_param = next(model.parameters(), None)
    target_device = first_param.device if first_param is not None else torch.device("cpu")

    model.train()
    samples_seen = 0
    loss_sum = 0.0
    batches_run = 0
    for X, y in dataloader:
        X, y = X.to(target_device), y.to(target_device)

        # Compute prediction error
        pred = model(X)
        loss = loss_fn(pred, y)

        # Backpropagation
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Progress: count samples cumulatively so a smaller final batch is
        # reported correctly and the last line reads size/size.
        samples_seen += len(X)
        batch_loss = loss.item()
        loss_sum += batch_loss
        batches_run += 1
        # wandb.log({"loss": batch_loss})
        print(f"loss: {batch_loss:>7f}  [{samples_seen:>5d}/{size:>5d}]")

    return loss_sum / batches_run if batches_run else float("nan")


In [None]:
# Define test function
def test(dataloader, model, loss_fn):
    size = len(dataloader.dataset)
    num_batches = len(dataloader)
    model.eval()
    test_loss, correct = 0, 0
    with torch.no_grad():
        for X, y in dataloader:
            X, y = X.to(device), y.to(device)
            pred = model(X)
            test_loss += loss_fn(pred, y).item()
            correct += (pred.argmax(1) == y).type(torch.float).sum().item()
    test_loss /= num_batches
    correct /= size
    accuracy = (100*correct)
    loss = test_loss
    # log metrics to wandb
    #wandb.log({"accuracy": accuracy, "avg_loss": loss})
    print(
        f"Test Error: \n Accuracy: {(100*correct):>0.1f}%, Avg loss: {test_loss:>8f} \n")
    return loss, accuracy


In [None]:
# Epoch loop: one training pass and one evaluation per epoch, writing a
# checkpoint whenever the evaluation loss improves on the best seen so far.
save_dir = Path("./saved_models")
# torch.save does not create missing parent directories, so make sure it exists.
save_dir.mkdir(parents=True, exist_ok=True)

losses = []  # evaluation loss of every epoch, in order
min_loss = float("inf")  # no upper bound: the first epoch always counts as an improvement
for t in range(epochs):
    print(f"Epoch {t+1}\n-------------------------------")
    train(train_dataloader, model, loss_fn, optimiser)
    l, a = test(test_dataloader, model, loss_fn)
    losses.append(l)
    if l < min_loss:
        min_loss = l
        # Save model; the accuracy and loss are embedded in the file name.
        checkpoint_path = save_dir / f"cnn_with_vit_1_accuracy{a}__loss{l}.pth"
        torch.save(model.state_dict(), checkpoint_path)
        print(f"Saved PyTorch Model State to {checkpoint_path}")

print("Done!")
