In [1]:
# from google.colab import drive
# drive.mount('/content/drive')
# !unzip "/content/drive/MyDrive/DLF/species_classification.zip" -d "/content"
# !pip install git+https://github.com/PyTorchLightning/pytorch-lightning
# !pip install timm 
# #DOES GITHUB WORK?

In [2]:
"""
Author: Filipe Laitenberger
"""

from torch.utils.data import Dataset
from torch import Tensor
from os import walk
from os.path import join
from PIL.Image import open
from torchvision.transforms import PILToTensor, Compose, RandomHorizontalFlip, RandomVerticalFlip, ColorJitter
from typing import Tuple


class TreeSpeciesClassification(Dataset):
    def __init__(self, image_dir: str) -> None:
        """
        This dataset provides pictures of tree tops (from above) and their
        corresponding labels
        :param image_dir: the path to the folder that contains the dataset's images
        """
        super().__init__()

        # create an array that contains all image paths
        # in the dataset, by flattening the directory structure
        self.images = [join(path, name) for path, _, files in walk(image_dir) for name in files]

        # The images are transformed to tensors and augmented
        self.transform = Compose([
            # transform the image to a tensor
            PILToTensor(),

            # randomly flip the image horizontally, and vertically
            # we don't rotate the images since that would leave
            # some pixels black which could inhibit learning
            RandomHorizontalFlip(0.5),
            RandomVerticalFlip(0.5),

            # randomly change the brightness, contrast, and
            # saturation of the images
            ColorJitter(
                brightness=(0.5, 1.5),
                contrast=(0.5, 1.5),
                saturation=(0.5, 1.5)
            )
        ])

        self.labels = {
            'Bi': 'Birch',
            'Bu': 'Beech',
            'Dgl': 'Douglas fir',
            'Ei': 'Oak',
            'Eis': 'Damaged Oak',
            'Erl': 'Alder',
            'Fi': 'Spruce',
            'Ki': 'Pine',
            'La': 'Larch',
            'Sch': 'Shadow / background'
        }

    def load_image(self, index: int) -> Tensor:
        """
        Load the index'th image from the dataset. The images are traversed such that
        the image with index 0 is the first image from the first sub folder of the dataset,
        and if, for instance, the first folder contains 1000 images, then the index 1000
        corresponds to the first image of the second sub folder, and so on.
        The image is then transformed to a tensor and augmented (randomly flipped, and the
        contrast/brightness/saturation is randomly changed)
        :param index: the index of the image in the flattened file structure
        :return: the transformed image
        """
        image = open(self.images[index]).convert("RGB")

        return self.transform(image).float()

    def load_target(self, index: int) -> int:
        """
        Load the index of the target class
        :param index: the index of the image in the flattened
        folder structure
        :return: the target class's index
        """
        label = self.images[index].split('/')[-2]
        return list(self.labels.keys()).index(label)

    def __getitem__(self, index) -> Tuple[Tensor, int]:
        """
        Load a data point
        :param index: the index of the image in the flattened
        folder structure
        :return: a tuple containing the transformed image
        and the target class's index
        """
        return self.load_image(index), self.load_target(index)

    @property
    def num_classes(self) -> int:
        """
        Get the number of classes in the dataset
        :return: the number of classes (10)
        """
        return len(self.labels)

    def __len__(self) -> int:
        """
        Get the number of images in the dataset.
        This doesn't take into account that images will
        be randomly augmented (flipped + contrast/saturation/brightness changed),
        but those transforms are partly performed within continuous ranges,
        so that they will create infinitely many variations of the images.
        Hence, only the original number of images is returned here
        :return: the number of images in the dataset
        """
        return len(self.images)

In [3]:
"""
Author: Filipe Laitenberger
"""

from pytorch_lightning import LightningDataModule
from torch.utils.data import Dataset, random_split, Subset, DataLoader
from typing import Sequence, List




