In [None]:
#!pip install timm

In [None]:
import pandas as pd
import numpy as np
import json
import gc
import os
import torch
from torch import nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, random_split
from torchvision import transforms
import pytorch_lightning as pl
import albumentations
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, utils
from albumentations.core.composition import Compose
from albumentations.pytorch import ToTensorV2
import torchvision
from typing import Any, Dict, List, Union, Optional
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
from sklearn.model_selection import StratifiedKFold
import cv2
import torch
from efficientnet_pytorch import EfficientNet
import numpy as np
import timm
from PIL import Image
from PIL import ImageFile

In [None]:
import pytorch_lightning
pytorch_lightning.__version__

In [None]:
pytorch_lightning.utilities.seed.seed_everything(seed=42)

In [None]:
torch.__version__

In [None]:
base_dir = r'C:\Users\Kaggle\Leaf_Classification\input'

In [None]:
### read csv and label mapping

labels = pd.read_csv(os.path.join(base_dir,'train.csv'))

with open(f'{base_dir}/label_num_to_disease_map.json') as f:
    label_mapper = json.load(f)

#### Dataset definition

In [None]:
class ImageClassificationDataset(Dataset):
    def __init__(
        self,
        image_names: List,
        transforms: Compose,
        labels: Optional[List[int]],
        img_path: str = '',
        mode: str = 'train',
        labels_to_ohe: bool = False,
        n_classes: int = 5,
    ):
        """
        Image classification dataset.

        Args:
            df: dataframe with image id and bboxes
            mode: train/val/test
            img_path: path to images
            transforms: albumentations
        """

        self.mode = mode
        self.transforms = transforms
        self.img_path = img_path
        self.image_names = image_names
        if labels is not None:
            if not labels_to_ohe:
                self.labels = np.array(labels)
            else:
                self.labels = np.zeros((len(labels), n_classes))
                self.labels[np.arange(len(labels)), np.array(labels)] = 1

    def __getitem__(self, idx: int) -> Dict[str, np.array]:
        image_path = self.img_path + self.image_names[idx]
        image = cv2.imread(f'{image_path}', cv2.IMREAD_COLOR)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if image is None:
            raise FileNotFoundError(image_path)
        target = self.labels[idx]

        img = self.transforms(image=image)['image']
        sample = {'image_path': image_path, 'image': img, 'target': np.array(target).astype('int64')}

        return sample

    def __len__(self) -> int:
        return len(self.image_names)


### Define augmentations

In [None]:
sz = 576
sz

In [None]:
# augmentations taken from: https://www.kaggle.com/khyeh0719/pytorch-efficientnet-baseline-train-amp-aug
train_augs = albumentations.Compose([
            albumentations.RandomResizedCrop(sz, sz),
            albumentations.Transpose(p=0.5),
            albumentations.HorizontalFlip(p=0.5),
            albumentations.VerticalFlip(p=0.5),
            albumentations.ShiftScaleRotate(p=0.5),
            albumentations.HueSaturationValue(
                hue_shift_limit=0.2, 
                sat_shift_limit=0.2, 
                val_shift_limit=0.2, 
                p=0.5
            ),
            albumentations.RandomBrightnessContrast(
                brightness_limit=(-0.1,0.1), 
                contrast_limit=(-0.1, 0.1), 
                p=0.5
            ),
            albumentations.Normalize(
                mean=[0.485, 0.456, 0.406], 
                std=[0.229, 0.224, 0.225], 
                max_pixel_value=255.0, 
                p=1.0
            ),
            albumentations.CoarseDropout(p=0.5),
            albumentations.Cutout(p=0.5), 
            ToTensorV2()],
            p=1.)
  
        
valid_augs = albumentations.Compose([
            albumentations.CenterCrop(sz, sz, p=1.),
            albumentations.Resize(sz, sz),
            albumentations.Normalize(
                mean=[0.485, 0.456, 0.406], 
                std=[0.229, 0.224, 0.225], 
                max_pixel_value=255.0, 
                p=1.0)
            ,ToTensorV2()], p=1.)

