In [1]:
from tqdm import tqdm
import torch
import pandas as pd
import numpy as np
import os
import glob
import inspect
import importlib
import argparse
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'


In [2]:
#Data functions, allow to load the data and targets, transform into a pytorch dataset
import torch
import cv2
import albumentations as A


class IMAGE_DATASET:
    """
    Torch-style dataset for image classification or regression tasks

    Parameters:
    -----------
    image_path: list
        list of path to images like: data/project_folder/image_id001.jpg
    resize: tuple
        tuple of integer giving the height and widthfor the resize: (width, height)
    label: list or None
        list of labels indicating the class to predict for a given image - None if using test images
    transforms: albumentation Compose object
        albumentation list of transforms you wish to apply to your images

    Returns:
    --------
    dictionnary including images and labels as tensors ready for training
    or just images if labels were not included
    """

    def __init__(self, image_path, resize=None, label=None, transforms=None):
        self.image_path = image_path
        self.resize = resize
        self.label = label
        self.transforms = transforms

    #RETURN THE LENGHT OF THE DATASET
    def __len__(self):
        return len(self.image_path)

    def __getitem__(self, item):
        #LOADING IMAGES
        image = cv2.imread(self.image_path[item])
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)

        #APPLYING DATA AUGMENTATIONS TO IMAGE DATA
        if self.transforms is not None:
            #Adding resize transforms to the augmentation list if provided
            if self.resize is not None:
                A.Compose(
                    [
                        self.transforms,
                        A.Resize(self.resize[0], self.resize[1], p=1)
                    ],
                    p=1.0,
                )
            augmented = self.transforms(image=image)
            image = augmented["image"]

        if self.label is not None:
            label = self.label[item]
            return {
                "images": image,
                "labels": torch.tensor(label, dtype=torch.long)
            }
        else:
            return {
                "images": image
            }

In [3]:
import numpy as np
from sklearn import metrics

import warnings
warnings.filterwarnings("ignore")

class AverageMeter:
    """
    Computes and stores the average and current value
    """

    def __init__(self):
        self.reset()

    def reset(self):
        self.val = 0
        self.avg = 0
        self.sum = 0
        self.count = 0

    def update(self, val, n=1):
        self.val = val
        self.sum += val * n
        self.count += n
        self.avg = self.sum / self.count

class MetricsMeter:
    def __init__(self, task):
        self.task = task
        self.metrics_dict = {}

        if self.task == "REGRESSION":
            regression_metrics = ["mae", "mse", "rmse"]
            for metric in regression_metrics:
                self.metrics_dict.update({metric: AverageMeter()})
        elif self.task == "CLASSIFICATION":
            classification_metrics = ["accuracy", "auc", "precision", "recall", "f1_score"]
            for metric in classification_metrics:
                self.metrics_dict.update({metric: AverageMeter()})

    def compute_metrics(self, predictions, labels, n_class):
        if self.task == "REGRESSION":
            mae = metrics.mean_absolute_error(labels, predictions["preds"])
            self.metrics_dict["mae"].update(mae, len(labels))
            mse = metrics.mean_squared_error(labels, predictions["preds"])
            self.metrics_dict["mse"].update(mse, len(labels))
            rmse = np.sqrt(mse)
            self.metrics_dict["rmse"].update(rmse, len(labels))
            return self.metrics_dict
            
        elif self.task == "CLASSIFICATION":
            accuracy = metrics.accuracy_score(labels, predictions["preds"])
            self.metrics_dict["accuracy"].update(accuracy, len(labels))
            precision = metrics.precision_score(labels, predictions["preds"])
            self.metrics_dict["precision"].update(precision, len(labels))
            recall = metrics.recall_score(labels, predictions["preds"])
            self.metrics_dict["recall"].update(recall, len(labels))
            f1_score = metrics.f1_score(labels, predictions["preds"])
            self.metrics_dict["f1_score"].update(f1_score, len(labels))
            auc = None
            if n_class == 2:
                auc = metrics.roc_auc_score(labels, predictions["preds_score"])
                self.metrics_dict["auc"].update(auc, len(labels))
            return self.metrics_dict
        else:
            raise Exception("task not supported")

