# Directory settings

In [None]:
# ====================================================
# Directory settings
# ====================================================
import os

OUTPUT_DIR = './'
MODEL_DIR = '../input/cassava-resnext50-32x4d-weights-test/'
MODEL_DIR_TEST = '../input/cassava-weights-test/'
MODEL_DIR_RES = '../input/cassava-resnext50-32x4d-weights/'
MODEL_DIR_SERES_MIX = '../input/cassava-seresnext50-32x4d-weights/'
MODEL_DIR_SERES_NO = '../input/cassava-seresnext50-32x4d-weights-nomix/'
MODEL_DIR_EFNET4 = '../input/cassava-efficientnet-b4-weights/'
MODEL_DIR_EFNET5 = '../input/cassava-efficientnet-b5-weights/'
MODEL_DIR_CSP = '../input/cassava-cspresnext50-weights/'
MODEL_DIR_VIT = '../input/vitbase16/'
MODEL_DIR_REG = '../input/cassava-regnety-weights/'
if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)
    
TRAIN_PATH = '../input/cassava-leaf-disease-classification/train_images'
TEST_PATH = '../input/cassava-leaf-disease-classification/test_images'

# CFG

In [None]:
# ====================================================
# CFG
# ====================================================
class CFG:
    debug=False
    num_workers=8
    model_name='resnext50_32x4d' # tf_efficientnet_b4_ns, resnext50_32x4d
    size=512
    size_vit=384
    batch_size=32
    seed=2020
    target_size=5
    target_col='label'
    n_fold=5
    trn_fold=[0,1,2,3,4]
    inference=True
    tta=False
    tta_vit=False
    tta_res=False
    tta_se=False
    tta_ef5=False
    num_tta=10

In [None]:
# ====================================================
# Library
# ====================================================
import sys
sys.path.append('../input/timm-nfnet/')

import os
import math
import time
import random
import shutil
from pathlib import Path
from contextlib import contextmanager
from collections import defaultdict, Counter

import scipy as sp
import numpy as np
import pandas as pd

from sklearn import preprocessing
from sklearn.metrics import accuracy_score
from sklearn.model_selection import StratifiedKFold

from tqdm.auto import tqdm
from functools import partial

import cv2
from PIL import Image

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam, SGD
import torchvision.models as models
from torch.nn.parameter import Parameter
from torch.utils.data import DataLoader, Dataset
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts, CosineAnnealingLR, ReduceLROnPlateau

import albumentations as A
from albumentations.pytorch import ToTensorV2

import timm

import warnings 
warnings.filterwarnings('ignore')

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Utils

In [None]:
# ====================================================
# Utils
# ====================================================
def get_score(y_true, y_pred):
    return accuracy_score(y_true, y_pred)


@contextmanager
def timer(name):
    t0 = time.time()
    LOGGER.info(f'[{name}] start')
    yield
    LOGGER.info(f'[{name}] done in {time.time() - t0:.0f} s.')


def init_logger(log_file=OUTPUT_DIR+'inference.log'):
    from logging import getLogger, INFO, FileHandler,  Formatter,  StreamHandler
    logger = getLogger(__name__)
    logger.setLevel(INFO)
    handler1 = StreamHandler()
    handler1.setFormatter(Formatter("%(message)s"))
    handler2 = FileHandler(filename=log_file)
    handler2.setFormatter(Formatter("%(message)s"))
    logger.addHandler(handler1)
    logger.addHandler(handler2)
    return logger

#LOGGER = init_logger()

def seed_torch(seed=42):
    random.seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.backends.cudnn.deterministic = True

seed_torch(seed=CFG.seed)

# Data Loading

In [None]:
test = pd.read_csv('../input/cassava-leaf-disease-classification/sample_submission.csv')
test_ef = pd.read_csv('../input/cassava-leaf-disease-classification/sample_submission.csv')
test.head()

# Dataset

In [None]:
# ====================================================
# Dataset
# ====================================================
class TestDataset(Dataset):
    def __init__(self, df, transform=None):
        self.df = df
        self.file_names = df['image_id'].values
        self.transform = transform
        
    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        file_name = self.file_names[idx]
        file_path = f'{TEST_PATH}/{file_name}'
        image = cv2.imread(file_path)
        image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        if self.transform:
            augmented = self.transform(image=image)
            image = augmented['image']
        return image

# Transforms

In [None]:
# ====================================================
# Transforms
# ====================================================
def get_transforms(*, data):
    if data == 'valid':
        return A.Compose([
            A.Resize(CFG.size, CFG.size),
            A.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225],
            ),
            ToTensorV2(),
        ])

