# Regression of leaf water content from holly leaf linear polarized images

## Setup environment

In [None]:
import os


try:
  from google import colab
  colab.drive.mount("/content/gdrive")
  %pip install -qU neptune
  DRIVE_ROOT = "/content/gdrive/My Drive/"
except ModuleNotFoundError:
  DRIVE_ROOT = "H:\\我的云端硬盘\\"

PROJECT_NAME = "cnn-hllpi-lwc-regression"
PROJECT_ROOT = os.path.join(DRIVE_ROOT, "Repositories", PROJECT_NAME)
print(f"Project root at:\n  {PROJECT_ROOT}")

In [None]:
from typing import Any, Callable, Dict, List, Optional, Tuple, Union

import neptune
import pandas as pd
import torch
import tqdm
from matplotlib import pyplot as plt
from neptune import types
from PIL import Image, ImageOps
from torch import cuda, nn, optim
from torch.backends import cudnn
from torch.optim import lr_scheduler
from torch.utils import data
from torchvision import datasets, models, transforms, utils

In [None]:
def set_random_seed(seed: int) -> torch.Generator:
  generator = torch.manual_seed(seed)
  torch.cuda.manual_seed(seed)
  torch.cuda.manual_seed_all(seed)
  print(f"Random seed set to {seed}")
  return generator


def get_device() -> torch.device:
  device = torch.device("cuda") if cuda.is_available() else torch.device("cpu")
  print(f"Using device: {device}")
  return device


_ = set_random_seed(42)  # Ignore the torch generator
device = get_device()

cudnn.deterministic = True
cudnn.benchmark = False

In [None]:
NEPTUNE_USER_NAME = ""
NEPTUNE_API_KEY = ""

neptune_logger = neptune.init_run(
  project=f"{NEPTUNE_USER_NAME}/{PROJECT_NAME}",
  api_token=f"{NEPTUNE_API_KEY}",
  source_files="hllpi_lwc_regression.ipynb",
  git_ref=types.GitRef(f"{PROJECT_ROOT}")
)

---

## Prepare data

In [None]:
def remove_surrounding_border(image: Image.Image) -> Image.Image:
  crop_box = (0, 30, image.width, image.height)
  image = image.crop(crop_box)
  invert_image = ImageOps.invert(image)
  crop_box =  invert_image.getbbox()
  image = image.crop(crop_box)
  crop_box = (10, 10, image.width - 10, image.height - 10)
  image = image.crop(crop_box)
  return image


class HollyLeafPolarizationImages(datasets.VisionDataset):
  def __init__(
      self,
      root: str,
      train: bool = True,
      transform: Optional[Callable] = None,
      target_transform: Optional[Callable] = None,
    ) -> None:
    super().__init__(
      root, transform=transform, target_transform=target_transform,
    )
    self.train = train
    self.images = []
    self.targets = []
    self._load_data()

  def _load_data(self) -> None:
    meta_folder = os.path.join(self.root, "meta")
    if self.train:
      meta_file = os.path.join(meta_folder, "train.csv")
      image_folder = os.path.join(self.root, "train")
    else:
      meta_file = os.path.join(meta_folder, "test.csv")
      image_folder = os.path.join(self.root, "test")
    meta = pd.read_csv(meta_file)
    for image_name, target in zip(meta.iloc[:, 0], meta.iloc[:, 1]):
      if not image_name.endswith(datasets.folder.IMG_EXTENSIONS):
        image_name += ".png"
      image_path = os.path.join(image_folder, image_name)
      image = Image.open(image_path).convert("RGB")

      image = remove_surrounding_border(image)
      target = torch.tensor(target, dtype=torch.float32)

      self.images.append(image)
      self.targets.append(target)

  def __len__(self) -> int:
    return len(self.images)

  def __getitem__(self, index: int) -> Tuple[Any, Any]:
    image, target = self.images[index], self.targets[index]
    if self.transform is not None:
      image = self.transform(image)
    if self.target_transform is not None:
      target = self.target_transform(target)
    return image, target

  def extra_repr(self) -> str:
    return f"Is training dataset: {self.train}"

In [None]:
def compute_std_mean(
    data_dir: str,
    transform: Callable) -> Tuple[torch.Tensor, torch.Tensor]:
  dataset = HollyLeafPolarizationImages(
    data_dir, train=True, transform=transform)
  tensor_images = torch.cat([image for image, _ in dataset], dim=1)
  return torch.std_mean(tensor_images, dim=(1, 2))