### CutMix, Mixup


In [None]:
def mixup_data(x, y, alpha=1.0, use_cuda=True):
    '''Returns mixed inputs, pairs of targets, and lambda'''
    if alpha > 0:
        lam = np.random.beta(alpha, alpha)
    else:
        lam = 1

    batch_size = x.size()[0]
    if use_cuda:
        index = torch.randperm(batch_size).cuda()
    else:
        index = torch.randperm(batch_size)

    mixed_x = lam * x + (1 - lam) * x[index, :]
    y_a, y_b = y, y[index]
    return mixed_x, y_a, y_b, lam


def mixup_criterion(criterion, pred, y_a, y_b, lam):
    return lam * criterion(pred, y_a) + (1 - lam) * criterion(pred, y_b)

### PL Data Module - https://www.kaggle.com/artgor/cassava-disease-identification-with-lightning

In [None]:
class CassavaDataModule(pl.LightningDataModule):
    def __init__(self,
                 df,
                 train_augs,
                 valid_augs,
                 path,
                bs=8,
                fold=0):
        super().__init__()
        self.df = df
        self.train_augs = train_augs
        self.valid_augs = valid_augs
        self.path = path
        self.bs = bs
        self.fold = fold

    def prepare_data(self):
        pass

    def setup(self, stage=None):
        
        folds = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
        
        train_indexes, valid_indexes = list(folds.split(self.df, self.df['label']))[self.fold]
        
        train_df = self.df.iloc[train_indexes]
        valid_df = self.df.iloc[valid_indexes]

        
        self.train_dataset = ImageClassificationDataset(image_names=train_df['image_id'].values,
                                                        transforms=train_augs,
                                                        labels=train_df['label'].values,
                                                        img_path=self.path,
                                                        mode='train',
                                                        labels_to_ohe=False,
                                                        n_classes=5)
        self.valid_dataset = ImageClassificationDataset(image_names=valid_df['image_id'].values,
                                                        transforms=valid_augs,
                                                        labels=valid_df['label'].values,
                                                        img_path=self.path,
                                                        mode='valid',
                                                        labels_to_ohe=False,
                                                        n_classes=5)

    def train_dataloader(self):
        train_loader = torch.utils.data.DataLoader(
            self.train_dataset,
            batch_size=self.bs,
            num_workers=0,
            shuffle=True,
        )
        return train_loader

    def val_dataloader(self):
        valid_loader = torch.utils.data.DataLoader(
            self.valid_dataset,
            batch_size=self.bs,
            num_workers=0,
            shuffle=False,
        )

        return valid_loader

    def test_dataloader(self):
        return None

### Focal Cosine Loss

In [None]:
class CosineLoss(nn.Module):
    def __init__(self, xent=.1, reduction="mean"):
        super(CosineLoss, self).__init__()
        self.xent = xent
        self.reduction = reduction
        
        self.y = torch.Tensor([1]).cuda()
        
    def forward(self, input, target):
        cosine_loss = F.cosine_embedding_loss(input, F.one_hot(target, num_classes=input.size(-1)), self.y, reduction=self.reduction)
        cent_loss = F.cross_entropy(F.normalize(input), target, reduction=self.reduction)
        
        return cosine_loss + self.xent * cent_loss