class TreeSpeciesClassificationDataModule(LightningDataModule):
    def __init__(self, batch_size: int = 32) -> None:
        """
        This dataloader splits the dataset into a training set, a validation set,
        and a test set, and provides data loaders for all of them
        :param batch_size: The size of each batch of image/target pairs
        used for stochastic gradient decent (or one of its variants)
        """
        super().__init__()

        self.batch_size = batch_size

        self.dataset = TreeSpeciesClassification(image_dir='content/species_classification')

        self.train, self.test, self.val = self.split_dataset(self.dataset)

    @property
    def num_classes(self):
        return self.dataset.num_classes

    @staticmethod
    def split_dataset(dataset: Dataset) -> List[Subset[Dataset]]:
        """
        Split a dataset into a training set, a validation set,
        and a test set
        :param dataset: the dataset that is to be split
        :return: a list of three subsets of the original dataset (train/test/val)
        """
        size = dataset.__len__()

        # get 70% for the train set
        train_size = int(size // 1.25)

        # 20% for test set
        test_size = int(size // 5)

        # get 10% for val set
        val_size = int(size - train_size - test_size)

        lengths: Sequence = [train_size, test_size, val_size]

        return random_split(dataset, lengths)

    def train_dataloader(self) -> DataLoader:
        """
        Get a data loader that shuffles and provides batches of the training set
        :return: the training data loader
        """
        return DataLoader(self.train, batch_size=self.batch_size, shuffle=True, num_workers=12)

    def val_dataloader(self) -> DataLoader:
        """
        Get a data loader that shuffles and provides batches of the validation set
        :return: the validation data loader
        """
        return DataLoader(self.val, batch_size=self.batch_size, num_workers=12)

    def test_dataloader(self) -> DataLoader:
        """
        Get a data loader that shuffles and provides batches of the test set
        :return: the test data loader
        """
        return DataLoader(self.test, batch_size=self.batch_size, num_workers=12)

In [4]:
"""
Author: Filipe Laitenberger
"""

from pytorch_lightning import LightningModule
from torch.optim import Adam, Optimizer
from torch.optim.lr_scheduler import ReduceLROnPlateau
from pytorch_lightning.utilities.types import LRSchedulerType
from torch import Tensor, save, load
from torch.nn.functional import cross_entropy
from os.path import isfile
from typing import Tuple, List
from torchmetrics import Accuracy


class TreeClassificationModel(LightningModule):
    def __init__(self, learning_rate: float = 0.05, filename: str = 'model.pt') -> None:
        """
        This class serves as an abstract class that implements methods
        both models in this study use, like the training/validation/testing steps
        and configurations options such as optimizers and schedulers
        :param learning_rate: The learning rate of the model
        :param filename: The filename under which it will be saved after each epoch
        """
        super().__init__()

        self.filename = filename
        self.learning_rate = learning_rate
        self.accuracy = Accuracy()

    def forward(self, x: Tensor) -> Tensor:
        pass

    def configure_optimizers(self) -> Tuple[List[Optimizer], List[LRSchedulerType]]:
        """
        This methods specifies the optimizer and learning rate scheduler.
        We use ADAM and a ReduceOnPlateau learning rate scheduler that
        multiplies the learning rate by 0.1 if the training loss doesn't
        decrease for three epochs
        :return: the optimizer and learning rate scheduler
        """
        optimizer = Adam(self.parameters(), lr=self.learning_rate)

        scheduler = {
            'scheduler': ReduceLROnPlateau(optimizer, patience=3),
            'monitor': 'train_loss'
        }

        return [optimizer], [scheduler]

    def training_step(self, batch: Tensor, _) -> Tensor:
        """
        A training step
        :param batch: the batch tensor
        :return: the loss of the batch under the current model
        """
        # get columns of batch
        images, targets = batch

        predicted = self.forward(images)
        loss = cross_entropy(predicted, targets)

        self.log('train_loss', loss)

        return loss

    def validation_step(self, batch: Tensor, _) -> Tensor:
        """
        A validation step
        :param batch: the batch tensor
        :return: the loss of the batch under the current model
        """
        # get columns of batch
        images, targets = batch

        predicted = self.forward(images)
        loss = cross_entropy(predicted, targets)
        accuracy = self.accuracy(predicted, targets)

        self.log('val_loss', loss)
        self.log('val_acc', accuracy)

        print('Validation Loss: ', loss)
        print('Validation Accuracy: ', accuracy)

        return loss

    def test_step(self, batch: Tensor, _) -> Tensor:
        """
        A test step
        :param batch: the batch tensor
        :return: the loss of the batch under the current model
        """
        # get columns of batch
        images, targets = batch

        predicted = self.forward(images)
        loss = cross_entropy(predicted, targets)
        accuracy = self.accuracy(predicted, targets)

        self.log('test_loss', loss)
        self.log('test_acc', accuracy)

        print('Test Loss: ', loss)
        print('Test Accuracy: ', accuracy)

        return loss

    def training_epoch_end(self, _) -> None:
        """
        At the end of each epoch we save the model
        """
        self.save()

    def save(self) -> None:
        """
        Save model under specified filename
        """
        print("Saving model at: " + self.filename)
        save(self.state_dict(), self.filename)

    def load(self) -> None:
        """
        Load model from the filename provided
        """
        if isfile(self.filename):
            print("Loading model from: " + self.filename)
            self.load_state_dict(load(self.filename))

In [5]:
"""
Author: Mohamed Gamil
        Thijs van der Laan
"""

import torch.nn as nn
from torch import Tensor




class LeNet5(TreeClassificationModel):
    def __init__(self, num_classes: int, learning_rate: float = 0.05, filename: str = 'model.pt') -> None:
        """
        This class implements the Lenet-5 model proposed by Yann LeCunn in 1998
        :param num_classes: The number of classes in the dataset
        :param learning_rate: The learning rate for performing gradient descent
        :param filename: The filename under which the model is saved after every epoch
        """
        super().__init__(learning_rate, filename)

        self.layer1 = nn.Sequential(
            nn.Conv2d(3, 6, kernel_size=(5, 5), stride=(1, 1), padding=0),
            nn.BatchNorm2d(6),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride = 2)
        )
        
        self.layer2 = nn.Sequential(
            nn.Conv2d(6, 16, kernel_size=(5, 5), stride=(1, 1), padding=0),
            nn.BatchNorm2d(16),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size = 2, stride = 2)
        )
        
        self.fc = nn.Linear(7744, 120)
        self.relu = nn.ReLU()

        self.layer3 = nn.Sequential(
            nn.Conv2d(120, 84, kernel_size=(5, 5), stride=(1, 1), padding=0),
            nn.ReLU(),
        )

        self.fc2 = nn.Linear(84, num_classes)

    def forward(self, x: Tensor) -> Tensor:
        """
        feed images through the model
        :param x: The images
        :return: A tensor of class probabilities
        """
        out = self.layer1(x)
        out = self.layer2(out)
        out = out.reshape(out.size(0), -1)
        out = self.fc(out)
        out = self.relu(out)
        out = self.layer3(out)
        out = self.fc2(out)

        return out