In [4]:
def process_outputs(task, outputs, labels=None):
    """
    Take raw tensor model output in and return processed numpy array

    Parameters
    ----------
    task: str
        CLASSIFICATION OR REGRESSION
    outputs: torch tensor
        raw tensor model outputs
    labels:
        tensor of labels

    Return
    ------
    preds: dict
        dictionnary of predictions (and prediction scores - probability like)
    labels: numpy array
        numpy array of labels
    """

    if task == "REGRESSION":
        preds = outputs.cpu().detach().numpy()
        if labels is not None:
            labels = labels.cpu().detach().numpy()
        return {"preds": preds, }, labels

    elif task == "CLASSIFICATION":
        preds = outputs.argmax(axis=1).cpu().detach().numpy()
        preds_score = outputs.softmax(dim=1)[:, 1].cpu().detach().numpy()
        if labels is not None:
            labels = labels.cpu().detach().numpy()

        return {"preds": preds, "preds_score": preds_score, }, labels
    else:
        raise Exception("task not supported")

In [5]:
##################
# IMPORT MODULES #
##################
import torch
import numpy as np
from tqdm import tqdm

import warnings
warnings.filterwarnings("ignore")
#################
# TRAINER CLASS #
#################


class TRAINER:
    """
    """

    def __init__(self, model, task, device, optimizer=None, criterion=None, n_class=2):
        self.model = model
        self.task = task
        self.device = device
        self.optimizer = optimizer
        self.criterion = criterion
        self.n_class = n_class

    #################
    # TRAINING STEP #
    #################
    def training_step(self, data_loader):
        """
        train the given model for one epoch using the training dataloader

        Parameters
        ----------
        data_loader: torch dataloader object
            the training dataloader on which we wish to train the model
           
        Returns
        -------
        self: object
            trained model
        """
        # LOSS AVERAGE
        losses = AverageMeter()
        metrics_meter = MetricsMeter(task=self.task)
        # MODEL TO TRAIN MODE
        self.model.train()
        # TRAINING LOOP
        tk0 = tqdm(data_loader, total=len(data_loader))
        for _, data in enumerate(tk0):
            # LOADING IMAGES & LABELS
            images = data["images"].to(self.device)
            labels = data["labels"].to(self.device)
            # RESET GRADIENTS
            self.model.zero_grad()
            # CALCULATE LOSS
            output = self.model(images)
            loss = self.criterion(output, labels)
            # CALCULATE GRADIENTS
            loss.backward()
            self.optimizer.step()

            #COMPUTE METRICS
            train_preds, labels = process_outputs(self.task, output, labels)
            train_metrics = metrics_meter.compute_metrics(
                train_preds, labels, n_class=self.n_class)

            # UPDATE LOSS
            losses.update(loss.item(), images.size(0))
            tk0.set_postfix(loss=losses.avg)
        return losses.avg, train_metrics

    ###################
    # VALIDATION STEP #
    ###################
    def validation_step(self, data_loader):
        """
        Validate the trained model on the validation loader and compute evaluation metric

        Parameters
        ----------
        data_loader: torch dataloader object
            the validation dataloader we use to evaluate current model performance
        metric: metric from utils.metric.metric_dict (sklearn function)
            the chosen metric we use to evaluate model performance
        Returns
        -------
        loss: float
            model current validation loss
        metrics_avg.avg: float
            model current performance using metric chosen
        """
        # LOSS & METRIC AVERAGE
        losses = AverageMeter()
        metrics_meter = MetricsMeter(task=self.task)
        # MODEL TO EVAL MODE
        self.model.eval()
        # VALIDATION LOOP
        with torch.no_grad():
            tk0 = tqdm(data_loader, total=len(data_loader))
            for _, data in enumerate(tk0):
                # LOADING IMAGES & LABELS
                images = data["images"].to(self.device)
                labels = data["labels"].to(self.device)

                # CALCULATE LOSS & METRICS
                output = self.model(images)
                loss = self.criterion(output, labels)

                #COMPUTE METRICS
                valid_preds, labels = process_outputs(
                    self.task, output, labels)
                valid_metrics = metrics_meter.compute_metrics(
                    valid_preds, labels, n_class=self.n_class)

                losses.update(loss.item(), images.size(0))
                tk0.set_postfix(loss=losses.avg)
        print(f"Validation Loss = {losses.avg}")
        return losses.avg, valid_metrics

    def test_step(self, data_loader):
        """
        test a trained model on a testloader and output predictions

        Parameters:
        -----------
        data_loader: torch dataloader object
            test dataloader
        n_class:
            number of different class in your dataset
        Returns:
        --------
        model_preds: list
            list of model predictions on the test dataset.
        """
        # DATA LOADER LOOP
        model_preds = []
        with torch.no_grad():
            tk0 = tqdm(data_loader, total=len(data_loader))
            for _, data in enumerate(tk0):
                # LOADING IMAGES
                images = data["images"].to(self.device)
                # PREDICT
                preds = self.model(images)
                test_preds, _ = process_outputs(self.task, preds, labels=None)
                model_preds.extend(test_preds["preds"])
            tk0.set_postfix(stage="test")
        return model_preds