class FocalCosineLoss(nn.Module):
    def __init__(self, alpha=1, gamma=2, xent=.1, reduction="mean"):
        super(FocalCosineLoss, self).__init__()
        self.alpha = alpha
        self.gamma = gamma

        self.xent = xent
        self.reduction = reduction
        
        self.y = torch.Tensor([1]).cuda()
        
    def forward(self, input, target):
        cosine_loss = F.cosine_embedding_loss(input, F.one_hot(target, num_classes=input.size(-1)), self.y, reduction=self.reduction)

        cent_loss = F.cross_entropy(F.normalize(input), target, reduce=False)
        pt = torch.exp(-cent_loss)
        focal_loss = self.alpha * (1-pt)**self.gamma * cent_loss
        loss_ = focal_loss
        if self.reduction == "mean":
            focal_loss = torch.mean(focal_loss)
        
        return cosine_loss + self.xent * focal_loss,loss_

### Taylor Series Loss

In [None]:
import torch.cuda.amp as amp


'''
proposed in this paper: [Exploring Alternatives to Softmax Function](https://arxiv.org/pdf/2011.11538.pdf)
'''


##
# version 1: use torch.autograd
class TaylorSoftmax(nn.Module):
    '''
    This is the autograd version
    '''
    def __init__(self, dim=1, n=2):
        super(TaylorSoftmax, self).__init__()
        assert n % 2 == 0
        self.dim = dim
        self.n = n

    def forward(self, x):
        '''
        usage similar to nn.Softmax:
            >>> mod = TaylorSoftmax(dim=1, n=4)
            >>> inten = torch.randn(1, 32, 64, 64)
            >>> out = mod(inten)
        '''
        fn = torch.ones_like(x)
        denor = 1.
        for i in range(1, self.n+1):
            denor *= i
            fn = fn + x.pow(i) / denor
        out = fn / fn.sum(dim=self.dim, keepdims=True)
        return out


##
# version 1: use torch.autograd
class TaylorCrossEntropyLoss(nn.Module):
    '''
    This is the autograd version
    '''
    def __init__(self, n=2, ignore_index=-1, reduction='mean'):
        super(TaylorCrossEntropyLoss, self).__init__()
        assert n % 2 == 0
        self.taylor_softmax = TaylorSoftmax(dim=1, n=n)
        self.reduction = reduction
        self.ignore_index = ignore_index

    def forward(self, logits, labels):
        '''
        usage similar to nn.CrossEntropyLoss:
            >>> crit = TaylorCrossEntropyLoss(n=4)
            >>> inten = torch.randn(1, 10, 64, 64)
            >>> label = torch.randint(0, 10, (1, 64, 64))
            >>> out = crit(inten, label)
        '''
        log_probs = self.taylor_softmax(logits).log()
        loss = F.nll_loss(log_probs, labels, reduction=self.reduction,
                ignore_index=self.ignore_index)
        loss_ = F.nll_loss(log_probs, labels, reduction='none',
                          ignore_index=self.ignore_index)
        return loss,loss_

### Define encoder - https://www.kaggle.com/artgor/cassava-disease-identification-with-lightning

In [None]:
def freeze_until(net: Any, param_name: str = None) -> None:
    """
    Freeze net until param_name


    Args:
        net:
        param_name:

    """
    found_name = False
    for name, params in net.named_parameters():
        if name == param_name:
            found_name = True
        params.requires_grad = found_name

class BasicEncoder(nn.Module):
    def __init__(
        self,
        arch: str = 'resnet18',
        source: str = 'torchvision',
        pretrained: str = None,
        n_layers: int = -1,
        freeze: bool = False,
        to_one_channel: bool = False,
        freeze_until_layer: str = None,
    ) -> None:
        """
        Initialize Encoder.

        Args:
            num_classes: the number of target classes, the size of the last layer's output
            arch: the name of the architecture form pretrainedmodels
            pretrained: the mode for pretrained model from pretrainedmodels
            n_layers: number of layers to keep
            freeze: to freeze model
            freeze_until: freeze until this layer. If None, then freeze all layers
        """
        super().__init__()
        if source == 'timm':
            net = timm.create_model(arch, pretrained=pretrained)
            self.output_dimension = list(net.children())[-1].in_features
        else:
            print("Not implemented")
            
        if freeze:
            freeze_until(net, freeze_until_layer)
        
        layers = list(net.children())[:n_layers]    
        self.layers = nn.Sequential(*layers)

    def forward(self, x):
        output = self.layers(x)

        return output

