In [1]:
try:
  import torcheval
except:
  !pip install torcheval -q
  import torcheval

[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m179.2/179.2 kB[0m [31m7.1 MB/s[0m eta [36m0:00:00[0m
[?25h

In [2]:
import torch
import torchvision
import requests
import zipfile
import shutil
import os
import pathlib
import random
import matplotlib.pyplot as plt
from PIL import Image

from torch import nn
from pathlib import Path
from typing import Tuple, Dict, List
from torchvision import datasets, transforms
from torch.utils.data import Dataset, DataLoader
from torcheval.metrics.functional import binary_accuracy, binary_precision, binary_recall, binary_f1_score
from typing import Optional
from tqdm.auto import tqdm

In [3]:
#Setup device diagonstic code
device = "cuda" if torch.cuda.is_available() else "cpu"
device

'cuda'

# Loading training and testing image data into directories
*Note: the image data is kept in a private repo as I do not know if it can be public.*

In [4]:
#Data is kept in a private repo
DATA_PATH = Path("data/")
IMAGE_PATH = DATA_PATH / "model_training_data"
TOKEN = "..."
TRAIN_URL = "https://raw.githubusercontent.com/gordonluo13/BFO-ML-Data/main/Images_For_Model_Training/Images_For_Model_Training.zip"
EVAL_URL = "https://raw.githubusercontent.com/gordonluo13/BFO-ML-Data/main/Images_For_Model_Evaluation/Images_For_Model_Evaluation.zip"
TRAIN_ZIP_NAME = os.path.basename(TRAIN_URL)
EVAL_ZIP_NAME = os.path.basename(EVAL_URL)

if IMAGE_PATH.is_dir():
  print(f"{IMAGE_PATH} directory already exists.")
else:
  print(f"{IMAGE_PATH} directory does not exist, creating one.")
  IMAGE_PATH.mkdir(parents=True, exist_ok=True)

data/model_training_data directory does not exist, creating one.


In [5]:
def load_image_data_fn(token: str, url: str, data_path, image_path, zip_folder_name: str):
    headers = {"Authorization" : f"token {token}",
              "User-Agent" : "Mozzilla/5.0"}
    response = requests.get(url, headers=headers)
    if response.status_code == 200:
        with open(data_path / zip_folder_name, "wb") as f:
            f.write(response.content)
        print("Zip file downloaded successfully.")
    else:
        print("Zip file could not be downloaded, error status:", response.status_code)
    
    with zipfile.ZipFile(data_path / zip_folder_name, "r") as zip_ref:
        print("Unzipping image data.")
        zip_ref.extractall(image_path)
        
    macosx_path = image_path/"__MACOSX"
    if os.path.exists(macosx_path):
        shutil.rmtree(macosx_path)
        print("Deleting __MACOSX metadata folder.")
    else:
        print("No __MACOSX metadata folder exists.")
    

In [6]:
load_image_data_fn(TOKEN, TRAIN_URL, DATA_PATH, IMAGE_PATH, TRAIN_ZIP_NAME)

Zip file downloaded successfully.
Unzipping image data.
Deleting __MACOSX metadata folder.


In [7]:
TARGET_DIR = IMAGE_PATH/"Images_For_Model_Training/Train"

In [8]:
def find_classes_fn(directory: str) -> Tuple[List[str], Dict[str, int]]:
  classes = sorted(entry.name for entry in list(os.scandir(directory)) if entry.is_dir())

  if not classes:
    raise FileNotFoundError(f"Could not find any classes in {directory}")

  class_to_idx = {class_name: i for i, class_name in enumerate(classes)}
  return classes, class_to_idx

In [9]:
class_names = find_classes_fn(TARGET_DIR)[0]

# Data preparation
## Adding data augmentation

In [10]:
def transform_fn (height, width, type_data: str):
  if type_data == "train":
    prob = 0.5
  else:
    prob = 0
  return transforms.Compose([transforms.Resize(size=(height, width)),
                                     transforms.RandomHorizontalFlip(p=prob),
                                     transforms.ToTensor()])

In [11]:
class BFOImageData(Dataset):
  def __init__(self, target_dir: str, transform=None) -> None:
    self.paths = list(pathlib.Path(target_dir).glob("*/*.jpg")) + list(pathlib.Path(target_dir).glob("*/*.png"))
    self.transform = transform
    self.classes, self.class_to_idx = find_classes_fn(target_dir)

  def load_image(self, index: int) -> Image.Image:
    image_path = self.paths[index]
    return Image.open(image_path).convert("RGB")

  def __len__(self) -> int:
    return len(self.paths)

  def __getitem__(self, index: int) -> Tuple[torch.Tensor, int]:
    img = self.load_image(index)
    class_name = self.paths[index].parent.name
    class_idx = self.class_to_idx[class_name]

    if self.transform:
      return self.transform(img), class_idx
    else:
      return img, class_idx

In [12]:
TRAIN_DIR = IMAGE_PATH/"Images_For_Model_Training/Train"
TEST_DIR = IMAGE_PATH/"Images_For_Model_Training/Test"

In [13]:
IMAGE_SIZE = 512

In [14]:
train_BFO_data = BFOImageData(TRAIN_DIR, transform_fn(IMAGE_SIZE, IMAGE_SIZE, "train"))
test_BFO_data = BFOImageData(TEST_DIR, transform_fn(IMAGE_SIZE, IMAGE_SIZE, "test"))

In [15]:
BATCH_SIZE = 32
NUM_WORKERS = os.cpu_count()

## Dataloaders

In [16]:
train_dataloader = DataLoader(dataset=train_BFO_data,batch_size=BATCH_SIZE, num_workers=NUM_WORKERS, shuffle=True)
test_dataloader = DataLoader(dataset=test_BFO_data, batch_size=BATCH_SIZE, num_workers=NUM_WORKERS)

# Creating the model class, as a CNN with 3 convolutional layers

In [17]:
class ModelBFO(nn.Module):
  def __init__(self,
               input_shape: int,
               hidden_units: int,
               output_shape: int,
               image_height: int = 64,
               image_width: int = 64) -> None:
      super().__init__()
      self.conv_block_1 = nn.Sequential(
          nn.Conv2d(input_shape, hidden_units, 3, 1, 1),
          nn.BatchNorm2d(hidden_units),
          nn.ReLU(),
          nn.Conv2d(hidden_units, hidden_units, 3, 1, 1),
          nn.BatchNorm2d(hidden_units),
          nn.ReLU(),
          nn.MaxPool2d(2),
          nn.Dropout(0.25)
      )
      self.conv_block_2= nn.Sequential(
          nn.Conv2d(hidden_units, hidden_units, 3, 1, 1),
          nn.BatchNorm2d(hidden_units),
          nn.ReLU(),
          nn.Conv2d(hidden_units, hidden_units, 3, 1, 1),
          nn.BatchNorm2d(hidden_units),
          nn.ReLU(),
          nn.MaxPool2d(2),
          nn.Dropout(0.25)
      )
      self.conv_block_3 = nn.Sequential(
          nn.Conv2d(hidden_units, hidden_units, 3, 1, 1),
          nn.BatchNorm2d(hidden_units),
          nn.ReLU(),
          nn.Conv2d(hidden_units, hidden_units, 3, 1, 1),
          nn.BatchNorm2d(hidden_units),
          nn.ReLU(),
          nn.MaxPool2d(2),
          nn.Dropout(0.25)
      )

      #Dynamically determining the output shape after the conv blocks using a dummy input
      with torch.no_grad():
        dummy_input = torch.zeros(1, input_shape, image_height, image_width)
        x = self.conv_block_1(dummy_input)
        x = self.conv_block_2(x)
        x = self.conv_block_3(x)
        flatten_size = x.view(1, -1).shape[1]

      self.classifier = nn.Sequential(
          nn.Flatten(),
          nn.Linear(flatten_size,
                    output_shape)
      )
  def forward(self, x):
      x = self.conv_block_1(x)
      x = self.conv_block_2(x)
      x = self.conv_block_3(x)
      x = self.classifier(x)
      return x

In [18]:
THRESHOLD = 0.5

## Functions for training and testing the model

In [19]:
def training_fn(model: torch.nn.Module,
             dataloader: torch.utils.data.DataLoader,
             loss_fn: torch.nn.Module,
             optimizer: torch.optim.Optimizer,
             thresh: float = 0.5):
  model.train()
  train_loss =  0
  
  for batch, (X,y) in enumerate(dataloader):
    X, y = X.to(device), y.to(device).float().unsqueeze(1)

    train_logits = model(X)

    loss = loss_fn(train_logits, y)
    train_loss += loss.item()

    optimizer.zero_grad()

    loss.backward()

    optimizer.step()

    train_pred = torch.sigmoid(train_logits)  

    #print(y.squeeze().dtype())
    #print(y.view(torch.int).squeeze())

  train_loss = train_loss / len(dataloader)
  train_acc = binary_accuracy(train_pred.squeeze(), y.squeeze(), threshold=thresh)
  train_prec = binary_precision(train_pred.squeeze(), y.squeeze())
  train_rec = binary_recall(train_pred.squeeze(), y.int().squeeze(), threshold=thresh)
  train_f1 = binary_f1_score(train_pred.squeeze(), y.squeeze(), threshold=thresh)  
    
  return train_loss, train_acc, train_prec, train_rec, train_f1

In [20]:
def testing_fn(model: torch.nn.Module,
             dataloader: torch.utils.data.DataLoader,
             loss_fn: torch.nn.Module,
             thresh: float = 0.5):
  model.eval()
  test_loss = 0
  with torch.inference_mode():
    for batch, (X,y) in enumerate(dataloader):
      X, y = X.to(device), y.to(device).float().unsqueeze(1)

      test_logits = model(X)

      loss = loss_fn(test_logits, y)
      test_loss += loss.item()

      test_pred = torch.sigmoid(test_logits)
  
  test_loss = test_loss / len(dataloader)
  test_acc = binary_accuracy(test_pred.squeeze(), y.squeeze(), threshold=thresh) 
  test_prec = binary_precision(test_pred.squeeze(), y.squeeze())
  test_rec = binary_recall(test_pred.squeeze(), y.int().squeeze(), threshold=thresh) 
  test_f1 = binary_f1_score(test_pred.squeeze(), y.squeeze(), threshold=thresh)  
  return test_loss, test_acc, test_prec, test_rec, test_f1

In [21]:
def train(model: torch.nn.Module,
          train_dataloader: torch.utils.data.DataLoader,
          test_dataloader: torch.utils.data.DataLoader,
          optimizer: torch.optim.Optimizer,
          loss_fn: torch.nn.Module = nn.CrossEntropyLoss(),
          thresh: float = 0.5,
          epochs: int = 5,
          device=device):
  results = {"train_loss": [],
             "train_acc": [],
             "train_prec": [],
             "train_rec": [],
             "train_f1": [],
            "test_loss": [],
             "test_acc": [],
             "test_prec": [],
             "test_rec": [],
             "test_f1": []
             }
  for epoch in tqdm(range(epochs)): #range is 0 indexed!
    train_loss, train_acc, train_prec, train_rec, train_f1 = training_fn(model_BFO,
                                                                         dataloader=train_dataloader,
                                                                         optimizer=optimizer,
                                                                         loss_fn=loss_fn, 
                                                                         thresh=thresh)
    test_loss, test_acc, test_prec,test_rec, test_f1  = testing_fn(model_BFO,
                                                                   dataloader=train_dataloader,
                                                                   loss_fn=loss_fn, 
                                                                   thresh=thresh)
    if epoch == epochs - 1:
        print(f" Test Acc: {test_acc:.4f}| Test Prec: {test_prec:.4f}| Test Recall: {test_rec:.4f} |Test F1: {test_f1:.4f}")
     
    results["train_loss"].append(train_loss)
    results["train_acc"].append(train_acc)
    results["train_prec"].append(train_prec)
    results["train_rec"].append(train_rec)
    results["train_f1"].append(train_f1)
    results["test_loss"].append(test_loss)
    results["test_acc"].append(test_acc)
    results["test_prec"].append(test_prec)
    results["test_rec"].append(test_rec)
    results["test_f1"].append(test_f1)
  return results

In [37]:
NUM_EPOCHS = 35

model_BFO = ModelBFO(3, 30, 1, IMAGE_SIZE, IMAGE_SIZE).to(device)

loss_fn = nn.BCEWithLogitsLoss()
optimizer = torch.optim.Adam(params=model_BFO.parameters(),
                             lr=1e-3,
                             weight_decay=1e-5)

model_BFO_results = train(model_BFO,
                         train_dataloader=train_dataloader,
                         test_dataloader=test_dataloader,
                         optimizer=optimizer,
                         loss_fn=loss_fn,
                         epochs=NUM_EPOCHS)

  0%|          | 0/35 [00:00<?, ?it/s]

 Test Acc: 0.9545| Test Prec: 0.9231| Test Recall: 1.0000 |Test F1: 0.9600


In [38]:
MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents = True, exist_ok = True)

if os.path.exists(MODEL_PATH):
    shutil.rmtree(MODEL_PATH)
    print("Deleting model_path metadata folder.")
else:
  print("No model_path metadata folder.")

MODEL_PATH = Path("models")
MODEL_PATH.mkdir(parents = True, exist_ok = True)
MODEL_NAME = "BFO_ML_BFO.pth"
MODEL_SAVE_PATH = MODEL_PATH / MODEL_NAME
print(f"Saving model to: {MODEL_SAVE_PATH}")
torch.save(obj = model_BFO.state_dict(), f=MODEL_SAVE_PATH)

Deleting model_path metadata folder.
Saving model to: models/BFO_ML_Test_BFO.pth


# Loading parameters from model
*Note: while the notebook takes in a .pth file from a private repo, the same file is made available publically on this repo.*

In [24]:
def load_model_fn(token: str, url: str, data_path, zip_folder_name: str):
    headers = {"Authorization" : f"token {token}",
              "User-Agent" : "Mozzilla/5.0"}
    response = requests.get(url, headers=headers)
    file_path = data_path / zip_folder_name
    if response.status_code == 200:
        with open(file_path, "wb") as f:
            f.write(response.content)
        print("Model parameters downloaded successfully.")
        return file_path
    else:
        print("Model parameters not be downloaded, error status:", response.status_code)
        return none

In [25]:
LOAD_MODEL_URL = "https://raw.githubusercontent.com/gordonluo13/BFO-ML-Data/main/Model_Parameters/BFO_ML_Model_0_Parameters.pth"
LOAD_MODEL_NAME = os.path.basename(LOAD_MODEL_URL)
LOAD_MODEL_SAVE_PATH = MODEL_PATH/"Loaded Models"
if LOAD_MODEL_SAVE_PATH.is_dir():
  print(f"{LOAD_MODEL_SAVE_PATH} directory already exists.")
else:
  print(f"{LOAD_MODEL_SAVE_PATH} directory does not exist, creating one.")
  LOAD_MODEL_SAVE_PATH.mkdir(parents=True, exist_ok=True)


load_model_fn(TOKEN, LOAD_MODEL_URL, LOAD_MODEL_SAVE_PATH, LOAD_MODEL_NAME)

models/Loaded Models directory does not exist, creating one.
Model parameters downloaded successfully.


PosixPath('models/Loaded Models/BFO_ML_Model_0_Parameters.pth')

In [26]:
loaded_model = ModelBFO(3, 30, 1, IMAGE_SIZE, IMAGE_SIZE)
loaded_model.load_state_dict(torch.load(LOAD_MODEL_SAVE_PATH / LOAD_MODEL_NAME))
loaded_model.to(device)

ModelBFO(
  (conv_block_1): Sequential(
    (0): Conv2d(3, 30, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(30, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(30, 30, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(30, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): Dropout(p=0.25, inplace=False)
  )
  (conv_block_2): Sequential(
    (0): Conv2d(30, 30, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): BatchNorm2d(30, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU()
    (3): Conv2d(30, 30, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (4): BatchNorm2d(30, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (5): ReLU()
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)

# Evaluating the model against unseen T-phase and non-T-phase BFO AFM images

*Note: while the notebook takes image data from a private repo, the same data is made available publically on this repo.*

In [27]:
EVAL_PATH = DATA_PATH/ "model_evaluation_data"
load_image_data_fn(TOKEN, EVAL_URL, DATA_PATH, EVAL_PATH, EVAL_ZIP_NAME)

Zip file downloaded successfully.
Unzipping image data.
Deleting __MACOSX metadata folder.


In [28]:
eval_class_names = find_classes_fn(EVAL_PATH/"Images_For_Model_Evaluation")[0]
not_t_phase_class = (str(eval_class_names[0]))
t_phase_class = (str(eval_class_names[1]))

In [29]:
def image_eval_fn(eval_path: str, image_class: str, image_name: str, image_size, class_names, model):
    eval_image_path = eval_path / "Images_For_Model_Evaluation" / image_class / image_name
    eval_image = Image.open(eval_image_path).convert("RGB")
    transform = transforms.ToTensor()
    eval_image_tensor = transform(eval_image)
    
    eval_image_transform = transforms.Compose([transforms.Resize((image_size, image_size)),])
    eval_image_transformed = eval_image_transform(eval_image_tensor)

    model.eval()
    with torch.inference_mode():
        eval_image_logit = model(eval_image_transformed.unsqueeze(dim=0).to(device))

    eval_image_pred_prob = torch.sigmoid(eval_image_logit)
    eval_image_pred_label = (eval_image_pred_prob > 0.5).float()    
    pred_label_index = int(eval_image_pred_label.item())
    eval_image_class = class_names[pred_label_index]
    print(f"Predicted class: {eval_image_class}")

In [30]:
not_t_phase_eval_names = sorted(entry.name for entry in os.scandir(EVAL_PATH / "Images_For_Model_Evaluation" / not_t_phase_class)if entry.is_file() and entry.name.lower().endswith((".png", ".jpg", ".jpeg")))
t_phase_eval_names = sorted(entry.name for entry in os.scandir(EVAL_PATH / "Images_For_Model_Evaluation" / t_phase_class)if entry.is_file() and entry.name.lower().endswith((".png", ".jpg", ".jpeg")))

In [39]:
for image in not_t_phase_eval_names:
    image_eval_fn(EVAL_PATH, not_t_phase_class, image, IMAGE_SIZE, eval_class_names, model_BFO)

Predicted class: Not_T_Phase_BFO
Predicted class: Not_T_Phase_BFO
Predicted class: Not_T_Phase_BFO
Predicted class: Not_T_Phase_BFO
Predicted class: Not_T_Phase_BFO


In [40]:
for image in t_phase_eval_names:
    image_eval_fn(EVAL_PATH, t_phase_class, image, IMAGE_SIZE, eval_class_names, model_BFO)

Predicted class: T_Phase_BFO
Predicted class: T_Phase_BFO
Predicted class: T_Phase_BFO
Predicted class: T_Phase_BFO
Predicted class: T_Phase_BFO