def get_inference_transforms():
    return A.Compose([
            A.Resize(CFG.size, CFG.size),
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.5),
            A.Normalize(mean=[0.485, 0.456, 0.406], 
                        std=[0.229, 0.224, 0.225],),
            ToTensorV2(),
        ])

def get_inference_transforms_vit():
    return A.Compose([
            A.RandomResizedCrop(CFG.size_vit, CFG.size_vit),
#             A.Resize(CFG.size, CFG.size),
            A.Transpose(p=0.5),
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.5),
            A.HueSaturationValue(hue_shift_limit=0.2, sat_shift_limit=0.2, val_shift_limit=0.2, p=0.5),
            A.RandomBrightnessContrast(brightness_limit=(-0.1,0.1), contrast_limit=(-0.1, 0.1), p=0.5),
            A.Normalize(mean=[0.485, 0.456, 0.406], 
                        std=[0.229, 0.224, 0.225],
                        max_pixel_value=255.0, p=1.0),
            ToTensorV2(p=1.0),
        ], p=1.)


def get_transforms_vit(*, data):
    if data == 'valid':
        return A.Compose([
            A.Resize(CFG.size_vit, CFG.size_vit),
            A.Normalize(
                mean=[0.485, 0.456, 0.406],
                std=[0.229, 0.224, 0.225],
            ),
            ToTensorV2(),
        ])

In [None]:
import albumentations as A
from albumentations import Compose
from albumentations.pytorch import ToTensorV2


def get_inference_transforms_efnet4():
    return A.Compose([
            A.HorizontalFlip(p=0.5),
            A.VerticalFlip(p=0.5),
            A.ShiftScaleRotate(p = 1.0),
            A.ColorJitter(brightness=0.1, contrast=0.2, saturation=0.2, hue=0.00, always_apply=False, p=1.0),
            A.RandomCrop(height= CFG.size, width = CFG.size,always_apply=True, p=1.0),
            A.Normalize(mean=[0.485, 0.456, 0.406], 
                        std=[0.229, 0.224, 0.225],
                        max_pixel_value=255.0, p=1.0),
            ToTensorV2(p=1.0),
        ], p=1.)

# MODEL

In [None]:
# ====================================================
# MODEL
# ====================================================
class CustomResNext(nn.Module):
    def __init__(self, model_name='resnext50_32x4d', pretrained=False):
        super().__init__()
        self.model = timm.create_model(model_name, pretrained=pretrained)    
        n_features = self.model.fc.in_features
        self.model.fc = nn.Linear(n_features, CFG.target_size, bias=True)

    def forward(self, x):
        x = self.model(x)
        return x

In [None]:
# ====================================================
# MODEL
# ====================================================
class CustomSeResNext(nn.Module):
    def __init__(self, model_name='seresnext50_32x4d', pretrained=False):
        super().__init__()
        self.model = timm.create_model(model_name, pretrained=pretrained)
        n_features = self.model.fc.in_features
        self.model.fc = nn.Linear(n_features, CFG.target_size, bias=True)


    def forward(self, x):
        x = self.model(x)
        return x

In [None]:
class CustomResNextTorch(nn.Module):
    def __init__(self, pretrained=False):
        super().__init__()
        self.model = models.resnext50_32x4d(pretrained = pretrained)
        n_features = self.model.fc.in_features
        self.model.fc = nn.Linear(n_features, CFG.target_size, bias=True)


    def forward(self, x):
        x = self.model(x)
        return x

In [None]:
class CustomEfficientNet(nn.Module):
    def __init__(self, model_name=CFG.model_name, pretrained=False):
        super().__init__()
        self.model = timm.create_model(model_name, pretrained=False)
        n_features = self.model.classifier.in_features
        self.model.classifier = nn.Linear(n_features, CFG.target_size)

    def forward(self, x):
        x = self.model(x)
        return x

In [None]:
class ViTBase16(nn.Module):
    def __init__(self, pretrained=False):

        super(ViTBase16, self).__init__()
        self.model = timm.create_model('vit_base_patch16_384', pretrained=False)
        self.model.head = nn.Linear(self.model.head.in_features, CFG.target_size)
        
    def forward(self, x):
        x = self.model(x)
        return x

In [None]:
class CustomEcaRes(nn.Module):
    def __init__(self, model_name='ecaresnet50d', pretrained=False):
        super().__init__()
        self.model = timm.create_model(model_name, pretrained=pretrained)
        n_features = self.model.fc.in_features
        self.model.fc = nn.Linear(n_features, CFG.target_size, bias=True)

    def forward(self, x):
        x = self.model(x)
        return x