In [None]:
class BasicDecoder(nn.Module):
    def __init__(self, pool_output_size: int = 1, n_classes: int = 5, output_dimension: int = 512) -> None:
        """
        Initialize Decoder.

        Args:
            pool_output_size: the size of the result feature map after adaptive pooling layer
            n_classes: n classes to output
            output_dimension: output dimension of encoder
        """
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(output_dimension, 512),
            nn.ReLU(inplace=True),
            nn.Dropout(0.1),
            nn.Linear(512, 5),
        )
    def forward(self, x):
        #x = self.pool(x)
        output = self.fc(x.view(x.size()[0], -1))
        return output

In [None]:
timm.list_models()

In [None]:
class Net(nn.Module):
    def __init__(self) -> None:
        
        """
        Model class.

        Args:
            cfg: main config
        """
        super().__init__()
        self.encoder = BasicEncoder(arch='seresnext50_32x4d',
                                    source='timm',
                                    pretrained=True,
                                    n_layers=-1,
                                    freeze=False,
                                    to_one_channel=False,
                                    freeze_until_layer=None)
        self.decoder = BasicDecoder(1,5,self.encoder.output_dimension)
        self.loss_focal = FocalCosineLoss()
        self.loss_taylor= TaylorCrossEntropyLoss()
        
    def forward(self, x, targets):
        out = self.encoder(x)
        logits = self.decoder(out)
        loss_focal,f_ = self.loss_focal(logits, targets)
        loss_taylor,t_ = self.loss_taylor(logits, targets)
#         loss_focal,f_ = self.loss_focal(logits, targets).view(1)
#         loss_taylor,t_ = self.loss_taylor(logits, targets).view(1)
        return logits,loss_focal,loss_taylor,f_,t_


In [None]:
int(.4*12)