def prepare_dataloaders(
    data_dir: str,
    train_transform: Callable,
    test_transform: Callable,
    batch_sizes: Union[int, List[int]],
    num_workers: Union[int, List[int]],
    train_split_ratio: float,
  ) -> Dict[str, data.DataLoader]:
  if isinstance(batch_sizes, int):
    batch_sizes = [batch_sizes] * 3
  if isinstance(num_workers, int):
    num_workers = [num_workers] * 3

  train_dataset = HollyLeafPolarizationImages(
    data_dir, train=True, transform=train_transform)
  print(train_dataset)

  train_length = int(len(train_dataset) * train_split_ratio)
  val_length = len(train_dataset) - train_length
  train_dataset, val_dataset = data.random_split(
    train_dataset, [train_length, val_length])

  train_dataloader = data.DataLoader(
    train_dataset, batch_size=batch_sizes[0], shuffle=True,
    num_workers=num_workers[0], pin_memory=True, drop_last=True)
  val_dataloader = data.DataLoader(
    val_dataset, batch_size=batch_sizes[1],
    shuffle=False, num_workers=num_workers[1])

  test_dataset = HollyLeafPolarizationImages(
    data_dir, train=False, transform=test_transform)
  print(test_dataset)

  test_dataloader = data.DataLoader(
    test_dataset, batch_size=batch_sizes[2],
    shuffle=False, num_workers=num_workers[2])

  return {"train": train_dataloader,
          "val": val_dataloader,
          "test": test_dataloader}


def check_dataloaders(dataloaders: Dict[str, data.DataLoader]) -> None:
  for iterator in map(iter, dataloaders.values()):
    images, targets = next(iterator)
    grid = utils.make_grid(images, nrow=10)
    plt.imshow(grid.permute(1, 2, 0))
    plt.axis("off")
    plt.show()
    print(f"Targets: {targets}")

In [None]:
INPUT_IMAGE_SIZE = (512, 512)
DATASET_ROOT_DIR = os.path.join(PROJECT_ROOT, "data")

transform = transforms.Compose([
  transforms.RandomHorizontalFlip(),
  transforms.RandomVerticalFlip(),
  transforms.RandomRotation(180),
  transforms.Resize(INPUT_IMAGE_SIZE),
  transforms.ToTensor(),
])

std, mean = compute_std_mean(DATASET_ROOT_DIR, transform)
print(f"std: {std}\nmean: {mean}")

train_transform = transforms.Compose([
  transforms.RandomHorizontalFlip(),
  transforms.RandomVerticalFlip(),
  transforms.RandomRotation(180),
  transforms.Resize(INPUT_IMAGE_SIZE),
  transforms.ToTensor(),
  transforms.Normalize(mean, std),
])

test_transform = transforms.Compose([
  transforms.Resize(INPUT_IMAGE_SIZE),
  transforms.ToTensor(),
  transforms.Normalize(mean, std),
])

dataloaders = prepare_dataloaders(
  DATASET_ROOT_DIR,
  train_transform,
  test_transform,
  batch_sizes=[20, 5, 5],
  num_workers=0,
  train_split_ratio=0.8,
)

check_dataloaders(dataloaders)

---

## Config experiment procedures

In [None]:
supported_items = {
  "model": {
  },
  "loss_function": {
    "mse": nn.MSELoss,
    "l1": nn.L1Loss,
    "smooth_l1": nn.SmoothL1Loss,
    "huber": nn.HuberLoss,
    "cross_entropy": nn.CrossEntropyLoss,
  },
  "optimizer": {
    "adam": optim.Adam,
    "sgd": optim.SGD,
  },
  "scheduler": {
    "step_lr": lr_scheduler.StepLR,
    "multi_step_lr": lr_scheduler.MultiStepLR,
  },
}


def config_forward_propagate(
    device: torch.device,
    *,
    model_name: str,
    model_params: Dict[str, Any],
    loss_function_name: str,
    loss_function_params: Dict[str, Any],
  ) -> Tuple[nn.Module, nn.Module]:
  model_cls = supported_items["model"][model_name]
  model = model_cls(**model_params)
  neptune_logger["model/summary"] = str(model)
  loss_function_cls = supported_items["loss_function"][loss_function_name]
  loss_function = loss_function_cls(**loss_function_params)
  return model.to(device), loss_function.to(device)