In [6]:
import torch
import torch.nn as nn
from torchvision import models


class RESNET18(nn.Module):
    def __init__(self, n_class=2, pretrain=True):
        super(RESNET18, self).__init__()

        self.base_model = models.resnet18(pretrained=pretrain)
        in_features = self.base_model.fc.out_features
        #self.nb_features = self.base_model.fc.in_features
        self.l0 = nn.Linear(in_features, n_class)

    def forward(self, image):
        x = self.base_model(image)
        out = self.l0(x)
        return out


In [7]:
config = {
    "test_path": "D:/Documents/GitHub/image_pipeline/data/aerial-cactus-identification/test/",
    "weight_path": "D:/Documents/GitHub/image_pipeline/projects/AERIAL_CACTUS/model_output/",
    "device": torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu"),
    "split" : False
}


In [8]:
image_path = []
for filename in glob.glob(os.path.join(config["test_path"], "*.jpg")):
    filename = filename.split("\\", -1)[-1]
    image_path.append(filename)
df_test = pd.DataFrame({"id": image_path})
df_test.head()

Unnamed: 0,id
0,000940378805c44108d287872b2f04ce.jpg
1,0017242f54ececa4512b4d7937d1e21e.jpg
2,001ee6d8564003107853118ab87df407.jpg
3,002e175c3c1e060769475f52182583d0.jpg
4,0036e44a7e8f7218e9bc7bf8137e4943.jpg


In [9]:
import albumentations as A
from albumentations.pytorch import ToTensorV2
#The mean and std I use are the values from the ImageNet dataset
#The augmentations are used to make training harder and more robust to novel situations.
#We don't use augment on the validation set other than normalization to try and estimate the real power of the model in the wild.
Augmentations = {
    'train':
        A.Compose(
            [
                A.HueSaturationValue(
                    hue_shift_limit=0.2, sat_shift_limit=0.2, val_shift_limit=0.2, p=0.5),
                A.RandomBrightnessContrast(
                    brightness_limit=(-0.1, 0.1), contrast_limit=(-0.1, 0.1), p=0.5),
                A.Normalize(mean=[0.485, 0.456, 0.406], std=[
                            0.229, 0.224, 0.225], max_pixel_value=255.0, p=1.0,),
                ToTensorV2()
            ],
        ),
    'valid':
        A.Compose(
            [
                A.Normalize(mean=[0.485, 0.456, 0.406], std=[
                            0.229, 0.224, 0.225], max_pixel_value=255.0, p=1.0,),
                ToTensorV2()
            ],
        ),
    'test':
        A.Compose(
            [
                A.Normalize(mean=[0.485, 0.456, 0.406], std=[
                            0.229, 0.224, 0.225], max_pixel_value=255.0, p=1.0,),
                ToTensorV2()
            ],
        )
}