In [None]:
class LitCassava(pl.LightningModule):
    def __init__(self, model):
        super(LitCassava, self).__init__()
        self.model = model
        self.metric = pl.metrics.Accuracy()
        self.learning_rate = 1e-4 #1e-4
        
    def forward(self, x, targets, *args, **kwargs):
        return self.model(x, targets)

    def configure_optimizers(self):
        optimizer = torch.optim.AdamW(self.model.parameters(), lr=self.learning_rate, weight_decay=0.001)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(optimizer, factor=0.1, patience=2)
        return (
            [optimizer],
            [{'scheduler': scheduler, 'interval': 'epoch', 'monitor': 'valid_loss'}],
        )

    def training_step(
        self, batch: torch.Tensor, batch_idx: int
    ) -> Union[int, Dict[str, Union[torch.Tensor, Dict[str, torch.Tensor]]]]:
        image = batch['image']
        target = batch['target']
        
        ### add in cutmix+mixup+cutout
        const = np.random.randint(10)
        if batch_idx%2 == 0 and batch_idx%2 == 1:
            if const<3:
                mixed_input, targets_a, targets_b, lam = mixup_data(image, target,alpha=0.4)
                logits,loss_focal_a,loss_taylor_a,f_a,t_a = self(mixed_input,targets_a)
                logits,loss_focal_b,loss_taylor_b,f_b,t_b = self(mixed_input,targets_b)
                loss = lam * loss_focal_a + (1 - lam) * loss_focal_b
                loss_ = lam * f_a + (1 - lam) * f_b
            else:
                logits, loss_focal,loss_taylor,f_,t_ = self(image, target)
                loss = loss_focal
                loss_ = f_
            
        else:
            if const<3:
                mixed_input, targets_a, targets_b, lam = mixup_data(image, target,alpha=0.4)
                logits,loss_focal_a,loss_taylor_a,f_a,t_a = self(mixed_input,targets_a)
                logits,loss_focal_b,loss_taylor_b,f_b,t_b = self(mixed_input,targets_b)               
                loss = lam * loss_taylor_a + (1 - lam) * loss_taylor_b
                loss_ = lam * t_a + (1 - lam) * t_b
            else:
                logits, loss_focal,loss_taylor,f_,t_ = self(image, target)
                loss = loss_taylor
                loss_ = t_
                
        if self.trainer.current_epoch%3== 0 and self.trainer.current_epoch >0 and len(loss_) == 12:
            _, idxs = loss_.topk(10, largest=False)
            loss = loss_.index_select(0, idxs)
            loss = torch.mean(loss)
        else:
            loss = loss
            
            
        score = self.metric(logits.argmax(1), target)
        logs = {'train_loss': loss, f'train_accuracy': score}
        return {
            'loss': loss,
            'log': logs,
            'progress_bar': logs,
            'logits': logits,
            'target': target,
            f'train_accuracy': score,
        }


    def training_epoch_end(self, outputs):
        avg_loss = torch.stack([x['loss'] for x in outputs]).mean()
        y_true = torch.cat([x['target'] for x in outputs])
        y_pred = torch.cat([x['logits'] for x in outputs])
        score = self.metric(y_pred.argmax(1), y_true)
        
        logs = {'train_loss': avg_loss, 'train_accuracy': score}
        return {'log': logs, 'progress_bar': logs}

    def validation_step(
        self, batch: torch.Tensor, batch_idx: int
    ) -> Union[int, Dict[str, Union[torch.Tensor, Dict[str, torch.Tensor]]]]:
        image = batch['image']
        target = batch['target']
        logits, loss_focal,loss_taylor,f_,t_ = self(image, target)
        loss = (loss_focal+loss_taylor)/2
        score = self.metric(logits.argmax(1), target)
        logs = {'valid_loss': loss, f'valid_accuracy': score}

        return {
            'loss': loss,
            'log': logs,
            'progress_bar': logs,
            'logits': logits,
            'target': target,
            f'valid_accuracy': score,
        }

    def validation_epoch_end(self, outputs):
        avg_loss = torch.stack([x['loss'] for x in outputs]).mean()
        y_true = torch.cat([x['target'] for x in outputs])
        y_pred = torch.cat([x['logits'] for x in outputs])
        score = self.metric(y_pred.argmax(1), y_true)

        # score = torch.tensor(1.0, device=self.device)
        logs = {'valid_loss': avg_loss, f'valid_accuracy': score, 'accuracy': score}
        return {'valid_loss': avg_loss, 'log': logs, 'progress_bar': logs}

In [None]:
import time

In [None]:
#time.sleep(60*5)
for f in range(5):
    print(f"Running Fold:{f}")
    model = Net()
    dm = CassavaDataModule(labels, train_augs, valid_augs, f'{base_dir}/train_images/',bs=12,fold=f)

    modelSavePath = f'C:/Users/Kaggle/Leaf_Classification/saved_models/{f}'
    if not os.path.exists(modelSavePath):
        os.makedirs(modelSavePath)
        
    trainer = pl.Trainer(
        auto_scale_batch_size = False,
        checkpoint_callback=ModelCheckpoint(monitor='valid_accuracy',
                                            save_top_k=2, dirpath = modelSavePath,filename='{epoch}_{valid_loss:.4f}_{valid_accuracy:.4f}', mode='max'),
        accumulate_grad_batches = 1,
        gpus=1,
        max_epochs=25,
        num_sanity_val_steps=0,
        weights_summary='top',
       precision = 16,
        callbacks = [EarlyStopping(monitor='valid_accuracy', patience=6, mode='max')]
    )
    
    lit_model = LitCassava(model)


    trainer.fit(lit_model, dm)

    del trainer,model,lit_model
    gc.collect()
    time.sleep(60*5)