def config_backward_propagate(
    model: nn.Module,
    *,
    optimizer_name: str,
    optimizer_params: Dict[str, Any],
    scheduler_name: str,
    scheduler_params: Dict[str, Any],
  ) -> Tuple[optim.Optimizer, lr_scheduler._LRScheduler]:
  optimizer_cls = supported_items["optimizer"][optimizer_name]
  optimizer = optimizer_cls(model.parameters(), **optimizer_params)
  scheduler_cls = supported_items["scheduler"][scheduler_name]
  scheduler = scheduler_cls(optimizer, **scheduler_params)
  return optimizer, scheduler

In [None]:
MODEL_ARCHIVE_DIR = os.path.join(PROJECT_ROOT, "archive")
os.makedirs(MODEL_ARCHIVE_DIR, exist_ok=True)


def save_model_archive(model: nn.Module, archive_name: str) -> None:
  if not archive_name.endswith(".pth"):
    archive_name += ".pth"
  save_path = os.path.join(MODEL_ARCHIVE_DIR, archive_name)
  torch.save(model.state_dict(), save_path)
  neptune_logger["model/archive"].upload(save_path)


def load_model_archive(model: nn.Module, archive_name: str) -> None:
  if not archive_name.endswith(".pth"):
    archive_name += ".pth"
  load_path = os.path.join(MODEL_ARCHIVE_DIR, archive_name)
  model.load_state_dict(torch.load(load_path))


def remove_model_archive(archive_name: str) -> None:
  if not archive_name.endswith(".pth"):
    archive_name += ".pth"
  remove_path = os.path.join(MODEL_ARCHIVE_DIR, archive_name)
  os.remove(remove_path)

In [None]:
def train_epoch(
    device: torch.device,
    model: nn.Module,
    loss_function: nn.Module,
    optimizer: optim.Optimizer,
    scheduler: lr_scheduler._LRScheduler,
    dataloader: data.DataLoader,
  ) -> None:
  model.train()
  epoch_loss = 0.0
  for images, targets in dataloader:
    images, targets = images.to(device), targets.to(device)
    optimizer.zero_grad()
    outputs = model(images)
    loss = loss_function(outputs, targets)
    epoch_loss += loss.item()
    loss.backward()
    optimizer.step()
  scheduler.step()
  epoch_loss /= len(dataloader.dataset)
  neptune_logger["metrics/train_loss"].append(epoch_loss)


def validate_epoch(
    device: torch.device,
    model: nn.Module,
    loss_function: nn.Module,
    dataloader: data.DataLoader,
  ) -> float:
  model.eval()
  epoch_loss = 0.0
  with torch.no_grad():
    for images, targets in dataloader:
      images, targets = images.to(device), targets.to(device)
      outputs = model(images)
      loss = loss_function(outputs, targets)
      epoch_loss += loss.item()
  epoch_loss /= len(dataloader.dataset)
  neptune_logger["metrics/val_loss"].append(epoch_loss)
  return epoch_loss

In [None]:
def fit_model(
    max_epochs: int,
    device: torch.device,
    forward_params: Dict[str, Any],
    backward_params: Dict[str, Any],
    dataloaders: Dict[str, data.DataLoader],
    validate_frequency: int,
  ) -> str:
  model, loss_function = config_forward_propagate(device, **forward_params)
  optimizer, scheduler = config_backward_propagate(model, **backward_params)
  best_loss = float("inf")
  for epoch in tqdm.tqdm(range(max_epochs)):
    train_epoch(
      device, model, loss_function, optimizer, scheduler, dataloaders["train"])
    if (epoch + 1) % validate_frequency == 0 or epoch + 1 == max_epochs:
      current_loss = validate_epoch(device, model, loss_function, dataloaders["val"])
      if current_loss < best_loss:
        best_loss = current_loss
        if epoch + 1 != validate_frequency:
          remove_model_archive(archive_name)
        archive_name = (f"{forward_params['model_name']}"
                     f"_step{epoch}_loss{current_loss:.4f}.pth")
        save_model_archive(model, archive_name)
  print(f"\nBest model saved as {archive_name}.")
  return archive_name


def test_model(
    archive_name: str,
    device: torch.device,
    dataloaders: Dict[str, data.DataLoader],
    forward_params: Dict[str, Any],
  ) -> None:
  model, loss_function = config_forward_propagate(device, **forward_params)
  load_model_archive(model, archive_name)
  model.eval()
  epoch_loss = 0.0
  epoch_mae = 0.0
  with torch.no_grad():
    target, prediction = [], []
    for images, targets in tqdm.tqdm(dataloaders["test"]):
      images, targets = images.to(device), targets.to(device)
      target.extend(targets.cpu().tolist())
      outputs = model(images)
      prediction.extend(outputs.cpu().tolist())
      loss = loss_function(outputs, targets)
      mae = torch.abs(outputs - targets).mean()
      epoch_loss += loss.item()
      epoch_mae += mae.item()
  for value in target:
    neptune_logger["metrics/test_target"].append(value)
  for value in prediction:
    neptune_logger["metrics/test_prediction"].append(value)
  epoch_loss /= len(dataloaders["test"].dataset)
  neptune_logger["metrics/test_loss"] = epoch_loss
  epoch_mae /= len(dataloaders["test"].dataset)
  neptune_logger["metrics/test_mae"] = epoch_mae
  print(f"\nLoss = {epoch_loss:.4f}, MAE = {epoch_mae:.4f}")