In [17]:
Augments = A.Compose(
    A.Compose(
        [
            Augmentations["train"],
            A.Resize(64, 64, p=1)
        ],
        p=1.0,
    )
)
Augments

Compose([
  Compose([
    HueSaturationValue(always_apply=False, p=0.5, hue_shift_limit=(-0.2, 0.2), sat_shift_limit=(-0.2, 0.2), val_shift_limit=(-0.2, 0.2)),
    RandomBrightnessContrast(always_apply=False, p=0.5, brightness_limit=(-0.1, 0.1), contrast_limit=(-0.1, 0.1), brightness_by_max=True),
    Normalize(always_apply=False, p=1.0, mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225], max_pixel_value=255.0),
    ToTensorV2(always_apply=True, p=1.0, transpose_mask=False),
  ], p=1.0, bbox_params=None, keypoint_params=None, additional_targets={}),
  Resize(always_apply=False, p=1, height=64, width=64, interpolation=1),
], p=1.0, bbox_params=None, keypoint_params=None, additional_targets={})

In [10]:
model = RESNET18(n_class=2, pretrain=False)
model.to(config["device"])

RESNET18(
  (base_model): ResNet(
    (conv1): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (relu): ReLU(inplace=True)
    (maxpool): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (layer1): Sequential(
      (0): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
      )
      (1): BasicBlock(
        (conv1): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track

In [11]:
test_img = df_test["id"].values.tolist()
test_img = [os.path.join(config["test_path"], os.path.splitext(i)[0] + ".jpg") for i in test_img]

In [15]:
# TEST DATASET
test_ds = IMAGE_DATASET(
    image_path=test_img,
    label=None,
    resize=(32,32),
    transforms=Augments
)
# TEST DATALOADER
test_loader = torch.utils.data.DataLoader(
    test_ds,
    batch_size=16,
    shuffle=False,
    num_workers=0
)

In [16]:
final_preds = []
for fold in range(max(5, 1)):
    print(f"Starting predictions for fold  : {fold}")
    # LOAD MODEL WITH FOLD WEIGHTS
    model_weights = torch.load(f"D:/Documents/GitHub/image_pipeline/projects/AERIAL_CACTUS/model_output/model_RESNET18_2021-05-25_{fold}.bin")
    model.load_state_dict(model_weights)
    model.eval()

    trainer = TRAINER(model=model, task="CLASSIFICATION",
                        device=config["device"], n_class=2)
    # DATA LOADER LOOP
    predictions = trainer.test_step(data_loader=test_loader)
    final_preds.append(predictions)

    if config["split"] is True:
        break

final_preds = np.mean(np.column_stack(final_preds), axis=1)
# CONDITIONAL SUBMISSION FILE DEPENDING IF WE HAVE A TEST FILE OR NOT
test_final_data = {"id": df_test["id"].values.tolist(), "has_cactus": final_preds}
test_df = pd.DataFrame(data=test_final_data, index=None)
test_df.to_csv(os.path.join("D:/Documents/GitHub/image_pipeline/projects/AERIAL_CACTUS/", "preds.csv"))


Starting predictions for fold  : 0


  0%|          | 0/250 [00:00<?, ?it/s]


error: OpenCV(4.5.2) :-1: error: (-5:Bad argument) in function 'resize'
> Overload resolution failed:
>  - src is not a numpy array, neither a scalar
>  - Expected Ptr<cv::UMat> for argument 'src'