In [6]:
"""
Author: Filipe Laitenberger
"""

import timm
from torch.nn import Linear, Softmax




class Xception(TreeClassificationModel):
    def __init__(self, num_classes: int, learning_rate: float = 0.05, filename: str = 'model.pt') -> None:
        """
        This model is an image classifier inspired by the 'Xception' model
        :param num_classes: The number of classes in the dataset
        :param learning_rate: The learning rate for performing gradient descent
        :param filename: The filename under which the model is saved after every epoch
        """
        super().__init__(learning_rate, filename)

        # we use a predefined model 'Xception' as it is
        # one of the state of the art networks
        self.model = timm.create_model('xception')

        # change the last layer of the network to map to the
        # classes of our dataset
        self.model.fc = Linear(2048, num_classes)

        self.softmax = Softmax(dim=1)

    def forward(self, x):
        """
        feed images through the model
        :param x: The images
        :return: A tensor of class probabilities
        """
        return self.softmax(self.model(x))

In [7]:
"""
Author: Filipe Laitenberger
"""

from pytorch_lightning import Trainer, LightningModule, LightningDataModule
from torch import cuda



def run_model(model: LightningModule, data_module: LightningDataModule) -> None:
    """
    Run a model out of the two used in this experiment (xception or lenet5)
    and do some repetetive steps such as calculating the learning rate
    :param model: A pytorch-lightning model
    """
    # load the model if it exists already
    model.load()

    trainer = Trainer(
        max_epochs=100,
        # if GPUs are available, use all of them
        gpus=(-1 if cuda.is_available() else 0)
    )

    # calculate optimal learning rate
    # lr_finder = trainer.tuner.lr_find(model, datamodule=data_module)
    # model.learning_rate = lr_finder.suggestion()

    # train the network
    trainer.fit(model, data_module)

    # test the network
    trainer.test(model, data_module)


if __name__ == '__main__':
    """
    Run the experiment by training and testing the two models
    (Xception and LeNet-5)
    """

    data_module = TreeSpeciesClassificationDataModule()

    models = [
        Xception(num_classes=data_module.num_classes, filename='xception.pt'),
        LeNet5(num_classes=data_module.num_classes, filename='lenet5.pt')
    ]

    for model in models:
        run_model(model, data_module)

GPU available: False, used: False
TPU available: False, using: 0 TPU cores
IPU available: False, using: 0 IPUs

  | Name     | Type     | Params
--------------------------------------
0 | accuracy | Accuracy | 0     
1 | model    | Xception | 20.8 M
2 | softmax  | Softmax  | 0     
--------------------------------------
20.8 M    Trainable params
0         Non-trainable params
20.8 M    Total params
83.310    Total estimated model params size (MB)


Loading model from: xception.pt


HBox(children=(FloatProgress(value=1.0, bar_style='info', description='Validation sanity check', layout=Layout…

MisconfigurationException: Total length of `Dataloader` across ranks is zero. Please make sure that it returns at least 1 batch.