---

## Define models

In [None]:
class AlexNet(nn.Module):
  def __init__(self) -> None:
    super().__init__()
    self.bulit_in_alexnet = models.AlexNet(num_classes=1)

  def forward(self, images) -> torch.Tensor:
    output = self.bulit_in_alexnet(images)
    return output.reshape(-1)


supported_items["model"].setdefault("alexnet", AlexNet)


class DenseNet(nn.Module):
  def __init__(self) -> None:
    super().__init__()
    self.bulit_in_densenet = models.DenseNet(num_classes=1)

  def forward(self, images) -> torch.Tensor:
    output = self.bulit_in_densenet(images)
    return output.reshape(-1)


supported_items["model"].setdefault("densenet", DenseNet)

In [None]:
class CustomVggRegressor(nn.Module):
  def __init__(self) -> None:
    super().__init__()
    # input: B x 3 x 512 x 512
    self.vgg_block1 = nn.Sequential(
      nn.Conv2d(3, 16, 4, 2),
      nn.BatchNorm2d(16),
      nn.ReLU(),
      nn.Conv2d(16, 16, 4, 2),
      nn.BatchNorm2d(16),
      nn.Dropout2d(0.1),
      nn.ReLU(),
      nn.AvgPool2d(2),
    )
    self.vgg_block2 = nn.Sequential(
      nn.Conv2d(16, 32, 4, 2),
      nn.BatchNorm2d(32),
      nn.ReLU(),
      nn.Conv2d(32, 32, 4, 2),
      nn.BatchNorm2d(32),
      nn.Dropout2d(0.1),
      nn.ReLU(),
      nn.AvgPool2d(2),
    )
    self.vgg_block3 = nn.Sequential(
      nn.Conv2d(32, 64, 2),
      nn.BatchNorm2d(64),
      nn.ReLU(),
      nn.Conv2d(64, 64, 2),
      nn.BatchNorm2d(64),
      nn.Dropout2d(0.1),
      nn.ReLU(),
      nn.AvgPool2d(2),
    )
    self.convolutional_layers = nn.Sequential(
      self.vgg_block1,
      self.vgg_block2,
      self.vgg_block3,
    )
    self.flatten_layer = nn.Flatten()
    self.full_connection_layers = nn.Sequential(
      nn.Dropout(0.4),
      nn.Linear(256, 16),
      nn.Dropout(0.5),
      nn.ReLU(),
      nn.Linear(16, 1),
      nn.ReLU(),
    )

  def forward(self, image_batch: torch.Tensor) -> torch.Tensor:
    feature_map = self.convolutional_layers(image_batch)
    feature_vector = self.flatten_layer(feature_map)
    return self.full_connection_layers(feature_vector).reshape(-1)


supported_items["model"].setdefault("custom_vgg", CustomVggRegressor)

---

## Execute experiments

In [None]:
if neptune_logger._with_id is not None:
  del neptune_logger["metrics"]
  del neptune_logger["model"]
  del neptune_logger["parameters"]

forward_params = {
  "model_name": "alexnet",
  "model_params": {
  },
  "loss_function_name": "mse",
  "loss_function_params": {
  },
}

backward_params = {
  "optimizer_name": "adam",
  "optimizer_params": {
    "lr": 0.01,
    "weight_decay": 0.0005,
  },
  "scheduler_name": "step_lr",
  "scheduler_params": {
    "step_size": 30,
    "gamma": 0.1,
  },
}

neptune_logger["parameters/forward"] = forward_params
neptune_logger["parameters/backward"] = backward_params

In [None]:
best_model_archive = fit_model(
  max_epochs=300,
  device=device,
  forward_params=forward_params,
  backward_params=backward_params,
  dataloaders=dataloaders,
  validate_frequency=10,
)

test_model(
  archive_name=best_model_archive,
  device=device,
  dataloaders=dataloaders,
  forward_params=forward_params,
)

---

## Stop logger

In [None]:
neptune_logger.stop()