In [None]:
class CustomCspRes(nn.Module):
    def __init__(self, model_name=CFG.model_name, pretrained=False):
        super().__init__()
        self.model = timm.create_model(model_name, pretrained=pretrained)
        n_features = self.model.head.fc.in_features
        self.model.head.fc = nn.Linear(n_features, CFG.target_size, bias=True)

    def forward(self, x):
        x = self.model(x)
        return x

In [None]:
class RegNetY(nn.Module):
    def __init__(self, model_name='regnety_080', pretrained=False):

        super(RegNetY, self).__init__()
        self.model = timm.create_model(model_name, pretrained=pretrained)
        n_features = self.model.head.fc.in_features
        self.model.head.fc = nn.Linear(n_features, CFG.target_size, bias=True)
        
    def forward(self, x):
        x = self.model(x)
        return x

# Helper functions

In [None]:
# ====================================================
# Helper functions
# ====================================================
def load_state(model_path, model):
    try:  # single GPU model_file
        model.load_state_dict(torch.load(model_path)['model'], strict=True)
        state_dict = torch.load(model_path)['model']
    except:  # multi GPU model_file
        state_dict = torch.load(model_path)['model']
        state_dict = {k[7:] if k.startswith('module.') else k: state_dict[k] for k in state_dict.keys()}

    return state_dict


def inference(model, states, test_loader, device):
    model.to(device)
    tk0 = tqdm(enumerate(test_loader), total=len(test_loader))
    probs = []
    for i, (images) in tk0:
        images = images.to(device)
        avg_preds = []
        for state in states:
            model.load_state_dict(state) 
            model.eval()
            with torch.no_grad():
                y_preds = model(images)
                
            avg_preds.append(y_preds.softmax(1).to('cpu').numpy())
        avg_preds = np.mean(avg_preds, axis=0)
        probs.append(avg_preds)
        
    probs = np.concatenate(probs)
    return probs

# inference

In [None]:
model = ViTBase16(pretrained=False)
states = [load_state(MODEL_DIR_VIT+f'vit_base_patch32_384_fold{fold}_best.pth', model) for fold in CFG.trn_fold]

if CFG.tta_vit == True:
    test_dataset = TestDataset(test, transform=get_inference_transforms_vit())
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_vit = []
    for _ in range(CFG.num_tta):
         predictions_vit += [inference(model, states, test_loader, device)]
    predictions_vit = np.mean(predictions_vit, axis=0)
    
else:
    test_dataset = TestDataset(test, transform=get_transforms_vit(data='valid'))
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_vit = inference(model, states, test_loader, device)

print(predictions_vit)

In [None]:
# ====================================================
# inference
# ====================================================
model = CustomEfficientNet('tf_efficientnet_b4_ns', pretrained=False)
states = [load_state(MODEL_DIR_EFNET4+f'tf_efficientnet_b4_ns_fold{fold}_best.pth', model) for fold in CFG.trn_fold]

if CFG.tta == True:
    test_dataset = TestDataset(test, transform=get_inference_transforms_efnet4())
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_ef = []
    
    for _ in range(5):
         predictions_ef += [inference(model, states, test_loader, device)]
    predictions_ef = np.mean(predictions_ef, axis=0)
    
else:
    test_dataset = TestDataset(test, transform=get_transforms(data='valid'))
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_ef = inference(model, states, test_loader, device)



print(predictions_ef)

# # submission
# test['label'] = np.argmax(predictions_ef, axis=1)
# test[['image_id', 'label']].to_csv(OUTPUT_DIR+'submission.csv', index=False)
# test.head()

In [None]:
model = CustomEfficientNet('tf_efficientnet_b5_ns', pretrained=False)
states = [load_state(MODEL_DIR_EFNET5+f'tf_efficientnet_b5_ns_fold{fold}_best.pth', model) for fold in CFG.trn_fold]

if CFG.tta_ef5 == True:
    test_dataset = TestDataset(test, transform=get_inference_transforms())
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_ef5 = []
    for _ in range(5):
         predictions_ef5 += [inference(model, states, test_loader, device)]
    predictions_ef5 = np.mean(predictions_ef5, axis=0)
    
else:
    test_dataset = TestDataset(test, transform=get_transforms(data='valid'))
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_ef5 = inference(model, states, test_loader, device)



print(predictions_ef5)

# submission
# test['label'] = np.argmax(predictions_ef, axis=1)
# test[['image_id', 'label']].to_csv(OUTPUT_DIR+'submission.csv', index=False)
# test.head()

In [None]:
model = CustomResNextTorch(pretrained=False)
states = [load_state(MODEL_DIR_RES+f'resnext50_32x4d_fold{fold}_best.pth', model) for fold in CFG.trn_fold]

if CFG.tta_res == True:
    test_dataset = TestDataset(test, transform=get_inference_transforms())
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_res = []
    for _ in range(10):
         predictions_res += [inference(model, states, test_loader, device)]
    predictions_res = np.mean(predictions_res, axis=0)
    
else:
    test_dataset = TestDataset(test, transform=get_transforms(data='valid'))
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_res = inference(model, states, test_loader, device)

print(predictions_res)

# # # submission
# test['label'] = np.argmax(predictions_res, axis=1)
# test[['image_id', 'label']].to_csv(OUTPUT_DIR+'submission.csv', index=False)
# test.head()

In [None]:
model = CustomSeResNext('seresnext50_32x4d', pretrained=False)
states = [load_state(MODEL_DIR_SERES_NO+f'seresnext50_32x4d_fold{fold}_best.pth', model) for fold in CFG.trn_fold]

if CFG.tta_se == True:
    test_dataset = TestDataset(test, transform=get_inference_transforms())
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_seres = []
    for _ in range(CFG.num_tta):
        predictions_seres += [inference(model, states, test_loader, device)]
        
    predictions_seres = np.mean(predictions_seres, axis=0)
    
else:
    test_dataset = TestDataset(test, transform=get_transforms(data='valid'))
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_seres = inference(model, states, test_loader, device)

print(predictions_seres)

# # submission
# test['label'] = np.argmax(predictions_seres, axis=1)
# test[['image_id', 'label']].to_csv(OUTPUT_DIR+'submission.csv', index=False)
# test.head()

In [None]:
model = CustomSeResNext('seresnext50_32x4d', pretrained=False)
states = [load_state(MODEL_DIR_SERES_MIX+f'seresnext50_32x4d_fold{fold}_best.pth', model) for fold in CFG.trn_fold]

if CFG.tta_se == True:
    test_dataset = TestDataset(test, transform=get_inference_transforms())
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_seres_mix = []
    for _ in range(CFG.num_tta):
        predictions_seres_mix += [inference(model, states, test_loader, device)]
        
    predictions_seres_mix = np.mean(predictions_seres_mix, axis=0)
    
else:
    test_dataset = TestDataset(test, transform=get_transforms(data='valid'))
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_seres_mix = inference(model, states, test_loader, device)

print(predictions_seres_mix)

In [None]:
model = CustomEcaRes('ecaresnet50d', pretrained=False)
states = [load_state(MODEL_DIR+f'ecaresnet50d_fold{fold}_best.pth', model) for fold in CFG.trn_fold]

if CFG.tta_se == True:
    test_dataset = TestDataset(test, transform=get_inference_transforms())
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_res_removed = []
    
    for _ in range(5):
         predictions_res_removed += [inference(model, states, test_loader, device)]
    predictions_res_removed = np.mean(predictions_res_removed, axis=0)
    
else:
    test_dataset = TestDataset(test, transform=get_transforms(data='valid'))
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_res_removed = inference(model, states, test_loader, device)



print(predictions_res_removed)

# # submission
# test['label'] = np.argmax(predictions_res_removed, axis=1)
# test[['image_id', 'label']].to_csv(OUTPUT_DIR+'submission.csv', index=False)
# test.head()

In [None]:
model = RegNetY('regnety_080', pretrained=False)
states = [load_state(MODEL_DIR_REG+f'regnety_080_fold{fold}_best.pth', model) for fold in CFG.trn_fold]

if CFG.tta_se == True:
    test_dataset = TestDataset(test, transform=get_inference_transforms())
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_regnet = []
    for _ in range(CFG.num_tta):
        predictions_regnet += [inference(model, states, test_loader, device)]
        
    predictions_regnet = np.mean(predictions_regnet, axis=0)
    
else:
    test_dataset = TestDataset(test, transform=get_transforms(data='valid'))
    test_loader = DataLoader(test_dataset, batch_size=CFG.batch_size, shuffle=False, 
                         num_workers=CFG.num_workers, pin_memory=True)
    
    predictions_regnet = inference(model, states, test_loader, device)

print(predictions_regnet)

# #submission
# test['label'] = np.argmax(predictions_regnet, axis=1)
# test[['image_id', 'label']].to_csv(OUTPUT_DIR+'submission.csv', index=False)
# test.head()

In [None]:
# submission
predictions = 0.125*(predictions_seres + predictions_res + predictions_vit + predictions_ef5 + predictions_seres_mix + predictions_res_removed + predictions_regnet + predictions_ef)
print(predictions)
print(np.sum(predictions))

test['label'] = np.argmax(predictions, axis=1)
test[['image_id', 'label']].to_csv(OUTPUT_DIR+'submission.csv', index=False)
test.head()